meerschaum.connectors

Create connectors with get_connector(). For ease of use, you can also import from the root meerschaum module:

>>> from meerschaum import get_connector
>>> conn = get_connector()
  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Create connectors with `meerschaum.connectors.get_connector()`.
  7For ease of use, you can also import from the root `meerschaum` module:
  8```
  9>>> from meerschaum import get_connector
 10>>> conn = get_connector()
 11```
 12"""
 13
 14from __future__ import annotations
 15
 16import meerschaum as mrsm
 17from meerschaum.utils.typing import Any, SuccessTuple, Union, Optional, List, Dict
 18from meerschaum.utils.threading import Lock, RLock
 19from meerschaum.utils.warnings import error, warn
 20
 21from meerschaum.connectors.Connector import Connector, InvalidAttributesError
 22from meerschaum.connectors.sql.SQLConnector import SQLConnector
 23from meerschaum.connectors.api.APIConnector import APIConnector
 24from meerschaum.connectors.sql._create_engine import flavor_configs as sql_flavor_configs
 25
 26__all__ = ("Connector", "SQLConnector", "APIConnector", "get_connector", "is_connected")
 27
 28### store connectors partitioned by
 29### type, label for reuse
 30connectors: Dict[str, Dict[str, Connector]] = {
 31    'api'   : {},
 32    'sql'   : {},
 33    'plugin': {},
 34}
 35instance_types: List[str] = ['sql', 'api']
 36_locks: Dict[str, RLock] = {
 37    'connectors'               : RLock(),
 38    'types'                    : RLock(),
 39    'custom_types'             : RLock(),
 40    '_loaded_plugin_connectors': RLock(),
 41    'instance_types'           : RLock(),
 42}
 43attributes: Dict[str, Dict[str, Any]] = {
 44    'api': {
 45        'required': [
 46            'host',
 47            'username',
 48            'password'
 49        ],
 50        'default': {
 51            'protocol': 'http',
 52        },
 53    },
 54    'sql': {
 55        'flavors': sql_flavor_configs,
 56    },
 57}
 58### Fill this with objects only when connectors are first referenced.
 59types: Dict[str, Any] = {}
 60custom_types: set = set()
 61_loaded_plugin_connectors: bool = False
 62
 63
 64def get_connector(
 65        type: str = None,
 66        label: str = None,
 67        refresh: bool = False,
 68        debug: bool = False,
 69        **kw: Any
 70    ) -> Connector:
 71    """
 72    Return existing connector or create new connection and store for reuse.
 73    
 74    You can create new connectors if enough parameters are provided for the given type and flavor.
 75    
 76
 77    Parameters
 78    ----------
 79    type: Optional[str], default None
 80        Connector type (sql, api, etc.).
 81        Defaults to the type of the configured `instance_connector`.
 82
 83    label: Optional[str], default None
 84        Connector label (e.g. main). Defaults to `'main'`.
 85
 86    refresh: bool, default False
 87        Refresh the Connector instance / construct new object. Defaults to `False`.
 88
 89    kw: Any
 90        Other arguments to pass to the Connector constructor.
 91        If the Connector has already been constructed and new arguments are provided,
 92        `refresh` is set to `True` and the old Connector is replaced.
 93
 94    Returns
 95    -------
 96    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
 97    `meerschaum.connectors.sql.SQLConnector`).
 98    
 99    Examples
100    --------
101    The following parameters would create a new
102    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.
103
104    ```
105    >>> conn = get_connector(
106    ...     type = 'sql',
107    ...     label = 'newlabel',
108    ...     flavor = 'sqlite',
109    ...     database = '/file/path/to/database.db'
110    ... )
111    >>>
112    ```
113
114    """
115    from meerschaum.connectors.parse import parse_instance_keys
116    from meerschaum.config import get_config
117    from meerschaum.config.static import STATIC_CONFIG
118    from meerschaum.utils.warnings import warn
119    global _loaded_plugin_connectors
120    if isinstance(type, str) and not label and ':' in type:
121        type, label = type.split(':', maxsplit=1)
122    with _locks['_loaded_plugin_connectors']:
123        if not _loaded_plugin_connectors:
124            load_plugin_connectors()
125            _loaded_plugin_connectors = True
126    if type is None and label is None:
127        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
128        ### recursive call to get_connector
129        return parse_instance_keys(default_instance_keys)
130
131    ### NOTE: the default instance connector may not be main.
132    ### Only fall back to 'main' if the type is provided by the label is omitted.
133    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']
134
135    ### type might actually be a label. Check if so and raise a warning.
136    if type not in connectors:
137        possibilities, poss_msg = [], ""
138        for _type in get_config('meerschaum', 'connectors'):
139            if type in get_config('meerschaum', 'connectors', _type):
140                possibilities.append(f"{_type}:{type}")
141        if len(possibilities) > 0:
142            poss_msg = " Did you mean"
143            for poss in possibilities[:-1]:
144                poss_msg += f" '{poss}',"
145            if poss_msg.endswith(','):
146                poss_msg = poss_msg[:-1]
147            if len(possibilities) > 1:
148                poss_msg += " or"
149            poss_msg += f" '{possibilities[-1]}'?"
150
151        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
152        return None
153
154    if 'sql' not in types:
155        from meerschaum.connectors.plugin import PluginConnector
156        with _locks['types']:
157            types.update({
158                'api'   : APIConnector,
159                'sql'   : SQLConnector,
160                'plugin': PluginConnector,
161            })
162    
163    ### determine if we need to call the constructor
164    if not refresh:
165        ### see if any user-supplied arguments differ from the existing instance
166        if label in connectors[type]:
167            warning_message = None
168            for attribute, value in kw.items():
169                if attribute not in connectors[type][label].meta:
170                    import inspect
171                    cls = connectors[type][label].__class__
172                    cls_init_signature = inspect.signature(cls)
173                    cls_init_params = cls_init_signature.parameters
174                    if attribute not in cls_init_params:
175                        warning_message = (
176                            f"Received new attribute '{attribute}' not present in connector " +
177                            f"{connectors[type][label]}.\n"
178                        )
179                elif connectors[type][label].__dict__[attribute] != value:
180                    warning_message = (
181                        f"Mismatched values for attribute '{attribute}' in connector "
182                        + f"'{connectors[type][label]}'.\n" +
183                        f"  - Keyword value: '{value}'\n" +
184                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
185                    )
186            if warning_message is not None:
187                warning_message += (
188                    "\nSetting `refresh` to True and recreating connector with type:"
189                    + f" '{type}' and label '{label}'."
190                )
191                refresh = True
192                warn(warning_message)
193        else: ### connector doesn't yet exist
194            refresh = True
195
196    ### only create an object if refresh is True
197    ### (can be manually specified, otherwise determined above)
198    if refresh:
199        with _locks['connectors']:
200            try:
201                ### will raise an error if configuration is incorrect / missing
202                conn = types[type](label=label, **kw)
203                connectors[type][label] = conn
204            except InvalidAttributesError as ie:
205                warn(
206                    f"Incorrect attributes for connector '{type}:{label}'.\n"
207                    + str(ie),
208                    stack = False,
209                )
210                conn = None
211            except Exception as e:
212                from meerschaum.utils.formatting import get_console
213                console = get_console()
214                if console:
215                    console.print_exception()
216                warn(
217                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
218                    stack = False,
219                )
220                conn = None
221        if conn is None:
222            return None
223
224    return connectors[type][label]
225
226
def is_connected(keys: str, **kw) -> bool:
    """
    Check if the connector keys correspond to an active connection.
    If the connector has not been created, it will immediately return `False`.
    If the connector exists but cannot communicate with the source, return `False`.

    **NOTE:** Only works with instance connectors (`SQLConnectors` and `APIConnectors`).
    Keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Parameters
    ----------
    keys:
        The keys to the connector (e.g. `'sql:main'`).

    Returns
    -------
    A `bool` corresponding to whether a successful connection may be made.

    """
    import warnings
    if ':' not in keys:
        warn(f"Invalid connector keys '{keys}'")

    try:
        ### Split on the first colon only so labels containing ':' are preserved,
        ### consistent with `get_connector()`.
        typ, label = keys.split(':', maxsplit=1)
    except Exception:
        return False
    if typ not in instance_types:
        return False
    if label not in connectors.get(typ, {}):
        return False

    from meerschaum.connectors.parse import parse_instance_keys
    conn = parse_instance_keys(keys)
    try:
        ### Suppress connection warnings; only the boolean result matters here.
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            return conn.test_connection(**kw)
    except Exception:
        return False
267
268
def make_connector(
        cls,
    ):
    """
    Register a class as a `Connector`.
    The `type` will be the lower case of the class name, without the suffix `connector`.

    If the class defines the attribute `IS_INSTANCE = True`, its type is also
    registered as an instance connector type. This requires implementing the
    various pipes functions and lots of testing.

    Parameters
    ----------
    cls:
        The `Connector` subclass to register.

    Returns
    -------
    The same class, so this function may be used as a decorator.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.connectors import make_connector, Connector
    >>> class FooConnector(Connector):
    ...     def __init__(self, label: str, **kw):
    ...         super().__init__('foo', label, **kw)
    ...
    >>> make_connector(FooConnector)
    >>> mrsm.get_connector('foo', 'bar')
    foo:bar
    >>>
    """
    import re
    ### e.g. 'FooConnector' -> 'foo'
    typ = re.sub(r'connector$', '', cls.__name__.lower())
    with _locks['types']:
        types[typ] = cls
    with _locks['custom_types']:
        custom_types.add(typ)
    with _locks['connectors']:
        if typ not in connectors:
            connectors[typ] = {}
    if getattr(cls, 'IS_INSTANCE', False):
        with _locks['instance_types']:
            if typ not in instance_types:
                instance_types.append(typ)

    return cls
310
311
def load_plugin_connectors():
    """
    If a plugin makes use of the `make_connector` decorator,
    load its module (importing it triggers the registration).
    """
    from meerschaum.plugins import get_plugins, import_plugins
    to_import = []
    for plugin in get_plugins():
        ### Cheap text scan so plugins which don't define connectors
        ### aren't imported needlessly.
        with open(plugin.__file__, encoding='utf-8') as f:
            text = f.read()
        if 'make_connector' in text:
            to_import.append(plugin.name)
    if not to_import:
        return
    import_plugins(*to_import)
327
328
def get_connector_plugin(
        connector: Connector,
    ) -> Union[str, None, mrsm.Plugin]:
    """
    Determine the plugin for a connector.
    This is useful for handling virtual environments for custom instance connectors.

    Parameters
    ----------
    connector: Connector
        The connector which may require a virtual environment.

    Returns
    -------
    A Plugin, 'mrsm', or None.
    """
    if not hasattr(connector, 'type'):
        return None

    ### Custom connector types live in a plugin's module;
    ### PluginConnectors are labeled with the plugin's name;
    ### everything else belongs to Meerschaum itself ('mrsm').
    if connector.type in custom_types:
        plugin_name = connector.__module__.replace('plugins.', '').split('.')[0]
    elif connector.type == 'plugin':
        plugin_name = connector.label
    else:
        plugin_name = 'mrsm'

    plugin = mrsm.Plugin(plugin_name)
    return plugin if plugin.is_installed() else None
class Connector(metaclass=abc.ABCMeta):
    """
    The base connector class to hold connection attributes.
    """
    def __init__(
            self,
            type: Optional[str] = None,
            label: Optional[str] = None,
            **kw: Any
        ):
        """
        Parameters
        ----------
        type: str
            The `type` of the connector (e.g. `sql`, `api`, `plugin`).

        label: str
            The `label` for the connector.

        Run `mrsm edit config` to edit connectors in the YAML file:

        ```
        meerschaum:
            connectors:
                {type}:
                    {label}:
                        ### attributes go here
        ```

        """
        ### Snapshot the pristine __dict__ so `_reset_attributes()` may restore it.
        self._original_dict = copy.deepcopy(self.__dict__)
        self._set_attributes(type=type, label=label, **kw)
        self.verify_attributes(getattr(self, 'REQUIRED_ATTRIBUTES', None))

    def _reset_attributes(self):
        """Restore `__dict__` to the snapshot taken at the beginning of `__init__()`."""
        self.__dict__ = self._original_dict

    def _set_attributes(
            self,
            *args,
            inherit_default: bool = True,
            **kw: Any
        ):
        """
        Build `self._attributes` from configuration and keyword arguments,
        then flush the result into `self.__dict__`.

        Parameters
        ----------
        inherit_default: bool, default True
            If `True`, first inherit attributes from the 'default' entry
            of this connector type's configuration.
        """
        from meerschaum.config.static import STATIC_CONFIG
        from meerschaum.utils.warnings import error

        self._attributes = {}

        default_label = STATIC_CONFIG['connectors']['default_label']

        ### NOTE: Support the legacy method of explicitly passing the type.
        label = kw.get('label', None)
        if label is None:
            if len(args) == 2:
                label = args[1]
            elif len(args) == 0:
                label = None
            else:
                label = args[0]

        if label == 'default':
            error(
                f"Label cannot be 'default'. Did you mean '{default_label}'?",
                InvalidAttributesError,
            )
        self.__dict__['label'] = label

        from meerschaum.config import get_config
        conn_configs = copy.deepcopy(get_config('meerschaum', 'connectors'))
        connector_config = copy.deepcopy(get_config('system', 'connectors'))

        ### inherit attributes from 'default' if exists
        if inherit_default:
            inherit_from = 'default'
            if self.type in conn_configs and inherit_from in conn_configs[self.type]:
                _inherit_dict = copy.deepcopy(conn_configs[self.type][inherit_from])
                self._attributes.update(_inherit_dict)

        ### load user config into self._attributes
        if self.type in conn_configs and self.label in conn_configs[self.type]:
            self._attributes.update(conn_configs[self.type][self.label])

        ### load system config into self._sys_config
        ### (deep copy so future Connectors don't inherit changes)
        if self.type in connector_config:
            self._sys_config = copy.deepcopy(connector_config[self.type])

        ### add additional arguments or override configuration
        self._attributes.update(kw)

        ### finally, update __dict__ with _attributes.
        self.__dict__.update(self._attributes)

    def verify_attributes(
            self,
            required_attributes: Optional[List[str]] = None,
            debug: bool = False
        ) -> None:
        """
        Ensure that the required attributes have been met.

        The Connector base class checks the minimum requirements.
        Child classes may enforce additional requirements.

        Parameters
        ----------
        required_attributes: Optional[List[str]], default None
            Attributes to be verified. If `None`, default to `['label']`.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        Don't return anything.

        Raises
        ------
        An error if any of the required attributes are missing.
        """
        from meerschaum.utils.warnings import error
        from meerschaum.utils.misc import items_str
        if required_attributes is None:
            required_attributes = ['label']
        ### Collect everything missing so the error lists all absent attributes at once.
        missing_attributes = set()
        for attribute in required_attributes:
            if attribute not in self.__dict__:
                missing_attributes.add(attribute)
        if len(missing_attributes) > 0:
            error(
                (
                    f"Missing {items_str(list(missing_attributes))} "
                    + f"for connector '{self.type}:{self.label}'."
                ),
                InvalidAttributesError,
                silent = True,
                stack = False
            )

    def __str__(self):
        """
        When cast to a string, return type:label.
        """
        return f"{self.type}:{self.label}"

    def __repr__(self):
        """
        Represent the connector as type:label.
        """
        return str(self)

    @property
    def meta(self) -> Dict[str, Any]:
        """
        Return the keys needed to reconstruct this Connector.
        """
        _meta = {
            key: value
            for key, value in self.__dict__.items()
            if not str(key).startswith('_')
        }
        _meta.update({
            'type': self.type,
            'label': self.label,
        })
        return _meta

    @property
    def type(self) -> str:
        """
        Return the type for this connector.
        """
        _type = self.__dict__.get('type', None)
        if _type is None:
            import re
            ### Derive the type from the class name, e.g. 'FooConnector' -> 'foo'.
            _type = re.sub(r'connector$', '', self.__class__.__name__.lower())
            self.__dict__['type'] = _type
        return _type

    @property
    def label(self) -> str:
        """
        Return the label for this connector.
        """
        _label = self.__dict__.get('label', None)
        if _label is None:
            from meerschaum.config.static import STATIC_CONFIG
            ### Fall back to the configured default label (e.g. 'main').
            _label = STATIC_CONFIG['connectors']['default_label']
            self.__dict__['label'] = _label
        return _label

The base connector class to hold connection attributes.

Connector(type: Optional[str] = None, label: Optional[str] = None, **kw: Any)
def __init__(
        self,
        type: Optional[str] = None,
        label: Optional[str] = None,
        **kw: Any
    ):
    """
    Parameters
    ----------
    type: str
        The `type` of the connector (e.g. `sql`, `api`, `plugin`).

    label: str
        The `label` for the connector.

    Run `mrsm edit config` to edit connectors in the YAML file:

    ```
    meerschaum:
        connectors:
            {type}:
                {label}:
                    ### attributes go here
    ```

    """
    ### Snapshot the pristine __dict__ so `_reset_attributes()` may restore it.
    self._original_dict = copy.deepcopy(self.__dict__)
    self._set_attributes(type=type, label=label, **kw)
    self.verify_attributes(getattr(self, 'REQUIRED_ATTRIBUTES', None))
Parameters
  • type (str): The type of the connector (e.g. sql, api, plugin).
  • label (str): The label for the connector.
  • Run mrsm edit config to edit connectors in the YAML file:
  • ```
  • meerschaum: connectors: {type}: {label}: ### attributes go here
  • ```
def verify_attributes(
        self,
        required_attributes: Optional[List[str]] = None,
        debug: bool = False
    ) -> None:
    """
    Ensure that the required attributes have been met.

    The Connector base class checks the minimum requirements.
    Child classes may enforce additional requirements.

    Parameters
    ----------
    required_attributes: Optional[List[str]], default None
        Attributes to be verified. If `None`, default to `['label']`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    Don't return anything.

    Raises
    ------
    An error if any of the required attributes are missing.
    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import items_str
    if required_attributes is None:
        required_attributes = ['label']
    ### Collect everything missing so the error lists all absent attributes at once.
    missing_attributes = set()
    for attribute in required_attributes:
        if attribute not in self.__dict__:
            missing_attributes.add(attribute)
    if len(missing_attributes) > 0:
        error(
            (
                f"Missing {items_str(list(missing_attributes))} "
                + f"for connector '{self.type}:{self.label}'."
            ),
            InvalidAttributesError,
            silent = True,
            stack = False
        )

Ensure that the required attributes have been met.

The Connector base class checks the minimum requirements. Child classes may enforce additional requirements.

Parameters
  • required_attributes (Optional[List[str]], default None): Attributes to be verified. If None, default to ['label'].
  • debug (bool, default False): Verbosity toggle.
Returns
  • Don't return anything.
Raises
  • An error if any of the required attributes are missing.
meta: Dict[str, Any]

Return the keys needed to reconstruct this Connector.

type: str

Return the type for this connector.

label: str

Return the label for this connector.

class SQLConnector(meerschaum.connectors.Connector):
 17class SQLConnector(Connector):
 18    """
 19    Connect to SQL databases via `sqlalchemy`.
 20    
 21    SQLConnectors may be used as Meerschaum instance connectors.
 22    Read more about connectors and instances at
 23    https://meerschaum.io/reference/connectors/
 24
 25    """
 26
 27    IS_INSTANCE: bool = True
 28
 29    from ._create_engine import flavor_configs, create_engine
 30    from ._sql import read, value, exec, execute, to_sql, exec_queries
 31    from meerschaum.utils.sql import test_connection
 32    from ._fetch import fetch, get_pipe_metadef
 33    from ._cli import cli, _cli_exit
 34    from ._pipes import (
 35        fetch_pipes_keys,
 36        create_indices,
 37        drop_indices,
 38        get_create_index_queries,
 39        get_drop_index_queries,
 40        get_add_columns_queries,
 41        get_alter_columns_queries,
 42        delete_pipe,
 43        get_pipe_data,
 44        get_pipe_data_query,
 45        register_pipe,
 46        edit_pipe,
 47        get_pipe_id,
 48        get_pipe_attributes,
 49        sync_pipe,
 50        sync_pipe_inplace,
 51        get_sync_time,
 52        pipe_exists,
 53        get_pipe_rowcount,
 54        drop_pipe,
 55        clear_pipe,
 56        deduplicate_pipe,
 57        get_pipe_table,
 58        get_pipe_columns_types,
 59        get_to_sql_dtype,
 60        get_pipe_schema,
 61    )
 62    from ._plugins import (
 63        register_plugin,
 64        delete_plugin,
 65        get_plugin_id,
 66        get_plugin_version,
 67        get_plugins,
 68        get_plugin_user_id,
 69        get_plugin_username,
 70        get_plugin_attributes,
 71    )
 72    from ._users import (
 73        register_user,
 74        get_user_id,
 75        get_users,
 76        edit_user,
 77        delete_user,
 78        get_user_password_hash,
 79        get_user_type,
 80        get_user_attributes,
 81    )
 82    from ._uri import from_uri, parse_uri
 83    from ._instance import (
 84        _log_temporary_tables_creation,
 85        _drop_temporary_table,
 86        _drop_temporary_tables,
 87        _drop_old_temporary_tables,
 88    )
 89    
 90    def __init__(
 91        self,
 92        label: Optional[str] = None,
 93        flavor: Optional[str] = None,
 94        wait: bool = False,
 95        connect: bool = False,
 96        debug: bool = False,
 97        **kw: Any
 98    ):
 99        """
100        Parameters
101        ----------
102        label: str, default 'main'
103            The identifying label for the connector.
104            E.g. for `sql:main`, 'main' is the label.
105            Defaults to 'main'.
106
107        flavor: Optional[str], default None
108            The database flavor, e.g.
109            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
110            To see supported flavors, run the `bootstrap connectors` command.
111
112        wait: bool, default False
113            If `True`, block until a database connection has been made.
114            Defaults to `False`.
115
116        connect: bool, default False
117            If `True`, immediately attempt to connect the database and raise
118            a warning if the connection fails.
119            Defaults to `False`.
120
121        debug: bool, default False
122            Verbosity toggle.
123            Defaults to `False`.
124
125        kw: Any
126            All other arguments will be passed to the connector's attributes.
127            Therefore, a connector may be made without being registered,
128            as long enough parameters are supplied to the constructor.
129        """
130        if 'uri' in kw:
131            uri = kw['uri']
132            if uri.startswith('postgres://'):
133                uri = uri.replace('postgres://', 'postgresql://', 1)
134            if uri.startswith('timescaledb://'):
135                uri = uri.replace('timescaledb://', 'postgresql://', 1)
136                flavor = 'timescaledb'
137            kw['uri'] = uri
138            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
139            label = label or from_uri_params.get('label', None)
140            _ = from_uri_params.pop('label', None)
141
142            ### Sometimes the flavor may be provided with a URI.
143            kw.update(from_uri_params)
144            if flavor:
145                kw['flavor'] = flavor
146
147
148        ### set __dict__ in base class
149        super().__init__(
150            'sql',
151            label = label or self.__dict__.get('label', None),
152            **kw
153        )
154
155        if self.__dict__.get('flavor', None) == 'sqlite':
156            self._reset_attributes()
157            self._set_attributes(
158                'sql',
159                label = label,
160                inherit_default = False,
161                **kw
162            )
163            ### For backwards compatability reasons, set the path for sql:local if its missing.
164            if self.label == 'local' and not self.__dict__.get('database', None):
165                from meerschaum.config._paths import SQLITE_DB_PATH
166                self.database = str(SQLITE_DB_PATH)
167
168        ### ensure flavor and label are set accordingly
169        if 'flavor' not in self.__dict__:
170            if flavor is None and 'uri' not in self.__dict__:
171                raise Exception(
172                    f"    Missing flavor. Provide flavor as a key for '{self}'."
173                )
174            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)
175
176        if self.flavor == 'postgres':
177            self.flavor = 'postgresql'
178
179        self._debug = debug
180        ### Store the PID and thread at initialization
181        ### so we can dispose of the Pool in child processes or threads.
182        import os, threading
183        self._pid = os.getpid()
184        self._thread_ident = threading.current_thread().ident
185        self._sessions = {}
186        self._locks = {'_sessions': threading.RLock(), }
187
188        ### verify the flavor's requirements are met
189        if self.flavor not in self.flavor_configs:
190            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
191        if not self.__dict__.get('uri'):
192            self.verify_attributes(
193                self.flavor_configs[self.flavor].get('requirements', set()),
194                debug=debug,
195            )
196
197        if wait:
198            from meerschaum.connectors.poll import retry_connect
199            retry_connect(connector=self, debug=debug)
200
201        if connect:
202            if not self.test_connection(debug=debug):
203                from meerschaum.utils.warnings import warn
204                warn(f"Failed to connect with connector '{self}'!", stack=False)
205
206    @property
207    def Session(self):
208        if '_Session' not in self.__dict__:
209            if self.engine is None:
210                return None
211
212            from meerschaum.utils.packages import attempt_import
213            sqlalchemy_orm = attempt_import('sqlalchemy.orm')
214            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
215            self._Session = sqlalchemy_orm.scoped_session(session_factory)
216
217        return self._Session
218
219    @property
220    def engine(self):
221        import os, threading
222        ### build the sqlalchemy engine
223        if '_engine' not in self.__dict__:
224            self._engine, self._engine_str = self.create_engine(include_uri=True)
225
226        same_process = os.getpid() == self._pid
227        same_thread = threading.current_thread().ident == self._thread_ident
228
229        ### handle child processes
230        if not same_process:
231            self._pid = os.getpid()
232            self._thread = threading.current_thread()
233            from meerschaum.utils.warnings import warn
234            warn(f"Different PID detected. Disposing of connections...")
235            self._engine.dispose()
236
237        ### handle different threads
238        if not same_thread:
239            pass
240
241        return self._engine
242
    @property
    def DATABASE_URL(self) -> str:
        """
        Return the URI connection string (alias for `SQLConnector.URI`).
        """
        ### Touching `engine` ensures `_engine_str` has been populated.
        _ = self.engine
        return str(self._engine_str)
250
251    @property
252    def URI(self) -> str:
253        """
254        Return the URI connection string.
255        """
256        _ = self.engine
257        return str(self._engine_str)
258
259    @property
260    def IS_THREAD_SAFE(self) -> str:
261        """
262        Return whether this connector may be multithreaded.
263        """
264        if self.flavor == 'duckdb':
265            return False
266        if self.flavor == 'sqlite':
267            return ':memory:' not in self.URI
268        return True
269
270
271    @property
272    def metadata(self):
273        """
274        Return the metadata bound to this configured schema.
275        """
276        from meerschaum.utils.packages import attempt_import
277        sqlalchemy = attempt_import('sqlalchemy')
278        if '_metadata' not in self.__dict__:
279            self._metadata = sqlalchemy.MetaData(schema=self.schema)
280        return self._metadata
281
282
    @property
    def instance_schema(self):
        """
        Return the schema name for Meerschaum tables.
        """
        ### Instance tables live in the connector's default schema.
        return self.schema
289
290
291    @property
292    def internal_schema(self):
293        """
294        Return the schema name for internal tables. 
295        """
296        from meerschaum.config.static import STATIC_CONFIG
297        from meerschaum.utils.packages import attempt_import
298        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
299        schema_name = self.__dict__.get('internal_schema', None) or (
300            STATIC_CONFIG['sql']['internal_schema']
301            if self.flavor not in NO_SCHEMA_FLAVORS
302            else self.schema
303        )
304
305        if '_internal_schema' not in self.__dict__:
306            self._internal_schema = schema_name
307        return self._internal_schema
308
309
310    @property
311    def db(self) -> Optional[databases.Database]:
312        from meerschaum.utils.packages import attempt_import
313        databases = attempt_import('databases', lazy=False, install=True)
314        url = self.DATABASE_URL
315        if 'mysql' in url:
316            url = url.replace('+pymysql', '')
317        if '_db' not in self.__dict__:
318            try:
319                self._db = databases.Database(url)
320            except KeyError:
321                ### Likely encountered an unsupported flavor.
322                from meerschaum.utils.warnings import warn
323                self._db = None
324        return self._db
325
326
327    @property
328    def db_version(self) -> Union[str, None]:
329        """
330        Return the database version.
331        """
332        _db_version = self.__dict__.get('_db_version', None)
333        if _db_version is not None:
334            return _db_version
335
336        from meerschaum.utils.sql import get_db_version
337        self._db_version = get_db_version(self)
338        return self._db_version
339
340
341    @property
342    def schema(self) -> Union[str, None]:
343        """
344        Return the default schema to use.
345        A value of `None` will not prepend a schema.
346        """
347        if 'schema' in self.__dict__:
348            return self.__dict__['schema']
349
350        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
351        if self.flavor in NO_SCHEMA_FLAVORS:
352            self.__dict__['schema'] = None
353            return None
354
355        sqlalchemy = mrsm.attempt_import('sqlalchemy')
356        _schema = sqlalchemy.inspect(self.engine).default_schema_name
357        self.__dict__['schema'] = _schema
358        return _schema
359
360
    def __getstate__(self):
        """Return the attributes dictionary for pickling."""
        return self.__dict__
363
    def __setstate__(self, d):
        """Restore pickled state by merging `d` into the attributes dictionary."""
        self.__dict__.update(d)
366
    def __call__(self):
        """Return the connector itself when called."""
        return self

Connect to SQL databases via sqlalchemy.

SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/

SQLConnector( label: Optional[str] = None, flavor: Optional[str] = None, wait: bool = False, connect: bool = False, debug: bool = False, **kw: Any)
    def __init__(
        self,
        label: Optional[str] = None,
        flavor: Optional[str] = None,
        wait: bool = False,
        connect: bool = False,
        debug: bool = False,
        **kw: Any
    ):
        """
        Build a `SQLConnector` from a label, flavor, URI, and/or attributes.

        Parameters
        ----------
        label: str, default 'main'
            The identifying label for the connector.
            E.g. for `sql:main`, 'main' is the label.
            Defaults to 'main'.

        flavor: Optional[str], default None
            The database flavor, e.g.
            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
            To see supported flavors, run the `bootstrap connectors` command.

        wait: bool, default False
            If `True`, block until a database connection has been made.
            Defaults to `False`.

        connect: bool, default False
            If `True`, immediately attempt to connect the database and raise
            a warning if the connection fails.
            Defaults to `False`.

        debug: bool, default False
            Verbosity toggle.
            Defaults to `False`.

        kw: Any
            All other arguments will be passed to the connector's attributes.
            Therefore, a connector may be made without being registered,
            as long as enough parameters are supplied to the constructor.
        """
        if 'uri' in kw:
            uri = kw['uri']
            ### Normalize legacy / vendor scheme prefixes to `postgresql://`.
            if uri.startswith('postgres://'):
                uri = uri.replace('postgres://', 'postgresql://', 1)
            if uri.startswith('timescaledb://'):
                uri = uri.replace('timescaledb://', 'postgresql://', 1)
                flavor = 'timescaledb'
            kw['uri'] = uri
            ### Parse the URI into individual attributes (host, port, etc.).
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            _ = from_uri_params.pop('label', None)

            ### Sometimes the flavor may be provided with a URI.
            kw.update(from_uri_params)
            if flavor:
                kw['flavor'] = flavor


        ### set __dict__ in base class
        super().__init__(
            'sql',
            label = label or self.__dict__.get('label', None),
            **kw
        )

        if self.__dict__.get('flavor', None) == 'sqlite':
            ### NOTE(review): presumably rebuilds attributes without inheriting
            ### defaults intended for server-based flavors — confirm intent.
            self._reset_attributes()
            self._set_attributes(
                'sql',
                label = label,
                inherit_default = False,
                **kw
            )
            ### For backwards compatibility reasons, set the path for sql:local if it's missing.
            if self.label == 'local' and not self.__dict__.get('database', None):
                from meerschaum.config._paths import SQLITE_DB_PATH
                self.database = str(SQLITE_DB_PATH)

        ### ensure flavor and label are set accordingly
        if 'flavor' not in self.__dict__:
            if flavor is None and 'uri' not in self.__dict__:
                raise Exception(
                    f"    Missing flavor. Provide flavor as a key for '{self}'."
                )
            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

        ### Normalize the deprecated 'postgres' alias.
        if self.flavor == 'postgres':
            self.flavor = 'postgresql'

        self._debug = debug
        ### Store the PID and thread at initialization
        ### so we can dispose of the Pool in child processes or threads.
        import os, threading
        self._pid = os.getpid()
        self._thread_ident = threading.current_thread().ident
        self._sessions = {}
        self._locks = {'_sessions': threading.RLock(), }

        ### verify the flavor's requirements are met
        if self.flavor not in self.flavor_configs:
            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
        if not self.__dict__.get('uri'):
            self.verify_attributes(
                self.flavor_configs[self.flavor].get('requirements', set()),
                debug=debug,
            )

        if wait:
            from meerschaum.connectors.poll import retry_connect
            retry_connect(connector=self, debug=debug)

        if connect:
            if not self.test_connection(debug=debug):
                from meerschaum.utils.warnings import warn
                warn(f"Failed to connect with connector '{self}'!", stack=False)
Parameters
  • label (str, default 'main'): The identifying label for the connector. E.g. for sql:main, 'main' is the label. Defaults to 'main'.
  • flavor (Optional[str], default None): The database flavor, e.g. 'sqlite', 'postgresql', 'cockroachdb', etc. To see supported flavors, run the bootstrap connectors command.
  • wait (bool, default False): If True, block until a database connection has been made. Defaults to False.
  • connect (bool, default False): If True, immediately attempt to connect the database and raise a warning if the connection fails. Defaults to False.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
  • kw (Any): All other arguments will be passed to the connector's attributes. Therefore, a connector may be made without being registered, as long as enough parameters are supplied to the constructor.
IS_INSTANCE: bool = True
Session
engine
DATABASE_URL: str

Return the URI connection string (alias for SQLConnector.URI).

URI: str

Return the URI connection string.

IS_THREAD_SAFE: str

Return whether this connector may be multithreaded.

metadata

Return the metadata bound to this configured schema.

instance_schema

Return the schema name for Meerschaum tables.

internal_schema

Return the schema name for internal tables.

db: 'Optional[databases.Database]'
db_version: Optional[str]

Return the database version.

schema: Optional[str]

Return the default schema to use. A value of None will not prepend a schema.

flavor_configs = {'timescaledb': {'engine': 'postgresql', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {}, 'requirements': {'host', 'database', 'password', 'username'}, 'defaults': {'port': 5432}}, 'postgresql': {'engine': 'postgresql', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {}, 'requirements': {'host', 'database', 'password', 'username'}, 'defaults': {'port': 5432}}, 'citus': {'engine': 'postgresql', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {}, 'requirements': {'host', 'database', 'password', 'username'}, 'defaults': {'port': 5432}}, 'mssql': {'engine': 'mssql+pyodbc', 'create_engine': {'fast_executemany': True, 'isolation_level': 'AUTOCOMMIT', 'use_setinputsizes': False}, 'omit_create_engine': {'method'}, 'to_sql': {'method': None}, 'requirements': {'host', 'database', 'password', 'username'}, 'defaults': {'port': 1433, 'options': {'driver': 'ODBC Driver 17 for SQL Server', 'UseFMTONLY': 'Yes'}}}, 'mysql': {'engine': 'mysql+pymysql', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {'method': 'multi'}, 'requirements': {'host', 'database', 'password', 'username'}, 'defaults': {'port': 3306}}, 'mariadb': {'engine': 'mysql+pymysql', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {'method': 'multi'}, 'requirements': {'host', 'database', 'password', 'username'}, 'defaults': {'port': 3306}}, 'oracle': {'engine': 'oracle+cx_oracle', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {'method': None}, 
'requirements': {'host', 'database', 'password', 'username'}, 'defaults': {'port': 1521}}, 'sqlite': {'engine': 'sqlite', 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'omit_create_engine': {'method'}, 'to_sql': {'method': 'multi'}, 'requirements': {'database'}, 'defaults': {}}, 'duckdb': {'engine': 'duckdb', 'create_engine': {}, 'omit_create_engine': {'ALL'}, 'to_sql': {'method': 'multi'}, 'requirements': '', 'defaults': {}}, 'cockroachdb': {'engine': 'cockroachdb', 'omit_create_engine': {'method'}, 'create_engine': {'pool_size': 5, 'max_overflow': 10, 'pool_recycle': 3600, 'connect_args': {}}, 'to_sql': {'method': 'multi'}, 'requirements': {'host'}, 'defaults': {'port': 26257, 'database': 'defaultdb', 'username': 'root', 'password': 'admin'}}}
def create_engine( self, include_uri: bool = False, debug: bool = False, **kw) -> 'sqlalchemy.engine.Engine':
def create_engine(
        self,
        include_uri: bool = False,
        debug: bool = False,
        **kw
    ) -> 'sqlalchemy.engine.Engine':
    """
    Create a sqlalchemy engine by building the engine string.

    Parameters
    ----------
    include_uri: bool, default False
        If `True`, return a tuple of the engine and the engine string.

    debug: bool, default False
        Verbosity toggle.

    kw: Any
        Additional keyword arguments to pass to `sqlalchemy.create_engine()`.

    Returns
    -------
    A `sqlalchemy` engine (or a tuple of the engine and the URI string if
    `include_uri` is `True`). The engine is `None` if creation fails.
    """
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import error, warn
    sqlalchemy = attempt_import('sqlalchemy')
    ### NOTE: import the submodule explicitly rather than relying on a
    ### third-party library having already imported `urllib.parse`.
    import urllib.parse
    import copy

    ### Verify the flavor is supported before any `flavor_configs` lookups.
    ### This must precede the defaults loop below, or an unsupported flavor
    ### would raise a KeyError instead of this friendly error.
    if self.flavor not in flavor_configs:
        error(f"Cannot create a connector with the flavor '{self.flavor}'.")

    ### Install and patch required drivers.
    if self.flavor in install_flavor_drivers:
        attempt_import(*install_flavor_drivers[self.flavor], debug=debug, lazy=False, warn=False)
    if self.flavor in require_patching_flavors:
        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
        import pathlib
        for install_name, import_name in require_patching_flavors[self.flavor]:
            pkg = attempt_import(
                import_name,
                debug = debug,
                lazy = False,
                warn = False
            )
            _monkey_patch_get_distribution(
                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
            )

    ### supplement missing values with defaults (e.g. port number)
    for a, value in flavor_configs[self.flavor]['defaults'].items():
        if a not in self.__dict__:
            self.__dict__[a] = value

    _engine = flavor_configs[self.flavor].get('engine', None)
    _username = self.__dict__.get('username', None)
    _password = self.__dict__.get('password', None)
    _host = self.__dict__.get('host', None)
    _port = self.__dict__.get('port', None)
    _database = self.__dict__.get('database', None)
    _options = self.__dict__.get('options', {})
    if isinstance(_options, str):
        _options = dict(urllib.parse.parse_qsl(_options))
    _uri = self.__dict__.get('uri', None)

    ### Handle registering specific dialects (due to installing in virtual environments).
    if self.flavor in flavor_dialects:
        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])

    ### self._sys_config was deepcopied and can be updated safely
    if self.flavor in ("sqlite", "duckdb"):
        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
        if 'create_engine' not in self._sys_config:
            self._sys_config['create_engine'] = {}
        if 'connect_args' not in self._sys_config['create_engine']:
            self._sys_config['create_engine']['connect_args'] = {}
        self._sys_config['create_engine']['connect_args'].update({"check_same_thread" : False})
    else:
        engine_str = (
            _engine + "://" + (_username if _username is not None else '') +
            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
            (("/" + _database) if _database is not None else '')
            + (("?" + urllib.parse.urlencode(_options)) if _options else '')
        ) if not _uri else _uri

        ### Sometimes the timescaledb:// flavor can slip in.
        if _uri and self.flavor in ('timescaledb',) and self.flavor in _uri:
            engine_str = engine_str.replace(f'{self.flavor}://', 'postgresql://')

    if debug:
        ### Mask the password when printing the engine string.
        dprint(
            (
                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
                    if _password is not None else engine_str
            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
        )

    _kw_copy = copy.deepcopy(kw)

    ### NOTE: Order of inheritance:
    ###       1. Defaults
    ###       2. System configuration
    ###       3. Connector configuration
    ###       4. Keyword arguments
    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
    def _apply_create_engine_args(update):
        """Merge `update` into the engine args, honoring `omit_create_engine`."""
        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
            _create_engine_args.update(
                { k: v for k, v in update.items()
                    if 'omit_create_engine' not in flavor_configs[self.flavor]
                        or k not in flavor_configs[self.flavor].get('omit_create_engine')
                }
            )
    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
    _apply_create_engine_args(_kw_copy)

    try:
        engine = sqlalchemy.create_engine(
            engine_str,
            ### Resolve the pool class dynamically: the configured string
            ### (e.g. 'sqlalchemy.pool.QueuePool') is split into the module
            ### path and the class name.
            poolclass    = getattr(
                attempt_import(
                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
                ),
                self._sys_config['poolclass'].split('.')[-1]
            ),
            echo         = debug,
            **_create_engine_args
        )
    except Exception as e:
        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
        engine = None

    if include_uri:
        return engine, engine_str
    return engine

Create a sqlalchemy engine by building the engine string.

def read( self, query_or_table: 'Union[str, sqlalchemy.Query]', params: Union[Dict[str, Any], List[str], NoneType] = None, dtype: Optional[Dict[str, Any]] = None, coerce_float: bool = True, chunksize: Optional[int] = -1, workers: Optional[int] = None, chunk_hook: Optional[Callable[[pandas.core.frame.DataFrame], Any]] = None, as_hook_results: bool = False, chunks: Optional[int] = None, schema: Optional[str] = None, as_chunks: bool = False, as_iterator: bool = False, as_dask: bool = False, index_col: Optional[str] = None, silent: bool = False, debug: bool = False, **kw: Any) -> 'Union[pandas.DataFrame, dask.DataFrame, List[pandas.DataFrame], List[Any], None]':
 24def read(
 25        self,
 26        query_or_table: Union[str, sqlalchemy.Query],
 27        params: Union[Dict[str, Any], List[str], None] = None,
 28        dtype: Optional[Dict[str, Any]] = None,
 29        coerce_float: bool = True,
 30        chunksize: Optional[int] = -1,
 31        workers: Optional[int] = None,
 32        chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
 33        as_hook_results: bool = False,
 34        chunks: Optional[int] = None,
 35        schema: Optional[str] = None,
 36        as_chunks: bool = False,
 37        as_iterator: bool = False,
 38        as_dask: bool = False,
 39        index_col: Optional[str] = None,
 40        silent: bool = False,
 41        debug: bool = False,
 42        **kw: Any
 43    ) -> Union[
 44        pandas.DataFrame,
 45        dask.DataFrame,
 46        List[pandas.DataFrame],
 47        List[Any],
 48        None,
 49    ]:
 50    """
 51    Read a SQL query or table into a pandas dataframe.
 52
 53    Parameters
 54    ----------
 55    query_or_table: Union[str, sqlalchemy.Query]
 56        The SQL query (sqlalchemy Query or string) or name of the table from which to select.
 57
 58    params: Optional[Dict[str, Any]], default None
 59        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
 60        See the pandas documentation for more information:
 61        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
 62
 63    dtype: Optional[Dict[str, Any]], default None
 64        A dictionary of data types to pass to `pandas.read_sql()`.
 65        See the pandas documentation for more information:
 66        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
 67
 68    chunksize: Optional[int], default -1
 69        How many chunks to read at a time. `None` will read everything in one large chunk.
 70        Defaults to system configuration.
 71
 72        **NOTE:** DuckDB does not allow for chunking.
 73
 74    workers: Optional[int], default None
 75        How many threads to use when consuming the generator.
 76        Only applies if `chunk_hook` is provided.
 77
 78    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
 79        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
 80        See `--sync-chunks` for an example.
 81        **NOTE:** `as_iterator` MUST be False (default).
 82
 83    as_hook_results: bool, default False
 84        If `True`, return a `List` of the outputs of the hook function.
 85        Only applicable if `chunk_hook` is not None.
 86
 87        **NOTE:** `as_iterator` MUST be `False` (default).
 88
 89    chunks: Optional[int], default None
 90        Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
 91        return into a single dataframe.
 92        For example, to limit the returned dataframe to 100,000 rows,
 93        you could specify a `chunksize` of `1000` and `chunks` of `100`.
 94
 95    schema: Optional[str], default None
 96        If just a table name is provided, optionally specify the table schema.
 97        Defaults to `SQLConnector.schema`.
 98
 99    as_chunks: bool, default False
100        If `True`, return a list of DataFrames. 
101        Otherwise return a single DataFrame.
102
103    as_iterator: bool, default False
104        If `True`, return the pandas DataFrame iterator.
105        `chunksize` must not be `None` (falls back to 1000 if so),
106        and hooks are not called in this case.
107
108    index_col: Optional[str], default None
109        If using Dask, use this column as the index column.
110        If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
111
112    silent: bool, default False
113        If `True`, don't raise warnings in case of errors.
114        Defaults to `False`.
115
116    Returns
117    -------
118    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
119    or `None` if something breaks.
120
121    """
122    if chunks is not None and chunks <= 0:
123        return []
124    from meerschaum.utils.sql import sql_item_name, truncate_item_name
125    from meerschaum.utils.dtypes.sql import NUMERIC_PRECISION_FLAVORS
126    from meerschaum.utils.packages import attempt_import, import_pandas
127    from meerschaum.utils.pool import get_pool
128    from meerschaum.utils.dataframe import chunksize_to_npartitions, get_numeric_cols
129    import warnings
130    import inspect
131    import traceback
132    from decimal import Decimal
133    pd = import_pandas()
134    dd = None
135    is_dask = 'dask' in pd.__name__
136    pd = attempt_import('pandas')
137    is_dask = dd is not None
138    npartitions = chunksize_to_npartitions(chunksize)
139    if is_dask:
140        chunksize = None
141    schema = schema or self.schema
142
143    sqlalchemy = attempt_import("sqlalchemy")
144    default_chunksize = self._sys_config.get('chunksize', None)
145    chunksize = chunksize if chunksize != -1 else default_chunksize
146    if chunksize is None and as_iterator:
147        if not silent and self.flavor not in _disallow_chunks_flavors:
148            warn(
149                f"An iterator may only be generated if chunksize is not None.\n"
150                + "Falling back to a chunksize of 1000.", stacklevel=3,
151            )
152        chunksize = 1000
153    if chunksize is not None and self.flavor in _max_chunks_flavors:
154        if chunksize > _max_chunks_flavors[self.flavor]:
155            if chunksize != default_chunksize:
156                warn(
157                    f"The specified chunksize of {chunksize} exceeds the maximum of "
158                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
159                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
160                    stacklevel = 3,
161                )
162            chunksize = _max_chunks_flavors[self.flavor]
163
164    ### NOTE: A bug in duckdb_engine does not allow for chunks.
165    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
166        chunksize = None
167
168    if debug:
169        import time
170        start = time.perf_counter()
171        dprint(f"[{self}]\n{query_or_table}")
172        dprint(f"[{self}] Fetching with chunksize: {chunksize}")
173
174    ### This might be sqlalchemy object or the string of a table name.
175    ### We check for spaces and quotes to see if it might be a weird table.
176    if (
177        ' ' not in str(query_or_table)
178        or (
179            ' ' in str(query_or_table)
180            and str(query_or_table).startswith('"')
181            and str(query_or_table).endswith('"')
182        )
183    ):
184        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
185        if truncated_table_name != str(query_or_table) and not silent:
186            warn(
187                f"Table '{name}' is too long for '{self.flavor}',"
188                + f" will instead create the table '{truncated_name}'."
189            )
190
191        query_or_table = sql_item_name(str(query_or_table), self.flavor, schema)
192        if debug:
193            dprint(f"[{self}] Reading from table {query_or_table}")
194        formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table))
195        str_query = f"SELECT * FROM {query_or_table}"
196    else:
197        str_query = query_or_table
198
199    formatted_query = (
200        sqlalchemy.text(str_query)
201        if not is_dask and isinstance(str_query, str)
202        else format_sql_query_for_dask(str_query)
203    )
204
205    chunk_list = []
206    chunk_hook_results = []
207    try:
208        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
209        with warnings.catch_warnings():
210            warnings.filterwarnings('ignore', 'case sensitivity issues')
211
212            read_sql_query_kwargs = {
213                'params': params,
214                'dtype': dtype,
215                'coerce_float': coerce_float,
216                'index_col': index_col,
217            }
218            if is_dask:
219                if index_col is None:
220                    dd = None
221                    pd = attempt_import('pandas')
222                    read_sql_query_kwargs.update({
223                        'chunksize': chunksize,
224                    })
225            else:
226                read_sql_query_kwargs.update({
227                    'chunksize': chunksize,
228                })
229
230            if is_dask and dd is not None:
231                ddf = dd.read_sql_query(
232                    formatted_query,
233                    self.URI,
234                    **read_sql_query_kwargs
235                )
236            else:
237
238                with self.engine.begin() as transaction:
239                    with transaction.execution_options(stream_results=stream_results) as connection:
240                        chunk_generator = pd.read_sql_query(
241                            formatted_query,
242                            connection,
243                            **read_sql_query_kwargs
244                        )
245
246                        ### `stream_results` must be False (will load everything into memory).
247                        if as_iterator or chunksize is None:
248                            return chunk_generator
249
250                        ### We must consume the generator in this context if using server-side cursors.
251                        if stream_results:
252
253                            pool = get_pool(workers=workers)
254
255                            def _process_chunk(_chunk, _retry_on_failure: bool = True):
256                                if not as_hook_results:
257                                    chunk_list.append(_chunk)
258                                result = None
259                                if chunk_hook is not None:
260                                    try:
261                                        result = chunk_hook(
262                                            _chunk,
263                                            workers = workers,
264                                            chunksize = chunksize,
265                                            debug = debug,
266                                            **kw
267                                        )
268                                    except Exception as e:
269                                        result = False, traceback.format_exc()
270                                        from meerschaum.utils.formatting import get_console
271                                        if not silent:
272                                            get_console().print_exception()
273
274                                    ### If the chunk fails to process, try it again one more time.
275                                    if isinstance(result, tuple) and result[0] is False:
276                                        if _retry_on_failure:
277                                            return _process_chunk(_chunk, _retry_on_failure=False)
278
279                                return result
280
281                            chunk_hook_results = list(pool.imap(_process_chunk, chunk_generator))
282                            if as_hook_results:
283                                return chunk_hook_results
284
285    except Exception as e:
286        if debug:
287            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
288        if not silent:
289            warn(str(e), stacklevel=3)
290        from meerschaum.utils.formatting import get_console
291        if not silent:
292            get_console().print_exception()
293
294        return None
295
296    if is_dask and dd is not None:
297        ddf = ddf.reset_index()
298        return ddf
299
300    chunk_list = []
301    read_chunks = 0
302    chunk_hook_results = []
303    if chunksize is None:
304        chunk_list.append(chunk_generator)
305    elif as_iterator:
306        return chunk_generator
307    else:
308        try:
309            for chunk in chunk_generator:
310                if chunk_hook is not None:
311                    chunk_hook_results.append(
312                        chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
313                    )
314                chunk_list.append(chunk)
315                read_chunks += 1
316                if chunks is not None and read_chunks >= chunks:
317                    break
318        except Exception as e:
319            warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
320            from meerschaum.utils.formatting import get_console
321            if not silent:
322                get_console().print_exception()
323
324    read_chunks = 0
325    try:
326        for chunk in chunk_generator:
327            if chunk_hook is not None:
328                chunk_hook_results.append(
329                    chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
330                )
331            chunk_list.append(chunk)
332            read_chunks += 1
333            if chunks is not None and read_chunks >= chunks:
334                break
335    except Exception as e:
336        warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
337        from meerschaum.utils.formatting import get_console
338        if not silent:
339            get_console().print_exception()
340
341        return None
342
343    ### If no chunks returned, read without chunks
344    ### to get columns
345    if len(chunk_list) == 0:
346        with warnings.catch_warnings():
347            warnings.filterwarnings('ignore', 'case sensitivity issues')
348            _ = read_sql_query_kwargs.pop('chunksize', None)
349            with self.engine.begin() as connection:
350                chunk_list.append(
351                    pd.read_sql_query(
352                        formatted_query,
353                        connection,
354                        **read_sql_query_kwargs
355                    )
356                )
357
358    ### call the hook on any missed chunks.
359    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
360        for c in chunk_list[len(chunk_hook_results):]:
361            chunk_hook_results.append(
362                chunk_hook(c, chunksize=chunksize, debug=debug, **kw)
363            )
364
365    ### chunksize is not None so must iterate
366    if debug:
367        end = time.perf_counter()
368        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")
369
370    if as_hook_results:
371        return chunk_hook_results
372    
373    ### Skip `pd.concat()` if `as_chunks` is specified.
374    if as_chunks:
375        for c in chunk_list:
376            c.reset_index(drop=True, inplace=True)
377            for col in get_numeric_cols(c):
378                c[col] = c[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
379        return chunk_list
380
381    df = pd.concat(chunk_list).reset_index(drop=True)
382    ### NOTE: The calls to `canonical()` are to drop leading and trailing zeroes.
383    for col in get_numeric_cols(df):
384        df[col] = df[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
385
386    return df

Read a SQL query or table into a pandas dataframe.

Parameters
  • query_or_table (Union[str, sqlalchemy.Query]): The SQL query (sqlalchemy Query or string) or name of the table from which to select.
  • params (Optional[Dict[str, Any]], default None): List or Dict of parameters to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
  • dtype (Optional[Dict[str, Any]], default None): A dictionary of data types to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
  • chunksize (Optional[int], default -1): How many rows to read per chunk at a time. None will read everything in one large chunk. Defaults to system configuration.

    NOTE: DuckDB does not allow for chunking.

  • workers (Optional[int], default None): How many threads to use when consuming the generator. Only applies if chunk_hook is provided.
  • chunk_hook (Optional[Callable[[pandas.DataFrame], Any]], default None): Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. See --sync-chunks for an example. NOTE: as_iterator MUST be False (default).
  • as_hook_results (bool, default False): If True, return a List of the outputs of the hook function. Only applicable if chunk_hook is not None.

    NOTE: as_iterator MUST be False (default).

  • chunks (Optional[int], default None): Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and return into a single dataframe. For example, to limit the returned dataframe to 100,000 rows, you could specify a chunksize of 1000 and chunks of 100.
  • schema (Optional[str], default None): If just a table name is provided, optionally specify the table schema. Defaults to SQLConnector.schema.
  • as_chunks (bool, default False): If True, return a list of DataFrames. Otherwise return a single DataFrame.
  • as_iterator (bool, default False): If True, return the pandas DataFrame iterator. chunksize must not be None (falls back to 1000 if so), and hooks are not called in this case.
  • index_col (Optional[str], default None): If using Dask, use this column as the index column. If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
  • silent (bool, default False): If True, don't raise warnings in case of errors. Defaults to False.
Returns
  • A pd.DataFrame (default case), or an iterator, or a list of dataframes / iterators,
  • or None if something breaks.
def value(self, query: str, *args: Any, use_pandas: bool = False, **kw: Any) -> Any:
def value(
        self,
        query: str,
        *args: Any,
        use_pandas: bool = False,
        **kw: Any
    ) -> Any:
    """
    Execute the provided query and return the first value.

    Parameters
    ----------
    query: str
        The SQL query to execute.

    *args: Any
        The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
        if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.

    use_pandas: bool, default False
        If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use
        `meerschaum.connectors.sql.SQLConnector.exec` (default).
        **NOTE:** This is always `True` for DuckDB.

    **kw: Any
        See `args`.
        `close` and `commit` are consumed here and forwarded explicitly to `exec()`.

    Returns
    -------
    Any value returned from the query, or `None` on failure.
    """
    ### DuckDB only works through the pandas path.
    if self.flavor == 'duckdb':
        use_pandas = True
    if use_pandas:
        ### Best-effort: an empty result set or failed read yields `None`.
        try:
            return self.read(query, *args, **kw).iloc[0, 0]
        except Exception:
            return None

    ### Pop these keys so they aren't passed twice to `exec()` below.
    ### (Using `kw.get()` here used to raise a duplicate-keyword TypeError
    ### whenever a caller passed `close=` or `commit=` explicitly.)
    _close = kw.pop('close', True)
    _commit = kw.pop('commit', (self.flavor != 'mssql'))
    try:
        result, connection = self.exec(
            query,
            *args,
            with_connection = True,
            close = False,
            commit = _commit,
            **kw
        )
        first = result.first() if result is not None else None
        _val = first[0] if first is not None else None
    except Exception as e:
        warn(str(e), stacklevel=3)
        return None
    ### We requested `close=False` above, so close the connection ourselves.
    if _close:
        try:
            connection.close()
        except Exception as e:
            warn("Failed to close connection with exception:\n" + str(e))
    return _val

Execute the provided query and return the first value.

Parameters
  • query (str): The SQL query to execute.
  • *args (Any): The arguments passed to meerschaum.connectors.sql.SQLConnector.exec if use_pandas is False (default) or to meerschaum.connectors.sql.SQLConnector.read.
  • use_pandas (bool, default False): If True, use meerschaum.connectors.SQLConnector.read, otherwise use meerschaum.connectors.sql.SQLConnector.exec (default). NOTE: This is always True for DuckDB.
  • **kw (Any): See args.
Returns
  • Any value returned from the query.
def exec( self, query: str, *args: Any, silent: bool = False, debug: bool = False, commit: Optional[bool] = None, close: Optional[bool] = None, with_connection: bool = False, **kw: Any) -> 'Union[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.cursor.LegacyCursorResult, Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection], Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection], None]':
def exec(
        self,
        query: str,
        *args: Any,
        silent: bool = False,
        debug: bool = False,
        commit: Optional[bool] = None,
        close: Optional[bool] = None,
        with_connection: bool = False,
        **kw: Any
    ) -> Union[
            sqlalchemy.engine.result.resultProxy,
            sqlalchemy.engine.cursor.LegacyCursorResult,
            Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
            Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
            None
    ]:
    """
    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.

    If inserting data, please use bind variables to avoid SQL injection!

    Parameters
    ----------
    query: Union[str, List[str], Tuple[str]]
        The query to execute.
        If `query` is a list or tuple, call `self.exec_queries()` instead.

    args: Any
        Arguments passed to `sqlalchemy.engine.execute`.

    silent: bool, default False
        If `True`, suppress warnings.

    debug: bool, default False
        Verbosity toggle.

    commit: Optional[bool], default None
        If `True`, commit the changes after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    close: Optional[bool], default None
        If `True`, close the connection after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    with_connection: bool, default False
        If `True`, return a tuple including the connection object.
        This does not apply if `query` is a list of strings.

    Returns
    -------
    The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
    Returns `None` (or `(None, connection)`) when execution fails.
    """
    ### Lists / tuples of queries are delegated to `exec_queries()`.
    ### Note that `commit`, `close`, and `with_connection` are NOT forwarded there.
    if isinstance(query, (list, tuple)):
        return self.exec_queries(
            list(query),
            *args,
            silent = silent,
            debug = debug,
            **kw
        )

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import("sqlalchemy")
    if debug:
        dprint(f"[{self}] Executing query:\n{query}")

    ### Defaults: close and commit unless the flavor is MSSQL;
    ### for MSSQL, only commit when 'select' does not appear in the query text.
    _close = close if close is not None else (self.flavor != 'mssql')
    _commit = commit if commit is not None else (
        (self.flavor != 'mssql' or 'select' not in str(query).lower())
    )

    ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+).
    if not hasattr(query, 'compile'):
        query = sqlalchemy.text(query)

    connection = self.engine.connect()
    ### Only open an explicit transaction when we intend to commit.
    transaction = connection.begin() if _commit else None
    try:
        result = connection.execute(query, *args, **kw)
        if _commit:
            transaction.commit()
    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{e}")
        if not silent:
            warn(str(e), stacklevel=3)
        ### `None` signals failure to callers; roll back the open transaction.
        result = None
        if _commit:
            transaction.rollback()
    finally:
        if _close:
            connection.close()

        ### NOTE(review): returning from `finally` would swallow an in-flight
        ### exception, but the broad `except` above already catches everything.
        ### Also, when `_close` is `True` the returned connection has already
        ### been closed — callers wanting a usable connection should pass
        ### `close=False`.
        if with_connection:
            return result, connection

    return result

Execute SQL code and return the sqlalchemy result, e.g. when calling stored procedures.

If inserting data, please use bind variables to avoid SQL injection!

Parameters
  • query (Union[str, List[str], Tuple[str]]): The query to execute. If query is a list or tuple, call self.exec_queries() instead.
  • args (Any): Arguments passed to sqlalchemy.engine.execute.
  • silent (bool, default False): If True, suppress warnings.
  • commit (Optional[bool], default None): If True, commit the changes after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
  • close (Optional[bool], default None): If True, close the connection after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
  • with_connection (bool, default False): If True, return a tuple including the connection object. This does not apply if query is a list of strings.
Returns
  • The sqlalchemy result object, or a tuple with the connection if with_connection is provided.
def execute( self, *args: Any, **kw: Any) -> 'Optional[sqlalchemy.engine.result.resultProxy]':
def execute(
        self,
        *args : Any,
        **kw : Any
    ) -> Optional[sqlalchemy.engine.result.resultProxy]:
    """
    Convenience alias: delegates directly to
    `meerschaum.connectors.sql.SQLConnector.exec`.
    """
    exec_result = self.exec(*args, **kw)
    return exec_result

An alias for meerschaum.connectors.sql.SQLConnector.exec.

def to_sql( self, df: pandas.core.frame.DataFrame, name: str = None, index: bool = False, if_exists: str = 'replace', method: str = '', chunksize: Optional[int] = -1, schema: Optional[str] = None, silent: bool = False, debug: bool = False, as_tuple: bool = False, as_dict: bool = False, **kw) -> Union[bool, Tuple[bool, str]]:
def to_sql(
        self,
        df: pandas.DataFrame,
        name: Optional[str] = None,
        index: bool = False,
        if_exists: str = 'replace',
        method: str = "",
        chunksize: Optional[int] = -1,
        schema: Optional[str] = None,
        silent: bool = False,
        debug: bool = False,
        as_tuple: bool = False,
        as_dict: bool = False,
        **kw: Any
    ) -> Union[bool, SuccessTuple]:
    """
    Upload a DataFrame's contents to the SQL server.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be uploaded.

    name: Optional[str], default None
        The name of the table to be created.
        Required in practice; an `error()` is raised when omitted.

    index: bool, default False
        If True, creates the DataFrame's indices as columns.

    if_exists: str, default 'replace'
        Drop and create the table ('replace') or append if it exists
        ('append') or raise Exception ('fail').
        Options are ['replace', 'append', 'fail'].

    method: str, default ''
        None or multi. Details on pandas.to_sql.
        An empty string falls back to the flavor's configured default.

    chunksize: Optional[int], default -1
        How many rows to insert at a time.
        `-1` falls back to the system configuration.

    schema: Optional[str], default None
        Optionally override the schema for the table.
        Defaults to `SQLConnector.schema`.

    silent: bool, default False
        If `True`, suppress the warning emitted when the insert fails.

    debug: bool, default False
        Verbosity toggle.

    as_tuple: bool, default False
        If `True`, return a (success_bool, message) tuple instead of a `bool`.
        Defaults to `False`.

    as_dict: bool, default False
        If `True`, return a dictionary of transaction information.
        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
        `method`, and `target`.

    kw: Any
        Additional arguments will be passed to the DataFrame's `to_sql` function

    Returns
    -------
    Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
    """
    import time
    import json
    import decimal
    from decimal import Decimal, Context
    from meerschaum.utils.warnings import error, warn
    import warnings
    import functools
    if name is None:
        error(f"Name must not be `None` to insert data into {self}.")

    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
    kw.pop('name', None)

    schema = schema or self.schema

    from meerschaum.utils.sql import (
        sql_item_name,
        table_exists,
        json_flavors,
        truncate_item_name,
    )
    from meerschaum.utils.dataframe import get_json_cols, get_numeric_cols
    from meerschaum.utils.dtypes import are_dtypes_equal, quantize_decimal
    from meerschaum.utils.dtypes.sql import NUMERIC_PRECISION_FLAVORS
    from meerschaum.connectors.sql._create_engine import flavor_configs
    from meerschaum.utils.packages import attempt_import, import_pandas
    sqlalchemy = attempt_import('sqlalchemy', debug=debug)
    pd = import_pandas()
    ### Dask DataFrames need a URI (not an engine) and support parallel writes.
    is_dask = 'dask' in df.__module__

    stats = {'target': name, }
    ### resort to defaults if None
    if method == "":
        if self.flavor in _bulk_flavors:
            ### NOTE(review): `psql_insert_copy` appears to be a module-level
            ### COPY-based bulk-insert hook for flavors in `_bulk_flavors` —
            ### confirm at its definition.
            method = functools.partial(psql_insert_copy, schema=self.schema)
        else:
            ### Should resolve to 'multi' or `None`.
            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')
    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)

    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    ### Some flavors cap the rows-per-insert; clamp, and warn only when the
    ### user explicitly requested more than the flavor allows.
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel = 3,
                )
            chunksize = _max_chunks_flavors[self.flavor]
    stats['chunksize'] = chunksize

    success, msg = False, "Default to_sql message"
    start = time.perf_counter()
    if debug:
        msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..."
        print(msg, end="", flush=True)
    stats['num_rows'] = len(df)

    ### Check if the name is too long.
    truncated_name = truncate_item_name(name, self.flavor)
    if name != truncated_name:
        warn(
            f"Table '{name}' is too long for '{self.flavor}',"
            + f" will instead create the table '{truncated_name}'."
        )

    ### filter out non-pandas args
    import inspect
    to_sql_params = inspect.signature(df.to_sql).parameters
    to_sql_kw = {}
    for k, v in kw.items():
        if k in to_sql_params:
            to_sql_kw[k] = v

    to_sql_kw.update({
        'name': truncated_name,
        'schema': schema,
        ### Dask's `to_sql` takes a URI string where pandas takes an engine.
        ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI),
        'index': index,
        'if_exists': if_exists,
        'method': method,
        'chunksize': chunksize,
    })
    if is_dask:
        to_sql_kw.update({
            'parallel': True,
        })

    if self.flavor == 'oracle':
        ### For some reason 'replace' doesn't work properly in pandas,
        ### so try dropping first.
        if if_exists == 'replace' and table_exists(name, self, schema=schema, debug=debug):
            success = self.exec(
                "DROP TABLE " + sql_item_name(name, 'oracle', schema)
            ) is not None
            if not success:
                warn(f"Unable to drop {name}")


        ### Enforce NVARCHAR(2000) as text instead of CLOB.
        dtype = to_sql_kw.get('dtype', {})
        for col, typ in df.dtypes.items():
            if are_dtypes_equal(str(typ), 'object'):
                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
            elif are_dtypes_equal(str(typ), 'int'):
                dtype[col] = sqlalchemy.types.INTEGER
        to_sql_kw['dtype'] = dtype
    elif self.flavor == 'mssql':
        ### Store booleans as integers for MSSQL.
        dtype = to_sql_kw.get('dtype', {})
        for col, typ in df.dtypes.items():
            if are_dtypes_equal(str(typ), 'bool'):
                dtype[col] = sqlalchemy.types.INTEGER
        to_sql_kw['dtype'] = dtype

    ### Check for JSON columns.
    if self.flavor not in json_flavors:
        json_cols = get_json_cols(df)
        if json_cols:
            for col in json_cols:
                ### Serialize unhashable values (dicts, lists) to JSON strings
                ### for flavors without a native JSON type.
                ### NOTE: this mutates `df` in place.
                df[col] = df[col].apply(
                    (
                        lambda x: json.dumps(x, default=str, sort_keys=True)
                        if not isinstance(x, Hashable)
                        else x
                    )
                )

    ### Check for numeric columns.
    ### NOTE(review): the unpack order here is (scale, precision) — verify this
    ### matches the tuple order declared in `NUMERIC_PRECISION_FLAVORS`.
    numeric_scale, numeric_precision = NUMERIC_PRECISION_FLAVORS.get(self.flavor, (None, None))
    if numeric_precision is not None and numeric_scale is not None:
        numeric_cols = get_numeric_cols(df)
        for col in numeric_cols:
            ### Quantize `Decimal` values to the flavor's supported precision/scale.
            df[col] = df[col].apply(
                lambda x: (
                    quantize_decimal(x, numeric_scale, numeric_precision)
                    if isinstance(x, Decimal)
                    else x
                )
            )

    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            df.to_sql(**to_sql_kw)
        success = True
    except Exception as e:
        if not silent:
            warn(str(e))
        success, msg = False, str(e)

    end = time.perf_counter()
    if success:
        msg = f"It took {round(end - start, 2)} seconds to sync {len(df)} rows to {name}."
    stats['start'] = start
    stats['end'] = end
    stats['duration'] = end - start

    if debug:
        print(f" done.", flush=True)
        dprint(msg)

    stats['success'] = success
    stats['msg'] = msg
    if as_tuple:
        return success, msg
    if as_dict:
        return stats
    return success

Upload a DataFrame's contents to the SQL server.

Parameters
  • df (pd.DataFrame): The DataFrame to be uploaded.
  • name (str): The name of the table to be created.
  • index (bool, default False): If True, creates the DataFrame's indices as columns.
  • if_exists (str, default 'replace'): Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail'].
  • method (str, default ''): None or multi. Details on pandas.to_sql.
  • chunksize (Optional[int], default -1): How many rows to insert at a time.
  • schema (Optional[str], default None): Optionally override the schema for the table. Defaults to SQLConnector.schema.
  • as_tuple (bool, default False): If True, return a (success_bool, message) tuple instead of a bool. Defaults to False.
  • as_dict (bool, default False): If True, return a dictionary of transaction information. The keys are success, msg, start, end, duration, num_rows, chunksize, method, and target.
  • kw (Any): Additional arguments will be passed to the DataFrame's to_sql function
Returns
  • Either a bool or a SuccessTuple (depends on as_tuple).
def exec_queries( self, queries: "List[Union[str, Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]]]", break_on_error: bool = False, rollback: bool = True, silent: bool = False, debug: bool = False) -> 'List[sqlalchemy.engine.cursor.LegacyCursorResult]':
def exec_queries(
        self,
        queries: List[
            Union[
                str,
                Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]
            ]
        ],
        break_on_error: bool = False,
        rollback: bool = True,
        silent: bool = False,
        debug: bool = False,
    ) -> List[sqlalchemy.engine.cursor.LegacyCursorResult]:
    """
    Execute a list of queries in a single transaction.

    Parameters
    ----------
    queries: List[
        Union[
            str,
            Tuple[str, Callable[[], List[str]]]
        ]
    ]
        The queries in the transaction to be executed.
        If a query is a tuple, the second item of the tuple
        will be considered a callable hook that returns a list of queries to be executed
        before the next item in the list.

    break_on_error: bool, default False
        If `True`, stop executing when a query fails.

    rollback: bool, default True
        If `break_on_error` is `True`, rollback the transaction if a query fails.

    silent: bool, default False
        If `True`, suppress warnings.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of SQLAlchemy results.
    Failed queries yield `None` entries.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    sqlalchemy, sqlalchemy_orm = attempt_import('sqlalchemy', 'sqlalchemy.orm')
    session = sqlalchemy_orm.Session(self.engine)

    result = None
    results = []
    ### `session.begin()` commits on clean exit and rolls back on an exception.
    with session.begin():
        for query in queries:
            hook = None
            result = None

            ### A tuple pairs a query with a post-execution hook.
            if isinstance(query, tuple):
                query, hook = query
            if isinstance(query, str):
                query = sqlalchemy.text(query)

            if debug:
                dprint(f"[{self}]\n" + str(query))

            try:
                result = session.execute(query)
                ### Flush so later queries in the transaction see these changes.
                session.flush()
            except Exception as e:
                msg = (f"Encountered error while executing:\n{e}")
                if not silent:
                    warn(msg)
                elif debug:
                    dprint(f"[{self}]\n" + str(msg))
                result = None
            ### `None` marks a failed query; optionally abort (and roll back) here.
            if result is None and break_on_error:
                if rollback:
                    session.rollback()
                break
            elif result is not None and hook is not None:
                ### Hooks may generate follow-up queries, executed recursively in
                ### their own transaction; on success the stored entry becomes a
                ### `(result, hook_results)` pair.
                hook_queries = hook(session)
                if hook_queries:
                    hook_results = self.exec_queries(
                        hook_queries,
                        break_on_error = break_on_error,
                        rollback = rollback,
                        silent = silent,
                        debug = debug,
                    )
                    result = (result, hook_results)

            results.append(result)

    return results

Execute a list of queries in a single transaction.

Parameters
  • queries (List[Union[str, Tuple[str, Callable[[], List[str]]]]]): The queries in the transaction to be executed. If a query is a tuple, the second item of the tuple will be considered a callable hook that returns a list of queries to be executed before the next item in the list.
  • break_on_error (bool, default False): If True, stop executing when a query fails.
  • rollback (bool, default True): If break_on_error is True, rollback the transaction if a query fails.
  • silent (bool, default False): If True, suppress warnings.
Returns
  • A list of SQLAlchemy results.
def test_connection(self, **kw: Any) -> Optional[bool]:
414def test_connection(
415        self,
416        **kw: Any
417    ) -> Union[bool, None]:
418    """
419    Test if a successful connection to the database may be made.
420
421    Parameters
422    ----------
423    **kw:
424        The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.
425
426    Returns
427    -------
428    `True` if a connection is made, otherwise `False` or `None` in case of failure.
429
430    """
431    import warnings
432    from meerschaum.connectors.poll import retry_connect
433    _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self}
434    _default_kw.update(kw)
435    with warnings.catch_warnings():
436        warnings.filterwarnings('ignore', 'Could not')
437        try:
438            return retry_connect(**_default_kw)
439        except Exception as e:
440            return False

Test if a successful connection to the database may be made.

Parameters
  • **kw (Any): The keyword arguments are passed to meerschaum.connectors.poll.retry_connect.
Returns
  • True if a connection is made, otherwise False or None in case of failure.
def fetch( self, pipe: meerschaum.core.Pipe.Pipe, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, str, NoneType] = None, check_existing: bool = True, chunk_hook: 'Optional[Callable[[pd.DataFrame], Any]]' = None, chunksize: Optional[int] = -1, workers: Optional[int] = None, debug: bool = False, **kw: Any) -> "Union['pd.DataFrame', List[Any], None]":
 15def fetch(
 16        self,
 17        pipe: meerschaum.Pipe,
 18        begin: Union[datetime, int, str, None] = '',
 19        end: Union[datetime, int, str, None] = None,
 20        check_existing: bool = True,
 21        chunk_hook: Optional[Callable[[pd.DataFrame], Any]] = None,
 22        chunksize: Optional[int] = -1,
 23        workers: Optional[int] = None,
 24        debug: bool = False,
 25        **kw: Any
 26    ) -> Union['pd.DataFrame', List[Any], None]:
 27    """Execute the SQL definition and return a Pandas DataFrame.
 28
 29    Parameters
 30    ----------
 31    pipe: meerschaum.Pipe
 32        The pipe object which contains the `fetch` metadata.
 33        
 34        - pipe.columns['datetime']: str
 35            - Name of the datetime column for the remote table.
 36        - pipe.parameters['fetch']: Dict[str, Any]
 37            - Parameters necessary to execute a query.
 38        - pipe.parameters['fetch']['definition']: str
 39            - Raw SQL query to execute to generate the pandas DataFrame.
 40        - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
 41            - How many minutes before `begin` to search for data (*optional*).
 42
 43    begin: Union[datetime, int, str, None], default None
 44        Most recent datatime to search for data.
 45        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
 46
 47    end: Union[datetime, int, str, None], default None
 48        The latest datetime to search for data.
 49        If `end` is `None`, do not bound 
 50
 51    check_existing: bool, defult True
 52        If `False`, use a backtrack interval of 0 minutes.
 53
 54    chunk_hook: Callable[[pd.DataFrame], Any], default None
 55        A function to pass to `SQLConnector.read()` that accepts a Pandas DataFrame.
 56
 57    chunksize: Optional[int], default -1
 58        How many rows to load into memory at once (when `chunk_hook` is provided).
 59        Otherwise the entire result set is loaded into memory.
 60
 61    workers: Optional[int], default None
 62        How many threads to use when consuming the generator (when `chunk_hook is provided).
 63        Defaults to the number of cores.
 64
 65    debug: bool, default False
 66        Verbosity toggle.
 67       
 68    Returns
 69    -------
 70    A pandas DataFrame or `None`.
 71    If `chunk_hook` is not None, return a list of the hook function's results.
 72    """
 73    meta_def = self.get_pipe_metadef(
 74        pipe,
 75        begin = begin,
 76        end = end,
 77        check_existing = check_existing,
 78        debug = debug,
 79        **kw
 80    )
 81    as_hook_results = chunk_hook is not None
 82    chunks = self.read(
 83        meta_def,
 84        chunk_hook = chunk_hook,
 85        as_hook_results = as_hook_results,
 86        chunksize = chunksize,
 87        workers = workers,
 88        debug = debug,
 89    )
 90    ### if sqlite, parse for datetimes
 91    if not as_hook_results and self.flavor == 'sqlite':
 92        from meerschaum.utils.misc import parse_df_datetimes
 93        ignore_cols = [
 94            col
 95            for col, dtype in pipe.dtypes.items()
 96            if 'datetime' not in str(dtype)
 97        ]
 98        return (
 99            parse_df_datetimes(
100                chunk,
101                ignore_cols = ignore_cols,
102                debug = debug,
103            )
104            for chunk in chunks
105        )
106    return chunks

Execute the SQL definition and return a Pandas DataFrame.

Parameters
  • pipe (meerschaum.Pipe): The pipe object which contains the fetch metadata.

    • pipe.columns['datetime']: str
      • Name of the datetime column for the remote table.
    • pipe.parameters['fetch']: Dict[str, Any]
      • Parameters necessary to execute a query.
    • pipe.parameters['fetch']['definition']: str
      • Raw SQL query to execute to generate the pandas DataFrame.
    • pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
      • How many minutes before begin to search for data (optional).
  • begin (Union[datetime, int, str, None], default ''): Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
  • end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the search.
  • check_existing (bool, default True): If False, use a backtrack interval of 0 minutes.
  • chunk_hook (Callable[[pd.DataFrame], Any], default None): A function to pass to SQLConnector.read() that accepts a Pandas DataFrame.
  • chunksize (Optional[int], default -1): How many rows to load into memory at once (when chunk_hook is provided). Otherwise the entire result set is loaded into memory.
  • workers (Optional[int], default None): How many threads to use when consuming the generator (when chunk_hook is provided). Defaults to the number of cores.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pandas DataFrame or None.
  • If chunk_hook is not None, return a list of the hook function's results.
def get_pipe_metadef( self, pipe: meerschaum.core.Pipe.Pipe, params: 'Optional[Dict[str, Any]]' = None, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, str, NoneType] = None, check_existing: bool = True, debug: bool = False, **kw: Any) -> Optional[str]:
def get_pipe_metadef(
        self,
        pipe: meerschaum.Pipe,
        params: Optional[Dict[str, Any]] = None,
        begin: Union[datetime, int, str, None] = '',
        end: Union[datetime, int, str, None] = None,
        check_existing: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> Union[str, None]:
    """
    Return a pipe's meta definition fetch query.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose `fetch` definition will be wrapped into a bounded query.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    begin: Union[datetime, int, str, None], default ''
        Most recent datetime to search for data.
        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.

    end: Union[datetime, int, str, None], default None
        The latest datetime to search for data.
        If `end` is `None`, do not bound the search.

    check_existing: bool, default True
        If `True`, apply the backtrack interval.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pipe's meta definition fetch query string.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where
    from meerschaum.utils.misc import is_int
    from meerschaum.config import get_config

    ### NOTE(review): the return value is unused here; presumably this call
    ### validates that the pipe has a fetch definition — confirm.
    definition = get_pipe_query(pipe)

    ### Resolve the datetime column, guessing it if not explicitly configured.
    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt_name = sql_item_name(_dt, self.flavor, None) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt_name = sql_item_name(_dt, self.flavor, None)
        is_guess = False

    if begin not in (None, '') or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"Unable to determine a datetime column for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = '', None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False
                )

    ### An empty-string `begin` is a sentinel meaning "resume from the sync time".
    apply_backtrack = begin == '' and check_existing
    backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug)
    btm = (
        int(backtrack_interval.total_seconds() / 60)
        if isinstance(backtrack_interval, timedelta)
        else backtrack_interval
    )
    begin = (
        pipe.get_sync_time(debug=debug)
        if begin == ''
        else begin
    )

    ### Drop a begin bound that would produce an empty window.
    if begin and end and begin >= end:
        begin = None

    begin_da, end_da = None, None
    if dt_name:
        begin_da = (
            dateadd_str(
                flavor = self.flavor,
                datepart = 'minute',
                number = ((-1 * btm) if apply_backtrack else 0),
                begin = begin,
            )
            if begin
            else None
        )
        end_da = (
            dateadd_str(
                flavor = self.flavor,
                datepart = 'minute',
                number = 0,
                begin = end,
            )
            if end
            else None
        )

    meta_def = (
        _simple_fetch_query(pipe) if (
            (not (pipe.columns or {}).get('id', None))
            or (not get_config('system', 'experimental', 'join_fetch'))
        ) else _join_fetch_query(pipe, debug=debug, **kw)
    )

    ### Only inspect the tail of the query (after 'definition') for an existing WHERE.
    has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):]
    if dt_name and (begin_da or end_da):
        definition_dt_name = (
            dateadd_str(self.flavor, 'minute', 0, f"definition.{dt_name}")
            if not is_int((begin_da or end_da))
            else f"definition.{dt_name}"
        )
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        if begin_da:
            meta_def += f"{definition_dt_name} >= {begin_da}"
        if begin_da and end_da:
            meta_def += " AND "
        if end_da:
            meta_def += f"{definition_dt_name} < {end_da}"

    if params is not None:
        params_where = build_where(params, self, with_where=False)
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        meta_def += params_where

    return meta_def

Return a pipe's meta definition fetch query.

params: Optional[Dict[str, Any]], default None Optional params dictionary to build the WHERE clause. See meerschaum.utils.sql.build_where.

begin: Union[datetime, int, str, None], default '' Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.

end: Union[datetime, int, str, None], default None The latest datetime to search for data. If end is None, do not bound the search.

check_existing: bool, default True If True, apply the backtrack interval.

debug: bool, default False Verbosity toggle.

Returns
  • A pipe's meta definition fetch query string.
def cli(self, debug: bool = False) -> Tuple[bool, str]:
def cli(
        self,
        debug: bool = False,
    ) -> SuccessTuple:
    """
    Launch a subprocess for an interactive CLI.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle passed through to `venv_exec`.

    Returns
    -------
    A `SuccessTuple`: `(True, "Success")` on a clean launch,
    otherwise `(False, <error message>)`.
    """
    from meerschaum.utils.venv import venv_exec
    ### NOTE(review): `env` (including the MRSM_SQL_* credentials variable) is
    ### built but never passed to `venv_exec()` — presumably it should be;
    ### confirm against `venv_exec`'s signature.
    env = copy.deepcopy(dict(os.environ))
    env[f'MRSM_SQL_{self.label.upper()}'] = json.dumps(self.meta)
    ### The subprocess re-creates this connector by label and enters its CLI.
    cli_code = (
        "import meerschaum as mrsm\n"
        f"conn = mrsm.get_connector('sql:{self.label}')\n"
        f"conn._cli_exit()"
    )
    try:
        _ = venv_exec(cli_code, venv=None, debug=debug, capture_output=False)
    except Exception as e:
        return False, f"[{self}] Failed to start CLI:\n{e}"
    return True, "Success"

Launch a subprocess for an interactive CLI.

def fetch_pipes_keys( self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> Optional[List[Tuple[str, str, Optional[str]]]]:
def fetch_pipes_keys(
        self,
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False
    ) -> Optional[List[Tuple[str, str, Optional[str]]]]:
    """
    Return a list of tuples corresponding to the parameters provided.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        List of connector_keys to search by.

    metric_keys: Optional[List[str]], default None
        List of metric_keys to search by.

    location_keys: Optional[List[str]], default None
        List of location_keys to search by.

    tags: Optional[List[str]], default None
        List of tag groups to filter by. Each entry may contain comma-separated
        tags which are ANDed together; separate entries are ORed.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        E.g. `--params pipe_id:1`

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of `(connector_keys, metric_key, location_key)` tuples,
    or an empty list if the pipes table does not exist.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.misc import separate_negation_values, flatten_list
    from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists
    from meerschaum.config.static import STATIC_CONFIG
    import json
    from copy import deepcopy
    sqlalchemy, sqlalchemy_sql_functions = attempt_import('sqlalchemy', 'sqlalchemy.sql.functions')
    coalesce = sqlalchemy_sql_functions.coalesce

    ### Normalize missing key lists; map textual null markers to the literal 'None'.
    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    else:
        location_keys = [
            (
                lk
                if lk not in ('[None]', 'None', 'null')
                else 'None'
            )
            for lk in location_keys
        ]
    if tags is None:
        tags = []

    if params is None:
        params = {}

    ### Add three primary keys to params dictionary
    ###   (separated for convenience of arguments).
    cols = {
        'connector_keys': [str(ck) for ck in connector_keys],
        'metric_key': [str(mk) for mk in metric_keys],
        'location_key': [str(lk) for lk in location_keys],
    }

    ### Make deep copy so we don't mutate this somewhere else.
    parameters = deepcopy(params)
    for col, vals in cols.items():
        if vals not in [[], ['*']]:
            parameters[col] = vals

    if not table_exists('mrsm_pipes', self, schema=self.instance_schema, debug=debug):
        return []

    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes']

    ### Serialize dict-valued params to JSON for comparison against stored text.
    _params = {}
    for k, v in parameters.items():
        _v = json.dumps(v) if isinstance(v, dict) else v
        _params[k] = _v

    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
    ### Parse regular params.
    ### If a param begins with '_', negate it instead.
    _where = [
        (
            (coalesce(pipes_tbl.c[key], 'None') == val)
            if not str(val).startswith(negation_prefix)
            ### NOTE(review): the negated branch compares the column against `key`
            ### rather than the prefix-stripped value — looks like a bug; confirm
            ### the intended negation semantics.
            else (pipes_tbl.c[key] != key)
        ) for key, val in _params.items()
        if not isinstance(val, (list, tuple)) and key in pipes_tbl.c
    ]
    select_cols = (
        [
            pipes_tbl.c.connector_keys,
            pipes_tbl.c.metric_key,
            pipes_tbl.c.location_key,
        ]
    )

    q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where))
    ### List-valued keys become IN / NOT IN clauses (split by negation prefix).
    for c, vals in cols.items():
        if not isinstance(vals, (list, tuple)) or not vals or not c in pipes_tbl.c:
            continue
        _in_vals, _ex_vals = separate_negation_values(vals)
        q = q.where(coalesce(pipes_tbl.c[c], 'None').in_(_in_vals)) if _in_vals else q
        q = q.where(coalesce(pipes_tbl.c[c], 'None').not_in(_ex_vals)) if _ex_vals else q

    ### Finally, parse tags.
    tag_groups = [tag.split(',') for tag in tags]
    in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups]

    ### Tags are matched by LIKE patterns against the serialized parameters JSON.
    ors, nands = [], []
    for _in_tags, _ex_tags in in_ex_tag_groups:
        sub_ands = []
        for nt in _in_tags:
            sub_ands.append(
                sqlalchemy.cast(
                    pipes_tbl.c['parameters'],
                    sqlalchemy.String,
                ).like(f'%"tags":%"{nt}"%')
            )
        if sub_ands:
            ors.append(sqlalchemy.and_(*sub_ands))

        for xt in _ex_tags:
            nands.append(
                sqlalchemy.cast(
                    pipes_tbl.c['parameters'],
                    sqlalchemy.String,
                ).not_like(f'%"tags":%"{xt}"%')
            )

    q = q.where(sqlalchemy.and_(*nands)) if nands else q
    q = q.where(sqlalchemy.or_(*ors)) if ors else q
    ### Order deterministically; some flavors reject NULLS FIRST.
    loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key'])
    if self.flavor not in OMIT_NULLSFIRST_FLAVORS:
        loc_asc = sqlalchemy.nullsfirst(loc_asc)
    q = q.order_by(
        sqlalchemy.asc(pipes_tbl.c['connector_keys']),
        sqlalchemy.asc(pipes_tbl.c['metric_key']),
        loc_asc,
    )

    ### execute the query and return a list of tuples
    if debug:
        dprint(q.compile(compile_kwargs={'literal_binds': True}))
    try:
        rows = (
            self.execute(q).fetchall()
            if self.flavor != 'duckdb'
            else [
                (row['connector_keys'], row['metric_key'], row['location_key'])
                for row in self.read(q).to_dict(orient='records')
            ]
        )
    except Exception as e:
        ### `error()` raises, so `rows` is always bound past this point.
        error(str(e))

    return [(row[0], row[1], row[2]) for row in rows]

Return a list of tuples corresponding to the parameters provided.

Parameters
  • connector_keys (Optional[List[str]], default None): List of connector_keys to search by.
  • metric_keys (Optional[List[str]], default None): List of metric_keys to search by.
  • location_keys (Optional[List[str]], default None): List of location_keys to search by.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. E.g. --params pipe_id:1
  • debug (bool, default False): Verbosity toggle.
def create_indices( self, pipe: meerschaum.core.Pipe.Pipe, indices: Optional[List[str]] = None, debug: bool = False) -> bool:
def create_indices(
        self,
        pipe: mrsm.Pipe,
        indices: Optional[List[str]] = None,
        debug: bool = False
    ) -> bool:
    """
    Create a pipe's indices.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe for which to create indices.

    indices: Optional[List[str]], default None
        If provided, only create indices for these index keys.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    `True` if all of the index queries succeeded, else `False`.
    """
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Creating indices for {pipe}...")
    if not pipe.columns:
        warn(f"{pipe} has no index columns; skipping index creation.", stack=False)
        return True

    ### Build the queries for every index, optionally restricted to `indices`.
    ix_queries = {
        ix: queries
        for ix, queries in self.get_create_index_queries(pipe, debug=debug).items()
        if indices is None or ix in indices
    }
    success = True
    for ix, queries in ix_queries.items():
        ix_success = all(self.exec_queries(queries, debug=debug, silent=False))
        success = success and ix_success
        if not ix_success:
            warn(f"Failed to create index on column: {ix}")

    return success

Create a pipe's indices.

def drop_indices( self, pipe: meerschaum.core.Pipe.Pipe, indices: Optional[List[str]] = None, debug: bool = False) -> bool:
def drop_indices(
        self,
        pipe: mrsm.Pipe,
        indices: Optional[List[str]] = None,
        debug: bool = False
    ) -> bool:
    """
    Drop a pipe's indices.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose indices should be dropped.

    indices: Optional[List[str]], default None
        If provided, only drop the indices with these keys.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    `True` if every drop query succeeded, else `False`.
    """
    from meerschaum.utils.debug import dprint
    if debug:
        dprint(f"Dropping indices for {pipe}...")
    if not pipe.columns:
        warn(f"Unable to drop indices for {pipe} without columns.", stack=False)
        return False

    success = True
    for index_key, drop_queries in self.get_drop_index_queries(pipe, debug=debug).items():
        ### Honor the optional restriction to specific index keys.
        if indices is not None and index_key not in indices:
            continue
        if all(self.exec_queries(drop_queries, debug=debug, silent=True)):
            continue
        success = False
        if debug:
            dprint(f"Failed to drop index on column: {index_key}")
    return success

Drop a pipe's indices.

def get_create_index_queries( self, pipe: meerschaum.core.Pipe.Pipe, debug: bool = False) -> Dict[str, List[str]]:
def get_create_index_queries(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to which the queries will correspond.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of column names mapping to lists of queries.
    """
    from meerschaum.utils.sql import (
        sql_item_name,
        get_distinct_col_count,
        update_queries,
        get_null_replacement,
        COALESCE_UNIQUE_INDEX_FLAVORS,
    )
    from meerschaum.config import get_config
    index_queries = {}

    ### Upserts need a unique index/constraint (only on flavors with upsert support).
    upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in update_queries
    indices = pipe.get_indices()

    _datetime = pipe.get_columns('datetime', error=False)
    ### NOTE(review): `_datetime_type` appears unused below — confirm before removing.
    _datetime_type = pipe.dtypes.get(_datetime, 'datetime64[ns]')
    _datetime_name = (
        sql_item_name(_datetime, self.flavor, None)
        if _datetime is not None else None
    )
    _datetime_index_name = (
        sql_item_name(indices['datetime'], self.flavor, None)
        if indices.get('datetime', None)
        else None
    )
    _id = pipe.get_columns('id', error=False)
    _id_name = (
        sql_item_name(_id, self.flavor, None)
        if _id is not None
        else None
    )

    _id_index_name = (
        sql_item_name(indices['id'], self.flavor, None)
        if indices.get('id', None)
        else None
    )
    _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
    _create_space_partition = get_config('system', 'experimental', 'space')

    ### create datetime index
    if _datetime is not None:
        if self.flavor == 'timescaledb' and pipe.parameters.get('hypertable', True):
            ### Space-partition by the id column only when the experimental
            ### 'space' config is enabled.
            _id_count = (
                get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self)
                if (_id is not None and _create_space_partition) else None
            )

            chunk_interval = pipe.get_chunk_interval(debug=debug)
            chunk_interval_minutes = (
                chunk_interval
                if isinstance(chunk_interval, int)
                else int(chunk_interval.total_seconds() / 60)
            )
            ### `timedelta` is assumed to be imported at module scope.
            chunk_time_interval = (
                f"INTERVAL '{chunk_interval_minutes} MINUTES'"
                if isinstance(chunk_interval, timedelta)
                else f'{chunk_interval_minutes}'
            )

            dt_query = (
                f"SELECT public.create_hypertable('{_pipe_name}', " +
                f"'{_datetime}', "
                + (
                    f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition)
                    else ''
                )
                + f'chunk_time_interval => {chunk_time_interval}, '
                + 'if_not_exists => true, '
                + "migrate_data => true);"
            )
        else: ### mssql, sqlite, etc.
            dt_query = (
                f"CREATE INDEX {_datetime_index_name} "
                + f"ON {_pipe_name} ({_datetime_name})"
            )

        index_queries[_datetime] = [dt_query]

    ### create id index
    if _id_name is not None:
        if self.flavor == 'timescaledb':
            ### Already created indices via create_hypertable.
            id_query = (
                None if (_id is not None and _create_space_partition)
                else (
                    f"CREATE INDEX IF NOT EXISTS {_id_index_name} ON {_pipe_name} ({_id_name})"
                    if _id is not None
                    else None
                )
            )
            ### NOTE(review): this `pass` is a no-op and could be dropped.
            pass
        else: ### mssql, sqlite, etc.
            id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})"

        if id_query is not None:
            index_queries[_id] = id_query if isinstance(id_query, list) else [id_query]


    ### Create indices for other labels in `pipe.columns`.
    other_indices = {
        ix_key: ix_unquoted
        for ix_key, ix_unquoted in pipe.get_indices().items()
        if ix_key not in ('datetime', 'id')
    }
    for ix_key, ix_unquoted in other_indices.items():
        ix_name = sql_item_name(ix_unquoted, self.flavor, None)
        col = pipe.columns[ix_key]
        col_name = sql_item_name(col, self.flavor, None)
        index_queries[col] = [f"CREATE INDEX {ix_name} ON {_pipe_name} ({col_name})"]

    ### Build the unique index / constraint strings used for upserts.
    existing_cols_types = pipe.get_columns_types(debug=debug)
    indices_cols_str = ', '.join(
        [
            sql_item_name(ix, self.flavor)
            for ix_key, ix in pipe.columns.items()
            if ix and ix in existing_cols_types
        ]
    )
    ### Some flavors need NULLs coalesced to a sentinel for unique indices.
    coalesce_indices_cols_str = ', '.join(
        [
            (
                "COALESCE("
                + sql_item_name(ix, self.flavor)
                + ", "
                + get_null_replacement(existing_cols_types[ix], self.flavor)
                + ") "
            ) if ix_key != 'datetime' else (sql_item_name(ix, self.flavor))
            for ix_key, ix in pipe.columns.items()
            if ix and ix in existing_cols_types
        ]
    )
    unique_index_name = sql_item_name(pipe.target + '_unique_index', self.flavor)
    constraint_name = sql_item_name(pipe.target + '_constraint', self.flavor)
    add_constraint_query = (
        f"ALTER TABLE {_pipe_name} ADD CONSTRAINT {constraint_name} UNIQUE ({indices_cols_str})"
    )
    unique_index_cols_str = (
        indices_cols_str
        if self.flavor not in COALESCE_UNIQUE_INDEX_FLAVORS
        else coalesce_indices_cols_str
    )
    create_unique_index_query = (
        f"CREATE UNIQUE INDEX {unique_index_name} ON {_pipe_name} ({unique_index_cols_str})"
    )
    constraint_queries = [create_unique_index_query]
    if self.flavor != 'sqlite':
        constraint_queries.append(add_constraint_query)
    if upsert and indices_cols_str:
        index_queries[unique_index_name] = constraint_queries
    return index_queries

Return a dictionary mapping columns to a CREATE INDEX or equivalent query.

Parameters
  • pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
  • A dictionary of column names mapping to lists of queries.
def get_drop_index_queries( self, pipe: meerschaum.core.Pipe.Pipe, debug: bool = False) -> Dict[str, List[str]]:
def get_drop_index_queries(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `DROP INDEX` or equivalent query.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to which the queries will correspond.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of column names mapping to lists of queries.
    """
    if not pipe.exists(debug=debug):
        return {}
    from meerschaum.utils.sql import sql_item_name, table_exists, hypertable_queries
    drop_queries = {}
    ### Resolve the schema once and reuse it below.
    schema = self.get_pipe_schema(pipe)
    schema_prefix = (schema + '_') if schema else ''
    indices = {
        col: schema_prefix + ix
        for col, ix in pipe.get_indices().items()
    }
    pipe_name = sql_item_name(pipe.target, self.flavor, schema)
    pipe_name_no_schema = sql_item_name(pipe.target, self.flavor, None)

    if self.flavor not in hypertable_queries:
        is_hypertable = False
    else:
        is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name)
        is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None

    if is_hypertable:
        ### Hypertable indices can't simply be dropped; instead migrate the data
        ### into a plain table (keyed under the first of 'datetime'/'id' present).
        nuke_queries = []
        temp_table = '_' + pipe.target + '_temp_migration'
        temp_table_name = sql_item_name(temp_table, self.flavor, schema)

        if table_exists(temp_table, self, schema=schema, debug=debug):
            nuke_queries.append(f"DROP TABLE {temp_table_name}")
        nuke_queries += [
            f"SELECT * INTO {temp_table_name} FROM {pipe_name}",
            f"DROP TABLE {pipe_name}",
            f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name_no_schema}",
        ]
        nuke_ix_keys = ('datetime', 'id')
        nuked = False
        for ix_key in nuke_ix_keys:
            if ix_key in indices and not nuked:
                drop_queries[ix_key] = nuke_queries
                nuked = True

    ### Plain DROP INDEX for everything not handled by the migration above.
    drop_queries.update({
        ix_key: ["DROP INDEX " + sql_item_name(ix_unquoted, self.flavor, None)]
        for ix_key, ix_unquoted in indices.items()
        if ix_key not in drop_queries
    })
    return drop_queries

Return a dictionary mapping columns to a DROP INDEX or equivalent query.

Parameters
  • pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
  • A dictionary of column names mapping to lists of queries.
def get_add_columns_queries( self, pipe: meerschaum.core.Pipe.Pipe, df: 'Union[pd.DataFrame, Dict[str, str]]', debug: bool = False) -> List[str]:
def get_add_columns_queries(
        self,
        pipe: mrsm.Pipe,
        df: Union[pd.DataFrame, Dict[str, str]],
        debug: bool = False,
    ) -> List[str]:
    """
    Add new null columns of the correct type to a table from a dataframe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.

    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which contains new columns.
        If a dictionary is provided, assume it maps columns to Pandas data types.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    """
    if not pipe.exists(debug=debug):
        return []

    import copy
    from decimal import Decimal
    from meerschaum.utils.sql import (
        sql_item_name,
        SINGLE_ALTER_TABLE_FLAVORS,
    )
    from meerschaum.utils.dtypes.sql import (
        get_pd_type_from_db_type,
        get_db_type_from_pd_type,
    )
    from meerschaum.utils.misc import flatten_list

    table_obj = self.get_pipe_table(pipe, debug=debug)
    df_is_dict = isinstance(df, dict)

    ### Dask dataframes are sampled via a single computed partition.
    if not df_is_dict and 'dask' in df.__module__:
        df = df.partitions[0].compute()

    incoming_types = (
        copy.deepcopy(df)
        if df_is_dict
        else {col: str(typ) for col, typ in df.dtypes.items()}
    )

    ### Refine ambiguous `object` dtypes by inspecting the first row's values.
    if not df_is_dict and len(df.index) > 0:
        for col, typ in list(incoming_types.items()):
            if typ != 'object':
                continue
            sample_val = df.iloc[0][col]
            if isinstance(sample_val, (dict, list)):
                incoming_types[col] = 'json'
            elif isinstance(sample_val, Decimal):
                incoming_types[col] = 'numeric'
            elif isinstance(sample_val, str):
                incoming_types[col] = 'str'

    existing_types = {
        col: get_pd_type_from_db_type(str(typ.type))
        for col, typ in table_obj.columns.items()
    }
    new_cols = set(incoming_types) - set(existing_types)
    if not new_cols:
        return []

    new_cols_types = {
        col: get_db_type_from_pd_type(incoming_types[col], self.flavor)
        for col in new_cols
    }

    alter_table_query = "ALTER TABLE " + sql_item_name(
        pipe.target, self.flavor, self.get_pipe_schema(pipe)
    )
    queries = []
    for col, typ in new_cols_types.items():
        add_col_clause = f"\nADD {sql_item_name(col, self.flavor, None)} {typ},"
        if self.flavor in SINGLE_ALTER_TABLE_FLAVORS:
            ### These flavors (e.g. SQLite) need one ALTER per column.
            queries.append(alter_table_query + add_col_clause[:-1])
        else:
            alter_table_query += add_col_clause

    ### For most flavors, a single combined ALTER suffices
    ### (trim the trailing comma accumulated above).
    if not queries:
        queries.append(alter_table_query[:-1])

    if self.flavor != 'duckdb':
        return queries

    ### NOTE: For DuckDB, we must drop and rebuild the indices.
    drop_index_queries = list(flatten_list(
        list(self.get_drop_index_queries(pipe, debug=debug).values())
    ))
    create_index_queries = list(flatten_list(
        list(self.get_create_index_queries(pipe, debug=debug).values())
    ))

    return drop_index_queries + queries + create_index_queries

Add new null columns of the correct type to a table from a dataframe.

Parameters
  • pipe (mrsm.Pipe): The pipe to be altered.
  • df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.
Returns
  • A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.
def get_alter_columns_queries( self, pipe: meerschaum.core.Pipe.Pipe, df: 'Union[pd.DataFrame, Dict[str, str]]', debug: bool = False) -> List[str]:
def get_alter_columns_queries(
        self,
        pipe: mrsm.Pipe,
        df: Union[pd.DataFrame, Dict[str, str]],
        debug: bool = False,
    ) -> List[str]:
    """
    If we encounter a column of a different type, set the entire column to text.
    If the altered columns are numeric, alter to numeric instead.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.

    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which may contain altered columns.
        If a dict is provided, assume it maps columns to Pandas data types.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    """
    if not pipe.exists(debug=debug):
        return []
    from meerschaum.utils.sql import sql_item_name
    from meerschaum.utils.dataframe import get_numeric_cols
    from meerschaum.utils.dtypes import are_dtypes_equal
    from meerschaum.utils.dtypes.sql import (
        get_pd_type_from_db_type,
        get_db_type_from_pd_type,
    )
    from meerschaum.utils.misc import flatten_list, generate_password, items_str
    table_obj = self.get_pipe_table(pipe, debug=debug)
    target = pipe.target
    ### Short random suffix to avoid temp-table name collisions (SQLite path).
    session_id = generate_password(3)
    numeric_cols = (
        get_numeric_cols(df)
        if not isinstance(df, dict)
        else [
            col
            for col, typ in df.items()
            if typ == 'numeric'
        ]
    )
    df_cols_types = (
        {
            col: str(typ)
            for col, typ in df.dtypes.items()
        }
        if not isinstance(df, dict)
        else df
    )
    db_cols_types = {
        col: get_pd_type_from_db_type(str(typ.type))
        for col, typ in table_obj.columns.items()
    }
    pipe_bool_cols = [col for col, typ in pipe.dtypes.items() if are_dtypes_equal(str(typ), 'bool')]
    pd_db_df_aliases = {
        'int': 'bool',
        'float': 'bool',
        'numeric': 'bool',
    }
    if self.flavor == 'oracle':
        pd_db_df_aliases['int'] = 'numeric'

    ### Columns whose incoming (df) type disagrees with the database type.
    ### Columns already stored as text are left alone.
    altered_cols = {
        col: (db_cols_types.get(col, 'object'), typ)
        for col, typ in df_cols_types.items()
        if not are_dtypes_equal(typ, db_cols_types.get(col, 'object').lower())
        and not are_dtypes_equal(db_cols_types.get(col, 'object'), 'string')
    }

    ### NOTE: Sometimes bools are coerced into ints or floats.
    altered_cols_to_ignore = set()
    for col, (db_typ, df_typ) in altered_cols.items():
        for db_alias, df_alias in pd_db_df_aliases.items():
            if db_alias in db_typ.lower() and df_alias in df_typ.lower():
                altered_cols_to_ignore.add(col)

    ### Oracle's bool handling sometimes mixes NUMBER and INT.
    for bool_col in pipe_bool_cols:
        if bool_col not in altered_cols:
            continue
        db_is_bool_compatible = (
            are_dtypes_equal('int', altered_cols[bool_col][0])
            or are_dtypes_equal('float', altered_cols[bool_col][0])
            or are_dtypes_equal('numeric', altered_cols[bool_col][0])
            or are_dtypes_equal('bool', altered_cols[bool_col][0])
        )
        df_is_bool_compatible = (
            are_dtypes_equal('int', altered_cols[bool_col][1])
            or are_dtypes_equal('float', altered_cols[bool_col][1])
            or are_dtypes_equal('numeric', altered_cols[bool_col][1])
            or are_dtypes_equal('bool', altered_cols[bool_col][1])
        )
        if db_is_bool_compatible and df_is_bool_compatible:
            altered_cols_to_ignore.add(bool_col)

    for col in altered_cols_to_ignore:
        _ = altered_cols.pop(col, None)
    if not altered_cols:
        return []

    if numeric_cols:
        ### Persist numeric dtypes on the pipe so future syncs agree.
        pipe.dtypes.update({col: 'numeric' for col in numeric_cols})
        edit_success, edit_msg = pipe.edit(debug=debug)
        if not edit_success:
            warn(
                f"Failed to update dtypes for numeric columns {items_str(numeric_cols)}:\n"
                + f"{edit_msg}"
            )
    else:
        numeric_cols.extend([col for col, typ in pipe.dtypes.items() if typ == 'numeric'])

    numeric_type = get_db_type_from_pd_type('numeric', self.flavor, as_sqlalchemy=False)
    text_type = get_db_type_from_pd_type('str', self.flavor, as_sqlalchemy=False)
    ### Altered numeric columns become NUMERIC; everything else falls back to text.
    altered_cols_types = {
        col: (
            numeric_type
            if col in numeric_cols
            else text_type
        )
        for col, (db_typ, typ) in altered_cols.items()
    }

    if self.flavor == 'sqlite':
        ### SQLite path: rename to a temp table, recreate the table with the
        ### new column types, copy rows back with CAST, then drop the temp table.
        temp_table_name = '-' + session_id + '_' + target
        rename_query = (
            "ALTER TABLE "
            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
            + " RENAME TO "
            + sql_item_name(temp_table_name, self.flavor, None)
        )
        create_query = (
            "CREATE TABLE "
            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
            + " (\n"
        )
        for col_name, col_obj in table_obj.columns.items():
            create_query += (
                sql_item_name(col_name, self.flavor, None)
                + " "
                + (
                    str(col_obj.type)
                    if col_name not in altered_cols
                    else altered_cols_types[col_name]
                )
                + ",\n"
            )
        create_query = create_query[:-2] + "\n)"

        insert_query = (
            "INSERT INTO "
            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
            + ' ('
            + ', '.join([
                sql_item_name(col_name, self.flavor, None)
                for col_name, _ in table_obj.columns.items()
            ])
            + ')'
            + "\nSELECT\n"
        )
        for col_name, col_obj in table_obj.columns.items():
            new_col_str = (
                sql_item_name(col_name, self.flavor, None)
                if col_name not in altered_cols
                else (
                    "CAST("
                    + sql_item_name(col_name, self.flavor, None)
                    + " AS "
                    + altered_cols_types[col_name]
                    + ")"
                )
            )
            insert_query += new_col_str + ",\n"
        insert_query = insert_query[:-2] + (
            f"\nFROM {sql_item_name(temp_table_name, self.flavor, self.get_pipe_schema(pipe))}"
        )

        drop_query = "DROP TABLE " + sql_item_name(
            temp_table_name, self.flavor, self.get_pipe_schema(pipe)
        )
        return [
            rename_query,
            create_query,
            insert_query,
            drop_query,
        ]

    queries = []
    if self.flavor == 'oracle':
        ### Oracle path: add temp columns, copy values over, NULL the originals,
        ### MODIFY the originals, copy back, and drop the temp columns.
        for col, typ in altered_cols_types.items():
            add_query = (
                "ALTER TABLE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nADD " + sql_item_name(col + '_temp', self.flavor, None)
                + " " + typ
            )
            queries.append(add_query)

        for col in altered_cols_types:
            populate_temp_query = (
                "UPDATE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nSET " + sql_item_name(col + '_temp', self.flavor, None)
                + ' = ' + sql_item_name(col, self.flavor, None)
            )
            queries.append(populate_temp_query)

        for col in altered_cols_types:
            set_old_cols_to_null_query = (
                "UPDATE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nSET " + sql_item_name(col, self.flavor, None)
                + ' = NULL'
            )
            queries.append(set_old_cols_to_null_query)

        for col, typ in altered_cols_types.items():
            alter_type_query = (
                "ALTER TABLE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nMODIFY " + sql_item_name(col, self.flavor, None) + ' '
                + typ
            )
            queries.append(alter_type_query)

        for col in altered_cols_types:
            set_old_to_temp_query = (
                "UPDATE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nSET " + sql_item_name(col, self.flavor, None)
                + ' = ' + sql_item_name(col + '_temp', self.flavor, None)
            )
            queries.append(set_old_to_temp_query)

        for col in altered_cols_types:
            drop_temp_query = (
                "ALTER TABLE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor, None)
            )
            queries.append(drop_temp_query)

        return queries

    ### Generic path: a single ALTER TABLE with one clause per column.
    ### (Oracle returned above, so the 'oracle' checks below never fire.)
    query = "ALTER TABLE " + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
    for col, typ in altered_cols_types.items():
        alter_col_prefix = (
            'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle')
            else 'MODIFY'
        )
        type_prefix = (
            '' if self.flavor in ('mssql', 'mariadb', 'mysql')
            else 'TYPE '
        )
        column_str = 'COLUMN' if self.flavor != 'oracle' else ''
        query += (
            f"\n{alter_col_prefix} {column_str} "
            + sql_item_name(col, self.flavor, None)
            + " " + type_prefix + typ + ","
        )

    ### Trim the trailing comma accumulated above.
    query = query[:-1]
    queries.append(query)
    if self.flavor != 'duckdb':
        return queries

    ### NOTE: For DuckDB, we must drop and rebuild the indices.
    drop_index_queries = list(flatten_list(
        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
    ))
    create_index_queries = list(flatten_list(
        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
    ))

    return drop_index_queries + queries + create_index_queries

If we encounter a column of a different type, set the entire column to text. If the altered columns are numeric, alter to numeric instead.

Parameters
  • pipe (mrsm.Pipe): The pipe to be altered.
  • df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which may contain altered columns. If a dict is provided, assume it maps columns to Pandas data types.
Returns
  • A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.
def delete_pipe( self, pipe: meerschaum.core.Pipe.Pipe, debug: bool = False) -> Tuple[bool, str]:
def delete_pipe(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> SuccessTuple:
    """
    Delete a Pipe's registration.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The registered pipe whose row will be removed from the pipes table.

    Returns
    -------
    A `SuccessTuple` of a success flag and a message.
    """
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    if not pipe.id:
        return False, f"{pipe} is not registered."

    ### ensure pipes table exists
    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    q = sqlalchemy.delete(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
    ### A falsy result from `exec` indicates the DELETE did not run successfully.
    if not self.exec(q, debug=debug):
        return False, f"Failed to delete registration for {pipe}."

    return True, "Success"

Delete a Pipe's registration.

def get_pipe_data( self, pipe: meerschaum.core.Pipe.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, str, NoneType] = None, end: Union[datetime.datetime, str, NoneType] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, debug: bool = False, **kw: Any) -> 'Union[pd.DataFrame, None]':
def get_pipe_data(
        self,
        pipe: mrsm.Pipe,
        select_columns: Optional[List[str]] = None,
        omit_columns: Optional[List[str]] = None,
        begin: Union[datetime, str, None] = None,
        end: Union[datetime, str, None] = None,
        params: Optional[Dict[str, Any]] = None,
        order: str = 'asc',
        limit: Optional[int] = None,
        begin_add_minutes: int = 0,
        end_add_minutes: int = 0,
        debug: bool = False,
        **kw: Any
    ) -> Union[pd.DataFrame, None]:
    """
    Access a pipe's data from the SQL instance.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to get data from.

    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, str, None], default None
        If provided, get rows newer than or equal to this value.

    end: Union[datetime, str, None], default None
        If provided, get rows older than or equal to this value.

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.connectors.sql.build_where`.

    order: Optional[str], default 'asc'
        The selection order for all of the indices in the query.
        If `None`, omit the `ORDER BY` clause.

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    begin_add_minutes: int, default 0
        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).

    end_add_minutes: int, default 0
        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).

    chunksize: Optional[int], default -1
        The size of dataframe chunks to load into memory
        (passed through via `**kw`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` of the pipe's data.
    """
    import json
    from meerschaum.utils.sql import sql_item_name
    from meerschaum.utils.misc import parse_df_datetimes, to_pandas_dtype
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.dtypes import attempt_cast_to_numeric
    pd = import_pandas()
    is_dask = 'dask' in pd.__name__

    dtypes = pipe.dtypes
    if dtypes and self.flavor == 'sqlite':
        ### For SQLite, coerce the datetime axis to `datetime64[ns]`
        ### unless it is integer-based.
        _dt = (
            pipe.guess_datetime()
            if not pipe.columns.get('datetime', None)
            else pipe.get_columns('datetime')
        )
        if _dt:
            dt_type = dtypes.get(_dt, 'object').lower()
            if 'datetime' not in dt_type and 'int' not in dt_type:
                dtypes[_dt] = 'datetime64[ns]'
    existing_cols = pipe.get_columns_types(debug=debug)
    select_columns = (
        [
            col
            for col in existing_cols
            if col not in (omit_columns or [])
        ]
        if not select_columns
        else [
            col
            for col in select_columns
            if col in existing_cols
            and col not in (omit_columns or [])
        ]
    )
    ### The `col in select_columns` predicate subsumes any pre-filtering,
    ### so a single comprehension suffices.
    dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
        if col in select_columns and col not in (omit_columns or [])
    }
    query = self.get_pipe_data_query(
        pipe,
        select_columns = select_columns,
        omit_columns = omit_columns,
        begin = begin,
        end = end,
        params = params,
        order = order,
        limit = limit,
        begin_add_minutes = begin_add_minutes,
        end_add_minutes = end_add_minutes,
        debug = debug,
        **kw
    )

    if is_dask:
        index_col = pipe.columns.get('datetime', None)
        kw['index_col'] = index_col

    numeric_columns = [
        col
        for col, typ in pipe.dtypes.items()
        if typ == 'numeric' and col in dtypes
    ]
    ### Skip float coercion when numeric (Decimal) columns must be preserved.
    kw['coerce_float'] = kw.get('coerce_float', (len(numeric_columns) == 0))
    df = self.read(
        query,
        dtype = dtypes,
        debug = debug,
        **kw
    )
    for col in numeric_columns:
        if col not in df.columns:
            continue
        df[col] = df[col].apply(attempt_cast_to_numeric)

    if self.flavor == 'sqlite':
        ### NOTE: We have to consume the iterator here to ensure that datetimes are parsed correctly
        df = (
            parse_df_datetimes(
                df,
                ignore_cols = [
                    col
                    for col, dtype in pipe.dtypes.items()
                    if 'datetime' not in str(dtype)
                ],
                chunksize = kw.get('chunksize', None),
                debug = debug,
            ) if isinstance(df, pd.DataFrame) else (
                [
                    parse_df_datetimes(
                        c,
                        ignore_cols = [
                            col
                            for col, dtype in pipe.dtypes.items()
                            if 'datetime' not in str(dtype)
                        ],
                        chunksize = kw.get('chunksize', None),
                        debug = debug,
                    )
                    for c in df
                ]
            )
        )
        ### NOTE(review): if `self.read` returned chunks, `df` is a list here
        ### and the column access below would fail — confirm the SQLite+json
        ### path always yields a single DataFrame.
        for col, typ in dtypes.items():
            if typ != 'json':
                continue
            df[col] = df[col].apply(lambda x: json.loads(x) if x is not None else x)
    return df

Access a pipe's data from the SQL instance.

Parameters
  • pipe (mrsm.Pipe): The pipe to get data from.
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, str, None], default None): If provided, get rows newer than or equal to this value.
  • end (Union[datetime, str, None], default None): If provided, get rows older than or equal to this value.
  • params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See meerschaum.connectors.sql.build_where.
  • order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If None, omit the ORDER BY clause.
  • limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
  • begin_add_minutes (int, default 0): The number of minutes to add to the begin datetime (i.e. DATEADD).
  • end_add_minutes (int, default 0): The number of minutes to add to the end datetime (i.e. DATEADD).
  • chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame of the pipe's data.
def get_pipe_data_query( self, pipe: meerschaum.core.Pipe.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, int, str, None] = None, end: Union[datetime.datetime, int, str, None] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, replace_nulls: Optional[str] = None, debug: bool = False, **kw: Any) -> Optional[str]:
 813def get_pipe_data_query(
 814        self,
 815        pipe: mrsm.Pipe,
 816        select_columns: Optional[List[str]] = None,
 817        omit_columns: Optional[List[str]] = None,
 818        begin: Union[datetime, int, str, None] = None,
 819        end: Union[datetime, int, str, None] = None,
 820        params: Optional[Dict[str, Any]] = None,
 821        order: str = 'asc',
 822        limit: Optional[int] = None,
 823        begin_add_minutes: int = 0,
 824        end_add_minutes: int = 0,
 825        replace_nulls: Optional[str] = None,
 826        debug: bool = False,
 827        **kw: Any
 828    ) -> Union[str, None]:
 829    """
 830    Return the `SELECT` query for retrieving a pipe's data from its instance.
 831
 832    Parameters
 833    ----------
 834    pipe: mrsm.Pipe:
 835        The pipe to get data from.
 836
 837    select_columns: Optional[List[str]], default None
 838        If provided, only select these given columns.
 839        Otherwise select all available columns (i.e. `SELECT *`).
 840
 841    omit_columns: Optional[List[str]], default None
 842        If provided, remove these columns from the selection.
 843
 844    begin: Union[datetime, int, str, None], default None
 845        If provided, get rows newer than or equal to this value.
 846
 847    end: Union[datetime, str, None], default None
 848        If provided, get rows older than or equal to this value.
 849
 850    params: Optional[Dict[str, Any]], default None
 851        Additional parameters to filter by.
 852        See `meerschaum.connectors.sql.build_where`.
 853
 854    order: Optional[str], default 'asc'
 855        The selection order for all of the indices in the query.
 856        If `None`, omit the `ORDER BY` clause.
 857
 858    limit: Optional[int], default None
 859        If specified, limit the number of rows retrieved to this value.
 860
 861    begin_add_minutes: int, default 0
 862        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
 863
 864    end_add_minutes: int, default 0
 865        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
 866
 867    chunksize: Optional[int], default -1
 868        The size of dataframe chunks to load into memory.
 869
 870    replace_nulls: Optional[str], default None
 871        If provided, replace null values with this value.
 872
 873    debug: bool, default False
 874        Verbosity toggle.
 875
 876    Returns
 877    -------
 878    A `SELECT` query to retrieve a pipe's data.
 879    """
 880    import json
 881    from meerschaum.utils.debug import dprint
 882    from meerschaum.utils.misc import items_str
 883    from meerschaum.utils.sql import sql_item_name, dateadd_str
 884    from meerschaum.utils.packages import import_pandas
 885    pd = import_pandas()
 886    existing_cols = pipe.get_columns_types(debug=debug)
 887    select_columns = (
 888        [col for col in existing_cols]
 889        if not select_columns
 890        else [col for col in select_columns if col in existing_cols]
 891    )
 892    if omit_columns:
 893        select_columns = [col for col in select_columns if col not in omit_columns]
 894
 895    if begin == '':
 896        begin = pipe.get_sync_time(debug=debug)
 897        backtrack_interval = pipe.get_backtrack_interval(debug=debug)
 898        if begin is not None:
 899            begin -= backtrack_interval
 900
 901    cols_names = [sql_item_name(col, self.flavor, None) for col in select_columns]
 902    select_cols_str = (
 903        'SELECT\n'
 904        + ',\n    '.join(
 905            [
 906                (
 907                    col_name
 908                    if not replace_nulls
 909                    else f"COALESCE(col_name, '{replace_nulls}') AS {col_name}"
 910                )
 911                for col_name in cols_names
 912            ]
 913        )
 914    )
 915    pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
 916    query = f"{select_cols_str}\nFROM {pipe_table_name}"
 917    where = ""
 918
 919    if order is not None:
 920        default_order = 'asc'
 921        if order not in ('asc', 'desc'):
 922            warn(f"Ignoring unsupported order '{order}'. Falling back to '{default_order}'.")
 923            order = default_order
 924        order = order.upper()
 925
 926    if not pipe.columns.get('datetime', None):
 927        _dt = pipe.guess_datetime()
 928        dt = sql_item_name(_dt, self.flavor, None) if _dt else None
 929        is_guess = True
 930    else:
 931        _dt = pipe.get_columns('datetime')
 932        dt = sql_item_name(_dt, self.flavor, None)
 933        is_guess = False
 934
 935    quoted_indices = {
 936        key: sql_item_name(val, self.flavor, None)
 937        for key, val in pipe.columns.items()
 938        if val in existing_cols
 939    }
 940
 941    if begin is not None or end is not None:
 942        if is_guess:
 943            if _dt is None:
 944                warn(
 945                    f"No datetime could be determined for {pipe}."
 946                    + "\n    Ignoring begin and end...",
 947                    stack = False,
 948                )
 949                begin, end = None, None
 950            else:
 951                warn(
 952                    f"A datetime wasn't specified for {pipe}.\n"
 953                    + f"    Using column \"{_dt}\" for datetime bounds...",
 954                    stack = False,
 955                )
 956
 957    is_dt_bound = False
 958    if begin is not None and _dt in existing_cols:
 959        begin_da = dateadd_str(
 960            flavor = self.flavor,
 961            datepart = 'minute',
 962            number = begin_add_minutes,
 963            begin = begin
 964        )
 965        where += f"{dt} >= {begin_da}" + (" AND " if end is not None else "")
 966        is_dt_bound = True
 967
 968    if end is not None and _dt in existing_cols:
 969        if 'int' in str(type(end)).lower() and end == begin:
 970            end += 1
 971        end_da = dateadd_str(
 972            flavor = self.flavor,
 973            datepart = 'minute',
 974            number = end_add_minutes,
 975            begin = end
 976        )
 977        where += f"{dt} < {end_da}"
 978        is_dt_bound = True
 979
 980    if params is not None:
 981        from meerschaum.utils.sql import build_where
 982        valid_params = {k: v for k, v in params.items() if k in existing_cols}
 983        if valid_params:
 984            where += build_where(valid_params, self).replace(
 985                'WHERE', ('AND' if is_dt_bound else "")
 986            )
 987
 988    if len(where) > 0:
 989        query += "\nWHERE " + where