meerschaum


What is Meerschaum?

Meerschaum is a tool for quickly synchronizing time-series data streams called pipes. With Meerschaum, you can have a data visualization stack running in minutes.

Why Meerschaum?

If you've worked with time-series data, you know the headaches that come with ETL. Data engineering often gets in analysts' way, and when work needs to get done, every minute spent on pipelining is time taken away from real analysis.

Rather than copy/pasting your ETL scripts, simply build pipes with Meerschaum! Meerschaum gives you the tools to design your data streams how you like, and don't worry: you can always incorporate Meerschaum into your existing systems!

Features

  • 📊 Built for Data Scientists and Analysts
    • Integrate with Pandas, Grafana, and other popular data analysis tools.
    • Persist your dataframes and always get the latest data.
  • ⚡️ Production-Ready, Batteries Included
  • 🔌 Easily Expandable
  • ✨ Tailored for Your Experience
    • Rich CLI makes managing your data streams surprisingly enjoyable!
    • Web dashboard for those who prefer a more graphical experience.
    • Manage your database connections with Meerschaum connectors.
    • Utility commands with sensible syntax let you control many pipes with grace.
  • 💼 Portable from the Start
    • The environment variables $MRSM_ROOT_DIR, $MRSM_PLUGINS_DIR, and $MRSM_VENVS_DIR let you emulate multiple installations and group together your instances.
    • No dependencies required; anything needed will be installed into virtual environments.
    • Specify required packages for your plugins, and users will get those packages in a virtual environment.
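The isolation these environment variables provide can be sketched in plain Python. The paths below are hypothetical; Meerschaum itself reads $MRSM_ROOT_DIR at startup, so each process can point at its own root directory:

```python
import os

# Hypothetical root directories for two isolated "installations".
environments = {
    "dev": "/tmp/mrsm-dev",
    "prod": "/tmp/mrsm-prod",
}

def env_for(name: str) -> dict:
    """Return a copy of the current environment with MRSM_ROOT_DIR set."""
    env = dict(os.environ)
    env["MRSM_ROOT_DIR"] = environments[name]
    return env

# Each environment gets its own root, so config and data stay separate.
assert env_for("dev")["MRSM_ROOT_DIR"] != env_for("prod")["MRSM_ROOT_DIR"]
```

Launching `mrsm` with either environment would keep its configuration, plugins, and virtual environments fully separate.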

Installation

For a more thorough setup guide, visit the Getting Started page at meerschaum.io.

TL;DR

pip install -U --user meerschaum
mrsm stack up -d db grafana
mrsm bootstrap pipes

Usage Documentation

Please visit meerschaum.io for setup, usage, and troubleshooting information. You can find technical documentation at docs.meerschaum.io, and here is a complete list of the Meerschaum actions.

>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe("plugin:noaa", "weather")
>>> cols_to_select = ['timestamp', 'station', 'temperature (degC)']
>>> df = pipe.get_data(cols_to_select, begin='2023-11-15', end='2023-11-20')
>>> df
              timestamp station  temperature (degC)
0   2023-11-15 00:52:00    KATL                16.1
1   2023-11-15 00:52:00    KCLT                11.7
2   2023-11-15 00:53:00    KGMU                15.0
3   2023-11-15 00:54:00    KCEU                13.9
4   2023-11-15 01:52:00    KATL                15.6
..                  ...     ...                 ...
535 2023-11-19 22:54:00    KCEU                15.6
536 2023-11-19 23:52:00    KATL                16.7
537 2023-11-19 23:52:00    KCLT                13.9
538 2023-11-19 23:53:00    KGMU                15.6
539 2023-11-19 23:54:00    KCEU                15.0

[540 rows x 3 columns]
>>>

Plugins

Check out the Awesome Meerschaum list for community plugins, as well as the public plugins repository.

For details on installing, using, and writing plugins, check out the plugins documentation at meerschaum.io.

Example Plugin

# ~/.config/meerschaum/plugins/example.py
__version__ = '0.0.1'
required = []

def register(pipe, **kw):
    return {
        'columns': {
            'datetime': 'dt',
            'id': 'id',
            'value': 'val',
        }
    }

def fetch(pipe, **kw):
    import random
    from datetime import datetime
    docs = [
        {
            'dt': datetime.now(),
            'id': i,
            'val': random.randint(0, 200),
        }
        for i in range(random.randint(0, 100))
    ]
    return docs
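To see how a plugin's `register` and `fetch` functions fit together, here is a plain-Python simulation of the contract (hypothetical; the real sync engine lives inside Meerschaum, not in the plugin file): `register` supplies the pipe's parameters on first sync, and `fetch` returns new documents to be synced.

```python
import random
from datetime import datetime, timezone

# Stand-ins for the plugin's register() and fetch() hooks.
def register(pipe, **kw):
    return {'columns': {'datetime': 'dt', 'id': 'id', 'value': 'val'}}

def fetch(pipe, **kw):
    return [
        {'dt': datetime.now(timezone.utc), 'id': i, 'val': random.randint(0, 200)}
        for i in range(10)
    ]

def simulate_sync():
    """Mimic a first sync: register the pipe's columns, then fetch new docs."""
    parameters = register(None)
    docs = fetch(None)
    dt_col = parameters['columns']['datetime']
    # Every fetched document must carry the registered datetime column.
    assert all(dt_col in doc for doc in docs)
    return parameters, docs

parameters, docs = simulate_sync()
```

In a real plugin, Meerschaum handles the persistence and deduplication; the plugin only describes its columns and produces rows.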

Support Meerschaum's Development

For consulting services and to support Meerschaum's development, please consider sponsoring me on GitHub Sponsors.

Additionally, you can always buy me a coffee ☕!

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Copyright 2023 Bennett Meares

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import atexit
from meerschaum.utils.typing import SuccessTuple
from meerschaum.core.Pipe import Pipe
from meerschaum.plugins import Plugin
from meerschaum.utils.venv import Venv
from meerschaum.connectors import get_connector
from meerschaum.utils import get_pipes
from meerschaum.utils.formatting import pprint
from meerschaum._internal.docs import index as __doc__
from meerschaum.config import __version__, get_config
from meerschaum.utils.packages import attempt_import
from meerschaum.__main__ import _close_pools

atexit.register(_close_pools)

__pdoc__ = {'gui': False, 'api': False, 'core': False, '_internal': False}
__all__ = (
    "get_pipes",
    "get_connector",
    "get_config",
    "Pipe",
    "Plugin",
    "Venv",
    "pprint",
    "attempt_import",
    "actions",
    "config",
    "connectors",
    "plugins",
    "utils",
)
def get_pipes(
        connector_keys: Union[str, List[str], None] = None,
        metric_keys: Union[str, List[str], None] = None,
        location_keys: Union[str, List[str], None] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        mrsm_instance: Union[str, InstanceConnector, None] = None,
        instance: Union[str, InstanceConnector, None] = None,
        as_list: bool = False,
        method: str = 'registered',
        wait: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Union[PipesDict, List[mrsm.Pipe]]:
    """
    Return a dictionary or list of `meerschaum.Pipe` objects.

    Parameters
    ----------
    connector_keys: Union[str, List[str], None], default None
        String or list of connector keys.
        If omitted or `'*'`, fetch all possible keys.
        If a string begins with `'_'`, select keys that do NOT match the string.

    metric_keys: Union[str, List[str], None], default None
        String or list of metric keys. See `connector_keys` for formatting.

    location_keys: Union[str, List[str], None], default None
        String or list of location keys. See `connector_keys` for formatting.

    tags: Optional[List[str]], default None
        If provided, only include pipes with these tags.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        Params are parsed into a SQL WHERE clause.
        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`.

    mrsm_instance: Union[str, InstanceConnector, None], default None
        Connector keys for the Meerschaum instance of the pipes.
        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
        `meerschaum.connectors.api.APIConnector.APIConnector`.

    as_list: bool, default False
        If `True`, return pipes in a list instead of a hierarchical dictionary.
        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
        `True`  : `[Pipe]`

    method: str, default 'registered'
        Available options: `['registered', 'explicit', 'all']`
        If `'registered'` (default), create pipes based on registered keys in the connector's pipes table
        (API or SQL connector, depending on `mrsm_instance`).
        If `'explicit'`, create pipes from the provided connector_keys, metric_keys, and location_keys
        instead of consulting the pipes table. Useful for creating non-existent pipes.
        If `'all'`, create pipes from predefined metrics and locations. Requires `connector_keys`.
        **NOTE:** Method `'all'` is not implemented!

    wait: bool, default False
        Wait for a connection before getting pipes. Should only be `True` for cases where the
        database might not be running (like the API).

    **kw: Any
        Keyword arguments to pass to the `meerschaum.Pipe` constructor.

    Returns
    -------
    A dictionary of dictionaries and `meerschaum.Pipe` objects
    in the connector, metric, location hierarchy.
    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.

    Examples
    --------
    ```
    >>> ### Manual definition:
    >>> pipes = {
    ...     <connector_keys>: {
    ...         <metric_key>: {
    ...             <location_key>: Pipe(
    ...                 <connector_keys>,
    ...                 <metric_key>,
    ...                 <location_key>,
    ...             ),
    ...         },
    ...     },
    ... },
    >>> ### Accessing a single pipe:
    >>> pipes['sql:main']['weather'][None]
    >>> ### Return a list instead:
    >>> get_pipes(as_list=True)
    [sql_main_weather]
    >>>
    ```
    """

    from meerschaum.config import get_config
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import filter_keywords

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    if params is None:
        params = {}
    if tags is None:
        tags = []

    if isinstance(connector_keys, str):
        connector_keys = [connector_keys]
    if isinstance(metric_keys, str):
        metric_keys = [metric_keys]
    if isinstance(location_keys, str):
        location_keys = [location_keys]

    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
    ### If `wait`, wait until a connection is made.
    if mrsm_instance is None:
        mrsm_instance = instance
    if mrsm_instance is None:
        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
    if isinstance(mrsm_instance, str):
        from meerschaum.connectors.parse import parse_instance_keys
        connector = parse_instance_keys(keys=mrsm_instance, wait=wait, debug=debug)
    else: ### NOTE: mrsm_instance MUST be a SQL or API connector for this to work.
        from meerschaum.connectors import instance_types
        valid_connector = False
        if hasattr(mrsm_instance, 'type'):
            if mrsm_instance.type in instance_types:
                valid_connector = True
        if not valid_connector:
            error(f"Invalid instance connector: {mrsm_instance}")
        connector = mrsm_instance
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Using instance connector: {connector}")
    if not connector:
        error(f"Could not create connector from keys: '{mrsm_instance}'")

    ### Get a list of tuples for the keys needed to build pipes.
    result = fetch_pipes_keys(
        method,
        connector,
        connector_keys = connector_keys,
        metric_keys = metric_keys,
        location_keys = location_keys,
        tags = tags,
        params = params,
        debug = debug
    )
    if result is None:
        error("Unable to build pipes!")

    ### Populate the `pipes` dictionary with Pipes based on the keys
    ### obtained from the chosen `method`.
    from meerschaum import Pipe
    pipes = {}
    for ck, mk, lk in result:
        if ck not in pipes:
            pipes[ck] = {}

        if mk not in pipes[ck]:
            pipes[ck][mk] = {}

        pipes[ck][mk][lk] = Pipe(
            ck, mk, lk,
            mrsm_instance = connector,
            debug = debug,
            **filter_keywords(Pipe, **kw)
        )

    if not as_list:
        return pipes
    from meerschaum.utils.misc import flatten_pipes_dict
    return flatten_pipes_dict(pipes)
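The hierarchy this function returns is straightforward to reproduce: given (connector, metric, location) key tuples, pipes are nested under each level, and `as_list=True` simply flattens the leaves. A standalone sketch of that structure, with plain strings in place of `Pipe` objects:

```python
def build_pipes_dict(key_tuples):
    """Nest values under connector -> metric -> location keys."""
    pipes = {}
    for ck, mk, lk in key_tuples:
        pipes.setdefault(ck, {}).setdefault(mk, {})[lk] = f"{ck}_{mk}_{lk}"
    return pipes

def flatten_pipes_dict(pipes):
    """Collapse the three-level hierarchy into a flat list of leaves."""
    return [
        pipe
        for metrics in pipes.values()
        for locations in metrics.values()
        for pipe in locations.values()
    ]

keys = [('sql:main', 'weather', None), ('sql:main', 'power', 'office')]
pipes = build_pipes_dict(keys)
assert pipes['sql:main']['weather'][None] == 'sql:main_weather_None'
assert len(flatten_pipes_dict(pipes)) == 2
```

The real function builds `Pipe` objects at the leaves instead of strings, but the nesting and flattening logic is the same shape.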

def get_connector(
        type: str = None,
        label: str = None,
        refresh: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Connector:
    """
    Return existing connector or create new connection and store for reuse.

    You can create new connectors if enough parameters are provided for the given type and flavor.

    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct new object. Defaults to `False`.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`).

    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```
    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)
    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _loaded_plugin_connectors = True
    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"

        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        with _locks['types']:
            types.update({
                'api'   : APIConnector,
                'sql'   : SQLConnector,
                'plugin': PluginConnector,
            })

    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    import inspect
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector " +
                            f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n" +
                        f"  - Keyword value: '{value}'\n" +
                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else: ### connector doesn't yet exist
            refresh = True

    ### only create an object if refresh is True
    ### (can be manually specified, otherwise determined above)
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack = False,
                )
                conn = None
            except Exception as e:
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack = False,
                )
                conn = None
        if conn is None:
            return None

    return connectors[type][label]
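The "did you mean" suggestion above boils down to quoting each candidate key and joining them with commas and a final "or". The same logic as a standalone helper (`build_suggestion` is a hypothetical name for illustration):

```python
def build_suggestion(possibilities):
    """Join candidate keys into a ' Did you mean ...?' message (empty if none)."""
    if not possibilities:
        return ""
    quoted = [f"'{poss}'" for poss in possibilities]
    if len(quoted) == 1:
        return f" Did you mean {quoted[0]}?"
    # Comma-separate all but the last candidate, then append "or <last>".
    return " Did you mean " + ", ".join(quoted[:-1]) + f" or {quoted[-1]}?"

assert build_suggestion(['sql:main']) == " Did you mean 'sql:main'?"
assert build_suggestion(['sql:main', 'api:main']) == " Did you mean 'sql:main' or 'api:main'?"
```

This produces the same strings as the loop in `get_connector`, just without the trailing-comma bookkeeping.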

def get_config(
        *keys: str,
        patch: bool = True,
        substitute: bool = True,
        sync_files: bool = True,
        write_missing: bool = True,
        as_tuple: bool = False,
        warn: bool = True,
        debug: bool = False
    ) -> Any:
    """
    Return the Meerschaum configuration dictionary.
    If positional arguments are provided, index by the keys.
    Raises a warning if invalid keys are provided.

    Parameters
    ----------
    keys: str
        List of strings to index.

    patch: bool, default True
        If `True`, patch missing default keys into the config directory.

    sync_files: bool, default True
        If `True`, sync files if needed.

    write_missing: bool, default True
        If `True`, write default values when the main config files are missing.

    substitute: bool, default True
        If `True`, substitute 'MRSM{}' values.

    as_tuple: bool, default False
        If `True`, return a tuple of type (success, value).

    Returns
    -------
    The value in the configuration directory, indexed by the provided keys.

    Examples
    --------
    >>> get_config('meerschaum', 'instance')
    'sql:main'
    >>> get_config('does', 'not', 'exist')
    UserWarning: Invalid keys in config: ('does', 'not', 'exist')
    """
    import json

    symlinks_key = STATIC_CONFIG['config']['symlinks_key']
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Indexing keys: {keys}", color=False)

    if len(keys) == 0:
        _rc = _config(substitute=substitute, sync_files=sync_files, write_missing=write_missing)
        if as_tuple:
            return True, _rc
        return _rc

    ### Weird threading issues, only import if substitute is True.
    if substitute:
        from meerschaum.config._read_config import search_and_substitute_config
    ### Invalidate the cache if it was read before with substitute=False
    ### but there still exist substitutions.
    if (
        config is not None and substitute and keys[0] != symlinks_key
        and 'MRSM{' in json.dumps(config.get(keys[0]))
    ):
        try:
            _subbed = search_and_substitute_config({keys[0]: config[keys[0]]})
        except Exception as e:
            import traceback
            traceback.print_exc()
        config[keys[0]] = _subbed[keys[0]]
        if symlinks_key in _subbed:
            if symlinks_key not in config:
                config[symlinks_key] = {}
            if keys[0] not in config[symlinks_key]:
                config[symlinks_key][keys[0]] = {}
            config[symlinks_key][keys[0]] = apply_patch_to_config(
                _subbed,
                config[symlinks_key][keys[0]]
            )

    from meerschaum.config._sync import sync_files as _sync_files
    if config is None:
        _config(*keys, sync_files=sync_files)

    invalid_keys = False
    if keys[0] not in config and keys[0] != symlinks_key:
        single_key_config = read_config(
            keys=[keys[0]], substitute=substitute, write_missing=write_missing
        )
        if keys[0] not in single_key_config:
            invalid_keys = True
        else:
            config[keys[0]] = single_key_config.get(keys[0], None)
            if symlinks_key in single_key_config and keys[0] in single_key_config[symlinks_key]:
                if symlinks_key not in config:
                    config[symlinks_key] = {}
                config[symlinks_key][keys[0]] = single_key_config[symlinks_key][keys[0]]

            if sync_files:
                _sync_files(keys=[keys[0]])

    c = config
    if len(keys) > 0:
        for k in keys:
            try:
                c = c[k]
            except Exception as e:
                invalid_keys = True
                break
        if invalid_keys:
            ### Check if the keys are in the default configuration.
            from meerschaum.config._default import default_config
            in_default = True
            patched_default_config = (
                search_and_substitute_config(default_config)
                if substitute else copy.deepcopy(default_config)
            )
            _c = patched_default_config
            for k in keys:
                try:
                    _c = _c[k]
                except Exception as e:
                    in_default = False
            if in_default:
                c = _c
                invalid_keys = False
            warning_msg = f"Invalid keys in config: {keys}"
            if not in_default:
                try:
                    if warn:
                        from meerschaum.utils.warnings import warn as _warn
                        _warn(warning_msg, stacklevel=3, color=False)
                except Exception as e:
                    if warn:
                        print(warning_msg)
                if as_tuple:
                    return False, None
                return None

            ### Don't write keys that we haven't yet loaded into memory.
            not_loaded_keys = [k for k in patched_default_config if k not in config]
            for k in not_loaded_keys:
                patched_default_config.pop(k, None)

            set_config(
                apply_patch_to_config(
                    patched_default_config,
                    config,
                )
            )
            if patch and keys[0] != symlinks_key:
                if write_missing:
                    write_config(config, debug=debug)

    if as_tuple:
        return (not invalid_keys), c
    return c
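At its core, this function walks the positional keys into a nested dictionary and falls back to the default configuration on a miss. A minimal sketch of that lookup-with-fallback logic (simplified: the real function also syncs files, substitutes `MRSM{}` values, and writes patched defaults back to disk):

```python
def index_config(config, keys, default=None):
    """Walk `keys` into a nested dict, falling back to `default` on a miss."""
    def walk(tree, keys):
        for k in keys:
            try:
                tree = tree[k]
            except (KeyError, TypeError):
                return None, False
        return tree, True

    value, found = walk(config, keys)
    if not found and default is not None:
        value, found = walk(default, keys)
    return value, found

# Hypothetical config fragments for illustration.
config = {'meerschaum': {'instance': 'sql:main'}}
default = {'meerschaum': {'connectors': {'sql': {'main': {}}}}}
assert index_config(config, ('meerschaum', 'instance'))[0] == 'sql:main'
assert index_config(config, ('meerschaum', 'connectors', 'sql'), default)[1] is True
assert index_config(config, ('does', 'not', 'exist'), default) == (None, False)
```

The `(found, value)` pair mirrors the `as_tuple=True` return shape of `get_config`.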

class Pipe:
    """
    Access Meerschaum pipes via Pipe objects.

    Pipes are identified by the following:

    1. Connector keys (e.g. `'sql:main'`)
    2. Metric key (e.g. `'weather'`)
    3. Location (optional; e.g. `None`)

    A pipe's connector keys correspond to a data source, and when the pipe is synced,
    its `fetch` definition is evaluated and executed to produce new data.

    Alternatively, new data may be directly synced via `pipe.sync()`:

    ```
    >>> from meerschaum import Pipe
    >>> pipe = Pipe('csv', 'weather')
    >>>
    >>> import pandas as pd
    >>> df = pd.read_csv('weather.csv')
    >>> pipe.sync(df)
    ```
    """

    from ._fetch import (
        fetch,
        get_backtrack_interval,
    )
    from ._data import (
        get_data,
        get_backtrack_data,
        get_rowcount,
        _get_data_as_iterator,
        get_chunk_interval,
        get_chunk_bounds,
    )
    from ._register import register
    from ._attributes import (
        attributes,
        parameters,
        columns,
        dtypes,
        get_columns,
        get_columns_types,
        get_indices,
        tags,
        get_id,
        id,
        get_val_column,
        parents,
        children,
        target,
        _target_legacy,
        guess_datetime,
    )
    from ._show import show
    from ._edit import edit, edit_definition, update
    from ._sync import (
        sync,
        get_sync_time,
        exists,
        filter_existing,
        _get_chunk_label,
        get_num_workers,
    )
    from ._verify import (
        verify,
        get_bound_interval,
        get_bound_time,
    )
    from ._delete import delete
    from ._drop import drop
    from ._clear import clear
    from ._deduplicate import deduplicate
    from ._bootstrap import bootstrap
    from ._dtypes import enforce_dtypes, infer_dtypes

    def __init__(
        self,
        connector: str = '',
        metric: str = '',
        location: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        columns: Union[Dict[str, str], List[str], None] = None,
144        tags: Optional[List[str]] = None,
145        target: Optional[str] = None,
146        dtypes: Optional[Dict[str, str]] = None,
147        instance: Optional[Union[str, InstanceConnector]] = None,
148        temporary: bool = False,
149        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
150        cache: bool = False,
151        debug: bool = False,
152        connector_keys: Optional[str] = None,
153        metric_key: Optional[str] = None,
154        location_key: Optional[str] = None,
155    ):
156        """
157        Parameters
158        ----------
159        connector: str
160            Keys for the pipe's source connector, e.g. `'sql:main'`.
161
162        metric: str
163            Label for the pipe's contents, e.g. `'weather'`.
164
165        location: str, default None
166            Label for the pipe's location. Defaults to `None`.
167
168        parameters: Optional[Dict[str, Any]], default None
169            Optionally set a pipe's parameters from the constructor,
170            e.g. columns and other attributes.
171            You can edit these parameters with `edit pipes`.
172
173        columns: Optional[Dict[str, str]], default None
174            Set the `columns` dictionary of `parameters`.
175            If `parameters` is also provided, this dictionary is added under the `'columns'` key.
176
177        tags: Optional[List[str]], default None
178            A list of strings to be added under the `'tags'` key of `parameters`.
179            You can select pipes with certain tags using `--tags`.
180
181        dtypes: Optional[Dict[str, str]], default None
182            Set the `dtypes` dictionary of `parameters`.
183            If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.
184
185        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
186            Connector for the Meerschaum instance where the pipe resides.
187            Defaults to the preconfigured default instance (`'sql:main'`).
188
189        instance: Optional[Union[str, InstanceConnector]], default None
190            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.
191
192        temporary: bool, default False
193            If `True`, prevent instance tables (pipes, users, plugins) from being created.
194
195        cache: bool, default False
196            If `True`, cache fetched data into a local database file.
197            Defaults to `False`.
198        """
199        from meerschaum.utils.warnings import error, warn
200        if (not connector and not connector_keys) or (not metric and not metric_key):
201            error(
202                "Please provide strings for the connector and metric\n    "
203                + "(first two positional arguments)."
204            )
205
206        ### Fall back to legacy `location_key` just in case.
207        if not location:
208            location = location_key
209
210        if not connector:
211            connector = connector_keys
212
213        if not metric:
214            metric = metric_key
215
216        if location in ('[None]', 'None'):
217            location = None
218
219        from meerschaum.config.static import STATIC_CONFIG
220        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
221        for k in (connector, metric, location, *(tags or [])):
222            if str(k).startswith(negation_prefix):
223                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")
224
225        self.connector_keys = str(connector)
226        self.connector_key = self.connector_keys ### Alias
227        self.metric_key = metric
228        self.location_key = location
229        self.temporary = temporary
230
231        self._attributes = {
232            'connector_keys': self.connector_keys,
233            'metric_key': self.metric_key,
234            'location_key': self.location_key,
235            'parameters': {},
236        }
237
238        ### only set parameters if values are provided
239        if isinstance(parameters, dict):
240            self._attributes['parameters'] = parameters
241        else:
242            if parameters is not None:
243                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
244            self._attributes['parameters'] = {}
245
246        columns = columns or self._attributes.get('parameters', {}).get('columns', {})
247        if isinstance(columns, list):
248            columns = {str(col): str(col) for col in columns}
249        if isinstance(columns, dict):
250            self._attributes['parameters']['columns'] = columns
251        elif columns is not None:
252            warn(f"The provided columns are of invalid type '{type(columns)}'.")
253
254        if isinstance(tags, (list, tuple)):
255            self._attributes['parameters']['tags'] = tags
256        elif tags is not None:
257            warn(f"The provided tags are of invalid type '{type(tags)}'.")
258
259        if isinstance(target, str):
260            self._attributes['parameters']['target'] = target
261        elif target is not None:
262            warn(f"The provided target is of invalid type '{type(target)}'.")
263
264        if isinstance(dtypes, dict):
265            self._attributes['parameters']['dtypes'] = dtypes
266        elif dtypes is not None:
267            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")
268
269        ### NOTE: The parameters dictionary is {} by default.
270        ###       A Pipe may be registered without parameters, then edited,
271        ###       or a Pipe may be registered with parameters set in-memory first.
272        #  from meerschaum.config import get_config
273        _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
274        if _mrsm_instance is None:
275            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)
276
277        if not isinstance(_mrsm_instance, str):
278            self._instance_connector = _mrsm_instance
279            self.instance_keys = str(_mrsm_instance)
 280        else: ### NOTE: must be SQL or API Connector for this to work
281            self.instance_keys = _mrsm_instance
282
283        self._cache = cache and get_config('system', 'experimental', 'cache')
284
285
286    @property
287    def meta(self):
288        """
289        Return the four keys needed to reconstruct this pipe.
290        """
291        return {
292            'connector': self.connector_keys,
293            'metric': self.metric_key,
294            'location': self.location_key,
295            'instance': self.instance_keys,
296        }
297
298
 299    def keys(self) -> Dict[str, str]:
300        """
301        Return the ordered keys for this pipe.
302        """
303        return {
304            key: val
305            for key, val in self.meta.items()
306            if key != 'instance'
307        }
308
309
310    @property
311    def instance_connector(self) -> Union[InstanceConnector, None]:
312        """
313        The connector to where this pipe resides.
314        May either be of type `meerschaum.connectors.sql.SQLConnector` or
315        `meerschaum.connectors.api.APIConnector`.
316        """
317        if '_instance_connector' not in self.__dict__:
318            from meerschaum.connectors.parse import parse_instance_keys
319            conn = parse_instance_keys(self.instance_keys)
320            if conn:
321                self._instance_connector = conn
322            else:
323                return None
324        return self._instance_connector
325
326    @property
327    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
328        """
329        The connector to the data source.
330        """
331        if '_connector' not in self.__dict__:
332            from meerschaum.connectors.parse import parse_instance_keys
333            import warnings
334            with warnings.catch_warnings():
335                warnings.simplefilter('ignore')
336                try:
337                    conn = parse_instance_keys(self.connector_keys)
338                except Exception as e:
339                    conn = None
340            if conn:
341                self._connector = conn
342            else:
343                return None
344        return self._connector
345
346
347    @property
348    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
349        """
350        If the pipe was created with `cache=True`, return the connector to the pipe's
351        SQLite database for caching.
352        """
353        if not self._cache:
354            return None
355
356        if '_cache_connector' not in self.__dict__:
357            from meerschaum.connectors import get_connector
358            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
359            _resources_path = SQLITE_RESOURCES_PATH
360            self._cache_connector = get_connector(
361                'sql', '_cache_' + str(self),
362                flavor='sqlite',
363                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
364            )
365
366        return self._cache_connector
367
368
369    @property
370    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
371        """
372        If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
373        manage the local data.
374        """
375        if self.cache_connector is None:
376            return None
377        if '_cache_pipe' not in self.__dict__:
378            from meerschaum.config._patch import apply_patch_to_config
379            from meerschaum.utils.sql import sql_item_name
380            _parameters = copy.deepcopy(self.parameters)
381            _fetch_patch = {
382                'fetch': ({
383                    'definition': (
384                        f"SELECT * FROM "
385                        + sql_item_name(
386                            str(self.target),
387                            self.instance_connector.flavor,
388                            self.instance_connector.get_pipe_schema(self),
389                        )
390                    ),
391                }) if self.instance_connector.type == 'sql' else ({
392                    'connector_keys': self.connector_keys,
393                    'metric_key': self.metric_key,
394                    'location_key': self.location_key,
395                })
396            }
397            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
398            self._cache_pipe = Pipe(
399                self.instance_keys,
400                (self.connector_keys + '_' + self.metric_key + '_cache'),
401                self.location_key,
402                mrsm_instance = self.cache_connector,
403                parameters = _parameters,
404                cache = False,
405                temporary = True,
406            )
407
408        return self._cache_pipe
409
410
411    def __str__(self, ansi: bool=False):
412        return pipe_repr(self, ansi=ansi)
413
414
415    def __eq__(self, other):
416        try:
417            return (
418                isinstance(self, type(other))
419                and self.connector_keys == other.connector_keys
420                and self.metric_key == other.metric_key
421                and self.location_key == other.location_key
422                and self.instance_keys == other.instance_keys
423            )
424        except Exception as e:
425            return False
426
427    def __hash__(self):
428        ### Using an esoteric separator to avoid collisions.
429        sep = "[\"']"
430        return hash(
431            str(self.connector_keys) + sep
432            + str(self.metric_key) + sep
433            + str(self.location_key) + sep
434            + str(self.instance_keys) + sep
435        )
436
437    def __repr__(self, **kw) -> str:
438        return pipe_repr(self, **kw)
439
440    def __getstate__(self) -> Dict[str, Any]:
441        """
442        Define the state dictionary (pickling).
443        """
444        return {
445            'connector': self.connector_keys,
446            'metric': self.metric_key,
447            'location': self.location_key,
448            'parameters': self.parameters,
449            'instance': self.instance_keys,
450        }
451
452    def __setstate__(self, _state: Dict[str, Any]):
453        """
454        Read the state (unpickling).
455        """
456        self.__init__(**_state)
457
458
459    def __getitem__(self, key: str) -> Any:
460        """
461        Index the pipe's attributes.
 462        If the `key` cannot be found, return `None`.
463        """
464        if key in self.attributes:
465            return self.attributes.get(key, None)
466
467        aliases = {
468            'connector': 'connector_keys',
469            'connector_key': 'connector_keys',
470            'metric': 'metric_key',
471            'location': 'location_key',
472        }
473        aliased_key = aliases.get(key, None)
474        if aliased_key is not None:
475            return self.attributes.get(aliased_key, None)
476
477        property_aliases = {
478            'instance': 'instance_keys',
479            'instance_key': 'instance_keys',
480        }
481        aliased_key = property_aliases.get(key, None)
482        if aliased_key is not None:
483            key = aliased_key
484        return getattr(self, key, None)

Access Meerschaum pipes via Pipe objects.

Pipes are identified by the following:

  1. Connector keys (e.g. 'sql:main')
  2. Metric key (e.g. 'weather')
  3. Location (optional; e.g. None)

A pipe's connector keys correspond to a data source, and when the pipe is synced, its fetch definition is evaluated and executed to produce new data.

Alternatively, new data may be directly synced via pipe.sync():

>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)
Pipe( connector: str = '', metric: str = '', location: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, columns: Union[Dict[str, str], List[str], NoneType] = None, tags: Optional[List[str]] = None, target: Optional[str] = None, dtypes: Optional[Dict[str, str]] = None, instance: Union[str, meerschaum.connectors.sql.SQLConnector.SQLConnector, meerschaum.connectors.api.APIConnector.APIConnector, NoneType] = None, temporary: bool = False, mrsm_instance: Union[str, meerschaum.connectors.sql.SQLConnector.SQLConnector, meerschaum.connectors.api.APIConnector.APIConnector, NoneType] = None, cache: bool = False, debug: bool = False, connector_keys: Optional[str] = None, metric_key: Optional[str] = None, location_key: Optional[str] = None)
137    def __init__(
138        self,
139        connector: str = '',
140        metric: str = '',
141        location: Optional[str] = None,
142        parameters: Optional[Dict[str, Any]] = None,
143        columns: Union[Dict[str, str], List[str], None] = None,
144        tags: Optional[List[str]] = None,
145        target: Optional[str] = None,
146        dtypes: Optional[Dict[str, str]] = None,
147        instance: Optional[Union[str, InstanceConnector]] = None,
148        temporary: bool = False,
149        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
150        cache: bool = False,
151        debug: bool = False,
152        connector_keys: Optional[str] = None,
153        metric_key: Optional[str] = None,
154        location_key: Optional[str] = None,
155    ):
156        """
157        Parameters
158        ----------
159        connector: str
160            Keys for the pipe's source connector, e.g. `'sql:main'`.
161
162        metric: str
163            Label for the pipe's contents, e.g. `'weather'`.
164
165        location: str, default None
166            Label for the pipe's location. Defaults to `None`.
167
168        parameters: Optional[Dict[str, Any]], default None
169            Optionally set a pipe's parameters from the constructor,
170            e.g. columns and other attributes.
171            You can edit these parameters with `edit pipes`.
172
173        columns: Optional[Dict[str, str]], default None
174            Set the `columns` dictionary of `parameters`.
175            If `parameters` is also provided, this dictionary is added under the `'columns'` key.
176
177        tags: Optional[List[str]], default None
178            A list of strings to be added under the `'tags'` key of `parameters`.
179            You can select pipes with certain tags using `--tags`.
180
181        dtypes: Optional[Dict[str, str]], default None
182            Set the `dtypes` dictionary of `parameters`.
183            If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.
184
185        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
186            Connector for the Meerschaum instance where the pipe resides.
187            Defaults to the preconfigured default instance (`'sql:main'`).
188
189        instance: Optional[Union[str, InstanceConnector]], default None
190            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.
191
192        temporary: bool, default False
193            If `True`, prevent instance tables (pipes, users, plugins) from being created.
194
195        cache: bool, default False
196            If `True`, cache fetched data into a local database file.
197            Defaults to `False`.
198        """
199        from meerschaum.utils.warnings import error, warn
200        if (not connector and not connector_keys) or (not metric and not metric_key):
201            error(
202                "Please provide strings for the connector and metric\n    "
203                + "(first two positional arguments)."
204            )
205
206        ### Fall back to legacy `location_key` just in case.
207        if not location:
208            location = location_key
209
210        if not connector:
211            connector = connector_keys
212
213        if not metric:
214            metric = metric_key
215
216        if location in ('[None]', 'None'):
217            location = None
218
219        from meerschaum.config.static import STATIC_CONFIG
220        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
221        for k in (connector, metric, location, *(tags or [])):
222            if str(k).startswith(negation_prefix):
223                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")
224
225        self.connector_keys = str(connector)
226        self.connector_key = self.connector_keys ### Alias
227        self.metric_key = metric
228        self.location_key = location
229        self.temporary = temporary
230
231        self._attributes = {
232            'connector_keys': self.connector_keys,
233            'metric_key': self.metric_key,
234            'location_key': self.location_key,
235            'parameters': {},
236        }
237
238        ### only set parameters if values are provided
239        if isinstance(parameters, dict):
240            self._attributes['parameters'] = parameters
241        else:
242            if parameters is not None:
243                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
244            self._attributes['parameters'] = {}
245
246        columns = columns or self._attributes.get('parameters', {}).get('columns', {})
247        if isinstance(columns, list):
248            columns = {str(col): str(col) for col in columns}
249        if isinstance(columns, dict):
250            self._attributes['parameters']['columns'] = columns
251        elif columns is not None:
252            warn(f"The provided columns are of invalid type '{type(columns)}'.")
253
254        if isinstance(tags, (list, tuple)):
255            self._attributes['parameters']['tags'] = tags
256        elif tags is not None:
257            warn(f"The provided tags are of invalid type '{type(tags)}'.")
258
259        if isinstance(target, str):
260            self._attributes['parameters']['target'] = target
261        elif target is not None:
262            warn(f"The provided target is of invalid type '{type(target)}'.")
263
264        if isinstance(dtypes, dict):
265            self._attributes['parameters']['dtypes'] = dtypes
266        elif dtypes is not None:
267            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")
268
269        ### NOTE: The parameters dictionary is {} by default.
270        ###       A Pipe may be registered without parameters, then edited,
271        ###       or a Pipe may be registered with parameters set in-memory first.
272        #  from meerschaum.config import get_config
273        _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
274        if _mrsm_instance is None:
275            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)
276
277        if not isinstance(_mrsm_instance, str):
278            self._instance_connector = _mrsm_instance
279            self.instance_keys = str(_mrsm_instance)
 280        else: ### NOTE: must be SQL or API Connector for this to work
281            self.instance_keys = _mrsm_instance
282
283        self._cache = cache and get_config('system', 'experimental', 'cache')
Parameters
  • connector (str): Keys for the pipe's source connector, e.g. 'sql:main'.
  • metric (str): Label for the pipe's contents, e.g. 'weather'.
  • location (str, default None): Label for the pipe's location. Defaults to None.
  • parameters (Optional[Dict[str, Any]], default None): Optionally set a pipe's parameters from the constructor, e.g. columns and other attributes. You can edit these parameters with edit pipes.
  • columns (Optional[Dict[str, str]], default None): Set the columns dictionary of parameters. If parameters is also provided, this dictionary is added under the 'columns' key.
  • tags (Optional[List[str]], default None): A list of strings to be added under the 'tags' key of parameters. You can select pipes with certain tags using --tags.
  • dtypes (Optional[Dict[str, str]], default None): Set the dtypes dictionary of parameters. If parameters is also provided, this dictionary is added under the 'dtypes' key.
  • mrsm_instance (Optional[Union[str, InstanceConnector]], default None): Connector for the Meerschaum instance where the pipe resides. Defaults to the preconfigured default instance ('sql:main').
  • instance (Optional[Union[str, InstanceConnector]], default None): Alias for mrsm_instance. If mrsm_instance is supplied, this value is ignored.
  • temporary (bool, default False): If True, prevent instance tables (pipes, users, plugins) from being created.
  • cache (bool, default False): If True, cache fetched data into a local database file. Defaults to False.
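As the constructor body above shows, a `columns` list is normalized into a dictionary mapping each column name to itself before being stored under `parameters['columns']`. A minimal standalone sketch of that normalization (not the library's own helper):

```python
def normalize_columns(columns):
    """Mirror the constructor's handling of `columns`:
    a list becomes a self-mapping dict; a dict passes through unchanged."""
    if isinstance(columns, list):
        return {str(col): str(col) for col in columns}
    if isinstance(columns, dict):
        return columns
    raise TypeError(f"The provided columns are of invalid type '{type(columns)}'.")

print(normalize_columns(['datetime', 'id']))
# {'datetime': 'datetime', 'id': 'id'}
```

This is why `Pipe('sql:main', 'weather', columns=['datetime', 'id'])` and `Pipe('sql:main', 'weather', columns={'datetime': 'datetime', 'id': 'id'})` are equivalent.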
connector_keys
connector_key
metric_key
location_key
temporary
meta

Return the four keys needed to reconstruct this pipe.

def keys(self) -> Dict[str, str]:
299    def keys(self) -> Dict[str, str]:
300        """
301        Return the ordered keys for this pipe.
302        """
303        return {
304            key: val
305            for key, val in self.meta.items()
306            if key != 'instance'
307        }

Return the ordered keys for this pipe.
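As the body above shows, the result is the `meta` dictionary with the `'instance'` entry removed. A sketch using a hypothetical pipe's `meta` dictionary:

```python
# Hypothetical meta for a pipe on the default instance.
meta = {
    'connector': 'sql:main',
    'metric': 'weather',
    'location': None,
    'instance': 'sql:main',
}

# keys() drops the instance entry, leaving the three identifying keys.
keys = {k: v for k, v in meta.items() if k != 'instance'}
print(keys)
# {'connector': 'sql:main', 'metric': 'weather', 'location': None}
```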

instance_connector: Union[meerschaum.connectors.sql.SQLConnector.SQLConnector, meerschaum.connectors.api.APIConnector.APIConnector, NoneType]

The connector to where this pipe resides. May either be of type meerschaum.connectors.sql.SQLConnector or meerschaum.connectors.api.APIConnector.

connector: Union[meerschaum.connectors.Connector, NoneType]

The connector to the data source.

cache_connector: Optional[meerschaum.connectors.sql.SQLConnector.SQLConnector]

If the pipe was created with cache=True, return the connector to the pipe's SQLite database for caching.

cache_pipe: Optional[Pipe]

If the pipe was created with cache=True, return another meerschaum.Pipe used to manage the local data.

def fetch( self, begin: Union[datetime.datetime, str, NoneType] = '', end: Optional[datetime.datetime] = None, check_existing: bool = True, sync_chunks: bool = False, debug: bool = False, **kw: Any) -> "Union['pd.DataFrame', Iterator['pd.DataFrame'], None]":
17def fetch(
18        self,
19        begin: Union[datetime, str, None] = '',
20        end: Optional[datetime] = None,
21        check_existing: bool = True,
22        sync_chunks: bool = False,
23        debug: bool = False,
24        **kw: Any
25    ) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]:
26    """
27    Fetch a Pipe's latest data from its connector.
28
29    Parameters
30    ----------
 31    begin: Union[datetime, str, None], default ''
32        If provided, only fetch data newer than or equal to `begin`.
33
 34    end: Optional[datetime], default None
35        If provided, only fetch data older than or equal to `end`.
36
37    check_existing: bool, default True
38        If `False`, do not apply the backtrack interval.
39
40    sync_chunks: bool, default False
 41        If `True` and the pipe's connector is of type `'sql'`, sync chunks as they are fetched
 42        rather than loading all chunks into memory first.
43
44    debug: bool, default False
45        Verbosity toggle.
46
47    Returns
48    -------
49    A `pd.DataFrame` of the newest unseen data.
50
51    """
52    if 'fetch' not in dir(self.connector):
53        warn(f"No `fetch()` function defined for connector '{self.connector}'")
54        return None
55
56    from meerschaum.connectors import custom_types, get_connector_plugin
57    from meerschaum.utils.debug import dprint, _checkpoint
58
59    _chunk_hook = kw.pop('chunk_hook', None)
60    kw['workers'] = self.get_num_workers(kw.get('workers', None))
61    if sync_chunks and _chunk_hook is None:
62
63        def _chunk_hook(chunk, **_kw) -> SuccessTuple:
64            """
65            Wrap `Pipe.sync()` with a custom chunk label prepended to the message.
66            """
67            from meerschaum.config._patch import apply_patch_to_config
68            kwargs = apply_patch_to_config(kw, _kw)
69            chunk_success, chunk_message = self.sync(chunk, **kwargs)
70            chunk_label = self._get_chunk_label(chunk, self.columns.get('datetime', None))
71            if chunk_label:
72                chunk_message = '\n' + chunk_label + '\n' + chunk_message
73            return chunk_success, chunk_message
74
75
76    with mrsm.Venv(get_connector_plugin(self.connector)):
77        df = self.connector.fetch(
78            self,
79            begin = _determine_begin(
80                self,
81                begin,
82                check_existing = check_existing,
83                debug = debug,
84            ),
85            end = end,
86            chunk_hook = _chunk_hook,
87            debug = debug,
88            **kw
89        )
90    return df

Fetch a Pipe's latest data from its connector.

Parameters
  • begin (Union[datetime, str, None], default ''): If provided, only fetch data newer than or equal to begin.
  • end (Optional[datetime], default None): If provided, only fetch data older than or equal to end.
  • check_existing (bool, default True): If False, do not apply the backtrack interval.
  • sync_chunks (bool, default False): If True and the pipe's connector is of type 'sql', sync chunks as they are fetched rather than loading all chunks into memory first.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame of the newest unseen data.
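The `begin` bound handed to the connector is nudged backwards by the pipe's backtrack interval so that late-arriving rows near the last sync time are re-fetched. The helper name `_determine_begin` appears in the source above; the body below is an illustrative assumption, not the actual implementation:

```python
from datetime import datetime, timedelta

def determine_begin(sync_time, backtrack_interval, explicit_begin=None):
    """Sketch: an explicit `begin` wins; otherwise fall back to the
    pipe's last sync time minus the backtrack interval."""
    if explicit_begin is not None:
        return explicit_begin
    if sync_time is None:
        # No existing data: fetch everything.
        return None
    return sync_time - backtrack_interval

last_sync = datetime(2024, 1, 1, 12, 0)
print(determine_begin(last_sync, timedelta(minutes=1440)))
# 2023-12-31 12:00:00
```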
def get_backtrack_interval( self, check_existing: bool = True, debug: bool = False) -> Union[datetime.timedelta, int]:
 93def get_backtrack_interval(
 94        self,
 95        check_existing: bool = True,
 96        debug: bool = False,
 97    ) -> Union[timedelta, int]:
 98    """
 99    Get the backtrack interval to use for this pipe.
100
101    Parameters
102    ----------
103    check_existing: bool, default True
104        If `False`, return a backtrack_interval of 0 minutes.
105
106    Returns
107    -------
108    The backtrack interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
109    """
110    default_backtrack_minutes = get_config('pipes', 'parameters', 'fetch', 'backtrack_minutes')
111    configured_backtrack_minutes = self.parameters.get('fetch', {}).get('backtrack_minutes', None)
112    backtrack_minutes = (
113        configured_backtrack_minutes
114        if configured_backtrack_minutes is not None
115        else default_backtrack_minutes
116    ) if check_existing else 0
117
118    backtrack_interval = timedelta(minutes=backtrack_minutes)
119    dt_col = self.columns.get('datetime', None)
120    if dt_col is None:
121        return backtrack_interval
122
123    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns]')
124    if 'int' in dt_dtype.lower():
125        return backtrack_minutes
126
127    return backtrack_interval

Get the backtrack interval to use for this pipe.

Parameters
  • check_existing (bool, default True): If False, return a backtrack_interval of 0 minutes.
Returns
  • The backtrack interval (timedelta or int) to use with this pipe's datetime axis.
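The body above condenses to: the minutes come from the pipe's `fetch` parameters with a config-level fallback, `check_existing=False` zeroes them out, and the return type follows the datetime axis dtype (an `int` for integer axes, a `timedelta` otherwise). A standalone sketch of that logic:

```python
from datetime import timedelta

def backtrack_interval(configured_minutes, default_minutes, dt_dtype, check_existing=True):
    """Mirror the logic above: prefer configured over default minutes,
    zero when check_existing is False, and return an int for integer axes."""
    minutes = (
        configured_minutes if configured_minutes is not None else default_minutes
    ) if check_existing else 0
    if dt_dtype is not None and 'int' in dt_dtype.lower():
        return minutes
    return timedelta(minutes=minutes)

print(backtrack_interval(None, 1440, 'datetime64[ns]'))  # 1 day, 0:00:00
print(backtrack_interval(10, 1440, 'int64'))             # 10
```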
def get_data( self, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, as_iterator: bool = False, as_chunks: bool = False, as_dask: bool = False, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> "Union['pd.DataFrame', Generator['pd.DataFrame'], None]":
 15def get_data(
 16        self,
 17        select_columns: Optional[List[str]] = None,
 18        omit_columns: Optional[List[str]] = None,
 19        begin: Union[datetime, int, None] = None,
 20        end: Union[datetime, int, None] = None,
 21        params: Optional[Dict[str, Any]] = None,
 22        as_iterator: bool = False,
 23        as_chunks: bool = False,
 24        as_dask: bool = False,
 25        chunk_interval: Union[timedelta, int, None] = None,
 26        fresh: bool = False,
 27        debug: bool = False,
 28        **kw: Any
 29    ) -> Union['pd.DataFrame', Generator['pd.DataFrame'], None]:
 30    """
 31    Get a pipe's data from the instance connector.
 32
 33    Parameters
 34    ----------
 35    select_columns: Optional[List[str]], default None
 36        If provided, only select these given columns.
 37        Otherwise select all available columns (i.e. `SELECT *`).
 38
 39    omit_columns: Optional[List[str]], default None
 40        If provided, remove these columns from the selection.
 41
 42    begin: Union[datetime, int, None], default None
 43        Lower bound datetime to begin searching for data (inclusive).
 44        Translates to a `WHERE` clause like `WHERE datetime >= begin`.
 45        Defaults to `None`.
 46
 47    end: Union[datetime, int, None], default None
 48        Upper bound datetime to stop searching for data (inclusive).
 49        Translates to a `WHERE` clause like `WHERE datetime < end`.
 50        Defaults to `None`.
 51
 52    params: Optional[Dict[str, Any]], default None
 53        Filter the retrieved data by a dictionary of parameters.
 54        See `meerschaum.utils.sql.build_where` for more details. 
 55
 56    as_iterator: bool, default False
 57        If `True`, return a generator of chunks of pipe data.
 58
 59    as_chunks: bool, default False
 60        Alias for `as_iterator`.
 61
 62    as_dask: bool, default False
 63        If `True`, return a `dask.DataFrame`
 64        (which may be loaded into a Pandas DataFrame with `df.compute()`).
 65
 66    chunk_interval: Union[timedelta, int, None], default None
 67        If `as_iterator`, then return chunks with `begin` and `end` separated by this interval.
 68        This may be set under `pipe.parameters['chunk_minutes']`.
 69        By default, use a timedelta of 1440 minutes (1 day).
 70        If `chunk_interval` is an integer and the `datetime` axis a timestamp,
 71        then use a `timedelta` with this number of minutes.
 72        If the `datetime` axis is an integer, default to the configured chunksize.
 73        If `chunk_interval` is a `timedelta` and the `datetime` axis an integer,
 74        use the number of minutes in the `timedelta`.
 75
 76    fresh: bool, default False
 77        If `True`, skip local cache and directly query the instance connector.
 78        Defaults to `False`.
 79
 80    debug: bool, default False
 81        Verbosity toggle.
 82        Defaults to `False`.
 83
 84    Returns
 85    -------
 86    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.
 87
 88    """
 89    from meerschaum.utils.warnings import warn
 90    from meerschaum.utils.venv import Venv
 91    from meerschaum.connectors import get_connector_plugin
 92    from meerschaum.utils.misc import iterate_chunks, items_str
 93    from meerschaum.utils.dtypes import to_pandas_dtype
 94    from meerschaum.utils.dataframe import add_missing_cols_to_df
 95    from meerschaum.utils.packages import attempt_import
 96    dd = attempt_import('dask.dataframe') if as_dask else None
 97    dask = attempt_import('dask') if as_dask else None
 98
 99    if select_columns == '*':
100        select_columns = None
101    elif isinstance(select_columns, str):
102        select_columns = [select_columns]
103
104    if isinstance(omit_columns, str):
105        omit_columns = [omit_columns]
106
107    as_iterator = as_iterator or as_chunks
108
109    if as_iterator or as_chunks:
110        return self._get_data_as_iterator(
111            select_columns = select_columns,
112            omit_columns = omit_columns,
113            begin = begin,
114            end = end,
115            params = params,
116            chunk_interval = chunk_interval,
117            fresh = fresh,
118            debug = debug,
119        )
120
121    if as_dask:
122        from multiprocessing.pool import ThreadPool
123        dask_pool = ThreadPool(self.get_num_workers())
124        dask.config.set(pool=dask_pool)
125        chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
126        bounds = self.get_chunk_bounds(
127            begin = begin,
128            end = end,
129            bounded = False,
130            chunk_interval = chunk_interval,
131            debug = debug,
132        )
133        dask_chunks = [
134            dask.delayed(self.get_data)(
135                select_columns = select_columns,
136                omit_columns = omit_columns,
137                begin = chunk_begin,
138                end = chunk_end,
139                params = params,
140                chunk_interval = chunk_interval,
141                fresh = fresh,
142                debug = debug,
143            )
144            for (chunk_begin, chunk_end) in bounds
145        ]
146        dask_meta = {
147            col: to_pandas_dtype(typ)
148            for col, typ in self.dtypes.items()
149        }
150        return dd.from_delayed(dask_chunks, meta=dask_meta)
151
152    if not self.exists(debug=debug):
153        return None
154       
155    if self.cache_pipe is not None:
156        if not fresh:
157            _sync_cache_tuple = self.cache_pipe.sync(
158                begin = begin,
159                end = end,
160                params = params,
161                debug = debug,
162                **kw
163            )
164            if not _sync_cache_tuple[0]:
165                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
166                fresh = True
167            else: ### Successfully synced cache.
168                return self.enforce_dtypes(
169                    self.cache_pipe.get_data(
170                        select_columns = select_columns,
171                        omit_columns = omit_columns,
172                        begin = begin,
173                        end = end,
174                        params = params,
175                        debug = debug,
176                        fresh = True,
177                        **kw
178                    ),
179                    debug = debug,
180                )
181
182    with Venv(get_connector_plugin(self.instance_connector)):
183        df = self.instance_connector.get_pipe_data(
184            pipe = self,
185            select_columns = select_columns,
186            omit_columns = omit_columns,
187            begin = begin,
188            end = end,
189            params = params,
190            debug = debug,
191            **kw
192        )
193        if df is None:
194            return df
195
196        if not select_columns:
197            select_columns = [col for col in df.columns]
198
199        cols_to_omit = [
200            col
201            for col in df.columns
202            if (
203                col in (omit_columns or [])
204                or
205                col not in (select_columns or [])
206            )
207        ]
208        cols_to_add = [
209            col
210            for col in select_columns
211            if col not in df.columns
212        ]
213        if cols_to_omit:
214            warn(
215                (
216                    f"Received {len(cols_to_omit)} omitted column"
217                    + ('s' if len(cols_to_omit) != 1 else '')
218                    + f" for {self}. "
219                    + "Consider adding `select_columns` and `omit_columns` support to "
220                    + f"'{self.instance_connector.type}' connectors to improve performance."
221                ),
222                stack = False,
223            )
224            _cols_to_select = [col for col in df.columns if col not in cols_to_omit]
225            df = df[_cols_to_select]
226
227        if cols_to_add:
228            warn(
229                (
230                    f"Specified columns {items_str(cols_to_add)} were not found on {self}. "
231                    + "Adding these to the DataFrame as null columns."
232                ),
233                stack = False,
234            )
235            df = add_missing_cols_to_df(df, {col: 'string' for col in cols_to_add})
236
237        return self.enforce_dtypes(df, debug=debug)

Get a pipe's data from the instance connector.

Parameters
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, int, None], default None): Lower bound datetime to begin searching for data (inclusive). Translates to a WHERE clause like WHERE datetime >= begin. Defaults to None.
  • end (Union[datetime, int, None], default None): Upper bound datetime to stop searching for data (inclusive). Translates to a WHERE clause like WHERE datetime < end. Defaults to None.
  • params (Optional[Dict[str, Any]], default None): Filter the retrieved data by a dictionary of parameters. See meerschaum.utils.sql.build_where for more details.
  • as_iterator (bool, default False): If True, return a generator of chunks of pipe data.
  • as_chunks (bool, default False): Alias for as_iterator.
  • as_dask (bool, default False): If True, return a dask.DataFrame (which may be loaded into a Pandas DataFrame with df.compute()).
  • chunk_interval (Union[timedelta, int, None], default None): If as_iterator, then return chunks with begin and end separated by this interval. This may be set under pipe.parameters['chunk_minutes']. By default, use a timedelta of 1440 minutes (1 day). If chunk_interval is an integer and the datetime axis a timestamp, then use a timedelta with this number of minutes. If the datetime axis is an integer, default to the configured chunksize. If chunk_interval is a timedelta and the datetime axis an integer, use the number of minutes in the timedelta.
  • fresh (bool, default False): If True, skip local cache and directly query the instance connector. Defaults to False.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
Returns
  • A pd.DataFrame for the pipe's data corresponding to the provided parameters.
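The column-filtering step near the end of `get_data()` (splitting the frame's columns into ones to drop and selected ones that are missing) can be sketched independently. `filter_columns` is a hypothetical helper written for illustration, not part of Meerschaum:

```python
from typing import List, Optional, Tuple

def filter_columns(
    df_columns: List[str],
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
) -> Tuple[List[str], List[str]]:
    """Return (columns to keep, selected columns missing from the frame),
    mirroring the select/omit logic in get_data() above."""
    select = select_columns or list(df_columns)
    cols_to_omit = [
        col for col in df_columns
        if col in (omit_columns or []) or col not in select
    ]
    cols_to_add = [col for col in select if col not in df_columns]
    kept = [col for col in df_columns if col not in cols_to_omit]
    return kept, cols_to_add
```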
def get_backtrack_data( self, backtrack_minutes: Optional[int] = None, begin: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> "Optional['pd.DataFrame']":
332def get_backtrack_data(
333        self,
334        backtrack_minutes: Optional[int] = None,
335        begin: Union[datetime, int, None] = None,
336        params: Optional[Dict[str, Any]] = None,
337        fresh: bool = False,
338        debug: bool = False,
339        **kw: Any
340    ) -> Optional['pd.DataFrame']:
341    """
342    Get the most recent data from the instance connector as a Pandas DataFrame.
343
344    Parameters
345    ----------
346    backtrack_minutes: Optional[int], default None
347        How many minutes from `begin` to select from.
348        If `None`, use `pipe.parameters['fetch']['backtrack_minutes']`.
349
350    begin: Optional[datetime], default None
351        The starting point to search for data.
352        If begin is `None` (default), use the most recent observed datetime
353        (AKA sync_time).
354
355        ```
356        E.g. begin = 02:00
357
358        Search this region.           Ignore this, even if there's data.
359        /  /  /  /  /  /  /  /  /  |
360        -----|----------|----------|----------|----------|----------|
361        00:00      01:00      02:00      03:00      04:00      05:00
362
363        ```
364
365    params: Optional[Dict[str, Any]], default None
366        The standard Meerschaum `params` query dictionary.
367        
368        
369    fresh: bool, default False
370        If `True`, ignore local cache and pull directly from the instance connector.
371        Only comes into effect if a pipe was created with `cache=True`.
372
373    debug: bool, default False
374        Verbosity toggle.
375
376    Returns
377    -------
378    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. Backtrack data
379    is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
380    """
381    from meerschaum.utils.warnings import warn
382    from meerschaum.utils.venv import Venv
383    from meerschaum.connectors import get_connector_plugin
384
385    if not self.exists(debug=debug):
386        return None
387
388    backtrack_interval = self.get_backtrack_interval(debug=debug)
389    if backtrack_minutes is None:
390        backtrack_minutes = (
391            (backtrack_interval.total_seconds() / 60)
392            if isinstance(backtrack_interval, timedelta)
393            else backtrack_interval
394        )
395
396    if self.cache_pipe is not None:
397        if not fresh:
398            _sync_cache_tuple = self.cache_pipe.sync(begin=begin, params=params, debug=debug, **kw)
399            if not _sync_cache_tuple[0]:
400                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
401                fresh = True
402            else: ### Successfully synced cache.
403                return self.enforce_dtypes(
404                    self.cache_pipe.get_backtrack_data(
405                        fresh = True,
406                        begin = begin,
407                        backtrack_minutes = backtrack_minutes,
408                        params = params,
409                        debug = debug,
410                        **kw
411                    ),
412                    debug = debug,
413                )
414
415    if hasattr(self.instance_connector, 'get_backtrack_data'):
416        with Venv(get_connector_plugin(self.instance_connector)):
417            return self.enforce_dtypes(
418                self.instance_connector.get_backtrack_data(
419                    pipe = self,
420                    begin = begin,
421                    backtrack_minutes = backtrack_minutes,
422                    params = params,
423                    debug = debug,
424                    **kw
425                ),
426                debug = debug,
427            )
428
429    if begin is None:
430        begin = self.get_sync_time(params=params, debug=debug)
431
432    backtrack_interval = (
433        timedelta(minutes=backtrack_minutes)
434        if isinstance(begin, datetime)
435        else backtrack_minutes
436    )
437    if begin is not None:
438        begin = begin - backtrack_interval
439
440    return self.get_data(
441        begin = begin,
442        params = params,
443        debug = debug,
444        **kw
445    )

Get the most recent data from the instance connector as a Pandas DataFrame.

Parameters
  • backtrack_minutes (Optional[int], default None): How many minutes from begin to select from. If None, use pipe.parameters['fetch']['backtrack_minutes'].
  • begin (Optional[datetime], default None): The starting point to search for data. If begin is None (default), use the most recent observed datetime (AKA sync_time).
    E.g. begin = 02:00
    
    Search this region.           Ignore this, even if there's data.
    /  /  /  /  /  /  /  /  /  |
    -----|----------|----------|----------|----------|----------|
    00:00      01:00      02:00      03:00      04:00      05:00
    
    
  • params (Optional[Dict[str, Any]], default None): The standard Meerschaum params query dictionary.
  • fresh (bool, default False): If True, ignore local cache and pull directly from the instance connector. Only comes into effect if a pipe was created with cache=True.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame for the pipe's data corresponding to the provided parameters. Backtrack data is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
def get_rowcount( self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> int:
448def get_rowcount(
449        self,
450        begin: Optional[datetime] = None,
451        end: Optional[datetime] = None,
452        params: Optional[Dict[str, Any]] = None,
453        remote: bool = False,
454        debug: bool = False
455    ) -> int:
456    """
457    Get a Pipe's instance or remote rowcount.
458
459    Parameters
460    ----------
461    begin: Optional[datetime], default None
462        Count rows where datetime > begin.
463
464    end: Optional[datetime], default None
465        Count rows where datetime < end.
466
467    remote: bool, default False
468        Count rows from a pipe's remote source.
469        **NOTE**: This is experimental!
470
471    debug: bool, default False
472        Verbosity toggle.
473
474    Returns
475    -------
476    An `int` of the number of rows in the pipe corresponding to the provided parameters.
477    Returns 0 if the pipe does not exist.
478    """
479    from meerschaum.utils.warnings import warn
480    from meerschaum.utils.venv import Venv
481    from meerschaum.connectors import get_connector_plugin
482
483    connector = self.instance_connector if not remote else self.connector
484    try:
485        with Venv(get_connector_plugin(connector)):
486            rowcount = connector.get_pipe_rowcount(
487                self,
488                begin = begin,
489                end = end,
490                params = params,
491                remote = remote,
492                debug = debug,
493            )
494            if rowcount is None:
495                return 0
496            return rowcount
497    except AttributeError as e:
498        warn(e)
499        if remote:
500            return 0
501    warn(f"Failed to get a rowcount for {self}.")
502    return 0

Get a Pipe's instance or remote rowcount.

Parameters
  • begin (Optional[datetime], default None): Count rows where datetime > begin.
  • end (Optional[datetime], default None): Count rows where datetime < end.
  • remote (bool, default False): Count rows from a pipe's remote source. NOTE: This is experimental!
  • debug (bool, default False): Verbosity toggle.
Returns
  • An int of the number of rows in the pipe corresponding to the provided parameters. Returns 0 if the pipe does not exist.
def get_chunk_interval( self, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, debug: bool = False) -> Union[datetime.timedelta, int]:
505def get_chunk_interval(
506        self,
507        chunk_interval: Union[timedelta, int, None] = None,
508        debug: bool = False,
509    ) -> Union[timedelta, int]:
510    """
511    Get the chunk interval to use for this pipe.
512
513    Parameters
514    ----------
515    chunk_interval: Union[timedelta, int, None], default None
516        If provided, coerce this value into the correct type.
517        For example, if the datetime axis is an integer, then
518        return the number of minutes.
519
520    Returns
521    -------
522    The chunk interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
523    """
524    default_chunk_minutes = get_config('pipes', 'parameters', 'verify', 'chunk_minutes')
525    configured_chunk_minutes = self.parameters.get('verify', {}).get('chunk_minutes', None)
526    chunk_minutes = (
527        (configured_chunk_minutes or default_chunk_minutes)
528        if chunk_interval is None
529        else (
530            chunk_interval
531            if isinstance(chunk_interval, int)
532            else int(chunk_interval.total_seconds() / 60)
533        )
534    )
535
536    dt_col = self.columns.get('datetime', None)
537    if dt_col is None:
538        return timedelta(minutes=chunk_minutes)
539
540    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns]')
541    if 'int' in dt_dtype.lower():
542        return chunk_minutes
543    return timedelta(minutes=chunk_minutes)

Get the chunk interval to use for this pipe.

Parameters
  • chunk_interval (Union[timedelta, int, None], default None): If provided, coerce this value into the correct type. For example, if the datetime axis is an integer, then return the number of minutes.
Returns
  • The chunk interval (timedelta or int) to use with this pipe's datetime axis.
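The coercion above can be sketched as a pure function. The names and defaults here are illustrative (the real values come from `get_config('pipes', 'parameters', 'verify', 'chunk_minutes')` and `pipe.parameters`):

```python
from datetime import timedelta
from typing import Optional, Union

def resolve_chunk_interval(
    chunk_interval: Union[timedelta, int, None],
    configured_minutes: Optional[int] = None,
    default_minutes: int = 1440,
    dt_dtype: str = 'datetime64[ns]',
) -> Union[timedelta, int]:
    """Coerce the chunk interval to match the datetime axis:
    raw minutes for integer axes, a timedelta otherwise."""
    minutes = (
        (configured_minutes or default_minutes)
        if chunk_interval is None
        else (
            chunk_interval
            if isinstance(chunk_interval, int)
            else int(chunk_interval.total_seconds() / 60)
        )
    )
    if 'int' in dt_dtype.lower():
        return minutes
    return timedelta(minutes=minutes)
```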
def get_chunk_bounds( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, bounded: bool = False, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, debug: bool = False) -> List[Tuple[Union[datetime.datetime, int, NoneType], Union[datetime.datetime, int, NoneType]]]:
546def get_chunk_bounds(
547        self,
548        begin: Union[datetime, int, None] = None,
549        end: Union[datetime, int, None] = None,
550        bounded: bool = False,
551        chunk_interval: Union[timedelta, int, None] = None,
552        debug: bool = False,
553    ) -> List[
554        Tuple[
555            Union[datetime, int, None],
556            Union[datetime, int, None],
557        ]
558    ]:
559    """
560    Return a list of datetime bounds for iterating over the pipe's `datetime` axis.
561
562    Parameters
563    ----------
564    begin: Union[datetime, int, None], default None
565        If provided, do not select less than this value.
566        Otherwise the first chunk will be unbounded.
567
568    end: Union[datetime, int, None], default None
569        If provided, do not select greater than or equal to this value.
570        Otherwise the last chunk will be unbounded.
571
572    bounded: bool, default False
573        If `True`, do not include `None` in the first chunk.
574
575    chunk_interval: Union[timedelta, int, None], default None
576        If provided, use this interval for the size of chunk boundaries.
577        The default value for this pipe may be set
578        under `pipe.parameters['verify']['chunk_minutes']`.
579
580    debug: bool, default False
581        Verbosity toggle.
582
583    Returns
584    -------
585    A list of chunk bounds (datetimes or integers).
586    If unbounded, the first and last chunks will include `None`.
587    """
588    include_less_than_begin = not bounded and begin is None
589    include_greater_than_end = not bounded and end is None
590    if begin is None:
591        begin = self.get_sync_time(newest=False, debug=debug)
592    if end is None:
593        end = self.get_sync_time(newest=True, debug=debug)
594    if begin is None and end is None:
595        return [(None, None)]
596
597    ### Set the chunk interval under `pipe.parameters['verify']['chunk_minutes']`.
598    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
599    
600    ### Build a list of tuples containing the chunk boundaries
601    ### so that we can sync multiple chunks in parallel.
602    ### Run `verify pipes --workers 1` to sync chunks in series.
603    chunk_bounds = []
604    begin_cursor = begin
605    while begin_cursor < end:
606        end_cursor = begin_cursor + chunk_interval
607        chunk_bounds.append((begin_cursor, end_cursor))
608        begin_cursor = end_cursor
609
610    ### The chunk interval might be too large.
611    if not chunk_bounds and end >= begin:
612        chunk_bounds = [(begin, end)]
613
614    ### Truncate the last chunk to the end timestamp.
615    if chunk_bounds[-1][1] > end:
616        chunk_bounds[-1] = (chunk_bounds[-1][0], end)
617
618    ### Pop the last chunk if its bounds are equal.
619    if chunk_bounds[-1][0] == chunk_bounds[-1][1]:
620        chunk_bounds = chunk_bounds[:-1]
621
622    if include_less_than_begin:
623        chunk_bounds = [(None, begin)] + chunk_bounds
624    if include_greater_than_end:
625        chunk_bounds = chunk_bounds + [(end, None)]
626
627    return chunk_bounds

Return a list of datetime bounds for iterating over the pipe's datetime axis.

Parameters
  • begin (Union[datetime, int, None], default None): If provided, do not select less than this value. Otherwise the first chunk will be unbounded.
  • end (Union[datetime, int, None], default None): If provided, do not select greater than or equal to this value. Otherwise the last chunk will be unbounded.
  • bounded (bool, default False): If True, do not include None in the first chunk.
  • chunk_interval (Union[timedelta, int, None], default None): If provided, use this interval for the size of chunk boundaries. The default value for this pipe may be set under pipe.parameters['verify']['chunk_minutes'].
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of chunk bounds (datetimes or integers). If unbounded, the first and last chunks will include None.
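The chunking loop above can be reproduced as a standalone sketch that works for both datetime and integer axes. `build_chunk_bounds` is a simplified illustration (it omits the sync-time lookups the real method performs when `begin` or `end` is None):

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple, Union

Bound = Union[datetime, int, None]

def build_chunk_bounds(
    begin: Union[datetime, int],
    end: Union[datetime, int],
    chunk_interval: Union[timedelta, int],
    bounded: bool = True,
) -> List[Tuple[Bound, Bound]]:
    """Walk from begin to end in chunk_interval steps, truncate the last
    chunk to end, and add unbounded edge chunks when bounded is False."""
    bounds: List[Tuple[Bound, Bound]] = []
    cursor = begin
    while cursor < end:
        bounds.append((cursor, cursor + chunk_interval))
        cursor = bounds[-1][1]
    if bounds and bounds[-1][1] > end:
        bounds[-1] = (bounds[-1][0], end)
    if bounds and bounds[-1][0] == bounds[-1][1]:
        bounds.pop()
    if not bounded:
        bounds = [(None, begin)] + bounds + [(end, None)]
    return bounds
```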
def register(self, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
12def register(
13        self,
14        debug: bool = False,
15        **kw: Any
16    ) -> SuccessTuple:
17    """
18    Register a new Pipe along with its attributes.
19
20    Parameters
21    ----------
22    debug: bool, default False
23        Verbosity toggle.
24
25    kw: Any
26        Keyword arguments to pass to `instance_connector.register_pipe()`.
27
28    Returns
29    -------
30    A `SuccessTuple` of success, message.
31    """
32    if self.temporary:
33        return False, "Cannot register pipes created with `temporary=True` (read-only)."
34
35    from meerschaum.utils.formatting import get_console
36    from meerschaum.utils.venv import Venv
37    from meerschaum.connectors import get_connector_plugin, custom_types
38    from meerschaum.config._patch import apply_patch_to_config
39
40    import warnings
41    with warnings.catch_warnings():
42        warnings.simplefilter('ignore')
43        try:
44            _conn = self.connector
45        except Exception as e:
46            _conn = None
47
48    if (
49        _conn is not None
50        and
51        (_conn.type == 'plugin' or _conn.type in custom_types)
52        and
53        getattr(_conn, 'register', None) is not None
54    ):
55        try:
56            with Venv(get_connector_plugin(_conn), debug=debug):
57                params = self.connector.register(self)
58        except Exception as e:
59            get_console().print_exception()
60            params = None
61        params = {} if params is None else params
62        if not isinstance(params, dict):
63            from meerschaum.utils.warnings import warn
64            warn(
65                f"Invalid parameters returned from `register()` in connector {self.connector}:\n"
66                + f"{params}"
67            )
68        else:
69            self.parameters = apply_patch_to_config(params, self.parameters)
70
71    if not self.parameters:
72        cols = self.columns if self.columns else {'datetime': None, 'id': None}
73        self.parameters = {
74            'columns': cols,
75        }
76
77    with Venv(get_connector_plugin(self.instance_connector)):
78        return self.instance_connector.register_pipe(self, debug=debug, **kw)

Register a new Pipe along with its attributes.

Parameters
  • debug (bool, default False): Verbosity toggle.
  • kw (Any): Keyword arguments to pass to instance_connector.register_pipe().
Returns
  • A SuccessTuple of success, message.
attributes: Dict[str, Any]

Return a dictionary of a pipe's keys and parameters. These values are reflected directly from the pipes table of the instance.

parameters: Optional[Dict[str, Any]]

Return the parameters dictionary of the pipe.

columns: Optional[Dict[str, str]]

Return the columns dictionary defined in meerschaum.Pipe.parameters.

dtypes: Optional[Dict[str, Any]]

If defined, return the dtypes dictionary defined in meerschaum.Pipe.parameters.

def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
139def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
140    """
141    Check if the requested columns are defined.
142
143    Parameters
144    ----------
145    *args: str
146        The column names to be retrieved.
147        
148    error: bool, default False
149        If `True`, raise an `Exception` if the specified column is not defined.
150
151    Returns
152    -------
153    A tuple of the same size of `args` or a `str` if `args` is a single argument.
154
155    Examples
156    --------
157    >>> pipe = mrsm.Pipe('test', 'test')
158    >>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
159    >>> pipe.get_columns('datetime', 'id')
160    ('dt', 'id')
161    >>> pipe.get_columns('value', error=True)
162    Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
163    """
164    from meerschaum.utils.warnings import error as _error, warn
165    if not args:
166        args = tuple(self.columns.keys())
167    col_names = []
168    for col in args:
169        col_name = None
170        try:
171            col_name = self.columns[col]
172            if col_name is None and error:
173                _error(f"Please define the name of the '{col}' column for {self}.")
174        except Exception as e:
175            col_name = None
176        if col_name is None and error:
177            _error(f"Missing '{col}'" + f" column for {self}.")
178        col_names.append(col_name)
179    if len(col_names) == 1:
180        return col_names[0]
181    return tuple(col_names)

Check if the requested columns are defined.

Parameters
  • *args (str): The column names to be retrieved.
  • error (bool, default False): If True, raise an Exception if the specified column is not defined.
Returns
  • A tuple of the same size of args or a str if args is a single argument.
Examples
>>> pipe = mrsm.Pipe('test', 'test')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> pipe.get_columns('datetime', 'id')
('dt', 'id')
>>> pipe.get_columns('value', error=True)
Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
def get_columns_types(self, debug: bool = False) -> Optional[Dict[str, str]]:
184def get_columns_types(self, debug: bool = False) -> Union[Dict[str, str], None]:
185    """
186    Get a dictionary of a pipe's column names and their types.
187
188    Parameters
189    ----------
190    debug: bool, default False
191        Verbosity toggle.
192
193    Returns
194    -------
195    A dictionary of column names (`str`) to column types (`str`).
196
197    Examples
198    --------
199    >>> pipe.get_columns_types()
200    {
201      'dt': 'TIMESTAMP WITHOUT TIMEZONE',
202      'id': 'BIGINT',
203      'val': 'DOUBLE PRECISION',
204    }
205    >>>
206    """
207    from meerschaum.utils.venv import Venv
208    from meerschaum.connectors import get_connector_plugin
209
210    with Venv(get_connector_plugin(self.instance_connector)):
211        return self.instance_connector.get_pipe_columns_types(self, debug=debug)

Get a dictionary of a pipe's column names and their types.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A dictionary of column names (str) to column types (str).
Examples
>>> pipe.get_columns_types()
{
  'dt': 'TIMESTAMP WITHOUT TIMEZONE',
  'id': 'BIGINT',
  'val': 'DOUBLE PRECISION',
}
>>>
def get_indices(self) -> Dict[str, str]:
436def get_indices(self) -> Dict[str, str]:
437    """
438    Return a dictionary in the form of `pipe.columns` but map to index names.
439    """
440    return {
441        ix: (self.target + '_' + col + '_index')
442        for ix, col in self.columns.items() if col
443    }

Return a dictionary in the form of pipe.columns but map to index names.
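The mapping above is small enough to sketch standalone. `index_names` is an illustrative helper (not part of the Meerschaum API) that mirrors the comprehension in `get_indices()`:

```python
from typing import Dict, Optional

def index_names(target: str, columns: Dict[str, Optional[str]]) -> Dict[str, str]:
    """Map each defined axis to an index name of the form
    '<target>_<column>_index', skipping unset (None) columns."""
    return {
        ix: f"{target}_{col}_index"
        for ix, col in columns.items()
        if col
    }
```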

tags: Optional[List[str]]

If defined, return the tags list defined in meerschaum.Pipe.parameters.

def get_id(self, **kw: Any) -> Optional[int]:
def get_id(self, **kw: Any) -> Union[int, None]:
    """
    Fetch a pipe's ID from its instance connector.
    If the pipe does not exist, return `None`.
    """
    if self.temporary:
        return None
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.get_pipe_id(self, **kw)

id: Optional[int]

Fetch and cache a pipe's ID.

def get_val_column(self, debug: bool = False) -> Optional[str]:
def get_val_column(self, debug: bool = False) -> Union[str, None]:
    """
    Return the name of the value column if it's defined, otherwise make an educated guess.
    If not set in the `columns` dictionary, return the first numeric column that is not
    an ID or datetime column.
    If none may be found, return `None`.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    Either a string or `None`.
    """
    from meerschaum.utils.debug import dprint
    if debug:
        dprint('Attempting to determine the value column...')
    try:
        val_name = self.get_columns('value')
    except Exception as e:
        val_name = None
    if val_name is not None:
        if debug:
            dprint(f"Value column: {val_name}")
        return val_name

    cols = self.columns
    if cols is None:
        if debug:
            dprint('No columns could be determined. Returning...')
        return None
    try:
        dt_name = self.get_columns('datetime', error=False)
    except Exception as e:
        dt_name = None
    try:
        id_name = self.get_columns('id', error=False)
    except Exception as e:
        id_name = None

    if debug:
        dprint(f"dt_name: {dt_name}")
        dprint(f"id_name: {id_name}")

    cols_types = self.get_columns_types(debug=debug)
    if cols_types is None:
        return None
    if debug:
        dprint(f"cols_types: {cols_types}")
    if dt_name is not None:
        cols_types.pop(dt_name, None)
    if id_name is not None:
        cols_types.pop(id_name, None)

    candidates = []
    candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric'}
    for search_term in candidate_keywords:
        for col, typ in cols_types.items():
            if search_term in typ.lower():
                candidates.append(col)
                break
    if not candidates:
        if debug:
            dprint("No value column could be determined.")
        return None

    return candidates[0]
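The guessing heuristic above can be sketched against a plain column-types dictionary (the column names and SQL types here are hypothetical):

```python
# Hypothetical column-types mapping, as returned by get_columns_types().
cols_types = {
    'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    'id': 'BIGINT',
    'val': 'DOUBLE PRECISION',
}

# Remove the known datetime and ID columns before guessing.
for known in ('dt', 'id'):
    cols_types.pop(known, None)

# Treat any remaining column with a numeric-looking type as a candidate.
candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric'}
candidates = [
    col for col, typ in cols_types.items()
    if any(kw in typ.lower() for kw in candidate_keywords)
]
val_col = candidates[0] if candidates else None
# val_col == 'val'
```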

parents: List[Pipe]

Return a list of meerschaum.Pipe objects to be designated as parents.

children: List[Pipe]

Return a list of meerschaum.Pipe objects to be designated as children.

target: str

The target table name. You can set the target name under one of the following keys (checked in this order):

  • target
  • target_name
  • target_table
  • target_table_name
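As a sketch, the precedence above reduces to a first-match lookup (the `resolve_target` helper and the parameter values here are hypothetical, not part of the Meerschaum API):

```python
from typing import Optional


def resolve_target(parameters: dict) -> Optional[str]:
    """Return the first target key set in `parameters`, in documented order."""
    for key in ('target', 'target_name', 'target_table', 'target_table_name'):
        if parameters.get(key):
            return parameters[key]
    return None


# 'target_name' outranks 'target_table' because it is checked first.
resolved = resolve_target({'target_name': 'weather_data', 'target_table': 'fallback'})
# resolved == 'weather_data'
```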
def guess_datetime(self) -> Optional[str]:
def guess_datetime(self) -> Union[str, None]:
    """
    Try to determine a pipe's datetime column.
    """
    dtypes = self.dtypes

    ### Abort if the user explicitly disallows a datetime index.
    if 'datetime' in dtypes:
        if dtypes['datetime'] is None:
            return None

    dt_cols = [
        col for col, typ in self.dtypes.items()
        if str(typ).startswith('datetime')
    ]
    if not dt_cols:
        return None
    return dt_cols[0]
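The same logic can be sketched against a plain dtypes dictionary (the column names and dtype strings here are hypothetical):

```python
# Hypothetical pipe dtypes (pandas-style dtype strings).
dtypes = {'dt': 'datetime64[ns]', 'id': 'int64', 'val': 'float64'}

# An explicit None under the 'datetime' key disables guessing entirely;
# a missing key (the default '' sentinel here) allows it.
if dtypes.get('datetime', '') is None:
    dt_col = None
else:
    # Take the first column whose dtype string starts with 'datetime'.
    dt_cols = [col for col, typ in dtypes.items() if str(typ).startswith('datetime')]
    dt_col = dt_cols[0] if dt_cols else None
# dt_col == 'dt'
```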

def show( self, nopretty: bool = False, debug: bool = False, **kw) -> Tuple[bool, str]:
def show(
        self,
        nopretty: bool = False,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Show attributes of a Pipe.

    Parameters
    ----------
    nopretty: bool, default False
        If `True`, simply print the JSON of the pipe's attributes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    import json
    from meerschaum.utils.formatting import (
        pprint, make_header, ANSI, highlight_pipes, fill_ansi, get_console,
    )
    from meerschaum.utils.packages import import_rich, attempt_import
    from meerschaum.utils.warnings import info
    attributes_json = json.dumps(self.attributes)
    if not nopretty:
        _to_print = f"Attributes for {self}:"
        if ANSI:
            _to_print = fill_ansi(highlight_pipes(make_header(_to_print)), 'magenta')
            print(_to_print)
            rich = import_rich()
            rich_json = attempt_import('rich.json')
            get_console().print(rich_json.JSON(attributes_json))
        else:
            print(_to_print)
    else:
        print(attributes_json)

    return True, "Success"

def edit( self, patch: bool = False, interactive: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
def edit(
        self,
        patch: bool = False,
        interactive: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Edit a Pipe's configuration.

    Parameters
    ----------
    patch: bool, default False
        If `patch` is True, update parameters by cascading rather than overwriting.
    interactive: bool, default False
        If `True`, open an editor for the user to make changes to the pipe's YAML file.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if self.temporary:
        return False, "Cannot edit pipes created with `temporary=True` (read-only)."

    if not interactive:
        with Venv(get_connector_plugin(self.instance_connector)):
            return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
    from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
    from meerschaum.utils.misc import edit_file
    parameters_filename = str(self) + '.yaml'
    parameters_path = PIPES_CACHE_RESOURCES_PATH / parameters_filename

    from meerschaum.utils.yaml import yaml

    edit_text = f"Edit the parameters for {self}"
    edit_top = '#' * (len(edit_text) + 4)
    edit_header = edit_top + f'\n# {edit_text} #\n' + edit_top + '\n\n'

    from meerschaum.config import get_config
    parameters = dict(get_config('pipes', 'parameters', patch=True))
    from meerschaum.config._patch import apply_patch_to_config
    parameters = apply_patch_to_config(parameters, self.parameters)

    ### Write parameters to the YAML file.
    with open(parameters_path, 'w+') as f:
        f.write(edit_header)
        yaml.dump(parameters, stream=f, sort_keys=False)

    ### Only quit editing once the YAML is valid.
    editing = True
    while editing:
        edit_file(parameters_path)
        try:
            with open(parameters_path, 'r') as f:
                file_parameters = yaml.load(f.read())
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(f"Invalid format defined for '{self}':\n\n{e}")
            input(f"Press [Enter] to correct the configuration for '{self}': ")
        else:
            editing = False

    self.parameters = file_parameters

    if debug:
        from meerschaum.utils.formatting import pprint
        pprint(self.parameters)

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
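The re-edit-until-valid loop above can be sketched with the stdlib alone (using JSON in place of YAML, and a list of simulated editor sessions in place of `edit_file()`; all names and values here are hypothetical):

```python
import json

# Simulated editor sessions: the first "edit" is invalid, the second parses.
simulated_edits = ['{"columns": ', '{"columns": {"datetime": "dt"}}']
edits = iter(simulated_edits)

parameters = None
editing = True
while editing:
    text = next(edits)            # in the real flow: edit_file() then re-read the file
    try:
        parameters = json.loads(text)
    except Exception:
        continue                  # invalid: prompt the user and re-edit
    editing = False
# parameters == {'columns': {'datetime': 'dt'}}
```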

def edit_definition( self, yes: bool = False, noask: bool = False, force: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
def edit_definition(
        self,
        yes: bool = False,
        noask: bool = False,
        force: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Edit a pipe's definition file and update its configuration.
    **NOTE:** This function is interactive and should not be used in automated scripts!

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    if self.temporary:
        return False, "Cannot edit pipes created with `temporary=True` (read-only)."

    from meerschaum.connectors import instance_types
    if (self.connector is None) or self.connector.type not in instance_types:
        return self.edit(interactive=True, debug=debug, **kw)

    import json
    from meerschaum.utils.warnings import info, warn
    from meerschaum.utils.debug import dprint
    from meerschaum.config._patch import apply_patch_to_config
    from meerschaum.utils.misc import edit_file

    _parameters = self.parameters
    if 'fetch' not in _parameters:
        _parameters['fetch'] = {}

    def _edit_api():
        from meerschaum.utils.prompt import prompt, yes_no
        info(
            f"Please enter the keys of the source pipe from '{self.connector}'.\n" +
            "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip."
        )

        _keys = {'connector_keys': None, 'metric_key': None, 'location_key': None}
        for k in _keys:
            _keys[k] = _parameters['fetch'].get(k, None)

        for k, v in _keys.items():
            try:
                _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v)
            except KeyboardInterrupt:
                continue
            if _keys[k] in ('', 'None', '\'None\'', '[None]'):
                _keys[k] = None

        _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys)

        info("You may optionally specify additional filter parameters as JSON.")
        print("  Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.")
        print("  For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':")
        print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': ')))
        if force or yes_no(
            "Would you like to add additional filter parameters?",
            yes=yes, noask=noask
        ):
            from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
            definition_filename = str(self) + '.json'
            definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
            try:
                definition_path.touch()
                with open(definition_path, 'w+') as f:
                    json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2)
            except Exception as e:
                return False, f"Failed writing file '{definition_path}':\n" + str(e)

            _params = None
            while True:
                edit_file(definition_path)
                try:
                    with open(definition_path, 'r') as f:
                        _params = json.load(f)
                except Exception as e:
                    warn(f'Failed to read parameters JSON:\n{e}', stack=False)
                    if force or yes_no(
                        "Would you like to try again?\n  "
                        + "If not, the parameters JSON file will be ignored.",
                        noask=noask, yes=yes
                    ):
                        continue
                    _params = None
                break
            if _params is not None:
                if 'fetch' not in _parameters:
                    _parameters['fetch'] = {}
                _parameters['fetch']['params'] = _params

        self.parameters = _parameters
        return True, "Success"

    def _edit_sql():
        import pathlib, os, textwrap
        from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
        from meerschaum.utils.misc import edit_file
        definition_filename = str(self) + '.sql'
        definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename

        sql_definition = _parameters['fetch'].get('definition', None)
        if sql_definition is None:
            sql_definition = ''
        sql_definition = textwrap.dedent(sql_definition).lstrip()

        try:
            definition_path.touch()
            with open(definition_path, 'w+') as f:
                f.write(sql_definition)
        except Exception as e:
            return False, f"Failed writing file '{definition_path}':\n" + str(e)

        edit_file(definition_path)
        try:
            with open(definition_path, 'r') as f:
                file_definition = f.read()
        except Exception as e:
            return False, f"Failed reading file '{definition_path}':\n" + str(e)

        if sql_definition == file_definition:
            return False, f"No changes made to definition for {self}."

        if ' ' not in file_definition:
            return False, f"Invalid SQL definition for {self}."

        if debug:
            dprint("Read SQL definition:\n\n" + file_definition)
        _parameters['fetch']['definition'] = file_definition
        self.parameters = _parameters
        return True, "Success"

    locals()['_edit_' + str(self.connector.type)]()
    return self.edit(interactive=False, debug=debug, **kw)

def update(self, *args, **kw) -> Tuple[bool, str]:
def update(self, *args, **kw) -> SuccessTuple:
    """
    Update a pipe's parameters in its instance.
    """
    kw['interactive'] = False
    return self.edit(*args, **kw)

def sync(self, df: Union[pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch] = InferFetch, begin: Union[datetime, int, str, None] = '', end: Union[datetime, int, None] = None, force: bool = False, retries: int = 10, min_seconds: int = 1, check_existing: bool = True, blocking: bool = True, workers: Optional[int] = None, callback: Optional[Callable[[Tuple[bool, str]], Any]] = None, error_callback: Optional[Callable[[Exception], Any]] = None, chunksize: Optional[int] = -1, sync_chunks: bool = True, debug: bool = False, _inplace: bool = True, **kw: Any) -> Tuple[bool, str]:
 37def sync(
 38        self,
 39        df: Union[
 40            pd.DataFrame,
 41            Dict[str, List[Any]],
 42            List[Dict[str, Any]],
 43            InferFetch
 44        ] = InferFetch,
 45        begin: Union[datetime, int, str, None] = '',
 46        end: Union[datetime, int] = None,
 47        force: bool = False,
 48        retries: int = 10,
 49        min_seconds: int = 1,
 50        check_existing: bool = True,
 51        blocking: bool = True,
 52        workers: Optional[int] = None,
 53        callback: Optional[Callable[[Tuple[bool, str]], Any]] = None,
 54        error_callback: Optional[Callable[[Exception], Any]] = None,
 55        chunksize: Optional[int] = -1,
 56        sync_chunks: bool = True,
 57        debug: bool = False,
 58        _inplace: bool = True,
 59        **kw: Any
 60    ) -> SuccessTuple:
 61    """
 62    Fetch new data from the source and update the pipe's table with new data.
 63    
 64    Get new remote data via fetch, get existing data in the same time period,
 65    and merge the two, only keeping the unseen data.
 66
 67    Parameters
 68    ----------
 69    df: Union[None, pd.DataFrame, Dict[str, List[Any]]], default None
 70        An optional DataFrame to sync into the pipe. Defaults to `None`.
 71
 72    begin: Union[datetime, int, str, None], default ''
 73        Optionally specify the earliest datetime to search for data.
 74
 75    end: Union[datetime, int, str, None], default None
 76        Optionally specify the latest datetime to search for data.
 77
 78    force: bool, default False
 79        If `True`, keep trying to sync untul `retries` attempts.
 80
 81    retries: int, default 10
 82        If `force`, how many attempts to try syncing before declaring failure.
 83
 84    min_seconds: Union[int, float], default 1
 85        If `force`, how many seconds to sleep between retries. Defaults to `1`.
 86
 87    check_existing: bool, default True
 88        If `True`, pull and diff with existing data from the pipe.
 89
 90    blocking: bool, default True
 91        If `True`, wait for sync to finish and return its result, otherwise
 92        asyncronously sync (oxymoron?) and return success. Defaults to `True`.
 93        Only intended for specific scenarios.
 94
 95    workers: Optional[int], default None
 96        If provided and the instance connector is thread-safe
 97        (`pipe.instance_connector.IS_THREAD_SAFE is True`),
 98        limit concurrent sync to this many threads.
 99
100    callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
101        Callback function which expects a SuccessTuple as input.
102        Only applies when `blocking=False`.
103
104    error_callback: Optional[Callable[[Exception], Any]], default None
105        Callback function which expects an Exception as input.
106        Only applies when `blocking=False`.
107
108    chunksize: int, default -1
109        Specify the number of rows to sync per chunk.
110        If `-1`, resort to system configuration (default is `900`).
111        A `chunksize` of `None` will sync all rows in one transaction.
112
113    sync_chunks: bool, default True
114        If possible, sync chunks while fetching them into memory.
115
116    debug: bool, default False
117        Verbosity toggle. Defaults to False.
118
119    Returns
120    -------
121    A `SuccessTuple` of success (`bool`) and message (`str`).
122    """
123    from meerschaum.utils.debug import dprint, _checkpoint
124    from meerschaum.connectors import custom_types
125    from meerschaum.plugins import Plugin
126    from meerschaum.utils.formatting import get_console
127    from meerschaum.utils.venv import Venv
128    from meerschaum.connectors import get_connector_plugin
129    from meerschaum.utils.misc import df_is_chunk_generator
130    from meerschaum.utils.pool import get_pool
131    from meerschaum.config import get_config
132
133    if (callback is not None or error_callback is not None) and blocking:
134        warn("Callback functions are only executed when blocking = False. Ignoring...")
135
136    _checkpoint(_total=2, **kw)
137
138    if chunksize == 0:
139        chunksize = None
140        sync_chunks = False
141
142    kw.update({
143        'begin': begin,
144        'end': end,
145        'force': force,
146        'retries': retries,
147        'min_seconds': min_seconds,
148        'check_existing': check_existing,
149        'blocking': blocking,
150        'workers': workers,
151        'callback': callback,
152        'error_callback': error_callback,
153        'sync_chunks': sync_chunks,
154        'chunksize': chunksize,
155    })
156
157    ### NOTE: Invalidate `_exists` cache before and after syncing.
158    self._exists = None
159
160    def _sync(
161        p: 'meerschaum.Pipe',
162        df: Union[
163            'pd.DataFrame',
164            Dict[str, List[Any]],
165            List[Dict[str, Any]],
166            InferFetch
167        ] = InferFetch,
168    ) -> SuccessTuple:
169        if df is None:
170            p._exists = None
171            return (
172                False,
173                f"You passed `None` instead of data into `sync()` for {p}.\n"
174                + "Omit the DataFrame to infer fetching.",
175            )
176        ### Ensure that Pipe is registered.
177        if not p.temporary and p.get_id(debug=debug) is None:
178            ### NOTE: This may trigger an interactive session for plugins!
179            register_success, register_msg = p.register(debug=debug)
180            if not register_success:
181                if 'already' not in register_msg:
182                    p._exists = None
183                    return register_success, register_msg
184
185        ### If connector is a plugin with a `sync()` method, return that instead.
186        ### If the plugin does not have a `sync()` method but does have a `fetch()` method,
187        ### use that instead.
188        ### NOTE: The DataFrame must be omitted for the plugin sync method to apply.
189        ### If a DataFrame is provided, continue as expected.
190        if hasattr(df, 'MRSM_INFER_FETCH'):                   
191            try:
192                if p.connector is None:
193                    msg = f"{p} does not have a valid connector."
194                    if p.connector_keys.startswith('plugin:'):
195                        msg += f"\n    Perhaps {p.connector_keys} has a syntax error?"
196                    p._exists = None
197                    return False, msg
198            except Exception as e:
199                p._exists = None
200                return False, f"Unable to create the connector for {p}."
201
202            ### Sync in place if this is a SQL pipe.
203            if (
204                str(self.connector) == str(self.instance_connector)
205                and 
206                hasattr(self.instance_connector, 'sync_pipe_inplace')
207                and
208                _inplace
209                and
210                get_config('system', 'experimental', 'inplace_sync')
211            ):
212                with Venv(get_connector_plugin(self.instance_connector)):
213                    p._exists = None
214                    return self.instance_connector.sync_pipe_inplace(p, debug=debug, **kw)
215
216
217            ### Activate and invoke `sync(pipe)` for plugin connectors with `sync` methods.
218            try:
219                if p.connector.type == 'plugin' and p.connector.sync is not None:
220                    connector_plugin = Plugin(p.connector.label)
221                    with Venv(connector_plugin, debug=debug):
222                        return_tuple = p.connector.sync(p, debug=debug, **kw)
223                    p._exists = None
224                    if not isinstance(return_tuple, tuple):
225                        return_tuple = (
226                            False,
227                            f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}"
228                        )
229                    return return_tuple
230
231            except Exception as e:
232                get_console().print_exception()
233                msg = f"Failed to sync {p} with exception: '" + str(e) + "'"
234                if debug:
235                    error(msg, silent=False)
236                p._exists = None
237                return False, msg
238
239            ### Fetch the dataframe from the connector's `fetch()` method.
240            try:
241                with Venv(get_connector_plugin(p.connector), debug=debug):
242                    df = p.fetch(debug=debug, **kw)
243
244            except Exception as e:
245                get_console().print_exception(
246                    suppress = [
247                        'meerschaum/core/Pipe/_sync.py', 
248                        'meerschaum/core/Pipe/_fetch.py', 
249                    ]
250                )
251                msg = f"Failed to fetch data from {p.connector}:\n    {e}"
252                df = None
253
254            if df is None:
255                p._exists = None
256                return False, f"No data were fetched for {p}."
257
258            if isinstance(df, list):
259                if len(df) == 0:
260                    return True, f"No new rows were returned for {p}."
261
262                ### May be a chunk hook results list.
263                if isinstance(df[0], tuple):
264                    success = all([_success for _success, _ in df])
265                    message = '\n'.join([_message for _, _message in df])
266                    return success, message
267
268            ### TODO: Depreciate async?
269            if df is True:
270                p._exists = None
271                return True, f"{p} is being synced in parallel."
272
273        ### CHECKPOINT: Retrieved the DataFrame.
274        _checkpoint(**kw)
275        
276        ### Allow for dataframe generators or iterables.
277        if df_is_chunk_generator(df):
278            kw['workers'] = p.get_num_workers(kw.get('workers', None))
279            dt_col = p.columns.get('datetime', None)
280            pool = get_pool(workers=kw.get('workers', 1))
281            if debug:
282                dprint(f"Received {type(df)}. Attempting to sync first chunk...")
283
284            try:
285                chunk = next(df)
286            except StopIteration:
287                return True, "Received an empty generator; nothing to do."
288
289            chunk_success, chunk_msg = _sync(p, chunk)
290            chunk_msg = '\n' + self._get_chunk_label(chunk, dt_col) + '\n' + chunk_msg
291            if not chunk_success:
292                return chunk_success, f"Unable to sync initial chunk for {p}:\n{chunk_msg}"
293            if debug:
294                dprint(f"Successfully synced the first chunk, attemping the rest...")
295
296            failed_chunks = []
297            def _process_chunk(_chunk):
298                try:
299                    _chunk_success, _chunk_msg = _sync(p, _chunk)
300                except Exception as e:
301                    _chunk_success, _chunk_msg = False, str(e)
302                if not _chunk_success:
303                    failed_chunks.append(_chunk)
304                return (
305                    _chunk_success,
306                    (
307                        '\n'
308                        + self._get_chunk_label(_chunk, dt_col)
309                        + '\n'
310                        + _chunk_msg
311                    )
312                )
313
314
315            results = sorted(
316                [(chunk_success, chunk_msg)] + (
317                    list(pool.imap(_process_chunk, df))
318                    if not df_is_chunk_generator(chunk)
319                    else [
320                        _process_chunk(_child_chunks)
321                        for _child_chunks in df
322                    ]
323                )
324            )
325            chunk_messages = [chunk_msg for _, chunk_msg in results]
326            success_bools = [chunk_success for chunk_success, _ in results]
327            success = all(success_bools)
328            msg = '\n'.join(chunk_messages)
329
330            ### If some chunks succeeded, retry the failures.
331            retry_success = True
332            if not success and any(success_bools):
333                if debug:
334                    dprint(f"Retrying failed chunks...")
335                chunks_to_retry = [c for c in failed_chunks]
336                failed_chunks = []
337                for chunk in chunks_to_retry:
338                    chunk_success, chunk_msg = _process_chunk(chunk)
339                    msg += f"\n\nRetried chunk:\n{chunk_msg}\n"
340                    retry_success = retry_success and chunk_success
341
342            success = success and retry_success
343            return success, msg
344
345        ### Cast to a dataframe and ensure datatypes are what we expect.
346        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)
347        if debug:
348            dprint(
349                "DataFrame to sync:\n"
350                + (
351                    str(df)[:255]
352                    + '...'
353                    if len(str(df)) >= 256
354                    else str(df)
355                ),
356                **kw
357            )
358
359        ### if force, continue to sync until success
360        return_tuple = False, f"Did not sync {p}."
361        run = True
362        _retries = 1
363        while run:
364            with Venv(get_connector_plugin(self.instance_connector)):
365                return_tuple = p.instance_connector.sync_pipe(
366                    pipe = p,
367                    df = df,
368                    debug = debug,
369                    **kw
370                )
371            _retries += 1
372            run = (not return_tuple[0]) and force and _retries <= retries
373            if run and debug:
374                dprint(f"Syncing failed for {p}. Attempt ( {_retries} / {retries} )", **kw)
375                dprint(f"Sleeping for {min_seconds} seconds...", **kw)
376                time.sleep(min_seconds)
377            if _retries > retries:
378                warn(
379                    f"Unable to sync {p} within {retries} attempt" +
380                        ("s" if retries != 1 else "") + "!"
381                )
382
383        ### CHECKPOINT: Finished syncing. Handle caching.
384        _checkpoint(**kw)
385        if self.cache_pipe is not None:
386            if debug:
387                dprint(f"Caching retrieved dataframe.", **kw)
388                _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw)
389                if not _sync_cache_tuple[0]:
390                    warn(f"Failed to sync local cache for {self}.")
391
392        self._exists = None
393        return return_tuple
394
395    if blocking:
396        self._exists = None
397        return _sync(self, df = df)
398
399    from meerschaum.utils.threading import Thread
400    def default_callback(result_tuple : SuccessTuple):
401        dprint(f"Asynchronous result from {self}: {result_tuple}", **kw)
402
403    def default_error_callback(x : Exception):
404        dprint(f"Error received for {self}: {x}", **kw)
405
406    if callback is None and debug:
407        callback = default_callback
408    if error_callback is None and debug:
409        error_callback = default_error_callback
410    try:
411        thread = Thread(
412            target = _sync,
413            args = (self,),
414            kwargs = {'df' : df},
415            daemon = False,
416            callback = callback,
417            error_callback = error_callback
418        )
419        thread.start()
420    except Exception as e:
421        self._exists = None
422        return False, str(e)
423
424    self._exists = None
425    return True, f"Spawned asynchronous sync for {self}."

Fetch new data from the source and update the pipe's table with new data.

Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.

Parameters
  • df (Union[None, pd.DataFrame, Dict[str, List[Any]]], default None): An optional DataFrame to sync into the pipe. Defaults to None.
  • begin (Union[datetime, int, str, None], default ''): Optionally specify the earliest datetime to search for data.
  • end (Union[datetime, int, str, None], default None): Optionally specify the latest datetime to search for data.
  • force (bool, default False): If True, keep trying to sync until retries attempts are exhausted.
  • retries (int, default 10): If force, how many attempts to try syncing before declaring failure.
  • min_seconds (Union[int, float], default 1): If force, how many seconds to sleep between retries. Defaults to 1.
  • check_existing (bool, default True): If True, pull and diff with existing data from the pipe.
  • blocking (bool, default True): If True, wait for the sync to finish and return its result; otherwise sync asynchronously and return success immediately. Defaults to True. Only intended for specific scenarios.
  • workers (Optional[int], default None): If provided and the instance connector is thread-safe (pipe.instance_connector.IS_THREAD_SAFE is True), limit concurrent sync to this many threads.
  • callback (Optional[Callable[[Tuple[bool, str]], Any]], default None): Callback function which expects a SuccessTuple as input. Only applies when blocking=False.
  • error_callback (Optional[Callable[[Exception], Any]], default None): Callback function which expects an Exception as input. Only applies when blocking=False.
  • chunksize (int, default -1): Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction.
  • sync_chunks (bool, default True): If possible, sync chunks while fetching them into memory.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
Returns
  • A SuccessTuple of success (bool) and message (str).
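The interplay of `force`, `retries`, and `min_seconds` above can be sketched as a plain retry loop. This is an illustrative simplification, not the actual implementation; `sync_fn` is a hypothetical stand-in for a single sync attempt returning a SuccessTuple:

```python
import time

def sync_with_retries(sync_fn, force=False, retries=10, min_seconds=1):
    """Keep calling `sync_fn` until it succeeds or `retries` attempts are exhausted."""
    _retries = 0
    success, msg = False, "Did not sync."
    run = True
    while run:
        success, msg = sync_fn()
        _retries += 1
        # Retry only on failure, only when forced, and only within the budget.
        run = (not success) and force and _retries <= retries
        if run:
            time.sleep(min_seconds)
    return success, msg
```

Without `force=True`, a failed attempt is not retried, matching the documented default behavior.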
def get_sync_time( self, params: Optional[Dict[str, Any]] = None, newest: bool = True, round_down: bool = False, debug: bool = False) -> Optional[datetime.datetime]:
428def get_sync_time(
429        self,
430        params: Optional[Dict[str, Any]] = None,
431        newest: bool = True,
432        round_down: bool = False, 
433        debug: bool = False
434    ) -> Union['datetime', None]:
435    """
436    Get the most recent datetime value for a Pipe.
437
438    Parameters
439    ----------
440    params: Optional[Dict[str, Any]], default None
441        Dictionary to build a WHERE clause for a specific column.
442        See `meerschaum.utils.sql.build_where`.
443
444    newest: bool, default True
445        If `True`, get the most recent datetime (honoring `params`).
446        If `False`, get the oldest datetime (`ASC` instead of `DESC`).
447
448    round_down: bool, default False
449        If `True`, round down the datetime value to the nearest minute.
450
451    debug: bool, default False
452        Verbosity toggle.
453
454    Returns
455    -------
456    A `datetime` object if the pipe exists, otherwise `None`.
457
458    """
459    from meerschaum.utils.venv import Venv
460    from meerschaum.connectors import get_connector_plugin
461    from meerschaum.utils.misc import round_time
462
463    with Venv(get_connector_plugin(self.instance_connector)):
464        sync_time = self.instance_connector.get_sync_time(
465            self,
466            params = params,
467            newest = newest,
468            debug = debug,
469        )
470
471    if not round_down or not isinstance(sync_time, datetime):
472        return sync_time
473
474    return round_time(sync_time, timedelta(minutes=1))

Get the most recent datetime value for a Pipe.

Parameters
  • params (Optional[Dict[str, Any]], default None): Dictionary to build a WHERE clause for a specific column. See meerschaum.utils.sql.build_where.
  • newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
  • round_down (bool, default False): If True, round down the datetime value to the nearest minute.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A datetime object if the pipe exists, otherwise None.
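The `round_down=True` behavior (rounding to the nearest minute) can be approximated with a small epoch-based helper. This is a sketch, not `meerschaum.utils.misc.round_time` itself:

```python
from datetime import datetime, timedelta

def round_time_down(dt, interval=timedelta(minutes=1)):
    """Round `dt` down to the nearest multiple of `interval`."""
    epoch = datetime(1970, 1, 1, tzinfo=dt.tzinfo)
    step = interval.total_seconds()
    seconds = (dt - epoch).total_seconds()
    # Floor-divide to the previous interval boundary.
    return epoch + timedelta(seconds=(seconds // step) * step)
```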
def exists(self, debug: bool = False) -> bool:
477def exists(
478        self,
479        debug : bool = False
480    ) -> bool:
481    """
482    See if a Pipe's table exists.
483
484    Parameters
485    ----------
486    debug: bool, default False
487        Verbosity toggle.
488
489    Returns
490    -------
491    A `bool` corresponding to whether a pipe's underlying table exists.
492
493    """
494    import time
495    from meerschaum.utils.venv import Venv
496    from meerschaum.connectors import get_connector_plugin
497    from meerschaum.config import STATIC_CONFIG
498    from meerschaum.utils.debug import dprint
499    now = time.perf_counter()
500    exists_timeout_seconds = STATIC_CONFIG['pipes']['exists_timeout_seconds']
501
502    _exists = self.__dict__.get('_exists', None)
503    if _exists:
504        exists_timestamp = self.__dict__.get('_exists_timestamp', None)
505        if exists_timestamp is not None:
506            delta = now - exists_timestamp
507            if delta < exists_timeout_seconds:
508                if debug:
509                    dprint(f"Returning cached `exists` for {self} ({round(delta, 2)} seconds old).")
510                return _exists
511
512    with Venv(get_connector_plugin(self.instance_connector)):
513        _exists = self.instance_connector.pipe_exists(pipe=self, debug=debug)
514
515    self.__dict__['_exists'] = _exists
516    self.__dict__['_exists_timestamp'] = now
517    return _exists

See if a Pipe's table exists.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool corresponding to whether a pipe's underlying table exists.
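The short-lived result cache above (`_exists` plus `_exists_timestamp`, compared against a timeout) can be sketched as a small wrapper. `check_fn` and `timeout_seconds` are illustrative stand-ins for the connector's `pipe_exists()` call and the configured `exists_timeout_seconds`:

```python
import time

class CachedExists:
    """Reuse a positive existence check until it is older than `timeout_seconds`."""

    def __init__(self, check_fn, timeout_seconds=5.0):
        self.check_fn = check_fn
        self.timeout_seconds = timeout_seconds
        self._exists = None
        self._exists_timestamp = None

    def __call__(self):
        now = time.perf_counter()
        if self._exists and self._exists_timestamp is not None:
            if (now - self._exists_timestamp) < self.timeout_seconds:
                return self._exists  # Cache hit: skip the connector query.
        self._exists = self.check_fn()
        self._exists_timestamp = now
        return self._exists
```

Note that, as in the source above, only truthy results are served from cache; a falsy result is re-checked on every call.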
def filter_existing( self, df: "'pd.DataFrame'", safe_copy: bool = True, date_bound_only: bool = False, chunksize: Optional[int] = -1, debug: bool = False, **kw) -> "Tuple['pd.DataFrame', 'pd.DataFrame', 'pd.DataFrame']":
520def filter_existing(
521        self,
522        df: 'pd.DataFrame',
523        safe_copy: bool = True,
524        date_bound_only: bool = False,
525        chunksize: Optional[int] = -1,
526        debug: bool = False,
527        **kw
528    ) -> Tuple['pd.DataFrame', 'pd.DataFrame', 'pd.DataFrame']:
529    """
530    Inspect a dataframe and filter out rows which already exist in the pipe.
531
532    Parameters
533    ----------
534    df: 'pd.DataFrame'
535        The dataframe to inspect and filter.
536        
537    safe_copy: bool, default True
538        If `True`, create a copy before comparing and modifying the dataframes.
539        Setting to `False` may mutate the DataFrames.
540        See `meerschaum.utils.dataframe.filter_unseen_df`.
541
542    date_bound_only: bool, default False
543        If `True`, only use the datetime index to fetch the sample dataframe.
544
545    chunksize: Optional[int], default -1
546        The `chunksize` used when fetching existing data.
547
548    debug: bool, default False
549        Verbosity toggle.
550
551    Returns
552    -------
553    A tuple of three pandas DataFrames: unseen, update, and delta.
554    """
555    from meerschaum.utils.warnings import warn
556    from meerschaum.utils.debug import dprint
557    from meerschaum.utils.packages import attempt_import, import_pandas
558    from meerschaum.utils.misc import round_time
559    from meerschaum.utils.dataframe import (
560        filter_unseen_df,
561        add_missing_cols_to_df,
562        get_unhashable_cols,
563        get_numeric_cols,
564    )
565    from meerschaum.utils.dtypes import (
566        to_pandas_dtype,
567        none_if_null,
568    )
569    from meerschaum.config import get_config
570    pd = import_pandas()
571    pandas = attempt_import('pandas')
572    if 'dataframe' not in str(type(df)).lower():
573        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)
574    is_dask = 'dask' in df.__module__
575    if is_dask:
576        dd = attempt_import('dask.dataframe')
577        merge = dd.merge
578        NA = pandas.NA
579    else:
580        merge = pd.merge
581        NA = pd.NA
582    if df is None:
583        return df, df, df
584    if (df.empty if not is_dask else len(df) == 0):
585        return df, df, df
586
587    ### begin is the oldest data in the new dataframe
588    begin, end = None, None
589    dt_col = self.columns.get('datetime', None)
590    dt_type = self.dtypes.get(dt_col, 'datetime64[ns]') if dt_col else None
591    try:
592        min_dt_val = df[dt_col].min(skipna=True) if dt_col else None
593        if is_dask and min_dt_val is not None:
594            min_dt_val = min_dt_val.compute()
595        min_dt = (
596            pandas.to_datetime(min_dt_val).to_pydatetime()
597            if min_dt_val is not None and 'datetime' in str(dt_type)
598            else min_dt_val
599        )
600    except Exception as e:
601        min_dt = None
602    if not ('datetime' in str(type(min_dt))) or str(min_dt) == 'NaT':
603        if 'int' not in str(type(min_dt)).lower():
604            min_dt = None
605
606    if isinstance(min_dt, datetime):
607        begin = (
608            round_time(
609                min_dt,
610                to = 'down'
611            ) - timedelta(minutes=1)
612        )
613    elif dt_type and 'int' in dt_type.lower():
614        begin = min_dt
615    elif dt_col is None:
616        begin = None
617
618    ### end is the newest data in the new dataframe
619    try:
620        max_dt_val = df[dt_col].max(skipna=True) if dt_col else None
621        if is_dask and max_dt_val is not None:
622            max_dt_val = max_dt_val.compute()
623        max_dt = (
624            pandas.to_datetime(max_dt_val).to_pydatetime()
625            if max_dt_val is not None and 'datetime' in str(dt_type)
626            else max_dt_val
627        )
628    except Exception as e:
629        import traceback
630        traceback.print_exc()
631        max_dt = None
632
633    if ('datetime' not in str(type(max_dt))) or str(max_dt) == 'NaT':
634        if 'int' not in str(type(max_dt)).lower():
635            max_dt = None
636
637    if isinstance(max_dt, datetime):
638        end = (
639            round_time(
640                max_dt,
641                to = 'down'
642            ) + timedelta(minutes=1)
643        )
644    elif dt_type and 'int' in dt_type.lower():
645        end = max_dt + 1
646
647    if max_dt is not None and min_dt is not None and min_dt > max_dt:
648        warn("Detected minimum datetime greater than maximum datetime.")
649
650    if begin is not None and end is not None and begin > end:
651        if isinstance(begin, datetime):
652            begin = end - timedelta(minutes=1)
653        ### We might be using integers for the datetime axis.
654        else:
655            begin = end - 1
656
657    unique_index_vals = {
658        col: df[col].unique()
659        for col in self.columns
660        if col in df.columns and col != dt_col
661    } if not date_bound_only else {}
662    filter_params_index_limit = get_config('pipes', 'sync', 'filter_params_index_limit')
663    _ = kw.pop('params', None)
664    params = {
665        col: [
666            none_if_null(val)
667            for val in unique_vals
668        ]
669        for col, unique_vals in unique_index_vals.items()
670        if len(unique_vals) <= filter_params_index_limit
671    } if not date_bound_only else {}
672
673    if debug:
674        dprint(f"Looking at data between '{begin}' and '{end}':", **kw)
675
676    backtrack_df = self.get_data(
677        begin = begin,
678        end = end,
679        chunksize = chunksize,
680        params = params,
681        debug = debug,
682        **kw
683    )
684    if debug:
685        dprint(f"Existing data for {self}:\n" + str(backtrack_df), **kw)
686        dprint(f"Existing dtypes for {self}:\n" + str(backtrack_df.dtypes))
687
688    ### Separate new rows from changed ones.
689    on_cols = [
690        col for col_key, col in self.columns.items()
691        if (
692            col
693            and
694            col_key != 'value'
695            and col in backtrack_df.columns
696        )
697    ]
698    self_dtypes = self.dtypes
699    on_cols_dtypes = {
700        col: to_pandas_dtype(typ)
701        for col, typ in self_dtypes.items()
702        if col in on_cols
703    }
704
705    ### Detect changes between the old target and new source dataframes.
706    delta_df = add_missing_cols_to_df(
707        filter_unseen_df(
708            backtrack_df,
709            df,
710            dtypes = {
711                col: to_pandas_dtype(typ)
712                for col, typ in self_dtypes.items()
713            },
714            safe_copy = safe_copy,
715            debug = debug
716        ),
717        on_cols_dtypes,
718    )
719
720    ### Cast dicts or lists to strings so we can merge.
721    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
722    def deserializer(x):
723        return json.loads(x) if isinstance(x, str) else x
724
725    unhashable_delta_cols = get_unhashable_cols(delta_df)
726    unhashable_backtrack_cols = get_unhashable_cols(backtrack_df)
727    for col in unhashable_delta_cols:
728        delta_df[col] = delta_df[col].apply(serializer)
729    for col in unhashable_backtrack_cols:
730        backtrack_df[col] = backtrack_df[col].apply(serializer)
731    casted_cols = set(unhashable_delta_cols + unhashable_backtrack_cols)
732
733    joined_df = merge(
734        delta_df.fillna(NA),
735        backtrack_df.fillna(NA),
736        how = 'left',
737        on = on_cols,
738        indicator = True,
739        suffixes = ('', '_old'),
740    ) if on_cols else delta_df
741    for col in casted_cols:
742        if col in joined_df.columns:
743            joined_df[col] = joined_df[col].apply(deserializer)
744        if col in delta_df.columns:
745            delta_df[col] = delta_df[col].apply(deserializer)
746
747    ### Determine which rows are completely new.
748    new_rows_mask = (joined_df['_merge'] == 'left_only') if on_cols else None
749    cols = list(backtrack_df.columns)
750
751    unseen_df = (
752        (
753            joined_df
754            .where(new_rows_mask)
755            .dropna(how='all')[cols]
756            .reset_index(drop=True)
757        ) if not is_dask else (
758            joined_df
759            .where(new_rows_mask)
760            .dropna(how='all')[cols]
761            .reset_index(drop=True)
762        )
763    ) if on_cols else delta_df
764
765    ### Rows that have already been inserted but values have changed.
766    update_df = (
767        joined_df
768        .where(~new_rows_mask)
769        .dropna(how='all')[cols]
770        .reset_index(drop=True)
771    ) if on_cols else None
772
773    return unseen_df, update_df, delta_df

Inspect a dataframe and filter out rows which already exist in the pipe.

Parameters
  • df ('pd.DataFrame'): The dataframe to inspect and filter.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames. See meerschaum.utils.dataframe.filter_unseen_df.
  • date_bound_only (bool, default False): If True, only use the datetime index to fetch the sample dataframe.
  • chunksize (Optional[int], default -1): The chunksize used when fetching existing data.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A tuple of three pandas DataFrames: unseen, update, and delta.
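The core of the unseen/update split is a left join with pandas' `indicator` flag, as in the source above. Here is a minimal, self-contained sketch of that step (simplified: no dtype enforcement or unhashable-column serialization):

```python
import pandas as pd

def split_unseen_and_updates(delta_df, backtrack_df, on_cols):
    """Split delta rows into brand-new rows and updates to existing rows."""
    joined_df = delta_df.merge(
        backtrack_df,
        how='left',
        on=on_cols,
        indicator=True,          # Adds a `_merge` column.
        suffixes=('', '_old'),
    )
    # Rows with no match in the existing data are completely new.
    new_rows_mask = joined_df['_merge'] == 'left_only'
    cols = list(delta_df.columns)
    unseen_df = joined_df[new_rows_mask][cols].reset_index(drop=True)
    update_df = joined_df[~new_rows_mask][cols].reset_index(drop=True)
    return unseen_df, update_df
```

Rows in `update_df` share index values with existing rows but carry changed values, so they are applied as updates rather than inserts.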
def get_num_workers(self, workers: Optional[int] = None) -> int:
798def get_num_workers(self, workers: Optional[int] = None) -> int:
799    """
800    Get the number of workers to use for concurrent syncs.
801
802    Parameters
803    ----------
804    workers: Optional[int], default None
805        The number of workers passed via `--workers`.
806    Returns
807    -------
808    The number of workers, capped for safety.
809    """
810    is_thread_safe = getattr(self.instance_connector, 'IS_THREAD_SAFE', False)
811    if not is_thread_safe:
812        return 1
813
814    engine_pool_size = (
815        self.instance_connector.engine.pool.size()
816        if self.instance_connector.type == 'sql'
817        else None
818    )
819    current_num_threads = threading.active_count()
820    current_num_connections = (
821        self.instance_connector.engine.pool.checkedout()
822        if engine_pool_size is not None
823        else current_num_threads
824    )
825    desired_workers = (
826        min(workers or engine_pool_size, engine_pool_size)
827        if engine_pool_size is not None
828        else workers
829    )
830    if desired_workers is None:
831        desired_workers = (multiprocessing.cpu_count() if is_thread_safe else 1)
832
833    return max(
834        (desired_workers - current_num_connections),
835        1,
836    )

Get the number of workers to use for concurrent syncs.

Parameters
  • workers (Optional[int], default None): The number of workers passed via --workers.
Returns
  • The number of workers, capped for safety.
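The capping arithmetic can be restated as a pure function. This is a sketch; `current_connections` is a stand-in for the live pool checkout count queried from the engine:

```python
import multiprocessing

def cap_workers(workers=None, engine_pool_size=None,
                current_connections=0, is_thread_safe=True):
    """Cap the requested worker count by the pool size; always return >= 1."""
    if not is_thread_safe:
        return 1
    desired_workers = (
        min(workers or engine_pool_size, engine_pool_size)
        if engine_pool_size is not None
        else workers
    )
    if desired_workers is None:
        desired_workers = multiprocessing.cpu_count()
    # Leave headroom for connections already checked out.
    return max(desired_workers - current_connections, 1)
```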
def verify( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, bounded: Optional[bool] = None, deduplicate: bool = False, workers: Optional[int] = None, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
 15def verify(
 16        self,
 17        begin: Union[datetime, int, None] = None,
 18        end: Union[datetime, int, None] = None,
 19        params: Optional[Dict[str, Any]] = None,
 20        chunk_interval: Union[timedelta, int, None] = None,
 21        bounded: Optional[bool] = None,
 22        deduplicate: bool = False,
 23        workers: Optional[int] = None,
 24        debug: bool = False,
 25        **kwargs: Any
 26    ) -> SuccessTuple:
 27    """
 28    Verify the contents of the pipe by resyncing its interval.
 29
 30    Parameters
 31    ----------
 32    begin: Union[datetime, int, None], default None
 33        If specified, only verify rows greater than or equal to this value.
 34
 35    end: Union[datetime, int, None], default None
 36        If specified, only verify rows less than this value.
 37
 38    chunk_interval: Union[timedelta, int, None], default None
 39        If provided, use this as the size of the chunk boundaries.
 40        Default to the value set in `pipe.parameters['chunk_minutes']` (1440).
 41
 42    bounded: Optional[bool], default None
 43        If `True`, do not verify older than the oldest sync time or newer than the newest.
 44        If `False`, verify unbounded syncs outside of the new and old sync times.
 45        The default behavior (`None`) is to bound only if a bound interval is set
 46        (e.g. `pipe.parameters['verify']['bound_days']`).
 47
 48    deduplicate: bool, default False
 49        If `True`, deduplicate the pipe's table after the verification syncs.
 50
 51    workers: Optional[int], default None
 52        If provided, limit the verification to this many threads.
 53        Use a value of `1` to sync chunks in series.
 54
 55    debug: bool, default False
 56        Verbosity toggle.
 57
 58    kwargs: Any
 59        All keyword arguments are passed to `pipe.sync()`.
 60
 61    Returns
 62    -------
 63    A SuccessTuple indicating whether the pipe was successfully resynced.
 64    """
 65    from meerschaum.utils.pool import get_pool
 66    from meerschaum.utils.misc import interval_str
 67    workers = self.get_num_workers(workers)
 68
 69    ### Skip configured bounding in parameters
 70    ### if `bounded` is explicitly `False`.
 71    bound_time = (
 72        self.get_bound_time(debug=debug)
 73        if bounded is not False
 74        else None
 75    )
 76    if bounded is None:
 77        bounded = bound_time is not None
 78
 79    if bounded and begin is None:
 80        begin = (
 81            bound_time
 82            if bound_time is not None
 83            else self.get_sync_time(newest=False, debug=debug)
 84        )
 85    if bounded and end is None:
 86        end = self.get_sync_time(newest=True, debug=debug)
 87
 88    if bounded and end is not None:
 89        end += (
 90            timedelta(minutes=1)
 91            if isinstance(end, datetime)
 92            else 1
 93        )
 94
 95    sync_less_than_begin = not bounded and begin is None
 96    sync_greater_than_end = not bounded and end is None
 97
 98    cannot_determine_bounds = not self.exists(debug=debug)
 99
100    if cannot_determine_bounds:
101        sync_success, sync_msg = self.sync(
102            begin = begin,
103            end = end,
104            params = params,
105            workers = workers,
106            debug = debug,
107            **kwargs
108        )
109        if not sync_success:
110            return sync_success, sync_msg
111        if deduplicate:
112            return self.deduplicate(
113                begin = begin,
114                end = end,
115                params = params,
116                workers = workers,
117                debug = debug,
118                **kwargs
119            )
120        return sync_success, sync_msg
121
122
123    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
124    chunk_bounds = self.get_chunk_bounds(
125        begin = begin,
126        end = end,
127        chunk_interval = chunk_interval,
128        bounded = bounded,
129        debug = debug,
130    )
131
132    ### Consider it a success if no chunks need to be verified.
133    if not chunk_bounds:
134        if deduplicate:
135            return self.deduplicate(
136                begin = begin,
137                end = end,
138                params = params,
139                workers = workers,
140                debug = debug,
141                **kwargs
142            )
143        return True, f"Could not determine chunks between '{begin}' and '{end}'; nothing to do."
144
145    begin_to_print = (
146        begin
147        if begin is not None
148        else (
149            chunk_bounds[0][0]
150            if bounded
151            else chunk_bounds[0][1]
152        )
153    )
154    end_to_print = (
155        end
156        if end is not None
157        else (
158            chunk_bounds[-1][1]
159            if bounded
160            else chunk_bounds[-1][0]
161        )
162    )
163
164    info(
165        f"Syncing {len(chunk_bounds)} chunk" + ('s' if len(chunk_bounds) != 1 else '')
166        + f" ({'un' if not bounded else ''}bounded)"
167        + f" of size '{interval_str(chunk_interval)}'"
168        + f" between '{begin_to_print}' and '{end_to_print}'."
169    )
170
171    pool = get_pool(workers=workers)
172
173    ### Dictionary of the form bounds -> success_tuple, e.g.:
174    ### {
175    ###    (2023-01-01, 2023-01-02): (True, "Success")
176    ### }
177    bounds_success_tuples = {}
178    def process_chunk_bounds(
179            chunk_begin_and_end: Tuple[
180                Union[int, datetime],
181                Union[int, datetime]
182            ]
183        ):
184        if chunk_begin_and_end in bounds_success_tuples:
185            return chunk_begin_and_end, bounds_success_tuples[chunk_begin_and_end]
186
187        chunk_begin, chunk_end = chunk_begin_and_end
188        return chunk_begin_and_end, self.sync(
189            begin = chunk_begin,
190            end = chunk_end,
191            params = params,
192            workers = workers,
193            debug = debug,
194            **kwargs
195        )
196
197    ### If we have more than one chunk, attempt to sync the first one and return if it fails.
198    if len(chunk_bounds) > 1:
199        first_chunk_bounds = chunk_bounds[0]
200        (
201            (first_begin, first_end),
202            (first_success, first_msg)
203        ) = process_chunk_bounds(first_chunk_bounds)
204        if not first_success:
205            return (
206                first_success,
207                f"\n{first_begin} - {first_end}\n"
208                + f"Failed to sync first chunk:\n{first_msg}"
209            )
210        bounds_success_tuples[first_chunk_bounds] = (first_success, first_msg)
211
212    bounds_success_tuples.update(dict(pool.map(process_chunk_bounds, chunk_bounds)))
213    bounds_success_bools = {bounds: tup[0] for bounds, tup in bounds_success_tuples.items()}
214
215    message_header = f"{begin_to_print} - {end_to_print}"
216    if all(bounds_success_bools.values()):
217        msg = get_chunks_success_message(bounds_success_tuples, header=message_header)
218        if deduplicate:
219            deduplicate_success, deduplicate_msg = self.deduplicate(
220                begin = begin,
221                end = end,
222                params = params,
223                workers = workers,
224                debug = debug,
225                **kwargs
226            )
227            return deduplicate_success, msg + '\n\n' + deduplicate_msg
228        return True, msg
229
230    chunk_bounds_to_resync = [
231        bounds
232        for bounds, success in bounds_success_bools.items()
233        if not success
234    ]
235    bounds_to_print = [
236        f"{bounds[0]} - {bounds[1]}"
237        for bounds in chunk_bounds_to_resync
238    ]
239    if bounds_to_print:
240        warn(
241            f"Will resync the following failed chunks:\n    "
242            + '\n    '.join(bounds_to_print),
243            stack = False,
244        )
245
246    retry_bounds_success_tuples = dict(pool.map(process_chunk_bounds, chunk_bounds_to_resync))
247    bounds_success_tuples.update(retry_bounds_success_tuples)
248    retry_bounds_success_bools = {
249        bounds: tup[0]
250        for bounds, tup in retry_bounds_success_tuples.items()
251    }
252
253    if all(retry_bounds_success_bools.values()):
254        message = (
255            get_chunks_success_message(bounds_success_tuples, header=message_header)
256            + f"\nRetried {len(chunk_bounds_to_resync)} chunks."
257        )
258        if deduplicate:
259            deduplicate_success, deduplicate_msg = self.deduplicate(
260                begin = begin,
261                end = end,
262                params = params,
263                workers = workers,
264                debug = debug,
265                **kwargs
266            )
267            return deduplicate_success, message + '\n\n' + deduplicate_msg
268        return True, message
269
270    message = get_chunks_success_message(bounds_success_tuples, header=message_header)
271    if deduplicate:
272        deduplicate_success, deduplicate_msg = self.deduplicate(
273            begin = begin,
274            end = end,
275            params = params,
276            workers = workers,
277            debug = debug,
278            **kwargs
279        )
280        return deduplicate_success, message + '\n\n' + deduplicate_msg
281    return False, message

Verify the contents of the pipe by resyncing its interval.

Parameters
  • begin (Union[datetime, int, None], default None): If specified, only verify rows greater than or equal to this value.
  • end (Union[datetime, int, None], default None): If specified, only verify rows less than this value.
  • chunk_interval (Union[timedelta, int, None], default None): If provided, use this as the size of the chunk boundaries. Default to the value set in pipe.parameters['chunk_minutes'] (1440).
  • bounded (Optional[bool], default None): If True, do not verify older than the oldest sync time or newer than the newest. If False, verify unbounded syncs outside of the new and old sync times. The default behavior (None) is to bound only if a bound interval is set (e.g. pipe.parameters['verify']['bound_days']).
  • deduplicate (bool, default False): If True, deduplicate the pipe's table after the verification syncs.
  • workers (Optional[int], default None): If provided, limit the verification to this many threads. Use a value of 1 to sync chunks in series.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All keyword arguments are passed to pipe.sync().
Returns
  • A SuccessTuple indicating whether the pipe was successfully resynced.
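Before syncing, `verify` splits the interval into chunk boundaries. The splitting step can be sketched as follows; this is an illustration of dividing `[begin, end)` into contiguous chunks, not the actual `get_chunk_bounds`:

```python
from datetime import datetime, timedelta

def chunk_bounds(begin, end, chunk_interval):
    """Split [begin, end) into contiguous (chunk_begin, chunk_end) pairs."""
    bounds = []
    chunk_begin = begin
    while chunk_begin < end:
        # The final chunk is clipped to `end`.
        chunk_end = min(chunk_begin + chunk_interval, end)
        bounds.append((chunk_begin, chunk_end))
        chunk_begin = chunk_end
    return bounds
```

Each pair is then synced via `pipe.sync(begin=..., end=...)`, and any chunks that fail are collected and retried once, as shown in the source above.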
def get_bound_interval(self, debug: bool = False) -> Union[datetime.timedelta, int, NoneType]:
353def get_bound_interval(self, debug: bool = False) -> Union[timedelta, int, None]:
354    """
355    Return the interval used to determine the bound time (limit for verification syncs).
356    If the datetime axis is an integer, just return its value.
357
358    Below are the supported keys for the bound interval:
359
360        - `pipe.parameters['verify']['bound_minutes']`
361        - `pipe.parameters['verify']['bound_hours']`
362        - `pipe.parameters['verify']['bound_days']`
363        - `pipe.parameters['verify']['bound_weeks']`
364        - `pipe.parameters['verify']['bound_years']`
365        - `pipe.parameters['verify']['bound_seconds']`
366
367    If multiple keys are present, the first on this priority list will be used.
368
369    Returns
370    -------
371    A `timedelta` or `int` value to be used to determine the bound time.
372    """
373    verify_params = self.parameters.get('verify', {})
374    prefix = 'bound_'
375    suffixes_to_check = ('minutes', 'hours', 'days', 'weeks', 'years', 'seconds')
376    keys_to_search = {
377        key: val
378        for key, val in verify_params.items()
379        if key.startswith(prefix)
380    }
381    bound_time_key, bound_time_value = None, None
382    for key, value in keys_to_search.items():
383        for suffix in suffixes_to_check:
384            if key == prefix + suffix:
385                bound_time_key = key
386                bound_time_value = value
387                break
388        if bound_time_key is not None:
389            break
390
391    if bound_time_value is None:
392        return bound_time_value
393
394    dt_col = self.columns.get('datetime', None)
395    if not dt_col:
396        return bound_time_value
397
398    dt_typ = self.dtypes.get(dt_col, 'datetime64[ns]')
399    if 'int' in dt_typ.lower():
400        return int(bound_time_value)
401
402    interval_type = bound_time_key.replace(prefix, '')
403    return timedelta(**{interval_type: bound_time_value})

Return the interval used to determine the bound time (limit for verification syncs). If the datetime axis is an integer, just return its value.

Below are the supported keys for the bound interval:

- `pipe.parameters['verify']['bound_minutes']`
- `pipe.parameters['verify']['bound_hours']`
- `pipe.parameters['verify']['bound_days']`
- `pipe.parameters['verify']['bound_weeks']`
- `pipe.parameters['verify']['bound_years']`
- `pipe.parameters['verify']['bound_seconds']`

If multiple keys are present, the first on this priority list will be used.

Returns
  • A timedelta or int value to be used to determine the bound time.
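The key lookup can be sketched as a priority scan over the `bound_*` keys. Note this is an illustrative sketch: `datetime.timedelta` accepts no `years` argument, so the year case here is approximated as 366 days per year (an assumption, not the library's behavior):

```python
from datetime import timedelta

def resolve_bound_interval(verify_params):
    """Return a timedelta from the first matching `bound_*` key, in priority order."""
    prefix = 'bound_'
    for suffix in ('minutes', 'hours', 'days', 'weeks', 'years', 'seconds'):
        value = verify_params.get(prefix + suffix)
        if value is None:
            continue
        if suffix == 'years':
            # `timedelta` does not accept `years`; approximate (assumption).
            return timedelta(days=366 * value)
        return timedelta(**{suffix: value})
    return None
```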
def get_bound_time(self, debug: bool = False) -> Union[datetime.datetime, int, NoneType]:
406def get_bound_time(self, debug: bool = False) -> Union[datetime, int, None]:
407    """
408    The bound time is the limit at which long-running verification syncs should stop.
409    A value of `None` means verification syncs should be unbounded.
410
411    Like deriving a backtrack time from `pipe.get_sync_time()`,
412    the bound time is the sync time minus a large window (e.g. 366 days).
413
414    Verification syncs are unbounded (i.e. `bound_time is None`)
415    if the oldest sync time falls within the bound interval of the newest.
416
417    Returns
418    -------
419    A `datetime` or `int` corresponding to the
420    `begin` bound for verification and deduplication syncs.
421    """ 
422    bound_interval = self.get_bound_interval(debug=debug)
423    if bound_interval is None:
424        return None
425
426    sync_time = self.get_sync_time(debug=debug)
427    if sync_time is None:
428        return None
429
430    bound_time = sync_time - bound_interval
431    oldest_sync_time = self.get_sync_time(newest=False, debug=debug)
432
433    return (
434        bound_time
435        if bound_time > oldest_sync_time
436        else None
437    )

The bound time is the limit at which long-running verification syncs should stop. A value of None means verification syncs should be unbounded.

Like deriving a backtrack time from pipe.get_sync_time(), the bound time is the sync time minus a large window (e.g. 366 days).

Verification syncs are unbounded (i.e. bound_time is None) if the oldest sync time falls within the bound interval.

Returns
  • A datetime or int corresponding to the
  • begin bound for verification and deduplication syncs.
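The rule above can be sketched as a standalone helper (a minimal illustration using only the standard library; `compute_bound_time` is a hypothetical name, not part of the Meerschaum API):

```python
from datetime import datetime, timedelta
from typing import Optional

def compute_bound_time(
    sync_time: Optional[datetime],
    oldest_sync_time: Optional[datetime],
    bound_interval: Optional[timedelta],
) -> Optional[datetime]:
    """Return `sync_time - bound_interval`, or None to leave the sync unbounded."""
    if bound_interval is None or sync_time is None:
        return None
    bound_time = sync_time - bound_interval
    # If all existing rows already fall within the bound interval, stay unbounded.
    if oldest_sync_time is None or bound_time <= oldest_sync_time:
        return None
    return bound_time
```

With a newest sync time of today and an oldest sync time several years back, a 366-day interval yields a concrete bound; if the oldest row is only months old, the result is `None` and the verification sync runs unbounded.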
def delete(self, drop: bool = True, debug: bool = False, **kw) -> Tuple[bool, str]:
12def delete(
13        self,
14        drop: bool = True,
15        debug: bool = False,
16        **kw
17    ) -> SuccessTuple:
18    """
19    Call the Pipe's instance connector's `delete_pipe()` method.
20
21    Parameters
22    ----------
23    drop: bool, default True
24        If `True`, drop the pipe's target table.
25
26    debug : bool, default False
27        Verbosity toggle.
28
29    Returns
30    -------
31    A `SuccessTuple` of success (`bool`), message (`str`).
32
33    """
34    import os, pathlib
35    from meerschaum.utils.warnings import warn
36    from meerschaum.utils.venv import Venv
37    from meerschaum.connectors import get_connector_plugin
38
39    if self.temporary:
40        return (
41            False,
42            "Cannot delete pipes created with `temporary=True` (read-only). "
43            + "You may want to call `pipe.drop()` instead."
44        )
45
46    if self.cache_pipe is not None:
47        _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw)
48        if not _drop_cache_tuple[0]:
49            warn(_drop_cache_tuple[1])
50        if getattr(self.cache_connector, 'flavor', None) == 'sqlite':
51            _cache_db_path = pathlib.Path(self.cache_connector.database)
52            try:
53                os.remove(_cache_db_path)
54            except Exception as e:
55                warn(f"Could not delete cache file '{_cache_db_path}' for {self}:\n{e}")
56
57    if drop:
58        drop_success, drop_msg = self.drop(debug=debug)
59        if not drop_success:
60            warn(f"Failed to drop {self}:\n{drop_msg}")
61
62    with Venv(get_connector_plugin(self.instance_connector)):
63        result = self.instance_connector.delete_pipe(self, debug=debug, **kw)
64
65    if not isinstance(result, tuple):
66        return False, f"Received an unexpected result from '{self.instance_connector}': {result}"
67
68    if result[0]:
69        to_delete = ['_id']
70        for member in to_delete:
71            if member in self.__dict__:
72                del self.__dict__[member]
73    return result

Call the Pipe's instance connector's delete_pipe() method.

Parameters
  • drop (bool, default True): If True, drop the pipe's target table.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success (bool), message (str).
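A `SuccessTuple` is simply a `(bool, str)` pair. A minimal sketch of the convention, including the read-only guard described in the source above (the stub name is hypothetical):

```python
from typing import Tuple

SuccessTuple = Tuple[bool, str]

def delete_pipe_stub(temporary: bool) -> SuccessTuple:
    """Mimic the guard at the top of `Pipe.delete()`."""
    if temporary:
        return False, (
            "Cannot delete pipes created with `temporary=True` (read-only). "
            "You may want to call `pipe.drop()` instead."
        )
    return True, "Success"

# Callers unpack the tuple and typically warn() on failure rather than raise.
success, msg = delete_pipe_stub(temporary=False)
```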
def drop(self, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
13def drop(
14        self,
15        debug: bool = False,
16        **kw: Any
17    ) -> SuccessTuple:
18    """
19    Call the Pipe's instance connector's `drop_pipe()` method.
20
21    Parameters
22    ----------
23    debug: bool, default False
24        Verbosity toggle.
25
26    Returns
27    -------
28    A `SuccessTuple` of success, message.
29
30    """
31    self._exists = False
32    from meerschaum.utils.warnings import warn
33    from meerschaum.utils.venv import Venv
34    from meerschaum.connectors import get_connector_plugin
35
36    if self.cache_pipe is not None:
37        _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw)
38        if not _drop_cache_tuple[0]:
39            warn(_drop_cache_tuple[1])
40
41    with Venv(get_connector_plugin(self.instance_connector)):
42        result = self.instance_connector.drop_pipe(self, debug=debug, **kw)
43    return result

Call the Pipe's instance connector's drop_pipe() method.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success, message.
def clear( self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
13def clear(
14        self,
15        begin: Optional[datetime.datetime] = None,
16        end: Optional[datetime.datetime] = None,
17        params: Optional[Dict[str, Any]] = None,
18        debug: bool = False,
19        **kwargs: Any
20    ) -> SuccessTuple:
21    """
22    Call the Pipe's instance connector's `clear_pipe` method.
23
24    Parameters
25    ----------
26    begin: Optional[datetime.datetime], default None
27        If provided, only remove rows newer than this datetime value.
28
29    end: Optional[datetime.datetime], default None
30        If provided, only remove rows older than this datetime value (not including `end`).
31
32    params: Optional[Dict[str, Any]], default None
33         See `meerschaum.utils.sql.build_where`.
34
35    debug: bool, default False
36        Verbosity toggle.
37
38    Returns
39    -------
40    A `SuccessTuple` corresponding to whether this procedure completed successfully.
41
42    Examples
43    --------
44    >>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
45    >>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
46    >>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
47    >>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
48    >>> 
49    >>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
50    >>> pipe.get_data()
51              dt
52    0 2020-01-01
53
54    """
55    from meerschaum.utils.warnings import warn
56    from meerschaum.utils.venv import Venv
57    from meerschaum.connectors import get_connector_plugin
58
59    if self.cache_pipe is not None:
60        success, msg = self.cache_pipe.clear(
61            begin = begin,
62            end = end,
63            params = params,
64            debug = debug,
65            **kwargs
66        )
67        if not success:
68            warn(msg)
69
70    with Venv(get_connector_plugin(self.instance_connector)):
71        return self.instance_connector.clear_pipe(
72            self,
73            begin = begin,
74            end = end,
75            params = params,
76            debug = debug,
77            **kwargs
78        )

Call the Pipe's instance connector's clear_pipe method.

Parameters
  • begin (Optional[datetime.datetime], default None): If provided, only remove rows newer than this datetime value.
  • end (Optional[datetime.datetime], default None): If provided, only remove rows older than this datetime value (not including end).
  • params (Optional[Dict[str, Any]], default None): See meerschaum.utils.sql.build_where.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple corresponding to whether this procedure completed successfully.
Examples
>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
>>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
>>> 
>>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
>>> pipe.get_data()
          dt
0 2020-01-01
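The `begin`/`end` bounds in the example behave as a half-open interval: rows with `begin <= dt < end` are removed (unbounded on either side when `None`). A standard-library sketch of that filtering (the helper name is an assumption for illustration):

```python
from datetime import datetime
from typing import List, Optional

def rows_to_keep(
    rows: List[datetime],
    begin: Optional[datetime] = None,
    end: Optional[datetime] = None,
) -> List[datetime]:
    """Return the rows that would survive `clear(begin=..., end=...)`:
    rows inside the half-open interval [begin, end) are removed."""
    return [
        dt for dt in rows
        if (begin is not None and dt < begin) or (end is not None and dt >= end)
    ]
```

Mirroring the docstring example, clearing with `begin=datetime(2021, 1, 1)` leaves only the 2020 row; calling with no bounds clears everything.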
def deduplicate( self, begin: 'Union[datetime, int, None]' = None, end: 'Union[datetime, int, None]' = None, params: Optional[Dict[str, Any]] = None, chunk_interval: 'Union[datetime, int, None]' = None, bounded: Optional[bool] = None, workers: Optional[int] = None, debug: bool = False, _use_instance_method: bool = True, **kwargs: Any) -> Tuple[bool, str]:
 15def deduplicate(
 16        self,
 17        begin: Union[datetime, int, None] = None,
 18        end: Union[datetime, int, None] = None,
 19        params: Optional[Dict[str, Any]] = None,
 20        chunk_interval: Union[datetime, int, None] = None,
 21        bounded: Optional[bool] = None,
 22        workers: Optional[int] = None,
 23        debug: bool = False,
 24        _use_instance_method: bool = True,
 25        **kwargs: Any
 26    ) -> SuccessTuple:
 27    """
 28    Call the Pipe's instance connector's `delete_duplicates` method to delete duplicate rows.
 29
 30    Parameters
 31    ----------
 32    begin: Union[datetime, int, None], default None
 33        If provided, only deduplicate rows newer than this datetime value.
 34
 36    end: Union[datetime, int, None], default None
 37        If provided, only deduplicate rows older than this datetime value (not including `end`).
 37
 38    params: Optional[Dict[str, Any]], default None
 39        Restrict deduplication to this filter (for multiplexed data streams).
 40        See `meerschaum.utils.sql.build_where`.
 41
 42    chunk_interval: Union[timedelta, int, None], default None
 43        If provided, use this for the chunk bounds.
 44        Defaults to the value set in `pipe.parameters['chunk_minutes']` (1440).
 45
 46    bounded: Optional[bool], default None
 47        Only check outside the oldest and newest sync times if `bounded` is explicitly `False`.
 48
 49    workers: Optional[int], default None
 50        If the instance connector is thread-safe, limit concurrent syncs to this many threads.
 51
 52    debug: bool, default False
 53        Verbosity toggle.
 54
 55    kwargs: Any
 56        All other keyword arguments are passed to
 57        `pipe.sync()`, `pipe.clear()`, and `pipe.get_data()`.
 58
 59    Returns
 60    -------
 61    A `SuccessTuple` corresponding to whether all of the chunks were successfully deduplicated.
 62    """
 63    from meerschaum.utils.warnings import warn, info
 64    from meerschaum.utils.misc import interval_str, items_str
 65    from meerschaum.utils.venv import Venv
 66    from meerschaum.connectors import get_connector_plugin
 67    from meerschaum.utils.pool import get_pool
 68
 69    if self.cache_pipe is not None:
 70        success, msg = self.cache_pipe.deduplicate(
 71            begin = begin,
 72            end = end,
 73            params = params,
 74            bounded = bounded,
 75            debug = debug,
 76            _use_instance_method = _use_instance_method,
 77            **kwargs
 78        )
 79        if not success:
 80            warn(msg)
 81
 82    workers = self.get_num_workers(workers=workers)
 83    pool = get_pool(workers=workers)
 84
 85    if _use_instance_method:
 86        with Venv(get_connector_plugin(self.instance_connector)):
 87            if hasattr(self.instance_connector, 'deduplicate_pipe'):
 88                return self.instance_connector.deduplicate_pipe(
 89                    self,
 90                    begin = begin,
 91                    end = end,
 92                    params = params,
 93                    bounded = bounded,
 94                    debug = debug,
 95                    **kwargs
 96                )
 97
 98    ### Only unbound if explicitly False.
 99    if bounded is None:
100        bounded = True
101    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
102
103    bound_time = self.get_bound_time(debug=debug)
104    if bounded and begin is None:
105        begin = (
106            bound_time
107            if bound_time is not None
108            else self.get_sync_time(newest=False, debug=debug)
109        )
110    if bounded and end is None:
111        end = self.get_sync_time(newest=True, debug=debug)
112
113    if bounded and end is not None:
114        end += (
115            timedelta(minutes=1)
116            if isinstance(end, datetime)
117            else 1
118        )
119
120    chunk_bounds = self.get_chunk_bounds(
121        bounded = bounded,
122        begin = begin,
123        end = end,
124        chunk_interval = chunk_interval,
125        debug = debug,
126    )
127
128    indices = [col for col in self.columns.values() if col]
129    if not indices:
130        return False, "Cannot deduplicate without index columns."
131    dt_col = self.columns.get('datetime', None)
132
133    def process_chunk_bounds(bounds) -> Tuple[
134            Tuple[
135                Union[datetime, int, None],
136                Union[datetime, int, None]
137            ],
138            SuccessTuple
139        ]:
140        ### Only selecting the index values here to keep bandwidth down.
141        chunk_begin, chunk_end = bounds
142        chunk_df = self.get_data(
143            select_columns = indices, 
144            begin = chunk_begin,
145            end = chunk_end,
146            params = params,
147            debug = debug,
148        )
149        if chunk_df is None:
150            return bounds, (True, "")
151        existing_chunk_len = len(chunk_df)
152        deduped_chunk_df = chunk_df.drop_duplicates(keep='last')
153        deduped_chunk_len = len(deduped_chunk_df)
154
155        if existing_chunk_len == deduped_chunk_len:
156            return bounds, (True, "")
157
158        chunk_msg_header = f"\n{chunk_begin} - {chunk_end}"
159        chunk_msg_body = ""
160
161        full_chunk = self.get_data(
162            begin = chunk_begin,
163            end = chunk_end,
164            params = params,
165            debug = debug,
166        )
167        if full_chunk is None or len(full_chunk) == 0:
168            return bounds, (True, f"{chunk_msg_header}\nChunk is empty, skipping...")
169
170        chunk_indices = [ix for ix in indices if ix in full_chunk.columns]
171        if not chunk_indices:
172            return bounds, (False, f"None of {items_str(indices)} were present in chunk.")
173        try:
174            full_chunk = full_chunk.drop_duplicates(
175                subset = chunk_indices,
176                keep = 'last'
177            ).reset_index(
178                drop = True,
179            )
180        except Exception as e:
181            return (
182                bounds,
183                (False, f"Failed to deduplicate chunk on {items_str(chunk_indices)}:\n({e})")
184            )
185
186        clear_success, clear_msg = self.clear(
187            begin = chunk_begin,
188            end = chunk_end,
189            params = params,
190            debug = debug,
191        )
192        if not clear_success:
193            chunk_msg_body += f"Failed to clear chunk while deduplicating:\n{clear_msg}\n"
194            warn(chunk_msg_body)
195
196        sync_success, sync_msg = self.sync(full_chunk, debug=debug)
197        if not sync_success:
198            chunk_msg_body += f"Failed to sync chunk while deduplicating:\n{sync_msg}\n"
199         
200        ### Finally check if the deduplication worked.
201        chunk_rowcount = self.get_rowcount(
202            begin = chunk_begin,
203            end = chunk_end,
204            params = params,
205            debug = debug,
206        )
207        if chunk_rowcount != deduped_chunk_len:
208            return bounds, (
209                False, (
210                    chunk_msg_header + "\n"
211                    + chunk_msg_body + ("\n" if chunk_msg_body else '')
212                    + "Chunk rowcounts still differ ("
213                    + f"{chunk_rowcount} rowcount vs {deduped_chunk_len} chunk length)."
214                )
215            )
216
217        return bounds, (
218            True, (
219                chunk_msg_header + "\n"
220                + chunk_msg_body + ("\n" if chunk_msg_body else '')
221                + f"Deduplicated chunk from {existing_chunk_len} to {chunk_rowcount} rows."
222            )
223        )
224
225    info(
226        f"Deduplicating {len(chunk_bounds)} chunk"
227        + ('s' if len(chunk_bounds) != 1 else '')
228        + f" ({'un' if not bounded else ''}bounded)"
229        + f" of size '{interval_str(chunk_interval)}'"
230        + f" on {self}."
231    )
232    bounds_success_tuples = dict(pool.map(process_chunk_bounds, chunk_bounds))
233    bounds_successes = {
234        bounds: success_tuple
235        for bounds, success_tuple in bounds_success_tuples.items()
236        if success_tuple[0]
237    }
238    bounds_failures = {
239        bounds: success_tuple
240        for bounds, success_tuple in bounds_success_tuples.items()
241        if not success_tuple[0]
242    }
243
244    ### No need to retry if everything failed.
245    if len(bounds_failures) > 0 and len(bounds_successes) == 0:
246        return (
247            False,
248            (
249                f"Failed to deduplicate {len(bounds_failures)} chunk"
250                + ('s' if len(bounds_failures) != 1 else '')
251                + ".\n"
252                + "\n".join([msg for _, (_, msg) in bounds_failures.items() if msg])
253            )
254        )
255
256    retry_bounds =