meerschaum.connectors

Create connectors with meerschaum.connectors.get_connector(). For ease of use, you can also import from the root meerschaum module:

>>> from meerschaum import get_connector
>>> conn = get_connector()
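
Connector keys of the form `type:label` are also accepted and are split on the first colon. A small sketch, assuming the default `sql:main` connector is configured:

>>> conn = get_connector('sql:main')
>>> str(conn)
'sql:main'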
  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Create connectors with `meerschaum.connectors.get_connector()`.
  7For ease of use, you can also import from the root `meerschaum` module:
  8```
  9>>> from meerschaum import get_connector
 10>>> conn = get_connector()
 11```
 12"""
 13
 14from __future__ import annotations
 15
 16import meerschaum as mrsm
 17from meerschaum.utils.typing import Any, Union, List, Dict, Optional
 18from meerschaum.utils.threading import RLock
 19from meerschaum.utils.warnings import warn
 20
 21from meerschaum.connectors._Connector import Connector, InvalidAttributesError
 22from meerschaum.connectors.sql._SQLConnector import SQLConnector
 23from meerschaum.connectors.api._APIConnector import APIConnector
 24from meerschaum.connectors.sql._create_engine import flavor_configs as sql_flavor_configs
 25
 26__all__ = (
 27    "make_connector",
 28    "Connector",
 29    "SQLConnector",
 30    "APIConnector",
 31    "get_connector",
 32    "is_connected",
 33    "poll",
 34    "api",
 35    "sql",
 36    "valkey",
 37)
 38
 39### store connectors partitioned by
 40### type, label for reuse
 41connectors: Dict[str, Dict[str, Connector]] = {
 42    'api'    : {},
 43    'sql'    : {},
 44    'plugin' : {},
 45    'valkey' : {},
 46}
 47instance_types: List[str] = ['sql', 'api']
 48_locks: Dict[str, RLock] = {
 49    'connectors'               : RLock(),
 50    'types'                    : RLock(),
 51    'custom_types'             : RLock(),
 52    '_loaded_plugin_connectors': RLock(),
 53    'instance_types'           : RLock(),
 54}
 55attributes: Dict[str, Dict[str, Any]] = {
 56    'api': {
 57        'required': [
 58            'host',
 59            'username',
 60            'password',
 61        ],
 62        'optional': [
 63            'port',
 64        ],
 65        'default': {
 66            'protocol': 'http',
 67        },
 68    },
 69    'sql': {
 70        'flavors': sql_flavor_configs,
 71    },
 72}
 73### Fill this with objects only when connectors are first referenced.
 74types: Dict[str, Any] = {}
 75custom_types: set = set()
 76_loaded_plugin_connectors: bool = False
 77
 78
 79def get_connector(
 80    type: Optional[str] = None,
 81    label: Optional[str] = None,
 82    refresh: bool = False,
 83    debug: bool = False,
 84    **kw: Any
 85) -> Connector:
 86    """
 87    Return an existing connector, or create a new one and store it for reuse.
 88    
 89    You can create new connectors if enough parameters are provided for the given type and flavor.
 90    
 91
 92    Parameters
 93    ----------
 94    type: Optional[str], default None
 95        Connector type (sql, api, etc.).
 96        Defaults to the type of the configured `instance_connector`.
 97
 98    label: Optional[str], default None
 99        Connector label (e.g. main). Defaults to `'main'`.
100
101    refresh: bool, default False
102        If `True`, reconstruct the connector rather than returning the cached instance.
103
104    kw: Any
105        Other arguments to pass to the Connector constructor.
106        If the Connector has already been constructed and new arguments are provided,
107        `refresh` is set to `True` and the old Connector is replaced.
108
109    Returns
110    -------
111    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
112    `meerschaum.connectors.sql.SQLConnector`).
113    
114    Examples
115    --------
116    The following parameters would create a new
117    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.
118
119    ```
120    >>> conn = get_connector(
121    ...     type = 'sql',
122    ...     label = 'newlabel',
123    ...     flavor = 'sqlite',
124    ...     database = '/file/path/to/database.db'
125    ... )
126    >>>
127    ```
128
129    """
130    from meerschaum.connectors.parse import parse_instance_keys
131    from meerschaum.config import get_config
132    from meerschaum.config.static import STATIC_CONFIG
133    from meerschaum.utils.warnings import warn
134    global _loaded_plugin_connectors
135    if isinstance(type, str) and not label and ':' in type:
136        type, label = type.split(':', maxsplit=1)
137
138    with _locks['_loaded_plugin_connectors']:
139        if not _loaded_plugin_connectors:
140            load_plugin_connectors()
141            _load_builtin_custom_connectors()
142            _loaded_plugin_connectors = True
143
144    if type is None and label is None:
145        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
146        ### recursive call to get_connector
147        return parse_instance_keys(default_instance_keys)
148
149    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
151    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']
152
153    ### type might actually be a label. Check if so and raise a warning.
154    if type not in connectors:
155        possibilities, poss_msg = [], ""
156        for _type in get_config('meerschaum', 'connectors'):
157            if type in get_config('meerschaum', 'connectors', _type):
158                possibilities.append(f"{_type}:{type}")
159        if len(possibilities) > 0:
160            poss_msg = " Did you mean"
161            for poss in possibilities[:-1]:
162                poss_msg += f" '{poss}',"
163            if poss_msg.endswith(','):
164                poss_msg = poss_msg[:-1]
165            if len(possibilities) > 1:
166                poss_msg += " or"
167            poss_msg += f" '{possibilities[-1]}'?"
168
169        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
170        return None
171
172    if 'sql' not in types:
173        from meerschaum.connectors.plugin import PluginConnector
174        from meerschaum.connectors.valkey import ValkeyConnector
175        with _locks['types']:
176            types.update({
177                'api': APIConnector,
178                'sql': SQLConnector,
179                'plugin': PluginConnector,
180                'valkey': ValkeyConnector,
181            })
182
183    ### determine if we need to call the constructor
184    if not refresh:
185        ### see if any user-supplied arguments differ from the existing instance
186        if label in connectors[type]:
187            warning_message = None
188            for attribute, value in kw.items():
189                if attribute not in connectors[type][label].meta:
190                    import inspect
191                    cls = connectors[type][label].__class__
192                    cls_init_signature = inspect.signature(cls)
193                    cls_init_params = cls_init_signature.parameters
194                    if attribute not in cls_init_params:
195                        warning_message = (
196                            f"Received new attribute '{attribute}' not present in connector " +
197                            f"{connectors[type][label]}.\n"
198                        )
199                elif connectors[type][label].__dict__[attribute] != value:
200                    warning_message = (
201                        f"Mismatched values for attribute '{attribute}' in connector "
202                        + f"'{connectors[type][label]}'.\n" +
203                        f"  - Keyword value: '{value}'\n" +
204                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
205                    )
206            if warning_message is not None:
207                warning_message += (
208                    "\nSetting `refresh` to True and recreating connector with type:"
209                    + f" '{type}' and label '{label}'."
210                )
211                refresh = True
212                warn(warning_message)
213        else: ### connector doesn't yet exist
214            refresh = True
215
216    ### only create an object if refresh is True
217    ### (can be manually specified, otherwise determined above)
218    if refresh:
219        with _locks['connectors']:
220            try:
221                ### will raise an error if configuration is incorrect / missing
222                conn = types[type](label=label, **kw)
223                connectors[type][label] = conn
224            except InvalidAttributesError as ie:
225                warn(
226                    f"Incorrect attributes for connector '{type}:{label}'.\n"
227                    + str(ie),
228                    stack = False,
229                )
230                conn = None
231            except Exception as e:
232                from meerschaum.utils.formatting import get_console
233                console = get_console()
234                if console:
235                    console.print_exception()
236                warn(
237                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
238                    stack = False,
239                )
240                conn = None
241        if conn is None:
242            return None
243
244    return connectors[type][label]
245
246
247def is_connected(keys: str, **kw) -> bool:
248    """
249    Check if the connector keys correspond to an active connection.
250    If the connector has not been created, it will immediately return `False`.
251    If the connector exists but cannot communicate with the source, return `False`.
252    
253    **NOTE:** Only works with instance connectors (`SQLConnectors` and `APIConnectors`).
254    Keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.
255
256    Parameters
257    ----------
258    keys:
259        The keys to the connector (e.g. `'sql:main'`).
260        
261    Returns
262    -------
263    A `bool` corresponding to whether a successful connection may be made.
264
265    """
266    import warnings
267    if ':' not in keys:
268        warn(f"Invalid connector keys '{keys}'")
269
270    try:
271        typ, label = keys.split(':')
272    except Exception:
273        return False
274    if typ not in instance_types:
275        return False
276    if label not in connectors.get(typ, {}):
277        return False
278
279    from meerschaum.connectors.parse import parse_instance_keys
280    conn = parse_instance_keys(keys)
281    try:
282        with warnings.catch_warnings():
283            warnings.filterwarnings('ignore')
284            return conn.test_connection(**kw)
285    except Exception:
286        return False
287
288
289def make_connector(cls, _is_executor: bool = False):
290    """
291    Register a class as a `Connector`.
292    The `type` will be the lower case of the class name, without the suffix `connector`.
293
294    Parameters
295    ----------
 296    cls:
 297        The class to register. Set the class attribute `IS_INSTANCE` to `True`
 298        to make the type an instance connector (this requires implementing the various pipes functions and lots of testing).
299
300    Examples
301    --------
302    >>> import meerschaum as mrsm
303    >>> from meerschaum.connectors import make_connector, Connector
304    >>> 
305    >>> @make_connector
 306    ... class FooConnector(Connector):
307    ...     REQUIRED_ATTRIBUTES: list[str] = ['username', 'password']
308    ... 
309    >>> conn = mrsm.get_connector('foo:bar', username='dog', password='cat')
310    >>> print(conn.username, conn.password)
311    dog cat
312    >>> 
313    """
314    import re
315    suffix_regex = (
316        r'connector$'
317        if not _is_executor
318        else r'executor$'
319    )
320    typ = re.sub(suffix_regex, '', cls.__name__.lower())
321    with _locks['types']:
322        types[typ] = cls
323    with _locks['custom_types']:
324        custom_types.add(typ)
325    with _locks['connectors']:
326        if typ not in connectors:
327            connectors[typ] = {}
328    if getattr(cls, 'IS_INSTANCE', False):
329        with _locks['instance_types']:
330            if typ not in instance_types:
331                instance_types.append(typ)
332
333    return cls
334
335
336def load_plugin_connectors():
337    """
338    If a plugin makes use of the `make_connector` decorator,
339    load its module.
340    """
341    from meerschaum.plugins import get_plugins, import_plugins
342    to_import = []
343    for plugin in get_plugins():
344        if plugin is None:
345            continue
346        with open(plugin.__file__, encoding='utf-8') as f:
347            text = f.read()
348        if 'make_connector' in text or 'Connector' in text:
349            to_import.append(plugin.name)
350    if not to_import:
351        return
352    import_plugins(*to_import)
353
354
355def get_connector_plugin(
356    connector: Connector,
357) -> Union[str, None, mrsm.Plugin]:
358    """
359    Determine the plugin for a connector.
360    This is useful for handling virtual environments for custom instance connectors.
361
362    Parameters
363    ----------
364    connector: Connector
365        The connector which may require a virtual environment.
366
367    Returns
368    -------
369    A Plugin, 'mrsm', or None.
370    """
371    if not hasattr(connector, 'type'):
372        return None
373    plugin_name = (
374        connector.__module__.replace('plugins.', '').split('.')[0]
375        if connector.type in custom_types else (
376            connector.label
377            if connector.type == 'plugin'
378            else 'mrsm'
379        )
380    )
381    plugin = mrsm.Plugin(plugin_name)
382    return plugin if plugin.is_installed() else None
383
384
385def _load_builtin_custom_connectors():
386    """
387    Import custom connectors decorated with `@make_connector` or `@make_executor`.
388    """
389    import meerschaum.jobs.systemd
390    import meerschaum.connectors.valkey
def make_connector(cls, _is_executor: bool = False):
290def make_connector(cls, _is_executor: bool = False):
291    """
292    Register a class as a `Connector`.
293    The `type` will be the lower case of the class name, without the suffix `connector`.
294
295    Parameters
296    ----------
297    cls:
298        The class to register. Set the class attribute `IS_INSTANCE` to `True`
299        to make the type an instance connector (this requires implementing the various pipes functions and lots of testing).
300
301    Examples
302    --------
303    >>> import meerschaum as mrsm
304    >>> from meerschaum.connectors import make_connector, Connector
305    >>> 
306    >>> @make_connector
307    ... class FooConnector(Connector):
308    ...     REQUIRED_ATTRIBUTES: list[str] = ['username', 'password']
309    ... 
310    >>> conn = mrsm.get_connector('foo:bar', username='dog', password='cat')
311    >>> print(conn.username, conn.password)
312    dog cat
313    >>> 
314    """
315    import re
316    suffix_regex = (
317        r'connector$'
318        if not _is_executor
319        else r'executor$'
320    )
321    typ = re.sub(suffix_regex, '', cls.__name__.lower())
322    with _locks['types']:
323        types[typ] = cls
324    with _locks['custom_types']:
325        custom_types.add(typ)
326    with _locks['connectors']:
327        if typ not in connectors:
328            connectors[typ] = {}
329    if getattr(cls, 'IS_INSTANCE', False):
330        with _locks['instance_types']:
331            if typ not in instance_types:
332                instance_types.append(typ)
333
334    return cls

Register a class as a Connector. The type will be the lower case of the class name, without the suffix connector.

Parameters
  • cls: The class to register. Set the class attribute IS_INSTANCE to True to make the type an instance connector; this requires implementing the various pipes functions and lots of testing.
Examples
>>> import meerschaum as mrsm
>>> from meerschaum.connectors import make_connector, Connector
>>> 
>>> @make_connector
... class FooConnector(Connector):
...     REQUIRED_ATTRIBUTES: list[str] = ['username', 'password']
... 
>>> conn = mrsm.get_connector('foo:bar', username='dog', password='cat')
>>> print(conn.username, conn.password)
dog cat
>>>
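
Setting the class attribute IS_INSTANCE to True additionally registers the type as an instance connector (see the decorator source above). A hedged sketch in the same interactive style; WeatherConnector and its station attribute are illustrative:

>>> @make_connector
... class WeatherConnector(Connector):
...     IS_INSTANCE: bool = True
...     REQUIRED_ATTRIBUTES: list[str] = ['station']
...
>>> conn = mrsm.get_connector('weather:main', station='KATL')
>>> conn.type, conn.label
('weather', 'main')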
class Connector:
 20class Connector(metaclass=abc.ABCMeta):
 21    """
 22    The base connector class to hold connection attributes.
 23    """
 24    def __init__(
 25        self,
 26        type: Optional[str] = None,
 27        label: Optional[str] = None,
 28        **kw: Any
 29    ):
 30        """
 31        Set the given keyword arguments as attributes.
 32
 33        Parameters
 34        ----------
 35        type: str
 36            The `type` of the connector (e.g. `sql`, `api`, `plugin`).
 37
 38        label: str
 39            The `label` for the connector.
 40
 41
 42        Examples
 43        --------
 44        Run `mrsm edit config` to edit connectors in the YAML file:
 45
 46        ```yaml
 47        meerschaum:
 48            connectors:
 49                {type}:
 50                    {label}:
 51                        ### attributes go here
 52        ```
 53
 54        """
 55        self._original_dict = copy.deepcopy(self.__dict__)
 56        self._set_attributes(type=type, label=label, **kw)
 57
 58        ### NOTE: Override `REQUIRED_ATTRIBUTES` if `uri` is set.
 59        self.verify_attributes(
 60            ['uri']
 61            if 'uri' in self.__dict__
 62            else getattr(self, 'REQUIRED_ATTRIBUTES', None)
 63        )
 64
 65    def _reset_attributes(self):
 66        self.__dict__ = self._original_dict
 67
 68    def _set_attributes(
 69        self,
 70        *args,
 71        inherit_default: bool = True,
 72        **kw: Any
 73    ):
 74        from meerschaum.config.static import STATIC_CONFIG
 75        from meerschaum.utils.warnings import error
 76
 77        self._attributes = {}
 78
 79        default_label = STATIC_CONFIG['connectors']['default_label']
 80
 81        ### NOTE: Support the legacy method of explicitly passing the type.
 82        label = kw.get('label', None)
 83        if label is None:
 84            if len(args) == 2:
 85                label = args[1]
 86            elif len(args) == 0:
 87                label = None
 88            else:
 89                label = args[0]
 90
 91        if label == 'default':
 92            error(
 93                f"Label cannot be 'default'. Did you mean '{default_label}'?",
 94                InvalidAttributesError,
 95            )
 96        self.__dict__['label'] = label
 97
 98        from meerschaum.config import get_config
 99        conn_configs = copy.deepcopy(get_config('meerschaum', 'connectors'))
100        connector_config = copy.deepcopy(get_config('system', 'connectors'))
101
102        ### inherit attributes from 'default' if exists
103        if inherit_default:
104            inherit_from = 'default'
105            if self.type in conn_configs and inherit_from in conn_configs[self.type]:
106                _inherit_dict = copy.deepcopy(conn_configs[self.type][inherit_from])
107                self._attributes.update(_inherit_dict)
108
109        ### load user config into self._attributes
110        if self.type in conn_configs and self.label in conn_configs[self.type]:
111            self._attributes.update(conn_configs[self.type][self.label] or {})
112
113        ### load system config into self._sys_config
114        ### (deep copy so future Connectors don't inherit changes)
115        if self.type in connector_config:
116            self._sys_config = copy.deepcopy(connector_config[self.type])
117
118        ### add additional arguments or override configuration
119        self._attributes.update(kw)
120
121        ### finally, update __dict__ with _attributes.
122        self.__dict__.update(self._attributes)
123
124    def verify_attributes(
125        self,
126        required_attributes: Optional[List[str]] = None,
127        debug: bool = False,
128    ) -> None:
129        """
130        Ensure that the required attributes have been met.
131        
132        The Connector base class checks the minimum requirements.
133        Child classes may enforce additional requirements.
134
135        Parameters
136        ----------
137        required_attributes: Optional[List[str]], default None
138            Attributes to be verified. If `None`, default to `['label']`.
139
140        debug: bool, default False
141            Verbosity toggle.
142
143        Returns
144        -------
145        Don't return anything.
146
147        Raises
148        ------
149        An error if any of the required attributes are missing.
150        """
151        from meerschaum.utils.warnings import error, warn
152        from meerschaum.utils.debug import dprint
153        from meerschaum.utils.misc import items_str
154        if required_attributes is None:
155            required_attributes = ['label']
156
157        missing_attributes = set()
158        for a in required_attributes:
159            if a not in self.__dict__:
160                missing_attributes.add(a)
161        if len(missing_attributes) > 0:
162            error(
163                (
164                    f"Missing {items_str(list(missing_attributes))} "
165                    + f"for connector '{self.type}:{self.label}'."
166                ),
167                InvalidAttributesError,
168                silent=True,
169                stack=False
170            )
171
172
173    def __str__(self):
174        """
175        When cast to a string, return type:label.
176        """
177        return f"{self.type}:{self.label}"
178
179    def __repr__(self):
180        """
181        Represent the connector as type:label.
182        """
183        return str(self)
184
185    @property
186    def meta(self) -> Dict[str, Any]:
187        """
188        Return the keys needed to reconstruct this Connector.
189        """
190        _meta = {
191            key: value
192            for key, value in self.__dict__.items()
193            if not str(key).startswith('_')
194        }
195        _meta.update({
196            'type': self.type,
197            'label': self.label,
198        })
199        return _meta
200
201
202    @property
203    def type(self) -> str:
204        """
205        Return the type for this connector.
206        """
207        _type = self.__dict__.get('type', None)
208        if _type is None:
209            import re
210            is_executor = self.__class__.__name__.lower().endswith('executor')
211            suffix_regex = (
212                r'connector$'
213                if not is_executor
214                else r'executor$'
215            )
216            _type = re.sub(suffix_regex, '', self.__class__.__name__.lower())
217            self.__dict__['type'] = _type
218        return _type
219
220
221    @property
222    def label(self) -> str:
223        """
224        Return the label for this connector.
225        """
226        _label = self.__dict__.get('label', None)
227        if _label is None:
228            from meerschaum.config.static import STATIC_CONFIG
229            _label = STATIC_CONFIG['connectors']['default_label']
230            self.__dict__['label'] = _label
231        return _label

The base connector class to hold connection attributes.

Connector(type: Optional[str] = None, label: Optional[str] = None, **kw: Any)
24    def __init__(
25        self,
26        type: Optional[str] = None,
27        label: Optional[str] = None,
28        **kw: Any
29    ):
30        """
31        Set the given keyword arguments as attributes.
32
33        Parameters
34        ----------
35        type: str
36            The `type` of the connector (e.g. `sql`, `api`, `plugin`).
37
38        label: str
39            The `label` for the connector.
40
41
42        Examples
43        --------
 44        Run `mrsm edit config` to edit connectors in the YAML file:
45
46        ```yaml
47        meerschaum:
 48            connectors:
49                {type}:
50                    {label}:
51                        ### attributes go here
52        ```
53
54        """
55        self._original_dict = copy.deepcopy(self.__dict__)
56        self._set_attributes(type=type, label=label, **kw)
57
58        ### NOTE: Override `REQUIRED_ATTRIBUTES` if `uri` is set.
59        self.verify_attributes(
60            ['uri']
61            if 'uri' in self.__dict__
62            else getattr(self, 'REQUIRED_ATTRIBUTES', None)
63        )

Set the given keyword arguments as attributes.

Parameters
  • type (str): The type of the connector (e.g. sql, api, plugin).
  • label (str): The label for the connector.
Examples

Run mrsm edit config to edit connectors in the YAML file:

meerschaum:
    connectors:
        {type}:
            {label}:
                ### attributes go here
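
Keyword arguments are merged into the connector's attributes by `_set_attributes()`, so a connector may also be built ad hoc without editing the YAML file. A minimal sketch; FooConnector and its username attribute are hypothetical:

>>> from meerschaum.connectors import Connector
>>> class FooConnector(Connector):
...     REQUIRED_ATTRIBUTES = ['username']
...
>>> conn = FooConnector(label='demo', username='dog')
>>> conn.username
'dog'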
def verify_attributes( self, required_attributes: Optional[List[str]] = None, debug: bool = False) -> None:
124    def verify_attributes(
125        self,
126        required_attributes: Optional[List[str]] = None,
127        debug: bool = False,
128    ) -> None:
129        """
130        Ensure that the required attributes have been met.
131        
132        The Connector base class checks the minimum requirements.
133        Child classes may enforce additional requirements.
134
135        Parameters
136        ----------
137        required_attributes: Optional[List[str]], default None
138            Attributes to be verified. If `None`, default to `['label']`.
139
140        debug: bool, default False
141            Verbosity toggle.
142
143        Returns
144        -------
145        Don't return anything.
146
147        Raises
148        ------
149        An error if any of the required attributes are missing.
150        """
151        from meerschaum.utils.warnings import error, warn
152        from meerschaum.utils.debug import dprint
153        from meerschaum.utils.misc import items_str
154        if required_attributes is None:
155            required_attributes = ['label']
156
157        missing_attributes = set()
158        for a in required_attributes:
159            if a not in self.__dict__:
160                missing_attributes.add(a)
161        if len(missing_attributes) > 0:
162            error(
163                (
164                    f"Missing {items_str(list(missing_attributes))} "
165                    + f"for connector '{self.type}:{self.label}'."
166                ),
167                InvalidAttributesError,
168                silent=True,
169                stack=False
170            )

Ensure that the required attributes have been met.

The Connector base class checks the minimum requirements. Child classes may enforce additional requirements.

Parameters
  • required_attributes (Optional[List[str]], default None): Attributes to be verified. If None, default to ['label'].
  • debug (bool, default False): Verbosity toggle.
Returns
  • Don't return anything.
Raises
  • An error if any of the required attributes are missing.
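
Because missing attributes raise InvalidAttributesError (per the Raises section above), construction fails fast. A sketch continuing the hypothetical FooConnector example, assuming the error helper raises the given exception class:

>>> from meerschaum.connectors import InvalidAttributesError
>>> try:
...     FooConnector(label='demo')  # 'username' is missing
... except InvalidAttributesError:
...     print('missing attributes')
...
missing attributes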
meta: Dict[str, Any]
185    @property
186    def meta(self) -> Dict[str, Any]:
187        """
188        Return the keys needed to reconstruct this Connector.
189        """
190        _meta = {
191            key: value
192            for key, value in self.__dict__.items()
193            if not str(key).startswith('_')
194        }
195        _meta.update({
196            'type': self.type,
197            'label': self.label,
198        })
199        return _meta

Return the keys needed to reconstruct this Connector.

type: str
202    @property
203    def type(self) -> str:
204        """
205        Return the type for this connector.
206        """
207        _type = self.__dict__.get('type', None)
208        if _type is None:
209            import re
210            is_executor = self.__class__.__name__.lower().endswith('executor')
211            suffix_regex = (
212                r'connector$'
213                if not is_executor
214                else r'executor$'
215            )
216            _type = re.sub(suffix_regex, '', self.__class__.__name__.lower())
217            self.__dict__['type'] = _type
218        return _type

Return the type for this connector.

label: str
221    @property
222    def label(self) -> str:
223        """
224        Return the label for this connector.
225        """
226        _label = self.__dict__.get('label', None)
227        if _label is None:
228            from meerschaum.config.static import STATIC_CONFIG
229            _label = STATIC_CONFIG['connectors']['default_label']
230            self.__dict__['label'] = _label
231        return _label

Return the label for this connector.
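
Together, type, label, and meta identify a connector and carry the keys needed to rebuild it. Continuing the hypothetical FooConnector sketch:

>>> conn = FooConnector(label='demo', username='dog')
>>> conn.type, conn.label   # 'foo' is derived from the class name
('foo', 'demo')
>>> str(conn)
'foo:demo'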

class SQLConnector(meerschaum.connectors.Connector):
 18class SQLConnector(Connector):
 19    """
 20    Connect to SQL databases via `sqlalchemy`.
 21    
 22    SQLConnectors may be used as Meerschaum instance connectors.
 23    Read more about connectors and instances at
 24    https://meerschaum.io/reference/connectors/
 25
 26    """
 27
 28    IS_INSTANCE: bool = True
 29
 30    from ._create_engine import flavor_configs, create_engine
 31    from ._sql import (
 32        read,
 33        value,
 34        exec,
 35        execute,
 36        to_sql,
 37        exec_queries,
 38        get_connection,
 39        _cleanup_connections,
 40    )
 41    from meerschaum.utils.sql import test_connection
 42    from ._fetch import fetch, get_pipe_metadef
 43    from ._cli import cli, _cli_exit
 44    from ._pipes import (
 45        fetch_pipes_keys,
 46        create_indices,
 47        drop_indices,
 48        get_create_index_queries,
 49        get_drop_index_queries,
 50        get_add_columns_queries,
 51        get_alter_columns_queries,
 52        delete_pipe,
 53        get_pipe_data,
 54        get_pipe_data_query,
 55        register_pipe,
 56        edit_pipe,
 57        get_pipe_id,
 58        get_pipe_attributes,
 59        sync_pipe,
 60        sync_pipe_inplace,
 61        get_sync_time,
 62        pipe_exists,
 63        get_pipe_rowcount,
 64        drop_pipe,
 65        clear_pipe,
 66        deduplicate_pipe,
 67        get_pipe_table,
 68        get_pipe_columns_types,
 69        get_to_sql_dtype,
 70        get_pipe_schema,
 71        create_pipe_table_from_df,
 72        get_pipe_columns_indices,
 73        get_temporary_target,
 74        create_pipe_indices,
 75        drop_pipe_indices,
 76        get_pipe_index_names,
 77    )
 78    from ._plugins import (
 79        register_plugin,
 80        delete_plugin,
 81        get_plugin_id,
 82        get_plugin_version,
 83        get_plugins,
 84        get_plugin_user_id,
 85        get_plugin_username,
 86        get_plugin_attributes,
 87    )
 88    from ._users import (
 89        register_user,
 90        get_user_id,
 91        get_users,
 92        edit_user,
 93        delete_user,
 94        get_user_password_hash,
 95        get_user_type,
 96        get_user_attributes,
 97    )
 98    from ._uri import from_uri, parse_uri
 99    from ._instance import (
100        _log_temporary_tables_creation,
101        _drop_temporary_table,
102        _drop_temporary_tables,
103        _drop_old_temporary_tables,
104    )
105
106    def __init__(
107        self,
108        label: Optional[str] = None,
109        flavor: Optional[str] = None,
110        wait: bool = False,
111        connect: bool = False,
112        debug: bool = False,
113        **kw: Any
114    ):
115        """
116        Parameters
117        ----------
118        label: str, default 'main'
119            The identifying label for the connector.
120            E.g. for `sql:main`, 'main' is the label.
121            Defaults to 'main'.
122
123        flavor: Optional[str], default None
124            The database flavor, e.g.
125            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
126            To see supported flavors, run the `bootstrap connectors` command.
127
128        wait: bool, default False
129            If `True`, block until a database connection has been made.
130            Defaults to `False`.
131
132        connect: bool, default False
133            If `True`, immediately attempt to connect the database and raise
134            a warning if the connection fails.
135            Defaults to `False`.
136
137        debug: bool, default False
138            Verbosity toggle.
139            Defaults to `False`.
140
141        kw: Any
142            All other arguments will be passed to the connector's attributes.
143            Therefore, a connector may be made without being registered,
 144            as long as enough parameters are supplied to the constructor.
145        """
146        if 'uri' in kw:
147            uri = kw['uri']
148            if uri.startswith('postgres') and not uri.startswith('postgresql'):
149                uri = uri.replace('postgres', 'postgresql', 1)
150            if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
151                uri = uri.replace('postgresql://', 'postgresql+psycopg://', 1)
152            if uri.startswith('timescaledb://'):
153                uri = uri.replace('timescaledb://', 'postgresql+psycopg://', 1)
154                flavor = 'timescaledb'
155            kw['uri'] = uri
156            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
157            label = label or from_uri_params.get('label', None)
158            _ = from_uri_params.pop('label', None)
159
160            ### Sometimes the flavor may be provided with a URI.
161            kw.update(from_uri_params)
162            if flavor:
163                kw['flavor'] = flavor
164
165        ### set __dict__ in base class
166        super().__init__(
167            'sql',
168            label = label or self.__dict__.get('label', None),
169            **kw
170        )
171
172        if self.__dict__.get('flavor', None) == 'sqlite':
173            self._reset_attributes()
174            self._set_attributes(
175                'sql',
176                label = label,
177                inherit_default = False,
178                **kw
179            )
 180            ### For backwards compatibility reasons, set the path for sql:local if it's missing.
181            if self.label == 'local' and not self.__dict__.get('database', None):
182                from meerschaum.config._paths import SQLITE_DB_PATH
183                self.database = str(SQLITE_DB_PATH)
184
185        ### ensure flavor and label are set accordingly
186        if 'flavor' not in self.__dict__:
187            if flavor is None and 'uri' not in self.__dict__:
188                raise Exception(
189                    f"    Missing flavor. Provide flavor as a key for '{self}'."
190                )
191            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)
192
193        if self.flavor == 'postgres':
194            self.flavor = 'postgresql'
195
196        self._debug = debug
197        ### Store the PID and thread at initialization
198        ### so we can dispose of the Pool in child processes or threads.
199        import os
200        import threading
201        self._pid = os.getpid()
202        self._thread_ident = threading.current_thread().ident
203        self._sessions = {}
204        self._locks = {'_sessions': threading.RLock(), }
205
206        ### verify the flavor's requirements are met
207        if self.flavor not in self.flavor_configs:
208            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
209        if not self.__dict__.get('uri'):
210            self.verify_attributes(
211                self.flavor_configs[self.flavor].get('requirements', set()),
212                debug=debug,
213            )
214
215        if wait:
216            from meerschaum.connectors.poll import retry_connect
217            retry_connect(connector=self, debug=debug)
218
219        if connect:
220            if not self.test_connection(debug=debug):
221                warn(f"Failed to connect with connector '{self}'!", stack=False)
222
223    @property
224    def Session(self):
225        if '_Session' not in self.__dict__:
226            if self.engine is None:
227                return None
228
229            from meerschaum.utils.packages import attempt_import
230            sqlalchemy_orm = attempt_import('sqlalchemy.orm', lazy=False)
231            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
232            self._Session = sqlalchemy_orm.scoped_session(session_factory)
233
234        return self._Session
235
236    @property
237    def engine(self):
238        """
239        Return the SQLAlchemy engine connected to the configured database.
240        """
241        import os
242        import threading
243        if '_engine' not in self.__dict__:
244            self._engine, self._engine_str = self.create_engine(include_uri=True)
245
246        same_process = os.getpid() == self._pid
247        same_thread = threading.current_thread().ident == self._thread_ident
248
249        ### handle child processes
250        if not same_process:
251            self._pid = os.getpid()
252            self._thread = threading.current_thread()
253            warn("Different PID detected. Disposing of connections...")
254            self._engine.dispose()
255
256        ### handle different threads
257        if not same_thread:
258            if self.flavor == 'duckdb':
259                warn("Different thread detected.")
260                self._engine.dispose()
261
262        return self._engine
263
264    @property
265    def DATABASE_URL(self) -> str:
266        """
 267        Return the URI connection string (alias for `SQLConnector.URI`).
268        """
269        _ = self.engine
270        return str(self._engine_str)
271
272    @property
273    def URI(self) -> str:
274        """
275        Return the URI connection string.
276        """
277        _ = self.engine
278        return str(self._engine_str)
279
280    @property
 281    def IS_THREAD_SAFE(self) -> bool:
282        """
283        Return whether this connector may be multithreaded.
284        """
285        if self.flavor in ('duckdb', 'oracle'):
286            return False
287        if self.flavor == 'sqlite':
288            return ':memory:' not in self.URI
289        return True
290
291    @property
292    def metadata(self):
293        """
294        Return the metadata bound to this configured schema.
295        """
296        from meerschaum.utils.packages import attempt_import
297        sqlalchemy = attempt_import('sqlalchemy', lazy=False)
298        if '_metadata' not in self.__dict__:
299            self._metadata = sqlalchemy.MetaData(schema=self.schema)
300        return self._metadata
301
302    @property
303    def instance_schema(self):
304        """
305        Return the schema name for Meerschaum tables. 
306        """
307        return self.schema
308
309    @property
310    def internal_schema(self):
311        """
312        Return the schema name for internal tables. 
313        """
314        from meerschaum.config.static import STATIC_CONFIG
315        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
316        schema_name = self.__dict__.get('internal_schema', None) or (
317            STATIC_CONFIG['sql']['internal_schema']
318            if self.flavor not in NO_SCHEMA_FLAVORS
319            else self.schema
320        )
321
322        if '_internal_schema' not in self.__dict__:
323            self._internal_schema = schema_name
324        return self._internal_schema
325
326    @property
327    def db(self) -> Optional[databases.Database]:
328        from meerschaum.utils.packages import attempt_import
329        databases = attempt_import('databases', lazy=False, install=True)
330        url = self.DATABASE_URL
331        if 'mysql' in url:
332            url = url.replace('+pymysql', '')
333        if '_db' not in self.__dict__:
334            try:
335                self._db = databases.Database(url)
336            except KeyError:
337                ### Likely encountered an unsupported flavor.
338                from meerschaum.utils.warnings import warn
339                self._db = None
340        return self._db
341
342    @property
343    def db_version(self) -> Union[str, None]:
344        """
345        Return the database version.
346        """
347        _db_version = self.__dict__.get('_db_version', None)
348        if _db_version is not None:
349            return _db_version
350
351        from meerschaum.utils.sql import get_db_version
352        self._db_version = get_db_version(self)
353        return self._db_version
354
355    @property
356    def schema(self) -> Union[str, None]:
357        """
358        Return the default schema to use.
359        A value of `None` will not prepend a schema.
360        """
361        if 'schema' in self.__dict__:
362            return self.__dict__['schema']
363
364        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
365        if self.flavor in NO_SCHEMA_FLAVORS:
366            self.__dict__['schema'] = None
367            return None
368
369        sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
370        _schema = sqlalchemy.inspect(self.engine).default_schema_name
371        self.__dict__['schema'] = _schema
372        return _schema
373
374    def __getstate__(self):
375        return self.__dict__
376
377    def __setstate__(self, d):
378        self.__dict__.update(d)
379
380    def __call__(self):
381        return self

Connect to SQL databases via sqlalchemy.

SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/
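
A minimal sketch of constructing one directly; the SQLite file path is hypothetical:

>>> from meerschaum.connectors.sql import SQLConnector
>>> conn = SQLConnector(
...     label='temp',
...     flavor='sqlite',
...     database='/tmp/demo.db',
... )
>>> conn.flavor
'sqlite'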

SQLConnector( label: Optional[str] = None, flavor: Optional[str] = None, wait: bool = False, connect: bool = False, debug: bool = False, **kw: Any)
106    def __init__(
107        self,
108        label: Optional[str] = None,
109        flavor: Optional[str] = None,
110        wait: bool = False,
111        connect: bool = False,
112        debug: bool = False,
113        **kw: Any
114    ):
115        """
116        Parameters
117        ----------
118        label: str, default 'main'
119            The identifying label for the connector.
120            E.g. for `sql:main`, 'main' is the label.
121            Defaults to 'main'.
122
123        flavor: Optional[str], default None
124            The database flavor, e.g.
125            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
126            To see supported flavors, run the `bootstrap connectors` command.
127
128        wait: bool, default False
129            If `True`, block until a database connection has been made.
130            Defaults to `False`.
131
132        connect: bool, default False
133            If `True`, immediately attempt to connect the database and raise
134            a warning if the connection fails.
135            Defaults to `False`.
136
137        debug: bool, default False
138            Verbosity toggle.
139            Defaults to `False`.
140
141        kw: Any
142            All other arguments will be passed to the connector's attributes.
143            Therefore, a connector may be made without being registered,
144            as long as enough parameters are supplied to the constructor.
145        """
146        if 'uri' in kw:
147            uri = kw['uri']
148            if uri.startswith('postgres') and not uri.startswith('postgresql'):
149                uri = uri.replace('postgres', 'postgresql', 1)
150            if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
151                uri = uri.replace('postgresql://', 'postgresql+psycopg://', 1)
152            if uri.startswith('timescaledb://'):
153                uri = uri.replace('timescaledb://', 'postgresql+psycopg://', 1)
154                flavor = 'timescaledb'
155            kw['uri'] = uri
156            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
157            label = label or from_uri_params.get('label', None)
158            _ = from_uri_params.pop('label', None)
159
160            ### Sometimes the flavor may be provided with a URI.
161            kw.update(from_uri_params)
162            if flavor:
163                kw['flavor'] = flavor
164
165        ### set __dict__ in base class
166        super().__init__(
167            'sql',
168            label = label or self.__dict__.get('label', None),
169            **kw
170        )
171
172        if self.__dict__.get('flavor', None) == 'sqlite':
173            self._reset_attributes()
174            self._set_attributes(
175                'sql',
176                label = label,
177                inherit_default = False,
178                **kw
179            )
180            ### For backwards compatibility reasons, set the path for sql:local if it's missing.
181            if self.label == 'local' and not self.__dict__.get('database', None):
182                from meerschaum.config._paths import SQLITE_DB_PATH
183                self.database = str(SQLITE_DB_PATH)
184
185        ### ensure flavor and label are set accordingly
186        if 'flavor' not in self.__dict__:
187            if flavor is None and 'uri' not in self.__dict__:
188                raise Exception(
189                    f"    Missing flavor. Provide flavor as a key for '{self}'."
190                )
191            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)
192
193        if self.flavor == 'postgres':
194            self.flavor = 'postgresql'
195
196        self._debug = debug
197        ### Store the PID and thread at initialization
198        ### so we can dispose of the Pool in child processes or threads.
199        import os
200        import threading
201        self._pid = os.getpid()
202        self._thread_ident = threading.current_thread().ident
203        self._sessions = {}
204        self._locks = {'_sessions': threading.RLock(), }
205
206        ### verify the flavor's requirements are met
207        if self.flavor not in self.flavor_configs:
208            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
209        if not self.__dict__.get('uri'):
210            self.verify_attributes(
211                self.flavor_configs[self.flavor].get('requirements', set()),
212                debug=debug,
213            )
214
215        if wait:
216            from meerschaum.connectors.poll import retry_connect
217            retry_connect(connector=self, debug=debug)
218
219        if connect:
220            if not self.test_connection(debug=debug):
221                warn(f"Failed to connect with connector '{self}'!", stack=False)
Parameters
  • label (str, default 'main'): The identifying label for the connector. E.g. for sql:main, 'main' is the label. Defaults to 'main'.
  • flavor (Optional[str], default None): The database flavor, e.g. 'sqlite', 'postgresql', 'cockroachdb', etc. To see supported flavors, run the bootstrap connectors command.
  • wait (bool, default False): If True, block until a database connection has been made. Defaults to False.
  • connect (bool, default False): If True, immediately attempt to connect the database and raise a warning if the connection fails. Defaults to False.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
  • kw (Any): All other arguments will be passed to the connector's attributes. Therefore, a connector may be made without being registered, as long as enough parameters are supplied to the constructor.
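
Because the constructor normalizes postgres:// URIs to postgresql+psycopg:// and parses attributes out of the URI (see the source above), a connector may be built from a URI alone. A hedged sketch with made-up credentials:

>>> conn = SQLConnector(
...     label='from_uri',
...     uri='postgres://user:pass@localhost:5432/db',
... )
>>> conn.flavor
'postgresql'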
IS_INSTANCE: bool = True
Session
223    @property
224    def Session(self):
225        if '_Session' not in self.__dict__:
226            if self.engine is None:
227                return None
228
229            from meerschaum.utils.packages import attempt_import
230            sqlalchemy_orm = attempt_import('sqlalchemy.orm', lazy=False)
231            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
232            self._Session = sqlalchemy_orm.scoped_session(session_factory)
233
234        return self._Session
engine
236    @property
237    def engine(self):
238        """
239        Return the SQLAlchemy engine connected to the configured database.
240        """
241        import os
242        import threading
243        if '_engine' not in self.__dict__:
244            self._engine, self._engine_str = self.create_engine(include_uri=True)
245
246        same_process = os.getpid() == self._pid
247        same_thread = threading.current_thread().ident == self._thread_ident
248
249        ### handle child processes
250        if not same_process:
251            self._pid = os.getpid()
252            self._thread = threading.current_thread()
253            warn("Different PID detected. Disposing of connections...")
254            self._engine.dispose()
255
256        ### handle different threads
257        if not same_thread:
258            if self.flavor == 'duckdb':
259                warn("Different thread detected.")
260                self._engine.dispose()
261
262        return self._engine

Return the SQLAlchemy engine connected to the configured database.
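
The engine is built lazily on first access and disposed of when a new PID is detected. A sketch of using it directly with SQLAlchemy (2.x style):

>>> import sqlalchemy
>>> with conn.engine.connect() as connection:
...     print(connection.execute(sqlalchemy.text('SELECT 1')).scalar())
...
1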

DATABASE_URL: str
264    @property
265    def DATABASE_URL(self) -> str:
266        """
267        Return the URI connection string (alias for `SQLConnector.URI`).
268        """
269        _ = self.engine
270        return str(self._engine_str)

Return the URI connection string (alias for SQLConnector.URI).

URI: str
272    @property
273    def URI(self) -> str:
274        """
275        Return the URI connection string.
276        """
277        _ = self.engine
278        return str(self._engine_str)

Return the URI connection string.

IS_THREAD_SAFE: bool
280    @property
281    def IS_THREAD_SAFE(self) -> bool:
282        """
283        Return whether this connector may be multithreaded.
284        """
285        if self.flavor in ('duckdb', 'oracle'):
286            return False
287        if self.flavor == 'sqlite':
288            return ':memory:' not in self.URI
289        return True

Return whether this connector may be multithreaded.

metadata
291    @property
292    def metadata(self):
293        """
294        Return the metadata bound to this configured schema.
295        """
296        from meerschaum.utils.packages import attempt_import
297        sqlalchemy = attempt_import('sqlalchemy', lazy=False)
298        if '_metadata' not in self.__dict__:
299            self._metadata = sqlalchemy.MetaData(schema=self.schema)
300        return self._metadata

Return the metadata bound to this configured schema.
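
Because the MetaData object is bound to the connector's schema, tables reflected through it resolve against that schema. A sketch; my_table is hypothetical:

>>> import sqlalchemy
>>> table = sqlalchemy.Table(
...     'my_table',
...     conn.metadata,
...     autoload_with=conn.engine,  # reflect columns from the database
... )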

instance_schema
302    @property
303    def instance_schema(self):
304        """
305        Return the schema name for Meerschaum tables. 
306        """
307        return self.schema

Return the schema name for Meerschaum tables.

internal_schema
309    @property
310    def internal_schema(self):
311        """
312        Return the schema name for internal tables. 
313        """
314        from meerschaum.config.static import STATIC_CONFIG
315        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
316        schema_name = self.__dict__.get('internal_schema', None) or (
317            STATIC_CONFIG['sql']['internal_schema']
318            if self.flavor not in NO_SCHEMA_FLAVORS
319            else self.schema
320        )
321
322        if '_internal_schema' not in self.__dict__:
323            self._internal_schema = schema_name
324        return self._internal_schema

Return the schema name for internal tables.

db: 'Optional[databases.Database]'
326    @property
327    def db(self) -> Optional[databases.Database]:
328        from meerschaum.utils.packages import attempt_import
329        databases = attempt_import('databases', lazy=False, install=True)
330        url = self.DATABASE_URL
331        if 'mysql' in url:
332            url = url.replace('+pymysql', '')
333        if '_db' not in self.__dict__:
334            try:
335                self._db = databases.Database(url)
336            except KeyError:
337                ### Likely encountered an unsupported flavor.
338                from meerschaum.utils.warnings import warn
339                self._db = None
340        return self._db
db_version: Optional[str]
342    @property
343    def db_version(self) -> Union[str, None]:
344        """
345        Return the database version.
346        """
347        _db_version = self.__dict__.get('_db_version', None)
348        if _db_version is not None:
349            return _db_version
350
351        from meerschaum.utils.sql import get_db_version
352        self._db_version = get_db_version(self)
353        return self._db_version

Return the database version.

schema: Optional[str]
355    @property
356    def schema(self) -> Union[str, None]:
357        """
358        Return the default schema to use.
359        A value of `None` will not prepend a schema.
360        """
361        if 'schema' in self.__dict__:
362            return self.__dict__['schema']
363
364        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
365        if self.flavor in NO_SCHEMA_FLAVORS:
366            self.__dict__['schema'] = None
367            return None
368
369        sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
370        _schema = sqlalchemy.inspect(self.engine).default_schema_name
371        self.__dict__['schema'] = _schema
372        return _schema

Return the default schema to use. A value of None will not prepend a schema.

flavor_configs = {
    'timescaledb': {
        'engine': 'postgresql+psycopg',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {},
        'requirements': {'password', 'host', 'username', 'database'},
        'defaults': {'port': 5432},
    },
    'postgresql': {
        'engine': 'postgresql+psycopg',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {},
        'requirements': {'password', 'host', 'username', 'database'},
        'defaults': {'port': 5432},
    },
    'citus': {
        'engine': 'postgresql+psycopg',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {},
        'requirements': {'password', 'host', 'username', 'database'},
        'defaults': {'port': 5432},
    },
    'mssql': {
        'engine': 'mssql+pyodbc',
        'create_engine': {
            'fast_executemany': True,
            'use_insertmanyvalues': False,
            'isolation_level': 'AUTOCOMMIT',
            'use_setinputsizes': False,
            'pool_pre_ping': True,
            'ignore_no_transaction_on_rollback': True,
        },
        'omit_create_engine': {'method'},
        'to_sql': {'method': None},
        'requirements': {'password', 'host', 'username', 'database'},
        'defaults': {
            'port': 1433,
            'options': 'driver=ODBC Driver 18 for SQL Server&UseFMTONLY=Yes&TrustServerCertificate=yes&Encrypt=no&MARS_Connection=yes',
        },
    },
    'mysql': {
        'engine': 'mysql+pymysql',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {'method': 'multi'},
        'requirements': {'password', 'host', 'username', 'database'},
        'defaults': {'port': 3306},
    },
    'mariadb': {
        'engine': 'mysql+pymysql',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {'method': 'multi'},
        'requirements': {'password', 'host', 'username', 'database'},
        'defaults': {'port': 3306},
    },
    'oracle': {
        'engine': 'oracle+oracledb',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {'method': None},
        'requirements': {'password', 'host', 'username', 'database'},
        'defaults': {'port': 1521},
    },
    'sqlite': {
        'engine': 'sqlite',
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'omit_create_engine': {'method'},
        'to_sql': {'method': 'multi'},
        'requirements': {'database'},
        'defaults': {},
    },
    'duckdb': {
        'engine': 'duckdb',
        'create_engine': {},
        'omit_create_engine': {'ALL'},
        'to_sql': {'method': 'multi'},
        'requirements': '',
        'defaults': {},
    },
    'cockroachdb': {
        'engine': 'cockroachdb',
        'omit_create_engine': {'method'},
        'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}},
        'to_sql': {'method': 'multi'},
        'requirements': {'host'},
        'defaults': {'port': 26257, 'database': 'defaultdb', 'username': 'root', 'password': 'admin'},
    },
}
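
These per-flavor settings supply connection defaults (e.g. port numbers) and the attributes required to build an engine string. A quick sketch of reading them:

>>> from meerschaum.connectors.sql import SQLConnector
>>> SQLConnector.flavor_configs['postgresql']['defaults']['port']
5432
>>> sorted(SQLConnector.flavor_configs['sqlite']['requirements'])
['database']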
def create_engine( self, include_uri: bool = False, debug: bool = False, **kw) -> 'sqlalchemy.engine.Engine':
181def create_engine(
182    self,
183    include_uri: bool = False,
184    debug: bool = False,
185    **kw
186) -> 'sqlalchemy.engine.Engine':
187    """Create a sqlalchemy engine by building the engine string."""
188    from meerschaum.utils.packages import attempt_import
189    from meerschaum.utils.warnings import error, warn
190    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
191    import urllib
192    import copy
193    ### Install and patch required drivers.
194    if self.flavor in install_flavor_drivers:
195        _ = attempt_import(
196            *install_flavor_drivers[self.flavor],
197            debug=debug,
198            lazy=False,
199            warn=False,
200        )
201        if self.flavor == 'mssql':
202            pyodbc = attempt_import('pyodbc', debug=debug, lazy=False, warn=False)
203            pyodbc.pooling = False
204    if self.flavor in require_patching_flavors:
205        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
206        import pathlib
207        for install_name, import_name in require_patching_flavors[self.flavor]:
208            pkg = attempt_import(
209                import_name,
210                debug=debug,
211                lazy=False,
212                warn=False
213            )
214            _monkey_patch_get_distribution(
215                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
216            )
217
218    ### Verify the flavor is supported before indexing `flavor_configs`.
219    if self.flavor not in flavor_configs:
220        error(f"Cannot create a connector with the flavor '{self.flavor}'.")
221
222    ### Supplement missing values with defaults (e.g. port number).
223    for a, value in flavor_configs[self.flavor]['defaults'].items():
224        if a not in self.__dict__:
225            self.__dict__[a] = value
226
227    _engine = flavor_configs[self.flavor].get('engine', None)
228    _username = self.__dict__.get('username', None)
229    _password = self.__dict__.get('password', None)
230    _host = self.__dict__.get('host', None)
231    _port = self.__dict__.get('port', None)
232    _database = self.__dict__.get('database', None)
233    _options = self.__dict__.get('options', {})
234    if isinstance(_options, str):
235        _options = dict(urllib.parse.parse_qsl(_options))
236    _uri = self.__dict__.get('uri', None)
237
238    ### Handle registering specific dialects (due to installing in virtual environments).
239    if self.flavor in flavor_dialects:
240        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])
241
242    ### self._sys_config was deepcopied and can be updated safely
243    if self.flavor in ("sqlite", "duckdb"):
244        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
245        if 'create_engine' not in self._sys_config:
246            self._sys_config['create_engine'] = {}
247        if 'connect_args' not in self._sys_config['create_engine']:
248            self._sys_config['create_engine']['connect_args'] = {}
249        self._sys_config['create_engine']['connect_args'].update({"check_same_thread": False})
250    else:
251        engine_str = (
252            _engine + "://" + (_username if _username is not None else '') +
253            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
254            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
255            (("/" + _database) if _database is not None else '')
256            + (("?" + urllib.parse.urlencode(_options)) if _options else '')
257        ) if not _uri else _uri
258
259        ### Sometimes the timescaledb:// flavor can slip in.
260        if _uri and self.flavor in ('timescaledb',) and self.flavor in _uri:
261            engine_str = engine_str.replace(f'{self.flavor}', 'postgresql', 1)
262
263    if debug:
264        dprint(
265            (
266                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
267                    if _password is not None else engine_str
268            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
269        )
270
271    _kw_copy = copy.deepcopy(kw)
272
273    ### NOTE: Order of inheritance:
274    ###       1. Defaults
275    ###       2. System configuration
276    ###       3. Connector configuration
277    ###       4. Keyword arguments
278    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
279    def _apply_create_engine_args(update):
280        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
281            _create_engine_args.update(
282                { k: v for k, v in update.items()
283                    if 'omit_create_engine' not in flavor_configs[self.flavor]
284                        or k not in flavor_configs[self.flavor].get('omit_create_engine')
285                }
286            )
287    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
288    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
289    _apply_create_engine_args(_kw_copy)
290
291    try:
292        engine = sqlalchemy.create_engine(
293            engine_str,
294            ### I know this looks confusing, and maybe it's bad code,
295            ### but it's simple. It dynamically parses the config string
296            ### and splits it to separate the class name (QueuePool)
297            ### from the module name (sqlalchemy.pool).
298            poolclass    = getattr(
299                attempt_import(
300                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
301                ),
302                self._sys_config['poolclass'].split('.')[-1]
303            ),
304            echo         = debug,
305            **_create_engine_args
306        )
307    except Exception as e:
308        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
309        engine = None
310
311    if include_uri:
312        return engine, engine_str
313    return engine

Create a sqlalchemy engine by building the engine string.
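The engine is normally built lazily and cached on the connector's `engine` attribute, but `create_engine()` may be called directly. A rough sketch, assuming the built-in `sql:local` SQLite connector:
```
>>> conn = mrsm.get_connector('sql', 'local')
>>> engine = conn.create_engine()
>>> engine.dialect.name
'sqlite'
>>> engine, engine_str = conn.create_engine(include_uri=True)   # also return the URI string
```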

def read( self, query_or_table: 'Union[str, sqlalchemy.Query]', params: Union[Dict[str, Any], List[str], NoneType] = None, dtype: Optional[Dict[str, Any]] = None, coerce_float: bool = True, chunksize: Optional[int] = -1, workers: Optional[int] = None, chunk_hook: Optional[Callable[[pandas.core.frame.DataFrame], Any]] = None, as_hook_results: bool = False, chunks: Optional[int] = None, schema: Optional[str] = None, as_chunks: bool = False, as_iterator: bool = False, as_dask: bool = False, index_col: Optional[str] = None, silent: bool = False, debug: bool = False, **kw: Any) -> 'Union[pandas.DataFrame, dask.DataFrame, List[pandas.DataFrame], List[Any], None]':
 28def read(
 29    self,
 30    query_or_table: Union[str, sqlalchemy.Query],
 31    params: Union[Dict[str, Any], List[str], None] = None,
 32    dtype: Optional[Dict[str, Any]] = None,
 33    coerce_float: bool = True,
 34    chunksize: Optional[int] = -1,
 35    workers: Optional[int] = None,
 36    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
 37    as_hook_results: bool = False,
 38    chunks: Optional[int] = None,
 39    schema: Optional[str] = None,
 40    as_chunks: bool = False,
 41    as_iterator: bool = False,
 42    as_dask: bool = False,
 43    index_col: Optional[str] = None,
 44    silent: bool = False,
 45    debug: bool = False,
 46    **kw: Any
 47) -> Union[
 48    pandas.DataFrame,
 49    dask.DataFrame,
 50    List[pandas.DataFrame],
 51    List[Any],
 52    None,
 53]:
 54    """
 55    Read a SQL query or table into a pandas dataframe.
 56
 57    Parameters
 58    ----------
 59    query_or_table: Union[str, sqlalchemy.Query]
 60        The SQL query (sqlalchemy Query or string) or name of the table from which to select.
 61
 62    params: Optional[Dict[str, Any]], default None
 63        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
 64        See the pandas documentation for more information:
 65        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
 66
 67    dtype: Optional[Dict[str, Any]], default None
 68        A dictionary of data types to pass to `pandas.read_sql()`.
 69        See the pandas documentation for more information:
 70        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
 71
 72    chunksize: Optional[int], default -1
 73        How many chunks to read at a time. `None` will read everything in one large chunk.
 74        Defaults to system configuration.
 75
 76        **NOTE:** DuckDB does not allow for chunking.
 77
 78    workers: Optional[int], default None
 79        How many threads to use when consuming the generator.
 80        Only applies if `chunk_hook` is provided.
 81
 82    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
 83        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
 84        See `--sync-chunks` for an example.
 85        **NOTE:** `as_iterator` MUST be False (default).
 86
 87    as_hook_results: bool, default False
 88        If `True`, return a `List` of the outputs of the hook function.
 89        Only applicable if `chunk_hook` is not None.
 90
 91        **NOTE:** `as_iterator` MUST be `False` (default).
 92
 93    chunks: Optional[int], default None
 94        Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
 95        return into a single dataframe.
 96        For example, to limit the returned dataframe to 100,000 rows,
 97        you could specify a `chunksize` of `1000` and `chunks` of `100`.
 98
 99    schema: Optional[str], default None
100        If just a table name is provided, optionally specify the table schema.
101        Defaults to `SQLConnector.schema`.
102
103    as_chunks: bool, default False
104        If `True`, return a list of DataFrames.
105        Otherwise return a single DataFrame.
106
107    as_iterator: bool, default False
108        If `True`, return the pandas DataFrame iterator.
109        `chunksize` must not be `None` (falls back to 1000 if so),
110        and hooks are not called in this case.
111
112    index_col: Optional[str], default None
113        If using Dask, use this column as the index column.
114        If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
115
116    silent: bool, default False
117        If `True`, don't raise warnings in case of errors.
118        Defaults to `False`.
119
120    Returns
121    -------
122    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
123    or `None` if something breaks.
124
125    """
126    if chunks is not None and chunks <= 0:
127        return []
128    from meerschaum.utils.sql import sql_item_name, truncate_item_name
129    from meerschaum.utils.dtypes import are_dtypes_equal, coerce_timezone
130    from meerschaum.utils.dtypes.sql import TIMEZONE_NAIVE_FLAVORS
131    from meerschaum.utils.packages import attempt_import, import_pandas
132    from meerschaum.utils.pool import get_pool
133    from meerschaum.utils.dataframe import chunksize_to_npartitions, get_numeric_cols
134    import warnings
135    import traceback
136    from decimal import Decimal
137    pd = import_pandas()
138    is_dask = 'dask' in pd.__name__
139    pandas = attempt_import('pandas')
140    dd = attempt_import('dask.dataframe') if is_dask else None
141    is_dask = dd is not None
142    npartitions = chunksize_to_npartitions(chunksize)
143    if is_dask:
144        chunksize = None
145    schema = schema or self.schema
146    utc_dt_cols = [
147        col
148        for col, typ in dtype.items()
149        if are_dtypes_equal(typ, 'datetime') and 'utc' in typ.lower()
150    ] if dtype else []
151
152    if dtype and utc_dt_cols and self.flavor in TIMEZONE_NAIVE_FLAVORS:
153        dtype = dtype.copy()
154        for col in utc_dt_cols:
155            dtype[col] = 'datetime64[ns]'
156
157    pool = get_pool(workers=workers)
158    sqlalchemy = attempt_import("sqlalchemy", lazy=False)
159    default_chunksize = self._sys_config.get('chunksize', None)
160    chunksize = chunksize if chunksize != -1 else default_chunksize
161    if chunksize is None and as_iterator:
162        if not silent and self.flavor not in _disallow_chunks_flavors:
163            warn(
164                "An iterator may only be generated if chunksize is not None.\n"
165                + "Falling back to a chunksize of 1000.", stacklevel=3,
166            )
167        chunksize = 1000
168    if chunksize is not None and self.flavor in _max_chunks_flavors:
169        if chunksize > _max_chunks_flavors[self.flavor]:
170            if chunksize != default_chunksize:
171                warn(
172                    f"The specified chunksize of {chunksize} exceeds the maximum of "
173                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
174                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
175                    stacklevel=3,
176                )
177            chunksize = _max_chunks_flavors[self.flavor]
178
179    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
180        chunksize = None
181
182    if debug:
183        import time
184        start = time.perf_counter()
185        dprint(f"[{self}]\n{query_or_table}")
186        dprint(f"[{self}] Fetching with chunksize: {chunksize}")
187
188    ### This might be sqlalchemy object or the string of a table name.
189    ### We check for spaces and quotes to see if it might be a weird table.
190    if (
191        ' ' not in str(query_or_table)
192        or (
193            ' ' in str(query_or_table)
194            and str(query_or_table).startswith('"')
195            and str(query_or_table).endswith('"')
196        )
197    ):
198        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
199        if truncated_table_name != str(query_or_table) and not silent:
200            warn(
201                f"Table '{query_or_table}' is too long for '{self.flavor}',"
202                + f" will instead read the table '{truncated_table_name}'."
203            )
204
205        query_or_table = sql_item_name(str(query_or_table), self.flavor, schema)
206        if debug:
207            dprint(f"[{self}] Reading from table {query_or_table}")
208        formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table))
209        str_query = f"SELECT * FROM {query_or_table}"
210    else:
211        str_query = query_or_table
212
213    formatted_query = (
214        sqlalchemy.text(str_query)
215        if not is_dask and isinstance(str_query, str)
216        else format_sql_query_for_dask(str_query)
217    )
218
219    chunk_list = []
220    chunk_hook_results = []
221    def _process_chunk(_chunk, _retry_on_failure: bool = True):
222        if self.flavor in TIMEZONE_NAIVE_FLAVORS:
223            for col in utc_dt_cols:
224                _chunk[col] = coerce_timezone(_chunk[col], strip_timezone=False)
225        if not as_hook_results:
226            chunk_list.append(_chunk)
227        if chunk_hook is None:
228            return None
229
230        result = None
231        try:
232            result = chunk_hook(
233                _chunk,
234                workers=workers,
235                chunksize=chunksize,
236                debug=debug,
237                **kw
238            )
239        except Exception:
240            result = False, traceback.format_exc()
241            from meerschaum.utils.formatting import get_console
242            if not silent:
243                get_console().print_exception()
244
245        ### If the chunk fails to process, try it again one more time.
246        if isinstance(result, tuple) and result[0] is False:
247            if _retry_on_failure:
248                return _process_chunk(_chunk, _retry_on_failure=False)
249
250        return result
251
252    try:
253        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
254        with warnings.catch_warnings():
255            warnings.filterwarnings('ignore', 'case sensitivity issues')
256
257            read_sql_query_kwargs = {
258                'params': params,
259                'dtype': dtype,
260                'coerce_float': coerce_float,
261                'index_col': index_col,
262            }
263            if is_dask:
264                if index_col is None:
265                    dd = None
266                    pd = attempt_import('pandas')
267                    read_sql_query_kwargs.update({
268                        'chunksize': chunksize,
269                    })
270            else:
271                read_sql_query_kwargs.update({
272                    'chunksize': chunksize,
273                })
274
275            if is_dask and dd is not None:
276                ddf = dd.read_sql_query(
277                    formatted_query,
278                    self.URI,
279                    **read_sql_query_kwargs
280                )
281            else:
282
283                def get_chunk_generator(connectable):
284                    chunk_generator = pd.read_sql_query(
285                        formatted_query,
286                        connectable,   ### the engine or transaction connection passed in
287                        **read_sql_query_kwargs
288                    )
289                    to_return = (
290                        chunk_generator
291                        if as_iterator or chunksize is None
292                        else (
293                            list(pool.imap(_process_chunk, chunk_generator))
294                            if as_hook_results
295                            else None
296                        )
297                    )
298                    return chunk_generator, to_return
299
300                if self.flavor in SKIP_READ_TRANSACTION_FLAVORS:
301                    chunk_generator, to_return = get_chunk_generator(self.engine)
302                else:
303                    with self.engine.begin() as transaction:
304                        with transaction.execution_options(stream_results=stream_results) as connection:
305                            chunk_generator, to_return = get_chunk_generator(connection)
306
307                if to_return is not None:
308                    return to_return
309
310    except Exception as e:
311        if debug:
312            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
313        if not silent:
314            warn(str(e), stacklevel=3)
315        from meerschaum.utils.formatting import get_console
316        if not silent:
317            get_console().print_exception()
318
319        return None
320
321    if is_dask and dd is not None:
322        ddf = ddf.reset_index()
323        return ddf
324
325    chunk_list = []
326    read_chunks = 0
327    chunk_hook_results = []
328    if chunksize is None:
329        chunk_list.append(chunk_generator)
330    elif as_iterator:
331        return chunk_generator
332    else:
333        try:
334            for chunk in chunk_generator:
335                if chunk_hook is not None:
336                    chunk_hook_results.append(
337                        chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
338                    )
339                chunk_list.append(chunk)
340                read_chunks += 1
341                if chunks is not None and read_chunks >= chunks:
342                    break
343        except Exception as e:
344            warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
345            from meerschaum.utils.formatting import get_console
346            if not silent:
347                get_console().print_exception()
348
349            return None
350
351    ### If no chunks returned, read without chunks
352    ### to get columns
353    if len(chunk_list) == 0:
354        with warnings.catch_warnings():
355            warnings.filterwarnings('ignore', 'case sensitivity issues')
356            _ = read_sql_query_kwargs.pop('chunksize', None)
357            with self.engine.begin() as connection:
358                chunk_list.append(
359                    pd.read_sql_query(
360                        formatted_query,
361                        connection,
362                        **read_sql_query_kwargs
363                    )
364                )
365
366    ### call the hook on any missed chunks.
367    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
368        for c in chunk_list[len(chunk_hook_results):]:
369            chunk_hook_results.append(
370                chunk_hook(c, chunksize=chunksize, debug=debug, **kw)
371            )
372
373    ### chunksize is not None so must iterate
374    if debug:
375        end = time.perf_counter()
376        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")
377
378    if as_hook_results:
379        return chunk_hook_results
380
381    ### Skip `pd.concat()` if `as_chunks` is specified.
382    if as_chunks:
383        for c in chunk_list:
384            c.reset_index(drop=True, inplace=True)
385            for col in get_numeric_cols(c):
386                c[col] = c[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
387        return chunk_list
388
389    df = pd.concat(chunk_list).reset_index(drop=True)
390    ### NOTE: The calls to `canonical()` are to drop leading and trailing zeroes.
391    for col in get_numeric_cols(df):
392        df[col] = df[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
393
394    return df

Read a SQL query or table into a pandas dataframe.

Parameters
  • query_or_table (Union[str, sqlalchemy.Query]): The SQL query (sqlalchemy Query or string) or name of the table from which to select.
  • params (Optional[Dict[str, Any]], default None): List or Dict of parameters to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
  • dtype (Optional[Dict[str, Any]], default None): A dictionary of data types to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
  • chunksize (Optional[int], default -1): How many chunks to read at a time. None will read everything in one large chunk. Defaults to system configuration.

    NOTE: DuckDB does not allow for chunking.

  • workers (Optional[int], default None): How many threads to use when consuming the generator. Only applies if chunk_hook is provided.
  • chunk_hook (Optional[Callable[[pandas.DataFrame], Any]], default None): Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. See --sync-chunks for an example. NOTE: as_iterator MUST be False (default).
  • as_hook_results (bool, default False): If True, return a List of the outputs of the hook function. Only applicable if chunk_hook is not None.

    NOTE: as_iterator MUST be False (default).

  • chunks (Optional[int], default None): Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and return into a single dataframe. For example, to limit the returned dataframe to 100,000 rows, you could specify a chunksize of 1000 and chunks of 100.
  • schema (Optional[str], default None): If just a table name is provided, optionally specify the table schema. Defaults to SQLConnector.schema.
  • as_chunks (bool, default False): If True, return a list of DataFrames. Otherwise return a single DataFrame.
  • as_iterator (bool, default False): If True, return the pandas DataFrame iterator. chunksize must not be None (falls back to 1000 if so), and hooks are not called in this case.
  • index_col (Optional[str], default None): If using Dask, use this column as the index column. If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
  • silent (bool, default False): If True, don't raise warnings in case of errors. Defaults to False.
Returns
  • A pd.DataFrame (default case), or an iterator, or a list of dataframes / iterators, or None if something breaks.
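A few usage sketches (the table `my_table` is hypothetical); note how `chunksize` and `chunks` multiply to cap the number of rows returned:
```
>>> conn = mrsm.get_connector('sql', 'local')
>>> df = conn.read("SELECT 1 AS foo")                       # a query string
>>> df = conn.read('my_table')                              # a bare table name, quoted per flavor
>>> df = conn.read('my_table', chunksize=1000, chunks=100)  # at most 100,000 rows
>>> dfs = conn.read('my_table', chunksize=1000, as_chunks=True)
>>> for chunk in conn.read('my_table', chunksize=1000, as_iterator=True):
...     print(len(chunk))
```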
def value(self, query: str, *args: Any, use_pandas: bool = False, **kw: Any) -> Any:
414def value(
415    self,
416    query: str,
417    *args: Any,
418    use_pandas: bool = False,
419    **kw: Any
420) -> Any:
421    """
422    Execute the provided query and return the first value.
423
424    Parameters
425    ----------
426    query: str
427        The SQL query to execute.
428        
429    *args: Any
430        The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
431        if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.
432        
433    use_pandas: bool, default False
434        If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use
435        `meerschaum.connectors.sql.SQLConnector.exec` (default).
436        **NOTE:** This is always `True` for DuckDB.
437
438    **kw: Any
439        See `args`.
440
441    Returns
442    -------
443    Any value returned from the query.
444
445    """
446    from meerschaum.utils.packages import attempt_import
447    if self.flavor == 'duckdb':
448        use_pandas = True
449    if use_pandas:
450        try:
451            return self.read(query, *args, **kw).iloc[0, 0]
452        except Exception:
453            return None
454
455    _close = kw.get('close', True)
456    _commit = kw.get('commit', (self.flavor != 'mssql'))
457
458    try:
459        result, connection = self.exec(
460            query,
461            *args,
462            with_connection=True,
463            close=False,
464            commit=_commit,
465            **kw
466        )
467        first = result.first() if result is not None else None
468        _val = first[0] if first is not None else None
469    except Exception as e:
470        warn(e, stacklevel=3)
471        return None
472    if _close:
473        try:
474            connection.close()
475        except Exception as e:
476            warn("Failed to close connection with exception:\n" + str(e))
477    return _val

Execute the provided query and return the first value.

Parameters
  • query (str): The SQL query to execute.
  • *args (Any): The arguments passed to meerschaum.connectors.sql.SQLConnector.exec if use_pandas is False (default) or to meerschaum.connectors.sql.SQLConnector.read.
  • use_pandas (bool, default False): If True, use meerschaum.connectors.SQLConnector.read, otherwise use meerschaum.connectors.sql.SQLConnector.exec (default). NOTE: This is always True for DuckDB.
  • **kw (Any): See args.
Returns
  • Any value returned from the query.
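For instance (the table and returned values below are hypothetical), bind variables pass through to `exec`:
```
>>> conn.value("SELECT COUNT(*) FROM my_table")
42
>>> conn.value("SELECT val FROM my_table WHERE id = :id", {'id': 1})
'foo'
```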
def exec( self, query: str, *args: Any, silent: bool = False, debug: bool = False, commit: Optional[bool] = None, close: Optional[bool] = None, with_connection: bool = False, _connection=None, _transaction=None, **kw: Any) -> 'Union[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.cursor.LegacyCursorResult, Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection], Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection], None]':
491def exec(
492    self,
493    query: str,
494    *args: Any,
495    silent: bool = False,
496    debug: bool = False,
497    commit: Optional[bool] = None,
498    close: Optional[bool] = None,
499    with_connection: bool = False,
500    _connection=None,
501    _transaction=None,
502    **kw: Any
503) -> Union[
504        sqlalchemy.engine.result.resultProxy,
505        sqlalchemy.engine.cursor.LegacyCursorResult,
506        Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
507        Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
508        None
509]:
510    """
511    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.
512
513    If inserting data, please use bind variables to avoid SQL injection!
514
515    Parameters
516    ----------
517    query: Union[str, List[str], Tuple[str]]
518        The query to execute.
519        If `query` is a list or tuple, call `self.exec_queries()` instead.
520
521    args: Any
522        Arguments passed to `sqlalchemy.engine.execute`.
523
524    silent: bool, default False
525        If `True`, suppress warnings.
526
527    commit: Optional[bool], default None
528        If `True`, commit the changes after execution.
529        Causes issues with flavors like `'mssql'`.
530        This does not apply if `query` is a list of strings.
531
532    close: Optional[bool], default None
533        If `True`, close the connection after execution.
534        Causes issues with flavors like `'mssql'`.
535        This does not apply if `query` is a list of strings.
536
537    with_connection: bool, default False
538        If `True`, return a tuple including the connection object.
539        This does not apply if `query` is a list of strings.
540
541    Returns
542    -------
543    The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
544
545    """
546    if isinstance(query, (list, tuple)):
547        return self.exec_queries(
548            list(query),
549            *args,
550            silent=silent,
551            debug=debug,
552            **kw
553        )
554
555    from meerschaum.utils.packages import attempt_import
556    sqlalchemy = attempt_import("sqlalchemy", lazy=False)
557    if debug:
558        dprint(f"[{self}] Executing query:\n{query}")
559
560    _close = close if close is not None else (self.flavor != 'mssql')
561    _commit = commit if commit is not None else (
562        (self.flavor != 'mssql' or 'select' not in str(query).lower())
563    )
564
565    ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+).
566    if not hasattr(query, 'compile'):
567        query = sqlalchemy.text(query)
568
569    connection = _connection if _connection is not None else self.get_connection()
570
571    try:
572        transaction = (
573            _transaction
574            if _transaction is not None else (
575                connection.begin()
576                if _commit
577                else None
578            )
579        )
580    except sqlalchemy.exc.InvalidRequestError as e:
581        if _connection is not None or _transaction is not None:
582            raise e
583        connection = self.get_connection(rebuild=True)
584        transaction = connection.begin()
585
586    if transaction is not None and not transaction.is_active and _transaction is not None:
587        connection = self.get_connection(rebuild=True)
588        transaction = connection.begin() if _commit else None
589
590    result = None
591    try:
592        result = connection.execute(query, *args, **kw)
593        if _commit:
594            transaction.commit()
595    except Exception as e:
596        if debug:
597            dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{e}")
598        if not silent:
599            warn(str(e), stacklevel=3)
600        result = None
601        if _commit:
602            transaction.rollback()
603            connection = self.get_connection(rebuild=True)
604    finally:
605        if _close:
606            connection.close()
607
608    if with_connection:
609        return result, connection
610
611    return result

Execute SQL code and return the sqlalchemy result, e.g. when calling stored procedures.

If inserting data, please use bind variables to avoid SQL injection!

Parameters
  • query (Union[str, List[str], Tuple[str]]): The query to execute. If query is a list or tuple, call self.exec_queries() instead.
  • args (Any): Arguments passed to sqlalchemy.engine.execute.
  • silent (bool, default False): If True, suppress warnings.
  • commit (Optional[bool], default None): If True, commit the changes after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
  • close (Optional[bool], default None): If True, close the connection after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
  • with_connection (bool, default False): If True, return a tuple including the connection object. This does not apply if query is a list of strings.
Returns
  • The sqlalchemy result object, or a tuple with the connection if with_connection is provided.
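For example, inserting a row with bind variables (the table and values are hypothetical):
```
>>> result = conn.exec(
...     "INSERT INTO my_table (id, val) VALUES (:id, :val)",
...     {'id': 1, 'val': 'foo'},
... )
>>> result is not None   # `None` indicates the query failed
True
```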
def execute( self, *args: Any, **kw: Any) -> 'Optional[sqlalchemy.engine.result.resultProxy]':
480def execute(
481    self,
482    *args : Any,
483    **kw : Any
484) -> Optional[sqlalchemy.engine.result.resultProxy]:
485    """
486    An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
487    """
488    return self.exec(*args, **kw)
def to_sql( self, df: pandas.core.frame.DataFrame, name: str = None, index: bool = False, if_exists: str = 'replace', method: str = '', chunksize: Optional[int] = -1, schema: Optional[str] = None, safe_copy: bool = True, silent: bool = False, debug: bool = False, as_tuple: bool = False, as_dict: bool = False, _connection=None, _transaction=None, **kw) -> Union[bool, Tuple[bool, str]]:
 709def to_sql(
 710    self,
 711    df: pandas.DataFrame,
 712    name: str = None,
 713    index: bool = False,
 714    if_exists: str = 'replace',
 715    method: str = "",
 716    chunksize: Optional[int] = -1,
 717    schema: Optional[str] = None,
 718    safe_copy: bool = True,
 719    silent: bool = False,
 720    debug: bool = False,
 721    as_tuple: bool = False,
 722    as_dict: bool = False,
 723    _connection=None,
 724    _transaction=None,
 725    **kw
 726) -> Union[bool, SuccessTuple]:
 727    """
 728    Upload a DataFrame's contents to the SQL server.
 729
 730    Parameters
 731    ----------
 732    df: pd.DataFrame
 733        The DataFrame to be inserted.
 734
 735    name: str
 736        The name of the table to be created.
 737
 738    index: bool, default False
 739        If True, creates the DataFrame's indices as columns.
 740
 741    if_exists: str, default 'replace'
 742        Drop and create the table ('replace') or append if it exists
 743        ('append') or raise Exception ('fail').
 744        Options are ['replace', 'append', 'fail'].
 745
 746    method: str, default ''
747        `None` or `'multi'`. See the pandas `to_sql()` documentation for details.
 748
 749    chunksize: Optional[int], default -1
 750        How many rows to insert at a time.
 751
 752    schema: Optional[str], default None
 753        Optionally override the schema for the table.
 754        Defaults to `SQLConnector.schema`.
 755
756    safe_copy: bool, default True
 757        If `True`, copy the dataframe before making any changes.
 758
 759    as_tuple: bool, default False
 760        If `True`, return a (success_bool, message) tuple instead of a `bool`.
 761        Defaults to `False`.
 762
 763    as_dict: bool, default False
 764        If `True`, return a dictionary of transaction information.
 765        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
 766        `method`, and `target`.
 767
 768    kw: Any
 769        Additional arguments will be passed to the DataFrame's `to_sql` function
 770
 771    Returns
 772    -------
 773    Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
 774    """
 775    import time
 776    import json
 777    from decimal import Decimal
 778    from datetime import timedelta
 779    from meerschaum.utils.warnings import error, warn
 780    import warnings
 781    import functools
 782
 783    if name is None:
 784        error(f"Name must not be `None` to insert data into {self}.")
 785
 786    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
 787    kw.pop('name', None)
 788
 789    schema = schema or self.schema
 790
 791    from meerschaum.utils.sql import (
 792        sql_item_name,
 793        table_exists,
 794        json_flavors,
 795        truncate_item_name,
 796        DROP_IF_EXISTS_FLAVORS,
 797    )
 798    from meerschaum.utils.dataframe import (
 799        get_json_cols,
 800        get_numeric_cols,
 801        get_uuid_cols,
 802        get_bytes_cols,
 803    )
 804    from meerschaum.utils.dtypes import (
 805        are_dtypes_equal,
 806        coerce_timezone,
 807        encode_bytes_for_bytea,
 808        serialize_bytes,
 809        serialize_decimal,
 810        json_serialize_value,
 811    )
 812    from meerschaum.utils.dtypes.sql import (
 813        PD_TO_SQLALCHEMY_DTYPES_FLAVORS,
 814        get_db_type_from_pd_type,
 815        get_pd_type_from_db_type,
 816        get_numeric_precision_scale,
 817    )
 818    from meerschaum.utils.misc import interval_str
 819    from meerschaum.connectors.sql._create_engine import flavor_configs
 820    from meerschaum.utils.packages import attempt_import, import_pandas
 821    sqlalchemy = attempt_import('sqlalchemy', debug=debug, lazy=False)
 822    pd = import_pandas()
 823    is_dask = 'dask' in df.__module__
 824
 825    bytes_cols = get_bytes_cols(df)
 826    numeric_cols = get_numeric_cols(df)
 827    numeric_cols_dtypes = {
 828        col: typ
 829        for col, typ in kw.get('dtype', {}).items()
 830        if (
 831            col in df.columns
 832            and 'numeric' in str(typ).lower()
 833        )
 834        
 835    }
 836    numeric_cols.extend([col for col in numeric_cols_dtypes if col not in numeric_cols])
 837
 838    enable_bulk_insert = mrsm.get_config(
 839        'system', 'connectors', 'sql', 'bulk_insert'
 840    ).get(self.flavor, False)
 841    stats = {'target': name}
 842    ### resort to defaults if None
 843    copied = False
 844    use_bulk_insert = False
 845    if method == "":
 846        if enable_bulk_insert:
 847            method = (
 848                functools.partial(mssql_insert_json, debug=debug)
 849                if self.flavor == 'mssql'
 850                else functools.partial(psql_insert_copy, debug=debug)
 851            )
 852            use_bulk_insert = True
 853        else:
 854            ### Should resolve to 'multi' or `None`.
 855            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')
 856
 857    if bytes_cols and (use_bulk_insert or self.flavor == 'oracle'):
 858        if safe_copy and not copied:
 859            df = df.copy()
 860            copied = True
 861        bytes_serializer = (
 862            functools.partial(encode_bytes_for_bytea, with_prefix=(self.flavor != 'oracle'))
 863            if self.flavor != 'mssql'
 864            else serialize_bytes
 865        )
 866        for col in bytes_cols:
 867            df[col] = df[col].apply(bytes_serializer)
 868
 869    ### Check for numeric columns.
 870    for col in numeric_cols:
 871        typ = numeric_cols_dtypes.get(col, None)
 872
 873        precision, scale = (
 874            (typ.precision, typ.scale)
 875            if hasattr(typ, 'precision')
 876            else get_numeric_precision_scale(self.flavor)
 877        )
 878
 879        df[col] = df[col].apply(
 880            functools.partial(
 881                serialize_decimal,
 882                quantize=True,
 883                precision=precision,
 884                scale=scale,
 885            )
 886        )
 887
 888    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)
 889
 890    default_chunksize = self._sys_config.get('chunksize', None)
 891    chunksize = chunksize if chunksize != -1 else default_chunksize
 892    if chunksize is not None and self.flavor in _max_chunks_flavors:
 893        if chunksize > _max_chunks_flavors[self.flavor]:
 894            if chunksize != default_chunksize:
 895                warn(
 896                    f"The specified chunksize of {chunksize} exceeds the maximum of "
 897                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
 898                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
 899                    stacklevel = 3,
 900                )
 901            chunksize = _max_chunks_flavors[self.flavor]
 902    stats['chunksize'] = chunksize
 903
 904    success, msg = False, "Default to_sql message"
 905    start = time.perf_counter()
 906    if debug:
 907        msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..."
 908        print(msg, end="", flush=True)
 909    stats['num_rows'] = len(df)
 910
 911    ### Check if the name is too long.
 912    truncated_name = truncate_item_name(name, self.flavor)
 913    if name != truncated_name:
 914        warn(
 915            f"Table '{name}' is too long for '{self.flavor}',"
 916            f" will instead create the table '{truncated_name}'."
 917        )
 918
 919    ### filter out non-pandas args
 920    import inspect
 921    to_sql_params = inspect.signature(df.to_sql).parameters
 922    to_sql_kw = {}
 923    for k, v in kw.items():
 924        if k in to_sql_params:
 925            to_sql_kw[k] = v
 926
 927    to_sql_kw.update({
 928        'name': truncated_name,
 929        'schema': schema,
 930        ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI),
 931        'index': index,
 932        'if_exists': if_exists,
 933        'method': method,
 934        'chunksize': chunksize,
 935    })
 936    if is_dask:
 937        to_sql_kw.update({
 938            'parallel': True,
 939        })
 940    elif _connection is not None:
 941        to_sql_kw['con'] = _connection
 942
 943    if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
 944    if self.flavor == 'oracle':
 945        ### For some reason 'replace' doesn't work properly in pandas,
 946        ### so try dropping first.
 947        if if_exists == 'replace' and table_exists(name, self, schema=schema, debug=debug):
 948            success = self.exec(
 949                f"DROP TABLE {if_exists_str}" + sql_item_name(name, 'oracle', schema)
 950            ) is not None
 951            if not success:
 952                warn(f"Unable to drop {name}")
 953
 954        ### Enforce NVARCHAR(2000) as text instead of CLOB.
 955        dtype = to_sql_kw.get('dtype', {})
 956        for col, typ in df.dtypes.items():
 957            if are_dtypes_equal(str(typ), 'object'):
 958                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
 959            elif are_dtypes_equal(str(typ), 'int'):
 960                dtype[col] = sqlalchemy.types.INTEGER
 961        to_sql_kw['dtype'] = dtype
 962    elif self.flavor == 'duckdb':
 963        dtype = to_sql_kw.get('dtype', {})
 964        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
 965        for col in dt_cols:
 966            df[col] = coerce_timezone(df[col], strip_utc=False)
 967    elif self.flavor == 'mssql':
 968        dtype = to_sql_kw.get('dtype', {})
 969        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
 970        new_dtype = {}
 971        for col in dt_cols:
 972            if col in dtype:
 973                continue
 974            dt_typ = get_db_type_from_pd_type(str(df.dtypes[col]), self.flavor, as_sqlalchemy=True)
 975            if col not in dtype:
 976                new_dtype[col] = dt_typ
 977
 978        dtype.update(new_dtype)
 979        to_sql_kw['dtype'] = dtype
 980
 981    ### Check for JSON columns.
 982    if self.flavor not in json_flavors:
 983        json_cols = get_json_cols(df)
 984        for col in json_cols:
 985            df[col] = df[col].apply(
 986                (
 987                    lambda x: json.dumps(x, default=json_serialize_value, sort_keys=True)
 988                    if not isinstance(x, Hashable)
 989                    else x
 990                )
 991            )
 992
 993    if PD_TO_SQLALCHEMY_DTYPES_FLAVORS['uuid'].get(self.flavor, None) != 'Uuid':
 994        uuid_cols = get_uuid_cols(df)
 995        for col in uuid_cols:
 996            df[col] = df[col].astype(str)
 997
 998    try:
 999        with warnings.catch_warnings():
1000            warnings.filterwarnings('ignore')
1001            df.to_sql(**to_sql_kw)
1002        success = True
1003    except Exception as e:
1004        if not silent:
1005            warn(str(e))
1006        success, msg = False, str(e)
1007
1008    end = time.perf_counter()
1009    if success:
1010        num_rows = len(df)
1011        msg = (
1012            f"It took {interval_str(timedelta(seconds=(end - start)))} "
1013            + f"to sync {num_rows:,} row"
1014            + ('s' if num_rows != 1 else '')
1015            + f" to {name}."
1016        )
1017    stats['start'] = start
1018    stats['end'] = end
1019    stats['duration'] = end - start
1020
1021    if debug:
1022        print(" done.", flush=True)
1023        dprint(msg)
1024
1025    stats['success'] = success
1026    stats['msg'] = msg
1027    if as_tuple:
1028        return success, msg
1029    if as_dict:
1030        return stats
1031    return success

Upload a DataFrame's contents to the SQL server.

Parameters
  • df (pd.DataFrame): The DataFrame to be inserted.
  • name (str): The name of the table to be created.
  • index (bool, default False): If True, creates the DataFrame's indices as columns.
  • if_exists (str, default 'replace'): Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail'].
  • method (str, default ''): None or 'multi'. See the pandas to_sql() documentation for details.
  • chunksize (Optional[int], default -1): How many rows to insert at a time.
  • schema (Optional[str], default None): Optionally override the schema for the table. Defaults to SQLConnector.schema.
  • safe_copy (bool, default True): If True, copy the dataframe before making any changes.
  • as_tuple (bool, default False): If True, return a (success_bool, message) tuple instead of a bool. Defaults to False.
  • as_dict (bool, default False): If True, return a dictionary of transaction information. The keys are success, msg, start, end, duration, num_rows, chunksize, method, and target.
  • kw (Any): Additional arguments will be passed to the DataFrame's to_sql function
Returns
  • Either a bool or a SuccessTuple (depends on as_tuple).
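A short sketch of the common cases (the target table is hypothetical):
```
>>> import pandas as pd
>>> df = pd.DataFrame({'id': [1, 2], 'val': ['a', 'b']})
>>> conn.to_sql(df, name='my_table')   # drops and recreates by default
True
>>> success, msg = conn.to_sql(df, name='my_table', if_exists='append', as_tuple=True)
>>> success
True
```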
def exec_queries( self, queries: "List[Union[str, Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]]]", break_on_error: bool = False, rollback: bool = True, silent: bool = False, debug: bool = False) -> 'List[Union[sqlalchemy.engine.cursor.CursorResult, None]]':
614def exec_queries(
615    self,
616    queries: List[
617        Union[
618            str,
619            Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]
620        ]
621    ],
622    break_on_error: bool = False,
623    rollback: bool = True,
624    silent: bool = False,
625    debug: bool = False,
626) -> List[Union[sqlalchemy.engine.cursor.CursorResult, None]]:
627    """
628    Execute a list of queries in a single transaction.
629
630    Parameters
631    ----------
632    queries: List[
633        Union[
634            str,
635            Tuple[str, Callable[[], List[str]]]
636        ]
637    ]
638        The queries in the transaction to be executed.
639        If a query is a tuple, the second item of the tuple
640        will be considered a callable hook that returns a list of queries to be executed
641        before the next item in the list.
642
643    break_on_error: bool, default False
644        If `True`, stop executing when a query fails.
645
646    rollback: bool, default True
647        If `break_on_error` is `True`, rollback the transaction if a query fails.
648
649    silent: bool, default False
650        If `True`, suppress warnings.
651
652    Returns
653    -------
654    A list of SQLAlchemy results.
655    """
656    from meerschaum.utils.warnings import warn
657    from meerschaum.utils.debug import dprint
658    from meerschaum.utils.packages import attempt_import
659    sqlalchemy, sqlalchemy_orm = attempt_import('sqlalchemy', 'sqlalchemy.orm', lazy=False)
660    session = sqlalchemy_orm.Session(self.engine)
661
662    result = None
663    results = []
664    with session.begin():
665        for query in queries:
666            hook = None
667            result = None
668
669            if isinstance(query, tuple):
670                query, hook = query
671            if isinstance(query, str):
672                query = sqlalchemy.text(query)
673
674            if debug:
675                dprint(f"[{self}]\n" + str(query))
676
677            try:
678                result = session.execute(query)
679                session.flush()
680            except Exception as e:
681                msg = (f"Encountered error while executing:\n{e}")
682                if not silent:
683                    warn(msg)
684                elif debug:
685                    dprint(f"[{self}]\n" + str(msg))
686                result = None
687            if result is None and break_on_error:
688                if rollback:
689                    session.rollback()
690                results.append(result)
691                break
692            elif result is not None and hook is not None:
693                hook_queries = hook(session)
694                if hook_queries:
695                    hook_results = self.exec_queries(
696                        hook_queries,
697                        break_on_error = break_on_error,
698                        rollback=rollback,
699                        silent=silent,
700                        debug=debug,
701                    )
702                    result = (result, hook_results)
703
704            results.append(result)
705
706    return results

Execute a list of queries in a single transaction.

Parameters
  • queries (List[Union[str, Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]]]): The queries in the transaction to be executed. If a query is a tuple, the second item of the tuple will be considered a callable hook that returns a list of queries to be executed before the next item in the list.
  • break_on_error (bool, default False): If True, stop executing when a query fails.
  • rollback (bool, default True): If break_on_error is True, rollback the transaction if a query fails.
  • silent (bool, default False): If True, suppress warnings.
Returns
  • A list of SQLAlchemy results.
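For instance, a hook may inspect the session mid-transaction and return follow-up queries (the table names are hypothetical):
```
>>> queries = [
...     "CREATE TABLE tmp_t (id INTEGER)",
...     (
...         "INSERT INTO tmp_t (id) VALUES (1)",
...         lambda session: ["UPDATE tmp_t SET id = 2"],   # runs before the next query
...     ),
...     "DROP TABLE tmp_t",
... ]
>>> results = conn.exec_queries(queries, break_on_error=True)
```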
def get_connection(self, rebuild: bool = False) -> "'sqlalchemy.engine.base.Connection'":
1214def get_connection(self, rebuild: bool = False) -> 'sqlalchemy.engine.base.Connection':
1215    """
1216    Return the current alive connection.
1217
1218    Parameters
1219    ----------
1220    rebuild: bool, default False
1221        If `True`, close the previous connection and open a new one.
1222
1223    Returns
1224    -------
1225    A `sqlalchemy.engine.base.Connection` object.
1226    """
1227    import threading
1228    if '_thread_connections' not in self.__dict__:
1229        self.__dict__['_thread_connections'] = {}
1230
1231    self._cleanup_connections()
1232
1233    thread_id = threading.get_ident()
1234
1235    thread_connections = self.__dict__.get('_thread_connections', {})
1236    connection = thread_connections.get(thread_id, None)
1237
1238    if rebuild and connection is not None:
1239        try:
1240            connection.close()
1241        except Exception:
1242            pass
1243
1244        _ = thread_connections.pop(thread_id, None)
1245        connection = None
1246
1247    if connection is None or connection.closed:
1248        connection = self.engine.connect()
1249        thread_connections[thread_id] = connection
1250
1251    return connection

Return the current alive connection.

Parameters
  • rebuild (bool, default False): If True, close the previous connection and open a new one.
Returns
  • A sqlalchemy.engine.base.Connection object.
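Connections are cached per thread, so repeated calls from the same thread return the same object until it is rebuilt:
```
>>> connection = conn.get_connection()
>>> connection is conn.get_connection()         # reused within this thread
True
>>> fresh = conn.get_connection(rebuild=True)   # close and replace the cached connection
>>> fresh is connection
False
```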
def test_connection(self, **kw: Any) -> Optional[bool]:
707def test_connection(
708    self,
709    **kw: Any
710) -> Union[bool, None]:
711    """
712    Test if a successful connection to the database may be made.
713
714    Parameters
715    ----------
716    **kw:
717        The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.
718
719    Returns
720    -------
721    `True` if a connection is made, otherwise `False` or `None` in case of failure.
722
723    """
724    import warnings
725    from meerschaum.connectors.poll import retry_connect
726    _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self}
727    _default_kw.update(kw)
728    with warnings.catch_warnings():
729        warnings.filterwarnings('ignore', 'Could not')
730        try:
731            return retry_connect(**_default_kw)
732        except Exception:
733            return False

Test if a successful connection to the database may be made.

Parameters
  • **kw: The keyword arguments are passed to meerschaum.connectors.poll.retry_connect.
Returns
  • True if a connection is made, otherwise False or None in case of failure.
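For example, with a quick retry policy (the keyword arguments are forwarded to `retry_connect`):
```
>>> conn.test_connection(max_retries=2, retry_wait=1)
True
```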
def fetch( self, pipe: meerschaum.Pipe, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, str, NoneType] = None, check_existing: bool = True, chunksize: Optional[int] = -1, workers: Optional[int] = None, debug: bool = False, **kw: Any) -> "Union['pd.DataFrame', List[Any], None]":
18def fetch(
19    self,
20    pipe: mrsm.Pipe,
21    begin: Union[datetime, int, str, None] = '',
22    end: Union[datetime, int, str, None] = None,
23    check_existing: bool = True,
24    chunksize: Optional[int] = -1,
25    workers: Optional[int] = None,
26    debug: bool = False,
27    **kw: Any
28) -> Union['pd.DataFrame', List[Any], None]:
29    """Execute the SQL definition and return a Pandas DataFrame.
30
31    Parameters
32    ----------
33    pipe: mrsm.Pipe
34        The pipe object which contains the `fetch` metadata.
35
36        - pipe.columns['datetime']: str
37            - Name of the datetime column for the remote table.
38        - pipe.parameters['fetch']: Dict[str, Any]
39            - Parameters necessary to execute a query.
40        - pipe.parameters['fetch']['definition']: str
41            - Raw SQL query to execute to generate the pandas DataFrame.
42        - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
43            - How many minutes before `begin` to search for data (*optional*).
44
45    begin: Union[datetime, int, str, None], default None
46        Most recent datetime to search for data.
47        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
48
49    end: Union[datetime, int, str, None], default None
50        The latest datetime to search for data.
51        If `end` is `None`, do not bound the search.
52
53    check_existing: bool, default True
54        If `False`, use a backtrack interval of 0 minutes.
55
56    chunksize: Optional[int], default -1
57        How many rows to load into memory at once.
58        Otherwise the entire result set is loaded into memory.
59
60    workers: Optional[int], default None
61        How many threads to use when consuming the generator.
62        Defaults to the number of cores.
63
64    debug: bool, default False
65        Verbosity toggle.
66
67    Returns
68    -------
69    A pandas DataFrame generator.
70    """
71    meta_def = self.get_pipe_metadef(
72        pipe,
73        begin=begin,
74        end=end,
75        check_existing=check_existing,
76        debug=debug,
77        **kw
78    )
79    chunks = self.read(
80        meta_def,
81        chunksize=chunksize,
82        workers=workers,
83        as_iterator=True,
84        debug=debug,
85    )
86    return chunks

Execute the SQL definition and return a Pandas DataFrame.

Parameters
  • pipe (mrsm.Pipe): The pipe object which contains the fetch metadata.

    • pipe.columns['datetime']: str
      • Name of the datetime column for the remote table.
    • pipe.parameters['fetch']: Dict[str, Any]
      • Parameters necessary to execute a query.
    • pipe.parameters['fetch']['definition']: str
      • Raw SQL query to execute to generate the pandas DataFrame.
    • pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
      • How many minutes before begin to search for data (optional).
  • begin (Union[datetime, int, str, None], default None): Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
  • end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the search.
  • check_existing (bool, default True): If False, use a backtrack interval of 0 minutes.
  • chunksize (Optional[int], default -1): How many rows to load into memory at once. Otherwise the entire result set is loaded into memory.
  • workers (Optional[int], default None): How many threads to use when consuming the generator. Defaults to the number of cores.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pandas DataFrame generator.
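
For example, a pipe whose fetch definition is set may be fetched directly through its connector. A minimal sketch (the connector label, metric name, source table, and column names are illustrative):

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> pipe = mrsm.Pipe(
...     'sql:main', 'weather',
...     parameters={
...         'fetch': {'definition': 'SELECT * FROM weather_source'},
...         'columns': {'datetime': 'timestamp'},
...     },
... )
>>> chunks = conn.fetch(pipe, chunksize=10_000)  # generator of DataFrames
>>> for chunk in chunks:
...     print(len(chunk))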
def get_pipe_metadef( self, pipe: meerschaum.Pipe, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, str, NoneType] = None, check_existing: bool = True, debug: bool = False, **kw: Any) -> Optional[str]:
def get_pipe_metadef(
    self,
    pipe: mrsm.Pipe,
    params: Optional[Dict[str, Any]] = None,
    begin: Union[datetime, int, str, None] = '',
    end: Union[datetime, int, str, None] = None,
    check_existing: bool = True,
    debug: bool = False,
    **kw: Any
) -> Union[str, None]:
    """
    Return a pipe's meta definition fetch query.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe object which contains the `fetch` metadata.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    begin: Union[datetime, int, str, None], default ''
        Most recent datetime to search for data.
        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.

    end: Union[datetime, int, str, None], default None
        The latest datetime to search for data.
        If `end` is `None`, do not bound the search.

    check_existing: bool, default True
        If `True`, apply the backtrack interval.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pipe's meta definition fetch query string.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where
    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
    from meerschaum.config import get_config

    dt_col = pipe.columns.get('datetime', None)
    if not dt_col:
        dt_col = pipe.guess_datetime()
        dt_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None
        is_guess = True
    else:
        dt_name = sql_item_name(dt_col, self.flavor, None)
        is_guess = False
    dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None
    db_dt_typ = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None

    if begin not in (None, '') or end is not None:
        if is_guess:
            if dt_col is None:
                warn(
                    f"Unable to determine a datetime column for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack=False,
                )
                begin, end = '', None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{dt_col}\" for datetime bounds...",
                    stack=False
                )

    apply_backtrack = begin == '' and check_existing
    backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug)
    btm = (
        int(backtrack_interval.total_seconds() / 60)
        if isinstance(backtrack_interval, timedelta)
        else backtrack_interval
    )
    begin = (
        pipe.get_sync_time(debug=debug)
        if begin == ''
        else begin
    )

    if begin not in (None, '') and end is not None and begin >= end:
        begin = None

    if dt_name:
        begin_da = (
            dateadd_str(
                flavor=self.flavor,
                datepart='minute',
                number=((-1 * btm) if apply_backtrack else 0),
                begin=begin,
                db_type=db_dt_typ,
            )
            if begin not in ('', None)
            else None
        )
        end_da = (
            dateadd_str(
                flavor=self.flavor,
                datepart='minute',
                number=0,
                begin=end,
                db_type=db_dt_typ,
            )
            if end is not None
            else None
        )

    definition_name = sql_item_name('definition', self.flavor, None)
    meta_def = (
        _simple_fetch_query(pipe, self.flavor) if (
            (not (pipe.columns or {}).get('id', None))
            or (not get_config('system', 'experimental', 'join_fetch'))
        ) else _join_fetch_query(pipe, self.flavor, debug=debug, **kw)
    )

    has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):]
    if dt_name and (begin_da or end_da):
        definition_dt_name = f"{definition_name}.{dt_name}"
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        if begin_da:
            meta_def += f"\n    {definition_dt_name}\n    >=\n    {begin_da}\n"
        if begin_da and end_da:
            meta_def += "    AND"
        if end_da:
            meta_def += f"\n    {definition_dt_name}\n    <\n    {end_da}\n"

    if params is not None:
        params_where = build_where(params, self, with_where=False)
        meta_def += "\n    " + ("AND" if has_where else "WHERE") + "    "
        has_where = True
        meta_def += params_where

    return meta_def.rstrip()

Return a pipe's meta definition fetch query.

Parameters
  • pipe (mrsm.Pipe): The pipe object which contains the fetch metadata.
  • params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause. See meerschaum.utils.sql.build_where.
  • begin (Union[datetime, int, str, None], default ''): Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
  • end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the search.
  • check_existing (bool, default True): If True, apply the backtrack interval.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pipe's meta definition fetch query string.
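
To inspect the query which `fetch()` would execute, build the meta definition directly. A minimal sketch reusing `conn` and `pipe` from the fetch example above (the `station` filter is a hypothetical column):

>>> query = conn.get_pipe_metadef(
...     pipe,
...     begin='2024-01-01',
...     params={'station': 'KATL'},
... )
>>> print(query)  # the definition with datetime and params bounds appended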
def cli(self, debug: bool = False) -> Tuple[bool, str]:
def cli(
    self,
    debug: bool = False,
) -> SuccessTuple:
    """
    Launch a subprocess for an interactive CLI.
    """
    from meerschaum.utils.warnings import dprint
    from meerschaum.utils.venv import venv_exec
    env = copy.deepcopy(dict(os.environ))
    env_key = f"MRSM_SQL_{self.label.upper()}"
    env_val = json.dumps(self.meta)
    env[env_key] = env_val
    cli_code = (
        "import sys\n"
        "import meerschaum as mrsm\n"
        "import os\n"
        f"conn = mrsm.get_connector('sql:{self.label}')\n"
        "success, msg = conn._cli_exit()\n"
        "mrsm.pprint((success, msg))\n"
        "if not success:\n"
        "    raise Exception(msg)"
    )
    if debug:
        dprint(cli_code)
    try:
        _ = venv_exec(cli_code, venv=None, env=env, debug=debug, capture_output=False)
    except Exception as e:
        return False, f"[{self}] Failed to start CLI:\n{e}"
    return True, "Success"

Launch a subprocess for an interactive CLI.
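
A minimal sketch of launching the interactive CLI for a connector (assuming `conn` is a configured `SQLConnector` as in the examples above):

>>> success, msg = conn.cli()  # blocks until the CLI subprocess exits
>>> mrsm.pprint((success, msg))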

def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> Optional[List[Tuple[str, str, Optional[str]]]]:
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> Optional[List[Tuple[str, str, Optional[str]]]]:
    """
    Return a list of tuples corresponding to the parameters provided.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        List of connector_keys to search by.

    metric_keys: Optional[List[str]], default None
        List of metric_keys to search by.

    location_keys: Optional[List[str]], default None
        List of location_keys to search by.

    tags: Optional[List[str]], default None
        List of tags to search by.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        E.g. `--params pipe_id:1`

    debug: bool, default False
        Verbosity toggle.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.misc import separate_negation_values
    from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists
    from meerschaum.config.static import STATIC_CONFIG
    import json
    from copy import deepcopy
    sqlalchemy, sqlalchemy_sql_functions = attempt_import(
        'sqlalchemy',
        'sqlalchemy.sql.functions', lazy=False,
    )
    coalesce = sqlalchemy_sql_functions.coalesce

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    else:
        location_keys = [
            (
                lk
                if lk not in ('[None]', 'None', 'null')
                else 'None'
            )
            for lk in location_keys
        ]
    if tags is None:
        tags = []

    if params is None:
        params = {}

    ### Add three primary keys to params dictionary
    ###   (separated for convenience of arguments).
    cols = {
        'connector_keys': [str(ck) for ck in connector_keys],
        'metric_key': [str(mk) for mk in metric_keys],
        'location_key': [str(lk) for lk in location_keys],
    }

    ### Make deep copy so we don't mutate this somewhere else.
    parameters = deepcopy(params)
    for col, vals in cols.items():
        if vals not in [[], ['*']]:
            parameters[col] = vals

    if not table_exists('mrsm_pipes', self, schema=self.instance_schema, debug=debug):
        return []

    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes']

    _params = {}
    for k, v in parameters.items():
        _v = json.dumps(v) if isinstance(v, dict) else v
        _params[k] = _v

    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
    ### Parse regular params.
    ### If a param begins with '_', negate it instead.
    _where = [
        (
            (coalesce(pipes_tbl.c[key], 'None') == val)
            if not str(val).startswith(negation_prefix)
            else (pipes_tbl.c[key] != key)
        ) for key, val in _params.items()
        if not isinstance(val, (list, tuple)) and key in pipes_tbl.c
    ]
    select_cols = (
        [
            pipes_tbl.c.connector_keys,
            pipes_tbl.c.metric_key,
            pipes_tbl.c.location_key,
        ]
    )

    q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where))
    for c, vals in cols.items():
        if not isinstance(vals, (list, tuple)) or not vals or c not in pipes_tbl.c:
            continue
        _in_vals, _ex_vals = separate_negation_values(vals)
        q = q.where(coalesce(pipes_tbl.c[c], 'None').in_(_in_vals)) if _in_vals else q
        q = q.where(coalesce(pipes_tbl.c[c], 'None').not_in(_ex_vals)) if _ex_vals else q

    ### Finally, parse tags.
    tag_groups = [tag.split(',') for tag in tags]
    in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups]

    ors, nands = [], []
    for _in_tags, _ex_tags in in_ex_tag_groups:
        sub_ands = []
        for nt in _in_tags:
            sub_ands.append(
                sqlalchemy.cast(
                    pipes_tbl.c['parameters'],
                    sqlalchemy.String,
                ).like(f'%"tags":%"{nt}"%')
            )
        if sub_ands:
            ors.append(sqlalchemy.and_(*sub_ands))

        for xt in _ex_tags:
            nands.append(
                sqlalchemy.cast(
                    pipes_tbl.c['parameters'],
                    sqlalchemy.String,
                ).not_like(f'%"tags":%"{xt}"%')
            )

    q = q.where(sqlalchemy.and_(*nands)) if nands else q
    q = q.where(sqlalchemy.or_(*ors)) if ors else q
    loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key'])
    if self.flavor not in OMIT_NULLSFIRST_FLAVORS:
        loc_asc = sqlalchemy.nullsfirst(loc_asc)
    q = q.order_by(
        sqlalchemy.asc(pipes_tbl.c['connector_keys']),
        sqlalchemy.asc(pipes_tbl.c['metric_key']),
        loc_asc,
    )

    ### execute the query and return a list of tuples
    if debug:
        dprint(q.compile(compile_kwargs={'literal_binds': True}))
    try:
        rows = (
            self.execute(q).fetchall()
            if self.flavor != 'duckdb'
            else [
                (row['connector_keys'], row['metric_key'], row['location_key'])
                for row in self.read(q).to_dict(orient='records')
            ]
        )
    except Exception as e:
        error(str(e))

    return [(row[0], row[1], row[2]) for row in rows]

Return a list of tuples corresponding to the parameters provided.

Parameters
  • connector_keys (Optional[List[str]], default None): List of connector_keys to search by.
  • metric_keys (Optional[List[str]], default None): List of metric_keys to search by.
  • location_keys (Optional[List[str]], default None): List of location_keys to search by.
  • tags (Optional[List[str]], default None): List of tags to search by.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. E.g. --params pipe_id:1
  • debug (bool, default False): Verbosity toggle.
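
A minimal sketch of filtering the pipes registry (the keys and tag are illustrative; values prefixed with the negation prefix `_` are excluded):

>>> keys = conn.fetch_pipes_keys(
...     connector_keys=['sql:main'],
...     tags=['production'],
... )
>>> keys  # e.g. [('sql:main', 'weather', None)]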
def create_indices( self, pipe: meerschaum.Pipe, columns: Optional[List[str]] = None, indices: Optional[List[str]] = None, debug: bool = False) -> bool:
def create_indices(
    self,
    pipe: mrsm.Pipe,
    columns: Optional[List[str]] = None,
    indices: Optional[List[str]] = None,
    debug: bool = False
) -> bool:
    """
    Create a pipe's indices.
    """
    from meerschaum.utils.debug import dprint
    if debug:
        dprint(f"Creating indices for {pipe}...")

    if not pipe.indices:
        warn(f"{pipe} has no index columns; skipping index creation.", stack=False)
        return True

    cols_to_include = set((columns or []) + (indices or [])) or None

    _ = pipe.__dict__.pop('_columns_indices', None)
    ix_queries = {
        col: queries
        for col, queries in self.get_create_index_queries(pipe, debug=debug).items()
        if cols_to_include is None or col in cols_to_include
    }
    success = True
    for col, queries in ix_queries.items():
        ix_success = all(self.exec_queries(queries, debug=debug, silent=False))
        success = success and ix_success
        if not ix_success:
            warn(f"Failed to create index on column: {col}")

    return success

Create a pipe's indices.
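
A minimal sketch, reusing `conn` and `pipe` from the fetch example above; pass `columns` to restrict which indices are created:

>>> conn.create_indices(pipe, columns=['datetime'])  # returns True on success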

def drop_indices( self, pipe: meerschaum.Pipe, columns: Optional[List[str]] = None, indices: Optional[List[str]] = None, debug: bool = False) -> bool:
def drop_indices(
    self,
    pipe: mrsm.Pipe,
    columns: Optional[List[str]] = None,
    indices: Optional[List[str]] = None,
    debug: bool = False
) -> bool:
    """
    Drop a pipe's indices.
    """
    from meerschaum.utils.debug import dprint
    if debug:
        dprint(f"Dropping indices for {pipe}...")

    if not pipe.indices:
        warn(f"No indices to drop for {pipe}.", stack=False)
        return False

    cols_to_include = set((columns or []) + (indices or [])) or None

    ix_queries = {
        col: queries
        for col, queries in self.get_drop_index_queries(pipe, debug=debug).items()
        if cols_to_include is None or col in cols_to_include
    }
    success = True
    for col, queries in ix_queries.items():
        ix_success = all(self.exec_queries(queries, debug=debug, silent=(not debug)))
        if not ix_success:
            success = False
            if debug:
                dprint(f"Failed to drop index on column: {col}")
    return success

Drop a pipe's indices.
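
A minimal sketch of dropping indices before a bulk load and recreating them afterwards:

>>> conn.drop_indices(pipe)
>>> # ... perform a large write to the pipe's table ...
>>> conn.create_indices(pipe)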

def get_create_index_queries( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, List[str]]:
def get_create_index_queries(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to which the queries will correspond.

    Returns
    -------
    A dictionary of index names mapping to lists of queries.
    """
    ### NOTE: Due to recent breaking changes in DuckDB, indices don't behave properly.
    if self.flavor == 'duckdb':
        return {}
    from meerschaum.utils.sql import (
        sql_item_name,
        get_distinct_col_count,
        UPDATE_QUERIES,
        get_null_replacement,
        get_create_table_queries,
        get_rename_table_queries,
        COALESCE_UNIQUE_INDEX_FLAVORS,
    )
    from meerschaum.utils.dtypes.sql import (
        get_db_type_from_pd_type,
        get_pd_type_from_db_type,
        AUTO_INCREMENT_COLUMN_FLAVORS,
    )
    from meerschaum.config import get_config
    index_queries = {}

    upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in UPDATE_QUERIES
    static = pipe.parameters.get('static', False)
    null_indices = pipe.parameters.get('null_indices', True)
    index_names = pipe.get_indices()
    unique_index_name_unquoted = index_names.get('unique', None) or f'IX_{pipe.target}_unique'
    if upsert:
        _ = index_names.pop('unique', None)
    indices = pipe.indices
    existing_cols_types = pipe.get_columns_types(debug=debug)
    existing_cols_pd_types = {
        col: get_pd_type_from_db_type(typ)
        for col, typ in existing_cols_types.items()
    }
    existing_cols_indices = self.get_pipe_columns_indices(pipe, debug=debug)
    existing_ix_names = set()
    existing_primary_keys = []
    existing_clustered_primary_keys = []
    for col, col_indices in existing_cols_indices.items():
        for col_ix_doc in col_indices:
            existing_ix_names.add(col_ix_doc.get('name', '').lower())
            if col_ix_doc.get('type', None) == 'PRIMARY KEY':
                existing_primary_keys.append(col.lower())
                if col_ix_doc.get('clustered', True):
                    existing_clustered_primary_keys.append(col.lower())

    _datetime = pipe.get_columns('datetime', error=False)
    _datetime_name = (
        sql_item_name(_datetime, self.flavor, None)
        if _datetime is not None else None
    )
    _datetime_index_name = (
        sql_item_name(index_names['datetime'], flavor=self.flavor, schema=None)
        if index_names.get('datetime', None)
        else None
    )
    _id = pipe.get_columns('id', error=False)
    _id_name = (
        sql_item_name(_id, self.flavor, None)
        if _id is not None
        else None
    )
    primary_key = pipe.columns.get('primary', None)
    primary_key_name = (
        sql_item_name(primary_key, flavor=self.flavor, schema=None)
        if primary_key
        else None
    )
    autoincrement = (
        pipe.parameters.get('autoincrement', False)
        or (
            primary_key is not None
            and primary_key not in existing_cols_pd_types
        )
    )
    primary_key_db_type = (
        get_db_type_from_pd_type(pipe.dtypes.get(primary_key, 'int') or 'int', self.flavor)
        if primary_key
        else None
    )
    primary_key_constraint_name = (
        sql_item_name(f'PK_{pipe.target}', self.flavor, None)
        if primary_key is not None
        else None
    )
    primary_key_clustered = "CLUSTERED" if _datetime is None else "NONCLUSTERED"
    datetime_clustered = (
        "CLUSTERED"
        if not existing_clustered_primary_keys and _datetime is not None
        else "NONCLUSTERED"
    )
    include_columns_str = "\n    ,".join(
        [
            sql_item_name(col, flavor=self.flavor) for col in existing_cols_types
            if col != _datetime
        ]
    ).rstrip(',')
    include_clause = (
        (
            f"\nINCLUDE (\n    {include_columns_str}\n)"
        )
        if datetime_clustered == 'NONCLUSTERED'
        else ''
    )

    _id_index_name = (
        sql_item_name(index_names['id'], self.flavor, None)
        if index_names.get('id', None)
        else None
    )
    _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
    _create_space_partition = get_config('system', 'experimental', 'space')

    ### create datetime index
    dt_query = None
    if _datetime is not None:
        if self.flavor == 'timescaledb' and pipe.parameters.get('hypertable', True):
            _id_count = (
                get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self)
                if (_id is not None and _create_space_partition) else None
            )

            chunk_interval = pipe.get_chunk_interval(debug=debug)
            chunk_interval_minutes = (
                chunk_interval
                if isinstance(chunk_interval, int)
                else int(chunk_interval.total_seconds() / 60)
            )
            chunk_time_interval = (
                f"INTERVAL '{chunk_interval_minutes} MINUTES'"
                if isinstance(chunk_interval, timedelta)
                else f'{chunk_interval_minutes}'
            )

            dt_query = (
                f"SELECT public.create_hypertable('{_pipe_name}', " +
                f"'{_datetime}', "
                + (
                    f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition)
                    else ''
                )
                + f'chunk_time_interval => {chunk_time_interval}, '
                + 'if_not_exists => true, '
                + "migrate_data => true);"
            )
        elif _datetime_index_name and _datetime != primary_key:
            if self.flavor == 'mssql':
                dt_query = (
                    f"CREATE {datetime_clustered} INDEX {_datetime_index_name} "
                    f"\nON {_pipe_name} ({_datetime_name}){include_clause}"
                )
            else:
                dt_query = (
                    f"CREATE INDEX {_datetime_index_name} "
                    + f"ON {_pipe_name} ({_datetime_name})"
                )

    if dt_query:
        index_queries[_datetime] = [dt_query]

    primary_queries = []
    if (
        primary_key is not None
        and primary_key.lower() not in existing_primary_keys
        and not static
    ):
        if autoincrement and primary_key not in existing_cols_pd_types:
            autoincrement_str = AUTO_INCREMENT_COLUMN_FLAVORS.get(
                self.flavor,
                AUTO_INCREMENT_COLUMN_FLAVORS['default']
            )
            primary_queries.extend([
                (
                    f"ALTER TABLE {_pipe_name}\n"
                    f"ADD {primary_key_name} {primary_key_db_type} {autoincrement_str}"
                ),
            ])
        elif not autoincrement and primary_key in existing_cols_pd_types:
            if self.flavor == 'sqlite':
                new_table_name = sql_item_name(
                    f'_new_{pipe.target}',
                    self.flavor,
                    self.get_pipe_schema(pipe)
                )
                select_cols_str = ', '.join(
                    [
                        sql_item_name(col, self.flavor, None)
                        for col in existing_cols_types
                    ]
                )
                primary_queries.extend(
                    get_create_table_queries(
                        existing_cols_pd_types,
                        f'_new_{pipe.target}',
                        self.flavor,
                        schema=self.get_pipe_schema(pipe),
                        primary_key=primary_key,
                    ) + [
                        (
                            f"INSERT INTO {new_table_name} ({select_cols_str})\n"
                            f"SELECT {select_cols_str}\nFROM {_pipe_name}"
                        ),
                        f"DROP TABLE {_pipe_name}",
                    ] + get_rename_table_queries(
                        f'_new_{pipe.target}',
                        pipe.target,
                        self.flavor,
                        schema=self.get_pipe_schema(pipe),
                    )
                )
            elif self.flavor == 'oracle':
                primary_queries.extend([
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"MODIFY {primary_key_name} NOT NULL"
                    ),
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
                    )
                ])
            elif self.flavor in ('mysql', 'mariadb'):
                primary_queries.extend([
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"MODIFY {primary_key_name} {primary_key_db_type} NOT NULL"
                    ),
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
                    )
                ])
            elif self.flavor == 'timescaledb':
                primary_queries.extend([
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"ALTER COLUMN {primary_key_name} SET NOT NULL"
                    ),
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY (" + (
                            f"{_datetime_name}, " if _datetime_name else ""
                        ) + f"{primary_key_name})"
                    ),
                ])
            elif self.flavor in ('citus', 'postgresql', 'duckdb'):
                primary_queries.extend([
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"ALTER COLUMN {primary_key_name} SET NOT NULL"
                    ),
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
                    ),
                ])
            else:
                primary_queries.extend([
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"ALTER COLUMN {primary_key_name} {primary_key_db_type} NOT NULL"
                    ),
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY {primary_key_clustered} ({primary_key_name})"
                    ),
                ])
        index_queries[primary_key] = primary_queries

    ### create id index
    if _id_name is not None:
        if self.flavor == 'timescaledb':
            ### Already created indices via create_hypertable.
            id_query = (
                None if (_id is not None and _create_space_partition)
                else (
                    f"CREATE INDEX IF NOT EXISTS {_id_index_name} ON {_pipe_name} ({_id_name})"
                    if _id is not None
                    else None
                )
            )
        else: ### mssql, sqlite, etc.
            id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})"

        if id_query is not None:
            index_queries[_id] = id_query if isinstance(id_query, list) else [id_query]

    ### Create indices for other labels in `pipe.columns`.
    other_index_names = {
        ix_key: ix_unquoted
        for ix_key, ix_unquoted in index_names.items()
        if (
            ix_key not in ('datetime', 'id', 'primary')
            and ix_unquoted.lower() not in existing_ix_names
        )
    }
    for ix_key, ix_unquoted in other_index_names.items():
        ix_name = sql_item_name(ix_unquoted, self.flavor, None)
        cols = indices[ix_key]
        if not isinstance(cols, (list, tuple)):
            cols = [cols]
        if ix_key == 'unique' and upsert:
            continue
        cols_names = [sql_item_name(col, self.flavor, None) for col in cols if col]
        if not cols_names:
            continue
        cols_names_str = ", ".join(cols_names)
        index_queries[ix_key] = [f"CREATE INDEX {ix_name} ON {_pipe_name} ({cols_names_str})"]

    indices_cols_str = ', '.join(
        list({
            sql_item_name(ix, self.flavor)
            for ix_key, ix in pipe.columns.items()
            if ix and ix in existing_cols_types
        })
    )
    coalesce_indices_cols_str = ', '.join(
        [
            (
                (
                    "COALESCE("
                    + sql_item_name(ix, self.flavor)
                    + ", "
                    + get_null_replacement(existing_cols_types[ix], self.flavor)
                    + ") "
                )
                if ix_key != 'datetime' and null_indices
                else sql_item_name(ix, self.flavor)
            )
            for ix_key, ix in pipe.columns.items()
            if ix and ix in existing_cols_types
        ]
    )
    unique_index_name = sql_item_name(unique_index_name_unquoted, self.flavor)
    constraint_name_unquoted = unique_index_name_unquoted.replace('IX_', 'UQ_')
    constraint_name = sql_item_name(constraint_name_unquoted, self.flavor)
    add_constraint_query = (
        f"ALTER TABLE {_pipe_name} ADD CONSTRAINT {constraint_name} UNIQUE ({indices_cols_str})"
    )
    unique_index_cols_str = (
        indices_cols_str
        if self.flavor not in COALESCE_UNIQUE_INDEX_FLAVORS or not null_indices
        else coalesce_indices_cols_str
    )
    create_unique_index_query = (
        f"CREATE UNIQUE INDEX {unique_index_name} ON {_pipe_name} ({unique_index_cols_str})"
    )
    constraint_queries = [create_unique_index_query]
    if self.flavor != 'sqlite':
        constraint_queries.append(add_constraint_query)
    if upsert and indices_cols_str:
        index_queries[unique_index_name] = constraint_queries
    return index_queries

Return a dictionary mapping columns to a CREATE INDEX or equivalent query.

Parameters
  • pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
  • A dictionary of index names mapping to lists of queries.
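
The generated queries may be inspected or executed manually with `exec_queries()` (as in `create_indices()` above). A minimal sketch:

>>> queries = conn.get_create_index_queries(pipe)
>>> for col, qs in queries.items():
...     print(col, all(conn.exec_queries(qs, silent=False)))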
def get_drop_index_queries( self, pipe: meerschaum.Pipe, debug: bool = False) -> Dict[str, List[str]]:
def get_drop_index_queries(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `DROP INDEX` or equivalent query.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to which the queries will correspond.

    Returns
    -------
    A dictionary of column names mapping to lists of queries.
    """
    ### NOTE: Due to breaking changes within DuckDB, indices must be skipped.
    if self.flavor == 'duckdb':
        return {}
    if not pipe.exists(debug=debug):
        return {}

    from collections import defaultdict
    from meerschaum.utils.sql import (
        sql_item_name,
        table_exists,
        hypertable_queries,
        DROP_INDEX_IF_EXISTS_FLAVORS,
    )
    drop_queries = defaultdict(lambda: [])
    schema = self.get_pipe_schema(pipe)
    index_schema = schema if self.flavor != 'mssql' else None
    indices = {
        ix_key: ix
        for ix_key, ix in pipe.get_indices().items()
    }
    cols_indices = pipe.get_columns_indices(debug=debug)
    existing_indices = set()
    clustered_ix = None
    for col, ix_metas in cols_indices.items():
        for ix_meta in ix_metas:
            ix_name = ix_meta.get('name', None)
            if ix_meta.get('clustered', False):
                clustered_ix = ix_name
            existing_indices.add(ix_name.lower())
    pipe_name = sql_item_name(pipe.target, self.flavor, schema)
    pipe_name_no_schema = sql_item_name(pipe.target, self.flavor, None)
    upsert = pipe.upsert

    if self.flavor not in hypertable_queries:
        is_hypertable = False
    else:
        is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name)
        is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None

    if_exists_str = "IF EXISTS " if self.flavor in DROP_INDEX_IF_EXISTS_FLAVORS else ""
    if is_hypertable:
        nuke_queries = []
        temp_table = '_' + pipe.target + '_temp_migration'
        temp_table_name = sql_item_name(temp_table, self.flavor, self.get_pipe_schema(pipe))

        if table_exists(temp_table, self, schema=self.get_pipe_schema(pipe), debug=debug):
            nuke_queries.append(f"DROP TABLE {if_exists_str}{temp_table_name}")
        nuke_queries += [
            f"SELECT * INTO {temp_table_name} FROM {pipe_name}",
            f"DROP TABLE {if_exists_str}{pipe_name}",
            f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name_no_schema}",
        ]
        nuke_ix_keys = ('datetime', 'id')
        nuked = False
        for ix_key in nuke_ix_keys:
            if ix_key in indices and not nuked:
                drop_queries[ix_key].extend(nuke_queries)
                nuked = True

    for ix_key, ix_unquoted in indices.items():
        if ix_key in drop_queries:
            continue
        if ix_unquoted.lower() not in existing_indices:
            continue

        if ix_key == 'unique' and upsert and self.flavor not in ('sqlite',) and not is_hypertable:
            constraint_name_unquoted = ix_unquoted.replace('IX_', 'UQ_')
            constraint_name = sql_item_name(constraint_name_unquoted, self.flavor)
            constraint_or_index = (
                "CONSTRAINT"
                if self.flavor not in ('mysql', 'mariadb')
                else 'INDEX'
            )
            drop_queries[ix_key].append(
                f"ALTER TABLE {pipe_name}\n"
                f"DROP {constraint_or_index} {constraint_name}"
            )

        query = (
            (
                f"ALTER TABLE {pipe_name}\n"
                if self.flavor in ('mysql', 'mariadb')
                else ''
            )
            + f"DROP INDEX {if_exists_str}"
            + sql_item_name(ix_unquoted, self.flavor, index_schema)
        )
        if self.flavor == 'mssql':
            query += f"\nON {pipe_name}"
            if ix_unquoted == clustered_ix:
                query += "\nWITH (ONLINE = ON, MAXDOP = 4)"
        drop_queries[ix_key].append(query)

    return drop_queries

Return a dictionary mapping columns to a DROP INDEX or equivalent query.

Parameters
  • pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
  • A dictionary of column names mapping to lists of queries.
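
As with the create queries, each key maps to a list of statements to run in order. A minimal sketch:

>>> for ix_key, qs in conn.get_drop_index_queries(pipe).items():
...     conn.exec_queries(qs)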
def get_add_columns_queries( self, pipe: meerschaum.Pipe, df: 'Union[pd.DataFrame, Dict[str, str]]', _is_db_types: bool = False, debug: bool = False) -> List[str]:
def get_add_columns_queries(
    self,
    pipe: mrsm.Pipe,
    df: Union[pd.DataFrame, Dict[str, str]],
    _is_db_types: bool = False,
    debug: bool = False,
) -> List[str]:
    """
    Add new null columns of the correct type to a table from a dataframe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.

    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which contains new columns.
        If a dictionary is provided, assume it maps columns to Pandas data types.

    _is_db_types: bool, default False
        If `True`, assume `df` is a dictionary mapping columns to SQL native dtypes.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    """
    if not pipe.exists(debug=debug):
        return []

    if pipe.parameters.get('static', False):
        return []

    from decimal import Decimal
    import copy
    from meerschaum.utils.sql import (
        sql_item_name,
        SINGLE_ALTER_TABLE_FLAVORS,
        get_table_cols_types,
    )
    from meerschaum.utils.dtypes.sql import (
        get_pd_type_from_db_type,
        get_db_type_from_pd_type,
    )
    from meerschaum.utils.misc import flatten_list
    table_obj = self.get_pipe_table(pipe, debug=debug)
    is_dask = 'dask' in df.__module__ if not isinstance(df, dict) else False
    if is_dask:
        df = df.partitions[0].compute()
    df_cols_types = (
        {
            col: str(typ)
            for col, typ in df.dtypes.items()
        }
        if not isinstance(df, dict)
        else copy.deepcopy(df)
    )
    if not isinstance(df, dict) and len(df.index) > 0:
        for col, typ in list(df_cols_types.items()):
            if typ != 'object':
                continue
            val = df.iloc[0][col]
            if isinstance(val, (dict, list)):
                df_cols_types[col] = 'json'
            elif isinstance(val, Decimal):
                df_cols_types[col] = 'numeric'
            elif isinstance(val, str):
                df_cols_types[col] = 'str'
    db_cols_types = {
        col: get_pd_type_from_db_type(str(typ.type))
        for col, typ in table_obj.columns.items()
    } if table_obj is not None else {
        col: get_pd_type_from_db_type(typ)
        for col, typ in get_table_cols_types(
            pipe.target,
            self,
            schema=self.get_pipe_schema(pipe),
            debug=debug,
        ).items()
    }
    new_cols = set(df_cols_types) - set(db_cols_types)
    if not new_cols:
        return []

    new_cols_types = {
        col: get_db_type_from_pd_type(
            df_cols_types[col],
            self.flavor