meerschaum

Meerschaum Python API

Welcome to the Meerschaum Python API technical documentation! Here you can find information about the classes and functions provided by the meerschaum package. Visit meerschaum.io for general usage documentation.

Root Module

For your convenience, the following classes and functions may be imported from the root meerschaum namespace:

Examples

Build a Connector

Get existing connectors or build a new one in-memory with the get_connector() factory function:

import meerschaum as mrsm

sql_conn = mrsm.get_connector(
    'sql:temp',
    flavor='sqlite',
    database='/tmp/tmp.db',
)
df = sql_conn.read("SELECT 1 AS foo")
print(df)
#    foo
# 0    1

sql_conn.to_sql(df, 'foo')
print(sql_conn.read('foo'))
#    foo
# 0    1

Create a Custom Connector Class

Decorate a connector class with make_connector() to designate it as a custom connector:

from datetime import datetime, timezone
from random import randint
import meerschaum as mrsm
from meerschaum.utils.misc import round_time

@mrsm.make_connector
class FooConnector(mrsm.Connector):
    REQUIRED_ATTRIBUTES = ['username', 'password']

    def fetch(
        self,
        begin: datetime | None = None,
        end: datetime | None = None,
    ):
        now = begin or round_time(datetime.now(timezone.utc))
        return [
            {'ts': now, 'id': 1, 'vl': randint(1, 100)},
            {'ts': now, 'id': 2, 'vl': randint(1, 100)},
            {'ts': now, 'id': 3, 'vl': randint(1, 100)},
        ]

foo_conn = mrsm.get_connector(
    'foo:bar',
    username='foo',
    password='bar',
)
docs = foo_conn.fetch()
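
REQUIRED_ATTRIBUTES names the keyword arguments a connector expects when it is built. The following is only a plain-Python sketch of that kind of check, not Meerschaum's implementation: DemoConnector, DemoFooConnector, and MissingAttributesError are hypothetical names introduced for illustration.

```python
class MissingAttributesError(Exception):
    """Hypothetical error raised when required attributes are absent."""


class DemoConnector:
    """Hypothetical stand-in for a connector base class."""

    REQUIRED_ATTRIBUTES: list[str] = []

    def __init__(self, label: str = 'main', **attributes):
        # Collect every required attribute that was not passed as a keyword.
        missing = [key for key in self.REQUIRED_ATTRIBUTES if key not in attributes]
        if missing:
            raise MissingAttributesError(
                f"Missing required attributes for '{label}': {missing}"
            )
        self.label = label
        self.attributes = attributes


class DemoFooConnector(DemoConnector):
    REQUIRED_ATTRIBUTES = ['username', 'password']


# All required attributes are present, so construction succeeds.
conn = DemoFooConnector('bar', username='foo', password='bar')

# Omitting 'password' triggers the hypothetical error.
try:
    DemoFooConnector('bar', username='foo')
    missing_detected = False
except MissingAttributesError:
    missing_detected = True
```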

Build a Pipe

Build a Pipe in-memory:

from datetime import datetime
import meerschaum as mrsm

pipe = mrsm.Pipe(
    foo_conn, 'demo',
    instance=sql_conn,
    columns={'datetime': 'ts', 'id': 'id'},
    tags=['production'],
)
pipe.sync(begin=datetime(2024, 1, 1))
df = pipe.get_data()
print(df)
#           ts  id  vl
# 0 2024-01-01   1  97
# 1 2024-01-01   2  18
# 2 2024-01-01   3  96

Add temporary=True to skip registering the pipe in the pipes table.
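
The columns mapping tells the pipe which fields identify a row. As a rough mental model only, the sketch below shows an index-keyed upsert in plain Python. It assumes that syncing deduplicates rows on the datetime and id columns; upsert_rows is a hypothetical helper, not a Meerschaum function.

```python
from datetime import datetime


def upsert_rows(existing, new_rows, index_columns=('ts', 'id')):
    """Merge new_rows into existing, keyed on index_columns (illustrative only)."""
    def key_of(row):
        return tuple(row[col] for col in index_columns)

    merged = {key_of(row): row for row in existing}
    for row in new_rows:
        # A row with the same index values replaces the stored row.
        merged[key_of(row)] = row
    return sorted(merged.values(), key=key_of)


ts = datetime(2024, 1, 1)
table = [{'ts': ts, 'id': 1, 'vl': 97}, {'ts': ts, 'id': 2, 'vl': 18}]
incoming = [{'ts': ts, 'id': 2, 'vl': 20}, {'ts': ts, 'id': 3, 'vl': 96}]
table = upsert_rows(table, incoming)
```

Here the row with id 2 is replaced and the row with id 3 is appended, so the table ends with three rows.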

Get Registered Pipes

The get_pipes() function returns a dictionary hierarchy of pipes by connector, metric, and location:

import meerschaum as mrsm

pipes = mrsm.get_pipes(instance='sql:temp')
pipe = pipes['foo:bar']['demo'][None]

Add as_list=True to flatten the hierarchy:

import meerschaum as mrsm

pipes = mrsm.get_pipes(
    tags=['production'],
    instance=sql_conn,
    as_list=True,
)
print(pipes)
# [Pipe('foo:bar', 'demo', instance='sql:temp')]
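
To make the two return shapes concrete, here is a small stand-alone sketch that nests (connector, metric, location) key tuples and then flattens them again. The functions build_hierarchy and flatten_hierarchy are hypothetical names used for illustration, and plain tuples stand in for Pipe objects.

```python
def build_hierarchy(keys):
    """Nest (connector, metric, location) tuples into a three-level dictionary."""
    hierarchy = {}
    for connector_keys, metric_key, location_key in keys:
        locations = hierarchy.setdefault(connector_keys, {}).setdefault(metric_key, {})
        # A tuple stands in for the Pipe object in this sketch.
        locations[location_key] = (connector_keys, metric_key, location_key)
    return hierarchy


def flatten_hierarchy(hierarchy):
    """Flatten the three-level dictionary into a list of its leaf values."""
    return [
        leaf
        for metrics in hierarchy.values()
        for locations in metrics.values()
        for leaf in locations.values()
    ]


keys = [('foo:bar', 'demo', None), ('sql:main', 'weather', 'clemson')]
pipes = build_hierarchy(keys)
flat = flatten_hierarchy(pipes)
```

A location key of None is an ordinary dictionary key, which is why the earlier example indexes the hierarchy with [None].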

Import Plugins

You can import a plugin's module through Plugin.module:

import meerschaum as mrsm

plugin = mrsm.Plugin('noaa')
with mrsm.Venv(plugin):
    noaa = plugin.module

If your plugin has submodules, use meerschaum.plugins.from_plugin_import:

from meerschaum.plugins import from_plugin_import
get_defined_pipes = from_plugin_import('compose.utils.pipes', 'get_defined_pipes')
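
The call pattern above (a dotted module path followed by attribute names) resembles what the standard library's importlib offers. The sketch below shows that general pattern with standard-library modules only; import_attributes is a hypothetical helper, and how from_plugin_import locates plugin modules is not shown here.

```python
import importlib


def import_attributes(module_path, *names):
    """Import a dotted module path and return the requested attributes."""
    module = importlib.import_module(module_path)
    attributes = tuple(getattr(module, name) for name in names)
    # Return a bare object for a single name, otherwise a tuple.
    return attributes[0] if len(attributes) == 1 else attributes


join = import_attributes('os.path', 'join')
dumps, loads = import_attributes('json', 'dumps', 'loads')
```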

Import multiple plugins with meerschaum.plugins.import_plugins:

from meerschaum.plugins import import_plugins
noaa, compose = import_plugins('noaa', 'compose')

Create a Job

Create a Job with name and sysargs:

import meerschaum as mrsm

job = mrsm.Job('syncing-engine', 'sync pipes --loop')
success, msg = job.start()

Pass the connector keys of an API instance as executor_keys to create a remote job:

import meerschaum as mrsm

job = mrsm.Job(
    'foo',
    'sync pipes -s daily',
    executor_keys='api:main',
)

Import from a Virtual Environment

Use the Venv context manager to activate a virtual environment:

import meerschaum as mrsm

with mrsm.Venv('noaa'):
    import requests

print(requests.__file__)
# /home/bmeares/.config/meerschaum/venvs/noaa/lib/python3.12/site-packages/requests/__init__.py
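
Note that requests is still usable after the with-block ends: once a module has been imported it stays in sys.modules. The sketch below illustrates the general idea of a context manager that changes the import path only temporarily. It is an assumption-laden simplification, not how Venv is implemented; temporary_sys_path and the path shown are hypothetical.

```python
import sys
from contextlib import contextmanager


@contextmanager
def temporary_sys_path(path):
    """Prepend path to sys.path for the duration of the with-block."""
    sys.path.insert(0, path)
    try:
        yield path
    finally:
        # Remove the entry we added, tolerating changes made inside the block.
        try:
            sys.path.remove(path)
        except ValueError:
            pass


demo_path = '/tmp/hypothetical-venv/site-packages'  # hypothetical location
with temporary_sys_path(demo_path):
    inside = demo_path in sys.path
outside = demo_path in sys.path
```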

To import packages which may not be installed, use attempt_import():

import meerschaum as mrsm

requests = mrsm.attempt_import('requests', venv='noaa')
print(requests.__file__)
# /home/bmeares/.config/meerschaum/venvs/noaa/lib/python3.12/site-packages/requests/__init__.py
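
For comparison, a bare-bones "import if available" helper can be written with importlib alone. This sketch only shows the fallback pattern; try_import is a hypothetical name, and it does not reproduce the venv handling that attempt_import() provides.

```python
import importlib


def try_import(name):
    """Return the imported module, or None if it is not installed."""
    try:
        return importlib.import_module(name)
    except ImportError:
        return None


json_module = try_import('json')
missing = try_import('surely_not_an_installed_package_xyz')
```

Callers can then branch on whether the result is None instead of wrapping every import in try/except.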

Run Actions

Run sysargs with entry():

import meerschaum as mrsm

success, msg = mrsm.entry('show pipes + show version : x2')

Use meerschaum.actions.get_action() to access an action function directly:

from meerschaum.actions import get_action

show_pipes = get_action(['show', 'pipes'])
success, msg = show_pipes(connector_keys=['plugin:noaa'])

Get a dictionary of available subactions with meerschaum.actions.get_subactions():

from meerschaum.actions import get_subactions

subactions = get_subactions('show')
success, msg = subactions['pipes']()

Create a Plugin

Run bootstrap plugin to create a new plugin:

mrsm bootstrap plugin example

This will create example.py in your plugins directory (default ~/.config/meerschaum/plugins/, Windows: %APPDATA%\Meerschaum\plugins). You may paste the example code from the "Create a Custom Action" example below.

Open your plugin with edit plugin:

mrsm edit plugin example

Paste the example code below into the plugin file to try out the features.

See the writing plugins guide for more in-depth documentation.

Create a Custom Action

Decorate a function with meerschaum.actions.make_action to designate it as an action. Subactions will be automatically detected if not decorated:

from meerschaum.actions import make_action

@make_action
def sing():
    print('What would you like me to sing?')
    return True, "Success"

def sing_tune():
    return False, "I don't know that song!"

def sing_song():
    print('Hello, World!')
    return True, "Success"
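
The names sing_tune and sing_song suggest that subactions are matched by an `<action>_<subaction>` naming convention. Under that assumption, discovery could look roughly like the sketch below; find_subactions is a hypothetical helper and not part of meerschaum.actions.

```python
def sing():
    return True, "Success"


def sing_tune():
    return False, "I don't know that song!"


def sing_song():
    return True, "Success"


def find_subactions(action_name, namespace):
    """Map subaction names to callables named '<action_name>_<subaction>'."""
    prefix = action_name + '_'
    return {
        name[len(prefix):]: obj
        for name, obj in namespace.items()
        if name.startswith(prefix) and callable(obj)
    }


# An explicit namespace keeps the sketch independent of module globals.
namespace = {'sing': sing, 'sing_tune': sing_tune, 'sing_song': sing_song}
subactions = find_subactions('sing', namespace)
success, msg = subactions['tune']()
```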

Use meerschaum.plugins.add_plugin_argument() to create new parameters for your action:

from meerschaum.plugins import make_action, add_plugin_argument

add_plugin_argument(
    '--song', type=str, help='What song to sing.',
)

@make_action
def sing_melody(action=None, song=None):
    to_sing = action[0] if action else song
    if not to_sing:
        return False, "Please tell me what to sing!"

    return True, f'~I am singing {to_sing}~'

mrsm sing melody lalala

mrsm sing melody --song do-re-mi
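
The two commands above reach sing_melody through different parameters: the trailing word arrives in action, while --song arrives as a keyword. The sketch below imitates that flow with the standard library's argparse. It assumes that the words after the subaction are passed on as the action list; the run function and this parser are hypothetical, not Meerschaum's own.

```python
import argparse

parser = argparse.ArgumentParser(prog='mrsm')
parser.add_argument('action', nargs='*', help='Action, subaction, and remaining words.')
parser.add_argument('--song', type=str, help='What song to sing.')


def sing_melody(action=None, song=None):
    to_sing = action[0] if action else song
    if not to_sing:
        return False, "Please tell me what to sing!"
    return True, f'~I am singing {to_sing}~'


def run(argv):
    """Parse argv, strip the 'sing melody' prefix, and call the action."""
    args = parser.parse_args(argv)
    remaining = args.action[2:]  # assumption: words after the subaction are passed on
    return sing_melody(action=remaining, song=args.song)


positional_result = run(['sing', 'melody', 'lalala'])
flag_result = run(['sing', 'melody', '--song', 'do-re-mi'])
empty_result = run(['sing', 'melody'])
```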

Add a Page to the Web Dashboard

Use the decorators meerschaum.plugins.dash_plugin() and meerschaum.plugins.web_page() to add new pages to the web dashboard:

from meerschaum.plugins import dash_plugin, web_page

@dash_plugin
def init_dash(dash_app):

    import dash.html as html
    import dash_bootstrap_components as dbc
    from dash import Input, Output, no_update

    ### Routes to '/dash/my-page'
    @web_page('/my-page', login_required=False)
    def my_page():
        return dbc.Container([
            html.H1("Hello, World!"),
            dbc.Button("Click me", id='my-button'),
            html.Div(id="my-output-div"),
        ])

    @dash_app.callback(
        Output('my-output-div', 'children'),
        Input('my-button', 'n_clicks'),
    )
    def my_button_click(n_clicks):
        if not n_clicks:
            return no_update
        return html.P(f'You clicked {n_clicks} times!')

Submodules

meerschaum.actions
Access functions for actions and subactions.

meerschaum.config
Read and write the Meerschaum configuration registry.

meerschaum.connectors
Build connectors to interact with databases and fetch data.

meerschaum.jobs
Start background jobs.

meerschaum.plugins
Access plugin modules and other API utilities.

meerschaum.utils
Utility functions are available in several submodules:

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Copyright 2023 Bennett Meares

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import atexit
from meerschaum.utils.typing import SuccessTuple
from meerschaum.utils.packages import attempt_import
from meerschaum.core.Pipe import Pipe
from meerschaum.plugins import Plugin
from meerschaum.utils.venv import Venv
from meerschaum.jobs import Job, make_executor
from meerschaum.connectors import get_connector, Connector, make_connector
from meerschaum.utils import get_pipes
from meerschaum.utils.formatting import pprint
from meerschaum._internal.docs import index as __doc__
from meerschaum.config import __version__, get_config
from meerschaum._internal.entry import entry
from meerschaum.__main__ import _close_pools

atexit.register(_close_pools)

__pdoc__ = {'gui': False, 'api': False, 'core': False, '_internal': False}
__all__ = (
    "get_pipes",
    "get_connector",
    "get_config",
    "Pipe",
    "Plugin",
    "Venv",
    "Plugin",
    "Job",
    "pprint",
    "attempt_import",
    "actions",
    "config",
    "connectors",
    "jobs",
    "plugins",
    "utils",
    "SuccessTuple",
    "Connector",
    "make_connector",
    "entry",
)
def get_pipes( connector_keys: Union[str, List[str], NoneType] = None, metric_keys: Union[str, List[str], NoneType] = None, location_keys: Union[str, List[str], NoneType] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, mrsm_instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, as_list: bool = False, method: str = 'registered', debug: bool = False, **kw: Any) -> Union[Dict[str, Dict[str, Dict[str, Pipe]]], List[Pipe]]:
def get_pipes(
    connector_keys: Union[str, List[str], None] = None,
    metric_keys: Union[str, List[str], None] = None,
    location_keys: Union[str, List[str], None] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    mrsm_instance: Union[str, InstanceConnector, None] = None,
    instance: Union[str, InstanceConnector, None] = None,
    as_list: bool = False,
    method: str = 'registered',
    debug: bool = False,
    **kw: Any
) -> Union[PipesDict, List[mrsm.Pipe]]:
    """
    Return a dictionary or list of `meerschaum.Pipe` objects.

    Parameters
    ----------
    connector_keys: Union[str, List[str], None], default None
        String or list of connector keys.
        If omitted or is `'*'`, fetch all possible keys.
        If a string begins with `'_'`, select keys that do NOT match the string.

    metric_keys: Union[str, List[str], None], default None
        String or list of metric keys. See `connector_keys` for formatting.

    location_keys: Union[str, List[str], None], default None
        String or list of location keys. See `connector_keys` for formatting.

    tags: Optional[List[str]], default None
         If provided, only include pipes with these tags.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        Params are parsed into a SQL WHERE clause.
        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`

    mrsm_instance: Union[str, InstanceConnector, None], default None
        Connector keys for the Meerschaum instance of the pipes.
        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
        `meerschaum.connectors.api.APIConnector.APIConnector`.

    as_list: bool, default False
        If `True`, return pipes in a list instead of a hierarchical dictionary.
        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
        `True`  : `[Pipe]`

    method: str, default 'registered'
        Available options: `['registered', 'explicit', 'all']`
        If `'registered'` (default), create pipes based on registered keys in the connector's pipes table
        (API or SQL connector, depends on mrsm_instance).
        If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys
        instead of consulting the pipes table. Useful for creating non-existent pipes.
        If `'all'`, create pipes from predefined metrics and locations. Requires `connector_keys`.
        **NOTE:** Method `'all'` is not implemented!

    **kw: Any
        Keyword arguments to pass to the `meerschaum.Pipe` constructor.


    Returns
    -------
    A dictionary of dictionaries and `meerschaum.Pipe` objects
    in the connector, metric, location hierarchy.
    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.

    Examples
    --------
    ```
    >>> ### Manual definition:
    >>> pipes = {
    ...     <connector_keys>: {
    ...         <metric_key>: {
    ...             <location_key>: Pipe(
    ...                 <connector_keys>,
    ...                 <metric_key>,
    ...                 <location_key>,
    ...             ),
    ...         },
    ...     },
    ... }
    >>> ### Accessing a single pipe:
    >>> pipes['sql:main']['weather'][None]
    >>> ### Return a list instead:
    >>> get_pipes(as_list=True)
    [sql_main_weather]
    >>> 
    ```
    """

    from meerschaum.config import get_config
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import filter_keywords

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    if params is None:
        params = {}
    if tags is None:
        tags = []

    if isinstance(connector_keys, str):
        connector_keys = [connector_keys]
    if isinstance(metric_keys, str):
        metric_keys = [metric_keys]
    if isinstance(location_keys, str):
        location_keys = [location_keys]

    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
    if mrsm_instance is None:
        mrsm_instance = instance
    if mrsm_instance is None:
        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
    if isinstance(mrsm_instance, str):
        from meerschaum.connectors.parse import parse_instance_keys
        connector = parse_instance_keys(keys=mrsm_instance, debug=debug)
    else:
        from meerschaum.connectors import instance_types
        valid_connector = False
        if hasattr(mrsm_instance, 'type'):
            if mrsm_instance.type in instance_types:
                valid_connector = True
        if not valid_connector:
            error(f"Invalid instance connector: {mrsm_instance}")
        connector = mrsm_instance
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Using instance connector: {connector}")
    if not connector:
        error(f"Could not create connector from keys: '{mrsm_instance}'")

    ### Get a list of tuples for the keys needed to build pipes.
    result = fetch_pipes_keys(
        method,
        connector,
        connector_keys = connector_keys,
        metric_keys = metric_keys,
        location_keys = location_keys,
        tags = tags,
        params = params,
        debug = debug
    )
    if result is None:
        error("Unable to build pipes!")

    ### Populate the `pipes` dictionary with Pipes based on the keys
    ### obtained from the chosen `method`.
    from meerschaum import Pipe
    pipes = {}
    for ck, mk, lk in result:
        if ck not in pipes:
            pipes[ck] = {}

        if mk not in pipes[ck]:
            pipes[ck][mk] = {}

        pipes[ck][mk][lk] = Pipe(
            ck, mk, lk,
            mrsm_instance = connector,
            debug = debug,
            **filter_keywords(Pipe, **kw)
        )

    if not as_list:
        return pipes
    from meerschaum.utils.misc import flatten_pipes_dict
    return flatten_pipes_dict(pipes)

Return a dictionary or list of Pipe objects.

Parameters
  • connector_keys (Union[str, List[str], None], default None): String or list of connector keys. If omitted or is '*', fetch all possible keys. If a string begins with '_', select keys that do NOT match the string.
  • metric_keys (Union[str, List[str], None], default None): String or list of metric keys. See connector_keys for formatting.
  • location_keys (Union[str, List[str], None], default None): String or list of location keys. See connector_keys for formatting.
  • tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. Params are parsed into a SQL WHERE clause. E.g. {'a': 1, 'b': 2} equates to 'WHERE a = 1 AND b = 2'
  • mrsm_instance (Union[str, InstanceConnector, None], default None): Connector keys for the Meerschaum instance of the pipes. Must be a meerschaum.connectors.SQLConnector or meerschaum.connectors.APIConnector.
  • as_list (bool, default False): If True, return pipes in a list instead of a hierarchical dictionary. False : {connector_keys: {metric_key: {location_key: Pipe}}} True : [Pipe]
  • method (str, default 'registered'): Available options: ['registered', 'explicit', 'all'] If 'registered' (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depends on mrsm_instance). If 'explicit', create pipes from provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If 'all', create pipes from predefined metrics and locations. Requires connector_keys. NOTE: Method 'all' is not implemented!
  • **kw (Any): Keyword arguments to pass to the Pipe constructor.
Returns
  • A dictionary of dictionaries and Pipe objects in the connector, metric, location hierarchy. If as_list is True, return a list of Pipe objects.
Examples
>>> ### Manual definition:
>>> pipes = {
...     <connector_keys>: {
...         <metric_key>: {
...             <location_key>: Pipe(
...                 <connector_keys>,
...                 <metric_key>,
...                 <location_key>,
...             ),
...         },
...     },
... }
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[sql_main_weather]
>>> 
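
The key-selection rules and the params-to-WHERE mapping described above can be sketched in a few lines. This is one plausible reading of those rules, not the library's code: select_keys and build_where are hypothetical helpers, and build_where emits `?` placeholders with bound values rather than inlining literals as the description does.

```python
def select_keys(available, requested=None):
    """Filter available keys: '*' or empty selects all; '_key' excludes key."""
    if isinstance(requested, str):
        requested = [requested]
    if not requested or '*' in requested:
        return list(available)
    included = [key for key in requested if not key.startswith('_')]
    excluded = {key[1:] for key in requested if key.startswith('_')}
    return [
        key for key in available
        if key not in excluded and (not included or key in included)
    ]


def build_where(params):
    """Render params as a parameterized WHERE clause plus its bound values."""
    if not params:
        return '', []
    # Column names are interpolated directly, so they must come from trusted input.
    clause = 'WHERE ' + ' AND '.join(f'{column} = ?' for column in params)
    return clause, list(params.values())


available = ['sql:main', 'sql:temp', 'plugin:noaa']
everything = select_keys(available)
without_temp = select_keys(available, '_sql:temp')
only_main = select_keys(available, ['sql:main'])
clause, values = build_where({'a': 1, 'b': 2})
```

Binding the values separately is a design choice made for this sketch; it avoids quoting issues when values are strings.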
def get_connector( type: str = None, label: str = None, refresh: bool = False, debug: bool = False, **kw: Any) -> Connector:
def get_connector(
    type: str = None,
    label: str = None,
    refresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Connector:
    """
    Return existing connector or create new connection and store for reuse.

    You can create new connectors if enough parameters are provided for the given type and flavor.


    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct new object. Defaults to `False`.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`).

    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```

    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)

    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _load_builtin_custom_connectors()
            _loaded_plugin_connectors = True

    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"

        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        from meerschaum.connectors.valkey import ValkeyConnector
        with _locks['types']:
            types.update({
                'api': APIConnector,
                'sql': SQLConnector,
                'plugin': PluginConnector,
                'valkey': ValkeyConnector,
            })

    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    import inspect
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector " +
                            f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n" +
                        f"  - Keyword value: '{value}'\n" +
                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else: ### connector doesn't yet exist
            refresh = True

    ### only create an object if refresh is True
    ### (can be manually specified, otherwise determined above)
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack = False,
                )
                conn = None
            except Exception as e:
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack = False,
                )
                conn = None
        if conn is None:
            return None

    return connectors[type][label]

Return existing connector or create new connection and store for reuse.

You can create new connectors if enough parameters are provided for the given type and flavor.

Parameters
  • type (Optional[str], default None): Connector type (sql, api, etc.). Defaults to the type of the configured instance_connector.
  • label (Optional[str], default None): Connector label (e.g. main). Defaults to 'main'.
  • refresh (bool, default False): Refresh the Connector instance / construct new object. Defaults to False.
  • kw (Any): Other arguments to pass to the Connector constructor. If the Connector has already been constructed and new arguments are provided, refresh is set to True and the old Connector is replaced.
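Returns
  • A new Meerschaum connector (e.g. meerschaum.connectors.APIConnector, meerschaum.connectors.SQLConnector).
Examples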

The following parameters would create a new meerschaum.connectors.SQLConnector that isn't in the configuration file.

>>> conn = get_connector(
...     type = 'sql',
...     label = 'newlabel',
...     flavor = 'sqlite',
...     database = '/file/path/to/database.db'
... )
>>>
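
The reuse-or-rebuild behavior described above can be pictured with a small registry. The sketch below is a simplified model under stated assumptions, not the library's code: DemoConnector, _registry, and get_demo_connector are hypothetical, and the real function also consults the configuration and emits warnings.

```python
class DemoConnector:
    """Hypothetical connector that just stores its attributes."""

    def __init__(self, type, label, **attributes):
        self.type, self.label, self.attributes = type, label, attributes


_registry = {}  # maps (type, label) to a DemoConnector


def get_demo_connector(type, label=None, refresh=False, **kw):
    """Return a cached connector, rebuilding it if refresh is set or kw differ."""
    # Accept combined keys such as 'sql:temp'.
    if label is None and ':' in type:
        type, label = type.split(':', maxsplit=1)
    label = label if label is not None else 'main'
    key = (type, label)
    existing = _registry.get(key)
    if existing is None:
        refresh = True
    elif any(existing.attributes.get(k) != v for k, v in kw.items()):
        # New or changed attributes force a rebuild.
        refresh = True
    if refresh:
        _registry[key] = DemoConnector(type, label, **kw)
    return _registry[key]


first = get_demo_connector('sql:temp', flavor='sqlite', database='/tmp/tmp.db')
second = get_demo_connector('sql', 'temp')
third = get_demo_connector('sql:temp', database='/tmp/other.db')
default_label = get_demo_connector('sql').label
```

The second call returns the cached object, while the third replaces it because the database attribute differs.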
def get_config( *keys: str, patch: bool = True, substitute: bool = True, sync_files: bool = True, write_missing: bool = True, as_tuple: bool = False, warn: bool = True, debug: bool = False) -> Any:
def get_config(
        *keys: str,
        patch: bool = True,
        substitute: bool = True,
        sync_files: bool = True,
        write_missing: bool = True,
        as_tuple: bool = False,
        warn: bool = True,
        debug: bool = False
    ) -> Any:
    """
    Return the Meerschaum configuration dictionary.
    If positional arguments are provided, index by the keys.
    Raises a warning if invalid keys are provided.

    Parameters
    ----------
    keys: str
        List of strings to index.

    patch: bool, default True
        If `True`, patch missing default keys into the config directory.
        Defaults to `True`.

    sync_files: bool, default True
        If `True`, sync files if needed.
        Defaults to `True`.

    write_missing: bool, default True
        If `True`, write default values when the main config files are missing.
        Defaults to `True`.

    substitute: bool, default True
        If `True`, substitute 'MRSM{}' values.
        Defaults to `True`.

    as_tuple: bool, default False
        If `True`, return a tuple of type (success, value).
        Defaults to `False`.

    Returns
    -------
    The value in the configuration directory, indexed by the provided keys.

    Examples
    --------
    >>> get_config('meerschaum', 'instance')
    'sql:main'
    >>> get_config('does', 'not', 'exist')
    UserWarning: Invalid keys in config: ('does', 'not', 'exist')
    """
    import json

    symlinks_key = STATIC_CONFIG['config']['symlinks_key']
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Indexing keys: {keys}", color=False)

    if len(keys) == 0:
        _rc = _config(substitute=substitute, sync_files=sync_files, write_missing=write_missing)
        if as_tuple:
            return True, _rc
        return _rc

    ### Weird threading issues, only import if substitute is True.
    if substitute:
        from meerschaum.config._read_config import search_and_substitute_config
    ### Invalidate the cache if it was read before with substitute=False
    ### but there still exist substitutions.
    if (
        config is not None and substitute and keys[0] != symlinks_key
        and 'MRSM{' in json.dumps(config.get(keys[0]))
    ):
        try:
            _subbed = search_and_substitute_config({keys[0]: config[keys[0]]})
        except Exception as e:
            import traceback
            traceback.print_exc()
        config[keys[0]] = _subbed[keys[0]]
        if symlinks_key in _subbed:
            if symlinks_key not in config:
                config[symlinks_key] = {}
            if keys[0] not in config[symlinks_key]:
                config[symlinks_key][keys[0]] = {}
            config[symlinks_key][keys[0]] = apply_patch_to_config(
                _subbed,
                config[symlinks_key][keys[0]]
            )

    from meerschaum.config._sync import sync_files as _sync_files
    if config is None:
        _config(*keys, sync_files=sync_files)

    invalid_keys = False
    if keys[0] not in config and keys[0] != symlinks_key:
        single_key_config = read_config(
            keys=[keys[0]], substitute=substitute, write_missing=write_missing
        )
        if keys[0] not in single_key_config:
            invalid_keys = True
        else:
            config[keys[0]] = single_key_config.get(keys[0], None)
            if symlinks_key in single_key_config and keys[0] in single_key_config[symlinks_key]:
                if symlinks_key not in config:
                    config[symlinks_key] = {}
                config[symlinks_key][keys[0]] = single_key_config[symlinks_key][keys[0]]

            if sync_files:
                _sync_files(keys=[keys[0]])

    c = config
    if len(keys) > 0:
        for k in keys:
            try:
                c = c[k]
            except Exception as e:
                invalid_keys = True
                break
        if invalid_keys:
            ### Check if the keys are in the default configuration.
            from meerschaum.config._default import default_config
            in_default = True
            patched_default_config = (
                search_and_substitute_config(default_config)
                if substitute else copy.deepcopy(default_config)
            )
            _c = patched_default_config
            for k in keys:
                try:
                    _c = _c[k]
                except Exception as e:
                    in_default = False
            if in_default:
                c = _c
                invalid_keys = False
            warning_msg = f"Invalid keys in config: {keys}"
            if not in_default:
                try:
                    if warn:
                        from meerschaum.utils.warnings import warn as _warn
                        _warn(warning_msg, stacklevel=3, color=False)
                except Exception as e:
                    if warn:
                        print(warning_msg)
                if as_tuple:
                    return False, None
                return None

            ### Don't write keys that we haven't yet loaded into memory.
            not_loaded_keys = [k for k in patched_default_config if k not in config]
            for k in not_loaded_keys:
                patched_default_config.pop(k, None)

            set_config(
                apply_patch_to_config(
                    patched_default_config,
                    config,
                )
            )
            if patch and keys[0] != symlinks_key:
                if write_missing:
                    write_config(config, debug=debug)

    if as_tuple:
        return (not invalid_keys), c
    return c

Return the Meerschaum configuration dictionary. If positional arguments are provided, index by the keys. Raises a warning if invalid keys are provided.

Parameters
  • keys (str): List of strings to index.
  • patch (bool, default True): If True, patch missing default keys into the config dictionary.
  • sync_files (bool, default True): If True, sync files if needed.
  • write_missing (bool, default True): If True, write default values when the main config files are missing.
  • substitute (bool, default True): If True, substitute 'MRSM{}' values.
  • as_tuple (bool, default False): If True, return a tuple of type (success, value).
Returns
  • The value in the configuration dictionary, indexed by the provided keys.
Examples
>>> get_config('meerschaum', 'instance')
'sql:main'
>>> get_config('does', 'not', 'exist')
UserWarning: Invalid keys in config: ('does', 'not', 'exist')
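
The traversal, fallback to defaults, and `as_tuple` return shape described above can be sketched without Meerschaum itself. This is a simplified stand-alone illustration, not the library's implementation: `lookup` and the two sample dictionaries are hypothetical, and the warning and file-writing steps are omitted.

```python
from typing import Any, Dict, Tuple, Union


def lookup(
    config: Dict[str, Any],
    defaults: Dict[str, Any],
    *keys: str,
    as_tuple: bool = False,
) -> Union[Any, Tuple[bool, Any]]:
    """Index `config` by `keys`, falling back to `defaults` (hypothetical helper)."""

    def _walk(mapping: Any) -> Tuple[bool, Any]:
        value = mapping
        for key in keys:
            try:
                value = value[key]
            except (KeyError, TypeError, IndexError):
                return False, None
        return True, value

    success, value = _walk(config)
    if not success:
        # Mirror the fallback: check the defaults before giving up.
        success, value = _walk(defaults)
    return (success, value) if as_tuple else value


# Sample data, assumed for illustration only.
config = {'meerschaum': {'instance': 'sql:main'}}
defaults = {'system': {'experimental': {'cache': False}}}

print(lookup(config, defaults, 'meerschaum', 'instance'))
# sql:main
print(lookup(config, defaults, 'system', 'experimental', 'cache', as_tuple=True))
# (True, False)
print(lookup(config, defaults, 'does', 'not', 'exist', as_tuple=True))
# (False, None)
```

Returning a `(success, value)` tuple lets a caller distinguish a missing key from a key whose stored value is `None` or `False`.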
class Pipe:
class Pipe:
    """
    Access Meerschaum pipes via Pipe objects.

    Pipes are identified by the following:

    1. Connector keys (e.g. `'sql:main'`)
    2. Metric key (e.g. `'weather'`)
    3. Location (optional; e.g. `None`)

    A pipe's connector keys correspond to a data source, and when the pipe is synced,
    its `fetch` definition is evaluated and executed to produce new data.

    Alternatively, new data may be directly synced via `pipe.sync()`:

    ```
    >>> from meerschaum import Pipe
    >>> pipe = Pipe('csv', 'weather')
    >>>
    >>> import pandas as pd
    >>> df = pd.read_csv('weather.csv')
    >>> pipe.sync(df)
    ```
    """

    from ._fetch import (
        fetch,
        get_backtrack_interval,
    )
    from ._data import (
        get_data,
        get_backtrack_data,
        get_rowcount,
        _get_data_as_iterator,
        get_chunk_interval,
        get_chunk_bounds,
    )
    from ._register import register
    from ._attributes import (
        attributes,
        parameters,
        columns,
        dtypes,
        get_columns,
        get_columns_types,
        get_indices,
        tags,
        get_id,
        id,
        get_val_column,
        parents,
        children,
        target,
        _target_legacy,
        guess_datetime,
    )
    from ._show import show
    from ._edit import edit, edit_definition, update
    from ._sync import (
        sync,
        get_sync_time,
        exists,
        filter_existing,
        _get_chunk_label,
        get_num_workers,
        _persist_new_json_columns,
        _persist_new_numeric_columns,
    )
    from ._verify import (
        verify,
        get_bound_interval,
        get_bound_time,
    )
    from ._delete import delete
    from ._drop import drop
    from ._clear import clear
    from ._deduplicate import deduplicate
    from ._bootstrap import bootstrap
    from ._dtypes import enforce_dtypes, infer_dtypes
    from ._copy import copy_to

    def __init__(
        self,
        connector: str = '',
        metric: str = '',
        location: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        columns: Union[Dict[str, str], List[str], None] = None,
        tags: Optional[List[str]] = None,
        target: Optional[str] = None,
        dtypes: Optional[Dict[str, str]] = None,
        instance: Optional[Union[str, InstanceConnector]] = None,
        temporary: bool = False,
        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
        cache: bool = False,
        debug: bool = False,
        connector_keys: Optional[str] = None,
        metric_key: Optional[str] = None,
        location_key: Optional[str] = None,
    ):
        """
        Parameters
        ----------
        connector: str
            Keys for the pipe's source connector, e.g. `'sql:main'`.

        metric: str
            Label for the pipe's contents, e.g. `'weather'`.

        location: str, default None
            Label for the pipe's location. Defaults to `None`.

        parameters: Optional[Dict[str, Any]], default None
            Optionally set a pipe's parameters from the constructor,
            e.g. columns and other attributes.
            You can edit these parameters with `edit pipes`.

        columns: Optional[Dict[str, str]], default None
            Set the `columns` dictionary of `parameters`.
            If `parameters` is also provided, this dictionary is added under the `'columns'` key.

        tags: Optional[List[str]], default None
            A list of strings to be added under the `'tags'` key of `parameters`.
            You can select pipes with certain tags using `--tags`.

        dtypes: Optional[Dict[str, str]], default None
            Set the `dtypes` dictionary of `parameters`.
            If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.

        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
            Connector for the Meerschaum instance where the pipe resides.
            Defaults to the preconfigured default instance (`'sql:main'`).

        instance: Optional[Union[str, InstanceConnector]], default None
            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.

        temporary: bool, default False
            If `True`, prevent instance tables (pipes, users, plugins) from being created.

        cache: bool, default False
            If `True`, cache fetched data into a local database file.
            Defaults to `False`.
        """
        from meerschaum.utils.warnings import error, warn
        if (not connector and not connector_keys) or (not metric and not metric_key):
            error(
                "Please provide strings for the connector and metric\n    "
                + "(first two positional arguments)."
            )

        ### Fall back to legacy `location_key` just in case.
        if not location:
            location = location_key

        if not connector:
            connector = connector_keys

        if not metric:
            metric = metric_key

        if location in ('[None]', 'None'):
            location = None

        from meerschaum.config.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
        for k in (connector, metric, location, *(tags or [])):
            if str(k).startswith(negation_prefix):
                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")

        self.connector_keys = str(connector)
        self.connector_key = self.connector_keys ### Alias
        self.metric_key = metric
        self.location_key = location
        self.temporary = temporary

        self._attributes = {
            'connector_keys': self.connector_keys,
            'metric_key': self.metric_key,
            'location_key': self.location_key,
            'parameters': {},
        }

        ### only set parameters if values are provided
        if isinstance(parameters, dict):
            self._attributes['parameters'] = parameters
        else:
            if parameters is not None:
                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
            self._attributes['parameters'] = {}

        columns = columns or self._attributes.get('parameters', {}).get('columns', {})
        if isinstance(columns, list):
            columns = {str(col): str(col) for col in columns}
        if isinstance(columns, dict):
            self._attributes['parameters']['columns'] = columns
        elif columns is not None:
            warn(f"The provided columns are of invalid type '{type(columns)}'.")

        if isinstance(tags, (list, tuple)):
            self._attributes['parameters']['tags'] = tags
        elif tags is not None:
            warn(f"The provided tags are of invalid type '{type(tags)}'.")

        if isinstance(target, str):
            self._attributes['parameters']['target'] = target
        elif target is not None:
            warn(f"The provided target is of invalid type '{type(target)}'.")

        if isinstance(dtypes, dict):
            self._attributes['parameters']['dtypes'] = dtypes
        elif dtypes is not None:
            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")

        ### NOTE: The parameters dictionary is {} by default.
        ###       A Pipe may be registered without parameters, then edited,
        ###       or a Pipe may be registered with parameters set in-memory first.
        #  from meerschaum.config import get_config
        _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
        if _mrsm_instance is None:
            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)

        if not isinstance(_mrsm_instance, str):
            self._instance_connector = _mrsm_instance
            self.instance_keys = str(_mrsm_instance)
        else: ### NOTE: must be SQL or API Connector for this to work
            self.instance_keys = _mrsm_instance

        self._cache = cache and get_config('system', 'experimental', 'cache')


    @property
    def meta(self):
        """
        Return the four keys needed to reconstruct this pipe.
        """
        return {
            'connector': self.connector_keys,
            'metric': self.metric_key,
            'location': self.location_key,
            'instance': self.instance_keys,
        }


    def keys(self) -> List[str]:
        """
        Return the ordered keys for this pipe.
        """
        ### NOTE: Despite the `List[str]` annotation, a dictionary is returned.
        return {
            key: val
            for key, val in self.meta.items()
            if key != 'instance'
        }


    @property
    def instance_connector(self) -> Union[InstanceConnector, None]:
        """
        The connector to where this pipe resides.
        May either be of type `meerschaum.connectors.sql.SQLConnector` or
        `meerschaum.connectors.api.APIConnector`.
        """
        if '_instance_connector' not in self.__dict__:
            from meerschaum.connectors.parse import parse_instance_keys
            conn = parse_instance_keys(self.instance_keys)
            if conn:
                self._instance_connector = conn
            else:
                return None
        return self._instance_connector

    @property
    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
        """
        The connector to the data source.
        """
        if '_connector' not in self.__dict__:
            from meerschaum.connectors.parse import parse_instance_keys
            import warnings
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                try:
                    conn = parse_instance_keys(self.connector_keys)
                except Exception as e:
                    conn = None
            if conn:
                self._connector = conn
            else:
                return None
        return self._connector


    @property
    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
        """
        If the pipe was created with `cache=True`, return the connector to the pipe's
        SQLite database for caching.
        """
        if not self._cache:
            return None

        if '_cache_connector' not in self.__dict__:
            from meerschaum.connectors import get_connector
            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
            _resources_path = SQLITE_RESOURCES_PATH
            self._cache_connector = get_connector(
                'sql', '_cache_' + str(self),
                flavor='sqlite',
                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
            )

        return self._cache_connector


    @property
    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
        """
        If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
        manage the local data.
        """
        if self.cache_connector is None:
            return None
        if '_cache_pipe' not in self.__dict__:
            from meerschaum.config._patch import apply_patch_to_config
            from meerschaum.utils.sql import sql_item_name
            _parameters = copy.deepcopy(self.parameters)
            _fetch_patch = {
                'fetch': ({
                    'definition': (
                        "SELECT * FROM "
                        + sql_item_name(
                            str(self.target),
                            self.instance_connector.flavor,
                            self.instance_connector.get_pipe_schema(self),
                        )
                    ),
                }) if self.instance_connector.type == 'sql' else ({
                    'connector_keys': self.connector_keys,
                    'metric_key': self.metric_key,
                    'location_key': self.location_key,
                })
            }
            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
            self._cache_pipe = Pipe(
                self.instance_keys,
                (self.connector_keys + '_' + self.metric_key + '_cache'),
                self.location_key,
                mrsm_instance = self.cache_connector,
                parameters = _parameters,
                cache = False,
                temporary = True,
            )

        return self._cache_pipe


    def __str__(self, ansi: bool=False):
        return pipe_repr(self, ansi=ansi)


    def __eq__(self, other):
        try:
            return (
                isinstance(self, type(other))
                and self.connector_keys == other.connector_keys
                and self.metric_key == other.metric_key
                and self.location_key == other.location_key
                and self.instance_keys == other.instance_keys
            )
        except Exception as e:
            return False

    def __hash__(self):
        ### Using an esoteric separator to avoid collisions.
        sep = "[\"']"
        return hash(
            str(self.connector_keys) + sep
            + str(self.metric_key) + sep
            + str(self.location_key) + sep
            + str(self.instance_keys) + sep
        )

    def __repr__(self, ansi: bool=True, **kw) -> str:
        if not hasattr(sys, 'ps1'):
            ansi = False

        return pipe_repr(self, ansi=ansi, **kw)

    def __pt_repr__(self):
        from meerschaum.utils.packages import attempt_import
        prompt_toolkit_formatted_text = attempt_import('prompt_toolkit.formatted_text', lazy=False)
        return prompt_toolkit_formatted_text.ANSI(pipe_repr(self, ansi=True))

    def __getstate__(self) -> Dict[str, Any]:
        """
        Define the state dictionary (pickling).
        """
        return {
            'connector': self.connector_keys,
            'metric': self.metric_key,
            'location': self.location_key,
            'parameters': self.parameters,
            'instance': self.instance_keys,
        }

    def __setstate__(self, _state: Dict[str, Any]):
        """
        Read the state (unpickling).
        """
        self.__init__(**_state)


    def __getitem__(self, key: str) -> Any:
        """
        Index the pipe's attributes.
        If the `key` cannot be found, return `None`.
        """
        if key in self.attributes:
            return self.attributes.get(key, None)

        aliases = {
            'connector': 'connector_keys',
            'connector_key': 'connector_keys',
            'metric': 'metric_key',
            'location': 'location_key',
        }
        aliased_key = aliases.get(key, None)
        if aliased_key is not None:
            return self.attributes.get(aliased_key, None)

        property_aliases = {
            'instance': 'instance_keys',
            'instance_key': 'instance_keys',
        }
        aliased_key = property_aliases.get(key, None)
        if aliased_key is not None:
            key = aliased_key
        return getattr(self, key, None)

Access Meerschaum pipes via Pipe objects.

Pipes are identified by the following:

  1. Connector keys (e.g. 'sql:main')
  2. Metric key (e.g. 'weather')
  3. Location (optional; e.g. None)

A pipe's connector keys correspond to a data source, and when the pipe is synced, its fetch definition is evaluated and executed to produce new data.

Alternatively, new data may be directly synced via pipe.sync():

>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)
Pipe( connector: str = '', metric: str = '', location: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, columns: Union[Dict[str, str], List[str], NoneType] = None, tags: Optional[List[str]] = None, target: Optional[str] = None, dtypes: Optional[Dict[str, str]] = None, instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, temporary: bool = False, mrsm_instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, cache: bool = False, debug: bool = False, connector_keys: Optional[str] = None, metric_key: Optional[str] = None, location_key: Optional[str] = None)
Parameters
  • connector (str): Keys for the pipe's source connector, e.g. 'sql:main'.
  • metric (str): Label for the pipe's contents, e.g. 'weather'.
  • location (Optional[str], default None): Label for the pipe's location. The strings '[None]' and 'None' are treated as None.
  • parameters (Optional[Dict[str, Any]], default None): Optionally set a pipe's parameters from the constructor, e.g. columns and other attributes. You can edit these parameters with edit pipes.
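  • columns (Union[Dict[str, str], List[str], None], default None): Set the columns dictionary of parameters. If parameters is also provided, this dictionary is added under the 'columns' key. A list of column names is also accepted and is converted to a dictionary mapping each name to itself.
  • tags (Optional[List[str]], default None): A list of strings to be added under the 'tags' key of parameters. You can select pipes with certain tags using --tags.
  • target (Optional[str], default None): If a string is provided, it is stored under the 'target' key of parameters.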
  • dtypes (Optional[Dict[str, str]], default None): Set the dtypes dictionary of parameters. If parameters is also provided, this dictionary is added under the 'dtypes' key.
  • mrsm_instance (Optional[Union[str, InstanceConnector]], default None): Connector for the Meerschaum instance where the pipe resides. Defaults to the preconfigured default instance ('sql:main').
  • instance (Optional[Union[str, InstanceConnector]], default None): Alias for mrsm_instance. If mrsm_instance is supplied, this value is ignored.
  • temporary (bool, default False): If True, prevent instance tables (pipes, users, plugins) from being created.
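  • cache (bool, default False): If True, cache fetched data into a local SQLite database file. Caching takes effect only when the experimental setting read by get_config('system', 'experimental', 'cache') is also enabled.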
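
The way the constructor resolves `columns` and chooses between `mrsm_instance` and `instance` can be sketched in isolation. This is a minimal illustration that mirrors the logic in the class source; `normalize_columns` and `resolve_instance` are hypothetical helpers, not part of the Meerschaum API, and `default` stands in for the configured default instance.

```python
from typing import Any, Dict, List, Optional, Union


def normalize_columns(
    columns: Union[Dict[str, str], List[str], None],
    parameters: Optional[Dict[str, Any]] = None,
) -> Dict[str, str]:
    """Resolve `columns` the way the constructor does (hypothetical helper)."""
    # Explicit `columns` win; otherwise fall back to parameters['columns'].
    columns = columns or (parameters or {}).get('columns', {})
    if isinstance(columns, list):
        # A list of names becomes a mapping of each name to itself.
        columns = {str(col): str(col) for col in columns}
    return columns


def resolve_instance(
    mrsm_instance: Optional[str],
    instance: Optional[str],
    default: str,
) -> str:
    """Pick the instance keys: `mrsm_instance`, then `instance`, then `default`."""
    chosen = mrsm_instance if mrsm_instance is not None else instance
    return chosen if chosen is not None else default


print(normalize_columns(['ts', 'id']))
# {'ts': 'ts', 'id': 'id'}
print(normalize_columns(None, {'columns': {'datetime': 'ts'}}))
# {'datetime': 'ts'}
print(resolve_instance(None, 'sql:temp', default='sql:main'))
# sql:temp
```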
connector_keys
connector_key
metric_key
location_key
temporary
meta

Return the four keys needed to reconstruct this pipe.

def keys(self) -> List[str]:

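Return the ordered keys for this pipe as a dictionary mapping 'connector', 'metric', and 'location' to their values; the 'instance' entry of meta is omitted. Although the signature is annotated as List[str], the returned object is a dictionary.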
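
The relationship between `meta` and `keys()` is a single filter, sketched here with a plain dictionary. The sample values are assumptions for illustration.

```python
# Sample values standing in for a pipe's `meta` dictionary (assumed for illustration).
meta = {
    'connector': 'sql:main',
    'metric': 'weather',
    'location': None,
    'instance': 'sql:main',
}

# `keys()` applies the same filter: every entry of `meta` except 'instance'.
keys = {key: val for key, val in meta.items() if key != 'instance'}

print(list(keys))
# ['connector', 'metric', 'location']
```

Because dictionaries preserve insertion order, the result keeps the connector, metric, location ordering of `meta`.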

instance_connector: Union[meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType]

The connector for the instance where this pipe resides. May be either a meerschaum.connectors.SQLConnector or a meerschaum.connectors.APIConnector. Returns None if no connector can be parsed from the pipe's instance keys.
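
In the class source, this property resolves the connector on first access and stores it on the instance, while a failed lookup stores nothing. A minimal sketch of that lazy pattern follows; `LazyHolder`, its `resolver` argument, and `registry` are hypothetical stand-ins rather than Meerschaum names.

```python
from typing import Any, Callable, Optional


class LazyHolder:
    """Hypothetical class illustrating lazy resolution cached in `__dict__`."""

    def __init__(self, keys: str, resolver: Callable[[str], Optional[Any]]):
        self.keys = keys
        self._resolver = resolver
        self.calls = 0  # Counts how often the resolver actually runs.

    @property
    def resolved(self) -> Optional[Any]:
        if '_resolved' not in self.__dict__:
            self.calls += 1
            value = self._resolver(self.keys)
            if not value:
                # Nothing is cached on failure, so a later access retries.
                return None
            self._resolved = value
        return self._resolved


registry = {'sql:main': object()}

holder = LazyHolder('sql:main', registry.get)
first = holder.resolved
second = holder.resolved
print(first is second, holder.calls)
# True 1
```

Checking `self.__dict__` rather than using `hasattr` avoids triggering other properties or class-level attributes of the same name.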

connector: Optional[Connector]

The connector to the data source.
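
Unlike `instance_connector`, the class source for this property silences warnings and swallows exceptions while parsing the connector keys, so an unknown data source yields `None`. The sketch below shows that pattern with the standard `warnings` module; `quiet_parse` and `noisy_parser` are hypothetical names used only for illustration.

```python
import warnings
from typing import Any, Callable, Optional


def quiet_parse(keys: str, parser: Callable[[str], Any]) -> Optional[Any]:
    """Run `parser`, hiding its warnings and mapping any failure to None."""
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        try:
            return parser(keys) or None
        except Exception:
            return None


def noisy_parser(keys: str) -> str:
    """Hypothetical parser that warns on every call and rejects keys without a label."""
    warnings.warn(f"Parsing '{keys}'")
    if ':' not in keys:
        raise ValueError(f"Cannot parse '{keys}'")
    return keys.upper()


print(quiet_parse('sql:main', noisy_parser))
# SQL:MAIN
print(quiet_parse('csv', noisy_parser))
# None
```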

cache_connector: Optional[meerschaum.connectors.SQLConnector]

If the pipe was created with cache=True, return the connector to the pipe's SQLite database for caching.
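
Two details from the class source are worth isolating: caching is active only when both the `cache` argument and the experimental configuration flag are truthy, and the database file name is derived from the pipe's string form. The helpers below are a hypothetical sketch; `pipe_label` is a simplified stand-in for `str(pipe)`, and the resources path is an assumed placeholder.

```python
from pathlib import Path


def cache_enabled(cache: bool, experimental_cache: bool) -> bool:
    """Caching needs both the constructor argument and the experimental flag."""
    return bool(cache and experimental_cache)


def cache_database_path(resources_path: Path, pipe_label: str) -> Path:
    """Build the cache database path as '_cache_' + label + '.db'."""
    return resources_path / ('_cache_' + pipe_label + '.db')


db_path = cache_database_path(Path('sqlite'), 'sql_main_weather')
print(db_path.name)
# _cache_sql_main_weather.db
print(cache_enabled(True, False))
# False
```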

cache_pipe: Optional[Pipe]

If the pipe was created with cache=True, return another Pipe used to manage the local data.
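The three properties above share one lazy-initialization pattern: the value is built on first access, stored on the instance, and reused afterwards. The following is a minimal sketch of that pattern only. `LazyExample`, `build_count`, and the string it builds are hypothetical names for illustration and are not part of Meerschaum.

```python
class LazyExample:
    """Hypothetical stand-in for a class with a lazily built attribute."""

    def __init__(self, keys: str) -> None:
        self.keys = keys
        self.build_count = 0  # Tracks how many times the value is constructed.

    @property
    def connector(self) -> str:
        # Build the value on first access only; afterwards reuse the stored attribute.
        if '_connector' not in self.__dict__:
            self.build_count += 1
            self._connector = f"connector<{self.keys}>"
        return self._connector


example = LazyExample('sql:temp')
first = example.connector
second = example.connector
```

Checking `self.__dict__` rather than calling `hasattr()` avoids triggering any custom attribute lookup while testing whether the value has been built yet.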

def fetch( self, begin: Union[datetime.datetime, str, NoneType] = '', end: Optional[datetime.datetime] = None, check_existing: bool = True, sync_chunks: bool = False, debug: bool = False, **kw: Any) -> Union[pandas.core.frame.DataFrame, Iterator[pandas.core.frame.DataFrame], NoneType]:
def fetch(
    self,
    begin: Union[datetime, str, None] = '',
    end: Optional[datetime] = None,
    check_existing: bool = True,
    sync_chunks: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]:
    """
    Fetch a Pipe's latest data from its connector.

    Parameters
    ----------
    begin: Union[datetime, str, None], default ''
        If provided, only fetch data newer than or equal to `begin`.

    end: Optional[datetime], default None
        If provided, only fetch data older than or equal to `end`.

    check_existing: bool, default True
        If `False`, do not apply the backtrack interval.

    sync_chunks: bool, default False
        If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks while
        the fetch is still loading chunks into memory.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` of the newest unseen data,
    or `None` if the connector does not define `fetch()`.

    """
    if 'fetch' not in dir(self.connector):
        warn(f"No `fetch()` function defined for connector '{self.connector}'")
        return None

    from meerschaum.connectors import custom_types, get_connector_plugin
    from meerschaum.utils.debug import dprint, _checkpoint
    from meerschaum.utils.misc import filter_arguments

    _chunk_hook = kw.pop('chunk_hook', None)
    kw['workers'] = self.get_num_workers(kw.get('workers', None))
    if sync_chunks and _chunk_hook is None:

        def _chunk_hook(chunk, **_kw) -> SuccessTuple:
            """
            Wrap `Pipe.sync()` with a custom chunk label prepended to the message.
            """
            from meerschaum.config._patch import apply_patch_to_config
            kwargs = apply_patch_to_config(kw, _kw)
            chunk_success, chunk_message = self.sync(chunk, **kwargs)
            chunk_label = self._get_chunk_label(chunk, self.columns.get('datetime', None))
            if chunk_label:
                chunk_message = '\n' + chunk_label + '\n' + chunk_message
            return chunk_success, chunk_message

    with mrsm.Venv(get_connector_plugin(self.connector)):
        _args, _kwargs = filter_arguments(
            self.connector.fetch,
            self,
            begin=_determine_begin(
                self,
                begin,
                check_existing=check_existing,
                debug=debug,
            ),
            end=end,
            chunk_hook=_chunk_hook,
            debug=debug,
            **kw
        )
        df = self.connector.fetch(*_args, **_kwargs)
    return df

Fetch a Pipe's latest data from its connector.

Parameters
  • begin (Union[datetime, str, None], default ''): If provided, only fetch data newer than or equal to begin.
  • end (Optional[datetime], default None): If provided, only fetch data older than or equal to end.
  • check_existing (bool, default True): If False, do not apply the backtrack interval.
  • sync_chunks (bool, default False): If True and the pipe's connector is of type 'sql', begin syncing chunks while the fetch is still loading chunks into memory.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame of the newest unseen data, or None if the connector does not define fetch().
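When sync_chunks is True and no chunk_hook is supplied, fetch() builds a hook that syncs each chunk and prepends a chunk label to the resulting message. The sketch below reproduces only that wrapping step. `make_chunk_hook`, `fake_sync`, and `fake_label` are hypothetical helpers standing in for `Pipe.sync()` and the pipe's internal label function; they are assumptions for illustration, not Meerschaum APIs.

```python
from typing import Any, Callable, List, Tuple

SuccessTuple = Tuple[bool, str]


def make_chunk_hook(
    sync: Callable[..., SuccessTuple],
    get_label: Callable[[Any], str],
) -> Callable[..., SuccessTuple]:
    """Return a hook that syncs a chunk and prefixes the message with a label."""

    def _chunk_hook(chunk: Any, **kwargs: Any) -> SuccessTuple:
        success, message = sync(chunk, **kwargs)
        label = get_label(chunk)
        # Only decorate the message when a label could be produced.
        if label:
            message = '\n' + label + '\n' + message
        return success, message

    return _chunk_hook


def fake_sync(chunk: List[int], **kwargs: Any) -> SuccessTuple:
    # Hypothetical stand-in for a sync call; always reports success.
    return True, f"Synced {len(chunk)} rows."


def fake_label(chunk: List[int]) -> str:
    # Hypothetical label: the first and last values in the chunk.
    return f"{chunk[0]} - {chunk[-1]}"


hook = make_chunk_hook(fake_sync, fake_label)
result = hook([1, 2, 3])
```

Each chunk handled this way reports its own success tuple, so a failure can be traced to a specific range of the data.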
def get_backtrack_interval( self, check_existing: bool = True, debug: bool = False) -> Union[datetime.timedelta, int]:
def get_backtrack_interval(
    self,
    check_existing: bool = True,
    debug: bool = False,
) -> Union[timedelta, int]:
    """
    Get the backtrack interval to use for this pipe.

    Parameters
    ----------
    check_existing: bool, default True
        If `False`, return a backtrack_interval of 0 minutes.

    Returns
    -------
    The backtrack interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
    """
    default_backtrack_minutes = get_config('pipes', 'parameters', 'fetch', 'backtrack_minutes')
    configured_backtrack_minutes = self.parameters.get('fetch', {}).get('backtrack_minutes', None)
    backtrack_minutes = (
        configured_backtrack_minutes
        if configured_backtrack_minutes is not None
        else default_backtrack_minutes
    ) if check_existing else 0

    backtrack_interval = timedelta(minutes=backtrack_minutes)
    dt_col = self.columns.get('datetime', None)
    if dt_col is None:
        return backtrack_interval

    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns]')
    if 'int' in dt_dtype.lower():
        return backtrack_minutes

    return backtrack_interval

Get the backtrack interval to use for this pipe.

Parameters
  • check_existing (bool, default True): If False, return a backtrack_interval of 0 minutes.
Returns
  • The backtrack interval (timedelta or int) to use with this pipe's datetime axis.
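The resolution order in the source above can be summarized as: a configured value wins over the default (even when it is 0), check_existing=False forces 0, and an integer datetime axis yields a plain int instead of a timedelta. Here is a minimal standalone sketch of that logic. `resolve_backtrack_interval` and its arguments are hypothetical, and the default of 1440 minutes used in the usage lines is an assumed value for illustration rather than Meerschaum's configured default.

```python
from datetime import timedelta
from typing import Optional, Union


def resolve_backtrack_interval(
    configured_minutes: Optional[int],
    default_minutes: int,
    dt_dtype: str = 'datetime64[ns]',
    check_existing: bool = True,
) -> Union[timedelta, int]:
    """Mirror the precedence rules described above without touching any pipe."""
    if not check_existing:
        minutes = 0
    elif configured_minutes is not None:
        # An explicit 0 is respected because the test is `is not None`.
        minutes = configured_minutes
    else:
        minutes = default_minutes

    # Integer datetime axes use a bare number rather than a timedelta.
    if 'int' in dt_dtype.lower():
        return minutes
    return timedelta(minutes=minutes)


from_default = resolve_backtrack_interval(None, 1440)
explicit_zero = resolve_backtrack_interval(0, 1440)
integer_axis = resolve_backtrack_interval(10, 1440, dt_dtype='Int64')
no_check = resolve_backtrack_interval(10, 1440, check_existing=False)
```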
def get_data( self, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, as_iterator: bool = False, as_chunks: bool = False, as_dask: bool = False, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, order: Optional[str] = 'asc', limit: Optional[int] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> Union[pandas.core.frame.DataFrame, Iterator[pandas.core.frame.DataFrame], NoneType]:
def get_data(
    self,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    as_iterator: bool = False,
    as_chunks: bool = False,
    as_dask: bool = False,
    chunk_interval: Union[timedelta, int, None] = None,
    order: Optional[str] = 'asc',
    limit: Optional[int] = None,
    fresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]:
    """
    Get a pipe's data from the instance connector.

    Parameters
    ----------
    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, int, None], default None
        Lower bound datetime to begin searching for data (inclusive).
        Translates to a `WHERE` clause like `WHERE datetime >= begin`.

    end: Union[datetime, int, None], default None
        Upper bound datetime to stop searching for data (exclusive).
        Translates to a `WHERE` clause like `WHERE datetime < end`.

    params: Optional[Dict[str, Any]], default None
        Filter the retrieved data by a dictionary of parameters.
        See `meerschaum.utils.sql.build_where` for more details.

    as_iterator: bool, default False
        If `True`, return a generator of chunks of pipe data.

    as_chunks: bool, default False
        Alias for `as_iterator`.

    as_dask: bool, default False
        If `True`, return a `dask.DataFrame`
        (which may be loaded into a Pandas DataFrame with `df.compute()`).

    chunk_interval: Union[timedelta, int, None], default None
        If `as_iterator`, then return chunks with `begin` and `end` separated by this interval.
        This may be set under `pipe.parameters['verify']['chunk_minutes']`.
        By default, use a timedelta of 1440 minutes (1 day).
        If `chunk_interval` is an integer and the `datetime` axis a timestamp,
        then use a timedelta with the number of minutes configured to this value.
        If the `datetime` axis is an integer, default to the configured chunksize.
        If `chunk_interval` is a `timedelta` and the `datetime` axis an integer,
        use the number of minutes in the `timedelta`.

    order: Optional[str], default 'asc'
        If `order` is not `None`, sort the resulting dataframe by indices.

    limit: Optional[int], default None
        If provided, cap the dataframe to this many rows.

    fresh: bool, default False
        If `True`, skip local cache and directly query the instance connector.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.

    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.misc import iterate_chunks, items_str
    from meerschaum.utils.dtypes import to_pandas_dtype
    from meerschaum.utils.dataframe import add_missing_cols_to_df, df_is_chunk_generator
    from meerschaum.utils.packages import attempt_import
    dd = attempt_import('dask.dataframe') if as_dask else None
    dask = attempt_import('dask') if as_dask else None

    if select_columns == '*':
        select_columns = None
    elif isinstance(select_columns, str):
        select_columns = [select_columns]

    if isinstance(omit_columns, str):
        omit_columns = [omit_columns]

    as_iterator = as_iterator or as_chunks

    def _sort_df(_df):
        if df_is_chunk_generator(_df):
            return _df
        dt_col = self.columns.get('datetime', None)
        indices = [] if dt_col not in _df.columns else [dt_col]
        non_dt_cols = [
            col
            for col_ix, col in self.columns.items()
            if col_ix != 'datetime' and col in _df.columns
        ]
        indices.extend(non_dt_cols)
        if 'dask' not in _df.__module__:
            _df.sort_values(
                by=indices,
                inplace=True,
                ascending=(str(order).lower() == 'asc'),
            )
            _df.reset_index(drop=True, inplace=True)
        else:
            _df = _df.sort_values(
                by=indices,
                ascending=(str(order).lower() == 'asc'),
            )
            _df = _df.reset_index(drop=True)
        if limit is not None and len(_df) > limit:
            return _df.head(limit)
        return _df

    if as_iterator or as_chunks:
        df = self._get_data_as_iterator(
            select_columns=select_columns,
            omit_columns=omit_columns,
            begin=begin,
            end=end,
            params=params,
            chunk_interval=chunk_interval,
            limit=limit,
            order=order,
            fresh=fresh,
            debug=debug,
        )
        return _sort_df(df)

    if as_dask:
        from multiprocessing.pool import ThreadPool
        dask_pool = ThreadPool(self.get_num_workers())
        dask.config.set(pool=dask_pool)
        chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
        bounds = self.get_chunk_bounds(
            begin=begin,
            end=end,
            bounded=False,
            chunk_interval=chunk_interval,
            debug=debug,
        )
        dask_chunks = [
            dask.delayed(self.get_data)(
                select_columns=select_columns,
                omit_columns=omit_columns,
                begin=chunk_begin,
                end=chunk_end,
                params=params,
                chunk_interval=chunk_interval,
                order=order,
                limit=limit,
                fresh=fresh,
                debug=debug,
            )
            for (chunk_begin, chunk_end) in bounds
        ]
        dask_meta = {
            col: to_pandas_dtype(typ)
            for col, typ in self.dtypes.items()
        }
        return _sort_df(dd.from_delayed(dask_chunks, meta=dask_meta))

    if not self.exists(debug=debug):
        return None

    if self.cache_pipe is not None:
        if not fresh:
            _sync_cache_tuple = self.cache_pipe.sync(
                begin=begin,
                end=end,
                params=params,
                debug=debug,
                **kw
            )
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
                fresh = True
            else: ### Successfully synced cache.
                return self.enforce_dtypes(
                    self.cache_pipe.get_data(
                        select_columns=select_columns,
                        omit_columns=omit_columns,
                        begin=begin,
                        end=end,
                        params=params,
                        order=order,
                        limit=limit,
                        debug=debug,
                        fresh=True,
                        **kw
                    ),
                    debug=debug,
                )

    with Venv(get_connector_plugin(self.instance_connector)):
        df = self.instance_connector.get_pipe_data(
            pipe=self,
            select_columns=select_columns,
            omit_columns=omit_columns,
            begin=begin,
            end=end,
            params=params,
            limit=limit,
            order=order,
            debug=debug,
            **kw
        )
        if df is None:
            return df

        if not select_columns:
            select_columns = [col for col in df.columns]

        cols_to_omit = [
            col
            for col in df.columns
            if (
                col in (omit_columns or [])
                or
                col not in (select_columns or [])
            )
        ]
        cols_to_add = [
            col
            for col in select_columns
            if col not in df.columns
        ]
        if cols_to_omit:
            warn(
                (
                    f"Received {len(cols_to_omit)} omitted column"
                    + ('s' if len(cols_to_omit) != 1 else '')
                    + f" for {self}. "
                    + "Consider adding `select_columns` and `omit_columns` support to "
                    + f"'{self.instance_connector.type}' connectors to improve performance."
                ),
                stack=False,
            )
            _cols_to_select = [col for col in df.columns if col not in cols_to_omit]
            df = df[_cols_to_select]

        if cols_to_add:
            warn(
                (
                    f"Specified columns {items_str(cols_to_add)} were not found on {self}. "
                    + "Adding these to the DataFrame as null columns."
                ),
                stack=False,
            )
            df = add_missing_cols_to_df(df, {col: 'string' for col in cols_to_add})

        enforced_df = self.enforce_dtypes(df, debug=debug)

        if order:
            return _sort_df(enforced_df)
        return enforced_df

Get a pipe's data from the instance connector.

Parameters
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, int, None], default None): Lower bound datetime to begin searching for data (inclusive). Translates to a WHERE clause like WHERE datetime >= begin.
  • end (Union[datetime, int, None], default None): Upper bound datetime to stop searching for data (exclusive). Translates to a WHERE clause like WHERE datetime < end.
  • params (Optional[Dict[str, Any]], default None): Filter the retrieved data by a dictionary of parameters. See meerschaum.utils.sql.build_where for more details.
  • as_iterator (bool, default False): If True, return a generator of chunks of pipe data.
  • as_chunks (bool, default False): Alias for as_iterator.
  • as_dask (bool, default False): If True, return a dask.DataFrame (which may be loaded into a Pandas DataFrame with df.compute()).
  • chunk_interval (Union[timedelta, int, None], default None): If as_iterator, then return chunks with begin and end separated by this interval. This may be set under pipe.parameters['verify']['chunk_minutes']. By default, use a timedelta of 1440 minutes (1 day). If chunk_interval is an integer and the datetime axis a timestamp, then use a timedelta with the number of minutes configured to this value. If the datetime axis is an integer, default to the configured chunksize. If chunk_interval is a timedelta and the datetime axis an integer, use the number of minutes in the timedelta.
  • order (Optional[str], default 'asc'): If order is not None, sort the resulting dataframe by indices.
  • limit (Optional[int], default None): If provided, cap the dataframe to this many rows.
  • fresh (bool, default False): If True, skip local cache and directly query the instance connector.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame for the pipe's data corresponding to the provided parameters.
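The WHERE clauses quoted in the parameter list describe a half-open window: rows at begin are kept and rows at end are dropped. The sketch below applies the same bounds and the select_columns / omit_columns rules to plain dictionaries so the behavior can be checked without a database. `filter_rows` and the sample rows are hypothetical and only approximate what an instance connector does.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional


def filter_rows(
    rows: List[Dict[str, Any]],
    dt_col: str,
    begin: Optional[datetime] = None,
    end: Optional[datetime] = None,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
    """Keep rows where begin <= row[dt_col] < end, then apply column selection."""
    omit = set(omit_columns or [])
    selected = []
    for row in rows:
        ts = row[dt_col]
        if begin is not None and ts < begin:
            continue  # Older than the inclusive lower bound.
        if end is not None and ts >= end:
            continue  # At or past the exclusive upper bound.
        columns = select_columns or list(row)
        # Selected columns missing from the row come back as None (null).
        selected.append({col: row.get(col) for col in columns if col not in omit})
    return selected


rows = [
    {'ts': datetime(2024, 1, 1), 'id': 1, 'vl': 10},
    {'ts': datetime(2024, 1, 2), 'id': 1, 'vl': 20},
    {'ts': datetime(2024, 1, 3), 'id': 1, 'vl': 30},
]
window = filter_rows(
    rows, 'ts',
    begin=datetime(2024, 1, 1),
    end=datetime(2024, 1, 3),
    omit_columns=['id'],
)
```

Because the upper bound is exclusive, consecutive windows such as (Jan 1, Jan 3) and (Jan 3, Jan 5) never return the same row twice.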
def get_backtrack_data( self, backtrack_minutes: Optional[int] = None, begin: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, limit: Optional[int] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> Optional[pandas.core.frame.DataFrame]:
def get_backtrack_data(
    self,
    backtrack_minutes: Optional[int] = None,
    begin: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    limit: Optional[int] = None,
    fresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Optional['pd.DataFrame']:
    """
    Get the most recent data from the instance connector as a Pandas DataFrame.

    Parameters
    ----------
    backtrack_minutes: Optional[int], default None
        How many minutes before `begin` to include in the search.
        If `None`, use `pipe.parameters['fetch']['backtrack_minutes']`.

    begin: Union[datetime, int, None], default None
        The starting point to search for data.
        If begin is `None` (default), use the most recent observed datetime
        (AKA sync_time).

        ```
        E.g. begin = 02:00

        Search this region.           Ignore this, even if there's data.
        /  /  /  /  /  /  /  /  /  |
        -----|----------|----------|----------|----------|----------|
        00:00      01:00      02:00      03:00      04:00      05:00

        ```

    params: Optional[Dict[str, Any]], default None
        The standard Meerschaum `params` query dictionary.

    limit: Optional[int], default None
        If provided, cap the number of rows to be returned.

    fresh: bool, default False
        If `True`, ignore local cache and pull directly from the instance connector.
        Only comes into effect if a pipe was created with `cache=True`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. Backtrack data
    is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if not self.exists(debug=debug):
        return None

    backtrack_interval = self.get_backtrack_interval(debug=debug)
    if backtrack_minutes is None:
        ### Convert a `timedelta` to minutes (seconds / 60).
        backtrack_minutes = (
            (backtrack_interval.total_seconds() / 60)
            if isinstance(backtrack_interval, timedelta)
            else backtrack_interval
        )

    if self.cache_pipe is not None:
        if not fresh:
            _sync_cache_tuple = self.cache_pipe.sync(begin=begin, params=params, debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
                fresh = True
            else: ### Successfully synced cache.
                return self.enforce_dtypes(
                    self.cache_pipe.get_backtrack_data(
                        fresh=True,
                        begin=begin,
                        backtrack_minutes=backtrack_minutes,
                        params=params,
                        limit=limit,
                        debug=debug,
                        ### Default to descending order without passing `order` twice.
                        **{'order': 'desc', **kw}
                    ),
                    debug=debug,
                )

    if hasattr(self.instance_connector, 'get_backtrack_data'):
        with Venv(get_connector_plugin(self.instance_connector)):
            return self.enforce_dtypes(
                self.instance_connector.get_backtrack_data(
                    pipe=self,
                    begin=begin,
                    backtrack_minutes=backtrack_minutes,
                    params=params,
                    limit=limit,
                    debug=debug,
                    **kw
                ),
                debug=debug,
            )

    if begin is None:
        begin = self.get_sync_time(params=params, debug=debug)

    backtrack_interval = (
        timedelta(minutes=backtrack_minutes)
        if isinstance(begin, datetime)
        else backtrack_minutes
    )
    if begin is not None:
        begin = begin - backtrack_interval

    return self.get_data(
        begin=begin,
        params=params,
        debug=debug,
        limit=limit,
        ### Default to descending order without passing `order` twice.
        **{'order': 'desc', **kw}
    )

Get the most recent data from the instance connector as a Pandas DataFrame.

Parameters
  • backtrack_minutes (Optional[int], default None): How many minutes before begin to include in the search. If None, use pipe.parameters['fetch']['backtrack_minutes'].
  • begin (Union[datetime, int, None], default None): The starting point to search for data. If begin is None (default), use the most recent observed datetime (AKA sync_time).

    E.g. begin = 02:00

    Search this region.           Ignore this, even if there's data.
    /  /  /  /  /  /  /  /  /  |
    -----|----------|----------|----------|----------|----------|
    00:00      01:00      02:00      03:00      04:00      05:00

  • params (Optional[Dict[str, Any]], default None): The standard Meerschaum params query dictionary.
  • limit (Optional[int], default None): If provided, cap the number of rows to be returned.
  • fresh (bool, default False): If True, ignore local cache and pull directly from the instance connector. Only comes into effect if a pipe was created with cache=True.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame for the pipe's data corresponding to the provided parameters. Backtrack data is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
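When the instance connector has no get_backtrack_data() of its own, the source falls back to shifting begin earlier by the backtrack window and calling get_data(). The sketch below shows just that shift for both datetime and integer axes. `backtrack_begin` is a hypothetical helper written for illustration, not a Meerschaum function.

```python
from datetime import datetime, timedelta
from typing import Union


def backtrack_begin(
    begin: Union[datetime, int, None],
    backtrack_minutes: Union[int, float],
) -> Union[datetime, int, float, None]:
    """Shift `begin` earlier by the backtrack window, matching the axis type."""
    if begin is None:
        # Nothing has been synced yet, so the search stays unbounded.
        return None
    interval = (
        timedelta(minutes=backtrack_minutes)
        if isinstance(begin, datetime)
        else backtrack_minutes
    )
    return begin - interval


shifted_dt = backtrack_begin(datetime(2024, 1, 1, 2, 0), 60)
shifted_int = backtrack_begin(100, 30)
shifted_none = backtrack_begin(None, 30)
```

With begin at 02:00 and a 60-minute window, the search starts at 01:00, which corresponds to the "Search this region" span in the diagram above.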
def get_rowcount( self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> int:
def get_rowcount(
    self,
    begin: Optional[datetime] = None,
    end: Optional[datetime] = None,
    params: Optional[Dict[str, Any]] = None,
    remote: bool = False,
    debug: bool = False,
) -> int:
    """
    Get a Pipe's instance or remote rowcount.

    Parameters
    ----------
    begin: Optional[datetime], default None
        Count rows where datetime > begin.

    end: Optional[datetime], default None
        Count rows where datetime < end.

    params: Optional[Dict[str, Any]], default None
        The standard Meerschaum `params` query dictionary.

    remote: bool, default False
        Count rows from a pipe's remote source.
        **NOTE**: This is experimental!

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` of the number of rows in the pipe corresponding to the provided parameters.
    Returns 0 if the pipe does not exist.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    connector = self.instance_connector if not remote else self.connector
    try:
        with Venv(get_connector_plugin(connector)):
            rowcount = connector.get_pipe_rowcount(
                self,
                begin=begin,
                end=end,
                params=params,
                remote=remote,
                debug=debug,
            )
            if rowcount is None:
                return 0
            return rowcount
    except AttributeError as e:
        warn(e)
        if remote:
            return 0
    warn(f"Failed to get a rowcount for {self}.")
    return 0

Get a Pipe's instance or remote rowcount.

Parameters
  • begin (Optional[datetime], default None): Count rows where datetime > begin.
  • end (Optional[datetime], default None): Count rows where datetime < end.
  • params (Optional[Dict[str, Any]], default None): The standard Meerschaum params query dictionary.
  • remote (bool, default False): Count rows from a pipe's remote source. NOTE: This is experimental!
  • debug (bool, default False): Verbosity toggle.
Returns
  • An int of the number of rows in the pipe corresponding to the provided parameters. Returns 0 if the pipe does not exist.
def get_chunk_interval( self, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, debug: bool = False) -> Union[datetime.timedelta, int]:
def get_chunk_interval(
    self,
    chunk_interval: Union[timedelta, int, None] = None,
    debug: bool = False,
) -> Union[timedelta, int]:
    """
    Get the chunk interval to use for this pipe.

    Parameters
    ----------
    chunk_interval: Union[timedelta, int, None], default None
        If provided, coerce this value into the correct type.
        For example, if the datetime axis is an integer, then
        return the number of minutes.

    Returns
    -------
    The chunk interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
    """
    default_chunk_minutes = get_config('pipes', 'parameters', 'verify', 'chunk_minutes')
    configured_chunk_minutes = self.parameters.get('verify', {}).get('chunk_minutes', None)
    chunk_minutes = (
        (configured_chunk_minutes or default_chunk_minutes)
        if chunk_interval is None
        else (
            chunk_interval
            if isinstance(chunk_interval, int)
            else int(chunk_interval.total_seconds() / 60)
        )
    )

    dt_col = self.columns.get('datetime', None)
    if dt_col is None:
        return timedelta(minutes=chunk_minutes)

    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns]')
    if 'int' in dt_dtype.lower():
        return chunk_minutes
    return timedelta(minutes=chunk_minutes)

Get the chunk interval to use for this pipe.

Parameters
  • chunk_interval (Union[timedelta, int, None], default None): If provided, coerce this value into the correct type. For example, if the datetime axis is an integer, then return the number of minutes.
Returns
  • The chunk interval (timedelta or int) to use with this pipe's datetime axis.
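The coercion described above can be illustrated without a pipe: the interval is first reduced to a number of minutes, then returned as an int for integer datetime axes or as a timedelta otherwise. This is a sketch under stated assumptions. `coerce_chunk_interval` and its arguments are hypothetical, and the 1440-minute default passed in the usage lines is taken from the get_data() docstring rather than read from a real configuration.

```python
from datetime import timedelta
from typing import Optional, Union


def coerce_chunk_interval(
    chunk_interval: Union[timedelta, int, None],
    configured_minutes: Optional[int],
    default_minutes: int,
    dt_dtype: str = 'datetime64[ns]',
) -> Union[timedelta, int]:
    """Reduce the interval to minutes, then match the datetime axis type."""
    if chunk_interval is None:
        # Fall back to the pipe-level setting, then to the global default.
        minutes = configured_minutes or default_minutes
    elif isinstance(chunk_interval, int):
        minutes = chunk_interval
    else:
        minutes = int(chunk_interval.total_seconds() / 60)

    if 'int' in dt_dtype.lower():
        return minutes
    return timedelta(minutes=minutes)


default_chunk = coerce_chunk_interval(None, None, 1440)
int_axis_chunk = coerce_chunk_interval(timedelta(hours=2), None, 1440, dt_dtype='int64')
dt_axis_chunk = coerce_chunk_interval(90, None, 1440)
```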
def get_chunk_bounds( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, bounded: bool = False, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, debug: bool = False) -> List[Tuple[Union[datetime.datetime, int, NoneType], Union[datetime.datetime, int, NoneType]]]:
def get_chunk_bounds(
        self,
        begin: Union[datetime, int, None] = None,
        end: Union[datetime, int, None] = None,
        bounded: bool = False,
        chunk_interval: Union[timedelta, int, None] = None,
        debug: bool = False,
    ) -> List[
        Tuple[
            Union[datetime, int, None],
            Union[datetime, int, None],
        ]
    ]:
    """
    Return a list of datetime bounds for iterating over the pipe's `datetime` axis.

    Parameters
    ----------
    begin: Union[datetime, int, None], default None
        If provided, do not select less than this value.
        Otherwise the first chunk will be unbounded.

    end: Union[datetime, int, None], default None
        If provided, do not select greater than or equal to this value.
        Otherwise the last chunk will be unbounded.

    bounded: bool, default False
        If `True`, do not include `None` in the first or last chunks.

    chunk_interval: Union[timedelta, int, None], default None
        If provided, use this interval for the size of chunk boundaries.
        The default value for this pipe may be set
        under `pipe.parameters['verify']['chunk_minutes']`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of chunk bounds (datetimes or integers).
    If unbounded, the first and last chunks will include `None`.
    """
    include_less_than_begin = not bounded and begin is None
    include_greater_than_end = not bounded and end is None
    if begin is None:
        begin = self.get_sync_time(newest=False, debug=debug)
    if end is None:
        end = self.get_sync_time(newest=True, debug=debug)
    if begin is None and end is None:
        return [(None, None)]

    ### Set the chunk interval under `pipe.parameters['verify']['chunk_minutes']`.
    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)

    ### Build a list of tuples containing the chunk boundaries
    ### so that we can sync multiple chunks in parallel.
    ### Run `verify pipes --workers 1` to sync chunks in series.
    chunk_bounds = []
    begin_cursor = begin
    while begin_cursor < end:
        end_cursor = begin_cursor + chunk_interval
        chunk_bounds.append((begin_cursor, end_cursor))
        begin_cursor = end_cursor

    ### The chunk interval might be too large.
    if not chunk_bounds and end >= begin:
        chunk_bounds = [(begin, end)]

    ### Truncate the last chunk to the end timestamp.
    if chunk_bounds[-1][1] > end:
        chunk_bounds[-1] = (chunk_bounds[-1][0], end)

    ### Pop the last chunk if its bounds are equal.
    if chunk_bounds[-1][0] == chunk_bounds[-1][1]:
        chunk_bounds = chunk_bounds[:-1]

    if include_less_than_begin:
        chunk_bounds = [(None, begin)] + chunk_bounds
    if include_greater_than_end:
        chunk_bounds = chunk_bounds + [(end, None)]

    return chunk_bounds

Return a list of datetime bounds for iterating over the pipe's datetime axis.

Parameters
  • begin (Union[datetime, int, None], default None): If provided, do not select less than this value. Otherwise the first chunk will be unbounded.
  • end (Union[datetime, int, None], default None): If provided, do not select greater than or equal to this value. Otherwise the last chunk will be unbounded.
  • bounded (bool, default False): If True, do not include None in the first or last chunks.
  • chunk_interval (Union[timedelta, int, None], default None): If provided, use this interval for the size of chunk boundaries. The default value for this pipe may be set under pipe.parameters['verify']['chunk_minutes'].
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of chunk bounds (datetimes or integers).
  • If unbounded, the first and last chunks will include None.
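To make the shape of the returned list concrete, here is a self-contained sketch of the chunking loop. It is a simplification under stated assumptions: `build_chunk_bounds` is a hypothetical function, `begin` and `end` must be given explicitly (the real method falls back to the pipe's sync times), and the two `include_*` flags stand in for the open-ended chunks that the method adds when a bound is omitted and `bounded` is False.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

Bound = Optional[datetime]


def build_chunk_bounds(
    begin: datetime,
    end: datetime,
    chunk_interval: timedelta,
    include_less_than_begin: bool = False,
    include_greater_than_end: bool = False,
) -> List[Tuple[Bound, Bound]]:
    """Hypothetical sketch: split [begin, end) into consecutive chunks."""
    chunk_bounds: List[Tuple[Bound, Bound]] = []
    cursor = begin
    while cursor < end:
        # Truncate the final chunk so that it never overshoots `end`.
        next_cursor = min(cursor + chunk_interval, end)
        chunk_bounds.append((cursor, next_cursor))
        cursor = next_cursor

    # Open-ended chunks catch rows outside of the known range.
    if include_less_than_begin:
        chunk_bounds = [(None, begin)] + chunk_bounds
    if include_greater_than_end:
        chunk_bounds = chunk_bounds + [(end, None)]
    return chunk_bounds


bounds = build_chunk_bounds(
    datetime(2024, 1, 1),
    datetime(2024, 1, 3, 12),
    timedelta(days=1),
    include_less_than_begin=True,
    include_greater_than_end=True,
)
for chunk_begin, chunk_end in bounds:
    print(chunk_begin, '->', chunk_end)
```

Each tuple is a half-open interval, so adjacent chunks share a boundary value without overlapping.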
def register(self, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
def register(
        self,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Register a new Pipe along with its attributes.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    kw: Any
        Keyword arguments to pass to `instance_connector.register_pipe()`.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    if self.temporary:
        return False, "Cannot register pipes created with `temporary=True` (read-only)."

    from meerschaum.utils.formatting import get_console
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin, custom_types
    from meerschaum.config._patch import apply_patch_to_config

    import warnings
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        try:
            _conn = self.connector
        except Exception as e:
            _conn = None

    if (
        _conn is not None
        and
        (_conn.type == 'plugin' or _conn.type in custom_types)
        and
        getattr(_conn, 'register', None) is not None
    ):
        try:
            with Venv(get_connector_plugin(_conn), debug=debug):
                params = self.connector.register(self)
        except Exception as e:
            get_console().print_exception()
            params = None
        params = {} if params is None else params
        if not isinstance(params, dict):
            from meerschaum.utils.warnings import warn
            warn(
                f"Invalid parameters returned from `register()` in connector {self.connector}:\n"
                + f"{params}"
            )
        else:
            self.parameters = apply_patch_to_config(params, self.parameters)

    if not self.parameters:
        cols = self.columns if self.columns else {'datetime': None, 'id': None}
        self.parameters = {
            'columns': cols,
        }

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.register_pipe(self, debug=debug, **kw)

Register a new Pipe along with its attributes.

Parameters
  • debug (bool, default False): Verbosity toggle.
  • kw (Any): Keyword arguments to pass to instance_connector.register_pipe().
Returns
  • A SuccessTuple of success, message.
attributes: Dict[str, Any]
@property
def attributes(self) -> Dict[str, Any]:
    """
    Return a dictionary of a pipe's keys and parameters.
    These values are reflected directly from the pipes table of the instance.
    """
    import time
    from meerschaum.config import get_config
    from meerschaum.config._patch import apply_patch_to_config
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    timeout_seconds = get_config('pipes', 'attributes', 'local_cache_timeout_seconds')

    if '_attributes' not in self.__dict__:
        self._attributes = {}

    now = time.perf_counter()
    last_refresh = self.__dict__.get('_attributes_sync_time', None)
    timed_out = (
        last_refresh is None
        or
        (timeout_seconds is not None and (now - last_refresh) >= timeout_seconds)
    )
    if not self.temporary and timed_out:
        self._attributes_sync_time = now
        local_attributes = self.__dict__.get('_attributes', {})
        with Venv(get_connector_plugin(self.instance_connector)):
            instance_attributes = self.instance_connector.get_pipe_attributes(self)
        self._attributes = apply_patch_to_config(instance_attributes, local_attributes)
    return self._attributes

Return a dictionary of a pipe's keys and parameters. These values are reflected directly from the pipes table of the instance.
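The property refreshes its local copy only after a configurable timeout has elapsed. The pattern can be sketched in isolation as follows. `CachedAttributes` and its `loader` argument are hypothetical names, and the sketch replaces the cached dictionary outright rather than patching it with local changes as the property does.

```python
import time
from typing import Any, Callable, Dict, Optional


class CachedAttributes:
    """Hypothetical sketch of a dictionary cache that expires after a timeout."""

    def __init__(
        self,
        loader: Callable[[], Dict[str, Any]],
        timeout_seconds: Optional[float] = 60.0,
    ):
        self._loader = loader
        self._timeout_seconds = timeout_seconds
        self._attributes: Dict[str, Any] = {}
        self._last_refresh: Optional[float] = None

    def get(self) -> Dict[str, Any]:
        now = time.perf_counter()
        # A timeout of `None` means "load once and never refresh".
        timed_out = (
            self._last_refresh is None
            or (
                self._timeout_seconds is not None
                and (now - self._last_refresh) >= self._timeout_seconds
            )
        )
        if timed_out:
            self._last_refresh = now
            self._attributes = self._loader()
        return self._attributes


load_calls = []

def load_from_instance() -> Dict[str, Any]:
    """Stand-in for a round trip to the instance's pipes table."""
    load_calls.append(time.perf_counter())
    return {'parameters': {'columns': {'datetime': 'ts'}}}

cache = CachedAttributes(load_from_instance, timeout_seconds=None)
cache.get()
cache.get()
print(len(load_calls))  # 1
```

`time.perf_counter()` is used for the elapsed-time check because it is monotonic and unaffected by changes to the system clock.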

parameters: Optional[Dict[str, Any]]
@property
def parameters(self) -> Optional[Dict[str, Any]]:
    """
    Return the parameters dictionary of the pipe.
    """
    if 'parameters' not in self.attributes:
        self.attributes['parameters'] = {}
    return self.attributes['parameters']

Return the parameters dictionary of the pipe.

columns: Optional[Dict[str, str]]
@property
def columns(self) -> Union[Dict[str, str], None]:
    """
    Return the `columns` dictionary defined in `meerschaum.Pipe.parameters`.
    """
    if 'columns' not in self.parameters:
        self.parameters['columns'] = {}
    cols = self.parameters['columns']
    if not isinstance(cols, dict):
        cols = {}
        self.parameters['columns'] = cols
    return cols

Return the columns dictionary defined in Pipe.parameters.

dtypes: Optional[Dict[str, Any]]
@property
def dtypes(self) -> Union[Dict[str, Any], None]:
    """
    If defined, return the `dtypes` dictionary defined in `meerschaum.Pipe.parameters`.
    """
    from meerschaum.config._patch import apply_patch_to_config
    configured_dtypes = self.parameters.get('dtypes', {})
    remote_dtypes = self.infer_dtypes(persist=False)
    patched_dtypes = apply_patch_to_config(remote_dtypes, configured_dtypes)
    return patched_dtypes

If defined, return the dtypes dictionary defined in Pipe.parameters.

def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
    """
    Check if the requested columns are defined.

    Parameters
    ----------
    *args: str
        The column names to be retrieved.

    error: bool, default False
        If `True`, raise an `Exception` if the specified column is not defined.

    Returns
    -------
    A tuple of the same size as `args` or a `str` if `args` is a single argument.

    Examples
    --------
    >>> pipe = mrsm.Pipe('test', 'test')
    >>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
    >>> pipe.get_columns('datetime', 'id')
    ('dt', 'id')
    >>> pipe.get_columns('value', error=True)
    Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
    """
    from meerschaum.utils.warnings import error as _error, warn
    if not args:
        args = tuple(self.columns.keys())
    col_names = []
    for col in args:
        col_name = None
        try:
            col_name = self.columns[col]
            if col_name is None and error:
                _error(f"Please define the name of the '{col}' column for {self}.")
        except Exception as e:
            col_name = None
        if col_name is None and error:
            _error(f"Missing '{col}'" + f" column for {self}.")
        col_names.append(col_name)
    if len(col_names) == 1:
        return col_names[0]
    return tuple(col_names)

Check if the requested columns are defined.

Parameters
  • *args (str): The column names to be retrieved.
  • error (bool, default False): If True, raise an Exception if the specified column is not defined.
Returns
  • A tuple of the same size as args or a str if args is a single argument.
Examples
>>> pipe = mrsm.Pipe('test', 'test')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> pipe.get_columns('datetime', 'id')
('dt', 'id')
>>> pipe.get_columns('value', error=True)
Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
def get_columns_types(self, debug: bool = False) -> Optional[Dict[str, str]]:
def get_columns_types(self, debug: bool = False) -> Union[Dict[str, str], None]:
    """
    Get a dictionary of a pipe's column names and their types.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of column names (`str`) to column types (`str`).

    Examples
    --------
    >>> pipe.get_columns_types()
    {
      'dt': 'TIMESTAMP WITHOUT TIMEZONE',
      'id': 'BIGINT',
      'val': 'DOUBLE PRECISION',
    }
    >>>
    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.get_pipe_columns_types(self, debug=debug)

Get a dictionary of a pipe's column names and their types.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A dictionary of column names (str) to column types (str).
Examples
>>> pipe.get_columns_types()
{
  'dt': 'TIMESTAMP WITHOUT TIMEZONE',
  'id': 'BIGINT',
  'val': 'DOUBLE PRECISION',
}
>>>
def get_indices(self) -> Dict[str, str]:
def get_indices(self) -> Dict[str, str]:
    """
    Return a dictionary in the form of `pipe.columns` but map to index names.
    """
    return {
        ix: (self.target + '_' + col + '_index')
        for ix, col in self.columns.items() if col
    }

Return a dictionary in the form of pipe.columns but map to index names.
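As a quick illustration of the naming scheme, the same comprehension can be run on plain values. `index_names` is a hypothetical stand-alone function, and the table and column names are made up for the example.

```python
from typing import Dict, Optional


def index_names(target: str, columns: Dict[str, Optional[str]]) -> Dict[str, str]:
    """Hypothetical helper: map each defined index key to '<target>_<column>_index'."""
    # Keys whose column is unset (None or empty) are skipped.
    return {
        ix: f"{target}_{col}_index"
        for ix, col in columns.items()
        if col
    }


print(index_names('weather', {'datetime': 'ts', 'id': 'station', 'value': None}))
# {'datetime': 'weather_ts_index', 'id': 'weather_station_index'}
```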

tags: Optional[List[str]]
@property
def tags(self) -> Union[List[str], None]:
    """
    If defined, return the `tags` list defined in `meerschaum.Pipe.parameters`.
    """
    if 'tags' not in self.parameters:
        self.parameters['tags'] = []
    return self.parameters['tags']

If defined, return the tags list defined in Pipe.parameters.

def get_id(self, **kw: Any) -> Optional[int]:
def get_id(self, **kw: Any) -> Union[int, None]:
    """
    Fetch a pipe's ID from its instance connector.
    If the pipe does not exist, return `None`.
    """
    if self.temporary:
        return None
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.get_pipe_id(self, **kw)

Fetch a pipe's ID from its instance connector. If the pipe does not exist, return None.

id: Optional[int]
@property
def id(self) -> Union[int, None]:
    """
    Fetch and cache a pipe's ID.
    """
    if not ('_id' in self.__dict__ and self._id):
        self._id = self.get_id()
    return self._id

Fetch and cache a pipe's ID.

def get_val_column(self, debug: bool = False) -> Optional[str]:
def get_val_column(self, debug: bool = False) -> Union[str, None]:
    """
    Return the name of the value column if it's defined, otherwise make an educated guess.
    If not set in the `columns` dictionary, return the first numeric column that is not
    an ID or datetime column.
    If none may be found, return `None`.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    Either a string or `None`.
    """
    from meerschaum.utils.debug import dprint
    if debug:
        dprint('Attempting to determine the value column...')
    try:
        val_name = self.get_columns('value')
    except Exception as e:
        val_name = None
    if val_name is not None:
        if debug:
            dprint(f"Value column: {val_name}")
        return val_name

    cols = self.columns
    if cols is None:
        if debug:
            dprint('No columns could be determined. Returning...')
        return None
    try:
        dt_name = self.get_columns('datetime', error=False)
    except Exception as e:
        dt_name = None
    try:
        ### NOTE: The keyword is `error`; a misspelled keyword would raise a `TypeError`
        ### and the ID column would never be excluded from the candidates.
        id_name = self.get_columns('id', error=False)
    except Exception as e:
        id_name = None

    if debug:
        dprint(f"dt_name: {dt_name}")
        dprint(f"id_name: {id_name}")

    cols_types = self.get_columns_types(debug=debug)
    if cols_types is None:
        return None
    if debug:
        dprint(f"cols_types: {cols_types}")
    if dt_name is not None:
        cols_types.pop(dt_name, None)
    if id_name is not None:
        cols_types.pop(id_name, None)

    candidates = []
    candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric',}
    for search_term in candidate_keywords:
        for col, typ in cols_types.items():
            if search_term in typ.lower():
                candidates.append(col)
                break
    if not candidates:
        if debug:
            dprint("No value column could be determined.")
        return None

    return candidates[0]

Return the name of the value column if it's defined, otherwise make an educated guess. If not set in the columns dictionary, return the first numeric column that is not an ID or datetime column. If none may be found, return None.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • Either a string or None.
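The "educated guess" can be sketched on a plain dictionary of column types. `guess_value_column` is a hypothetical helper, not the library's code; it assumes the datetime and ID column names are already known and walks the columns in insertion order so the result is deterministic.

```python
from typing import Dict, Optional

# Substrings that suggest a numeric database type.
NUMERIC_KEYWORDS = ('float', 'double', 'precision', 'int', 'numeric')


def guess_value_column(
    cols_types: Dict[str, str],
    dt_name: Optional[str] = None,
    id_name: Optional[str] = None,
) -> Optional[str]:
    """Hypothetical helper: return the first numeric column that is not an index."""
    for col, typ in cols_types.items():
        if col in (dt_name, id_name):
            continue
        if any(keyword in typ.lower() for keyword in NUMERIC_KEYWORDS):
            return col
    return None


cols_types = {
    'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    'id': 'BIGINT',
    'val': 'DOUBLE PRECISION',
}
print(guess_value_column(cols_types, dt_name='dt', id_name='id'))  # val
```

Without the `id_name` exclusion, `'BIGINT'` would match the `'int'` keyword and the ID column would be returned instead.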
parents: List[Pipe]
@property
def parents(self) -> List[meerschaum.Pipe]:
    """
    Return a list of `meerschaum.Pipe` objects to be designated as parents.
    """
    if 'parents' not in self.parameters:
        return []
    from meerschaum.utils.warnings import warn
    _parents_keys = self.parameters['parents']
    if not isinstance(_parents_keys, list):
        warn(
            f"Please ensure the parents for {self} are defined as a list of keys.",
            stacklevel = 4
        )
        return []
    from meerschaum import Pipe
    _parents = []
    for keys in _parents_keys:
        try:
            p = Pipe(**keys)
        except Exception as e:
            warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}")
            continue
        _parents.append(p)
    return _parents

Return a list of Pipe objects to be designated as parents.

children: List[Pipe]
@property
def children(self) -> List[meerschaum.Pipe]:
    """
    Return a list of `meerschaum.Pipe` objects to be designated as children.
    """
    if 'children' not in self.parameters:
        return []
    from meerschaum.utils.warnings import warn
    _children_keys = self.parameters['children']
    if not isinstance(_children_keys, list):
        warn(
            f"Please ensure the children for {self} are defined as a list of keys.",
            stacklevel = 4
        )
        return []
    from meerschaum import Pipe
    _children = []
    for keys in _children_keys:
        try:
            p = Pipe(**keys)
        except Exception as e:
            warn(f"Unable to build child with keys '{keys}' for {self}:\n{e}")
            continue
        _children.append(p)
    return _children

Return a list of Pipe objects to be designated as children.

target: str
@property
def target(self) -> str:
    """
    The target table name.
    You can set the target name under one of the following keys
    (checked in this order):
      - `target`
      - `target_name`
      - `target_table`
      - `target_table_name`
    """
    if 'target' not in self.parameters:
        target = self._target_legacy()
        potential_keys = ('target_name', 'target_table', 'target_table_name')
        for k in potential_keys:
            if k in self.parameters:
                target = self.parameters[k]
                break

        if self.instance_connector.type == 'sql':
            from meerschaum.utils.sql import truncate_item_name
            truncated_target = truncate_item_name(target, self.instance_connector.flavor)
            if truncated_target != target:
                warn(
                    f"The target '{target}' is too long for '{self.instance_connector.flavor}', "
                    + f"will use {truncated_target} instead."
                )
                target = truncated_target

        self.target = target
    return self.parameters['target']

The target table name. You can set the target name under one of the following keys (checked in this order):

  • target
  • target_name
  • target_table
  • target_table_name
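The lookup order listed above can be sketched as a small function. `resolve_target` is a hypothetical helper: it takes the fallback name as an argument and leaves out the truncation that the property applies for SQL instance connectors.

```python
from typing import Any, Dict

# Keys are checked in this order; the first one present wins.
TARGET_KEYS = ('target', 'target_name', 'target_table', 'target_table_name')


def resolve_target(parameters: Dict[str, Any], default: str) -> str:
    """Hypothetical helper: return the first configured target name, else `default`."""
    for key in TARGET_KEYS:
        if key in parameters:
            return parameters[key]
    return default


print(resolve_target({'target_table': 'weather_raw'}, 'sql_main_weather'))
# weather_raw
print(resolve_target({}, 'sql_main_weather'))
# sql_main_weather
```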
def guess_datetime(self) -> Optional[str]:
def guess_datetime(self) -> Union[str, None]:
    """
    Try to determine a pipe's datetime column.
    """
    dtypes = self.dtypes

    ### Abort if the user explicitly disallows a datetime index.
    if 'datetime' in dtypes:
        if dtypes['datetime'] is None:
            return None

    dt_cols = [
        col for col, typ in self.dtypes.items()
        if str(typ).startswith('datetime')
    ]
    if not dt_cols:
        return None
    return dt_cols[0]

Try to determine a pipe's datetime column.
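A rough stand-alone version of the guess, operating on a plain `dtypes` dictionary, might look like the following. `guess_datetime_column` is a hypothetical name and the dtype strings are only examples.

```python
from typing import Any, Dict, Optional


def guess_datetime_column(dtypes: Dict[str, Any]) -> Optional[str]:
    """Hypothetical helper: return the first column whose dtype starts with 'datetime'."""
    # An explicit `'datetime': None` entry opts out of a datetime index.
    if 'datetime' in dtypes and dtypes['datetime'] is None:
        return None
    for col, typ in dtypes.items():
        if str(typ).startswith('datetime'):
            return col
    return None


print(guess_datetime_column({'id': 'int64', 'ts': 'datetime64[ns, UTC]'}))  # ts
print(guess_datetime_column({'datetime': None, 'ts': 'datetime64[ns]'}))    # None
```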

def show( self, nopretty: bool = False, debug: bool = False, **kw) -> Tuple[bool, str]:
def show(
        self,
        nopretty: bool = False,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Show attributes of a Pipe.

    Parameters
    ----------
    nopretty: bool, default False
        If `True`, simply print the JSON of the pipe's attributes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    import json
    from meerschaum.utils.formatting import (
        pprint, make_header, ANSI, highlight_pipes, fill_ansi, get_console,
    )
    from meerschaum.utils.packages import import_rich, attempt_import
    from meerschaum.utils.warnings import info
    attributes_json = json.dumps(self.attributes)
    if not nopretty:
        _to_print = f"Attributes for {self}:"
        if ANSI:
            _to_print = fill_ansi(highlight_pipes(make_header(_to_print)), 'magenta')
            print(_to_print)
            rich = import_rich()
            rich_json = attempt_import('rich.json')
            get_console().print(rich_json.JSON(attributes_json))
        else:
            print(_to_print)
    else:
        print(attributes_json)

    return True, "Success"

Show attributes of a Pipe.

Parameters
  • nopretty (bool, default False): If True, simply print the JSON of the pipe's attributes.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success, message.
def edit( self, patch: bool = False, interactive: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
def edit(
    self,
    patch: bool = False,
    interactive: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit a Pipe's configuration.

    Parameters
    ----------
    patch: bool, default False
        If `patch` is True, update parameters by cascading rather than overwriting.
    interactive: bool, default False
        If `True`, open an editor for the user to make changes to the pipe's YAML file.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if self.temporary:
        return False, "Cannot edit pipes created with `temporary=True` (read-only)."

    if not interactive:
        with Venv(get_connector_plugin(self.instance_connector)):
            return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)

    from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
    from meerschaum.utils.misc import edit_file
    parameters_filename = str(self) + '.yaml'
    parameters_path = PIPES_CACHE_RESOURCES_PATH / parameters_filename

    from meerschaum.utils.yaml import yaml

    edit_text = f"Edit the parameters for {self}"
    edit_top = '#' * (len(edit_text) + 4)
    edit_header = edit_top + f'\n# {edit_text} #\n' + edit_top + '\n\n'

    from meerschaum.config import get_config
    parameters = dict(get_config('pipes', 'parameters', patch=True))
    from meerschaum.config._patch import apply_patch_to_config
    parameters = apply_patch_to_config(parameters, self.parameters)

    ### write parameters to yaml file
    with open(parameters_path, 'w+') as f:
        f.write(edit_header)
        yaml.dump(parameters, stream=f, sort_keys=False)

    ### only quit editing if yaml is valid
    editing = True
    while editing:
        edit_file(parameters_path)
        try:
            with open(parameters_path, 'r') as f:
                file_parameters = yaml.load(f.read())
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(f"Invalid format defined for '{self}':\n\n{e}")
            input(f"Press [Enter] to correct the configuration for '{self}': ")
        else:
            editing = False

    self.parameters = file_parameters

    if debug:
        from meerschaum.utils.formatting import pprint
        pprint(self.parameters)

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)

Edit a Pipe's configuration.

Parameters
  • patch (bool, default False): If patch is True, update parameters by cascading rather than overwriting.
  • interactive (bool, default False): If True, open an editor for the user to make changes to the pipe's YAML file.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success, message.
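The difference between cascading (`patch=True`) and overwriting can be pictured with plain dictionaries. The recursive merge below is an assumption about what "cascading" means here: `patch_dict` is a hypothetical stand-in written for this illustration, not the library's `apply_patch_to_config`, whose exact semantics may differ.

```python
import copy
from typing import Any, Dict


def patch_dict(base: Dict[str, Any], patch: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: merge `patch` into a copy of `base`, recursing into dicts."""
    result = copy.deepcopy(base)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(result.get(key), dict):
            # Nested dictionaries are merged key by key.
            result[key] = patch_dict(result[key], value)
        else:
            # Scalars and lists replace the existing value.
            result[key] = copy.deepcopy(value)
    return result


existing = {'columns': {'datetime': 'ts', 'id': 'id'}, 'tags': ['production']}
changes = {'columns': {'id': 'station'}}

patched = patch_dict(existing, changes)  # cascading: untouched keys survive
overwritten = changes                    # overwriting: only the new keys remain

print(patched)
# {'columns': {'datetime': 'ts', 'id': 'station'}, 'tags': ['production']}
print(overwritten)
# {'columns': {'id': 'station'}}
```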
def edit_definition( self, yes: bool = False, noask: bool = False, force: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
def edit_definition(
    self,
    yes: bool = False,
    noask: bool = False,
    force: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit a pipe's definition file and update its configuration.
    **NOTE:** This function is interactive and should not be used in automated scripts!

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    if self.temporary:
        return False, "Cannot edit pipes created with `temporary=True` (read-only)."

    from meerschaum.connectors import instance_types
    if (self.connector is None) or self.connector.type not in instance_types:
        return self.edit(interactive=True, debug=debug, **kw)

    import json
    from meerschaum.utils.warnings import info, warn
    from meerschaum.utils.debug import dprint
    from meerschaum.config._patch import apply_patch_to_config
    from meerschaum.utils.misc import edit_file

    _parameters = self.parameters
    if 'fetch' not in _parameters:
        _parameters['fetch'] = {}

    def _edit_api():
        from meerschaum.utils.prompt import prompt, yes_no
        info(
            f"Please enter the keys of the source pipe from '{self.connector}'.\n" +
            "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip."
        )

        _keys = { 'connector_keys' : None, 'metric_key' : None, 'location_key' : None }
        for k in _keys:
            _keys[k] = _parameters['fetch'].get(k, None)

        for k, v in _keys.items():
            try:
                _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v)
            except KeyboardInterrupt:
                continue
            if _keys[k] in ('', 'None', '\'None\'', '[None]'):
                _keys[k] = None

        _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys)

        info("You may optionally specify additional filter parameters as JSON.")
        print("  Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.")
        print("  For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':")
        print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': ')))
        if force or yes_no(
            "Would you like to add additional filter parameters?",
            yes=yes, noask=noask
        ):
            from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
            definition_filename = str(self) + '.json'
            definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
            try:
                definition_path.touch()
                with open(definition_path, 'w+') as f:
                    json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2)
            except Exception as e:
                return False, f"Failed writing file '{definition_path}':\n" + str(e)

            _params = None
            while True:
                edit_file(definition_path)
                try:
                    with open(definition_path, 'r') as f:
                        _params = json.load(f)
                except Exception as e:
                    warn(f'Failed to read parameters JSON:\n{e}', stack=False)
                    if force or yes_no(
                        "Would you like to try again?\n  "
                        + "If not, the parameters JSON file will be ignored.",
                        noask=noask, yes=yes
                    ):
                        continue
                    _params = None
                break
            if _params is not None:
                if 'fetch' not in _parameters:
                    _parameters['fetch'] = {}
                _parameters['fetch']['params'] = _params

        self.parameters = _parameters
        return True, "Success"

    def _edit_sql():
        import pathlib, os, textwrap
        from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
        from meerschaum.utils.misc import edit_file
        definition_filename = str(self) + '.sql'
        definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename

        sql_definition = _parameters['fetch'].get('definition', None)
        if sql_definition is None:
            sql_definition = ''
        sql_definition = textwrap.dedent(sql_definition).lstrip()

        try:
            definition_path.touch()
            with open(definition_path, 'w+') as f:
                f.write(sql_definition)
        except Exception as e:
            return False, f"Failed writing file '{definition_path}':\n" + str(e)

        edit_file(definition_path)
        try:
            with open(definition_path, 'r') as f:
                file_definition = f.read()
        except Exception as e:
            return False, f"Failed reading file '{definition_path}':\n" + str(e)

        if sql_definition == file_definition:
            return False, f"No changes made to definition for {self}."

        if ' ' not in file_definition:
            return False, f"Invalid SQL definition for {self}."

        if debug:
            dprint("Read SQL definition:\n\n" + file_definition)
        _parameters['fetch']['definition'] = file_definition
        self.parameters = _parameters
        return True, "Success"

    locals()['_edit_' + str(self.connector.type)]()
    return self.edit(interactive=False, debug=debug, **kw)

Edit a pipe's definition file and update its configuration. NOTE: This function is interactive and should not be used in automated scripts!

Returns
  • A SuccessTuple of success, message.
def update(self, *args, **kw) -> Tuple[bool, str]:
def update(self, *args, **kw) -> SuccessTuple:
    """
    Update a pipe's parameters in its instance.
    """
    kw['interactive'] = False
    return self.edit(*args, **kw)

Update a pipe's parameters in its instance.

def sync( self, df: Union[pandas.core.frame.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], meerschaum.core.Pipe._sync.InferFetch] = <class 'meerschaum.core.Pipe._sync.InferFetch'>, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, NoneType] = None, force: bool = False, retries: int = 10, min_seconds: int = 1, check_existing: bool = True, blocking: bool = True, workers: Optional[int] = None, callback: Optional[Callable[[Tuple[bool, str]], Any]] = None, error_callback: Optional[Callable[[Exception], Any]] = None, chunksize: Optional[int] = -1, sync_chunks: bool = True, debug: bool = False, _inplace: bool = True, **kw: Any) -> Tuple[bool, str]:
def sync(
    self,
    df: Union[
        pd.DataFrame,
        Dict[str, List[Any]],
        List[Dict[str, Any]],
        InferFetch
    ] = InferFetch,
    begin: Union[datetime, int, str, None] = '',
    end: Union[datetime, int, None] = None,
    force: bool = False,
    retries: int = 10,
    min_seconds: int = 1,
    check_existing: bool = True,
    blocking: bool = True,
    workers: Optional[int] = None,
    callback: Optional[Callable[[Tuple[bool, str]], Any]] = None,
    error_callback: Optional[Callable[[Exception], Any]] = None,
    chunksize: Optional[int] = -1,
    sync_chunks: bool = True,
    debug: bool = False,
    _inplace: bool = True,
    **kw: Any
) -> SuccessTuple:
    """
    Fetch new data from the source and update the pipe's table with new data.

    Get new remote data via fetch, get existing data in the same time period,
    and merge the two, only keeping the unseen data.

    Parameters
    ----------
    df: Union[pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch], default InferFetch
        An optional DataFrame to sync into the pipe.
        If omitted, data are fetched from the pipe's connector.
        Passing `None` explicitly returns a failure.

    begin: Union[datetime, int, str, None], default ''
        Optionally specify the earliest datetime to search for data.

    end: Union[datetime, int, str, None], default None
        Optionally specify the latest datetime to search for data.

    force: bool, default False
        If `True`, keep retrying a failed sync for up to `retries` attempts.

    retries: int, default 10
        If `force`, how many attempts to try syncing before declaring failure.

    min_seconds: Union[int, float], default 1
        If `force`, how many seconds to sleep between retries. Defaults to `1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe.

    blocking: bool, default True
        If `True`, wait for sync to finish and return its result; otherwise
        sync asynchronously in a background thread and return success immediately.
        Defaults to `True`. Only intended for specific scenarios.

    workers: Optional[int], default None
        If provided and the instance connector is thread-safe
        (`pipe.instance_connector.IS_THREAD_SAFE is True`),
        limit concurrent sync to this many threads.

    callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
        Callback function which expects a SuccessTuple as input.
        Only applies when `blocking=False`.

    error_callback: Optional[Callable[[Exception], Any]], default None
        Callback function which expects an Exception as input.
        Only applies when `blocking=False`.

    chunksize: Optional[int], default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` (or `0`) will sync all rows in one transaction.

    sync_chunks: bool, default True
        If possible, sync chunks while fetching them into memory.

    debug: bool, default False
        Verbosity toggle. Defaults to False.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    from meerschaum.utils.debug import dprint, _checkpoint
    from meerschaum.connectors import custom_types
    from meerschaum.plugins import Plugin
    from meerschaum.utils.formatting import get_console
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.misc import df_is_chunk_generator, filter_keywords, filter_arguments
    from meerschaum.utils.pool import get_pool
    from meerschaum.config import get_config

    if (callback is not None or error_callback is not None) and blocking:
        warn("Callback functions are only executed when blocking = False. Ignoring...")

    _checkpoint(_total=2, **kw)

    if chunksize == 0:
        chunksize = None
        sync_chunks = False

    kw.update({
        'begin': begin,
        'end': end,
        'force': force,
        'retries': retries,
        'min_seconds': min_seconds,
        'check_existing': check_existing,
        'blocking': blocking,
        'workers': workers,
        'callback': callback,
        'error_callback': error_callback,
        'sync_chunks': sync_chunks,
        'chunksize': chunksize,
    })

    ### NOTE: Invalidate `_exists` cache before and after syncing.
    self._exists = None

    def _sync(
        p: 'meerschaum.Pipe',
        df: Union[
            'pd.DataFrame',
            Dict[str, List[Any]],
            List[Dict[str, Any]],
            InferFetch
        ] = InferFetch,
    ) -> SuccessTuple:
        if df is None:
            p._exists = None
            return (
                False,
                f"You passed `None` instead of data into `sync()` for {p}.\n"
                + "Omit the DataFrame to infer fetching.",
            )
        ### Ensure that Pipe is registered.
        if not p.temporary and p.get_id(debug=debug) is None:
            ### NOTE: This may trigger an interactive session for plugins!
            register_success, register_msg = p.register(debug=debug)
            if not register_success:
                if 'already' not in register_msg:
                    p._exists = None
                    return register_success, register_msg

        ### If connector is a plugin with a `sync()` method, return that instead.
        ### If the plugin does not have a `sync()` method but does have a `fetch()` method,
        ### use that instead.
        ### NOTE: The DataFrame must be omitted for the plugin sync method to apply.
        ### If a DataFrame is provided, continue as expected.
        if hasattr(df, 'MRSM_INFER_FETCH'):
            try:
                if p.connector is None:
                    if ':' not in p.connector_keys:
                        return True, f"{p} does not support fetching; nothing to do."

                    msg = f"{p} does not have a valid connector."
                    if p.connector_keys.startswith('plugin:'):
                        msg += f"\n    Perhaps {p.connector_keys} has a syntax error?"
                    p._exists = None
                    return False, msg
            except Exception:
                p._exists = None
                return False, f"Unable to create the connector for {p}."

            ### Sync in place if this is a SQL pipe.
            if (
                str(self.connector) == str(self.instance_connector)
                and
                hasattr(self.instance_connector, 'sync_pipe_inplace')
                and
                _inplace
                and
                get_config('system', 'experimental', 'inplace_sync')
            ):
                with Venv(get_connector_plugin(self.instance_connector)):
                    p._exists = None
                    _args, _kwargs = filter_arguments(
                        p.instance_connector.sync_pipe_inplace,
                        p,
                        debug=debug,
                        **kw
                    )
                    return self.instance_connector.sync_pipe_inplace(
                        *_args,
                        **_kwargs
                    )

            ### Activate and invoke `sync(pipe)` for plugin connectors with `sync` methods.
            try:
                if getattr(p.connector, 'sync', None) is not None:
                    with Venv(get_connector_plugin(p.connector), debug=debug):
                        _args, _kwargs = filter_arguments(
                            p.connector.sync,
                            p,
                            debug=debug,
                            **kw
                        )
                        return_tuple = p.connector.sync(*_args, **_kwargs)
                    p._exists = None
                    if not isinstance(return_tuple, tuple):
                        return_tuple = (
                            False,
                            f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}"
                        )
                    return return_tuple

            except Exception as e:
                get_console().print_exception()
                msg = f"Failed to sync {p} with exception: '" + str(e) + "'"
                if debug:
                    error(msg, silent=False)
                p._exists = None
                return False, msg

            ### Fetch the dataframe from the connector's `fetch()` method.
            try:
                with Venv(get_connector_plugin(p.connector), debug=debug):
                    df = p.fetch(
                        **filter_keywords(
                            p.fetch,
                            debug=debug,
                            **kw
                        )
                    )
            except Exception as e:
                get_console().print_exception(
                    suppress=[
                        'meerschaum/core/Pipe/_sync.py',
                        'meerschaum/core/Pipe/_fetch.py',
                    ]
                )
                msg = f"Failed to fetch data from {p.connector}:\n    {e}"
                df = None

            if df is None:
                p._exists = None
                return False, f"No data were fetched for {p}."

            if isinstance(df, list):
                if len(df) == 0:
                    return True, f"No new rows were returned for {p}."

                ### May be a chunk hook results list.
                if isinstance(df[0], tuple):
                    success = all([_success for _success, _ in df])
                    message = '\n'.join([_message for _, _message in df])
                    return success, message

            ### TODO: Deprecate async?
            if df is True:
                p._exists = None
                return True, f"{p} is being synced in parallel."

        ### CHECKPOINT: Retrieved the DataFrame.
        _checkpoint(**kw)

        ### Allow for dataframe generators or iterables.
        if df_is_chunk_generator(df):
            kw['workers'] = p.get_num_workers(kw.get('workers', None))
            dt_col = p.columns.get('datetime', None)
            pool = get_pool(workers=kw.get('workers', 1))
            if debug:
                dprint(f"Received {type(df)}. Attempting to sync first chunk...")

            try:
                chunk = next(df)
            except StopIteration:
                return True, "Received an empty generator; nothing to do."

            chunk_success, chunk_msg = _sync(p, chunk)
            chunk_msg = '\n' + self._get_chunk_label(chunk, dt_col) + '\n' + chunk_msg
            if not chunk_success:
                return chunk_success, f"Unable to sync initial chunk for {p}:\n{chunk_msg}"
            if debug:
                dprint("Successfully synced the first chunk, attemping the rest...")

            failed_chunks = []
            def _process_chunk(_chunk):
                try:
                    _chunk_success, _chunk_msg = _sync(p, _chunk)
                except Exception as e:
                    _chunk_success, _chunk_msg = False, str(e)
                if not _chunk_success:
                    failed_chunks.append(_chunk)
                return (
                    _chunk_success,
                    (
                        '\n'
                        + self._get_chunk_label(_chunk, dt_col)
                        + '\n'
                        + _chunk_msg
                    )
                )

            results = sorted(
                [(chunk_success, chunk_msg)] + (
                    list(pool.imap(_process_chunk, df))
                    if not df_is_chunk_generator(chunk)
                    else [
                        _process_chunk(_child_chunks)
                        for _child_chunks in df
                    ]
                )
            )
            chunk_messages = [chunk_msg for _, chunk_msg in results]
            success_bools = [chunk_success for chunk_success, _ in results]
            success = all(success_bools)
            msg = '\n'.join(chunk_messages)

            ### If some chunks succeeded, retry the failures.
            retry_success = True
            if not success and any(success_bools):
                if debug:
                    dprint("Retrying failed chunks...")
                chunks_to_retry = [c for c in failed_chunks]
                failed_chunks = []
                for chunk in chunks_to_retry:
                    chunk_success, chunk_msg = _process_chunk(chunk)
                    msg += f"\n\nRetried chunk:\n{chunk_msg}\n"
                    retry_success = retry_success and chunk_success

            success = success and retry_success
            return success, msg

        ### Cast to a dataframe and ensure datatypes are what we expect.
        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)

        ### Capture `numeric` and `json` columns.
        self._persist_new_json_columns(df, debug=debug)
        self._persist_new_numeric_columns(df, debug=debug)

        if debug:
            dprint(
                "DataFrame to sync:\n"
                + (
                    str(df)[:255]
                    + '...'
                    if len(str(df)) >= 256
                    else str(df)
                ),
                **kw
            )

        ### If `force`, continue to sync until success.
        return_tuple = False, f"Did not sync {p}."
        run = True
        _retries = 1
        while run:
            with Venv(get_connector_plugin(self.instance_connector)):
                return_tuple = p.instance_connector.sync_pipe(
                    pipe=p,
                    df=df,
                    debug=debug,
                    **kw
                )
            _retries += 1
            run = (not return_tuple[0]) and force and _retries <= retries
            if run and debug:
                dprint(f"Syncing failed for {p}. Attempt ( {_retries} / {retries} )", **kw)
                dprint(f"Sleeping for {min_seconds} seconds...", **kw)
                time.sleep(min_seconds)
            if _retries > retries:
                warn(
                    f"Unable to sync {p} within {retries} attempt" +
                        ("s" if retries != 1 else "") + "!"
                )

        ### CHECKPOINT: Finished syncing. Handle caching.
        _checkpoint(**kw)
        if self.cache_pipe is not None:
            if debug:
                dprint("Caching retrieved dataframe.", **kw)
                _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw)
                if not _sync_cache_tuple[0]:
                    warn(f"Failed to sync local cache for {self}.")

        self._exists = None
        return return_tuple

    if blocking:
        self._exists = None
        return _sync(self, df = df)

    from meerschaum.utils.threading import Thread
    def default_callback(result_tuple: SuccessTuple):
        dprint(f"Asynchronous result from {self}: {result_tuple}", **kw)

    def default_error_callback(x: Exception):
        dprint(f"Error received for {self}: {x}", **kw)

    if callback is None and debug:
        callback = default_callback
    if error_callback is None and debug:
        error_callback = default_error_callback
    try:
        thread = Thread(
            target=_sync,
            args=(self,),
            kwargs={'df': df},
            daemon=False,
            callback=callback,
            error_callback=error_callback,
        )
        thread.start()
    except Exception as e:
        self._exists = None
        return False, str(e)

    self._exists = None
    return True, f"Spawned asyncronous sync for {self}."

Fetch new data from the source and update the pipe's table with new data.

Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.

Parameters
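  • df (Union[pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch], default InferFetch): An optional DataFrame to sync into the pipe. If omitted, data are fetched from the pipe's connector. Passing None explicitly returns a failure.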
  • begin (Union[datetime, int, str, None], default ''): Optionally specify the earliest datetime to search for data.
  • end (Union[datetime, int, str, None], default None): Optionally specify the latest datetime to search for data.
  • force (bool, default False): If True, keep retrying a failed sync for up to retries attempts.
  • retries (int, default 10): If force, how many attempts to try syncing before declaring failure.
  • min_seconds (Union[int, float], default 1): If force, how many seconds to sleep between retries. Defaults to 1.
  • check_existing (bool, default True): If True, pull and diff with existing data from the pipe.
  • blocking (bool, default True): If True, wait for sync to finish and return its result; otherwise sync asynchronously in a background thread and return success immediately. Defaults to True. Only intended for specific scenarios.
  • workers (Optional[int], default None): If provided and the instance connector is thread-safe (pipe.instance_connector.IS_THREAD_SAFE is True), limit concurrent sync to this many threads.
  • callback (Optional[Callable[[Tuple[bool, str]], Any]], default None): Callback function which expects a SuccessTuple as input. Only applies when blocking=False.
  • error_callback (Optional[Callable[[Exception], Any]], default None): Callback function which expects an Exception as input. Only applies when blocking=False.
  • chunksize (Optional[int], default -1): Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None (or 0) will sync all rows in one transaction.
  • sync_chunks (bool, default True): If possible, sync chunks while fetching them into memory.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
Returns
  • A SuccessTuple of success (bool) and message (str).
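
The interplay of force, retries, and min_seconds can be illustrated with a small standalone sketch. It does not call Meerschaum: sync_with_retries and flaky_attempt are hypothetical names invented for this example, and the loop is a simplified stand-in for the retry logic in the source above, not the library's implementation.

```python
import time
from typing import Callable, Tuple

SuccessTuple = Tuple[bool, str]


def sync_with_retries(
    attempt: Callable[[], SuccessTuple],
    force: bool = False,
    retries: int = 10,
    min_seconds: float = 1,
) -> SuccessTuple:
    """Call `attempt` once, or up to `retries` times when `force` is set."""
    attempts_made = 0
    while True:
        result = attempt()
        attempts_made += 1
        # Stop on success, when retrying was not requested,
        # or when the attempt budget has been spent.
        if result[0] or not force or attempts_made >= retries:
            return result
        time.sleep(min_seconds)


# Hypothetical sync attempt which fails twice and then succeeds.
calls = []


def flaky_attempt() -> SuccessTuple:
    calls.append(1)
    if len(calls) < 3:
        return False, f"attempt {len(calls)} failed"
    return True, f"succeeded on attempt {len(calls)}"


result = sync_with_retries(flaky_attempt, force=True, retries=5, min_seconds=0)
print(result)
# (True, 'succeeded on attempt 3')
```

Without force=True, the same helper would return the first failure immediately, which mirrors the documented default of a single attempt.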
def get_sync_time( self, params: Optional[Dict[str, Any]] = None, newest: bool = True, apply_backtrack_interval: bool = False, round_down: bool = False, debug: bool = False) -> Optional[datetime.datetime]:
def get_sync_time(
    self,
    params: Optional[Dict[str, Any]] = None,
    newest: bool = True,
    apply_backtrack_interval: bool = False,
    round_down: bool = False,
    debug: bool = False
) -> Union['datetime', None]:
    """
    Get the most recent datetime value for a Pipe.

    Parameters
    ----------
    params: Optional[Dict[str, Any]], default None
        Dictionary to build a WHERE clause for a specific column.
        See `meerschaum.utils.sql.build_where`.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (`ASC` instead of `DESC`).

    apply_backtrack_interval: bool, default False
        If `True`, subtract the backtrack interval from the sync time.

    round_down: bool, default False
        If `True`, round down the datetime value to the nearest minute.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `datetime` object if the pipe exists, otherwise `None`.

    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.misc import round_time

    with Venv(get_connector_plugin(self.instance_connector)):
        sync_time = self.instance_connector.get_sync_time(
            self,
            params=params,
            newest=newest,
            debug=debug,
        )

    if round_down and isinstance(sync_time, datetime):
        sync_time = round_time(sync_time, timedelta(minutes=1))

    if apply_backtrack_interval and sync_time is not None:
        backtrack_interval = self.get_backtrack_interval(debug=debug)
        try:
            sync_time -= backtrack_interval
        except Exception as e:
            warn(f"Failed to apply backtrack interval:\n{e}")

    return sync_time

Get the most recent datetime value for a Pipe.

Parameters
  • params (Optional[Dict[str, Any]], default None): Dictionary to build a WHERE clause for a specific column. See meerschaum.utils.sql.build_where.
  • newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
  • apply_backtrack_interval (bool, default False): If True, subtract the backtrack interval from the sync time.
  • round_down (bool, default False): If True, round down the datetime value to the nearest minute.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A datetime object if the pipe exists, otherwise None.
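
As a rough illustration of how round_down and apply_backtrack_interval combine, the sketch below reproduces the order of operations from the source above (round first, then subtract) using only the standard library. The helper adjust_sync_time is hypothetical, the 10-minute interval is an arbitrary example value, and truncating seconds is an assumed stand-in for round_time with a one-minute delta.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def adjust_sync_time(
    sync_time: Optional[datetime],
    backtrack_interval: timedelta,
    round_down: bool = False,
    apply_backtrack_interval: bool = False,
) -> Optional[datetime]:
    """Round down to the minute, then subtract the backtrack interval."""
    if sync_time is None:
        # No sync time is available (e.g. the pipe does not exist yet).
        return None
    if round_down:
        # Assumption: rounding down to the minute equals truncating seconds.
        sync_time = sync_time.replace(second=0, microsecond=0)
    if apply_backtrack_interval:
        sync_time = sync_time - backtrack_interval
    return sync_time


latest = datetime(2024, 1, 1, 12, 34, 56, tzinfo=timezone.utc)
adjusted = adjust_sync_time(
    latest,
    timedelta(minutes=10),
    round_down=True,
    apply_backtrack_interval=True,
)
print(adjusted)
# 2024-01-01 12:24:00+00:00
```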
def exists(self, debug: bool = False) -> bool:
def exists(
        self,
        debug : bool = False
    ) -> bool:
    """
    See if a Pipe's table exists.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `bool` corresponding to whether a pipe's underlying table exists.

    """
    import time
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.config import STATIC_CONFIG
    from meerschaum.utils.debug import dprint
    now = time.perf_counter()
    exists_timeout_seconds = STATIC_CONFIG['pipes']['exists_timeout_seconds']

    _exists = self.__dict__.get('_exists', None)
    if _exists:
        exists_timestamp = self.__dict__.get('_exists_timestamp', None)
        if exists_timestamp is not None:
            delta = now - exists_timestamp
            if delta < exists_timeout_seconds:
                if debug:
                    dprint(f"Returning cached `exists` for {self} ({round(delta, 2)} seconds old).")
                return _exists

    with Venv(get_connector_plugin(self.instance_connector)):
        _exists = self.instance_connector.pipe_exists(pipe=self, debug=debug)

    self.__dict__['_exists'] = _exists
    self.__dict__['_exists_timestamp'] = now
    return _exists

See if a Pipe's table exists.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool corresponding to whether a pipe's underlying table exists.
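
The source above caches a positive result for a short time and always re-checks a negative one. A minimal standalone sketch of that pattern follows; ExistsCache, its check callable, and the 60-second timeout are all hypothetical and chosen for illustration (the real timeout comes from STATIC_CONFIG and is not shown here).

```python
import time
from typing import Callable, Optional


class ExistsCache:
    """Cache a truthy `exists` result for a limited number of seconds."""

    def __init__(self, check: Callable[[], bool], timeout_seconds: float = 60.0):
        self._check = check
        self._timeout_seconds = timeout_seconds
        self._exists: Optional[bool] = None
        self._timestamp: Optional[float] = None

    def exists(self) -> bool:
        now = time.perf_counter()
        # Only a positive answer is reused; a negative or missing one
        # always triggers a fresh lookup.
        if self._exists and self._timestamp is not None:
            if now - self._timestamp < self._timeout_seconds:
                return self._exists
        self._exists = self._check()
        self._timestamp = now
        return self._exists

    def invalidate(self) -> None:
        """Forget the cached value, as `sync()` does before and after syncing."""
        self._exists = None


# Hypothetical lookup which records how often it is called.
lookups = []


def table_exists() -> bool:
    lookups.append(1)
    return True


cache = ExistsCache(table_exists)
cache.exists()
cache.exists()       # Served from the cache: no second lookup.
cache.invalidate()
cache.exists()       # The cache was cleared, so the lookup runs again.
print(len(lookups))
# 2
```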
def filter_existing( self, df: pandas.core.frame.DataFrame, safe_copy: bool = True, date_bound_only: bool = False, include_unchanged_columns: bool = False, chunksize: Optional[int] = -1, debug: bool = False, **kw) -> Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]:
def filter_existing(
    self,
    df: 'pd.DataFrame',
    safe_copy: bool = True,
    date_bound_only: bool = False,
    include_unchanged_columns: bool = False,
    chunksize: Optional[int] = -1,
    debug: bool = False,
    **kw
) -> Tuple['pd.DataFrame', 'pd.DataFrame', 'pd.DataFrame']:
    """
    Inspect a dataframe and filter out rows which already exist in the pipe.

    Parameters
    ----------
    df: 'pd.DataFrame'
        The dataframe to inspect and filter.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    date_bound_only: bool, default False
        If `True`, only use the datetime index to fetch the sample dataframe.

    include_unchanged_columns: bool, default False
        If `True`, include the backtrack columns which haven't changed in the update dataframe.
        This is useful if you can't update individual keys.

    chunksize: Optional[int], default -1
        The `chunksize` used when fetching existing data.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of three pandas DataFrames: unseen, update, and delta.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.misc import round_time
    from meerschaum.utils.dataframe import (
        filter_unseen_df,
        add_missing_cols_to_df,
        get_unhashable_cols,
        get_numeric_cols,
    )
    from meerschaum.utils.dtypes import (
        to_pandas_dtype,
        none_if_null,
    )
    from meerschaum.config import get_config
    pd = import_pandas()
    pandas = attempt_import('pandas')
    if 'dataframe' not in str(type(df)).lower():
        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)
    is_dask = 'dask' in df.__module__
    if is_dask:
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA
    else:
        merge = pd.merge
        NA = pd.NA

    def get_empty_df():
        empty_df = pd.DataFrame([])
        dtypes = dict(df.dtypes) if df is not None else {}
        dtypes.update(self.dtypes)
        pd_dtypes = {
            col: to_pandas_dtype(str(typ))
            for col, typ in dtypes.items()
        }
        return add_missing_cols_to_df(empty_df, pd_dtypes)

    if df is None:
        empty_df = get_empty_df()
        return empty_df, empty_df, empty_df

    if (df.empty if not is_dask else len(df) == 0):
        return df, df, df

    ### begin is the oldest data in the new dataframe
    begin, end = None, None
    dt_col = self.columns.get('datetime', None)
    dt_type = self.dtypes.get(dt_col, 'datetime64[ns]') if dt_col else None
    try:
        min_dt_val = df[dt_col].min(skipna=True) if dt_col else None
        if is_dask and min_dt_val is not None:
            min_dt_val = min_dt_val.compute()
        min_dt = (
            pandas.to_datetime(min_dt_val).to_pydatetime()
            if min_dt_val is not None and 'datetime' in str(dt_type)
            else min_dt_val
        )
    except Exception:
        min_dt = None
    if not ('datetime' in str(type(min_dt))) or str(min_dt) == 'NaT':
        if 'int' not in str(type(min_dt)).lower():
            min_dt = None

    if isinstance(min_dt, datetime):
        begin = (
            round_time(
                min_dt,
                to='down'
            ) - timedelta(minutes=1)
        )
    elif dt_type and 'int' in dt_type.lower():
        begin = min_dt
    elif dt_col is None:
        begin = None

    ### end is the newest data in the new dataframe
    try:
        max_dt_val = df[dt_col].max(skipna=True) if dt_col else None
        if is_dask and max_dt_val is not None:
            max_dt_val = max_dt_val.compute()
        max_dt = (
            pandas.to_datetime(max_dt_val).to_pydatetime()
            if max_dt_val is not None and 'datetime' in str(dt_type)
            else max_dt_val
        )
    except Exception:
        import traceback
        traceback.print_exc()
        max_dt = None

    if ('datetime' not in str(type(max_dt))) or str(min_dt) == 'NaT':
        if 'int' not in str(type(max_dt)).lower():
            max_dt = None

    if isinstance(max_dt, datetime):
        end = (
            round_time(
                max_dt,
                to='down'
            ) + timedelta(minutes=1)
        )
    elif dt_type and 'int' in dt_type.lower():
        end = max_dt + 1

    if max_dt is not None and min_dt is not None and min_dt > max_dt:
        warn("Detected minimum datetime greater than maximum datetime.")

    if begin is not None and end is not None and begin > end:
        if isinstance(begin, datetime):
            begin = end - timedelta(minutes=1)
        ### We might be using integers for the datetime axis.
        else:
            begin = end - 1

    unique_index_vals = {
        col: df[col].unique()
        for col in self.columns
        if col in df.columns and col != dt_col
    } if not date_bound_only else {}
    filter_params_index_limit = get_config('pipes', 'sync', 'filter_params_index_limit')
    _ = kw.pop('params', None)
    params = {
        col: [
            none_if_null(val)
            for val in unique_vals
        ]
        for col, unique_vals in unique_index_vals.items()
        if len(unique_vals) <= filter_params_index_limit
    } if not date_bound_only else {}

    if debug:
        dprint(f"Looking at data between '{begin}' and '{end}':", **kw)

    backtrack_df = self.get_data(
        begin=begin,
        end=end,
        chunksize=chunksize,
        params=params,
        debug=debug,
        **kw
    )
    if backtrack_df is None:
        if debug:
            dprint(f"No backtrack data was found for {self}.")
        return df, get_empty_df(), df

    if debug:
        dprint(f"Existing data for {self}:\n" + str(backtrack_df), **kw)
        dprint(f"Existing dtypes for {self}:\n" + str(backtrack_df.dtypes))

    ### Separate new rows from changed ones.
    on_cols = [
        col for col_key, col in self.columns.items()
        if (
            col
            and
            col_key != 'value'
            and col in backtrack_df.columns
        )
    ]
    self_dtypes = self.dtypes
    on_cols_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in self_dtypes.items()
        if col in on_cols
    }

    ### Detect changes between the old target and new source dataframes.
    delta_df = add_missing_cols_to_df(
        filter_unseen_df(
            backtrack_df,
            df,
            dtypes={
                col: to_pandas_dtype(typ)
                for col, typ in self_dtypes.items()
            },
            safe_copy=safe_copy,
            debug=debug
        ),
        on_cols_dtypes,
    )

    ### Cast dicts or lists to strings so we can merge.
    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)

    def deserializer(x):
        return json.loads(x) if isinstance(x, str) else x

    unhashable_delta_cols = get_unhashable_cols(delta_df)
    unhashable_backtrack_cols = get_unhashable_cols(backtrack_df)
    for col in unhashable_delta_cols:
        delta_df[col] = delta_df[col].apply(serializer)
    for col in unhashable_backtrack_cols:
        backtrack_df[col] = backtrack_df[col].apply(serializer)
    casted_cols = set(unhashable_delta_cols + unhashable_backtrack_cols)

    joined_df = merge(
        delta_df.infer_objects(copy=False).fillna(NA),
        backtrack_df.infer_objects(copy=False).fillna(NA),
        how='left',
        on=on_cols,
        indicator=True,
        suffixes=('', '_old'),
    ) if on_cols else delta_df
    for col in casted_cols:
        if col in joined_df.columns:
            joined_df[col] = joined_df[col].apply(deserializer)
        if col in delta_df.columns:
            delta_df[col] = delta_df[col].apply(deserializer)

    ### Determine which rows are completely new.
    new_rows_mask = (joined_df['_merge'] == 'left_only') if on_cols else None
    cols = list(delta_df.columns)

    unseen_df = (
        joined_df
        .where(new_rows_mask)
        .dropna(how='all')[cols]
        .reset_index(drop=True)
    ) if on_cols else delta_df

    ### Rows that have already been inserted but values have changed.
    update_df = (
        joined_df
        .where(~new_rows_mask)
        .dropna(how='all')[cols]
        .reset_index(drop=True)
    ) if on_cols else get_empty_df()

    if include_unchanged_columns and on_cols:
        unchanged_backtrack_cols = [
            col
            for col in backtrack_df.columns
            if col in on_cols or col not in update_df.columns
        ]
        update_df = merge(
            backtrack_df[unchanged_backtrack_cols],
            update_df,
            how='inner',
            on=on_cols,
        )

    return unseen_df, update_df, delta_df

Inspect a dataframe and filter out rows which already exist in the pipe.

Parameters
  • df ('pd.DataFrame'): The dataframe to inspect and filter.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames. See meerschaum.utils.dataframe.filter_unseen_df.
  • date_bound_only (bool, default False): If True, only use the datetime index to fetch the sample dataframe.
  • include_unchanged_columns (bool, default False): If True, include the backtrack columns which haven't changed in the update dataframe. This is useful if you can't update individual keys.
  • chunksize (Optional[int], default -1): The chunksize used when fetching existing data.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A tuple of three pandas DataFrames: unseen, update, and delta.
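
The split into unseen, update, and delta rows can be sketched without pandas. The following is a minimal illustration only, assuming rows are plain dictionaries; the helper `split_rows` and the sample rows are hypothetical and not part of Meerschaum. Only the `serialize` partial mirrors the serializer shown in the source, and the sketch compares whole rows rather than reproducing the dtype handling of `filter_unseen_df`.

```python
import functools
import json

# Mirrors the serializer in the source: stable JSON text for dicts and lists,
# so that unhashable values can be compared and used as dictionary keys.
serialize = functools.partial(
    json.dumps, sort_keys=True, separators=(',', ':'), default=str
)


def split_rows(existing, incoming, on_cols):
    """Hypothetical helper: split incoming rows into (unseen, update, delta)."""
    def key(row):
        # Build a hashable key from the index columns.
        return tuple(serialize(row.get(col)) for col in on_cols)

    existing_by_key = {key(row): row for row in existing}
    unseen, update, delta = [], [], []
    for row in incoming:
        old = existing_by_key.get(key(row))
        if old is not None and serialize(old) == serialize(row):
            continue  # An identical row already exists, so it is not in the delta.
        delta.append(row)
        # No match on the index columns means the row is new; otherwise it changed.
        (unseen if old is None else update).append(row)
    return unseen, update, delta


existing = [
    {'ts': '2024-01-01', 'id': 1, 'vl': 10},
    {'ts': '2024-01-01', 'id': 2, 'vl': {'a': 1, 'b': 2}},
]
incoming = [
    {'ts': '2024-01-01', 'id': 1, 'vl': 10},                # unchanged
    {'ts': '2024-01-01', 'id': 2, 'vl': {'b': 2, 'a': 3}},  # changed value
    {'ts': '2024-01-02', 'id': 1, 'vl': 7},                 # new index values
]
unseen, update, delta = split_rows(existing, incoming, on_cols=['ts', 'id'])
print(len(unseen), len(update), len(delta))
# 1 1 2
```

Sorting keys during serialization is what makes two dictionaries with the same contents but different key order compare as equal, which is presumably why the source passes `sort_keys=True`.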
def get_num_workers(self, workers: Optional[int] = None) -> int:
def get_num_workers(self, workers: Optional[int] = None) -> int:
    """
    Get the number of workers to use for concurrent syncs.

    Parameters
    ----------
    workers: Optional[int], default None
        The number of workers passed via `--workers`.

    Returns
    -------
    The number of workers, capped for safety.
    """
    is_thread_safe = getattr(self.instance_connector, 'IS_THREAD_SAFE', False)
    if not is_thread_safe:
        return 1

    engine_pool_size = (
        self.instance_connector.engine.pool.size()
        if self.instance_connector.type == 'sql'
        else None
    )
    current_num_threads = threading.active_count()
    current_num_connections = (
        self.instance_connector.engine.pool.checkedout()
        if engine_pool_size is not None
        else current_num_threads
    )
    desired_workers = (
        min(workers or engine_pool_size, engine_pool_size)
        if engine_pool_size is not None
        else workers
    )
    if desired_workers is None:
        desired_workers = (multiprocessing.cpu_count() if is_thread_safe else 1)

    return max(
        (desired_workers - current_num_connections),
        1,
    )

Get the number of workers to use for concurrent syncs.

Parameters
  • workers (Optional[int], default None): The number of workers passed via --workers.
Returns
  • The number of workers, capped for safety.
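
The capping arithmetic can be tried in isolation. The sketch below is an assumption-laden restatement of the logic in the source, not Meerschaum's API: `cap_workers` and its parameters are hypothetical names, with `pool_size` standing in for the SQL engine's pool size and `in_use` for the connections already checked out (or the active thread count when there is no pool).

```python
import multiprocessing
from typing import Optional


def cap_workers(
    requested: Optional[int],
    pool_size: Optional[int],
    in_use: int,
    thread_safe: bool = True,
) -> int:
    """Hypothetical helper restating the capping logic as a pure function."""
    if not thread_safe:
        # Connectors that are not thread-safe always sync in series.
        return 1
    if pool_size is not None:
        # Never ask for more workers than the connection pool can serve.
        desired = min(requested or pool_size, pool_size)
    else:
        desired = requested
    if desired is None:
        desired = multiprocessing.cpu_count()
    # Subtract what is already in use, but always allow at least one worker.
    return max(desired - in_use, 1)


print(cap_workers(8, pool_size=5, in_use=2))
# 3
print(cap_workers(None, pool_size=5, in_use=0))
# 5
print(cap_workers(4, pool_size=None, in_use=10))
# 1
```

The floor of one worker means a sync can still proceed in series even when the pool appears fully checked out.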
def verify( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, bounded: Optional[bool] = None, deduplicate: bool = False, workers: Optional[int] = None, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
def verify(
    self,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    chunk_interval: Union[timedelta, int, None] = None,
    bounded: Optional[bool] = None,
    deduplicate: bool = False,
    workers: Optional[int] = None,
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Verify the contents of the pipe by resyncing its interval.

    Parameters
    ----------
    begin: Union[datetime, int, None], default None
        If specified, only verify rows greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If specified, only verify rows less than this value.

    chunk_interval: Union[timedelta, int, None], default None
        If provided, use this as the size of the chunk boundaries.
        Default to the value set in `pipe.parameters['chunk_minutes']` (1440).

    bounded: Optional[bool], default None
        If `True`, do not verify older than the oldest sync time or newer than the newest.
        If `False`, verify unbounded syncs outside of the new and old sync times.
        The default behavior (`None`) is to bound only if a bound interval is set
        (e.g. `pipe.parameters['verify']['bound_days']`).

    deduplicate: bool, default False
        If `True`, deduplicate the pipe's table after the verification syncs.

    workers: Optional[int], default None
        If provided, limit the verification to this many threads.
        Use a value of `1` to sync chunks in series.

    debug: bool, default False
        Verbosity toggle.

    kwargs: Any
        All keyword arguments are passed to `pipe.sync()`.

    Returns
    -------
    A SuccessTuple indicating whether the pipe was successfully resynced.
    """
    from meerschaum.utils.pool import get_pool
    from meerschaum.utils.misc import interval_str
    workers = self.get_num_workers(workers)

    ### Skip configured bounding in parameters
    ### if `bounded` is explicitly `False`.
    bound_time = (
        self.get_bound_time(debug=debug)
        if bounded is not False
        else None
    )
    if bounded is None:
        bounded = bound_time is not None

    if bounded and begin is None:
        begin = (
            bound_time
            if bound_time is not None
            else self.get_sync_time(newest=False, debug=debug)
        )
    if bounded and end is None:
        end = self.get_sync_time(newest=True, debug=debug)

    if bounded and end is not None:
        end += (
            timedelta(minutes=1)
            if isinstance(end, datetime)
            else 1
        )

    sync_less_than_begin = not bounded and begin is None
    sync_greater_than_end = not bounded and end is None

    cannot_determine_bounds = not self.exists(debug=debug)

    if cannot_determine_bounds:
        sync_success, sync_msg = self.sync(
            begin = begin,
            end = end,
            params = params,
            workers = workers,
            debug = debug,
            **kwargs
        )
        if not sync_success:
            return sync_success, sync_msg
        if deduplicate:
            return self.deduplicate(
                begin = begin,
                end = end,
                params = params,
                workers = workers,
                debug = debug,
                **kwargs
            )
        return sync_success, sync_msg


    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
    chunk_bounds = self.get_chunk_bounds(
        begin = begin,
        end = end,
        chunk_interval = chunk_interval,
        bounded = bounded,
        debug = debug,
    )

    ### Consider it a success if no chunks need to be verified.
    if not chunk_bounds:
        if deduplicate:
            return self.deduplicate(
                begin = begin,
                end = end,
                params = params,
                workers = workers,
                debug = debug,
                **kwargs
            )
        return True, f"Could not determine chunks between '{begin}' and '{end}'; nothing to do."

    begin_to_print = (
        begin
        if begin is not None
        else (
            chunk_bounds[0][0]
            if bounded
            else chunk_bounds[0][1]
        )
    )
    end_to_print = (
        end
        if end is not None
        else (
            chunk_bounds[-1][1]
            if bounded
            else chunk_bounds[-1][0]
        )
    )

    info(
        f"Syncing {len(chunk_bounds)} chunk" + ('s' if len(chunk_bounds) != 1 else '')
        + f" ({'un' if not bounded else ''}bounded)"
        + f" of size '{interval_str(chunk_interval)}'"
        + f" between '{begin_to_print}' and '{end_to_print}'."
    )

    pool = get_pool(workers=workers)

    ### Dictionary of the form bounds -> success_tuple, e.g.:
    ### {
    ###    (2023-01-01, 2023-01-02): (True, "Success")
    ### }
    bounds_success_tuples = {}
    def process_chunk_bounds(
            chunk_begin_and_end: Tuple[
                Union[int, datetime],
                Union[int, datetime]
            ]
        ):
        if chunk_begin_and_end in bounds_success_tuples:
            return chunk_begin_and_end, bounds_success_tuples[chunk_begin_and_end]

        chunk_begin, chunk_end = chunk_begin_and_end
187        chunk_begin, chunk_end = chunk_begin_and_end
188        return