Package meerschaum

Meerschaum banner

PyPI GitHub Info Stats
PyPI GitHub Repo stars License Number of plugins
PyPI - Python Version GitHub Sponsors meerschaum Tutorials Number of registered users

Meerschaum demo

What is Meerschaum?

Meerschaum is a tool for quickly synchronizing time-series data streams called pipes. With Meerschaum, you can have a data visualization stack running in minutes.

Why Meerschaum?

If you've worked with time-series data, you know the headaches that come with ETL. Data engineering often gets in analysts' way, and when work needs to get done, every minute spent on pipelining is time taken away from real analysis.

Rather than copy / pasting your ETL scripts, simply build pipes with Meerschaum! Meerschaum gives you the tools to design your data streams how you like ― and don't worry — you can always incorporate Meerschaum into your existing systems!

Want to Learn More?

You can find a wealth of information at meerschaum.io!

Additionally, below are several articles published about Meerschaum:

Features

  • 📊 Built for Data Scientists and Analysts
    • Integrate with Pandas, Grafana and other popular data analysis tools.
    • Persist your dataframes and always get the latest data.
  • ⚡️ Production-Ready, Batteries Included
  • 🔌 Easily Expandable
  • Tailored for Your Experience
    • Rich CLI makes managing your data streams surprisingly enjoyable!
    • Web dashboard for those who prefer a more graphical experience.
    • Manage your database connections with Meerschaum connectors
    • Utility commands with sensible syntax let you control many pipes with grace.
  • 💼 Portable from the Start
    • The environment variable $MRSM_ROOT_DIR lets you emulate multiple installations and group together your instances.
    • No dependencies required; anything needed will be installed into a virtual environment.
    • Specify required packages for your plugins, and users will get those packages in a virtual environment.

Installation

For a more thorough setup guide, visit the Getting Started page at meerschaum.io.

TL;DR

pip install -U --user meerschaum
mrsm stack up -d db grafana
mrsm bootstrap pipes

Usage Documentation

Please visit meerschaum.io for setup, usage, and troubleshooting information. You can find technical documentation at docs.meerschaum.io, and here is a complete list of the Meerschaum actions.

>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe("plugin:noaa", "weather")
>>> df = pipe.get_data(begin='2022-02-02')
>>> df[['timestamp', 'station', 'temperature (wmoUnit:degC)']]
               timestamp station  temperature (wmoUnit:degC)
0    2022-03-29 09:54:00    KCEU                         8.3
1    2022-03-29 09:52:00    KATL                        10.6
2    2022-03-29 09:52:00    KCLT                         7.2
3    2022-03-29 08:54:00    KCEU                         8.3
4    2022-03-29 08:52:00    KATL                        11.1
...                  ...     ...                         ...
1626 2022-02-02 01:52:00    KATL                        10.0
1627 2022-02-02 01:52:00    KCLT                         7.8
1628 2022-02-02 00:54:00    KCEU                         8.3
1629 2022-02-02 00:52:00    KATL                        10.0
1630 2022-02-02 00:52:00    KCLT                         8.3

[1631 rows x 3 columns]
>>>

Plugins

Here is the list of community plugins and the public plugins repository.

For details on installing, using, and writing plugins, check out the plugins documentation at meerschaum.io.

Example Plugin

# ~/.config/meerschaum/plugins/example.py
__version__ = '0.0.1'
required = []

def register(pipe, **kw):
    return {
        'columns': {
            'datetime': 'dt',
            'id': 'id',
            'value': 'val',
        }
    }

def fetch(pipe, **kw):
    import datetime, random
    return {
        'dt': [datetime.datetime.utcnow()],
        'id': [1],
        'val': [random.randint(0, 100)],
    }

Support Meerschaum's Development

For consulting services and to support Meerschaum's development, please considering sponsoring me on GitHub sponsors.

Additionally, you can always buy me a coffee☕!

Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Copyright 2023 Bennett Meares

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from meerschaum.core.Pipe import Pipe
from meerschaum.plugins import Plugin
from meerschaum.utils import get_pipes
from meerschaum.utils.venv import Venv
from meerschaum.utils.formatting import pprint
from meerschaum._internal.docs import index as __doc__
from meerschaum.connectors import get_connector
from meerschaum.config import __version__, get_config
from meerschaum.utils.packages import attempt_import

__pdoc__ = {'gui': False, 'api': False, 'core': False,}
__all__ = (
    "Pipe",
    "get_pipes",
    "get_connector",
    "Plugin",
    "Venv",
    "Plugin",
    "pprint",
    "attempt_import",
)

Sub-modules

meerschaum.actions

Default actions available to the mrsm CLI.

meerschaum.config

Meerschaum v1.6.13

meerschaum.connectors

Create connectors with get_connector(). For ease of use, you can also import from the root meerschaum module:

```python-repl
>>> from …
meerschaum.plugins

Expose plugin management APIs from the meerschaum.plugins module.

meerschaum.utils

The utils module contains utility functions. These include tools from primary utilities (get_pipes) to miscellaneous helper functions.

Functions

def attempt_import(*names: List[str], lazy: bool = True, warn: bool = True, install: bool = True, venv: Optional[str] = 'mrsm', precheck: bool = True, split: bool = True, check_update: bool = False, check_pypi: bool = False, check_is_installed: bool = True, color: bool = True, debug: bool = False) ‑> Union['ModuleType', Tuple['ModuleType']]

Raise a warning if packages are not installed; otherwise import and return modules. If lazy is True, return lazy-imported modules.

Returns tuple of modules if multiple names are provided, else returns one module.

Parameters

names : List[str]
The packages to be imported.
lazy : bool, default True
If True, lazily load packages.
warn : bool, default True
If True, raise a warning if a package cannot be imported.
install : bool, default True
If True, attempt to install a missing package into the designated virtual environment. If check_update is True, install updates if available.
venv : Optional[str], default 'mrsm'
The virtual environment in which to search for packages and to install packages into.
precheck : bool, default True
If True, attempt to find module before importing (necessary for checking if modules exist and retaining lazy imports), otherwise assume lazy is False.
split : bool, default True
If True, split packages' names on '.'.
check_update : bool, default False
If True and install is True, install updates if the required minimum version does not match.
check_pypi : bool, default False
If True and check_update is True, check PyPI when determining whether an update is required.
check_is_installed : bool, default True
If True, check if the package is contained in the virtual environment.

Returns

The specified modules. If they're not available and install is True, it will first download them into a virtual environment and return the modules.

Examples

>>> pandas, sqlalchemy = attempt_import('pandas', 'sqlalchemy')
>>> pandas = attempt_import('pandas')
Expand source code
def attempt_import(
        *names: List[str],
        lazy: bool = True,
        warn: bool = True,
        install: bool = True,
        venv: Optional[str] = 'mrsm',
        precheck: bool = True,
        split: bool = True,
        check_update: bool = False,
        check_pypi: bool = False,
        check_is_installed: bool = True,
        color: bool = True,
        debug: bool = False
    ) -> Union['ModuleType', Tuple['ModuleType']]:
    """
    Raise a warning if packages are not installed; otherwise import and return modules.
    If `lazy` is `True`, return lazy-imported modules.
    
    Returns tuple of modules if multiple names are provided, else returns one module.
    
    Parameters
    ----------
    names: List[str]
        The packages to be imported.

    lazy: bool, default True
        If `True`, lazily load packages.

    warn: bool, default True
        If `True`, raise a warning if a package cannot be imported.

    install: bool, default True
        If `True`, attempt to install a missing package into the designated virtual environment.
        If `check_update` is True, install updates if available.

    venv: Optional[str], default 'mrsm'
        The virtual environment in which to search for packages and to install packages into.

    precheck: bool, default True
        If `True`, attempt to find module before importing (necessary for checking if modules exist
        and retaining lazy imports), otherwise assume lazy is `False`.

    split: bool, default True
        If `True`, split packages' names on `'.'`.

    check_update: bool, default False
        If `True` and `install` is `True`, install updates if the required minimum version
        does not match.

    check_pypi: bool, default False
        If `True` and `check_update` is `True`, check PyPI when determining whether
        an update is required.

    check_is_installed: bool, default True
        If `True`, check if the package is contained in the virtual environment.

    Returns
    -------
    The specified modules. If they're not available and `install` is `True`, it will first
    download them into a virtual environment and return the modules.

    Examples
    --------
    >>> pandas, sqlalchemy = attempt_import('pandas', 'sqlalchemy')
    >>> pandas = attempt_import('pandas')

    """

    import importlib.util

    ### to prevent recursion, check if parent Meerschaum package is being imported
    if names == ('meerschaum',):
        return _import_module('meerschaum')

    if venv == 'mrsm' and _import_hook_venv is not None:
        if debug:
            print(f"Import hook for virtual environment '{_import_hook_venv}' is active.")
        venv = _import_hook_venv

    _warnings = _import_module('meerschaum.utils.warnings')
    warn_function = _warnings.warn

    def do_import(_name: str, **kw) -> Union['ModuleType', None]:
        with Venv(venv=venv, debug=debug):
            ### determine the import method (lazy vs normal)
            from meerschaum.utils.misc import filter_keywords
            import_method = (
                _import_module if not lazy
                else lazy_import
            )
            try:
                mod = import_method(_name, **(filter_keywords(import_method, **kw)))
            except Exception as e:
                if warn:
                    import traceback
                    traceback.print_exception(type(e), e, e.__traceback__)
                    warn_function(
                        f"Failed to import module '{_name}'.\nException:\n{e}",
                        ImportWarning,
                        stacklevel = (5 if lazy else 4),
                        color = False,
                    )
                mod = None
        return mod

    modules = []
    for name in names:
        ### Check if package is a declared dependency.
        root_name = name.split('.')[0] if split else name
        install_name = _import_to_install_name(root_name)

        if install_name is None:
            install_name = root_name
            if warn and root_name != 'plugins':
                warn_function(
                    f"Package '{root_name}' is not declared in meerschaum.utils.packages.",
                    ImportWarning,
                    stacklevel = 3,
                    color = False
                )

        ### Determine if the package exists.
        if precheck is False:
            found_module = (
                do_import(
                    name, debug=debug, warn=False, venv=venv, color=color,
                    check_update=False, check_pypi=False, split=split,
                ) is not None
            )
        else:
            if check_is_installed:
                with _locks['_is_installed_first_check']:
                    if not _is_installed_first_check.get(name, False):
                        package_is_installed = is_installed(
                            name,
                            venv = venv,
                            split = split,
                            debug = debug,
                        )
                        _is_installed_first_check[name] = package_is_installed
                    else:
                        package_is_installed = _is_installed_first_check[name]
            else:
                package_is_installed = _is_installed_first_check.get(
                    name,
                    venv_contains_package(name, venv=venv, split=split, debug=debug)
                )
            found_module = package_is_installed

        if not found_module:
            if install:
                if not pip_install(
                    install_name,
                    venv = venv,
                    split = False,
                    check_update = check_update,
                    color = color,
                    debug = debug
                ) and warn:
                    warn_function(
                        f"Failed to install '{install_name}'.",
                        ImportWarning,
                        stacklevel = 3,
                        color = False,
                    )
            elif warn:
                ### Raise a warning if we can't find the package and install = False.
                warn_function(
                    (f"\n\nMissing package '{name}' from virtual environment '{venv}'; "
                     + "some features will not work correctly."
                     + f"\n\nSet install=True when calling attempt_import.\n"),
                    ImportWarning,
                    stacklevel = 3,
                    color = False,
                )

        ### Do the import. Will be lazy if lazy=True.
        m = do_import(
            name, debug=debug, warn=warn, venv=venv, color=color,
            check_update=check_update, check_pypi=check_pypi, install=install, split=split,
        )
        modules.append(m)

    modules = tuple(modules)
    if len(modules) == 1:
        return modules[0]
    return modules
def get_connector(type: str = None, label: str = None, refresh: bool = False, debug: bool = False, **kw: Any) ‑> meerschaum.connectors.Connector.Connector

Return existing connector or create new connection and store for reuse.

You can create new connectors if enough parameters are provided for the given type and flavor.

Parameters

type : Optional[str], default None
Connector type (sql, api, etc.). Defaults to the type of the configured instance_connector.
label : Optional[str], default None
Connector label (e.g. main). Defaults to 'main'.
refresh : bool, default False
Refresh the Connector instance / construct new object. Defaults to False.
kw : Any
Other arguments to pass to the Connector constructor. If the Connector has already been constructed and new arguments are provided, refresh is set to True and the old Connector is replaced.

Returns

A new Meerschaum connector (e.g. meerschaum.connectors.api.APIConnector, meerschaum.connectors.sql.SQLConnector).

Examples

The following parameters would create a new meerschaum.connectors.sql.SQLConnector that isn't in the configuration file.

>>> conn = get_connector(
...     type = 'sql',
...     label = 'newlabel',
...     flavor = 'sqlite',
...     database = '/file/path/to/database.db'
... )
>>>
Expand source code
def get_connector(
        type: str = None,
        label: str = None,
        refresh: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Connector:
    """
    Return existing connector or create new connection and store for reuse.
    
    You can create new connectors if enough parameters are provided for the given type and flavor.
    

    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct new object. Defaults to `False`.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`).
    
    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```

    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)
    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _loaded_plugin_connectors = True
    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided by the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"

        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        with _locks['types']:
            types.update({
                'api'   : APIConnector,
                'sql'   : SQLConnector,
                'plugin': PluginConnector,
            })
    
    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    import inspect
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector " +
                            f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n" +
                        f"  - Keyword value: '{value}'\n" +
                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else: ### connector doesn't yet exist
            refresh = True

    ### only create an object if refresh is True
    ### (can be manually specified, otherwise determined above)
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack = False,
                )
                conn = None
            except Exception as e:
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack = False,
                )
                conn = None
        if conn is None:
            return None

    return connectors[type][label]
def get_pipes(connector_keys: Union[str, List[str], None] = None, metric_keys: Union[str, List[str], None] = None, location_keys: Union[str, List[str], None] = None, tags: Optional[List[str], None] = None, params: Optional[Dict[str, Any]] = None, mrsm_instance: Union[str, InstanceConnector, None] = None, instance: Union[str, InstanceConnector, None] = None, as_list: bool = False, method: str = 'registered', wait: bool = False, debug: bool = False, **kw: Any) ‑> Union[PipesDict, List['Pipe']]

Return a dictionary or list of Pipe objects.

Parameters

connector_keys : Union[str, List[str], None], default None
String or list of connector keys. If omitted or is '*', fetch all possible keys. If a string begins with '_', select keys that do NOT match the string.
metric_keys : Union[str, List[str], None], default None
String or list of metric keys. See connector_keys for formatting.
location_keys : Union[str, List[str], None], default None
String or list of location keys. See connector_keys for formatting.
tags : Optional[List[str]], default None
If provided, only include pipes with these tags.
params : Optional[Dict[str, Any]], default None
Dictionary of additional parameters to search by. Params are parsed into a SQL WHERE clause. E.g. {'a': 1, 'b': 2} equates to 'WHERE a = 1 AND b = 2'
mrsm_instance : Union[str, InstanceConnector, None], default None
Connector keys for the Meerschaum instance of the pipes. Must be a SQLConnector or APIConnector.
as_list : bool, default False
If True, return pipes in a list instead of a hierarchical dictionary. False : {connector_keys: {metric_key: {location_key: Pipe}}} True : [Pipe]
method : str, default 'registered'
Available options: ['registered', 'explicit', 'all'] If 'registered' (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depends on mrsm_instance). If 'explicit', create pipes from provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If 'all', create pipes from predefined metrics and locations. Required connector_keys. NOTE: Method 'all' is not implemented!
wait : bool, default False
Wait for a connection before getting Pipes. Should only be true for cases where the database might not be running (like the API).
**kw : Any:
Keyword arguments to pass to the Pipe constructor.

Returns

A dictionary of dictionaries and Pipe objects in the connector, metric, location hierarchy. If as_list is True, return a list of Pipe objects.

Examples

>>> ### Manual definition:
>>> pipes = {
...     <connector_keys>: {
...         <metric_key>: {
...             <location_key>: Pipe(
...                 <connector_keys>,
...                 <metric_key>,
...                 <location_key>,
...             ),
...         },
...     },
... },
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[sql_main_weather]
>>> 
Expand source code
def get_pipes(
        connector_keys: Union[str, List[str], None] = None,
        metric_keys: Union[str, List[str], None] = None,
        location_keys: Union[str, List[str], None] = None,
        tags: Optional[List[str], None] = None,
        params: Optional[Dict[str, Any]] = None,
        mrsm_instance: Union[str, InstanceConnector, None] = None,
        instance: Union[str, InstanceConnector, None] = None,
        as_list: bool = False,
        method: str = 'registered',
        wait: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Union[PipesDict, List['meerschaum.Pipe']]:
    """
    Return a dictionary or list of `meerschaum.Pipe` objects.

    Parameters
    ----------
    connector_keys: Union[str, List[str], None], default None
        String or list of connector keys.
        If omitted or is `'*'`, fetch all possible keys.
        If a string begins with `'_'`, select keys that do NOT match the string.

    metric_keys: Union[str, List[str], None], default None
        String or list of metric keys. See `connector_keys` for formatting.

    location_keys: Union[str, List[str], None], default None
        String or list of location keys. See `connector_keys` for formatting.

    tags: Optional[List[str]], default None
         If provided, only include pipes with these tags.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        Params are parsed into a SQL WHERE clause.
        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`

    mrsm_instance: Union[str, InstanceConnector, None], default None
        Connector keys for the Meerschaum instance of the pipes.
        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
        `meerschaum.connectors.api.APIConnector.APIConnector`.
        
    as_list: bool, default False
        If `True`, return pipes in a list instead of a hierarchical dictionary.
        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
        `True`  : `[Pipe]`

    method: str, default 'registered'
        Available options: `['registered', 'explicit', 'all']`
        If `'registered'` (default), create pipes based on registered keys in the connector's pipes table
        (API or SQL connector, depends on mrsm_instance).
        If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys
        instead of consulting the pipes table. Useful for creating non-existent pipes.
        If `'all'`, create pipes from predefined metrics and locations. Required `connector_keys`.
        **NOTE:** Method `'all'` is not implemented!

    wait: bool, default False
        Wait for a connection before getting Pipes. Should only be true for cases where the
        database might not be running (like the API).

    **kw: Any:
        Keyword arguments to pass to the `meerschaum.Pipe` constructor.
        

    Returns
    -------
    A dictionary of dictionaries and `meerschaum.Pipe` objects
    in the connector, metric, location hierarchy.
    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.

    Examples
    --------
    ```
    >>> ### Manual definition:
    >>> pipes = {
    ...     <connector_keys>: {
    ...         <metric_key>: {
    ...             <location_key>: Pipe(
    ...                 <connector_keys>,
    ...                 <metric_key>,
    ...                 <location_key>,
    ...             ),
    ...         },
    ...     },
    ... },
    >>> ### Accessing a single pipe:
    >>> pipes['sql:main']['weather'][None]
    >>> ### Return a list instead:
    >>> get_pipes(as_list=True)
    [sql_main_weather]
    >>> 
    ```
    """

    from meerschaum.config import get_config
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import filter_keywords

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    if params is None:
        params = {}
    if tags is None:
        tags = []

    if isinstance(connector_keys, str):
        connector_keys = [connector_keys]
    if isinstance(metric_keys, str):
        metric_keys = [metric_keys]
    if isinstance(location_keys, str):
        location_keys = [location_keys]

    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
    ### If `wait`, wait until a connection is made
    if mrsm_instance is None:
        mrsm_instance = instance
    if mrsm_instance is None:
        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
    if isinstance(mrsm_instance, str):
        from meerschaum.connectors.parse import parse_instance_keys
        connector = parse_instance_keys(keys=mrsm_instance, wait=wait, debug=debug)
    else: ### NOTE: mrsm_instance MUST be a SQL or API connector for this to work
        from meerschaum.connectors import instance_types
        valid_connector = False
        if hasattr(mrsm_instance, 'type'):
            if mrsm_instance.type in instance_types:
                valid_connector = True
        if not valid_connector:
            error(f"Invalid instance connector: {mrsm_instance}")
        connector = mrsm_instance
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Using instance connector: {connector}")
    if not connector:
        error(f"Could not create connector from keys: '{mrsm_instance}'")

    ### Get a list of tuples for the keys needed to build pipes.
    result = fetch_pipes_keys(
        method,
        connector,
        connector_keys = connector_keys,
        metric_keys = metric_keys,
        location_keys = location_keys,
        tags = tags,
        params = params,
        debug = debug
    )
    if result is None:
        error(f"Unable to build pipes!")

    ### Populate the `pipes` dictionary with Pipes based on the keys
    ### obtained from the chosen `method`.
    from meerschaum import Pipe
    pipes = {}
    for ck, mk, lk in result:
        if ck not in pipes:
            pipes[ck] = {}

        if mk not in pipes[ck]:
            pipes[ck][mk] = {}

        pipes[ck][mk][lk] = Pipe(
            ck, mk, lk,
            mrsm_instance=connector,
            debug=debug,
            **filter_keywords(Pipe, **kw)
        )

    if not as_list:
        return pipes
    from meerschaum.utils.misc import flatten_pipes_dict
    return flatten_pipes_dict(pipes)
def pprint(*args, detect_password: bool = True, nopretty: bool = False, **kw)

Pretty print an object according to the configured ANSI and UNICODE settings. If detect_password is True (default), search and replace passwords with '*' characters. Does not mutate objects.

Parameters

*args :

detect_password : bool :
(Default value = True)
nopretty : bool :
(Default value = False)

**kw :

Returns

Expand source code
def pprint(
        *args,
        detect_password : bool = True,
        nopretty : bool = False,
        **kw
    ):
    """Pretty print an object according to the configured ANSI and UNICODE settings.
    If detect_password is True (default), search and replace passwords with '*' characters.
    Does not mutate objects.

    Parameters
    ----------
    *args :
        
    detect_password : bool :
         (Default value = True)
    nopretty : bool :
         (Default value = False)
    **kw :
        

    Returns
    -------

    """
    from meerschaum.utils.packages import attempt_import, import_rich
    from meerschaum.utils.formatting import ANSI, UNICODE, get_console
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import replace_password, dict_from_od, filter_keywords
    from collections import OrderedDict
    import copy, json
    modify = True
    rich_pprint = None
    if ANSI and not nopretty:
        rich = import_rich()
        if rich is not None:
            rich_pretty = attempt_import('rich.pretty')
        if rich_pretty is not None:
            def _rich_pprint(*args, **kw):
                _console = get_console()
                _kw = filter_keywords(_console.print, **kw)
                _console.print(*args, **_kw)
            rich_pprint = _rich_pprint
    elif not nopretty:
        pprintpp = attempt_import('pprintpp', warn=False)
        try:
            _pprint = pprintpp.pprint
        except Exception as e:
            import pprint as _pprint_module
            _pprint = _pprint_module.pprint

    func = (
        _pprint if rich_pprint is None else rich_pprint
    ) if not nopretty else print

    try:
        args_copy = copy.deepcopy(args)
    except Exception as e:
        args_copy = args
        modify = False
    _args = []
    for a in args:
        c = a
        ### convert OrderedDict into dict
        if isinstance(a, OrderedDict) or issubclass(type(a), OrderedDict):
            c = dict_from_od(copy.deepcopy(c))
        _args.append(c)
    args = _args

    _args = list(args)
    if detect_password and modify:
        _args = []
        for a in args:
            c = a
            if isinstance(c, dict):
                c = replace_password(copy.deepcopy(c))
            if nopretty:
                try:
                    c = json.dumps(c)
                    is_json = True
                except Exception as e:
                    is_json = False
                if not is_json:
                    try:
                        c = str(c)
                    except Exception as e:
                        pass
            _args.append(c)

    ### filter out unsupported keywords
    func_kw = filter_keywords(func, **kw) if not nopretty else {}
    error_msg = None
    try:
        func(*_args, **func_kw)
    except Exception as e:
        error_msg = e
    if error_msg is not None:
        error(error_msg)

Classes

class Pipe (connector: str = '', metric: str = '', location: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, columns: Optional[Dict[str, str]] = None, tags: Optional[List[str]] = None, target: Optional[str] = None, dtypes: Optional[Dict[str, str]] = None, instance: Optional[Union[str, InstanceConnector]] = None, temporary: bool = False, mrsm_instance: Optional[Union[str, InstanceConnector]] = None, cache: bool = False, debug: bool = False, connector_keys: Optional[str] = None, metric_key: Optional[str] = None, location_key: Optional[str] = None)

Access Meerschaum pipes via Pipe objects.

Pipes are identified by the following:

  1. Connector keys (e.g. 'sql:main')
  2. Metric key (e.g. 'weather')
  3. Location (optional; e.g. None)

A pipe's connector keys correspond to a data source, and when the pipe is synced, its fetch definition is evaluated and executed to produce new data.

Alternatively, new data may be directly synced via pipe.sync():

>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)

Parameters

connector : str
Keys for the pipe's source connector, e.g. 'sql:main'.
metric : str
Label for the pipe's contents, e.g. 'weather'.
location : str, default None
Label for the pipe's location. Defaults to None.
parameters : Optional[Dict[str, Any]], default None
Optionally set a pipe's parameters from the constructor, e.g. columns and other attributes. You can edit these parameters with edit pipes.
columns : Optional[Dict[str, str]], default None
Set the columns dictionary of parameters. If parameters is also provided, this dictionary is added under the 'columns' key.
tags : Optional[List[str]], default None
A list of strings to be added under the 'tags' key of parameters. You can select pipes with certain tags using --tags.
dtypes : Optional[Dict[str, str]], default None
Set the dtypes dictionary of parameters. If parameters is also provided, this dictionary is added under the 'dtypes' key.
mrsm_instance : Optional[Union[str, InstanceConnector]], default None
Connector for the Meerschaum instance where the pipe resides. Defaults to the preconfigured default instance ('sql:main').
instance : Optional[Union[str, InstanceConnector]], default None
Alias for mrsm_instance. If mrsm_instance is supplied, this value is ignored.
temporary : bool, default False
If True, prevent instance tables (pipes, users, plugins) from being created.
cache : bool, default False
If True, cache fetched data into a local database file. Defaults to False.
Expand source code
class Pipe:
    """
    Access Meerschaum pipes via Pipe objects.
    
    Pipes are identified by the following:

    1. Connector keys (e.g. `'sql:main'`)
    2. Metric key (e.g. `'weather'`)
    3. Location (optional; e.g. `None`)
    
    A pipe's connector keys correspond to a data source, and when the pipe is synced,
    its `fetch` definition is evaluated and executed to produce new data.
    
    Alternatively, new data may be directly synced via `pipe.sync()`:
    
    ```
    >>> from meerschaum import Pipe
    >>> pipe = Pipe('csv', 'weather')
    >>>
    >>> import pandas as pd
    >>> df = pd.read_csv('weather.csv')
    >>> pipe.sync(df)
    ```
    """

    from ._fetch import fetch
    from ._data import get_data, get_backtrack_data, get_rowcount, _get_data_as_iterator
    from ._register import register
    from ._attributes import (
        attributes,
        parameters,
        columns,
        dtypes,
        get_columns,
        get_columns_types,
        get_indices,
        tags,
        get_id,
        id,
        get_val_column,
        parents,
        children,
        target,
        _target_legacy,
        guess_datetime,
    )
    from ._show import show
    from ._edit import edit, edit_definition, update
    from ._sync import sync, get_sync_time, exists, filter_existing, _get_chunk_label
    from ._delete import delete
    from ._drop import drop
    from ._clear import clear
    from ._bootstrap import bootstrap
    from ._dtypes import enforce_dtypes, infer_dtypes

    def __init__(
        self,
        connector: str = '',
        metric: str = '',
        location: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        columns: Optional[Dict[str, str]] = None,
        tags: Optional[List[str]] = None,
        target: Optional[str] = None,
        dtypes: Optional[Dict[str, str]] = None,
        instance: Optional[Union[str, InstanceConnector]] = None,
        temporary: bool = False,
        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
        cache: bool = False,
        debug: bool = False,
        connector_keys: Optional[str] = None,
        metric_key: Optional[str] = None,
        location_key: Optional[str] = None,
    ):
        """
        Parameters
        ----------
        connector: str
            Keys for the pipe's source connector, e.g. `'sql:main'`.

        metric: str
            Label for the pipe's contents, e.g. `'weather'`.

        location: str, default None
            Label for the pipe's location. Defaults to `None`.

        parameters: Optional[Dict[str, Any]], default None
            Optionally set a pipe's parameters from the constructor,
            e.g. columns and other attributes.
            You can edit these parameters with `edit pipes`.

        columns: Optional[Dict[str, str]], default None
            Set the `columns` dictionary of `parameters`.
            If `parameters` is also provided, this dictionary is added under the `'columns'` key.

        tags: Optional[List[str]], default None
            A list of strings to be added under the `'tags'` key of `parameters`.
            You can select pipes with certain tags using `--tags`.

        dtypes: Optional[Dict[str, str]], default None
            Set the `dtypes` dictionary of `parameters`.
            If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.

        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
            Connector for the Meerschaum instance where the pipe resides.
            Defaults to the preconfigured default instance (`'sql:main'`).

        instance: Optional[Union[str, InstanceConnector]], default None
            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.

        temporary: bool, default False
            If `True`, prevent instance tables (pipes, users, plugins) from being created.

        cache: bool, default False
            If `True`, cache fetched data into a local database file.
            Defaults to `False`.
        """
        from meerschaum.utils.warnings import error, warn
        if (not connector and not connector_keys) or (not metric and not metric_key):
            error(
                "Please provide strings for the connector and metric\n    "
                + "(first two positional arguments)."
            )

        ### Fall back to legacy `location_key` just in case.
        if not location:
            location = location_key

        if not connector:
            connector = connector_keys

        if not metric:
            metric = metric_key

        if location in ('[None]', 'None'):
            location = None

        from meerschaum.config.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
        for k in (connector, metric, location, *(tags or [])):
            if str(k).startswith(negation_prefix):
                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")

        self.connector_keys = str(connector)
        self.connector_key = self.connector_keys ### Alias
        self.metric_key = metric
        self.location_key = location
        self.temporary = temporary

        self._attributes = {
            'connector_keys': self.connector_keys,
            'metric_key': self.metric_key,
            'location_key': self.location_key,
            'parameters': {},
        }

        ### only set parameters if values are provided
        if isinstance(parameters, dict):
            self._attributes['parameters'] = parameters
        else:
            if parameters is not None:
                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
            self._attributes['parameters'] = {}

        if isinstance(columns, dict):
            self._attributes['parameters']['columns'] = columns
        elif columns is not None:
            warn(f"The provided columns are of invalid type '{type(columns)}'.")

        if isinstance(tags, (list, tuple)):
            self._attributes['parameters']['tags'] = tags
        elif tags is not None:
            warn(f"The provided tags are of invalid type '{type(tags)}'.")

        if isinstance(target, str):
            self._attributes['parameters']['target'] = target
        elif target is not None:
            warn(f"The provided target is of invalid type '{type(target)}'.")

        if isinstance(dtypes, dict):
            self._attributes['parameters']['dtypes'] = dtypes
        elif dtypes is not None:
            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")

        ### NOTE: The parameters dictionary is {} by default.
        ###       A Pipe may be registered without parameters, then edited,
        ###       or a Pipe may be registered with parameters set in-memory first.
        #  from meerschaum.config import get_config
        _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
        if _mrsm_instance is None:
            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)

        if not isinstance(_mrsm_instance, str):
            self._instance_connector = _mrsm_instance
            self.instance_keys = str(_mrsm_instance)
        else: ### NOTE: must be SQL or API Connector for this work
            self.instance_keys = _mrsm_instance

        self._cache = cache and get_config('system', 'experimental', 'cache')


    @property
    def meta(self):
        """Simulate the MetaPipe model without importing FastAPI."""
        if '_meta' not in self.__dict__:
            self._meta = {
                'connector_keys' : self.connector_keys,
                'metric_key'     : self.metric_key,
                'location_key'   : self.location_key,
                'instance'       : self.instance_keys,
            }
        return self._meta


    @property
    def instance_connector(self) -> Union[InstanceConnector, None]:
        """
        The connector to where this pipe resides.
        May either be of type `meerschaum.connectors.sql.SQLConnector` or
        `meerschaum.connectors.api.APIConnector`.
        """
        if '_instance_connector' not in self.__dict__:
            from meerschaum.connectors.parse import parse_instance_keys
            conn = parse_instance_keys(self.instance_keys)
            if conn:
                self._instance_connector = conn
            else:
                return None
        return self._instance_connector

    @property
    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
        """
        The connector to the data source.
        """
        if '_connector' not in self.__dict__:
            from meerschaum.connectors.parse import parse_instance_keys
            import warnings
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                try:
                    conn = parse_instance_keys(self.connector_keys)
                except Exception as e:
                    conn = None
            if conn:
                self._connector = conn
            else:
                return None
        return self._connector


    @property
    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
        """
        If the pipe was created with `cache=True`, return the connector to the pipe's
        SQLite database for caching.
        """
        if not self._cache:
            return None

        if '_cache_connector' not in self.__dict__:
            from meerschaum.connectors import get_connector
            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
            _resources_path = SQLITE_RESOURCES_PATH
            self._cache_connector = get_connector(
                'sql', '_cache_' + str(self),
                flavor='sqlite',
                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
            )

        return self._cache_connector


    @property
    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
        """
        If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
        manage the local data.
        """
        if self.cache_connector is None:
            return None
        if '_cache_pipe' not in self.__dict__:
            from meerschaum.config._patch import apply_patch_to_config
            from meerschaum.utils.sql import sql_item_name
            _parameters = copy.deepcopy(self.parameters)
            _fetch_patch = {
                'fetch': ({
                    'definition': (
                        f"SELECT * "
                        + f"FROM {sql_item_name(str(self.target), self.instance_connector.flavor)}"
                    ),
                }) if self.instance_connector.type == 'sql' else ({
                    'connector_keys': self.connector_keys,
                    'metric_key': self.metric_key,
                    'location_key': self.location_key,
                })
            }
            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
            self._cache_pipe = Pipe(
                self.instance_keys,
                (self.connector_keys + '_' + self.metric_key + '_cache'),
                self.location_key,
                mrsm_instance = self.cache_connector,
                parameters = _parameters,
                cache = False,
                temporary = True,
            )

        return self._cache_pipe


    @property
    def sync_time(self) -> Union['datetime.datetime', None]:
        """
        Convenience function to get the pipe's latest datetime.
        Use `meerschaum.Pipe.get_sync_time()` instead.
        """
        return self.get_sync_time()

    def __str__(self, ansi: bool=False):
        return pipe_repr(self, ansi=ansi)


    def __eq__(self, other):
        try:
            return (
                isinstance(self, type(other))
                and self.connector_keys == other.connector_keys
                and self.metric_key == other.metric_key
                and self.location_key == other.location_key
                and self.instance_keys == other.instance_keys
            )
        except Exception as e:
            return False

    def __hash__(self):
        ### Using an esoteric separator to avoid collisions.
        sep = "[\"']"
        return hash(
            str(self.connector_keys) + sep
            + str(self.metric_key) + sep
            + str(self.location_key) + sep
            + str(self.instance_keys) + sep
        )

    def __repr__(self, **kw) -> str:
        return pipe_repr(self, **kw)

    def __getstate__(self) -> Dict[str, Any]:
        """
        Define the state dictionary (pickling).
        """
        return {
            'connector_keys': self.connector_keys,
            'metric_key': self.metric_key,
            'location_key': self.location_key,
            'parameters': self.parameters,
            'mrsm_instance': self.instance_keys,
        }

    def __setstate__(self, _state: Dict[str, Any]):
        """
        Read the state (unpickling).
        """
        connector_keys = _state.pop('connector_keys')
        metric_key = _state.pop('metric_key')
        location_key = _state.pop('location_key')
        self.__init__(connector_keys, metric_key, location_key, **_state)

Instance variables

var attributes : Dict[str, Any]

Return a dictionary of a pipe's keys and parameters. These values are reflected directly from the pipes table of the instance.

Expand source code
@property
def attributes(self) -> Dict[str, Any]:
    """
    Return a dictionary of a pipe's keys and parameters.
    These values are reflected directly from the pipes table of the instance.
    """
    import time
    from meerschaum.config import get_config
    from meerschaum.config._patch import apply_patch_to_config
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    timeout_seconds = get_config('pipes', 'attributes', 'local_cache_timeout_seconds')

    if '_attributes' not in self.__dict__:
        self._attributes = {}

    now = time.perf_counter()
    last_refresh = self.__dict__.get('_attributes_sync_time', None)
    timed_out = (
        last_refresh is None
        or
        (timeout_seconds is not None and (now - last_refresh) >= timeout_seconds)
    )
    if not self.temporary and timed_out:
        self._attributes_sync_time = now
        local_attributes = self.__dict__.get('_attributes', {})
        with Venv(get_connector_plugin(self.instance_connector)):
            instance_attributes = self.instance_connector.get_pipe_attributes(self)
        self._attributes = apply_patch_to_config(instance_attributes, local_attributes)
    return self._attributes
var cache_connector : Union[meerschaum.connectors.sql.SQLConnector, None]

If the pipe was created with cache=True, return the connector to the pipe's SQLite database for caching.

Expand source code
@property
def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
    """
    If the pipe was created with `cache=True`, return the connector to the pipe's
    SQLite database for caching.
    """
    if not self._cache:
        return None

    if '_cache_connector' not in self.__dict__:
        from meerschaum.connectors import get_connector
        from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
        _resources_path = SQLITE_RESOURCES_PATH
        self._cache_connector = get_connector(
            'sql', '_cache_' + str(self),
            flavor='sqlite',
            database=str(_resources_path / ('_cache_' + str(self) + '.db')),
        )

    return self._cache_connector
var cache_pipe : Union['Pipe', None]

If the pipe was created with cache=True, return another Pipe used to manage the local data.

Expand source code
@property
def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
    """
    If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
    manage the local data.
    """
    if self.cache_connector is None:
        return None
    if '_cache_pipe' not in self.__dict__:
        from meerschaum.config._patch import apply_patch_to_config
        from meerschaum.utils.sql import sql_item_name
        _parameters = copy.deepcopy(self.parameters)
        _fetch_patch = {
            'fetch': ({
                'definition': (
                    f"SELECT * "
                    + f"FROM {sql_item_name(str(self.target), self.instance_connector.flavor)}"
                ),
            }) if self.instance_connector.type == 'sql' else ({
                'connector_keys': self.connector_keys,
                'metric_key': self.metric_key,
                'location_key': self.location_key,
            })
        }
        _parameters = apply_patch_to_config(_parameters, _fetch_patch)
        self._cache_pipe = Pipe(
            self.instance_keys,
            (self.connector_keys + '_' + self.metric_key + '_cache'),
            self.location_key,
            mrsm_instance = self.cache_connector,
            parameters = _parameters,
            cache = False,
            temporary = True,
        )

    return self._cache_pipe
var children : List[Pipe]

Return a list of Pipe objects to be designated as children.

Expand source code
@property
def children(self) -> List[meerschaum.Pipe]:
    """
    Return a list of `meerschaum.Pipe` objects to be designated as children.
    """
    if 'children' not in self.parameters:
        return []
    from meerschaum.utils.warnings import warn
    _children_keys = self.parameters['children']
    if not isinstance(_children_keys, list):
        warn(
            f"Please ensure the children for {self} are defined as a list of keys.",
            stacklevel = 4
        )
        return []
    from meerschaum import Pipe
    _children = []
    for keys in _children_keys:
        try:
            p = Pipe(**keys)
        except Exception as e:
            warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}")
            continue
        _children.append(p)
    return _children
var columns : Optional[Dict[str, str]]

Return the columns dictionary defined in Pipe.parameters.

Expand source code
@property
def columns(self) -> Union[Dict[str, str], None]:
    """
    Return the `columns` dictionary defined in `meerschaum.Pipe.parameters`.
    """
    if 'columns' not in self.parameters:
        self.parameters['columns'] = {}
    cols = self.parameters['columns']
    if not isinstance(cols, dict):
        cols = {}
        self.parameters['columns'] = cols
    return cols
var connector : Union[Connector, None]

The connector to the data source.

Expand source code
@property
def connector(self) -> Union[meerschaum.connectors.Connector, None]:
    """
    The connector to the data source.
    """
    if '_connector' not in self.__dict__:
        from meerschaum.connectors.parse import parse_instance_keys
        import warnings
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            try:
                conn = parse_instance_keys(self.connector_keys)
            except Exception as e:
                conn = None
        if conn:
            self._connector = conn
        else:
            return None
    return self._connector
var dtypes : Optional[Dict[str, Any]]

If defined, return the dtypes dictionary defined in Pipe.parameters.

Expand source code
@property
def dtypes(self) -> Union[Dict[str, Any], None]:
    """
    If defined, return the `dtypes` dictionary defined in `meerschaum.Pipe.parameters`.
    """
    from meerschaum.config._patch import apply_patch_to_config
    configured_dtypes = self.parameters.get('dtypes', {})
    remote_dtypes = self.infer_dtypes(persist=False)
    patched_dtypes = apply_patch_to_config(remote_dtypes, configured_dtypes)
    self.parameters['dtypes'] = patched_dtypes
    return self.parameters['dtypes']
var id : Optional[int]

Fetch and cache a pipe's ID.

Expand source code
@property
def id(self) -> Union[int, None]:
    """
    Fetch and cache a pipe's ID.
    """
    if not ('_id' in self.__dict__ and self._id):
        self._id = self.get_id()
    return self._id
var instance_connector : Union[InstanceConnector, None]

The connector to where this pipe resides. May either be of type meerschaum.connectors.sql.SQLConnector or meerschaum.connectors.api.APIConnector.

Expand source code
@property
def instance_connector(self) -> Union[InstanceConnector, None]:
    """
    The connector to where this pipe resides.
    May either be of type `meerschaum.connectors.sql.SQLConnector` or
    `meerschaum.connectors.api.APIConnector`.
    """
    if '_instance_connector' not in self.__dict__:
        from meerschaum.connectors.parse import parse_instance_keys
        conn = parse_instance_keys(self.instance_keys)
        if conn:
            self._instance_connector = conn
        else:
            return None
    return self._instance_connector
var meta

Simulate the MetaPipe model without importing FastAPI.

Expand source code
@property
def meta(self):
    """Simulate the MetaPipe model without importing FastAPI."""
    if '_meta' not in self.__dict__:
        self._meta = {
            'connector_keys' : self.connector_keys,
            'metric_key'     : self.metric_key,
            'location_key'   : self.location_key,
            'instance'       : self.instance_keys,
        }
    return self._meta
var parameters : Optional[Dict[str, Any]]

Return the parameters dictionary of the pipe.

Expand source code
@property
def parameters(self) -> Optional[Dict[str, Any]]:
    """
    Return the parameters dictionary of the pipe.
    """
    if 'parameters' not in self.attributes:
        self.attributes['parameters'] = {}
    return self.attributes['parameters']
var parents : List[Pipe]

Return a list of Pipe objects to be designated as parents.

Expand source code
@property
def parents(self) -> List[meerschaum.Pipe]:
    """
    Return a list of `meerschaum.Pipe` objects to be designated as parents.
    """
    if 'parents' not in self.parameters:
        return []
    from meerschaum.utils.warnings import warn
    _parents_keys = self.parameters['parents']
    if not isinstance(_parents_keys, list):
        warn(
            f"Please ensure the parents for {self} are defined as a list of keys.",
            stacklevel = 4
        )
        return []
    from meerschaum import Pipe
    _parents = []
    for keys in _parents_keys:
        try:
            p = Pipe(**keys)
        except Exception as e:
            warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}")
            continue
        _parents.append(p)
    return _parents
var sync_time : Union['datetime.datetime', None]

Convenience function to get the pipe's latest datetime. Use get_sync_time() instead.

Expand source code
@property
def sync_time(self) -> Union['datetime.datetime', None]:
    """
    Convenience function to get the pipe's latest datetime.
    Use `meerschaum.Pipe.get_sync_time()` instead.
    """
    return self.get_sync_time()
var tags : Optional[List[str]]

If defined, return the tags list defined in Pipe.parameters.

Expand source code
@property
def tags(self) -> Union[List[str], None]:
    """
    If defined, return the `tags` list defined in `meerschaum.Pipe.parameters`.
    """
    if 'tags' not in self.parameters:
        self.parameters['tags'] = []
    return self.parameters['tags']
var target : str

The target table name. You can set the target name under on of the following keys (checked in this order): - target - target_name - target_table - target_table_name

Expand source code
@property
def target(self) -> str:
    """
    The target table name.
    You can set the target name under on of the following keys
    (checked in this order):
      - `target`
      - `target_name`
      - `target_table`
      - `target_table_name`
    """
    if 'target' not in self.parameters:
        target = self._target_legacy()
        potential_keys = ('target_name', 'target_table', 'target_table_name')
        for k in potential_keys:
            if k in self.parameters:
                target = self.parameters[k]
                break

        if self.instance_connector.type == 'sql':
            from meerschaum.utils.sql import truncate_item_name
            truncated_target = truncate_item_name(target, self.instance_connector.flavor)
            if truncated_target != target:
                warn(
                    f"The target '{target}' is too long for '{self.instance_connector.flavor}', "
                    + f"will use {truncated_target} instead."
                )
                target = truncated_target

        self.target = target
    return self.parameters['target']

Methods

def bootstrap(self, debug: bool = False, yes: bool = False, force: bool = False, noask: bool = False, shell: bool = False, **kw) ‑> Tuple[bool, str]

Prompt the user to create a pipe's requirements all from one method. This method shouldn't be used in any automated scripts because it interactively prompts the user and therefore may hang.

Parameters

debug : bool, default False:
Verbosity toggle.
yes : bool, default False:
Print the questions and automatically agree.
force : bool, default False:
Skip the questions and agree anyway.
noask : bool, default False:
Print the questions but go with the default answer.
shell : bool, default False:
Used to determine if we are in the interactive shell.

Returns

A SuccessTuple corresponding to the success of this procedure.

Expand source code
def bootstrap(
        self,
        debug: bool = False,
        yes: bool = False,
        force: bool = False,
        noask: bool = False,
        shell: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Prompt the user to create a pipe's requirements all from one method.
    This method shouldn't be used in any automated scripts because it interactively
    prompts the user and therefore may hang.

    Parameters
    ----------
    debug: bool, default False:
        Verbosity toggle.

    yes: bool, default False:
        Print the questions and automatically agree.

    force: bool, default False:
        Skip the questions and agree anyway.

    noask: bool, default False:
        Print the questions but go with the default answer.

    shell: bool, default False:
        Used to determine if we are in the interactive shell.
        
    Returns
    -------
    A `SuccessTuple` corresponding to the success of this procedure.

    """

    from meerschaum.utils.warnings import warn, info, error
    from meerschaum.utils.prompt import prompt, yes_no
    from meerschaum.utils.formatting import pprint
    from meerschaum.config import get_config
    from meerschaum.utils.formatting._shell import clear_screen
    from meerschaum.utils.formatting import print_tuple
    from meerschaum.actions import actions
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    _clear = get_config('shell', 'clear_screen', patch=True)

    if self.get_id(debug=debug) is not None:
        delete_tuple = self.delete(debug=debug)
        if not delete_tuple[0]:
            return delete_tuple

    if _clear:
        clear_screen(debug=debug)

    _parameters = _get_parameters(self, debug=debug)
    self.parameters = _parameters
    pprint(self.parameters)
    try:
        prompt(
            f"\n    Press [Enter] to register {self} with the above configuration:",
            icon = False
        )
    except KeyboardInterrupt as e:
        return False, f"Aborting bootstrapping {self}."

    with Venv(get_connector_plugin(self.instance_connector)):
        register_tuple = self.instance_connector.register_pipe(self, debug=debug)

    if not register_tuple[0]:
        return register_tuple

    if _clear:
        clear_screen(debug=debug)

    try:
        if yes_no(
            f"Would you like to edit the definition for {self}?", yes=yes, noask=noask
        ):
            edit_tuple = self.edit_definition(debug=debug)
            if not edit_tuple[0]:
                return edit_tuple

        if yes_no(f"Would you like to try syncing {self} now?", yes=yes, noask=noask):
            sync_tuple = actions['sync'](
                ['pipes'],
                connector_keys = [self.connector_keys],
                metric_keys = [self.metric_key],
                location_keys = [self.location_key],
                mrsm_instance = str(self.instance_connector),
                debug = debug,
                shell = shell,
            )
            if not sync_tuple[0]:
                return sync_tuple
    except Exception as e:
        return False, f"Failed to bootstrap {self}:\n" + str(e)

    print_tuple((True, f"Finished bootstrapping {self}!"))
    info(
        f"You can edit this pipe later with `edit pipes` "
        + "or set the definition with `edit pipes definition`.\n"
        + "    To sync data into your pipe, run `sync pipes`."
    )

    return True, "Success"
def clear(self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw: Any) ‑> SuccessTuple

Call the Pipe's instance connector's clear_pipe method.

Parameters

begin : Optional[datetime.datetime], default None:
If provided, only remove rows newer than this datetime value.
end : Optional[datetime.datetime], default None:
If provided, only remove rows older than this datetime column (not including end).
params : Optional[Dict[str, Any]], default None
See build_where().
debug : bool, default False:
Verbositity toggle.

Returns

A SuccessTuple corresponding to whether this procedure completed successfully.

Examples

>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
>>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
>>> 
>>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
>>> pipe.get_data()
          dt
0 2020-01-01
Expand source code
def clear(
        self,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `clear_pipe` method.

    Parameters
    ----------
    begin: Optional[datetime.datetime], default None:
        If provided, only remove rows newer than this datetime value.

    end: Optional[datetime.datetime], default None:
        If provided, only remove rows older than this datetime column (not including end).

    params: Optional[Dict[str, Any]], default None
         See `meerschaum.utils.sql.build_where`.

    debug: bool, default False:
        Verbositity toggle.

    Returns
    -------
    A `SuccessTuple` corresponding to whether this procedure completed successfully.

    Examples
    --------
    >>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
    >>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
    >>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
    >>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
    >>> 
    >>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
    >>> pipe.get_data()
              dt
    0 2020-01-01

    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if self.cache_pipe is not None:
        success, msg = self.cache_pipe.clear(begin=begin, end=end, debug=debug, **kw)
        if not success:
            warn(msg)

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.clear_pipe(
            self,
            begin=begin, end=end, params=params, debug=debug,
            **kw
        )
def delete(self, debug: bool = False, **kw) ‑> Tuple[bool, str]

Call the Pipe's instance connector's delete_pipe() method.

Parameters

debug : bool, default False:
Verbosity toggle.

Returns

A SuccessTuple of success (bool), message (str).

Expand source code
def delete(
        self,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `delete_pipe()` method.

    Parameters
    ----------
    debug : bool, default False:
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (`bool`), message (`str`).

    """
    import os, pathlib
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if self.temporary:
        return (
            False,
            "Cannot delete pipes created with `temporary=True` (read-only). "
            + "You may want to call `pipe.drop()` instead."
        )

    if self.cache_pipe is not None:
        _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw)
        if not _drop_cache_tuple[0]:
            warn(_drop_cache_tuple[1])
        if getattr(self.cache_connector, 'flavor', None) == 'sqlite':
            _cache_db_path = pathlib.Path(self.cache_connector.database)
            try:
                os.remove(_cache_db_path)
            except Exception as e:
                warn(f"Could not delete cache file '{_cache_db_path}' for {self}:\n{e}")

    with Venv(get_connector_plugin(self.instance_connector)):
        result = self.instance_connector.delete_pipe(self, debug=debug, **kw)

    if not isinstance(result, tuple):
        return False, f"Received unexpected result from '{self.instance_connector}': {result}"
    if result[0]:
        to_delete = ['_id']
        for member in to_delete:
            if member in self.__dict__:
                del self.__dict__[member]
    return result
def drop(self, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]

Call the Pipe's instance connector's drop_pipe() method

Parameters

debug : bool, default False:
Verbosity toggle.

Returns

A SuccessTuple of success, message.

Expand source code
def drop(
        self,
        debug: bool = False,
        **kw : Any
    ) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `drop_pipe()` method

    Parameters
    ----------
    debug: bool, default False:
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    self._exists = False
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if self.cache_pipe is not None:
        _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw)
        if not _drop_cache_tuple[0]:
            warn(_drop_cache_tuple[1])

    with Venv(get_connector_plugin(self.instance_connector)):
        result = self.instance_connector.drop_pipe(self, debug=debug, **kw)
    return result
def edit(self, patch: bool = False, interactive: bool = False, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]

Edit a Pipe's configuration.

Parameters

patch : bool, default False
If patch is True, update parameters by cascading rather than overwriting.
interactive : bool, default False
If True, open an editor for the user to make changes to the pipe's YAML file.
debug : bool, default False
Verbosity toggle.

Returns

A SuccessTuple of success, message.

Expand source code
def edit(
        self,
        patch: bool = False,
        interactive: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Edit a Pipe's configuration.

    Parameters
    ----------
    patch: bool, default False
        If `patch` is True, update parameters by cascading rather than overwriting.
    interactive: bool, default False
        If `True`, open an editor for the user to make changes to the pipe's YAML file.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if self.temporary:
        return False, "Cannot edit pipes created with `temporary=True` (read-only)."

    if not interactive:
        with Venv(get_connector_plugin(self.instance_connector)):
            return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
    from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
    from meerschaum.utils.misc import edit_file
    parameters_filename = str(self) + '.yaml'
    parameters_path = PIPES_CACHE_RESOURCES_PATH / parameters_filename
    
    from meerschaum.utils.yaml import yaml

    edit_text = f"Edit the parameters for {self}"
    edit_top = '#' * (len(edit_text) + 4)
    edit_header = edit_top + f'\n# {edit_text} #\n' + edit_top + '\n\n'

    from meerschaum.config import get_config
    parameters = dict(get_config('pipes', 'parameters', patch=True))
    from meerschaum.config._patch import apply_patch_to_config
    parameters = apply_patch_to_config(parameters, self.parameters)

    ### write parameters to yaml file
    with open(parameters_path, 'w+') as f:
        f.write(edit_header)
        yaml.dump(parameters, stream=f, sort_keys=False)

    ### only quit editing if yaml is valid
    editing = True
    while editing:
        edit_file(parameters_path)
        try:
            with open(parameters_path, 'r') as f:
                file_parameters = yaml.load(f.read())
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(f"Invalid format defined for '{self}':\n\n{e}")
            input(f"Press [Enter] to correct the configuration for '{self}': ")
        else:
            editing = False

    self.parameters = file_parameters

    if debug:
        from meerschaum.utils.formatting import pprint
        pprint(self.parameters)

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
def edit_definition(self, yes: bool = False, noask: bool = False, force: bool = False, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]

Edit a pipe's definition file and update its configuration. NOTE: This function is interactive and should not be used in automated scripts!

Returns

A SuccessTuple of success, message.

Expand source code
def edit_definition(
        self,
        yes: bool = False,
        noask: bool = False,
        force: bool = False,
        debug : bool = False,
        **kw : Any
    ) -> SuccessTuple:
    """
    Edit a pipe's definition file and update its configuration.
    **NOTE:** This function is interactive and should not be used in automated scripts!

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    if self.temporary:
        return False, "Cannot edit pipes created with `temporary=True` (read-only)."

    from meerschaum.connectors import instance_types
    if (self.connector is None) or self.connector.type not in instance_types:
        return self.edit(interactive=True, debug=debug, **kw)

    import json
    from meerschaum.utils.warnings import info, warn
    from meerschaum.utils.debug import dprint
    from meerschaum.config._patch import apply_patch_to_config
    from meerschaum.utils.misc import edit_file

    _parameters = self.parameters
    if 'fetch' not in _parameters:
        _parameters['fetch'] = {}

    def _edit_api():
        from meerschaum.utils.prompt import prompt, yes_no
        info(
            f"Please enter the keys of the source pipe from '{self.connector}'.\n" +
            "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip."
        )

        _keys = { 'connector_keys' : None, 'metric_key' : None, 'location_key' : None }
        for k in _keys:
            _keys[k] = _parameters['fetch'].get(k, None)

        for k, v in _keys.items():
            try:
                _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v)
            except KeyboardInterrupt:
                continue
            if _keys[k] in ('', 'None', '\'None\'', '[None]'):
                _keys[k] = None

        _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys)

        info("You may optionally specify additional filter parameters as JSON.")
        print("  Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.")
        print("  For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':")
        print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': ')))
        if force or yes_no(
            "Would you like to add additional filter parameters?",
            yes=yes, noask=noask
        ):
            from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
            definition_filename = str(self) + '.json'
            definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
            try:
                definition_path.touch()
                with open(definition_path, 'w+') as f:
                    json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2)
            except Exception as e:
                return False, f"Failed writing file '{definition_path}':\n" + str(e)

            _params = None
            while True:
                edit_file(definition_path)
                try:
                    with open(definition_path, 'r') as f:
                        _params = json.load(f)
                except Exception as e:
                    warn(f'Failed to read parameters JSON:\n{e}', stack=False)
                    if force or yes_no(
                        "Would you like to try again?\n  "
                        + "If not, the parameters JSON file will be ignored.",
                        noask=noask, yes=yes
                    ):
                        continue
                    _params = None
                break
            if _params is not None:
                if 'fetch' not in _parameters:
                    _parameters['fetch'] = {}
                _parameters['fetch']['params'] = _params

        self.parameters = _parameters
        return True, "Success"

    def _edit_sql():
        import pathlib, os, textwrap
        from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
        from meerschaum.utils.misc import edit_file
        definition_filename = str(self) + '.sql'
        definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename

        sql_definition = _parameters['fetch'].get('definition', None)
        if sql_definition is None:
            sql_definition = ''
        sql_definition = textwrap.dedent(sql_definition).lstrip()

        try:
            definition_path.touch()
            with open(definition_path, 'w+') as f:
                f.write(sql_definition)
        except Exception as e:
            return False, f"Failed writing file '{definition_path}':\n" + str(e)

        edit_file(definition_path)
        try:
            with open(definition_path, 'r') as f:
                file_definition = f.read()
        except Exception as e:
            return False, f"Failed reading file '{definition_path}':\n" + str(e)

        if sql_definition == file_definition:
            return False, f"No changes made to definition for {self}."

        if ' ' not in file_definition:
            return False, f"Invalid SQL definition for {self}."

        if debug:
            dprint("Read SQL definition:\n\n" + file_definition)
        _parameters['fetch']['definition'] = file_definition
        self.parameters = _parameters
        return True, "Success"

    locals()['_edit_' + str(self.connector.type)]()
    return self.edit(interactive=False, debug=debug, **kw)
def enforce_dtypes(self, df: "'pd.DataFrame'", debug: bool = False) ‑> 'pd.DataFrame'

Cast the input dataframe to the pipe's registered data types. If the pipe does not exist and dtypes are not set, return the dataframe.

Expand source code
def enforce_dtypes(self, df: 'pd.DataFrame', debug: bool=False) -> 'pd.DataFrame':
    """
    Cast the input dataframe to the pipe's registered data types.
    If the pipe does not exist and dtypes are not set, return the dataframe.

    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import parse_df_datetimes, enforce_dtypes as _enforce_dtypes
    from meerschaum.utils.packages import import_pandas
    pd = import_pandas(debug=debug)
    if df is None:
        if debug:
            dprint(
                f"Received None instead of a DataFrame.\n"
                + "    Skipping dtype enforcement..."
            )
        return df

    pipe_dtypes = self.dtypes

    try:
        if isinstance(df, str):
            df = parse_df_datetimes(
                pd.read_json(df),
                ignore_cols = [
                    col
                    for col, dtype in pipe_dtypes.items()
                    if 'datetime' not in str(dtype)
                ],
                debug = debug,
            )
        else:
            df = parse_df_datetimes(
                df,
                ignore_cols = [
                    col
                    for col, dtype in pipe_dtypes.items()
                    if 'datetime' not in str(dtype)
                ],
                debug = debug,
            )
    except Exception as e:
        warn(f"Unable to cast incoming data as a DataFrame...:\n{e}")
        return df

    if not pipe_dtypes:
        if debug:
            dprint(
                f"Could not find dtypes for {self}.\n"
                + "    Skipping dtype enforcement..."
            )
        return df

    return _enforce_dtypes(df, pipe_dtypes, debug=debug)
def exists(self, debug: bool = False) ‑> bool

See if a Pipe's table exists.

Parameters

debug : bool, default False
Verbosity toggle.

Returns

A bool corresponding to whether a pipe's underlying table exists.

Expand source code
def exists(
        self,
        debug : bool = False
    ) -> bool:
    """
    See if a Pipe's table exists.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `bool` corresponding to whether a pipe's underlying table exists.

    """
    import time
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.config import STATIC_CONFIG
    from meerschaum.utils.debug import dprint
    now = time.perf_counter()
    exists_timeout_seconds = STATIC_CONFIG['pipes']['exists_timeout_seconds']

    _exists = self.__dict__.get('_exists', None)
    if _exists:
        exists_timestamp = self.__dict__.get('_exists_timestamp', None)
        if exists_timestamp is not None:
            delta = now - exists_timestamp
            if delta < exists_timeout_seconds:
                if debug:
                    dprint(f"Returning cached `exists` for {self} ({round(delta, 2)} seconds old).")
                return _exists

    with Venv(get_connector_plugin(self.instance_connector)):
        _exists = self.instance_connector.pipe_exists(pipe=self, debug=debug)

    self.__dict__['_exists'] = _exists
    self.__dict__['_exists_timestamp'] = now
    return _exists
def fetch(self, begin: Optional[datetime.datetime, str] = '', end: Optional[datetime.datetime] = None, sync_chunks: bool = False, deactivate_plugin_venv: bool = True, debug: bool = False, **kw: Any) ‑> 'pd.DataFrame or None'

Fetch a Pipe's latest data from its connector.

Parameters

begin : Optional[datetime.datetime, str], default '':
If provided, only fetch data newer than or equal to begin.
end : Optional[datetime.datetime], default None:
If provided, only fetch data older than or equal to end.
sync_chunks : bool, default False
If True and the pipe's connector is of type 'sql', begin syncing chunks while fetching loads chunks into memory.
debug : bool, default False
Verbosity toggle.

Returns

A pd.DataFrame of the newest unseen data.

Expand source code
def fetch(
        self,
        begin: Optional[datetime.datetime, str] = '',
        end: Optional[datetime.datetime] = None,
        sync_chunks: bool = False,
        deactivate_plugin_venv: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> 'pd.DataFrame or None':
    """
    Fetch a Pipe's latest data from its connector.

    Parameters
    ----------
    begin: Optional[datetime.datetime, str], default '':
        If provided, only fetch data newer than or equal to `begin`.

    end: Optional[datetime.datetime], default None:
        If provided, only fetch data older than or equal to `end`.

    sync_chunks: bool, default False
        If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks while fetching
        loads chunks into memory.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` of the newest unseen data.

    """
    if 'fetch' not in dir(self.connector):
        from meerschaum.utils.warnings import warn
        warn(f"No `fetch()` function defined for connector '{self.connector}'")
        return None

    from meerschaum.connectors import custom_types
    from meerschaum.utils.debug import dprint, _checkpoint
    if (
        self.connector.type == 'plugin'
        or
        self.connector.type in custom_types
    ):
        from meerschaum.plugins import Plugin
        from meerschaum.utils.packages import activate_venv, deactivate_venv
        plugin_name = (
            self.connector.label if self.connector.type == 'plugin'
            else self.connector.__module__.replace('plugins.', '').split('.')[0]
        )
        connector_plugin = Plugin(plugin_name)
        connector_plugin.activate_venv(debug=debug)
    
    _chunk_hook = kw.pop('chunk_hook', None)
    if sync_chunks and _chunk_hook is None:

        def _chunk_hook(chunk, **_kw) -> SuccessTuple:
            """
            Wrap `Pipe.sync()` with a custom chunk label prepended to the message.
            """
            from meerschaum.config._patch import apply_patch_to_config
            kwargs = apply_patch_to_config(kw, _kw)
            chunk_success, chunk_message = self.sync(chunk, **kwargs)
            chunk_label = self._get_chunk_label(chunk, self.columns.get('datetime', None))
            if chunk_label:
                chunk_message = '\n' + chunk_label + '\n' + chunk_message
            return chunk_success, chunk_message

    workers = kw.get('workers', None)
    if workers is None and not getattr(self.instance_connector, 'IS_THREAD_SAFE', False):
        workers = 1
    kw['workers'] = workers

    df = self.connector.fetch(
        self,
        begin = begin,
        end = end,
        chunk_hook = _chunk_hook,
        debug = debug,
        **kw
    )
    if (
        self.connector.type == 'plugin'
        or
        self.connector.type in custom_types
    ):
        connector_plugin.deactivate_venv(debug=debug)

    return df
def filter_existing(self, df: "'pd.DataFrame'", chunksize: Optional[int] = -1, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw) ‑> Tuple['pd.DataFrame', 'pd.DataFrame', 'pd.DataFrame']

Inspect a dataframe and filter out rows which already exist in the pipe.

Parameters

df : 'pd.DataFrame'
The dataframe to inspect and filter.
chunksize : Optional[int], default -1
The chunksize used when fetching existing data.
params : Optional[Dict[str, Any]], default None
If provided, use this filter when searching for existing data.
debug : bool, default False
Verbosity toggle.

Returns

A tuple of three pandas DataFrames: unseen, update, and delta.

Expand source code
def filter_existing(
        self,
        df: 'pd.DataFrame',
        chunksize: Optional[int] = -1,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw
    ) -> Tuple['pd.DataFrame', 'pd.DataFrame', 'pd.DataFrame']:
    """
    Inspect a dataframe and filter out rows which already exist in the pipe.

    Parameters
    ----------
    df: 'pd.DataFrame'
        The dataframe to inspect and filter.
        
    chunksize: Optional[int], default -1
        The `chunksize` used when fetching existing data.

    params: Optional[Dict[str, Any]], default None
        If provided, use this filter when searching for existing data. 

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of three pandas DataFrames: unseen, update, and delta.
    """
    import datetime
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.misc import (
        round_time,
        filter_unseen_df,
        add_missing_cols_to_df,
        to_pandas_dtype,
        get_unhashable_cols,
    )

    pd = import_pandas()
    if not isinstance(df, pd.DataFrame):
        df = self.enforce_dtypes(df, debug=debug)

    if df.empty:
        return df, df, df

    ### begin is the oldest data in the new dataframe
    begin, end = None, None
    dt_col = self.columns.get('datetime', None)
    dt_type = self.dtypes.get(dt_col, 'datetime64[ns]') if dt_col else None
    try:
        min_dt_val = df[dt_col].min(skipna=True) if dt_col else None
        min_dt = (
            pd.to_datetime(min_dt_val).to_pydatetime()
            if min_dt_val is not None and 'datetime' in str(dt_type)
            else min_dt_val
        )
    except Exception as e:
        min_dt = None
    if not ('datetime' in str(type(min_dt))) or str(min_dt) == 'NaT':
        if 'int' not in str(type(min_dt)).lower():
            min_dt = None

    if isinstance(min_dt, datetime.datetime):
        begin = (
            round_time(
                min_dt,
                to = 'down'
            ) - datetime.timedelta(minutes=1)
        )
    elif dt_type and 'int' in dt_type.lower():
        begin = min_dt
    elif dt_col is None:
        begin = None

    ### end is the newest data in the new dataframe
    try:
        max_dt_val = df[dt_col].max(skipna=True) if dt_col else None
        max_dt = (
            pd.to_datetime(max_dt_val).to_pydatetime()
            if max_dt_val is not None and 'datetime' in str(dt_type)
            else max_dt_val
        )
    except Exception as e:
        import traceback
        traceback.print_exc()
        max_dt = None

    if not ('datetime' in str(type(max_dt))) or str(min_dt) == 'NaT':
        if 'int' not in str(type(max_dt)).lower():
            max_dt = None

    if isinstance(max_dt, datetime.datetime):
        end = (
            round_time(
                max_dt,
                to = 'down'
            ) + datetime.timedelta(minutes=1)
        )
    elif dt_type and 'int' in dt_type.lower():
        end = max_dt + 1

    if max_dt is not None and min_dt is not None and min_dt > max_dt:
        warn(f"Detected minimum datetime greater than maximum datetime.")

    if begin is not None and end is not None and begin > end:
        if isinstance(begin, datetime.datetime):
            begin = end - datetime.timedelta(minutes=1)
        ### We might be using integers for out datetime axis.
        else:
            begin = end - 1

    if debug:
        dprint(f"Looking at data between '{begin}' and '{end}'.", **kw)

    backtrack_df = self.get_data(
        begin = begin,
        end = end,
        chunksize = chunksize,
        params = params,
        debug = debug,
        **kw
    )
    if debug:
        dprint("Existing data:\n" + str(backtrack_df), **kw)
        dprint("Existing dtypes:\n" + str(backtrack_df.dtypes))

    ### Separate new rows from changed ones.
    on_cols = [
        col for col_key, col in self.columns.items()
        if (
            col
            and
            col_key != 'value'
            and col in backtrack_df.columns
        )
    ]
    on_cols_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in self.dtypes.items()
        if col in on_cols
    }

    ### Detect changes between the old target and new source dataframes.
    delta_df = add_missing_cols_to_df(
        filter_unseen_df(
            backtrack_df,
            df,
            dtypes = {
                col: to_pandas_dtype(typ)
                for col, typ in self.dtypes.items()
            },
            debug = debug
        ),
        on_cols_dtypes,
    )

    ### Cast dicts or lists to strings so we can merge.
    unhashable_delta_cols = get_unhashable_cols(delta_df)
    unhashable_backtrack_cols = get_unhashable_cols(backtrack_df)
    for col in unhashable_delta_cols:
        delta_df[col] = delta_df[col].apply(json.dumps)
    for col in unhashable_backtrack_cols:
        backtrack_df[col] = backtrack_df[col].apply(json.dumps)
    casted_cols = set(unhashable_delta_cols + unhashable_backtrack_cols)

    joined_df = pd.merge(
        delta_df,
        backtrack_df,
        how = 'left',
        on = on_cols,
        indicator = True,
        suffixes = ('', '_old'),
    ) if on_cols else delta_df
    for col in casted_cols:
        if col in joined_df.columns:
            joined_df[col] = joined_df[col].apply(
                lambda x: (
                    json.loads(x)
                    if isinstance(x, str)
                    else x
                )
            )

    ### Determine which rows are completely new.
    new_rows_mask = (joined_df['_merge'] == 'left_only') if on_cols else None
    cols = list(backtrack_df.columns)

    unseen_df = (
        joined_df
        .where(new_rows_mask)
        .dropna(how='all')[cols]
        .reset_index(drop=True)
    ) if on_cols else delta_df

    ### Rows that have already been inserted but values have changed.
    update_df = (
        joined_df
        .where(~new_rows_mask)
        .dropna(how='all')[cols]
        .reset_index(drop=True)
    ) if on_cols else None

    return unseen_df, update_df, delta_df
def get_backtrack_data(self, backtrack_minutes: int = 0, begin: "Optional['datetime.datetime']" = None, fresh: bool = False, debug: bool = False, **kw: Any) ‑> Optional['pd.DataFrame']

Get the most recent data from the instance connector as a Pandas DataFrame.

Parameters

backtrack_minutes : int, default 0
How many minutes from begin to select from. Defaults to 0. This may return a few rows due to a rounding quirk.
begin : Optional[datetime.datetime], default None
The starting point to search for data. If begin is None (default), use the most recent observed datetime (AKA sync_time).
E.g. begin = 02:00

Search this region.           Ignore this, even if there's data.
/  /  /  /  /  /  /  /  /  |
-----|----------|----------|----------|----------|----------|
00:00      01:00      02:00      03:00      04:00      05:00

fresh : bool, default False
If True, Ignore local cache and pull directly from the instance connector. Only comes into effect if a pipe was created with cache=True.
debug : bool default False
Verbosity toggle.

Returns

A pd.DataFrame for the pipe's data corresponding to the provided parameters. Backtrack data is a convenient way to get a pipe's data "backtracked" from the most recent datetime.

Expand source code
def get_backtrack_data(
        self,
        backtrack_minutes: int = 0,
        begin: Optional['datetime.datetime'] = None,
        fresh: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Optional['pd.DataFrame']:
    """
    Get the most recent data from the instance connector as a Pandas DataFrame.

    Parameters
    ----------
    backtrack_minutes: int, default 0
        How many minutes from `begin` to select from.
        Defaults to 0. This may return a few rows due to a rounding quirk.
    begin: Optional[datetime.datetime], default None
        The starting point to search for data.
        If begin is `None` (default), use the most recent observed datetime
        (AKA sync_time).
        
        
    ```
    E.g. begin = 02:00

    Search this region.           Ignore this, even if there's data.
    /  /  /  /  /  /  /  /  /  |
    -----|----------|----------|----------|----------|----------|
    00:00      01:00      02:00      03:00      04:00      05:00

    ```

    fresh: bool, default False
        If `True`, Ignore local cache and pull directly from the instance connector.
        Only comes into effect if a pipe was created with `cache=True`.

    debug: bool default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. Backtrack data
    is a convenient way to get a pipe's data "backtracked" from the most recent datetime.

    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    kw.update({'backtrack_minutes': backtrack_minutes, 'begin': begin,})

    if not self.exists(debug=debug):
        return None

    if self.cache_pipe is not None:
        if not fresh:
            _sync_cache_tuple = self.cache_pipe.sync(debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
                fresh = True
            else: ### Successfully synced cache.
                return self.enforce_dtypes(
                    self.cache_pipe.get_backtrack_data(debug=debug, fresh=True, **kw),
                    debug = debug,
                )

    ### If `fresh` or the syncing failed, directly pull from the instance connector.
    with Venv(get_connector_plugin(self.instance_connector)):
        return self.enforce_dtypes(
            self.instance_connector.get_backtrack_data(
                pipe = self,
                debug = debug,
                **kw
            ),
            debug = debug,
        )
def get_columns(self, *args: str, error: bool = False) ‑> Union[str, Tuple[str]]

Check if the requested columns are defined.

Parameters

*args : str
The column names to be retrieved.
error : bool, default False
If True, raise an Exception if the specified column is not defined.

Returns

A tuple of the same size of args or a str if args is a single argument.

Examples

>>> pipe = mrsm.Pipe('test', 'test')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> pipe.get_columns('datetime', 'id')
('dt', 'id')
>>> pipe.get_columns('value', error=True)
Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
Expand source code
def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
    """
    Check if the requested columns are defined.

    Parameters
    ----------
    *args: str
        The column names to be retrieved.
        
    error: bool, default False
        If `True`, raise an `Exception` if the specified column is not defined.

    Returns
    -------
    A tuple of the same size of `args` or a `str` if `args` is a single argument.

    Examples
    --------
    >>> pipe = mrsm.Pipe('test', 'test')
    >>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
    >>> pipe.get_columns('datetime', 'id')
    ('dt', 'id')
    >>> pipe.get_columns('value', error=True)
    Exception:  🛑 Missing 'value' column for Pipe('test', 'test').
    """
    from meerschaum.utils.warnings import error as _error, warn
    if not args:
        args = tuple(self.columns.keys())
    col_names = []
    for col in args:
        col_name = None
        try:
            col_name = self.columns[col]
            if col_name is None and error:
                _error(f"Please define the name of the '{col}' column for {self}.")
        except Exception as e:
            col_name = None
        if col_name is None and error:
            _error(f"Missing '{col}'" + f" column for {self}.")
        col_names.append(col_name)
    if len(col_names) == 1:
        return col_names[0]
    return tuple(col_names)
def get_columns_types(self, debug: bool = False) ‑> Optional[Dict[str, str]]

Get a dictionary of a pipe's column names and their types.

Parameters

debug : bool, default False:
Verbosity toggle.

Returns

A dictionary of column names (str) to column types (str).

Examples

>>> pipe.get_columns_types()
{
  'dt': 'TIMESTAMP WITHOUT TIMEZONE',
  'id': 'BIGINT',
  'val': 'DOUBLE PRECISION',
}
>>>
Expand source code
def get_columns_types(self, debug: bool = False) -> Union[Dict[str, str], None]:
    """
    Get a dictionary of a pipe's column names and their types.

    Parameters
    ----------
    debug: bool, default False:
        Verbosity toggle.

    Returns
    -------
    A dictionary of column names (`str`) to column types (`str`).

    Examples
    --------
    >>> pipe.get_columns_types()
    {
      'dt': 'TIMESTAMP WITHOUT TIMEZONE',
      'id': 'BIGINT',
      'val': 'DOUBLE PRECISION',
    }
    >>>
    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.get_pipe_columns_types(self, debug=debug)
def get_data(self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, as_iterator: bool = False, as_chunks: bool = False, chunk_interval: Union[datetime.datetime, int, None] = None, fresh: bool = False, debug: bool = False, **kw: Any) ‑> Union['pd.DataFrame', Generator['pd.DataFrame'], None]

Get a pipe's data from the instance connector.

Parameters

begin : Optional[datetime.datetime], default None
Lower bound datetime to begin searching for data (inclusive). Translates to a WHERE clause like WHERE datetime >= begin. Defaults to None.
end : Optional[datetime.datetime], default None
Upper bound datetime to stop searching for data (inclusive). Translates to a WHERE clause like WHERE datetime < end. Defaults to None.
params : Optional[Dict[str, Any]], default None
Filter the retrieved data by a dictionary of parameters. See build_where() for more details.
as_iterator : bool, default False
If True, return a generator of chunks of pipe data.
as_chunks : bool, default False
Alias for as_iterator.
chunk_interval : int, default None
If as_iterator, then return chunks with begin and end separated by this interval. By default, use a timedelta of 1 day. If the datetime axis is an integer, default to the configured chunksize. Note that because end is always non-inclusive, there will be chunk_interval - 1 rows per chunk for integers.
fresh : bool, default True
If True, skip local cache and directly query the instance connector. Defaults to True.
debug : bool, default False
Verbosity toggle. Defaults to False.

Returns

A pd.DataFrame for the pipe's data corresponding to the provided parameters.

Expand source code
def get_data(
        self,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        params: Optional[Dict[str, Any]] = None,
        as_iterator: bool = False,
        as_chunks: bool = False,
        chunk_interval: Union[datetime.datetime, int, None] = None,
        fresh: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Union['pd.DataFrame', Generator['pd.DataFrame'], None]:
    """
    Get a pipe's data from the instance connector.

    Parameters
    ----------
    begin: Optional[datetime.datetime], default None
        Lower bound datetime to begin searching for data (inclusive).
        Translates to a `WHERE` clause like `WHERE datetime >= begin`.
        Defaults to `None`.

    end: Optional[datetime.datetime], default None
        Upper bound datetime to stop searching for data (inclusive).
        Translates to a `WHERE` clause like `WHERE datetime < end`.
        Defaults to `None`.

    params: Optional[Dict[str, Any]], default None
        Filter the retrieved data by a dictionary of parameters.
        See `meerschaum.utils.sql.build_where` for more details. 

    as_iterator: bool, default False
        If `True`, return a generator of chunks of pipe data.

    as_chunks: bool, default False
        Alias for `as_iterator`.

    chunk_interval: int, default None
        If `as_iterator`, then return chunks with `begin` and `end` separated by this interval.
        By default, use a timedelta of 1 day.
        If the `datetime` axis is an integer, default to the configured chunksize.
        Note that because `end` is always non-inclusive,
        there will be `chunk_interval - 1` rows per chunk for integers.

    fresh: bool, default True
        If `True`, skip local cache and directly query the instance connector.
        Defaults to `True`.

    debug: bool, default False
        Verbosity toggle.
        Defaults to `False`.

    Returns
    -------
    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.

    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.misc import iterate_chunks
    from meerschaum.config import get_config
    kw.update({'begin': begin, 'end': end, 'params': params,})

    as_iterator = as_iterator or as_chunks

    if as_iterator or as_chunks:
        return self._get_data_as_iterator(
            begin = begin,
            end = end,
            params = params,
            chunk_interval = chunk_interval,
            fresh = fresh,
            debug = debug,
        )

    if not self.exists(debug=debug):
        return None
       
    if self.cache_pipe is not None:
        if not fresh:
            _sync_cache_tuple = self.cache_pipe.sync(debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
                fresh = True
            else: ### Successfully synced cache.
                return self.enforce_dtypes(
                    self.cache_pipe.get_data(debug=debug, fresh=True, **kw),
                    debug = debug,
                )

    ### If `fresh` or the syncing failed, directly pull from the instance connector.
    with Venv(get_connector_plugin(self.instance_connector)):
        return self.enforce_dtypes(
            self.instance_connector.get_pipe_data(
                pipe = self,
                debug = debug,
                **kw
            ),
            debug = debug,
        )
def get_id(self, **kw: Any) ‑> Optional[int]

Fetch a pipe's ID from its instance connector. If the pipe does not exist, return None.

Expand source code
def get_id(self, **kw: Any) -> Union[int, None]:
    """
    Fetch a pipe's ID from its instance connector.
    If the pipe does not exist, return `None`.
    """
    if self.temporary:
        return None
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.get_pipe_id(self, **kw)
def get_indices(self) ‑> Dict[str, str]

Return a dictionary in the form of pipe.columns but map to index names.

Expand source code
def get_indices(self) -> Dict[str, str]:
    """
    Return a dictionary in the form of `pipe.columns` but map to index names.
    """
    return {
        ix: (self.target + '_' + col + '_index')
        for ix, col in self.columns.items() if col
    }
def get_rowcount(self, begin: "Optional['datetime.datetime']" = None, end: "Optional['datetime.datetime']" = None, remote: bool = False, params: Optional[Dict[str, Any]] = None, debug: bool = False) ‑> Optional[int]

Get a Pipe's instance or remote rowcount.

Parameters

begin : Optional[datetime.datetime], default None
Count rows where datetime > begin.
end : Optional[datetime.datetime], default None
Count rows where datetime < end.
remote : bool, default False
Count rows from a pipe's remote source. NOTE: This is experimental!
debug : bool, default False
Verbosity toggle.

Returns

An int of the number of rows in the pipe corresponding to the provided parameters. None is returned if the pipe does not exist.

Expand source code
def get_rowcount(
        self,
        begin: Optional['datetime.datetime'] = None,
        end: Optional['datetime.datetime'] = None,
        remote: bool = False,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False
    ) -> Union[int, None]:
    """
    Get a Pipe's instance or remote rowcount.

    Parameters
    ----------
    begin: Optional[datetime.datetime], default None
        Count rows where datetime > begin.

    end: Optional[datetime.datetime], default None
        Count rows where datetime < end.

    remote: bool, default False
        Count rows from a pipe's remote source.
        **NOTE**: This is experimental!

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` of the number of rows in the pipe corresponding to the provided parameters.
    `None` is returned if the pipe does not exist.

    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    connector = self.instance_connector if not remote else self.connector
    try:
        with Venv(get_connector_plugin(connector)):
            return connector.get_pipe_rowcount(
                self, begin=begin, end=end, remote=remote, params=params, debug=debug
            )
    except AttributeError as e:
        warn(e)
        if remote:
            return None
    warn(f"Failed to get a rowcount for {self}.")
    return None
def get_sync_time(self, params: Optional[Dict[str, Any]] = None, newest: bool = True, round_down: bool = True, debug: bool = False) ‑> Optional[datetime.datetime]

Get the most recent datetime value for a Pipe.

Parameters

params : Optional[Dict[str, Any]], default None
Dictionary to build a WHERE clause for a specific column. See build_where().
newest : bool, default True
If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
round_down : bool, default True
If True, round down the sync time to the nearest minute.
debug : bool, default False
Verbosity toggle.

Returns

A datetime.datetime object if the pipe exists, otherwise None.

Expand source code
def get_sync_time(
        self,
        params: Optional[Dict[str, Any]] = None,
        newest: bool = True,
        round_down: bool = True,
        debug: bool = False
    ) -> Union['datetime.datetime', None]:
    """
    Get the most recent datetime value for a Pipe.

    Parameters
    ----------
    params: Optional[Dict[str, Any]], default None
        Dictionary to build a WHERE clause for a specific column.
        See `meerschaum.utils.sql.build_where`.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (`ASC` instead of `DESC`).

    round_down: bool, default True
        If `True`, round down the sync time to the nearest minute.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `datetime.datetime` object if the pipe exists, otherwise `None`.

    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.get_sync_time(
            self,
            params = params,
            newest = newest,
            round_down = round_down,
            debug = debug,
        )
def get_val_column(self, debug: bool = False) ‑> Optional[str]

Return the name of the value column if it's defined, otherwise make an educated guess. If not set in the columns dictionary, return the first numeric column that is not an ID or datetime column. If none may be found, return None.

Parameters

debug : bool, default False:
Verbosity toggle.

Returns

Either a string or None.

Expand source code
def get_val_column(self, debug: bool = False) -> Union[str, None]:
    """
    Return the name of the value column if it's defined, otherwise make an educated guess.
    If not set in the `columns` dictionary, return the first numeric column that is not
    an ID or datetime column.
    If none may be found, return `None`.

    Parameters
    ----------
    debug: bool, default False:
        Verbosity toggle.

    Returns
    -------
    Either a string or `None`.
    """
    from meerschaum.utils.debug import dprint
    if debug:
        dprint('Attempting to determine the value column...')
    try:
        val_name = self.get_columns('value')
    except Exception as e:
        val_name = None
    if val_name is not None:
        if debug:
            dprint(f"Value column: {val_name}")
        return val_name

    cols = self.columns
    if cols is None:
        if debug:
            dprint('No columns could be determined. Returning...')
        return None
    try:
        dt_name = self.get_columns('datetime', error=False)
    except Exception as e:
        dt_name = None
    try:
        id_name = self.get_columns('id', errors=False)
    except Exception as e:
        id_name = None

    if debug:
        dprint(f"dt_name: {dt_name}")
        dprint(f"id_name: {id_name}")

    cols_types = self.get_columns_types(debug=debug)
    if cols_types is None:
        return None
    if debug:
        dprint(f"cols_types: {cols_types}")
    if dt_name is not None:
        cols_types.pop(dt_name, None)
    if id_name is not None:
        cols_types.pop(id_name, None)

    candidates = []
    candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric',}
    for search_term in candidate_keywords:
        for col, typ in cols_types.items():
            if search_term in typ.lower():
                candidates.append(col)
                break
    if not candidates:
        if debug:
            dprint(f"No value column could be determined.")
        return None

    return candidates[0]
def guess_datetime(self) ‑> Optional[str]

Try to determine a pipe's datetime column.

Expand source code
def guess_datetime(self) -> Union[str, None]:
    """
    Try to determine a pipe's datetime column.
    """
    dtypes = self.dtypes

    ### Abort if the user explictly disallows a datetime index.
    if 'datetime' in dtypes:
        if dtypes['datetime'] is None:
            return None

    dt_cols = [
        col for col, typ in self.dtypes.items()
        if str(typ).startswith('datetime')
    ]
    if not dt_cols:
        return None
    return dt_cols[0]    
def infer_dtypes(self, persist: bool = False, debug: bool = False) ‑> Dict[str, Any]

If dtypes is not set in Pipe.parameters, infer the data types from the underlying table if it exists.

Parameters

persist : bool, default False
If True, persist the inferred data types to Pipe.parameters.

Returns

A dictionary of strings containing the pandas data types for this Pipe.

Expand source code
def infer_dtypes(self, persist: bool=False, debug: bool=False) -> Dict[str, Any]:
    """
    If `dtypes` is not set in `meerschaum.Pipe.parameters`,
    infer the data types from the underlying table if it exists.

    Parameters
    ----------
    persist: bool, default False
        If `True`, persist the inferred data types to `meerschaum.Pipe.parameters`.

    Returns
    -------
    A dictionary of strings containing the pandas data types for this Pipe.
    """
    if not self.exists(debug=debug):
        dtypes = {}
        if not self.columns:
            return {}
        dt_col = self.columns.get('datetime', None)
        if dt_col:
            if not self.parameters.get('dtypes', {}).get(dt_col, None):
                dtypes[dt_col] = 'datetime64[ns]'
        return dtypes

    from meerschaum.utils.sql import get_pd_type
    columns_types = self.get_columns_types(debug=debug)
    dtypes = {
        c: get_pd_type(t, allow_custom_dtypes=True)
        for c, t in columns_types.items()
    } if columns_types else {}
    if persist:
        self.dtypes = dtypes
        self.edit(interactive=False, debug=debug)
    return dtypes
def register(self, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]

Register a new Pipe along with its attributes.

Parameters

debug : bool, default False
Verbosity toggle.
kw : Any
Keyword arguments to pass to instance_connector.register_pipe().

Returns

A SuccessTuple of success, message.

Expand source code
def register(
        self,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Register a new Pipe along with its attributes.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    kw: Any
        Keyword arguments to pass to `instance_connector.register_pipe()`.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    if self.temporary:
        return False, "Cannot register pipes created with `temporary=True` (read-only)."

    from meerschaum.utils.formatting import get_console
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin, custom_types
    from meerschaum.config._patch import apply_patch_to_config

    import warnings
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        try:
            _conn = self.connector
        except Exception as e:
            _conn = None

    if (
        _conn is not None
        and
        (_conn.type == 'plugin' or _conn.type in custom_types)
        and
        getattr(_conn, 'register', None) is not None
    ):
        try:
            with Venv(get_connector_plugin(_conn), debug=debug):
                params = self.connector.register(self)
        except Exception as e:
            get_console().print_exception()
            params = None
        params = {} if params is None else params
        if not isinstance(params, dict):
            from meerschaum.utils.warnings import warn
            warn(
                f"Invalid parameters returned from `register()` in connector {self.connector}:\n"
                + f"{params}"
            )
        else:
            self.parameters = apply_patch_to_config(params, self.parameters)

    if not self.parameters:
        cols = self.columns if self.columns else {'datetime': None, 'id': None}
        self.parameters = {
            'columns': cols,
        }

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.register_pipe(self, debug=debug, **kw)
def show(self, nopretty: bool = False, debug: bool = False, **kw) ‑> Tuple[bool, str]

Show attributes of a Pipe.

Parameters

nopretty : bool, default False
If True, simply print the JSON of the pipe's attributes.
debug : bool, default False
Verbosity toggle.

Returns

A SuccessTuple of success, message.

Expand source code
def show(
        self,
        nopretty: bool = False,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Show attributes of a Pipe.

    Parameters
    ----------
    nopretty: bool, default False
        If `True`, simply print the JSON of the pipe's attributes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    import json
    from meerschaum.utils.formatting import (
        pprint, make_header, ANSI, highlight_pipes, fill_ansi, get_console,
    )
    from meerschaum.utils.packages import import_rich, attempt_import
    from meerschaum.utils.warnings import info
    attributes_json = json.dumps(self.attributes)
    if not nopretty:
        _to_print = f"Attributes for {self}:"
        if ANSI:
            _to_print = fill_ansi(highlight_pipes(make_header(_to_print)), 'magenta')
            print(_to_print)
            rich = import_rich()
            rich_json = attempt_import('rich.json')
            get_console().print(rich_json.JSON(attributes_json))
        else:
            print(_to_print)
    else:
        print(attributes_json)

    return True, "Success"
def sync(self, df: Union[pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch] = meerschaum.core.Pipe._sync.InferFetch, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, force: bool = False, retries: int = 10, min_seconds: int = 1, check_existing: bool = True, blocking: bool = True, workers: Optional[int] = None, callback: Optional[Callable[[Tuple[bool, str]], Any]] = None, error_callback: Optional[Callable[[Exception], Any]] = None, chunksize: Optional[int] = -1, sync_chunks: bool = True, debug: bool = False, **kw: Any) ‑> SuccessTuple

Fetch new data from the source and update the pipe's table with new data.

Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.

Parameters

df : Union[None, pd.DataFrame, Dict[str, List[Any]]], default None
An optional DataFrame to sync into the pipe. Defaults to None.
begin : Optional[datetime.datetime], default None
Optionally specify the earliest datetime to search for data. Defaults to None.
end : Optional[datetime.datetime], default None
Optionally specify the latest datetime to search for data. Defaults to None.
force : bool, default False
If True, keep trying to sync untul retries attempts. Defaults to False.
retries : int, default 10
If force, how many attempts to try syncing before declaring failure. Defaults to 10.
min_seconds : Union[int, float], default 1
If force, how many seconds to sleep between retries. Defaults to 1.
check_existing : bool, default True
If True, pull and diff with existing data from the pipe. Defaults to True.
blocking : bool, default True
If True, wait for sync to finish and return its result, otherwise asyncronously sync (oxymoron?) and return success. Defaults to True. Only intended for specific scenarios.
workers : Optional[int], default None
No use directly within sync(). Instead is passed on to instance connectors' sync_pipe() methods (e.g. meerschaum.connectors.plugin.PluginConnector). Defaults to None.
callback : Optional[Callable[[Tuple[bool, str]], Any]], default None
Callback function which expects a SuccessTuple as input. Only applies when blocking=False.
error_callback : Optional[Callable[[Exception], Any]], default None
Callback function which expects an Exception as input. Only applies when blocking=False.
chunksize : int, default -1
Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction. Defaults to -1.
sync_chunks : bool, default True
If possible, sync chunks while fetching them into memory.
debug : bool, default False
Verbosity toggle. Defaults to False.

Returns

A SuccessTuple of success (bool) and message (str).

Expand source code
def sync(
        self,
        df: Union[
            pd.DataFrame,
            Dict[str, List[Any]],
            List[Dict[str, Any]],
            InferFetch
        ] = InferFetch,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        force: bool = False,
        retries: int = 10,
        min_seconds: int = 1,
        check_existing: bool = True,
        blocking: bool = True,
        workers: Optional[int] = None,
        callback: Optional[Callable[[Tuple[bool, str]], Any]] = None,
        error_callback: Optional[Callable[[Exception], Any]] = None,
        chunksize: Optional[int] = -1,
        sync_chunks: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Fetch new data from the source and update the pipe's table with new data.
    
    Get new remote data via fetch, get existing data in the same time period,
    and merge the two, only keeping the unseen data.

    Parameters
    ----------
    df: Union[None, pd.DataFrame, Dict[str, List[Any]]], default None
        An optional DataFrame to sync into the pipe. Defaults to `None`.

    begin: Optional[datetime.datetime], default None
        Optionally specify the earliest datetime to search for data.
        Defaults to `None`.

    end: Optional[datetime.datetime], default None
        Optionally specify the latest datetime to search for data.
        Defaults to `None`.

    force: bool, default False
        If `True`, keep trying to sync untul `retries` attempts.
        Defaults to `False`.

    retries: int, default 10
        If `force`, how many attempts to try syncing before declaring failure.
        Defaults to `10`.

    min_seconds: Union[int, float], default 1
        If `force`, how many seconds to sleep between retries. Defaults to `1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe.
        Defaults to `True`.

    blocking: bool, default True
        If `True`, wait for sync to finish and return its result, otherwise
        asyncronously sync (oxymoron?) and return success. Defaults to `True`.
        Only intended for specific scenarios.

    workers: Optional[int], default None
        No use directly within `Pipe.sync()`. Instead is passed on to
        instance connectors' `sync_pipe()` methods
        (e.g. `meerschaum.connectors.plugin.PluginConnector`).
        Defaults to `None`.

    callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
        Callback function which expects a SuccessTuple as input.
        Only applies when `blocking=False`.

    error_callback: Optional[Callable[[Exception], Any]], default None
        Callback function which expects an Exception as input.
        Only applies when `blocking=False`.

    chunksize: int, default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.
        Defaults to `-1`.

    sync_chunks: bool, default True
        If possible, sync chunks while fetching them into memory.

    debug: bool, default False
        Verbosity toggle. Defaults to False.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).

    """
    from meerschaum.utils.debug import dprint, _checkpoint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.connectors import custom_types
    from meerschaum.plugins import Plugin
    from meerschaum.utils.formatting import get_console
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.misc import df_is_chunk_generator
    from meerschaum.utils.pool import get_pool
    from meerschaum.config import get_config

    if (callback is not None or error_callback is not None) and blocking:
        warn("Callback functions are only executed when blocking = False. Ignoring...")

    _checkpoint(_total=2, **kw)

    if chunksize == 0:
        chunksize = None
        sync_chunks = False

    ### TODO: Add flag for specifying syncing method.
    begin = _determine_begin(self, begin, debug=debug)
    kw.update({
        'begin': begin, 'end': end, 'force': force, 'retries': retries, 'min_seconds': min_seconds,
        'check_existing': check_existing, 'blocking': blocking, 'workers': workers,
        'callback': callback, 'error_callback': error_callback, 'sync_chunks': sync_chunks,
        'chunksize': chunksize,
    })

    ### NOTE: Invalidate `_exists` cache before and after syncing.
    self._exists = None

    def _sync(
        p: 'meerschaum.Pipe',
        df: Union[
            'pd.DataFrame',
            Dict[str, List[Any]],
            List[Dict[str, Any]],
            InferFetch
        ] = InferFetch,
    ) -> SuccessTuple:
        if df is None:
            p._exists = None
            return (
                False,
                f"You passed `None` instead of data into `sync()` for {p}.\n"
                + "Omit the DataFrame to infer fetching.",
            )
        ### Ensure that Pipe is registered.
        if not p.temporary and p.get_id(debug=debug) is None:
            ### NOTE: This may trigger an interactive session for plugins!
            register_tuple = p.register(debug=debug)
            if not register_tuple[0]:
                p._exists = None
                return register_tuple

        ### If connector is a plugin with a `sync()` method, return that instead.
        ### If the plugin does not have a `sync()` method but does have a `fetch()` method,
        ### use that instead.
        ### NOTE: The DataFrame must be omitted for the plugin sync method to apply.
        ### If a DataFrame is provided, continue as expected.
        if hasattr(df, 'MRSM_INFER_FETCH'):
            try:
                if p.connector is None:
                    msg = f"{p} does not have a valid connector."
                    if p.connector_keys.startswith('plugin:'):
                        msg += f"\n    Perhaps {p.connector_keys} has a syntax error?"
                    p._exists = None
                    return False, msg
            except Exception as e:
                p._exists = None
                return False, f"Unable to create the connector for {p}."

            ### Sync in place if this is a SQL pipe.
            if (
                str(self.connector) == str(self.instance_connector)
                and 
                hasattr(self.instance_connector, 'sync_pipe_inplace')
                and
                get_config('system', 'experimental', 'inplace_sync')
            ):
                with Venv(get_connector_plugin(self.instance_connector)):
                    p._exists = None
                    return self.instance_connector.sync_pipe_inplace(p, debug=debug, **kw)


            ### Activate and invoke `sync(pipe)` for plugin connectors with `sync` methods.
            try:
                if p.connector.type == 'plugin' and p.connector.sync is not None:
                    connector_plugin = Plugin(p.connector.label)
                    with Venv(connector_plugin, debug=debug):
                        return_tuple = p.connector.sync(p, debug=debug, **kw)
                    p._exists = None
                    if not isinstance(return_tuple, tuple):
                        return_tuple = (
                            False,
                            f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}"
                        )
                    return return_tuple

            except Exception as e:
                get_console().print_exception()
                msg = f"Failed to sync {p} with exception: '" + str(e) + "'"
                if debug:
                    error(msg, silent=False)
                p._exists = None
                return False, msg

            ### Fetch the dataframe from the connector's `fetch()` method.
            try:
                ### If was added by `make_connector`, activate the plugin's virtual environment.
                is_custom = p.connector.type in custom_types
                plugin = (
                    Plugin(p.connector.__module__.replace('plugins.', '').split('.')[0])
                    if is_custom else (
                        Plugin(p.connector.label) if p.connector.type == 'plugin'
                        else None
                    )
                )
                with Venv(plugin, debug=debug):
                    df = p.fetch(debug=debug, **kw)

            except Exception as e:
                get_console().print_exception(
                    suppress = [
                        'meerschaum/core/Pipe/_sync.py', 
                        'meerschaum/core/Pipe/_fetch.py', 
                    ]
                )
                msg = f"Failed to fetch data from {p.connector}:\n    {e}"
                df = None

            if df is None:
                p._exists = None
                return False, f"No data were fetched for {p}."

            if isinstance(df, list):
                if len(df) == 0:
                    return True, f"No new rows were returned for {p}."

                ### May be a chunk hook results list.
                if isinstance(df[0], tuple):
                    success = all([_success for _success, _ in df])
                    message = '\n'.join([_message for _, _message in df])
                    return success, message

            ### TODO: Depreciate async?
            if df is True:
                p._exists = None
                return True, f"{p} is being synced in parallel."

        ### CHECKPOINT: Retrieved the DataFrame.
        _checkpoint(**kw)
        
        ### Allow for dataframe generators or iterables.
        if df_is_chunk_generator(df):
            is_thread_safe = getattr(self.instance_connector, 'IS_THREAD_SAFE', False)
            if is_thread_safe:
                engine_pool_size = (
                    p.instance_connector.engine.pool.size()
                    if p.instance_connector.type == 'sql'
                    else None
                )
                current_num_threads = len(threading.enumerate())
                workers = kw.get('workers', None)
                desired_workers = (
                    min(workers or engine_pool_size, engine_pool_size)
                    if engine_pool_size is not None
                    else (workers if is_thread_safe else 1)
                )
                if desired_workers is None:
                    desired_workers = (current_num_threads if is_thread_safe else 1)
                kw['workers'] = max(
                    (desired_workers - current_num_threads),
                    1,
                )
            else:
                kw['workers'] = 1

            dt_col = p.columns.get('datetime', None)


            pool = get_pool(workers=kw.get('workers', 1))
            if debug:
                dprint(f"Received {type(df)}. Attempting to sync first chunk...")
            try:
                chunk = next(df)
            except StopIteration:
                return True, "Received an empty generator; nothing to do."
            chunk_success, chunk_msg = _sync(p, chunk)
            chunk_msg = '\n' + self._get_chunk_label(chunk, dt_col) + '\n' + chunk_msg
            if not chunk_success:
                return chunk_success, f"Unable to sync initial chunk for {p}:\n{chunk_msg}"
            if debug:
                dprint(f"Successfully synced the first chunk, attemping the rest...")

            failed_chunks = []
            def _process_chunk(_chunk):
                try:
                    _chunk_success, _chunk_msg = _sync(p, _chunk)
                except Exception as e:
                    _chunk_success, _chunk_msg = False, str(e)
                if not _chunk_success:
                    failed_chunks.append(_chunk)
                return (
                    _chunk_success,
                    (
                        '\n'
                        + self._get_chunk_label(_chunk, dt_col)
                        + '\n'
                        + _chunk_msg
                    )
                )


            results = sorted(
                [(chunk_success, chunk_msg)] + (
                    list(pool.imap(_process_chunk, df))
                    if not df_is_chunk_generator(chunk)
                    else [
                        _process_chunk(_child_chunks)
                        for _child_chunks in df
                    ]
                )
            )
            chunk_messages = [chunk_msg for _, chunk_msg in results]
            success_bools = [chunk_success for chunk_success, _ in results]
            success = all(success_bools)
            msg = '\n'.join(chunk_messages)

            ### If some chunks succeeded, retry the failures.
            retry_success = True
            if not success and any(success_bools):
                if debug:
                    dprint(f"Retrying failed chunks...")
                chunks_to_retry = [c for c in failed_chunks]
                failed_chunks = []
                for chunk in chunks_to_retry:
                    chunk_success, chunk_msg = _process_chunk(chunk)
                    msg += f"\n\nRetried chunk:\n{chunk_msg}\n"
                    retry_success = retry_success and chunk_success

            success = success and retry_success
            return success, msg

        ### Cast to a dataframe and ensure datatypes are what we expect.
        df = self.enforce_dtypes(df, debug=debug)
        if debug:
            dprint(
                "DataFrame to sync:\n"
                + (str(df)[:255] + '...' if len(str(df)) >= 256 else str(df)),
                **kw
            )

        ### if force, continue to sync until success
        return_tuple = False, f"Did not sync {p}."
        run = True
        _retries = 1
        while run:
            with Venv(get_connector_plugin(self.instance_connector)):
                return_tuple = p.instance_connector.sync_pipe(
                    pipe = p,
                    df = df,
                    debug = debug,
                    **kw
                )
            _retries += 1
            run = (not return_tuple[0]) and force and _retries <= retries
            if run and debug:
                dprint(f"Syncing failed for {p}. Attempt ( {_retries} / {retries} )", **kw)
                dprint(f"Sleeping for {min_seconds} seconds...", **kw)
                time.sleep(min_seconds)
            if _retries > retries:
                warn(
                    f"Unable to sync {p} within {retries} attempt" +
                        ("s" if retries != 1 else "") + "!"
                )

        ### CHECKPOINT: Finished syncing. Handle caching.
        _checkpoint(**kw)
        if self.cache_pipe is not None:
            if debug:
                dprint(f"Caching retrieved dataframe.", **kw)
                _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw)
                if not _sync_cache_tuple[0]:
                    warn(f"Failed to sync local cache for {self}.")

        self._exists = None
        return return_tuple

    if blocking:
        self._exists = None
        return _sync(self, df = df)

    ### TODO implement concurrent syncing (split DataFrame? mimic the functionality of modin?)
    from meerschaum.utils.threading import Thread
    def default_callback(result_tuple : SuccessTuple):
        dprint(f"Asynchronous result from {self}: {result_tuple}", **kw)
    def default_error_callback(x : Exception):
        dprint(f"Error received for {self}: {x}", **kw)
    if callback is None and debug:
        callback = default_callback
    if error_callback is None and debug:
        error_callback = default_error_callback
    try:
        thread = Thread(
            target = _sync,
            args = (self,),
            kwargs = {'df' : df},
            daemon = False,
            callback = callback,
            error_callback = error_callback
        )
        thread.start()
    except Exception as e:
        self._exists = None
        return False, str(e)
    self._exists = None
    return True, f"Spawned asyncronous sync for {self}."
def update(self, *args, **kw) ‑> Tuple[bool, str]

Update a pipe's parameters in its instance.

Expand source code
def update(self, *args, **kw) -> SuccessTuple:
    """
    Update a pipe's parameters in its instance.
    """
    kw['interactive'] = False
    return self.edit(*args, **kw)
class Plugin (name: str, version: Optional[str] = None, user_id: Optional[int] = None, required: Optional[List[str]] = None, attributes: Optional[Dict[str, Any]] = None, archive_path: Optional[pathlib.Path] = None, venv_path: Optional[pathlib.Path] = None, repo_connector: "Optional['meerschaum.connectors.api.APIConnector']" = None, repo: "Union['meerschaum.connectors.api.APIConnector', str, None]" = None)

Handle packaging of Meerschaum plugins.

Expand source code
class Plugin:
    """Handle packaging of Meerschaum plugins."""
    def __init__(
        self,
        name: str,
        version: Optional[str] = None,
        user_id: Optional[int] = None,
        required: Optional[List[str]] = None,
        attributes: Optional[Dict[str, Any]] = None,
        archive_path: Optional[pathlib.Path] = None,
        venv_path: Optional[pathlib.Path] = None,
        repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None,
        repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None,
    ):
        from meerschaum.config.static import STATIC_CONFIG
        sep = STATIC_CONFIG['plugins']['repo_separator']
        _repo = None
        if sep in name:
            try:
                name, _repo = name.split(sep)
            except Exception as e:
                error(f"Invalid plugin name: '{name}'")
        self._repo_in_name = _repo

        if attributes is None:
            attributes = {}
        self.name = name
        self.attributes = attributes
        self.user_id = user_id
        self._version = version
        if required:
            self._required = required
        self.archive_path = (
            archive_path if archive_path is not None
            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
        )
        self.venv_path = (
            venv_path if venv_path is not None
            else VIRTENV_RESOURCES_PATH / self.name
        )
        self._repo_connector = repo_connector
        self._repo_keys = repo


    @property
    def repo_connector(self):
        """
        Return the repository connector for this plugin.
        NOTE: This imports the `connectors` module, which imports certain plugin modules.
        """
        if self._repo_connector is None:
            from meerschaum.connectors.parse import parse_repo_keys

            repo_keys = self._repo_keys or self._repo_in_name
            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
                error(
                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
                )
            repo_connector = parse_repo_keys(repo_keys)
            self._repo_connector = repo_connector
        return self._repo_connector


    @property
    def version(self):
        """
        Return the plugin's module version is defined (`__version__`) if it's defined.
        """
        if self._version is None:
            try:
                self._version = self.module.__version__
            except Exception as e:
                self._version = None
        return self._version


    @property
    def module(self):
        """
        Return the Python module of the underlying plugin.
        """
        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
            if self.__file__ is None:
                return None
            from meerschaum.plugins import import_plugins
            self._module = import_plugins(str(self), warn=False)
        return self._module


    @property
    def __file__(self) -> Union[str, None]:
        """
        Return the file path (str) of the plugin if it exists, otherwise `None`.
        """
        if self.__dict__.get('_module', None) is not None:
            return self.module.__file__

        potential_dir = PLUGINS_RESOURCES_PATH / self.name
        if (
            potential_dir.exists()
            and potential_dir.is_dir()
            and (potential_dir / '__init__.py').exists()
        ):
            return str((potential_dir / '__init__.py').as_posix())

        potential_file = PLUGINS_RESOURCES_PATH / (self.name + '.py')
        if potential_file.exists() and not potential_file.is_dir():
            return str(potential_file.as_posix())

        return None


    @property
    def requirements_file_path(self) -> Union[pathlib.Path, None]:
        """
        If a file named `requirements.txt` exists, return its path.
        """
        if self.__file__ is None:
            return None
        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
        if not path.exists():
            return None
        return path


    def is_installed(self, **kw) -> bool:
        """
        Check whether a plugin is correctly installed.

        Returns
        -------
        A `bool` indicating whether a plugin exists and is successfully imported.
        """
        return self.__file__ is not None


    def make_tar(self, debug: bool = False) -> pathlib.Path:
        """
        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.

        Parameters
        ----------
        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A `pathlib.Path` to the archive file's path.

        """
        import tarfile, pathlib, subprocess, fnmatch
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.packages import attempt_import
        pathspec = attempt_import('pathspec', debug=debug)

        if not self.__file__:
            from meerschaum.utils.warnings import error
            error(f"Could not find file for plugin '{self}'.")
        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
            path = self.__file__.replace('__init__.py', '')
            is_dir = True
        else:
            path = self.__file__
            is_dir = False

        old_cwd = os.getcwd()
        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
        os.chdir(real_parent_path)

        default_patterns_to_ignore = [
            '.pyc',
            '__pycache__/',
            'eggs/',
            '__pypackages__/',
            '.git',
        ]

        def parse_gitignore() -> 'Set[str]':
            gitignore_path = pathlib.Path(path) / '.gitignore'
            if not gitignore_path.exists():
                return set()
            with open(gitignore_path, 'r', encoding='utf-8') as f:
                gitignore_text = f.read()
            return set(pathspec.PathSpec.from_lines(
                pathspec.patterns.GitWildMatchPattern,
                default_patterns_to_ignore + gitignore_text.splitlines()
            ).match_tree(path))

        patterns_to_ignore = parse_gitignore() if is_dir else set()

        if debug:
            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")

        with tarfile.open(self.archive_path, 'w:gz') as tarf:
            if not is_dir:
                tarf.add(f"{self.name}.py")
            else:
                for root, dirs, files in os.walk(self.name):
                    for f in files:
                        good_file = True
                        fp = os.path.join(root, f)
                        for pattern in patterns_to_ignore:
                            if pattern in str(fp) or f.startswith('.'):
                                good_file = False
                                break
                        if good_file:
                            if debug:
                                dprint(f"Adding '{fp}'...")
                            tarf.add(fp)

        ### clean up and change back to old directory
        os.chdir(old_cwd)

        ### change to 775 to avoid permissions issues with the API in a Docker container
        self.archive_path.chmod(0o775)

        if debug:
            dprint(f"Created archive '{self.archive_path}'.")
        return self.archive_path


    def install(
            self,
            force: bool = False,
            debug: bool = False,
        ) -> SuccessTuple:
        """
        Extract a plugin's tar archive to the plugins directory.
        
        This function checks if the plugin is already installed and if the version is equal or
        greater than the existing installation.

        Parameters
        ----------
        force: bool, default False
            If `True`, continue with installation, even if required packages fail to install.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A `SuccessTuple` of success (bool) and a message (str).

        """
        if self.full_name in _ongoing_installations:
            return True, f"Already installing plugin '{self}'."
        _ongoing_installations.add(self.full_name)
        from meerschaum.utils.warnings import warn, error
        if debug:
            from meerschaum.utils.debug import dprint
        import tarfile
        import re
        import ast
        from meerschaum.plugins import reload_plugins, sync_plugins_symlinks
        from meerschaum.utils.packages import attempt_import, determine_version, reload_package
        from meerschaum.utils.venv import init_venv
        from meerschaum.utils.misc import safely_extract_tar
        old_cwd = os.getcwd()
        old_version = ''
        new_version = ''
        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
        temp_dir.mkdir(exist_ok=True)

        if not self.archive_path.exists():
            return False, f"Missing archive file for plugin '{self}'."
        if self.version is not None:
            old_version = self.version
            if debug:
                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")

        if debug:
            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")

        try:
            with tarfile.open(self.archive_path, 'r:gz') as tarf:
                safely_extract_tar(tarf, temp_dir)
        except Exception as e:
            warn(e)
            return False, f"Failed to extract plugin '{self.name}'."

        ### search for version information
        files = os.listdir(temp_dir)
        
        if str(files[0]) == self.name:
            is_dir = True
        elif str(files[0]) == self.name + '.py':
            is_dir = False
        else:
            error(f"Unknown format encountered for plugin '{self}'.")

        fpath = temp_dir / files[0]
        if is_dir:
            fpath = fpath / '__init__.py'

        init_venv(self.name, debug=debug)
        with open(fpath, 'r', encoding='utf-8') as f:
            init_lines = f.readlines()
        new_version = None
        for line in init_lines:
            if '__version__' not in line:
                continue
            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
            if not version_match:
                continue
            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
            break
        if not new_version:
            warn(
                f"No `__version__` defined for plugin '{self}'. "
                + "Assuming new version...",
                stack = False,
            )

        packaging_version = attempt_import('packaging.version')
        try:
            is_new_version = (not new_version and not old_version) or (
                packaging_version.parse(old_version) < packaging_version.parse(new_version)
            )
            is_same_version = new_version and old_version and (
                packaging_version.parse(old_version) == packaging_version.parse(new_version)
            )
        except Exception as e:
            is_new_version, is_same_version = True, False

        ### Determine where to permanently store the new plugin.
        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
        for path in PLUGINS_DIR_PATHS:
            files_in_plugins_dir = os.listdir(path)
            if (
                self.name in files_in_plugins_dir
                or
                (self.name + '.py') in files_in_plugins_dir
            ):
                plugin_installation_dir_path = path
                break

        success_msg = f"Successfully installed plugin '{self}'."
        success, abort = None, None

        if is_same_version and not force:
            success, msg = True, (
                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
                "    Install again with `-f` or `--force` to reinstall."
            )
            abort = True
        elif is_new_version or force:
            for src_dir, dirs, files in os.walk(temp_dir):
                if success is not None:
                    break
                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
                if not os.path.exists(dst_dir):
                    os.mkdir(dst_dir)
                for f in files:
                    src_file = os.path.join(src_dir, f)
                    dst_file = os.path.join(dst_dir, f)
                    if os.path.exists(dst_file):
                        os.remove(dst_file)

                    if debug:
                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
                    try:
                        shutil.move(src_file, dst_dir)
                    except Exception as e:
                        success, msg = False, (
                            f"Failed to install plugin '{self}': " +
                            f"Could not move file '{src_file}' to '{dst_dir}'"
                        )
                        print(msg)
                        break
            if success is None:
                success, msg = True, success_msg
        else:
            success, msg = False, (
                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
                + f"attempted version {new_version}."
            )

        shutil.rmtree(temp_dir)
        os.chdir(old_cwd)

        ### Reload the plugin's module.
        sync_plugins_symlinks(debug=debug)
        if '_module' in self.__dict__:
            del self.__dict__['_module']
        init_venv(venv=self.name, force=True, debug=debug)
        reload_package('meerschaum')
        reload_plugins([self.name], debug=debug)

        ### if we've already failed, return here
        if not success or abort:
            _ongoing_installations.remove(self.full_name)
            return success, msg

        ### attempt to install dependencies
        if not self.install_dependencies(force=force, debug=debug):
            _ongoing_installations.remove(self.full_name)
            return False, f"Failed to install dependencies for plugin '{self}'."

        ### handling success tuple, bool, or other (typically None)
        setup_tuple = self.setup(debug=debug)
        if isinstance(setup_tuple, tuple):
            if not setup_tuple[0]:
                success, msg = setup_tuple
        elif isinstance(setup_tuple, bool):
            if not setup_tuple:
                success, msg = False, (
                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
                    f"Check `setup()` in '{self.__file__}' for more information " +
                    f"(no error message provided)."
                )
            else:
                success, msg = True, success_msg
        elif setup_tuple is None:
            success = True
            msg = (
                f"Post-install for plugin '{self}' returned None. " +
                f"Assuming plugin successfully installed."
            )
            warn(msg)
        else:
            success = False
            msg = (
                f"Post-install for plugin '{self}' returned unexpected value " +
                f"of type '{type(setup_tuple)}': {setup_tuple}"
            )

        _ongoing_installations.remove(self.full_name)
        module = self.module
        return success, msg


    def remove_archive(
            self,        
            debug: bool = False
        ) -> SuccessTuple:
        """Remove a plugin's archive file."""
        if not self.archive_path.exists():
            return True, f"Archive file for plugin '{self}' does not exist."
        try:
            self.archive_path.unlink()
        except Exception as e:
            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
        return True, "Success"


    def remove_venv(
            self,        
            debug: bool = False
        ) -> SuccessTuple:
        """Remove a plugin's virtual environment."""
        if not self.venv_path.exists():
            return True, f"Virtual environment for plugin '{self}' does not exist."
        try:
            shutil.rmtree(self.venv_path)
        except Exception as e:
            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
        return True, "Success"


    def uninstall(self, debug: bool = False) -> SuccessTuple:
        """
        Remove a plugin, its virtual environment, and archive file.
        """
        from meerschaum.utils.packages import reload_package
        from meerschaum.plugins import reload_plugins, sync_plugins_symlinks
        from meerschaum.utils.warnings import warn, info
        warnings_thrown_count: int = 0
        max_warnings: int = 3

        if not self.is_installed():
            info(
                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
                + "Checking for artifacts...",
                stack = False,
            )
        else:
            real_path = pathlib.Path(os.path.realpath(self.__file__))
            try:
                if real_path.name == '__init__.py':
                    shutil.rmtree(real_path.parent)
                else:
                    real_path.unlink()
            except Exception as e:
                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
                warnings_thrown_count += 1
            else:
                info(f"Removed source files for plugin '{self.name}'.")

        if self.venv_path.exists():
            success, msg = self.remove_venv(debug=debug)
            if not success:
                warn(msg, stack=False)
                warnings_thrown_count += 1
            else:
                info(f"Removed virtual environment from plugin '{self.name}'.")

        success = warnings_thrown_count < max_warnings
        sync_plugins_symlinks(debug=debug)
        self.deactivate_venv(force=True, debug=debug)
        reload_package('meerschaum')
        reload_plugins(debug=debug)
        return success, (
            f"Successfully uninstalled plugin '{self}'." if success
            else f"Failed to uninstall plugin '{self}'."
        )


    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
        """
        If exists, run the plugin's `setup()` function.

        Parameters
        ----------
        *args: str
            The positional arguments passed to the `setup()` function.
            
        debug: bool, default False
            Verbosity toggle.

        **kw: Any
            The keyword arguments passed to the `setup()` function.

        Returns
        -------
        A `SuccessTuple` or `bool` indicating success.

        """
        from meerschaum.utils.debug import dprint
        import inspect
        _setup = None
        for name, fp in inspect.getmembers(self.module):
            if name == 'setup' and inspect.isfunction(fp):
                _setup = fp
                break

        ### assume success if no setup() is found (not necessary)
        if _setup is None:
            return True

        sig = inspect.signature(_setup)
        has_debug, has_kw = ('debug' in sig.parameters), False
        for k, v in sig.parameters.items():
            if '**' in str(v):
                has_kw = True
                break

        _kw = {}
        if has_kw:
            _kw.update(kw)
        if has_debug:
            _kw['debug'] = debug

        if debug:
            dprint(f"Running setup for plugin '{self}'...")
        try:
            self.activate_venv(debug=debug)
            return_tuple = _setup(*args, **_kw)
            self.deactivate_venv(debug=debug)
        except Exception as e:
            return False, str(e)

        if isinstance(return_tuple, tuple):
            return return_tuple
        if isinstance(return_tuple, bool):
            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
        if return_tuple is None:
            return False, f"Setup for Plugin '{self.name}' returned None."
        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"


    def get_dependencies(
            self,
            debug: bool = False,
        ) -> List[str]:
        """
        If the Plugin has specified dependencies in a list called `required`, return the list.
        
        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
        Meerschaum plugins may also specify connector keys for a repo after `'@'`.

        Parameters
        ----------
        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A list of required packages and plugins (str).

        """
        if '_required' in self.__dict__:
            return self._required

        ### If the plugin has not yet been imported,
        ### infer the dependencies from the source text.
        ### This is not super robust, and it doesn't feel right
        ### having multiple versions of the logic.
        ### This is necessary when determining the activation order
        ### without having import the module.
        ### For consistency's sake, the module-less method does not cache the requirements.
        if self.__dict__.get('_module', None) is None:
            file_path = self.__file__
            if file_path is None:
                return []
            with open(file_path, 'r', encoding='utf-8') as f:
                text = f.read()

            if 'required' not in text:
                return []

            ### This has some limitations:
            ### It relies on `required` being manually declared.
            ### We lose the ability to dynamically alter the `required` list,
            ### which is why we've kept the module-reliant method below.
            import ast, re
            ### NOTE: This technically would break 
            ### if `required` was the very first line of the file.
            req_start_match = re.search(r'\nrequired(\s?)=', text)
            if not req_start_match:
                return []
            req_start = req_start_match.start()

            ### Dependencies may have brackets within the strings, so push back the index.
            first_opening_brace = req_start + 1 + text[req_start:].find('[')
            if first_opening_brace == -1:
                return []

            next_closing_brace = req_start + 1 + text[req_start:].find(']')
            if next_closing_brace == -1:
                return []

            start_ix = first_opening_brace + 1
            end_ix = next_closing_brace

            num_braces = 0
            while True:
                if '[' not in text[start_ix:end_ix]:
                    break
                num_braces += 1
                start_ix = end_ix
                end_ix += text[end_ix + 1:].find(']') + 1

            req_end = end_ix + 1
            req_text = (
                text[req_start:req_end]
                .lstrip()
                .replace('required', '', 1)
                .lstrip()
                .replace('=', '', 1)
                .lstrip()
            )
            try:
                required = ast.literal_eval(req_text)
            except Exception as e:
                warn(
                    f"Unable to determine requirements for plugin '{self.name}' "
                    + "without importing the module.\n"
                    + "    This may be due to dynamically setting the global `required` list.\n"
                    + f"    {e}"
                )
                return []
            return required

        import inspect
        self.activate_venv(dependencies=False, debug=debug)
        required = []
        for name, val in inspect.getmembers(self.module):
            if name == 'required':
                required = val
                break
        self._required = required
        self.deactivate_venv(dependencies=False, debug=debug)
        return required


    def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
        """
        Return a list of required Plugin objects.
        """
        from meerschaum.utils.warnings import warn
        from meerschaum.config import get_config
        from meerschaum.config.static import STATIC_CONFIG
        plugins = []
        _deps = self.get_dependencies(debug=debug)
        sep = STATIC_CONFIG['plugins']['repo_separator']
        plugin_names = [
            _d[len('plugin:'):] for _d in _deps
            if _d.startswith('plugin:') and len(_d) > len('plugin:')
        ]
        default_repo_keys = get_config('meerschaum', 'default_repository')
        for _plugin_name in plugin_names:
            if sep in _plugin_name:
                try:
                    _plugin_name, _repo_keys = _plugin_name.split(sep)
                except Exception as e:
                    _repo_keys = default_repo_keys
                    warn(
                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
                        + f"Will try to use '{_repo_keys}' instead.",
                        stack = False,
                    )
            else:
                _repo_keys = default_repo_keys
            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
        return plugins


    def get_required_packages(self, debug: bool=False) -> List[str]:
        """
        Return the required package names (excluding plugins).
        """
        _deps = self.get_dependencies(debug=debug)
        return [_d for _d in _deps if not _d.startswith('plugin:')]


    def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
        """
        Activate the virtual environments for the plugin and its dependencies.

        Parameters
        ----------
        dependencies: bool, default True
            If `True`, activate the virtual environments for required plugins.

        Returns
        -------
        A bool indicating success.
        """
        from meerschaum.utils.venv import venv_target_path
        from meerschaum.utils.packages import activate_venv
        from meerschaum.utils.misc import make_symlink, is_symlink
        from meerschaum.config._paths import PACKAGE_ROOT_PATH

        if dependencies:
            for plugin in self.get_required_plugins(debug=debug):
                plugin.activate_venv(debug=debug, **kw)

        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
        venv_meerschaum_path = vtp / 'meerschaum'

        try:
            success, msg = True, "Success"
            if is_symlink(venv_meerschaum_path):
                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
                    venv_meerschaum_path.unlink()
                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
        except Exception as e:
            success, msg = False, str(e)
        if not success:
            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")

        return activate_venv(self.name, debug=debug, **kw)


    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
        """
        Deactivate the virtual environments for the plugin and its dependencies.

        Parameters
        ----------
        dependencies: bool, default True
            If `True`, deactivate the virtual environments for required plugins.

        Returns
        -------
        A bool indicating success.
        """
        from meerschaum.utils.packages import deactivate_venv
        success = deactivate_venv(self.name, debug=debug, **kw)
        if dependencies:
            for plugin in self.get_required_plugins(debug=debug):
                plugin.deactivate_venv(debug=debug, **kw)
        return success


    def install_dependencies(
            self,
            force: bool = False,
            debug: bool = False,
        ) -> bool:
        """
        If specified, install dependencies.
        
        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
        Meerschaum plugins from the same repository as this Plugin.
        To install from a different repository, add the repo keys after `'@'`
        (e.g. `'plugin:foo@api:bar'`).

        Parameters
        ----------
        force: bool, default False
            If `True`, continue with the installation, even if some
            required packages fail to install.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A bool indicating success.

        """
        from meerschaum.utils.packages import pip_install, venv_contains_package
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.warnings import warn, info
        from meerschaum.connectors.parse import parse_repo_keys
        _deps = self.get_dependencies(debug=debug)
        if not _deps and self.requirements_file_path is None:
            return True

        plugins = self.get_required_plugins(debug=debug)
        for _plugin in plugins:
            if _plugin.name == self.name:
                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
                continue
            _success, _msg = _plugin.repo_connector.install_plugin(
                _plugin.name, debug=debug, force=force
            )
            if not _success:
                warn(
                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
                    + f" for plugin '{self.name}':\n" + _msg,
                    stack = False,
                )
                if not force:
                    warn(
                        "Try installing with the `--force` flag to continue anyway.",
                        stack = False,
                    )
                    return False
                info(
                    "Continuing with installation despite the failure "
                    + "(careful, things might be broken!)...",
                    icon = False
                )


        ### First step: parse `requirements.txt` if it exists.
        if self.requirements_file_path is not None:
            if not pip_install(
                requirements_file_path=self.requirements_file_path,
                venv=self.name, debug=debug
            ):
                warn(
                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
                    stack = False,
                )
                if not force:
                    warn(
                        "Try installing with `--force` to continue anyway.",
                        stack = False,
                    )
                    return False
                info(
                    "Continuing with installation despite the failure "
                    + "(careful, things might be broken!)...",
                    icon = False
                )


        ### Don't reinstall packages that are already included in required plugins.
        packages = []
        _packages = self.get_required_packages(debug=debug)
        accounted_for_packages = set()
        for package_name in _packages:
            for plugin in plugins:
                if venv_contains_package(package_name, plugin.name):
                    accounted_for_packages.add(package_name)
                    break
        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]

        ### Attempt pip packages installation.
        if packages:
            for package in packages:
                if not pip_install(package, venv=self.name, debug=debug):
                    warn(
                        f"Failed to install required package '{package}'"
                        + f" for plugin '{self.name}'.",
                        stack = False,
                    )
                    if not force:
                        warn(
                            "Try installing with `--force` to continue anyway.",
                            stack = False,
                        )
                        return False
                    info(
                        "Continuing with installation despite the failure "
                        + "(careful, things might be broken!)...",
                        icon = False
                    )
        return True


    @property
    def full_name(self) -> str:
        """
        Include the repo keys with the plugin's name.
        """
        from meerschaum.config.static import STATIC_CONFIG
        sep = STATIC_CONFIG['plugins']['repo_separator']
        return self.name + sep + str(self.repo_connector)


    def __str__(self):
        return self.name


    def __repr__(self):
        return f"Plugin('{self.name}', repo='{self.repo_connector}')"


    def __del__(self):
        pass

Instance variables

var full_name : str

Include the repo keys with the plugin's name.

Expand source code
@property
def full_name(self) -> str:
    """
    Include the repo keys with the plugin's name.
    """
    from meerschaum.config.static import STATIC_CONFIG
    sep = STATIC_CONFIG['plugins']['repo_separator']
    return self.name + sep + str(self.repo_connector)
var module

Return the Python module of the underlying plugin.

Expand source code
@property
def module(self):
    """
    Return the Python module of the underlying plugin.
    """
    if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
        if self.__file__ is None:
            return None
        from meerschaum.plugins import import_plugins
        self._module = import_plugins(str(self), warn=False)
    return self._module
var repo_connector

Return the repository connector for this plugin. NOTE: This imports the meerschaum.connectors module, which imports certain plugin modules.

Expand source code
@property
def repo_connector(self):
    """
    Return the repository connector for this plugin.
    NOTE: This imports the `connectors` module, which imports certain plugin modules.
    """
    if self._repo_connector is None:
        from meerschaum.connectors.parse import parse_repo_keys

        repo_keys = self._repo_keys or self._repo_in_name
        if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
            error(
                f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
            )
        repo_connector = parse_repo_keys(repo_keys)
        self._repo_connector = repo_connector
    return self._repo_connector
var requirements_file_path : Optional[pathlib.Path]

If a file named requirements.txt exists, return its path.

Expand source code
@property
def requirements_file_path(self) -> Union[pathlib.Path, None]:
    """
    If a file named `requirements.txt` exists, return its path.
    """
    if self.__file__ is None:
        return None
    path = pathlib.Path(self.__file__).parent / 'requirements.txt'
    if not path.exists():
        return None
    return path
var version

Return the plugin's module version is defined (__version__) if it's defined.

Expand source code
@property
def version(self):
    """
    Return the plugin's module version is defined (`__version__`) if it's defined.
    """
    if self._version is None:
        try:
            self._version = self.module.__version__
        except Exception as e:
            self._version = None
    return self._version

Methods

def activate_venv(self, dependencies: bool = True, debug: bool = False, **kw) ‑> bool

Activate the virtual environments for the plugin and its dependencies.

Parameters

dependencies : bool, default True
If True, activate the virtual environments for required plugins.

Returns

A bool indicating success.

Expand source code
def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
    """
    Activate the virtual environments for the plugin and its dependencies.

    Parameters
    ----------
    dependencies: bool, default True
        If `True`, activate the virtual environments for required plugins.

    Returns
    -------
    A bool indicating success.
    """
    from meerschaum.utils.venv import venv_target_path
    from meerschaum.utils.packages import activate_venv
    from meerschaum.utils.misc import make_symlink, is_symlink
    from meerschaum.config._paths import PACKAGE_ROOT_PATH

    if dependencies:
        for plugin in self.get_required_plugins(debug=debug):
            plugin.activate_venv(debug=debug, **kw)

    vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
    venv_meerschaum_path = vtp / 'meerschaum'

    try:
        success, msg = True, "Success"
        if is_symlink(venv_meerschaum_path):
            if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
                venv_meerschaum_path.unlink()
                success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
    except Exception as e:
        success, msg = False, str(e)
    if not success:
        warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")

    return activate_venv(self.name, debug=debug, **kw)
def deactivate_venv(self, dependencies: bool = True, debug: bool = False, **kw) ‑> bool

Deactivate the virtual environments for the plugin and its dependencies.

Parameters

dependencies : bool, default True
If True, deactivate the virtual environments for required plugins.

Returns

A bool indicating success.

Expand source code
def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
    """
    Deactivate the virtual environments for the plugin and its dependencies.

    Parameters
    ----------
    dependencies: bool, default True
        If `True`, deactivate the virtual environments for required plugins.

    Returns
    -------
    A bool indicating success.
    """
    from meerschaum.utils.packages import deactivate_venv
    success = deactivate_venv(self.name, debug=debug, **kw)
    if dependencies:
        for plugin in self.get_required_plugins(debug=debug):
            plugin.deactivate_venv(debug=debug, **kw)
    return success
def get_dependencies(self, debug: bool = False) ‑> List[str]

If the Plugin has specified dependencies in a list called required, return the list.

NOTE: Dependecies which start with 'plugin:' are Meerschaum plugins, not pip packages. Meerschaum plugins may also specify connector keys for a repo after '@'.

Parameters

debug : bool, default False
Verbosity toggle.

Returns

A list of required packages and plugins (str).

Expand source code
def get_dependencies(
        self,
        debug: bool = False,
    ) -> List[str]:
    """
    If the Plugin has specified dependencies in a list called `required`, return the list.
    
    **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
    Meerschaum plugins may also specify connector keys for a repo after `'@'`.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of required packages and plugins (str).

    """
    if '_required' in self.__dict__:
        return self._required

    ### If the plugin has not yet been imported,
    ### infer the dependencies from the source text.
    ### This is not super robust, and it doesn't feel right
    ### having multiple versions of the logic.
    ### This is necessary when determining the activation order
    ### without having import the module.
    ### For consistency's sake, the module-less method does not cache the requirements.
    if self.__dict__.get('_module', None) is None:
        file_path = self.__file__
        if file_path is None:
            return []
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()

        if 'required' not in text:
            return []

        ### This has some limitations:
        ### It relies on `required` being manually declared.
        ### We lose the ability to dynamically alter the `required` list,
        ### which is why we've kept the module-reliant method below.
        import ast, re
        ### NOTE: This technically would break 
        ### if `required` was the very first line of the file.
        req_start_match = re.search(r'\nrequired(\s?)=', text)
        if not req_start_match:
            return []
        req_start = req_start_match.start()

        ### Dependencies may have brackets within the strings, so push back the index.
        first_opening_brace = req_start + 1 + text[req_start:].find('[')
        if first_opening_brace == -1:
            return []

        next_closing_brace = req_start + 1 + text[req_start:].find(']')
        if next_closing_brace == -1:
            return []

        start_ix = first_opening_brace + 1
        end_ix = next_closing_brace

        num_braces = 0
        while True:
            if '[' not in text[start_ix:end_ix]:
                break
            num_braces += 1
            start_ix = end_ix
            end_ix += text[end_ix + 1:].find(']') + 1

        req_end = end_ix + 1
        req_text = (
            text[req_start:req_end]
            .lstrip()
            .replace('required', '', 1)
            .lstrip()
            .replace('=', '', 1)
            .lstrip()
        )
        try:
            required = ast.literal_eval(req_text)
        except Exception as e:
            warn(
                f"Unable to determine requirements for plugin '{self.name}' "
                + "without importing the module.\n"
                + "    This may be due to dynamically setting the global `required` list.\n"
                + f"    {e}"
            )
            return []
        return required

    import inspect
    self.activate_venv(dependencies=False, debug=debug)
    required = []
    for name, val in inspect.getmembers(self.module):
        if name == 'required':
            required = val
            break
    self._required = required
    self.deactivate_venv(dependencies=False, debug=debug)
    return required
def get_required_packages(self, debug: bool = False) ‑> List[str]

Return the required package names (excluding plugins).

Expand source code
def get_required_packages(self, debug: bool=False) -> List[str]:
    """
    Return the required package names (excluding plugins).
    """
    _deps = self.get_dependencies(debug=debug)
    return [_d for _d in _deps if not _d.startswith('plugin:')]
def get_required_plugins(self, debug: bool = False) ‑> List[Plugin]

Return a list of required Plugin objects.

Expand source code
def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
    """
    Return a list of required Plugin objects.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    plugins = []
    _deps = self.get_dependencies(debug=debug)
    sep = STATIC_CONFIG['plugins']['repo_separator']
    plugin_names = [
        _d[len('plugin:'):] for _d in _deps
        if _d.startswith('plugin:') and len(_d) > len('plugin:')
    ]
    default_repo_keys = get_config('meerschaum', 'default_repository')
    for _plugin_name in plugin_names:
        if sep in _plugin_name:
            try:
                _plugin_name, _repo_keys = _plugin_name.split(sep)
            except Exception as e:
                _repo_keys = default_repo_keys
                warn(
                    f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
                    + f"Will try to use '{_repo_keys}' instead.",
                    stack = False,
                )
        else:
            _repo_keys = default_repo_keys
        plugins.append(Plugin(_plugin_name, repo=_repo_keys))
    return plugins
def install(self, force: bool = False, debug: bool = False) ‑> Tuple[bool, str]

Extract a plugin's tar archive to the plugins directory.

This function checks if the plugin is already installed and if the version is equal or greater than the existing installation.

Parameters

force : bool, default False
If True, continue with installation, even if required packages fail to install.
debug : bool, default False
Verbosity toggle.

Returns

A SuccessTuple of success (bool) and a message (str).

Expand source code
def install(
        self,
        force: bool = False,
        debug: bool = False,
    ) -> SuccessTuple:
    """
    Extract a plugin's tar archive to the plugins directory.
    
    This function checks if the plugin is already installed and if the version is equal or
    greater than the existing installation.

    Parameters
    ----------
    force: bool, default False
        If `True`, continue with installation, even if required packages fail to install.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (bool) and a message (str).

    """
    if self.full_name in _ongoing_installations:
        return True, f"Already installing plugin '{self}'."
    _ongoing_installations.add(self.full_name)
    from meerschaum.utils.warnings import warn, error
    if debug:
        from meerschaum.utils.debug import dprint
    import tarfile
    import re
    import ast
    from meerschaum.plugins import reload_plugins, sync_plugins_symlinks
    from meerschaum.utils.packages import attempt_import, determine_version, reload_package
    from meerschaum.utils.venv import init_venv
    from meerschaum.utils.misc import safely_extract_tar
    old_cwd = os.getcwd()
    old_version = ''
    new_version = ''
    temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
    temp_dir.mkdir(exist_ok=True)

    if not self.archive_path.exists():
        return False, f"Missing archive file for plugin '{self}'."
    if self.version is not None:
        old_version = self.version
        if debug:
            dprint(f"Found existing version '{old_version}' for plugin '{self}'.")

    if debug:
        dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")

    try:
        with tarfile.open(self.archive_path, 'r:gz') as tarf:
            safely_extract_tar(tarf, temp_dir)
    except Exception as e:
        warn(e)
        return False, f"Failed to extract plugin '{self.name}'."

    ### search for version information
    files = os.listdir(temp_dir)
    
    if str(files[0]) == self.name:
        is_dir = True
    elif str(files[0]) == self.name + '.py':
        is_dir = False
    else:
        error(f"Unknown format encountered for plugin '{self}'.")

    fpath = temp_dir / files[0]
    if is_dir:
        fpath = fpath / '__init__.py'

    init_venv(self.name, debug=debug)
    with open(fpath, 'r', encoding='utf-8') as f:
        init_lines = f.readlines()
    new_version = None
    for line in init_lines:
        if '__version__' not in line:
            continue
        version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
        if not version_match:
            continue
        new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
        break
    if not new_version:
        warn(
            f"No `__version__` defined for plugin '{self}'. "
            + "Assuming new version...",
            stack = False,
        )

    packaging_version = attempt_import('packaging.version')
    try:
        is_new_version = (not new_version and not old_version) or (
            packaging_version.parse(old_version) < packaging_version.parse(new_version)
        )
        is_same_version = new_version and old_version and (
            packaging_version.parse(old_version) == packaging_version.parse(new_version)
        )
    except Exception as e:
        is_new_version, is_same_version = True, False

    ### Determine where to permanently store the new plugin.
    plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
    for path in PLUGINS_DIR_PATHS:
        files_in_plugins_dir = os.listdir(path)
        if (
            self.name in files_in_plugins_dir
            or
            (self.name + '.py') in files_in_plugins_dir
        ):
            plugin_installation_dir_path = path
            break

    success_msg = f"Successfully installed plugin '{self}'."
    success, abort = None, None

    if is_same_version and not force:
        success, msg = True, (
            f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
            "    Install again with `-f` or `--force` to reinstall."
        )
        abort = True
    elif is_new_version or force:
        for src_dir, dirs, files in os.walk(temp_dir):
            if success is not None:
                break
            dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
            if not os.path.exists(dst_dir):
                os.mkdir(dst_dir)
            for f in files:
                src_file = os.path.join(src_dir, f)
                dst_file = os.path.join(dst_dir, f)
                if os.path.exists(dst_file):
                    os.remove(dst_file)

                if debug:
                    dprint(f"Moving '{src_file}' to '{dst_dir}'...")
                try:
                    shutil.move(src_file, dst_dir)
                except Exception as e:
                    success, msg = False, (
                        f"Failed to install plugin '{self}': " +
                        f"Could not move file '{src_file}' to '{dst_dir}'"
                    )
                    print(msg)
                    break
        if success is None:
            success, msg = True, success_msg
    else:
        success, msg = False, (
            f"Your installed version of plugin '{self}' ({old_version}) is higher than "
            + f"attempted version {new_version}."
        )

    shutil.rmtree(temp_dir)
    os.chdir(old_cwd)

    ### Reload the plugin's module.
    sync_plugins_symlinks(debug=debug)
    if '_module' in self.__dict__:
        del self.__dict__['_module']
    init_venv(venv=self.name, force=True, debug=debug)
    reload_package('meerschaum')
    reload_plugins([self.name], debug=debug)

    ### if we've already failed, return here
    if not success or abort:
        _ongoing_installations.remove(self.full_name)
        return success, msg

    ### attempt to install dependencies
    if not self.install_dependencies(force=force, debug=debug):
        _ongoing_installations.remove(self.full_name)
        return False, f"Failed to install dependencies for plugin '{self}'."

    ### handling success tuple, bool, or other (typically None)
    setup_tuple = self.setup(debug=debug)
    if isinstance(setup_tuple, tuple):
        if not setup_tuple[0]:
            success, msg = setup_tuple
    elif isinstance(setup_tuple, bool):
        if not setup_tuple:
            success, msg = False, (
                f"Failed to run post-install setup for plugin '{self}'." + '\n' +
                f"Check `setup()` in '{self.__file__}' for more information " +
                f"(no error message provided)."
            )
        else:
            success, msg = True, success_msg
    elif setup_tuple is None:
        success = True
        msg = (
            f"Post-install for plugin '{self}' returned None. " +
            f"Assuming plugin successfully installed."
        )
        warn(msg)
    else:
        success = False
        msg = (
            f"Post-install for plugin '{self}' returned unexpected value " +
            f"of type '{type(setup_tuple)}': {setup_tuple}"
        )

    _ongoing_installations.remove(self.full_name)
    module = self.module
    return success, msg
def install_dependencies(self, force: bool = False, debug: bool = False) ‑> bool

If specified, install dependencies.

NOTE: Dependencies that start with 'plugin:' will be installed as Meerschaum plugins from the same repository as this Plugin. To install from a different repository, add the repo keys after '@' (e.g. 'plugin:foo@api:bar').

Parameters

force : bool, default False
If True, continue with the installation, even if some required packages fail to install.
debug : bool, default False
Verbosity toggle.

Returns

A bool indicating success.

Expand source code
def install_dependencies(
        self,
        force: bool = False,
        debug: bool = False,
    ) -> bool:
    """
    If specified, install dependencies.
    
    **NOTE:** Dependencies that start with `'plugin:'` will be installed as
    Meerschaum plugins from the same repository as this Plugin.
    To install from a different repository, add the repo keys after `'@'`
    (e.g. `'plugin:foo@api:bar'`).

    Parameters
    ----------
    force: bool, default False
        If `True`, continue with the installation, even if some
        required packages fail to install.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating success.

    """
    from meerschaum.utils.packages import pip_install, venv_contains_package
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, info
    from meerschaum.connectors.parse import parse_repo_keys
    _deps = self.get_dependencies(debug=debug)
    if not _deps and self.requirements_file_path is None:
        return True

    plugins = self.get_required_plugins(debug=debug)
    for _plugin in plugins:
        if _plugin.name == self.name:
            warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
            continue
        _success, _msg = _plugin.repo_connector.install_plugin(
            _plugin.name, debug=debug, force=force
        )
        if not _success:
            warn(
                f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
                + f" for plugin '{self.name}':\n" + _msg,
                stack = False,
            )
            if not force:
                warn(
                    "Try installing with the `--force` flag to continue anyway.",
                    stack = False,
                )
                return False
            info(
                "Continuing with installation despite the failure "
                + "(careful, things might be broken!)...",
                icon = False
            )


    ### First step: parse `requirements.txt` if it exists.
    if self.requirements_file_path is not None:
        if not pip_install(
            requirements_file_path=self.requirements_file_path,
            venv=self.name, debug=debug
        ):
            warn(
                f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
                stack = False,
            )
            if not force:
                warn(
                    "Try installing with `--force` to continue anyway.",
                    stack = False,
                )
                return False
            info(
                "Continuing with installation despite the failure "
                + "(careful, things might be broken!)...",
                icon = False
            )


    ### Don't reinstall packages that are already included in required plugins.
    packages = []
    _packages = self.get_required_packages(debug=debug)
    accounted_for_packages = set()
    for package_name in _packages:
        for plugin in plugins:
            if venv_contains_package(package_name, plugin.name):
                accounted_for_packages.add(package_name)
                break
    packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]

    ### Attempt pip packages installation.
    if packages:
        for package in packages:
            if not pip_install(package, venv=self.name, debug=debug):
                warn(
                    f"Failed to install required package '{package}'"
                    + f" for plugin '{self.name}'.",
                    stack = False,
                )
                if not force:
                    warn(
                        "Try installing with `--force` to continue anyway.",
                        stack = False,
                    )
                    return False
                info(
                    "Continuing with installation despite the failure "
                    + "(careful, things might be broken!)...",
                    icon = False
                )
    return True
def is_installed(self, **kw) ‑> bool

Check whether a plugin is correctly installed.

Returns

A bool indicating whether a plugin exists and is successfully imported.

Expand source code
def is_installed(self, **kw) -> bool:
    """
    Check whether a plugin is correctly installed.

    Returns
    -------
    A `bool` indicating whether a plugin exists and is successfully imported.
    """
    return self.__file__ is not None
def make_tar(self, debug: bool = False) ‑> pathlib.Path

Compress the plugin's source files into a .tar.gz archive and return the archive's path.

Parameters

debug : bool, default False
Verbosity toggle.

Returns

A pathlib.Path to the archive file's path.

Expand source code
def make_tar(self, debug: bool = False) -> pathlib.Path:
    """
    Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pathlib.Path` to the archive file's path.

    """
    import tarfile, pathlib, subprocess, fnmatch
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    pathspec = attempt_import('pathspec', debug=debug)

    if not self.__file__:
        from meerschaum.utils.warnings import error
        error(f"Could not find file for plugin '{self}'.")
    if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
        path = self.__file__.replace('__init__.py', '')
        is_dir = True
    else:
        path = self.__file__
        is_dir = False

    old_cwd = os.getcwd()
    real_parent_path = pathlib.Path(os.path.realpath(path)).parent
    os.chdir(real_parent_path)

    default_patterns_to_ignore = [
        '.pyc',
        '__pycache__/',
        'eggs/',
        '__pypackages__/',
        '.git',
    ]

    def parse_gitignore() -> 'Set[str]':
        gitignore_path = pathlib.Path(path) / '.gitignore'
        if not gitignore_path.exists():
            return set()
        with open(gitignore_path, 'r', encoding='utf-8') as f:
            gitignore_text = f.read()
        return set(pathspec.PathSpec.from_lines(
            pathspec.patterns.GitWildMatchPattern,
            default_patterns_to_ignore + gitignore_text.splitlines()
        ).match_tree(path))

    patterns_to_ignore = parse_gitignore() if is_dir else set()

    if debug:
        dprint(f"Patterns to ignore:\n{patterns_to_ignore}")

    with tarfile.open(self.archive_path, 'w:gz') as tarf:
        if not is_dir:
            tarf.add(f"{self.name}.py")
        else:
            for root, dirs, files in os.walk(self.name):
                for f in files:
                    good_file = True
                    fp = os.path.join(root, f)
                    for pattern in patterns_to_ignore:
                        if pattern in str(fp) or f.startswith('.'):
                            good_file = False
                            break
                    if good_file:
                        if debug:
                            dprint(f"Adding '{fp}'...")
                        tarf.add(fp)

    ### clean up and change back to old directory
    os.chdir(old_cwd)

    ### change to 775 to avoid permissions issues with the API in a Docker container
    self.archive_path.chmod(0o775)

    if debug:
        dprint(f"Created archive '{self.archive_path}'.")
    return self.archive_path
def remove_archive(self, debug: bool = False) ‑> Tuple[bool, str]

Remove a plugin's archive file.

Expand source code
def remove_archive(
        self,        
        debug: bool = False
    ) -> SuccessTuple:
    """Remove a plugin's archive file."""
    if not self.archive_path.exists():
        return True, f"Archive file for plugin '{self}' does not exist."
    try:
        self.archive_path.unlink()
    except Exception as e:
        return False, f"Failed to remove archive for plugin '{self}':\n{e}"
    return True, "Success"
def remove_venv(self, debug: bool = False) ‑> Tuple[bool, str]

Remove a plugin's virtual environment.

Expand source code
def remove_venv(
        self,        
        debug: bool = False
    ) -> SuccessTuple:
    """Remove a plugin's virtual environment."""
    if not self.venv_path.exists():
        return True, f"Virtual environment for plugin '{self}' does not exist."
    try:
        shutil.rmtree(self.venv_path)
    except Exception as e:
        return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
    return True, "Success"
def setup(self, *args: str, debug: bool = False, **kw: Any) ‑> Union[Tuple[bool, str], bool]

If exists, run the plugin's setup() function.

Parameters

*args : str
The positional arguments passed to the setup() function.
debug : bool, default False
Verbosity toggle.
**kw : Any
The keyword arguments passed to the setup() function.

Returns

A SuccessTuple or bool indicating success.

Expand source code
def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
    """
    If exists, run the plugin's `setup()` function.

    Parameters
    ----------
    *args: str
        The positional arguments passed to the `setup()` function.
        
    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        The keyword arguments passed to the `setup()` function.

    Returns
    -------
    A `SuccessTuple` or `bool` indicating success.

    """
    from meerschaum.utils.debug import dprint
    import inspect
    _setup = None
    for name, fp in inspect.getmembers(self.module):
        if name == 'setup' and inspect.isfunction(fp):
            _setup = fp
            break

    ### assume success if no setup() is found (not necessary)
    if _setup is None:
        return True

    sig = inspect.signature(_setup)
    has_debug, has_kw = ('debug' in sig.parameters), False
    for k, v in sig.parameters.items():
        if '**' in str(v):
            has_kw = True
            break

    _kw = {}
    if has_kw:
        _kw.update(kw)
    if has_debug:
        _kw['debug'] = debug

    if debug:
        dprint(f"Running setup for plugin '{self}'...")
    try:
        self.activate_venv(debug=debug)
        return_tuple = _setup(*args, **_kw)
        self.deactivate_venv(debug=debug)
    except Exception as e:
        return False, str(e)

    if isinstance(return_tuple, tuple):
        return return_tuple
    if isinstance(return_tuple, bool):
        return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
    if return_tuple is None:
        return False, f"Setup for Plugin '{self.name}' returned None."
    return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"
def uninstall(self, debug: bool = False) ‑> Tuple[bool, str]

Remove a plugin, its virtual environment, and archive file.

Expand source code
def uninstall(self, debug: bool = False) -> SuccessTuple:
    """
    Remove a plugin, its virtual environment, and archive file.
    """
    from meerschaum.utils.packages import reload_package
    from meerschaum.plugins import reload_plugins, sync_plugins_symlinks
    from meerschaum.utils.warnings import warn, info
    warnings_thrown_count: int = 0
    max_warnings: int = 3

    if not self.is_installed():
        info(
            f"Plugin '{self.name}' doesn't seem to be installed.\n    "
            + "Checking for artifacts...",
            stack = False,
        )
    else:
        real_path = pathlib.Path(os.path.realpath(self.__file__))
        try:
            if real_path.name == '__init__.py':
                shutil.rmtree(real_path.parent)
            else:
                real_path.unlink()
        except Exception as e:
            warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
            warnings_thrown_count += 1
        else:
            info(f"Removed source files for plugin '{self.name}'.")

    if self.venv_path.exists():
        success, msg = self.remove_venv(debug=debug)
        if not success:
            warn(msg, stack=False)
            warnings_thrown_count += 1
        else:
            info(f"Removed virtual environment from plugin '{self.name}'.")

    success = warnings_thrown_count < max_warnings
    sync_plugins_symlinks(debug=debug)
    self.deactivate_venv(force=True, debug=debug)
    reload_package('meerschaum')
    reload_plugins(debug=debug)
    return success, (
        f"Successfully uninstalled plugin '{self}'." if success
        else f"Failed to uninstall plugin '{self}'."
    )
class Venv (venv: "Union[str, 'Plugin', None]" = 'mrsm', debug: bool = False)

Manage a virtual enviroment's activation status.

Examples

>>> from meerschaum.plugins import Plugin
>>> with Venv('mrsm') as venv:
...     import pandas
>>> with Venv(Plugin('noaa')) as venv:
...     import requests
>>> venv = Venv('mrsm')
>>> venv.activate()
True
>>> venv.deactivate()
True
>>>
Expand source code
class Venv:
    """
    Manage a virtual enviroment's activation status.

    Examples
    --------
    >>> from meerschaum.plugins import Plugin
    >>> with Venv('mrsm') as venv:
    ...     import pandas
    >>> with Venv(Plugin('noaa')) as venv:
    ...     import requests
    >>> venv = Venv('mrsm')
    >>> venv.activate()
    True
    >>> venv.deactivate()
    True
    >>> 
    """

    def __init__(
            self,
            venv: Union[str, 'meerschaum.plugins.Plugin', None] = 'mrsm',
            debug: bool = False,
        ) -> None:
        from meerschaum.utils.venv import activate_venv, deactivate_venv, active_venvs
        ### For some weird threading issue,
        ### we can't use `isinstance` here.
        if 'meerschaum.plugins._Plugin' in str(type(venv)):
            self._venv = venv.name
            self._activate = venv.activate_venv
            self._deactivate = venv.deactivate_venv
            self._kwargs = {}
        else:
            self._venv = venv
            self._activate = activate_venv
            self._deactivate = deactivate_venv
            self._kwargs = {'venv': venv}
        self._debug = debug
        ### In case someone calls `deactivate()` before `activate()`.
        self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)


    def activate(self, debug: bool = False) -> bool:
        """
        Activate this virtual environment.
        If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments
        will also be activated.
        """
        from meerschaum.utils.venv import active_venvs
        self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)
        return self._activate(debug=(debug or self._debug), **self._kwargs)


    def deactivate(self, debug: bool = False) -> bool:
        """
        Deactivate this virtual environment.
        If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments
        will also be deactivated.
        """
        return self._deactivate(debug=(debug or self._debug), **self._kwargs)


    @property
    def target_path(self) -> pathlib.Path:
        """
        Return the target site-packages path for this virtual environment.
        A `meerschaum.utils.venv.Venv` may have one virtual environment per minor Python version
        (e.g. Python 3.10 and Python 3.7).
        """
        from meerschaum.utils.venv import venv_target_path
        return venv_target_path(venv=self._venv, allow_nonexistent=True, debug=self._debug)


    @property
    def root_path(self) -> pathlib.Path:
        """
        Return the top-level path for this virtual environment.
        """
        from meerschaum.config._paths import VIRTENV_RESOURCES_PATH
        return VIRTENV_RESOURCES_PATH / self._venv


    def __enter__(self) -> None:
        self.activate(debug=self._debug)


    def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
        self.deactivate(debug=self._debug)


    def __str__(self) -> str:
        quote = "'" if self._venv is not None else ""
        return "Venv(" + quote + str(self._venv) + quote + ")"


    def __repr__(self) -> str:
        return self.__str__()

Instance variables

var root_path : pathlib.Path

Return the top-level path for this virtual environment.

Expand source code
@property
def root_path(self) -> pathlib.Path:
    """
    Return the top-level path for this virtual environment.
    """
    from meerschaum.config._paths import VIRTENV_RESOURCES_PATH
    return VIRTENV_RESOURCES_PATH / self._venv
var target_path : pathlib.Path

Return the target site-packages path for this virtual environment. A Venv may have one virtual environment per minor Python version (e.g. Python 3.10 and Python 3.7).

Expand source code
@property
def target_path(self) -> pathlib.Path:
    """
    Return the target site-packages path for this virtual environment.
    A `meerschaum.utils.venv.Venv` may have one virtual environment per minor Python version
    (e.g. Python 3.10 and Python 3.7).
    """
    from meerschaum.utils.venv import venv_target_path
    return venv_target_path(venv=self._venv, allow_nonexistent=True, debug=self._debug)

Methods

def activate(self, debug: bool = False) ‑> bool

Activate this virtual environment. If a Plugin was provided, its dependent virtual environments will also be activated.

Expand source code
def activate(self, debug: bool = False) -> bool:
    """
    Activate this virtual environment.
    If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments
    will also be activated.
    """
    from meerschaum.utils.venv import active_venvs
    self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)
    return self._activate(debug=(debug or self._debug), **self._kwargs)
def deactivate(self, debug: bool = False) ‑> bool

Deactivate this virtual environment. If a Plugin was provided, its dependent virtual environments will also be deactivated.

Expand source code
def deactivate(self, debug: bool = False) -> bool:
    """
    Deactivate this virtual environment.
    If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments
    will also be deactivated.
    """
    return self._deactivate(debug=(debug or self._debug), **self._kwargs)