Module meerschaum.connectors

Create connectors with get_connector(). For ease of use, you can also import from the root meerschaum module:

>>> from meerschaum import get_connector
>>> conn = get_connector()
Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Create connectors with `meerschaum.connectors.get_connector()`.
For ease of use, you can also import from the root `meerschaum` module:
```
>>> from meerschaum import get_connector
>>> conn = get_connector()
```
"""

from __future__ import annotations

from meerschaum.utils.typing import Any, SuccessTuple, Union, Optional, List, Dict
from meerschaum.utils.threading import Lock, RLock
from meerschaum.utils.warnings import error, warn

from meerschaum.connectors.Connector import Connector, InvalidAttributesError
from meerschaum.connectors.sql.SQLConnector import SQLConnector
from meerschaum.connectors.api.APIConnector import APIConnector
from meerschaum.connectors.sql._create_engine import flavor_configs as sql_flavor_configs

__all__ = ("Connector", "SQLConnector", "APIConnector", "get_connector", "is_connected")

### store connectors partitioned by
### type, label for reuse
connectors: Dict[str, Dict[str, Connector]] = {
    'api'   : {},
    'sql'   : {},
    'plugin': {},
}
### Connector types which may serve as Meerschaum instances.
instance_types: List[str] = ['sql', 'api']
### One reentrant lock per module-level registry mutated by
### `get_connector()` and `make_connector()`.
_locks: Dict[str, RLock] = {
    'connectors'               : RLock(),
    'types'                    : RLock(),
    'custom_types'             : RLock(),
    '_loaded_plugin_connectors': RLock(),
    'instance_types'           : RLock(),
}
### Required attributes and defaults per connector type.
attributes: Dict[str, Dict[str, Any]] = {
    'api': {
        'required': [
            'host',
            'username',
            'password'
        ],
        'default': {
            'protocol': 'http',
            'port'    : 8000,
        },
    },
    'sql': {
        'flavors': sql_flavor_configs,
    },
}
### Fill this with objects only when connectors are first referenced.
types: Dict[str, Any] = {}
### Types registered at runtime via `make_connector` (e.g. by plugins).
custom_types: set = set()
### Flipped to True by `get_connector()` once plugin connectors are imported.
_loaded_plugin_connectors: bool = False


def get_connector(
        type: Optional[str] = None,
        label: Optional[str] = None,
        refresh: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Connector:
    """
    Return existing connector or create new connection and store for reuse.
    
    You can create new connectors if enough parameters are provided for the given type and flavor.
    

    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.
        May also contain the label (e.g. `'sql:main'`), in which case `label` is parsed from it.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct new object. Defaults to `False`.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`).
    
    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```

    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    ### Allow joint keys (e.g. 'sql:main') in place of separate type and label.
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)
    ### Plugin-provided connector types must be registered before lookup.
    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _loaded_plugin_connectors = True
    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"

        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    ### Lazily register the built-in connector classes on first use.
    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        with _locks['types']:
            types.update({
                'api'   : APIConnector,
                'sql'   : SQLConnector,
                'plugin': PluginConnector,
            })
    
    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    import inspect
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector " +
                            f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n" +
                        f"  - Keyword value: '{value}'\n" +
                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else: ### connector doesn't yet exist
            refresh = True

    ### only create an object if refresh is True
    ### (can be manually specified, otherwise determined above)
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack = False,
                )
                conn = None
            except Exception as e:
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack = False,
                )
                conn = None
        if conn is None:
            return None

    return connectors[type][label]


def is_connected(keys: str, **kw) -> bool:
    """
    Check if the connector keys correspond to an active connection.
    If the connector has not been created, it will immediately return `False`.
    If the connector exists but cannot communicate with the source, return `False`.
    
    **NOTE:** Only works with instance connectors (`SQLConnectors` and `APIConnectors`).
    Keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Parameters
    ----------
    keys:
        The keys to the connector (e.g. `'sql:main'`).
        Only the first colon separates the type; labels may contain colons.
        
    Returns
    -------
    A `bool` corresponding to whether a successful connection may be made.

    """
    import warnings
    if ':' not in keys:
        warn(f"Invalid connector keys '{keys}'")
        return False

    ### Split on the first colon only, consistent with `get_connector()`,
    ### so labels containing ':' are handled instead of returning False.
    typ, label = keys.split(':', maxsplit=1)
    if typ not in instance_types:
        return False
    if label not in connectors.get(typ, {}):
        return False

    from meerschaum.connectors.parse import parse_instance_keys
    conn = parse_instance_keys(keys)
    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            ### Any exception raised during the probe counts as "not connected".
            return conn.test_connection(**kw)
    except Exception:
        return False


def make_connector(
        cls,
    ):
    """
    Register a class as a `Connector`.
    The `type` will be the lower case of the class name, without the suffix `connector`.

    If the class defines a truthy `IS_INSTANCE` class attribute,
    its type is also registered as an instance connector type.

    Parameters
    ----------
    cls:
        The connector class to register.
        Its name should end with `Connector` (e.g. `FooConnector` registers type `foo`).

    Returns
    -------
    The class `cls`, unchanged (so this function may be used as a decorator).

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.connectors import make_connector, Connector
    >>> class FooConnector(Connector):
    ...     def __init__(self, label: str, **kw):
    ...         super().__init__('foo', label, **kw)
    ... 
    >>> make_connector(FooConnector)
    >>> mrsm.get_connector('foo', 'bar')
    foo:bar
    >>> 
    """
    import re
    typ = re.sub(r'connector$', '', cls.__name__.lower())
    with _locks['types']:
        types[typ] = cls
    with _locks['custom_types']:
        custom_types.add(typ)
    with _locks['connectors']:
        if typ not in connectors:
            connectors[typ] = {}
    ### Classes which opt in via `IS_INSTANCE` may serve as Meerschaum instances.
    if getattr(cls, 'IS_INSTANCE', False):
        with _locks['instance_types']:
            if typ not in instance_types:
                instance_types.append(typ)

    return cls


def load_plugin_connectors():
    """
    Import any installed plugins whose source mentions the
    `make_connector` decorator so their connector types get registered.
    """
    from meerschaum.plugins import get_plugins, import_plugins
    names_to_import = []
    for plugin in get_plugins():
        with open(plugin.__file__, encoding='utf-8') as plugin_file:
            plugin_source = plugin_file.read()
        if 'make_connector' not in plugin_source:
            continue
        names_to_import.append(plugin.name)
    if names_to_import:
        import_plugins(*names_to_import)


def get_connector_plugin(
        connector: Connector,
    ) -> Union[str, None, 'meerschaum.Plugin']:
    """
    Determine the plugin for a connector.
    This is useful for handling virtual environments for custom instance connectors.

    Parameters
    ----------
    connector: Connector
        The connector which may require a virtual environment.

    Returns
    -------
    A Plugin, 'mrsm', or None.
    """
    if not hasattr(connector, 'type'):
        return None
    from meerschaum import Plugin
    ### Custom types come from a plugin module; 'plugin' connectors name
    ### their plugin in the label; everything else belongs to Meerschaum proper.
    if connector.type in custom_types:
        plugin_name = connector.__module__.replace('plugins.', '').split('.')[0]
    elif connector.type == 'plugin':
        plugin_name = connector.label
    else:
        plugin_name = 'mrsm'
    plugin = Plugin(plugin_name)
    return plugin if plugin.is_installed() else None

Sub-modules

meerschaum.connectors.api

Interact with the Meerschaum API (send commands, pull data, etc.)

meerschaum.connectors.parse

Utility functions for parsing connector keys.

meerschaum.connectors.plugin

Allow pipes to source data from installed plugins.

meerschaum.connectors.poll

Poll database and API connections.

meerschaum.connectors.sql

Subpackage for SQLConnector subclass

Functions

def get_connector(type: str = None, label: str = None, refresh: bool = False, debug: bool = False, **kw: Any) ‑> meerschaum.connectors.Connector.Connector

Return existing connector or create new connection and store for reuse.

You can create new connectors if enough parameters are provided for the given type and flavor.

Parameters

type : Optional[str], default None
Connector type (sql, api, etc.). Defaults to the type of the configured instance_connector.
label : Optional[str], default None
Connector label (e.g. main). Defaults to 'main'.
refresh : bool, default False
Refresh the Connector instance / construct new object. Defaults to False.
kw : Any
Other arguments to pass to the Connector constructor. If the Connector has already been constructed and new arguments are provided, refresh is set to True and the old Connector is replaced.

Returns

A new Meerschaum connector (e.g. meerschaum.connectors.api.APIConnector, meerschaum.connectors.sql.SQLConnector).

Examples

The following parameters would create a new meerschaum.connectors.sql.SQLConnector that isn't in the configuration file.

>>> conn = get_connector(
...     type = 'sql',
...     label = 'newlabel',
...     flavor = 'sqlite',
...     database = '/file/path/to/database.db'
... )
>>>
Expand source code
def get_connector(
        type: Optional[str] = None,
        label: Optional[str] = None,
        refresh: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Connector:
    """
    Return existing connector or create new connection and store for reuse.
    
    You can create new connectors if enough parameters are provided for the given type and flavor.
    

    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.
        May also contain the label (e.g. `'sql:main'`), in which case `label` is parsed from it.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct new object. Defaults to `False`.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`).
    
    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```

    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    ### Allow joint keys (e.g. 'sql:main') in place of separate type and label.
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)
    ### Plugin-provided connector types must be registered before lookup.
    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _loaded_plugin_connectors = True
    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"

        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    ### Lazily register the built-in connector classes on first use.
    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        with _locks['types']:
            types.update({
                'api'   : APIConnector,
                'sql'   : SQLConnector,
                'plugin': PluginConnector,
            })
    
    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    import inspect
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector " +
                            f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n" +
                        f"  - Keyword value: '{value}'\n" +
                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else: ### connector doesn't yet exist
            refresh = True

    ### only create an object if refresh is True
    ### (can be manually specified, otherwise determined above)
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack = False,
                )
                conn = None
            except Exception as e:
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack = False,
                )
                conn = None
        if conn is None:
            return None

    return connectors[type][label]
def is_connected(keys: str, **kw) ‑> bool

Check if the connector keys correspond to an active connection. If the connector has not been created, it will immediately return False. If the connector exists but cannot communicate with the source, return False.

NOTE: Only works with instance connectors (SQLConnectors and APIConnectors). Keyword arguments are passed to retry_connect().

Parameters

keys: The keys to the connector (e.g. 'sql:main').

Returns

A bool corresponding to whether a successful connection may be made.

Expand source code
def is_connected(keys: str, **kw) -> bool:
    """
    Check if the connector keys correspond to an active connection.
    If the connector has not been created, it will immediately return `False`.
    If the connector exists but cannot communicate with the source, return `False`.
    
    **NOTE:** Only works with instance connectors (`SQLConnectors` and `APIConnectors`).
    Keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Parameters
    ----------
    keys:
        The keys to the connector (e.g. `'sql:main'`).
        Only the first colon separates the type; labels may contain colons.
        
    Returns
    -------
    A `bool` corresponding to whether a successful connection may be made.

    """
    import warnings
    if ':' not in keys:
        warn(f"Invalid connector keys '{keys}'")
        return False

    ### Split on the first colon only, consistent with `get_connector()`,
    ### so labels containing ':' are handled instead of returning False.
    typ, label = keys.split(':', maxsplit=1)
    if typ not in instance_types:
        return False
    if label not in connectors.get(typ, {}):
        return False

    from meerschaum.connectors.parse import parse_instance_keys
    conn = parse_instance_keys(keys)
    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            ### Any exception raised during the probe counts as "not connected".
            return conn.test_connection(**kw)
    except Exception:
        return False

Classes

class APIConnector (label: Optional[str] = None, wait: bool = False, debug: bool = False, **kw)

Connect to a Meerschaum API instance.

Parameters

type : str
The type of the connector (e.g. meerschaum.connectors.sql, meerschaum.connectors.api, meerschaum.connectors.plugin).
label : str
The label for the connector.

Run mrsm edit config and to edit connectors in the YAML file:

meerschaum:
    connections:
        {type}:
            {label}:
                ### attributes go here
Expand source code
class APIConnector(Connector):
    """
    Connect to a Meerschaum API instance.

    Expects `host`, `username`, and `password` attributes,
    or a `uri` from which the connection attributes may be parsed.
    """

    IS_INSTANCE: bool = True
    IS_THREAD_SAFE: bool = False

    ### The connector's methods are implemented in sibling submodules
    ### and attached to the class via these imports.
    from ._delete import delete
    from ._post import post
    from ._patch import patch
    from ._get import get, wget
    from ._actions import get_actions, do_action
    from ._misc import get_mrsm_version, get_chaining_status
    from ._pipes import (
        register_pipe,
        fetch_pipes_keys,
        edit_pipe,
        sync_pipe,
        delete_pipe,
        get_pipe_data,
        get_backtrack_data,
        get_pipe_id,
        get_pipe_attributes,
        get_sync_time,
        pipe_exists,
        create_metadata,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        get_pipe_columns_types,
    )
    from ._fetch import fetch
    from ._plugins import (
        register_plugin,
        install_plugin,
        delete_plugin,
        get_plugins,
        get_plugin_attributes,
    )
    from ._login import login, test_connection
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri

    def __init__(
        self,
        label: Optional[str] = None,
        wait: bool = False,
        debug: bool = False,
        **kw
    ):
        ### A provided URI supplies the connection attributes
        ### (and possibly the label, unless one was given explicitly).
        if 'uri' in kw:
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            from_uri_params.pop('label', None)
            kw.update(from_uri_params)

        super().__init__('api', label=label, **kw)
        ### Fall back to the default protocol and port when unset.
        if 'protocol' not in self.__dict__:
            self.protocol = 'http'
        if 'port' not in self.__dict__:
            self.port = 8000
        if 'uri' not in self.__dict__:
            ### NOTE(review): `required_attributes` appears to come from this
            ### module's top level (host, username, password) — confirm.
            self.verify_attributes(required_attributes)
        else:
            ### Parse host / port / credentials out of the stored URI.
            from meerschaum.connectors.sql import SQLConnector
            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
            if 'host' not in conn_attrs:
                raise Exception(f"Invalid URI for '{self}'.")
            self.__dict__.update(conn_attrs)
        self.url = (
            self.protocol + '://' +
            self.host + ':' +
            str(self.port)
        )
        ### Login state, populated lazily by the `session` and `token` properties.
        self._token = None
        self._expires = None
        self._session = None


    @property
    def URI(self) -> str:
        """
        Return the fully qualified URI.
        """
        username = self.__dict__.get('username', None)
        password = self.__dict__.get('password', None)
        creds = (username + ':' + password + '@') if username and password else ''

        return (
            self.protocol + '://' + creds
            + self.host + ':' + str(self.port)
        )


    @property
    def session(self):
        """
        Lazily build and cache a `requests.Session`;
        calls `error()` if `requests` cannot be imported.
        """
        if self._session is None:
            requests = attempt_import('requests')
            if requests:
                self._session = requests.Session()
            if self._session is None:
                error(f"Failed to import requests. Is requests installed?")
        return self._session

    @property
    def token(self):
        """
        Return the cached login token, logging in again when the token
        is missing or expires within the next minute.
        """
        ### Treat the token as expired within a one-minute buffer of `_expires`.
        expired = (
            True if self._expires is None else (
                (self._expires < datetime.datetime.utcnow() + datetime.timedelta(minutes=1))
            )
        )

        if self._token is None or expired:
            success, msg = self.login()
            if not success:
                warn(msg, stack=False)
        return self._token

Ancestors

  • meerschaum.connectors.Connector.Connector

Class variables

var IS_INSTANCE : bool
var IS_THREAD_SAFE : bool

Static methods

def from_uri(uri: str, label: Optional[str] = None, as_dict: bool = False) ‑> Union[APIConnector, Dict[str, Union[str, int]]]

Create a new APIConnector from a URI string.

Parameters

uri : str
The URI connection string.
label : Optional[str], default None
If provided, use this as the connector label. Otherwise use the determined database name.
as_dict : bool, default False
If True, return a dictionary of the keyword arguments necessary to create a new APIConnector, otherwise create a new object.

Returns

A new APIConnector object or a dictionary of attributes (if as_dict is True).

Expand source code
@classmethod
def from_uri(
        cls,
        uri: str,
        label: Optional[str] = None,
        as_dict: bool = False,
    ) -> Union[
        'meerschaum.connectors.APIConnector',
        Dict[str, Union[str, int]],
    ]:
    """
    Build an `APIConnector` (or its attributes dictionary) from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string.

    label: Optional[str], default None
        If provided, use this as the connector label.
        Otherwise use the determined database name.

    as_dict: bool, default False
        If `True`, return a dictionary of the keyword arguments
        necessary to create a new `APIConnector`, otherwise create a new object.

    Returns
    -------
    A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
    """
    from meerschaum.connectors.sql import SQLConnector
    params = SQLConnector.parse_uri(uri)
    if 'host' not in params:
        error("No host was found in the provided URI.")
    ### The URI scheme (parsed as 'flavor') becomes the protocol.
    params['protocol'] = params.pop('flavor')
    if label:
        params['label'] = label
    else:
        ### Derive a label like 'user@host' (lower-cased) when none is given.
        username_prefix = (params['username'] + '@') if 'username' in params else ''
        params['label'] = (username_prefix + params['host']).lower()

    if as_dict:
        return params
    return cls(**params)

Instance variables

var URI : str

Return the fully qualified URI.

Expand source code
@property
def URI(self) -> str:
    """
    Return the fully qualified URI, including credentials
    when both a username and password are set.
    """
    username = self.__dict__.get('username', None)
    password = self.__dict__.get('password', None)
    if username and password:
        creds = f"{username}:{password}@"
    else:
        creds = ''

    return f"{self.protocol}://{creds}{self.host}:{self.port}"
var session
Expand source code
@property
def session(self):
    """
    Lazily build and cache a `requests.Session` for this connector;
    calls `error()` if `requests` cannot be imported.
    """
    if self._session is None:
        requests = attempt_import('requests')
        if requests:
            self._session = requests.Session()
        if self._session is None:
            error(f"Failed to import requests. Is requests installed?")
    return self._session
var token
Expand source code
@property
def token(self):
    """
    Return the cached login token, logging in again when the token
    is missing or expires within the next minute.
    """
    ### Treat the token as expired within a one-minute buffer of `_expires`.
    expired = (
        True if self._expires is None else (
            (self._expires < datetime.datetime.utcnow() + datetime.timedelta(minutes=1))
        )
    )

    if self._token is None or expired:
        success, msg = self.login()
        if not success:
            warn(msg, stack=False)
    return self._token

Methods

def clear_pipe(self, pipe: Pipe, debug: bool = False, **kw) ‑> SuccessTuple

Delete rows in a pipe's table.

Parameters

pipe : Pipe
The pipe with rows to be deleted.

Returns

A success tuple.

Expand source code
def clear_pipe(
        self,
        pipe: 'meerschaum.Pipe',
        debug: bool = False,
        **kw
    ) -> 'SuccessTuple':
    """
    Delete rows in a pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe with rows to be deleted.

    debug: bool, default False
        Verbosity toggle passed to `do_action`.

    Returns
    -------
    A success tuple.
    """
    ### Drop keys which are set explicitly below so they aren't passed twice
    ### to `do_action`. (Annotations are quoted to avoid import-time lookups.)
    for _key in ('metric_keys', 'connector_keys', 'location_keys', 'action', 'force'):
        kw.pop(_key, None)
    return self.do_action(
        ['clear', 'pipes'],
        connector_keys = pipe.connector_keys,
        metric_keys = pipe.metric_key,
        location_keys = pipe.location_key,
        force = True,
        debug = debug,
        **kw
    )
def create_metadata(self, debug: bool = False) ‑> bool

Create metadata tables.

Returns

A bool indicating success.

Expand source code
def create_metadata(
        self,
        debug: bool = False
    ) -> bool:
    """Create metadata tables.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating success.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.config.static import STATIC_CONFIG
    import json
    r_url = STATIC_CONFIG['api']['endpoints']['metadata']
    response = self.post(r_url, debug=debug)
    if debug:
        ### Was missing the f-prefix, so the literal '{response.text}' printed.
        dprint(f"Create metadata response: {response.text}")
    try:
        metadata_response = json.loads(response.text)
    except Exception:
        metadata_response = False
    ### Previously this always returned `False`, discarding the parsed result;
    ### return the server's (coerced) response instead per the documented contract.
    return bool(metadata_response)
def delete(self, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response

Wrapper for requests.delete.

Expand source code
def delete(
        self,
        r_url : str,
        headers : Optional[Dict[str, Any]] = None,
        use_token : bool = True,
        debug : bool = False,
        **kw : Any,
    ) -> 'requests.Response':
    """Wrapper for `requests.delete`.

    Parameters
    ----------
    r_url : str
        The relative URL appended to this connector's base `url`.

    headers : Optional[Dict[str, Any]], default None
        Additional headers to send with the request.

    use_token : bool, default True
        If `True`, attach the login token as a Bearer Authorization header.

    debug : bool, default False
        Verbosity toggle.

    Returns
    -------
    The `requests.Response` from the DELETE request.
    """
    ### NOTE: `**kw` was annotated with the typo `Ahy`; fixed to `Any`.
    if debug:
        from meerschaum.utils.debug import dprint

    if headers is None:
        headers = {}

    if use_token:
        if debug:
            dprint(f"Checking token...")
        headers.update({ 'Authorization': f'Bearer {self.token}' })

    if debug:
        dprint(f"Sending DELETE request to {self.url + r_url}.")

    return self.session.delete(
        self.url + r_url,
        headers = headers,
        **kw
    )
def delete_pipe(self, pipe: Optional[Pipe] = None, debug: bool = None) ‑> SuccessTuple

Delete a Pipe and drop its table.

Expand source code
def delete_pipe(
        self,
        pipe: Optional[meerschaum.Pipe] = None,
        debug: Optional[bool] = None,
    ) -> SuccessTuple:
    """Delete a Pipe and drop its table.

    Parameters
    ----------
    pipe: Optional[meerschaum.Pipe], default None
        The pipe to delete. Raises (via `error()`) if `None`.
    debug: Optional[bool], default None
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of (success, message) parsed from the API response.
    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    if pipe is None:
        error(f"Pipe cannot be None.")
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/delete',
        debug = debug,
    )
    if debug:
        dprint(response.text)
    ### Parse the JSON payload once instead of re-parsing it in every branch.
    payload = response.json()
    if isinstance(payload, list):
        response_tuple = response.__bool__(), payload[1]
    elif 'detail' in payload:
        response_tuple = response.__bool__(), payload['detail']
    else:
        response_tuple = response.__bool__(), response.text
    return response_tuple
def delete_plugin(self, plugin: meerschaum.core.Plugin, debug: bool = False) ‑> SuccessTuple

Delete a plugin from an API repository.

Expand source code
def delete_plugin(
        self,
        plugin: meerschaum.core.Plugin,
        debug: bool = False
    ) -> SuccessTuple:
    """Delete a plugin from an API repository.

    Returns a `SuccessTuple` of (success, message).
    """
    import json
    endpoint = plugin_r_url(plugin)

    try:
        response = self.delete(endpoint, debug=debug)
    except Exception:
        return False, f"Failed to delete plugin '{plugin}'."

    ### The server responds with a JSON-encoded [success, message] pair.
    try:
        success, msg = json.loads(response.text)
    except Exception:
        return False, response.text

    return success, msg
def delete_user(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> SuccessTuple

Delete a user.

Expand source code
def delete_user(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Delete a user from the API instance.

    Returns a `SuccessTuple` of (success, message).
    """
    from meerschaum.config.static import _static_config
    import json
    endpoint = f"{_static_config()['api']['endpoints']['users']}/{user.username}"
    response = self.delete(endpoint, debug=debug)
    failure = (False, f"Failed to delete user '{user.username}'.")
    try:
        payload = json.loads(response.text)
    except Exception:
        return failure
    ### A dict with 'detail' indicates an error message from the server.
    if isinstance(payload, dict) and 'detail' in payload:
        return False, payload['detail']
    try:
        return tuple(payload)
    except Exception:
        return failure
def do_action(self, action: Optional[List[str]] = None, sysargs: Optional[List[str]] = None, debug: bool = False, **kw) ‑> Tuple[bool, str]

Execute a Meerschaum action remotely.

If sysargs is provided, parse those instead. Otherwise infer everything from keyword arguments.

NOTE: The first index of action should NOT be removed! Example: action = ['show', 'config']

Returns: tuple (succeeded : bool, message : str)

Parameters

action : Optional[List[str]] :
(Default value = None)
sysargs : Optional[List[str]] :
(Default value = None)
debug : bool :
(Default value = False)

**kw :

Returns

Expand source code
def do_action(
        self,
        action: Optional[List[str]] = None,
        sysargs: Optional[List[str]] = None,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """Execute a Meerschaum action remotely.

    If `sysargs` is provided (and `action` starts with the sentinel empty
    string), parse `sysargs` into the request body instead; otherwise build
    the body from `action`, `debug`, and the remaining keyword arguments.

    NOTE: The first index of `action` should NOT be removed!
    Example: action = ['show', 'config']

    Parameters
    ----------
    action: Optional[List[str]], default None
        The action to execute remotely, e.g. `['show', 'pipes']`.
        The first word selects the server endpoint; the rest is sent
        in the request body.
    sysargs: Optional[List[str]], default None
        Raw command-line arguments; only parsed when `action[0] == ''`.
    debug: bool, default False
        Verbosity toggle.
    **kw
        Additional keyword arguments forwarded in the request body.

    Returns
    -------
    A tuple (succeeded: bool, message: str).
    """
    import sys, json
    from meerschaum.utils.debug import dprint
    from meerschaum.config.static import _static_config
    from meerschaum.utils.misc import json_serialize_datetime
    if action is None:
        action = []

    if sysargs is not None and action and action[0] == '':
        from meerschaum._internal.arguments import parse_arguments
        if debug:
            dprint(f"Parsing sysargs:\n{sysargs}")
        json_dict = parse_arguments(sysargs)
    else:
        ### NOTE: `json_dict` aliases `kw`, so the keys added below mutate
        ### the caller-supplied keyword dict in place.
        json_dict = kw
        json_dict['action'] = action
        json_dict['debug'] = debug

    ### NOTE(review): raises IndexError if the action list is empty here —
    ### confirm callers always supply at least one action word.
    root_action = json_dict['action'][0]
    del json_dict['action'][0]
    r_url = f"{_static_config()['api']['endpoints']['actions']}/{root_action}"
    
    if debug:
        from meerschaum.utils.formatting import pprint
        dprint(f"Sending data to '{self.url + r_url}':")
        pprint(json_dict, stream=sys.stderr)

    ### Datetimes in the body are serialized via `json_serialize_datetime`.
    response = self.post(
        r_url,
        data = json.dumps(json_dict, default=json_serialize_datetime),
        debug = debug,
    )
    try:
        response_list = json.loads(response.text)
        ### A dict with 'detail' indicates an error response from the server.
        if isinstance(response_list, dict) and 'detail' in response_list:
            return False, response_list['detail']
    except Exception as e:
        print(f"Invalid response: {response}")
        print(e)
        return False, response.text
    if debug:
        dprint(response)
    ### Expect a [success, message] pair from the server.
    try:
        return response_list[0], response_list[1]
    except Exception as e:
        return False, f"Failed to parse result from action '{root_action}'"
def drop_pipe(self, pipe: Pipe, debug: bool = False) ‑> SuccessTuple

Drop a pipe's table but maintain its registration.

Parameters

pipe : meerschaum.Pipe:
The pipe to be dropped.

Returns

A success tuple (bool, str).

Expand source code
def drop_pipe(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False
    ) -> SuccessTuple:
    """Drop a pipe's table but maintain its registration.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose table should be dropped.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A success tuple (bool, str) from the remote `drop pipes` action.
    """
    drop_action = ['drop', 'pipes']
    return self.do_action(
        drop_action,
        connector_keys=pipe.connector_keys,
        metric_keys=pipe.metric_key,
        location_keys=pipe.location_key,
        force=True,
        debug=debug,
    )
def edit_pipe(self, pipe: Pipe, patch: bool = False, debug: bool = False) ‑> SuccessTuple

Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, response_dict).

Expand source code
def edit_pipe(
        self,
        pipe: meerschaum.Pipe,
        patch: bool = False,
        debug: bool = False,
    ) -> SuccessTuple:
    """Submit a PATCH to the API to edit an existing Pipe object.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose `parameters` should be persisted.
    patch: bool, default False
        If `True`, ask the server to patch (merge) the parameters
        rather than replace them.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of (success_bool, message).
    """
    from meerschaum.utils.debug import dprint
    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ###       then `pipe.parameters` will exist and not be fetched from the database.
    r_url = pipe_r_url(pipe)
    response = self.patch(
        r_url + '/edit',
        params = {'patch': patch,},
        json = pipe.parameters,
        debug = debug,
    )
    if debug:
        dprint(response.text)
    ### Parse the JSON body once instead of re-parsing it in each branch.
    payload = response.json()
    if isinstance(payload, list):
        response_tuple = response.__bool__(), payload[1]
    elif 'detail' in payload:
        response_tuple = response.__bool__(), payload['detail']
    else:
        response_tuple = response.__bool__(), response.text
    return response_tuple
def edit_user(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> SuccessTuple

Edit an existing user.

Expand source code
def edit_user(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Edit an existing user on the API instance.

    Returns a `SuccessTuple` of (success, message).
    """
    import json
    from meerschaum.config.static import STATIC_CONFIG
    endpoint = f"{STATIC_CONFIG['api']['endpoints']['users']}/edit"
    form = {
        'username': user.username,
        'password': user.password,
        'type': user.type,
        'email': user.email,
        'attributes': json.dumps(user.attributes),
    }
    response = self.post(endpoint, data=form, debug=debug)
    try:
        parsed = json.loads(response.text)
        ### A dict with 'detail' indicates an error message from the server.
        if isinstance(parsed, dict) and 'detail' in parsed:
            return False, parsed['detail']
        success_tuple = tuple(parsed)
    except Exception:
        return False, (response.text if response else f"Failed to edit user '{user}'.")

    return tuple(success_tuple)
def fetch(self, pipe: Pipe, begin: Union[datetime.datetime, str, None] = '', end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw: Any) ‑> pandas.DataFrame

Get the Pipe data from the remote Pipe.

Expand source code
def fetch(
        self,
        pipe: meerschaum.Pipe,
        begin: Union[datetime.datetime, str, None] = '',
        end: Optional[datetime.datetime] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw: Any
    ) -> pandas.DataFrame:
    """Get the Pipe data from the remote Pipe.

    The remote pipe's keys are read from `pipe.parameters['fetch']`
    (`connector_keys`, `metric_key`, optional `location_key`).

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The local pipe whose `fetch` parameters describe the remote pipe.
    begin: Union[datetime.datetime, str, None], default ''
        Lower datetime bound. The empty-string sentinel means
        "start from the pipe's sync time".
    end: Optional[datetime.datetime], default None
        Upper datetime bound.
    params: Optional[Dict[str, Any]], default None
        Extra filter parameters, merged with the fetch instructions' `params`.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The remote pipe's data as a DataFrame, or `None` if the fetch
    instructions are missing or incomplete.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config.static import _static_config
    from meerschaum.config._patch import apply_patch_to_config

    if 'fetch' not in pipe.parameters:
        warn(f"Missing 'fetch' parameters for Pipe '{pipe}'.", stack=False)
        return None

    instructions = pipe.parameters['fetch']

    if 'connector_keys' not in instructions:
        warn(f"Missing connector_keys in fetch parameters for Pipe '{pipe}'", stack=False)
        return None
    remote_connector_keys = instructions.get('connector_keys', None)
    if 'metric_key' not in instructions:
        warn(f"Missing metric_key in fetch parameters for Pipe '{pipe}'", stack=False)
        return None
    remote_metric_key = instructions.get('metric_key', None)
    ### `location_key` is optional and defaults to None.
    remote_location_key = instructions.get('location_key', None)
    if begin is None:
        ### NOTE(review): `begin is None` falls back to `pipe.sync_time`, while
        ### the `''` sentinel below uses `pipe.get_sync_time()` — confirm this
        ### asymmetry is intentional.
        begin = pipe.sync_time

    ### Deep-copy so the caller's `params` dict is not mutated by the patch.
    ### (`copy` is presumably imported at module level — verify.)
    _params = copy.deepcopy(params) if params is not None else {}
    _params = apply_patch_to_config(_params, instructions.get('params', {}))

    from meerschaum import Pipe
    p = Pipe(
        remote_connector_keys,
        remote_metric_key,
        remote_location_key,
        mrsm_instance = self
    )
    ### The empty-string default means "start from the last sync time".
    begin = (
        begin if not (isinstance(begin, str) and begin == '')
        else pipe.get_sync_time(debug=debug)
    )

    return p.get_data(
        begin=begin, end=end,
        params=_params,
        debug=debug
    )
def fetch_pipes_keys(self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) ‑> List[Tuple[str, str, Optional[str]]]

Fetch registered Pipes' keys from the API.

Parameters

connector_keys : Optional[List[str]], default None
The connector keys for the query.
metric_keys : Optional[List[str]], default None
The metric keys for the query.
location_keys : Optional[List[str]], default None
The location keys for the query.
tags : Optional[List[str]], default None
A list of tags for the query.
params : Optional[Dict[str, Any]], default None
A parameters dictionary for filtering against the pipes table (e.g. {'connector_keys': 'plugin:foo'}). Not recommended to be used.
debug : bool, default False
Verbosity toggle.

Returns

A list of tuples containing pipes' keys.

Expand source code
def fetch_pipes_keys(
        self,
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False
    ) -> List[Tuple[str, str, Optional[str]]]:
    """
    Fetch registered Pipes' keys from the API.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        The connector keys for the query.

    metric_keys: Optional[List[str]], default None
        The metric keys for the query.

    location_keys: Optional[List[str]], default None
        The location keys for the query.

    tags: Optional[List[str]], default None
        A list of tags for the query.

    params: Optional[Dict[str, Any]], default None
        A parameters dictionary for filtering against the `pipes` table
        (e.g. `{'connector_keys': 'plugin:foo'}`).
        Not recommended to be used.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of tuples containing pipes' keys.
    """
    from meerschaum.utils.warnings import error
    from meerschaum.config.static import STATIC_CONFIG
    import json
    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    if tags is None:
        tags = []

    r_url = STATIC_CONFIG['api']['endpoints']['pipes'] + '/keys'
    try:
        j = self.get(
            r_url,
            params = {
                'connector_keys': json.dumps(connector_keys),
                'metric_keys': json.dumps(metric_keys),
                'location_keys': json.dumps(location_keys),
                'tags': json.dumps(tags),
                'params': json.dumps(params),
            },
            debug=debug
        ).json()
    except Exception as e:
        ### `error()` raises, so execution stops here on failure.
        error(str(e))

    ### A dict payload with 'detail' indicates an error from the server.
    if 'detail' in j:
        error(j['detail'], stack=False)
    return [tuple(r) for r in j]
def get(self, r_url: str, headers: Optional[Dict[str, str]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response

Wrapper for requests.get.

Expand source code
def get(
        self,
        r_url: str,
        headers: Optional[Dict[str, str]] = None,
        use_token: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> requests.Response:
    """Wrapper for `requests.get`.

    Parameters
    ----------
    r_url: str
        The relative endpoint path, appended to `self.url`.
    headers: Optional[Dict[str, str]], default None
        Extra HTTP headers to send with the request.
    use_token: bool, default True
        If `True`, add an `Authorization: Bearer` header from `self.token`.
    debug: bool, default False
        Verbosity toggle.
    **kw: Any
        Additional keyword arguments passed to `requests.Session.get`.

    Returns
    -------
    The `requests.Response` object.
    (NOTE: the return annotation was previously misspelled `Reponse`.)
    """
    if debug:
        from meerschaum.utils.debug import dprint

    if headers is None:
        headers = {}

    if use_token:
        if debug:
            dprint("Checking login token.")
        headers.update({'Authorization': f'Bearer {self.token}'})

    if debug:
        dprint(f"Sending GET request to {self.url + r_url}.")

    return self.session.get(
        self.url + r_url,
        headers = headers,
        **kw
    )
def get_actions(self) ‑> list

Get available actions from the API server

Expand source code
def get_actions(
        self,
    ) -> list:
    """Get the available actions from the API server.

    Returns the raw response of the `actions` endpoint.
    """
    from meerschaum.config.static import STATIC_CONFIG
    actions_endpoint = STATIC_CONFIG['api']['endpoints']['actions']
    return self.get(actions_endpoint)
def get_backtrack_data(self, pipe: Pipe, begin: datetime.datetime, backtrack_minutes: int = 0, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw: Any) ‑> pandas.DataFrame

Get a Pipe's backtrack data from the API.

Expand source code
def get_backtrack_data(
        self,
        pipe: meerschaum.Pipe,
        begin: datetime.datetime,
        backtrack_minutes: int = 0,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw: Any,
    ) -> pandas.DataFrame:
    """Get a Pipe's backtrack data from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose backtrack data to fetch.
    begin: datetime.datetime
        The datetime to backtrack from.
    backtrack_minutes: int, default 0
        How many minutes of backtrack to request.
    params: Optional[Dict[str, Any]], default None
        Additional filter parameters, JSON-encoded into the query string.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A DataFrame of the backtrack data, or `None` on failure.
    """
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe)
    try:
        response = self.get(
            r_url + "/backtrack_data",
            params = {
                'begin': begin,
                'backtrack_minutes': backtrack_minutes,
                'params': json.dumps(params),
            },
            debug = debug
        )
    except Exception as e:
        warn(f"Failed to parse backtrack data JSON for {pipe}. Exception:\n" + str(e))
        return None
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.misc import parse_df_datetimes
    if debug:
        dprint(response.text)
    pd = import_pandas()
    try:
        df = pd.read_json(response.text)
    except Exception as e:
        warn(str(e))
        return None
    ### NOTE: previously re-parsed `response.text` with a second `pd.read_json`
    ### call, discarding the frame parsed above; reuse it instead.
    df = parse_df_datetimes(df, debug=debug)
    return df
def get_chaining_status(self, **kw) ‑> Optional[bool]

Parameters

**kw :

Returns

type
 
Expand source code
def get_chaining_status(self, **kw) -> Optional[bool]:
    """Fetch the server's chaining status.

    Parameters
    ----------
    **kw
        Additional keyword arguments passed to `self.get`.

    Returns
    -------
    The JSON payload of the `chaining` endpoint, or `None` if the
    request fails or the response is falsy.
    """
    from meerschaum.config.static import _static_config
    try:
        chaining_response = self.get(
            _static_config()['api']['endpoints']['chaining'],
            use_token = True,
            **kw
        )
        if not chaining_response:
            return None
    except Exception:
        return None

    return chaining_response.json()
def get_mrsm_version(self, **kw) ‑> Optional[str]

Parameters

**kw :

Returns

type
 
Expand source code
def get_mrsm_version(self, **kw) -> Optional[str]:
    """Return the Meerschaum version reported by the API server.

    Parameters
    ----------
    **kw
        Additional keyword arguments passed to `self.get`.

    Returns
    -------
    The version payload from the `version/mrsm` endpoint,
    or `None` on failure or an error response.
    """
    from meerschaum.config.static import _static_config
    try:
        version_payload = self.get(
            _static_config()['api']['endpoints']['version'] + '/mrsm',
            use_token = True,
            **kw
        ).json()
    except Exception:
        return None
    ### A dict with 'detail' indicates an error response from the server.
    if isinstance(version_payload, dict) and 'detail' in version_payload:
        return None
    return version_payload
def get_pipe_attributes(self, pipe: Pipe, debug: bool = False) ‑> Dict[str, Any]

Get a Pipe's attributes from the API

Parameters

pipe : Pipe
The pipe whose attributes we are fetching.

Returns

A dictionary of a pipe's attributes. If the pipe does not exist, return an empty dictionary.

Expand source code
def get_pipe_attributes(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Dict[str, Any]:
    """Get a Pipe's attributes from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose attributes we are fetching.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of the pipe's attributes, or an empty dictionary
    if the response cannot be parsed as JSON.
    """
    import json
    response = self.get(pipe_r_url(pipe) + '/attributes', debug=debug)
    try:
        return json.loads(response.text)
    except Exception:
        return {}
def get_pipe_columns_types(self, pipe: Pipe, debug: bool = False) ‑> Union[Dict[str, str], None]

Fetch the columns and types of the pipe's table.

Parameters

pipe : Pipe
The pipe whose columns to be queried.

Returns

A dictionary mapping column names to their database types.

Examples

>>> {
...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
...   'id': 'BIGINT',
...   'val': 'DOUBLE PRECISION',
... }
>>>
Expand source code
def get_pipe_columns_types(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Union[Dict[str, str], None]:
    """
    Fetch the columns and types of the pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose columns are to be queried.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping column names to their database types,
    or `None` on an error response.

    Examples
    --------
    >>> {
    ...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    ...   'id': 'BIGINT',
    ...   'val': 'DOUBLE PRECISION',
    ... }
    >>>
    """
    ### NOTE: `warn` was previously imported only inside the first branch,
    ### so the non-dict branch below raised a NameError instead of warning.
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe) + '/columns/types'
    response = self.get(
        r_url,
        debug = debug
    )
    j = response.json()
    if isinstance(j, dict) and 'detail' in j and len(j.keys()) == 1:
        warn(j['detail'])
        return None
    if not isinstance(j, dict):
        warn(response.text)
        return None
    return j
def get_pipe_data(self, pipe: Pipe, begin: Union[str, datetime.datetime, int, None] = None, end: Union[str, datetime.datetime, int, None] = None, params: Optional[Dict[str, Any]] = None, as_chunks: bool = False, debug: bool = False, **kw: Any) ‑> Union[pandas.DataFrame, None]

Fetch data from the API.

Expand source code
def get_pipe_data(
        self,
        pipe: meerschaum.Pipe,
        begin: Union[str, datetime.datetime, int, None] = None,
        end: Union[str, datetime.datetime, int, None] = None,
        params: Optional[Dict[str, Any]] = None,
        as_chunks: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Union[pandas.DataFrame, None]:
    """Fetch a pipe's data from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose data to fetch.
    begin: Union[str, datetime.datetime, int, None], default None
        Lower bound for the query.
    end: Union[str, datetime.datetime, int, None], default None
        Upper bound for the query.
    params: Optional[Dict[str, Any]], default None
        Additional filter parameters, JSON-encoded into the query string.
    as_chunks: bool, default False
        Currently unused by this implementation.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A DataFrame of the pipe's data, or `None` on failure.
    """
    import json
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe)
    try:
        response = self.get(
            r_url + "/data",
            params = {'begin': begin, 'end': end, 'params': json.dumps(params)},
            debug = debug
        )
        if not response.ok:
            return None
        j = response.json()
    except Exception as e:
        warn(str(e))
        return None
    if isinstance(j, dict) and 'detail' in j:
        ### NOTE: previously returned a `(False, detail)` tuple here, violating
        ### the annotated return type of DataFrame-or-None; warn and return None.
        warn(j['detail'])
        return None
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.misc import parse_df_datetimes
    pd = import_pandas()
    try:
        df = pd.read_json(response.text)
    except Exception as e:
        warn(str(e))
        return None
    ### Skip datetime parsing for columns whose dtype is already non-datetime.
    df = parse_df_datetimes(
        df,
        ignore_cols = [
            col
            for col, dtype in pipe.dtypes.items()
            if 'datetime' not in str(dtype)
        ],
        debug = debug,
    )
    return df
def get_pipe_id(self, pipe: meerschaum.Pipe, debug: bool = False) ‑> int

Get a Pipe's ID from the API.

Expand source code
def get_pipe_id(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Optional[int]:
    """Get a Pipe's ID from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose ID to fetch.
        (NOTE: the annotation was previously misspelled `meerschuam.Pipe`.)
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The pipe's integer ID, or `None` if the response body is not
    parseable as an integer (e.g. the pipe is not registered).
    """
    from meerschaum.utils.debug import dprint
    r_url = pipe_r_url(pipe)
    response = self.get(
        r_url + '/id',
        debug = debug
    )
    if debug:
        dprint(f"Got pipe ID: {response.text}")
    try:
        return int(response.text)
    except Exception as e:
        return None
def get_pipe_rowcount(self, pipe: "'Pipe'", begin: "Optional['datetime.datetime']" = None, end: "Optional['datetime.datetime']" = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) ‑> int

Get a pipe's row count from the API.

Parameters

pipe : 'meerschaum.Pipe':
The pipe whose row count we are counting.
begin : Optional[datetime.datetime], default None
If provided, bound the count by this datetime.
end : Optional[datetime.datetime]
If provided, bound the count by this datetime.
params : Optional[Dict[str, Any]], default None
If provided, bound the count by these parameters.
remote : bool, default False
 

Returns

The number of rows in the pipe's table, bound the given parameters. If the table does not exist, return 0.

Expand source code
def get_pipe_rowcount(
        self,
        pipe: 'meerschaum.Pipe',
        begin: Optional['datetime.datetime'] = None,
        end: Optional['datetime.datetime'] = None,
        params: Optional[Dict[str, Any]] = None,
        remote: bool = False,
        debug: bool = False,
    ) -> int:
    """Get a pipe's row count from the API.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe'
        The pipe whose rows we are counting.

    begin: Optional[datetime.datetime], default None
        If provided, lower-bound the count by this datetime.

    end: Optional[datetime.datetime], default None
        If provided, upper-bound the count by this datetime.

    params: Optional[Dict[str, Any]], default None
        If provided, bound the count by these parameters
        (sent as the JSON request body).

    remote: bool, default False
        Forwarded to the endpoint's `remote` query parameter.

    Returns
    -------
    The number of rows in the pipe's table, bound by the given parameters.
    If the table does not exist, return 0.
    """
    import json
    rowcount_response = self.get(
        pipe_r_url(pipe) + "/rowcount",
        json = params,
        params = {'begin': begin, 'end': end, 'remote': remote},
        debug = debug,
    )
    try:
        return int(json.loads(rowcount_response.text))
    except Exception:
        return 0
def get_plugin_attributes(self, plugin: meerschaum.core.Plugin, debug: bool = False) ‑> Mapping[str, Any]

Return a plugin's attributes.

Expand source code
def get_plugin_attributes(
        self,
        plugin: meerschaum.core.Plugin,
        debug: bool = False
    ) -> Mapping[str, Any]:
    """
    Return a plugin's attributes from the repository.
    """
    import json
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config.static import _static_config
    endpoint = plugin_r_url(plugin) + '/attributes'
    response = self.get(endpoint, use_token=True, debug=debug)
    plugin_attributes = response.json()
    ### The payload may arrive as a JSON-encoded string; decode it once more.
    if isinstance(plugin_attributes, str) and plugin_attributes and plugin_attributes[0] == '{':
        try:
            plugin_attributes = json.loads(plugin_attributes)
        except Exception:
            pass
    if not isinstance(plugin_attributes, dict):
        error(response.text)
    elif not response and 'detail' in plugin_attributes:
        warn(plugin_attributes['detail'])
        return {}
    return plugin_attributes
def get_plugins(self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False) ‑> Sequence[str]

Return a list of registered plugin names.

Parameters

user_id :
If specified, return all plugins from a certain user.
user_id : Optional[int] :
(Default value = None)
search_term : Optional[str] :
(Default value = None)
debug : bool :
(Default value = False)

Returns

Expand source code
def get_plugins(
        self,
        user_id : Optional[int] = None,
        search_term : Optional[str] = None,
        debug : bool = False
    ) -> Sequence[str]:
    """Return a list of registered plugin names.

    Parameters
    ----------
    user_id: Optional[int], default None
        If specified, return only plugins registered by this user.
    search_term: Optional[str], default None
        If specified, sent as the `search_term` query parameter.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of plugin names (empty if the request fails).
    """
    import json
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config.static import _static_config
    plugins_response = self.get(
        _static_config()['api']['endpoints']['plugins'],
        params = {'user_id': user_id, 'search_term': search_term},
        use_token = True,
        debug = debug,
    )
    if not plugins_response:
        return []
    registered_plugins = json.loads(plugins_response.text)
    ### `error()` raises if the payload is not the expected list.
    if not isinstance(registered_plugins, list):
        error(plugins_response.text)
    return registered_plugins
def get_sync_time(self, pipe: "'Pipe'", params: Optional[Dict[str, Any]] = None, newest: bool = True, round_down: bool = True, debug: bool = False) ‑> Union[datetime.datetime, int, None]

Get a Pipe's most recent datetime value from the API.

Parameters

pipe : Pipe
The pipe to select from.
params : Optional[Dict[str, Any]], default None
Optional params dictionary to build the WHERE clause.
newest : bool, default True
If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
round_down : bool, default True
If True, round the resulting datetime value down to the nearest minute.

Returns

The most recent (or oldest if newest is False) datetime of a pipe, rounded down to the closest minute.

Expand source code
def get_sync_time(
        self,
        pipe: 'meerschaum.Pipe',
        params: Optional[Dict[str, Any]] = None,
        newest: bool = True,
        round_down: bool = True,
        debug: bool = False,
    ) -> Union[datetime.datetime, int, None]:
    """Get a Pipe's most recent datetime value from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to select from.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the WHERE clause.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (ASC instead of DESC).

    round_down: bool, default True
        If `True`, round the resulting datetime value down to the nearest minute.

    Returns
    -------
    The most recent (or oldest if `newest` is `False`) datetime of the pipe,
    rounded down to the closest minute, or `None` on failure.
    """
    from meerschaum.utils.misc import is_int
    from meerschaum.utils.warnings import warn
    import datetime, json
    sync_time_response = self.get(
        pipe_r_url(pipe) + '/sync_time',
        json = params,
        params = {'newest': newest, 'debug': debug, 'round_down': round_down},
        debug = debug,
    )
    if not sync_time_response:
        warn(sync_time_response.text)
        return None
    payload = sync_time_response.json()
    if payload is None:
        return None
    ### An integer payload is an epoch-style value; otherwise parse ISO-8601.
    try:
        if is_int(payload):
            return int(payload)
        return datetime.datetime.fromisoformat(payload)
    except Exception as e:
        warn(e)
        return None
def get_user_attributes(self, user: "'meerschaum.core.User'", debug: bool = False, **kw) ‑> int

Get a user's attributes.

Expand source code
def get_user_attributes(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw
    ) -> Optional[Dict[str, Any]]:
    """Get a user's attributes from the API.

    Parameters
    ----------
    user: 'meerschaum.core.User'
        The user whose attributes to fetch.
    debug: bool, default False
        Verbosity toggle.
    **kw
        Additional keyword arguments passed to `self.get`.

    Returns
    -------
    The parsed JSON attributes payload (presumably a dict of attributes —
    confirm against the server), or `None` if the response cannot be parsed.
    (NOTE: the return annotation was previously `int`, which does not match
    the returned value.)
    """
    from meerschaum.config.static import _static_config
    import json
    r_url = f"{_static_config()['api']['endpoints']['users']}/{user.username}/attributes"
    response = self.get(r_url, debug=debug, **kw)
    try:
        attributes = json.loads(response.text)
    except Exception as e:
        attributes = None
    return attributes
def get_user_id(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> Optional[int]

Get a user's ID.

Expand source code
def get_user_id(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> Optional[int]:
    """Get a user's ID from the API.

    Returns the integer ID, or `None` if the response cannot be
    parsed as an integer.
    """
    from meerschaum.config.static import _static_config
    import json
    endpoint = f"{_static_config()['api']['endpoints']['users']}/{user.username}/id"
    response = self.get(endpoint, debug=debug, **kw)
    try:
        return int(json.loads(response.text))
    except Exception:
        return None
def get_user_password_hash(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> Optional[str]

If configured, get a user's password hash.

Expand source code
def get_user_password_hash(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """If configured, get a user's password hash from the API.

    Returns the endpoint's JSON payload, or `None` on a failed response.
    """
    from meerschaum.config.static import _static_config
    users_endpoint = _static_config()['api']['endpoints']['users']
    response = self.get(
        f"{users_endpoint}/{user.username}/password_hash",
        debug=debug,
        **kw
    )
    return response.json() if response else None
def get_user_type(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> Optional[str]

If configured, get a user's type.

Expand source code
def get_user_type(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """If configured, get a user's type from the API.

    Returns the endpoint's JSON payload, or `None` on a failed response.
    """
    from meerschaum.config.static import _static_config
    users_endpoint = _static_config()['api']['endpoints']['users']
    response = self.get(
        f"{users_endpoint}/{user.username}/type",
        debug=debug,
        **kw
    )
    return response.json() if response else None
def get_users(self, debug: bool = False, **kw: Any) ‑> List[str]

Return a list of registered usernames.

Expand source code
def get_users(
        self,
        debug: bool = False,
        **kw : Any
    ) -> List[str]:
    """
    Return a list of registered usernames.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of usernames, or an empty list on failure.
    """
    from meerschaum.config.static import _static_config
    ### Forward **kw to the request (it was previously accepted but silently dropped).
    response = self.get(
        f"{_static_config()['api']['endpoints']['users']}",
        debug = debug,
        use_token = True,
        **kw
    )
    if not response:
        return []
    try:
        return response.json()
    except Exception:
        return []
def install_plugin(self, name: str, force: bool = False, debug: bool = False) ‑> Tuple[bool, str]

Download and attempt to install a plugin from the API.

Expand source code
def install_plugin(
        self,
        name: str,
        force: bool = False,
        debug: bool = False
    ) -> SuccessTuple:
    """Download and attempt to install a plugin from the API.

    Parameters
    ----------
    name: str
        The name of the plugin to install.

    force: bool, default False
        If `True`, force the installation.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and a message.
    """
    import os, pathlib, json
    from meerschaum.core import Plugin
    from meerschaum.config._paths import PLUGINS_TEMP_RESOURCES_PATH
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    binaryornot_check = attempt_import('binaryornot.check', lazy=False)
    r_url = plugin_r_url(name)
    dest = pathlib.Path(os.path.join(PLUGINS_TEMP_RESOURCES_PATH, name + '.tar.gz'))
    if debug:
        dprint(f"Fetching from '{self.url + r_url}' to '{dest}'...")
    archive_path = self.wget(r_url, dest, debug=debug)
    is_binary = binaryornot_check.is_binary(str(archive_path))
    if not is_binary:
        ### The server answered with JSON (an error payload or SuccessTuple)
        ### instead of an archive. Initialize the defaults up front so `success`
        ### and `msg` are always bound (previously a dict without 'detail'
        ### raised NameError at the return).
        fail_msg = f"Failed to download binary for plugin '{name}'."
        success, msg = False, fail_msg
        try:
            with open(archive_path, 'r') as f:
                j = json.load(f)
            if isinstance(j, list):
                success, msg = tuple(j)
        except Exception:
            pass
        return success, msg
    plugin = Plugin(name, archive_path=archive_path, repo_connector=self)
    return plugin.install(force=force, debug=debug)
def login(self, debug: bool = False, warn: bool = True, **kw: Any) ‑> Tuple[bool, str]

Log in and set the session token.

Expand source code
def login(
        self,
        debug: bool = False,
        warn: bool = True,
        **kw: Any
    ) -> SuccessTuple:
    """Log in and set the session token.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    warn: bool, default True
        If `True`, raise a warning when the login fails.

    Returns
    -------
    A `SuccessTuple` of success and a message.
    """
    ### NOTE: removed the unused `User`, `info`, and `error` imports.
    from meerschaum.utils.warnings import warn as _warn
    from meerschaum.config.static import _static_config
    import json, datetime
    try:
        login_data = {
            'username': self.username,
            'password': self.password,
        }
    except AttributeError:
        ### Credentials were never configured for this connector.
        return False, f"Please login with the command `login {self}`."
    response = self.post(
        _static_config()['api']['endpoints']['login'],
        data = login_data,
        use_token = False,
        debug = debug
    )
    if response:
        msg = f"Successfully logged into '{self}' as user '{login_data['username']}'."
        ### Parse the payload once instead of once per field.
        payload = json.loads(response.text)
        self._token = payload['access_token']
        self._expires = datetime.datetime.strptime(
            payload['expires'],
            '%Y-%m-%dT%H:%M:%S.%f'
        )
    else:
        msg = (
            f"Failed to log into '{self}' as user '{login_data['username']}'.\n" +
            f"    Please verify login details for connector '{self}'."
        )
        if warn:
            _warn(msg, stack=False)

    return response.__bool__(), msg
def patch(self, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response

Wrapper for requests.patch.

Expand source code
def patch(
        self,
        r_url: str,
        headers: Optional[Dict[str, Any]] = None,
        use_token: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> requests.Response:
    """Wrapper for `requests.patch`.

    Parameters
    ----------
    r_url: str
        The relative URL, appended to `self.url`.

    headers: Optional[Dict[str, Any]], default None
        Headers to include with the request.

    use_token: bool, default True
        If `True`, send the session token as a bearer `Authorization` header.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The `requests.Response` object.
    """
    if debug:
        from meerschaum.utils.debug import dprint

    if headers is None:
        headers = {}

    if use_token:
        if debug:
            dprint("Checking login token.")
        ### Accessing `self.token` may trigger a fresh login.
        headers.update({ 'Authorization': f'Bearer {self.token}' })

    ### NOTE: removed the unused `pprint` import.
    if debug:
        dprint(f"Sending PATCH request to {self.url + r_url}")

    return self.session.patch(
        self.url + r_url,
        headers = headers,
        **kw
    )
def pipe_exists(self, pipe: "'Pipe'", debug: bool = False) ‑> bool

Check the API to see if a Pipe exists.

Parameters

pipe : 'meerschaum.Pipe'
The pipe which we are querying.

Returns

A bool indicating whether a pipe's underlying table exists.

Expand source code
def pipe_exists(
        self,
        pipe: 'meerschaum.Pipe',
        debug: bool = False
    ) -> bool:
    """Check the API to see if a Pipe exists.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe'
        The pipe which we are querying.

    Returns
    -------
    A bool indicating whether a pipe's underlying table exists.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    response = self.get(pipe_r_url(pipe) + '/exists', debug=debug)
    if debug:
        dprint("Received response: " + str(response.text))
    payload = response.json()
    ### A dict with 'detail' is an error payload from the server.
    if isinstance(payload, dict) and 'detail' in payload:
        warn(payload['detail'])
    return payload
def post(self, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response

Wrapper for requests.post.

Expand source code
def post(
        self,
        r_url: str,
        headers: Optional[Dict[str, Any]] = None,
        use_token: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> requests.Response:
    """Wrapper for `requests.post`.

    Parameters
    ----------
    r_url: str
        The relative URL, appended to `self.url`.

    headers: Optional[Dict[str, Any]], default None
        Headers to include with the request.

    use_token: bool, default True
        If `True`, send the session token as a bearer `Authorization` header.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The `requests.Response` object.
    """
    if debug:
        from meerschaum.utils.debug import dprint

    if headers is None:
        headers = {}

    if use_token:
        if debug:
            dprint("Checking token...")
        ### Accessing `self.token` may trigger a fresh login.
        headers.update({'Authorization': f'Bearer {self.token}'})

    ### NOTE: removed the unused `pprint` import.
    if debug:
        dprint(f"Sending POST request to {self.url + r_url}")

    return self.session.post(
        self.url + r_url,
        headers = headers,
        **kw
    )
def register_pipe(self, pipe: Pipe, debug: bool = False) ‑> SuccessTuple

Submit a POST to the API to register a new Pipe object. Returns a tuple of (success_bool, response_dict).

Expand source code
def register_pipe(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False
    ) -> SuccessTuple:
    """Submit a POST to the API to register a new Pipe object.
    Returns a tuple of (success_bool, response_dict).

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to register.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and a message.
    """
    from meerschaum.utils.debug import dprint
    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ###       then `pipe.parameters` will exist and not be fetched from the database.
    ### NOTE: removed the unused `STATIC_CONFIG` import.
    r_url = pipe_r_url(pipe)
    response = self.post(
        r_url + '/register',
        json = pipe.parameters,
        debug = debug,
    )
    if debug:
        dprint(response.text)
    ### Parse the JSON payload once instead of re-parsing per branch.
    j = response.json()
    if isinstance(j, list):
        response_tuple = response.__bool__(), j[1]
    elif 'detail' in j:
        response_tuple = response.__bool__(), j['detail']
    else:
        response_tuple = response.__bool__(), response.text
    return response_tuple
def register_plugin(self, plugin: meerschaum.core.Plugin, make_archive: bool = True, debug: bool = False) ‑> SuccessTuple

Register a plugin and upload its archive.

Expand source code
def register_plugin(
        self,
        plugin: meerschaum.core.Plugin,
        make_archive: bool = True,
        debug: bool = False,
    ) -> SuccessTuple:
    """Register a plugin and upload its archive.

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin to register.

    make_archive: bool, default True
        If `True`, build a fresh archive; otherwise reuse `plugin.archive_path`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and a message.
    """
    import json
    archive_path = plugin.make_tar(debug=debug) if make_archive else plugin.archive_path
    metadata = {
        'version': plugin.version,
        'attributes': json.dumps(plugin.attributes),
    }
    r_url = plugin_r_url(plugin)
    ### Use a context manager so the archive handle is always closed
    ### (replaces the manual open/close with try/finally).
    with open(archive_path, 'rb') as file_pointer:
        files = {'archive': file_pointer}
        try:
            response = self.post(r_url, files=files, params=metadata, debug=debug)
        except Exception:
            return False, f"Failed to register plugin '{plugin}'."

    try:
        success, msg = json.loads(response.text)
    except Exception:
        return False, response.text

    return success, msg
def register_user(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> SuccessTuple

Register a new user.

Expand source code
def register_user(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Register a new user."""
    import json
    from meerschaum.config.static import _static_config
    users_endpoint = _static_config()['api']['endpoints']['users']
    data = {
        'username': user.username,
        'password': user.password,
        'attributes': json.dumps(user.attributes),
    }
    ### Optional fields are only sent when truthy.
    for optional_field in ('type', 'email'):
        value = getattr(user, optional_field)
        if value:
            data[optional_field] = value
    response = self.post(f"{users_endpoint}/register", data=data, debug=debug)
    fallback_msg = response.text if response else f"Failed to register user '{user}'."
    try:
        payload = json.loads(response.text)
    except Exception:
        return False, fallback_msg
    ### A dict with 'detail' is an error payload from the server.
    if isinstance(payload, dict) and 'detail' in payload:
        return False, payload['detail']
    try:
        return tuple(payload)
    except Exception:
        return False, fallback_msg
def sync_pipe(self, pipe: Optional[Pipe] = None, df: Optional[Union[pandas.DataFrame, Dict[Any, Any], str]] = None, chunksize: Optional[int] = -1, debug: bool = False, **kw: Any) ‑> SuccessTuple

Append a pandas DataFrame to a Pipe. If Pipe does not exist, it is registered with supplied metadata.

Expand source code
def sync_pipe(
        self,
        pipe: Optional[meerschaum.Pipe] = None,
        df: Optional[Union[pandas.DataFrame, Dict[Any, Any], str]] = None,
        chunksize: Optional[int] = -1,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Append a pandas DataFrame to a Pipe.
    If Pipe does not exist, it is registered with supplied metadata.

    Parameters
    ----------
    pipe: Optional[meerschaum.Pipe], default None
        The pipe to which the data will be appended.

    df: Optional[Union[pandas.DataFrame, Dict[Any, Any], str]], default None
        The data to sync: a DataFrame, a dict of columns to lists of values,
        or a JSON string of either.

    chunksize: Optional[int], default -1
        Maximum rows per POST request.
        `-1` reads the configured SQL chunksize; `None` means one row per chunk.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and a message.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.misc import json_serialize_datetime
    from meerschaum.config import get_config
    from meerschaum.utils.packages import attempt_import
    import json, time
    begin = time.time()
    more_itertools = attempt_import('more_itertools')
    if df is None:
        msg = f"DataFrame is `None`. Cannot sync {pipe}."
        return False, msg

    def get_json_str(c):
        ### allow syncing dict or JSON without needing to import pandas (for IOT devices)
        return (
            json.dumps(c, default=json_serialize_datetime) if isinstance(c, (dict, list))
            else c.to_json(date_format='iso', date_unit='ns')
        )

    ### A JSON string is decoded into a dict (or list) before chunking.
    df = json.loads(df) if isinstance(df, str) else df

    ### TODO Make separate chunksize for API?
    _chunksize : Optional[int] = (1 if chunksize is None else (
        get_config('system', 'connectors', 'sql', 'chunksize') if chunksize == -1
        else chunksize
    ))
    ### NOTE(review): `df.keys()` would raise AttributeError for a plain list `df`,
    ### which suggests the `isinstance(df, list)` branch below is unreachable — confirm.
    keys : list = list(df.keys())
    chunks = []
    if hasattr(df, 'index'):
        ### DataFrame-like input: chunk by groups of index positions.
        rowcount = len(df)
        chunks = [df.iloc[i] for i in more_itertools.chunked(df.index, _chunksize)]
    elif isinstance(df, dict):
        ### `_chunks` is a dict of lists of dicts.
        ### e.g. {'a' : [ {'a':[1, 2]}, {'a':[3, 4]} ] }
        _chunks = {k: [] for k in keys}
        rowcount = len(df[keys[0]])
        for k in keys:
            if len(df[k]) != rowcount:
                return False, "Arrays must all be the same length."
            chunk_iter = more_itertools.chunked(df[k], _chunksize)
            for l in chunk_iter:
                _chunks[k].append({k: l})

        ### `chunks` is a list of dicts (e.g. orient by rows in pandas JSON).
        ### Merge the per-column chunk dicts positionally into one dict per chunk.
        for k, l in _chunks.items():
            for i, c in enumerate(l):
                try:
                    chunks[i].update(c)
                except IndexError:
                    chunks.append(c)
    elif isinstance(df, list):
        ### NOTE(review): `df[i]` indexes the list with a chunk (itself a list),
        ### and `rowcount` counts chunks rather than rows — both look suspect; confirm.
        chunks = [df[i] for i in more_itertools.chunked(df, _chunksize)]
        rowcount = len(chunks)

    ### Send columns in case the user has defined them locally.
    if pipe.columns:
        kw['columns'] = json.dumps(pipe.columns)
    r_url = pipe_r_url(pipe) + '/data'

    ### POST each chunk in order, aborting on the first failure.
    for i, c in enumerate(chunks):
        if debug:
            dprint(f"Posting chunk ({i + 1} / {len(chunks)}) to {r_url}...")
        json_str = get_json_str(c)

        try:
            response = self.post(
                r_url,
                ### handles check_existing
                params = kw,
                data = json_str,
                debug = debug
            )
        except Exception as e:
            warn(str(e))
            return False, str(e)

        if not response:
            return False, f"Failed to receive response. Response text: {response.text}"

        try:
            j = json.loads(response.text)
        except Exception as e:
            return False, str(e)

        ### A dict with 'detail' is an error payload from the server.
        if isinstance(j, dict) and 'detail' in j:
            return False, j['detail']

        try:
            j = tuple(j)
        except Exception as e:
            return False, response.text

        if debug:
            dprint("Received response: " + str(j))
        if not j[0]:
            return j

    len_chunks = len(chunks)

    success_tuple = True, (
        f"It took {round(time.time() - begin, 2)} seconds to sync {rowcount} row"
        + ('s' if rowcount != 1 else '')
        + f" across {len_chunks} chunk" + ('s' if len_chunks != 1 else '') +
        f" to {pipe}."
    )
    return success_tuple
def test_connection(self, **kw: Any) ‑> Optional[bool]

Test if a successful connection to the API may be made.

Expand source code
def test_connection(
        self,
        **kw: Any
    ) -> Union[bool, None]:
    """Test if a successful connection to the API may be made."""
    from meerschaum.connectors.poll import retry_connect
    ### Single, quiet connection attempt unless overridden by the caller.
    connect_kwargs = {
        'max_retries': 1,
        'retry_wait': 0,
        'warn': False,
        'connector': self,
        'enforce_chaining': False,
        'enforce_login': False,
    }
    connect_kwargs.update(kw)
    try:
        return retry_connect(**connect_kwargs)
    except Exception:
        return False
def wget(self, r_url: str, dest: Optional[Union[str, pathlib.Path]] = None, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> pathlib.Path

Mimic wget with requests.

Expand source code
def wget(
        self,
        r_url: str,
        dest: Optional[Union[str, pathlib.Path]] = None,
        headers: Optional[Dict[str, Any]] = None,
        use_token: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> pathlib.Path:
    """Mimic wget with requests."""
    from meerschaum.utils.misc import wget
    from meerschaum.utils.debug import dprint
    headers = {} if headers is None else headers
    if use_token:
        if debug:
            dprint(f"Checking login token.")
        ### Accessing `self.token` may trigger a fresh login.
        headers['Authorization'] = f'Bearer {self.token}'
    return wget(self.url + r_url, dest=dest, headers=headers, **kw)
class Connector (type: Optional[str] = None, label: Optional[str] = None, **kw: Any)

The base connector class to hold connection attributes.

Parameters

type : str
The type of the connector (e.g. meerschaum.connectors.sql, meerschaum.connectors.api, meerschaum.connectors.plugin).
label : str
The label for the connector.

Run mrsm edit config to edit connectors in the YAML file:

meerschaum:
    connections:
        {type}:
            {label}:
                ### attributes go here
Expand source code
class Connector(metaclass=abc.ABCMeta):
    """
    The base connector class to hold connection attributes.
    """
    def __init__(
            self,
            type: Optional[str] = None,
            label: Optional[str] = None,
            **kw: Any
        ):
        """
        Parameters
        ----------
        type: str
            The `type` of the connector (e.g. `sql`, `api`, `plugin`).

        label: str
            The `label` for the connector.

        Run `mrsm edit config` to edit connectors in the YAML file:

        ```
        meerschaum:
            connections:
                {type}:
                    {label}:
                        ### attributes go here
        ```

        """
        ### Snapshot the pristine __dict__ so _reset_attributes() can restore it.
        self._original_dict = copy.deepcopy(self.__dict__)
        self._set_attributes(type=type, label=label, **kw)
        ### Child classes may define REQUIRED_ATTRIBUTES to enforce extra keys.
        self.verify_attributes(getattr(self, 'REQUIRED_ATTRIBUTES', None))

    def _reset_attributes(self):
        ### Restore the attributes snapshotted at the start of __init__.
        self.__dict__ = self._original_dict

    def _set_attributes(
            self,
            *args,
            inherit_default: bool = True,
            **kw: Any
        ):
        ### Build self._attributes from (in increasing precedence):
        ### the type's 'default' config, this connector's saved config,
        ### then the keyword arguments — and mirror the result onto __dict__.
        from meerschaum.config.static import STATIC_CONFIG
        from meerschaum.utils.warnings import error

        self._attributes = {}

        default_label = STATIC_CONFIG['connectors']['default_label']

        ### NOTE: Support the legacy method of explicitly passing the type.
        label = kw.get('label', None)
        if label is None:
            if len(args) == 2:
                label = args[1]
            elif len(args) == 0:
                label = None
            else:
                label = args[0]

        if label == 'default':
            ### 'default' is reserved for inheritance, not as a real label.
            error(
                f"Label cannot be 'default'. Did you mean '{default_label}'?",
                InvalidAttributesError,
            )
        self.__dict__['label'] = label

        from meerschaum.config import get_config
        conn_configs = copy.deepcopy(get_config('meerschaum', 'connectors'))
        connector_config = copy.deepcopy(get_config('system', 'connectors'))

        ### inherit attributes from 'default' if exists
        if inherit_default:
            inherit_from = 'default'
            if self.type in conn_configs and inherit_from in conn_configs[self.type]:
                _inherit_dict = copy.deepcopy(conn_configs[self.type][inherit_from])
                self._attributes.update(_inherit_dict)

        ### load user config into self._attributes
        if self.type in conn_configs and self.label in conn_configs[self.type]:
            self._attributes.update(conn_configs[self.type][self.label])

        ### load system config into self._sys_config
        ### (deep copy so future Connectors don't inherit changes)
        if self.type in connector_config:
            self._sys_config = copy.deepcopy(connector_config[self.type])

        ### add additional arguments or override configuration
        self._attributes.update(kw)

        ### finally, update __dict__ with _attributes.
        self.__dict__.update(self._attributes)


    def verify_attributes(
            self,
            required_attributes: Optional[List[str]] = None,
            debug: bool = False
        ) -> None:
        """
        Ensure that the required attributes have been met.
        
        The Connector base class checks the minimum requirements.
        Child classes may enforce additional requirements.

        Parameters
        ----------
        required_attributes: Optional[List[str]], default None
            Attributes to be verified. If `None`, default to `['label']`.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        Don't return anything.

        Raises
        ------
        An error if any of the required attributes are missing.
        """
        from meerschaum.utils.warnings import error, warn
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.misc import items_str
        if required_attributes is None:
            required_attributes = ['label']
        missing_attributes = set()
        for a in required_attributes:
            if a not in self.__dict__:
                missing_attributes.add(a)
        if len(missing_attributes) > 0:
            ### `error` raises InvalidAttributesError for the caller to handle.
            error(
                (
                    f"Missing {items_str(list(missing_attributes))} "
                    + f"for connector '{self.type}:{self.label}'."
                ),
                InvalidAttributesError,
                silent = True,
                stack = False
            )


    def __str__(self):
        """
        When cast to a string, return type:label.
        """
        return f"{self.type}:{self.label}"

    def __repr__(self):
        """
        Represent the connector as type:label.
        """
        return str(self)

    @property
    def meta(self) -> Dict[str, Any]:
        """
        Return the keys needed to reconstruct this Connector.
        """
        ### Underscore-prefixed attributes are private and excluded.
        _meta = {
            key: value
            for key, value in self.__dict__.items()
            if not str(key).startswith('_')
        }
        _meta.update({
            'type': self.type,
            'label': self.label,
        })
        return _meta


    @property
    def type(self) -> str:
        """
        Return the type for this connector.
        """
        _type = self.__dict__.get('type', None)
        if _type is None:
            import re
            ### Derive the type from the class name, e.g. 'SQLConnector' -> 'sql'.
            _type = re.sub(r'connector$', '', self.__class__.__name__.lower())
            self.__dict__['type'] = _type
        return _type


    @property
    def label(self) -> str:
        """
        Return the label for this connector.
        """
        _label = self.__dict__.get('label', None)
        if _label is None:
            from meerschaum.config.static import STATIC_CONFIG
            _label = STATIC_CONFIG['connectors']['default_label']
            self.__dict__['label'] = _label
        return _label

Subclasses

Instance variables

var label : str

Return the label for this connector.

Expand source code
@property
def label(self) -> str:
    """
    Return the label for this connector,
    falling back to (and caching) the configured default.
    """
    existing = self.__dict__.get('label', None)
    if existing is not None:
        return existing
    from meerschaum.config.static import STATIC_CONFIG
    default_label = STATIC_CONFIG['connectors']['default_label']
    self.__dict__['label'] = default_label
    return default_label
var meta : Dict[str, Any]

Return the keys needed to reconstruct this Connector.

Expand source code
@property
def meta(self) -> Dict[str, Any]:
    """
    Return the keys needed to reconstruct this Connector.
    """
    ### Collect the public attributes (underscore-prefixed keys are private).
    public_attrs = {}
    for key, value in self.__dict__.items():
        if str(key).startswith('_'):
            continue
        public_attrs[key] = value
    ### `type` and `label` are always included, even if derived lazily.
    public_attrs['type'] = self.type
    public_attrs['label'] = self.label
    return public_attrs
var type : str

Return the type for this connector.

Expand source code
@property
def type(self) -> str:
    """
    Return the type for this connector,
    deriving (and caching) it from the class name when unset.
    """
    cached = self.__dict__.get('type', None)
    if cached is not None:
        return cached
    import re
    ### e.g. 'SQLConnector' -> 'sql', 'APIConnector' -> 'api'.
    derived = re.sub(r'connector$', '', self.__class__.__name__.lower())
    self.__dict__['type'] = derived
    return derived

Methods

def verify_attributes(self, required_attributes: Optional[List[str]] = None, debug: bool = False) ‑> None

Ensure that the required attributes have been met.

The Connector base class checks the minimum requirements. Child classes may enforce additional requirements.

Parameters

required_attributes : Optional[List[str]], default None
Attributes to be verified. If None, default to ['label'].
debug : bool, default False
Verbosity toggle.

Returns

Don't return anything.

Raises

An error if any of the required attributes are missing.

Expand source code
def verify_attributes(
        self,
        required_attributes: Optional[List[str]] = None,
        debug: bool = False
    ) -> None:
    """
    Ensure that the required attributes have been met.

    The Connector base class checks the minimum requirements.
    Child classes may enforce additional requirements.

    Parameters
    ----------
    required_attributes: Optional[List[str]], default None
        Attributes to be verified. If `None`, default to `['label']`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    Don't return anything.

    Raises
    ------
    An error if any of the required attributes are missing.
    """
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import items_str
    if required_attributes is None:
        required_attributes = ['label']
    ### Collect whichever required keys are absent from the instance.
    missing_attributes = {
        attr
        for attr in required_attributes
        if attr not in self.__dict__
    }
    if missing_attributes:
        error(
            (
                f"Missing {items_str(list(missing_attributes))} "
                + f"for connector '{self.type}:{self.label}'."
            ),
            InvalidAttributesError,
            silent = True,
            stack = False
        )
class SQLConnector (label: Optional[str] = None, flavor: Optional[str] = None, wait: bool = False, connect: bool = False, debug: bool = False, **kw: Any)

Connect to SQL databases via sqlalchemy.

SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/

Parameters

label : str, default 'main'
The identifying label for the connector. E.g. for sql:main, 'main' is the label. Defaults to 'main'.
flavor : Optional[str], default None
The database flavor, e.g. 'sqlite', 'postgresql', 'cockroachdb', etc. To see supported flavors, run the bootstrap connectors command.
wait : bool, default False
If True, block until a database connection has been made. Defaults to False.
connect : bool, default False
If True, immediately attempt to connect the database and raise a warning if the connection fails. Defaults to False.
debug : bool, default False
Verbosity toggle. Defaults to False.
kw : Any
All other arguments will be passed to the connector's attributes. Therefore, a connector may be made without being registered, as long as enough parameters are supplied to the constructor.
Expand source code
class SQLConnector(Connector):
    """
    Connect to SQL databases via `sqlalchemy`.
    
    SQLConnectors may be used as Meerschaum instance connectors.
    Read more about connectors and instances at
    https://meerschaum.io/reference/connectors/

    """

    ### Marks this connector type as usable as a Meerschaum instance.
    IS_INSTANCE: bool = True

    ### The implementation methods live in sibling modules and are bound here.
    from ._create_engine import flavor_configs, create_engine
    from ._sql import read, value, exec, execute, to_sql, exec_queries
    from meerschaum.utils.sql import test_connection
    from ._fetch import fetch, get_pipe_metadef, get_pipe_backtrack_minutes
    from ._cli import cli
    from ._pipes import (
        fetch_pipes_keys,
        create_indices,
        drop_indices,
        get_create_index_queries,
        get_drop_index_queries,
        get_add_columns_queries,
        get_alter_columns_queries,
        delete_pipe,
        get_backtrack_data,
        get_pipe_data,
        get_pipe_data_query,
        register_pipe,
        edit_pipe,
        get_pipe_id,
        get_pipe_attributes,
        sync_pipe,
        sync_pipe_inplace,
        get_sync_time,
        pipe_exists,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        get_pipe_table,
        get_pipe_columns_types,
        get_to_sql_dtype,
    )
    from ._plugins import (
        register_plugin,
        delete_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugins,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
    )
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri, parse_uri

    def __init__(
        self,
        label: Optional[str] = None,
        flavor: Optional[str] = None,
        wait: bool = False,
        connect: bool = False,
        debug: bool = False,
        **kw: Any
    ):
        """
        Parameters
        ----------
        label: str, default 'main'
            The identifying label for the connector.
            E.g. for `sql:main`, 'main' is the label.
            Defaults to 'main'.

        flavor: Optional[str], default None
            The database flavor, e.g.
            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
            To see supported flavors, run the `bootstrap connectors` command.

        wait: bool, default False
            If `True`, block until a database connection has been made.
            Defaults to `False`.

        connect: bool, default False
            If `True`, immediately attempt to connect the database and raise
            a warning if the connection fails.
            Defaults to `False`.

        debug: bool, default False
            Verbosity toggle.
            Defaults to `False`.

        kw: Any
            All other arguments will be passed to the connector's attributes.
            Therefore, a connector may be made without being registered,
            as long as enough parameters are supplied to the constructor.
        """
        if 'uri' in kw:
            uri = kw['uri']
            ### Normalize the legacy 'postgres://' scheme for SQLAlchemy.
            if uri.startswith('postgres://'):
                uri = uri.replace('postgres://', 'postgresql://', 1)
            kw['uri'] = uri
            ### Derive connection attributes (host, port, flavor, ...) from the URI.
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            from_uri_params.pop('label', None)
            kw.update(from_uri_params)

        ### set __dict__ in base class
        super().__init__(
            'sql',
            label = label or self.__dict__.get('label', None),
            **kw
        )

        if self.__dict__.get('flavor', None) == 'sqlite':
            ### SQLite connectors must not inherit from 'default',
            ### so rebuild the attributes without inheritance.
            self._reset_attributes()
            self._set_attributes(
                'sql',
                label = label,
                inherit_default = False,
                **kw
            )
            ### For backwards compatability reasons, set the path for sql:local if its missing.
            if self.label == 'local' and not self.__dict__.get('database', None):
                from meerschaum.config._paths import SQLITE_DB_PATH
                self.database = str(SQLITE_DB_PATH)

        ### ensure flavor and label are set accordingly
        if 'flavor' not in self.__dict__:
            if flavor is None and 'uri' not in self.__dict__:
                raise Exception(
                    f"    Missing flavor. Provide flavor as a key for '{self}'."
                )
            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

        if self.flavor == 'postgres':
            self.flavor = 'postgresql'

        self._debug = debug
        ### Store the PID and thread at initialization
        ### so we can dispose of the Pool in child processes or threads.
        import os, threading
        self._pid = os.getpid()
        self._thread_ident = threading.current_thread().ident
        self._sessions = {}
        self._locks = {'_sessions': threading.RLock(), }

        ### verify the flavor's requirements are met
        if self.flavor not in self.flavor_configs:
            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
        if not self.__dict__.get('uri'):
            self.verify_attributes(
                self.flavor_configs[self.flavor].get('requirements', set()),
                debug=debug,
            )

        if wait:
            from meerschaum.connectors.poll import retry_connect
            retry_connect(connector=self, debug=debug)

        if connect:
            if not self.test_connection(debug=debug):
                from meerschaum.utils.warnings import warn
                warn(f"Failed to connect with connector '{self}'!", stack=False)

    @property
    def Session(self):
        ### Lazily build and cache a scoped session factory bound to the engine.
        if '_Session' not in self.__dict__:
            if self.engine is None:
                return None

            from meerschaum.utils.packages import attempt_import
            sqlalchemy_orm = attempt_import('sqlalchemy.orm')
            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
            self._Session = sqlalchemy_orm.scoped_session(session_factory)

        return self._Session

    @property
    def engine(self):
        import os, threading
        ### build the sqlalchemy engine
        if '_engine' not in self.__dict__:
            self._engine, self._engine_str = self.create_engine(include_uri=True)

        same_process = os.getpid() == self._pid
        same_thread = threading.current_thread().ident == self._thread_ident

        ### handle child processes
        if not same_process:
            self._pid = os.getpid()
            self._thread = threading.current_thread()
            from meerschaum.utils.warnings import warn
            warn(f"Different PID detected. Disposing of connections...")
            self._engine.dispose()

        ### handle different threads
        if not same_thread:
            pass

        return self._engine

    @property
    def DATABASE_URL(self) -> str:
        """
        Return the URI connection string (alias for `SQLConnector.URI`).
        """
        ### Accessing `engine` ensures `_engine_str` has been built.
        _ = self.engine
        return str(self._engine_str)

    @property
    def URI(self) -> str:
        """
        Return the URI connection string.
        """
        ### Accessing `engine` ensures `_engine_str` has been built.
        _ = self.engine
        return str(self._engine_str)

    @property
    def IS_THREAD_SAFE(self) -> str:
        """
        Return whether this connector may be multithreaded.
        """
        if self.flavor == 'duckdb':
            return False
        ### In-memory SQLite databases are bound to a single connection/thread.
        if self.flavor == 'sqlite':
            return ':memory:' not in self.URI
        return True

    @property
    def metadata(self):
        ### Lazily build and cache a SQLAlchemy MetaData object.
        from meerschaum.utils.packages import attempt_import
        sqlalchemy = attempt_import('sqlalchemy')
        if '_metadata' not in self.__dict__:
            self._metadata = sqlalchemy.MetaData()
        return self._metadata

    @property
    def db(self) -> Optional[databases.Database]:
        from meerschaum.utils.packages import attempt_import
        databases = attempt_import('databases', lazy=False, install=True)
        url = self.DATABASE_URL
        if 'mysql' in url:
            url = url.replace('+pymysql', '')
        if '_db' not in self.__dict__:
            try:
                self._db = databases.Database(url)
            except KeyError:
                ### Likely encountered an unsupported flavor.
                from meerschaum.utils.warnings import warn
                self._db = None
        return self._db

    def __getstate__(self):
        ### Support pickling by exposing the instance dict.
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __call__(self):
        return self

Ancestors

  • meerschaum.connectors.Connector.Connector

Class variables

var IS_INSTANCE : bool
var flavor_configs

Static methods

def from_uri(uri: str, label: Optional[str] = None, as_dict: bool = False) ‑> Union[SQLConnector, Dict[str, Union[str, int]]]

Create a new SQLConnector from a URI string.

Parameters

uri : str
The URI connection string.
label : Optional[str], default None
If provided, use this as the connector label. Otherwise use the determined database name.
as_dict : bool, default False
If True, return a dictionary of the keyword arguments necessary to create a new SQLConnector, otherwise create a new object.

Returns

A new SQLConnector object or a dictionary of attributes (if as_dict is True).

Expand source code
@classmethod
def from_uri(
        cls,
        uri: str,
        label: Optional[str] = None,
        as_dict: bool = False,
    ) -> Union[
        'meerschaum.connectors.SQLConnector',
        Dict[str, Union[str, int]],
    ]:
    """
    Build a new `SQLConnector` (or its keyword arguments) from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string.

    label: Optional[str], default None
        If provided, use this as the connector label.
        Otherwise use the determined database name.

    as_dict: bool, default False
        If `True`, return a dictionary of the keyword arguments
        necessary to create a new `SQLConnector`, otherwise create a new object.

    Returns
    -------
    A new SQLConnector object or a dictionary of attributes (if `as_dict` is `True`).
    """
    params = cls.parse_uri(uri)
    params['uri'] = uri
    flavor = params.get('flavor', None)
    if not flavor or flavor not in cls.flavor_configs:
        error(f"Invalid flavor '{flavor}' detected from the provided URI.")

    if 'database' not in params:
        error("Unable to determine the database from the provided URI.")

    if label:
        params['label'] = label
    elif flavor in ('sqlite', 'duckdb'):
        ### File-based flavors are labeled by file name (or a memory marker).
        params['label'] = (
            f'memory_{flavor}'
            if params['database'] == ':memory:'
            else params['database'].split(os.path.sep)[-1].lower()
        )
    else:
        ### Server-based flavors are labeled as `username@host/database`.
        user_prefix = (params['username'] + '@') if 'username' in params else ''
        host_part = params.get('host', '')
        separator = '/' if 'host' in params else ''
        params['label'] = (
            user_prefix + host_part + separator + params.get('database', '')
        ).lower()

    return params if as_dict else cls(**params)
def get_pipe_backtrack_minutes(pipe) ‑> Union[int, float]

Return the first available value for the following parameter keys:

  • fetch, backtrack_minutes
  • backtrack_minutes
Expand source code
@staticmethod
def get_pipe_backtrack_minutes(pipe) -> Union[int, float]:
    """
    Return the first available value for the following parameter keys:
    
    - fetch, backtrack_minutes
    - backtrack_minutes
    """
    if pipe.parameters.get('fetch', {}).get('backtrack_minutes', None):
        btm = pipe.parameters['fetch']['backtrack_minutes']
    elif pipe.parameters.get('backtrack_minutes', None):
        btm = pipe.parameters['backtrack_minutes']
    else:
        btm = 0
    return btm
def parse_uri(uri: str) ‑> Dict[str, Any]

Parse a URI string into a dictionary of parameters.

Parameters

uri : str
The database connection URI.

Returns

A dictionary of attributes.

Examples

>>> parse_uri('sqlite:////home/foo/bar.db')
{'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
>>> parse_uri(
...     'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
...     + '/master?driver=ODBC+Driver+17+for+SQL+Server'
... )
{'host': 'localhost', 'database': 'master', 'username': 'sa',
'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
'driver': 'ODBC Driver 17 for SQL Server'}
>>>
Expand source code
@staticmethod
def parse_uri(uri: str) -> Dict[str, Any]:
    """
    Parse a URI string into a dictionary of parameters.

    Parameters
    ----------
    uri: str
        The database connection URI.

    Returns
    -------
    A dictionary of attributes.

    Examples
    --------
    >>> parse_uri('sqlite:////home/foo/bar.db')
    {'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
    >>> parse_uri(
    ...     'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
    ...     + '/master?driver=ODBC+Driver+17+for+SQL+Server'
    ... )
    {'host': 'localhost', 'database': 'master', 'username': 'sa',
    'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
    'driver': 'ODBC Driver 17 for SQL Server'}
    >>> 
    """
    from urllib.parse import parse_qs, urlparse
    sqlalchemy = attempt_import('sqlalchemy')
    ### Let SQLAlchemy split the URI into its standard connect arguments.
    params = sqlalchemy.engine.url.make_url(uri).translate_connect_args()
    ### The flavor is the scheme minus any driver suffix ('mssql+pyodbc' -> 'mssql').
    params['flavor'] = uri.split(':')[0].split('+')[0]
    if params['flavor'] == 'postgres':
        params['flavor'] = 'postgresql'
    ### Fold query-string options (e.g. '?driver=...') into the parameters.
    if '?' in uri:
        query_options = parse_qs(urlparse(uri).query)
        params.update({key: values[0] for key, values in query_options.items()})
    return params

Instance variables

var DATABASE_URL : str

Return the URI connection string (alias for SQLConnector.URI).

Expand source code
@property
def DATABASE_URL(self) -> str:
    """
    Return the URI connection string (alias for `SQLConnector.URI`).
    """
    ### Touch the engine property first so `_engine_str` is populated.
    _ = self.engine
    return str(self._engine_str)
var IS_THREAD_SAFE : bool

Return whether this connector may be multithreaded.

Expand source code
@property
def IS_THREAD_SAFE(self) -> bool:
    """
    Return whether this connector may be used across multiple threads.

    Returns
    -------
    `False` for DuckDB and in-memory SQLite databases, else `True`.
    """
    if self.flavor == 'duckdb':
        return False
    if self.flavor == 'sqlite':
        ### In-memory SQLite databases are bound to a single connection.
        return ':memory:' not in self.URI
    return True
var Session
Expand source code
@property
def Session(self):
    """
    Return a cached scoped-session factory bound to this connector's engine,
    or `None` if the engine could not be created.
    """
    if '_Session' in self.__dict__:
        return self._Session
    if self.engine is None:
        return None

    from meerschaum.utils.packages import attempt_import
    sqlalchemy_orm = attempt_import('sqlalchemy.orm')
    self._Session = sqlalchemy_orm.scoped_session(
        sqlalchemy_orm.sessionmaker(self.engine)
    )
    return self._Session
var URI : str

Return the URI connection string.

Expand source code
@property
def URI(self) -> str:
    """
    Return the URI connection string for this connector.
    """
    ### Accessing `engine` ensures `_engine_str` has been built.
    _ = self.engine
    return str(self._engine_str)
var db : Optional[databases.Database]
Expand source code
@property
def db(self) -> Optional[databases.Database]:
    """
    Return a cached `databases.Database` built from this connector's URI,
    or `None` if the flavor is unsupported by the `databases` package.
    """
    ### Short-circuit on the cache so repeat accesses skip the import machinery.
    if '_db' in self.__dict__:
        return self._db
    from meerschaum.utils.packages import attempt_import
    databases = attempt_import('databases', lazy=False, install=True)
    url = self.DATABASE_URL
    ### The `databases` package expects 'mysql://', not 'mysql+pymysql://'.
    if 'mysql' in url:
        url = url.replace('+pymysql', '')
    try:
        self._db = databases.Database(url)
    except KeyError:
        ### Likely encountered an unsupported flavor.
        from meerschaum.utils.warnings import warn
        warn(f"Unable to create a `databases` object for flavor '{self.flavor}'.", stack=False)
        self._db = None
    return self._db
var engine
Expand source code
@property
def engine(self):
    """
    Return the cached `sqlalchemy` engine, building it on first access
    and disposing of stale connections after a fork.
    """
    import os, threading
    ### build the sqlalchemy engine
    if '_engine' not in self.__dict__:
        self._engine, self._engine_str = self.create_engine(include_uri=True)

    ### `_pid` / `_thread_ident` are presumably set at construction time — confirm in `__init__`.
    same_process = os.getpid() == self._pid
    same_thread = threading.current_thread().ident == self._thread_ident

    ### handle child processes
    ### NOTE(review): on a PID change, `_thread` is refreshed but `_thread_ident`
    ### is not — verify whether that is intentional.
    if not same_process:
        self._pid = os.getpid()
        self._thread = threading.current_thread()
        from meerschaum.utils.warnings import warn
        warn(f"Different PID detected. Disposing of connections...")
        self._engine.dispose()

    ### handle different threads
    ### Cross-thread access is currently allowed without any action.
    if not same_thread:
        pass

    return self._engine
var metadata
Expand source code
@property
def metadata(self):
    """
    Return the cached `sqlalchemy.MetaData` object, creating it on first access.
    """
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    if '_metadata' in self.__dict__:
        return self._metadata
    self._metadata = sqlalchemy.MetaData()
    return self._metadata

Methods

def clear_pipe(self, pipe: Pipe, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw) ‑> SuccessTuple

Delete a pipe's data within a bounded or unbounded interval without dropping the table.

Parameters

pipe : Pipe
The pipe to clear.
begin : Optional[datetime.datetime], default None
Beginning datetime. Inclusive.
end : Optional[datetime.datetime], default None
Ending datetime. Exclusive.
params : Optional[Dict[str, Any]], default None
See build_where().
Expand source code
def clear_pipe(
        self,
        pipe: meerschaum.Pipe,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Delete a pipe's data within a bounded or unbounded interval without dropping the table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to clear.

    begin: Optional[datetime.datetime], default None
        Beginning datetime. Inclusive.

    end: Optional[datetime.datetime], default None
        Ending datetime. Exclusive.

    params: Optional[Dict[str, Any]], default None
        See `meerschaum.utils.sql.build_where`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating whether the DELETE executed successfully.
    """
    if not pipe.exists(debug=debug):
        return True, f"{pipe} does not exist, so nothing was cleared."

    from meerschaum.utils.sql import sql_item_name, build_where, dateadd_str
    from meerschaum.utils.warnings import warn
    pipe_name = sql_item_name(pipe.target, self.flavor)

    ### Resolve the datetime column, falling back to a best guess if unset.
    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt_name = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt_name = sql_item_name(_dt, self.flavor)
        is_guess = False

    ### When bounds were requested but the datetime column was only guessed,
    ### either warn (guess found) or drop the bounds entirely (no guess).
    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n    Ignoring datetime bounds...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )

    ### Only filter on parameters whose keys exist as columns in the target table.
    valid_params = {}
    if params is not None:
        existing_cols = pipe.get_columns_types(debug=debug)
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
    ### `WHERE 1 = 1` lets every additional clause be appended uniformly with AND.
    clear_query = (
        f"DELETE FROM {pipe_name}\nWHERE 1 = 1\n"
        + ('  AND ' + build_where(valid_params, self, with_where=False) if valid_params else '')
        + (
            f'  AND {dt_name} >= ' + dateadd_str(self.flavor, 'day', 0, begin)
            if begin is not None else ''
        ) + (
            f'  AND {dt_name} < ' + dateadd_str(self.flavor, 'day', 0, end)
            if end is not None else ''
        )
    )
    success = self.exec(clear_query, silent=True, debug=debug) is not None
    msg = "Success" if success else f"Failed to clear {pipe}."
    return success, msg
def cli(self, debug: bool = False) ‑> Tuple[bool, str]

Launch an interactive CLI for the SQLConnector's flavor.

Expand source code
def cli(
        self,
        debug : bool = False
    ) -> SuccessTuple:
    """
    Launch an interactive CLI for the SQLConnector's flavor.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating whether the CLI session exited cleanly.
    """
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.debug import dprint
    import os

    if self.flavor not in flavor_clis:
        return False, f"No CLI available for flavor '{self.flavor}'."

    if self.flavor == 'duckdb':
        ### DuckDB is handled entirely by `gadwall` and returns here.
        gadwall = attempt_import('gadwall', debug = debug, lazy=False)
        gadwall_shell = gadwall.Gadwall(self.database)
        try:
            gadwall_shell.cmdloop()
        except KeyboardInterrupt:
            pass
        return True, "Success"
    elif self.flavor == 'mssql':
        ### `mssqlcli` fails to start without this environment variable.
        if 'DOTNET_SYSTEM_GLOBALIZATION_INVARIANT' not in os.environ:
            os.environ['DOTNET_SYSTEM_GLOBALIZATION_INVARIANT'] = '1'

    cli_name = flavor_clis[self.flavor]

    ### Install the CLI package and any dependencies.
    cli, cli_main = attempt_import(cli_name, (cli_name + '.main'), lazy=False, debug=debug)
    if cli_name in cli_deps:
        for dep in cli_deps[cli_name]:
            ### The `attempt_import` side effect (installation) is what matters here;
            ### the `locals()` assignment only keeps a reference alive.
            locals()[dep] = attempt_import(dep, lazy=False, warn=False, debug=debug)

    ### NOTE: The `DATABASE_URL` property must be initialized first in case the database is not
    ### yet defined (e.g. 'sql:local').
    cli_arg_str = self.DATABASE_URL
    if self.flavor in ('sqlite', 'duckdb'):
        cli_arg_str = str(self.database)

    ### Define the script to execute to launch the CLI.
    ### The `mssqlcli` script is manually written to avoid telemetry
    ### and because `main.cli()` is not defined.
    launch_cli = f"cli_main.cli(['{cli_arg_str}'])"
    if self.flavor == 'mssql':
        launch_cli = (
            "mssqlclioptionsparser, mssql_cli = attempt_import("
            + "'mssqlcli.mssqlclioptionsparser', 'mssqlcli.mssql_cli', lazy=False)\n"
            + "ms_parser = mssqlclioptionsparser.create_parser()\n"
            + f"ms_options = ms_parser.parse_args(['--server', 'tcp:{self.host},{self.port}', "
            + f"'--database', '{self.database}', "
            + f"'--username', '{self.username}', '--password', '{self.password}'])\n"
            + "ms_object = mssql_cli.MssqlCli(ms_options)\n"
            + "try:\n"
            + "    ms_object.connect_to_database()\n"
            + "    ms_object.run()\n"
            + "finally:\n"
            + "    ms_object.shutdown()"
        )
    ### NOTE: The former `elif self.flavor == 'duckdb'` branch was unreachable
    ### (DuckDB returns above) and assigned a non-string to `launch_cli`; removed.

    try:
        if debug:
            dprint(f'Launching CLI:\n{launch_cli}')
        ### NOTE(review): `exec` on a locally-built string; inputs come from
        ### connector attributes, not untrusted user input.
        exec(launch_cli)
        success, msg = True, 'Success'
    except Exception as e:
        success, msg = False, str(e)

    return success, msg
def create_engine(self, include_uri: bool = False, debug: bool = False, **kw) ‑> sqlalchemy.engine.Engine

Create a sqlalchemy engine by building the engine string.

Expand source code
def create_engine(
        self,
        include_uri: bool = False,
        debug: bool = False,
        **kw
    ) -> 'sqlalchemy.engine.Engine':
    """
    Create a sqlalchemy engine by building the engine string.

    Parameters
    ----------
    include_uri: bool, default False
        If `True`, return a tuple of the engine and the connection string.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `sqlalchemy` engine (plus the engine string if `include_uri` is `True`),
    or `None` if engine creation failed.
    """
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint
    sqlalchemy = attempt_import('sqlalchemy')
    import urllib
    import copy

    ### Verify the flavor *before* indexing into `flavor_configs` below;
    ### otherwise an invalid flavor raises a bare KeyError instead of this error.
    if self.flavor not in flavor_configs:
        error(f"Cannot create a connector with the flavor '{self.flavor}'.")

    ### Install and patch required drivers.
    if self.flavor in install_flavor_drivers:
        attempt_import(*install_flavor_drivers[self.flavor], debug=debug, lazy=False, warn=False)
    if self.flavor in require_patching_flavors:
        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
        import pathlib
        for install_name, import_name in require_patching_flavors[self.flavor]:
            pkg = attempt_import(
                import_name,
                debug = debug,
                lazy = False,
                warn = False
            )
            _monkey_patch_get_distribution(
                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
            )

    ### supplement missing values with defaults (e.g. port number)
    for a, value in flavor_configs[self.flavor]['defaults'].items():
        if a not in self.__dict__:
            self.__dict__[a] = value

    _engine = flavor_configs[self.flavor].get('engine', None)
    _username = self.__dict__.get('username', None)
    _password = self.__dict__.get('password', None)
    _host = self.__dict__.get('host', None)
    _port = self.__dict__.get('port', None)
    _database = self.__dict__.get('database', None)
    _driver = self.__dict__.get('driver', None)
    if _driver is not None:
        ### URL-encode the driver if a space is detected.
        ### Not a bullet-proof solution, but this should work for most cases.
        _driver = urllib.parse.quote_plus(_driver) if ' ' in _driver else _driver
    _uri = self.__dict__.get('uri', None)

    ### Handle registering specific dialects (due to installing in virtual environments).
    if self.flavor in flavor_dialects:
        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])

    ### self._sys_config was deepcopied and can be updated safely
    if self.flavor in ("sqlite", "duckdb"):
        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
        if 'create_engine' not in self._sys_config:
            self._sys_config['create_engine'] = {}
        if 'connect_args' not in self._sys_config['create_engine']:
            self._sys_config['create_engine']['connect_args'] = {}
        self._sys_config['create_engine']['connect_args'].update({"check_same_thread" : False})
    else:
        engine_str = (
            _engine + "://" + (_username if _username is not None else '') +
            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
            (("/" + _database) if _database is not None else '')
            + (("?driver=" + _driver) if _driver is not None else '')
        ) if not _uri else _uri
    if debug:
        ### Mask the password before printing the connection string.
        dprint(
            (
                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
                    if _password is not None else engine_str
            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
        )

    _kw_copy = copy.deepcopy(kw)

    ### NOTE: Order of inheritance:
    ###       1. Defaults
    ###       2. System configuration
    ###       3. Connector configuration
    ###       4. Keyword arguments
    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
    def _apply_create_engine_args(update):
        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
            _create_engine_args.update(
                { k: v for k, v in update.items()
                    if 'omit_create_engine' not in flavor_configs[self.flavor]
                        or k not in flavor_configs[self.flavor].get('omit_create_engine')
                }
            )
    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
    _apply_create_engine_args(_kw_copy)

    try:
        engine = sqlalchemy.create_engine(
            engine_str,
            ### I know this looks confusing, and maybe it's bad code,
            ### but it's simple. It dynamically parses the config string
            ### and splits it to separate the class name (QueuePool)
            ### from the module name (sqlalchemy.pool).
            poolclass    = getattr(
                attempt_import(
                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
                ),
                self._sys_config['poolclass'].split('.')[-1]
            ),
            echo         = debug,
            **_create_engine_args
        )
    except Exception as e:
        warn(e)
        warn(f"Failed to create connector '{self}'.", stack=False)
        engine = None

    if include_uri:
        return engine, engine_str
    return engine
def create_indices(self, pipe: Pipe, indices: Optional[List[str]] = None, debug: bool = False) ‑> bool

Create a pipe's indices.

Expand source code
def create_indices(
        self,
        pipe: meerschaum.Pipe,
        indices: Optional[List[str]] = None,
        debug: bool = False
    ) -> bool:
    """
    Create a pipe's indices.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose indices should be created.

    indices: Optional[List[str]], default None
        If provided, only create these indices; otherwise create all of them.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating whether every index was created successfully.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    if debug:
        dprint(f"Creating indices for {pipe}...")
    if not pipe.columns:
        warn(f"Unable to create indices for {pipe} without columns.", stack=False)
        return False

    success = True
    for ix, queries in self.get_create_index_queries(pipe, debug=debug).items():
        if indices is not None and ix not in indices:
            continue
        if all(self.exec_queries(queries, debug=debug, silent=True)):
            continue
        success = False
        if debug:
            dprint(f"Failed to create index on column: {ix}")
    return success
def delete_pipe(self, pipe: Pipe, debug: bool = False) ‑> SuccessTuple

Delete a Pipe's registration and drop its table.

Expand source code
def delete_pipe(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> SuccessTuple:
    """
    Delete a Pipe's registration and drop its table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to delete.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating whether the pipe was deleted.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.sql import sql_item_name
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    ### Drop the pipe's table before touching its registration.
    drop_tuple = pipe.drop(debug=debug)
    if not drop_tuple[0]:
        return drop_tuple

    if not pipe.id:
        return False, f"{pipe} is not registered."

    ### ensure pipes table exists
    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    delete_query = sqlalchemy.delete(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
    if not self.exec(delete_query, debug=debug):
        return False, f"Failed to delete registration for {pipe}."

    return True, "Success"
def delete_plugin(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False, **kw: Any) ‑> SuccessTuple

Delete a plugin from the plugins table.

Expand source code
def delete_plugin(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Delete a plugin from the plugins table.

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin to delete.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating whether the plugin was deleted.
    """
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    plugin_id = self.get_plugin_id(plugin, debug=debug)
    if plugin_id is None:
        ### An unregistered plugin means there is nothing to delete.
        return True, f"Plugin '{plugin}' was not registered."

    query = sqlalchemy.delete(plugins_tbl).where(plugins_tbl.c.plugin_id == plugin_id)
    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to delete plugin '{plugin}'."
    return True, f"Successfully deleted plugin '{plugin}'."
def delete_user(self, user: meerschaum.core.User, debug: bool = False) ‑> SuccessTuple

Delete a user's record from the users table.

Expand source code
def delete_user(
        self,
        user: meerschaum.core.User,
        debug: bool = False
    ) -> SuccessTuple:
    """
    Delete a user's record from the users table (and the user's plugins).

    Parameters
    ----------
    user: meerschaum.core.User
        The user to delete.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating whether the user (and their plugins) were deleted.
    """
    ### ensure users table exists
    from meerschaum.connectors.sql.tables import get_tables
    tables = get_tables(mrsm_instance=self, debug=debug)
    users_tbl = tables['users']
    plugins = tables['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)

    if user_id is None:
        return False, f"User '{user.username}' is not registered and cannot be deleted."

    query = sqlalchemy.delete(users_tbl).where(users_tbl.c.user_id == user_id)

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to delete user '{user}'."

    ### Also remove any plugins registered by this user.
    query = sqlalchemy.delete(plugins).where(plugins.c.user_id == user_id)
    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to delete plugins of user '{user}'."

    return True, f"Successfully deleted user '{user}'"
def drop_indices(self, pipe: Pipe, indices: Optional[List[str]] = None, debug: bool = False) ‑> bool

Drop a pipe's indices.

Expand source code
def drop_indices(
        self,
        pipe: meerschaum.Pipe,
        indices: Optional[List[str]] = None,
        debug: bool = False
    ) -> bool:
    """
    Drop a pipe's indices.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose indices should be dropped.

    indices: Optional[List[str]], default None
        If provided, only drop these indices; otherwise drop all of them.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating whether every index was dropped successfully.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    if debug:
        dprint(f"Dropping indices for {pipe}...")
    if not pipe.columns:
        warn(f"Unable to drop indices for {pipe} without columns.", stack=False)
        return False

    success = True
    for ix, queries in self.get_drop_index_queries(pipe, debug=debug).items():
        if indices is not None and ix not in indices:
            continue
        if all(self.exec_queries(queries, debug=debug, silent=True)):
            continue
        success = False
        if debug:
            dprint(f"Failed to drop index on column: {ix}")
    return success
def drop_pipe(self, pipe: Pipe, debug: bool = False, **kw) ‑> SuccessTuple

Drop a pipe's tables but maintain its registration.

Parameters

pipe : Pipe
The pipe to drop.
Expand source code
def drop_pipe(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Drop a pipe's tables but maintain its registration.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to drop.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating whether the tables were dropped.
    """
    from meerschaum.utils.sql import table_exists, sql_item_name
    success = True
    ### The backtrack table shares the target name with a leading underscore.
    target = pipe.target
    temp_target = '_' + target
    target_name = sql_item_name(target, self.flavor)
    temp_name = sql_item_name(temp_target, self.flavor)
    if table_exists(target, self, debug=debug):
        success = self.exec(f"DROP TABLE {target_name}", silent=True, debug=debug) is not None
    if table_exists(temp_target, self, debug=debug):
        temp_dropped = (
            self.exec(f"DROP TABLE {temp_name}", silent=True, debug=debug) is not None
        )
        success = success and temp_dropped

    return success, ("Success" if success else f"Failed to drop {pipe}.")
def edit_pipe(self, pipe: Pipe = None, patch: bool = False, debug: bool = False, **kw: Any) ‑> SuccessTuple

Persist a Pipe's parameters to its database.

Parameters

pipe : Pipe, default None
The pipe to be edited.
patch : bool, default False
If patch is True, update the existing parameters by cascading. Otherwise overwrite the parameters (default).
debug : bool, default False
Verbosity toggle.
Expand source code
def edit_pipe(
        self,
        pipe : meerschaum.Pipe = None,
        patch: bool = False,
        debug: bool = False,
        **kw : Any
    ) -> SuccessTuple:
    """
    Persist a Pipe's parameters to its database.

    Parameters
    ----------
    pipe: meerschaum.Pipe, default None
        The pipe to be edited.
    patch: bool, default False
        If patch is `True`, update the existing parameters by cascading.
        Otherwise overwrite the parameters (default).
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating whether the edit was persisted.
    """

    if pipe.id is None:
        return False, f"{pipe} is not registered and cannot be edited."

    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    if not patch:
        ### Overwrite: persist the in-memory parameters as-is.
        parameters = pipe.__dict__.get('_attributes', {}).get('parameters', {})
    else:
        ### Patch: fetch the registered parameters and cascade the new ones on top.
        from meerschaum import Pipe
        from meerschaum.config._patch import apply_patch_to_config
        original_parameters = Pipe(
            pipe.connector_keys, pipe.metric_key, pipe.location_key,
            mrsm_instance=pipe.instance_keys
        ).parameters
        parameters = apply_patch_to_config(
            original_parameters,
            pipe.parameters
        )

    ### ensure pipes table exists
    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    import json
    sqlalchemy = attempt_import('sqlalchemy')

    ### Flavors with native JSON columns store the dict directly; others store text.
    values = {
        'parameters': (
            json.dumps(parameters)
            if self.flavor not in json_flavors
            else parameters
        ),
    }
    q = sqlalchemy.update(pipes_tbl).values(**values).where(
        pipes_tbl.c.pipe_id == pipe.id
    )

    result = self.exec(q, debug=debug)
    message = (
        f"Successfully edited {pipe}."
        if result is not None else f"Failed to edit {pipe}."
    )
    return (result is not None), message
def edit_user(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> SuccessTuple

Update an existing user's metadata.

Expand source code
def edit_user(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Update an existing user's metadata.

    Parameters
    ----------
    user: meerschaum.core.User
        The user to edit (must already be registered).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating whether the user was edited.
    """
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.sql import json_flavors
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    if user_id is None:
        return False, (
            f"User '{user.username}' does not exist. " +
            f"Register user '{user.username}' before editing."
        )
    user.user_id = user_id

    import json
    valid_tuple = valid_username(user.username)
    if not valid_tuple[0]:
        return valid_tuple

    ### Only include fields that were actually provided on the user object.
    bind_variables = {
        'user_id' : user_id,
        'username' : user.username,
    }
    if user.password != '':
        bind_variables['password_hash'] = user.password_hash
    if user.email != '':
        bind_variables['email'] = user.email
    if user.attributes is not None and user.attributes != {}:
        ### DuckDB lacks a native JSON column type, so serialize attributes to text.
        bind_variables['attributes'] = (
            json.dumps(user.attributes) if self.flavor in ('duckdb',)
            else user.attributes
        )
    if user.type != '':
        bind_variables['user_type'] = user.type

    query = (
        sqlalchemy
        .update(users_tbl)
        .values(**bind_variables)
        .where(users_tbl.c.user_id == user_id)
    )

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to edit user '{user}'."
    return True, f"Successfully edited user '{user}'."
def exec(self, query: str, *args: Any, silent: bool = False, debug: bool = False, commit: Optional[bool] = None, close: Optional[bool] = None, with_connection: bool = False, **kw: Any) ‑> Union[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.cursor.LegacyCursorResult, Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection], Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection], None]

Execute SQL code and return the sqlalchemy result, e.g. when calling stored procedures.

If inserting data, please use bind variables to avoid SQL injection!

Parameters

query : Union[str, List[str], Tuple[str]]
The query to execute. If query is a list or tuple, call self.exec_queries() instead.
args : Any
Arguments passed to sqlalchemy.engine.execute.
silent : bool, default False
If True, suppress warnings.
commit : Optional[bool], default None
If True, commit the changes after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
close : Optional[bool], default None
If True, close the connection after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
with_connection : bool, default False
If True, return a tuple including the connection object. This does not apply if query is a list of strings.

Returns

The sqlalchemy result object, or a tuple with the connection if with_connection is provided.

Expand source code
def exec(
        self,
        query: str,
        *args: Any,
        silent: bool = False,
        debug: bool = False,
        commit: Optional[bool] = None,
        close: Optional[bool] = None,
        with_connection: bool = False,
        **kw: Any
    ) -> Union[
            sqlalchemy.engine.result.resultProxy,
            sqlalchemy.engine.cursor.LegacyCursorResult,
            Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
            Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
            None
    ]:
    """
    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.
    
    If inserting data, please use bind variables to avoid SQL injection!

    Parameters
    ----------
    query: Union[str, List[str], Tuple[str]]
        The query to execute.
        If `query` is a list or tuple, call `self.exec_queries()` instead.

    args: Any
        Arguments passed to `sqlalchemy.engine.execute`.
        
    silent: bool, default False
        If `True`, suppress warnings.

    debug: bool, default False
        Verbosity toggle.

    commit: Optional[bool], default None
        If `True`, commit the changes after execution.
        If `None`, commit unless the flavor is `'mssql'` and the query contains `'select'`.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    close: Optional[bool], default None
        If `True`, close the connection after execution.
        If `None`, close unless the flavor is `'mssql'`.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    with_connection: bool, default False
        If `True`, return a tuple including the connection object.
        NOTE(review): with the default `close` behavior (all flavors except mssql),
        the connection is closed in the `finally` block *before* being returned here —
        callers likely need to pass `close=False` as well; confirm intended usage.
        This does not apply if `query` is a list of strings.
    
    Returns
    -------
    The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
    On failure, the result is `None` (the exception is warned about, not raised).

    """
    ### Lists/tuples of queries are delegated to `exec_queries()`.
    ### Note that `commit`, `close`, and `with_connection` are not forwarded there.
    if isinstance(query, (list, tuple)):
        return self.exec_queries(
            list(query),
            *args,
            silent = silent,
            debug = debug,
            **kw
        )

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import("sqlalchemy")
    if debug:
        dprint("Executing query:\n" + f"{query}")

    ### Resolve defaults: close/commit unless the flavor is mssql
    ### (and for commit, also skip when the query looks like a SELECT).
    _close = close if close is not None else (self.flavor != 'mssql')
    _commit = commit if commit is not None else (
        (self.flavor != 'mssql' or 'select' not in str(query).lower())
    )

    ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+).
    if not hasattr(query, 'compile'):
        query = sqlalchemy.text(query)

    connection = self.engine.connect()
    ### Only open an explicit transaction when we intend to commit.
    transaction = connection.begin() if _commit else None
    try:
        result = connection.execute(query, *args, **kw)
        if _commit:
            transaction.commit()
    except Exception as e:
        if debug:
            dprint(f"Failed to execute query:\n\n{query}\n\n{e}")
        if not silent:
            warn(str(e))
        ### Failures roll back and yield `None` rather than raising.
        result = None
        if _commit:
            transaction.rollback()
    finally:
        if _close:
            connection.close()

        ### Returning from `finally` also covers the exception path above.
        if with_connection:
            return result, connection

    return result
def exec_queries(self, queries: List[str], break_on_error: bool = False, silent: bool = False, debug: bool = False) ‑> List[sqlalchemy.engine.cursor.LegacyCursorResult]

Execute a list of queries in a single transaction.

Parameters

queries : List[str]
The queries in the transaction to be executed.
break_on_error : bool, default False
If True, stop executing when a query fails.
silent : bool, default False
If True, suppress warnings.

Returns

A list of SQLAlchemy results.

Expand source code
def exec_queries(
        self,
        queries: List[str],
        break_on_error: bool = False,
        silent: bool = False,
        debug: bool = False,
    ) -> List[sqlalchemy.engine.cursor.LegacyCursorResult]:
    """
    Run several queries inside one transaction and collect their results.

    Parameters
    ----------
    queries: List[str]
        The queries in the transaction to be executed.

    break_on_error: bool, default False
        If `True`, stop executing when a query fails.

    silent: bool, default False
        If `True`, suppress warnings.

    Returns
    -------
    A list of SQLAlchemy results (a failed query contributes `None`).
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    results = []
    ### `engine.begin()` commits on success and rolls back on an uncaught error.
    with self.engine.begin() as conn:
        for raw_query in queries:
            if debug:
                dprint(raw_query)
            runnable = (
                sqlalchemy.text(raw_query)
                if isinstance(raw_query, str)
                else raw_query
            )

            outcome = None
            try:
                outcome = conn.execute(runnable)
            except Exception as e:
                msg = (f"Encountered error while executing:\n{e}")
                if not silent:
                    warn(msg)
                elif debug:
                    dprint(msg)
            results.append(outcome)
            if break_on_error and outcome is None:
                break
    return results
def execute(self, *args: Any, **kw: Any) ‑> Optional[sqlalchemy.engine.result.resultProxy]

An alias for meerschaum.connectors.sql.SQLConnector.exec.

Expand source code
def execute(
        self,
        *args: Any,
        **kw: Any
    ) -> Optional[sqlalchemy.engine.result.resultProxy]:
    """
    Execute a query; this is a thin alias for
    `meerschaum.connectors.sql.SQLConnector.exec`.
    All positional and keyword arguments are forwarded unchanged.
    """
    return self.exec(*args, **kw)
def fetch(self, pipe: Pipe, begin: Union[datetime.datetime, str, None] = '', end: Union[datetime.datetime, str, None] = None, chunk_hook: Optional[Callable[[pd.DataFrame], Any]] = None, chunksize: Optional[int] = -1, workers: Optional[int] = None, debug: bool = False, **kw: Any) ‑> Union['pd.DataFrame', List[Any], None]

Execute the SQL definition and return a Pandas DataFrame.

Parameters

pipe : Pipe

The pipe object which contains the fetch metadata.

  • pipe.columns['datetime']: str
    • Name of the datetime column for the remote table.
  • pipe.parameters['fetch']: Dict[str, Any]
    • Parameters necessary to execute a query.
  • pipe.parameters['fetch']['definition']: str
    • Raw SQL query to execute to generate the pandas DataFrame.
  • pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
    • How many minutes before begin to search for data (optional).
begin : Union[datetime.datetime, str, None], default None
Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
end : Union[datetime.datetime, str, None], default None
The latest datetime to search for data. If end is None, do not bound the upper end of the search.
chunk_hook : Callable[[pd.DataFrame], Any], default None
A function to pass to read() that accepts a Pandas DataFrame.
chunksize : Optional[int], default -1
How many rows to load into memory at once (when chunk_hook is provided). Otherwise the entire result set is loaded into memory.
workers : Optional[int], default None
How many threads to use when consuming the generator (when chunk_hook is provided). Defaults to the number of cores.
debug : bool, default False
Verbosity toggle.

Returns

A pandas DataFrame or None. If chunk_hook is not None, return a list of the hook function's results.

Expand source code
def fetch(
        self,
        pipe: meerschaum.Pipe,
        begin: Union[datetime.datetime, str, None] = '',
        end: Union[datetime.datetime, str, None] = None,
        chunk_hook: Optional[Callable[[pd.DataFrame], Any]] = None,
        chunksize: Optional[int] = -1,
        workers: Optional[int] = None,
        debug: bool = False,
        **kw: Any
    ) -> Union['pd.DataFrame', List[Any], None]:
    """Execute the pipe's SQL definition and return the resulting data.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe object which contains the `fetch` metadata.

        - pipe.columns['datetime']: str
            - Name of the datetime column for the remote table.
        - pipe.parameters['fetch']: Dict[str, Any]
            - Parameters necessary to execute a query.
        - pipe.parameters['fetch']['definition']: str
            - Raw SQL query to execute to generate the pandas DataFrame.
        - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
            - How many minutes before `begin` to search for data (*optional*).

    begin: Union[datetime.datetime, str, None], default ''
        Most recent datetime to search for data.
        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.

    end: Union[datetime.datetime, str, None], default None
        The latest datetime to search for data.
        If `end` is `None`, do not bound the search.

    chunk_hook: Callable[[pd.DataFrame], Any], default None
        A function to pass to `SQLConnector.read()` that accepts a Pandas DataFrame.

    chunksize: Optional[int], default -1
        How many rows to load into memory at once (when `chunk_hook` is provided).
        Otherwise the entire result set is loaded into memory.

    workers: Optional[int], default None
        How many threads to use when consuming the generator (when `chunk_hook` is provided).
        Defaults to the number of cores.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas DataFrame or `None`.
    If `chunk_hook` is not None, return a list of the hook function's results.
    """
    query = self.get_pipe_metadef(
        pipe,
        begin = begin,
        end = end,
        debug = debug,
        **kw
    )
    collect_hook_results = chunk_hook is not None
    result = self.read(
        query,
        chunk_hook = chunk_hook,
        as_hook_results = collect_hook_results,
        chunksize = chunksize,
        workers = workers,
        debug = debug,
    )

    if collect_hook_results or self.flavor != 'sqlite':
        return result

    ### SQLite needs its datetime columns parsed back out of text;
    ### wrap the chunks in a lazy generator of parsed frames.
    from meerschaum.utils.misc import parse_df_datetimes
    non_dt_cols = [
        col
        for col, dtype in pipe.dtypes.items()
        if 'datetime' not in str(dtype)
    ]
    return (
        parse_df_datetimes(
            chunk,
            ignore_cols = non_dt_cols,
            debug = debug,
        )
        for chunk in result
    )
def fetch_pipes_keys(self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) ‑> Optional[List[Tuple[str, str, Optional[str]]]]

Return a list of tuples corresponding to the parameters provided.

Parameters

connector_keys : Optional[List[str]], default None
List of connector_keys to search by.
metric_keys : Optional[List[str]], default None
List of metric_keys to search by.
location_keys : Optional[List[str]], default None
List of location_keys to search by.
params : Optional[Dict[str, Any]], default None
Dictionary of additional parameters to search by. E.g. --params pipe_id:1
debug : bool, default False
Verbosity toggle.
Expand source code
def fetch_pipes_keys(
        self,
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False
    ) -> Optional[List[Tuple[str, str, Optional[str]]]]:
    """
    Return a list of tuples corresponding to the parameters provided.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        List of connector_keys to search by.

    metric_keys: Optional[List[str]], default None
        List of metric_keys to search by.

    location_keys: Optional[List[str]], default None
        List of location_keys to search by.
        The strings `'[None]'`, `'None'`, and `'null'` are normalized to `None`.

    tags: Optional[List[str]], default None
        Tags to filter by; tags prefixed with the negation prefix are excluded.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        E.g. `--params pipe_id:1`

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of `(connector_keys, metric_key, location_key)` tuples.
    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.misc import separate_negation_values
    from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists
    from meerschaum.config.static import STATIC_CONFIG
    sqlalchemy = attempt_import('sqlalchemy')
    import json
    from copy import deepcopy

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    else:
        ### Normalize the "null" location sentinels into real `None` values.
        location_keys = [
            (lk if lk not in ('[None]', 'None', 'null') else None)
                for lk in location_keys
        ]
    if tags is None:
        tags = []

    if params is None:
        params = {}

    ### Add three primary keys to params dictionary
    ###   (separated for convenience of arguments).
    cols = {
        'connector_keys': connector_keys,
        'metric_key': metric_keys,
        'location_key': location_keys,
    }

    ### Make deep copy so we don't mutate the caller's dictionary.
    parameters = deepcopy(params)
    for col, vals in cols.items():
        ### Allow for IS NULL to be declared as a single-item list ([None]).
        if vals == [None]:
            vals = None
        if vals not in [[], ['*']]:
            parameters[col] = vals
    cols = {k: v for k, v in cols.items() if v != [None]}

    if not table_exists('pipes', self, debug=debug):
        return []

    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes']

    ### Serialize dict values so they can be compared against the JSON column.
    _params = {}
    for k, v in parameters.items():
        _params[k] = json.dumps(v) if isinstance(v, dict) else v

    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
    ### Parse regular (scalar) params.
    ### If a value begins with the negation prefix, exclude it instead:
    ### compare against the value with the prefix stripped.
    ### NOTE: fixed a bug where the negated branch compared the column
    ### against its own name (`!= key`) instead of the stripped value.
    _where = [
        (
            (pipes_tbl.c[key] == val) if not str(val).startswith(negation_prefix)
            else (pipes_tbl.c[key] != str(val)[len(negation_prefix):])
        ) for key, val in _params.items()
            if not isinstance(val, (list, tuple)) and key in pipes_tbl.c
    ]
    ### Only select `parameters` when tags need to be verified afterwards.
    select_cols = (
        [pipes_tbl.c.connector_keys, pipes_tbl.c.metric_key, pipes_tbl.c.location_key]
        + ([pipes_tbl.c.parameters] if tags else [])
    )

    q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where))

    ### Parse IN params and add OR IS NULL if None is in the list.
    for c, vals in cols.items():
        if not isinstance(vals, (list, tuple)) or not vals or c not in pipes_tbl.c:
            continue
        _in_vals, _ex_vals = separate_negation_values(vals)
        ### Include params (positive)
        q = (
            q.where(pipes_tbl.c[c].in_(_in_vals)) if None not in _in_vals
            else q.where(sqlalchemy.or_(pipes_tbl.c[c].in_(_in_vals), pipes_tbl.c[c].is_(None)))
        ) if _in_vals else q
        ### Exclude params (negative)
        q = q.where(pipes_tbl.c[c].not_in(_ex_vals)) if _ex_vals else q

    ### Finally, parse tags via approximate LIKE matches on the JSON text.
    _in_tags, _ex_tags = separate_negation_values(tags)
    ors = [
        sqlalchemy.cast(
            pipes_tbl.c['parameters'],
            sqlalchemy.String,
        ).like(f'%"tags":%"{nt}"%')
        for nt in _in_tags
    ]
    q = q.where(sqlalchemy.and_(sqlalchemy.or_(*ors).self_group())) if ors else q
    ors = [
        sqlalchemy.cast(
            pipes_tbl.c['parameters'],
            sqlalchemy.String,
        ).not_like(f'%"tags":%"{xt}"%')
        for xt in _ex_tags
    ]
    q = q.where(sqlalchemy.and_(sqlalchemy.or_(*ors).self_group())) if ors else q

    ### Order by connector, metric, then location (NULL locations first where supported).
    loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key'])
    if self.flavor not in OMIT_NULLSFIRST_FLAVORS:
        loc_asc = sqlalchemy.nullsfirst(loc_asc)
    q = q.order_by(
        sqlalchemy.asc(pipes_tbl.c['connector_keys']),
        sqlalchemy.asc(pipes_tbl.c['metric_key']),
        loc_asc,
    )

    ### Execute the query and return a list of tuples.
    if debug:
        dprint(q.compile(compile_kwargs={'literal_binds': True}))
    try:
        rows = self.execute(q).fetchall()
    except Exception as e:
        ### `error()` raises, so `rows` is always bound past this point.
        error(str(e))

    _keys = [(row[0], row[1], row[2]) for row in rows]
    if not tags:
        return _keys

    ### The LIKE matches above are approximate (substring matches on JSON text),
    ### so verify the tag membership exactly.
    ### NOTE: fixed two bugs here — the old loop could `keys.remove()` a tuple
    ### that was never added (raising ValueError) and could append duplicates.
    keys = []
    for row in rows:
        ktup = (row[0], row[1], row[2])
        _actual_tags = (
            json.loads(row[3]) if isinstance(row[3], str)
            else row[3]
        ).get('tags', [])
        if _in_tags and not any(nt in _actual_tags for nt in _in_tags):
            continue
        if any(xt in _actual_tags for xt in _ex_tags):
            continue
        keys.append(ktup)
    return keys
def get_add_columns_queries(self, pipe: mrsm.Pipe, df: Union[pd.DataFrame, Dict[str, str]], debug: bool = False) ‑> List[str]

Add new null columns of the correct type to a table from a dataframe.

Parameters

pipe : mrsm.Pipe
The pipe to be altered.
df : Union[pd.DataFrame, Dict[str, str]]
The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.

Returns

A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.

Expand source code
def get_add_columns_queries(
        self,
        pipe: mrsm.Pipe,
        df: Union[pd.DataFrame, Dict[str, str]],
        debug: bool = False,
    ) -> List[str]:
    """
    Build `ALTER TABLE ... ADD` queries for columns present in a dataframe
    but missing from the pipe's table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.

    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which contains new columns.
        If a dictionary is provided, assume it maps columns to Pandas data types.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    """
    if not pipe.exists(debug=debug):
        return []
    import copy
    from meerschaum.utils.sql import (
        get_pd_type,
        get_db_type,
        sql_item_name,
        SINGLE_ALTER_TABLE_FLAVORS,
    )
    from meerschaum.utils.misc import flatten_list
    table_obj = self.get_pipe_table(pipe, debug=debug)

    ### Build the column -> Pandas-dtype map for the incoming data.
    if isinstance(df, dict):
        incoming_types = copy.deepcopy(df)
    else:
        incoming_types = {col: str(typ) for col, typ in df.dtypes.items()}
        ### Refine 'object' dtypes by peeking at the first row's values.
        if len(df) > 0:
            first_row = df.iloc[0]
            for col in [c for c, t in incoming_types.items() if t == 'object']:
                sample_val = first_row[col]
                if isinstance(sample_val, (dict, list)):
                    incoming_types[col] = 'json'
                elif isinstance(sample_val, str):
                    incoming_types[col] = 'str'

    existing_types = {
        col: get_pd_type(str(typ.type))
        for col, typ in table_obj.columns.items()
    }
    missing_cols = set(incoming_types) - set(existing_types)
    if not missing_cols:
        return []

    missing_col_db_types = {
        col: get_db_type(incoming_types[col], self.flavor)
        for col in missing_cols
    }

    alter_prefix = "ALTER TABLE " + sql_item_name(pipe.target, self.flavor)
    queries = []
    batched_query = alter_prefix
    for col, db_typ in missing_col_db_types.items():
        add_clause = "\nADD " + sql_item_name(col, self.flavor) + " " + db_typ + ","
        if self.flavor in SINGLE_ALTER_TABLE_FLAVORS:
            ### Flavors like SQLite require one ALTER TABLE per column.
            queries.append(alter_prefix + add_clause[:-1])
        else:
            batched_query += add_clause

    ### For most flavors a single batched query suffices
    ### (trim the trailing comma).
    if not queries:
        queries.append(batched_query[:-1])

    if self.flavor != 'duckdb':
        return queries

    ### NOTE: For DuckDB, we must drop and rebuild the indices.
    drop_index_queries = list(flatten_list(
        self.get_drop_index_queries(pipe, debug=debug).values()
    ))
    create_index_queries = list(flatten_list(
        self.get_create_index_queries(pipe, debug=debug).values()
    ))

    return drop_index_queries + queries + create_index_queries
def get_alter_columns_queries(self, pipe: mrsm.Pipe, df: Union[pd.DataFrame, Dict[str, str]], debug: bool = False) ‑> List[str]

If we encounter a column of a different type, set the entire column to text.

Parameters

pipe : mrsm.Pipe
The pipe to be altered.
df : Union[pd.DataFrame, Dict[str, str]]
The pandas DataFrame which may contain altered columns. If a dict is provided, assume it maps columns to Pandas data types.

Returns

A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.

Expand source code
def get_alter_columns_queries(
        self,
        pipe: mrsm.Pipe,
        df: Union[pd.DataFrame, Dict[str, str]],
        debug: bool = False,
    ) -> List[str]:
    """
    If we encounter a column of a different type, set the entire column to text.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.

    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which may contain altered columns.
        If a dict is provided, assume it maps columns to Pandas data types.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    """
    ### Nothing to alter if the target table doesn't exist yet.
    if not pipe.exists(debug=debug):
        return []
    from meerschaum.utils.sql import get_pd_type, get_db_type, sql_item_name
    from meerschaum.utils.misc import flatten_list, generate_password
    table_obj = self.get_pipe_table(pipe, debug=debug)
    target = pipe.target
    ### Short random suffix to keep the SQLite temp-table name unique per call.
    session_id = generate_password(3)
    ### Column -> Pandas dtype string for the incoming data.
    df_cols_types = (
        {col: str(typ) for col, typ in df.dtypes.items()}
        if not isinstance(df, dict) else df
    )
    ### Column -> Pandas dtype string for the existing table.
    db_cols_types = {col: get_pd_type(str(typ.type)) for col, typ in table_obj.columns.items()}
    ### Columns whose incoming dtype disagrees with the DB's dtype.
    ### Columns the DB already treats as 'object' are skipped.
    altered_cols = [
        col for col, typ in df_cols_types.items()
        if typ.lower() != db_cols_types.get(col, 'object').lower()
        and db_cols_types.get(col, 'object') != 'object'
    ]
    if not altered_cols:
        return []

    ### Every altered column is coerced to this flavor's text type.
    text_type = get_db_type('str', self.flavor)
    altered_cols_types = {
        col: text_type
        for col in altered_cols
    }

    ### SQLite cannot alter a column's type in place, so rebuild the table:
    ### rename -> create fresh schema -> copy with CASTs -> drop the old table.
    if self.flavor == 'sqlite':
        temp_table_name = '_' + session_id + '_' + target
        rename_query = (
            "ALTER TABLE "
            + sql_item_name(target, self.flavor)
            + " RENAME TO "
            + sql_item_name(temp_table_name, self.flavor)
        )
        create_query = (
            "CREATE TABLE "
            + sql_item_name(target, self.flavor)
            + " (\n"
        )
        for col_name, col_obj in table_obj.columns.items():
            create_query += (
                sql_item_name(col_name, self.flavor)
                + " "
                + (str(col_obj.type) if col_name not in altered_cols else text_type)
                + ",\n"
            )
        ### Trim the trailing ",\n" and close the column list.
        create_query = create_query[:-2] + "\n)"

        insert_query = (
            "INSERT INTO "
            + sql_item_name(target, self.flavor)
            + "\nSELECT\n"
        )
        for col_name, col_obj in table_obj.columns.items():
            new_col_str = (
                sql_item_name(col_name, self.flavor)
                if col_name not in altered_cols
                else f"CAST({sql_item_name(col_name, self.flavor)} AS {text_type})"
            )
            insert_query += new_col_str + ",\n"
        insert_query = insert_query[:-2] + f"\nFROM {sql_item_name(temp_table_name, self.flavor)}"

        drop_query = f"DROP TABLE {sql_item_name(temp_table_name, self.flavor)}"
        return [
            rename_query,
            create_query,
            insert_query,
            drop_query,
        ]

    queries = []
    ### Oracle cannot MODIFY a populated column's type directly, so shuffle
    ### the data through temporary `<col>_temp` columns:
    ### add temps -> copy -> null originals -> MODIFY type -> copy back -> drop temps.
    if self.flavor == 'oracle':
        add_query = "ALTER TABLE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            add_query += "\nADD " + sql_item_name(col + '_temp', self.flavor) + " " + typ + ","
        add_query = add_query[:-1]
        queries.append(add_query)

        populate_temp_query = "UPDATE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            populate_temp_query += (
                "\nSET " + sql_item_name(col + '_temp', self.flavor)
                + ' = ' + sql_item_name(col, self.flavor) + ','
            )
        populate_temp_query = populate_temp_query[:-1]
        queries.append(populate_temp_query)

        set_old_cols_to_null_query = "UPDATE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            set_old_cols_to_null_query += (
                "\nSET " + sql_item_name(col, self.flavor)
                + ' = NULL,'
            )
        set_old_cols_to_null_query = set_old_cols_to_null_query[:-1]
        queries.append(set_old_cols_to_null_query)

        alter_type_query = "ALTER TABLE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            alter_type_query += (
                "\nMODIFY " + sql_item_name(col, self.flavor) + ' '
                + typ + ','
            )
        alter_type_query = alter_type_query[:-1]
        queries.append(alter_type_query)

        set_old_to_temp_query = "UPDATE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            set_old_to_temp_query += (
                "\nSET " + sql_item_name(col, self.flavor)
                + ' = ' + sql_item_name(col + '_temp', self.flavor) + ','
            )
        set_old_to_temp_query = set_old_to_temp_query[:-1]
        queries.append(set_old_to_temp_query)

        drop_temp_query = "ALTER TABLE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            drop_temp_query += (
                "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor) + ','
            )
        drop_temp_query = drop_temp_query[:-1]
        queries.append(drop_temp_query)

        return queries


    ### Generic path: a single ALTER TABLE with flavor-specific keywords
    ### (ALTER vs MODIFY, optional COLUMN keyword, optional TYPE prefix).
    query = "ALTER TABLE " + sql_item_name(target, self.flavor)
    for col, typ in altered_cols_types.items():
        alter_col_prefix = (
            'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle')
            else 'MODIFY'
        )
        type_prefix = (
            '' if self.flavor in ('mssql', 'mariadb', 'mysql')
            else 'TYPE '
        )
        column_str = 'COLUMN' if self.flavor != 'oracle' else ''
        query += (
            f"\n{alter_col_prefix} {column_str} "
            + sql_item_name(col, self.flavor)
            + " " + type_prefix + typ + ","
        )

    ### Trim the trailing comma.
    query = query[:-1]
    queries.append(query)
    if self.flavor != 'duckdb':
        return queries

    ### DuckDB requires dropping and rebuilding the indices around the ALTER.
    drop_index_queries = list(flatten_list(
        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
    ))
    create_index_queries = list(flatten_list(
        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
    ))

    return drop_index_queries + queries + create_index_queries
def get_backtrack_data(self, pipe: Optional[Pipe] = None, backtrack_minutes: int = 0, begin: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, limit: Optional[int] = None, chunksize: Optional[int] = -1, debug: bool = False) ‑> Union[pandas.DataFrame, None]

Get the most recent backtrack_minutes' worth of data from a Pipe.

Parameters

pipe : meerschaum.Pipe:
The pipe to get data from.
backtrack_minutes : int, default 0
How far into the past to look for data.
begin : Optional[datetime.datetime], default None
Where to start traversing from. Defaults to None, which uses the get_sync_time() value.
params : Optional[Dict[str, Any]], default None
Additional parameters to filter by. See meerschaum.connectors.sql.build_where.
limit : Optional[int], default None
If specified, limit the number of rows retrieved to this value.
chunksize : Optional[int], default -1
The size of dataframe chunks to load into memory.
debug : bool, default False
Verbosity toggle.

Returns

A pd.DataFrame of backtracked data.

Expand source code
def get_backtrack_data(
        self,
        pipe: Optional[meerschaum.Pipe] = None,
        backtrack_minutes: int = 0,
        begin: Optional[datetime.datetime] = None,
        params: Optional[Dict[str, Any]] = None,
        limit: Optional[int] = None,
        chunksize: Optional[int] = -1,
        debug: bool = False
    ) -> Union[pandas.DataFrame, None]:
    """
    Fetch the most recent `backtrack_minutes`' worth of data from a pipe.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to get data from.

    backtrack_minutes: int, default 0
        How far into the past to look for data.

    begin: Optional[datetime.datetime], default None
        Where to start traversing from. Defaults to `None`, which uses the
        `meerschaum.Pipe.get_sync_time` value.

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.connectors.sql.build_where`.

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    chunksize: Optional[int], default -1
        The size of dataframe chunks to load into memory.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` of backtracked data.
    """
    import datetime
    from meerschaum.utils.warnings import error
    if pipe is None:
        error("Pipe must be provided.")

    ### Fall back to the pipe's sync time when no explicit `begin` is given.
    resolved_begin = begin if begin is not None else pipe.get_sync_time(debug=debug)

    ### Delegate to the pipe, walking backwards from `resolved_begin`.
    return pipe.get_data(
        begin = resolved_begin,
        begin_add_minutes = -backtrack_minutes,
        order = 'desc',
        params = params,
        limit = limit,
        chunksize = chunksize,
        debug = debug,
    )
def get_create_index_queries(self, pipe: Pipe, debug: bool = False) ‑> Dict[str, List[str]]

Return a dictionary mapping columns to a CREATE INDEX or equivalent query.

Parameters

pipe : Pipe
The pipe to which the queries will correspond.

Returns

A dictionary of column names mapping to lists of queries.

Expand source code
def get_create_index_queries(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to which the queries will correspond.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of column names mapping to lists of queries.
    """
    from meerschaum.utils.sql import sql_item_name, get_distinct_col_count
    from meerschaum.config import get_config
    index_queries = {}

    indices = pipe.get_indices()

    ### Resolve the configured datetime and id columns (and their quoted index names).
    _datetime = pipe.get_columns('datetime', error=False)
    _datetime_type = pipe.dtypes.get(_datetime, 'datetime64[ns]')
    _datetime_name = sql_item_name(_datetime, self.flavor) if _datetime is not None else None
    _datetime_index_name = (
        sql_item_name(indices['datetime'], self.flavor) if indices.get('datetime', None)
        else None
    )
    _id = pipe.get_columns('id', error=False)
    _id_name = sql_item_name(_id, self.flavor) if _id is not None else None

    _id_index_name = sql_item_name(indices['id'], self.flavor) if indices.get('id') else None
    _pipe_name = sql_item_name(pipe.target, self.flavor)
    _create_space_partition = get_config('system', 'experimental', 'space')

    ### create datetime index
    if _datetime is not None:
        if self.flavor == 'timescaledb':
            ### Only partition by space (id) when the experimental setting is enabled.
            _id_count = (
                get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self)
                if (_id is not None and _create_space_partition) else None
            )
            ### Integer datetime columns get a numeric chunk interval rather than an SQL INTERVAL.
            chunk_time_interval = (
                pipe.parameters.get('chunk_time_interval', None)
                or
                ("INTERVAL '1 DAY'" if not 'int' in _datetime_type.lower() else '100000')
            )

            dt_query = (
                f"SELECT create_hypertable('{_pipe_name}', " +
                f"'{_datetime}', "
                + (
                    f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition)
                    else ''
                )
                + f'chunk_time_interval => {chunk_time_interval}, '
                + "migrate_data => true);"
            )
        else: ### mssql, sqlite, etc.
            dt_query = (
                f"CREATE INDEX {_datetime_index_name} "
                + f"ON {_pipe_name} ({_datetime_name})"
            )

        index_queries[_datetime] = [dt_query]

    ### create id index
    if _id_name is not None:
        if self.flavor == 'timescaledb':
            ### `create_hypertable()` already created the id index when space-partitioning.
            id_query = (
                None if (_id is not None and _create_space_partition)
                else (
                    f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})"
                    if _id is not None
                    else None
                )
            )
        elif self.flavor == 'citus':
            ### Citus needs two statements: the index plus table distribution.
            id_query = [(
                f"CREATE INDEX {_id_index_name} "
                + f"ON {_pipe_name} ({_id_name});"
            ), (
                f"SELECT create_distributed_table('{_pipe_name}', '{_id}');"
            )]
        else: ### mssql, sqlite, etc.
            id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})"

        if id_query is not None:
            ### BUGFIX: `id_query` is already a list on citus; wrapping it again
            ### would nest the queries (`[[q1, q2]]`) and break consumers.
            index_queries[_id] = id_query if isinstance(id_query, list) else [id_query]


    ### Create indices for other labels in `pipe.columns`.
    other_indices = {
        ix_key: ix_unquoted
        for ix_key, ix_unquoted in pipe.get_indices().items()
        if ix_key not in ('datetime', 'id')
    }
    for ix_key, ix_unquoted in other_indices.items():
        ix_name = sql_item_name(ix_unquoted, self.flavor)
        col = pipe.columns[ix_key]
        col_name = sql_item_name(col, self.flavor)
        index_queries[col] = [f"CREATE INDEX {ix_name} ON {_pipe_name} ({col_name})"]

    return index_queries
def get_drop_index_queries(self, pipe: Pipe, debug: bool = False) ‑> Dict[str, List[str]]

Return a dictionary mapping columns to a DROP INDEX or equivalent query.

Parameters

pipe : Pipe
The pipe to which the queries will correspond.

Returns

A dictionary of column names mapping to lists of queries.

Expand source code
def get_drop_index_queries(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `DROP INDEX` or equivalent query.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to which the queries will correspond.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of column names mapping to lists of queries.
    An empty dictionary is returned if the pipe's table does not exist.
    """
    if not pipe.exists(debug=debug):
        return {}
    from meerschaum.utils.sql import sql_item_name, table_exists, hypertable_queries
    drop_queries = {}
    indices = pipe.get_indices()
    pipe_name = sql_item_name(pipe.target, self.flavor)

    ### Detect whether the target is a hypertable (only flavors with a
    ### hypertable-detection query can be hypertables).
    if self.flavor not in hypertable_queries:
        is_hypertable = False
    else:
        is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name)
        is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None

    if is_hypertable:
        ### Hypertable indices can't just be dropped; instead "nuke" the hypertable by
        ### migrating its data into a temporary plain table, dropping the original,
        ### and renaming the copy back. Order of these statements matters.
        nuke_queries = []
        temp_table = '_' + pipe.target + '_temp_migration'
        temp_table_name = sql_item_name(temp_table, self.flavor)

        ### Clear any leftover temp table from an earlier failed migration.
        if table_exists(temp_table, self, debug=debug):
            nuke_queries.append(f"DROP TABLE {temp_table_name}")
        nuke_queries += [
            f"SELECT * INTO {temp_table_name} FROM {pipe_name}",
            f"DROP TABLE {pipe_name}",
            f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name}",
        ]
        ### Attach the migration to only ONE of the datetime/id keys so it runs once.
        nuke_ix_keys = ('datetime', 'id')
        nuked = False
        for ix_key in nuke_ix_keys:
            if ix_key in indices and not nuked:
                drop_queries[ix_key] = nuke_queries
                nuked = True

    ### Plain `DROP INDEX` for every index not already handled above.
    drop_queries.update({
        ix_key: ["DROP INDEX " + sql_item_name(ix_unquoted, self.flavor)]
        for ix_key, ix_unquoted in indices.items()
        if ix_key not in drop_queries
    })
    return drop_queries
def get_pipe_attributes(self, pipe: Pipe, debug: bool = False) ‑> Dict[str, Any]

Get a Pipe's attributes dictionary.

Expand source code
def get_pipe_attributes(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Dict[str, Any]:
    """
    Get a Pipe's attributes dictionary.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose row in the internal `pipes` table will be fetched.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The pipe's attributes as a dictionary (with `parameters` decoded to a `dict`),
    or an empty dictionary if the pipe is unregistered or the query fails.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    ### An unregistered pipe has no row to fetch.
    if pipe.get_id(debug=debug) is None:
        return {}

    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    try:
        q = sqlalchemy.select(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
        if debug:
            dprint(q)
        ### DuckDB takes the dataframe path instead of `.first()._mapping`
        ### (presumably its driver lacks row-mapping support — confirm).
        attributes = (
            dict(self.exec(q, silent=True, debug=debug).first()._mapping)
            if self.flavor != 'duckdb'
            else self.read(q, debug=debug).to_dict(orient='records')[0]
        )
    except Exception as e:
        import traceback
        traceback.print_exc()
        warn(e)
        print(pipe)
        return {}

    ### handle non-PostgreSQL databases (text vs JSON):
    ### flavors without a native JSON type return `parameters` as text,
    ### sometimes double-encoded, so decode until a dict is produced.
    if not isinstance(attributes.get('parameters', None), dict):
        try:
            import json
            parameters = json.loads(attributes['parameters'])
            if isinstance(parameters, str) and parameters[0] == '{':
                parameters = json.loads(parameters)
            attributes['parameters'] = parameters
        except Exception as e:
            ### Best-effort: fall back to an empty parameters dict on bad JSON.
            attributes['parameters'] = {}

    return attributes
def get_pipe_columns_types(self, pipe: Pipe, debug: bool = False) ‑> Optional[Dict[str, str]]

Get the pipe's columns and types.

Parameters

pipe : meerschaum.Pipe:
The pipe to get the columns for.

Returns

A dictionary of columns names (str) and types (str).

Examples

>>> conn.get_pipe_columns_types(pipe)
{
  'dt': 'TIMESTAMP WITHOUT TIMEZONE',
  'id': 'BIGINT',
  'val': 'DOUBLE PRECISION',
}
>>>
Expand source code
def get_pipe_columns_types(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Optional[Dict[str, str]]:
    """
    Get the pipe's columns and types.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe to get the columns for.

    Returns
    -------
    A dictionary mapping column names (`str`) to database types (`str`).
    Returns an empty dictionary if the pipe's table does not exist
    and `None` if reflection raises an exception.

    Examples
    --------
    >>> conn.get_pipe_columns_types(pipe)
    {
      'dt': 'TIMESTAMP WITHOUT TIMEZONE',
      'id': 'BIGINT',
      'val': 'DOUBLE PRECISION',
    }
    >>> 
    """
    if not pipe.exists(debug=debug):
        return {}
    try:
        reflected_table = self.get_pipe_table(pipe, debug=debug)
        if reflected_table is None:
            return {}
        return {
            str(column.name): str(column.type)
            for column in reflected_table.columns
        }
    except Exception as e:
        import traceback
        traceback.print_exc()
        from meerschaum.utils.warnings import warn
        warn(e)
        return None
def get_pipe_data(self, pipe: Optional[Pipe] = None, begin: Union[datetime.datetime, str, None] = None, end: Union[datetime.datetime, str, None] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, debug: bool = False, **kw: Any) ‑> Union[pd.DataFrame, None]

Access a pipe's data from the SQL instance.

Parameters

pipe : meerschaum.Pipe:
The pipe to get data from.
begin : Optional[datetime.datetime], default None
If provided, get rows newer than or equal to this value.
end : Optional[datetime.datetime], default None
If provided, get rows older than or equal to this value.
params : Optional[Dict[str, Any]], default None
Additional parameters to filter by. See meerschaum.connectors.sql.build_where.
order : Optional[str], default 'asc'
The selection order for all of the indices in the query. If None, omit the ORDER BY clause.
limit : Optional[int], default None
If specified, limit the number of rows retrieved to this value.
begin_add_minutes : int, default 0
The number of minutes to add to the begin datetime (i.e. DATEADD).
end_add_minutes : int, default 0
The number of minutes to add to the end datetime (i.e. DATEADD).
chunksize : Optional[int], default -1
The size of dataframe chunks to load into memory.
debug : bool, default False
Verbosity toggle.

Returns

A pd.DataFrame of the pipe's data.

Expand source code
def get_pipe_data(
        self,
        pipe: Optional[meerschaum.Pipe] = None,
        begin: Union[datetime.datetime, str, None] = None,
        end: Union[datetime.datetime, str, None] = None,
        params: Optional[Dict[str, Any]] = None,
        order: str = 'asc',
        limit: Optional[int] = None,
        begin_add_minutes: int = 0,
        end_add_minutes: int = 0,
        debug: bool = False,
        **kw: Any
    ) -> Union[pd.DataFrame, None]:
    """
    Access a pipe's data from the SQL instance.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe to get data from.

    begin: Optional[datetime.datetime], default None
        If provided, get rows newer than or equal to this value.

    end: Optional[datetime.datetime], default None
        If provided, get rows older than this value (exclusive bound).

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.connectors.sql.build_where`.

    order: Optional[str], default 'asc'
        The selection order for all of the indices in the query.
        If `None`, omit the `ORDER BY` clause.

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    begin_add_minutes: int, default 0
        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).

    end_add_minutes: int, default 0
        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).

    debug: bool, default False
        Verbosity toggle.

    kw: Any
        Additional keyword arguments forwarded to `get_pipe_data_query` and `read`
        (e.g. `chunksize`).

    Returns
    -------
    A `pd.DataFrame` of the pipe's data.

    """
    import json
    from meerschaum.utils.sql import sql_item_name
    dtypes = pipe.dtypes
    if dtypes:
        if self.flavor == 'sqlite':
            ### SQLite has no native datetime type, so resolve the datetime column
            ### (guessing one if not configured) and coerce its dtype below.
            if not pipe.columns.get('datetime', None):
                _dt = pipe.guess_datetime()
                dt = sql_item_name(_dt, self.flavor) if _dt else None
                is_guess = True
            else:
                _dt = pipe.get_columns('datetime')
                dt = sql_item_name(_dt, self.flavor)
                is_guess = False

            if _dt:
                dt_type = dtypes.get(_dt, 'object').lower()
                ### Integer datetime axes are left alone; everything else becomes datetime64.
                if 'datetime' not in dt_type:
                    if 'int' not in dt_type:
                        dtypes[_dt] = 'datetime64[ns]'
    ### Only keep dtypes for columns which actually exist in the table.
    existing_cols = pipe.get_columns_types(debug=debug)
    if existing_cols:
        dtypes = {col: typ for col, typ in dtypes.items() if col in existing_cols}
    query = self.get_pipe_data_query(
        pipe,
        begin = begin,
        end = end,
        params = params,
        order = order,
        limit = limit,
        begin_add_minutes = begin_add_minutes,
        end_add_minutes = end_add_minutes,
        debug = debug,
        **kw
    )
    df = self.read(
        query,
        debug = debug,
        **kw
    )
    if self.flavor == 'sqlite':
        from meerschaum.utils.misc import parse_df_datetimes
        from meerschaum.utils.packages import import_pandas
        pd = import_pandas()
        ### NOTE: We have to consume the iterator here to ensure that datetimes are parsed correctly
        df = (
            parse_df_datetimes(
                df,
                ignore_cols = [
                    col
                    for col, dtype in pipe.dtypes.items()
                    if 'datetime' not in str(dtype)
                ],
                debug = debug,
            ) if isinstance(df, pd.DataFrame) else (
                [
                    parse_df_datetimes(
                        c,
                        ignore_cols = [
                            col
                            for col, dtype in pipe.dtypes.items()
                            if 'datetime' not in str(dtype)
                        ],
                        debug = debug,
                    )
                    for c in df
                ]
            )
        )
        ### Decode JSON-typed columns back into Python objects.
        ### NOTE(review): if `read` returned chunks, `df` is a list here and
        ### `df[col]` below would raise — confirm the chunked sqlite path.
        for col, typ in dtypes.items():
            if typ != 'json':
                continue
            df[col] = df[col].apply(lambda x: json.loads(x) if x is not None else x)
    return df
def get_pipe_data_query(self, pipe: Optional[Pipe] = None, begin: Union[datetime.datetime, str, None] = None, end: Union[datetime.datetime, str, None] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, replace_nulls: Optional[str] = None, debug: bool = False, **kw: Any) ‑> Union[str, None]

Return the SELECT query for retrieving a pipe's data from its instance.

Parameters

pipe : meerschaum.Pipe:
The pipe to get data from.
begin : Optional[datetime.datetime], default None
If provided, get rows newer than or equal to this value.
end : Optional[datetime.datetime], default None
If provided, get rows older than or equal to this value.
params : Optional[Dict[str, Any]], default None
Additional parameters to filter by. See meerschaum.connectors.sql.build_where.
order : Optional[str], default 'asc'
The selection order for all of the indices in the query. If None, omit the ORDER BY clause.
limit : Optional[int], default None
If specified, limit the number of rows retrieved to this value.
begin_add_minutes : int, default 0
The number of minutes to add to the begin datetime (i.e. DATEADD).
end_add_minutes : int, default 0
The number of minutes to add to the end datetime (i.e. DATEADD).
chunksize : Optional[int], default -1
The size of dataframe chunks to load into memory.
replace_nulls : Optional[str], default None
If provided, replace null values with this value.
debug : bool, default False
Verbosity toggle.

Returns

A SELECT query to retrieve a pipe's data.

Expand source code
def get_pipe_data_query(
        self,
        pipe: Optional[meerschaum.Pipe] = None,
        begin: Union[datetime.datetime, str, None] = None,
        end: Union[datetime.datetime, str, None] = None,
        params: Optional[Dict[str, Any]] = None,
        order: str = 'asc',
        limit: Optional[int] = None,
        begin_add_minutes: int = 0,
        end_add_minutes: int = 0,
        replace_nulls: Optional[str] = None,
        debug: bool = False,
        **kw: Any
    ) -> Union[str, None]:
    """
    Return the `SELECT` query for retrieving a pipe's data from its instance.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe to get data from.

    begin: Union[datetime.datetime, str, None], default None
        If provided, get rows newer than or equal to this value.

    end: Union[datetime.datetime, str, None], default None
        If provided, get rows older than this value (exclusive bound).

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.utils.sql.build_where`.

    order: Optional[str], default 'asc'
        The selection order for all of the indices in the query.
        If `None`, omit the `ORDER BY` clause.

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    begin_add_minutes: int, default 0
        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).

    end_add_minutes: int, default 0
        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).

    replace_nulls: Optional[str], default None
        If provided, replace null values with this value.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SELECT` query to retrieve a pipe's data.
    """
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import items_str
    from meerschaum.utils.sql import sql_item_name, dateadd_str
    from meerschaum.utils.warnings import warn

    ### Determine the table's actual columns once up front
    ### (previously fetched twice when `replace_nulls` was set).
    existing_cols = pipe.get_columns_types(debug=debug)

    select_cols = "SELECT *"
    if replace_nulls:
        if existing_cols:
            ### Wrap every column in COALESCE() to substitute `replace_nulls` for NULLs.
            select_cols = "SELECT "
            for col in existing_cols:
                select_cols += (
                    f"\n    COALESCE({sql_item_name(col, self.flavor)}, "
                    + f"'{replace_nulls}') AS {sql_item_name(col, self.flavor)},"
                )
            select_cols = select_cols[:-1]
    query = f"{select_cols}\nFROM {sql_item_name(pipe.target, self.flavor)}"
    where = ""

    if order is not None:
        default_order = 'asc'
        if order not in ('asc', 'desc'):
            warn(f"Ignoring unsupported order '{order}'. Falling back to '{default_order}'.")
            order = default_order
        order = order.upper()

    ### Resolve the datetime column, guessing one if it isn't configured.
    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt = sql_item_name(_dt, self.flavor)
        is_guess = False

    quoted_indices = {
        key: sql_item_name(val, self.flavor)
        for key, val in pipe.columns.items()
        if val in existing_cols
    }

    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )

    is_dt_bound = False
    if begin is not None and _dt in existing_cols:
        begin_da = dateadd_str(
            flavor = self.flavor,
            datepart = 'minute',
            number = begin_add_minutes,
            begin = begin
        )
        where += f"{dt} >= {begin_da}" + (" AND " if end is not None else "")
        is_dt_bound = True

    if end is not None and _dt in existing_cols:
        ### An integer `end` equal to `begin` would select nothing; bump it to form a valid range.
        if 'int' in str(type(end)).lower() and end == begin:
            end += 1
        end_da = dateadd_str(
            flavor = self.flavor,
            datepart = 'minute',
            number = end_add_minutes,
            begin = end
        )
        where += f"{dt} < {end_da}"
        is_dt_bound = True

    if params is not None:
        from meerschaum.utils.sql import build_where
        ### Silently drop filter keys which aren't real columns.
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
        if valid_params:
            where += build_where(valid_params, self).replace(
                'WHERE', ('AND' if is_dt_bound else "")
            )

    if len(where) > 0:
        query += "\nWHERE " + where

    if order is not None:
        ### Sort by indices, starting with datetime.
        order_by = ""
        if quoted_indices:
            order_by += "\nORDER BY "
            if _dt and _dt in existing_cols:
                order_by += dt + ' ' + order + ','
            for key, quoted_col_name in quoted_indices.items():
                if key == 'datetime':
                    continue
                order_by += ' ' + quoted_col_name + ' ' + order + ','
            order_by = order_by[:-1]

        query += order_by

    if isinstance(limit, int):
        if self.flavor == 'mssql':
            query = f'SELECT TOP {limit} ' + query[len("SELECT *"):]
        elif self.flavor == 'oracle':
            ### BUGFIX: was `WHERE ROWNUM = 1`, which ignored `limit`
            ### and always returned a single row.
            query = f"SELECT * FROM (\n  {query}\n)\nWHERE ROWNUM <= {limit}"
        else:
            query += f"\nLIMIT {limit}"

    if debug:
        to_print = (
            []
            + ([f"begin='{begin}'"] if begin else [])
            + ([f"end='{end}'"] if end else [])
            + ([f"params='{json.dumps(params)}'"] if params else [])
        )
        dprint("Getting pipe data with constraints: " + items_str(to_print, quotes=False))

    return query
def get_pipe_id(self, pipe: Pipe, debug: bool = False) ‑> Any

Get a Pipe's ID from the pipes table.

Expand source code
def get_pipe_id(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Any:
    """
    Get a Pipe's ID from the pipes table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose registration ID to fetch.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The pipe's ID as an `int`, or `None` if the pipe is temporary or unregistered.
    """
    ### Temporary pipes are never registered, so they have no ID.
    if pipe.temporary:
        return None
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    ### NULL location keys require `IS NULL` rather than `= NULL`.
    query = sqlalchemy.select(pipes_tbl.c.pipe_id).where(
        pipes_tbl.c.connector_keys == pipe.connector_keys
    ).where(
        pipes_tbl.c.metric_key == pipe.metric_key
    ).where(
        (pipes_tbl.c.location_key == pipe.location_key) if pipe.location_key is not None
        else pipes_tbl.c.location_key.is_(None)
    )
    _id = self.value(query, debug=debug, silent=pipe.temporary)
    if _id is not None:
        _id = int(_id)
    return _id
def get_pipe_metadef(self, pipe: Pipe, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, str, None] = '', end: Union[datetime.datetime, str, None] = None, debug: bool = False, **kw: Any) ‑> Union[str, None]

Return a pipe's meta definition fetch query (the pipe's definition with datetime bounds and parameter filters applied).

params: Optional[Dict[str, Any]], default None Optional params dictionary to build the WHERE clause. See build_where().

begin: Union[datetime.datetime, str, None], default None Most recent datatime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.

end: Union[datetime.datetime, str, None], default None The latest datetime to search for data. If end is None, do not bound the upper end of the search.

debug: bool, default False Verbosity toggle.

Expand source code
def get_pipe_metadef(
        self,
        pipe: meerschaum.Pipe,
        params: Optional[Dict[str, Any]] = None,
        begin: Union[datetime.datetime, str, None] = '',
        end: Union[datetime.datetime, str, None] = None,
        debug: bool = False,
        **kw: Any
    ) -> Union[str, None]:
    """
    Return a pipe's meta definition fetch query
    (the pipe's definition with datetime bounds and `params` filters appended).

    Parameters
    ----------
    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    begin: Union[datetime.datetime, str, None], default ''
        Most recent datatime to search for data.
        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
        The empty-string default means "fall back to the pipe's sync time".

    end: Union[datetime.datetime, str, None], default None
        The latest datetime to search for data.
        If `end` is `None`, do not bound the upper end of the query.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The definition query with datetime and parameter filtering appended,
    suitable for fetching new rows from the remote source.
    """
    import datetime
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where
    from meerschaum.utils.misc import is_int
    from meerschaum.config import get_config

    ### NOTE(review): `get_pipe_query`, `_simple_fetch_query`, and `_join_fetch_query`
    ### are presumably module-level helpers (not visible in this block) — confirm.
    definition = get_pipe_query(pipe)
    btm = self.get_pipe_backtrack_minutes(pipe)

    ### Resolve the datetime column, guessing one if it isn't configured.
    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt_name = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt_name = sql_item_name(_dt, self.flavor)
        is_guess = False

    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"Unable to determine a datetime column for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = '', None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False
                )


    ### An ORDER BY inside the definition (outside a window function)
    ### would break the wrapping query.
    if 'order by' in definition.lower() and 'over' not in definition.lower():
        error("Cannot fetch with an ORDER clause in the definition")

    ### The empty-string default for `begin` means "use the pipe's sync time".
    begin = (
        begin if not (isinstance(begin, str) and begin == '')
        else pipe.get_sync_time(debug=debug)
    )

    da = None  # NOTE(review): appears unused — `begin_da`/`end_da` below are used instead.
    if dt_name:
        ### default: do not backtrack
        begin_da = dateadd_str(
            flavor=self.flavor, datepart='minute', number=(-1 * btm), begin=begin,
        ) if begin else None
        end_da = dateadd_str(
            flavor=self.flavor, datepart='minute', number=1, begin=end,
        ) if end else None

    ### Use the join-fetch strategy only when an id column exists and the
    ### experimental `join_fetch` flag is enabled.
    meta_def = (
        _simple_fetch_query(pipe) if (
            (not (pipe.columns or {}).get('id', None))
            or (not get_config('system', 'experimental', 'join_fetch'))
        ) else _join_fetch_query(pipe, debug=debug, **kw)
    )

    ### Only look for WHERE after the last 'definition' marker so the
    ### definition's own WHERE clauses don't count.
    has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):]
    if dt_name and (begin_da or end_da):
        ### Integer datetime axes skip the DATEADD wrapping.
        definition_dt_name = (
            dateadd_str(self.flavor, 'minute', 0, f"definition.{dt_name}")
            if not is_int((begin_da or end_da))
            else f"definition.{dt_name}"
        )
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        if begin_da:
            meta_def += f"{definition_dt_name} >= {begin_da}"
        if begin_da and end_da:
            meta_def += " AND "
        if end_da:
            meta_def += f"{definition_dt_name} < {end_da}"

    if params is not None:
        params_where = build_where(params, self, with_where=False)
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        meta_def += params_where

    return meta_def
def get_pipe_rowcount(self, pipe: Pipe, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, remote: bool = False, params: Optional[Dict[str, Any]] = None, debug: bool = False) ‑> int

Get the rowcount for a pipe in accordance with given parameters.

Parameters

pipe : Pipe
The pipe to query with.
begin : Optional[datetime.datetime], default None
The beginning datetime value.
end : Optional[datetime.datetime], default None
The ending datetime value.
remote : bool, default False
If True, get the rowcount for the remote table.
params : Optional[Dict[str, Any]], default None
See build_where().
debug : bool, default False
Verbosity toggle.

Returns

An int for the number of rows if the pipe exists, otherwise None.

Expand source code
def get_pipe_rowcount(
        self,
        pipe: meerschaum.Pipe,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        remote: bool = False,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False
    ) -> int:
    """
    Get the rowcount for a pipe in accordance with given parameters.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to query with.
        
    begin: Optional[datetime.datetime], default None
        The beginning datetime value.

    end: Optional[datetime.datetime], default None
        The ending datetime value (exclusive bound).

    remote: bool, default False
        If `True`, get the rowcount for the remote source
        (the pipe's `fetch:definition` query) instead of the instance table.

    params: Optional[Dict[str, Any]], default None
        See `meerschaum.utils.sql.build_where`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` for the number of rows if the `pipe` exists, otherwise `None`.

    """
    from meerschaum.utils.sql import dateadd_str, sql_item_name
    from meerschaum.utils.warnings import error, warn
    from meerschaum.connectors.sql._fetch import get_pipe_query
    if remote:
        ### A remote rowcount requires the pipe's fetch definition.
        ### NOTE(review): `error()` presumably raises, making these
        ### `return None` lines unreachable — confirm.
        msg = f"'fetch:definition' must be an attribute of {pipe} to get a remote rowcount."
        if 'fetch' not in pipe.parameters:
            error(msg)
            return None
        if 'definition' not in pipe.parameters['fetch']:
            error(msg)
            return None

    _pipe_name = sql_item_name(pipe.target, self.flavor)

    ### Resolve the datetime column, guessing one if it isn't configured.
    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt = sql_item_name(_dt, self.flavor)
        is_guess = False

    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )


    ### Quote with the remote connector's flavor when counting the remote source.
    _datetime_name = sql_item_name(
        _dt,
        pipe.instance_connector.flavor if not remote else pipe.connector.flavor
    )
    ### Select only the columns needed for filtering (datetime + param keys);
    ### fall back to `*` when none apply.
    _cols_names = [
        sql_item_name(col, pipe.instance_connector.flavor if not remote else pipe.connector.flavor)
        for col in set(
            ([_dt] if _dt else [])
            + ([] if params is None else list(params.keys()))
        )
    ]
    if not _cols_names:
        _cols_names = ['*']

    ### Count over either the instance table or the remote definition query.
    src = (
        f"SELECT {', '.join(_cols_names)} FROM {_pipe_name}"
        if not remote else get_pipe_query(pipe)
    )
    ### mysql/mariadb get a subquery instead of a CTE
    ### (presumably for compatibility with older versions — confirm).
    query = (
        f"""
        WITH src AS ({src})
        SELECT COUNT(*)
        FROM src
        """
    ) if self.flavor not in ('mysql', 'mariadb') else (
        f"""
        SELECT COUNT(*)
        FROM ({src}) AS src
        """
    )
    if begin is not None or end is not None:
        query += "WHERE"
    if begin is not None:
        query += f"""
        {dt} >= {dateadd_str(self.flavor, datepart='minute', number=0, begin=begin)}
        """
    if end is not None and begin is not None:
        query += "AND"
    if end is not None:
        query += f"""
        {dt} < {dateadd_str(self.flavor, datepart='minute', number=0, begin=end)}
        """
    if params is not None:
        from meerschaum.utils.sql import build_where
        ### Silently drop filter keys which aren't real columns.
        existing_cols = pipe.get_columns_types(debug=debug)
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
        if valid_params:
            query += build_where(valid_params, self).replace('WHERE', (
                'AND' if (begin is not None or end is not None)
                    else 'WHERE'
                )
            )

    result = self.value(query, debug=debug, silent=True)
    ### A missing table (or non-numeric result) yields None rather than raising.
    try:
        return int(result)
    except Exception as e:
        return None
def get_pipe_table(self, pipe: Pipe, debug: bool = False) ‑> sqlalchemy.Table

Return the sqlalchemy.Table object for a Pipe.

Parameters

pipe : meerschaum.Pipe:
The pipe in question.

Returns

A sqlalchemy.Table object.

Expand source code
def get_pipe_table(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> sqlalchemy.Table:
    """
    Return the `sqlalchemy.Table` object for a `meerschaum.Pipe`.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe in question.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `sqlalchemy.Table` object, or `None` if the pipe's table does not exist.
    """
    from meerschaum.utils.sql import get_sqlalchemy_table
    ### A pipe without a backing table has no table object to return.
    return (
        get_sqlalchemy_table(pipe.target, connector=self, debug=debug, refresh=True)
        if pipe.exists(debug=debug)
        else None
    )
def get_plugin_attributes(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Dict[str, Any]

Return the attributes of a plugin.

Expand source code
def get_plugin_attributes(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Dict[str, Any]:
    """
    Return the attributes dictionary of a registered plugin
    (an empty dict if the plugin has no stored attributes).
    """
    import json
    ### Ensure the plugins table exists before querying it.
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    query = sqlalchemy.select(plugins_tbl.c.attributes).where(
        plugins_tbl.c.plugin_name == plugin.name
    )

    raw_attributes = self.value(query, debug=debug)
    if raw_attributes is None:
        return {}
    ### Some flavors store attributes as a serialized JSON string.
    if isinstance(raw_attributes, str):
        return json.loads(raw_attributes)
    return raw_attributes
def get_plugin_id(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Optional[int]

Return a plugin's ID.

Expand source code
def get_plugin_id(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[int]:
    """
    Return a plugin's ID, or `None` if the plugin is not registered
    (or the stored value cannot be cast to an `int`).
    """
    ### Ensure the plugins table exists before querying it.
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    query = sqlalchemy.select(plugins_tbl.c.plugin_id).where(
        plugins_tbl.c.plugin_name == plugin.name
    )

    try:
        return int(self.value(query, debug=debug))
    except Exception:
        ### `int(None)` raises if no row matched.
        return None
def get_plugin_user_id(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Optional[int]

Return a plugin's user ID.

Expand source code
def get_plugin_user_id(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[int]:
    """
    Return the user ID of a plugin's owner,
    or `None` if the plugin is not registered.
    """
    ### Ensure the plugins table exists before querying it.
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    query = sqlalchemy.select(plugins_tbl.c.user_id).where(
        plugins_tbl.c.plugin_name == plugin.name
    )

    try:
        return int(self.value(query, debug=debug))
    except Exception:
        ### `int(None)` raises if no row matched.
        return None
def get_plugin_username(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Optional[str]

Return the username of a plugin's owner.

Expand source code
def get_plugin_username(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[str]:
    """
    Return the username of a plugin's owner,
    or `None` if the plugin is not registered.
    """
    ### Ensure the plugins and users tables exist before querying them.
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    users = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    ### BUG FIX: the previous code combined the two conditions with the Python
    ### `and` keyword, which does not build a SQL AND clause from SQLAlchemy
    ### expressions (it evaluates truthiness and drops a condition, or raises
    ### TypeError). Use `sqlalchemy.and_()` to join users to plugins properly.
    query = (
        sqlalchemy.select(users.c.username)
        .where(
            sqlalchemy.and_(
                users.c.user_id == plugins_tbl.c.user_id,
                plugins_tbl.c.plugin_name == plugin.name,
            )
        )
    )

    return self.value(query, debug=debug)
def get_plugin_version(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Optional[str]

Return a plugin's version.

Expand source code
def get_plugin_version(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[str]:
    """
    Return a plugin's version string,
    or `None` if the plugin is not registered.
    """
    ### Ensure the plugins table exists before querying it.
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    query = (
        sqlalchemy
        .select(plugins_tbl.c.version)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )

    return self.value(query, debug=debug)
def get_plugins(self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False, **kw: Any) ‑> List[str]

Return a list of all registered plugins.

Parameters

user_id : Optional[int], default None
If specified, filter plugins by a specific user_id.
search_term : Optional[str], default None
If specified, add a WHERE plugin_name LIKE '{search_term}%' clause to filter the plugins.

Returns

A list of plugin names.

Expand source code
def get_plugins(
        self,
        user_id: Optional[int] = None,
        search_term: Optional[str] = None,
        debug: bool = False,
        **kw: Any
    ) -> List[str]:
    """
    Return a list of all registered plugins.

    Parameters
    ----------
    user_id: Optional[int], default None
        If specified, filter plugins by a specific `user_id`.

    search_term: Optional[str], default None
        If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins.


    Returns
    -------
    A list of plugin names.
    """
    ### Ensure the plugins table exists before querying it.
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    query = sqlalchemy.select(plugins_tbl.c.plugin_name)

    ### Apply the optional filters only when they were provided.
    if user_id is not None:
        query = query.where(plugins_tbl.c.user_id == user_id)
    if search_term is not None:
        query = query.where(plugins_tbl.c.plugin_name.like(search_term + '%'))

    rows = self.execute(query).fetchall()
    return [row[0] for row in rows]
def get_sync_time(self, pipe: "'Pipe'", params: Optional[Dict[str, Any]] = None, newest: bool = True, round_down: bool = True, debug: bool = False) ‑> 'datetime.datetime'

Get a Pipe's most recent datetime.

Parameters

pipe : Pipe
The pipe to get the sync time for.
params : Optional[Dict[str, Any]], default None
Optional params dictionary to build the WHERE clause. See build_where().
newest : bool, default True
If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
round_down : bool, default True
If True, round the resulting datetime value down to the nearest minute. Defaults to True.

Returns

A datetime.datetime object if the pipe exists, otherwise None.

Expand source code
def get_sync_time(
        self,
        pipe: 'meerschaum.Pipe',
        params: Optional[Dict[str, Any]] = None,
        newest: bool = True,
        round_down: bool = True,
        debug: bool = False,
    ) -> 'datetime.datetime':
    """Get a Pipe's most recent datetime.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to get the sync time for.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (ASC instead of DESC).

    round_down: bool, default True
        If `True`, round the resulting datetime value down to the nearest minute.
        Defaults to `True`.

    Returns
    -------
    A `datetime.datetime` object if the pipe exists, otherwise `None`.
    """
    from meerschaum.utils.sql import sql_item_name, build_where
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.misc import round_time
    import datetime
    table = sql_item_name(pipe.target, self.flavor)

    ### Determine the datetime column, guessing if one isn't configured.
    ### NOTE: removed unused locals `dt_type` and `is_guess` from the
    ### previous implementation; they were assigned but never read.
    dt_col = pipe.columns.get('datetime', None)
    if not dt_col:
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor) if _dt else None
    else:
        _dt = dt_col
        dt = sql_item_name(_dt, self.flavor)

    if _dt is None:
        return None

    ASC_or_DESC = "DESC" if newest else "ASC"
    existing_cols = pipe.get_columns_types(debug=debug)
    valid_params = {}
    if params is not None:
        ### Only keep parameter keys which map to real columns.
        valid_params = {k: v for k, v in params.items() if k in existing_cols}

    ### If no bounds are provided for the datetime column,
    ### add IS NOT NULL to the WHERE clause.
    if _dt not in valid_params:
        valid_params[_dt] = '_None'
    where = "" if not valid_params else build_where(valid_params, self)
    q = f"SELECT {dt}\nFROM {table}{where}\nORDER BY {dt} {ASC_or_DESC}\nLIMIT 1"
    ### MSSQL and Oracle don't support LIMIT; use TOP 1 / ROWNUM instead.
    if self.flavor == 'mssql':
        q = f"SELECT TOP 1 {dt}\nFROM {table}{where}\nORDER BY {dt} {ASC_or_DESC}"
    elif self.flavor == 'oracle':
        q = (
            "SELECT * FROM (\n"
            + f"    SELECT {dt}\nFROM {table}{where}\n    ORDER BY {dt} {ASC_or_DESC}\n"
            + ") WHERE ROWNUM = 1"
        )

    try:
        db_time = self.value(q, silent=True, debug=debug)

        ### No datetime could be found.
        if db_time is None:
            return None
        ### sqlite returns str.
        if isinstance(db_time, str):
            from meerschaum.utils.packages import attempt_import
            dateutil_parser = attempt_import('dateutil.parser')
            st = dateutil_parser.parse(db_time)
        ### Do nothing if a datetime object is returned.
        elif isinstance(db_time, datetime.datetime):
            ### pandas Timestamps subclass datetime; unwrap them.
            st = db_time.to_pydatetime() if hasattr(db_time, 'to_pydatetime') else db_time
        ### Sometimes the datetime is actually a date.
        elif isinstance(db_time, datetime.date):
            st = datetime.datetime.combine(db_time, datetime.datetime.min.time())
        ### Adding support for an integer datetime axis.
        elif 'int' in str(type(db_time)).lower():
            st = int(db_time)
        ### Convert pandas timestamp to Python datetime.
        else:
            st = db_time.to_pydatetime()

        ### Round down to smooth the timestamp (integer axes are left alone).
        sync_time = (
            round_time(st, date_delta=datetime.timedelta(minutes=1), to='down')
            if round_down else st
        ) if not isinstance(st, int) else st

    except Exception as e:
        sync_time = None
        warn(str(e))

    return sync_time
def get_to_sql_dtype(self, pipe: "'Pipe'", df: "'pd.DataFrame'", update_dtypes: bool = True) ‑> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']

Given a pipe and DataFrame, return the dtype dictionary for to_sql().

Parameters

pipe : Pipe
The pipe which may contain a dtypes parameter.
df : pd.DataFrame
The DataFrame to be pushed via to_sql().
update_dtypes : bool, default True
If True, patch the pipe's dtypes onto the DataFrame's dtypes.

Returns

A dictionary with sqlalchemy datatypes.

Examples

>>> import pandas as pd
>>> import meerschaum as mrsm
>>> 
>>> conn = mrsm.get_connector('sql:memory')
>>> df = pd.DataFrame([{'a': {'b': 1}}])
>>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
>>> get_to_sql_dtype(pipe, df)
{'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
Expand source code
def get_to_sql_dtype(
        self,
        pipe: 'meerschaum.Pipe',
        df: 'pd.DataFrame',
        update_dtypes: bool = True,
    ) -> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']:
    """
    Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe which may contain a `dtypes` parameter.

    df: pd.DataFrame
        The DataFrame to be pushed via `to_sql()`.

    update_dtypes: bool, default True
        If `True`, patch the pipe's dtypes onto the DataFrame's dtypes.

    Returns
    -------
    A dictionary with `sqlalchemy` datatypes.

    Examples
    --------
    >>> import pandas as pd
    >>> import meerschaum as mrsm
    >>> 
    >>> conn = mrsm.get_connector('sql:memory')
    >>> df = pd.DataFrame([{'a': {'b': 1}}])
    >>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
    >>> get_to_sql_dtype(pipe, df)
    {'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
    """
    from meerschaum.utils.misc import get_json_cols
    from meerschaum.utils.sql import get_db_type
    ### Start from the DataFrame's own dtypes (as strings).
    df_dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
    ### Columns holding dicts/lists must be serialized as JSON.
    for json_col in get_json_cols(df):
        df_dtypes[json_col] = 'json'
    ### The pipe's explicitly configured dtypes take precedence.
    if update_dtypes:
        df_dtypes.update(pipe.dtypes)
    return {
        col: get_db_type(typ, self.flavor, as_sqlalchemy=True)
        for col, typ in df_dtypes.items()
    }
def get_user_attributes(self, user: meerschaum.core.User, debug: bool = False) ‑> Union[Dict[str, Any], None]

Return the user's attributes.

Expand source code
def get_user_attributes(
        self,
        user: meerschaum.core.User,
        debug: bool = False
    ) -> Union[Dict[str, Any], None]:
    """
    Return the user's attributes,
    coercing the stored value into a dictionary when possible.
    """
    import json
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import attempt_import
    ### Ensure the users table exists before querying it.
    from meerschaum.connectors.sql.tables import get_tables
    sqlalchemy = attempt_import('sqlalchemy')
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    ### Resolve the user_id if the User object doesn't already carry one.
    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)

    query = (
        sqlalchemy.select(users_tbl.c.attributes)
        .where(users_tbl.c.user_id == user_id)
    )

    result = self.value(query, debug=debug)
    if result is not None and not isinstance(result, dict):
        ### First try a direct dict() coercion, then fall back to JSON parsing.
        try:
            result = dict(result)
        except Exception:
            try:
                result = json.loads(result)
            except Exception:
                ### Neither parse worked; return the raw value but warn.
                warn(f"Received unexpected type for attributes: {result}")
    return result
def get_user_id(self, user: meerschaum.core.User, debug: bool = False) ‑> Optional[int]

If a user is registered, return the user_id.

Expand source code
def get_user_id(
        self,
        user: meerschaum.core.User,
        debug: bool = False
    ) -> Optional[int]:
    """If a user is registered, return the `user_id`; otherwise return `None`."""
    ### Ensure the users table exists before querying it.
    from meerschaum.utils.packages import attempt_import
    from meerschaum.connectors.sql.tables import get_tables
    sqlalchemy = attempt_import('sqlalchemy')
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    query = sqlalchemy.select(users_tbl.c.user_id).where(
        users_tbl.c.username == user.username
    )

    result = self.value(query, debug=debug)
    return int(result) if result is not None else None
def get_user_password_hash(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> Optional[str]

Return the password hash for a user. NOTE: This may be dangerous and is only allowed if the security settings explicitly allow it.

Expand source code
def get_user_password_hash(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """
    Return the password hash for a user.
    **NOTE**: This may be dangerous and is only allowed if the security settings explicitly allow it.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    ### Prefer an already-known user_id, otherwise look it up.
    user_id = user.user_id
    if user_id is not None:
        if debug:
            dprint(f"Already given user_id: {user_id}")
    else:
        if debug:
            dprint("Fetching user_id...")
        user_id = self.get_user_id(user, debug=debug)

    if user_id is None:
        return None

    query = (
        sqlalchemy.select(users_tbl.c.password_hash)
        .where(users_tbl.c.user_id == user_id)
    )
    return self.value(query, debug=debug)
def get_user_type(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> Optional[str]

Return the user's type.

Expand source code
def get_user_type(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """
    Return the user's type, or `None` if the user is not registered.
    """
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    ### Resolve the user_id if the User object doesn't already carry one.
    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    if user_id is None:
        return None

    query = (
        sqlalchemy
        .select(users_tbl.c.user_type)
        .where(users_tbl.c.user_id == user_id)
    )
    return self.value(query, debug=debug)
def get_users(self, debug: bool = False, **kw: Any) ‑> List[str]

Get the registered usernames.

Expand source code
def get_users(
        self,
        debug: bool = False,
        **kw: Any
    ) -> List[str]:
    """
    Get the registered usernames.
    """
    ### Ensure the users table exists before querying it.
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    return list(
        self.read(sqlalchemy.select(users_tbl.c.username), debug=debug)['username']
    )
def pipe_exists(self, pipe: Pipe, debug: bool = False) ‑> bool

Check that a Pipe's table exists.

Parameters

pipe : meerschaum.Pipe:
The pipe to check.
debug : bool, default False
Verbosity toggle.

Returns

A bool corresponding to whether a pipe's table exists.

Expand source code
def pipe_exists(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False
    ) -> bool:
    """
    Check that a Pipe's table exists.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe to check.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `bool` corresponding to whether a pipe's table exists.
    """
    from meerschaum.utils.sql import table_exists
    found = table_exists(pipe.target, self, debug=debug)
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"{pipe} " + ('exists.' if found else 'does not exist.'))
    return found
def read(self, query_or_table: Union[str, sqlalchemy.Query], params: Optional[Dict[str, Any], List[str]] = None, dtype: Optional[Dict[str, Any]] = None, chunksize: Optional[int] = -1, workers: Optional[int] = None, chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None, as_hook_results: bool = False, chunks: Optional[int] = None, as_chunks: bool = False, as_iterator: bool = False, silent: bool = False, debug: bool = False, **kw: Any) ‑> Union[pandas.DataFrame, List[pandas.DataFrame], List[Any], None]

Read a SQL query or table into a pandas dataframe.

Parameters

query_or_table : Union[str, sqlalchemy.Query]
The SQL query (sqlalchemy Query or string) or name of the table from which to select.
params : Optional[Dict[str, Any]], default None
List or Dict of parameters to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
dtype : Optional[Dict[str, Any]], default None
A dictionary of data types to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
chunksize : Optional[int], default -1

How many chunks to read at a time. None will read everything in one large chunk. Defaults to system configuration.

NOTE: DuckDB does not allow for chunking.

workers : Optional[int], default None
How many threads to use when consuming the generator. Only applies if chunk_hook is provided.
chunk_hook : Optional[Callable[[pandas.DataFrame], Any]], default None
Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. See --sync-chunks for an example. NOTE: as_iterator MUST be False (default).
as_hook_results : bool, default False

If True, return a List of the outputs of the hook function. Only applicable if chunk_hook is not None.

NOTE: as_iterator MUST be False (default).

chunks : Optional[int], default None
Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and return into a single dataframe. For example, to limit the returned dataframe to 100,000 rows, you could specify a chunksize of 1000 and chunks of 100.
as_chunks : bool, default False
If True, return a list of DataFrames. Otherwise return a single DataFrame. Defaults to False.
as_iterator : bool, default False
If True, return the pandas DataFrame iterator. chunksize must not be None (falls back to 1000 if so), and hooks are not called in this case. Defaults to False.
silent : bool, default False
If True, don't raise warnings in case of errors. Defaults to False.

Returns

A pd.DataFrame (default case), or an iterator, or a list of dataframes / iterators, or None if something breaks.

Expand source code
def read(
        self,
        query_or_table: Union[str, sqlalchemy.Query],
        params: Optional[Dict[str, Any], List[str]] = None,
        dtype: Optional[Dict[str, Any]] = None,
        chunksize: Optional[int] = -1,
        workers: Optional[int] = None,
        chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
        as_hook_results: bool = False,
        chunks: Optional[int] = None,
        as_chunks: bool = False,
        as_iterator: bool = False,
        silent: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Union[
        pandas.DataFrame,
        List[pandas.DataFrame],
        List[Any],
        None,
    ]:
    """
    Read a SQL query or table into a pandas dataframe.

    Parameters
    ----------
    query_or_table: Union[str, sqlalchemy.Query]
        The SQL query (sqlalchemy Query or string) or name of the table from which to select.

    params: Optional[Dict[str, Any]], default None
        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html

    dtype: Optional[Dict[str, Any]], default None
        A dictionary of data types to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html

    chunksize: Optional[int], default -1
        How many chunks to read at a time. `None` will read everything in one large chunk.
        Defaults to system configuration.

        **NOTE:** DuckDB does not allow for chunking.

    workers: Optional[int], default None
        How many threads to use when consuming the generator.
        Only applies if `chunk_hook` is provided.

    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
        See `--sync-chunks` for an example.
        **NOTE:** `as_iterator` MUST be False (default).

    as_hook_results: bool, default False
        If `True`, return a `List` of the outputs of the hook function.
        Only applicable if `chunk_hook` is not None.

        **NOTE:** `as_iterator` MUST be `False` (default).

    chunks: Optional[int], default None
        Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
        return into a single dataframe.
        For example, to limit the returned dataframe to 100,000 rows,
        you could specify a `chunksize` of `1000` and `chunks` of `100`.

    as_chunks: bool, default False
        If `True`, return a list of DataFrames. Otherwise return a single DataFrame.
        Defaults to `False`.

    as_iterator: bool, default False
        If `True`, return the pandas DataFrame iterator.
        `chunksize` must not be `None` (falls back to 1000 if so),
        and hooks are not called in this case.
        Defaults to `False`.

    silent: bool, default False
        If `True`, don't raise warnings in case of errors.
        Defaults to `False`.

    Returns
    -------
    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
    or `None` if something breaks.

    """
    if chunks is not None and chunks <= 0:
        return []
    from meerschaum.utils.sql import sql_item_name, truncate_item_name
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.pool import get_pool
    import warnings
    import traceback
    pd = import_pandas()
    sqlalchemy = attempt_import("sqlalchemy")
    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is None and as_iterator:
        if not silent and self.flavor not in _disallow_chunks_flavors:
            warn(
                f"An iterator may only be generated if chunksize is not None.\n"
                + "Falling back to a chunksize of 1000.", stacklevel=3,
            )
        chunksize = 1000
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel = 3,
                )
            chunksize = _max_chunks_flavors[self.flavor]

    ### NOTE: A bug in duckdb_engine does not allow for chunks.
    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
        chunksize = None

    if debug:
        import time
        start = time.perf_counter()
        dprint(query_or_table)
        dprint(f"Fetching with chunksize: {chunksize}")

    ### This might be sqlalchemy object or the string of a table name.
    ### We check for spaces and quotes to see if it might be a weird table.
    if (
        ' ' not in str(query_or_table)
        or (
            ' ' in str(query_or_table)
            and str(query_or_table).startswith('"')
            and str(query_or_table).endswith('"')
        )
    ):
        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
        if truncated_table_name != str(query_or_table) and not silent:
            ### BUG FIX: this warning previously referenced undefined names
            ### (`name` and `truncated_name`), raising a NameError whenever a
            ### table name exceeded the flavor's length limit.
            warn(
                f"Table '{query_or_table}' is too long for '{self.flavor}',"
                + f" will instead read the table '{truncated_table_name}'."
            )

        query_or_table = sql_item_name(str(query_or_table), self.flavor)
        if debug:
            dprint(f"Reading from table {query_or_table}")
        formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table))
    else:
        try:
            formatted_query = sqlalchemy.text(query_or_table)
        except Exception as e:
            formatted_query = query_or_table

    chunk_list = []
    chunk_hook_results = []
    try:
        ### Server-side cursors are only needed when we must consume chunks
        ### (and call hooks) inside this connection's context.
        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            with self.engine.begin() as transaction:
                with transaction.execution_options(stream_results=stream_results) as connection:
                    chunk_generator = pd.read_sql_query(
                        formatted_query,
                        connection,
                        params = params,
                        chunksize = chunksize,
                        dtype = dtype,
                    )

                    ### `stream_results` must be False (will load everything into memory).
                    if as_iterator or chunksize is None:
                        return chunk_generator

                    ### We must consume the generator in this context if using server-side cursors.
                    if stream_results:

                        pool = get_pool(workers=workers)

                        def _process_chunk(_chunk, _retry_on_failure: bool = True):
                            if not as_hook_results:
                                chunk_list.append(_chunk)
                            result = None
                            if chunk_hook is not None:
                                try:
                                    result = chunk_hook(
                                        _chunk,
                                        workers = workers,
                                        chunksize = chunksize,
                                        debug = debug,
                                        **kw
                                    )
                                except Exception as e:
                                    result = False, traceback.format_exc()
                                    from meerschaum.utils.formatting import get_console
                                    get_console().print_exception()

                                ### If the chunk fails to process, try it again one more time.
                                if isinstance(result, tuple) and result[0] is False:
                                    if _retry_on_failure:
                                        return _process_chunk(_chunk, _retry_on_failure=False)

                            return result

                        chunk_hook_results = list(pool.imap(_process_chunk, chunk_generator))
                        if as_hook_results:
                            return chunk_hook_results

    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
        if not silent:
            warn(str(e), stacklevel=3)
        from meerschaum.utils.formatting import get_console
        get_console().print_exception()

        return None

    read_chunks = 0
    try:
        for chunk in chunk_generator:
            if chunk_hook is not None:
                chunk_hook_results.append(
                    chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                )
            chunk_list.append(chunk)
            read_chunks += 1
            if chunks is not None and read_chunks >= chunks:
                break
    except Exception as e:
        warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
        from meerschaum.utils.formatting import get_console
        get_console().print_exception()

        return None

    ### If no chunks returned, read without chunks
    ### to get columns
    if len(chunk_list) == 0:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            with self.engine.begin() as connection:
                chunk_list.append(
                    pd.read_sql_query(
                        formatted_query,
                        connection,
                        params = params, 
                        dtype = dtype,
                    )
                )

    ### call the hook on any missed chunks.
    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
        for c in chunk_list[len(chunk_hook_results):]:
            chunk_hook_results.append(
                chunk_hook(c, chunksize=chunksize, debug=debug, **kw)
            )

    ### chunksize is not None so must iterate
    if debug:
        end = time.perf_counter()
        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")

    if as_hook_results:
        return chunk_hook_results

    ### Skip `pd.concat()` if `as_chunks` is specified.
    if as_chunks:
        for c in chunk_list:
            c.reset_index(drop=True, inplace=True)
        return chunk_list

    return pd.concat(chunk_list).reset_index(drop=True)
def register_pipe(self, pipe: Pipe, debug: bool = False) -> SuccessTuple

Register a new pipe. A pipe's attributes must be set before registering.

Expand source code
def register_pipe(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> SuccessTuple:
    """
    Register a new pipe.
    A pipe's attributes must be set before registering.
    """
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    from meerschaum.connectors.sql.tables import get_tables

    ### Ensure the pipes table exists (temporary pipes skip table creation).
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    if pipe.get_id(debug=debug) is not None:
        return False, f"{pipe} is already registered."

    ### Prefer the in-memory parameters (e.g. supplied to the Pipe constructor
    ### or set on the property); fall back to an empty dictionary on failure.
    try:
        parameters = pipe.parameters
    except Exception as e:
        if debug:
            dprint(str(e))
        parameters = None
    if parameters is None:
        parameters = {}

    sqlalchemy = attempt_import('sqlalchemy')
    ### JSON-capable flavors store `parameters` natively; others get a serialized string.
    serialized_parameters = (
        parameters
        if self.flavor in json_flavors
        else json.dumps(parameters)
    )
    insert_query = sqlalchemy.insert(pipes_tbl).values(
        connector_keys = pipe.connector_keys,
        metric_key = pipe.metric_key,
        location_key = pipe.location_key,
        parameters = serialized_parameters,
    )
    if self.exec(insert_query, debug=debug) is None:
        return False, f"Failed to register {pipe}."
    return True, f"Successfully registered {pipe}."
def register_plugin(self, plugin: "'meerschaum.core.Plugin'", force: bool = False, debug: bool = False, **kw: Any) ‑> SuccessTuple

Register a new plugin to the plugins table.

Expand source code
def register_plugin(
        self,
        plugin: 'meerschaum.core.Plugin',
        force: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Register a new plugin to the plugins table.

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin to register (or update, if it already exists).

    force: bool, default False
        If `True`, skip the version comparison and re-register an existing plugin
        even when the new version is not greater than the old one.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    import json
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    from meerschaum.connectors.sql.tables import get_tables
    sqlalchemy = attempt_import('sqlalchemy')
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    old_id = self.get_plugin_id(plugin, debug=debug)

    ### Check for version conflict. May be overridden with `--force`.
    if old_id is not None and not force:
        ### Normalize missing versions to empty strings so the comparison below
        ### is only attempted when both versions are known.
        old_version = self.get_plugin_version(plugin, debug=debug) or ''
        new_version = plugin.version or ''

        ### Verify that the new version is greater than the old.
        packaging_version = attempt_import('packaging.version')
        if (
            old_version and new_version
            and packaging_version.parse(old_version) >= packaging_version.parse(new_version)
        ):
            return False, (
                f"Version '{new_version}' of plugin '{plugin}' " +
                f"must be greater than existing version '{old_version}'."
            )

    ### JSON-capable flavors store `attributes` natively; others get a serialized string.
    bind_variables = {
        'plugin_name' : plugin.name,
        'version' : plugin.version,
        'attributes' : (
            json.dumps(plugin.attributes) if self.flavor not in json_flavors else plugin.attributes
        ),
        'user_id' : plugin.user_id,
    }

    ### Insert a new row, or update the existing registration in-place.
    if old_id is None:
        query = sqlalchemy.insert(plugins_tbl).values(**bind_variables)
    else:
        query = (
            sqlalchemy.update(plugins_tbl)
            .values(**bind_variables)
            .where(plugins_tbl.c.plugin_id == old_id)
        )

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to register plugin '{plugin}'."
    return True, f"Successfully registered plugin '{plugin}'."
def register_user(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> SuccessTuple

Register a new user.

Expand source code
def register_user(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Register a new user.

    Parameters
    ----------
    user: meerschaum.core.User
        The user to register. The username must be valid and must not already exist.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    import json
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    from meerschaum.connectors.sql.tables import get_tables
    sqlalchemy = attempt_import('sqlalchemy')

    valid_tuple = valid_username(user.username)
    if not valid_tuple[0]:
        return valid_tuple

    ### NOTE: The duplicate existence check after building the bind variables
    ### was dead code (the function had already returned) and has been removed.
    old_id = self.get_user_id(user, debug=debug)
    if old_id is not None:
        return False, f"User '{user}' already exists."

    ### Ensure the users table exists.
    tables = get_tables(mrsm_instance=self, debug=debug)

    ### JSON-capable flavors store `attributes` natively; others get a serialized string.
    bind_variables = {
        'username' : user.username,
        'email' : user.email,
        'password_hash' : user.password_hash,
        'user_type' : user.type,
        'attributes' : (
            json.dumps(user.attributes) if self.flavor not in json_flavors else user.attributes
        ),
    }
    query = (
        sqlalchemy.insert(tables['users']).
        values(**bind_variables)
    )

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to register user '{user}'."
    return True, f"Successfully registered user '{user}'."
def sync_pipe(self, pipe: Pipe, df: Union[pandas.DataFrame, str, Dict[Any, Any], None] = None, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, chunksize: Optional[int] = -1, check_existing: bool = True, blocking: bool = True, debug: bool = False, **kw: Any) ‑> SuccessTuple

Sync a pipe using a database connection.

Parameters

pipe : Pipe
The Meerschaum Pipe instance into which to sync the data.
df : Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]
An optional DataFrame or equivalent to sync into the pipe. Defaults to None.
begin : Optional[datetime.datetime], default None
Optionally specify the earliest datetime to search for data. Defaults to None.
end : Optional[datetime.datetime], default None
Optionally specify the latest datetime to search for data. Defaults to None.
chunksize : Optional[int], default -1
Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction. Defaults to -1.
check_existing : bool, default True
If True, pull and diff with existing data from the pipe. Defaults to True.
blocking : bool, default True
If True, wait for sync to finish and return its result, otherwise asynchronously sync. Defaults to True.
debug : bool, default False
Verbosity toggle. Defaults to False.
kw : Any
Catch-all for keyword arguments.

Returns

A SuccessTuple of success (bool) and message (str).

Expand source code
def sync_pipe(
        self,
        pipe: meerschaum.Pipe,
        df: Union[pandas.DataFrame, str, Dict[Any, Any], None] = None,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        chunksize: Optional[int] = -1,
        check_existing: bool = True,
        blocking: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Sync a pipe using a database connection.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The Meerschaum Pipe instance into which to sync the data.

    df: Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]
        An optional DataFrame or equivalent to sync into the pipe.
        Defaults to `None`.

    begin: Optional[datetime.datetime], default None
        Optionally specify the earliest datetime to search for data.
        Defaults to `None`.

    end: Optional[datetime.datetime], default None
        Optionally specify the latest datetime to search for data.
        Defaults to `None`.

    chunksize: Optional[int], default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.
        Defaults to `-1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe. Defaults to `True`.

    blocking: bool, default True
        If `True`, wait for sync to finish and return its result, otherwise asynchronously sync.
        Defaults to `True`.

    debug: bool, default False
        Verbosity toggle. Defaults to False.

    kw: Any
        Catch-all for keyword arguments.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import import_pandas
    ### NOTE(review): `json_flavors` appears unused in this method.
    from meerschaum.utils.sql import get_update_queries, sql_item_name, json_flavors
    from meerschaum.utils.misc import generate_password, get_json_cols
    from meerschaum import Pipe
    import time
    import copy
    pd = import_pandas()
    ### A `None` payload cannot be synced; fail fast with a warning.
    if df is None:
        msg = f"DataFrame is None. Cannot sync {pipe}."
        warn(msg)
        return False, msg

    start = time.perf_counter()

    ### Register the pipe on first sync (temporary pipes are never registered).
    if not pipe.temporary and not pipe.get_id(debug=debug):
        register_tuple = pipe.register(debug=debug)
        if not register_tuple[0]:
            return register_tuple

    ### df is the dataframe returned from the remote source
    ### via the connector
    if debug:
        dprint("Fetched data:\n" + str(df))

    ### Coerce non-DataFrame payloads (dicts, JSON strings, etc.)
    ### into a DataFrame with the pipe's declared dtypes.
    if not isinstance(df, pd.DataFrame):
        df = pipe.enforce_dtypes(df, debug=debug)

    ### if table does not exist, create it with indices
    is_new = False
    add_cols_query = None
    if not pipe.exists(debug=debug):
        ### A brand-new table has nothing to diff against, so skip filtering.
        check_existing = False
        is_new = True
    else:
        ### Check for new columns.
        add_cols_queries = self.get_add_columns_queries(pipe, df, debug=debug)
        if add_cols_queries:
            if not self.exec_queries(add_cols_queries, debug=debug):
                warn(f"Failed to add new columns to {pipe}.")

        ### Alter existing columns whose types no longer match the incoming data.
        alter_cols_queries = self.get_alter_columns_queries(pipe, df, debug=debug)
        if alter_cols_queries:
            if not self.exec_queries(alter_cols_queries, debug=debug):
                warn(f"Failed to alter columns for {pipe}.")
            else:
                pipe.infer_dtypes(persist=True)

    ### Split the incoming rows into brand-new rows (`unseen_df`) and rows
    ### that modify existing records (`update_df`). Without `check_existing`,
    ### everything is treated as unseen.
    unseen_df, update_df, delta_df = (
        pipe.filter_existing(
            df,
            chunksize = chunksize,
            debug = debug,
            **kw
        ) if check_existing else (df, None, df)
    )
    if debug:
        dprint("Delta data:\n" + str(delta_df))
        dprint("Unseen data:\n" + str(unseen_df))
        if update_df is not None:
            dprint("Update data:\n" + str(update_df))

    ### Apply updates by staging `update_df` into a transaction-scoped temp
    ### table, then merging it into the target with flavor-specific queries.
    if update_df is not None and not update_df.empty:
        transact_id = generate_password(3)
        ### Temp table name: '_' + short random ID + '_' + target table name.
        temp_target = '_' + transact_id + '_' + pipe.target
        update_kw = copy.deepcopy(kw)
        update_kw.update({
            'name': temp_target,
            'if_exists': 'append',
            'chunksize': chunksize,
            'dtype': self.get_to_sql_dtype(pipe, update_df, update_dtypes=False),
            'debug': debug,
        })
        self.to_sql(update_df, **update_kw)
        ### Wrap the temp table in a temporary Pipe so it can be dropped cleanly.
        temp_pipe = Pipe(
            pipe.connector_keys + '_', pipe.metric_key, pipe.location_key,
            instance = pipe.instance_keys,
            columns = pipe.columns,
            target = temp_target,
            temporary = True,
        )

        ### Join on the pipe's index columns (everything but 'value')
        ### that actually exist on the target table.
        existing_cols = pipe.get_columns_types(debug=debug)
        join_cols = [
            col for col_key, col in pipe.columns.items()
            if col and col_key != 'value' and col in existing_cols
        ]

        queries = get_update_queries(
            pipe.target,
            temp_target,
            self,
            join_cols,
            debug = debug
        )
        success = all(self.exec_queries(queries, break_on_error=True, debug=debug))
        ### Drop the temp table regardless of merge success; warn if cleanup fails.
        drop_success, drop_msg = temp_pipe.drop(debug=debug)
        if not drop_success:
            warn(drop_msg)
        if not success:
            return False, f"Failed to apply update to {pipe}."

    ### Pop `if_exists` and `name` from kw so the unseen-data `to_sql` call
    ### below controls them without duplicate-keyword errors.
    if_exists = kw.get('if_exists', 'append')
    if 'if_exists' in kw:
        kw.pop('if_exists')
    if 'name' in kw:
        kw.pop('name')

    ### Account for first-time syncs of JSON columns.
    unseen_json_cols = get_json_cols(unseen_df)
    update_json_cols = get_json_cols(update_df) if update_df is not None else []
    json_cols = list(set(unseen_json_cols + update_json_cols))
    existing_json_cols = [col for col, typ in pipe.dtypes.items() if typ == 'json']
    new_json_cols = [col for col in json_cols if col not in existing_json_cols]
    if new_json_cols:
        ### Persist the newly detected JSON dtypes to the pipe's parameters.
        pipe.dtypes.update({col: 'json' for col in json_cols})
        edit_success, edit_msg = pipe.edit(interactive=False, debug=debug)
        if not edit_success:
            warn(f"Unable to update JSON dtypes for {pipe}:\n{edit_msg}")

    ### Insert new data into Pipe's table.
    unseen_kw = copy.deepcopy(kw)
    unseen_kw.update({
        'name': pipe.target,
        'if_exists': if_exists,
        'debug': debug,
        'as_dict': True,
        'chunksize': chunksize,
        'dtype': self.get_to_sql_dtype(pipe, unseen_df, update_dtypes=True),
    })

    ### `as_dict=True` makes `to_sql` return a stats dictionary
    ### (read from below as `stats['success']` and `stats['msg']`).
    stats = self.to_sql(unseen_df, **unseen_kw)
    if is_new:
        if not self.create_indices(pipe, debug=debug):
            if debug:
                dprint(f"Failed to create indices for {pipe}. Continuing...")

    ### NOTE: this rebinding shadows the `end` datetime parameter,
    ### which is no longer needed at this point.
    end = time.perf_counter()
    success = stats['success']
    if not success:
        return success, stats['msg']
    msg = (
        f"Inserted {len(unseen_df)}, "
        + f"updated {len(update_df) if update_df is not None else 0} rows."
    )
    if debug:
        msg = msg[:-1] + (
            f"\non table {sql_item_name(pipe.target, self.flavor)}\n"
            + f"in {round(end-start, 2)} seconds."
        )
    return success, msg
def sync_pipe_inplace(self, pipe: "'Pipe'", params: Optional[Dict[str, Any]] = None, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, chunksize: Optional[int] = -1, check_existing: bool = True, debug: bool = False, **kw: Any) ‑> SuccessTuple

If a pipe's connector is the same as its instance connector, it's more efficient to sync the pipe in-place rather than reading data into Pandas.

Parameters

pipe : Pipe
The pipe whose connector is the same as its instance.
params : Optional[Dict[str, Any]], default None
Optional params dictionary to build the WHERE clause. See build_where().
begin : Optional[datetime.datetime], default None
Optionally specify the earliest datetime to search for data. Defaults to None.
end : Optional[datetime.datetime], default None
Optionally specify the latest datetime to search for data. Defaults to None.
chunksize : Optional[int], default -1
Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction. Defaults to -1.
check_existing : bool, default True
If True, pull and diff with existing data from the pipe. Defaults to True.
debug : bool, default False
Verbosity toggle.

Returns

A SuccessTuple.

Expand source code
def sync_pipe_inplace(
        self,
        pipe: 'meerschaum.Pipe',
        params: Optional[Dict[str, Any]] = None,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        chunksize: Optional[int] = -1,
        check_existing: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    If a pipe's connector is the same as its instance connector,
    it's more efficient to sync the pipe in-place rather than reading data into Pandas.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose connector is the same as its instance.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    begin: Optional[datetime.datetime], default None
        Optionally specify the earliest datetime to search for data.
        Defaults to `None`.

    end: Optional[datetime.datetime], default None
        Optionally specify the latest datetime to search for data.
        Defaults to `None`.

    chunksize: Optional[int], default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.
        Defaults to `-1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe. Defaults to `True`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A SuccessTuple.
    """
    from meerschaum.utils.sql import (
        sql_item_name, table_exists, get_sqlalchemy_table, get_pd_type,
        get_update_queries, get_null_replacement,
    )
    from meerschaum.utils.misc import generate_password
    from meerschaum.utils.debug import dprint
    metadef = self.get_pipe_metadef(
        pipe,
        params = params,
        begin = begin,
        end = end,
        debug = debug,
    )
    if self.flavor in ('mssql',):
        final_select_ix = metadef.lower().rfind('select')
        def_name = metadef[len('WITH '):].split(' ', maxsplit=1)[0]
        metadef = (
            metadef[:final_select_ix].rstrip() + ',\n'
            + "metadef AS (\n"
            + metadef[final_select_ix:]
            + "\n)\n"
        )

    pipe_name = sql_item_name(pipe.target, self.flavor)
    if not pipe.exists(debug=debug):
        if self.flavor in ('mssql',):
            create_pipe_query = metadef + f"SELECT *\nINTO {pipe_name}\nFROM metadef"
        elif self.flavor in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb'):
            create_pipe_query = (
                f"CREATE TABLE {pipe_name} AS\n"
                + f"SELECT *\nFROM ({metadef})"
                + (" AS metadef" if self.flavor in ('mysql', 'mariadb') else '')
            )
        else:
            create_pipe_query = f"SELECT *\nINTO {pipe_name}\nFROM ({metadef}) AS metadef"
        result = self.exec(create_pipe_query, debug=debug)
        if result is None:
            return False, f"Could not insert new data into {pipe} from its SQL query definition."
        if not self.create_indices(pipe, debug=debug):
            if debug:
                dprint(f"Failed to create indices for {pipe}. Continuing...")

        rowcount = pipe.get_rowcount(debug=debug)
        return True, f"Inserted {rowcount}, updated 0 rows."

    ### Generate names for the tables.
    transact_id = generate_password(3)
    def get_temp_table_name(label: str) -> str:
        return '_' + transact_id + '_' + label + '_' + pipe.target

    backtrack_table_raw = get_temp_table_name('backtrack')
    backtrack_table_name = sql_item_name(backtrack_table_raw, self.flavor)
    new_table_raw = get_temp_table_name('new')
    new_table_name = sql_item_name(new_table_raw, self.flavor)
    delta_table_raw = get_temp_table_name('delta')
    delta_table_name = sql_item_name(delta_table_raw, self.flavor)
    joined_table_raw = get_temp_table_name('joined')
    joined_table_name = sql_item_name(joined_table_raw, self.flavor)
    unseen_table_raw = get_temp_table_name('unseen')
    unseen_table_name = sql_item_name(unseen_table_raw, self.flavor)
    update_table_raw = get_temp_table_name('update')
    update_table_name = sql_item_name(update_table_raw, self.flavor)

    new_queries = []
    drop_new_query = f"DROP TABLE {new_table_name}"
    if table_exists(new_table_raw, self, debug=debug):
        new_queries.append(drop_new_query)

    if self.flavor in ('mssql',):
        create_new_query = metadef + f"SELECT *\nINTO {new_table_name}\nFROM metadef"
    elif self.flavor in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb'):
        create_new_query = (
            f"CREATE TABLE {new_table_name} AS\n"
            + f"SELECT *\nFROM ({metadef})"
            + (" AS metadef" if self.flavor in ('mysql', 'mariadb') else '')
        )
    else:
        create_new_query = f"SELECT *\nINTO {new_table_name}\nFROM ({metadef}) AS metadef"

    new_queries.append(create_new_query)

    new_success = all(self.exec_queries(new_queries, break_on_error=True, debug=debug))
    if not new_success:
        self.exec_queries([drop_new_query], break_on_error=False, debug=debug)
        return False, f"Could not fetch new data for {pipe}."

    new_table_obj = get_sqlalchemy_table(
        new_table_raw,
        connector = self,
        refresh = True,
        debug = debug,
    )
    new_cols = {str(col.name): get_pd_type(str(col.type)) for col in new_table_obj.columns}

    add_cols_queries = self.get_add_columns_queries(pipe, new_cols, debug=debug)
    if add_cols_queries:
        if not self.exec_queries(add_cols_queries, debug=debug):
            warn(f"Failed to add new columns to {pipe}.")

    alter_cols_queries = self.get_alter_columns_queries(pipe, new_cols, debug=debug)
    if alter_cols_queries:
        if not self.exec_queries(alter_cols_queries, debug=debug):
            warn(f"Failed to alter columns for {pipe}.")
        else:
            pipe.infer_dtypes(persist=True)

    if not check_existing:
        new_count = self.value(f"SELECT COUNT(*) FROM {new_table_name}", debug=debug)
        insert_queries = [
            (
                f"INSERT INTO {pipe_name}\n"
                + f"SELECT *\nFROM {new_table_name}"
            ),
            f"DROP TABLE {new_table_name}"
        ]
        if not self.exec_queries(insert_queries, debug=debug, break_on_error=False):
            return False, f"Failed to insert into rows into {pipe}."
        return True, f"Inserted {new_count}, updated 0 rows."


    backtrack_queries = []
    drop_backtrack_query = f"DROP TABLE {backtrack_table_name}"
    if table_exists(backtrack_table_raw, self, debug=debug):
        backtrack_queries.append(drop_backtrack_query)
    btm = max(self.get_pipe_backtrack_minutes(pipe), 1)
    backtrack_def = self.get_pipe_data_query(
        pipe,
        begin = begin,
        end = end,
        begin_add_minutes = (-1 * btm),
        end_add_minutes = 1,
        params = params,
        debug = debug,
        order = None,
    )

    create_backtrack_query = (
        (
            f"WITH backtrack_def AS (\n{backtrack_def}\n)\n"
            + f"SELECT *\nINTO {backtrack_table_name}\nFROM backtrack_def"
        ) if self.flavor not in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb')
        else (
            f"CREATE TABLE {backtrack_table_name} AS\n"
            + f"SELECT *\nFROM ({backtrack_def})"
            + (" AS backtrack" if self.flavor in ('mysql', 'mariadb') else '')
        )
    )
    backtrack_queries.append(create_backtrack_query)
    backtrack_success = all(self.exec_queries(backtrack_queries, break_on_error=True, debug=debug))
    if not backtrack_success:
        self.exec_queries([drop_new_query, drop_backtrack_query], break_on_error=False, debug=debug)
        return False, f"Could not fetch backtrack data from {pipe}."


    ### Determine which index columns are present in both tables.
    backtrack_table_obj = get_sqlalchemy_table(
        backtrack_table_raw,
        connector = self,
        refresh = True,
        debug = debug,
    )
    backtrack_cols = {str(col.name): str(col.type) for col in backtrack_table_obj.columns}
    common_cols = [col for col in new_cols if col in backtrack_cols]
    on_cols = {
        col: new_cols.get(col, 'object')
        for col_key, col in pipe.columns.items()
        if (
            col
            and
            col_key != 'value'
            and col in backtrack_cols
            and col in new_cols
        )
    }

    delta_queries = []
    drop_delta_query = f"DROP TABLE {delta_table_name}"
    if table_exists(delta_table_raw, self, debug=debug):
        delta_queries.append(drop_delta_query)

    create_delta_query = (
        (
            f"SELECT\n"
            + (
                ', '.join([
                    f"COALESCE(new.{sql_item_name(col, self.flavor)}, "
                    + f"{get_null_replacement(typ, self.flavor)}) AS "
                    + sql_item_name(col, self.flavor)
                    for col, typ in new_cols.items()
                ])
            )
            + "\n"
            + f"INTO {delta_table_name}\n"
            + f"FROM {new_table_name} AS new\n"
            + f"LEFT OUTER JOIN {backtrack_table_name} AS old\nON\n"
            + '\nAND\n'.join([
                (
                    'COALESCE(new.' + sql_item_name(c, self.flavor) + ", "
                    + get_null_replacement(new_cols[c], self.flavor) + ") "
                    + ' = '
                    + 'COALESCE(old.' + sql_item_name(c, self.flavor) + ", "
                    + get_null_replacement(backtrack_cols[c], self.flavor) + ") "
                ) for c in common_cols
            ])
            + "\nWHERE\n"
            + '\nAND\n'.join([
                (
                    'old.' + sql_item_name(c, self.flavor) + ' IS NULL'
                ) for c in common_cols
            ])
            #  + "\nAND\n"
            #  + '\nAND\n'.join([
                #  (
                    #  'new.' + sql_item_name(c, self.flavor) + ' IS NOT NULL'
                #  ) for c in new_cols
            #  ])
        ) if self.flavor not in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb')
        else (
            f"CREATE TABLE {delta_table_name} AS\n"
            + f"SELECT\n"
            + (
                ', '.join([
                    f"COALESCE(new.{sql_item_name(col, self.flavor)}, "
                    + f"{get_null_replacement(typ, self.flavor)}) AS "
                    + sql_item_name(col, self.flavor)
                    for col, typ in new_cols.items()
                ])
            )
            + "\n"
            + f"FROM {new_table_name} new\n"
            + f"LEFT OUTER JOIN {backtrack_table_name} old\nON\n"
            + '\nAND\n'.join([
                (
                    'COALESCE(new.' + sql_item_name(c, self.flavor) + ", "
                    + get_null_replacement(new_cols[c], self.flavor) + ") "
                    + ' = '
                    + 'COALESCE(old.' + sql_item_name(c, self.flavor) + ", "
                    + get_null_replacement(backtrack_cols[c], self.flavor) + ") "
                ) for c in common_cols
            ])
            + "\nWHERE\n"
            + '\nAND\n'.join([
                (
                    'old.' + sql_item_name(c, self.flavor) + ' IS NULL'
                ) for c in common_cols
            ])
            #  + "\nAND\n"
            #  + '\nAND\n'.join([
                #  (
                    #  'new.' + sql_item_name(c, self.flavor) + ' IS NOT NULL'
                #  ) for c in new_cols
            #  ])
        )
    )

    delta_queries.append(create_delta_query)

    delta_success = all(self.exec_queries(delta_queries, break_on_error=True, debug=debug))
    if not delta_success:
        self.exec_queries(
            [
                drop_new_query,
                drop_backtrack_query,
                drop_delta_query,
            ],
            break_on_error = False,
            debug = debug,
        )
        return False, f"Could not filter data for {pipe}."

    delta_table_obj = get_sqlalchemy_table(
        delta_table_raw,
        connector = self,
        refresh = True,
        debug = debug,
    )
    delta_cols = {str(col.name): get_pd_type(str(col.type)) for col in delta_table_obj.columns}

    joined_queries = []
    drop_joined_query = f"DROP TABLE {joined_table_name}"
    if on_cols and table_exists(joined_table_raw, self, debug=debug):
        joined_queries.append(drop_joined_query)

    create_joined_query = (
        (
            "SELECT "
            + (', '.join([
                (
                    'delta.' + sql_item_name(c, self.flavor)
                    + " AS " + sql_item_name(c + '_delta', self.flavor)
                ) for c in delta_cols
            ]))
            + ", "
            + (', '.join([
                (
                    'bt.' + sql_item_name(c, self.flavor)
                    + " AS " + sql_item_name(c + '_backtrack', self.flavor)
                ) for c in backtrack_cols
            ]))
            + f"\nINTO {joined_table_name}\n"
            + f"FROM {delta_table_name} AS delta\n"
            + f"LEFT OUTER JOIN {backtrack_table_name} AS bt\nON\n"
            + '\nAND\n'.join([
                (
                    'COALESCE(delta.' + sql_item_name(c, self.flavor)
                    + ", " + get_null_replacement(typ, self.flavor) + ")"
                    + ' = '
                    + 'COALESCE(bt.' + sql_item_name(c, self.flavor)
                    + ", " + get_null_replacement(typ, self.flavor) + ")"
                ) for c, typ in on_cols.items()
            ])
        ) if self.flavor not in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb')
        else (
            f"CREATE TABLE {joined_table_name} AS\n"
            + "SELECT "
            + (', '.join([
                (
                    'delta.' + sql_item_name(c, self.flavor)
                    + " AS " + sql_item_name(c + '_delta', self.flavor)
                ) for c in delta_cols
            ]))
            + ", "
            + (', '.join([
                (
                    'bt.' + sql_item_name(c, self.flavor)
                    + " AS " + sql_item_name(c + '_backtrack', self.flavor)
                ) for c in backtrack_cols
            ]))
            + f"\nFROM {delta_table_name} delta\n"
            + f"LEFT OUTER JOIN {backtrack_table_name} bt\nON\n"
            + '\nAND\n'.join([
                (
                    'COALESCE(delta.' + sql_item_name(c, self.flavor)
                    + ", " + get_null_replacement(typ, self.flavor) + ")"
                    + ' = '
                    + 'COALESCE(bt.' + sql_item_name(c, self.flavor)
                    + ", " + get_null_replacement(typ, self.flavor) + ")"
                ) for c, typ in on_cols.items()
            ])
        )
    )

    joined_queries.append(create_joined_query)

    joined_success = (
        all(self.exec_queries(joined_queries, break_on_error=True, debug=debug))
        if on_cols else True
    )
    if not joined_success:
        self.exec_queries(
            [
                drop_new_query,
                drop_backtrack_query,
                drop_delta_query,
                drop_joined_query,
            ],
            break_on_error = False,
            debug = debug,
        )
        return False, f"Could not separate new and updated data for {pipe}."

    unseen_queries = []
    drop_unseen_query = f"DROP TABLE {unseen_table_name}"
    if on_cols and table_exists(unseen_table_raw, self, debug=debug):
        unseen_queries.append(drop_unseen_query)

    create_unseen_query = (
        (
            "SELECT "
            + (', '.join([
                (
                    "CASE\n    WHEN " + sql_item_name(c + '_delta', self.flavor)
                    + " != " + get_null_replacement(typ, self.flavor) 
                    + " THEN " + sql_item_name(c + '_delta', self.flavor)
                    + "\n    ELSE NULL\nEND "
                    + " AS " + sql_item_name(c, self.flavor)
                ) for c, typ in delta_cols.items()
            ]))
            + f"\nINTO {unseen_table_name}\n"
            + f"\nFROM {joined_table_name} AS joined\n"
            + f"WHERE "
            + '\nAND\n'.join([
                (
                    sql_item_name(c + '_backtrack', self.flavor) + ' IS NULL'
                ) for c in delta_cols
            ])
            #  + "\nAND\n"
            #  + '\nAND\n'.join([
                #  (
                    #  sql_item_name(c + '_delta', self.flavor) + ' IS NOT NULL'
                #  ) for c in delta_cols
            #  ])
        ) if self.flavor not in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb') else (
            f"CREATE TABLE {unseen_table_name} AS\n"
            + "SELECT "
            + (', '.join([
                (
                    "CASE\n    WHEN " + sql_item_name(c + '_delta', self.flavor)
                    + " != "
                    + get_null_replacement(typ, self.flavor)
                    + " THEN " + sql_item_name(c + '_delta', self.flavor)
                    + "\n    ELSE NULL\nEND "
                    + " AS " + sql_item_name(c, self.flavor)
                ) for c, typ in delta_cols.items()
            ]))
            + f"\nFROM {joined_table_name} joined\n"
            + f"WHERE "
            + '\nAND\n'.join([
                (
                    sql_item_name(c + '_backtrack', self.flavor) + ' IS NULL'
                ) for c in delta_cols
            ])
            #  + "\nAND\n"
            #  + '\nAND\n'.join([
                #  (
                    #  sql_item_name(c + '_delta', self.flavor) + ' IS NOT NULL'
                #  ) for c in delta_cols
            #  ])
        )
    )

    unseen_queries.append(create_unseen_query)

    unseen_success = (
        all(self.exec_queries(unseen_queries, break_on_error=True, debug=debug))
        if on_cols else True
    )
    if not unseen_success:
        self.exec_queries(
            [
                drop_new_query,
                drop_backtrack_query,
                drop_delta_query,
                drop_joined_query,
                drop_unseen_query,
            ],
            break_on_error = False,
            debug = debug,
        )
        return False, f"Could not determine new data for {pipe}."
    unseen_count = self.value(
        (
            "SELECT COUNT(*) FROM "
            + (unseen_table_name if on_cols else delta_table_name)
        ), debug = debug,
    )

    update_queries = []
    drop_update_query = f"DROP TABLE {update_table_name}"
    if on_cols and table_exists(update_table_raw, self, debug=debug):
        update_queries.append(drop_unseen_query)

    create_update_query = (
        (
            "SELECT "
            + (', '.join([
                (
                    "CASE\n    WHEN " + sql_item_name(c + '_delta', self.flavor)
                    + " != " + get_null_replacement(typ, self.flavor)
                    + " THEN " + sql_item_name(c + '_delta', self.flavor)
                    + "\n    ELSE NULL\nEND "
                    + " AS " + sql_item_name(c, self.flavor)
                ) for c, typ in delta_cols.items()
            ]))
            + f"\nINTO {update_table_name}"
            + f"\nFROM {joined_table_name} AS joined\n"
            + f"WHERE "
            + '\nOR\n'.join([
                (
                    sql_item_name(c + '_backtrack', self.flavor) + ' IS NOT NULL'
                ) for c in delta_cols
            ])
        ) if self.flavor not in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb') else (
            f"CREATE TABLE {update_table_name} AS\n"
            + "SELECT "
            + (', '.join([
                (
                    "CASE\n    WHEN " + sql_item_name(c + '_delta', self.flavor)
                    + " != "
                    + get_null_replacement(typ, self.flavor)
                    + " THEN " + sql_item_name(c + '_delta', self.flavor)
                    + "\n    ELSE NULL\nEND "
                    + " AS " + sql_item_name(c, self.flavor)
                ) for c, typ in delta_cols.items()
            ]))
            + f"\nFROM {joined_table_name} joined\n"
            + f"WHERE "
            + '\nOR\n'.join([
                (
                    sql_item_name(c + '_backtrack', self.flavor) + ' IS NOT NULL'
                ) for c in delta_cols
            ])
        )
    )

    update_queries.append(create_update_query)

    update_success = (
        all(self.exec_queries(update_queries, break_on_error=True, debug=debug))
        if on_cols else True
    )
    if not update_success:
        self.exec_queries(
            [
                drop_new_query,
                drop_backtrack_query,
                drop_delta_query,
                drop_joined_query,
                drop_unseen_query,
                drop_update_query,
            ],
            break_on_error = False,
            debug = debug,
        )
        return False, "Could not determine updated data for {pipe}."
    update_count = (
        self.value(f"SELECT COUNT(*) FROM {update_table_name}", debug=debug)
        if on_cols else 0
    )

    apply_update_queries = (
        get_update_queries(
            pipe.target,
            update_table_raw,
            self,
            on_cols,
            debug = debug
        )
        if on_cols else []
    )

    apply_unseen_queries = [
        (
            f"INSERT INTO {pipe_name}\n"
            + f"SELECT *\nFROM " + (unseen_table_name if on_cols else delta_table_name)
        ),
    ]

    apply_queries = (
        (apply_unseen_queries if unseen_count > 0 else [])
        + (apply_update_queries if update_count > 0 else [])
        + [
            drop_new_query,
            drop_backtrack_query,
            drop_delta_query,
        ] + (
            [
                drop_joined_query,
                drop_unseen_query,
                drop_update_query,
            ] if on_cols else []
        )
    )
    success = all(self.exec_queries(apply_queries, break_on_error=False, debug=debug))
    msg = (
        f"Was not able to apply changes to {pipe}."
        if not success else f"Inserted {unseen_count}, updated {update_count} rows."
    )
    return success, msg
def test_connection(self, **kw: Any) ‑> Optional[bool]

Test if a successful connection to the database may be made.

Parameters

**kw: The keyword arguments are passed to retry_connect().

Returns

True if a connection is made, otherwise False or None in case of failure.

Expand source code
def test_connection(
        self,
        **kw: Any
    ) -> Union[bool, None]:
    """
    Test if a successful connection to the database may be made.

    Parameters
    ----------
    **kw:
        The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Returns
    -------
    `True` if a connection is made, otherwise `False` or `None` in case of failure.

    """
    import warnings
    from meerschaum.connectors.poll import retry_connect
    ### Try exactly once with no wait, and keep `retry_connect` itself quiet;
    ### the caller only wants a boolean answer, not warnings.
    _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self}
    _default_kw.update(kw)
    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', 'Could not')
        try:
            return retry_connect(**_default_kw)
        except Exception:
            ### Connection failures are expected here; report `False` rather than raise.
            return False
def to_sql(self, df: pandas.DataFrame, name: str = None, index: bool = False, if_exists: str = 'replace', method: str = '', chunksize: Optional[int] = -1, silent: bool = False, debug: bool = False, as_tuple: bool = False, as_dict: bool = False, **kw) ‑> Union[bool, SuccessTuple]

Upload a DataFrame's contents to the SQL server.

Parameters

df : pd.DataFrame
The DataFrame to be uploaded.
name : str
The name of the table to be created.
index : bool, default False
If True, creates the DataFrame's indices as columns.
if_exists : str, default 'replace'
Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail'].
method : str, default ''
`None` or `'multi'`; passed through to `pandas.DataFrame.to_sql()`.
as_tuple : bool, default False
If True, return a (success_bool, message) tuple instead of a bool. Defaults to False.
as_dict : bool, default False
If True, return a dictionary of transaction information. The keys are success, msg, start, end, duration, num_rows, chunksize, method, and target.
kw : Any
Additional arguments will be passed to the DataFrame's to_sql function

Returns

Either a bool or a SuccessTuple (depends on as_tuple).

Expand source code
def to_sql(
        self,
        df: pandas.DataFrame,
        name: Optional[str] = None,
        index: bool = False,
        if_exists: str = 'replace',
        method: str = "",
        chunksize: Optional[int] = -1,
        silent: bool = False,
        debug: bool = False,
        as_tuple: bool = False,
        as_dict: bool = False,
        **kw
    ) -> Union[bool, SuccessTuple]:
    """
    Upload a DataFrame's contents to the SQL server.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be uploaded.

    name: Optional[str], default None
        The name of the table to be created.
        Although the default is `None`, a name is required
        (an `error` is raised if it is omitted).

    index: bool, default False
        If True, creates the DataFrame's indices as columns.

    if_exists: str, default 'replace'
        Drop and create the table ('replace') or append if it exists
        ('append') or raise Exception ('fail').
        Options are ['replace', 'append', 'fail'].

    method: str, default ''
        None or multi. Details on pandas.to_sql.

    chunksize: Optional[int], default -1
        How many rows to insert per transaction.
        The default of `-1` is a sentinel meaning "use the configured default chunksize".

    silent: bool, default False
        If `True`, do not print a warning when the insert fails.

    debug: bool, default False
        Verbosity toggle.

    as_tuple: bool, default False
        If `True`, return a (success_bool, message) tuple instead of a `bool`.
        Defaults to `False`.

    as_dict: bool, default False
        If `True`, return a dictionary of transaction information.
        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
        `method`, and `target`.

    kw: Any
        Additional arguments will be passed to the DataFrame's `to_sql` function

    Returns
    -------
    Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
    """
    import time
    import json
    from meerschaum.utils.warnings import error, warn
    import warnings
    if name is None:
        error(f"Name must not be `None` to insert data into {self}.")

    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
    kw.pop('name', None)

    from meerschaum.utils.sql import sql_item_name, table_exists, json_flavors, truncate_item_name
    from meerschaum.utils.misc import get_json_cols, is_bcp_available
    from meerschaum.connectors.sql._create_engine import flavor_configs
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', debug=debug)

    stats = {'target': name, }
    ### resort to defaults if None
    if method == "":
        if self.flavor in _bulk_flavors:
            ### PostgreSQL-style flavors support the fast COPY-based bulk insert.
            method = psql_insert_copy
        else:
            ### Should resolve to 'multi' or `None`.
            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')
    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)

    ### `-1` is a sentinel for "use the configured default chunksize".
    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            ### Only warn when the user explicitly chose the oversized chunksize.
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel = 3,
                )
            chunksize = _max_chunks_flavors[self.flavor]
    stats['chunksize'] = chunksize

    success, msg = False, "Default to_sql message"
    start = time.perf_counter()
    if debug:
        msg = f"Inserting {len(df)} rows with chunksize: {chunksize}..."
        print(msg, end="", flush=True)
    stats['num_rows'] = len(df)

    ### filter out non-pandas args
    import inspect
    to_sql_params = inspect.signature(df.to_sql).parameters
    to_sql_kw = {}
    for k, v in kw.items():
        if k in to_sql_params:
            to_sql_kw[k] = v

    if self.flavor == 'oracle':
        ### For some reason 'replace' doesn't work properly in pandas,
        ### so try dropping first.
        if if_exists == 'replace' and table_exists(name, self, debug=debug):
            success = self.exec("DROP TABLE " + sql_item_name(name, 'oracle')) is not None
            if not success:
                warn(f"Unable to drop {name}")


        ### Enforce NVARCHAR(2000) as text instead of CLOB.
        dtype = to_sql_kw.get('dtype', {})
        for col, typ in df.dtypes.items():
            if str(typ) == 'object':
                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
            elif str(typ).lower().startswith('int'):
                dtype[col] = sqlalchemy.types.INTEGER

        to_sql_kw['dtype'] = dtype

    ### Check for JSON columns.
    if self.flavor not in json_flavors:
        json_cols = get_json_cols(df)
        if json_cols:
            for col in json_cols:
                ### Serialize unhashable values (lists, dicts) into JSON strings.
                df[col] = df[col].apply(
                    (
                        lambda x: json.dumps(x, default=str, sort_keys=True)
                        if not isinstance(x, Hashable)
                        else x
                    )
                )

    ### Check if the name is too long.
    truncated_name = truncate_item_name(name, self.flavor)
    if name != truncated_name:
        warn(
            f"Table '{name}' is too long for '{self.flavor}',"
            + f" will instead create the table '{truncated_name}'."
        )

    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            df.to_sql(
                name = truncated_name,
                con = self.engine,
                index = index,
                if_exists = if_exists,
                method = method,
                chunksize = chunksize,
                **to_sql_kw
            )
        success = True
    except Exception as e:
        if not silent:
            warn(str(e))
        success, msg = False, str(e)

    end = time.perf_counter()
    if success:
        msg = f"It took {round(end - start, 2)} seconds to sync {len(df)} rows to {name}."
    stats['start'] = start
    stats['end'] = end
    stats['duration'] = end - start

    if debug:
        print(" done.", flush=True)
        dprint(msg)

    stats['success'] = success
    stats['msg'] = msg
    if as_tuple:
        return success, msg
    if as_dict:
        return stats
    return success
def value(self, query: str, *args: Any, use_pandas: bool = False, **kw: Any) ‑> Any

Execute the provided query and return the first value.

Parameters

query : str
The SQL query to execute.
*args : Any
The arguments passed to meerschaum.connectors.sql.SQLConnector.exec if use_pandas is False (default) or to meerschaum.connectors.sql.SQLConnector.read.
use_pandas : bool, default False
If True, use read(), otherwise use meerschaum.connectors.sql.SQLConnector.exec (default). NOTE: This is always True for DuckDB.
**kw : Any
Additional keyword arguments, forwarded the same way as *args.

Returns

Any value returned from the query.

Expand source code
def value(
        self,
        query: str,
        *args: Any,
        use_pandas: bool = False,
        **kw: Any
    ) -> Any:
    """
    Execute the provided query and return the first value.

    Parameters
    ----------
    query: str
        The SQL query to execute.
        
    *args: Any
        The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
        if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.
        
    use_pandas: bool, default False
        If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use
        `meerschaum.connectors.sql.SQLConnector.exec` (default).
        **NOTE:** This is always `True` for DuckDB.

    **kw: Any
        See `args`.

    Returns
    -------
    Any value returned from the query.

    """
    from meerschaum.utils.packages import attempt_import
    ### Called for its side effect: ensures `sqlalchemy` is installed and importable.
    attempt_import('sqlalchemy')
    if self.flavor == 'duckdb':
        ### DuckDB does not support the cursor-based path below.
        use_pandas = True
    if use_pandas:
        try:
            return self.read(query, *args, **kw).iloc[0, 0]
        except Exception:
            ### An empty result set (or a failed query) yields `None`.
            return None

    _close = kw.get('close', True)
    ### MSSQL does not allow committing around these reads.
    _commit = kw.get('commit', (self.flavor != 'mssql'))
    try:
        ### Keep the connection open so the result may be consumed before closing.
        result, connection = self.exec(
            query,
            *args,
            with_connection=True,
            close=False,
            commit=_commit,
            **kw
        )
        first = result.first() if result is not None else None
        _val = first[0] if first is not None else None
    except Exception as e:
        warn(e, stacklevel=3)
        return None
    if _close:
        try:
            connection.close()
        except Exception as e:
            warn("Failed to close connection with exception:\n" + str(e))
    return _val