Module meerschaum.connectors.sql.SQLConnector

Interface with SQL servers using sqlalchemy.

Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Interface with SQL servers using sqlalchemy.
"""

from __future__ import annotations
from meerschaum.utils.typing import Optional, Any

from meerschaum.connectors import Connector
from meerschaum.utils.warnings import error

class SQLConnector(Connector):
    """
    Connect to SQL databases via `sqlalchemy`.
    
    SQLConnectors may be used as Meerschaum instance connectors.
    Read more about connectors and instances at
    https://meerschaum.io/reference/connectors/

    """

    IS_INSTANCE: bool = True

    from ._create_engine import flavor_configs, create_engine
    from ._sql import read, value, exec, execute, to_sql, exec_queries
    from meerschaum.utils.sql import test_connection
    from ._fetch import fetch, get_pipe_metadef
    from ._cli import cli
    from ._pipes import (
        fetch_pipes_keys,
        create_indices,
        drop_indices,
        get_create_index_queries,
        get_drop_index_queries,
        get_add_columns_queries,
        get_alter_columns_queries,
        delete_pipe,
        get_backtrack_data,
        get_pipe_data,
        get_pipe_data_query,
        register_pipe,
        edit_pipe,
        get_pipe_id,
        get_pipe_attributes,
        sync_pipe,
        sync_pipe_inplace,
        get_sync_time,
        pipe_exists,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        get_pipe_table,
        get_pipe_columns_types,
    )
    from ._plugins import (
        register_plugin,
        delete_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugins,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
    )
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri, parse_uri
    
    def __init__(
        self,
        label: Optional[str] = None,
        flavor: Optional[str] = None,
        wait: bool = False,
        connect: bool = False,
        debug: bool = False,
        **kw: Any
    ):
        """
        Parameters
        ----------
        label: str, default 'main'
            The identifying label for the connector.
            E.g. for `sql:main`, 'main' is the label.
            Defaults to 'main'.

        flavor: Optional[str], default None
            The database flavor, e.g.
            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
            To see supported flavors, run the `bootstrap connectors` command.

        wait: bool, default False
            If `True`, block until a database connection has been made.
            Defaults to `False`.

        connect: bool, default False
            If `True`, immediately attempt to connect the database and raise
            a warning if the connection fails.
            Defaults to `False`.

        debug: bool, default False
            Verbosity toggle.
            Defaults to `False`.

        kw: Any
            All other arguments will be passed to the connector's attributes.
            Therefore, a connector may be made without being registered,
            as long enough parameters are supplied to the constructor.
        """
        if 'uri' in kw:
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            from_uri_params.pop('label', None)
            kw.update(from_uri_params)

        ### set __dict__ in base class
        super().__init__(
            'sql',
            label = label or self.__dict__.get('label', None),
            **kw
        )

        if self.__dict__.get('flavor', None) == 'sqlite':
            self._reset_attributes()
            self._set_attributes(
                'sql',
                label = label,
                inherit_default = False,
                **kw
            )
            ### For backwards compatability reasons, set the path for sql:local if its missing.
            if self.label == 'local' and not self.__dict__.get('database', None):
                from meerschaum.config._paths import SQLITE_DB_PATH
                self.database = str(SQLITE_DB_PATH)

        ### ensure flavor and label are set accordingly
        if 'flavor' not in self.__dict__:
            if flavor is None and 'uri' not in self.__dict__:
                raise Exception(
                    f"    Missing flavor. Provide flavor as a key for '{self}'."
                )
            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

        self._debug = debug
        ### Store the PID and thread at initialization
        ### so we can dispose of the Pool in child processes or threads.
        import os, threading
        self._pid = os.getpid()
        self._thread_ident = threading.current_thread().ident
        self._sessions = {}
        self._locks = {'_sessions': threading.RLock(), }

        ### verify the flavor's requirements are met
        if self.flavor not in self.flavor_configs:
            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
        if not self.__dict__.get('uri'):
            self.verify_attributes(
                self.flavor_configs[self.flavor].get('requirements', set()),
                debug=debug,
            )

        self.wait = wait
        if self.wait:
            from meerschaum.connectors.poll import retry_connect
            retry_connect(connector=self, debug=debug)

        if connect:
            if not self.test_connection(debug=debug):
                from meerschaum.utils.warnings import warn
                warn(f"Failed to connect with connector '{self}'!", stack=False)

    @property
    def Session(self):
        if '_Session' not in self.__dict__:
            if self.engine is None:
                return None

            from meerschaum.utils.packages import attempt_import
            sqlalchemy_orm = attempt_import('sqlalchemy.orm')
            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
            self._Session = sqlalchemy_orm.scoped_session(session_factory)

        return self._Session

    @property
    def engine(self):
        import os, threading
        ### build the sqlalchemy engine
        if '_engine' not in self.__dict__:
            self._engine = self.create_engine()

        same_process = os.getpid() == self._pid
        same_thread = threading.current_thread().ident == self._thread_ident

        ### handle child processes
        if not same_process:
            self._pid = os.getpid()
            self._thread = threading.current_thread()
            from meerschaum.utils.warnings import warn
            warn(f"Different PID detected. Disposing of connections...")
            self._engine.dispose()

        ### handle different threads
        if not same_thread:
            #  print('different thread!')
            pass

        return self._engine

    @property
    def DATABASE_URL(self):
        return str(self.engine.url)

    @property
    def URI(self):
        return str(self.engine.url)

    @property
    def metadata(self):
        from meerschaum.utils.packages import attempt_import
        sqlalchemy = attempt_import('sqlalchemy')
        if '_metadata' not in self.__dict__:
            self._metadata = sqlalchemy.MetaData(self.engine)
        return self._metadata

    @property
    def db(self) -> Optional[databases.Database]:
        from meerschaum.utils.packages import attempt_import
        databases = attempt_import('databases', lazy=False, install=True)
        url = self.DATABASE_URL
        if 'mysql' in url:
            url = url.replace('+pymysql', '')
        if '_db' not in self.__dict__:
            try:
                self._db = databases.Database(url)
            except KeyError:
                ### Likely encountered an unsupported flavor.
                from meerschaum.utils.warnings import warn
                self._db = None
        return self._db

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __call__(self):
        return self

Classes

class SQLConnector (label: Optional[str] = None, flavor: Optional[str] = None, wait: bool = False, connect: bool = False, debug: bool = False, **kw: Any)

Connect to SQL databases via sqlalchemy.

SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/

Parameters

label : str, default 'main'
The identifying label for the connector. E.g. for sql:main, 'main' is the label. Defaults to 'main'.
flavor : Optional[str], default None
The database flavor, e.g. 'sqlite', 'postgresql', 'cockroachdb', etc. To see supported flavors, run the bootstrap connectors command.
wait : bool, default False
If True, block until a database connection has been made. Defaults to False.
connect : bool, default False
If True, immediately attempt to connect the database and raise a warning if the connection fails. Defaults to False.
debug : bool, default False
Verbosity toggle. Defaults to False.
kw : Any
All other arguments will be passed to the connector's attributes. Therefore, a connector may be made without being registered, as long enough parameters are supplied to the constructor.
Expand source code
class SQLConnector(Connector):
    """
    Connect to SQL databases via `sqlalchemy`.
    
    SQLConnectors may be used as Meerschaum instance connectors.
    Read more about connectors and instances at
    https://meerschaum.io/reference/connectors/

    """

    IS_INSTANCE: bool = True

    from ._create_engine import flavor_configs, create_engine
    from ._sql import read, value, exec, execute, to_sql, exec_queries
    from meerschaum.utils.sql import test_connection
    from ._fetch import fetch, get_pipe_metadef
    from ._cli import cli
    from ._pipes import (
        fetch_pipes_keys,
        create_indices,
        drop_indices,
        get_create_index_queries,
        get_drop_index_queries,
        get_add_columns_queries,
        get_alter_columns_queries,
        delete_pipe,
        get_backtrack_data,
        get_pipe_data,
        get_pipe_data_query,
        register_pipe,
        edit_pipe,
        get_pipe_id,
        get_pipe_attributes,
        sync_pipe,
        sync_pipe_inplace,
        get_sync_time,
        pipe_exists,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        get_pipe_table,
        get_pipe_columns_types,
    )
    from ._plugins import (
        register_plugin,
        delete_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugins,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
    )
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri, parse_uri
    
    def __init__(
        self,
        label: Optional[str] = None,
        flavor: Optional[str] = None,
        wait: bool = False,
        connect: bool = False,
        debug: bool = False,
        **kw: Any
    ):
        """
        Parameters
        ----------
        label: str, default 'main'
            The identifying label for the connector.
            E.g. for `sql:main`, 'main' is the label.
            Defaults to 'main'.

        flavor: Optional[str], default None
            The database flavor, e.g.
            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
            To see supported flavors, run the `bootstrap connectors` command.

        wait: bool, default False
            If `True`, block until a database connection has been made.
            Defaults to `False`.

        connect: bool, default False
            If `True`, immediately attempt to connect the database and raise
            a warning if the connection fails.
            Defaults to `False`.

        debug: bool, default False
            Verbosity toggle.
            Defaults to `False`.

        kw: Any
            All other arguments will be passed to the connector's attributes.
            Therefore, a connector may be made without being registered,
            as long enough parameters are supplied to the constructor.
        """
        if 'uri' in kw:
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            from_uri_params.pop('label', None)
            kw.update(from_uri_params)

        ### set __dict__ in base class
        super().__init__(
            'sql',
            label = label or self.__dict__.get('label', None),
            **kw
        )

        if self.__dict__.get('flavor', None) == 'sqlite':
            self._reset_attributes()
            self._set_attributes(
                'sql',
                label = label,
                inherit_default = False,
                **kw
            )
            ### For backwards compatability reasons, set the path for sql:local if its missing.
            if self.label == 'local' and not self.__dict__.get('database', None):
                from meerschaum.config._paths import SQLITE_DB_PATH
                self.database = str(SQLITE_DB_PATH)

        ### ensure flavor and label are set accordingly
        if 'flavor' not in self.__dict__:
            if flavor is None and 'uri' not in self.__dict__:
                raise Exception(
                    f"    Missing flavor. Provide flavor as a key for '{self}'."
                )
            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

        self._debug = debug
        ### Store the PID and thread at initialization
        ### so we can dispose of the Pool in child processes or threads.
        import os, threading
        self._pid = os.getpid()
        self._thread_ident = threading.current_thread().ident
        self._sessions = {}
        self._locks = {'_sessions': threading.RLock(), }

        ### verify the flavor's requirements are met
        if self.flavor not in self.flavor_configs:
            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
        if not self.__dict__.get('uri'):
            self.verify_attributes(
                self.flavor_configs[self.flavor].get('requirements', set()),
                debug=debug,
            )

        self.wait = wait
        if self.wait:
            from meerschaum.connectors.poll import retry_connect
            retry_connect(connector=self, debug=debug)

        if connect:
            if not self.test_connection(debug=debug):
                from meerschaum.utils.warnings import warn
                warn(f"Failed to connect with connector '{self}'!", stack=False)

    @property
    def Session(self):
        if '_Session' not in self.__dict__:
            if self.engine is None:
                return None

            from meerschaum.utils.packages import attempt_import
            sqlalchemy_orm = attempt_import('sqlalchemy.orm')
            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
            self._Session = sqlalchemy_orm.scoped_session(session_factory)

        return self._Session

    @property
    def engine(self):
        import os, threading
        ### build the sqlalchemy engine
        if '_engine' not in self.__dict__:
            self._engine = self.create_engine()

        same_process = os.getpid() == self._pid
        same_thread = threading.current_thread().ident == self._thread_ident

        ### handle child processes
        if not same_process:
            self._pid = os.getpid()
            self._thread = threading.current_thread()
            from meerschaum.utils.warnings import warn
            warn(f"Different PID detected. Disposing of connections...")
            self._engine.dispose()

        ### handle different threads
        if not same_thread:
            #  print('different thread!')
            pass

        return self._engine

    @property
    def DATABASE_URL(self):
        return str(self.engine.url)

    @property
    def URI(self):
        return str(self.engine.url)

    @property
    def metadata(self):
        from meerschaum.utils.packages import attempt_import
        sqlalchemy = attempt_import('sqlalchemy')
        if '_metadata' not in self.__dict__:
            self._metadata = sqlalchemy.MetaData(self.engine)
        return self._metadata

    @property
    def db(self) -> Optional[databases.Database]:
        from meerschaum.utils.packages import attempt_import
        databases = attempt_import('databases', lazy=False, install=True)
        url = self.DATABASE_URL
        if 'mysql' in url:
            url = url.replace('+pymysql', '')
        if '_db' not in self.__dict__:
            try:
                self._db = databases.Database(url)
            except KeyError:
                ### Likely encountered an unsupported flavor.
                from meerschaum.utils.warnings import warn
                self._db = None
        return self._db

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __call__(self):
        return self

Ancestors

  • meerschaum.connectors.Connector.Connector

Class variables

var IS_INSTANCE : bool
var flavor_configs

Static methods

def from_uri(uri: str, label: Union[str, NoneType] = None, as_dict: bool = False) ‑> Union[SQLConnector, Dict[str, Union[str, int]]]

Create a new SQLConnector from a URI string.

Parameters

uri : str
The URI connection string.
label : Optional[str], default None
If provided, use this as the connector label. Otherwise use the determined database name.
as_dict : bool, default False
If True, return a dictionary of the keyword arguments necessary to create a new SQLConnector, otherwise create a new object.

Returns

A new SQLConnector object or a dictionary of attributes (if as_dict is True).

Expand source code
@classmethod
def from_uri(
        cls,
        uri: str,
        label: Optional[str] = None,
        as_dict: bool = False,
    ) -> Union[
        'meerschaum.connectors.SQLConnector',
        Dict[str, Union[str, int]],
    ]:
    """
    Create a new SQLConnector from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string.

    label: Optional[str], default None
        If provided, use this as the connector label.
        Otherwise use the determined database name.

    as_dict: bool, default False
        If `True`, return a dictionary of the keyword arguments
        necessary to create a new `SQLConnector`, otherwise create a new object.

    Returns
    -------
    A new SQLConnector object or a dictionary of attributes (if `as_dict` is `True`).
    """

    params = cls.parse_uri(uri)
    params['uri'] = uri
    flavor = params.get('flavor', None)
    if not flavor or flavor not in cls.flavor_configs:
        error(f"Invalid flavor '{flavor}' detected from the provided URI.")

    if 'database' not in params:
        error("Unable to determine the database from the provided URI.")

    if flavor in ('sqlite', 'duckdb'):
        if params['database'] == ':memory:':
            params['label'] = label or f'memory_{flavor}'
        else:
            params['label'] = label or params['database'].split(os.path.sep)[-1].lower()
    else:
        params['label'] = label or (
            (
                (params['username'] + '@' if 'username' in params else '')
                + params.get('host', '')
                + ('/' if 'host' in params else '')
                + params.get('database', '')
            ).lower()
        )

    return cls(**params) if not as_dict else params
def parse_uri(uri: str) ‑> Dict[str, Any]

Parse a URI string into a dictionary of parameters.

Parameters

uri : str
The database connection URI.

Returns

A dictionary of attributes.

Examples

>>> parse_uri('sqlite:////home/foo/bar.db')
{'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
>>> parse_uri(
...     'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
...     + '/master?driver=ODBC+Driver+17+for+SQL+Server'
... )
{'host': 'localhost', 'database': 'master', 'username': 'sa',
'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
'driver': 'ODBC Driver 17 for SQL Server'}
>>>
Expand source code
@staticmethod
def parse_uri(uri: str) -> Dict[str, Any]:
    """
    Parse a URI string into a dictionary of parameters.

    Parameters
    ----------
    uri: str
        The database connection URI.

    Returns
    -------
    A dictionary of attributes.

    Examples
    --------
    >>> parse_uri('sqlite:////home/foo/bar.db')
    {'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
    >>> parse_uri(
    ...     'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
    ...     + '/master?driver=ODBC+Driver+17+for+SQL+Server'
    ... )
    {'host': 'localhost', 'database': 'master', 'username': 'sa',
    'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
    'driver': 'ODBC Driver 17 for SQL Server'}
    >>> 
    """
    from urllib.parse import parse_qs, urlparse
    sqlalchemy = attempt_import('sqlalchemy')
    parser = sqlalchemy.engine.url.make_url
    params = parser(uri).translate_connect_args()
    params['flavor'] = uri.split(':')[0].split('+')[0]
    if params['flavor'] == 'postgres':
        params['flavor'] = 'postgresql'
    if '?' in uri:
        parsed_uri = urlparse(uri)
        for key, value in parse_qs(parsed_uri.query).items():
            params.update({key: value[0]})
    return params

Instance variables

var DATABASE_URL
Expand source code
@property
def DATABASE_URL(self):
    return str(self.engine.url)
var Session
Expand source code
@property
def Session(self):
    if '_Session' not in self.__dict__:
        if self.engine is None:
            return None

        from meerschaum.utils.packages import attempt_import
        sqlalchemy_orm = attempt_import('sqlalchemy.orm')
        session_factory = sqlalchemy_orm.sessionmaker(self.engine)
        self._Session = sqlalchemy_orm.scoped_session(session_factory)

    return self._Session
var URI
Expand source code
@property
def URI(self):
    return str(self.engine.url)
var db : Optional[databases.Database]
Expand source code
@property
def db(self) -> Optional[databases.Database]:
    from meerschaum.utils.packages import attempt_import
    databases = attempt_import('databases', lazy=False, install=True)
    url = self.DATABASE_URL
    if 'mysql' in url:
        url = url.replace('+pymysql', '')
    if '_db' not in self.__dict__:
        try:
            self._db = databases.Database(url)
        except KeyError:
            ### Likely encountered an unsupported flavor.
            from meerschaum.utils.warnings import warn
            self._db = None
    return self._db
var engine
Expand source code
@property
def engine(self):
    import os, threading
    ### build the sqlalchemy engine
    if '_engine' not in self.__dict__:
        self._engine = self.create_engine()

    same_process = os.getpid() == self._pid
    same_thread = threading.current_thread().ident == self._thread_ident

    ### handle child processes
    if not same_process:
        self._pid = os.getpid()
        self._thread = threading.current_thread()
        from meerschaum.utils.warnings import warn
        warn(f"Different PID detected. Disposing of connections...")
        self._engine.dispose()

    ### handle different threads
    if not same_thread:
        #  print('different thread!')
        pass

    return self._engine
var metadata
Expand source code
@property
def metadata(self):
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    if '_metadata' not in self.__dict__:
        self._metadata = sqlalchemy.MetaData(self.engine)
    return self._metadata

Methods

def clear_pipe(self, pipe: Pipe, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw) ‑> SuccessTuple

Delete a pipe's data within a bounded or unbounded interval without dropping the table.

Parameters

pipe : Pipe
The pipe to clear.
begin : Optional[datetime.datetime], default None
Beginning datetime. Inclusive.
end : Optional[datetime.datetime], default None
Ending datetime. Exclusive.
params : Optional[Dict[str, Any]], default None
See build_where().
Expand source code
def clear_pipe(
        self,
        pipe: meerschaum.Pipe,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Delete a pipe's data within a bounded or unbounded interval without dropping the table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to clear.
        
    begin: Optional[datetime.datetime], default None
        Beginning datetime. Inclusive.

    end: Optional[datetime.datetime], default None
         Ending datetime. Exclusive.

    params: Optional[Dict[str, Any]], default None
         See `meerschaum.utils.sql.build_where`.

    """
    if not pipe.exists(debug=debug):
        return True, f"{pipe} does not exist, so nothing was cleared."

    from meerschaum.utils.sql import sql_item_name, build_where, dateadd_str
    from meerschaum.utils.warnings import warn
    pipe_name = sql_item_name(pipe.target, self.flavor)

    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt_name = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt_name = sql_item_name(_dt, self.flavor)
        is_guess = False

    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n    Ignoring datetime bounds...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )

    valid_params = {}
    if params is not None:
        existing_cols = pipe.get_columns_types(debug=debug)
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
    clear_query = (
        f"DELETE FROM {pipe_name}\nWHERE 1 = 1\n"
        + ('  AND ' + build_where(valid_params, self, with_where=False) if valid_params else '')
        + (
            f'  AND {dt_name} >= ' + dateadd_str(self.flavor, 'day', 0, begin)
            if begin is not None else ''
        ) + (
            f'  AND {dt_name} < ' + dateadd_str(self.flavor, 'day', 0, end)
            if end is not None else ''
        )
    )
    success = self.exec(clear_query, silent=True, debug=debug) is not None
    msg = "Success" if success else f"Failed to clear {pipe}."
    return success, msg
def cli(self, debug: bool = False) ‑> Tuple[bool, str]

Launch an interactive CLI for the SQLConnector's flavor.

Expand source code
def cli(
        self,
        debug : bool = False
    ) -> SuccessTuple:
    """Launch an interactive CLI for the SQLConnector's flavor."""
    from meerschaum.utils.packages import venv_exec, attempt_import
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import error
    import sys, subprocess

    if self.flavor not in flavor_clis:
        return False, f"No CLI available for flavor '{self.flavor}'."

    if self.flavor == 'duckdb':
        gadwall = attempt_import('gadwall', debug = debug, lazy=False)
        gadwall_shell = gadwall.Gadwall(self.database)
        try:
            gadwall_shell.cmdloop()
        except KeyboardInterrupt:
            pass
        return True, "Success"

    cli_name = flavor_clis[self.flavor]

    ### Install the CLI package and any dependencies.
    cli, cli_main = attempt_import(cli_name, (cli_name + '.main'), lazy=False, debug=debug)
    if cli_name in cli_deps:
        for dep in cli_deps[cli_name]:
            locals()[dep] = attempt_import(dep, lazy=False, warn=False, debug=debug)

    ### NOTE: The `DATABASE_URL` property must be initialized first in case the database is not
    ### yet defined (e.g. 'sql:local').
    cli_arg_str = self.DATABASE_URL
    if self.flavor in ('sqlite', 'duckdb'):
        cli_arg_str = str(self.database)

    ### Define the script to execute to launch the CLI.
    ### The `mssqlcli` script is manually written to avoid telemetry
    ### and because `main.cli()` is not defined.
    launch_cli = f"cli_main.cli(['{cli_arg_str}'])"
    if self.flavor == 'mssql':
        launch_cli = (
            "mssqlclioptionsparser, mssql_cli = attempt_import("
            + "'mssqlcli.mssqlclioptionsparser', 'mssqlcli.mssql_cli', lazy=False)\n"
            + "ms_parser = mssqlclioptionsparser.create_parser()\n"
            + f"ms_options = ms_parser.parse_args(['--server', 'tcp:{self.host},{self.port}', "
            + f"'--database', '{self.database}', "
            + f"'--username', '{self.username}', '--password', '{self.password}'])\n"
            + "ms_object = mssql_cli.MssqlCli(ms_options)\n"
            + "try:\n"
            + "    ms_object.connect_to_database()\n"
            + "    ms_object.run()\n"
            + "finally:\n"
            + "    ms_object.shutdown()"
        )
    elif self.flavor == 'duckdb':
        launch_cli = ()

    try:
        if debug:
            dprint(f'Launching CLI:\n{launch_cli}')
        exec(launch_cli)
        success, msg = True, 'Success'
    except Exception as e:
        success, msg = False, str(e)

    return success, msg
def create_engine(self, include_uri: bool = False, debug: bool = False, **kw) ‑> sqlalchemy.engine.Engine

Create a sqlalchemy engine by building the engine string.

Expand source code
def create_engine(
        self,
        include_uri: bool = False,
        debug: bool = False,
        **kw
    ) -> 'sqlalchemy.engine.Engine':
    """Create a sqlalchemy engine by building the engine string."""
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import error, warn
    sqlalchemy = attempt_import('sqlalchemy')
    import urllib
    import copy
    ### Install and patch required drivers.
    if self.flavor in install_flavor_drivers:
        attempt_import(*install_flavor_drivers[self.flavor], debug=debug, lazy=False, warn=False)
    if self.flavor in require_patching_flavors:
        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
        import pathlib
        for install_name, import_name in require_patching_flavors[self.flavor]:
            pkg = attempt_import(
                import_name,
                debug = debug,
                lazy = False,
                warn = False
            )
            _monkey_patch_get_distribution(
                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
            )

    ### supplement missing values with defaults (e.g. port number)
    for a, value in flavor_configs[self.flavor]['defaults'].items():
        if a not in self.__dict__:
            self.__dict__[a] = value

    ### Verify that everything is in order.
    if self.flavor not in flavor_configs:
        error(f"Cannot create a connector with the flavor '{self.flavor}'.")

    _engine = flavor_configs[self.flavor].get('engine', None)
    _username = self.__dict__.get('username', None)
    _password = self.__dict__.get('password', None)
    _host = self.__dict__.get('host', None)
    _port = self.__dict__.get('port', None)
    _database = self.__dict__.get('database', None)
    _driver = self.__dict__.get('driver', None)
    if _driver is not None:
        ### URL-encode the driver if a space is detected.
        ### Not a bullet-proof solution, but this should work for most cases.
        _driver = urllib.parse.quote_plus(_driver) if ' ' in _driver else _driver
    _uri = self.__dict__.get('uri', None)

    ### Handle registering specific dialects (due to installing in virtual environments).
    if self.flavor in flavor_dialects:
        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])

    ### self.sys_config was deepcopied and can be updated safely
    if self.flavor in ("sqlite", "duckdb"):
        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
        if 'create_engine' not in self.sys_config:
            self.sys_config['create_engine'] = {}
        if 'connect_args' not in self.sys_config['create_engine']:
            self.sys_config['create_engine']['connect_args'] = {}
        self.sys_config['create_engine']['connect_args'].update({"check_same_thread" : False})
    else:
        engine_str = (
            _engine + "://" + (_username if _username is not None else '') +
            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
            (("/" + _database) if _database is not None else '')
            + (("?driver=" + _driver) if _driver is not None else '')
        ) if not _uri else _uri
    if debug:
        dprint(
            (
                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
                    if _password is not None else engine_str
            ) + '\n' + f"{self.sys_config.get('create_engine', {}).get('connect_args', {})}"
        )

    _kw_copy = copy.deepcopy(kw)

    ### NOTE: Order of inheritance:
    ###       1. Defaults
    ###       2. System configuration
    ###       3. Connector configuration
    ###       4. Keyword arguments
    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
    def _apply_create_engine_args(update):
        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
            _create_engine_args.update(
                { k: v for k, v in update.items()
                    if 'omit_create_engine' not in flavor_configs[self.flavor]
                        or k not in flavor_configs[self.flavor].get('omit_create_engine')
                }
            )
    _apply_create_engine_args(self.sys_config.get('create_engine', {}))
    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
    _apply_create_engine_args(_kw_copy)

    try:
        engine = sqlalchemy.create_engine(
            engine_str,
            ### I know this looks confusing, and maybe it's bad code,
            ### but it's simple. It dynamically parses the config string
            ### and splits it to separate the class name (QueuePool)
            ### from the module name (sqlalchemy.pool).
            poolclass    = getattr(
                attempt_import(
                    ".".join(self.sys_config['poolclass'].split('.')[:-1])
                ),
                self.sys_config['poolclass'].split('.')[-1]
            ),
            echo         = debug,
            **_create_engine_args
        )
    except Exception as e:
        warn(e)
        warn(f"Failed to create connector '{self}'.", stack=False)
        engine = None

    if include_uri:
        return engine, engine_str
    return engine
def create_indices(self, pipe: Pipe, indices: Optional[List[str]] = None, debug: bool = False) ‑> bool

Create a pipe's indices.

Expand source code
def create_indices(
        self,
        pipe: meerschaum.Pipe,
        indices: Optional[List[str]] = None,
        debug: bool = False
    ) -> bool:
    """
    Create a pipe's indices.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    if debug:
        dprint(f"Creating indices for {pipe}...")
    if not pipe.columns:
        warn(f"Unable to create indices for {pipe} without columns.", stack=False)
        return False
    ix_queries = {
        ix: queries
        for ix, queries in self.get_create_index_queries(pipe, debug=debug).items()
        if indices is None or ix in indices
    }
    success = True
    for ix, queries in ix_queries.items():
        ix_success = all(self.exec_queries(queries, debug=debug, silent=True))
        if not ix_success:
            success = False
            if debug:
                dprint(f"Failed to create index on column: {ix}")
    return success
def delete_pipe(self, pipe: Pipe, debug: bool = False) ‑> SuccessTuple

Delete a Pipe's registration and drop its table.

Expand source code
def delete_pipe(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> SuccessTuple:
    """
    Delete a Pipe's registration and drop its table.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.sql import sql_item_name
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    ### try dropping first
    drop_tuple = pipe.drop(debug=debug)
    if not drop_tuple[0]:
        return drop_tuple

    if not pipe.id:
        return False, f"{pipe} is not registered."

    ### ensure pipes table exists
    from meerschaum.connectors.sql.tables import get_tables
    pipes = get_tables(mrsm_instance=self, debug=debug)['pipes']

    q = sqlalchemy.delete(pipes).where(pipes.c.pipe_id == pipe.id)
    if not self.exec(q, debug=debug):
        return False, f"Failed to delete registration for '{pipe}'."

    return True, "Success"
def delete_plugin(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False, **kw: Any) ‑> Tuple[bool, str]

Delete a plugin from the plugins table.

Expand source code
def delete_plugin(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Delete a plugin from the plugins table."""
    from meerschaum.utils.warnings import warn, error
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    plugins = get_tables(mrsm_instance=self, debug=debug)['plugins']

    plugin_id = self.get_plugin_id(plugin, debug=debug)
    if plugin_id is None:
        return True, f"Plugin '{plugin}' was not registered."

    bind_variables = {
        'plugin_id' : plugin_id,
    }

    query = sqlalchemy.delete(plugins).where(plugins.c.plugin_id == plugin_id)
    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to delete plugin '{plugin}'."
    return True, f"Successfully deleted plugin '{plugin}'."
def delete_user(self, user: meerschaum.core.User, debug: bool = False) ‑> SuccessTuple

Delete a user's record from the users table.

Expand source code
def delete_user(
        self,
        user: meerschaum.core.User,
        debug: bool = False
    ) -> SuccessTuple:
    """Delete a user's record from the users table."""
    ### ensure users table exists
    from meerschaum.connectors.sql.tables import get_tables
    users = get_tables(mrsm_instance=self, debug=debug)['users']
    plugins = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)

    if user_id is None:
        return False, f"User '{user.username}' is not registered and cannot be deleted."

    query = sqlalchemy.delete(users).where(users.c.user_id == user_id)

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to delete user '{user}'."

    query = sqlalchemy.delete(plugins).where(plugins.c.user_id == user_id)
    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to delete plugins of user '{user}'."

    return True, f"Successfully deleted user '{user}'"
def drop_indices(self, pipe: Pipe, indices: Optional[List[str]] = None, debug: bool = False) ‑> bool

Drop a pipe's indices.

Expand source code
def drop_indices(
        self,
        pipe: meerschaum.Pipe,
        indices: Optional[List[str]] = None,
        debug: bool = False
    ) -> bool:
    """
    Drop a pipe's indices.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    if debug:
        dprint(f"Dropping indices for {pipe}...")
    if not pipe.columns:
        warn(f"Unable to drop indices for {pipe} without columns.", stack=False)
        return False
    ix_queries = {
        ix: queries
        for ix, queries in self.get_drop_index_queries(pipe, debug=debug).items()
        if indices is None or ix in indices
    }
    success = True
    for ix, queries in ix_queries.items():
        ix_success = all(self.exec_queries(queries, debug=debug, silent=True))
        if not ix_success:
            success = False
            if debug:
                dprint(f"Failed to drop index on column: {ix}")
    return success
def drop_pipe(self, pipe: Pipe, debug: bool = False, **kw) ‑> SuccessTuple

Drop a pipe's tables but maintain its registration.

Parameters

pipe : Pipe
The pipe to drop.
Expand source code
def drop_pipe(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Drop a pipe's tables but maintain its registration.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to drop.
        
    """
    from meerschaum.utils.sql import table_exists, sql_item_name
    success = True
    target, temp_target = pipe.target, '_' + pipe.target
    target_name, temp_name = (
        sql_item_name(target, self.flavor),
        sql_item_name(temp_target, self.flavor),
    )
    if table_exists(target, self, debug=debug):
        success = self.exec(f"DROP TABLE {target_name}", silent=True, debug=debug) is not None
    if table_exists(temp_target, self, debug=debug):
        success = (
            success
            and self.exec(f"DROP TABLE {temp_name}", silent=True, debug=debug) is not None
        )

    msg = "Success" if success else f"Failed to drop {pipe}."
    return success, msg
def edit_pipe(self, pipe: Pipe = None, patch: bool = False, debug: bool = False, **kw: Any) ‑> SuccessTuple

Persist a Pipe's parameters to its database.

Parameters

pipe : Pipe, default None
The pipe to be edited.
patch : bool, default False
If patch is True, update the existing parameters by cascading. Otherwise overwrite the parameters (default).
debug : bool, default False
Verbosity toggle.
Expand source code
def edit_pipe(
        self,
        pipe : meerschaum.Pipe = None,
        patch: bool = False,
        debug: bool = False,
        **kw : Any
    ) -> SuccessTuple:
    """
    Persist a Pipe's parameters to its database.

    Parameters
    ----------
    pipe: meerschaum.Pipe, default None
        The pipe to be edited.
    patch: bool, default False
        If patch is `True`, update the existing parameters by cascading.
        Otherwise overwrite the parameters (default).
    debug: bool, default False
        Verbosity toggle.
    """

    if pipe.id is None:
        return False, f"{pipe} is not registered and cannot be edited."

    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    if not patch:
        parameters = pipe.__dict__.get('_attributes', {}).get('parameters', {})
    else:
        from meerschaum import Pipe
        from meerschaum.config._patch import apply_patch_to_config
        original_parameters = Pipe(
            pipe.connector_keys, pipe.metric_key, pipe.location_key,
            mrsm_instance=pipe.instance_keys
        ).parameters
        parameters = apply_patch_to_config(
            original_parameters,
            pipe.parameters
        )

    ### ensure pipes table exists
    from meerschaum.connectors.sql.tables import get_tables
    pipes = get_tables(mrsm_instance=self, debug=debug)['pipes']

    import json
    sqlalchemy = attempt_import('sqlalchemy')

    values = {
        'parameters' : (
            json.dumps(parameters) if self.flavor in ('duckdb',)
            else parameters
        ),
    }
    q = sqlalchemy.update(pipes).values(**values).where(
        pipes.c.pipe_id == pipe.id
    )

    result = self.exec(q, debug=debug)
    message = (
        f"Successfully edited {pipe}."
        if result is not None else f"Failed to edit {pipe}."
    )
    return (result is not None), message
def edit_user(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> SuccessTuple

Update an existing user's metadata.

Expand source code
def edit_user(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Update an existing user's metadata."""
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.sql import json_flavors
    users = get_tables(mrsm_instance=self, debug=debug)['users']

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    if user_id is None:
        return False, (
            f"User '{user.username}' does not exist. " +
            f"Register user '{user.username}' before editing."
        )
    user.user_id = user_id

    import json
    valid_tuple = valid_username(user.username)
    if not valid_tuple[0]:
        return valid_tuple

    bind_variables = {
        'user_id' : user_id,
        'username' : user.username,
    }
    if user.password != '':
        bind_variables['password_hash'] = user.password_hash
    if user.email != '':
        bind_variables['email'] = user.email
    if user.attributes is not None and user.attributes != {}:
        bind_variables['attributes'] = (
            json.dumps(user.attributes) if self.flavor in ('duckdb',)
            else user.attributes
        )
    if user.type != '':
        bind_variables['user_type'] = user.type

    query = sqlalchemy.update(users).values(**bind_variables).where(users.c.user_id == user_id)

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to edit user '{user}'."
    return True, f"Successfully edited user '{user}'."
def exec(self, query: str, *args: Any, silent: bool = False, debug: bool = False, commit: Optional[bool] = None, close: Optional[bool] = None, with_connection: bool = False, **kw: Any) ‑> Union[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.cursor.LegacyCursorResult, Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection], Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection], None]

Execute SQL code and return the sqlalchemy result, e.g. when calling stored procedures.

If inserting data, please use bind variables to avoid SQL injection!

Parameters

query : Union[str, List[str], Tuple[str]]
The query to execute. If query is a list or tuple, call self.exec_queries() instead.
args : Any
Arguments passed to sqlalchemy.engine.execute.
silent : bool, default False
If True, suppress warnings.
commit : Optional[bool], default None
If True, commit the changes after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
close : Optional[bool], default None
If True, close the connection after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
with_connection : bool, default False
If True, return a tuple including the connection object. This does not apply if query is a list of strings.

Returns

The sqlalchemy result object, or a tuple with the connection if with_connection is provided.

Expand source code
def exec(
        self,
        query: str,
        *args: Any,
        silent: bool = False,
        debug: bool = False,
        commit: Optional[bool] = None,
        close: Optional[bool] = None,
        with_connection: bool = False,
        **kw: Any
    ) -> Union[
            sqlalchemy.engine.result.resultProxy,
            sqlalchemy.engine.cursor.LegacyCursorResult,
            Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
            Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
            None
    ]:
    """
    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.
    
    If inserting data, please use bind variables to avoid SQL injection!

    Parameters
    ----------
    query: Union[str, List[str], Tuple[str]]
        The query to execute.
        If `query` is a list or tuple, call `self.exec_queries()` instead.

    args: Any
        Arguments passed to `sqlalchemy.engine.execute`.
        
    silent: bool, default False
        If `True`, suppress warnings.

    commit: Optional[bool], default None
        If `True`, commit the changes after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    close: Optional[bool], default None
        If `True`, close the connection after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    with_connection: bool, default False
        If `True`, return a tuple including the connection object.
        This does not apply if `query` is a list of strings.
    
    Returns
    -------
    The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.

    """
    if isinstance(query, (list, tuple)):
        return self.exec_queries(
            list(query),
            *args,
            silent = silent,
            debug = debug,
            **kw
        )

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import("sqlalchemy")
    if debug:
        dprint("Executing query:\n" + f"{query}")

    _close = close if close is not None else (self.flavor != 'mssql')
    _commit = commit if commit is not None else (
        (self.flavor != 'mssql' or 'select' not in str(query).lower())
    )

    connection = self.engine.connect()
    transaction = connection.begin() if _commit else None
    try:
        result = connection.execute(query, *args, **kw)
        if _commit:
            transaction.commit()
    except Exception as e:
        if debug:
            dprint(f"Failed to execute query:\n\n{query}\n\n{e}")
        if not silent:
            warn(str(e))
        result = None
        if _commit:
            transaction.rollback()
    finally:
        if _close:
            connection.close()

        if with_connection:
            return result, connection

    return result
def exec_queries(self, queries: List[str], break_on_error: bool = False, silent: bool = False, debug: bool = False) ‑> List[sqlalchemy.engine.cursor.LegacyCursorResult]

Execute a list of queries in a single transaction.

Parameters

queries : List[str]
The queries in the transaction to be executed.
break_on_error : bool, default False
If True, stop executing when a query fails.
silent : bool, default False
If True, suppress warnings.

Returns

A list of SQLAlchemy results.

Expand source code
def exec_queries(
        self,
        queries: List[str],
        break_on_error: bool = False,
        silent: bool = False,
        debug: bool = False,
    ) -> List[sqlalchemy.engine.cursor.LegacyCursorResult]:
    """
    Execute a list of queries in a single transaction.

    Parameters
    ----------
    queries: List[str]
        The queries in the transaction to be executed.

    break_on_error: bool, default False
        If `True`, stop executing when a query fails.

    silent: bool, default False
        If `True`, suppress warnings.

    Returns
    -------
    A list of SQLAlchemy results.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint

    results = []
    with self.engine.begin() as connection:
        for query in queries:
            if debug:
                dprint(query)
            try:
                result = connection.execute(query)
            except Exception as e:
                msg = (f"Encountered error while executing:\n{e}")
                if not silent:
                    warn(msg)
                elif debug:
                    dprint(msg)
                result = None
            results.append(result)
            if result is None and break_on_error:
                break
    return results
def execute(self, *args: Any, **kw: Any) ‑> Optional[sqlalchemy.engine.result.resultProxy]

An alias for meerschaum.connectors.sql.SQLConnector.exec.

Expand source code
def execute(
        self,
        *args : Any,
        **kw : Any
    ) -> Optional[sqlalchemy.engine.result.resultProxy]:
    """
    An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
    """
    return self.exec(*args, **kw)
def fetch(self, pipe: Pipe, begin: Union[datetime.datetime, str, None] = '', end: Union[datetime.datetime, str, None] = None, chunk_hook: Optional[Callable[[pd.DataFrame], Any]] = None, chunksize: Optional[int] = -1, debug: bool = False, **kw: Any) ‑> Union[('pd.DataFrame', None)]

Execute the SQL definition and return a Pandas DataFrame.

Parameters

pipe : Pipe

The pipe object which contains the fetch metadata.

  • pipe.columns['datetime']: str
    • Name of the datetime column for the remote table.
  • pipe.parameters['fetch']: Dict[str, Any]
    • Parameters necessary to execute a query.
  • pipe.parameters['fetch']['definition']: str
    • Raw SQL query to execute to generate the pandas DataFrame.
  • pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
    • How many minutes before begin to search for data (optional).
begin : Union[datetime.datetime, str, None], default None
Most recent datatime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
end : Union[datetime.datetime, str, None], default None
The latest datetime to search for data. If end is None, do not bound
debug : bool, default False
Verbosity toggle.

Returns

A pandas DataFrame or None.

Expand source code
def fetch(
        self,
        pipe: meerschaum.Pipe,
        begin: Union[datetime.datetime, str, None] = '',
        end: Union[datetime.datetime, str, None] = None,
        chunk_hook: Optional[Callable[[pd.DataFrame], Any]] = None,
        chunksize: Optional[int] = -1,
        debug: bool = False,
        **kw: Any
    ) -> Union['pd.DataFrame', None]:
    """Execute the SQL definition and return a Pandas DataFrame.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe object which contains the `fetch` metadata.
        
        - pipe.columns['datetime']: str
            - Name of the datetime column for the remote table.
        - pipe.parameters['fetch']: Dict[str, Any]
            - Parameters necessary to execute a query.
        - pipe.parameters['fetch']['definition']: str
            - Raw SQL query to execute to generate the pandas DataFrame.
        - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
            - How many minutes before `begin` to search for data (*optional*).

    begin: Union[datetime.datetime, str, None], default None
        Most recent datatime to search for data.
        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.

    end: Union[datetime.datetime, str, None], default None
        The latest datetime to search for data.
        If `end` is `None`, do not bound 

    debug: bool, default False
        Verbosity toggle.
       
    Returns
    -------
    A pandas DataFrame or `None`.

    """
    meta_def = self.get_pipe_metadef(
        pipe,
        begin = begin,
        end = end,
        debug = debug,
        **kw
    )
    df = self.read(meta_def, chunk_hook=chunk_hook, chunksize=chunksize, debug=debug)
    ### if sqlite, parse for datetimes
    if self.flavor == 'sqlite':
        from meerschaum.utils.misc import parse_df_datetimes
        df = parse_df_datetimes(df, debug=debug)
    return df
def fetch_pipes_keys(self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) ‑> Union[List[Tuple[str, str, Union[str, NoneType]]], NoneType]

Return a list of tuples corresponding to the parameters provided.

Parameters

connector_keys : Optional[List[str]], default None
List of connector_keys to search by.
metric_keys : Optional[List[str]], default None
List of metric_keys to search by.
location_keys : Optional[List[str]], default None
List of location_keys to search by.
params : Optional[Dict[str, Any]], default None
Dictionary of additional parameters to search by. E.g. --params pipe_id:1
debug : bool, default False
Verbosity toggle.
Expand source code
def fetch_pipes_keys(
        self,
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False
    ) -> Optional[List[Tuple[str, str, Optional[str]]]]:
    """
    Return a list of tuples corresponding to the parameters provided.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        List of connector_keys to search by.

    metric_keys: Optional[List[str]], default None
        List of metric_keys to search by.

    location_keys: Optional[List[str]], default None
        List of location_keys to search by.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        E.g. `--params pipe_id:1`

    debug: bool, default False
        Verbosity toggle.
    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.misc import separate_negation_values
    from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS
    from meerschaum.config.static import _static_config
    sqlalchemy = attempt_import('sqlalchemy')
    import json
    from copy import deepcopy

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    else:
        location_keys = [
            (lk if lk not in ('[None]', 'None', 'null') else None)
                for lk in location_keys
        ]
    if tags is None:
        tags = []

    if params is None:
        params = {}

    ### Add three primary keys to params dictionary
    ###   (separated for convenience of arguments).
    cols = {
        'connector_keys': connector_keys,
        'metric_key': metric_keys,
        'location_key': location_keys,
    }

    ### Make deep copy so we don't mutate this somewhere else.
    parameters = deepcopy(params)
    for col, vals in cols.items():
        ### Allow for IS NULL to be declared as a single-item list ([None]).
        if vals == [None]:
            vals = None
        if vals not in [[], ['*']]:
            parameters[col] = vals
    cols = {k: v for k, v in cols.items() if v != [None]}

    from meerschaum.connectors.sql.tables import get_tables
    pipes = get_tables(mrsm_instance=self, debug=debug)['pipes']

    _params = {}
    for k, v in parameters.items():
        _v = json.dumps(v) if isinstance(v, dict) else v
        _params[k] = _v

    negation_prefix = _static_config()['system']['fetch_pipes_keys']['negation_prefix']
    ### Parse regular params.
    ### If a param begins with '_', negate it instead.
    _where = [
        (
            (pipes.c[key] == val) if not str(val).startswith(negation_prefix)
            else (pipes.c[key] != key)
        ) for key, val in _params.items()
            if not isinstance(val, (list, tuple)) and key in pipes.c
    ]
    q = sqlalchemy.select(
        [pipes.c.connector_keys, pipes.c.metric_key, pipes.c.location_key]
        + ([pipes.c.parameters] if tags else [])
    ).where(sqlalchemy.and_(True, *_where))

    ### Parse IN params and add OR IS NULL if None in list.
    for c, vals in cols.items():
        if not isinstance(vals, (list, tuple)) or not vals or not c in pipes.c:
            continue
        _in_vals, _ex_vals = separate_negation_values(vals)
        ### Include params (positive)
        q = (
            q.where(pipes.c[c].in_(_in_vals)) if None not in _in_vals
            else q.where(sqlalchemy.or_(pipes.c[c].in_(_in_vals), pipes.c[c].is_(None)))
        ) if _in_vals else q
        ### Exclude params (negative)
        q = q.where(pipes.c[c].not_in(_ex_vals)) if _ex_vals else q

    ### Finally, parse tags.
    _in_tags, _ex_tags = separate_negation_values(tags)
    ors = []
    for nt in _in_tags:
        ors.append(
            sqlalchemy.cast(
                pipes.c['parameters'],
                sqlalchemy.String,
            ).like(f'%"tags":%"{nt}"%')
        )
    q = q.where(sqlalchemy.and_(sqlalchemy.or_(*ors).self_group())) if ors else q
    ors = []
    for xt in _ex_tags:
        ors.append(
            sqlalchemy.cast(
                pipes.c['parameters'],
                sqlalchemy.String,
            ).not_like(f'%"tags":%"{xt}"%')
        )
    q = q.where(sqlalchemy.and_(sqlalchemy.or_(*ors).self_group())) if ors else q
    loc_asc = sqlalchemy.asc(pipes.c['location_key'])
    if self.flavor not in OMIT_NULLSFIRST_FLAVORS:
        loc_asc = sqlalchemy.nullsfirst(loc_asc)
    q = q.order_by(
        sqlalchemy.asc(pipes.c['connector_keys']),
        sqlalchemy.asc(pipes.c['metric_key']),
        loc_asc,
    )

    ### execute the query and return a list of tuples
    if debug:
        dprint(q.compile(compile_kwargs={'literal_binds': True}))
    try:
        rows = self.engine.execute(q).fetchall()
    except Exception as e:
        error(str(e))

    _keys = [(row['connector_keys'], row['metric_key'], row['location_key']) for row in rows]
    if not tags:
        return _keys
    ### Make 100% sure that the tags are correct.
    keys = []
    for row in rows:
        ktup = (row['connector_keys'], row['metric_key'], row['location_key'])
        _actual_tags = (
            json.loads(row['parameters']) if isinstance(row['parameters'], str)
            else row['parameters']
        ).get('tags', [])
        for nt in _in_tags:
            if nt in _actual_tags:
                keys.append(ktup)
        for xt in _ex_tags:
            if xt in _actual_tags:
                keys.remove(ktup)
            else:
                keys.append(ktup)
    return keys
def get_add_columns_queries(self, pipe: mrsm.Pipe, df: Union[pd.DataFrame, Dict[str, str]], debug: bool = False) ‑> List[str]

Add new null columns of the correct type to a table from a dataframe.

Parameters

pipe : mrsm.Pipe
The pipe to be altered.
df : Union[pd.DataFrame, Dict[str, str]]
The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.

Returns

A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.

Expand source code
def get_add_columns_queries(
        self,
        pipe: mrsm.Pipe,
        df: Union[pd.DataFrame, Dict[str, str]],
        debug: bool = False,
    ) -> List[str]:
    """
    Add new null columns of the correct type to a table from a dataframe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.

    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which contains new columns.
        If a dictionary is provided, assume it maps columns to Pandas data types.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    """
    if not pipe.exists(debug=debug):
        return []
    from meerschaum.utils.sql import get_pd_type, get_db_type, sql_item_name
    from meerschaum.utils.misc import flatten_list
    table_obj = self.get_pipe_table(pipe, debug=debug)
    df_cols_types = (
        {col: str(typ) for col, typ in df.dtypes.items()}
        if not isinstance(df, dict) else df
    )
    db_cols_types = {col: get_pd_type(str(typ.type)) for col, typ in table_obj.columns.items()}
    new_cols = set(df_cols_types) - set(db_cols_types)
    if not new_cols:
        return []

    new_cols_types = {
        col: get_db_type(
            df_cols_types[col],
            self.flavor
        ) for col in new_cols
    }

    query = "ALTER TABLE " + sql_item_name(pipe.target, self.flavor)
    for col, typ in new_cols_types.items():
        query += "\nADD " + sql_item_name(col, self.flavor) + " " + typ + ","
    query = query[:-1]
    if self.flavor != 'duckdb':
        return [query]

    drop_index_queries = list(flatten_list(
        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
    ))
    create_index_queries = list(flatten_list(
        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
    ))

    return drop_index_queries + [query] + create_index_queries
def get_alter_columns_queries(self, pipe: mrsm.Pipe, df: Union[pd.DataFrame, Dict[str, str]], debug: bool = False) ‑> List[str]

If we encounter a column of a different type, set the entire column to text.

Parameters

pipe : mrsm.Pipe
The pipe to be altered.
df : Union[pd.DataFrame, Dict[str, str]]
The pandas DataFrame which may contain altered columns. If a dict is provided, assume it maps columns to Pandas data types.

Returns

A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.

Expand source code
def get_alter_columns_queries(
        self,
        pipe: mrsm.Pipe,
        df: Union[pd.DataFrame, Dict[str, str]],
        debug: bool = False,
    ) -> List[str]:
    """
    If we encounter a column of a different type, set the entire column to text.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.

    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which may contain altered columns.
        If a dict is provided, assume it maps columns to Pandas data types.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    """
    if self.flavor == 'sqlite':
        return []
    if not pipe.exists(debug=debug):
        return []
    from meerschaum.utils.sql import get_pd_type, get_db_type, sql_item_name
    from meerschaum.utils.misc import flatten_list
    table_obj = self.get_pipe_table(pipe, debug=debug)
    df_cols_types = (
        {col: str(typ) for col, typ in df.dtypes.items()}
        if not isinstance(df, dict) else df
    )
    db_cols_types = {col: get_pd_type(str(typ.type)) for col, typ in table_obj.columns.items()}
    altered_cols = [
        col for col, typ in df_cols_types.items()
        if typ.lower() != db_cols_types.get(col, 'object').lower()
        and db_cols_types.get(col, 'object') != 'object'
    ]
    if not altered_cols:
        return []

    text_type = get_db_type('object', self.flavor)
    altered_cols_types = {
        col: text_type
        for col in altered_cols
    }

    queries = []
    if self.flavor == 'oracle':
        add_query = "ALTER TABLE " + sql_item_name(pipe.target, self.flavor)
        for col, typ in altered_cols_types.items():
            add_query += "\nADD " + sql_item_name(col + '_temp', self.flavor) + " " + typ + ","
        add_query = add_query[:-1]
        queries.append(add_query)

        populate_temp_query = "UPDATE " + sql_item_name(pipe.target, self.flavor)
        for col, typ in altered_cols_types.items():
            populate_temp_query += (
                "\nSET " + sql_item_name(col + '_temp', self.flavor)
                + ' = ' + sql_item_name(col, self.flavor) + ','
            )
        populate_temp_query = populate_temp_query[:-1]
        queries.append(populate_temp_query)

        set_old_cols_to_null_query = "UPDATE " + sql_item_name(pipe.target, self.flavor)
        for col, typ in altered_cols_types.items():
            set_old_cols_to_null_query += (
                "\nSET " + sql_item_name(col, self.flavor)
                + ' = NULL,'
            )
        set_old_cols_to_null_query = set_old_cols_to_null_query[:-1]
        queries.append(set_old_cols_to_null_query)

        alter_type_query = "ALTER TABLE " + sql_item_name(pipe.target, self.flavor)
        for col, typ in altered_cols_types.items():
            alter_type_query += (
                "\nMODIFY " + sql_item_name(col, self.flavor) + ' '
                + typ + ','
            )
        alter_type_query = alter_type_query[:-1]
        queries.append(alter_type_query)

        set_old_to_temp_query = "UPDATE " + sql_item_name(pipe.target, self.flavor)
        for col, typ in altered_cols_types.items():
            set_old_to_temp_query += (
                "\nSET " + sql_item_name(col, self.flavor)
                + ' = ' + sql_item_name(col + '_temp', self.flavor) + ','
            )
        set_old_to_temp_query = set_old_to_temp_query[:-1]
        queries.append(set_old_to_temp_query)

        drop_temp_query = "ALTER TABLE " + sql_item_name(pipe.target, self.flavor)
        for col, typ in altered_cols_types.items():
            drop_temp_query += (
                "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor) + ','
            )
        drop_temp_query = drop_temp_query[:-1]
        queries.append(drop_temp_query)

        return queries


    query = "ALTER TABLE " + sql_item_name(pipe.target, self.flavor)
    for col, typ in altered_cols_types.items():
        alter_col_prefix = (
            'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle')
            else 'MODIFY'
        )
        type_prefix = (
            '' if self.flavor in ('mssql', 'mariadb', 'mysql')
            else 'TYPE '
        )
        column_str = 'COLUMN' if self.flavor != 'oracle' else ''
        query += (
            f"\n{alter_col_prefix} {column_str} "
            + sql_item_name(col, self.flavor)
            + " " + type_prefix + typ + ","
        )

    query = query[:-1]
    queries.append(query)
    if self.flavor != 'duckdb':
        return queries

    drop_index_queries = list(flatten_list(
        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
    ))
    create_index_queries = list(flatten_list(
        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
    ))

    return drop_index_queries + queries + create_index_queries
def get_backtrack_data(self, pipe: Optional[Pipe] = None, backtrack_minutes: int = 0, begin: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, limit: Optional[int] = None, chunksize: Optional[int] = -1, debug: bool = False) ‑> Union[pandas.DataFrame, None]

Get the most recent backtrack_minutes' worth of data from a Pipe.

Parameters

pipe : meerschaum.Pipe:
The pipe to get data from.
backtrack_minutes : int, default 0
How far into the past to look for data.
begin : Optional[datetime.datetime], default None
Where to start traversing from. Defaults to None, which uses the get_sync_time() value.
params : Optional[Dict[str, Any]], default None
Additional parameters to filter by. See meerschaum.connectors.sql.build_where.
limit : Optional[int], default None
If specified, limit the number of rows retrieved to this value.
chunksize : Optional[int], default -1
The size of dataframe chunks to load into memory.
debug : bool, default False
Verbosity toggle.

Returns

A pd.DataFrame of backtracked data.

Expand source code
def get_backtrack_data(
        self,
        pipe: Optional[meerschaum.Pipe] = None,
        backtrack_minutes: int = 0,
        begin: Optional[datetime.datetime] = None,
        params: Optional[Dict[str, Any]] = None,
        limit: Optional[int] = None,
        chunksize: Optional[int] = -1,
        debug: bool = False
    ) -> Union[pandas.DataFrame, None]:
    """
    Get the most recent backtrack_minutes' worth of data from a Pipe.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe to get data from.

    backtrack_minutes: int, default 0
        How far into the past to look for data.

    begin: Optional[datetime.datetime], default None
        Where to start traversing from. Defaults to `None`, which uses the
        `meerschaum.Pipe.get_sync_time` value.

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.connectors.sql.build_where`.

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    chunksize: Optional[int], default -1
        The size of dataframe chunks to load into memory.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` of backtracked data.

    """
    import datetime
    from meerschaum.utils.warnings import error
    if pipe is None:
        error("Pipe must be provided.")
    if begin is None:
        begin = pipe.get_sync_time(debug=debug)

    return pipe.get_data(
        begin = begin,
        begin_add_minutes = (-1 * backtrack_minutes),
        order = 'desc',
        params = params,
        limit = limit,
        chunksize = chunksize,
        debug = debug,
    )
def get_create_index_queries(self, pipe: Pipe, debug: bool = False) ‑> Dict[str, List[str]]

Return a dictionary mapping columns to a CREATE INDEX or equivalent query.

Parameters

pipe : Pipe
The pipe to which the queries will correspond.

Returns

A dictionary of column names mapping to lists of queries.

Expand source code
def get_create_index_queries(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to which the queries will correspond.

    Returns
    -------
    A dictionary of column names mapping to lists of queries.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.sql import sql_item_name, get_distinct_col_count
    from meerschaum.utils.warnings import warn
    from meerschaum.config import get_config
    index_queries = {}

    indices = pipe.get_indices()

    _datetime = pipe.get_columns('datetime', error=False)
    _datetime_name = sql_item_name(_datetime, self.flavor) if _datetime is not None else None
    _datetime_index_name = (
        sql_item_name(indices['datetime'], self.flavor) if indices.get('datetime', None)
        else None
    )
    _id = pipe.get_columns('id', error=False)
    _id_name = sql_item_name(_id, self.flavor) if _id is not None else None

    _id_index_name = sql_item_name(indices['id'], self.flavor) if indices.get('id') else None
    _pipe_name = sql_item_name(pipe.target, self.flavor)
    _create_space_partition = get_config('system', 'experimental', 'space')

    ### create datetime index
    if _datetime is not None:
        if self.flavor == 'timescaledb':
            _id_count = (
                get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self)
                if (_id is not None and _create_space_partition) else None
            )
            dt_query = (
                f"SELECT create_hypertable('{_pipe_name}', " +
                f"'{_datetime}', "
                + (
                    f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition)
                    else ''
                ) +
                "migrate_data => true);"
            )
        else: ### mssql, sqlite, etc.
            dt_query = (
                f"CREATE INDEX {_datetime_index_name} "
                + f"ON {_pipe_name} ({_datetime_name})"
            )

        index_queries[_datetime] = [dt_query]

    ### create id index
    if _id_name is not None:
        if self.flavor == 'timescaledb':
            ### Already created indices via create_hypertable.
            id_query = (
                None if (_id is not None and _create_space_partition)
                else (
                    f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})" if _id is not None
                    else None
                )
            )
            pass
        elif self.flavor == 'citus':
            id_query = [(
                f"CREATE INDEX {_id_index_name} "
                + f"ON {_pipe_name} ({_id_name});"
            ), (
                f"SELECT create_distributed_table('{_pipe_name}', '{_id}');"
            )]
        else: ### mssql, sqlite, etc.
            id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})"

        if id_query is not None:
            index_queries[_id] = [id_query]


    ### Create indices for other label in `pipe.columns`.
    other_indices = {
        ix_key: ix_unquoted
        for ix_key, ix_unquoted in pipe.get_indices().items()
        if ix_key not in ('datetime', 'id')
    }
    for ix_key, ix_unquoted in other_indices.items():
        ix_name = sql_item_name(ix_unquoted, self.flavor)
        col = pipe.columns[ix_key]
        col_name = sql_item_name(col, self.flavor)
        index_queries[col] = [f"CREATE INDEX {ix_name} ON {_pipe_name} ({col_name})"]

    return index_queries
def get_drop_index_queries(self, pipe: Pipe, debug: bool = False) ‑> Dict[str, List[str]]

Return a dictionary mapping columns to a DROP INDEX or equivalent query.

Parameters

pipe : Pipe
The pipe to which the queries will correspond.

Returns

A dictionary of column names mapping to lists of queries.

Expand source code
def get_drop_index_queries(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `DROP INDEX` or equivalent query.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to which the queries will correspond.

    Returns
    -------
    A dictionary of column names mapping to lists of queries.
    """
    if not pipe.exists(debug=debug):
        return {}
    from meerschaum.utils.sql import sql_item_name, table_exists, hypertable_queries
    drop_queries = {}
    indices = pipe.get_indices()
    pipe_name = sql_item_name(pipe.target, self.flavor)

    if self.flavor not in hypertable_queries:
        is_hypertable = False
    else:
        is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name)
        is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None

    if is_hypertable:
        nuke_queries = []
        temp_table = '_' + pipe.target + '_temp_migration'
        temp_table_name = sql_item_name(temp_table, self.flavor)

        if table_exists(temp_table, self, debug=debug):
            nuke_queries.append(f"DROP TABLE {temp_table_name}")
        nuke_queries += [
            f"SELECT * INTO {temp_table_name} FROM {pipe_name}",
            f"DROP TABLE {pipe_name}",
            f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name}",
        ]
        nuke_ix_keys = ('datetime', 'id')
        nuked = False
        for ix_key in nuke_ix_keys:
            if ix_key in indices and not nuked:
                drop_queries[ix_key] = nuke_queries
                nuked = True

    drop_queries.update({
        ix_key: ["DROP INDEX " + sql_item_name(ix_unquoted, self.flavor)]
        for ix_key, ix_unquoted in indices.items()
        if ix_key not in drop_queries
    })
    return drop_queries
def get_pipe_attributes(self, pipe: Pipe, debug: bool = False) ‑> Dict[str, Any]

Get a Pipe's attributes dictionary.

Expand source code
def get_pipe_attributes(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Dict[str, Any]:
    """
    Get a Pipe's attributes dictionary.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    pipes = get_tables(mrsm_instance=self, debug=debug)['pipes']

    if pipe.get_id(debug=debug) is None:
        return {}

    try:
        q = sqlalchemy.select([pipes]).where(pipes.c.pipe_id == pipe.id)
        if debug:
            dprint(q)
        attributes = (
            dict(self.exec(q, silent=True, debug=debug).first())
            if self.flavor != 'duckdb'
            else self.read(q, debug=debug).to_dict(orient='records')[0]
        )
    except Exception as e:
        import traceback
        traceback.print_exc()
        warn(e)
        print(pipe)
        return {}

    ### handle non-PostgreSQL databases (text vs JSON)
    if not isinstance(attributes.get('parameters', None), dict):
        try:
            import json
            parameters = json.loads(attributes['parameters'])
            if isinstance(parameters, str) and parameters[0] == '{':
                parameters = json.loads(parameters)
            attributes['parameters'] = parameters
        except Exception as e:
            attributes['parameters'] = {}

    return attributes
def get_pipe_columns_types(self, pipe: Pipe, debug: bool = False) ‑> Optional[Dict[str, str]]

Get the pipe's columns and types.

Parameters

pipe : meerschaum.Pipe:
The pipe to get the columns for.

Returns

A dictionary of columns names (str) and types (str).

Examples

>>> conn.get_pipe_columns_types(pipe)
{
  'dt': 'TIMESTAMP WITHOUT TIMEZONE',
  'id': 'BIGINT',
  'val': 'DOUBLE PRECISION',
}
>>>
Expand source code
def get_pipe_columns_types(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Optional[Dict[str, str]]:
    """
    Get the pipe's columns and types.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe to get the columns for.
        
    Returns
    -------
    A dictionary of columns names (`str`) and types (`str`).

    Examples
    --------
    >>> conn.get_pipe_columns_types(pipe)
    {
      'dt': 'TIMESTAMP WITHOUT TIMEZONE',
      'id': 'BIGINT',
      'val': 'DOUBLE PRECISION',
    }
    >>> 
    """
    if not pipe.exists(debug=debug):
        return {}
    table_columns = {}
    try:
        pipe_table = self.get_pipe_table(pipe, debug=debug)
        for col in pipe_table.columns:
            table_columns[str(col.name)] = str(col.type)
    except Exception as e:
        import traceback
        traceback.print_exc()
        from meerschaum.utils.warnings import warn
        warn(e)
        table_columns = None

    return table_columns
def get_pipe_data(self, pipe: Optional[Pipe] = None, begin: Union[datetime.datetime, str, None] = None, end: Union[datetime.datetime, str, None] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, debug: bool = False, **kw: Any) ‑> Union[pd.DataFrame, None]

Access a pipe's data from the SQL instance.

Parameters

pipe : meerschaum.Pipe:
The pipe to get data from.
begin : Optional[datetime.datetime], default None
If provided, get rows newer than or equal to this value.
end : Optional[datetime.datetime], default None
If provided, get rows older than or equal to this value.
params : Optional[Dict[str, Any]], default None
Additional parameters to filter by. See meerschaum.connectors.sql.build_where.
order : Optional[str], default 'asc'
The selection order for all of the indices in the query. If None, omit the ORDER BY clause.
limit : Optional[int], default None
If specified, limit the number of rows retrieved to this value.
begin_add_minutes : int, default 0
The number of minutes to add to the begin datetime (i.e. DATEADD.
end_add_minutes : int, default 0
The number of minutes to add to the end datetime (i.e. DATEADD.
chunksize : Optional[int], default -1
The size of dataframe chunks to load into memory.
debug : bool, default False
Verbosity toggle.

Returns

A pd.DataFrame of the pipe's data.

Expand source code
def get_pipe_data(
        self,
        pipe: Optional[meerschaum.Pipe] = None,
        begin: Union[datetime.datetime, str, None] = None,
        end: Union[datetime.datetime, str, None] = None,
        params: Optional[Dict[str, Any]] = None,
        order: str = 'asc',
        limit: Optional[int] = None,
        begin_add_minutes: int = 0,
        end_add_minutes: int = 0,
        debug: bool = False,
        **kw: Any
    ) -> Union[pd.DataFrame, None]:
    """
    Access a pipe's data from the SQL instance.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe to get data from.

    begin: Optional[datetime.datetime], default None
        If provided, get rows newer than or equal to this value.

    end: Optional[datetime.datetime], default None
        If provided, get rows older than or equal to this value.

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.connectors.sql.build_where`.

    order: Optional[str], default 'asc'
        The selection order for all of the indices in the query.
        If `None`, omit the `ORDER BY` clause.

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    begin_add_minutes: int, default 0
        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`.

    end_add_minutes: int, default 0
        The number of minutes to add to the `end` datetime (i.e. `DATEADD`.

    chunksize: Optional[int], default -1
        The size of dataframe chunks to load into memory.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` of the pipe's data.

    """
    from meerschaum.utils.sql import sql_item_name
    dtypes = pipe.dtypes
    if dtypes:
        if self.flavor == 'sqlite':
            if not pipe.columns.get('datetime', None):
                _dt = pipe.guess_datetime()
                dt = sql_item_name(_dt, self.flavor) if _dt else None
                is_guess = True
            else:
                _dt = pipe.get_columns('datetime')
                dt = sql_item_name(_dt, self.flavor)
                is_guess = False

            if _dt and 'datetime' not in dtypes.get(_dt, 'object'):
                dtypes[_dt] = 'datetime64[ns]'
    existing_cols = pipe.get_columns_types(debug=debug)
    if existing_cols:
        dtypes = {col: typ for col, typ in dtypes.items() if col in existing_cols}
    if dtypes:
        kw['dtypes'] = dtypes

    query = self.get_pipe_data_query(
        pipe,
        begin = begin,
        end = end,
        params = params,
        order = order,
        limit = limit,
        begin_add_minutes = begin_add_minutes,
        end_add_minutes = end_add_minutes,
        debug = debug,
        **kw
    )
    df = self.read(
        query,
        debug = debug,
        **kw
    )
    if self.flavor == 'sqlite':
        from meerschaum.utils.misc import parse_df_datetimes
        from meerschaum.utils.packages import import_pandas
        pd = import_pandas()
        ### NOTE: We have to consume the iterator here to ensure that datatimes are parsed correctly
        df = parse_df_datetimes(df, debug=debug) if isinstance(df, pd.DataFrame) else (
            [parse_df_datetimes(c, debug=debug) for c in df]
        )
    return df
def get_pipe_data_query(self, pipe: Optional[Pipe] = None, begin: Union[datetime.datetime, str, None] = None, end: Union[datetime.datetime, str, None] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, replace_nulls: Optional[str] = None, debug: bool = False, **kw: Any) ‑> Union[str, None]

Return the SELECT query for retrieving a pipe's data from its instance.

Parameters

pipe : meerschaum.Pipe:
The pipe to get data from.
begin : Optional[datetime.datetime], default None
If provided, get rows newer than or equal to this value.
end : Optional[datetime.datetime], default None
If provided, get rows older than or equal to this value.
params : Optional[Dict[str, Any]], default None
Additional parameters to filter by. See meerschaum.connectors.sql.build_where.
order : Optional[str], default 'asc'
The selection order for all of the indices in the query. If None, omit the ORDER BY clause.
limit : Optional[int], default None
If specified, limit the number of rows retrieved to this value.
begin_add_minutes : int, default 0
The number of minutes to add to the begin datetime (i.e. DATEADD.
end_add_minutes : int, default 0
The number of minutes to add to the end datetime (i.e. DATEADD.
chunksize : Optional[int], default -1
The size of dataframe chunks to load into memory.
replace_nulls : Optional[str], default None
If provided, replace null values with this value.
debug : bool, default False
Verbosity toggle.

Returns

A SELECT query to retrieve a pipe's data.

Expand source code
def get_pipe_data_query(
        self,
        pipe: Optional[meerschaum.Pipe] = None,
        begin: Union[datetime.datetime, str, None] = None,
        end: Union[datetime.datetime, str, None] = None,
        params: Optional[Dict[str, Any]] = None,
        order: str = 'asc',
        limit: Optional[int] = None,
        begin_add_minutes: int = 0,
        end_add_minutes: int = 0,
        replace_nulls: Optional[str] = None,
        debug: bool = False,
        **kw: Any
    ) -> Union[str, None]:
    """
    Return the `SELECT` query for retrieving a pipe's data from its instance.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe to get data from.

    begin: Optional[datetime.datetime], default None
        If provided, get rows newer than or equal to this value.

    end: Optional[datetime.datetime], default None
        If provided, get rows older than or equal to this value.

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.connectors.sql.build_where`.

    order: Optional[str], default 'asc'
        The selection order for all of the indices in the query.
        If `None`, omit the `ORDER BY` clause.

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    begin_add_minutes: int, default 0
        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`.

    end_add_minutes: int, default 0
        The number of minutes to add to the `end` datetime (i.e. `DATEADD`.

    chunksize: Optional[int], default -1
        The size of dataframe chunks to load into memory.

    replace_nulls: Optional[str], default None
        If provided, replace null values with this value.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SELECT` query to retrieve a pipe's data.
    """
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import items_str
    from meerschaum.utils.sql import sql_item_name, dateadd_str
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.warnings import warn
    pd = import_pandas()

    select_cols = "SELECT *"
    if replace_nulls:
        existing_cols = pipe.get_columns_types(debug=debug)
        if existing_cols:
            select_cols = "SELECT "
            for col in existing_cols:
                select_cols += (
                    f"\n    COALESCE({sql_item_name(col, self.flavor)}, "
                    + f"'{replace_nulls}') AS {sql_item_name(col, self.flavor)},"
                )
            select_cols = select_cols[:-1]
    query = f"{select_cols}\nFROM {sql_item_name(pipe.target, self.flavor)}"
    where = ""

    if order is not None:
        default_order = 'asc'
        if order not in ('asc', 'desc'):
            warn(f"Ignoring unsupported order '{order}'. Falling back to '{default_order}'.")
            order = default_order
        order = order.upper()

    existing_cols = pipe.get_columns_types(debug=debug)

    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt = sql_item_name(_dt, self.flavor)
        is_guess = False

    quoted_indices = {
        key: sql_item_name(val, self.flavor)
        for key, val in pipe.columns.items()
        if val in existing_cols
    }

    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )

    is_dt_bound = False
    if begin is not None and _dt in existing_cols:
        begin_da = dateadd_str(
            flavor = self.flavor,
            datepart = 'minute',
            number = begin_add_minutes,
            begin = begin
        )
        where += f"{dt} >= {begin_da}" + (" AND " if end is not None else "")
        is_dt_bound = True

    if end is not None and _dt in existing_cols:
        end_da = dateadd_str(
            flavor = self.flavor,
            datepart = 'minute',
            number = end_add_minutes,
            begin = end
        )
        where += f"{dt} < {end_da}"
        is_dt_bound = True

    if params is not None:
        from meerschaum.utils.sql import build_where
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
        if valid_params:
            where += build_where(valid_params, self).replace(
                'WHERE', ('AND' if is_dt_bound else "")
            )

    if len(where) > 0:
        query += "\nWHERE " + where

    if order is not None:
        ### Sort by indices, starting with datetime.
        order_by = ""
        if quoted_indices:
            order_by += "\nORDER BY "
            if _dt and _dt in existing_cols:
                order_by += dt + ' ' + order + ','
            for key, quoted_col_name in quoted_indices.items():
                if key == 'datetime':
                    continue
                order_by += ' ' + quoted_col_name + ' ' + order + ','
            order_by = order_by[:-1]

        query += order_by

    if isinstance(limit, int):
        if self.flavor == 'mssql':
            query = f'SELECT TOP {limit} ' + query[len("SELECT *"):]
        elif self.flavor == 'oracle':
            query = f"SELECT * FROM (\n  {query}\n)\nWHERE ROWNUM = 1"
        else:
            query += f"\nLIMIT {limit}"
    
    if debug:
        to_print = (
            []
            + ([f"begin='{begin}'"] if begin else [])
            + ([f"end='{end}'"] if end else [])
            + ([f"params='{json.dumps(params)}'"] if params else [])
        )
        dprint("Getting pipe data with constraints: " + items_str(to_print, quotes=False))

    return query
def get_pipe_id(self, pipe: Pipe, debug: bool = False) ‑> int

Get a Pipe's ID from the pipes table.

Expand source code
def get_pipe_id(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> int:
    """
    Get a Pipe's ID from the pipes table.
    """
    from meerschaum.utils.packages import attempt_import
    import json
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    pipes = get_tables(mrsm_instance=self, debug=debug)['pipes']

    query = sqlalchemy.select([pipes.c.pipe_id]).where(
        pipes.c.connector_keys == pipe.connector_keys
    ).where(
        pipes.c.metric_key == pipe.metric_key
    ).where(
        (pipes.c.location_key == pipe.location_key) if pipe.location_key is not None
        else pipes.c.location_key.is_(None)
    )
    _id = self.value(query, debug=debug)
    if _id is not None:
        _id = int(_id)
    return _id
def get_pipe_metadef(self, pipe: Pipe, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, str, None] = '', end: Union[datetime.datetime, str, None] = None, debug: bool = False, **kw: Any) ‑> Union[str, None]

Return a pipe's meta definition fetch query (definition with

params: Optional[Dict[str, Any]], default None Optional params dictionary to build the WHERE clause. See build_where().

begin: Union[datetime.datetime, str, None], default None Most recent datatime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.

end: Union[datetime.datetime, str, None], default None The latest datetime to search for data. If end is None, do not bound

debug: bool, default False Verbosity toggle.

Expand source code
def get_pipe_metadef(
        self,
        pipe: meerschaum.Pipe,
        params: Optional[Dict[str, Any]] = None,
        begin: Union[datetime.datetime, str, None] = '',
        end: Union[datetime.datetime, str, None] = None,
        debug: bool = False,
        **kw: Any
    ) -> Union[str, None]:
    """
    Return a pipe's meta definition fetch query (definition with 

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    begin: Union[datetime.datetime, str, None], default None
        Most recent datatime to search for data.
        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.

    end: Union[datetime.datetime, str, None], default None
        The latest datetime to search for data.
        If `end` is `None`, do not bound 

    debug: bool, default False
        Verbosity toggle.
 
    """
    import datetime
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where
    from meerschaum.config import get_config

    definition = get_pipe_query(pipe)
    btm = get_pipe_backtrack_minutes(pipe) 

    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt_name = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt_name = sql_item_name(_dt, self.flavor)
        is_guess = False

    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"Unable to determine a datetime column for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = '', None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False
                )


    if 'order by' in definition.lower() and 'over' not in definition.lower():
        error("Cannot fetch with an ORDER clause in the definition")

    begin = (
        begin if not (isinstance(begin, str) and begin == '')
        else pipe.get_sync_time(debug=debug)
    )
        
    da = None
    if dt_name:
        ### default: do not backtrack
        begin_da = dateadd_str(
            flavor=self.flavor, datepart='minute', number=(-1 * btm), begin=begin,
        ) if begin else None
        end_da = dateadd_str(
            flavor=self.flavor, datepart='minute', number=1, begin=end,
        ) if end else None

    meta_def = (
        _simple_fetch_query(pipe) if (
            (not (pipe.columns or {}).get('id', None))
            or (not get_config('system', 'experimental', 'join_fetch'))
        ) else _join_fetch_query(pipe, debug=debug, **kw)
    )

    has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):]
    if dt_name and (begin_da or end_da):
        definition_dt_name = dateadd_str(self.flavor, 'minute', 0, f"definition.{dt_name}")
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        if begin_da:
            meta_def += f"{definition_dt_name} >= {begin_da}"
        if begin_da and end_da:
            meta_def += " AND "
        if end_da:
            meta_def += f"{definition_dt_name} < {end_da}"

    if params is not None:
        params_where = build_where(params, self, with_where=False)
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        meta_def += params_where

    return meta_def
def get_pipe_rowcount(self, pipe: Pipe, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, remote: bool = False, params: Optional[Dict[str, Any]] = None, debug: bool = False) ‑> int

Get the rowcount for a pipe in accordance with given parameters.

Parameters

pipe : Pipe
The pipe to query with.
begin : Optional[datetime.datetime], default None
The beginning datetime value.
end : Optional[datetime.datetime], default None
The beginning datetime value.
remote : bool, default False
If True, get the rowcount for the remote table.
params : Optional[Dict[str, Any]], default None
See build_where().
debug : bool, default False
Verbosity toggle.

Returns

An int for the number of rows if the pipe exists, otherwise None.

Expand source code
def get_pipe_rowcount(
        self,
        pipe: meerschaum.Pipe,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        remote: bool = False,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False
    ) -> int:
    """
    Get the rowcount for a pipe in accordance with given parameters.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to query with.
        
    begin: Optional[datetime.datetime], default None
        The beginning datetime value.

    end: Optional[datetime.datetime], default None
        The beginning datetime value.

    remote: bool, default False
        If `True`, get the rowcount for the remote table.

    params: Optional[Dict[str, Any]], default None
        See `meerschaum.utils.sql.build_where`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` for the number of rows if the `pipe` exists, otherwise `None`.

    """
    from meerschaum.utils.sql import dateadd_str, sql_item_name
    from meerschaum.utils.warnings import error, warn
    from meerschaum.connectors.sql._fetch import get_pipe_query
    if remote:
        msg = f"'fetch:definition' must be an attribute of {pipe} to get a remote rowcount."
        if 'fetch' not in pipe.parameters:
            error(msg)
            return None
        if 'definition' not in pipe.parameters['fetch']:
            error(msg)
            return None

    _pipe_name = sql_item_name(pipe.target, self.flavor)

    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt = sql_item_name(_dt, self.flavor)
        is_guess = False

    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )


    _datetime_name = sql_item_name(
        _dt,
        pipe.instance_connector.flavor if not remote else pipe.connector.flavor
    )
    _cols_names = [
        sql_item_name(col, pipe.instance_connector.flavor if not remote else pipe.connector.flavor)
        for col in set(
            ([_dt] if _dt else [])
            + ([] if params is None else list(params.keys()))
        )
    ]
    if not _cols_names:
        _cols_names = ['*']

    src = (
        f"SELECT {', '.join(_cols_names)} FROM {_pipe_name}"
        if not remote else get_pipe_query(pipe)
    )
    query = (
        f"""
        WITH src AS ({src})
        SELECT COUNT(*)
        FROM src
        """
    ) if self.flavor not in ('mysql', 'mariadb') else (
        f"""
        SELECT COUNT(*)
        FROM ({src}) AS src
        """
    )
    if begin is not None or end is not None:
        query += "WHERE"
    if begin is not None:
        query += f"""
        {dt} >= {dateadd_str(self.flavor, datepart='minute', number=0, begin=begin)}
        """
    if end is not None and begin is not None:
        query += "AND"
    if end is not None:
        query += f"""
        {dt} < {dateadd_str(self.flavor, datepart='minute', number=0, begin=end)}
        """
    if params is not None:
        from meerschaum.utils.sql import build_where
        existing_cols = pipe.get_columns_types(debug=debug)
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
        if valid_params:
            query += build_where(valid_params, self).replace('WHERE', (
                'AND' if (begin is not None or end is not None)
                    else 'WHERE'
                )
            )
        
    result = self.value(query, debug=debug)
    try:
        return int(result)
    except Exception as e:
        return None
def get_pipe_table(self, pipe: Pipe, debug: bool = False) ‑> sqlalchemy.Table

Return the sqlalchemy.Table object for a Pipe.

Parameters

pipe : meerschaum.Pipe:
The pipe in question.

Returns

A sqlalchemy.Table object.

Expand source code
def get_pipe_table(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> sqlalchemy.Table:
    """
    Return the `sqlalchemy.Table` object for a `meerschaum.Pipe`.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe in question.
        

    Returns
    -------
    A `sqlalchemy.Table` object. 

    """
    from meerschaum.utils.sql import get_sqlalchemy_table
    if not pipe.exists(debug=debug):
        return None
    return get_sqlalchemy_table(pipe.target, connector=self, debug=debug, refresh=True)
def get_plugin_attributes(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Dict[str, Any]

Return the attributes of a plugin.

Expand source code
def get_plugin_attributes(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Dict[str, Any]:
    """
    Return the attributes of a plugin.
    """
    ### ensure plugins table exists
    from meerschaum.connectors.sql.tables import get_tables
    import json
    plugins = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    query = sqlalchemy.select([plugins.c.attributes]).where(plugins.c.plugin_name == plugin.name)

    _attr = self.value(query, debug=debug)
    if isinstance(_attr, str):
        _attr = json.loads(_attr)
    elif _attr is None:
        _attr = {}
    return _attr
def get_plugin_id(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Union[int, NoneType]

Return a plugin's ID.

Expand source code
def get_plugin_id(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[int]:
    """
    Return a plugin's ID.
    """
    ### ensure plugins table exists
    from meerschaum.connectors.sql.tables import get_tables
    plugins = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    query = sqlalchemy.select([plugins.c.plugin_id]).where(plugins.c.plugin_name == plugin.name)
    
    try:
        return int(self.value(query, debug=debug))
    except Exception as e:
        return None
def get_plugin_user_id(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Union[int, NoneType]

Return a plugin's user ID.

Expand source code
def get_plugin_user_id(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[int]:
    """
    Return a plugin's user ID.
    """
    ### ensure plugins table exists
    from meerschaum.connectors.sql.tables import get_tables
    plugins = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    query = sqlalchemy.select([plugins.c.user_id]).where(plugins.c.plugin_name == plugin.name)

    try:
        return int(self.value(query, debug=debug))
    except Exception as e:
        return None
def get_plugin_username(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Union[str, NoneType]

Return the username of a plugin's owner.

Expand source code
def get_plugin_username(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[str]:
    """
    Return the username of a plugin's owner.
    """
    ### ensure plugins table exists
    from meerschaum.connectors.sql.tables import get_tables
    plugins = get_tables(mrsm_instance=self, debug=debug)['plugins']
    users = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    query = (
        sqlalchemy.select([users.c.username]).
        where(
            users.c.user_id == plugins.c.user_id
            and plugins.c.plugin_name == plugin.name
        )
    )

    return self.value(query, debug=debug)
def get_plugin_version(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Union[str, NoneType]

Return a plugin's version.

Expand source code
def get_plugin_version(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[str]:
    """
    Return a plugin's version.
    """
    ### ensure plugins table exists
    from meerschaum.connectors.sql.tables import get_tables
    plugins = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    query = sqlalchemy.select([plugins.c.version]).where(plugins.c.plugin_name == plugin.name)

    return self.value(query, debug=debug)
def get_plugins(self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False, **kw: Any) ‑> List[str]

Return a list of all registered plugins.

Parameters

user_id : Optional[int], default None
If specified, filter plugins by a specific user_id.
search_term : Optional[str], default None
If specified, add a WHERE plugin_name LIKE '{search_term}%' clause to filter the plugins.

Returns

A list of plugin names.

Expand source code
def get_plugins(
        self,
        user_id: Optional[int] = None,
        search_term: Optional[str] = None,
        debug: bool = False,
        **kw: Any
    ) -> List[str]:
    """
    Return a list of all registered plugins.

    Parameters
    ----------
    user_id: Optional[int], default None
        If specified, filter plugins by a specific `user_id`.

    search_term: Optional[str], default None
        If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins.


    Returns
    -------
    A list of plugin names.
    """
    ### ensure plugins table exists
    from meerschaum.connectors.sql.tables import get_tables
    plugins = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    query = sqlalchemy.select([plugins.c.plugin_name])
    if user_id is not None:
        query = query.where(plugins.c.user_id == user_id)
    if search_term is not None:
        query = query.where(plugins.c.plugin_name.like(search_term + '%'))

    return [row['plugin_name'] for row in self.engine.execute(query).fetchall()]
def get_sync_time(self, pipe: "'Pipe'", params: Optional[Dict[str, Any]] = None, newest: bool = True, round_down: bool = True, debug: bool = False) ‑> datetime.datetime

Get a Pipe's most recent datetime.

Parameters

pipe : Pipe
The pipe to get the sync time for.
params : Optional[Dict[str, Any]], default None
Optional params dictionary to build the WHERE clause. See build_where().
newest : bool, default True
If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
round_down : bool, default True
If True, round the resulting datetime value down to the nearest minute. Defaults to True.

Returns

A datetime.datetime object if the pipe exists, otherwise None.

Expand source code
def get_sync_time(
        self,
        pipe: 'meerschaum.Pipe',
        params: Optional[Dict[str, Any]] = None,
        newest: bool = True,
        round_down: bool = True,
        debug: bool = False,
    ) -> 'datetime.datetime':
    """Get a Pipe's most recent datetime.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to get the sync time for.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (ASC instead of DESC).

    round_down: bool, default True
        If `True`, round the resulting datetime value down to the nearest minute.
        Defaults to `True`.

    Returns
    -------
    A `datetime.datetime` object if the pipe exists, otherwise `None`.
    """
    from meerschaum.utils.sql import sql_item_name, build_where
    from meerschaum.utils.warnings import warn
    import datetime
    table = sql_item_name(pipe.target, self.flavor)

    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt = sql_item_name(_dt, self.flavor)
        is_guess = False

    if _dt is None:
        return None

    ASC_or_DESC = "DESC" if newest else "ASC"
    existing_cols = pipe.get_columns_types(debug=debug)
    valid_params = {}
    if params is not None:
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
    where = "" if not valid_params else build_where(valid_params, self)
    q = f"SELECT {dt}\nFROM {table}{where}\nORDER BY {dt} {ASC_or_DESC}\nLIMIT 1"
    if self.flavor == 'mssql':
        q = f"SELECT TOP 1 {dt}\nFROM {table}{where}\nORDER BY {dt} {ASC_or_DESC}"
    elif self.flavor == 'oracle':
        q = (
            "SELECT * FROM (\n"
            + f"    SELECT {dt}\nFROM {table}{where}\n    ORDER BY {dt} {ASC_or_DESC}\n"
            + ") WHERE ROWNUM = 1"
        )

    try:
        from meerschaum.utils.misc import round_time
        import datetime
        db_time = self.value(q, silent=True, debug=debug)

        ### No datetime could be found.
        if db_time is None:
            return None
        ### sqlite returns str.
        if isinstance(db_time, str):
            from meerschaum.utils.packages import attempt_import
            dateutil_parser = attempt_import('dateutil.parser')
            st = dateutil_parser.parse(db_time)
        ### Do nothing if a datetime object is returned.
        elif isinstance(db_time, datetime.datetime):
            if hasattr(db_time, 'to_pydatetime'):
                st = db_time.to_pydatetime()
            else:
                st = db_time
        ### Sometimes the datetime is actually a date.
        elif isinstance(db_time, datetime.date):
            st = datetime.datetime.combine(db_time, datetime.datetime.min.time())
        ### Convert pandas timestamp to Python datetime.
        else:
            st = db_time.to_pydatetime()

        ### round down to smooth timestamp
        sync_time = (
            round_time(st, date_delta=datetime.timedelta(minutes=1), to='down')
            if round_down else st
        )

    except Exception as e:
        sync_time = None
        warn(str(e))

    return sync_time
def get_user_attributes(self, user: meerschaum.core.User, debug: bool = False) ‑> Union[Dict[str, Any], None]

Return the user's attributes.

Expand source code
def get_user_attributes(
        self,
        user: meerschaum.core.User,
        debug: bool = False
    ) -> Union[Dict[str, Any], None]:
    """
    Return the user's attributes.
    """
    ### ensure users table exists
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    users = get_tables(mrsm_instance=self, debug=debug)['users']

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)

    query = (
        sqlalchemy.select([users.c.attributes]).
        where(users.c.user_id == user_id)
    )

    result = self.value(query, debug=debug)
    if result is not None and not isinstance(result, dict):
        try:
            result = dict(result)
            _parsed = True
        except Exception as e:
            _parsed = False
        if not _parsed:
            try:
                import json
                result = json.loads(result)
                _parsed = True
            except Exception as e:
                _parsed = False
        if not _parsed:
            warn(f"Received unexpected type for attributes: {result}")
    return result
def get_user_id(self, user: meerschaum.core.User, debug: bool = False) ‑> Optional[int]

If a user is registered, return the user_id.

Expand source code
def get_user_id(
        self,
        user: meerschaum.core.User,
        debug : bool = False
    ) -> Optional[int]:
    """If a user is registered, return the `user_id`."""
    ### ensure users table exists
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    users = get_tables(mrsm_instance=self, debug=debug)['users']

    query = (
        sqlalchemy.select([users.c.user_id]).
        where(users.c.username == user.username)
    )

    result = self.value(query, debug=debug)
    if result is not None:
        return int(result)
    return None
def get_user_password_hash(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> Optional[str]

Return the password has for a user. NOTE: This may be dangerous and is only allowed if the security settings explicity allow it.

Expand source code
def get_user_password_hash(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """
    Return the password has for a user.
    **NOTE**: This may be dangerous and is only allowed if the security settings explicity allow it.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.connectors.sql.tables import get_tables
    users = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    if user.user_id is not None:
        user_id = user.user_id
        if debug:
            dprint(f"Already given user_id: {user_id}")
    else:
        if debug:
            dprint(f"Fetching user_id...")
        user_id = self.get_user_id(user, debug=debug)

    if user_id is None:
        return None

    query = sqlalchemy.select([users.c.password_hash]).where(users.c.user_id == user_id)

    return self.value(query, debug=debug)
def get_user_type(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> Optional[str]

Return the user's type.

Expand source code
def get_user_type(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """
    Return the user's type.
    """
    from meerschaum.connectors.sql.tables import get_tables
    users = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)

    if user_id is None:
        return None

    query = sqlalchemy.select([users.c.user_type]).where(users.c.user_id == user_id)

    return self.value(query, debug=debug)
def get_users(self, debug: bool = False, **kw: Any) ‑> List[str]

Parameters

debug : bool :
(Default value = False)
**kw : Any :
 

Returns

type
 
Expand source code
def get_users(
        self,
        debug : bool = False,
        **kw : Any
    ) -> List[str]:
    """

    Parameters
    ----------
    debug : bool :
         (Default value = False)
    **kw : Any :
        

    Returns
    -------
    type
        

    """
    ### ensure users table exists
    from meerschaum.connectors.sql.tables import get_tables
    users = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    query = sqlalchemy.select([users.c.username])

    return list(self.read(query, debug=debug)['username'])
def pipe_exists(self, pipe: Pipe, debug: bool = False) ‑> bool

Check that a Pipe's table exists.

Parameters

pipe : meerschaum.Pipe:
The pipe to check.
debug : bool:, default False
Verbosity toggle.

Returns

A bool corresponding to whether a pipe's table exists.

Expand source code
def pipe_exists(
        self,
        pipe : meerschaum.Pipe,
        debug : bool = False
    ) -> bool:
    """
    Check that a Pipe's table exists.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe to check.
        
    debug: bool:, default False
        Verbosity toggle.

    Returns
    -------
    A `bool` corresponding to whether a pipe's table exists.

    """
    from meerschaum.utils.sql import table_exists
    exists = table_exists(pipe.target, self, debug=debug)
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"{pipe} " + ('exists.' if exists else 'does not exist.'))
    return exists
def read(self, query_or_table: Union[str, sqlalchemy.Query], params: Optional[Dict[str, Any], List[str]] = None, dtype: Optional[Dict[str, Any]] = None, chunksize: Optional[int] = -1, chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None, as_hook_results: bool = False, chunks: Optional[int] = None, as_chunks: bool = False, as_iterator: bool = False, silent: bool = False, debug: bool = False, **kw: Any) ‑> Union[pandas.DataFrame, List[pandas.DataFrame], List[Any], None]

Read a SQL query or table into a pandas dataframe.

Parameters

query_or_table : Union[str, sqlalchemy.Query]
The SQL query (sqlalchemy Query or string) or name of the table from which to select.
params : Optional[Dict[str, Any]], default None
List or Dict of parameters to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
dtype : Optional[Dict[str, Any]], default None
A dictionary of data types to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
chunksize : Optional[int], default -1

How many chunks to read at a time. None will read everything in one large chunk. Defaults to system configuration.

NOTE: DuckDB does not allow for chunking.

chunk_hook : Optional[Callable[[pandas.DataFrame], Any]], default None
Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. See --sync-chunks for an example. NOTE: as_iterator MUST be False (default).
as_hook_results : bool, default False

If True, return a List of the outputs of the hook function. Only applicable if chunk_hook is not None.

NOTE: as_iterator MUST be False (default).

chunks : Optional[int], default None
Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and return into a single dataframe. For example, to limit the returned dataframe to 100,000 rows, you could specify a chunksize of 1000 and chunks of 100.
as_chunks : bool, default False
If True, return a list of DataFrames. Otherwise return a single DataFrame. Defaults to False.
as_iterator : bool, default False
If True, return the pandas DataFrame iterator. chunksize must not be None (falls back to 1000 if so), and hooks are not called in this case. Defaults to False.
silent : bool, default False
If True, don't raise warnings in case of errors. Defaults to False.

Returns

A pd.DataFrame (default case), or an iterator, or a list of dataframes / iterators, or None if something breaks.

Expand source code
def read(
        self,
        query_or_table: Union[str, sqlalchemy.Query],
        params: Optional[Dict[str, Any], List[str]] = None,
        dtype: Optional[Dict[str, Any]] = None,
        chunksize: Optional[int] = -1,
        chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
        as_hook_results: bool = False,
        chunks: Optional[int] = None,
        as_chunks: bool = False,
        as_iterator: bool = False,
        silent: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Union[
        pandas.DataFrame,
        List[pandas.DataFrame],
        List[Any],
        None,
    ]:
    """
    Read a SQL query or table into a pandas dataframe.

    Parameters
    ----------
    query_or_table: Union[str, sqlalchemy.Query]
        The SQL query (sqlalchemy Query or string) or name of the table from which to select.

    params: Optional[Dict[str, Any]], default None
        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html

    dtype: Optional[Dict[str, Any]], default None
        A dictionary of data types to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html

    chunksize: Optional[int], default -1
        How many chunks to read at a time. `None` will read everything in one large chunk.
        Defaults to system configuration.

        **NOTE:** DuckDB does not allow for chunking.

    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
        See `--sync-chunks` for an example.
        **NOTE:** `as_iterator` MUST be False (default).

    as_hook_results: bool, default False
        If `True`, return a `List` of the outputs of the hook function.
        Only applicable if `chunk_hook` is not None.

        **NOTE:** `as_iterator` MUST be `False` (default).

    chunks: Optional[int], default None
        Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
        return into a single dataframe.
        For example, to limit the returned dataframe to 100,000 rows,
        you could specify a `chunksize` of `1000` and `chunks` of `100`.

    as_chunks: bool, default False
        If `True`, return a list of DataFrames. Otherwise return a single DataFrame.
        Defaults to `False`.

    as_iterator: bool, default False
        If `True`, return the pandas DataFrame iterator.
        `chunksize` must not be `None` (falls back to 1000 if so),
        and hooks are not called in this case.
        Defaults to `False`.

    silent: bool, default False
        If `True`, don't raise warnings in case of errors.
        Defaults to `False`.

    Returns
    -------
    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
    or `None` if something breaks.

    """
    if chunks is not None and chunks <= 0:
        return []
    from meerschaum.utils.sql import sql_item_name
    from meerschaum.utils.packages import attempt_import, import_pandas
    import warnings
    pd = import_pandas()
    sqlalchemy = attempt_import("sqlalchemy")
    default_chunksize = self.sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is None and as_iterator:
        if not silent and self.flavor not in _disallow_chunks_flavors:
            warn(
                f"An iterator may only be generated if chunksize is not None.\n"
                + "Falling back to a chunksize of 1000.", stacklevel=3,
            )
        chunksize = 1000
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel = 3,
                )
            chunksize = _max_chunks_flavors[self.flavor]

    ### NOTE: A bug in duckdb_engine does not allow for chunks.
    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
        chunksize = None

    if debug:
        import time
        start = time.perf_counter()
        dprint(query_or_table)
        dprint(f"Fetching with chunksize: {chunksize}")

    ### This might be sqlalchemy object or the string of a table name.
    ### We check for spaces and quotes to see if it might be a weird table.
    if (
        ' ' not in str(query_or_table)
        or (
            ' ' in str(query_or_table)
            and str(query_or_table).startswith('"')
            and str(query_or_table).endswith('"')
        )
    ):
        query_or_table = sql_item_name(str(query_or_table), self.flavor)
        if debug:
            dprint(f"Reading from table {query_or_table}")
        formatted_query = str(sqlalchemy.text("SELECT * FROM " + str(query_or_table)))
    else:
        try:
            formatted_query = str(sqlalchemy.text(query_or_table))
        except Exception as e:
            formatted_query = query_or_table

    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            chunk_generator = pd.read_sql_query(
                formatted_query,
                self.engine,
                params = params,
                chunksize = chunksize,
                dtype = dtype,
            )
    except Exception as e:
        import inspect
        if debug:
            dprint(f"Failed to execute query:\n\n{query_or_table}\n\n")
        if not silent:
            warn(str(e), stacklevel=3)

        return None

    chunk_list = []
    read_chunks = 0
    chunk_hook_results = []
    if chunksize is None:
        chunk_list.append(chunk_generator)
    elif as_iterator:
        return chunk_generator
    else:
        for chunk in chunk_generator:
            if chunk_hook is not None:
                chunk_hook_results.append(
                    chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                )
            chunk_list.append(chunk)
            read_chunks += 1
            if chunks is not None and read_chunks >= chunks:
                break

    ### If no chunks returned, read without chunks
    ### to get columns
    if len(chunk_list) == 0:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            chunk_list.append(
                pd.read_sql_query(
                    formatted_query,
                    self.engine,
                    params = params, 
                    dtype = dtype,
                )
            )

    ### call the hook on any missed chunks.
    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
        for c in chunk_list[len(chunk_hook_results):]:
            chunk_hook_results.append(
                chunk_hook(c, chunksize=chunksize, debug=debug, **kw)
            )

    ### chunksize is not None so must iterate
    if debug:
        end = time.perf_counter()
        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")

    if as_hook_results:
        return chunk_hook_results
    
    ### Skip `pd.concat()` if `as_chunks` is specified.
    if as_chunks:
        for c in chunk_list:
            c.reset_index(drop=True, inplace=True)
        return chunk_list

    return pd.concat(chunk_list).reset_index(drop=True)
def register_pipe(self, pipe: Pipe, debug: bool = False) ‑> SuccessTuple

Register a new pipe. A pipe's attributes must be set before registering.

Expand source code
def register_pipe(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> SuccessTuple:
    """
    Register a new pipe.
    A pipe's attributes must be set before registering.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors

    ### ensure pipes table exists
    from meerschaum.connectors.sql.tables import get_tables
    pipes = get_tables(mrsm_instance=self, debug=debug)['pipes']

    if pipe.get_id(debug=debug) is not None:
        return False, f"{pipe} is already registered."

    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ###       then `pipe.parameters` will exist and not be fetched from the database.

    ### 1. Prioritize the Pipe object's `parameters` first.
    ###    E.g. if the user manually sets the `parameters` property
    ###    or if the Pipe already exists
    ###    (which shouldn't be able to be registered anyway but that's an issue for later).
    parameters = None
    try:
        parameters = pipe.parameters
    except Exception as e:
        if debug:
            dprint(str(e))
        parameters = None

    ### ensure `parameters` is a dictionary
    if parameters is None:
        parameters = {}

    import json
    sqlalchemy = attempt_import('sqlalchemy')
    values = {
        'connector_keys' : pipe.connector_keys,
        'metric_key'     : pipe.metric_key,
        'location_key'   : pipe.location_key,
        'parameters'     : (
            json.dumps(parameters) if self.flavor not in json_flavors else parameters
        ),
    }
    query = sqlalchemy.insert(pipes).values(**values)
    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to register {pipe}."
    return True, f"Successfully registered {pipe}."
def register_plugin(self, plugin: "'meerschaum.core.Plugin'", force: bool = False, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]

Register a new plugin to the plugins table.

Expand source code
def register_plugin(
        self,
        plugin: 'meerschaum.core.Plugin',
        force: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Register a new plugin to the plugins table."""
    from meerschaum.utils.warnings import warn, error
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.utils.sql import json_flavors
    from meerschaum.connectors.sql.tables import get_tables
    plugins = get_tables(mrsm_instance=self, debug=debug)['plugins']

    old_id = self.get_plugin_id(plugin, debug=debug)

    ### Check for version conflict. May be overridden with `--force`.
    if old_id is not None and not force:
        old_version = self.get_plugin_version(plugin, debug=debug)
        new_version = plugin.version
        if old_version is None:
            old_version = ''
        if new_version is None:
            new_version = ''

        ### verify that the new version is greater than the old
        packaging_version = attempt_import('packaging.version')
        if (
            old_version and new_version
            and packaging_version.parse(old_version) >= packaging_version.parse(new_version)
        ):
            return False, (
                f"Version '{new_version}' of plugin '{plugin}' " +
                f"must be greater than existing version '{old_version}'."
            )

    import json
    bind_variables = {
        'plugin_name' : plugin.name,
        'version' : plugin.version,
        'attributes' : (
            json.dumps(plugin.attributes) if self.flavor not in json_flavors else plugin.attributes
        ),
        'user_id' : plugin.user_id,
    }

    if old_id is None:
        query = sqlalchemy.insert(plugins).values(**bind_variables)
    else:
        query = (
            sqlalchemy.update(plugins).
            values(**bind_variables).
            where(plugins.c.plugin_id == old_id)
        )

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to register plugin '{plugin}'."
    return True, f"Successfully registered plugin '{plugin}'."
def register_user(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> SuccessTuple

Register a new user.

Expand source code
def register_user(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Register a new user."""
    from meerschaum.utils.warnings import warn, error, info
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    sqlalchemy = attempt_import('sqlalchemy')

    valid_tuple = valid_username(user.username)
    if not valid_tuple[0]:
        return valid_tuple

    old_id = self.get_user_id(user, debug=debug)

    if old_id is not None:
        return False, f"User '{user}' already exists."

    ### ensure users table exists
    from meerschaum.connectors.sql.tables import get_tables
    tables = get_tables(mrsm_instance=self, debug=debug)

    import json
    bind_variables = {
        'username' : user.username,
        'email' : user.email,
        'password_hash' : user.password_hash,
        'user_type' : user.type,
        'attributes' : (
            json.dumps(user.attributes) if self.flavor not in json_flavors else user.attributes
        ),
    }
    if old_id is not None:
        return False, f"User '{user.username}' already exists."
    if old_id is None:
        query = (
            sqlalchemy.insert(tables['users']).
            values(**bind_variables)
        )

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to register user '{user}'."
    return True, f"Successfully registered user '{user}'."
def sync_pipe(self, pipe: Pipe, df: Union[pandas.DataFrame, str, Dict[Any, Any], None] = None, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, chunksize: Optional[int] = -1, check_existing: bool = True, blocking: bool = True, debug: bool = False, **kw: Any) ‑> SuccessTuple

Sync a pipe using a database connection.

Parameters

pipe : Pipe
The Meerschaum Pipe instance into which to sync the data.
df : Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]
An optional DataFrame or equivalent to sync into the pipe. Defaults to None.
begin : Optional[datetime.datetime], default None
Optionally specify the earliest datetime to search for data. Defaults to None.
end : Optional[datetime.datetime], default None
Optionally specify the latest datetime to search for data. Defaults to None.
chunksize : Optional[int], default -1
Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction. Defaults to -1.
check_existing : bool, default True
If True, pull and diff with existing data from the pipe. Defaults to True.
blocking : bool, default True
If True, wait for sync to finish and return its result, otherwise asyncronously sync. Defaults to True.
debug : bool, default False
Verbosity toggle. Defaults to False.
kw : Any
Catch-all for keyword arguments.

Returns

A SuccessTuple of success (bool) and message (str).

Expand source code
def sync_pipe(
        self,
        pipe: meerschaum.Pipe,
        df: Union[pandas.DataFrame, str, Dict[Any, Any], None] = None,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        chunksize: Optional[int] = -1,
        check_existing: bool = True,
        blocking: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Sync a pipe using a database connection.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The Meerschaum Pipe instance into which to sync the data.

    df: Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]
        An optional DataFrame or equivalent to sync into the pipe.
        Defaults to `None`.

    begin: Optional[datetime.datetime], default None
        Optionally specify the earliest datetime to search for data.
        Defaults to `None`.

    end: Optional[datetime.datetime], default None
        Optionally specify the latest datetime to search for data.
        Defaults to `None`.

    chunksize: Optional[int], default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.
        Defaults to `-1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe. Defaults to `True`.

    blocking: bool, default True
        If `True`, wait for sync to finish and return its result, otherwise asyncronously sync.
        Defaults to `True`.

    debug: bool, default False
        Verbosity toggle. Defaults to False.

    kw: Any
        Catch-all for keyword arguments.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.sql import get_update_queries, sql_item_name
    from meerschaum.utils.misc import generate_password
    from meerschaum import Pipe
    import time
    import copy
    pd = import_pandas()
    if df is None:
        msg = f"DataFrame is None. Cannot sync {pipe}."
        warn(msg)
        return False, msg

    start = time.perf_counter()

    ### if Pipe is not registered
    if not pipe.get_id(debug=debug):
        register_tuple = pipe.register(debug=debug)
        if not register_tuple[0]:
            return register_tuple

    ### quit here if implicitly syncing MQTT pipes.
    ### (pipe.sync() is called in the callback of the MQTTConnector.fetch() method)
    if df is None and pipe.connector.type == 'mqtt':
        return True, "Success"

    ### df is the dataframe returned from the remote source
    ### via the connector
    if debug:
        dprint("Fetched data:\n" + str(df))

    if not isinstance(df, pd.DataFrame):
        df = pipe.enforce_dtypes(df, debug=debug)

    ### if table does not exist, create it with indices
    is_new = False
    add_cols_query = None
    if not pipe.exists(debug=debug):
        check_existing = False
        is_new = True
    else:
        ### Check for new columns.
        add_cols_queries = self.get_add_columns_queries(pipe, df, debug=debug)
        if add_cols_queries:
            if not self.exec_queries(add_cols_queries, debug=debug):
                warn(f"Failed to add new columns to {pipe}.")

        alter_cols_queries = self.get_alter_columns_queries(pipe, df, debug=debug)
        if alter_cols_queries:
            if not self.exec_queries(alter_cols_queries, debug=debug):
                warn(f"Failed to alter columns for {pipe}.")


    unseen_df, update_df, delta_df = (
        pipe.filter_existing(
            df,
            chunksize = chunksize,
            begin = begin,
            end = end,
            debug = debug,
            **kw
        ) if check_existing else (df, None, df)
    )
    if debug:
        dprint("Delta data:\n" + str(delta_df))
        dprint("Unseen data:\n" + str(unseen_df))
        if update_df is not None:
            dprint("Update data:\n" + str(update_df))

    if update_df is not None and not update_df.empty:
        transact_id = generate_password(6)
        temp_target = '_' + transact_id + '_' + pipe.target
        update_kw = copy.deepcopy(kw)
        update_kw.update({
            'name': temp_target,
            'if_exists': 'append',
            'chunksize': chunksize,
            'debug': debug,
        })
        self.to_sql(update_df, **update_kw)
        temp_pipe = Pipe(
            pipe.connector_keys + '_', pipe.metric_key, pipe.location_key,
            instance = pipe.instance_keys,
            columns = pipe.columns,
            target = temp_target,
        )

        existing_cols = pipe.get_columns_types(debug=debug)
        join_cols = [
            col for col_key, col in pipe.columns.items()
            if col and col_key != 'value' and col in existing_cols
        ]

        queries = get_update_queries(
            pipe.target,
            temp_target,
            self,
            join_cols,
            debug = debug
        )
        success = all(self.exec_queries(queries, break_on_error=True, debug=debug))
        drop_success, drop_msg = temp_pipe.drop(debug=debug)
        if not drop_success:
            warn(drop_msg)
        if not success:
            return False, f"Failed to apply update to {pipe}."

    if_exists = kw.get('if_exists', 'append')
    if 'if_exists' in kw:
        kw.pop('if_exists')
    if 'name' in kw:
        kw.pop('name')

    ### Insert new data into Pipe's table.
    unseen_kw = copy.deepcopy(kw)
    unseen_kw.update({
        'name': pipe.target,
        'if_exists': if_exists,
        'debug': debug,
        'as_dict': True,
        'chunksize': chunksize,
    })
    stats = self.to_sql(unseen_df, **unseen_kw)
    if is_new:
        if not self.create_indices(pipe, debug=debug):
            if debug:
                dprint(f"Failed to create indices for {pipe}. Continuing...")

    end = time.perf_counter()
    success = stats['success']
    if not success:
        return success, stats['msg']
    msg = (
        f"Inserted {len(unseen_df)}, "
        + f"updated {len(update_df) if update_df is not None else 0} rows."
    )
    if debug:
        msg = msg[:-1] + (
            f"\non table {sql_item_name(pipe.target, self.flavor)}\n"
            + f"in {round(end-start, 2)} seconds."
        )
    return success, msg
def sync_pipe_inplace(self, pipe: "'Pipe'", params: Optional[Dict[str, Any]] = None, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, chunksize: Optional[int] = -1, check_existing: bool = True, debug: bool = False, **kw: Any) ‑> SuccessTuple

If a pipe's connector is the same as its instance connector, it's more efficient to sync the pipe in-place rather than reading data into Pandas.

Parameters

pipe : Pipe
The pipe whose connector is the same as its instance.
params : Optional[Dict[str, Any]], default None
Optional params dictionary to build the WHERE clause. See build_where().
begin : Optional[datetime.datetime], default None
Optionally specify the earliest datetime to search for data. Defaults to None.
end : Optional[datetime.datetime], default None
Optionally specify the latest datetime to search for data. Defaults to None.
chunksize : Optional[int], default -1
Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction. Defaults to -1.
check_existing : bool, default True
If True, pull and diff with existing data from the pipe. Defaults to True.
debug : bool, default False
Verbosity toggle.

Returns

A SuccessTuple.

Expand source code
def sync_pipe_inplace(
        self,
        pipe: 'meerschaum.Pipe',
        params: Optional[Dict[str, Any]] = None,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        chunksize: Optional[int] = -1,
        check_existing: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    If a pipe's connector is the same as its instance connector,
    it's more efficient to sync the pipe in-place rather than reading data into Pandas.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose connector is the same as its instance.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    begin: Optional[datetime.datetime], default None
        Optionally specify the earliest datetime to search for data.
        Defaults to `None`.

    end: Optional[datetime.datetime], default None
        Optionally specify the latest datetime to search for data.
        Defaults to `None`.

    chunksize: Optional[int], default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.
        Defaults to `-1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe. Defaults to `True`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A SuccessTuple.
    """
    from meerschaum.utils.sql import (
        sql_item_name, table_exists, get_sqlalchemy_table, get_pd_type,
        get_update_queries, get_null_replacement,
    )
    from meerschaum.utils.misc import generate_password
    from meerschaum.utils.debug import dprint
    metadef = self.get_pipe_metadef(
        pipe,
        params = params,
        begin = begin,
        end = end,
        debug = debug,
    )
    if self.flavor in ('mssql',):
        final_select_ix = metadef.lower().rfind('select')
        def_name = metadef[len('WITH '):].split(' ', maxsplit=1)[0]
        metadef = (
            metadef[:final_select_ix].rstrip() + ',\n'
            + "metadef AS (\n"
            + metadef[final_select_ix:]
            + "\n)\n"
        )

    pipe_name = sql_item_name(pipe.target, self.flavor)
    if not pipe.exists(debug=debug):
        if self.flavor in ('mssql',):
            create_pipe_query = metadef + f"SELECT *\nINTO {pipe_name}\nFROM metadef"
        elif self.flavor in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb'):
            create_pipe_query = (
                f"CREATE TABLE {pipe_name} AS\n"
                + f"SELECT *\nFROM ({metadef})"
                + (" AS metadef" if self.flavor in ('mysql', 'mariadb') else '')
            )
        else:
            create_pipe_query = f"SELECT *\nINTO {pipe_name}\nFROM ({metadef}) AS metadef"
        result = self.exec(create_pipe_query, debug=debug)
        if result is None:
            return False, f"Could not insert new data into {pipe} from its SQL query definition."
        if not self.create_indices(pipe, debug=debug):
            if debug:
                dprint(f"Failed to create indices for {pipe}. Continuing...")

        rowcount = pipe.get_rowcount(debug=debug)
        return True, f"Inserted {rowcount}, updated 0 rows."

    ### Generate names for the tables.
    transact_id = generate_password(6)
    def get_temp_table_name(label: str) -> str:
        return '_' + transact_id + '_' + label + '_' + pipe.target

    backtrack_table_raw = get_temp_table_name('backtrack')
    backtrack_table_name = sql_item_name(backtrack_table_raw, self.flavor)
    new_table_raw = get_temp_table_name('new')
    new_table_name = sql_item_name(new_table_raw, self.flavor)
    delta_table_raw = get_temp_table_name('delta')
    delta_table_name = sql_item_name(delta_table_raw, self.flavor)
    joined_table_raw = get_temp_table_name('joined')
    joined_table_name = sql_item_name(joined_table_raw, self.flavor)
    unseen_table_raw = get_temp_table_name('unseen')
    unseen_table_name = sql_item_name(unseen_table_raw, self.flavor)
    update_table_raw = get_temp_table_name('update')
    update_table_name = sql_item_name(update_table_raw, self.flavor)

    new_queries = []
    drop_new_query = f"DROP TABLE {new_table_name}"
    if table_exists(new_table_raw, self, debug=debug):
        new_queries.append(drop_new_query)

    if self.flavor in ('mssql',):
        create_new_query = metadef + f"SELECT *\nINTO {new_table_name}\nFROM metadef"
    elif self.flavor in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb'):
        create_new_query = (
            f"CREATE TABLE {new_table_name} AS\n"
            + f"SELECT *\nFROM ({metadef})"
            + (" AS metadef" if self.flavor in ('mysql', 'mariadb') else '')
        )
    else:
        create_new_query = f"SELECT *\nINTO {new_table_name}\nFROM ({metadef}) AS metadef"

    new_queries.append(create_new_query)

    new_success = all(self.exec_queries(new_queries, break_on_error=True, debug=debug))
    if not new_success:
        self.exec_queries([drop_new_query], break_on_error=False, debug=debug)
        return False, f"Could not fetch new data for {pipe}."

    new_table_obj = get_sqlalchemy_table(
        new_table_raw,
        connector = self,
        refresh = True,
        debug = debug,
    )
    new_cols = {str(col.name): get_pd_type(str(col.type)) for col in new_table_obj.columns}

    add_cols_queries = self.get_add_columns_queries(pipe, new_cols, debug=debug)
    if add_cols_queries:
        if not self.exec_queries(add_cols_queries, debug=debug):
            warn(f"Failed to add new columns to {pipe}.")

    alter_cols_queries = self.get_alter_columns_queries(pipe, new_cols, debug=debug)
    if alter_cols_queries:
        if not self.exec_queries(alter_cols_queries, debug=debug):
            warn(f"Failed to alter columns for {pipe}.")

    if not check_existing:
        new_count = self.value(f"SELECT COUNT(*) FROM {new_table_name}", debug=debug)
        insert_queries = [
            (
                f"INSERT INTO {pipe_name}\n"
                + f"SELECT *\nFROM {new_table_name}"
            ),
            f"DROP TABLE {new_table_name}"
        ]
        if not self.exec_queries(insert_queries, debug=debug, break_on_error=False):
            return False, f"Failed to insert into rows into {pipe}."
        return True, f"Inserted {new_count}, updated 0 rows."


    backtrack_queries = []
    drop_backtrack_query = f"DROP TABLE {backtrack_table_name}"
    if table_exists(backtrack_table_raw, self, debug=debug):
        backtrack_queries.append(drop_backtrack_query)
    backtrack_def = self.get_pipe_data_query(
        pipe,
        begin = begin,
        end = end,
        params = params,
        debug = debug,
        order = None,
    )

    create_backtrack_query = (
        (
            f"WITH backtrack_def AS ({backtrack_def})\n"
            + f"SELECT *\nINTO {backtrack_table_name}\nFROM backtrack_def"
        ) if self.flavor not in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb')
        else (
            f"CREATE TABLE {backtrack_table_name} AS\n"
            + f"SELECT *\nFROM ({backtrack_def})"
            + (" AS backtrack" if self.flavor in ('mysql', 'mariadb') else '')
        )
    )
    backtrack_queries.append(create_backtrack_query)
    backtrack_success = all(self.exec_queries(backtrack_queries, break_on_error=True, debug=debug))
    if not backtrack_success:
        self.exec_queries([drop_new_query, drop_backtrack_query], break_on_error=False, debug=debug)
        return False, f"Could not fetch backtrack data from {pipe}."


    ### Determine which index columns are present in both tables.
    backtrack_table_obj = get_sqlalchemy_table(
        backtrack_table_raw,
        connector = self,
        refresh = True,
        debug = debug,
    )
    backtrack_cols = {str(col.name): str(col.type) for col in backtrack_table_obj.columns}
    common_cols = [col for col in new_cols if col in backtrack_cols]
    on_cols = {
        col: new_cols.get(col, 'object')
        for col_key, col in pipe.columns.items()
        if (
            col
            and
            col_key != 'value'
            and col in backtrack_cols
            and col in new_cols
        )
    }

    delta_queries = []
    drop_delta_query = f"DROP TABLE {delta_table_name}"
    if table_exists(delta_table_raw, self, debug=debug):
        delta_queries.append(drop_delta_query)

    create_delta_query = (
        (
            f"SELECT\n"
            + (
                ', '.join([
                    f"COALESCE(new.{sql_item_name(col, self.flavor)}, "
                    + f"{get_null_replacement(typ, self.flavor)}) AS "
                    + sql_item_name(col, self.flavor)
                    for col, typ in new_cols.items()
                ])
            )
            + "\n"
            + f"INTO {delta_table_name}\n"
            + f"FROM {new_table_name} AS new\n"
            + f"LEFT OUTER JOIN {backtrack_table_name} AS old\nON\n"
            + '\nAND\n'.join([
                (
                    'COALESCE(new.' + sql_item_name(c, self.flavor) + ", "
                    + get_null_replacement(new_cols[c], self.flavor) + ") "
                    + ' = '
                    + 'COALESCE(old.' + sql_item_name(c, self.flavor) + ", "
                    + get_null_replacement(backtrack_cols[c], self.flavor) + ") "
                ) for c in common_cols
            ])
            + "\nWHERE\n"
            + '\nAND\n'.join([
                (
                    'old.' + sql_item_name(c, self.flavor) + ' IS NULL'
                ) for c in common_cols
            ])
            #  + "\nAND\n"
            #  + '\nAND\n'.join([
                #  (
                    #  'new.' + sql_item_name(c, self.flavor) + ' IS NOT NULL'
                #  ) for c in new_cols
            #  ])
        ) if self.flavor not in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb')
        else (
            f"CREATE TABLE {delta_table_name} AS\n"
            + f"SELECT\n"
            + (
                ', '.join([
                    f"COALESCE(new.{sql_item_name(col, self.flavor)}, "
                    + f"{get_null_replacement(typ, self.flavor)}) AS "
                    + sql_item_name(col, self.flavor)
                    for col, typ in new_cols.items()
                ])
            )
            + "\n"
            + f"FROM {new_table_name} new\n"
            + f"LEFT OUTER JOIN {backtrack_table_name} old\nON\n"
            + '\nAND\n'.join([
                (
                    'COALESCE(new.' + sql_item_name(c, self.flavor) + ", "
                    + get_null_replacement(new_cols[c], self.flavor) + ") "
                    + ' = '
                    + 'COALESCE(old.' + sql_item_name(c, self.flavor) + ", "
                    + get_null_replacement(backtrack_cols[c], self.flavor) + ") "
                ) for c in common_cols
            ])
            + "\nWHERE\n"
            + '\nAND\n'.join([
                (
                    'old.' + sql_item_name(c, self.flavor) + ' IS NULL'
                ) for c in common_cols
            ])
            #  + "\nAND\n"
            #  + '\nAND\n'.join([
                #  (
                    #  'new.' + sql_item_name(c, self.flavor) + ' IS NOT NULL'
                #  ) for c in new_cols
            #  ])
        )
    )

    delta_queries.append(create_delta_query)

    delta_success = all(self.exec_queries(delta_queries, break_on_error=True, debug=debug))
    if not delta_success:
        self.exec_queries(
            [
                drop_new_query,
                drop_backtrack_query,
                drop_delta_query,
            ],
            break_on_error = False,
            debug = debug,
        )
        return False, f"Could not filter data for {pipe}."

    delta_table_obj = get_sqlalchemy_table(
        delta_table_raw,
        connector = self,
        refresh = True,
        debug = debug,
    )
    delta_cols = {str(col.name): get_pd_type(str(col.type)) for col in delta_table_obj.columns}

    joined_queries = []
    drop_joined_query = f"DROP TABLE {joined_table_name}"
    if on_cols and table_exists(joined_table_raw, self, debug=debug):
        joined_queries.append(drop_joined_query)

    create_joined_query = (
        (
            "SELECT "
            + (', '.join([
                (
                    'delta.' + sql_item_name(c, self.flavor)
                    + " AS " + sql_item_name(c + '_delta', self.flavor)
                ) for c in delta_cols
            ]))
            + ", "
            + (', '.join([
                (
                    'bt.' + sql_item_name(c, self.flavor)
                    + " AS " + sql_item_name(c + '_backtrack', self.flavor)
                ) for c in backtrack_cols
            ]))
            + f"\nINTO {joined_table_name}\n"
            + f"FROM {delta_table_name} AS delta\n"
            + f"LEFT OUTER JOIN {backtrack_table_name} AS bt\nON\n"
            + '\nAND\n'.join([
                (
                    'COALESCE(delta.' + sql_item_name(c, self.flavor)
                    + ", " + get_null_replacement(typ, self.flavor) + ")"
                    + ' = '
                    + 'COALESCE(bt.' + sql_item_name(c, self.flavor)
                    + ", " + get_null_replacement(typ, self.flavor) + ")"
                ) for c, typ in on_cols.items()
            ])
        ) if self.flavor not in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb')
        else (
            f"CREATE TABLE {joined_table_name} AS\n"
            + "SELECT "
            + (', '.join([
                (
                    'delta.' + sql_item_name(c, self.flavor)
                    + " AS " + sql_item_name(c + '_delta', self.flavor)
                ) for c in delta_cols
            ]))
            + ", "
            + (', '.join([
                (
                    'bt.' + sql_item_name(c, self.flavor)
                    + " AS " + sql_item_name(c + '_backtrack', self.flavor)
                ) for c in backtrack_cols
            ]))
            + f"\nFROM {delta_table_name} delta\n"
            + f"LEFT OUTER JOIN {backtrack_table_name} bt\nON\n"
            + '\nAND\n'.join([
                (
                    'COALESCE(delta.' + sql_item_name(c, self.flavor)
                    + ", " + get_null_replacement(typ, self.flavor) + ")"
                    + ' = '
                    + 'COALESCE(bt.' + sql_item_name(c, self.flavor)
                    + ", " + get_null_replacement(typ, self.flavor) + ")"
                ) for c, typ in on_cols.items()
            ])
        )
    )

    joined_queries.append(create_joined_query)

    joined_success = (
        all(self.exec_queries(joined_queries, break_on_error=True, debug=debug))
        if on_cols else True
    )
    if not joined_success:
        self.exec_queries(
            [
                drop_new_query,
                drop_backtrack_query,
                drop_delta_query,
                drop_joined_query,
            ],
            break_on_error = False,
            debug = debug,
        )
        return False, f"Could not separate new and updated data for {pipe}."

    unseen_queries = []
    drop_unseen_query = f"DROP TABLE {unseen_table_name}"
    if on_cols and table_exists(unseen_table_raw, self, debug=debug):
        unseen_queries.append(drop_unseen_query)

    create_unseen_query = (
        (
            "SELECT "
            + (', '.join([
                (
                    "CASE\n    WHEN " + sql_item_name(c + '_delta', self.flavor)
                    + " != " + get_null_replacement(typ, self.flavor) 
                    + " THEN " + sql_item_name(c + '_delta', self.flavor)
                    + "\n    ELSE NULL\nEND "
                    + " AS " + sql_item_name(c, self.flavor)
                ) for c, typ in delta_cols.items()
            ]))
            + f"\nINTO {unseen_table_name}\n"
            + f"\nFROM {joined_table_name} AS joined\n"
            + f"WHERE "
            + '\nAND\n'.join([
                (
                    sql_item_name(c + '_backtrack', self.flavor) + ' IS NULL'
                ) for c in delta_cols
            ])
            #  + "\nAND\n"
            #  + '\nAND\n'.join([
                #  (
                    #  sql_item_name(c + '_delta', self.flavor) + ' IS NOT NULL'
                #  ) for c in delta_cols
            #  ])
        ) if self.flavor not in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb') else (
            f"CREATE TABLE {unseen_table_name} AS\n"
            + "SELECT "
            + (', '.join([
                (
                    "CASE\n    WHEN " + sql_item_name(c + '_delta', self.flavor)
                    + " != "
                    + get_null_replacement(typ, self.flavor)
                    + " THEN " + sql_item_name(c + '_delta', self.flavor)
                    + "\n    ELSE NULL\nEND "
                    + " AS " + sql_item_name(c, self.flavor)
                ) for c, typ in delta_cols.items()
            ]))
            + f"\nFROM {joined_table_name} joined\n"
            + f"WHERE "
            + '\nAND\n'.join([
                (
                    sql_item_name(c + '_backtrack', self.flavor) + ' IS NULL'
                ) for c in delta_cols
            ])
            #  + "\nAND\n"
            #  + '\nAND\n'.join([
                #  (
                    #  sql_item_name(c + '_delta', self.flavor) + ' IS NOT NULL'
                #  ) for c in delta_cols
            #  ])
        )
    )

    unseen_queries.append(create_unseen_query)

    unseen_success = (
        all(self.exec_queries(unseen_queries, break_on_error=True, debug=debug))
        if on_cols else True
    )
    if not unseen_success:
        self.exec_queries(
            [
                drop_new_query,
                drop_backtrack_query,
                drop_delta_query,
                drop_joined_query,
                drop_unseen_query,
            ],
            break_on_error = False,
            debug = debug,
        )
        return False, f"Could not determine new data for {pipe}."
    unseen_count = self.value(
        (
            "SELECT COUNT(*) FROM "
            + (unseen_table_name if on_cols else delta_table_name)
        ), debug = debug,
    )

    update_queries = []
    drop_update_query = f"DROP TABLE {update_table_name}"
    if on_cols and table_exists(update_table_raw, self, debug=debug):
        update_queries.append(drop_unseen_query)

    create_update_query = (
        (
            "SELECT "
            + (', '.join([
                (
                    "CASE\n    WHEN " + sql_item_name(c + '_delta', self.flavor)
                    + " != " + get_null_replacement(typ, self.flavor)
                    + " THEN " + sql_item_name(c + '_delta', self.flavor)
                    + "\n    ELSE NULL\nEND "
                    + " AS " + sql_item_name(c, self.flavor)
                ) for c, typ in delta_cols.items()
            ]))
            + f"\nINTO {update_table_name}"
            + f"\nFROM {joined_table_name} AS joined\n"
            + f"WHERE "
            + '\nOR\n'.join([
                (
                    sql_item_name(c + '_backtrack', self.flavor) + ' IS NOT NULL'
                ) for c in delta_cols
            ])
        ) if self.flavor not in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb') else (
            f"CREATE TABLE {update_table_name} AS\n"
            + "SELECT "
            + (', '.join([
                (
                    "CASE\n    WHEN " + sql_item_name(c + '_delta', self.flavor)
                    + " != "
                    + get_null_replacement(typ, self.flavor)
                    + " THEN " + sql_item_name(c + '_delta', self.flavor)
                    + "\n    ELSE NULL\nEND "
                    + " AS " + sql_item_name(c, self.flavor)
                ) for c, typ in delta_cols.items()
            ]))
            + f"\nFROM {joined_table_name} joined\n"
            + f"WHERE "
            + '\nOR\n'.join([
                (
                    sql_item_name(c + '_backtrack', self.flavor) + ' IS NOT NULL'
                ) for c in delta_cols
            ])
        )
    )

    update_queries.append(create_update_query)

    update_success = (
        all(self.exec_queries(update_queries, break_on_error=True, debug=debug))
        if on_cols else True
    )
    if not update_success:
        self.exec_queries(
            [
                drop_new_query,
                drop_backtrack_query,
                drop_delta_query,
                drop_joined_query,
                drop_unseen_query,
                drop_update_query,
            ],
            break_on_error = False,
            debug = debug,
        )
        return False, "Could not determine updated data for {pipe}."
    update_count = (
        self.value(f"SELECT COUNT(*) FROM {update_table_name}", debug=debug)
        if on_cols else 0
    )

    apply_update_queries = (
        get_update_queries(
            pipe.target,
            update_table_raw,
            self,
            on_cols,
            debug = debug
        )
        if on_cols else []
    )

    apply_unseen_queries = [
        (
            f"INSERT INTO {pipe_name}\n"
            + f"SELECT *\nFROM " + (unseen_table_name if on_cols else delta_table_name)
        ),
    ]

    apply_queries = (
        (apply_unseen_queries if unseen_count > 0 else [])
        + (apply_update_queries if update_count > 0 else [])
        + [
            drop_new_query,
            drop_backtrack_query,
            drop_delta_query,
        ] + (
            [
                drop_joined_query,
                drop_unseen_query,
                drop_update_query,
            ] if on_cols else []
        )
    )
    success = all(self.exec_queries(apply_queries, break_on_error=False, debug=debug))
    msg = (
        f"Was not able to apply changes to {pipe}."
        if not success else f"Inserted {unseen_count}, updated {update_count} rows."
    )
    return success, msg
def test_connection(self, **kw: Any) ‑> Union[bool, NoneType]

Test if a successful connection to the database may be made.

Parameters

**kw: The keyword arguments are passed to retry_connect().

Returns

True if a connection is made, otherwise False or None in case of failure.

Expand source code
def test_connection(
        self,
        **kw: Any
    ) -> Union[bool, None]:
    """
    Test if a successful connection to the database may be made.

    Parameters
    ----------
    **kw:
        The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Returns
    -------
    `True` if a connection is made, otherwise `False` or `None` in case of failure.

    """
    import warnings
    from meerschaum.connectors.poll import retry_connect
    _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self}
    _default_kw.update(kw)
    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', 'Could not')
        try:
            return retry_connect(**_default_kw)
        except Exception as e:
            return False
def to_sql(self, df: pandas.DataFrame, name: str = None, index: bool = False, if_exists: str = 'replace', method: str = '', chunksize: Optional[int] = -1, silent: bool = False, debug: bool = False, as_tuple: bool = False, as_dict: bool = False, **kw) ‑> Union[bool, SuccessTuple]

Upload a DataFrame's contents to the SQL server.

Parameters

df : pd.DataFrame
The DataFrame to be uploaded.
name : str
The name of the table to be created.
index : bool, default False
If True, creates the DataFrame's indices as columns.
if_exists : str, default 'replace'
Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail'].
method : str, default ''
None or multi. Details on pandas.to_sql.
as_tuple : bool, default False
If True, return a (success_bool, message) tuple instead of a bool. Defaults to False.
as_dict : bool, default False
If True, return a dictionary of transaction information. The keys are success, msg, start, end, duration, num_rows, chunksize, method, and target.
kw : Any
Additional arguments will be passed to the DataFrame's to_sql function

Returns

Either a bool or a SuccessTuple (depends on as_tuple).

Expand source code
def to_sql(
        self,
        df: pandas.DataFrame,
        name: str = None,
        index: bool = False,
        if_exists: str = 'replace',
        method: str = "",
        chunksize: Optional[int] = -1,
        silent: bool = False,
        debug: bool = False,
        as_tuple: bool = False,
        as_dict: bool = False,
        **kw
    ) -> Union[bool, SuccessTuple]:
    """
    Upload a DataFrame's contents to the SQL server.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be uploaded.

    name: str
        The name of the table to be created.

    index: bool, default False
        If True, creates the DataFrame's indices as columns.

    if_exists: str, default 'replace'
        Drop and create the table ('replace') or append if it exists
        ('append') or raise Exception ('fail').
        Options are ['replace', 'append', 'fail'].

    method: str, default ''
        None or multi. Details on pandas.to_sql.

    as_tuple: bool, default False
        If `True`, return a (success_bool, message) tuple instead of a `bool`.
        Defaults to `False`.

    as_dict: bool, default False
        If `True`, return a dictionary of transaction information.
        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
        `method`, and `target`.
        
    kw: Any
        Additional arguments will be passed to the DataFrame's `to_sql` function

    Returns
    -------
    Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
    """
    import time
    from meerschaum.utils.warnings import error
    import warnings
    if name is None:
        error(f"Name must not be `None` to insert data into {self}.")

    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
    kw.pop('name', None)

    from meerschaum.utils.sql import sql_item_name, table_exists
    from meerschaum.connectors.sql._create_engine import flavor_configs
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', debug=debug)

    stats = {'target': name, }
    ### resort to defaults if None
    if method == "":
        if self.flavor in _bulk_flavors:
            method = psql_insert_copy
        else:
            ### Should resolve to 'multi' or `None`.
            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')
    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)
    chunksize = chunksize if chunksize != -1 else self.sys_config.get('chunksize', None)
    stats['chunksize'] = chunksize

    success, msg = False, "Default to_sql message"
    start = time.perf_counter()
    if debug:
        msg = f"Inserting {len(df)} rows with chunksize: {chunksize}..."
        print(msg, end="", flush=True)
    stats['num_rows'] = len(df)

    ### filter out non-pandas args
    import inspect
    to_sql_params = inspect.signature(df.to_sql).parameters
    to_sql_kw = {}
    for k, v in kw.items():
        if k in to_sql_params:
            to_sql_kw[k] = v

    if self.flavor == 'oracle':
        ### For some reason 'replace' doesn't work properly in pandas,
        ### so try dropping first.
        if if_exists == 'replace' and table_exists(name, self, debug=debug):
            success = self.exec("DROP TABLE " + sql_item_name(name, 'oracle')) is not None
            if not success:
                warn(f"Unable to drop {name}")


        ### Enforce NVARCHAR(2000) as text instead of CLOB.
        dtype = to_sql_kw.get('dtype', {})
        for col, typ in df.dtypes.items():
            if str(typ) == 'object':
                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
            elif str(typ).lower().startswith('int'):
                dtype[col] = sqlalchemy.types.INTEGER

        to_sql_kw['dtype'] = dtype

    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            df.to_sql(
                name = name,
                con = self.engine,
                index = index,
                if_exists = if_exists,
                method = method,
                chunksize = chunksize,
                **to_sql_kw
            )
        success = True
    except Exception as e:
        if not silent:
            warn(str(e))
        success, msg = False, str(e)

    end = time.perf_counter()
    if success:
        msg = f"It took {round(end - start, 2)} seconds to sync {len(df)} rows to {name}."
    stats['start'] = start
    stats['end'] = end
    stats['duration'] = end - start

    if debug:
        print(f" done.", flush=True)
        dprint(msg)

    stats['success'] = success
    stats['msg'] = msg
    if as_tuple:
        return success, msg
    if as_dict:
        return stats
    return success
def value(self, query: str, *args: Any, use_pandas: bool = False, **kw: Any) ‑> Any

Execute the provided query and return the first value.

Parameters

query : str
The SQL query to execute.
*args : Any
The arguments passed to meerschaum.connectors.sql.SQLConnector.exec if use_pandas is False (default) or to meerschaum.connectors.sql.SQLConnector.read.
use_pandas : bool, default False
If True, use read(), otherwise use meerschaum.connectors.sql.SQLConnector.exec (default). NOTE: This is always True for DuckDB.
**kw : Any
See args.

Returns

Any value returned from the query.

Expand source code
def value(
        self,
        query: str,
        *args: Any,
        use_pandas: bool = False,
        **kw: Any
    ) -> Any:
    """
    Execute the provided query and return the first value.

    Parameters
    ----------
    query: str
        The SQL query to execute.
        
    *args: Any
        The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
        if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.
        
    use_pandas: bool, default False
        If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use
        `meerschaum.connectors.sql.SQLConnector.exec` (default).
        **NOTE:** This is always `True` for DuckDB.

    **kw: Any
        See `args`.

    Returns
    -------
    Any value returned from the query.

    """
    if self.flavor == 'duckdb':
        use_pandas = True
    if use_pandas:
        try:
            return self.read(query, *args, **kw).iloc[0, 0]
        except Exception as e:
            #  warn(e)
            return None

    _close = kw.get('close', True)
    _commit = kw.get('commit', (self.flavor != 'mssql'))
    try:
        result, connection = self.exec(
            query,
            *args,
            with_connection=True,
            close=False,
            commit=_commit,
            **kw
        )
        first = result.first() if result is not None else None
        _val = first[0] if first is not None else None
    except Exception as e:
        warn(e, stacklevel=3)
        return None
    if _close:
        try:
            connection.close()
        except Exception as e:
            warn("Failed to close connection with exception:\n" + str(e))
    return _val