Module meerschaum.connectors.sql.SQLConnector

Interface with SQL servers using sqlalchemy.
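
For example, a connector may be retrieved from the configured keys and queried directly (a minimal sketch, assuming the `sql:main` instance is configured):

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> df = conn.read("SELECT 1 AS foo")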

Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Interface with SQL servers using sqlalchemy.
"""

from __future__ import annotations
from meerschaum.utils.typing import Optional, Any

from meerschaum.connectors import Connector
from meerschaum.utils.warnings import error

class SQLConnector(Connector):
    """
    Connect to SQL databases via `sqlalchemy`.
    
    SQLConnectors may be used as Meerschaum instance connectors.
    Read more about connectors and instances at
    https://meerschaum.io/reference/connectors/

    """

    IS_INSTANCE: bool = True

    from ._create_engine import flavor_configs, create_engine
    from ._sql import read, value, exec, execute, to_sql, exec_queries
    from meerschaum.utils.sql import test_connection
    from ._fetch import fetch, get_pipe_metadef
    from ._cli import cli
    from ._pipes import (
        fetch_pipes_keys,
        create_indices,
        drop_indices,
        get_create_index_queries,
        get_drop_index_queries,
        get_add_columns_queries,
        get_alter_columns_queries,
        delete_pipe,
        get_pipe_data,
        get_pipe_data_query,
        register_pipe,
        edit_pipe,
        get_pipe_id,
        get_pipe_attributes,
        sync_pipe,
        sync_pipe_inplace,
        get_sync_time,
        pipe_exists,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        deduplicate_pipe,
        get_pipe_table,
        get_pipe_columns_types,
        get_to_sql_dtype,
    )
    from ._plugins import (
        register_plugin,
        delete_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugins,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
    )
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri, parse_uri
    
    def __init__(
        self,
        label: Optional[str] = None,
        flavor: Optional[str] = None,
        wait: bool = False,
        connect: bool = False,
        debug: bool = False,
        **kw: Any
    ):
        """
        Parameters
        ----------
        label: str, default 'main'
            The identifying label for the connector.
            E.g. for `sql:main`, 'main' is the label.
            Defaults to 'main'.

        flavor: Optional[str], default None
            The database flavor, e.g.
            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
            To see supported flavors, run the `bootstrap connectors` command.

        wait: bool, default False
            If `True`, block until a database connection has been made.
            Defaults to `False`.

        connect: bool, default False
            If `True`, immediately attempt to connect to the database and issue
            a warning if the connection fails.
            Defaults to `False`.

        debug: bool, default False
            Verbosity toggle.
            Defaults to `False`.

        kw: Any
            All other arguments will be passed to the connector's attributes.
            Therefore, a connector may be made without being registered,
            as long as enough parameters are supplied to the constructor.
        """
        if 'uri' in kw:
            uri = kw['uri']
            if uri.startswith('postgres://'):
                uri = uri.replace('postgres://', 'postgresql://', 1)
            if uri.startswith('timescaledb://'):
                uri = uri.replace('timescaledb://', 'postgresql://', 1)
                flavor = 'timescaledb'
            kw['uri'] = uri
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            _ = from_uri_params.pop('label', None)

            ### Sometimes the flavor may be provided with a URI.
            kw.update(from_uri_params)
            if flavor:
                kw['flavor'] = flavor


        ### set __dict__ in base class
        super().__init__(
            'sql',
            label = label or self.__dict__.get('label', None),
            **kw
        )

        if self.__dict__.get('flavor', None) == 'sqlite':
            self._reset_attributes()
            self._set_attributes(
                'sql',
                label = label,
                inherit_default = False,
                **kw
            )
            ### For backwards compatibility reasons, set the path for sql:local if it's missing.
            if self.label == 'local' and not self.__dict__.get('database', None):
                from meerschaum.config._paths import SQLITE_DB_PATH
                self.database = str(SQLITE_DB_PATH)

        ### ensure flavor and label are set accordingly
        if 'flavor' not in self.__dict__:
            if flavor is None and 'uri' not in self.__dict__:
                raise Exception(
                    f"    Missing flavor. Provide flavor as a key for '{self}'."
                )
            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

        if self.flavor == 'postgres':
            self.flavor = 'postgresql'

        self._debug = debug
        ### Store the PID and thread at initialization
        ### so we can dispose of the Pool in child processes or threads.
        import os, threading
        self._pid = os.getpid()
        self._thread_ident = threading.current_thread().ident
        self._sessions = {}
        self._locks = {'_sessions': threading.RLock(), }

        ### verify the flavor's requirements are met
        if self.flavor not in self.flavor_configs:
            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
        if not self.__dict__.get('uri'):
            self.verify_attributes(
                self.flavor_configs[self.flavor].get('requirements', set()),
                debug=debug,
            )

        if wait:
            from meerschaum.connectors.poll import retry_connect
            retry_connect(connector=self, debug=debug)

        if connect:
            if not self.test_connection(debug=debug):
                from meerschaum.utils.warnings import warn
                warn(f"Failed to connect with connector '{self}'!", stack=False)

    @property
    def Session(self):
        if '_Session' not in self.__dict__:
            if self.engine is None:
                return None

            from meerschaum.utils.packages import attempt_import
            sqlalchemy_orm = attempt_import('sqlalchemy.orm')
            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
            self._Session = sqlalchemy_orm.scoped_session(session_factory)

        return self._Session

    @property
    def engine(self):
        import os, threading
        ### build the sqlalchemy engine
        if '_engine' not in self.__dict__:
            self._engine, self._engine_str = self.create_engine(include_uri=True)

        same_process = os.getpid() == self._pid
        same_thread = threading.current_thread().ident == self._thread_ident

        ### handle child processes
        if not same_process:
            self._pid = os.getpid()
            self._thread = threading.current_thread()
            from meerschaum.utils.warnings import warn
            warn(f"Different PID detected. Disposing of connections...")
            self._engine.dispose()

        ### handle different threads
        if not same_thread:
            pass

        return self._engine

    @property
    def DATABASE_URL(self) -> str:
        """
        Return the URI connection string (alias for `SQLConnector.URI`).
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def URI(self) -> str:
        """
        Return the URI connection string.
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def IS_THREAD_SAFE(self) -> bool:
        """
        Return whether this connector may be multithreaded.
        """
        if self.flavor == 'duckdb':
            return False
        if self.flavor == 'sqlite':
            return ':memory:' not in self.URI
        return True

    @property
    def metadata(self):
        from meerschaum.utils.packages import attempt_import
        sqlalchemy = attempt_import('sqlalchemy')
        if '_metadata' not in self.__dict__:
            self._metadata = sqlalchemy.MetaData()
        return self._metadata

    @property
    def db(self) -> Optional[databases.Database]:
        from meerschaum.utils.packages import attempt_import
        databases = attempt_import('databases', lazy=False, install=True)
        url = self.DATABASE_URL
        if 'mysql' in url:
            url = url.replace('+pymysql', '')
        if '_db' not in self.__dict__:
            try:
                self._db = databases.Database(url)
            except KeyError:
                ### Likely encountered an unsupported flavor.
                from meerschaum.utils.warnings import warn
                self._db = None
        return self._db


    @property
    def db_version(self) -> Union[str, None]:
        """
        Return the database version.
        """
        _db_version = self.__dict__.get('_db_version', None)
        if _db_version is not None:
            return _db_version

        from meerschaum.utils.sql import get_db_version
        self._db_version = get_db_version(self)
        return self._db_version


    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __call__(self):
        return self

Classes

class SQLConnector (label: Optional[str] = None, flavor: Optional[str] = None, wait: bool = False, connect: bool = False, debug: bool = False, **kw: Any)

Connect to SQL databases via sqlalchemy.

SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/

Parameters

label : str, default 'main'
The identifying label for the connector. E.g. for sql:main, 'main' is the label. Defaults to 'main'.
flavor : Optional[str], default None
The database flavor, e.g. 'sqlite', 'postgresql', 'cockroachdb', etc. To see supported flavors, run the bootstrap connectors command.
wait : bool, default False
If True, block until a database connection has been made. Defaults to False.
connect : bool, default False
If True, immediately attempt to connect to the database and issue a warning if the connection fails. Defaults to False.
debug : bool, default False
Verbosity toggle. Defaults to False.
kw : Any
All other arguments will be passed to the connector's attributes. Therefore, a connector may be made without being registered, as long as enough parameters are supplied to the constructor.
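
For example, a connector may be built directly from keyword arguments or from a URI (a minimal sketch; the path and credentials are hypothetical, and `SQLConnector` is assumed to be importable from `meerschaum.connectors.sql`):

>>> from meerschaum.connectors.sql import SQLConnector
>>> conn = SQLConnector('local_db', flavor='sqlite', database='/tmp/local.db')
>>> conn_uri = SQLConnector(uri='postgresql://user:pass@localhost:5432/db')
>>> conn_uri.flavor
'postgresql'
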
Expand source code
class SQLConnector(Connector):
    """
    Connect to SQL databases via `sqlalchemy`.
    
    SQLConnectors may be used as Meerschaum instance connectors.
    Read more about connectors and instances at
    https://meerschaum.io/reference/connectors/

    """

    IS_INSTANCE: bool = True

    from ._create_engine import flavor_configs, create_engine
    from ._sql import read, value, exec, execute, to_sql, exec_queries
    from meerschaum.utils.sql import test_connection
    from ._fetch import fetch, get_pipe_metadef
    from ._cli import cli
    from ._pipes import (
        fetch_pipes_keys,
        create_indices,
        drop_indices,
        get_create_index_queries,
        get_drop_index_queries,
        get_add_columns_queries,
        get_alter_columns_queries,
        delete_pipe,
        get_pipe_data,
        get_pipe_data_query,
        register_pipe,
        edit_pipe,
        get_pipe_id,
        get_pipe_attributes,
        sync_pipe,
        sync_pipe_inplace,
        get_sync_time,
        pipe_exists,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        deduplicate_pipe,
        get_pipe_table,
        get_pipe_columns_types,
        get_to_sql_dtype,
    )
    from ._plugins import (
        register_plugin,
        delete_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugins,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
    )
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri, parse_uri
    
    def __init__(
        self,
        label: Optional[str] = None,
        flavor: Optional[str] = None,
        wait: bool = False,
        connect: bool = False,
        debug: bool = False,
        **kw: Any
    ):
        """
        Parameters
        ----------
        label: str, default 'main'
            The identifying label for the connector.
            E.g. for `sql:main`, 'main' is the label.
            Defaults to 'main'.

        flavor: Optional[str], default None
            The database flavor, e.g.
            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
            To see supported flavors, run the `bootstrap connectors` command.

        wait: bool, default False
            If `True`, block until a database connection has been made.
            Defaults to `False`.

        connect: bool, default False
            If `True`, immediately attempt to connect to the database and issue
            a warning if the connection fails.
            Defaults to `False`.

        debug: bool, default False
            Verbosity toggle.
            Defaults to `False`.

        kw: Any
            All other arguments will be passed to the connector's attributes.
            Therefore, a connector may be made without being registered,
            as long as enough parameters are supplied to the constructor.
        """
        if 'uri' in kw:
            uri = kw['uri']
            if uri.startswith('postgres://'):
                uri = uri.replace('postgres://', 'postgresql://', 1)
            if uri.startswith('timescaledb://'):
                uri = uri.replace('timescaledb://', 'postgresql://', 1)
                flavor = 'timescaledb'
            kw['uri'] = uri
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            _ = from_uri_params.pop('label', None)

            ### Sometimes the flavor may be provided with a URI.
            kw.update(from_uri_params)
            if flavor:
                kw['flavor'] = flavor


        ### set __dict__ in base class
        super().__init__(
            'sql',
            label = label or self.__dict__.get('label', None),
            **kw
        )

        if self.__dict__.get('flavor', None) == 'sqlite':
            self._reset_attributes()
            self._set_attributes(
                'sql',
                label = label,
                inherit_default = False,
                **kw
            )
            ### For backwards compatibility reasons, set the path for sql:local if it's missing.
            if self.label == 'local' and not self.__dict__.get('database', None):
                from meerschaum.config._paths import SQLITE_DB_PATH
                self.database = str(SQLITE_DB_PATH)

        ### ensure flavor and label are set accordingly
        if 'flavor' not in self.__dict__:
            if flavor is None and 'uri' not in self.__dict__:
                raise Exception(
                    f"    Missing flavor. Provide flavor as a key for '{self}'."
                )
            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

        if self.flavor == 'postgres':
            self.flavor = 'postgresql'

        self._debug = debug
        ### Store the PID and thread at initialization
        ### so we can dispose of the Pool in child processes or threads.
        import os, threading
        self._pid = os.getpid()
        self._thread_ident = threading.current_thread().ident
        self._sessions = {}
        self._locks = {'_sessions': threading.RLock(), }

        ### verify the flavor's requirements are met
        if self.flavor not in self.flavor_configs:
            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
        if not self.__dict__.get('uri'):
            self.verify_attributes(
                self.flavor_configs[self.flavor].get('requirements', set()),
                debug=debug,
            )

        if wait:
            from meerschaum.connectors.poll import retry_connect
            retry_connect(connector=self, debug=debug)

        if connect:
            if not self.test_connection(debug=debug):
                from meerschaum.utils.warnings import warn
                warn(f"Failed to connect with connector '{self}'!", stack=False)

    @property
    def Session(self):
        if '_Session' not in self.__dict__:
            if self.engine is None:
                return None

            from meerschaum.utils.packages import attempt_import
            sqlalchemy_orm = attempt_import('sqlalchemy.orm')
            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
            self._Session = sqlalchemy_orm.scoped_session(session_factory)

        return self._Session

    @property
    def engine(self):
        import os, threading
        ### build the sqlalchemy engine
        if '_engine' not in self.__dict__:
            self._engine, self._engine_str = self.create_engine(include_uri=True)

        same_process = os.getpid() == self._pid
        same_thread = threading.current_thread().ident == self._thread_ident

        ### handle child processes
        if not same_process:
            self._pid = os.getpid()
            self._thread = threading.current_thread()
            from meerschaum.utils.warnings import warn
            warn(f"Different PID detected. Disposing of connections...")
            self._engine.dispose()

        ### handle different threads
        if not same_thread:
            pass

        return self._engine

    @property
    def DATABASE_URL(self) -> str:
        """
        Return the URI connection string (alias for `SQLConnector.URI`).
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def URI(self) -> str:
        """
        Return the URI connection string.
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def IS_THREAD_SAFE(self) -> bool:
        """
        Return whether this connector may be multithreaded.
        """
        if self.flavor == 'duckdb':
            return False
        if self.flavor == 'sqlite':
            return ':memory:' not in self.URI
        return True

    @property
    def metadata(self):
        from meerschaum.utils.packages import attempt_import
        sqlalchemy = attempt_import('sqlalchemy')
        if '_metadata' not in self.__dict__:
            self._metadata = sqlalchemy.MetaData()
        return self._metadata

    @property
    def db(self) -> Optional[databases.Database]:
        from meerschaum.utils.packages import attempt_import
        databases = attempt_import('databases', lazy=False, install=True)
        url = self.DATABASE_URL
        if 'mysql' in url:
            url = url.replace('+pymysql', '')
        if '_db' not in self.__dict__:
            try:
                self._db = databases.Database(url)
            except KeyError:
                ### Likely encountered an unsupported flavor.
                from meerschaum.utils.warnings import warn
                self._db = None
        return self._db


    @property
    def db_version(self) -> Union[str, None]:
        """
        Return the database version.
        """
        _db_version = self.__dict__.get('_db_version', None)
        if _db_version is not None:
            return _db_version

        from meerschaum.utils.sql import get_db_version
        self._db_version = get_db_version(self)
        return self._db_version


    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __call__(self):
        return self

Ancestors

  • meerschaum.connectors.Connector.Connector

Class variables

var IS_INSTANCE : bool
var flavor_configs

Static methods

def from_uri(uri: str, label: Optional[str] = None, as_dict: bool = False) ‑> Union[SQLConnector, Dict[str, Union[str, int]]]

Create a new SQLConnector from a URI string.

Parameters

uri : str
The URI connection string.
label : Optional[str], default None
If provided, use this as the connector label. Otherwise use the determined database name.
as_dict : bool, default False
If True, return a dictionary of the keyword arguments necessary to create a new SQLConnector, otherwise create a new object.

Returns

A new SQLConnector object or a dictionary of attributes (if as_dict is True).
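
A hedged example of both modes (the URIs are hypothetical, and the dictionary output is illustrative):

>>> SQLConnector.from_uri('sqlite:////tmp/foo.db', as_dict=True)
{'database': '/tmp/foo.db', 'flavor': 'sqlite', 'uri': 'sqlite:////tmp/foo.db', 'label': 'foo.db'}
>>> conn = SQLConnector.from_uri('postgresql://user:pass@localhost:5432/db')
>>> conn.label
'user@localhost/db'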

Expand source code
@classmethod
def from_uri(
        cls,
        uri: str,
        label: Optional[str] = None,
        as_dict: bool = False,
    ) -> Union[
        'meerschaum.connectors.SQLConnector',
        Dict[str, Union[str, int]],
    ]:
    """
    Create a new SQLConnector from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string.

    label: Optional[str], default None
        If provided, use this as the connector label.
        Otherwise use the determined database name.

    as_dict: bool, default False
        If `True`, return a dictionary of the keyword arguments
        necessary to create a new `SQLConnector`, otherwise create a new object.

    Returns
    -------
    A new SQLConnector object or a dictionary of attributes (if `as_dict` is `True`).
    """

    params = cls.parse_uri(uri)
    params['uri'] = uri
    flavor = params.get('flavor', None)
    if not flavor or flavor not in cls.flavor_configs:
        error(f"Invalid flavor '{flavor}' detected from the provided URI.")

    if 'database' not in params:
        error("Unable to determine the database from the provided URI.")

    if flavor in ('sqlite', 'duckdb'):
        if params['database'] == ':memory:':
            params['label'] = label or f'memory_{flavor}'
        else:
            params['label'] = label or params['database'].split(os.path.sep)[-1].lower()
    else:
        params['label'] = label or (
            (
                (params['username'] + '@' if 'username' in params else '')
                + params.get('host', '')
                + ('/' if 'host' in params else '')
                + params.get('database', '')
            ).lower()
        )

    return cls(**params) if not as_dict else params
def parse_uri(uri: str) ‑> Dict[str, Any]

Parse a URI string into a dictionary of parameters.

Parameters

uri : str
The database connection URI.

Returns

A dictionary of attributes.

Examples

>>> parse_uri('sqlite:////home/foo/bar.db')
{'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
>>> parse_uri(
...     'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
...     + '/master?driver=ODBC+Driver+17+for+SQL+Server'
... )
{'host': 'localhost', 'database': 'master', 'username': 'sa',
'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
'driver': 'ODBC Driver 17 for SQL Server'}
>>>
Expand source code
@staticmethod
def parse_uri(uri: str) -> Dict[str, Any]:
    """
    Parse a URI string into a dictionary of parameters.

    Parameters
    ----------
    uri: str
        The database connection URI.

    Returns
    -------
    A dictionary of attributes.

    Examples
    --------
    >>> parse_uri('sqlite:////home/foo/bar.db')
    {'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
    >>> parse_uri(
    ...     'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
    ...     + '/master?driver=ODBC+Driver+17+for+SQL+Server'
    ... )
    {'host': 'localhost', 'database': 'master', 'username': 'sa',
    'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
    'driver': 'ODBC Driver 17 for SQL Server'}
    >>> 
    """
    from urllib.parse import parse_qs, urlparse
    sqlalchemy = attempt_import('sqlalchemy')
    parser = sqlalchemy.engine.url.make_url
    params = parser(uri).translate_connect_args()
    params['flavor'] = uri.split(':')[0].split('+')[0]
    if params['flavor'] == 'postgres':
        params['flavor'] = 'postgresql'
    if '?' in uri:
        parsed_uri = urlparse(uri)
        for key, value in parse_qs(parsed_uri.query).items():
            params.update({key: value[0]})
    return params

Instance variables

var DATABASE_URL : str

Return the URI connection string (alias for SQLConnector.URI).

Expand source code
@property
def DATABASE_URL(self) -> str:
    """
    Return the URI connection string (alias for `SQLConnector.URI`).
    """
    _ = self.engine
    return str(self._engine_str)
var IS_THREAD_SAFE : bool

Return whether this connector may be multithreaded.

Expand source code
@property
def IS_THREAD_SAFE(self) -> bool:
    """
    Return whether this connector may be multithreaded.
    """
    if self.flavor == 'duckdb':
        return False
    if self.flavor == 'sqlite':
        return ':memory:' not in self.URI
    return True
var Session
Expand source code
@property
def Session(self):
    if '_Session' not in self.__dict__:
        if self.engine is None:
            return None

        from meerschaum.utils.packages import attempt_import
        sqlalchemy_orm = attempt_import('sqlalchemy.orm')
        session_factory = sqlalchemy_orm.sessionmaker(self.engine)
        self._Session = sqlalchemy_orm.scoped_session(session_factory)

    return self._Session
var URI : str

Return the URI connection string.

Expand source code
@property
def URI(self) -> str:
    """
    Return the URI connection string.
    """
    _ = self.engine
    return str(self._engine_str)
var db : Optional[databases.Database]
Expand source code
@property
def db(self) -> Optional[databases.Database]:
    from meerschaum.utils.packages import attempt_import
    databases = attempt_import('databases', lazy=False, install=True)
    url = self.DATABASE_URL
    if 'mysql' in url:
        url = url.replace('+pymysql', '')
    if '_db' not in self.__dict__:
        try:
            self._db = databases.Database(url)
        except KeyError:
            ### Likely encountered an unsupported flavor.
            from meerschaum.utils.warnings import warn
            self._db = None
    return self._db
var db_version : Union[str, None]

Return the database version.

Expand source code
@property
def db_version(self) -> Union[str, None]:
    """
    Return the database version.
    """
    _db_version = self.__dict__.get('_db_version', None)
    if _db_version is not None:
        return _db_version

    from meerschaum.utils.sql import get_db_version
    self._db_version = get_db_version(self)
    return self._db_version
var engine
Expand source code
@property
def engine(self):
    import os, threading
    ### build the sqlalchemy engine
    if '_engine' not in self.__dict__:
        self._engine, self._engine_str = self.create_engine(include_uri=True)

    same_process = os.getpid() == self._pid
    same_thread = threading.current_thread().ident == self._thread_ident

    ### handle child processes
    if not same_process:
        self._pid = os.getpid()
        self._thread = threading.current_thread()
        from meerschaum.utils.warnings import warn
        warn(f"Different PID detected. Disposing of connections...")
        self._engine.dispose()

    ### handle different threads
    if not same_thread:
        pass

    return self._engine
var metadata
Expand source code
@property
def metadata(self):
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    if '_metadata' not in self.__dict__:
        self._metadata = sqlalchemy.MetaData()
    return self._metadata

Methods

def clear_pipe(self, pipe: mrsm.Pipe, begin: Union[datetime, int, None] = None, end: Union[datetime, int, None] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw) ‑> Tuple[bool, str]

Delete a pipe's data within a bounded or unbounded interval without dropping the table.

Parameters

pipe : mrsm.Pipe
The pipe to clear.
begin : Union[datetime, int, None], default None
Beginning datetime. Inclusive.
end : Union[datetime, int, None], default None
Ending datetime. Exclusive.
params : Optional[Dict[str, Any]], default None
See build_where().
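
A hedged sketch of clearing one month of data (the pipe keys are hypothetical, and the pipe is assumed to be registered on this instance connector):

>>> from datetime import datetime
>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')
>>> conn = mrsm.get_connector('sql:main')
>>> success, msg = conn.clear_pipe(pipe, begin=datetime(2023, 1, 1), end=datetime(2023, 2, 1))
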
Expand source code
def clear_pipe(
        self,
        pipe: mrsm.Pipe,
        begin: Union[datetime, int, None] = None,
        end: Union[datetime, int, None] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Delete a pipe's data within a bounded or unbounded interval without dropping the table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to clear.
        
    begin: Union[datetime, int, None], default None
        Beginning datetime. Inclusive.

    end: Union[datetime, int, None], default None
         Ending datetime. Exclusive.

    params: Optional[Dict[str, Any]], default None
         See `meerschaum.utils.sql.build_where`.

    """
    if not pipe.exists(debug=debug):
        return True, f"{pipe} does not exist, so nothing was cleared."

    from meerschaum.utils.sql import sql_item_name, build_where, dateadd_str
    from meerschaum.utils.warnings import warn
    pipe_name = sql_item_name(pipe.target, self.flavor)

    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt_name = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt_name = sql_item_name(_dt, self.flavor)
        is_guess = False

    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n    Ignoring datetime bounds...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )

    valid_params = {}
    if params is not None:
        existing_cols = pipe.get_columns_types(debug=debug)
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
    clear_query = (
        f"DELETE FROM {pipe_name}\nWHERE 1 = 1\n"
        + ('  AND ' + build_where(valid_params, self, with_where=False) if valid_params else '')
        + (
            f'  AND {dt_name} >= ' + dateadd_str(self.flavor, 'day', 0, begin)
            if begin is not None else ''
        ) + (
            f'  AND {dt_name} < ' + dateadd_str(self.flavor, 'day', 0, end)
            if end is not None else ''
        )
    )
    success = self.exec(clear_query, silent=True, debug=debug) is not None
    msg = "Success" if success else f"Failed to clear {pipe}."
    return success, msg
def cli(self, debug: bool = False) ‑> Tuple[bool, str]

Launch an interactive CLI for the SQLConnector's flavor.
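
A hedged one-liner (the flavor's CLI package, e.g. `gadwall` for DuckDB or `mssqlcli` for MSSQL as seen in the source below, is imported on demand):

>>> conn = mrsm.get_connector('sql:main')
>>> success, msg = conn.cli()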

Expand source code
def cli(
        self,
        debug : bool = False
    ) -> SuccessTuple:
    """Launch an interactive CLI for the SQLConnector's flavor."""
    from meerschaum.utils.packages import venv_exec, attempt_import
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import error
    import sys, subprocess, os

    if self.flavor not in flavor_clis:
        return False, f"No CLI available for flavor '{self.flavor}'."

    if self.flavor == 'duckdb':
        gadwall = attempt_import('gadwall', debug = debug, lazy=False)
        gadwall_shell = gadwall.Gadwall(self.database)
        try:
            gadwall_shell.cmdloop()
        except KeyboardInterrupt:
            pass
        return True, "Success"
    elif self.flavor == 'mssql':
        if 'DOTNET_SYSTEM_GLOBALIZATION_INVARIANT' not in os.environ:
            os.environ['DOTNET_SYSTEM_GLOBALIZATION_INVARIANT'] = '1'

    cli_name = flavor_clis[self.flavor]

    ### Install the CLI package and any dependencies.
    cli, cli_main = attempt_import(cli_name, (cli_name + '.main'), lazy=False, debug=debug)
    if cli_name in cli_deps:
        for dep in cli_deps[cli_name]:
            locals()[dep] = attempt_import(dep, lazy=False, warn=False, debug=debug)

    ### NOTE: The `DATABASE_URL` property must be initialized first in case the database is not
    ### yet defined (e.g. 'sql:local').
    cli_arg_str = self.DATABASE_URL
    if self.flavor in ('sqlite', 'duckdb'):
        cli_arg_str = str(self.database)

    ### Define the script to execute to launch the CLI.
    ### The `mssqlcli` script is manually written to avoid telemetry
    ### and because `main.cli()` is not defined.
    launch_cli = f"cli_main.cli(['{cli_arg_str}'])"
    if self.flavor == 'mssql':
        launch_cli = (
            "mssqlclioptionsparser, mssql_cli = attempt_import("
            + "'mssqlcli.mssqlclioptionsparser', 'mssqlcli.mssql_cli', lazy=False)\n"
            + "ms_parser = mssqlclioptionsparser.create_parser()\n"
            + f"ms_options = ms_parser.parse_args(['--server', 'tcp:{self.host},{self.port}', "
            + f"'--database', '{self.database}', "
            + f"'--username', '{self.username}', '--password', '{self.password}'])\n"
            + "ms_object = mssql_cli.MssqlCli(ms_options)\n"
            + "try:\n"
            + "    ms_object.connect_to_database()\n"
            + "    ms_object.run()\n"
            + "finally:\n"
            + "    ms_object.shutdown()"
        )
    elif self.flavor == 'duckdb':
        launch_cli = ()

    try:
        if debug:
            dprint(f'Launching CLI:\n{launch_cli}')
        exec(launch_cli)
        success, msg = True, 'Success'
    except Exception as e:
        success, msg = False, str(e)

    return success, msg
def create_engine(self, include_uri: bool = False, debug: bool = False, **kw) ‑> sqlalchemy.engine.Engine

Create a sqlalchemy engine by building the engine string.
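
A hedged sketch (note that the `engine` property builds and caches an engine lazily, so calling this method directly is rarely necessary):

>>> engine = conn.create_engine()
>>> engine, engine_str = conn.create_engine(include_uri=True)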

Expand source code
def create_engine(
        self,
        include_uri: bool = False,
        debug: bool = False,
        **kw
    ) -> 'sqlalchemy.engine.Engine':
    """Create a sqlalchemy engine by building the engine string."""
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import error, warn
    sqlalchemy = attempt_import('sqlalchemy')
    import urllib
    import copy
    ### Install and patch required drivers.
    if self.flavor in install_flavor_drivers:
        attempt_import(*install_flavor_drivers[self.flavor], debug=debug, lazy=False, warn=False)
    if self.flavor in require_patching_flavors:
        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
        import pathlib
        for install_name, import_name in require_patching_flavors[self.flavor]:
            pkg = attempt_import(
                import_name,
                debug = debug,
                lazy = False,
                warn = False
            )
            _monkey_patch_get_distribution(
                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
            )

    ### supplement missing values with defaults (e.g. port number)
    for a, value in flavor_configs[self.flavor]['defaults'].items():
        if a not in self.__dict__:
            self.__dict__[a] = value

    ### Verify that everything is in order.
    if self.flavor not in flavor_configs:
        error(f"Cannot create a connector with the flavor '{self.flavor}'.")

    _engine = flavor_configs[self.flavor].get('engine', None)
    _username = self.__dict__.get('username', None)
    _password = self.__dict__.get('password', None)
    _host = self.__dict__.get('host', None)
    _port = self.__dict__.get('port', None)
    _database = self.__dict__.get('database', None)
    _driver = self.__dict__.get('driver', None)
    if _driver is not None:
        ### URL-encode the driver if a space is detected.
        ### Not a bullet-proof solution, but this should work for most cases.
        _driver = urllib.parse.quote_plus(_driver) if ' ' in _driver else _driver
    _uri = self.__dict__.get('uri', None)

    ### Handle registering specific dialects (due to installing in virtual environments).
    if self.flavor in flavor_dialects:
        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])

    ### self._sys_config was deepcopied and can be updated safely
    if self.flavor in ("sqlite", "duckdb"):
        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
        if 'create_engine' not in self._sys_config:
            self._sys_config['create_engine'] = {}
        if 'connect_args' not in self._sys_config['create_engine']:
            self._sys_config['create_engine']['connect_args'] = {}
        self._sys_config['create_engine']['connect_args'].update({"check_same_thread" : False})
    else:
        engine_str = (
            _engine + "://" + (_username if _username is not None else '') +
            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
            (("/" + _database) if _database is not None else '')
            + (("?driver=" + _driver) if _driver is not None else '')
        ) if not _uri else _uri

        ### Sometimes the timescaledb:// flavor can slip in.
        if _uri and self.flavor in ('timescaledb',) and self.flavor in _uri:
            engine_str = engine_str.replace(f'{self.flavor}://', 'postgresql://')

    if debug:
        dprint(
            (
                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
                    if _password is not None else engine_str
            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
        )

    _kw_copy = copy.deepcopy(kw)

    ### NOTE: Order of inheritance:
    ###       1. Defaults
    ###       2. System configuration
    ###       3. Connector configuration
    ###       4. Keyword arguments
    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
    def _apply_create_engine_args(update):
        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
            _create_engine_args.update(
                { k: v for k, v in update.items()
                    if 'omit_create_engine' not in flavor_configs[self.flavor]
                        or k not in flavor_configs[self.flavor].get('omit_create_engine')
                }
            )
    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
    _apply_create_engine_args(_kw_copy)

    try:
        engine = sqlalchemy.create_engine(
            engine_str,
            ### I know this looks confusing, and maybe it's bad code,
            ### but it's simple. It dynamically parses the config string
            ### and splits it to separate the class name (QueuePool)
            ### from the module name (sqlalchemy.pool).
            poolclass    = getattr(
                attempt_import(
                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
                ),
                self._sys_config['poolclass'].split('.')[-1]
            ),
            echo         = debug,
            **_create_engine_args
        )
    except Exception as e:
        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
        engine = None

    if include_uri:
        return engine, engine_str
    return engine
def create_indices(self, pipe: mrsm.Pipe, indices: Optional[List[str]] = None, debug: bool = False) ‑> bool

Create a pipe's indices.
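
A hedged sketch (assumes a pipe with configured index columns; the `indices` filter matches the keys returned by `get_create_index_queries()`):

>>> success = conn.create_indices(pipe)
>>> success = conn.create_indices(pipe, indices=['datetime'])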

Expand source code
def create_indices(
        self,
        pipe: mrsm.Pipe,
        indices: Optional[List[str]] = None,
        debug: bool = False
    ) -> bool:
    """
    Create a pipe's indices.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    if debug:
        dprint(f"Creating indices for {pipe}...")
    if not pipe.columns:
        warn(f"Unable to create indices for {pipe} without columns.", stack=False)
        return False
    ix_queries = {
        ix: queries
        for ix, queries in self.get_create_index_queries(pipe, debug=debug).items()
        if indices is None or ix in indices
    }
    success = True
    for ix, queries in ix_queries.items():
        ix_success = all(self.exec_queries(queries, debug=debug, silent=True))
        if not ix_success:
            success = False
            if debug:
                dprint(f"Failed to create index on column: {ix}")
    return success
def deduplicate_pipe(self, pipe: mrsm.Pipe, begin: Union[datetime, int, None] = None, end: Union[datetime, int, None] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) ‑> Tuple[bool, str]

Delete duplicate values within a pipe's table.

Parameters

pipe : mrsm.Pipe
The pipe whose table to deduplicate.
begin : Union[datetime, int, None], default None
If provided, only deduplicate values greater than or equal to this value.
end : Union[datetime, int, None], default None
If provided, only deduplicate values less than this value.
params : Optional[Dict[str, Any]], default None
If provided, further limit deduplication to values which match this query dictionary.
debug : bool, default False
Verbosity toggle.

Returns

A SuccessTuple indicating success.
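
A hedged sketch (reusing the hypothetical pipe and connector from the examples above; both bounds are optional):

>>> success, msg = conn.deduplicate_pipe(pipe)
>>> success, msg = conn.deduplicate_pipe(pipe, begin=datetime(2023, 1, 1))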

Expand source code
def deduplicate_pipe(
        self,
        pipe: mrsm.Pipe,
        begin: Union[datetime, int, None] = None,
        end: Union[datetime, int, None] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kwargs: Any
    ) -> SuccessTuple:
    """
    Delete duplicate values within a pipe's table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose table to deduplicate.

    begin: Union[datetime, int, None], default None
        If provided, only deduplicate values greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If provided, only deduplicate values less than this value.

    params: Optional[Dict[str, Any]], default None
        If provided, further limit deduplication to values which match this query dictionary.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    from meerschaum.utils.sql import (
        sql_item_name,
        NO_CTE_FLAVORS,
        get_rename_table_queries,
        NO_SELECT_INTO_FLAVORS,
        get_create_table_query,
        format_cte_subquery,
        get_null_replacement,
    )
    from meerschaum.utils.misc import generate_password, flatten_list

    pipe_table_name = sql_item_name(pipe.target, self.flavor)

    if not pipe.exists(debug=debug):
        return False, f"Table {pipe_table_name} does not exist."

    ### TODO: Handle deleting duplicates without a datetime axis.
    dt_col = pipe.columns.get('datetime', None)
    dt_col_name = sql_item_name(dt_col, self.flavor)
    cols_types = pipe.get_columns_types(debug=debug)
    existing_cols = pipe.get_columns_types(debug=debug)

    get_rowcount_query = f"SELECT COUNT(*) FROM {pipe_table_name}"
    old_rowcount = self.value(get_rowcount_query, debug=debug)
    if old_rowcount is None:
        return False, f"Failed to get rowcount for table {pipe_table_name}."

    ### Non-datetime indices that in fact exist.
    indices = [
        col
        for key, col in pipe.columns.items()
        if col and col != dt_col and col in cols_types
    ]
    indices_names = [sql_item_name(index_col, self.flavor) for index_col in indices]
    existing_cols_names = [sql_item_name(col, self.flavor) for col in existing_cols]
    duplicates_cte_name = sql_item_name('dups', self.flavor)
    duplicate_row_number_name = sql_item_name('dup_row_num', self.flavor)
    previous_row_number_name = sql_item_name('prev_row_num', self.flavor)
    
    index_list_str = (
        sql_item_name(dt_col, self.flavor)
        if dt_col
        else ''
    )
    index_list_str_ordered = (
        (
            sql_item_name(dt_col, self.flavor) + " DESC"
        )
        if dt_col
        else ''
    )
    if indices:
        index_list_str += ', ' + ', '.join(indices_names)
        index_list_str_ordered += ', ' + ', '.join(indices_names)
    if index_list_str.startswith(','):
        index_list_str = index_list_str.lstrip(',').lstrip()
    if index_list_str_ordered.startswith(','):
        index_list_str_ordered = index_list_str_ordered.lstrip(',').lstrip()

    cols_list_str = ', '.join(existing_cols_names)

    try:
        ### NOTE: MySQL 5 and below does not support window functions (ROW_NUMBER()).
        is_old_mysql = (
            self.flavor in ('mysql', 'mariadb')
            and
            int(self.db_version.split('.')[0]) < 8
        )
    except Exception as e:
        is_old_mysql = False

    src_query = f"""
        SELECT
            {cols_list_str},
            ROW_NUMBER() OVER (
                PARTITION BY
                {index_list_str}
                ORDER BY {index_list_str_ordered}
            ) AS {duplicate_row_number_name}
        FROM {pipe_table_name}
    """
    duplicates_cte_subquery = format_cte_subquery(
        src_query,
        self.flavor,
        sub_name = 'src',
        cols_to_select = cols_list_str,
    ) + f"""
        WHERE {duplicate_row_number_name} = 1
        """
    old_mysql_query = (
        f"""
        SELECT
            {index_list_str}
        FROM (
          SELECT
            {index_list_str},
            IF(
                @{previous_row_number_name} <> {index_list_str.replace(', ', ' + ')},
                @{duplicate_row_number_name} := 0,
                @{duplicate_row_number_name}
            ),
            @{previous_row_number_name} := {index_list_str.replace(', ', ' + ')},
            @{duplicate_row_number_name} := @{duplicate_row_number_name} + 1 AS """
        + f"""{duplicate_row_number_name}
          FROM
            {pipe_table_name},
            (
                SELECT @{duplicate_row_number_name} := 0
            ) AS {duplicate_row_number_name},
            (
                SELECT @{previous_row_number_name} := '{get_null_replacement('str', 'mysql')}'
            ) AS {previous_row_number_name}
          ORDER BY {index_list_str_ordered}
        ) AS t
        WHERE {duplicate_row_number_name} = 1
        """
    )
    if is_old_mysql:
        duplicates_cte_subquery = old_mysql_query

    session_id = generate_password(3)

    dedup_table = '_' + session_id + f'_dedup_{pipe.target}'
    temp_old_table = '_' + session_id + f"_old_{pipe.target}"

    dedup_table_name = sql_item_name(dedup_table, self.flavor)
    temp_old_table_name = sql_item_name(temp_old_table, self.flavor)
    duplicates_count_name = sql_item_name('num_duplicates', self.flavor)

    create_temporary_table_query = get_create_table_query(
        duplicates_cte_subquery, 
        dedup_table,
        self.flavor,
    ) + f"""
    ORDER BY {index_list_str_ordered}
    """
    alter_queries = flatten_list([
        get_rename_table_queries(pipe.target, temp_old_table, self.flavor),
        get_rename_table_queries(dedup_table, pipe.target, self.flavor),
        f"""
        DROP TABLE {temp_old_table_name}
        """,
    ])

    create_temporary_result = self.execute(create_temporary_table_query, debug=debug)
    if create_temporary_result is None:
        return False, f"Failed to deduplicate table {pipe_table_name}."

    results = self.exec_queries(
        alter_queries,
        break_on_error = True,
        rollback = True,
        debug = debug,
    )

    fail_query = None
    for result, query in zip(results, alter_queries):
        if result is None:
            fail_query = query
            break
    success = fail_query is None

    new_rowcount = (
        self.value(get_rowcount_query, debug=debug)
        if success
        else None
    )

    msg = (
        (
            f"Successfully deduplicated table {pipe_table_name}"
            + (
                f"\nfrom {old_rowcount} to {new_rowcount} rows"
                if old_rowcount != new_rowcount
                else ''
            )
            + '.'
        )
        if success
        else f"Failed to execute query:\n{fail_query}"
    )
    return success, msg
def delete_pipe(self, pipe: mrsm.Pipe, debug: bool = False) ‑> Tuple[bool, str]

Delete a Pipe's registration and drop its table.

Expand source code
def delete_pipe(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> SuccessTuple:
    """
    Delete a Pipe's registration and drop its table.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.sql import sql_item_name
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    ### try dropping first
    drop_tuple = pipe.drop(debug=debug)
    if not drop_tuple[0]:
        return drop_tuple

    if not pipe.id:
        return False, f"{pipe} is not registered."

    ### ensure pipes table exists
    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    q = sqlalchemy.delete(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
    if not self.exec(q, debug=debug):
        return False, f"Failed to delete registration for {pipe}."

    return True, "Success"
def delete_plugin(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False, **kw: Any) ‑> SuccessTuple

Delete a plugin from the plugins table.

Expand source code
def delete_plugin(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Delete a plugin from the plugins table."""
    from meerschaum.utils.warnings import warn, error
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    plugin_id = self.get_plugin_id(plugin, debug=debug)
    if plugin_id is None:
        return True, f"Plugin '{plugin}' was not registered."

    bind_variables = {
        'plugin_id' : plugin_id,
    }

    query = sqlalchemy.delete(plugins_tbl).where(plugins_tbl.c.plugin_id == plugin_id)
    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to delete plugin '{plugin}'."
    return True, f"Successfully deleted plugin '{plugin}'."
def delete_user(self, user: meerschaum.core.User, debug: bool = False) ‑> SuccessTuple

Delete a user's record from the users table.

Expand source code
def delete_user(
        self,
        user: meerschaum.core.User,
        debug: bool = False
    ) -> SuccessTuple:
    """Delete a user's record from the users table."""
    ### ensure users table exists
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    plugins = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)

    if user_id is None:
        return False, f"User '{user.username}' is not registered and cannot be deleted."

    query = sqlalchemy.delete(users_tbl).where(users_tbl.c.user_id == user_id)

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to delete user '{user}'."

    query = sqlalchemy.delete(plugins).where(plugins.c.user_id == user_id)
    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to delete plugins of user '{user}'."

    return True, f"Successfully deleted user '{user}'"
def drop_indices(self, pipe: mrsm.Pipe, indices: Optional[List[str]] = None, debug: bool = False) ‑> bool

Drop a pipe's indices.

Expand source code
def drop_indices(
        self,
        pipe: mrsm.Pipe,
        indices: Optional[List[str]] = None,
        debug: bool = False
    ) -> bool:
    """
    Drop a pipe's indices.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    if debug:
        dprint(f"Dropping indices for {pipe}...")
    if not pipe.columns:
        warn(f"Unable to drop indices for {pipe} without columns.", stack=False)
        return False
    ix_queries = {
        ix: queries
        for ix, queries in self.get_drop_index_queries(pipe, debug=debug).items()
        if indices is None or ix in indices
    }
    success = True
    for ix, queries in ix_queries.items():
        ix_success = all(self.exec_queries(queries, debug=debug, silent=True))
        if not ix_success:
            success = False
            if debug:
                dprint(f"Failed to drop index on column: {ix}")
    return success
def drop_pipe(self, pipe: mrsm.Pipe, debug: bool = False, **kw) ‑> Tuple[bool, str]

Drop a pipe's tables but maintain its registration.

Parameters

pipe : mrsm.Pipe
The pipe to drop.
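
For example (a hedged sketch; contrast with delete_pipe(), which also removes the pipe's registration):

>>> success, msg = conn.drop_pipe(pipe)
>>> pipe.exists()
False
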
Expand source code
def drop_pipe(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Drop a pipe's tables but maintain its registration.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to drop.
        
    """
    from meerschaum.utils.sql import table_exists, sql_item_name
    success = True
    target, temp_target = pipe.target, '_' + pipe.target
    target_name, temp_name = (
        sql_item_name(target, self.flavor),
        sql_item_name(temp_target, self.flavor),
    )
    if table_exists(target, self, debug=debug):
        success = self.exec(f"DROP TABLE {target_name}", silent=True, debug=debug) is not None
    if table_exists(temp_target, self, debug=debug):
        success = (
            success
            and self.exec(f"DROP TABLE {temp_name}", silent=True, debug=debug) is not None
        )

    msg = "Success" if success else f"Failed to drop {pipe}."
    return success, msg
def edit_pipe(self, pipe: mrsm.Pipe = None, patch: bool = False, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]

Persist a Pipe's parameters to its database.

Parameters

pipe : mrsm.Pipe, default None
The pipe to be edited.
patch : bool, default False
If patch is True, update the existing parameters by cascading. Otherwise overwrite the parameters (default).
debug : bool, default False
Verbosity toggle.
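
A hedged sketch of patching a pipe's parameters and persisting them (the 'tags' key is illustrative; the pipe is assumed to be registered):

>>> pipe.parameters['tags'] = ['production']
>>> success, msg = conn.edit_pipe(pipe, patch=True)
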
Expand source code
def edit_pipe(
        self,
        pipe : mrsm.Pipe = None,
        patch: bool = False,
        debug: bool = False,
        **kw : Any
    ) -> SuccessTuple:
    """
    Persist a Pipe's parameters to its database.

    Parameters
    ----------
    pipe: mrsm.Pipe, default None
        The pipe to be edited.
    patch: bool, default False
        If patch is `True`, update the existing parameters by cascading.
        Otherwise overwrite the parameters (default).
    debug: bool, default False
        Verbosity toggle.
    """

    if pipe.id is None:
        return False, f"{pipe} is not registered and cannot be edited."

    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    if not patch:
        parameters = pipe.__dict__.get('_attributes', {}).get('parameters', {})
    else:
        from meerschaum import Pipe
        from meerschaum.config._patch import apply_patch_to_config
        original_parameters = Pipe(
            pipe.connector_keys, pipe.metric_key, pipe.location_key,
            mrsm_instance=pipe.instance_keys
        ).parameters
        parameters = apply_patch_to_config(
            original_parameters,
            pipe.parameters
        )

    ### ensure pipes table exists
    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    import json
    sqlalchemy = attempt_import('sqlalchemy')

    values = {
        'parameters': (
            json.dumps(parameters)
            if self.flavor not in json_flavors
            else parameters
        ),
    }
    q = sqlalchemy.update(pipes_tbl).values(**values).where(
        pipes_tbl.c.pipe_id == pipe.id
    )

    result = self.exec(q, debug=debug)
    message = (
        f"Successfully edited {pipe}."
        if result is not None else f"Failed to edit {pipe}."
    )
    return (result is not None), message
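As an illustration, a hedged sketch of persisting changed parameters (assumes the pipe is already registered on `sql:main`; the keys and the fetch definition are hypothetical):

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> pipe = mrsm.Pipe('sql:source', 'sales', instance='sql:main')
>>> pipe.parameters['fetch'] = {'definition': 'SELECT * FROM sales_table'}
>>> success, msg = conn.edit_pipe(pipe)   ### serializes parameters as JSON for non-JSON flavors
>>> success
True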
def edit_user(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> SuccessTuple

Update an existing user's metadata.

Expand source code
def edit_user(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Update an existing user's metadata."""
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.sql import json_flavors
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    if user_id is None:
        return False, (
            f"User '{user.username}' does not exist. " +
            f"Register user '{user.username}' before editing."
        )
    user.user_id = user_id

    import json
    valid_tuple = valid_username(user.username)
    if not valid_tuple[0]:
        return valid_tuple

    bind_variables = {
        'user_id' : user_id,
        'username' : user.username,
    }
    if user.password != '':
        bind_variables['password_hash'] = user.password_hash
    if user.email != '':
        bind_variables['email'] = user.email
    if user.attributes is not None and user.attributes != {}:
        bind_variables['attributes'] = (
            json.dumps(user.attributes) if self.flavor in ('duckdb',)
            else user.attributes
        )
    if user.type != '':
        bind_variables['user_type'] = user.type

    query = (
        sqlalchemy
        .update(users_tbl)
        .values(**bind_variables)
        .where(users_tbl.c.user_id == user_id)
    )

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to edit user '{user}'."
    return True, f"Successfully edited user '{user}'."
def exec(self, query: str, *args: Any, silent: bool = False, debug: bool = False, commit: Optional[bool] = None, close: Optional[bool] = None, with_connection: bool = False, **kw: Any) ‑> Union[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.cursor.LegacyCursorResult, Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection], Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection], None]

Execute SQL code and return the sqlalchemy result, e.g. when calling stored procedures.

If inserting data, please use bind variables to avoid SQL injection!

Parameters

query : Union[str, List[str], Tuple[str]]
The query to execute. If query is a list or tuple, call self.exec_queries() instead.
args : Any
Arguments passed to sqlalchemy.engine.execute.
silent : bool, default False
If True, suppress warnings.
commit : Optional[bool], default None
If True, commit the changes after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
close : Optional[bool], default None
If True, close the connection after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
with_connection : bool, default False
If True, return a tuple including the connection object. This does not apply if query is a list of strings.

Returns

The sqlalchemy result object, or a tuple with the connection if with_connection is True.

Expand source code
def exec(
        self,
        query: str,
        *args: Any,
        silent: bool = False,
        debug: bool = False,
        commit: Optional[bool] = None,
        close: Optional[bool] = None,
        with_connection: bool = False,
        **kw: Any
    ) -> Union[
            sqlalchemy.engine.result.resultProxy,
            sqlalchemy.engine.cursor.LegacyCursorResult,
            Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
            Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
            None
    ]:
    """
    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.
    
    If inserting data, please use bind variables to avoid SQL injection!

    Parameters
    ----------
    query: Union[str, List[str], Tuple[str]]
        The query to execute.
        If `query` is a list or tuple, call `self.exec_queries()` instead.

    args: Any
        Arguments passed to `sqlalchemy.engine.execute`.
        
    silent: bool, default False
        If `True`, suppress warnings.

    commit: Optional[bool], default None
        If `True`, commit the changes after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    close: Optional[bool], default None
        If `True`, close the connection after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    with_connection: bool, default False
        If `True`, return a tuple including the connection object.
        This does not apply if `query` is a list of strings.
    
    Returns
    -------
    The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is `True`.

    """
    if isinstance(query, (list, tuple)):
        return self.exec_queries(
            list(query),
            *args,
            silent = silent,
            debug = debug,
            **kw
        )

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import("sqlalchemy")
    if debug:
        dprint(f"[{self}] Executing query:\n{query}")

    _close = close if close is not None else (self.flavor != 'mssql')
    _commit = commit if commit is not None else (
        (self.flavor != 'mssql' or 'select' not in str(query).lower())
    )

    ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+).
    if not hasattr(query, 'compile'):
        query = sqlalchemy.text(query)

    connection = self.engine.connect()
    transaction = connection.begin() if _commit else None
    try:
        result = connection.execute(query, *args, **kw)
        if _commit:
            transaction.commit()
    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{e}")
        if not silent:
            warn(str(e))
        result = None
        if _commit:
            transaction.rollback()
    finally:
        if _close:
            connection.close()

        if with_connection:
            return result, connection

    return result
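To ground the bind-variable advice above, a sketch of a parameterized insert (the table and columns are hypothetical; `:id` / `:val` use SQLAlchemy's named-parameter syntax, and the dict of values is passed through to `connection.execute()`):

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> result = conn.exec(
...     "INSERT INTO events (id, val) VALUES (:id, :val)",
...     {'id': 1, 'val': 'hello'},
... )
>>> result is not None   ### `None` indicates the query failed
True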
def exec_queries(self, queries: List[str], break_on_error: bool = False, rollback: bool = True, silent: bool = False, debug: bool = False) ‑> List[sqlalchemy.engine.cursor.LegacyCursorResult]

Execute a list of queries in a single transaction.

Parameters

queries : List[str]
The queries in the transaction to be executed.
break_on_error : bool, default False
If True, stop executing when a query fails.
rollback : bool, default True
If break_on_error is True, rollback the transaction if a query fails.
silent : bool, default False
If True, suppress warnings.

Returns

A list of SQLAlchemy results.

Expand source code
def exec_queries(
        self,
        queries: List[str],
        break_on_error: bool = False,
        rollback: bool = True,
        silent: bool = False,
        debug: bool = False,
    ) -> List[sqlalchemy.engine.cursor.LegacyCursorResult]:
    """
    Execute a list of queries in a single transaction.

    Parameters
    ----------
    queries: List[str]
        The queries in the transaction to be executed.

    break_on_error: bool, default False
        If `True`, stop executing when a query fails.

    rollback: bool, default True
        If `break_on_error` is `True`, rollback the transaction if a query fails.

    silent: bool, default False
        If `True`, suppress warnings.

    Returns
    -------
    A list of SQLAlchemy results.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    results = []
    with self.engine.begin() as connection:
        for query in queries:
            if debug:
                dprint(f"[{self}]\n" + str(query))
            if isinstance(query, str):
                query = sqlalchemy.text(query)

            try:
                result = connection.execute(query)
            except Exception as e:
                msg = (f"Encountered error while executing:\n{e}")
                if not silent:
                    warn(msg)
                elif debug:
                    dprint(f"[{self}]\n" + str(msg))
                result = None
            results.append(result)
            if result is None and break_on_error:
                if rollback:
                    connection.rollback()
                break
    return results
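A short sketch of running several statements atomically (hypothetical DDL; with `break_on_error=True`, a failed statement stops execution and, if `rollback` is set, rolls the transaction back):

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> results = conn.exec_queries(
...     [
...         "CREATE TABLE tmp_example (id INT)",
...         "INSERT INTO tmp_example (id) VALUES (1)",
...         "DROP TABLE tmp_example",
...     ],
...     break_on_error = True,
... )
>>> all(result is not None for result in results)
True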
def execute(self, *args: Any, **kw: Any) ‑> Optional[sqlalchemy.engine.result.resultProxy]

An alias for meerschaum.connectors.sql.SQLConnector.exec.

Expand source code
def execute(
        self,
        *args : Any,
        **kw : Any
    ) -> Optional[sqlalchemy.engine.result.resultProxy]:
    """
    An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
    """
    return self.exec(*args, **kw)
def fetch(self, pipe: Pipe, begin: Union[datetime, int, str, None] = '', end: Union[datetime, int, str, None] = None, check_existing: bool = True, chunk_hook: Optional[Callable[[pd.DataFrame], Any]] = None, chunksize: Optional[int] = -1, workers: Optional[int] = None, debug: bool = False, **kw: Any) ‑> Union['pd.DataFrame', List[Any], None]

Execute the SQL definition and return a Pandas DataFrame.

Parameters

pipe : Pipe

The pipe object which contains the fetch metadata.

  • pipe.columns['datetime']: str
    • Name of the datetime column for the remote table.
  • pipe.parameters['fetch']: Dict[str, Any]
    • Parameters necessary to execute a query.
  • pipe.parameters['fetch']['definition']: str
    • Raw SQL query to execute to generate the pandas DataFrame.
  • pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
    • How many minutes before begin to search for data (optional).
begin : Union[datetime, int, str, None], default ''
Most recent datetime to search for data. If an empty string, fall back to the pipe's sync time. If backtrack_minutes is provided, subtract backtrack_minutes.
end : Union[datetime, int, str, None], default None
The latest datetime to search for data. If end is None, do not bound the search.
check_existing : bool, default True
If False, use a backtrack interval of 0 minutes.
chunk_hook : Callable[[pd.DataFrame], Any], default None
A function to pass to read() that accepts a Pandas DataFrame.
chunksize : Optional[int], default -1
How many rows to load into memory at once (when chunk_hook is provided). Otherwise the entire result set is loaded into memory.
workers : Optional[int], default None
How many threads to use when consuming the generator (when chunk_hook is provided). Defaults to the number of cores.
debug : bool, default False
Verbosity toggle.

Returns

A pandas DataFrame or None. If chunk_hook is not None, return a list of the hook function's results.

Expand source code
def fetch(
        self,
        pipe: meerschaum.Pipe,
        begin: Union[datetime, int, str, None] = '',
        end: Union[datetime, int, str, None] = None,
        check_existing: bool = True,
        chunk_hook: Optional[Callable[[pd.DataFrame], Any]] = None,
        chunksize: Optional[int] = -1,
        workers: Optional[int] = None,
        debug: bool = False,
        **kw: Any
    ) -> Union['pd.DataFrame', List[Any], None]:
    """Execute the SQL definition and return a Pandas DataFrame.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe object which contains the `fetch` metadata.
        
        - pipe.columns['datetime']: str
            - Name of the datetime column for the remote table.
        - pipe.parameters['fetch']: Dict[str, Any]
            - Parameters necessary to execute a query.
        - pipe.parameters['fetch']['definition']: str
            - Raw SQL query to execute to generate the pandas DataFrame.
        - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
            - How many minutes before `begin` to search for data (*optional*).

    begin: Union[datetime, int, str, None], default ''
        Most recent datetime to search for data.
        If an empty string, fall back to the pipe's sync time.
        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.

    end: Union[datetime, int, str, None], default None
        The latest datetime to search for data.
        If `end` is `None`, do not bound the search.

    check_existing: bool, default True
        If `False`, use a backtrack interval of 0 minutes.

    chunk_hook: Callable[[pd.DataFrame], Any], default None
        A function to pass to `SQLConnector.read()` that accepts a Pandas DataFrame.

    chunksize: Optional[int], default -1
        How many rows to load into memory at once (when `chunk_hook` is provided).
        Otherwise the entire result set is loaded into memory.

    workers: Optional[int], default None
        How many threads to use when consuming the generator (when `chunk_hook` is provided).
        Defaults to the number of cores.

    debug: bool, default False
        Verbosity toggle.
       
    Returns
    -------
    A pandas DataFrame or `None`.
    If `chunk_hook` is not None, return a list of the hook function's results.
    """
    meta_def = self.get_pipe_metadef(
        pipe,
        begin = begin,
        end = end,
        check_existing = check_existing,
        debug = debug,
        **kw
    )
    as_hook_results = chunk_hook is not None
    chunks = self.read(
        meta_def,
        chunk_hook = chunk_hook,
        as_hook_results = as_hook_results,
        chunksize = chunksize,
        workers = workers,
        debug = debug,
    )
    ### if sqlite, parse for datetimes
    if not as_hook_results and self.flavor == 'sqlite':
        from meerschaum.utils.misc import parse_df_datetimes
        ignore_cols = [
            col
            for col, dtype in pipe.dtypes.items()
            if 'datetime' not in str(dtype)
        ]
        return (
            parse_df_datetimes(
                chunk,
                ignore_cols = ignore_cols,
                debug = debug,
            )
            for chunk in chunks
        )
    return chunks
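To tie the `fetch` metadata together, a hedged sketch of a pipe that pulls from a remote SQL connector (the keys, source table, and datetime column are hypothetical; `Pipe.sync()` invokes this `fetch` method under the hood):

>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe(
...     'sql:source', 'sales',
...     instance = 'sql:main',
...     parameters = {
...         'fetch': {'definition': 'SELECT * FROM sales_table'},
...         'columns': {'datetime': 'transaction_time'},
...     },
... )
>>> pipe.sync()   ### bounds the definition via get_pipe_metadef() and reads the result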
def fetch_pipes_keys(self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) ‑> Optional[List[Tuple[str, str, Optional[str]]]]

Return a list of tuples corresponding to the parameters provided.

Parameters

connector_keys : Optional[List[str]], default None
List of connector_keys to search by.
metric_keys : Optional[List[str]], default None
List of metric_keys to search by.
location_keys : Optional[List[str]], default None
List of location_keys to search by.
tags : Optional[List[str]], default None
List of tags to search by. Prefix a tag with the negation prefix ('_') to exclude it.
params : Optional[Dict[str, Any]], default None
Dictionary of additional parameters to search by. E.g. --params pipe_id:1
debug : bool, default False
Verbosity toggle.
Expand source code
def fetch_pipes_keys(
        self,
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False
    ) -> Optional[List[Tuple[str, str, Optional[str]]]]:
    """
    Return a list of tuples corresponding to the parameters provided.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        List of connector_keys to search by.

    metric_keys: Optional[List[str]], default None
        List of metric_keys to search by.

    location_keys: Optional[List[str]], default None
        List of location_keys to search by.

    tags: Optional[List[str]], default None
        List of tags to search by.
        Prefix a tag with the negation prefix (`'_'`) to exclude it.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        E.g. `--params pipe_id:1`

    debug: bool, default False
        Verbosity toggle.
    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.misc import separate_negation_values
    from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists
    from meerschaum.config.static import STATIC_CONFIG
    sqlalchemy = attempt_import('sqlalchemy')
    import json
    from copy import deepcopy

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    else:
        location_keys = [
            (lk if lk not in ('[None]', 'None', 'null') else None)
                for lk in location_keys
        ]
    if tags is None:
        tags = []

    if params is None:
        params = {}

    ### Add three primary keys to params dictionary
    ###   (separated for convenience of arguments).
    cols = {
        'connector_keys': connector_keys,
        'metric_key': metric_keys,
        'location_key': location_keys,
    }

    ### Make deep copy so we don't mutate this somewhere else.
    parameters = deepcopy(params)
    for col, vals in cols.items():
        ### Allow for IS NULL to be declared as a single-item list ([None]).
        if vals == [None]:
            vals = None
        if vals not in [[], ['*']]:
            parameters[col] = vals
    cols = {k: v for k, v in cols.items() if v != [None]}

    if not table_exists('pipes', self, debug=debug):
        return []

    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes']

    _params = {}
    for k, v in parameters.items():
        _v = json.dumps(v) if isinstance(v, dict) else v
        _params[k] = _v

    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
    ### Parse regular params.
    ### If a value begins with the negation prefix ('_'), negate it instead
    ### (i.e. compare against the value with the prefix stripped).
    _where = [
        (
            (pipes_tbl.c[key] == val) if not str(val).startswith(negation_prefix)
            else (pipes_tbl.c[key] != str(val)[len(negation_prefix):])
        ) for key, val in _params.items()
            if not isinstance(val, (list, tuple)) and key in pipes_tbl.c
    ]
    select_cols = (
        [pipes_tbl.c.connector_keys, pipes_tbl.c.metric_key, pipes_tbl.c.location_key]
        + ([pipes_tbl.c.parameters] if tags else [])
    )

    q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where))

    ### Parse IN params and add OR IS NULL if None in list.
    for c, vals in cols.items():
        if not isinstance(vals, (list, tuple)) or not vals or c not in pipes_tbl.c:
            continue
        _in_vals, _ex_vals = separate_negation_values(vals)
        ### Include params (positive)
        q = (
            q.where(pipes_tbl.c[c].in_(_in_vals)) if None not in _in_vals
            else q.where(sqlalchemy.or_(pipes_tbl.c[c].in_(_in_vals), pipes_tbl.c[c].is_(None)))
        ) if _in_vals else q
        ### Exclude params (negative)
        q = q.where(pipes_tbl.c[c].not_in(_ex_vals)) if _ex_vals else q

    ### Finally, parse tags.
    _in_tags, _ex_tags = separate_negation_values(tags)
    ors = []
    for nt in _in_tags:
        ors.append(
            sqlalchemy.cast(
                pipes_tbl.c['parameters'],
                sqlalchemy.String,
            ).like(f'%"tags":%"{nt}"%')
        )
    q = q.where(sqlalchemy.and_(sqlalchemy.or_(*ors).self_group())) if ors else q
    ors = []
    for xt in _ex_tags:
        ors.append(
            sqlalchemy.cast(
                pipes_tbl.c['parameters'],
                sqlalchemy.String,
            ).not_like(f'%"tags":%"{xt}"%')
        )
    q = q.where(sqlalchemy.and_(sqlalchemy.or_(*ors).self_group())) if ors else q
    loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key'])
    if self.flavor not in OMIT_NULLSFIRST_FLAVORS:
        loc_asc = sqlalchemy.nullsfirst(loc_asc)
    q = q.order_by(
        sqlalchemy.asc(pipes_tbl.c['connector_keys']),
        sqlalchemy.asc(pipes_tbl.c['metric_key']),
        loc_asc,
    )

    ### execute the query and return a list of tuples
    if debug:
        dprint(q.compile(compile_kwargs={'literal_binds': True}))
    try:
        rows = (
            self.execute(q).fetchall()
            if self.flavor != 'duckdb'
            else [
                (row['connector_keys'], row['metric_key'], row['location_key'])
                for row in self.read(q).to_dict(orient='records')
            ]
        )
    except Exception as e:
        error(str(e))

    _keys = [(row[0], row[1], row[2]) for row in rows]
    if not tags:
        return _keys
    ### Make 100% sure that the tags are correct:
    ### include a row only if it matches at least one in-tag (when provided)
    ### and none of the ex-tags.
    keys = []
    for row in rows:
        ktup = (row[0], row[1], row[2])
        _actual_tags = (
            json.loads(row[3]) if isinstance(row[3], str)
            else row[3]
        ).get('tags', [])
        if _in_tags and not any(nt in _actual_tags for nt in _in_tags):
            continue
        if any(xt in _actual_tags for xt in _ex_tags):
            continue
        keys.append(ktup)
    return keys
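For example, a hedged sketch of filtering keys by tag (the keys, tags, and output are hypothetical; prefixing a value with the negation prefix `'_'` excludes it):

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> conn.fetch_pipes_keys(
...     connector_keys = ['sql:source'],
...     tags = ['weather', '_test'],   ### match 'weather', exclude pipes tagged 'test'
... )
[('sql:source', 'temperature', None)]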
def get_add_columns_queries(self, pipe: mrsm.Pipe, df: Union[pd.DataFrame, Dict[str, str]], debug: bool = False) ‑> List[str]

Add new null columns of the correct type to a table from a dataframe.

Parameters

pipe : mrsm.Pipe
The pipe to be altered.
df : Union[pd.DataFrame, Dict[str, str]]
The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.

Returns

A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.

Expand source code
def get_add_columns_queries(
        self,
        pipe: mrsm.Pipe,
        df: Union[pd.DataFrame, Dict[str, str]],
        debug: bool = False,
    ) -> List[str]:
    """
    Add new null columns of the correct type to a table from a dataframe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.

    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which contains new columns.
        If a dictionary is provided, assume it maps columns to Pandas data types.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    """
    if not pipe.exists(debug=debug):
        return []
    import copy
    from meerschaum.utils.sql import (
        sql_item_name,
        SINGLE_ALTER_TABLE_FLAVORS,
    )
    from meerschaum.utils.dtypes.sql import (
        get_pd_type_from_db_type,
        get_db_type_from_pd_type,
    )
    from meerschaum.utils.misc import flatten_list
    table_obj = self.get_pipe_table(pipe, debug=debug)
    is_dask = 'dask' in df.__module__ if not isinstance(df, dict) else False
    if is_dask:
        df = df.partitions[0].compute()
    df_cols_types = (
        {
            col: str(typ)
            for col, typ in df.dtypes.items()
        }
        if not isinstance(df, dict)
        else copy.deepcopy(df)
    )
    if not isinstance(df, dict) and len(df.index) > 0:
        for col, typ in list(df_cols_types.items()):
            if typ != 'object':
                continue
            val = df.iloc[0][col]
            if isinstance(val, (dict, list)):
                df_cols_types[col] = 'json'
            elif isinstance(val, str):
                df_cols_types[col] = 'str'
    db_cols_types = {
        col: get_pd_type_from_db_type(str(typ.type))
        for col, typ in table_obj.columns.items()
    }
    new_cols = set(df_cols_types) - set(db_cols_types)
    if not new_cols:
        return []

    new_cols_types = {
        col: get_db_type_from_pd_type(
            df_cols_types[col],
            self.flavor
        ) for col in new_cols
    }

    alter_table_query = "ALTER TABLE " + sql_item_name(pipe.target, self.flavor)
    queries = []
    for col, typ in new_cols_types.items():
        add_col_query = "\nADD " + sql_item_name(col, self.flavor) + " " + typ + ","

        if self.flavor in SINGLE_ALTER_TABLE_FLAVORS:
            queries.append(alter_table_query + add_col_query[:-1])
        else:
            alter_table_query += add_col_query

    ### For most flavors, only one query is required.
    ### This covers SQLite which requires one query per column.
    if not queries:
        queries.append(alter_table_query[:-1])

    if self.flavor != 'duckdb':
        return queries

    ### NOTE: For DuckDB, we must drop and rebuild the indices.
    drop_index_queries = list(flatten_list(
        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
    ))
    create_index_queries = list(flatten_list(
        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
    ))

    return drop_index_queries + queries + create_index_queries
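A sketch of previewing the generated DDL before applying it (the pipe, its target table, and the new column are hypothetical; quoting and type names vary by flavor, shown here for PostgreSQL):

>>> import meerschaum as mrsm
>>> import pandas as pd
>>> conn = mrsm.get_connector('sql:main')
>>> pipe = mrsm.Pipe('sql:source', 'sales', instance='sql:main')
>>> df = pd.DataFrame({'discount': [0.1]})
>>> conn.get_add_columns_queries(pipe, df)
['ALTER TABLE "sales"\nADD "discount" DOUBLE PRECISION']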
def get_alter_columns_queries(self, pipe: mrsm.Pipe, df: Union[pd.DataFrame, Dict[str, str]], debug: bool = False) ‑> List[str]

If we encounter a column of a different type, set the entire column to text.

Parameters

pipe : mrsm.Pipe
The pipe to be altered.
df : Union[pd.DataFrame, Dict[str, str]]
The pandas DataFrame which may contain altered columns. If a dict is provided, assume it maps columns to Pandas data types.

Returns

A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.

Expand source code
def get_alter_columns_queries(
        self,
        pipe: mrsm.Pipe,
        df: Union[pd.DataFrame, Dict[str, str]],
        debug: bool = False,
    ) -> List[str]:
    """
    If we encounter a column of a different type, set the entire column to text.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.

    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which may contain altered columns.
        If a dict is provided, assume it maps columns to Pandas data types.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    """
    if not pipe.exists(debug=debug):
        return []
    from meerschaum.utils.sql import sql_item_name
    from meerschaum.utils.dtypes import are_dtypes_equal
    from meerschaum.utils.dtypes.sql import (
        get_pd_type_from_db_type,
        get_db_type_from_pd_type,
    )
    from meerschaum.utils.misc import flatten_list, generate_password
    table_obj = self.get_pipe_table(pipe, debug=debug)
    target = pipe.target
    session_id = generate_password(3)
    df_cols_types = (
        {
            col: str(typ)
            for col, typ in df.dtypes.items()
        }
        if not isinstance(df, dict)
        else df
    )
    db_cols_types = {
        col: get_pd_type_from_db_type(str(typ.type))
        for col, typ in table_obj.columns.items()
    }
    pd_db_df_aliases = {
        'int': 'bool',
    }

    altered_cols = {
        col: (db_cols_types.get(col, 'object'), typ)
        for col, typ in df_cols_types.items()
        if not are_dtypes_equal(typ, db_cols_types.get(col, 'object').lower())
        and not are_dtypes_equal(db_cols_types.get(col, 'object'), 'string')
    }

    ### NOTE: Sometimes bools are coerced into ints.
    altered_cols_to_ignore = set()
    for col, (db_typ, df_typ) in altered_cols.items():
        for db_alias, df_alias in pd_db_df_aliases.items():
            if db_alias in db_typ.lower() and df_alias in df_typ.lower():
                altered_cols_to_ignore.add(col)
                continue
    for col in altered_cols_to_ignore:
        _ = altered_cols.pop(col, None)
    if not altered_cols:
        return []

    text_type = get_db_type_from_pd_type('str', self.flavor)
    altered_cols_types = {
        col: text_type
        for col in altered_cols
    }

    if self.flavor == 'sqlite':
        temp_table_name = '_' + session_id + '_' + target
        rename_query = (
            "ALTER TABLE "
            + sql_item_name(target, self.flavor)
            + " RENAME TO "
            + sql_item_name(temp_table_name, self.flavor)
        )
        create_query = (
            "CREATE TABLE "
            + sql_item_name(target, self.flavor)
            + " (\n"
        )
        for col_name, col_obj in table_obj.columns.items():
            create_query += (
                sql_item_name(col_name, self.flavor)
                + " "
                + (str(col_obj.type) if col_name not in altered_cols else text_type)
                + ",\n"
            )
        create_query = create_query[:-2] + "\n)"

        insert_query = (
            "INSERT INTO "
            + sql_item_name(target, self.flavor)
            + "\nSELECT\n"
        )
        for col_name, col_obj in table_obj.columns.items():
            new_col_str = (
                sql_item_name(col_name, self.flavor)
                if col_name not in altered_cols
                else f"CAST({sql_item_name(col_name, self.flavor)} AS {text_type})"
            )
            insert_query += new_col_str + ",\n"
        insert_query = insert_query[:-2] + f"\nFROM {sql_item_name(temp_table_name, self.flavor)}"

        drop_query = f"DROP TABLE {sql_item_name(temp_table_name, self.flavor)}"
        return [
            rename_query,
            create_query,
            insert_query,
            drop_query,
        ]

    queries = []
    if self.flavor == 'oracle':
        add_query = "ALTER TABLE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            add_query += "\nADD " + sql_item_name(col + '_temp', self.flavor) + " " + typ + ","
        add_query = add_query[:-1]
        queries.append(add_query)

        populate_temp_query = "UPDATE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            populate_temp_query += (
                "\nSET " + sql_item_name(col + '_temp', self.flavor)
                + ' = ' + sql_item_name(col, self.flavor) + ','
            )
        populate_temp_query = populate_temp_query[:-1]
        queries.append(populate_temp_query)

        set_old_cols_to_null_query = "UPDATE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            set_old_cols_to_null_query += (
                "\nSET " + sql_item_name(col, self.flavor)
                + ' = NULL,'
            )
        set_old_cols_to_null_query = set_old_cols_to_null_query[:-1]
        queries.append(set_old_cols_to_null_query)

        alter_type_query = "ALTER TABLE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            alter_type_query += (
                "\nMODIFY " + sql_item_name(col, self.flavor) + ' '
                + typ + ','
            )
        alter_type_query = alter_type_query[:-1]
        queries.append(alter_type_query)

        set_old_to_temp_query = "UPDATE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            set_old_to_temp_query += (
                "\nSET " + sql_item_name(col, self.flavor)
                + ' = ' + sql_item_name(col + '_temp', self.flavor) + ','
            )
        set_old_to_temp_query = set_old_to_temp_query[:-1]
        queries.append(set_old_to_temp_query)

        drop_temp_query = "ALTER TABLE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            drop_temp_query += (
                "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor) + ','
            )
        drop_temp_query = drop_temp_query[:-1]
        queries.append(drop_temp_query)

        return queries


    query = "ALTER TABLE " + sql_item_name(target, self.flavor)
    for col, typ in altered_cols_types.items():
        alter_col_prefix = (
            'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle')
            else 'MODIFY'
        )
        type_prefix = (
            '' if self.flavor in ('mssql', 'mariadb', 'mysql')
            else 'TYPE '
        )
        column_str = 'COLUMN' if self.flavor != 'oracle' else ''
        query += (
            f"\n{alter_col_prefix} {column_str} "
            + sql_item_name(col, self.flavor)
            + " " + type_prefix + typ + ","
        )

    query = query[:-1]
    queries.append(query)
    if self.flavor != 'duckdb':
        return queries

    drop_index_queries = list(flatten_list(
        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
    ))
    create_index_queries = list(flatten_list(
        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
    ))

    return drop_index_queries + queries + create_index_queries
def get_create_index_queries(self, pipe: mrsm.Pipe, debug: bool = False) ‑> Dict[str, List[str]]

Return a dictionary mapping columns to a CREATE INDEX or equivalent query.

Parameters

pipe : mrsm.Pipe
The pipe to which the queries will correspond.

Returns

A dictionary of column names mapping to lists of queries.

Expand source code
def get_create_index_queries(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to which the queries will correspond.

    Returns
    -------
    A dictionary of column names mapping to lists of queries.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.sql import sql_item_name, get_distinct_col_count
    from meerschaum.utils.warnings import warn
    from meerschaum.config import get_config
    index_queries = {}

    indices = pipe.get_indices()

    _datetime = pipe.get_columns('datetime', error=False)
    _datetime_type = pipe.dtypes.get(_datetime, 'datetime64[ns]')
    _datetime_name = sql_item_name(_datetime, self.flavor) if _datetime is not None else None
    _datetime_index_name = (
        sql_item_name(indices['datetime'], self.flavor) if indices.get('datetime', None)
        else None
    )
    _id = pipe.get_columns('id', error=False)
    _id_name = sql_item_name(_id, self.flavor) if _id is not None else None

    _id_index_name = sql_item_name(indices['id'], self.flavor) if indices.get('id') else None
    _pipe_name = sql_item_name(pipe.target, self.flavor)
    _create_space_partition = get_config('system', 'experimental', 'space')

    ### create datetime index
    if _datetime is not None:
        if self.flavor == 'timescaledb':
            _id_count = (
                get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self)
                if (_id is not None and _create_space_partition) else None
            )

            chunk_interval = pipe.get_chunk_interval(debug=debug)
            chunk_interval_minutes = (
                chunk_interval
                if isinstance(chunk_interval, int)
                else int(chunk_interval.total_seconds() / 60)
            )
            chunk_time_interval = (
                f"INTERVAL '{chunk_interval_minutes} MINUTES'"
                if isinstance(chunk_interval, timedelta)
                else f'{chunk_interval_minutes}'
            )

            dt_query = (
                f"SELECT create_hypertable('{_pipe_name}', " +
                f"'{_datetime}', "
                + (
                    f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition)
                    else ''
                )
                + f'chunk_time_interval => {chunk_time_interval}, '
                + "migrate_data => true);"
            )
        else: ### mssql, sqlite, etc.
            dt_query = (
                f"CREATE INDEX {_datetime_index_name} "
                + f"ON {_pipe_name} ({_datetime_name})"
            )

        index_queries[_datetime] = [dt_query]

    ### create id index
    if _id_name is not None:
        if self.flavor == 'timescaledb':
            ### Already created indices via create_hypertable.
            id_query = (
                None if (_id is not None and _create_space_partition)
                else (
                    f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})"
                    if _id is not None
                    else None
                )
            )
        elif self.flavor == 'citus':
            id_query = [(
                f"CREATE INDEX {_id_index_name} "
                + f"ON {_pipe_name} ({_id_name});"
            ), (
                f"SELECT create_distributed_table('{_pipe_name}', '{_id}');"
            )]
        else: ### mssql, sqlite, etc.
            id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})"

        if id_query is not None:
            ### The citus branch builds a list of queries; don't double-wrap it.
            index_queries[_id] = id_query if isinstance(id_query, list) else [id_query]


    ### Create indices for other labels in `pipe.columns`.
    other_indices = {
        ix_key: ix_unquoted
        for ix_key, ix_unquoted in pipe.get_indices().items()
        if ix_key not in ('datetime', 'id')
    }
    for ix_key, ix_unquoted in other_indices.items():
        ix_name = sql_item_name(ix_unquoted, self.flavor)
        col = pipe.columns[ix_key]
        col_name = sql_item_name(col, self.flavor)
        index_queries[col] = [f"CREATE INDEX {ix_name} ON {_pipe_name} ({col_name})"]

    return index_queries
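As a sketch for a plain PostgreSQL-flavored connector (a TimescaleDB connector would instead emit a `create_hypertable()` call for the datetime column; the index and table names below are illustrative):

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> pipe = mrsm.Pipe('sql:source', 'sales', instance='sql:main', columns={'datetime': 'dt', 'id': 'id'})
>>> for col, queries in conn.get_create_index_queries(pipe).items():
...     print(col, queries)
dt ['CREATE INDEX "IX_sales_dt" ON "sales" ("dt")']
id ['CREATE INDEX "IX_sales_id" ON "sales" ("id")']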
def get_drop_index_queries(self, pipe: mrsm.Pipe, debug: bool = False) ‑> Dict[str, List[str]]

Return a dictionary mapping columns to a DROP INDEX or equivalent query.

Parameters

pipe : mrsm.Pipe
The pipe to which the queries will correspond.

Returns

A dictionary of column names mapping to lists of queries.

Expand source code
def get_drop_index_queries(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `DROP INDEX` or equivalent query.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to which the queries will correspond.

    Returns
    -------
    A dictionary of column names mapping to lists of queries.
    """
    if not pipe.exists(debug=debug):
        return {}
    from meerschaum.utils.sql import sql_item_name, table_exists, hypertable_queries
    drop_queries = {}
    indices = pipe.get_indices()
    pipe_name = sql_item_name(pipe.target, self.flavor)

    if self.flavor not in hypertable_queries:
        is_hypertable = False
    else:
        is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name)
        is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None

    if is_hypertable:
        nuke_queries = []
        temp_table = '_' + pipe.target + '_temp_migration'
        temp_table_name = sql_item_name(temp_table, self.flavor)

        if table_exists(temp_table, self, debug=debug):
            nuke_queries.append(f"DROP TABLE {temp_table_name}")
        nuke_queries += [
            f"SELECT * INTO {temp_table_name} FROM {pipe_name}",
            f"DROP TABLE {pipe_name}",
            f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name}",
        ]
        nuke_ix_keys = ('datetime', 'id')
        nuked = False
        for ix_key in nuke_ix_keys:
            if ix_key in indices and not nuked:
                drop_queries[ix_key] = nuke_queries
                nuked = True

    drop_queries.update({
        ix_key: ["DROP INDEX " + sql_item_name(ix_unquoted, self.flavor)]
        for ix_key, ix_unquoted in indices.items()
        if ix_key not in drop_queries
    })
    return drop_queries
def get_pipe_attributes(self, pipe: mrsm.Pipe, debug: bool = False) ‑> Dict[str, Any]

Get a Pipe's attributes dictionary.

Expand source code
def get_pipe_attributes(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> Dict[str, Any]:
    """
    Get a Pipe's attributes dictionary.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    if pipe.get_id(debug=debug) is None:
        return {}

    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    try:
        q = sqlalchemy.select(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
        if debug:
            dprint(q)
        attributes = (
            dict(self.exec(q, silent=True, debug=debug).first()._mapping)
            if self.flavor != 'duckdb'
            else self.read(q, debug=debug).to_dict(orient='records')[0]
        )
    except Exception as e:
        import traceback
        traceback.print_exc()
        warn(e)
        print(pipe)
        return {}

    ### handle non-PostgreSQL databases (text vs JSON)
    if not isinstance(attributes.get('parameters', None), dict):
        try:
            import json
            parameters = json.loads(attributes['parameters'])
            if isinstance(parameters, str) and parameters[0] == '{':
                parameters = json.loads(parameters)
            attributes['parameters'] = parameters
        except Exception as e:
            attributes['parameters'] = {}

    return attributes
def get_pipe_columns_types(self, pipe: mrsm.Pipe, debug: bool = False) ‑> Optional[Dict[str, str]]

Get the pipe's columns and types.

Parameters

pipe : mrsm.Pipe
The pipe to get the columns for.

Returns

A dictionary of column names (str) and types (str).

Examples

>>> conn.get_pipe_columns_types(pipe)
{
  'dt': 'TIMESTAMP WITHOUT TIMEZONE',
  'id': 'BIGINT',
  'val': 'DOUBLE PRECISION',
}
>>>
Expand source code
def get_pipe_columns_types(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> Optional[Dict[str, str]]:
    """
    Get the pipe's columns and types.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to get the columns for.
        
    Returns
    -------
    A dictionary of column names (`str`) and types (`str`).

    Examples
    --------
    >>> conn.get_pipe_columns_types(pipe)
    {
      'dt': 'TIMESTAMP WITHOUT TIMEZONE',
      'id': 'BIGINT',
      'val': 'DOUBLE PRECISION',
    }
    >>> 
    """
    if not pipe.exists(debug=debug):
        return {}
    table_columns = {}
    try:
        pipe_table = self.get_pipe_table(pipe, debug=debug)
        if pipe_table is None:
            return {}
        for col in pipe_table.columns:
            table_columns[str(col.name)] = str(col.type)
    except Exception as e:
        import traceback
        traceback.print_exc()
        from meerschaum.utils.warnings import warn
        warn(e)
        table_columns = None

    return table_columns
def get_pipe_data(self, pipe: Optional[mrsm.Pipe] = None, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime, str, None] = None, end: Union[datetime, str, None] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, debug: bool = False, **kw: Any) ‑> Union[pd.DataFrame, None]

Access a pipe's data from the SQL instance.

Parameters

pipe : mrsm.Pipe
The pipe to get data from.
select_columns : Optional[List[str]], default None
If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
omit_columns : Optional[List[str]], default None
If provided, remove these columns from the selection.
begin : Union[datetime, str, None], default None
If provided, get rows newer than or equal to this value.
end : Union[datetime, str, None], default None
If provided, get rows older than or equal to this value.
params : Optional[Dict[str, Any]], default None
Additional parameters to filter by. See meerschaum.connectors.sql.build_where.
order : Optional[str], default 'asc'
The selection order for all of the indices in the query. If None, omit the ORDER BY clause.
limit : Optional[int], default None
If specified, limit the number of rows retrieved to this value.
begin_add_minutes : int, default 0
The number of minutes to add to the begin datetime (i.e. DATEADD).
end_add_minutes : int, default 0
The number of minutes to add to the end datetime (i.e. DATEADD).
chunksize : Optional[int], default -1
The size of dataframe chunks to load into memory.
debug : bool, default False
Verbosity toggle.

Returns

A pd.DataFrame of the pipe's data.

Expand source code
def get_pipe_data(
        self,
        pipe: Optional[mrsm.Pipe] = None,
        select_columns: Optional[List[str]] = None,
        omit_columns: Optional[List[str]] = None,
        begin: Union[datetime, str, None] = None,
        end: Union[datetime, str, None] = None,
        params: Optional[Dict[str, Any]] = None,
        order: str = 'asc',
        limit: Optional[int] = None,
        begin_add_minutes: int = 0,
        end_add_minutes: int = 0,
        debug: bool = False,
        **kw: Any
    ) -> Union[pd.DataFrame, None]:
    """
    Access a pipe's data from the SQL instance.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to get data from.

    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, str, None], default None
        If provided, get rows newer than or equal to this value.

    end: Union[datetime, str, None], default None
        If provided, get rows older than or equal to this value.

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.connectors.sql.build_where`.

    order: Optional[str], default 'asc'
        The selection order for all of the indices in the query.
        If `None`, omit the `ORDER BY` clause.

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    begin_add_minutes: int, default 0
        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).

    end_add_minutes: int, default 0
        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).

    chunksize: Optional[int], default -1
        The size of dataframe chunks to load into memory.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` of the pipe's data.

    """
    import json
    from meerschaum.utils.sql import sql_item_name
    from meerschaum.utils.misc import parse_df_datetimes, to_pandas_dtype
    from meerschaum.utils.packages import import_pandas
    pd = import_pandas()
    is_dask = 'dask' in pd.__name__

    dtypes = pipe.dtypes
    if dtypes:
        if self.flavor == 'sqlite':
            if not pipe.columns.get('datetime', None):
                _dt = pipe.guess_datetime()
                dt = sql_item_name(_dt, self.flavor) if _dt else None
                is_guess = True
            else:
                _dt = pipe.get_columns('datetime')
                dt = sql_item_name(_dt, self.flavor)
                is_guess = False

            if _dt:
                dt_type = dtypes.get(_dt, 'object').lower()
                if 'datetime' not in dt_type:
                    if 'int' not in dt_type:
                        dtypes[_dt] = 'datetime64[ns]'
    existing_cols = pipe.get_columns_types(debug=debug)
    select_columns = (
        [
            col
            for col in existing_cols
            if col not in (omit_columns or [])
        ]
        if not select_columns
        else [
            col
            for col in select_columns
            if col in existing_cols
            and col not in (omit_columns or [])
        ]
    )
    if select_columns:
        dtypes = {col: typ for col, typ in dtypes.items() if col in select_columns}
    dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
        if col in select_columns and col not in (omit_columns or [])
    }
    query = self.get_pipe_data_query(
        pipe,
        select_columns = select_columns,
        omit_columns = omit_columns,
        begin = begin,
        end = end,
        params = params,
        order = order,
        limit = limit,
        begin_add_minutes = begin_add_minutes,
        end_add_minutes = end_add_minutes,
        debug = debug,
        **kw
    )

    if is_dask:
        index_col = pipe.columns.get('datetime', None)
        kw['index_col'] = index_col

    df = self.read(
        query,
        dtype = dtypes,
        debug = debug,
        **kw
    )
    if self.flavor == 'sqlite':
        ### NOTE: We have to consume the iterator here to ensure that datetimes are parsed correctly
        df = (
            parse_df_datetimes(
                df,
                ignore_cols = [
                    col
                    for col, dtype in pipe.dtypes.items()
                    if 'datetime' not in str(dtype)
                ],
                chunksize = kw.get('chunksize', None),
                debug = debug,
            ) if isinstance(df, pd.DataFrame) else (
                [
                    parse_df_datetimes(
                        c,
                        ignore_cols = [
                            col
                            for col, dtype in pipe.dtypes.items()
                            if 'datetime' not in str(dtype)
                        ],
                        chunksize = kw.get('chunksize', None),
                        debug = debug,
                    )
                    for c in df
                ]
            )
        )
        for col, typ in dtypes.items():
            if typ != 'json':
                continue
            df[col] = df[col].apply(lambda x: json.loads(x) if x is not None else x)
    return df
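A usage sketch with datetime bounds and a params filter (the values are hypothetical; per the generated query, `begin` is inclusive (`>=`) and `end` is exclusive (`<`)):

>>> from datetime import datetime
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> pipe = mrsm.Pipe('sql:source', 'sales', instance='sql:main')
>>> df = conn.get_pipe_data(
...     pipe,
...     begin = datetime(2023, 1, 1),
...     end = datetime(2023, 2, 1),
...     params = {'store_id': 1},
...     limit = 100,
... )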
def get_pipe_data_query(self, pipe: mrsm.Pipe, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime, int, str, None] = None, end: Union[datetime, int, str, None] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, replace_nulls: Optional[str] = None, debug: bool = False, **kw: Any) ‑> Optional[str]

Return the SELECT query for retrieving a pipe's data from its instance.

Parameters

pipe : mrsm.Pipe
The pipe to get data from.
select_columns : Optional[List[str]], default None
If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
omit_columns : Optional[List[str]], default None
If provided, remove these columns from the selection.
begin : Union[datetime, int, str, None], default None
If provided, get rows newer than or equal to this value.
end : Union[datetime, int, str, None], default None
If provided, get rows older than or equal to this value.
params : Optional[Dict[str, Any]], default None
Additional parameters to filter by. See meerschaum.connectors.sql.build_where.
order : Optional[str], default 'asc'
The selection order for all of the indices in the query. If None, omit the ORDER BY clause.
limit : Optional[int], default None
If specified, limit the number of rows retrieved to this value.
begin_add_minutes : int, default 0
The number of minutes to add to the begin datetime (i.e. DATEADD).
end_add_minutes : int, default 0
The number of minutes to add to the end datetime (i.e. DATEADD).
chunksize : Optional[int], default -1
The size of dataframe chunks to load into memory.
replace_nulls : Optional[str], default None
If provided, replace null values with this value.
debug : bool, default False
Verbosity toggle.

Returns

A SELECT query to retrieve a pipe's data.

Expand source code
def get_pipe_data_query(
        self,
        pipe: mrsm.Pipe,
        select_columns: Optional[List[str]] = None,
        omit_columns: Optional[List[str]] = None,
        begin: Union[datetime, int, str, None] = None,
        end: Union[datetime, int, str, None] = None,
        params: Optional[Dict[str, Any]] = None,
        order: str = 'asc',
        limit: Optional[int] = None,
        begin_add_minutes: int = 0,
        end_add_minutes: int = 0,
        replace_nulls: Optional[str] = None,
        debug: bool = False,
        **kw: Any
    ) -> Union[str, None]:
    """
    Return the `SELECT` query for retrieving a pipe's data from its instance.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to get data from.

    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, int, str, None], default None
        If provided, get rows newer than or equal to this value.

    end: Union[datetime, int, str, None], default None
        If provided, get rows older than or equal to this value.

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.connectors.sql.build_where`.

    order: Optional[str], default 'asc'
        The selection order for all of the indices in the query.
        If `None`, omit the `ORDER BY` clause.

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    begin_add_minutes: int, default 0
        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).

    end_add_minutes: int, default 0
        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).

    chunksize: Optional[int], default -1
        The size of dataframe chunks to load into memory.

    replace_nulls: Optional[str], default None
        If provided, replace null values with this value.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SELECT` query to retrieve a pipe's data.
    """
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import items_str
    from meerschaum.utils.sql import sql_item_name, dateadd_str
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.warnings import warn
    pd = import_pandas()
    existing_cols = pipe.get_columns_types(debug=debug)
    select_columns = (
        [col for col in existing_cols]
        if not select_columns
        else [col for col in select_columns if col in existing_cols]
    )
    if omit_columns:
        select_columns = [col for col in select_columns if col not in omit_columns]

    if begin == '':
        begin = pipe.get_sync_time(debug=debug)
        backtrack_interval = pipe.get_backtrack_interval(debug=debug)
        if begin is not None:
            begin -= backtrack_interval

    cols_names = [sql_item_name(col, self.flavor) for col in select_columns]
    select_cols_str = (
        'SELECT\n'
        + ',\n    '.join(
            [
                (
                    col_name
                    if not replace_nulls
                    else f"COALESCE({col_name}, '{replace_nulls}') AS {col_name}"
                )
                for col_name in cols_names
            ]
        )
    )
    pipe_table_name = sql_item_name(pipe.target, self.flavor)
    query = f"{select_cols_str}\nFROM {pipe_table_name}"
    where = ""

    if order is not None:
        default_order = 'asc'
        if order not in ('asc', 'desc'):
            warn(f"Ignoring unsupported order '{order}'. Falling back to '{default_order}'.")
            order = default_order
        order = order.upper()

    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt = sql_item_name(_dt, self.flavor)
        is_guess = False

    quoted_indices = {
        key: sql_item_name(val, self.flavor)
        for key, val in pipe.columns.items()
        if val in existing_cols
    }

    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )

    is_dt_bound = False
    if begin is not None and _dt in existing_cols:
        begin_da = dateadd_str(
            flavor = self.flavor,
            datepart = 'minute',
            number = begin_add_minutes,
            begin = begin
        )
        where += f"{dt} >= {begin_da}" + (" AND " if end is not None else "")
        is_dt_bound = True

    if end is not None and _dt in existing_cols:
        if 'int' in str(type(end)).lower() and end == begin:
            end += 1
        end_da = dateadd_str(
            flavor = self.flavor,
            datepart = 'minute',
            number = end_add_minutes,
            begin = end
        )
        where += f"{dt} < {end_da}"
        is_dt_bound = True

    if params is not None:
        from meerschaum.utils.sql import build_where
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
        if valid_params:
            where += build_where(valid_params, self).replace(
                'WHERE', ('AND' if is_dt_bound else "")
            )

    if len(where) > 0:
        query += "\nWHERE " + where

    if order is not None:
        ### Sort by indices, starting with datetime.
        order_by = ""
        if quoted_indices:
            order_by += "\nORDER BY "
            if _dt and _dt in existing_cols:
                order_by += dt + ' ' + order + ','
            for key, quoted_col_name in quoted_indices.items():
                if key == 'datetime':
                    continue
                order_by += ' ' + quoted_col_name + ' ' + order + ','
            order_by = order_by[:-1]

        query += order_by

    if isinstance(limit, int):
        if self.flavor == 'mssql':
            query = f"SELECT TOP {limit}" + query[len("SELECT"):]
        elif self.flavor == 'oracle':
            query = f"SELECT * FROM (\n  {query}\n)\nWHERE ROWNUM <= {limit}"
        else:
            query += f"\nLIMIT {limit}"
    
    if debug:
        to_print = (
            []
            + ([f"begin='{begin}'"] if begin else [])
            + ([f"end='{end}'"] if end else [])
            + ([f"params='{json.dumps(params)}'"] if params else [])
        )
        dprint("Getting pipe data with constraints: " + items_str(to_print, quotes=False))

    return query
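
A minimal usage sketch (the connector keys, pipe keys, and column names below are hypothetical, and the pipe's table must already exist for the column check to succeed):

>>> from datetime import datetime
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:memory')
>>> pipe = mrsm.Pipe('demo', 'weather', instance=conn, columns={'datetime': 'dt'})
>>> ### Build (but don't execute) a bounded, ordered SELECT statement.
>>> query = conn.get_pipe_data_query(
...     pipe,
...     begin = datetime(2023, 1, 1),
...     end = datetime(2023, 2, 1),
...     order = 'desc',
...     limit = 100,
... )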
def get_pipe_id(self, pipe: mrsm.Pipe, debug: bool = False) ‑> Any

Get a Pipe's ID from the pipes table.

Expand source code
def get_pipe_id(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> Any:
    """
    Get a Pipe's ID from the pipes table.
    """
    if pipe.temporary:
        return None
    from meerschaum.utils.packages import attempt_import
    import json
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    query = sqlalchemy.select(pipes_tbl.c.pipe_id).where(
        pipes_tbl.c.connector_keys == pipe.connector_keys
    ).where(
        pipes_tbl.c.metric_key == pipe.metric_key
    ).where(
        (pipes_tbl.c.location_key == pipe.location_key) if pipe.location_key is not None
        else pipes_tbl.c.location_key.is_(None)
    )
    _id = self.value(query, debug=debug, silent=pipe.temporary)
    if _id is not None:
        _id = int(_id)
    return _id
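
A short sketch (assuming a pipe registered on this instance; unregistered or temporary pipes yield None):

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:memory')
>>> pipe = mrsm.Pipe('demo', 'weather', instance=conn)
>>> pipe_id = conn.get_pipe_id(pipe)
>>> pipe_id is None or isinstance(pipe_id, int)
True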
def get_pipe_metadef(self, pipe: Pipe, params: Optional[Dict[str, Any]] = None, begin: Union[datetime, int, str, None] = '', end: Union[datetime, int, str, None] = None, check_existing: bool = True, debug: bool = False, **kw: Any) ‑> Union[str, None]

Return a pipe's meta definition fetch query.

Parameters

params : Optional[Dict[str, Any]], default None
Optional params dictionary to build the WHERE clause. See build_where().
begin : Union[datetime, int, str, None], default ''
Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
end : Union[datetime, int, str, None], default None
The latest datetime to search for data. If end is None, do not bound the upper end of the search.
check_existing : bool, default True
If True, apply the backtrack interval.
debug : bool, default False
Verbosity toggle.

Returns

A pipe's meta definition fetch query string.

Expand source code
def get_pipe_metadef(
        self,
        pipe: meerschaum.Pipe,
        params: Optional[Dict[str, Any]] = None,
        begin: Union[datetime, int, str, None] = '',
        end: Union[datetime, int, str, None] = None,
        check_existing: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> Union[str, None]:
    """
    Return a pipe's meta definition fetch query.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    begin: Union[datetime, int, str, None], default ''
        Most recent datetime to search for data.
        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.

    end: Union[datetime, int, str, None], default None
        The latest datetime to search for data.
        If `end` is `None`, do not bound the upper end of the search.

    check_existing: bool, default True
        If `True`, apply the backtrack interval.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pipe's meta definition fetch query string.
    """
    from datetime import timedelta
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where
    from meerschaum.utils.misc import is_int
    from meerschaum.config import get_config

    definition = get_pipe_query(pipe)

    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt_name = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt_name = sql_item_name(_dt, self.flavor)
        is_guess = False

    if begin not in (None, '') or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"Unable to determine a datetime column for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = '', None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False
                )


    if 'order by' in definition.lower() and 'over' not in definition.lower():
        error("Cannot fetch with an ORDER clause in the definition")

    apply_backtrack = begin == '' and check_existing
    backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug)
    btm = (
        int(backtrack_interval.total_seconds() / 60)
        if isinstance(backtrack_interval, timedelta)
        else backtrack_interval
    )
    begin = (
        pipe.get_sync_time(debug=debug)
        if begin == ''
        else begin
    )

    if begin and end and begin >= end:
        begin = None
    
    da = None
    if dt_name:
        begin_da = (
            dateadd_str(
                flavor = self.flavor,
                datepart = 'minute',
                number = ((-1 * btm) if apply_backtrack else 0), 
                begin = begin,
            )
            if begin
            else None
        )
        end_da = (
            dateadd_str(
                flavor = self.flavor,
                datepart = 'minute',
                number = 0,
                begin = end,
            )
            if end
            else None
        )

    meta_def = (
        _simple_fetch_query(pipe) if (
            (not (pipe.columns or {}).get('id', None))
            or (not get_config('system', 'experimental', 'join_fetch'))
        ) else _join_fetch_query(pipe, debug=debug, **kw)
    )

    has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):]
    if dt_name and (begin_da or end_da):
        definition_dt_name = (
            dateadd_str(self.flavor, 'minute', 0, f"definition.{dt_name}")
            if not is_int((begin_da or end_da))
            else f"definition.{dt_name}"
        )
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        if begin_da:
            meta_def += f"{definition_dt_name} >= {begin_da}"
        if begin_da and end_da:
            meta_def += " AND "
        if end_da:
            meta_def += f"{definition_dt_name} < {end_da}"

    if params is not None:
        params_where = build_where(params, self, with_where=False)
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        meta_def += params_where

    return meta_def
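
A minimal sketch of building a fetch query (the definition and column names are hypothetical; the pipe only needs a fetch:definition parameter, as described above):

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:memory')
>>> pipe = mrsm.Pipe(
...     'sql:memory', 'sensor',
...     parameters = {'fetch': {'definition': 'SELECT * FROM sensor_readings'}},
...     columns = {'datetime': 'dt'},
... )
>>> ### Wrap the definition in datetime bounds (no backtrack when check_existing=False).
>>> meta_query = conn.get_pipe_metadef(pipe, begin=None, check_existing=False)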
def get_pipe_rowcount(self, pipe: mrsm.Pipe, begin: Union[datetime, int, None] = None, end: Union[datetime, int, None] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) ‑> Optional[int]

Get the rowcount for a pipe in accordance with given parameters.

Parameters

pipe : mrsm.Pipe
The pipe to query with.
begin : Union[datetime, int, None], default None
The begin datetime value.
end : Union[datetime, int, None], default None
The end datetime value.
params : Optional[Dict[str, Any]], default None
See build_where().
remote : bool, default False
If True, get the rowcount for the remote table.
debug : bool, default False
Verbosity toggle.

Returns

An int for the number of rows if the pipe exists, otherwise None.

Expand source code
def get_pipe_rowcount(
        self,
        pipe: mrsm.Pipe,
        begin: Union[datetime, int, None] = None,
        end: Union[datetime, int, None] = None,
        params: Optional[Dict[str, Any]] = None,
        remote: bool = False,
        debug: bool = False
    ) -> Union[int, None]:
    """
    Get the rowcount for a pipe in accordance with given parameters.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to query with.
        
    begin: Union[datetime, int, None], default None
        The begin datetime value.

    end: Union[datetime, int, None], default None
        The end datetime value.

    params: Optional[Dict[str, Any]], default None
        See `meerschaum.utils.sql.build_where`.

    remote: bool, default False
        If `True`, get the rowcount for the remote table.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` for the number of rows if the `pipe` exists, otherwise `None`.

    """
    from meerschaum.utils.sql import dateadd_str, sql_item_name, NO_CTE_FLAVORS
    from meerschaum.utils.warnings import error, warn
    from meerschaum.connectors.sql._fetch import get_pipe_query
    if remote:
        msg = f"'fetch:definition' must be an attribute of {pipe} to get a remote rowcount."
        if 'fetch' not in pipe.parameters:
            error(msg)
            return None
        if 'definition' not in pipe.parameters['fetch']:
            error(msg)
            return None

    _pipe_name = sql_item_name(pipe.target, self.flavor)

    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt = sql_item_name(_dt, self.flavor)
        is_guess = False

    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )


    _datetime_name = sql_item_name(
        _dt,
        pipe.instance_connector.flavor if not remote else pipe.connector.flavor
    )
    _cols_names = [
        sql_item_name(col, pipe.instance_connector.flavor if not remote else pipe.connector.flavor)
        for col in set(
            ([_dt] if _dt else [])
            + ([] if params is None else list(params.keys()))
        )
    ]
    if not _cols_names:
        _cols_names = ['*']

    src = (
        f"SELECT {', '.join(_cols_names)} FROM {_pipe_name}"
        if not remote
        else get_pipe_query(pipe)
    )
    query = (
        f"""
        WITH src AS ({src})
        SELECT COUNT(*)
        FROM src
        """
    ) if self.flavor not in ('mysql', 'mariadb') else (
        f"""
        SELECT COUNT(*)
        FROM ({src}) AS src
        """
    )
    if begin is not None or end is not None:
        query += "WHERE"
    if begin is not None:
        query += f"""
        {dt} >= {dateadd_str(self.flavor, datepart='minute', number=0, begin=begin)}
        """
    if end is not None and begin is not None:
        query += "AND"
    if end is not None:
        query += f"""
        {dt} < {dateadd_str(self.flavor, datepart='minute', number=0, begin=end)}
        """
    if params is not None:
        from meerschaum.utils.sql import build_where
        existing_cols = pipe.get_columns_types(debug=debug)
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
        if valid_params:
            query += build_where(valid_params, self).replace('WHERE', (
                'AND' if (begin is not None or end is not None)
                    else 'WHERE'
                )
            )
        
    result = self.value(query, debug=debug, silent=True)
    try:
        return int(result)
    except Exception as e:
        return None
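
For example (hypothetical pipe keys; returns None when the pipe's table does not exist):

>>> from datetime import datetime
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:memory')
>>> pipe = mrsm.Pipe('demo', 'weather', instance=conn, columns={'datetime': 'dt'})
>>> rowcount = conn.get_pipe_rowcount(pipe, begin=datetime(2023, 1, 1))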
def get_pipe_table(self, pipe: mrsm.Pipe, debug: bool = False) ‑> sqlalchemy.Table

Return the sqlalchemy.Table object for a mrsm.Pipe.

Parameters

pipe : mrsm.Pipe:
The pipe in question.

Returns

A sqlalchemy.Table object.

Expand source code
def get_pipe_table(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> sqlalchemy.Table:
    """
    Return the `sqlalchemy.Table` object for a `mrsm.Pipe`.

    Parameters
    ----------
    pipe: mrsm.Pipe:
        The pipe in question.
        

    Returns
    -------
    A `sqlalchemy.Table` object. 

    """
    from meerschaum.utils.sql import get_sqlalchemy_table
    if not pipe.exists(debug=debug):
        return None
    return get_sqlalchemy_table(pipe.target, connector=self, debug=debug, refresh=True)
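
A quick sketch (pipe keys are hypothetical; None is returned until the pipe's table exists):

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:memory')
>>> pipe = mrsm.Pipe('demo', 'weather', instance=conn)
>>> table = conn.get_pipe_table(pipe)
>>> table is None or hasattr(table, 'columns')
True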
def get_plugin_attributes(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Dict[str, Any]

Return the attributes of a plugin.

Expand source code
def get_plugin_attributes(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Dict[str, Any]:
    """
    Return the attributes of a plugin.
    """
    ### ensure plugins table exists
    from meerschaum.connectors.sql.tables import get_tables
    import json
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    query = (
        sqlalchemy
        .select(plugins_tbl.c.attributes)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )

    _attr = self.value(query, debug=debug)
    if isinstance(_attr, str):
        _attr = json.loads(_attr)
    elif _attr is None:
        _attr = {}
    return _attr
def get_plugin_id(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Optional[int]

Return a plugin's ID.

Expand source code
def get_plugin_id(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[int]:
    """
    Return a plugin's ID.
    """
    ### ensure plugins table exists
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    query = (
        sqlalchemy
        .select(plugins_tbl.c.plugin_id)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )
    
    try:
        return int(self.value(query, debug=debug))
    except Exception as e:
        return None
def get_plugin_user_id(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Optional[int]

Return a plugin's user ID.

Expand source code
def get_plugin_user_id(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[int]:
    """
    Return a plugin's user ID.
    """
    ### ensure plugins table exists
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    query = (
        sqlalchemy
        .select(plugins_tbl.c.user_id)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )

    try:
        return int(self.value(query, debug=debug))
    except Exception as e:
        return None
def get_plugin_username(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Optional[str]

Return the username of a plugin's owner.

Expand source code
def get_plugin_username(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[str]:
    """
    Return the username of a plugin's owner.
    """
    ### ensure plugins table exists
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    users = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    query = (
        sqlalchemy.select(users.c.username)
        .where(users.c.user_id == plugins_tbl.c.user_id)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )

    return self.value(query, debug=debug)
def get_plugin_version(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Optional[str]

Return a plugin's version.

Expand source code
def get_plugin_version(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[str]:
    """
    Return a plugin's version.
    """
    ### ensure plugins table exists
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    query = sqlalchemy.select(plugins_tbl.c.version).where(plugins_tbl.c.plugin_name == plugin.name)

    return self.value(query, debug=debug)
def get_plugins(self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False, **kw: Any) ‑> List[str]

Return a list of all registered plugins.

Parameters

user_id : Optional[int], default None
If specified, filter plugins by a specific user_id.
search_term : Optional[str], default None
If specified, add a WHERE plugin_name LIKE '{search_term}%' clause to filter the plugins.

Returns

A list of plugin names.

Expand source code
def get_plugins(
        self,
        user_id: Optional[int] = None,
        search_term: Optional[str] = None,
        debug: bool = False,
        **kw: Any
    ) -> List[str]:
    """
    Return a list of all registered plugins.

    Parameters
    ----------
    user_id: Optional[int], default None
        If specified, filter plugins by a specific `user_id`.

    search_term: Optional[str], default None
        If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins.


    Returns
    -------
    A list of plugin names.
    """
    ### ensure plugins table exists
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    query = sqlalchemy.select(plugins_tbl.c.plugin_name)
    if user_id is not None:
        query = query.where(plugins_tbl.c.user_id == user_id)
    if search_term is not None:
        query = query.where(plugins_tbl.c.plugin_name.like(search_term + '%'))

    rows = (
        self.execute(query).fetchall()
        if self.flavor != 'duckdb'
        else [
            (row['plugin_name'],)
            for row in self.read(query).to_dict(orient='records')
        ]
    )
    
    return [row[0] for row in rows]
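
For example (the filters are optional and may be combined; the search term below is hypothetical):

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:memory')
>>> all_plugins = conn.get_plugins()
>>> color_plugins = conn.get_plugins(search_term='color')   ### plugin_name LIKE 'color%'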
def get_sync_time(self, pipe: "'mrsm.Pipe'", params: Optional[Dict[str, Any]] = None, newest: bool = True, debug: bool = False) ‑> Union[datetime.datetime, int, None]

Get a Pipe's most recent datetime value.

Parameters

pipe : mrsm.Pipe
The pipe to get the sync time for.
params : Optional[Dict[str, Any]], default None
Optional params dictionary to build the WHERE clause. See build_where().
newest : bool, default True
If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).

Returns

A datetime object (or int if using an integer axis) if the pipe exists, otherwise None.

Expand source code
def get_sync_time(
        self,
        pipe: 'mrsm.Pipe',
        params: Optional[Dict[str, Any]] = None,
        newest: bool = True,
        debug: bool = False,
    ) -> Union[datetime, int, None]:
    """Get a Pipe's most recent datetime value.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to get the sync time for.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (ASC instead of DESC).

    Returns
    -------
    A `datetime` object (or `int` if using an integer axis) if the pipe exists, otherwise `None`.
    """
    from datetime import datetime, date
    from meerschaum.utils.sql import sql_item_name, build_where
    from meerschaum.utils.warnings import warn
    table = sql_item_name(pipe.target, self.flavor)

    dt_col = pipe.columns.get('datetime', None)
    dt_type = pipe.dtypes.get(dt_col, 'datetime64[ns]')
    if not dt_col:
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = dt_col
        dt = sql_item_name(_dt, self.flavor)
        is_guess = False

    if _dt is None:
        return None

    ASC_or_DESC = "DESC" if newest else "ASC"
    existing_cols = pipe.get_columns_types(debug=debug)
    valid_params = {}
    if params is not None:
        valid_params = {k: v for k, v in params.items() if k in existing_cols}

    ### If no bounds are provided for the datetime column,
    ### add IS NOT NULL to the WHERE clause.
    if _dt not in valid_params:
        valid_params[_dt] = '_None'
    where = "" if not valid_params else build_where(valid_params, self)
    q = f"SELECT {dt}\nFROM {table}{where}\nORDER BY {dt} {ASC_or_DESC}\nLIMIT 1"
    if self.flavor == 'mssql':
        q = f"SELECT TOP 1 {dt}\nFROM {table}{where}\nORDER BY {dt} {ASC_or_DESC}"
    elif self.flavor == 'oracle':
        q = (
            "SELECT * FROM (\n"
            + f"    SELECT {dt}\nFROM {table}{where}\n    ORDER BY {dt} {ASC_or_DESC}\n"
            + ") WHERE ROWNUM = 1"
        )

    try:
        db_time = self.value(q, silent=True, debug=debug)

        ### No datetime could be found.
        if db_time is None:
            return None
        ### sqlite returns str.
        if isinstance(db_time, str):
            from meerschaum.utils.packages import attempt_import
            dateutil_parser = attempt_import('dateutil.parser')
            st = dateutil_parser.parse(db_time)
        ### Do nothing if a datetime object is returned.
        elif isinstance(db_time, datetime):
            if hasattr(db_time, 'to_pydatetime'):
                st = db_time.to_pydatetime()
            else:
                st = db_time
        ### Sometimes the datetime is actually a date.
        elif isinstance(db_time, date):
            st = datetime.combine(db_time, datetime.min.time())
        ### Adding support for an integer datetime axis.
        elif 'int' in str(type(db_time)).lower():
            st = int(db_time)
        ### Convert pandas timestamp to Python datetime.
        else:
            st = db_time.to_pydatetime()

        sync_time = st

    except Exception as e:
        sync_time = None
        warn(str(e))

    return sync_time
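
A short sketch (hypothetical pipe and params; returns None until the pipe has data):

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:memory')
>>> pipe = mrsm.Pipe('demo', 'weather', instance=conn, columns={'datetime': 'dt'})
>>> newest = conn.get_sync_time(pipe)
>>> oldest = conn.get_sync_time(pipe, newest=False)
>>> filtered = conn.get_sync_time(pipe, params={'station': 'KATL'})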
def get_to_sql_dtype(self, pipe: "'mrsm.Pipe'", df: "'pd.DataFrame'", update_dtypes: bool = True) ‑> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']

Given a pipe and DataFrame, return the dtype dictionary for to_sql().

Parameters

pipe : mrsm.Pipe
The pipe which may contain a dtypes parameter.
df : pd.DataFrame
The DataFrame to be pushed via to_sql().
update_dtypes : bool, default True
If True, patch the pipe's dtypes onto the DataFrame's dtypes.

Returns

A dictionary with sqlalchemy datatypes.

Examples

>>> import pandas as pd
>>> import meerschaum as mrsm
>>> 
>>> conn = mrsm.get_connector('sql:memory')
>>> df = pd.DataFrame([{'a': {'b': 1}}])
>>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
>>> conn.get_to_sql_dtype(pipe, df)
{'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
Expand source code
def get_to_sql_dtype(
        self,
        pipe: 'mrsm.Pipe',
        df: 'pd.DataFrame',
        update_dtypes: bool = True,
    ) -> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']:
    """
    Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe which may contain a `dtypes` parameter.

    df: pd.DataFrame
        The DataFrame to be pushed via `to_sql()`.

    update_dtypes: bool, default True
        If `True`, patch the pipe's dtypes onto the DataFrame's dtypes.

    Returns
    -------
    A dictionary with `sqlalchemy` datatypes.

    Examples
    --------
    >>> import pandas as pd
    >>> import meerschaum as mrsm
    >>> 
    >>> conn = mrsm.get_connector('sql:memory')
    >>> df = pd.DataFrame([{'a': {'b': 1}}])
    >>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
    >>> conn.get_to_sql_dtype(pipe, df)
    {'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
    """
    from meerschaum.utils.misc import get_json_cols
    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
    df_dtypes = {
        col: str(typ)
        for col, typ in df.dtypes.items()
    }
    json_cols = get_json_cols(df)
    df_dtypes.update({col: 'json' for col in json_cols})
    if update_dtypes:
        df_dtypes.update(pipe.dtypes)
    return {
        col: get_db_type_from_pd_type(typ, self.flavor, as_sqlalchemy=True)
        for col, typ in df_dtypes.items()
    }
def get_user_attributes(self, user: meerschaum.core.User, debug: bool = False) ‑> Union[Dict[str, Any], None]

Return the user's attributes.

Expand source code
def get_user_attributes(
        self,
        user: meerschaum.core.User,
        debug: bool = False
    ) -> Union[Dict[str, Any], None]:
    """
    Return the user's attributes.
    """
    ### ensure users table exists
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)

    query = (
        sqlalchemy.select(users_tbl.c.attributes)
        .where(users_tbl.c.user_id == user_id)
    )

    result = self.value(query, debug=debug)
    if result is not None and not isinstance(result, dict):
        try:
            result = dict(result)
            _parsed = True
        except Exception as e:
            _parsed = False
        if not _parsed:
            try:
                import json
                result = json.loads(result)
                _parsed = True
            except Exception as e:
                _parsed = False
        if not _parsed:
            warn(f"Received unexpected type for attributes: {result}")
    return result
def get_user_id(self, user: meerschaum.core.User, debug: bool = False) ‑> Optional[int]

If a user is registered, return the user_id.

Expand source code
def get_user_id(
        self,
        user: meerschaum.core.User,
        debug : bool = False
    ) -> Optional[int]:
    """If a user is registered, return the `user_id`."""
    ### ensure users table exists
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    query = (
        sqlalchemy.select(users_tbl.c.user_id)
        .where(users_tbl.c.username == user.username)
    )

    result = self.value(query, debug=debug)
    if result is not None:
        return int(result)
    return None
def get_user_password_hash(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> Optional[str]

Return the password hash for a user. NOTE: This may be dangerous and is only allowed if the security settings explicitly allow it.

Expand source code
def get_user_password_hash(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """
    Return the password hash for a user.
    **NOTE**: This may be dangerous and is only allowed if the security settings explicitly allow it.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    if user.user_id is not None:
        user_id = user.user_id
        if debug:
            dprint(f"Already given user_id: {user_id}")
    else:
        if debug:
            dprint(f"Fetching user_id...")
        user_id = self.get_user_id(user, debug=debug)

    if user_id is None:
        return None

    query = sqlalchemy.select(users_tbl.c.password_hash).where(users_tbl.c.user_id == user_id)

    return self.value(query, debug=debug)
def get_user_type(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> Optional[str]

Return the user's type.

Expand source code
def get_user_type(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """
    Return the user's type.
    """
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)

    if user_id is None:
        return None

    query = sqlalchemy.select(users_tbl.c.user_type).where(users_tbl.c.user_id == user_id)

    return self.value(query, debug=debug)
def get_users(self, debug: bool = False, **kw: Any) ‑> List[str]

Get the registered usernames.

Expand source code
def get_users(
        self,
        debug: bool = False,
        **kw: Any
    ) -> List[str]:
    """
    Get the registered usernames.
    """
    ### ensure users table exists
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    query = sqlalchemy.select(users_tbl.c.username)

    return list(self.read(query, debug=debug)['username'])
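
For example:

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:memory')
>>> usernames = conn.get_users()   ### e.g. a list like ['alice', 'bob']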
def pipe_exists(self, pipe: mrsm.Pipe, debug: bool = False) ‑> bool

Check that a Pipe's table exists.

Parameters

pipe : mrsm.Pipe:
The pipe to check.
debug : bool, default False
Verbosity toggle.

Returns

A bool corresponding to whether a pipe's table exists.

Expand source code
def pipe_exists(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False
    ) -> bool:
    """
    Check that a Pipe's table exists.

    Parameters
    ----------
    pipe: mrsm.Pipe:
        The pipe to check.
        
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `bool` corresponding to whether a pipe's table exists.

    """
    from meerschaum.utils.sql import table_exists
    exists = table_exists(pipe.target, self, debug=debug)
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"{pipe} " + ('exists.' if exists else 'does not exist.'))
    return exists
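
For example (an unsynced pipe's table does not exist yet):

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:memory')
>>> pipe = mrsm.Pipe('demo', 'weather', instance=conn)
>>> conn.pipe_exists(pipe)
False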
def read(self, query_or_table: Union[str, sqlalchemy.Query], params: Union[Dict[str, Any], List[str], None] = None, dtype: Optional[Dict[str, Any]] = None, dtype_backend: str = 'pyarrow', chunksize: Optional[int] = -1, workers: Optional[int] = None, chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None, as_hook_results: bool = False, chunks: Optional[int] = None, as_chunks: bool = False, as_iterator: bool = False, as_dask: bool = False, index_col: Optional[str] = None, silent: bool = False, debug: bool = False, **kw: Any) ‑> Union[pandas.DataFrame, dask.DataFrame, List[pandas.DataFrame], List[Any], None]

Read a SQL query or table into a pandas dataframe.

Parameters

query_or_table : Union[str, sqlalchemy.Query]
The SQL query (sqlalchemy Query or string) or name of the table from which to select.
params : Optional[Dict[str, Any]], default None
List or Dict of parameters to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
dtype : Optional[Dict[str, Any]], default None
A dictionary of data types to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
dtype_backend : str, default 'pyarrow'
Which pandas dtype engine to use.
chunksize : Optional[int], default -1
How many chunks to read at a time. None will read everything in one large chunk. Defaults to system configuration. NOTE: DuckDB does not allow for chunking.
workers : Optional[int], default None
How many threads to use when consuming the generator. Only applies if chunk_hook is provided.
chunk_hook : Optional[Callable[[pandas.DataFrame], Any]], default None
Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. See --sync-chunks for an example. NOTE: as_iterator MUST be False (default).
as_hook_results : bool, default False
If True, return a List of the outputs of the hook function. Only applicable if chunk_hook is not None. NOTE: as_iterator MUST be False (default).
chunks : Optional[int], default None
Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and return into a single dataframe. For example, to limit the returned dataframe to 100,000 rows, you could specify a chunksize of 1000 and chunks of 100.
as_chunks : bool, default False
If True, return a list of DataFrames. Otherwise return a single DataFrame.
as_iterator : bool, default False
If True, return the pandas DataFrame iterator. chunksize must not be None (falls back to 1000 if so), and hooks are not called in this case.
index_col : Optional[str], default None
If using Dask, use this column as the index column. If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
silent : bool, default False
If True, don't raise warnings in case of errors. Defaults to False.

Returns

A pd.DataFrame (default case), or an iterator, or a list of dataframes / iterators, or None if something breaks.

Expand source code
def read(
        self,
        query_or_table: Union[str, sqlalchemy.Query],
        params: Union[Dict[str, Any], List[str], None] = None,
        dtype: Optional[Dict[str, Any]] = None,
        dtype_backend: str = 'pyarrow',
        chunksize: Optional[int] = -1,
        workers: Optional[int] = None,
        chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
        as_hook_results: bool = False,
        chunks: Optional[int] = None,
        as_chunks: bool = False,
        as_iterator: bool = False,
        as_dask: bool = False,
        index_col: Optional[str] = None,
        silent: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Union[
        pandas.DataFrame,
        dask.DataFrame,
        List[pandas.DataFrame],
        List[Any],
        None,
    ]:
    """
    Read a SQL query or table into a pandas dataframe.

    Parameters
    ----------
    query_or_table: Union[str, sqlalchemy.Query]
        The SQL query (sqlalchemy Query or string) or name of the table from which to select.

    params: Optional[Dict[str, Any]], default None
        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html

    dtype: Optional[Dict[str, Any]], default None
        A dictionary of data types to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html

    dtype_backend: str, default 'pyarrow'
        Which pandas dtype engine to use.

    chunksize: Optional[int], default -1
        How many chunks to read at a time. `None` will read everything in one large chunk.
        Defaults to system configuration.

        **NOTE:** DuckDB does not allow for chunking.

    workers: Optional[int], default None
        How many threads to use when consuming the generator.
        Only applies if `chunk_hook` is provided.

    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
        See `--sync-chunks` for an example.
        **NOTE:** `as_iterator` MUST be False (default).

    as_hook_results: bool, default False
        If `True`, return a `List` of the outputs of the hook function.
        Only applicable if `chunk_hook` is not None.

        **NOTE:** `as_iterator` MUST be `False` (default).

    chunks: Optional[int], default None
        Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
        return into a single dataframe.
        For example, to limit the returned dataframe to 100,000 rows,
        you could specify a `chunksize` of `1000` and `chunks` of `100`.

    as_chunks: bool, default False
        If `True`, return a list of DataFrames. 
        Otherwise return a single DataFrame.

    as_iterator: bool, default False
        If `True`, return the pandas DataFrame iterator.
        `chunksize` must not be `None` (falls back to 1000 if so),
        and hooks are not called in this case.

    index_col: Optional[str], default None
        If using Dask, use this column as the index column.
        If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.

    silent: bool, default False
        If `True`, don't raise warnings in case of errors.
        Defaults to `False`.

    Returns
    -------
    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
    or `None` if something breaks.

    """
    if chunks is not None and chunks <= 0:
        return []
    from meerschaum.utils.sql import sql_item_name, truncate_item_name
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.pool import get_pool
    from meerschaum.utils.dataframe import chunksize_to_npartitions
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    import warnings
    import inspect
    import traceback
    pd = import_pandas()
    is_dask = 'dask' in pd.__name__
    pd = attempt_import('pandas')
    ### Only load `dask.dataframe` if a Dask DataFrame was requested.
    dd = attempt_import('dask.dataframe') if (is_dask or as_dask) else None
    is_dask = dd is not None
    npartitions = chunksize_to_npartitions(chunksize)
    if is_dask:
        chunksize = None

    sqlalchemy = attempt_import("sqlalchemy")
    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is None and as_iterator:
        if not silent and self.flavor not in _disallow_chunks_flavors:
            warn(
                f"An iterator may only be generated if chunksize is not None.\n"
                + "Falling back to a chunksize of 1000.", stacklevel=3,
            )
        chunksize = 1000
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel = 3,
                )
            chunksize = _max_chunks_flavors[self.flavor]

    ### NOTE: A bug in duckdb_engine does not allow for chunks.
    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
        chunksize = None

    if debug:
        import time
        start = time.perf_counter()
        dprint(query_or_table)
        dprint(f"[{self}] Fetching with chunksize: {chunksize}")

    ### This might be sqlalchemy object or the string of a table name.
    ### We check for spaces and quotes to see if it might be a weird table.
    if (
        ' ' not in str(query_or_table)
        or (
            ' ' in str(query_or_table)
            and str(query_or_table).startswith('"')
            and str(query_or_table).endswith('"')
        )
    ):
        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
        if truncated_table_name != str(query_or_table) and not silent:
            warn(
                f"Table '{name}' is too long for '{self.flavor}',"
                + f" will instead create the table '{truncated_name}'."
            )

        query_or_table = sql_item_name(str(query_or_table), self.flavor)
        if debug:
            dprint(f"[{self}] Reading from table {query_or_table}")
        str_query = f"SELECT * FROM {query_or_table}"
    else:
        str_query = query_or_table

    formatted_query = (
        sqlalchemy.text(str_query)
        if not is_dask and isinstance(str_query, str)
        else format_sql_query_for_dask(str_query)
    )

    chunk_list = []
    chunk_hook_results = []
    try:
        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')

            read_sql_query_kwargs = {
                'params': params,
                'dtype': dtype,
                'dtype_backend': dtype_backend,
                'index_col': index_col,
            }
            if is_dask:
                if index_col is None:
                    dd = None
                    pd = attempt_import('pandas')
                    read_sql_query_kwargs.update({
                        'chunksize': chunksize,
                    })
            else:
                read_sql_query_kwargs.update({
                    'chunksize': chunksize,
                })

            if is_dask and dd is not None:
                ddf = dd.read_sql_query(
                    formatted_query,
                    self.URI,
                    **read_sql_query_kwargs
                )
            else:

                with self.engine.begin() as transaction:
                    with transaction.execution_options(stream_results=stream_results) as connection:
                        chunk_generator = pd.read_sql_query(
                            formatted_query,
                            connection,
                            params = params,
                            chunksize = chunksize,
                            dtype = dtype,
                        )

                        ### `stream_results` must be False (will load everything into memory).
                        if as_iterator or chunksize is None:
                            return chunk_generator

                        ### We must consume the generator in this context if using server-side cursors.
                        if stream_results:

                            pool = get_pool(workers=workers)

                            def _process_chunk(_chunk, _retry_on_failure: bool = True):
                                if not as_hook_results:
                                    chunk_list.append(_chunk)
                                result = None
                                if chunk_hook is not None:
                                    try:
                                        result = chunk_hook(
                                            _chunk,
                                            workers = workers,
                                            chunksize = chunksize,
                                            debug = debug,
                                            **kw
                                        )
                                    except Exception as e:
                                        result = False, traceback.format_exc()
                                        from meerschaum.utils.formatting import get_console
                                        get_console().print_exception()

                                    ### If the chunk fails to process, try it again one more time.
                                    if isinstance(result, tuple) and result[0] is False:
                                        if _retry_on_failure:
                                            return _process_chunk(_chunk, _retry_on_failure=False)

                                return result

                            chunk_hook_results = list(pool.imap(_process_chunk, chunk_generator))
                            if as_hook_results:
                                return chunk_hook_results

    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
        if not silent:
            warn(str(e), stacklevel=3)
        from meerschaum.utils.formatting import get_console
        get_console().print_exception()

        return None

    if is_dask and dd is not None:
        ddf = ddf.reset_index()
        return ddf

    chunk_list = []
    read_chunks = 0
    chunk_hook_results = []
    if chunksize is None:
        chunk_list.append(chunk_generator)
    elif as_iterator:
        return chunk_generator
    else:
        try:
            for chunk in chunk_generator:
                if chunk_hook is not None:
                    chunk_hook_results.append(
                        chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                    )
                chunk_list.append(chunk)
                read_chunks += 1
                if chunks is not None and read_chunks >= chunks:
                    break
        except Exception as e:
            warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
            from meerschaum.utils.formatting import get_console
            get_console().print_exception()
            return None

    ### If no chunks returned, read without chunks
    ### to get columns
    if len(chunk_list) == 0:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            with self.engine.begin() as connection:
                chunk_list.append(
                    pd.read_sql_query(
                        formatted_query,
                        connection,
                        params = params, 
                        dtype = dtype,
                    )
                )

    ### call the hook on any missed chunks.
    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
        for c in chunk_list[len(chunk_hook_results):]:
            chunk_hook_results.append(
                chunk_hook(c, chunksize=chunksize, debug=debug, **kw)
            )

    ### chunksize is not None so must iterate
    if debug:
        end = time.perf_counter()
        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")

    if as_hook_results:
        return chunk_hook_results
    
    ### Skip `pd.concat()` if `as_chunks` is specified.
    if as_chunks:
        for c in chunk_list:
            c.reset_index(drop=True, inplace=True)
        return chunk_list

    return pd.concat(chunk_list).reset_index(drop=True)
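
A minimal sketch of the common calling patterns (the query string is arbitrary; chunk_hook receives chunksize, workers, and debug as keyword arguments, so a catch-all **kw is recommended):

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:memory')
>>> df = conn.read("SELECT 1 AS x")                                   ### single DataFrame
>>> chunks = conn.read("SELECT 1 AS x", chunksize=1, as_chunks=True)  ### list of DataFrames
>>> def count_rows(chunk, **kw):
...     return True, f"Got {len(chunk)} rows."
>>> results = conn.read(
...     "SELECT 1 AS x",
...     chunk_hook = count_rows,
...     chunksize = 1,
...     as_hook_results = True,
... )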
def register_pipe(self, pipe: mrsm.Pipe, debug: bool = False) ‑> Tuple[bool, str]

Register a new pipe. A pipe's attributes must be set before registering.

Expand source code
def register_pipe(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> SuccessTuple:
    """
    Register a new pipe.
    A pipe's attributes must be set before registering.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors

    ### ensure pipes table exists
    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    if pipe.get_id(debug=debug) is not None:
        return False, f"{pipe} is already registered."

    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ###       then `pipe.parameters` will exist and not be fetched from the database.

    ### 1. Prioritize the Pipe object's `parameters` first.
    ###    E.g. if the user manually sets the `parameters` property
    ###    or if the Pipe already exists
    ###    (which shouldn't be able to be registered anyway but that's an issue for later).
    parameters = None
    try:
        parameters = pipe.parameters
    except Exception as e:
        if debug:
            dprint(str(e))
        parameters = None

    ### ensure `parameters` is a dictionary
    if parameters is None:
        parameters = {}

    import json
    sqlalchemy = attempt_import('sqlalchemy')
    values = {
        'connector_keys' : pipe.connector_keys,
        'metric_key'     : pipe.metric_key,
        'location_key'   : pipe.location_key,
        'parameters'     : (
            json.dumps(parameters)
            if self.flavor not in json_flavors
            else parameters
        ),
    }
    query = sqlalchemy.insert(pipes_tbl).values(**values)
    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to register {pipe}."
    return True, f"Successfully registered {pipe}."
def register_plugin(self, plugin: "'meerschaum.core.Plugin'", force: bool = False, debug: bool = False, **kw: Any) ‑> SuccessTuple

Register a new plugin to the plugins table.

Expand source code
def register_plugin(
        self,
        plugin: 'meerschaum.core.Plugin',
        force: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Register a new plugin to the plugins table."""
    from meerschaum.utils.warnings import warn, error
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.utils.sql import json_flavors
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    old_id = self.get_plugin_id(plugin, debug=debug)

    ### Check for version conflict. May be overridden with `--force`.
    if old_id is not None and not force:
        old_version = self.get_plugin_version(plugin, debug=debug)
        new_version = plugin.version
        if old_version is None:
            old_version = ''
        if new_version is None:
            new_version = ''

        ### verify that the new version is greater than the old
        packaging_version = attempt_import('packaging.version')
        if (
            old_version and new_version
            and packaging_version.parse(old_version) >= packaging_version.parse(new_version)
        ):
            return False, (
                f"Version '{new_version}' of plugin '{plugin}' " +
                f"must be greater than existing version '{old_version}'."
            )

    import json
    bind_variables = {
        'plugin_name' : plugin.name,
        'version' : plugin.version,
        'attributes' : (
            json.dumps(plugin.attributes) if self.flavor not in json_flavors else plugin.attributes
        ),
        'user_id' : plugin.user_id,
    }

    if old_id is None:
        query = sqlalchemy.insert(plugins_tbl).values(**bind_variables)
    else:
        query = (
            sqlalchemy.update(plugins_tbl)
            .values(**bind_variables)
            .where(plugins_tbl.c.plugin_id == old_id)
        )

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to register plugin '{plugin}'."
    return True, f"Successfully registered plugin '{plugin}'."
def register_user(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> SuccessTuple

Register a new user.

Expand source code
def register_user(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Register a new user."""
    from meerschaum.utils.warnings import warn, error, info
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    sqlalchemy = attempt_import('sqlalchemy')

    valid_tuple = valid_username(user.username)
    if not valid_tuple[0]:
        return valid_tuple

    old_id = self.get_user_id(user, debug=debug)

    if old_id is not None:
        return False, f"User '{user}' already exists."

    ### ensure users table exists
    from meerschaum.connectors.sql.tables import get_tables
    tables = get_tables(mrsm_instance=self, debug=debug)

    import json
    bind_variables = {
        'username' : user.username,
        'email' : user.email,
        'password_hash' : user.password_hash,
        'user_type' : user.type,
        'attributes' : (
            json.dumps(user.attributes) if self.flavor not in json_flavors else user.attributes
        ),
    }
    query = (
        sqlalchemy.insert(tables['users'])
        .values(**bind_variables)
    )

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to register user '{user}'."
    return True, f"Successfully registered user '{user}'."
def sync_pipe(self, pipe: mrsm.Pipe, df: Union[pandas.DataFrame, str, Dict[Any, Any], None] = None, begin: Optional[datetime] = None, end: Optional[datetime] = None, chunksize: Optional[int] = -1, check_existing: bool = True, blocking: bool = True, debug: bool = False, **kw: Any) ‑> SuccessTuple

Sync a pipe using a database connection.

Parameters

pipe : mrsm.Pipe
The Meerschaum Pipe instance into which to sync the data.
df : Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]
An optional DataFrame or equivalent to sync into the pipe. Defaults to None.
begin : Optional[datetime], default None
Optionally specify the earliest datetime to search for data. Defaults to None.
end : Optional[datetime], default None
Optionally specify the latest datetime to search for data. Defaults to None.
chunksize : Optional[int], default -1
Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction. Defaults to -1.
check_existing : bool, default True
If True, pull and diff with existing data from the pipe. Defaults to True.
blocking : bool, default True
If True, wait for sync to finish and return its result, otherwise sync asynchronously. Defaults to True.
debug : bool, default False
Verbosity toggle. Defaults to False.
kw : Any
Catch-all for keyword arguments.

Returns

A SuccessTuple of success (bool) and message (str).
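
A minimal usage sketch, assuming a configured `sql:main` instance connector; the pipe keys and data below are illustrative:

from datetime import datetime
import pandas as pd
import meerschaum as mrsm

conn = mrsm.get_connector('sql', 'main')
pipe = mrsm.Pipe(
    'demo', 'temperature',
    instance = 'sql:main',
    columns = {'datetime': 'dt', 'id': 'station_id'},
)
df = pd.DataFrame({
    'dt': [datetime(2023, 1, 1), datetime(2023, 1, 2)],
    'station_id': [1, 1],
    'temperature': [20.1, 20.7],
})

### Returns a SuccessTuple; existing rows are diffed unless check_existing=False.
success, msg = conn.sync_pipe(pipe, df, debug=True)
print(success, msg)
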

Expand source code
def sync_pipe(
        self,
        pipe: mrsm.Pipe,
        df: Union[pandas.DataFrame, str, Dict[Any, Any], None] = None,
        begin: Optional[datetime] = None,
        end: Optional[datetime] = None,
        chunksize: Optional[int] = -1,
        check_existing: bool = True,
        blocking: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Sync a pipe using a database connection.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The Meerschaum Pipe instance into which to sync the data.

    df: Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]
        An optional DataFrame or equivalent to sync into the pipe.
        Defaults to `None`.

    begin: Optional[datetime], default None
        Optionally specify the earliest datetime to search for data.
        Defaults to `None`.

    end: Optional[datetime], default None
        Optionally specify the latest datetime to search for data.
        Defaults to `None`.

    chunksize: Optional[int], default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.
        Defaults to `-1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe. Defaults to `True`.

    blocking: bool, default True
        If `True`, wait for sync to finish and return its result, otherwise sync asynchronously.
        Defaults to `True`.

    debug: bool, default False
        Verbosity toggle. Defaults to False.

    kw: Any
        Catch-all for keyword arguments.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.sql import get_update_queries, sql_item_name, json_flavors
    from meerschaum.utils.misc import generate_password
    from meerschaum.utils.dataframe import get_json_cols
    from meerschaum.utils.dtypes import are_dtypes_equal
    from meerschaum import Pipe
    import time
    import copy
    pd = import_pandas()
    if df is None:
        msg = f"DataFrame is None. Cannot sync {pipe}."
        warn(msg)
        return False, msg

    start = time.perf_counter()

    if not pipe.temporary and not pipe.get_id(debug=debug):
        register_tuple = pipe.register(debug=debug)
        if not register_tuple[0]:
            return register_tuple

    ### df is the dataframe returned from the remote source
    ### via the connector
    if debug:
        dprint("Fetched data:\n" + str(df))

    if not isinstance(df, pd.DataFrame):
        df = pipe.enforce_dtypes(df, chunksize=chunksize, debug=debug)

    ### if table does not exist, create it with indices
    is_new = False
    add_cols_query = None
    if not pipe.exists(debug=debug):
        check_existing = False
        is_new = True
    else:
        ### Check for new columns.
        add_cols_queries = self.get_add_columns_queries(pipe, df, debug=debug)
        if add_cols_queries:
            if not self.exec_queries(add_cols_queries, debug=debug):
                warn(f"Failed to add new columns to {pipe}.")

        alter_cols_queries = self.get_alter_columns_queries(pipe, df, debug=debug)
        if alter_cols_queries:
            if not self.exec_queries(alter_cols_queries, debug=debug):
                warn(f"Failed to alter columns for {pipe}.")
            else:
                _ = pipe.infer_dtypes(persist=True)

    ### NOTE: Oracle SQL < 23c (2023) and SQLite do not support booleans,
    ### so infer bools and persist them to `dtypes`.
    ### MSSQL supports `BIT` for booleans, but we coerce bools to int for MSSQL
    ### to avoid merge issues.
    if self.flavor in ('oracle', 'sqlite', 'mssql'):
        pipe_dtypes = pipe.dtypes
        new_bool_cols = {
            col: 'bool[pyarrow]'
            for col, typ in df.dtypes.items()
            if col not in pipe_dtypes
            and are_dtypes_equal(str(typ), 'bool')
        }
        pipe_dtypes.update(new_bool_cols)
        pipe.dtypes = pipe_dtypes
        if not pipe.temporary:
            infer_bool_success, infer_bool_msg = pipe.edit(debug=debug)
            if not infer_bool_success:
                return infer_bool_success, infer_bool_msg

    unseen_df, update_df, delta_df = (
        pipe.filter_existing(
            df,
            chunksize = chunksize,
            debug = debug,
            **kw
        ) if check_existing else (df, None, df)
    )
    if debug:
        dprint("Delta data:\n" + str(delta_df))
        dprint("Unseen data:\n" + str(unseen_df))
        if update_df is not None:
            dprint("Update data:\n" + str(update_df))

    if update_df is not None and len(update_df) > 0:
        transact_id = generate_password(3)
        temp_target = '_' + transact_id + '_' + pipe.target
        update_kw = copy.deepcopy(kw)
        update_kw.update({
            'name': temp_target,
            'if_exists': 'append',
            'chunksize': chunksize,
            'dtype': self.get_to_sql_dtype(pipe, update_df, update_dtypes=False),
            'debug': debug,
        })
        self.to_sql(update_df, **update_kw)
        temp_pipe = Pipe(
            pipe.connector_keys + '_', pipe.metric_key, pipe.location_key,
            instance = pipe.instance_keys,
            columns = pipe.columns,
            target = temp_target,
            temporary = True,
        )

        existing_cols = pipe.get_columns_types(debug=debug)
        join_cols = [
            col for col_key, col in pipe.columns.items()
            if col and col_key != 'value' and col in existing_cols
        ]

        queries = get_update_queries(
            pipe.target,
            temp_target,
            self,
            join_cols,
            debug = debug
        )
        success = all(self.exec_queries(queries, break_on_error=True, debug=debug))
        drop_success, drop_msg = temp_pipe.drop(debug=debug)
        if not drop_success:
            warn(drop_msg)
        if not success:
            return False, f"Failed to apply update to {pipe}."

    if_exists = kw.get('if_exists', 'append')
    if 'if_exists' in kw:
        kw.pop('if_exists')
    if 'name' in kw:
        kw.pop('name')

    ### Account for first-time syncs of JSON columns.
    unseen_json_cols = get_json_cols(unseen_df)
    update_json_cols = get_json_cols(update_df) if update_df is not None else []
    json_cols = list(set(unseen_json_cols + update_json_cols))
    existing_json_cols = [col for col, typ in pipe.dtypes.items() if typ == 'json']
    new_json_cols = [col for col in json_cols if col not in existing_json_cols]
    if new_json_cols:
        pipe.dtypes.update({col: 'json' for col in json_cols})
        if not pipe.temporary:
            edit_success, edit_msg = pipe.edit(interactive=False, debug=debug)
            if not edit_success:
                warn(f"Unable to update JSON dtypes for {pipe}:\n{edit_msg}")

    ### Insert new data into Pipe's table.
    unseen_kw = copy.deepcopy(kw)
    unseen_kw.update({
        'name': pipe.target,
        'if_exists': if_exists,
        'debug': debug,
        'as_dict': True,
        'chunksize': chunksize,
        'dtype': self.get_to_sql_dtype(pipe, unseen_df, update_dtypes=True),
    })

    stats = self.to_sql(unseen_df, **unseen_kw)
    if is_new:
        if not self.create_indices(pipe, debug=debug):
            if debug:
                dprint(f"Failed to create indices for {pipe}. Continuing...")

    end = time.perf_counter()
    success = stats['success']
    if not success:
        return success, stats['msg']
    msg = (
        f"Inserted {len(unseen_df.index)}, "
        + f"updated {len(update_df.index) if update_df is not None else 0} rows."
    )
    if debug:
        msg = msg[:-1] + (
            f"\non table {sql_item_name(pipe.target, self.flavor)}\n"
            + f"in {round(end-start, 2)} seconds."
        )
    return success, msg
def sync_pipe_inplace(self, pipe: 'mrsm.Pipe', params: Optional[Dict[str, Any]] = None, begin: Optional[datetime] = None, end: Optional[datetime] = None, chunksize: Optional[int] = -1, check_existing: bool = True, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]

If a pipe's connector is the same as its instance connector, it's more efficient to sync the pipe in-place rather than reading data into Pandas.

Parameters

pipe : mrsm.Pipe
The pipe whose connector is the same as its instance.
params : Optional[Dict[str, Any]], default None
Optional params dictionary to build the WHERE clause. See build_where().
begin : Optional[datetime], default None
Optionally specify the earliest datetime to search for data. Defaults to None.
end : Optional[datetime], default None
Optionally specify the latest datetime to search for data. Defaults to None.
chunksize : Optional[int], default -1
Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction. Defaults to -1.
check_existing : bool, default True
If True, pull and diff with existing data from the pipe.
debug : bool, default False
Verbosity toggle.

Returns

A SuccessTuple.
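
A minimal usage sketch: an in-place sync applies when the pipe's definition is a SQL query against the same database as its instance. The source table and the parameters layout below are illustrative and may differ between versions:

import meerschaum as mrsm

conn = mrsm.get_connector('sql', 'main')
pipe = mrsm.Pipe(
    'sql:main', 'daily_sales',
    instance = 'sql:main',
    columns = {'datetime': 'day'},
    parameters = {'fetch': {'definition': "SELECT * FROM sales"}},  ### hypothetical source table
)

success, msg = conn.sync_pipe_inplace(pipe, debug=True)
print(success, msg)
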

Expand source code
def sync_pipe_inplace(
        self,
        pipe: 'mrsm.Pipe',
        params: Optional[Dict[str, Any]] = None,
        begin: Optional[datetime] = None,
        end: Optional[datetime] = None,
        chunksize: Optional[int] = -1,
        check_existing: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    If a pipe's connector is the same as its instance connector,
    it's more efficient to sync the pipe in-place rather than reading data into Pandas.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose connector is the same as its instance.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    begin: Optional[datetime], default None
        Optionally specify the earliest datetime to search for data.
        Defaults to `None`.

    end: Optional[datetime], default None
        Optionally specify the latest datetime to search for data.
        Defaults to `None`.

    chunksize: Optional[int], default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.
        Defaults to `-1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A SuccessTuple.
    """
    from meerschaum.utils.sql import (
        sql_item_name,
        table_exists,
        get_sqlalchemy_table,
        get_update_queries,
        get_null_replacement,
        NO_CTE_FLAVORS,
        NO_SELECT_INTO_FLAVORS,
        format_cte_subquery,
        get_create_table_query,
    )
    from meerschaum.utils.dtypes.sql import (
        get_pd_type_from_db_type,
    )
    from meerschaum.utils.misc import generate_password
    from meerschaum.utils.debug import dprint
    metadef = self.get_pipe_metadef(
        pipe,
        params = params,
        begin = begin,
        end = end,
        check_existing = check_existing,
        debug = debug,
    )
    metadef_name = sql_item_name('metadef', self.flavor)
    pipe_name = sql_item_name(pipe.target, self.flavor)

    if not pipe.exists(debug=debug):
        create_pipe_query = get_create_table_query(metadef, pipe.target, self.flavor)
        result = self.exec(create_pipe_query, debug=debug)
        if result is None:
            return False, f"Could not insert new data into {pipe} from its SQL query definition."
        if not self.create_indices(pipe, debug=debug):
            if debug:
                dprint(f"Failed to create indices for {pipe}. Continuing...")

        rowcount = pipe.get_rowcount(debug=debug)
        return True, f"Inserted {rowcount}, updated 0 rows."

    ### Generate names for the tables.
    transact_id = generate_password(3)
    def get_temp_table_name(label: str) -> str:
        return '_' + transact_id + '_' + label + '_' + pipe.target

    backtrack_table_raw = get_temp_table_name('backtrack')
    backtrack_table_name = sql_item_name(backtrack_table_raw, self.flavor)
    new_table_raw = get_temp_table_name('new')
    new_table_name = sql_item_name(new_table_raw, self.flavor)
    delta_table_raw = get_temp_table_name('delta')
    delta_table_name = sql_item_name(delta_table_raw, self.flavor)
    joined_table_raw = get_temp_table_name('joined')
    joined_table_name = sql_item_name(joined_table_raw, self.flavor)
    unseen_table_raw = get_temp_table_name('unseen')
    unseen_table_name = sql_item_name(unseen_table_raw, self.flavor)
    update_table_raw = get_temp_table_name('update')
    update_table_name = sql_item_name(update_table_raw, self.flavor)

    new_queries = []
    drop_new_query = f"DROP TABLE {new_table_name}"
    if table_exists(new_table_raw, self, debug=debug):
        new_queries.append(drop_new_query)

    create_new_query = get_create_table_query(metadef, new_table_raw, self.flavor)
    new_queries.append(create_new_query)

    new_success = all(self.exec_queries(new_queries, break_on_error=True, debug=debug))
    if not new_success:
        self.exec_queries([drop_new_query], break_on_error=False, debug=debug)
        return False, f"Could not fetch new data for {pipe}."

    new_table_obj = get_sqlalchemy_table(
        new_table_raw,
        connector = self,
        refresh = True,
        debug = debug,
    )
    new_cols = {
        str(col.name): get_pd_type_from_db_type(str(col.type))
        for col in new_table_obj.columns
    }

    add_cols_queries = self.get_add_columns_queries(pipe, new_cols, debug=debug)
    if add_cols_queries:
        if not self.exec_queries(add_cols_queries, debug=debug):
            warn(f"Failed to add new columns to {pipe}.")

    alter_cols_queries = self.get_alter_columns_queries(pipe, new_cols, debug=debug)
    if alter_cols_queries:
        if not self.exec_queries(alter_cols_queries, debug=debug):
            warn(f"Failed to alter columns for {pipe}.")
        else:
            _ = pipe.infer_dtypes(persist=True)

    if not check_existing:
        new_count = self.value(f"SELECT COUNT(*) FROM {new_table_name}", debug=debug)
        insert_queries = [
            (
                f"INSERT INTO {pipe_name}\n"
                + f"SELECT *\nFROM {new_table_name}"
            ),
            f"DROP TABLE {new_table_name}"
        ]
        if not self.exec_queries(insert_queries, debug=debug, break_on_error=False):
            return False, f"Failed to insert into rows into {pipe}."
        return True, f"Inserted {new_count}, updated 0 rows."


    backtrack_queries = []
    drop_backtrack_query = f"DROP TABLE {backtrack_table_name}"
    if table_exists(backtrack_table_raw, self, debug=debug):
        backtrack_queries.append(drop_backtrack_query)
    backtrack_def = self.get_pipe_data_query(
        pipe,
        begin = begin,
        end = end,
        begin_add_minutes = 0,
        end_add_minutes = 1,
        params = params,
        debug = debug,
        order = None,
    )

    select_backtrack_query = format_cte_subquery(
        backtrack_def,
        self.flavor,
        sub_name = 'backtrack_def',
    )
    create_backtrack_query = get_create_table_query(
        backtrack_def,
        backtrack_table_raw,
        self.flavor,
    )
    backtrack_queries.append(create_backtrack_query)
    backtrack_success = all(self.exec_queries(backtrack_queries, break_on_error=True, debug=debug))
    if not backtrack_success:
        self.exec_queries([drop_new_query, drop_backtrack_query], break_on_error=False, debug=debug)
        return False, f"Could not fetch backtrack data from {pipe}."

    ### Determine which index columns are present in both tables.
    backtrack_table_obj = get_sqlalchemy_table(
        backtrack_table_raw,
        connector = self,
        refresh = True,
        debug = debug,
    )
    backtrack_cols = {str(col.name): str(col.type) for col in backtrack_table_obj.columns}
    common_cols = [col for col in new_cols if col in backtrack_cols]
    on_cols = {
        col: new_cols.get(col, 'object')
        for col_key, col in pipe.columns.items()
        if (
            col
            and
            col_key != 'value'
            and col in backtrack_cols
            and col in new_cols
        )
    }

    delta_queries = []
    drop_delta_query = f"DROP TABLE {delta_table_name}"
    if table_exists(delta_table_raw, self, debug=debug):
        delta_queries.append(drop_delta_query)

    null_replace_new_cols_str = (
        ', '.join([
            f"COALESCE({new_table_name}.{sql_item_name(col, self.flavor)}, "
            + f"{get_null_replacement(typ, self.flavor)}) AS "
            + sql_item_name(col, self.flavor)
            for col, typ in new_cols.items()
        ])
    )

    select_delta_query = (
        f"SELECT\n"
        + null_replace_new_cols_str + "\n"
        + f"\nFROM {new_table_name}\n"
        + f"LEFT OUTER JOIN {backtrack_table_name}\nON\n"
        + '\nAND\n'.join([
            (
                f'COALESCE({new_table_name}.' + sql_item_name(c, self.flavor) + ", "
                + get_null_replacement(new_cols[c], self.flavor) + ") "
                + ' = '
                + f'COALESCE({backtrack_table_name}.' + sql_item_name(c, self.flavor) + ", "
                + get_null_replacement(backtrack_cols[c], self.flavor) + ") "
            ) for c in common_cols
        ])
        + "\nWHERE\n"
        + '\nAND\n'.join([
            (
                f'{backtrack_table_name}.' + sql_item_name(c, self.flavor) + ' IS NULL'
            ) for c in common_cols
        ])
    )
    create_delta_query = get_create_table_query(select_delta_query, delta_table_raw, self.flavor)
    delta_queries.append(create_delta_query)

    delta_success = all(self.exec_queries(delta_queries, break_on_error=True, debug=debug))
    if not delta_success:
        self.exec_queries(
            [
                drop_new_query,
                drop_backtrack_query,
                drop_delta_query,
            ],
            break_on_error = False,
            debug = debug,
        )
        return False, f"Could not filter data for {pipe}."

    delta_table_obj = get_sqlalchemy_table(
        delta_table_raw,
        connector = self,
        refresh = True,
        debug = debug,
    )
    delta_cols = {
        str(col.name): get_pd_type_from_db_type(str(col.type))
        for col in delta_table_obj.columns
    }

    joined_queries = []
    drop_joined_query = f"DROP TABLE {joined_table_name}"
    if on_cols and table_exists(joined_table_raw, self, debug=debug):
        joined_queries.append(drop_joined_query)

    select_joined_query = (
        "SELECT "
        + (', '.join([
            (
                f'{delta_table_name}.' + sql_item_name(c, self.flavor)
                + " AS " + sql_item_name(c + '_delta', self.flavor)
            ) for c in delta_cols
        ]))
        + ", "
        + (', '.join([
            (
                f'{backtrack_table_name}.' + sql_item_name(c, self.flavor)
                + " AS " + sql_item_name(c + '_backtrack', self.flavor)
            ) for c in backtrack_cols
        ]))
        + f"\nFROM {delta_table_name}\n"
        + f"LEFT OUTER JOIN {backtrack_table_name}\nON\n"
        + '\nAND\n'.join([
            (
                f'COALESCE({delta_table_name}.' + sql_item_name(c, self.flavor)
                + ", " + get_null_replacement(typ, self.flavor) + ")"
                + ' = '
                + f'COALESCE({backtrack_table_name}.' + sql_item_name(c, self.flavor)
                + ", " + get_null_replacement(typ, self.flavor) + ")"
            ) for c, typ in on_cols.items()
        ])
    )

    create_joined_query = get_create_table_query(select_joined_query, joined_table_raw, self.flavor)
    joined_queries.append(create_joined_query)

    joined_success = (
        all(self.exec_queries(joined_queries, break_on_error=True, debug=debug))
        if on_cols else True
    )
    if not joined_success:
        self.exec_queries(
            [
                drop_new_query,
                drop_backtrack_query,
                drop_delta_query,
                drop_joined_query,
            ],
            break_on_error = False,
            debug = debug,
        )
        return False, f"Could not separate new and updated data for {pipe}."

    unseen_queries = []
    drop_unseen_query = f"DROP TABLE {unseen_table_name}"
    if on_cols and table_exists(unseen_table_raw, self, debug=debug):
        unseen_queries.append(drop_unseen_query)

    select_unseen_query = (
        "SELECT "
        + (', '.join([
            (
                "CASE\n    WHEN " + sql_item_name(c + '_delta', self.flavor)
                + " != " + get_null_replacement(typ, self.flavor) 
                + " THEN " + sql_item_name(c + '_delta', self.flavor)
                + "\n    ELSE NULL\nEND "
                + " AS " + sql_item_name(c, self.flavor)
            ) for c, typ in delta_cols.items()
        ]))
        + f"\nFROM {joined_table_name}\n"
        + f"WHERE "
        + '\nAND\n'.join([
            (
                sql_item_name(c + '_backtrack', self.flavor) + ' IS NULL'
            ) for c in delta_cols
        ])
    )
    create_unseen_query = get_create_table_query(select_unseen_query, unseen_table_raw, self.flavor)
    unseen_queries.append(create_unseen_query)

    unseen_success = (
        all(self.exec_queries(unseen_queries, break_on_error=True, debug=debug))
        if on_cols else True
    )
    if not unseen_success:
        self.exec_queries(
            [
                drop_new_query,
                drop_backtrack_query,
                drop_delta_query,
                drop_joined_query,
                drop_unseen_query,
            ],
            break_on_error = False,
            debug = debug,
        )
        return False, f"Could not determine new data for {pipe}."
    unseen_count = self.value(
        (
            "SELECT COUNT(*) FROM "
            + (unseen_table_name if on_cols else delta_table_name)
        ), debug = debug,
    )

    update_queries = []
    drop_update_query = f"DROP TABLE {update_table_name}"
    if on_cols and table_exists(update_table_raw, self, debug=debug):
        update_queries.append(drop_update_query)

    select_update_query = (
        "SELECT "
        + (', '.join([
            (
                "CASE\n    WHEN " + sql_item_name(c + '_delta', self.flavor)
                + " != " + get_null_replacement(typ, self.flavor)
                + " THEN " + sql_item_name(c + '_delta', self.flavor)
                + "\n    ELSE NULL\nEND "
                + " AS " + sql_item_name(c, self.flavor)
            ) for c, typ in delta_cols.items()
        ]))
        + f"\nFROM {joined_table_name}\n"
        + f"WHERE "
        + '\nOR\n'.join([
            (
                sql_item_name(c + '_backtrack', self.flavor) + ' IS NOT NULL'
            ) for c in delta_cols
        ])
    )

    create_update_query = get_create_table_query(select_update_query, update_table_raw, self.flavor)
    update_queries.append(create_update_query)

    update_success = (
        all(self.exec_queries(update_queries, break_on_error=True, debug=debug))
        if on_cols else True
    )
    if not update_success:
        self.exec_queries(
            [
                drop_new_query,
                drop_backtrack_query,
                drop_delta_query,
                drop_joined_query,
                drop_unseen_query,
                drop_update_query,
            ],
            break_on_error = False,
            debug = debug,
        )
        return False, "Could not determine updated data for {pipe}."
    update_count = (
        self.value(f"SELECT COUNT(*) FROM {update_table_name}", debug=debug)
        if on_cols else 0
    )

    apply_update_queries = (
        get_update_queries(
            pipe.target,
            update_table_raw,
            self,
            on_cols,
            debug = debug
        )
        if on_cols else []
    )

    apply_unseen_queries = [
        (
            f"INSERT INTO {pipe_name}\n"
            + f"SELECT *\nFROM "
            + (
                unseen_table_name
                if on_cols
                else delta_table_name
            )
        ),
    ]

    apply_queries = (
        (apply_unseen_queries if unseen_count > 0 else [])
        + (apply_update_queries if update_count > 0 else [])
        + [
            drop_new_query,
            drop_backtrack_query,
            drop_delta_query,
        ] + (
            [
                drop_joined_query,
                drop_unseen_query,
                drop_update_query,
            ] if on_cols else []
        )
    )
    success = all(self.exec_queries(apply_queries, break_on_error=False, debug=debug))
    msg = (
        f"Was not able to apply changes to {pipe}."
        if not success else f"Inserted {unseen_count}, updated {update_count} rows."
    )
    return success, msg
def test_connection(self, **kw: Any) ‑> Optional[bool]

Test if a successful connection to the database may be made.

Parameters

**kw: The keyword arguments are passed to retry_connect().

Returns

True if a connection is made, otherwise False or None in case of failure.
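
A minimal usage sketch: probe connectivity without raising, overriding the default single retry (the keyword arguments are forwarded to `retry_connect`):

import meerschaum as mrsm

conn = mrsm.get_connector('sql', 'main')
if conn.test_connection(max_retries=3, retry_wait=1):
    print(f"Connected to {conn}.")
else:
    print(f"Could not connect to {conn}.")
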

Expand source code
def test_connection(
        self,
        **kw: Any
    ) -> Union[bool, None]:
    """
    Test if a successful connection to the database may be made.

    Parameters
    ----------
    **kw:
        The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Returns
    -------
    `True` if a connection is made, otherwise `False` or `None` in case of failure.

    """
    import warnings
    from meerschaum.connectors.poll import retry_connect
    _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self}
    _default_kw.update(kw)
    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', 'Could not')
        try:
            return retry_connect(**_default_kw)
        except Exception as e:
            return False
def to_sql(self, df: pandas.DataFrame, name: str = None, index: bool = False, if_exists: str = 'replace', method: str = '', chunksize: Optional[int] = -1, silent: bool = False, debug: bool = False, as_tuple: bool = False, as_dict: bool = False, **kw) ‑> Union[bool, SuccessTuple]

Upload a DataFrame's contents to the SQL server.

Parameters

df : pd.DataFrame
The DataFrame to be uploaded.
name : str
The name of the table to be created.
index : bool, default False
If True, creates the DataFrame's indices as columns.
if_exists : str, default 'replace'
Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail'].
method : str, default ''
Either None or 'multi'. See the method parameter of pandas.DataFrame.to_sql.
as_tuple : bool, default False
If True, return a (success_bool, message) tuple instead of a bool. Defaults to False.
as_dict : bool, default False
If True, return a dictionary of transaction information. The keys are success, msg, start, end, duration, num_rows, chunksize, method, and target.
kw : Any
Additional arguments will be passed to the DataFrame's to_sql function.

Returns

Either a bool or a SuccessTuple (depends on as_tuple).
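
A minimal usage sketch, assuming a configured `sql:main` connector; the table name is illustrative:

import pandas as pd
import meerschaum as mrsm

conn = mrsm.get_connector('sql', 'main')
df = pd.DataFrame({'id': [1, 2], 'val': [0.5, 1.5]})

### `as_dict=True` returns the transaction stats described above.
stats = conn.to_sql(df, name='example_table', if_exists='replace', as_dict=True)
print(stats['success'], stats['msg'], stats['num_rows'])
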

Expand source code
def to_sql(
        self,
        df: pandas.DataFrame,
        name: str = None,
        index: bool = False,
        if_exists: str = 'replace',
        method: str = "",
        chunksize: Optional[int] = -1,
        silent: bool = False,
        debug: bool = False,
        as_tuple: bool = False,
        as_dict: bool = False,
        **kw
    ) -> Union[bool, SuccessTuple]:
    """
    Upload a DataFrame's contents to the SQL server.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be uploaded.

    name: str
        The name of the table to be created.

    index: bool, default False
        If True, creates the DataFrame's indices as columns.

    if_exists: str, default 'replace'
        Drop and create the table ('replace') or append if it exists
        ('append') or raise Exception ('fail').
        Options are ['replace', 'append', 'fail'].

    method: str, default ''
        Either `None` or `'multi'`. See the `method` parameter of `pandas.DataFrame.to_sql`.

    as_tuple: bool, default False
        If `True`, return a (success_bool, message) tuple instead of a `bool`.
        Defaults to `False`.

    as_dict: bool, default False
        If `True`, return a dictionary of transaction information.
        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
        `method`, and `target`.
        
    kw: Any
        Additional arguments will be passed to the DataFrame's `to_sql` function.

    Returns
    -------
    Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
    """
    import time
    import json
    import warnings
    import functools
    from collections.abc import Hashable
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint
    if name is None:
        error(f"Name must not be `None` to insert data into {self}.")

    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
    kw.pop('name', None)

    from meerschaum.utils.sql import sql_item_name, table_exists, json_flavors, truncate_item_name
    from meerschaum.utils.dataframe import get_json_cols
    from meerschaum.utils.dtypes import are_dtypes_equal
    from meerschaum.connectors.sql._create_engine import flavor_configs
    from meerschaum.utils.packages import attempt_import, import_pandas
    sqlalchemy = attempt_import('sqlalchemy', debug=debug)
    pd = import_pandas()
    is_dask = 'dask' in df.__module__

    stats = {'target': name, }
    ### resort to defaults if None
    if method == "":
        if self.flavor in _bulk_flavors:
            method = psql_insert_copy
        else:
            ### Should resolve to 'multi' or `None`.
            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')
    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)

    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel = 3,
                )
            chunksize = _max_chunks_flavors[self.flavor]
    stats['chunksize'] = chunksize

    success, msg = False, "Default to_sql message"
    start = time.perf_counter()
    if debug:
        msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..."
        print(msg, end="", flush=True)
    stats['num_rows'] = len(df)

    ### Check if the name is too long.
    truncated_name = truncate_item_name(name, self.flavor)
    if name != truncated_name:
        warn(
            f"Table '{name}' is too long for '{self.flavor}',"
            + f" will instead create the table '{truncated_name}'."
        )

    ### filter out non-pandas args
    import inspect
    to_sql_params = inspect.signature(df.to_sql).parameters
    to_sql_kw = {}
    for k, v in kw.items():
        if k in to_sql_params:
            to_sql_kw[k] = v

    to_sql_kw.update({
        'name': truncated_name,
        ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI),
        'index': index,
        'if_exists': if_exists,
        'method': method,
        'chunksize': chunksize,
    })
    if is_dask:
        to_sql_kw.update({
            'parallel': True,
        })

    if self.flavor == 'oracle':
        ### For some reason 'replace' doesn't work properly in pandas,
        ### so try dropping first.
        if if_exists == 'replace' and table_exists(name, self, debug=debug):
            success = self.exec("DROP TABLE " + sql_item_name(name, 'oracle')) is not None
            if not success:
                warn(f"Unable to drop {name}")


        ### Enforce NVARCHAR(2000) as text instead of CLOB.
        dtype = to_sql_kw.get('dtype', {})
        for col, typ in df.dtypes.items():
            if are_dtypes_equal(str(typ), 'object'):
                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
            elif are_dtypes_equal(str(typ), 'int'):
                dtype[col] = sqlalchemy.types.INTEGER
        to_sql_kw['dtype'] = dtype
    elif self.flavor == 'mssql':
        dtype = to_sql_kw.get('dtype', {})
        for col, typ in df.dtypes.items():
            if are_dtypes_equal(str(typ), 'bool'):
                dtype[col] = sqlalchemy.types.INTEGER
        to_sql_kw['dtype'] = dtype

    ### Check for JSON columns.
    if self.flavor not in json_flavors:
        json_cols = get_json_cols(df)
        if json_cols:
            for col in json_cols:
                df[col] = df[col].apply(
                    (
                        lambda x: json.dumps(x, default=str, sort_keys=True)
                        if not isinstance(x, Hashable)
                        else x
                    )
                )


    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            df.to_sql(**to_sql_kw)
        success = True
    except Exception as e:
        if not silent:
            warn(str(e))
        success, msg = False, str(e)

    end = time.perf_counter()
    if success:
        msg = f"It took {round(end - start, 2)} seconds to sync {len(df)} rows to {name}."
    stats['start'] = start
    stats['end'] = end
    stats['duration'] = end - start

    if debug:
        print(f" done.", flush=True)
        dprint(msg)

    stats['success'] = success
    stats['msg'] = msg
    if as_tuple:
        return success, msg
    if as_dict:
        return stats
    return success
def value(self, query: str, *args: Any, use_pandas: bool = False, **kw: Any) ‑> Any

Execute the provided query and return the first value.

Parameters

query : str
The SQL query to execute.
*args : Any
The arguments passed to meerschaum.connectors.sql.SQLConnector.exec if use_pandas is False (default) or to meerschaum.connectors.sql.SQLConnector.read.
use_pandas : bool, default False
If True, use read(), otherwise use meerschaum.connectors.sql.SQLConnector.exec (default). NOTE: This is always True for DuckDB.
**kw : Any
See args.

Returns

Any value returned from the query.
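
A minimal usage sketch, fetching a scalar from a query (the table name is illustrative):

import meerschaum as mrsm

conn = mrsm.get_connector('sql', 'main')
row_count = conn.value("SELECT COUNT(*) FROM example_table")
print(row_count)
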

Expand source code
def value(
        self,
        query: str,
        *args: Any,
        use_pandas: bool = False,
        **kw: Any
    ) -> Any:
    """
    Execute the provided query and return the first value.

    Parameters
    ----------
    query: str
        The SQL query to execute.
        
    *args: Any
        The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
        if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.
        
    use_pandas: bool, default False
        If `True`, use `meerschaum.connectors.sql.SQLConnector.read`, otherwise use
        `meerschaum.connectors.sql.SQLConnector.exec` (default).
        **NOTE:** This is always `True` for DuckDB.

    **kw: Any
        See `args`.

    Returns
    -------
    Any value returned from the query.

    """
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import warn
    sqlalchemy = attempt_import('sqlalchemy')
    if self.flavor == 'duckdb':
        use_pandas = True
    if use_pandas:
        try:
            return self.read(query, *args, **kw).iloc[0, 0]
        except Exception as e:
            #  import traceback
            #  traceback.print_exc()
            #  warn(e)
            return None

    _close = kw.get('close', True)
    _commit = kw.get('commit', (self.flavor != 'mssql'))
    try:
        result, connection = self.exec(
            query,
            *args,
            with_connection=True,
            close=False,
            commit=_commit,
            **kw
        )
        first = result.first() if result is not None else None
        _val = first[0] if first is not None else None
    except Exception as e:
        warn(e, stacklevel=3)
        return None
    if _close:
        try:
            connection.close()
        except Exception as e:
            warn("Failed to close connection with exception:\n" + str(e))
    return _val