Module meerschaum.Pipe

Pipes are the primary metaphor of the Meerschaum system. You can interact with pipe data via meerschaum.Pipe objects.

If you are working with multiple pipes, it is highly recommended that you instead use meerschaum.utils.get_pipes (available as meerschaum.get_pipes) to create a dictionary of Pipe objects.

>>> from meerschaum import get_pipes
>>> pipes = get_pipes()
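
The returned dictionary is nested by connector keys, metric key, and location key, so a single pipe can be pulled out as in this short sketch (the keys shown are hypothetical placeholders):

>>> ### Hypothetical keys; substitute your own connector, metric, and location.
>>> pipe = pipes['sql:main']['weather'][None]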

Examples

For the below examples to work, sql:remote_server must be defined (check with edit config) with correct credentials, and you must have a network connection and valid permissions.

Manually Adding Data


>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'energy')
>>> 
>>> ### Columns only need to be defined if you're creating a new pipe.
>>> pipe.columns = { 'datetime' : 'time', 'id' : 'station_id' }
>>> 
>>> ### Create a Pandas DataFrame somehow,
>>> ### or you can use a dictionary of lists instead.
>>> import pandas as pd
>>> df = pd.read_csv('data.csv')
>>> 
>>> pipe.sync(df)
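
Like the other management methods documented below, sync() returns a success flag and a message, so the result can be checked before reading the data back. A brief sketch of that pattern (assuming the sync above succeeded):

>>> success, message = pipe.sync(df)
>>> if not success:
...     print(message)
>>> ### Read the synced rows back and count them.
>>> synced_df = pipe.get_data()
>>> pipe.get_rowcount()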

Registering a Remote Pipe


>>> from meerschaum import Pipe
>>> pipe = Pipe('sql:remote_server', 'energy')
>>> 
>>> pipe.attributes = {
...     'fetch' : {
...         'definition' : 'SELECT * FROM energy_table',
...     },
... }
>>> 
>>> ### Columns are a subset of attributes, so define columns
>>> ### after defining attributes.
>>> pipe.columns = { 'datetime' : 'time', 'id' : 'station_id' }
>>> 
>>> pipe.sync()
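
Once a remote pipe has been registered and synced, the methods documented below can confirm that data arrived. A small sketch; the return values depend on the contents of energy_table:

>>> ### True if the pipe's table was created on the instance.
>>> pipe.exists()
>>> ### The newest value in the pipe's datetime column.
>>> pipe.get_sync_time()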
Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Pipes are the primary metaphor of the Meerschaum system.
You can interact with pipe data via `meerschaum.Pipe` objects.

If you are working with multiple pipes, it is highly recommended that you instead use
`meerschaum.utils.get_pipes` (available as `meerschaum.get_pipes`)
to create a dictionary of Pipe objects.

```
>>> from meerschaum import get_pipes
>>> pipes = get_pipes()
```

# Examples
For the below examples to work, `sql:remote_server` must be defined (check with `edit config`)
with correct credentials, and you must have a network connection and valid permissions.

## Manually Adding Data
---

```
>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'energy')
>>> 
>>> ### Columns only need to be defined if you're creating a new pipe.
>>> pipe.columns = { 'datetime' : 'time', 'id' : 'station_id' }
>>> 
>>> ### Create a Pandas DataFrame somehow,
>>> ### or you can use a dictionary of lists instead.
>>> import pandas as pd
>>> df = pd.read_csv('data.csv')
>>> 
>>> pipe.sync(df)
```

## Registering a Remote Pipe
---

```
>>> from meerschaum import Pipe
>>> pipe = Pipe('sql:remote_server', 'energy')
>>> 
>>> pipe.attributes = {
...     'fetch' : {
...         'definition' : 'SELECT * FROM energy_table',
...     },
... }
>>> 
>>> ### Columns are a subset of attributes, so define columns
>>> ### after defining attributes.
>>> pipe.columns = { 'datetime' : 'time', 'id' : 'station_id' }
>>> 
>>> pipe.sync()
```

"""

from __future__ import annotations
from meerschaum.utils.typing import Optional, Dict, Any, Union, InstanceConnector

class Pipe:
    """
    Access Meerschaum pipes via Pipe objects.
    
    Pipes are identified by the following:

    1. Connector keys (e.g. `'sql:main'`)
    2. Metric key (e.g. `'weather'`)
    3. Location (optional; e.g. `None`)
    
    A pipe's connector keys correspond to a data source, and when the pipe is synced,
    its `fetch` definition is evaluated and executed to produce new data.
    
    Alternatively, new data may be directly synced via `pipe.sync()`:
    
    ```
    >>> from meerschaum import Pipe
    >>> pipe = Pipe('csv', 'weather')
    >>>
    >>> import pandas as pd
    >>> df = pd.read_csv('weather.csv')
    >>> pipe.sync(df)
    ```
    """

    from ._fetch import fetch
    from ._data import get_data, get_backtrack_data, get_rowcount
    from ._register import register
    from ._attributes import (
        attributes,
        parameters,
        columns,
        get_columns,
        get_columns_types,
        get_id,
        id,
        get_val_column,
        parents,
    )
    from ._show import show
    from ._edit import edit, edit_definition
    from ._sync import sync, get_sync_time, exists, filter_existing
    from ._delete import delete
    from ._drop import drop
    from ._clear import clear
    from ._bootstrap import bootstrap

    def __init__(
        self,
        connector_keys: str,
        metric_key: str,
        location_key: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        columns: Optional[Dict[str, str]] = None,
        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
        instance: Optional[Union[str, InstanceConnector]] = None,
        cache: bool = False,
        debug: bool = False
    ):
        """
        Parameters
        ----------

        connector_keys: str
            Keys for the pipe's source connector, e.g. `'sql:main'`.

        metric_key: str
            Label for the pipe's contents, e.g. `'weather'`.

        location_key: str, default None
            Label for the pipe's location. Defaults to `None`.

        parameters: Optional[Dict[str, Any]], default None
            Optionally set a pipe's parameters from the constructor,
            e.g. columns and other attributes.
            Defaults to `None`.

        columns: Optional[Dict[str, str]], default None
            Subset of parameters for ease of use.
            If `parameters` is provided, `columns` has no effect.
            Defaults to `None`.

        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
            Connector for the Meerschaum instance where the pipe resides.
            Defaults to the preconfigured default instance (`'sql:main'`).

        instance: Optional[Union[str, InstanceConnector]], default None
            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.

        cache: bool, default False
            If `True`, cache fetched data into a local database file.
            Experimental features must be enabled.
            You can enable experimental caching under `system:experimental:cache`.
            Defaults to `False`.
        """
        if location_key in ('[None]', 'None'):
            location_key = None
        self.connector_keys = connector_keys
        self.metric_key = metric_key
        self.location_key = location_key

        ### only set parameters if values are provided
        if parameters is not None:
            self._parameters = parameters

        if columns is not None:
            if self.__dict__.get('_parameters', None) is None:
                self._parameters = {}
            self._parameters['columns'] = columns

        ### NOTE: The parameters dictionary is {} by default.
        ###       A Pipe may be registered without parameters, then edited,
        ###       or a Pipe may be registered with parameters set in-memory first.
        from meerschaum.config import get_config
        _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
        if _mrsm_instance is None:
            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)
        if not isinstance(_mrsm_instance, str):
            self._instance_connector = _mrsm_instance
            self.instance_keys = str(_mrsm_instance)
        else: ### NOTE: must be SQL or API Connector for this to work
            self.instance_keys = _mrsm_instance

        self._cache = cache and get_config('system', 'experimental', 'cache')

    @property
    def meta(self):
        """Simulate the MetaPipe model without importing FastAPI."""
        refresh = False
        if '_meta' not in self.__dict__:
            refresh = True
        #  elif self.parameters != self.__dict__['_meta']['parameters']:
            #  refresh = True

        if refresh:
            #  parameters = self.parameters
            #  if parameters is None:
                #  parameters = dict()
            self._meta = {
                'connector_keys' : self.connector_keys,
                'metric_key'     : self.metric_key,
                'location_key'   : self.location_key,
                #  'parameters'     : parameters,
                'instance'       : self.instance_keys,
            }
        return self._meta

    @property
    def instance_connector(self) -> Union[InstanceConnector, None]:
        """
        The connector to where this pipe resides.
        May either be of type `'sql'` (`meerschaum.connectors.sql.SQLConnector`) or of type `'api'`
        (`meerschaum.connectors.api.APIConnector`).
        """
        if '_instance_connector' not in self.__dict__:
            from meerschaum.connectors.parse import parse_instance_keys
            conn = parse_instance_keys(self.instance_keys)
            if conn:
                self._instance_connector = conn
            else:
                return None
        return self._instance_connector

    @property
    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
        """
        The connector to the data source.
        May be of type `'sql'`, `'api'`, `'mqtt'`, or `'plugin'`.
        """
        if '_connector' not in self.__dict__:
            from meerschaum.connectors.parse import parse_instance_keys
            conn = parse_instance_keys(self.connector_keys)
            if conn:
                self._connector = conn
            else:
                return None
        return self._connector

    @property
    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
        """
        If the pipe was created with `cache=True`, return the connector to the pipe's
        SQLite database for caching.
        """
        if not self._cache:
            return None

        if '_cache_connector' not in self.__dict__:
            from meerschaum.connectors import get_connector
            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
            _resources_path = SQLITE_RESOURCES_PATH
            self._cache_connector = get_connector(
                'sql', '_cache_' + str(self),
                flavor='sqlite',
                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
            )

        return self._cache_connector

    @property
    def cache_pipe(self) -> Union['meerschaum.Pipe.Pipe', None]:
        """
        If the pipe was created with `cache=True`, return another `meerschaum.Pipe.Pipe` used to
        manage the local data.
        """
        if self.cache_connector is None:
            return None
        if '_cache_pipe' not in self.__dict__:
            from meerschaum import Pipe
            from meerschaum.config._patch import apply_patch_to_config
            from meerschaum.connectors.sql.tools import sql_item_name
            _parameters = self.parameters.copy()
            _fetch_patch = {
                'fetch': ({
                    'definition': (
                        f"SELECT * FROM {sql_item_name(str(self), self.instance_connector.flavor)}"
                    ),
                }) if self.instance_connector.type == 'sql' else ({
                    'connector_keys': self.connector_keys,
                    'metric_key': self.metric_key,
                    'location_key': self.location_key,
                })
            }
            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
            self._cache_pipe = Pipe(
                self.instance_keys,
                (self.connector_keys + '_' + self.metric_key + '_cache'),
                self.location_key,
                mrsm_instance=self.cache_connector,
                parameters=_parameters,
                cache=False,
            )

        return self._cache_pipe

    @property
    def sync_time(self) -> Union[datetime.datetime, None]:
        """
        Convenience function to get the pipe's latest datetime.
        Use `meerschaum.Pipe.Pipe.get_sync_time()` instead.
        """
        return self.get_sync_time()

    def __str__(self):
        """
        The Pipe's SQL table name. Converts the `':'` in the `connector_keys` to an `'_'`.
        """
        name = f"{self.connector_keys.replace(':', '_')}_{self.metric_key}"
        if self.location_key is not None:
            name += f"_{self.location_key}"
        return name

    def __eq__(self, other):
        try:
            return (
                type(self) == type(other)
                and self.connector_keys == other.connector_keys
                and self.metric_key == other.metric_key
                and self.location_key == other.location_key
                and self.instance_keys == other.instance_keys
            )
        except Exception as e:
            return False

    def __hash__(self):
        ### Using an esoteric separator to avoid collisions.
        sep = "[\"']"
        return hash(
            str(self.connector_keys) + sep
            + str(self.metric_key) + sep
            + str(self.location_key) + sep
            + str(self.instance_keys) + sep
        )

    def __repr__(self):
        return str(self)

    def __getstate__(self):
        """
        Define the state dictionary (pickling).
        """
        state = {
            'connector_keys' : self.connector_keys,
            'metric_key' : self.metric_key,
            'location_key' : self.location_key,
            'parameters' : self.parameters,
            'mrsm_instance' :  self.instance_keys,
        }
        return state

    def __setstate__(self, _state : dict):
        """
        Read the state (unpickling).
        """
        self.__init__(**_state)

Classes

class Pipe (connector_keys: str, metric_key: str, location_key: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, columns: Optional[Dict[str, str]] = None, mrsm_instance: Optional[Union[str, InstanceConnector]] = None, instance: Optional[Union[str, InstanceConnector]] = None, cache: bool = False, debug: bool = False)

Access Meerschaum pipes via Pipe objects.

Pipes are identified by the following:

  1. Connector keys (e.g. 'sql:main')
  2. Metric key (e.g. 'weather')
  3. Location (optional; e.g. None)

A pipe's connector keys correspond to a data source, and when the pipe is synced, its fetch definition is evaluated and executed to produce new data.

Alternatively, new data may be directly synced via pipe.sync():

>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)

Parameters

connector_keys : str
Keys for the pipe's source connector, e.g. 'sql:main'.
metric_key : str
Label for the pipe's contents, e.g. 'weather'.
location_key : str, default None
Label for the pipe's location. Defaults to None.
parameters : Optional[Dict[str, Any]], default None
Optionally set a pipe's parameters from the constructor, e.g. columns and other attributes. Defaults to None.
columns : Optional[Dict[str, str]], default None
Subset of parameters for ease of use. If parameters is provided, columns has no effect. Defaults to None.
mrsm_instance : Optional[Union[str, InstanceConnector]], default None
Connector for the Meerschaum instance where the pipe resides. Defaults to the preconfigured default instance ('sql:main').
instance : Optional[Union[str, InstanceConnector]], default None
Alias for mrsm_instance. If mrsm_instance is supplied, this value is ignored.
cache : bool, default False
If True, cache fetched data into a local database file. Experimental features must be enabled. You can enable experimental caching under system:experimental:cache. Defaults to False.
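
For example, the keyword arguments above may be combined to target a specific instance and seed the registration parameters in a single call. A sketch; the connector keys 'plugin:noaa' and instance keys 'sql:local' are illustrative:

>>> from meerschaum import Pipe
>>> pipe = Pipe(
...     'plugin:noaa', 'weather', 'atl',
...     instance = 'sql:local',
...     columns = { 'datetime': 'timestamp', 'id': 'station' },
... )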
Expand source code
class Pipe:
    """
    Access Meerschaum pipes via Pipe objects.
    
    Pipes are identified by the following:

    1. Connector keys (e.g. `'sql:main'`)
    2. Metric key (e.g. `'weather'`)
    3. Location (optional; e.g. `None`)
    
    A pipe's connector keys correspond to a data source, and when the pipe is synced,
    its `fetch` definition is evaluated and executed to produce new data.
    
    Alternatively, new data may be directly synced via `pipe.sync()`:
    
    ```
    >>> from meerschaum import Pipe
    >>> pipe = Pipe('csv', 'weather')
    >>>
    >>> import pandas as pd
    >>> df = pd.read_csv('weather.csv')
    >>> pipe.sync(df)
    ```
    """

    from ._fetch import fetch
    from ._data import get_data, get_backtrack_data, get_rowcount
    from ._register import register
    from ._attributes import (
        attributes,
        parameters,
        columns,
        get_columns,
        get_columns_types,
        get_id,
        id,
        get_val_column,
        parents,
    )
    from ._show import show
    from ._edit import edit, edit_definition
    from ._sync import sync, get_sync_time, exists, filter_existing
    from ._delete import delete
    from ._drop import drop
    from ._clear import clear
    from ._bootstrap import bootstrap

    def __init__(
        self,
        connector_keys: str,
        metric_key: str,
        location_key: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        columns: Optional[Dict[str, str]] = None,
        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
        instance: Optional[Union[str, InstanceConnector]] = None,
        cache: bool = False,
        debug: bool = False
    ):
        """
        Parameters
        ----------

        connector_keys: str
            Keys for the pipe's source connector, e.g. `'sql:main'`.

        metric_key: str
            Label for the pipe's contents, e.g. `'weather'`.

        location_key: str, default None
            Label for the pipe's location. Defaults to `None`.

        parameters: Optional[Dict[str, Any]], default None
            Optionally set a pipe's parameters from the constructor,
            e.g. columns and other attributes.
            Defaults to `None`.

        columns: Optional[Dict[str, str]], default None
            Subset of parameters for ease of use.
            If `parameters` is provided, `columns` has no effect.
            Defaults to `None`.

        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
            Connector for the Meerschaum instance where the pipe resides.
            Defaults to the preconfigured default instance (`'sql:main'`).

        instance: Optional[Union[str, InstanceConnector]], default None
            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.

        cache: bool, default False
            If `True`, cache fetched data into a local database file.
            Experimental features must be enabled.
            You can enable experimental caching under `system:experimental:cache`.
            Defaults to `False`.
        """
        if location_key in ('[None]', 'None'):
            location_key = None
        self.connector_keys = connector_keys
        self.metric_key = metric_key
        self.location_key = location_key

        ### only set parameters if values are provided
        if parameters is not None:
            self._parameters = parameters

        if columns is not None:
            if self.__dict__.get('_parameters', None) is None:
                self._parameters = {}
            self._parameters['columns'] = columns

        ### NOTE: The parameters dictionary is {} by default.
        ###       A Pipe may be registered without parameters, then edited,
        ###       or a Pipe may be registered with parameters set in-memory first.
        from meerschaum.config import get_config
        _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
        if _mrsm_instance is None:
            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)
        if not isinstance(_mrsm_instance, str):
            self._instance_connector = _mrsm_instance
            self.instance_keys = str(_mrsm_instance)
        else: ### NOTE: must be SQL or API Connector for this to work
            self.instance_keys = _mrsm_instance

        self._cache = cache and get_config('system', 'experimental', 'cache')

    @property
    def meta(self):
        """Simulate the MetaPipe model without importing FastAPI."""
        refresh = False
        if '_meta' not in self.__dict__:
            refresh = True
        #  elif self.parameters != self.__dict__['_meta']['parameters']:
            #  refresh = True

        if refresh:
            #  parameters = self.parameters
            #  if parameters is None:
                #  parameters = dict()
            self._meta = {
                'connector_keys' : self.connector_keys,
                'metric_key'     : self.metric_key,
                'location_key'   : self.location_key,
                #  'parameters'     : parameters,
                'instance'       : self.instance_keys,
            }
        return self._meta

    @property
    def instance_connector(self) -> Union[InstanceConnector, None]:
        """
        The connector to where this pipe resides.
        May either be of type `'sql'` (`meerschaum.connectors.sql.SQLConnector`) or of type `'api'`
        (`meerschaum.connectors.api.APIConnector`).
        """
        if '_instance_connector' not in self.__dict__:
            from meerschaum.connectors.parse import parse_instance_keys
            conn = parse_instance_keys(self.instance_keys)
            if conn:
                self._instance_connector = conn
            else:
                return None
        return self._instance_connector

    @property
    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
        """
        The connector to the data source.
        May be of type `'sql'`, `'api'`, `'mqtt'`, or `'plugin'`.
        """
        if '_connector' not in self.__dict__:
            from meerschaum.connectors.parse import parse_instance_keys
            conn = parse_instance_keys(self.connector_keys)
            if conn:
                self._connector = conn
            else:
                return None
        return self._connector

    @property
    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
        """
        If the pipe was created with `cache=True`, return the connector to the pipe's
        SQLite database for caching.
        """
        if not self._cache:
            return None

        if '_cache_connector' not in self.__dict__:
            from meerschaum.connectors import get_connector
            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
            _resources_path = SQLITE_RESOURCES_PATH
            self._cache_connector = get_connector(
                'sql', '_cache_' + str(self),
                flavor='sqlite',
                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
            )

        return self._cache_connector

    @property
    def cache_pipe(self) -> Union['meerschaum.Pipe.Pipe', None]:
        """
        If the pipe was created with `cache=True`, return another `meerschaum.Pipe.Pipe` used to
        manage the local data.
        """
        if self.cache_connector is None:
            return None
        if '_cache_pipe' not in self.__dict__:
            from meerschaum import Pipe
            from meerschaum.config._patch import apply_patch_to_config
            from meerschaum.connectors.sql.tools import sql_item_name
            _parameters = self.parameters.copy()
            _fetch_patch = {
                'fetch': ({
                    'definition': (
                        f"SELECT * FROM {sql_item_name(str(self), self.instance_connector.flavor)}"
                    ),
                }) if self.instance_connector.type == 'sql' else ({
                    'connector_keys': self.connector_keys,
                    'metric_key': self.metric_key,
                    'location_key': self.location_key,
                })
            }
            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
            self._cache_pipe = Pipe(
                self.instance_keys,
                (self.connector_keys + '_' + self.metric_key + '_cache'),
                self.location_key,
                mrsm_instance=self.cache_connector,
                parameters=_parameters,
                cache=False,
            )

        return self._cache_pipe

    @property
    def sync_time(self) -> Union[datetime.datetime, None]:
        """
        Convenience function to get the pipe's latest datetime.
        Use `meerschaum.Pipe.Pipe.get_sync_time()` instead.
        """
        return self.get_sync_time()

    def __str__(self):
        """
        The Pipe's SQL table name. Converts the `':'` in the `connector_keys` to an `'_'`.
        """
        name = f"{self.connector_keys.replace(':', '_')}_{self.metric_key}"
        if self.location_key is not None:
            name += f"_{self.location_key}"
        return name

    def __eq__(self, other):
        try:
            return (
                type(self) == type(other)
                and self.connector_keys == other.connector_keys
                and self.metric_key == other.metric_key
                and self.location_key == other.location_key
                and self.instance_keys == other.instance_keys
            )
        except Exception as e:
            return False

    def __hash__(self):
        ### Using an esoteric separator to avoid collisions.
        sep = "[\"']"
        return hash(
            str(self.connector_keys) + sep
            + str(self.metric_key) + sep
            + str(self.location_key) + sep
            + str(self.instance_keys) + sep
        )

    def __repr__(self):
        return str(self)

    def __getstate__(self):
        """
        Define the state dictionary (pickling).
        """
        state = {
            'connector_keys' : self.connector_keys,
            'metric_key' : self.metric_key,
            'location_key' : self.location_key,
            'parameters' : self.parameters,
            'mrsm_instance' :  self.instance_keys,
        }
        return state

    def __setstate__(self, _state : dict):
        """
        Read the state (unpickling).
        """
        self.__init__(**_state)

Instance variables

var attributes : Union[Dict[str, Any], NoneType]

Return a dictionary of a pipe's keys and parameters. It is a superset of Pipe.parameters.
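
For a registered pipe, the parameters dictionary is nested inside this attributes dictionary under the 'parameters' key (see the parameters property below). A small sketch:

>>> attrs = pipe.attributes
>>> ### `attributes` is None until the pipe is registered and has an ID.
>>> if attrs is not None:
...     registered_parameters = attrs['parameters']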

Expand source code
@property
def attributes(self) -> Optional[Dict[str, Any]]:
    """
    Return a dictionary of a pipe's keys and parameters.
    It is a superset of `meerschaum.Pipe.Pipe.parameters`.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    if '_attributes' not in self.__dict__:
        if self.id is None:
            return None
        self._attributes = self.instance_connector.get_pipe_attributes(self)
    return self._attributes
var cache_connector : Union[meerschaum.connectors.sql.SQLConnector, None]

If the pipe was created with cache=True, return the connector to the pipe's SQLite database for caching.

Expand source code
@property
def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
    """
    If the pipe was created with `cache=True`, return the connector to the pipe's
    SQLite database for caching.
    """
    if not self._cache:
        return None

    if '_cache_connector' not in self.__dict__:
        from meerschaum.connectors import get_connector
        from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
        _resources_path = SQLITE_RESOURCES_PATH
        self._cache_connector = get_connector(
            'sql', '_cache_' + str(self),
            flavor='sqlite',
            database=str(_resources_path / ('_cache_' + str(self) + '.db')),
        )

    return self._cache_connector
var cache_pipe : Union[Pipe, NoneType]

If the pipe was created with cache=True, return another Pipe used to manage the local data.

Expand source code
@property
def cache_pipe(self) -> Union['meerschaum.Pipe.Pipe', None]:
    """
    If the pipe was created with `cache=True`, return another `meerschaum.Pipe.Pipe` used to
    manage the local data.
    """
    if self.cache_connector is None:
        return None
    if '_cache_pipe' not in self.__dict__:
        from meerschaum import Pipe
        from meerschaum.config._patch import apply_patch_to_config
        from meerschaum.connectors.sql.tools import sql_item_name
        _parameters = self.parameters.copy()
        _fetch_patch = {
            'fetch': ({
                'definition': (
                    f"SELECT * FROM {sql_item_name(str(self), self.instance_connector.flavor)}"
                ),
            }) if self.instance_connector.type == 'sql' else ({
                'connector_keys': self.connector_keys,
                'metric_key': self.metric_key,
                'location_key': self.location_key,
            })
        }
        _parameters = apply_patch_to_config(_parameters, _fetch_patch)
        self._cache_pipe = Pipe(
            self.instance_keys,
            (self.connector_keys + '_' + self.metric_key + '_cache'),
            self.location_key,
            mrsm_instance=self.cache_connector,
            parameters=_parameters,
            cache=False,
        )

    return self._cache_pipe
var columns : Union[Dict[str, str], NoneType]

If defined, return the columns dictionary defined in Pipe.parameters.
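
Assigning this property (as in the module examples above) records which column holds the datetime index and which holds the ID, and the same mapping then appears under parameters. A small sketch, assuming the pipe's parameters are populated in memory:

>>> pipe.columns = { 'datetime': 'time', 'id': 'station_id' }
>>> pipe.parameters['columns']
{'datetime': 'time', 'id': 'station_id'}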

Expand source code
@property
def columns(self) -> Union[Dict[str, str], None]:
    """
    If defined, return the `columns` dictionary defined in `meerschaum.Pipe.Pipe.parameters`.
    """
    if not self.parameters:
        if '_columns' in self.__dict__:
            return self._columns
        return None
    if 'columns' not in self.parameters:
        return None
    return self.parameters['columns']
var connector : Union[meerschaum.connectors.Connector, None]

The connector to the data source. May be of type 'sql', 'api', 'mqtt', or 'plugin'.

Expand source code
@property
def connector(self) -> Union[meerschaum.connectors.Connector, None]:
    """
    The connector to the data source.
    May be of type `'sql'`, `'api'`, `'mqtt'`, or `'plugin'`.
    """
    if '_connector' not in self.__dict__:
        from meerschaum.connectors.parse import parse_instance_keys
        conn = parse_instance_keys(self.connector_keys)
        if conn:
            self._connector = conn
        else:
            return None
    return self._connector
var id : Union[int, NoneType]

Fetch and cache a pipe's ID.

Expand source code
@property
def id(self) -> Union[int, None]:
    """
    Fetch and cache a pipe's ID.
    """
    if not ('_id' in self.__dict__ and self._id):
        self._id = self.get_id()
    return self._id
var instance_connector : Union[meerschaum.connectors.sql.SQLConnector, meerschaum.connectors.api.APIConnector, NoneType]

The connector to where this pipe resides. May either be of type 'sql' (meerschaum.connectors.sql.SQLConnector) or of type 'api' (meerschaum.connectors.api.APIConnector).

Expand source code
@property
def instance_connector(self) -> Union[InstanceConnector, None]:
    """
    The connector to where this pipe resides.
    May either be of type `'sql'` (`meerschaum.connectors.sql.SQLConnector`) or of type `'api'`
    (`meerschaum.connectors.api.APIConnector`).
    """
    if '_instance_connector' not in self.__dict__:
        from meerschaum.connectors.parse import parse_instance_keys
        conn = parse_instance_keys(self.instance_keys)
        if conn:
            self._instance_connector = conn
        else:
            return None
    return self._instance_connector
var meta

Simulate the MetaPipe model without importing FastAPI.
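
As the source below shows, only the pipe's identifying keys are included. A tiny sketch:

>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'energy')
>>> sorted(pipe.meta.keys())
['connector_keys', 'instance', 'location_key', 'metric_key']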

Expand source code
@property
def meta(self):
    """Simulate the MetaPipe model without importing FastAPI."""
    refresh = False
    if '_meta' not in self.__dict__:
        refresh = True
    #  elif self.parameters != self.__dict__['_meta']['parameters']:
        #  refresh = True

    if refresh:
        #  parameters = self.parameters
        #  if parameters is None:
            #  parameters = dict()
        self._meta = {
            'connector_keys' : self.connector_keys,
            'metric_key'     : self.metric_key,
            'location_key'   : self.location_key,
            #  'parameters'     : parameters,
            'instance'       : self.instance_keys,
        }
    return self._meta
var parameters : Union[Dict[str, Any], NoneType]

Return the parameters dictionary of the pipe.

Expand source code
@property
def parameters(self) -> Optional[Dict[str, Any]]:
    """
    Return the parameters dictionary of the pipe.
    """
    if '_parameters' not in self.__dict__:
        if not self.attributes:
            return None
        self._parameters = self.attributes['parameters']
    return self._parameters
var parents : List[Pipe]

Return a list of Pipe objects. These pipes will be synced before this pipe.

NOTE: Not yet in use!

Expand source code
@property
def parents(self) -> List[meerschaum.Pipe.Pipe]:
    """
    Return a list of `meerschaum.Pipe.Pipe` objects.
    These pipes will be synced before this pipe.

    NOTE: Not yet in use!
    """
    if 'parents' not in self.parameters:
        return []
    from meerschaum.utils.warnings import warn
    _parents_keys = self.parameters['parents']
    if not isinstance(_parents_keys, list):
        warn(
            f"Please ensure the parents for pipe '{self}' are defined as a list of keys.",
            stacklevel = 4
        )
        return []
    from meerschaum import Pipe
    _parents = []
    for keys in _parents_keys:
        try:
            p = Pipe(**keys)
        except Exception as e:
            warn(f"Unable to build parent with keys '{keys}' for pipe '{self}':\n{e}")
            continue
        _parents.append(p)
    return _parents
var sync_time : Union[datetime.datetime, None]

Convenience function to get the pipe's latest datetime. Use get_sync_time() instead.

Expand source code
@property
def sync_time(self) -> Union[datetime.datetime, None]:
    """
    Convenience function to get the pipe's latest datetime.
    Use `meerschaum.Pipe.Pipe.get_sync_time()` instead.
    """
    return self.get_sync_time()

Methods

def bootstrap(self, debug: bool = False, yes: bool = False, force: bool = False, noask: bool = False, shell: bool = False, **kw) ‑> Tuple[bool, str]

Prompt the user to create a pipe's requirements all from one method. This method shouldn't be used in any automated scripts because it interactively prompts the user and therefore may hang.

Parameters

debug : bool, default False:
Verbosity toggle.
yes : bool, default False:
Print the questions and automatically agree.
force : bool, default False:
Skip the questions and agree anyway.
noask : bool, default False:
Print the questions but go with the default answer.
shell : bool, default False:
Used to determine if we are in the interactive shell.

Returns

A SuccessTuple corresponding to the success of this procedure.

Expand source code
def bootstrap(
        self,
        debug: bool = False,
        yes: bool = False,
        force: bool = False,
        noask: bool = False,
        shell: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Prompt the user to create a pipe's requirements all from one method.
    This method shouldn't be used in any automated scripts because it interactively
    prompts the user and therefore may hang.

    Parameters
    ----------
    debug: bool, default False:
        Verbosity toggle.

    yes: bool, default False:
        Print the questions and automatically agree.

    force: bool, default False:
        Skip the questions and agree anyway.

    noask: bool, default False:
        Print the questions but go with the default answer.

    shell: bool, default False:
        Used to determine if we are in the interactive shell.
        
    Returns
    -------
    A `SuccessTuple` corresponding to the success of this procedure.

    """

    from meerschaum.utils.warnings import warn, info, error
    from meerschaum.utils.prompt import prompt, yes_no
    from meerschaum.utils.formatting import pprint
    from meerschaum.config import get_config
    from meerschaum.utils.formatting._shell import clear_screen
    from meerschaum.utils.formatting import print_tuple
    from meerschaum.actions import actions

    _clear = get_config('shell', 'clear_screen', patch=True)

    if self.get_id(debug=debug) is not None:
        delete_tuple = self.delete(debug=debug)
        if not delete_tuple[0]:
            return delete_tuple

    if _clear:
        clear_screen(debug=debug)

    _parameters = _get_parameters(self, debug=debug)
    self.parameters = _parameters
    pprint(self.parameters)
    try:
        prompt(
            f"\n    Press [Enter] to register pipe '{self}' with the above configuration:",
            icon = False
        )
    except KeyboardInterrupt as e:
        return False, f"Aborting bootstrapping pipe '{self}'."
    register_tuple = self.instance_connector.register_pipe(self, debug=debug)
    if not register_tuple[0]:
        return register_tuple

    if _clear:
        clear_screen(debug=debug)

    try:
        if yes_no(
            f"Would you like to edit the definition for pipe '{self}'?", yes=yes, noask=noask
        ):
            edit_tuple = self.edit_definition(debug=debug)
            if not edit_tuple[0]:
                return edit_tuple

        if yes_no(f"Would you like to try syncing pipe '{self}' now?", yes=yes, noask=noask):
            #  sync_tuple = self.sync(debug=debug)
            sync_tuple = actions['sync'](
                ['pipes'],
                connector_keys = [self.connector_keys],
                metric_keys = [self.metric_key],
                location_keys = [self.location_key],
                mrsm_instance = str(self.instance_connector),
                debug = debug,
                shell = shell,
            )
            if not sync_tuple[0]:
                return sync_tuple
    except Exception as e:
        return False, f"Failed to bootstrap pipe '{self}':\n" + str(e)

    print_tuple((True, f"Finished bootstrapping pipe '{self}'!"))
    info(
        f"You can edit this pipe later with `edit pipes` or set the definition with `edit pipes definition`.\n" +
        "    To sync data into your pipe, run `sync pipes`."
    )

    return True, "Success"
def clear(self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, debug: bool = False, **kw: Any) ‑> SuccessTuple

Call the Pipe's instance connector's clear_pipe method.

Parameters

begin : Optional[datetime.datetime], default None:
If provided, only remove rows newer than this datetime value.
end : Optional[datetime.datetime], default None:
If provided, only remove rows older than this datetime value (not including end).
debug : bool, default False:
Verbosity toggle.

Returns

A SuccessTuple corresponding to whether this procedure completed successfully.

Examples

>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
>>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
>>> 
>>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
>>> pipe.get_data()
          dt
0 2020-01-01
Expand source code
def clear(
        self,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `clear_pipe` method.

    Parameters
    ----------
    begin: Optional[datetime.datetime], default None:
        If provided, only remove rows newer than this datetime value.

    end: Optional[datetime.datetime], default None:
        If provided, only remove rows older than this datetime value (not including end).

    debug: bool, default False:
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` corresponding to whether this procedure completed successfully.

    Examples
    --------
    >>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
    >>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
    >>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
    >>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
    >>> 
    >>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
    >>> pipe.get_data()
              dt
    0 2020-01-01

    """
    from meerschaum.utils.warnings import warn
    if self.cache_pipe is not None:
        success, msg = self.cache_pipe.clear(begin=begin, end=end, debug=debug, **kw)
        if not success:
            warn(msg)
    return self.instance_connector.clear_pipe(self, begin=begin, end=end, debug=debug, **kw)
def delete(self, debug: bool = False, **kw) ‑> Tuple[bool, str]

Call the Pipe's instance connector's delete_pipe() method.

Parameters

debug : bool, default False:
Verbosity toggle.

Returns

A SuccessTuple of success (bool), message (str).
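
A brief sketch of checking the returned tuple before proceeding:

>>> success, message = pipe.delete()
>>> if not success:
...     print(f"Could not delete pipe '{pipe}': {message}")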

Expand source code
def delete(
        self,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `delete_pipe()` method.

    Parameters
    ----------
    debug : bool, default False:
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (`bool`), message (`str`).

    """
    import os, pathlib
    from meerschaum.utils.warnings import warn
    if self.cache_pipe is not None:
        _delete_cache_tuple = self.cache_pipe.delete(debug=debug, **kw)
        if not _delete_cache_tuple[0]:
            warn(_delete_cache_tuple[1])
        _cache_db_path = pathlib.Path(self.cache_connector.database)
        try:
            os.remove(_cache_db_path)
        except Exception as e:
            warn(f"Could not delete cache file '{_cache_db_path}' for pipe '{self}':\n{e}")
    result = self.instance_connector.delete_pipe(self, debug=debug, **kw)
    if not isinstance(result, tuple):
        return False, f"Received unexpected result from '{self.instance_connector}': {result}"
    if result[0]:
        to_delete = ['_id', '_attributes', '_parameters', '_columns', '_data']
        for member in to_delete:
            if member in self.__dict__:
                del self.__dict__[member]
    return result
def drop(self, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]

Call the Pipe's instance connector's drop_pipe() method.

Parameters

debug : bool, default False:
Verbosity toggle.

Returns

A SuccessTuple of success, message.

Expand source code
def drop(
        self,
        debug: bool = False,
        **kw : Any
    ) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `drop_pipe()` method.

    Parameters
    ----------
    debug: bool, default False:
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    from meerschaum.utils.warnings import warn
    if self.cache_pipe is not None:
        _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw)
        if not _drop_cache_tuple[0]:
            warn(_drop_cache_tuple[1])
    return self.instance_connector.drop_pipe(self, debug=debug, **kw)
def edit(self, patch: bool = False, interactive: bool = False, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]

Edit a Pipe's configuration.

Parameters

patch : bool, default False
If patch is True, update parameters by cascading rather than overwriting.
interactive : bool, default False
If True, open an editor for the user to make changes to the pipe's YAML file.
debug : bool, default False
Verbosity toggle.

Returns

A SuccessTuple of success, message.
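
The patch flag is useful for layering a few new parameters on top of what is already registered rather than replacing the whole dictionary. A rough sketch of that workflow (the 'fetch' parameter shown is illustrative):

>>> pipe.parameters = { 'fetch': { 'backtrack_minutes': 1440 } }
>>> success, message = pipe.edit(patch=True)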

Expand source code
def edit(
        self,
        patch: bool = False,
        interactive: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Edit a Pipe's configuration.

    Parameters
    ----------
    patch: bool, default False
        If `patch` is True, update parameters by cascading rather than overwriting.
    interactive: bool, default False
        If `True`, open an editor for the user to make changes to the pipe's YAML file.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    if not interactive:
        return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
    from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
    from meerschaum.utils.misc import edit_file
    import pathlib, os
    parameters_filename = str(self) + '.yaml'
    parameters_path = pathlib.Path(os.path.join(PIPES_CACHE_RESOURCES_PATH, parameters_filename))
    
    from meerschaum.utils.yaml import yaml

    edit_header = "#######################################"
    for i in range(len(str(self))):
        edit_header += "#"
    edit_header += "\n"
    edit_header += f"# Edit the parameters for the Pipe '{self}' #"
    edit_header += "\n#######################################"
    for i in range(len(str(self))):
        edit_header += "#"
    edit_header += "\n\n"

    from meerschaum.config import get_config
    parameters = dict(get_config('pipes', 'parameters', patch=True))
    from meerschaum.config._patch import apply_patch_to_config
    parameters = apply_patch_to_config(parameters, self.parameters)

    ### write parameters to yaml file
    with open(parameters_path, 'w+') as f:
        f.write(edit_header)
        yaml.dump(parameters, stream=f, sort_keys=False)

    ### only quit editing if yaml is valid
    editing = True
    while editing:
        edit_file(parameters_path)
        try:
            with open(parameters_path, 'r') as f:
                file_parameters = yaml.load(f.read())
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(f"Invalid format defined for '{self}':\n\n{e}")
            input(f"Press [Enter] to correct the configuration for '{self}': ")
        else:
            editing = False

    self.parameters = file_parameters

    if debug:
        from meerschaum.utils.formatting import pprint
        pprint(self.parameters)

    return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
def edit_definition(self, yes: bool = False, noask: bool = False, force: bool = False, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]

Edit a pipe's definition file and update its configuration. NOTE: This function is interactive and should not be used in automated scripts!

Returns

A SuccessTuple of success, message.

Expand source code
def edit_definition(
        self,
        yes: bool = False,
        noask: bool = False,
        force: bool = False,
        debug : bool = False,
        **kw : Any
    ) -> SuccessTuple:
    """
    Edit a pipe's definition file and update its configuration.
    **NOTE:** This function is interactive and should not be used in automated scripts!

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    if self.connector.type not in ('sql', 'api'):
        return self.edit(interactive=True, debug=debug, **kw)

    import json
    from meerschaum.utils.warnings import info, warn
    from meerschaum.utils.debug import dprint
    from meerschaum.config._patch import apply_patch_to_config
    from meerschaum.utils.misc import edit_file

    _parameters = self.parameters
    if 'fetch' not in _parameters:
        _parameters['fetch'] = {}

    def _edit_api():
        from meerschaum.utils.prompt import prompt, yes_no
        info(
            f"Please enter the keys of the source pipe from '{self.connector}'.\n" +
            "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip."
        )

        _keys = { 'connector_keys' : None, 'metric_key' : None, 'location_key' : None }
        for k in _keys:
            _keys[k] = _parameters['fetch'].get(k, None)

        for k, v in _keys.items():
            try:
                _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v)
            except KeyboardInterrupt:
                continue
            if _keys[k] in ('', 'None', '\'None\'', '[None]'):
                _keys[k] = None

        _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys)

        info("You may optionally specify additional filter parameters as JSON.")
        print("  Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.")
        print("  For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':")
        print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': ')))
        if force or yes_no(
            "Would you like to add additional filter parameters?",
            yes=yes, noask=noask
        ):
            from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
            definition_filename = str(self) + '.json'
            definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
            try:
                definition_path.touch()
                with open(definition_path, 'w+') as f:
                    json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2)
            except Exception as e:
                return False, f"Failed writing file '{definition_path}':\n" + str(e)

            _params = None
            while True:
                edit_file(definition_path)
                try:
                    with open(definition_path, 'r') as f:
                        _params = json.load(f)
                except Exception as e:
                    warn(f'Failed to read parameters JSON:\n{e}', stack=False)
                    if force or yes_no(
                        "Would you like to try again?\n  "
                        + "If not, the parameters JSON file will be ignored.",
                        noask=noask, yes=yes
                    ):
                        continue
                    _params = None
                break
            if _params is not None:
                if 'fetch' not in _parameters:
                    _parameters['fetch'] = {}
                _parameters['fetch']['params'] = _params

        self.parameters = _parameters
        return True, "Success"

    def _edit_sql():
        import pathlib, os, textwrap
        from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
        from meerschaum.utils.misc import edit_file
        definition_filename = str(self) + '.sql'
        definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename

        sql_definition = _parameters['fetch'].get('definition', None)
        if sql_definition is None:
            sql_definition = ''
        sql_definition = textwrap.dedent(sql_definition).lstrip()

        try:
            definition_path.touch()
            with open(definition_path, 'w+') as f:
                f.write(sql_definition)
        except Exception as e:
            return False, f"Failed writing file '{definition_path}':\n" + str(e)

        edit_file(definition_path)
        try:
            with open(definition_path, 'r') as f:
                file_definition = f.read()
        except Exception as e:
            return False, f"Failed reading file '{definition_path}':\n" + str(e)

        if sql_definition == file_definition:
            return False, f"No changes made to definition for pipe '{self}'."

        if ' ' not in file_definition:
            return False, f"Invalid SQL definition for pipe '{self}'."

        if debug:
            dprint("Read SQL definition:\n\n" + file_definition)
        _parameters['fetch']['definition'] = file_definition
        self.parameters = _parameters
        return True, "Success"

    locals()['_edit_' + str(self.connector.type)]()
    return self.edit(interactive=False, debug=debug, **kw)
def exists(self, debug: bool = False) ‑> bool

See if a Pipe's table exists.

Parameters

debug : bool, default False
Verbosity toggle.

Returns

A bool corresponding to whether a pipe's underlying table exists.
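
A common pattern is to check exists() before reading, since there is nothing to return until the pipe has been synced at least once. A minimal sketch:

>>> if pipe.exists():
...     df = pipe.get_data()
... else:
...     print(f"Pipe '{pipe}' has not been synced yet.")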

Expand source code
def exists(
        self,
        debug : bool = False
    ) -> bool:
    """
    See if a Pipe's table exists.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `bool` corresponding to whether a pipe's underlying table exists.

    """
    ### TODO test against views
    return self.instance_connector.pipe_exists(pipe=self, debug=debug)
def fetch(self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, sync_chunks: bool = False, deactivate_plugin_venv: bool = True, debug: bool = False, **kw: Any) ‑> 'pd.DataFrame or None'

Fetch a Pipe's latest data from its connector.

Parameters

begin : Optional[datetime.datetime], default None:
If provided, only fetch data newer than or equal to begin.
end : Optional[datetime.datetime], default None:
If provided, only fetch data older than or equal to end.
sync_chunks : bool, default False
If True and the pipe's connector is of type 'sql', begin syncing chunks as the fetch loads them into memory.
deactivate_plugin_venv : bool, default True
If True and the pipe's connector is of type 'plugin', deactivate the plugin's virtual environment after retrieving the dataframe. Not intended for general use.
debug : bool, default False
Verbosity toggle.

Returns

A pd.DataFrame of the newest unseen data.
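
Note that fetch() only retrieves data; writing it to the pipe is a separate step (roughly what sync() does when called without a dataframe). A sketch of fetching a bounded window and syncing it manually, assuming the pipe's connector defines fetch():

>>> import datetime
>>> df = pipe.fetch(begin=datetime.datetime(2021, 1, 1))
>>> if df is not None:
...     pipe.sync(df)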

Expand source code
def fetch(
        self,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        sync_chunks: bool = False,
        deactivate_plugin_venv: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> 'pd.DataFrame or None':
    """
    Fetch a Pipe's latest data from its connector.

    Parameters
    ----------
    begin: Optional[datetime.datetime], default None:
        If provided, only fetch data newer than or equal to `begin`.

    end: Optional[datetime.datetime], default None:
        If provided, only fetch data older than or equal to `end`.

    sync_chunks: bool, default False
        If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks
        as the fetch loads them into memory.

    deactivate_plugin_venv: bool, default True
        If `True` and the pipe's connector is of type `'plugin'`, deactivate the plugin's
        virtual environment after retrieving the dataframe.
        Not intended for general use.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` of the newest unseen data.

    """
    if 'fetch' not in dir(self.connector):
        from meerschaum.utils.warnings import warn
        warn(f"No `fetch()` function defined for connector '{self.connector}'")
        return None

    from meerschaum.utils.debug import dprint, _checkpoint
    if self.connector.type == 'plugin':
        from meerschaum.utils.packages import activate_venv, deactivate_venv
        activate_venv(self.connector.label, debug=debug)
    
    _chunk_hook = kw.pop('chunk_hook') if 'chunk_hook' in kw else None

    df = self.connector.fetch(
        self,
        begin = begin,
        end = end,
        chunk_hook = (
            self.sync if sync_chunks and _chunk_hook is None
            else _chunk_hook
        ),
        debug = debug,
        **kw
    )
    if self.connector.type == 'plugin' and deactivate_plugin_venv:
        deactivate_venv(self.connector.label, debug=debug)
    ### Return True if we're syncing in parallel, else continue as usual.
    if sync_chunks:
        return True
    return df
def filter_existing(self, df: "'pd.DataFrame'", begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, chunksize: Optional[int] = -1, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw) ‑> 'pd.DataFrame'

Inspect a dataframe and filter out rows which already exist in the pipe.

Parameters

df : 'pd.DataFrame'
The dataframe to inspect and filter.
begin : Optional[datetime.datetime], default None
If provided, use this boundary when searching for existing data.
end : Optional[datetime.datetime], default None
If provided, use this boundary when searching for existing data.
chunksize : Optional[int], default -1
The chunksize used when fetching existing data.
params : Optional[Dict[str, Any]], default None
If provided, use this filter when searching for existing data.
debug : bool, default False
Verbosity toggle.

Returns

A pd.DataFrame with existing rows removed.
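
This is primarily used during syncing, but it can also be called directly to preview which rows of a new dataframe would actually be inserted. A sketch; new_df is assumed to be a DataFrame containing the pipe's datetime column:

>>> unseen_df = pipe.filter_existing(new_df)
>>> ### Only rows not already present in the pipe remain.
>>> len(unseen_df) <= len(new_df)
True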

Expand source code
def filter_existing(
        self,
        df: 'pd.DataFrame',
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        chunksize: Optional[int] = -1,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw
    ) -> 'pd.DataFrame':
    """
    Inspect a dataframe and filter out rows which already exist in the pipe.

    Parameters
    ----------
    df: 'pd.DataFrame'
        The dataframe to inspect and filter.
        
    begin: Optional[datetime.datetime], default None
        If provided, use this boundary when searching for existing data.

    end: Optional[datetime.datetime], default None
        If provided, use this boundary when searching for existing data.

    chunksize: Optional[int], default -1
        The `chunksize` used when fetching existing data.

    params: Optional[Dict[str, Any]], default None
        If provided, use this filter when searching for existing data. 

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` with existing rows removed.

    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import round_time
    from meerschaum.utils.packages import attempt_import, import_pandas
    import datetime
    pd = import_pandas()
    ### begin is the oldest data in the new dataframe
    try:
        min_dt = pd.to_datetime(df[self.get_columns('datetime')].min(skipna=True)).to_pydatetime()
    except Exception as e:
        ### NOTE: This will fetch the entire pipe!
        min_dt = self.get_sync_time(newest=False, debug=debug)
    if not isinstance(min_dt, datetime.datetime) or str(min_dt) == 'NaT':
        ### min_dt might be None, a user-supplied value, or the sync time.
        min_dt = begin
    ### If `min_dt` is None, use `datetime.utcnow()`.
    begin = round_time(
        min_dt,
        to = 'down'
    ) - datetime.timedelta(minutes=1)

    ### end is the newest data in the new dataframe
    try:
        max_dt = pd.to_datetime(df[self.get_columns('datetime')].max(skipna=True)).to_pydatetime()
    except Exception as e:
        max_dt = end
    if not isinstance(max_dt, datetime.datetime) or str(max_dt) == 'NaT':
        max_dt = None

    if max_dt is not None and min_dt > max_dt:
        warn(f"Detected minimum datetime greater than maximum datetime.")

    ### If `max_dt` is `None`, unbound the search.
    end = (
        round_time(
            max_dt,
            to = 'down'
        ) + datetime.timedelta(minutes=1)
    ) if max_dt is not None else end
    if begin is not None and end is not None and begin > end:
        begin = end - datetime.timedelta(minutes=1)

    if debug:
        dprint(f"Looking at data between '{begin}' and '{end}'.", **kw)

    ### backtrack_df is existing Pipe data that overlaps with the fetched df
    try:
        backtrack_minutes = self.parameters['fetch']['backtrack_minutes']
    except Exception as e:
        backtrack_minutes = 0

    backtrack_df = self.get_data(
        begin = begin,
        end = end,
        chunksize = chunksize,
        params = params,
        debug = debug,
        **kw
    )
    if debug:
        dprint("Existing data:\n" + str(backtrack_df), **kw)

    ### remove data we've already seen before
    from meerschaum.utils.misc import filter_unseen_df
    return filter_unseen_df(backtrack_df, df, debug=debug)
def get_backtrack_data(self, backtrack_minutes: int = 0, begin: "Optional['datetime.datetime']" = None, fresh: bool = False, debug: bool = False, **kw: Any) ‑> Union[pd.DataFrame, NoneType]

Get the most recent data from the instance connector as a Pandas DataFrame.

Parameters

backtrack_minutes : int, default 0
How many minutes from begin to select from. Defaults to 0. This may return a few rows due to a rounding quirk.
begin : Optional[datetime.datetime], default None
The starting point to search for data. If begin is None (default), use the most recent observed datetime (AKA sync_time).
E.g. begin = 02:00

Search this region.           Ignore this, even if there's data.
/  /  /  /  /  /  /  /  /  |
-----|----------|----------|----------|----------|----------|
00:00      01:00      02:00      03:00      04:00      05:00

fresh : bool, default False
If True, ignore local cache and pull directly from the instance connector. Only comes into effect if a pipe was created with cache=True.
debug : bool, default False
Verbosity toggle.

Returns

A pd.DataFrame for the pipe's data corresponding to the provided parameters. Backtrack data is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
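
For illustration (the values are arbitrary and assume the pipe already contains data):

>>> bt_df = pipe.get_backtrack_data(backtrack_minutes=60)
>>> from datetime import datetime
>>> bt_df = pipe.get_backtrack_data(backtrack_minutes=30, begin=datetime(2021, 1, 1, 2, 0))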

Expand source code
def get_backtrack_data(
        self,
        backtrack_minutes: int = 0,
        begin: Optional['datetime.datetime'] = None,
        fresh: bool = False,
        debug : bool = False,
        **kw : Any
    ) -> Optional['pd.DataFrame']:
    """
    Get the most recent data from the instance connector as a Pandas DataFrame.

    Parameters
    ----------
    backtrack_minutes: int, default 0
        How many minutes from `begin` to select from.
        Defaults to 0. This may return a few rows due to a rounding quirk.
    begin: Optional[datetime.datetime], default None
        The starting point to search for data.
        If begin is `None` (default), use the most recent observed datetime
        (AKA sync_time).
        
        
    ```
    E.g. begin = 02:00

    Search this region.           Ignore this, even if there's data.
    /  /  /  /  /  /  /  /  /  |
    -----|----------|----------|----------|----------|----------|
    00:00      01:00      02:00      03:00      04:00      05:00

    ```

    fresh: bool, default False
        If `True`, ignore local cache and pull directly from the instance connector.
        Only comes into effect if a pipe was created with `cache=True`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. Backtrack data
    is a convenient way to get a pipe's data "backtracked" from the most recent datetime.

    """
    from meerschaum.utils.warnings import warn
    kw.update({'backtrack_minutes': backtrack_minutes, 'begin': begin,})

    if not self.exists(debug=debug):
        return None

    if self.cache_pipe is not None:
        if not fresh:
            _sync_cache_tuple = self.cache_pipe.sync(debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync cache for pipe '{self}':\n" + _sync_cache_tuple[1])
                fresh = True
            else: ### Successfully synced cache.
                return self.cache_pipe.get_backtrack_data(debug=debug, fresh=True, **kw)

    ### If `fresh` or the syncing failed, directly pull from the instance connector.
    return self.instance_connector.get_backtrack_data(
        pipe = self,
        debug = debug,
        **kw
    )
def get_columns(self, *args: str, error: bool = True) ‑> Tuple[str]

Check if the requested columns are defined.

Parameters

*args : str :
The column names to be retrieved.
error : bool, default True:
If True, raise an Exception if the specified column is not defined.

Returns

A tuple of column names the same length as args (or a single string if only one column is requested).

Examples

>>> pipe = mrsm.Pipe('test', 'test')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> pipe.get_columns('datetime', 'id')
('dt', 'id')
>>> pipe.get_columns('value')
Exception:  🛑 Missing 'value' column for Pipe 'test_test'.
Expand source code
def get_columns(self, *args: str, error : bool = True) -> Tuple[str]:
    """
    Check if the requested columns are defined.

    Parameters
    ----------
    *args : str :
        The column names to be retrieved.
        
    error : bool, default True:
        If `True`, raise an `Exception` if the specified column is not defined.

    Returns
    -------
    A tuple of column names the same length as `args` (or a single string if only one column is requested).

    Examples
    --------
    >>> pipe = mrsm.Pipe('test', 'test')
    >>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
    >>> pipe.get_columns('datetime', 'id')
    ('dt', 'id')
    >>> pipe.get_columns('value')
    Exception:  🛑 Missing 'value' column for Pipe 'test_test'.
    """
    from meerschaum.utils.warnings import error as _error, warn
    if not args:
        args = tuple(self.columns.keys())
    col_names = []
    for col in args:
        col_name = None
        try:
            col_name = self.columns[col]
            if col_name is None and error:
                _error(f"Please define the name of the '{col}' column for Pipe '{self}'.")
        except Exception as e:
            col_name = None
        if col_name is None and error:
            _error(f"Missing '{col}'" + f" column for Pipe '{self}'.")
        col_names.append(col_name)
    if len(col_names) == 1:
        return col_names[0]
    return tuple(col_names)
def get_columns_types(self, debug: bool = False) ‑> Union[Dict[str, str], NoneType]

Get a dictionary of a pipe's column names and their types.

Parameters

debug : bool, default False:
Verbosity toggle.

Returns

A dictionary of column names (str) to column types (str).

Examples

>>> pipe.get_columns_types()
{
  'dt': 'TIMESTAMP WITHOUT TIMEZONE',
  'id': 'BIGINT',
  'val': 'DOUBLE PRECISION',
}
>>>
Expand source code
def get_columns_types(self, debug : bool = False) -> Union[Dict[str, str], None]:
    """
    Get a dictionary of a pipe's column names and their types.

    Parameters
    ----------
    debug : bool, default False:
        Verbosity toggle.

    Returns
    -------
    A dictionary of column names (`str`) to column types (`str`).

    Examples
    --------
    >>> pipe.get_columns_types()
    {
      'dt': 'TIMESTAMP WITHOUT TIMEZONE',
      'id': 'BIGINT',
      'val': 'DOUBLE PRECISION',
    }
    >>>
    """
    return self.instance_connector.get_pipe_columns_types(self, debug=debug)
def get_data(self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, fresh: bool = False, debug: bool = False, **kw: Any) ‑> Optional[pandas.DataFrame]

Get a pipe's data from the instance connector.

Parameters

begin : Optional[datetime.datetime], default None
Lower bound datetime to begin searching for data (inclusive). Translates to a WHERE clause like WHERE datetime >= begin. Defaults to None.
end : Optional[datetime.datetime], default None
Upper bound datetime to stop searching for data (inclusive). Translates to a WHERE clause like WHERE datetime <= end. Defaults to None.
params : Optional[Dict[str, Any]], default None
Filter the retrieved data by a dictionary of parameters. See build_where() for more details.
fresh : bool, default False
If True, skip local cache and directly query the instance connector. Defaults to False.
debug : bool, default False
Verbosity toggle. Defaults to False.

Returns

A pd.DataFrame for the pipe's data corresponding to the provided parameters.
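
A brief sketch of a bounded query (the column name 'station_id' and the dates are hypothetical):

>>> from datetime import datetime
>>> df = pipe.get_data(
...     begin=datetime(2021, 1, 1),
...     end=datetime(2021, 1, 2),
...     params={'station_id': 1},
... )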

Expand source code
def get_data(
        self,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        params: Optional[Dict[str, Any]] = None,
        fresh: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Optional[pandas.DataFrame]:
    """
    Get a pipe's data from the instance connector.

    Parameters
    ----------
    begin: Optional[datetime.datetime], default None
        Lower bound datetime to begin searching for data (inclusive).
        Translates to a `WHERE` clause like `WHERE datetime >= begin`.
        Defaults to `None`.

    end: Optional[datetime.datetime], default None
        Upper bound datetime to stop searching for data (inclusive).
        Translates to a `WHERE` clause like `WHERE datetime <= end`.
        Defaults to `None`.

    params: Optional[Dict[str, Any]], default None
        Filter the retrieved data by a dictionary of parameters.
        See `meerschaum.connectors.sql.tools.build_where` for more details. 

    fresh: bool, default False
        If `True`, skip local cache and directly query the instance connector.
        Defaults to `False`.

    debug: bool, default False
        Verbosity toggle.
        Defaults to `False`.

    Returns
    -------
    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.

    """
    from meerschaum.utils.warnings import warn
    kw.update({'begin': begin, 'end': end, 'params': params,})

    if not self.exists(debug=debug):
        return None

    if self.cache_pipe is not None:
        if not fresh:
            _sync_cache_tuple = self.cache_pipe.sync(debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync cache for pipe '{self}':\n" + _sync_cache_tuple[1])
                fresh = True
            else: ### Successfully synced cache.
                return self.cache_pipe.get_data(debug=debug, fresh=True, **kw)

    ### If `fresh` or the syncing failed, directly pull from the instance connector.
    return self.instance_connector.get_pipe_data(
        pipe = self,
        debug = debug,
        **kw
    )
def get_id(self, **kw: Any) ‑> Union[int, NoneType]

Fetch a pipe's ID from its instance connector. If the pipe does not exist, return None.
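
For example, get_id() can serve as a quick registration check:

>>> if pipe.get_id() is None:
...     pipe.register()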

Expand source code
def get_id(self, **kw : Any) -> Union[int, None]:
    """
    Fetch a pipe's ID from its instance connector.
    If the pipe does not exist, return `None`.
    """
    return self.instance_connector.get_pipe_id(self, **kw)
def get_rowcount(self, begin: "Optional['datetime.datetime']" = None, end: "Optional['datetime.datetime']" = None, remote: bool = False, params: Optional[Dict[str, Any]] = None, debug: bool = False) ‑> Union[int, None]

Get a Pipe's instance or remote rowcount.

Parameters

begin : Optional[datetime.datetime], default None
Count rows where datetime > begin.
end : Optional[datetime.datetime], default None
Count rows where datetime <= end.
remote : bool, default False
Count rows from a pipe's remote source. NOTE: This is experimental!
debug : bool, default False
Verbosity toggle.

Returns

An int of the number of rows in the pipe corresponding to the provided parameters. None is returned if the pipe does not exist.
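
An illustrative sketch (the dates are arbitrary):

>>> pipe.get_rowcount()
>>> from datetime import datetime
>>> pipe.get_rowcount(begin=datetime(2021, 1, 1), end=datetime(2021, 2, 1))
>>> pipe.get_rowcount(remote=True)   ### Experimental: count rows at the remote source.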

Expand source code
def get_rowcount(
        self,
        begin: Optional['datetime.datetime'] = None,
        end: Optional['datetime.datetime'] = None,
        remote: bool = False,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False
    ) -> Union[int, None]:
    """
    Get a Pipe's instance or remote rowcount.

    Parameters
    ----------
    begin: Optional[datetime.datetime], default None
        Count rows where datetime > begin.

    end: Optional[datetime.datetime], default None
        Count rows where datetime <= end.

    remote: bool, default False
        Count rows from a pipe's remote source.
        **NOTE**: This is experimental!

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` of the number of rows in the pipe corresponding to the provided parameters.
    `None` is returned if the pipe does not exist.

    """
    from meerschaum.utils.warnings import warn
    connector = self.instance_connector if not remote else self.connector
    try:
        return connector.get_pipe_rowcount(
            self, begin=begin, end=end, remote=remote, params=params, debug=debug
        )
    except AttributeError as e:
        warn(e)
        if remote:
            return None
    warn(f"Failed to get a rowcount for pipe '{self}'.")
    return None
def get_sync_time(self, params: Optional[Dict[str, Any]] = None, newest: bool = True, round_down: bool = True, debug: bool = False) ‑> Union[datetime.datetime, NoneType]

Get the most recent datetime value for a Pipe.

Parameters

params : Optional[Dict[str, Any]], default None
Dictionary to build a WHERE clause for a specific column. See build_where().
newest : bool, default True
If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
round_down : bool, default True
If True, round down the sync time to the nearest minute.
debug : bool, default False
Verbosity toggle.

Returns

A datetime.datetime object if the pipe exists, otherwise None.
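
For instance, the sync time is a convenient anchor for building a begin bound (a sketch, assuming the pipe's datetime column is defined):

>>> import datetime
>>> st = pipe.get_sync_time()
>>> oldest = pipe.get_sync_time(newest=False, round_down=False)
>>> begin = (st - datetime.timedelta(hours=1)) if st is not None else None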

Expand source code
def get_sync_time(
        self,
        params : Optional[Dict[str, Any]] = None,
        newest: bool = True,
        round_down: bool = True,
        debug : bool = False
    ) -> Union['datetime.datetime', None]:
    """
    Get the most recent datetime value for a Pipe.

    Parameters
    ----------
    params: Optional[Dict[str, Any]], default None
        Dictionary to build a WHERE clause for a specific column.
        See `meerschaum.connectors.sql.tools.build_where`.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (`ASC` instead of `DESC`).

    round_down: bool, default True
        If `True`, round down the sync time to the nearest minute.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `datetime.datetime` object if the pipe exists, otherwise `None`.

    """
    from meerschaum.utils.warnings import error, warn
    if self.columns is None:
        warn(
            f"No columns found for Pipe '{self}'. " +
            "Pipe might not be registered or is missing columns in parameters."
        )
        return None

    if 'datetime' not in self.columns:
        warn(
            f"'datetime' must be declared in parameters:columns for Pipe '{self}'.\n\n" +
            f"You can add parameters for this Pipe with the following command:\n\n" +
            f"mrsm edit pipes -C {self.connector_keys} -M " +
            f"{self.metric_key} -L " +
            (f"[None]" if self.location_key is None else f"{self.location_key}")
        )
        return None

    return self.instance_connector.get_sync_time(
        self,
        params = params,
        newest = newest,
        round_down = round_down,
        debug = debug,
    )
def get_val_column(self, debug: bool = False) ‑> Union[str, NoneType]

Return the name of the value column if it's defined, otherwise make an educated guess. If not set in the columns dictionary, return the first numeric column that is not an ID or datetime column. If none may be found, return None.

Parameters

debug : bool, default False:
Verbosity toggle.

Returns

Either a string or None.
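
A minimal sketch (the column names are hypothetical and assume the pipe's table already exists):

>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> ### Guesses the first remaining numeric column, e.g. 'val'.
>>> pipe.get_val_column(debug=True)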

Expand source code
def get_val_column(self, debug: bool = False) -> Union[str, None]:
    """
    Return the name of the value column if it's defined, otherwise make an educated guess.
    If not set in the `columns` dictionary, return the first numeric column that is not
    an ID or datetime column.
    If none may be found, return `None`.

    Parameters
    ----------
    debug: bool, default False:
        Verbosity toggle.

    Returns
    -------
    Either a string or `None`.
    """
    from meerschaum.utils.debug import dprint
    if debug:
        dprint('Attempting to determine the value column...')
    try:
        val_name = self.get_columns('value')
    except Exception as e:
        val_name = None
    if val_name is not None:
        if debug:
            dprint(f"Value column: {val_name}")
        return val_name

    cols = self.columns
    if cols is None:
        if debug:
            dprint('No columns could be determined. Returning...')
        return None
    try:
        dt_name = self.get_columns('datetime')
    except Exception as e:
        dt_name = None
    try:
        id_name = self.get_columns('id')
    except Exception as e:
        id_name = None

    if debug:
        dprint(f"dt_name: {dt_name}")
        dprint(f"id_name: {id_name}")

    cols_types = self.get_columns_types(debug=debug)
    if cols_types is None:
        return None
    if debug:
        dprint(f"cols_types: {cols_types}")
    if dt_name is not None:
        cols_types.pop(dt_name, None)
    if id_name is not None:
        cols_types.pop(id_name, None)

    candidates = []
    candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric',}
    for search_term in candidate_keywords:
        for col, typ in cols_types.items():
            if search_term in typ.lower():
                candidates.append(col)
                break
    if not candidates:
        if debug:
            dprint(f"No value column could be determined.")
        return None

    return candidates[0]
def register(self, debug: bool = False)

Register a new Pipe along with its attributes.

Parameters

debug : bool, default False
Verbosity toggle.

Returns

A SuccessTuple of success, message.
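
A hedged sketch of registering a brand-new pipe (the connector keys, metric, and columns are placeholders):

>>> from meerschaum import Pipe
>>> pipe = Pipe('sql:main', 'temperature')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> success, msg = pipe.register(debug=True)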

Expand source code
def register(
        self,
        debug: bool = False
    ):
    """
    Register a new Pipe along with its attributes.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    import warnings
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        try:
            _conn = self.connector
        except Exception as e:
            _conn = None

    if _conn is not None and _conn.type == 'plugin' and _conn.register is not None:
        params = self.connector.register(self)
        params = {} if params is None else params
        if not isinstance(params, dict):
            from meerschaum.utils.warnings import warn
            warn(
                f"Invalid parameters returned from `register()` in plugin {self.connector}:\n"
                + f"{params}"
            )
        else:
            self.parameters = params

    if not self.parameters:
        self.parameters = {
            'columns': self.columns,
        }

    return self.instance_connector.register_pipe(self, debug=debug)
def show(self, nopretty: bool = False, debug: bool = False, **kw) ‑> Tuple[bool, str]

Show attributes of a Pipe.

Parameters

nopretty : bool, default False
If True, simply print the JSON of the pipe's attributes.
debug : bool, default False
Verbosity toggle.

Returns

A SuccessTuple of success, message.
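
For example:

>>> pipe.show()
>>> pipe.show(nopretty=True)   ### Print the attributes as plain JSON.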

Expand source code
def show(
        self,
        nopretty: bool = False,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Show attributes of a Pipe.

    Parameters
    ----------
    nopretty: bool, default False
        If `True`, simply print the JSON of the pipe's attributes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    import json
    from meerschaum.utils.formatting import pprint, make_header
    from meerschaum.utils.warnings import info
    if not nopretty:
        print(make_header(f"Attributes for pipe '{self}':"))
        pprint(self.attributes)
    else:
        print(json.dumps(self.attributes))

    return True, "Success"
def sync(self, df: Optional[Union[pandas.DataFrame, Dict[str, List[Any]]]] = None, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, force: bool = False, retries: int = 10, min_seconds: int = 1, check_existing: bool = True, blocking: bool = True, workers: Optional[int] = None, callback: Optional[Callable[[Tuple[bool, str]], Any]] = None, error_callback: Optional[Callable[[Exception], Any]] = None, chunksize: Optional[int] = -1, sync_chunks: bool = False, deactivate_plugin_venv: bool = True, debug: bool = False, **kw: Any) ‑> SuccessTuple

Fetch new data from the source and update the pipe's table with new data.

Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.

Parameters

df : Union[None, pd.DataFrame, Dict[str, List[Any]]], default None
An optional DataFrame to sync into the pipe. Defaults to None.
begin : Optional[datetime.datetime], default None
Optionally specify the earliest datetime to search for data. Defaults to None.
end : Optional[datetime.datetime], default None
Optionally specify the latest datetime to search for data. Defaults to None.
force : bool, default False
If True, keep trying to sync until retries attempts have been made. Defaults to False.
retries : int, default 10
If force, how many attempts to try syncing before declaring failure. Defaults to 10.
min_seconds : Union[int, float], default 1
If force, how many seconds to sleep between retries. Defaults to 1.
check_existing : bool, default True
If True, pull and diff with existing data from the pipe. Defaults to True.
blocking : bool, default True
If True, wait for the sync to finish and return its result; otherwise sync asynchronously (oxymoron?) and return success. Defaults to True. Only intended for specific scenarios.
workers : Optional[int], default None
Not used directly within sync(); instead it is passed on to instance connectors' sync_pipe() methods (e.g. meerschaum.connectors.plugin.PluginConnector). Defaults to None.
callback : Optional[Callable[[Tuple[bool, str]], Any]], default None
Callback function which expects a SuccessTuple as input. Only applies when blocking=False.
error_callback : Optional[Callable[[Exception], Any]], default None
Callback function which expects an Exception as input. Only applies when blocking=False.
chunksize : int, default -1
Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction. Defaults to -1.
sync_chunks : bool, default False
If possible, sync chunks while fetching them into memory. Defaults to False.
deactivate_plugin_venv : bool, default True
If True, deactivate a plugin's virtual environment after syncing. Defaults to True.
debug : bool, default False
Verbosity toggle. Defaults to False.

Returns

A SuccessTuple of success (bool) and message (str).
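
A hedged sketch of the retry and filtering flags described above (the values are arbitrary):

>>> success, msg = pipe.sync(
...     force=True,
...     retries=3,
...     min_seconds=5,
...     check_existing=False,
...     debug=True,
... )
>>> print(success, msg)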

Expand source code
def sync(
        self,
        df: Optional[Union[pandas.DataFrame, Dict[str, List[Any]]]] = None,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        force: bool = False,
        retries: int = 10,
        min_seconds: int = 1,
        check_existing: bool = True,
        blocking: bool = True,
        workers: Optional[int] = None,
        callback: Optional[Callable[[Tuple[bool, str]], Any]] = None,
        error_callback: Optional[Callable[[Exception], Any]] = None,
        chunksize: Optional[int] = -1,
        sync_chunks: bool = False,
        deactivate_plugin_venv: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Fetch new data from the source and update the pipe's table with new data.
    
    Get new remote data via fetch, get existing data in the same time period,
    and merge the two, only keeping the unseen data.

    Parameters
    ----------
    df: Union[None, pd.DataFrame, Dict[str, List[Any]]], default None
        An optional DataFrame to sync into the pipe. Defaults to `None`.

    begin: Optional[datetime.datetime], default None
        Optionally specify the earliest datetime to search for data.
        Defaults to `None`.

    end: Optional[datetime.datetime], default None
        Optionally specify the latest datetime to search for data.
        Defaults to `None`.

    force: bool, default False
        If `True`, keep trying to sync until `retries` attempts have been made.
        Defaults to `False`.

    retries: int, default 10
        If `force`, how many attempts to try syncing before declaring failure.
        Defaults to `10`.

    min_seconds: Union[int, float], default 1
        If `force`, how many seconds to sleep between retries. Defaults to `1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe.
        Defaults to `True`.

    blocking: bool, default True
        If `True`, wait for the sync to finish and return its result; otherwise
        sync asynchronously (oxymoron?) and return success. Defaults to `True`.
        Only intended for specific scenarios.

    workers: Optional[int], default None
        Not used directly within `Pipe.sync()`; instead it is passed on to
        instance connectors' `sync_pipe()` methods
        (e.g. `meerschaum.connectors.plugin.PluginConnector`).
        Defaults to `None`.

    callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
        Callback function which expects a SuccessTuple as input.
        Only applies when `blocking=False`.

    error_callback: Optional[Callable[[Exception], Any]], default None
        Callback function which expects an Exception as input.
        Only applies when `blocking=False`.

    chunksize: int, default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.
        Defaults to `-1`.

    sync_chunks: bool, default False
        If possible, sync chunks while fetching them into memory.
        Defaults to `False`.

    deactivate_plugin_venv: bool, default True
        If `True`, deactivate a plugin's virtual environment after syncing.
        Defaults to `True`.

    debug: bool, default False
        Verbosity toggle. Defaults to False.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).

    """
    from meerschaum.utils.debug import dprint, _checkpoint
    from meerschaum.utils.warnings import warn, error
    import datetime
    import time
    if (callback is not None or error_callback is not None) and blocking:
        warn("Callback functions are only executed when blocking = False. Ignoring...")

    _checkpoint(_total=2, **kw)

    if (
          not self.connector_keys.startswith('plugin:')
          and not self.get_columns('datetime', error=False)
    ):
        return False, f"Cannot sync pipe '{self}' without a datetime column."

    ### NOTE: Setting begin to the sync time for Simple Sync.
    ### TODO: Add flag for specifying syncing method.
    begin = _determine_begin(self, begin, debug=debug)
    kw.update({
        'begin': begin, 'end': end, 'force': force, 'retries': retries, 'min_seconds': min_seconds,
        'check_existing': check_existing, 'blocking': blocking, 'workers': workers,
        'callback': callback, 'error_callback': error_callback, 'sync_chunks': sync_chunks,
        'chunksize': chunksize,
    })


    def _sync(
        p: 'meerschaum.Pipe',
        df: Optional['pandas.DataFrame'] = None
    ) -> SuccessTuple:
        ### Ensure that Pipe is registered.
        if p.get_id(debug=debug) is None:
            ### NOTE: This may trigger an interactive session for plugins!
            register_tuple = p.register(debug=debug)
            if not register_tuple[0]:
                return register_tuple

        ### If connector is a plugin with a `sync()` method, return that instead.
        ### If the plugin does not have a `sync()` method but does have a `fetch()` method,
        ### use that instead.
        ### NOTE: The DataFrame must be None for the plugin sync method to apply.
        ### If a DataFrame is provided, continue as expected.
        if df is None:
            try:
                if p.connector.type == 'plugin' and p.connector.sync is not None:
                    from meerschaum.utils.packages import activate_venv, deactivate_venv
                    activate_venv(p.connector.label, debug=debug)
                    return_tuple = p.connector.sync(p, debug=debug, **kw)
                    if deactivate_plugin_venv:
                        deactivate_venv(p.connector.label, debug=debug)
                    if not isinstance(return_tuple, tuple):
                        return_tuple = (
                            False,
                            f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}"
                        )
                    return return_tuple

            except Exception as e:
                msg = f"Failed to sync pipe '{p}' with exception: '" + str(e) + "'"
                if debug:
                    error(msg, silent=False)
                return False, msg

        ### default: fetch new data via the connector.
        ### If new data is provided, skip fetching.
        if df is None:
            if p.connector is None:
                return False, f"Cannot fetch data for pipe '{p}' without a connector."
            df = p.fetch(debug=debug, **kw)
            if df is None:
                return False, f"Unable to fetch data for pipe '{p}'."
            if df is True:
                return True, f"Pipe '{p}' was synced in parallel."

        ### CHECKPOINT: Retrieved the DataFrame.
        _checkpoint(**kw)
        if debug:
            dprint(
                "DataFrame to sync:\n"
                + (str(df)[:255] + '...' if len(str(df)) >= 256 else str(df)),
                **kw
            )

        ### if force, continue to sync until success
        return_tuple = False, f"Did not sync pipe '{p}'."
        run = True
        _retries = 1
        while run:
            return_tuple = p.instance_connector.sync_pipe(
                pipe = p,
                df = df,
                debug = debug,
                **kw
            )
            _retries += 1
            run = (not return_tuple[0]) and force and _retries <= retries
            if run and debug:
                dprint(f"Syncing failed for pipe '{p}'. Attempt ( {_retries} / {retries} )", **kw)
                dprint(f"Sleeping for {min_seconds} seconds...", **kw)
                time.sleep(min_seconds)
            if _retries > retries:
                warn(
                    f"Unable to sync pipe '{p}' within {retries} attempt" +
                        ("s" if retries != 1 else "") + "!"
                )

        ### CHECKPOINT: Finished syncing. Handle caching.
        _checkpoint(**kw)
        if self.cache_pipe is not None:
            if debug:
                dprint(f"Caching retrieved dataframe.", **kw)
            _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync local cache for pipe '{self}'.")

        return return_tuple

    if blocking:
        return _sync(self, df = df)

    ### TODO implement concurrent syncing (split DataFrame? mimic the functionality of modin?)
    from meerschaum.utils.threading import Thread
    def default_callback(result_tuple : SuccessTuple):
        dprint(f"Asynchronous result from Pipe '{self}': {result_tuple}", **kw)
    def default_error_callback(x : Exception):
        dprint(f"Error received for Pipe '{self}': {x}", **kw)
    if callback is None and debug:
        callback = default_callback
    if error_callback is None and debug:
        error_callback = default_error_callback
    try:
        thread = Thread(
            target = _sync,
            args = (self,),
            kwargs = {'df' : df},
            daemon = False,
            callback = callback,
            error_callback = error_callback
        )
        thread.start()
    except Exception as e:
        return False, str(e)
    return True, f"Spawned asynchronous sync for pipe '{self}'."