meerschaum

Meerschaum banner

PyPI GitHub Info Stats
PyPI GitHub Repo stars License Number of plugins
PyPI - Python Version GitHub Sponsors meerschaum Tutorials Number of registered users

Meerschaum demo

What is Meerschaum?

Meerschaum is a tool for quickly synchronizing time-series data streams called pipes. With Meerschaum, you can have a data visualization stack running in minutes.

Why Meerschaum?

If you've worked with time-series data, you know the headaches that come with ETL. Data engineering often gets in analysts' way, and when work needs to get done, every minute spent on pipelining is time taken away from real analysis.

Rather than copying and pasting your ETL scripts, simply build pipes with Meerschaum! Meerschaum gives you the tools to design your data streams how you like, and don't worry: you can always incorporate Meerschaum into your existing systems!

Features

  • 📊 Built for Data Scientists and Analysts
    • Integrate with Pandas, Grafana, and other popular data analysis tools.
    • Persist your dataframes and always get the latest data.
  • ⚡️ Production-Ready, Batteries Included
  • 🔌 Easily Expandable
  • ✨ Tailored for Your Experience
    • Rich CLI makes managing your data streams surprisingly enjoyable!
    • Web dashboard for those who prefer a more graphical experience.
    • Manage your database connections with Meerschaum connectors.
    • Utility commands with sensible syntax let you control many pipes with grace.
  • 💼 Portable from the Start
    • The environment variables $MRSM_ROOT_DIR, $MRSM_PLUGINS_DIR, and $MRSM_VENVS_DIR let you emulate multiple installations and group together your instances.
    • No dependencies required; anything needed will be installed into virtual environments.
    • Specify required packages for your plugins, and users will get those packages in a virtual environment.
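For example, pointing these variables at a scratch directory yields a fully isolated installation. A minimal sketch (the directory path below is arbitrary):

```shell
# Keep an isolated Meerschaum root for a throwaway experiment.
# The path is arbitrary; any writable directory works.
export MRSM_ROOT_DIR="$HOME/.mrsm-scratch"
mkdir -p "$MRSM_ROOT_DIR"

# Configuration, logs, and virtual environments now live under this root,
# leaving your primary installation untouched.
echo "Using Meerschaum root: $MRSM_ROOT_DIR"
```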

Installation

For a more thorough setup guide, visit the Getting Started page at meerschaum.io.

TL;DR

pip install -U --user meerschaum
mrsm stack up -d db grafana
mrsm bootstrap pipes

Usage Documentation

Please visit meerschaum.io for setup, usage, and troubleshooting information. You can find technical documentation at docs.meerschaum.io, and here is a complete list of the Meerschaum actions.

>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe("plugin:noaa", "weather")
>>> cols_to_select = ['timestamp', 'station', 'temperature (degC)']
>>> df = pipe.get_data(cols_to_select, begin='2023-11-15', end='2023-11-20')
>>> df
              timestamp station  temperature (degC)
0   2023-11-15 00:52:00    KATL                16.1
1   2023-11-15 00:52:00    KCLT                11.7
2   2023-11-15 00:53:00    KGMU                15.0
3   2023-11-15 00:54:00    KCEU                13.9
4   2023-11-15 01:52:00    KATL                15.6
..                  ...     ...                 ...
535 2023-11-19 22:54:00    KCEU                15.6
536 2023-11-19 23:52:00    KATL                16.7
537 2023-11-19 23:52:00    KCLT                13.9
538 2023-11-19 23:53:00    KGMU                15.6
539 2023-11-19 23:54:00    KCEU                15.0

[540 rows x 3 columns]
>>>

Plugins

Check out the Awesome Meerschaum list for community plugins, as well as the public plugins repository.

For details on installing, using, and writing plugins, check out the plugins documentation at meerschaum.io.

Example Plugin

# ~/.config/meerschaum/plugins/example.py
__version__ = '0.0.1'
required = []

def register(pipe, **kw):
    return {
        'columns': {
            'datetime': 'dt',
            'id': 'id',
            'value': 'val',
        }
    }

def fetch(pipe, **kw):
    import random
    from datetime import datetime
    docs = [
        {
            'dt': datetime.now(),
            'id': i,
            'val': random.randint(0, 200),
        }
        for i in range(random.randint(0, 100))
    ]
    return docs
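Outside of a Meerschaum installation, you can sanity-check the `fetch()` logic above with a stand-in `pipe` argument. A sketch (in real use, Meerschaum passes a `meerschaum.Pipe` object):

```python
import random
from datetime import datetime

# Stand-in for the plugin's fetch(); the body matches the example above.
def fetch(pipe, **kw):
    docs = [
        {
            'dt': datetime.now(),
            'id': i,
            'val': random.randint(0, 200),
        }
        for i in range(random.randint(0, 100))
    ]
    return docs

# For a quick local check, None can stand in for the Pipe argument.
docs = fetch(pipe=None)
```

Each document's keys (`dt`, `id`, `val`) line up with the `columns` mapping returned by `register()`, which is how Meerschaum knows which fields are the datetime, ID, and value columns.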

Support Meerschaum's Development

For consulting services and to support Meerschaum's development, please consider sponsoring me on GitHub Sponsors.

Additionally, you can always buy me a coffee ☕!

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Copyright 2023 Bennett Meares

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import atexit
from meerschaum.utils.typing import SuccessTuple
from meerschaum.core.Pipe import Pipe
from meerschaum.plugins import Plugin
from meerschaum.utils.venv import Venv
from meerschaum.connectors import get_connector
from meerschaum.utils import get_pipes
from meerschaum.utils.formatting import pprint
from meerschaum._internal.docs import index as __doc__
from meerschaum.config import __version__, get_config
from meerschaum.utils.packages import attempt_import
from meerschaum.__main__ import _close_pools

atexit.register(_close_pools)

__pdoc__ = {'gui': False, 'api': False, 'core': False, '_internal': False}
__all__ = (
    "get_pipes",
    "get_connector",
    "get_config",
    "Pipe",
    "Plugin",
    "Venv",
    "pprint",
    "attempt_import",
    "actions",
    "config",
    "connectors",
    "plugins",
    "utils",
)
def get_pipes(
        connector_keys: Union[str, List[str], None] = None,
        metric_keys: Union[str, List[str], None] = None,
        location_keys: Union[str, List[str], None] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        mrsm_instance: Union[str, InstanceConnector, None] = None,
        instance: Union[str, InstanceConnector, None] = None,
        as_list: bool = False,
        method: str = 'registered',
        wait: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Union[PipesDict, List[mrsm.Pipe]]:
    """
    Return a dictionary or list of `meerschaum.Pipe` objects.

    Parameters
    ----------
    connector_keys: Union[str, List[str], None], default None
        String or list of connector keys.
        If omitted or `'*'`, fetch all possible keys.
        If a string begins with `'_'`, select keys that do NOT match the string.

    metric_keys: Union[str, List[str], None], default None
        String or list of metric keys. See `connector_keys` for formatting.

    location_keys: Union[str, List[str], None], default None
        String or list of location keys. See `connector_keys` for formatting.

    tags: Optional[List[str]], default None
        If provided, only include pipes with these tags.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        Params are parsed into a SQL WHERE clause.
        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`.

    mrsm_instance: Union[str, InstanceConnector, None], default None
        Connector keys for the Meerschaum instance of the pipes.
        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
        `meerschaum.connectors.api.APIConnector.APIConnector`.

    as_list: bool, default False
        If `True`, return pipes in a list instead of a hierarchical dictionary.
        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
        `True`  : `[Pipe]`

    method: str, default 'registered'
        Available options: `['registered', 'explicit', 'all']`
        If `'registered'` (default), create pipes based on registered keys in the connector's pipes table
        (API or SQL connector, depending on `mrsm_instance`).
        If `'explicit'`, create pipes from the provided `connector_keys`, `metric_keys`, and `location_keys`
        instead of consulting the pipes table. Useful for creating non-existent pipes.
        If `'all'`, create pipes from predefined metrics and locations. Requires `connector_keys`.
        **NOTE:** Method `'all'` is not implemented!

    wait: bool, default False
        Wait for a connection before getting pipes. Should only be `True` for cases where the
        database might not be running (like the API).

    **kw: Any
        Keyword arguments to pass to the `meerschaum.Pipe` constructor.

    Returns
    -------
    A dictionary of dictionaries and `meerschaum.Pipe` objects
    in the connector, metric, location hierarchy.
    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.

    Examples
    --------
    ```
    >>> ### Manual definition:
    >>> pipes = {
    ...     <connector_keys>: {
    ...         <metric_key>: {
    ...             <location_key>: Pipe(
    ...                 <connector_keys>,
    ...                 <metric_key>,
    ...                 <location_key>,
    ...             ),
    ...         },
    ...     },
    ... },
    >>> ### Accessing a single pipe:
    >>> pipes['sql:main']['weather'][None]
    >>> ### Return a list instead:
    >>> get_pipes(as_list=True)
    [sql_main_weather]
    >>>
    ```
    """
    from meerschaum.config import get_config
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import filter_keywords

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    if params is None:
        params = {}
    if tags is None:
        tags = []

    if isinstance(connector_keys, str):
        connector_keys = [connector_keys]
    if isinstance(metric_keys, str):
        metric_keys = [metric_keys]
    if isinstance(location_keys, str):
        location_keys = [location_keys]

    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
    ### If `wait`, wait until a connection is made.
    if mrsm_instance is None:
        mrsm_instance = instance
    if mrsm_instance is None:
        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
    if isinstance(mrsm_instance, str):
        from meerschaum.connectors.parse import parse_instance_keys
        connector = parse_instance_keys(keys=mrsm_instance, wait=wait, debug=debug)
    else: ### NOTE: mrsm_instance MUST be a SQL or API connector for this to work.
        from meerschaum.connectors import instance_types
        valid_connector = False
        if hasattr(mrsm_instance, 'type'):
            if mrsm_instance.type in instance_types:
                valid_connector = True
        if not valid_connector:
            error(f"Invalid instance connector: {mrsm_instance}")
        connector = mrsm_instance
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Using instance connector: {connector}")
    if not connector:
        error(f"Could not create connector from keys: '{mrsm_instance}'")

    ### Get a list of tuples for the keys needed to build pipes.
    result = fetch_pipes_keys(
        method,
        connector,
        connector_keys = connector_keys,
        metric_keys = metric_keys,
        location_keys = location_keys,
        tags = tags,
        params = params,
        debug = debug
    )
    if result is None:
        error("Unable to build pipes!")

    ### Populate the `pipes` dictionary with Pipes based on the keys
    ### obtained from the chosen `method`.
    from meerschaum import Pipe
    pipes = {}
    for ck, mk, lk in result:
        if ck not in pipes:
            pipes[ck] = {}

        if mk not in pipes[ck]:
            pipes[ck][mk] = {}

        pipes[ck][mk][lk] = Pipe(
            ck, mk, lk,
            mrsm_instance = connector,
            debug = debug,
            **filter_keywords(Pipe, **kw)
        )

    if not as_list:
        return pipes
    from meerschaum.utils.misc import flatten_pipes_dict
    return flatten_pipes_dict(pipes)

Return a dictionary or list of meerschaum.Pipe objects.

Parameters
  • connector_keys (Union[str, List[str], None], default None): String or list of connector keys. If omitted or is '*', fetch all possible keys. If a string begins with '_', select keys that do NOT match the string.
  • metric_keys (Union[str, List[str], None], default None): String or list of metric keys. See connector_keys for formatting.
  • location_keys (Union[str, List[str], None], default None): String or list of location keys. See connector_keys for formatting.
  • tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
  • params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. Params are parsed into a SQL WHERE clause. E.g. {'a': 1, 'b': 2} equates to 'WHERE a = 1 AND b = 2'
  • mrsm_instance (Union[str, InstanceConnector, None], default None): Connector keys for the Meerschaum instance of the pipes. Must be a meerschaum.connectors.sql.SQLConnector.SQLConnector or meerschaum.connectors.api.APIConnector.APIConnector.
  • as_list (bool, default False): If True, return pipes in a list instead of a hierarchical dictionary. False : {connector_keys: {metric_key: {location_key: Pipe}}} True : [Pipe]
  • method (str, default 'registered'): Available options: ['registered', 'explicit', 'all']. If 'registered' (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depending on mrsm_instance). If 'explicit', create pipes from the provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If 'all', create pipes from predefined metrics and locations. Requires connector_keys. NOTE: Method 'all' is not implemented!
  • wait (bool, default False): Wait for a connection before getting Pipes. Should only be true for cases where the database might not be running (like the API).
  • **kw (Any): Keyword arguments to pass to the meerschaum.Pipe constructor.
Returns
  • A dictionary of dictionaries and meerschaum.Pipe objects in the connector, metric, location hierarchy. If as_list is True, return a list of meerschaum.Pipe objects.
Examples
>>> ### Manual definition:
>>> pipes = {
...     <connector_keys>: {
...         <metric_key>: {
...             <location_key>: Pipe(
...                 <connector_keys>,
...                 <metric_key>,
...                 <location_key>,
...             ),
...         },
...     },
... },
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[sql_main_weather]
>>> 
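The `as_list=True` behavior amounts to flattening that three-level dictionary. A self-contained sketch, with plain strings standing in for `Pipe` objects (the keys and names below are illustrative):

```python
# Hierarchical form: {connector_keys: {metric_key: {location_key: pipe}}}.
pipes = {
    'sql:main': {
        'weather': {None: 'sql_main_weather'},
        'power': {None: 'sql_main_power', 'office': 'sql_main_power_office'},
    },
}

def flatten_pipes_dict(pipes_dict):
    # Walk connector -> metric -> location and collect the leaf pipes,
    # mirroring what as_list=True returns.
    return [
        pipe
        for metrics in pipes_dict.values()
        for locations in metrics.values()
        for pipe in locations.values()
    ]

flat = flatten_pipes_dict(pipes)
# flat holds the three leaf "pipes" in hierarchy order.
```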
def get_connector(
        type: str = None,
        label: str = None,
        refresh: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Connector:
    """
    Return an existing connector or create a new connection and store it for reuse.

    You can create new connectors if enough parameters are provided for the given type and flavor.

    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct a new object. Defaults to `False`.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`).

    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```
    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)
    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _loaded_plugin_connectors = True
    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### Recursive call to get_connector.
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### `type` might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"

        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        with _locks['types']:
            types.update({
                'api'   : APIConnector,
                'sql'   : SQLConnector,
                'plugin': PluginConnector,
            })

    ### Determine if we need to call the constructor.
    if not refresh:
        ### See if any user-supplied arguments differ from the existing instance.
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    import inspect
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector " +
                            f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n" +
                        f"  - Keyword value: '{value}'\n" +
                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else: ### Connector doesn't yet exist.
            refresh = True

    ### Only create an object if `refresh` is True
    ### (can be manually specified, otherwise determined above).
    if refresh:
        with _locks['connectors']:
            try:
                ### Will raise an error if the configuration is incorrect / missing.
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack = False,
                )
                conn = None
            except Exception as e:
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack = False,
                )
                conn = None
        if conn is None:
            return None

    return connectors[type][label]

Return existing connector or create new connection and store for reuse.

You can create new connectors if enough parameters are provided for the given type and flavor.

Parameters
  • type (Optional[str], default None): Connector type (sql, api, etc.). Defaults to the type of the configured instance_connector.
  • label (Optional[str], default None): Connector label (e.g. main). Defaults to 'main'.
  • refresh (bool, default False): Refresh the Connector instance / construct new object. Defaults to False.
  • kw (Any): Other arguments to pass to the Connector constructor. If the Connector has already been constructed and new arguments are provided, refresh is set to True and the old Connector is replaced.
Returns
  • A new Meerschaum connector (e.g. meerschaum.connectors.api.APIConnector, meerschaum.connectors.sql.SQLConnector).
Examples

The following parameters would create a new meerschaum.connectors.sql.SQLConnector that isn't in the configuration file.

>>> conn = get_connector(
...     type = 'sql',
...     label = 'newlabel',
...     flavor = 'sqlite',
...     database = '/file/path/to/database.db'
... )
>>>
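Connector keys follow a type:label convention, and a combined string such as 'sql:main' is split on the first colon (as in the source above). A standalone sketch of that parsing (the helper name is hypothetical):

```python
def split_connector_keys(keys, default_label='main'):
    # Split 'sql:main' into ('sql', 'main'); a bare type falls back to the
    # default label, loosely mirroring get_connector's behavior.
    if ':' in keys:
        type_, label = keys.split(':', maxsplit=1)
    else:
        type_, label = keys, default_label
    return type_, label
```

For example, `split_connector_keys('plugin:noaa')` yields `('plugin', 'noaa')`, while a bare `'sql'` falls back to the `'main'` label.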
def get_config(
        *keys: str,
        patch: bool = True,
        substitute: bool = True,
        sync_files: bool = True,
        write_missing: bool = True,
        as_tuple: bool = False,
        warn: bool = True,
        debug: bool = False
    ) -> Any:
    """
    Return the Meerschaum configuration dictionary.
    If positional arguments are provided, index by the keys.
    Raises a warning if invalid keys are provided.

    Parameters
    ----------
    keys: str
        Strings to index.

    patch: bool, default True
        If `True`, patch missing default keys into the config directory.

    sync_files: bool, default True
        If `True`, sync files if needed.

    write_missing: bool, default True
        If `True`, write default values when the main config files are missing.

    substitute: bool, default True
        If `True`, substitute 'MRSM{}' values.

    as_tuple: bool, default False
        If `True`, return a tuple of type (success, value).

    Returns
    -------
    The value in the configuration directory, indexed by the provided keys.

    Examples
    --------
    >>> get_config('meerschaum', 'instance')
    'sql:main'
    >>> get_config('does', 'not', 'exist')
    UserWarning: Invalid keys in config: ('does', 'not', 'exist')
    """
    import json

    symlinks_key = STATIC_CONFIG['config']['symlinks_key']
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Indexing keys: {keys}", color=False)

    if len(keys) == 0:
        _rc = _config(substitute=substitute, sync_files=sync_files, write_missing=write_missing)
        if as_tuple:
            return True, _rc
        return _rc

    ### Weird threading issues; only import if `substitute` is True.
    if substitute:
        from meerschaum.config._read_config import search_and_substitute_config
    ### Invalidate the cache if it was read before with substitute=False
    ### but there still exist substitutions.
    if (
        config is not None and substitute and keys[0] != symlinks_key
        and 'MRSM{' in json.dumps(config.get(keys[0]))
    ):
        try:
            _subbed = search_and_substitute_config({keys[0]: config[keys[0]]})
        except Exception as e:
            import traceback
            traceback.print_exc()
        config[keys[0]] = _subbed[keys[0]]
        if symlinks_key in _subbed:
            if symlinks_key not in config:
                config[symlinks_key] = {}
            if keys[0] not in config[symlinks_key]:
                config[symlinks_key][keys[0]] = {}
            config[symlinks_key][keys[0]] = apply_patch_to_config(
                _subbed,
                config[symlinks_key][keys[0]]
            )

    from meerschaum.config._sync import sync_files as _sync_files
    if config is None:
        _config(*keys, sync_files=sync_files)

    invalid_keys = False
    if keys[0] not in config and keys[0] != symlinks_key:
        single_key_config = read_config(
            keys=[keys[0]], substitute=substitute, write_missing=write_missing
        )
        if keys[0] not in single_key_config:
            invalid_keys = True
        else:
            config[keys[0]] = single_key_config.get(keys[0], None)
            if symlinks_key in single_key_config and keys[0] in single_key_config[symlinks_key]:
                if symlinks_key not in config:
                    config[symlinks_key] = {}
                config[symlinks_key][keys[0]] = single_key_config[symlinks_key][keys[0]]

            if sync_files:
                _sync_files(keys=[keys[0]])

    c = config
    if len(keys) > 0:
        for k in keys:
            try:
                c = c[k]
            except Exception as e:
                invalid_keys = True
                break
        if invalid_keys:
            ### Check if the keys are in the default configuration.
            from meerschaum.config._default import default_config
            in_default = True
            patched_default_config = (
                search_and_substitute_config(default_config)
                if substitute else copy.deepcopy(default_config)
            )
            _c = patched_default_config
            for k in keys:
                try:
                    _c = _c[k]
                except Exception as e:
                    in_default = False
            if in_default:
                c = _c
                invalid_keys = False
            warning_msg = f"Invalid keys in config: {keys}"
            if not in_default:
                try:
                    if warn:
                        from meerschaum.utils.warnings import warn as _warn
                        _warn(warning_msg, stacklevel=3, color=False)
                except Exception as e:
                    if warn:
                        print(warning_msg)
                if as_tuple:
                    return False, None
                return None

            ### Don't write keys that we haven't yet loaded into memory.
            not_loaded_keys = [k for k in patched_default_config if k not in config]
            for k in not_loaded_keys:
                patched_default_config.pop(k, None)

            set_config(
                apply_patch_to_config(
                    patched_default_config,
                    config,
                )
            )
            if patch and keys[0] != symlinks_key:
                if write_missing:
                    write_config(config, debug=debug)

    if as_tuple:
        return (not invalid_keys), c
    return c

Return the Meerschaum configuration dictionary. If positional arguments are provided, index by the keys. Raises a warning if invalid keys are provided.

Parameters
  • keys (str): Strings to index.
  • patch (bool, default True): If True, patch missing default keys into the config directory. Defaults to True.
  • sync_files (bool, default True): If True, sync files if needed. Defaults to True.
  • write_missing (bool, default True): If True, write default values when the main config files are missing. Defaults to True.
  • substitute (bool, default True): If True, substitute 'MRSM{}' values. Defaults to True.
  • as_tuple (bool, default False): If True, return a tuple of type (success, value). Defaults to False.
Returns
  • The value in the configuration directory, indexed by the provided keys.
Examples
>>> get_config('meerschaum', 'instance')
'sql:main'
>>> get_config('does', 'not', 'exist')
UserWarning: Invalid keys in config: ('does', 'not', 'exist')
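Indexing the configuration by positional keys is, at its core, a nested dictionary traversal. A simplified sketch of that lookup, including the (success, value) tuple mode (the config dict here is illustrative, and the helper name is hypothetical):

```python
def index_config(config, *keys, as_tuple=False):
    # Walk the nested dict key by key; report failure instead of raising,
    # loosely mirroring get_config's as_tuple=(success, value) behavior.
    c = config
    for k in keys:
        try:
            c = c[k]
        except (KeyError, TypeError):
            return (False, None) if as_tuple else None
    return (True, c) if as_tuple else c

config = {'meerschaum': {'instance': 'sql:main'}}
# index_config(config, 'meerschaum', 'instance') returns 'sql:main'.
```

The real `get_config` additionally patches missing keys from the default configuration and performs 'MRSM{}' substitution before the traversal.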
class Pipe:
    """
    Access Meerschaum pipes via Pipe objects.

    Pipes are identified by the following:

    1. Connector keys (e.g. `'sql:main'`)
    2. Metric key (e.g. `'weather'`)
    3. Location (optional; e.g. `None`)

    A pipe's connector keys correspond to a data source, and when the pipe is synced,
    its `fetch` definition is evaluated and executed to produce new data.

    Alternatively, new data may be directly synced via `pipe.sync()`:

    ```
    >>> from meerschaum import Pipe
    >>> pipe = Pipe('csv', 'weather')
    >>>
    >>> import pandas as pd
    >>> df = pd.read_csv('weather.csv')
    >>> pipe.sync(df)
    ```
    """

    from ._fetch import (
        fetch,
        get_backtrack_interval,
    )
    from ._data import (
        get_data,
        get_backtrack_data,
        get_rowcount,
        _get_data_as_iterator,
        get_chunk_interval,
        get_chunk_bounds,
    )
    from ._register import register
    from ._attributes import (
        attributes,
        parameters,
        columns,
        dtypes,
        get_columns,
        get_columns_types,
        get_indices,
        tags,
        get_id,
        id,
        get_val_column,
        parents,
        children,
        target,
        _target_legacy,
        guess_datetime,
    )
    from ._show import show
    from ._edit import edit, edit_definition, update
    from ._sync import (
        sync,
        get_sync_time,
        exists,
        filter_existing,
        _get_chunk_label,
        get_num_workers,
    )
    from ._verify import (
        verify,
        get_bound_interval,
        get_bound_time,
    )
    from ._delete import delete
    from ._drop import drop
    from ._clear import clear
    from ._deduplicate import deduplicate
    from ._bootstrap import bootstrap
    from ._dtypes import enforce_dtypes, infer_dtypes

    def __init__(
        self,
        connector: str = '',
        metric: str = '',
        location: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        columns: Union[Dict[str, str], List[str], None] = None,
144        tags: Optional[List[str]] = None,
145        target: Optional[str] = None,
146        dtypes: Optional[Dict[str, str]] = None,
147        instance: Optional[Union[str, InstanceConnector]] = None,
148        temporary: bool = False,
149        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
150        cache: bool = False,
151        debug: bool = False,
152        connector_keys: Optional[str] = None,
153        metric_key: Optional[str] = None,
154        location_key: Optional[str] = None,
155    ):
156        """
157        Parameters
158        ----------
159        connector: str
160            Keys for the pipe's source connector, e.g. `'sql:main'`.
161
162        metric: str
163            Label for the pipe's contents, e.g. `'weather'`.
164
165        location: str, default None
166            Label for the pipe's location. Defaults to `None`.
167
168        parameters: Optional[Dict[str, Any]], default None
169            Optionally set a pipe's parameters from the constructor,
170            e.g. columns and other attributes.
171            You can edit these parameters with `edit pipes`.
172
173        columns: Optional[Dict[str, str]], default None
174            Set the `columns` dictionary of `parameters`.
175            If `parameters` is also provided, this dictionary is added under the `'columns'` key.
176
177        tags: Optional[List[str]], default None
178            A list of strings to be added under the `'tags'` key of `parameters`.
179            You can select pipes with certain tags using `--tags`.
180
181        dtypes: Optional[Dict[str, str]], default None
182            Set the `dtypes` dictionary of `parameters`.
183            If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.
184
185        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
186            Connector for the Meerschaum instance where the pipe resides.
187            Defaults to the preconfigured default instance (`'sql:main'`).
188
189        instance: Optional[Union[str, InstanceConnector]], default None
190            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.
191
192        temporary: bool, default False
193            If `True`, prevent instance tables (pipes, users, plugins) from being created.
194
195        cache: bool, default False
196            If `True`, cache fetched data into a local database file.
197            Defaults to `False`.
198        """
199        from meerschaum.utils.warnings import error, warn
200        if (not connector and not connector_keys) or (not metric and not metric_key):
201            error(
202                "Please provide strings for the connector and metric\n    "
203                + "(first two positional arguments)."
204            )
205
206        ### Fall back to legacy `location_key` just in case.
207        if not location:
208            location = location_key
209
210        if not connector:
211            connector = connector_keys
212
213        if not metric:
214            metric = metric_key
215
216        if location in ('[None]', 'None'):
217            location = None
218
219        from meerschaum.config.static import STATIC_CONFIG
220        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
221        for k in (connector, metric, location, *(tags or [])):
222            if str(k).startswith(negation_prefix):
223                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")
224
225        self.connector_keys = str(connector)
226        self.connector_key = self.connector_keys ### Alias
227        self.metric_key = metric
228        self.location_key = location
229        self.temporary = temporary
230
231        self._attributes = {
232            'connector_keys': self.connector_keys,
233            'metric_key': self.metric_key,
234            'location_key': self.location_key,
235            'parameters': {},
236        }
237
238        ### only set parameters if values are provided
239        if isinstance(parameters, dict):
240            self._attributes['parameters'] = parameters
241        else:
242            if parameters is not None:
243                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
244            self._attributes['parameters'] = {}
245
246        columns = columns or self._attributes.get('parameters', {}).get('columns', {})
247        if isinstance(columns, list):
248            columns = {str(col): str(col) for col in columns}
249        if isinstance(columns, dict):
250            self._attributes['parameters']['columns'] = columns
251        elif columns is not None:
252            warn(f"The provided columns are of invalid type '{type(columns)}'.")
253
254        if isinstance(tags, (list, tuple)):
255            self._attributes['parameters']['tags'] = tags
256        elif tags is not None:
257            warn(f"The provided tags are of invalid type '{type(tags)}'.")
258
259        if isinstance(target, str):
260            self._attributes['parameters']['target'] = target
261        elif target is not None:
262            warn(f"The provided target is of invalid type '{type(target)}'.")
263
264        if isinstance(dtypes, dict):
265            self._attributes['parameters']['dtypes'] = dtypes
266        elif dtypes is not None:
267            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")
268
269        ### NOTE: The parameters dictionary is {} by default.
270        ###       A Pipe may be registered without parameters, then edited,
271        ###       or a Pipe may be registered with parameters set in-memory first.
272        #  from meerschaum.config import get_config
273        _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
274        if _mrsm_instance is None:
275            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)
276
277        if not isinstance(_mrsm_instance, str):
278            self._instance_connector = _mrsm_instance
279            self.instance_keys = str(_mrsm_instance)
 280        else: ### NOTE: must be SQL or API Connector for this to work
281            self.instance_keys = _mrsm_instance
282
283        self._cache = cache and get_config('system', 'experimental', 'cache')
284
285
286    @property
287    def meta(self):
288        """
289        Return the four keys needed to reconstruct this pipe.
290        """
291        return {
292            'connector': self.connector_keys,
293            'metric': self.metric_key,
294            'location': self.location_key,
295            'instance': self.instance_keys,
296        }
297
298
299    def keys(self) -> Dict[str, str]:
300        """
301        Return the ordered keys for this pipe.
302        """
303        return {
304            key: val
305            for key, val in self.meta.items()
306            if key != 'instance'
307        }
308
309
310    @property
311    def instance_connector(self) -> Union[InstanceConnector, None]:
312        """
313        The connector to where this pipe resides.
314        May either be of type `meerschaum.connectors.sql.SQLConnector` or
315        `meerschaum.connectors.api.APIConnector`.
316        """
317        if '_instance_connector' not in self.__dict__:
318            from meerschaum.connectors.parse import parse_instance_keys
319            conn = parse_instance_keys(self.instance_keys)
320            if conn:
321                self._instance_connector = conn
322            else:
323                return None
324        return self._instance_connector
325
326    @property
327    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
328        """
329        The connector to the data source.
330        """
331        if '_connector' not in self.__dict__:
332            from meerschaum.connectors.parse import parse_instance_keys
333            import warnings
334            with warnings.catch_warnings():
335                warnings.simplefilter('ignore')
336                try:
337                    conn = parse_instance_keys(self.connector_keys)
338                except Exception as e:
339                    conn = None
340            if conn:
341                self._connector = conn
342            else:
343                return None
344        return self._connector
345
346
347    @property
348    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
349        """
350        If the pipe was created with `cache=True`, return the connector to the pipe's
351        SQLite database for caching.
352        """
353        if not self._cache:
354            return None
355
356        if '_cache_connector' not in self.__dict__:
357            from meerschaum.connectors import get_connector
358            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
359            _resources_path = SQLITE_RESOURCES_PATH
360            self._cache_connector = get_connector(
361                'sql', '_cache_' + str(self),
362                flavor='sqlite',
363                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
364            )
365
366        return self._cache_connector
367
368
369    @property
370    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
371        """
372        If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
373        manage the local data.
374        """
375        if self.cache_connector is None:
376            return None
377        if '_cache_pipe' not in self.__dict__:
378            from meerschaum.config._patch import apply_patch_to_config
379            from meerschaum.utils.sql import sql_item_name
380            _parameters = copy.deepcopy(self.parameters)
381            _fetch_patch = {
382                'fetch': ({
383                    'definition': (
384                        f"SELECT * FROM "
385                        + sql_item_name(
386                            str(self.target),
387                            self.instance_connector.flavor,
388                            self.instance_connector.get_pipe_schema(self),
389                        )
390                    ),
391                }) if self.instance_connector.type == 'sql' else ({
392                    'connector_keys': self.connector_keys,
393                    'metric_key': self.metric_key,
394                    'location_key': self.location_key,
395                })
396            }
397            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
398            self._cache_pipe = Pipe(
399                self.instance_keys,
400                (self.connector_keys + '_' + self.metric_key + '_cache'),
401                self.location_key,
402                mrsm_instance = self.cache_connector,
403                parameters = _parameters,
404                cache = False,
405                temporary = True,
406            )
407
408        return self._cache_pipe
409
410
411    def __str__(self, ansi: bool=False):
412        return pipe_repr(self, ansi=ansi)
413
414
415    def __eq__(self, other):
416        try:
417            return (
418                isinstance(self, type(other))
419                and self.connector_keys == other.connector_keys
420                and self.metric_key == other.metric_key
421                and self.location_key == other.location_key
422                and self.instance_keys == other.instance_keys
423            )
424        except Exception as e:
425            return False
426
427    def __hash__(self):
428        ### Using an esoteric separator to avoid collisions.
429        sep = "[\"']"
430        return hash(
431            str(self.connector_keys) + sep
432            + str(self.metric_key) + sep
433            + str(self.location_key) + sep
434            + str(self.instance_keys) + sep
435        )
436
437    def __repr__(self, **kw) -> str:
438        return pipe_repr(self, **kw)
439
440    def __getstate__(self) -> Dict[str, Any]:
441        """
442        Define the state dictionary (pickling).
443        """
444        return {
445            'connector': self.connector_keys,
446            'metric': self.metric_key,
447            'location': self.location_key,
448            'parameters': self.parameters,
449            'instance': self.instance_keys,
450        }
451
452    def __setstate__(self, _state: Dict[str, Any]):
453        """
454        Read the state (unpickling).
455        """
456        self.__init__(**_state)
457
458
459    def __getitem__(self, key: str) -> Any:
460        """
461        Index the pipe's attributes.
 462        If the `key` cannot be found, return `None`.
463        """
464        if key in self.attributes:
465            return self.attributes.get(key, None)
466
467        aliases = {
468            'connector': 'connector_keys',
469            'connector_key': 'connector_keys',
470            'metric': 'metric_key',
471            'location': 'location_key',
472        }
473        aliased_key = aliases.get(key, None)
474        if aliased_key is not None:
475            return self.attributes.get(aliased_key, None)
476
477        property_aliases = {
478            'instance': 'instance_keys',
479            'instance_key': 'instance_keys',
480        }
481        aliased_key = property_aliases.get(key, None)
482        if aliased_key is not None:
483            key = aliased_key
484        return getattr(self, key, None)

Access Meerschaum pipes via Pipe objects.

Pipes are identified by the following:

  1. Connector keys (e.g. 'sql:main')
  2. Metric key (e.g. 'weather')
  3. Location (optional; e.g. None)

A pipe's connector keys correspond to a data source, and when the pipe is synced, its fetch definition is evaluated and executed to produce new data.

Alternatively, new data may be directly synced via pipe.sync():

>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)
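Two small normalizations from the constructor can be sketched in isolation: string forms of a null location collapse to `None`, and keys may not begin with the negation prefix (the real value comes from `STATIC_CONFIG`; `'_'` below is an assumption):

```python
from typing import Optional

NEGATION_PREFIX = '_'  # assumed value; Meerschaum reads this from STATIC_CONFIG

def normalize_location(location: Optional[str]) -> Optional[str]:
    # The string forms '[None]' and 'None' are treated as a null location.
    return None if location in ('[None]', 'None') else location

def validate_keys(*keys: Optional[str]) -> None:
    # Keys and tags starting with the negation prefix are rejected.
    for key in keys:
        if str(key).startswith(NEGATION_PREFIX):
            raise ValueError(f"Keys cannot start with '{NEGATION_PREFIX}'.")

print(normalize_location('[None]'))  # None
validate_keys('sql:main', 'weather', normalize_location('None'))
```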
Pipe( connector: str = '', metric: str = '', location: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, columns: Union[Dict[str, str], List[str], NoneType] = None, tags: Optional[List[str]] = None, target: Optional[str] = None, dtypes: Optional[Dict[str, str]] = None, instance: Union[str, meerschaum.connectors.sql.SQLConnector.SQLConnector, meerschaum.connectors.api.APIConnector.APIConnector, NoneType] = None, temporary: bool = False, mrsm_instance: Union[str, meerschaum.connectors.sql.SQLConnector.SQLConnector, meerschaum.connectors.api.APIConnector.APIConnector, NoneType] = None, cache: bool = False, debug: bool = False, connector_keys: Optional[str] = None, metric_key: Optional[str] = None, location_key: Optional[str] = None)
Parameters
  • connector (str): Keys for the pipe's source connector, e.g. 'sql:main'.
  • metric (str): Label for the pipe's contents, e.g. 'weather'.
  • location (str, default None): Label for the pipe's location. Defaults to None.
  • parameters (Optional[Dict[str, Any]], default None): Optionally set a pipe's parameters from the constructor, e.g. columns and other attributes. You can edit these parameters with edit pipes.
  • columns (Optional[Dict[str, str]], default None): Set the columns dictionary of parameters. If parameters is also provided, this dictionary is added under the 'columns' key.
  • tags (Optional[List[str]], default None): A list of strings to be added under the 'tags' key of parameters. You can select pipes with certain tags using --tags.
  • dtypes (Optional[Dict[str, str]], default None): Set the dtypes dictionary of parameters. If parameters is also provided, this dictionary is added under the 'dtypes' key.
  • mrsm_instance (Optional[Union[str, InstanceConnector]], default None): Connector for the Meerschaum instance where the pipe resides. Defaults to the preconfigured default instance ('sql:main').
  • instance (Optional[Union[str, InstanceConnector]], default None): Alias for mrsm_instance. If mrsm_instance is supplied, this value is ignored.
  • temporary (bool, default False): If True, prevent instance tables (pipes, users, plugins) from being created.
  • cache (bool, default False): If True, cache fetched data into a local database file. Defaults to False.
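The `columns` argument accepts either a mapping or a plain list; the list form is normalized to an identity mapping, which can be sketched as:

```python
from typing import Dict, List, Optional, Union

def normalize_columns(
    columns: Union[Dict[str, str], List[str], None],
) -> Optional[Dict[str, str]]:
    # A list of column names becomes a {name: name} mapping,
    # mirroring the constructor's handling of `columns`.
    if isinstance(columns, list):
        return {str(col): str(col) for col in columns}
    return columns

print(normalize_columns(['datetime', 'id']))  # {'datetime': 'datetime', 'id': 'id'}
```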
connector_keys
connector_key
metric_key
location_key
temporary
meta

Return the four keys needed to reconstruct this pipe.

def keys(self) -> Dict[str, str]:
299    def keys(self) -> Dict[str, str]:
300        """
301        Return the ordered keys for this pipe.
302        """
303        return {
304            key: val
305            for key, val in self.meta.items()
306            if key != 'instance'
307        }

Return the ordered keys for this pipe.

instance_connector: Union[meerschaum.connectors.sql.SQLConnector.SQLConnector, meerschaum.connectors.api.APIConnector.APIConnector, NoneType]

The connector to where this pipe resides. May either be of type meerschaum.connectors.sql.SQLConnector or meerschaum.connectors.api.APIConnector.

connector: Union[meerschaum.connectors.Connector, NoneType]

The connector to the data source.

cache_connector: Optional[meerschaum.connectors.sql.SQLConnector.SQLConnector]

If the pipe was created with cache=True, return the connector to the pipe's SQLite database for caching.

cache_pipe: Optional[Pipe]

If the pipe was created with cache=True, return another meerschaum.Pipe used to manage the local data.

def fetch( self, begin: Union[datetime.datetime, str, NoneType] = '', end: Optional[datetime.datetime] = None, check_existing: bool = True, sync_chunks: bool = False, debug: bool = False, **kw: Any) -> "Union['pd.DataFrame', Iterator['pd.DataFrame'], None]":
17def fetch(
18        self,
19        begin: Union[datetime, str, None] = '',
20        end: Optional[datetime] = None,
21        check_existing: bool = True,
22        sync_chunks: bool = False,
23        debug: bool = False,
24        **kw: Any
25    ) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]:
26    """
27    Fetch a Pipe's latest data from its connector.
28
29    Parameters
30    ----------
 31    begin: Union[datetime, str, None], default ''
32        If provided, only fetch data newer than or equal to `begin`.
33
 34    end: Optional[datetime], default None
35        If provided, only fetch data older than or equal to `end`.
36
37    check_existing: bool, default True
38        If `False`, do not apply the backtrack interval.
39
40    sync_chunks: bool, default False
 41    sync_chunks: bool, default False
 42        If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks
 43        while the fetch loads them into memory.
43
44    debug: bool, default False
45        Verbosity toggle.
46
47    Returns
48    -------
49    A `pd.DataFrame` of the newest unseen data.
50
51    """
52    if 'fetch' not in dir(self.connector):
53        warn(f"No `fetch()` function defined for connector '{self.connector}'")
54        return None
55
56    from meerschaum.connectors import custom_types, get_connector_plugin
57    from meerschaum.utils.debug import dprint, _checkpoint
58
59    _chunk_hook = kw.pop('chunk_hook', None)
60    kw['workers'] = self.get_num_workers(kw.get('workers', None))
61    if sync_chunks and _chunk_hook is None:
62
63        def _chunk_hook(chunk, **_kw) -> SuccessTuple:
64            """
65            Wrap `Pipe.sync()` with a custom chunk label prepended to the message.
66            """
67            from meerschaum.config._patch import apply_patch_to_config
68            kwargs = apply_patch_to_config(kw, _kw)
69            chunk_success, chunk_message = self.sync(chunk, **kwargs)
70            chunk_label = self._get_chunk_label(chunk, self.columns.get('datetime', None))
71            if chunk_label:
72                chunk_message = '\n' + chunk_label + '\n' + chunk_message
73            return chunk_success, chunk_message
74
75
76    with mrsm.Venv(get_connector_plugin(self.connector)):
77        df = self.connector.fetch(
78            self,
79            begin = _determine_begin(
80                self,
81                begin,
82                check_existing = check_existing,
83                debug = debug,
84            ),
85            end = end,
86            chunk_hook = _chunk_hook,
87            debug = debug,
88            **kw
89        )
90    return df

Fetch a Pipe's latest data from its connector.

Parameters
  • begin (Union[datetime, str, None], default ''): If provided, only fetch data newer than or equal to begin.
  • end (Optional[datetime], default None): If provided, only fetch data older than or equal to end.
  • check_existing (bool, default True): If False, do not apply the backtrack interval.
  • sync_chunks (bool, default False): If True and the pipe's connector is of type 'sql', begin syncing chunks while the fetch loads them into memory.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pd.DataFrame of the newest unseen data.
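The `begin` bound interacts with the backtrack interval roughly as follows. This is a simplified stand-in for the `_determine_begin` helper referenced in the source; the real logic also consults the pipe's latest sync time on its instance:

```python
from datetime import datetime, timedelta
from typing import Optional, Union

def determine_begin(
    sync_time: Optional[datetime],
    begin: Union[datetime, str, None] = '',
    backtrack_interval: timedelta = timedelta(minutes=1440),
) -> Optional[datetime]:
    # An explicit `begin` (including None) wins; the empty-string default means
    # "walk back from the latest sync time by the backtrack interval."
    if begin != '':
        return begin
    if sync_time is None:
        return None
    return sync_time - backtrack_interval

last_sync = datetime(2024, 1, 2, 0, 0)
print(determine_begin(last_sync))  # 2024-01-01 00:00:00
```

The overlap from walking back lets `filter_existing` deduplicate rows near the boundary instead of missing late-arriving data.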
def get_backtrack_interval( self, check_existing: bool = True, debug: bool = False) -> Union[datetime.timedelta, int]:
 93def get_backtrack_interval(
 94        self,
 95        check_existing: bool = True,
 96        debug: bool = False,
 97    ) -> Union[timedelta, int]:
 98    """
 99    Get the backtrack interval to use for this pipe.
100
101    Parameters
102    ----------
103    check_existing: bool, default True
104        If `False`, return a backtrack_interval of 0 minutes.
105
106    Returns
107    -------
108    The backtrack interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
109    """
110    default_backtrack_minutes = get_config('pipes', 'parameters', 'fetch', 'backtrack_minutes')
111    configured_backtrack_minutes = self.parameters.get('fetch', {}).get('backtrack_minutes', None)
112    backtrack_minutes = (
113        configured_backtrack_minutes
114        if configured_backtrack_minutes is not None
115        else default_backtrack_minutes
116    ) if check_existing else 0
117
118    backtrack_interval = timedelta(minutes=backtrack_minutes)
119    dt_col = self.columns.get('datetime', None)
120    if dt_col is None:
121        return backtrack_interval
122
123    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns]')
124    if 'int' in dt_dtype.lower():
125        return backtrack_minutes
126
127    return backtrack_interval

Get the backtrack interval to use for this pipe.

Parameters
  • check_existing (bool, default True): If False, return a backtrack_interval of 0 minutes.
Returns
  • The backtrack interval (timedelta or int) to use with this pipe's datetime axis.
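Putting those rules together, the selection logic can be sketched as follows (the defaults here are illustrative, not Meerschaum's configured values):

```python
from datetime import timedelta
from typing import Optional, Union

def backtrack_interval(
    configured_minutes: Optional[int],
    default_minutes: int = 1440,
    check_existing: bool = True,
    dt_dtype: str = 'datetime64[ns]',
) -> Union[timedelta, int]:
    # Mirrors get_backtrack_interval: a configured value overrides the default,
    # check_existing=False collapses the interval to zero, and an integer
    # datetime axis yields plain minutes instead of a timedelta.
    minutes = (
        configured_minutes if configured_minutes is not None else default_minutes
    ) if check_existing else 0
    if 'int' in dt_dtype.lower():
        return minutes
    return timedelta(minutes=minutes)

print(backtrack_interval(None, default_minutes=60))  # 1:00:00
```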
def get_data( self, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, as_iterator: bool = False, as_chunks: bool = False, as_dask: bool = False, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> "Union['pd.DataFrame', Generator['pd.DataFrame'], None]":
 15def get_data(
 16        self,
 17        select_columns: Optional[List[str]] = None,
 18        omit_columns: Optional[List[str]] = None,
 19        begin: Union[datetime, int, None] = None,
 20        end: Union[datetime, int, None] = None,
 21        params: Optional[Dict[str, Any]] = None,
 22        as_iterator: bool = False,
 23        as_chunks: bool = False,
 24        as_dask: bool = False,
 25        chunk_interval: Union[timedelta, int, None] = None,
 26        fresh: bool = False,
 27        debug: bool = False,
 28        **kw: Any
 29    ) -> Union['pd.DataFrame', Generator['pd.DataFrame'], None]:
 30    """
 31    Get a pipe's data from the instance connector.
 32
 33    Parameters
 34    ----------
 35    select_columns: Optional[List[str]], default None
 36        If provided, only select these given columns.
 37        Otherwise select all available columns (i.e. `SELECT *`).
 38
 39    omit_columns: Optional[List[str]], default None
 40        If provided, remove these columns from the selection.
 41
 42    begin: Union[datetime, int, None], default None
 43        Lower bound datetime to begin searching for data (inclusive).
 44        Translates to a `WHERE` clause like `WHERE datetime >= begin`.
 45        Defaults to `None`.
 46
 47    end: Union[datetime, int, None], default None
 48        Upper bound datetime to stop searching for data (inclusive).
 49        Translates to a `WHERE` clause like `WHERE datetime < end`.
 50        Defaults to `None`.
 51
 52    params: Optional[Dict[str, Any]], default None
 53        Filter the retrieved data by a dictionary of parameters.
 54        See `meerschaum.utils.sql.build_where` for more details. 
 55
 56    as_iterator: bool, default False
 57        If `True`, return a generator of chunks of pipe data.
 58
 59    as_chunks: bool, default False
 60        Alias for `as_iterator`.
 61
 62    as_dask: bool, default False
 63        If `True`, return a `dask.DataFrame`
 64        (which may be loaded into a Pandas DataFrame with `df.compute()`).
 65
 66    chunk_interval: Union[timedelta, int, None], default None
 67        If `as_iterator`, then return chunks with `begin` and `end` separated by this interval.
 68        This may be set under `pipe.parameters['verify']['chunk_minutes']`.
 69        By default, use a timedelta of 1440 minutes (1 day).
 70        If `chunk_interval` is an integer and the `datetime` axis a timestamp,
 71        then use a timedelta with the number of minutes configured to this value.
 72        If the `datetime` axis is an integer, default to the configured chunksize.
 73        If `chunk_interval` is a `timedelta` and the `datetime` axis an integer,
 74        use the number of minutes in the `timedelta`.
 75
 76    fresh: bool, default False
 77        If `True`, skip local cache and directly query the instance connector.
 78        Defaults to `False`.
 79
 80    debug: bool, default False
 81        Verbosity toggle.
 82        Defaults to `False`.
 83
 84    Returns
 85    -------
 86    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.
 87
 88    """
 89    from meerschaum.utils.warnings import warn
 90    from meerschaum.utils.venv import Venv
 91    from meerschaum.connectors import get_connector_plugin
 92    from meerschaum.utils.misc import iterate_chunks, items_str
 93    from meerschaum.utils.dtypes import to_pandas_dtype
 94    from meerschaum.utils.dataframe import add_missing_cols_to_df
 95    from meerschaum.utils.packages import attempt_import
 96    dd = attempt_import('dask.dataframe') if as_dask else None
 97    dask = attempt_import('dask') if as_dask else None
 98
 99    if select_columns == '*':
100        select_columns = None
101    elif isinstance(select_columns, str):
102        select_columns = [select_columns]
103
104    if isinstance(omit_columns, str):
105        omit_columns = [omit_columns]
106
107    as_iterator = as_iterator or as_chunks
108
109    if as_iterator or as_chunks:
110        return self._get_data_as_iterator(
111            select_columns = select_columns,
112            omit_columns = omit_columns,
113            begin = begin,
114            end = end,
115            params = params,
116            chunk_interval = chunk_interval,
117            fresh = fresh,
118            debug = debug,
119        )
120
121    if as_dask:
122        from multiprocessing.pool import ThreadPool
123        dask_pool = ThreadPool(self.get_num_workers())
124        dask.config.set(pool=dask_pool)
125        chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
126        bounds = self.get_chunk_bounds(
127            begin = begin,
128            end = end,
129            bounded = False,
130            chunk_interval = chunk_interval,
131            debug = debug,
132        )
133        dask_chunks = [
134            dask.delayed(self.get_data)(
135                select_columns = select_columns,
136                omit_columns = omit_columns,
137                begin = chunk_begin,
138                end = chunk_end,
139                params = params,
140                chunk_interval = chunk_interval,
141                fresh = fresh,
142                debug = debug,
143            )
144            for (chunk_begin, chunk_end) in bounds
145        ]
146        dask_meta = {
147            col: to_pandas_dtype(typ)
148            for col, typ in self.dtypes.items()
149        }
150        return dd.from_delayed(dask_chunks, meta=dask_meta)
151
152    if not self.exists(debug=debug):
153        return None
154       
155    if self.cache_pipe is not None:
156        if not fresh:
157            _sync_cache_tuple = self.cache_pipe.sync(
158                begin = begin,
159                end = end,
160                params = params,
161                debug = debug,
162                **kw
163            )
164            if not _sync_cache_tuple[0]:
165                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
166                fresh = True
167            else: ### Successfully synced cache.
168                return self.enforce_dtypes(
169                    self.cache_pipe.get_data(
170                        select_columns = select_columns,
171                        omit_columns = omit_columns,
172                        begin = begin,
173                        end = end,
174                        params = params,
175                        debug = debug,
176                        fresh = True,
177                        **kw
178                    ),
179                    debug = debug,
180                )
181
182    with Venv(get_connector_plugin(self.instance_connector)):
183        df = self.instance_connector.get_pipe_data(
184            pipe = self,
185            select_columns = select_columns,
186            omit_columns = omit_columns,
187            begin = begin,
188            end = end,
189            params = params,
190            debug = debug,
191            **kw
192        )
193        if df is None:
194            return df
195
196        if not select_columns:
197            select_columns = [col for col in df.columns]
198
199        cols_to_omit = [
200            col
201            for col in df.columns
202            if (
203                col in (omit_columns or [])
204                or
205                col not in (select_columns or [])
206            )
207        ]
208        cols_to_add = [
209            col
210            for col in select_columns
211            if col not in df.columns
212        ]
213        if cols_to_omit:
214            warn(
215                (
216                    f"Received {len(cols_to_omit)} omitted column"
217                    + ('s' if len(cols_to_omit) != 1 else '')
218                    + f" for {self}. "
219                    + "Consider adding `select_columns` and `omit_columns` support to "
220                    + f"'{self.instance_connector.type}' connectors to improve performance."
221                ),
222                stack = False,
223            )
224            _cols_to_select = [col for col in df.columns if col not in cols_to_omit]
225            df = df[_cols_to_select]
226
227        if cols_to_add:
228            warn(
229                (
230                    f"Specified columns {items_str(cols_to_add)} were not found on {self}. "
231                    + "Adding these to the DataFrame as null columns."
232                ),
233                stack = False,
234            )
235            df = add_missing_cols_to_df(df, {col: 'string' for col in cols_to_add})
236
237        return self.enforce_dtypes(df, debug=debug)

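The post-query reconciliation of `select_columns` and `omit_columns` in the body above (omission wins, and selected columns missing from the result are added back as null columns) can be sketched without a live connector. `filter_columns` is a hypothetical helper name; the logic follows the source, operating on plain column lists rather than a DataFrame.

```python
from typing import List, Optional, Tuple

def filter_columns(
    df_columns: List[str],
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
) -> Tuple[List[str], List[str]]:
    """
    Sketch of the reconciliation above: return the columns to keep
    and the selected columns missing from the result (added as nulls).
    """
    if not select_columns:
        select_columns = list(df_columns)
    ### A column is dropped if it's explicitly omitted or wasn't selected.
    cols_to_omit = [
        col for col in df_columns
        if col in (omit_columns or []) or col not in select_columns
    ]
    cols_to_keep = [col for col in df_columns if col not in cols_to_omit]
    cols_to_add = [col for col in select_columns if col not in df_columns]
    return cols_to_keep, cols_to_add

print(filter_columns(['dt', 'id', 'val'], select_columns=['dt', 'val', 'score']))
```

This mirrors why the source warns when omission happens client-side: connectors that support `select_columns` / `omit_columns` natively avoid transferring the dropped columns at all.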
def get_backtrack_data( self, backtrack_minutes: Optional[int] = None, begin: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> "Optional['pd.DataFrame']":
332def get_backtrack_data(
333        self,
334        backtrack_minutes: Optional[int] = None,
335        begin: Union[datetime, int, None] = None,
336        params: Optional[Dict[str, Any]] = None,
337        fresh: bool = False,
338        debug: bool = False,
339        **kw: Any
340    ) -> Optional['pd.DataFrame']:
341    """
342    Get the most recent data from the instance connector as a Pandas DataFrame.
343
344    Parameters
345    ----------
346    backtrack_minutes: Optional[int], default None
347        How many minutes from `begin` to select from.
348        If `None`, use `pipe.parameters['fetch']['backtrack_minutes']`.
349
350    begin: Optional[datetime], default None
351        The starting point to search for data.
352        If begin is `None` (default), use the most recent observed datetime
353        (AKA sync_time).
354
355        ```
356        E.g. begin = 02:00
357
358        Search this region.           Ignore this, even if there's data.
359        /  /  /  /  /  /  /  /  /  |
360        -----|----------|----------|----------|----------|----------|
361        00:00      01:00      02:00      03:00      04:00      05:00
362
363        ```
364
365    params: Optional[Dict[str, Any]], default None
366        The standard Meerschaum `params` query dictionary.
367        
368        
369    fresh: bool, default False
370        If `True`, ignore local cache and pull directly from the instance connector.
371        Only comes into effect if a pipe was created with `cache=True`.
372
373    debug: bool, default False
374        Verbosity toggle.
375
376    Returns
377    -------
378    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. Backtrack data
379    is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
380    """
381    from meerschaum.utils.warnings import warn
382    from meerschaum.utils.venv import Venv
383    from meerschaum.connectors import get_connector_plugin
384
385    if not self.exists(debug=debug):
386        return None
387
388    backtrack_interval = self.get_backtrack_interval(debug=debug)
389    if backtrack_minutes is None:
390        backtrack_minutes = (
391            (backtrack_interval.total_seconds() / 60)
392            if isinstance(backtrack_interval, timedelta)
393            else backtrack_interval
394        )
395
396    if self.cache_pipe is not None:
397        if not fresh:
398            _sync_cache_tuple = self.cache_pipe.sync(begin=begin, params=params, debug=debug, **kw)
399            if not _sync_cache_tuple[0]:
400                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
401                fresh = True
402            else: ### Successfully synced cache.
403                return self.enforce_dtypes(
404                    self.cache_pipe.get_backtrack_data(
405                        fresh = True,
406                        begin = begin,
407                        backtrack_minutes = backtrack_minutes,
408                        params = params,
409                        debug = debug,
410                        **kw
411                    ),
412                    debug = debug,
413                )
414
415    if hasattr(self.instance_connector, 'get_backtrack_data'):
416        with Venv(get_connector_plugin(self.instance_connector)):
417            return self.enforce_dtypes(
418                self.instance_connector.get_backtrack_data(
419                    pipe = self,
420                    begin = begin,
421                    backtrack_minutes = backtrack_minutes,
422                    params = params,
423                    debug = debug,
424                    **kw
425                ),
426                debug = debug,
427            )
428
429    if begin is None:
430        begin = self.get_sync_time(params=params, debug=debug)
431
432    backtrack_interval = (
433        timedelta(minutes=backtrack_minutes)
434        if isinstance(begin, datetime)
435        else backtrack_minutes
436    )
437    if begin is not None:
438        begin = begin - backtrack_interval
439
440    return self.get_data(
441        begin = begin,
442        params = params,
443        debug = debug,
444        **kw
445    )

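The fallback at the end of the body computes `begin` by stepping back from the most recent datetime. That step can be sketched on its own; `backtrack_begin` is a hypothetical name, and `sync_time` here stands in for the value of `pipe.get_sync_time()`.

```python
from datetime import datetime, timedelta
from typing import Union

def backtrack_begin(
    sync_time: Union[datetime, int, None],
    backtrack_minutes: int,
) -> Union[datetime, int, None]:
    """Sketch: step `backtrack_minutes` back from the latest observed datetime."""
    if sync_time is None:
        return None
    ### Integer axes subtract a plain int; timestamps subtract a timedelta.
    interval = (
        timedelta(minutes=backtrack_minutes)
        if isinstance(sync_time, datetime)
        else backtrack_minutes
    )
    return sync_time - interval

print(backtrack_begin(datetime(2023, 1, 1, 2, 0), 60))
print(backtrack_begin(100, 10))
```

Matching the diagram in the docstring, a sync time of 02:00 with 60 backtrack minutes yields a `begin` of 01:00, so only the most recent window is re-fetched.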
def get_rowcount( self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) -> int:
448def get_rowcount(
449        self,
450        begin: Optional[datetime] = None,
451        end: Optional['datetime'] = None,
452        params: Optional[Dict[str, Any]] = None,
453        remote: bool = False,
454        debug: bool = False
455    ) -> int:
456    """
457    Get a Pipe's instance or remote rowcount.
458
459    Parameters
460    ----------
461    begin: Optional[datetime], default None
462        Count rows where datetime > begin.
463
464    end: Optional[datetime], default None
465        Count rows where datetime < end.
466
467    remote: bool, default False
468        Count rows from a pipe's remote source.
469        **NOTE**: This is experimental!
470
471    debug: bool, default False
472        Verbosity toggle.
473
474    Returns
475    -------
476    An `int` of the number of rows in the pipe corresponding to the provided parameters.
477    Returns 0 if the pipe does not exist.
478    """
479    from meerschaum.utils.warnings import warn
480    from meerschaum.utils.venv import Venv
481    from meerschaum.connectors import get_connector_plugin
482
483    connector = self.instance_connector if not remote else self.connector
484    try:
485        with Venv(get_connector_plugin(connector)):
486            rowcount = connector.get_pipe_rowcount(
487                self,
488                begin = begin,
489                end = end,
490                params = params,
491                remote = remote,
492                debug = debug,
493            )
494            if rowcount is None:
495                return 0
496            return rowcount
497    except AttributeError as e:
498        warn(e)
499        if remote:
500            return 0
501    warn(f"Failed to get a rowcount for {self}.")
502    return 0

def get_chunk_interval( self, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, debug: bool = False) -> Union[datetime.timedelta, int]:
505def get_chunk_interval(
506        self,
507        chunk_interval: Union[timedelta, int, None] = None,
508        debug: bool = False,
509    ) -> Union[timedelta, int]:
510    """
511    Get the chunk interval to use for this pipe.
512
513    Parameters
514    ----------
515    chunk_interval: Union[timedelta, int, None], default None
516        If provided, coerce this value into the correct type.
517        For example, if the datetime axis is an integer, then
518        return the number of minutes.
519
520    Returns
521    -------
522    The chunk interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
523    """
524    default_chunk_minutes = get_config('pipes', 'parameters', 'verify', 'chunk_minutes')
525    configured_chunk_minutes = self.parameters.get('verify', {}).get('chunk_minutes', None)
526    chunk_minutes = (
527        (configured_chunk_minutes or default_chunk_minutes)
528        if chunk_interval is None
529        else (
530            chunk_interval
531            if isinstance(chunk_interval, int)
532            else int(chunk_interval.total_seconds() / 60)
533        )
534    )
535
536    dt_col = self.columns.get('datetime', None)
537    if dt_col is None:
538        return timedelta(minutes=chunk_minutes)
539
540    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns]')
541    if 'int' in dt_dtype.lower():
542        return chunk_minutes
543    return timedelta(minutes=chunk_minutes)

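The coercion rules above can be run standalone. This is a sketch under stated assumptions: `coerce_chunk_interval` is a hypothetical helper, the dtype string stands in for the `datetime` axis dtype, and 1440 minutes is the documented default.

```python
from datetime import timedelta
from typing import Union

def coerce_chunk_interval(
    chunk_interval: Union[timedelta, int, None],
    dt_dtype: str,
    default_minutes: int = 1440,
) -> Union[timedelta, int]:
    """Sketch of the coercion above: ints for integer axes, timedeltas otherwise."""
    if chunk_interval is None:
        minutes = default_minutes
    elif isinstance(chunk_interval, int):
        minutes = chunk_interval
    else:
        minutes = int(chunk_interval.total_seconds() / 60)
    if 'int' in dt_dtype.lower():
        return minutes
    return timedelta(minutes=minutes)

print(coerce_chunk_interval(timedelta(hours=2), 'int64'))
print(coerce_chunk_interval(90, 'datetime64[ns]'))
```

Either representation round-trips: a `timedelta` passed against an integer axis becomes its minute count, and an `int` against a timestamp axis becomes a `timedelta` of that many minutes.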
def get_chunk_bounds( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, bounded: bool = False, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, debug: bool = False) -> List[Tuple[Union[datetime.datetime, int, NoneType], Union[datetime.datetime, int, NoneType]]]:
546def get_chunk_bounds(
547        self,
548        begin: Union[datetime, int, None] = None,
549        end: Union[datetime, int, None] = None,
550        bounded: bool = False,
551        chunk_interval: Union[timedelta, int, None] = None,
552        debug: bool = False,
553    ) -> List[
554        Tuple[
555            Union[datetime, int, None],
556            Union[datetime, int, None],
557        ]
558    ]:
559    """
560    Return a list of datetime bounds for iterating over the pipe's `datetime` axis.
561
562    Parameters
563    ----------
564    begin: Union[datetime, int, None], default None
565        If provided, do not select less than this value.
566        Otherwise the first chunk will be unbounded.
567
568    end: Union[datetime, int, None], default None
569        If provided, do not select greater than or equal to this value.
570        Otherwise the last chunk will be unbounded.
571
572    bounded: bool, default False
573        If `True`, do not include `None` in the first chunk.
574
575    chunk_interval: Union[timedelta, int, None], default None
576        If provided, use this interval for the size of chunk boundaries.
577        The default value for this pipe may be set
578        under `pipe.parameters['verify']['chunk_minutes']`.
579
580    debug: bool, default False
581        Verbosity toggle.
582
583    Returns
584    -------
585    A list of chunk bounds (datetimes or integers).
586    If unbounded, the first and last chunks will include `None`.
587    """
588    include_less_than_begin = not bounded and begin is None
589    include_greater_than_end = not bounded and end is None
590    if begin is None:
591        begin = self.get_sync_time(newest=False, debug=debug)
592    if end is None:
593        end = self.get_sync_time(newest=True, debug=debug)
594    if begin is None and end is None:
595        return [(None, None)]
596
597    ### Set the chunk interval under `pipe.parameters['verify']['chunk_minutes']`.
598    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
599    
600    ### Build a list of tuples containing the chunk boundaries
601    ### so that we can sync multiple chunks in parallel.
602    ### Run `verify pipes --workers 1` to sync chunks in series.
603    chunk_bounds = []
604    begin_cursor = begin
605    while begin_cursor < end:
606        end_cursor = begin_cursor + chunk_interval
607        chunk_bounds.append((begin_cursor, end_cursor))
608        begin_cursor = end_cursor
609
610    ### The chunk interval might be too large.
611    if not chunk_bounds and end >= begin:
612        chunk_bounds = [(begin, end)]
613
614    ### Truncate the last chunk to the end timestamp.
615    if chunk_bounds[-1][1] > end:
616        chunk_bounds[-1] = (chunk_bounds[-1][0], end)
617
618    ### Pop the last chunk if its bounds are equal.
619    if chunk_bounds[-1][0] == chunk_bounds[-1][1]:
620        chunk_bounds = chunk_bounds[:-1]
621
622    if include_less_than_begin:
623        chunk_bounds = [(None, begin)] + chunk_bounds
624    if include_greater_than_end:
625        chunk_bounds = chunk_bounds + [(end, None)]
626
627    return chunk_bounds

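A simplified version of the boundary-building loop above can be run with plain datetimes. This sketch keeps the source's truncation of the final chunk and its unbounded `(None, begin)` / `(end, None)` edges, but omits the sync-time lookups and the equal-bounds pop.

```python
from datetime import datetime, timedelta

def chunk_bounds(begin, end, interval, bounded=False):
    """Sketch of the loop above: half-open [begin, end) chunks of `interval`."""
    bounds = []
    cursor = begin
    while cursor < end:
        bounds.append((cursor, cursor + interval))
        cursor = cursor + interval
    ### Truncate the final chunk to the `end` timestamp.
    if bounds and bounds[-1][1] > end:
        bounds[-1] = (bounds[-1][0], end)
    ### Unbounded mode adds open-ended chunks on both edges.
    if not bounded:
        bounds = [(None, begin)] + bounds + [(end, None)]
    return bounds

for b_begin, b_end in chunk_bounds(
    datetime(2023, 1, 1), datetime(2023, 1, 3, 12), timedelta(days=1), bounded=True
):
    print(b_begin, b_end)
```

Because the chunks are disjoint, each `(begin, end)` pair can be handed to a separate worker, which is exactly how the dask branch of `get_data` parallelizes fetching.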
def register(self, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
12def register(
13        self,
14        debug: bool = False,
15        **kw: Any
16    ) -> SuccessTuple:
17    """
18    Register a new Pipe along with its attributes.
19
20    Parameters
21    ----------
22    debug: bool, default False
23        Verbosity toggle.
24
25    kw: Any
26        Keyword arguments to pass to `instance_connector.register_pipe()`.
27
28    Returns
29    -------
30    A `SuccessTuple` of success, message.
31    """
32    if self.temporary:
33        return False, "Cannot register pipes created with `temporary=True` (read-only)."
34
35    from meerschaum.utils.formatting import get_console
36    from meerschaum.utils.venv import Venv
37    from meerschaum.connectors import get_connector_plugin, custom_types
38    from meerschaum.config._patch import apply_patch_to_config
39
40    import warnings
41    with warnings.catch_warnings():
42        warnings.simplefilter('ignore')
43        try:
44            _conn = self.connector
45        except Exception as e:
46            _conn = None
47
48    if (
49        _conn is not None
50        and
51        (_conn.type == 'plugin' or _conn.type in custom_types)
52        and
53        getattr(_conn, 'register', None) is not None
54    ):
55        try:
56            with Venv(get_connector_plugin(_conn), debug=debug):
57                params = self.connector.register(self)
58        except Exception as e:
59            get_console().print_exception()
60            params = None
61        params = {} if params is None else params
62        if not isinstance(params, dict):
63            from meerschaum.utils.warnings import warn
64            warn(
65                f"Invalid parameters returned from `register()` in connector {self.connector}:\n"
66                + f"{params}"
67            )
68        else:
69            self.parameters = apply_patch_to_config(params, self.parameters)
70
71    if not self.parameters:
72        cols = self.columns if self.columns else {'datetime': None, 'id': None}
73        self.parameters = {
74            'columns': cols,
75        }
76
77    with Venv(get_connector_plugin(self.instance_connector)):
78        return self.instance_connector.register_pipe(self, debug=debug, **kw)

attributes: Dict[str, Any]

Return a dictionary of a pipe's keys and parameters. These values are reflected directly from the pipes table of the instance.

parameters: Optional[Dict[str, Any]]

Return the parameters dictionary of the pipe.

columns: Optional[Dict[str, str]]

Return the columns dictionary defined in meerschaum.Pipe.parameters.

dtypes: Optional[Dict[str, Any]]

If defined, return the dtypes dictionary defined in meerschaum.Pipe.parameters.

def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
139def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
140    """
141    Check if the requested columns are defined.
142
143    Parameters
144    ----------
145    *args: str
146        The column names to be retrieved.
147        
148    error: bool, default False
149        If `True`, raise an `Exception` if the specified column is not defined.
150
151    Returns
152    -------
153    A tuple of the same size of `args` or a `str` if `args` is a single argument.
154
155    Examples
156    --------
157    >>> pipe = mrsm.Pipe('test', 'test')
158    >>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
159    >>> pipe.get_columns('datetime', 'id')
160    ('dt', 'id')
161    >>> pipe.get_columns('value', error=True)
162    Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
163    """
164    from meerschaum.utils.warnings import error as _error, warn
165    if not args:
166        args = tuple(self.columns.keys())
167    col_names = []
168    for col in args:
169        col_name = None
170        try:
171            col_name = self.columns[col]
172            if col_name is None and error:
173                _error(f"Please define the name of the '{col}' column for {self}.")
174        except Exception as e:
175            col_name = None
176        if col_name is None and error:
177            _error(f"Missing '{col}'" + f" column for {self}.")
178        col_names.append(col_name)
179    if len(col_names) == 1:
180        return col_names[0]
181    return tuple(col_names)

def get_columns_types(self, debug: bool = False) -> Optional[Dict[str, str]]:
184def get_columns_types(self, debug: bool = False) -> Union[Dict[str, str], None]:
185    """
186    Get a dictionary of a pipe's column names and their types.
187
188    Parameters
189    ----------
190    debug: bool, default False
191        Verbosity toggle.
192
193    Returns
194    -------
195    A dictionary of column names (`str`) to column types (`str`).
196
197    Examples
198    --------
199    >>> pipe.get_columns_types()
200    {
201      'dt': 'TIMESTAMP WITHOUT TIMEZONE',
202      'id': 'BIGINT',
203      'val': 'DOUBLE PRECISION',
204    }
205    >>>
206    """
207    from meerschaum.utils.venv import Venv
208    from meerschaum.connectors import get_connector_plugin
209
210    with Venv(get_connector_plugin(self.instance_connector)):
211        return self.instance_connector.get_pipe_columns_types(self, debug=debug)

def get_indices(self) -> Dict[str, str]:
436def get_indices(self) -> Dict[str, str]:
437    """
438    Return a dictionary in the form of `pipe.columns` but map to index names.
439    """
440    return {
441        ix: (self.target + '_' + col + '_index')
442        for ix, col in self.columns.items() if col
443    }


tags: Optional[List[str]]

If defined, return the tags list defined in meerschaum.Pipe.parameters.

def get_id(self, **kw: Any) -> Optional[int]:
214def get_id(self, **kw: Any) -> Union[int, None]:
215    """
216    Fetch a pipe's ID from its instance connector.
217    If the pipe does not exist, return `None`.
218    """
219    if self.temporary:
220        return None
221    from meerschaum.utils.venv import Venv
222    from meerschaum.connectors import get_connector_plugin
223
224    with Venv(get_connector_plugin(self.instance_connector)):
225        return self.instance_connector.get_pipe_id(self, **kw)

Fetch a pipe's ID from its instance connector. If the pipe does not exist, return None.

id: Optional[int]

Fetch and cache a pipe's ID.

def get_val_column(self, debug: bool = False) -> Optional[str]:
238def get_val_column(self, debug: bool = False) -> Union[str, None]:
239    """
240    Return the name of the value column if it's defined, otherwise make an educated guess.
241    If not set in the `columns` dictionary, return the first numeric column that is not
242    an ID or datetime column.
243    If none can be found, return `None`.
244
245    Parameters
246    ----------
247    debug: bool, default False
248        Verbosity toggle.
249
250    Returns
251    -------
252    Either a string or `None`.
253    """
254    from meerschaum.utils.debug import dprint
255    if debug:
256        dprint('Attempting to determine the value column...')
257    try:
258        val_name = self.get_columns('value')
259    except Exception as e:
260        val_name = None
261    if val_name is not None:
262        if debug:
263            dprint(f"Value column: {val_name}")
264        return val_name
265
266    cols = self.columns
267    if cols is None:
268        if debug:
269            dprint('No columns could be determined. Returning...')
270        return None
271    try:
272        dt_name = self.get_columns('datetime', error=False)
273    except Exception as e:
274        dt_name = None
275    try:
276        id_name = self.get_columns('id', error=False)
277    except Exception as e:
278        id_name = None
279
280    if debug:
281        dprint(f"dt_name: {dt_name}")
282        dprint(f"id_name: {id_name}")
283
284    cols_types = self.get_columns_types(debug=debug)
285    if cols_types is None:
286        return None
287    if debug:
288        dprint(f"cols_types: {cols_types}")
289    if dt_name is not None:
290        cols_types.pop(dt_name, None)
291    if id_name is not None:
292        cols_types.pop(id_name, None)
293
294    candidates = []
295    candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric',}
296    for search_term in candidate_keywords:
297        for col, typ in cols_types.items():
298            if search_term in typ.lower():
299                candidates.append(col)
300                break
301    if not candidates:
302        if debug:
303            dprint("No value column could be determined.")
304        return None
305
306    return candidates[0]

Return the name of the value column if it's defined, otherwise make an educated guess. If not set in the columns dictionary, return the first numeric column that is not an ID or datetime column. If none can be found, return None.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • Either a string or None.
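The guessing heuristic can be sketched in isolation; the column names and database types below are hypothetical, and the keyword scan is a simplified version of the method's loop:

```python
# Sketch of the value-column heuristic: drop the datetime and ID columns,
# then take the first remaining column whose type looks numeric.
cols_types = {
    'dt': 'TIMESTAMP WITHOUT TIME ZONE',
    'id': 'BIGINT',
    'val': 'DOUBLE PRECISION',
}
dt_name, id_name = 'dt', 'id'
cols_types.pop(dt_name, None)
cols_types.pop(id_name, None)

candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric'}
candidates = [
    col for col, typ in cols_types.items()
    if any(kw in typ.lower() for kw in candidate_keywords)
]
val_column = candidates[0] if candidates else None
print(val_column)  # val
```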
parents: List[Pipe]

Return a list of meerschaum.Pipe objects to be designated as parents.

children: List[Pipe]

Return a list of meerschaum.Pipe objects to be designated as children.

target: str

The target table name. You can set the target name under one of the following keys (checked in this order):

  • target
  • target_name
  • target_table
  • target_table_name
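A minimal sketch of that lookup order, using a hypothetical parameters dict:

```python
# Resolve the target table name by checking the keys in order;
# the parameters dict and 'weather_data' value are hypothetical.
parameters = {'target_table': 'weather_data'}

target = None
for key in ('target', 'target_name', 'target_table', 'target_table_name'):
    if key in parameters:
        target = parameters[key]
        break
print(target)  # weather_data
```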
def guess_datetime(self) -> Optional[str]:
416def guess_datetime(self) -> Union[str, None]:
417    """
418    Try to determine a pipe's datetime column.
419    """
420    dtypes = self.dtypes
421
422    ### Abort if the user explicitly disallows a datetime index.
423    if 'datetime' in dtypes:
424        if dtypes['datetime'] is None:
425            return None
426
427    dt_cols = [
428        col for col, typ in self.dtypes.items()
429        if str(typ).startswith('datetime')
430    ]
431    if not dt_cols:
432        return None
433    return dt_cols[0]    

Try to determine a pipe's datetime column.
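The same logic can be sketched on a plain dtypes dict (the column names and dtype strings here are hypothetical):

```python
# Sketch of the datetime-guessing logic: honor an explicit None under
# 'datetime' (index disabled), else take the first datetime-typed column.
dtypes = {'dt': 'datetime64[ns]', 'id': 'int64', 'val': 'float64'}

if 'datetime' in dtypes and dtypes['datetime'] is None:
    dt_col = None  # the user explicitly disallowed a datetime index
else:
    dt_cols = [
        col for col, typ in dtypes.items()
        if str(typ).startswith('datetime')
    ]
    dt_col = dt_cols[0] if dt_cols else None
print(dt_col)  # dt
```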

def show( self, nopretty: bool = False, debug: bool = False, **kw) -> Tuple[bool, str]:
12def show(
13        self,
14        nopretty: bool = False,
15        debug: bool = False,
16        **kw
17    ) -> SuccessTuple:
18    """
19    Show attributes of a Pipe.
20
21    Parameters
22    ----------
23    nopretty: bool, default False
24        If `True`, simply print the JSON of the pipe's attributes.
25
26    debug: bool, default False
27        Verbosity toggle.
28
29    Returns
30    -------
31    A `SuccessTuple` of success, message.
32
33    """
34    import json
35    from meerschaum.utils.formatting import (
36        pprint, make_header, ANSI, highlight_pipes, fill_ansi, get_console,
37    )
38    from meerschaum.utils.packages import import_rich, attempt_import
39    from meerschaum.utils.warnings import info
40    attributes_json = json.dumps(self.attributes)
41    if not nopretty:
42        _to_print = f"Attributes for {self}:"
43        if ANSI:
44            _to_print = fill_ansi(highlight_pipes(make_header(_to_print)), 'magenta')
45            print(_to_print)
46            rich = import_rich()
47            rich_json = attempt_import('rich.json')
48            get_console().print(rich_json.JSON(attributes_json))
49        else:
50            print(_to_print)
51    else:
52        print(attributes_json)
53
54    return True, "Success"

Show attributes of a Pipe.

Parameters
  • nopretty (bool, default False): If True, simply print the JSON of the pipe's attributes.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success, message.
def edit( self, patch: bool = False, interactive: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
21def edit(
22        self,
23        patch: bool = False,
24        interactive: bool = False,
25        debug: bool = False,
26        **kw: Any
27    ) -> SuccessTuple:
28    """
29    Edit a Pipe's configuration.
30
31    Parameters
32    ----------
33    patch: bool, default False
34        If `patch` is True, update parameters by cascading rather than overwriting.
35    interactive: bool, default False
36        If `True`, open an editor for the user to make changes to the pipe's YAML file.
37    debug: bool, default False
38        Verbosity toggle.
39
40    Returns
41    -------
42    A `SuccessTuple` of success, message.
43
44    """
45    from meerschaum.utils.venv import Venv
46    from meerschaum.connectors import get_connector_plugin
47
48    if self.temporary:
49        return False, "Cannot edit pipes created with `temporary=True` (read-only)."
50
51    if not interactive:
52        with Venv(get_connector_plugin(self.instance_connector)):
53            return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
54    from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
55    from meerschaum.utils.misc import edit_file
56    parameters_filename = str(self) + '.yaml'
57    parameters_path = PIPES_CACHE_RESOURCES_PATH / parameters_filename
58    
59    from meerschaum.utils.yaml import yaml
60
61    edit_text = f"Edit the parameters for {self}"
62    edit_top = '#' * (len(edit_text) + 4)
63    edit_header = edit_top + f'\n# {edit_text} #\n' + edit_top + '\n\n'
64
65    from meerschaum.config import get_config
66    parameters = dict(get_config('pipes', 'parameters', patch=True))
67    from meerschaum.config._patch import apply_patch_to_config
68    parameters = apply_patch_to_config(parameters, self.parameters)
69
70    ### write parameters to yaml file
71    with open(parameters_path, 'w+') as f:
72        f.write(edit_header)
73        yaml.dump(parameters, stream=f, sort_keys=False)
74
75    ### only quit editing if yaml is valid
76    editing = True
77    while editing:
78        edit_file(parameters_path)
79        try:
80            with open(parameters_path, 'r') as f:
81                file_parameters = yaml.load(f.read())
82        except Exception as e:
83            from meerschaum.utils.warnings import warn
84            warn(f"Invalid format defined for '{self}':\n\n{e}")
85            input(f"Press [Enter] to correct the configuration for '{self}': ")
86        else:
87            editing = False
88
89    self.parameters = file_parameters
90
91    if debug:
92        from meerschaum.utils.formatting import pprint
93        pprint(self.parameters)
94
95    with Venv(get_connector_plugin(self.instance_connector)):
96        return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)

Edit a Pipe's configuration.

Parameters
  • patch (bool, default False): If patch is True, update parameters by cascading rather than overwriting.
  • interactive (bool, default False): If True, open an editor for the user to make changes to the pipe's YAML file.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success, message.
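The difference between patching and overwriting can be illustrated with a toy recursive merge. This mimics, but is not, `meerschaum.config._patch.apply_patch_to_config`; the parameter values are hypothetical:

```python
# Toy recursive merge illustrating "update parameters by cascading":
# nested dicts are merged key-by-key instead of replaced wholesale.
def cascade(config: dict, patch: dict) -> dict:
    merged = dict(config)
    for key, val in patch.items():
        if isinstance(val, dict) and isinstance(merged.get(key), dict):
            merged[key] = cascade(merged[key], val)
        else:
            merged[key] = val
    return merged

base = {'fetch': {'definition': 'SELECT 1', 'backtrack_minutes': 1440}}
patch = {'fetch': {'backtrack_minutes': 60}}
print(cascade(base, patch))
# {'fetch': {'definition': 'SELECT 1', 'backtrack_minutes': 60}}
```

With `patch=False`, the new `fetch` dict would simply replace the old one, discarding `definition`.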
def edit_definition( self, yes: bool = False, noask: bool = False, force: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
 99def edit_definition(
100        self,
101        yes: bool = False,
102        noask: bool = False,
103        force: bool = False,
104        debug : bool = False,
105        **kw : Any
106    ) -> SuccessTuple:
107    """
108    Edit a pipe's definition file and update its configuration.
109    **NOTE:** This function is interactive and should not be used in automated scripts!
110
111    Returns
112    -------
113    A `SuccessTuple` of success, message.
114
115    """
116    if self.temporary:
117        return False, "Cannot edit pipes created with `temporary=True` (read-only)."
118
119    from meerschaum.connectors import instance_types
120    if (self.connector is None) or self.connector.type not in instance_types:
121        return self.edit(interactive=True, debug=debug, **kw)
122
123    import json
124    from meerschaum.utils.warnings import info, warn
125    from meerschaum.utils.debug import dprint
126    from meerschaum.config._patch import apply_patch_to_config
127    from meerschaum.utils.misc import edit_file
128
129    _parameters = self.parameters
130    if 'fetch' not in _parameters:
131        _parameters['fetch'] = {}
132
133    def _edit_api():
134        from meerschaum.utils.prompt import prompt, yes_no
135        info(
136            f"Please enter the keys of the source pipe from '{self.connector}'.\n" +
137            "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip."
138        )
139
140        _keys = { 'connector_keys' : None, 'metric_key' : None, 'location_key' : None }
141        for k in _keys:
142            _keys[k] = _parameters['fetch'].get(k, None)
143
144        for k, v in _keys.items():
145            try:
146                _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v)
147            except KeyboardInterrupt:
148                continue
149            if _keys[k] in ('', 'None', '\'None\'', '[None]'):
150                _keys[k] = None
151
152        _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys)
153
154        info("You may optionally specify additional filter parameters as JSON.")
155        print("  Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.")
156        print("  For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':")
157        print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': ')))
158        if force or yes_no(
159            "Would you like to add additional filter parameters?",
160            yes=yes, noask=noask
161        ):
162            from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
163            definition_filename = str(self) + '.json'
164            definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
165            try:
166                definition_path.touch()
167                with open(definition_path, 'w+') as f:
168                    json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2)
169            except Exception as e:
170                return False, f"Failed writing file '{definition_path}':\n" + str(e)
171
172            _params = None
173            while True:
174                edit_file(definition_path)
175                try:
176                    with open(definition_path, 'r') as f:
177                        _params = json.load(f)
178                except Exception as e:
179                    warn(f'Failed to read parameters JSON:\n{e}', stack=False)
180                    if force or yes_no(
181                        "Would you like to try again?\n  "
182                        + "If not, the parameters JSON file will be ignored.",
183                        noask=noask, yes=yes
184                    ):
185                        continue
186                    _params = None
187                break
188            if _params is not None:
189                if 'fetch' not in _parameters:
190                    _parameters['fetch'] = {}
191                _parameters['fetch']['params'] = _params
192
193        self.parameters = _parameters
194        return True, "Success"
195
196    def _edit_sql():
197        import pathlib, os, textwrap
198        from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
199        from meerschaum.utils.misc import edit_file
200        definition_filename = str(self) + '.sql'
201        definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
202
203        sql_definition = _parameters['fetch'].get('definition', None)
204        if sql_definition is None:
205            sql_definition = ''
206        sql_definition = textwrap.dedent(sql_definition).lstrip()
207
208        try:
209            definition_path.touch()
210            with open(definition_path, 'w+') as f:
211                f.write(sql_definition)
212        except Exception as e:
213            return False, f"Failed writing file '{definition_path}':\n" + str(e)
214
215        edit_file(definition_path)
216        try:
217            with open(definition_path, 'r') as f:
218                file_definition = f.read()
219        except Exception as e:
220            return False, f"Failed reading file '{definition_path}':\n" + str(e)
221
222        if sql_definition == file_definition:
223            return False, f"No changes made to definition for {self}."
224
225        if ' ' not in file_definition:
226            return False, f"Invalid SQL definition for {self}."
227
228        if debug:
229            dprint("Read SQL definition:\n\n" + file_definition)
230        _parameters['fetch']['definition'] = file_definition
231        self.parameters = _parameters
232        return True, "Success"
233
234    locals()['_edit_' + str(self.connector.type)]()
235    return self.edit(interactive=False, debug=debug, **kw)

Edit a pipe's definition file and update its configuration. NOTE: This function is interactive and should not be used in automated scripts!

Returns
  • A SuccessTuple of success, message.
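The filter-parameter translation described in the prompts above ("Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses") can be sketched like this. Meerschaum builds the real query internally; this only illustrates the documented mapping:

```python
import json

# Simplified rendering of how filter parameters map to SQL:
# scalars become equality checks and lists become IN clauses.
params = {'x': 1, 'y': [2, 3]}

clauses = [
    f"{col} IN ({', '.join(str(v) for v in val)})"
    if isinstance(val, list)
    else f"{col} = {val}"
    for col, val in params.items()
]
where = 'WHERE ' + ' AND '.join(clauses)
print(json.dumps(params))  # the JSON you would enter in the editor
print(where)               # WHERE x = 1 AND y IN (2, 3)
```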
def update(self, *args, **kw) -> Tuple[bool, str]:
13def update(self, *args, **kw) -> SuccessTuple:
14    """
15    Update a pipe's parameters in its instance.
16    """
17    kw['interactive'] = False
18    return self.edit(*args, **kw)

Update a pipe's parameters in its instance.

def sync( self, df: 'Union[pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch]' = InferFetch, begin: Union[datetime.datetime, int, str, NoneType] = '', end: Union[datetime.datetime, int] = None, force: bool = False, retries: int = 10, min_seconds: int = 1, check_existing: bool = True, blocking: bool = True, workers: Optional[int] = None, callback: Optional[Callable[[Tuple[bool, str]], Any]] = None, error_callback: Optional[Callable[[Exception], Any]] = None, chunksize: Optional[int] = -1, sync_chunks: bool = True, debug: bool = False, _inplace: bool = True, **kw: Any) -> Tuple[bool, str]:
 37def sync(
 38        self,
 39        df: Union[
 40            pd.DataFrame,
 41            Dict[str, List[Any]],
 42            List[Dict[str, Any]],
 43            InferFetch
 44        ] = InferFetch,
 45        begin: Union[datetime, int, str, None] = '',
 46        end: Union[datetime, int] = None,
 47        force: bool = False,
 48        retries: int = 10,
 49        min_seconds: int = 1,
 50        check_existing: bool = True,
 51        blocking: bool = True,
 52        workers: Optional[int] = None,
 53        callback: Optional[Callable[[Tuple[bool, str]], Any]] = None,
 54        error_callback: Optional[Callable[[Exception], Any]] = None,
 55        chunksize: Optional[int] = -1,
 56        sync_chunks: bool = True,
 57        debug: bool = False,
 58        _inplace: bool = True,
 59        **kw: Any
 60    ) -> SuccessTuple:
 61    """
 62    Fetch new data from the source and update the pipe's table with new data.
 63    
 64    Get new remote data via fetch, get existing data in the same time period,
 65    and merge the two, only keeping the unseen data.
 66
 67    Parameters
 68    ----------
 69    df: Union[pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch], default InferFetch
 70        An optional DataFrame to sync into the pipe. If omitted, fetch data from the pipe's connector.
 71
 72    begin: Union[datetime, int, str, None], default ''
 73        Optionally specify the earliest datetime to search for data.
 74
 75    end: Union[datetime, int, str, None], default None
 76        Optionally specify the latest datetime to search for data.
 77
 78    force: bool, default False
 79        If `True`, keep retrying the sync until `retries` attempts are exhausted.
 80
 81    retries: int, default 10
 82        If `force`, how many attempts to try syncing before declaring failure.
 83
 84    min_seconds: Union[int, float], default 1
 85        If `force`, how many seconds to sleep between retries. Defaults to `1`.
 86
 87    check_existing: bool, default True
 88        If `True`, pull and diff with existing data from the pipe.
 89
 90    blocking: bool, default True
 91        If `True`, wait for sync to finish and return its result, otherwise
 92        asynchronously sync (oxymoron?) and return success. Defaults to `True`.
 93        Only intended for specific scenarios.
 94
 95    workers: Optional[int], default None
 96        If provided and the instance connector is thread-safe
 97        (`pipe.instance_connector.IS_THREAD_SAFE is True`),
 98        limit concurrent sync to this many threads.
 99
100    callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
101        Callback function which expects a SuccessTuple as input.
102        Only applies when `blocking=False`.
103
104    error_callback: Optional[Callable[[Exception], Any]], default None
105        Callback function which expects an Exception as input.
106        Only applies when `blocking=False`.
107
108    chunksize: int, default -1
109        Specify the number of rows to sync per chunk.
110        If `-1`, resort to system configuration (default is `900`).
111        A `chunksize` of `None` will sync all rows in one transaction.
112
113    sync_chunks: bool, default True
114        If possible, sync chunks while fetching them into memory.
115
116    debug: bool, default False
117        Verbosity toggle. Defaults to False.
118
119    Returns
120    -------
121    A `SuccessTuple` of success (`bool`) and message (`str`).
122    """
123    from meerschaum.utils.debug import dprint, _checkpoint
124    from meerschaum.connectors import custom_types
125    from meerschaum.plugins import Plugin
126    from meerschaum.utils.formatting import get_console
127    from meerschaum.utils.venv import Venv
128    from meerschaum.connectors import get_connector_plugin
129    from meerschaum.utils.misc import df_is_chunk_generator
130    from meerschaum.utils.pool import get_pool
131    from meerschaum.config import get_config
132
133    if (callback is not None or error_callback is not None) and blocking:
134        warn("Callback functions are only executed when blocking = False. Ignoring...")
135
136    _checkpoint(_total=2, **kw)
137
138    if chunksize == 0:
139        chunksize = None
140        sync_chunks = False
141
142    kw.update({
143        'begin': begin,
144        'end': end,
145        'force': force,
146        'retries': retries,
147        'min_seconds': min_seconds,
148        'check_existing': check_existing,
149        'blocking': blocking,
150        'workers': workers,
151        'callback': callback,
152        'error_callback': error_callback,
153        'sync_chunks': sync_chunks,
154        'chunksize': chunksize,
155    })
156
157    ### NOTE: Invalidate `_exists` cache before and after syncing.
158    self._exists = None
159
160    def _sync(
161        p: 'meerschaum.Pipe',
162        df: Union[
163            'pd.DataFrame',
164            Dict[str, List[Any]],
165            List[Dict[str, Any]],
166            InferFetch
167        ] = InferFetch,
168    ) -> SuccessTuple:
169        if df is None:
170            p._exists = None
171            return (
172                False,
173                f"You passed `None` instead of data into `sync()` for {p}.\n"
174                + "Omit the DataFrame to infer fetching.",
175            )
176        ### Ensure that Pipe is registered.
177        if not p.temporary and p.get_id(debug=debug) is None:
178            ### NOTE: This may trigger an interactive session for plugins!
179            register_success, register_msg = p.register(debug=debug)
180            if not register_success:
181                if 'already' not in register_msg:
182                    p._exists = None
183                    return register_success, register_msg
184
185        ### If connector is a plugin with a `sync()` method, return that instead.
186        ### If the plugin does not have a `sync()` method but does have a `fetch()` method,
187        ### use that instead.
188        ### NOTE: The DataFrame must be omitted for the plugin sync method to apply.
189        ### If a DataFrame is provided, continue as expected.
190        if hasattr(df, 'MRSM_INFER_FETCH'):                   
191            try:
192                if p.connector is None:
193                    msg = f"{p} does not have a valid connector."
194                    if p.connector_keys.startswith('plugin:'):
195                        msg += f"\n    Perhaps {p.connector_keys} has a syntax error?"
196                    p._exists = None
197                    return False, msg
198            except Exception as e:
199                p._exists = None
200                return False, f"Unable to create the connector for {p}."
201
202            ### Sync in place if this is a SQL pipe.
203            if (
204                str(self.connector) == str(self.instance_connector)
205                and 
206                hasattr(self.instance_connector, 'sync_pipe_inplace')
207                and
208                _inplace
209                and
210                get_config('system', 'experimental', 'inplace_sync')
211            ):
212                with Venv(get_connector_plugin(self.instance_connector)):
213                    p._exists = None
214                    return self.instance_connector.sync_pipe_inplace(p, debug=debug, **kw)
215
216
217            ### Activate and invoke `sync(pipe)` for plugin connectors with `sync` methods.
218            try:
219                if p.connector.type == 'plugin' and p.connector.sync is not None:
220                    connector_plugin = Plugin(p.connector.label)
221                    with Venv(connector_plugin, debug=debug):
222                        return_tuple = p.connector.sync(p, debug=debug, **kw)
223                    p._exists = None
224                    if not isinstance(return_tuple, tuple):
225                        return_tuple = (
226                            False,
227                            f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}"
228                        )
229                    return return_tuple
230
231            except Exception as e:
232                get_console().print_exception()
233                msg = f"Failed to sync {p} with exception: '" + str(e) + "'"
234                if debug:
235                    error(msg, silent=False)
236                p._exists = None
237                return False, msg
238
239            ### Fetch the dataframe from the connector's `fetch()` method.
240            try:
241                with Venv(get_connector_plugin(p.connector), debug=debug):
242                    df = p.fetch(debug=debug, **kw)
243
244            except Exception as e:
245                get_console().print_exception(
246                    suppress = [
247                        'meerschaum/core/Pipe/_sync.py', 
248                        'meerschaum/core/Pipe/_fetch.py', 
249                    ]
250                )
251                msg = f"Failed to fetch data from {p.connector}:\n    {e}"
252                df = None
253
254            if df is None:
255                p._exists = None
256                return False, f"No data were fetched for {p}."
257
258            if isinstance(df, list):
259                if len(df) == 0:
260                    return True, f"No new rows were returned for {p}."
261
262                ### May be a chunk hook results list.
263                if isinstance(df[0], tuple):
264                    success = all([_success for _success, _ in df])
265                    message = '\n'.join([_message for _, _message in df])
266                    return success, message
267
268        ### TODO: Deprecate async?
269            if df is True:
270                p._exists = None
271                return True, f"{p} is being synced in parallel."
272
273        ### CHECKPOINT: Retrieved the DataFrame.
274        _checkpoint(**kw)
275        
276        ### Allow for dataframe generators or iterables.
277        if df_is_chunk_generator(df):
278            kw['workers'] = p.get_num_workers(kw.get('workers', None))
279            dt_col = p.columns.get('datetime', None)
280            pool = get_pool(workers=kw.get('workers', 1))
281            if debug:
282                dprint(f"Received {type(df)}. Attempting to sync first chunk...")
283
284            try:
285                chunk = next(df)
286            except StopIteration:
287                return True, "Received an empty generator; nothing to do."
288
289            chunk_success, chunk_msg = _sync(p, chunk)
290            chunk_msg = '\n' + self._get_chunk_label(chunk, dt_col) + '\n' + chunk_msg
291            if not chunk_success:
292                return chunk_success, f"Unable to sync initial chunk for {p}:\n{chunk_msg}"
293            if debug:
294            dprint("Successfully synced the first chunk, attempting the rest...")
295
296            failed_chunks = []
297            def _process_chunk(_chunk):
298                try:
299                    _chunk_success, _chunk_msg = _sync(p, _chunk)
300                except Exception as e:
301                    _chunk_success, _chunk_msg = False, str(e)
302                if not _chunk_success:
303                    failed_chunks.append(_chunk)
304                return (
305                    _chunk_success,
306                    (
307                        '\n'
308                        + self._get_chunk_label(_chunk, dt_col)
309                        + '\n'
310                        + _chunk_msg
311                    )
312                )
313
314
315            results = sorted(
316                [(chunk_success, chunk_msg)] + (
317                    list(pool.imap(_process_chunk, df))
318                    if not df_is_chunk_generator(chunk)
319                    else [
320                        _process_chunk(_child_chunks)
321                        for _child_chunks in df
322                    ]
323                )
324            )
325            chunk_messages = [chunk_msg for _, chunk_msg in results]
326            success_bools = [chunk_success for chunk_success, _ in results]
327            success = all(success_bools)
328            msg = '\n'.join(chunk_messages)
329
330            ### If some chunks succeeded, retry the failures.
331            retry_success = True
332            if not success and any(success_bools):
333                if debug:
334                    dprint(f"Retrying failed chunks...")
335                chunks_to_retry = [c for c in failed_chunks]
336                failed_chunks = []
337                for chunk in chunks_to_retry:
338                    chunk_success, chunk_msg = _process_chunk(chunk)
339                    msg += f"\n\nRetried chunk:\n{chunk_msg}\n"
340                    retry_success = retry_success and chunk_success
341
342            success = success and retry_success
343            return success, msg
344
345        ### Cast to a dataframe and ensure datatypes are what we expect.
346        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)
347        if debug:
348            dprint(
349                "DataFrame to sync:\n"
350                + (
351                    str(df)[:255]
352                    + '...'
353                    if len(str(df)) >= 256
354                    else str(df)
355                ),
356                **kw
357            )
358
359        ### if force, continue to sync until success
360        return_tuple = False, f"Did not sync {p}."
361        run = True
362        _retries = 1
363        while run:
364            with Venv(get_connector_plugin(self.instance_connector)):
365                return_tuple = p.instance_connector.sync_pipe(
366                    pipe = p,
367                    df = df,
368                    debug = debug,
369                    **kw
370                )
371            _retries += 1
372            run = (not return_tuple[0]) and force and _retries <= retries
373            if run and debug:
374                dprint(f"Syncing failed for {p}. Attempt ( {_retries} / {retries} )", **kw)
375                dprint(f"Sleeping for {min_seconds} seconds...", **kw)
376                time.sleep(min_seconds)
377            if _retries > retries:
378                warn(
379                    f"Unable to sync {p} within {retries} attempt" +
380                        ("s" if retries != 1 else "") + "!"
381                )
382
383        ### CHECKPOINT: Finished syncing. Handle caching.
384        _checkpoint(**kw)
385        if self.cache_pipe is not None:
386        if debug:
387            dprint("Caching retrieved dataframe.", **kw)
388        _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw)
389        if not _sync_cache_tuple[0]:
390            warn(f"Failed to sync local cache for {self}.")
391
392        self._exists = None
393        return return_tuple
394
395    if blocking:
396        self._exists = None
397        return _sync(self, df = df)
398
399    from meerschaum.utils.threading import Thread
400    def default_callback(result_tuple : SuccessTuple):
401        dprint(f"Asynchronous result from {self}: {result_tuple}", **kw)
402
403    def default_error_callback(x : Exception):
404        dprint(f"Error received for {self}: {x}", **kw)
405
406    if callback is None and debug:
407        callback = default_callback
408    if error_callback is None and debug:
409        error_callback = default_error_callback
410    try:
411        thread = Thread(
412            target = _sync,
413            args = (self,),
414            kwargs = {'df' : df},
415            daemon = False,
416            callback = callback,
417            error_callback = error_callback
418        )
419        thread.start()
420    except Exception as e:
421        self._exists = None
422        return False, str(e)
423
424    self._exists = None
425    return True, f"Spawned asynchronous sync for {self}."

Fetch new data from the source and update the pipe's table with new data.

Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.

Parameters
  • df (Union[None, pd.DataFrame, Dict[str, List[Any]]], default None): An optional DataFrame to sync into the pipe. Defaults to None.
  • begin (Union[datetime, int, str, None], default ''): Optionally specify the earliest datetime to search for data.
  • end (Union[datetime, int, str, None], default None): Optionally specify the latest datetime to search for data.
  • force (bool, default False): If True, keep trying to sync until retries attempts are exhausted.
  • retries (int, default 10): If force, how many attempts to try syncing before declaring failure.
  • min_seconds (Union[int, float], default 1): If force, how many seconds to sleep between retries. Defaults to 1.
  • check_existing (bool, default True): If True, pull and diff with existing data from the pipe.
  • blocking (bool, default True): If True, wait for the sync to finish and return its result; otherwise sync asynchronously and return success immediately. Only intended for specific scenarios.
  • workers (Optional[int], default None): If provided and the instance connector is thread-safe (pipe.instance_connector.IS_THREAD_SAFE is True), limit concurrent sync to this many threads.
  • callback (Optional[Callable[[Tuple[bool, str]], Any]], default None): Callback function which expects a SuccessTuple as input. Only applies when blocking=False.
  • error_callback (Optional[Callable[[Exception], Any]], default None): Callback function which expects an Exception as input. Only applies when blocking=False.
  • chunksize (int, default -1): Specify the number of rows to sync per chunk. If -1, fall back to the system configuration default (900). A chunksize of None will sync all rows in one transaction.
  • sync_chunks (bool, default True): If possible, sync chunks while fetching them into memory.
  • debug (bool, default False): Verbosity toggle. Defaults to False.
Returns
  • A SuccessTuple of success (bool) and message (str).
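The `force` / `retries` / `min_seconds` behavior described above boils down to a bounded retry loop. Below is a minimal sketch of that loop, independent of Meerschaum; `sync_with_retries` and `flaky_sync` are hypothetical stand-ins for a real sync call:

```python
import time

def sync_with_retries(sync_once, force=True, retries=3, min_seconds=0.0):
    """Retry a sync callable until it succeeds or attempts are exhausted.

    `sync_once` returns a (success, message) tuple, like a SuccessTuple.
    """
    _retries = 0
    result = (False, "Not yet attempted.")
    run = True
    while run:
        result = sync_once()
        _retries += 1
        # Keep going only on failure, when forced, and while attempts remain.
        run = (not result[0]) and force and _retries <= retries
        if run:
            time.sleep(min_seconds)
    return result

# A flaky sync which fails twice before succeeding.
attempts = {'n': 0}
def flaky_sync():
    attempts['n'] += 1
    return (attempts['n'] >= 3), f"Attempt {attempts['n']}"

success, msg = sync_with_retries(flaky_sync, force=True, retries=5, min_seconds=0.0)
# succeeds on the third attempt
```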
def get_sync_time( self, params: Optional[Dict[str, Any]] = None, newest: bool = True, round_down: bool = False, debug: bool = False) -> Optional[datetime.datetime]:
428def get_sync_time(
429        self,
430        params: Optional[Dict[str, Any]] = None,
431        newest: bool = True,
432        round_down: bool = False, 
433        debug: bool = False
434    ) -> Union['datetime', None]:
435    """
436    Get the most recent datetime value for a Pipe.
437
438    Parameters
439    ----------
440    params: Optional[Dict[str, Any]], default None
441        Dictionary to build a WHERE clause for a specific column.
442        See `meerschaum.utils.sql.build_where`.
443
444    newest: bool, default True
445        If `True`, get the most recent datetime (honoring `params`).
446        If `False`, get the oldest datetime (`ASC` instead of `DESC`).
447
448    round_down: bool, default False
449        If `True`, round down the datetime value to the nearest minute.
450
451    debug: bool, default False
452        Verbosity toggle.
453
454    Returns
455    -------
456    A `datetime` object if the pipe exists, otherwise `None`.
457
458    """
459    from meerschaum.utils.venv import Venv
460    from meerschaum.connectors import get_connector_plugin
461    from meerschaum.utils.misc import round_time
462
463    with Venv(get_connector_plugin(self.instance_connector)):
464        sync_time = self.instance_connector.get_sync_time(
465            self,
466            params = params,
467            newest = newest,
468            debug = debug,
469        )
470
471    if not round_down or not isinstance(sync_time, datetime):
472        return sync_time
473
474    return round_time(sync_time, timedelta(minutes=1))

Get the most recent datetime value for a Pipe.

Parameters
  • params (Optional[Dict[str, Any]], default None): Dictionary to build a WHERE clause for a specific column. See meerschaum.utils.sql.build_where.
  • newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
  • round_down (bool, default False): If True, round down the datetime value to the nearest minute.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A datetime object if the pipe exists, otherwise None.
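`round_down=True` simply floors the returned value to the minute. A standard-library sketch of that rounding (the real implementation goes through `meerschaum.utils.misc.round_time`):

```python
from datetime import datetime

def floor_to_minute(dt: datetime) -> datetime:
    """Round a datetime down to the nearest minute."""
    return dt.replace(second=0, microsecond=0)

sync_time = datetime(2024, 1, 1, 12, 34, 56, 789000)
assert floor_to_minute(sync_time) == datetime(2024, 1, 1, 12, 34)
```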
def exists(self, debug: bool = False) -> bool:
477def exists(
478        self,
479        debug : bool = False
480    ) -> bool:
481    """
482    See if a Pipe's table exists.
483
484    Parameters
485    ----------
486    debug: bool, default False
487        Verbosity toggle.
488
489    Returns
490    -------
491    A `bool` corresponding to whether a pipe's underlying table exists.
492
493    """
494    import time
495    from meerschaum.utils.venv import Venv
496    from meerschaum.connectors import get_connector_plugin
497    from meerschaum.config import STATIC_CONFIG
498    from meerschaum.utils.debug import dprint
499    now = time.perf_counter()
500    exists_timeout_seconds = STATIC_CONFIG['pipes']['exists_timeout_seconds']
501
502    _exists = self.__dict__.get('_exists', None)
503    if _exists:
504        exists_timestamp = self.__dict__.get('_exists_timestamp', None)
505        if exists_timestamp is not None:
506            delta = now - exists_timestamp
507            if delta < exists_timeout_seconds:
508                if debug:
509                    dprint(f"Returning cached `exists` for {self} ({round(delta, 2)} seconds old).")
510                return _exists
511
512    with Venv(get_connector_plugin(self.instance_connector)):
513        _exists = self.instance_connector.pipe_exists(pipe=self, debug=debug)
514
515    self.__dict__['_exists'] = _exists
516    self.__dict__['_exists_timestamp'] = now
517    return _exists

See if a Pipe's table exists.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool corresponding to whether a pipe's underlying table exists.
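The `exists_timeout_seconds` caching above is a small time-to-live cache keyed on `time.perf_counter()`. A self-contained sketch of the pattern (`CachedExists` and `expensive_check` are illustrative names, not Meerschaum API):

```python
import time

class CachedExists:
    """Cache a boolean lookup for a short window, mirroring `exists_timeout_seconds`.

    Note: like the original, only truthy results are cached, so a missing
    table is re-checked on every call until it appears.
    """
    def __init__(self, lookup, timeout_seconds: float = 5.0):
        self._lookup = lookup              # the expensive check (e.g. a DB query)
        self._timeout = timeout_seconds
        self._value = None
        self._timestamp = None

    def get(self) -> bool:
        now = time.perf_counter()
        if self._value and self._timestamp is not None:
            if (now - self._timestamp) < self._timeout:
                return self._value         # fresh cache hit: skip the lookup
        self._value = self._lookup()
        self._timestamp = now
        return self._value

calls = {'n': 0}
def expensive_check() -> bool:
    calls['n'] += 1
    return True

cached = CachedExists(expensive_check, timeout_seconds=60.0)
cached.get()
cached.get()
# only one real lookup despite two calls
```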
def filter_existing( self, df: "'pd.DataFrame'", safe_copy: bool = True, date_bound_only: bool = False, chunksize: Optional[int] = -1, debug: bool = False, **kw) -> "Tuple['pd.DataFrame', 'pd.DataFrame', 'pd.DataFrame']":
520def filter_existing(
521        self,
522        df: 'pd.DataFrame',
523        safe_copy: bool = True,
524        date_bound_only: bool = False,
525        chunksize: Optional[int] = -1,
526        debug: bool = False,
527        **kw
528    ) -> Tuple['pd.DataFrame', 'pd.DataFrame', 'pd.DataFrame']:
529    """
530    Inspect a dataframe and filter out rows which already exist in the pipe.
531
532    Parameters
533    ----------
534    df: 'pd.DataFrame'
535        The dataframe to inspect and filter.
536        
537    safe_copy: bool, default True
538        If `True`, create a copy before comparing and modifying the dataframes.
539        Setting to `False` may mutate the DataFrames.
540        See `meerschaum.utils.dataframe.filter_unseen_df`.
541
542    date_bound_only: bool, default False
543        If `True`, only use the datetime index to fetch the sample dataframe.
544
545    chunksize: Optional[int], default -1
546        The `chunksize` used when fetching existing data.
547
548    debug: bool, default False
549        Verbosity toggle.
550
551    Returns
552    -------
553    A tuple of three pandas DataFrames: unseen, update, and delta.
554    """
555    from meerschaum.utils.warnings import warn
556    from meerschaum.utils.debug import dprint
557    from meerschaum.utils.packages import attempt_import, import_pandas
558    from meerschaum.utils.misc import round_time
559    from meerschaum.utils.dataframe import (
560        filter_unseen_df,
561        add_missing_cols_to_df,
562        get_unhashable_cols,
563        get_numeric_cols,
564    )
565    from meerschaum.utils.dtypes import (
566        to_pandas_dtype,
567        none_if_null,
568    )
569    from meerschaum.config import get_config
570    pd = import_pandas()
571    pandas = attempt_import('pandas')
572    if 'dataframe' not in str(type(df)).lower():
573        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)
574    is_dask = 'dask' in df.__module__
575    if is_dask:
576        dd = attempt_import('dask.dataframe')
577        merge = dd.merge
578        NA = pandas.NA
579    else:
580        merge = pd.merge
581        NA = pd.NA
582    if df is None:
583        return df, df, df
584    if (df.empty if not is_dask else len(df) == 0):
585        return df, df, df
586
587    ### begin is the oldest data in the new dataframe
588    begin, end = None, None
589    dt_col = self.columns.get('datetime', None)
590    dt_type = self.dtypes.get(dt_col, 'datetime64[ns]') if dt_col else None
591    try:
592        min_dt_val = df[dt_col].min(skipna=True) if dt_col else None
593        if is_dask and min_dt_val is not None:
594            min_dt_val = min_dt_val.compute()
595        min_dt = (
596            pandas.to_datetime(min_dt_val).to_pydatetime()
597            if min_dt_val is not None and 'datetime' in str(dt_type)
598            else min_dt_val
599        )
600    except Exception as e:
601        min_dt = None
602    if not ('datetime' in str(type(min_dt))) or str(min_dt) == 'NaT':
603        if 'int' not in str(type(min_dt)).lower():
604            min_dt = None
605
606    if isinstance(min_dt, datetime):
607        begin = (
608            round_time(
609                min_dt,
610                to = 'down'
611            ) - timedelta(minutes=1)
612        )
613    elif dt_type and 'int' in dt_type.lower():
614        begin = min_dt
615    elif dt_col is None:
616        begin = None
617
618    ### end is the newest data in the new dataframe
619    try:
620        max_dt_val = df[dt_col].max(skipna=True) if dt_col else None
621        if is_dask and max_dt_val is not None:
622            max_dt_val = max_dt_val.compute()
623        max_dt = (
624            pandas.to_datetime(max_dt_val).to_pydatetime()
625            if max_dt_val is not None and 'datetime' in str(dt_type)
626            else max_dt_val
627        )
628    except Exception as e:
629        import traceback
630        traceback.print_exc()
631        max_dt = None
632
633    if ('datetime' not in str(type(max_dt))) or str(max_dt) == 'NaT':
634        if 'int' not in str(type(max_dt)).lower():
635            max_dt = None
636
637    if isinstance(max_dt, datetime):
638        end = (
639            round_time(
640                max_dt,
641                to = 'down'
642            ) + timedelta(minutes=1)
643        )
644    elif dt_type and 'int' in dt_type.lower():
645        end = max_dt + 1
646
647    if max_dt is not None and min_dt is not None and min_dt > max_dt:
648        warn(f"Detected minimum datetime greater than maximum datetime.")
649
650    if begin is not None and end is not None and begin > end:
651        if isinstance(begin, datetime):
652            begin = end - timedelta(minutes=1)
653        ### We might be using integers for the datetime axis.
654        else:
655            begin = end - 1
656
657    unique_index_vals = {
658        col: df[col].unique()
659        for col in self.columns
660        if col in df.columns and col != dt_col
661    } if not date_bound_only else {}
662    filter_params_index_limit = get_config('pipes', 'sync', 'filter_params_index_limit')
663    _ = kw.pop('params', None)
664    params = {
665        col: [
666            none_if_null(val)
667            for val in unique_vals
668        ]
669        for col, unique_vals in unique_index_vals.items()
670        if len(unique_vals) <= filter_params_index_limit
671    } if not date_bound_only else {}
672
673    if debug:
674        dprint(f"Looking at data between '{begin}' and '{end}':", **kw)
675
676    backtrack_df = self.get_data(
677        begin = begin,
678        end = end,
679        chunksize = chunksize,
680        params = params,
681        debug = debug,
682        **kw
683    )
684    if debug:
685        dprint(f"Existing data for {self}:\n" + str(backtrack_df), **kw)
686        dprint(f"Existing dtypes for {self}:\n" + str(backtrack_df.dtypes))
687
688    ### Separate new rows from changed ones.
689    on_cols = [
690        col for col_key, col in self.columns.items()
691        if (
692            col
693            and
694            col_key != 'value'
695            and col in backtrack_df.columns
696        )
697    ]
698    self_dtypes = self.dtypes
699    on_cols_dtypes = {
700        col: to_pandas_dtype(typ)
701        for col, typ in self_dtypes.items()
702        if col in on_cols
703    }
704
705    ### Detect changes between the old target and new source dataframes.
706    delta_df = add_missing_cols_to_df(
707        filter_unseen_df(
708            backtrack_df,
709            df,
710            dtypes = {
711                col: to_pandas_dtype(typ)
712                for col, typ in self_dtypes.items()
713            },
714            safe_copy = safe_copy,
715            debug = debug
716        ),
717        on_cols_dtypes,
718    )
719
720    ### Cast dicts or lists to strings so we can merge.
721    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
722    def deserializer(x):
723        return json.loads(x) if isinstance(x, str) else x
724
725    unhashable_delta_cols = get_unhashable_cols(delta_df)
726    unhashable_backtrack_cols = get_unhashable_cols(backtrack_df)
727    for col in unhashable_delta_cols:
728        delta_df[col] = delta_df[col].apply(serializer)
729    for col in unhashable_backtrack_cols:
730        backtrack_df[col] = backtrack_df[col].apply(serializer)
731    casted_cols = set(unhashable_delta_cols + unhashable_backtrack_cols)
732
733    joined_df = merge(
734        delta_df.fillna(NA),
735        backtrack_df.fillna(NA),
736        how = 'left',
737        on = on_cols,
738        indicator = True,
739        suffixes = ('', '_old'),
740    ) if on_cols else delta_df
741    for col in casted_cols:
742        if col in joined_df.columns:
743            joined_df[col] = joined_df[col].apply(deserializer)
744        if col in delta_df.columns:
745            delta_df[col] = delta_df[col].apply(deserializer)
746
747    ### Determine which rows are completely new.
748    new_rows_mask = (joined_df['_merge'] == 'left_only') if on_cols else None
749    cols = list(backtrack_df.columns)
750
751    unseen_df = (
752        joined_df
753        .where(new_rows_mask)
754        .dropna(how='all')[cols]
755        .reset_index(drop=True)
756    ) if on_cols else delta_df
764
765    ### Rows that have already been inserted but values have changed.
766    update_df = (
767        joined_df
768        .where(~new_rows_mask)
769        .dropna(how='all')[cols]
770        .reset_index(drop=True)
771    ) if on_cols else None
772
773    return unseen_df, update_df, delta_df

Inspect a dataframe and filter out rows which already exist in the pipe.

Parameters
  • df ('pd.DataFrame'): The dataframe to inspect and filter.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames. See meerschaum.utils.dataframe.filter_unseen_df.
  • date_bound_only (bool, default False): If True, only use the datetime index to fetch the sample dataframe.
  • chunksize (Optional[int], default -1): The chunksize used when fetching existing data.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A tuple of three pandas DataFrames: unseen, update, and delta.
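The unseen / update split above relies on a pandas left merge with the `indicator` flag: rows present only in the delta frame are brand new, while matched rows are updates. A minimal sketch with toy frames (the column names `id` and `value` are illustrative):

```python
import pandas as pd

# Existing rows already in the pipe (the "backtrack" frame).
backtrack_df = pd.DataFrame({'id': [1, 2], 'value': [10, 20]})
# Incoming rows which differ from what is stored (the "delta" frame).
delta_df = pd.DataFrame({'id': [2, 3], 'value': [25, 30]})

on_cols = ['id']
joined_df = pd.merge(
    delta_df,
    backtrack_df,
    how='left',
    on=on_cols,
    indicator=True,
    suffixes=('', '_old'),
)
# 'left_only' marks rows with no match in the existing data.
new_rows_mask = joined_df['_merge'] == 'left_only'

cols = list(delta_df.columns)
unseen_df = joined_df[new_rows_mask][cols].reset_index(drop=True)   # brand-new rows
update_df = joined_df[~new_rows_mask][cols].reset_index(drop=True)  # changed rows
```

Here row `id=3` is unseen and row `id=2` (whose `value` changed from 20 to 25) becomes an update.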
def get_num_workers(self, workers: Optional[int] = None) -> int:
798def get_num_workers(self, workers: Optional[int] = None) -> int:
799    """
800    Get the number of workers to use for concurrent syncs.
801
802    Parameters
803    ----------
804    workers: Optional[int], default None
        The number of workers passed via `--workers`.
805
806    Returns
807    -------
808    The number of workers, capped for safety.
809    """
810    is_thread_safe = getattr(self.instance_connector, 'IS_THREAD_SAFE', False)
811    if not is_thread_safe:
812        return 1
813
814    engine_pool_size = (
815        self.instance_connector.engine.pool.size()
816        if self.instance_connector.type == 'sql'
817        else None
818    )
819    current_num_threads = threading.active_count()
820    current_num_connections = (
821        self.instance_connector.engine.pool.checkedout()
822        if engine_pool_size is not None
823        else current_num_threads
824    )
825    desired_workers = (
826        min(workers or engine_pool_size, engine_pool_size)
827        if engine_pool_size is not None
828        else workers
829    )
830    if desired_workers is None:
831        desired_workers = (multiprocessing.cpu_count() if is_thread_safe else 1)
832
833    return max(
834        (desired_workers - current_num_connections),
835        1,
836    )

Get the number of workers to use for concurrent syncs.

Parameters
  • workers (Optional[int], default None): The number of workers passed via --workers.
Returns
  • The number of workers, capped for safety.
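The capping logic above can be sketched in isolation. The keyword arguments below are hypothetical stand-ins for the connector's `IS_THREAD_SAFE` flag, the SQLAlchemy engine pool size, and the number of checked-out connections:

```python
import multiprocessing

def get_num_workers(workers=None, is_thread_safe=True,
                    pool_size=None, current_connections=0):
    """Cap the number of concurrent sync workers (a simplified sketch)."""
    if not is_thread_safe:
        return 1                       # non-thread-safe connectors sync in series
    if pool_size is not None:
        desired = min(workers or pool_size, pool_size)
    else:
        desired = workers
    if desired is None:
        desired = multiprocessing.cpu_count()
    # Never request fewer than one worker.
    return max(desired - current_connections, 1)

# Requesting 8 workers against a pool of 5 with 2 connections checked out:
assert get_num_workers(8, pool_size=5, current_connections=2) == 3
# Non-thread-safe connectors always get a single worker:
assert get_num_workers(8, is_thread_safe=False) == 1
```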
def verify( self, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, params: Optional[Dict[str, Any]] = None, chunk_interval: Union[datetime.timedelta, int, NoneType] = None, bounded: Optional[bool] = None, deduplicate: bool = False, workers: Optional[int] = None, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
 15def verify(
 16        self,
 17        begin: Union[datetime, int, None] = None,
 18        end: Union[datetime, int, None] = None,
 19        params: Optional[Dict[str, Any]] = None,
 20        chunk_interval: Union[timedelta, int, None] = None,
 21        bounded: Optional[bool] = None,
 22        deduplicate: bool = False,
 23        workers: Optional[int] = None,
 24        debug: bool = False,
 25        **kwargs: Any
 26    ) -> SuccessTuple:
 27    """
 28    Verify the contents of the pipe by resyncing its interval.
 29
 30    Parameters
 31    ----------
 32    begin: Union[datetime, int, None], default None
 33        If specified, only verify rows greater than or equal to this value.
 34
 35    end: Union[datetime, int, None], default None
 36        If specified, only verify rows less than this value.
 37
 38    chunk_interval: Union[timedelta, int, None], default None
 39        If provided, use this as the size of the chunk boundaries.
 40        Defaults to the value set in `pipe.parameters['chunk_minutes']` (1440).
 41
 42    bounded: Optional[bool], default None
 43        If `True`, do not verify older than the oldest sync time or newer than the newest.
 44        If `False`, verify unbounded syncs outside of the new and old sync times.
 45        The default behavior (`None`) is to bound only if a bound interval is set
 46        (e.g. `pipe.parameters['verify']['bound_days']`).
 47
 48    deduplicate: bool, default False
 49        If `True`, deduplicate the pipe's table after the verification syncs.
 50
 51    workers: Optional[int], default None
 52        If provided, limit the verification to this many threads.
 53        Use a value of `1` to sync chunks in series.
 54
 55    debug: bool, default False
 56        Verbosity toggle.
 57
 58    kwargs: Any
 59        All keyword arguments are passed to `pipe.sync()`.
 60
 61    Returns
 62    -------
 63    A SuccessTuple indicating whether the pipe was successfully resynced.
 64    """
 65    from meerschaum.utils.pool import get_pool
 66    from meerschaum.utils.misc import interval_str
 67    workers = self.get_num_workers(workers)
 68
 69    ### Skip configured bounding in parameters
 70    ### if `bounded` is explicitly `False`.
 71    bound_time = (
 72        self.get_bound_time(debug=debug)
 73        if bounded is not False
 74        else None
 75    )
 76    if bounded is None:
 77        bounded = bound_time is not None
 78
 79    if bounded and begin is None:
 80        begin = (
 81            bound_time
 82            if bound_time is not None
 83            else self.get_sync_time(newest=False, debug=debug)
 84        )
 85    if bounded and end is None:
 86        end = self.get_sync_time(newest=True, debug=debug)
 87
 88    if bounded and end is not None:
 89        end += (
 90            timedelta(minutes=1)
 91            if isinstance(end, datetime)
 92            else 1
 93        )
 94
 95    sync_less_than_begin = not bounded and begin is None
 96    sync_greater_than_end = not bounded and end is None
 97
 98    cannot_determine_bounds = not self.exists(debug=debug)
 99
100    if cannot_determine_bounds:
101        sync_success, sync_msg = self.sync(
102            begin = begin,
103            end = end,
104            params = params,
105            workers = workers,
106            debug = debug,
107            **kwargs
108        )
109        if not sync_success:
110            return sync_success, sync_msg
111        if deduplicate:
112            return self.deduplicate(
113                begin = begin,
114                end = end,
115                params = params,
116                workers = workers,
117                debug = debug,
118                **kwargs
119            )
120        return sync_success, sync_msg
121
122
123    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
124    chunk_bounds = self.get_chunk_bounds(
125        begin = begin,
126        end = end,
127        chunk_interval = chunk_interval,
128        bounded = bounded,
129        debug = debug,
130    )
131
132    ### Consider it a success if no chunks need to be verified.
133    if not chunk_bounds:
134        if deduplicate:
135            return self.deduplicate(
136                begin = begin,
137                end = end,
138                params = params,
139                workers = workers,
140                debug = debug,
141                **kwargs
142            )
143        return True, f"Could not determine chunks between '{begin}' and '{end}'; nothing to do."
144
145    begin_to_print = (
146        begin
147        if begin is not None
148        else (
149            chunk_bounds[0][0]
150            if bounded
151            else chunk_bounds[0][1]
152        )
153    )
154    end_to_print = (
155        end
156        if end is not None
157        else (
158            chunk_bounds[-1][1]
159            if bounded
160            else chunk_bounds[-1][0]
161        )
162    )
163
164    info(
165        f"Syncing {len(chunk_bounds)} chunk" + ('s' if len(chunk_bounds) != 1 else '')
166        + f" ({'un' if not bounded else ''}bounded)"
167        + f" of size '{interval_str(chunk_interval)}'"
168        + f" between '{begin_to_print}' and '{end_to_print}'."
169    )
170
171    pool = get_pool(workers=workers)
172
173    ### Dictionary of the form bounds -> success_tuple, e.g.:
174    ### {
175    ###    (2023-01-01, 2023-01-02): (True, "Success")
176    ### }
177    bounds_success_tuples = {}
178    def process_chunk_bounds(
179            chunk_begin_and_end: Tuple[
180                Union[int, datetime],
181                Union[int, datetime]
182            ]
183        ):
184        if chunk_begin_and_end in bounds_success_tuples:
185            return chunk_begin_and_end, bounds_success_tuples[chunk_begin_and_end]
186
187        chunk_begin, chunk_end = chunk_begin_and_end
188        return chunk_begin_and_end, self.sync(
189            begin = chunk_begin,
190            end = chunk_end,
191            params = params,
192            workers = workers,
193            debug = debug,
194            **kwargs
195        )
196
197    ### If we have more than one chunk, attempt to sync the first one and return if it fails.
198    if len(chunk_bounds) > 1:
199        first_chunk_bounds = chunk_bounds[0]
200        (
201            (first_begin, first_end),
202            (first_success, first_msg)
203        ) = process_chunk_bounds(first_chunk_bounds)
204        if not first_success:
205            return (
206                first_success,
207                f"\n{first_begin} - {first_end}\n"
208                + f"Failed to sync first chunk:\n{first_msg}"
209            )
210        bounds_success_tuples[first_chunk_bounds] = (first_success, first_msg)
211
212    bounds_success_tuples.update(dict(pool.map(process_chunk_bounds, chunk_bounds)))
213    bounds_success_bools = {bounds: tup[0] for bounds, tup in bounds_success_tuples.items()}
214
215    message_header = f"{begin_to_print} - {end_to_print}"
216    if all(bounds_success_bools.values()):
217        msg = get_chunks_success_message(bounds_success_tuples, header=message_header)
218        if deduplicate:
219            deduplicate_success, deduplicate_msg = self.deduplicate(
220                begin = begin,
221                end = end,
222                params = params,
223                workers = workers,
224                debug = debug,
225                **kwargs
226            )
227            return deduplicate_success, msg + '\n\n' + deduplicate_msg
228        return True, msg
229
230    chunk_bounds_to_resync = [
231        bounds
232        for bounds, success in zip(chunk_bounds, bounds_success_bools)
233        if not success
234    ]
235    bounds_to_print = [
236        f"{bounds[0]} - {bounds[1]}"
237        for bounds in chunk_bounds_to_resync
238    ]
239    if bounds_to_print:
240        warn(
241            f"Will resync the following failed chunks:\n    "
242            + '\n    '.join(bounds_to_print),
243            stack = False,
244        )
245
246    retry_bounds_success_tuples = dict(pool.map(process_chunk_bounds, chunk_bounds_to_resync))
247    bounds_success_tuples.update(retry_bounds_success_tuples)
248    retry_bounds_success_bools = {
249        bounds: tup[0]
250        for bounds, tup in retry_bounds_success_tuples.items()
251    }
252
253    if all(retry_bounds_success_bools.values()):
254        message = (
255            get_chunks_success_message(bounds_success_tuples, header=message_header)
256            + f"\nRetried {len(chunk_bounds_to_resync)} chunks."
257        )
258        if deduplicate:
259            deduplicate_success, deduplicate_msg = self.deduplicate(
260                begin = begin,
261                end = end,
262                params = params,
263                workers = workers,
264                debug = debug,
265                **kwargs
266            )
267            return deduplicate_success, message + '\n\n' + deduplicate_msg
268        return True, message
269
270    message = get_chunks_success_message(bounds_success_tuples, header=message_header)
271    if deduplicate:
272        deduplicate_success, deduplicate_msg = self.deduplicate(
273            begin = begin,
274            end = end,
275            params = params,
276            workers = workers,
277            debug = debug,
278            **kwargs
279        )
280        return deduplicate_success, message + '\n\n' + deduplicate_msg
281    return False, message

Verify the contents of the pipe by resyncing its interval.

Parameters
  • begin (Union[datetime, int, None], default None): If specified, only verify rows greater than or equal to this value.
  • end (Union[datetime, int, None], default None): If specified, only verify rows less than this value.
  • params (Optional[Dict[str, Any]], default None): If provided, only verify rows matching these parameters.
  • chunk_interval (Union[timedelta, int, None], default None): If provided, use this as the size of the chunk boundaries. Defaults to the value set in pipe.parameters['chunk_minutes'] (1440).
  • bounded (Optional[bool], default None): If True, do not verify older than the oldest sync time or newer than the newest. If False, verify unbounded syncs outside of the new and old sync times. The default behavior (None) is to bound only if a bound interval is set (e.g. pipe.parameters['verify']['bound_days']).
  • deduplicate (bool, default False): If True, deduplicate the pipe's table after the verification syncs.
  • workers (Optional[int], default None): If provided, limit the verification to this many threads. Use a value of 1 to sync chunks in series.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All keyword arguments are passed to pipe.sync().
Returns
  • A SuccessTuple indicating whether the pipe was successfully resynced.
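`verify` walks the pipe's interval in chunks of `chunk_interval`. A simplified sketch of how a bounded interval is split into chunk bounds (the real `pipe.get_chunk_bounds()` also handles unbounded intervals and integer axes):

```python
from datetime import datetime, timedelta

def get_chunk_bounds(begin, end, chunk_interval):
    """Split [begin, end) into consecutive (chunk_begin, chunk_end) pairs."""
    bounds = []
    chunk_begin = begin
    while chunk_begin < end:
        # Clamp the final chunk so it never extends past `end`.
        chunk_end = min(chunk_begin + chunk_interval, end)
        bounds.append((chunk_begin, chunk_end))
        chunk_begin = chunk_end
    return bounds

bounds = get_chunk_bounds(
    datetime(2023, 1, 1), datetime(2023, 1, 4), timedelta(days=1),
)
# three day-sized chunks covering the interval
```

Each pair then becomes a `begin` / `end` argument to `pipe.sync()`, so failed chunks can be retried independently.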
def get_bound_interval(self, debug: bool = False) -> Union[datetime.timedelta, int, NoneType]:
353def get_bound_interval(self, debug: bool = False) -> Union[timedelta, int, None]:
354    """
355    Return the interval used to determine the bound time (limit for verification syncs).
356    If the datetime axis is an integer, just return its value.
357
358    Below are the supported keys for the bound interval:
359
360        - `pipe.parameters['verify']['bound_minutes']`
361        - `pipe.parameters['verify']['bound_hours']`
362        - `pipe.parameters['verify']['bound_days']`
363        - `pipe.parameters['verify']['bound_weeks']`
364        - `pipe.parameters['verify']['bound_years']`
365        - `pipe.parameters['verify']['bound_seconds']`
366
367    If multiple keys are present, the first on this priority list will be used.
368
369    Returns
370    -------
371    A `timedelta` or `int` value to be used to determine the bound time.
372    """
373    verify_params = self.parameters.get('verify', {})
374    prefix = 'bound_'
375    suffixes_to_check = ('minutes', 'hours', 'days', 'weeks', 'years', 'seconds')
376    keys_to_search = {
377        key: val
378        for key, val in verify_params.items()
379        if key.startswith(prefix)
380    }
381    bound_time_key, bound_time_value = None, None
382    for key, value in keys_to_search.items():
383        for suffix in suffixes_to_check:
384            if key == prefix + suffix:
385                bound_time_key = key
386                bound_time_value = value
387                break
388        if bound_time_key is not None:
389            break
390
391    if bound_time_value is None:
392        return bound_time_value
393
394    dt_col = self.columns.get('datetime', None)
395    if not dt_col:
396        return bound_time_value
397
398    dt_typ = self.dtypes.get(dt_col, 'datetime64[ns]')
399    if 'int' in dt_typ.lower():
400        return int(bound_time_value)
401
402    interval_type = bound_time_key.replace(prefix, '')
403    return timedelta(**{interval_type: bound_time_value})

Return the interval used to determine the bound time (limit for verification syncs). If the datetime axis is an integer, just return its value.

Below are the supported keys for the bound interval:

- `pipe.parameters['verify']['bound_minutes']`
- `pipe.parameters['verify']['bound_hours']`
- `pipe.parameters['verify']['bound_days']`
- `pipe.parameters['verify']['bound_weeks']`
- `pipe.parameters['verify']['bound_years']`
- `pipe.parameters['verify']['bound_seconds']`

If multiple keys are present, the first on this priority list will be used.

Returns
  • A timedelta or int value to be used to determine the bound time.
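The key lookup above amounts to picking the first configured `bound_*` suffix and expanding it with `timedelta(**{suffix: value})`. A sketch assuming a datetime (not integer) axis and following the documented priority order; note that `timedelta` has no `years` argument, so that case is approximated here:

```python
from datetime import timedelta

def get_bound_interval(verify_params: dict):
    """Build a timedelta from the first configured `bound_*` key (a sketch)."""
    prefix = 'bound_'
    for suffix in ('minutes', 'hours', 'days', 'weeks', 'years', 'seconds'):
        key = prefix + suffix
        if key in verify_params:
            if suffix == 'years':
                # `timedelta` has no `years` argument; approximate with days.
                return timedelta(days=365.25 * verify_params[key])
            return timedelta(**{suffix: verify_params[key]})
    return None

assert get_bound_interval({'bound_days': 366}) == timedelta(days=366)
# 'minutes' outranks 'days' on the priority list:
assert get_bound_interval({'bound_days': 1, 'bound_minutes': 30}) == timedelta(minutes=30)
```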
def get_bound_time(self, debug: bool = False) -> Union[datetime.datetime, int, NoneType]:
406def get_bound_time(self, debug: bool = False) -> Union[datetime, int, None]:
407    """
408    The bound time is the limit at which long-running verification syncs should stop.
409    A value of `None` means verification syncs should be unbounded.
410
411    Like deriving a backtrack time from `pipe.get_sync_time()`,
412    the bound time is the sync time minus a large window (e.g. 366 days).
413
414    Verification syncs are unbounded (i.e. `bound_time is None`)
415    if the bound time falls before the oldest sync time.
416
417    Returns
418    -------
419    A `datetime` or `int` corresponding to the
420    `begin` bound for verification and deduplication syncs.
421    """ 
422    bound_interval = self.get_bound_interval(debug=debug)
423    if bound_interval is None:
424        return None
425
426    sync_time = self.get_sync_time(debug=debug)
427    if sync_time is None:
428        return None
429
430    bound_time = sync_time - bound_interval
431    oldest_sync_time = self.get_sync_time(newest=False, debug=debug)
432
433    return (
434        bound_time
435        if bound_time > oldest_sync_time
436        else None
437    )

The bound time is the limit at which long-running verification syncs should stop. A value of None means verification syncs should be unbounded.

Like deriving a backtrack time from pipe.get_sync_time(), the bound time is the sync time minus a large window (e.g. 366 days).

Verification syncs are unbounded (i.e. bound_time is None) if the oldest sync time is newer than the computed bound time.

Returns
  • A datetime or int corresponding to the begin bound for verification and deduplication syncs.
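The arithmetic described above (newest sync time minus the bound interval, falling back to unbounded when the result would not pass the oldest sync time) can be sketched independently of a live pipe. The function name and arguments here are illustrative, not part of the library's API.

```python
from datetime import datetime, timedelta

# Hypothetical sketch of the bound-time rule: subtract the bound interval
# from the newest sync time; return None (unbounded) if the result does not
# land strictly after the oldest sync time.
def compute_bound_time(newest, oldest, bound_interval):
    if bound_interval is None or newest is None:
        return None
    bound_time = newest - bound_interval
    return bound_time if bound_time > oldest else None

print(compute_bound_time(
    datetime(2024, 1, 1), datetime(2020, 1, 1), timedelta(days=366)
))  # 2022-12-31 00:00:00
```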
def delete(self, drop: bool = True, debug: bool = False, **kw) -> Tuple[bool, str]:
12def delete(
13        self,
14        drop: bool = True,
15        debug: bool = False,
16        **kw
17    ) -> SuccessTuple:
18    """
19    Call the Pipe's instance connector's `delete_pipe()` method.
20
21    Parameters
22    ----------
23    drop: bool, default True
24        If `True`, drop the pipe's target table.
25
26    debug : bool, default False
27        Verbosity toggle.
28
29    Returns
30    -------
31    A `SuccessTuple` of success (`bool`), message (`str`).
32
33    """
34    import os, pathlib
35    from meerschaum.utils.warnings import warn
36    from meerschaum.utils.venv import Venv
37    from meerschaum.connectors import get_connector_plugin
38
39    if self.temporary:
40        return (
41            False,
42            "Cannot delete pipes created with `temporary=True` (read-only). "
43            + "You may want to call `pipe.drop()` instead."
44        )
45
46    if self.cache_pipe is not None:
47        _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw)
48        if not _drop_cache_tuple[0]:
49            warn(_drop_cache_tuple[1])
50        if getattr(self.cache_connector, 'flavor', None) == 'sqlite':
51            _cache_db_path = pathlib.Path(self.cache_connector.database)
52            try:
53                os.remove(_cache_db_path)
54            except Exception as e:
55                warn(f"Could not delete cache file '{_cache_db_path}' for {self}:\n{e}")
56
57    if drop:
58        drop_success, drop_msg = self.drop(debug=debug)
59        if not drop_success:
60            warn(f"Failed to drop {self}:\n{drop_msg}")
61
62    with Venv(get_connector_plugin(self.instance_connector)):
63        result = self.instance_connector.delete_pipe(self, debug=debug, **kw)
64
65    if not isinstance(result, tuple):
66        return False, f"Received an unexpected result from '{self.instance_connector}': {result}"
67
68    if result[0]:
69        to_delete = ['_id']
70        for member in to_delete:
71            if member in self.__dict__:
72                del self.__dict__[member]
73    return result

Call the Pipe's instance connector's delete_pipe() method.

Parameters
  • drop (bool, default True): If True, drop the pipe's target table.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success (bool), message (str).
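Every Meerschaum action returns a `SuccessTuple`: a plain `(bool, str)` pair. A minimal, hypothetical stub mirroring the temporary-pipe guard above shows how callers typically unpack it (the stub is for illustration only, not the library's method):

```python
# Hypothetical stand-in for `pipe.delete()` demonstrating the
# SuccessTuple (bool, str) convention used throughout Meerschaum.
def delete_stub(temporary: bool):
    if temporary:
        return (
            False,
            "Cannot delete pipes created with `temporary=True` (read-only). "
            "You may want to call `pipe.drop()` instead."
        )
    return True, "Success"

success, msg = delete_stub(temporary=True)
if not success:
    print(msg)  # callers usually warn or bail on failure
```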
def drop(self, debug: bool = False, **kw: Any) -> Tuple[bool, str]:
13def drop(
14        self,
15        debug: bool = False,
16        **kw: Any
17    ) -> SuccessTuple:
18    """
19    Call the Pipe's instance connector's `drop_pipe()` method.
20
21    Parameters
22    ----------
23    debug: bool, default False
24        Verbosity toggle.
25
26    Returns
27    -------
28    A `SuccessTuple` of success, message.
29
30    """
31    self._exists = False
32    from meerschaum.utils.warnings import warn
33    from meerschaum.utils.venv import Venv
34    from meerschaum.connectors import get_connector_plugin
35
36    if self.cache_pipe is not None:
37        _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw)
38        if not _drop_cache_tuple[0]:
39            warn(_drop_cache_tuple[1])
40
41    with Venv(get_connector_plugin(self.instance_connector)):
42        result = self.instance_connector.drop_pipe(self, debug=debug, **kw)
43    return result

Call the Pipe's instance connector's drop_pipe() method.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success, message.
def clear( self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kwargs: Any) -> Tuple[bool, str]:
13def clear(
14        self,
15        begin: Optional[datetime.datetime] = None,
16        end: Optional[datetime.datetime] = None,
17        params: Optional[Dict[str, Any]] = None,
18        debug: bool = False,
19        **kwargs: Any
20    ) -> SuccessTuple:
21    """
22    Call the Pipe's instance connector's `clear_pipe` method.
23
24    Parameters
25    ----------
26    begin: Optional[datetime.datetime], default None
27        If provided, only remove rows newer than this datetime value.
28
29    end: Optional[datetime.datetime], default None
30        If provided, only remove rows older than this datetime value (not including `end`).
31
32    params: Optional[Dict[str, Any]], default None
33         See `meerschaum.utils.sql.build_where`.
34
35    debug: bool, default False
36        Verbosity toggle.
37
38    Returns
39    -------
40    A `SuccessTuple` corresponding to whether this procedure completed successfully.
41
42    Examples
43    --------
44    >>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
45    >>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
46    >>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
47    >>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
48    >>> 
49    >>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
50    >>> pipe.get_data()
51              dt
52    0 2020-01-01
53
54    """
55    from meerschaum.utils.warnings import warn
56    from meerschaum.utils.venv import Venv
57    from meerschaum.connectors import get_connector_plugin
58
59    if self.cache_pipe is not None:
60        success, msg = self.cache_pipe.clear(
61            begin = begin,
62            end = end,
63            params = params,
64            debug = debug,
65            **kwargs
66        )
67        if not success:
68            warn(msg)
69
70    with Venv(get_connector_plugin(self.instance_connector)):
71        return self.instance_connector.clear_pipe(
72            self,
73            begin = begin,
74            end = end,
75            params = params,
76            debug = debug,
77            **kwargs
78        )

Call the Pipe's instance connector's clear_pipe method.

Parameters
  • begin (Optional[datetime.datetime], default None): If provided, only remove rows newer than this datetime value.
  • end (Optional[datetime.datetime], default None): If provided, only remove rows older than this datetime value (not including end).
  • params (Optional[Dict[str, Any]], default None): See meerschaum.utils.sql.build_where.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple corresponding to whether this procedure completed successfully.
Examples
>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
>>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
>>> 
>>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
>>> pipe.get_data()
          dt
0 2020-01-01
def deduplicate( self, begin: 'Union[datetime, int, None]' = None, end: 'Union[datetime, int, None]' = None, params: Optional[Dict[str, Any]] = None, chunk_interval: 'Union[datetime, int, None]' = None, bounded: Optional[bool] = None, workers: Optional[int] = None, debug: bool = False, _use_instance_method: bool = True, **kwargs: Any) -> Tuple[bool, str]:
 15def deduplicate(
 16        self,
 17        begin: Union[datetime, int, None] = None,
 18        end: Union[datetime, int, None] = None,
 19        params: Optional[Dict[str, Any]] = None,
 20        chunk_interval: Union[datetime, int, None] = None,
 21        bounded: Optional[bool] = None,
 22        workers: Optional[int] = None,
 23        debug: bool = False,
 24        _use_instance_method: bool = True,
 25        **kwargs: Any
 26    ) -> SuccessTuple:
 27    """
 28    Call the Pipe's instance connector's `delete_duplicates` method to delete duplicate rows.
 29
 30    Parameters
 31    ----------
 32    begin: Union[datetime, int, None], default None
 33        If provided, only deduplicate rows newer than this datetime value.
 34
 35    end: Union[datetime, int, None], default None
 36        If provided, only deduplicate rows older than this datetime value (not including `end`).
 37
 38    params: Optional[Dict[str, Any]], default None
 39        Restrict deduplication to this filter (for multiplexed data streams).
 40        See `meerschaum.utils.sql.build_where`.
 41
 42    chunk_interval: Union[timedelta, int, None], default None
 43        If provided, use this for the chunk bounds.
 44        Defaults to the value set in `pipe.parameters['chunk_minutes']` (1440).
 45
 46    bounded: Optional[bool], default None
 47        Only check outside the oldest and newest sync times if bounded is explicitly `False`.
 48
 49    workers: Optional[int], default None
 50        If the instance connector is thread-safe, limit concurrent syncs to this many threads.
 51
 52    debug: bool, default False
 53        Verbosity toggle.
 54
 55    kwargs: Any
 56        All other keyword arguments are passed to
 57        `pipe.sync()`, `pipe.clear()`, and `pipe.get_data()`.
 58
 59    Returns
 60    -------
 61    A `SuccessTuple` corresponding to whether all of the chunks were successfully deduplicated.
 62    """
 63    from meerschaum.utils.warnings import warn, info
 64    from meerschaum.utils.misc import interval_str, items_str
 65    from meerschaum.utils.venv import Venv
 66    from meerschaum.connectors import get_connector_plugin
 67    from meerschaum.utils.pool import get_pool
 68
 69    if self.cache_pipe is not None:
 70        success, msg = self.cache_pipe.deduplicate(
 71            begin = begin,
 72            end = end,
 73            params = params,
 74            bounded = bounded,
 75            debug = debug,
 76            _use_instance_method = _use_instance_method,
 77            **kwargs
 78        )
 79        if not success:
 80            warn(msg)
 81
 82    workers = self.get_num_workers(workers=workers)
 83    pool = get_pool(workers=workers)
 84
 85    if _use_instance_method:
 86        with Venv(get_connector_plugin(self.instance_connector)):
 87            if hasattr(self.instance_connector, 'deduplicate_pipe'):
 88                return self.instance_connector.deduplicate_pipe(
 89                    self,
 90                    begin = begin,
 91                    end = end,
 92                    params = params,
 93                    bounded = bounded,
 94                    debug = debug,
 95                    **kwargs
 96                )
 97
 98    ### Only unbound if explicitly False.
 99    if bounded is None:
100        bounded = True
101    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
102
103    bound_time = self.get_bound_time(debug=debug)
104    if bounded and begin is None:
105        begin = (
106            bound_time
107            if bound_time is not None
108            else self.get_sync_time(newest=False, debug=debug)
109        )
110    if bounded and end is None:
111        end = self.get_sync_time(newest=True, debug=debug)
112
113    if bounded and end is not None:
114        end += (
115            timedelta(minutes=1)
116            if isinstance(end, datetime)
117            else 1
118        )
119
120    chunk_bounds = self.get_chunk_bounds(
121        bounded = bounded,
122        begin = begin,
123        end = end,
124        chunk_interval = chunk_interval,
125        debug = debug,
126    )
127
128    indices = [col for col in self.columns.values() if col]
129    if not indices:
130        return False, "Cannot deduplicate without index columns."
131    dt_col = self.columns.get('datetime', None)
132
133    def process_chunk_bounds(bounds) -> Tuple[
134            Tuple[
135                Union[datetime, int, None],
136                Union[datetime, int, None]
137            ],
138            SuccessTuple
139        ]:
140        ### Only selecting the index values here to keep bandwidth down.
141        chunk_begin, chunk_end = bounds
142        chunk_df = self.get_data(
143            select_columns = indices, 
144            begin = chunk_begin,
145            end = chunk_end,
146            params = params,
147            debug = debug,
148        )
149        if chunk_df is None:
150            return bounds, (True, "")
151        existing_chunk_len = len(chunk_df)
152        deduped_chunk_df = chunk_df.drop_duplicates(keep='last')
153        deduped_chunk_len = len(deduped_chunk_df)
154
155        if existing_chunk_len == deduped_chunk_len:
156            return bounds, (True, "")
157
158        chunk_msg_header = f"\n{chunk_begin} - {chunk_end}"
159        chunk_msg_body = ""
160
161        full_chunk = self.get_data(
162            begin = chunk_begin,
163            end = chunk_end,
164            params = params,
165            debug = debug,
166        )
167        if full_chunk is None or len(full_chunk) == 0:
168            return bounds, (True, f"{chunk_msg_header}\nChunk is empty, skipping...")
169
170        chunk_indices = [ix for ix in indices if ix in full_chunk.columns]
171        if not chunk_indices:
172            return bounds, (False, f"None of {items_str(indices)} were present in chunk.")
173        try:
174            full_chunk = full_chunk.drop_duplicates(
175                subset = chunk_indices,
176                keep = 'last'
177            ).reset_index(
178                drop = True,
179            )
180        except Exception as e:
181            return (
182                bounds,
183                (False, f"Failed to deduplicate chunk on {items_str(chunk_indices)}:\n({e})")
184            )
185
186        clear_success, clear_msg = self.clear(
187            begin = chunk_begin,
188            end = chunk_end,
189            params = params,
190            debug = debug,
191        )
192        if not clear_success:
193            chunk_msg_body += f"Failed to clear chunk while deduplicating:\n{clear_msg}\n"
194            warn(chunk_msg_body)
195
196        sync_success, sync_msg = self.sync(full_chunk, debug=debug)
197        if not sync_success:
198            chunk_msg_body += f"Failed to sync chunk while deduplicating:\n{sync_msg}\n"
199         
200        ### Finally check if the deduplication worked.
201        chunk_rowcount = self.get_rowcount(
202            begin = chunk_begin,
203            end = chunk_end,
204            params = params,
205            debug = debug,
206        )
207        if chunk_rowcount != deduped_chunk_len:
208            return bounds, (
209                False, (
210                    chunk_msg_header + "\n"
211                    + chunk_msg_body + ("\n" if chunk_msg_body else '')
212                    + "Chunk rowcounts still differ ("
213                    + f"{chunk_rowcount} rowcount vs {deduped_chunk_len} chunk length)."
214                )
215            )
216
217        return bounds, (
218            True, (
219                chunk_msg_header + "\n"
220                + chunk_msg_body + ("\n" if chunk_msg_body else '')
221                + f"Deduplicated chunk from {existing_chunk_len} to {chunk_rowcount} rows."
222            )
223        )
224
225    info(
226        f"Deduplicating {len(chunk_bounds)} chunk"
227        + ('s' if len(chunk_bounds) != 1 else '')
228        + f" ({'un' if not bounded else ''}bounded)"
229        + f" of size '{interval_str(chunk_interval)}'"
230        + f" on {self}."
231    )
232    bounds_success_tuples = dict(pool.map(process_chunk_bounds, chunk_bounds))
233    bounds_successes = {
234        bounds: success_tuple
235        for bounds, success_tuple in bounds_success_tuples.items()
236        if success_tuple[0]
237    }
238    bounds_failures = {
239        bounds: success_tuple
240        for bounds, success_tuple in bounds_success_tuples.items()
241        if not success_tuple[0]
242    }
243
244    ### No need to retry if everything failed.
245    if len(bounds_failures) > 0 and len(bounds_successes) == 0:
246        return (
247            False,
248            (
249                f"Failed to deduplicate {len(bounds_failures)} chunk"
250                + ('s' if len(bounds_failures) != 1 else '')
251                + ".\n"
252                + "\n".join([msg for _, (_, msg) in bounds_failures.items() if msg])
253            )
254        )
255
256    retry_bounds = [bounds for bounds in bounds_failures]
257    if not retry_bounds:
258        return (
259            True,
260            (
261                f"Successfully deduplicated {len(bounds_successes)} chunk"
262                + ('s' if len(bounds_successes) != 1 else '')
263                + ".\n"
264                + "\n".join([msg for _, (_, msg) in bounds_successes.items() if msg])
265            ).rstrip('\n')
266        )
267
268    info(f"Retrying {len(retry_bounds)} chunks for {self}...")
269    retry_bounds_success_tuples = dict(pool.map(process_chunk_bounds, retry_bounds))
270    retry_bounds_successes = {
271        bounds: success_tuple
272        for bounds, success_tuple in retry_bounds_success_tuples.items()
273        if success_tuple[0]
274    }
275    retry_bounds_failures = {
276        bounds: success_tuple
277        for bounds, success_tuple in retry_bounds_success_tuples.items()
278        if not success_tuple[0]
279    }
280
281    bounds_successes.update(retry_bounds_successes)
282    if not retry_bounds_failures:
283        return (
284            True,
285            (
286                f"Successfully deduplicated {len(bounds_successes)} chunk"
287                + ('s' if len(bounds_successes) != 1 else '')
288                + f" ({len(retry_bounds_successes)} retried):\n"
289                + "\n".join([msg for _, (_, msg) in bounds_successes.items() if msg])
290            ).rstrip('\n')
291        )
292
293    return (
294        False,
295        (
296            f"Failed to deduplicate {len(retry_bounds_failures)} chunk"
297            + ('s' if len(retry_bounds_failures) != 1 else '')
298            + ".\n"
299            + "\n".join([msg for _, (_, msg) in retry_bounds_failures.items() if msg])
300        ).rstrip('\n')
301    )

Call the Pipe's instance connector's delete_duplicates method to delete duplicate rows.

Parameters
  • begin (Union[datetime, int, None], default None): If provided, only deduplicate rows newer than this datetime value.
  • end (Union[datetime, int, None], default None): If provided, only deduplicate rows older than this datetime value (not including end).
  • params (Optional[Dict[str, Any]], default None): Restrict deduplication to this filter (for multiplexed data streams). See meerschaum.utils.sql.build_where.
  • chunk_interval (Union[timedelta, int, None], default None): If provided, use this for the chunk bounds. Defaults to the value set in pipe.parameters['chunk_minutes'] (1440).
  • bounded (Optional[bool], default None): Only check outside the oldest and newest sync times if bounded is explicitly False.
  • workers (Optional[int], default None): If the instance connector is thread-safe, limit concurrent syncs to this many threads.
  • debug (bool, default False): Verbosity toggle.
  • kwargs (Any): All other keyword arguments are passed to pipe.sync(), pipe.clear(), and pipe.get_data().
Returns
  • A SuccessTuple corresponding to whether all of the chunks were successfully deduplicated.
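The per-chunk work above boils down to: fetch a chunk, drop duplicate index tuples keeping the last occurrence, and re-sync if anything changed. Below is a dependency-free sketch of that core step with rows as plain dicts; note that pandas' `drop_duplicates(keep='last')` keeps the last occurrence's position, whereas this dict-based illustration keeps the first position with the last values. All names here are hypothetical.

```python
# Illustrative sketch of deduplicating one chunk on its index columns,
# keeping the most recent row for each index tuple (akin to keep='last').
def dedupe_chunk(rows, index_cols):
    seen = {}
    for row in rows:
        key = tuple(row[c] for c in index_cols)
        seen[key] = row  # later rows overwrite earlier duplicates
    return list(seen.values())

rows = [
    {'id': 1, 'dt': '2023-01-01', 'val': 10},
    {'id': 1, 'dt': '2023-01-01', 'val': 20},  # duplicate index; this one wins
    {'id': 2, 'dt': '2023-01-01', 'val': 30},
]
print(dedupe_chunk(rows, ['id', 'dt']))
```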
def bootstrap( self, debug: bool = False, yes: bool = False, force: bool = False, noask: bool = False, shell: bool = False, **kw) -> Tuple[bool, str]:
 13def bootstrap(
 14        self,
 15        debug: bool = False,
 16        yes: bool = False,
 17        force: bool = False,
 18        noask: bool = False,
 19        shell: bool = False,
 20        **kw
 21    ) -> SuccessTuple:
 22    """
 23    Prompt the user to create a pipe's requirements all from one method.
 24    This method shouldn't be used in any automated scripts because it interactively
 25    prompts the user and therefore may hang.
 26
 27    Parameters
 28    ----------
 29    debug: bool, default False
 30        Verbosity toggle.
 31
 32    yes: bool, default False
 33        Print the questions and automatically agree.
 34
 35    force: bool, default False
 36        Skip the questions and agree anyway.
 37
 38    noask: bool, default False
 39        Print the questions but go with the default answer.
 40
 41    shell: bool, default False
 42        Used to determine if we are in the interactive shell.
 43
 44    Returns
 45    -------
 46    A `SuccessTuple` corresponding to the success of this procedure.
 47
 48    """
 49
 50    from meerschaum.utils.warnings import warn, info, error
 51    from meerschaum.utils.prompt import prompt, yes_no
 52    from meerschaum.utils.formatting import pprint
 53    from meerschaum.config import get_config
 54    from meerschaum.utils.formatting._shell import clear_screen
 55    from meerschaum.utils.formatting import print_tuple
 56    from meerschaum.actions import actions
 57    from meerschaum.utils.venv import Venv
 58    from meerschaum.connectors import get_connector_plugin
 59
 60    _clear = get_config('shell', 'clear_screen', patch=True)
 61
 62    if self.get_id(debug=debug) is not None:
 63        delete_tuple = self.delete(debug=debug)
 64        if not delete_tuple[0]:
 65            return delete_tuple
 66
 67    if _clear:
 68        clear_screen(debug=debug)
 69
 70    _parameters = _get_parameters(self, debug=debug)
 71    self.parameters = _parameters
 72    pprint(self.parameters)
 73    try:
 74        prompt(
 75            f"\n    Press [Enter] to register {self} with the above configuration:",
 76            icon = False
 77        )
 78    except KeyboardInterrupt as e:
 79        return False, f"Aborting bootstrapping {self}."
 80
 81    with Venv(get_connector_plugin(self.instance_connector)):
 82        register_tuple = self.instance_connector.register_pipe(self, debug=debug)
 83
 84    if not register_tuple[0]:
 85        return register_tuple
 86
 87    if _clear:
 88        clear_screen(debug=debug)
 89
 90    try:
 91        if yes_no(
 92            f"Would you like to edit the definition for {self}?", yes=yes, noask=noask
 93        ):
 94            edit_tuple = self.edit_definition(debug=debug)
 95            if not edit_tuple[0]:
 96                return edit_tuple
 97
 98        if yes_no(f"Would you like to try syncing {self} now?", yes=yes, noask=noask):
 99            sync_tuple = actions['sync'](
100                ['pipes'],
101                connector_keys = [self.connector_keys],
102                metric_keys = [self.metric_key],
103                location_keys = [self.location_key],
104                mrsm_instance = str(self.instance_connector),
105                debug = debug,
106                shell = shell,
107            )
108            if not sync_tuple[0]:
109                return sync_tuple
110    except Exception as e:
111        return False, f"Failed to bootstrap {self}:\n" + str(e)
112
113    print_tuple((True, f"Finished bootstrapping {self}!"))
114    info(
115        f"You can edit this pipe later with `edit pipes` "
116        + "or set the definition with `edit pipes definition`.\n"
117        + "    To sync data into your pipe, run `sync pipes`."
118    )
119
120    return True, "Success"

Prompt the user to create a pipe's requirements all from one method. This method shouldn't be used in any automated scripts because it interactively prompts the user and therefore may hang.

Parameters
  • debug (bool, default False): Verbosity toggle.
  • yes (bool, default False): Print the questions and automatically agree.
  • force (bool, default False): Skip the questions and agree anyway.
  • noask (bool, default False): Print the questions but go with the default answer.
  • shell (bool, default False): Used to determine if we are in the interactive shell.
Returns
  • A SuccessTuple corresponding to the success of this procedure.
def enforce_dtypes( self, df: "'pd.DataFrame'", chunksize: Optional[int] = -1, safe_copy: bool = True, debug: bool = False) -> "'pd.DataFrame'":
14def enforce_dtypes(
15        self,
16        df: 'pd.DataFrame',
17        chunksize: Optional[int] = -1,
18        safe_copy: bool = True,
19        debug: bool = False,
20    ) -> 'pd.DataFrame':
21    """
22    Cast the input dataframe to the pipe's registered data types.
23    If the pipe does not exist and dtypes are not set, return the dataframe.
24    """
25    import traceback
26    from meerschaum.utils.warnings import warn
27    from meerschaum.utils.debug import dprint
28    from meerschaum.utils.dataframe import parse_df_datetimes, enforce_dtypes as _enforce_dtypes
29    from meerschaum.utils.packages import import_pandas
30    pd = import_pandas(debug=debug)
31    if df is None:
32        if debug:
33            dprint(
34                f"Received None instead of a DataFrame.\n"
35                + "    Skipping dtype enforcement..."
36            )
37        return df
38
39    pipe_dtypes = self.dtypes
40
41    try:
42        if isinstance(df, str):
43            df = parse_df_datetimes(
44                pd.read_json(StringIO(df)),
45                ignore_cols = [
46                    col
47                    for col, dtype in pipe_dtypes.items()
48                    if 'datetime' not in str(dtype)
49                ],
50                chunksize = chunksize,
51                debug = debug,
52            )
53        else:
54            df = parse_df_datetimes(
55                df,
56                ignore_cols = [
57                    col
58                    for col, dtype in pipe_dtypes.items()
59                    if 'datetime' not in str(dtype)
60                ],
61                chunksize = chunksize,
62                debug = debug,
63            )
64    except Exception as e:
65        warn(f"Unable to cast incoming data as a DataFrame...:\n{e}\n\n{traceback.format_exc()}")
66        return None
67
68    if not pipe_dtypes:
69        if debug:
70            dprint(
71                f"Could not find dtypes for {self}.\n"
72                + "    Skipping dtype enforcement..."
73            )
74        return df
75
76    return _enforce_dtypes(df, pipe_dtypes, safe_copy=safe_copy, debug=debug)

Cast the input dataframe to the pipe's registered data types. If the pipe does not exist and dtypes are not set, return the dataframe.
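What "enforcing dtypes" means can be illustrated with a minimal, hypothetical stub that casts plain rows by a registered dtype mapping. The real method delegates to `meerschaum.utils.dataframe.enforce_dtypes` on pandas DataFrames; the caster table and function below are simplified stand-ins.

```python
# Hypothetical casters keyed by pandas-style dtype names.
CASTERS = {'int64': int, 'float64': float, 'object': str}

def enforce_dtypes_stub(rows, dtypes):
    """Cast each column to its registered type; leave unknown columns as-is."""
    return [
        {
            col: CASTERS.get(dtypes.get(col, ''), lambda v: v)(val)
            for col, val in row.items()
        }
        for row in rows
    ]

print(enforce_dtypes_stub(
    [{'a': '1', 'b': '2.5', 'c': 'x'}],
    {'a': 'int64', 'b': 'float64'},
))  # [{'a': 1, 'b': 2.5, 'c': 'x'}]
```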

def infer_dtypes(self, persist: bool = False, debug: bool = False) -> Dict[str, Any]:
 79def infer_dtypes(self, persist: bool=False, debug: bool=False) -> Dict[str, Any]:
 80    """
 81    If `dtypes` is not set in `meerschaum.Pipe.parameters`,
 82    infer the data types from the underlying table if it exists.
 83
 84    Parameters
 85    ----------
 86    persist: bool, default False
 87        If `True`, persist the inferred data types to `meerschaum.Pipe.parameters`.
 88
 89    Returns
 90    -------
 91    A dictionary of strings containing the pandas data types for this Pipe.
 92    """
 93    if not self.exists(debug=debug):
 94        dtypes = {}
 95        if not self.columns:
 96            return {}
 97        dt_col = self.columns.get('datetime', None)
 98        if dt_col:
 99            if not self.parameters.get('dtypes', {}).get(dt_col, None):
100                dtypes[dt_col] = 'datetime64[ns]'
101        return dtypes
102
103    from meerschaum.utils.sql import get_pd_type
104    from meerschaum.utils.misc import to_pandas_dtype
105    columns_types = self.get_columns_types(debug=debug)
106
107    ### NOTE: get_columns_types() may return either the types as
108    ###       PostgreSQL- or Pandas-style.
109    dtypes = {
110        c: (
111            get_pd_type(t, allow_custom_dtypes=True)
112            if str(t).isupper()
113            else to_pandas_dtype(t)
114        )
115        for c, t in columns_types.items()
116    } if columns_types else {}
117    if persist:
118        self.dtypes = dtypes
119        self.edit(interactive=False, debug=debug)
120    return dtypes

If dtypes is not set in meerschaum.Pipe.parameters, infer the data types from the underlying table if it exists.

Parameters
  • persist (bool, default False): If True, persist the inferred data types to meerschaum.Pipe.parameters.
Returns
  • A dictionary of strings containing the pandas data types for this Pipe.
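The uppercase check above routes SQL-style type names (e.g. `TIMESTAMP`) to a converter while passing pandas-style names through unchanged. Here is a simplified sketch with a hypothetical mapping standing in for the library's `get_pd_type` / `to_pandas_dtype`:

```python
# Hypothetical SQL-to-pandas type mapping (illustrative, not exhaustive).
SQL_TO_PD = {
    'TIMESTAMP': 'datetime64[ns]',
    'BIGINT': 'int64',
    'DOUBLE PRECISION': 'float64',
    'TEXT': 'object',
}

def normalize_types(columns_types):
    """Route uppercase (SQL-style) names through the mapping; pass pandas-style through."""
    return {
        col: (SQL_TO_PD.get(typ, 'object') if str(typ).isupper() else typ)
        for col, typ in columns_types.items()
    }

print(normalize_types({'dt': 'TIMESTAMP', 'val': 'float64', 'name': 'TEXT'}))
```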
class Plugin:
 33class Plugin:
 34    """Handle packaging of Meerschaum plugins."""
 35    def __init__(
 36        self,
 37        name: str,
 38        version: Optional[str] = None,
 39        user_id: Optional[int] = None,
 40        required: Optional[List[str]] = None,
 41        attributes: Optional[Dict[str, Any]] = None,
 42        archive_path: Optional[pathlib.Path] = None,
 43        venv_path: Optional[pathlib.Path] = None,
 44        repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None,
 45        repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None,
 46    ):
 47        from meerschaum.config.static import STATIC_CONFIG
 48        sep = STATIC_CONFIG['plugins']['repo_separator']
 49        _repo = None
 50        if sep in name:
 51            try:
 52                name, _repo = name.split(sep)
 53            except Exception as e:
 54                error(f"Invalid plugin name: '{name}'")
 55        self._repo_in_name = _repo
 56
 57        if attributes is None:
 58            attributes = {}
 59        self.name = name
 60        self.attributes = attributes
 61        self.user_id = user_id
 62        self._version = version
 63        if required:
 64            self._required = required
 65        self.archive_path = (
 66            archive_path if archive_path is not None
 67            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
 68        )
 69        self.venv_path = (
 70            venv_path if venv_path is not None
 71            else VIRTENV_RESOURCES_PATH / self.name
 72        )
 73        self._repo_connector = repo_connector
 74        self._repo_keys = repo
 75
 76
 77    @property
 78    def repo_connector(self):
 79        """
 80        Return the repository connector for this plugin.
 81        NOTE: This imports the `connectors` module, which imports certain plugin modules.
 82        """
 83        if self._repo_connector is None:
 84            from meerschaum.connectors.parse import parse_repo_keys
 85
 86            repo_keys = self._repo_keys or self._repo_in_name
 87            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
 88                error(
 89                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
 90                )
 91            repo_connector = parse_repo_keys(repo_keys)
 92            self._repo_connector = repo_connector
 93        return self._repo_connector
 94
 95
 96    @property
 97    def version(self):
 98        """
 99        Return the plugin's version (`__version__`) if it's defined.
100        """
101        if self._version is None:
102            try:
103                self._version = self.module.__version__
104            except Exception as e:
105                self._version = None
106        return self._version
107
108
109    @property
110    def module(self):
111        """
112        Return the Python module of the underlying plugin.
113        """
114        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
115            if self.__file__ is None:
116                return None
117            from meerschaum.plugins import import_plugins
118            self._module = import_plugins(str(self), warn=False)
119        return self._module
120
121
122    @property
123    def __file__(self) -> Union[str, None]:
124        """
125        Return the file path (str) of the plugin if it exists, otherwise `None`.
126        """
127        if self.__dict__.get('_module', None) is not None:
128            return self.module.__file__
129
130        potential_dir = PLUGINS_RESOURCES_PATH / self.name
131        if (
132            potential_dir.exists()
133            and potential_dir.is_dir()
134            and (potential_dir / '__init__.py').exists()
135        ):
136            return str((potential_dir / '__init__.py').as_posix())
137
138        potential_file = PLUGINS_RESOURCES_PATH / (self.name + '.py')
139        if potential_file.exists() and not potential_file.is_dir():
140            return str(potential_file.as_posix())
141
142        return None
143
144
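The resolution order in `__file__` above (a package directory containing `__init__.py` wins over a single-file module of the same name) can be sketched with a hypothetical plugins root:

```python
# Standalone sketch of the __file__ resolution order above.
# `root` stands in for PLUGINS_RESOURCES_PATH; names are hypothetical.
import pathlib
import tempfile

def resolve_plugin_file(root: pathlib.Path, name: str):
    package_init = root / name / '__init__.py'
    if package_init.exists():
        return str(package_init.as_posix())
    single_file = root / f"{name}.py"
    if single_file.exists() and not single_file.is_dir():
        return str(single_file.as_posix())
    return None

with tempfile.TemporaryDirectory() as tmp:
    root = pathlib.Path(tmp)
    (root / 'example').mkdir()
    (root / 'example' / '__init__.py').touch()
    print(resolve_plugin_file(root, 'example'))  # .../example/__init__.py
    print(resolve_plugin_file(root, 'missing'))  # None
```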
145    @property
146    def requirements_file_path(self) -> Union[pathlib.Path, None]:
147        """
148        If a file named `requirements.txt` exists, return its path.
149        """
150        if self.__file__ is None:
151            return None
152        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
153        if not path.exists():
154            return None
155        return path
156
157
158    def is_installed(self, **kw) -> bool:
159        """
160        Check whether a plugin is correctly installed.
161
162        Returns
163        -------
164        A `bool` indicating whether the plugin's source files exist on disk.
165        """
166        return self.__file__ is not None
167
168
169    def make_tar(self, debug: bool = False) -> pathlib.Path:
170        """
171        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
172
173        Parameters
174        ----------
175        debug: bool, default False
176            Verbosity toggle.
177
178        Returns
179        -------
180        The `pathlib.Path` of the created archive file.
181
182        """
183        import tarfile, pathlib, subprocess, fnmatch
184        from meerschaum.utils.debug import dprint
185        from meerschaum.utils.packages import attempt_import
186        pathspec = attempt_import('pathspec', debug=debug)
187
188        if not self.__file__:
189            from meerschaum.utils.warnings import error
190            error(f"Could not find file for plugin '{self}'.")
191        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
192            path = self.__file__.replace('__init__.py', '')
193            is_dir = True
194        else:
195            path = self.__file__
196            is_dir = False
197
198        old_cwd = os.getcwd()
199        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
200        os.chdir(real_parent_path)
201
202        default_patterns_to_ignore = [
203            '.pyc',
204            '__pycache__/',
205            'eggs/',
206            '__pypackages__/',
207            '.git',
208        ]
209
210        def parse_gitignore() -> 'Set[str]':
211            gitignore_path = pathlib.Path(path) / '.gitignore'
212            if not gitignore_path.exists():
213                return set()
214            with open(gitignore_path, 'r', encoding='utf-8') as f:
215                gitignore_text = f.read()
216            return set(pathspec.PathSpec.from_lines(
217                pathspec.patterns.GitWildMatchPattern,
218                default_patterns_to_ignore + gitignore_text.splitlines()
219            ).match_tree(path))
220
221        patterns_to_ignore = parse_gitignore() if is_dir else set()
222
223        if debug:
224            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
225
226        with tarfile.open(self.archive_path, 'w:gz') as tarf:
227            if not is_dir:
228                tarf.add(f"{self.name}.py")
229            else:
230                for root, dirs, files in os.walk(self.name):
231                    for f in files:
232                        good_file = True
233                        fp = os.path.join(root, f)
234                        for pattern in patterns_to_ignore:
235                            if pattern in str(fp) or f.startswith('.'):
236                                good_file = False
237                                break
238                        if good_file:
239                            if debug:
240                                dprint(f"Adding '{fp}'...")
241                            tarf.add(fp)
242
243        ### clean up and change back to old directory
244        os.chdir(old_cwd)
245
246        ### change to 775 to avoid permissions issues with the API in a Docker container
247        self.archive_path.chmod(0o775)
248
249        if debug:
250            dprint(f"Created archive '{self.archive_path}'.")
251        return self.archive_path
252
253
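The archive step in `make_tar()` above can be sketched in isolation: walk a directory, skip ignored names, and write a `.tar.gz`. Note the patterns here are plain substrings for illustration; the real method builds gitignore-style patterns with `pathspec`:

```python
# Standalone sketch of the tarball walk in make_tar().
# Substring "ignore" patterns are a simplification of the pathspec logic.
import os
import pathlib
import tarfile
import tempfile

def archive_dir(src: pathlib.Path, dest: pathlib.Path, ignore=('__pycache__', '.pyc')):
    with tarfile.open(dest, 'w:gz') as tarf:
        for root, dirs, files in os.walk(src):
            for fname in files:
                fpath = os.path.join(root, fname)
                ### Skip ignored patterns and hidden files, like make_tar() does.
                if any(pat in fpath for pat in ignore) or fname.startswith('.'):
                    continue
                tarf.add(fpath, arcname=os.path.relpath(fpath, src.parent))
    return dest

with tempfile.TemporaryDirectory() as tmp:
    src = pathlib.Path(tmp) / 'example'
    src.mkdir()
    (src / '__init__.py').write_text('__version__ = "0.0.1"\n')
    (src / 'cache.pyc').touch()
    out = archive_dir(src, pathlib.Path(tmp) / 'example.tar.gz')
    with tarfile.open(out) as tarf:
        print(tarf.getnames())  # ['example/__init__.py']
```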
254    def install(
255            self,
256            force: bool = False,
257            debug: bool = False,
258        ) -> SuccessTuple:
259        """
260        Extract a plugin's tar archive to the plugins directory.
261        
262        This function checks whether the plugin is already installed and whether the
263        new version is greater than or equal to the existing installation's.
264
265        Parameters
266        ----------
267        force: bool, default False
268            If `True`, continue with installation, even if required packages fail to install.
269
270        debug: bool, default False
271            Verbosity toggle.
272
273        Returns
274        -------
275        A `SuccessTuple` of success (bool) and a message (str).
276
277        """
278        if self.full_name in _ongoing_installations:
279            return True, f"Already installing plugin '{self}'."
280        _ongoing_installations.add(self.full_name)
281        from meerschaum.utils.warnings import warn, error
282        if debug:
283            from meerschaum.utils.debug import dprint
284        import tarfile
285        import re
286        import ast
287        from meerschaum.plugins import sync_plugins_symlinks
288        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
289        from meerschaum.utils.venv import init_venv
290        from meerschaum.utils.misc import safely_extract_tar
291        old_cwd = os.getcwd()
292        old_version = ''
293        new_version = ''
294        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
295        temp_dir.mkdir(exist_ok=True)
296
297        if not self.archive_path.exists():
298            return False, f"Missing archive file for plugin '{self}'."
299        if self.version is not None:
300            old_version = self.version
301            if debug:
302                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
303
304        if debug:
305            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
306
307        try:
308            with tarfile.open(self.archive_path, 'r:gz') as tarf:
309                safely_extract_tar(tarf, temp_dir)
310        except Exception as e:
311            warn(e)
312            return False, f"Failed to extract plugin '{self.name}'."
313
314        ### search for version information
315        files = os.listdir(temp_dir)
316        
317        if str(files[0]) == self.name:
318            is_dir = True
319        elif str(files[0]) == self.name + '.py':
320            is_dir = False
321        else:
322            error(f"Unknown format encountered for plugin '{self}'.")
323
324        fpath = temp_dir / files[0]
325        if is_dir:
326            fpath = fpath / '__init__.py'
327
328        init_venv(self.name, debug=debug)
329        with open(fpath, 'r', encoding='utf-8') as f:
330            init_lines = f.readlines()
331        new_version = None
332        for line in init_lines:
333            if '__version__' not in line:
334                continue
335            version_match = re.search(r'__version__\s*=', line.strip())
336            if not version_match:
337                continue
338            new_version = ast.literal_eval(line.split('=', 1)[1].strip())
339            break
340        if not new_version:
341            warn(
342                f"No `__version__` defined for plugin '{self}'. "
343                + "Assuming new version...",
344                stack = False,
345            )
346
347        packaging_version = attempt_import('packaging.version')
348        try:
349            is_new_version = (not new_version and not old_version) or (
350                packaging_version.parse(old_version) < packaging_version.parse(new_version)
351            )
352            is_same_version = new_version and old_version and (
353                packaging_version.parse(old_version) == packaging_version.parse(new_version)
354            )
355        except Exception as e:
356            is_new_version, is_same_version = True, False
357
358        ### Determine where to permanently store the new plugin.
359        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
360        for path in PLUGINS_DIR_PATHS:
361            files_in_plugins_dir = os.listdir(path)
362            if (
363                self.name in files_in_plugins_dir
364                or
365                (self.name + '.py') in files_in_plugins_dir
366            ):
367                plugin_installation_dir_path = path
368                break
369
370        success_msg = f"Successfully installed plugin '{self}'."
371        success, abort = None, None
372
373        if is_same_version and not force:
374            success, msg = True, (
375                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
376                "    Install again with `-f` or `--force` to reinstall."
377            )
378            abort = True
379        elif is_new_version or force:
380            for src_dir, dirs, files in os.walk(temp_dir):
381                if success is not None:
382                    break
383                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
384                if not os.path.exists(dst_dir):
385                    os.mkdir(dst_dir)
386                for f in files:
387                    src_file = os.path.join(src_dir, f)
388                    dst_file = os.path.join(dst_dir, f)
389                    if os.path.exists(dst_file):
390                        os.remove(dst_file)
391
392                    if debug:
393                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
394                    try:
395                        shutil.move(src_file, dst_dir)
396                    except Exception as e:
397                        success, msg = False, (
398                            f"Failed to install plugin '{self}': " +
399                            f"Could not move file '{src_file}' to '{dst_dir}'"
400                        )
401                        print(msg)
402                        break
403            if success is None:
404                success, msg = True, success_msg
405        else:
406            success, msg = False, (
407                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
408                + f"attempted version {new_version}."
409            )
410
411        shutil.rmtree(temp_dir)
412        os.chdir(old_cwd)
413
414        ### Reload the plugin's module.
415        sync_plugins_symlinks(debug=debug)
416        if '_module' in self.__dict__:
417            del self.__dict__['_module']
418        init_venv(venv=self.name, force=True, debug=debug)
419        reload_meerschaum(debug=debug)
420
421        ### if we've already failed, return here
422        if not success or abort:
423            _ongoing_installations.remove(self.full_name)
424            return success, msg
425
426        ### attempt to install dependencies
427        if not self.install_dependencies(force=force, debug=debug):
428            _ongoing_installations.remove(self.full_name)
429            return False, f"Failed to install dependencies for plugin '{self}'."
430
431        ### handling success tuple, bool, or other (typically None)
432        setup_tuple = self.setup(debug=debug)
433        if isinstance(setup_tuple, tuple):
434            if not setup_tuple[0]:
435                success, msg = setup_tuple
436        elif isinstance(setup_tuple, bool):
437            if not setup_tuple:
438                success, msg = False, (
439                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
440                    f"Check `setup()` in '{self.__file__}' for more information " +
441                    f"(no error message provided)."
442                )
443            else:
444                success, msg = True, success_msg
445        elif setup_tuple is None:
446            success = True
447            msg = (
448                f"Post-install for plugin '{self}' returned None. " +
449                f"Assuming plugin successfully installed."
450            )
451            warn(msg)
452        else:
453            success = False
454            msg = (
455                f"Post-install for plugin '{self}' returned unexpected value " +
456                f"of type '{type(setup_tuple)}': {setup_tuple}"
457            )
458
459        _ongoing_installations.remove(self.full_name)
460        _ = self.module  ### Trigger a fresh import of the module.
461        return success, msg
462
463
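The `__version__` scan in `install()` above (find the assignment line, then evaluate only its literal value) can be sketched as a standalone function:

```python
# Standalone sketch of the version scan in install(): locate the
# __version__ assignment and evaluate only its literal right-hand side.
import ast
import re

def scan_version(source: str):
    for line in source.splitlines():
        if '__version__' not in line:
            continue
        if not re.search(r'__version__\s*=', line.strip()):
            continue
        ### literal_eval refuses arbitrary expressions, unlike eval().
        return ast.literal_eval(line.split('=', 1)[1].strip())
    return None

print(scan_version("__version__ = '1.2.3'\n"))  # 1.2.3
print(scan_version("x = 1\n"))                  # None
```

Splitting with `split('=', 1)` keeps the sketch correct when the version string itself contains `=`.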
464    def remove_archive(
465            self,        
466            debug: bool = False
467        ) -> SuccessTuple:
468        """Remove a plugin's archive file."""
469        if not self.archive_path.exists():
470            return True, f"Archive file for plugin '{self}' does not exist."
471        try:
472            self.archive_path.unlink()
473        except Exception as e:
474            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
475        return True, "Success"
476
477
478    def remove_venv(
479            self,        
480            debug: bool = False
481        ) -> SuccessTuple:
482        """Remove a plugin's virtual environment."""
483        if not self.venv_path.exists():
484            return True, f"Virtual environment for plugin '{self}' does not exist."
485        try:
486            shutil.rmtree(self.venv_path)
487        except Exception as e:
488            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
489        return True, "Success"
490
491
492    def uninstall(self, debug: bool = False) -> SuccessTuple:
493        """
494        Remove a plugin, its virtual environment, and archive file.
495        """
496        from meerschaum.utils.packages import reload_meerschaum
497        from meerschaum.plugins import sync_plugins_symlinks
498        from meerschaum.utils.warnings import warn, info
499        warnings_thrown_count: int = 0
500        max_warnings: int = 3
501
502        if not self.is_installed():
503            info(
504                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
505                + "Checking for artifacts...",
506                stack = False,
507            )
508        else:
509            real_path = pathlib.Path(os.path.realpath(self.__file__))
510            try:
511                if real_path.name == '__init__.py':
512                    shutil.rmtree(real_path.parent)
513                else:
514                    real_path.unlink()
515            except Exception as e:
516                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
517                warnings_thrown_count += 1
518            else:
519                info(f"Removed source files for plugin '{self.name}'.")
520
521        if self.venv_path.exists():
522            success, msg = self.remove_venv(debug=debug)
523            if not success:
524                warn(msg, stack=False)
525                warnings_thrown_count += 1
526            else:
527                info(f"Removed virtual environment from plugin '{self.name}'.")
528
529        success = warnings_thrown_count < max_warnings
530        sync_plugins_symlinks(debug=debug)
531        self.deactivate_venv(force=True, debug=debug)
532        reload_meerschaum(debug=debug)
533        return success, (
534            f"Successfully uninstalled plugin '{self}'." if success
535            else f"Failed to uninstall plugin '{self}'."
536        )
537
538
539    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
540        """
541        If it exists, run the plugin's `setup()` function.
542
543        Parameters
544        ----------
545        *args: str
546            The positional arguments passed to the `setup()` function.
547            
548        debug: bool, default False
549            Verbosity toggle.
550
551        **kw: Any
552            The keyword arguments passed to the `setup()` function.
553
554        Returns
555        -------
556        A `SuccessTuple` or `bool` indicating success.
557
558        """
559        from meerschaum.utils.debug import dprint
560        import inspect
561        _setup = None
562        for name, fp in inspect.getmembers(self.module):
563            if name == 'setup' and inspect.isfunction(fp):
564                _setup = fp
565                break
566
567        ### assume success if no setup() is found (not necessary)
568        if _setup is None:
569            return True
570
571        sig = inspect.signature(_setup)
572        has_debug, has_kw = ('debug' in sig.parameters), False
573        for k, v in sig.parameters.items():
574            if '**' in str(v):
575                has_kw = True
576                break
577
578        _kw = {}
579        if has_kw:
580            _kw.update(kw)
581        if has_debug:
582            _kw['debug'] = debug
583
584        if debug:
585            dprint(f"Running setup for plugin '{self}'...")
586        try:
587            self.activate_venv(debug=debug)
588            return_tuple = _setup(*args, **_kw)
589            self.deactivate_venv(debug=debug)
590        except Exception as e:
591            return False, str(e)
592
593        if isinstance(return_tuple, tuple):
594            return return_tuple
595        if isinstance(return_tuple, bool):
596            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
597        if return_tuple is None:
598            return False, f"Setup for Plugin '{self.name}' returned None."
599        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"
600
601
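The signature inspection in `setup()` above (pass `**kw` only if the target accepts it, and `debug` only if the parameter exists) can be sketched with `inspect.signature`. Checking `Parameter.VAR_KEYWORD` is shown here as a slightly more explicit idiom than substring-matching `'**'`:

```python
# Sketch of the setup() signature inspection above: decide which
# keyword arguments a user-supplied function can safely receive.
import inspect

def accepted_kwargs(func, debug: bool = False, **kw):
    sig = inspect.signature(func)
    has_debug = 'debug' in sig.parameters
    has_kw = any(
        p.kind == inspect.Parameter.VAR_KEYWORD
        for p in sig.parameters.values()
    )
    _kw = dict(kw) if has_kw else {}
    if has_debug:
        _kw['debug'] = debug
    return _kw

def setup_a(debug: bool = False): ...
def setup_b(**kw): ...

print(accepted_kwargs(setup_a, debug=True, extra=1))  # {'debug': True}
print(accepted_kwargs(setup_b, extra=1))              # {'extra': 1}
```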
602    def get_dependencies(
603            self,
604            debug: bool = False,
605        ) -> List[str]:
606        """
607        If the Plugin has specified dependencies in a list called `required`, return the list.
608        
609        **NOTE:** Dependencies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
610        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
611
612        Parameters
613        ----------
614        debug: bool, default False
615            Verbosity toggle.
616
617        Returns
618        -------
619        A list of required packages and plugins (str).
620
621        """
622        if '_required' in self.__dict__:
623            return self._required
624
625        ### If the plugin has not yet been imported,
626        ### infer the dependencies from the source text.
627        ### This is not super robust, and it doesn't feel right
628        ### having multiple versions of the logic.
629        ### This is necessary when determining the activation order
630        ### without having to import the module.
631        ### For consistency's sake, the module-less method does not cache the requirements.
632        if self.__dict__.get('_module', None) is None:
633            file_path = self.__file__
634            if file_path is None:
635                return []
636            with open(file_path, 'r', encoding='utf-8') as f:
637                text = f.read()
638
639            if 'required' not in text:
640                return []
641
642            ### This has some limitations:
643            ### It relies on `required` being manually declared.
644            ### We lose the ability to dynamically alter the `required` list,
645            ### which is why we've kept the module-reliant method below.
646            import ast, re
647            ### NOTE: This technically would break 
648            ### if `required` was the very first line of the file.
649            req_start_match = re.search(r'\nrequired\s*=', text)
650            if not req_start_match:
651                return []
652            req_start = req_start_match.start()
653
654            ### Dependencies may have brackets within the strings, so push back the index.
655            first_opening_brace = req_start + 1 + text[req_start:].find('[')
656            if '[' not in text[req_start:]:
657                return []
658
659            next_closing_brace = req_start + 1 + text[req_start:].find(']')
660            if ']' not in text[req_start:]:
661                return []
662
663            start_ix = first_opening_brace + 1
664            end_ix = next_closing_brace
665
666            num_braces = 0
667            while True:
668                if '[' not in text[start_ix:end_ix]:
669                    break
670                num_braces += 1
671                start_ix = end_ix
672                end_ix += text[end_ix + 1:].find(']') + 1
673
674            req_end = end_ix + 1
675            req_text = (
676                text[req_start:req_end]
677                .lstrip()
678                .replace('required', '', 1)
679                .lstrip()
680                .replace('=', '', 1)
681                .lstrip()
682            )
683            try:
684                required = ast.literal_eval(req_text)
685            except Exception as e:
686                warn(
687                    f"Unable to determine requirements for plugin '{self.name}' "
688                    + "without importing the module.\n"
689                    + "    This may be due to dynamically setting the global `required` list.\n"
690                    + f"    {e}"
691                )
692                return []
693            return required
694
695        import inspect
696        self.activate_venv(dependencies=False, debug=debug)
697        required = []
698        for name, val in inspect.getmembers(self.module):
699            if name == 'required':
700                required = val
701                break
702        self._required = required
703        self.deactivate_venv(dependencies=False, debug=debug)
704        return required
705
706
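As the comments in `get_dependencies()` concede, the bracket-counting scan is not super robust. An alternative sketch (not what the library does) parses the source with `ast` and pulls the literal assigned to `required`, trading the substring heuristics for a full parse:

```python
# Alternative sketch of the module-less requirements scan above,
# using ast.parse instead of bracket counting. Illustration only;
# like the original, it cannot see dynamically built `required` lists.
import ast

def scan_required(source: str):
    try:
        tree = ast.parse(source)
    except SyntaxError:
        return []
    for node in tree.body:
        if not isinstance(node, ast.Assign):
            continue
        for target in node.targets:
            if isinstance(target, ast.Name) and target.id == 'required':
                try:
                    return ast.literal_eval(node.value)
                except (ValueError, TypeError):
                    return []
    return []

src = "required = ['pandas', 'plugin:noaa@api:mrsm']\n"
print(scan_required(src))  # ['pandas', 'plugin:noaa@api:mrsm']
```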
707    def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
708        """
709        Return a list of required Plugin objects.
710        """
711        from meerschaum.utils.warnings import warn
712        from meerschaum.config import get_config
713        from meerschaum.config.static import STATIC_CONFIG
714        plugins = []
715        _deps = self.get_dependencies(debug=debug)
716        sep = STATIC_CONFIG['plugins']['repo_separator']
717        plugin_names = [
718            _d[len('plugin:'):] for _d in _deps
719            if _d.startswith('plugin:') and len(_d) > len('plugin:')
720        ]
721        default_repo_keys = get_config('meerschaum', 'default_repository')
722        for _plugin_name in plugin_names:
723            if sep in _plugin_name:
724                try:
725                    _plugin_name, _repo_keys = _plugin_name.split(sep)
726                except Exception as e:
727                    _repo_keys = default_repo_keys
728                    warn(
729                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
730                        + f"Will try to use '{_repo_keys}' instead.",
731                        stack = False,
732                    )
733            else:
734                _repo_keys = default_repo_keys
735            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
736        return plugins
737
738
739    def get_required_packages(self, debug: bool=False) -> List[str]:
740        """
741        Return the required package names (excluding plugins).
742        """
743        _deps = self.get_dependencies(debug=debug)
744        return [_d for _d in _deps if not _d.startswith('plugin:')]
745
746
747    def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
748        """
749        Activate the virtual environments for the plugin and its dependencies.
750
751        Parameters
752        ----------
753        dependencies: bool, default True
754            If `True`, activate the virtual environments for required plugins.
755
756        Returns
757        -------
758        A bool indicating success.
759        """
760        from meerschaum.utils.venv import venv_target_path
761        from meerschaum.utils.packages import activate_venv
762        from meerschaum.utils.misc import make_symlink, is_symlink
763        from meerschaum.config._paths import PACKAGE_ROOT_PATH
764
765        if dependencies:
766            for plugin in self.get_required_plugins(debug=debug):
767                plugin.activate_venv(debug=debug, **kw)
768
769        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
770        venv_meerschaum_path = vtp / 'meerschaum'
771
772        try:
773            success, msg = True, "Success"
774            if is_symlink(venv_meerschaum_path):
775                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
776                    venv_meerschaum_path.unlink()
777                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
778        except Exception as e:
779            success, msg = False, str(e)
780        if not success:
781            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")
782
783        return activate_venv(self.name, debug=debug, **kw)
784
785
786    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
787        """
788        Deactivate the virtual environments for the plugin and its dependencies.
789
790        Parameters
791        ----------
792        dependencies: bool, default True
793            If `True`, deactivate the virtual environments for required plugins.
794
795        Returns
796        -------
797        A bool indicating success.
798        """
799        from meerschaum.utils.packages import deactivate_venv
800        success = deactivate_venv(self.name, debug=debug, **kw)
801        if dependencies:
802            for plugin in self.get_required_plugins(debug=debug):
803                plugin.deactivate_venv(debug=debug, **kw)
804        return success
805
806
807    def install_dependencies(
808            self,
809            force: bool = False,
810            debug: bool = False,
811        ) -> bool:
812        """
813        If specified, install dependencies.
814        
815        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
816        Meerschaum plugins from the same repository as this Plugin.
817        To install from a different repository, add the repo keys after `'@'`
818        (e.g. `'plugin:foo@api:bar'`).
819
820        Parameters
821        ----------
822        force: bool, default False
823            If `True`, continue with the installation, even if some
824            required packages fail to install.
825
826        debug: bool, default False
827            Verbosity toggle.
828
829        Returns
830        -------
831        A bool indicating success.
832
833        """
834        from meerschaum.utils.packages import pip_install, venv_contains_package
835        from meerschaum.utils.debug import dprint
836        from meerschaum.utils.warnings import warn, info
837        from meerschaum.connectors.parse import parse_repo_keys
838        _deps = self.get_dependencies(debug=debug)
839        if not _deps and self.requirements_file_path is None:
840            return True
841
842        plugins = self.get_required_plugins(debug=debug)
843        for _plugin in plugins:
844            if _plugin.name == self.name:
845                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
846                continue
847            _success, _msg = _plugin.repo_connector.install_plugin(
848                _plugin.name, debug=debug, force=force
849            )
850            if not _success:
851                warn(
852                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
853                    + f" for plugin '{self.name}':\n" + _msg,
854                    stack = False,
855                )
856                if not force:
857                    warn(
858                        "Try installing with the `--force` flag to continue anyway.",
859                        stack = False,
860                    )
861                    return False
862                info(
863                    "Continuing with installation despite the failure "
864                    + "(careful, things might be broken!)...",
865                    icon = False
866                )
867
868
869        ### First step: parse `requirements.txt` if it exists.
870        if self.requirements_file_path is not None:
871            if not pip_install(
872                requirements_file_path=self.requirements_file_path,
873                venv=self.name, debug=debug
874            ):
875                warn(
876                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
877                    stack = False,
878                )
879                if not force:
880                    warn(
881                        "Try installing with `--force` to continue anyway.",
882                        stack = False,
883                    )
884                    return False
885                info(
886                    "Continuing with installation despite the failure "
887                    + "(careful, things might be broken!)...",
888                    icon = False
889                )
890
891
892        ### Don't reinstall packages that are already included in required plugins.
893        packages = []
894        _packages = self.get_required_packages(debug=debug)
895        accounted_for_packages = set()
896        for package_name in _packages:
897            for plugin in plugins:
898                if venv_contains_package(package_name, plugin.name):
899                    accounted_for_packages.add(package_name)
900                    break
901        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
902
903        ### Attempt pip packages installation.
904        if packages:
905            for package in packages:
906                if not pip_install(package, venv=self.name, debug=debug):
907                    warn(
908                        f"Failed to install required package '{package}'"
909                        + f" for plugin '{self.name}'.",
910                        stack = False,
911                    )
912                    if not force:
913                        warn(
914                            "Try installing with `--force` to continue anyway.",
915                            stack = False,
916                        )
917                        return False
918                    info(
919                        "Continuing with installation despite the failure "
920                        + "(careful, things might be broken!)...",
921                        icon = False
922                    )
923        return True
924
925
926    @property
927    def full_name(self) -> str:
928        """
929        Include the repo keys with the plugin's name.
930        """
931        from meerschaum.config.static import STATIC_CONFIG
932        sep = STATIC_CONFIG['plugins']['repo_separator']
933        return self.name + sep + str(self.repo_connector)
934
935
936    def __str__(self):
937        return self.name
938
939
940    def __repr__(self):
941        return f"Plugin('{self.name}', repo='{self.repo_connector}')"
942
943
944    def __del__(self):
945        pass

Handle packaging of Meerschaum plugins.

Plugin( name: str, version: Optional[str] = None, user_id: Optional[int] = None, required: Optional[List[str]] = None, attributes: Optional[Dict[str, Any]] = None, archive_path: Optional[pathlib.Path] = None, venv_path: Optional[pathlib.Path] = None, repo_connector: Optional[meerschaum.connectors.api.APIConnector.APIConnector] = None, repo: Union[meerschaum.connectors.api.APIConnector.APIConnector, str, NoneType] = None)
35    def __init__(
36        self,
37        name: str,
38        version: Optional[str] = None,
39        user_id: Optional[int] = None,
40        required: Optional[List[str]] = None,
41        attributes: Optional[Dict[str, Any]] = None,
42        archive_path: Optional[pathlib.Path] = None,
43        venv_path: Optional[pathlib.Path] = None,
44        repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None,
45        repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None,
46    ):
47        from meerschaum.config.static import STATIC_CONFIG
48        sep = STATIC_CONFIG['plugins']['repo_separator']
49        _repo = None
50        if sep in name:
51            try:
52                name, _repo = name.split(sep)
53            except Exception as e:
54                error(f"Invalid plugin name: '{name}'")
55        self._repo_in_name = _repo
56
57        if attributes is None:
58            attributes = {}
59        self.name = name
60        self.attributes = attributes
61        self.user_id = user_id
62        self._version = version
63        if required:
64            self._required = required
65        self.archive_path = (
66            archive_path if archive_path is not None
67            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
68        )
69        self.venv_path = (
70            venv_path if venv_path is not None
71            else VIRTENV_RESOURCES_PATH / self.name
72        )
73        self._repo_connector = repo_connector
74        self._repo_keys = repo
name
attributes
user_id
archive_path
venv_path
repo_connector

Return the repository connector for this plugin. NOTE: This imports the connectors module, which imports certain plugin modules.

version

Return the plugin's module `__version__`, if it's defined.

module

Return the Python module of the underlying plugin.

requirements_file_path: Optional[pathlib.Path]

If a file named requirements.txt exists, return its path.
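The lookup above can be approximated with a small standalone helper. This is a hedged sketch, not the property's actual implementation; `find_requirements_file` is a hypothetical name:

```python
from pathlib import Path
from typing import Optional

def find_requirements_file(plugin_path: Path) -> Optional[Path]:
    """
    Return the path to a 'requirements.txt' beside a plugin, if one exists.

    `plugin_path` may be a single-file plugin ('myplugin.py') or a package
    directory containing '__init__.py'.
    """
    base_dir = plugin_path if plugin_path.is_dir() else plugin_path.parent
    candidate = base_dir / 'requirements.txt'
    return candidate if candidate.exists() else None
```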

def is_installed(self, **kw) -> bool:
158    def is_installed(self, **kw) -> bool:
159        """
160        Check whether a plugin is correctly installed.
161
162        Returns
163        -------
164        A `bool` indicating whether a plugin exists and is successfully imported.
165        """
166        return self.__file__ is not None

Check whether a plugin is correctly installed.

Returns
  • A bool indicating whether a plugin exists and is successfully imported.
def make_tar(self, debug: bool = False) -> pathlib.Path:
169    def make_tar(self, debug: bool = False) -> pathlib.Path:
170        """
171        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
172
173        Parameters
174        ----------
175        debug: bool, default False
176            Verbosity toggle.
177
178        Returns
179        -------
180        A `pathlib.Path` to the archive file.
181
182        """
183        import tarfile, pathlib, subprocess, fnmatch
184        from meerschaum.utils.debug import dprint
185        from meerschaum.utils.packages import attempt_import
186        pathspec = attempt_import('pathspec', debug=debug)
187
188        if not self.__file__:
189            from meerschaum.utils.warnings import error
190            error(f"Could not find file for plugin '{self}'.")
191        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
192            path = self.__file__.replace('__init__.py', '')
193            is_dir = True
194        else:
195            path = self.__file__
196            is_dir = False
197
198        old_cwd = os.getcwd()
199        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
200        os.chdir(real_parent_path)
201
202        default_patterns_to_ignore = [
203            '.pyc',
204            '__pycache__/',
205            'eggs/',
206            '__pypackages__/',
207            '.git',
208        ]
209
210        def parse_gitignore() -> 'Set[str]':
211            gitignore_path = pathlib.Path(path) / '.gitignore'
212            if not gitignore_path.exists():
213                return set()
214            with open(gitignore_path, 'r', encoding='utf-8') as f:
215                gitignore_text = f.read()
216            return set(pathspec.PathSpec.from_lines(
217                pathspec.patterns.GitWildMatchPattern,
218                default_patterns_to_ignore + gitignore_text.splitlines()
219            ).match_tree(path))
220
221        patterns_to_ignore = parse_gitignore() if is_dir else set()
222
223        if debug:
224            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
225
226        with tarfile.open(self.archive_path, 'w:gz') as tarf:
227            if not is_dir:
228                tarf.add(f"{self.name}.py")
229            else:
230                for root, dirs, files in os.walk(self.name):
231                    for f in files:
232                        good_file = True
233                        fp = os.path.join(root, f)
234                        for pattern in patterns_to_ignore:
235                            if pattern in str(fp) or f.startswith('.'):
236                                good_file = False
237                                break
238                        if good_file:
239                            if debug:
240                                dprint(f"Adding '{fp}'...")
241                            tarf.add(fp)
242
243        ### clean up and change back to old directory
244        os.chdir(old_cwd)
245
246        ### change to 775 to avoid permissions issues with the API in a Docker container
247        self.archive_path.chmod(0o775)
248
249        if debug:
250            dprint(f"Created archive '{self.archive_path}'.")
251        return self.archive_path

Compress the plugin's source files into a .tar.gz archive and return the archive's path.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pathlib.Path to the archive file.
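The archiving logic above walks the tree and skips ignored files by hand. A minimal stdlib-only sketch of the same idea uses `tarfile`'s `filter=` callback instead; the helper name and ignore lists are assumptions mirroring the defaults shown above, not the actual `make_tar` implementation:

```python
import tarfile
from pathlib import Path

# Assumption: a simplified stand-in for the default ignore patterns above.
IGNORED_PARTS = ('__pycache__', '.git', 'eggs', '__pypackages__')
IGNORED_SUFFIXES = ('.pyc',)

def make_plugin_tar(src_dir: Path, archive_path: Path) -> Path:
    """Archive `src_dir` into a .tar.gz, dropping ignored members."""
    def _filter(info: tarfile.TarInfo):
        parts = Path(info.name).parts
        if any(part in IGNORED_PARTS for part in parts):
            return None                      # returning None drops the member
        if info.name.endswith(IGNORED_SUFFIXES):
            return None
        return info

    with tarfile.open(archive_path, 'w:gz') as tarf:
        # `filter` is invoked once per member; dropping a directory
        # also skips recursion into it.
        tarf.add(src_dir, arcname=src_dir.name, filter=_filter)
    archive_path.chmod(0o775)                # avoid container permissions issues
    return archive_path
```

Note that `make_tar` above additionally honors the plugin's `.gitignore` via `pathspec`; this sketch only applies fixed patterns.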
def install(self, force: bool = False, debug: bool = False) -> Tuple[bool, str]:
254    def install(
255            self,
256            force: bool = False,
257            debug: bool = False,
258        ) -> SuccessTuple:
259        """
260        Extract a plugin's tar archive to the plugins directory.
261        
262        This function checks whether the plugin is already installed and whether the
263        new version is equal to or greater than the existing installation's.
264
265        Parameters
266        ----------
267        force: bool, default False
268            If `True`, continue with installation, even if required packages fail to install.
269
270        debug: bool, default False
271            Verbosity toggle.
272
273        Returns
274        -------
275        A `SuccessTuple` of success (bool) and a message (str).
276
277        """
278        if self.full_name in _ongoing_installations:
279            return True, f"Already installing plugin '{self}'."
280        _ongoing_installations.add(self.full_name)
281        from meerschaum.utils.warnings import warn, error
282        if debug:
283            from meerschaum.utils.debug import dprint
284        import tarfile
285        import re
286        import ast
287        from meerschaum.plugins import sync_plugins_symlinks
288        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
289        from meerschaum.utils.venv import init_venv
290        from meerschaum.utils.misc import safely_extract_tar
291        old_cwd = os.getcwd()
292        old_version = ''
293        new_version = ''
294        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
295        temp_dir.mkdir(exist_ok=True)
296
297        if not self.archive_path.exists():
298            return False, f"Missing archive file for plugin '{self}'."
299        if self.version is not None:
300            old_version = self.version
301            if debug:
302                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
303
304        if debug:
305            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
306
307        try:
308            with tarfile.open(self.archive_path, 'r:gz') as tarf:
309                safely_extract_tar(tarf, temp_dir)
310        except Exception as e:
311            warn(e)
312            return False, f"Failed to extract plugin '{self.name}'."
313
314        ### search for version information
315        files = os.listdir(temp_dir)
316        
317        if str(files[0]) == self.name:
318            is_dir = True
319        elif str(files[0]) == self.name + '.py':
320            is_dir = False
321        else:
322            error(f"Unknown format encountered for plugin '{self}'.")
323
324        fpath = temp_dir / files[0]
325        if is_dir:
326            fpath = fpath / '__init__.py'
327
328        init_venv(self.name, debug=debug)
329        with open(fpath, 'r', encoding='utf-8') as f:
330            init_lines = f.readlines()
331        new_version = None
332        for line in init_lines:
333            if '__version__' not in line:
334                continue
335            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
336            if not version_match:
337                continue
338            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
339            break
340        if not new_version:
341            warn(
342                f"No `__version__` defined for plugin '{self}'. "
343                + "Assuming new version...",
344                stack = False,
345            )
346
347        packaging_version = attempt_import('packaging.version')
348        try:
349            is_new_version = (not new_version and not old_version) or (
350                packaging_version.parse(old_version) < packaging_version.parse(new_version)
351            )
352            is_same_version = new_version and old_version and (
353                packaging_version.parse(old_version) == packaging_version.parse(new_version)
354            )
355        except Exception as e:
356            is_new_version, is_same_version = True, False
357
358        ### Determine where to permanently store the new plugin.
359        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
360        for path in PLUGINS_DIR_PATHS:
361            files_in_plugins_dir = os.listdir(path)
362            if (
363                self.name in files_in_plugins_dir
364                or
365                (self.name + '.py') in files_in_plugins_dir
366            ):
367                plugin_installation_dir_path = path
368                break
369
370        success_msg = f"Successfully installed plugin '{self}'."
371        success, abort = None, None
372
373        if is_same_version and not force:
374            success, msg = True, (
375                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
376                "    Install again with `-f` or `--force` to reinstall."
377            )
378            abort = True
379        elif is_new_version or force:
380            for src_dir, dirs, files in os.walk(temp_dir):
381                if success is not None:
382                    break
383                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
384                if not os.path.exists(dst_dir):
385                    os.mkdir(dst_dir)
386                for f in files:
387                    src_file = os.path.join(src_dir, f)
388                    dst_file = os.path.join(dst_dir, f)
389                    if os.path.exists(dst_file):
390                        os.remove(dst_file)
391
392                    if debug:
393                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
394                    try:
395                        shutil.move(src_file, dst_dir)
396                    except Exception as e:
397                        success, msg = False, (
398                            f"Failed to install plugin '{self}': " +
399                            f"Could not move file '{src_file}' to '{dst_dir}'"
400                        )
401                        print(msg)
402                        break
403            if success is None:
404                success, msg = True, success_msg
405        else:
406            success, msg = False, (
407                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
408                + f"attempted version {new_version}."
409            )
410
411        shutil.rmtree(temp_dir)
412        os.chdir(old_cwd)
413
414        ### Reload the plugin's module.
415        sync_plugins_symlinks(debug=debug)
416        if '_module' in self.__dict__:
417            del self.__dict__['_module']
418        init_venv(venv=self.name, force=True, debug=debug)
419        reload_meerschaum(debug=debug)
420
421        ### if we've already failed, return here
422        if not success or abort:
423            _ongoing_installations.remove(self.full_name)
424            return success, msg
425
426        ### attempt to install dependencies
427        if not self.install_dependencies(force=force, debug=debug):
428            _ongoing_installations.remove(self.full_name)
429            return False, f"Failed to install dependencies for plugin '{self}'."
430
431        ### handling success tuple, bool, or other (typically None)
432        setup_tuple = self.setup(debug=debug)
433        if isinstance(setup_tuple, tuple):
434            if not setup_tuple[0]:
435                success, msg = setup_tuple
436        elif isinstance(setup_tuple, bool):
437            if not setup_tuple:
438                success, msg = False, (
439                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
440                    f"Check `setup()` in '{self.__file__}' for more information " +
441                    f"(no error message provided)."
442                )
443            else:
444                success, msg = True, success_msg
445        elif setup_tuple is None:
446            success = True
447            msg = (
448                f"Post-install for plugin '{self}' returned None. " +
449                f"Assuming plugin successfully installed."
450            )
451            warn(msg)
452        else:
453            success = False
454            msg = (
455                f"Post-install for plugin '{self}' returned unexpected value " +
456                f"of type '{type(setup_tuple)}': {setup_tuple}"
457            )
458
459        _ongoing_installations.remove(self.full_name)
460        module = self.module
461        return success, msg

Extract a plugin's tar archive to the plugins directory.

This function checks whether the plugin is already installed and whether the new version is equal to or greater than the existing installation's.

Parameters
  • force (bool, default False): If True, continue with installation, even if required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success (bool) and a message (str).
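The version check in `install()` scans the extracted `__init__.py` for a `__version__` assignment and evaluates it as a literal (never importing untrusted code). A condensed sketch of that scan, with a hypothetical helper name:

```python
import ast
import re
from typing import Optional

def extract_dunder_version(source: str) -> Optional[str]:
    """Return the literal assigned to `__version__`, if one is found."""
    for line in source.splitlines():
        # Match `__version__ =` at the start of the stripped line.
        if re.match(r'__version__\s*=', line.strip()):
            # literal_eval avoids executing arbitrary plugin code.
            return ast.literal_eval(line.split('=', 1)[1].strip())
    return None
```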
def remove_archive(self, debug: bool = False) -> Tuple[bool, str]:
464    def remove_archive(
465            self,        
466            debug: bool = False
467        ) -> SuccessTuple:
468        """Remove a plugin's archive file."""
469        if not self.archive_path.exists():
470            return True, f"Archive file for plugin '{self}' does not exist."
471        try:
472            self.archive_path.unlink()
473        except Exception as e:
474            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
475        return True, "Success"

Remove a plugin's archive file.

def remove_venv(self, debug: bool = False) -> Tuple[bool, str]:
478    def remove_venv(
479            self,        
480            debug: bool = False
481        ) -> SuccessTuple:
482        """Remove a plugin's virtual environment."""
483        if not self.venv_path.exists():
484            return True, f"Virtual environment for plugin '{self}' does not exist."
485        try:
486            shutil.rmtree(self.venv_path)
487        except Exception as e:
488            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
489        return True, "Success"

Remove a plugin's virtual environment.

def uninstall(self, debug: bool = False) -> Tuple[bool, str]:
492    def uninstall(self, debug: bool = False) -> SuccessTuple:
493        """
494        Remove a plugin, its virtual environment, and archive file.
495        """
496        from meerschaum.utils.packages import reload_meerschaum
497        from meerschaum.plugins import sync_plugins_symlinks
498        from meerschaum.utils.warnings import warn, info
499        warnings_thrown_count: int = 0
500        max_warnings: int = 3
501
502        if not self.is_installed():
503            info(
504                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
505                + "Checking for artifacts...",
506                stack = False,
507            )
508        else:
509            real_path = pathlib.Path(os.path.realpath(self.__file__))
510            try:
511                if real_path.name == '__init__.py':
512                    shutil.rmtree(real_path.parent)
513                else:
514                    real_path.unlink()
515            except Exception as e:
516                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
517                warnings_thrown_count += 1
518            else:
519                info(f"Removed source files for plugin '{self.name}'.")
520
521        if self.venv_path.exists():
522            success, msg = self.remove_venv(debug=debug)
523            if not success:
524                warn(msg, stack=False)
525                warnings_thrown_count += 1
526            else:
527                info(f"Removed virtual environment from plugin '{self.name}'.")
528
529        success = warnings_thrown_count < max_warnings
530        sync_plugins_symlinks(debug=debug)
531        self.deactivate_venv(force=True, debug=debug)
532        reload_meerschaum(debug=debug)
533        return success, (
534            f"Successfully uninstalled plugin '{self}'." if success
535            else f"Failed to uninstall plugin '{self}'."
536        )

Remove a plugin, its virtual environment, and archive file.

def setup( self, *args: str, debug: bool = False, **kw: Any) -> Union[Tuple[bool, str], bool]:
539    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
540        """
541        If it exists, run the plugin's `setup()` function.
542
543        Parameters
544        ----------
545        *args: str
546            The positional arguments passed to the `setup()` function.
547            
548        debug: bool, default False
549            Verbosity toggle.
550
551        **kw: Any
552            The keyword arguments passed to the `setup()` function.
553
554        Returns
555        -------
556        A `SuccessTuple` or `bool` indicating success.
557
558        """
559        from meerschaum.utils.debug import dprint
560        import inspect
561        _setup = None
562        for name, fp in inspect.getmembers(self.module):
563            if name == 'setup' and inspect.isfunction(fp):
564                _setup = fp
565                break
566
567        ### assume success if no setup() is found (not necessary)
568        if _setup is None:
569            return True
570
571        sig = inspect.signature(_setup)
572        has_debug, has_kw = ('debug' in sig.parameters), False
573        for k, v in sig.parameters.items():
574            if '**' in str(v):
575                has_kw = True
576                break
577
578        _kw = {}
579        if has_kw:
580            _kw.update(kw)
581        if has_debug:
582            _kw['debug'] = debug
583
584        if debug:
585            dprint(f"Running setup for plugin '{self}'...")
586        try:
587            self.activate_venv(debug=debug)
588            return_tuple = _setup(*args, **_kw)
589            self.deactivate_venv(debug=debug)
590        except Exception as e:
591            return False, str(e)
592
593        if isinstance(return_tuple, tuple):
594            return return_tuple
595        if isinstance(return_tuple, bool):
596            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
597        if return_tuple is None:
598            return False, f"Setup for Plugin '{self.name}' returned None."
599        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"

If it exists, run the plugin's setup() function.

Parameters
  • *args (str): The positional arguments passed to the setup() function.
  • debug (bool, default False): Verbosity toggle.
  • **kw (Any): The keyword arguments passed to the setup() function.
Returns
  • A SuccessTuple or bool indicating success.
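`setup()` inspects the target function's signature and forwards `debug` and keyword arguments only when the function can accept them. That dispatch pattern can be sketched on its own; the helper name is hypothetical:

```python
import inspect
from typing import Any, Callable

def call_with_accepted_kwargs(func: Callable, *args: Any, **kwargs: Any) -> Any:
    """Call `func`, forwarding only the keyword arguments it accepts."""
    sig = inspect.signature(func)
    accepts_var_kw = any(
        param.kind is inspect.Parameter.VAR_KEYWORD
        for param in sig.parameters.values()
    )
    if accepts_var_kw:
        # A `**kwargs` parameter accepts everything.
        return func(*args, **kwargs)
    # Otherwise drop any keyword the signature doesn't name.
    allowed = {k: v for k, v in kwargs.items() if k in sig.parameters}
    return func(*args, **allowed)
```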
def get_dependencies(self, debug: bool = False) -> List[str]:
602    def get_dependencies(
603            self,
604            debug: bool = False,
605        ) -> List[str]:
606        """
607        If the Plugin has specified dependencies in a list called `required`, return the list.
608        
609        **NOTE:** Dependencies that start with `'plugin:'` are Meerschaum plugins, not pip packages.
610        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
611
612        Parameters
613        ----------
614        debug: bool, default False
615            Verbosity toggle.
616
617        Returns
618        -------
619        A list of required packages and plugins (str).
620
621        """
622        if '_required' in self.__dict__:
623            return self._required
624
625        ### If the plugin has not yet been imported,
626        ### infer the dependencies from the source text.
627        ### This is not super robust, and it doesn't feel right
628        ### having multiple versions of the logic.
629        ### This is necessary when determining the activation order
630        ### without having to import the module.
631        ### For consistency's sake, the module-less method does not cache the requirements.
632        if self.__dict__.get('_module', None) is None:
633            file_path = self.__file__
634            if file_path is None:
635                return []
636            with open(file_path, 'r', encoding='utf-8') as f:
637                text = f.read()
638
639            if 'required' not in text:
640                return []
641
642            ### This has some limitations:
643            ### It relies on `required` being manually declared.
644            ### We lose the ability to dynamically alter the `required` list,
645            ### which is why we've kept the module-reliant method below.
646            import ast, re
647            ### NOTE: This technically would break 
648            ### if `required` was the very first line of the file.
649            req_start_match = re.search(r'\nrequired(\s?)=', text)
650            if not req_start_match:
651                return []
652            req_start = req_start_match.start()
653
654            ### Dependencies may have brackets within the strings, so push back the index.
655            first_opening_brace = req_start + 1 + text[req_start:].find('[')
656            if '[' not in text[req_start:]:
657                return []
658
659            next_closing_brace = req_start + 1 + text[req_start:].find(']')
660            if ']' not in text[req_start:]:
661                return []
662
663            start_ix = first_opening_brace + 1
664            end_ix = next_closing_brace
665
666            num_braces = 0
667            while True:
668                if '[' not in text[start_ix:end_ix]:
669                    break
670                num_braces += 1
671                start_ix = end_ix
672                end_ix += text[end_ix + 1:].find(']') + 1
673
674            req_end = end_ix + 1
675            req_text = (
676                text[req_start:req_end]
677                .lstrip()
678                .replace('required', '', 1)
679                .lstrip()
680                .replace('=', '', 1)
681                .lstrip()
682            )
683            try:
684                required = ast.literal_eval(req_text)
685            except Exception as e:
686                warn(
687                    f"Unable to determine requirements for plugin '{self.name}' "
688                    + "without importing the module.\n"
689                    + "    This may be due to dynamically setting the global `required` list.\n"
690                    + f"    {e}"
691                )
692                return []
693            return required
694
695        import inspect
696        self.activate_venv(dependencies=False, debug=debug)
697        required = []
698        for name, val in inspect.getmembers(self.module):
699            if name == 'required':
700                required = val
701                break
702        self._required = required
703        self.deactivate_venv(dependencies=False, debug=debug)
704        return required

If the Plugin has specified dependencies in a list called required, return the list.

NOTE: Dependencies that start with 'plugin:' are Meerschaum plugins, not pip packages. Meerschaum plugins may also specify connector keys for a repo after '@'.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of required packages and plugins (str).
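The module-less path above locates the `required` list with a regex and manual bracket counting. An alternative sketch parses the source's AST instead, which sidesteps the bracket bookkeeping; this is an illustration of the same idea under the same limitation (only a literal, module-level `required` can be recovered), not the actual implementation:

```python
import ast
from typing import List

def parse_required_list(source: str) -> List[str]:
    """Extract a module-level `required = [...]` literal without importing."""
    try:
        tree = ast.parse(source)
    except SyntaxError:
        return []
    for node in tree.body:
        if not isinstance(node, ast.Assign):
            continue
        for target in node.targets:
            if isinstance(target, ast.Name) and target.id == 'required':
                try:
                    return ast.literal_eval(node.value)
                except (ValueError, SyntaxError):
                    # Dynamically built lists can't be evaluated statically.
                    return []
    return []
```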
def get_required_plugins(self, debug: bool = False) -> List[Plugin]:
707    def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
708        """
709        Return a list of required Plugin objects.
710        """
711        from meerschaum.utils.warnings import warn
712        from meerschaum.config import get_config
713        from meerschaum.config.static import STATIC_CONFIG
714        plugins = []
715        _deps = self.get_dependencies(debug=debug)
716        sep = STATIC_CONFIG['plugins']['repo_separator']
717        plugin_names = [
718            _d[len('plugin:'):] for _d in _deps
719            if _d.startswith('plugin:') and len(_d) > len('plugin:')
720        ]
721        default_repo_keys = get_config('meerschaum', 'default_repository')
722        for _plugin_name in plugin_names:
723            if sep in _plugin_name:
724                try:
725                    _plugin_name, _repo_keys = _plugin_name.split(sep)
726                except Exception as e:
727                    _repo_keys = default_repo_keys
728                    warn(
729                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
730                        + f"Will try to use '{_repo_keys}' instead.",
731                        stack = False,
732                    )
733            else:
734                _repo_keys = default_repo_keys
735            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
736        return plugins

Return a list of required Plugin objects.
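Splitting a dependency string such as `'plugin:foo@api:bar'` into a plugin name and repo keys can be sketched as below. The `'@'` separator matches the example in `install_dependencies`'s docstring; the default repository keys here are a placeholder, not the configured default:

```python
from typing import Tuple

REPO_SEPARATOR = '@'   # matches the 'plugin:foo@api:bar' form documented below

def split_plugin_dependency(
    dependency: str,
    default_repo: str = 'api:mrsm',   # hypothetical default repository keys
) -> Tuple[str, str]:
    """Split a 'plugin:name[@repo:keys]' dependency into (name, repo_keys)."""
    name = dependency[len('plugin:'):]
    if REPO_SEPARATOR in name:
        # Split only on the first separator; repo keys may contain ':'.
        name, repo_keys = name.split(REPO_SEPARATOR, 1)
    else:
        repo_keys = default_repo
    return name, repo_keys
```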

def get_required_packages(self, debug: bool = False) -> List[str]:
739    def get_required_packages(self, debug: bool=False) -> List[str]:
740        """
741        Return the required package names (excluding plugins).
742        """
743        _deps = self.get_dependencies(debug=debug)
744        return [_d for _d in _deps if not _d.startswith('plugin:')]

Return the required package names (excluding plugins).

def activate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
747    def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
748        """
749        Activate the virtual environments for the plugin and its dependencies.
750
751        Parameters
752        ----------
753        dependencies: bool, default True
754            If `True`, activate the virtual environments for required plugins.
755
756        Returns
757        -------
758        A bool indicating success.
759        """
760        from meerschaum.utils.venv import venv_target_path
761        from meerschaum.utils.packages import activate_venv
762        from meerschaum.utils.misc import make_symlink, is_symlink
763        from meerschaum.config._paths import PACKAGE_ROOT_PATH
764
765        if dependencies:
766            for plugin in self.get_required_plugins(debug=debug):
767                plugin.activate_venv(debug=debug, **kw)
768
769        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
770        venv_meerschaum_path = vtp / 'meerschaum'
771
772        try:
773            success, msg = True, "Success"
774            if is_symlink(venv_meerschaum_path):
775                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
776                    venv_meerschaum_path.unlink()
777                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
778        except Exception as e:
779            success, msg = False, str(e)
780        if not success:
781            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")
782
783        return activate_venv(self.name, debug=debug, **kw)

Activate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, activate the virtual environments for required plugins.
Returns
  • A bool indicating success.
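Before activation, `activate_venv` repairs the `meerschaum` symlink inside the plugin's venv if it points somewhere stale. A stdlib-only sketch of that repair step, with a hypothetical helper name (the real code uses `make_symlink`/`is_symlink` from `meerschaum.utils.misc`):

```python
import os
from pathlib import Path

def ensure_package_symlink(venv_site: Path, package_root: Path) -> Path:
    """
    Ensure `venv_site/<package_name>` is a symlink to `package_root`,
    replacing a stale link if necessary.
    """
    link = venv_site / package_root.name
    if link.is_symlink():
        if Path(os.path.realpath(link)) == package_root.resolve():
            return link          # already points at the right place
        link.unlink()            # stale target: re-create below
    link.symlink_to(package_root)
    return link
```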
def deactivate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
786    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
787        """
788        Deactivate the virtual environments for the plugin and its dependencies.
789
790        Parameters
791        ----------
792        dependencies: bool, default True
793            If `True`, deactivate the virtual environments for required plugins.
794
795        Returns
796        -------
797        A bool indicating success.
798        """
799        from meerschaum.utils.packages import deactivate_venv
800        success = deactivate_venv(self.name, debug=debug, **kw)
801        if dependencies:
802            for plugin in self.get_required_plugins(debug=debug):
803                plugin.deactivate_venv(debug=debug, **kw)
804        return success

Deactivate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, deactivate the virtual environments for required plugins.
Returns
  • A bool indicating success.
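
The `activate_venv` / `deactivate_venv` pair above keeps a registry of active virtual environments so that deactivation is safe even when called out of order. A toy sketch of that bookkeeping (hypothetical names; not the library's actual `activate_venv` / `deactivate_venv`, which also manipulate `sys.path`):

```python
### Toy registry of active venv names.
### Deactivation is a safe no-op when the venv was never activated.
active_venvs = set()

def activate_venv(name: str) -> bool:
    """Mark the venv as active; return True on success."""
    active_venvs.add(name)
    return True

def deactivate_venv(name: str) -> bool:
    """Mark the venv as inactive; succeed even if it was never activated."""
    active_venvs.discard(name)
    return True
```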
def install_dependencies(self, force: bool = False, debug: bool = False) -> bool:
807    def install_dependencies(
808            self,
809            force: bool = False,
810            debug: bool = False,
811        ) -> bool:
812        """
813        Install the dependencies specified by this plugin, if any.
814        
815        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
816        Meerschaum plugins from the same repository as this Plugin.
817        To install from a different repository, add the repo keys after `'@'`
818        (e.g. `'plugin:foo@api:bar'`).
819
820        Parameters
821        ----------
822        force: bool, default False
823            If `True`, continue with the installation, even if some
824            required packages fail to install.
825
826        debug: bool, default False
827            Verbosity toggle.
828
829        Returns
830        -------
831        A bool indicating success.
832
833        """
834        from meerschaum.utils.packages import pip_install, venv_contains_package
835        from meerschaum.utils.debug import dprint
836        from meerschaum.utils.warnings import warn, info
837        from meerschaum.connectors.parse import parse_repo_keys
838        _deps = self.get_dependencies(debug=debug)
839        if not _deps and self.requirements_file_path is None:
840            return True
841
842        plugins = self.get_required_plugins(debug=debug)
843        for _plugin in plugins:
844            if _plugin.name == self.name:
845                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
846                continue
847            _success, _msg = _plugin.repo_connector.install_plugin(
848                _plugin.name, debug=debug, force=force
849            )
850            if not _success:
851                warn(
852                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
853                    + f" for plugin '{self.name}':\n" + _msg,
854                    stack = False,
855                )
856                if not force:
857                    warn(
858                        "Try installing with the `--force` flag to continue anyway.",
859                        stack = False,
860                    )
861                    return False
862                info(
863                    "Continuing with installation despite the failure "
864                    + "(careful, things might be broken!)...",
865                    icon = False
866                )
867
868
869        ### First step: parse `requirements.txt` if it exists.
870        if self.requirements_file_path is not None:
871            if not pip_install(
872                requirements_file_path=self.requirements_file_path,
873                venv=self.name, debug=debug
874            ):
875                warn(
876                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
877                    stack = False,
878                )
879                if not force:
880                    warn(
881                        "Try installing with `--force` to continue anyway.",
882                        stack = False,
883                    )
884                    return False
885                info(
886                    "Continuing with installation despite the failure "
887                    + "(careful, things might be broken!)...",
888                    icon = False
889                )
890
891
892        ### Don't reinstall packages that are already included in required plugins.
893        packages = []
894        _packages = self.get_required_packages(debug=debug)
895        accounted_for_packages = set()
896        for package_name in _packages:
897            for plugin in plugins:
898                if venv_contains_package(package_name, plugin.name):
899                    accounted_for_packages.add(package_name)
900                    break
901        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
902
903        ### Attempt pip packages installation.
904        if packages:
905            for package in packages:
906                if not pip_install(package, venv=self.name, debug=debug):
907                    warn(
908                        f"Failed to install required package '{package}'"
909                        + f" for plugin '{self.name}'.",
910                        stack = False,
911                    )
912                    if not force:
913                        warn(
914                            "Try installing with `--force` to continue anyway.",
915                            stack = False,
916                        )
917                        return False
918                    info(
919                        "Continuing with installation despite the failure "
920                        + "(careful, things might be broken!)...",
921                        icon = False
922                    )
923        return True

Install the dependencies specified by this plugin, if any.

NOTE: Dependencies that start with 'plugin:' will be installed as Meerschaum plugins from the same repository as this Plugin. To install from a different repository, add the repo keys after '@' (e.g. 'plugin:foo@api:bar').

Parameters
  • force (bool, default False): If True, continue with the installation, even if some required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating success.
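
Dependencies of the form `'plugin:foo@api:bar'` follow a simple convention: strip the `'plugin:'` prefix, then split optional repo keys on `'@'`. A rough parser for that convention (a hypothetical helper for illustration, not the library's `parse_repo_keys`):

```python
from typing import Optional, Tuple

def parse_plugin_dependency(dep: str) -> Optional[Tuple[str, Optional[str]]]:
    """Return (plugin_name, repo_keys) for 'plugin:' dependencies, else None."""
    if not dep.startswith('plugin:'):
        return None  # a regular pip package, not a plugin dependency
    rest = dep[len('plugin:'):]
    if '@' in rest:
        name, repo_keys = rest.split('@', 1)
        return name, repo_keys
    # No '@': install from the same repository as this plugin.
    return rest, None
```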
full_name: str

Include the repo keys with the plugin's name.

class Venv:
 18class Venv:
 19    """
 20    Manage a virtual environment's activation status.
 21
 22    Examples
 23    --------
 24    >>> from meerschaum.plugins import Plugin
 25    >>> with Venv('mrsm') as venv:
 26    ...     import pandas
 27    >>> with Venv(Plugin('noaa')) as venv:
 28    ...     import requests
 29    >>> venv = Venv('mrsm')
 30    >>> venv.activate()
 31    True
 32    >>> venv.deactivate()
 33    True
 34    >>> 
 35    """
 36
 37    def __init__(
 38            self,
 39            venv: Union[str, 'meerschaum.plugins.Plugin', None] = 'mrsm',
 40            debug: bool = False,
 41        ) -> None:
 42        from meerschaum.utils.venv import activate_venv, deactivate_venv, active_venvs
 43        ### For some weird threading issue,
 44        ### we can't use `isinstance` here.
 45        if 'meerschaum.plugins._Plugin' in str(type(venv)):
 46            self._venv = venv.name
 47            self._activate = venv.activate_venv
 48            self._deactivate = venv.deactivate_venv
 49            self._kwargs = {}
 50        else:
 51            self._venv = venv
 52            self._activate = activate_venv
 53            self._deactivate = deactivate_venv
 54            self._kwargs = {'venv': venv}
 55        self._debug = debug
 56        ### In case someone calls `deactivate()` before `activate()`.
 57        self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)
 58
 59
 60    def activate(self, debug: bool = False) -> bool:
 61        """
 62        Activate this virtual environment.
 63        If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments
 64        will also be activated.
 65        """
 66        from meerschaum.utils.venv import active_venvs
 67        self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)
 68        return self._activate(debug=(debug or self._debug), **self._kwargs)
 69
 70
 71    def deactivate(self, debug: bool = False) -> bool:
 72        """
 73        Deactivate this virtual environment.
 74        If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments
 75        will also be deactivated.
 76        """
 77        return self._deactivate(debug=(debug or self._debug), **self._kwargs)
 78
 79
 80    @property
 81    def target_path(self) -> pathlib.Path:
 82        """
 83        Return the target site-packages path for this virtual environment.
 84        A `meerschaum.utils.venv.Venv` may have one virtual environment per minor Python version
 85        (e.g. Python 3.10 and Python 3.7).
 86        """
 87        from meerschaum.utils.venv import venv_target_path
 88        return venv_target_path(venv=self._venv, allow_nonexistent=True, debug=self._debug)
 89
 90
 91    @property
 92    def root_path(self) -> pathlib.Path:
 93        """
 94        Return the top-level path for this virtual environment.
 95        """
 96        from meerschaum.config._paths import VIRTENV_RESOURCES_PATH
 97        if self._venv is None:
 98            return self.target_path.parent
 99        return VIRTENV_RESOURCES_PATH / self._venv
100
101
102    def __enter__(self) -> None:
103        self.activate(debug=self._debug)
104
105
106    def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
107        self.deactivate(debug=self._debug)
108
109
110    def __str__(self) -> str:
111        quote = "'" if self._venv is not None else ""
112        return "Venv(" + quote + str(self._venv) + quote + ")"
113
114
115    def __repr__(self) -> str:
116        return self.__str__()

Manage a virtual environment's activation status.

Examples
>>> from meerschaum.plugins import Plugin
>>> with Venv('mrsm') as venv:
...     import pandas
>>> with Venv(Plugin('noaa')) as venv:
...     import requests
>>> venv = Venv('mrsm')
>>> venv.activate()
True
>>> venv.deactivate()
True
>>>
Venv( venv: Union[str, Plugin, NoneType] = 'mrsm', debug: bool = False)
37    def __init__(
38            self,
39            venv: Union[str, 'meerschaum.plugins.Plugin', None] = 'mrsm',
40            debug: bool = False,
41        ) -> None:
42        from meerschaum.utils.venv import activate_venv, deactivate_venv, active_venvs
43        ### For some weird threading issue,
44        ### we can't use `isinstance` here.
45        if 'meerschaum.plugins._Plugin' in str(type(venv)):
46            self._venv = venv.name
47            self._activate = venv.activate_venv
48            self._deactivate = venv.deactivate_venv
49            self._kwargs = {}
50        else:
51            self._venv = venv
52            self._activate = activate_venv
53            self._deactivate = deactivate_venv
54            self._kwargs = {'venv': venv}
55        self._debug = debug
56        ### In case someone calls `deactivate()` before `activate()`.
57        self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)
def activate(self, debug: bool = False) -> bool:
60    def activate(self, debug: bool = False) -> bool:
61        """
62        Activate this virtual environment.
63        If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments
64        will also be activated.
65        """
66        from meerschaum.utils.venv import active_venvs
67        self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)
68        return self._activate(debug=(debug or self._debug), **self._kwargs)

Activate this virtual environment. If a meerschaum.plugins.Plugin was provided, its dependent virtual environments will also be activated.

def deactivate(self, debug: bool = False) -> bool:
71    def deactivate(self, debug: bool = False) -> bool:
72        """
73        Deactivate this virtual environment.
74        If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments
75        will also be deactivated.
76        """
77        return self._deactivate(debug=(debug or self._debug), **self._kwargs)

Deactivate this virtual environment. If a meerschaum.plugins.Plugin was provided, its dependent virtual environments will also be deactivated.

target_path: pathlib.Path

Return the target site-packages path for this virtual environment. A meerschaum.utils.venv.Venv may have one virtual environment per minor Python version (e.g. Python 3.10 and Python 3.7).

root_path: pathlib.Path

Return the top-level path for this virtual environment.
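
The `__enter__` / `__exit__` pair means activation is scoped to a `with` block: the venv activates on entry and deactivates on exit, even if an exception is raised. A stripped-down illustration of that pattern (a toy class tracking state in a plain set, not the real `Venv`, which manipulates `sys.path`):

```python
class ScopedVenv:
    """Toy context manager mirroring Venv's activate-on-enter, deactivate-on-exit."""
    active = set()  # class-level registry of active venv names

    def __init__(self, name: str = 'mrsm'):
        self.name = name

    def __enter__(self):
        ScopedVenv.active.add(self.name)
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        ScopedVenv.active.discard(self.name)
```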

def pprint( *args, detect_password: bool = True, nopretty: bool = False, **kw) -> None:
 10def pprint(
 11        *args,
 12        detect_password: bool = True,
 13        nopretty: bool = False,
 14        **kw
 15    ) -> None:
 16    """Pretty print an object according to the configured ANSI and UNICODE settings.
 17    If detect_password is True (default), search and replace passwords with '*' characters.
 18    Does not mutate objects.
 19    """
 20    from meerschaum.utils.packages import attempt_import, import_rich
 21    from meerschaum.utils.formatting import ANSI, UNICODE, get_console, print_tuple
 22    from meerschaum.utils.warnings import error
 23    from meerschaum.utils.misc import replace_password, dict_from_od, filter_keywords
 24    from collections import OrderedDict
 25    import copy, json
 26
 27    if (
 28        len(args) == 1
 29        and
 30        isinstance(args[0], tuple)
 31        and
 32        len(args[0]) == 2
 33        and
 34        isinstance(args[0][0], bool)
 35        and
 36        isinstance(args[0][1], str)
 37    ):
 38        return print_tuple(args[0])
 39
 40    modify = True
 41    rich_pprint, rich_pretty, _pprint = None, None, print  ### defaults guard against NameError below
 42    if ANSI and not nopretty:
 43        rich = import_rich()
 44        if rich is not None:
 45            rich_pretty = attempt_import('rich.pretty')
 46        if rich_pretty is not None:
 47            def _rich_pprint(*args, **kw):
 48                _console = get_console()
 49                _kw = filter_keywords(_console.print, **kw)
 50                _console.print(*args, **_kw)
 51            rich_pprint = _rich_pprint
 52    elif not nopretty:
 53        pprintpp = attempt_import('pprintpp', warn=False)
 54        try:
 55            _pprint = pprintpp.pprint
 56        except Exception as e:
 57            import pprint as _pprint_module
 58            _pprint = _pprint_module.pprint
 59
 60    func = (
 61        _pprint if rich_pprint is None else rich_pprint
 62    ) if not nopretty else print
 63
 64    try:
 65        args_copy = copy.deepcopy(args)
 66    except Exception as e:
 67        args_copy = args
 68        modify = False
 69    _args = []
 70    for a in args:
 71        c = a
 72        ### convert OrderedDict into dict
 73        if isinstance(a, OrderedDict) or issubclass(type(a), OrderedDict):
 74            c = dict_from_od(copy.deepcopy(c))
 75        _args.append(c)
 76    args = _args
 77
 78    _args = list(args)
 79    if detect_password and modify:
 80        _args = []
 81        for a in args:
 82            c = a
 83            if isinstance(c, dict):
 84                c = replace_password(copy.deepcopy(c))
 85            if nopretty:
 86                try:
 87                    c = json.dumps(c)
 88                    is_json = True
 89                except Exception as e:
 90                    is_json = False
 91                if not is_json:
 92                    try:
 93                        c = str(c)
 94                    except Exception as e:
 95                        pass
 96            _args.append(c)
 97
 98    ### filter out unsupported keywords
 99    func_kw = filter_keywords(func, **kw) if not nopretty else {}
100    error_msg = None
101    try:
102        func(*_args, **func_kw)
103    except Exception as e:
104        error_msg = e
105    if error_msg is not None:
106        error(error_msg)

Pretty print an object according to the configured ANSI and UNICODE settings. If detect_password is True (default), search and replace passwords with '*' characters. Does not mutate objects.
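
The password detection boils down to masking values whose keys look like credentials before printing, on a deep copy so the caller's object is untouched. A rough reimplementation of that idea (hypothetical; the library's actual `replace_password` may match keys differently):

```python
import copy

def mask_passwords(d: dict) -> dict:
    """Return a copy of `d` with password-like values replaced by '*' characters."""
    masked = copy.deepcopy(d)  # never mutate the caller's object
    for key, value in masked.items():
        if isinstance(value, dict):
            masked[key] = mask_passwords(value)  # recurse into nested config
        elif 'password' in str(key).lower():
            masked[key] = '*' * len(str(value))
    return masked
```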

def attempt_import( *names: str, lazy: bool = True, warn: bool = True, install: bool = True, venv: Optional[str] = 'mrsm', precheck: bool = True, split: bool = True, check_update: bool = False, check_pypi: bool = False, check_is_installed: bool = True, color: bool = True, debug: bool = False) -> Union[Any, Tuple[Any]]:
1052def attempt_import(
1053        *names: str,
1054        lazy: bool = True,
1055        warn: bool = True,
1056        install: bool = True,
1057        venv: Optional[str] = 'mrsm',
1058        precheck: bool = True,
1059        split: bool = True,
1060        check_update: bool = False,
1061        check_pypi: bool = False,
1062        check_is_installed: bool = True,
1063        color: bool = True,
1064        debug: bool = False
1065    ) -> Union[Any, Tuple[Any]]:
1066    """
1067    Raise a warning if packages are not installed; otherwise import and return modules.
1068    If `lazy` is `True`, return lazy-imported modules.
1069    
1070    Returns tuple of modules if multiple names are provided, else returns one module.
1071    
1072    Parameters
1073    ----------
1074    names: List[str]
1075        The packages to be imported.
1076
1077    lazy: bool, default True
1078        If `True`, lazily load packages.
1079
1080    warn: bool, default True
1081        If `True`, raise a warning if a package cannot be imported.
1082
1083    install: bool, default True
1084        If `True`, attempt to install a missing package into the designated virtual environment.
1085        If `check_update` is True, install updates if available.
1086
1087    venv: Optional[str], default 'mrsm'
1088        The virtual environment in which to search for packages and to install packages into.
1089
1090    precheck: bool, default True
1091        If `True`, attempt to find the module before importing (necessary for checking if modules exist
1092        and retaining lazy imports), otherwise assume lazy is `False`.
1093
1094    split: bool, default True
1095        If `True`, split packages' names on `'.'`.
1096
1097    check_update: bool, default False
1098        If `True` and `install` is `True`, install updates if the required minimum version
1099        does not match.
1100
1101    check_pypi: bool, default False
1102        If `True` and `check_update` is `True`, check PyPI when determining whether
1103        an update is required.
1104
1105    check_is_installed: bool, default True
1106        If `True`, check if the package is contained in the virtual environment.
1107
1108    Returns
1109    -------
1110    The specified modules. If they're not available and `install` is `True`, it will first
1111    download them into a virtual environment and return the modules.
1112
1113    Examples
1114    --------
1115    >>> pandas, sqlalchemy = attempt_import('pandas', 'sqlalchemy')
1116    >>> pandas = attempt_import('pandas')
1117
1118    """
1119
1120    import importlib.util
1121
1122    ### to prevent recursion, check if parent Meerschaum package is being imported
1123    if names == ('meerschaum',):
1124        return _import_module('meerschaum')
1125
1126    if venv == 'mrsm' and _import_hook_venv is not None:
1127        if debug:
1128            print(f"Import hook for virtual environment '{_import_hook_venv}' is active.")
1129        venv = _import_hook_venv
1130
1131    _warnings = _import_module('meerschaum.utils.warnings')
1132    warn_function = _warnings.warn
1133
1134    def do_import(_name: str, **kw) -> Union['ModuleType', None]:
1135        with Venv(venv=venv, debug=debug):
1136            ### determine the import method (lazy vs normal)
1137            from meerschaum.utils.misc import filter_keywords
1138            import_method = (
1139                _import_module if not lazy
1140                else lazy_import
1141            )
1142            try:
1143                mod = import_method(_name, **(filter_keywords(import_method, **kw)))
1144            except Exception as e:
1145                if warn:
1146                    import traceback
1147                    traceback.print_exception(type(e), e, e.__traceback__)
1148                    warn_function(
1149                        f"Failed to import module '{_name}'.\nException:\n{e}",
1150                        ImportWarning,
1151                        stacklevel = (5 if lazy else 4),
1152                        color = False,
1153                    )
1154                mod = None
1155        return mod
1156
1157    modules = []
1158    for name in names:
1159        ### Check if package is a declared dependency.
1160        root_name = name.split('.')[0] if split else name
1161        install_name = _import_to_install_name(root_name)
1162
1163        if install_name is None:
1164            install_name = root_name
1165            if warn and root_name != 'plugins':
1166                warn_function(
1167                    f"Package '{root_name}' is not declared in meerschaum.utils.packages.",
1168                    ImportWarning,
1169                    stacklevel = 3,
1170                    color = False
1171                )
1172
1173        ### Determine if the package exists.
1174        if precheck is False:
1175            found_module = (
1176                do_import(
1177                    name, debug=debug, warn=False, venv=venv, color=color,
1178                    check_update=False, check_pypi=False, split=split,
1179                ) is not None
1180            )
1181        else:
1182            if check_is_installed:
1183                with _locks['_is_installed_first_check']:
1184                    if not _is_installed_first_check.get(name, False):
1185                        package_is_installed = is_installed(
1186                            name,
1187                            venv = venv,
1188                            split = split,
1189                            debug = debug,
1190                        )
1191                        _is_installed_first_check[name] = package_is_installed
1192                    else:
1193                        package_is_installed = _is_installed_first_check[name]
1194            else:
1195                package_is_installed = _is_installed_first_check.get(
1196                    name,
1197                    venv_contains_package(name, venv=venv, split=split, debug=debug)
1198                )
1199            found_module = package_is_installed
1200
1201        if not found_module:
1202            if install:
1203                if not pip_install(
1204                    install_name,
1205                    venv = venv,
1206                    split = False,
1207                    check_update = check_update,
1208                    color = color,
1209                    debug = debug
1210                ) and warn:
1211                    warn_function(
1212                        f"Failed to install '{install_name}'.",
1213                        ImportWarning,
1214                        stacklevel = 3,
1215                        color = False,
1216                    )
1217            elif warn:
1218                ### Raise a warning if we can't find the package and install = False.
1219                warn_function(
1220                    (f"\n\nMissing package '{name}' from virtual environment '{venv}'; "
1221                     + "some features will not work correctly."
1222                     + f"\n\nSet install=True when calling attempt_import.\n"),
1223                    ImportWarning,
1224                    stacklevel = 3,
1225                    color = False,
1226                )
1227
1228        ### Do the import. Will be lazy if lazy=True.
1229        m = do_import(
1230            name, debug=debug, warn=warn, venv=venv, color=color,
1231            check_update=check_update, check_pypi=check_pypi, install=install, split=split,
1232        )
1233        modules.append(m)
1234
1235    modules = tuple(modules)
1236    if len(modules) == 1:
1237        return modules[0]
1238    return modules

Raise a warning if packages are not installed; otherwise import and return modules. If lazy is True, return lazy-imported modules.

Returns tuple of modules if multiple names are provided, else returns one module.

Parameters
  • names (List[str]): The packages to be imported.
  • lazy (bool, default True): If True, lazily load packages.
  • warn (bool, default True): If True, raise a warning if a package cannot be imported.
  • install (bool, default True): If True, attempt to install a missing package into the designated virtual environment. If check_update is True, install updates if available.
  • venv (Optional[str], default 'mrsm'): The virtual environment in which to search for packages and to install packages into.
  • precheck (bool, default True): If True, attempt to find the module before importing (necessary for checking if modules exist and retaining lazy imports), otherwise assume lazy is False.
  • split (bool, default True): If True, split packages' names on '.'.
  • check_update (bool, default False): If True and install is True, install updates if the required minimum version does not match.
  • check_pypi (bool, default False): If True and check_update is True, check PyPI when determining whether an update is required.
  • check_is_installed (bool, default True): If True, check if the package is contained in the virtual environment.
Returns
  • The specified modules. If they're not available and install is True, it will first download them into a virtual environment and return the modules.
Examples
>>> pandas, sqlalchemy = attempt_import('pandas', 'sqlalchemy')
>>> pandas = attempt_import('pandas')
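
The core control flow of `attempt_import` is: precheck whether the module can be found, install it if not (when allowed), then import. A stdlib-only sketch of that flow with the installation step stubbed out (hypothetical `simple_attempt_import`; the real function adds virtual environments, lazy imports, and version checks):

```python
import importlib
import importlib.util
from types import ModuleType
from typing import Optional

def simple_attempt_import(name: str, install: bool = True) -> Optional[ModuleType]:
    """Import `name`, attempting an install first if it cannot be found."""
    if importlib.util.find_spec(name) is None:  # precheck: does the module exist?
        if not install:
            # Mirrors the warning raised when install=False.
            print(f"Missing package '{name}'; some features will not work correctly.")
            return None
        # The real code shells out to pip here (pip_install into a venv); stubbed out.
        print(f"Would install '{name}' into a virtual environment.")
        return None
    return importlib.import_module(name)
```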