Module meerschaum.plugins

Expose plugin management APIs from the meerschaum.plugins module.

Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Expose plugin management APIs from the `meerschaum.plugins` module.
"""

from __future__ import annotations
from meerschaum.utils.typing import Callable, Any, Union, Optional, Dict, List, Tuple
from meerschaum.utils.threading import Lock, RLock
from meerschaum.plugins._Plugin import Plugin

_api_plugins: Dict[str, List[Callable[['fastapi.App'], Any]]] = {}
_locks = {
    '_api_plugins': RLock(),
    '__path__': RLock(),
    'sys.path': RLock(),
    'internal_plugins': RLock(),
    '_synced_symlinks': RLock(),
    'PLUGINS_INTERNAL_LOCK_PATH': RLock(),
}
__all__ = (
    "Plugin", "make_action", "api_plugin", "import_plugins",
    "reload_plugins", "get_plugins", "get_data_plugins", "add_plugin_argument",
)
__pdoc__ = {
    'venvs': False, 'data': False, 'stack': False, 'plugins': False,
}

def make_action(
        function: Callable[[Any], Any],
        shell: bool = False,
        activate: bool = True,
        deactivate: bool = True,
        debug: bool = False
    ) -> Callable[[Any], Any]:
    """
    Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.
    
    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to become a Meerschaum action. Must accept all keyword arguments.
        
    shell: bool, default False
        Not used.
        
    Returns
    -------
    Another function (this is a decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import make_action
    >>>
    >>> @make_action
    ... def my_action(**kw):
    ...     print('foo')
    ...     return True, "Success"
    >>>
    """

    from meerschaum.actions import actions
    from meerschaum.utils.formatting import pprint
    package_name = function.__globals__['__name__']
    plugin_name = (
        package_name.split('.')[1]
        if package_name.startswith('plugins.') else None
    )
    plugin = Plugin(plugin_name) if plugin_name else None

    if debug:
        from meerschaum.utils.debug import dprint
        dprint(
            f"Adding action '{function.__name__}' from plugin " +
            f"'{plugin}'..."
        )

    actions[function.__name__] = function
    return function


def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Execute the function when initializing the Meerschaum API module.
    Useful for lazy-loading heavy plugins only when the API is started,
    such as when editing the `meerschaum.api.app` FastAPI app.
    
    The FastAPI app will be passed as the only parameter.
    
    Parameters
    ----------
    function: Callable[[Any, Any]]
        The function to be called before starting the Meerschaum API.
        
    Returns
    -------
    Another function (decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import api_plugin
    >>>
    >>> @api_plugin
    >>> def initialize_plugin(app):
    ...     @app.get('/my/new/path')
    ...     def new_path():
    ...         return {'message' : 'It works!'}
    >>>
    """
    with _locks['_api_plugins']:
        try:
            if function.__module__ not in _api_plugins:
                _api_plugins[function.__module__] = []
            _api_plugins[function.__module__].append(function)
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(e)
    return function


_synced_symlinks: bool = False
def sync_plugins_symlinks(debug: bool = False, warn: bool = True) -> None:
    """
    Update the plugins 
    """
    global _synced_symlinks
    with _locks['_synced_symlinks']:
        if _synced_symlinks:
            return

    import sys, os, pathlib, time
    from collections import defaultdict
    import importlib.util
    from meerschaum.utils.misc import flatten_list, make_symlink, is_symlink
    from meerschaum.utils.warnings import error, warn as _warn
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.venv import Venv, activate_venv, deactivate_venv, is_venv_active
    from meerschaum.config._paths import (
        PLUGINS_RESOURCES_PATH,
        PLUGINS_ARCHIVES_RESOURCES_PATH,
        PLUGINS_INIT_PATH,
        PLUGINS_DIR_PATHS,
        PLUGINS_INTERNAL_LOCK_PATH,
    )

    ### If the lock file exists, sleep for up to a second or until it's removed before continuing.
    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
        if PLUGINS_INTERNAL_LOCK_PATH.exists():
            lock_sleep_total     = STATIC_CONFIG['plugins']['lock_sleep_total']
            lock_sleep_increment = STATIC_CONFIG['plugins']['lock_sleep_increment']
            lock_start = time.perf_counter()
            while (
                (time.perf_counter() - lock_start) < lock_sleep_total
            ):
                time.sleep(lock_sleep_increment)
                if not PLUGINS_INTERNAL_LOCK_PATH.exists():
                    break
                try:
                    PLUGINS_INTERNAL_LOCK_PATH.unlink()
                except Exception as e:
                    if warn:
                        _warn(f"Error while removing lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
                    break

        ### Begin locking from other processes.
        try:
            PLUGINS_INTERNAL_LOCK_PATH.touch()
        except Exception as e:
            if warn:
                _warn(f"Unable to create lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")

    with _locks['internal_plugins']:
        if is_symlink(PLUGINS_RESOURCES_PATH) or not PLUGINS_RESOURCES_PATH.exists():
            try:
                PLUGINS_RESOURCES_PATH.unlink()
            except Exception as e:
                pass

        PLUGINS_RESOURCES_PATH.mkdir(exist_ok=True)


        existing_symlinked_paths = [
            (PLUGINS_RESOURCES_PATH / item) 
            for item in os.listdir(PLUGINS_RESOURCES_PATH)
        ]
        for plugins_path in PLUGINS_DIR_PATHS:
            if not plugins_path.exists():
                plugins_path.mkdir(exist_ok=True, parents=True)
        plugins_to_be_symlinked = list(flatten_list(
            [
                [
                    (plugins_path / item)
                    for item in os.listdir(plugins_path)
                    if (
                        not item.startswith('.')
                    ) and (item not in ('__pycache__', '__init__.py'))
                ]
                for plugins_path in PLUGINS_DIR_PATHS
            ]
        ))

        ### Check for duplicates.
        seen_plugins = defaultdict(lambda: 0)
        for plugin_path in plugins_to_be_symlinked:
            plugin_name = plugin_path.stem
            seen_plugins[plugin_name] += 1
        for plugin_name, plugin_count in seen_plugins.items():
            if plugin_count > 1:
                if warn:
                    _warn(f"Found duplicate plugins named '{plugin_name}'.")

        for plugin_symlink_path in existing_symlinked_paths:
            real_path = pathlib.Path(os.path.realpath(plugin_symlink_path))

            ### Remove invalid symlinks.
            if real_path not in plugins_to_be_symlinked:
                if not is_symlink(plugin_symlink_path):
                    continue
                try:
                    plugin_symlink_path.unlink()
                except Exception as e:
                    pass

            ### Remove valid plugins from the to-be-symlinked list.
            else:
                plugins_to_be_symlinked.remove(real_path)

        for plugin_path in plugins_to_be_symlinked:
            plugin_symlink_path = PLUGINS_RESOURCES_PATH / plugin_path.name
            try:
                ### There might be duplicate folders (e.g. __pycache__).
                if (
                    plugin_symlink_path.exists()
                    and
                    plugin_symlink_path.is_dir()
                    and
                    not is_symlink(plugin_symlink_path)
                ):
                    continue
                success, msg = make_symlink(plugin_path, plugin_symlink_path)
            except Exception as e:
                success, msg = False, str(e)
            if not success:
                if warn:
                    _warn(
                        f"Failed to create symlink {plugin_symlink_path} "
                        + f"to {plugin_path}:\n    {msg}"
                    )

    ### Release symlink lock file in case other processes need it.
    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
        try:
            if PLUGINS_INTERNAL_LOCK_PATH.exists():
                PLUGINS_INTERNAL_LOCK_PATH.unlink()
        ### Sometimes competing threads will delete the lock file at the same time.
        except FileNotFoundError:
            pass
        except Exception as e:
            if warn:
                _warn(f"Error cleaning up lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")

        try:
            if not PLUGINS_INIT_PATH.exists():
                PLUGINS_INIT_PATH.touch()
        except Exception as e:
            error(f"Failed to create the file '{PLUGINS_INIT_PATH}':\n{e}")

    with _locks['__path__']:
        if str(PLUGINS_RESOURCES_PATH.parent) not in __path__:
            __path__.append(str(PLUGINS_RESOURCES_PATH.parent))

    with _locks['_synced_symlinks']:
        _synced_symlinks = True


def import_plugins(
        *plugins_to_import: Union[str, List[str], None],
        warn: bool = True,
    ) -> Union[
        'ModuleType', Tuple['ModuleType', None]
    ]:
    """
    Import the Meerschaum plugins directory.

    Parameters
    ----------
    plugins_to_import: Union[str, List[str], None]
        If provided, only import the specified plugins.
        Otherwise import the entire plugins module. May be a string, list, or `None`.
        Defaults to `None`.

    Returns
    -------
    A module of list of modules, depening on the number of plugins provided.

    """
    import sys
    import os
    import importlib
    from meerschaum.utils.misc import flatten_list
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
    from meerschaum.utils.warnings import warn as _warn
    plugins_to_import = list(plugins_to_import)
    with _locks['sys.path']:

        ### Since plugins may depend on other plugins,
        ### we need to activate the virtual environments for library plugins.
        ### This logic exists in `Plugin.activate_venv()`,
        ### but that code requires the plugin's module to already be imported.
        ### It's not a guarantee of correct activation order,
        ### e.g. if a library plugin pins a specific package and another 
        plugins_names = get_plugins_names()
        already_active_venvs = [is_venv_active(plugin_name) for plugin_name in plugins_names]

        if not sys.path or sys.path[0] != str(PLUGINS_RESOURCES_PATH.parent):
            sys.path.insert(0, str(PLUGINS_RESOURCES_PATH.parent))

        if not plugins_to_import:
            for plugin_name in plugins_names:
                activate_venv(plugin_name)
            try:
                imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem)
            except ImportError as e:
                _warn(f"Failed to import the plugins module:\n    {e}")
                import traceback
                traceback.print_exc()
                imported_plugins = None
            for plugin_name in plugins_names:
                if plugin_name in already_active_venvs:
                    continue
                deactivate_venv(plugin_name)

        else:
            imported_plugins = []
            for plugin_name in flatten_list(plugins_to_import):
                plugin = Plugin(plugin_name)
                try:
                    with Venv(plugin):
                        imported_plugins.append(
                            importlib.import_module(
                                f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
                            )
                        )
                except Exception as e:
                    _warn(
                        f"Failed to import plugin '{plugin_name}':\n    "
                        + f"{e}\n\nHere's a stacktrace:",
                        stack = False,
                    )
                    from meerschaum.utils.formatting import get_console
                    get_console().print_exception(
                        suppress = [
                            'meerschaum/plugins/__init__.py',
                            importlib,
                            importlib._bootstrap,
                        ]
                    )
                    imported_plugins.append(None)

        if imported_plugins is None and warn:
            _warn(f"Failed to import plugins.", stacklevel=3)

        if str(PLUGINS_RESOURCES_PATH.parent) in sys.path:
            sys.path.remove(str(PLUGINS_RESOURCES_PATH.parent))

    if isinstance(imported_plugins, list):
        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
    return imported_plugins


def load_plugins(debug: bool = False, shell: bool = False) -> None:
    """
    Import Meerschaum plugins and update the actions dictionary.
    """
    from inspect import isfunction, getmembers
    from meerschaum.actions import __all__ as _all, modules
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.packages import get_modules_from_package
    if debug:
        from meerschaum.utils.debug import dprint

    _plugins_names, plugins_modules = get_modules_from_package(
        import_plugins(),
        names = True,
        recursive = True,
        modules_venvs = True
    )
    ### I'm appending here to keep from redefining the modules list.
    new_modules = (
        [
            mod for mod in modules
            if not mod.__name__.startswith(PLUGINS_RESOURCES_PATH.stem + '.')
        ]
        + plugins_modules
    )
    n_mods = len(modules)
    for mod in new_modules:
        modules.append(mod)
    for i in range(n_mods):
        modules.pop(0)

    for module in plugins_modules:
        for name, func in getmembers(module):
            if not isfunction(func):
                continue
            if name == module.__name__.split('.')[-1]:
                make_action(func, **{'shell': shell, 'debug': debug})


def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
    """
    Reload plugins back into memory.

    Parameters
    ----------
    plugins: Optional[List[str]], default None
        The plugins to reload. `None` will reload all plugins.

    """
    import sys
    if debug:
        from meerschaum.utils.debug import dprint

    if not plugins:
        plugins = get_plugins_names()
    for plugin_name in plugins:
        if debug:
            dprint(f"Reloading plugin '{plugin_name}'...")
        mod_name = 'plugins.' + str(plugin_name)
        if mod_name in sys.modules:
            del sys.modules[mod_name]
    load_plugins(debug=debug)


def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
    """
    Return a list of `Plugin` objects.

    Parameters
    ----------
    to_load:
        If specified, only load specific plugins.
        Otherwise return all plugins.

    try_import: bool, default True
        If `True`, allow for plugins to be imported.
    """
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    import os
    sync_plugins_symlinks()
    _plugins = [
        Plugin(name) for name in (
            to_load or [
                (
                    name if (PLUGINS_RESOURCES_PATH / name).is_dir()
                    else name[:-3]
                ) for name in os.listdir(PLUGINS_RESOURCES_PATH) if name != '__init__.py'
            ]
        )
    ]
    plugins = tuple(plugin for plugin in _plugins if plugin.is_installed(try_import=try_import))
    if len(to_load) == 1:
        return plugins[0]
    return plugins


def get_plugins_names(*to_load, **kw) -> List[str]:
    """
    Return a list of installed plugins.
    """
    return [plugin.name for plugin in get_plugins(*to_load, **kw)]


def get_plugins_modules(*to_load, **kw) -> List['ModuleType']:
    """
    Return a list of modules for the installed plugins, or `None` if things break.
    """
    return [plugin.module for plugin in get_plugins(*to_load, **kw)]


def get_data_plugins() -> List[Plugin]:
    """
    Only return the modules of plugins with either `fetch()` or `sync()` functions.
    """
    import inspect
    plugins = get_plugins()
    data_names = {'sync', 'fetch'}
    data_plugins = []
    for plugin in plugins:
        for name, ob in inspect.getmembers(plugin.module):
            if not inspect.isfunction(ob):
                continue
            if name not in data_names:
                continue
            data_plugins.append(plugin)
    return data_plugins


def add_plugin_argument(*args, **kwargs) -> None:
    """
    Add argparse arguments under the 'Plugins options' group.
    Takes the same parameters as the regular argparse `add_argument()` function.

    Examples
    --------
    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
    >>> 
    """
    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
    from meerschaum.utils.warnings import warn, error
    _parent_plugin_name = _get_parent_plugin(2)
    title = f"Plugin '{_parent_plugin_name}' options" if _parent_plugin_name else 'Custom options'
    group_key = 'plugin_' + (_parent_plugin_name or '')
    if group_key not in groups:
        groups[group_key] = parser.add_argument_group(
            title = title,
        )
        _seen_plugin_args[group_key] = set()
    try:
        if str(args) not in _seen_plugin_args[group_key]:
            groups[group_key].add_argument(*args, **kwargs)
            _seen_plugin_args[group_key].add(str(args))
    except Exception as e:
        warn(e)


def _get_parent_plugin(stacklevel: int = 1) -> Union[str, None]:
    """If this function is called from outside a Meerschaum plugin, it will return None."""
    import inspect, re
    try:
        parent_globals = inspect.stack()[stacklevel][0].f_globals
        parent_file = parent_globals.get('__file__', '')
        return parent_globals['__name__'].replace('plugins.', '').split('.')[0]
    except Exception as e:
        return None

Functions

def add_plugin_argument(*args, **kwargs) ‑> None

Add argparse arguments under the 'Plugins options' group. Takes the same parameters as the regular argparse add_argument() function.

Examples

>>> add_plugin_argument('--foo', type=int, help="This is my help text!")
>>>
Expand source code
def add_plugin_argument(*args, **kwargs) -> None:
    """
    Add argparse arguments under the 'Plugins options' group.
    Takes the same parameters as the regular argparse `add_argument()` function.

    Examples
    --------
    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
    >>> 
    """
    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
    from meerschaum.utils.warnings import warn, error
    _parent_plugin_name = _get_parent_plugin(2)
    title = f"Plugin '{_parent_plugin_name}' options" if _parent_plugin_name else 'Custom options'
    group_key = 'plugin_' + (_parent_plugin_name or '')
    if group_key not in groups:
        groups[group_key] = parser.add_argument_group(
            title = title,
        )
        _seen_plugin_args[group_key] = set()
    try:
        if str(args) not in _seen_plugin_args[group_key]:
            groups[group_key].add_argument(*args, **kwargs)
            _seen_plugin_args[group_key].add(str(args))
    except Exception as e:
        warn(e)
def api_plugin(function: Callable[[Any], Any]) ‑> Callable[[Any], Any]

Execute the function when initializing the Meerschaum API module. Useful for lazy-loading heavy plugins only when the API is started, such as when editing the meerschaum.api.app FastAPI app.

The FastAPI app will be passed as the only parameter.

Parameters

function : Callable[[Any, Any]]
The function to be called before starting the Meerschaum API.

Returns

Another function (decorator function).

Examples

>>> from meerschaum.plugins import api_plugin
>>>
>>> @api_plugin
>>> def initialize_plugin(app):
...     @app.get('/my/new/path')
...     def new_path():
...         return {'message' : 'It works!'}
>>>
Expand source code
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Execute the function when initializing the Meerschaum API module.
    Useful for lazy-loading heavy plugins only when the API is started,
    such as when editing the `meerschaum.api.app` FastAPI app.
    
    The FastAPI app will be passed as the only parameter.
    
    Parameters
    ----------
    function: Callable[[Any, Any]]
        The function to be called before starting the Meerschaum API.
        
    Returns
    -------
    Another function (decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import api_plugin
    >>>
    >>> @api_plugin
    >>> def initialize_plugin(app):
    ...     @app.get('/my/new/path')
    ...     def new_path():
    ...         return {'message' : 'It works!'}
    >>>
    """
    with _locks['_api_plugins']:
        try:
            if function.__module__ not in _api_plugins:
                _api_plugins[function.__module__] = []
            _api_plugins[function.__module__].append(function)
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(e)
    return function
def get_data_plugins() ‑> List[meerschaum.plugins._Plugin.Plugin]

Only return the modules of plugins with either fetch() or sync() functions.

Expand source code
def get_data_plugins() -> List[Plugin]:
    """
    Only return the modules of plugins with either `fetch()` or `sync()` functions.
    """
    import inspect
    plugins = get_plugins()
    data_names = {'sync', 'fetch'}
    data_plugins = []
    for plugin in plugins:
        for name, ob in inspect.getmembers(plugin.module):
            if not inspect.isfunction(ob):
                continue
            if name not in data_names:
                continue
            data_plugins.append(plugin)
    return data_plugins
def get_plugins(*to_load, try_import: bool = True) ‑> Union[Tuple[meerschaum.plugins._Plugin.Plugin], meerschaum.plugins._Plugin.Plugin]

Return a list of Plugin objects.

Parameters

to_load: If specified, only load specific plugins. Otherwise return all plugins.

try_import : bool, default True
If True, allow for plugins to be imported.
Expand source code
def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
    """
    Return a list of `Plugin` objects.

    Parameters
    ----------
    to_load:
        If specified, only load specific plugins.
        Otherwise return all plugins.

    try_import: bool, default True
        If `True`, allow for plugins to be imported.
    """
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    import os
    sync_plugins_symlinks()
    _plugins = [
        Plugin(name) for name in (
            to_load or [
                (
                    name if (PLUGINS_RESOURCES_PATH / name).is_dir()
                    else name[:-3]
                ) for name in os.listdir(PLUGINS_RESOURCES_PATH) if name != '__init__.py'
            ]
        )
    ]
    plugins = tuple(plugin for plugin in _plugins if plugin.is_installed(try_import=try_import))
    if len(to_load) == 1:
        return plugins[0]
    return plugins
def import_plugins(*plugins_to_import: Union[str, List[str], None], warn: bool = True) ‑> Union['ModuleType', Tuple['ModuleType', None]]

Import the Meerschaum plugins directory.

Parameters

plugins_to_import : Union[str, List[str], None]
If provided, only import the specified plugins. Otherwise import the entire plugins module. May be a string, list, or None. Defaults to None.

Returns

A module of list of modules, depening on the number of plugins provided.

Expand source code
def import_plugins(
        *plugins_to_import: Union[str, List[str], None],
        warn: bool = True,
    ) -> Union[
        'ModuleType', Tuple['ModuleType', None]
    ]:
    """
    Import the Meerschaum plugins directory.

    Parameters
    ----------
    plugins_to_import: Union[str, List[str], None]
        If provided, only import the specified plugins.
        Otherwise import the entire plugins module. May be a string, list, or `None`.
        Defaults to `None`.

    Returns
    -------
    A module of list of modules, depening on the number of plugins provided.

    """
    import sys
    import os
    import importlib
    from meerschaum.utils.misc import flatten_list
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
    from meerschaum.utils.warnings import warn as _warn
    plugins_to_import = list(plugins_to_import)
    with _locks['sys.path']:

        ### Since plugins may depend on other plugins,
        ### we need to activate the virtual environments for library plugins.
        ### This logic exists in `Plugin.activate_venv()`,
        ### but that code requires the plugin's module to already be imported.
        ### It's not a guarantee of correct activation order,
        ### e.g. if a library plugin pins a specific package and another 
        plugins_names = get_plugins_names()
        already_active_venvs = [is_venv_active(plugin_name) for plugin_name in plugins_names]

        if not sys.path or sys.path[0] != str(PLUGINS_RESOURCES_PATH.parent):
            sys.path.insert(0, str(PLUGINS_RESOURCES_PATH.parent))

        if not plugins_to_import:
            for plugin_name in plugins_names:
                activate_venv(plugin_name)
            try:
                imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem)
            except ImportError as e:
                _warn(f"Failed to import the plugins module:\n    {e}")
                import traceback
                traceback.print_exc()
                imported_plugins = None
            for plugin_name in plugins_names:
                if plugin_name in already_active_venvs:
                    continue
                deactivate_venv(plugin_name)

        else:
            imported_plugins = []
            for plugin_name in flatten_list(plugins_to_import):
                plugin = Plugin(plugin_name)
                try:
                    with Venv(plugin):
                        imported_plugins.append(
                            importlib.import_module(
                                f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
                            )
                        )
                except Exception as e:
                    _warn(
                        f"Failed to import plugin '{plugin_name}':\n    "
                        + f"{e}\n\nHere's a stacktrace:",
                        stack = False,
                    )
                    from meerschaum.utils.formatting import get_console
                    get_console().print_exception(
                        suppress = [
                            'meerschaum/plugins/__init__.py',
                            importlib,
                            importlib._bootstrap,
                        ]
                    )
                    imported_plugins.append(None)

        if imported_plugins is None and warn:
            _warn(f"Failed to import plugins.", stacklevel=3)

        if str(PLUGINS_RESOURCES_PATH.parent) in sys.path:
            sys.path.remove(str(PLUGINS_RESOURCES_PATH.parent))

    if isinstance(imported_plugins, list):
        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
    return imported_plugins
def make_action(function: Callable[[Any], Any], shell: bool = False, activate: bool = True, deactivate: bool = True, debug: bool = False) ‑> Callable[[Any], Any]

Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.

Parameters

function : Callable[[Any], Any]
The function to become a Meerschaum action. Must accept all keyword arguments.
shell : bool, default False
Not used.

Returns

Another function (this is a decorator function).

Examples

>>> from meerschaum.plugins import make_action
>>>
>>> @make_action
... def my_action(**kw):
...     print('foo')
...     return True, "Success"
>>>
Expand source code
def make_action(
        function: Callable[[Any], Any],
        shell: bool = False,
        activate: bool = True,
        deactivate: bool = True,
        debug: bool = False
    ) -> Callable[[Any], Any]:
    """
    Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.
    
    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to become a Meerschaum action. Must accept all keyword arguments.
        
    shell: bool, default False
        Not used.
        
    Returns
    -------
    Another function (this is a decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import make_action
    >>>
    >>> @make_action
    ... def my_action(**kw):
    ...     print('foo')
    ...     return True, "Success"
    >>>
    """

    from meerschaum.actions import actions
    from meerschaum.utils.formatting import pprint
    package_name = function.__globals__['__name__']
    plugin_name = (
        package_name.split('.')[1]
        if package_name.startswith('plugins.') else None
    )
    plugin = Plugin(plugin_name) if plugin_name else None

    if debug:
        from meerschaum.utils.debug import dprint
        dprint(
            f"Adding action '{function.__name__}' from plugin " +
            f"'{plugin}'..."
        )

    actions[function.__name__] = function
    return function
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) ‑> None

Reload plugins back into memory.

Parameters

plugins : Optional[List[str]], default None
The plugins to reload. None will reload all plugins.
Expand source code
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
    """
    Reload plugins back into memory.

    Parameters
    ----------
    plugins: Optional[List[str]], default None
        The plugins to reload. `None` will reload all plugins.

    """
    import sys
    if debug:
        from meerschaum.utils.debug import dprint

    if not plugins:
        plugins = get_plugins_names()
    for plugin_name in plugins:
        if debug:
            dprint(f"Reloading plugin '{plugin_name}'...")
        mod_name = 'plugins.' + str(plugin_name)
        if mod_name in sys.modules:
            del sys.modules[mod_name]
    load_plugins(debug=debug)

Classes

class Plugin (name: str, version: Optional[str] = None, user_id: Optional[int] = None, required: Optional[List[str]] = None, attributes: Optional[Dict[str, Any]] = None, archive_path: Optional[pathlib.Path] = None, venv_path: Optional[pathlib.Path] = None, repo_connector: "Optional['meerschaum.connectors.api.APIConnector']" = None, repo: "Union['meerschaum.connectors.api.APIConnector', str, None]" = None)

Handle packaging of Meerschaum plugins.

Expand source code
class Plugin:
    """Handle packaging of Meerschaum plugins."""
    def __init__(
        self,
        name: str,
        version: Optional[str] = None,
        user_id: Optional[int] = None,
        required: Optional[List[str]] = None,
        attributes: Optional[Dict[str, Any]] = None,
        archive_path: Optional[pathlib.Path] = None,
        venv_path: Optional[pathlib.Path] = None,
        repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None,
        repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None,
    ):
        from meerschaum.config.static import STATIC_CONFIG
        sep = STATIC_CONFIG['plugins']['repo_separator']
        _repo = None
        if sep in name:
            try:
                name, _repo = name.split(sep)
            except Exception as e:
                error(f"Invalid plugin name: '{name}'")
        self._repo_in_name = _repo

        if attributes is None:
            attributes = {}
        self.name = name
        self.attributes = attributes
        self.user_id = user_id
        self._version = version
        if required:
            self._required = required
        self.archive_path = (
            archive_path if archive_path is not None
            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
        )
        self.venv_path = (
            venv_path if venv_path is not None
            else VIRTENV_RESOURCES_PATH / self.name
        )
        self._repo_connector = repo_connector
        self._repo_keys = repo


    @property
    def repo_connector(self):
        """
        Return the repository connector for this plugin.
        NOTE: This imports the `connectors` module, which imports certain plugin modules.
        """
        if self._repo_connector is None:
            from meerschaum.connectors.parse import parse_repo_keys

            repo_keys = self._repo_keys or self._repo_in_name
            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
                error(
                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
                )
            repo_connector = parse_repo_keys(repo_keys)
            self._repo_connector = repo_connector
        return self._repo_connector


    @property
    def version(self):
        """
        Return the plugin's module version is defined (`__version__`) if it's defined.
        """
        if self._version is None:
            try:
                self._version = self.module.__version__
            except Exception as e:
                self._version = None
        return self._version


    @property
    def module(self):
        """
        Return the Python module of the underlying plugin.
        """
        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
            if self.__file__ is None:
                return None
            from meerschaum.plugins import import_plugins
            self._module = import_plugins(str(self), warn=False)
        return self._module


    @property
    def __file__(self) -> Union[str, None]:
        """
        Return the file path (str) of the plugin if it exists, otherwise `None`.
        """
        if self.__dict__.get('_module', None) is not None:
            return self.module.__file__

        potential_dir = PLUGINS_RESOURCES_PATH / self.name
        if (
            potential_dir.exists()
            and potential_dir.is_dir()
            and (potential_dir / '__init__.py').exists()
        ):
            return str((potential_dir / '__init__.py').as_posix())

        potential_file = PLUGINS_RESOURCES_PATH / (self.name + '.py')
        if potential_file.exists() and not potential_file.is_dir():
            return str(potential_file.as_posix())

        return None


    @property
    def requirements_file_path(self) -> Union[pathlib.Path, None]:
        """
        If a file named `requirements.txt` exists, return its path.
        """
        if self.__file__ is None:
            return None
        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
        if not path.exists():
            return None
        return path


    def is_installed(self, **kw) -> bool:
        """
        Check whether a plugin is correctly installed.

        Returns
        -------
        A `bool` indicating whether a plugin exists and is successfully imported.
        """
        return self.__file__ is not None


    def make_tar(self, debug: bool = False) -> pathlib.Path:
        """
        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.

        Parameters
        ----------
        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A `pathlib.Path` to the archive file's path.

        """
        import tarfile, pathlib, subprocess, fnmatch
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.packages import attempt_import
        pathspec = attempt_import('pathspec', debug=debug)

        if not self.__file__:
            from meerschaum.utils.warnings import error
            error(f"Could not find file for plugin '{self}'.")
        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
            path = self.__file__.replace('__init__.py', '')
            is_dir = True
        else:
            path = self.__file__
            is_dir = False

        old_cwd = os.getcwd()
        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
        os.chdir(real_parent_path)

        default_patterns_to_ignore = [
            '.pyc',
            '__pycache__/',
            'eggs/',
            '__pypackages__/',
            '.git',
        ]

        def parse_gitignore() -> 'Set[str]':
            gitignore_path = pathlib.Path(path) / '.gitignore'
            if not gitignore_path.exists():
                return set()
            with open(gitignore_path, 'r', encoding='utf-8') as f:
                gitignore_text = f.read()
            return set(pathspec.PathSpec.from_lines(
                pathspec.patterns.GitWildMatchPattern,
                default_patterns_to_ignore + gitignore_text.splitlines()
            ).match_tree(path))

        patterns_to_ignore = parse_gitignore() if is_dir else set()

        if debug:
            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")

        with tarfile.open(self.archive_path, 'w:gz') as tarf:
            if not is_dir:
                tarf.add(f"{self.name}.py")
            else:
                for root, dirs, files in os.walk(self.name):
                    for f in files:
                        good_file = True
                        fp = os.path.join(root, f)
                        for pattern in patterns_to_ignore:
                            if pattern in str(fp) or f.startswith('.'):
                                good_file = False
                                break
                        if good_file:
                            if debug:
                                dprint(f"Adding '{fp}'...")
                            tarf.add(fp)

        ### clean up and change back to old directory
        os.chdir(old_cwd)

        ### change to 775 to avoid permissions issues with the API in a Docker container
        self.archive_path.chmod(0o775)

        if debug:
            dprint(f"Created archive '{self.archive_path}'.")
        return self.archive_path


    def install(
            self,
            force: bool = False,
            debug: bool = False,
        ) -> SuccessTuple:
        """
        Extract a plugin's tar archive to the plugins directory.
        
        This function checks if the plugin is already installed and if the version is equal or
        greater than the existing installation.

        Parameters
        ----------
        force: bool, default False
            If `True`, continue with installation, even if required packages fail to install.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A `SuccessTuple` of success (bool) and a message (str).

        """
        if self.full_name in _ongoing_installations:
            return True, f"Already installing plugin '{self}'."
        _ongoing_installations.add(self.full_name)
        from meerschaum.utils.warnings import warn, error
        if debug:
            from meerschaum.utils.debug import dprint
        import tarfile
        import re
        import ast
        from meerschaum.plugins import reload_plugins, sync_plugins_symlinks
        from meerschaum.utils.packages import attempt_import, determine_version, reload_package
        from meerschaum.utils.venv import init_venv
        from meerschaum.utils.misc import safely_extract_tar
        old_cwd = os.getcwd()
        old_version = ''
        new_version = ''
        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
        temp_dir.mkdir(exist_ok=True)

        if not self.archive_path.exists():
            return False, f"Missing archive file for plugin '{self}'."
        if self.version is not None:
            old_version = self.version
            if debug:
                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")

        if debug:
            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")

        try:
            with tarfile.open(self.archive_path, 'r:gz') as tarf:
                safely_extract_tar(tarf, temp_dir)
        except Exception as e:
            warn(e)
            return False, f"Failed to extract plugin '{self.name}'."

        ### search for version information
        files = os.listdir(temp_dir)
        
        if str(files[0]) == self.name:
            is_dir = True
        elif str(files[0]) == self.name + '.py':
            is_dir = False
        else:
            error(f"Unknown format encountered for plugin '{self}'.")

        fpath = temp_dir / files[0]
        if is_dir:
            fpath = fpath / '__init__.py'

        init_venv(self.name, debug=debug)
        with open(fpath, 'r', encoding='utf-8') as f:
            init_lines = f.readlines()
        new_version = None
        for line in init_lines:
            if '__version__' not in line:
                continue
            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
            if not version_match:
                continue
            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
            break
        if not new_version:
            warn(
                f"No `__version__` defined for plugin '{self}'. "
                + "Assuming new version...",
                stack = False,
            )

        packaging_version = attempt_import('packaging.version')
        try:
            is_new_version = (not new_version and not old_version) or (
                packaging_version.parse(old_version) < packaging_version.parse(new_version)
            )
            is_same_version = new_version and old_version and (
                packaging_version.parse(old_version) == packaging_version.parse(new_version)
            )
        except Exception as e:
            is_new_version, is_same_version = True, False

        ### Determine where to permanently store the new plugin.
        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
        for path in PLUGINS_DIR_PATHS:
            files_in_plugins_dir = os.listdir(path)
            if (
                self.name in files_in_plugins_dir
                or
                (self.name + '.py') in files_in_plugins_dir
            ):
                plugin_installation_dir_path = path
                break

        success_msg = f"Successfully installed plugin '{self}'."
        success, abort = None, None

        if is_same_version and not force:
            success, msg = True, (
                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
                "    Install again with `-f` or `--force` to reinstall."
            )
            abort = True
        elif is_new_version or force:
            for src_dir, dirs, files in os.walk(temp_dir):
                if success is not None:
                    break
                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
                if not os.path.exists(dst_dir):
                    os.mkdir(dst_dir)
                for f in files:
                    src_file = os.path.join(src_dir, f)
                    dst_file = os.path.join(dst_dir, f)
                    if os.path.exists(dst_file):
                        os.remove(dst_file)

                    if debug:
                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
                    try:
                        shutil.move(src_file, dst_dir)
                    except Exception as e:
                        success, msg = False, (
                            f"Failed to install plugin '{self}': " +
                            f"Could not move file '{src_file}' to '{dst_dir}'"
                        )
                        print(msg)
                        break
            if success is None:
                success, msg = True, success_msg
        else:
            success, msg = False, (
                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
                + f"attempted version {new_version}."
            )

        shutil.rmtree(temp_dir)
        os.chdir(old_cwd)

        ### Reload the plugin's module.
        sync_plugins_symlinks(debug=debug)
        if '_module' in self.__dict__:
            del self.__dict__['_module']
        init_venv(venv=self.name, force=True, debug=debug)
        reload_package('meerschaum')
        reload_plugins([self.name], debug=debug)

        ### if we've already failed, return here
        if not success or abort:
            _ongoing_installations.remove(self.full_name)
            return success, msg

        ### attempt to install dependencies
        if not self.install_dependencies(force=force, debug=debug):
            _ongoing_installations.remove(self.full_name)
            return False, f"Failed to install dependencies for plugin '{self}'."

        ### handling success tuple, bool, or other (typically None)
        setup_tuple = self.setup(debug=debug)
        if isinstance(setup_tuple, tuple):
            if not setup_tuple[0]:
                success, msg = setup_tuple
        elif isinstance(setup_tuple, bool):
            if not setup_tuple:
                success, msg = False, (
                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
                    f"Check `setup()` in '{self.__file__}' for more information " +
                    f"(no error message provided)."
                )
            else:
                success, msg = True, success_msg
        elif setup_tuple is None:
            success = True
            msg = (
                f"Post-install for plugin '{self}' returned None. " +
                f"Assuming plugin successfully installed."
            )
            warn(msg)
        else:
            success = False
            msg = (
                f"Post-install for plugin '{self}' returned unexpected value " +
                f"of type '{type(setup_tuple)}': {setup_tuple}"
            )

        _ongoing_installations.remove(self.full_name)
        module = self.module
        return success, msg


    def remove_archive(
            self,        
            debug: bool = False
        ) -> SuccessTuple:
        """Remove a plugin's archive file."""
        if not self.archive_path.exists():
            return True, f"Archive file for plugin '{self}' does not exist."
        try:
            self.archive_path.unlink()
        except Exception as e:
            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
        return True, "Success"


    def remove_venv(
            self,        
            debug: bool = False
        ) -> SuccessTuple:
        """Remove a plugin's virtual environment."""
        if not self.venv_path.exists():
            return True, f"Virtual environment for plugin '{self}' does not exist."
        try:
            shutil.rmtree(self.venv_path)
        except Exception as e:
            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
        return True, "Success"


    def uninstall(self, debug: bool = False) -> SuccessTuple:
        """
        Remove a plugin, its virtual environment, and archive file.
        """
        from meerschaum.utils.packages import reload_package
        from meerschaum.plugins import reload_plugins, sync_plugins_symlinks
        from meerschaum.utils.warnings import warn, info
        warnings_thrown_count: int = 0
        max_warnings: int = 3

        if not self.is_installed():
            info(
                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
                + "Checking for artifacts...",
                stack = False,
            )
        else:
            real_path = pathlib.Path(os.path.realpath(self.__file__))
            try:
                if real_path.name == '__init__.py':
                    shutil.rmtree(real_path.parent)
                else:
                    real_path.unlink()
            except Exception as e:
                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
                warnings_thrown_count += 1
            else:
                info(f"Removed source files for plugin '{self.name}'.")

        if self.venv_path.exists():
            success, msg = self.remove_venv(debug=debug)
            if not success:
                warn(msg, stack=False)
                warnings_thrown_count += 1
            else:
                info(f"Removed virtual environment from plugin '{self.name}'.")

        success = warnings_thrown_count < max_warnings
        sync_plugins_symlinks(debug=debug)
        self.deactivate_venv(force=True, debug=debug)
        reload_package('meerschaum')
        reload_plugins(debug=debug)
        return success, (
            f"Successfully uninstalled plugin '{self}'." if success
            else f"Failed to uninstall plugin '{self}'."
        )


    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
        """
        If exists, run the plugin's `setup()` function.

        Parameters
        ----------
        *args: str
            The positional arguments passed to the `setup()` function.
            
        debug: bool, default False
            Verbosity toggle.

        **kw: Any
            The keyword arguments passed to the `setup()` function.

        Returns
        -------
        A `SuccessTuple` or `bool` indicating success.

        """
        from meerschaum.utils.debug import dprint
        import inspect
        _setup = None
        for name, fp in inspect.getmembers(self.module):
            if name == 'setup' and inspect.isfunction(fp):
                _setup = fp
                break

        ### assume success if no setup() is found (not necessary)
        if _setup is None:
            return True

        sig = inspect.signature(_setup)
        has_debug, has_kw = ('debug' in sig.parameters), False
        for k, v in sig.parameters.items():
            if '**' in str(v):
                has_kw = True
                break

        _kw = {}
        if has_kw:
            _kw.update(kw)
        if has_debug:
            _kw['debug'] = debug

        if debug:
            dprint(f"Running setup for plugin '{self}'...")
        try:
            self.activate_venv(debug=debug)
            return_tuple = _setup(*args, **_kw)
            self.deactivate_venv(debug=debug)
        except Exception as e:
            return False, str(e)

        if isinstance(return_tuple, tuple):
            return return_tuple
        if isinstance(return_tuple, bool):
            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
        if return_tuple is None:
            return False, f"Setup for Plugin '{self.name}' returned None."
        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"


    def get_dependencies(
            self,
            debug: bool = False,
        ) -> List[str]:
        """
        If the Plugin has specified dependencies in a list called `required`, return the list.
        
        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
        Meerschaum plugins may also specify connector keys for a repo after `'@'`.

        Parameters
        ----------
        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A list of required packages and plugins (str).

        """
        if '_required' in self.__dict__:
            return self._required

        ### If the plugin has not yet been imported,
        ### infer the dependencies from the source text.
        ### This is not super robust, and it doesn't feel right
        ### having multiple versions of the logic.
        ### This is necessary when determining the activation order
        ### without having import the module.
        ### For consistency's sake, the module-less method does not cache the requirements.
        if self.__dict__.get('_module', None) is None:
            file_path = self.__file__
            if file_path is None:
                return []
            with open(file_path, 'r', encoding='utf-8') as f:
                text = f.read()

            if 'required' not in text:
                return []

            ### This has some limitations:
            ### It relies on `required` being manually declared.
            ### We lose the ability to dynamically alter the `required` list,
            ### which is why we've kept the module-reliant method below.
            import ast, re
            ### NOTE: This technically would break 
            ### if `required` was the very first line of the file.
            req_start_match = re.search(r'\nrequired(\s?)=', text)
            if not req_start_match:
                return []
            req_start = req_start_match.start()

            ### Dependencies may have brackets within the strings, so push back the index.
            first_opening_brace = req_start + 1 + text[req_start:].find('[')
            if first_opening_brace == -1:
                return []

            next_closing_brace = req_start + 1 + text[req_start:].find(']')
            if next_closing_brace == -1:
                return []

            start_ix = first_opening_brace + 1
            end_ix = next_closing_brace

            num_braces = 0
            while True:
                if '[' not in text[start_ix:end_ix]:
                    break
                num_braces += 1
                start_ix = end_ix
                end_ix += text[end_ix + 1:].find(']') + 1

            req_end = end_ix + 1
            req_text = (
                text[req_start:req_end]
                .lstrip()
                .replace('required', '', 1)
                .lstrip()
                .replace('=', '', 1)
                .lstrip()
            )
            try:
                required = ast.literal_eval(req_text)
            except Exception as e:
                warn(
                    f"Unable to determine requirements for plugin '{self.name}' "
                    + "without importing the module.\n"
                    + "    This may be due to dynamically setting the global `required` list.\n"
                    + f"    {e}"
                )
                return []
            return required

        import inspect
        self.activate_venv(dependencies=False, debug=debug)
        required = []
        for name, val in inspect.getmembers(self.module):
            if name == 'required':
                required = val
                break
        self._required = required
        self.deactivate_venv(dependencies=False, debug=debug)
        return required


    def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
        """
        Return a list of required Plugin objects.
        """
        from meerschaum.utils.warnings import warn
        from meerschaum.config import get_config
        from meerschaum.config.static import STATIC_CONFIG
        plugins = []
        _deps = self.get_dependencies(debug=debug)
        sep = STATIC_CONFIG['plugins']['repo_separator']
        plugin_names = [
            _d[len('plugin:'):] for _d in _deps
            if _d.startswith('plugin:') and len(_d) > len('plugin:')
        ]
        default_repo_keys = get_config('meerschaum', 'default_repository')
        for _plugin_name in plugin_names:
            if sep in _plugin_name:
                try:
                    _plugin_name, _repo_keys = _plugin_name.split(sep)
                except Exception as e:
                    _repo_keys = default_repo_keys
                    warn(
                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
                        + f"Will try to use '{_repo_keys}' instead.",
                        stack = False,
                    )
            else:
                _repo_keys = default_repo_keys
            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
        return plugins


    def get_required_packages(self, debug: bool=False) -> List[str]:
        """
        Return the required package names (excluding plugins).
        """
        _deps = self.get_dependencies(debug=debug)
        return [_d for _d in _deps if not _d.startswith('plugin:')]


    def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
        """
        Activate the virtual environments for the plugin and its dependencies.

        Parameters
        ----------
        dependencies: bool, default True
            If `True`, activate the virtual environments for required plugins.

        Returns
        -------
        A bool indicating success.
        """
        from meerschaum.utils.venv import venv_target_path
        from meerschaum.utils.packages import activate_venv
        from meerschaum.utils.misc import make_symlink, is_symlink
        from meerschaum.config._paths import PACKAGE_ROOT_PATH

        if dependencies:
            for plugin in self.get_required_plugins(debug=debug):
                plugin.activate_venv(debug=debug, **kw)

        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
        venv_meerschaum_path = vtp / 'meerschaum'

        try:
            success, msg = True, "Success"
            if is_symlink(venv_meerschaum_path):
                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
                    venv_meerschaum_path.unlink()
                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
        except Exception as e:
            success, msg = False, str(e)
        if not success:
            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")

        return activate_venv(self.name, debug=debug, **kw)


    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
        """
        Deactivate the virtual environments for the plugin and its dependencies.

        Parameters
        ----------
        dependencies: bool, default True
            If `True`, deactivate the virtual environments for required plugins.

        Returns
        -------
        A bool indicating success.
        """
        from meerschaum.utils.packages import deactivate_venv
        success = deactivate_venv(self.name, debug=debug, **kw)
        if dependencies:
            for plugin in self.get_required_plugins(debug=debug):
                plugin.deactivate_venv(debug=debug, **kw)
        return success


    def install_dependencies(
            self,
            force: bool = False,
            debug: bool = False,
        ) -> bool:
        """
        If specified, install dependencies.
        
        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
        Meerschaum plugins from the same repository as this Plugin.
        To install from a different repository, add the repo keys after `'@'`
        (e.g. `'plugin:foo@api:bar'`).

        Parameters
        ----------
        force: bool, default False
            If `True`, continue with the installation, even if some
            required packages fail to install.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A bool indicating success.

        """
        from meerschaum.utils.packages import pip_install, venv_contains_package
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.warnings import warn, info
        from meerschaum.connectors.parse import parse_repo_keys
        _deps = self.get_dependencies(debug=debug)
        if not _deps and self.requirements_file_path is None:
            return True

        plugins = self.get_required_plugins(debug=debug)
        for _plugin in plugins:
            if _plugin.name == self.name:
                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
                continue
            _success, _msg = _plugin.repo_connector.install_plugin(
                _plugin.name, debug=debug, force=force
            )
            if not _success:
                warn(
                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
                    + f" for plugin '{self.name}':\n" + _msg,
                    stack = False,
                )
                if not force:
                    warn(
                        "Try installing with the `--force` flag to continue anyway.",
                        stack = False,
                    )
                    return False
                info(
                    "Continuing with installation despite the failure "
                    + "(careful, things might be broken!)...",
                    icon = False
                )


        ### First step: parse `requirements.txt` if it exists.
        if self.requirements_file_path is not None:
            if not pip_install(
                requirements_file_path=self.requirements_file_path,
                venv=self.name, debug=debug
            ):
                warn(
                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
                    stack = False,
                )
                if not force:
                    warn(
                        "Try installing with `--force` to continue anyway.",
                        stack = False,
                    )
                    return False
                info(
                    "Continuing with installation despite the failure "
                    + "(careful, things might be broken!)...",
                    icon = False
                )


        ### Don't reinstall packages that are already included in required plugins.
        packages = []
        _packages = self.get_required_packages(debug=debug)
        accounted_for_packages = set()
        for package_name in _packages:
            for plugin in plugins:
                if venv_contains_package(package_name, plugin.name):
                    accounted_for_packages.add(package_name)
                    break
        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]

        ### Attempt pip packages installation.
        if packages:
            for package in packages:
                if not pip_install(package, venv=self.name, debug=debug):
                    warn(
                        f"Failed to install required package '{package}'"
                        + f" for plugin '{self.name}'.",
                        stack = False,
                    )
                    if not force:
                        warn(
                            "Try installing with `--force` to continue anyway.",
                            stack = False,
                        )
                        return False
                    info(
                        "Continuing with installation despite the failure "
                        + "(careful, things might be broken!)...",
                        icon = False
                    )
        return True


    @property
    def full_name(self) -> str:
        """
        Include the repo keys with the plugin's name.
        """
        from meerschaum.config.static import STATIC_CONFIG
        sep = STATIC_CONFIG['plugins']['repo_separator']
        return self.name + sep + str(self.repo_connector)


    def __str__(self):
        return self.name


    def __repr__(self):
        return f"Plugin('{self.name}', repo='{self.repo_connector}')"


    def __del__(self):
        pass

Instance variables

var full_name : str

Include the repo keys with the plugin's name.

Expand source code
@property
def full_name(self) -> str:
    """
    Include the repo keys with the plugin's name.
    """
    from meerschaum.config.static import STATIC_CONFIG
    sep = STATIC_CONFIG['plugins']['repo_separator']
    return self.name + sep + str(self.repo_connector)
var module

Return the Python module of the underlying plugin.

Expand source code
@property
def module(self):
    """
    Return the Python module of the underlying plugin.
    """
    if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
        if self.__file__ is None:
            return None
        from meerschaum.plugins import import_plugins
        self._module = import_plugins(str(self), warn=False)
    return self._module
var repo_connector

Return the repository connector for this plugin. NOTE: This imports the connectors module, which imports certain plugin modules.

Expand source code
@property
def repo_connector(self):
    """
    Return the repository connector for this plugin.
    NOTE: This imports the `connectors` module, which imports certain plugin modules.
    """
    if self._repo_connector is None:
        from meerschaum.connectors.parse import parse_repo_keys

        repo_keys = self._repo_keys or self._repo_in_name
        if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
            error(
                f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
            )
        repo_connector = parse_repo_keys(repo_keys)
        self._repo_connector = repo_connector
    return self._repo_connector
var requirements_file_path : Optional[pathlib.Path]

If a file named requirements.txt exists, return its path.

Expand source code
@property
def requirements_file_path(self) -> Union[pathlib.Path, None]:
    """
    If a file named `requirements.txt` exists, return its path.
    """
    if self.__file__ is None:
        return None
    path = pathlib.Path(self.__file__).parent / 'requirements.txt'
    if not path.exists():
        return None
    return path
var version

Return the plugin's module version is defined (__version__) if it's defined.

Expand source code
@property
def version(self):
    """
    Return the plugin's module version is defined (`__version__`) if it's defined.
    """
    if self._version is None:
        try:
            self._version = self.module.__version__
        except Exception as e:
            self._version = None
    return self._version

Methods

def activate_venv(self, dependencies: bool = True, debug: bool = False, **kw) ‑> bool

Activate the virtual environments for the plugin and its dependencies.

Parameters

dependencies : bool, default True
If True, activate the virtual environments for required plugins.

Returns

A bool indicating success.

Expand source code
def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
    """
    Activate the virtual environments for the plugin and its dependencies.

    Parameters
    ----------
    dependencies: bool, default True
        If `True`, activate the virtual environments for required plugins.

    Returns
    -------
    A bool indicating success.
    """
    from meerschaum.utils.venv import venv_target_path
    from meerschaum.utils.packages import activate_venv
    from meerschaum.utils.misc import make_symlink, is_symlink
    from meerschaum.config._paths import PACKAGE_ROOT_PATH

    if dependencies:
        for plugin in self.get_required_plugins(debug=debug):
            plugin.activate_venv(debug=debug, **kw)

    vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
    venv_meerschaum_path = vtp / 'meerschaum'

    try:
        success, msg = True, "Success"
        if is_symlink(venv_meerschaum_path):
            if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
                venv_meerschaum_path.unlink()
                success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
    except Exception as e:
        success, msg = False, str(e)
    if not success:
        warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")

    return activate_venv(self.name, debug=debug, **kw)
def deactivate_venv(self, dependencies: bool = True, debug: bool = False, **kw) ‑> bool

Deactivate the virtual environments for the plugin and its dependencies.

Parameters

dependencies : bool, default True
If True, deactivate the virtual environments for required plugins.

Returns

A bool indicating success.

Expand source code
def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
    """
    Deactivate the virtual environments for the plugin and its dependencies.

    Parameters
    ----------
    dependencies: bool, default True
        If `True`, deactivate the virtual environments for required plugins.

    Returns
    -------
    A bool indicating success.
    """
    from meerschaum.utils.packages import deactivate_venv
    success = deactivate_venv(self.name, debug=debug, **kw)
    if dependencies:
        for plugin in self.get_required_plugins(debug=debug):
            plugin.deactivate_venv(debug=debug, **kw)
    return success
def get_dependencies(self, debug: bool = False) ‑> List[str]

If the Plugin has specified dependencies in a list called required, return the list.

NOTE: Dependecies which start with 'plugin:' are Meerschaum plugins, not pip packages. Meerschaum plugins may also specify connector keys for a repo after '@'.

Parameters

debug : bool, default False
Verbosity toggle.

Returns

A list of required packages and plugins (str).

Expand source code
def get_dependencies(
        self,
        debug: bool = False,
    ) -> List[str]:
    """
    If the Plugin has specified dependencies in a list called `required`, return the list.
    
    **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
    Meerschaum plugins may also specify connector keys for a repo after `'@'`.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of required packages and plugins (str).

    """
    if '_required' in self.__dict__:
        return self._required

    ### If the plugin has not yet been imported,
    ### infer the dependencies from the source text.
    ### This is not super robust, and it doesn't feel right
    ### having multiple versions of the logic.
    ### This is necessary when determining the activation order
    ### without having import the module.
    ### For consistency's sake, the module-less method does not cache the requirements.
    if self.__dict__.get('_module', None) is None:
        file_path = self.__file__
        if file_path is None:
            return []
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()

        if 'required' not in text:
            return []

        ### This has some limitations:
        ### It relies on `required` being manually declared.
        ### We lose the ability to dynamically alter the `required` list,
        ### which is why we've kept the module-reliant method below.
        import ast, re
        ### NOTE: This technically would break 
        ### if `required` was the very first line of the file.
        req_start_match = re.search(r'\nrequired(\s?)=', text)
        if not req_start_match:
            return []
        req_start = req_start_match.start()

        ### Dependencies may have brackets within the strings, so push back the index.
        first_opening_brace = req_start + 1 + text[req_start:].find('[')
        if first_opening_brace == -1:
            return []

        next_closing_brace = req_start + 1 + text[req_start:].find(']')
        if next_closing_brace == -1:
            return []

        start_ix = first_opening_brace + 1
        end_ix = next_closing_brace

        num_braces = 0
        while True:
            if '[' not in text[start_ix:end_ix]:
                break
            num_braces += 1
            start_ix = end_ix
            end_ix += text[end_ix + 1:].find(']') + 1

        req_end = end_ix + 1
        req_text = (
            text[req_start:req_end]
            .lstrip()
            .replace('required', '', 1)
            .lstrip()
            .replace('=', '', 1)
            .lstrip()
        )
        try:
            required = ast.literal_eval(req_text)
        except Exception as e:
            warn(
                f"Unable to determine requirements for plugin '{self.name}' "
                + "without importing the module.\n"
                + "    This may be due to dynamically setting the global `required` list.\n"
                + f"    {e}"
            )
            return []
        return required

    import inspect
    self.activate_venv(dependencies=False, debug=debug)
    required = []
    for name, val in inspect.getmembers(self.module):
        if name == 'required':
            required = val
            break
    self._required = required
    self.deactivate_venv(dependencies=False, debug=debug)
    return required
def get_required_packages(self, debug: bool = False) ‑> List[str]

Return the required package names (excluding plugins).

Expand source code
def get_required_packages(self, debug: bool=False) -> List[str]:
    """
    Return the required package names (excluding plugins).
    """
    _deps = self.get_dependencies(debug=debug)
    return [_d for _d in _deps if not _d.startswith('plugin:')]
def get_required_plugins(self, debug: bool = False) ‑> List[Plugin]

Return a list of required Plugin objects.

Expand source code
def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
    """
    Return a list of required Plugin objects.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    plugins = []
    _deps = self.get_dependencies(debug=debug)
    sep = STATIC_CONFIG['plugins']['repo_separator']
    plugin_names = [
        _d[len('plugin:'):] for _d in _deps
        if _d.startswith('plugin:') and len(_d) > len('plugin:')
    ]
    default_repo_keys = get_config('meerschaum', 'default_repository')
    for _plugin_name in plugin_names:
        if sep in _plugin_name:
            try:
                _plugin_name, _repo_keys = _plugin_name.split(sep)
            except Exception as e:
                _repo_keys = default_repo_keys
                warn(
                    f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
                    + f"Will try to use '{_repo_keys}' instead.",
                    stack = False,
                )
        else:
            _repo_keys = default_repo_keys
        plugins.append(Plugin(_plugin_name, repo=_repo_keys))
    return plugins
def install(self, force: bool = False, debug: bool = False) ‑> Tuple[bool, str]

Extract a plugin's tar archive to the plugins directory.

This function checks if the plugin is already installed and if the version is equal or greater than the existing installation.

Parameters

force : bool, default False
If True, continue with installation, even if required packages fail to install.
debug : bool, default False
Verbosity toggle.

Returns

A SuccessTuple of success (bool) and a message (str).

Expand source code
def install(
        self,
        force: bool = False,
        debug: bool = False,
    ) -> SuccessTuple:
    """
    Extract a plugin's tar archive to the plugins directory.
    
    This function checks if the plugin is already installed and if the version is equal or
    greater than the existing installation.

    Parameters
    ----------
    force: bool, default False
        If `True`, continue with installation, even if required packages fail to install.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (bool) and a message (str).

    """
    if self.full_name in _ongoing_installations:
        return True, f"Already installing plugin '{self}'."
    _ongoing_installations.add(self.full_name)
    from meerschaum.utils.warnings import warn, error
    if debug:
        from meerschaum.utils.debug import dprint
    import tarfile
    import re
    import ast
    from meerschaum.plugins import reload_plugins, sync_plugins_symlinks
    from meerschaum.utils.packages import attempt_import, determine_version, reload_package
    from meerschaum.utils.venv import init_venv
    from meerschaum.utils.misc import safely_extract_tar
    old_cwd = os.getcwd()
    old_version = ''
    new_version = ''
    temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
    temp_dir.mkdir(exist_ok=True)

    if not self.archive_path.exists():
        return False, f"Missing archive file for plugin '{self}'."
    if self.version is not None:
        old_version = self.version
        if debug:
            dprint(f"Found existing version '{old_version}' for plugin '{self}'.")

    if debug:
        dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")

    try:
        with tarfile.open(self.archive_path, 'r:gz') as tarf:
            safely_extract_tar(tarf, temp_dir)
    except Exception as e:
        warn(e)
        return False, f"Failed to extract plugin '{self.name}'."

    ### search for version information
    files = os.listdir(temp_dir)
    
    if str(files[0]) == self.name:
        is_dir = True
    elif str(files[0]) == self.name + '.py':
        is_dir = False
    else:
        error(f"Unknown format encountered for plugin '{self}'.")

    fpath = temp_dir / files[0]
    if is_dir:
        fpath = fpath / '__init__.py'

    init_venv(self.name, debug=debug)
    with open(fpath, 'r', encoding='utf-8') as f:
        init_lines = f.readlines()
    new_version = None
    for line in init_lines:
        if '__version__' not in line:
            continue
        version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
        if not version_match:
            continue
        new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
        break
    if not new_version:
        warn(
            f"No `__version__` defined for plugin '{self}'. "
            + "Assuming new version...",
            stack = False,
        )

    packaging_version = attempt_import('packaging.version')
    try:
        is_new_version = (not new_version and not old_version) or (
            packaging_version.parse(old_version) < packaging_version.parse(new_version)
        )
        is_same_version = new_version and old_version and (
            packaging_version.parse(old_version) == packaging_version.parse(new_version)
        )
    except Exception as e:
        is_new_version, is_same_version = True, False

    ### Determine where to permanently store the new plugin.
    plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
    for path in PLUGINS_DIR_PATHS:
        files_in_plugins_dir = os.listdir(path)
        if (
            self.name in files_in_plugins_dir
            or
            (self.name + '.py') in files_in_plugins_dir
        ):
            plugin_installation_dir_path = path
            break

    success_msg = f"Successfully installed plugin '{self}'."
    success, abort = None, None

    if is_same_version and not force:
        success, msg = True, (
            f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
            "    Install again with `-f` or `--force` to reinstall."
        )
        abort = True
    elif is_new_version or force:
        for src_dir, dirs, files in os.walk(temp_dir):
            if success is not None:
                break
            dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
            if not os.path.exists(dst_dir):
                os.mkdir(dst_dir)
            for f in files:
                src_file = os.path.join(src_dir, f)
                dst_file = os.path.join(dst_dir, f)
                if os.path.exists(dst_file):
                    os.remove(dst_file)

                if debug:
                    dprint(f"Moving '{src_file}' to '{dst_dir}'...")
                try:
                    shutil.move(src_file, dst_dir)
                except Exception as e:
                    success, msg = False, (
                        f"Failed to install plugin '{self}': " +
                        f"Could not move file '{src_file}' to '{dst_dir}'"
                    )
                    print(msg)
                    break
        if success is None:
            success, msg = True, success_msg
    else:
        success, msg = False, (
            f"Your installed version of plugin '{self}' ({old_version}) is higher than "
            + f"attempted version {new_version}."
        )

    shutil.rmtree(temp_dir)
    os.chdir(old_cwd)

    ### Reload the plugin's module.
    sync_plugins_symlinks(debug=debug)
    if '_module' in self.__dict__:
        del self.__dict__['_module']
    init_venv(venv=self.name, force=True, debug=debug)
    reload_package('meerschaum')
    reload_plugins([self.name], debug=debug)

    ### if we've already failed, return here
    if not success or abort:
        _ongoing_installations.remove(self.full_name)
        return success, msg

    ### attempt to install dependencies
    if not self.install_dependencies(force=force, debug=debug):
        _ongoing_installations.remove(self.full_name)
        return False, f"Failed to install dependencies for plugin '{self}'."

    ### handling success tuple, bool, or other (typically None)
    setup_tuple = self.setup(debug=debug)
    if isinstance(setup_tuple, tuple):
        if not setup_tuple[0]:
            success, msg = setup_tuple
    elif isinstance(setup_tuple, bool):
        if not setup_tuple:
            success, msg = False, (
                f"Failed to run post-install setup for plugin '{self}'." + '\n' +
                f"Check `setup()` in '{self.__file__}' for more information " +
                f"(no error message provided)."
            )
        else:
            success, msg = True, success_msg
    elif setup_tuple is None:
        success = True
        msg = (
            f"Post-install for plugin '{self}' returned None. " +
            f"Assuming plugin successfully installed."
        )
        warn(msg)
    else:
        success = False
        msg = (
            f"Post-install for plugin '{self}' returned unexpected value " +
            f"of type '{type(setup_tuple)}': {setup_tuple}"
        )

    _ongoing_installations.remove(self.full_name)
    module = self.module
    return success, msg
def install_dependencies(self, force: bool = False, debug: bool = False) ‑> bool

If specified, install dependencies.

NOTE: Dependencies that start with 'plugin:' will be installed as Meerschaum plugins from the same repository as this Plugin. To install from a different repository, add the repo keys after '@' (e.g. 'plugin:foo@api:bar').

Parameters

force : bool, default False
If True, continue with the installation, even if some required packages fail to install.
debug : bool, default False
Verbosity toggle.

Returns

A bool indicating success.

Expand source code
def install_dependencies(
        self,
        force: bool = False,
        debug: bool = False,
    ) -> bool:
    """
    If specified, install dependencies.
    
    **NOTE:** Dependencies that start with `'plugin:'` will be installed as
    Meerschaum plugins from the same repository as this Plugin.
    To install from a different repository, add the repo keys after `'@'`
    (e.g. `'plugin:foo@api:bar'`).

    Parameters
    ----------
    force: bool, default False
        If `True`, continue with the installation, even if some
        required packages fail to install.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating success.

    """
    from meerschaum.utils.packages import pip_install, venv_contains_package
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, info
    from meerschaum.connectors.parse import parse_repo_keys
    _deps = self.get_dependencies(debug=debug)
    if not _deps and self.requirements_file_path is None:
        return True

    plugins = self.get_required_plugins(debug=debug)
    for _plugin in plugins:
        if _plugin.name == self.name:
            warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
            continue
        _success, _msg = _plugin.repo_connector.install_plugin(
            _plugin.name, debug=debug, force=force
        )
        if not _success:
            warn(
                f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
                + f" for plugin '{self.name}':\n" + _msg,
                stack = False,
            )
            if not force:
                warn(
                    "Try installing with the `--force` flag to continue anyway.",
                    stack = False,
                )
                return False
            info(
                "Continuing with installation despite the failure "
                + "(careful, things might be broken!)...",
                icon = False
            )


    ### First step: parse `requirements.txt` if it exists.
    if self.requirements_file_path is not None:
        if not pip_install(
            requirements_file_path=self.requirements_file_path,
            venv=self.name, debug=debug
        ):
            warn(
                f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
                stack = False,
            )
            if not force:
                warn(
                    "Try installing with `--force` to continue anyway.",
                    stack = False,
                )
                return False
            info(
                "Continuing with installation despite the failure "
                + "(careful, things might be broken!)...",
                icon = False
            )


    ### Don't reinstall packages that are already included in required plugins.
    packages = []
    _packages = self.get_required_packages(debug=debug)
    accounted_for_packages = set()
    for package_name in _packages:
        for plugin in plugins:
            if venv_contains_package(package_name, plugin.name):
                accounted_for_packages.add(package_name)
                break
    packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]

    ### Attempt pip packages installation.
    if packages:
        for package in packages:
            if not pip_install(package, venv=self.name, debug=debug):
                warn(
                    f"Failed to install required package '{package}'"
                    + f" for plugin '{self.name}'.",
                    stack = False,
                )
                if not force:
                    warn(
                        "Try installing with `--force` to continue anyway.",
                        stack = False,
                    )
                    return False
                info(
                    "Continuing with installation despite the failure "
                    + "(careful, things might be broken!)...",
                    icon = False
                )
    return True
def is_installed(self, **kw) ‑> bool

Check whether a plugin is correctly installed.

Returns

A bool indicating whether a plugin exists and is successfully imported.

Expand source code
def is_installed(self, **kw) -> bool:
    """
    Check whether a plugin is correctly installed.

    Returns
    -------
    A `bool` indicating whether a plugin exists and is successfully imported.
    """
    return self.__file__ is not None
def make_tar(self, debug: bool = False) ‑> pathlib.Path

Compress the plugin's source files into a .tar.gz archive and return the archive's path.

Parameters

debug : bool, default False
Verbosity toggle.

Returns

A pathlib.Path to the archive file's path.

Expand source code
def make_tar(self, debug: bool = False) -> pathlib.Path:
    """
    Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pathlib.Path` to the archive file's path.

    """
    import tarfile, pathlib, subprocess, fnmatch
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    pathspec = attempt_import('pathspec', debug=debug)

    if not self.__file__:
        from meerschaum.utils.warnings import error
        error(f"Could not find file for plugin '{self}'.")
    if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
        path = self.__file__.replace('__init__.py', '')
        is_dir = True
    else:
        path = self.__file__
        is_dir = False

    old_cwd = os.getcwd()
    real_parent_path = pathlib.Path(os.path.realpath(path)).parent
    os.chdir(real_parent_path)

    default_patterns_to_ignore = [
        '.pyc',
        '__pycache__/',
        'eggs/',
        '__pypackages__/',
        '.git',
    ]

    def parse_gitignore() -> 'Set[str]':
        gitignore_path = pathlib.Path(path) / '.gitignore'
        if not gitignore_path.exists():
            return set()
        with open(gitignore_path, 'r', encoding='utf-8') as f:
            gitignore_text = f.read()
        return set(pathspec.PathSpec.from_lines(
            pathspec.patterns.GitWildMatchPattern,
            default_patterns_to_ignore + gitignore_text.splitlines()
        ).match_tree(path))

    patterns_to_ignore = parse_gitignore() if is_dir else set()

    if debug:
        dprint(f"Patterns to ignore:\n{patterns_to_ignore}")

    with tarfile.open(self.archive_path, 'w:gz') as tarf:
        if not is_dir:
            tarf.add(f"{self.name}.py")
        else:
            for root, dirs, files in os.walk(self.name):
                for f in files:
                    good_file = True
                    fp = os.path.join(root, f)
                    for pattern in patterns_to_ignore:
                        if pattern in str(fp) or f.startswith('.'):
                            good_file = False
                            break
                    if good_file:
                        if debug:
                            dprint(f"Adding '{fp}'...")
                        tarf.add(fp)

    ### clean up and change back to old directory
    os.chdir(old_cwd)

    ### change to 775 to avoid permissions issues with the API in a Docker container
    self.archive_path.chmod(0o775)

    if debug:
        dprint(f"Created archive '{self.archive_path}'.")
    return self.archive_path
def remove_archive(self, debug: bool = False) ‑> Tuple[bool, str]

Remove a plugin's archive file.

Expand source code
def remove_archive(
        self,        
        debug: bool = False
    ) -> SuccessTuple:
    """Remove a plugin's archive file."""
    if not self.archive_path.exists():
        return True, f"Archive file for plugin '{self}' does not exist."
    try:
        self.archive_path.unlink()
    except Exception as e:
        return False, f"Failed to remove archive for plugin '{self}':\n{e}"
    return True, "Success"
def remove_venv(self, debug: bool = False) ‑> Tuple[bool, str]

Remove a plugin's virtual environment.

Expand source code
def remove_venv(
        self,        
        debug: bool = False
    ) -> SuccessTuple:
    """Remove a plugin's virtual environment."""
    if not self.venv_path.exists():
        return True, f"Virtual environment for plugin '{self}' does not exist."
    try:
        shutil.rmtree(self.venv_path)
    except Exception as e:
        return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
    return True, "Success"
def setup(self, *args: str, debug: bool = False, **kw: Any) ‑> Union[Tuple[bool, str], bool]

If exists, run the plugin's setup() function.

Parameters

*args : str
The positional arguments passed to the setup() function.
debug : bool, default False
Verbosity toggle.
**kw : Any
The keyword arguments passed to the setup() function.

Returns

A SuccessTuple or bool indicating success.

Expand source code
def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
    """
    If exists, run the plugin's `setup()` function.

    Parameters
    ----------
    *args: str
        The positional arguments passed to the `setup()` function.
        
    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        The keyword arguments passed to the `setup()` function.

    Returns
    -------
    A `SuccessTuple` or `bool` indicating success.

    """
    from meerschaum.utils.debug import dprint
    import inspect
    _setup = None
    for name, fp in inspect.getmembers(self.module):
        if name == 'setup' and inspect.isfunction(fp):
            _setup = fp
            break

    ### assume success if no setup() is found (not necessary)
    if _setup is None:
        return True

    sig = inspect.signature(_setup)
    has_debug, has_kw = ('debug' in sig.parameters), False
    for k, v in sig.parameters.items():
        if '**' in str(v):
            has_kw = True
            break

    _kw = {}
    if has_kw:
        _kw.update(kw)
    if has_debug:
        _kw['debug'] = debug

    if debug:
        dprint(f"Running setup for plugin '{self}'...")
    try:
        self.activate_venv(debug=debug)
        return_tuple = _setup(*args, **_kw)
        self.deactivate_venv(debug=debug)
    except Exception as e:
        return False, str(e)

    if isinstance(return_tuple, tuple):
        return return_tuple
    if isinstance(return_tuple, bool):
        return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
    if return_tuple is None:
        return False, f"Setup for Plugin '{self.name}' returned None."
    return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"
def uninstall(self, debug: bool = False) ‑> Tuple[bool, str]

Remove a plugin, its virtual environment, and archive file.

Expand source code
def uninstall(self, debug: bool = False) -> SuccessTuple:
    """
    Remove a plugin, its virtual environment, and archive file.
    """
    from meerschaum.utils.packages import reload_package
    from meerschaum.plugins import reload_plugins, sync_plugins_symlinks
    from meerschaum.utils.warnings import warn, info
    warnings_thrown_count: int = 0
    max_warnings: int = 3

    if not self.is_installed():
        info(
            f"Plugin '{self.name}' doesn't seem to be installed.\n    "
            + "Checking for artifacts...",
            stack = False,
        )
    else:
        real_path = pathlib.Path(os.path.realpath(self.__file__))
        try:
            if real_path.name == '__init__.py':
                shutil.rmtree(real_path.parent)
            else:
                real_path.unlink()
        except Exception as e:
            warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
            warnings_thrown_count += 1
        else:
            info(f"Removed source files for plugin '{self.name}'.")

    if self.venv_path.exists():
        success, msg = self.remove_venv(debug=debug)
        if not success:
            warn(msg, stack=False)
            warnings_thrown_count += 1
        else:
            info(f"Removed virtual environment from plugin '{self.name}'.")

    success = warnings_thrown_count < max_warnings
    sync_plugins_symlinks(debug=debug)
    self.deactivate_venv(force=True, debug=debug)
    reload_package('meerschaum')
    reload_plugins(debug=debug)
    return success, (
        f"Successfully uninstalled plugin '{self}'." if success
        else f"Failed to uninstall plugin '{self}'."
    )