meerschaum

Meerschaum Python API

Welcome to the Meerschaum Python API technical documentation! Here you can find information about the classes and functions provided by the meerschaum package. Visit meerschaum.io for general usage documentation.

Root Module

For your convenience, the following classes and functions may be imported from the root meerschaum namespace: Pipe, Plugin, Venv, Job, Connector, SuccessTuple, get_pipes(), get_connector(), get_config(), make_connector(), attempt_import(), pprint(), and entry().

Examples

Build a Connector

Get an existing connector or build a new one in memory with the meerschaum.get_connector() factory function:

import meerschaum as mrsm

sql_conn = mrsm.get_connector(
    'sql:temp',
    flavor='sqlite',
    database='/tmp/tmp.db',
)
df = sql_conn.read("SELECT 1 AS foo")
print(df)
#    foo
# 0    1

sql_conn.to_sql(df, 'foo')
print(sql_conn.read('foo'))
#    foo
# 0    1

Create a Custom Connector Class

Decorate your connector classes with meerschaum.make_connector() to designate them as custom connectors:

from datetime import datetime, timezone
from random import randint
import meerschaum as mrsm
from meerschaum.utils.misc import round_time

@mrsm.make_connector
class FooConnector(mrsm.Connector):
    REQUIRED_ATTRIBUTES = ['username', 'password']

    def fetch(
        self,
        begin: datetime | None = None,
        end: datetime | None = None,
    ):
        now = begin or round_time(datetime.now(timezone.utc))
        return [
            {'ts': now, 'id': 1, 'vl': randint(1, 100)},
            {'ts': now, 'id': 2, 'vl': randint(1, 100)},
            {'ts': now, 'id': 3, 'vl': randint(1, 100)},
        ]

foo_conn = mrsm.get_connector(
    'foo:bar',
    username='foo',
    password='bar',
)
docs = foo_conn.fetch()

Build a Pipe

Build a meerschaum.Pipe in-memory:

from datetime import datetime
import meerschaum as mrsm

pipe = mrsm.Pipe(
    foo_conn, 'demo',
    instance=sql_conn,
    columns={'datetime': 'ts', 'id': 'id'},
    tags=['production'],
)
pipe.sync(begin=datetime(2024, 1, 1))
df = pipe.get_data()
print(df)
#           ts  id  vl
# 0 2024-01-01   1  97
# 1 2024-01-01   2  18
# 2 2024-01-01   3  96

Add temporary=True to skip registering the pipe in the pipes table.
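
For example, a minimal sketch of an unregistered pipe, reusing the foo_conn and sql_conn connectors from above:

import meerschaum as mrsm

temp_pipe = mrsm.Pipe(
    foo_conn, 'demo',
    instance=sql_conn,
    columns={'datetime': 'ts', 'id': 'id'},
    temporary=True,  # skip the pipes table; the pipe exists only in memory
)
temp_pipe.sync()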

Get Registered Pipes

The meerschaum.get_pipes() function returns a dictionary hierarchy of pipes by connector, metric, and location:

import meerschaum as mrsm

pipes = mrsm.get_pipes(instance='sql:temp')
pipe = pipes['foo:bar']['demo'][None]

Add as_list=True to flatten the hierarchy:

import meerschaum as mrsm

pipes = mrsm.get_pipes(
    tags=['production'],
    instance=sql_conn,
    as_list=True,
)
print(pipes)
# [Pipe('foo:bar', 'demo', instance='sql:temp')]

Import Plugins

You can import a plugin's module through meerschaum.Plugin.module:

import meerschaum as mrsm

plugin = mrsm.Plugin('noaa')
with mrsm.Venv(plugin):
    noaa = plugin.module

If your plugin has submodules, use meerschaum.plugins.from_plugin_import:

from meerschaum.plugins import from_plugin_import
get_defined_pipes = from_plugin_import('compose.utils.pipes', 'get_defined_pipes')

Import multiple plugins with meerschaum.plugins.import_plugins:

from meerschaum.plugins import import_plugins
noaa, compose = import_plugins('noaa', 'compose')

Create a Job

Create a meerschaum.Job with a name and sysargs:

import meerschaum as mrsm

job = mrsm.Job('syncing-engine', 'sync pipes --loop')
success, msg = job.start()
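
To stop the job later, a sketch (assuming Job.stop() mirrors Job.start() by returning a SuccessTuple):

success, msg = job.stop()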

Pass executor_keys as the connector keys of an API instance to create a remote job:

import meerschaum as mrsm

job = mrsm.Job(
    'foo',
    'sync pipes -s daily',
    executor_keys='api:main',
)

Import from a Virtual Environment

Use the meerschaum.Venv context manager to activate a virtual environment:

import meerschaum as mrsm

with mrsm.Venv('noaa'):
    import requests

print(requests.__file__)
# /home/bmeares/.config/meerschaum/venvs/noaa/lib/python3.12/site-packages/requests/__init__.py

To import packages which may not be installed, use meerschaum.attempt_import():

import meerschaum as mrsm

requests = mrsm.attempt_import('requests', venv='noaa')
print(requests.__file__)
# /home/bmeares/.config/meerschaum/venvs/noaa/lib/python3.12/site-packages/requests/__init__.py

Run Actions

Run sysargs with meerschaum.entry():

import meerschaum as mrsm

success, msg = mrsm.entry('show pipes + show version : x2')

Use meerschaum.actions.get_action() to access an action function directly:

from meerschaum.actions import get_action

show_pipes = get_action(['show', 'pipes'])
success, msg = show_pipes(connector_keys=['plugin:noaa'])

Get a dictionary of available subactions with meerschaum.actions.get_subactions():

from meerschaum.actions import get_subactions

subactions = get_subactions('show')
success, msg = subactions['pipes']()

Create a Plugin

Run bootstrap plugin to create a new plugin:

mrsm bootstrap plugin example

This will create example.py in your plugins directory (default ~/.config/meerschaum/plugins/, Windows: %APPDATA%\Meerschaum\plugins). You may paste the example code from the "Create a Custom Action" example below.

Open your plugin with edit plugin:

mrsm edit plugin example

Paste the example code below into your plugin to try out the features.

See the writing plugins guide for more in-depth documentation.

Create a Custom Action

Decorate a function with meerschaum.actions.make_action to designate it as an action. Subactions will be automatically detected if not decorated:

from meerschaum.actions import make_action

@make_action
def sing():
    print('What would you like me to sing?')
    return True, "Success"

def sing_tune():
    return False, "I don't know that song!"

def sing_song():
    print('Hello, World!')
    return True, "Success"

Use meerschaum.plugins.add_plugin_argument() to create new parameters for your action:

from meerschaum.plugins import make_action, add_plugin_argument

add_plugin_argument(
    '--song', type=str, help='What song to sing.',
)

@make_action
def sing_melody(action=None, song=None):
    to_sing = action[0] if action else song
    if not to_sing:
        return False, "Please tell me what to sing!"

    return True, f'~I am singing {to_sing}~'

Invoke the new action from the shell:

mrsm sing melody lalala

mrsm sing melody --song do-re-mi
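
You can also call the action directly in Python with meerschaum.actions.get_action(), as shown earlier. A sketch, assuming the plugin above is installed:

from meerschaum.actions import get_action

sing_melody = get_action(['sing', 'melody'])
success, msg = sing_melody(song='do-re-mi')
print(msg)
# ~I am singing do-re-mi~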

Add a Page to the Web Dashboard

Use the decorators meerschaum.plugins.dash_plugin() and meerschaum.plugins.web_page() to add new pages to the web dashboard:

from meerschaum.plugins import dash_plugin, web_page

@dash_plugin
def init_dash(dash_app):

    import dash.html as html
    import dash_bootstrap_components as dbc
    from dash import Input, Output, no_update

    ### Routes to '/dash/my-page'
    @web_page('/my-page', login_required=False)
    def my_page():
        return dbc.Container([
            html.H1("Hello, World!"),
            dbc.Button("Click me", id='my-button'),
            html.Div(id="my-output-div"),
        ])

    @dash_app.callback(
        Output('my-output-div', 'children'),
        Input('my-button', 'n_clicks'),
    )
    def my_button_click(n_clicks):
        if not n_clicks:
            return no_update
        return html.P(f'You clicked {n_clicks} times!')

Submodules

meerschaum.actions
Access functions for actions and subactions.

meerschaum.config
Read and write the Meerschaum configuration registry.

meerschaum.connectors
Build connectors to interact with databases and fetch data.

meerschaum.jobs
Start background jobs.

meerschaum.plugins
Access plugin modules and other API utilities.

meerschaum.utils
Utility functions are available in several submodules, such as meerschaum.utils.misc, meerschaum.utils.packages, meerschaum.utils.sql, meerschaum.utils.venv, meerschaum.utils.warnings, and meerschaum.utils.formatting.

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6Copyright 2023 Bennett Meares
 7
 8Licensed under the Apache License, Version 2.0 (the "License");
 9you may not use this file except in compliance with the License.
10You may obtain a copy of the License at
11
12   http://www.apache.org/licenses/LICENSE-2.0
13
14Unless required by applicable law or agreed to in writing, software
15distributed under the License is distributed on an "AS IS" BASIS,
16WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17See the License for the specific language governing permissions and
18limitations under the License.
19"""
20
21import atexit
22from meerschaum.utils.typing import SuccessTuple
23from meerschaum.utils.packages import attempt_import
24from meerschaum.core.Pipe import Pipe
25from meerschaum.plugins import Plugin
26from meerschaum.utils.venv import Venv
27from meerschaum.jobs import Job, make_executor
28from meerschaum.connectors import get_connector, Connector, make_connector
29from meerschaum.utils import get_pipes
30from meerschaum.utils.formatting import pprint
31from meerschaum._internal.docs import index as __doc__
32from meerschaum.config import __version__, get_config
33from meerschaum._internal.entry import entry
34from meerschaum.__main__ import _close_pools
35
36atexit.register(_close_pools)
37
38__pdoc__ = {'gui': False, 'api': False, 'core': False, '_internal': False}
39__all__ = (
40    "get_pipes",
41    "get_connector",
42    "get_config",
43    "Pipe",
44    "Plugin",
45    "Venv",
46    "Plugin",
47    "Job",
48    "pprint",
49    "attempt_import",
50    "actions",
51    "config",
52    "connectors",
53    "jobs",
54    "plugins",
55    "utils",
56    "SuccessTuple",
57    "Connector",
58    "make_connector",
59    "entry",
60)
def get_pipes( connector_keys: Union[str, List[str], NoneType] = None, metric_keys: Union[str, List[str], NoneType] = None, location_keys: Union[str, List[str], NoneType] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, mrsm_instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, as_list: bool = False, method: str = 'registered', debug: bool = False, **kw: Any) -> Union[Dict[str, Dict[str, Dict[str, Pipe]]], List[Pipe]]:
 19def get_pipes(
 20    connector_keys: Union[str, List[str], None] = None,
 21    metric_keys: Union[str, List[str], None] = None,
 22    location_keys: Union[str, List[str], None] = None,
 23    tags: Optional[List[str]] = None,
 24    params: Optional[Dict[str, Any]] = None,
 25    mrsm_instance: Union[str, InstanceConnector, None] = None,
 26    instance: Union[str, InstanceConnector, None] = None,
 27    as_list: bool = False,
 28    method: str = 'registered',
 29    debug: bool = False,
 30    **kw: Any
 31) -> Union[PipesDict, List[mrsm.Pipe]]:
 32    """
 33    Return a dictionary or list of `meerschaum.Pipe` objects.
 34
 35    Parameters
 36    ----------
 37    connector_keys: Union[str, List[str], None], default None
 38        String or list of connector keys.
 39        If omitted or is `'*'`, fetch all possible keys.
 40        If a string begins with `'_'`, select keys that do NOT match the string.
 41
 42    metric_keys: Union[str, List[str], None], default None
 43        String or list of metric keys. See `connector_keys` for formatting.
 44
 45    location_keys: Union[str, List[str], None], default None
 46        String or list of location keys. See `connector_keys` for formatting.
 47
 48    tags: Optional[List[str]], default None
 49         If provided, only include pipes with these tags.
 50
 51    params: Optional[Dict[str, Any]], default None
 52        Dictionary of additional parameters to search by.
 53        Params are parsed into a SQL WHERE clause.
 54        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`
 55
 56    mrsm_instance: Union[str, InstanceConnector, None], default None
 57        Connector keys for the Meerschaum instance of the pipes.
 58        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
 59        `meerschaum.connectors.api.APIConnector.APIConnector`.
 60        
 61    as_list: bool, default False
 62        If `True`, return pipes in a list instead of a hierarchical dictionary.
 63        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
 64        `True`  : `[Pipe]`
 65
 66    method: str, default 'registered'
 67        Available options: `['registered', 'explicit', 'all']`
 68        If `'registered'` (default), create pipes based on registered keys in the connector's pipes table
 69        (API or SQL connector, depends on mrsm_instance).
 70        If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys
 71        instead of consulting the pipes table. Useful for creating non-existent pipes.
 72        If `'all'`, create pipes from predefined metrics and locations. Required `connector_keys`.
 73        **NOTE:** Method `'all'` is not implemented!
 74
 75    **kw: Any:
 76        Keyword arguments to pass to the `meerschaum.Pipe` constructor.
 77        
 78
 79    Returns
 80    -------
 81    A dictionary of dictionaries and `meerschaum.Pipe` objects
 82    in the connector, metric, location hierarchy.
 83    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.
 84
 85    Examples
 86    --------
 87    ```
 88    >>> ### Manual definition:
 89    >>> pipes = {
 90    ...     <connector_keys>: {
 91    ...         <metric_key>: {
 92    ...             <location_key>: Pipe(
 93    ...                 <connector_keys>,
 94    ...                 <metric_key>,
 95    ...                 <location_key>,
 96    ...             ),
 97    ...         },
 98    ...     },
 99    ... },
100    >>> ### Accessing a single pipe:
101    >>> pipes['sql:main']['weather'][None]
102    >>> ### Return a list instead:
103    >>> get_pipes(as_list=True)
104    [sql_main_weather]
105    >>> 
106    ```
107    """
108
109    from meerschaum.config import get_config
110    from meerschaum.utils.warnings import error
111    from meerschaum.utils.misc import filter_keywords
112
113    if connector_keys is None:
114        connector_keys = []
115    if metric_keys is None:
116        metric_keys = []
117    if location_keys is None:
118        location_keys = []
119    if params is None:
120        params = {}
121    if tags is None:
122        tags = []
123
124    if isinstance(connector_keys, str):
125        connector_keys = [connector_keys]
126    if isinstance(metric_keys, str):
127        metric_keys = [metric_keys]
128    if isinstance(location_keys, str):
129        location_keys = [location_keys]
130
131    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
132    if mrsm_instance is None:
133        mrsm_instance = instance
134    if mrsm_instance is None:
135        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
136    if isinstance(mrsm_instance, str):
137        from meerschaum.connectors.parse import parse_instance_keys
138        connector = parse_instance_keys(keys=mrsm_instance, debug=debug)
139    else:
140        from meerschaum.connectors import instance_types
141        valid_connector = False
142        if hasattr(mrsm_instance, 'type'):
143            if mrsm_instance.type in instance_types:
144                valid_connector = True
145        if not valid_connector:
146            error(f"Invalid instance connector: {mrsm_instance}")
147        connector = mrsm_instance
148    if debug:
149        from meerschaum.utils.debug import dprint
150        dprint(f"Using instance connector: {connector}")
151    if not connector:
152        error(f"Could not create connector from keys: '{mrsm_instance}'")
153
154    ### Get a list of tuples for the keys needed to build pipes.
155    result = fetch_pipes_keys(
156        method,
157        connector,
158        connector_keys = connector_keys,
159        metric_keys = metric_keys,
160        location_keys = location_keys,
161        tags = tags,
162        params = params,
163        debug = debug
164    )
165    if result is None:
166        error(f"Unable to build pipes!")
167
168    ### Populate the `pipes` dictionary with Pipes based on the keys
169    ### obtained from the chosen `method`.
170    from meerschaum import Pipe
171    pipes = {}
172    for ck, mk, lk in result:
173        if ck not in pipes:
174            pipes[ck] = {}
175
176        if mk not in pipes[ck]:
177            pipes[ck][mk] = {}
178
179        pipes[ck][mk][lk] = Pipe(
180            ck, mk, lk,
181            mrsm_instance = connector,
182            debug = debug,
183            **filter_keywords(Pipe, **kw)
184        )
185
186    if not as_list:
187        return pipes
188    from meerschaum.utils.misc import flatten_pipes_dict
189    return flatten_pipes_dict(pipes)

Return a dictionary or list of meerschaum.Pipe objects.

Parameters
  • connector_keys (Union[str, List[str], None], default None): String or list of connector keys. If omitted or is '*', fetch all possible keys. If a string begins with '_', select keys that do NOT match the string.
  • metric_keys (Union[str, List[str], None], default None): String or list of metric keys. See connector_keys for formatting.
  • location_keys (Union[str, List[str], None], default None): String or list of location keys. See connector_keys for formatting.
  • tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. Params are parsed into a SQL WHERE clause. E.g. {'a': 1, 'b': 2} equates to 'WHERE a = 1 AND b = 2'
  • mrsm_instance (Union[str, InstanceConnector, None], default None): Connector keys for the Meerschaum instance of the pipes. Must be a meerschaum.connectors.sql.SQLConnector.SQLConnector or meerschaum.connectors.api.APIConnector.APIConnector.
  • as_list (bool, default False): If True, return pipes in a list instead of a hierarchical dictionary. False : {connector_keys: {metric_key: {location_key: Pipe}}} True : [Pipe]
  • method (str, default 'registered'): Available options: ['registered', 'explicit', 'all']. If 'registered' (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depending on mrsm_instance). If 'explicit', create pipes from the provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table (useful for creating non-existent pipes). If 'all', create pipes from predefined metrics and locations; requires connector_keys. NOTE: Method 'all' is not implemented!
  • **kw (Any): Keyword arguments to pass to the meerschaum.Pipe constructor.
Returns
  • A dictionary of dictionaries and meerschaum.Pipe objects in the connector, metric, location hierarchy. If as_list is True, return a list of meerschaum.Pipe objects instead.
Examples
>>> ### Manual definition:
>>> pipes = {
...     <connector_keys>: {
...         <metric_key>: {
...             <location_key>: Pipe(
...                 <connector_keys>,
...                 <metric_key>,
...                 <location_key>,
...             ),
...         },
...     },
... },
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[sql_main_weather]
>>> 
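
A sketch of filtering by params, following the documented WHERE-clause behavior (the keys 'a' and 'b' are placeholders):

import meerschaum as mrsm

pipes = mrsm.get_pipes(
    params={'a': 1, 'b': 2},  # equates to "WHERE a = 1 AND b = 2"
    as_list=True,
)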
def get_connector( type: str = None, label: str = None, refresh: bool = False, debug: bool = False, **kw: Any) -> Connector:
 80def get_connector(
 81    type: str = None,
 82    label: str = None,
 83    refresh: bool = False,
 84    debug: bool = False,
 85    **kw: Any
 86) -> Connector:
 87    """
 88    Return existing connector or create new connection and store for reuse.
 89    
 90    You can create new connectors if enough parameters are provided for the given type and flavor.
 91    
 92
 93    Parameters
 94    ----------
 95    type: Optional[str], default None
 96        Connector type (sql, api, etc.).
 97        Defaults to the type of the configured `instance_connector`.
 98
 99    label: Optional[str], default None
100        Connector label (e.g. main). Defaults to `'main'`.
101
102    refresh: bool, default False
103        Refresh the Connector instance / construct new object. Defaults to `False`.
104
105    kw: Any
106        Other arguments to pass to the Connector constructor.
107        If the Connector has already been constructed and new arguments are provided,
108        `refresh` is set to `True` and the old Connector is replaced.
109
110    Returns
111    -------
112    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
113    `meerschaum.connectors.sql.SQLConnector`).
114    
115    Examples
116    --------
117    The following parameters would create a new
118    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.
119
120    ```
121    >>> conn = get_connector(
122    ...     type = 'sql',
123    ...     label = 'newlabel',
124    ...     flavor = 'sqlite',
125    ...     database = '/file/path/to/database.db'
126    ... )
127    >>>
128    ```
129
130    """
131    from meerschaum.connectors.parse import parse_instance_keys
132    from meerschaum.config import get_config
133    from meerschaum.config.static import STATIC_CONFIG
134    from meerschaum.utils.warnings import warn
135    global _loaded_plugin_connectors
136    if isinstance(type, str) and not label and ':' in type:
137        type, label = type.split(':', maxsplit=1)
138
139    with _locks['_loaded_plugin_connectors']:
140        if not _loaded_plugin_connectors:
141            load_plugin_connectors()
142            _load_builtin_custom_connectors()
143            _loaded_plugin_connectors = True
144
145    if type is None and label is None:
146        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
147        ### recursive call to get_connector
148        return parse_instance_keys(default_instance_keys)
149
150    ### NOTE: the default instance connector may not be main.
151    ### Only fall back to 'main' if the type is provided by the label is omitted.
152    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']
153
154    ### type might actually be a label. Check if so and raise a warning.
155    if type not in connectors:
156        possibilities, poss_msg = [], ""
157        for _type in get_config('meerschaum', 'connectors'):
158            if type in get_config('meerschaum', 'connectors', _type):
159                possibilities.append(f"{_type}:{type}")
160        if len(possibilities) > 0:
161            poss_msg = " Did you mean"
162            for poss in possibilities[:-1]:
163                poss_msg += f" '{poss}',"
164            if poss_msg.endswith(','):
165                poss_msg = poss_msg[:-1]
166            if len(possibilities) > 1:
167                poss_msg += " or"
168            poss_msg += f" '{possibilities[-1]}'?"
169
170        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
171        return None
172
173    if 'sql' not in types:
174        from meerschaum.connectors.plugin import PluginConnector
175        from meerschaum.connectors.valkey import ValkeyConnector
176        with _locks['types']:
177            types.update({
178                'api': APIConnector,
179                'sql': SQLConnector,
180                'plugin': PluginConnector,
181                'valkey': ValkeyConnector,
182            })
183
184    ### determine if we need to call the constructor
185    if not refresh:
186        ### see if any user-supplied arguments differ from the existing instance
187        if label in connectors[type]:
188            warning_message = None
189            for attribute, value in kw.items():
190                if attribute not in connectors[type][label].meta:
191                    import inspect
192                    cls = connectors[type][label].__class__
193                    cls_init_signature = inspect.signature(cls)
194                    cls_init_params = cls_init_signature.parameters
195                    if attribute not in cls_init_params:
196                        warning_message = (
197                            f"Received new attribute '{attribute}' not present in connector " +
198                            f"{connectors[type][label]}.\n"
199                        )
200                elif connectors[type][label].__dict__[attribute] != value:
201                    warning_message = (
202                        f"Mismatched values for attribute '{attribute}' in connector "
203                        + f"'{connectors[type][label]}'.\n" +
204                        f"  - Keyword value: '{value}'\n" +
205                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
206                    )
207            if warning_message is not None:
208                warning_message += (
209                    "\nSetting `refresh` to True and recreating connector with type:"
210                    + f" '{type}' and label '{label}'."
211                )
212                refresh = True
213                warn(warning_message)
214        else: ### connector doesn't yet exist
215            refresh = True
216
217    ### only create an object if refresh is True
218    ### (can be manually specified, otherwise determined above)
219    if refresh:
220        with _locks['connectors']:
221            try:
222                ### will raise an error if configuration is incorrect / missing
223                conn = types[type](label=label, **kw)
224                connectors[type][label] = conn
225            except InvalidAttributesError as ie:
226                warn(
227                    f"Incorrect attributes for connector '{type}:{label}'.\n"
228                    + str(ie),
229                    stack = False,
230                )
231                conn = None
232            except Exception as e:
233                from meerschaum.utils.formatting import get_console
234                console = get_console()
235                if console:
236                    console.print_exception()
237                warn(
238                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
239                    stack = False,
240                )
241                conn = None
242        if conn is None:
243            return None
244
245    return connectors[type][label]

Return existing connector or create new connection and store for reuse.

You can create new connectors if enough parameters are provided for the given type and flavor.

Parameters
  • type (Optional[str], default None): Connector type (sql, api, etc.). Defaults to the type of the configured instance_connector.
  • label (Optional[str], default None): Connector label (e.g. main). Defaults to 'main'.
  • refresh (bool, default False): Refresh the Connector instance / construct new object. Defaults to False.
  • kw (Any): Other arguments to pass to the Connector constructor. If the Connector has already been constructed and new arguments are provided, refresh is set to True and the old Connector is replaced.
Returns
  • A new Meerschaum connector (e.g. meerschaum.connectors.api.APIConnector, meerschaum.connectors.sql.SQLConnector).
Examples

The following parameters would create a new meerschaum.connectors.sql.SQLConnector that isn't in the configuration file.

>>> conn = get_connector(
...     type = 'sql',
...     label = 'newlabel',
...     flavor = 'sqlite',
...     database = '/file/path/to/database.db'
... )
>>>
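
The type and label may also be passed together as a single 'type:label' string (split on the first colon, as in the source above):

>>> conn = get_connector(
...     'sql:newlabel',
...     flavor = 'sqlite',
...     database = '/file/path/to/database.db'
... )
>>>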
def get_config( *keys: str, patch: bool = True, substitute: bool = True, sync_files: bool = True, write_missing: bool = True, as_tuple: bool = False, warn: bool = True, debug: bool = False) -> Any:
 82def get_config(
 83    *keys: str,
 84    patch: bool = True,
 85    substitute: bool = True,
 86    sync_files: bool = True,
 87    write_missing: bool = True,
 88    as_tuple: bool = False,
 89    warn: bool = True,
 90    debug: bool = False
 91) -> Any:
 92    """
 93    Return the Meerschaum configuration dictionary.
 94    If positional arguments are provided, index by the keys.
 95    Raises a warning if invalid keys are provided.
 96
 97    Parameters
 98    ----------
 99    keys: str:
100        List of strings to index.
101
102    patch: bool, default True
103        If `True`, patch missing default keys into the config directory.
104        Defaults to `True`.
105
106    sync_files: bool, default True
107        If `True`, sync files if needed.
108        Defaults to `True`.
109
110    write_missing: bool, default True
111        If `True`, write default values when the main config files are missing.
112        Defaults to `True`.
113
114    substitute: bool, default True
115        If `True`, subsitute 'MRSM{}' values.
116        Defaults to `True`.
117
118    as_tuple: bool, default False
119        If `True`, return a tuple of type (success, value).
120        Defaults to `False`.
121        
122    Returns
123    -------
124    The value in the configuration directory, indexed by the provided keys.
125
126    Examples
127    --------
128    >>> get_config('meerschaum', 'instance')
129    'sql:main'
130    >>> get_config('does', 'not', 'exist')
131    UserWarning: Invalid keys in config: ('does', 'not', 'exist')
132    """
133    import json
134
135    symlinks_key = STATIC_CONFIG['config']['symlinks_key']
136    if debug:
137        from meerschaum.utils.debug import dprint
138        dprint(f"Indexing keys: {keys}", color=False)
139
140    if len(keys) == 0:
141        _rc = _config(substitute=substitute, sync_files=sync_files, write_missing=write_missing)
142        if as_tuple:
143            return True, _rc 
144        return _rc
145    
146    ### Weird threading issues, only import if substitute is True.
147    if substitute:
148        from meerschaum.config._read_config import search_and_substitute_config
149    ### Invalidate the cache if it was read before with substitute=False
150    ### but there still exist substitutions.
151    if (
152        config is not None and substitute and keys[0] != symlinks_key
153        and 'MRSM{' in json.dumps(config.get(keys[0]))
154    ):
155        try:
156            _subbed = search_and_substitute_config({keys[0]: config[keys[0]]})
157        except Exception as e:
158            import traceback
159            traceback.print_exc()
160        config[keys[0]] = _subbed[keys[0]]
161        if symlinks_key in _subbed:
162            if symlinks_key not in config:
163                config[symlinks_key] = {}
164            if keys[0] not in config[symlinks_key]:
165                config[symlinks_key][keys[0]] = {}
166            config[symlinks_key][keys[0]] = apply_patch_to_config(
167                _subbed,
168                config[symlinks_key][keys[0]]
169            )
170
171    from meerschaum.config._sync import sync_files as _sync_files
172    if config is None:
173        _config(*keys, sync_files=sync_files)
174
175    invalid_keys = False
176    if keys[0] not in config and keys[0] != symlinks_key:
177        single_key_config = read_config(
178            keys=[keys[0]], substitute=substitute, write_missing=write_missing
179        )
180        if keys[0] not in single_key_config:
181            invalid_keys = True
182        else:
183            config[keys[0]] = single_key_config.get(keys[0], None)
184            if symlinks_key in single_key_config and keys[0] in single_key_config[symlinks_key]:
185                if symlinks_key not in config:
186                    config[symlinks_key] = {}
187                config[symlinks_key][keys[0]] = single_key_config[symlinks_key][keys[0]]
188
189            if sync_files:
190                _sync_files(keys=[keys[0]])
191
192    c = config
193    if len(keys) > 0:
194        for k in keys:
195            try:
196                c = c[k]
197            except Exception as e:
198                invalid_keys = True
199                break
200        if invalid_keys:
201            ### Check if the keys are in the default configuration.
202            from meerschaum.config._default import default_config
203            in_default = True
204            patched_default_config = (
205                search_and_substitute_config(default_config)
206                if substitute else copy.deepcopy(default_config)
207            )
208            _c = patched_default_config
209            for k in keys:
210                try:
211                    _c = _c[k]
212                except Exception as e:
213                    in_default = False
214            if in_default:
215                c = _c
216                invalid_keys = False
217            warning_msg = f"Invalid keys in config: {keys}"
218            if not in_default:
219                try:
220                    if warn:
221                        from meerschaum.utils.warnings import warn as _warn
222                        _warn(warning_msg, stacklevel=3, color=False)
223                except Exception as e:
224                    if warn:
225                        print(warning_msg)
226                if as_tuple:
227                    return False, None
228                return None
229
230            ### Don't write keys that we haven't yet loaded into memory.
231            not_loaded_keys = [k for k in patched_default_config if k not in config]
232            for k in not_loaded_keys:
233                patched_default_config.pop(k, None)
234
235            set_config(
236                apply_patch_to_config(
237                    patched_default_config,
238                    config,
239                )
240            )
241            if patch and keys[0] != symlinks_key:
242                if write_missing:
243                    write_config(config, debug=debug)
244
245    if as_tuple:
246        return (not invalid_keys), c
247    return c

Return the Meerschaum configuration dictionary. If positional arguments are provided, index by the keys. Raises a warning if invalid keys are provided.

Parameters
  • keys (str): List of strings to index.
  • patch (bool, default True): If True, patch missing default keys into the config directory. Defaults to True.
  • sync_files (bool, default True): If True, sync files if needed. Defaults to True.
  • write_missing (bool, default True): If True, write default values when the main config files are missing. Defaults to True.
  • substitute (bool, default True): If True, substitute 'MRSM{}' values. Defaults to True.
  • as_tuple (bool, default False): If True, return a tuple of type (success, value). Defaults to False.
Returns
  • The value in the configuration directory, indexed by the provided keys.
Examples
>>> get_config('meerschaum', 'instance')
'sql:main'
>>> get_config('does', 'not', 'exist')
UserWarning: Invalid keys in config: ('does', 'not', 'exist')
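
A sketch of the as_tuple behavior described above, returning a (success, value) tuple:

>>> get_config('meerschaum', 'instance', as_tuple=True)
(True, 'sql:main')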
class Pipe:
 60class Pipe:
 61    """
 62    Access Meerschaum pipes via Pipe objects.
 63    
 64    Pipes are identified by the following:
 65
 66    1. Connector keys (e.g. `'sql:main'`)
 67    2. Metric key (e.g. `'weather'`)
 68    3. Location (optional; e.g. `None`)
 69    
 70    A pipe's connector keys correspond to a data source, and when the pipe is synced,
 71    its `fetch` definition is evaluated and executed to produce new data.
 72    
 73    Alternatively, new data may be directly synced via `pipe.sync()`:
 74    
 75    ```
 76    >>> from meerschaum import Pipe
 77    >>> pipe = Pipe('csv', 'weather')
 78    >>>
 79    >>> import pandas as pd
 80    >>> df = pd.read_csv('weather.csv')
 81    >>> pipe.sync(df)
 82    ```
 83    """
 84
 85    from ._fetch import (
 86        fetch,
 87        get_backtrack_interval,
 88    )
 89    from ._data import (
 90        get_data,
 91        get_backtrack_data,
 92        get_rowcount,
 93        _get_data_as_iterator,
 94        get_chunk_interval,
 95        get_chunk_bounds,
 96        parse_date_bounds,
 97    )
 98    from ._register import register
 99    from ._attributes import (
100        attributes,
101        parameters,
102        columns,
103        indices,
104        indexes,
105        dtypes,
106        autoincrement,
107        upsert,
108        static,
109        tzinfo,
110        get_columns,
111        get_columns_types,
112        get_columns_indices,
113        get_indices,
114        tags,
115        get_id,
116        id,
117        get_val_column,
118        parents,
119        children,
120        target,
121        _target_legacy,
122        guess_datetime,
123    )
124    from ._show import show
125    from ._edit import edit, edit_definition, update
126    from ._sync import (
127        sync,
128        get_sync_time,
129        exists,
130        filter_existing,
131        _get_chunk_label,
132        get_num_workers,
133        _persist_new_json_columns,
134        _persist_new_numeric_columns,
135        _persist_new_uuid_columns,
136    )
137    from ._verify import (
138        verify,
139        get_bound_interval,
140        get_bound_time,
141    )
142    from ._delete import delete
143    from ._drop import drop
144    from ._clear import clear
145    from ._deduplicate import deduplicate
146    from ._bootstrap import bootstrap
147    from ._dtypes import enforce_dtypes, infer_dtypes
148    from ._copy import copy_to
149
150    def __init__(
151        self,
152        connector: str = '',
153        metric: str = '',
154        location: Optional[str] = None,
155        parameters: Optional[Dict[str, Any]] = None,
156        columns: Union[Dict[str, str], List[str], None] = None,
157        indices: Optional[Dict[str, Union[str, List[str]]]] = None,
158        tags: Optional[List[str]] = None,
159        target: Optional[str] = None,
160        dtypes: Optional[Dict[str, str]] = None,
161        instance: Optional[Union[str, InstanceConnector]] = None,
162        temporary: bool = False,
163        upsert: Optional[bool] = None,
164        autoincrement: Optional[bool] = None,
165        static: Optional[bool] = None,
166        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
167        cache: bool = False,
168        debug: bool = False,
169        connector_keys: Optional[str] = None,
170        metric_key: Optional[str] = None,
171        location_key: Optional[str] = None,
172        indexes: Union[Dict[str, str], List[str], None] = None,
173    ):
174        """
175        Parameters
176        ----------
177        connector: str
178            Keys for the pipe's source connector, e.g. `'sql:main'`.
179
180        metric: str
181            Label for the pipe's contents, e.g. `'weather'`.
182
183        location: str, default None
184            Label for the pipe's location. Defaults to `None`.
185
186        parameters: Optional[Dict[str, Any]], default None
187            Optionally set a pipe's parameters from the constructor,
188            e.g. columns and other attributes.
189            You can edit these parameters with `edit pipes`.
190
191        columns: Union[Dict[str, str], List[str], None], default None
192            Set the `columns` dictionary of `parameters`.
193            If `parameters` is also provided, this dictionary is added under the `'columns'` key.
194
195        indices: Optional[Dict[str, Union[str, List[str]]]], default None
196            Set the `indices` dictionary of `parameters`.
197            If `parameters` is also provided, this dictionary is added under the `'indices'` key.
198
199        tags: Optional[List[str]], default None
200            A list of strings to be added under the `'tags'` key of `parameters`.
201            You can select pipes with certain tags using `--tags`.
202
203        dtypes: Optional[Dict[str, str]], default None
204            Set the `dtypes` dictionary of `parameters`.
205            If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.
206
207        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
208            Connector for the Meerschaum instance where the pipe resides.
209            Defaults to the preconfigured default instance (`'sql:main'`).
210
211        instance: Optional[Union[str, InstanceConnector]], default None
212            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.
213
214        upsert: Optional[bool], default None
215            If `True`, set `upsert` to `True` in the parameters.
216
217        autoincrement: Optional[bool], default None
218            If `True`, set `autoincrement` in the parameters.
219
220        static: Optional[bool], default None
221            If `True`, set `static` in the parameters.
222
223        temporary: bool, default False
224            If `True`, prevent instance tables (pipes, users, plugins) from being created.
225
226        cache: bool, default False
227            If `True`, cache fetched data into a local database file.
228            Defaults to `False`.
229        """
230        from meerschaum.utils.warnings import error, warn
231        if (not connector and not connector_keys) or (not metric and not metric_key):
232            error(
233                "Please provide strings for the connector and metric\n    "
234                + "(first two positional arguments)."
235            )
236
237        ### Fall back to legacy `location_key` just in case.
238        if not location:
239            location = location_key
240
241        if not connector:
242            connector = connector_keys
243
244        if not metric:
245            metric = metric_key
246
247        if location in ('[None]', 'None'):
248            location = None
249
250        from meerschaum.config.static import STATIC_CONFIG
251        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
252        for k in (connector, metric, location, *(tags or [])):
253            if str(k).startswith(negation_prefix):
254                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")
255
256        self.connector_keys = str(connector)
257        self.connector_key = self.connector_keys ### Alias
258        self.metric_key = metric
259        self.location_key = location
260        self.temporary = temporary
261
262        self._attributes = {
263            'connector_keys': self.connector_keys,
264            'metric_key': self.metric_key,
265            'location_key': self.location_key,
266            'parameters': {},
267        }
268
269        ### only set parameters if values are provided
270        if isinstance(parameters, dict):
271            self._attributes['parameters'] = parameters
272        else:
273            if parameters is not None:
274                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
275            self._attributes['parameters'] = {}
276
277        columns = columns or self._attributes.get('parameters', {}).get('columns', {})
278        if isinstance(columns, list):
279            columns = {str(col): str(col) for col in columns}
280        if isinstance(columns, dict):
281            self._attributes['parameters']['columns'] = columns
282        elif columns is not None:
283            warn(f"The provided columns are of invalid type '{type(columns)}'.")
284
285        indices = (
286            indices
287            or indexes
288            or self._attributes.get('parameters', {}).get('indices', None)
289            or self._attributes.get('parameters', {}).get('indexes', None)
290        )
291        if isinstance(indices, dict):
292            indices_key = (
293                'indexes'
294                if 'indexes' in self._attributes['parameters']
295                else 'indices'
296            )
297            self._attributes['parameters'][indices_key] = indices
298
299        if isinstance(tags, (list, tuple)):
300            self._attributes['parameters']['tags'] = tags
301        elif tags is not None:
302            warn(f"The provided tags are of invalid type '{type(tags)}'.")
303
304        if isinstance(target, str):
305            self._attributes['parameters']['target'] = target
306        elif target is not None:
307            warn(f"The provided target is of invalid type '{type(target)}'.")
308
309        if isinstance(dtypes, dict):
310            self._attributes['parameters']['dtypes'] = dtypes
311        elif dtypes is not None:
312            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")
313
314        if isinstance(upsert, bool):
315            self._attributes['parameters']['upsert'] = upsert
316
317        if isinstance(autoincrement, bool):
318            self._attributes['parameters']['autoincrement'] = autoincrement
319
320        if isinstance(static, bool):
321            self._attributes['parameters']['static'] = static
322
323        ### NOTE: The parameters dictionary is {} by default.
324        ###       A Pipe may be registered without parameters, then edited,
325        ###       or a Pipe may be registered with parameters set in-memory first.
326        #  from meerschaum.config import get_config
327        _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
328        if _mrsm_instance is None:
329            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)
330
331        if not isinstance(_mrsm_instance, str):
332            self._instance_connector = _mrsm_instance
333            self.instance_keys = str(_mrsm_instance)
334        else: ### NOTE: must be SQL or API Connector for this work
335            self.instance_keys = _mrsm_instance
336
337        self._cache = cache and get_config('system', 'experimental', 'cache')
338
339    @property
340    def meta(self):
341        """
342        Return the four keys needed to reconstruct this pipe.
343        """
344        return {
345            'connector': self.connector_keys,
346            'metric': self.metric_key,
347            'location': self.location_key,
348            'instance': self.instance_keys,
349        }
350
351    def keys(self) -> List[str]:
352        """
353        Return the ordered keys for this pipe.
354        """
355        return {
356            key: val
357            for key, val in self.meta.items()
358            if key != 'instance'
359        }
360
361    @property
362    def instance_connector(self) -> Union[InstanceConnector, None]:
363        """
364        The connector to where this pipe resides.
365        May either be of type `meerschaum.connectors.sql.SQLConnector` or
366        `meerschaum.connectors.api.APIConnector`.
367        """
368        if '_instance_connector' not in self.__dict__:
369            from meerschaum.connectors.parse import parse_instance_keys
370            conn = parse_instance_keys(self.instance_keys)
371            if conn:
372                self._instance_connector = conn
373            else:
374                return None
375        return self._instance_connector
376
377    @property
378    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
379        """
380        The connector to the data source.
381        """
382        if '_connector' not in self.__dict__:
383            from meerschaum.connectors.parse import parse_instance_keys
384            import warnings
385            with warnings.catch_warnings():
386                warnings.simplefilter('ignore')
387                try:
388                    conn = parse_instance_keys(self.connector_keys)
389                except Exception as e:
390                    conn = None
391            if conn:
392                self._connector = conn
393            else:
394                return None
395        return self._connector
396
397    @property
398    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
399        """
400        If the pipe was created with `cache=True`, return the connector to the pipe's
401        SQLite database for caching.
402        """
403        if not self._cache:
404            return None
405
406        if '_cache_connector' not in self.__dict__:
407            from meerschaum.connectors import get_connector
408            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
409            _resources_path = SQLITE_RESOURCES_PATH
410            self._cache_connector = get_connector(
411                'sql', '_cache_' + str(self),
412                flavor='sqlite',
413                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
414            )
415
416        return self._cache_connector
417
418    @property
419    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
420        """
421        If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
422        manage the local data.
423        """
424        if self.cache_connector is None:
425            return None
426        if '_cache_pipe' not in self.__dict__:
427            from meerschaum.config._patch import apply_patch_to_config
428            from meerschaum.utils.sql import sql_item_name
429            _parameters = copy.deepcopy(self.parameters)
430            _fetch_patch = {
431                'fetch': ({
432                    'definition': (
433                        f"SELECT * FROM "
434                        + sql_item_name(
435                            str(self.target),
436                            self.instance_connector.flavor,
437                            self.instance_connector.get_pipe_schema(self),
438                        )
439                    ),
440                }) if self.instance_connector.type == 'sql' else ({
441                    'connector_keys': self.connector_keys,
442                    'metric_key': self.metric_key,
443                    'location_key': self.location_key,
444                })
445            }
446            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
447            self._cache_pipe = Pipe(
448                self.instance_keys,
449                (self.connector_keys + '_' + self.metric_key + '_cache'),
450                self.location_key,
451                mrsm_instance = self.cache_connector,
452                parameters = _parameters,
453                cache = False,
454                temporary = True,
455            )
456
457        return self._cache_pipe
458
459    def __str__(self, ansi: bool=False):
460        return pipe_repr(self, ansi=ansi)
461
462    def __eq__(self, other):
463        try:
464            return (
465                isinstance(self, type(other))
466                and self.connector_keys == other.connector_keys
467                and self.metric_key == other.metric_key
468                and self.location_key == other.location_key
469                and self.instance_keys == other.instance_keys
470            )
471        except Exception as e:
472            return False
473
474    def __hash__(self):
475        ### Using an esoteric separator to avoid collisions.
476        sep = "[\"']"
477        return hash(
478            str(self.connector_keys) + sep
479            + str(self.metric_key) + sep
480            + str(self.location_key) + sep
481            + str(self.instance_keys) + sep
482        )
483
484    def __repr__(self, ansi: bool=True, **kw) -> str:
485        if not hasattr(sys, 'ps1'):
486            ansi = False
487
488        return pipe_repr(self, ansi=ansi, **kw)
489
490    def __pt_repr__(self):
491        from meerschaum.utils.packages import attempt_import
492        prompt_toolkit_formatted_text = attempt_import('prompt_toolkit.formatted_text', lazy=False)
493        return prompt_toolkit_formatted_text.ANSI(pipe_repr(self, ansi=True))
494
495    def __getstate__(self) -> Dict[str, Any]:
496        """
497        Define the state dictionary (pickling).
498        """
499        return {
500            'connector': self.connector_keys,
501            'metric': self.metric_key,
502            'location': self.location_key,
503            'parameters': self.parameters,
504            'instance': self.instance_keys,
505        }
506
507    def __setstate__(self, _state: Dict[str, Any]):
508        """
509        Read the state (unpickling).
510        """
511        self.__init__(**_state)
512
513    def __getitem__(self, key: str) -> Any:
514        """
515        Index the pipe's attributes.
516        If the `key` cannot be found`, return `None`.
517        """
518        if key in self.attributes:
519            return self.attributes.get(key, None)
520
521        aliases = {
522            'connector': 'connector_keys',
523            'connector_key': 'connector_keys',
524            'metric': 'metric_key',
525            'location': 'location_key',
526        }
527        aliased_key = aliases.get(key, None)
528        if aliased_key is not None:
529            return self.attributes.get(aliased_key, None)
530
531        property_aliases = {
532            'instance': 'instance_keys',
533            'instance_key': 'instance_keys',
534        }
535        aliased_key = property_aliases.get(key, None)
536        if aliased_key is not None:
537            key = aliased_key
538        return getattr(self, key, None)

Access Meerschaum pipes via Pipe objects.

Pipes are identified by the following:

  1. Connector keys (e.g. 'sql:main')
  2. Metric key (e.g. 'weather')
  3. Location (optional; e.g. None)

A pipe's connector keys correspond to a data source, and when the pipe is synced, its fetch definition is evaluated and executed to produce new data.

Alternatively, new data may be directly synced via pipe.sync():

>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)
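
Once synced, the data may be read back with pipe.get_data(), as in the Build a Pipe example above:

>>> df = pipe.get_data()
>>> df.head()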
Pipe( connector: str = '', metric: str = '', location: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, columns: Union[Dict[str, str], List[str], NoneType] = None, indices: Optional[Dict[str, Union[str, List[str]]]] = None, tags: Optional[List[str]] = None, target: Optional[str] = None, dtypes: Optional[Dict[str, str]] = None, instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, temporary: bool = False, upsert: Optional[bool] = None, autoincrement: Optional[bool] = None, static: Optional[bool] = None, mrsm_instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, cache: bool = False, debug: bool = False, connector_keys: Optional[str] = None, metric_key: Optional[str] = None, location_key: Optional[str] = None, indexes: Union[Dict[str, str], List[str], NoneType] = None)
150    def __init__(
151        self,
152        connector: str = '',
153        metric: str = '',
154        location: Optional[str] = None,
155        parameters: Optional[Dict[str, Any]] = None,
156        columns: Union[Dict[str, str], List[str], None] = None,
157        indices: Optional[Dict[str, Union[str, List[str]]]] = None,
158        tags: Optional[List[str]] = None,
159        target: Optional[str] = None,
160        dtypes: Optional[Dict[str, str]] = None,
161        instance: Optional[Union[str, InstanceConnector]] = None,
162        temporary: bool = False,
163        upsert: Optional[bool] = None,
164        autoincrement: Optional[bool] = None,
165        static: Optional[bool] = None,
166        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
167        cache: bool = False,
168        debug: bool = False,
169        connector_keys: Optional[str] = None,
170        metric_key: Optional[str] = None,
171        location_key: Optional[str] = None,
172        indexes: Union[Dict[str, str], List[str], None] = None,
173    ):
174        """
175        Parameters
176        ----------
177        connector: str
178            Keys for the pipe's source connector, e.g. `'sql:main'`.
179
180        metric: str
181            Label for the pipe's contents, e.g. `'weather'`.
182
183        location: str, default None
184            Label for the pipe's location. Defaults to `None`.
185
186        parameters: Optional[Dict[str, Any]], default None
187            Optionally set a pipe's parameters from the constructor,
188            e.g. columns and other attributes.
189            You can edit these parameters with `edit pipes`.
190
191        columns: Union[Dict[str, str], List[str], None], default None
192            Set the `columns` dictionary of `parameters`.
193            If `parameters` is also provided, this dictionary is added under the `'columns'` key.
194
195        indices: Optional[Dict[str, Union[str, List[str]]]], default None
196            Set the `indices` dictionary of `parameters`.
197            If `parameters` is also provided, this dictionary is added under the `'indices'` key.
198
199        tags: Optional[List[str]], default None
200            A list of strings to be added under the `'tags'` key of `parameters`.
201            You can select pipes with certain tags using `--tags`.
202
203        dtypes: Optional[Dict[str, str]], default None
204            Set the `dtypes` dictionary of `parameters`.
205            If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.
206
207        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
208            Connector for the Meerschaum instance where the pipe resides.
209            Defaults to the preconfigured default instance (`'sql:main'`).
210
211        instance: Optional[Union[str, InstanceConnector]], default None
212            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.
213
214        upsert: Optional[bool], default None
215            If `True`, set `upsert` to `True` in the parameters.
216
217        autoincrement: Optional[bool], default None
218            If `True`, set `autoincrement` in the parameters.
219
220        static: Optional[bool], default None
221            If `True`, set `static` in the parameters.
222
223        temporary: bool, default False
224            If `True`, prevent instance tables (pipes, users, plugins) from being created.
225
226        cache: bool, default False
227            If `True`, cache fetched data into a local database file.
228            Defaults to `False`.
229        """
230        from meerschaum.utils.warnings import error, warn
231        if (not connector and not connector_keys) or (not metric and not metric_key):
232            error(
233                "Please provide strings for the connector and metric\n    "
234                + "(first two positional arguments)."
235            )
236
237        ### Fall back to legacy `location_key` just in case.
238        if not location:
239            location = location_key
240
241        if not connector:
242            connector = connector_keys
243
244        if not metric:
245            metric = metric_key
246
247        if location in ('[None]', 'None'):
248            location = None
249
250        from meerschaum.config.static import STATIC_CONFIG
251        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
252        for k in (connector, metric, location, *(tags or [])):
253            if str(k).startswith(negation_prefix):
254                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")
255
256        self.connector_keys = str(connector)
257        self.connector_key = self.connector_keys ### Alias
258        self.metric_key = metric
259        self.location_key = location
260        self.temporary = temporary
261
262        self._attributes = {
263            'connector_keys': self.connector_keys,
264            'metric_key': self.metric_key,
265            'location_key': self.location_key,
266            'parameters': {},
267        }
268
269        ### only set parameters if values are provided
270        if isinstance(parameters, dict):
271            self._attributes['parameters'] = parameters
272        else:
273            if parameters is not None:
274                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
275            self._attributes['parameters'] = {}
276
277        columns = columns or self._attributes.get('parameters', {}).get('columns', {})
278        if isinstance(columns, list):
279            columns = {str(col): str(col) for col in columns}
280        if isinstance(columns, dict):
281            self._attributes['parameters']['columns'] = columns
282        elif columns is not None:
283            warn(f"The provided columns are of invalid type '{type(columns)}'.")
284
285        indices = (
286            indices
287            or indexes
288            or self._attributes.get('parameters', {}).get('indices', None)
289            or self._attributes.get('parameters', {}).get('indexes', None)
290        )
291        if isinstance(indices, dict):
292            indices_key = (
293                'indexes'
294                if 'indexes' in self._attributes['parameters']
295                else 'indices'
296            )
297            self._attributes['parameters'][indices_key] = indices
298
299        if isinstance(tags, (list, tuple)):
300            self._attributes['parameters']['tags'] = tags
301        elif tags is not None:
302            warn(f"The provided tags are of invalid type '{type(tags)}'.")
303
304        if isinstance(target, str):
305            self._attributes['parameters']['target'] = target
306        elif target is not None:
307            warn(f"The provided target is of invalid type '{type(target)}'.")
308
309        if isinstance(dtypes, dict):
310            self._attributes['parameters']['dtypes'] = dtypes
311        elif dtypes is not None:
312            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")
313
314        if isinstance(upsert, bool):
315            self._attributes['parameters']['upsert'] = upsert
316
317        if isinstance(autoincrement, bool):
318            self._attributes['parameters']['autoincrement'] = autoincrement
319
320        if isinstance(static, bool):
321            self._attributes['parameters']['static'] = static
322
323        ### NOTE: The parameters dictionary is {} by default.
324        ###       A Pipe may be registered without parameters, then edited,
325        ###       or a Pipe may be registered with parameters set in-memory first.
326        #  from meerschaum.config import get_config
327        _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
328        if _mrsm_instance is None:
329            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)
330
331        if not isinstance(_mrsm_instance, str):
332            self._instance_connector = _mrsm_instance
333            self.instance_keys = str(_mrsm_instance)
334        else: ### NOTE: must be SQL or API Connector for this to work
335            self.instance_keys = _mrsm_instance
336
337        self._cache = cache and get_config('system', 'experimental', 'cache')
Parameters
  • connector (str): Keys for the pipe's source connector, e.g. 'sql:main'.
  • metric (str): Label for the pipe's contents, e.g. 'weather'.
  • location (str, default None): Label for the pipe's location. Defaults to None.
  • parameters (Optional[Dict[str, Any]], default None): Optionally set a pipe's parameters from the constructor, e.g. columns and other attributes. You can edit these parameters with edit pipes.
  • columns (Union[Dict[str, str], List[str], None], default None): Set the columns dictionary of parameters. If parameters is also provided, this dictionary is added under the 'columns' key.
  • indices (Optional[Dict[str, Union[str, List[str]]]], default None): Set the indices dictionary of parameters. If parameters is also provided, this dictionary is added under the 'indices' key.
  • tags (Optional[List[str]], default None): A list of strings to be added under the 'tags' key of parameters. You can select pipes with certain tags using --tags.
  • dtypes (Optional[Dict[str, str]], default None): Set the dtypes dictionary of parameters. If parameters is also provided, this dictionary is added under the 'dtypes' key.
  • mrsm_instance (Optional[Union[str, InstanceConnector]], default None): Connector for the Meerschaum instance where the pipe resides. Defaults to the preconfigured default instance ('sql:main').
  • instance (Optional[Union[str, InstanceConnector]], default None): Alias for mrsm_instance. If mrsm_instance is supplied, this value is ignored.
  • upsert (Optional[bool], default None): If True, set upsert to True in the parameters.
  • autoincrement (Optional[bool], default None): If True, set autoincrement in the parameters.
  • static (Optional[bool], default None): If True, set static in the parameters.
  • temporary (bool, default False): If True, prevent instance tables (pipes, users, plugins) from being created.
  • cache (bool, default False): If True, cache fetched data into a local database file. Defaults to False.
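
The constructor parameters above may be combined to declare a pipe in-memory. The following is a minimal sketch; the connector keys ('plugin:noaa'), instance keys, and column names are placeholders for illustration:

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'plugin:noaa', 'temperature', 'atlanta',
    instance='sql:main',
    columns={'datetime': 'timestamp', 'id': 'station'},
    dtypes={'timestamp': 'datetime64[ns, UTC]'},
    tags=['weather'],
    target='temperature_atlanta',
    upsert=True,
    temporary=True,
)
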
connector_keys
connector_key
metric_key
location_key
temporary
meta
339    @property
340    def meta(self):
341        """
342        Return the four keys needed to reconstruct this pipe.
343        """
344        return {
345            'connector': self.connector_keys,
346            'metric': self.metric_key,
347            'location': self.location_key,
348            'instance': self.instance_keys,
349        }

Return the four keys needed to reconstruct this pipe.
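
Because the keys in meta mirror the constructor's keyword arguments, they may be unpacked to rebuild an equivalent pipe (keys below are illustrative):

import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:noaa', 'weather', 'atlanta', instance='sql:main')
print(pipe.meta)
# {'connector': 'plugin:noaa', 'metric': 'weather', 'location': 'atlanta', 'instance': 'sql:main'}

same_pipe = mrsm.Pipe(**pipe.meta)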

def keys(self) -> List[str]:
351    def keys(self) -> List[str]:
352        """
353        Return the ordered keys for this pipe.
354        """
355        return {
356            key: val
357            for key, val in self.meta.items()
358            if key != 'instance'
359        }

Return the ordered keys for this pipe.

instance_connector: Union[meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType]
361    @property
362    def instance_connector(self) -> Union[InstanceConnector, None]:
363        """
364        The connector to where this pipe resides.
365        May either be of type `meerschaum.connectors.sql.SQLConnector` or
366        `meerschaum.connectors.api.APIConnector`.
367        """
368        if '_instance_connector' not in self.__dict__:
369            from meerschaum.connectors.parse import parse_instance_keys
370            conn = parse_instance_keys(self.instance_keys)
371            if conn:
372                self._instance_connector = conn
373            else:
374                return None
375        return self._instance_connector

The connector to where this pipe resides. May either be of type meerschaum.connectors.sql.SQLConnector or meerschaum.connectors.api.APIConnector.

connector: Optional[Connector]
377    @property
378    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
379        """
380        The connector to the data source.
381        """
382        if '_connector' not in self.__dict__:
383            from meerschaum.connectors.parse import parse_instance_keys
384            import warnings
385            with warnings.catch_warnings():
386                warnings.simplefilter('ignore')
387                try:
388                    conn = parse_instance_keys(self.connector_keys)
389                except Exception as e:
390                    conn = None
391            if conn:
392                self._connector = conn
393            else:
394                return None
395        return self._connector

The connector to the data source.

cache_connector: Optional[meerschaum.connectors.SQLConnector]
397    @property
398    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
399        """
400        If the pipe was created with `cache=True`, return the connector to the pipe's
401        SQLite database for caching.
402        """
403        if not self._cache:
404            return None
405
406        if '_cache_connector' not in self.__dict__:
407            from meerschaum.connectors import get_connector
408            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
409            _resources_path = SQLITE_RESOURCES_PATH
410            self._cache_connector = get_connector(
411                'sql', '_cache_' + str(self),
412                flavor='sqlite',
413                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
414            )
415
416        return self._cache_connector

If the pipe was created with cache=True, return the connector to the pipe's SQLite database for caching.

cache_pipe: Optional[Pipe]
418    @property
419    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
420        """
421        If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
422        manage the local data.
423        """
424        if self.cache_connector is None:
425            return None
426        if '_cache_pipe' not in self.__dict__:
427            from meerschaum.config._patch import apply_patch_to_config
428            from meerschaum.utils.sql import sql_item_name
429            _parameters = copy.deepcopy(self.parameters)
430            _fetch_patch = {
431                'fetch': ({
432                    'definition': (
433                        f"SELECT * FROM "
434                        + sql_item_name(
435                            str(self.target),
436                            self.instance_connector.flavor,
437                            self.instance_connector.get_pipe_schema(self),
438                        )
439                    ),
440                }) if self.instance_connector.type == 'sql' else ({
441                    'connector_keys': self.connector_keys,
442                    'metric_key': self.metric_key,
443                    'location_key': self.location_key,
444                })
445            }
446            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
447            self._cache_pipe = Pipe(
448                self.instance_keys,
449                (self.connector_keys + '_' + self.metric_key + '_cache'),
450                self.location_key,
451                mrsm_instance = self.cache_connector,
452                parameters = _parameters,
453                cache = False,
454                temporary = True,
455            )
456
457        return self._cache_pipe

If the pipe was created with cache=True, return another meerschaum.Pipe used to manage the local data.

def fetch( self, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, NoneType] = None, check_existing: bool = True, sync_chunks: bool = False, debug: bool = False, **kw: Any) -> Union[pandas.core.frame.DataFrame, Iterator[pandas.core.frame.DataFrame], NoneType]:
21def fetch(
22    self,
23    begin: Union[datetime, int, str, None] = '',
24    end: Union[datetime, int, None] = None,
25    check_existing: bool = True,
26    sync_chunks: bool = False,
27    debug: bool = False,
28    **kw: Any
29) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]:
30    """
31    Fetch a Pipe's latest data from its connector.
32
33    Parameters
34    ----------
35    begin: Union[datetime, str, None], default ''
36        If provided, only fetch data newer than or equal to `begin`.
37
38    end: Optional[datetime], default None
39        If provided, only fetch data older than or equal to `end`.
40
41    check_existing: bool, default True
42        If `False`, do not apply the backtrack interval.
43
44    sync_chunks: bool, default False
45        If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks
46        as the fetch loads them into memory.
47
48    debug: bool, default False
49        Verbosity toggle.
50
51    Returns
52    -------
53    A `pd.DataFrame` of the newest unseen data.
54
55    """
56    if 'fetch' not in dir(self.connector):
57        warn(f"No `fetch()` function defined for connector '{self.connector}'")
58        return None
59
60    from meerschaum.connectors import custom_types, get_connector_plugin
61    from meerschaum.utils.debug import dprint, _checkpoint
62    from meerschaum.utils.misc import filter_arguments
63
64    _chunk_hook = kw.pop('chunk_hook', None)
65    kw['workers'] = self.get_num_workers(kw.get('workers', None))
66    if sync_chunks and _chunk_hook is None:
67
68        def _chunk_hook(chunk, **_kw) -> SuccessTuple:
69            """
70            Wrap `Pipe.sync()` with a custom chunk label prepended to the message.
71            """
72            from meerschaum.config._patch import apply_patch_to_config
73            kwargs = apply_patch_to_config(kw, _kw)
74            chunk_success, chunk_message = self.sync(chunk, **kwargs)
75            chunk_label = self._get_chunk_label(chunk, self.columns.get('datetime', None))
76            if chunk_label:
77                chunk_message = '\n' + chunk_label + '\n' + chunk_message
78            return chunk_success, chunk_message
79
80    begin, end = self.parse_date_bounds(begin, end)
81
82    with mrsm.Venv(get_connector_plugin(self.connector)):
83        _args, _kwargs = filter_arguments(
84            self.connector.fetch,
85            self,
86            begin=_determine_begin(
87                self,
88                begin,
89                check_existing=check_existing,
90                debug=debug,
91            ),
92            end=end,
93            chunk_hook=_chunk_hook,
94            debug=debug,
95            **kw
96        )
97        df = self.connector.fetch(*_args, **_kwargs)
98    return df

Fetch a Pipe's latest data from its connector.

Parameters
  • begin (Union[datetime, str, None], default ''): If provided, only fetch data newer than or equal to begin.
  • end (Optional[datetime], default None): If provided, only fetch data older than or equal to end.
  • check_existing (bool, default True): If False, do not apply the backtrack interval.
  • sync_chunks (bool, default False): If True and the pipe's connector is of type 'sql', begin syncing chunks as the fetch loads them into memory.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame of the newest unseen data.
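
As a sketch, fetch() may be called directly when the pipe's source connector defines its own fetch() method; the keys below ('plugin:noaa', 'sql:main') are placeholders. The returned DataFrame may then be passed to pipe.sync():

from datetime import datetime, timezone
import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')

# Only fetch the window between `begin` and `end` from the source connector.
df = pipe.fetch(
    begin=datetime(2024, 1, 1, tzinfo=timezone.utc),
    end=datetime(2024, 2, 1, tzinfo=timezone.utc),
)
if df is not None:
    pipe.sync(df)
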
def get_backtrack_interval( self, check_existing: bool = True, debug: bool = False) -> Union[datetime.timedelta, int]:
101def get_backtrack_interval(
102    self,
103    check_existing: bool = True,
104    debug: bool = False,
105) -> Union[timedelta, int]:
106    """
107    Get the backtrack interval to use for this pipe.
108
109    Parameters
110    ----------
111    check_existing: bool, default True
112        If `False`, return a backtrack_interval of 0 minutes.
113
114    Returns
115    -------
116    The backtrack interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
117    """
118    default_backtrack_minutes = get_config('pipes', 'parameters', 'fetch', 'backtrack_minutes')
119    configured_backtrack_minutes = self.parameters.get('fetch', {}).get('backtrack_minutes', None)
120    backtrack_minutes = (
121        configured_backtrack_minutes
122        if configured_backtrack_minutes is not None
123        else default_backtrack_minutes
124    ) if check_existing else 0
125
126    backtrack_interval = timedelta(minutes=backtrack_minutes)
127    dt_col = self.columns.get('datetime', None)
128    if dt_col is None:
129        return backtrack_interval
130
131    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns, UTC]')
132    if 'int' in dt_dtype.lower():
133        return backtrack_minutes
134
135    return backtrack_interval

Get the backtrack interval to use for this pipe.

Parameters
  • check_existing (bool, default True): If False, return a backtrack_interval of 0 minutes.
Returns
  • The backtrack interval (timedelta or int) to use with this pipe's datetime axis.
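
The backtrack interval is read from pipe.parameters['fetch']['backtrack_minutes'] (falling back to the configured default), so it may be set per pipe. A minimal sketch, assuming placeholder keys and a timestamp datetime axis:

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'plugin:noaa', 'weather',
    instance='sql:main',
    columns={'datetime': 'timestamp'},
    parameters={'fetch': {'backtrack_minutes': 1440}},
    temporary=True,
)
print(pipe.get_backtrack_interval())
# 1 day, 0:00:00 (a timedelta for a timestamp axis; an int of minutes for an integer axis)
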
def get_data( self, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, int, str, NoneType] = None, end: Union[datetime.datetime, int, str, NoneType] = None, params: Optional[Dict[str, Any]] = None, as_iterator: bool = False, as_chunks: bool = False, as_dask: bool = False, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, order: Optional[str] = 'asc', limit: Optional[int] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> Union[pandas.core.frame.DataFrame, Iterator[pandas.core.frame.DataFrame], NoneType]:
 23def get_data(
 24    self,
 25    select_columns: Optional[List[str]] = None,
 26    omit_columns: Optional[List[str]] = None,
 27    begin: Union[datetime, int, str, None] = None,
 28    end: Union[datetime, int, str, None] = None,
 29    params: Optional[Dict[str, Any]] = None,
 30    as_iterator: bool = False,
 31    as_chunks: bool = False,
 32    as_dask: bool = False,
 33    chunk_interval: Union[timedelta, int, None] = None,
 34    order: Optional[str] = 'asc',
 35    limit: Optional[int] = None,
 36    fresh: bool = False,
 37    debug: bool = False,
 38    **kw: Any
 39) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]:
 40    """
 41    Get a pipe's data from the instance connector.
 42
 43    Parameters
 44    ----------
 45    select_columns: Optional[List[str]], default None
 46        If provided, only select these given columns.
 47        Otherwise select all available columns (i.e. `SELECT *`).
 48
 49    omit_columns: Optional[List[str]], default None
 50        If provided, remove these columns from the selection.
 51
 52    begin: Union[datetime, int, str, None], default None
 53        Lower bound datetime to begin searching for data (inclusive).
 54        Translates to a `WHERE` clause like `WHERE datetime >= begin`.
 55        Defaults to `None`.
 56
 57    end: Union[datetime, int, str, None], default None
 58        Upper bound datetime to stop searching for data (inclusive).
 59        Translates to a `WHERE` clause like `WHERE datetime < end`.
 60        Defaults to `None`.
 61
 62    params: Optional[Dict[str, Any]], default None
 63        Filter the retrieved data by a dictionary of parameters.
 64        See `meerschaum.utils.sql.build_where` for more details. 
 65
 66    as_iterator: bool, default False
 67        If `True`, return a generator of chunks of pipe data.
 68
 69    as_chunks: bool, default False
 70        Alias for `as_iterator`.
 71
 72    as_dask: bool, default False
 73        If `True`, return a `dask.DataFrame`
 74        (which may be loaded into a Pandas DataFrame with `df.compute()`).
 75
 76    chunk_interval: Union[timedelta, int, None], default None
 77        If `as_iterator`, then return chunks with `begin` and `end` separated by this interval.
 78        This may be set under `pipe.parameters['verify']['chunk_minutes']`.
 79        By default, use a timedelta of 1440 minutes (1 day).
 80        If `chunk_interval` is an integer and the `datetime` axis a timestamp,
 81        then use a timedelta with this many minutes.
 82        If the `datetime` axis is an integer, default to the configured chunksize.
 83        If `chunk_interval` is a `timedelta` and the `datetime` axis an integer,
 84        use the number of minutes in the `timedelta`.
 85
 86    order: Optional[str], default 'asc'
 87        If `order` is not `None`, sort the resulting dataframe by indices.
 88
 89    limit: Optional[int], default None
 90        If provided, cap the dataframe to this many rows.
 91
 92    fresh: bool, default False
 93        If `True`, skip the local cache and directly query the instance connector.
 94        Defaults to `False`.
 95
 96    debug: bool, default False
 97        Verbosity toggle.
 98        Defaults to `False`.
 99
100    Returns
101    -------
102    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.
103
104    """
105    from meerschaum.utils.warnings import warn
106    from meerschaum.utils.venv import Venv
107    from meerschaum.connectors import get_connector_plugin
108    from meerschaum.utils.misc import iterate_chunks, items_str
109    from meerschaum.utils.dtypes import to_pandas_dtype, coerce_timezone
110    from meerschaum.utils.dataframe import add_missing_cols_to_df, df_is_chunk_generator
111    from meerschaum.utils.packages import attempt_import
112    dd = attempt_import('dask.dataframe') if as_dask else None
113    dask = attempt_import('dask') if as_dask else None
114    dateutil_parser = attempt_import('dateutil.parser')
115
116    if select_columns == '*':
117        select_columns = None
118    elif isinstance(select_columns, str):
119        select_columns = [select_columns]
120
121    if isinstance(omit_columns, str):
122        omit_columns = [omit_columns]
123
124    begin, end = self.parse_date_bounds(begin, end)
125    as_iterator = as_iterator or as_chunks
126    dt_col = self.columns.get('datetime', None)
127
128    def _sort_df(_df):
129        if df_is_chunk_generator(_df):
130            return _df
131        indices = [] if dt_col not in _df.columns else [dt_col]
132        non_dt_cols = [
133            col
134            for col_ix, col in self.columns.items()
135            if col_ix != 'datetime' and col in _df.columns
136        ]
137        indices.extend(non_dt_cols)
138        if 'dask' not in _df.__module__:
139            _df.sort_values(
140                by=indices,
141                inplace=True,
142                ascending=(str(order).lower() == 'asc'),
143            )
144            _df.reset_index(drop=True, inplace=True)
145        else:
146            _df = _df.sort_values(
147                by=indices,
148                ascending=(str(order).lower() == 'asc'),
149            )
150            _df = _df.reset_index(drop=True)
151        if limit is not None and len(_df) > limit:
152            return _df.head(limit)
153        return _df
154
155    if as_iterator or as_chunks:
156        df = self._get_data_as_iterator(
157            select_columns=select_columns,
158            omit_columns=omit_columns,
159            begin=begin,
160            end=end,
161            params=params,
162            chunk_interval=chunk_interval,
163            limit=limit,
164            order=order,
165            fresh=fresh,
166            debug=debug,
167        )
168        return _sort_df(df)
169
170    if as_dask:
171        from multiprocessing.pool import ThreadPool
172        dask_pool = ThreadPool(self.get_num_workers())
173        dask.config.set(pool=dask_pool)
174        chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
175        bounds = self.get_chunk_bounds(
176            begin=begin,
177            end=end,
178            bounded=False,
179            chunk_interval=chunk_interval,
180            debug=debug,
181        )
182        dask_chunks = [
183            dask.delayed(self.get_data)(
184                select_columns=select_columns,
185                omit_columns=omit_columns,
186                begin=chunk_begin,
187                end=chunk_end,
188                params=params,
189                chunk_interval=chunk_interval,
190                order=order,
191                limit=limit,
192                fresh=fresh,
193                debug=debug,
194            )
195            for (chunk_begin, chunk_end) in bounds
196        ]
197        dask_meta = {
198            col: to_pandas_dtype(typ)
199            for col, typ in self.dtypes.items()
200        }
201        return _sort_df(dd.from_delayed(dask_chunks, meta=dask_meta))
202
203    if not self.exists(debug=debug):
204        return None
205
206    if self.cache_pipe is not None:
207        if not fresh:
208            _sync_cache_tuple = self.cache_pipe.sync(
209                begin=begin,
210                end=end,
211                params=params,
212                debug=debug,
213                **kw
214            )
215            if not _sync_cache_tuple[0]:
216                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
217                fresh = True
218            else: ### Successfully synced cache.
219                return self.enforce_dtypes(
220                    self.cache_pipe.get_data(
221                        select_columns=select_columns,
222                        omit_columns=omit_columns,
223                        begin=begin,
224                        end=end,
225                        params=params,
226                        order=order,
227                        limit=limit,
228                        debug=debug,
229                        fresh=True,
230                        **kw
231                    ),
232                    debug=debug,
233                )
234
235    with Venv(get_connector_plugin(self.instance_connector)):
236        df = self.instance_connector.get_pipe_data(
237            pipe=self,
238            select_columns=select_columns,
239            omit_columns=omit_columns,
240            begin=begin,
241            end=end,
242            params=params,
243            limit=limit,
244            order=order,
245            debug=debug,
246            **kw
247        )
248        if df is None:
249            return df
250
251        if not select_columns:
252            select_columns = [col for col in df.columns]
253
254        cols_to_omit = [
255            col
256            for col in df.columns
257            if (
258                col in (omit_columns or [])
259                or
260                col not in (select_columns or [])
261            )
262        ]
263        cols_to_add = [
264            col
265            for col in select_columns
266            if col not in df.columns
267        ]
268        if cols_to_omit:
269            warn(
270                (
271                    f"Received {len(cols_to_omit)} omitted column"
272                    + ('s' if len(cols_to_omit) != 1 else '')
273                    + f" for {self}. "
274                    + "Consider adding `select_columns` and `omit_columns` support to "
275                    + f"'{self.instance_connector.type}' connectors to improve performance."
276                ),
277                stack=False,
278            )
279            _cols_to_select = [col for col in df.columns if col not in cols_to_omit]
280            df = df[_cols_to_select]
281
282        if cols_to_add:
283            warn(
284                (
285                    f"Specified columns {items_str(cols_to_add)} were not found on {self}. "
286                    + "Adding these to the DataFrame as null columns."
287                ),
288                stack=False,
289            )
290            df = add_missing_cols_to_df(df, {col: 'string' for col in cols_to_add})
291
292        enforced_df = self.enforce_dtypes(df, debug=debug)
293
294        if order:
295            return _sort_df(enforced_df)
296        return enforced_df

Get a pipe's data from the instance connector.

Parameters
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, int, str, None], default None): Lower bound datetime to begin searching for data (inclusive). Translates to a WHERE clause like WHERE datetime >= begin. Defaults to None.
  • end (Union[datetime, int, str, None], default None): Upper bound datetime to stop searching for data (inclusive). Translates to a WHERE clause like WHERE datetime < end. Defaults to None.
  • params (Optional[Dict[str, Any]], default None): Filter the retrieved data by a dictionary of parameters. See meerschaum.utils.sql.build_where for more details.
  • as_iterator (bool, default False): If True, return a generator of chunks of pipe data.
  • as_chunks (bool, default False): Alias for as_iterator.
  • as_dask (bool, default False): If True, return a dask.DataFrame (which may be loaded into a Pandas DataFrame with df.compute()).
  • chunk_interval (Union[timedelta, int, None], default None): If as_iterator, then return chunks with begin and end separated by this interval. This may be set under pipe.parameters['verify']['chunk_minutes']. By default, use a timedelta of 1440 minutes (1 day). If chunk_interval is an integer and the datetime axis a timestamp, then use a timedelta with this many minutes. If the datetime axis is an integer, default to the configured chunksize. If chunk_interval is a timedelta and the datetime axis an integer, use the number of minutes in the timedelta.
  • order (Optional[str], default 'asc'): If order is not None, sort the resulting dataframe by indices.
  • limit (Optional[int], default None): If provided, cap the dataframe to this many rows.
  • fresh (bool, default False): If True, skip the local cache and directly query the instance connector. Defaults to False.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
Returns
  • A pd.DataFrame for the pipe's data corresponding to the provided parameters.
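
A short sketch of the options above (keys, column names, and params are placeholders): select a window with a subset of columns, then stream the same window as chunks:

from datetime import datetime
import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')

# One DataFrame for January 2024, keeping two columns and filtering by station.
df = pipe.get_data(
    select_columns=['timestamp', 'temperature'],
    begin=datetime(2024, 1, 1),
    end=datetime(2024, 2, 1),
    params={'station': 'ATL'},
)

# The same window as a generator of chunks.
chunks = pipe.get_data(
    begin=datetime(2024, 1, 1),
    end=datetime(2024, 2, 1),
    as_iterator=True,
)
for chunk in chunks:
    print(len(chunk))
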
def get_backtrack_data( self, backtrack_minutes: Optional[int] = None, begin: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, limit: Optional[int] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> Optional[pandas.core.frame.DataFrame]:
388def get_backtrack_data(
389    self,
390    backtrack_minutes: Optional[int] = None,
391    begin: Union[datetime, int, None] = None,
392    params: Optional[Dict[str, Any]] = None,
393    limit: Optional[int] = None,
394    fresh: bool = False,
395    debug: bool = False,
396    **kw: Any
397) -> Optional['pd.DataFrame']:
398    """
399    Get the most recent data from the instance connector as a Pandas DataFrame.
400
401    Parameters
402    ----------
403    backtrack_minutes: Optional[int], default None
404        How many minutes from `begin` to select from.
405        If `None`, use `pipe.parameters['fetch']['backtrack_minutes']`.
406
407    begin: Optional[datetime], default None
408        The starting point to search for data.
409        If begin is `None` (default), use the most recent observed datetime
410        (AKA sync_time).
411
412        ```
413        E.g. begin = 02:00
414
415        Search this region.           Ignore this, even if there's data.
416        /  /  /  /  /  /  /  /  /  |
417        -----|----------|----------|----------|----------|----------|
418        00:00      01:00      02:00      03:00      04:00      05:00
419
420        ```
421
422    params: Optional[Dict[str, Any]], default None
423        The standard Meerschaum `params` query dictionary.
424
425    limit: Optional[int], default None
426        If provided, cap the number of rows to be returned.
427
428    fresh: bool, default False
429        If `True`, ignore the local cache and pull directly from the instance connector.
430        Only comes into effect if a pipe was created with `cache=True`.
431
432    debug: bool, default False
433        Verbosity toggle.
434
435    Returns
436    -------
437    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. Backtrack data
438    is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
439    """
440    from meerschaum.utils.warnings import warn
441    from meerschaum.utils.venv import Venv
442    from meerschaum.connectors import get_connector_plugin
443
444    if not self.exists(debug=debug):
445        return None
446
447    begin = self.parse_date_bounds(begin)
448
449    backtrack_interval = self.get_backtrack_interval(debug=debug)
450    if backtrack_minutes is None:
451        backtrack_minutes = (
452            (backtrack_interval.total_seconds() / 60)
453            if isinstance(backtrack_interval, timedelta)
454            else backtrack_interval
455        )
456
457    if self.cache_pipe is not None:
458        if not fresh:
459            _sync_cache_tuple = self.cache_pipe.sync(begin=begin, params=params, debug=debug, **kw)
460            if not _sync_cache_tuple[0]:
461                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
462                fresh = True
463            else: ### Successfully synced cache.
464                return self.enforce_dtypes(
465                    self.cache_pipe.get_backtrack_data(
466                        fresh=True,
467                        begin=begin,
468                        backtrack_minutes=backtrack_minutes,
469                        params=params,
470                        limit=limit,
471                        order=kw.get('order', 'desc'),
472                        debug=debug,
473                        **kw
474                    ),
475                    debug=debug,
476                )
477
478    if hasattr(self.instance_connector, 'get_backtrack_data'):
479        with Venv(get_connector_plugin(self.instance_connector)):
480            return self.enforce_dtypes(
481                self.instance_connector.get_backtrack_data(
482                    pipe=self,
483                    begin=begin,
484                    backtrack_minutes=backtrack_minutes,
485                    params=params,
486                    limit=limit,
487                    debug=debug,
488                    **kw
489                ),
490                debug=debug,
491            )
492
493    if begin is None:
494        begin = self.get_sync_time(params=params, debug=debug)
495
496    backtrack_interval = (
497        timedelta(minutes=backtrack_minutes)
498        if isinstance(begin, datetime)
499        else backtrack_minutes
500    )
501    if begin is not None:
502        begin = begin - backtrack_interval
503
504    return self.get_data(
505        begin=begin,
506        params=params,
507        debug=debug,
508        limit=limit,
509        order=kw.get('order', 'desc'),
510        **kw
511    )

Get the most recent data from the instance connector as a Pandas DataFrame.

Parameters
  • backtrack_minutes (Optional[int], default None): How many minutes from begin to select from. If None, use pipe.parameters['fetch']['backtrack_minutes'].
  • begin (Optional[datetime], default None): The starting point to search for data. If begin is None (default), use the most recent observed datetime (AKA sync_time).

    E.g. begin = 02:00
    
    Search this region.           Ignore this, even if there's data.
    /  /  /  /  /  /  /  /  /  |
    -----|----------|----------|----------|----------|----------|
    00:00      01:00      02:00      03:00      04:00      05:00
    
    
  • params (Optional[Dict[str, Any]], default None): The standard Meerschaum params query dictionary.

  • limit (Optional[int], default None): If provided, cap the number of rows to be returned.
  • fresh (bool, default False): If True, ignore the local cache and pull directly from the instance connector. Only comes into effect if a pipe was created with cache=True.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame for the pipe's data corresponding to the provided parameters. Backtrack data is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
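
For example (keys are placeholders), the most recent day of data may be pulled by counting back from the pipe's sync time:

import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')

# Without `begin`, backtrack from the most recent observed datetime (the sync time).
recent_df = pipe.get_backtrack_data(backtrack_minutes=1440)
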
def get_rowcount( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> int:
514def get_rowcount(
515    self,
516    begin: Union[datetime, int, None] = None,
517    end: Union[datetime, int, None] = None,
518    params: Optional[Dict[str, Any]] = None,
519    remote: bool = False,
520    debug: bool = False
521) -> int:
522    """
523    Get a Pipe's instance or remote rowcount.
524
525    Parameters
526    ----------
527    begin: Optional[datetime], default None
528        Count rows where datetime > begin.
529
530    end: Optional[datetime], default None
531        Count rows where datetime < end.
532
533    remote: bool, default False
534        Count rows from a pipe's remote source.
535        **NOTE**: This is experimental!
536
537    debug: bool, default False
538        Verbosity toggle.
539
540    Returns
541    -------
542    An `int` of the number of rows in the pipe corresponding to the provided parameters.
543    Returns 0 if the pipe does not exist.
544    """
545    from meerschaum.utils.warnings import warn
546    from meerschaum.utils.venv import Venv
547    from meerschaum.connectors import get_connector_plugin
548
549    begin, end = self.parse_date_bounds(begin, end)
550    connector = self.instance_connector if not remote else self.connector
551    try:
552        with Venv(get_connector_plugin(connector)):
553            rowcount = connector.get_pipe_rowcount(
554                self,
555                begin=begin,
556                end=end,
557                params=params,
558                remote=remote,
559                debug=debug,
560            )
561            if rowcount is None:
562                return 0
563            return rowcount
564    except AttributeError as e:
565        warn(e)
566        if remote:
567            return 0
568    warn(f"Failed to get a rowcount for {self}.")
569    return 0

Get a Pipe's instance or remote rowcount.

Parameters
  • begin (Optional[datetime], default None): Count rows where datetime > begin.
  • end (Optional[datetime], default None): Count rows where datetime < end.
  • remote (bool, default False): Count rows from a pipe's remote source. NOTE: This is experimental!
  • debug (bool, default False): Verbosity toggle.
Returns
  • An int of the number of rows in the pipe corresponding to the provided parameters. Returns 0 if the pipe does not exist.
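
A quick sketch (keys are placeholders): count instance rows within a window, or count rows at the remote source:

from datetime import datetime
import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')

# Rows stored on the instance for January 2024.
num_rows = pipe.get_rowcount(
    begin=datetime(2024, 1, 1),
    end=datetime(2024, 2, 1),
)

# Rows available at the remote source (experimental).
remote_rows = pipe.get_rowcount(remote=True)
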
def get_chunk_interval( self, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, debug: bool = False) -> Union[datetime.timedelta, int]:
572def get_chunk_interval(
573    self,
574    chunk_interval: Union[timedelta, int, None] = None,
575    debug: bool = False,
576) -> Union[timedelta, int]:
577    """
578    Get the chunk interval to use for this pipe.
579
580    Parameters
581    ----------
582    chunk_interval: Union[timedelta, int, None], default None
583        If provided, coerce this value into the correct type.
584        For example, if the datetime axis is an integer, then
585        return the number of minutes.
586
587    Returns
588    -------
589    The chunk interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
590    """
591    default_chunk_minutes = get_config('pipes', 'parameters', 'verify', 'chunk_minutes')
592    configured_chunk_minutes = self.parameters.get('verify', {}).get('chunk_minutes', None)
593    chunk_minutes = (
594        (configured_chunk_minutes or default_chunk_minutes)
595        if chunk_interval is None
596        else (
597            chunk_interval
598            if isinstance(chunk_interval, int)
599            else int(chunk_interval.total_seconds() / 60)
600        )
601    )
602
603    dt_col = self.columns.get('datetime', None)
604    if dt_col is None:
605        return timedelta(minutes=chunk_minutes)
606
607    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns, UTC]')
608    if 'int' in dt_dtype.lower():
609        return chunk_minutes
610    return timedelta(minutes=chunk_minutes)

Get the chunk interval to use for this pipe.

Parameters
  • chunk_interval (Union[timedelta, int, None], default None): If provided, coerce this value into the correct type. For example, if the datetime axis is an integer, then return the number of minutes.
Returns
  • The chunk interval (timedelta or int) to use with this pipe's datetime axis.
def get_chunk_bounds( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, bounded: bool = False, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, debug: bool = False) -> List[Tuple[Union[datetime.datetime, int, NoneType], Union[datetime.datetime, int, NoneType]]]:
613def get_chunk_bounds(
614    self,
615    begin: Union[datetime, int, None] = None,
616    end: Union[datetime, int, None] = None,
617    bounded: bool = False,
618    chunk_interval: Union[timedelta, int, None] = None,
619    debug: bool = False,
620) -> List[
621    Tuple[
622        Union[datetime, int, None],
623        Union[datetime, int, None],
624    ]
625]:
626    """
627    Return a list of datetime bounds for iterating over the pipe's `datetime` axis.
628
629    Parameters
630    ----------
631    begin: Union[datetime, int, None], default None
632        If provided, do not select less than this value.
633        Otherwise the first chunk will be unbounded.
634
635    end: Union[datetime, int, None], default None
636        If provided, do not select greater than or equal to this value.
637        Otherwise the last chunk will be unbounded.
638
639    bounded: bool, default False
640        If `True`, do not include `None` bounds in the first or last chunks.
641
642    chunk_interval: Union[timedelta, int, None], default None
643        If provided, use this interval for the size of chunk boundaries.
644        The default value for this pipe may be set
645        under `pipe.parameters['verify']['chunk_minutes']`.
646
647    debug: bool, default False
648        Verbosity toggle.
649
650    Returns
651    -------
652    A list of chunk bounds (datetimes or integers).
653    If unbounded, the first and last chunks will include `None`.
654    """
655    include_less_than_begin = not bounded and begin is None
656    include_greater_than_end = not bounded and end is None
657    if begin is None:
658        begin = self.get_sync_time(newest=False, debug=debug)
659    if end is None:
660        end = self.get_sync_time(newest=True, debug=debug)
661    if begin is None and end is None:
662        return [(None, None)]
663
664    begin, end = self.parse_date_bounds(begin, end)
665
666    ### Set the chunk interval under `pipe.parameters['verify']['chunk_minutes']`.
667    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
668    
669    ### Build a list of tuples containing the chunk boundaries
670    ### so that we can sync multiple chunks in parallel.
671    ### Run `verify pipes --workers 1` to sync chunks in series.
672    chunk_bounds = []
673    begin_cursor = begin
674    while begin_cursor < end:
675        end_cursor = begin_cursor + chunk_interval
676        chunk_bounds.append((begin_cursor, end_cursor))
677        begin_cursor = end_cursor
678
679    ### The chunk interval might be too large.
680    if not chunk_bounds and end >= begin:
681        chunk_bounds = [(begin, end)]
682
683    ### Truncate the last chunk to the end timestamp.
684    if chunk_bounds[-1][1] > end:
685        chunk_bounds[-1] = (chunk_bounds[-1][0], end)
686
687    ### Pop the last chunk if its bounds are equal.
688    if chunk_bounds[-1][0] == chunk_bounds[-1][1]:
689        chunk_bounds = chunk_bounds[:-1]
690
691    if include_less_than_begin:
692        chunk_bounds = [(None, begin)] + chunk_bounds
693    if include_greater_than_end:
694        chunk_bounds = chunk_bounds + [(end, None)]
695
696    return chunk_bounds

Return a list of datetime bounds for iterating over the pipe's datetime axis.

Parameters
  • begin (Union[datetime, int, None], default None): If provided, do not select less than this value. Otherwise the first chunk will be unbounded.
  • end (Union[datetime, int, None], default None): If provided, do not select greater than or equal to this value. Otherwise the last chunk will be unbounded.
  • bounded (bool, default False): If True, do not include None bounds in the first or last chunks.
  • chunk_interval (Union[timedelta, int, None], default None): If provided, use this interval for the size of chunk boundaries. The default value for this pipe may be set under pipe.parameters['verify']['chunk_minutes'].
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of chunk bounds (datetimes or integers). If unbounded, the first and last chunks will include None.
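
The bounds may be used to walk the pipe's datetime axis in manageable pieces, similar to `verify pipes`. A sketch, assuming placeholder keys and a weekly interval:

from datetime import timedelta
import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')

# Bounded chunks of one week between the oldest and newest sync times.
bounds = pipe.get_chunk_bounds(
    bounded=True,
    chunk_interval=timedelta(days=7),
)
for chunk_begin, chunk_end in bounds:
    chunk_df = pipe.get_data(begin=chunk_begin, end=chunk_end)
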
def parse_date_bounds( self, *dt_vals: Union[datetime.datetime, int, NoneType]) -> Union[datetime.datetime, int, str, NoneType, Tuple[Union[datetime.datetime, int, str, NoneType]]]:
699def parse_date_bounds(self, *dt_vals: Union[datetime, int, None]) -> Union[
700    datetime,
701    int,
702    str,
703    None,
704    Tuple[Union[datetime, int, str, None]]
705]:
706    """
707    Given a date bound (begin, end), coerce a timezone if necessary.
708    """
709    from meerschaum.utils.misc import is_int
710    from meerschaum.utils.dtypes import coerce_timezone
711    from meerschaum.utils.warnings import warn
712    dateutil_parser = mrsm.attempt_import('dateutil.parser')
713
714    def _parse_date_bound(dt_val):
715        if dt_val is None:
716            return None
717
718        if isinstance(dt_val, int):
719            return dt_val
720
721        if dt_val == '':
722            return ''
723
724        if is_int(dt_val):
725            return int(dt_val)
726
727        if isinstance(dt_val, str):
728            try:
729                dt_val = dateutil_parser.parse(dt_val)
730            except Exception as e:
731                warn(f"Could not parse '{dt_val}' as datetime:\n{e}")
732                return None
733
734        dt_col = self.columns.get('datetime', None)
735        dt_typ = str(self.dtypes.get(dt_col, 'datetime64[ns, UTC]'))
736        if dt_typ == 'datetime':
737            dt_typ = 'datetime64[ns, UTC]'
738        return coerce_timezone(dt_val, strip_utc=('utc' not in dt_typ.lower()))
739
740    bounds = tuple(_parse_date_bound(dt_val) for dt_val in dt_vals)
741    if len(bounds) == 1:
742        return bounds[0]
743    return bounds

Given a date bound (begin, end), coerce a timezone if necessary.
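
For example, string bounds are parsed into datetimes and coerced to match the pipe's datetime dtype (timezone-aware here; keys and column names are placeholders):

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'plugin:noaa', 'weather',
    instance='sql:main',
    columns={'datetime': 'timestamp'},
    dtypes={'timestamp': 'datetime64[ns, UTC]'},
    temporary=True,
)

begin, end = pipe.parse_date_bounds('2024-01-01', '2024-02-01')
# Both bounds are returned as timezone-aware datetimes; integers and None pass through unchanged.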

def register(self, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
12def register(
13        self,
14        debug: bool = False,
15        **kw: Any
16    ) -> SuccessTuple:
17    """
18    Register a new Pipe along with its attributes.
19
20    Parameters
21    ----------
22    debug: bool, default False
23        Verbosity toggle.
24
25    kw: Any
26        Keyword arguments to pass to `instance_connector.register_pipe()`.
27
28    Returns
29    -------
30    A `SuccessTuple` of success, message.
31    """
32    if self.temporary:
33        return False, "Cannot register pipes created with `temporary=True` (read-only)."
34
35    from meerschaum.utils.formatting import get_console
36    from meerschaum.utils.venv import Venv
37    from meerschaum.connectors import get_connector_plugin, custom_types
38    from meerschaum.config._patch import apply_patch_to_config
39
40    import warnings
41    with warnings.catch_warnings():
42        warnings.simplefilter('ignore')
43        try:
44            _conn = self.connector
45        except Exception as e:
46            _conn = None
47
48    if (
49        _conn is not None
50        and
51        (_conn.type == 'plugin' or _conn.type in custom_types)
52        and
53        getattr(_conn, 'register', None) is not None
54    ):
55        try:
56            with Venv(get_connector_plugin(_conn), debug=debug):
57                params = self.connector.register(self)
58        except Exception as e:
59            get_console().print_exception()
60            params = None
61        params = {} if params is None else params
62        if not isinstance(params, dict):
63            from meerschaum.utils.warnings import warn
64            warn(
65                f"Invalid parameters returned from `register()` in connector {self.connector}:\n"
66                + f"{params}"
67            )
68        else:
69            self.parameters = apply_patch_to_config(params, self.parameters)
70
71    if not self.parameters:
72        cols = self.columns if self.columns else {'datetime': None, 'id': None}
73        self.parameters = {
74            'columns': cols,
75        }
76
77    with Venv(get_connector_plugin(self.instance_connector)):
78        return self.instance_connector.register_pipe(self, debug=debug, **kw)

Register a new Pipe along with its attributes.

Parameters
  • debug (bool, default False): Verbosity toggle.
  • kw (Any): Keyword arguments to pass to instance_connector.register_pipe().
Returns
  • A SuccessTuple of success, message.
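
A minimal sketch of registering a pipe built in-memory (keys and columns are placeholders; pipes created with temporary=True cannot be registered):

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'plugin:noaa', 'weather', 'atlanta',
    instance='sql:main',
    columns={'datetime': 'timestamp', 'id': 'station'},
)
success, msg = pipe.register()
print(success, msg)
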
attributes: Dict[str, Any]
19@property
20def attributes(self) -> Dict[str, Any]:
21    """
22    Return a dictionary of a pipe's keys and parameters.
23    These values are reflected directly from the pipes table of the instance.
24    """
25    import time
26    from meerschaum.config import get_config
27    from meerschaum.config._patch import apply_patch_to_config
28    from meerschaum.utils.venv import Venv
29    from meerschaum.connectors import get_connector_plugin
30
31    timeout_seconds = get_config('pipes', 'attributes', 'local_cache_timeout_seconds')
32
33    if '_attributes' not in self.__dict__:
34        self._attributes = {}
35
36    now = time.perf_counter()
37    last_refresh = self.__dict__.get('_attributes_sync_time', None)
38    timed_out = (
39        last_refresh is None
40        or
41        (timeout_seconds is not None and (now - last_refresh) >= timeout_seconds)
42    )
43    if not self.temporary and timed_out:
44        self._attributes_sync_time = now
45        local_attributes = self.__dict__.get('_attributes', {})
46        with Venv(get_connector_plugin(self.instance_connector)):
47            instance_attributes = self.instance_connector.get_pipe_attributes(self)
48        self._attributes = apply_patch_to_config(instance_attributes, local_attributes)
49    return self._attributes

Return a dictionary of a pipe's keys and parameters. These values are reflected directly from the pipes table of the instance.

parameters: Optional[Dict[str, Any]]
52@property
53def parameters(self) -> Optional[Dict[str, Any]]:
54    """
55    Return the parameters dictionary of the pipe.
56    """
57    if 'parameters' not in self.attributes:
58        self.attributes['parameters'] = {}
59    _parameters = self.attributes['parameters']
60    dt_col = _parameters.get('columns', {}).get('datetime', None)
61    dt_typ = _parameters.get('dtypes', {}).get(dt_col, None) if dt_col else None
62    if dt_col and not dt_typ:
63        if 'dtypes' not in _parameters:
64            self.attributes['parameters']['dtypes'] = {}
65        self.attributes['parameters']['dtypes'][dt_col] = 'datetime'
66    return self.attributes['parameters']

Return the parameters dictionary of the pipe.

columns: Optional[Dict[str, str]]
78@property
79def columns(self) -> Union[Dict[str, str], None]:
80    """
81    Return the `columns` dictionary defined in `meerschaum.Pipe.parameters`.
82    """
83    if 'columns' not in self.parameters:
84        self.parameters['columns'] = {}
85    cols = self.parameters['columns']
86    if not isinstance(cols, dict):
87        cols = {}
88        self.parameters['columns'] = cols
89    return {col_ix: col for col_ix, col in cols.items() if col}

Return the columns dictionary defined in meerschaum.Pipe.parameters.

indices: Optional[Dict[str, Union[str, List[str]]]]
106@property
107def indices(self) -> Union[Dict[str, Union[str, List[str]]], None]:
108    """
109    Return the `indices` dictionary defined in `meerschaum.Pipe.parameters`.
110    """
111    indices_key = (
112        'indexes'
113        if 'indexes' in self.parameters
114        else 'indices'
115    )
116    if indices_key not in self.parameters:
117        self.parameters[indices_key] = {}
118    _indices = self.parameters[indices_key]
119    _columns = self.columns
120    dt_col = _columns.get('datetime', None)
121    if not isinstance(_indices, dict):
122        _indices = {}
123        self.parameters[indices_key] = _indices
124    unique_cols = list(set((
125        [dt_col]
126        if dt_col
127        else []
128    ) + [
129        col
130        for col_ix, col in _columns.items()
131        if col and col_ix != 'datetime'
132    ]))
133    return {
134        **({'unique': unique_cols} if len(unique_cols) > 1 else {}),
135        **{col_ix: col for col_ix, col in _columns.items() if col},
136        **_indices
137    }

Return the indices dictionary defined in meerschaum.Pipe.parameters.

indexes: Optional[Dict[str, Union[str, List[str]]]]
140@property
141def indexes(self) -> Union[Dict[str, Union[str, List[str]]], None]:
142    """
143    Alias for `meerschaum.Pipe.indices`.
144    """
145    return self.indices

Alias for meerschaum.Pipe.indices.

dtypes: Optional[Dict[str, Any]]
198@property
199def dtypes(self) -> Union[Dict[str, Any], None]:
200    """
201    If defined, return the `dtypes` dictionary defined in `meerschaum.Pipe.parameters`.
202    """
203    from meerschaum.config._patch import apply_patch_to_config
204    configured_dtypes = self.parameters.get('dtypes', {})
205    remote_dtypes = self.infer_dtypes(persist=False)
206    patched_dtypes = apply_patch_to_config(remote_dtypes, configured_dtypes)
207    return patched_dtypes

If defined, return the dtypes dictionary defined in meerschaum.Pipe.parameters.
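
Configured dtypes are patched on top of dtypes inferred from existing data, so explicit entries take precedence. A sketch with placeholder keys:

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'plugin:noaa', 'weather',
    instance='sql:main',
    columns={'datetime': 'timestamp'},
    dtypes={'timestamp': 'datetime64[ns, UTC]', 'temperature': 'float64'},
    temporary=True,
)
print(pipe.dtypes)
# Explicitly configured dtypes override anything inferred from the instance.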

autoincrement: bool
255@property
256def autoincrement(self) -> bool:
257    """
258    Return the `autoincrement` parameter for the pipe.
259    """
260    if 'autoincrement' not in self.parameters:
261        self.parameters['autoincrement'] = False
262
263    return self.parameters['autoincrement']

Return the autoincrement parameter for the pipe.

upsert: bool
219@property
220def upsert(self) -> bool:
221    """
222    Return whether `upsert` is set for the pipe.
223    """
224    if 'upsert' not in self.parameters:
225        self.parameters['upsert'] = False
226    return self.parameters['upsert']

Return whether upsert is set for the pipe.

static: bool
237@property
238def static(self) -> bool:
239    """
240    Return whether `static` is set for the pipe.
241    """
242    if 'static' not in self.parameters:
243        self.parameters['static'] = False
244    return self.parameters['static']

Return whether static is set for the pipe.
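
All three of these flags (`autoincrement`, `upsert`, and `static`) live under `parameters` and default to `False`; a minimal sketch:

import meerschaum as mrsm

pipe = mrsm.Pipe('test', 'flags_demo', temporary=True)
print(pipe.autoincrement, pipe.upsert, pipe.static)
# False False False

pipe.parameters['upsert'] = True
print(pipe.upsert)
# True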

tzinfo: Optional[datetime.timezone]
274@property
275def tzinfo(self) -> Union[None, timezone]:
276    """
277    Return `timezone.utc` if the pipe is timezone-aware.
278    """
279    dt_col = self.columns.get('datetime', None)
280    if not dt_col:
281        return None
282
283    dt_typ = str(self.dtypes.get(dt_col, 'datetime64[ns, UTC]'))
284    if 'utc' in dt_typ.lower() or dt_typ == 'datetime':
285        return timezone.utc
286
287    if dt_typ == 'datetime64[ns]':
288        return None
289
290    return None

Return timezone.utc if the pipe is timezone-aware.
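
As the property above shows, the configured dtype of the datetime column determines awareness. A short sketch (assuming no data have been synced, so the configured dtypes are what `pipe.dtypes` returns; expected output shown in comments):

import meerschaum as mrsm

pipe = mrsm.Pipe('test', 'tz_demo', temporary=True, columns={'datetime': 'ts'})

pipe.parameters['dtypes'] = {'ts': 'datetime64[ns, UTC]'}
print(pipe.tzinfo)
# UTC

pipe.parameters['dtypes'] = {'ts': 'datetime64[ns]'}
print(pipe.tzinfo)
# None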

def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
293def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
294    """
295    Check if the requested columns are defined.
296
297    Parameters
298    ----------
299    *args: str
300        The column names to be retrieved.
301
302    error: bool, default False
303        If `True`, raise an `Exception` if the specified column is not defined.
304
305    Returns
306    -------
307    A tuple of the same size of `args` or a `str` if `args` is a single argument.
308
309    Examples
310    --------
311    >>> pipe = mrsm.Pipe('test', 'test')
312    >>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
313    >>> pipe.get_columns('datetime', 'id')
314    ('dt', 'id')
315    >>> pipe.get_columns('value', error=True)
316    Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
317    """
318    from meerschaum.utils.warnings import error as _error, warn
319    if not args:
320        args = tuple(self.columns.keys())
321    col_names = []
322    for col in args:
323        col_name = None
324        try:
325            col_name = self.columns[col]
326            if col_name is None and error:
327                _error(f"Please define the name of the '{col}' column for {self}.")
328        except Exception as e:
329            col_name = None
330        if col_name is None and error:
331            _error(f"Missing '{col}'" + f" column for {self}.")
332        col_names.append(col_name)
333    if len(col_names) == 1:
334        return col_names[0]
335    return tuple(col_names)

Check if the requested columns are defined.

Parameters
  • *args (str): The column names to be retrieved.
  • error (bool, default False): If True, raise an Exception if the specified column is not defined.
Returns
  • A tuple of the same size of args or a str if args is a single argument.
Examples
>>> pipe = mrsm.Pipe('test', 'test')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> pipe.get_columns('datetime', 'id')
('dt', 'id')
>>> pipe.get_columns('value', error=True)
Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
def get_columns_types( self, refresh: bool = False, debug: bool = False) -> Optional[Dict[str, str]]:
338def get_columns_types(
339    self,
340    refresh: bool = False,
341    debug: bool = False,
342) -> Union[Dict[str, str], None]:
343    """
344    Get a dictionary of a pipe's column names and their types.
345
346    Parameters
347    ----------
348    refresh: bool, default False
349        If `True`, invalidate the cache and fetch directly from the instance connector.
350
351    debug: bool, default False
352        Verbosity toggle.
353
354    Returns
355    -------
356    A dictionary of column names (`str`) to column types (`str`).
357
358    Examples
359    --------
360    >>> pipe.get_columns_types()
361    {
362      'dt': 'TIMESTAMP WITH TIMEZONE',
363      'id': 'BIGINT',
364      'val': 'DOUBLE PRECISION',
365    }
366    >>>
367    """
368    import time
369    from meerschaum.connectors import get_connector_plugin
370    from meerschaum.config.static import STATIC_CONFIG
371    from meerschaum.utils.warnings import dprint
372
373    now = time.perf_counter()
374    cache_seconds = STATIC_CONFIG['pipes']['static_schema_cache_seconds']
375    if not self.static:
376        refresh = True
377    if refresh:
378        _ = self.__dict__.pop('_columns_types_timestamp', None)
379        _ = self.__dict__.pop('_columns_types', None)
380    _columns_types = self.__dict__.get('_columns_types', None)
381    if _columns_types:
382        columns_types_timestamp = self.__dict__.get('_columns_types_timestamp', None)
383        if columns_types_timestamp is not None:
384            delta = now - columns_types_timestamp
385            if delta < cache_seconds:
386                if debug:
387                    dprint(
388                        f"Returning cached `columns_types` for {self} "
389                        f"({round(delta, 2)} seconds old)."
390                    )
391                return _columns_types
392
393    with mrsm.Venv(get_connector_plugin(self.instance_connector)):
394        _columns_types = (
395            self.instance_connector.get_pipe_columns_types(self, debug=debug)
396            if hasattr(self.instance_connector, 'get_pipe_columns_types')
397            else None
398        )
399
400    self.__dict__['_columns_types'] = _columns_types
401    self.__dict__['_columns_types_timestamp'] = now
402    return _columns_types or {}

Get a dictionary of a pipe's column names and their types.

Parameters
  • refresh (bool, default False): If True, invalidate the cache and fetch directly from the instance connector.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A dictionary of column names (str) to column types (str).
Examples
>>> pipe.get_columns_types()
{
  'dt': 'TIMESTAMP WITH TIMEZONE',
  'id': 'BIGINT',
  'val': 'DOUBLE PRECISION',
}
>>>
def get_columns_indices( self, debug: bool = False, refresh: bool = False) -> Dict[str, List[Dict[str, str]]]:
405def get_columns_indices(
406    self,
407    debug: bool = False,
408    refresh: bool = False,
409) -> Dict[str, List[Dict[str, str]]]:
410    """
411    Return a dictionary mapping columns to index information.
412    """
413    import time
414    from meerschaum.connectors import get_connector_plugin
415    from meerschaum.config.static import STATIC_CONFIG
416    from meerschaum.utils.warnings import dprint
417
418    now = time.perf_counter()
419    cache_seconds = (
420        STATIC_CONFIG['pipes']['static_schema_cache_seconds']
421        if self.static
422        else STATIC_CONFIG['pipes']['exists_timeout_seconds']
423    )
424    if refresh:
425        _ = self.__dict__.pop('_columns_indices_timestamp', None)
426        _ = self.__dict__.pop('_columns_indices', None)
427    _columns_indices = self.__dict__.get('_columns_indices', None)
428    if _columns_indices:
429        columns_indices_timestamp = self.__dict__.get('_columns_indices_timestamp', None)
430        if columns_indices_timestamp is not None:
431            delta = now - columns_indices_timestamp
432            if delta < cache_seconds:
433                if debug:
434                    dprint(
435                        f"Returning cached `columns_indices` for {self} "
436                        f"({round(delta, 2)} seconds old)."
437                    )
438                return _columns_indices
439
440    with mrsm.Venv(get_connector_plugin(self.instance_connector)):
441        _columns_indices = (
442            self.instance_connector.get_pipe_columns_indices(self, debug=debug)
443            if hasattr(self.instance_connector, 'get_pipe_columns_indices')
444            else None
445        )
446
447    self.__dict__['_columns_indices'] = _columns_indices
448    self.__dict__['_columns_indices_timestamp'] = now
449    return _columns_indices or {}

Return a dictionary mapping columns to index information.

def get_indices(self) -> Dict[str, str]:
689def get_indices(self) -> Dict[str, str]:
690    """
691    Return a dictionary mapping index keys to their names on the database.
692
693    Returns
694    -------
695    A dictionary of index keys to index names.
696    """
697    _parameters = self.parameters
698    _index_template = _parameters.get('index_template', "IX_{target}_{column_names}")
699    _indices = self.indices
700    _target = self.target
701    _column_names = {
702        ix: (
703            '_'.join(cols)
704            if isinstance(cols, (list, tuple))
705            else str(cols)
706        )
707        for ix, cols in _indices.items()
708        if cols
709    }
710    _index_names = {
711        ix: _index_template.format(
712            target=_target,
713            column_names=column_names,
714            connector_keys=self.connector_keys,
715            metric_key=self.metric_key,
716            location_key=self.location_key,
717        )
718        for ix, column_names in _column_names.items()
719    }
720    ### NOTE: Skip any duplicate indices.
721    seen_index_names = {}
722    for ix, index_name in _index_names.items():
723        if index_name in seen_index_names:
724            continue
725        seen_index_names[index_name] = ix
726    return {
727        ix: index_name
728        for index_name, ix in seen_index_names.items()
729    }

Return a dictionary mapping index keys to their names on the database.

Returns
  • A dictionary of index keys to index names.
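
Index names are rendered from the `index_template` parameter (default `IX_{target}_{column_names}`). A hedged sketch (the order of columns within the `unique` name may vary):

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'test', 'indices_demo',
    temporary=True,
    columns={'datetime': 'ts', 'id': 'station_id'},
)
pipe.parameters['target'] = 'indices_demo'
print(pipe.get_indices())
# e.g. {'unique': 'IX_indices_demo_ts_station_id',
#       'datetime': 'IX_indices_demo_ts',
#       'id': 'IX_indices_demo_station_id'}
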
tags: Optional[List[str]]
173@property
174def tags(self) -> Union[List[str], None]:
175    """
176    If defined, return the `tags` list defined in `meerschaum.Pipe.parameters`.
177    """
178    if 'tags' not in self.parameters:
179        self.parameters['tags'] = []
180    return self.parameters['tags']

If defined, return the tags list defined in meerschaum.Pipe.parameters.

def get_id(self, **kw: Any) -> Optional[int]:
452def get_id(self, **kw: Any) -> Union[int, None]:
453    """
454    Fetch a pipe's ID from its instance connector.
455    If the pipe does not exist, return `None`.
456    """
457    if self.temporary:
458        return None
459    from meerschaum.utils.venv import Venv
460    from meerschaum.connectors import get_connector_plugin
461
462    with Venv(get_connector_plugin(self.instance_connector)):
463        if hasattr(self.instance_connector, 'get_pipe_id'):
464            return self.instance_connector.get_pipe_id(self, **kw)
465
466    return None

Fetch a pipe's ID from its instance connector. If the pipe does not exist, return None.

id: Optional[int]
469@property
470def id(self) -> Union[int, None]:
471    """
472    Fetch and cache a pipe's ID.
473    """
474    if not ('_id' in self.__dict__ and self._id):
475        self._id = self.get_id()
476    return self._id

Fetch and cache a pipe's ID.

def get_val_column(self, debug: bool = False) -> Optional[str]:
479def get_val_column(self, debug: bool = False) -> Union[str, None]:
480    """
481    Return the name of the value column if it's defined, otherwise make an educated guess.
482    If not set in the `columns` dictionary, return the first numeric column that is not
483    an ID or datetime column.
484    If none may be found, return `None`.
485
486    Parameters
487    ----------
488    debug: bool, default False
489        Verbosity toggle.
490
491    Returns
492    -------
493    Either a string or `None`.
494    """
495    from meerschaum.utils.debug import dprint
496    if debug:
497        dprint('Attempting to determine the value column...')
498    try:
499        val_name = self.get_columns('value')
500    except Exception as e:
501        val_name = None
502    if val_name is not None:
503        if debug:
504            dprint(f"Value column: {val_name}")
505        return val_name
506
507    cols = self.columns
508    if cols is None:
509        if debug:
510            dprint('No columns could be determined. Returning...')
511        return None
512    try:
513        dt_name = self.get_columns('datetime', error=False)
514    except Exception as e:
515        dt_name = None
516    try:
517        id_name = self.get_columns('id', error=False)
518    except Exception as e:
519        id_name = None
520
521    if debug:
522        dprint(f"dt_name: {dt_name}")
523        dprint(f"id_name: {id_name}")
524
525    cols_types = self.get_columns_types(debug=debug)
526    if cols_types is None:
527        return None
528    if debug:
529        dprint(f"cols_types: {cols_types}")
530    if dt_name is not None:
531        cols_types.pop(dt_name, None)
532    if id_name is not None:
533        cols_types.pop(id_name, None)
534
535    candidates = []
536    candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric',}
537    for search_term in candidate_keywords:
538        for col, typ in cols_types.items():
539            if search_term in typ.lower():
540                candidates.append(col)
541                break
542    if not candidates:
543        if debug:
544            dprint("No value column could be determined.")
545        return None
546
547    return candidates[0]

Return the name of the value column if it's defined, otherwise make an educated guess. If not set in the columns dictionary, return the first numeric column that is not an ID or datetime column. If none may be found, return None.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • Either a string or None.
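
A minimal sketch: if the `value` column is set under `columns`, it's returned directly; otherwise the instance's column types are searched for a numeric candidate:

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'test', 'val_demo',
    temporary=True,
    columns={'datetime': 'ts', 'value': 'vl'},
)
print(pipe.get_val_column())
# vl
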
parents: List[Pipe]
550@property
551def parents(self) -> List[meerschaum.Pipe]:
552    """
553    Return a list of `meerschaum.Pipe` objects to be designated as parents.
554    """
555    if 'parents' not in self.parameters:
556        return []
557    from meerschaum.utils.warnings import warn
558    _parents_keys = self.parameters['parents']
559    if not isinstance(_parents_keys, list):
560        warn(
561            f"Please ensure the parents for {self} are defined as a list of keys.",
562            stacklevel = 4
563        )
564        return []
565    from meerschaum import Pipe
566    _parents = []
567    for keys in _parents_keys:
568        try:
569            p = Pipe(**keys)
570        except Exception as e:
571            warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}")
572            continue
573        _parents.append(p)
574    return _parents

Return a list of meerschaum.Pipe objects to be designated as parents.

children: List[Pipe]
577@property
578def children(self) -> List[meerschaum.Pipe]:
579    """
580    Return a list of `meerschaum.Pipe` objects to be designated as children.
581    """
582    if 'children' not in self.parameters:
583        return []
584    from meerschaum.utils.warnings import warn
585    _children_keys = self.parameters['children']
586    if not isinstance(_children_keys, list):
587        warn(
588            f"Please ensure the children for {self} are defined as a list of keys.",
589            stacklevel = 4
590        )
591        return []
592    from meerschaum import Pipe
593    _children = []
594    for keys in _children_keys:
595        try:
596            p = Pipe(**keys)
597        except Exception as e:
598            warn(f"Unable to build child with keys '{keys}' for {self}:\n{e}")
599            continue
600        _children.append(p)
601    return _children

Return a list of meerschaum.Pipe objects to be designated as children.

target: str
604@property
605def target(self) -> str:
606    """
607    The target table name.
608    You can set the target name under one of the following keys
609    (checked in this order):
610      - `target`
611      - `target_name`
612      - `target_table`
613      - `target_table_name`
614    """
615    if 'target' not in self.parameters:
616        default_target = self._target_legacy()
617        default_targets = {default_target}
618        potential_keys = ('target_name', 'target_table', 'target_table_name')
619        _target = None
620        for k in potential_keys:
621            if k in self.parameters:
622                _target = self.parameters[k]
623                break
624
625        _target = _target or default_target
626
627        if self.instance_connector.type == 'sql':
628            from meerschaum.utils.sql import truncate_item_name
629            truncated_target = truncate_item_name(_target, self.instance_connector.flavor)
630            default_targets.add(truncated_target)
631            warned_target = self.__dict__.get('_warned_target', False)
632            if truncated_target != _target and not warned_target:
633                if not warned_target:
634                    warn(
635                        f"The target '{_target}' is too long for '{self.instance_connector.flavor}', "
636                        + f"will use {truncated_target} instead."
637                    )
638                    self.__dict__['_warned_target'] = True
639                _target = truncated_target
640
641        if _target in default_targets:
642            return _target
643        self.target = _target
644    return self.parameters['target']

The target table name. You can set the target name under one of the following keys (checked in this order; a short sketch follows the list):

  • target
  • target_name
  • target_table
  • target_table_name
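
For example, setting `target` under `parameters` overrides the default table name (a minimal sketch):

import meerschaum as mrsm

pipe = mrsm.Pipe('test', 'target_demo', temporary=True)
pipe.parameters['target'] = 'weather_readings'
print(pipe.target)
# weather_readings
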
def guess_datetime(self) -> Optional[str]:
667def guess_datetime(self) -> Union[str, None]:
668    """
669    Try to determine a pipe's datetime column.
670    """
671    _dtypes = self.dtypes
672
673    ### Abort if the user explicitly disallows a datetime index.
674    if 'datetime' in _dtypes:
675        if _dtypes['datetime'] is None:
676            return None
677
678    from meerschaum.utils.dtypes import are_dtypes_equal
679    dt_cols = [
680        col
681        for col, typ in _dtypes.items()
682        if are_dtypes_equal(typ, 'datetime')
683    ]
684    if not dt_cols:
685        return None
686    return dt_cols[0]

Try to determine a pipe's datetime column.

def show( self, nopretty: bool = False, debug: bool = False, **kw) -> Tuple[bool, str]:
12def show(
13        self,
14        nopretty: bool = False,
15        debug: bool = False,
16        **kw
17    ) -> SuccessTuple:
18    """
19    Show attributes of a Pipe.
20
21    Parameters
22    ----------
23    nopretty: bool, default False
24        If `True`, simply print the JSON of the pipe's attributes.
25
26    debug: bool, default False
27        Verbosity toggle.
28
29    Returns
30    -------
31    A `SuccessTuple` of success, message.
32
33    """
34    import json
35    from meerschaum.utils.formatting import (
36        pprint, make_header, ANSI, highlight_pipes, fill_ansi, get_console,
37    )
38    from meerschaum.utils.packages import import_rich, attempt_import
39    from meerschaum.utils.warnings import info
40    attributes_json = json.dumps(self.attributes)
41    if not nopretty:
42        _to_print = f"Attributes for {self}:"
43        if ANSI:
44            _to_print = fill_ansi(highlight_pipes(make_header(_to_print)), 'magenta')
45            print(_to_print)
46            rich = import_rich()
47            rich_json = attempt_import('rich.json')
48            get_console().print(rich_json.JSON(attributes_json))
49        else:
50            print(_to_print)
51    else:
52        print(attributes_json)
53
54    return True, "Success"

Show attributes of a Pipe.

Parameters
  • nopretty (bool, default False): If True, simply print the JSON of the pipe's attributes.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success, message.
def edit( self, patch: bool = False, interactive: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
21def edit(
22    self,
23    patch: bool = False,
24    interactive: bool = False,
25    debug: bool = False,
26    **kw: Any
27) -> SuccessTuple:
28    """
29    Edit a Pipe's configuration.
30
31    Parameters
32    ----------
33    patch: bool, default False
34        If `patch` is True, update parameters by cascading rather than overwriting.
35    interactive: bool, default False
36        If `True`, open an editor for the user to make changes to the pipe's YAML file.
37    debug: bool, default False
38        Verbosity toggle.
39
40    Returns
41    -------
42    A `SuccessTuple` of success, message.
43
44    """
45    from meerschaum.utils.venv import Venv
46    from meerschaum.connectors import get_connector_plugin
47
48    if self.temporary:
49        return False, "Cannot edit pipes created with `temporary=True` (read-only)."
50
51    if not interactive:
52        with Venv(get_connector_plugin(self.instance_connector)):
53            return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
54
55    from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
56    from meerschaum.utils.misc import edit_file
57    parameters_filename = str(self) + '.yaml'
58    parameters_path = PIPES_CACHE_RESOURCES_PATH / parameters_filename
59
60    from meerschaum.utils.yaml import yaml
61
62    edit_text = f"Edit the parameters for {self}"
63    edit_top = '#' * (len(edit_text) + 4)
64    edit_header = edit_top + f'\n# {edit_text} #\n' + edit_top + '\n\n'
65
66    from meerschaum.config import get_config
67    parameters = dict(get_config('pipes', 'parameters', patch=True))
68    from meerschaum.config._patch import apply_patch_to_config
69    parameters = apply_patch_to_config(parameters, self.parameters)
70
71    ### write parameters to yaml file
72    with open(parameters_path, 'w+') as f:
73        f.write(edit_header)
74        yaml.dump(parameters, stream=f, sort_keys=False)
75
76    ### only quit editing if yaml is valid
77    editing = True
78    while editing:
79        edit_file(parameters_path)
80        try:
81            with open(parameters_path, 'r') as f:
82                file_parameters = yaml.load(f.read())
83        except Exception as e:
84            from meerschaum.utils.warnings import warn
85            warn(f"Invalid format defined for '{self}':\n\n{e}")
86            input(f"Press [Enter] to correct the configuration for '{self}': ")
87        else:
88            editing = False
89
90    self.parameters = file_parameters
91
92    if debug:
93        from meerschaum.utils.formatting import pprint
94        pprint(self.parameters)
95
96    with Venv(get_connector_plugin(self.instance_connector)):
97        return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)

Edit a Pipe's configuration.

Parameters
  • patch (bool, default False): If patch is True, update parameters by cascading rather than overwriting.
  • interactive (bool, default False): If True, open an editor for the user to make changes to the pipe's YAML file.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success, message.
def edit_definition( self, yes: bool = False, noask: bool = False, force: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
100def edit_definition(
101    self,
102    yes: bool = False,
103    noask: bool = False,
104    force: bool = False,
105    debug : bool = False,
106    **kw : Any
107) -> SuccessTuple:
108    """
109    Edit a pipe's definition file and update its configuration.
110    **NOTE:** This function is interactive and should not be used in automated scripts!
111
112    Returns
113    -------
114    A `SuccessTuple` of success, message.
115
116    """
117    if self.temporary:
118        return False, "Cannot edit pipes created with `temporary=True` (read-only)."
119
120    from meerschaum.connectors import instance_types
121    if (self.connector is None) or self.connector.type not in instance_types:
122        return self.edit(interactive=True, debug=debug, **kw)
123
124    import json
125    from meerschaum.utils.warnings import info, warn
126    from meerschaum.utils.debug import dprint
127    from meerschaum.config._patch import apply_patch_to_config
128    from meerschaum.utils.misc import edit_file
129
130    _parameters = self.parameters
131    if 'fetch' not in _parameters:
132        _parameters['fetch'] = {}
133
134    def _edit_api():
135        from meerschaum.utils.prompt import prompt, yes_no
136        info(
137            f"Please enter the keys of the source pipe from '{self.connector}'.\n" +
138            "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip."
139        )
140
141        _keys = { 'connector_keys' : None, 'metric_key' : None, 'location_key' : None }
142        for k in _keys:
143            _keys[k] = _parameters['fetch'].get(k, None)
144
145        for k, v in _keys.items():
146            try:
147                _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v)
148            except KeyboardInterrupt:
149                continue
150            if _keys[k] in ('', 'None', '\'None\'', '[None]'):
151                _keys[k] = None
152
153        _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys)
154
155        info("You may optionally specify additional filter parameters as JSON.")
156        print("  Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.")
157        print("  For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':")
158        print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': ')))
159        if force or yes_no(
160            "Would you like to add additional filter parameters?",
161            yes=yes, noask=noask
162        ):
163            from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
164            definition_filename = str(self) + '.json'
165            definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
166            try:
167                definition_path.touch()
168                with open(definition_path, 'w+') as f:
169                    json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2)
170            except Exception as e:
171                return False, f"Failed writing file '{definition_path}':\n" + str(e)
172
173            _params = None
174            while True:
175                edit_file(definition_path)
176                try:
177                    with open(definition_path, 'r') as f:
178                        _params = json.load(f)
179                except Exception as e:
180                    warn(f'Failed to read parameters JSON:\n{e}', stack=False)
181                    if force or yes_no(
182                        "Would you like to try again?\n  "
183                        + "If not, the parameters JSON file will be ignored.",
184                        noask=noask, yes=yes
185                    ):
186                        continue
187                    _params = None
188                break
189            if _params is not None:
190                if 'fetch' not in _parameters:
191                    _parameters['fetch'] = {}
192                _parameters['fetch']['params'] = _params
193
194        self.parameters = _parameters
195        return True, "Success"
196
197    def _edit_sql():
198        import pathlib, os, textwrap
199        from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
200        from meerschaum.utils.misc import edit_file
201        definition_filename = str(self) + '.sql'
202        definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
203
204        sql_definition = _parameters['fetch'].get('definition', None)
205        if sql_definition is None:
206            sql_definition = ''
207        sql_definition = textwrap.dedent(sql_definition).lstrip()
208
209        try:
210            definition_path.touch()
211            with open(definition_path, 'w+') as f:
212                f.write(sql_definition)
213        except Exception as e:
214            return False, f"Failed writing file '{definition_path}':\n" + str(e)
215
216        edit_file(definition_path)
217        try:
218            with open(definition_path, 'r') as f:
219                file_definition = f.read()
220        except Exception as e:
221            return False, f"Failed reading file '{definition_path}':\n" + str(e)
222
223        if sql_definition == file_definition:
224            return False, f"No changes made to definition for {self}."
225
226        if ' ' not in file_definition:
227            return False, f"Invalid SQL definition for {self}."
228
229        if debug:
230            dprint("Read SQL definition:\n\n" + file_definition)
231        _parameters['fetch']['definition'] = file_definition
232        self.parameters = _parameters
233        return True, "Success"
234
235    locals()['_edit_' + str(self.connector.type)]()
236    return self.edit(interactive=False, debug=debug, **kw)

Edit a pipe's definition file and update its configuration. NOTE: This function is interactive and should not be used in automated scripts!

Returns
  • A SuccessTuple of success, message.
def update(self, *args, **kw) -> Tuple[bool, str]:
13def update(self, *args, **kw) -> SuccessTuple:
14    """
15    Update a pipe's parameters in its instance.
16    """
17    kw['interactive'] = False
18    return self.edit(*args, **kw)

Update a pipe's parameters in its instance.
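
A common pattern is to adjust `pipe.parameters` in memory and then persist the change with `update()` (a hedged sketch; this requires a registered, non-temporary pipe on your instance):

import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:noaa', 'weather')  # assumes this pipe is already registered
pipe.parameters['tags'] = ['production']
success, msg = pipe.update(debug=False)
print(success, msg)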

def sync( self, df: Union[pandas.core.frame.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], meerschaum.core.Pipe._sync.InferFetch] = <class 'meerschaum.core.Pipe._sync.InferFetch'>, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, NoneType] = None, force: bool = False, retries: int = 10, min_seconds: int = 1, check_existing: bool = True, blocking: bool = True, workers: Optional[int] = None, callback: Optional[Callable[[Tuple[bool, str]], Any]] = None, error_callback: Optional[Callable[[Exception], Any]] = None, chunksize: Optional[int] = -1, sync_chunks: bool = True, debug: bool = False, _inplace: bool = True, **kw: Any) -> Tuple[bool, str]:
 40def sync(
 41    self,
 42    df: Union[
 43        pd.DataFrame,
 44        Dict[str, List[Any]],
 45        List[Dict[str, Any]],
 46        InferFetch
 47    ] = InferFetch,
 48    begin: Union[datetime, int, str, None] = '',
 49    end: Union[datetime, int, None] = None,
 50    force: bool = False,
 51    retries: int = 10,
 52    min_seconds: int = 1,
 53    check_existing: bool = True,
 54    blocking: bool = True,
 55    workers: Optional[int] = None,
 56    callback: Optional[Callable[[Tuple[bool, str]], Any]] = None,
 57    error_callback: Optional[Callable[[Exception], Any]] = None,
 58    chunksize: Optional[int] = -1,
 59    sync_chunks: bool = True,
 60    debug: bool = False,
 61    _inplace: bool = True,
 62    **kw: Any
 63) -> SuccessTuple:
 64    """
 65    Fetch new data from the source and update the pipe's table with new data.
 66    
 67    Get new remote data via fetch, get existing data in the same time period,
 68    and merge the two, only keeping the unseen data.
 69
 70    Parameters
 71    ----------
 72    df: Union[None, pd.DataFrame, Dict[str, List[Any]]], default None
 73        An optional DataFrame to sync into the pipe. Defaults to `None`.
 74
 75    begin: Union[datetime, int, str, None], default ''
 76        Optionally specify the earliest datetime to search for data.
 77
 78    end: Union[datetime, int, str, None], default None
 79        Optionally specify the latest datetime to search for data.
 80
 81    force: bool, default False
 82        If `True`, keep trying to sync up to `retries` attempts.
 83
 84    retries: int, default 10
 85        If `force`, how many attempts to try syncing before declaring failure.
 86
 87    min_seconds: Union[int, float], default 1
 88        If `force`, how many seconds to sleep between retries. Defaults to `1`.
 89
 90    check_existing: bool, default True
 91        If `True`, pull and diff with existing data from the pipe.
 92
 93    blocking: bool, default True
 94        If `True`, wait for sync to finish and return its result, otherwise
 95        asynchronously sync (oxymoron?) and return success. Defaults to `True`.
 96        Only intended for specific scenarios.
 97
 98    workers: Optional[int], default None
 99        If provided and the instance connector is thread-safe
100        (`pipe.instance_connector.IS_THREAD_SAFE is True`),
101        limit concurrent sync to this many threads.
102
103    callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
104        Callback function which expects a SuccessTuple as input.
105        Only applies when `blocking=False`.
106
107    error_callback: Optional[Callable[[Exception], Any]], default None
108        Callback function which expects an Exception as input.
109        Only applies when `blocking=False`.
110
111    chunksize: int, default -1
112        Specify the number of rows to sync per chunk.
113        If `-1`, resort to system configuration (default is `900`).
114        A `chunksize` of `None` will sync all rows in one transaction.
115
116    sync_chunks: bool, default True
117        If possible, sync chunks while fetching them into memory.
118
119    debug: bool, default False
120        Verbosity toggle. Defaults to False.
121
122    Returns
123    -------
124    A `SuccessTuple` of success (`bool`) and message (`str`).
125    """
126    from meerschaum.utils.debug import dprint, _checkpoint
127    from meerschaum.connectors import custom_types
128    from meerschaum.plugins import Plugin
129    from meerschaum.utils.formatting import get_console
130    from meerschaum.utils.venv import Venv
131    from meerschaum.connectors import get_connector_plugin
132    from meerschaum.utils.misc import df_is_chunk_generator, filter_keywords, filter_arguments
133    from meerschaum.utils.pool import get_pool
134    from meerschaum.config import get_config
135
136    if (callback is not None or error_callback is not None) and blocking:
137        warn("Callback functions are only executed when blocking = False. Ignoring...")
138
139    _checkpoint(_total=2, **kw)
140
141    if chunksize == 0:
142        chunksize = None
143        sync_chunks = False
144
145    begin, end = self.parse_date_bounds(begin, end)
146    kw.update({
147        'begin': begin,
148        'end': end,
149        'force': force,
150        'retries': retries,
151        'min_seconds': min_seconds,
152        'check_existing': check_existing,
153        'blocking': blocking,
154        'workers': workers,
155        'callback': callback,
156        'error_callback': error_callback,
157        'sync_chunks': sync_chunks,
158        'chunksize': chunksize,
159    })
160
161    ### NOTE: Invalidate `_exists` cache before and after syncing.
162    self._exists = None
163
164    def _sync(
165        p: 'meerschaum.Pipe',
166        df: Union[
167            'pd.DataFrame',
168            Dict[str, List[Any]],
169            List[Dict[str, Any]],
170            InferFetch
171        ] = InferFetch,
172    ) -> SuccessTuple:
173        if df is None:
174            p._exists = None
175            return (
176                False,
177                f"You passed `None` instead of data into `sync()` for {p}.\n"
178                + "Omit the DataFrame to infer fetching.",
179            )
180        ### Ensure that Pipe is registered.
181        if not p.temporary and p.get_id(debug=debug) is None:
182            ### NOTE: This may trigger an interactive session for plugins!
183            register_success, register_msg = p.register(debug=debug)
184            if not register_success:
185                if 'already' not in register_msg:
186                    p._exists = None
187                    return register_success, register_msg
188
189        ### If connector is a plugin with a `sync()` method, return that instead.
190        ### If the plugin does not have a `sync()` method but does have a `fetch()` method,
191        ### use that instead.
192        ### NOTE: The DataFrame must be omitted for the plugin sync method to apply.
193        ### If a DataFrame is provided, continue as expected.
194        if hasattr(df, 'MRSM_INFER_FETCH'):
195            try:
196                if p.connector is None:
197                    if ':' not in p.connector_keys:
198                        return True, f"{p} does not support fetching; nothing to do."
199
200                    msg = f"{p} does not have a valid connector."
201                    if p.connector_keys.startswith('plugin:'):
202                        msg += f"\n    Perhaps {p.connector_keys} has a syntax error?"
203                    p._exists = None
204                    return False, msg
205            except Exception:
206                p._exists = None
207                return False, f"Unable to create the connector for {p}."
208
209            ### Sync in place if this is a SQL pipe.
210            if (
211                str(self.connector) == str(self.instance_connector)
212                and 
213                hasattr(self.instance_connector, 'sync_pipe_inplace')
214                and
215                _inplace
216                and
217                get_config('system', 'experimental', 'inplace_sync')
218            ):
219                with Venv(get_connector_plugin(self.instance_connector)):
220                    p._exists = None
221                    _args, _kwargs = filter_arguments(
222                        p.instance_connector.sync_pipe_inplace,
223                        p,
224                        debug=debug,
225                        **kw
226                    )
227                    return self.instance_connector.sync_pipe_inplace(
228                        *_args,
229                        **_kwargs
230                    )
231
232            ### Activate and invoke `sync(pipe)` for plugin connectors with `sync` methods.
233            try:
234                if getattr(p.connector, 'sync', None) is not None:
235                    with Venv(get_connector_plugin(p.connector), debug=debug):
236                        _args, _kwargs = filter_arguments(
237                            p.connector.sync,
238                            p,
239                            debug=debug,
240                            **kw
241                        )
242                        return_tuple = p.connector.sync(*_args, **_kwargs)
243                    p._exists = None
244                    if not isinstance(return_tuple, tuple):
245                        return_tuple = (
246                            False,
247                            f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}"
248                        )
249                    return return_tuple
250
251            except Exception as e:
252                get_console().print_exception()
253                msg = f"Failed to sync {p} with exception: '" + str(e) + "'"
254                if debug:
255                    error(msg, silent=False)
256                p._exists = None
257                return False, msg
258
259            ### Fetch the dataframe from the connector's `fetch()` method.
260            try:
261                with Venv(get_connector_plugin(p.connector), debug=debug):
262                    df = p.fetch(
263                        **filter_keywords(
264                            p.fetch,
265                            debug=debug,
266                            **kw
267                        )
268                    )
269            except Exception as e:
270                get_console().print_exception(
271                    suppress=[
272                        'meerschaum/core/Pipe/_sync.py',
273                        'meerschaum/core/Pipe/_fetch.py',
274                    ]
275                )
276                msg = f"Failed to fetch data from {p.connector}:\n    {e}"
277                df = None
278
279            if df is None:
280                p._exists = None
281                return False, f"No data were fetched for {p}."
282
283            if isinstance(df, list):
284                if len(df) == 0:
285                    return True, f"No new rows were returned for {p}."
286
287                ### May be a chunk hook results list.
288                if isinstance(df[0], tuple):
289                    success = all([_success for _success, _ in df])
290                    message = '\n'.join([_message for _, _message in df])
291                    return success, message
292
293            ### TODO: Deprecate async?
294            if df is True:
295                p._exists = None
296                return True, f"{p} is being synced in parallel."
297
298        ### CHECKPOINT: Retrieved the DataFrame.
299        _checkpoint(**kw)
300
301        ### Allow for dataframe generators or iterables.
302        if df_is_chunk_generator(df):
303            kw['workers'] = p.get_num_workers(kw.get('workers', None))
304            dt_col = p.columns.get('datetime', None)
305            pool = get_pool(workers=kw.get('workers', 1))
306            if debug:
307                dprint(f"Received {type(df)}. Attempting to sync first chunk...")
308
309            try:
310                chunk = next(df)
311            except StopIteration:
312                return True, "Received an empty generator; nothing to do."
313
314            chunk_success, chunk_msg = _sync(p, chunk)
315            chunk_msg = '\n' + self._get_chunk_label(chunk, dt_col) + '\n' + chunk_msg
316            if not chunk_success:
317                return chunk_success, f"Unable to sync initial chunk for {p}:\n{chunk_msg}"
318            if debug:
319                dprint("Successfully synced the first chunk, attempting the rest...")
320
321            failed_chunks = []
322            def _process_chunk(_chunk):
323                try:
324                    _chunk_success, _chunk_msg = _sync(p, _chunk)
325                except Exception as e:
326                    _chunk_success, _chunk_msg = False, str(e)
327                if not _chunk_success:
328                    failed_chunks.append(_chunk)
329                return (
330                    _chunk_success,
331                    (
332                        '\n'
333                        + self._get_chunk_label(_chunk, dt_col)
334                        + '\n'
335                        + _chunk_msg
336                    )
337                )
338
339            results = sorted(
340                [(chunk_success, chunk_msg)] + (
341                    list(pool.imap(_process_chunk, df))
342                    if not df_is_chunk_generator(chunk)
343                    else [
344                        _process_chunk(_child_chunks)
345                        for _child_chunks in df
346                    ]
347                )
348            )
349            chunk_messages = [chunk_msg for _, chunk_msg in results]
350            success_bools = [chunk_success for chunk_success, _ in results]
351            success = all(success_bools)
352            msg = '\n'.join(chunk_messages)
353
354            ### If some chunks succeeded, retry the failures.
355            retry_success = True
356            if not success and any(success_bools):
357                if debug:
358                    dprint("Retrying failed chunks...")
359                chunks_to_retry = [c for c in failed_chunks]
360                failed_chunks = []
361                for chunk in chunks_to_retry:
362                    chunk_success, chunk_msg = _process_chunk(chunk)
363                    msg += f"\n\nRetried chunk:\n{chunk_msg}\n"
364                    retry_success = retry_success and chunk_success
365
366            success = success and retry_success
367            return success, msg
368
369        ### Cast to a dataframe and ensure datatypes are what we expect.
370        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)
371
372        ### Capture `numeric`, `uuid`, and `json` columns.
373        self._persist_new_json_columns(df, debug=debug)
374        self._persist_new_numeric_columns(df, debug=debug)
375        self._persist_new_uuid_columns(df, debug=debug)
376
377        if debug:
378            dprint(
379                "DataFrame to sync:\n"
380                + (
381                    str(df)[:255]
382                    + '...'
383                    if len(str(df)) >= 256
384                    else str(df)
385                ),
386                **kw
387            )
388
389        ### if force, continue to sync until success
390        return_tuple = False, f"Did not sync {p}."
391        run = True
392        _retries = 1
393        while run:
394            with Venv(get_connector_plugin(self.instance_connector)):
395                return_tuple = p.instance_connector.sync_pipe(
396                    pipe=p,
397                    df=df,
398                    debug=debug,
399                    **kw
400                )
401            _retries += 1
402            run = (not return_tuple[0]) and force and _retries <= retries
403            if run and debug:
404                dprint(f"Syncing failed for {p}. Attempt ( {_retries} / {retries} )", **kw)
405                dprint(f"Sleeping for {min_seconds} seconds...", **kw)
406                time.sleep(min_seconds)
407            if _retries > retries:
408                warn(
409                    f"Unable to sync {p} within {retries} attempt" +
410                        ("s" if retries != 1 else "") + "!"
411                )
412
413        ### CHECKPOINT: Finished syncing. Handle caching.
414        _checkpoint(**kw)
415        if self.cache_pipe is not None:
416            if debug:
417                dprint("Caching retrieved dataframe.", **kw)
418            _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw)
419            if not _sync_cache_tuple[0]:
420                warn(f"Failed to sync local cache for {self}.")
421
422        self._exists = None
423        return return_tuple
424
425    if blocking:
426        self._exists = None
427        return _sync(self, df = df)
428
429    from meerschaum.utils.threading import Thread
430    def default_callback(result_tuple: SuccessTuple):
431        dprint(f"Asynchronous result from {self}: {result_tuple}", **kw)
432
433    def default_error_callback(x: Exception):
434        dprint(f"Error received for {self}: {x}", **kw)
435
436    if callback is None and debug:
437        callback = default_callback
438    if error_callback is None and debug:
439        error_callback = default_error_callback
440    try:
441        thread = Thread(
442            target=_sync,
443            args=(self,),
444            kwargs={'df': df},
445            daemon=False,
446            callback=callback,
447            error_callback=error_callback,
448        )
449        thread.start()
450    except Exception as e:
451        self._exists = None
452        return False, str(e)
453
454    self._exists = None
455    return True, f"Spawned asyncronous sync for {self}."

Fetch new data from the source and update the pipe's table with new data.

Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.

Parameters
  • df (Union[None, pd.DataFrame, Dict[str, List[Any]]], default None): An optional DataFrame to sync into the pipe. Defaults to None.
  • begin (Union[datetime, int, str, None], default ''): Optionally specify the earliest datetime to search for data.
  • end (Union[datetime, int, str, None], default None): Optionally specify the latest datetime to search for data.
  • force (bool, default False): If True, keep trying to sync up to retries attempts.
  • retries (int, default 10): If force, how many attempts to try syncing before declaring failure.
  • min_seconds (Union[int, float], default 1): If force, how many seconds to sleep between retries. Defaults to 1.
  • check_existing (bool, default True): If True, pull and diff with existing data from the pipe.
  • blocking (bool, default True): If True, wait for sync to finish and return its result, otherwise asynchronously sync (oxymoron?) and return success. Defaults to True. Only intended for specific scenarios.
  • workers (Optional[int], default None): If provided and the instance connector is thread-safe (pipe.instance_connector.IS_THREAD_SAFE is True), limit concurrent sync to this many threads.
  • callback (Optional[Callable[[Tuple[bool, str]], Any]], default None): Callback function which expects a SuccessTuple as input. Only applies when blocking=False.
  • error_callback (Optional[Callable[[Exception], Any]], default None): Callback function which expects an Exception as input. Only applies when blocking=False.
  • chunksize (int, default -1): Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction.
  • sync_chunks (bool, default True): If possible, sync chunks while fetching them into memory.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
Returns
  • A SuccessTuple of success (bool) and message (str).
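
Beyond fetching from a connector, `sync()` also accepts documents, DataFrames, or chunk generators directly. A hedged sketch syncing a list of documents into an in-memory pipe (the success message depends on the instance connector):

from datetime import datetime, timezone
import meerschaum as mrsm

pipe = mrsm.Pipe(
    'test', 'sync_demo',
    temporary=True,
    columns={'datetime': 'ts', 'id': 'id'},
)
docs = [
    {'ts': datetime(2024, 1, 1, tzinfo=timezone.utc), 'id': 1, 'vl': 10.0},
    {'ts': datetime(2024, 1, 1, tzinfo=timezone.utc), 'id': 2, 'vl': 20.0},
]
success, msg = pipe.sync(docs)
print(success)
# True
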
def get_sync_time( self, params: Optional[Dict[str, Any]] = None, newest: bool = True, apply_backtrack_interval: bool = False, round_down: bool = False, debug: bool = False) -> Union[datetime.datetime, int, NoneType]:
458def get_sync_time(
459    self,
460    params: Optional[Dict[str, Any]] = None,
461    newest: bool = True,
462    apply_backtrack_interval: bool = False,
463    round_down: bool = False,
464    debug: bool = False
465) -> Union['datetime', int, None]:
466    """
467    Get the most recent datetime value for a Pipe.
468
469    Parameters
470    ----------
471    params: Optional[Dict[str, Any]], default None
472        Dictionary to build a WHERE clause for a specific column.
473        See `meerschaum.utils.sql.build_where`.
474
475    newest: bool, default True
476        If `True`, get the most recent datetime (honoring `params`).
477        If `False`, get the oldest datetime (`ASC` instead of `DESC`).
478
479    apply_backtrack_interval: bool, default False
480        If `True`, subtract the backtrack interval from the sync time.
481
482    round_down: bool, default False
483        If `True`, round down the datetime value to the nearest minute.
484
485    debug: bool, default False
486        Verbosity toggle.
487
488    Returns
489    -------
490    A `datetime` or int, if the pipe exists, otherwise `None`.
491
492    """
493    from meerschaum.utils.venv import Venv
494    from meerschaum.connectors import get_connector_plugin
495    from meerschaum.utils.misc import round_time
496
497    if not self.columns.get('datetime', None):
498        return None
499
500    with Venv(get_connector_plugin(self.instance_connector)):
501        sync_time = self.instance_connector.get_sync_time(
502            self,
503            params=params,
504            newest=newest,
505            debug=debug,
506        )
507
508    if round_down and isinstance(sync_time, datetime):
509        sync_time = round_time(sync_time, timedelta(minutes=1))
510
511    if apply_backtrack_interval and sync_time is not None:
512        backtrack_interval = self.get_backtrack_interval(debug=debug)
513        try:
514            sync_time -= backtrack_interval
515        except Exception as e:
516            warn(f"Failed to apply backtrack interval:\n{e}")
517
518    return self.parse_date_bounds(sync_time)

Get the most recent datetime value for a Pipe.

Parameters
  • params (Optional[Dict[str, Any]], default None): Dictionary to build a WHERE clause for a specific column. See meerschaum.utils.sql.build_where.
  • newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
  • apply_backtrack_interval (bool, default False): If True, subtract the backtrack interval from the sync time.
  • round_down (bool, default False): If True, round down the datetime value to the nearest minute.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A datetime or int, if the pipe exists, otherwise None.
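
A typical incremental-sync pattern uses the last sync time as the next `begin` (a hedged sketch; the pipe keys are placeholders for an existing pipe with a datetime index and some data):

import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:noaa', 'weather')  # assumed existing pipe with data
begin = pipe.get_sync_time(apply_backtrack_interval=True)
print(begin)
# e.g. 2024-01-01 00:00:00+00:00

pipe.sync(begin=begin)
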
def exists(self, debug: bool = False) -> bool:
521def exists(
522    self,
523    debug: bool = False
524) -> bool:
525    """
526    See if a Pipe's table exists.
527
528    Parameters
529    ----------
530    debug: bool, default False
531        Verbosity toggle.
532
533    Returns
534    -------
535    A `bool` corresponding to whether a pipe's underlying table exists.
536
537    """
538    import time
539    from meerschaum.utils.venv import Venv
540    from meerschaum.connectors import get_connector_plugin
541    from meerschaum.config import STATIC_CONFIG
542    from meerschaum.utils.debug import dprint
543    now = time.perf_counter()
544    exists_timeout_seconds = STATIC_CONFIG['pipes']['exists_timeout_seconds']
545
546    _exists = self.__dict__.get('_exists', None)
547    if _exists:
548        exists_timestamp = self.__dict__.get('_exists_timestamp', None)
549        if exists_timestamp is not None:
550            delta = now - exists_timestamp
551            if delta < exists_timeout_seconds:
552                if debug:
553                    dprint(f"Returning cached `exists` for {self} ({round(delta, 2)} seconds old).")
554                return _exists
555
556    with Venv(get_connector_plugin(self.instance_connector)):
557        _exists = (
558            self.instance_connector.pipe_exists(pipe=self, debug=debug)
559            if hasattr(self.instance_connector, 'pipe_exists')
560            else False
561        )
562
563    self.__dict__['_exists'] = _exists
564    self.__dict__['_exists_timestamp'] = now
565    return _exists

See if a Pipe's table exists.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool corresponding to whether a pipe's underlying table exists.
def filter_existing( self, df: pandas.core.frame.DataFrame, safe_copy: bool = True, date_bound_only: bool = False, include_unchanged_columns: bool = False, enforce_dtypes: bool = False, chunksize: Optional[int] = -1, debug: bool = False, **kw) -> Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]:
568def filter_existing(
569    self,
570    df: 'pd.DataFrame',
571    safe_copy: bool = True,
572    date_bound_only: bool = False,
573    include_unchanged_columns: bool = False,
574    enforce_dtypes: bool = False,
575    chunksize: Optional[int] = -1,
576    debug: bool = False,
577    **kw
578) -> Tuple['pd.DataFrame', 'pd.DataFrame', 'pd.DataFrame']:
579    """
580    Inspect a dataframe and filter out rows which already exist in the pipe.
581
582    Parameters
583    ----------
584    df: 'pd.DataFrame'
585        The dataframe to inspect and filter.
586
587    safe_copy: bool, default True
588        If `True`, create a copy before comparing and modifying the dataframes.
589        Setting to `False` may mutate the DataFrames.
590        See `meerschaum.utils.dataframe.filter_unseen_df`.
591
592    date_bound_only: bool, default False
593        If `True`, only use the datetime index to fetch the sample dataframe.
594
595    include_unchanged_columns: bool, default False
596        If `True`, include the backtrack columns which haven't changed in the update dataframe.
597        This is useful if you can't update individual keys.
598
599    enforce_dtypes: bool, default False
600        If `True`, ensure the given and intermediate dataframes are enforced to the correct dtypes.
601        Setting `enforce_dtypes=True` may impact performance.
602
603    chunksize: Optional[int], default -1
604        The `chunksize` used when fetching existing data.
605
606    debug: bool, default False
607        Verbosity toggle.
608
609    Returns
610    -------
611    A tuple of three pandas DataFrames: unseen, update, and delta.
612    """
613    from meerschaum.utils.warnings import warn
614    from meerschaum.utils.debug import dprint
615    from meerschaum.utils.packages import attempt_import, import_pandas
616    from meerschaum.utils.misc import round_time
617    from meerschaum.utils.dataframe import (
618        filter_unseen_df,
619        add_missing_cols_to_df,
620        get_unhashable_cols,
621        get_numeric_cols,
622    )
623    from meerschaum.utils.dtypes import (
624        to_pandas_dtype,
625        none_if_null,
626    )
627    from meerschaum.config import get_config
628    pd = import_pandas()
629    pandas = attempt_import('pandas')
630    if enforce_dtypes or 'dataframe' not in str(type(df)).lower():
631        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)
632    is_dask = hasattr(df, '__module__') and 'dask' in df.__module__
633    if is_dask:
634        dd = attempt_import('dask.dataframe')
635        merge = dd.merge
636        NA = pandas.NA