meerschaum

Meerschaum banner

Meerschaum Python API

Welcome to the Meerschaum Python API technical documentation! Here you can find information about the classes and functions provided by the meerschaum package. Visit meerschaum.io for general usage documentation.

Root Module

For your convenience, the following classes and functions may be imported from the root meerschaum namespace: Pipe, Plugin, Venv, Connector, SuccessTuple, get_pipes, get_connector, get_config, make_connector, pprint, and attempt_import, as well as the submodules actions, config, connectors, plugins, and utils.

Examples

Build a Connector

import meerschaum as mrsm

sql_conn = mrsm.get_connector(
    'sql:temp',
    flavor='sqlite',
    database='/tmp/tmp.db',
)
df = sql_conn.read("SELECT 1 AS foo")
print(df)
#    foo
# 0    1

sql_conn.to_sql(df, 'foo')
print(sql_conn.read('foo'))
#    foo
# 0    1

Create a Custom Connector Class

from datetime import datetime, timezone
from random import randint
import meerschaum as mrsm
from meerschaum.utils.misc import round_time

@mrsm.make_connector
class FooConnector(mrsm.Connector):
    REQUIRED_ATTRIBUTES = ['username', 'password']

    def fetch(
        self,
        begin: datetime | None = None,
        end: datetime | None = None,
    ):
        now = begin or round_time(datetime.now(timezone.utc))
        return [
            {'ts': now, 'id': 1, 'vl': randint(1, 100)},
            {'ts': now, 'id': 2, 'vl': randint(1, 100)},
            {'ts': now, 'id': 3, 'vl': randint(1, 100)},
        ]

foo_conn = mrsm.get_connector(
    'foo:bar',
    username='foo',
    password='bar',
)
docs = foo_conn.fetch()

Build a Pipe

from datetime import datetime
import meerschaum as mrsm

pipe = mrsm.Pipe(
    foo_conn, 'demo',
    instance=sql_conn,
    columns={'datetime': 'ts', 'id': 'id'},
    tags=['production'],
)
pipe.sync(begin=datetime(2024, 1, 1))
df = pipe.get_data()
print(df)
#           ts  id  vl
# 0 2024-01-01   1  97
# 1 2024-01-01   2  18
# 2 2024-01-01   3  96

Get Registered Pipes

import meerschaum as mrsm

pipes = mrsm.get_pipes(
    tags=['production'],
    instance=sql_conn,
    as_list=True,
)
print(pipes)
# [Pipe('foo:bar', 'demo', instance='sql:temp')]

Access a Plugin's Module

import meerschaum as mrsm

plugin = mrsm.Plugin('noaa')
with mrsm.Venv(plugin):
    noaa = plugin.module
    print(noaa.get_station_info('KGMU'))
# {'name': 'Greenville Downtown Airport', 'geometry': {'type': 'Point', 'coordinates': [-82.35004, 34.84873]}}

Submodules

meerschaum.actions
Access functions for actions and subactions.
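
For example, actions may be invoked programmatically. The snippet below is a minimal sketch that assumes the registry is exposed as the meerschaum.actions.actions dictionary and that each action accepts a list of arguments and returns a SuccessTuple:

import meerschaum as mrsm
from meerschaum.actions import actions  # assumed dictionary of action functions

success, message = actions['show'](['pipes'])
mrsm.pprint((success, message))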

meerschaum.config
Read and write the Meerschaum configuration registry.
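
For example, configuration values may be read by key with get_config (also re-exported at the root):

import meerschaum as mrsm

instance_keys = mrsm.get_config('meerschaum', 'instance')
print(instance_keys)
# sql:main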

meerschaum.connectors
Build connectors to interact with databases and fetch data.
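
For example, calling get_connector() with no arguments returns the connector for the configured default instance, and the type and label may be passed either separately or as a single 'type:label' string (as in the Build a Connector example above):

import meerschaum as mrsm

instance_conn = mrsm.get_connector()

# Equivalent ways to build (or reuse) the same SQLite connector:
sql_conn = mrsm.get_connector('sql', 'temp', flavor='sqlite', database='/tmp/tmp.db')
sql_conn = mrsm.get_connector('sql:temp', flavor='sqlite', database='/tmp/tmp.db')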

meerschaum.plugins
Access plugin modules and other API utilities.
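
For example, a plugin module may register new actions. Below is a hypothetical sketch of a plugin file placed in the Meerschaum plugins directory; make_action is assumed to be the decorator exported by meerschaum.plugins for registering actions:

# Hypothetical plugin module, e.g. `example.py`.
from meerschaum.plugins import make_action  # assumed decorator

__version__ = '0.0.1'

@make_action
def sing(**kwargs):
    # Custom actions return a SuccessTuple of (success, message).
    return True, "Success"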

meerschaum.utils
Utility functions are available in several submodules, such as meerschaum.utils.misc, meerschaum.utils.packages, meerschaum.utils.warnings, meerschaum.utils.debug, meerschaum.utils.sql, and meerschaum.utils.formatting.
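
A few of the utilities that appear elsewhere on this page, as a quick illustration:

from datetime import datetime, timezone
from meerschaum.utils.misc import round_time
from meerschaum.utils.packages import attempt_import
from meerschaum.utils.formatting import pprint

# Import a package lazily (it may be installed into a Meerschaum virtual environment if missing).
pd = attempt_import('pandas')

# Round a timestamp with the function's defaults (as in the FooConnector example above).
print(round_time(datetime.now(timezone.utc)))

pprint({'foo': 'bar'})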

 1#! /usr/bin/env python
 2# -*- coding: utf-8 -*-
 3# vim:fenc=utf-8
 4
 5"""
 6Copyright 2023 Bennett Meares
 7
 8Licensed under the Apache License, Version 2.0 (the "License");
 9you may not use this file except in compliance with the License.
10You may obtain a copy of the License at
11
12   http://www.apache.org/licenses/LICENSE-2.0
13
14Unless required by applicable law or agreed to in writing, software
15distributed under the License is distributed on an "AS IS" BASIS,
16WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17See the License for the specific language governing permissions and
18limitations under the License.
19"""
20
21import atexit
22from meerschaum.utils.typing import SuccessTuple
23from meerschaum.core.Pipe import Pipe
24from meerschaum.plugins import Plugin
25from meerschaum.utils.venv import Venv
26from meerschaum.connectors import get_connector, Connector, make_connector
27from meerschaum.utils import get_pipes
28from meerschaum.utils.formatting import pprint
29from meerschaum._internal.docs import index as __doc__
30from meerschaum.config import __version__, get_config
31from meerschaum.utils.packages import attempt_import
32from meerschaum.__main__ import _close_pools
33
34atexit.register(_close_pools)
35
36__pdoc__ = {'gui': False, 'api': False, 'core': False, '_internal': False}
37__all__ = (
38    "get_pipes",
39    "get_connector",
40    "get_config",
41    "Pipe",
42    "Plugin",
43    "Venv",
44    "Plugin",
45    "pprint",
46    "attempt_import",
47    "actions",
48    "config",
49    "connectors",
50    "plugins",
51    "utils",
52    "SuccessTuple",
53    "Connector",
54    "make_connector",
55)
def get_pipes( connector_keys: Union[str, List[str], NoneType] = None, metric_keys: Union[str, List[str], NoneType] = None, location_keys: Union[str, List[str], NoneType] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, mrsm_instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, as_list: bool = False, method: str = 'registered', wait: bool = False, debug: bool = False, **kw: Any) -> Union[Dict[str, Dict[str, Dict[str, Pipe]]], List[Pipe]]:
 19def get_pipes(
 20        connector_keys: Union[str, List[str], None] = None,
 21        metric_keys: Union[str, List[str], None] = None,
 22        location_keys: Union[str, List[str], None] = None,
 23        tags: Optional[List[str]] = None,
 24        params: Optional[Dict[str, Any]] = None,
 25        mrsm_instance: Union[str, InstanceConnector, None] = None,
 26        instance: Union[str, InstanceConnector, None] = None,
 27        as_list: bool = False,
 28        method: str = 'registered',
 29        wait: bool = False,
 30        debug: bool = False,
 31        **kw: Any
 32    ) -> Union[PipesDict, List[mrsm.Pipe]]:
 33    """
 34    Return a dictionary or list of `meerschaum.Pipe` objects.
 35
 36    Parameters
 37    ----------
 38    connector_keys: Union[str, List[str], None], default None
 39        String or list of connector keys.
 40        If omitted or is `'*'`, fetch all possible keys.
 41        If a string begins with `'_'`, select keys that do NOT match the string.
 42
 43    metric_keys: Union[str, List[str], None], default None
 44        String or list of metric keys. See `connector_keys` for formatting.
 45
 46    location_keys: Union[str, List[str], None], default None
 47        String or list of location keys. See `connector_keys` for formatting.
 48
 49    tags: Optional[List[str]], default None
 50         If provided, only include pipes with these tags.
 51
 52    params: Optional[Dict[str, Any]], default None
 53        Dictionary of additional parameters to search by.
 54        Params are parsed into a SQL WHERE clause.
 55        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`
 56
 57    mrsm_instance: Union[str, InstanceConnector, None], default None
 58        Connector keys for the Meerschaum instance of the pipes.
 59        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
 60        `meerschaum.connectors.api.APIConnector.APIConnector`.
 61        
 62    as_list: bool, default False
 63        If `True`, return pipes in a list instead of a hierarchical dictionary.
 64        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
 65        `True`  : `[Pipe]`
 66
 67    method: str, default 'registered'
 68        Available options: `['registered', 'explicit', 'all']`
 69        If `'registered'` (default), create pipes based on registered keys in the connector's pipes table
 70        (API or SQL connector, depends on mrsm_instance).
 71        If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys
 72        instead of consulting the pipes table. Useful for creating non-existent pipes.
 73        If `'all'`, create pipes from predefined metrics and locations. Requires `connector_keys`.
 74        **NOTE:** Method `'all'` is not implemented!
 75
 76    wait: bool, default False
 77        Wait for a connection before getting Pipes. Should only be true for cases where the
 78        database might not be running (like the API).
 79
 80    **kw: Any:
 81        Keyword arguments to pass to the `meerschaum.Pipe` constructor.
 82        
 83
 84    Returns
 85    -------
 86    A dictionary of dictionaries and `meerschaum.Pipe` objects
 87    in the connector, metric, location hierarchy.
 88    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.
 89
 90    Examples
 91    --------
 92    ```
 93    >>> ### Manual definition:
 94    >>> pipes = {
 95    ...     <connector_keys>: {
 96    ...         <metric_key>: {
 97    ...             <location_key>: Pipe(
 98    ...                 <connector_keys>,
 99    ...                 <metric_key>,
100    ...                 <location_key>,
101    ...             ),
102    ...         },
103    ...     },
104    ... },
105    >>> ### Accessing a single pipe:
106    >>> pipes['sql:main']['weather'][None]
107    >>> ### Return a list instead:
108    >>> get_pipes(as_list=True)
109    [sql_main_weather]
110    >>> 
111    ```
112    """
113
114    from meerschaum.config import get_config
115    from meerschaum.utils.warnings import error
116    from meerschaum.utils.misc import filter_keywords
117
118    if connector_keys is None:
119        connector_keys = []
120    if metric_keys is None:
121        metric_keys = []
122    if location_keys is None:
123        location_keys = []
124    if params is None:
125        params = {}
126    if tags is None:
127        tags = []
128
129    if isinstance(connector_keys, str):
130        connector_keys = [connector_keys]
131    if isinstance(metric_keys, str):
132        metric_keys = [metric_keys]
133    if isinstance(location_keys, str):
134        location_keys = [location_keys]
135
136    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
137    ### If `wait`, wait until a connection is made
138    if mrsm_instance is None:
139        mrsm_instance = instance
140    if mrsm_instance is None:
141        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
142    if isinstance(mrsm_instance, str):
143        from meerschaum.connectors.parse import parse_instance_keys
144        connector = parse_instance_keys(keys=mrsm_instance, wait=wait, debug=debug)
145    else: ### NOTE: mrsm_instance MUST be a SQL or API connector for this to work
146        from meerschaum.connectors import instance_types
147        valid_connector = False
148        if hasattr(mrsm_instance, 'type'):
149            if mrsm_instance.type in instance_types:
150                valid_connector = True
151        if not valid_connector:
152            error(f"Invalid instance connector: {mrsm_instance}")
153        connector = mrsm_instance
154    if debug:
155        from meerschaum.utils.debug import dprint
156        dprint(f"Using instance connector: {connector}")
157    if not connector:
158        error(f"Could not create connector from keys: '{mrsm_instance}'")
159
160    ### Get a list of tuples for the keys needed to build pipes.
161    result = fetch_pipes_keys(
162        method,
163        connector,
164        connector_keys = connector_keys,
165        metric_keys = metric_keys,
166        location_keys = location_keys,
167        tags = tags,
168        params = params,
169        debug = debug
170    )
171    if result is None:
172        error(f"Unable to build pipes!")
173
174    ### Populate the `pipes` dictionary with Pipes based on the keys
175    ### obtained from the chosen `method`.
176    from meerschaum import Pipe
177    pipes = {}
178    for ck, mk, lk in result:
179        if ck not in pipes:
180            pipes[ck] = {}
181
182        if mk not in pipes[ck]:
183            pipes[ck][mk] = {}
184
185        pipes[ck][mk][lk] = Pipe(
186            ck, mk, lk,
187            mrsm_instance = connector,
188            debug = debug,
189            **filter_keywords(Pipe, **kw)
190        )
191
192    if not as_list:
193        return pipes
194    from meerschaum.utils.misc import flatten_pipes_dict
195    return flatten_pipes_dict(pipes)

Return a dictionary or list of Pipe objects.

Parameters
  • connector_keys (Union[str, List[str], None], default None): String or list of connector keys. If omitted or is '*', fetch all possible keys. If a string begins with '_', select keys that do NOT match the string.
  • metric_keys (Union[str, List[str], None], default None): String or list of metric keys. See connector_keys for formatting.
  • location_keys (Union[str, List[str], None], default None): String or list of location keys. See connector_keys for formatting.
  • tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. Params are parsed into a SQL WHERE clause. E.g. {'a': 1, 'b': 2} equates to 'WHERE a = 1 AND b = 2'
  • mrsm_instance (Union[str, InstanceConnector, None], default None): Connector keys for the Meerschaum instance of the pipes. Must be a meerschaum.connectors.SQLConnector or meerschaum.connectors.APIConnector.
  • as_list (bool, default False): If True, return pipes in a list instead of a hierarchical dictionary. False: {connector_keys: {metric_key: {location_key: Pipe}}}; True: [Pipe]
  • method (str, default 'registered'): Available options: ['registered', 'explicit', 'all']. If 'registered' (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depending on mrsm_instance). If 'explicit', create pipes from the provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If 'all', create pipes from predefined metrics and locations. Requires connector_keys. NOTE: Method 'all' is not implemented!
  • wait (bool, default False): Wait for a connection before getting Pipes. Should only be true for cases where the database might not be running (like the API).
  • **kw (Any): Keyword arguments to pass to the Pipe constructor.
Returns
  • A dictionary of dictionaries and Pipe objects in the connector, metric, location hierarchy. If as_list is True, return a list of Pipe objects.
Examples
>>> ### Manual definition:
>>> pipes = {
...     <connector_keys>: {
...         <metric_key>: {
...             <location_key>: Pipe(
...                 <connector_keys>,
...                 <metric_key>,
...                 <location_key>,
...             ),
...         },
...     },
... },
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[sql_main_weather]
>>> 
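
As a further illustration, method='explicit' builds pipes directly from the provided keys without consulting the pipes table (shown here with the foo:bar connector and sql:temp instance from the examples above):

>>> pipes = get_pipes(
...     connector_keys='foo:bar',
...     metric_keys='demo',
...     method='explicit',
...     instance='sql:temp',
...     as_list=True,
... )
>>> pipes
[Pipe('foo:bar', 'demo', instance='sql:temp')]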
def get_connector( type: str = None, label: str = None, refresh: bool = False, debug: bool = False, **kw: Any) -> Connector:
 73def get_connector(
 74        type: str = None,
 75        label: str = None,
 76        refresh: bool = False,
 77        debug: bool = False,
 78        **kw: Any
 79    ) -> Connector:
 80    """
 81    Return existing connector or create new connection and store for reuse.
 82    
 83    You can create new connectors if enough parameters are provided for the given type and flavor.
 84    
 85
 86    Parameters
 87    ----------
 88    type: Optional[str], default None
 89        Connector type (sql, api, etc.).
 90        Defaults to the type of the configured `instance_connector`.
 91
 92    label: Optional[str], default None
 93        Connector label (e.g. main). Defaults to `'main'`.
 94
 95    refresh: bool, default False
 96        Refresh the Connector instance / construct new object. Defaults to `False`.
 97
 98    kw: Any
 99        Other arguments to pass to the Connector constructor.
100        If the Connector has already been constructed and new arguments are provided,
101        `refresh` is set to `True` and the old Connector is replaced.
102
103    Returns
104    -------
105    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
106    `meerschaum.connectors.sql.SQLConnector`).
107    
108    Examples
109    --------
110    The following parameters would create a new
111    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.
112
113    ```
114    >>> conn = get_connector(
115    ...     type = 'sql',
116    ...     label = 'newlabel',
117    ...     flavor = 'sqlite',
118    ...     database = '/file/path/to/database.db'
119    ... )
120    >>>
121    ```
122
123    """
124    from meerschaum.connectors.parse import parse_instance_keys
125    from meerschaum.config import get_config
126    from meerschaum.config.static import STATIC_CONFIG
127    from meerschaum.utils.warnings import warn
128    global _loaded_plugin_connectors
129    if isinstance(type, str) and not label and ':' in type:
130        type, label = type.split(':', maxsplit=1)
131    with _locks['_loaded_plugin_connectors']:
132        if not _loaded_plugin_connectors:
133            load_plugin_connectors()
134            _loaded_plugin_connectors = True
135    if type is None and label is None:
136        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
137        ### recursive call to get_connector
138        return parse_instance_keys(default_instance_keys)
139
140    ### NOTE: the default instance connector may not be main.
141    ### Only fall back to 'main' if the type is provided but the label is omitted.
142    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']
143
144    ### type might actually be a label. Check if so and raise a warning.
145    if type not in connectors:
146        possibilities, poss_msg = [], ""
147        for _type in get_config('meerschaum', 'connectors'):
148            if type in get_config('meerschaum', 'connectors', _type):
149                possibilities.append(f"{_type}:{type}")
150        if len(possibilities) > 0:
151            poss_msg = " Did you mean"
152            for poss in possibilities[:-1]:
153                poss_msg += f" '{poss}',"
154            if poss_msg.endswith(','):
155                poss_msg = poss_msg[:-1]
156            if len(possibilities) > 1:
157                poss_msg += " or"
158            poss_msg += f" '{possibilities[-1]}'?"
159
160        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
161        return None
162
163    if 'sql' not in types:
164        from meerschaum.connectors.plugin import PluginConnector
165        with _locks['types']:
166            types.update({
167                'api'   : APIConnector,
168                'sql'   : SQLConnector,
169                'plugin': PluginConnector,
170            })
171    
172    ### determine if we need to call the constructor
173    if not refresh:
174        ### see if any user-supplied arguments differ from the existing instance
175        if label in connectors[type]:
176            warning_message = None
177            for attribute, value in kw.items():
178                if attribute not in connectors[type][label].meta:
179                    import inspect
180                    cls = connectors[type][label].__class__
181                    cls_init_signature = inspect.signature(cls)
182                    cls_init_params = cls_init_signature.parameters
183                    if attribute not in cls_init_params:
184                        warning_message = (
185                            f"Received new attribute '{attribute}' not present in connector " +
186                            f"{connectors[type][label]}.\n"
187                        )
188                elif connectors[type][label].__dict__[attribute] != value:
189                    warning_message = (
190                        f"Mismatched values for attribute '{attribute}' in connector "
191                        + f"'{connectors[type][label]}'.\n" +
192                        f"  - Keyword value: '{value}'\n" +
193                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
194                    )
195            if warning_message is not None:
196                warning_message += (
197                    "\nSetting `refresh` to True and recreating connector with type:"
198                    + f" '{type}' and label '{label}'."
199                )
200                refresh = True
201                warn(warning_message)
202        else: ### connector doesn't yet exist
203            refresh = True
204
205    ### only create an object if refresh is True
206    ### (can be manually specified, otherwise determined above)
207    if refresh:
208        with _locks['connectors']:
209            try:
210                ### will raise an error if configuration is incorrect / missing
211                conn = types[type](label=label, **kw)
212                connectors[type][label] = conn
213            except InvalidAttributesError as ie:
214                warn(
215                    f"Incorrect attributes for connector '{type}:{label}'.\n"
216                    + str(ie),
217                    stack = False,
218                )
219                conn = None
220            except Exception as e:
221                from meerschaum.utils.formatting import get_console
222                console = get_console()
223                if console:
224                    console.print_exception()
225                warn(
226                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
227                    stack = False,
228                )
229                conn = None
230        if conn is None:
231            return None
232
233    return connectors[type][label]

Return existing connector or create new connection and store for reuse.

You can create new connectors if enough parameters are provided for the given type and flavor.

Parameters
  • type (Optional[str], default None): Connector type (sql, api, etc.). Defaults to the type of the configured instance_connector.
  • label (Optional[str], default None): Connector label (e.g. main). Defaults to 'main'.
  • refresh (bool, default False): Refresh the Connector instance / construct new object. Defaults to False.
  • kw (Any): Other arguments to pass to the Connector constructor. If the Connector has already been constructed and new arguments are provided, refresh is set to True and the old Connector is replaced.
Returns
  • A new Meerschaum connector (e.g. meerschaum.connectors.APIConnector or meerschaum.connectors.SQLConnector).
Examples

The following parameters would create a new meerschaum.connectors.SQLConnector that isn't in the configuration file.

>>> conn = get_connector(
...     type = 'sql',
...     label = 'newlabel',
...     flavor = 'sqlite',
...     database = '/file/path/to/database.db'
... )
>>>
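
As in the Build a Connector example above, the type and label may also be given as a single 'type:label' string (the source splits on the first colon). Per the kw documentation, supplying new arguments for an existing connector sets refresh to True and replaces it:

>>> conn = get_connector('sql:newlabel', flavor='sqlite', database='/file/path/to/database.db')
>>> ### Rebuild the connector explicitly (a warning is printed if existing attributes differ):
>>> conn = get_connector('sql:newlabel', flavor='sqlite', database='/file/path/to/database.db', refresh=True)
>>>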
def get_config( *keys: str, patch: bool = True, substitute: bool = True, sync_files: bool = True, write_missing: bool = True, as_tuple: bool = False, warn: bool = True, debug: bool = False) -> Any:
 82def get_config(
 83        *keys: str,
 84        patch: bool = True,
 85        substitute: bool = True,
 86        sync_files: bool = True,
 87        write_missing: bool = True,
 88        as_tuple: bool = False,
 89        warn: bool = True,
 90        debug: bool = False
 91    ) -> Any:
 92    """
 93    Return the Meerschaum configuration dictionary.
 94    If positional arguments are provided, index by the keys.
 95    Raises a warning if invalid keys are provided.
 96
 97    Parameters
 98    ----------
 99    keys: str:
100        List of strings to index.
101
102    patch: bool, default True
103        If `True`, patch missing default keys into the config directory.
104        Defaults to `True`.
105
106    sync_files: bool, default True
107        If `True`, sync files if needed.
108        Defaults to `True`.
109
110    write_missing: bool, default True
111        If `True`, write default values when the main config files are missing.
112        Defaults to `True`.
113
114    substitute: bool, default True
 115        If `True`, substitute 'MRSM{}' values.
116        Defaults to `True`.
117
118    as_tuple: bool, default False
119        If `True`, return a tuple of type (success, value).
120        Defaults to `False`.
121        
122    Returns
123    -------
124    The value in the configuration directory, indexed by the provided keys.
125
126    Examples
127    --------
128    >>> get_config('meerschaum', 'instance')
129    'sql:main'
130    >>> get_config('does', 'not', 'exist')
131    UserWarning: Invalid keys in config: ('does', 'not', 'exist')
132    """
133    import json
134
135    symlinks_key = STATIC_CONFIG['config']['symlinks_key']
136    if debug:
137        from meerschaum.utils.debug import dprint
138        dprint(f"Indexing keys: {keys}", color=False)
139
140    if len(keys) == 0:
141        _rc = _config(substitute=substitute, sync_files=sync_files, write_missing=write_missing)
142        if as_tuple:
143            return True, _rc 
144        return _rc
145    
146    ### Weird threading issues, only import if substitute is True.
147    if substitute:
148        from meerschaum.config._read_config import search_and_substitute_config
149    ### Invalidate the cache if it was read before with substitute=False
150    ### but there still exist substitutions.
151    if (
152        config is not None and substitute and keys[0] != symlinks_key
153        and 'MRSM{' in json.dumps(config.get(keys[0]))
154    ):
155        try:
156            _subbed = search_and_substitute_config({keys[0]: config[keys[0]]})
157        except Exception as e:
158            import traceback
159            traceback.print_exc()
160        config[keys[0]] = _subbed[keys[0]]
161        if symlinks_key in _subbed:
162            if symlinks_key not in config:
163                config[symlinks_key] = {}
164            if keys[0] not in config[symlinks_key]:
165                config[symlinks_key][keys[0]] = {}
166            config[symlinks_key][keys[0]] = apply_patch_to_config(
167                _subbed,
168                config[symlinks_key][keys[0]]
169            )
170
171    from meerschaum.config._sync import sync_files as _sync_files
172    if config is None:
173        _config(*keys, sync_files=sync_files)
174
175    invalid_keys = False
176    if keys[0] not in config and keys[0] != symlinks_key:
177        single_key_config = read_config(
178            keys=[keys[0]], substitute=substitute, write_missing=write_missing
179        )
180        if keys[0] not in single_key_config:
181            invalid_keys = True
182        else:
183            config[keys[0]] = single_key_config.get(keys[0], None)
184            if symlinks_key in single_key_config and keys[0] in single_key_config[symlinks_key]:
185                if symlinks_key not in config:
186                    config[symlinks_key] = {}
187                config[symlinks_key][keys[0]] = single_key_config[symlinks_key][keys[0]]
188
189            if sync_files:
190                _sync_files(keys=[keys[0]])
191
192    c = config
193    if len(keys) > 0:
194        for k in keys:
195            try:
196                c = c[k]
197            except Exception as e:
198                invalid_keys = True
199                break
200        if invalid_keys:
201            ### Check if the keys are in the default configuration.
202            from meerschaum.config._default import default_config
203            in_default = True
204            patched_default_config = (
205                search_and_substitute_config(default_config)
206                if substitute else copy.deepcopy(default_config)
207            )
208            _c = patched_default_config
209            for k in keys:
210                try:
211                    _c = _c[k]
212                except Exception as e:
213                    in_default = False
214            if in_default:
215                c = _c
216                invalid_keys = False
217            warning_msg = f"Invalid keys in config: {keys}"
218            if not in_default:
219                try:
220                    if warn:
221                        from meerschaum.utils.warnings import warn as _warn
222                        _warn(warning_msg, stacklevel=3, color=False)
223                except Exception as e:
224                    if warn:
225                        print(warning_msg)
226                if as_tuple:
227                    return False, None
228                return None
229
230            ### Don't write keys that we haven't yet loaded into memory.
231            not_loaded_keys = [k for k in patched_default_config if k not in config]
232            for k in not_loaded_keys:
233                patched_default_config.pop(k, None)
234
235            set_config(
236                apply_patch_to_config(
237                    patched_default_config,
238                    config,
239                )
240            )
241            if patch and keys[0] != symlinks_key:
242                if write_missing:
243                    write_config(config, debug=debug)
244
245    if as_tuple:
246        return (not invalid_keys), c
247    return c

Return the Meerschaum configuration dictionary. If positional arguments are provided, index by the keys. Raises a warning if invalid keys are provided.

Parameters
  • keys (str): List of strings to index.
  • patch (bool, default True): If True, patch missing default keys into the config directory. Defaults to True.
  • sync_files (bool, default True): If True, sync files if needed. Defaults to True.
  • write_missing (bool, default True): If True, write default values when the main config files are missing. Defaults to True.
  • substitute (bool, default True): If True, substitute 'MRSM{}' values. Defaults to True.
  • as_tuple (bool, default False): If True, return a tuple of type (success, value). Defaults to False.
Returns
  • The value in the configuration directory, indexed by the provided keys.
Examples
>>> get_config('meerschaum', 'instance')
'sql:main'
>>> get_config('does', 'not', 'exist')
UserWarning: Invalid keys in config: ('does', 'not', 'exist')
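
Per the as_tuple parameter above, the lookup status and value may be returned together (with warn=False to suppress the warning):

>>> get_config('meerschaum', 'instance', as_tuple=True)
(True, 'sql:main')
>>> get_config('does', 'not', 'exist', as_tuple=True, warn=False)
(False, None)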
class Pipe:
 60class Pipe:
 61    """
 62    Access Meerschaum pipes via Pipe objects.
 63    
 64    Pipes are identified by the following:
 65
 66    1. Connector keys (e.g. `'sql:main'`)
 67    2. Metric key (e.g. `'weather'`)
 68    3. Location (optional; e.g. `None`)
 69    
 70    A pipe's connector keys correspond to a data source, and when the pipe is synced,
 71    its `fetch` definition is evaluated and executed to produce new data.
 72    
 73    Alternatively, new data may be directly synced via `pipe.sync()`:
 74    
 75    ```
 76    >>> from meerschaum import Pipe
 77    >>> pipe = Pipe('csv', 'weather')
 78    >>>
 79    >>> import pandas as pd
 80    >>> df = pd.read_csv('weather.csv')
 81    >>> pipe.sync(df)
 82    ```
 83    """
 84
 85    from ._fetch import (
 86        fetch,
 87        get_backtrack_interval,
 88    )
 89    from ._data import (
 90        get_data,
 91        get_backtrack_data,
 92        get_rowcount,
 93        _get_data_as_iterator,
 94        get_chunk_interval,
 95        get_chunk_bounds,
 96    )
 97    from ._register import register
 98    from ._attributes import (
 99        attributes,
100        parameters,
101        columns,
102        dtypes,
103        get_columns,
104        get_columns_types,
105        get_indices,
106        tags,
107        get_id,
108        id,
109        get_val_column,
110        parents,
111        children,
112        target,
113        _target_legacy,
114        guess_datetime,
115    )
116    from ._show import show
117    from ._edit import edit, edit_definition, update
118    from ._sync import (
119        sync,
120        get_sync_time,
121        exists,
122        filter_existing,
123        _get_chunk_label,
124        get_num_workers,
125    )
126    from ._verify import (
127        verify,
128        get_bound_interval,
129        get_bound_time,
130    )
131    from ._delete import delete
132    from ._drop import drop
133    from ._clear import clear
134    from ._deduplicate import deduplicate
135    from ._bootstrap import bootstrap
136    from ._dtypes import enforce_dtypes, infer_dtypes
137
138    def __init__(
139        self,
140        connector: str = '',
141        metric: str = '',
142        location: Optional[str] = None,
143        parameters: Optional[Dict[str, Any]] = None,
144        columns: Union[Dict[str, str], List[str], None] = None,
145        tags: Optional[List[str]] = None,
146        target: Optional[str] = None,
147        dtypes: Optional[Dict[str, str]] = None,
148        instance: Optional[Union[str, InstanceConnector]] = None,
149        temporary: bool = False,
150        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
151        cache: bool = False,
152        debug: bool = False,
153        connector_keys: Optional[str] = None,
154        metric_key: Optional[str] = None,
155        location_key: Optional[str] = None,
156    ):
157        """
158        Parameters
159        ----------
160        connector: str
161            Keys for the pipe's source connector, e.g. `'sql:main'`.
162
163        metric: str
164            Label for the pipe's contents, e.g. `'weather'`.
165
166        location: str, default None
167            Label for the pipe's location. Defaults to `None`.
168
169        parameters: Optional[Dict[str, Any]], default None
170            Optionally set a pipe's parameters from the constructor,
171            e.g. columns and other attributes.
172            You can edit these parameters with `edit pipes`.
173
174        columns: Optional[Dict[str, str]], default None
175            Set the `columns` dictionary of `parameters`.
176            If `parameters` is also provided, this dictionary is added under the `'columns'` key.
177
178        tags: Optional[List[str]], default None
179            A list of strings to be added under the `'tags'` key of `parameters`.
180            You can select pipes with certain tags using `--tags`.
181
182        dtypes: Optional[Dict[str, str]], default None
183            Set the `dtypes` dictionary of `parameters`.
184            If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.
185
186        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
187            Connector for the Meerschaum instance where the pipe resides.
188            Defaults to the preconfigured default instance (`'sql:main'`).
189
190        instance: Optional[Union[str, InstanceConnector]], default None
191            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.
192
193        temporary: bool, default False
194            If `True`, prevent instance tables (pipes, users, plugins) from being created.
195
196        cache: bool, default False
197            If `True`, cache fetched data into a local database file.
198            Defaults to `False`.
199        """
200        from meerschaum.utils.warnings import error, warn
201        if (not connector and not connector_keys) or (not metric and not metric_key):
202            error(
203                "Please provide strings for the connector and metric\n    "
204                + "(first two positional arguments)."
205            )
206
207        ### Fall back to legacy `location_key` just in case.
208        if not location:
209            location = location_key
210
211        if not connector:
212            connector = connector_keys
213
214        if not metric:
215            metric = metric_key
216
217        if location in ('[None]', 'None'):
218            location = None
219
220        from meerschaum.config.static import STATIC_CONFIG
221        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
222        for k in (connector, metric, location, *(tags or [])):
223            if str(k).startswith(negation_prefix):
224                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")
225
226        self.connector_keys = str(connector)
227        self.connector_key = self.connector_keys ### Alias
228        self.metric_key = metric
229        self.location_key = location
230        self.temporary = temporary
231
232        self._attributes = {
233            'connector_keys': self.connector_keys,
234            'metric_key': self.metric_key,
235            'location_key': self.location_key,
236            'parameters': {},
237        }
238
239        ### only set parameters if values are provided
240        if isinstance(parameters, dict):
241            self._attributes['parameters'] = parameters
242        else:
243            if parameters is not None:
244                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
245            self._attributes['parameters'] = {}
246
247        columns = columns or self._attributes.get('parameters', {}).get('columns', {})
248        if isinstance(columns, list):
249            columns = {str(col): str(col) for col in columns}
250        if isinstance(columns, dict):
251            self._attributes['parameters']['columns'] = columns
252        elif columns is not None:
253            warn(f"The provided columns are of invalid type '{type(columns)}'.")
254
255        if isinstance(tags, (list, tuple)):
256            self._attributes['parameters']['tags'] = tags
257        elif tags is not None:
258            warn(f"The provided tags are of invalid type '{type(tags)}'.")
259
260        if isinstance(target, str):
261            self._attributes['parameters']['target'] = target
262        elif target is not None:
263            warn(f"The provided target is of invalid type '{type(target)}'.")
264
265        if isinstance(dtypes, dict):
266            self._attributes['parameters']['dtypes'] = dtypes
267        elif dtypes is not None:
268            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")
269
270        ### NOTE: The parameters dictionary is {} by default.
271        ###       A Pipe may be registered without parameters, then edited,
272        ###       or a Pipe may be registered with parameters set in-memory first.
273        #  from meerschaum.config import get_config
274        _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
275        if _mrsm_instance is None:
276            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)
277
278        if not isinstance(_mrsm_instance, str):
279            self._instance_connector = _mrsm_instance
280            self.instance_keys = str(_mrsm_instance)
281        else: ### NOTE: must be SQL or API Connector for this to work
282            self.instance_keys = _mrsm_instance
283
284        self._cache = cache and get_config('system', 'experimental', 'cache')
285
286
287    @property
288    def meta(self):
289        """
290        Return the four keys needed to reconstruct this pipe.
291        """
292        return {
293            'connector': self.connector_keys,
294            'metric': self.metric_key,
295            'location': self.location_key,
296            'instance': self.instance_keys,
297        }
298
299
300    def keys(self) -> List[str]:
301        """
302        Return the ordered keys for this pipe.
303        """
304        return {
305            key: val
306            for key, val in self.meta.items()
307            if key != 'instance'
308        }
309
310
311    @property
312    def instance_connector(self) -> Union[InstanceConnector, None]:
313        """
314        The connector to where this pipe resides.
315        May either be of type `meerschaum.connectors.sql.SQLConnector` or
316        `meerschaum.connectors.api.APIConnector`.
317        """
318        if '_instance_connector' not in self.__dict__:
319            from meerschaum.connectors.parse import parse_instance_keys
320            conn = parse_instance_keys(self.instance_keys)
321            if conn:
322                self._instance_connector = conn
323            else:
324                return None
325        return self._instance_connector
326
327    @property
328    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
329        """
330        The connector to the data source.
331        """
332        if '_connector' not in self.__dict__:
333            from meerschaum.connectors.parse import parse_instance_keys
334            import warnings
335            with warnings.catch_warnings():
336                warnings.simplefilter('ignore')
337                try:
338                    conn = parse_instance_keys(self.connector_keys)
339                except Exception as e:
340                    conn = None
341            if conn:
342                self._connector = conn
343            else:
344                return None
345        return self._connector
346
347
348    @property
349    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
350        """
351        If the pipe was created with `cache=True`, return the connector to the pipe's
352        SQLite database for caching.
353        """
354        if not self._cache:
355            return None
356
357        if '_cache_connector' not in self.__dict__:
358            from meerschaum.connectors import get_connector
359            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
360            _resources_path = SQLITE_RESOURCES_PATH
361            self._cache_connector = get_connector(
362                'sql', '_cache_' + str(self),
363                flavor='sqlite',
364                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
365            )
366
367        return self._cache_connector
368
369
370    @property
371    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
372        """
373        If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
374        manage the local data.
375        """
376        if self.cache_connector is None:
377            return None
378        if '_cache_pipe' not in self.__dict__:
379            from meerschaum.config._patch import apply_patch_to_config
380            from meerschaum.utils.sql import sql_item_name
381            _parameters = copy.deepcopy(self.parameters)
382            _fetch_patch = {
383                'fetch': ({
384                    'definition': (
385                        f"SELECT * FROM "
386                        + sql_item_name(
387                            str(self.target),
388                            self.instance_connector.flavor,
389                            self.instance_connector.get_pipe_schema(self),
390                        )
391                    ),
392                }) if self.instance_connector.type == 'sql' else ({
393                    'connector_keys': self.connector_keys,
394                    'metric_key': self.metric_key,
395                    'location_key': self.location_key,
396                })
397            }
398            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
399            self._cache_pipe = Pipe(
400                self.instance_keys,
401                (self.connector_keys + '_' + self.metric_key + '_cache'),
402                self.location_key,
403                mrsm_instance = self.cache_connector,
404                parameters = _parameters,
405                cache = False,
406                temporary = True,
407            )
408
409        return self._cache_pipe
410
411
412    def __str__(self, ansi: bool=False):
413        return pipe_repr(self, ansi=ansi)
414
415
416    def __eq__(self, other):
417        try:
418            return (
419                isinstance(self, type(other))
420                and self.connector_keys == other.connector_keys
421                and self.metric_key == other.metric_key
422                and self.location_key == other.location_key
423                and self.instance_keys == other.instance_keys
424            )
425        except Exception as e:
426            return False
427
428    def __hash__(self):
429        ### Using an esoteric separator to avoid collisions.
430        sep = "[\"']"
431        return hash(
432            str(self.connector_keys) + sep
433            + str(self.metric_key) + sep
434            + str(self.location_key) + sep
435            + str(self.instance_keys) + sep
436        )
437
438    def __repr__(self, ansi: bool=True, **kw) -> str:
439        if not hasattr(sys, 'ps1'):
440            ansi = False
441
442        return pipe_repr(self, ansi=ansi, **kw)
443
444    def __pt_repr__(self):
445        from meerschaum.utils.packages import attempt_import
446        prompt_toolkit_formatted_text = attempt_import('prompt_toolkit.formatted_text', lazy=False)
447        return prompt_toolkit_formatted_text.ANSI(pipe_repr(self, ansi=True))
448
449    def __getstate__(self) -> Dict[str, Any]:
450        """
451        Define the state dictionary (pickling).
452        """
453        return {
454            'connector': self.connector_keys,
455            'metric': self.metric_key,
456            'location': self.location_key,
457            'parameters': self.parameters,
458            'instance': self.instance_keys,
459        }
460
461    def __setstate__(self, _state: Dict[str, Any]):
462        """
463        Read the state (unpickling).
464        """
465        self.__init__(**_state)
466
467
468    def __getitem__(self, key: str) -> Any:
469        """
470        Index the pipe's attributes.
471        If the `key` cannot be found, return `None`.
472        """
473        if key in self.attributes:
474            return self.attributes.get(key, None)
475
476        aliases = {
477            'connector': 'connector_keys',
478            'connector_key': 'connector_keys',
479            'metric': 'metric_key',
480            'location': 'location_key',
481        }
482        aliased_key = aliases.get(key, None)
483        if aliased_key is not None:
484            return self.attributes.get(aliased_key, None)
485
486        property_aliases = {
487            'instance': 'instance_keys',
488            'instance_key': 'instance_keys',
489        }
490        aliased_key = property_aliases.get(key, None)
491        if aliased_key is not None:
492            key = aliased_key
493        return getattr(self, key, None)

Access Meerschaum pipes via Pipe objects.

Pipes are identified by the following:

  1. Connector keys (e.g. 'sql:main')
  2. Metric key (e.g. 'weather')
  3. Location (optional; e.g. None)

A pipe's connector keys correspond to a data source, and when the pipe is synced, its fetch definition is evaluated and executed to produce new data.

Alternatively, new data may be directly synced via pipe.sync():

>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)
Pipe( connector: str = '', metric: str = '', location: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, columns: Union[Dict[str, str], List[str], NoneType] = None, tags: Optional[List[str]] = None, target: Optional[str] = None, dtypes: Optional[Dict[str, str]] = None, instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, temporary: bool = False, mrsm_instance: Union[str, meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType] = None, cache: bool = False, debug: bool = False, connector_keys: Optional[str] = None, metric_key: Optional[str] = None, location_key: Optional[str] = None)
138    def __init__(
139        self,
140        connector: str = '',
141        metric: str = '',
142        location: Optional[str] = None,
143        parameters: Optional[Dict[str, Any]] = None,
144        columns: Union[Dict[str, str], List[str], None] = None,
145        tags: Optional[List[str]] = None,
146        target: Optional[str] = None,
147        dtypes: Optional[Dict[str, str]] = None,
148        instance: Optional[Union[str, InstanceConnector]] = None,
149        temporary: bool = False,
150        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
151        cache: bool = False,
152        debug: bool = False,
153        connector_keys: Optional[str] = None,
154        metric_key: Optional[str] = None,
155        location_key: Optional[str] = None,
156    ):
157        """
158        Parameters
159        ----------
160        connector: str
161            Keys for the pipe's source connector, e.g. `'sql:main'`.
162
163        metric: str
164            Label for the pipe's contents, e.g. `'weather'`.
165
166        location: str, default None
167            Label for the pipe's location. Defaults to `None`.
168
169        parameters: Optional[Dict[str, Any]], default None
170            Optionally set a pipe's parameters from the constructor,
171            e.g. columns and other attributes.
172            You can edit these parameters with `edit pipes`.
173
174        columns: Optional[Dict[str, str]], default None
175            Set the `columns` dictionary of `parameters`.
176            If `parameters` is also provided, this dictionary is added under the `'columns'` key.
177
178        tags: Optional[List[str]], default None
179            A list of strings to be added under the `'tags'` key of `parameters`.
180            You can select pipes with certain tags using `--tags`.
181
182        dtypes: Optional[Dict[str, str]], default None
183            Set the `dtypes` dictionary of `parameters`.
184            If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.
185
186        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
187            Connector for the Meerschaum instance where the pipe resides.
188            Defaults to the preconfigured default instance (`'sql:main'`).
189
190        instance: Optional[Union[str, InstanceConnector]], default None
191            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.
192
193        temporary: bool, default False
194            If `True`, prevent instance tables (pipes, users, plugins) from being created.
195
196        cache: bool, default False
197            If `True`, cache fetched data into a local database file.
198            Defaults to `False`.
199        """
200        from meerschaum.utils.warnings import error, warn
201        if (not connector and not connector_keys) or (not metric and not metric_key):
202            error(
203                "Please provide strings for the connector and metric\n    "
204                + "(first two positional arguments)."
205            )
206
207        ### Fall back to legacy `location_key` just in case.
208        if not location:
209            location = location_key
210
211        if not connector:
212            connector = connector_keys
213
214        if not metric:
215            metric = metric_key
216
217        if location in ('[None]', 'None'):
218            location = None
219
220        from meerschaum.config.static import STATIC_CONFIG
221        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
222        for k in (connector, metric, location, *(tags or [])):
223            if str(k).startswith(negation_prefix):
224                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")
225
226        self.connector_keys = str(connector)
227        self.connector_key = self.connector_keys ### Alias
228        self.metric_key = metric
229        self.location_key = location
230        self.temporary = temporary
231
232        self._attributes = {
233            'connector_keys': self.connector_keys,
234            'metric_key': self.metric_key,
235            'location_key': self.location_key,
236            'parameters': {},
237        }
238
239        ### only set parameters if values are provided
240        if isinstance(parameters, dict):
241            self._attributes['parameters'] = parameters
242        else:
243            if parameters is not None:
244                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
245            self._attributes['parameters'] = {}
246
247        columns = columns or self._attributes.get('parameters', {}).get('columns', {})
248        if isinstance(columns, list):
249            columns = {str(col): str(col) for col in columns}
250        if isinstance(columns, dict):
251            self._attributes['parameters']['columns'] = columns
252        elif columns is not None:
253            warn(f"The provided columns are of invalid type '{type(columns)}'.")
254
255        if isinstance(tags, (list, tuple)):
256            self._attributes['parameters']['tags'] = tags
257        elif tags is not None:
258            warn(f"The provided tags are of invalid type '{type(tags)}'.")
259
260        if isinstance(target, str):
261            self._attributes['parameters']['target'] = target
262        elif target is not None:
263            warn(f"The provided target is of invalid type '{type(target)}'.")
264
265        if isinstance(dtypes, dict):
266            self._attributes['parameters']['dtypes'] = dtypes
267        elif dtypes is not None:
268            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")
269
270        ### NOTE: The parameters dictionary is {} by default.
271        ###       A Pipe may be registered without parameters, then edited,
272        ###       or a Pipe may be registered with parameters set in-memory first.
273        #  from meerschaum.config import get_config
274        _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
275        if _mrsm_instance is None:
276            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)
277
278        if not isinstance(_mrsm_instance, str):
279            self._instance_connector = _mrsm_instance
280            self.instance_keys = str(_mrsm_instance)
281        else: ### NOTE: must be SQL or API Connector for this to work
282            self.instance_keys = _mrsm_instance
283
284        self._cache = cache and get_config('system', 'experimental', 'cache')
Parameters
  • connector (str): Keys for the pipe's source connector, e.g. 'sql:main'.
  • metric (str): Label for the pipe's contents, e.g. 'weather'.
  • location (str, default None): Label for the pipe's location. Defaults to None.
  • parameters (Optional[Dict[str, Any]], default None): Optionally set a pipe's parameters from the constructor, e.g. columns and other attributes. You can edit these parameters with edit pipes.
  • columns (Optional[Dict[str, str]], default None): Set the columns dictionary of parameters. If parameters is also provided, this dictionary is added under the 'columns' key.
  • tags (Optional[List[str]], default None): A list of strings to be added under the 'tags' key of parameters. You can select pipes with certain tags using --tags.
  • dtypes (Optional[Dict[str, str]], default None): Set the dtypes dictionary of parameters. If parameters is also provided, this dictionary is added under the 'dtypes' key.
  • mrsm_instance (Optional[Union[str, InstanceConnector]], default None): Connector for the Meerschaum instance where the pipe resides. Defaults to the preconfigured default instance ('sql:main').
  • instance (Optional[Union[str, InstanceConnector]], default None): Alias for mrsm_instance. If mrsm_instance is supplied, this value is ignored.
  • temporary (bool, default False): If True, prevent instance tables (pipes, users, plugins) from being created.
  • cache (bool, default False): If True, cache fetched data into a local database file. Defaults to False.
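
Putting several of these parameters together (an illustrative sketch; the connector keys, column names, tags, and dtypes below are arbitrary):

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'plugin:noaa', 'weather', 'kgmu',
    instance='sql:main',
    columns={'datetime': 'timestamp', 'id': 'station'},
    tags=['production'],
    target='weather_kgmu',
    dtypes={'temperature': 'float64'},
)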
connector_keys
connector_key
metric_key
location_key
temporary
meta
287    @property
288    def meta(self):
289        """
290        Return the four keys needed to reconstruct this pipe.
291        """
292        return {
293            'connector': self.connector_keys,
294            'metric': self.metric_key,
295            'location': self.location_key,
296            'instance': self.instance_keys,
297        }

Return the four keys needed to reconstruct this pipe.
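
Because these are the same keyword names accepted by the constructor, a pipe may be rebuilt from its meta dictionary:

>>> pipe = Pipe('sql:main', 'weather', instance='sql:main')
>>> pipe.meta
{'connector': 'sql:main', 'metric': 'weather', 'location': None, 'instance': 'sql:main'}
>>> Pipe(**pipe.meta) == pipe
True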

def keys(self) -> List[str]:
300    def keys(self) -> List[str]:
301        """
302        Return the ordered keys for this pipe.
303        """
304        return {
305            key: val
306            for key, val in self.meta.items()
307            if key != 'instance'
308        }

Return the ordered keys for this pipe.
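
Per the source above, this returns the meta dictionary without the 'instance' entry:

>>> pipe = Pipe('sql:main', 'weather', instance='sql:main')
>>> pipe.keys()
{'connector': 'sql:main', 'metric': 'weather', 'location': None}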

instance_connector: Union[meerschaum.connectors.SQLConnector, meerschaum.connectors.APIConnector, NoneType]
311    @property
312    def instance_connector(self) -> Union[InstanceConnector, None]:
313        """
314        The connector to where this pipe resides.
315        May either be of type `meerschaum.connectors.sql.SQLConnector` or
316        `meerschaum.connectors.api.APIConnector`.
317        """
318        if '_instance_connector' not in self.__dict__:
319            from meerschaum.connectors.parse import parse_instance_keys
320            conn = parse_instance_keys(self.instance_keys)
321            if conn:
322                self._instance_connector = conn
323            else:
324                return None
325        return self._instance_connector

The connector to where this pipe resides. May either be of type meerschaum.connectors.SQLConnector or meerschaum.connectors.APIConnector.

connector: Optional[Connector]
327    @property
328    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
329        """
330        The connector to the data source.
331        """
332        if '_connector' not in self.__dict__:
333            from meerschaum.connectors.parse import parse_instance_keys
334            import warnings
335            with warnings.catch_warnings():
336                warnings.simplefilter('ignore')
337                try:
338                    conn = parse_instance_keys(self.connector_keys)
339                except Exception as e:
340                    conn = None
341            if conn:
342                self._connector = conn
343            else:
344                return None
345        return self._connector

The connector to the data source.
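To illustrate the distinction (the connector keys are hypothetical), the instance connector is where the pipe's rows are stored, while the source connector is where new rows are fetched from:

>>> pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')
>>> pipe.instance_keys
'sql:main'
>>> pipe.connector_keys
'plugin:noaa'
>>> pipe.instance_connector   # e.g. the SQLConnector for 'sql:main'
>>> pipe.connector            # the 'plugin:noaa' connector, or None if the keys cannot be parsed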

cache_connector: Optional[meerschaum.connectors.SQLConnector]
348    @property
349    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
350        """
351        If the pipe was created with `cache=True`, return the connector to the pipe's
352        SQLite database for caching.
353        """
354        if not self._cache:
355            return None
356
357        if '_cache_connector' not in self.__dict__:
358            from meerschaum.connectors import get_connector
359            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
360            _resources_path = SQLITE_RESOURCES_PATH
361            self._cache_connector = get_connector(
362                'sql', '_cache_' + str(self),
363                flavor='sqlite',
364                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
365            )
366
367        return self._cache_connector

If the pipe was created with cache=True, return the connector to the pipe's SQLite database for caching.

cache_pipe: Optional[Pipe]
370    @property
371    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
372        """
373        If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
374        manage the local data.
375        """
376        if self.cache_connector is None:
377            return None
378        if '_cache_pipe' not in self.__dict__:
379            from meerschaum.config._patch import apply_patch_to_config
380            from meerschaum.utils.sql import sql_item_name
381            _parameters = copy.deepcopy(self.parameters)
382            _fetch_patch = {
383                'fetch': ({
384                    'definition': (
385                        f"SELECT * FROM "
386                        + sql_item_name(
387                            str(self.target),
388                            self.instance_connector.flavor,
389                            self.instance_connector.get_pipe_schema(self),
390                        )
391                    ),
392                }) if self.instance_connector.type == 'sql' else ({
393                    'connector_keys': self.connector_keys,
394                    'metric_key': self.metric_key,
395                    'location_key': self.location_key,
396                })
397            }
398            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
399            self._cache_pipe = Pipe(
400                self.instance_keys,
401                (self.connector_keys + '_' + self.metric_key + '_cache'),
402                self.location_key,
403                mrsm_instance = self.cache_connector,
404                parameters = _parameters,
405                cache = False,
406                temporary = True,
407            )
408
409        return self._cache_pipe

If the pipe was created with cache=True, return another Pipe used to manage the local data.
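A quick sketch of the caching attributes; both return None unless the pipe was created with cache=True and experimental caching is enabled in the configuration:

>>> cached_pipe = mrsm.Pipe('sql:main', 'weather', cache=True)
>>> cached_pipe.cache_connector   # SQLite connector to the local cache database
>>> cached_pipe.cache_pipe        # temporary Pipe which manages the locally cached rows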

def fetch( self, begin: Union[datetime.datetime, str, NoneType] = '', end: Optional[datetime.datetime] = None, check_existing: bool = True, sync_chunks: bool = False, debug: bool = False, **kw: Any) -> Union[pandas.core.frame.DataFrame, Iterator[pandas.core.frame.DataFrame], NoneType]:
21def fetch(
22        self,
23        begin: Union[datetime, str, None] = '',
24        end: Optional[datetime] = None,
25        check_existing: bool = True,
26        sync_chunks: bool = False,
27        debug: bool = False,
28        **kw: Any
29    ) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]:
30    """
31    Fetch a Pipe's latest data from its connector.
32
33    Parameters
34    ----------
35    begin: Union[datetime, str, None], default '':
36        If provided, only fetch data newer than or equal to `begin`.
37
38    end: Optional[datetime], default None:
39        If provided, only fetch data older than or equal to `end`.
40
41    check_existing: bool, default True
42        If `False`, do not apply the backtrack interval.
43
44    sync_chunks: bool, default False
45        If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks
46        as the fetch loads them into memory.
47
48    debug: bool, default False
49        Verbosity toggle.
50
51    Returns
52    -------
53    A `pd.DataFrame` of the newest unseen data.
54
55    """
56    if 'fetch' not in dir(self.connector):
57        warn(f"No `fetch()` function defined for connector '{self.connector}'")
58        return None
59
60    from meerschaum.connectors import custom_types, get_connector_plugin
61    from meerschaum.utils.debug import dprint, _checkpoint
62    from meerschaum.utils.misc import filter_arguments
63
64    _chunk_hook = kw.pop('chunk_hook', None)
65    kw['workers'] = self.get_num_workers(kw.get('workers', None))
66    if sync_chunks and _chunk_hook is None:
67
68        def _chunk_hook(chunk, **_kw) -> SuccessTuple:
69            """
70            Wrap `Pipe.sync()` with a custom chunk label prepended to the message.
71            """
72            from meerschaum.config._patch import apply_patch_to_config
73            kwargs = apply_patch_to_config(kw, _kw)
74            chunk_success, chunk_message = self.sync(chunk, **kwargs)
75            chunk_label = self._get_chunk_label(chunk, self.columns.get('datetime', None))
76            if chunk_label:
77                chunk_message = '\n' + chunk_label + '\n' + chunk_message
78            return chunk_success, chunk_message
79
80    with mrsm.Venv(get_connector_plugin(self.connector)):
81        _args, _kwargs = filter_arguments(
82            self.connector.fetch,
83            self,
84            begin=_determine_begin(
85                self,
86                begin,
87                check_existing=check_existing,
88                debug=debug,
89            ),
90            end=end,
91            chunk_hook=_chunk_hook,
92            debug=debug,
93            **kw
94        )
95        df = self.connector.fetch(*_args, **_kwargs)
96    return df

Fetch a Pipe's latest data from its connector.

Parameters
  • begin (Union[datetime, str, None], default ''): If provided, only fetch data newer than or equal to begin.
  • end (Optional[datetime], default None): If provided, only fetch data older than or equal to end.
  • check_existing (bool, default True): If False, do not apply the backtrack interval.
  • sync_chunks (bool, default False): If True and the pipe's connector is of type 'sql', begin syncing chunks as the fetch loads them into memory.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame of the newest unseen data.
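A typical usage sketch, assuming the pipe's source connector implements fetch() (the bounds are hypothetical):

>>> from datetime import datetime
>>> df = pipe.fetch(begin=datetime(2024, 1, 1), end=datetime(2024, 1, 2))
>>> if df is not None:
...     pipe.sync(df)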
def get_backtrack_interval( self, check_existing: bool = True, debug: bool = False) -> Union[datetime.timedelta, int]:
 99def get_backtrack_interval(
100    self,
101    check_existing: bool = True,
102    debug: bool = False,
103) -> Union[timedelta, int]:
104    """
105    Get the backtrack interval to use for this pipe.
106
107    Parameters
108    ----------
109    check_existing: bool, default True
110        If `False`, return a backtrack_interval of 0 minutes.
111
112    Returns
113    -------
114    The backtrack interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
115    """
116    default_backtrack_minutes = get_config('pipes', 'parameters', 'fetch', 'backtrack_minutes')
117    configured_backtrack_minutes = self.parameters.get('fetch', {}).get('backtrack_minutes', None)
118    backtrack_minutes = (
119        configured_backtrack_minutes
120        if configured_backtrack_minutes is not None
121        else default_backtrack_minutes
122    ) if check_existing else 0
123
124    backtrack_interval = timedelta(minutes=backtrack_minutes)
125    dt_col = self.columns.get('datetime', None)
126    if dt_col is None:
127        return backtrack_interval
128
129    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns]')
130    if 'int' in dt_dtype.lower():
131        return backtrack_minutes
132
133    return backtrack_interval

Get the backtrack interval to use for this pipe.

Parameters
  • check_existing (bool, default True): If False, return a backtrack_interval of 0 minutes.
Returns
  • The backtrack interval (timedelta or int) to use with this pipe's datetime axis.
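For example (the value shown is illustrative), the return type mirrors the pipe's datetime axis:

>>> pipe.get_backtrack_interval()
datetime.timedelta(days=1)
>>> # An integer number of minutes is returned instead when the datetime axis is an integer column.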
def get_data( self, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, as_iterator: bool = False, as_chunks: bool = False, as_dask: bool = False, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> Union[pandas.core.frame.DataFrame, Iterator[pandas.core.frame.DataFrame], NoneType]:
 23def get_data(
 24    self,
 25    select_columns: Optional[List[str]] = None,
 26    omit_columns: Optional[List[str]] = None,
 27    begin: Union[datetime, int, None] = None,
 28    end: Union[datetime, int, None] = None,
 29    params: Optional[Dict[str, Any]] = None,
 30    as_iterator: bool = False,
 31    as_chunks: bool = False,
 32    as_dask: bool = False,
 33    chunk_interval: Union[timedelta, int, None] = None,
 34    fresh: bool = False,
 35    debug: bool = False,
 36    **kw: Any
 37) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]:
 38    """
 39    Get a pipe's data from the instance connector.
 40
 41    Parameters
 42    ----------
 43    select_columns: Optional[List[str]], default None
 44        If provided, only select these given columns.
 45        Otherwise select all available columns (i.e. `SELECT *`).
 46
 47    omit_columns: Optional[List[str]], default None
 48        If provided, remove these columns from the selection.
 49
 50    begin: Union[datetime, int, None], default None
 51        Lower bound datetime to begin searching for data (inclusive).
 52        Translates to a `WHERE` clause like `WHERE datetime >= begin`.
 53        Defaults to `None`.
 54
 55    end: Union[datetime, int, None], default None
 56        Upper bound datetime to stop searching for data (inclusive).
 57        Translates to a `WHERE` clause like `WHERE datetime < end`.
 58        Defaults to `None`.
 59
 60    params: Optional[Dict[str, Any]], default None
 61        Filter the retrieved data by a dictionary of parameters.
 62        See `meerschaum.utils.sql.build_where` for more details. 
 63
 64    as_iterator: bool, default False
 65        If `True`, return a generator of chunks of pipe data.
 66
 67    as_chunks: bool, default False
 68        Alias for `as_iterator`.
 69
 70    as_dask: bool, default False
 71        If `True`, return a `dask.DataFrame`
 72        (which may be loaded into a Pandas DataFrame with `df.compute()`).
 73
 74    chunk_interval: Union[timedelta, int, None], default None
 75        If `as_iterator`, then return chunks with `begin` and `end` separated by this interval.
 76        This may be set under `pipe.parameters['verify']['chunk_minutes']`.
 77        By default, use a timedelta of 1440 minutes (1 day).
 78        If `chunk_interval` is an integer and the `datetime` axis a timestamp,
 79        then use a timedelta with this number of minutes.
 80        If the `datetime` axis is an integer, default to the configured chunksize.
 81        If `chunk_interval` is a `timedelta` and the `datetime` axis an integer,
 82        use the number of minutes in the `timedelta`.
 83
 84    fresh: bool, default False
 85        If `True`, skip local cache and directly query the instance connector.
 86        Defaults to `False`.
 87
 88    debug: bool, default False
 89        Verbosity toggle.
 90        Defaults to `False`.
 91
 92    Returns
 93    -------
 94    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.
 95
 96    """
 97    from meerschaum.utils.warnings import warn
 98    from meerschaum.utils.venv import Venv
 99    from meerschaum.connectors import get_connector_plugin
100    from meerschaum.utils.misc import iterate_chunks, items_str
101    from meerschaum.utils.dtypes import to_pandas_dtype
102    from meerschaum.utils.dataframe import add_missing_cols_to_df
103    from meerschaum.utils.packages import attempt_import
104    dd = attempt_import('dask.dataframe') if as_dask else None
105    dask = attempt_import('dask') if as_dask else None
106
107    if select_columns == '*':
108        select_columns = None
109    elif isinstance(select_columns, str):
110        select_columns = [select_columns]
111
112    if isinstance(omit_columns, str):
113        omit_columns = [omit_columns]
114
115    as_iterator = as_iterator or as_chunks
116
117    if as_iterator or as_chunks:
118        return self._get_data_as_iterator(
119            select_columns = select_columns,
120            omit_columns = omit_columns,
121            begin = begin,
122            end = end,
123            params = params,
124            chunk_interval = chunk_interval,
125            fresh = fresh,
126            debug = debug,
127        )
128
129    if as_dask:
130        from multiprocessing.pool import ThreadPool
131        dask_pool = ThreadPool(self.get_num_workers())
132        dask.config.set(pool=dask_pool)
133        chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
134        bounds = self.get_chunk_bounds(
135            begin = begin,
136            end = end,
137            bounded = False,
138            chunk_interval = chunk_interval,
139            debug = debug,
140        )
141        dask_chunks = [
142            dask.delayed(self.get_data)(
143                select_columns = select_columns,
144                omit_columns = omit_columns,
145                begin = chunk_begin,
146                end = chunk_end,
147                params = params,
148                chunk_interval = chunk_interval,
149                fresh = fresh,
150                debug = debug,
151            )
152            for (chunk_begin, chunk_end) in bounds
153        ]
154        dask_meta = {
155            col: to_pandas_dtype(typ)
156            for col, typ in self.dtypes.items()
157        }
158        return dd.from_delayed(dask_chunks, meta=dask_meta)
159
160    if not self.exists(debug=debug):
161        return None
162       
163    if self.cache_pipe is not None:
164        if not fresh:
165            _sync_cache_tuple = self.cache_pipe.sync(
166                begin = begin,
167                end = end,
168                params = params,
169                debug = debug,
170                **kw
171            )
172            if not _sync_cache_tuple[0]:
173                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
174                fresh = True
175            else: ### Successfully synced cache.
176                return self.enforce_dtypes(
177                    self.cache_pipe.get_data(
178                        select_columns = select_columns,
179                        omit_columns = omit_columns,
180                        begin = begin,
181                        end = end,
182                        params = params,
183                        debug = debug,
184                        fresh = True,
185                        **kw
186                    ),
187                    debug = debug,
188                )
189
190    with Venv(get_connector_plugin(self.instance_connector)):
191        df = self.instance_connector.get_pipe_data(
192            pipe = self,
193            select_columns = select_columns,
194            omit_columns = omit_columns,
195            begin = begin,
196            end = end,
197            params = params,
198            debug = debug,
199            **kw
200        )
201        if df is None:
202            return df
203
204        if not select_columns:
205            select_columns = [col for col in df.columns]
206
207        cols_to_omit = [
208            col
209            for col in df.columns
210            if (
211                col in (omit_columns or [])
212                or
213                col not in (select_columns or [])
214            )
215        ]
216        cols_to_add = [
217            col
218            for col in select_columns
219            if col not in df.columns
220        ]
221        if cols_to_omit:
222            warn(
223                (
224                    f"Received {len(cols_to_omit)} omitted column"
225                    + ('s' if len(cols_to_omit) != 1 else '')
226                    + f" for {self}. "
227                    + "Consider adding `select_columns` and `omit_columns` support to "
228                    + f"'{self.instance_connector.type}' connectors to improve performance."
229                ),
230                stack = False,
231            )
232            _cols_to_select = [col for col in df.columns if col not in cols_to_omit]
233            df = df[_cols_to_select]
234
235        if cols_to_add:
236            warn(
237                (
238                    f"Specified columns {items_str(cols_to_add)} were not found on {self}. "
239                    + "Adding these to the DataFrame as null columns."
240                ),
241                stack = False,
242            )
243            df = add_missing_cols_to_df(df, {col: 'string' for col in cols_to_add})
244
245        return self.enforce_dtypes(df, debug=debug)

Get a pipe's data from the instance connector.

Parameters
  • select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
  • omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
  • begin (Union[datetime, int, None], default None): Lower bound datetime to begin searching for data (inclusive). Translates to a WHERE clause like WHERE datetime >= begin. Defaults to None.
  • end (Union[datetime, int, None], default None): Upper bound datetime to stop searching for data (inclusive). Translates to a WHERE clause like WHERE datetime < end. Defaults to None.
  • params (Optional[Dict[str, Any]], default None): Filter the retrieved data by a dictionary of parameters. See meerschaum.utils.sql.build_where for more details.
  • as_iterator (bool, default False): If True, return a generator of chunks of pipe data.
  • as_chunks (bool, default False): Alias for as_iterator.
  • as_dask (bool, default False): If True, return a dask.DataFrame (which may be loaded into a Pandas DataFrame with df.compute()).
  • chunk_interval (Union[timedelta, int, None], default None): If as_iterator, then return chunks with begin and end separated by this interval. This may be set under pipe.parameters['verify']['chunk_minutes']. By default, use a timedelta of 1440 minutes (1 day). If chunk_interval is an integer and the datetime axis a timestamp, then use a timedelta with this number of minutes. If the datetime axis is an integer, default to the configured chunksize. If chunk_interval is a timedelta and the datetime axis an integer, use the number of minutes in the timedelta.
  • fresh (bool, default False): If True, skip local cache and directly query the instance connector. Defaults to False.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
Returns
  • A pd.DataFrame for the pipe's data corresponding to the provided parameters.
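A usage sketch (the column names, parameters, and bounds are hypothetical):

>>> from datetime import datetime
>>> df = pipe.get_data(
...     select_columns=['dt', 'station', 'temp'],
...     begin=datetime(2024, 1, 1),
...     end=datetime(2024, 2, 1),
...     params={'station': ['KATL', 'KGMU']},
... )
>>> for chunk in pipe.get_data(as_iterator=True):
...     print(len(chunk))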
def get_backtrack_data( self, backtrack_minutes: Optional[int] = None, begin: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> Optional[pandas.core.frame.DataFrame]:
340def get_backtrack_data(
341        self,
342        backtrack_minutes: Optional[int] = None,
343        begin: Union[datetime, int, None] = None,
344        params: Optional[Dict[str, Any]] = None,
345        fresh: bool = False,
346        debug: bool = False,
347        **kw: Any
348    ) -> Optional['pd.DataFrame']:
349    """
350    Get the most recent data from the instance connector as a Pandas DataFrame.
351
352    Parameters
353    ----------
354    backtrack_minutes: Optional[int], default None
355        How many minutes from `begin` to select from.
356        If `None`, use `pipe.parameters['fetch']['backtrack_minutes']`.
357
358    begin: Optional[datetime], default None
359        The starting point to search for data.
360        If begin is `None` (default), use the most recent observed datetime
361        (AKA sync_time).
362
363        ```
364        E.g. begin = 02:00
365
366        Search this region.           Ignore this, even if there's data.
367        /  /  /  /  /  /  /  /  /  |
368        -----|----------|----------|----------|----------|----------|
369        00:00      01:00      02:00      03:00      04:00      05:00
370
371        ```
372
373    params: Optional[Dict[str, Any]], default None
374        The standard Meerschaum `params` query dictionary.
375        
376        
377    fresh: bool, default False
378        If `True`, ignore local cache and pull directly from the instance connector.
379        Only comes into effect if a pipe was created with `cache=True`.
380
381    debug: bool, default False
382        Verbosity toggle.
383
384    Returns
385    -------
386    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. Backtrack data
387    is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
388    """
389    from meerschaum.utils.warnings import warn
390    from meerschaum.utils.venv import Venv
391    from meerschaum.connectors import get_connector_plugin
392
393    if not self.exists(debug=debug):
394        return None
395
396    backtrack_interval = self.get_backtrack_interval(debug=debug)
397    if backtrack_minutes is None:
398        backtrack_minutes = (
399            (backtrack_interval.total_seconds() / 60)
400            if isinstance(backtrack_interval, timedelta)
401            else backtrack_interval
402        )
403
404    if self.cache_pipe is not None:
405        if not fresh:
406            _sync_cache_tuple = self.cache_pipe.sync(begin=begin, params=params, debug=debug, **kw)
407            if not _sync_cache_tuple[0]:
408                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
409                fresh = True
410            else: ### Successfully synced cache.
411                return self.enforce_dtypes(
412                    self.cache_pipe.get_backtrack_data(
413                        fresh = True,
414                        begin = begin,
415                        backtrack_minutes = backtrack_minutes,
416                        params = params,
417                        debug = debug,
418                        **kw
419                    ),
420                    debug = debug,
421                )
422
423    if hasattr(self.instance_connector, 'get_backtrack_data'):
424        with Venv(get_connector_plugin(self.instance_connector)):
425            return self.enforce_dtypes(
426                self.instance_connector.get_backtrack_data(
427                    pipe = self,
428                    begin = begin,
429                    backtrack_minutes = backtrack_minutes,
430                    params = params,
431                    debug = debug,
432                    **kw
433                ),
434                debug = debug,
435            )
436
437    if begin is None:
438        begin = self.get_sync_time(params=params, debug=debug)
439
440    backtrack_interval = (
441        timedelta(minutes=backtrack_minutes)
442        if isinstance(begin, datetime)
443        else backtrack_minutes
444    )
445    if begin is not None:
446        begin = begin - backtrack_interval
447
448    return self.get_data(
449        begin = begin,
450        params = params,
451        debug = debug,
452        **kw
453    )

Get the most recent data from the instance connector as a Pandas DataFrame.

Parameters
  • backtrack_minutes (Optional[int], default None): How many minutes from begin to select from. If None, use pipe.parameters['fetch']['backtrack_minutes'].
  • begin (Optional[datetime], default None): The starting point to search for data. If begin is None (default), use the most recent observed datetime (AKA sync_time).

    E.g. begin = 02:00
    
    Search this region.           Ignore this, even if there's data.
    /  /  /  /  /  /  /  /  /  |
    -----|----------|----------|----------|----------|----------|
    00:00      01:00      02:00      03:00      04:00      05:00
    
    
  • params (Optional[Dict[str, Any]], default None): The standard Meerschaum params query dictionary.

  • fresh (bool, default False): If True, ignore local cache and pull directly from the instance connector. Only comes into effect if a pipe was created with cache=True.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame for the pipe's data corresponding to the provided parameters. Backtrack data is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
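For example (the window is hypothetical), select the hour of data leading up to the most recent sync time:

>>> df = pipe.get_backtrack_data(backtrack_minutes=60)
>>> # Pass `begin` to backtrack from a specific timestamp instead of the sync time.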
def get_rowcount( self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> int:
456def get_rowcount(
457        self,
458        begin: Optional[datetime] = None,
459        end: Optional['datetime'] = None,
460        params: Optional[Dict[str, Any]] = None,
461        remote: bool = False,
462        debug: bool = False
463    ) -> int:
464    """
465    Get a Pipe's instance or remote rowcount.
466
467    Parameters
468    ----------
469    begin: Optional[datetime], default None
470        Count rows where datetime > begin.
471
472    end: Optional[datetime], default None
473        Count rows where datetime < end.
474
475    remote: bool, default False
476        Count rows from a pipe's remote source.
477        **NOTE**: This is experimental!
478
479    debug: bool, default False
480        Verbosity toggle.
481
482    Returns
483    -------
484    An `int` of the number of rows in the pipe corresponding to the provided parameters.
485    Returns 0 if the pipe does not exist.
486    """
487    from meerschaum.utils.warnings import warn
488    from meerschaum.utils.venv import Venv
489    from meerschaum.connectors import get_connector_plugin
490
491    connector = self.instance_connector if not remote else self.connector
492    try:
493        with Venv(get_connector_plugin(connector)):
494            rowcount = connector.get_pipe_rowcount(
495                self,
496                begin = begin,
497                end = end,
498                params = params,
499                remote = remote,
500                debug = debug,
501            )
502            if rowcount is None:
503                return 0
504            return rowcount
505    except AttributeError as e:
506        warn(e)
507        if remote:
508            return 0
509    warn(f"Failed to get a rowcount for {self}.")
510    return 0

Get a Pipe's instance or remote rowcount.

Parameters
  • begin (Optional[datetime], default None): Count rows where datetime > begin.
  • end (Optional[datetime], default None): Count rows where datetime < end.
  • remote (bool, default False): Count rows from a pipe's remote source. NOTE: This is experimental!
  • debug (bool, default False): Verbosity toggle.
Returns
  • An int of the number of rows in the pipe corresponding to the provided parameters. Returns 0 if the pipe does not exist.
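A brief sketch (the row counts shown are illustrative):

>>> pipe.get_rowcount()
42
>>> from datetime import datetime
>>> pipe.get_rowcount(begin=datetime(2024, 1, 1), end=datetime(2024, 2, 1))
10
>>> pipe.get_rowcount(remote=True)   # experimental: count rows at the source
42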
def get_chunk_interval( self, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, debug: bool = False) -> Union[datetime.timedelta, int]:
513def get_chunk_interval(
514        self,
515        chunk_interval: Union[timedelta, int, None] = None,
516        debug: bool = False,
517    ) -> Union[timedelta, int]:
518    """
519    Get the chunk interval to use for this pipe.
520
521    Parameters
522    ----------
523    chunk_interval: Union[timedelta, int, None], default None
524        If provided, coerce this value into the correct type.
525        For example, if the datetime axis is an integer, then
526        return the number of minutes.
527
528    Returns
529    -------
530    The chunk interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
531    """
532    default_chunk_minutes = get_config('pipes', 'parameters', 'verify', 'chunk_minutes')
533    configured_chunk_minutes = self.parameters.get('verify', {}).get('chunk_minutes', None)
534    chunk_minutes = (
535        (configured_chunk_minutes or default_chunk_minutes)
536        if chunk_interval is None
537        else (
538            chunk_interval
539            if isinstance(chunk_interval, int)
540            else int(chunk_interval.total_seconds() / 60)
541        )
542    )
543
544    dt_col = self.columns.get('datetime', None)
545    if dt_col is None:
546        return timedelta(minutes=chunk_minutes)
547
548    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns]')
549    if 'int' in dt_dtype.lower():
550        return chunk_minutes
551    return timedelta(minutes=chunk_minutes)

Get the chunk interval to use for this pipe.

Parameters
  • chunk_interval (Union[timedelta, int, None], default None): If provided, coerce this value into the correct type. For example, if the datetime axis is an integer, then return the number of minutes.
Returns
  • The chunk interval (timedelta or int) to use with this pipe's datetime axis.
def get_chunk_bounds( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, bounded: bool = False, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, debug: bool = False) -> List[Tuple[Union[datetime.datetime, int, NoneType], Union[datetime.datetime, int, NoneType]]]:
554def get_chunk_bounds(
555        self,
556        begin: Union[datetime, int, None] = None,
557        end: Union[datetime, int, None] = None,
558        bounded: bool = False,
559        chunk_interval: Union[timedelta, int, None] = None,
560        debug: bool = False,
561    ) -> List[
562        Tuple[
563            Union[datetime, int, None],
564            Union[datetime, int, None],
565        ]
566    ]:
567    """
568    Return a list of datetime bounds for iterating over the pipe's `datetime` axis.
569
570    Parameters
571    ----------
572    begin: Union[datetime, int, None], default None
573        If provided, do not select less than this value.
574        Otherwise the first chunk will be unbounded.
575
576    end: Union[datetime, int, None], default None
577        If provided, do not select greater than or equal to this value.
578        Otherwise the last chunk will be unbounded.
579
580    bounded: bool, default False
581        If `True`, do not include `None` in the first chunk.
582
583    chunk_interval: Union[timedelta, int, None], default None
584        If provided, use this interval for the size of chunk boundaries.
585        The default value for this pipe may be set
586        under `pipe.parameters['verify']['chunk_minutes']`.
587
588    debug: bool, default False
589        Verbosity toggle.
590
591    Returns
592    -------
593    A list of chunk bounds (datetimes or integers).
594    If unbounded, the first and last chunks will include `None`.
595    """
596    include_less_than_begin = not bounded and begin is None
597    include_greater_than_end = not bounded and end is None
598    if begin is None:
599        begin = self.get_sync_time(newest=False, debug=debug)
600    if end is None:
601        end = self.get_sync_time(newest=True, debug=debug)
602    if begin is None and end is None:
603        return [(None, None)]
604
605    ### Set the chunk interval under `pipe.parameters['verify']['chunk_minutes']`.
606    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
607    
608    ### Build a list of tuples containing the chunk boundaries
609    ### so that we can sync multiple chunks in parallel.
610    ### Run `verify pipes --workers 1` to sync chunks in series.
611    chunk_bounds = []
612    begin_cursor = begin
613    while begin_cursor < end:
614        end_cursor = begin_cursor + chunk_interval
615        chunk_bounds.append((begin_cursor, end_cursor))
616        begin_cursor = end_cursor
617
618    ### The chunk interval might be too large.
619    if not chunk_bounds and end >= begin:
620        chunk_bounds = [(begin, end)]
621
622    ### Truncate the last chunk to the end timestamp.
623    if chunk_bounds[-1][1] > end:
624        chunk_bounds[-1] = (chunk_bounds[-1][0], end)
625
626    ### Pop the last chunk if its bounds are equal.
627    if chunk_bounds[-1][0] == chunk_bounds[-1][1]:
628        chunk_bounds = chunk_bounds[:-1]
629
630    if include_less_than_begin:
631        chunk_bounds = [(None, begin)] + chunk_bounds
632    if include_greater_than_end:
633        chunk_bounds = chunk_bounds + [(end, None)]
634
635    return chunk_bounds

Return a list of datetime bounds for iterating over the pipe's datetime axis.

Parameters
  • begin (Union[datetime, int, None], default None): If provided, do not select less than this value. Otherwise the first chunk will be unbounded.
  • end (Union[datetime, int, None], default None): If provided, do not select greater than or equal to this value. Otherwise the last chunk will be unbounded.
  • bounded (bool, default False): If True, do not include None in the first chunk.
  • chunk_interval (Union[timedelta, int, None], default None): If provided, use this interval for the size of chunk boundaries. The default value for this pipe may be set under pipe.parameters['verify']['chunk_minutes'].
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of chunk bounds (datetimes or integers). If unbounded, the first and last chunks will include None.
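A sketch of iterating over chunk bounds (the timestamps are hypothetical):

>>> from datetime import datetime, timedelta
>>> bounds = pipe.get_chunk_bounds(
...     begin=datetime(2024, 1, 1),
...     end=datetime(2024, 1, 3),
...     bounded=True,
...     chunk_interval=timedelta(days=1),
... )
>>> len(bounds)
2
>>> bounds[0]
(datetime.datetime(2024, 1, 1, 0, 0), datetime.datetime(2024, 1, 2, 0, 0))
>>> chunk_dfs = [pipe.get_data(begin=b, end=e) for b, e in bounds]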
def register(self, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
12def register(
13        self,
14        debug: bool = False,
15        **kw: Any
16    ) -> SuccessTuple:
17    """
18    Register a new Pipe along with its attributes.
19
20    Parameters
21    ----------
22    debug: bool, default False
23        Verbosity toggle.
24
25    kw: Any
26        Keyword arguments to pass to `instance_connector.register_pipe()`.
27
28    Returns
29    -------
30    A `SuccessTuple` of success, message.
31    """
32    if self.temporary:
33        return False, "Cannot register pipes created with `temporary=True` (read-only)."
34
35    from meerschaum.utils.formatting import get_console
36    from meerschaum.utils.venv import Venv
37    from meerschaum.connectors import get_connector_plugin, custom_types
38    from meerschaum.config._patch import apply_patch_to_config
39
40    import warnings
41    with warnings.catch_warnings():
42        warnings.simplefilter('ignore')
43        try:
44            _conn = self.connector
45        except Exception as e:
46            _conn = None
47
48    if (
49        _conn is not None
50        and
51        (_conn.type == 'plugin' or _conn.type in custom_types)
52        and
53        getattr(_conn, 'register', None) is not None
54    ):
55        try:
56            with Venv(get_connector_plugin(_conn), debug=debug):
57                params = self.connector.register(self)
58        except Exception as e:
59            get_console().print_exception()
60            params = None
61        params = {} if params is None else params
62        if not isinstance(params, dict):
63            from meerschaum.utils.warnings import warn
64            warn(
65                f"Invalid parameters returned from `register()` in connector {self.connector}:\n"
66                + f"{params}"
67            )
68        else:
69            self.parameters = apply_patch_to_config(params, self.parameters)
70
71    if not self.parameters:
72        cols = self.columns if self.columns else {'datetime': None, 'id': None}
73        self.parameters = {
74            'columns': cols,
75        }
76
77    with Venv(get_connector_plugin(self.instance_connector)):
78        return self.instance_connector.register_pipe(self, debug=debug, **kw)

Register a new Pipe along with its attributes.

Parameters
  • debug (bool, default False): Verbosity toggle.
  • kw (Any): Keyword arguments to pass to instance_connector.register_pipe().
Returns
  • A SuccessTuple of success, message.
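A minimal sketch (the result shown is illustrative):

>>> success, message = pipe.register()
>>> success
True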
attributes: Dict[str, Any]
14@property
15def attributes(self) -> Dict[str, Any]:
16    """
17    Return a dictionary of a pipe's keys and parameters.
18    These values are reflected directly from the pipes table of the instance.
19    """
20    import time
21    from meerschaum.config import get_config
22    from meerschaum.config._patch import apply_patch_to_config
23    from meerschaum.utils.venv import Venv
24    from meerschaum.connectors import get_connector_plugin
25
26    timeout_seconds = get_config('pipes', 'attributes', 'local_cache_timeout_seconds')
27
28    if '_attributes' not in self.__dict__:
29        self._attributes = {}
30
31    now = time.perf_counter()
32    last_refresh = self.__dict__.get('_attributes_sync_time', None)
33    timed_out = (
34        last_refresh is None
35        or
36        (timeout_seconds is not None and (now - last_refresh) >= timeout_seconds)
37    )
38    if not self.temporary and timed_out:
39        self._attributes_sync_time = now
40        local_attributes = self.__dict__.get('_attributes', {})
41        with Venv(get_connector_plugin(self.instance_connector)):
42            instance_attributes = self.instance_connector.get_pipe_attributes(self)
43        self._attributes = apply_patch_to_config(instance_attributes, local_attributes)
44    return self._attributes

Return a dictionary of a pipe's keys and parameters. These values are reflected directly from the pipes table of the instance.

parameters: Optional[Dict[str, Any]]
47@property
48def parameters(self) -> Optional[Dict[str, Any]]:
49    """
50    Return the parameters dictionary of the pipe.
51    """
52    if 'parameters' not in self.attributes:
53        self.attributes['parameters'] = {}
54    return self.attributes['parameters']

Return the parameters dictionary of the pipe.

columns: Optional[Dict[str, str]]
66@property
67def columns(self) -> Union[Dict[str, str], None]:
68    """
69    Return the `columns` dictionary defined in `meerschaum.Pipe.parameters`.
70    """
71    if 'columns' not in self.parameters:
72        self.parameters['columns'] = {}
73    cols = self.parameters['columns']
74    if not isinstance(cols, dict):
75        cols = {}
76        self.parameters['columns'] = cols
77    return cols

Return the columns dictionary defined in Pipe.parameters.

dtypes: Optional[Dict[str, Any]]
117@property
118def dtypes(self) -> Union[Dict[str, Any], None]:
119    """
120    If defined, return the `dtypes` dictionary defined in `meerschaum.Pipe.parameters`.
121    """
122    from meerschaum.config._patch import apply_patch_to_config
123    configured_dtypes = self.parameters.get('dtypes', {})
124    remote_dtypes = self.infer_dtypes(persist=False)
125    patched_dtypes = apply_patch_to_config(remote_dtypes, configured_dtypes)
126    self.parameters['dtypes'] = patched_dtypes
127    return self.parameters['dtypes']

If defined, return the dtypes dictionary defined in Pipe.parameters.
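For example (the column names and types are illustrative), configured dtypes take precedence over the dtypes inferred from existing data:

>>> pipe.parameters['dtypes'] = {'temp': 'float64'}
>>> pipe.dtypes
{'dt': 'datetime64[ns]', 'station': 'int64', 'temp': 'float64'}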

def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
139def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
140    """
141    Check if the requested columns are defined.
142
143    Parameters
144    ----------
145    *args: str
146        The column names to be retrieved.
147        
148    error: bool, default False
149        If `True`, raise an `Exception` if the specified column is not defined.
150
151    Returns
152    -------
153    A tuple of the same size as `args`, or a `str` if `args` is a single argument.
154
155    Examples
156    --------
157    >>> pipe = mrsm.Pipe('test', 'test')
158    >>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
159    >>> pipe.get_columns('datetime', 'id')
160    ('dt', 'id')
161    >>> pipe.get_columns('value', error=True)
162    Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
163    """
164    from meerschaum.utils.warnings import error as _error, warn
165    if not args:
166        args = tuple(self.columns.keys())
167    col_names = []
168    for col in args:
169        col_name = None
170        try:
171            col_name = self.columns[col]
172            if col_name is None and error:
173                _error(f"Please define the name of the '{col}' column for {self}.")
174        except Exception as e:
175            col_name = None
176        if col_name is None and error:
177            _error(f"Missing '{col}'" + f" column for {self}.")
178        col_names.append(col_name)
179    if len(col_names) == 1:
180        return col_names[0]
181    return tuple(col_names)

Check if the requested columns are defined.

Parameters
  • *args (str): The column names to be retrieved.
  • error (bool, default False): If True, raise an Exception if the specified column is not defined.
Returns
  • A tuple of the same size as args, or a str if args is a single argument.
Examples
>>> pipe = mrsm.Pipe('test', 'test')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> pipe.get_columns('datetime', 'id')
('dt', 'id')
>>> pipe.get_columns('value', error=True)
Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
def get_columns_types(self, debug: bool = False) -> Optional[Dict[str, str]]:
184def get_columns_types(self, debug: bool = False) -> Union[Dict[str, str], None]:
185    """
186    Get a dictionary of a pipe's column names and their types.
187
188    Parameters
189    ----------
190    debug: bool, default False:
191        Verbosity toggle.
192
193    Returns
194    -------
195    A dictionary of column names (`str`) to column types (`str`).
196
197    Examples
198    --------
199    >>> pipe.get_columns_types()
200    {
201      'dt': 'TIMESTAMP WITHOUT TIMEZONE',
202      'id': 'BIGINT',
203      'val': 'DOUBLE PRECISION',
204    }
205    >>>
206    """
207    from meerschaum.utils.venv import Venv
208    from meerschaum.connectors import get_connector_plugin
209
210    with Venv(get_connector_plugin(self.instance_connector)):
211        return self.instance_connector.get_pipe_columns_types(self, debug=debug)

Get a dictionary of a pipe's column names and their types.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A dictionary of column names (str) to column types (str).
Examples
>>> pipe.get_columns_types()
{
  'dt': 'TIMESTAMP WITHOUT TIMEZONE',
  'id': 'BIGINT',
  'val': 'DOUBLE PRECISION',
}
>>>
def get_indices(self) -> Dict[str, str]:
436def get_indices(self) -> Dict[str, str]:
437    """
438    Return a dictionary in the form of `pipe.columns` but map to index names.
439    """
440    return {
441        ix: (self.target + '_' + col + '_index')
442        for ix, col in self.columns.items() if col
443    }

Return a dictionary in the form of pipe.columns but map to index names.
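For example (the target and columns are hypothetical), index names are derived from the target table name and the column names:

>>> pipe.parameters['target'] = 'weather_tbl'
>>> pipe.columns = {'datetime': 'dt', 'id': 'station'}
>>> pipe.get_indices()
{'datetime': 'weather_tbl_dt_index', 'id': 'weather_tbl_station_index'}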

tags: Optional[List[str]]
92@property
93def tags(self) -> Union[List[str], None]:
94    """
95    If defined, return the `tags` list defined in `meerschaum.Pipe.parameters`.
96    """
97    if 'tags' not in self.parameters:
98        self.parameters['tags'] = []
99    return self.parameters['tags']

If defined, return the tags list defined in Pipe.parameters.

def get_id(self, **kw: Any) -> Optional[int]:
214def get_id(self, **kw: Any) -> Union[int, None]:
215    """
216    Fetch a pipe's ID from its instance connector.
217    If the pipe does not exist, return `None`.
218    """
219    if self.temporary:
220        return None
221    from meerschaum.utils.venv import Venv
222    from meerschaum.connectors import get_connector_plugin
223
224    with Venv(get_connector_plugin(self.instance_connector)):
225        return self.instance_connector.get_pipe_id(self, **kw)

Fetch a pipe's ID from its instance connector. If the pipe does not exist, return None.

id: Optional[int]
228@property
229def id(self) -> Union[int, None]:
230    """
231    Fetch and cache a pipe's ID.
232    """
233    if not ('_id' in self.__dict__ and self._id):
234        self._id = self.get_id()
235    return self._id

Fetch and cache a pipe's ID.

def get_val_column(self, debug: bool = False) -> Optional[str]:
238def get_val_column(self, debug: bool = False) -> Union[str, None]:
239    """
240    Return the name of the value column if it's defined, otherwise make an educated guess.
241    If not set in the `columns` dictionary, return the first numeric column that is not
242    an ID or datetime column.
243    If none may be found, return `None`.
244
245    Parameters
246    ----------
247    debug: bool, default False:
248        Verbosity toggle.
249
250    Returns
251    -------
252    Either a string or `None`.
253    """
254    from meerschaum.utils.debug import dprint
255    if debug:
256        dprint('Attempting to determine the value column...')
257    try:
258        val_name = self.get_columns('value')
259    except Exception as e:
260        val_name = None
261    if val_name is not None:
262        if debug:
263            dprint(f"Value column: {val_name}")
264        return val_name
265
266    cols = self.columns
267    if cols is None:
268        if debug:
269            dprint('No columns could be determined. Returning...')
270        return None
271    try:
272        dt_name = self.get_columns('datetime', error=False)
273    except Exception as e:
274        dt_name = None
275    try:
276            id_name = self.get_columns('id', error=False)
277    except Exception as e:
278        id_name = None
279
280    if debug:
281        dprint(f"dt_name: {dt_name}")
282        dprint(f"id_name: {id_name}")
283
284    cols_types = self.get_columns_types(debug=debug)
285    if cols_types is None:
286        return None
287    if debug:
288        dprint(f"cols_types: {cols_types}")
289    if dt_name is not None:
290        cols_types.pop(dt_name, None)
291    if id_name is not None:
292        cols_types.pop(id_name, None)
293
294    candidates = []
295    candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric',}
296    for search_term in candidate_keywords:
297        for col, typ in cols_types.items():
298            if search_term in typ.lower():
299                candidates.append(col)
300                break
301    if not candidates:
302        if debug:
303            dprint(f"No value column could be determined.")
304        return None
305
306    return candidates[0]

Return the name of the value column if it's defined, otherwise make an educated guess. If not set in the columns dictionary, return the first numeric column that is not an ID or datetime column. If none may be found, return None.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • Either a string or None.
parents: List[Pipe]
309@property
310def parents(self) -> List[meerschaum.Pipe]:
311    """
312    Return a list of `meerschaum.Pipe` objects to be designated as parents.
313    """
314    if 'parents' not in self.parameters:
315        return []
316    from meerschaum.utils.warnings import warn
317    _parents_keys = self.parameters['parents']
318    if not isinstance(_parents_keys, list):
319        warn(
320            f"Please ensure the parents for {self} are defined as a list of keys.",
321            stacklevel = 4
322        )
323        return []
324    from meerschaum import Pipe
325    _parents = []
326    for keys in _parents_keys:
327        try:
328            p = Pipe(**keys)
329        except Exception as e:
330            warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}")
331            continue
332        _parents.append(p)
333    return _parents

Return a list of Pipe objects to be designated as parents.

children: List[Pipe]
336@property
337def children(self) -> List[meerschaum.Pipe]:
338    """
339    Return a list of `meerschaum.Pipe` objects to be designated as children.
340    """
341    if 'children' not in self.parameters:
342        return []
343    from meerschaum.utils.warnings import warn
344    _children_keys = self.parameters['children']
345    if not isinstance(_children_keys, list):
346        warn(
347            f"Please ensure the children for {self} are defined as a list of keys.",
348            stacklevel = 4
349        )
350        return []
351    from meerschaum import Pipe
352    _children = []
353    for keys in _children_keys:
354        try:
355            p = Pipe(**keys)
356        except Exception as e:
357            warn(f"Unable to build child with keys '{keys}' for {self}:\n{e}")
358            continue
359        _children.append(p)
360    return _children

Return a list of Pipe objects to be designated as children.
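A sketch of declaring lineage under parameters (the keys are hypothetical keyword arguments passed to Pipe(**keys)):

>>> pipe.parameters['parents'] = [{'connector': 'plugin:noaa', 'metric': 'weather'}]
>>> pipe.parents   # a list containing one Pipe built from these keys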

target: str
363@property
364def target(self) -> str:
365    """
366    The target table name.
367    You can set the target name under one of the following keys
368    (checked in this order):
369      - `target`
370      - `target_name`
371      - `target_table`
372      - `target_table_name`
373    """
374    if 'target' not in self.parameters:
375        target = self._target_legacy()
376        potential_keys = ('target_name', 'target_table', 'target_table_name')
377        for k in potential_keys:
378            if k in self.parameters:
379                target = self.parameters[k]
380                break
381
382        if self.instance_connector.type == 'sql':
383            from meerschaum.utils.sql import truncate_item_name
384            truncated_target = truncate_item_name(target, self.instance_connector.flavor)
385            if truncated_target != target:
386                warn(
387                    f"The target '{target}' is too long for '{self.instance_connector.flavor}', "
388                    + f"will use {truncated_target} instead."
389                )
390                target = truncated_target
391
392        self.target = target
393    return self.parameters['target']

The target table name. You can set the target name under one of the following keys (checked in this order):

  • target
  • target_name
  • target_table
  • target_table_name
def guess_datetime(self) -> Optional[str]:
416def guess_datetime(self) -> Union[str, None]:
417    """
418    Try to determine a pipe's datetime column.
419    """
420    dtypes = self.dtypes
421
422    ### Abort if the user explicitly disallows a datetime index.
423    if 'datetime' in dtypes:
424        if dtypes['datetime'] is None:
425            return None
426
427    dt_cols = [
428        col for col, typ in self.dtypes.items()
429        if str(typ).startswith('datetime')
430    ]
431    if not dt_cols:
432        return None
433    return dt_cols[0]    

Try to determine a pipe's datetime column.

def show( self, nopretty: bool = False, debug: bool = False, **kw) -> Tuple[bool, str]:
12def show(
13        self,
14        nopretty: bool = False,
15        debug: bool = False,
16        **kw
17    ) -> SuccessTuple:
18    """
19    Show attributes of a Pipe.
20
21    Parameters
22    ----------
23    nopretty: bool, default False
24        If `True`, simply print the JSON of the pipe's attributes.
25
26    debug: bool, default False
27        Verbosity toggle.
28
29    Returns
30    -------
31    A `SuccessTuple` of success, message.
32
33    """
34    import json
35    from meerschaum.utils.formatting import (
36        pprint, make_header, ANSI, highlight_pipes, fill_ansi, get_console,
37    )
38    from meerschaum.utils.packages import import_rich, attempt_import
39    from meerschaum.utils.warnings import info
40    attributes_json = json.dumps(self.attributes)
41    if not nopretty:
42        _to_print = f"Attributes for {self}:"
43        if ANSI:
44            _to_print = fill_ansi(highlight_pipes(make_header(_to_print)), 'magenta')
45            print(_to_print)
46            rich = import_rich()
47            rich_json = attempt_import('rich.json')
48            get_console().print(rich_json.JSON(attributes_json))
49        else:
50            print(_to_print)
51    else:
52        print(attributes_json)
53
54    return True, "Success"

Show attributes of a Pipe.

Parameters
  • nopretty (bool, default False): If True, simply print the JSON of the pipe's attributes.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success, message.
def edit( self, patch: bool = False, interactive: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
21def edit(
22        self,
23        patch: bool = False,
24        interactive: bool = False,
25        debug: bool = False,
26        **kw: Any
27    ) -> SuccessTuple:
28    """
29    Edit a Pipe's configuration.
30
31    Parameters
32    ----------
33    patch: bool, default False
34        If `patch` is True, update parameters by cascading rather than overwriting.
35    interactive: bool, default False
36        If `True`, open an editor for the user to make changes to the pipe's YAML file.
37    debug: bool, default False
38        Verbosity toggle.
39
40    Returns
41    -------
42    A `SuccessTuple` of success, message.
43
44    """
45    from meerschaum.utils.venv import Venv
46    from meerschaum.connectors import get_connector_plugin
47
48    if self.temporary:
49        return False, "Cannot edit pipes created with `temporary=True` (read-only)."
50
51    if not interactive:
52        with Venv(get_connector_plugin(self.instance_connector)):
53            return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
54    from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
55    from meerschaum.utils.misc import edit_file
56    parameters_filename = str(self) + '.yaml'
57    parameters_path = PIPES_CACHE_RESOURCES_PATH / parameters_filename
58    
59    from meerschaum.utils.yaml import yaml
60
61    edit_text = f"Edit the parameters for {self}"
62    edit_top = '#' * (len(edit_text) + 4)
63    edit_header = edit_top + f'\n# {edit_text} #\n' + edit_top + '\n\n'
64
65    from meerschaum.config import get_config
66    parameters = dict(get_config('pipes', 'parameters', patch=True))
67    from meerschaum.config._patch import apply_patch_to_config
68    parameters = apply_patch_to_config(parameters, self.parameters)
69
70    ### write parameters to yaml file
71    with open(parameters_path, 'w+') as f:
72        f.write(edit_header)
73        yaml.dump(parameters, stream=f, sort_keys=False)
74
75    ### only quit editing if yaml is valid
76    editing = True
77    while editing:
78        edit_file(parameters_path)
79        try:
80            with open(parameters_path, 'r') as f:
81                file_parameters = yaml.load(f.read())
82        except Exception as e:
83            from meerschaum.utils.warnings import warn
84            warn(f"Invalid format defined for '{self}':\n\n{e}")
85            input(f"Press [Enter] to correct the configuration for '{self}': ")
86        else:
87            editing = False
88
89    self.parameters = file_parameters
90
91    if debug:
92        from meerschaum.utils.formatting import pprint
93        pprint(self.parameters)
94
95    with Venv(get_connector_plugin(self.instance_connector)):
96        return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)

Edit a Pipe's configuration.

Parameters
  • patch (bool, default False): If patch is True, update parameters by cascading rather than overwriting.
  • interactive (bool, default False): If True, open an editor for the user to make changes to the pipe's YAML file.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success, message.
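Example
A sketch of persisting a parameters change without opening an editor (the keys and parameter values below are hypothetical):

import meerschaum as mrsm

pipe = mrsm.Pipe('sql:main', 'weather', instance='sql:main')
pipe.parameters['fetch'] = {'backtrack_minutes': 10}
success, msg = pipe.edit(patch=True)  # cascade into the existing parameters rather than overwrite
print(success, msg)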
def edit_definition( self, yes: bool = False, noask: bool = False, force: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
 99def edit_definition(
100        self,
101        yes: bool = False,
102        noask: bool = False,
103        force: bool = False,
104        debug : bool = False,
105        **kw : Any
106    ) -> SuccessTuple:
107    """
108    Edit a pipe's definition file and update its configuration.
109    **NOTE:** This function is interactive and should not be used in automated scripts!
110
111    Returns
112    -------
113    A `SuccessTuple` of success, message.
114
115    """
116    if self.temporary:
117        return False, "Cannot edit pipes created with `temporary=True` (read-only)."
118
119    from meerschaum.connectors import instance_types
120    if (self.connector is None) or self.connector.type not in instance_types:
121        return self.edit(interactive=True, debug=debug, **kw)
122
123    import json
124    from meerschaum.utils.warnings import info, warn
125    from meerschaum.utils.debug import dprint
126    from meerschaum.config._patch import apply_patch_to_config
127    from meerschaum.utils.misc import edit_file
128
129    _parameters = self.parameters
130    if 'fetch' not in _parameters:
131        _parameters['fetch'] = {}
132
133    def _edit_api():
134        from meerschaum.utils.prompt import prompt, yes_no
135        info(
136            f"Please enter the keys of the source pipe from '{self.connector}'.\n" +
137            "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip."
138        )
139
140        _keys = { 'connector_keys' : None, 'metric_key' : None, 'location_key' : None }
141        for k in _keys:
142            _keys[k] = _parameters['fetch'].get(k, None)
143
144        for k, v in _keys.items():
145            try:
146                _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v)
147            except KeyboardInterrupt:
148                continue
149            if _keys[k] in ('', 'None', '\'None\'', '[None]'):
150                _keys[k] = None
151
152        _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys)
153
154        info("You may optionally specify additional filter parameters as JSON.")
155        print("  Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.")
156        print("  For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':")
157        print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': ')))
158        if force or yes_no(
159            "Would you like to add additional filter parameters?",
160            yes=yes, noask=noask
161        ):
162            from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
163            definition_filename = str(self) + '.json'
164            definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
165            try:
166                definition_path.touch()
167                with open(definition_path, 'w+') as f:
168                    json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2)
169            except Exception as e:
170                return False, f"Failed writing file '{definition_path}':\n" + str(e)
171
172            _params = None
173            while True:
174                edit_file(definition_path)
175                try:
176                    with open(definition_path, 'r') as f:
177                        _params = json.load(f)
178                except Exception as e:
179                    warn(f'Failed to read parameters JSON:\n{e}', stack=False)
180                    if force or yes_no(
181                        "Would you like to try again?\n  "
182                        + "If not, the parameters JSON file will be ignored.",
183                        noask=noask, yes=yes
184                    ):
185                        continue
186                    _params = None
187                break
188            if _params is not None:
189                if 'fetch' not in _parameters:
190                    _parameters['fetch'] = {}
191                _parameters['fetch']['params'] = _params
192
193        self.parameters = _parameters
194        return True, "Success"
195
196    def _edit_sql():
197        import pathlib, os, textwrap
198        from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
199        from meerschaum.utils.misc import edit_file
200        definition_filename = str(self) + '.sql'
201        definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
202
203        sql_definition = _parameters['fetch'].get('definition', None)
204        if sql_definition is None:
205            sql_definition = ''
206        sql_definition = textwrap.dedent(sql_definition).lstrip()
207
208        try:
209            definition_path.touch()
210            with open(definition_path, 'w+') as f:
211                f.write(sql_definition)
212        except Exception as e:
213            return False, f"Failed writing file '{definition_path}':\n" + str(e)
214
215        edit_file(definition_path)
216        try:
217            with open(definition_path, 'r') as f:
218                file_definition = f.read()
219        except Exception as e:
220            return False, f"Failed reading file '{definition_path}':\n" + str(e)
221
222        if sql_definition == file_definition:
223            return False, f"No changes made to definition for {self}."
224
225        if ' ' not in file_definition:
226            return False, f"Invalid SQL definition for {self}."
227
228        if debug:
229            dprint("Read SQL definition:\n\n" + file_definition)
230        _parameters['fetch']['definition'] = file_definition
231        self.parameters = _parameters
232        return True, "Success"
233
234    locals()['_edit_' + str(self.connector.type)]()
235    return self.edit(interactive=False, debug=debug, **kw)

Edit a pipe's definition file and update its configuration. NOTE: This function is interactive and should not be used in automated scripts!

Returns
  • A SuccessTuple of success, message.
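Example
Because this method opens an editor and prompts for input, it is only suitable for interactive sessions. A sketch (hypothetical keys):

import meerschaum as mrsm

pipe = mrsm.Pipe('sql:source', 'sales', instance='sql:main')
success, msg = pipe.edit_definition()  # opens the pipe's SQL definition in your default editor
print(success, msg)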
def update(self, *args, **kw) -> Tuple[bool, str]:
13def update(self, *args, **kw) -> SuccessTuple:
14    """
15    Update a pipe's parameters in its instance.
16    """
17    kw['interactive'] = False
18    return self.edit(*args, **kw)

Update a pipe's parameters in its instance.
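Example
A sketch of changing a parameter in memory and persisting it (hypothetical keys and values):

import meerschaum as mrsm

pipe = mrsm.Pipe('sql:main', 'weather', instance='sql:main')
pipe.parameters['target'] = 'weather_staging'
success, msg = pipe.update()  # shorthand for pipe.edit(interactive=False)
print(success, msg)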

def sync( self, df: Union[pandas.core.frame.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], meerschaum.core.Pipe._sync.InferFetch] = <class 'meerschaum.core.Pipe._sync.InferFetch'>, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int, NoneType] = None, force: bool = False, retries: int = 10, min_seconds: int = 1, check_existing: bool = True, blocking: bool = True, workers: Optional[int] = None, callback: Optional[Callable[[Tuple[bool, str]], Any]] = None, error_callback: Optional[Callable[[Exception], Any]] = None, chunksize: Optional[int] = -1, sync_chunks: bool = True, debug: bool = False, _inplace: bool = True, **kw: Any) -> Tuple[bool, str]:
 42def sync(
 43    self,
 44    df: Union[
 45        pd.DataFrame,
 46        Dict[str, List[Any]],
 47        List[Dict[str, Any]],
 48        InferFetch
 49    ] = InferFetch,
 50    begin: Union[datetime, int, str, None] = '',
 51    end: Union[datetime, int, None] = None,
 52    force: bool = False,
 53    retries: int = 10,
 54    min_seconds: int = 1,
 55    check_existing: bool = True,
 56    blocking: bool = True,
 57    workers: Optional[int] = None,
 58    callback: Optional[Callable[[Tuple[bool, str]], Any]] = None,
 59    error_callback: Optional[Callable[[Exception], Any]] = None,
 60    chunksize: Optional[int] = -1,
 61    sync_chunks: bool = True,
 62    debug: bool = False,
 63    _inplace: bool = True,
 64    **kw: Any
 65) -> SuccessTuple:
 66    """
 67    Fetch new data from the source and update the pipe's table with new data.
 68    
 69    Get new remote data via fetch, get existing data in the same time period,
 70    and merge the two, only keeping the unseen data.
 71
 72    Parameters
 73    ----------
 74    df: Union[pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch], default InferFetch
 75        An optional DataFrame to sync into the pipe. If omitted, fetch new data via the pipe's connector.
 76
 77    begin: Union[datetime, int, str, None], default ''
 78        Optionally specify the earliest datetime to search for data.
 79
 80    end: Union[datetime, int, str, None], default None
 81        Optionally specify the latest datetime to search for data.
 82
 83    force: bool, default False
 84        If `True`, keep trying to sync until `retries` attempts are exhausted.
 85
 86    retries: int, default 10
 87        If `force`, how many attempts to try syncing before declaring failure.
 88
 89    min_seconds: Union[int, float], default 1
 90        If `force`, how many seconds to sleep between retries. Defaults to `1`.
 91
 92    check_existing: bool, default True
 93        If `True`, pull and diff with existing data from the pipe.
 94
 95    blocking: bool, default True
 96        If `True`, wait for sync to finish and return its result, otherwise
 97        sync asynchronously and return success. Defaults to `True`.
 98        Only intended for specific scenarios.
 99
100    workers: Optional[int], default None
101        If provided and the instance connector is thread-safe
102        (`pipe.instance_connector.IS_THREAD_SAFE is True`),
103        limit concurrent sync to this many threads.
104
105    callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
106        Callback function which expects a SuccessTuple as input.
107        Only applies when `blocking=False`.
108
109    error_callback: Optional[Callable[[Exception], Any]], default None
110        Callback function which expects an Exception as input.
111        Only applies when `blocking=False`.
112
113    chunksize: int, default -1
114        Specify the number of rows to sync per chunk.
115        If `-1`, resort to system configuration (default is `900`).
116        A `chunksize` of `None` will sync all rows in one transaction.
117
118    sync_chunks: bool, default True
119        If possible, sync chunks while fetching them into memory.
120
121    debug: bool, default False
122        Verbosity toggle. Defaults to False.
123
124    Returns
125    -------
126    A `SuccessTuple` of success (`bool`) and message (`str`).
127    """
128    from meerschaum.utils.debug import dprint, _checkpoint
129    from meerschaum.connectors import custom_types
130    from meerschaum.plugins import Plugin
131    from meerschaum.utils.formatting import get_console
132    from meerschaum.utils.venv import Venv
133    from meerschaum.connectors import get_connector_plugin
134    from meerschaum.utils.misc import df_is_chunk_generator, filter_keywords, filter_arguments
135    from meerschaum.utils.pool import get_pool
136    from meerschaum.config import get_config
137
138    if (callback is not None or error_callback is not None) and blocking:
139        warn("Callback functions are only executed when blocking = False. Ignoring...")
140
141    _checkpoint(_total=2, **kw)
142
143    if chunksize == 0:
144        chunksize = None
145        sync_chunks = False
146
147    kw.update({
148        'begin': begin,
149        'end': end,
150        'force': force,
151        'retries': retries,
152        'min_seconds': min_seconds,
153        'check_existing': check_existing,
154        'blocking': blocking,
155        'workers': workers,
156        'callback': callback,
157        'error_callback': error_callback,
158        'sync_chunks': sync_chunks,
159        'chunksize': chunksize,
160    })
161
162    ### NOTE: Invalidate `_exists` cache before and after syncing.
163    self._exists = None
164
165    def _sync(
166        p: 'meerschaum.Pipe',
167        df: Union[
168            'pd.DataFrame',
169            Dict[str, List[Any]],
170            List[Dict[str, Any]],
171            InferFetch
172        ] = InferFetch,
173    ) -> SuccessTuple:
174        if df is None:
175            p._exists = None
176            return (
177                False,
178                f"You passed `None` instead of data into `sync()` for {p}.\n"
179                + "Omit the DataFrame to infer fetching.",
180            )
181        ### Ensure that Pipe is registered.
182        if not p.temporary and p.get_id(debug=debug) is None:
183            ### NOTE: This may trigger an interactive session for plugins!
184            register_success, register_msg = p.register(debug=debug)
185            if not register_success:
186                if 'already' not in register_msg:
187                    p._exists = None
188                    return register_success, register_msg
189
190        ### If connector is a plugin with a `sync()` method, return that instead.
191        ### If the plugin does not have a `sync()` method but does have a `fetch()` method,
192        ### use that instead.
193        ### NOTE: The DataFrame must be omitted for the plugin sync method to apply.
194        ### If a DataFrame is provided, continue as expected.
195        if hasattr(df, 'MRSM_INFER_FETCH'):
196            try:
197                if p.connector is None:
198                    if ':' not in p.connector_keys:
199                        return True, f"{p} does not support fetching; nothing to do."
200
201                    msg = f"{p} does not have a valid connector."
202                    if p.connector_keys.startswith('plugin:'):
203                        msg += f"\n    Perhaps {p.connector_keys} has a syntax error?"
204                    p._exists = None
205                    return False, msg
206            except Exception:
207                p._exists = None
208                return False, f"Unable to create the connector for {p}."
209
210            ### Sync in place if this is a SQL pipe.
211            if (
212                str(self.connector) == str(self.instance_connector)
213                and 
214                hasattr(self.instance_connector, 'sync_pipe_inplace')
215                and
216                _inplace
217                and
218                get_config('system', 'experimental', 'inplace_sync')
219            ):
220                with Venv(get_connector_plugin(self.instance_connector)):
221                    p._exists = None
222                    _args, _kwargs = filter_arguments(
223                        p.instance_connector.sync_pipe_inplace,
224                        p,
225                        debug=debug,
226                        **kw
227                    )
228                    return self.instance_connector.sync_pipe_inplace(
229                        *_args,
230                        **_kwargs
231                    )
232
233            ### Activate and invoke `sync(pipe)` for plugin connectors with `sync` methods.
234            try:
235                if getattr(p.connector, 'sync', None) is not None:
236                    with Venv(get_connector_plugin(p.connector), debug=debug):
237                        _args, _kwargs = filter_arguments(
238                            p.connector.sync,
239                            p,
240                            debug=debug,
241                            **kw
242                        )
243                        return_tuple = p.connector.sync(*_args, **_kwargs)
244                    p._exists = None
245                    if not isinstance(return_tuple, tuple):
246                        return_tuple = (
247                            False,
248                            f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}"
249                        )
250                    return return_tuple
251
252            except Exception as e:
253                get_console().print_exception()
254                msg = f"Failed to sync {p} with exception: '" + str(e) + "'"
255                if debug:
256                    error(msg, silent=False)
257                p._exists = None
258                return False, msg
259
260            ### Fetch the dataframe from the connector's `fetch()` method.
261            try:
262                with Venv(get_connector_plugin(p.connector), debug=debug):
263                    df = p.fetch(
264                        **filter_keywords(
265                            p.fetch,
266                            debug=debug,
267                            **kw
268                        )
269                    )
270
271            except Exception as e:
272                get_console().print_exception(
273                    suppress=[
274                        'meerschaum/core/Pipe/_sync.py',
275                        'meerschaum/core/Pipe/_fetch.py',
276                    ]
277                )
278                msg = f"Failed to fetch data from {p.connector}:\n    {e}"
279                df = None
280
281            if df is None:
282                p._exists = None
283                return False, f"No data were fetched for {p}."
284
285            if isinstance(df, list):
286                if len(df) == 0:
287                    return True, f"No new rows were returned for {p}."
288
289                ### May be a chunk hook results list.
290                if isinstance(df[0], tuple):
291                    success = all([_success for _success, _ in df])
292                    message = '\n'.join([_message for _, _message in df])
293                    return success, message
294
295            ### TODO: Deprecate async?
296            if df is True:
297                p._exists = None
298                return True, f"{p} is being synced in parallel."
299
300        ### CHECKPOINT: Retrieved the DataFrame.
301        _checkpoint(**kw)
302        
303        ### Allow for dataframe generators or iterables.
304        if df_is_chunk_generator(df):
305            kw['workers'] = p.get_num_workers(kw.get('workers', None))
306            dt_col = p.columns.get('datetime', None)
307            pool = get_pool(workers=kw.get('workers', 1))
308            if debug:
309                dprint(f"Received {type(df)}. Attempting to sync first chunk...")
310
311            try:
312                chunk = next(df)
313            except StopIteration:
314                return True, "Received an empty generator; nothing to do."
315
316            chunk_success, chunk_msg = _sync(p, chunk)
317            chunk_msg = '\n' + self._get_chunk_label(chunk, dt_col) + '\n' + chunk_msg
318            if not chunk_success:
319                return chunk_success, f"Unable to sync initial chunk for {p}:\n{chunk_msg}"
320            if debug:
321                dprint("Successfully synced the first chunk, attempting the rest...")
322
323            failed_chunks = []
324            def _process_chunk(_chunk):
325                try:
326                    _chunk_success, _chunk_msg = _sync(p, _chunk)
327                except Exception as e:
328                    _chunk_success, _chunk_msg = False, str(e)
329                if not _chunk_success:
330                    failed_chunks.append(_chunk)
331                return (
332                    _chunk_success,
333                    (
334                        '\n'
335                        + self._get_chunk_label(_chunk, dt_col)
336                        + '\n'
337                        + _chunk_msg
338                    )
339                )
340
341            results = sorted(
342                [(chunk_success, chunk_msg)] + (
343                    list(pool.imap(_process_chunk, df))
344                    if not df_is_chunk_generator(chunk)
345                    else [
346                        _process_chunk(_child_chunks)
347                        for _child_chunks in df
348                    ]
349                )
350            )
351            chunk_messages = [chunk_msg for _, chunk_msg in results]
352            success_bools = [chunk_success for chunk_success, _ in results]
353            success = all(success_bools)
354            msg = '\n'.join(chunk_messages)
355
356            ### If some chunks succeeded, retry the failures.
357            retry_success = True
358            if not success and any(success_bools):
359                if debug:
360                    dprint("Retrying failed chunks...")
361                chunks_to_retry = [c for c in failed_chunks]
362                failed_chunks = []
363                for chunk in chunks_to_retry:
364                    chunk_success, chunk_msg = _process_chunk(chunk)
365                    msg += f"\n\nRetried chunk:\n{chunk_msg}\n"
366                    retry_success = retry_success and chunk_success
367
368            success = success and retry_success
369            return success, msg
370
371        ### Cast to a dataframe and ensure datatypes are what we expect.
372        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)
373        if debug:
374            dprint(
375                "DataFrame to sync:\n"
376                + (
377                    str(df)[:255]
378                    + '...'
379                    if len(str(df)) >= 256
380                    else str(df)
381                ),
382                **kw
383            )
384
385        ### if force, continue to sync until success
386        return_tuple = False, f"Did not sync {p}."
387        run = True
388        _retries = 1
389        while run:
390            with Venv(get_connector_plugin(self.instance_connector)):
391                return_tuple = p.instance_connector.sync_pipe(
392                    pipe=p,
393                    df=df,
394                    debug=debug,
395                    **kw
396                )
397            _retries += 1
398            run = (not return_tuple[0]) and force and _retries <= retries
399            if run and debug:
400                dprint(f"Syncing failed for {p}. Attempt ( {_retries} / {retries} )", **kw)
401                dprint(f"Sleeping for {min_seconds} seconds...", **kw)
402                time.sleep(min_seconds)
403            if _retries > retries:
404                warn(
405                    f"Unable to sync {p} within {retries} attempt" +
406                        ("s" if retries != 1 else "") + "!"
407                )
408
409        ### CHECKPOINT: Finished syncing. Handle caching.
410        _checkpoint(**kw)
411        if self.cache_pipe is not None:
412            if debug:
413                dprint("Caching retrieved dataframe.", **kw)
414        _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw)
415        if not _sync_cache_tuple[0]:
416            warn(f"Failed to sync local cache for {self}.")
417
418        self._exists = None
419        return return_tuple
420
421    if blocking:
422        self._exists = None
423        return _sync(self, df = df)
424
425    from meerschaum.utils.threading import Thread
426    def default_callback(result_tuple: SuccessTuple):
427        dprint(f"Asynchronous result from {self}: {result_tuple}", **kw)
428
429    def default_error_callback(x: Exception):
430        dprint(f"Error received for {self}: {x}", **kw)
431
432    if callback is None and debug:
433        callback = default_callback
434    if error_callback is None and debug:
435        error_callback = default_error_callback
436    try:
437        thread = Thread(
438            target=_sync,
439            args=(self,),
440            kwargs={'df': df},
441            daemon=False,
442            callback=callback,
443            error_callback=error_callback,
444        )
445        thread.start()
446    except Exception as e:
447        self._exists = None
448        return False, str(e)
449
450    self._exists = None
451    return True, f"Spawned asynchronous sync for {self}."

Fetch new data from the source and update the pipe's table with new data.

Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.

Parameters
  • df (Union[pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch], default InferFetch): An optional DataFrame to sync into the pipe. If omitted, fetch new data via the pipe's connector.
  • begin (Union[datetime, int, str, None], default ''): Optionally specify the earliest datetime to search for data.
  • end (Union[datetime, int, str, None], default None): Optionally specify the latest datetime to search for data.
  • force (bool, default False): If True, keep trying to sync until retries attempts are exhausted.
  • retries (int, default 10): If force, how many attempts to try syncing before declaring failure.
  • min_seconds (Union[int, float], default 1): If force, how many seconds to sleep between retries. Defaults to 1.
  • check_existing (bool, default True): If True, pull and diff with existing data from the pipe.
  • blocking (bool, default True): If True, wait for the sync to finish and return its result; otherwise sync asynchronously and return success. Defaults to True. Only intended for specific scenarios.
  • workers (Optional[int], default None): If provided and the instance connector is thread-safe (pipe.instance_connector.IS_THREAD_SAFE is True), limit concurrent sync to this many threads.
  • callback (Optional[Callable[[Tuple[bool, str]], Any]], default None): Callback function which expects a SuccessTuple as input. Only applies when blocking=False.
  • error_callback (Optional[Callable[[Exception], Any]], default None): Callback function which expects an Exception as input. Only applies when blocking=False.
  • chunksize (int, default -1): Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction.
  • sync_chunks (bool, default True): If possible, sync chunks while fetching them into memory.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
Returns
  • A SuccessTuple of success (bool) and message (str).
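Example
A minimal sketch of syncing documents directly into a pipe (the keys and columns below are hypothetical):

from datetime import datetime, timezone
import meerschaum as mrsm

pipe = mrsm.Pipe(
    'csv', 'temperature',
    instance='sql:main',
    columns={'datetime': 'ts', 'id': 'station'},
)
docs = [
    {'ts': datetime(2024, 1, 1, tzinfo=timezone.utc), 'station': 1, 'temp_f': 45.1},
    {'ts': datetime(2024, 1, 2, tzinfo=timezone.utc), 'station': 1, 'temp_f': 43.8},
]
success, msg = pipe.sync(docs)  # rows already present are filtered out via filter_existing()
print(success, msg)

If the DataFrame is omitted (the default InferFetch), the pipe instead fetches new rows via its connector (when the connector supports fetching), e.g. pipe.sync(begin=datetime(2024, 1, 1, tzinfo=timezone.utc)).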
def get_sync_time( self, params: Optional[Dict[str, Any]] = None, newest: bool = True, apply_backtrack_interval: bool = False, round_down: bool = False, debug: bool = False) -> Optional[datetime.datetime]:
454def get_sync_time(
455    self,
456    params: Optional[Dict[str, Any]] = None,
457    newest: bool = True,
458    apply_backtrack_interval: bool = False,
459    round_down: bool = False,
460    debug: bool = False
461) -> Union['datetime', None]:
462    """
463    Get the most recent datetime value for a Pipe.
464
465    Parameters
466    ----------
467    params: Optional[Dict[str, Any]], default None
468        Dictionary to build a WHERE clause for a specific column.
469        See `meerschaum.utils.sql.build_where`.
470
471    newest: bool, default True
472        If `True`, get the most recent datetime (honoring `params`).
473        If `False`, get the oldest datetime (`ASC` instead of `DESC`).
474
475    apply_backtrack_interval: bool, default False
476        If `True`, subtract the backtrack interval from the sync time.
477
478    round_down: bool, default False
479        If `True`, round down the datetime value to the nearest minute.
480
481    debug: bool, default False
482        Verbosity toggle.
483
484    Returns
485    -------
486    A `datetime` object if the pipe exists, otherwise `None`.
487
488    """
489    from meerschaum.utils.venv import Venv
490    from meerschaum.connectors import get_connector_plugin
491    from meerschaum.utils.misc import round_time
492
493    with Venv(get_connector_plugin(self.instance_connector)):
494        sync_time = self.instance_connector.get_sync_time(
495            self,
496            params=params,
497            newest=newest,
498            debug=debug,
499        )
500
501    if round_down and isinstance(sync_time, datetime):
502        sync_time = round_time(sync_time, timedelta(minutes=1))
503
504    if apply_backtrack_interval and sync_time is not None:
505        backtrack_interval = self.get_backtrack_interval(debug=debug)
506        try:
507            sync_time -= backtrack_interval
508        except Exception as e:
509            warn(f"Failed to apply backtrack interval:\n{e}")
510
511    return sync_time

Get the most recent datetime value for a Pipe.

Parameters
  • params (Optional[Dict[str, Any]], default None): Dictionary to build a WHERE clause for a specific column. See meerschaum.utils.sql.build_where.
  • newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
  • apply_backtrack_interval (bool, default False): If True, subtract the backtrack interval from the sync time.
  • round_down (bool, default False): If True, round down the datetime value to the nearest minute.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A datetime object if the pipe exists, otherwise None.
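Example
A sketch of reading a pipe's newest and oldest datetime values (hypothetical keys):

import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')
newest = pipe.get_sync_time()              # most recent value of the datetime column
oldest = pipe.get_sync_time(newest=False)  # oldest value instead
print(newest, oldest)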
def exists(self, debug: bool = False) -> bool:
514def exists(
515        self,
516        debug : bool = False
517    ) -> bool:
518    """
519    See if a Pipe's table exists.
520
521    Parameters
522    ----------
523    debug: bool, default False
524        Verbosity toggle.
525
526    Returns
527    -------
528    A `bool` corresponding to whether a pipe's underlying table exists.
529
530    """
531    import time
532    from meerschaum.utils.venv import Venv
533    from meerschaum.connectors import get_connector_plugin
534    from meerschaum.config import STATIC_CONFIG
535    from meerschaum.utils.debug import dprint
536    now = time.perf_counter()
537    exists_timeout_seconds = STATIC_CONFIG['pipes']['exists_timeout_seconds']
538
539    _exists = self.__dict__.get('_exists', None)
540    if _exists:
541        exists_timestamp = self.__dict__.get('_exists_timestamp', None)
542        if exists_timestamp is not None:
543            delta = now - exists_timestamp
544            if delta < exists_timeout_seconds:
545                if debug:
546                    dprint(f"Returning cached `exists` for {self} ({round(delta, 2)} seconds old).")
547                return _exists
548
549    with Venv(get_connector_plugin(self.instance_connector)):
550        _exists = self.instance_connector.pipe_exists(pipe=self, debug=debug)
551
552    self.__dict__['_exists'] = _exists
553    self.__dict__['_exists_timestamp'] = now
554    return _exists

See if a Pipe's table exists.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool corresponding to whether a pipe's underlying table exists.
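Example
A sketch of guarding a read on whether the pipe's table exists (hypothetical keys):

import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')
if pipe.exists():
    print(pipe.get_data())
else:
    print(f"{pipe} has not been synced yet.")

Note that the result is cached for a short window (STATIC_CONFIG['pipes']['exists_timeout_seconds']), so repeated calls are inexpensive.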
def filter_existing( self, df: pandas.core.frame.DataFrame, safe_copy: bool = True, date_bound_only: bool = False, chunksize: Optional[int] = -1, debug: bool = False, **kw) -> Tuple[pandas.core.frame.DataFrame, pandas.core.frame.DataFrame, pandas.core.frame.DataFrame]:
557def filter_existing(
558        self,
559        df: 'pd.DataFrame',
560        safe_copy: bool = True,
561        date_bound_only: bool = False,
562        chunksize: Optional[int] = -1,
563        debug: bool = False,
564        **kw
565    ) -> Tuple['pd.DataFrame', 'pd.DataFrame', 'pd.DataFrame']:
566    """
567    Inspect a dataframe and filter out rows which already exist in the pipe.
568
569    Parameters
570    ----------
571    df: 'pd.DataFrame'
572        The dataframe to inspect and filter.
573        
574    safe_copy: bool, default True
575        If `True`, create a copy before comparing and modifying the dataframes.
576        Setting to `False` may mutate the DataFrames.
577        See `meerschaum.utils.dataframe.filter_unseen_df`.
578
579    date_bound_only: bool, default False
580        If `True`, only use the datetime index to fetch the sample dataframe.
581
582    chunksize: Optional[int], default -1
583        The `chunksize` used when fetching existing data.
584
585    debug: bool, default False
586        Verbosity toggle.
587
588    Returns
589    -------
590    A tuple of three pandas DataFrames: unseen, update, and delta.
591    """
592    from meerschaum.utils.warnings import warn
593    from meerschaum.utils.debug import dprint
594    from meerschaum.utils.packages import attempt_import, import_pandas
595    from meerschaum.utils.misc import round_time
596    from meerschaum.utils.dataframe import (
597        filter_unseen_df,
598        add_missing_cols_to_df,
599        get_unhashable_cols,
600        get_numeric_cols,
601    )
602    from meerschaum.utils.dtypes import (
603        to_pandas_dtype,
604        none_if_null,
605    )
606    from meerschaum.config import get_config
607    pd = import_pandas()
608    pandas = attempt_import('pandas')
609    if not 'dataframe' in str(type(df)).lower():
610        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)
611    is_dask = 'dask' in df.__module__
612    if is_dask:
613        dd = attempt_import('dask.dataframe')
614        merge = dd.merge
615        NA = pandas.NA
616    else:
617        merge = pd.merge
618        NA = pd.NA
619    if df is None:
620        return df, df, df
621    if (df.empty if not is_dask else len(df) == 0):
622        return df, df, df
623
624    ### begin is the oldest data in the new dataframe
625    begin, end = None, None
626    dt_col = self.columns.get('datetime', None)
627    dt_type = self.dtypes.get(dt_col, 'datetime64[ns]') if dt_col else None
628    try:
629        min_dt_val = df[dt_col].min(skipna=True) if dt_col else None
630        if is_dask and min_dt_val is not None:
631            min_dt_val = min_dt_val.compute()
632        min_dt = (
633            pandas.to_datetime(min_dt_val).to_pydatetime()
634            if min_dt_val is not None and 'datetime' in str(dt_type)
635            else min_dt_val
636        )
637    except Exception as e:
638        min_dt = None
639    if not ('datetime' in str(type(min_dt))) or str(min_dt) == 'NaT':
640        if 'int' not in str(type(min_dt)).lower():
641            min_dt = None
642
643    if isinstance(min_dt, datetime):
644        begin = (
645            round_time(
646                min_dt,
647                to = 'down'
648            ) - timedelta(minutes=1)
649        )
650    elif dt_type and 'int' in dt_type.lower():
651        begin = min_dt
652    elif dt_col is None:
653        begin = None
654
655    ### end is the newest data in the new dataframe
656    try:
657        max_dt_val = df[dt_col].max(skipna=True) if dt_col else None
658        if is_dask and max_dt_val is not None:
659            max_dt_val = max_dt_val.compute()
660        max_dt = (
661            pandas.to_datetime(max_dt_val).to_pydatetime()
662            if max_dt_val is not None and 'datetime' in str(dt_type)
663            else max_dt_val
664        )
665    except Exception as e:
666        import traceback
667        traceback.print_exc()
668        max_dt = None
669
670    if ('datetime' not in str(type(max_dt))) or str(max_dt) == 'NaT':
671        if 'int' not in str(type(max_dt)).lower():
672            max_dt = None
673
674    if isinstance(max_dt, datetime):
675        end = (
676            round_time(
677                max_dt,
678                to = 'down'
679            ) + timedelta(minutes=1)
680        )
681    elif dt_type and 'int' in dt_type.lower():
682        end = max_dt + 1
683
684    if max_dt is not None and min_dt is not None and min_dt > max_dt:
685        warn(f"Detected minimum datetime greater than maximum datetime.")
686
687    if begin is not None and end is not None and begin > end:
688        if isinstance(begin, datetime):
689            begin = end - timedelta(minutes=1)
690        ### We might be using integers for the datetime axis.
691        else:
692            begin = end - 1
693
694    unique_index_vals = {
695        col: df[col].unique()
696        for col in self.columns
697        if col in df.columns and col != dt_col
698    } if not date_bound_only else {}
699    filter_params_index_limit = get_config('pipes', 'sync', 'filter_params_index_limit')
700    _ = kw.pop('params', None)
701    params = {
702        col: [
703            none_if_null(val)
704            for val in unique_vals
705        ]
706        for col, unique_vals in unique_index_vals.items()
707        if len(unique_vals) <= filter_params_index_limit
708    } if not date_bound_only else {}
709
710    if debug:
711        dprint(f"Looking at data between '{begin}' and '{end}':", **kw)
712
713    backtrack_df = self.get_data(
714        begin = begin,
715        end = end,
716        chunksize = chunksize,
717        params = params,
718        debug = debug,
719        **kw
720    )
721    if debug:
722        dprint(f"Existing data for {self}:\n" + str(backtrack_df), **kw)
723        dprint(f"Existing dtypes for {self}:\n" + str(backtrack_df.dtypes))
724
725    ### Separate new rows from changed ones.
726    on_cols = [
727        col for col_key, col in self.columns.items()
728        if (
729            col
730            and
731            col_key != 'value'
732            and col in backtrack_df.columns
733        )
734    ]
735    self_dtypes = self.dtypes
736    on_cols_dtypes = {
737        col: to_pandas_dtype(typ)
738        for col, typ in self_dtypes.items()
739        if col in on_cols
740    }
741
742    ### Detect changes between the old target and new source dataframes.
743    delta_df = add_missing_cols_to_df(
744        filter_unseen_df(
745            backtrack_df,
746            df,
747            dtypes = {
748                col: to_pandas_dtype(typ)
749                for col, typ in self_dtypes.items()
750            },
751            safe_copy = safe_copy,
752            debug = debug
753        ),
754        on_cols_dtypes,
755    )
756
757    ### Cast dicts or lists to strings so we can merge.
758    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
759    def deserializer(x):
760        return json.loads(x) if isinstance(x, str) else x
761
762    unhashable_delta_cols = get_unhashable_cols(delta_df)
763    unhashable_backtrack_cols = get_unhashable_cols(backtrack_df)
764    for col in unhashable_delta_cols:
765        delta_df[col] = delta_df[col].apply(serializer)
766    for col in unhashable_backtrack_cols:
767        backtrack_df[col] = backtrack_df[col].apply(serializer)
768    casted_cols = set(unhashable_delta_cols + unhashable_backtrack_cols)
769
770    joined_df = merge(
771        delta_df.fillna(NA),
772        backtrack_df.fillna(NA),
773        how = 'left',
774        on = on_cols,
775        indicator = True,
776        suffixes = ('', '_old'),
777    ) if on_cols else delta_df
778    for col in casted_cols:
779        if col in joined_df.columns:
780            joined_df[col] = joined_df[col].apply(deserializer)
781        if col in delta_df.columns:
782            delta_df[col] = delta_df[col].apply(deserializer)
783
784    ### Determine which rows are completely new.
785    new_rows_mask = (joined_df['_merge'] == 'left_only') if on_cols else None
786    cols = list(backtrack_df.columns)
787
788    unseen_df = (
789        (
790            joined_df
791            .where(new_rows_mask)
792            .dropna(how='all')[cols]
793            .reset_index(drop=True)
794        ) if not is_dask else (
795            joined_df
796            .where(new_rows_mask)
797            .dropna(how='all')[cols]
798            .reset_index(drop=True)
799        )
800    ) if on_cols else delta_df
801
802    ### Rows that have already been inserted but values have changed.
803    update_df = (
804        joined_df
805        .where(~new_rows_mask)
806        .dropna(how='all')[cols]
807        .reset_index(drop=True)
808    ) if on_cols else None
809
810    return unseen_df, update_df, delta_df

Inspect a dataframe and filter out rows which already exist in the pipe.

Parameters
  • df ('pd.DataFrame'): The dataframe to inspect and filter.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames. See meerschaum.utils.dataframe.filter_unseen_df.
  • date_bound_only (bool, default False): If True, only use the datetime index to fetch the sample dataframe.
  • chunksize (Optional[int], default -1): The chunksize used when fetching existing data.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A tuple of three pandas DataFrames: unseen, update, and delta.
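Example
A sketch of splitting an incoming DataFrame into unseen, update, and delta frames (hypothetical keys and data):

import pandas as pd
import meerschaum as mrsm

pipe = mrsm.Pipe(
    'csv', 'temperature',
    instance='sql:main',
    columns={'datetime': 'ts', 'id': 'station'},
)
df = pd.DataFrame({
    'ts': [pd.Timestamp('2024-01-01'), pd.Timestamp('2024-01-02')],
    'station': [1, 1],
    'temp_f': [45.1, 43.8],
})
unseen_df, update_df, delta_df = pipe.filter_existing(df)
# unseen_df: rows not yet in the pipe
# update_df: rows whose index values exist but whose other values changed
# delta_df:  unseen and updated rows combined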
def get_num_workers(self, workers: Optional[int] = None) -> int:
835def get_num_workers(self, workers: Optional[int] = None) -> int:
836    """
837    Get the number of workers to use for concurrent syncs.
838
839    Parameters
840    ----------
841    The number of workers passed via `--workers`.
842
843    Returns
844    -------
845    The number of workers, capped for safety.
846    """
847    is_thread_safe = getattr(self.instance_connector, 'IS_THREAD_SAFE', False)
848    if not is_thread_safe:
849        return 1
850
851    engine_pool_size = (
852        self.instance_connector.engine.pool.size()
853        if self.instance_connector.type == 'sql'
854        else None
855    )
856    current_num_threads = threading.active_count()
857    current_num_connections = (
858        self.instance_connector.engine.pool.checkedout()
859        if engine_pool_size is not None
860        else current_num_threads
861    )
862    desired_workers = (
863        min(workers or engine_pool_size, engine_pool_size)
864        if engine_pool_size is not None
865        else workers
866    )
867    if desired_workers is None:
868        desired_workers = (multiprocessing.cpu_count() if is_thread_safe else 1)
869
870    return max(
871        (desired_workers - current_num_connections),
872        1,
873    )

Get the number of workers to use for concurrent syncs.

Parameters
  • workers (Optional[int], default None): The number of workers passed via --workers.
Returns
  • The number of workers, capped for safety.
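Example
A sketch of how the requested worker count is capped (hypothetical keys):

import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')
print(pipe.get_num_workers())   # 1 if the instance connector is not thread-safe
print(pipe.get_num_workers(8))  # request 8, still capped by the connection pool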
def verify( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, bounded: Optional[bool] = None, deduplicate: bool = False, workers: Optional[int] = None, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
 15def verify(
 16        self,
 17        begin: Union[datetime, int, None] = None,
 18        end: Union[datetime, int, None] = None,
 19        params: Optional[Dict[str, Any]] = None,
 20        chunk_interval: Union[timedelta, int, None] = None,
 21        bounded: Optional[bool] = None,
 22        deduplicate: bool = False,
 23        workers: Optional[int] = None,
 24        debug: bool = False,
 25        **kwargs: Any
 26    ) -> SuccessTuple:
 27    """
 28    Verify the contents of the pipe by resyncing its interval.
 29
 30    Parameters
 31    ----------
 32    begin: Union[datetime, int, None], default None
 33        If specified, only verify rows greater than or equal to this value.
 34
 35    end: Union[datetime, int, None], default None
 36        If specified, only verify rows less than this value.
 37
 38    chunk_interval: Union[timedelta, int, None], default None
 39        If provided, use this as the size of the chunk boundaries.
 40        Defaults to the value set in `pipe.parameters['chunk_minutes']` (1440).
 41
 42    bounded: Optional[bool], default None
 43        If `True`, do not verify older than the oldest sync time or newer than the newest.
 44        If `False`, verify unbounded syncs outside of the new and old sync times.
 45        The default behavior (`None`) is to bound only if a bound interval is set
 46        (e.g. `pipe.parameters['verify']['bound_days']`).
 47
 48    deduplicate: bool, default False
 49        If `True`, deduplicate the pipe's table after the verification syncs.
 50
 51    workers: Optional[int], default None
 52        If provided, limit the verification to this many threads.
 53        Use a value of `1` to sync chunks in series.
 54
 55    debug: bool, default False
 56        Verbosity toggle.
 57
 58    kwargs: Any
 59        All keyword arguments are passed to `pipe.sync()`.
 60
 61    Returns
 62    -------
 63    A SuccessTuple indicating whether the pipe was successfully resynced.
 64    """
 65    from meerschaum.utils.pool import get_pool
 66    from meerschaum.utils.misc import interval_str
 67    workers = self.get_num_workers(workers)
 68
 69    ### Skip configured bounding in parameters
 70    ### if `bounded` is explicitly `False`.
 71    bound_time = (
 72        self.get_bound_time(debug=debug)
 73        if bounded is not False
 74        else None
 75    )
 76    if bounded is None:
 77        bounded = bound_time is not None
 78
 79    if bounded and begin is None:
 80        begin = (
 81            bound_time
 82            if bound_time is not None
 83            else self.get_sync_time(newest=False, debug=debug)
 84        )
 85    if bounded and end is None:
 86        end = self.get_sync_time(newest=True, debug=debug)
 87
 88    if bounded and end is not None:
 89        end += (
 90            timedelta(minutes=1)
 91            if isinstance(end, datetime)
 92            else 1
 93        )
 94
 95    sync_less_than_begin = not bounded and begin is None
 96    sync_greater_than_end = not bounded and end is None
 97
 98    cannot_determine_bounds = not self.exists(debug=debug)
 99
100    if cannot_determine_bounds:
101        sync_success, sync_msg = self.sync(
102            begin = begin,
103            end = end,
104            params = params,
105            workers = workers,
106            debug = debug,
107            **kwargs
108        )
109        if not sync_success:
110            return sync_success, sync_msg
111        if deduplicate:
112            return self.deduplicate(
113                begin = begin,
114                end = end,
115                params = params,
116                workers = workers,
117                debug = debug,
118                **kwargs
119            )
120        return sync_success, sync_msg
121
122
123    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
124    chunk_bounds = self.get_chunk_bounds(
125        begin = begin,
126        end = end,
127        chunk_interval = chunk_interval,
128        bounded = bounded,
129        debug = debug,
130    )
131
132    ### Consider it a success if no chunks need to be verified.
133    if not chunk_bounds:
134        if deduplicate:
135            return self.deduplicate(
136                begin = begin,
137                end = end,
138                params = params,
139                workers = workers,
140                debug = debug,
141                **kwargs
142            )
143        return True, f"Could not determine chunks between '{begin}' and '{end}'; nothing to do."
144
145    begin_to_print = (
146        begin
147        if begin is not None
148        else (
149            chunk_bounds[0][0]
150            if bounded
151            else chunk_bounds[0][1]
152        )
153    )
154    end_to_print = (
155        end
156        if end is not None
157        else (
158            chunk_bounds[-1][1]
159            if bounded
160            else chunk_bounds[-1][0]
161        )
162    )
163
164    info(
165        f"Syncing {len(chunk_bounds)} chunk" + ('s' if len(chunk_bounds) != 1 else '')
166        + f" ({'un' if not bounded else ''}bounded)"
167        + f" of size '{interval_str(chunk_interval)}'"
168        + f" between '{begin_to_print}' and '{end_to_print}'."
169    )
170
171    pool = get_pool(workers=workers)
172
173    ### Dictionary of the form bounds -> success_tuple, e.g.:
174    ### {
175    ###    (2023-01-01, 2023-01-02): (True, "Success")
176    ### }
177    bounds_success_tuples = {}
178    def process_chunk_bounds(
179            chunk_begin_and_end: Tuple[
180                Union[int, datetime],
181                Union[int, datetime]
182            ]
183        ):
184        if chunk_begin_and_end in bounds_success_tuples:
185            return chunk_begin_and_end, bounds_success_tuples[chunk_begin_and_end]
186
187        chunk_begin, chunk_end = chunk_begin_and_end
188        return chunk_begin_and_end, self.sync(
189            begin = chunk_begin,
190            end = chunk_end,
191            params = params,
192            workers = workers,
193            debug = debug,
194            **kwargs
195        )
196
197    ### If we have more than one chunk, attempt to sync the first one and return if its fails.
198    if len(chunk_bounds) > 1:
199        first_chunk_bounds = chunk_bounds[0]
200        (
201            (first_begin, first_end),
202            (first_success, first_msg)
203        ) = process_chunk_bounds(first_chunk_bounds)
204        if not first_success:
205            return (
206                first_success,
207                f"\n{first_begin} - {first_end}\n"
208                + f"Failed to sync first chunk:\n{first_msg}"
209            )
210        bounds_success_tuples[first_chunk_bounds] = (first_success, first_msg)
211
212    bounds_success_tuples.update(dict(pool.map(process_chunk_bounds, chunk_bounds)))
213    bounds_success_bools = {bounds: tup[0] for bounds, tup in bounds_success_tuples.items()}
214
215    message_header = f"{begin_to_print} - {end_to_print}"
216    if all(bounds_success_bools.values()):
217        msg = get_chunks_success_message(bounds_success_tuples, header=message_header)
218        if deduplicate:
219            deduplicate_success, deduplicate_msg = self.deduplicate(
220                begin = begin,
221                end = end,
222                params = params,
223                workers = workers,
224                debug = debug,
225                **kwargs
226            )
227            return deduplicate_success, msg + '\n\n' + deduplicate_msg
228        return True, msg
229
230    chunk_bounds_to_resync = [
231        bounds
232        for bounds, success in zip(chunk_bounds, bounds_success_bools)
233        if not success
234    ]
235    bounds_to_print = [
236        f"{bounds[0]} - {bounds[1]}"
237        for bounds in chunk_bounds_to_resync
238    ]
239    if bounds_to_print:
240        warn(
241            f"Will resync the following failed chunks:\n    "
242            + '\n    '.join(bounds_to_print),
243            stack = False,
244        )
245
246    retry_bounds_success_tuples = dict(pool.map(process_chunk_bounds, chunk_bounds_to_resync))
247    bounds_success_tuples.update(retry_bounds_success_tuples)
248    retry_bounds_success_bools = {
249        bounds: tup[0]
250        for bounds, tup in retry_bounds_success_tuples.items()
251    }
252
253    if all(retry_bounds_success_bools.values()):
254        message = (
255            get_chunks_success_message(bounds_success_tuples, header=message_header)
256            + f"\nRetried {len(chunk_bounds_to_resync)} chunks."
257        )
258        if deduplicate:
259            deduplicate_success, deduplicate_msg = self.deduplicate(
260                begin = begin,
261                end = end,
262                params = params,
263                workers = workers,
264                debug = debug,
265                **kwargs
266            )
267            return deduplicate_success, message + '\n\n' + deduplicate_msg
268        return True, message
269
270    message = get_chunks_success_message(bounds_success_tuples, header=message_header)
271    if deduplicate:
272        deduplicate_success, deduplicate_msg = self.deduplicate(
273            begin = begin,
274            end = end,
275            params = params,
276            workers = workers,
277            debug = debug,
278            **kwargs
279        )
280        return deduplicate_success, message + '\n\n' + deduplicate_msg
281    return False, message

Verify the contents of the pipe by resyncing its interval.

Parameters
  • begin (Union[datetime, int, None], default None): If specified, only verify rows greater than or equal to this value.
  • end (Union[datetime, int, None], default None): If specified, only verify rows less than this value.
  • chunk_interval (Union[timedelta, int, None], default None): If provided, use this as the size of the chunk boundaries. Defaults to the value set in pipe.parameters['chunk_minutes'] (1440).
  • bounded (Optional[bool], default None): If True, do not verify older than the oldest sync time or newer than the newest. If False, verify unbounded syncs outside of the new and old sync times. The default behavior (None) is to bound only if a bound interval is set (e.g. pipe.parameters['verify']['bound_days']).
  • deduplicate (bool, default False): If True, deduplicate the pipe's table after the verification syncs.
  • workers (Optional[int], default None): If provided, limit the verification to this many threads. Use a value of 1 to sync chunks in series.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All keyword arguments are passed to pipe.sync().
Returns
  • A SuccessTuple indicating whether the pipe was successfully resynced.
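Example
A sketch of resyncing a bounded interval in parallel chunks (hypothetical keys and dates):

from datetime import datetime
import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')
success, msg = pipe.verify(
    begin=datetime(2024, 1, 1),
    end=datetime(2024, 2, 1),
    workers=4,
    deduplicate=True,  # also deduplicate the table after the verification syncs
)
print(msg)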
def get_bound_interval(self, debug: bool = False) -> Union[datetime.timedelta, int, NoneType]:
353def get_bound_interval(self, debug: bool = False) -> Union[timedelta, int, None]:
354    """
355    Return the interval used to determine the bound time (limit for verification syncs).
356    If the datetime axis is an integer, just return its value.
357
358    Below are the supported keys for the bound interval:
359
360        - `pipe.parameters['verify']['bound_minutes']`
361        - `pipe.parameters['verify']['bound_hours']`
362        - `pipe.parameters['verify']['bound_days']`
363        - `pipe.parameters['verify']['bound_weeks']`
364        - `pipe.parameters['verify']['bound_years']`
365        - `pipe.parameters['verify']['bound_seconds']`
366
367    If multiple keys are present, the first on this priority list will be used.
368
369    Returns
370    -------
371    A `timedelta` or `int` value to be used to determine the bound time.
372    """
373    verify_params = self.parameters.get('verify', {})
374    prefix = 'bound_'
375    suffixes_to_check = ('minutes', 'hours', 'days', 'weeks', 'years', 'seconds')
376    keys_to_search = {
377        key: val
378        for key, val in verify_params.items()
379        if key.startswith(prefix)
380    }
381    bound_time_key, bound_time_value = None, None
382    for key, value in keys_to_search.items():
383        for suffix in suffixes_to_check:
384            if key == prefix + suffix:
385                bound_time_key = key
386                bound_time_value = value
387                break
388        if bound_time_key is not None:
389            break
390
391    if bound_time_value is None:
392        return bound_time_value
393
394    dt_col = self.columns.get('datetime', None)
395    if not dt_col:
396        return bound_time_value
397
398    dt_typ = self.dtypes.get(dt_col, 'datetime64[ns]')
399    if 'int' in dt_typ.lower():
400        return int(bound_time_value)
401
402    interval_type = bound_time_key.replace(prefix, '')
403    return timedelta(**{interval_type: bound_time_value})

Return the interval used to determine the bound time (limit for verification syncs). If the datetime axis is an integer, just return its value.

Below are the supported keys for the bound interval:

- `pipe.parameters['verify']['bound_minutes']`
- `pipe.parameters['verify']['bound_hours']`
- `pipe.parameters['verify']['bound_days']`
- `pipe.parameters['verify']['bound_weeks']`
- `pipe.parameters['verify']['bound_years']`
- `pipe.parameters['verify']['bound_seconds']`

If multiple keys are present, the first on this priority list will be used.

Returns
  • A timedelta or int value to be used to determine the bound time.
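Example
A sketch of configuring a bound interval and reading it back (hypothetical keys and values):

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'plugin:noaa', 'weather',
    instance='sql:main',
    columns={'datetime': 'ts'},
)
pipe.parameters['verify'] = {'bound_days': 366}
print(pipe.get_bound_interval())
# datetime.timedelta(days=366)  (an int is returned instead for an integer datetime axis)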
def get_bound_time(self, debug: bool = False) -> Union[datetime.datetime, int, NoneType]:
406def get_bound_time(self, debug: bool = False) -> Union[datetime, int, None]:
407    """
408    The bound time is the limit at which long-running verification syncs should stop.
409    A value of `None` means verification syncs should be unbounded.
410
411    Like deriving a backtrack time from `pipe.get_sync_time()`,
412    the bound time is the sync time minus a large window (e.g. 366 days).
413
414    Verification syncs are unbounded (i.e. `bound_time is None`)
415    if the oldest sync time falls within the bound interval of the newest sync time.
416
417    Returns
418    -------
419    A `datetime` or `int` corresponding to the
420    `begin` bound for verification and deduplication syncs.
421    """ 
422    bound_interval = self.get_bound_interval(debug=debug)
423    if bound_interval is None:
424        return None
425
426    sync_time = self.get_sync_time(debug=debug)
427    if sync_time is None:
428        return None
429
430    bound_time = sync_time - bound_interval
431    oldest_sync_time = self.get_sync_time(newest=False, debug=debug)
432
433    return (
434        bound_time
435        if bound_time > oldest_sync_time
436        else None
437    )

The bound time is the limit at which long-running verification syncs should stop. A value of None means verification syncs should be unbounded.

Like deriving a backtrack time from pipe.get_sync_time(), the bound time is the sync time minus a large window (e.g. 366 days).

Verification syncs are unbounded (i.e. bound_time is None) if the computed bound time would not come after the oldest sync time.

Returns
  • A datetime or int corresponding to the begin bound for verification and deduplication syncs.
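
Continuing the sketch above (illustrative only; actual values depend on the synced data):

bound_interval = pipe.get_bound_interval()    # e.g. timedelta(days=366)
newest = pipe.get_sync_time()                 # newest value on the datetime axis
oldest = pipe.get_sync_time(newest=False)     # oldest value on the datetime axis

bound_time = pipe.get_bound_time()
print(bound_time)
# Roughly (newest - bound_interval), or None if that value
# would not come after the oldest sync time.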
def delete(self, drop: bool = True, debug: bool = False, **kw) -> Tuple[bool, str]:
12def delete(
13        self,
14        drop: bool = True,
15        debug: bool = False,
16        **kw
17    ) -> SuccessTuple:
18    """
19    Call the Pipe's instance connector's `delete_pipe()` method.
20
21    Parameters
22    ----------
23    drop: bool, default True
24        If `True`, drop the pipe's target table.
25
26    debug : bool, default False
27        Verbosity toggle.
28
29    Returns
30    -------
31    A `SuccessTuple` of success (`bool`), message (`str`).
32
33    """
34    import os, pathlib
35    from meerschaum.utils.warnings import warn
36    from meerschaum.utils.venv import Venv
37    from meerschaum.connectors import get_connector_plugin
38
39    if self.temporary:
40        return (
41            False,
42            "Cannot delete pipes created with `temporary=True` (read-only). "
43            + "You may want to call `pipe.drop()` instead."
44        )
45
46    if self.cache_pipe is not None:
47        _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw)
48        if not _drop_cache_tuple[0]:
49            warn(_drop_cache_tuple[1])
50        if getattr(self.cache_connector, 'flavor', None) == 'sqlite':
51            _cache_db_path = pathlib.Path(self.cache_connector.database)
52            try:
53                os.remove(_cache_db_path)
54            except Exception as e:
55                warn(f"Could not delete cache file '{_cache_db_path}' for {self}:\n{e}")
56
57    if drop:
58        drop_success, drop_msg = self.drop(debug=debug)
59        if not drop_success:
60            warn(f"Failed to drop {self}:\n{drop_msg}")
61
62    with Venv(get_connector_plugin(self.instance_connector)):
63        result = self.instance_connector.delete_pipe(self, debug=debug, **kw)
64
65    if not isinstance(result, tuple):
66        return False, f"Received an unexpected result from '{self.instance_connector}': {result}"
67
68    if result[0]:
69        to_delete = ['_id']
70        for member in to_delete:
71            if member in self.__dict__:
72                del self.__dict__[member]
73    return result

Call the Pipe's instance connector's delete_pipe() method.

Parameters
  • drop (bool, default True): If True, drop the pipe's target table.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success (bool), message (str).
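
A minimal usage sketch (the pipe and the 'sql:local' instance are hypothetical):

import meerschaum as mrsm

pipe = mrsm.Pipe('demo', 'to_remove', instance='sql:local')
### Drop the target table (the default) and remove the pipe's registration.
success, msg = pipe.delete(drop=True)
print(success, msg)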
def drop(self, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
13def drop(
14        self,
15        debug: bool = False,
16        **kw: Any
17    ) -> SuccessTuple:
18    """
19    Call the Pipe's instance connector's `drop_pipe()` method.
20
21    Parameters
22    ----------
23    debug: bool, default False:
24        Verbosity toggle.
25
26    Returns
27    -------
28    A `SuccessTuple` of success, message.
29
30    """
31    self._exists = False
32    from meerschaum.utils.warnings import warn
33    from meerschaum.utils.venv import Venv
34    from meerschaum.connectors import get_connector_plugin
35
36    if self.cache_pipe is not None:
37        _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw)
38        if not _drop_cache_tuple[0]:
39            warn(_drop_cache_tuple[1])
40
41    with Venv(get_connector_plugin(self.instance_connector)):
42        result = self.instance_connector.drop_pipe(self, debug=debug, **kw)
43    return result

Call the Pipe's instance connector's drop_pipe() method.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success, message.
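
Unlike delete(), drop() removes only the target table and keeps the pipe registered. A minimal sketch (hypothetical pipe):

import meerschaum as mrsm

pipe = mrsm.Pipe('demo', 'to_drop', instance='sql:local')
### Drop the target table but keep the registration.
success, msg = pipe.drop()
print(pipe.exists())
# False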
def clear( self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
13def clear(
14        self,
15        begin: Optional[datetime.datetime] = None,
16        end: Optional[datetime.datetime] = None,
17        params: Optional[Dict[str, Any]] = None,
18        debug: bool = False,
19        **kwargs: Any
20    ) -> SuccessTuple:
21    """
22    Call the Pipe's instance connector's `clear_pipe` method.
23
24    Parameters
25    ----------
26    begin: Optional[datetime.datetime], default None:
27        If provided, only remove rows newer than this datetime value.
28
29    end: Optional[datetime.datetime], default None:
30        If provided, only remove rows older than this datetime value (not including end).
31
32    params: Optional[Dict[str, Any]], default None
33         See `meerschaum.utils.sql.build_where`.
34
35    debug: bool, default False:
36        Verbosity toggle.
37
38    Returns
39    -------
40    A `SuccessTuple` corresponding to whether this procedure completed successfully.
41
42    Examples
43    --------
44    >>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
45    >>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
46    >>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
47    >>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
48    >>> 
49    >>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
50    >>> pipe.get_data()
51              dt
52    0 2020-01-01
53
54    """
55    from meerschaum.utils.warnings import warn
56    from meerschaum.utils.venv import Venv
57    from meerschaum.connectors import get_connector_plugin
58
59    if self.cache_pipe is not None:
60        success, msg = self.cache_pipe.clear(
61            begin = begin,
62            end = end,
63            params = params,
64            debug = debug,
65            **kwargs
66        )
67        if not success:
68            warn(msg)
69
70    with Venv(get_connector_plugin(self.instance_connector)):
71        return self.instance_connector.clear_pipe(
72            self,
73            begin = begin,
74            end = end,
75            params = params,
76            debug = debug,
77            **kwargs
78        )

Call the Pipe's instance connector's clear_pipe method.

Parameters
  • begin (Optional[datetime.datetime], default None): If provided, only remove rows newer than this datetime value.
  • end (Optional[datetime.datetime], default None): If provided, only remove rows older than this datetime value (not including end).
  • params (Optional[Dict[str, Any]], default None): See meerschaum.utils.sql.build_where.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple corresponding to whether this procedure completed successfully.
Examples
>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
>>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
>>> 
>>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
>>> pipe.get_data()
          dt
0 2020-01-01
def deduplicate( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, chunk_interval: Union[datetime.datetime, int, NoneType] = None, bounded: Optional[bool] = None, workers: Optional[int] = None, debug: bool = False, _use_instance_method: bool = True, **kwargs: Any) -> Tuple[bool, str]:
 15def deduplicate(
 16        self,
 17        begin: Union[datetime, int, None] = None,
 18        end: Union[datetime, int, None] = None,
 19        params: Optional[Dict[str, Any]] = None,
 20        chunk_interval: Union[datetime, int, None] = None,
 21        bounded: Optional[bool] = None,
 22        workers: Optional[int] = None,
 23        debug: bool = False,
 24        _use_instance_method: bool = True,
 25        **kwargs: Any
 26    ) -> SuccessTuple:
 27    """
 28    Call the Pipe's instance connector's `delete_duplicates` method to delete duplicate rows.
 29
 30    Parameters
 31    ----------
 32    begin: Union[datetime, int, None], default None:
 33        If provided, only deduplicate rows newer than this datetime value.
 34
 35    end: Union[datetime, int, None], default None:
 36        If provided, only deduplicate rows older than this datetime value (not including end).
 37
 38    params: Optional[Dict[str, Any]], default None
 39        Restrict deduplication to this filter (for multiplexed data streams).
 40        See `meerschaum.utils.sql.build_where`.
 41
 42    chunk_interval: Union[timedelta, int, None], default None
 43        If provided, use this for the chunk bounds.
 44        Defaults to the value set in `pipe.parameters['chunk_minutes']` (1440).
 45
 46    bounded: Optional[bool], default None
 47        Only check outside the oldest and newest sync times if bounded is explicitly `False`.
 48
 49    workers: Optional[int], default None
 50        If the instance connector is thread-safe, limit concurrent syncs to this many threads.
 51
 52    debug: bool, default False:
 53        Verbosity toggle.
 54
 55    kwargs: Any
 56        All other keyword arguments are passed to
 57        `pipe.sync()`, `pipe.clear()`, and `pipe.get_data()`.
 58
 59    Returns
 60    -------
 61    A `SuccessTuple` corresponding to whether all of the chunks were successfully deduplicated.
 62    """
 63    from meerschaum.utils.warnings import warn, info
 64    from meerschaum.utils.misc import interval_str, items_str
 65    from meerschaum.utils.venv import Venv
 66    from meerschaum.connectors import get_connector_plugin
 67    from meerschaum.utils.pool import get_pool
 68
 69    if self.cache_pipe is not None:
 70        success, msg = self.cache_pipe.deduplicate(
 71            begin = begin,
 72            end = end,
 73            params = params,
 74            bounded = bounded,
 75            debug = debug,
 76            _use_instance_method = _use_instance_method,
 77            **kwargs
 78        )
 79        if not success:
 80            warn(msg)
 81
 82    workers = self.get_num_workers(workers=workers)
 83    pool = get_pool(workers=workers)
 84
 85    if _use_instance_method:
 86        with Venv(get_connector_plugin(self.instance_connector)):
 87            if hasattr(self.instance_connector, 'deduplicate_pipe'):
 88                return self.instance_connector.deduplicate_pipe(
 89                    self,
 90                    begin = begin,
 91                    end = end,
 92                    params = params,
 93                    bounded = bounded,
 94                    debug = debug,
 95                    **kwargs
 96                )
 97
 98    ### Only unbound if explicitly False.
 99    if bounded is None:
100        bounded = True
101    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
102
103    bound_time = self.get_bound_time(debug=debug)
104    if bounded and begin is None:
105        begin = (
106            bound_time
107            if bound_time is not None
108            else self.get_sync_time(newest=False, debug=debug)
109        )
110    if bounded and end is None:
111        end = self.get_sync_time(newest=True, debug=debug)
112
113    if bounded and end is not None:
114        end += (
115            timedelta(minutes=1)
116            if isinstance(end, datetime)
117            else 1
118        )
119
120    chunk_bounds = self.get_chunk_bounds(
121        bounded = bounded,
122        begin = begin,
123        end = end,
124        chunk_interval = chunk_interval,
125        debug = debug,
126    )
127
128    indices = [col for col in self.columns.values() if col]
129    if not indices:
130        return False, f"Cannot deduplicate without index columns."
131    dt_col = self.columns.get('datetime', None)
132
133    def process_chunk_bounds(bounds) -> Tuple[
134            Tuple[
135                Union[datetime, int, None],
136                Union[datetime, int, None]
137            ],
138            SuccessTuple
139        ]:
140        ### Only selecting the index values here to keep bandwidth down.
141        chunk_begin, chunk_end = bounds
142        chunk_df = self.get_data(
143            select_columns = indices, 
144            begin = chunk_begin,
145            end = chunk_end,
146            params = params,
147            debug = debug,
148        )
149        if chunk_df is None:
150            return bounds, (True, "")
151        existing_chunk_len = len(chunk_df)
152        deduped_chunk_df = chunk_df.drop_duplicates(keep='last')
153        deduped_chunk_len = len(deduped_chunk_df)
154
155        if existing_chunk_len == deduped_chunk_len:
156            return bounds, (True, "")
157
158        chunk_msg_header = f"\n{chunk_begin} - {chunk_end}"
159        chunk_msg_body = ""
160
161        full_chunk = self.get_data(
162            begin = chunk_begin,
163            end = chunk_end,
164            params = params,
165            debug = debug,
166        )
167        if full_chunk is None or len(full_chunk) == 0:
168            return bounds, (True, f"{chunk_msg_header}\nChunk is empty, skipping...")
169
170        chunk_indices = [ix for ix in indices if ix in full_chunk.columns]
171        if not chunk_indices:
172            return bounds, (False, f"None of {items_str(indices)} were present in chunk.")
173        try:
174            full_chunk = full_chunk.drop_duplicates(
175                subset = chunk_indices,
176                keep = 'last'
177            ).reset_index(
178                drop = True,
179            )
180        except Exception as e:
181            return (
182                bounds,
183                (False, f"Failed to deduplicate chunk on {items_str(chunk_indices)}:\n({e})")
184            )
185
186        clear_success, clear_msg = self.clear(
187            begin = chunk_begin,
188            end = chunk_end,
189            params = params,
190            debug = debug,
191        )
192        if not clear_success:
193            chunk_msg_body += f"Failed to clear chunk while deduplicating:\n{clear_msg}\n"
194            warn(chunk_msg_body)
195
196        sync_success, sync_msg = self.sync(full_chunk, debug=debug)
197        if not sync_success:
198            chunk_msg_body += f"Failed to sync chunk while deduplicating:\n{sync_msg}\n"
199         
200        ### Finally check if the deduplication worked.
201        chunk_rowcount = self.get_rowcount(
202            begin = chunk_begin,
203            end = chunk_end,
204            params = params,
205            debug = debug,
206        )
207        if chunk_rowcount != deduped_chunk_len:
208            return bounds, (
209                False, (
210                    chunk_msg_header + "\n"
211                    + chunk_msg_body + ("\n" if chunk_msg_body else '')
212                    + "Chunk rowcounts still differ ("
213                    + f"{chunk_rowcount} rowcount vs {deduped_chunk_len} chunk length)."
214                )
215            )
216
217        return bounds, (
218            True, (
219                chunk_msg_header + "\n"
220                + chunk_msg_body + ("\n" if chunk_msg_body else '')
221                + f"Deduplicated chunk from {existing_chunk_len} to {chunk_rowcount} rows."
222            )
223        )
224
225    info(
226        f"Deduplicating {len(chunk_bounds)} chunk"
227        + ('s' if len(chunk_bounds) != 1 else '')
228        + f" ({'un' if not bounded else ''}bounded)"
229        + f" of size '{interval_str(chunk_interval)}'"
230        + f" on {self}."
231    )
232    bounds_success_tuples = dict(pool.map(process_chunk_bounds, chunk_bounds))
233    bounds_successes = {
234        bounds: success_tuple
235        for bounds, success_tuple in bounds_success_tuples.items()
236        if success_tuple[0]
237    }
238    bounds_failures = {
239        bounds: success_tuple
240        for bounds, success_tuple in bounds_success_tuples.items()
241        if not success_tuple[0]
242    }
243
244    ### No need to retry if everything failed.
245    if len(bounds_failures) > 0 and len(bounds_successes) == 0:
246        return (
247            False,
248            (
249                f"Failed to deduplicate {len(bounds_failures)} chunk"
250                + ('s' if len(bounds_failures) != 1 else '')
251                + ".\n"
252                + "\n".join([msg for _, (_, msg) in bounds_failures.items() if msg])
253            )
254        )
255
256    retry_bounds = [bounds for bounds in bounds_failures]
257    if not retry_bounds:
258        return (
259            True,
260            (
261                f"Successfully deduplicated {len(bounds_successes)} chunk"
262                + ('s' if len(bounds_successes) != 1 else '')
263                + ".\n"
264                + "\n".join([msg for _, (_, msg) in bounds_successes.items() if msg])
265            ).rstrip('\n')
266        )
267
268    info(f"Retrying {len(retry_bounds)} chunks for {self}...")
269    retry_bounds_success_tuples = dict(pool.map(process_chunk_bounds, retry_bounds))
270    retry_bounds_successes = {
271        bounds: success_tuple
272        for bounds, success_tuple in retry_bounds_success_tuples.items()
273        if success_tuple[0]
274    }
275    retry_bounds_failures = {
276        bounds: success_tuple
277        for bounds, success_tuple in retry_bounds_success_tuples.items()
278        if not success_tuple[0]
279    }
280
281    bounds_successes.update(retry_bounds_successes)
282    if not retry_bounds_failures:
283        return (
284            True,
285            (
286                f"Successfully deduplicated {len(bounds_successes)} chunk"
287                + ('s' if len(bounds_successes) != 1 else '')
288                + f" ({len(retry_bounds_successes)} retried):\n"
289                + "\n".join([msg for _, (_, msg) in bounds_successes.items() if msg])
290            ).rstrip('\n')
291        )
292
293    return (
294        False,
295        (
296            f"Failed to deduplicate {len(retry_bounds_failures)} chunk"
297            + ('s' if len(retry_bounds_failures) != 1 else '')
298            + ".\n"
299            + "\n".join([msg for _, (_, msg) in retry_bounds_failures.items() if msg])
300        ).rstrip('\n')
301    )

Call the Pipe's instance connector's delete_duplicates method to delete duplicate rows.

Parameters
  • begin (Union[datetime, int, None], default None): If provided, only deduplicate rows newer than this datetime value.
  • end (Union[datetime, int, None], default None): If provided, only deduplicate rows older than this datetime value (not including end).
  • params (Optional[Dict[str, Any]], default None): Restrict deduplication to this filter (for multiplexed data streams). See meerschaum.utils.sql.build_where.
  • chunk_interval (Union[timedelta, int, None], default None): If provided, use this for the chunk bounds. Defaults to the value set in pipe.parameters['chunk_minutes'] (1440).
  • bounded (Optional[bool], default None): Only check outside the oldest and newest sync times if bounded is explicitly False.
  • workers (Optional[int], default None): If the instance connector is thread-safe, limit concurrent syncs to this many threads.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to pipe.sync(), pipe.clear(), and pipe.get_data().
Returns
  • A SuccessTuple corresponding to whether all of the chunks were successfully deduplicated.
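
A minimal sketch (hypothetical pipe; deduplication requires at least one index column):

import meerschaum as mrsm

pipe = mrsm.Pipe(
    'demo', 'dedup',
    instance='sql:local',
    columns={'datetime': 'dt', 'id': 'id'},
)
### Pass bounded=False to also check outside the oldest and newest sync times.
success, msg = pipe.deduplicate(bounded=False)
print(success, msg)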
def bootstrap( self, debug: bool = False, yes: bool = False, force: bool = False, noask: bool = False, shell: bool = False, **kw) -> Tuple[bool, str]:
 13def bootstrap(
 14        self,
 15        debug: bool = False,
 16        yes: bool = False,
 17        force: bool = False,
 18        noask: bool = False,
 19        shell: bool = False,
 20        **kw
 21    ) -> SuccessTuple:
 22    """
 23    Prompt the user to create a pipe's requirements all from one method.
 24    This method shouldn't be used in any automated scripts because it interactively
 25    prompts the user and therefore may hang.
 26
 27    Parameters
 28    ----------
 29    debug: bool, default False:
 30        Verbosity toggle.
 31
 32    yes: bool, default False:
 33        Print the questions and automatically agree.
 34
 35    force: bool, default False:
 36        Skip the questions and agree anyway.
 37
 38    noask: bool, default False:
 39        Print the questions but go with the default answer.
 40
 41    shell: bool, default False:
 42        Used to determine if we are in the interactive shell.
 43        
 44    Returns
 45    -------
 46    A `SuccessTuple` corresponding to the success of this procedure.
 47
 48    """
 49
 50    from meerschaum.utils.warnings import warn, info, error
 51    from meerschaum.utils.prompt import prompt, yes_no
 52    from meerschaum.utils.formatting import pprint
 53    from meerschaum.config import get_config
 54    from meerschaum.utils.formatting._shell import clear_screen
 55    from meerschaum.utils.formatting import print_tuple
 56    from meerschaum.actions import actions
 57    from meerschaum.utils.venv import Venv
 58    from meerschaum.connectors import get_connector_plugin
 59
 60    _clear = get_config('shell', 'clear_screen', patch=True)
 61
 62    if self.get_id(debug=debug) is not None:
 63        delete_tuple = self.delete(debug=debug)
 64        if not delete_tuple[0]:
 65            return delete_tuple
 66
 67    if _clear:
 68        clear_screen(debug=debug)
 69
 70    _parameters = _get_parameters(self, debug=debug)
 71    self.parameters = _parameters
 72    pprint(self.parameters)
 73    try:
 74        prompt(
 75            f"\n    Press [Enter] to register {self} with the above configuration:",
 76            icon = False
 77        )
 78    except KeyboardInterrupt as e:
 79        return False, f"Aborting bootstrapping {self}."
 80
 81    with Venv(get_connector_plugin(self.instance_connector)):
 82        register_tuple = self.instance_connector.register_pipe(self, debug=debug)
 83
 84    if not register_tuple[0]:
 85        return register_tuple
 86
 87    if _clear:
 88        clear_screen(debug=debug)
 89
 90    try:
 91        if yes_no(
 92            f"Would you like to edit the definition for {self}?", yes=yes, noask=noask
 93        ):
 94            edit_tuple = self.edit_definition(debug=debug)
 95            if not edit_tuple[0]:
 96                return edit_tuple
 97
 98        if yes_no(f"Would you like to try syncing {self} now?", yes=yes, noask=noask):
 99            sync_tuple = actions['sync'](
100                ['pipes'],
101                connector_keys = [self.connector_keys],
102                metric_keys = [self.metric_key],
103                location_keys = [self.location_key],
104                mrsm_instance = str(self.instance_connector),
105                debug = debug,
106                shell = shell,
107            )
108            if not sync_tuple[0]:
109                return sync_tuple
110    except Exception as e:
111        return False, f"Failed to bootstrap {self}:\n" + str(e)
112
113    print_tuple((True, f"Finished bootstrapping {self}!"))
114    info(
115        f"You can edit this pipe later with `edit pipes` "
116        + "or set the definition with `edit pipes definition`.\n"
117        + "    To sync data into your pipe, run `sync pipes`."
118    )
119
120    return True, "Success"

Prompt the user to create a pipe's requirements all from one method. This method shouldn't be used in any automated scripts because it interactively prompts the user and therefore may hang.

Parameters
  • debug (bool, default False): Verbosity toggle.
  • yes (bool, default False): Print the questions and automatically agree.
  • force (bool, default False): Skip the questions and agree anyway.
  • noask (bool, default False): Print the questions but go with the default answer.
  • shell (bool, default False): Used to determine if we are in the interactive shell.
Returns
  • A SuccessTuple corresponding to the success of this procedure.
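
Because it prompts interactively, bootstrap() is best run from a REPL or the Meerschaum shell rather than from a script. A minimal sketch (hypothetical pipe):

import meerschaum as mrsm

pipe = mrsm.Pipe('sql:remote', 'sales', instance='sql:local')
### yes=True automatically agrees to the yes/no questions;
### the registration prompt still waits for [Enter].
success, msg = pipe.bootstrap(yes=True)
print(success, msg)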
def enforce_dtypes( self, df: pandas.core.frame.DataFrame, chunksize: Optional[int] = -1, safe_copy: bool = True, debug: bool = False) -> pandas.core.frame.DataFrame:
19def enforce_dtypes(
20        self,
21        df: 'pd.DataFrame',
22        chunksize: Optional[int] = -1,
23        safe_copy: bool = True,
24        debug: bool = False,
25    ) -> 'pd.DataFrame':
26    """
27    Cast the input dataframe to the pipe's registered data types.
28    If the pipe does not exist and dtypes are not set, return the dataframe.
29    """
30    import traceback
31    from meerschaum.utils.warnings import warn
32    from meerschaum.utils.debug import dprint
33    from meerschaum.utils.dataframe import parse_df_datetimes, enforce_dtypes as _enforce_dtypes
34    from meerschaum.utils.packages import import_pandas
35    pd = import_pandas(debug=debug)
36    if df is None:
37        if debug:
38            dprint(
39                f"Received None instead of a DataFrame.\n"
40                + "    Skipping dtype enforcement..."
41            )
42        return df
43
44    pipe_dtypes = self.dtypes
45
46    try:
47        if isinstance(df, str):
48            df = parse_df_datetimes(
49                pd.read_json(StringIO(df)),
50                ignore_cols = [
51                    col
52                    for col, dtype in pipe_dtypes.items()
53                    if 'datetime' not in str(dtype)
54                ],
55                chunksize = chunksize,
56                debug = debug,
57            )
58        else:
59            df = parse_df_datetimes(
60                df,
61                ignore_cols = [
62                    col
63                    for col, dtype in pipe_dtypes.items()
64                    if 'datetime' not in str(dtype)
65                ],
66                chunksize = chunksize,
67                debug = debug,
68            )
69    except Exception as e:
70        warn(f"Unable to cast incoming data as a DataFrame...:\n{e}\n\n{traceback.format_exc()}")
71        return None
72
73    if not pipe_dtypes:
74        if debug:
75            dprint(
76                f"Could not find dtypes for {self}.\n"
77                + "    Skipping dtype enforcement..."
78            )
79        return df
80
81    return _enforce_dtypes(df, pipe_dtypes, safe_copy=safe_copy, debug=debug)

Cast the input dataframe to the pipe's registered data types. If the pipe does not exist and dtypes are not set, return the dataframe.
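
A rough sketch of casting an incoming dataframe (hypothetical pipe; dtypes are assumed to live under pipe.parameters['dtypes']):

import pandas as pd
import meerschaum as mrsm

pipe = mrsm.Pipe(
    'demo', 'dtypes',
    instance='sql:local',
    columns={'datetime': 'dt'},
    parameters={'dtypes': {'dt': 'datetime64[ns]', 'val': 'float64'}},
)
df = pd.DataFrame({'dt': ['2024-01-01'], 'val': ['1.5']})
typed_df = pipe.enforce_dtypes(df)
print(typed_df.dtypes)
# 'dt' should be cast to a datetime dtype and 'val' to float64.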

def infer_dtypes(self, persist: bool = False, debug: bool = False) -> Dict[str, Any]:
 84def infer_dtypes(self, persist: bool=False, debug: bool=False) -> Dict[str, Any]:
 85    """
 86    If `dtypes` is not set in `meerschaum.Pipe.parameters`,
 87    infer the data types from the underlying table if it exists.
 88
 89    Parameters
 90    ----------
 91    persist: bool, default False
 92        If `True`, persist the inferred data types to `meerschaum.Pipe.parameters`.
 93
 94    Returns
 95    -------
 96    A dictionary of strings containing the pandas data types for this Pipe.
 97    """
 98    if not self.exists(debug=debug):
 99        dtypes = {}
100        if not self.columns:
101            return {}
102        dt_col = self.columns.get('datetime', None)
103        if dt_col:
104            if not self.parameters.get('dtypes', {}).get(dt_col, None):
105                dtypes[dt_col] = 'datetime64[ns]'
106        return dtypes
107
108    from meerschaum.utils.sql import get_pd_type
109    from meerschaum.utils.misc import to_pandas_dtype
110    columns_types = self.get_columns_types(debug=debug)
111
112    ### NOTE: get_columns_types() may return either the types as
113    ###       PostgreSQL- or Pandas-style.
114    dtypes = {
115        c: (
116            get_pd_type(t, allow_custom_dtypes=True)
117            if str(t).isupper()
118            else to_pandas_dtype(t)
119        )
120        for c, t in columns_types.items()
121    } if columns_types else {}
122    if persist:
123        self.dtypes = dtypes
124        self.edit(interactive=False, debug=debug)
125    return dtypes

If dtypes is not set in Pipe.parameters, infer the data types from the underlying table if it exists.

Parameters
  • persist (bool, default False): If True, persist the inferred data types to Pipe.parameters.
Returns
  • A dictionary of strings containing the pandas data types for this Pipe.
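
A short sketch (hypothetical pipe; if the target table does not exist, only the datetime axis is assumed):

import meerschaum as mrsm

pipe = mrsm.Pipe('demo', 'dtypes', instance='sql:local', columns={'datetime': 'dt'})
print(pipe.infer_dtypes(persist=False))
# {'dt': 'datetime64[ns]'} when the table has not yet been created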
class Plugin:
 33class Plugin:
 34    """Handle packaging of Meerschaum plugins."""
 35    def __init__(
 36        self,
 37        name: str,
 38        version: Optional[str] = None,
 39        user_id: Optional[int] = None,
 40        required: Optional[List[str]] = None,
 41        attributes: Optional[Dict[str, Any]] = None,
 42        archive_path: Optional[pathlib.Path] = None,
 43        venv_path: Optional[pathlib.Path] = None,
 44        repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None,
 45        repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None,
 46    ):
 47        from meerschaum.config.static import STATIC_CONFIG
 48        sep = STATIC_CONFIG['plugins']['repo_separator']
 49        _repo = None
 50        if sep in name:
 51            try:
 52                name, _repo = name.split(sep)
 53            except Exception as e:
 54                error(f"Invalid plugin name: '{name}'")
 55        self._repo_in_name = _repo
 56
 57        if attributes is None:
 58            attributes = {}
 59        self.name = name
 60        self.attributes = attributes
 61        self.user_id = user_id
 62        self._version = version
 63        if required:
 64            self._required = required
 65        self.archive_path = (
 66            archive_path if archive_path is not None
 67            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
 68        )
 69        self.venv_path = (
 70            venv_path if venv_path is not None
 71            else VIRTENV_RESOURCES_PATH / self.name
 72        )
 73        self._repo_connector = repo_connector
 74        self._repo_keys = repo
 75
 76
 77    @property
 78    def repo_connector(self):
 79        """
 80        Return the repository connector for this plugin.
 81        NOTE: This imports the `connectors` module, which imports certain plugin modules.
 82        """
 83        if self._repo_connector is None:
 84            from meerschaum.connectors.parse import parse_repo_keys
 85
 86            repo_keys = self._repo_keys or self._repo_in_name
 87            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
 88                error(
 89                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
 90                )
 91            repo_connector = parse_repo_keys(repo_keys)
 92            self._repo_connector = repo_connector
 93        return self._repo_connector
 94
 95
 96    @property
 97    def version(self):
 98        """
 99        Return the plugin's module version (`__version__`) if it's defined.
100        """
101        if self._version is None:
102            try:
103                self._version = self.module.__version__
104            except Exception as e:
105                self._version = None
106        return self._version
107
108
109    @property
110    def module(self):
111        """
112        Return the Python module of the underlying plugin.
113        """
114        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
115            if self.__file__ is None:
116                return None
117            from meerschaum.plugins import import_plugins
118            self._module = import_plugins(str(self), warn=False)
119        return self._module
120
121
122    @property
123    def __file__(self) -> Union[str, None]:
124        """
125        Return the file path (str) of the plugin if it exists, otherwise `None`.
126        """
127        if self.__dict__.get('_module', None) is not None:
128            return self.module.__file__
129
130        potential_dir = PLUGINS_RESOURCES_PATH / self.name
131        if (
132            potential_dir.exists()
133            and potential_dir.is_dir()
134            and (potential_dir / '__init__.py').exists()
135        ):
136            return str((potential_dir / '__init__.py').as_posix())
137
138        potential_file = PLUGINS_RESOURCES_PATH / (self.name + '.py')
139        if potential_file.exists() and not potential_file.is_dir():
140            return str(potential_file.as_posix())
141
142        return None
143
144
145    @property
146    def requirements_file_path(self) -> Union[pathlib.Path, None]:
147        """
148        If a file named `requirements.txt` exists, return its path.
149        """
150        if self.__file__ is None:
151            return None
152        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
153        if not path.exists():
154            return None
155        return path
156
157
158    def is_installed(self, **kw) -> bool:
159        """
160        Check whether a plugin is correctly installed.
161
162        Returns
163        -------
164        A `bool` indicating whether a plugin exists and is successfully imported.
165        """
166        return self.__file__ is not None
167
168
169    def make_tar(self, debug: bool = False) -> pathlib.Path:
170        """
171        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
172
173        Parameters
174        ----------
175        debug: bool, default False
176            Verbosity toggle.
177
178        Returns
179        -------
180        A `pathlib.Path` to the archive file's path.
181
182        """
183        import tarfile, pathlib, subprocess, fnmatch
184        from meerschaum.utils.debug import dprint
185        from meerschaum.utils.packages import attempt_import
186        pathspec = attempt_import('pathspec', debug=debug)
187
188        if not self.__file__:
189            from meerschaum.utils.warnings import error
190            error(f"Could not find file for plugin '{self}'.")
191        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
192            path = self.__file__.replace('__init__.py', '')
193            is_dir = True
194        else:
195            path = self.__file__
196            is_dir = False
197
198        old_cwd = os.getcwd()
199        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
200        os.chdir(real_parent_path)
201
202        default_patterns_to_ignore = [
203            '.pyc',
204            '__pycache__/',
205            'eggs/',
206            '__pypackages__/',
207            '.git',
208        ]
209
210        def parse_gitignore() -> 'Set[str]':
211            gitignore_path = pathlib.Path(path) / '.gitignore'
212            if not gitignore_path.exists():
213                return set(default_patterns_to_ignore)
214            with open(gitignore_path, 'r', encoding='utf-8') as f:
215                gitignore_text = f.read()
216            return set(pathspec.PathSpec.from_lines(
217                pathspec.patterns.GitWildMatchPattern,
218                default_patterns_to_ignore + gitignore_text.splitlines()
219            ).match_tree(path))
220
221        patterns_to_ignore = parse_gitignore() if is_dir else set()
222
223        if debug:
224            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
225
226        with tarfile.open(self.archive_path, 'w:gz') as tarf:
227            if not is_dir:
228                tarf.add(f"{self.name}.py")
229            else:
230                for root, dirs, files in os.walk(self.name):
231                    for f in files:
232                        good_file = True
233                        fp = os.path.join(root, f)
234                        for pattern in patterns_to_ignore:
235                            if pattern in str(fp) or f.startswith('.'):
236                                good_file = False
237                                break
238                        if good_file:
239                            if debug:
240                                dprint(f"Adding '{fp}'...")
241                            tarf.add(fp)
242
243        ### clean up and change back to old directory
244        os.chdir(old_cwd)
245
246        ### change to 775 to avoid permissions issues with the API in a Docker container
247        self.archive_path.chmod(0o775)
248
249        if debug:
250            dprint(f"Created archive '{self.archive_path}'.")
251        return self.archive_path
252
253
254    def install(
255            self,
256            skip_deps: bool = False,
257            force: bool = False,
258            debug: bool = False,
259        ) -> SuccessTuple:
260        """
261        Extract a plugin's tar archive to the plugins directory.
262        
263        This function checks if the plugin is already installed and if the version is equal or
264        greater than the existing installation.
265
266        Parameters
267        ----------
268        skip_deps: bool, default False
269            If `True`, do not install dependencies.
270
271        force: bool, default False
272            If `True`, continue with installation, even if required packages fail to install.
273
274        debug: bool, default False
275            Verbosity toggle.
276
277        Returns
278        -------
279        A `SuccessTuple` of success (bool) and a message (str).
280
281        """
282        if self.full_name in _ongoing_installations:
283            return True, f"Already installing plugin '{self}'."
284        _ongoing_installations.add(self.full_name)
285        from meerschaum.utils.warnings import warn, error
286        if debug:
287            from meerschaum.utils.debug import dprint
288        import tarfile
289        import re
290        import ast
291        from meerschaum.plugins import sync_plugins_symlinks
292        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
293        from meerschaum.utils.venv import init_venv
294        from meerschaum.utils.misc import safely_extract_tar
295        old_cwd = os.getcwd()
296        old_version = ''
297        new_version = ''
298        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
299        temp_dir.mkdir(exist_ok=True)
300
301        if not self.archive_path.exists():
302            return False, f"Missing archive file for plugin '{self}'."
303        if self.version is not None:
304            old_version = self.version
305            if debug:
306                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
307
308        if debug:
309            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
310
311        try:
312            with tarfile.open(self.archive_path, 'r:gz') as tarf:
313                safely_extract_tar(tarf, temp_dir)
314        except Exception as e:
315            warn(e)
316            return False, f"Failed to extract plugin '{self.name}'."
317
318        ### search for version information
319        files = os.listdir(temp_dir)
320        
321        if str(files[0]) == self.name:
322            is_dir = True
323        elif str(files[0]) == self.name + '.py':
324            is_dir = False
325        else:
326            error(f"Unknown format encountered for plugin '{self}'.")
327
328        fpath = temp_dir / files[0]
329        if is_dir:
330            fpath = fpath / '__init__.py'
331
332        init_venv(self.name, debug=debug)
333        with open(fpath, 'r', encoding='utf-8') as f:
334            init_lines = f.readlines()
335        new_version = None
336        for line in init_lines:
337            if '__version__' not in line:
338                continue
339            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
340            if not version_match:
341                continue
342            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
343            break
344        if not new_version:
345            warn(
346                f"No `__version__` defined for plugin '{self}'. "
347                + "Assuming new version...",
348                stack = False,
349            )
350
351        packaging_version = attempt_import('packaging.version')
352        try:
353            is_new_version = (not new_version and not old_version) or (
354                packaging_version.parse(old_version) < packaging_version.parse(new_version)
355            )
356            is_same_version = new_version and old_version and (
357                packaging_version.parse(old_version) == packaging_version.parse(new_version)
358            )
359        except Exception as e:
360            is_new_version, is_same_version = True, False
361
362        ### Determine where to permanently store the new plugin.
363        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
364        for path in PLUGINS_DIR_PATHS:
365            files_in_plugins_dir = os.listdir(path)
366            if (
367                self.name in files_in_plugins_dir
368                or
369                (self.name + '.py') in files_in_plugins_dir
370            ):
371                plugin_installation_dir_path = path
372                break
373
374        success_msg = (
375            f"Successfully installed plugin '{self}'"
376            + ("\n    (skipped dependencies)" if skip_deps else "")
377            + "."
378        )
379        success, abort = None, None
380
381        if is_same_version and not force:
382            success, msg = True, (
383                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
384                "    Install again with `-f` or `--force` to reinstall."
385            )
386            abort = True
387        elif is_new_version or force:
388            for src_dir, dirs, files in os.walk(temp_dir):
389                if success is not None:
390                    break
391                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
392                if not os.path.exists(dst_dir):
393                    os.mkdir(dst_dir)
394                for f in files:
395                    src_file = os.path.join(src_dir, f)
396                    dst_file = os.path.join(dst_dir, f)
397                    if os.path.exists(dst_file):
398                        os.remove(dst_file)
399
400                    if debug:
401                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
402                    try:
403                        shutil.move(src_file, dst_dir)
404                    except Exception as e:
405                        success, msg = False, (
406                            f"Failed to install plugin '{self}': " +
407                            f"Could not move file '{src_file}' to '{dst_dir}'"
408                        )
409                        print(msg)
410                        break
411            if success is None:
412                success, msg = True, success_msg
413        else:
414            success, msg = False, (
415                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
416                + f"attempted version {new_version}."
417            )
418
419        shutil.rmtree(temp_dir)
420        os.chdir(old_cwd)
421
422        ### Reload the plugin's module.
423        sync_plugins_symlinks(debug=debug)
424        if '_module' in self.__dict__:
425            del self.__dict__['_module']
426        init_venv(venv=self.name, force=True, debug=debug)
427        reload_meerschaum(debug=debug)
428
429        ### if we've already failed, return here
430        if not success or abort:
431            _ongoing_installations.remove(self.full_name)
432            return success, msg
433
434        ### attempt to install dependencies
435        dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
436        if not dependencies_installed:
437            _ongoing_installations.remove(self.full_name)
438            return False, f"Failed to install dependencies for plugin '{self}'."
439
440        ### handling success tuple, bool, or other (typically None)
441        setup_tuple = self.setup(debug=debug)
442        if isinstance(setup_tuple, tuple):
443            if not setup_tuple[0]:
444                success, msg = setup_tuple
445        elif isinstance(setup_tuple, bool):
446            if not setup_tuple:
447                success, msg = False, (
448                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
449                    f"Check `setup()` in '{self.__file__}' for more information " +
450                    f"(no error message provided)."
451                )
452            else:
453                success, msg = True, success_msg
454        elif setup_tuple is None:
455            success = True
456            msg = (
457                f"Post-install for plugin '{self}' returned None. " +
458                f"Assuming plugin successfully installed."
459            )
460            warn(msg)
461        else:
462            success = False
463            msg = (
464                f"Post-install for plugin '{self}' returned unexpected value " +
465                f"of type '{type(setup_tuple)}': {setup_tuple}"
466            )
467
468        _ongoing_installations.remove(self.full_name)
469        module = self.module
470        return success, msg
471
472
473    def remove_archive(
474            self,        
475            debug: bool = False
476        ) -> SuccessTuple:
477        """Remove a plugin's archive file."""
478        if not self.archive_path.exists():
479            return True, f"Archive file for plugin '{self}' does not exist."
480        try:
481            self.archive_path.unlink()
482        except Exception as e:
483            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
484        return True, "Success"
485
486
487    def remove_venv(
488            self,        
489            debug: bool = False
490        ) -> SuccessTuple:
491        """Remove a plugin's virtual environment."""
492        if not self.venv_path.exists():
493            return True, f"Virtual environment for plugin '{self}' does not exist."
494        try:
495            shutil.rmtree(self.venv_path)
496        except Exception as e:
497            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
498        return True, "Success"
499
500
501    def uninstall(self, debug: bool = False) -> SuccessTuple:
502        """
503        Remove a plugin, its virtual environment, and archive file.
504        """
505        from meerschaum.utils.packages import reload_meerschaum
506        from meerschaum.plugins import sync_plugins_symlinks
507        from meerschaum.utils.warnings import warn, info
508        warnings_thrown_count: int = 0
509        max_warnings: int = 3
510
511        if not self.is_installed():
512            info(
513                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
514                + "Checking for artifacts...",
515                stack = False,
516            )
517        else:
518            real_path = pathlib.Path(os.path.realpath(self.__file__))
519            try:
520                if real_path.name == '__init__.py':
521                    shutil.rmtree(real_path.parent)
522                else:
523                    real_path.unlink()
524            except Exception as e:
525                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
526                warnings_thrown_count += 1
527            else:
528                info(f"Removed source files for plugin '{self.name}'.")
529
530        if self.venv_path.exists():
531            success, msg = self.remove_venv(debug=debug)
532            if not success:
533                warn(msg, stack=False)
534                warnings_thrown_count += 1
535            else:
536                info(f"Removed virtual environment from plugin '{self.name}'.")
537
538        success = warnings_thrown_count < max_warnings
539        sync_plugins_symlinks(debug=debug)
540        self.deactivate_venv(force=True, debug=debug)
541        reload_meerschaum(debug=debug)
542        return success, (
543            f"Successfully uninstalled plugin '{self}'." if success
544            else f"Failed to uninstall plugin '{self}'."
545        )
546
547
548    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
549        """
550        If exists, run the plugin's `setup()` function.
551
552        Parameters
553        ----------
554        *args: str
555            The positional arguments passed to the `setup()` function.
556            
557        debug: bool, default False
558            Verbosity toggle.
559
560        **kw: Any
561            The keyword arguments passed to the `setup()` function.
562
563        Returns
564        -------
565        A `SuccessTuple` or `bool` indicating success.
566
567        """
568        from meerschaum.utils.debug import dprint
569        import inspect
570        _setup = None
571        for name, fp in inspect.getmembers(self.module):
572            if name == 'setup' and inspect.isfunction(fp):
573                _setup = fp
574                break
575
576        ### assume success if no setup() is found (not necessary)
577        if _setup is None:
578            return True
579
580        sig = inspect.signature(_setup)
581        has_debug, has_kw = ('debug' in sig.parameters), False
582        for k, v in sig.parameters.items():
583            if '**' in str(v):
584                has_kw = True
585                break
586
587        _kw = {}
588        if has_kw:
589            _kw.update(kw)
590        if has_debug:
591            _kw['debug'] = debug
592
593        if debug:
594            dprint(f"Running setup for plugin '{self}'...")
595        try:
596            self.activate_venv(debug=debug)
597            return_tuple = _setup(*args, **_kw)
598            self.deactivate_venv(debug=debug)
599        except Exception as e:
600            return False, str(e)
601
602        if isinstance(return_tuple, tuple):
603            return return_tuple
604        if isinstance(return_tuple, bool):
605            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
606        if return_tuple is None:
607            return False, f"Setup for Plugin '{self.name}' returned None."
608        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"
609
610
611    def get_dependencies(
612            self,
613            debug: bool = False,
614        ) -> List[str]:
615        """
616        If the Plugin has specified dependencies in a list called `required`, return the list.
617        
618        **NOTE:** Dependencies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
619        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
620
621        Parameters
622        ----------
623        debug: bool, default False
624            Verbosity toggle.
625
626        Returns
627        -------
628        A list of required packages and plugins (str).
629
630        """
631        if '_required' in self.__dict__:
632            return self._required
633
634        ### If the plugin has not yet been imported,
635        ### infer the dependencies from the source text.
636        ### This is not super robust, and it doesn't feel right
637        ### having multiple versions of the logic.
638        ### This is necessary when determining the activation order
639        ### without having to import the module.
640        ### For consistency's sake, the module-less method does not cache the requirements.
641        if self.__dict__.get('_module', None) is None:
642            file_path = self.__file__
643            if file_path is None:
644                return []
645            with open(file_path, 'r', encoding='utf-8') as f:
646                text = f.read()
647
648            if 'required' not in text:
649                return []
650
651            ### This has some limitations:
652            ### It relies on `required` being manually declared.
653            ### We lose the ability to dynamically alter the `required` list,
654            ### which is why we've kept the module-reliant method below.
655            import ast, re
656            ### NOTE: This technically would break 
657            ### if `required` was the very first line of the file.
658            req_start_match = re.search(r'required(:\s*)?.*=', text)
659            if not req_start_match:
660                return []
661            req_start = req_start_match.start()
662            equals_sign = req_start + text[req_start:].find('=')
663
664            ### Dependencies may have brackets within the strings, so push back the index.
665            first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[')
666            if first_opening_brace == -1:
667                return []
668
669            next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']')
670            if next_closing_brace == -1:
671                return []
672
673            start_ix = first_opening_brace + 1
674            end_ix = next_closing_brace
675
676            num_braces = 0
677            while True:
678                if '[' not in text[start_ix:end_ix]:
679                    break
680                num_braces += 1
681                start_ix = end_ix
682                end_ix += text[end_ix + 1:].find(']') + 1
683
684            req_end = end_ix + 1
685            req_text = (
686                text[(first_opening_brace-1):req_end]
687                .lstrip()
688                .replace('=', '', 1)
689                .lstrip()
690                .rstrip()
691            )
692            try:
693                required = ast.literal_eval(req_text)
694            except Exception as e:
695                warn(
696                    f"Unable to determine requirements for plugin '{self.name}' "
697                    + "without importing the module.\n"
698                    + "    This may be due to dynamically setting the global `required` list.\n"
699                    + f"    {e}"
700                )
701                return []
702            return required
703
704        import inspect
705        self.activate_venv(dependencies=False, debug=debug)
706        required = []
707        for name, val in inspect.getmembers(self.module):
708            if name == 'required':
709                required = val
710                break
711        self._required = required
712        self.deactivate_venv(dependencies=False, debug=debug)
713        return required
714
715
716    def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
717        """
718        Return a list of required Plugin objects.
719        """
720        from meerschaum.utils.warnings import warn
721        from meerschaum.config import get_config
722        from meerschaum.config.static import STATIC_CONFIG
723        plugins = []
724        _deps = self.get_dependencies(debug=debug)
725        sep = STATIC_CONFIG['plugins']['repo_separator']
726        plugin_names = [
727            _d[len('plugin:'):] for _d in _deps
728            if _d.startswith('plugin:') and len(_d) > len('plugin:')
729        ]
730        default_repo_keys = get_config('meerschaum', 'default_repository')
731        for _plugin_name in plugin_names:
732            if sep in _plugin_name:
733                try:
734                    _plugin_name, _repo_keys = _plugin_name.split(sep)
735                except Exception as e:
736                    _repo_keys = default_repo_keys
737                    warn(
738                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
739                        + f"Will try to use '{_repo_keys}' instead.",
740                        stack = False,
741                    )
742            else:
743                _repo_keys = default_repo_keys
744            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
745        return plugins
746
747
748    def get_required_packages(self, debug: bool=False) -> List[str]:
749        """
750        Return the required package names (excluding plugins).
751        """
752        _deps = self.get_dependencies(debug=debug)
753        return [_d for _d in _deps if not _d.startswith('plugin:')]
754
755
756    def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
757        """
758        Activate the virtual environments for the plugin and its dependencies.
759
760        Parameters
761        ----------
762        dependencies: bool, default True
763            If `True`, activate the virtual environments for required plugins.
764
765        Returns
766        -------
767        A bool indicating success.
768        """
769        from meerschaum.utils.venv import venv_target_path
770        from meerschaum.utils.packages import activate_venv
771        from meerschaum.utils.misc import make_symlink, is_symlink
772        from meerschaum.config._paths import PACKAGE_ROOT_PATH
773
774        if dependencies:
775            for plugin in self.get_required_plugins(debug=debug):
776                plugin.activate_venv(debug=debug, **kw)
777
778        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
779        venv_meerschaum_path = vtp / 'meerschaum'
780
781        try:
782            success, msg = True, "Success"
783            if is_symlink(venv_meerschaum_path):
784                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
785                    venv_meerschaum_path.unlink()
786                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
787        except Exception as e:
788            success, msg = False, str(e)
789        if not success:
790            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")
791
792        return activate_venv(self.name, debug=debug, **kw)
793
794
795    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
796        """
797        Deactivate the virtual environments for the plugin and its dependencies.
798
799        Parameters
800        ----------
801        dependencies: bool, default True
802            If `True`, deactivate the virtual environments for required plugins.
803
804        Returns
805        -------
806        A bool indicating success.
807        """
808        from meerschaum.utils.packages import deactivate_venv
809        success = deactivate_venv(self.name, debug=debug, **kw)
810        if dependencies:
811            for plugin in self.get_required_plugins(debug=debug):
812                plugin.deactivate_venv(debug=debug, **kw)
813        return success
814
815
816    def install_dependencies(
817            self,
818            force: bool = False,
819            debug: bool = False,
820        ) -> bool:
821        """
822        If specified, install dependencies.
823        
824        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
825        Meerschaum plugins from the same repository as this Plugin.
826        To install from a different repository, add the repo keys after `'@'`
827        (e.g. `'plugin:foo@api:bar'`).
828
829        Parameters
830        ----------
831        force: bool, default False
832            If `True`, continue with the installation, even if some
833            required packages fail to install.
834
835        debug: bool, default False
836            Verbosity toggle.
837
838        Returns
839        -------
840        A bool indicating success.
841
842        """
843        from meerschaum.utils.packages import pip_install, venv_contains_package
844        from meerschaum.utils.debug import dprint
845        from meerschaum.utils.warnings import warn, info
846        from meerschaum.connectors.parse import parse_repo_keys
847        _deps = self.get_dependencies(debug=debug)
848        if not _deps and self.requirements_file_path is None:
849            return True
850
851        plugins = self.get_required_plugins(debug=debug)
852        for _plugin in plugins:
853            if _plugin.name == self.name:
854                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
855                continue
856            _success, _msg = _plugin.repo_connector.install_plugin(
857                _plugin.name, debug=debug, force=force
858            )
859            if not _success:
860                warn(
861                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
862                    + f" for plugin '{self.name}':\n" + _msg,
863                    stack = False,
864                )
865                if not force:
866                    warn(
867                        "Try installing with the `--force` flag to continue anyway.",
868                        stack = False,
869                    )
870                    return False
871                info(
872                    "Continuing with installation despite the failure "
873                    + "(careful, things might be broken!)...",
874                    icon = False
875                )
876
877
878        ### First step: parse `requirements.txt` if it exists.
879        if self.requirements_file_path is not None:
880            if not pip_install(
881                requirements_file_path=self.requirements_file_path,
882                venv=self.name, debug=debug
883            ):
884                warn(
885                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
886                    stack = False,
887                )
888                if not force:
889                    warn(
890                        "Try installing with `--force` to continue anyway.",
891                        stack = False,
892                    )
893                    return False
894                info(
895                    "Continuing with installation despite the failure "
896                    + "(careful, things might be broken!)...",
897                    icon = False
898                )
899
900
901        ### Don't reinstall packages that are already included in required plugins.
902        packages = []
903        _packages = self.get_required_packages(debug=debug)
904        accounted_for_packages = set()
905        for package_name in _packages:
906            for plugin in plugins:
907                if venv_contains_package(package_name, plugin.name):
908                    accounted_for_packages.add(package_name)
909                    break
910        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
911
912        ### Attempt pip packages installation.
913        if packages:
914            for package in packages:
915                if not pip_install(package, venv=self.name, debug=debug):
916                    warn(
917                        f"Failed to install required package '{package}'"
918                        + f" for plugin '{self.name}'.",
919                        stack = False,
920                    )
921                    if not force:
922                        warn(
923                            "Try installing with `--force` to continue anyway.",
924                            stack = False,
925                        )
926                        return False
927                    info(
928                        "Continuing with installation despite the failure "
929                        + "(careful, things might be broken!)...",
930                        icon = False
931                    )
932        return True
933
934
935    @property
936    def full_name(self) -> str:
937        """
938        Include the repo keys with the plugin's name.
939        """
940        from meerschaum.config.static import STATIC_CONFIG
941        sep = STATIC_CONFIG['plugins']['repo_separator']
942        return self.name + sep + str(self.repo_connector)
943
944
945    def __str__(self):
946        return self.name
947
948
949    def __repr__(self):
950        return f"Plugin('{self.name}', repo='{self.repo_connector}')"
951
952
953    def __del__(self):
954        pass

Handle packaging of Meerschaum plugins.

Plugin( name: str, version: Optional[str] = None, user_id: Optional[int] = None, required: Optional[List[str]] = None, attributes: Optional[Dict[str, Any]] = None, archive_path: Optional[pathlib.Path] = None, venv_path: Optional[pathlib.Path] = None, repo_connector: Optional[meerschaum.connectors.APIConnector] = None, repo: Union[meerschaum.connectors.APIConnector, str, NoneType] = None)
35    def __init__(
36        self,
37        name: str,
38        version: Optional[str] = None,
39        user_id: Optional[int] = None,
40        required: Optional[List[str]] = None,
41        attributes: Optional[Dict[str, Any]] = None,
42        archive_path: Optional[pathlib.Path] = None,
43        venv_path: Optional[pathlib.Path] = None,
44        repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None,
45        repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None,
46    ):
47        from meerschaum.config.static import STATIC_CONFIG
48        sep = STATIC_CONFIG['plugins']['repo_separator']
49        _repo = None
50        if sep in name:
51            try:
52                name, _repo = name.split(sep)
53            except Exception as e:
54                error(f"Invalid plugin name: '{name}'")
55        self._repo_in_name = _repo
56
57        if attributes is None:
58            attributes = {}
59        self.name = name
60        self.attributes = attributes
61        self.user_id = user_id
62        self._version = version
63        if required:
64            self._required = required
65        self.archive_path = (
66            archive_path if archive_path is not None
67            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
68        )
69        self.venv_path = (
70            venv_path if venv_path is not None
71            else VIRTENV_RESOURCES_PATH / self.name
72        )
73        self._repo_connector = repo_connector
74        self._repo_keys = repo
name
attributes
user_id
archive_path
venv_path
repo_connector
77    @property
78    def repo_connector(self):
79        """
80        Return the repository connector for this plugin.
81        NOTE: This imports the `connectors` module, which imports certain plugin modules.
82        """
83        if self._repo_connector is None:
84            from meerschaum.connectors.parse import parse_repo_keys
85
86            repo_keys = self._repo_keys or self._repo_in_name
87            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
88                error(
89                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
90                )
91            repo_connector = parse_repo_keys(repo_keys)
92            self._repo_connector = repo_connector
93        return self._repo_connector

Return the repository connector for this plugin. NOTE: This imports the connectors module, which imports certain plugin modules.

version
 96    @property
 97    def version(self):
 98        """
 99        Return the plugin's module version (`__version__`) if it's defined.
100        """
101        if self._version is None:
102            try:
103                self._version = self.module.__version__
104            except Exception as e:
105                self._version = None
106        return self._version

Return the plugin's module version (__version__) if it's defined.

module
109    @property
110    def module(self):
111        """
112        Return the Python module of the underlying plugin.
113        """
114        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
115            if self.__file__ is None:
116                return None
117            from meerschaum.plugins import import_plugins
118            self._module = import_plugins(str(self), warn=False)
119        return self._module

Return the Python module of the underlying plugin.

requirements_file_path: Optional[pathlib.Path]
145    @property
146    def requirements_file_path(self) -> Union[pathlib.Path, None]:
147        """
148        If a file named `requirements.txt` exists, return its path.
149        """
150        if self.__file__ is None:
151            return None
152        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
153        if not path.exists():
154            return None
155        return path

If a file named requirements.txt exists, return its path.
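
For example, a minimal sketch reading both of the above properties (the plugin name 'example' is hypothetical); each returns None when the underlying value is unavailable:

import meerschaum as mrsm

plugin = mrsm.Plugin('example')          # hypothetical plugin name
print(plugin.version)                    # None unless the module defines __version__
print(plugin.requirements_file_path)     # None unless a requirements.txt exists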

def is_installed(self, **kw) -> bool:
158    def is_installed(self, **kw) -> bool:
159        """
160        Check whether a plugin is correctly installed.
161
162        Returns
163        -------
164        A `bool` indicating whether a plugin exists and is successfully imported.
165        """
166        return self.__file__ is not None

Check whether a plugin is correctly installed.

Returns
  • A bool indicating whether a plugin exists and is successfully imported.
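
For example, a quick check before attempting other operations (the plugin name 'example' is hypothetical):

import meerschaum as mrsm

plugin = mrsm.Plugin('example')          # hypothetical plugin name
if not plugin.is_installed():
    print(f"Plugin '{plugin}' is not installed.")
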
def make_tar(self, debug: bool = False) -> pathlib.Path:
169    def make_tar(self, debug: bool = False) -> pathlib.Path:
170        """
171        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
172
173        Parameters
174        ----------
175        debug: bool, default False
176            Verbosity toggle.
177
178        Returns
179        -------
180        A `pathlib.Path` to the archive file's path.
181
182        """
183        import tarfile, pathlib, subprocess, fnmatch
184        from meerschaum.utils.debug import dprint
185        from meerschaum.utils.packages import attempt_import
186        pathspec = attempt_import('pathspec', debug=debug)
187
188        if not self.__file__:
189            from meerschaum.utils.warnings import error
190            error(f"Could not find file for plugin '{self}'.")
191        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
192            path = self.__file__.replace('__init__.py', '')
193            is_dir = True
194        else:
195            path = self.__file__
196            is_dir = False
197
198        old_cwd = os.getcwd()
199        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
200        os.chdir(real_parent_path)
201
202        default_patterns_to_ignore = [
203            '.pyc',
204            '__pycache__/',
205            'eggs/',
206            '__pypackages__/',
207            '.git',
208        ]
209
210        def parse_gitignore() -> 'Set[str]':
211            gitignore_path = pathlib.Path(path) / '.gitignore'
212            if not gitignore_path.exists():
213                return set(default_patterns_to_ignore)
214            with open(gitignore_path, 'r', encoding='utf-8') as f:
215                gitignore_text = f.read()
216            return set(pathspec.PathSpec.from_lines(
217                pathspec.patterns.GitWildMatchPattern,
218                default_patterns_to_ignore + gitignore_text.splitlines()
219            ).match_tree(path))
220
221        patterns_to_ignore = parse_gitignore() if is_dir else set()
222
223        if debug:
224            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
225
226        with tarfile.open(self.archive_path, 'w:gz') as tarf:
227            if not is_dir:
228                tarf.add(f"{self.name}.py")
229            else:
230                for root, dirs, files in os.walk(self.name):
231                    for f in files:
232                        good_file = True
233                        fp = os.path.join(root, f)
234                        for pattern in patterns_to_ignore:
235                            if pattern in str(fp) or f.startswith('.'):
236                                good_file = False
237                                break
238                        if good_file:
239                            if debug:
240                                dprint(f"Adding '{fp}'...")
241                            tarf.add(fp)
242
243        ### clean up and change back to old directory
244        os.chdir(old_cwd)
245
246        ### change to 775 to avoid permissions issues with the API in a Docker container
247        self.archive_path.chmod(0o775)
248
249        if debug:
250            dprint(f"Created archive '{self.archive_path}'.")
251        return self.archive_path

Compress the plugin's source files into a .tar.gz archive and return the archive's path.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • The pathlib.Path of the created archive file.
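
For instance, the following sketch packages a locally installed plugin into its default archive location (the plugin name 'example' is hypothetical):

import meerschaum as mrsm

plugin = mrsm.Plugin('example')          # hypothetical, must be installed locally
archive_path = plugin.make_tar(debug=True)
print(archive_path)                      # a path ending in 'example.tar.gz'
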
def install( self, skip_deps: bool = False, force: bool = False, debug: bool = False) -> Tuple[bool, str]:
254    def install(
255            self,
256            skip_deps: bool = False,
257            force: bool = False,
258            debug: bool = False,
259        ) -> SuccessTuple:
260        """
261        Extract a plugin's tar archive to the plugins directory.
262        
263        This function checks if the plugin is already installed and if the version is equal or
264        greater than the existing installation.
265
266        Parameters
267        ----------
268        skip_deps: bool, default False
269            If `True`, do not install dependencies.
270
271        force: bool, default False
272            If `True`, continue with installation, even if required packages fail to install.
273
274        debug: bool, default False
275            Verbosity toggle.
276
277        Returns
278        -------
279        A `SuccessTuple` of success (bool) and a message (str).
280
281        """
282        if self.full_name in _ongoing_installations:
283            return True, f"Already installing plugin '{self}'."
284        _ongoing_installations.add(self.full_name)
285        from meerschaum.utils.warnings import warn, error
286        if debug:
287            from meerschaum.utils.debug import dprint
288        import tarfile
289        import re
290        import ast
291        from meerschaum.plugins import sync_plugins_symlinks
292        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
293        from meerschaum.utils.venv import init_venv
294        from meerschaum.utils.misc import safely_extract_tar
295        old_cwd = os.getcwd()
296        old_version = ''
297        new_version = ''
298        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
299        temp_dir.mkdir(exist_ok=True)
300
301        if not self.archive_path.exists():
302            return False, f"Missing archive file for plugin '{self}'."
303        if self.version is not None:
304            old_version = self.version
305            if debug:
306                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
307
308        if debug:
309            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
310
311        try:
312            with tarfile.open(self.archive_path, 'r:gz') as tarf:
313                safely_extract_tar(tarf, temp_dir)
314        except Exception as e:
315            warn(e)
316            return False, f"Failed to extract plugin '{self.name}'."
317
318        ### search for version information
319        files = os.listdir(temp_dir)
320        
321        if str(files[0]) == self.name:
322            is_dir = True
323        elif str(files[0]) == self.name + '.py':
324            is_dir = False
325        else:
326            error(f"Unknown format encountered for plugin '{self}'.")
327
328        fpath = temp_dir / files[0]
329        if is_dir:
330            fpath = fpath / '__init__.py'
331
332        init_venv(self.name, debug=debug)
333        with open(fpath, 'r', encoding='utf-8') as f:
334            init_lines = f.readlines()
335        new_version = None
336        for line in init_lines:
337            if '__version__' not in line:
338                continue
339            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
340            if not version_match:
341                continue
342            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
343            break
344        if not new_version:
345            warn(
346                f"No `__version__` defined for plugin '{self}'. "
347                + "Assuming new version...",
348                stack = False,
349            )
350
351        packaging_version = attempt_import('packaging.version')
352        try:
353            is_new_version = (not new_version and not old_version) or (
354                packaging_version.parse(old_version) < packaging_version.parse(new_version)
355            )
356            is_same_version = new_version and old_version and (
357                packaging_version.parse(old_version) == packaging_version.parse(new_version)
358            )
359        except Exception as e:
360            is_new_version, is_same_version = True, False
361
362        ### Determine where to permanently store the new plugin.
363        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
364        for path in PLUGINS_DIR_PATHS:
365            files_in_plugins_dir = os.listdir(path)
366            if (
367                self.name in files_in_plugins_dir
368                or
369                (self.name + '.py') in files_in_plugins_dir
370            ):
371                plugin_installation_dir_path = path
372                break
373
374        success_msg = (
375            f"Successfully installed plugin '{self}'"
376            + ("\n    (skipped dependencies)" if skip_deps else "")
377            + "."
378        )
379        success, abort = None, None
380
381        if is_same_version and not force:
382            success, msg = True, (
383                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
384                "    Install again with `-f` or `--force` to reinstall."
385            )
386            abort = True
387        elif is_new_version or force:
388            for src_dir, dirs, files in os.walk(temp_dir):
389                if success is not None:
390                    break
391                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
392                if not os.path.exists(dst_dir):
393                    os.mkdir(dst_dir)
394                for f in files:
395                    src_file = os.path.join(src_dir, f)
396                    dst_file = os.path.join(dst_dir, f)
397                    if os.path.exists(dst_file):
398                        os.remove(dst_file)
399
400                    if debug:
401                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
402                    try:
403                        shutil.move(src_file, dst_dir)
404                    except Exception as e:
405                        success, msg = False, (
406                            f"Failed to install plugin '{self}': " +
407                            f"Could not move file '{src_file}' to '{dst_dir}'"
408                        )
409                        print(msg)
410                        break
411            if success is None:
412                success, msg = True, success_msg
413        else:
414            success, msg = False, (
415                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
416                + f"attempted version {new_version}."
417            )
418
419        shutil.rmtree(temp_dir)
420        os.chdir(old_cwd)
421
422        ### Reload the plugin's module.
423        sync_plugins_symlinks(debug=debug)
424        if '_module' in self.__dict__:
425            del self.__dict__['_module']
426        init_venv(venv=self.name, force=True, debug=debug)
427        reload_meerschaum(debug=debug)
428
429        ### if we've already failed, return here
430        if not success or abort:
431            _ongoing_installations.remove(self.full_name)
432            return success, msg
433
434        ### attempt to install dependencies
435        dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
436        if not dependencies_installed:
437            _ongoing_installations.remove(self.full_name)
438            return False, f"Failed to install dependencies for plugin '{self}'."
439
440        ### handling success tuple, bool, or other (typically None)
441        setup_tuple = self.setup(debug=debug)
442        if isinstance(setup_tuple, tuple):
443            if not setup_tuple[0]:
444                success, msg = setup_tuple
445        elif isinstance(setup_tuple, bool):
446            if not setup_tuple:
447                success, msg = False, (
448                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
449                    f"Check `setup()` in '{self.__file__}' for more information " +
450                    f"(no error message provided)."
451                )
452            else:
453                success, msg = True, success_msg
454        elif setup_tuple is None:
455            success = True
456            msg = (
457                f"Post-install for plugin '{self}' returned None. " +
458                f"Assuming plugin successfully installed."
459            )
460            warn(msg)
461        else:
462            success = False
463            msg = (
464                f"Post-install for plugin '{self}' returned unexpected value " +
465                f"of type '{type(setup_tuple)}': {setup_tuple}"
466            )
467
468        _ongoing_installations.remove(self.full_name)
469        module = self.module
470        return success, msg

Extract a plugin's tar archive to the plugins directory.

This function checks whether the plugin is already installed and whether the new version is greater than or equal to the existing installation.

Parameters
  • skip_deps (bool, default False): If True, do not install dependencies.
  • force (bool, default False): If True, continue with installation, even if required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success (bool) and a message (str).
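
A rough sketch of the archive-then-install round trip (the plugin name 'example' is hypothetical; install() expects the archive produced by make_tar() to exist):

import meerschaum as mrsm

plugin = mrsm.Plugin('example')          # hypothetical plugin name
plugin.make_tar(debug=True)              # write the .tar.gz archive first
success, msg = plugin.install(force=True, debug=True)
print(success, msg)
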
def remove_archive(self, debug: bool = False) -> Tuple[bool, str]:
473    def remove_archive(
474            self,        
475            debug: bool = False
476        ) -> SuccessTuple:
477        """Remove a plugin's archive file."""
478        if not self.archive_path.exists():
479            return True, f"Archive file for plugin '{self}' does not exist."
480        try:
481            self.archive_path.unlink()
482        except Exception as e:
483            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
484        return True, "Success"

Remove a plugin's archive file.

def remove_venv(self, debug: bool = False) -> Tuple[bool, str]:
487    def remove_venv(
488            self,        
489            debug: bool = False
490        ) -> SuccessTuple:
491        """Remove a plugin's virtual environment."""
492        if not self.venv_path.exists():
493            return True, f"Virtual environment for plugin '{self}' does not exist."
494        try:
495            shutil.rmtree(self.venv_path)
496        except Exception as e:
497            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
498        return True, "Success"

Remove a plugin's virtual environment.

def uninstall(self, debug: bool = False) -> Tuple[bool, str]:
501    def uninstall(self, debug: bool = False) -> SuccessTuple:
502        """
503        Remove a plugin, its virtual environment, and archive file.
504        """
505        from meerschaum.utils.packages import reload_meerschaum
506        from meerschaum.plugins import sync_plugins_symlinks
507        from meerschaum.utils.warnings import warn, info
508        warnings_thrown_count: int = 0
509        max_warnings: int = 3
510
511        if not self.is_installed():
512            info(
513                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
514                + "Checking for artifacts...",
515                stack = False,
516            )
517        else:
518            real_path = pathlib.Path(os.path.realpath(self.__file__))
519            try:
520                if real_path.name == '__init__.py':
521                    shutil.rmtree(real_path.parent)
522                else:
523                    real_path.unlink()
524            except Exception as e:
525                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
526                warnings_thrown_count += 1
527            else:
528                info(f"Removed source files for plugin '{self.name}'.")
529
530        if self.venv_path.exists():
531            success, msg = self.remove_venv(debug=debug)
532            if not success:
533                warn(msg, stack=False)
534                warnings_thrown_count += 1
535            else:
536                info(f"Removed virtual environment from plugin '{self.name}'.")
537
538        success = warnings_thrown_count < max_warnings
539        sync_plugins_symlinks(debug=debug)
540        self.deactivate_venv(force=True, debug=debug)
541        reload_meerschaum(debug=debug)
542        return success, (
543            f"Successfully uninstalled plugin '{self}'." if success
544            else f"Failed to uninstall plugin '{self}'."
545        )

Remove a plugin, its virtual environment, and archive file.
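
A sketch of a full cleanup (the plugin name 'example' is hypothetical); uninstall() removes the source files and virtual environment, and remove_archive() deletes any leftover archive file:

import meerschaum as mrsm

plugin = mrsm.Plugin('example')          # hypothetical plugin name
success, msg = plugin.uninstall(debug=True)
print(msg)
success, msg = plugin.remove_archive()
print(msg)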

def setup( self, *args: str, debug: bool = False, **kw: Any) -> Union[Tuple[bool, str], bool]:
548    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
549        """
550        If it exists, run the plugin's `setup()` function.
551
552        Parameters
553        ----------
554        *args: str
555            The positional arguments passed to the `setup()` function.
556            
557        debug: bool, default False
558            Verbosity toggle.
559
560        **kw: Any
561            The keyword arguments passed to the `setup()` function.
562
563        Returns
564        -------
565        A `SuccessTuple` or `bool` indicating success.
566
567        """
568        from meerschaum.utils.debug import dprint
569        import inspect
570        _setup = None
571        for name, fp in inspect.getmembers(self.module):
572            if name == 'setup' and inspect.isfunction(fp):
573                _setup = fp
574                break
575
576        ### assume success if no setup() is found (not necessary)
577        if _setup is None:
578            return True
579
580        sig = inspect.signature(_setup)
581        has_debug, has_kw = ('debug' in sig.parameters), False
582        for k, v in sig.parameters.items():
583            if '**' in str(v):
584                has_kw = True
585                break
586
587        _kw = {}
588        if has_kw:
589            _kw.update(kw)
590        if has_debug:
591            _kw['debug'] = debug
592
593        if debug:
594            dprint(f"Running setup for plugin '{self}'...")
595        try:
596            self.activate_venv(debug=debug)
597            return_tuple = _setup(*args, **_kw)
598            self.deactivate_venv(debug=debug)
599        except Exception as e:
600            return False, str(e)
601
602        if isinstance(return_tuple, tuple):
603            return return_tuple
604        if isinstance(return_tuple, bool):
605            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
606        if return_tuple is None:
607            return False, f"Setup for Plugin '{self.name}' returned None."
608        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"

If it exists, run the plugin's setup() function.

Parameters
  • *args (str): The positional arguments passed to the setup() function.
  • debug (bool, default False): Verbosity toggle.
  • **kw (Any): The keyword arguments passed to the setup() function.
Returns
  • A SuccessTuple or bool indicating success.
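
As a sketch, a plugin module may define setup() alongside its other globals; returning a SuccessTuple (or a bool) reports the outcome back to the installer. The module below is hypothetical:

# example.py (a hypothetical plugin module)
__version__ = '0.1.0'
required = ['requests']

def setup(**kwargs):
    # One-time initialization, e.g. creating local caches.
    return True, "Success"
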
def get_dependencies(self, debug: bool = False) -> List[str]:
611    def get_dependencies(
612            self,
613            debug: bool = False,
614        ) -> List[str]:
615        """
616        If the Plugin has specified dependencies in a list called `required`, return the list.
617        
618        **NOTE:** Dependencies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
619        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
620
621        Parameters
622        ----------
623        debug: bool, default False
624            Verbosity toggle.
625
626        Returns
627        -------
628        A list of required packages and plugins (str).
629
630        """
631        if '_required' in self.__dict__:
632            return self._required
633
634        ### If the plugin has not yet been imported,
635        ### infer the dependencies from the source text.
636        ### This is not super robust, and it doesn't feel right
637        ### having multiple versions of the logic.
638        ### This is necessary when determining the activation order
639        ### without having to import the module.
640        ### For consistency's sake, the module-less method does not cache the requirements.
641        if self.__dict__.get('_module', None) is None:
642            file_path = self.__file__
643            if file_path is None:
644                return []
645            with open(file_path, 'r', encoding='utf-8') as f:
646                text = f.read()
647
648            if 'required' not in text:
649                return []
650
651            ### This has some limitations:
652            ### It relies on `required` being manually declared.
653            ### We lose the ability to dynamically alter the `required` list,
654            ### which is why we've kept the module-reliant method below.
655            import ast, re
656            ### NOTE: This technically would break 
657            ### if `required` was the very first line of the file.
658            req_start_match = re.search(r'required(:\s*)?.*=', text)
659            if not req_start_match:
660                return []
661            req_start = req_start_match.start()
662            equals_sign = req_start + text[req_start:].find('=')
663
664            ### Dependencies may have brackets within the strings, so push back the index.
665            first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[')
666            if first_opening_brace == -1:
667                return []
668
669            next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']')
670            if next_closing_brace == -1:
671                return []
672
673            start_ix = first_opening_brace + 1
674            end_ix = next_closing_brace
675
676            num_braces = 0
677            while True:
678                if '[' not in text[start_ix:end_ix]:
679                    break
680                num_braces += 1
681                start_ix = end_ix
682                end_ix += text[end_ix + 1:].find(']') + 1
683
684            req_end = end_ix + 1
685            req_text = (
686                text[(first_opening_brace-1):req_end]
687                .lstrip()
688                .replace('=', '', 1)
689                .lstrip()
690                .rstrip()
691            )
692            try:
693                required = ast.literal_eval(req_text)
694            except Exception as e:
695                warn(
696                    f"Unable to determine requirements for plugin '{self.name}' "
697                    + "without importing the module.\n"
698                    + "    This may be due to dynamically setting the global `required` list.\n"
699                    + f"    {e}"
700                )
701                return []
702            return required
703
704        import inspect
705        self.activate_venv(dependencies=False, debug=debug)
706        required = []
707        for name, val in inspect.getmembers(self.module):
708            if name == 'required':
709                required = val
710                break
711        self._required = required
712        self.deactivate_venv(dependencies=False, debug=debug)
713        return required

If the Plugin has specified dependencies in a list called required, return the list.

NOTE: Dependencies which start with 'plugin:' are Meerschaum plugins, not pip packages. Meerschaum plugins may also specify connector keys for a repo after '@'.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of required packages and plugins (str).
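
For example, a plugin declaring the following (hypothetical) required list would report it via get_dependencies():

# Inside the hypothetical plugin module 'example.py':
required = ['pandas', 'plugin:foo', 'plugin:baz@api:bar']

# Elsewhere:
import meerschaum as mrsm

plugin = mrsm.Plugin('example')
print(plugin.get_dependencies())
# ['pandas', 'plugin:foo', 'plugin:baz@api:bar']
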
def get_required_plugins(self, debug: bool = False) -> List[Plugin]:
716    def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
717        """
718        Return a list of required Plugin objects.
719        """
720        from meerschaum.utils.warnings import warn
721        from meerschaum.config import get_config
722        from meerschaum.config.static import STATIC_CONFIG
723        plugins = []
724        _deps = self.get_dependencies(debug=debug)
725        sep = STATIC_CONFIG['plugins']['repo_separator']
726        plugin_names = [
727            _d[len('plugin:'):] for _d in _deps
728            if _d.startswith('plugin:') and len(_d) > len('plugin:')
729        ]
730        default_repo_keys = get_config('meerschaum', 'default_repository')
731        for _plugin_name in plugin_names:
732            if sep in _plugin_name:
733                try:
734                    _plugin_name, _repo_keys = _plugin_name.split(sep)
735                except Exception as e:
736                    _repo_keys = default_repo_keys
737                    warn(
738                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
739                        + f"Will try to use '{_repo_keys}' instead.",
740                        stack = False,
741                    )
742            else:
743                _repo_keys = default_repo_keys
744            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
745        return plugins

Return a list of required Plugin objects.

def get_required_packages(self, debug: bool = False) -> List[str]:
748    def get_required_packages(self, debug: bool=False) -> List[str]:
749        """
750        Return the required package names (excluding plugins).
751        """
752        _deps = self.get_dependencies(debug=debug)
753        return [_d for _d in _deps if not _d.startswith('plugin:')]

Return the required package names (excluding plugins).
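
Continuing the hypothetical required list above, these two helpers split the dependencies (the repo keys in the output are illustrative):

print(plugin.get_required_plugins())
# [Plugin('foo', repo='...'), Plugin('baz', repo='api:bar')]
print(plugin.get_required_packages())
# ['pandas']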

def activate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
756    def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
757        """
758        Activate the virtual environments for the plugin and its dependencies.
759
760        Parameters
761        ----------
762        dependencies: bool, default True
763            If `True`, activate the virtual environments for required plugins.
764
765        Returns
766        -------
767        A bool indicating success.
768        """
769        from meerschaum.utils.venv import venv_target_path
770        from meerschaum.utils.packages import activate_venv
771        from meerschaum.utils.misc import make_symlink, is_symlink
772        from meerschaum.config._paths import PACKAGE_ROOT_PATH
773
774        if dependencies:
775            for plugin in self.get_required_plugins(debug=debug):
776                plugin.activate_venv(debug=debug, **kw)
777
778        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
779        venv_meerschaum_path = vtp / 'meerschaum'
780
781        try:
782            success, msg = True, "Success"
783            if is_symlink(venv_meerschaum_path):
784                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
785                    venv_meerschaum_path.unlink()
786                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
787        except Exception as e:
788            success, msg = False, str(e)
789        if not success:
790            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")
791
792        return activate_venv(self.name, debug=debug, **kw)

Activate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, activate the virtual environments for required plugins.
Returns
  • A bool indicating success.
def deactivate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
795    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
796        """
797        Deactivate the virtual environments for the plugin and its dependencies.
798
799        Parameters
800        ----------
801        dependencies: bool, default True
802            If `True`, deactivate the virtual environments for required plugins.
803
804        Returns
805        -------
806        A bool indicating success.
807        """
808        from meerschaum.utils.packages import deactivate_venv
809        success = deactivate_venv(self.name, debug=debug, **kw)
810        if dependencies:
811            for plugin in self.get_required_plugins(debug=debug):
812                plugin.deactivate_venv(debug=debug, **kw)
813        return success

Deactivate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, deactivate the virtual environments for required plugins.
Returns
  • A bool indicating success.
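
A sketch of manually bracketing work with the plugin's virtual environment (the plugin name 'example' is hypothetical); the Venv context manager documented below wraps this same activate/deactivate pattern:

import meerschaum as mrsm

plugin = mrsm.Plugin('example')          # hypothetical plugin name
plugin.activate_venv(dependencies=True)
try:
    module = plugin.module               # dependencies resolve against the plugin's venv
finally:
    plugin.deactivate_venv(dependencies=True)
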
def install_dependencies(self, force: bool = False, debug: bool = False) -> bool:
816    def install_dependencies(
817            self,
818            force: bool = False,
819            debug: bool = False,
820        ) -> bool:
821        """
822        If specified, install dependencies.
823        
824        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
825        Meerschaum plugins from the same repository as this Plugin.
826        To install from a different repository, add the repo keys after `'@'`
827        (e.g. `'plugin:foo@api:bar'`).
828
829        Parameters
830        ----------
831        force: bool, default False
832            If `True`, continue with the installation, even if some
833            required packages fail to install.
834
835        debug: bool, default False
836            Verbosity toggle.
837
838        Returns
839        -------
840        A bool indicating success.
841
842        """
843        from meerschaum.utils.packages import pip_install, venv_contains_package
844        from meerschaum.utils.debug import dprint
845        from meerschaum.utils.warnings import warn, info
846        from meerschaum.connectors.parse import parse_repo_keys
847        _deps = self.get_dependencies(debug=debug)
848        if not _deps and self.requirements_file_path is None:
849            return True
850
851        plugins = self.get_required_plugins(debug=debug)
852        for _plugin in plugins:
853            if _plugin.name == self.name:
854                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
855                continue
856            _success, _msg = _plugin.repo_connector.install_plugin(
857                _plugin.name, debug=debug, force=force
858            )
859            if not _success:
860                warn(
861                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
862                    + f" for plugin '{self.name}':\n" + _msg,
863                    stack = False,
864                )
865                if not force:
866                    warn(
867                        "Try installing with the `--force` flag to continue anyway.",
868                        stack = False,
869                    )
870                    return False
871                info(
872                    "Continuing with installation despite the failure "
873                    + "(careful, things might be broken!)...",
874                    icon = False
875                )
876
877
878        ### First step: parse `requirements.txt` if it exists.
879        if self.requirements_file_path is not None:
880            if not pip_install(
881                requirements_file_path=self.requirements_file_path,
882                venv=self.name, debug=debug
883            ):
884                warn(
885                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
886                    stack = False,
887                )
888                if not force:
889                    warn(
890                        "Try installing with `--force` to continue anyway.",
891                        stack = False,
892                    )
893                    return False
894                info(
895                    "Continuing with installation despite the failure "
896                    + "(careful, things might be broken!)...",
897                    icon = False
898                )
899
900
901        ### Don't reinstall packages that are already included in required plugins.
902        packages = []
903        _packages = self.get_required_packages(debug=debug)
904        accounted_for_packages = set()
905        for package_name in _packages:
906            for plugin in plugins:
907                if venv_contains_package(package_name, plugin.name):
908                    accounted_for_packages.add(package_name)
909                    break
910        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
911
912        ### Attempt pip packages installation.
913        if packages:
914            for package in packages:
915                if not pip_install(package, venv=self.name, debug=debug):
916                    warn(
917                        f"Failed to install required package '{package}'"
918                        + f" for plugin '{self.name}'.",
919                        stack = False,
920                    )
921                    if not force:
922                        warn(
923                            "Try installing with `--force` to continue anyway.",
924                            stack = False,
925                        )
926                        return False
927                    info(
928                        "Continuing with installation despite the failure "
929                        + "(careful, things might be broken!)...",
930                        icon = False
931                    )
932        return True

Install the plugin's dependencies, if any are specified.

NOTE: Dependencies that start with 'plugin:' will be installed as Meerschaum plugins from the same repository as this Plugin. To install from a different repository, add the repo keys after '@' (e.g. 'plugin:foo@api:bar').

Parameters
  • force (bool, default False): If True, continue with the installation, even if some required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating success.
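
A short sketch (the plugin name 'example' is hypothetical):

import meerschaum as mrsm

plugin = mrsm.Plugin('example')          # hypothetical plugin name
if not plugin.install_dependencies(force=False, debug=True):
    print(f"Failed to install dependencies for plugin '{plugin}'.")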
full_name: str
935    @property
936    def full_name(self) -> str:
937        """
938        Include the repo keys with the plugin's name.
939        """
940        from meerschaum.config.static import STATIC_CONFIG
941        sep = STATIC_CONFIG['plugins']['repo_separator']
942        return self.name + sep + str(self.repo_connector)

Include the repo keys with the plugin's name.
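
For example (the exact repo keys depend on your configured default repository; the plugin name 'example' is hypothetical):

import meerschaum as mrsm

plugin = mrsm.Plugin('example')
print(plugin.full_name)
# e.g. 'example@api:mrsm'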

class Venv:
 18class Venv:
 19    """
 20    Manage a virtual environment's activation status.
 21
 22    Examples
 23    --------
 24    >>> from meerschaum.plugins import Plugin
 25    >>> with Venv('mrsm') as venv:
 26    ...     import pandas
 27    >>> with Venv(Plugin('noaa')) as venv:
 28    ...     import requests
 29    >>> venv = Venv('mrsm')
 30    >>> venv.activate()
 31    True
 32    >>> venv.deactivate()
 33    True
 34    >>> 
 35    """
 36
 37    def __init__(
 38            self,
 39            venv: Union[str, 'meerschaum.plugins.Plugin', None] = 'mrsm',
 40            debug: bool = False,
 41        ) -> None:
 42        from meerschaum.utils.venv import activate_venv, deactivate_venv, active_venvs
 43        ### For some weird threading issue,
 44        ### we can't use `isinstance` here.
 45        if 'meerschaum.plugins._Plugin' in str(type(venv)):
 46            self._venv = venv.name
 47            self._activate = venv.activate_venv
 48            self._deactivate = venv.deactivate_venv
 49            self._kwargs = {}
 50        else:
 51            self._venv = venv
 52            self._activate = activate_venv
 53            self._deactivate = deactivate_venv
 54            self._kwargs = {'venv': venv}
 55        self._debug = debug
 56        ### In case someone calls `deactivate()` before `activate()`.
 57        self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)
 58
 59
 60    def activate(self, debug: bool = False) -> bool:
 61        """
 62        Activate this virtual environment.
 63        If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments
 64        will also be activated.
 65        """
 66        from meerschaum.utils.venv import active_venvs
 67        self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)
 68        return self._activate(debug=(debug or self._debug), **self._kwargs)
 69
 70
 71    def deactivate(self, debug: bool = False) -> bool:
 72        """
 73        Deactivate this virtual environment.
 74        If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments
 75        will also be deactivated.
 76        """
 77        return self._deactivate(debug=(debug or self._debug), **self._kwargs)
 78
 79
 80    @property
 81    def target_path(self) -> pathlib.Path:
 82        """
 83        Return the target site-packages path for this virtual environment.
 84        A `meerschaum.utils.venv.Venv` may have one virtual environment per minor Python version
 85        (e.g. Python 3.10 and Python 3.7).
 86        """
 87        from meerschaum.utils.venv import venv_target_path
 88        return venv_target_path(venv=self._venv, allow_nonexistent=True, debug=self._debug)
 89
 90
 91    @property
 92    def root_path(self) -> pathlib.Path:
 93        """
 94        Return the top-level path for this virtual environment.
 95        """
 96        from meerschaum.config._paths import VIRTENV_RESOURCES_PATH
 97        if self._venv is None:
 98            return self.target_path.parent
 99        return VIRTENV_RESOURCES_PATH / self._venv
100
101
102    def __enter__(self) -> None:
103        self.activate(debug=self._debug)
104
105
106    def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
107        self.deactivate(debug=self._debug)
108
109
110    def __str__(self) -> str:
111        quote = "'" if self._venv is not None else ""
112        return "Venv(" + quote + str(self._venv) + quote + ")"
113
114
115    def __repr__(self) -> str:
116        return self.__str__()

Manage a virtual environment's activation status.

Examples
>>> from meerschaum.plugins import Plugin
>>> with Venv('mrsm') as venv:
...     import pandas
>>> with Venv(Plugin('noaa')) as venv:
...     import requests
>>> venv = Venv('mrsm')
>>> venv.activate()
True
>>> venv.deactivate()
True
>>>
Venv( venv: Union[str, Plugin, NoneType] = 'mrsm', debug: bool = False)
37    def __init__(
38            self,
39            venv: Union[str, 'meerschaum.plugins.Plugin', None] = 'mrsm',
40            debug: bool = False,
41        ) -> None:
42        from meerschaum.utils.venv import activate_venv, deactivate_venv, active_venvs
43        ### For some weird threading issue,
44        ### we can't use `isinstance` here.
45        if 'meerschaum.plugins._Plugin' in str(type(venv)):
46            self._venv = venv.name
47            self._activate = venv.activate_venv
48            self._deactivate = venv.deactivate_venv
49            self._kwargs = {}
50        else:
51            self._venv = venv
52            self._activate = activate_venv
53            self._deactivate = deactivate_venv
54            self._kwargs = {'venv': venv}
55        self._debug = debug
56        ### In case someone calls `deactivate()` before `activate()`.
57        self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)
def activate(self, debug: bool = False) -> bool:
60    def activate(self, debug: bool = False) -> bool:
61        """
62        Activate this virtual environment.
63        If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments
64        will also be activated.
65        """
66        from meerschaum.utils.venv import active_venvs
67        self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)
68        return self._activate(debug=(debug or self._debug), **self._kwargs)

Activate this virtual environment. If a Plugin was provided, its dependent virtual environments will also be activated.

def deactivate(self, debug: bool = False) -> bool:
71    def deactivate(self, debug: bool = False) -> bool:
72        """
73        Deactivate this virtual environment.
74        If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments
75        will also be deactivated.
76        """
77        return self._deactivate(debug=(debug or self._debug), **self._kwargs)

Deactivate this virtual environment. If a Plugin was provided, its dependent virtual environments will also be deactivated.

target_path: pathlib.Path
80    @property
81    def target_path(self) -> pathlib.Path:
82        """
83        Return the target site-packages path for this virtual environment.
84        A `meerschaum.utils.venv.Venv` may have one virtual environment per minor Python version
85        (e.g. Python 3.10 and Python 3.7).
86        """
87        from meerschaum.utils.venv import venv_target_path
88        return venv_target_path(venv=self._venv, allow_nonexistent=True, debug=self._debug)

Return the target site-packages path for this virtual environment. A Venv may have one virtual environment per minor Python version (e.g. Python 3.10 and Python 3.7).

root_path: pathlib.Path
91    @property
92    def root_path(self) -> pathlib.Path:
93        """
94        Return the top-level path for this virtual environment.
95        """
96        from meerschaum.config._paths import VIRTENV_RESOURCES_PATH
97        if self._venv is None:
98            return self.target_path.parent
99        return VIRTENV_RESOURCES_PATH / self._venv

Return the top-level path for this virtual environment.
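
A short sketch of inspecting a virtual environment's paths (exact locations depend on your Meerschaum root directory and Python version):

import meerschaum as mrsm

venv = mrsm.Venv('mrsm')
print(venv.root_path)                    # top-level directory of the 'mrsm' venv
print(venv.target_path)                  # site-packages path for the current Python version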

def pprint( *args, detect_password: bool = True, nopretty: bool = False, **kw) -> None:
 10def pprint(
 11        *args,
 12        detect_password: bool = True,
 13        nopretty: bool = False,
 14        **kw
 15    ) -> None:
 16    """Pretty print an object according to the configured ANSI and UNICODE settings.
 17    If detect_password is True (default), search and replace passwords with '*' characters.
 18    Does not mutate objects.
 19    """
 20    from meerschaum.utils.packages import attempt_import, import_rich
 21    from meerschaum.utils.formatting import ANSI, UNICODE, get_console, print_tuple
 22    from meerschaum.utils.warnings import error
 23    from meerschaum.utils.misc import replace_password, dict_from_od, filter_keywords
 24    from collections import OrderedDict
 25    import copy, json
 26
 27    if (
 28        len(args) == 1
 29        and
 30        isinstance(args[0], tuple)
 31        and
 32        len(args[0]) == 2
 33        and
 34        isinstance(args[0][0], bool)
 35        and
 36        isinstance(args[0][1], str)
 37    ):
 38        return print_tuple(args[0])
 39
 40    modify = True
 41    rich_pprint = None
 42    if ANSI and not nopretty:
 43        rich = import_rich()
 44        if rich is not None:
 45            rich_pretty = attempt_import('rich.pretty')
 46        if rich_pretty is not None:
 47            def _rich_pprint(*args, **kw):
 48                _console = get_console()
 49                _kw = filter_keywords(_console.print, **kw)
 50                _console.print(*args, **_kw)
 51            rich_pprint = _rich_pprint
 52    elif not nopretty:
 53        pprintpp = attempt_import('pprintpp', warn=False)
 54        try:
 55            _pprint = pprintpp.pprint
 56        except Exception as e:
 57            import pprint as _pprint_module
 58            _pprint = _pprint_module.pprint
 59
 60    func = (
 61        _pprint if rich_pprint is None else rich_pprint
 62    ) if not nopretty else print
 63
 64    try:
 65        args_copy = copy.deepcopy(args)
 66    except Exception as e:
 67        args_copy = args
 68        modify = False
 69    _args = []
 70    for a in args:
 71        c = a
 72        ### convert OrderedDict into dict
 73        if isinstance(a, OrderedDict) or issubclass(type(a), OrderedDict):
 74            c = dict_from_od(copy.deepcopy(c))
 75        _args.append(c)
 76    args = _args
 77
 78    _args = list(args)
 79    if detect_password and modify:
 80        _args = []
 81        for a in args:
 82            c = a
 83            if isinstance(c, dict):
 84                c = replace_password(copy.deepcopy(c))
 85            if nopretty:
 86                try:
 87                    c = json.dumps(c)
 88                    is_json = True
 89                except Exception as e:
 90                    is_json = False
 91                if not is_json:
 92                    try:
 93                        c = str(c)
 94                    except Exception as e:
 95                        pass
 96            _args.append(c)
 97
 98    ### filter out unsupported keywords
 99    func_kw = filter_keywords(func, **kw) if not nopretty else {}
100    error_msg = None
101    try:
102        func(*_args, **func_kw)
103    except Exception as e:
104        error_msg = e
105    if error_msg is not None:
106        error(error_msg)

Pretty print an object according to the configured ANSI and UNICODE settings. If detect_password is True (default), search and replace passwords with '*' characters. Does not mutate objects.

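A short, hedged usage sketch (assuming pprint is importable from the root meerschaum namespace, as this page suggests): dictionaries containing password-like keys are printed with their values masked when detect_password=True, the default. Exactly which keys count as passwords is decided by replace_password; the input object itself is never mutated.

import meerschaum as mrsm

doc = {
    'username': 'dog',
    'password': 'hunter2',
}

# Pretty output with the password value masked by '*' characters.
mrsm.pprint(doc)

# Plain JSON / str output instead of rich or pprintpp formatting.
mrsm.pprint(doc, nopretty=True)
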
def attempt_import( *names: str, lazy: bool = True, warn: bool = True, install: bool = True, venv: Optional[str] = 'mrsm', precheck: bool = True, split: bool = True, check_update: bool = False, check_pypi: bool = False, check_is_installed: bool = True, allow_outside_venv: bool = True, color: bool = True, debug: bool = False) -> Any:
1197def attempt_import(
1198        *names: str,
1199        lazy: bool = True,
1200        warn: bool = True,
1201        install: bool = True,
1202        venv: Optional[str] = 'mrsm',
1203        precheck: bool = True,
1204        split: bool = True,
1205        check_update: bool = False,
1206        check_pypi: bool = False,
1207        check_is_installed: bool = True,
1208        allow_outside_venv: bool = True,
1209        color: bool = True,
1210        debug: bool = False
1211    ) -> Any:
1212    """
1213    Raise a warning if packages are not installed; otherwise import and return modules.
1214    If `lazy` is `True`, return lazy-imported modules.
1215    
1216    Returns tuple of modules if multiple names are provided, else returns one module.
1217    
1218    Parameters
1219    ----------
1220    names: List[str]
1221        The packages to be imported.
1222
1223    lazy: bool, default True
1224        If `True`, lazily load packages.
1225
1226    warn: bool, default True
1227        If `True`, raise a warning if a package cannot be imported.
1228
1229    install: bool, default True
1230        If `True`, attempt to install a missing package into the designated virtual environment.
1231        If `check_update` is True, install updates if available.
1232
1233    venv: Optional[str], default 'mrsm'
1234        The virtual environment in which to search for packages and to install packages into.
1235
1236    precheck: bool, default True
1237        If `True`, attempt to find module before importing (necessary for checking if modules exist
1238        and retaining lazy imports), otherwise assume lazy is `False`.
1239
1240    split: bool, default True
1241        If `True`, split packages' names on `'.'`.
1242
1243    check_update: bool, default False
1244        If `True` and `install` is `True`, install updates if the required minimum version
1245        does not match.
1246
1247    check_pypi: bool, default False
1248        If `True` and `check_update` is `True`, check PyPI when determining whether
1249        an update is required.
1250
1251    check_is_installed: bool, default True
1252        If `True`, check if the package is contained in the virtual environment.
1253
1254    allow_outside_venv: bool, default True
1255        If `True`, search outside of the specified virtual environment
1256        if the package cannot be found.
1257        Setting to `False` will reinstall the package into a virtual environment, even if it
1258        is installed outside.
1259
1260    color: bool, default True
1261        If `False`, do not print ANSI colors.
1262
1263    Returns
1264    -------
1265    The specified modules. If they're not available and `install` is `True`, it will first
1266    download them into a virtual environment and return the modules.
1267
1268    Examples
1269    --------
1270    >>> pandas, sqlalchemy = attempt_import('pandas', 'sqlalchemy')
1271    >>> pandas = attempt_import('pandas')
1272
1273    """
1274
1275    import importlib.util
1276
1277    ### to prevent recursion, check if parent Meerschaum package is being imported
1278    if names == ('meerschaum',):
1279        return _import_module('meerschaum')
1280
1281    if venv == 'mrsm' and _import_hook_venv is not None:
1282        if debug:
1283            print(f"Import hook for virtual environment '{_import_hook_venv}' is active.")
1284        venv = _import_hook_venv
1285
1286    _warnings = _import_module('meerschaum.utils.warnings')
1287    warn_function = _warnings.warn
1288
1289    def do_import(_name: str, **kw) -> Union['ModuleType', None]:
1290        with Venv(venv=venv, debug=debug):
1291            ### determine the import method (lazy vs normal)
1292            from meerschaum.utils.misc import filter_keywords
1293            import_method = (
1294                _import_module if not lazy
1295                else lazy_import
1296            )
1297            try:
1298                mod = import_method(_name, **(filter_keywords(import_method, **kw)))
1299            except Exception as e:
1300                if warn:
1301                    import traceback
1302                    traceback.print_exception(type(e), e, e.__traceback__)
1303                    warn_function(
1304                        f"Failed to import module '{_name}'.\nException:\n{e}",
1305                        ImportWarning,
1306                        stacklevel = (5 if lazy else 4),
1307                        color = False,
1308                    )
1309                mod = None
1310        return mod
1311
1312    modules = []
1313    for name in names:
1314        ### Check if package is a declared dependency.
1315        root_name = name.split('.')[0] if split else name
1316        install_name = _import_to_install_name(root_name)
1317
1318        if install_name is None:
1319            install_name = root_name
1320            if warn and root_name != 'plugins':
1321                warn_function(
1322                    f"Package '{root_name}' is not declared in meerschaum.utils.packages.",
1323                    ImportWarning,
1324                    stacklevel = 3,
1325                    color = False
1326                )
1327
1328        ### Determine if the package exists.
1329        if precheck is False:
1330            found_module = (
1331                do_import(
1332                    name, debug=debug, warn=False, venv=venv, color=color,
1333                    check_update=False, check_pypi=False, split=split,
1334                ) is not None
1335            )
1336        else:
1337            if check_is_installed:
1338                with _locks['_is_installed_first_check']:
1339                    if not _is_installed_first_check.get(name, False):
1340                        package_is_installed = is_installed(
1341                            name,
1342                            venv = venv,
1343                            split = split,
1344                            allow_outside_venv = allow_outside_venv,
1345                            debug = debug,
1346                        )
1347                        _is_installed_first_check[name] = package_is_installed
1348                    else:
1349                        package_is_installed = _is_installed_first_check[name]
1350            else:
1351                package_is_installed = _is_installed_first_check.get(
1352                    name,
1353                    venv_contains_package(name, venv=venv, split=split, debug=debug)
1354                )
1355            found_module = package_is_installed
1356
1357        if not found_module:
1358            if install:
1359                if not pip_install(
1360                    install_name,
1361                    venv = venv,
1362                    split = False,
1363                    check_update = check_update,
1364                    color = color,
1365                    debug = debug
1366                ) and warn:
1367                    warn_function(
1368                        f"Failed to install '{install_name}'.",
1369                        ImportWarning,
1370                        stacklevel = 3,
1371                        color = False,
1372                    )
1373            elif warn:
1374                ### Raise a warning if we can't find the package and install = False.
1375                warn_function(
1376                    (f"\n\nMissing package '{name}' from virtual environment '{venv}'; "
1377                     + "some features will not work correctly."
1378                     + f"\n\nSet install=True when calling attempt_import.\n"),
1379                    ImportWarning,
1380                    stacklevel = 3,
1381                    color = False,
1382                )
1383
1384        ### Do the import. Will be lazy if lazy=True.
1385        m = do_import(
1386            name, debug=debug, warn=warn, venv=venv, color=color,
1387            check_update=check_update, check_pypi=check_pypi, install=install, split=split,
1388        )
1389        modules.append(m)
1390
1391    modules = tuple(modules)
1392    if len(modules) == 1:
1393        return modules[0]
1394    return modules

Raise a warning if packages are not installed; otherwise import and return modules. If lazy is True, return lazy-imported modules.

Returns tuple of modules if multiple names are provided, else returns one module.

Parameters
  • names (List[str]): The packages to be imported.
  • lazy (bool, default True): If True, lazily load packages.
  • warn (bool, default True): If True, raise a warning if a package cannot be imported.
  • install (bool, default True): If True, attempt to install a missing package into the designated virtual environment. If check_update is True, install updates if available.
  • venv (Optional[str], default 'mrsm'): The virtual environment in which to search for packages and to install packages into.
  • precheck (bool, default True): If True, attempt to find module before importing (necessary for checking if modules exist and retaining lazy imports), otherwise assume lazy is False.
  • split (bool, default True): If True, split packages' names on '.'.
  • check_update (bool, default False): If True and install is True, install updates if the required minimum version does not match.
  • check_pypi (bool, default False): If True and check_update is True, check PyPI when determining whether an update is required.
  • check_is_installed (bool, default True): If True, check if the package is contained in the virtual environment.
  • allow_outside_venv (bool, default True): If True, search outside of the specified virtual environment if the package cannot be found. Setting to False will reinstall the package into a virtual environment, even if it is installed outside.
  • color (bool, default True): If False, do not print ANSI colors.
Returns
  • The specified modules. If they're not available and install is True, it will first download them into a virtual environment and return the modules.
Examples
>>> pandas, sqlalchemy = attempt_import('pandas', 'sqlalchemy')
>>> pandas = attempt_import('pandas')
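
Beyond the docstring examples, here is a hedged sketch of the non-default flags; the package name is illustrative, and nothing is installed because install=False:

from meerschaum.utils.packages import attempt_import

# Eagerly import (no lazy proxy). With install=False and warn=False,
# a package that cannot be found simply comes back as None.
pandas = attempt_import('pandas', lazy=False, install=False, warn=False)
if pandas is None:
    print("pandas is not available in the 'mrsm' virtual environment.")

# Packages may also be resolved from (and installed into) another virtual
# environment, e.g. attempt_import('pandas', venv='example') for a hypothetical 'example' venv.
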
SuccessTuple = typing.Tuple[bool, str]

class Connector:
 20class Connector(metaclass=abc.ABCMeta):
 21    """
 22    The base connector class to hold connection attributes.
 23    """
 24    def __init__(
 25            self,
 26            type: Optional[str] = None,
 27            label: Optional[str] = None,
 28            **kw: Any
 29        ):
 30        """
 31        Set the given keyword arguments as attributes.
 32
 33        Parameters
 34        ----------
 35        type: str
 36            The `type` of the connector (e.g. `sql`, `api`, `plugin`).
 37
 38        label: str
 39            The `label` for the connector.
 40
 41
 42        Examples
 43        --------
 44        Run `mrsm edit config` to edit connectors in the YAML file:
 45
 46        ```yaml
 47        meerschaum:
 48            connections:
 49                {type}:
 50                    {label}:
 51                        ### attributes go here
 52        ```
 53
 54        """
 55        self._original_dict = copy.deepcopy(self.__dict__)
 56        self._set_attributes(type=type, label=label, **kw)
 57        self.verify_attributes(getattr(self, 'REQUIRED_ATTRIBUTES', None))
 58
 59    def _reset_attributes(self):
 60        self.__dict__ = self._original_dict
 61
 62    def _set_attributes(
 63            self,
 64            *args,
 65            inherit_default: bool = True,
 66            **kw: Any
 67        ):
 68        from meerschaum.config.static import STATIC_CONFIG
 69        from meerschaum.utils.warnings import error
 70
 71        self._attributes = {}
 72
 73        default_label = STATIC_CONFIG['connectors']['default_label']
 74
 75        ### NOTE: Support the legacy method of explicitly passing the type.
 76        label = kw.get('label', None)
 77        if label is None:
 78            if len(args) == 2:
 79                label = args[1]
 80            elif len(args) == 0:
 81                label = None
 82            else:
 83                label = args[0]
 84
 85        if label == 'default':
 86            error(
 87                f"Label cannot be 'default'. Did you mean '{default_label}'?",
 88                InvalidAttributesError,
 89            )
 90        self.__dict__['label'] = label
 91
 92        from meerschaum.config import get_config
 93        conn_configs = copy.deepcopy(get_config('meerschaum', 'connectors'))
 94        connector_config = copy.deepcopy(get_config('system', 'connectors'))
 95
 96        ### inherit attributes from 'default' if exists
 97        if inherit_default:
 98            inherit_from = 'default'
 99            if self.type in conn_configs and inherit_from in conn_configs[self.type]:
100                _inherit_dict = copy.deepcopy(conn_configs[self.type][inherit_from])
101                self._attributes.update(_inherit_dict)
102
103        ### load user config into self._attributes
104        if self.type in conn_configs and self.label in conn_configs[self.type]:
105            self._attributes.update(conn_configs[self.type][self.label])
106
107        ### load system config into self._sys_config
108        ### (deep copy so future Connectors don't inherit changes)
109        if self.type in connector_config:
110            self._sys_config = copy.deepcopy(connector_config[self.type])
111
112        ### add additional arguments or override configuration
113        self._attributes.update(kw)
114
115        ### finally, update __dict__ with _attributes.
116        self.__dict__.update(self._attributes)
117
118
119    def verify_attributes(
120            self,
121            required_attributes: Optional[List[str]] = None,
122            debug: bool = False
123        ) -> None:
124        """
125        Ensure that the required attributes have been met.
126        
127        The Connector base class checks the minimum requirements.
128        Child classes may enforce additional requirements.
129
130        Parameters
131        ----------
132        required_attributes: Optional[List[str]], default None
133            Attributes to be verified. If `None`, default to `['label']`.
134
135        debug: bool, default False
136            Verbosity toggle.
137
138        Returns
139        -------
140        Don't return anything.
141
142        Raises
143        ------
144        An error if any of the required attributes are missing.
145        """
146        from meerschaum.utils.warnings import error, warn
147        from meerschaum.utils.debug import dprint
148        from meerschaum.utils.misc import items_str
149        if required_attributes is None:
150            required_attributes = ['label']
151        missing_attributes = set()
152        for a in required_attributes:
153            if a not in self.__dict__:
154                missing_attributes.add(a)
155        if len(missing_attributes) > 0:
156            error(
157                (
158                    f"Missing {items_str(list(missing_attributes))} "
159                    + f"for connector '{self.type}:{self.label}'."
160                ),
161                InvalidAttributesError,
162                silent = True,
163                stack = False
164            )
165
166
167    def __str__(self):
168        """
169        When cast to a string, return type:label.
170        """
171        return f"{self.type}:{self.label}"
172
173    def __repr__(self):
174        """
175        Represent the connector as type:label.
176        """
177        return str(self)
178
179    @property
180    def meta(self) -> Dict[str, Any]:
181        """
182        Return the keys needed to reconstruct this Connector.
183        """
184        _meta = {
185            key: value
186            for key, value in self.__dict__.items()
187            if not str(key).startswith('_')
188        }
189        _meta.update({
190            'type': self.type,
191            'label': self.label,
192        })
193        return _meta
194
195
196    @property
197    def type(self) -> str:
198        """
199        Return the type for this connector.
200        """
201        _type = self.__dict__.get('type', None)
202        if _type is None:
203            import re
204            _type = re.sub(r'connector$', '', self.__class__.__name__.lower())
205            self.__dict__['type'] = _type
206        return _type
207
208
209    @property
210    def label(self) -> str:
211        """
212        Return the label for this connector.
213        """
214        _label = self.__dict__.get('label', None)
215        if _label is None:
216            from meerschaum.config.static import STATIC_CONFIG
217            _label = STATIC_CONFIG['connectors']['default_label']
218            self.__dict__['label'] = _label
219        return _label

The base connector class to hold connection attributes.

Connector(type: Optional[str] = None, label: Optional[str] = None, **kw: Any)
24    def __init__(
25            self,
26            type: Optional[str] = None,
27            label: Optional[str] = None,
28            **kw: Any
29        ):
30        """
31        Set the given keyword arguments as attributes.
32
33        Parameters
34        ----------
35        type: str
36            The `type` of the connector (e.g. `sql`, `api`, `plugin`).
37
38        label: str
39            The `label` for the connector.
40
41
42        Examples
43        --------
 44        Run `mrsm edit config` to edit connectors in the YAML file:
45
46        ```yaml
47        meerschaum:
48            connections:
49                {type}:
50                    {label}:
51                        ### attributes go here
52        ```
53
54        """
55        self._original_dict = copy.deepcopy(self.__dict__)
56        self._set_attributes(type=type, label=label, **kw)
57        self.verify_attributes(getattr(self, 'REQUIRED_ATTRIBUTES', None))

Set the given keyword arguments as attributes.

Parameters
  • type (str): The type of the connector (e.g. sql, api, plugin).
  • label (str): The label for the connector.
Examples

Run mrsm edit config to edit connectors in the YAML file:

meerschaum:
    connections:
        {type}:
            {label}:
                ### attributes go here
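
As a sketch of the attribute handling in _set_attributes above: keyword arguments become instance attributes and override anything loaded from the configuration for that type and label. The 'example' type below is illustrative, and the base class is instantiated directly only for demonstration; in practice you would use get_connector or a make_connector subclass.

import meerschaum as mrsm

# Keyword arguments are stored directly as attributes on the connector.
conn = mrsm.Connector('example', 'demo', host='localhost', port=5432)
print(conn)                  # example:demo
print(conn.host, conn.port)  # localhost 5432
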
def verify_attributes( self, required_attributes: Optional[List[str]] = None, debug: bool = False) -> None:
119    def verify_attributes(
120            self,
121            required_attributes: Optional[List[str]] = None,
122            debug: bool = False
123        ) -> None:
124        """
125        Ensure that the required attributes have been met.
126        
127        The Connector base class checks the minimum requirements.
128        Child classes may enforce additional requirements.
129
130        Parameters
131        ----------
132        required_attributes: Optional[List[str]], default None
133            Attributes to be verified. If `None`, default to `['label']`.
134
135        debug: bool, default False
136            Verbosity toggle.
137
138        Returns
139        -------
140        Don't return anything.
141
142        Raises
143        ------
144        An error if any of the required attributes are missing.
145        """
146        from meerschaum.utils.warnings import error, warn
147        from meerschaum.utils.debug import dprint
148        from meerschaum.utils.misc import items_str
149        if required_attributes is None:
150            required_attributes = ['label']
151        missing_attributes = set()
152        for a in required_attributes:
153            if a not in self.__dict__:
154                missing_attributes.add(a)
155        if len(missing_attributes) > 0:
156            error(
157                (
158                    f"Missing {items_str(list(missing_attributes))} "
159                    + f"for connector '{self.type}:{self.label}'."
160                ),
161                InvalidAttributesError,
162                silent = True,
163                stack = False
164            )

Ensure that the required attributes have been met.

The Connector base class checks the minimum requirements. Child classes may enforce additional requirements.

Parameters
  • required_attributes (Optional[List[str]], default None): Attributes to be verified. If None, default to ['label'].
  • debug (bool, default False): Verbosity toggle.
Returns
  • Don't return anything.
Raises
  • An error if any of the required attributes are missing.
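
For example, registering a class with REQUIRED_ATTRIBUTES and omitting one of them raises InvalidAttributesError from __init__; the class and attribute names below are illustrative:

import meerschaum as mrsm

@mrsm.make_connector
class ExampleConnector(mrsm.Connector):
    REQUIRED_ATTRIBUTES = ['username', 'password']

# Missing 'password' fails verify_attributes(), which __init__ calls.
try:
    conn = ExampleConnector('example', 'incomplete', username='dog')
except Exception as e:
    print(type(e).__name__)  # InvalidAttributesError
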
meta: Dict[str, Any]
179    @property
180    def meta(self) -> Dict[str, Any]:
181        """
182        Return the keys needed to reconstruct this Connector.
183        """
184        _meta = {
185            key: value
186            for key, value in self.__dict__.items()
187            if not str(key).startswith('_')
188        }
189        _meta.update({
190            'type': self.type,
191            'label': self.label,
192        })
193        return _meta

Return the keys needed to reconstruct this Connector.

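For instance, the meta dictionary is enough to construct an equivalent connector; the 'example' type is illustrative, and the exact key order may differ:

import meerschaum as mrsm

conn = mrsm.Connector('example', 'demo', host='localhost')
print(conn.meta)
# {'label': 'demo', 'type': 'example', 'host': 'localhost'}

# The same keys rebuild an equivalent connector.
rebuilt = mrsm.Connector(**conn.meta)
print(rebuilt)  # example:demo
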
type: str
196    @property
197    def type(self) -> str:
198        """
199        Return the type for this connector.
200        """
201        _type = self.__dict__.get('type', None)
202        if _type is None:
203            import re
204            _type = re.sub(r'connector$', '', self.__class__.__name__.lower())
205            self.__dict__['type'] = _type
206        return _type

Return the type for this connector.

label: str
209    @property
210    def label(self) -> str:
211        """
212        Return the label for this connector.
213        """
214        _label = self.__dict__.get('label', None)
215        if _label is None:
216            from meerschaum.config.static import STATIC_CONFIG
217            _label = STATIC_CONFIG['connectors']['default_label']
218            self.__dict__['label'] = _label
219        return _label

Return the label for this connector.

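Both properties fall back to defaults when nothing is passed: the type is derived from the class name (lowercased, minus the 'connector' suffix), and the label falls back to the configured default label (typically 'main', though that value comes from STATIC_CONFIG and is an assumption here).

import meerschaum as mrsm

class DemoConnector(mrsm.Connector):
    pass

conn = DemoConnector()
print(conn.type)   # demo
print(conn.label)  # main (the configured default label)
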
def make_connector(cls):
278def make_connector(
279        cls,
280    ):
281    """
282    Register a class as a `Connector`.
283    The `type` will be the lower case of the class name, without the suffix `connector`.
284
285    Parameters
286    ----------
287    instance: bool, default False
288        If `True`, make this connector type an instance connector.
289        This requires implementing the various pipes functions and lots of testing.
290
291    Examples
292    --------
293    >>> import meerschaum as mrsm
294    >>> from meerschaum.connectors import make_connector, Connector
295    >>> 
296    >>> @make_connector
297    >>> class FooConnector(Connector):
298    ...     REQUIRED_ATTRIBUTES: list[str] = ['username', 'password']
299    ... 
300    >>> conn = mrsm.get_connector('foo:bar', username='dog', password='cat')
301    >>> print(conn.username, conn.password)
302    dog cat
303    >>> 
304    """
305    import re
306    typ = re.sub(r'connector$', '', cls.__name__.lower())
307    with _locks['types']:
308        types[typ] = cls
309    with _locks['custom_types']:
310        custom_types.add(typ)
311    with _locks['connectors']:
312        if typ not in connectors:
313            connectors[typ] = {}
314    if getattr(cls, 'IS_INSTANCE', False):
315        with _locks['instance_types']:
316            if typ not in instance_types:
317                instance_types.append(typ)
318
319    return cls

Register a class as a Connector. The type will be the lower case of the class name, without the suffix connector.

Parameters
  • instance (bool, default False): If True, make this connector type an instance connector. This requires implementing the various pipes functions and lots of testing.
Examples
>>> import meerschaum as mrsm
>>> from meerschaum.connectors import make_connector, Connector
>>> 
>>> @make_connector
>>> class FooConnector(Connector):
...     REQUIRED_ATTRIBUTES: list[str] = ['username', 'password']
... 
>>> conn = mrsm.get_connector('foo:bar', username='dog', password='cat')
>>> print(conn.username, conn.password)
dog cat
>>>
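
As the registration code above shows, a truthy IS_INSTANCE class attribute additionally records the type as an instance connector. The sketch below is structural only: the connector type, URI, and attribute names are illustrative, and a real instance connector must also implement the various pipes methods mentioned in the docstring (omitted here).

import meerschaum as mrsm

@mrsm.make_connector
class ExampleDBConnector(mrsm.Connector):
    IS_INSTANCE = True   # register 'exampledb' as an instance connector type
    REQUIRED_ATTRIBUTES = ['uri']

conn = mrsm.get_connector('exampledb:local', uri='postgresql://user:pass@localhost/db')
print(conn)  # exampledb:local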