Meerschaum Python API

Welcome to the Meerschaum Python API technical documentation! Here you can find information about the classes and functions provided by the meerschaum package. Visit meerschaum.io for general usage documentation.

Root Module

For your convenience, a number of classes and functions may be imported directly from the root meerschaum namespace; they are listed in the __all__ tuple of the module source below.
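For instance, the top-level get_config() and pprint() helpers (both re-exported in the module's __all__, shown in the source below) can be combined to inspect your configured instance. A minimal sketch; the exact output depends on your configuration:

import meerschaum as mrsm

# Look up the configured instance keys (e.g. 'sql:main') and pretty-print them.
instance_keys = mrsm.get_config('meerschaum', 'instance')
mrsm.pprint(instance_keys)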

Examples

Build a Connector

Get existing connectors or build a new one in-memory with the meerschaum.get_connector() factory function:

import meerschaum as mrsm

sql_conn = mrsm.get_connector(
    'sql:temp',
    flavor='sqlite',
    database='/tmp/tmp.db',
)
df = sql_conn.read("SELECT 1 AS foo")
print(df)
#    foo
# 0    1

sql_conn.to_sql(df, 'foo')
print(sql_conn.read('foo'))
#    foo
# 0    1

Create a Custom Connector Class

Decorate your connector classes with meerschaum.make_connector() to designate them as custom connectors:

from datetime import datetime, timezone
from random import randint
import meerschaum as mrsm
from meerschaum.utils.misc import round_time

@mrsm.make_connector
class FooConnector(mrsm.Connector):
    REQUIRED_ATTRIBUTES = ['username', 'password']

    def fetch(
        self,
        begin: datetime | None = None,
        end: datetime | None = None,
    ):
        now = begin or round_time(datetime.now(timezone.utc))
        return [
            {'ts': now, 'id': 1, 'vl': randint(1, 100)},
            {'ts': now, 'id': 2, 'vl': randint(1, 100)},
            {'ts': now, 'id': 3, 'vl': randint(1, 100)},
        ]

foo_conn = mrsm.get_connector(
    'foo:bar',
    username='foo',
    password='bar',
)
docs = foo_conn.fetch()

Build a Pipe

Build a meerschaum.Pipe in-memory:

from datetime import datetime
import meerschaum as mrsm

pipe = mrsm.Pipe(
    foo_conn, 'demo',
    instance=sql_conn,
    columns={'datetime': 'ts', 'id': 'id'},
    tags=['production'],
)
pipe.sync(begin=datetime(2024, 1, 1))
df = pipe.get_data()
print(df)
#           ts  id  vl
# 0 2024-01-01   1  97
# 1 2024-01-01   2  18
# 2 2024-01-01   3  96

Add temporary=True to skip registering the pipe in the pipes table.
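For example, a throwaway variant of the pipe above might look like this (a sketch reusing foo_conn and sql_conn from the earlier examples; nothing is written to the pipes table):

temp_pipe = mrsm.Pipe(
    foo_conn, 'demo', 'temp',
    instance=sql_conn,
    columns={'datetime': 'ts', 'id': 'id'},
    temporary=True,
)
temp_pipe.sync(begin=datetime(2024, 1, 1))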

Get Registered Pipes

The meerschaum.get_pipes() function returns a dictionary hierarchy of pipes by connector, metric, and location:

import meerschaum as mrsm

pipes = mrsm.get_pipes(instance='sql:temp')
pipe = pipes['foo:bar']['demo'][None]

Add as_list=True to flatten the hierarchy:

import meerschaum as mrsm

pipes = mrsm.get_pipes(
    tags=['production'],
    instance=sql_conn,
    as_list=True,
)
print(pipes)
# [Pipe('foo:bar', 'demo', instance='sql:temp')]

Import Plugins

You can import a plugin's module through meerschaum.Plugin.module:

import meerschaum as mrsm

plugin = mrsm.Plugin('noaa')
with mrsm.Venv(plugin):
    noaa = plugin.module

If your plugin has submodules, use meerschaum.plugins.from_plugin_import:

from meerschaum.plugins import from_plugin_import
get_defined_pipes = from_plugin_import('compose.utils.pipes', 'get_defined_pipes')

Import multiple plugins with meerschaum.plugins.import_plugins:

from meerschaum.plugins import import_plugins
noaa, compose = import_plugins('noaa', 'compose')

Create a Job

Create a meerschaum.Job with name and sysargs:

import meerschaum as mrsm

job = mrsm.Job('syncing-engine', 'sync pipes --loop')
success, msg = job.start()
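
The Job class should expose a matching stop() method returning the same kind of SuccessTuple; treat this as a sketch rather than a guarantee:

success, msg = job.stop()
print(success, msg)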

Pass executor_keys as the connector keys of an API instance to create a remote job:

import meerschaum as mrsm

job = mrsm.Job(
    'foo',
    'sync pipes -s daily',
    executor_keys='api:main',
)
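
Starting the remote job presumably mirrors the local example above:

success, msg = job.start()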

Import from a Virtual Environment

Use the meerschaum.Venv context manager to activate a virtual environment:

import meerschaum as mrsm

with mrsm.Venv('noaa'):
    import requests

print(requests.__file__)
# /home/bmeares/.config/meerschaum/venvs/noaa/lib/python3.12/site-packages/requests/__init__.py

To import packages which may not be installed, use meerschaum.attempt_import():

import meerschaum as mrsm

requests = mrsm.attempt_import('requests', venv='noaa')
print(requests.__file__)
# /home/bmeares/.config/meerschaum/venvs/noaa/lib/python3.12/site-packages/requests/__init__.py

Run Actions

Run sysargs with meerschaum.entry():

import meerschaum as mrsm

success, msg = mrsm.entry('show pipes + show version : x2')

Use meerschaum.actions.get_action() to access an action function directly:

from meerschaum.actions import get_action

show_pipes = get_action(['show', 'pipes'])
success, msg = show_pipes(connector_keys=['plugin:noaa'])

Get a dictionary of available subactions with meerschaum.actions.get_subactions():

from meerschaum.actions import get_subactions

subactions = get_subactions('show')
success, msg = subactions['pipes']()

Create a Plugin

Run bootstrap plugin to create a new plugin:

mrsm bootstrap plugin example

This will create example.py in your plugins directory (default ~/.config/meerschaum/plugins/, Windows: %APPDATA%\Meerschaum\plugins). You may paste the example code from the "Create a Custom Action" example below.

Open your plugin with edit plugin:

mrsm edit plugin example

Paste in the example code below to try out the features.

See the writing plugins guide for more in-depth documentation.
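
As a rough companion to the action example that follows, a fetch-style plugin module might look like the sketch below. The register() and fetch(pipe, **kwargs) entry points follow the conventions described in the writing plugins guide; the details here are illustrative only:

# example.py in your plugins directory
__version__ = '0.0.1'
required = []

def register(pipe, **kwargs):
    # Suggest default parameters when the pipe is first registered.
    return {'columns': {'datetime': 'ts', 'id': 'id'}}

def fetch(pipe, **kwargs):
    # Return rows to be synced, e.g. via `mrsm sync pipes -c plugin:example`.
    from datetime import datetime, timezone
    return [{'ts': datetime.now(timezone.utc), 'id': 1, 'vl': 100}]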

Create a Custom Action

Decorate a function with meerschaum.actions.make_action to designate it as an action. Subactions will be automatically detected if not decorated:

from meerschaum.actions import make_action

@make_action
def sing():
    print('What would you like me to sing?')
    return True, "Success"

def sing_tune():
    return False, "I don't know that song!"

def sing_song():
    print('Hello, World!')
    return True, "Success"
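
Actions registered this way become available from the mrsm shell; given the function names above, the invocations should look like:

mrsm sing

mrsm sing song

mrsm sing tune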

Use meerschaum.plugins.add_plugin_argument() to create new parameters for your action:

from meerschaum.plugins import make_action, add_plugin_argument

add_plugin_argument(
    '--song', type=str, help='What song to sing.',
)

@make_action
def sing_melody(action=None, song=None):
    to_sing = action[0] if action else song
    if not to_sing:
        return False, "Please tell me what to sing!"

    return True, f'~I am singing {to_sing}~'

Invoke the new action from the shell, passing the song either positionally or with the new --song flag:

mrsm sing melody lalala

mrsm sing melody --song do-re-mi

Add a Page to the Web Dashboard

Use the decorators meerschaum.plugins.dash_plugin() and meerschaum.plugins.web_page() to add new pages to the web dashboard:

from meerschaum.plugins import dash_plugin, web_page

@dash_plugin
def init_dash(dash_app):

    import dash.html as html
    import dash_bootstrap_components as dbc
    from dash import Input, Output, no_update

    ### Routes to '/dash/my-page'
    @web_page('/my-page', login_required=False)
    def my_page():
        return dbc.Container([
            html.H1("Hello, World!"),
            dbc.Button("Click me", id='my-button'),
            html.Div(id="my-output-div"),
        ])

    @dash_app.callback(
        Output('my-output-div', 'children'),
        Input('my-button', 'n_clicks'),
    )
    def my_button_click(n_clicks):
        if not n_clicks:
            return no_update
        return html.P(f'You clicked {n_clicks} times!')

Submodules

meerschaum.actions
Access functions for actions and subactions.

meerschaum.config
Read and write the Meerschaum configuration registry.

meerschaum.connectors
Build connectors to interact with databases and fetch data.

meerschaum.jobs
Start background jobs.

meerschaum.plugins
Access plugin modules and other API utilities.

meerschaum.utils
Utility functions are available in several submodules, e.g. meerschaum.utils.misc, meerschaum.utils.packages, meerschaum.utils.venv, and meerschaum.utils.formatting.
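
For instance, two utilities already used on this page, combined into a quick sketch (assuming round_time()'s default minute granularity):

from datetime import datetime, timezone
from meerschaum.utils.misc import round_time
from meerschaum.utils.formatting import pprint

# Round the current UTC timestamp and pretty-print the result.
now = round_time(datetime.now(timezone.utc))
pprint({'rounded': now})

The module source below shows which of these utility names (e.g. attempt_import and pprint) are also re-exported at the root level.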

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Copyright 2023 Bennett Meares

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import atexit
from meerschaum.utils.typing import SuccessTuple
from meerschaum.utils.packages import attempt_import
from meerschaum.core.Pipe import Pipe
from meerschaum.plugins import Plugin
from meerschaum.utils.venv import Venv
from meerschaum.jobs import Job, make_executor
from meerschaum.connectors import get_connector, Connector, make_connector
from meerschaum.utils import get_pipes
from meerschaum.utils.formatting import pprint
from meerschaum._internal.docs import index as __doc__
from meerschaum.config import __version__, get_config
from meerschaum._internal.entry import entry
from meerschaum.__main__ import _close_pools

atexit.register(_close_pools)

__pdoc__ = {'gui': False, 'api': False, 'core': False, '_internal': False}
__all__ = (
    "get_pipes",
    "get_connector",
    "get_config",
    "Pipe",
    "Plugin",
    "Venv",
    "Plugin",
    "Job",
    "pprint",
    "attempt_import",
    "actions",
    "config",
    "connectors",
    "jobs",
    "plugins",
    "utils",
    "SuccessTuple",
    "Connector",
    "make_connector",
    "entry",
)
def get_pipes( connector_keys: Union[str, List[str], NoneType] = None, metric_keys: Union[str, List[str], NoneType] = None, location_keys: Union[str, List[str], NoneType] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, mrsm_instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, as_list: bool = False, method: str = 'registered', debug: bool = False, **kw: Any) -> Union[Dict[str, Dict[str, Dict[str, Pipe]]], List[Pipe]]:
 19def get_pipes(
 20    connector_keys: Union[str, List[str], None] = None,
 21    metric_keys: Union[str, List[str], None] = None,
 22    location_keys: Union[str, List[str], None] = None,
 23    tags: Optional[List[str]] = None,
 24    params: Optional[Dict[str, Any]] = None,
 25    mrsm_instance: Union[str, InstanceConnector, None] = None,
 26    instance: Union[str, InstanceConnector, None] = None,
 27    as_list: bool = False,
 28    method: str = 'registered',
 29    debug: bool = False,
 30    **kw: Any
 31) -> Union[PipesDict, List[mrsm.Pipe]]:
 32    """
 33    Return a dictionary or list of `meerschaum.Pipe` objects.
 34
 35    Parameters
 36    ----------
 37    connector_keys: Union[str, List[str], None], default None
 38        String or list of connector keys.
 39        If omitted or is `'*'`, fetch all possible keys.
 40        If a string begins with `'_'`, select keys that do NOT match the string.
 41
 42    metric_keys: Union[str, List[str], None], default None
 43        String or list of metric keys. See `connector_keys` for formatting.
 44
 45    location_keys: Union[str, List[str], None], default None
 46        String or list of location keys. See `connector_keys` for formatting.
 47
 48    tags: Optional[List[str]], default None
 49         If provided, only include pipes with these tags.
 50
 51    params: Optional[Dict[str, Any]], default None
 52        Dictionary of additional parameters to search by.
 53        Params are parsed into a SQL WHERE clause.
 54        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`
 55
 56    mrsm_instance: Union[str, InstanceConnector, None], default None
 57        Connector keys for the Meerschaum instance of the pipes.
 58        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
 59        `meerschaum.connectors.api.APIConnector.APIConnector`.
 60        
 61    as_list: bool, default False
 62        If `True`, return pipes in a list instead of a hierarchical dictionary.
 63        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
 64        `True`  : `[Pipe]`
 65
 66    method: str, default 'registered'
 67        Available options: `['registered', 'explicit', 'all']`
 68        If `'registered'` (default), create pipes based on registered keys in the connector's pipes table
 69        (API or SQL connector, depends on mrsm_instance).
 70        If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys
 71        instead of consulting the pipes table. Useful for creating non-existent pipes.
 72        If `'all'`, create pipes from predefined metrics and locations. Requires `connector_keys`.
 73        **NOTE:** Method `'all'` is not implemented!
 74
 75    **kw: Any:
 76        Keyword arguments to pass to the `meerschaum.Pipe` constructor.
 77        
 78
 79    Returns
 80    -------
 81    A dictionary of dictionaries and `meerschaum.Pipe` objects
 82    in the connector, metric, location hierarchy.
 83    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.
 84
 85    Examples
 86    --------
 87    ```
 88    >>> ### Manual definition:
 89    >>> pipes = {
 90    ...     <connector_keys>: {
 91    ...         <metric_key>: {
 92    ...             <location_key>: Pipe(
 93    ...                 <connector_keys>,
 94    ...                 <metric_key>,
 95    ...                 <location_key>,
 96    ...             ),
 97    ...         },
 98    ...     },
 99    ... },
100    >>> ### Accessing a single pipe:
101    >>> pipes['sql:main']['weather'][None]
102    >>> ### Return a list instead:
103    >>> get_pipes(as_list=True)
104    [sql_main_weather]
105    >>> 
106    ```
107    """
108
109    from meerschaum.config import get_config
110    from meerschaum.utils.warnings import error
111    from meerschaum.utils.misc import filter_keywords
112
113    if connector_keys is None:
114        connector_keys = []
115    if metric_keys is None:
116        metric_keys = []
117    if location_keys is None:
118        location_keys = []
119    if params is None:
120        params = {}
121    if tags is None:
122        tags = []
123
124    if isinstance(connector_keys, str):
125        connector_keys = [connector_keys]
126    if isinstance(metric_keys, str):
127        metric_keys = [metric_keys]
128    if isinstance(location_keys, str):
129        location_keys = [location_keys]
130
131    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
132    if mrsm_instance is None:
133        mrsm_instance = instance
134    if mrsm_instance is None:
135        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
136    if isinstance(mrsm_instance, str):
137        from meerschaum.connectors.parse import parse_instance_keys
138        connector = parse_instance_keys(keys=mrsm_instance, debug=debug)
139    else:
140        from meerschaum.connectors import instance_types
141        valid_connector = False
142        if hasattr(mrsm_instance, 'type'):
143            if mrsm_instance.type in instance_types:
144                valid_connector = True
145        if not valid_connector:
146            error(f"Invalid instance connector: {mrsm_instance}")
147        connector = mrsm_instance
148    if debug:
149        from meerschaum.utils.debug import dprint
150        dprint(f"Using instance connector: {connector}")
151    if not connector:
152        error(f"Could not create connector from keys: '{mrsm_instance}'")
153
154    ### Get a list of tuples for the keys needed to build pipes.
155    result = fetch_pipes_keys(
156        method,
157        connector,
158        connector_keys = connector_keys,
159        metric_keys = metric_keys,
160        location_keys = location_keys,
161        tags = tags,
162        params = params,
163        debug = debug
164    )
165    if result is None:
166        error(f"Unable to build pipes!")
167
168    ### Populate the `pipes` dictionary with Pipes based on the keys
169    ### obtained from the chosen `method`.
170    from meerschaum import Pipe
171    pipes = {}
172    for ck, mk, lk in result:
173        if ck not in pipes:
174            pipes[ck] = {}
175
176        if mk not in pipes[ck]:
177            pipes[ck][mk] = {}
178
179        pipes[ck][mk][lk] = Pipe(
180            ck, mk, lk,
181            mrsm_instance = connector,
182            debug = debug,
183            **filter_keywords(Pipe, **kw)
184        )
185
186    if not as_list:
187        return pipes
188    from meerschaum.utils.misc import flatten_pipes_dict
189    return flatten_pipes_dict(pipes)

Return a dictionary or list of meerschaum.Pipe objects.

Parameters
  • connector_keys (Union[str, List[str], None], default None): String or list of connector keys. If omitted or is '*', fetch all possible keys. If a string begins with '_', select keys that do NOT match the string.
  • metric_keys (Union[str, List[str], None], default None): String or list of metric keys. See connector_keys for formatting.
  • location_keys (Union[str, List[str], None], default None): String or list of location keys. See connector_keys for formatting.
  • tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. Params are parsed into a SQL WHERE clause. E.g. {'a': 1, 'b': 2} equates to 'WHERE a = 1 AND b = 2'
  • mrsm_instance (Union[str, InstanceConnector, None], default None): Connector keys for the Meerschaum instance of the pipes. Must be a meerschaum.connectors.sql.SQLConnector.SQLConnector or meerschaum.connectors.api.APIConnector.APIConnector.
  • as_list (bool, default False): If True, return pipes in a list instead of a hierarchical dictionary. False : {connector_keys: {metric_key: {location_key: Pipe}}} True : [Pipe]
  • method (str, default 'registered'): Available options: ['registered', 'explicit', 'all']. If 'registered' (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depending on mrsm_instance). If 'explicit', create pipes from the provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table; useful for creating non-existent pipes. If 'all', create pipes from predefined metrics and locations. Requires connector_keys. NOTE: Method 'all' is not implemented!
  • **kw (Any): Keyword arguments to pass to the meerschaum.Pipe constructor.
Returns
  • A dictionary of dictionaries and meerschaum.Pipe objects in the connector, metric, location hierarchy. If as_list is True, return a list of meerschaum.Pipe objects.
Examples
>>> ### Manual definition:
>>> pipes = {
...     <connector_keys>: {
...         <metric_key>: {
...             <location_key>: Pipe(
...                 <connector_keys>,
...                 <metric_key>,
...                 <location_key>,
...             ),
...         },
...     },
... },
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[sql_main_weather]
>>> 
def get_connector( type: str = None, label: str = None, refresh: bool = False, debug: bool = False, **kw: Any) -> Connector:
 80def get_connector(
 81    type: str = None,
 82    label: str = None,
 83    refresh: bool = False,
 84    debug: bool = False,
 85    **kw: Any
 86) -> Connector:
 87    """
 88    Return existing connector or create new connection and store for reuse.
 89    
 90    You can create new connectors if enough parameters are provided for the given type and flavor.
 91    
 92
 93    Parameters
 94    ----------
 95    type: Optional[str], default None
 96        Connector type (sql, api, etc.).
 97        Defaults to the type of the configured `instance_connector`.
 98
 99    label: Optional[str], default None
100        Connector label (e.g. main). Defaults to `'main'`.
101
102    refresh: bool, default False
103        Refresh the Connector instance / construct new object. Defaults to `False`.
104
105    kw: Any
106        Other arguments to pass to the Connector constructor.
107        If the Connector has already been constructed and new arguments are provided,
108        `refresh` is set to `True` and the old Connector is replaced.
109
110    Returns
111    -------
112    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
113    `meerschaum.connectors.sql.SQLConnector`).
114    
115    Examples
116    --------
117    The following parameters would create a new
118    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.
119
120    ```
121    >>> conn = get_connector(
122    ...     type = 'sql',
123    ...     label = 'newlabel',
124    ...     flavor = 'sqlite',
125    ...     database = '/file/path/to/database.db'
126    ... )
127    >>>
128    ```
129
130    """
131    from meerschaum.connectors.parse import parse_instance_keys
132    from meerschaum.config import get_config
133    from meerschaum.config.static import STATIC_CONFIG
134    from meerschaum.utils.warnings import warn
135    global _loaded_plugin_connectors
136    if isinstance(type, str) and not label and ':' in type:
137        type, label = type.split(':', maxsplit=1)
138
139    with _locks['_loaded_plugin_connectors']:
140        if not _loaded_plugin_connectors:
141            load_plugin_connectors()
142            _load_builtin_custom_connectors()
143            _loaded_plugin_connectors = True
144
145    if type is None and label is None:
146        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
147        ### recursive call to get_connector
148        return parse_instance_keys(default_instance_keys)
149
150    ### NOTE: the default instance connector may not be main.
151    ### Only fall back to 'main' if the type is provided but the label is omitted.
152    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']
153
154    ### type might actually be a label. Check if so and raise a warning.
155    if type not in connectors:
156        possibilities, poss_msg = [], ""
157        for _type in get_config('meerschaum', 'connectors'):
158            if type in get_config('meerschaum', 'connectors', _type):
159                possibilities.append(f"{_type}:{type}")
160        if len(possibilities) > 0:
161            poss_msg = " Did you mean"
162            for poss in possibilities[:-1]:
163                poss_msg += f" '{poss}',"
164            if poss_msg.endswith(','):
165                poss_msg = poss_msg[:-1]
166            if len(possibilities) > 1:
167                poss_msg += " or"
168            poss_msg += f" '{possibilities[-1]}'?"
169
170        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
171        return None
172
173    if 'sql' not in types:
174        from meerschaum.connectors.plugin import PluginConnector
175        from meerschaum.connectors.valkey import ValkeyConnector
176        with _locks['types']:
177            types.update({
178                'api': APIConnector,
179                'sql': SQLConnector,
180                'plugin': PluginConnector,
181                'valkey': ValkeyConnector,
182            })
183
184    ### determine if we need to call the constructor
185    if not refresh:
186        ### see if any user-supplied arguments differ from the existing instance
187        if label in connectors[type]:
188            warning_message = None
189            for attribute, value in kw.items():
190                if attribute not in connectors[type][label].meta:
191                    import inspect
192                    cls = connectors[type][label].__class__
193                    cls_init_signature = inspect.signature(cls)
194                    cls_init_params = cls_init_signature.parameters
195                    if attribute not in cls_init_params:
196                        warning_message = (
197                            f"Received new attribute '{attribute}' not present in connector " +
198                            f"{connectors[type][label]}.\n"
199                        )
200                elif connectors[type][label].__dict__[attribute] != value:
201                    warning_message = (
202                        f"Mismatched values for attribute '{attribute}' in connector "
203                        + f"'{connectors[type][label]}'.\n" +
204                        f"  - Keyword value: '{value}'\n" +
205                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
206                    )
207            if warning_message is not None:
208                warning_message += (
209                    "\nSetting `refresh` to True and recreating connector with type:"
210                    + f" '{type}' and label '{label}'."
211                )
212                refresh = True
213                warn(warning_message)
214        else: ### connector doesn't yet exist
215            refresh = True
216
217    ### only create an object if refresh is True
218    ### (can be manually specified, otherwise determined above)
219    if refresh:
220        with _locks['connectors']:
221            try:
222                ### will raise an error if configuration is incorrect / missing
223                conn = types[type](label=label, **kw)
224                connectors[type][label] = conn
225            except InvalidAttributesError as ie:
226                warn(
227                    f"Incorrect attributes for connector '{type}:{label}'.\n"
228                    + str(ie),
229                    stack = False,
230                )
231                conn = None
232            except Exception as e:
233                from meerschaum.utils.formatting import get_console
234                console = get_console()
235                if console:
236                    console.print_exception()
237                warn(
238                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
239                    stack = False,
240                )
241                conn = None
242        if conn is None:
243            return None
244
245    return connectors[type][label]

Return existing connector or create new connection and store for reuse.

You can create new connectors if enough parameters are provided for the given type and flavor.

Parameters
  • type (Optional[str], default None): Connector type (sql, api, etc.). Defaults to the type of the configured instance_connector.
  • label (Optional[str], default None): Connector label (e.g. main). Defaults to 'main'.
  • refresh (bool, default False): Refresh the Connector instance / construct new object. Defaults to False.
  • kw (Any): Other arguments to pass to the Connector constructor. If the Connector has already been constructed and new arguments are provided, refresh is set to True and the old Connector is replaced.
Returns
  • A new Meerschaum connector (e.g. meerschaum.connectors.api.APIConnector, meerschaum.connectors.sql.SQLConnector).
Examples

The following parameters would create a new meerschaum.connectors.sql.SQLConnector that isn't in the configuration file.

>>> conn = get_connector(
...     type = 'sql',
...     label = 'newlabel',
...     flavor = 'sqlite',
...     database = '/file/path/to/database.db'
... )
>>>
def get_config( *keys: str, patch: bool = True, substitute: bool = True, sync_files: bool = True, write_missing: bool = True, as_tuple: bool = False, warn: bool = True, debug: bool = False) -> Any:
 82def get_config(
 83    *keys: str,
 84    patch: bool = True,
 85    substitute: bool = True,
 86    sync_files: bool = True,
 87    write_missing: bool = True,
 88    as_tuple: bool = False,
 89    warn: bool = True,
 90    debug: bool = False
 91) -> Any:
 92    """
 93    Return the Meerschaum configuration dictionary.
 94    If positional arguments are provided, index by the keys.
 95    Raises a warning if invalid keys are provided.
 96
 97    Parameters
 98    ----------
 99    keys: str:
100        List of strings to index.
101
102    patch: bool, default True
103        If `True`, patch missing default keys into the config directory.
104        Defaults to `True`.
105
106    sync_files: bool, default True
107        If `True`, sync files if needed.
108        Defaults to `True`.
109
110    write_missing: bool, default True
111        If `True`, write default values when the main config files are missing.
112        Defaults to `True`.
113
114    substitute: bool, default True
115        If `True`, substitute 'MRSM{}' values.
116        Defaults to `True`.
117
118    as_tuple: bool, default False
119        If `True`, return a tuple of type (success, value).
120        Defaults to `False`.
121        
122    Returns
123    -------
124    The value in the configuration directory, indexed by the provided keys.
125
126    Examples
127    --------
128    >>> get_config('meerschaum', 'instance')
129    'sql:main'
130    >>> get_config('does', 'not', 'exist')
131    UserWarning: Invalid keys in config: ('does', 'not', 'exist')
132    """
133    import json
134
135    symlinks_key = STATIC_CONFIG['config']['symlinks_key']
136    if debug:
137        from meerschaum.utils.debug import dprint
138        dprint(f"Indexing keys: {keys}", color=False)
139
140    if len(keys) == 0:
141        _rc = _config(substitute=substitute, sync_files=sync_files, write_missing=write_missing)
142        if as_tuple:
143            return True, _rc 
144        return _rc
145    
146    ### Weird threading issues, only import if substitute is True.
147    if substitute:
148        from meerschaum.config._read_config import search_and_substitute_config
149    ### Invalidate the cache if it was read before with substitute=False
150    ### but there still exist substitutions.
151    if (
152        config is not None and substitute and keys[0] != symlinks_key
153        and 'MRSM{' in json.dumps(config.get(keys[0]))
154    ):
155        try:
156            _subbed = search_and_substitute_config({keys[0]: config[keys[0]]})
157        except Exception as e:
158            import traceback
159            traceback.print_exc()
160        config[keys[0]] = _subbed[keys[0]]
161        if symlinks_key in _subbed:
162            if symlinks_key not in config:
163                config[symlinks_key] = {}
164            if keys[0] not in config[symlinks_key]:
165                config[symlinks_key][keys[0]] = {}
166            config[symlinks_key][keys[0]] = apply_patch_to_config(
167                _subbed,
168                config[symlinks_key][keys[0]]
169            )
170
171    from meerschaum.config._sync import sync_files as _sync_files
172    if config is None:
173        _config(*keys, sync_files=sync_files)
174
175    invalid_keys = False
176    if keys[0] not in config and keys[0] != symlinks_key:
177        single_key_config = read_config(
178            keys=[keys[0]], substitute=substitute, write_missing=write_missing
179        )
180        if keys[0] not in single_key_config:
181            invalid_keys = True
182        else:
183            config[keys[0]] = single_key_config.get(keys[0], None)
184            if symlinks_key in single_key_config and keys[0] in single_key_config[symlinks_key]:
185                if symlinks_key not in config:
186                    config[symlinks_key] = {}
187                config[symlinks_key][keys[0]] = single_key_config[symlinks_key][keys[0]]
188
189            if sync_files:
190                _sync_files(keys=[keys[0]])
191
192    c = config
193    if len(keys) > 0:
194        for k in keys:
195            try:
196                c = c[k]
197            except Exception as e:
198                invalid_keys = True
199                break
200        if invalid_keys:
201            ### Check if the keys are in the default configuration.
202            from meerschaum.config._default import default_config
203            in_default = True
204            patched_default_config = (
205                search_and_substitute_config(default_config)
206                if substitute else copy.deepcopy(default_config)
207            )
208            _c = patched_default_config
209            for k in keys:
210                try:
211                    _c = _c[k]
212                except Exception as e:
213                    in_default = False
214            if in_default:
215                c = _c
216                invalid_keys = False
217            warning_msg = f"Invalid keys in config: {keys}"
218            if not in_default:
219                try:
220                    if warn:
221                        from meerschaum.utils.warnings import warn as _warn
222                        _warn(warning_msg, stacklevel=3, color=False)
223                except Exception as e:
224                    if warn:
225                        print(warning_msg)
226                if as_tuple:
227                    return False, None
228                return None
229
230            ### Don't write keys that we haven't yet loaded into memory.
231            not_loaded_keys = [k for k in patched_default_config if k not in config]
232            for k in not_loaded_keys:
233                patched_default_config.pop(k, None)
234
235            set_config(
236                apply_patch_to_config(
237                    patched_default_config,
238                    config,
239                )
240            )
241            if patch and keys[0] != symlinks_key:
242                if write_missing:
243                    write_config(config, debug=debug)
244
245    if as_tuple:
246        return (not invalid_keys), c
247    return c

Return the Meerschaum configuration dictionary. If positional arguments are provided, index by the keys. Raises a warning if invalid keys are provided.

Parameters
  • keys (str): List of strings to index.
  • patch (bool, default True): If True, patch missing default keys into the config directory. Defaults to True.
  • sync_files (bool, default True): If True, sync files if needed. Defaults to True.
  • write_missing (bool, default True): If True, write default values when the main config files are missing. Defaults to True.
  • substitute (bool, default True): If True, substitute 'MRSM{}' values. Defaults to True.
  • as_tuple (bool, default False): If True, return a tuple of type (success, value). Defaults to False.
Returns
  • The value in the configuration directory, indexed by the provided keys.
Examples
>>> get_config('meerschaum', 'instance')
'sql:main'
>>> get_config('does', 'not', 'exist')
UserWarning: Invalid keys in config: ('does', 'not', 'exist')
class Pipe:
 60class Pipe:
 61    """
 62    Access Meerschaum pipes via Pipe objects.
 63    
 64    Pipes are identified by the following:
 65
 66    1. Connector keys (e.g. `'sql:main'`)
 67    2. Metric key (e.g. `'weather'`)
 68    3. Location (optional; e.g. `None`)
 69    
 70    A pipe's connector keys correspond to a data source, and when the pipe is synced,
 71    its `fetch` definition is evaluated and executed to produce new data.
 72    
 73    Alternatively, new data may be directly synced via `pipe.sync()`:
 74    
 75    ```
 76    >>> from meerschaum import Pipe
 77    >>> pipe = Pipe('csv', 'weather')
 78    >>>
 79    >>> import pandas as pd
 80    >>> df = pd.read_csv('weather.csv')
 81    >>> pipe.sync(df)
 82    ```
 83    """
 84
 85    from ._fetch import (
 86        fetch,
 87        get_backtrack_interval,
 88    )
 89    from ._data import (
 90        get_data,
 91        get_backtrack_data,
 92        get_rowcount,
 93        _get_data_as_iterator,
 94        get_chunk_interval,
 95        get_chunk_bounds,
 96        get_chunk_bounds_batches,
 97        parse_date_bounds,
 98    )
 99    from ._register import register
100    from ._attributes import (
101        attributes,
102        parameters,
103        columns,
104        indices,
105        indexes,
106        dtypes,
107        autoincrement,
108        upsert,
109        static,
110        tzinfo,
111        enforce,
112        null_indices,
113        get_columns,
114        get_columns_types,
115        get_columns_indices,
116        get_indices,
117        tags,
118        get_id,
119        id,
120        get_val_column,
121        parents,
122        parent,
123        children,
124        target,
125        _target_legacy,
126        guess_datetime,
127    )
128    from ._show import show
129    from ._edit import edit, edit_definition, update
130    from ._sync import (
131        sync,
132        get_sync_time,
133        exists,
134        filter_existing,
135        _get_chunk_label,
136        get_num_workers,
137        _persist_new_json_columns,
138        _persist_new_numeric_columns,
139        _persist_new_uuid_columns,
140        _persist_new_bytes_columns,
141    )
142    from ._verify import (
143        verify,
144        get_bound_interval,
145        get_bound_time,
146    )
147    from ._delete import delete
148    from ._drop import drop, drop_indices
149    from ._index import create_indices
150    from ._clear import clear
151    from ._deduplicate import deduplicate
152    from ._bootstrap import bootstrap
153    from ._dtypes import enforce_dtypes, infer_dtypes
154    from ._copy import copy_to
155
156    def __init__(
157        self,
158        connector: str = '',
159        metric: str = '',
160        location: Optional[str] = None,
161        parameters: Optional[Dict[str, Any]] = None,
162        columns: Union[Dict[str, str], List[str], None] = None,
163        indices: Optional[Dict[str, Union[str, List[str]]]] = None,
164        tags: Optional[List[str]] = None,
165        target: Optional[str] = None,
166        dtypes: Optional[Dict[str, str]] = None,
167        instance: Optional[Union[str, InstanceConnector]] = None,
168        temporary: bool = False,
169        upsert: Optional[bool] = None,
170        autoincrement: Optional[bool] = None,
171        static: Optional[bool] = None,
172        enforce: Optional[bool] = None,
173        null_indices: Optional[bool] = None,
174        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
175        cache: bool = False,
176        debug: bool = False,
177        connector_keys: Optional[str] = None,
178        metric_key: Optional[str] = None,
179        location_key: Optional[str] = None,
180        instance_keys: Optional[str] = None,
181        indexes: Union[Dict[str, str], List[str], None] = None,
182    ):
183        """
184        Parameters
185        ----------
186        connector: str
187            Keys for the pipe's source connector, e.g. `'sql:main'`.
188
189        metric: str
190            Label for the pipe's contents, e.g. `'weather'`.
191
192        location: str, default None
193            Label for the pipe's location. Defaults to `None`.
194
195        parameters: Optional[Dict[str, Any]], default None
196            Optionally set a pipe's parameters from the constructor,
197            e.g. columns and other attributes.
198            You can edit these parameters with `edit pipes`.
199
200        columns: Union[Dict[str, str], List[str], None], default None
201            Set the `columns` dictionary of `parameters`.
202            If `parameters` is also provided, this dictionary is added under the `'columns'` key.
203
204        indices: Optional[Dict[str, Union[str, List[str]]]], default None
205            Set the `indices` dictionary of `parameters`.
206            If `parameters` is also provided, this dictionary is added under the `'indices'` key.
207
208        tags: Optional[List[str]], default None
209            A list of strings to be added under the `'tags'` key of `parameters`.
210            You can select pipes with certain tags using `--tags`.
211
212        dtypes: Optional[Dict[str, str]], default None
213            Set the `dtypes` dictionary of `parameters`.
214            If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.
215
216        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
217            Connector for the Meerschaum instance where the pipe resides.
218            Defaults to the preconfigured default instance (`'sql:main'`).
219
220        instance: Optional[Union[str, InstanceConnector]], default None
221            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.
222
223        upsert: Optional[bool], default None
224            If `True`, set `upsert` to `True` in the parameters.
225
226        autoincrement: Optional[bool], default None
227            If `True`, set `autoincrement` in the parameters.
228
229        static: Optional[bool], default None
230            If `True`, set `static` in the parameters.
231
232        enforce: Optional[bool], default None
233            If `False`, skip data type enforcement.
234            Default behavior is `True`.
235
236        null_indices: Optional[bool], default None
237            Set to `False` if there will be no null values in the index columns.
238            Defaults to `True`.
239
240        temporary: bool, default False
241            If `True`, prevent instance tables (pipes, users, plugins) from being created.
242
243        cache: bool, default False
244            If `True`, cache fetched data into a local database file.
245            Defaults to `False`.
246        """
247        from meerschaum.utils.warnings import error, warn
248        if (not connector and not connector_keys) or (not metric and not metric_key):
249            error(
250                "Please provide strings for the connector and metric\n    "
251                + "(first two positional arguments)."
252            )
253
254        ### Fall back to legacy `location_key` just in case.
255        if not location:
256            location = location_key
257
258        if not connector:
259            connector = connector_keys
260
261        if not metric:
262            metric = metric_key
263
264        if location in ('[None]', 'None'):
265            location = None
266
267        from meerschaum.config.static import STATIC_CONFIG
268        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
269        for k in (connector, metric, location, *(tags or [])):
270            if str(k).startswith(negation_prefix):
271                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")
272
273        self.connector_keys = str(connector)
274        self.connector_key = self.connector_keys ### Alias
275        self.metric_key = metric
276        self.location_key = location
277        self.temporary = temporary
278
279        self._attributes = {
280            'connector_keys': self.connector_keys,
281            'metric_key': self.metric_key,
282            'location_key': self.location_key,
283            'parameters': {},
284        }
285
286        ### only set parameters if values are provided
287        if isinstance(parameters, dict):
288            self._attributes['parameters'] = parameters
289        else:
290            if parameters is not None:
291                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
292            self._attributes['parameters'] = {}
293
294        columns = columns or self._attributes.get('parameters', {}).get('columns', {})
295        if isinstance(columns, list):
296            columns = {str(col): str(col) for col in columns}
297        if isinstance(columns, dict):
298            self._attributes['parameters']['columns'] = columns
299        elif columns is not None:
300            warn(f"The provided columns are of invalid type '{type(columns)}'.")
301
302        indices = (
303            indices
304            or indexes
305            or self._attributes.get('parameters', {}).get('indices', None)
306            or self._attributes.get('parameters', {}).get('indexes', None)
307        )
308        if isinstance(indices, dict):
309            indices_key = (
310                'indexes'
311                if 'indexes' in self._attributes['parameters']
312                else 'indices'
313            )
314            self._attributes['parameters'][indices_key] = indices
315
316        if isinstance(tags, (list, tuple)):
317            self._attributes['parameters']['tags'] = tags
318        elif tags is not None:
319            warn(f"The provided tags are of invalid type '{type(tags)}'.")
320
321        if isinstance(target, str):
322            self._attributes['parameters']['target'] = target
323        elif target is not None:
324            warn(f"The provided target is of invalid type '{type(target)}'.")
325
326        if isinstance(dtypes, dict):
327            self._attributes['parameters']['dtypes'] = dtypes
328        elif dtypes is not None:
329            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")
330
331        if isinstance(upsert, bool):
332            self._attributes['parameters']['upsert'] = upsert
333
334        if isinstance(autoincrement, bool):
335            self._attributes['parameters']['autoincrement'] = autoincrement
336
337        if isinstance(static, bool):
338            self._attributes['parameters']['static'] = static
339
340        if isinstance(enforce, bool):
341            self._attributes['parameters']['enforce'] = enforce
342
343        if isinstance(null_indices, bool):
344            self._attributes['parameters']['null_indices'] = null_indices
345
346        ### NOTE: The parameters dictionary is {} by default.
347        ###       A Pipe may be registered without parameters, then edited,
348        ###       or a Pipe may be registered with parameters set in-memory first.
349        _mrsm_instance = mrsm_instance if mrsm_instance is not None else (instance or instance_keys)
350        if _mrsm_instance is None:
351            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)
352
353        if not isinstance(_mrsm_instance, str):
354            self._instance_connector = _mrsm_instance
355            self.instance_keys = str(_mrsm_instance)
356        else: ### NOTE: must be SQL or API Connector for this to work
357            self.instance_keys = _mrsm_instance
358
359        self._cache = cache and get_config('system', 'experimental', 'cache')
360
361    @property
362    def meta(self):
363        """
364        Return the four keys needed to reconstruct this pipe.
365        """
366        return {
367            'connector_keys': self.connector_keys,
368            'metric_key': self.metric_key,
369            'location_key': self.location_key,
370            'instance_keys': self.instance_keys,
371        }
372
373    def keys(self) -> List[str]:
374        """
375        Return the ordered keys for this pipe.
376        """
377        return {
378            key: val
379            for key, val in self.meta.items()
380            if key != 'instance'
381        }
382
383    @property
384    def instance_connector(self) -> Union[InstanceConnector, None]:
385        """
386        The connector to where this pipe resides.
387        May either be of type `meerschaum.connectors.sql.SQLConnector` or
388        `meerschaum.connectors.api.APIConnector`.
389        """
390        if '_instance_connector' not in self.__dict__:
391            from meerschaum.connectors.parse import parse_instance_keys
392            conn = parse_instance_keys(self.instance_keys)
393            if conn:
394                self._instance_connector = conn
395            else:
396                return None
397        return self._instance_connector
398
399    @property
400    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
401        """
402        The connector to the data source.
403        """
404        if '_connector' not in self.__dict__:
405            from meerschaum.connectors.parse import parse_instance_keys
406            import warnings
407            with warnings.catch_warnings():
408                warnings.simplefilter('ignore')
409                try:
410                    conn = parse_instance_keys(self.connector_keys)
411                except Exception:
412                    conn = None
413            if conn:
414                self._connector = conn
415            else:
416                return None
417        return self._connector
418
419    @property
420    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
421        """
422        If the pipe was created with `cache=True`, return the connector to the pipe's
423        SQLite database for caching.
424        """
425        if not self._cache:
426            return None
427
428        if '_cache_connector' not in self.__dict__:
429            from meerschaum.connectors import get_connector
430            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
431            _resources_path = SQLITE_RESOURCES_PATH
432            self._cache_connector = get_connector(
433                'sql', '_cache_' + str(self),
434                flavor='sqlite',
435                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
436            )
437
438        return self._cache_connector
439
440    @property
441    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
442        """
443        If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
444        manage the local data.
445        """
446        if self.cache_connector is None:
447            return None
448        if '_cache_pipe' not in self.__dict__:
449            from meerschaum.config._patch import apply_patch_to_config
450            from meerschaum.utils.sql import sql_item_name
451            _parameters = copy.deepcopy(self.parameters)
452            _fetch_patch = {
453                'fetch': ({
454                    'definition': (
455                        "SELECT * FROM "
456                        + sql_item_name(
457                            str(self.target),
458                            self.instance_connector.flavor,
459                            self.instance_connector.get_pipe_schema(self),
460                        )
461                    ),
462                }) if self.instance_connector.type == 'sql' else ({
463                    'connector_keys': self.connector_keys,
464                    'metric_key': self.metric_key,
465                    'location_key': self.location_key,
466                })
467            }
468            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
469            self._cache_pipe = Pipe(
470                self.instance_keys,
471                (self.connector_keys + '_' + self.metric_key + '_cache'),
472                self.location_key,
473                mrsm_instance = self.cache_connector,
474                parameters = _parameters,
475                cache = False,
476                temporary = True,
477            )
478
479        return self._cache_pipe
480
481    def __str__(self, ansi: bool=False):
482        return pipe_repr(self, ansi=ansi)
483
484    def __eq__(self, other):
485        try:
486            return (
487                isinstance(self, type(other))
488                and self.connector_keys == other.connector_keys
489                and self.metric_key == other.metric_key
490                and self.location_key == other.location_key
491                and self.instance_keys == other.instance_keys
492            )
493        except Exception:
494            return False
495
496    def __hash__(self):
497        ### Using an esoteric separator to avoid collisions.
498        sep = "[\"']"
499        return hash(
500            str(self.connector_keys) + sep
501            + str(self.metric_key) + sep
502            + str(self.location_key) + sep
503            + str(self.instance_keys) + sep
504        )
505
506    def __repr__(self, ansi: bool=True, **kw) -> str:
507        if not hasattr(sys, 'ps1'):
508            ansi = False
509
510        return pipe_repr(self, ansi=ansi, **kw)
511
512    def __pt_repr__(self):
513        from meerschaum.utils.packages import attempt_import
514        prompt_toolkit_formatted_text = attempt_import('prompt_toolkit.formatted_text', lazy=False)
515        return prompt_toolkit_formatted_text.ANSI(pipe_repr(self, ansi=True))
516
517    def __getstate__(self) -> Dict[str, Any]:
518        """
519        Define the state dictionary (pickling).
520        """
521        return {
522            'connector_keys': self.connector_keys,
523            'metric_key': self.metric_key,
524            'location_key': self.location_key,
525            'parameters': self.parameters,
526            'instance_keys': self.instance_keys,
527        }
528
529    def __setstate__(self, _state: Dict[str, Any]):
530        """
531        Read the state (unpickling).
532        """
533        self.__init__(**_state)
534
535    def __getitem__(self, key: str) -> Any:
536        """
537        Index the pipe's attributes.
538        If the `key` cannot be found, return `None`.
539        """
540        if key in self.attributes:
541            return self.attributes.get(key, None)
542
543        aliases = {
544            'connector': 'connector_keys',
545            'connector_key': 'connector_keys',
546            'metric': 'metric_key',
547            'location': 'location_key',
548        }
549        aliased_key = aliases.get(key, None)
550        if aliased_key is not None:
551            return self.attributes.get(aliased_key, None)
552
553        property_aliases = {
554            'instance': 'instance_keys',
555            'instance_key': 'instance_keys',
556        }
557        aliased_key = property_aliases.get(key, None)
558        if aliased_key is not None:
559            key = aliased_key
560        return getattr(self, key, None)

Access Meerschaum pipes via Pipe objects.

Pipes are identified by the following:

  1. Connector keys (e.g. 'sql:main')
  2. Metric key (e.g. 'weather')
  3. Location (optional; e.g. None)

A pipe's connector keys correspond to a data source, and when the pipe is synced, its fetch definition is evaluated and executed to produce new data.

Alternatively, new data may be directly synced via pipe.sync():

>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)
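
Reading data back out follows the same pattern as the examples at the top of this page; here is a brief sketch using get_data() with optional date bounds (the begin/end filtering is assumed to behave as with sync()):

from datetime import datetime

df = pipe.get_data(
    begin=datetime(2024, 1, 1),
    end=datetime(2024, 2, 1),
)
print(df)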
270            if str(k).startswith(negation_prefix):
271                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")
272
273        self.connector_keys = str(connector)
274        self.connector_key = self.connector_keys ### Alias
275        self.metric_key = metric
276        self.location_key = location
277        self.temporary = temporary
278
279        self._attributes = {
280            'connector_keys': self.connector_keys,
281            'metric_key': self.metric_key,
282            'location_key': self.location_key,
283            'parameters': {},
284        }
285
286        ### only set parameters if values are provided
287        if isinstance(parameters, dict):
288            self._attributes['parameters'] = parameters
289        else:
290            if parameters is not None:
291                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
292            self._attributes['parameters'] = {}
293
294        columns = columns or self._attributes.get('parameters', {}).get('columns', {})
295        if isinstance(columns, list):
296            columns = {str(col): str(col) for col in columns}
297        if isinstance(columns, dict):
298            self._attributes['parameters']['columns'] = columns
299        elif columns is not None:
300            warn(f"The provided columns are of invalid type '{type(columns)}'.")
301
302        indices = (
303            indices
304            or indexes
305            or self._attributes.get('parameters', {}).get('indices', None)
306            or self._attributes.get('parameters', {}).get('indexes', None)
307        )
308        if isinstance(indices, dict):
309            indices_key = (
310                'indexes'
311                if 'indexes' in self._attributes['parameters']
312                else 'indices'
313            )
314            self._attributes['parameters'][indices_key] = indices
315
316        if isinstance(tags, (list, tuple)):
317            self._attributes['parameters']['tags'] = tags
318        elif tags is not None:
319            warn(f"The provided tags are of invalid type '{type(tags)}'.")
320
321        if isinstance(target, str):
322            self._attributes['parameters']['target'] = target
323        elif target is not None:
324            warn(f"The provided target is of invalid type '{type(target)}'.")
325
326        if isinstance(dtypes, dict):
327            self._attributes['parameters']['dtypes'] = dtypes
328        elif dtypes is not None:
329            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")
330
331        if isinstance(upsert, bool):
332            self._attributes['parameters']['upsert'] = upsert
333
334        if isinstance(autoincrement, bool):
335            self._attributes['parameters']['autoincrement'] = autoincrement
336
337        if isinstance(static, bool):
338            self._attributes['parameters']['static'] = static
339
340        if isinstance(enforce, bool):
341            self._attributes['parameters']['enforce'] = enforce
342
343        if isinstance(null_indices, bool):
344            self._attributes['parameters']['null_indices'] = null_indices
345
346        ### NOTE: The parameters dictionary is {} by default.
347        ###       A Pipe may be registered without parameters, then edited,
348        ###       or a Pipe may be registered with parameters set in-memory first.
349        _mrsm_instance = mrsm_instance if mrsm_instance is not None else (instance or instance_keys)
350        if _mrsm_instance is None:
351            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)
352
353        if not isinstance(_mrsm_instance, str):
354            self._instance_connector = _mrsm_instance
355            self.instance_keys = str(_mrsm_instance)
356        else: ### NOTE: must be a SQL or API Connector for this to work
357            self.instance_keys = _mrsm_instance
358
359        self._cache = cache and get_config('system', 'experimental', 'cache')
Parameters
  • connector (str): Keys for the pipe's source connector, e.g. 'sql:main'.
  • metric (str): Label for the pipe's contents, e.g. 'weather'.
  • location (str, default None): Label for the pipe's location. Defaults to None.
  • parameters (Optional[Dict[str, Any]], default None): Optionally set a pipe's parameters from the constructor, e.g. columns and other attributes. You can edit these parameters with edit pipes.
  • columns (Union[Dict[str, str], List[str], None], default None): Set the columns dictionary of parameters. If parameters is also provided, this dictionary is added under the 'columns' key.
  • indices (Optional[Dict[str, Union[str, List[str]]]], default None): Set the indices dictionary of parameters. If parameters is also provided, this dictionary is added under the 'indices' key.
  • tags (Optional[List[str]], default None): A list of strings to be added under the 'tags' key of parameters. You can select pipes with certain tags using --tags.
  • dtypes (Optional[Dict[str, str]], default None): Set the dtypes dictionary of parameters. If parameters is also provided, this dictionary is added under the 'dtypes' key.
  • mrsm_instance (Optional[Union[str, InstanceConnector]], default None): Connector for the Meerschaum instance where the pipe resides. Defaults to the preconfigured default instance ('sql:main').
  • instance (Optional[Union[str, InstanceConnector]], default None): Alias for mrsm_instance. If mrsm_instance is supplied, this value is ignored.
  • upsert (Optional[bool], default None): If True, set upsert to True in the parameters.
  • autoincrement (Optional[bool], default None): If True, set autoincrement in the parameters.
  • static (Optional[bool], default None): If True, set static in the parameters.
  • enforce (Optional[bool], default None): If False, skip data type enforcement. Default behavior is True.
  • null_indices (Optional[bool], default None): Set to False if there will be no null values in the index columns. Defaults to True.
  • temporary (bool, default False): If True, prevent instance tables (pipes, users, plugins) from being created.
  • cache (bool, default False): If True, cache fetched data into a local database file. Defaults to False.
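For illustration, the optional constructor arguments above map directly onto keys of the parameters dictionary. The connector keys, column names, and target name below are hypothetical; this is only a sketch:

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'sql:main', 'weather', 'atl',    # connector keys, metric, location
    target='weather_atl',            # override the default target table name
    columns={'datetime': 'timestamp', 'id': 'station'},
    dtypes={'timestamp': 'datetime64[ns, UTC]', 'station': 'string'},
    upsert=True,                     # stored under parameters['upsert']
    temporary=True,                  # skip the instance's pipes table
)
print(pipe.parameters)
# {'columns': {'datetime': 'timestamp', 'id': 'station'}, 'target': 'weather_atl', 'dtypes': {...}, 'upsert': True}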
connector_keys
connector_key
metric_key
location_key
temporary
meta
361    @property
362    def meta(self):
363        """
364        Return the four keys needed to reconstruct this pipe.
365        """
366        return {
367            'connector_keys': self.connector_keys,
368            'metric_key': self.metric_key,
369            'location_key': self.location_key,
370            'instance_keys': self.instance_keys,
371        }

Return the four keys needed to reconstruct this pipe.
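For example, because meta maps onto the constructor's keyword arguments, it may be splatted back in to rebuild an equivalent pipe (a sketch; instance_keys reflects whichever instance is configured):

import meerschaum as mrsm

pipe = mrsm.Pipe('sql:main', 'weather')
print(pipe.meta)
# e.g. {'connector_keys': 'sql:main', 'metric_key': 'weather', 'location_key': None, 'instance_keys': 'sql:main'}

rebuilt_pipe = mrsm.Pipe(**pipe.meta)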

def keys(self) -> List[str]:
373    def keys(self) -> List[str]:
374        """
375        Return the ordered keys for this pipe.
376        """
377        return {
378            key: val
379            for key, val in self.meta.items()
380            if key != 'instance'
381        }

Return the ordered keys for this pipe.

instance_connector: Union[meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType]
383    @property
384    def instance_connector(self) -> Union[InstanceConnector, None]:
385        """
386        The connector to where this pipe resides.
387        May either be of type `meerschaum.connectors.sql.SQLConnector` or
388        `meerschaum.connectors.api.APIConnector`.
389        """
390        if '_instance_connector' not in self.__dict__:
391            from meerschaum.connectors.parse import parse_instance_keys
392            conn = parse_instance_keys(self.instance_keys)
393            if conn:
394                self._instance_connector = conn
395            else:
396                return None
397        return self._instance_connector

The connector to where this pipe resides. May either be of type meerschaum.connectors.sql.SQLConnector or meerschaum.connectors.api.APIConnector.

connector: Optional[Connector]
399    @property
400    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
401        """
402        The connector to the data source.
403        """
404        if '_connector' not in self.__dict__:
405            from meerschaum.connectors.parse import parse_instance_keys
406            import warnings
407            with warnings.catch_warnings():
408                warnings.simplefilter('ignore')
409                try:
410                    conn = parse_instance_keys(self.connector_keys)
411                except Exception:
412                    conn = None
413            if conn:
414                self._connector = conn
415            else:
416                return None
417        return self._connector

The connector to the data source.
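A short sketch of the distinction between the two connectors, assuming both sets of keys resolve to configured connectors:

import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')
print(pipe.instance_connector)    # where the pipe's rows are stored, e.g. a SQLConnector
print(pipe.connector)             # the data source resolved from the connector keys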

cache_connector: Optional[meerschaum.connectors.SQLConnector]
419    @property
420    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
421        """
422        If the pipe was created with `cache=True`, return the connector to the pipe's
423        SQLite database for caching.
424        """
425        if not self._cache:
426            return None
427
428        if '_cache_connector' not in self.__dict__:
429            from meerschaum.connectors import get_connector
430            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
431            _resources_path = SQLITE_RESOURCES_PATH
432            self._cache_connector = get_connector(
433                'sql', '_cache_' + str(self),
434                flavor='sqlite',
435                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
436            )
437
438        return self._cache_connector

If the pipe was created with cache=True, return the connector to the pipe's SQLite database for caching.

cache_pipe: Optional[Pipe]
440    @property
441    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
442        """
443        If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
444        manage the local data.
445        """
446        if self.cache_connector is None:
447            return None
448        if '_cache_pipe' not in self.__dict__:
449            from meerschaum.config._patch import apply_patch_to_config
450            from meerschaum.utils.sql import sql_item_name
451            _parameters = copy.deepcopy(self.parameters)
452            _fetch_patch = {
453                'fetch': ({
454                    'definition': (
455                        "SELECT * FROM "
456                        + sql_item_name(
457                            str(self.target),
458                            self.instance_connector.flavor,
459                            self.instance_connector.get_pipe_schema(self),
460                        )
461                    ),
462                }) if self.instance_connector.type == 'sql' else ({
463                    'connector_keys': self.connector_keys,
464                    'metric_key': self.metric_key,
465                    'location_key': self.location_key,
466                })
467            }
468            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
469            self._cache_pipe = Pipe(
470                self.instance_keys,
471                (self.connector_keys + '_' + self.metric_key + '_cache'),
472                self.location_key,
473                mrsm_instance = self.cache_connector,
474                parameters = _parameters,
475                cache = False,
476                temporary = True,
477            )
478
479        return self._cache_pipe

If the pipe was created with cache=True, return another meerschaum.Pipe used to manage the local data.
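Caching is gated behind the experimental system:experimental:cache configuration flag, so both properties return None unless it is enabled. A minimal sketch:

import meerschaum as mrsm

pipe = mrsm.Pipe('sql:main', 'weather', cache=True)
print(pipe.cache_connector)    # None unless the experimental cache setting is enabled
print(pipe.cache_pipe)         # likewise None without a cache connector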

def fetch( self, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, NoneType] = None, check_existing: bool = True, sync_chunks: bool = False, debug: bool = False, **kw: Any) -> Union[pandas.core.frame.DataFrame, Iterator[pandas.core.frame.DataFrame], NoneType]:
21def fetch(
22    self,
23    begin: Union[datetime, int, str, None] = '',
24    end: Union[datetime, int, None] = None,
25    check_existing: bool = True,
26    sync_chunks: bool = False,
27    debug: bool = False,
28    **kw: Any
29) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]:
30    """
31    Fetch a Pipe's latest data from its connector.
32
33    Parameters
34    ----------
35    begin: Union[datetime, str, None], default ''
36        If provided, only fetch data newer than or equal to `begin`.
37
38    end: Optional[datetime], default None
39        If provided, only fetch data older than or equal to `end`.
40
41    check_existing: bool, default True
42        If `False`, do not apply the backtrack interval.
43
44    sync_chunks: bool, default False
45        If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks
46        as the fetch loads them into memory.
47
48    debug: bool, default False
49        Verbosity toggle.
50
51    Returns
52    -------
53    A `pd.DataFrame` of the newest unseen data.
54
55    """
56    if 'fetch' not in dir(self.connector):
57        warn(f"No `fetch()` function defined for connector '{self.connector}'")
58        return None
59
60    from meerschaum.connectors import get_connector_plugin
61    from meerschaum.utils.misc import filter_arguments
62
63    _chunk_hook = kw.pop('chunk_hook', None)
64    kw['workers'] = self.get_num_workers(kw.get('workers', None))
65    if sync_chunks and _chunk_hook is None:
66
67        def _chunk_hook(chunk, **_kw) -> SuccessTuple:
68            """
69            Wrap `Pipe.sync()` with a custom chunk label prepended to the message.
70            """
71            from meerschaum.config._patch import apply_patch_to_config
72            kwargs = apply_patch_to_config(kw, _kw)
73            chunk_success, chunk_message = self.sync(chunk, **kwargs)
74            chunk_label = self._get_chunk_label(chunk, self.columns.get('datetime', None))
75            if chunk_label:
76                chunk_message = '\n' + chunk_label + '\n' + chunk_message
77            return chunk_success, chunk_message
78
79    begin, end = self.parse_date_bounds(begin, end)
80
81    with mrsm.Venv(get_connector_plugin(self.connector)):
82        _args, _kwargs = filter_arguments(
83            self.connector.fetch,
84            self,
85            begin=_determine_begin(
86                self,
87                begin,
88                end,
89                check_existing=check_existing,
90                debug=debug,
91            ),
92            end=end,
93            chunk_hook=_chunk_hook,
94            debug=debug,
95            **kw
96        )
97        df = self.connector.fetch(*_args, **_kwargs)
98    return df

Fetch a Pipe's latest data from its connector.

Parameters
  • begin (Union[datetime, str, None], default ''): If provided, only fetch data newer than or equal to begin.
  • end (Optional[datetime], default None): If provided, only fetch data older than or equal to end.
  • check_existing (bool, default True): If False, do not apply the backtrack interval.
  • sync_chunks (bool, default False): If True and the pipe's connector is of type 'sql', begin syncing chunks as the fetch loads them into memory.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame of the newest unseen data.
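As a sketch, fetch() may be called directly to preview what a sync would pull, assuming pipe is an existing meerschaum.Pipe whose connector defines fetch():

from datetime import datetime, timezone

# Only request rows at or after `begin`; the backtrack interval is applied automatically.
df = pipe.fetch(begin=datetime(2024, 1, 1, tzinfo=timezone.utc))
if df is not None:
    pipe.sync(df)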
def get_backtrack_interval( self, check_existing: bool = True, debug: bool = False) -> Union[datetime.timedelta, int]:
101def get_backtrack_interval(
102    self,
103    check_existing: bool = True,
104    debug: bool = False,
105) -> Union[timedelta, int]:
106    """
107    Get the backtrack interval to use for this pipe.
108
109    Parameters
110    ----------
111    check_existing: bool, default True
112        If `False`, return a backtrack_interval of 0 minutes.
113
114    Returns
115    -------
116    The backtrack interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
117    """
118    default_backtrack_minutes = get_config('pipes', 'parameters', 'fetch', 'backtrack_minutes')
119    configured_backtrack_minutes = self.parameters.get('fetch', {}).get('backtrack_minutes', None)
120    backtrack_minutes = (
121        configured_backtrack_minutes
122        if configured_backtrack_minutes is not None
123        else default_backtrack_minutes
124    ) if check_existing else 0
125
126    backtrack_interval = timedelta(minutes=backtrack_minutes)
127    dt_col = self.columns.get('datetime', None)
128    if dt_col is None:
129        return backtrack_interval
130
131    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns, UTC]')
132    if 'int' in dt_dtype.lower():
133        return backtrack_minutes
134
135    return backtrack_interval

Get the backtrack interval to use for this pipe.

Parameters
  • check_existing (bool, default True): If False, return a backtrack_interval of 0 minutes.
Returns
  • The backtrack interval (timedelta or int) to use with this pipe's datetime axis.
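For example, the backtrack window may be set under the pipe's fetch parameters (a sketch; the 180-minute value is arbitrary):

pipe.parameters['fetch'] = {'backtrack_minutes': 180}
print(pipe.get_backtrack_interval())
# timedelta(minutes=180) for a timestamp axis (or 180 if the datetime axis is an integer)
print(pipe.get_backtrack_interval(check_existing=False))
# timedelta(0), i.e. no backtracking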
def get_data( self, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, int, str, NoneType] = None, end: Union[datetime.datetime, int, str, NoneType] = None, params: Optional[Dict[str, Any]] = None, as_iterator: bool = False, as_chunks: bool = False, as_dask: bool = False, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, order: Optional[str] = 'asc', limit: Optional[int] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> Union[pandas.core.frame.DataFrame, Iterator[pandas.core.frame.DataFrame], NoneType]:
 23def get_data(
 24    self,
 25    select_columns: Optional[List[str]] = None,
 26    omit_columns: Optional[List[str]] = None,
 27    begin: Union[datetime, int, str, None] = None,
 28    end: Union[datetime, int, str, None] = None,
 29    params: Optional[Dict[str, Any]] = None,
 30    as_iterator: bool = False,
 31    as_chunks: bool = False,
 32    as_dask: bool = False,
 33    chunk_interval: Union[timedelta, int, None] = None,
 34    order: Optional[str] = 'asc',
 35    limit: Optional[int] = None,
 36    fresh: bool = False,
 37    debug: bool = False,
 38    **kw: Any
 39) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]:
 40    """
 41    Get a pipe's data from the instance connector.
 42
 43    Parameters
 44    ----------
 45    select_columns: Optional[List[str]], default None
 46        If provided, only select these given columns.
 47        Otherwise select all available columns (i.e. `SELECT *`).
 48
 49    omit_columns: Optional[List[str]], default None
 50        If provided, remove these columns from the selection.
 51
 52    begin: Union[datetime, int, str, None], default None
 53        Lower bound datetime to begin searching for data (inclusive).
 54        Translates to a `WHERE` clause like `WHERE datetime >= begin`.
 55        Defaults to `None`.
 56
 57    end: Union[datetime, int, str, None], default None
 58        Upper bound datetime to stop searching for data (inclusive).
 59        Translates to a `WHERE` clause like `WHERE datetime < end`.
 60        Defaults to `None`.
 61
 62    params: Optional[Dict[str, Any]], default None
 63        Filter the retrieved data by a dictionary of parameters.
 64        See `meerschaum.utils.sql.build_where` for more details. 
 65
 66    as_iterator: bool, default False
 67        If `True`, return a generator of chunks of pipe data.
 68
 69    as_chunks: bool, default False
 70        Alias for `as_iterator`.
 71
 72    as_dask: bool, default False
 73        If `True`, return a `dask.DataFrame`
 74        (which may be loaded into a Pandas DataFrame with `df.compute()`).
 75
 76    chunk_interval: Union[timedelta, int, None], default None
 77        If `as_iterator`, then return chunks with `begin` and `end` separated by this interval.
 78        This may be set under `pipe.parameters['chunk_minutes']`.
 79        By default, use a timedelta of 1440 minutes (1 day).
 80        If `chunk_interval` is an integer and the `datetime` axis a timestamp,
 81        then use a timedelta with this many minutes.
 82        If the `datetime` axis is an integer, default to the configured chunksize.
 83        If `chunk_interval` is a `timedelta` and the `datetime` axis an integer,
 84        use the number of minutes in the `timedelta`.
 85
 86    order: Optional[str], default 'asc'
 87        If `order` is not `None`, sort the resulting dataframe by indices.
 88
 89    limit: Optional[int], default None
 90        If provided, cap the dataframe to this many rows.
 91
 92    fresh: bool, default False
 93        If `True`, skip local cache and directly query the instance connector.
 94        Defaults to `False`.
 95
 96    debug: bool, default False
 97        Verbosity toggle.
 98        Defaults to `False`.
 99
100    Returns
101    -------
102    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.
103
104    """
105    from meerschaum.utils.warnings import warn
106    from meerschaum.utils.venv import Venv
107    from meerschaum.connectors import get_connector_plugin
108    from meerschaum.utils.misc import iterate_chunks, items_str
109    from meerschaum.utils.dtypes import to_pandas_dtype, coerce_timezone
110    from meerschaum.utils.dataframe import add_missing_cols_to_df, df_is_chunk_generator
111    from meerschaum.utils.packages import attempt_import
112    dd = attempt_import('dask.dataframe') if as_dask else None
113    dask = attempt_import('dask') if as_dask else None
114    dateutil_parser = attempt_import('dateutil.parser')
115
116    if select_columns == '*':
117        select_columns = None
118    elif isinstance(select_columns, str):
119        select_columns = [select_columns]
120
121    if isinstance(omit_columns, str):
122        omit_columns = [omit_columns]
123
124    begin, end = self.parse_date_bounds(begin, end)
125    as_iterator = as_iterator or as_chunks
126    dt_col = self.columns.get('datetime', None)
127
128    def _sort_df(_df):
129        if df_is_chunk_generator(_df):
130            return _df
131        indices = [] if dt_col not in _df.columns else [dt_col]
132        non_dt_cols = [
133            col
134            for col_ix, col in self.columns.items()
135            if col_ix != 'datetime' and col in _df.columns
136        ]
137        indices.extend(non_dt_cols)
138        if 'dask' not in _df.__module__:
139            _df.sort_values(
140                by=indices,
141                inplace=True,
142                ascending=(str(order).lower() == 'asc'),
143            )
144            _df.reset_index(drop=True, inplace=True)
145        else:
146            _df = _df.sort_values(
147                by=indices,
148                ascending=(str(order).lower() == 'asc'),
149            )
150            _df = _df.reset_index(drop=True)
151        if limit is not None and len(_df) > limit:
152            return _df.head(limit)
153        return _df
154
155    if as_iterator or as_chunks:
156        df = self._get_data_as_iterator(
157            select_columns=select_columns,
158            omit_columns=omit_columns,
159            begin=begin,
160            end=end,
161            params=params,
162            chunk_interval=chunk_interval,
163            limit=limit,
164            order=order,
165            fresh=fresh,
166            debug=debug,
167        )
168        return _sort_df(df)
169
170    if as_dask:
171        from multiprocessing.pool import ThreadPool
172        dask_pool = ThreadPool(self.get_num_workers())
173        dask.config.set(pool=dask_pool)
174        chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
175        bounds = self.get_chunk_bounds(
176            begin=begin,
177            end=end,
178            bounded=False,
179            chunk_interval=chunk_interval,
180            debug=debug,
181        )
182        dask_chunks = [
183            dask.delayed(self.get_data)(
184                select_columns=select_columns,
185                omit_columns=omit_columns,
186                begin=chunk_begin,
187                end=chunk_end,
188                params=params,
189                chunk_interval=chunk_interval,
190                order=order,
191                limit=limit,
192                fresh=fresh,
193                debug=debug,
194            )
195            for (chunk_begin, chunk_end) in bounds
196        ]
197        dask_meta = {
198            col: to_pandas_dtype(typ)
199            for col, typ in self.dtypes.items()
200        }
201        return _sort_df(dd.from_delayed(dask_chunks, meta=dask_meta))
202
203    if not self.exists(debug=debug):
204        return None
205
206    if self.cache_pipe is not None:
207        if not fresh:
208            _sync_cache_tuple = self.cache_pipe.sync(
209                begin=begin,
210                end=end,
211                params=params,
212                debug=debug,
213                **kw
214            )
215            if not _sync_cache_tuple[0]:
216                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
217                fresh = True
218            else: ### Successfully synced cache.
219                return self.enforce_dtypes(
220                    self.cache_pipe.get_data(
221                        select_columns=select_columns,
222                        omit_columns=omit_columns,
223                        begin=begin,
224                        end=end,
225                        params=params,
226                        order=order,
227                        limit=limit,
228                        debug=debug,
229                        fresh=True,
230                        **kw
231                    ),
232                    debug=debug,
233                )
234
235    with Venv(get_connector_plugin(self.instance_connector)):
236        df = self.instance_connector.get_pipe_data(
237            pipe=self,
238            select_columns=select_columns,
239            omit_columns=omit_columns,
240            begin=begin,
241            end=end,
242            params=params,
243            limit=limit,
244            order=order,
245            debug=debug,
246            **kw
247        )
248        if df is None:
249            return df
250
251        if not select_columns:
252            select_columns = [col for col in df.columns]
253
254        cols_to_omit = [
255            col
256            for col in df.columns
257            if (
258                col in (omit_columns or [])
259                or
260                col not in (select_columns or [])
261            )
262        ]
263        cols_to_add = [
264            col
265            for col in select_columns
266            if col not in df.columns
267        ]
268        if cols_to_omit:
269            warn(
270                (
271                    f"Received {len(cols_to_omit)} omitted column"
272                    + ('s' if len(cols_to_omit) != 1 else '')
273                    + f" for {self}. "
274                    + "Consider adding `select_columns` and `omit_columns` support to "
275                    + f"'{self.instance_connector.type}' connectors to improve performance."
276                ),
277                stack=False,
278            )
279            _cols_to_select = [col for col in df.columns if col not in cols_to_omit]
280            df = df[_cols_to_select]
281
282        if cols_to_add:
283            warn(
284                (
285                    f"Specified columns {items_str(cols_to_add)} were not found on {self}. "
286                    + "Adding these to the DataFrame as null columns."
287                ),
288                stack=False,
289            )
290            df = add_missing_cols_to_df(df, {col: 'string' for col in cols_to_add})
291
292        enforced_df = self.enforce_dtypes(df, debug=debug)
293
294        if order:
295            return _sort_df(enforced_df)
296        return enforced_df

Get a pipe's data from the instance connector.

Parameters
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, int, str, None], default None): Lower bound datetime to begin searching for data (inclusive). Translates to a WHERE clause like WHERE datetime >= begin. Defaults to None.
  • end (Union[datetime, int, str, None], default None): Upper bound datetime to stop searching for data (inclusive). Translates to a WHERE clause like WHERE datetime < end. Defaults to None.
  • params (Optional[Dict[str, Any]], default None): Filter the retrieved data by a dictionary of parameters. See meerschaum.utils.sql.build_where for more details.
  • as_iterator (bool, default False): If True, return a generator of chunks of pipe data.
  • as_chunks (bool, default False): Alias for as_iterator.
  • as_dask (bool, default False): If True, return a dask.DataFrame (which may be loaded into a Pandas DataFrame with df.compute()).
  • chunk_interval (Union[timedelta, int, None], default None): If as_iterator, then return chunks with begin and end separated by this interval. This may be set under pipe.parameters['chunk_minutes']. By default, use a timedelta of 1440 minutes (1 day). If chunk_interval is an integer and the datetime axis a timestamp, then use a timedelta with this many minutes. If the datetime axis is an integer, default to the configured chunksize. If chunk_interval is a timedelta and the datetime axis an integer, use the number of minutes in the timedelta.
  • order (Optional[str], default 'asc'): If order is not None, sort the resulting dataframe by indices.
  • limit (Optional[int], default None): If provided, cap the dataframe to this many rows.
  • fresh (bool, default False): If True, skip local cache and directly query the instance connector. Defaults to False.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
Returns
  • A pd.DataFrame for the pipe's data corresponding to the provided parameters.
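A sketch of common get_data() calls against an existing pipe; the column names and filter values are illustrative:

from datetime import datetime

# Bounded query with a params filter (translated to a WHERE clause by the instance connector).
df = pipe.get_data(
    begin=datetime(2024, 1, 1),
    end=datetime(2024, 2, 1),
    params={'id': 1},
    select_columns=['ts', 'id', 'vl'],
)

# Stream the same window in chunks instead of loading a single DataFrame.
for chunk in pipe.get_data(begin=datetime(2024, 1, 1), end=datetime(2024, 2, 1), as_iterator=True):
    print(len(chunk))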
def get_backtrack_data( self, backtrack_minutes: Optional[int] = None, begin: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, limit: Optional[int] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> Optional[pandas.core.frame.DataFrame]:
388def get_backtrack_data(
389    self,
390    backtrack_minutes: Optional[int] = None,
391    begin: Union[datetime, int, None] = None,
392    params: Optional[Dict[str, Any]] = None,
393    limit: Optional[int] = None,
394    fresh: bool = False,
395    debug: bool = False,
396    **kw: Any
397) -> Optional['pd.DataFrame']:
398    """
399    Get the most recent data from the instance connector as a Pandas DataFrame.
400
401    Parameters
402    ----------
403    backtrack_minutes: Optional[int], default None
404        How many minutes from `begin` to select from.
405        If `None`, use `pipe.parameters['fetch']['backtrack_minutes']`.
406
407    begin: Optional[datetime], default None
408        The starting point to search for data.
409        If begin is `None` (default), use the most recent observed datetime
410        (AKA sync_time).
411
412        ```
413        E.g. begin = 02:00
414
415        Search this region.           Ignore this, even if there's data.
416        /  /  /  /  /  /  /  /  /  |
417        -----|----------|----------|----------|----------|----------|
418        00:00      01:00      02:00      03:00      04:00      05:00
419
420        ```
421
422    params: Optional[Dict[str, Any]], default None
423        The standard Meerschaum `params` query dictionary.
424
425    limit: Optional[int], default None
426        If provided, cap the number of rows to be returned.
427
428    fresh: bool, default False
429        If `True`, ignore local cache and pull directly from the instance connector.
430        Only comes into effect if a pipe was created with `cache=True`.
431
432    debug: bool, default False
433        Verbosity toggle.
434
435    Returns
436    -------
437    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. Backtrack data
438    is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
439    """
440    from meerschaum.utils.warnings import warn
441    from meerschaum.utils.venv import Venv
442    from meerschaum.connectors import get_connector_plugin
443
444    if not self.exists(debug=debug):
445        return None
446
447    begin = self.parse_date_bounds(begin)
448
449    backtrack_interval = self.get_backtrack_interval(debug=debug)
450    if backtrack_minutes is None:
451        backtrack_minutes = (
452            (backtrack_interval.total_seconds() / 60)
453            if isinstance(backtrack_interval, timedelta)
454            else backtrack_interval
455        )
456
457    if self.cache_pipe is not None:
458        if not fresh:
459            _sync_cache_tuple = self.cache_pipe.sync(begin=begin, params=params, debug=debug, **kw)
460            if not _sync_cache_tuple[0]:
461                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
462                fresh = True
463            else: ### Successfully synced cache.
464                return self.enforce_dtypes(
465                    self.cache_pipe.get_backtrack_data(
466                        fresh=True,
467                        begin=begin,
468                        backtrack_minutes=backtrack_minutes,
469                        params=params,
470                        limit=limit,
471                        order=kw.get('order', 'desc'),
472                        debug=debug,
473                        **kw
474                    ),
475                    debug=debug,
476                )
477
478    if hasattr(self.instance_connector, 'get_backtrack_data'):
479        with Venv(get_connector_plugin(self.instance_connector)):
480            return self.enforce_dtypes(
481                self.instance_connector.get_backtrack_data(
482                    pipe=self,
483                    begin=begin,
484                    backtrack_minutes=backtrack_minutes,
485                    params=params,
486                    limit=limit,
487                    debug=debug,
488                    **kw
489                ),
490                debug=debug,
491            )
492
493    if begin is None:
494        begin = self.get_sync_time(params=params, debug=debug)
495
496    backtrack_interval = (
497        timedelta(minutes=backtrack_minutes)
498        if isinstance(begin, datetime)
499        else backtrack_minutes
500    )
501    if begin is not None:
502        begin = begin - backtrack_interval
503
504    return self.get_data(
505        begin=begin,
506        params=params,
507        debug=debug,
508        limit=limit,
509        order=kw.get('order', 'desc'),
510        **kw
511    )

Get the most recent data from the instance connector as a Pandas DataFrame.

Parameters
  • backtrack_minutes (Optional[int], default None): How many minutes from begin to select from. If None, use pipe.parameters['fetch']['backtrack_minutes'].
  • begin (Optional[datetime], default None): The starting point to search for data. If begin is None (default), use the most recent observed datetime (AKA sync_time).

    E.g. begin = 02:00
    
    Search this region.           Ignore this, even if there's data.
    /  /  /  /  /  /  /  /  /  |
    -----|----------|----------|----------|----------|----------|
    00:00      01:00      02:00      03:00      04:00      05:00
    
    
  • params (Optional[Dict[str, Any]], default None): The standard Meerschaum params query dictionary.

  • limit (Optional[int], default None): If provided, cap the number of rows to be returned.
  • fresh (bool, default False): If True, ignore local cache and pull directly from the instance connector. Only comes into effect if a pipe was created with cache=True.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame for the pipe's data corresponding to the provided parameters. Backtrack data is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
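For example, a sketch of pulling a recent window of data from an existing pipe:

from datetime import datetime

# Backtrack 24 hours from the pipe's most recent sync time.
recent_df = pipe.get_backtrack_data(backtrack_minutes=1440)

# Or backtrack one hour from an explicit point in time.
recent_df = pipe.get_backtrack_data(begin=datetime(2024, 1, 2), backtrack_minutes=60)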
def get_rowcount( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> int:
514def get_rowcount(
515    self,
516    begin: Union[datetime, int, None] = None,
517    end: Union[datetime, int, None] = None,
518    params: Optional[Dict[str, Any]] = None,
519    remote: bool = False,
520    debug: bool = False
521) -> int:
522    """
523    Get a Pipe's instance or remote rowcount.
524
525    Parameters
526    ----------
527    begin: Optional[datetime], default None
528        Count rows where datetime > begin.
529
530    end: Optional[datetime], default None
531        Count rows where datetime < end.
532
533    remote: bool, default False
534        Count rows from a pipe's remote source.
535        **NOTE**: This is experimental!
536
537    debug: bool, default False
538        Verbosity toggle.
539
540    Returns
541    -------
542    An `int` of the number of rows in the pipe corresponding to the provided parameters.
543    Returns 0 if the pipe does not exist.
544    """
545    from meerschaum.utils.warnings import warn
546    from meerschaum.utils.venv import Venv
547    from meerschaum.connectors import get_connector_plugin
548    from meerschaum.utils.misc import filter_keywords
549
550    begin, end = self.parse_date_bounds(begin, end)
551    connector = self.instance_connector if not remote else self.connector
552    try:
553        with Venv(get_connector_plugin(connector)):
554            if not hasattr(connector, 'get_pipe_rowcount'):
555                warn(
556                    f"Connectors of type '{connector.type}' "
557                    "do not implement `get_pipe_rowcount()`.",
558                    stack=False,
559                )
560                return 0
561            kwargs = filter_keywords(
562                connector.get_pipe_rowcount,
563                begin=begin,
564                end=end,
565                params=params,
566                remote=remote,
567                debug=debug,
568            )
569            if remote and 'remote' not in kwargs:
570                warn(
571                    f"Connectors of type '{connector.type}' do not support remote rowcounts.",
572                    stack=False,
573                )
574                return 0
575            rowcount = connector.get_pipe_rowcount(
576                self,
577                begin=begin,
578                end=end,
579                params=params,
580                remote=remote,
581                debug=debug,
582            )
583            if rowcount is None:
584                return 0
585            return rowcount
586    except AttributeError as e:
587        warn(e)
588        if remote:
589            return 0
590    warn(f"Failed to get a rowcount for {self}.")
591    return 0

Get a Pipe's instance or remote rowcount.

Parameters
  • begin (Optional[datetime], default None): Count rows where datetime > begin.
  • end (Optional[datetime], default None): Count rows where datetime < end.
  • remote (bool, default False): Count rows from a pipe's remote source. NOTE: This is experimental!
  • debug (bool, default False): Verbosity toggle.
Returns
  • An int of the number of rows in the pipe corresponding to the provided parameters. Returns 0 if the pipe does not exist.
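A short sketch, assuming an existing pipe with a datetime axis:

from datetime import datetime

print(pipe.get_rowcount())    # all rows on the instance
print(pipe.get_rowcount(begin=datetime(2024, 1, 1), end=datetime(2024, 2, 1)))    # bounded count
print(pipe.get_rowcount(remote=True))    # experimental: count rows at the remote source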
def get_chunk_interval( self, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, debug: bool = False) -> Union[datetime.timedelta, int]:
594def get_chunk_interval(
595    self,
596    chunk_interval: Union[timedelta, int, None] = None,
597    debug: bool = False,
598) -> Union[timedelta, int]:
599    """
600    Get the chunk interval to use for this pipe.
601
602    Parameters
603    ----------
604    chunk_interval: Union[timedelta, int, None], default None
605        If provided, coerce this value into the correct type.
606        For example, if the datetime axis is an integer, then
607        return the number of minutes.
608
609    Returns
610    -------
611    The chunk interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
612    """
613    default_chunk_minutes = get_config('pipes', 'parameters', 'verify', 'chunk_minutes')
614    configured_chunk_minutes = self.parameters.get('verify', {}).get('chunk_minutes', None)
615    chunk_minutes = (
616        (configured_chunk_minutes or default_chunk_minutes)
617        if chunk_interval is None
618        else (
619            chunk_interval
620            if isinstance(chunk_interval, int)
621            else int(chunk_interval.total_seconds() / 60)
622        )
623    )
624
625    dt_col = self.columns.get('datetime', None)
626    if dt_col is None:
627        return timedelta(minutes=chunk_minutes)
628
629    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns, UTC]')
630    if 'int' in dt_dtype.lower():
631        return chunk_minutes
632    return timedelta(minutes=chunk_minutes)

Get the chunk interval to use for this pipe.

Parameters
  • chunk_interval (Union[timedelta, int, None], default None): If provided, coerce this value into the correct type. For example, if the datetime axis is an integer, then return the number of minutes.
Returns
  • The chunk interval (timedelta or int) to use with this pipe's datetime axis.
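For example (a sketch; the configured default is typically 1440 minutes):

from datetime import timedelta

print(pipe.get_chunk_interval())                      # e.g. timedelta(days=1) from the configured default
print(pipe.get_chunk_interval(timedelta(hours=6)))    # coerced to 360 (minutes) if the datetime axis is an integer
print(pipe.get_chunk_interval(90))                    # coerced to timedelta(minutes=90) for a timestamp axis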
def get_chunk_bounds( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, bounded: bool = False, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, debug: bool = False) -> List[Tuple[Union[datetime.datetime, int, NoneType], Union[datetime.datetime, int, NoneType]]]:
635def get_chunk_bounds(
636    self,
637    begin: Union[datetime, int, None] = None,
638    end: Union[datetime, int, None] = None,
639    bounded: bool = False,
640    chunk_interval: Union[timedelta, int, None] = None,
641    debug: bool = False,
642) -> List[
643    Tuple[
644        Union[datetime, int, None],
645        Union[datetime, int, None],
646    ]
647]:
648    """
649    Return a list of datetime bounds for iterating over the pipe's `datetime` axis.
650
651    Parameters
652    ----------
653    begin: Union[datetime, int, None], default None
654        If provided, do not select less than this value.
655        Otherwise the first chunk will be unbounded.
656
657    end: Union[datetime, int, None], default None
658        If provided, do not select greater than or equal to this value.
659        Otherwise the last chunk will be unbounded.
660
661    bounded: bool, default False
662        If `True`, do not include `None` in the first chunk.
663
664    chunk_interval: Union[timedelta, int, None], default None
665        If provided, use this interval for the size of chunk boundaries.
666        The default value for this pipe may be set
667        under `pipe.parameters['verify']['chunk_minutes']`.
668
669    debug: bool, default False
670        Verbosity toggle.
671
672    Returns
673    -------
674    A list of chunk bounds (datetimes or integers).
675    If unbounded, the first and last chunks will include `None`.
676    """
677    from datetime import timedelta
678    from meerschaum.utils.dtypes import are_dtypes_equal
679    from meerschaum.utils.misc import interval_str
680    include_less_than_begin = not bounded and begin is None
681    include_greater_than_end = not bounded and end is None
682    if begin is None:
683        begin = self.get_sync_time(newest=False, debug=debug)
684    if end is None:
685        end = self.get_sync_time(newest=True, debug=debug)
686        if end is not None and hasattr(end, 'tzinfo'):
687            end += timedelta(minutes=1)
688        elif are_dtypes_equal(str(type(end)), 'int'):
689            end += 1
690    if begin is None and end is None:
691        return [(None, None)]
692
693    begin, end = self.parse_date_bounds(begin, end)
694
695    ### Set the chunk interval under `pipe.parameters['verify']['chunk_minutes']`.
696    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
697    
698    ### Build a list of tuples containing the chunk boundaries
699    ### so that we can sync multiple chunks in parallel.
700    ### Run `verify pipes --workers 1` to sync chunks in series.
701    chunk_bounds = []
702    begin_cursor = begin
703    num_chunks = 0
704    max_chunks = 1_000_000
705    while begin_cursor < end:
706        end_cursor = begin_cursor + chunk_interval
707        chunk_bounds.append((begin_cursor, end_cursor))
708        begin_cursor = end_cursor
709        num_chunks += 1
710        if num_chunks >= max_chunks:
711            raise ValueError(
712                f"Too many chunks of size '{interval_str(chunk_interval)}' between '{begin}' and '{end}'."
713            )
714
715    ### The chunk interval might be too large.
716    if not chunk_bounds and end >= begin:
717        chunk_bounds = [(begin, end)]
718
719    ### Truncate the last chunk to the end timestamp.
720    if chunk_bounds[-1][1] > end:
721        chunk_bounds[-1] = (chunk_bounds[-1][0], end)
722
723    ### Pop the last chunk if its bounds are equal.
724    if chunk_bounds[-1][0] == chunk_bounds[-1][1]:
725        chunk_bounds = chunk_bounds[:-1]
726
727    if include_less_than_begin:
728        chunk_bounds = [(None, begin)] + chunk_bounds
729    if include_greater_than_end:
730        chunk_bounds = chunk_bounds + [(end, None)]
731
732    return chunk_bounds

Return a list of datetime bounds for iterating over the pipe's datetime axis.

Parameters
  • begin (Union[datetime, int, None], default None): If provided, do not select less than this value. Otherwise the first chunk will be unbounded.
  • end (Union[datetime, int, None], default None): If provided, do not select greater than or equal to this value. Otherwise the last chunk will be unbounded.
  • bounded (bool, default False): If True, do not include None in the first chunk.
  • chunk_interval (Union[timedelta, int, None], default None): If provided, use this interval for the size of chunk boundaries. The default value for this pipe may be set under pipe.parameters['verify']['chunk_minutes'].
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of chunk bounds (datetimes or integers). If unbounded, the first and last chunks will include None.
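For example, the bounds may be used to walk the datetime axis one day at a time (a sketch assuming an existing pipe):

from datetime import datetime, timedelta

bounds = pipe.get_chunk_bounds(
    begin=datetime(2024, 1, 1),
    end=datetime(2024, 1, 8),
    bounded=True,
    chunk_interval=timedelta(days=1),
)
for chunk_begin, chunk_end in bounds:
    chunk_df = pipe.get_data(begin=chunk_begin, end=chunk_end)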
def get_chunk_bounds_batches( self, chunk_bounds: List[Tuple[Union[datetime.datetime, int, NoneType], Union[datetime.datetime, int, NoneType]]], batchsize: Optional[int] = None, workers: Optional[int] = None, debug: bool = False) -> List[Tuple[Tuple[Union[datetime.datetime, int, NoneType], Union[datetime.datetime, int, NoneType]], ...]]:
735def get_chunk_bounds_batches(
736    self,
737    chunk_bounds: List[Tuple[Union[datetime, int, None], Union[datetime, int, None]]],
738    batchsize: Optional[int] = None,
739    workers: Optional[int] = None,
740    debug: bool = False,
741) -> List[
742    Tuple[
743        Tuple[
744            Union[datetime, int, None],
745            Union[datetime, int, None],
746        ], ...
747    ]
748]:
749    """
750    Return a list of tuples of chunk bounds of size `batchsize`.
751
752    Parameters
753    ----------
754    chunk_bounds: List[Tuple[Union[datetime, int, None], Union[datetime, int, None]]]
755        A list of chunk_bounds (see `Pipe.get_chunk_bounds()`).
756
757    batchsize: Optional[int], default None
758        How many chunks to include in a batch. Defaults to `Pipe.get_num_workers()`.
759
760    workers: Optional[int], default None
761        If `batchsize` is `None`, use this as the desired number of workers.
762        Passed to `Pipe.get_num_workers()`.
763
764    Returns
765    -------
766    A list of tuples of chunk bound tuples.
767    """
768    from meerschaum.utils.misc import iterate_chunks
769    
770    if batchsize is None:
771        batchsize = self.get_num_workers(workers=workers)
772
773    return [
774        tuple(
775            _batch_chunk_bounds
776            for _batch_chunk_bounds in batch
777            if _batch_chunk_bounds is not None
778        )
779        for batch in iterate_chunks(chunk_bounds, batchsize)
780        if batch
781    ]

Return a list of tuples of chunk bounds of size batchsize.

Parameters
  • chunk_bounds (List[Tuple[Union[datetime, int, None], Union[datetime, int, None]]]): A list of chunk_bounds (see Pipe.get_chunk_bounds()).
  • batchsize (Optional[int], default None): How many chunks to include in a batch. Defaults to Pipe.get_num_workers().
  • workers (Optional[int], default None): If batchsize is None, use this as the desired number of workers. Passed to Pipe.get_num_workers().
Returns
  • A list of tuples of chunk bound tuples.
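For example, batches may be used to group chunk bounds for parallel workers (a sketch; the batch size of 4 is arbitrary):

bounds = pipe.get_chunk_bounds(bounded=True)
batches = pipe.get_chunk_bounds_batches(bounds, batchsize=4)
for batch in batches:
    # Each batch holds up to `batchsize` (begin, end) tuples, e.g. to submit to a worker pool.
    for chunk_begin, chunk_end in batch:
        pipe.get_data(begin=chunk_begin, end=chunk_end)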
def parse_date_bounds( self, *dt_vals: Union[datetime.datetime, int, NoneType]) -> Union[datetime.datetime, int, str, NoneType, Tuple[Union[datetime.datetime, int, str, NoneType]]]:
784def parse_date_bounds(self, *dt_vals: Union[datetime, int, None]) -> Union[
785    datetime,
786    int,
787    str,
788    None,
789    Tuple[Union[datetime, int, str, None]]
790]:
791    """
792    Given a date bound (begin, end), coerce a timezone if necessary.
793    """
794    from meerschaum.utils.misc import is_int
795    from meerschaum.utils.dtypes import coerce_timezone
796    from meerschaum.utils.warnings import warn
797    dateutil_parser = mrsm.attempt_import('dateutil.parser')
798
799    def _parse_date_bound(dt_val):
800        if dt_val is None:
801            return None
802
803        if isinstance(dt_val, int):
804            return dt_val
805
806        if dt_val == '':
807            return ''
808
809        if is_int(dt_val):
810            return int(dt_val)
811
812        if isinstance(dt_val, str):
813            try:
814                dt_val = dateutil_parser.parse(dt_val)
815            except Exception as e:
816                warn(f"Could not parse '{dt_val}' as datetime:\n{e}")
817                return None
818
819        dt_col = self.columns.get('datetime', None)
820        dt_typ = str(self.dtypes.get(dt_col, 'datetime64[ns, UTC]'))
821        if dt_typ == 'datetime':
822            dt_typ = 'datetime64[ns, UTC]'
823        return coerce_timezone(dt_val, strip_utc=('utc' not in dt_typ.lower()))
824
825    bounds = tuple(_parse_date_bound(dt_val) for dt_val in dt_vals)
826    if len(bounds) == 1:
827        return bounds[0]
828    return bounds

Given a date bound (begin, end), coerce a timezone if necessary.
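For example (a sketch; exact timezone handling depends on the pipe's datetime dtype):

begin, end = pipe.parse_date_bounds('2024-01-01', '2024-02-01')    # strings are parsed and timezone-coerced
print(pipe.parse_date_bounds(1700000000))                          # integers pass through unchanged
# 1700000000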

def register(self, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
12def register(
13        self,
14        debug: bool = False,
15        **kw: Any
16    ) -> SuccessTuple:
17    """
18    Register a new Pipe along with its attributes.
19
20    Parameters
21    ----------
22    debug: bool, default False
23        Verbosity toggle.
24
25    kw: Any
26        Keyword arguments to pass to `instance_connector.register_pipe()`.
27
28    Returns
29    -------
30    A `SuccessTuple` of success, message.
31    """
32    if self.temporary:
33        return False, "Cannot register pipes created with `temporary=True` (read-only)."
34
35    from meerschaum.utils.formatting import get_console
36    from meerschaum.utils.venv import Venv
37    from meerschaum.connectors import get_connector_plugin, custom_types
38    from meerschaum.config._patch import apply_patch_to_config
39
40    import warnings
41    with warnings.catch_warnings():
42        warnings.simplefilter('ignore')
43        try:
44            _conn = self.connector
45        except Exception as e:
46            _conn = None
47
48    if (
49        _conn is not None
50        and
51        (_conn.type == 'plugin' or _conn.type in custom_types)
52        and
53        getattr(_conn, 'register', None) is not None
54    ):
55        try:
56            with Venv(get_connector_plugin(_conn), debug=debug):
57                params = self.connector.register(self)
58        except Exception as e:
59            get_console().print_exception()
60            params = None
61        params = {} if params is None else params
62        if not isinstance(params, dict):
63            from meerschaum.utils.warnings import warn
64            warn(
65                f"Invalid parameters returned from `register()` in connector {self.connector}:\n"
66                + f"{params}"
67            )
68        else:
69            self.parameters = apply_patch_to_config(params, self.parameters)
70
71    if not self.parameters:
72        cols = self.columns if self.columns else {'datetime': None, 'id': None}
73        self.parameters = {
74            'columns': cols,
75        }
76
77    with Venv(get_connector_plugin(self.instance_connector)):
78        return self.instance_connector.register_pipe(self, debug=debug, **kw)

Register a new Pipe along with its attributes.

Parameters
  • debug (bool, default False): Verbosity toggle.
  • kw (Any): Keyword arguments to pass to instance_connector.register_pipe().
Returns
  • A SuccessTuple of success, message.
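
A minimal sketch of registering an in-memory pipe (keys are illustrative):

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'foo:bar', 'demo',
    instance='sql:temp',
    columns={'datetime': 'ts'},
)
success, msg = pipe.register()
print(success, msg)
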
attributes: Dict[str, Any]
19@property
20def attributes(self) -> Dict[str, Any]:
21    """
22    Return a dictionary of a pipe's keys and parameters.
23    These values are reflected directly from the pipes table of the instance.
24    """
25    import time
26    from meerschaum.config import get_config
27    from meerschaum.config._patch import apply_patch_to_config
28    from meerschaum.utils.venv import Venv
29    from meerschaum.connectors import get_connector_plugin
30
31    timeout_seconds = get_config('pipes', 'attributes', 'local_cache_timeout_seconds')
32
33    if '_attributes' not in self.__dict__:
34        self._attributes = {}
35
36    now = time.perf_counter()
37    last_refresh = self.__dict__.get('_attributes_sync_time', None)
38    timed_out = (
39        last_refresh is None
40        or
41        (timeout_seconds is not None and (now - last_refresh) >= timeout_seconds)
42    )
43    if not self.temporary and timed_out:
44        self._attributes_sync_time = now
45        local_attributes = self.__dict__.get('_attributes', {})
46        with Venv(get_connector_plugin(self.instance_connector)):
47            instance_attributes = self.instance_connector.get_pipe_attributes(self)
48        self._attributes = apply_patch_to_config(instance_attributes, local_attributes)
49    return self._attributes

Return a dictionary of a pipe's keys and parameters. These values are reflected directly from the pipes table of the instance.

parameters: Optional[Dict[str, Any]]
52@property
53def parameters(self) -> Optional[Dict[str, Any]]:
54    """
55    Return the parameters dictionary of the pipe.
56    """
57    if 'parameters' not in self.attributes:
58        self.attributes['parameters'] = {}
59    _parameters = self.attributes['parameters']
60    dt_col = _parameters.get('columns', {}).get('datetime', None)
61    dt_typ = _parameters.get('dtypes', {}).get(dt_col, None) if dt_col else None
62    if dt_col and not dt_typ:
63        if 'dtypes' not in _parameters:
64            self.attributes['parameters']['dtypes'] = {}
65        self.attributes['parameters']['dtypes'][dt_col] = 'datetime'
66    return self.attributes['parameters']

Return the parameters dictionary of the pipe.

columns: Optional[Dict[str, str]]
78@property
79def columns(self) -> Union[Dict[str, str], None]:
80    """
81    Return the `columns` dictionary defined in `meerschaum.Pipe.parameters`.
82    """
83    if 'columns' not in self.parameters:
84        self.parameters['columns'] = {}
85    cols = self.parameters['columns']
86    if not isinstance(cols, dict):
87        cols = {}
88        self.parameters['columns'] = cols
89    return {col_ix: col for col_ix, col in cols.items() if col}

Return the columns dictionary defined in meerschaum.Pipe.parameters.
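
Columns may be assigned directly; keys with empty values are dropped when read back. A minimal sketch:

import meerschaum as mrsm

pipe = mrsm.Pipe('foo:bar', 'demo', instance='sql:temp')
pipe.columns = {'datetime': 'dt', 'id': 'station', 'value': None}
print(pipe.columns)
# {'datetime': 'dt', 'id': 'station'}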

indices: Optional[Dict[str, Union[str, List[str]]]]
106@property
107def indices(self) -> Union[Dict[str, Union[str, List[str]]], None]:
108    """
109    Return the `indices` dictionary defined in `meerschaum.Pipe.parameters`.
110    """
111    indices_key = (
112        'indexes'
113        if 'indexes' in self.parameters
114        else 'indices'
115    )
116    if indices_key not in self.parameters:
117        self.parameters[indices_key] = {}
118    _indices = self.parameters[indices_key]
119    _columns = self.columns
120    dt_col = _columns.get('datetime', None)
121    if not isinstance(_indices, dict):
122        _indices = {}
123        self.parameters[indices_key] = _indices
124    unique_cols = list(set((
125        [dt_col]
126        if dt_col
127        else []
128    ) + [
129        col
130        for col_ix, col in _columns.items()
131        if col and col_ix != 'datetime'
132    ]))
133    return {
134        **({'unique': unique_cols} if len(unique_cols) > 1 else {}),
135        **{col_ix: col for col_ix, col in _columns.items() if col},
136        **_indices
137    }

Return the indices dictionary defined in meerschaum.Pipe.parameters.
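
A sketch of the derived indices (the ordering of the unique list may vary):

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'foo:bar', 'demo',
    instance='sql:temp',
    columns={'datetime': 'dt', 'id': 'station'},
)
print(pipe.indices)
# e.g. {'unique': ['dt', 'station'], 'datetime': 'dt', 'id': 'station'}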

indexes: Optional[Dict[str, Union[str, List[str]]]]
140@property
141def indexes(self) -> Union[Dict[str, Union[str, List[str]]], None]:
142    """
143    Alias for `meerschaum.Pipe.indices`.
144    """
145    return self.indices
dtypes: Optional[Dict[str, Any]]
198@property
199def dtypes(self) -> Union[Dict[str, Any], None]:
200    """
201    If defined, return the `dtypes` dictionary defined in `meerschaum.Pipe.parameters`.
202    """
203    from meerschaum.config._patch import apply_patch_to_config
204    from meerschaum.utils.dtypes import MRSM_ALIAS_DTYPES
205    configured_dtypes = self.parameters.get('dtypes', {})
206    remote_dtypes = self.infer_dtypes(persist=False)
207    patched_dtypes = apply_patch_to_config(remote_dtypes, configured_dtypes)
208    return {
209        col: MRSM_ALIAS_DTYPES.get(typ, typ)
210        for col, typ in patched_dtypes.items()
211        if col and typ
212    }

If defined, return the dtypes dictionary defined in meerschaum.Pipe.parameters.
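
Configured dtypes under parameters['dtypes'] are patched over any dtypes inferred from existing data. A minimal sketch:

import meerschaum as mrsm

pipe = mrsm.Pipe('foo:bar', 'demo', instance='sql:temp')
pipe.parameters['dtypes'] = {'vl': 'float64'}
print(pipe.dtypes)
# {'vl': 'float64'} (plus any dtypes inferred from existing data)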

autoincrement: bool
260@property
261def autoincrement(self) -> bool:
262    """
263    Return the `autoincrement` parameter for the pipe.
264    """
265    if 'autoincrement' not in self.parameters:
266        self.parameters['autoincrement'] = False
267
268    return self.parameters['autoincrement']

Return the autoincrement parameter for the pipe.

upsert: bool
224@property
225def upsert(self) -> bool:
226    """
227    Return whether `upsert` is set for the pipe.
228    """
229    if 'upsert' not in self.parameters:
230        self.parameters['upsert'] = False
231    return self.parameters['upsert']

Return whether upsert is set for the pipe.

static: bool
242@property
243def static(self) -> bool:
244    """
245    Return whether `static` is set for the pipe.
246    """
247    if 'static' not in self.parameters:
248        self.parameters['static'] = False
249    return self.parameters['static']

Return whether static is set for the pipe.

tzinfo: Optional[datetime.timezone]
279@property
280def tzinfo(self) -> Union[None, timezone]:
281    """
282    Return `timezone.utc` if the pipe is timezone-aware.
283    """
284    dt_col = self.columns.get('datetime', None)
285    if not dt_col:
286        return None
287
288    dt_typ = str(self.dtypes.get(dt_col, 'datetime64[ns, UTC]'))
289    if 'utc' in dt_typ.lower() or dt_typ == 'datetime':
290        return timezone.utc
291
292    if dt_typ == 'datetime64[ns]':
293        return None
294
295    return None

Return timezone.utc if the pipe is timezone-aware.

enforce: bool
298@property
299def enforce(self) -> bool:
300    """
301    Return the `enforce` parameter for the pipe.
302    """
303    if 'enforce' not in self.parameters:
304        self.parameters['enforce'] = True
305
306    return self.parameters['enforce']

Return the enforce parameter for the pipe.

null_indices: bool
317@property
318def null_indices(self) -> bool:
319    """
320    Return the `null_indices` parameter for the pipe.
321    """
322    if 'null_indices' not in self.parameters:
323        self.parameters['null_indices'] = True
324
325    return self.parameters['null_indices']

Return the null_indices parameter for the pipe.

def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
336def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
337    """
338    Check if the requested columns are defined.
339
340    Parameters
341    ----------
342    *args: str
343        The column names to be retrieved.
344
345    error: bool, default False
346        If `True`, raise an `Exception` if the specified column is not defined.
347
348    Returns
349    -------
350    A tuple of the same size of `args` or a `str` if `args` is a single argument.
351
352    Examples
353    --------
354    >>> pipe = mrsm.Pipe('test', 'test')
355    >>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
356    >>> pipe.get_columns('datetime', 'id')
357    ('dt', 'id')
358    >>> pipe.get_columns('value', error=True)
359    Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
360    """
361    from meerschaum.utils.warnings import error as _error, warn
362    if not args:
363        args = tuple(self.columns.keys())
364    col_names = []
365    for col in args:
366        col_name = None
367        try:
368            col_name = self.columns[col]
369            if col_name is None and error:
370                _error(f"Please define the name of the '{col}' column for {self}.")
371        except Exception as e:
372            col_name = None
373        if col_name is None and error:
374            _error(f"Missing '{col}'" + f" column for {self}.")
375        col_names.append(col_name)
376    if len(col_names) == 1:
377        return col_names[0]
378    return tuple(col_names)

Check if the requested columns are defined.

Parameters
  • *args (str): The column names to be retrieved.
  • error (bool, default False): If True, raise an Exception if the specified column is not defined.
Returns
  • A tuple of the same size of args or a str if args is a single argument.
Examples
>>> pipe = mrsm.Pipe('test', 'test')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> pipe.get_columns('datetime', 'id')
('dt', 'id')
>>> pipe.get_columns('value', error=True)
Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
def get_columns_types( self, refresh: bool = False, debug: bool = False) -> Optional[Dict[str, str]]:
381def get_columns_types(
382    self,
383    refresh: bool = False,
384    debug: bool = False,
385) -> Union[Dict[str, str], None]:
386    """
387    Get a dictionary of a pipe's column names and their types.
388
389    Parameters
390    ----------
391    refresh: bool, default False
392        If `True`, invalidate the cache and fetch directly from the instance connector.
393
394    debug: bool, default False
395        Verbosity toggle.
396
397    Returns
398    -------
399    A dictionary of column names (`str`) to column types (`str`).
400
401    Examples
402    --------
403    >>> pipe.get_columns_types()
404    {
405      'dt': 'TIMESTAMP WITH TIMEZONE',
406      'id': 'BIGINT',
407      'val': 'DOUBLE PRECISION',
408    }
409    >>>
410    """
411    import time
412    from meerschaum.connectors import get_connector_plugin
413    from meerschaum.config.static import STATIC_CONFIG
414    from meerschaum.utils.warnings import dprint
415
416    now = time.perf_counter()
417    cache_seconds = STATIC_CONFIG['pipes']['static_schema_cache_seconds']
418    if not self.static:
419        refresh = True
420    if refresh:
421        _ = self.__dict__.pop('_columns_types_timestamp', None)
422        _ = self.__dict__.pop('_columns_types', None)
423    _columns_types = self.__dict__.get('_columns_types', None)
424    if _columns_types:
425        columns_types_timestamp = self.__dict__.get('_columns_types_timestamp', None)
426        if columns_types_timestamp is not None:
427            delta = now - columns_types_timestamp
428            if delta < cache_seconds:
429                if debug:
430                    dprint(
431                        f"Returning cached `columns_types` for {self} "
432                        f"({round(delta, 2)} seconds old)."
433                    )
434                return _columns_types
435
436    with mrsm.Venv(get_connector_plugin(self.instance_connector)):
437        _columns_types = (
438            self.instance_connector.get_pipe_columns_types(self, debug=debug)
439            if hasattr(self.instance_connector, 'get_pipe_columns_types')
440            else None
441        )
442
443    self.__dict__['_columns_types'] = _columns_types
444    self.__dict__['_columns_types_timestamp'] = now
445    return _columns_types or {}

Get a dictionary of a pipe's column names and their types.

Parameters
  • refresh (bool, default False): If True, invalidate the cache and fetch directly from the instance connector.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A dictionary of column names (str) to column types (str).
Examples
>>> pipe.get_columns_types()
{
  'dt': 'TIMESTAMP WITH TIMEZONE',
  'id': 'BIGINT',
  'val': 'DOUBLE PRECISION',
}
>>>
def get_columns_indices( self, debug: bool = False, refresh: bool = False) -> Dict[str, List[Dict[str, str]]]:
448def get_columns_indices(
449    self,
450    debug: bool = False,
451    refresh: bool = False,
452) -> Dict[str, List[Dict[str, str]]]:
453    """
454    Return a dictionary mapping columns to index information.
455    """
456    import time
457    from meerschaum.connectors import get_connector_plugin
458    from meerschaum.config.static import STATIC_CONFIG
459    from meerschaum.utils.warnings import dprint
460
461    now = time.perf_counter()
462    cache_seconds = (
463        STATIC_CONFIG['pipes']['static_schema_cache_seconds']
464        if self.static
465        else STATIC_CONFIG['pipes']['exists_timeout_seconds']
466    )
467    if refresh:
468        _ = self.__dict__.pop('_columns_indices_timestamp', None)
469        _ = self.__dict__.pop('_columns_indices', None)
470    _columns_indices = self.__dict__.get('_columns_indices', None)
471    if _columns_indices:
472        columns_indices_timestamp = self.__dict__.get('_columns_indices_timestamp', None)
473        if columns_indices_timestamp is not None:
474            delta = now - columns_indices_timestamp
475            if delta < cache_seconds:
476                if debug:
477                    dprint(
478                        f"Returning cached `columns_indices` for {self} "
479                        f"({round(delta, 2)} seconds old)."
480                    )
481                return _columns_indices
482
483    with mrsm.Venv(get_connector_plugin(self.instance_connector)):
484        _columns_indices = (
485            self.instance_connector.get_pipe_columns_indices(self, debug=debug)
486            if hasattr(self.instance_connector, 'get_pipe_columns_indices')
487            else None
488        )
489
490    self.__dict__['_columns_indices'] = _columns_indices
491    self.__dict__['_columns_indices_timestamp'] = now
492    return {k: v for k, v in (_columns_indices or {}).items() if k and v}

Return a dictionary mapping columns to index information.

def get_indices(self) -> Dict[str, str]:
743def get_indices(self) -> Dict[str, str]:
744    """
745    Return a dictionary mapping index keys to their names in the database.
746
747    Returns
748    -------
749    A dictionary of index keys to index names.
750    """
751    from meerschaum.connectors import get_connector_plugin
752    with mrsm.Venv(get_connector_plugin(self.instance_connector)):
753        if hasattr(self.instance_connector, 'get_pipe_index_names'):
754            result = self.instance_connector.get_pipe_index_names(self)
755        else:
756            result = {}
757    
758    return result

Return a dictionary mapping index keys to their names in the database.

Returns
  • A dictionary of index keys to index names.
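
A short sketch (the returned names depend on the instance connector's naming scheme):

import meerschaum as mrsm

pipe = mrsm.Pipe('foo:bar', 'demo', instance='sql:temp')
index_names = pipe.get_indices()
# e.g. maps index keys such as 'datetime' and 'id' to their database index names.
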
tags: Optional[List[str]]
173@property
174def tags(self) -> Union[List[str], None]:
175    """
176    If defined, return the `tags` list defined in `meerschaum.Pipe.parameters`.
177    """
178    if 'tags' not in self.parameters:
179        self.parameters['tags'] = []
180    return self.parameters['tags']

If defined, return the tags list defined in meerschaum.Pipe.parameters.
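
Tags are stored under parameters['tags'], so the list may be passed to the constructor and mutated in place. A sketch:

import meerschaum as mrsm

pipe = mrsm.Pipe('foo:bar', 'demo', instance='sql:temp', tags=['production'])
pipe.tags.append('weather')
print(pipe.tags)
# ['production', 'weather']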

def get_id(self, **kw: Any) -> Optional[int]:
495def get_id(self, **kw: Any) -> Union[int, None]:
496    """
497    Fetch a pipe's ID from its instance connector.
498    If the pipe does not exist, return `None`.
499    """
500    if self.temporary:
501        return None
502    from meerschaum.utils.venv import Venv
503    from meerschaum.connectors import get_connector_plugin
504
505    with Venv(get_connector_plugin(self.instance_connector)):
506        if hasattr(self.instance_connector, 'get_pipe_id'):
507            return self.instance_connector.get_pipe_id(self, **kw)
508
509    return None

Fetch a pipe's ID from its instance connector. If the pipe does not exist, return None.

id: Optional[int]
512@property
513def id(self) -> Union[int, None]:
514    """
515    Fetch and cache a pipe's ID.
516    """
517    if not ('_id' in self.__dict__ and self._id):
518        self._id = self.get_id()
519    return self._id

Fetch and cache a pipe's ID.
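
A quick sketch; the ID is None until the pipe has been registered on its instance:

import meerschaum as mrsm

pipe = mrsm.Pipe('foo:bar', 'demo', instance='sql:temp')
print(pipe.id)        # None if the pipe is not registered
print(pipe.get_id())  # bypasses the cached value and asks the instance connector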

def get_val_column(self, debug: bool = False) -> Optional[str]:
522def get_val_column(self, debug: bool = False) -> Union[str, None]:
523    """
524    Return the name of the value column if it's defined, otherwise make an educated guess.
525    If not set in the `columns` dictionary, return the first numeric column that is not
526    an ID or datetime column.
527    If none may be found, return `None`.
528
529    Parameters
530    ----------
531    debug: bool, default False
532        Verbosity toggle.
533
534    Returns
535    -------
536    Either a string or `None`.
537    """
538    from meerschaum.utils.debug import dprint
539    if debug:
540        dprint('Attempting to determine the value column...')
541    try:
542        val_name = self.get_columns('value')
543    except Exception as e:
544        val_name = None
545    if val_name is not None:
546        if debug:
547            dprint(f"Value column: {val_name}")
548        return val_name
549
550    cols = self.columns
551    if cols is None:
552        if debug:
553            dprint('No columns could be determined. Returning...')
554        return None
555    try:
556        dt_name = self.get_columns('datetime', error=False)
557    except Exception as e:
558        dt_name = None
559    try:
560        id_name = self.get_columns('id', error=False)
561    except Exception as e:
562        id_name = None
563
564    if debug:
565        dprint(f"dt_name: {dt_name}")
566        dprint(f"id_name: {id_name}")
567
568    cols_types = self.get_columns_types(debug=debug)
569    if cols_types is None:
570        return None
571    if debug:
572        dprint(f"cols_types: {cols_types}")
573    if dt_name is not None:
574        cols_types.pop(dt_name, None)
575    if id_name is not None:
576        cols_types.pop(id_name, None)
577
578    candidates = []
579    candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric',}
580    for search_term in candidate_keywords:
581        for col, typ in cols_types.items():
582            if search_term in typ.lower():
583                candidates.append(col)
584                break
585    if not candidates:
586        if debug:
587            dprint("No value column could be determined.")
588        return None
589
590    return candidates[0]

Return the name of the value column if it's defined, otherwise make an educated guess. If not set in the columns dictionary, return the first numeric column that is not an ID or datetime column. If none may be found, return None.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • Either a string or None.
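
A short sketch:

import meerschaum as mrsm

pipe = mrsm.Pipe('foo:bar', 'demo', instance='sql:temp')
val_col = pipe.get_val_column()
# The configured 'value' column, a guessed numeric column, or None.
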
parents: List[Pipe]
593@property
594def parents(self) -> List[mrsm.Pipe]:
595    """
596    Return a list of `meerschaum.Pipe` objects to be designated as parents.
597    """
598    if 'parents' not in self.parameters:
599        return []
600    from meerschaum.utils.warnings import warn
601    _parents_keys = self.parameters['parents']
602    if not isinstance(_parents_keys, list):
603        warn(
604            f"Please ensure the parents for {self} are defined as a list of keys.",
605            stacklevel = 4
606        )
607        return []
608    from meerschaum import Pipe
609    _parents = []
610    for keys in _parents_keys:
611        try:
612            p = Pipe(**keys)
613        except Exception as e:
614            warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}")
615            continue
616        _parents.append(p)
617    return _parents

Return a list of meerschaum.Pipe objects to be designated as parents.

parent: Optional[Pipe]
620@property
621def parent(self) -> Union[mrsm.Pipe, None]:
622    """
623    Return the first pipe in `self.parents` or `None`.
624    """
625    parents = self.parents
626    if not parents:
627        return None
628    return parents[0]

Return the first pipe in self.parents or None.

children: List[Pipe]
631@property
632def children(self) -> List[mrsm.Pipe]:
633    """
634    Return a list of `meerschaum.Pipe` objects to be designated as children.
635    """
636    if 'children' not in self.parameters:
637        return []
638    from meerschaum.utils.warnings import warn
639    _children_keys = self.parameters['children']
640    if not isinstance(_children_keys, list):
641        warn(
642            f"Please ensure the children for {self} are defined as a list of keys.",
643            stacklevel = 4
644        )
645        return []
646    from meerschaum import Pipe
647    _children = []
648    for keys in _children_keys:
649        try:
650            p = Pipe(**keys)
651        except Exception as e:
652            warn(f"Unable to build child with keys '{keys}' for {self}:\n{e}")
653            continue
654        _children.append(p)
655    return _children

Return a list of meerschaum.Pipe objects to be designated as children.

target: str
658@property
659def target(self) -> str:
660    """
661    The target table name.
662    You can set the target name under one of the following keys
663    (checked in this order):
664      - `target`
665      - `target_name`
666      - `target_table`
667      - `target_table_name`
668    """
669    if 'target' not in self.parameters:
670        default_target = self._target_legacy()
671        default_targets = {default_target}
672        potential_keys = ('target_name', 'target_table', 'target_table_name')
673        _target = None
674        for k in potential_keys:
675            if k in self.parameters:
676                _target = self.parameters[k]
677                break
678
679        _target = _target or default_target
680
681        if self.instance_connector.type == 'sql':
682            from meerschaum.utils.sql import truncate_item_name
683            truncated_target = truncate_item_name(_target, self.instance_connector.flavor)
684            default_targets.add(truncated_target)
685            warned_target = self.__dict__.get('_warned_target', False)
686            if truncated_target != _target and not warned_target:
687                if not warned_target:
688                    warn(
689                        f"The target '{_target}' is too long for '{self.instance_connector.flavor}', "
690                        + f"will use {truncated_target} instead."
691                    )
692                    self.__dict__['_warned_target'] = True
693                _target = truncated_target
694
695        if _target in default_targets:
696            return _target
697        self.target = _target
698    return self.parameters['target']

The target table name. You can set the target name under one of the following keys (checked in this order):

  • target
  • target_name
  • target_table
  • target_table_name
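
A minimal sketch of overriding the target table name (the name is illustrative):

import meerschaum as mrsm

pipe = mrsm.Pipe('foo:bar', 'demo', instance='sql:temp')
pipe.parameters['target'] = 'demo_table'
print(pipe.target)
# demo_table
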
def guess_datetime(self) -> Optional[str]:
721def guess_datetime(self) -> Union[str, None]:
722    """
723    Try to determine a pipe's datetime column.
724    """
725    _dtypes = self.dtypes
726
727    ### Abort if the user explicitly disallows a datetime index.
728    if 'datetime' in _dtypes:
729        if _dtypes['datetime'] is None:
730            return None
731
732    from meerschaum.utils.dtypes import are_dtypes_equal
733    dt_cols = [
734        col
735        for col, typ in _dtypes.items()
736        if are_dtypes_equal(typ, 'datetime')
737    ]
738    if not dt_cols:
739        return None
740    return dt_cols[0]

Try to determine a pipe's datetime column.
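
A short sketch:

import meerschaum as mrsm

pipe = mrsm.Pipe('foo:bar', 'demo', instance='sql:temp')
dt_col = pipe.guess_datetime()
# The first column whose dtype looks like a datetime, or None.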

def show( self, nopretty: bool = False, debug: bool = False, **kw) -> Tuple[bool, str]:
12def show(
13        self,
14        nopretty: bool = False,
15        debug: bool = False,
16        **kw
17    ) -> SuccessTuple:
18    """
19    Show attributes of a Pipe.
20
21    Parameters
22    ----------
23    nopretty: bool, default False
24        If `True`, simply print the JSON of the pipe's attributes.
25
26    debug: bool, default False
27        Verbosity toggle.
28
29    Returns
30    -------
31    A `SuccessTuple` of success, message.
32
33    """
34    import json
35    from meerschaum.utils.formatting import (
36        pprint, make_header, ANSI, highlight_pipes, fill_ansi, get_console,
37    )
38    from meerschaum.utils.packages import import_rich, attempt_import
39    from meerschaum.utils.warnings import info
40    attributes_json = json.dumps(self.attributes)
41    if not nopretty:
42        _to_print = f"Attributes for {self}:"
43        if ANSI:
44            _to_print = fill_ansi(highlight_pipes(make_header(_to_print)), 'magenta')
45            print(_to_print)
46            rich = import_rich()
47            rich_json = attempt_import('rich.json')
48            get_console().print(rich_json.JSON(attributes_json))
49        else:
50            print(_to_print)
51    else:
52        print(attributes_json)
53
54    return True, "Success"

Show attributes of a Pipe.

Parameters
  • nopretty (bool, default False): If True, simply print the JSON of the pipe's attributes.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success, message.
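
A minimal sketch:

import meerschaum as mrsm

pipe = mrsm.Pipe('foo:bar', 'demo', instance='sql:temp')
pipe.show()               # pretty-printed attributes (when ANSI and rich are available)
pipe.show(nopretty=True)  # plain JSON
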
def edit( self, patch: bool = False, interactive: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
21def edit(
22    self,
23    patch: bool = False,
24    interactive: bool = False,
25    debug: bool = False,
26    **kw: Any
27) -> SuccessTuple:
28    """
29    Edit a Pipe's configuration.
30
31    Parameters
32    ----------
33    patch: bool, default False
34        If `patch` is True, update parameters by cascading rather than overwriting.
35    interactive: bool, default False
36        If `True`, open an editor for the user to make changes to the pipe's YAML file.
37    debug: bool, default False
38        Verbosity toggle.
39
40    Returns
41    -------
42    A `SuccessTuple` of success, message.
43
44    """
45    from meerschaum.utils.venv import Venv
46    from meerschaum.connectors import get_connector_plugin
47
48    if self.temporary:
49        return False, "Cannot edit pipes created with `temporary=True` (read-only)."
50
51    if not interactive:
52        with Venv(get_connector_plugin(self.instance_connector)):
53            return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
54
55    from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
56    from meerschaum.utils.misc import edit_file
57    parameters_filename = str(self) + '.yaml'
58    parameters_path = PIPES_CACHE_RESOURCES_PATH / parameters_filename
59
60    from meerschaum.utils.yaml import yaml
61
62    edit_text = f"Edit the parameters for {self}"
63    edit_top = '#' * (len(edit_text) + 4)
64    edit_header = edit_top + f'\n# {edit_text} #\n' + edit_top + '\n\n'
65
66    from meerschaum.config import get_config
67    parameters = dict(get_config('pipes', 'parameters', patch=True))
68    from meerschaum.config._patch import apply_patch_to_config
69    parameters = apply_patch_to_config(parameters, self.parameters)
70
71    ### write parameters to yaml file
72    with open(parameters_path, 'w+') as f:
73        f.write(edit_header)
74        yaml.dump(parameters, stream=f, sort_keys=False)
75
76    ### only quit editing if yaml is valid
77    editing = True
78    while editing:
79        edit_file(parameters_path)
80        try:
81            with open(parameters_path, 'r') as f:
82                file_parameters = yaml.load(f.read())
83        except Exception as e:
84            from meerschaum.utils.warnings import warn
85            warn(f"Invalid format defined for '{self}':\n\n{e}")
86            input(f"Press [Enter] to correct the configuration for '{self}': ")
87        else:
88            editing = False
89
90    self.parameters = file_parameters
91
92    if debug:
93        from meerschaum.utils.formatting import pprint
94        pprint(self.parameters)
95
96    with Venv(get_connector_plugin(self.instance_connector)):
97        return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)

Edit a Pipe's configuration.

Parameters
  • patch (bool, default False): If patch is True, update parameters by cascading rather than overwriting.
  • interactive (bool, default False): If True, open an editor for the user to make changes to the pipe's YAML file.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success, message.
def edit_definition( self, yes: bool = False, noask: bool = False, force: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
100def edit_definition(
101    self,
102    yes: bool = False,
103    noask: bool = False,
104    force: bool = False,
105    debug : bool = False,
106    **kw : Any
107) -> SuccessTuple:
108    """
109    Edit a pipe's definition file and update its configuration.
110    **NOTE:** This function is interactive and should not be used in automated scripts!
111
112    Returns
113    -------
114    A `SuccessTuple` of success, message.
115
116    """
117    if self.temporary:
118        return False, "Cannot edit pipes created with `temporary=True` (read-only)."
119
120    from meerschaum.connectors import instance_types
121    if (self.connector is None) or self.connector.type not in instance_types:
122        return self.edit(interactive=True, debug=debug, **kw)
123
124    import json
125    from meerschaum.utils.warnings import info, warn
126    from meerschaum.utils.debug import dprint
127    from meerschaum.config._patch import apply_patch_to_config
128    from meerschaum.utils.misc import edit_file
129
130    _parameters = self.parameters
131    if 'fetch' not in _parameters:
132        _parameters['fetch'] = {}
133
134    def _edit_api():
135        from meerschaum.utils.prompt import prompt, yes_no
136        info(
137            f"Please enter the keys of the source pipe from '{self.connector}'.\n" +
138            "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip."
139        )
140
141        _keys = { 'connector_keys' : None, 'metric_key' : None, 'location_key' : None }
142        for k in _keys:
143            _keys[k] = _parameters['fetch'].get(k, None)
144
145        for k, v in _keys.items():
146            try:
147                _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v)
148            except KeyboardInterrupt:
149                continue
150            if _keys[k] in ('', 'None', '\'None\'', '[None]'):
151                _keys[k] = None
152
153        _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys)
154
155        info("You may optionally specify additional filter parameters as JSON.")
156        print("  Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.")
157        print("  For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':")
158        print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': ')))
159        if force or yes_no(
160            "Would you like to add additional filter parameters?",
161            yes=yes, noask=noask
162        ):
163            from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
164            definition_filename = str(self) + '.json'
165            definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
166            try:
167                definition_path.touch()
168                with open(definition_path, 'w+') as f:
169                    json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2)
170            except Exception as e:
171                return False, f"Failed writing file '{definition_path}':\n" + str(e)
172
173            _params = None
174            while True:
175                edit_file(definition_path)
176                try:
177                    with open(definition_path, 'r') as f:
178                        _params = json.load(f)
179                except Exception as e:
180                    warn(f'Failed to read parameters JSON:\n{e}', stack=False)
181                    if force or yes_no(
182                        "Would you like to try again?\n  "
183                        + "If not, the parameters JSON file will be ignored.",
184                        noask=noask, yes=yes
185                    ):
186                        continue
187                    _params = None
188                break
189            if _params is not None:
190                if 'fetch' not in _parameters:
191                    _parameters['fetch'] = {}
192                _parameters['fetch']['params'] = _params
193
194        self.parameters = _parameters
195        return True, "Success"
196
197    def _edit_sql():
198        import pathlib, os, textwrap
199        from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
200        from meerschaum.utils.misc import edit_file
201        definition_filename = str(self) + '.sql'
202        definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
203
204        sql_definition = _parameters['fetch'].get('definition', None)
205        if sql_definition is None:
206            sql_definition = ''
207        sql_definition = textwrap.dedent(sql_definition).lstrip()
208
209        try:
210            definition_path.touch()
211            with open(definition_path, 'w+') as f:
212                f.write(sql_definition)
213        except Exception as e:
214            return False, f"Failed writing file '{definition_path}':\n" + str(e)
215
216        edit_file(definition_path)
217        try:
218            with open(definition_path, 'r') as f:
219                file_definition = f.read()
220        except Exception as e:
221            return False, f"Failed reading file '{definition_path}':\n" + str(e)
222
223        if sql_definition == file_definition:
224            return False, f"No changes made to definition for {self}."
225
226        if ' ' not in file_definition:
227            return False, f"Invalid SQL definition for {self}."
228
229        if debug:
230            dprint("Read SQL definition:\n\n" + file_definition)
231        _parameters['fetch']['definition'] = file_definition
232        self.parameters = _parameters
233        return True, "Success"
234
235    locals()['_edit_' + str(self.connector.type)]()
236    return self.edit(interactive=False, debug=debug, **kw)

Edit a pipe's definition file and update its configuration. NOTE: This function is interactive and should not be used in automated scripts!

Returns
  • A SuccessTuple of success, message.
def update(self, *args, **kw) -> Tuple[bool, str]:
13def update(self, *args, **kw) -> SuccessTuple:
14    """
15    Update a pipe's parameters in its instance.
16    """
17    kw['interactive'] = False
18    return self.edit(*args, **kw)

Update a pipe's parameters in its instance.
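
A minimal sketch of persisting in-memory parameter changes (assumes the pipe is already registered; keys are illustrative):

import meerschaum as mrsm

pipe = mrsm.Pipe('foo:bar', 'demo', instance='sql:temp')
pipe.parameters['tags'] = ['production', 'nightly']
success, msg = pipe.update()
print(success, msg)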

def sync( self, df: Union[pandas.core.frame.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], meerschaum.core.Pipe._sync.InferFetch] = <class 'meerschaum.core.Pipe._sync.InferFetch'>, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, NoneType] = None, force: bool = False, retries: int = 10, min_seconds: int = 1, check_existing: bool = True, enforce_dtypes: bool = True, blocking: bool = True, workers: Optional[int] = None, callback: Optional[Callable[[Tuple[bool, str]], Any]] = None, error_callback: Optional[Callable[[Exception], Any]] = None, chunksize: Optional[int] = -1, sync_chunks: bool = True, debug: bool = False, _inplace: bool = True, **kw: Any) -> Tuple[bool, str]:
 40def sync(
 41    self,
 42    df: Union[
 43        pd.DataFrame,
 44        Dict[str, List[Any]],
 45        List[Dict[str, Any]],
 46        InferFetch
 47    ] = InferFetch,
 48    begin: Union[datetime, int, str, None] = '',
 49    end: Union[datetime, int, None] = None,
 50    force: bool = False,
 51    retries: int = 10,
 52    min_seconds: int = 1,
 53    check_existing: bool = True,
 54    enforce_dtypes: bool = True,
 55    blocking: bool = True,
 56    workers: Optional[int] = None,
 57    callback: Optional[Callable[[Tuple[bool, str]], Any]] = None,
 58    error_callback: Optional[Callable[[Exception], Any]] = None,
 59    chunksize: Optional[int] = -1,
 60    sync_chunks: bool = True,
 61    debug: bool = False,
 62    _inplace: bool = True,
 63    **kw: Any
 64) -> SuccessTuple:
 65    """
 66    Fetch new data from the source and update the pipe's table with new data.
 67    
 68    Get new remote data via fetch, get existing data in the same time period,
 69    and merge the two, only keeping the unseen data.
 70
 71    Parameters
 72    ----------
 73    df: Union[None, pd.DataFrame, Dict[str, List[Any]]], default None
 74        An optional DataFrame to sync into the pipe. Defaults to `None`.
 75
 76    begin: Union[datetime, int, str, None], default ''
 77        Optionally specify the earliest datetime to search for data.
 78
 79    end: Union[datetime, int, str, None], default None
 80        Optionally specify the latest datetime to search for data.
 81
 82    force: bool, default False
 83        If `True`, keep retrying the sync up to `retries` attempts.
 84
 85    retries: int, default 10
 86        If `force`, how many attempts to try syncing before declaring failure.
 87
 88    min_seconds: Union[int, float], default 1
 89        If `force`, how many seconds to sleep between retries. Defaults to `1`.
 90
 91    check_existing: bool, default True
 92        If `True`, pull and diff with existing data from the pipe.
 93
 94    enforce_dtypes: bool, default True
 95        If `True`, enforce dtypes on incoming data.
 96        Set this to `False` if the incoming rows are expected to be of the correct dtypes.
 97
 98    blocking: bool, default True
 99        If `True`, wait for sync to finish and return its result, otherwise
100        sync asynchronously and return success. Defaults to `True`.
101        Only intended for specific scenarios.
102
103    workers: Optional[int], default None
104        If provided and the instance connector is thread-safe
105        (`pipe.instance_connector.IS_THREAD_SAFE is True`),
106        limit concurrent sync to this many threads.
107
108    callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
109        Callback function which expects a SuccessTuple as input.
110        Only applies when `blocking=False`.
111
112    error_callback: Optional[Callable[[Exception], Any]], default None
113        Callback function which expects an Exception as input.
114        Only applies when `blocking=False`.
115
116    chunksize: int, default -1
117        Specify the number of rows to sync per chunk.
118        If `-1`, resort to system configuration (default is `900`).
119        A `chunksize` of `None` will sync all rows in one transaction.
120
121    sync_chunks: bool, default True
122        If possible, sync chunks while fetching them into memory.
123
124    debug: bool, default False
125        Verbosity toggle. Defaults to False.
126
127    Returns
128    -------
129    A `SuccessTuple` of success (`bool`) and message (`str`).
130    """
131    from meerschaum.utils.debug import dprint, _checkpoint
132    from meerschaum.utils.formatting import get_console
133    from meerschaum.utils.venv import Venv
134    from meerschaum.connectors import get_connector_plugin
135    from meerschaum.utils.misc import df_is_chunk_generator, filter_keywords, filter_arguments
136    from meerschaum.utils.pool import get_pool
137    from meerschaum.config import get_config
138
139    if (callback is not None or error_callback is not None) and blocking:
140        warn("Callback functions are only executed when blocking = False. Ignoring...")
141
142    _checkpoint(_total=2, **kw)
143
144    if chunksize == 0:
145        chunksize = None
146        sync_chunks = False
147
148    begin, end = self.parse_date_bounds(begin, end)
149    kw.update({
150        'begin': begin,
151        'end': end,
152        'force': force,
153        'retries': retries,
154        'min_seconds': min_seconds,
155        'check_existing': check_existing,
156        'blocking': blocking,
157        'workers': workers,
158        'callback': callback,
159        'error_callback': error_callback,
160        'sync_chunks': sync_chunks,
161        'chunksize': chunksize,
162    })
163
164    ### NOTE: Invalidate `_exists` cache before and after syncing.
165    self._exists = None
166
167    def _sync(
168        p: mrsm.Pipe,
169        df: Union[
170            'pd.DataFrame',
171            Dict[str, List[Any]],
172            List[Dict[str, Any]],
173            InferFetch
174        ] = InferFetch,
175    ) -> SuccessTuple:
176        if df is None:
177            p._exists = None
178            return (
179                False,
180                f"You passed `None` instead of data into `sync()` for {p}.\n"
181                + "Omit the DataFrame to infer fetching.",
182            )
183        ### Ensure that Pipe is registered.
184        if not p.temporary and p.get_id(debug=debug) is None:
185            ### NOTE: This may trigger an interactive session for plugins!
186            register_success, register_msg = p.register(debug=debug)
187            if not register_success:
188                if 'already' not in register_msg:
189                    p._exists = None
190                    return register_success, register_msg
191
192        ### If connector is a plugin with a `sync()` method, return that instead.
193        ### If the plugin does not have a `sync()` method but does have a `fetch()` method,
194        ### use that instead.
195        ### NOTE: The DataFrame must be omitted for the plugin sync method to apply.
196        ### If a DataFrame is provided, continue as expected.
197        if hasattr(df, 'MRSM_INFER_FETCH'):
198            try:
199                if p.connector is None:
200                    if ':' not in p.connector_keys:
201                        return True, f"{p} does not support fetching; nothing to do."
202
203                    msg = f"{p} does not have a valid connector."
204                    if p.connector_keys.startswith('plugin:'):
205                        msg += f"\n    Perhaps {p.connector_keys} has a syntax error?"
206                    p._exists = None
207                    return False, msg
208            except Exception:
209                p._exists = None
210                return False, f"Unable to create the connector for {p}."
211
212            ### Sync in place if this is a SQL pipe.
213            if (
214                str(self.connector) == str(self.instance_connector)
215                and 
216                hasattr(self.instance_connector, 'sync_pipe_inplace')
217                and
218                _inplace
219                and
220                get_config('system', 'experimental', 'inplace_sync')
221            ):
222                with Venv(get_connector_plugin(self.instance_connector)):
223                    p._exists = None
224                    _args, _kwargs = filter_arguments(
225                        p.instance_connector.sync_pipe_inplace,
226                        p,
227                        debug=debug,
228                        **kw
229                    )
230                    return self.instance_connector.sync_pipe_inplace(
231                        *_args,
232                        **_kwargs
233                    )
234
235            ### Activate and invoke `sync(pipe)` for plugin connectors with `sync` methods.
236            try:
237                if getattr(p.connector, 'sync', None) is not None:
238                    with Venv(get_connector_plugin(p.connector), debug=debug):
239                        _args, _kwargs = filter_arguments(
240                            p.connector.sync,
241                            p,
242                            debug=debug,
243                            **kw
244                        )
245                        return_tuple = p.connector.sync(*_args, **_kwargs)
246                    p._exists = None
247                    if not isinstance(return_tuple, tuple):
248                        return_tuple = (
249                            False,
250                            f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}"
251                        )
252                    return return_tuple
253
254            except Exception as e:
255                get_console().print_exception()
256                msg = f"Failed to sync {p} with exception: '" + str(e) + "'"
257                if debug:
258                    error(msg, silent=False)
259                p._exists = None
260                return False, msg
261
262            ### Fetch the dataframe from the connector's `fetch()` method.
263            try:
264                with Venv(get_connector_plugin(p.connector), debug=debug):
265                    df = p.fetch(
266                        **filter_keywords(
267                            p.fetch,
268                            debug=debug,
269                            **kw
270                        )
271                    )
272            except Exception as e:
273                get_console().print_exception(
274                    suppress=[
275                        'meerschaum/core/Pipe/_sync.py',
276                        'meerschaum/core/Pipe/_fetch.py',
277                    ]
278                )
279                msg = f"Failed to fetch data from {p.connector}:\n    {e}"
280                df = None
281
282            if df is None:
283                p._exists = None
284                return False, f"No data were fetched for {p}."
285
286            if isinstance(df, list):
287                if len(df) == 0:
288                    return True, f"No new rows were returned for {p}."
289
290                ### May be a chunk hook results list.
291                if isinstance(df[0], tuple):
292                    success = all([_success for _success, _ in df])
293                    message = '\n'.join([_message for _, _message in df])
294                    return success, message
295
296            if df is True:
297                p._exists = None
298                return True, f"{p} is being synced in parallel."
299
300        ### CHECKPOINT: Retrieved the DataFrame.
301        _checkpoint(**kw)
302
303        ### Allow for dataframe generators or iterables.
304        if df_is_chunk_generator(df):
305            kw['workers'] = p.get_num_workers(kw.get('workers', None))
306            dt_col = p.columns.get('datetime', None)
307            pool = get_pool(workers=kw.get('workers', 1))
308            if debug:
309                dprint(f"Received {type(df)}. Attempting to sync first chunk...")
310
311            try:
312                chunk = next(df)
313            except StopIteration:
314                return True, "Received an empty generator; nothing to do."
315
316            chunk_success, chunk_msg = _sync(p, chunk)
317            chunk_msg = '\n' + self._get_chunk_label(chunk, dt_col) + '\n' + chunk_msg
318            if not chunk_success:
319                return chunk_success, f"Unable to sync initial chunk for {p}:\n{chunk_msg}"
320            if debug:
321                dprint("Successfully synced the first chunk, attempting the rest...")
322
323            def _process_chunk(_chunk):
324                _chunk_attempts = 0
325                _max_chunk_attempts = 3
326                while _chunk_attempts < _max_chunk_attempts:
327                    try:
328                        _chunk_success, _chunk_msg = _sync(p, _chunk)
329                    except Exception as e:
330                        _chunk_success, _chunk_msg = False, str(e)
331                    if _chunk_success:
332                        break
333                    _chunk_attempts += 1
334                    _sleep_seconds = _chunk_attempts ** 2
335                    warn(
336                        (
337                            f"Failed to sync chunk to {self} "
338                            + f"(attempt {_chunk_attempts} / {_max_chunk_attempts}).\n"
339                            + f"Sleeping for {_sleep_seconds} second"
340                            + ('s' if _sleep_seconds != 1 else '')
341                            + f":\n{_chunk_msg}"
342                        ),
343                        stack=False,
344                    )
345                    time.sleep(_sleep_seconds)
346
347                num_rows_str = (
348                    f"{num_rows:,} rows"
349                    if (num_rows := len(_chunk)) != 1
350                    else f"{num_rows} row"
351                )
352                _chunk_msg = (
353                    (
354                        "Synced"
355                        if _chunk_success
356                        else "Failed to sync"
357                    ) + f" a chunk ({num_rows_str}) to {p}:\n"
358                    + self._get_chunk_label(_chunk, dt_col)
359                    + '\n'
360                    + _chunk_msg
361                )
362
363                mrsm.pprint((_chunk_success, _chunk_msg), calm=True)
364                return _chunk_success, _chunk_msg
365
366            results = sorted(
367                [(chunk_success, chunk_msg)] + (
368                    list(pool.imap(_process_chunk, df))
369                    if (
370                        not df_is_chunk_generator(chunk)  # Handle nested generators.
371                        and kw.get('workers', 1) != 1
372                    )
373                    else list(
374                        _process_chunk(_child_chunks)
375                        for _child_chunks in df
376                    )
377                )
378            )
379            chunk_messages = [chunk_msg for _, chunk_msg in results]
380            success_bools = [chunk_success for chunk_success, _ in results]
381            num_successes = len([chunk_success for chunk_success, _ in results if chunk_success])
382            num_failures = len([chunk_success for chunk_success, _ in results if not chunk_success])
383            success = all(success_bools)
384            msg = (
385                'Synced '
386                + f'{len(chunk_messages):,} chunk'
387                + ('s' if len(chunk_messages) != 1 else '')
388                + f' to {p}\n({num_successes} succeeded, {num_failures} failed):\n\n'
389                + '\n\n'.join(chunk_messages).lstrip().rstrip()
390            ).lstrip().rstrip()
391            return success, msg
392
393        ### Cast to a dataframe and ensure datatypes are what we expect.
394        df = self.enforce_dtypes(
395            df,
396            chunksize=chunksize,
397            enforce=enforce_dtypes,
398            debug=debug,
399        )
400
401        ### Capture `numeric`, `uuid`, `json`, and `bytes` columns.
402        self._persist_new_json_columns(df, debug=debug)
403        self._persist_new_numeric_columns(df, debug=debug)
404        self._persist_new_uuid_columns(df, debug=debug)
405        self._persist_new_bytes_columns(df, debug=debug)
406
407        if debug:
408            dprint(
409                "DataFrame to sync:\n"
410                + (
411                    str(df)[:255]
412                    + '...'
413                    if len(str(df)) >= 256
414                    else str(df)
415                ),
416                **kw
417            )
418
419        ### if force, continue to sync until success
420        return_tuple = False, f"Did not sync {p}."
421        run = True
422        _retries = 1
423        while run:
424            with Venv(get_connector_plugin(self.instance_connector)):
425                return_tuple = p.instance_connector.sync_pipe(
426                    pipe=p,
427                    df=df,
428                    debug=debug,
429                    **kw
430                )
431            _retries += 1
432            run = (not return_tuple[0]) and force and _retries <= retries
433            if run and debug:
434                dprint(f"Syncing failed for {p}. Attempt ( {_retries} / {retries} )", **kw)
435                dprint(f"Sleeping for {min_seconds} seconds...", **kw)
436                time.sleep(min_seconds)
437            if _retries > retries:
438                warn(
439                    f"Unable to sync {p} within {retries} attempt" +
440                        ("s" if retries != 1 else "") + "!"
441                )
442
443        ### CHECKPOINT: Finished syncing. Handle caching.
444        _checkpoint(**kw)
445        if self.cache_pipe is not None:
446            if debug:
447                dprint("Caching retrieved dataframe.", **kw)
448            _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw)
449            if not _sync_cache_tuple[0]:
450                warn(f"Failed to sync local cache for {self}.")
451
452        self._exists = None
453        return return_tuple
454
455    if blocking:
456        self._exists = None
457        return _sync(self, df=df)
458
459    from meerschaum.utils.threading import Thread
460    def default_callback(result_tuple: SuccessTuple):
461        dprint(f"Asynchronous result from {self}: {result_tuple}", **kw)
462
463    def default_error_callback(x: Exception):
464        dprint(f"Error received for {self}: {x}", **kw)
465
466    if callback is None and debug:
467        callback = default_callback
468    if error_callback is None and debug:
469        error_callback = default_error_callback
470    try:
471        thread = Thread(
472            target=_sync,
473            args=(self,),
474            kwargs={'df': df},
475            daemon=False,
476            callback=callback,
477            error_callback=error_callback,
478        )
479        thread.start()
480    except Exception as e:
481        self._exists = None
482        return False, str(e)
483
484    self._exists = None
485    return True, f"Spawned asynchronous sync for {self}."

Fetch new data from the source and update the pipe's table with new data.

Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.

Parameters
  • df (Union[None, pd.DataFrame, Dict[str, List[Any]]], default None): An optional DataFrame to sync into the pipe. Defaults to None.
  • begin (Union[datetime, int, str, None], default ''): Optionally specify the earliest datetime to search for data.
  • end (Union[datetime, int, str, None], default None): Optionally specify the latest datetime to search for data.
  • force (bool, default False): If True, keep retrying the sync up to retries attempts.
  • retries (int, default 10): If force, how many attempts to try syncing before declaring failure.
  • min_seconds (Union[int, float], default 1): If force, how many seconds to sleep between retries. Defaults to 1.
  • check_existing (bool, default True): If True, pull and diff with existing data from the pipe.
  • enforce_dtypes (bool, default True): If True, enforce dtypes on incoming data. Set this to False if the incoming rows are expected to be of the correct dtypes.
  • blocking (bool, default True): If True, wait for sync to finish and return its result, otherwise sync asynchronously and return success. Defaults to True. Only intended for specific scenarios.
  • workers (Optional[int], default None): If provided and the instance connector is thread-safe (pipe