Package meerschaum

What is Meerschaum?

Meerschaum is a tool for quickly synchronizing time-series data streams called pipes. With Meerschaum, you can have a data visualization stack running in minutes.

Why Meerschaum?

If you've worked with time-series data, you know the headaches that come with ETL. Data engineering often gets in analysts' way, and when work needs to get done, every minute spent on pipelining is time taken away from real analysis.

Rather than copy/pasting your ETL scripts, simply build pipes with Meerschaum! Meerschaum gives you the tools to design your data streams however you like, and you can always incorporate Meerschaum into your existing systems.

Want to Learn More?

You can find a wealth of information at meerschaum.io!

Additionally, several articles have been published about Meerschaum.

Features

  • 📊 Built for Data Scientists and Analysts
    • Integrate with Pandas, Grafana and other popular data analysis tools.
    • Persist your dataframes and always get the latest data.
  • ⚡️ Production-Ready, Batteries Included
  • 🔌 Easily Expandable
  • Tailored for Your Experience
    • Rich CLI makes managing your data streams surprisingly enjoyable!
    • Web dashboard for those who prefer a more graphical experience.
    • Manage your database connections with Meerschaum connectors.
    • Utility commands with sensible syntax let you control many pipes with grace.
  • 💼 Portable from the Start
    • The environment variable $MRSM_ROOT_DIR lets you emulate multiple installations and group together your instances (see the sketch after this list).
    • No dependencies required; anything needed will be installed into a virtual environment.
    • Specify required packages for your plugins, and users will get those packages in a virtual environment.
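
As a minimal sketch of the $MRSM_ROOT_DIR feature above (the path below is hypothetical, and the variable must be set before meerschaum is imported), an isolated root keeps configuration, plugins, and data separate from your main installation:

import os
os.environ['MRSM_ROOT_DIR'] = '/tmp/mrsm-scratch'  # hypothetical isolated root directory
import meerschaum as mrsm                          # configuration and data now live under the scratch root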

Installation

For a more thorough setup guide, visit the Getting Started page at meerschaum.io.

TL;DR

pip install -U --user meerschaum
mrsm stack up -d db grafana
mrsm bootstrap pipes

Usage Documentation

Please visit meerschaum.io for setup, usage, and troubleshooting information. You can find technical documentation at docs.meerschaum.io, and here is a complete list of the Meerschaum actions.

>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe("plugin:noaa", "weather")
>>> df = pipe.get_data(begin='2022-02-02')
>>> df[['timestamp', 'station', 'temperature (wmoUnit:degC)']]
               timestamp station  temperature (wmoUnit:degC)
0    2022-03-29 09:54:00    KCEU                         8.3
1    2022-03-29 09:52:00    KATL                        10.6
2    2022-03-29 09:52:00    KCLT                         7.2
3    2022-03-29 08:54:00    KCEU                         8.3
4    2022-03-29 08:52:00    KATL                        11.1
...                  ...     ...                         ...
1626 2022-02-02 01:52:00    KATL                        10.0
1627 2022-02-02 01:52:00    KCLT                         7.8
1628 2022-02-02 00:54:00    KCEU                         8.3
1629 2022-02-02 00:52:00    KATL                        10.0
1630 2022-02-02 00:52:00    KCLT                         8.3

[1631 rows x 3 columns]
>>>

Plugins

Here is the list of community plugins and the public plugins repository.

For details on installing, using, and writing plugins, check out the plugins documentation at meerschaum.io.

Example Plugin

# ~/.config/meerschaum/plugins/example.py
__version__ = '0.0.1'
required = []

def register(pipe, **kw):
    return {
        'columns': {
            'datetime': 'dt',
            'id': 'id',
            'value': 'val',
        }
    }

def fetch(pipe, **kw):
    import datetime, random
    return {
        'dt': [datetime.datetime.utcnow()],
        'id': [1],
        'val': [random.randint(0, 100)],
    }
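
As a rough usage sketch (assuming the file above is saved where Meerschaum discovers plugins, and that syncing with no dataframe triggers the plugin's fetch(), per the fetch/sync behavior described below), you could build and sync a pipe from this plugin:

import meerschaum as mrsm

# 'plugin:example' points the pipe's source at the plugin above; 'test' is an arbitrary metric label.
pipe = mrsm.Pipe('plugin:example', 'test')
pipe.sync()             # fetch rows from the plugin and sync them into the instance
print(pipe.get_data())  # read back what was synced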

Support Meerschaum's Development

For consulting services and to support Meerschaum's development, please consider sponsoring me on GitHub Sponsors.

Additionally, you can always buy me a coffee☕!

Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Copyright 2021 Bennett Meares

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from meerschaum.config import __version__
from meerschaum._internal.docs import index as __doc__
from meerschaum.core.Pipe import Pipe
from meerschaum.utils import get_pipes
from meerschaum.connectors import get_connector
from meerschaum.plugins import Plugin

__pdoc__ = {'gui': False, 'api': False, 'core': False,}
__all__ = ("Pipe", "get_pipes", "get_connector", "Plugin",)

Sub-modules

meerschaum.actions

Default actions available to the mrsm CLI.

meerschaum.config

Meerschaum v1.2.9

meerschaum.connectors

Create connectors with get_connector(). For ease of use, you can also import from the root meerschaum module:

>>> from …
meerschaum.plugins

Expose plugin management APIs from the meerschaum.plugins module.

meerschaum.utils

The utils module contains utility functions, ranging from primary utilities (get_pipes) to miscellaneous helper functions.

Functions

def get_connector(type: str = None, label: str = None, refresh: bool = False, debug: bool = False, **kw: Any)

Return existing connector or create new connection and store for reuse.

You can create new connectors if enough parameters are provided for the given type and flavor.

Parameters

type : Optional[str], default None
Connector type (sql, api, etc.). Defaults to the type of the configured instance_connector.
label : Optional[str], default None
Connector label (e.g. main). Defaults to 'main'.
refresh : bool, default False
Refresh the Connector instance / construct new object. Defaults to False.
kw : Any
Other arguments to pass to the Connector constructor. If the Connector has already been constructed and new arguments are provided, refresh is set to True and the old Connector is replaced.

Returns

A new Meerschaum connector (e.g. meerschaum.connectors.api.APIConnector, meerschaum.connectors.sql.SQLConnector).

Examples

The following parameters would create a new meerschaum.connectors.sql.SQLConnector that isn't in the configuration file.

>>> conn = get_connector(
...     type = 'sql',
...     label = 'newlabel',
...     flavor = 'sqlite',
...     database = '/file/path/to/database.db'
... )
>>>
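
You may also call get_connector() with no arguments to retrieve the configured instance connector, or pass existing keys (a short sketch; 'sql:main' is the preconfigured default instance):

>>> instance_conn = get_connector()           # falls back to the configured instance connector
>>> main_conn = get_connector('sql', 'main')  # reuse the existing 'sql:main' connector
>>>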
Expand source code
def get_connector(
        type: str = None,
        label: str = None,
        refresh: bool = False,
        debug: bool = False,
        **kw: Any
    ):
    """
    Return existing connector or create new connection and store for reuse.
    
    You can create new connectors if enough parameters are provided for the given type and flavor.
    

    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct new object. Defaults to `False`.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`).
    
    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```

    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import _static_config
    global _loaded_plugin_connectors
    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _loaded_plugin_connectors = True
    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else _static_config()['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"

        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    if 'sql' not in types:
        from meerschaum.connectors.mqtt import MQTTConnector
        from meerschaum.connectors.plugin import PluginConnector
        with _locks['types']:
            types.update({
                'api'   : APIConnector,
                'sql'   : SQLConnector,
                'mqtt'  : MQTTConnector,
                'plugin': PluginConnector,
            })
    
    ### always refresh MQTT Connectors NOTE: test this!
    if type == 'mqtt':
        refresh = True

    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].__dict__:
                    warning_message = (
                        f"Received new attribute '{attribute}' not present in connector " +
                        f"{connectors[type][label]}.\n"
                    )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n" +
                        f"  - Keyword value: '{value}'\n" +
                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else: ### connector doesn't yet exist
            refresh = True

    ### only create an object if refresh is True
    ### (can be manually specified, otherwise determined above)
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, debug=debug, **kw)
                connectors[type][label] = conn
            except Exception as e:
                warn(f"Exception when creating connector '{type}:{label}'\n" + str(e), stack=False)
                conn = None
        if conn is None:
            return None

    return connectors[type][label]
def get_pipes(connector_keys: Union[str, List[str], None] = None, metric_keys: Union[str, List[str], None] = None, location_keys: Union[str, List[str], None] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, mrsm_instance: Union[str, InstanceConnector, None] = None, instance: Union[str, InstanceConnector, None] = None, as_list: bool = False, method: str = 'registered', wait: bool = False, debug: bool = False, **kw: Any) ‑> Union[PipesDict, List['Pipe']]

Return a dictionary or list of Pipe objects.

Parameters

connector_keys : Union[str, List[str], None], default None
String or list of connector keys. If omitted or is '*', fetch all possible keys. If a string begins with '_', select keys that do NOT match the string.
metric_keys : Union[str, List[str], None], default None
String or list of metric keys. See connector_keys for formatting.
location_keys : Union[str, List[str], None], default None
String or list of location keys. See connector_keys for formatting.
tags : Optional[List[str]], default None
If provided, only include pipes with these tags.
params : Optional[Dict[str, Any]], default None
Dictionary of additional parameters to search by. Params are parsed into a SQL WHERE clause. E.g. {'a': 1, 'b': 2} equates to 'WHERE a = 1 AND b = 2'
mrsm_instance : Union[str, InstanceConnector, None], default None
Connector keys for the Meerschaum instance of the pipes. Must be a SQLConnector or APIConnector.
as_list : bool, default False
If True, return pipes in a list instead of a hierarchical dictionary. False: {connector_keys: {metric_key: {location_key: Pipe}}}. True: [Pipe].
method : str, default 'registered'
Available options: ['registered', 'explicit', 'all']. If 'registered' (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depending on mrsm_instance). If 'explicit', create pipes from the provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If 'all', create pipes from predefined metrics and locations. Requires connector_keys. NOTE: Method 'all' is not implemented!
wait : bool, default False
Wait for a connection before getting Pipes. Should only be true for cases where the database might not be running (like the API).
**kw : Any:
Keyword arguments to pass to the Pipe constructor.

Returns

A dictionary of dictionaries and Pipe objects in the connector, metric, location hierarchy. If as_list is True, return a list of Pipe objects.

Examples

>>> ### Manual definition:
>>> pipes = {
...     <connector_keys>: {
...         <metric_key>: {
...             <location_key>: Pipe(
...                 <connector_keys>,
...                 <metric_key>,
...                 <location_key>,
...             ),
...         },
...     },
... },
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[sql_main_weather]
>>> 
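
As an additional sketch (the tag value is hypothetical), keys and tags narrow the selection, and as_list=True flattens the result:

>>> weather_pipes = get_pipes('sql:main', 'weather', as_list=True)
>>> tagged_pipes = get_pipes(tags=['production'], as_list=True)
>>>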
Expand source code
def get_pipes(
        connector_keys: Union[str, List[str], None] = None,
        metric_keys: Union[str, List[str], None] = None,
        location_keys: Union[str, List[str], None] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        mrsm_instance: Union[str, InstanceConnector, None] = None,
        instance: Union[str, InstanceConnector, None] = None,
        as_list: bool = False,
        method: str = 'registered',
        wait: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Union[PipesDict, List['meerschaum.Pipe']]:
    """
    Return a dictionary or list of `meerschaum.Pipe` objects.

    Parameters
    ----------
    connector_keys: Union[str, List[str], None], default None
        String or list of connector keys.
        If omitted or is `'*'`, fetch all possible keys.
        If a string begins with `'_'`, select keys that do NOT match the string.

    metric_keys: Union[str, List[str], None], default None
        String or list of metric keys. See `connector_keys` for formatting.

    location_keys: Union[str, List[str], None], default None
        String or list of location keys. See `connector_keys` for formatting.

    tags: Optional[List[str]], default None
         If provided, only include pipes with these tags.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        Params are parsed into a SQL WHERE clause.
        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`

    mrsm_instance: Union[str, InstanceConnector, None], default None
        Connector keys for the Meerschaum instance of the pipes.
        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
        `meerschaum.connectors.api.APIConnector.APIConnector`.
        
    as_list: bool, default False
        If `True`, return pipes in a list instead of a hierarchical dictionary.
        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
        `True`  : `[Pipe]`

    method: str, default 'registered'
        Available options: `['registered', 'explicit', 'all']`
        If `'registered'` (default), create pipes based on registered keys in the connector's pipes table
        (API or SQL connector, depends on mrsm_instance).
        If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys
        instead of consulting the pipes table. Useful for creating non-existent pipes.
        If `'all'`, create pipes from predefined metrics and locations. Requires `connector_keys`.
        **NOTE:** Method `'all'` is not implemented!

    wait: bool, default False
        Wait for a connection before getting Pipes. Should only be true for cases where the
        database might not be running (like the API).

    **kw: Any:
        Keyword arguments to pass to the `meerschaum.Pipe` constructor.
        

    Returns
    -------
    A dictionary of dictionaries and `meerschaum.Pipe` objects
    in the connector, metric, location hierarchy.
    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.

    Examples
    --------
    ```
    >>> ### Manual definition:
    >>> pipes = {
    ...     <connector_keys>: {
    ...         <metric_key>: {
    ...             <location_key>: Pipe(
    ...                 <connector_keys>,
    ...                 <metric_key>,
    ...                 <location_key>,
    ...             ),
    ...         },
    ...     },
    ... },
    >>> ### Accessing a single pipe:
    >>> pipes['sql:main']['weather'][None]
    >>> ### Return a list instead:
    >>> get_pipes(as_list=True)
    [sql_main_weather]
    >>> 
    ```
    """

    from meerschaum.config import get_config
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import filter_keywords

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    if params is None:
        params = {}
    if tags is None:
        tags = []

    if isinstance(connector_keys, str):
        connector_keys = [connector_keys]
    if isinstance(metric_keys, str):
        metric_keys = [metric_keys]
    if isinstance(location_keys, str):
        location_keys = [location_keys]

    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
    ### If `wait`, wait until a connection is made
    if mrsm_instance is None:
        mrsm_instance = instance
    if mrsm_instance is None:
        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
    if isinstance(mrsm_instance, str):
        from meerschaum.connectors.parse import parse_instance_keys
        connector = parse_instance_keys(keys=mrsm_instance, wait=wait, debug=debug)
    else: ### NOTE: mrsm_instance MUST be a SQL or API connector for this to work
        from meerschaum.connectors import Connector
        valid_connector = False
        if issubclass(type(mrsm_instance), Connector):
            if mrsm_instance.type in ('api', 'sql'):
                valid_connector = True
        if not valid_connector:
            error(f"Invalid instance connector: {mrsm_instance}")
        connector = mrsm_instance
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Using instance connector: {connector}")
    if not connector:
        error(f"Could not create connector from keys: '{mrsm_instance}'")

    ### Get a list of tuples for the keys needed to build pipes.
    result = fetch_pipes_keys(
        method,
        connector,
        connector_keys = connector_keys,
        metric_keys = metric_keys,
        location_keys = location_keys,
        tags = tags,
        params = params,
        debug = debug
    )
    if result is None:
        error(f"Unable to build pipes!")

    ### Populate the `pipes` dictionary with Pipes based on the keys
    ### obtained from the chosen `method`.
    from meerschaum import Pipe
    pipes = {}
    for ck, mk, lk in result:
        if ck not in pipes:
            pipes[ck] = {}

        if mk not in pipes[ck]:
            pipes[ck][mk] = {}

        pipes[ck][mk][lk] = Pipe(
            ck, mk, lk,
            mrsm_instance=connector,
            debug=debug,
            **filter_keywords(Pipe, **kw)
        )

    if not as_list:
        return pipes
    from meerschaum.utils.misc import flatten_pipes_dict
    return flatten_pipes_dict(pipes)

Classes

class Pipe (connector: str = '', metric: str = '', location: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, columns: Optional[Dict[str, str]] = None, tags: Optional[List[str]] = None, target: Optional[str] = None, dtypes: Optional[Dict[str, str]] = None, instance: Optional[Union[str, InstanceConnector]] = None, mrsm_instance: Optional[Union[str, InstanceConnector]] = None, cache: bool = False, debug: bool = False, connector_keys: Optional[str] = None, metric_key: Optional[str] = None, location_key: Optional[str] = None)

Access Meerschaum pipes via Pipe objects.

Pipes are identified by the following:

  1. Connector keys (e.g. 'sql:main')
  2. Metric key (e.g. 'weather')
  3. Location (optional; e.g. None)

A pipe's connector keys correspond to a data source, and when the pipe is synced, its fetch definition is evaluated and executed to produce new data.

Alternatively, new data may be directly synced via pipe.sync():

>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)

Parameters

connector : str
Keys for the pipe's source connector, e.g. 'sql:main'.
metric : str
Label for the pipe's contents, e.g. 'weather'.
location : str, default None
Label for the pipe's location. Defaults to None.
parameters : Optional[Dict[str, Any]], default None
Optionally set a pipe's parameters from the constructor, e.g. columns and other attributes. You can edit these parameters with edit pipes.
columns : Optional[Dict[str, str]], default None
Set the columns dictionary of parameters. If parameters is also provided, this dictionary is added under the 'columns' key.
tags : Optional[List[str]], default None
A list of strings to be added under the 'tags' key of parameters. You can select pipes with certain tags using --tags.
dtypes : Optional[Dict[str, str]], default None
Set the dtypes dictionary of parameters. If parameters is also provided, this dictionary is added under the 'dtypes' key.
mrsm_instance : Optional[Union[str, InstanceConnector]], default None
Connector for the Meerschaum instance where the pipe resides. Defaults to the preconfigured default instance ('sql:main').
instance : Optional[Union[str, InstanceConnector]], default None
Alias for mrsm_instance. If mrsm_instance is supplied, this value is ignored.
cache : bool, default False
If True, cache fetched data into a local database file. Defaults to False.
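
A brief constructor sketch (the keys, column names, tag, and target below are hypothetical):

>>> from meerschaum import Pipe
>>> pipe = Pipe(
...     'sql:main', 'weather', 'atlanta',
...     columns={'datetime': 'timestamp', 'id': 'station'},
...     tags=['production'],
...     target='weather_atlanta',
... )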
Expand source code
class Pipe:
    """
    Access Meerschaum pipes via Pipe objects.
    
    Pipes are identified by the following:

    1. Connector keys (e.g. `'sql:main'`)
    2. Metric key (e.g. `'weather'`)
    3. Location (optional; e.g. `None`)
    
    A pipe's connector keys correspond to a data source, and when the pipe is synced,
    its `fetch` definition is evaluated and executed to produce new data.
    
    Alternatively, new data may be directly synced via `pipe.sync()`:
    
    ```
    >>> from meerschaum import Pipe
    >>> pipe = Pipe('csv', 'weather')
    >>>
    >>> import pandas as pd
    >>> df = pd.read_csv('weather.csv')
    >>> pipe.sync(df)
    ```
    """

    from ._fetch import fetch
    from ._data import get_data, get_backtrack_data, get_rowcount
    from ._register import register
    from ._attributes import (
        attributes,
        parameters,
        columns,
        dtypes,
        get_columns,
        get_columns_types,
        tags,
        get_id,
        id,
        get_val_column,
        parents,
        target,
        _target_legacy,
        guess_datetime,
    )
    from ._show import show
    from ._edit import edit, edit_definition
    from ._sync import sync, get_sync_time, exists, filter_existing
    from ._delete import delete
    from ._drop import drop
    from ._clear import clear
    from ._bootstrap import bootstrap
    from ._dtypes import enforce_dtypes, infer_dtypes

    def __init__(
        self,
        connector: str = '',
        metric: str = '',
        location: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        columns: Optional[Dict[str, str]] = None,
        tags: Optional[List[str]] = None,
        target: Optional[str] = None,
        dtypes: Optional[Dict[str, str]] = None,
        instance: Optional[Union[str, InstanceConnector]] = None,
        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
        cache: bool = False,
        debug: bool = False,
        connector_keys: Optional[str] = None,
        metric_key: Optional[str] = None,
        location_key: Optional[str] = None,
    ):
        """
        Parameters
        ----------
        connector: str
            Keys for the pipe's source connector, e.g. `'sql:main'`.

        metric: str
            Label for the pipe's contents, e.g. `'weather'`.

        location: str, default None
            Label for the pipe's location. Defaults to `None`.

        parameters: Optional[Dict[str, Any]], default None
            Optionally set a pipe's parameters from the constructor,
            e.g. columns and other attributes.
            You can edit these parameters with `edit pipes`.

        columns: Optional[Dict[str, str]], default None
            Set the `columns` dictionary of `parameters`.
            If `parameters` is also provided, this dictionary is added under the `'columns'` key.

        tags: Optional[List[str]], default None
            A list of strings to be added under the `'tags'` key of `parameters`.
            You can select pipes with certain tags using `--tags`.

        dtypes: Optional[Dict[str, str]], default None
            Set the `dtypes` dictionary of `parameters`.
            If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.

        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
            Connector for the Meerschaum instance where the pipe resides.
            Defaults to the preconfigured default instance (`'sql:main'`).

        instance: Optional[Union[str, InstanceConnector]], default None
            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.

        cache: bool, default False
            If `True`, cache fetched data into a local database file.
            Defaults to `False`.
        """
        from meerschaum.utils.warnings import error
        if (not connector and not connector_keys) or (not metric and not metric_key):
            error(
                "Please provide strings for the connector and metric\n    "
                + "(first two positional arguments)."
            )

        ### Fall back to legacy `location_key` just in case.
        if not location:
            location = location_key

        if not connector:
            connector = connector_keys

        if not metric:
            metric = metric_key

        if location in ('[None]', 'None'):
            location = None

        from meerschaum.config.static import _static_config
        negation_prefix = _static_config()['system']['fetch_pipes_keys']['negation_prefix']
        for k in (connector, metric, location, *(tags or [])):
            if str(k).startswith(negation_prefix):
                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")

        self.connector_keys = str(connector)
        self.connector_key = self.connector_keys ### Alias
        self.metric_key = metric
        self.location_key = location

        ### only set parameters if values are provided
        if parameters is not None:
            self._parameters = parameters

        if columns is not None:
            if self.__dict__.get('_parameters', None) is None:
                self._parameters = {}
            self._parameters['columns'] = columns

        if tags is not None:
            if self.__dict__.get('_parameters', None) is None:
                self._parameters = {}
            self._parameters['tags'] = tags

        if target is not None:
            if self.__dict__.get('_parameters', None) is None:
                self._parameters = {}
            self._parameters['target'] = target

        if dtypes is not None:
            if self.__dict__.get('_parameters', None) is None:
                self._parameters = {}
            self._parameters['dtypes'] = dtypes


        ### NOTE: The parameters dictionary is {} by default.
        ###       A Pipe may be registered without parameters, then edited,
        ###       or a Pipe may be registered with parameters set in-memory first.
        #  from meerschaum.config import get_config
        _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
        if _mrsm_instance is None:
            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)
        if not isinstance(_mrsm_instance, str):
            self._instance_connector = _mrsm_instance
            self.instance_keys = str(_mrsm_instance)
        else: ### NOTE: must be SQL or API Connector for this work
            self.instance_keys = _mrsm_instance

        self._cache = cache and get_config('system', 'experimental', 'cache')

    @property
    def meta(self):
        """Simulate the MetaPipe model without importing FastAPI."""
        refresh = False
        if '_meta' not in self.__dict__:
            refresh = True

        if refresh:
            self._meta = {
                'connector_keys' : self.connector_keys,
                'metric_key'     : self.metric_key,
                'location_key'   : self.location_key,
                'instance'       : self.instance_keys,
            }
        return self._meta

    @property
    def instance_connector(self) -> Union[InstanceConnector, None]:
        """
        The connector to where this pipe resides.
        May either be of type `meerschaum.connectors.sql.SQLConnector` or
        `meerschaum.connectors.api.APIConnector`.
        """
        if '_instance_connector' not in self.__dict__:
            from meerschaum.connectors.parse import parse_instance_keys
            conn = parse_instance_keys(self.instance_keys)
            if conn:
                self._instance_connector = conn
            else:
                return None
        return self._instance_connector

    @property
    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
        """
        The connector to the data source.
        May be of type `'sql'`, `'api'`, `'mqtt'`, or `'plugin'`.
        """
        if '_connector' not in self.__dict__:
            from meerschaum.connectors.parse import parse_instance_keys
            import warnings
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                try:
                    conn = parse_instance_keys(self.connector_keys)
                except Exception as e:
                    conn = None
            if conn:
                self._connector = conn
            else:
                return None
        return self._connector


    @property
    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
        """
        If the pipe was created with `cache=True`, return the connector to the pipe's
        SQLite database for caching.
        """
        if not self._cache:
            return None

        if '_cache_connector' not in self.__dict__:
            from meerschaum.connectors import get_connector
            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
            _resources_path = SQLITE_RESOURCES_PATH
            self._cache_connector = get_connector(
                'sql', '_cache_' + str(self),
                flavor='sqlite',
                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
            )

        return self._cache_connector


    @property
    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
        """
        If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
        manage the local data.
        """
        if self.cache_connector is None:
            return None
        if '_cache_pipe' not in self.__dict__:
            from meerschaum.config._patch import apply_patch_to_config
            from meerschaum.utils.sql import sql_item_name
            _parameters = self.parameters.copy()
            _fetch_patch = {
                'fetch': ({
                    'definition': (
                        f"SELECT * FROM {sql_item_name(str(self), self.instance_connector.flavor)}"
                    ),
                }) if self.instance_connector.type == 'sql' else ({
                    'connector_keys': self.connector_keys,
                    'metric_key': self.metric_key,
                    'location_key': self.location_key,
                })
            }
            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
            self._cache_pipe = Pipe(
                self.instance_keys,
                (self.connector_keys + '_' + self.metric_key + '_cache'),
                self.location_key,
                mrsm_instance=self.cache_connector,
                parameters=_parameters,
                cache=False,
            )

        return self._cache_pipe

    @property
    def sync_time(self) -> Union['datetime.datetime', None]:
        """
        Convenience function to get the pipe's latest datetime.
        Use `meerschaum.Pipe.get_sync_time()` instead.
        """
        return self.get_sync_time()

    def __str__(self, ansi: bool=False):
        return pipe_repr(self, ansi=ansi)


    def __eq__(self, other):
        try:
            return (
                isinstance(self, type(other))
                and self.connector_keys == other.connector_keys
                and self.metric_key == other.metric_key
                and self.location_key == other.location_key
                and self.instance_keys == other.instance_keys
            )
        except Exception as e:
            return False

    def __hash__(self):
        ### Using an esoteric separator to avoid collisions.
        sep = "[\"']"
        return hash(
            str(self.connector_keys) + sep
            + str(self.metric_key) + sep
            + str(self.location_key) + sep
            + str(self.instance_keys) + sep
        )

    def __repr__(self, **kw) -> str:
        return pipe_repr(self, **kw)

    def __getstate__(self) -> Dict[str, Any]:
        """
        Define the state dictionary (pickling).
        """
        return {
            'connector_keys': self.connector_keys,
            'metric_key': self.metric_key,
            'location_key': self.location_key,
            'parameters': self.parameters,
            'mrsm_instance': self.instance_keys,
        }

    def __setstate__(self, _state: Dict[str, Any]):
        """
        Read the state (unpickling).
        """
        connector_keys = _state.pop('connector_keys')
        metric_key = _state.pop('metric_key')
        location_key = _state.pop('location_key')
        self.__init__(connector_keys, metric_key, location_key, **_state)

Instance variables

var attributes : Union[Dict[str, Any], NoneType]

Return a dictionary of a pipe's keys and parameters. Is a superset of Pipe.parameters and ONLY returns a dictionary if the pipe is registered. An unregistered pipe may still set its parameters. Use Pipe.meta to retrieve keys from unregistered pipes.
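
A short sketch of the distinction (hypothetical keys; attributes stays None until the pipe is registered on its instance):

>>> pipe = Pipe('csv', 'weather')
>>> pipe.meta          # keys are always available, even for unregistered pipes
>>> pipe.attributes    # None until the pipe is registered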

Expand source code
@property
def attributes(self) -> Union[Dict[str, Any], None]:
    """
    Return a dictionary of a pipe's keys and parameters.
    Is a superset of `meerschaum.Pipe.parameters` and
    **ONLY** returns a dictionary if the pipe is registered.
    An unregistered pipe may still set its parameters.
    Use `meerschaum.Pipe.meta` to retrieve keys from unregistered pipes.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    if '_attributes' not in self.__dict__:
        if self.id is None:
            return None
        self._attributes = self.instance_connector.get_pipe_attributes(self)
    return self._attributes
var cache_connector : Union[meerschaum.connectors.sql.SQLConnector, None]

If the pipe was created with cache=True, return the connector to the pipe's SQLite database for caching.

Expand source code
@property
def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
    """
    If the pipe was created with `cache=True`, return the connector to the pipe's
    SQLite database for caching.
    """
    if not self._cache:
        return None

    if '_cache_connector' not in self.__dict__:
        from meerschaum.connectors import get_connector
        from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
        _resources_path = SQLITE_RESOURCES_PATH
        self._cache_connector = get_connector(
            'sql', '_cache_' + str(self),
            flavor='sqlite',
            database=str(_resources_path / ('_cache_' + str(self) + '.db')),
        )

    return self._cache_connector
var cache_pipe : Union[Pipe, NoneType]

If the pipe was created with cache=True, return another Pipe used to manage the local data.

Expand source code
@property
def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
    """
    If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
    manage the local data.
    """
    if self.cache_connector is None:
        return None
    if '_cache_pipe' not in self.__dict__:
        from meerschaum.config._patch import apply_patch_to_config
        from meerschaum.utils.sql import sql_item_name
        _parameters = self.parameters.copy()
        _fetch_patch = {
            'fetch': ({
                'definition': (
                    f"SELECT * FROM {sql_item_name(str(self), self.instance_connector.flavor)}"
                ),
            }) if self.instance_connector.type == 'sql' else ({
                'connector_keys': self.connector_keys,
                'metric_key': self.metric_key,
                'location_key': self.location_key,
            })
        }
        _parameters = apply_patch_to_config(_parameters, _fetch_patch)
        self._cache_pipe = Pipe(
            self.instance_keys,
            (self.connector_keys + '_' + self.metric_key + '_cache'),
            self.location_key,
            mrsm_instance=self.cache_connector,
            parameters=_parameters,
            cache=False,
        )

    return self._cache_pipe
var columns : Union[Dict[str, str], NoneType]

If defined, return the columns dictionary defined in Pipe.parameters.

Expand source code
@property
def columns(self) -> Union[Dict[str, str], None]:
    """
    If defined, return the `columns` dictionary defined in `meerschaum.Pipe.parameters`.
    """
    if not self.parameters:
        if '_columns' in self.__dict__:
            return self._columns
        return None
    if 'columns' not in self.parameters:
        return None
    return self.parameters['columns']
var connector : Union[Connector, None]

The connector to the data source. May be of type 'sql', 'api', 'mqtt', or 'plugin'.

Expand source code
@property
def connector(self) -> Union[meerschaum.connectors.Connector, None]:
    """
    The connector to the data source.
    May be of type `'sql'`, `'api'`, `'mqtt'`, or `'plugin'`.
    """
    if '_connector' not in self.__dict__:
        from meerschaum.connectors.parse import parse_instance_keys
        import warnings
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            try:
                conn = parse_instance_keys(self.connector_keys)
            except Exception as e:
                conn = None
        if conn:
            self._connector = conn
        else:
            return None
    return self._connector
var dtypes : Union[Dict[str, Any], NoneType]

If defined, return the dtypes dictionary defined in Pipe.parameters.

Expand source code
@property
def dtypes(self) -> Union[Dict[str, Any], None]:
    """
    If defined, return the `dtypes` dictionary defined in `meerschaum.Pipe.parameters`.
    """
    if self.parameters is None or self.parameters.get('dtypes', None) is None:
        if self.__dict__.get('_dtypes', None):
            return self._dtypes
        _dtypes = self.infer_dtypes(persist=False)
        if not self.exists():
            return _dtypes
        self._dtypes = _dtypes
        return self._dtypes

    return self.parameters['dtypes']
var id : Union[int, NoneType]

Fetch and cache a pipe's ID.

Expand source code
@property
def id(self) -> Union[int, None]:
    """
    Fetch and cache a pipe's ID.
    """
    if not ('_id' in self.__dict__ and self._id):
        self._id = self.get_id()
    return self._id
var instance_connector : Union[meerschaum.connectors.sql.SQLConnector, meerschaum.connectors.api.APIConnector, NoneType]

The connector to where this pipe resides. May either be of type meerschaum.connectors.sql.SQLConnector or meerschaum.connectors.api.APIConnector.

Expand source code
@property
def instance_connector(self) -> Union[InstanceConnector, None]:
    """
    The connector to where this pipe resides.
    May either be of type `meerschaum.connectors.sql.SQLConnector` or
    `meerschaum.connectors.api.APIConnector`.
    """
    if '_instance_connector' not in self.__dict__:
        from meerschaum.connectors.parse import parse_instance_keys
        conn = parse_instance_keys(self.instance_keys)
        if conn:
            self._instance_connector = conn
        else:
            return None
    return self._instance_connector
var meta

Simulate the MetaPipe model without importing FastAPI.

Expand source code
@property
def meta(self):
    """Simulate the MetaPipe model without importing FastAPI."""
    refresh = False
    if '_meta' not in self.__dict__:
        refresh = True

    if refresh:
        self._meta = {
            'connector_keys' : self.connector_keys,
            'metric_key'     : self.metric_key,
            'location_key'   : self.location_key,
            'instance'       : self.instance_keys,
        }
    return self._meta
var parameters : Union[Dict[str, Any], NoneType]

Return the parameters dictionary of the pipe.

Expand source code
@property
def parameters(self) -> Optional[Dict[str, Any]]:
    """
    Return the parameters dictionary of the pipe.
    """
    if '_parameters' not in self.__dict__:
        if not self.attributes:
            return None
        self._parameters = self.attributes['parameters']
    return self._parameters
var parents : List[Pipe]

Return a list of Pipe objects. These pipes will be synced before this pipe.

NOTE: Not yet in use!

Expand source code
@property
def parents(self) -> List[meerschaum.Pipe]:
    """
    Return a list of `meerschaum.Pipe` objects.
    These pipes will be synced before this pipe.

    NOTE: Not yet in use!
    """
    if 'parents' not in self.parameters:
        return []
    from meerschaum.utils.warnings import warn
    _parents_keys = self.parameters['parents']
    if not isinstance(_parents_keys, list):
        warn(
            f"Please ensure the parents for {self} are defined as a list of keys.",
            stacklevel = 4
        )
        return []
    from meerschaum import Pipe
    _parents = []
    for keys in _parents_keys:
        try:
            p = Pipe(**keys)
        except Exception as e:
            warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}")
            continue
        _parents.append(p)
    return _parents
var sync_time : Union[datetime.datetime, NoneType]

Convenience function to get the pipe's latest datetime. Use get_sync_time() instead.

Expand source code
@property
def sync_time(self) -> Union['datetime.datetime', None]:
    """
    Convenience function to get the pipe's latest datetime.
    Use `meerschaum.Pipe.get_sync_time()` instead.
    """
    return self.get_sync_time()
var tags : Union[List[str], NoneType]

If defined, return the tags list defined in Pipe.parameters.

Expand source code
@property
def tags(self) -> Union[List[str], None]:
    """
    If defined, return the `tags` list defined in `meerschaum.Pipe.parameters`.
    """
    if not self.parameters:
        if '_tags' in self.__dict__:
            return self._tags
        return None
    if 'tags' not in self.parameters:
        return None
    return self.parameters['tags']
var target : str

The target table name. You can set the target name under one of the following keys (checked in this order): target, target_name, target_table, target_table_name.
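
For instance (a sketch; the table name is arbitrary):

>>> pipe = Pipe('sql:main', 'weather', parameters={'target_name': 'weather_prod'})
>>> pipe.target        # 'weather_prod'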

Expand source code
@property
def target(self) -> str:
    """
    The target table name.
    You can set the target name under one of the following keys
    (checked in this order):
      - `target`
      - `target_name`
      - `target_table`
      - `target_table_name`
    """
    _target_key = '_target'
    if _target_key not in self.__dict__:
        if not self.parameters:
            self.__dict__[_target_key] = self._target_legacy()
        else:
            potential_keys = ('target', 'target_name', 'target_table', 'target_table_name')
            for k in potential_keys:
                if k in self.parameters:
                    self.__dict__[_target_key] = self.parameters[k]
                    break
    return self.__dict__.get(_target_key, self._target_legacy())

Methods

def bootstrap(self, debug: bool = False, yes: bool = False, force: bool = False, noask: bool = False, shell: bool = False, **kw) ‑> Tuple[bool, str]

Prompt the user to create a pipe's requirements all from one method. This method shouldn't be used in any automated scripts because it interactively prompts the user and therefore may hang.

Parameters

debug : bool, default False:
Verbosity toggle.
yes : bool, default False:
Print the questions and automatically agree.
force : bool, default False:
Skip the questions and agree anyway.
noask : bool, default False:
Print the questions but go with the default answer.
shell : bool, default False:
Used to determine if we are in the interactive shell.

Returns

A SuccessTuple corresponding to the success of this procedure.

Expand source code
def bootstrap(
        self,
        debug: bool = False,
        yes: bool = False,
        force: bool = False,
        noask: bool = False,
        shell: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Prompt the user to create a pipe's requirements all from one method.
    This method shouldn't be used in any automated scripts because it interactively
    prompts the user and therefore may hang.

    Parameters
    ----------
    debug: bool, default False:
        Verbosity toggle.

    yes: bool, default False:
        Print the questions and automatically agree.

    force: bool, default False:
        Skip the questions and agree anyway.

    noask: bool, default False:
        Print the questions but go with the default answer.

    shell: bool, default False:
        Used to determine if we are in the interactive shell.
        
    Returns
    -------
    A `SuccessTuple` corresponding to the success of this procedure.

    """

    from meerschaum.utils.warnings import warn, info, error
    from meerschaum.utils.prompt import prompt, yes_no
    from meerschaum.utils.formatting import pprint
    from meerschaum.config import get_config
    from meerschaum.utils.formatting._shell import clear_screen
    from meerschaum.utils.formatting import print_tuple
    from meerschaum.actions import actions

    _clear = get_config('shell', 'clear_screen', patch=True)

    if self.get_id(debug=debug) is not None:
        delete_tuple = self.delete(debug=debug)
        if not delete_tuple[0]:
            return delete_tuple

    if _clear:
        clear_screen(debug=debug)

    _parameters = _get_parameters(self, debug=debug)
    self.parameters = _parameters
    pprint(self.parameters)
    try:
        prompt(
            f"\n    Press [Enter] to register {self} with the above configuration:",
            icon = False
        )
    except KeyboardInterrupt as e:
        return False, f"Aborting bootstrapping {self}."
    register_tuple = self.instance_connector.register_pipe(self, debug=debug)
    if not register_tuple[0]:
        return register_tuple

    if _clear:
        clear_screen(debug=debug)

    try:
        if yes_no(
            f"Would you like to edit the definition for {self}?", yes=yes, noask=noask
        ):
            edit_tuple = self.edit_definition(debug=debug)
            if not edit_tuple[0]:
                return edit_tuple

        if yes_no(f"Would you like to try syncing {self} now?", yes=yes, noask=noask):
            sync_tuple = actions['sync'](
                ['pipes'],
                connector_keys = [self.connector_keys],
                metric_keys = [self.metric_key],
                location_keys = [self.location_key],
                mrsm_instance = str(self.instance_connector),
                debug = debug,
                shell = shell,
            )
            if not sync_tuple[0]:
                return sync_tuple
    except Exception as e:
        return False, f"Failed to bootstrap {self}:\n" + str(e)

    print_tuple((True, f"Finished bootstrapping {self}!"))
    info(
        f"You can edit this pipe later with `edit pipes` "
        + "or set the definition with `edit pipes definition`.\n"
        + "    To sync data into your pipe, run `sync pipes`."
    )

    return True, "Success"
def clear(self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw: Any) ‑> SuccessTuple

Call the Pipe's instance connector's clear_pipe method.

Parameters

begin : Optional[datetime.datetime], default None:
If provided, only remove rows newer than this datetime value.
end : Optional[datetime.datetime], default None:
If provided, only remove rows older than this datetime value (not including end).
params : Optional[Dict[str, Any]], default None
See build_where().
debug : bool, default False:
Verbosity toggle.

Returns

A SuccessTuple corresponding to whether this procedure completed successfully.

Examples

>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
>>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
>>> 
>>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
>>> pipe.get_data()
          dt
0 2020-01-01
Expand source code
def clear(
        self,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `clear_pipe` method.

    Parameters
    ----------
    begin: Optional[datetime.datetime], default None:
        If provided, only remove rows newer than this datetime value.

    end: Optional[datetime.datetime], default None:
        If provided, only remove rows older than this datetime value (not including end).

    params: Optional[Dict[str, Any]], default None
         See `meerschaum.utils.sql.build_where`.

    debug: bool, default False:
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` corresponding to whether this procedure completed successfully.

    Examples
    --------
    >>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
    >>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
    >>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
    >>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
    >>> 
    >>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
    >>> pipe.get_data()
              dt
    0 2020-01-01

    """
    from meerschaum.utils.warnings import warn
    if self.cache_pipe is not None:
        success, msg = self.cache_pipe.clear(begin=begin, end=end, debug=debug, **kw)
        if not success:
            warn(msg)
    return self.instance_connector.clear_pipe(
        self,
        begin=begin, end=end, params=params, debug=debug,
        **kw
    )
def delete(self, debug: bool = False, **kw) ‑> Tuple[bool, str]

Call the Pipe's instance connector's delete_pipe() method.

Parameters

debug : bool, default False:
Verbosity toggle.

Returns

A SuccessTuple of success (bool), message (str).

Expand source code
def delete(
        self,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `delete_pipe()` method.

    Parameters
    ----------
    debug : bool, default False:
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (`bool`), message (`str`).

    """
    import os, pathlib
    from meerschaum.utils.warnings import warn
    if self.cache_pipe is not None:
        _delete_cache_tuple = self.cache_pipe.delete(debug=debug, **kw)
        if not _delete_cache_tuple[0]:
            warn(_delete_cache_tuple[1])
        _cache_db_path = pathlib.Path(self.cache_connector.database)
        try:
            os.remove(_cache_db_path)
        except Exception as e:
            warn(f"Could not delete cache file '{_cache_db_path}' for {self}:\n{e}")
    result = self.instance_connector.delete_pipe(self, debug=debug, **kw)
    if not isinstance(result, tuple):
        return False, f"Received unexpected result from '{self.instance_connector}': {result}"
    if result[0]:
        to_delete = ['_id', '_attributes', '_columns', '_tags', '_data']
        for member in to_delete:
            if member in self.__dict__:
                del self.__dict__[member]
    return result
def drop(self, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]

Call the Pipe's instance connector's drop_pipe() method.

Parameters

debug : bool, default False:
Verbosity toggle.

Returns

A SuccessTuple of success, message.

Expand source code
def drop(
        self,
        debug: bool = False,
        **kw : Any
    ) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `drop_pipe()` method.

    Parameters
    ----------
    debug: bool, default False:
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    from meerschaum.utils.warnings import warn
    if self.cache_pipe is not None:
        _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw)
        if not _drop_cache_tuple[0]:
            warn(_drop_cache_tuple[1])
    return self.instance_connector.drop_pipe(self, debug=debug, **kw)
def edit(self, patch: bool = False, interactive: bool = False, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]

Edit a Pipe's configuration.

Parameters

patch : bool, default False
If patch is True, update parameters by cascading rather than overwriting.
interactive : bool, default False
If True, open an editor for the user to make changes to the pipe's YAML file.
debug : bool, default False
Verbosity toggle.

Returns

A SuccessTuple of success, message.

Expand source code
def edit(
        self,
        patch: bool = False,
        interactive: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Edit a Pipe's configuration.

    Parameters
    ----------
    patch: bool, default False
        If `patch` is True, update parameters by cascading rather than overwriting.
    interactive: bool, default False
        If `True`, open an editor for the user to make changes to the pipe's YAML file.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    if not interactive:
        return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
    from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
    from meerschaum.utils.misc import edit_file
    import pathlib, os
    parameters_filename = str(self) + '.yaml'
    parameters_path = pathlib.Path(os.path.join(PIPES_CACHE_RESOURCES_PATH, parameters_filename))
    
    from meerschaum.utils.yaml import yaml

    edit_text = f"Edit the parameters for {self}"
    edit_top = '#' * (len(edit_text) + 4)
    edit_header = edit_top + f'\n# {edit_text} #\n' + edit_top + '\n\n'

    from meerschaum.config import get_config
    parameters = dict(get_config('pipes', 'parameters', patch=True))
    from meerschaum.config._patch import apply_patch_to_config
    parameters = apply_patch_to_config(parameters, self.parameters)

    ### write parameters to yaml file
    with open(parameters_path, 'w+') as f:
        f.write(edit_header)
        yaml.dump(parameters, stream=f, sort_keys=False)

    ### only quit editing if yaml is valid
    editing = True
    while editing:
        edit_file(parameters_path)
        try:
            with open(parameters_path, 'r') as f:
                file_parameters = yaml.load(f.read())
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(f"Invalid format defined for '{self}':\n\n{e}")
            input(f"Press [Enter] to correct the configuration for '{self}': ")
        else:
            editing = False

    self.parameters = file_parameters

    if debug:
        from meerschaum.utils.formatting import pprint
        pprint(self.parameters)

    return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
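
A non-interactive sketch of edit() for persisting parameter changes (illustrative keys; patch=True merges parameters instead of overwriting):

>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('test', 'test', instance='sql:local')
>>> pipe.parameters = {'columns': {'datetime': 'dt'}}
>>> success, msg = pipe.edit(patch=True)
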
def edit_definition(self, yes: bool = False, noask: bool = False, force: bool = False, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]

Edit a pipe's definition file and update its configuration. NOTE: This function is interactive and should not be used in automated scripts!

Returns

A SuccessTuple of success, message.

Expand source code
def edit_definition(
        self,
        yes: bool = False,
        noask: bool = False,
        force: bool = False,
        debug : bool = False,
        **kw : Any
    ) -> SuccessTuple:
    """
    Edit a pipe's definition file and update its configuration.
    **NOTE:** This function is interactive and should not be used in automated scripts!

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    if (self.connector is None) or self.connector.type not in ('sql', 'api'):
        return self.edit(interactive=True, debug=debug, **kw)

    import json
    from meerschaum.utils.warnings import info, warn
    from meerschaum.utils.debug import dprint
    from meerschaum.config._patch import apply_patch_to_config
    from meerschaum.utils.misc import edit_file

    _parameters = self.parameters
    if 'fetch' not in _parameters:
        _parameters['fetch'] = {}

    def _edit_api():
        from meerschaum.utils.prompt import prompt, yes_no
        info(
            f"Please enter the keys of the source pipe from '{self.connector}'.\n" +
            "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip."
        )

        _keys = { 'connector_keys' : None, 'metric_key' : None, 'location_key' : None }
        for k in _keys:
            _keys[k] = _parameters['fetch'].get(k, None)

        for k, v in _keys.items():
            try:
                _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v)
            except KeyboardInterrupt:
                continue
            if _keys[k] in ('', 'None', '\'None\'', '[None]'):
                _keys[k] = None

        _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys)

        info("You may optionally specify additional filter parameters as JSON.")
        print("  Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.")
        print("  For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':")
        print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': ')))
        if force or yes_no(
            "Would you like to add additional filter parameters?",
            yes=yes, noask=noask
        ):
            from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
            definition_filename = str(self) + '.json'
            definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
            try:
                definition_path.touch()
                with open(definition_path, 'w+') as f:
                    json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2)
            except Exception as e:
                return False, f"Failed writing file '{definition_path}':\n" + str(e)

            _params = None
            while True:
                edit_file(definition_path)
                try:
                    with open(definition_path, 'r') as f:
                        _params = json.load(f)
                except Exception as e:
                    warn(f'Failed to read parameters JSON:\n{e}', stack=False)
                    if force or yes_no(
                        "Would you like to try again?\n  "
                        + "If not, the parameters JSON file will be ignored.",
                        noask=noask, yes=yes
                    ):
                        continue
                    _params = None
                break
            if _params is not None:
                if 'fetch' not in _parameters:
                    _parameters['fetch'] = {}
                _parameters['fetch']['params'] = _params

        self.parameters = _parameters
        return True, "Success"

    def _edit_sql():
        import pathlib, os, textwrap
        from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
        from meerschaum.utils.misc import edit_file
        definition_filename = str(self) + '.sql'
        definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename

        sql_definition = _parameters['fetch'].get('definition', None)
        if sql_definition is None:
            sql_definition = ''
        sql_definition = textwrap.dedent(sql_definition).lstrip()

        try:
            definition_path.touch()
            with open(definition_path, 'w+') as f:
                f.write(sql_definition)
        except Exception as e:
            return False, f"Failed writing file '{definition_path}':\n" + str(e)

        edit_file(definition_path)
        try:
            with open(definition_path, 'r') as f:
                file_definition = f.read()
        except Exception as e:
            return False, f"Failed reading file '{definition_path}':\n" + str(e)

        if sql_definition == file_definition:
            return False, f"No changes made to definition for {self}."

        if ' ' not in file_definition:
            return False, f"Invalid SQL definition for {self}."

        if debug:
            dprint("Read SQL definition:\n\n" + file_definition)
        _parameters['fetch']['definition'] = file_definition
        self.parameters = _parameters
        return True, "Success"

    locals()['_edit_' + str(self.connector.type)]()
    return self.edit(interactive=False, debug=debug, **kw)
def enforce_dtypes(self, df: "'pd.DataFrame'", debug: bool = False) ‑> pd.DataFrame

Cast the input dataframe to the pipe's registered data types. If the pipe does not exist and dtypes are not set, return the dataframe.

Expand source code
def enforce_dtypes(self, df: 'pd.DataFrame', debug: bool=False) -> 'pd.DataFrame':
    """
    Cast the input dataframe to the pipe's registered data types.
    If the pipe does not exist and dtypes are not set, return the dataframe.

    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.formatting import pprint
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.packages import import_pandas
    if df is None:
        if debug:
            dprint(
                f"Received None instead of a DataFrame.\n"
                + "    Skipping dtype enforcement..."
            )
        return df

    if not hasattr(df, 'dtypes'):
        pd = import_pandas(debug=debug)
        try:
            df = pd.DataFrame(df)
        except Exception as e:
            warn(f"Unable to cast incoming data as a DataFrame...:\n{e}")
            return df

    if not self.dtypes:
        if debug:
            dprint(
                f"Could not find dtypes for {self}.\n"
                + "    Skipping dtype enforcement..."
            )
        return df

    df_dtypes = {c: t.name for c, t in dict(df.dtypes).items()}
    if len(df_dtypes) == 0:
        if debug:
            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
        return df

    if debug:
        dprint(f"Data types for {self}:")
        pprint(self.dtypes)
        dprint(f"Data types for incoming DataFrame:")
        pprint(df_dtypes)
    if df_dtypes == self.dtypes:
        if debug:
            dprint(f"Data types already match. Skipping enforcement...")
        return df

    common_dtypes = {}
    common_diff_dtypes = {}
    for d, t in self.dtypes.items():
        if d in df_dtypes:
            common_dtypes[d] = t
            if t != df_dtypes[d]:
                common_diff_dtypes[d] = df_dtypes[d]

    if debug:
        dprint(f"Common columns with different dtypes:")
        pprint(common_diff_dtypes)

    detected_dt_cols = {}
    for d, t in common_diff_dtypes.items():
        if 'datetime' in t and 'datetime' in common_dtypes[d]:
            df_dtypes[d] = t
            detected_dt_cols[d] = (common_dtypes[d], common_diff_dtypes[d])
    for d in detected_dt_cols:
        del common_diff_dtypes[d]

    if debug:
        dprint(f"Common columns with different dtypes (after dates):")
        pprint(common_diff_dtypes)

    if df_dtypes == self.dtypes:
        if debug:
            dprint(
                "The incoming DataFrame has mostly the same types as {self}, skipping enforcement."
                + f"The only detected difference was in the following datetime columns.\n"
                + "    Timezone information may be stripped."
            )
            pprint(detected_dt_cols)
        return df

    if len(common_dtypes) == len(df_dtypes):
        min_ratio = STATIC_CONFIG['pipes']['dtypes']['min_ratio_columns_changed_for_full_astype']
        if (
            len(common_diff_dtypes) >= int(len(common_dtypes) * min_ratio)
        ):
            if debug:
                dprint(f"Enforcing data types for {self} on incoming DataFrame...")
            try:
                return df[list(common_dtypes.keys())].astype(self.dtypes)
            except Exception as e:
                warn(f"Encountered an error when enforcing data types for {self}:\n{e}")
                return df
    
    new_df = df.copy()
    for d in common_diff_dtypes:
        t = common_dtypes[d]
        if debug:
            dprint(f"Casting column {d} to dtype {t}.")
        try:
            new_df[d] = new_df[d].astype(t)
        except Exception as e:
            warn(f"Encountered an error when casting column {d} to type {t}:\n{e}")
    return new_df
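
A minimal sketch of enforce_dtypes(), assuming the pipe's dtypes have been set (the incoming dictionary is cast to a DataFrame and its columns are coerced):

>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('test', 'test', instance='sql:local')
>>> pipe.dtypes = {'value': 'float64'}
>>> df = pipe.enforce_dtypes({'value': [1, 2, 3]})
>>> df.dtypes  # 'value' should now be float64
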
def exists(self, debug: bool = False) ‑> bool

See if a Pipe's table exists.

Parameters

debug : bool, default False
Verbosity toggle.

Returns

A bool corresponding to whether a pipe's underlying table exists.

Expand source code
def exists(
        self,
        debug : bool = False
    ) -> bool:
    """
    See if a Pipe's table exists.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `bool` corresponding to whether a pipe's underlying table exists.

    """
    return self.instance_connector.pipe_exists(pipe=self, debug=debug)
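
A quick sketch of exists() (illustrative keys); the underlying table only exists after a first sync:

>>> import datetime
>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
>>> pipe.exists()  # False before the first sync
>>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1)]})
>>> pipe.exists()  # True once the table has been created
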
def fetch(self, begin: Union[datetime.datetime, str, None] = '', end: Optional[datetime.datetime] = None, sync_chunks: bool = False, deactivate_plugin_venv: bool = True, debug: bool = False, **kw: Any) ‑> 'pd.DataFrame or None'

Fetch a Pipe's latest data from its connector.

Parameters

begin : Union[datetime.datetime, str, None], default '':
If provided, only fetch data newer than or equal to begin.
end : Optional[datetime.datetime], default None:
If provided, only fetch data older than or equal to end.
sync_chunks : bool, default False
If True and the pipe's connector is of type 'sql', begin syncing chunks as the fetch loads them into memory.
deactivate_plugin_venv : bool, default True
If True and the pipe's connector is of type 'plugin', deactivate the plugin's virtual environment after retrieving the dataframe. Not intended for general use.
debug : bool, default False
Verbosity toggle.

Returns

A pd.DataFrame of the newest unseen data.

Expand source code
def fetch(
        self,
        begin: Union[datetime.datetime, str, None] = '',
        end: Optional[datetime.datetime] = None,
        sync_chunks: bool = False,
        deactivate_plugin_venv: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> 'pd.DataFrame or None':
    """
    Fetch a Pipe's latest data from its connector.

    Parameters
    ----------
    begin: Union[datetime.datetime, str, None], default '':
        If provided, only fetch data newer than or equal to `begin`.

    end: Optional[datetime.datetime], default None:
        If provided, only fetch data older than or equal to `end`.

    sync_chunks: bool, default False
        If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks
        as the fetch loads them into memory.

    deactivate_plugin_venv: bool, default True
        If `True` and the pipe's connector is of type `'plugin'`, deactivate the plugin's
        virtual environment after retrieving the dataframe.
        Not intended for general use.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` of the newest unseen data.

    """
    if 'fetch' not in dir(self.connector):
        from meerschaum.utils.warnings import warn
        warn(f"No `fetch()` function defined for connector '{self.connector}'")
        return None

    from meerschaum.connectors import custom_types
    from meerschaum.utils.debug import dprint, _checkpoint
    if (
        self.connector.type == 'plugin'
        or
        self.connector.type in custom_types
    ):
        from meerschaum.plugins import Plugin
        from meerschaum.utils.packages import activate_venv, deactivate_venv
        plugin_name = (
            self.connector.label if self.connector.type == 'plugin'
            else self.connector.__module__.replace('plugins.', '').split('.')[0]
        )
        connector_plugin = Plugin(plugin_name)
        connector_plugin.activate_venv(debug=debug)
    
    _chunk_hook = kw.pop('chunk_hook', None)

    df = self.connector.fetch(
        self,
        begin = begin,
        end = end,
        chunk_hook = (
            self.sync if sync_chunks and _chunk_hook is None
            else _chunk_hook
        ),
        debug = debug,
        **kw
    )
    if (
        self.connector.type == 'plugin'
        or
        self.connector.type in custom_types
    ) and deactivate_plugin_venv:
        connector_plugin.deactivate_venv(debug=debug)
    ### Return True if we're syncing in parallel, else continue as usual.
    if sync_chunks:
        return True
    return df
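
A sketch of calling fetch() directly, assuming a hypothetical connector plugin:example which defines a fetch() function (normally sync() calls this for you):

>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('plugin:example', 'data', instance='sql:local')
>>> df = pipe.fetch(begin='2022-01-01')
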
def filter_existing(self, df: "'pd.DataFrame'", begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, chunksize: Optional[int] = -1, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw) ‑> Tuple['pd.DataFrame', Union['pd.DataFrame', None], 'pd.DataFrame']

Inspect a dataframe and filter out rows which already exist in the pipe.

Parameters

df : 'pd.DataFrame'
The dataframe to inspect and filter.
begin : Optional[datetime.datetime], default None
If provided, use this boundary when searching for existing data.
end : Optional[datetime.datetime], default None
If provided, use this boundary when searching for existing data.
chunksize : Optional[int], default -1
The chunksize used when fetching existing data.
params : Optional[Dict[str, Any]], default None
If provided, use this filter when searching for existing data.
debug : bool, default False
Verbosity toggle.

Returns

A tuple of the unseen DataFrame (entirely new rows), the update DataFrame (rows already in the pipe whose values have changed, or None), and the delta DataFrame of all rows which differ from the pipe's existing data.

Expand source code
def filter_existing(
        self,
        df: 'pd.DataFrame',
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        chunksize: Optional[int] = -1,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw
    ) -> Tuple['pd.DataFrame', Union['pd.DataFrame', None], 'pd.DataFrame']:
    """
    Inspect a dataframe and filter out rows which already exist in the pipe.

    Parameters
    ----------
    df: 'pd.DataFrame'
        The dataframe to inspect and filter.
        
    begin: Optional[datetime.datetime], default None
        If provided, use this boundary when searching for existing data.

    end: Optional[datetime.datetime], default None
        If provided, use this boundary when searching for existing data.

    chunksize: Optional[int], default -1
        The `chunksize` used when fetching existing data.

    params: Optional[Dict[str, Any]], default None
        If provided, use this filter when searching for existing data. 

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of the unseen `DataFrame` (entirely new rows), the update `DataFrame`
    (rows already in the pipe whose values have changed, or `None`),
    and the delta `DataFrame` of all rows which differ from the pipe's existing data.

    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import round_time
    from meerschaum.utils.packages import attempt_import, import_pandas
    import datetime
    pd = import_pandas()
    if not isinstance(df, pd.DataFrame):
        df = self.enforce_dtypes(df, debug=debug)
    ### begin is the oldest data in the new dataframe
    try:
        min_dt = pd.to_datetime(df[self.get_columns('datetime')].min(skipna=True)).to_pydatetime()
    except Exception as e:
        ### NOTE: This will fetch the entire pipe!
        min_dt = self.get_sync_time(newest=False, debug=debug)
    if not isinstance(min_dt, datetime.datetime) or str(min_dt) == 'NaT':
        ### min_dt might be None, a user-supplied value, or the sync time.
        min_dt = begin
    ### If `min_dt` is None, use `datetime.utcnow()`.
    begin = round_time(
        min_dt,
        to = 'down'
    ) - datetime.timedelta(minutes=1)

    ### end is the newest data in the new dataframe
    try:
        max_dt = pd.to_datetime(df[self.get_columns('datetime')].max(skipna=True)).to_pydatetime()
    except Exception as e:
        max_dt = end
    if not isinstance(max_dt, datetime.datetime) or str(max_dt) == 'NaT':
        max_dt = None

    if max_dt is not None and min_dt > max_dt:
        warn(f"Detected minimum datetime greater than maximum datetime.")

    ### If `max_dt` is `None`, unbound the search.
    end = (
        round_time(
            max_dt,
            to = 'down'
        ) + datetime.timedelta(minutes=1)
    ) if max_dt is not None else end
    if begin is not None and end is not None and begin > end:
        begin = end - datetime.timedelta(minutes=1)

    if debug:
        dprint(f"Looking at data between '{begin}' and '{end}'.", **kw)

    ### backtrack_df is existing Pipe data that overlaps with the fetched df
    try:
        backtrack_minutes = self.parameters['fetch']['backtrack_minutes']
    except Exception as e:
        backtrack_minutes = 0

    backtrack_df = self.get_data(
        begin = begin,
        end = end,
        chunksize = chunksize,
        params = params,
        debug = debug,
        **kw
    )
    if debug:
        dprint("Existing data:\n" + str(backtrack_df), **kw)

    ### Detect changes between the old target and new source dataframes.
    from meerschaum.utils.misc import filter_unseen_df
    delta_df = filter_unseen_df(backtrack_df, df, dtypes=self.dtypes, debug=debug)

    ### Separate new rows from changed ones.
    dt_col = self.columns['datetime'] if self.columns and 'datetime' in self.columns else None
    id_col = self.columns['id'] if self.columns and 'id' in self.columns else None
    if dt_col:
        on_cols = [dt_col] + ([id_col] if id_col is not None else [])
    else:
        on_cols = []

    joined_df = pd.merge(
        delta_df,
        backtrack_df,
        how='left',
        on=on_cols,
        indicator=True,
        suffixes=('', '_old'),
    ) if on_cols else delta_df

    ### Determine which rows are completely new.
    new_rows_mask = (joined_df['_merge'] == 'left_only') if on_cols else None
    cols = list(backtrack_df.columns)

    unseen_df = (
        joined_df
        .where(new_rows_mask)
        .dropna(how='all')[cols]
        .reset_index(drop=True)
    ) if on_cols else delta_df

    ### Rows that have already been inserted but values have changed.
    update_df = (
        joined_df
        .where(~new_rows_mask)
        .dropna(how='all')[cols]
        .reset_index(drop=True)
    ) if on_cols else None

    return unseen_df, update_df, delta_df
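
A sketch of how filter_existing() separates brand-new rows from updates (illustrative keys; assumes pandas and a prior sync):

>>> import datetime
>>> import pandas as pd
>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
>>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1)], 'value': [1.0]})
>>> df = pd.DataFrame({
...     'dt': [datetime.datetime(2022, 1, 1), datetime.datetime(2022, 1, 2)],
...     'value': [1.0, 2.0],
... })
>>> unseen_df, update_df, delta_df = pipe.filter_existing(df)
>>> len(unseen_df)  # only the 2022-01-02 row should be unseen
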
def get_backtrack_data(self, backtrack_minutes: int = 0, begin: "Optional['datetime.datetime']" = None, fresh: bool = False, debug: bool = False, **kw: Any) ‑> Union[pd.DataFrame, NoneType]

Get the most recent data from the instance connector as a Pandas DataFrame.

Parameters

backtrack_minutes : int, default 0
How many minutes from begin to select from. Defaults to 0. This may return a few rows due to a rounding quirk.
begin : Optional[datetime.datetime], default None
The starting point to search for data. If begin is None (default), use the most recent observed datetime (AKA sync_time).
E.g. with begin = 02:00, the region at or before 02:00 is searched, and anything newer is ignored even if data exist there (see the diagram in the source code below).

fresh : bool, default False
If True, ignore local cache and pull directly from the instance connector. Only comes into effect if a pipe was created with cache=True.
debug : bool, default False
Verbosity toggle.

Returns

A pd.DataFrame for the pipe's data corresponding to the provided parameters. Backtrack data is a convenient way to get a pipe's data "backtracked" from the most recent datetime.

Expand source code
def get_backtrack_data(
        self,
        backtrack_minutes: int = 0,
        begin: Optional['datetime.datetime'] = None,
        fresh: bool = False,
        debug : bool = False,
        **kw : Any
    ) -> Optional['pd.DataFrame']:
    """
    Get the most recent data from the instance connector as a Pandas DataFrame.

    Parameters
    ----------
    backtrack_minutes: int, default 0
        How many minutes from `begin` to select from.
        Defaults to 0. This may return a few rows due to a rounding quirk.
    begin: Optional[datetime.datetime], default None
        The starting point to search for data.
        If begin is `None` (default), use the most recent observed datetime
        (AKA sync_time).
        
        
    ```
    E.g. begin = 02:00

    Search this region.           Ignore this, even if there's data.
    /  /  /  /  /  /  /  /  /  |
    -----|----------|----------|----------|----------|----------|
    00:00      01:00      02:00      03:00      04:00      05:00

    ```

    fresh: bool, default False
        If `True`, ignore local cache and pull directly from the instance connector.
        Only comes into effect if a pipe was created with `cache=True`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. Backtrack data
    is a convenient way to get a pipe's data "backtracked" from the most recent datetime.

    """
    from meerschaum.utils.warnings import warn
    kw.update({'backtrack_minutes': backtrack_minutes, 'begin': begin,})

    if not self.exists(debug=debug):
        return None

    if self.cache_pipe is not None:
        if not fresh:
            _sync_cache_tuple = self.cache_pipe.sync(debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
                fresh = True
            else: ### Successfully synced cache.
                return self.enforce_dtypes(
                    self.cache_pipe.get_backtrack_data(debug=debug, fresh=True, **kw),
                    debug = debug,
                )

    ### If `fresh` or the syncing failed, directly pull from the instance connector.
    return self.enforce_dtypes(
        self.instance_connector.get_backtrack_data(
            pipe = self,
            debug = debug,
            **kw
        ),
        debug = debug,
    )
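
A short sketch of get_backtrack_data() (illustrative keys): pull the rows from the 60 minutes leading up to the most recent sync time:

>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
>>> recent_df = pipe.get_backtrack_data(backtrack_minutes=60)
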
def get_columns(self, *args: str, error: bool = True) ‑> Union[str, Tuple[str]]

Check if the requested columns are defined.

Parameters

*args : str
The column names to be retrieved.
error : bool, default True
If True, raise an Exception if the specified column is not defined.

Returns

A tuple of the same size as args, or a str if args is a single argument.

Examples

>>> pipe = mrsm.Pipe('test', 'test')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> pipe.get_columns('datetime', 'id')
('dt', 'id')
>>> pipe.get_columns('value')
Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
Expand source code
def get_columns(self, *args: str, error: bool = True) -> Union[str, Tuple[str]]:
    """
    Check if the requested columns are defined.

    Parameters
    ----------
    *args: str
        The column names to be retrieved.
        
    error: bool, default True
        If `True`, raise an `Exception` if the specified column is not defined.

    Returns
    -------
    A tuple of the same size as `args`, or a `str` if `args` is a single argument.

    Examples
    --------
    >>> pipe = mrsm.Pipe('test', 'test')
    >>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
    >>> pipe.get_columns('datetime', 'id')
    ('dt', 'id')
    >>> pipe.get_columns('value')
    Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
    """
    from meerschaum.utils.warnings import error as _error, warn
    if not args:
        args = tuple(self.columns.keys())
    col_names = []
    for col in args:
        col_name = None
        try:
            col_name = self.columns[col]
            if col_name is None and error:
                _error(f"Please define the name of the '{col}' column for {self}.")
        except Exception as e:
            col_name = None
        if col_name is None and error:
            _error(f"Missing '{col}'" + f" column for {self}.")
        col_names.append(col_name)
    if len(col_names) == 1:
        return col_names[0]
    return tuple(col_names)
def get_columns_types(self, debug: bool = False) ‑> Union[Dict[str, str], NoneType]

Get a dictionary of a pipe's column names and their types.

Parameters

debug : bool, default False:
Verbosity toggle.

Returns

A dictionary of column names (str) to column types (str).

Examples

>>> pipe.get_columns_types()
{
  'dt': 'TIMESTAMP WITHOUT TIMEZONE',
  'id': 'BIGINT',
  'val': 'DOUBLE PRECISION',
}
>>>
Expand source code
def get_columns_types(self, debug: bool = False) -> Union[Dict[str, str], None]:
    """
    Get a dictionary of a pipe's column names and their types.

    Parameters
    ----------
    debug: bool, default False:
        Verbosity toggle.

    Returns
    -------
    A dictionary of column names (`str`) to column types (`str`).

    Examples
    --------
    >>> pipe.get_columns_types()
    {
      'dt': 'TIMESTAMP WITHOUT TIMEZONE',
      'id': 'BIGINT',
      'val': 'DOUBLE PRECISION',
    }
    >>>
    """
    return self.instance_connector.get_pipe_columns_types(self, debug=debug)
def get_data(self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, fresh: bool = False, debug: bool = False, **kw: Any) ‑> Union[('pd.DataFrame', None)]

Get a pipe's data from the instance connector.

Parameters

begin : Optional[datetime.datetime], default None
Lower bound datetime to begin searching for data (inclusive). Translates to a WHERE clause like WHERE datetime >= begin. Defaults to None.
end : Optional[datetime.datetime], default None
Upper bound datetime to stop searching for data (inclusive). Translates to a WHERE clause like WHERE datetime < end. Defaults to None.
params : Optional[Dict[str, Any]], default None
Filter the retrieved data by a dictionary of parameters. See build_where() for more details.
fresh : bool, default False
If True, skip local cache and directly query the instance connector. Defaults to False.
debug : bool, default False
Verbosity toggle. Defaults to False.

Returns

A pd.DataFrame for the pipe's data corresponding to the provided parameters.

Expand source code
def get_data(
        self,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        params: Optional[Dict[str, Any]] = None,
        fresh: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Union['pd.DataFrame', None]:
    """
    Get a pipe's data from the instance connector.

    Parameters
    ----------
    begin: Optional[datetime.datetime], default None
        Lower bound datetime to begin searching for data (inclusive).
        Translates to a `WHERE` clause like `WHERE datetime >= begin`.
        Defaults to `None`.

    end: Optional[datetime.datetime], default None
        Upper bound datetime to stop searching for data (inclusive).
        Translates to a `WHERE` clause like `WHERE datetime < end`.
        Defaults to `None`.

    params: Optional[Dict[str, Any]], default None
        Filter the retrieved data by a dictionary of parameters.
        See `meerschaum.utils.sql.build_where` for more details. 

    fresh: bool, default False
        If `True`, skip local cache and directly query the instance connector.
        Defaults to `False`.

    debug: bool, default False
        Verbosity toggle.
        Defaults to `False`.

    Returns
    -------
    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.

    """
    from meerschaum.utils.warnings import warn
    kw.update({'begin': begin, 'end': end, 'params': params,})

    if not self.exists(debug=debug):
        return None

    if self.cache_pipe is not None:
        if not fresh:
            _sync_cache_tuple = self.cache_pipe.sync(debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
                fresh = True
            else: ### Successfully synced cache.
                return self.enforce_dtypes(
                    self.cache_pipe.get_data(debug=debug, fresh=True, **kw),
                    debug = debug,
                )

    ### If `fresh` or the syncing failed, directly pull from the instance connector.
    return self.enforce_dtypes(
        self.instance_connector.get_pipe_data(
            pipe = self,
            debug = debug,
            **kw
        ),
        debug = debug,
    )
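
A short sketch of get_data() combining the datetime bounds with a params filter (illustrative keys and column names):

>>> import datetime
>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt', 'id': 'id'}, instance='sql:local')
>>> df = pipe.get_data(
...     begin=datetime.datetime(2022, 1, 1),
...     end=datetime.datetime(2022, 2, 1),
...     params={'id': [1, 2]},
... )
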
def get_id(self, **kw: Any) ‑> Union[int, NoneType]

Fetch a pipe's ID from its instance connector. If the pipe does not exist, return None.

Expand source code
def get_id(self, **kw: Any) -> Union[int, None]:
    """
    Fetch a pipe's ID from its instance connector.
    If the pipe does not exist, return `None`.
    """
    return self.instance_connector.get_pipe_id(self, **kw)
def get_rowcount(self, begin: "Optional['datetime.datetime']" = None, end: "Optional['datetime.datetime']" = None, remote: bool = False, params: Optional[Dict[str, Any]] = None, debug: bool = False) ‑> Union[int, NoneType]

Get a Pipe's instance or remote rowcount.

Parameters

begin : Optional[datetime.datetime], default None
Count rows where datetime > begin.
end : Optional[datetime.datetime], default None
Count rows where datetime < end.
remote : bool, default False
Count rows from a pipe's remote source. NOTE: This is experimental!
debug : bool, default False
Verbosity toggle.

Returns

An int of the number of rows in the pipe corresponding to the provided parameters. None is returned if the pipe does not exist.

Expand source code
def get_rowcount(
        self,
        begin: Optional['datetime.datetime'] = None,
        end: Optional['datetime.datetime'] = None,
        remote: bool = False,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False
    ) -> Union[int, None]:
    """
    Get a Pipe's instance or remote rowcount.

    Parameters
    ----------
    begin: Optional[datetime.datetime], default None
        Count rows where datetime > begin.

    end: Optional[datetime.datetime], default None
        Count rows where datetime < end.

    remote: bool, default False
        Count rows from a pipe's remote source.
        **NOTE**: This is experimental!

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` of the number of rows in the pipe corresponding to the provided parameters.
    `None` is returned if the pipe does not exist.

    """
    from meerschaum.utils.warnings import warn
    connector = self.instance_connector if not remote else self.connector
    try:
        return connector.get_pipe_rowcount(
            self, begin=begin, end=end, remote=remote, params=params, debug=debug
        )
    except AttributeError as e:
        warn(e)
        if remote:
            return None
    warn(f"Failed to get a rowcount for {self}.")
    return None
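
A brief sketch of get_rowcount() (illustrative keys): count all of a pipe's rows, then only those after a given datetime:

>>> import datetime
>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
>>> pipe.get_rowcount()
>>> pipe.get_rowcount(begin=datetime.datetime(2022, 1, 1))
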
def get_sync_time(self, params: Optional[Dict[str, Any]] = None, newest: bool = True, round_down: bool = True, debug: bool = False) ‑> Union[datetime.datetime, NoneType]

Get the most recent datetime value for a Pipe.

Parameters

params : Optional[Dict[str, Any]], default None
Dictionary to build a WHERE clause for a specific column. See build_where().
newest : bool, default True
If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
round_down : bool, default True
If True, round down the sync time to the nearest minute.
debug : bool, default False
Verbosity toggle.

Returns

A datetime.datetime object if the pipe exists, otherwise None.

Expand source code
def get_sync_time(
        self,
        params: Optional[Dict[str, Any]] = None,
        newest: bool = True,
        round_down: bool = True,
        debug: bool = False
    ) -> Union['datetime.datetime', None]:
    """
    Get the most recent datetime value for a Pipe.

    Parameters
    ----------
    params: Optional[Dict[str, Any]], default None
        Dictionary to build a WHERE clause for a specific column.
        See `meerschaum.utils.sql.build_where`.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (`ASC` instead of `DESC`).

    round_down: bool, default True
        If `True`, round down the sync time to the nearest minute.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `datetime.datetime` object if the pipe exists, otherwise `None`.

    """
    from meerschaum.utils.warnings import error, warn
    if self.columns is None:
        warn(
            f"No columns found in parameters for {self}.",
            stack = False,
        )

    if 'datetime' not in (self.columns or {}):
        warn(
            f"'datetime' must be declared in parameters:columns for {self}.\n"
            + f"    You can add parameters for this pipe with the following command:\n\n"
            + f"    mrsm edit pipes -c {self.connector_keys} -m "
            + f"{self.metric_key} -l "
            + (f"[None]" if self.location_key is None else f"{self.location_key}"),
            stack = False,
        )

    return self.instance_connector.get_sync_time(
        self,
        params = params,
        newest = newest,
        round_down = round_down,
        debug = debug,
    )
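
A brief sketch of get_sync_time() (illustrative keys): fetch the newest and the oldest datetime values in the pipe:

>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
>>> newest = pipe.get_sync_time()
>>> oldest = pipe.get_sync_time(newest=False, round_down=False)
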
def get_val_column(self, debug: bool = False) ‑> Union[str, NoneType]

Return the name of the value column if it's defined, otherwise make an educated guess. If not set in the columns dictionary, return the first numeric column that is not an ID or datetime column. If none can be found, return None.

Parameters

debug : bool, default False:
Verbosity toggle.

Returns

Either a string or None.

Expand source code
def get_val_column(self, debug: bool = False) -> Union[str, None]:
    """
    Return the name of the value column if it's defined, otherwise make an educated guess.
    If not set in the `columns` dictionary, return the first numeric column that is not
    an ID or datetime column.
    If none can be found, return `None`.

    Parameters
    ----------
    debug: bool, default False:
        Verbosity toggle.

    Returns
    -------
    Either a string or `None`.
    """
    from meerschaum.utils.debug import dprint
    if debug:
        dprint('Attempting to determine the value column...')
    try:
        val_name = self.get_columns('value')
    except Exception as e:
        val_name = None
    if val_name is not None:
        if debug:
            dprint(f"Value column: {val_name}")
        return val_name

    cols = self.columns
    if cols is None:
        if debug:
            dprint('No columns could be determined. Returning...')
        return None
    try:
        dt_name = self.get_columns('datetime')
    except Exception as e:
        dt_name = None
    try:
        id_name = self.get_columns('id')
    except Exception as e:
        id_name = None

    if debug:
        dprint(f"dt_name: {dt_name}")
        dprint(f"id_name: {id_name}")

    cols_types = self.get_columns_types(debug=debug)
    if cols_types is None:
        return None
    if debug:
        dprint(f"cols_types: {cols_types}")
    if dt_name is not None:
        cols_types.pop(dt_name, None)
    if id_name is not None:
        cols_types.pop(id_name, None)

    candidates = []
    candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric',}
    for search_term in candidate_keywords:
        for col, typ in cols_types.items():
            if search_term in typ.lower():
                candidates.append(col)
                break
    if not candidates:
        if debug:
            dprint(f"No value column could be determined.")
        return None

    return candidates[0]
def guess_datetime(self) ‑> Union[str, NoneType]

Try to determine a pipe's datetime column.

Expand source code
def guess_datetime(self) -> Union[str, None]:
    """
    Try to determine a pipe's datetime column.
    """
    dt_cols = [
        col for col, typ in self.dtypes.items()
        if typ.startswith('datetime')
    ]
    if not dt_cols:
        return None
    return dt_cols[0]    
def infer_dtypes(self, persist: bool = False, debug: bool = False) ‑> Dict[str, Any]

If dtypes is not set in Pipe.parameters, infer the data types from the underlying table if it exists.

Parameters

persist : bool, default False
If True, persist the inferred data types to Pipe.parameters.

Returns

A dictionary of strings containing the pandas data types for this Pipe.

Expand source code
def infer_dtypes(self, persist: bool=False, debug: bool=False) -> Dict[str, Any]:
    """
    If `dtypes` is not set in `meerschaum.Pipe.parameters`,
    infer the data types from the underlying table if it exists.

    Parameters
    ----------
    persist: bool, default False
        If `True`, persist the inferred data types to `meerschaum.Pipe.parameters`.

    Returns
    -------
    A dictionary of strings containing the pandas data types for this Pipe.
    """
    if not self.exists(debug=debug):
        dtypes = {}
        if not self.columns:
            return {}
        if 'datetime' in self.columns:
            dtypes[self.columns['datetime']] = 'datetime64[ns]'
        return dtypes
    from meerschaum.utils.sql import get_pd_type
    columns_types = self.get_columns_types(debug=debug)
    dtypes = {
        c: get_pd_type(t) for c, t in columns_types.items()
    } if columns_types else {}
    if persist:
        self.dtypes = dtypes
        self.edit(interactive=False, debug=debug)
    return dtypes
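
A short sketch of infer_dtypes() (illustrative keys): after a first sync, the inferred data types may be persisted back into the pipe's parameters:

>>> import datetime
>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
>>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1)], 'value': [1.0]})
>>> pipe.infer_dtypes(persist=True)  # e.g. {'dt': 'datetime64[ns]', 'value': 'float64'}
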
def register(self, debug: bool = False) ‑> Tuple[bool, str]

Register a new Pipe along with its attributes.

Parameters

debug : bool, default False
Verbosity toggle.

Returns

A SuccessTuple of success, message.

Expand source code
def register(
        self,
        debug: bool = False
    ) -> SuccessTuple:
    """
    Register a new Pipe along with its attributes.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    from meerschaum.connectors import custom_types
    from meerschaum.utils.formatting import get_console
    import warnings
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        try:
            _conn = self.connector
        except Exception as e:
            _conn = None

    if (
        _conn is not None
        and
        (_conn.type == 'plugin' or _conn.type in custom_types)
        and
        getattr(_conn, 'register', None) is not None
    ):
        try:
            params = self.connector.register(self)
        except Exception as e:
            get_console().print_exception()
            params = None
        params = {} if params is None else params
        if not isinstance(params, dict):
            from meerschaum.utils.warnings import warn
            warn(
                f"Invalid parameters returned from `register()` in connector {self.connector}:\n"
                + f"{params}"
            )
        else:
            self.parameters = params

    if not self.parameters:
        cols = self.columns if self.columns else {'datetime': None, 'id': None}
        self.parameters = {
            'columns': cols,
        }

    return self.instance_connector.register_pipe(self, debug=debug)
def show(self, nopretty: bool = False, debug: bool = False, **kw) ‑> Tuple[bool, str]

Show attributes of a Pipe.

Parameters

nopretty : bool, default False
If True, simply print the JSON of the pipe's attributes.
debug : bool, default False
Verbosity toggle.

Returns

A SuccessTuple of success, message.

Expand source code
def show(
        self,
        nopretty: bool = False,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Show attributes of a Pipe.

    Parameters
    ----------
    nopretty: bool, default False
        If `True`, simply print the JSON of the pipe's attributes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    import json
    from meerschaum.utils.formatting import pprint, make_header, ANSI, highlight_pipes, fill_ansi
    from meerschaum.utils.warnings import info
    if not nopretty:
        _to_print = f"Attributes for {self}:"
        if ANSI:
            _to_print = fill_ansi(highlight_pipes(make_header(_to_print)), 'magenta')
        print(_to_print)
        pprint(self.attributes)
    else:
        print(json.dumps(self.attributes))

    return True, "Success"
def sync(self, df: Union[pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch] = meerschaum.core.Pipe._sync.InferFetch, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, force: bool = False, retries: int = 10, min_seconds: int = 1, check_existing: bool = True, blocking: bool = True, workers: Optional[int] = None, callback: Optional[Callable[[Tuple[bool, str]], Any]] = None, error_callback: Optional[Callable[[Exception], Any]] = None, chunksize: Optional[int] = -1, sync_chunks: bool = False, deactivate_plugin_venv: bool = True, debug: bool = False, **kw: Any) ‑> SuccessTuple

Fetch new data from the source and update the pipe's table with new data.

Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.

Parameters

df : Union[pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch], default InferFetch
An optional DataFrame to sync into the pipe. If omitted, fetch new data from the pipe's connector.
begin : Optional[datetime.datetime], default None
Optionally specify the earliest datetime to search for data. Defaults to None.
end : Optional[datetime.datetime], default None
Optionally specify the latest datetime to search for data. Defaults to None.
force : bool, default False
If True, keep trying to sync until retries attempts have been made. Defaults to False.
retries : int, default 10
If force, how many attempts to try syncing before declaring failure. Defaults to 10.
min_seconds : Union[int, float], default 1
If force, how many seconds to sleep between retries. Defaults to 1.
check_existing : bool, default True
If True, pull and diff with existing data from the pipe. Defaults to True.
blocking : bool, default True
If True, wait for sync to finish and return its result, otherwise asynchronously sync (oxymoron?) and return success. Defaults to True. Only intended for specific scenarios.
workers : Optional[int], default None
Not used directly within sync(); instead passed on to instance connectors' sync_pipe() methods (e.g. meerschaum.connectors.plugin.PluginConnector). Defaults to None.
callback : Optional[Callable[[Tuple[bool, str]], Any]], default None
Callback function which expects a SuccessTuple as input. Only applies when blocking=False.
error_callback : Optional[Callable[[Exception], Any]], default None
Callback function which expects an Exception as input. Only applies when blocking=False.
chunksize : int, default -1
Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction. Defaults to -1.
sync_chunks : bool, default False
If possible, sync chunks while fetching them into memory. Defaults to False.
deactivate_plugin_venv : bool, default True
If True, deactivate a plugin's virtual environment after syncing. Defaults to True.
debug : bool, default False
Verbosity toggle. Defaults to False.

Returns

A SuccessTuple of success (bool) and message (str).

Expand source code
def sync(
        self,
        df: Union[
            pd.DataFrame,
            Dict[str, List[Any]],
            List[Dict[str, Any]],
            InferFetch
        ] = InferFetch,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        force: bool = False,
        retries: int = 10,
        min_seconds: int = 1,
        check_existing: bool = True,
        blocking: bool = True,
        workers: Optional[int] = None,
        callback: Optional[Callable[[Tuple[bool, str]], Any]] = None,
        error_callback: Optional[Callable[[Exception], Any]] = None,
        chunksize: Optional[int] = -1,
        sync_chunks: bool = False,
        deactivate_plugin_venv: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Fetch new data from the source and update the pipe's table with new data.
    
    Get new remote data via fetch, get existing data in the same time period,
    and merge the two, only keeping the unseen data.

    Parameters
    ----------
    df: Union[pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch], default InferFetch
        An optional DataFrame to sync into the pipe. If omitted, fetch new data from the pipe's connector.

    begin: Optional[datetime.datetime], default None
        Optionally specify the earliest datetime to search for data.
        Defaults to `None`.

    end: Optional[datetime.datetime], default None
        Optionally specify the latest datetime to search for data.
        Defaults to `None`.

    force: bool, default False
        If `True`, keep trying to sync until `retries` attempts have been made.
        Defaults to `False`.

    retries: int, default 10
        If `force`, how many attempts to try syncing before declaring failure.
        Defaults to `10`.

    min_seconds: Union[int, float], default 1
        If `force`, how many seconds to sleep between retries. Defaults to `1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe.
        Defaults to `True`.

    blocking: bool, default True
        If `True`, wait for sync to finish and return its result, otherwise
        asynchronously sync (oxymoron?) and return success. Defaults to `True`.
        Only intended for specific scenarios.

    workers: Optional[int], default None
        Not used directly within `Pipe.sync()`; instead passed on to
        instance connectors' `sync_pipe()` methods
        (e.g. `meerschaum.connectors.plugin.PluginConnector`).
        Defaults to `None`.

    callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
        Callback function which expects a SuccessTuple as input.
        Only applies when `blocking=False`.

    error_callback: Optional[Callable[[Exception], Any]], default None
        Callback function which expects an Exception as input.
        Only applies when `blocking=False`.

    chunksize: int, default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.
        Defaults to `-1`.

    sync_chunks: bool, default False
        If possible, sync chunks while fetching them into memory.
        Defaults to `False`.

    deactivate_plugin_venv: bool, default True
        If `True`, deactivate a plugin's virtual environment after syncing.
        Defaults to `True`.

    debug: bool, default False
        Verbosity toggle. Defaults to False.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).

    """
    from meerschaum.utils.debug import dprint, _checkpoint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.connectors import custom_types
    from meerschaum.plugins import Plugin
    from meerschaum.utils.formatting import get_console
    import datetime
    import time
    if (callback is not None or error_callback is not None) and blocking:
        warn("Callback functions are only executed when blocking = False. Ignoring...")

    _checkpoint(_total=2, **kw)

    ### NOTE: Setting begin to the sync time for Simple Sync.
    ### TODO: Add flag for specifying syncing method.
    begin = _determine_begin(self, begin, debug=debug)
    kw.update({
        'begin': begin, 'end': end, 'force': force, 'retries': retries, 'min_seconds': min_seconds,
        'check_existing': check_existing, 'blocking': blocking, 'workers': workers,
        'callback': callback, 'error_callback': error_callback, 'sync_chunks': sync_chunks,
        'chunksize': chunksize,
    })


    def _sync(
        p: 'meerschaum.Pipe',
        df: Union[
            'pd.DataFrame',
            Dict[str, List[Any]],
            List[Dict[str, Any]],
            InferFetch
        ] = InferFetch,
    ) -> SuccessTuple:
        if df is None:
            return (
                False,
                f"You passed `None` instead of data into `sync()` for {p}.\n"
                + "Omit the DataFrame to infer fetching.",
            )
        ### Ensure that Pipe is registered.
        if p.get_id(debug=debug) is None:
            ### NOTE: This may trigger an interactive session for plugins!
            register_tuple = p.register(debug=debug)
            if not register_tuple[0]:
                return register_tuple

        ### If connector is a plugin with a `sync()` method, return that instead.
        ### If the plugin does not have a `sync()` method but does have a `fetch()` method,
        ### use that instead.
        ### NOTE: The DataFrame must be omitted for the plugin sync method to apply.
        ### If a DataFrame is provided, continue as expected.
        if hasattr(df, 'MRSM_INFER_FETCH'):
            try:
                if p.connector is None:
                    msg = f"{p} does not have a valid connector."
                    if p.connector_keys.startswith('plugin:'):
                        msg += f"\n    Perhaps {p.connector_keys} has a syntax error?"
                    return False, msg
            except Exception as e:
                return False, f"Unable to create the connector for {p}."

            ### Activate and invoke `sync(pipe)` for plugin connectors with `sync` methods.
            try:
                if p.connector.type == 'plugin' and p.connector.sync is not None:
                    connector_plugin = Plugin(p.connector.label)
                    connector_plugin.activate_venv(debug=debug)
                    return_tuple = p.connector.sync(p, debug=debug, **kw)
                    if deactivate_plugin_venv:
                        connector_plugin.deactivate_venv(debug=debug)
                    if not isinstance(return_tuple, tuple):
                        return_tuple = (
                            False,
                            f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}"
                        )
                    return return_tuple

            except Exception as e:
                get_console().print_exception()
                msg = f"Failed to sync {p} with exception: '" + str(e) + "'"
                if debug:
                    error(msg, silent=False)
                return False, msg

            ### Fetch the dataframe from the connector's `fetch()` method.
            try:
                ### If was added by `make_connector`, activate the plugin's virtual environment.
                is_custom = p.connector.type in custom_types
                plugin = (
                    Plugin(p.connector.__module__.replace('plugins.', '').split('.')[0])
                    if is_custom else (
                        Plugin(p.connector.label) if p.connector.type == 'plugin'
                        else None
                    )
                )
                if plugin is not None:
                    plugin.activate_venv(debug=debug)

                df = p.fetch(debug=debug, **kw)

                if plugin is not None and deactivate_plugin_venv:
                    plugin.deactivate_venv(debug=debug)
            except Exception as e:
                get_console().print_exception(
                    suppress = [
                        'meerschaum/core/Pipe/_sync.py', 
                        'meerschaum/core/Pipe/_fetch.py', 
                    ]
                )
                msg = f"Failed to fetch data from {p.connector}:\n    {e}"
                df = None

            if df is None:
                return False, f"No data were fetched for {p}."

            ### TODO: Deprecate async?
            if df is True:
                return True, f"{p} is being synced in parallel."

        ### CHECKPOINT: Retrieved the DataFrame.
        _checkpoint(**kw)
        ### Cast to a dataframe and ensure datatypes are what we expect.
        df = self.enforce_dtypes(df, debug=debug)
        if debug:
            dprint(
                "DataFrame to sync:\n"
                + (str(df)[:255] + '...' if len(str(df)) >= 256 else str(df)),
                **kw
            )

        ### if force, continue to sync until success
        return_tuple = False, f"Did not sync {p}."
        run = True
        _retries = 1
        while run:
            return_tuple = p.instance_connector.sync_pipe(
                pipe = p,
                df = df,
                debug = debug,
                **kw
            )
            _retries += 1
            run = (not return_tuple[0]) and force and _retries <= retries
            if run and debug:
                dprint(f"Syncing failed for {p}. Attempt ( {_retries} / {retries} )", **kw)
                dprint(f"Sleeping for {min_seconds} seconds...", **kw)
            if run:
                time.sleep(min_seconds)
            if _retries > retries:
                warn(
                    f"Unable to sync {p} within {retries} attempt" +
                        ("s" if retries != 1 else "") + "!"
                )

        ### CHECKPOINT: Finished syncing. Handle caching.
        _checkpoint(**kw)
        if self.cache_pipe is not None:
            if debug:
                dprint(f"Caching retrieved dataframe.", **kw)
            _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync local cache for {self}.")

        return return_tuple

    if blocking:
        return _sync(self, df = df)

    ### TODO implement concurrent syncing (split DataFrame? mimic the functionality of modin?)
    from meerschaum.utils.threading import Thread
    def default_callback(result_tuple : SuccessTuple):
        dprint(f"Asynchronous result from {self}: {result_tuple}", **kw)
    def default_error_callback(x : Exception):
        dprint(f"Error received for {self}: {x}", **kw)
    if callback is None and debug:
        callback = default_callback
    if error_callback is None and debug:
        error_callback = default_error_callback
    try:
        thread = Thread(
            target = _sync,
            args = (self,),
            kwargs = {'df' : df},
            daemon = False,
            callback = callback,
            error_callback = error_callback
        )
        thread.start()
    except Exception as e:
        return False, str(e)
    return True, f"Spawned asyncronous sync for {self}."
class Plugin (name: str, version: Optional[str] = None, user_id: Optional[int] = None, required: Optional[List[str]] = None, attributes: Optional[Dict[str, Any]] = None, archive_path: Optional[pathlib.Path] = None, venv_path: Optional[pathlib.Path] = None, repo_connector: "Optional['meerschaum.connectors.api.APIConnector']" = None, repo: "Union['meerschaum.connectors.api.APIConnector', str, None]" = None)

Handle packaging of Meerschaum plugins.
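
A minimal, non-authoritative usage sketch (the plugin name 'example' is hypothetical; substitute one of your own plugins from the plugins directory):

from meerschaum.plugins import Plugin

plugin = Plugin('example')          # 'example' is a hypothetical plugin name
print(plugin.is_installed())        # True if the plugin's source file or package exists
print(plugin.version)               # the module's __version__, or None if undefined
print(plugin.get_dependencies())    # the plugin's `required` list, if any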

Expand source code
class Plugin:
    """Handle packaging of Meerschaum plugins."""
    def __init__(
        self,
        name: str,
        version: Optional[str] = None,
        user_id: Optional[int] = None,
        required: Optional[List[str]] = None,
        attributes: Optional[Dict[str, Any]] = None,
        archive_path: Optional[pathlib.Path] = None,
        venv_path: Optional[pathlib.Path] = None,
        repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None,
        repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None,
    ):
        from meerschaum.config.static import STATIC_CONFIG
        sep = STATIC_CONFIG['plugins']['repo_separator']
        _repo = None
        if sep in name:
            try:
                name, _repo = name.split(sep)
            except Exception as e:
                error(f"Invalid plugin name: '{name}'")
        self._repo_in_name = _repo

        if attributes is None:
            attributes = {}
        self.name = name
        self.attributes = attributes
        self.user_id = user_id
        self._version = version
        if required:
            self._required = required
        self.archive_path = (
            archive_path if archive_path is not None
            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
        )
        self.venv_path = (
            venv_path if venv_path is not None
            else VIRTENV_RESOURCES_PATH / self.name
        )
        self._repo_connector = repo_connector
        self._repo_keys = repo


    @property
    def repo_connector(self):
        """
        Return the repository connector for this plugin.
        NOTE: This imports the `connectors` module, which imports certain plugin modules.
        """
        if self._repo_connector is None:
            from meerschaum.connectors.parse import parse_repo_keys

            repo_keys = self._repo_keys or self._repo_in_name
            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
                error(
                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
                )
            repo_connector = parse_repo_keys(repo_keys)
            self._repo_connector = repo_connector
        return self._repo_connector


    @property
    def version(self):
        """
        Return the plugin's module version (`__version__`) if it's defined.
        """
        if self._version is None:
            try:
                self._version = self.module.__version__
            except Exception as e:
                self._version = None
        return self._version


    @property
    def module(self):
        """
        Return the Python module of the underlying plugin.
        """
        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
            if self.__file__ is None:
                return None
            from meerschaum.plugins import import_plugins
            self._module = import_plugins(str(self), warn=False)
        return self._module

    @property
    def __file__(self) -> Union[str, None]:
        """
        Return the file path (str) of the plugin if it exists, otherwise `None`.
        """
        if '_module' in self.__dict__:
            return self.module.__file__
        potential_dir = PLUGINS_RESOURCES_PATH / self.name
        if (
            potential_dir.exists()
            and potential_dir.is_dir()
            and (potential_dir / '__init__.py').exists()
        ):
            return str(potential_dir / '__init__.py')

        potential_file = PLUGINS_RESOURCES_PATH / (self.name + '.py')
        if potential_file.exists() and not potential_file.is_dir():
            return str(potential_file)

        return None


    @property
    def requirements_file_path(self) -> Union[pathlib.Path, None]:
        """
        If a file named `requirements.txt` exists, return its path.
        """
        if self.__file__ is None:
            return None
        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
        if not path.exists():
            return None
        return path


    def is_installed(self, try_import: bool = True) -> bool:
        """
        Check whether a plugin is correctly installed.
        **NOTE:** This method will import the plugin's module.
        Set `try_import` to `False` to avoid importing.

        Parameters
        ----------
        try_import: bool, default True
            If `True`, attempt importing the plugin's module.            

        Returns
        -------
        A `bool` indicating whether a plugin exists and is successfully imported.
        """
        #  if not self.__file__:
            #  return False
        return self.__file__ is not None
        #  try:
            #  _installed = (
                #  self.__dict__.get('_module', None) is not None and self.__file__ is not None
            #  ) if try_import else (self.__file__ is not None)
        #  except ModuleNotFoundError as e:
            #  _installed = False
        #  return _installed


    def make_tar(self, debug: bool = False) -> pathlib.Path:
        """
        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.

        Parameters
        ----------
        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        The `pathlib.Path` of the archive file.

        """
        import tarfile, pathlib, subprocess, fnmatch
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.packages import attempt_import
        pathspec = attempt_import('pathspec', debug=debug)

        old_cwd = os.getcwd()
        os.chdir(PLUGINS_RESOURCES_PATH)

        if not self.__file__:
            from meerschaum.utils.warnings import error
            error(f"Could not find file for plugin '{self}'.")
        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
            path = self.__file__.replace('__init__.py', '')
            is_dir = True
        else:
            path = self.__file__
            is_dir = False

        default_patterns_to_ignore = [
            '.pyc',
            '__pycache__/',
            'eggs/',
            '__pypackages__/',
            '.git',
        ]

        def parse_gitignore() -> 'Set[str]':
            gitignore_path = pathlib.Path(path) / '.gitignore'
            if not gitignore_path.exists():
                return set()
            with open(gitignore_path, 'r', encoding='utf-8') as f:
                gitignore_text = f.read()
            return set(pathspec.PathSpec.from_lines(
                pathspec.patterns.GitWildMatchPattern,
                default_patterns_to_ignore + gitignore_text.splitlines()
            ).match_tree(path))

        patterns_to_ignore = parse_gitignore() if is_dir else set()

        if debug:
            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")

        with tarfile.open(self.archive_path, 'w:gz') as tarf:
            if not is_dir:
                tarf.add(f"{self.name}.py")
            else:
                for root, dirs, files in os.walk(self.name):
                    for f in files:
                        good_file = True
                        fp = os.path.join(root, f)
                        for pattern in patterns_to_ignore:
                            if pattern in str(fp) or f.startswith('.'):
                                good_file = False
                                break
                        if good_file:
                            if debug:
                                dprint(f"Adding '{fp}'...")
                            tarf.add(fp)

        ### clean up and change back to old directory
        os.chdir(old_cwd)

        ### change to 775 to avoid permissions issues with the API in a Docker container
        self.archive_path.chmod(0o775)

        if debug:
            dprint(f"Created archive '{self.archive_path}'.")
        return self.archive_path


    def install(
            self,
            force: bool = False,
            debug: bool = False,
        ) -> SuccessTuple:
        """
        Extract a plugin's tar archive to the plugins directory.
        
        This function checks whether the plugin is already installed and whether the new version
        is greater than or equal to the existing installation's version.

        Parameters
        ----------
        force: bool, default False
            If `True`, continue with installation, even if required packages fail to install.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A `SuccessTuple` of success (bool) and a message (str).

        """
        if self.full_name in _ongoing_installations:
            return True, "Already installing plugin '{self}'."
        _ongoing_installations.add(self.full_name)
        from meerschaum.utils.warnings import warn, error
        if debug:
            from meerschaum.utils.debug import dprint
        import tarfile
        from meerschaum.plugins import reload_plugins
        from meerschaum.utils.packages import attempt_import, determine_version
        from meerschaum.utils.venv import init_venv
        old_cwd = os.getcwd()
        old_version = ''
        new_version = ''
        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
        temp_dir.mkdir(exist_ok=True)

        if not self.archive_path.exists():
            return False, f"Missing archive file for plugin '{self}'."
        if self.version is not None:
            old_version = self.version
            if debug:
                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
        try:
            with tarfile.open(self.archive_path, 'r:gz') as tarf:
                tarf.extractall(temp_dir)
        except Exception as e:
            warn(e)
            return False, f"Failed to extract plugin '{self.name}'."

        if debug:
            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")

        ### search for version information
        files = os.listdir(temp_dir)
        
        if str(files[0]) == self.name:
            is_dir = True
        elif str(files[0]) == self.name + '.py':
            is_dir = False
        else:
            error(f"Unknown format encountered for plugin '{self}'.")

        fpath = temp_dir / files[0]
        if is_dir:
            fpath = fpath / '__init__.py'

        init_venv(self.name, debug=debug)
        new_version = determine_version(
            fpath,
            import_name = self.name,
            search_for_metadata = False,
            warn = True,
            debug = debug,
            venv = self.name,
        )
        if not new_version:
            warn(
                f"No `__version__` defined for plugin '{self}'. "
                + "Assuming new version...",
                stack = False,
            )

        packaging_version = attempt_import('packaging.version')
        try:
            is_new_version = (not new_version and not old_version) or (
                packaging_version.parse(old_version) < packaging_version.parse(new_version)
            )
            is_same_version = new_version and old_version and (
                packaging_version.parse(old_version) == packaging_version.parse(new_version)
            )
        except Exception as e:
            is_new_version, is_same_version = True, False
        success_msg = f"Successfully installed plugin '{self}'."
        success, abort = None, None
        if is_same_version and not force:
            success, msg = True, (
                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
                "    Install again with `-f` or `--force` to reinstall."
            )
            abort = True
        elif is_new_version or force:
            for src_dir, dirs, files in os.walk(temp_dir):
                if success is not None:
                    break
                dst_dir = str(src_dir).replace(str(temp_dir), str(PLUGINS_RESOURCES_PATH))
                if not os.path.exists(dst_dir):
                    os.mkdir(dst_dir)
                for f in files:
                    src_file = os.path.join(src_dir, f)
                    dst_file = os.path.join(dst_dir, f)
                    if os.path.exists(dst_file):
                        os.remove(dst_file)

                    if debug:
                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
                    try:
                        shutil.move(src_file, dst_dir)
                    except Exception as e:
                        success, msg = False, (
                            f"Failed to install plugin '{self}': " +
                            f"Could not move file '{src_file}' to '{dst_dir}'"
                        )
                        print(msg)
                        break
            if success is None:
                success, msg = True, success_msg
        else:
            success, msg = False, (
                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
                + f"attempted version {new_version}."
            )

        shutil.rmtree(temp_dir)
        os.chdir(old_cwd)

        ### Reload the plugin's module.
        if '_module' in self.__dict__:
            del self.__dict__['_module']
        reload_plugins([self.name], debug=debug)

        ### if we've already failed, return here
        if not success or abort:
            _ongoing_installations.remove(self.full_name)
            return success, msg

        ### attempt to install dependencies
        if not self.install_dependencies(force=force, debug=debug):
            _ongoing_installations.remove(self.full_name)
            return False, f"Failed to install dependencies for plugin '{self}'."

        ### handling success tuple, bool, or other (typically None)
        setup_tuple = self.setup(debug=debug)
        if isinstance(setup_tuple, tuple):
            if not setup_tuple[0]:
                success, msg = setup_tuple
        elif isinstance(setup_tuple, bool):
            if not setup_tuple:
                success, msg = False, (
                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
                    f"Check `setup()` in '{self.__file__}' for more information " +
                    f"(no error message provided)."
                )
            else:
                success, msg = True, success_msg
        elif setup_tuple is None:
            success = True
            msg = (
                f"Post-install for plugin '{self}' returned None. " +
                f"Assuming plugin successfully installed."
            )
            warn(msg)
        else:
            success = False
            msg = (
                f"Post-install for plugin '{self}' returned unexpected value " +
                f"of type '{type(setup_tuple)}': {setup_tuple}"
            )

        _ongoing_installations.remove(self.full_name)
        return success, msg


    def remove_archive(
            self,        
            debug: bool = False
        ) -> SuccessTuple:
        """Remove a plugin's archive file."""
        if not self.archive_path.exists():
            return True, f"Archive file for plugin '{self}' does not exist."
        try:
            self.archive_path.unlink()
        except Exception as e:
            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
        return True, "Success"


    def remove_venv(
            self,        
            debug: bool = False
        ) -> SuccessTuple:
        """Remove a plugin's virtual environment."""
        if not self.venv_path.exists():
            return True, f"Virtual environment for plugin '{self}' does not exist."
        try:
            shutil.rmtree(self.venv_path)
        except Exception as e:
            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
        return True, "Success"


    def uninstall(self, debug: bool = False) -> SuccessTuple:
        """
        Remove a plugin, its virtual environment, and archive file.
        """
        from meerschaum.utils.warnings import warn, info
        warnings_thrown_count: int = 0
        max_warnings: int = 3

        if not self.is_installed():
            info(
                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
                + "Checking for artifacts...",
                stack = False,
            )
        else:
            try:
                if '__init__.py' in self.__file__:
                    shutil.rmtree(self.__file__.replace('__init__.py', ''))
                else:
                    os.remove(self.__file__)
            except Exception as e:
                warn(f"Could not remove source files of plugin '{self.name}'.", stack=False)
                warnings_thrown_count += 1
            else:
                info(f"Removed source files for plugin '{self.name}'.")

        if self.venv_path.exists():
            success, msg = self.remove_venv(debug=debug)
            if not success:
                warn(msg, stack=False)
                warnings_thrown_count += 1
            else:
                info(f"Removed virtual environment from plugin '{self.name}'.")

        success = warnings_thrown_count < max_warnings
        return success, (
            f"Successfully uninstalled plugin '{self}'." if success
            else f"Failed to uninstall plugin '{self}'."
        )


    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
        """
        If exists, run the plugin's `setup()` function.

        Parameters
        ----------
        *args: str
            The positional arguments passed to the `setup()` function.
            
        debug: bool, default False
            Verbosity toggle.

        **kw: Any
            The keyword arguments passed to the `setup()` function.

        Returns
        -------
        A `SuccessTuple` or `bool` indicating success.

        """
        from meerschaum.utils.debug import dprint
        import inspect
        _setup = None
        for name, fp in inspect.getmembers(self.module):
            if name == 'setup' and inspect.isfunction(fp):
                _setup = fp
                break

        ### assume success if no setup() is found (not necessary)
        if _setup is None:
            return True

        sig = inspect.signature(_setup)
        has_debug, has_kw = ('debug' in sig.parameters), False
        for k, v in sig.parameters.items():
            if '**' in str(v):
                has_kw = True
                break

        _kw = {}
        if has_kw:
            _kw.update(kw)
        if has_debug:
            _kw['debug'] = debug

        if debug:
            dprint(f"Running setup for plugin '{self}'...")
        try:
            self.activate_venv(debug=debug)
            return_tuple = _setup(*args, **_kw)
            self.deactivate_venv(debug=debug)
        except Exception as e:
            return False, str(e)

        if isinstance(return_tuple, tuple):
            return return_tuple
        if isinstance(return_tuple, bool):
            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
        if return_tuple is None:
            return False, f"Setup for Plugin '{self.name}' returned None."
        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"


    def get_dependencies(
            self,
            debug: bool = False,
        ) -> List[str]:
        """
        If the Plugin has specified dependencies in a list called `required`, return the list.
        
        **NOTE:** Dependencies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
        Meerschaum plugins may also specify connector keys for a repo after `'@'`.

        Parameters
        ----------
        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A list of required packages and plugins (str).

        """
        if '_required' in self.__dict__:
            return self._required

        ### If the plugin has not yet been imported,
        ### infer the dependencies from the source text.
        ### This is not super robust, and it doesn't feel right
        ### having multiple versions of the logic.
        ### This is necessary when determining the activation order
        ### without having to import the module.
        ### For consistency's sake, the module-less method does not cache the requirements.
        if self.__dict__.get('_module', None) is None:
            with open(self.__file__, 'r', encoding='utf-8') as f:
                text = f.read()

            if 'required' not in text:
                return []

            ### This has some limitations:
            ### It relies on `required` being manually declared.
            ### We lose the ability to dynamically alter the `required` list,
            ### which is why we've kept the module-reliant method below.
            import ast, re
            ### NOTE: This technically would break 
            ### if `required` was the very first line of the file.
            req_start_match = re.search(r'\nrequired(\s?)=', text)
            if not req_start_match:
                return []
            req_start = req_start_match.start()
            bracket_idx = text[req_start:].find(']')
            if bracket_idx == -1:
                return []
            req_end = req_start + bracket_idx + 1
            req_text = (
                text[req_start:req_end]
                .lstrip()
                .replace('required', '', 1)
                .lstrip()
                .replace('=', '', 1)
                .lstrip()
            )
            try:
                required = ast.literal_eval(req_text)
            except Exception as e:
                warn(
                    f"Unable to determine requirements for plugin '{self.name}' "
                    + "without importing the module.\n"
                    + "    This may be due to dynamically setting the global `required` list.\n"
                    + f"    {e}"
                )
                return []
            return required

        import inspect
        self.activate_venv(dependencies=False, debug=debug)
        required = []
        for name, val in inspect.getmembers(self.module):
            if name == 'required':
                required = val
                break
        self._required = required
        self.deactivate_venv(dependencies=False, debug=debug)
        return required


    def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
        """
        Return a list of required Plugin objects.
        """
        from meerschaum.utils.warnings import warn
        from meerschaum.config import get_config
        from meerschaum.config.static import STATIC_CONFIG
        plugins = []
        _deps = self.get_dependencies(debug=debug)
        sep = STATIC_CONFIG['plugins']['repo_separator']
        plugin_names = [
            _d[len('plugin:'):] for _d in _deps
            if _d.startswith('plugin:') and len(_d) > len('plugin:')
        ]
        default_repo_keys = get_config('meerschaum', 'default_repository')
        for _plugin_name in plugin_names:
            if sep in _plugin_name:
                try:
                    _plugin_name, _repo_keys = _plugin_name.split(sep)
                except Exception as e:
                    _repo_keys = default_repo_keys
                    warn(
                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
                        + f"Will try to use '{_repo_keys}' instead.",
                        stack = False,
                    )
            else:
                _repo_keys = default_repo_keys
            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
        return plugins


    def get_required_packages(self, debug: bool=False) -> List[str]:
        """
        Return the required package names (excluding plugins).
        """
        _deps = self.get_dependencies(debug=debug)
        return [_d for _d in _deps if not _d.startswith('plugin:')]


    def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
        """
        Activate the virtual environments for the plugin and its dependencies.

        Parameters
        ----------
        dependencies: bool, default True
            If `True`, activate the virtual environments for required plugins.

        Returns
        -------
        A bool indicating success.
        """
        from meerschaum.utils.venv import venv_target_path
        from meerschaum.utils.packages import activate_venv
        from meerschaum.utils.misc import make_symlink, is_symlink
        from meerschaum.config._paths import PACKAGE_ROOT_PATH

        if dependencies:
            for plugin in self.get_required_plugins(debug=debug):
                plugin.activate_venv(debug=debug, **kw)

        vtp = venv_target_path(self.name, debug=debug)
        venv_meerschaum_path = vtp / 'meerschaum'

        try:
            success, msg = True, "Success"
            if is_symlink(venv_meerschaum_path):
                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
                    venv_meerschaum_path.unlink()
                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
        except Exception as e:
            success, msg = False, str(e)
        if not success:
            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")

        return activate_venv(self.name, debug=debug, **kw)


    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
        """
        Deactivate the virtual environments for the plugin and its dependencies.

        Parameters
        ----------
        dependencies: bool, default True
            If `True`, deactivate the virtual environments for required plugins.

        Returns
        -------
        A bool indicating success.
        """
        from meerschaum.utils.packages import deactivate_venv
        success = deactivate_venv(self.name, debug=debug, **kw)
        if dependencies:
            for plugin in self.get_required_plugins(debug=debug):
                plugin.deactivate_venv(debug=debug, **kw)
        return success


    def install_dependencies(
            self,
            force: bool = False,
            debug: bool = False,
        ) -> bool:
        """
        If specified, install dependencies.
        
        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
        Meerschaum plugins from the same repository as this Plugin.
        To install from a different repository, add the repo keys after `'@'`
        (e.g. `'plugin:foo@api:bar'`).

        Parameters
        ----------
        force: bool, default False
            If `True`, continue with the installation, even if some
            required packages fail to install.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A bool indicating success.

        """
        from meerschaum.utils.packages import pip_install, venv_contains_package
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.warnings import warn, info
        from meerschaum.connectors.parse import parse_repo_keys
        _deps = self.get_dependencies(debug=debug)
        if not _deps and self.requirements_file_path is None:
            return True

        plugins = self.get_required_plugins(debug=debug)
        for _plugin in plugins:
            if _plugin.name == self.name:
                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
                continue
            _success, _msg = _plugin.repo_connector.install_plugin(
                _plugin.name, debug=debug, force=force
            )
            if not _success:
                warn(
                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
                    + f" for plugin '{self.name}':\n" + _msg,
                    stack = False,
                )
                if not force:
                    warn(
                        "Try installing with the `--force` flag to continue anyway.",
                        stack = False,
                    )
                    return False
                info(
                    "Continuing with installation despite the failure "
                    + "(careful, things might be broken!)...",
                    icon = False
                )


        ### First step: parse `requirements.txt` if it exists.
        if self.requirements_file_path is not None:
            if not pip_install(
                requirements_file_path=self.requirements_file_path,
                venv=self.name, debug=debug
            ):
                warn(
                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
                    stack = False,
                )
                if not force:
                    warn(
                        "Try installing with `--force` to continue anyway.",
                        stack = False,
                    )
                    return False
                info(
                    "Continuing with installation despite the failure "
                    + "(careful, things might be broken!)...",
                    icon = False
                )


        ### Don't reinstall packages that are already included in required plugins.
        packages = []
        _packages = self.get_required_packages(debug=debug)
        accounted_for_packages = set()
        for package_name in _packages:
            for plugin in plugins:
                if venv_contains_package(package_name, plugin.name):
                    accounted_for_packages.add(package_name)
                    break
        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]

        ### Attempt pip packages installation.
        if packages:
            for package in packages:
                if not pip_install(package, venv=self.name, debug=debug):
                    warn(
                        f"Failed to install required package '{package}'"
                        + f" for plugin '{self.name}'.",
                        stack = False,
                    )
                    if not force:
                        warn(
                            "Try installing with `--force` to continue anyway.",
                            stack = False,
                        )
                        return False
                    info(
                        "Continuing with installation despite the failure "
                        + "(careful, things might be broken!)...",
                        icon = False
                    )
        return True


    @property
    def full_name(self) -> str:
        """
        Include the repo keys with the plugin's name.
        """
        from meerschaum.config.static import STATIC_CONFIG
        sep = STATIC_CONFIG['plugins']['repo_separator']
        return self.name + sep + str(self.repo_connector)


    def __str__(self):
        return self.name


    def __repr__(self):
        return f"Plugin('{self.name}', repo='{self.repo_connector}')"


    def __del__(self):
        pass

Instance variables

var full_name : str

Include the repo keys with the plugin's name.

Expand source code
@property
def full_name(self) -> str:
    """
    Include the repo keys with the plugin's name.
    """
    from meerschaum.config.static import STATIC_CONFIG
    sep = STATIC_CONFIG['plugins']['repo_separator']
    return self.name + sep + str(self.repo_connector)
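
A hedged example of what `full_name` returns; the repo keys 'api:mrsm' and the '@' separator are illustrative, and both ultimately come from configuration:

from meerschaum.plugins import Plugin

plugin = Plugin('example', repo='api:mrsm')   # hypothetical plugin name and repo keys
print(plugin.full_name)                       # e.g. 'example@api:mrsm'
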
var module

Return the Python module of the underlying plugin.

Expand source code
@property
def module(self):
    """
    Return the Python module of the underlying plugin.
    """
    if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
        if self.__file__ is None:
            return None
        from meerschaum.plugins import import_plugins
        self._module = import_plugins(str(self), warn=False)
    return self._module
var repo_connector

Return the repository connector for this plugin. NOTE: This imports the meerschaum.connectors module, which imports certain plugin modules.

Expand source code
@property
def repo_connector(self):
    """
    Return the repository connector for this plugin.
    NOTE: This imports the `connectors` module, which imports certain plugin modules.
    """
    if self._repo_connector is None:
        from meerschaum.connectors.parse import parse_repo_keys

        repo_keys = self._repo_keys or self._repo_in_name
        if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
            error(
                f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
            )
        repo_connector = parse_repo_keys(repo_keys)
        self._repo_connector = repo_connector
    return self._repo_connector
var requirements_file_path : Union[pathlib.Path, NoneType]

If a file named requirements.txt exists, return its path.

Expand source code
@property
def requirements_file_path(self) -> Union[pathlib.Path, None]:
    """
    If a file named `requirements.txt` exists, return its path.
    """
    if self.__file__ is None:
        return None
    path = pathlib.Path(self.__file__).parent / 'requirements.txt'
    if not path.exists():
        return None
    return path
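
A brief sketch, assuming a hypothetical package-style plugin that ships a requirements.txt next to its __init__.py:

from meerschaum.plugins import Plugin

plugin = Plugin('example')                # hypothetical plugin installed as a package
print(plugin.requirements_file_path)      # .../plugins/example/requirements.txt, or None if absent
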
var version

Return the plugin's module version (__version__) if it's defined.

Expand source code
@property
def version(self):
    """
    Return the plugin's module version (`__version__`) if it's defined.
    """
    if self._version is None:
        try:
            self._version = self.module.__version__
        except Exception as e:
            self._version = None
    return self._version

Methods

def activate_venv(self, dependencies: bool = True, debug: bool = False, **kw) ‑> bool

Activate the virtual environments for the plugin and its dependencies.

Parameters

dependencies : bool, default True
If True, activate the virtual environments for required plugins.

Returns

A bool indicating success.

Expand source code
def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
    """
    Activate the virtual environments for the plugin and its dependencies.

    Parameters
    ----------
    dependencies: bool, default True
        If `True`, activate the virtual environments for required plugins.

    Returns
    -------
    A bool indicating success.
    """
    from meerschaum.utils.venv import venv_target_path
    from meerschaum.utils.packages import activate_venv
    from meerschaum.utils.misc import make_symlink, is_symlink
    from meerschaum.config._paths import PACKAGE_ROOT_PATH

    if dependencies:
        for plugin in self.get_required_plugins(debug=debug):
            plugin.activate_venv(debug=debug, **kw)

    vtp = venv_target_path(self.name, debug=debug)
    venv_meerschaum_path = vtp / 'meerschaum'

    try:
        success, msg = True, "Success"
        if is_symlink(venv_meerschaum_path):
            if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
                venv_meerschaum_path.unlink()
                success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
    except Exception as e:
        success, msg = False, str(e)
    if not success:
        warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")

    return activate_venv(self.name, debug=debug, **kw)
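
A non-authoritative sketch of wrapping work in the plugin's virtual environment (the plugin name is hypothetical):

from meerschaum.plugins import Plugin

plugin = Plugin('example')
plugin.activate_venv()        # also activates the venvs of required plugins
try:
    module = plugin.module    # import while the venv is active
finally:
    plugin.deactivate_venv()
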
def deactivate_venv(self, dependencies: bool = True, debug: bool = False, **kw) ‑> bool

Deactivate the virtual environments for the plugin and its dependencies.

Parameters

dependencies : bool, default True
If True, deactivate the virtual environments for required plugins.

Returns

A bool indicating success.

Expand source code
def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
    """
    Deactivate the virtual environments for the plugin and its dependencies.

    Parameters
    ----------
    dependencies: bool, default True
        If `True`, deactivate the virtual environments for required plugins.

    Returns
    -------
    A bool indicating success.
    """
    from meerschaum.utils.packages import deactivate_venv
    success = deactivate_venv(self.name, debug=debug, **kw)
    if dependencies:
        for plugin in self.get_required_plugins(debug=debug):
            plugin.deactivate_venv(debug=debug, **kw)
    return success
def get_dependencies(self, debug: bool = False) ‑> List[str]

If the Plugin has specified dependencies in a list called required, return the list.

NOTE: Dependencies which start with 'plugin:' are Meerschaum plugins, not pip packages. Meerschaum plugins may also specify connector keys for a repo after '@'.

Parameters

debug : bool, default False
Verbosity toggle.

Returns

A list of required packages and plugins (str).

Expand source code
def get_dependencies(
        self,
        debug: bool = False,
    ) -> List[str]:
    """
    If the Plugin has specified dependencies in a list called `required`, return the list.
    
    **NOTE:** Dependencies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
    Meerschaum plugins may also specify connector keys for a repo after `'@'`.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of required packages and plugins (str).

    """
    if '_required' in self.__dict__:
        return self._required

    ### If the plugin has not yet been imported,
    ### infer the dependencies from the source text.
    ### This is not super robust, and it doesn't feel right
    ### having multiple versions of the logic.
    ### This is necessary when determining the activation order
    ### without having to import the module.
    ### For consistency's sake, the module-less method does not cache the requirements.
    if self.__dict__.get('_module', None) is None:
        with open(self.__file__, 'r', encoding='utf-8') as f:
            text = f.read()

        if 'required' not in text:
            return []

        ### This has some limitations:
        ### It relies on `required` being manually declared.
        ### We lose the ability to dynamically alter the `required` list,
        ### which is why we've kept the module-reliant method below.
        import ast, re
        ### NOTE: This technically would break 
        ### if `required` was the very first line of the file.
        req_start_match = re.search(r'\nrequired(\s?)=', text)
        if not req_start_match:
            return []
        req_start = req_start_match.start()
        bracket_idx = text[req_start:].find(']')
        if bracket_idx == -1:
            return []
        req_end = req_start + bracket_idx + 1
        req_text = (
            text[req_start:req_end]
            .lstrip()
            .replace('required', '', 1)
            .lstrip()
            .replace('=', '', 1)
            .lstrip()
        )
        try:
            required = ast.literal_eval(req_text)
        except Exception as e:
            warn(
                f"Unable to determine requirements for plugin '{self.name}' "
                + "without importing the module.\n"
                + "    This may be due to dynamically setting the global `required` list.\n"
                + f"    {e}"
            )
            return []
        return required

    import inspect
    self.activate_venv(dependencies=False, debug=debug)
    required = []
    for name, val in inspect.getmembers(self.module):
        if name == 'required':
            required = val
            break
    self._required = required
    self.deactivate_venv(dependencies=False, debug=debug)
    return required
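
As a hedged sketch of how a `required` list is typically declared in a plugin file and then read back (the plugin and its dependencies are hypothetical):

# ~/.config/meerschaum/plugins/example.py  (hypothetical plugin)
required = ['requests', 'plugin:noaa']

# Elsewhere:
from meerschaum.plugins import Plugin
plugin = Plugin('example')
print(plugin.get_dependencies())                         # ['requests', 'plugin:noaa']
print(plugin.get_required_packages())                    # ['requests']
print([p.name for p in plugin.get_required_plugins()])   # ['noaa']
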
def get_required_packages(self, debug: bool = False) ‑> List[str]

Return the required package names (excluding plugins).

Expand source code
def get_required_packages(self, debug: bool=False) -> List[str]:
    """
    Return the required package names (excluding plugins).
    """
    _deps = self.get_dependencies(debug=debug)
    return [_d for _d in _deps if not _d.startswith('plugin:')]
def get_required_plugins(self, debug: bool = False) ‑> List[Plugin]

Return a list of required Plugin objects.

Expand source code
def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
    """
    Return a list of required Plugin objects.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    plugins = []
    _deps = self.get_dependencies(debug=debug)
    sep = STATIC_CONFIG['plugins']['repo_separator']
    plugin_names = [
        _d[len('plugin:'):] for _d in _deps
        if _d.startswith('plugin:') and len(_d) > len('plugin:')
    ]
    default_repo_keys = get_config('meerschaum', 'default_repository')
    for _plugin_name in plugin_names:
        if sep in _plugin_name:
            try:
                _plugin_name, _repo_keys = _plugin_name.split(sep)
            except Exception as e:
                _repo_keys = default_repo_keys
                warn(
                    f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
                    + f"Will try to use '{_repo_keys}' instead.",
                    stack = False,
                )
        else:
            _repo_keys = default_repo_keys
        plugins.append(Plugin(_plugin_name, repo=_repo_keys))
    return plugins
def install(self, force: bool = False, debug: bool = False) ‑> Tuple[bool, str]

Extract a plugin's tar archive to the plugins directory.

This function checks whether the plugin is already installed and whether the new version is greater than or equal to the existing installation's version.

Parameters

force : bool, default False
If True, continue with installation, even if required packages fail to install.
debug : bool, default False
Verbosity toggle.

Returns

A SuccessTuple of success (bool) and a message (str).

Expand source code
def install(
        self,
        force: bool = False,
        debug: bool = False,
    ) -> SuccessTuple:
    """
    Extract a plugin's tar archive to the plugins directory.
    
    This function checks whether the plugin is already installed and whether the new version
    is greater than or equal to the existing installation's version.

    Parameters
    ----------
    force: bool, default False
        If `True`, continue with installation, even if required packages fail to install.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (bool) and a message (str).

    """
    if self.full_name in _ongoing_installations:
        return True, "Already installing plugin '{self}'."
    _ongoing_installations.add(self.full_name)
    from meerschaum.utils.warnings import warn, error
    if debug:
        from meerschaum.utils.debug import dprint
    import tarfile
    from meerschaum.plugins import reload_plugins
    from meerschaum.utils.packages import attempt_import, determine_version
    from meerschaum.utils.venv import init_venv
    old_cwd = os.getcwd()
    old_version = ''
    new_version = ''
    temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
    temp_dir.mkdir(exist_ok=True)

    if not self.archive_path.exists():
        return False, f"Missing archive file for plugin '{self}'."
    if self.version is not None:
        old_version = self.version
        if debug:
            dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
    try:
        with tarfile.open(self.archive_path, 'r:gz') as tarf:
            tarf.extractall(temp_dir)
    except Exception as e:
        warn(e)
        return False, f"Failed to extract plugin '{self.name}'."

    if debug:
        dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")

    ### search for version information
    files = os.listdir(temp_dir)
    
    if str(files[0]) == self.name:
        is_dir = True
    elif str(files[0]) == self.name + '.py':
        is_dir = False
    else:
        error(f"Unknown format encountered for plugin '{self}'.")

    fpath = temp_dir / files[0]
    if is_dir:
        fpath = fpath / '__init__.py'

    init_venv(self.name, debug=debug)
    new_version = determine_version(
        fpath,
        import_name = self.name,
        search_for_metadata = False,
        warn = True,
        debug = debug,
        venv = self.name,
    )
    if not new_version:
        warn(
            f"No `__version__` defined for plugin '{self}'. "
            + "Assuming new version...",
            stack = False,
        )

    packaging_version = attempt_import('packaging.version')
    try:
        is_new_version = (not new_version and not old_version) or (
            packaging_version.parse(old_version) < packaging_version.parse(new_version)
        )
        is_same_version = new_version and old_version and (
            packaging_version.parse(old_version) == packaging_version.parse(new_version)
        )
    except Exception as e:
        is_new_version, is_same_version = True, False
    success_msg = f"Successfully installed plugin '{self}'."
    success, abort = None, None
    if is_same_version and not force:
        success, msg = True, (
            f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
            "    Install again with `-f` or `--force` to reinstall."
        )
        abort = True
    elif is_new_version or force:
        for src_dir, dirs, files in os.walk(temp_dir):
            if success is not None:
                break
            dst_dir = str(src_dir).replace(str(temp_dir), str(PLUGINS_RESOURCES_PATH))
            if not os.path.exists(dst_dir):
                os.mkdir(dst_dir)
            for f in files:
                src_file = os.path.join(src_dir, f)
                dst_file = os.path.join(dst_dir, f)
                if os.path.exists(dst_file):
                    os.remove(dst_file)

                if debug:
                    dprint(f"Moving '{src_file}' to '{dst_dir}'...")
                try:
                    shutil.move(src_file, dst_dir)
                except Exception as e:
                    success, msg = False, (
                        f"Failed to install plugin '{self}': " +
                        f"Could not move file '{src_file}' to '{dst_dir}'"
                    )
                    print(msg)
                    break
        if success is None:
            success, msg = True, success_msg
    else:
        success, msg = False, (
            f"Your installed version of plugin '{self}' ({old_version}) is higher than "
            + f"attempted version {new_version}."
        )

    shutil.rmtree(temp_dir)
    os.chdir(old_cwd)

    ### Reload the plugin's module.
    if '_module' in self.__dict__:
        del self.__dict__['_module']
    reload_plugins([self.name], debug=debug)

    ### if we've already failed, return here
    if not success or abort:
        _ongoing_installations.remove(self.full_name)
        return success, msg

    ### attempt to install dependencies
    if not self.install_dependencies(force=force, debug=debug):
        _ongoing_installations.remove(self.full_name)
        return False, f"Failed to install dependencies for plugin '{self}'."

    ### handling success tuple, bool, or other (typically None)
    setup_tuple = self.setup(debug=debug)
    if isinstance(setup_tuple, tuple):
        if not setup_tuple[0]:
            success, msg = setup_tuple
    elif isinstance(setup_tuple, bool):
        if not setup_tuple:
            success, msg = False, (
                f"Failed to run post-install setup for plugin '{self}'." + '\n' +
                f"Check `setup()` in '{self.__file__}' for more information " +
                f"(no error message provided)."
            )
        else:
            success, msg = True, success_msg
    elif setup_tuple is None:
        success = True
        msg = (
            f"Post-install for plugin '{self}' returned None. " +
            f"Assuming plugin successfully installed."
        )
        warn(msg)
    else:
        success = False
        msg = (
            f"Post-install for plugin '{self}' returned unexpected value " +
            f"of type '{type(setup_tuple)}': {setup_tuple}"
        )

    _ongoing_installations.remove(self.full_name)
    return success, msg
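
A minimal sketch of the archive-then-install flow shown above; the plugin name is hypothetical and this is not a prescribed workflow:

from meerschaum.plugins import Plugin

plugin = Plugin('example')                               # hypothetical plugin in the plugins directory
archive_path = plugin.make_tar(debug=False)              # build the plugin's .tar.gz archive
success, msg = plugin.install(force=True, debug=False)   # extract, install dependencies, run setup()
print(success, msg)
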
def install_dependencies(self, force: bool = False, debug: bool = False) ‑> bool

If specified, install dependencies.

NOTE: Dependencies that start with 'plugin:' will be installed as Meerschaum plugins from the same repository as this Plugin. To install from a different repository, add the repo keys after '@' (e.g. 'plugin:foo@api:bar').

Parameters

force : bool, default False
If True, continue with the installation, even if some required packages fail to install.
debug : bool, default False
Verbosity toggle.

Returns

A bool indicating success.

Expand source code
def install_dependencies(
        self,
        force: bool = False,
        debug: bool = False,
    ) -> bool:
    """
    If specified, install dependencies.
    
    **NOTE:** Dependencies that start with `'plugin:'` will be installed as
    Meerschaum plugins from the same repository as this Plugin.
    To install from a different repository, add the repo keys after `'@'`
    (e.g. `'plugin:foo@api:bar'`).

    Parameters
    ----------
    force: bool, default False
        If `True`, continue with the installation, even if some
        required packages fail to install.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating success.

    """
    from meerschaum.utils.packages import pip_install, venv_contains_package
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, info
    from meerschaum.connectors.parse import parse_repo_keys
    _deps = self.get_dependencies(debug=debug)
    if not _deps and self.requirements_file_path is None:
        return True

    plugins = self.get_required_plugins(debug=debug)
    for _plugin in plugins:
        if _plugin.name == self.name:
            warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
            continue
        _success, _msg = _plugin.repo_connector.install_plugin(
            _plugin.name, debug=debug, force=force
        )
        if not _success:
            warn(
                f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
                + f" for plugin '{self.name}':\n" + _msg,
                stack = False,
            )
            if not force:
                warn(
                    "Try installing with the `--force` flag to continue anyway.",
                    stack = False,
                )
                return False
            info(
                "Continuing with installation despite the failure "
                + "(careful, things might be broken!)...",
                icon = False
            )


    ### First step: parse `requirements.txt` if it exists.
    if self.requirements_file_path is not None:
        if not pip_install(
            requirements_file_path=self.requirements_file_path,
            venv=self.name, debug=debug
        ):
            warn(
                f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
                stack = False,
            )
            if not force:
                warn(
                    "Try installing with `--force` to continue anyway.",
                    stack = False,
                )
                return False
            info(
                "Continuing with installation despite the failure "
                + "(careful, things might be broken!)...",
                icon = False
            )


    ### Don't reinstall packages that are already included in required plugins.
    packages = []
    _packages = self.get_required_packages(debug=debug)
    accounted_for_packages = set()
    for package_name in _packages:
        for plugin in plugins:
            if venv_contains_package(package_name, plugin.name):
                accounted_for_packages.add(package_name)
                break
    packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]

    ### Attempt pip packages installation.
    if packages:
        for package in packages:
            if not pip_install(package, venv=self.name, debug=debug):
                warn(
                    f"Failed to install required package '{package}'"
                    + f" for plugin '{self.name}'.",
                    stack = False,
                )
                if not force:
                    warn(
                        "Try installing with `--force` to continue anyway.",
                        stack = False,
                    )
                    return False
                info(
                    "Continuing with installation despite the failure "
                    + "(careful, things might be broken!)...",
                    icon = False
                )
    return True
def is_installed(self, try_import: bool = True) ‑> bool

Check whether a plugin is correctly installed. NOTE: This method will import the plugin's module. Set try_import to False to avoid importing.

Parameters

try_import : bool, default True
If True, attempt importing the plugin's module.

Returns

A bool indicating whether a plugin exists and is successfully imported.

Expand source code
def is_installed(self, try_import: bool = True) -> bool:
    """
    Check whether a plugin is correctly installed.
    **NOTE:** This method will import the plugin's module.
    Set `try_import` to `False` to avoid importing.

    Parameters
    ----------
    try_import: bool, default True
        If `True`, attempt importing the plugin's module.            

    Returns
    -------
    A `bool` indicating whether a plugin exists and is successfully imported.
    """
    ### NOTE: `try_import` is currently unused;
    ### a plugin is considered installed if its source file exists.
    return self.__file__ is not None
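
A quick sketch of using is_installed() as a guard before touching a plugin's files, reusing the assumed Plugin construction from the previous example:

plugin = Plugin('example')  # hypothetical plugin name

if plugin.is_installed():
    print(f"Plugin '{plugin}' is installed at '{plugin.__file__}'.")
else:
    print(f"Plugin '{plugin}' is not installed.")
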
def make_tar(self, debug: bool = False) ‑> pathlib.Path

Compress the plugin's source files into a .tar.gz archive and return the archive's path.

Parameters

debug : bool, default False
Verbosity toggle.

Returns

A pathlib.Path to the archive file.

Expand source code
def make_tar(self, debug: bool = False) -> pathlib.Path:
    """
    Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pathlib.Path` to the archive file.

    """
    import tarfile, pathlib, subprocess, fnmatch
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    pathspec = attempt_import('pathspec', debug=debug)

    old_cwd = os.getcwd()
    os.chdir(PLUGINS_RESOURCES_PATH)

    if not self.__file__:
        from meerschaum.utils.warnings import error
        error(f"Could not find file for plugin '{self}'.")
    if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
        path = self.__file__.replace('__init__.py', '')
        is_dir = True
    else:
        path = self.__file__
        is_dir = False

    default_patterns_to_ignore = [
        '.pyc',
        '__pycache__/',
        'eggs/',
        '__pypackages__/',
        '.git',
    ]

    def parse_gitignore() -> 'Set[str]':
        gitignore_path = pathlib.Path(path) / '.gitignore'
        if not gitignore_path.exists():
            return set()
        with open(gitignore_path, 'r', encoding='utf-8') as f:
            gitignore_text = f.read()
        return set(pathspec.PathSpec.from_lines(
            pathspec.patterns.GitWildMatchPattern,
            default_patterns_to_ignore + gitignore_text.splitlines()
        ).match_tree(path))

    patterns_to_ignore = parse_gitignore() if is_dir else set()

    if debug:
        dprint(f"Patterns to ignore:\n{patterns_to_ignore}")

    with tarfile.open(self.archive_path, 'w:gz') as tarf:
        if not is_dir:
            tarf.add(f"{self.name}.py")
        else:
            for root, dirs, files in os.walk(self.name):
                for f in files:
                    good_file = True
                    fp = os.path.join(root, f)
                    for pattern in patterns_to_ignore:
                        if pattern in str(fp) or f.startswith('.'):
                            good_file = False
                            break
                    if good_file:
                        if debug:
                            dprint(f"Adding '{fp}'...")
                        tarf.add(fp)

    ### clean up and change back to old directory
    os.chdir(old_cwd)

    ### change to 775 to avoid permissions issues with the API in a Docker container
    self.archive_path.chmod(0o775)

    if debug:
        dprint(f"Created archive '{self.archive_path}'.")
    return self.archive_path
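
A short sketch of building an archive with make_tar(); the returned path is the plugin's archive_path attribute (Plugin construction assumed as above):

plugin = Plugin('example')  # hypothetical plugin name

# Write the plugin's source files to a .tar.gz archive and print its location.
archive_path = plugin.make_tar(debug=True)
print(f"Created archive at '{archive_path}'.")
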
def remove_archive(self, debug: bool = False) ‑> Tuple[bool, str]

Remove a plugin's archive file.

Expand source code
def remove_archive(
        self,        
        debug: bool = False
    ) -> SuccessTuple:
    """Remove a plugin's archive file."""
    if not self.archive_path.exists():
        return True, f"Archive file for plugin '{self}' does not exist."
    try:
        self.archive_path.unlink()
    except Exception as e:
        return False, f"Failed to remove archive for plugin '{self}':\n{e}"
    return True, "Success"
def remove_venv(self, debug: bool = False) ‑> Tuple[bool, str]

Remove a plugin's virtual environment.

Expand source code
def remove_venv(
        self,        
        debug: bool = False
    ) -> SuccessTuple:
    """Remove a plugin's virtual environment."""
    if not self.venv_path.exists():
        return True, f"Virtual environment for plugin '{self}' does not exist."
    try:
        shutil.rmtree(self.venv_path)
    except Exception as e:
        return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
    return True, "Success"
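
Both remove_archive() and remove_venv() return a SuccessTuple, so the plugin's artifacts can be cleaned up with the same handling (a sketch under the same assumptions as above):

plugin = Plugin('example')  # hypothetical plugin name

# Remove the archive file and virtual environment, reporting any failures.
for cleanup in (plugin.remove_archive, plugin.remove_venv):
    success, msg = cleanup(debug=True)
    print(msg if success else f"Warning: {msg}")
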
def setup(self, *args: str, debug: bool = False, **kw: Any) ‑> Union[Tuple[bool, str], bool]

If it exists, run the plugin's setup() function.

Parameters

*args : str
The positional arguments passed to the setup() function.
debug : bool, default False
Verbosity toggle.
**kw : Any
The keyword arguments passed to the setup() function.

Returns

A SuccessTuple or bool indicating success.

Expand source code
def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
    """
    If it exists, run the plugin's `setup()` function.

    Parameters
    ----------
    *args: str
        The positional arguments passed to the `setup()` function.
        
    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        The keyword arguments passed to the `setup()` function.

    Returns
    -------
    A `SuccessTuple` or `bool` indicating success.

    """
    from meerschaum.utils.debug import dprint
    import inspect
    _setup = None
    for name, fp in inspect.getmembers(self.module):
        if name == 'setup' and inspect.isfunction(fp):
            _setup = fp
            break

    ### assume success if no setup() is found (not necessary)
    if _setup is None:
        return True

    sig = inspect.signature(_setup)
    has_debug, has_kw = ('debug' in sig.parameters), False
    for k, v in sig.parameters.items():
        if '**' in str(v):
            has_kw = True
            break

    _kw = {}
    if has_kw:
        _kw.update(kw)
    if has_debug:
        _kw['debug'] = debug

    if debug:
        dprint(f"Running setup for plugin '{self}'...")
    try:
        self.activate_venv(debug=debug)
        return_tuple = _setup(*args, **_kw)
        self.deactivate_venv(debug=debug)
    except Exception as e:
        return False, str(e)

    if isinstance(return_tuple, tuple):
        return return_tuple
    if isinstance(return_tuple, bool):
        return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
    if return_tuple is None:
        return False, f"Setup for Plugin '{self.name}' returned None."
    return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"
def uninstall(self, debug: bool = False) ‑> Tuple[bool, str]

Remove a plugin, its virtual environment, and archive file.

Expand source code
def uninstall(self, debug: bool = False) -> SuccessTuple:
    """
    Remove a plugin, its virtual environment, and archive file.
    """
    from meerschaum.utils.warnings import warn, info
    warnings_thrown_count: int = 0
    max_warnings: int = 3

    if not self.is_installed():
        info(
            f"Plugin '{self.name}' doesn't seem to be installed.\n    "
            + "Checking for artifacts...",
            stack = False,
        )
    else:
        try:
            if '__init__.py' in self.__file__:
                shutil.rmtree(self.__file__.replace('__init__.py', ''))
            else:
                os.remove(self.__file__)
        except Exception as e:
            warn(f"Could not remove source files of plugin '{self.name}'.", stack=False)
            warnings_thrown_count += 1
        else:
            info(f"Removed source files for plugin '{self.name}'.")

    if self.venv_path.exists():
        success, msg = self.remove_venv(debug=debug)
        if not success:
            warn(msg, stack=False)
            warnings_thrown_count += 1
        else:
            info(f"Removed virtual environment from plugin '{self.name}'.")

    success = warnings_thrown_count < max_warnings
    return success, (
        f"Successfully uninstalled plugin '{self}'." if success
        else f"Failed to uninstall plugin '{self}'."
    )
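
Finally, a sketch of removing a plugin's installed files and virtual environment in one call with uninstall() (same assumed Plugin construction as above):

plugin = Plugin('example')  # hypothetical plugin name

success, msg = plugin.uninstall(debug=True)
print(msg)
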