meerschaum

Meerschaum Python API

Welcome to the Meerschaum Python API technical documentation! Here you can find information about the classes and functions provided by the meerschaum package. Visit meerschaum.io for general usage documentation.

Root Module

For your convenience, the following classes and functions may be imported from the root meerschaum namespace:
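
import meerschaum as mrsm
from meerschaum import (
    Pipe, Plugin, Venv, Job, Connector, SuccessTuple,
    get_pipes, get_connector, get_config,
    make_connector, attempt_import, pprint, entry,
)

All of the names above appear in the module's __all__, reproduced in the source listing near the bottom of this page.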

Examples

Build a Connector

Get existing connectors or build a new one in-memory with the meerschaum.get_connector() factory function:

import meerschaum as mrsm

sql_conn = mrsm.get_connector(
    'sql:temp',
    flavor='sqlite',
    database='/tmp/tmp.db',
)
df = sql_conn.read("SELECT 1 AS foo")
print(df)
#    foo
# 0    1

sql_conn.to_sql(df, 'foo')
print(sql_conn.read('foo'))
#    foo
# 0    1

Create a Custom Connector Class

Decorate your connector class with meerschaum.make_connector() to designate it as a custom connector:

from datetime import datetime, timezone
from random import randint
import meerschaum as mrsm
from meerschaum.utils.misc import round_time

@mrsm.make_connector
class FooConnector(mrsm.Connector):
    REQUIRED_ATTRIBUTES = ['username', 'password']

    def fetch(
        self,
        begin: datetime | None = None,
        end: datetime | None = None,
    ):
        now = begin or round_time(datetime.now(timezone.utc))
        return [
            {'ts': now, 'id': 1, 'vl': randint(1, 100)},
            {'ts': now, 'id': 2, 'vl': randint(1, 100)},
            {'ts': now, 'id': 3, 'vl': randint(1, 100)},
        ]

foo_conn = mrsm.get_connector(
    'foo:bar',
    username='foo',
    password='bar',
)
docs = foo_conn.fetch()

Build a Pipe

Build a meerschaum.Pipe in-memory:

from datetime import datetime
import meerschaum as mrsm

pipe = mrsm.Pipe(
    foo_conn, 'demo',
    instance=sql_conn,
    columns={'datetime': 'ts', 'id': 'id'},
    tags=['production'],
)
pipe.sync(begin=datetime(2024, 1, 1))
df = pipe.get_data()
print(df)
#           ts  id  vl
# 0 2024-01-01   1  97
# 1 2024-01-01   2  18
# 2 2024-01-01   3  96

Add temporary=True to skip registering the pipe in the pipes table.
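
For example, a throwaway pipe reusing the connectors defined above (a minimal sketch; the keys are arbitrary):

temp_pipe = mrsm.Pipe(
    foo_conn, 'demo', 'temp',
    instance=sql_conn,
    columns={'datetime': 'ts', 'id': 'id'},
    temporary=True,
)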

Get Registered Pipes

The meerschaum.get_pipes() function returns a dictionary hierarchy of pipes by connector, metric, and location:

import meerschaum as mrsm

pipes = mrsm.get_pipes(instance='sql:temp')
pipe = pipes['foo:bar']['demo'][None]

Add as_list=True to flatten the hierarchy:

import meerschaum as mrsm

pipes = mrsm.get_pipes(
    tags=['production'],
    instance=sql_conn,
    as_list=True,
)
print(pipes)
# [Pipe('foo:bar', 'demo', instance='sql:temp')]

Import Plugins

You can import a plugin's module through meerschaum.Plugin.module:

import meerschaum as mrsm

plugin = mrsm.Plugin('noaa')
with mrsm.Venv(plugin):
    noaa = plugin.module

If your plugin has submodules, use meerschaum.plugins.from_plugin_import:

from meerschaum.plugins import from_plugin_import
get_defined_pipes = from_plugin_import('compose.utils.pipes', 'get_defined_pipes')

Import multiple plugins with meerschaum.plugins.import_plugins:

from meerschaum.plugins import import_plugins
noaa, compose = import_plugins('noaa', 'compose')

Create a Job

Create a meerschaum.Job with a name and sysargs:

import meerschaum as mrsm

job = mrsm.Job('syncing-engine', 'sync pipes --loop')
success, msg = job.start()

Pass executor_keys as the connector keys of an API instance to create a remote job:

import meerschaum as mrsm

job = mrsm.Job(
    'foo',
    'sync pipes -s daily',
    executor_keys='api:main',
)
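
With executor_keys set, starting the job should submit it to the API instance rather than run it locally (a sketch following the local example above):

success, msg = job.start()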

Import from a Virtual Environment

Use the meerschaum.Venv context manager to activate a virtual environment:

import meerschaum as mrsm

with mrsm.Venv('noaa'):
    import requests

print(requests.__file__)
# /home/bmeares/.config/meerschaum/venvs/noaa/lib/python3.12/site-packages/requests/__init__.py

To import packages which may not be installed, use meerschaum.attempt_import():

import meerschaum as mrsm

requests = mrsm.attempt_import('requests', venv='noaa')
print(requests.__file__)
# /home/bmeares/.config/meerschaum/venvs/noaa/lib/python3.12/site-packages/requests/__init__.py

Run Actions

Run sysargs with meerschaum.entry():

import meerschaum as mrsm

success, msg = mrsm.entry('show pipes + show version : x2')

Use meerschaum.actions.get_action() to access an action function directly:

from meerschaum.actions import get_action

show_pipes = get_action(['show', 'pipes'])
success, msg = show_pipes(connector_keys=['plugin:noaa'])

Get a dictionary of available subactions with meerschaum.actions.get_subactions():

from meerschaum.actions import get_subactions

subactions = get_subactions('show')
success, msg = subactions['pipes']()

Create a Plugin

Run bootstrap plugin to create a new plugin:

mrsm bootstrap plugin example

This will create example.py in your plugins directory (default ~/.config/meerschaum/plugins/, Windows: %APPDATA%\Meerschaum\plugins). You may paste the example code from the "Create a Custom Action" example below.

Open your plugin with edit plugin:

mrsm edit plugin example

With the plugin open, paste the example code below to try out the features.

See the writing plugins guide for more in-depth documentation.

Create a Custom Action

Decorate a function with meerschaum.actions.make_action to designate it as an action. Subactions will be automatically detected if not decorated:

from meerschaum.actions import make_action

@make_action
def sing():
    print('What would you like me to sing?')
    return True, "Success"

def sing_tune():
    return False, "I don't know that song!"

def sing_song():
    print('Hello, World!')
    return True, "Success"
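
Because actions and subactions are resolved from the function names, these should then be runnable from the shell once the plugin is installed:

mrsm sing

mrsm sing song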

Use meerschaum.plugins.add_plugin_argument() to create new parameters for your action:

from meerschaum.plugins import make_action, add_plugin_argument

add_plugin_argument(
    '--song', type=str, help='What song to sing.',
)

@make_action
def sing_melody(action=None, song=None):
    to_sing = action[0] if action else song
    if not to_sing:
        return False, "Please tell me what to sing!"

    return True, f'~I am singing {to_sing}~'

Run your new action from the shell, passing the song either positionally or with --song:

mrsm sing melody lalala

mrsm sing melody --song do-re-mi

Add a Page to the Web Dashboard

Use the decorators meerschaum.plugins.dash_plugin() and meerschaum.plugins.web_page() to add new pages to the web dashboard:

from meerschaum.plugins import dash_plugin, web_page

@dash_plugin
def init_dash(dash_app):

    import dash.html as html
    import dash_bootstrap_components as dbc
    from dash import Input, Output, no_update

    ### Routes to '/dash/my-page'
    @web_page('/my-page', login_required=False)
    def my_page():
        return dbc.Container([
            html.H1("Hello, World!"),
            dbc.Button("Click me", id='my-button'),
            html.Div(id="my-output-div"),
        ])

    @dash_app.callback(
        Output('my-output-div', 'children'),
        Input('my-button', 'n_clicks'),
    )
    def my_button_click(n_clicks):
        if not n_clicks:
            return no_update
        return html.P(f'You clicked {n_clicks} times!')

Submodules

meerschaum.actions
Access functions for actions and subactions.

meerschaum.config
Read and write the Meerschaum configuration registry.

meerschaum.connectors
Build connectors to interact with databases and fetch data.

meerschaum.jobs
Start background jobs.

meerschaum.plugins
Access plugin modules and other API utilities.

meerschaum.utils
Utility functions are available in several submodules:
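
from meerschaum.utils.misc import round_time
from meerschaum.utils.packages import attempt_import
from meerschaum.utils.formatting import pprint
from meerschaum.utils.warnings import warn

Each of these submodules (meerschaum.utils.misc, meerschaum.utils.packages, meerschaum.utils.formatting, meerschaum.utils.warnings) appears in the examples and source throughout this page.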

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Copyright 2023 Bennett Meares

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import atexit
from meerschaum.utils.typing import SuccessTuple
from meerschaum.utils.packages import attempt_import
from meerschaum.core.Pipe import Pipe
from meerschaum.plugins import Plugin
from meerschaum.utils.venv import Venv
from meerschaum.jobs import Job, make_executor
from meerschaum.connectors import get_connector, Connector, make_connector
from meerschaum.utils import get_pipes
from meerschaum.utils.formatting import pprint
from meerschaum._internal.docs import index as __doc__
from meerschaum.config import __version__, get_config
from meerschaum._internal.entry import entry
from meerschaum.__main__ import _close_pools

atexit.register(_close_pools)

__pdoc__ = {'gui': False, 'api': False, 'core': False, '_internal': False}
__all__ = (
    "get_pipes",
    "get_connector",
    "get_config",
    "Pipe",
    "Plugin",
    "Venv",
    "Job",
    "pprint",
    "attempt_import",
    "actions",
    "config",
    "connectors",
    "jobs",
    "plugins",
    "utils",
    "SuccessTuple",
    "Connector",
    "make_connector",
    "entry",
)
def get_pipes(
    connector_keys: Union[str, List[str], NoneType] = None,
    metric_keys: Union[str, List[str], NoneType] = None,
    location_keys: Union[str, List[str], NoneType] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    mrsm_instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None,
    instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None,
    as_list: bool = False,
    method: str = 'registered',
    debug: bool = False,
    **kw: Any
) -> Union[Dict[str, Dict[str, Dict[str, Pipe]]], List[Pipe]]:
 19def get_pipes(
 20    connector_keys: Union[str, List[str], None] = None,
 21    metric_keys: Union[str, List[str], None] = None,
 22    location_keys: Union[str, List[str], None] = None,
 23    tags: Optional[List[str]] = None,
 24    params: Optional[Dict[str, Any]] = None,
 25    mrsm_instance: Union[str, InstanceConnector, None] = None,
 26    instance: Union[str, InstanceConnector, None] = None,
 27    as_list: bool = False,
 28    method: str = 'registered',
 29    debug: bool = False,
 30    **kw: Any
 31) -> Union[PipesDict, List[mrsm.Pipe]]:
 32    """
 33    Return a dictionary or list of `meerschaum.Pipe` objects.
 34
 35    Parameters
 36    ----------
 37    connector_keys: Union[str, List[str], None], default None
 38        String or list of connector keys.
 39        If omitted or is `'*'`, fetch all possible keys.
 40        If a string begins with `'_'`, select keys that do NOT match the string.
 41
 42    metric_keys: Union[str, List[str], None], default None
 43        String or list of metric keys. See `connector_keys` for formatting.
 44
 45    location_keys: Union[str, List[str], None], default None
 46        String or list of location keys. See `connector_keys` for formatting.
 47
 48    tags: Optional[List[str]], default None
 49         If provided, only include pipes with these tags.
 50
 51    params: Optional[Dict[str, Any]], default None
 52        Dictionary of additional parameters to search by.
 53        Params are parsed into a SQL WHERE clause.
 54        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`
 55
 56    mrsm_instance: Union[str, InstanceConnector, None], default None
 57        Connector keys for the Meerschaum instance of the pipes.
 58        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
 59        `meerschaum.connectors.api.APIConnector.APIConnector`.
 60        
 61    as_list: bool, default False
 62        If `True`, return pipes in a list instead of a hierarchical dictionary.
 63        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
 64        `True`  : `[Pipe]`
 65
 66    method: str, default 'registered'
 67        Available options: `['registered', 'explicit', 'all']`
 68        If `'registered'` (default), create pipes based on registered keys in the connector's pipes table
 69        (API or SQL connector, depends on mrsm_instance).
 70        If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys
 71        instead of consulting the pipes table. Useful for creating non-existent pipes.
 72    If `'all'`, create pipes from predefined metrics and locations. Requires `connector_keys`.
 73        **NOTE:** Method `'all'` is not implemented!
 74
 75    **kw: Any:
 76        Keyword arguments to pass to the `meerschaum.Pipe` constructor.
 77        
 78
 79    Returns
 80    -------
 81    A dictionary of dictionaries and `meerschaum.Pipe` objects
 82    in the connector, metric, location hierarchy.
 83    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.
 84
 85    Examples
 86    --------
 87    ```
 88    >>> ### Manual definition:
 89    >>> pipes = {
 90    ...     <connector_keys>: {
 91    ...         <metric_key>: {
 92    ...             <location_key>: Pipe(
 93    ...                 <connector_keys>,
 94    ...                 <metric_key>,
 95    ...                 <location_key>,
 96    ...             ),
 97    ...         },
 98    ...     },
 99    ... },
100    >>> ### Accessing a single pipe:
101    >>> pipes['sql:main']['weather'][None]
102    >>> ### Return a list instead:
103    >>> get_pipes(as_list=True)
104    [sql_main_weather]
105    >>> 
106    ```
107    """
108
109    from meerschaum.config import get_config
110    from meerschaum.utils.warnings import error
111    from meerschaum.utils.misc import filter_keywords
112
113    if connector_keys is None:
114        connector_keys = []
115    if metric_keys is None:
116        metric_keys = []
117    if location_keys is None:
118        location_keys = []
119    if params is None:
120        params = {}
121    if tags is None:
122        tags = []
123
124    if isinstance(connector_keys, str):
125        connector_keys = [connector_keys]
126    if isinstance(metric_keys, str):
127        metric_keys = [metric_keys]
128    if isinstance(location_keys, str):
129        location_keys = [location_keys]
130
131    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
132    if mrsm_instance is None:
133        mrsm_instance = instance
134    if mrsm_instance is None:
135        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
136    if isinstance(mrsm_instance, str):
137        from meerschaum.connectors.parse import parse_instance_keys
138        connector = parse_instance_keys(keys=mrsm_instance, debug=debug)
139    else:
140        from meerschaum.connectors import instance_types
141        valid_connector = False
142        if hasattr(mrsm_instance, 'type'):
143            if mrsm_instance.type in instance_types:
144                valid_connector = True
145        if not valid_connector:
146            error(f"Invalid instance connector: {mrsm_instance}")
147        connector = mrsm_instance
148    if debug:
149        from meerschaum.utils.debug import dprint
150        dprint(f"Using instance connector: {connector}")
151    if not connector:
152        error(f"Could not create connector from keys: '{mrsm_instance}'")
153
154    ### Get a list of tuples for the keys needed to build pipes.
155    result = fetch_pipes_keys(
156        method,
157        connector,
158        connector_keys = connector_keys,
159        metric_keys = metric_keys,
160        location_keys = location_keys,
161        tags = tags,
162        params = params,
163        debug = debug
164    )
165    if result is None:
166        error(f"Unable to build pipes!")
167
168    ### Populate the `pipes` dictionary with Pipes based on the keys
169    ### obtained from the chosen `method`.
170    from meerschaum import Pipe
171    pipes = {}
172    for ck, mk, lk in result:
173        if ck not in pipes:
174            pipes[ck] = {}
175
176        if mk not in pipes[ck]:
177            pipes[ck][mk] = {}
178
179        pipes[ck][mk][lk] = Pipe(
180            ck, mk, lk,
181            mrsm_instance = connector,
182            debug = debug,
183            **filter_keywords(Pipe, **kw)
184        )
185
186    if not as_list:
187        return pipes
188    from meerschaum.utils.misc import flatten_pipes_dict
189    return flatten_pipes_dict(pipes)

Return a dictionary or list of meerschaum.Pipe objects.

Parameters
  • connector_keys (Union[str, List[str], None], default None): String or list of connector keys. If omitted or is '*', fetch all possible keys. If a string begins with '_', select keys that do NOT match the string.
  • metric_keys (Union[str, List[str], None], default None): String or list of metric keys. See connector_keys for formatting.
  • location_keys (Union[str, List[str], None], default None): String or list of location keys. See connector_keys for formatting.
  • tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. Params are parsed into a SQL WHERE clause. E.g. {'a': 1, 'b': 2} equates to 'WHERE a = 1 AND b = 2'
  • mrsm_instance (Union[str, InstanceConnector, None], default None): Connector keys for the Meerschaum instance of the pipes. Must be a meerschaum.connectors.sql.SQLConnector.SQLConnector or meerschaum.connectors.api.APIConnector.APIConnector.
  • as_list (bool, default False): If True, return pipes in a list instead of a hierarchical dictionary. False : {connector_keys: {metric_key: {location_key: Pipe}}} True : [Pipe]
  • method (str, default 'registered'): Available options: ['registered', 'explicit', 'all']. If 'registered' (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depending on mrsm_instance). If 'explicit', create pipes from the provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If 'all', create pipes from predefined metrics and locations; requires connector_keys. NOTE: Method 'all' is not implemented!
  • **kw (Any): Keyword arguments to pass to the meerschaum.Pipe constructor.
Returns
  • A dictionary of dictionaries and meerschaum.Pipe objects in the connector, metric, location hierarchy. If as_list is True, return a list of meerschaum.Pipe objects instead.
Examples
>>> ### Manual definition:
>>> pipes = {
...     <connector_keys>: {
...         <metric_key>: {
...             <location_key>: Pipe(
...                 <connector_keys>,
...                 <metric_key>,
...                 <location_key>,
...             ),
...         },
...     },
... },
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[sql_main_weather]
>>> 
def get_connector(
    type: str = None,
    label: str = None,
    refresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Connector:
 80def get_connector(
 81    type: str = None,
 82    label: str = None,
 83    refresh: bool = False,
 84    debug: bool = False,
 85    **kw: Any
 86) -> Connector:
 87    """
 88    Return existing connector or create new connection and store for reuse.
 89    
 90    You can create new connectors if enough parameters are provided for the given type and flavor.
 91    
 92
 93    Parameters
 94    ----------
 95    type: Optional[str], default None
 96        Connector type (sql, api, etc.).
 97        Defaults to the type of the configured `instance_connector`.
 98
 99    label: Optional[str], default None
100        Connector label (e.g. main). Defaults to `'main'`.
101
102    refresh: bool, default False
103        Refresh the Connector instance / construct new object. Defaults to `False`.
104
105    kw: Any
106        Other arguments to pass to the Connector constructor.
107        If the Connector has already been constructed and new arguments are provided,
108        `refresh` is set to `True` and the old Connector is replaced.
109
110    Returns
111    -------
112    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
113    `meerschaum.connectors.sql.SQLConnector`).
114    
115    Examples
116    --------
117    The following parameters would create a new
118    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.
119
120    ```
121    >>> conn = get_connector(
122    ...     type = 'sql',
123    ...     label = 'newlabel',
124    ...     flavor = 'sqlite',
125    ...     database = '/file/path/to/database.db'
126    ... )
127    >>>
128    ```
129
130    """
131    from meerschaum.connectors.parse import parse_instance_keys
132    from meerschaum.config import get_config
133    from meerschaum.config.static import STATIC_CONFIG
134    from meerschaum.utils.warnings import warn
135    global _loaded_plugin_connectors
136    if isinstance(type, str) and not label and ':' in type:
137        type, label = type.split(':', maxsplit=1)
138
139    with _locks['_loaded_plugin_connectors']:
140        if not _loaded_plugin_connectors:
141            load_plugin_connectors()
142            _load_builtin_custom_connectors()
143            _loaded_plugin_connectors = True
144
145    if type is None and label is None:
146        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
147        ### recursive call to get_connector
148        return parse_instance_keys(default_instance_keys)
149
150    ### NOTE: the default instance connector may not be main.
151    ### Only fall back to 'main' if the type is provided but the label is omitted.
152    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']
153
154    ### type might actually be a label. Check if so and raise a warning.
155    if type not in connectors:
156        possibilities, poss_msg = [], ""
157        for _type in get_config('meerschaum', 'connectors'):
158            if type in get_config('meerschaum', 'connectors', _type):
159                possibilities.append(f"{_type}:{type}")
160        if len(possibilities) > 0:
161            poss_msg = " Did you mean"
162            for poss in possibilities[:-1]:
163                poss_msg += f" '{poss}',"
164            if poss_msg.endswith(','):
165                poss_msg = poss_msg[:-1]
166            if len(possibilities) > 1:
167                poss_msg += " or"
168            poss_msg += f" '{possibilities[-1]}'?"
169
170        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
171        return None
172
173    if 'sql' not in types:
174        from meerschaum.connectors.plugin import PluginConnector
175        from meerschaum.connectors.valkey import ValkeyConnector
176        with _locks['types']:
177            types.update({
178                'api': APIConnector,
179                'sql': SQLConnector,
180                'plugin': PluginConnector,
181                'valkey': ValkeyConnector,
182            })
183
184    ### determine if we need to call the constructor
185    if not refresh:
186        ### see if any user-supplied arguments differ from the existing instance
187        if label in connectors[type]:
188            warning_message = None
189            for attribute, value in kw.items():
190                if attribute not in connectors[type][label].meta:
191                    import inspect
192                    cls = connectors[type][label].__class__
193                    cls_init_signature = inspect.signature(cls)
194                    cls_init_params = cls_init_signature.parameters
195                    if attribute not in cls_init_params:
196                        warning_message = (
197                            f"Received new attribute '{attribute}' not present in connector " +
198                            f"{connectors[type][label]}.\n"
199                        )
200                elif connectors[type][label].__dict__[attribute] != value:
201                    warning_message = (
202                        f"Mismatched values for attribute '{attribute}' in connector "
203                        + f"'{connectors[type][label]}'.\n" +
204                        f"  - Keyword value: '{value}'\n" +
205                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
206                    )
207            if warning_message is not None:
208                warning_message += (
209                    "\nSetting `refresh` to True and recreating connector with type:"
210                    + f" '{type}' and label '{label}'."
211                )
212                refresh = True
213                warn(warning_message)
214        else: ### connector doesn't yet exist
215            refresh = True
216
217    ### only create an object if refresh is True
218    ### (can be manually specified, otherwise determined above)
219    if refresh:
220        with _locks['connectors']:
221            try:
222                ### will raise an error if configuration is incorrect / missing
223                conn = types[type](label=label, **kw)
224                connectors[type][label] = conn
225            except InvalidAttributesError as ie:
226                warn(
227                    f"Incorrect attributes for connector '{type}:{label}'.\n"
228                    + str(ie),
229                    stack = False,
230                )
231                conn = None
232            except Exception as e:
233                from meerschaum.utils.formatting import get_console
234                console = get_console()
235                if console:
236                    console.print_exception()
237                warn(
238                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
239                    stack = False,
240                )
241                conn = None
242        if conn is None:
243            return None
244
245    return connectors[type][label]

Return existing connector or create new connection and store for reuse.

You can create new connectors if enough parameters are provided for the given type and flavor.

Parameters
  • type (Optional[str], default None): Connector type (sql, api, etc.). Defaults to the type of the configured instance_connector.
  • label (Optional[str], default None): Connector label (e.g. main). Defaults to 'main'.
  • refresh (bool, default False): Refresh the Connector instance / construct new object. Defaults to False.
  • kw (Any): Other arguments to pass to the Connector constructor. If the Connector has already been constructed and new arguments are provided, refresh is set to True and the old Connector is replaced.
Returns
  • A new Meerschaum connector (e.g. meerschaum.connectors.api.APIConnector, meerschaum.connectors.sql.SQLConnector).
Examples

The following parameters would create a new meerschaum.connectors.sql.SQLConnector that isn't in the configuration file.

>>> conn = get_connector(
...     type = 'sql',
...     label = 'newlabel',
...     flavor = 'sqlite',
...     database = '/file/path/to/database.db'
... )
>>>
def get_config(
    *keys: str,
    patch: bool = True,
    substitute: bool = True,
    sync_files: bool = True,
    write_missing: bool = True,
    as_tuple: bool = False,
    warn: bool = True,
    debug: bool = False
) -> Any:
 82def get_config(
 83    *keys: str,
 84    patch: bool = True,
 85    substitute: bool = True,
 86    sync_files: bool = True,
 87    write_missing: bool = True,
 88    as_tuple: bool = False,
 89    warn: bool = True,
 90    debug: bool = False
 91) -> Any:
 92    """
 93    Return the Meerschaum configuration dictionary.
 94    If positional arguments are provided, index by the keys.
 95    Raises a warning if invalid keys are provided.
 96
 97    Parameters
 98    ----------
 99    keys: str:
100        List of strings to index.
101
102    patch: bool, default True
103        If `True`, patch missing default keys into the config directory.
104        Defaults to `True`.
105
106    sync_files: bool, default True
107        If `True`, sync files if needed.
108        Defaults to `True`.
109
110    write_missing: bool, default True
111        If `True`, write default values when the main config files are missing.
112        Defaults to `True`.
113
114    substitute: bool, default True
 115        If `True`, substitute 'MRSM{}' values.
116        Defaults to `True`.
117
118    as_tuple: bool, default False
119        If `True`, return a tuple of type (success, value).
120        Defaults to `False`.
121        
122    Returns
123    -------
124    The value in the configuration directory, indexed by the provided keys.
125
126    Examples
127    --------
128    >>> get_config('meerschaum', 'instance')
129    'sql:main'
130    >>> get_config('does', 'not', 'exist')
131    UserWarning: Invalid keys in config: ('does', 'not', 'exist')
132    """
133    import json
134
135    symlinks_key = STATIC_CONFIG['config']['symlinks_key']
136    if debug:
137        from meerschaum.utils.debug import dprint
138        dprint(f"Indexing keys: {keys}", color=False)
139
140    if len(keys) == 0:
141        _rc = _config(substitute=substitute, sync_files=sync_files, write_missing=write_missing)
142        if as_tuple:
143            return True, _rc 
144        return _rc
145    
146    ### Weird threading issues, only import if substitute is True.
147    if substitute:
148        from meerschaum.config._read_config import search_and_substitute_config
149    ### Invalidate the cache if it was read before with substitute=False
150    ### but there still exist substitutions.
151    if (
152        config is not None and substitute and keys[0] != symlinks_key
153        and 'MRSM{' in json.dumps(config.get(keys[0]))
154    ):
155        try:
156            _subbed = search_and_substitute_config({keys[0]: config[keys[0]]})
157        except Exception as e:
158            import traceback
159            traceback.print_exc()
160        config[keys[0]] = _subbed[keys[0]]
161        if symlinks_key in _subbed:
162            if symlinks_key not in config:
163                config[symlinks_key] = {}
164            if keys[0] not in config[symlinks_key]:
165                config[symlinks_key][keys[0]] = {}
166            config[symlinks_key][keys[0]] = apply_patch_to_config(
167                _subbed,
168                config[symlinks_key][keys[0]]
169            )
170
171    from meerschaum.config._sync import sync_files as _sync_files
172    if config is None:
173        _config(*keys, sync_files=sync_files)
174
175    invalid_keys = False
176    if keys[0] not in config and keys[0] != symlinks_key:
177        single_key_config = read_config(
178            keys=[keys[0]], substitute=substitute, write_missing=write_missing
179        )
180        if keys[0] not in single_key_config:
181            invalid_keys = True
182        else:
183            config[keys[0]] = single_key_config.get(keys[0], None)
184            if symlinks_key in single_key_config and keys[0] in single_key_config[symlinks_key]:
185                if symlinks_key not in config:
186                    config[symlinks_key] = {}
187                config[symlinks_key][keys[0]] = single_key_config[symlinks_key][keys[0]]
188
189            if sync_files:
190                _sync_files(keys=[keys[0]])
191
192    c = config
193    if len(keys) > 0:
194        for k in keys:
195            try:
196                c = c[k]
197            except Exception as e:
198                invalid_keys = True
199                break
200        if invalid_keys:
201            ### Check if the keys are in the default configuration.
202            from meerschaum.config._default import default_config
203            in_default = True
204            patched_default_config = (
205                search_and_substitute_config(default_config)
206                if substitute else copy.deepcopy(default_config)
207            )
208            _c = patched_default_config
209            for k in keys:
210                try:
211                    _c = _c[k]
212                except Exception as e:
213                    in_default = False
214            if in_default:
215                c = _c
216                invalid_keys = False
217            warning_msg = f"Invalid keys in config: {keys}"
218            if not in_default:
219                try:
220                    if warn:
221                        from meerschaum.utils.warnings import warn as _warn
222                        _warn(warning_msg, stacklevel=3, color=False)
223                except Exception as e:
224                    if warn:
225                        print(warning_msg)
226                if as_tuple:
227                    return False, None
228                return None
229
230            ### Don't write keys that we haven't yet loaded into memory.
231            not_loaded_keys = [k for k in patched_default_config if k not in config]
232            for k in not_loaded_keys:
233                patched_default_config.pop(k, None)
234
235            set_config(
236                apply_patch_to_config(
237                    patched_default_config,
238                    config,
239                )
240            )
241            if patch and keys[0] != symlinks_key:
242                if write_missing:
243                    write_config(config, debug=debug)
244
245    if as_tuple:
246        return (not invalid_keys), c
247    return c

Return the Meerschaum configuration dictionary. If positional arguments are provided, index by the keys. Raises a warning if invalid keys are provided.

Parameters
  • keys (str): List of strings to index.
  • patch (bool, default True): If True, patch missing default keys into the config directory. Defaults to True.
  • sync_files (bool, default True): If True, sync files if needed. Defaults to True.
  • write_missing (bool, default True): If True, write default values when the main config files are missing. Defaults to True.
  • substitute (bool, default True): If True, substitute 'MRSM{}' values. Defaults to True.
  • as_tuple (bool, default False): If True, return a tuple of type (success, value). Defaults to False.
Returns
  • The value in the configuration directory, indexed by the provided keys.
Examples
>>> get_config('meerschaum', 'instance')
'sql:main'
>>> get_config('does', 'not', 'exist')
UserWarning: Invalid keys in config: ('does', 'not', 'exist')
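
The positional keys drill into nested configuration. For example, the connector definitions consulted by get_connector() above live under the 'meerschaum', 'connectors' keys (a sketch; the exact contents depend on your configuration):

>>> get_config('meerschaum', 'connectors', 'sql')
{...}
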
class Pipe:
 60class Pipe:
 61    """
 62    Access Meerschaum pipes via Pipe objects.
 63    
 64    Pipes are identified by the following:
 65
 66    1. Connector keys (e.g. `'sql:main'`)
 67    2. Metric key (e.g. `'weather'`)
 68    3. Location (optional; e.g. `None`)
 69    
 70    A pipe's connector keys correspond to a data source, and when the pipe is synced,
 71    its `fetch` definition is evaluated and executed to produce new data.
 72    
 73    Alternatively, new data may be directly synced via `pipe.sync()`:
 74    
 75    ```
 76    >>> from meerschaum import Pipe
 77    >>> pipe = Pipe('csv', 'weather')
 78    >>>
 79    >>> import pandas as pd
 80    >>> df = pd.read_csv('weather.csv')
 81    >>> pipe.sync(df)
 82    ```
 83    """
 84
 85    from ._fetch import (
 86        fetch,
 87        get_backtrack_interval,
 88    )
 89    from ._data import (
 90        get_data,
 91        get_backtrack_data,
 92        get_rowcount,
 93        _get_data_as_iterator,
 94        get_chunk_interval,
 95        get_chunk_bounds,
 96        parse_date_bounds,
 97    )
 98    from ._register import register
 99    from ._attributes import (
100        attributes,
101        parameters,
102        columns,
103        indices,
104        indexes,
105        dtypes,
106        autoincrement,
107        upsert,
108        static,
109        tzinfo,
110        enforce,
111        null_indices,
112        get_columns,
113        get_columns_types,
114        get_columns_indices,
115        get_indices,
116        tags,
117        get_id,
118        id,
119        get_val_column,
120        parents,
121        children,
122        target,
123        _target_legacy,
124        guess_datetime,
125    )
126    from ._show import show
127    from ._edit import edit, edit_definition, update
128    from ._sync import (
129        sync,
130        get_sync_time,
131        exists,
132        filter_existing,
133        _get_chunk_label,
134        get_num_workers,
135        _persist_new_json_columns,
136        _persist_new_numeric_columns,
137        _persist_new_uuid_columns,
138        _persist_new_bytes_columns,
139    )
140    from ._verify import (
141        verify,
142        get_bound_interval,
143        get_bound_time,
144    )
145    from ._delete import delete
146    from ._drop import drop, drop_indices
147    from ._index import create_indices
148    from ._clear import clear
149    from ._deduplicate import deduplicate
150    from ._bootstrap import bootstrap
151    from ._dtypes import enforce_dtypes, infer_dtypes
152    from ._copy import copy_to
153
154    def __init__(
155        self,
156        connector: str = '',
157        metric: str = '',
158        location: Optional[str] = None,
159        parameters: Optional[Dict[str, Any]] = None,
160        columns: Union[Dict[str, str], List[str], None] = None,
161        indices: Optional[Dict[str, Union[str, List[str]]]] = None,
162        tags: Optional[List[str]] = None,
163        target: Optional[str] = None,
164        dtypes: Optional[Dict[str, str]] = None,
165        instance: Optional[Union[str, InstanceConnector]] = None,
166        temporary: bool = False,
167        upsert: Optional[bool] = None,
168        autoincrement: Optional[bool] = None,
169        static: Optional[bool] = None,
170        enforce: Optional[bool] = None,
171        null_indices: Optional[bool] = None,
172        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
173        cache: bool = False,
174        debug: bool = False,
175        connector_keys: Optional[str] = None,
176        metric_key: Optional[str] = None,
177        location_key: Optional[str] = None,
178        instance_keys: Optional[str] = None,
179        indexes: Union[Dict[str, str], List[str], None] = None,
180    ):
181        """
182        Parameters
183        ----------
184        connector: str
185            Keys for the pipe's source connector, e.g. `'sql:main'`.
186
187        metric: str
188            Label for the pipe's contents, e.g. `'weather'`.
189
190        location: str, default None
191            Label for the pipe's location. Defaults to `None`.
192
193        parameters: Optional[Dict[str, Any]], default None
194            Optionally set a pipe's parameters from the constructor,
195            e.g. columns and other attributes.
196            You can edit these parameters with `edit pipes`.
197
198        columns: Union[Dict[str, str], List[str], None], default None
199            Set the `columns` dictionary of `parameters`.
200            If `parameters` is also provided, this dictionary is added under the `'columns'` key.
201
202        indices: Optional[Dict[str, Union[str, List[str]]]], default None
203            Set the `indices` dictionary of `parameters`.
204            If `parameters` is also provided, this dictionary is added under the `'indices'` key.
205
206        tags: Optional[List[str]], default None
207            A list of strings to be added under the `'tags'` key of `parameters`.
208            You can select pipes with certain tags using `--tags`.
209
210        dtypes: Optional[Dict[str, str]], default None
211            Set the `dtypes` dictionary of `parameters`.
212            If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.
213
214        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
215            Connector for the Meerschaum instance where the pipe resides.
216            Defaults to the preconfigured default instance (`'sql:main'`).
217
218        instance: Optional[Union[str, InstanceConnector]], default None
219            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.
220
221        upsert: Optional[bool], default None
222            If `True`, set `upsert` to `True` in the parameters.
223
224        autoincrement: Optional[bool], default None
225            If `True`, set `autoincrement` in the parameters.
226
227        static: Optional[bool], default None
228            If `True`, set `static` in the parameters.
229
230        enforce: Optional[bool], default None
231            If `False`, skip data type enforcement.
232            Default behavior is `True`.
233
234        null_indices: Optional[bool], default None
235            Set to `False` if there will be no null values in the index columns.
236            Defaults to `True`.
237
238        temporary: bool, default False
239            If `True`, prevent instance tables (pipes, users, plugins) from being created.
240
241        cache: bool, default False
242            If `True`, cache fetched data into a local database file.
243            Defaults to `False`.
244        """
245        from meerschaum.utils.warnings import error, warn
246        if (not connector and not connector_keys) or (not metric and not metric_key):
247            error(
248                "Please provide strings for the connector and metric\n    "
249                + "(first two positional arguments)."
250            )
251
252        ### Fall back to legacy `location_key` just in case.
253        if not location:
254            location = location_key
255
256        if not connector:
257            connector = connector_keys
258
259        if not metric:
260            metric = metric_key
261
262        if location in ('[None]', 'None'):
263            location = None
264
265        from meerschaum.config.static import STATIC_CONFIG
266        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
267        for k in (connector, metric, location, *(tags or [])):
268            if str(k).startswith(negation_prefix):
269                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")
270
271        self.connector_keys = str(connector)
272        self.connector_key = self.connector_keys ### Alias
273        self.metric_key = metric
274        self.location_key = location
275        self.temporary = temporary
276
277        self._attributes = {
278            'connector_keys': self.connector_keys,
279            'metric_key': self.metric_key,
280            'location_key': self.location_key,
281            'parameters': {},
282        }
283
284        ### only set parameters if values are provided
285        if isinstance(parameters, dict):
286            self._attributes['parameters'] = parameters
287        else:
288            if parameters is not None:
289                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
290            self._attributes['parameters'] = {}
291
292        columns = columns or self._attributes.get('parameters', {}).get('columns', {})
293        if isinstance(columns, list):
294            columns = {str(col): str(col) for col in columns}
295        if isinstance(columns, dict):
296            self._attributes['parameters']['columns'] = columns
297        elif columns is not None:
298            warn(f"The provided columns are of invalid type '{type(columns)}'.")
299
300        indices = (
301            indices
302            or indexes
303            or self._attributes.get('parameters', {}).get('indices', None)
304            or self._attributes.get('parameters', {}).get('indexes', None)
305        )
306        if isinstance(indices, dict):
307            indices_key = (
308                'indexes'
309                if 'indexes' in self._attributes['parameters']
310                else 'indices'
311            )
312            self._attributes['parameters'][indices_key] = indices
313
314        if isinstance(tags, (list, tuple)):
315            self._attributes['parameters']['tags'] = tags
316        elif tags is not None:
317            warn(f"The provided tags are of invalid type '{type(tags)}'.")
318
319        if isinstance(target, str):
320            self._attributes['parameters']['target'] = target
321        elif target is not None:
322            warn(f"The provided target is of invalid type '{type(target)}'.")
323
324        if isinstance(dtypes, dict):
325            self._attributes['parameters']['dtypes'] = dtypes
326        elif dtypes is not None:
327            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")
328
329        if isinstance(upsert, bool):
330            self._attributes['parameters']['upsert'] = upsert
331
332        if isinstance(autoincrement, bool):
333            self._attributes['parameters']['autoincrement'] = autoincrement
334
335        if isinstance(static, bool):
336            self._attributes['parameters']['static'] = static
337
338        if isinstance(enforce, bool):
339            self._attributes['parameters']['enforce'] = enforce
340
341        if isinstance(null_indices, bool):
342            self._attributes['parameters']['null_indices'] = null_indices
343
344        ### NOTE: The parameters dictionary is {} by default.
345        ###       A Pipe may be registered without parameters, then edited,
346        ###       or a Pipe may be registered with parameters set in-memory first.
347        _mrsm_instance = mrsm_instance if mrsm_instance is not None else (instance or instance_keys)
348        if _mrsm_instance is None:
349            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)
350
351        if not isinstance(_mrsm_instance, str):
352            self._instance_connector = _mrsm_instance
353            self.instance_keys = str(_mrsm_instance)
354        else: ### NOTE: must be SQL or API Connector for this work
355            self.instance_keys = _mrsm_instance
356
357        self._cache = cache and get_config('system', 'experimental', 'cache')
358
359    @property
360    def meta(self):
361        """
362        Return the four keys needed to reconstruct this pipe.
363        """
364        return {
365            'connector_keys': self.connector_keys,
366            'metric_key': self.metric_key,
367            'location_key': self.location_key,
368            'instance_keys': self.instance_keys,
369        }
370
371    def keys(self) -> List[str]:
372        """
373        Return the ordered keys for this pipe.
374        """
375        return {
376            key: val
377            for key, val in self.meta.items()
378            if key != 'instance'
379        }
380
381    @property
382    def instance_connector(self) -> Union[InstanceConnector, None]:
383        """
384        The connector to where this pipe resides.
385        May either be of type `meerschaum.connectors.sql.SQLConnector` or
386        `meerschaum.connectors.api.APIConnector`.
387        """
388        if '_instance_connector' not in self.__dict__:
389            from meerschaum.connectors.parse import parse_instance_keys
390            conn = parse_instance_keys(self.instance_keys)
391            if conn:
392                self._instance_connector = conn
393            else:
394                return None
395        return self._instance_connector
396
397    @property
398    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
399        """
400        The connector to the data source.
401        """
402        if '_connector' not in self.__dict__:
403            from meerschaum.connectors.parse import parse_instance_keys
404            import warnings
405            with warnings.catch_warnings():
406                warnings.simplefilter('ignore')
407                try:
408                    conn = parse_instance_keys(self.connector_keys)
409                except Exception:
410                    conn = None
411            if conn:
412                self._connector = conn
413            else:
414                return None
415        return self._connector
416
417    @property
418    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
419        """
420        If the pipe was created with `cache=True`, return the connector to the pipe's
421        SQLite database for caching.
422        """
423        if not self._cache:
424            return None
425
426        if '_cache_connector' not in self.__dict__:
427            from meerschaum.connectors import get_connector
428            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
429            _resources_path = SQLITE_RESOURCES_PATH
430            self._cache_connector = get_connector(
431                'sql', '_cache_' + str(self),
432                flavor='sqlite',
433                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
434            )
435
436        return self._cache_connector
437
438    @property
439    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
440        """
441        If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
442        manage the local data.
443        """
444        if self.cache_connector is None:
445            return None
446        if '_cache_pipe' not in self.__dict__:
447            from meerschaum.config._patch import apply_patch_to_config
448            from meerschaum.utils.sql import sql_item_name
449            _parameters = copy.deepcopy(self.parameters)
450            _fetch_patch = {
451                'fetch': ({
452                    'definition': (
453                        "SELECT * FROM "
454                        + sql_item_name(
455                            str(self.target),
456                            self.instance_connector.flavor,
457                            self.instance_connector.get_pipe_schema(self),
458                        )
459                    ),
460                }) if self.instance_connector.type == 'sql' else ({
461                    'connector_keys': self.connector_keys,
462                    'metric_key': self.metric_key,
463                    'location_key': self.location_key,
464                })
465            }
466            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
467            self._cache_pipe = Pipe(
468                self.instance_keys,
469                (self.connector_keys + '_' + self.metric_key + '_cache'),
470                self.location_key,
471                mrsm_instance = self.cache_connector,
472                parameters = _parameters,
473                cache = False,
474                temporary = True,
475            )
476
477        return self._cache_pipe
478
479    def __str__(self, ansi: bool=False):
480        return pipe_repr(self, ansi=ansi)
481
482    def __eq__(self, other):
483        try:
484            return (
485                isinstance(self, type(other))
486                and self.connector_keys == other.connector_keys
487                and self.metric_key == other.metric_key
488                and self.location_key == other.location_key
489                and self.instance_keys == other.instance_keys
490            )
491        except Exception:
492            return False
493
494    def __hash__(self):
495        ### Using an esoteric separator to avoid collisions.
496        sep = "[\"']"
497        return hash(
498            str(self.connector_keys) + sep
499            + str(self.metric_key) + sep
500            + str(self.location_key) + sep
501            + str(self.instance_keys) + sep
502        )
503
504    def __repr__(self, ansi: bool=True, **kw) -> str:
505        if not hasattr(sys, 'ps1'):
506            ansi = False
507
508        return pipe_repr(self, ansi=ansi, **kw)
509
510    def __pt_repr__(self):
511        from meerschaum.utils.packages import attempt_import
512        prompt_toolkit_formatted_text = attempt_import('prompt_toolkit.formatted_text', lazy=False)
513        return prompt_toolkit_formatted_text.ANSI(pipe_repr(self, ansi=True))
514
515    def __getstate__(self) -> Dict[str, Any]:
516        """
517        Define the state dictionary (pickling).
518        """
519        return {
520            'connector_keys': self.connector_keys,
521            'metric_key': self.metric_key,
522            'location_key': self.location_key,
523            'parameters': self.parameters,
524            'instance_keys': self.instance_keys,
525        }
526
527    def __setstate__(self, _state: Dict[str, Any]):
528        """
529        Read the state (unpickling).
530        """
531        self.__init__(**_state)
532
533    def __getitem__(self, key: str) -> Any:
534        """
535        Index the pipe's attributes.
 536        If the `key` cannot be found, return `None`.
537        """
538        if key in self.attributes:
539            return self.attributes.get(key, None)
540
541        aliases = {
542            'connector': 'connector_keys',
543            'connector_key': 'connector_keys',
544            'metric': 'metric_key',
545            'location': 'location_key',
546        }
547        aliased_key = aliases.get(key, None)
548        if aliased_key is not None:
549            return self.attributes.get(aliased_key, None)
550
551        property_aliases = {
552            'instance': 'instance_keys',
553            'instance_key': 'instance_keys',
554        }
555        aliased_key = property_aliases.get(key, None)
556        if aliased_key is not None:
557            key = aliased_key
558        return getattr(self, key, None)

Access Meerschaum pipes via Pipe objects.

Pipes are identified by the following:

  1. Connector keys (e.g. 'sql:main')
  2. Metric key (e.g. 'weather')
  3. Location (optional; e.g. None)

A pipe's connector keys correspond to a data source, and when the pipe is synced, its fetch definition is evaluated and executed to produce new data.

Alternatively, new data may be directly synced via pipe.sync():

>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)
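
Once rows have been synced, they can be read back with pipe.get_data(), as shown in the examples near the top of this page:

>>> df = pipe.get_data()
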
Pipe(
    connector: str = '',
    metric: str = '',
    location: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
    columns: Union[Dict[str, str], List[str], NoneType] = None,
    indices: Optional[Dict[str, Union[str, List[str]]]] = None,
    tags: Optional[List[str]] = None,
    target: Optional[str] = None,
    dtypes: Optional[Dict[str, str]] = None,
    instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None,
    temporary: bool = False,
    upsert: Optional[bool] = None,
    autoincrement: Optional[bool] = None,
    static: Optional[bool] = None,
    enforce: Optional[bool] = None,
    null_indices: Optional[bool] = None,
    mrsm_instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None,
    cache: bool = False,
    debug: bool = False,
    connector_keys: Optional[str] = None,
    metric_key: Optional[str] = None,
    location_key: Optional[str] = None,
    instance_keys: Optional[str] = None,
    indexes: Union[Dict[str, str], List[str], NoneType] = None
)
269                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")
270
271        self.connector_keys = str(connector)
272        self.connector_key = self.connector_keys ### Alias
273        self.metric_key = metric
274        self.location_key = location
275        self.temporary = temporary
276
277        self._attributes = {
278            'connector_keys': self.connector_keys,
279            'metric_key': self.metric_key,
280            'location_key': self.location_key,
281            'parameters': {},
282        }
283
284        ### only set parameters if values are provided
285        if isinstance(parameters, dict):
286            self._attributes['parameters'] = parameters
287        else:
288            if parameters is not None:
289                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
290            self._attributes['parameters'] = {}
291
292        columns = columns or self._attributes.get('parameters', {}).get('columns', {})
293        if isinstance(columns, list):
294            columns = {str(col): str(col) for col in columns}
295        if isinstance(columns, dict):
296            self._attributes['parameters']['columns'] = columns
297        elif columns is not None:
298            warn(f"The provided columns are of invalid type '{type(columns)}'.")
299
300        indices = (
301            indices
302            or indexes
303            or self._attributes.get('parameters', {}).get('indices', None)
304            or self._attributes.get('parameters', {}).get('indexes', None)
305        )
306        if isinstance(indices, dict):
307            indices_key = (
308                'indexes'
309                if 'indexes' in self._attributes['parameters']
310                else 'indices'
311            )
312            self._attributes['parameters'][indices_key] = indices
313
314        if isinstance(tags, (list, tuple)):
315            self._attributes['parameters']['tags'] = tags
316        elif tags is not None:
317            warn(f"The provided tags are of invalid type '{type(tags)}'.")
318
319        if isinstance(target, str):
320            self._attributes['parameters']['target'] = target
321        elif target is not None:
322            warn(f"The provided target is of invalid type '{type(target)}'.")
323
324        if isinstance(dtypes, dict):
325            self._attributes['parameters']['dtypes'] = dtypes
326        elif dtypes is not None:
327            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")
328
329        if isinstance(upsert, bool):
330            self._attributes['parameters']['upsert'] = upsert
331
332        if isinstance(autoincrement, bool):
333            self._attributes['parameters']['autoincrement'] = autoincrement
334
335        if isinstance(static, bool):
336            self._attributes['parameters']['static'] = static
337
338        if isinstance(enforce, bool):
339            self._attributes['parameters']['enforce'] = enforce
340
341        if isinstance(null_indices, bool):
342            self._attributes['parameters']['null_indices'] = null_indices
343
344        ### NOTE: The parameters dictionary is {} by default.
345        ###       A Pipe may be registered without parameters, then edited,
346        ###       or a Pipe may be registered with parameters set in-memory first.
347        _mrsm_instance = mrsm_instance if mrsm_instance is not None else (instance or instance_keys)
348        if _mrsm_instance is None:
349            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)
350
351        if not isinstance(_mrsm_instance, str):
352            self._instance_connector = _mrsm_instance
353            self.instance_keys = str(_mrsm_instance)
354        else: ### NOTE: must be SQL or API Connector for this to work
355            self.instance_keys = _mrsm_instance
356
357        self._cache = cache and get_config('system', 'experimental', 'cache')
Parameters
  • connector (str): Keys for the pipe's source connector, e.g. 'sql:main'.
  • metric (str): Label for the pipe's contents, e.g. 'weather'.
  • location (str, default None): Label for the pipe's location. Defaults to None.
  • parameters (Optional[Dict[str, Any]], default None): Optionally set a pipe's parameters from the constructor, e.g. columns and other attributes. You can edit these parameters with edit pipes.
  • columns (Union[Dict[str, str], List[str], None], default None): Set the columns dictionary of parameters. If parameters is also provided, this dictionary is added under the 'columns' key.
  • indices (Optional[Dict[str, Union[str, List[str]]]], default None): Set the indices dictionary of parameters. If parameters is also provided, this dictionary is added under the 'indices' key.
  • tags (Optional[List[str]], default None): A list of strings to be added under the 'tags' key of parameters. You can select pipes with certain tags using --tags.
  • dtypes (Optional[Dict[str, str]], default None): Set the dtypes dictionary of parameters. If parameters is also provided, this dictionary is added under the 'dtypes' key.
  • mrsm_instance (Optional[Union[str, InstanceConnector]], default None): Connector for the Meerschaum instance where the pipe resides. Defaults to the preconfigured default instance ('sql:main').
  • instance (Optional[Union[str, InstanceConnector]], default None): Alias for mrsm_instance. If mrsm_instance is supplied, this value is ignored.
  • upsert (Optional[bool], default None): If True, set upsert to True in the parameters.
  • autoincrement (Optional[bool], default None): If True, set autoincrement in the parameters.
  • static (Optional[bool], default None): If True, set static in the parameters.
  • enforce (Optional[bool], default None): If False, skip data type enforcement. Default behavior is True.
  • null_indices (Optional[bool], default None): Set to False if there will be no null values in the index columns. Defaults to True.
  • temporary (bool, default False): If True, prevent instance tables (pipes, users, plugins) from being created.
  • cache (bool, default False): If True, cache fetched data into a local database file. Defaults to False.
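
For example, here is a minimal sketch of setting several of these constructor parameters at once. The connector keys, metric, column names, and target table are illustrative, and temporary=True keeps the pipe out of the pipes table:

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'sql:main', 'energy',
    instance='sql:main',
    columns={'datetime': 'ts', 'id': 'meter_id'},
    dtypes={'kwh': 'float64'},
    target='energy_readings',
    upsert=True,
    temporary=True,
)
print(pipe.target)
# energy_readings
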
connector_keys
connector_key
metric_key
location_key
temporary
meta
359    @property
360    def meta(self):
361        """
362        Return the four keys needed to reconstruct this pipe.
363        """
364        return {
365            'connector_keys': self.connector_keys,
366            'metric_key': self.metric_key,
367            'location_key': self.location_key,
368            'instance_keys': self.instance_keys,
369        }

Return the four keys needed to reconstruct this pipe.
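
As a sketch (reusing the pipe built in the constructor example above), meta is a plain dictionary whose keys match the constructor's keyword arguments, so it can be splatted back into meerschaum.Pipe:

print(pipe.meta)
# {'connector_keys': 'sql:main', 'metric_key': 'energy', 'location_key': None, 'instance_keys': 'sql:main'}

same_pipe = mrsm.Pipe(**pipe.meta)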

def keys(self) -> List[str]:
371    def keys(self) -> List[str]:
372        """
373        Return the ordered keys for this pipe.
374        """
375        return {
376            key: val
377            for key, val in self.meta.items()
378            if key != 'instance'
379        }

Return the ordered keys for this pipe.

instance_connector: Union[meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType]
381    @property
382    def instance_connector(self) -> Union[InstanceConnector, None]:
383        """
384        The connector to where this pipe resides.
385        May either be of type `meerschaum.connectors.sql.SQLConnector` or
386        `meerschaum.connectors.api.APIConnector`.
387        """
388        if '_instance_connector' not in self.__dict__:
389            from meerschaum.connectors.parse import parse_instance_keys
390            conn = parse_instance_keys(self.instance_keys)
391            if conn:
392                self._instance_connector = conn
393            else:
394                return None
395        return self._instance_connector

The connector to where this pipe resides. May either be of type meerschaum.connectors.sql.SQLConnector or meerschaum.connectors.api.APIConnector.

connector: Optional[Connector]
397    @property
398    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
399        """
400        The connector to the data source.
401        """
402        if '_connector' not in self.__dict__:
403            from meerschaum.connectors.parse import parse_instance_keys
404            import warnings
405            with warnings.catch_warnings():
406                warnings.simplefilter('ignore')
407                try:
408                    conn = parse_instance_keys(self.connector_keys)
409                except Exception:
410                    conn = None
411            if conn:
412                self._connector = conn
413            else:
414                return None
415        return self._connector

The connector to the data source.

cache_connector: Optional[meerschaum.connectors.SQLConnector]
417    @property
418    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
419        """
420        If the pipe was created with `cache=True`, return the connector to the pipe's
421        SQLite database for caching.
422        """
423        if not self._cache:
424            return None
425
426        if '_cache_connector' not in self.__dict__:
427            from meerschaum.connectors import get_connector
428            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
429            _resources_path = SQLITE_RESOURCES_PATH
430            self._cache_connector = get_connector(
431                'sql', '_cache_' + str(self),
432                flavor='sqlite',
433                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
434            )
435
436        return self._cache_connector

If the pipe was created with cache=True, return the connector to the pipe's SQLite database for caching.

cache_pipe: Optional[Pipe]
438    @property
439    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
440        """
441        If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
442        manage the local data.
443        """
444        if self.cache_connector is None:
445            return None
446        if '_cache_pipe' not in self.__dict__:
447            from meerschaum.config._patch import apply_patch_to_config
448            from meerschaum.utils.sql import sql_item_name
449            _parameters = copy.deepcopy(self.parameters)
450            _fetch_patch = {
451                'fetch': ({
452                    'definition': (
453                        "SELECT * FROM "
454                        + sql_item_name(
455                            str(self.target),
456                            self.instance_connector.flavor,
457                            self.instance_connector.get_pipe_schema(self),
458                        )
459                    ),
460                }) if self.instance_connector.type == 'sql' else ({
461                    'connector_keys': self.connector_keys,
462                    'metric_key': self.metric_key,
463                    'location_key': self.location_key,
464                })
465            }
466            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
467            self._cache_pipe = Pipe(
468                self.instance_keys,
469                (self.connector_keys + '_' + self.metric_key + '_cache'),
470                self.location_key,
471                mrsm_instance = self.cache_connector,
472                parameters = _parameters,
473                cache = False,
474                temporary = True,
475            )
476
477        return self._cache_pipe

If the pipe was created with cache=True, return another meerschaum.Pipe used to manage the local data.

def fetch( self, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, NoneType] = None, check_existing: bool = True, sync_chunks: bool = False, debug: bool = False, **kw: Any) -> Union[pandas.core.frame.DataFrame, Iterator[pandas.core.frame.DataFrame], NoneType]:
21def fetch(
22    self,
23    begin: Union[datetime, int, str, None] = '',
24    end: Union[datetime, int, None] = None,
25    check_existing: bool = True,
26    sync_chunks: bool = False,
27    debug: bool = False,
28    **kw: Any
29) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]:
30    """
31    Fetch a Pipe's latest data from its connector.
32
33    Parameters
34    ----------
35    begin: Union[datetime, str, None], default ''
36        If provided, only fetch data newer than or equal to `begin`.
37
38    end: Optional[datetime], default None
39        If provided, only fetch data older than or equal to `end`.
40
41    check_existing: bool, default True
42        If `False`, do not apply the backtrack interval.
43
44    sync_chunks: bool, default False
45        If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks while the
46        fetch loads them into memory.
47
48    debug: bool, default False
49        Verbosity toggle.
50
51    Returns
52    -------
53    A `pd.DataFrame` of the newest unseen data.
54
55    """
56    if 'fetch' not in dir(self.connector):
57        warn(f"No `fetch()` function defined for connector '{self.connector}'")
58        return None
59
60    from meerschaum.connectors import get_connector_plugin
61    from meerschaum.utils.misc import filter_arguments
62
63    _chunk_hook = kw.pop('chunk_hook', None)
64    kw['workers'] = self.get_num_workers(kw.get('workers', None))
65    if sync_chunks and _chunk_hook is None:
66
67        def _chunk_hook(chunk, **_kw) -> SuccessTuple:
68            """
69            Wrap `Pipe.sync()` with a custom chunk label prepended to the message.
70            """
71            from meerschaum.config._patch import apply_patch_to_config
72            kwargs = apply_patch_to_config(kw, _kw)
73            chunk_success, chunk_message = self.sync(chunk, **kwargs)
74            chunk_label = self._get_chunk_label(chunk, self.columns.get('datetime', None))
75            if chunk_label:
76                chunk_message = '\n' + chunk_label + '\n' + chunk_message
77            return chunk_success, chunk_message
78
79    begin, end = self.parse_date_bounds(begin, end)
80
81    with mrsm.Venv(get_connector_plugin(self.connector)):
82        _args, _kwargs = filter_arguments(
83            self.connector.fetch,
84            self,
85            begin=_determine_begin(
86                self,
87                begin,
88                end,
89                check_existing=check_existing,
90                debug=debug,
91            ),
92            end=end,
93            chunk_hook=_chunk_hook,
94            debug=debug,
95            **kw
96        )
97        df = self.connector.fetch(*_args, **_kwargs)
98    return df

Fetch a Pipe's latest data from its connector.

Parameters
  • begin (Union[datetime, str, None], default ''): If provided, only fetch data newer than or equal to begin.
  • end (Optional[datetime], default None): If provided, only fetch data older than or equal to end.
  • check_existing (bool, default True): If False, do not apply the backtrack interval.
  • sync_chunks (bool, default False): If True and the pipe's connector is of type 'sql', begin syncing chunks while the fetch loads them into memory.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame of the newest unseen data.
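
Below is a short sketch of fetching without syncing. It assumes the pipe's source connector defines fetch() and has any fetch definition it needs (for a SQL source, a query under pipe.parameters['fetch']['definition']); the timestamp is illustrative:

from datetime import datetime, timezone

df = pipe.fetch(begin=datetime(2024, 1, 1, tzinfo=timezone.utc), debug=True)
if df is not None:
    pipe.sync(df)
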
def get_backtrack_interval( self, check_existing: bool = True, debug: bool = False) -> Union[datetime.timedelta, int]:
101def get_backtrack_interval(
102    self,
103    check_existing: bool = True,
104    debug: bool = False,
105) -> Union[timedelta, int]:
106    """
107    Get the backtrack interval to use for this pipe.
108
109    Parameters
110    ----------
111    check_existing: bool, default True
112        If `False`, return a backtrack_interval of 0 minutes.
113
114    Returns
115    -------
116    The backtrack interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
117    """
118    default_backtrack_minutes = get_config('pipes', 'parameters', 'fetch', 'backtrack_minutes')
119    configured_backtrack_minutes = self.parameters.get('fetch', {}).get('backtrack_minutes', None)
120    backtrack_minutes = (
121        configured_backtrack_minutes
122        if configured_backtrack_minutes is not None
123        else default_backtrack_minutes
124    ) if check_existing else 0
125
126    backtrack_interval = timedelta(minutes=backtrack_minutes)
127    dt_col = self.columns.get('datetime', None)
128    if dt_col is None:
129        return backtrack_interval
130
131    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns, UTC]')
132    if 'int' in dt_dtype.lower():
133        return backtrack_minutes
134
135    return backtrack_interval

Get the backtrack interval to use for this pipe.

Parameters
  • check_existing (bool, default True): If False, return a backtrack_interval of 0 minutes.
Returns
  • The backtrack interval (timedelta or int) to use with this pipe's datetime axis.
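
For example (continuing the sketch above and assuming a reachable 'sql:main' instance), the interval may be configured under pipe.parameters['fetch']['backtrack_minutes']:

pipe.parameters['fetch'] = {'backtrack_minutes': 10}
print(pipe.get_backtrack_interval())
# 0:10:00
print(pipe.get_backtrack_interval(check_existing=False))
# 0:00:00
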
def get_data( self, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, int, str, NoneType] = None, end: Union[datetime.datetime, int, str, NoneType] = None, params: Optional[Dict[str, Any]] = None, as_iterator: bool = False, as_chunks: bool = False, as_dask: bool = False, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, order: Optional[str] = 'asc', limit: Optional[int] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> Union[pandas.core.frame.DataFrame, Iterator[pandas.core.frame.DataFrame], NoneType]:
 23def get_data(
 24    self,
 25    select_columns: Optional[List[str]] = None,
 26    omit_columns: Optional[List[str]] = None,
 27    begin: Union[datetime, int, str, None] = None,
 28    end: Union[datetime, int, str, None] = None,
 29    params: Optional[Dict[str, Any]] = None,
 30    as_iterator: bool = False,
 31    as_chunks: bool = False,
 32    as_dask: bool = False,
 33    chunk_interval: Union[timedelta, int, None] = None,
 34    order: Optional[str] = 'asc',
 35    limit: Optional[int] = None,
 36    fresh: bool = False,
 37    debug: bool = False,
 38    **kw: Any
 39) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]:
 40    """
 41    Get a pipe's data from the instance connector.
 42
 43    Parameters
 44    ----------
 45    select_columns: Optional[List[str]], default None
 46        If provided, only select these given columns.
 47        Otherwise select all available columns (i.e. `SELECT *`).
 48
 49    omit_columns: Optional[List[str]], default None
 50        If provided, remove these columns from the selection.
 51
 52    begin: Union[datetime, int, str, None], default None
 53        Lower bound datetime to begin searching for data (inclusive).
 54        Translates to a `WHERE` clause like `WHERE datetime >= begin`.
 55        Defaults to `None`.
 56
 57    end: Union[datetime, int, str, None], default None
 58        Upper bound datetime to stop searching for data (exclusive).
 59        Translates to a `WHERE` clause like `WHERE datetime < end`.
 60        Defaults to `None`.
 61
 62    params: Optional[Dict[str, Any]], default None
 63        Filter the retrieved data by a dictionary of parameters.
 64        See `meerschaum.utils.sql.build_where` for more details. 
 65
 66    as_iterator: bool, default False
 67        If `True`, return a generator of chunks of pipe data.
 68
 69    as_chunks: bool, default False
 70        Alias for `as_iterator`.
 71
 72    as_dask: bool, default False
 73        If `True`, return a `dask.DataFrame`
 74        (which may be loaded into a Pandas DataFrame with `df.compute()`).
 75
 76    chunk_interval: Union[timedelta, int, None], default None
 77        If `as_iterator`, then return chunks with `begin` and `end` separated by this interval.
 78        This may be set under `pipe.parameters['verify']['chunk_minutes']`.
 79        By default, use a timedelta of 1440 minutes (1 day).
 80        If `chunk_interval` is an integer and the `datetime` axis a timestamp,
 81        then use a timedelta with this many minutes.
 82        If the `datetime` axis is an integer, default to the configured chunksize.
 83        If `chunk_interval` is a `timedelta` and the `datetime` axis an integer,
 84        use the number of minutes in the `timedelta`.
 85
 86    order: Optional[str], default 'asc'
 87        If `order` is not `None`, sort the resulting dataframe by indices.
 88
 89    limit: Optional[int], default None
 90        If provided, cap the dataframe to this many rows.
 91
 92    fresh: bool, default False
 93        If `True`, skip local cache and directly query the instance connector.
 94        Defaults to `False`.
 95
 96    debug: bool, default False
 97        Verbosity toggle.
 98        Defaults to `False`.
 99
100    Returns
101    -------
102    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.
103
104    """
105    from meerschaum.utils.warnings import warn
106    from meerschaum.utils.venv import Venv
107    from meerschaum.connectors import get_connector_plugin
108    from meerschaum.utils.misc import iterate_chunks, items_str
109    from meerschaum.utils.dtypes import to_pandas_dtype, coerce_timezone
110    from meerschaum.utils.dataframe import add_missing_cols_to_df, df_is_chunk_generator
111    from meerschaum.utils.packages import attempt_import
112    dd = attempt_import('dask.dataframe') if as_dask else None
113    dask = attempt_import('dask') if as_dask else None
114    dateutil_parser = attempt_import('dateutil.parser')
115
116    if select_columns == '*':
117        select_columns = None
118    elif isinstance(select_columns, str):
119        select_columns = [select_columns]
120
121    if isinstance(omit_columns, str):
122        omit_columns = [omit_columns]
123
124    begin, end = self.parse_date_bounds(begin, end)
125    as_iterator = as_iterator or as_chunks
126    dt_col = self.columns.get('datetime', None)
127
128    def _sort_df(_df):
129        if df_is_chunk_generator(_df):
130            return _df
131        indices = [] if dt_col not in _df.columns else [dt_col]
132        non_dt_cols = [
133            col
134            for col_ix, col in self.columns.items()
135            if col_ix != 'datetime' and col in _df.columns
136        ]
137        indices.extend(non_dt_cols)
138        if 'dask' not in _df.__module__:
139            _df.sort_values(
140                by=indices,
141                inplace=True,
142                ascending=(str(order).lower() == 'asc'),
143            )
144            _df.reset_index(drop=True, inplace=True)
145        else:
146            _df = _df.sort_values(
147                by=indices,
148                ascending=(str(order).lower() == 'asc'),
149            )
150            _df = _df.reset_index(drop=True)
151        if limit is not None and len(_df) > limit:
152            return _df.head(limit)
153        return _df
154
155    if as_iterator or as_chunks:
156        df = self._get_data_as_iterator(
157            select_columns=select_columns,
158            omit_columns=omit_columns,
159            begin=begin,
160            end=end,
161            params=params,
162            chunk_interval=chunk_interval,
163            limit=limit,
164            order=order,
165            fresh=fresh,
166            debug=debug,
167        )
168        return _sort_df(df)
169
170    if as_dask:
171        from multiprocessing.pool import ThreadPool
172        dask_pool = ThreadPool(self.get_num_workers())
173        dask.config.set(pool=dask_pool)
174        chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
175        bounds = self.get_chunk_bounds(
176            begin=begin,
177            end=end,
178            bounded=False,
179            chunk_interval=chunk_interval,
180            debug=debug,
181        )
182        dask_chunks = [
183            dask.delayed(self.get_data)(
184                select_columns=select_columns,
185                omit_columns=omit_columns,
186                begin=chunk_begin,
187                end=chunk_end,
188                params=params,
189                chunk_interval=chunk_interval,
190                order=order,
191                limit=limit,
192                fresh=fresh,
193                debug=debug,
194            )
195            for (chunk_begin, chunk_end) in bounds
196        ]
197        dask_meta = {
198            col: to_pandas_dtype(typ)
199            for col, typ in self.dtypes.items()
200        }
201        return _sort_df(dd.from_delayed(dask_chunks, meta=dask_meta))
202
203    if not self.exists(debug=debug):
204        return None
205
206    if self.cache_pipe is not None:
207        if not fresh:
208            _sync_cache_tuple = self.cache_pipe.sync(
209                begin=begin,
210                end=end,
211                params=params,
212                debug=debug,
213                **kw
214            )
215            if not _sync_cache_tuple[0]:
216                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
217                fresh = True
218            else: ### Successfully synced cache.
219                return self.enforce_dtypes(
220                    self.cache_pipe.get_data(
221                        select_columns=select_columns,
222                        omit_columns=omit_columns,
223                        begin=begin,
224                        end=end,
225                        params=params,
226                        order=order,
227                        limit=limit,
228                        debug=debug,
229                        fresh=True,
230                        **kw
231                    ),
232                    debug=debug,
233                )
234
235    with Venv(get_connector_plugin(self.instance_connector)):
236        df = self.instance_connector.get_pipe_data(
237            pipe=self,
238            select_columns=select_columns,
239            omit_columns=omit_columns,
240            begin=begin,
241            end=end,
242            params=params,
243            limit=limit,
244            order=order,
245            debug=debug,
246            **kw
247        )
248        if df is None:
249            return df
250
251        if not select_columns:
252            select_columns = [col for col in df.columns]
253
254        cols_to_omit = [
255            col
256            for col in df.columns
257            if (
258                col in (omit_columns or [])
259                or
260                col not in (select_columns or [])
261            )
262        ]
263        cols_to_add = [
264            col
265            for col in select_columns
266            if col not in df.columns
267        ]
268        if cols_to_omit:
269            warn(
270                (
271                    f"Received {len(cols_to_omit)} omitted column"
272                    + ('s' if len(cols_to_omit) != 1 else '')
273                    + f" for {self}. "
274                    + "Consider adding `select_columns` and `omit_columns` support to "
275                    + f"'{self.instance_connector.type}' connectors to improve performance."
276                ),
277                stack=False,
278            )
279            _cols_to_select = [col for col in df.columns if col not in cols_to_omit]
280            df = df[_cols_to_select]
281
282        if cols_to_add:
283            warn(
284                (
285                    f"Specified columns {items_str(cols_to_add)} were not found on {self}. "
286                    + "Adding these to the DataFrame as null columns."
287                ),
288                stack=False,
289            )
290            df = add_missing_cols_to_df(df, {col: 'string' for col in cols_to_add})
291
292        enforced_df = self.enforce_dtypes(df, debug=debug)
293
294        if order:
295            return _sort_df(enforced_df)
296        return enforced_df

Get a pipe's data from the instance connector.

Parameters
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, int, str, None], default None): Lower bound datetime to begin searching for data (inclusive). Translates to a WHERE clause like WHERE datetime >= begin. Defaults to None.
  • end (Union[datetime, int, str, None], default None): Upper bound datetime to stop searching for data (exclusive). Translates to a WHERE clause like WHERE datetime < end. Defaults to None.
  • params (Optional[Dict[str, Any]], default None): Filter the retrieved data by a dictionary of parameters. See meerschaum.utils.sql.build_where for more details.
  • as_iterator (bool, default False): If True, return a generator of chunks of pipe data.
  • as_chunks (bool, default False): Alias for as_iterator.
  • as_dask (bool, default False): If True, return a dask.DataFrame (which may be loaded into a Pandas DataFrame with df.compute()).
  • chunk_interval (Union[timedelta, int, None], default None): If as_iterator, then return chunks with begin and end separated by this interval. This may be set under pipe.parameters['verify']['chunk_minutes']. By default, use a timedelta of 1440 minutes (1 day). If chunk_interval is an integer and the datetime axis a timestamp, then use a timedelta with this many minutes. If the datetime axis is an integer, default to the configured chunksize. If chunk_interval is a timedelta and the datetime axis an integer, use the number of minutes in the timedelta.
  • order (Optional[str], default 'asc'): If order is not None, sort the resulting dataframe by indices.
  • limit (Optional[int], default None): If provided, cap the dataframe to this many rows.
  • fresh (bool, default False): If True, skip local cache and directly query the instance connector. Defaults to False.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
Returns
  • A pd.DataFrame for the pipe's data corresponding to the provided parameters.
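
Below is a hedged sketch of common get_data() calls, reusing the pipe from the constructor example; the column names, filter values, and timestamps are illustrative:

from datetime import datetime, timedelta

### Select a window of rows for two meters, dropping one column.
df = pipe.get_data(
    begin=datetime(2024, 1, 1),
    end=datetime(2024, 2, 1),
    params={'meter_id': [1, 2]},
    omit_columns=['kwh'],
)

### Stream the same window in daily chunks instead of one DataFrame.
for chunk in pipe.get_data(
    begin=datetime(2024, 1, 1),
    end=datetime(2024, 2, 1),
    as_iterator=True,
    chunk_interval=timedelta(days=1),
):
    print(len(chunk))
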
def get_backtrack_data( self, backtrack_minutes: Optional[int] = None, begin: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, limit: Optional[int] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> Optional[pandas.core.frame.DataFrame]:
388def get_backtrack_data(
389    self,
390    backtrack_minutes: Optional[int] = None,
391    begin: Union[datetime, int, None] = None,
392    params: Optional[Dict[str, Any]] = None,
393    limit: Optional[int] = None,
394    fresh: bool = False,
395    debug: bool = False,
396    **kw: Any
397) -> Optional['pd.DataFrame']:
398    """
399    Get the most recent data from the instance connector as a Pandas DataFrame.
400
401    Parameters
402    ----------
403    backtrack_minutes: Optional[int], default None
404        How many minutes from `begin` to select from.
405        If `None`, use `pipe.parameters['fetch']['backtrack_minutes']`.
406
407    begin: Optional[datetime], default None
408        The starting point to search for data.
409        If begin is `None` (default), use the most recent observed datetime
410        (AKA sync_time).
411
412        ```
413        E.g. begin = 02:00
414
415        Search this region.           Ignore this, even if there's data.
416        /  /  /  /  /  /  /  /  /  |
417        -----|----------|----------|----------|----------|----------|
418        00:00      01:00      02:00      03:00      04:00      05:00
419
420        ```
421
422    params: Optional[Dict[str, Any]], default None
423        The standard Meerschaum `params` query dictionary.
424
425    limit: Optional[int], default None
426        If provided, cap the number of rows to be returned.
427
428    fresh: bool, default False
429        If `True`, ignore local cache and pull directly from the instance connector.
430        Only comes into effect if a pipe was created with `cache=True`.
431
432    debug: bool, default False
433        Verbosity toggle.
434
435    Returns
436    -------
437    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. Backtrack data
438    is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
439    """
440    from meerschaum.utils.warnings import warn
441    from meerschaum.utils.venv import Venv
442    from meerschaum.connectors import get_connector_plugin
443
444    if not self.exists(debug=debug):
445        return None
446
447    begin = self.parse_date_bounds(begin)
448
449    backtrack_interval = self.get_backtrack_interval(debug=debug)
450    if backtrack_minutes is None:
451        backtrack_minutes = (
452            (backtrack_interval.total_seconds() / 60)
453            if isinstance(backtrack_interval, timedelta)
454            else backtrack_interval
455        )
456
457    if self.cache_pipe is not None:
458        if not fresh:
459            _sync_cache_tuple = self.cache_pipe.sync(begin=begin, params=params, debug=debug, **kw)
460            if not _sync_cache_tuple[0]:
461                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
462                fresh = True
463            else: ### Successfully synced cache.
464                return self.enforce_dtypes(
465                    self.cache_pipe.get_backtrack_data(
466                        fresh=True,
467                        begin=begin,
468                        backtrack_minutes=backtrack_minutes,
469                        params=params,
470                        limit=limit,
471                        order=kw.get('order', 'desc'),
472                        debug=debug,
473                        **kw
474                    ),
475                    debug=debug,
476                )
477
478    if hasattr(self.instance_connector, 'get_backtrack_data'):
479        with Venv(get_connector_plugin(self.instance_connector)):
480            return self.enforce_dtypes(
481                self.instance_connector.get_backtrack_data(
482                    pipe=self,
483                    begin=begin,
484                    backtrack_minutes=backtrack_minutes,
485                    params=params,
486                    limit=limit,
487                    debug=debug,
488                    **kw
489                ),
490                debug=debug,
491            )
492
493    if begin is None:
494        begin = self.get_sync_time(params=params, debug=debug)
495
496    backtrack_interval = (
497        timedelta(minutes=backtrack_minutes)
498        if isinstance(begin, datetime)
499        else backtrack_minutes
500    )
501    if begin is not None:
502        begin = begin - backtrack_interval
503
504    return self.get_data(
505        begin=begin,
506        params=params,
507        debug=debug,
508        limit=limit,
509        order=kw.get('order', 'desc'),
510        **kw
511    )

Get the most recent data from the instance connector as a Pandas DataFrame.

Parameters
  • backtrack_minutes (Optional[int], default None): How many minutes from begin to select from. If None, use pipe.parameters['fetch']['backtrack_minutes'].
  • begin (Optional[datetime], default None): The starting point to search for data. If begin is None (default), use the most recent observed datetime (AKA sync_time).

    E.g. begin = 02:00
    
    Search this region.           Ignore this, even if there's data.
    /  /  /  /  /  /  /  /  /  |
    -----|----------|----------|----------|----------|----------|
    00:00      01:00      02:00      03:00      04:00      05:00
    
    
  • params (Optional[Dict[str, Any]], default None): The standard Meerschaum params query dictionary.
  • limit (Optional[int], default None): If provided, cap the number of rows to be returned.
  • fresh (bool, default False): If True, ignore local cache and pull directly from the instance connector. Only comes into effect if a pipe was created with cache=True.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame for the pipe's data corresponding to the provided parameters. Backtrack data is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
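
As a sketch, backtrack data is handy for re-reading a trailing window (here an illustrative 24 hours) ending at the pipe's most recent sync time:

recent_df = pipe.get_backtrack_data(backtrack_minutes=1440)
if recent_df is not None:
    print(len(recent_df))
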
def get_rowcount( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> int:
514def get_rowcount(
515    self,
516    begin: Union[datetime, int, None] = None,
517    end: Union[datetime, int, None] = None,
518    params: Optional[Dict[str, Any]] = None,
519    remote: bool = False,
520    debug: bool = False
521) -> int:
522    """
523    Get a Pipe's instance or remote rowcount.
524
525    Parameters
526    ----------
527    begin: Optional[datetime], default None
528        Count rows where datetime > begin.
529
530    end: Optional[datetime], default None
531        Count rows where datetime < end.
532
533    remote: bool, default False
534        Count rows from a pipe's remote source.
535        **NOTE**: This is experimental!
536
537    debug: bool, default False
538        Verbosity toggle.
539
540    Returns
541    -------
542    An `int` of the number of rows in the pipe corresponding to the provided parameters.
543    Returns 0 if the pipe does not exist.
544    """
545    from meerschaum.utils.warnings import warn
546    from meerschaum.utils.venv import Venv
547    from meerschaum.connectors import get_connector_plugin
548
549    begin, end = self.parse_date_bounds(begin, end)
550    connector = self.instance_connector if not remote else self.connector
551    try:
552        with Venv(get_connector_plugin(connector)):
553            rowcount = connector.get_pipe_rowcount(
554                self,
555                begin=begin,
556                end=end,
557                params=params,
558                remote=remote,
559                debug=debug,
560            )
561            if rowcount is None:
562                return 0
563            return rowcount
564    except AttributeError as e:
565        warn(e)
566        if remote:
567            return 0
568    warn(f"Failed to get a rowcount for {self}.")
569    return 0

Get a Pipe's instance or remote rowcount.

Parameters
  • begin (Optional[datetime], default None): Count rows where datetime > begin.
  • end (Optional[datetime], default None): Count rows where datetime < end.
  • remote (bool, default False): Count rows from a pipe's remote source. NOTE: This is experimental!
  • debug (bool, default False): Verbosity toggle.
Returns
  • An int of the number of rows in the pipe corresponding to the provided parameters. Returns 0 if the pipe does not exist.
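
For example (reusing the pipe above), compare the instance rowcount against the remote source; remember that remote counts are experimental:

from datetime import datetime

instance_rows = pipe.get_rowcount(begin=datetime(2024, 1, 1))
remote_rows = pipe.get_rowcount(remote=True)
print(instance_rows, remote_rows)
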
def get_chunk_interval( self, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, debug: bool = False) -> Union[datetime.timedelta, int]:
572def get_chunk_interval(
573    self,
574    chunk_interval: Union[timedelta, int, None] = None,
575    debug: bool = False,
576) -> Union[timedelta, int]:
577    """
578    Get the chunk interval to use for this pipe.
579
580    Parameters
581    ----------
582    chunk_interval: Union[timedelta, int, None], default None
583        If provided, coerce this value into the correct type.
584        For example, if the datetime axis is an integer, then
585        return the number of minutes.
586
587    Returns
588    -------
589    The chunk interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
590    """
591    default_chunk_minutes = get_config('pipes', 'parameters', 'verify', 'chunk_minutes')
592    configured_chunk_minutes = self.parameters.get('verify', {}).get('chunk_minutes', None)
593    chunk_minutes = (
594        (configured_chunk_minutes or default_chunk_minutes)
595        if chunk_interval is None
596        else (
597            chunk_interval
598            if isinstance(chunk_interval, int)
599            else int(chunk_interval.total_seconds() / 60)
600        )
601    )
602
603    dt_col = self.columns.get('datetime', None)
604    if dt_col is None:
605        return timedelta(minutes=chunk_minutes)
606
607    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns, UTC]')
608    if 'int' in dt_dtype.lower():
609        return chunk_minutes
610    return timedelta(minutes=chunk_minutes)

Get the chunk interval to use for this pipe.

Parameters
  • chunk_interval (Union[timedelta, int, None], default None): If provided, coerce this value into the correct type. For example, if the datetime axis is an integer, then return the number of minutes.
Returns
  • The chunk interval (timedelta or int) to use with this pipe's datetime axis.
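
A minimal sketch of configuring the chunk interval under pipe.parameters['verify']['chunk_minutes'] and coercing an explicit timedelta; it assumes the pipe's datetime axis is a timestamp rather than an integer:

from datetime import timedelta

pipe.parameters['verify'] = {'chunk_minutes': 60}
print(pipe.get_chunk_interval())
# 1:00:00
print(pipe.get_chunk_interval(timedelta(hours=6)))
# 6:00:00
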
def get_chunk_bounds( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, bounded: bool = False, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, debug: bool = False) -> List[Tuple[Union[datetime.datetime, int, NoneType], Union[datetime.datetime, int, NoneType]]]:
613def get_chunk_bounds(
614    self,
615    begin: Union[datetime, int, None] = None,
616    end: Union[datetime, int, None] = None,
617    bounded: bool = False,
618    chunk_interval: Union[timedelta, int, None] = None,
619    debug: bool = False,
620) -> List[
621    Tuple[
622        Union[datetime, int, None],
623        Union[datetime, int, None],
624    ]
625]:
626    """
627    Return a list of datetime bounds for iterating over the pipe's `datetime` axis.
628
629    Parameters
630    ----------
631    begin: Union[datetime, int, None], default None
632        If provided, do not select less than this value.
633        Otherwise the first chunk will be unbounded.
634
635    end: Union[datetime, int, None], default None
636        If provided, do not select greater than or equal to this value.
637        Otherwise the last chunk will be unbounded.
638
639    bounded: bool, default False
640        If `True`, do not include `None` in the first chunk.
641
642    chunk_interval: Union[timedelta, int, None], default None
643        If provided, use this interval for the size of chunk boundaries.
644        The default value for this pipe may be set
645        under `pipe.parameters['verify']['chunk_minutes']`.
646
647    debug: bool, default False
648        Verbosity toggle.
649
650    Returns
651    -------
652    A list of chunk bounds (datetimes or integers).
653    If unbounded, the first and last chunks will include `None`.
654    """
655    include_less_than_begin = not bounded and begin is None
656    include_greater_than_end = not bounded and end is None
657    if begin is None:
658        begin = self.get_sync_time(newest=False, debug=debug)
659    if end is None:
660        end = self.get_sync_time(newest=True, debug=debug)
661    if begin is None and end is None:
662        return [(None, None)]
663
664    begin, end = self.parse_date_bounds(begin, end)
665
666    ### Set the chunk interval under `pipe.parameters['verify']['chunk_minutes']`.
667    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
668    
669    ### Build a list of tuples containing the chunk boundaries
670    ### so that we can sync multiple chunks in parallel.
671    ### Run `verify pipes --workers 1` to sync chunks in series.
672    chunk_bounds = []
673    begin_cursor = begin
674    while begin_cursor < end:
675        end_cursor = begin_cursor + chunk_interval
676        chunk_bounds.append((begin_cursor, end_cursor))
677        begin_cursor = end_cursor
678
679    ### The chunk interval might be too large.
680    if not chunk_bounds and end >= begin:
681        chunk_bounds = [(begin, end)]
682
683    ### Truncate the last chunk to the end timestamp.
684    if chunk_bounds[-1][1] > end:
685        chunk_bounds[-1] = (chunk_bounds[-1][0], end)
686
687    ### Pop the last chunk if its bounds are equal.
688    if chunk_bounds[-1][0] == chunk_bounds[-1][1]:
689        chunk_bounds = chunk_bounds[:-1]
690
691    if include_less_than_begin:
692        chunk_bounds = [(None, begin)] + chunk_bounds
693    if include_greater_than_end:
694        chunk_bounds = chunk_bounds + [(end, None)]
695
696    return chunk_bounds

Return a list of datetime bounds for iterating over the pipe's datetime axis.

Parameters
  • begin (Union[datetime, int, None], default None): If provided, do not select less than this value. Otherwise the first chunk will be unbounded.
  • end (Union[datetime, int, None], default None): If provided, do not select greater than or equal to this value. Otherwise the last chunk will be unbounded.
  • bounded (bool, default False): If True, do not include None in the first chunk.
  • chunk_interval (Union[timedelta, int, None], default None): If provided, use this interval for the size of chunk boundaries. The default value for this pipe may be set under pipe.parameters['verify']['chunk_minutes'].
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of chunk bounds (datetimes or integers). If unbounded, the first and last chunks will include None.
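
For example, bounded daily chunks over three days (a sketch reusing the pipe above; the timestamps are illustrative):

from datetime import datetime, timedelta, timezone

bounds = pipe.get_chunk_bounds(
    begin=datetime(2024, 1, 1, tzinfo=timezone.utc),
    end=datetime(2024, 1, 4, tzinfo=timezone.utc),
    bounded=True,
    chunk_interval=timedelta(days=1),
)
print(bounds)
# Three daily (begin, end) tuples covering 2024-01-01 through 2024-01-04.
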
def parse_date_bounds( self, *dt_vals: Union[datetime.datetime, int, NoneType]) -> Union[datetime.datetime, int, str, NoneType, Tuple[Union[datetime.datetime, int, str, NoneType]]]:
699def parse_date_bounds(self, *dt_vals: Union[datetime, int, None]) -> Union[
700    datetime,
701    int,
702    str,
703    None,
704    Tuple[Union[datetime, int, str, None]]
705]:
706    """
707    Given a date bound (begin, end), coerce a timezone if necessary.
708    """
709    from meerschaum.utils.misc import is_int
710    from meerschaum.utils.dtypes import coerce_timezone
711    from meerschaum.utils.warnings import warn
712    dateutil_parser = mrsm.attempt_import('dateutil.parser')
713
714    def _parse_date_bound(dt_val):
715        if dt_val is None:
716            return None
717
718        if isinstance(dt_val, int):
719            return dt_val
720
721        if dt_val == '':
722            return ''
723
724        if is_int(dt_val):
725            return int(dt_val)
726
727        if isinstance(dt_val, str):
728            try:
729                dt_val = dateutil_parser.parse(dt_val)
730            except Exception as e:
731                warn(f"Could not parse '{dt_val}' as datetime:\n{e}")
732                return None
733
734        dt_col = self.columns.get('datetime', None)
735        dt_typ = str(self.dtypes.get(dt_col, 'datetime64[ns, UTC]'))
736        if dt_typ == 'datetime':
737            dt_typ = 'datetime64[ns, UTC]'
738        return coerce_timezone(dt_val, strip_utc=('utc' not in dt_typ.lower()))
739
740    bounds = tuple(_parse_date_bound(dt_val) for dt_val in dt_vals)
741    if len(bounds) == 1:
742        return bounds[0]
743    return bounds

Given a date bound (begin, end), coerce a timezone if necessary.
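
As a sketch (assuming the pipe's datetime axis is timezone-aware, which is the default), strings are parsed and coerced while None passes through:

begin, end = pipe.parse_date_bounds('2024-01-01', '2024-02-01')
# Both values are parsed into UTC-aware datetimes.

dt = pipe.parse_date_bounds(None)
# A single bound is returned as-is (here None) rather than as a tuple.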

def register(self, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
12def register(
13        self,
14        debug: bool = False,
15        **kw: Any
16    ) -> SuccessTuple:
17    """
18    Register a new Pipe along with its attributes.
19
20    Parameters
21    ----------
22    debug: bool, default False
23        Verbosity toggle.
24
25    kw: Any
26        Keyword arguments to pass to `instance_connector.register_pipe()`.
27
28    Returns
29    -------
30    A `SuccessTuple` of success, message.
31    """
32    if self.temporary:
33        return False, "Cannot register pipes created with `temporary=True` (read-only)."
34
35    from meerschaum.utils.formatting import get_console
36    from meerschaum.utils.venv import Venv
37    from meerschaum.connectors import get_connector_plugin, custom_types
38    from meerschaum.config._patch import apply_patch_to_config
39
40    import warnings
41    with warnings.catch_warnings():
42        warnings.simplefilter('ignore')
43        try:
44            _conn = self.connector
45        except Exception as e:
46            _conn = None
47
48    if (
49        _conn is not None
50        and
51        (_conn.type == 'plugin' or _conn.type in custom_types)
52        and
53        getattr(_conn, 'register', None) is not None
54    ):
55        try:
56            with Venv(get_connector_plugin(_conn), debug=debug):
57                params = self.connector.register(self)
58        except Exception as e:
59            get_console().print_exception()
60            params = None
61        params = {} if params is None else params
62        if not isinstance(params, dict):
63            from meerschaum.utils.warnings import warn
64            warn(
65                f"Invalid parameters returned from `register()` in connector {self.connector}:\n"
66                + f"{params}"
67            )
68        else:
69            self.parameters = apply_patch_to_config(params, self.parameters)
70
71    if not self.parameters:
72        cols = self.columns if self.columns else {'datetime': None, 'id': None}
73        self.parameters = {
74            'columns': cols,
75        }
76
77    with Venv(get_connector_plugin(self.instance_connector)):
78        return self.instance_connector.register_pipe(self, debug=debug, **kw)

Register a new Pipe along with its attributes.

Parameters
  • debug (bool, default False): Verbosity toggle.
  • kw (Any): Keyword arguments to pass to instance_connector.register_pipe().
Returns
  • A SuccessTuple of success, message.
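
A quick sketch of registering a pipe (the keys and columns are illustrative); note that pipes built with temporary=True refuse to register:

import meerschaum as mrsm

registered_pipe = mrsm.Pipe(
    'sql:main', 'energy',
    instance='sql:main',
    columns={'datetime': 'ts', 'id': 'meter_id'},
)
success, msg = registered_pipe.register(debug=True)
print(success, msg)
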
attributes: Dict[str, Any]
19@property
20def attributes(self) -> Dict[str, Any]:
21    """
22    Return a dictionary of a pipe's keys and parameters.
23    These values are reflected directly from the pipes table of the instance.
24    """
25    import time
26    from meerschaum.config import get_config
27    from meerschaum.config._patch import apply_patch_to_config
28    from meerschaum.utils.venv import Venv
29    from meerschaum.connectors import get_connector_plugin
30
31    timeout_seconds = get_config('pipes', 'attributes', 'local_cache_timeout_seconds')
32
33    if '_attributes' not in self.__dict__:
34        self._attributes = {}
35
36    now = time.perf_counter()
37    last_refresh = self.__dict__.get('_attributes_sync_time', None)
38    timed_out = (
39        last_refresh is None
40        or
41        (timeout_seconds is not None and (now - last_refresh) >= timeout_seconds)
42    )
43    if not self.temporary and timed_out:
44        self._attributes_sync_time = now
45        local_attributes = self.__dict__.get('_attributes', {})
46        with Venv(get_connector_plugin(self.instance_connector)):
47            instance_attributes = self.instance_connector.get_pipe_attributes(self)
48        self._attributes = apply_patch_to_config(instance_attributes, local_attributes)
49    return self._attributes

Return a dictionary of a pipe's keys and parameters. These values are reflected directly from the pipes table of the instance.

parameters: Optional[Dict[str, Any]]
52@property
53def parameters(self) -> Optional[Dict[str, Any]]:
54    """
55    Return the parameters dictionary of the pipe.
56    """
57    if 'parameters' not in self.attributes:
58        self.attributes['parameters'] = {}
59    _parameters = self.attributes['parameters']
60    dt_col = _parameters.get('columns', {}).get('datetime', None)
61    dt_typ = _parameters.get('dtypes', {}).get(dt_col, None) if dt_col else None
62    if dt_col and not dt_typ:
63        if 'dtypes' not in _parameters:
64            self.attributes['parameters']['dtypes'] = {}
65        self.attributes['parameters']['dtypes'][dt_col] = 'datetime'
66    return self.attributes['parameters']

Return the parameters dictionary of the pipe.
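
For example (continuing the sketch above), attributes holds the pipe's keys alongside parameters, and in-memory edits to parameters are reflected immediately:

print(pipe.attributes['connector_keys'])
# sql:main

pipe.parameters['tags'] = ['production']
print(pipe.parameters['tags'])
# ['production']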

columns: Optional[Dict[str, str]]
78@property
79def columns(self) -> Union[Dict[str, str], None]:
80    """
81    Return the `columns` dictionary defined in `meerschaum.Pipe.parameters`.
82    """
83    if 'columns' not in self.parameters:
84        self.parameters['columns'] = {}
85    cols = self.parameters['columns']
86    if not isinstance(cols, dict):
87        cols = {}
88        self.parameters['columns'] = cols
89    return {col_ix: col for col_ix, col in cols.items() if col}

Return the columns dictionary defined in meerschaum.Pipe.parameters.

indices: Optional[Dict[str, Union[str, List[str]]]]
106@property
107def indices(self) -> Union[Dict[str, Union[str, List[str]]], None]:
108    """
109    Return the `indices` dictionary defined in `meerschaum.Pipe.parameters`.
110    """
111    indices_key = (
112        'indexes'
113        if 'indexes' in self.parameters
114        else 'indices'
115    )
116    if indices_key not in self.parameters:
117        self.parameters[indices_key] = {}
118    _indices = self.parameters[indices_key]
119    _columns = self.columns
120    dt_col = _columns.get('datetime', None)
121    if not isinstance(_indices, dict):
122        _indices = {}
123        self.parameters[indices_key] = _indices
124    unique_cols = list(set((
125        [dt_col]
126        if dt_col
127        else []
128    ) + [
129        col
130        for col_ix, col in _columns.items()
131        if col and col_ix != 'datetime'
132    ]))
133    return {
134        **({'unique': unique_cols} if len(unique_cols) > 1 else {}),
135        **{col_ix: col for col_ix, col in _columns.items() if col},
136        **_indices
137    }

Return the indices dictionary defined in meerschaum.Pipe.parameters.
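
The returned dictionary merges the configured columns, an implicit 'unique' index (when more than one index column is set), and any explicitly configured indices:

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'foo', 'bar',
    temporary=True,
    columns={'datetime': 'ts', 'id': 'id'},
)
print(pipe.indices)
# {'unique': ['ts', 'id'], 'datetime': 'ts', 'id': 'id'}
# (the order of the 'unique' list may vary)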

indexes: Optional[Dict[str, Union[str, List[str]]]]
140@property
141def indexes(self) -> Union[Dict[str, Union[str, List[str]]], None]:
142    """
143    Alias for `meerschaum.Pipe.indices`.
144    """
145    return self.indices
dtypes: Optional[Dict[str, Any]]
198@property
199def dtypes(self) -> Union[Dict[str, Any], None]:
200    """
201    If defined, return the `dtypes` dictionary defined in `meerschaum.Pipe.parameters`.
202    """
203    from meerschaum.config._patch import apply_patch_to_config
204    from meerschaum.utils.dtypes import MRSM_ALIAS_DTYPES
205    configured_dtypes = self.parameters.get('dtypes', {})
206    remote_dtypes = self.infer_dtypes(persist=False)
207    patched_dtypes = apply_patch_to_config(remote_dtypes, configured_dtypes)
208    return {
209        col: MRSM_ALIAS_DTYPES.get(typ, typ)
210        for col, typ in patched_dtypes.items()
211        if col and typ
212    }

If defined, return the dtypes dictionary defined in meerschaum.Pipe.parameters.
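
Configured dtypes are patched over any dtypes inferred from existing data, and shorthand aliases are expanded to canonical names. A rough sketch (assuming the 'sql:temp' connector built in the examples above):

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'foo', 'bar',
    instance='sql:temp',
    temporary=True,
    parameters={'dtypes': {'ts': 'datetime64[ns, UTC]', 'vl': 'float64'}},
)
print(pipe.dtypes)
# {'ts': 'datetime64[ns, UTC]', 'vl': 'float64'}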

autoincrement: bool
260@property
261def autoincrement(self) -> bool:
262    """
263    Return the `autoincrement` parameter for the pipe.
264    """
265    if 'autoincrement' not in self.parameters:
266        self.parameters['autoincrement'] = False
267
268    return self.parameters['autoincrement']

Return the autoincrement parameter for the pipe.

upsert: bool
224@property
225def upsert(self) -> bool:
226    """
227    Return whether `upsert` is set for the pipe.
228    """
229    if 'upsert' not in self.parameters:
230        self.parameters['upsert'] = False
231    return self.parameters['upsert']

Return whether upsert is set for the pipe.

static: bool
242@property
243def static(self) -> bool:
244    """
245    Return whether `static` is set for the pipe.
246    """
247    if 'static' not in self.parameters:
248        self.parameters['static'] = False
249    return self.parameters['static']

Return whether static is set for the pipe.

tzinfo: Optional[datetime.timezone]
279@property
280def tzinfo(self) -> Union[None, timezone]:
281    """
282    Return `timezone.utc` if the pipe is timezone-aware.
283    """
284    dt_col = self.columns.get('datetime', None)
285    if not dt_col:
286        return None
287
288    dt_typ = str(self.dtypes.get(dt_col, 'datetime64[ns, UTC]'))
289    if 'utc' in dt_typ.lower() or dt_typ == 'datetime':
290        return timezone.utc
291
292    if dt_typ == 'datetime64[ns]':
293        return None
294
295    return None

Return timezone.utc if the pipe is timezone-aware.
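
A pipe is treated as timezone-aware unless its datetime column's dtype is explicitly timezone-naive (e.g. 'datetime64[ns]'). A sketch (again assuming the 'sql:temp' connector from above):

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'foo', 'bar',
    instance='sql:temp',
    temporary=True,
    parameters={
        'columns': {'datetime': 'ts'},
        'dtypes': {'ts': 'datetime64[ns, UTC]'},
    },
)
print(pipe.tzinfo)
# UTC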

enforce: bool
298@property
299def enforce(self) -> bool:
300    """
301    Return the `enforce` parameter for the pipe.
302    """
303    if 'enforce' not in self.parameters:
304        self.parameters['enforce'] = True
305
306    return self.parameters['enforce']

Return the enforce parameter for the pipe.

null_indices: bool
317@property
318def null_indices(self) -> bool:
319    """
320    Return the `null_indices` parameter for the pipe.
321    """
322    if 'null_indices' not in self.parameters:
323        self.parameters['null_indices'] = True
324
325    return self.parameters['null_indices']

Return the null_indices parameter for the pipe.

def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
336def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
337    """
338    Check if the requested columns are defined.
339
340    Parameters
341    ----------
342    *args: str
343        The column names to be retrieved.
344
345    error: bool, default False
346        If `True`, raise an `Exception` if the specified column is not defined.
347
348    Returns
349    -------
350    A tuple of the same length as `args`, or a `str` if `args` is a single argument.
351
352    Examples
353    --------
354    >>> pipe = mrsm.Pipe('test', 'test')
355    >>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
356    >>> pipe.get_columns('datetime', 'id')
357    ('dt', 'id')
358    >>> pipe.get_columns('value', error=True)
359    Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
360    """
361    from meerschaum.utils.warnings import error as _error, warn
362    if not args:
363        args = tuple(self.columns.keys())
364    col_names = []
365    for col in args:
366        col_name = None
367        try:
368            col_name = self.columns[col]
369            if col_name is None and error:
370                _error(f"Please define the name of the '{col}' column for {self}.")
371        except Exception as e:
372            col_name = None
373        if col_name is None and error:
374            _error(f"Missing '{col}'" + f" column for {self}.")
375        col_names.append(col_name)
376    if len(col_names) == 1:
377        return col_names[0]
378    return tuple(col_names)

Check if the requested columns are defined.

Parameters
  • *args (str): The column names to be retrieved.
  • error (bool, default False): If True, raise an Exception if the specified column is not defined.
Returns
  • A tuple of the same length as args, or a str if args is a single argument.
Examples
>>> pipe = mrsm.Pipe('test', 'test')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> pipe.get_columns('datetime', 'id')
('dt', 'id')
>>> pipe.get_columns('value', error=True)
Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
def get_columns_types( self, refresh: bool = False, debug: bool = False) -> Optional[Dict[str, str]]:
381def get_columns_types(
382    self,
383    refresh: bool = False,
384    debug: bool = False,
385) -> Union[Dict[str, str], None]:
386    """
387    Get a dictionary of a pipe's column names and their types.
388
389    Parameters
390    ----------
391    refresh: bool, default False
392        If `True`, invalidate the cache and fetch directly from the instance connector.
393
394    debug: bool, default False
395        Verbosity toggle.
396
397    Returns
398    -------
399    A dictionary of column names (`str`) to column types (`str`).
400
401    Examples
402    --------
403    >>> pipe.get_columns_types()
404    {
405      'dt': 'TIMESTAMP WITH TIMEZONE',
406      'id': 'BIGINT',
407      'val': 'DOUBLE PRECISION',
408    }
409    >>>
410    """
411    import time
412    from meerschaum.connectors import get_connector_plugin
413    from meerschaum.config.static import STATIC_CONFIG
414    from meerschaum.utils.warnings import dprint
415
416    now = time.perf_counter()
417    cache_seconds = STATIC_CONFIG['pipes']['static_schema_cache_seconds']
418    if not self.static:
419        refresh = True
420    if refresh:
421        _ = self.__dict__.pop('_columns_types_timestamp', None)
422        _ = self.__dict__.pop('_columns_types', None)
423    _columns_types = self.__dict__.get('_columns_types', None)
424    if _columns_types:
425        columns_types_timestamp = self.__dict__.get('_columns_types_timestamp', None)
426        if columns_types_timestamp is not None:
427            delta = now - columns_types_timestamp
428            if delta < cache_seconds:
429                if debug:
430                    dprint(
431                        f"Returning cached `columns_types` for {self} "
432                        f"({round(delta, 2)} seconds old)."
433                    )
434                return _columns_types
435
436    with mrsm.Venv(get_connector_plugin(self.instance_connector)):
437        _columns_types = (
438            self.instance_connector.get_pipe_columns_types(self, debug=debug)
439            if hasattr(self.instance_connector, 'get_pipe_columns_types')
440            else None
441        )
442
443    self.__dict__['_columns_types'] = _columns_types
444    self.__dict__['_columns_types_timestamp'] = now
445    return _columns_types or {}

Get a dictionary of a pipe's column names and their types.

Parameters
  • refresh (bool, default False): If True, invalidate the cache and fetch directly from the instance connector.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A dictionary of column names (str) to column types (str).
Examples
>>> pipe.get_columns_types()
{
  'dt': 'TIMESTAMP WITH TIMEZONE',
  'id': 'BIGINT',
  'val': 'DOUBLE PRECISION',
}
>>>
def get_columns_indices( self, debug: bool = False, refresh: bool = False) -> Dict[str, List[Dict[str, str]]]:
448def get_columns_indices(
449    self,
450    debug: bool = False,
451    refresh: bool = False,
452) -> Dict[str, List[Dict[str, str]]]:
453    """
454    Return a dictionary mapping columns to index information.
455    """
456    import time
457    from meerschaum.connectors import get_connector_plugin
458    from meerschaum.config.static import STATIC_CONFIG
459    from meerschaum.utils.warnings import dprint
460
461    now = time.perf_counter()
462    cache_seconds = (
463        STATIC_CONFIG['pipes']['static_schema_cache_seconds']
464        if self.static
465        else STATIC_CONFIG['pipes']['exists_timeout_seconds']
466    )
467    if refresh:
468        _ = self.__dict__.pop('_columns_indices_timestamp', None)
469        _ = self.__dict__.pop('_columns_indices', None)
470    _columns_indices = self.__dict__.get('_columns_indices', None)
471    if _columns_indices:
472        columns_indices_timestamp = self.__dict__.get('_columns_indices_timestamp', None)
473        if columns_indices_timestamp is not None:
474            delta = now - columns_indices_timestamp
475            if delta < cache_seconds:
476                if debug:
477                    dprint(
478                        f"Returning cached `columns_indices` for {self} "
479                        f"({round(delta, 2)} seconds old)."
480                    )
481                return _columns_indices
482
483    with mrsm.Venv(get_connector_plugin(self.instance_connector)):
484        _columns_indices = (
485            self.instance_connector.get_pipe_columns_indices(self, debug=debug)
486            if hasattr(self.instance_connector, 'get_pipe_columns_indices')
487            else None
488        )
489
490    self.__dict__['_columns_indices'] = _columns_indices
491    self.__dict__['_columns_indices_timestamp'] = now
492    return {k: v for k, v in _columns_indices.items() if k and v} or {}

Return a dictionary mapping columns to index information.

def get_indices(self) -> Dict[str, str]:
732def get_indices(self) -> Dict[str, str]:
733    """
734    Return a dictionary mapping index keys to their names in the database.
735
736    Returns
737    -------
738    A dictionary of index keys to index names.
739    """
740    from meerschaum.connectors import get_connector_plugin
741    with mrsm.Venv(get_connector_plugin(self.instance_connector)):
742        if hasattr(self.instance_connector, 'get_pipe_index_names'):
743            result = self.instance_connector.get_pipe_index_names(self)
744        else:
745            result = {}
746    
747    return result

Return a dictionary mapping index keys to their names in the database.

Returns
  • A dictionary of index keys to index names.
tags: Optional[List[str]]
173@property
174def tags(self) -> Union[List[str], None]:
175    """
176    If defined, return the `tags` list defined in `meerschaum.Pipe.parameters`.
177    """
178    if 'tags' not in self.parameters:
179        self.parameters['tags'] = []
180    return self.parameters['tags']

If defined, return the tags list defined in meerschaum.Pipe.parameters.
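
Tags may be set at construction or edited through Pipe.parameters:

import meerschaum as mrsm

pipe = mrsm.Pipe('foo', 'bar', temporary=True, tags=['production'])
pipe.tags.append('demo')
print(pipe.tags)
# ['production', 'demo']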

def get_id(self, **kw: Any) -> Optional[int]:
495def get_id(self, **kw: Any) -> Union[int, None]:
496    """
497    Fetch a pipe's ID from its instance connector.
498    If the pipe does not exist, return `None`.
499    """
500    if self.temporary:
501        return None
502    from meerschaum.utils.venv import Venv
503    from meerschaum.connectors import get_connector_plugin
504
505    with Venv(get_connector_plugin(self.instance_connector)):
506        if hasattr(self.instance_connector, 'get_pipe_id'):
507            return self.instance_connector.get_pipe_id(self, **kw)
508
509    return None

Fetch a pipe's ID from its instance connector. If the pipe does not exist, return None.

id: Optional[int]
512@property
513def id(self) -> Union[int, None]:
514    """
515    Fetch and cache a pipe's ID.
516    """
517    if not ('_id' in self.__dict__ and self._id):
518        self._id = self.get_id()
519    return self._id

Fetch and cache a pipe's ID.

def get_val_column(self, debug: bool = False) -> Optional[str]:
522def get_val_column(self, debug: bool = False) -> Union[str, None]:
523    """
524    Return the name of the value column if it's defined, otherwise make an educated guess.
525    If not set in the `columns` dictionary, return the first numeric column that is not
526    an ID or datetime column.
527    If no such column can be found, return `None`.
528
529    Parameters
530    ----------
531    debug: bool, default False
532        Verbosity toggle.
533
534    Returns
535    -------
536    Either a string or `None`.
537    """
538    from meerschaum.utils.debug import dprint
539    if debug:
540        dprint('Attempting to determine the value column...')
541    try:
542        val_name = self.get_columns('value')
543    except Exception as e:
544        val_name = None
545    if val_name is not None:
546        if debug:
547            dprint(f"Value column: {val_name}")
548        return val_name
549
550    cols = self.columns
551    if cols is None:
552        if debug:
553            dprint('No columns could be determined. Returning...')
554        return None
555    try:
556        dt_name = self.get_columns('datetime', error=False)
557    except Exception as e:
558        dt_name = None
559    try:
560        id_name = self.get_columns('id', error=False)
561    except Exception as e:
562        id_name = None
563
564    if debug:
565        dprint(f"dt_name: {dt_name}")
566        dprint(f"id_name: {id_name}")
567
568    cols_types = self.get_columns_types(debug=debug)
569    if cols_types is None:
570        return None
571    if debug:
572        dprint(f"cols_types: {cols_types}")
573    if dt_name is not None:
574        cols_types.pop(dt_name, None)
575    if id_name is not None:
576        cols_types.pop(id_name, None)
577
578    candidates = []
579    candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric',}
580    for search_term in candidate_keywords:
581        for col, typ in cols_types.items():
582            if search_term in typ.lower():
583                candidates.append(col)
584                break
585    if not candidates:
586        if debug:
587            dprint("No value column could be determined.")
588        return None
589
590    return candidates[0]

Return the name of the value column if it's defined, otherwise make an educated guess. If not set in the columns dictionary, return the first numeric column that is not an ID or datetime column. If no such column can be found, return None.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • Either a string or None.
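
For example, when a value column is explicitly defined, it is returned directly:

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'foo', 'bar',
    temporary=True,
    columns={'datetime': 'ts', 'value': 'vl'},
)
print(pipe.get_val_column())
# 'vl'
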
parents: List[Pipe]
593@property
594def parents(self) -> List[meerschaum.Pipe]:
595    """
596    Return a list of `meerschaum.Pipe` objects to be designated as parents.
597    """
598    if 'parents' not in self.parameters:
599        return []
600    from meerschaum.utils.warnings import warn
601    _parents_keys = self.parameters['parents']
602    if not isinstance(_parents_keys, list):
603        warn(
604            f"Please ensure the parents for {self} are defined as a list of keys.",
605            stacklevel = 4
606        )
607        return []
608    from meerschaum import Pipe
609    _parents = []
610    for keys in _parents_keys:
611        try:
612            p = Pipe(**keys)
613        except Exception as e:
614            warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}")
615            continue
616        _parents.append(p)
617    return _parents

Return a list of meerschaum.Pipe objects to be designated as parents.

children: List[Pipe]
620@property
621def children(self) -> List[meerschaum.Pipe]:
622    """
623    Return a list of `meerschaum.Pipe` objects to be designated as children.
624    """
625    if 'children' not in self.parameters:
626        return []
627    from meerschaum.utils.warnings import warn
628    _children_keys = self.parameters['children']
629    if not isinstance(_children_keys, list):
630        warn(
631            f"Please ensure the children for {self} are defined as a list of keys.",
632            stacklevel = 4
633        )
634        return []
635    from meerschaum import Pipe
636    _children = []
637    for keys in _children_keys:
638        try:
639            p = Pipe(**keys)
640        except Exception as e:
641            warn(f"Unable to build child with keys '{keys}' for {self}:\n{e}")
642            continue
643        _children.append(p)
644    return _children

Return a list of meerschaum.Pipe objects to be designated as children.

target: str
647@property
648def target(self) -> str:
649    """
650    The target table name.
651    You can set the target name under one of the following keys
652    (checked in this order):
653      - `target`
654      - `target_name`
655      - `target_table`
656      - `target_table_name`
657    """
658    if 'target' not in self.parameters:
659        default_target = self._target_legacy()
660        default_targets = {default_target}
661        potential_keys = ('target_name', 'target_table', 'target_table_name')
662        _target = None
663        for k in potential_keys:
664            if k in self.parameters:
665                _target = self.parameters[k]
666                break
667
668        _target = _target or default_target
669
670        if self.instance_connector.type == 'sql':
671            from meerschaum.utils.sql import truncate_item_name
672            truncated_target = truncate_item_name(_target, self.instance_connector.flavor)
673            default_targets.add(truncated_target)
674            warned_target = self.__dict__.get('_warned_target', False)
675            if truncated_target != _target and not warned_target:
676                if not warned_target:
677                    warn(
678                        f"The target '{_target}' is too long for '{self.instance_connector.flavor}', "
679                        + f"will use {truncated_target} instead."
680                    )
681                    self.__dict__['_warned_target'] = True
682                _target = truncated_target
683
684        if _target in default_targets:
685            return _target
686        self.target = _target
687    return self.parameters['target']

The target table name. You can set the target name under one of the following keys (checked in this order):

  • target
  • target_name
  • target_table
  • target_table_name
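
For example, setting 'target' in Pipe.parameters overrides the generated default table name:

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'foo', 'bar',
    temporary=True,
    parameters={'target': 'foo_bar_prod'},
)
print(pipe.target)
# 'foo_bar_prod'
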
def guess_datetime(self) -> Optional[str]:
710def guess_datetime(self) -> Union[str, None]:
711    """
712    Try to determine a pipe's datetime column.
713    """
714    _dtypes = self.dtypes
715
716    ### Abort if the user explicitly disallows a datetime index.
717    if 'datetime' in _dtypes:
718        if _dtypes['datetime'] is None:
719            return None
720
721    from meerschaum.utils.dtypes import are_dtypes_equal
722    dt_cols = [
723        col
724        for col, typ in _dtypes.items()
725        if are_dtypes_equal(typ, 'datetime')
726    ]
727    if not dt_cols:
728        return None
729    return dt_cols[0]

Try to determine a pipe's datetime column.
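
A sketch (the first column whose configured dtype is a datetime is returned; assumes the 'sql:temp' connector from above):

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'foo', 'bar',
    instance='sql:temp',
    temporary=True,
    parameters={'dtypes': {'ts': 'datetime64[ns, UTC]', 'vl': 'float64'}},
)
print(pipe.guess_datetime())
# 'ts'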

def show( self, nopretty: bool = False, debug: bool = False, **kw) -> Tuple[bool, str]:
12def show(
13        self,
14        nopretty: bool = False,
15        debug: bool = False,
16        **kw
17    ) -> SuccessTuple:
18    """
19    Show attributes of a Pipe.
20
21    Parameters
22    ----------
23    nopretty: bool, default False
24        If `True`, simply print the JSON of the pipe's attributes.
25
26    debug: bool, default False
27        Verbosity toggle.
28
29    Returns
30    -------
31    A `SuccessTuple` of success, message.
32
33    """
34    import json
35    from meerschaum.utils.formatting import (
36        pprint, make_header, ANSI, highlight_pipes, fill_ansi, get_console,
37    )
38    from meerschaum.utils.packages import import_rich, attempt_import
39    from meerschaum.utils.warnings import info
40    attributes_json = json.dumps(self.attributes)
41    if not nopretty:
42        _to_print = f"Attributes for {self}:"
43        if ANSI:
44            _to_print = fill_ansi(highlight_pipes(make_header(_to_print)), 'magenta')
45            print(_to_print)
46            rich = import_rich()
47            rich_json = attempt_import('rich.json')
48            get_console().print(rich_json.JSON(attributes_json))
49        else:
50            print(_to_print)
51    else:
52        print(attributes_json)
53
54    return True, "Success"

Show attributes of a Pipe.

Parameters
  • nopretty (bool, default False): If True, simply print the JSON of the pipe's attributes.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success, message.
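
For example:

import meerschaum as mrsm

pipe = mrsm.Pipe('foo', 'bar', temporary=True, columns={'datetime': 'ts'})
pipe.show(nopretty=True)
# prints the pipe's attributes as raw JSON
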
def edit( self, patch: bool = False, interactive: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
21def edit(
22    self,
23    patch: bool = False,
24    interactive: bool = False,
25    debug: bool = False,
26    **kw: Any
27) -> SuccessTuple:
28    """
29    Edit a Pipe's configuration.
30
31    Parameters
32    ----------
33    patch: bool, default False
34        If `patch` is True, update parameters by cascading rather than overwriting.
35    interactive: bool, default False
36        If `True`, open an editor for the user to make changes to the pipe's YAML file.
37    debug: bool, default False
38        Verbosity toggle.
39
40    Returns
41    -------
42    A `SuccessTuple` of success, message.
43
44    """
45    from meerschaum.utils.venv import Venv
46    from meerschaum.connectors import get_connector_plugin
47
48    if self.temporary:
49        return False, "Cannot edit pipes created with `temporary=True` (read-only)."
50
51    if not interactive:
52        with Venv(get_connector_plugin(self.instance_connector)):
53            return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
54
55    from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
56    from meerschaum.utils.misc import edit_file
57    parameters_filename = str(self) + '.yaml'
58    parameters_path = PIPES_CACHE_RESOURCES_PATH / parameters_filename
59
60    from meerschaum.utils.yaml import yaml
61
62    edit_text = f"Edit the parameters for {self}"
63    edit_top = '#' * (len(edit_text) + 4)
64    edit_header = edit_top + f'\n# {edit_text} #\n' + edit_top + '\n\n'
65
66    from meerschaum.config import get_config
67    parameters = dict(get_config('pipes', 'parameters', patch=True))
68    from meerschaum.config._patch import apply_patch_to_config
69    parameters = apply_patch_to_config(parameters, self.parameters)
70
71    ### write parameters to yaml file
72    with open(parameters_path, 'w+') as f:
73        f.write(edit_header)
74        yaml.dump(parameters, stream=f, sort_keys=False)
75
76    ### only quit editing if yaml is valid
77    editing = True
78    while editing:
79        edit_file(parameters_path)
80        try:
81            with open(parameters_path, 'r') as f:
82                file_parameters = yaml.load(f.read())
83        except Exception as e:
84            from meerschaum.utils.warnings import warn
85            warn(f"Invalid format defined for '{self}':\n\n{e}")
86            input(f"Press [Enter] to correct the configuration for '{self}': ")
87        else:
88            editing = False
89
90    self.parameters = file_parameters
91
92    if debug:
93        from meerschaum.utils.formatting import pprint
94        pprint(self.parameters)
95
96    with Venv(get_connector_plugin(self.instance_connector)):
97        return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)

Edit a Pipe's configuration.

Parameters
  • patch (bool, default False): If patch is True, update parameters by cascading rather than overwriting.
  • interactive (bool, default False): If True, open an editor for the user to make changes to the pipe's YAML file.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success, message.
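
A rough sketch (assuming the 'sql:temp' connector from the examples above; the first sync registers the pipe):

from datetime import datetime, timezone
import meerschaum as mrsm

pipe = mrsm.Pipe(
    'foo', 'bar',
    instance='sql:temp',
    columns={'datetime': 'ts'},
)
pipe.sync([{'ts': datetime(2024, 1, 1, tzinfo=timezone.utc), 'vl': 1.0}])
pipe.parameters['tags'] = ['production']
success, msg = pipe.edit()    # persist the updated parameters to the instance
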
def edit_definition( self, yes: bool = False, noask: bool = False, force: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
100def edit_definition(
101    self,
102    yes: bool = False,
103    noask: bool = False,
104    force: bool = False,
105    debug : bool = False,
106    **kw : Any
107) -> SuccessTuple:
108    """
109    Edit a pipe's definition file and update its configuration.
110    **NOTE:** This function is interactive and should not be used in automated scripts!
111
112    Returns
113    -------
114    A `SuccessTuple` of success, message.
115
116    """
117    if self.temporary:
118        return False, "Cannot edit pipes created with `temporary=True` (read-only)."
119
120    from meerschaum.connectors import instance_types
121    if (self.connector is None) or self.connector.type not in instance_types:
122        return self.edit(interactive=True, debug=debug, **kw)
123
124    import json
125    from meerschaum.utils.warnings import info, warn
126    from meerschaum.utils.debug import dprint
127    from meerschaum.config._patch import apply_patch_to_config
128    from meerschaum.utils.misc import edit_file
129
130    _parameters = self.parameters
131    if 'fetch' not in _parameters:
132        _parameters['fetch'] = {}
133
134    def _edit_api():
135        from meerschaum.utils.prompt import prompt, yes_no
136        info(
137            f"Please enter the keys of the source pipe from '{self.connector}'.\n" +
138            "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip."
139        )
140
141        _keys = { 'connector_keys' : None, 'metric_key' : None, 'location_key' : None }
142        for k in _keys:
143            _keys[k] = _parameters['fetch'].get(k, None)
144
145        for k, v in _keys.items():
146            try:
147                _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v)
148            except KeyboardInterrupt:
149                continue
150            if _keys[k] in ('', 'None', '\'None\'', '[None]'):
151                _keys[k] = None
152
153        _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys)
154
155        info("You may optionally specify additional filter parameters as JSON.")
156        print("  Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.")
157        print("  For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':")
158        print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': ')))
159        if force or yes_no(
160            "Would you like to add additional filter parameters?",
161            yes=yes, noask=noask
162        ):
163            from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
164            definition_filename = str(self) + '.json'
165            definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
166            try:
167                definition_path.touch()
168                with open(definition_path, 'w+') as f:
169                    json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2)
170            except Exception as e:
171                return False, f"Failed writing file '{definition_path}':\n" + str(e)
172
173            _params = None
174            while True:
175                edit_file(definition_path)
176                try:
177                    with open(definition_path, 'r') as f:
178                        _params = json.load(f)
179                except Exception as e:
180                    warn(f'Failed to read parameters JSON:\n{e}', stack=False)
181                    if force or yes_no(
182                        "Would you like to try again?\n  "
183                        + "If not, the parameters JSON file will be ignored.",
184                        noask=noask, yes=yes
185                    ):
186                        continue
187                    _params = None
188                break
189            if _params is not None:
190                if 'fetch' not in _parameters:
191                    _parameters['fetch'] = {}
192                _parameters['fetch']['params'] = _params
193
194        self.parameters = _parameters
195        return True, "Success"
196
197    def _edit_sql():
198        import pathlib, os, textwrap
199        from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
200        from meerschaum.utils.misc import edit_file
201        definition_filename = str(self) + '.sql'
202        definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
203
204        sql_definition = _parameters['fetch'].get('definition', None)
205        if sql_definition is None:
206            sql_definition = ''
207        sql_definition = textwrap.dedent(sql_definition).lstrip()
208
209        try:
210            definition_path.touch()
211            with open(definition_path, 'w+') as f:
212                f.write(sql_definition)
213        except Exception as e:
214            return False, f"Failed writing file '{definition_path}':\n" + str(e)
215
216        edit_file(definition_path)
217        try:
218            with open(definition_path, 'r') as f:
219                file_definition = f.read()
220        except Exception as e:
221            return False, f"Failed reading file '{definition_path}':\n" + str(e)
222
223        if sql_definition == file_definition:
224            return False, f"No changes made to definition for {self}."
225
226        if ' ' not in file_definition:
227            return False, f"Invalid SQL definition for {self}."
228
229        if debug:
230            dprint("Read SQL definition:\n\n" + file_definition)
231        _parameters['fetch']['definition'] = file_definition
232        self.parameters = _parameters
233        return True, "Success"
234
235    locals()['_edit_' + str(self.connector.type)]()
236    return self.edit(interactive=False, debug=debug, **kw)

Edit a pipe's definition file and update its configuration. NOTE: This function is interactive and should not be used in automated scripts!

Returns
  • A SuccessTuple of success, message.
def update(self, *args, **kw) -> Tuple[bool, str]:
13def update(self, *args, **kw) -> SuccessTuple:
14    """
15    Update a pipe's parameters in its instance.
16    """
17    kw['interactive'] = False
18    return self.edit(*args, **kw)

Update a pipe's parameters in its instance.

def sync( self, df: Union[pandas.core.frame.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], meerschaum.core.Pipe._sync.InferFetch] = <class 'meerschaum.core.Pipe._sync.InferFetch'>, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, NoneType] = None, force: bool = False, retries: int = 10, min_seconds: int = 1, check_existing: bool = True, enforce_dtypes: bool = True, blocking: bool = True, workers: Optional[int] = None, callback: Optional[Callable[[Tuple[bool, str]], Any]] = None, error_callback: Optional[Callable[[Exception], Any]] = None, chunksize: Optional[int] = -1, sync_chunks: bool = True, debug: bool = False, _inplace: bool = True, **kw: Any) -> Tuple[bool, str]:
 40def sync(
 41    self,
 42    df: Union[
 43        pd.DataFrame,
 44        Dict[str, List[Any]],
 45        List[Dict[str, Any]],
 46        InferFetch
 47    ] = InferFetch,
 48    begin: Union[datetime, int, str, None] = '',
 49    end: Union[datetime, int, None] = None,
 50    force: bool = False,
 51    retries: int = 10,
 52    min_seconds: int = 1,
 53    check_existing: bool = True,
 54    enforce_dtypes: bool = True,
 55    blocking: bool = True,
 56    workers: Optional[int] = None,
 57    callback: Optional[Callable[[Tuple[bool, str]], Any]] = None,
 58    error_callback: Optional[Callable[[Exception], Any]] = None,
 59    chunksize: Optional[int] = -1,
 60    sync_chunks: bool = True,
 61    debug: bool = False,
 62    _inplace: bool = True,
 63    **kw: Any
 64) -> SuccessTuple:
 65    """
 66    Fetch new data from the source and update the pipe's table with new data.
 67    
 68    Get new remote data via fetch, get existing data in the same time period,
 69    and merge the two, only keeping the unseen data.
 70
 71    Parameters
 72    ----------
 73    df: Union[None, pd.DataFrame, Dict[str, List[Any]]], default None
 74        An optional DataFrame to sync into the pipe. Defaults to `None`.
 75
 76    begin: Union[datetime, int, str, None], default ''
 77        Optionally specify the earliest datetime to search for data.
 78
 79    end: Union[datetime, int, str, None], default None
 80        Optionally specify the latest datetime to search for data.
 81
 82    force: bool, default False
 83        If `True`, keep retrying the sync for up to `retries` attempts.
 84
 85    retries: int, default 10
 86        If `force`, how many attempts to try syncing before declaring failure.
 87
 88    min_seconds: Union[int, float], default 1
 89        If `force`, how many seconds to sleep between retries. Defaults to `1`.
 90
 91    check_existing: bool, default True
 92        If `True`, pull and diff with existing data from the pipe.
 93
 94    enforce_dtypes: bool, default True
 95        If `True`, enforce dtypes on incoming data.
 96        Set this to `False` if the incoming rows are expected to be of the correct dtypes.
 97
 98    blocking: bool, default True
 99        If `True`, wait for sync to finish and return its result, otherwise
100        asyncronously sync (oxymoron?) and return success. Defaults to `True`.
101        Only intended for specific scenarios.
102
103    workers: Optional[int], default None
104        If provided and the instance connector is thread-safe
105        (`pipe.instance_connector.IS_THREAD_SAFE is True`),
106        limit concurrent sync to this many threads.
107
108    callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
109        Callback function which expects a SuccessTuple as input.
110        Only applies when `blocking=False`.
111
112    error_callback: Optional[Callable[[Exception], Any]], default None
113        Callback function which expects an Exception as input.
114        Only applies when `blocking=False`.
115
116    chunksize: int, default -1
117        Specify the number of rows to sync per chunk.
118        If `-1`, resort to system configuration (default is `900`).
119        A `chunksize` of `None` will sync all rows in one transaction.
120
121    sync_chunks: bool, default True
122        If possible, sync chunks while fetching them into memory.
123
124    debug: bool, default False
125        Verbosity toggle. Defaults to False.
126
127    Returns
128    -------
129    A `SuccessTuple` of success (`bool`) and message (`str`).
130    """
131    from meerschaum.utils.debug import dprint, _checkpoint
132    from meerschaum.utils.formatting import get_console
133    from meerschaum.utils.venv import Venv
134    from meerschaum.connectors import get_connector_plugin
135    from meerschaum.utils.misc import df_is_chunk_generator, filter_keywords, filter_arguments
136    from meerschaum.utils.pool import get_pool
137    from meerschaum.config import get_config
138
139    if (callback is not None or error_callback is not None) and blocking:
140        warn("Callback functions are only executed when blocking = False. Ignoring...")
141
142    _checkpoint(_total=2, **kw)
143
144    if chunksize == 0:
145        chunksize = None
146        sync_chunks = False
147
148    begin, end = self.parse_date_bounds(begin, end)
149    kw.update({
150        'begin': begin,
151        'end': end,
152        'force': force,
153        'retries': retries,
154        'min_seconds': min_seconds,
155        'check_existing': check_existing,
156        'blocking': blocking,
157        'workers': workers,
158        'callback': callback,
159        'error_callback': error_callback,
160        'sync_chunks': sync_chunks,
161        'chunksize': chunksize,
162    })
163
164    ### NOTE: Invalidate `_exists` cache before and after syncing.
165    self._exists = None
166
167    def _sync(
168        p: mrsm.Pipe,
169        df: Union[
170            'pd.DataFrame',
171            Dict[str, List[Any]],
172            List[Dict[str, Any]],
173            InferFetch
174        ] = InferFetch,
175    ) -> SuccessTuple:
176        if df is None:
177            p._exists = None
178            return (
179                False,
180                f"You passed `None` instead of data into `sync()` for {p}.\n"
181                + "Omit the DataFrame to infer fetching.",
182            )
183        ### Ensure that Pipe is registered.
184        if not p.temporary and p.get_id(debug=debug) is None:
185            ### NOTE: This may trigger an interactive session for plugins!
186            register_success, register_msg = p.register(debug=debug)
187            if not register_success:
188                if 'already' not in register_msg:
189                    p._exists = None
190                    return register_success, register_msg
191
192        ### If connector is a plugin with a `sync()` method, return that instead.
193        ### If the plugin does not have a `sync()` method but does have a `fetch()` method,
194        ### use that instead.
195        ### NOTE: The DataFrame must be omitted for the plugin sync method to apply.
196        ### If a DataFrame is provided, continue as expected.
197        if hasattr(df, 'MRSM_INFER_FETCH'):
198            try:
199                if p.connector is None:
200                    if ':' not in p.connector_keys:
201                        return True, f"{p} does not support fetching; nothing to do."
202
203                    msg = f"{p} does not have a valid connector."
204                    if p.connector_keys.startswith('plugin:'):
205                        msg += f"\n    Perhaps {p.connector_keys} has a syntax error?"
206                    p._exists = None
207                    return False, msg
208            except Exception:
209                p._exists = None
210                return False, f"Unable to create the connector for {p}."
211
212            ### Sync in place if this is a SQL pipe.
213            if (
214                str(self.connector) == str(self.instance_connector)
215                and 
216                hasattr(self.instance_connector, 'sync_pipe_inplace')
217                and
218                _inplace
219                and
220                get_config('system', 'experimental', 'inplace_sync')
221            ):
222                with Venv(get_connector_plugin(self.instance_connector)):
223                    p._exists = None
224                    _args, _kwargs = filter_arguments(
225                        p.instance_connector.sync_pipe_inplace,
226                        p,
227                        debug=debug,
228                        **kw
229                    )
230                    return self.instance_connector.sync_pipe_inplace(
231                        *_args,
232                        **_kwargs
233                    )
234
235            ### Activate and invoke `sync(pipe)` for plugin connectors with `sync` methods.
236            try:
237                if getattr(p.connector, 'sync', None) is not None:
238                    with Venv(get_connector_plugin(p.connector), debug=debug):
239                        _args, _kwargs = filter_arguments(
240                            p.connector.sync,
241                            p,
242                            debug=debug,
243                            **kw
244                        )
245                        return_tuple = p.connector.sync(*_args, **_kwargs)
246                    p._exists = None
247                    if not isinstance(return_tuple, tuple):
248                        return_tuple = (
249                            False,
250                            f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}"
251                        )
252                    return return_tuple
253
254            except Exception as e:
255                get_console().print_exception()
256                msg = f"Failed to sync {p} with exception: '" + str(e) + "'"
257                if debug:
258                    error(msg, silent=False)
259                p._exists = None
260                return False, msg
261
262            ### Fetch the dataframe from the connector's `fetch()` method.
263            try:
264                with Venv(get_connector_plugin(p.connector), debug=debug):
265                    df = p.fetch(
266                        **filter_keywords(
267                            p.fetch,
268                            debug=debug,
269                            **kw
270                        )
271                    )
272            except Exception as e:
273                get_console().print_exception(
274                    suppress=[
275                        'meerschaum/core/Pipe/_sync.py',
276                        'meerschaum/core/Pipe/_fetch.py',
277                    ]
278                )
279                msg = f"Failed to fetch data from {p.connector}:\n    {e}"
280                df = None
281
282            if df is None:
283                p._exists = None
284                return False, f"No data were fetched for {p}."
285
286            if isinstance(df, list):
287                if len(df) == 0:
288                    return True, f"No new rows were returned for {p}."
289
290                ### May be a chunk hook results list.
291                if isinstance(df[0], tuple):
292                    success = all([_success for _success, _ in df])
293                    message = '\n'.join([_message for _, _message in df])
294                    return success, message
295
296            if df is True:
297                p._exists = None
298                return True, f"{p} is being synced in parallel."
299
300        ### CHECKPOINT: Retrieved the DataFrame.
301        _checkpoint(**kw)
302
303        ### Allow for dataframe generators or iterables.
304        if df_is_chunk_generator(df):
305            kw['workers'] = p.get_num_workers(kw.get('workers', None))
306            dt_col = p.columns.get('datetime', None)
307            pool = get_pool(workers=kw.get('workers', 1))
308            if debug:
309                dprint(f"Received {type(df)}. Attempting to sync first chunk...")
310
311            try:
312                chunk = next(df)
313            except StopIteration:
314                return True, "Received an empty generator; nothing to do."
315
316            chunk_success, chunk_msg = _sync(p, chunk)
317            chunk_msg = '\n' + self._get_chunk_label(chunk, dt_col) + '\n' + chunk_msg
318            if not chunk_success:
319                return chunk_success, f"Unable to sync initial chunk for {p}:\n{chunk_msg}"
320            if debug:
321                dprint("Successfully synced the first chunk, attempting the rest...")
322
323            failed_chunks = []
324            def _process_chunk(_chunk):
325                try:
326                    _chunk_success, _chunk_msg = _sync(p, _chunk)
327                except Exception as e:
328                    _chunk_success, _chunk_msg = False, str(e)
329                if not _chunk_success:
330                    failed_chunks.append(_chunk)
331                _chunk_msg = (
332                    self._get_chunk_label(_chunk, dt_col)
333                    + '\n'
334                    + _chunk_msg
335                )
336
337                mrsm.pprint((_chunk_success, _chunk_msg), calm=True)
338                return _chunk_success, _chunk_msg
339
340            results = sorted(
341                [(chunk_success, chunk_msg)] + (
342                    list(pool.imap(_process_chunk, df))
343                    if (
344                        not df_is_chunk_generator(chunk)  # Handle nested generators.
345                        and kw.get('workers', 1) != 1
346                    )
347                    else list(
348                        _process_chunk(_child_chunks)
349                        for _child_chunks in df
350                    )
351                )
352            )
353            chunk_messages = [chunk_msg for _, chunk_msg in results]
354            success_bools = [chunk_success for chunk_success, _ in results]
355            success = all(success_bools)
356            msg = (
357                f'Synced {len(chunk_messages)} chunk'
358                + ('s' if len(chunk_messages) != 1 else '')
359                + f' to {p}:\n\n'
360                + '\n\n'.join(chunk_messages).lstrip().rstrip()
361            ).lstrip().rstrip()
362
363            ### If some chunks succeeded, retry the failures.
364            retry_success = True
365            if not success and any(success_bools):
366                if debug:
367                    dprint("Retrying failed chunks...")
368                chunks_to_retry = [c for c in failed_chunks]
369                failed_chunks = []
370                for chunk in chunks_to_retry:
371                    chunk_success, chunk_msg = _process_chunk(chunk)
372                    msg += f"\n\nRetried chunk:\n{chunk_msg}\n"
373                    retry_success = retry_success and chunk_success
374
375            success = success and retry_success
376            return success, msg
377
378        ### Cast to a dataframe and ensure datatypes are what we expect.
379        df = self.enforce_dtypes(
380            df,
381            chunksize=chunksize,
382            enforce=enforce_dtypes,
383            debug=debug,
384        )
385
386        ### Capture `numeric`, `uuid`, `json`, and `bytes` columns.
387        self._persist_new_json_columns(df, debug=debug)
388        self._persist_new_numeric_columns(df, debug=debug)
389        self._persist_new_uuid_columns(df, debug=debug)
390        self._persist_new_bytes_columns(df, debug=debug)
391
392        if debug:
393            dprint(
394                "DataFrame to sync:\n"
395                + (
396                    str(df)[:255]
397                    + '...'
398                    if len(str(df)) >= 256
399                    else str(df)
400                ),
401                **kw
402            )
403
404        ### if force, continue to sync until success
405        return_tuple = False, f"Did not sync {p}."
406        run = True
407        _retries = 1
408        while run:
409            with Venv(get_connector_plugin(self.instance_connector)):
410                return_tuple = p.instance_connector.sync_pipe(
411                    pipe=p,
412                    df=df,
413                    debug=debug,
414                    **kw
415                )
416            _retries += 1
417            run = (not return_tuple[0]) and force and _retries <= retries
418            if run and debug:
419                dprint(f"Syncing failed for {p}. Attempt ( {_retries} / {retries} )", **kw)
420                dprint(f"Sleeping for {min_seconds} seconds...", **kw)
421                time.sleep(min_seconds)
422            if _retries > retries:
423                warn(
424                    f"Unable to sync {p} within {retries} attempt" +
425                        ("s" if retries != 1 else "") + "!"
426                )
427
428        ### CHECKPOINT: Finished syncing. Handle caching.
429        _checkpoint(**kw)
430        if self.cache_pipe is not None:
431            if debug:
432                dprint("Caching retrieved dataframe.", **kw)
433            _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw)
434            if not _sync_cache_tuple[0]:
435                warn(f"Failed to sync local cache for {self}.")
436
437        self._exists = None
438        return return_tuple
439
440    if blocking:
441        self._exists = None
442        return _sync(self, df=df)
443
444    from meerschaum.utils.threading import Thread
445    def default_callback(result_tuple: SuccessTuple):
446        dprint(f"Asynchronous result from {self}: {result_tuple}", **kw)
447
448    def default_error_callback(x: Exception):
449        dprint(f"Error received for {self}: {x}", **kw)
450
451    if callback is None and debug:
452        callback = default_callback
453    if error_callback is None and debug:
454        error_callback = default_error_callback
455    try:
456        thread = Thread(
457            target=_sync,
458            args=(self,),
459            kwargs={'df': df},
460            daemon=False,
461            callback=callback,
462            error_callback=error_callback,
463        )
464        thread.start()
465    except Exception as e:
466        self._exists = None
467        return False, str(e)
468
469    self._exists = None
470    return True, f"Spawned asynchronous sync for {self}."

Fetch new data from the source and update the pipe's table with new data.

Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.

Parameters
  • df (Union[None, pd.DataFrame, Dict[str, List[Any]]], default None): An optional DataFrame to sync into the pipe. Defaults to None.
  • begin (Union[datetime, int, str, None], default ''): Optionally specify the earliest datetime to search for data.
  • end (Union[datetime, int, str, None], default None): Optionally specify the latest datetime to search for data.
  • force (bool, default False): If True, keep retrying the sync for up to retries attempts.
  • retries (int, default 10): If force, how many attempts to try syncing before declaring failure.
  • min_seconds (Union[int, float], default 1): If force, how many seconds to sleep between retries. Defaults to 1.
  • check_existing (bool, default True): If True, pull and diff with existing data from the pipe.
  • enforce_dtypes (bool, default True): If True, enforce dtypes on incoming data. Set this to False if the incoming rows are expected to be of the correct dtypes.
  • blocking (bool, default True): If True, wait for the sync to finish and return its result; otherwise sync asynchronously and return success immediately. Defaults to True. Only intended for specific scenarios.
  • workers (Optional[int], default None): If provided and the instance connector is thread-safe (pipe.instance_connector.IS_THREAD_SAFE is True), limit concurrent sync to this many threads.
  • callback (Optional[Callable[[Tuple[bool, str]], Any]], default None): Callback function which expects a SuccessTuple as input. Only applies when blocking=False.
  • error_callback (Optional[Callable[[Exception], Any]], default None): Callback function which expects an Exception as input. Only applies when blocking=False.
  • chunksize (int, default -1): Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction.
  • sync_chunks (bool, default True): If possible, sync chunks while fetching them into memory.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
Returns
  • A SuccessTuple of success (bool) and message (str).
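
For example, sync a list of documents directly into a temporary pipe (assuming the 'sql:temp' connector from the examples above):

from datetime import datetime, timezone
import meerschaum as mrsm

pipe = mrsm.Pipe(
    'foo', 'bar',
    instance='sql:temp',
    temporary=True,
    columns={'datetime': 'ts', 'id': 'id'},
)
docs = [
    {'ts': datetime(2024, 1, 1, tzinfo=timezone.utc), 'id': 1, 'vl': 10.0},
    {'ts': datetime(2024, 1, 1, tzinfo=timezone.utc), 'id': 2, 'vl': 20.0},
]
success, msg = pipe.sync(docs)
print(success)
# True
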
def get_sync_time( self, params: Optional[Dict[str, Any]] = None, newest: bool = True, apply_backtrack_interval: bool = False, round_down: bool = False, debug: bool = False) -> Union[datetime.datetime, int, NoneType]:
473def get_sync_time(
474    self,
475    params: Optional[Dict[str, Any]] = None,
476    newest: bool = True,
477    apply_backtrack_interval: bool = False,
478    round_down: bool = False,
479    debug: bool = False
480) -> Union['datetime', int, None]:
481    """
482    Get the most recent datetime value for a Pipe.
483
484    Parameters
485    ----------
486    params: Optional[Dict[str, Any]], default None
487        Dictionary to build a WHERE clause for a specific column.
488        See `meerschaum.utils.sql.build_where`.
489
490    newest: bool, default True
491        If `True`, get the most recent datetime (honoring `params`).
492        If `False`, get the oldest datetime (`ASC` instead of `DESC`).
493
494    apply_backtrack_interval: bool, default False
495        If `True`, subtract the backtrack interval from the sync time.
496
497    round_down: bool, default False
498        If `True`, round down the datetime value to the nearest minute.
499
500    debug: bool, default False
501        Verbosity toggle.
502
503    Returns
504    -------
505    A `datetime` or int, if the pipe exists, otherwise `None`.
506
507    """
508    from meerschaum.utils.venv import Venv
509    from meerschaum.connectors import get_connector_plugin
510    from meerschaum.utils.misc import round_time
511
512    if not self.columns.get('datetime', None):
513        return None
514
515    with Venv(get_connector_plugin(self.instance_connector)):
516        sync_time = self.instance_connector.get_sync_time(
517            self,
518            params=params,
519            newest=newest,
520            debug=debug,
521        )
522
523    if round_down and isinstance(sync_time, datetime):
524        sync_time = round_time(sync_time, timedelta(minutes=1))
525
526    if apply_backtrack_interval and sync_time is not None:
527        backtrack_interval = self.get_backtrack_interval(debug=debug)
528        try:
529            sync_time -= backtrack_interval
530        except Exception as e:
531            warn(f"Failed to apply backtrack interval:\n{e}")
532
533    return self.parse_date_bounds(sync_time)

Get the most recent datetime value for a Pipe.

Parameters
  • params (Optional[Dict[str, Any]], default None): Dictionary to build a WHERE clause for a specific column. See meerschaum.utils.sql.build_where.
  • newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
  • apply_backtrack_interval (bool, default False): If True, subtract the backtrack interval from the sync time.
  • round_down (bool, default False): If True, round down the datetime value to the nearest minute.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A datetime or int, if the pipe exists, otherwise None.
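
Continuing the sketch from the sync example above:

sync_time = pipe.get_sync_time()
print(sync_time)
# e.g. 2024-01-01 00:00:00+00:00

oldest = pipe.get_sync_time(newest=False)
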
def exists(self, debug: bool = False) -> bool:
536def exists(
537    self,
538    debug: bool = False
539) -> bool:
540    """
541    See if a Pipe's table exists.
542
543    Parameters
544    ----------
545    debug: bool, default False
546        Verbosity toggle.
547
548    Returns
549    -------
550    A `bool` corresponding to whether a pipe's underlying table exists.
551
552    """
553    import time
554    from meerschaum.utils.venv import Venv
555    from meerschaum.connectors import get_connector_plugin
556    from meerschaum.config import STATIC_CONFIG
557    from meerschaum.utils.debug import dprint
558    now = time.perf_counter()
559    exists_timeout_seconds = STATIC_CONFIG['pipes']['exists_timeout_seconds']
560
561    _exists = self.__dict__.get('_exists', None)
562    if _exists:
563        exists_timestamp = self.__dict__.get('_exists_timestamp', None)
564        if exists_timestamp is not None:
565            delta = now - exists_timestamp
566            if delta < exists_timeout_seconds:
567                if debug:
568                    dprint(f"Returning cached `exists` for {self} ({round(delta, 2)} seconds old).")
569                return _exists
570
571    with Venv(get_connector_plugin(self.instance_connector)):
572        _exists = (
573            self.instance_connector.pipe_exists(pipe=self, debug=debug)
574            if hasattr(self.instance_connector, 'pipe_exists')
575            else False
576        )
577
578    self.__dict__['_exists'] = _exists
579    self.__dict__['_exists_timestamp'] = now
580    return _exists

See if a Pipe's table exists.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool corresponding to whether a pipe's underlying table exists.
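
For example (results are cached briefly, per the exists_timeout_seconds setting):

import meerschaum as mrsm

pipe = mrsm.Pipe('foo', 'bar', instance='sql:temp', temporary=True)
print(pipe.exists())
# True if the pipe's target table exists on 'sql:temp', otherwise False
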
def filter_existing( self, df: pandas.core.frame.DataFrame, safe_copy: bool = True, date_bound_only: bool = False, include_unchanged_columns: bool = False, enforce_dtypes: bool = False, chunksize: Optional[int] = -1, debug: bool = False, **kw) -> Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]: