meerschaum

Meerschaum Python API

Welcome to the Meerschaum Python API technical documentation! Here you can find information about the classes and functions provided by the meerschaum package. Visit meerschaum.io for general usage documentation.

Root Module

For your convenience, the following classes and functions may be imported from the root meerschaum namespace (see `__all__` in the module source below).

Examples

Build a Connector

Get existing connectors or build a new one in-memory with the meerschaum.get_connector() factory function:

import meerschaum as mrsm

sql_conn = mrsm.get_connector(
    'sql:temp',
    flavor='sqlite',
    database='/tmp/tmp.db',
)
df = sql_conn.read("SELECT 1 AS foo")
print(df)
#    foo
# 0    1

sql_conn.to_sql(df, 'foo')
print(sql_conn.read('foo'))
#    foo
# 0    1

Create a Custom Connector Class

Decorate your connector class with meerschaum.make_connector() to designate it as a custom connector:

from datetime import datetime, timezone
from random import randint
import meerschaum as mrsm
from meerschaum.utils.misc import round_time

@mrsm.make_connector
class FooConnector(mrsm.Connector):
    REQUIRED_ATTRIBUTES = ['username', 'password']

    def fetch(
        self,
        begin: datetime | None = None,
        end: datetime | None = None,
    ):
        now = begin or round_time(datetime.now(timezone.utc))
        return [
            {'ts': now, 'id': 1, 'vl': randint(1, 100)},
            {'ts': now, 'id': 2, 'vl': randint(1, 100)},
            {'ts': now, 'id': 3, 'vl': randint(1, 100)},
        ]

foo_conn = mrsm.get_connector(
    'foo:bar',
    username='foo',
    password='bar',
)
docs = foo_conn.fetch()

Build a Pipe

Build a meerschaum.Pipe in-memory:

from datetime import datetime
import meerschaum as mrsm

pipe = mrsm.Pipe(
    foo_conn, 'demo',
    instance=sql_conn,
    columns={'datetime': 'ts', 'id': 'id'},
    tags=['production'],
)
pipe.sync(begin=datetime(2024, 1, 1))
df = pipe.get_data()
print(df)
#           ts  id  vl
# 0 2024-01-01   1  97
# 1 2024-01-01   2  18
# 2 2024-01-01   3  96

Add temporary=True to skip registering the pipe in the pipes table.

Get Registered Pipes

The meerschaum.get_pipes() function returns a dictionary hierarchy of pipes by connector, metric, and location:

import meerschaum as mrsm

pipes = mrsm.get_pipes(instance='sql:temp')
pipe = pipes['foo:bar']['demo'][None]

Add as_list=True to flatten the hierarchy:

import meerschaum as mrsm

pipes = mrsm.get_pipes(
    tags=['production'],
    instance=sql_conn,
    as_list=True,
)
print(pipes)
# [Pipe('foo:bar', 'demo', instance='sql:temp')]
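The nesting and flattening above can be sketched in plain Python, with no Meerschaum installation required (a placeholder string stands in for an actual Pipe object):

```python
# The keys nest as {connector_keys: {metric_key: {location_key: Pipe}}}.
# A placeholder string stands in for a real Pipe object here.
pipes = {
    'foo:bar': {'demo': {None: "Pipe('foo:bar', 'demo')"}},
}

# Indexing mirrors pipes['foo:bar']['demo'][None] above.
pipe = pipes['foo:bar']['demo'][None]

# Flattening the hierarchy mirrors as_list=True.
flattened = [
    p
    for metrics in pipes.values()
    for locations in metrics.values()
    for p in locations.values()
]
```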

Import Plugins

You can import a plugin's module through meerschaum.Plugin.module:

import meerschaum as mrsm

plugin = mrsm.Plugin('noaa')
with mrsm.Venv(plugin):
    noaa = plugin.module

If your plugin has submodules, use meerschaum.plugins.from_plugin_import:

from meerschaum.plugins import from_plugin_import
get_defined_pipes = from_plugin_import('compose.utils.pipes', 'get_defined_pipes')

Import multiple plugins with meerschaum.plugins.import_plugins:

from meerschaum.plugins import import_plugins
noaa, compose = import_plugins('noaa', 'compose')

Create a Job

Create a meerschaum.Job with name and sysargs:

import meerschaum as mrsm

job = mrsm.Job('syncing-engine', 'sync pipes --loop')
success, msg = job.start()

Pass executor_keys as the connector keys of an API instance to create a remote job:

import meerschaum as mrsm

job = mrsm.Job(
    'foo',
    'sync pipes -s daily',
    executor_keys='api:main',
)

Import from a Virtual Environment

Use the meerschaum.Venv context manager to activate a virtual environment:

import meerschaum as mrsm

with mrsm.Venv('noaa'):
    import requests

print(requests.__file__)
# /home/bmeares/.config/meerschaum/venvs/noaa/lib/python3.12/site-packages/requests/__init__.py

To import packages which may not be installed, use meerschaum.attempt_import():

import meerschaum as mrsm

requests = mrsm.attempt_import('requests', venv='noaa')
print(requests.__file__)
# /home/bmeares/.config/meerschaum/venvs/noaa/lib/python3.12/site-packages/requests/__init__.py
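The core idea behind attempt_import() can be sketched with only the standard library; the real function additionally handles virtual environments and on-demand installation, and `try_import` here is a hypothetical helper, not part of Meerschaum:

```python
import importlib


def try_import(name: str):
    """Return the imported module, or None if it isn't available."""
    try:
        return importlib.import_module(name)
    except ImportError:
        return None


json_mod = try_import('json')          # stdlib module: import succeeds
missing = try_import('not_a_module')   # unknown module: returns None
```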

Run Actions

Run sysargs with meerschaum.entry():

import meerschaum as mrsm

success, msg = mrsm.entry('show pipes + show version : x2')

Use meerschaum.actions.get_action() to access an action function directly:

from meerschaum.actions import get_action

show_pipes = get_action(['show', 'pipes'])
success, msg = show_pipes(connector_keys=['plugin:noaa'])

Get a dictionary of available subactions with meerschaum.actions.get_subactions():

from meerschaum.actions import get_subactions

subactions = get_subactions('show')
success, msg = subactions['pipes']()

Create a Plugin

Run bootstrap plugin to create a new plugin:

mrsm bootstrap plugin example

This will create example.py in your plugins directory (default ~/.config/meerschaum/plugins/, Windows: %APPDATA%\Meerschaum\plugins). You may paste the example code from the "Create a Custom Action" example below.

Open your plugin with edit plugin:

mrsm edit plugin example

Run edit plugin and paste the example code below to try out the features.

See the writing plugins guide for more in-depth documentation.

Create a Custom Action

Decorate a function with meerschaum.actions.make_action to designate it as an action. Subactions will be automatically detected if not decorated:

from meerschaum.actions import make_action

@make_action
def sing():
    print('What would you like me to sing?')
    return True, "Success"

def sing_tune():
    return False, "I don't know that song!"

def sing_song():
    print('Hello, World!')
    return True, "Success"
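The automatic detection above hinges on the naming convention: a function named `<action>_<subaction>` belongs to `<action>`. A rough plain-Python sketch of that convention (the `funcs` registry is hypothetical; Meerschaum scans the plugin module itself):

```python
def sing_tune():
    return False, "I don't know that song!"


def sing_song():
    return True, "Success"


# Hypothetical registry: map each `sing_*` function to its subaction name.
funcs = {'sing_tune': sing_tune, 'sing_song': sing_song}
subactions = {
    name.split('_', 1)[1]: func
    for name, func in funcs.items()
    if name.startswith('sing_')
}

# `sing song` dispatches to sing_song(), returning a SuccessTuple.
success, msg = subactions['song']()
```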

Use meerschaum.plugins.add_plugin_argument() to create new parameters for your action:

from meerschaum.plugins import make_action, add_plugin_argument

add_plugin_argument(
    '--song', type=str, help='What song to sing.',
)

@make_action
def sing_melody(action=None, song=None):
    to_sing = action[0] if action else song
    if not to_sing:
        return False, "Please tell me what to sing!"

    return True, f'~I am singing {to_sing}~'

Run the action from the command line:

mrsm sing melody lalala

mrsm sing melody --song do-re-mi
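Under the hood, add_plugin_argument() passes its arguments through to Meerschaum's argparse-based parser, so its effect is roughly analogous to this standalone sketch (the parser here is a stand-in, not Meerschaum's actual one):

```python
import argparse

# Stand-in parser; Meerschaum maintains its own global parser.
parser = argparse.ArgumentParser()
parser.add_argument('--song', type=str, help='What song to sing.')

# Simulates `mrsm sing melody --song do-re-mi`.
args = parser.parse_args(['--song', 'do-re-mi'])
```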

Add a Page to the Web Dashboard

Use the decorators meerschaum.plugins.dash_plugin() and meerschaum.plugins.web_page() to add new pages to the web dashboard:

from meerschaum.plugins import dash_plugin, web_page

@dash_plugin
def init_dash(dash_app):

    import dash.html as html
    import dash_bootstrap_components as dbc
    from dash import Input, Output, no_update

    ### Routes to '/dash/my-page'
    @web_page('/my-page', login_required=False)
    def my_page():
        return dbc.Container([
            html.H1("Hello, World!"),
            dbc.Button("Click me", id='my-button'),
            html.Div(id="my-output-div"),
        ])

    @dash_app.callback(
        Output('my-output-div', 'children'),
        Input('my-button', 'n_clicks'),
    )
    def my_button_click(n_clicks):
        if not n_clicks:
            return no_update
        return html.P(f'You clicked {n_clicks} times!')

Submodules

meerschaum.actions
Access functions for actions and subactions.

meerschaum.config
Read and write the Meerschaum configuration registry.

meerschaum.connectors
Build connectors to interact with databases and fetch data.

meerschaum.jobs
Start background jobs.

meerschaum.plugins
Access plugin modules and other API utilities.

meerschaum.utils
Utility functions are available in several submodules.

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Copyright 2023 Bennett Meares

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import atexit
from meerschaum.utils.typing import SuccessTuple
from meerschaum.utils.packages import attempt_import
from meerschaum.core.Pipe import Pipe
from meerschaum.plugins import Plugin
from meerschaum.utils.venv import Venv
from meerschaum.jobs import Job, make_executor
from meerschaum.connectors import get_connector, Connector, make_connector
from meerschaum.utils import get_pipes
from meerschaum.utils.formatting import pprint
from meerschaum._internal.docs import index as __doc__
from meerschaum.config import __version__, get_config
from meerschaum._internal.entry import entry
from meerschaum.__main__ import _close_pools

atexit.register(_close_pools)

__pdoc__ = {'gui': False, 'api': False, 'core': False, '_internal': False}
__all__ = (
    "get_pipes",
    "get_connector",
    "get_config",
    "Pipe",
    "Plugin",
    "Venv",
    "Job",
    "pprint",
    "attempt_import",
    "actions",
    "config",
    "connectors",
    "jobs",
    "plugins",
    "utils",
    "SuccessTuple",
    "Connector",
    "make_connector",
    "entry",
)
def get_pipes(
    connector_keys: Union[str, List[str], None] = None,
    metric_keys: Union[str, List[str], None] = None,
    location_keys: Union[str, List[str], None] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    mrsm_instance: Union[str, InstanceConnector, None] = None,
    instance: Union[str, InstanceConnector, None] = None,
    as_list: bool = False,
    method: str = 'registered',
    debug: bool = False,
    **kw: Any
) -> Union[PipesDict, List[mrsm.Pipe]]:
    """
    Return a dictionary or list of `meerschaum.Pipe` objects.

    Parameters
    ----------
    connector_keys: Union[str, List[str], None], default None
        String or list of connector keys.
        If omitted or is `'*'`, fetch all possible keys.
        If a string begins with `'_'`, select keys that do NOT match the string.

    metric_keys: Union[str, List[str], None], default None
        String or list of metric keys. See `connector_keys` for formatting.

    location_keys: Union[str, List[str], None], default None
        String or list of location keys. See `connector_keys` for formatting.

    tags: Optional[List[str]], default None
        If provided, only include pipes with these tags.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        Params are parsed into a SQL WHERE clause.
        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`

    mrsm_instance: Union[str, InstanceConnector, None], default None
        Connector keys for the Meerschaum instance of the pipes.
        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
        `meerschaum.connectors.api.APIConnector.APIConnector`.

    as_list: bool, default False
        If `True`, return pipes in a list instead of a hierarchical dictionary.
        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
        `True`  : `[Pipe]`

    method: str, default 'registered'
        Available options: `['registered', 'explicit', 'all']`
        If `'registered'` (default), create pipes based on registered keys in the connector's pipes table
        (API or SQL connector, depending on mrsm_instance).
        If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys
        instead of consulting the pipes table. Useful for creating non-existent pipes.
        If `'all'`, create pipes from predefined metrics and locations. Requires `connector_keys`.
        **NOTE:** Method `'all'` is not implemented!

    **kw: Any
        Keyword arguments to pass to the `meerschaum.Pipe` constructor.

    Returns
    -------
    A dictionary of dictionaries and `meerschaum.Pipe` objects
    in the connector, metric, location hierarchy.
    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.

    Examples
    --------
    ```
    >>> ### Manual definition:
    >>> pipes = {
    ...     <connector_keys>: {
    ...         <metric_key>: {
    ...             <location_key>: Pipe(
    ...                 <connector_keys>,
    ...                 <metric_key>,
    ...                 <location_key>,
    ...             ),
    ...         },
    ...     },
    ... },
    >>> ### Accessing a single pipe:
    >>> pipes['sql:main']['weather'][None]
    >>> ### Return a list instead:
    >>> get_pipes(as_list=True)
    [sql_main_weather]
    >>>
    ```
    """
    from meerschaum.config import get_config
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import filter_keywords

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    if params is None:
        params = {}
    if tags is None:
        tags = []

    if isinstance(connector_keys, str):
        connector_keys = [connector_keys]
    if isinstance(metric_keys, str):
        metric_keys = [metric_keys]
    if isinstance(location_keys, str):
        location_keys = [location_keys]

    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
    if mrsm_instance is None:
        mrsm_instance = instance
    if mrsm_instance is None:
        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
    if isinstance(mrsm_instance, str):
        from meerschaum.connectors.parse import parse_instance_keys
        connector = parse_instance_keys(keys=mrsm_instance, debug=debug)
    else:
        from meerschaum.connectors import instance_types
        valid_connector = False
        if hasattr(mrsm_instance, 'type'):
            if mrsm_instance.type in instance_types:
                valid_connector = True
        if not valid_connector:
            error(f"Invalid instance connector: {mrsm_instance}")
        connector = mrsm_instance
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Using instance connector: {connector}")
    if not connector:
        error(f"Could not create connector from keys: '{mrsm_instance}'")

    ### Get a list of tuples for the keys needed to build pipes.
    result = fetch_pipes_keys(
        method,
        connector,
        connector_keys=connector_keys,
        metric_keys=metric_keys,
        location_keys=location_keys,
        tags=tags,
        params=params,
        debug=debug,
    )
    if result is None:
        error("Unable to build pipes!")

    ### Populate the `pipes` dictionary with Pipes based on the keys
    ### obtained from the chosen `method`.
    from meerschaum import Pipe
    pipes = {}
    for ck, mk, lk in result:
        if ck not in pipes:
            pipes[ck] = {}

        if mk not in pipes[ck]:
            pipes[ck][mk] = {}

        pipes[ck][mk][lk] = Pipe(
            ck, mk, lk,
            mrsm_instance=connector,
            debug=debug,
            **filter_keywords(Pipe, **kw)
        )

    if not as_list:
        return pipes
    from meerschaum.utils.misc import flatten_pipes_dict
    return flatten_pipes_dict(pipes)
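As the docstring notes, `params` entries are parsed into a SQL WHERE clause. A simplified sketch of that mapping (the real parsing also handles lists, negation, and quoting):

```python
params = {'a': 1, 'b': 2}

# Simplified: join each key/value pair with AND (no quoting or negation).
where = 'WHERE ' + ' AND '.join(f"{k} = {v}" for k, v in params.items())
# where == 'WHERE a = 1 AND b = 2'
```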

def get_connector(
    type: str = None,
    label: str = None,
    refresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Connector:
    """
    Return an existing connector or create a new connection and store it for reuse.

    You can create new connectors if enough parameters are provided for the given type and flavor.

    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        If `True`, refresh the Connector instance / construct a new object.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`).

    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type='sql',
    ...     label='newlabel',
    ...     flavor='sqlite',
    ...     database='/file/path/to/database.db',
    ... )
    >>>
    ```
    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)

    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _load_builtin_custom_connectors()
            _loaded_plugin_connectors = True

    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"

        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        from meerschaum.connectors.valkey import ValkeyConnector
        with _locks['types']:
            types.update({
                'api': APIConnector,
                'sql': SQLConnector,
                'plugin': PluginConnector,
                'valkey': ValkeyConnector,
            })

    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    import inspect
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector " +
                            f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n" +
                        f"  - Keyword value: '{value}'\n" +
                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else:  ### connector doesn't yet exist
            refresh = True

    ### only create an object if refresh is True
    ### (can be manually specified, otherwise determined above)
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack=False,
                )
                conn = None
            except Exception as e:
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack=False,
                )
                conn = None
        if conn is None:
            return None

    return connectors[type][label]
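The first step of `get_connector()` above splits a combined key string like `'sql:temp'` into a type and label, which can be tried in isolation:

```python
keys = 'sql:temp'

# Mirrors the split inside get_connector(): type before the first colon,
# label after it (maxsplit=1 keeps any further colons in the label).
type_, label = keys.split(':', maxsplit=1)

# A label may itself contain colons.
parts = 'api:a:b'.split(':', maxsplit=1)
```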

def get_config(
    *keys: str,
    patch: bool = True,
    substitute: bool = True,
    sync_files: bool = True,
    write_missing: bool = True,
    as_tuple: bool = False,
    warn: bool = True,
    debug: bool = False
) -> Any:
    """
    Return the Meerschaum configuration dictionary.
    If positional arguments are provided, index by the keys.
    Raises a warning if invalid keys are provided.

    Parameters
    ----------
    keys: str
        List of strings to index.

    patch: bool, default True
        If `True`, patch missing default keys into the config directory.

    substitute: bool, default True
        If `True`, substitute 'MRSM{}' values.

    sync_files: bool, default True
        If `True`, sync files if needed.

    write_missing: bool, default True
        If `True`, write default values when the main config files are missing.

    as_tuple: bool, default False
        If `True`, return a tuple of type (success, value).

    Returns
    -------
    The value in the configuration directory, indexed by the provided keys.

    Examples
    --------
    >>> get_config('meerschaum', 'instance')
    'sql:main'
    >>> get_config('does', 'not', 'exist')
    UserWarning: Invalid keys in config: ('does', 'not', 'exist')
    """
    import json

    symlinks_key = STATIC_CONFIG['config']['symlinks_key']
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Indexing keys: {keys}", color=False)

    if len(keys) == 0:
        _rc = _config(substitute=substitute, sync_files=sync_files, write_missing=write_missing)
        if as_tuple:
            return True, _rc
        return _rc

    ### Weird threading issues, only import if substitute is True.
    if substitute:
        from meerschaum.config._read_config import search_and_substitute_config
    ### Invalidate the cache if it was read before with substitute=False
    ### but there still exist substitutions.
    if (
        config is not None and substitute and keys[0] != symlinks_key
        and 'MRSM{' in json.dumps(config.get(keys[0]))
    ):
        try:
            _subbed = search_and_substitute_config({keys[0]: config[keys[0]]})
        except Exception as e:
            import traceback
            traceback.print_exc()
        config[keys[0]] = _subbed[keys[0]]
        if symlinks_key in _subbed:
            if symlinks_key not in config:
                config[symlinks_key] = {}
            if keys[0] not in config[symlinks_key]:
                config[symlinks_key][keys[0]] = {}
            config[symlinks_key][keys[0]] = apply_patch_to_config(
                _subbed,
                config[symlinks_key][keys[0]]
            )

    from meerschaum.config._sync import sync_files as _sync_files
    if config is None:
        _config(*keys, sync_files=sync_files)

    invalid_keys = False
    if keys[0] not in config and keys[0] != symlinks_key:
        single_key_config = read_config(
            keys=[keys[0]], substitute=substitute, write_missing=write_missing
        )
        if keys[0] not in single_key_config:
            invalid_keys = True
        else:
            config[keys[0]] = single_key_config.get(keys[0], None)
            if symlinks_key in single_key_config and keys[0] in single_key_config[symlinks_key]:
                if symlinks_key not in config:
                    config[symlinks_key] = {}
                config[symlinks_key][keys[0]] = single_key_config[symlinks_key][keys[0]]

            if sync_files:
                _sync_files(keys=[keys[0]])

    c = config
    if len(keys) > 0:
        for k in keys:
            try:
                c = c[k]
            except Exception as e:
                invalid_keys = True
                break
        if invalid_keys:
            ### Check if the keys are in the default configuration.
            from meerschaum.config._default import default_config
            in_default = True
            patched_default_config = (
                search_and_substitute_config(default_config)
                if substitute else copy.deepcopy(default_config)
            )
            _c = patched_default_config
            for k in keys:
                try:
                    _c = _c[k]
                except Exception as e:
                    in_default = False
            if in_default:
                c = _c
                invalid_keys = False
            warning_msg = f"Invalid keys in config: {keys}"
            if not in_default:
                try:
                    if warn:
                        from meerschaum.utils.warnings import warn as _warn
                        _warn(warning_msg, stacklevel=3, color=False)
                except Exception as e:
                    if warn:
                        print(warning_msg)
                if as_tuple:
                    return False, None
                return None

            ### Don't write keys that we haven't yet loaded into memory.
            not_loaded_keys = [k for k in patched_default_config if k not in config]
            for k in not_loaded_keys:
                patched_default_config.pop(k, None)

            set_config(
                apply_patch_to_config(
                    patched_default_config,
                    config,
                )
            )
            if patch and keys[0] != symlinks_key:
                if write_missing:
                    write_config(config, debug=debug)

    if as_tuple:
        return (not invalid_keys), c
    return c

Return the Meerschaum configuration dictionary. If positional arguments are provided, index into the configuration by those keys. Issues a warning if invalid keys are provided.

Parameters
  • keys (str): Keys by which to index the configuration dictionary.
  • patch (bool, default True): If True, patch missing default keys into the config directory.
  • sync_files (bool, default True): If True, sync config files if needed.
  • write_missing (bool, default True): If True, write default values when the main config files are missing.
  • substitute (bool, default True): If True, substitute 'MRSM{}' values.
  • as_tuple (bool, default False): If True, return a tuple of the form (success, value).
Returns
  • The value in the configuration dictionary, indexed by the provided keys.
Examples
>>> get_config('meerschaum', 'instance')
'sql:main'
>>> get_config('does', 'not', 'exist')
UserWarning: Invalid keys in config: ('does', 'not', 'exist')
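The fallback behavior above can be sketched in plain Python. This is a simplified stand-in for `get_config()`'s key traversal, not the actual implementation: walk the nested dictionary by keys, and on a miss retry against the defaults before giving up.

```python
import copy

def lookup(config: dict, default_config: dict, *keys, as_tuple: bool = False):
    """Sketch of get_config()'s key traversal with a default-config fallback."""
    c = config
    invalid = False
    for k in keys:
        try:
            c = c[k]
        except (KeyError, TypeError):
            invalid = True
            break
    if invalid:
        # Retry against the defaults, mirroring the patch-from-default step.
        c = copy.deepcopy(default_config)
        for k in keys:
            try:
                c = c[k]
            except (KeyError, TypeError):
                return (False, None) if as_tuple else None
    return (True, c) if as_tuple else c

config = {'meerschaum': {}}
defaults = {'meerschaum': {'instance': 'sql:main'}}
print(lookup(config, defaults, 'meerschaum', 'instance'))
# 'sql:main'
print(lookup(config, defaults, 'does', 'not', 'exist', as_tuple=True))
# (False, None)
```

Passing `as_tuple=True` lets callers distinguish "key missing" from "value is `None`", just as documented above.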
class Pipe:
 60class Pipe:
 61    """
 62    Access Meerschaum pipes via Pipe objects.
 63    
 64    Pipes are identified by the following:
 65
 66    1. Connector keys (e.g. `'sql:main'`)
 67    2. Metric key (e.g. `'weather'`)
 68    3. Location (optional; e.g. `None`)
 69    
 70    A pipe's connector keys correspond to a data source, and when the pipe is synced,
 71    its `fetch` definition is evaluated and executed to produce new data.
 72    
 73    Alternatively, new data may be directly synced via `pipe.sync()`:
 74    
 75    ```
 76    >>> from meerschaum import Pipe
 77    >>> pipe = Pipe('csv', 'weather')
 78    >>>
 79    >>> import pandas as pd
 80    >>> df = pd.read_csv('weather.csv')
 81    >>> pipe.sync(df)
 82    ```
 83    """
 84
 85    from ._fetch import (
 86        fetch,
 87        get_backtrack_interval,
 88    )
 89    from ._data import (
 90        get_data,
 91        get_backtrack_data,
 92        get_rowcount,
 93        _get_data_as_iterator,
 94        get_chunk_interval,
 95        get_chunk_bounds,
 96    )
 97    from ._register import register
 98    from ._attributes import (
 99        attributes,
100        parameters,
101        columns,
102        dtypes,
103        get_columns,
104        get_columns_types,
105        get_indices,
106        tags,
107        get_id,
108        id,
109        get_val_column,
110        parents,
111        children,
112        target,
113        _target_legacy,
114        guess_datetime,
115    )
116    from ._show import show
117    from ._edit import edit, edit_definition, update
118    from ._sync import (
119        sync,
120        get_sync_time,
121        exists,
122        filter_existing,
123        _get_chunk_label,
124        get_num_workers,
125        _persist_new_json_columns,
126        _persist_new_numeric_columns,
127        _persist_new_uuid_columns,
128    )
129    from ._verify import (
130        verify,
131        get_bound_interval,
132        get_bound_time,
133    )
134    from ._delete import delete
135    from ._drop import drop
136    from ._clear import clear
137    from ._deduplicate import deduplicate
138    from ._bootstrap import bootstrap
139    from ._dtypes import enforce_dtypes, infer_dtypes
140    from ._copy import copy_to
141
142    def __init__(
143        self,
144        connector: str = '',
145        metric: str = '',
146        location: Optional[str] = None,
147        parameters: Optional[Dict[str, Any]] = None,
148        columns: Union[Dict[str, str], List[str], None] = None,
149        tags: Optional[List[str]] = None,
150        target: Optional[str] = None,
151        dtypes: Optional[Dict[str, str]] = None,
152        instance: Optional[Union[str, InstanceConnector]] = None,
153        temporary: bool = False,
154        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
155        cache: bool = False,
156        debug: bool = False,
157        connector_keys: Optional[str] = None,
158        metric_key: Optional[str] = None,
159        location_key: Optional[str] = None,
160    ):
161        """
162        Parameters
163        ----------
164        connector: str
165            Keys for the pipe's source connector, e.g. `'sql:main'`.
166
167        metric: str
168            Label for the pipe's contents, e.g. `'weather'`.
169
170        location: str, default None
171            Label for the pipe's location. Defaults to `None`.
172
173        parameters: Optional[Dict[str, Any]], default None
174            Optionally set a pipe's parameters from the constructor,
175            e.g. columns and other attributes.
176            You can edit these parameters with `edit pipes`.
177
178        columns: Optional[Dict[str, str]], default None
179            Set the `columns` dictionary of `parameters`.
180            If `parameters` is also provided, this dictionary is added under the `'columns'` key.
181
182        tags: Optional[List[str]], default None
183            A list of strings to be added under the `'tags'` key of `parameters`.
184            You can select pipes with certain tags using `--tags`.
185
186        dtypes: Optional[Dict[str, str]], default None
187            Set the `dtypes` dictionary of `parameters`.
188            If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.
189
190        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
191            Connector for the Meerschaum instance where the pipe resides.
192            Defaults to the preconfigured default instance (`'sql:main'`).
193
194        instance: Optional[Union[str, InstanceConnector]], default None
195            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.
196
197        temporary: bool, default False
198            If `True`, prevent instance tables (pipes, users, plugins) from being created.
199
200        cache: bool, default False
201            If `True`, cache fetched data into a local database file.
202            Defaults to `False`.
203        """
204        from meerschaum.utils.warnings import error, warn
205        if (not connector and not connector_keys) or (not metric and not metric_key):
206            error(
207                "Please provide strings for the connector and metric\n    "
208                + "(first two positional arguments)."
209            )
210
211        ### Fall back to legacy `location_key` just in case.
212        if not location:
213            location = location_key
214
215        if not connector:
216            connector = connector_keys
217
218        if not metric:
219            metric = metric_key
220
221        if location in ('[None]', 'None'):
222            location = None
223
224        from meerschaum.config.static import STATIC_CONFIG
225        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
226        for k in (connector, metric, location, *(tags or [])):
227            if str(k).startswith(negation_prefix):
228                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")
229
230        self.connector_keys = str(connector)
231        self.connector_key = self.connector_keys ### Alias
232        self.metric_key = metric
233        self.location_key = location
234        self.temporary = temporary
235
236        self._attributes = {
237            'connector_keys': self.connector_keys,
238            'metric_key': self.metric_key,
239            'location_key': self.location_key,
240            'parameters': {},
241        }
242
243        ### only set parameters if values are provided
244        if isinstance(parameters, dict):
245            self._attributes['parameters'] = parameters
246        else:
247            if parameters is not None:
248                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
249            self._attributes['parameters'] = {}
250
251        columns = columns or self._attributes.get('parameters', {}).get('columns', {})
252        if isinstance(columns, list):
253            columns = {str(col): str(col) for col in columns}
254        if isinstance(columns, dict):
255            self._attributes['parameters']['columns'] = columns
256        elif columns is not None:
257            warn(f"The provided columns are of invalid type '{type(columns)}'.")
258
259        if isinstance(tags, (list, tuple)):
260            self._attributes['parameters']['tags'] = tags
261        elif tags is not None:
262            warn(f"The provided tags are of invalid type '{type(tags)}'.")
263
264        if isinstance(target, str):
265            self._attributes['parameters']['target'] = target
266        elif target is not None:
267            warn(f"The provided target is of invalid type '{type(target)}'.")
268
269        if isinstance(dtypes, dict):
270            self._attributes['parameters']['dtypes'] = dtypes
271        elif dtypes is not None:
272            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")
273
274        ### NOTE: The parameters dictionary is {} by default.
275        ###       A Pipe may be registered without parameters, then edited,
276        ###       or a Pipe may be registered with parameters set in-memory first.
277        #  from meerschaum.config import get_config
278        _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
279        if _mrsm_instance is None:
280            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)
281
282        if not isinstance(_mrsm_instance, str):
283            self._instance_connector = _mrsm_instance
284            self.instance_keys = str(_mrsm_instance)
285        else: ### NOTE: must be SQL or API Connector for this to work
286            self.instance_keys = _mrsm_instance
287
288        self._cache = cache and get_config('system', 'experimental', 'cache')
289
290
291    @property
292    def meta(self):
293        """
294        Return the four keys needed to reconstruct this pipe.
295        """
296        return {
297            'connector': self.connector_keys,
298            'metric': self.metric_key,
299            'location': self.location_key,
300            'instance': self.instance_keys,
301        }
302
303
304    def keys(self) -> List[str]:
305        """
306        Return the ordered keys for this pipe.
307        """
308        return {
309            key: val
310            for key, val in self.meta.items()
311            if key != 'instance'
312        }
313
314
315    @property
316    def instance_connector(self) -> Union[InstanceConnector, None]:
317        """
318        The connector to where this pipe resides.
319        May either be of type `meerschaum.connectors.sql.SQLConnector` or
320        `meerschaum.connectors.api.APIConnector`.
321        """
322        if '_instance_connector' not in self.__dict__:
323            from meerschaum.connectors.parse import parse_instance_keys
324            conn = parse_instance_keys(self.instance_keys)
325            if conn:
326                self._instance_connector = conn
327            else:
328                return None
329        return self._instance_connector
330
331    @property
332    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
333        """
334        The connector to the data source.
335        """
336        if '_connector' not in self.__dict__:
337            from meerschaum.connectors.parse import parse_instance_keys
338            import warnings
339            with warnings.catch_warnings():
340                warnings.simplefilter('ignore')
341                try:
342                    conn = parse_instance_keys(self.connector_keys)
343                except Exception as e:
344                    conn = None
345            if conn:
346                self._connector = conn
347            else:
348                return None
349        return self._connector
350
351
352    @property
353    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
354        """
355        If the pipe was created with `cache=True`, return the connector to the pipe's
356        SQLite database for caching.
357        """
358        if not self._cache:
359            return None
360
361        if '_cache_connector' not in self.__dict__:
362            from meerschaum.connectors import get_connector
363            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
364            _resources_path = SQLITE_RESOURCES_PATH
365            self._cache_connector = get_connector(
366                'sql', '_cache_' + str(self),
367                flavor='sqlite',
368                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
369            )
370
371        return self._cache_connector
372
373
374    @property
375    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
376        """
377        If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
378        manage the local data.
379        """
380        if self.cache_connector is None:
381            return None
382        if '_cache_pipe' not in self.__dict__:
383            from meerschaum.config._patch import apply_patch_to_config
384            from meerschaum.utils.sql import sql_item_name
385            _parameters = copy.deepcopy(self.parameters)
386            _fetch_patch = {
387                'fetch': ({
388                    'definition': (
389                        f"SELECT * FROM "
390                        + sql_item_name(
391                            str(self.target),
392                            self.instance_connector.flavor,
393                            self.instance_connector.get_pipe_schema(self),
394                        )
395                    ),
396                }) if self.instance_connector.type == 'sql' else ({
397                    'connector_keys': self.connector_keys,
398                    'metric_key': self.metric_key,
399                    'location_key': self.location_key,
400                })
401            }
402            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
403            self._cache_pipe = Pipe(
404                self.instance_keys,
405                (self.connector_keys + '_' + self.metric_key + '_cache'),
406                self.location_key,
407                mrsm_instance = self.cache_connector,
408                parameters = _parameters,
409                cache = False,
410                temporary = True,
411            )
412
413        return self._cache_pipe
414
415
416    def __str__(self, ansi: bool=False):
417        return pipe_repr(self, ansi=ansi)
418
419
420    def __eq__(self, other):
421        try:
422            return (
423                isinstance(self, type(other))
424                and self.connector_keys == other.connector_keys
425                and self.metric_key == other.metric_key
426                and self.location_key == other.location_key
427                and self.instance_keys == other.instance_keys
428            )
429        except Exception as e:
430            return False
431
432    def __hash__(self):
433        ### Using an esoteric separator to avoid collisions.
434        sep = "[\"']"
435        return hash(
436            str(self.connector_keys) + sep
437            + str(self.metric_key) + sep
438            + str(self.location_key) + sep
439            + str(self.instance_keys) + sep
440        )
441
442    def __repr__(self, ansi: bool=True, **kw) -> str:
443        if not hasattr(sys, 'ps1'):
444            ansi = False
445
446        return pipe_repr(self, ansi=ansi, **kw)
447
448    def __pt_repr__(self):
449        from meerschaum.utils.packages import attempt_import
450        prompt_toolkit_formatted_text = attempt_import('prompt_toolkit.formatted_text', lazy=False)
451        return prompt_toolkit_formatted_text.ANSI(pipe_repr(self, ansi=True))
452
453    def __getstate__(self) -> Dict[str, Any]:
454        """
455        Define the state dictionary (pickling).
456        """
457        return {
458            'connector': self.connector_keys,
459            'metric': self.metric_key,
460            'location': self.location_key,
461            'parameters': self.parameters,
462            'instance': self.instance_keys,
463        }
464
465    def __setstate__(self, _state: Dict[str, Any]):
466        """
467        Read the state (unpickling).
468        """
469        self.__init__(**_state)
470
471
472    def __getitem__(self, key: str) -> Any:
473        """
474        Index the pipe's attributes.
475        If the `key` cannot be found, return `None`.
476        """
477        if key in self.attributes:
478            return self.attributes.get(key, None)
479
480        aliases = {
481            'connector': 'connector_keys',
482            'connector_key': 'connector_keys',
483            'metric': 'metric_key',
484            'location': 'location_key',
485        }
486        aliased_key = aliases.get(key, None)
487        if aliased_key is not None:
488            return self.attributes.get(aliased_key, None)
489
490        property_aliases = {
491            'instance': 'instance_keys',
492            'instance_key': 'instance_keys',
493        }
494        aliased_key = property_aliases.get(key, None)
495        if aliased_key is not None:
496            key = aliased_key
497        return getattr(self, key, None)

Access Meerschaum pipes via Pipe objects.

Pipes are identified by the following:

  1. Connector keys (e.g. 'sql:main')
  2. Metric key (e.g. 'weather')
  3. Location (optional; e.g. None)

A pipe's connector keys correspond to a data source, and when the pipe is synced, its fetch definition is evaluated and executed to produce new data.

Alternatively, new data may be directly synced via pipe.sync():

>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)
Pipe( connector: str = '', metric: str = '', location: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, columns: Union[Dict[str, str], List[str], NoneType] = None, tags: Optional[List[str]] = None, target: Optional[str] = None, dtypes: Optional[Dict[str, str]] = None, instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, temporary: bool = False, mrsm_instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, cache: bool = False, debug: bool = False, connector_keys: Optional[str] = None, metric_key: Optional[str] = None, location_key: Optional[str] = None)
142    def __init__(
143        self,
144        connector: str = '',
145        metric: str = '',
146        location: Optional[str] = None,
147        parameters: Optional[Dict[str, Any]] = None,
148        columns: Union[Dict[str, str], List[str], None] = None,
149        tags: Optional[List[str]] = None,
150        target: Optional[str] = None,
151        dtypes: Optional[Dict[str, str]] = None,
152        instance: Optional[Union[str, InstanceConnector]] = None,
153        temporary: bool = False,
154        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
155        cache: bool = False,
156        debug: bool = False,
157        connector_keys: Optional[str] = None,
158        metric_key: Optional[str] = None,
159        location_key: Optional[str] = None,
160    ):
161        """
162        Parameters
163        ----------
164        connector: str
165            Keys for the pipe's source connector, e.g. `'sql:main'`.
166
167        metric: str
168            Label for the pipe's contents, e.g. `'weather'`.
169
170        location: str, default None
171            Label for the pipe's location. Defaults to `None`.
172
173        parameters: Optional[Dict[str, Any]], default None
174            Optionally set a pipe's parameters from the constructor,
175            e.g. columns and other attributes.
176            You can edit these parameters with `edit pipes`.
177
178        columns: Optional[Dict[str, str]], default None
179            Set the `columns` dictionary of `parameters`.
180            If `parameters` is also provided, this dictionary is added under the `'columns'` key.
181
182        tags: Optional[List[str]], default None
183            A list of strings to be added under the `'tags'` key of `parameters`.
184            You can select pipes with certain tags using `--tags`.
185
186        dtypes: Optional[Dict[str, str]], default None
187            Set the `dtypes` dictionary of `parameters`.
188            If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.
189
190        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
191            Connector for the Meerschaum instance where the pipe resides.
192            Defaults to the preconfigured default instance (`'sql:main'`).
193
194        instance: Optional[Union[str, InstanceConnector]], default None
195            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.
196
197        temporary: bool, default False
198            If `True`, prevent instance tables (pipes, users, plugins) from being created.
199
200        cache: bool, default False
201            If `True`, cache fetched data into a local database file.
202            Defaults to `False`.
203        """
204        from meerschaum.utils.warnings import error, warn
205        if (not connector and not connector_keys) or (not metric and not metric_key):
206            error(
207                "Please provide strings for the connector and metric\n    "
208                + "(first two positional arguments)."
209            )
210
211        ### Fall back to legacy `location_key` just in case.
212        if not location:
213            location = location_key
214
215        if not connector:
216            connector = connector_keys
217
218        if not metric:
219            metric = metric_key
220
221        if location in ('[None]', 'None'):
222            location = None
223
224        from meerschaum.config.static import STATIC_CONFIG
225        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
226        for k in (connector, metric, location, *(tags or [])):
227            if str(k).startswith(negation_prefix):
228                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")
229
230        self.connector_keys = str(connector)
231        self.connector_key = self.connector_keys ### Alias
232        self.metric_key = metric
233        self.location_key = location
234        self.temporary = temporary
235
236        self._attributes = {
237            'connector_keys': self.connector_keys,
238            'metric_key': self.metric_key,
239            'location_key': self.location_key,
240            'parameters': {},
241        }
242
243        ### only set parameters if values are provided
244        if isinstance(parameters, dict):
245            self._attributes['parameters'] = parameters
246        else:
247            if parameters is not None:
248                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
249            self._attributes['parameters'] = {}
250
251        columns = columns or self._attributes.get('parameters', {}).get('columns', {})
252        if isinstance(columns, list):
253            columns = {str(col): str(col) for col in columns}
254        if isinstance(columns, dict):
255            self._attributes['parameters']['columns'] = columns
256        elif columns is not None:
257            warn(f"The provided columns are of invalid type '{type(columns)}'.")
258
259        if isinstance(tags, (list, tuple)):
260            self._attributes['parameters']['tags'] = tags
261        elif tags is not None:
262            warn(f"The provided tags are of invalid type '{type(tags)}'.")
263
264        if isinstance(target, str):
265            self._attributes['parameters']['target'] = target
266        elif target is not None:
267            warn(f"The provided target is of invalid type '{type(target)}'.")
268
269        if isinstance(dtypes, dict):
270            self._attributes['parameters']['dtypes'] = dtypes
271        elif dtypes is not None:
272            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")
273
274        ### NOTE: The parameters dictionary is {} by default.
275        ###       A Pipe may be registered without parameters, then edited,
276        ###       or a Pipe may be registered with parameters set in-memory first.
277        #  from meerschaum.config import get_config
278        _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
279        if _mrsm_instance is None:
280            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)
281
282        if not isinstance(_mrsm_instance, str):
283            self._instance_connector = _mrsm_instance
284            self.instance_keys = str(_mrsm_instance)
285        else: ### NOTE: must be SQL or API Connector for this to work
286            self.instance_keys = _mrsm_instance
287
288        self._cache = cache and get_config('system', 'experimental', 'cache')
Parameters
  • connector (str): Keys for the pipe's source connector, e.g. 'sql:main'.
  • metric (str): Label for the pipe's contents, e.g. 'weather'.
  • location (str, default None): Label for the pipe's location. Defaults to None.
  • parameters (Optional[Dict[str, Any]], default None): Optionally set a pipe's parameters from the constructor, e.g. columns and other attributes. You can edit these parameters with edit pipes.
  • columns (Optional[Dict[str, str]], default None): Set the columns dictionary of parameters. If parameters is also provided, this dictionary is added under the 'columns' key.
  • tags (Optional[List[str]], default None): A list of strings to be added under the 'tags' key of parameters. You can select pipes with certain tags using --tags.
  • dtypes (Optional[Dict[str, str]], default None): Set the dtypes dictionary of parameters. If parameters is also provided, this dictionary is added under the 'dtypes' key.
  • mrsm_instance (Optional[Union[str, InstanceConnector]], default None): Connector for the Meerschaum instance where the pipe resides. Defaults to the preconfigured default instance ('sql:main').
  • instance (Optional[Union[str, InstanceConnector]], default None): Alias for mrsm_instance. If mrsm_instance is supplied, this value is ignored.
  • temporary (bool, default False): If True, prevent instance tables (pipes, users, plugins) from being created.
  • cache (bool, default False): If True, cache fetched data into a local database file. Defaults to False.
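The constructor's handling of the legacy keyword aliases (`connector_keys`, `metric_key`, `location_key`) and the negation-prefix check can be sketched in isolation. This is a simplified stand-in, and the prefix value here is assumed for illustration; the real value comes from `STATIC_CONFIG`.

```python
from typing import Optional

NEGATION_PREFIX = '_'  # assumed for illustration; the real value comes from STATIC_CONFIG

def normalize_keys(
    connector: str = '',
    metric: str = '',
    location: Optional[str] = None,
    connector_keys: Optional[str] = None,
    metric_key: Optional[str] = None,
    location_key: Optional[str] = None,
):
    """Sketch of the legacy-alias fallback and validation in Pipe.__init__()."""
    if (not connector and not connector_keys) or (not metric and not metric_key):
        raise ValueError("Please provide strings for the connector and metric.")
    # Fall back to the legacy keyword names.
    connector = connector or connector_keys
    metric = metric or metric_key
    location = location or location_key
    # The strings '[None]' and 'None' are treated as a null location.
    if location in ('[None]', 'None'):
        location = None
    for k in (connector, metric, location):
        if k is not None and str(k).startswith(NEGATION_PREFIX):
            raise ValueError(f"Keys cannot start with '{NEGATION_PREFIX}'.")
    return str(connector), metric, location

print(normalize_keys(connector_keys='sql:main', metric_key='weather', location_key='None'))
# ('sql:main', 'weather', None)
```

Note that either the positional names or the legacy keyword names may be used, but a connector and a metric must be provided one way or the other.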
connector_keys
connector_key
metric_key
location_key
temporary
meta
291    @property
292    def meta(self):
293        """
294        Return the four keys needed to reconstruct this pipe.
295        """
296        return {
297            'connector': self.connector_keys,
298            'metric': self.metric_key,
299            'location': self.location_key,
300            'instance': self.instance_keys,
301        }

Return the four keys needed to reconstruct this pipe.
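Because `meta` carries all four identifying keys, a pipe can be rebuilt from it (e.g. `mrsm.Pipe(**pipe.meta)`, since the constructor accepts `connector`, `metric`, `location`, and `instance`). A minimal stand-in class illustrating the round trip:

```python
class MiniPipe:
    """Stand-in for meerschaum.Pipe, keeping only the four identifying keys."""

    def __init__(self, connector, metric, location=None, instance='sql:main'):
        self.connector_keys = connector
        self.metric_key = metric
        self.location_key = location
        self.instance_keys = instance

    @property
    def meta(self):
        return {
            'connector': self.connector_keys,
            'metric': self.metric_key,
            'location': self.location_key,
            'instance': self.instance_keys,
        }

pipe = MiniPipe('plugin:noaa', 'weather', instance='sql:main')
clone = MiniPipe(**pipe.meta)   # round trip: rebuild the pipe from its meta keys
assert clone.meta == pipe.meta
```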

def keys(self) -> List[str]:
304    def keys(self) -> List[str]:
305        """
306        Return the ordered keys for this pipe.
307        """
308        return {
309            key: val
310            for key, val in self.meta.items()
311            if key != 'instance'
312        }

Return the ordered keys for this pipe (the `meta` dictionary without the `'instance'` key, despite the `List[str]` annotation).

instance_connector: Union[meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType]
315    @property
316    def instance_connector(self) -> Union[InstanceConnector, None]:
317        """
318        The connector to where this pipe resides.
319        May either be of type `meerschaum.connectors.sql.SQLConnector` or
320        `meerschaum.connectors.api.APIConnector`.
321        """
322        if '_instance_connector' not in self.__dict__:
323            from meerschaum.connectors.parse import parse_instance_keys
324            conn = parse_instance_keys(self.instance_keys)
325            if conn:
326                self._instance_connector = conn
327            else:
328                return None
329        return self._instance_connector

The connector to where this pipe resides. May either be of type meerschaum.connectors.sql.SQLConnector or meerschaum.connectors.api.APIConnector.
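The property above follows a cache-on-first-success pattern: the connector is parsed and stored on first access, but a failed parse returns `None` without caching, so the lookup is retried on the next access. A self-contained sketch (the `_parse` helper is a hypothetical stand-in for `parse_instance_keys`):

```python
class LazyConn:
    """Sketch of the cache-on-first-success pattern used by instance_connector."""

    def __init__(self, keys):
        self.instance_keys = keys

    def _parse(self, keys):
        # Hypothetical stand-in for meerschaum.connectors.parse.parse_instance_keys().
        return {'keys': keys} if ':' in keys else None

    @property
    def instance_connector(self):
        if '_instance_connector' not in self.__dict__:
            conn = self._parse(self.instance_keys)
            if conn:
                self._instance_connector = conn
            else:
                return None   # don't cache failures; retry on next access
        return self._instance_connector

good = LazyConn('sql:main')
assert good.instance_connector == {'keys': 'sql:main'}
assert '_instance_connector' in good.__dict__   # cached after first success

bad = LazyConn('nokeys')
assert bad.instance_connector is None
assert '_instance_connector' not in bad.__dict__   # failures are not cached
```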

connector: Optional[Connector]
331    @property
332    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
333        """
334        The connector to the data source.
335        """
336        if '_connector' not in self.__dict__:
337            from meerschaum.connectors.parse import parse_instance_keys
338            import warnings
339            with warnings.catch_warnings():
340                warnings.simplefilter('ignore')
341                try:
342                    conn = parse_instance_keys(self.connector_keys)
343                except Exception as e:
344                    conn = None
345            if conn:
346                self._connector = conn
347            else:
348                return None
349        return self._connector

The connector to the data source.

cache_connector: Optional[meerschaum.connectors.SQLConnector]
352    @property
353    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
354        """
355        If the pipe was created with `cache=True`, return the connector to the pipe's
356        SQLite database for caching.
357        """
358        if not self._cache:
359            return None
360
361        if '_cache_connector' not in self.__dict__:
362            from meerschaum.connectors import get_connector
363            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
364            _resources_path = SQLITE_RESOURCES_PATH
365            self._cache_connector = get_connector(
366                'sql', '_cache_' + str(self),
367                flavor='sqlite',
368                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
369            )
370
371        return self._cache_connector

If the pipe was created with cache=True, return the connector to the pipe's SQLite database for caching.

cache_pipe: Optional[Pipe]
374    @property
375    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
376        """
377        If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
378        manage the local data.
379        """
380        if self.cache_connector is None:
381            return None
382        if '_cache_pipe' not in self.__dict__:
383            from meerschaum.config._patch import apply_patch_to_config
384            from meerschaum.utils.sql import sql_item_name
385            _parameters = copy.deepcopy(self.parameters)
386            _fetch_patch = {
387                'fetch': ({
388                    'definition': (
389                        f"SELECT * FROM "
390                        + sql_item_name(
391                            str(self.target),
392                            self.instance_connector.flavor,
393                            self.instance_connector.get_pipe_schema(self),
394                        )
395                    ),
396                }) if self.instance_connector.type == 'sql' else ({
397                    'connector_keys': self.connector_keys,
398                    'metric_key': self.metric_key,
399                    'location_key': self.location_key,
400                })
401            }
402            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
403            self._cache_pipe = Pipe(
404                self.instance_keys,
405                (self.connector_keys + '_' + self.metric_key + '_cache'),
406                self.location_key,
407                mrsm_instance = self.cache_connector,
408                parameters = _parameters,
409                cache = False,
410                temporary = True,
411            )
412
413        return self._cache_pipe

If the pipe was created with cache=True, return another meerschaum.Pipe used to manage the local data.

def fetch( self, begin: Union[datetime.datetime, str, NoneType] = '', end: Optional[datetime.datetime] = None, check_existing: bool = True, sync_chunks: bool = False, debug: bool = False, **kw: Any) -> Union[pandas.core.frame.DataFrame, Iterator[pandas.core.frame.DataFrame], NoneType]:
21def fetch(
22        self,
23        begin: Union[datetime, str, None] = '',
24        end: Optional[datetime] = None,
25        check_existing: bool = True,
26        sync_chunks: bool = False,
27        debug: bool = False,
28        **kw: Any
29    ) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]:
30    """
31    Fetch a Pipe's latest data from its connector.
32
33    Parameters
34    ----------
35    begin: Union[datetime, str, None], default ''
36        If provided, only fetch data newer than or equal to `begin`.
37
38    end: Optional[datetime], default None
39        If provided, only fetch data older than or equal to `end`.
40
41    check_existing: bool, default True
42        If `False`, do not apply the backtrack interval.
43
44    sync_chunks: bool, default False
45        If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks as
46        fetching loads them into memory.
47
48    debug: bool, default False
49        Verbosity toggle.
50
51    Returns
52    -------
53    A `pd.DataFrame` of the newest unseen data.
54
55    """
56    if 'fetch' not in dir(self.connector):
57        warn(f"No `fetch()` function defined for connector '{self.connector}'")
58        return None
59
60    from meerschaum.connectors import custom_types, get_connector_plugin
61    from meerschaum.utils.debug import dprint, _checkpoint
62    from meerschaum.utils.misc import filter_arguments
63
64    _chunk_hook = kw.pop('chunk_hook', None)
65    kw['workers'] = self.get_num_workers(kw.get('workers', None))
66    if sync_chunks and _chunk_hook is None:
67
68        def _chunk_hook(chunk, **_kw) -> SuccessTuple:
69            """
70            Wrap `Pipe.sync()` with a custom chunk label prepended to the message.
71            """
72            from meerschaum.config._patch import apply_patch_to_config
73            kwargs = apply_patch_to_config(kw, _kw)
74            chunk_success, chunk_message = self.sync(chunk, **kwargs)
75            chunk_label = self._get_chunk_label(chunk, self.columns.get('datetime', None))
76            if chunk_label:
77                chunk_message = '\n' + chunk_label + '\n' + chunk_message
78            return chunk_success, chunk_message
79
80    with mrsm.Venv(get_connector_plugin(self.connector)):
81        _args, _kwargs = filter_arguments(
82            self.connector.fetch,
83            self,
84            begin=_determine_begin(
85                self,
86                begin,
87                check_existing=check_existing,
88                debug=debug,
89            ),
90            end=end,
91            chunk_hook=_chunk_hook,
92            debug=debug,
93            **kw
94        )
95        df = self.connector.fetch(*_args, **_kwargs)
96    return df

Fetch a Pipe's latest data from its connector.

Parameters
  • begin (Union[datetime, str, None], default ''): If provided, only fetch data newer than or equal to begin.
  • end (Optional[datetime], default None): If provided, only fetch data older than or equal to end.
  • check_existing (bool, default True): If False, do not apply the backtrack interval.
  • sync_chunks (bool, default False): If True and the pipe's connector is of type 'sql', begin syncing chunks as fetching loads them into memory.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame of the newest unseen data.
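The interaction between `begin`, `check_existing`, and the backtrack interval can be sketched in plain Python. Note that `determine_begin` below is an illustrative helper (the real `_determine_begin()` is internal), and the 1440-minute default is assumed here for illustration:

```python
from datetime import datetime, timedelta

def determine_begin(sync_time, begin=None, backtrack_minutes=1440, check_existing=True):
    """Sketch: choose the lower bound handed to the connector's fetch()."""
    if begin is not None:
        return begin                  # an explicit begin wins
    if sync_time is None:
        return None                   # no data yet: fetch everything
    if not check_existing:
        return sync_time              # backtrack interval of zero
    return sync_time - timedelta(minutes=backtrack_minutes)

sync_time = datetime(2024, 1, 2, 12, 0)
print(determine_begin(sync_time))
# 2024-01-01 12:00:00
print(determine_begin(sync_time, check_existing=False))
# 2024-01-02 12:00:00
```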
def get_backtrack_interval( self, check_existing: bool = True, debug: bool = False) -> Union[datetime.timedelta, int]:
 99def get_backtrack_interval(
100    self,
101    check_existing: bool = True,
102    debug: bool = False,
103) -> Union[timedelta, int]:
104    """
105    Get the backtrack interval to use for this pipe.
106
107    Parameters
108    ----------
109    check_existing: bool, default True
110        If `False`, return a backtrack_interval of 0 minutes.
111
112    Returns
113    -------
114    The backtrack interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
115    """
116    default_backtrack_minutes = get_config('pipes', 'parameters', 'fetch', 'backtrack_minutes')
117    configured_backtrack_minutes = self.parameters.get('fetch', {}).get('backtrack_minutes', None)
118    backtrack_minutes = (
119        configured_backtrack_minutes
120        if configured_backtrack_minutes is not None
121        else default_backtrack_minutes
122    ) if check_existing else 0
123
124    backtrack_interval = timedelta(minutes=backtrack_minutes)
125    dt_col = self.columns.get('datetime', None)
126    if dt_col is None:
127        return backtrack_interval
128
129    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns]')
130    if 'int' in dt_dtype.lower():
131        return backtrack_minutes
132
133    return backtrack_interval

Get the backtrack interval to use for this pipe.

Parameters
  • check_existing (bool, default True): If False, return a backtrack_interval of 0 minutes.
Returns
  • The backtrack interval (timedelta or int) to use with this pipe's datetime axis.
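The resolution logic can be condensed into a standalone sketch. `resolve_backtrack_interval` is an illustrative helper, not part of the Meerschaum API, and the 1440-minute default (from `pipes:parameters:fetch:backtrack_minutes` in the config) is assumed for illustration:

```python
from datetime import timedelta

def resolve_backtrack_interval(configured_minutes=None, default_minutes=1440,
                               check_existing=True, dt_dtype='datetime64[ns]'):
    # The configured value wins over the default; check_existing=False forces zero.
    minutes = (
        configured_minutes if configured_minutes is not None else default_minutes
    ) if check_existing else 0
    # An integer datetime axis gets plain minutes; timestamps get a timedelta.
    if 'int' in dt_dtype.lower():
        return minutes
    return timedelta(minutes=minutes)

print(resolve_backtrack_interval())                      # 1 day, 0:00:00
print(resolve_backtrack_interval(dt_dtype='int64'))      # 1440
print(resolve_backtrack_interval(check_existing=False))  # 0:00:00
```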
def get_data( self, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, as_iterator: bool = False, as_chunks: bool = False, as_dask: bool = False, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, order: Optional[str] = 'asc', limit: Optional[int] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> Union[pandas.core.frame.DataFrame, Iterator[pandas.core.frame.DataFrame], NoneType]:
 23def get_data(
 24    self,
 25    select_columns: Optional[List[str]] = None,
 26    omit_columns: Optional[List[str]] = None,
 27    begin: Union[datetime, int, None] = None,
 28    end: Union[datetime, int, None] = None,
 29    params: Optional[Dict[str, Any]] = None,
 30    as_iterator: bool = False,
 31    as_chunks: bool = False,
 32    as_dask: bool = False,
 33    chunk_interval: Union[timedelta, int, None] = None,
 34    order: Optional[str] = 'asc',
 35    limit: Optional[int] = None,
 36    fresh: bool = False,
 37    debug: bool = False,
 38    **kw: Any
 39) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]:
 40    """
 41    Get a pipe's data from the instance connector.
 42
 43    Parameters
 44    ----------
 45    select_columns: Optional[List[str]], default None
 46        If provided, only select these given columns.
 47        Otherwise select all available columns (i.e. `SELECT *`).
 48
 49    omit_columns: Optional[List[str]], default None
 50        If provided, remove these columns from the selection.
 51
 52    begin: Union[datetime, int, None], default None
 53        Lower bound datetime to begin searching for data (inclusive).
 54        Translates to a `WHERE` clause like `WHERE datetime >= begin`.
 55        Defaults to `None`.
 56
 57    end: Union[datetime, int, None], default None
 58        Upper bound datetime to stop searching for data (inclusive).
 59        Translates to a `WHERE` clause like `WHERE datetime < end`.
 60        Defaults to `None`.
 61
 62    params: Optional[Dict[str, Any]], default None
 63        Filter the retrieved data by a dictionary of parameters.
 64        See `meerschaum.utils.sql.build_where` for more details. 
 65
 66    as_iterator: bool, default False
 67        If `True`, return a generator of chunks of pipe data.
 68
 69    as_chunks: bool, default False
 70        Alias for `as_iterator`.
 71
 72    as_dask: bool, default False
 73        If `True`, return a `dask.DataFrame`
 74        (which may be loaded into a Pandas DataFrame with `df.compute()`).
 75
 76    chunk_interval: Union[timedelta, int, None], default None
 77        If `as_iterator`, then return chunks with `begin` and `end` separated by this interval.
 78        This may be set under `pipe.parameters['verify']['chunk_minutes']`.
 79        By default, use a timedelta of 1440 minutes (1 day).
 80        If `chunk_interval` is an integer and the `datetime` axis a timestamp,
 81        then use a timedelta with this many minutes.
 82        If the `datetime` axis is an integer, default to the configured chunksize.
 83        If `chunk_interval` is a `timedelta` and the `datetime` axis an integer,
 84        use the number of minutes in the `timedelta`.
 85
 86    order: Optional[str], default 'asc'
 87        If `order` is not `None`, sort the resulting dataframe by indices.
 88
 89    limit: Optional[int], default None
 90        If provided, cap the dataframe to this many rows.
 91
 92    fresh: bool, default False
 93        If `True`, skip local cache and directly query the instance connector.
 94        Defaults to `False`.
 95
 96    debug: bool, default False
 97        Verbosity toggle.
 98        Defaults to `False`.
 99
100    Returns
101    -------
102    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.
103
104    """
105    from meerschaum.utils.warnings import warn
106    from meerschaum.utils.venv import Venv
107    from meerschaum.connectors import get_connector_plugin
108    from meerschaum.utils.misc import iterate_chunks, items_str
109    from meerschaum.utils.dtypes import to_pandas_dtype
110    from meerschaum.utils.dataframe import add_missing_cols_to_df, df_is_chunk_generator
111    from meerschaum.utils.packages import attempt_import
112    dd = attempt_import('dask.dataframe') if as_dask else None
113    dask = attempt_import('dask') if as_dask else None
114
115    if select_columns == '*':
116        select_columns = None
117    elif isinstance(select_columns, str):
118        select_columns = [select_columns]
119
120    if isinstance(omit_columns, str):
121        omit_columns = [omit_columns]
122
123    as_iterator = as_iterator or as_chunks
124
125    def _sort_df(_df):
126        if df_is_chunk_generator(_df):
127            return _df
128        dt_col = self.columns.get('datetime', None)
129        indices = [] if dt_col not in _df.columns else [dt_col]
130        non_dt_cols = [
131            col
132            for col_ix, col in self.columns.items()
133            if col_ix != 'datetime' and col in _df.columns
134        ]
135        indices.extend(non_dt_cols)
136        if 'dask' not in _df.__module__:
137            _df.sort_values(
138                by=indices,
139                inplace=True,
140                ascending=(str(order).lower() == 'asc'),
141            )
142            _df.reset_index(drop=True, inplace=True)
143        else:
144            _df = _df.sort_values(
145                by=indices,
146                ascending=(str(order).lower() == 'asc'),
147            )
148            _df = _df.reset_index(drop=True)
149        if limit is not None and len(_df) > limit:
150            return _df.head(limit)
151        return _df
152
153    if as_iterator or as_chunks:
154        df = self._get_data_as_iterator(
155            select_columns=select_columns,
156            omit_columns=omit_columns,
157            begin=begin,
158            end=end,
159            params=params,
160            chunk_interval=chunk_interval,
161            limit=limit,
162            order=order,
163            fresh=fresh,
164            debug=debug,
165        )
166        return _sort_df(df)
167
168    if as_dask:
169        from multiprocessing.pool import ThreadPool
170        dask_pool = ThreadPool(self.get_num_workers())
171        dask.config.set(pool=dask_pool)
172        chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
173        bounds = self.get_chunk_bounds(
174            begin=begin,
175            end=end,
176            bounded=False,
177            chunk_interval=chunk_interval,
178            debug=debug,
179        )
180        dask_chunks = [
181            dask.delayed(self.get_data)(
182                select_columns=select_columns,
183                omit_columns=omit_columns,
184                begin=chunk_begin,
185                end=chunk_end,
186                params=params,
187                chunk_interval=chunk_interval,
188                order=order,
189                limit=limit,
190                fresh=fresh,
191                debug=debug,
192            )
193            for (chunk_begin, chunk_end) in bounds
194        ]
195        dask_meta = {
196            col: to_pandas_dtype(typ)
197            for col, typ in self.dtypes.items()
198        }
199        return _sort_df(dd.from_delayed(dask_chunks, meta=dask_meta))
200
201    if not self.exists(debug=debug):
202        return None
203
204    if self.cache_pipe is not None:
205        if not fresh:
206            _sync_cache_tuple = self.cache_pipe.sync(
207                begin=begin,
208                end=end,
209                params=params,
210                debug=debug,
211                **kw
212            )
213            if not _sync_cache_tuple[0]:
214                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
215                fresh = True
216            else: ### Successfully synced cache.
217                return self.enforce_dtypes(
218                    self.cache_pipe.get_data(
219                        select_columns=select_columns,
220                        omit_columns=omit_columns,
221                        begin=begin,
222                        end=end,
223                        params=params,
224                        order=order,
225                        limit=limit,
226                        debug=debug,
227                        fresh=True,
228                        **kw
229                    ),
230                    debug=debug,
231                )
232
233    with Venv(get_connector_plugin(self.instance_connector)):
234        df = self.instance_connector.get_pipe_data(
235            pipe=self,
236            select_columns=select_columns,
237            omit_columns=omit_columns,
238            begin=begin,
239            end=end,
240            params=params,
241            limit=limit,
242            order=order,
243            debug=debug,
244            **kw
245        )
246        if df is None:
247            return df
248
249        if not select_columns:
250            select_columns = [col for col in df.columns]
251
252        cols_to_omit = [
253            col
254            for col in df.columns
255            if (
256                col in (omit_columns or [])
257                or
258                col not in (select_columns or [])
259            )
260        ]
261        cols_to_add = [
262            col
263            for col in select_columns
264            if col not in df.columns
265        ]
266        if cols_to_omit:
267            warn(
268                (
269                    f"Received {len(cols_to_omit)} omitted column"
270                    + ('s' if len(cols_to_omit) != 1 else '')
271                    + f" for {self}. "
272                    + "Consider adding `select_columns` and `omit_columns` support to "
273                    + f"'{self.instance_connector.type}' connectors to improve performance."
274                ),
275                stack=False,
276            )
277            _cols_to_select = [col for col in df.columns if col not in cols_to_omit]
278            df = df[_cols_to_select]
279
280        if cols_to_add:
281            warn(
282                (
283                    f"Specified columns {items_str(cols_to_add)} were not found on {self}. "
284                    + "Adding these to the DataFrame as null columns."
285                ),
286                stack=False,
287            )
288            df = add_missing_cols_to_df(df, {col: 'string' for col in cols_to_add})
289
290        enforced_df = self.enforce_dtypes(df, debug=debug)
291
292        if order:
293            return _sort_df(enforced_df)
294        return enforced_df

Get a pipe's data from the instance connector.

Parameters
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, int, None], default None): Lower bound datetime to begin searching for data (inclusive). Translates to a WHERE clause like WHERE datetime >= begin. Defaults to None.
  • end (Union[datetime, int, None], default None): Upper bound datetime to stop searching for data (inclusive). Translates to a WHERE clause like WHERE datetime < end. Defaults to None.
  • params (Optional[Dict[str, Any]], default None): Filter the retrieved data by a dictionary of parameters. See meerschaum.utils.sql.build_where for more details.
  • as_iterator (bool, default False): If True, return a generator of chunks of pipe data.
  • as_chunks (bool, default False): Alias for as_iterator.
  • as_dask (bool, default False): If True, return a dask.DataFrame (which may be loaded into a Pandas DataFrame with df.compute()).
  • chunk_interval (Union[timedelta, int, None], default None): If as_iterator, then return chunks with begin and end separated by this interval. This may be set under pipe.parameters['verify']['chunk_minutes']. By default, use a timedelta of 1440 minutes (1 day). If chunk_interval is an integer and the datetime axis a timestamp, then use a timedelta with this many minutes. If the datetime axis is an integer, default to the configured chunksize. If chunk_interval is a timedelta and the datetime axis an integer, use the number of minutes in the timedelta.
  • order (Optional[str], default 'asc'): If order is not None, sort the resulting dataframe by indices.
  • limit (Optional[int], default None): If provided, cap the dataframe to this many rows.
  • fresh (bool, default False): If True, skip local cache and directly query the instance connector.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
Returns
  • A pd.DataFrame for the pipe's data corresponding to the provided parameters.
def get_backtrack_data( self, backtrack_minutes: Optional[int] = None, begin: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, limit: Optional[int] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> Optional[pandas.core.frame.DataFrame]:
395def get_backtrack_data(
396    self,
397    backtrack_minutes: Optional[int] = None,
398    begin: Union[datetime, int, None] = None,
399    params: Optional[Dict[str, Any]] = None,
400    limit: Optional[int] = None,
401    fresh: bool = False,
402    debug: bool = False,
403    **kw: Any
404) -> Optional['pd.DataFrame']:
405    """
406    Get the most recent data from the instance connector as a Pandas DataFrame.
407
408    Parameters
409    ----------
410    backtrack_minutes: Optional[int], default None
411        How many minutes from `begin` to select from.
412        If `None`, use `pipe.parameters['fetch']['backtrack_minutes']`.
413
414    begin: Optional[datetime], default None
415        The starting point to search for data.
416        If begin is `None` (default), use the most recent observed datetime
417        (AKA sync_time).
418
419        ```
420        E.g. begin = 02:00
421
422        Search this region.           Ignore this, even if there's data.
423        /  /  /  /  /  /  /  /  /  |
424        -----|----------|----------|----------|----------|----------|
425        00:00      01:00      02:00      03:00      04:00      05:00
426
427        ```
428
429    params: Optional[Dict[str, Any]], default None
430        The standard Meerschaum `params` query dictionary.
431
432    limit: Optional[int], default None
433        If provided, cap the number of rows to be returned.
434
435    fresh: bool, default False
436        If `True`, ignore local cache and pull directly from the instance connector.
437        Only comes into effect if a pipe was created with `cache=True`.
438
439    debug: bool, default False
440        Verbosity toggle.
441
442    Returns
443    -------
444    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. Backtrack data
445    is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
446    """
447    from meerschaum.utils.warnings import warn
448    from meerschaum.utils.venv import Venv
449    from meerschaum.connectors import get_connector_plugin
450
451    if not self.exists(debug=debug):
452        return None
453
454    backtrack_interval = self.get_backtrack_interval(debug=debug)
455    if backtrack_minutes is None:
456        backtrack_minutes = (
457            (backtrack_interval.total_seconds() / 60)
458            if isinstance(backtrack_interval, timedelta)
459            else backtrack_interval
460        )
461
462    if self.cache_pipe is not None:
463        if not fresh:
464            _sync_cache_tuple = self.cache_pipe.sync(begin=begin, params=params, debug=debug, **kw)
465            if not _sync_cache_tuple[0]:
466                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
467                fresh = True
468            else: ### Successfully synced cache.
469                return self.enforce_dtypes(
470                    self.cache_pipe.get_backtrack_data(
471                        fresh=True,
472                        begin=begin,
473                        backtrack_minutes=backtrack_minutes,
474                        params=params,
475                        limit=limit,
476                        order=kw.get('order', 'desc'),
477                        debug=debug,
478                        **kw
479                    ),
480                    debug=debug,
481                )
482
483    if hasattr(self.instance_connector, 'get_backtrack_data'):
484        with Venv(get_connector_plugin(self.instance_connector)):
485            return self.enforce_dtypes(
486                self.instance_connector.get_backtrack_data(
487                    pipe=self,
488                    begin=begin,
489                    backtrack_minutes=backtrack_minutes,
490                    params=params,
491                    limit=limit,
492                    debug=debug,
493                    **kw
494                ),
495                debug=debug,
496            )
497
498    if begin is None:
499        begin = self.get_sync_time(params=params, debug=debug)
500
501    backtrack_interval = (
502        timedelta(minutes=backtrack_minutes)
503        if isinstance(begin, datetime)
504        else backtrack_minutes
505    )
506    if begin is not None:
507        begin = begin - backtrack_interval
508
509    return self.get_data(
510        begin=begin,
511        params=params,
512        debug=debug,
513        limit=limit,
514        order=kw.get('order', 'desc'),
515        **kw
516    )

Get the most recent data from the instance connector as a Pandas DataFrame.

Parameters
  • backtrack_minutes (Optional[int], default None): How many minutes from begin to select from. If None, use pipe.parameters['fetch']['backtrack_minutes'].
  • begin (Optional[datetime], default None): The starting point to search for data. If begin is None (default), use the most recent observed datetime (AKA sync_time).

    E.g. begin = 02:00
    
    Search this region.           Ignore this, even if there's data.
    /  /  /  /  /  /  /  /  /  |
    -----|----------|----------|----------|----------|----------|
    00:00      01:00      02:00      03:00      04:00      05:00
    
    
  • params (Optional[Dict[str, Any]], default None): The standard Meerschaum params query dictionary.

  • limit (Optional[int], default None): If provided, cap the number of rows to be returned.
  • fresh (bool, default False): If True, ignore local cache and pull directly from the instance connector. Only comes into effect if a pipe was created with cache=True.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame for the pipe's data corresponding to the provided parameters. Backtrack data is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
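When the instance connector lacks a native `get_backtrack_data()`, the fallback shifts `begin` back by the backtrack window before delegating to `get_data()`. The arithmetic is simply (the 1440-minute figure is assumed for illustration):

```python
from datetime import datetime, timedelta

# Sketch: shift the starting point back by the backtrack window.
sync_time = datetime(2024, 1, 10, 8, 0)   # most recent observed datetime
backtrack_minutes = 1440                  # assumed default for illustration

begin = sync_time - timedelta(minutes=backtrack_minutes)
print(begin)
# 2024-01-09 08:00:00
```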
def get_rowcount( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> int:
519def get_rowcount(
520    self,
521    begin: Union[datetime, int, None] = None,
522    end: Union[datetime, int, None] = None,
523    params: Optional[Dict[str, Any]] = None,
524    remote: bool = False,
525    debug: bool = False
526) -> int:
527    """
528    Get a Pipe's instance or remote rowcount.
529
530    Parameters
531    ----------
532    begin: Optional[datetime], default None
533        Count rows where datetime > begin.
534
535    end: Optional[datetime], default None
536        Count rows where datetime < end.
537
538    remote: bool, default False
539        Count rows from a pipe's remote source.
540        **NOTE**: This is experimental!
541
542    debug: bool, default False
543        Verbosity toggle.
544
545    Returns
546    -------
547    An `int` of the number of rows in the pipe corresponding to the provided parameters.
548    Returns 0 if the pipe does not exist.
549    """
550    from meerschaum.utils.warnings import warn
551    from meerschaum.utils.venv import Venv
552    from meerschaum.connectors import get_connector_plugin
553
554    connector = self.instance_connector if not remote else self.connector
555    try:
556        with Venv(get_connector_plugin(connector)):
557            rowcount = connector.get_pipe_rowcount(
558                self,
559                begin=begin,
560                end=end,
561                params=params,
562                remote=remote,
563                debug=debug,
564            )
565            if rowcount is None:
566                return 0
567            return rowcount
568    except AttributeError as e:
569        warn(e)
570        if remote:
571            return 0
572    warn(f"Failed to get a rowcount for {self}.")
573    return 0

Get a Pipe's instance or remote rowcount.

Parameters
  • begin (Optional[datetime], default None): Count rows where datetime > begin.
  • end (Optional[datetime], default None): Count rows where datetime < end.
  • remote (bool, default False): Count rows from a pipe's remote source. NOTE: This is experimental!
  • debug (bool, default False): Verbosity toggle.
Returns
  • An int of the number of rows in the pipe corresponding to the provided parameters. Returns 0 if the pipe does not exist.
def get_chunk_interval( self, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, debug: bool = False) -> Union[datetime.timedelta, int]:
576def get_chunk_interval(
577    self,
578    chunk_interval: Union[timedelta, int, None] = None,
579    debug: bool = False,
580) -> Union[timedelta, int]:
581    """
582    Get the chunk interval to use for this pipe.
583
584    Parameters
585    ----------
586    chunk_interval: Union[timedelta, int, None], default None
587        If provided, coerce this value into the correct type.
588        For example, if the datetime axis is an integer, then
589        return the number of minutes.
590
591    Returns
592    -------
593    The chunk interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
594    """
595    default_chunk_minutes = get_config('pipes', 'parameters', 'verify', 'chunk_minutes')
596    configured_chunk_minutes = self.parameters.get('verify', {}).get('chunk_minutes', None)
597    chunk_minutes = (
598        (configured_chunk_minutes or default_chunk_minutes)
599        if chunk_interval is None
600        else (
601            chunk_interval
602            if isinstance(chunk_interval, int)
603            else int(chunk_interval.total_seconds() / 60)
604        )
605    )
606
607    dt_col = self.columns.get('datetime', None)
608    if dt_col is None:
609        return timedelta(minutes=chunk_minutes)
610
611    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns]')
612    if 'int' in dt_dtype.lower():
613        return chunk_minutes
614    return timedelta(minutes=chunk_minutes)

Get the chunk interval to use for this pipe.

Parameters
  • chunk_interval (Union[timedelta, int, None], default None): If provided, coerce this value into the correct type. For example, if the datetime axis is an integer, then return the number of minutes.
Returns
  • The chunk interval (timedelta or int) to use with this pipe's datetime axis.
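The coercion rules can be sketched in isolation. `resolve_chunk_interval` is an illustrative helper, not Meerschaum API, and the 1440-minute default (from `pipes:parameters:verify:chunk_minutes`) is assumed for illustration:

```python
from datetime import timedelta

def resolve_chunk_interval(chunk_interval=None, configured_minutes=None,
                           default_minutes=1440, dt_dtype='datetime64[ns]'):
    # Coerce whatever was given into a number of minutes first.
    if chunk_interval is None:
        minutes = configured_minutes or default_minutes
    elif isinstance(chunk_interval, int):
        minutes = chunk_interval
    else:
        minutes = int(chunk_interval.total_seconds() / 60)
    # Integer datetime axes take plain minutes; otherwise return a timedelta.
    return minutes if 'int' in dt_dtype.lower() else timedelta(minutes=minutes)

print(resolve_chunk_interval(timedelta(hours=2), dt_dtype='int64'))  # 120
print(resolve_chunk_interval(90))                                    # 1:30:00
```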
def get_chunk_bounds( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, bounded: bool = False, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, debug: bool = False) -> List[Tuple[Union[datetime.datetime, int, NoneType], Union[datetime.datetime, int, NoneType]]]:
617def get_chunk_bounds(
618    self,
619    begin: Union[datetime, int, None] = None,
620    end: Union[datetime, int, None] = None,
621    bounded: bool = False,
622    chunk_interval: Union[timedelta, int, None] = None,
623    debug: bool = False,
624) -> List[
625    Tuple[
626        Union[datetime, int, None],
627        Union[datetime, int, None],
628    ]
629]:
630    """
631    Return a list of datetime bounds for iterating over the pipe's `datetime` axis.
632
633    Parameters
634    ----------
635    begin: Union[datetime, int, None], default None
636        If provided, do not select less than this value.
637        Otherwise the first chunk will be unbounded.
638
639    end: Union[datetime, int, None], default None
640        If provided, do not select greater than or equal to this value.
641        Otherwise the last chunk will be unbounded.
642
643    bounded: bool, default False
644        If `True`, do not prepend or append unbounded (`None`) chunks.
645
646    chunk_interval: Union[timedelta, int, None], default None
647        If provided, use this interval for the size of chunk boundaries.
648        The default value for this pipe may be set
649        under `pipe.parameters['verify']['chunk_minutes']`.
650
651    debug: bool, default False
652        Verbosity toggle.
653
654    Returns
655    -------
656    A list of chunk bounds (datetimes or integers).
657    If unbounded, the first and last chunks will include `None`.
658    """
659    include_less_than_begin = not bounded and begin is None
660    include_greater_than_end = not bounded and end is None
661    if begin is None:
662        begin = self.get_sync_time(newest=False, debug=debug)
663    if end is None:
664        end = self.get_sync_time(newest=True, debug=debug)
665    if begin is None and end is None:
666        return [(None, None)]
667
668    ### Resolve the chunk interval (defaults to `pipe.parameters['verify']['chunk_minutes']`).
669    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
670    
671    ### Build a list of tuples containing the chunk boundaries
672    ### so that we can sync multiple chunks in parallel.
673    ### Run `verify pipes --workers 1` to sync chunks in series.
674    chunk_bounds = []
675    begin_cursor = begin
676    while begin_cursor < end:
677        end_cursor = begin_cursor + chunk_interval
678        chunk_bounds.append((begin_cursor, end_cursor))
679        begin_cursor = end_cursor
680
681    ### The chunk interval might be too large.
682    if not chunk_bounds and end >= begin:
683        chunk_bounds = [(begin, end)]
684
685    ### Truncate the last chunk to the end timestamp.
686    if chunk_bounds[-1][1] > end:
687        chunk_bounds[-1] = (chunk_bounds[-1][0], end)
688
689    ### Pop the last chunk if its bounds are equal.
690    if chunk_bounds[-1][0] == chunk_bounds[-1][1]:
691        chunk_bounds = chunk_bounds[:-1]
692
693    if include_less_than_begin:
694        chunk_bounds = [(None, begin)] + chunk_bounds
695    if include_greater_than_end:
696        chunk_bounds = chunk_bounds + [(end, None)]
697
698    return chunk_bounds

Return a list of datetime bounds for iterating over the pipe's datetime axis.

Parameters
  • begin (Union[datetime, int, None], default None): If provided, do not select less than this value. Otherwise the first chunk will be unbounded.
  • end (Union[datetime, int, None], default None): If provided, do not select greater than or equal to this value. Otherwise the last chunk will be unbounded.
  • bounded (bool, default False): If True, do not prepend or append unbounded (None) chunks.
  • chunk_interval (Union[timedelta, int, None], default None): If provided, use this interval for the size of chunk boundaries. The default value for this pipe may be set under pipe.parameters['verify']['chunk_minutes'].
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of chunk bounds (datetimes or integers).
  • If unbounded, the first and last chunks will include None.
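The chunking loop itself is plain arithmetic. The following stdlib-only sketch reproduces the core behavior (without the sync-time lookups or the equal-bounds pop that the method performs):

```python
from datetime import datetime, timedelta

def chunk_bounds(begin, end, interval, bounded=False):
    """Split [begin, end) into consecutive chunks of size `interval`."""
    bounds = []
    cursor = begin
    while cursor < end:
        bounds.append((cursor, cursor + interval))
        cursor += interval
    # Truncate the final chunk to the end timestamp.
    if bounds and bounds[-1][1] > end:
        bounds[-1] = (bounds[-1][0], end)
    # Unbounded mode adds open-ended chunks on both sides.
    if not bounded:
        bounds = [(None, begin)] + bounds + [(end, None)]
    return bounds

begin, end = datetime(2024, 1, 1), datetime(2024, 1, 1, 3, 30)
for lo, hi in chunk_bounds(begin, end, timedelta(hours=1), bounded=True):
    print(lo, '->', hi)
```

Each tuple can then be passed as `begin`/`end` to sync one chunk, which is how `verify pipes` parallelizes work across chunks.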
def register(self, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
12def register(
13        self,
14        debug: bool = False,
15        **kw: Any
16    ) -> SuccessTuple:
17    """
18    Register a new Pipe along with its attributes.
19
20    Parameters
21    ----------
22    debug: bool, default False
23        Verbosity toggle.
24
25    kw: Any
26        Keyword arguments to pass to `instance_connector.register_pipe()`.
27
28    Returns
29    -------
30    A `SuccessTuple` of success, message.
31    """
32    if self.temporary:
33        return False, "Cannot register pipes created with `temporary=True` (read-only)."
34
35    from meerschaum.utils.formatting import get_console
36    from meerschaum.utils.venv import Venv
37    from meerschaum.connectors import get_connector_plugin, custom_types
38    from meerschaum.config._patch import apply_patch_to_config
39
40    import warnings
41    with warnings.catch_warnings():
42        warnings.simplefilter('ignore')
43        try:
44            _conn = self.connector
45        except Exception as e:
46            _conn = None
47
48    if (
49        _conn is not None
50        and
51        (_conn.type == 'plugin' or _conn.type in custom_types)
52        and
53        getattr(_conn, 'register', None) is not None
54    ):
55        try:
56            with Venv(get_connector_plugin(_conn), debug=debug):
57                params = self.connector.register(self)
58        except Exception as e:
59            get_console().print_exception()
60            params = None
61        params = {} if params is None else params
62        if not isinstance(params, dict):
63            from meerschaum.utils.warnings import warn
64            warn(
65                f"Invalid parameters returned from `register()` in connector {self.connector}:\n"
66                + f"{params}"
67            )
68        else:
69            self.parameters = apply_patch_to_config(params, self.parameters)
70
71    if not self.parameters:
72        cols = self.columns if self.columns else {'datetime': None, 'id': None}
73        self.parameters = {
74            'columns': cols,
75        }
76
77    with Venv(get_connector_plugin(self.instance_connector)):
78        return self.instance_connector.register_pipe(self, debug=debug, **kw)

Register a new Pipe along with its attributes.

Parameters
  • debug (bool, default False): Verbosity toggle.
  • kw (Any): Keyword arguments to pass to instance_connector.register_pipe().
Returns
  • A SuccessTuple of success, message.
attributes: Dict[str, Any]
14@property
15def attributes(self) -> Dict[str, Any]:
16    """
17    Return a dictionary of a pipe's keys and parameters.
18    These values are reflected directly from the pipes table of the instance.
19    """
20    import time
21    from meerschaum.config import get_config
22    from meerschaum.config._patch import apply_patch_to_config
23    from meerschaum.utils.venv import Venv
24    from meerschaum.connectors import get_connector_plugin
25
26    timeout_seconds = get_config('pipes', 'attributes', 'local_cache_timeout_seconds')
27
28    if '_attributes' not in self.__dict__:
29        self._attributes = {}
30
31    now = time.perf_counter()
32    last_refresh = self.__dict__.get('_attributes_sync_time', None)
33    timed_out = (
34        last_refresh is None
35        or
36        (timeout_seconds is not None and (now - last_refresh) >= timeout_seconds)
37    )
38    if not self.temporary and timed_out:
39        self._attributes_sync_time = now
40        local_attributes = self.__dict__.get('_attributes', {})
41        with Venv(get_connector_plugin(self.instance_connector)):
42            instance_attributes = self.instance_connector.get_pipe_attributes(self)
43        self._attributes = apply_patch_to_config(instance_attributes, local_attributes)
44    return self._attributes

Return a dictionary of a pipe's keys and parameters. These values are reflected directly from the pipes table of the instance.
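The refresh logic above is a standard timeout cache. Here is a minimal, self-contained sketch of the same pattern; `fetch_remote` is a hypothetical stand-in for `instance_connector.get_pipe_attributes()`:

```python
import time

class TimeoutCache:
    """Re-fetch a value only after `timeout_seconds` have elapsed."""

    def __init__(self, fetch_remote, timeout_seconds=60):
        self._fetch = fetch_remote
        self._timeout = timeout_seconds
        self._value = None
        self._last_refresh = None

    @property
    def value(self):
        now = time.perf_counter()
        timed_out = (
            self._last_refresh is None
            or (now - self._last_refresh) >= self._timeout
        )
        if timed_out:
            self._last_refresh = now
            self._value = self._fetch()
        return self._value

calls = []
cache = TimeoutCache(lambda: calls.append(1) or {'parameters': {}}, timeout_seconds=60)
cache.value
cache.value
print(len(calls))  # 1 — the second access hit the cache
```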

parameters: Optional[Dict[str, Any]]
47@property
48def parameters(self) -> Optional[Dict[str, Any]]:
49    """
50    Return the parameters dictionary of the pipe.
51    """
52    if 'parameters' not in self.attributes:
53        self.attributes['parameters'] = {}
54    return self.attributes['parameters']

Return the parameters dictionary of the pipe.

columns: Optional[Dict[str, str]]
66@property
67def columns(self) -> Union[Dict[str, str], None]:
68    """
69    Return the `columns` dictionary defined in `meerschaum.Pipe.parameters`.
70    """
71    if 'columns' not in self.parameters:
72        self.parameters['columns'] = {}
73    cols = self.parameters['columns']
74    if not isinstance(cols, dict):
75        cols = {}
76        self.parameters['columns'] = cols
77    return cols

Return the columns dictionary defined in meerschaum.Pipe.parameters.

dtypes: Optional[Dict[str, Any]]
117@property
118def dtypes(self) -> Union[Dict[str, Any], None]:
119    """
120    If defined, return the `dtypes` dictionary defined in `meerschaum.Pipe.parameters`.
121    """
122    from meerschaum.config._patch import apply_patch_to_config
123    configured_dtypes = self.parameters.get('dtypes', {})
124    remote_dtypes = self.infer_dtypes(persist=False)
125    patched_dtypes = apply_patch_to_config(remote_dtypes, configured_dtypes)
126    return patched_dtypes

If defined, return the dtypes dictionary defined in meerschaum.Pipe.parameters.

def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
138def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
139    """
140    Check if the requested columns are defined.
141
142    Parameters
143    ----------
144    *args: str
145        The column names to be retrieved.
146        
147    error: bool, default False
148        If `True`, raise an `Exception` if the specified column is not defined.
149
150    Returns
151    -------
152    A tuple of the same size as `args`, or a `str` if `args` is a single argument.
153
154    Examples
155    --------
156    >>> pipe = mrsm.Pipe('test', 'test')
157    >>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
158    >>> pipe.get_columns('datetime', 'id')
159    ('dt', 'id')
160    >>> pipe.get_columns('value', error=True)
161    Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
162    """
163    from meerschaum.utils.warnings import error as _error, warn
164    if not args:
165        args = tuple(self.columns.keys())
166    col_names = []
167    for col in args:
168        col_name = None
169        try:
170            col_name = self.columns[col]
171            if col_name is None and error:
172                _error(f"Please define the name of the '{col}' column for {self}.")
173        except Exception as e:
174            col_name = None
175        if col_name is None and error:
176            _error(f"Missing '{col}' column for {self}.")
177        col_names.append(col_name)
178    if len(col_names) == 1:
179        return col_names[0]
180    return tuple(col_names)

Check if the requested columns are defined.

Parameters
  • *args (str): The column names to be retrieved.
  • error (bool, default False): If True, raise an Exception if the specified column is not defined.
Returns
  • A tuple of the same size as args, or a str if args is a single argument.
Examples
>>> pipe = mrsm.Pipe('test', 'test')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> pipe.get_columns('datetime', 'id')
('dt', 'id')
>>> pipe.get_columns('value', error=True)
Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
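The str-vs-tuple return convention can be mimicked with a stdlib-only sketch; this hypothetical free function operates on a plain columns dict and omits the error handling:

```python
def get_column_names(columns, *args):
    """Mimic the return convention: a single name unwraps to a str."""
    if not args:
        args = tuple(columns.keys())
    names = tuple(columns.get(col) for col in args)
    return names[0] if len(names) == 1 else names

cols = {'datetime': 'dt', 'id': 'id'}
print(get_column_names(cols, 'datetime'))        # 'dt'
print(get_column_names(cols, 'datetime', 'id'))  # ('dt', 'id')
```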
def get_columns_types(self, debug: bool = False) -> Optional[Dict[str, str]]:
183def get_columns_types(self, debug: bool = False) -> Union[Dict[str, str], None]:
184    """
185    Get a dictionary of a pipe's column names and their types.
186
187    Parameters
188    ----------
189    debug: bool, default False
190        Verbosity toggle.
191
192    Returns
193    -------
194    A dictionary of column names (`str`) to column types (`str`).
195
196    Examples
197    --------
198    >>> pipe.get_columns_types()
199    {
200      'dt': 'TIMESTAMP WITHOUT TIMEZONE',
201      'id': 'BIGINT',
202      'val': 'DOUBLE PRECISION',
203    }
204    >>>
205    """
206    from meerschaum.utils.venv import Venv
207    from meerschaum.connectors import get_connector_plugin
208
209    with Venv(get_connector_plugin(self.instance_connector)):
210        return self.instance_connector.get_pipe_columns_types(self, debug=debug)

Get a dictionary of a pipe's column names and their types.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A dictionary of column names (str) to column types (str).
Examples
>>> pipe.get_columns_types()
{
  'dt': 'TIMESTAMP WITHOUT TIMEZONE',
  'id': 'BIGINT',
  'val': 'DOUBLE PRECISION',
}
>>>
def get_indices(self) -> Dict[str, str]:
435def get_indices(self) -> Dict[str, str]:
436    """
437    Return a dictionary in the form of `pipe.columns` but map to index names.
438    """
439    return {
440        ix: (self.target + '_' + col + '_index')
441        for ix, col in self.columns.items() if col
442    }

Return a dictionary in the form of pipe.columns but map to index names.
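The index-name construction is a one-line mapping; a stdlib-only sketch of the same naming scheme (unset columns are skipped):

```python
def get_index_names(target, columns):
    """Map index keys to '<target>_<column>_index' names, skipping unset columns."""
    return {
        ix: f"{target}_{col}_index"
        for ix, col in columns.items() if col
    }

print(get_index_names('plugin_demo', {'datetime': 'ts', 'id': 'id', 'value': None}))
# {'datetime': 'plugin_demo_ts_index', 'id': 'plugin_demo_id_index'}
```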

tags: Optional[List[str]]
92@property
93def tags(self) -> Union[List[str], None]:
94    """
95    If defined, return the `tags` list defined in `meerschaum.Pipe.parameters`.
96    """
97    if 'tags' not in self.parameters:
98        self.parameters['tags'] = []
99    return self.parameters['tags']

If defined, return the tags list defined in meerschaum.Pipe.parameters.

def get_id(self, **kw: Any) -> Optional[int]:
213def get_id(self, **kw: Any) -> Union[int, None]:
214    """
215    Fetch a pipe's ID from its instance connector.
216    If the pipe does not exist, return `None`.
217    """
218    if self.temporary:
219        return None
220    from meerschaum.utils.venv import Venv
221    from meerschaum.connectors import get_connector_plugin
222
223    with Venv(get_connector_plugin(self.instance_connector)):
224        return self.instance_connector.get_pipe_id(self, **kw)

Fetch a pipe's ID from its instance connector. If the pipe does not exist, return None.

id: Optional[int]
227@property
228def id(self) -> Union[int, None]:
229    """
230    Fetch and cache a pipe's ID.
231    """
232    if not ('_id' in self.__dict__ and self._id):
233        self._id = self.get_id()
234    return self._id

Fetch and cache a pipe's ID.

def get_val_column(self, debug: bool = False) -> Optional[str]:
237def get_val_column(self, debug: bool = False) -> Union[str, None]:
238    """
239    Return the name of the value column if it's defined, otherwise make an educated guess.
240    If not set in the `columns` dictionary, return the first numeric column that is not
241    an ID or datetime column.
242    If none may be found, return `None`.
243
244    Parameters
245    ----------
246    debug: bool, default False
247        Verbosity toggle.
248
249    Returns
250    -------
251    Either a string or `None`.
252    """
253    from meerschaum.utils.debug import dprint
254    if debug:
255        dprint('Attempting to determine the value column...')
256    try:
257        val_name = self.get_columns('value')
258    except Exception as e:
259        val_name = None
260    if val_name is not None:
261        if debug:
262            dprint(f"Value column: {val_name}")
263        return val_name
264
265    cols = self.columns
266    if cols is None:
267        if debug:
268            dprint('No columns could be determined. Returning...')
269        return None
270    try:
271        dt_name = self.get_columns('datetime', error=False)
272    except Exception as e:
273        dt_name = None
274    try:
275            id_name = self.get_columns('id', error=False)
276    except Exception as e:
277        id_name = None
278
279    if debug:
280        dprint(f"dt_name: {dt_name}")
281        dprint(f"id_name: {id_name}")
282
283    cols_types = self.get_columns_types(debug=debug)
284    if cols_types is None:
285        return None
286    if debug:
287        dprint(f"cols_types: {cols_types}")
288    if dt_name is not None:
289        cols_types.pop(dt_name, None)
290    if id_name is not None:
291        cols_types.pop(id_name, None)
292
293    candidates = []
294    candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric',}
295    for search_term in candidate_keywords:
296        for col, typ in cols_types.items():
297            if search_term in typ.lower():
298                candidates.append(col)
299                break
300    if not candidates:
301        if debug:
302            dprint("No value column could be determined.")
303        return None
304
305    return candidates[0]

Return the name of the value column if it's defined, otherwise make an educated guess. If not set in the columns dictionary, return the first numeric column that is not an ID or datetime column. If none may be found, return None.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • Either a string or None.
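The guessing heuristic boils down to "first numeric column that is not the datetime or id column." A simplified stdlib-only sketch (the actual implementation iterates over a set of type keywords, so its candidate ordering may differ):

```python
def guess_val_column(cols_types, dt_name=None, id_name=None):
    """Return the first numeric-typed column that is not the datetime or id column."""
    numeric_keywords = ('float', 'double', 'precision', 'int', 'numeric')
    for col, typ in cols_types.items():
        if col in (dt_name, id_name):
            continue
        if any(kw in typ.lower() for kw in numeric_keywords):
            return col
    return None

cols_types = {'dt': 'TIMESTAMP', 'id': 'BIGINT', 'val': 'DOUBLE PRECISION'}
print(guess_val_column(cols_types, dt_name='dt', id_name='id'))  # 'val'
```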
parents: List[Pipe]
308@property
309def parents(self) -> List[meerschaum.Pipe]:
310    """
311    Return a list of `meerschaum.Pipe` objects to be designated as parents.
312    """
313    if 'parents' not in self.parameters:
314        return []
315    from meerschaum.utils.warnings import warn
316    _parents_keys = self.parameters['parents']
317    if not isinstance(_parents_keys, list):
318        warn(
319            f"Please ensure the parents for {self} are defined as a list of keys.",
320            stacklevel = 4
321        )
322        return []
323    from meerschaum import Pipe
324    _parents = []
325    for keys in _parents_keys:
326        try:
327            p = Pipe(**keys)
328        except Exception as e:
329            warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}")
330            continue
331        _parents.append(p)
332    return _parents

Return a list of meerschaum.Pipe objects to be designated as parents.

children: List[Pipe]
335@property
336def children(self) -> List[meerschaum.Pipe]:
337    """
338    Return a list of `meerschaum.Pipe` objects to be designated as children.
339    """
340    if 'children' not in self.parameters:
341        return []
342    from meerschaum.utils.warnings import warn
343    _children_keys = self.parameters['children']
344    if not isinstance(_children_keys, list):
345        warn(
346            f"Please ensure the children for {self} are defined as a list of keys.",
347            stacklevel = 4
348        )
349        return []
350    from meerschaum import Pipe
351    _children = []
352    for keys in _children_keys:
353        try:
354            p = Pipe(**keys)
355        except Exception as e:
356            warn(f"Unable to build child with keys '{keys}' for {self}:\n{e}")
357            continue
358        _children.append(p)
359    return _children

Return a list of meerschaum.Pipe objects to be designated as children.

target: str
362@property
363def target(self) -> str:
364    """
365    The target table name.
366    You can set the target name under one of the following keys
367    (checked in this order):
368      - `target`
369      - `target_name`
370      - `target_table`
371      - `target_table_name`
372    """
373    if 'target' not in self.parameters:
374        target = self._target_legacy()
375        potential_keys = ('target_name', 'target_table', 'target_table_name')
376        for k in potential_keys:
377            if k in self.parameters:
378                target = self.parameters[k]
379                break
380
381        if self.instance_connector.type == 'sql':
382            from meerschaum.utils.sql import truncate_item_name
383            truncated_target = truncate_item_name(target, self.instance_connector.flavor)
384            if truncated_target != target:
385                warn(
386                    f"The target '{target}' is too long for '{self.instance_connector.flavor}', "
387                    + f"will use {truncated_target} instead."
388                )
389                target = truncated_target
390
391        self.target = target
392    return self.parameters['target']

The target table name. You can set the target name under one of the following keys (checked in this order):

  • target
  • target_name
  • target_table
  • target_table_name
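The key precedence can be sketched as a simple ordered lookup with a fallback default (the real property also truncates names that exceed the SQL flavor's identifier limit):

```python
def resolve_target(parameters, default_name):
    """Check target keys in precedence order, falling back to a default name."""
    for key in ('target', 'target_name', 'target_table', 'target_table_name'):
        if key in parameters:
            return parameters[key]
    return default_name

print(resolve_target({'target_table': 'weather_raw'}, 'plugin_demo'))  # 'weather_raw'
print(resolve_target({}, 'plugin_demo'))                               # 'plugin_demo'
```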
def guess_datetime(self) -> Optional[str]:
415def guess_datetime(self) -> Union[str, None]:
416    """
417    Try to determine a pipe's datetime column.
418    """
419    dtypes = self.dtypes
420
421    ### Abort if the user explicitly disallows a datetime index.
422    if 'datetime' in dtypes:
423        if dtypes['datetime'] is None:
424            return None
425
426    dt_cols = [
427        col for col, typ in self.dtypes.items()
428        if str(typ).startswith('datetime')
429    ]
430    if not dt_cols:
431        return None
432    return dt_cols[0]    

Try to determine a pipe's datetime column.
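A stdlib-only sketch of the guess: honor an explicit `None` opt-out, then take the first column whose dtype string starts with `datetime`:

```python
def guess_datetime_column(dtypes):
    """Return the first datetime-typed column, honoring an explicit None opt-out."""
    if dtypes.get('datetime', '') is None:
        return None  # the user explicitly disallowed a datetime index
    for col, typ in dtypes.items():
        if str(typ).startswith('datetime'):
            return col
    return None

print(guess_datetime_column({'ts': 'datetime64[ns]', 'id': 'int64'}))  # 'ts'
print(guess_datetime_column({'datetime': None}))                       # None
```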

def show( self, nopretty: bool = False, debug: bool = False, **kw) -> Tuple[bool, str]:
12def show(
13        self,
14        nopretty: bool = False,
15        debug: bool = False,
16        **kw
17    ) -> SuccessTuple:
18    """
19    Show attributes of a Pipe.
20
21    Parameters
22    ----------
23    nopretty: bool, default False
24        If `True`, simply print the JSON of the pipe's attributes.
25
26    debug: bool, default False
27        Verbosity toggle.
28
29    Returns
30    -------
31    A `SuccessTuple` of success, message.
32
33    """
34    import json
35    from meerschaum.utils.formatting import (
36        pprint, make_header, ANSI, highlight_pipes, fill_ansi, get_console,
37    )
38    from meerschaum.utils.packages import import_rich, attempt_import
39    from meerschaum.utils.warnings import info
40    attributes_json = json.dumps(self.attributes)
41    if not nopretty:
42        _to_print = f"Attributes for {self}:"
43        if ANSI:
44            _to_print = fill_ansi(highlight_pipes(make_header(_to_print)), 'magenta')
45            print(_to_print)
46            rich = import_rich()
47            rich_json = attempt_import('rich.json')
48            get_console().print(rich_json.JSON(attributes_json))
49        else:
50            print(_to_print)
51    else:
52        print(attributes_json)
53
54    return True, "Success"

Show attributes of a Pipe.

Parameters
  • nopretty (bool, default False): If True, simply print the JSON of the pipe's attributes.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success, message.
def edit( self, patch: bool = False, interactive: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
21def edit(
22    self,
23    patch: bool = False,
24    interactive: bool = False,
25    debug: bool = False,
26    **kw: Any
27) -> SuccessTuple:
28    """
29    Edit a Pipe's configuration.
30
31    Parameters
32    ----------
33    patch: bool, default False
34        If `patch` is True, update parameters by cascading rather than overwriting.
35    interactive: bool, default False
36        If `True`, open an editor for the user to make changes to the pipe's YAML file.
37    debug: bool, default False
38        Verbosity toggle.
39
40    Returns
41    -------
42    A `SuccessTuple` of success, message.
43
44    """
45    from meerschaum.utils.venv import Venv
46    from meerschaum.connectors import get_connector_plugin
47
48    if self.temporary:
49        return False, "Cannot edit pipes created with `temporary=True` (read-only)."
50
51    if not interactive:
52        with Venv(get_connector_plugin(self.instance_connector)):
53            return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
54
55    from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
56    from meerschaum.utils.misc import edit_file
57    parameters_filename = str(self) + '.yaml'
58    parameters_path = PIPES_CACHE_RESOURCES_PATH / parameters_filename
59
60    from meerschaum.utils.yaml import yaml
61
62    edit_text = f"Edit the parameters for {self}"
63    edit_top = '#' * (len(edit_text) + 4)
64    edit_header = edit_top + f'\n# {edit_text} #\n' + edit_top + '\n\n'
65
66    from meerschaum.config import get_config
67    parameters = dict(get_config('pipes', 'parameters', patch=True))
68    from meerschaum.config._patch import apply_patch_to_config
69    parameters = apply_patch_to_config(parameters, self.parameters)
70
71    ### write parameters to yaml file
72    with open(parameters_path, 'w+') as f:
73        f.write(edit_header)
74        yaml.dump(parameters, stream=f, sort_keys=False)
75
76    ### only quit editing if yaml is valid
77    editing = True
78    while editing:
79        edit_file(parameters_path)
80        try:
81            with open(parameters_path, 'r') as f:
82                file_parameters = yaml.load(f.read())
83        except Exception as e:
84            from meerschaum.utils.warnings import warn
85            warn(f"Invalid format defined for '{self}':\n\n{e}")
86            input(f"Press [Enter] to correct the configuration for '{self}': ")
87        else:
88            editing = False
89
90    self.parameters = file_parameters
91
92    if debug:
93        from meerschaum.utils.formatting import pprint
94        pprint(self.parameters)
95
96    with Venv(get_connector_plugin(self.instance_connector)):
97        return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)

Edit a Pipe's configuration.

Parameters
  • patch (bool, default False): If patch is True, update parameters by cascading rather than overwriting.
  • interactive (bool, default False): If True, open an editor for the user to make changes to the pipe's YAML file.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success, message.
def edit_definition( self, yes: bool = False, noask: bool = False, force: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
100def edit_definition(
101    self,
102    yes: bool = False,
103    noask: bool = False,
104    force: bool = False,
105    debug : bool = False,
106    **kw : Any
107) -> SuccessTuple:
108    """
109    Edit a pipe's definition file and update its configuration.
110    **NOTE:** This function is interactive and should not be used in automated scripts!
111
112    Returns
113    -------
114    A `SuccessTuple` of success, message.
115
116    """
117    if self.temporary:
118        return False, "Cannot edit pipes created with `temporary=True` (read-only)."
119
120    from meerschaum.connectors import instance_types
121    if (self.connector is None) or self.connector.type not in instance_types:
122        return self.edit(interactive=True, debug=debug, **kw)
123
124    import json
125    from meerschaum.utils.warnings import info, warn
126    from meerschaum.utils.debug import dprint
127    from meerschaum.config._patch import apply_patch_to_config
128    from meerschaum.utils.misc import edit_file
129
130    _parameters = self.parameters
131    if 'fetch' not in _parameters:
132        _parameters['fetch'] = {}
133
134    def _edit_api():
135        from meerschaum.utils.prompt import prompt, yes_no
136        info(
137            f"Please enter the keys of the source pipe from '{self.connector}'.\n" +
138            "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip."
139        )
140
141        _keys = { 'connector_keys' : None, 'metric_key' : None, 'location_key' : None }
142        for k in _keys:
143            _keys[k] = _parameters['fetch'].get(k, None)
144
145        for k, v in _keys.items():
146            try:
147                _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v)
148            except KeyboardInterrupt:
149                continue
150            if _keys[k] in ('', 'None', '\'None\'', '[None]'):
151                _keys[k] = None
152
153        _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys)
154
155        info("You may optionally specify additional filter parameters as JSON.")
156        print("  Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.")
157        print("  For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':")
158        print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': ')))
159        if force or yes_no(
160            "Would you like to add additional filter parameters?",
161            yes=yes, noask=noask
162        ):
163            from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
164            definition_filename = str(self) + '.json'
165            definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
166            try:
167                definition_path.touch()
168                with open(definition_path, 'w+') as f:
169                    json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2)
170            except Exception as e:
171                return False, f"Failed writing file '{definition_path}':\n" + str(e)
172
173            _params = None
174            while True:
175                edit_file(definition_path)
176                try:
177                    with open(definition_path, 'r') as f:
178                        _params = json.load(f)
179                except Exception as e:
180                    warn(f'Failed to read parameters JSON:\n{e}', stack=False)
181                    if force or yes_no(
182                        "Would you like to try again?\n  "
183                        + "If not, the parameters JSON file will be ignored.",
184                        noask=noask, yes=yes
185                    ):
186                        continue
187                    _params = None
188                break
189            if _params is not None:
190                if 'fetch' not in _parameters:
191                    _parameters['fetch'] = {}
192                _parameters['fetch']['params'] = _params
193
194        self.parameters = _parameters
195        return True, "Success"
196
197    def _edit_sql():
198        import pathlib, os, textwrap
199        from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
200        from meerschaum.utils.misc import edit_file
201        definition_filename = str(self) + '.sql'
202        definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
203
204        sql_definition = _parameters['fetch'].get('definition', None)
205        if sql_definition is None:
206            sql_definition = ''
207        sql_definition = textwrap.dedent(sql_definition).lstrip()
208
209        try:
210            definition_path.touch()
211            with open(definition_path, 'w+') as f:
212                f.write(sql_definition)
213        except Exception as e:
214            return False, f"Failed writing file '{definition_path}':\n" + str(e)
215
216        edit_file(definition_path)
217        try:
218            with open(definition_path, 'r') as f:
219                file_definition = f.read()
220        except Exception as e:
221            return False, f"Failed reading file '{definition_path}':\n" + str(e)
222
223        if sql_definition == file_definition:
224            return False, f"No changes made to definition for {self}."
225
226        if ' ' not in file_definition:
227            return False, f"Invalid SQL definition for {self}."
228
229        if debug:
230            dprint("Read SQL definition:\n\n" + file_definition)
231        _parameters['fetch']['definition'] = file_definition
232        self.parameters = _parameters
233        return True, "Success"
234
235    locals()['_edit_' + str(self.connector.type)]()
236    return self.edit(interactive=False, debug=debug, **kw)

Edit a pipe's definition file and update its configuration. NOTE: This function is interactive and should not be used in automated scripts!

Returns
  • A SuccessTuple of success, message.
def update(self, *args, **kw) -> Tuple[bool, str]:
13def update(self, *args, **kw) -> SuccessTuple:
14    """
15    Update a pipe's parameters in its instance.
16    """
17    kw['interactive'] = False
18    return self.edit(*args, **kw)

Update a pipe's parameters in its instance.

def sync( self, df: Union[pandas.core.frame.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], meerschaum.core.Pipe._sync.InferFetch] = <class 'meerschaum.core.Pipe._sync.InferFetch'>, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, NoneType] = None, force: bool = False, retries: int = 10, min_seconds: int = 1, check_existing: bool = True, blocking: bool = True, workers: Optional[int] = None, callback: Optional[Callable[[Tuple[bool, str]], Any]] = None, error_callback: Optional[Callable[[Exception], Any]] = None, chunksize: Optional[int] = -1, sync_chunks: bool = True, debug: bool = False, _inplace: bool = True, **kw: Any) -> Tuple[bool, str]:
 40def sync(
 41    self,
 42    df: Union[
 43        pd.DataFrame,
 44        Dict[str, List[Any]],
 45        List[Dict[str, Any]],
 46        InferFetch
 47    ] = InferFetch,
 48    begin: Union[datetime, int, str, None] = '',
 49    end: Union[datetime, int, None] = None,
 50    force: bool = False,
 51    retries: int = 10,
 52    min_seconds: int = 1,
 53    check_existing: bool = True,
 54    blocking: bool = True,
 55    workers: Optional[int] = None,
 56    callback: Optional[Callable[[Tuple[bool, str]], Any]] = None,
 57    error_callback: Optional[Callable[[Exception], Any]] = None,
 58    chunksize: Optional[int] = -1,
 59    sync_chunks: bool = True,
 60    debug: bool = False,
 61    _inplace: bool = True,
 62    **kw: Any
 63) -> SuccessTuple:
 64    """
 65    Fetch new data from the source and update the pipe's table with new data.
 66    
 67    Get new remote data via fetch, get existing data in the same time period,
 68    and merge the two, only keeping the unseen data.
 69
 70    Parameters
 71    ----------
 72    df: Union[pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch], default InferFetch
 73        An optional DataFrame to sync into the pipe. If omitted, fetch data from the connector.
 74
 75    begin: Union[datetime, int, str, None], default ''
 76        Optionally specify the earliest datetime to search for data.
 77
 78    end: Union[datetime, int, str, None], default None
 79        Optionally specify the latest datetime to search for data.
 80
 81    force: bool, default False
 82        If `True`, keep retrying the sync for up to `retries` attempts.
 83
 84    retries: int, default 10
 85        If `force`, how many attempts to try syncing before declaring failure.
 86
 87    min_seconds: Union[int, float], default 1
 88        If `force`, how many seconds to sleep between retries. Defaults to `1`.
 89
 90    check_existing: bool, default True
 91        If `True`, pull and diff with existing data from the pipe.
 92
 93    blocking: bool, default True
 94        If `True`, wait for sync to finish and return its result, otherwise
 95        sync asynchronously and return success. Defaults to `True`.
 96        Only intended for specific scenarios.
 97
 98    workers: Optional[int], default None
 99        If provided and the instance connector is thread-safe
100        (`pipe.instance_connector.IS_THREAD_SAFE is True`),
101        limit concurrent sync to this many threads.
102
103    callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
104        Callback function which expects a SuccessTuple as input.
105        Only applies when `blocking=False`.
106
107    error_callback: Optional[Callable[[Exception], Any]], default None
108        Callback function which expects an Exception as input.
109        Only applies when `blocking=False`.
110
111    chunksize: int, default -1
112        Specify the number of rows to sync per chunk.
113        If `-1`, resort to system configuration (default is `900`).
114        A `chunksize` of `None` will sync all rows in one transaction.
115
116    sync_chunks: bool, default True
117        If possible, sync chunks while fetching them into memory.
118
119    debug: bool, default False
120        Verbosity toggle. Defaults to False.
121
122    Returns
123    -------
124    A `SuccessTuple` of success (`bool`) and message (`str`).
125    """
126    from meerschaum.utils.debug import dprint, _checkpoint
127    from meerschaum.connectors import custom_types
128    from meerschaum.plugins import Plugin
129    from meerschaum.utils.formatting import get_console
130    from meerschaum.utils.venv import Venv
131    from meerschaum.connectors import get_connector_plugin
132    from meerschaum.utils.misc import df_is_chunk_generator, filter_keywords, filter_arguments
133    from meerschaum.utils.pool import get_pool
134    from meerschaum.config import get_config
135
136    if (callback is not None or error_callback is not None) and blocking:
137        warn("Callback functions are only executed when blocking = False. Ignoring...")
138
139    _checkpoint(_total=2, **kw)
140
141    if chunksize == 0:
142        chunksize = None
143        sync_chunks = False
144
145    kw.update({
146        'begin': begin,
147        'end': end,
148        'force': force,
149        'retries': retries,
150        'min_seconds': min_seconds,
151        'check_existing': check_existing,
152        'blocking': blocking,
153        'workers': workers,
154        'callback': callback,
155        'error_callback': error_callback,
156        'sync_chunks': sync_chunks,
157        'chunksize': chunksize,
158    })
159
160    ### NOTE: Invalidate `_exists` cache before and after syncing.
161    self._exists = None
162
163    def _sync(
164        p: 'meerschaum.Pipe',
165        df: Union[
166            'pd.DataFrame',
167            Dict[str, List[Any]],
168            List[Dict[str, Any]],
169            InferFetch
170        ] = InferFetch,
171    ) -> SuccessTuple:
172        if df is None:
173            p._exists = None
174            return (
175                False,
176                f"You passed `None` instead of data into `sync()` for {p}.\n"
177                + "Omit the DataFrame to infer fetching.",
178            )
179        ### Ensure that Pipe is registered.
180        if not p.temporary and p.get_id(debug=debug) is None:
181            ### NOTE: This may trigger an interactive session for plugins!
182            register_success, register_msg = p.register(debug=debug)
183            if not register_success:
184                if 'already' not in register_msg:
185                    p._exists = None
186                    return register_success, register_msg
187
188        ### If connector is a plugin with a `sync()` method, return that instead.
189        ### If the plugin does not have a `sync()` method but does have a `fetch()` method,
190        ### use that instead.
191        ### NOTE: The DataFrame must be omitted for the plugin sync method to apply.
192        ### If a DataFrame is provided, continue as expected.
193        if hasattr(df, 'MRSM_INFER_FETCH'):
194            try:
195                if p.connector is None:
196                    if ':' not in p.connector_keys:
197                        return True, f"{p} does not support fetching; nothing to do."
198
199                    msg = f"{p} does not have a valid connector."
200                    if p.connector_keys.startswith('plugin:'):
201                        msg += f"\n    Perhaps {p.connector_keys} has a syntax error?"
202                    p._exists = None
203                    return False, msg
204            except Exception:
205                p._exists = None
206                return False, f"Unable to create the connector for {p}."
207
208            ### Sync in place if this is a SQL pipe.
209            if (
210                str(self.connector) == str(self.instance_connector)
211                and 
212                hasattr(self.instance_connector, 'sync_pipe_inplace')
213                and
214                _inplace
215                and
216                get_config('system', 'experimental', 'inplace_sync')
217            ):
218                with Venv(get_connector_plugin(self.instance_connector)):
219                    p._exists = None
220                    _args, _kwargs = filter_arguments(
221                        p.instance_connector.sync_pipe_inplace,
222                        p,
223                        debug=debug,
224                        **kw
225                    )
226                    return self.instance_connector.sync_pipe_inplace(
227                        *_args,
228                        **_kwargs
229                    )
230
231            ### Activate and invoke `sync(pipe)` for plugin connectors with `sync` methods.
232            try:
233                if getattr(p.connector, 'sync', None) is not None:
234                    with Venv(get_connector_plugin(p.connector), debug=debug):
235                        _args, _kwargs = filter_arguments(
236                            p.connector.sync,
237                            p,
238                            debug=debug,
239                            **kw
240                        )
241                        return_tuple = p.connector.sync(*_args, **_kwargs)
242                    p._exists = None
243                    if not isinstance(return_tuple, tuple):
244                        return_tuple = (
245                            False,
246                            f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}"
247                        )
248                    return return_tuple
249
250            except Exception as e:
251                get_console().print_exception()
252                msg = f"Failed to sync {p} with exception: '" + str(e) + "'"
253                if debug:
254                    error(msg, silent=False)
255                p._exists = None
256                return False, msg
257
258            ### Fetch the dataframe from the connector's `fetch()` method.
259            try:
260                with Venv(get_connector_plugin(p.connector), debug=debug):
261                    df = p.fetch(
262                        **filter_keywords(
263                            p.fetch,
264                            debug=debug,
265                            **kw
266                        )
267                    )
268            except Exception as e:
269                get_console().print_exception(
270                    suppress=[
271                        'meerschaum/core/Pipe/_sync.py',
272                        'meerschaum/core/Pipe/_fetch.py',
273                    ]
274                )
275                msg = f"Failed to fetch data from {p.connector}:\n    {e}"
276                df = None
277
278            if df is None:
279                p._exists = None
280                return False, f"No data were fetched for {p}."
281
282            if isinstance(df, list):
283                if len(df) == 0:
284                    return True, f"No new rows were returned for {p}."
285
286                ### May be a chunk hook results list.
287                if isinstance(df[0], tuple):
288                    success = all([_success for _success, _ in df])
289                    message = '\n'.join([_message for _, _message in df])
290                    return success, message
291
292        ### TODO: Deprecate async?
293            if df is True:
294                p._exists = None
295                return True, f"{p} is being synced in parallel."
296
297        ### CHECKPOINT: Retrieved the DataFrame.
298        _checkpoint(**kw)
299
300        ### Allow for dataframe generators or iterables.
301        if df_is_chunk_generator(df):
302            kw['workers'] = p.get_num_workers(kw.get('workers', None))
303            dt_col = p.columns.get('datetime', None)
304            pool = get_pool(workers=kw.get('workers', 1))
305            if debug:
306                dprint(f"Received {type(df)}. Attempting to sync first chunk...")
307
308            try:
309                chunk = next(df)
310            except StopIteration:
311                return True, "Received an empty generator; nothing to do."
312
313            chunk_success, chunk_msg = _sync(p, chunk)
314            chunk_msg = '\n' + self._get_chunk_label(chunk, dt_col) + '\n' + chunk_msg
315            if not chunk_success:
316                return chunk_success, f"Unable to sync initial chunk for {p}:\n{chunk_msg}"
317            if debug:
318            dprint("Successfully synced the first chunk, attempting the rest...")
319
320            failed_chunks = []
321            def _process_chunk(_chunk):
322                try:
323                    _chunk_success, _chunk_msg = _sync(p, _chunk)
324                except Exception as e:
325                    _chunk_success, _chunk_msg = False, str(e)
326                if not _chunk_success:
327                    failed_chunks.append(_chunk)
328                return (
329                    _chunk_success,
330                    (
331                        '\n'
332                        + self._get_chunk_label(_chunk, dt_col)
333                        + '\n'
334                        + _chunk_msg
335                    )
336                )
337
338            results = sorted(
339                [(chunk_success, chunk_msg)] + (
340                    list(pool.imap(_process_chunk, df))
341                    if not df_is_chunk_generator(chunk)
342                    else [
343                        _process_chunk(_child_chunks)
344                        for _child_chunks in df
345                    ]
346                )
347            )
348            chunk_messages = [chunk_msg for _, chunk_msg in results]
349            success_bools = [chunk_success for chunk_success, _ in results]
350            success = all(success_bools)
351            msg = '\n'.join(chunk_messages)
352
353            ### If some chunks succeeded, retry the failures.
354            retry_success = True
355            if not success and any(success_bools):
356                if debug:
357                    dprint("Retrying failed chunks...")
358                chunks_to_retry = [c for c in failed_chunks]
359                failed_chunks = []
360                for chunk in chunks_to_retry:
361                    chunk_success, chunk_msg = _process_chunk(chunk)
362                    msg += f"\n\nRetried chunk:\n{chunk_msg}\n"
363                    retry_success = retry_success and chunk_success
364
365            success = success and retry_success
366            return success, msg
367
368        ### Cast to a dataframe and ensure datatypes are what we expect.
369        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)
370
371        ### Capture `numeric`, `uuid`, and `json` columns.
372        self._persist_new_json_columns(df, debug=debug)
373        self._persist_new_numeric_columns(df, debug=debug)
374        self._persist_new_uuid_columns(df, debug=debug)
375
376        if debug:
377            dprint(
378                "DataFrame to sync:\n"
379                + (
380                    str(df)[:255]
381                    + '...'
382                    if len(str(df)) >= 256
383                    else str(df)
384                ),
385                **kw
386            )
387
388        ### if force, continue to sync until success
389        return_tuple = False, f"Did not sync {p}."
390        run = True
391        _retries = 1
392        while run:
393            with Venv(get_connector_plugin(self.instance_connector)):
394                return_tuple = p.instance_connector.sync_pipe(
395                    pipe=p,
396                    df=df,
397                    debug=debug,
398                    **kw
399                )
400            _retries += 1
401            run = (not return_tuple[0]) and force and _retries <= retries
402            if run and debug:
403                dprint(f"Syncing failed for {p}. Attempt ( {_retries} / {retries} )", **kw)
404                dprint(f"Sleeping for {min_seconds} seconds...", **kw)
405                time.sleep(min_seconds)
406            if _retries > retries:
407                warn(
408                    f"Unable to sync {p} within {retries} attempt" +
409                        ("s" if retries != 1 else "") + "!"
410                )
411
412        ### CHECKPOINT: Finished syncing. Handle caching.
413        _checkpoint(**kw)
414        if self.cache_pipe is not None:
415            if debug:
416            dprint("Caching retrieved dataframe.", **kw)
417        _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw)
418        if not _sync_cache_tuple[0]:
419            warn(f"Failed to sync local cache for {self}.")
420
421        self._exists = None
422        return return_tuple
423
424    if blocking:
425        self._exists = None
426        return _sync(self, df = df)
427
428    from meerschaum.utils.threading import Thread
429    def default_callback(result_tuple: SuccessTuple):
430        dprint(f"Asynchronous result from {self}: {result_tuple}", **kw)
431
432    def default_error_callback(x: Exception):
433        dprint(f"Error received for {self}: {x}", **kw)
434
435    if callback is None and debug:
436        callback = default_callback
437    if error_callback is None and debug:
438        error_callback = default_error_callback
439    try:
440        thread = Thread(
441            target=_sync,
442            args=(self,),
443            kwargs={'df': df},
444            daemon=False,
445            callback=callback,
446            error_callback=error_callback,
447        )
448        thread.start()
449    except Exception as e:
450        self._exists = None
451        return False, str(e)
452
453    self._exists = None
454    return True, f"Spawned asynchronous sync for {self}."

Fetch new data from the source and update the pipe's table with new data.

Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.

Parameters
  • df (Union[pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch], default InferFetch): An optional DataFrame to sync into the pipe. If omitted, fetch data from the connector.
  • begin (Union[datetime, int, str, None], default ''): Optionally specify the earliest datetime to search for data.
  • end (Union[datetime, int, str, None], default None): Optionally specify the latest datetime to search for data.
  • force (bool, default False): If True, keep retrying the sync for up to retries attempts.
  • retries (int, default 10): If force, how many attempts to try syncing before declaring failure.
  • min_seconds (Union[int, float], default 1): If force, how many seconds to sleep between retries. Defaults to 1.
  • check_existing (bool, default True): If True, pull and diff with existing data from the pipe.
  • blocking (bool, default True): If True, wait for sync to finish and return its result, otherwise sync asynchronously and return success. Defaults to True. Only intended for specific scenarios.
  • workers (Optional[int], default None): If provided and the instance connector is thread-safe (pipe.instance_connector.IS_THREAD_SAFE is True), limit concurrent sync to this many threads.
  • callback (Optional[Callable[[Tuple[bool, str]], Any]], default None): Callback function which expects a SuccessTuple as input. Only applies when blocking=False.
  • error_callback (Optional[Callable[[Exception], Any]], default None): Callback function which expects an Exception as input. Only applies when blocking=False.
  • chunksize (int, default -1): Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction.
  • sync_chunks (bool, default True): If possible, sync chunks while fetching them into memory.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
Returns
  • A SuccessTuple of success (bool) and message (str).
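The `force`, `retries`, and `min_seconds` parameters drive a simple retry loop around the instance connector's sync call. A stdlib-only sketch of that loop's semantics (not the actual implementation; `attempt_sync` is a hypothetical stand-in for the connector's `sync_pipe` call):

```python
import time
from typing import Callable, Tuple

def sync_with_retries(
    attempt_sync: Callable[[], Tuple[bool, str]],
    force: bool = False,
    retries: int = 10,
    min_seconds: int = 1,
) -> Tuple[bool, str]:
    """Keep calling `attempt_sync` until it succeeds or attempts run out."""
    return_tuple = (False, "Did not sync.")
    _retries = 1
    run = True
    while run:
        return_tuple = attempt_sync()
        _retries += 1
        # Only retry when `force` is set, the last attempt failed,
        # and attempts remain.
        run = (not return_tuple[0]) and force and _retries <= retries
        if run:
            time.sleep(min_seconds)
    return return_tuple

# A hypothetical flaky source which succeeds on the third attempt.
attempts = {'n': 0}
def flaky() -> Tuple[bool, str]:
    attempts['n'] += 1
    return (attempts['n'] >= 3), f"attempt {attempts['n']}"

success, msg = sync_with_retries(flaky, force=True, retries=10, min_seconds=0)
# success is True after three attempts.
```

Without `force=True`, the loop runs exactly once regardless of `retries`.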
def get_sync_time( self, params: Optional[Dict[str, Any]] = None, newest: bool = True, apply_backtrack_interval: bool = False, round_down: bool = False, debug: bool = False) -> Optional[datetime.datetime]:
457def get_sync_time(
458    self,
459    params: Optional[Dict[str, Any]] = None,
460    newest: bool = True,
461    apply_backtrack_interval: bool = False,
462    round_down: bool = False,
463    debug: bool = False
464) -> Union['datetime', None]:
465    """
466    Get the most recent datetime value for a Pipe.
467
468    Parameters
469    ----------
470    params: Optional[Dict[str, Any]], default None
471        Dictionary to build a WHERE clause for a specific column.
472        See `meerschaum.utils.sql.build_where`.
473
474    newest: bool, default True
475        If `True`, get the most recent datetime (honoring `params`).
476        If `False`, get the oldest datetime (`ASC` instead of `DESC`).
477
478    apply_backtrack_interval: bool, default False
479        If `True`, subtract the backtrack interval from the sync time.
480
481    round_down: bool, default False
482        If `True`, round down the datetime value to the nearest minute.
483
484    debug: bool, default False
485        Verbosity toggle.
486
487    Returns
488    -------
489    A `datetime` object if the pipe exists, otherwise `None`.
490
491    """
492    from meerschaum.utils.venv import Venv
493    from meerschaum.connectors import get_connector_plugin
494    from meerschaum.utils.misc import round_time
495
496    with Venv(get_connector_plugin(self.instance_connector)):
497        sync_time = self.instance_connector.get_sync_time(
498            self,
499            params=params,
500            newest=newest,
501            debug=debug,
502        )
503
504    if round_down and isinstance(sync_time, datetime):
505        sync_time = round_time(sync_time, timedelta(minutes=1))
506
507    if apply_backtrack_interval and sync_time is not None:
508        backtrack_interval = self.get_backtrack_interval(debug=debug)
509        try:
510            sync_time -= backtrack_interval
511        except Exception as e:
512            warn(f"Failed to apply backtrack interval:\n{e}")
513
514    return sync_time

Get the most recent datetime value for a Pipe.

Parameters
  • params (Optional[Dict[str, Any]], default None): Dictionary to build a WHERE clause for a specific column. See meerschaum.utils.sql.build_where.
  • newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
  • apply_backtrack_interval (bool, default False): If True, subtract the backtrack interval from the sync time.
  • round_down (bool, default False): If True, round down the datetime value to the nearest minute.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A datetime object if the pipe exists, otherwise None.
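The `round_down` and `apply_backtrack_interval` options amount to simple datetime arithmetic. A stdlib-only sketch (the minute truncation mirrors `meerschaum.utils.misc.round_time`; the one-hour backtrack interval is an assumed example value — the real one comes from the pipe's configuration):

```python
from datetime import datetime, timedelta

def round_down_to_minute(dt: datetime) -> datetime:
    """Truncate a datetime to the start of its minute."""
    return dt.replace(second=0, microsecond=0)

sync_time = datetime(2024, 1, 1, 12, 34, 56)
backtrack_interval = timedelta(hours=1)  # assumed example value

# round_down=True: truncate to the nearest minute (rounding down).
rounded = round_down_to_minute(sync_time)    # 2024-01-01 12:34:00

# apply_backtrack_interval=True: subtract the backtrack interval.
adjusted = rounded - backtrack_interval      # 2024-01-01 11:34:00
```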
def exists(self, debug: bool = False) -> bool:
517def exists(
518        self,
519        debug : bool = False
520    ) -> bool:
521    """
522    See if a Pipe's table exists.
523
524    Parameters
525    ----------
526    debug: bool, default False
527        Verbosity toggle.
528
529    Returns
530    -------
531    A `bool` corresponding to whether a pipe's underlying table exists.
532
533    """
534    import time
535    from meerschaum.utils.venv import Venv
536    from meerschaum.connectors import get_connector_plugin
537    from meerschaum.config import STATIC_CONFIG
538    from meerschaum.utils.debug import dprint
539    now = time.perf_counter()
540    exists_timeout_seconds = STATIC_CONFIG['pipes']['exists_timeout_seconds']
541
542    _exists = self.__dict__.get('_exists', None)
543    if _exists:
544        exists_timestamp = self.__dict__.get('_exists_timestamp', None)
545        if exists_timestamp is not None:
546            delta = now - exists_timestamp
547            if delta < exists_timeout_seconds:
548                if debug:
549                    dprint(f"Returning cached `exists` for {self} ({round(delta, 2)} seconds old).")
550                return _exists
551
552    with Venv(get_connector_plugin(self.instance_connector)):
553        _exists = self.instance_connector.pipe_exists(pipe=self, debug=debug)
554
555    self.__dict__['_exists'] = _exists
556    self.__dict__['_exists_timestamp'] = now
557    return _exists

See if a Pipe's table exists.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool corresponding to whether a pipe's underlying table exists.
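The source above caches the result with a `perf_counter` timestamp and only re-queries the instance connector once a timeout elapses. A stdlib-only sketch of that caching pattern (the 60-second timeout is an assumed example; the real value comes from `STATIC_CONFIG['pipes']['exists_timeout_seconds']`):

```python
import time
from typing import Callable, Optional

class CachedExists:
    """Cache a boolean check, re-running it only after a timeout elapses."""

    def __init__(self, check: Callable[[], bool], timeout_seconds: float = 60.0):
        self._check = check
        self._timeout = timeout_seconds
        self._exists: Optional[bool] = None
        self._exists_timestamp: Optional[float] = None

    def exists(self) -> bool:
        now = time.perf_counter()
        # Only a cached `True` is trusted; `None` or `False` is re-checked.
        if self._exists and self._exists_timestamp is not None:
            if (now - self._exists_timestamp) < self._timeout:
                return self._exists
        self._exists = self._check()
        self._exists_timestamp = now
        return self._exists

# Hypothetical expensive check standing in for `pipe_exists()`.
calls = {'n': 0}
def expensive_check() -> bool:
    calls['n'] += 1
    return True

cached = CachedExists(expensive_check, timeout_seconds=60.0)
first, second = cached.exists(), cached.exists()
# The second call is served from the cache; `expensive_check` ran once.
```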
def filter_existing( self, df: pandas.core.frame.DataFrame, safe_copy: bool = True, date_bound_only: bool = False, include_unchanged_columns: bool = False, chunksize: Optional[int] = -1, debug: bool = False, **kw) -> Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]:
560def filter_existing(
561    self,
562    df: 'pd.DataFrame',
563    safe_copy: bool = True,
564    date_bound_only: bool = False,
565    include_unchanged_columns: bool = False,
566    chunksize: Optional[int] = -1,
567    debug: bool = False,
568    **kw
569) -> Tuple['pd.DataFrame', 'pd.DataFrame', 'pd.DataFrame']:
570    """
571    Inspect a dataframe and filter out rows which already exist in the pipe.
572
573    Parameters
574    ----------
575    df: 'pd.DataFrame'
576        The dataframe to inspect and filter.
577
578    safe_copy: bool, default True
579        If `True`, create a copy before comparing and modifying the dataframes.
580        Setting to `False` may mutate the DataFrames.
581        See `meerschaum.utils.dataframe.filter_unseen_df`.
582
583    date_bound_only: bool, default False
584        If `True`, only use the datetime index to fetch the sample dataframe.
585
586    include_unchanged_columns: bool, default False
587        If `True`, include the backtrack columns which haven't changed in the update dataframe.
588        This is useful if you can't update individual keys.
589
590    chunksize: Optional[int], default -1
591        The `chunksize` used when fetching existing data.
592
593    debug: bool, default False
594        Verbosity toggle.
595
596    Returns
597    -------
598    A tuple of three pandas DataFrames: unseen, update, and delta.
599    """
600    from meerschaum.utils.warnings import warn
601    from meerschaum.utils.debug import dprint
602    from meerschaum.utils.packages import attempt_import, import_pandas
603    from meerschaum.utils.misc import round_time
604    from meerschaum.utils.dataframe import (
605        filter_unseen_df,
606        add_missing_cols_to_df,
607        get_unhashable_cols,
608        get_numeric_cols,
609    )
610    from meerschaum.utils.dtypes import (
611        to_pandas_dtype,
612        none_if_null,
613    )
614    from meerschaum.config import get_config
615    pd = import_pandas()
616    pandas = attempt_import('pandas')
617    if 'dataframe' not in str(type(df)).lower():
618        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)
619    is_dask = 'dask' in df.__module__
620    if is_dask:
621        dd = attempt_import('dask.dataframe')
622        merge = dd.merge
623        NA = pandas.NA
624    else:
625        merge = pd.merge
626        NA = pd.NA
627
628    def get_empty_df():
629        empty_df = pd.DataFrame([])
630        dtypes = dict(df.dtypes) if df is not None else {}
631        dtypes.update(self.dtypes)
632        pd_dtypes = {
633            col: to_pandas_dtype(str(typ))
634            for col, typ in dtypes.items()
635        }
636        return add_missing_cols_to_df(empty_df, pd_dtypes)
637
638    if df is None:
639        empty_df = get_empty_df()
640        return empty_df, empty_df, empty_df
641
642    if (df.empty if not is_dask else len(df) == 0):
643        return df, df, df
644
645    ### begin is the oldest data in the new dataframe
646    begin, end = None, None
647    dt_col = self.columns.get('datetime', None)
648    dt_type = self.dtypes.get(dt_col, 'datetime64[ns]') if dt_col else None
649    try:
650        min_dt_val = df[dt_col].min(skipna=True) if dt_col else None
651        if is_dask and min_dt_val is not None:
652            min_dt_val = min_dt_val.compute()
653        min_dt = (
654            pandas.to_datetime(min_dt_val).to_pydatetime()
655            if min_dt_val is not None and 'datetime' in str(dt_type)
656            else min_dt_val
657        )
658    except Exception:
659        min_dt = None
660    if not ('datetime' in str(type(min_dt))) or str(min_dt) == 'NaT':
661        if 'int' not in str(type(min_dt)).lower():
662            min_dt = None
663
664    if isinstance(min_dt, datetime):
665        begin = (
666            round_time(
667                min_dt,
668                to='down'
669            ) - timedelta(minutes=1)
670        )
671    elif dt_type and 'int' in dt_type.lower():
672        begin = min_dt
673    elif dt_col is None:
674        begin = None
675
676    ### end is the newest data in the new dataframe
677    try:
678        max_dt_val = df[dt_col].max(skipna=True) if dt_col else None
679        if is_dask and max_dt_val is not None:
680            max_dt_val = max_dt_val.compute()
681        max_dt = (
682            pandas.to_datetime(max_dt_val).to_pydatetime()
683            if max_dt_val is not None and 'datetime' in str(dt_type)
684            else max_dt_val
685        )
686    except Exception:
687        import traceback
688        traceback.print_exc()
689        max_dt = None
690
691    if ('datetime' not in str(type(max_dt))) or str(max_dt) == 'NaT':
692        if 'int' not in str(type(max_dt)).lower():
693            max_dt = None
694
695    if isinstance(max_dt, datetime):
696        end = (
697            round_time(
698                max_dt,
699                to='down'
700            ) + timedelta(minutes=1)
701        )
702    elif dt_type and 'int' in dt_type.lower():
703        end = max_dt + 1
704
705    if max_dt is not None and min_dt is not None and min_dt > max_dt:
706        warn("Detected minimum datetime greater than maximum datetime.")
707
708    if begin is not None and end is not None and begin > end:
709        if isinstance(begin, datetime):
710            begin = end - timedelta(minutes=1)
711        ### We might be using integers for the datetime axis.
712        else:
713            begin = end - 1
714
715    unique_index_vals = {
716        col: df[col].unique()
717        for col in self.columns
718        if col in df.columns and col != dt_col
719    } if not date_bound_only else {}
720    filter_params_index_limit = get_config('pipes', 'sync', 'filter_params_index_limit')
721    _ = kw.pop('params', None)
722    params = {
723        col: [
724            none_if_null(val)
725            for val in unique_vals
726        ]
727        for col, unique_vals in unique_index_vals.items()
728        if len(unique_vals) <= filter_params_index_limit
    } if not date_bound_only else {}

    if debug:
        dprint(f"Looking at data between '{begin}' and '{end}':", **kw)

    backtrack_df = self.get_data(
        begin=begin,
        end=end,
        chunksize=chunksize,
        params=params,
        debug=debug,
        **kw
    )
    if backtrack_df is None:
        if debug:
            dprint(f"No backtrack data was found for {self}.")
        return df, get_empty_df(), df

    if debug:
        dprint(f"Existing data for {self}:\n" + str(backtrack_df), **kw)
        dprint(f"Existing dtypes for {self}:\n" + str(backtrack_df.dtypes))

    ### Separate new rows from changed ones.
    on_cols = [
        col for col_key, col in self.columns.items()
        if (
            col
            and
            col_key != 'value'
            and col in backtrack_df.columns
        )
    ]
    self_dtypes = self.dtypes
    on_cols_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in self_dtypes.items()
        if col in on_cols
    }

    ### Detect changes between the old target and new source dataframes.
    delta_df = add_missing_cols_to_df(
        filter_unseen_df(
            backtrack_df,
            df,
            dtypes={
                col: to_pandas_dtype(typ)
                for col, typ in self_dtypes.items()
            },
            safe_copy=safe_copy,
            debug=debug
        ),
        on_cols_dtypes,
    )

    ### Cast dicts or lists to strings so we can merge.
    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)

    def deserializer(x):
        return json.loads(x) if isinstance(x, str) else x

    unhashable_delta_cols = get_unhashable_cols(delta_df)
    unhashable_backtrack_cols = get_unhashable_cols(backtrack_df)
    for col in unhashable_delta_cols:
        delta_df[col] = delta_df[col].apply(serializer)
    for col in unhashable_backtrack_cols:
        backtrack_df[col] = backtrack_df[col].apply(serializer)
    casted_cols = set(unhashable_delta_cols + unhashable_backtrack_cols)

    joined_df = merge(
        delta_df.infer_objects(copy=False).fillna(NA),
        backtrack_df.infer_objects(copy=False).fillna(NA),
        how='left',
        on=on_cols,
        indicator=True,
        suffixes=('', '_old'),
    ) if on_cols else delta_df
    for col in casted_cols:
        if col in joined_df.columns:
            joined_df[col] = joined_df[col].apply(deserializer)
        if col in delta_df.columns:
            delta_df[col] = delta_df[col].apply(deserializer)

    ### Determine which rows are completely new.
    new_rows_mask = (joined_df['_merge'] == 'left_only') if on_cols else None
    cols = list(delta_df.columns)

    unseen_df = (
        joined_df
        .where(new_rows_mask)
        .dropna(how='all')[cols]
        .reset_index(drop=True)
    ) if on_cols else delta_df

    ### Rows that have already been inserted but values have changed.
    update_df = (
        joined_df
        .where(~new_rows_mask)
        .dropna(how='all')[cols]
        .reset_index(drop=True)
    ) if on_cols else get_empty_df()

    if include_unchanged_columns and on_cols:
        unchanged_backtrack_cols = [
            col
            for col in backtrack_df.columns
            if col in on_cols or col not in update_df.columns
        ]
        update_df = merge(
            backtrack_df[unchanged_backtrack_cols],
            update_df,
            how='inner',
            on=on_cols,
        )

    return unseen_df, update_df, delta_df

Inspect a dataframe and filter out rows which already exist in the pipe.

Parameters
  • df ('pd.DataFrame'): The dataframe to inspect and filter.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames. See meerschaum.utils.dataframe.filter_unseen_df.
  • date_bound_only (bool, default False): If True, only use the datetime index to fetch the sample dataframe.
  • include_unchanged_columns (bool, default False): If True, include the backtrack columns which haven't changed in the update dataframe. This is useful if you can't update individual keys.
  • chunksize (Optional[int], default -1): The chunksize used when fetching existing data.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A tuple of three pandas DataFrames: unseen, update, and delta.
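The new-versus-changed split described above can be sketched with a pandas left merge and its `indicator` column. This is a minimal standalone sketch of the technique, not the actual `filter_existing` implementation; the column names (`id`, `vl`) and sample values are made up for illustration:

```python
import pandas as pd

# Hypothetical existing (backtrack) and incoming data, keyed on 'id'.
existing = pd.DataFrame({'id': [1, 2], 'vl': [10, 20]})
incoming = pd.DataFrame({'id': [2, 3], 'vl': [25, 30]})

# Delta: incoming rows without an exact match in the existing data.
delta = incoming.merge(existing, how='left', indicator=True)
delta = delta[delta['_merge'] == 'left_only'].drop(columns='_merge')

# Join the delta on the key column to classify rows as new or changed.
joined = delta.merge(existing, how='left', on='id', indicator=True, suffixes=('', '_old'))
unseen = joined[joined['_merge'] == 'left_only'][delta.columns].reset_index(drop=True)
update = joined[joined['_merge'] == 'both'][delta.columns].reset_index(drop=True)
# unseen -> the id=3 row (brand new); update -> the id=2 row (value changed)
```

The first merge compares entire rows (any changed value makes a row part of the delta); the second merge compares only the key columns, so `left_only` rows are inserts and `both` rows are updates.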
def get_num_workers(self, workers: Optional[int] = None) -> int:
def get_num_workers(self, workers: Optional[int] = None) -> int:
    """
    Get the number of workers to use for concurrent syncs.

    Parameters
    ----------
    workers: Optional[int], default None
        The number of workers passed via `--workers`.

    Returns
    -------
    The number of workers, capped for safety.
    """
    is_thread_safe = getattr(self.instance_connector, 'IS_THREAD_SAFE', False)
    if not is_thread_safe:
        return 1

    engine_pool_size = (
        self.instance_connector.engine.pool.size()
        if self.instance_connector.type == 'sql'
        else None
    )
    current_num_threads = threading.active_count()
    current_num_connections = (
        self.instance_connector.engine.pool.checkedout()
        if engine_pool_size is not None
        else current_num_threads
    )
    desired_workers = (
        min(workers or engine_pool_size, engine_pool_size)
        if engine_pool_size is not None
        else workers
    )
    if desired_workers is None:
        desired_workers = (multiprocessing.cpu_count() if is_thread_safe else 1)

    return max(
        (desired_workers - current_num_connections),
        1,
    )

Get the number of workers to use for concurrent syncs.

Parameters
  • workers (Optional[int], default None): The number of workers passed via --workers.
Returns
  • The number of workers, capped for safety.
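The capping rule can be illustrated with a tiny standalone helper. This is a hypothetical sketch mirroring the arithmetic above (not part of the Meerschaum API): cap the request at the connection pool size, subtract connections already checked out, and never drop below one worker.

```python
def cap_workers(desired, pool_size, checked_out):
    # Hypothetical helper mirroring get_num_workers' arithmetic; not the real API.
    # Cap the requested count at the pool size (a None request means "use the pool"),
    # then leave headroom for connections already checked out.
    capped = min(desired or pool_size, pool_size)
    return max(capped - checked_out, 1)

cap_workers(8, 5, 2)     # pool of 5 with 2 connections in use -> 3 workers
cap_workers(None, 5, 0)  # no explicit request -> full pool of 5
cap_workers(2, 5, 10)    # pool oversubscribed -> floor of 1 worker
```

The floor of 1 guarantees a sync can always proceed even when the pool is fully checked out.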
def verify( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, bounded: Optional[bool] = None, deduplicate: bool = False, workers: Optional[int] = None, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
def verify(
    self,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    chunk_interval: Union[timedelta, int, None] = None,
    bounded: Optional[bool] = None,
    deduplicate: bool = False,
    workers: Optional[int] = None,
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Verify the contents of the pipe by resyncing its interval.

    Parameters
    ----------
    begin: Union[datetime, int, None], default None
        If specified, only verify rows greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If specified, only verify rows less than this value.

    chunk_interval: Union[timedelta, int, None], default None
        If provided, use this as the size of the chunk boundaries.
        Defaults to the value set in `pipe.parameters['chunk_minutes']` (1440).

    bounded: Optional[bool], default None
        If `True`, do not verify older than the oldest sync time or newer than the newest.
        If `False`, verify unbounded syncs outside of the newest and oldest sync times.
        The default behavior (`None`) is to bound only if a bound interval is set
        (e.g. `pipe.parameters['verify']['bound_days']`).

    deduplicate: bool, default False
        If `True`, deduplicate the pipe's table after the verification syncs.

    workers: Optional[int], default None
        If provided, limit the verification to this many threads.
        Use a value of `1` to sync chunks in series.

    debug: bool, default False
        Verbosity toggle.

    kwargs: Any
        All keyword arguments are passed to `pipe.sync()`.

    Returns
    -------
    A SuccessTuple indicating whether the pipe was successfully resynced.
    """
    from meerschaum.utils.pool import get_pool
    from meerschaum.utils.misc import interval_str
    workers = self.get_num_workers(workers)

    ### Skip configured bounding in parameters
    ### if `bounded` is explicitly `False`.
    bound_time = (
        self.get_bound_time(debug=debug)
        if bounded is not False
        else None
    )
    if bounded is None:
        bounded = bound_time is not None

    if bounded and begin is None:
        begin = (
            bound_time
            if bound_time is not None
            else self.get_sync_time(newest=False, debug=debug)
        )
    if bounded and end is None:
        end = self.get_sync_time(newest=True, debug=debug)

    if bounded and end is not None:
        end += (
            timedelta(minutes=1)
            if isinstance(end, datetime)
            else 1
        )

    sync_less_than_begin = not bounded and begin is None
    sync_greater_than_end = not bounded and end is None

    cannot_determine_bounds = not self.exists(debug=debug)

    if cannot_determine_bounds:
        sync_success, sync_msg = self.sync(
            begin=begin,
            end=end,
            params=params,
            workers=workers,
            debug=debug,
            **kwargs
        )
        if not sync_success:
            return sync_success, sync_msg
        if deduplicate:
            return self.deduplicate(
                begin=begin,
                end=end,
                params=params,
                workers=workers,
                debug=debug,
                **kwargs
            )
        return sync_success, sync_msg

    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
    chunk_bounds = self.get_chunk_bounds(
        begin=begin,
        end=end,
        chunk_interval=chunk_interval,
        bounded=bounded,
        debug=debug,
    )

    ### Consider it a success if no chunks need to be verified.
    if not chunk_bounds:
        if deduplicate:
            return self.deduplicate(
                begin=begin,
                end=end,
                params=params,
                workers=workers,
                debug=debug,
                **kwargs
            )
        return True, f"Could not determine chunks between '{begin}' and '{end}'; nothing to do."

    begin_to_print = (
        begin
        if begin is not None
        else (
            chunk_bounds[0][0]
            if bounded
            else chunk_bounds[0][1]
        )
    )
    end_to_print = (
        end
        if end is not None
        else (
            chunk_bounds[-1][1]
            if bounded
            else chunk_bounds[-1][0]
        )
    )

    info(
        f"Syncing {len(chunk_bounds)} chunk" + ('s' if len(chunk_bounds) != 1 else '')
        + f" ({'un' if not bounded else ''}bounded)"
        + f" of size '{interval_str(chunk_interval)}'"
        + f" between '{begin_to_print}' and '{end_to_print}'."
    )

    pool = get_pool(workers=workers)

    ### Dictionary of the form bounds -> success_tuple, e.g.:
    ### {
    ###    (2023-01-01, 2023-01-02): (True, "Success")
    ### }
    bounds_success_tuples = {}
    def process_chunk_bounds(
            chunk_begin_and_end: Tuple[
                Union[int, datetime],
                Union