meerschaum.plugins

Expose plugin management APIs from the meerschaum.plugins module.

   1#! /usr/bin/env python
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Expose plugin management APIs from the `meerschaum.plugins` module.
   7"""
   8
   9from __future__ import annotations
  10
  11import pathlib
  12import functools
  13from collections import defaultdict
  14
  15import meerschaum as mrsm
  16from meerschaum.utils.typing import Callable, Any, Union, Optional, Dict, List, Tuple
  17from meerschaum.utils.threading import RLock
  18from meerschaum.core.Plugin import Plugin
  19
### Registry of functions decorated with `@api_plugin`,
### keyed by the decorated function's `__module__`.
_api_plugins: Dict[str, List[Callable[['fastapi.App'], Any]]] = {}
### Registries of sync hooks, keyed by the value returned from `_get_parent_plugin()`.
_pre_sync_hooks: Dict[Union[str, None], List[Callable[[Any], Any]]] = {}
_post_sync_hooks: Dict[Union[str, None], List[Callable[[Any], Any]]] = {}
### Action names are mapped to `False` here by `make_action(daemon=False)`.
_actions_daemon_enabled: Dict[str, bool] = {}
### Re-entrant locks, keyed by the name of the shared state they are used to guard.
_locks = {
    '_api_plugins': RLock(),
    '_dash_plugins': RLock(),
    '_pre_sync_hooks': RLock(),
    '_post_sync_hooks': RLock(),
    '_actions_daemon_enabled': RLock(),
    '__path__': RLock(),
    'sys.path': RLock(),
    'internal_plugins': RLock(),
    '_synced_symlinks': RLock(),
    'PLUGINS_INTERNAL_LOCK_PATH': RLock(),
}
### Names exported as the public API of this module.
__all__ = (
    "Plugin",
    "make_action",
    "api_plugin",
    "dash_plugin",
    "web_page",
    "import_plugins",
    "from_plugin_import",
    "reload_plugins",
    "get_plugins",
    "get_data_plugins",
    "add_plugin_argument",
    "pre_sync_hook",
    "post_sync_hook",
)
### NOTE(review): presumably hides these names from the generated pdoc documentation — confirm.
__pdoc__ = {
    'venvs': False,
    'data': False,
    'stack': False,
    'plugins': False,
}
  57
  58
  59def make_action(
  60    function: Optional[Callable[[Any], Any]] = None,
  61    shell: bool = False,
  62    activate: bool = True,
  63    deactivate: bool = True,
  64    debug: bool = False,
  65    daemon: bool = True,
  66    skip_if_loaded: bool = True,
  67    _plugin_name: Optional[str] = None,
  68) -> Callable[[Any], Any]:
  69    """
  70    Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.
  71    
  72    Parameters
  73    ----------
  74    function: Callable[[Any], Any]
  75        The function to become a Meerschaum action. Must accept all keyword arguments.
  76        
  77    shell: bool, default False
  78        Not used.
  79        
  80    Returns
  81    -------
  82    Another function (this is a decorator function).
  83
  84    Examples
  85    --------
  86    >>> from meerschaum.plugins import make_action
  87    >>>
  88    >>> @make_action
  89    ... def my_action(**kw):
  90    ...     print('foo')
  91    ...     return True, "Success"
  92    >>>
  93    """
  94    def _decorator(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
  95        from meerschaum.actions import actions, _custom_actions_plugins, _plugins_actions
  96        if skip_if_loaded and func.__name__ in actions:
  97            return func
  98
  99        from meerschaum.config.paths import PLUGINS_RESOURCES_PATH
 100        plugin_name = (
 101            func.__name__.split(f"{PLUGINS_RESOURCES_PATH.stem}.", maxsplit=1)[-1].split('.')[0]
 102        )
 103        plugin = Plugin(plugin_name) if plugin_name else None
 104
 105        if debug:
 106            from meerschaum.utils.debug import dprint
 107            dprint(
 108                f"Adding action '{func.__name__}' from plugin "
 109                f"'{plugin}'..."
 110            )
 111
 112        actions[func.__name__] = func
 113        _custom_actions_plugins[func.__name__] = plugin_name
 114        if plugin_name not in _plugins_actions:
 115            _plugins_actions[plugin_name] = []
 116        _plugins_actions[plugin_name].append(func.__name__)
 117        if not daemon:
 118            _actions_daemon_enabled[func.__name__] = False
 119        return func
 120
 121    if function is None:
 122        return _decorator
 123    return _decorator(function)
 124
 125
 126def pre_sync_hook(
 127    function: Callable[[Any], Any],
 128) -> Callable[[Any], Any]:
 129    """
 130    Register a function as a sync hook to be executed right before sync.
 131    
 132    Parameters
 133    ----------
 134    function: Callable[[Any], Any]
 135        The function to execute right before a sync.
 136        
 137    Returns
 138    -------
 139    Another function (this is a decorator function).
 140
 141    Examples
 142    --------
 143    >>> from meerschaum.plugins import pre_sync_hook
 144    >>>
 145    >>> @pre_sync_hook
 146    ... def log_sync(pipe, **kwargs):
 147    ...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
 148    >>>
 149    """
 150    with _locks['_pre_sync_hooks']:
 151        plugin_name = _get_parent_plugin()
 152        try:
 153            if plugin_name not in _pre_sync_hooks:
 154                _pre_sync_hooks[plugin_name] = []
 155            _pre_sync_hooks[plugin_name].append(function)
 156        except Exception as e:
 157            from meerschaum.utils.warnings import warn
 158            warn(e)
 159    return function
 160
 161
 162def post_sync_hook(
 163    function: Callable[[Any], Any],
 164) -> Callable[[Any], Any]:
 165    """
 166    Register a function as a sync hook to be executed upon completion of a sync.
 167    
 168    Parameters
 169    ----------
 170    function: Callable[[Any], Any]
 171        The function to execute upon completion of a sync.
 172        
 173    Returns
 174    -------
 175    Another function (this is a decorator function).
 176
 177    Examples
 178    --------
 179    >>> from meerschaum.plugins import post_sync_hook
 180    >>> from meerschaum.utils.misc import interval_str
 181    >>> from datetime import timedelta
 182    >>>
 183    >>> @post_sync_hook
 184    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
 185    ...     duration_delta = timedelta(seconds=duration)
 186    ...     duration_text = interval_str(duration_delta)
 187    ...     print(f"It took {duration_text} to sync {pipe}.")
 188    >>>
 189    """
 190    with _locks['_post_sync_hooks']:
 191        try:
 192            plugin_name = _get_parent_plugin()
 193            if plugin_name not in _post_sync_hooks:
 194                _post_sync_hooks[plugin_name] = []
 195            _post_sync_hooks[plugin_name].append(function)
 196        except Exception as e:
 197            from meerschaum.utils.warnings import warn
 198            warn(e)
 199    return function
 200
 201
 202_plugin_endpoints_to_pages = {}
 203_plugins_web_pages = {}
 204def web_page(
 205    page: Union[str, None, Callable[[Any], Any]] = None,
 206    login_required: bool = True,
 207    skip_navbar: bool = False,
 208    page_group: Optional[str] = None,
 209    **kwargs
 210) -> Any:
 211    """
 212    Quickly add pages to the dash application.
 213
 214    Examples
 215    --------
 216    >>> import meerschaum as mrsm
 217    >>> from meerschaum.plugins import web_page
 218    >>> html = mrsm.attempt_import('dash.html')
 219    >>> 
 220    >>> @web_page('foo/bar', login_required=False)
 221    >>> def foo_bar():
 222    ...     return html.Div([html.H1("Hello, World!")])
 223    >>> 
 224    """
 225    page_str = None
 226
 227    def _decorator(_func: Callable[[Any], Any]) -> Callable[[Any], Any]:
 228        nonlocal page_str, page_group
 229
 230        @functools.wraps(_func)
 231        def wrapper(*_args, **_kwargs):
 232            return _func(*_args, **_kwargs)
 233
 234        if page_str is None:
 235            page_str = _func.__name__
 236
 237        page_str = page_str.lstrip('/').rstrip('/').strip()
 238        if not page_str.startswith('dash'):
 239            page_str = f'/dash/{page_str}'
 240        page_key = (
 241            ' '.join(
 242                [
 243                    word.capitalize()
 244                    for word in (
 245                        page_str.replace('/dash', '').lstrip('/').rstrip('/').strip()
 246                        .replace('-', ' ').replace('_', ' ').split(' ')
 247                    )
 248                ]
 249            )
 250        )
 251 
 252        plugin_name = _get_parent_plugin()
 253        page_group = page_group or plugin_name
 254        if page_group not in _plugin_endpoints_to_pages:
 255            _plugin_endpoints_to_pages[page_group] = {}
 256        _plugin_endpoints_to_pages[page_group][page_str] = {
 257            'function': _func,
 258            'login_required': login_required,
 259            'skip_navbar': skip_navbar,
 260            'page_key': page_key,
 261        }
 262        if plugin_name not in _plugins_web_pages:
 263            _plugins_web_pages[plugin_name] = []
 264        _plugins_web_pages[plugin_name].append(_func)
 265        return wrapper
 266
 267    if callable(page):
 268        decorator_to_return = _decorator(page)
 269        page_str = page.__name__
 270    else:
 271        decorator_to_return = _decorator
 272        page_str = page
 273
 274    return decorator_to_return
 275
 276
 277_dash_plugins = {}
 278def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
 279    """
 280    Execute the function when starting the Dash application.
 281    """
 282    with _locks['_dash_plugins']:
 283        plugin_name = _get_parent_plugin()
 284        try:
 285            if plugin_name not in _dash_plugins:
 286                _dash_plugins[plugin_name] = []
 287            _dash_plugins[plugin_name].append(function)
 288        except Exception as e:
 289            from meerschaum.utils.warnings import warn
 290            warn(e)
 291    return function
 292
 293
 294def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
 295    """
 296    Execute the function when initializing the Meerschaum API module.
 297    Useful for lazy-loading heavy plugins only when the API is started,
 298    such as when editing the `meerschaum.api.app` FastAPI app.
 299    
 300    The FastAPI app will be passed as the only parameter.
 301    
 302    Examples
 303    --------
 304    >>> from meerschaum.plugins import api_plugin
 305    >>>
 306    >>> @api_plugin
 307    >>> def initialize_plugin(app):
 308    ...     @app.get('/my/new/path')
 309    ...     def new_path():
 310    ...         return {'message': 'It works!'}
 311    >>>
 312    """
 313    with _locks['_api_plugins']:
 314        try:
 315            if function.__module__ not in _api_plugins:
 316                _api_plugins[function.__module__] = []
 317            _api_plugins[function.__module__].append(function)
 318        except Exception as e:
 319            from meerschaum.utils.warnings import warn
 320            warn(e)
 321    return function
 322
 323
 324_synced_symlinks: int = 0
 325_injected_plugin_symlinks = defaultdict(lambda: 0)
 326def sync_plugins_symlinks(debug: bool = False, warn: bool = True) -> None:
 327    """
 328    Update the plugins' internal symlinks. 
 329    """
 330    from meerschaum.utils.warnings import error, warn as _warn, dprint
 331    global _synced_symlinks
 332    with _locks['_synced_symlinks']:
 333        if _synced_symlinks > 1:
 334            if debug:
 335                dprint("Skip syncing symlinks...")
 336            return
 337
 338    import os
 339    import pathlib
 340    import time
 341    from collections import defaultdict
 342    from meerschaum.utils.misc import flatten_list, make_symlink, is_symlink
 343    from meerschaum._internal.static import STATIC_CONFIG
 344    from meerschaum.config._paths import (
 345        PLUGINS_RESOURCES_PATH,
 346        PLUGINS_INIT_PATH,
 347        PLUGINS_DIR_PATHS,
 348        PLUGINS_INTERNAL_LOCK_PATH,
 349        PLUGINS_INJECTED_RESOURCES_PATH,
 350    )
 351
 352    ### If the lock file exists, sleep for up to a second or until it's removed before continuing.
 353    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
 354        if PLUGINS_INTERNAL_LOCK_PATH.exists():
 355            lock_sleep_total     = STATIC_CONFIG['plugins']['lock_sleep_total']
 356            lock_sleep_increment = STATIC_CONFIG['plugins']['lock_sleep_increment']
 357            lock_start = time.perf_counter()
 358            while (
 359                (time.perf_counter() - lock_start) < lock_sleep_total
 360            ):
 361                time.sleep(lock_sleep_increment)
 362                if not PLUGINS_INTERNAL_LOCK_PATH.exists():
 363                    break
 364                try:
 365                    PLUGINS_INTERNAL_LOCK_PATH.unlink()
 366                except Exception as e:
 367                    if warn:
 368                        _warn(f"Error while removing lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
 369                    break
 370
 371        ### Begin locking from other processes.
 372        try:
 373            PLUGINS_INTERNAL_LOCK_PATH.touch()
 374        except Exception as e:
 375            if warn:
 376                _warn(f"Unable to create lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
 377
 378    with _locks['internal_plugins']:
 379
 380        try:
 381            from importlib.metadata import entry_points
 382        except ImportError:
 383            importlib_metadata = mrsm.attempt_import('importlib_metadata', lazy=False)
 384            entry_points = importlib_metadata.entry_points
 385
 386        ### NOTE: Allow plugins to be installed via `pip`.
 387        packaged_plugin_paths = []
 388        try:
 389            discovered_packaged_plugins_eps = entry_points(group='meerschaum.plugins')
 390        except TypeError:
 391            discovered_packaged_plugins_eps = []
 392
 393        for ep in discovered_packaged_plugins_eps:
 394            module_name = ep.name
 395            for package_file_path in ep.dist.files:
 396                if package_file_path.suffix != '.py':
 397                    continue
 398                if str(package_file_path) == f'{module_name}.py':
 399                    packaged_plugin_paths.append(package_file_path.locate())
 400                elif str(package_file_path) == f'{module_name}/__init__.py':
 401                    packaged_plugin_paths.append(package_file_path.locate().parent)
 402
 403        if is_symlink(PLUGINS_RESOURCES_PATH) or not PLUGINS_RESOURCES_PATH.exists():
 404            try:
 405                PLUGINS_RESOURCES_PATH.unlink()
 406            except Exception:
 407                pass
 408
 409        PLUGINS_RESOURCES_PATH.mkdir(exist_ok=True)
 410
 411        existing_symlinked_paths = {
 412            _existing_symlink: pathlib.Path(os.path.realpath(_existing_symlink))
 413            for item in os.listdir(PLUGINS_RESOURCES_PATH)
 414            if is_symlink(_existing_symlink := (PLUGINS_RESOURCES_PATH / item))
 415        }
 416        injected_symlinked_paths = {
 417            _injected_symlink: pathlib.Path(os.path.realpath(_injected_symlink))
 418            for item in os.listdir(PLUGINS_INJECTED_RESOURCES_PATH)
 419            if is_symlink(_injected_symlink := (PLUGINS_INJECTED_RESOURCES_PATH / item))
 420        }
 421        plugins_to_be_symlinked = list(flatten_list(
 422            [
 423                [
 424                    pathlib.Path(os.path.realpath(plugins_path / item))
 425                    for item in os.listdir(plugins_path)
 426                    if (
 427                        not item.startswith('.')
 428                    ) and (item not in ('__pycache__', '__init__.py'))
 429                ]
 430                for plugins_path in PLUGINS_DIR_PATHS
 431                if plugins_path.exists()
 432            ]
 433        ))
 434        plugins_to_be_symlinked.extend(packaged_plugin_paths)
 435
 436        ### Check for duplicates.
 437        seen_plugins = defaultdict(lambda: 0)
 438        for plugin_path in plugins_to_be_symlinked:
 439            plugin_name = plugin_path.stem
 440            seen_plugins[plugin_name] += 1
 441        for plugin_name, plugin_count in seen_plugins.items():
 442            if plugin_count > 1:
 443                if warn:
 444                    _warn(f"Found duplicate plugins named '{plugin_name}'.")
 445
 446        for plugin_symlink_path, real_path in existing_symlinked_paths.items():
 447
 448            ### Remove invalid symlinks.
 449            if real_path not in plugins_to_be_symlinked:
 450                if _injected_plugin_symlinks[plugin_symlink_path] > 1:
 451                    continue
 452                if plugin_symlink_path in injected_symlinked_paths:
 453                    continue
 454                if real_path in injected_symlinked_paths.values():
 455                    continue
 456                try:
 457                    plugin_symlink_path.unlink()
 458                except Exception:
 459                    pass
 460
 461            ### Remove valid plugins from the to-be-symlinked list.
 462            else:
 463                plugins_to_be_symlinked.remove(real_path)
 464
 465        for plugin_path in plugins_to_be_symlinked:
 466            plugin_symlink_path = PLUGINS_RESOURCES_PATH / plugin_path.name
 467            try:
 468                ### There might be duplicate folders (e.g. __pycache__).
 469                if (
 470                    plugin_symlink_path.exists()
 471                    and
 472                    plugin_symlink_path.is_dir()
 473                    and
 474                    not is_symlink(plugin_symlink_path)
 475                ):
 476                    continue
 477                success, msg = make_symlink(plugin_path, plugin_symlink_path)
 478            except Exception as e:
 479                success, msg = False, str(e)
 480            if not success:
 481                if warn:
 482                    _warn(
 483                        f"Failed to create symlink {plugin_symlink_path} "
 484                        + f"to {plugin_path}:\n    {msg}"
 485                    )
 486
 487    ### Release symlink lock file in case other processes need it.
 488    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
 489        try:
 490            if PLUGINS_INTERNAL_LOCK_PATH.exists():
 491                PLUGINS_INTERNAL_LOCK_PATH.unlink()
 492        ### Sometimes competing threads will delete the lock file at the same time.
 493        except FileNotFoundError:
 494            pass
 495        except Exception as e:
 496            if warn:
 497                _warn(f"Error cleaning up lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
 498
 499        try:
 500            if not PLUGINS_INIT_PATH.exists():
 501                PLUGINS_INIT_PATH.touch()
 502        except Exception as e:
 503            error(f"Failed to create the file '{PLUGINS_INIT_PATH}':\n{e}")
 504
 505    with _locks['__path__']:
 506        if str(PLUGINS_RESOURCES_PATH.parent) not in __path__:
 507            __path__.append(str(PLUGINS_RESOURCES_PATH.parent))
 508
 509    with _locks['_synced_symlinks']:
 510        _synced_symlinks += 1
 511
 512
 513def import_plugins(
 514    *plugins_to_import: Union[str, List[str], None],
 515    warn: bool = True,
 516) -> Union[
 517    'ModuleType', Tuple['ModuleType', None]
 518]:
 519    """
 520    Import the Meerschaum plugins directory.
 521
 522    Parameters
 523    ----------
 524    plugins_to_import: Union[str, List[str], None]
 525        If provided, only import the specified plugins.
 526        Otherwise import the entire plugins module. May be a string, list, or `None`.
 527        Defaults to `None`.
 528
 529    Returns
 530    -------
 531    A module of list of modules, depening on the number of plugins provided.
 532
 533    """
 534    import sys
 535    import importlib
 536    from meerschaum.utils.misc import flatten_list
 537    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
 538    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
 539    from meerschaum.utils.warnings import warn as _warn
 540    plugins_to_import = list(plugins_to_import)
 541    prepended_sys_path = False
 542    with _locks['sys.path']:
 543
 544        ### Since plugins may depend on other plugins,
 545        ### we need to activate the virtual environments for library plugins.
 546        ### This logic exists in `Plugin.activate_venv()`,
 547        ### but that code requires the plugin's module to already be imported.
 548        ### It's not a guarantee of correct activation order,
 549        ### e.g. if a library plugin pins a specific package and another 
 550        plugins_names = get_plugins_names()
 551        already_active_venvs = [is_venv_active(plugin_name) for plugin_name in plugins_names]
 552
 553        if not sys.path or sys.path[0] != str(PLUGINS_RESOURCES_PATH.parent):
 554            prepended_sys_path = True
 555            sys.path.insert(0, str(PLUGINS_RESOURCES_PATH.parent))
 556
 557        if not plugins_to_import:
 558            for plugin_name in plugins_names:
 559                activate_venv(plugin_name)
 560            try:
 561                imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem)
 562            except ImportError as e:
 563                _warn(f"Failed to import the plugins module:\n    {e}")
 564                import traceback
 565                traceback.print_exc()
 566                imported_plugins = None
 567            for plugin_name in plugins_names:
 568                if plugin_name in already_active_venvs:
 569                    continue
 570                deactivate_venv(plugin_name)
 571
 572        else:
 573            imported_plugins = []
 574            for plugin_name in flatten_list(plugins_to_import):
 575                plugin = Plugin(plugin_name)
 576                try:
 577                    with Venv(plugin, init_if_not_exists=False):
 578                        imported_plugins.append(
 579                            importlib.import_module(
 580                                f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
 581                            )
 582                        )
 583                except Exception as e:
 584                    _warn(
 585                        f"Failed to import plugin '{plugin_name}':\n    "
 586                        + f"{e}\n\nHere's a stacktrace:",
 587                        stack = False,
 588                    )
 589                    from meerschaum.utils.formatting import get_console
 590                    get_console().print_exception(
 591                        suppress = [
 592                            'meerschaum/plugins/__init__.py',
 593                            importlib,
 594                            importlib._bootstrap,
 595                        ]
 596                    )
 597                    imported_plugins.append(None)
 598
 599        if imported_plugins is None and warn:
 600            _warn("Failed to import plugins.", stacklevel=3)
 601
 602        if prepended_sys_path and str(PLUGINS_RESOURCES_PATH.parent) in sys.path:
 603            sys.path.remove(str(PLUGINS_RESOURCES_PATH.parent))
 604
 605    if isinstance(imported_plugins, list):
 606        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
 607    return imported_plugins
 608
 609
 610def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
 611    """
 612    Emulate the `from module import x` behavior.
 613
 614    Parameters
 615    ----------
 616    plugin_import_name: str
 617        The import name of the plugin's module.
 618        Separate submodules with '.' (e.g. 'compose.utils.pipes')
 619
 620    attrs: str
 621        Names of the attributes to return.
 622
 623    Returns
 624    -------
 625    Objects from a plugin's submodule.
 626    If multiple objects are provided, return a tuple.
 627
 628    Examples
 629    --------
 630    >>> init = from_plugin_import('compose.utils', 'init')
 631    >>> with mrsm.Venv('compose'):
 632    ...     cf = init()
 633    >>> build_parent_pipe, get_defined_pipes = from_plugin_import(
 634    ...     'compose.utils.pipes',
 635    ...     'build_parent_pipe',
 636    ...     'get_defined_pipes',
 637    ... )
 638    >>> parent_pipe = build_parent_pipe(cf)
 639    >>> defined_pipes = get_defined_pipes(cf)
 640    """
 641    import importlib
 642    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
 643    from meerschaum.utils.warnings import warn as _warn
 644    if plugin_import_name.startswith('plugins.'):
 645        plugin_import_name = plugin_import_name[len('plugins.'):]
 646    plugin_import_parts = plugin_import_name.split('.')
 647    plugin_root_name = plugin_import_parts[0]
 648    plugin = mrsm.Plugin(plugin_root_name)
 649
 650    submodule_import_name = '.'.join(
 651        [PLUGINS_RESOURCES_PATH.stem]
 652        + plugin_import_parts
 653    )
 654    if len(attrs) == 0:
 655        raise ValueError(f"Provide which attributes to return from '{submodule_import_name}'.")
 656
 657    attrs_to_return = []
 658    with mrsm.Venv(plugin):
 659        if plugin.module is None:
 660            raise ImportError(f"Unable to import plugin '{plugin}'.")
 661
 662        try:
 663            submodule = importlib.import_module(submodule_import_name)
 664        except ImportError as e:
 665            _warn(
 666                f"Failed to import plugin '{submodule_import_name}':\n    "
 667                + f"{e}\n\nHere's a stacktrace:",
 668                stack=False,
 669            )
 670            from meerschaum.utils.formatting import get_console
 671            get_console().print_exception(
 672                suppress=[
 673                    'meerschaum/plugins/__init__.py',
 674                    importlib,
 675                    importlib._bootstrap,
 676                ]
 677            )
 678            return None
 679
 680        for attr in attrs:
 681            try:
 682                attrs_to_return.append(getattr(submodule, attr))
 683            except Exception:
 684                _warn(f"Failed to access '{attr}' from '{submodule_import_name}'.")
 685                attrs_to_return.append(None)
 686        
 687        if len(attrs) == 1:
 688            return attrs_to_return[0]
 689
 690        return tuple(attrs_to_return)
 691
 692
 693_loaded_plugins: bool = False
 694def load_plugins(
 695    skip_if_loaded: bool = True,
 696    shell: bool = False,
 697    debug: bool = False,
 698) -> None:
 699    """
 700    Import Meerschaum plugins and update the actions dictionary.
 701    """
 702    global _loaded_plugins
 703    from meerschaum.utils.warnings import dprint
 704
 705    if skip_if_loaded and _loaded_plugins:
 706        if debug:
 707            dprint("Skip loading plugins...")
 708        return
 709
 710    from inspect import isfunction, getmembers
 711    from meerschaum.actions import __all__ as _all, modules
 712    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
 713    from meerschaum.utils.packages import get_modules_from_package
 714
 715    _plugins_names, plugins_modules = get_modules_from_package(
 716        import_plugins(),
 717        names = True,
 718        recursive = True,
 719        modules_venvs = True
 720    )
 721
 722    ### I'm appending here to keep from redefining the modules list.
 723    new_modules = (
 724        [
 725            mod
 726            for mod in modules
 727            if not mod.__name__.startswith(PLUGINS_RESOURCES_PATH.stem + '.')
 728        ]
 729        + plugins_modules
 730    )
 731    n_mods = len(modules)
 732    for mod in new_modules:
 733        modules.append(mod)
 734    for i in range(n_mods):
 735        modules.pop(0)
 736
 737    for module in plugins_modules:
 738        for name, func in getmembers(module):
 739            if not isfunction(func):
 740                continue
 741            if name == module.__name__.split('.')[-1]:
 742                make_action(
 743                    func,
 744                    **{'shell': shell, 'debug': debug},
 745                    _plugin_name=name,
 746                    skip_if_loaded=True,
 747                )
 748
 749    _loaded_plugins = True
 750
 751
 752def unload_custom_actions(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
 753    """
 754    Unload the custom actions added by plugins.
 755    """
 756    from meerschaum.actions import (
 757        actions,
 758        _custom_actions_plugins,
 759        _plugins_actions,
 760    )
 761    from meerschaum._internal.entry import _shell
 762    import meerschaum._internal.shell as shell_pkg
 763
 764    plugins = plugins if plugins is not None else list(_plugins_actions)
 765
 766    for plugin in plugins:
 767        action_names = _plugins_actions.get(plugin, [])
 768        actions_to_remove = {
 769            action_name: actions.get(action_name, None)
 770            for action_name in action_names
 771        }
 772        for action_name in action_names:
 773            _ = actions.pop(action_name, None)
 774            _ = _custom_actions_plugins.pop(action_name, None)
 775            _ = _actions_daemon_enabled.pop(action_name, None)
 776
 777        _ = _plugins_actions.pop(plugin, None)
 778        shell_pkg._remove_shell_actions(
 779            _shell=_shell,
 780            actions=actions_to_remove,
 781        )
 782    
 783
 784def unload_plugins(
 785    plugins: Optional[List[str]] = None,
 786    remove_symlinks: bool = True,
 787    debug: bool = False,
 788) -> None:
 789    """
 790    Unload the specified plugins from memory.
 791    """
 792    global _loaded_plugins, _synced_symlinks
 793    import sys
 794    from meerschaum.config.paths import PLUGINS_RESOURCES_PATH, PLUGINS_INJECTED_RESOURCES_PATH
 795    from meerschaum.connectors import unload_plugin_connectors
 796    if debug:
 797        from meerschaum.utils.warnings import dprint
 798
 799    _loaded_plugins = False
 800    _synced_symlinks = 0
 801
 802    all_plugins = get_plugins_names()
 803    plugins = plugins if plugins is not None else all_plugins
 804    if debug:
 805        dprint(f"Unloading plugins: {plugins}")
 806
 807    unload_custom_actions(plugins, debug=debug)
 808    unload_plugin_connectors(plugins, debug=debug)
 809
 810    module_prefix = f"{PLUGINS_RESOURCES_PATH.stem}."
 811    loaded_modules = [mod_name for mod_name in sys.modules if mod_name.startswith(module_prefix)]
 812
 813    root_plugins_mod = (
 814        sys.modules.get(PLUGINS_RESOURCES_PATH.stem, None)
 815        if sorted(plugins) != sorted(all_plugins)
 816        else sys.modules.pop(PLUGINS_RESOURCES_PATH, None)
 817    )
 818
 819    for plugin_name in plugins:
 820        for mod_name in loaded_modules:
 821            if (
 822                mod_name[len(PLUGINS_RESOURCES_PATH.stem):].startswith(plugin_name + '.')
 823                or mod_name[len(PLUGINS_RESOURCES_PATH.stem):] == plugin_name
 824            ):
 825                _ = sys.modules.pop(mod_name, None)
 826
 827        if root_plugins_mod is not None and plugin_name in root_plugins_mod.__dict__:
 828            try:
 829                delattr(root_plugins_mod, plugin_name)
 830            except Exception:
 831                pass
 832
 833        ### Unload sync hooks.
 834        _ = _pre_sync_hooks.pop(plugin_name, None)
 835        _ = _post_sync_hooks.pop(plugin_name, None)
 836
 837        ### Unload API endpoints and pages.
 838        _ = _dash_plugins.pop(plugin_name, None)
 839        web_page_funcs = _plugins_web_pages.pop(plugin_name, None) or []
 840        page_groups_to_pop = []
 841        for page_group, page_functions in _plugin_endpoints_to_pages.items():
 842            page_functions_to_pop = [
 843                page_str
 844                for page_str, page_payload in page_functions.items()
 845                if page_payload.get('function', None) in web_page_funcs
 846            ]
 847            for page_str in page_functions_to_pop:
 848                page_functions.pop(page_str, None)
 849            if not page_functions:
 850                page_groups_to_pop.append(page_group)
 851        
 852        for page_group in page_groups_to_pop:
 853            _plugin_endpoints_to_pages.pop(page_group, None)
 854
 855        ### Remove all but injected symlinks.
 856        if remove_symlinks:
 857            dir_symlink_path = PLUGINS_RESOURCES_PATH / plugin_name
 858            dir_symlink_injected_path = PLUGINS_INJECTED_RESOURCES_PATH / plugin_name
 859            file_symlink_path = PLUGINS_RESOURCES_PATH / f"{plugin_name}.py"
 860            file_symlink_injected_path = PLUGINS_INJECTED_RESOURCES_PATH / f"{plugin_name}.py"
 861
 862            try:
 863                if dir_symlink_path.exists() and not dir_symlink_injected_path.exists():
 864                    dir_symlink_path.unlink()
 865            except Exception:
 866                pass
 867
 868            try:
 869                if file_symlink_path.exists() and not file_symlink_injected_path.exists():
 870                    file_symlink_path.unlink()
 871            except Exception:
 872                pass
 873
 874
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
    """
    Reload plugins back into memory.

    The steps run in a fixed order: unload the requested plugins, reset the
    module-level `_synced_symlinks` counter to `0`, sync the plugin symlinks again,
    and finally call `load_plugins()` with `skip_if_loaded=False`.

    Parameters
    ----------
    plugins: Optional[List[str]], default None
        The plugins to unload before reloading.
        `None` is passed through to `unload_plugins()`, which substitutes all plugin names.
        NOTE(review): this list is only forwarded to `unload_plugins()`;
        `load_plugins()` is called without it.

    debug: bool, default False
        Verbosity toggle, forwarded to the unload, symlink-sync, and load steps.

    """
    global _synced_symlinks
    unload_plugins(plugins, debug=debug)
    ### Reset the counter before syncing again
    ### (presumably so the sync is not skipped as already done -- TODO confirm).
    _synced_symlinks = 0
    sync_plugins_symlinks(debug=debug)
    load_plugins(skip_if_loaded=False, debug=debug)
 890
 891
 892def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
 893    """
 894    Return a list of `Plugin` objects.
 895
 896    Parameters
 897    ----------
 898    to_load:
 899        If specified, only load specific plugins.
 900        Otherwise return all plugins.
 901
 902    try_import: bool, default True
 903        If `True`, allow for plugins to be imported.
 904    """
 905    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
 906    import os
 907    sync_plugins_symlinks()
 908    _plugins = [
 909        Plugin(name)
 910        for name in (
 911            to_load or [
 912                (
 913                    name if (PLUGINS_RESOURCES_PATH / name).is_dir()
 914                    else name[:-3]
 915                )
 916                for name in os.listdir(PLUGINS_RESOURCES_PATH)
 917                if name != '__init__.py'
 918            ]
 919        )
 920    ]
 921    plugins = tuple(plugin for plugin in _plugins if plugin.is_installed(try_import=try_import))
 922    if len(to_load) == 1:
 923        if len(plugins) == 0:
 924            raise ValueError(f"Plugin '{to_load[0]}' is not installed.")
 925        return plugins[0]
 926    return plugins
 927
 928
 929def get_plugins_names(*to_load, **kw) -> List[str]:
 930    """
 931    Return a list of installed plugins.
 932    """
 933    return [plugin.name for plugin in get_plugins(*to_load, **kw)]
 934
 935
 936def get_plugins_modules(*to_load, **kw) -> List['ModuleType']:
 937    """
 938    Return a list of modules for the installed plugins, or `None` if things break.
 939    """
 940    return [plugin.module for plugin in get_plugins(*to_load, **kw)]
 941
 942
 943def get_data_plugins() -> List[Plugin]:
 944    """
 945    Only return the modules of plugins with either `fetch()` or `sync()` functions.
 946    """
 947    import inspect
 948    plugins = get_plugins()
 949    data_names = {'sync', 'fetch'}
 950    data_plugins = []
 951    for plugin in plugins:
 952        for name, ob in inspect.getmembers(plugin.module):
 953            if not inspect.isfunction(ob):
 954                continue
 955            if name not in data_names:
 956                continue
 957            data_plugins.append(plugin)
 958    return data_plugins
 959
 960
 961def add_plugin_argument(*args, **kwargs) -> None:
 962    """
 963    Add argparse arguments under the 'Plugins options' group.
 964    Takes the same parameters as the regular argparse `add_argument()` function.
 965
 966    Examples
 967    --------
 968    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
 969    >>> 
 970    """
 971    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
 972    from meerschaum.utils.warnings import warn
 973    _parent_plugin_name = _get_parent_plugin()
 974    title = f"Plugin '{_parent_plugin_name}' options" if _parent_plugin_name else 'Custom options'
 975    group_key = 'plugin_' + (_parent_plugin_name or '')
 976    if group_key not in groups:
 977        groups[group_key] = parser.add_argument_group(
 978            title = title,
 979        )
 980        _seen_plugin_args[group_key] = set()
 981    try:
 982        if str(args) not in _seen_plugin_args[group_key]:
 983            groups[group_key].add_argument(*args, **kwargs)
 984            _seen_plugin_args[group_key].add(str(args))
 985    except Exception as e:
 986        warn(e)
 987
 988
 989def inject_plugin_path(
 990    plugin_path: pathlib.Path,
 991    plugins_resources_path: Optional[pathlib.Path] = None) -> None:
 992    """
 993    Inject a plugin as a symlink into the internal `plugins` directory.
 994
 995    Parameters
 996    ----------
 997    plugin_path: pathlib.Path
 998        The path to the plugin's source module.
 999    """
1000    from meerschaum.utils.misc import make_symlink
1001    if plugins_resources_path is None:
1002        from meerschaum.config.paths import PLUGINS_RESOURCES_PATH, PLUGINS_INJECTED_RESOURCES_PATH
1003        plugins_resources_path = PLUGINS_RESOURCES_PATH
1004        plugins_injected_resources_path = PLUGINS_INJECTED_RESOURCES_PATH
1005    else:
1006        plugins_injected_resources_path = plugins_resources_path / '.injected'
1007
1008    if plugin_path.is_dir():
1009        plugin_name = plugin_path.name
1010        dest_path = plugins_resources_path / plugin_name
1011        injected_path = plugins_injected_resources_path / plugin_name
1012    elif plugin_path.name == '__init__.py':
1013        plugin_name = plugin_path.parent.name
1014        dest_path = plugins_resources_path / plugin_name
1015        injected_path = plugins_injected_resources_path / plugin_name
1016    elif plugin_path.name.endswith('.py'):
1017        plugin_name = plugin_path.name[:(-1 * len('.py'))]
1018        dest_path = plugins_resources_path / plugin_path.name
1019        injected_path = plugins_injected_resources_path / plugin_path.name
1020    else:
1021        raise ValueError(f"Cannot deduce plugin name from path '{plugin_path}'.")
1022
1023    _injected_plugin_symlinks[dest_path] += 1
1024    make_symlink(plugin_path, dest_path)
1025    make_symlink(plugin_path, injected_path)
1026
1027
def _get_parent_plugin(stacklevel: Union[int, Tuple[int, ...]] = (1, 2, 3, 4)) -> Union[str, None]:
    """
    Return the name of the plugin from which this function was (indirectly) called.
    If this function is called from outside a Meerschaum plugin, it will return None.

    Parameters
    ----------
    stacklevel: Union[int, Tuple[int, ...]], default (1, 2, 3, 4)
        The stack levels to inspect, in order, relative to this function
        (`1` is the direct caller).

    Returns
    -------
    The first component of the calling module's `__name__` (after removing `'plugins.'`)
    for the first inspected level that is not a `meerschaum.` module, a private name,
    or `importlib`. Returns `None` if no level qualifies.
    """
    import inspect
    levels = stacklevel if isinstance(stacklevel, tuple) else (stacklevel,)

    ### Capture the stack once rather than per level.
    ### `context=0` skips reading source lines, which are never used here.
    try:
        frames = inspect.stack(0)
    except Exception:
        return None

    for level in levels:
        try:
            module_name = frames[level][0].f_globals.get('__name__', '')
            if module_name.startswith('meerschaum.'):
                continue
            plugin_name = module_name.replace('plugins.', '').split('.')[0]
            if plugin_name.startswith('_') or plugin_name == 'importlib':
                continue
            return plugin_name
        except Exception:
            ### E.g. the stack is shallower than `level`.
            continue

    return None
class Plugin:
 30class Plugin:
 31    """Handle packaging of Meerschaum plugins."""
 32
 33    def __init__(
 34        self,
 35        name: str,
 36        version: Optional[str] = None,
 37        user_id: Optional[int] = None,
 38        required: Optional[List[str]] = None,
 39        attributes: Optional[Dict[str, Any]] = None,
 40        archive_path: Optional[pathlib.Path] = None,
 41        venv_path: Optional[pathlib.Path] = None,
 42        repo_connector: Optional['mrsm.connectors.api.APIConnector'] = None,
 43        repo: Union['mrsm.connectors.api.APIConnector', str, None] = None,
 44    ):
 45        from meerschaum._internal.static import STATIC_CONFIG
 46        from meerschaum.config.paths import PLUGINS_ARCHIVES_RESOURCES_PATH, VIRTENV_RESOURCES_PATH
 47        sep = STATIC_CONFIG['plugins']['repo_separator']
 48        _repo = None
 49        if sep in name:
 50            try:
 51                name, _repo = name.split(sep)
 52            except Exception as e:
 53                error(f"Invalid plugin name: '{name}'")
 54        self._repo_in_name = _repo
 55
 56        if attributes is None:
 57            attributes = {}
 58        self.name = name
 59        self.attributes = attributes
 60        self.user_id = user_id
 61        self._version = version
 62        if required:
 63            self._required = required
 64        self.archive_path = (
 65            archive_path if archive_path is not None
 66            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
 67        )
 68        self.venv_path = (
 69            venv_path if venv_path is not None
 70            else VIRTENV_RESOURCES_PATH / self.name
 71        )
 72        self._repo_connector = repo_connector
 73        self._repo_keys = repo
 74
 75
 76    @property
 77    def repo_connector(self):
 78        """
 79        Return the repository connector for this plugin.
 80        NOTE: This imports the `connectors` module, which imports certain plugin modules.
 81        """
 82        if self._repo_connector is None:
 83            from meerschaum.connectors.parse import parse_repo_keys
 84
 85            repo_keys = self._repo_keys or self._repo_in_name
 86            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
 87                error(
 88                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
 89                )
 90            repo_connector = parse_repo_keys(repo_keys)
 91            self._repo_connector = repo_connector
 92        return self._repo_connector
 93
 94
 95    @property
 96    def version(self):
 97        """
 98        Return the plugin's module version is defined (`__version__`) if it's defined.
 99        """
100        if self._version is None:
101            try:
102                self._version = self.module.__version__
103            except Exception as e:
104                self._version = None
105        return self._version
106
107
108    @property
109    def module(self):
110        """
111        Return the Python module of the underlying plugin.
112        """
113        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
114            if self.__file__ is None:
115                return None
116
117            from meerschaum.plugins import import_plugins
118            self._module = import_plugins(str(self), warn=False)
119
120        return self._module
121
122
123    @property
124    def __file__(self) -> Union[str, None]:
125        """
126        Return the file path (str) of the plugin if it exists, otherwise `None`.
127        """
128        if self.__dict__.get('_module', None) is not None:
129            return self.module.__file__
130
131        from meerschaum.config.paths import PLUGINS_RESOURCES_PATH
132
133        potential_dir = PLUGINS_RESOURCES_PATH / self.name
134        if (
135            potential_dir.exists()
136            and potential_dir.is_dir()
137            and (potential_dir / '__init__.py').exists()
138        ):
139            return str((potential_dir / '__init__.py').as_posix())
140
141        potential_file = PLUGINS_RESOURCES_PATH / (self.name + '.py')
142        if potential_file.exists() and not potential_file.is_dir():
143            return str(potential_file.as_posix())
144
145        return None
146
147
148    @property
149    def requirements_file_path(self) -> Union[pathlib.Path, None]:
150        """
151        If a file named `requirements.txt` exists, return its path.
152        """
153        if self.__file__ is None:
154            return None
155        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
156        if not path.exists():
157            return None
158        return path
159
160
161    def is_installed(self, **kw) -> bool:
162        """
163        Check whether a plugin is correctly installed.
164
165        Returns
166        -------
167        A `bool` indicating whether a plugin exists and is successfully imported.
168        """
169        return self.__file__ is not None
170
171
172    def make_tar(self, debug: bool = False) -> pathlib.Path:
173        """
174        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
175
176        Parameters
177        ----------
178        debug: bool, default False
179            Verbosity toggle.
180
181        Returns
182        -------
183        A `pathlib.Path` to the archive file's path.
184
185        """
186        import tarfile, pathlib, subprocess, fnmatch
187        from meerschaum.utils.debug import dprint
188        from meerschaum.utils.packages import attempt_import
189        pathspec = attempt_import('pathspec', debug=debug)
190
191        if not self.__file__:
192            from meerschaum.utils.warnings import error
193            error(f"Could not find file for plugin '{self}'.")
194        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
195            path = self.__file__.replace('__init__.py', '')
196            is_dir = True
197        else:
198            path = self.__file__
199            is_dir = False
200
201        old_cwd = os.getcwd()
202        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
203        os.chdir(real_parent_path)
204
205        default_patterns_to_ignore = [
206            '.pyc',
207            '__pycache__/',
208            'eggs/',
209            '__pypackages__/',
210            '.git',
211        ]
212
213        def parse_gitignore() -> 'Set[str]':
214            gitignore_path = pathlib.Path(path) / '.gitignore'
215            if not gitignore_path.exists():
216                return set(default_patterns_to_ignore)
217            with open(gitignore_path, 'r', encoding='utf-8') as f:
218                gitignore_text = f.read()
219            return set(pathspec.PathSpec.from_lines(
220                pathspec.patterns.GitWildMatchPattern,
221                default_patterns_to_ignore + gitignore_text.splitlines()
222            ).match_tree(path))
223
224        patterns_to_ignore = parse_gitignore() if is_dir else set()
225
226        if debug:
227            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
228
229        with tarfile.open(self.archive_path, 'w:gz') as tarf:
230            if not is_dir:
231                tarf.add(f"{self.name}.py")
232            else:
233                for root, dirs, files in os.walk(self.name):
234                    for f in files:
235                        good_file = True
236                        fp = os.path.join(root, f)
237                        for pattern in patterns_to_ignore:
238                            if pattern in str(fp) or f.startswith('.'):
239                                good_file = False
240                                break
241                        if good_file:
242                            if debug:
243                                dprint(f"Adding '{fp}'...")
244                            tarf.add(fp)
245
246        ### clean up and change back to old directory
247        os.chdir(old_cwd)
248
249        ### change to 775 to avoid permissions issues with the API in a Docker container
250        self.archive_path.chmod(0o775)
251
252        if debug:
253            dprint(f"Created archive '{self.archive_path}'.")
254        return self.archive_path
255
256
257    def install(
258        self,
259        skip_deps: bool = False,
260        force: bool = False,
261        debug: bool = False,
262    ) -> SuccessTuple:
263        """
264        Extract a plugin's tar archive to the plugins directory.
265        
266        This function checks if the plugin is already installed and if the version is equal or
267        greater than the existing installation.
268
269        Parameters
270        ----------
271        skip_deps: bool, default False
272            If `True`, do not install dependencies.
273
274        force: bool, default False
275            If `True`, continue with installation, even if required packages fail to install.
276
277        debug: bool, default False
278            Verbosity toggle.
279
280        Returns
281        -------
282        A `SuccessTuple` of success (bool) and a message (str).
283
284        """
285        if self.full_name in _ongoing_installations:
286            return True, f"Already installing plugin '{self}'."
287        _ongoing_installations.add(self.full_name)
288        from meerschaum.utils.warnings import warn, error
289        if debug:
290            from meerschaum.utils.debug import dprint
291        import tarfile
292        import re
293        import ast
294        from meerschaum.plugins import sync_plugins_symlinks
295        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
296        from meerschaum.utils.venv import init_venv
297        from meerschaum.utils.misc import safely_extract_tar
298        from meerschaum.config.paths import PLUGINS_TEMP_RESOURCES_PATH, PLUGINS_DIR_PATHS
299        old_cwd = os.getcwd()
300        old_version = ''
301        new_version = ''
302        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
303        temp_dir.mkdir(exist_ok=True)
304
305        if not self.archive_path.exists():
306            return False, f"Missing archive file for plugin '{self}'."
307        if self.version is not None:
308            old_version = self.version
309            if debug:
310                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
311
312        if debug:
313            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
314
315        try:
316            with tarfile.open(self.archive_path, 'r:gz') as tarf:
317                safely_extract_tar(tarf, temp_dir)
318        except Exception as e:
319            warn(e)
320            return False, f"Failed to extract plugin '{self.name}'."
321
322        ### search for version information
323        files = os.listdir(temp_dir)
324        
325        if str(files[0]) == self.name:
326            is_dir = True
327        elif str(files[0]) == self.name + '.py':
328            is_dir = False
329        else:
330            error(f"Unknown format encountered for plugin '{self}'.")
331
332        fpath = temp_dir / files[0]
333        if is_dir:
334            fpath = fpath / '__init__.py'
335
336        init_venv(self.name, debug=debug)
337        with open(fpath, 'r', encoding='utf-8') as f:
338            init_lines = f.readlines()
339        new_version = None
340        for line in init_lines:
341            if '__version__' not in line:
342                continue
343            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
344            if not version_match:
345                continue
346            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
347            break
348        if not new_version:
349            warn(
350                f"No `__version__` defined for plugin '{self}'. "
351                + "Assuming new version...",
352                stack = False,
353            )
354
355        packaging_version = attempt_import('packaging.version')
356        try:
357            is_new_version = (not new_version and not old_version) or (
358                packaging_version.parse(old_version) < packaging_version.parse(new_version)
359            )
360            is_same_version = new_version and old_version and (
361                packaging_version.parse(old_version) == packaging_version.parse(new_version)
362            )
363        except Exception:
364            is_new_version, is_same_version = True, False
365
366        ### Determine where to permanently store the new plugin.
367        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
368        for path in PLUGINS_DIR_PATHS:
369            if not path.exists():
370                warn(f"Plugins path does not exist: {path}", stack=False)
371                continue
372
373            files_in_plugins_dir = os.listdir(path)
374            if (
375                self.name in files_in_plugins_dir
376                or
377                (self.name + '.py') in files_in_plugins_dir
378            ):
379                plugin_installation_dir_path = path
380                break
381
382        success_msg = (
383            f"Successfully installed plugin '{self}'"
384            + ("\n    (skipped dependencies)" if skip_deps else "")
385            + "."
386        )
387        success, abort = None, None
388
389        if is_same_version and not force:
390            success, msg = True, (
391                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
392                "    Install again with `-f` or `--force` to reinstall."
393            )
394            abort = True
395        elif is_new_version or force:
396            for src_dir, dirs, files in os.walk(temp_dir):
397                if success is not None:
398                    break
399                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
400                if not os.path.exists(dst_dir):
401                    os.mkdir(dst_dir)
402                for f in files:
403                    src_file = os.path.join(src_dir, f)
404                    dst_file = os.path.join(dst_dir, f)
405                    if os.path.exists(dst_file):
406                        os.remove(dst_file)
407
408                    if debug:
409                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
410                    try:
411                        shutil.move(src_file, dst_dir)
412                    except Exception:
413                        success, msg = False, (
414                            f"Failed to install plugin '{self}': " +
415                            f"Could not move file '{src_file}' to '{dst_dir}'"
416                        )
417                        print(msg)
418                        break
419            if success is None:
420                success, msg = True, success_msg
421        else:
422            success, msg = False, (
423                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
424                + f"attempted version {new_version}."
425            )
426
427        shutil.rmtree(temp_dir)
428        os.chdir(old_cwd)
429
430        ### Reload the plugin's module.
431        sync_plugins_symlinks(debug=debug)
432        if '_module' in self.__dict__:
433            del self.__dict__['_module']
434        init_venv(venv=self.name, force=True, debug=debug)
435        reload_meerschaum(debug=debug)
436
437        ### if we've already failed, return here
438        if not success or abort:
439            _ongoing_installations.remove(self.full_name)
440            return success, msg
441
442        ### attempt to install dependencies
443        dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
444        if not dependencies_installed:
445            _ongoing_installations.remove(self.full_name)
446            return False, f"Failed to install dependencies for plugin '{self}'."
447
448        ### handling success tuple, bool, or other (typically None)
449        setup_tuple = self.setup(debug=debug)
450        if isinstance(setup_tuple, tuple):
451            if not setup_tuple[0]:
452                success, msg = setup_tuple
453        elif isinstance(setup_tuple, bool):
454            if not setup_tuple:
455                success, msg = False, (
456                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
457                    f"Check `setup()` in '{self.__file__}' for more information " +
458                    "(no error message provided)."
459                )
460            else:
461                success, msg = True, success_msg
462        elif setup_tuple is None:
463            success = True
464            msg = (
465                f"Post-install for plugin '{self}' returned None. " +
466                "Assuming plugin successfully installed."
467            )
468            warn(msg)
469        else:
470            success = False
471            msg = (
472                f"Post-install for plugin '{self}' returned unexpected value " +
473                f"of type '{type(setup_tuple)}': {setup_tuple}"
474            )
475
476        _ongoing_installations.remove(self.full_name)
477        _ = self.module
478        return success, msg
479
480
481    def remove_archive(
482        self,        
483        debug: bool = False
484    ) -> SuccessTuple:
485        """Remove a plugin's archive file."""
486        if not self.archive_path.exists():
487            return True, f"Archive file for plugin '{self}' does not exist."
488        try:
489            self.archive_path.unlink()
490        except Exception as e:
491            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
492        return True, "Success"
493
494
495    def remove_venv(
496        self,        
497        debug: bool = False
498    ) -> SuccessTuple:
499        """Remove a plugin's virtual environment."""
500        if not self.venv_path.exists():
501            return True, f"Virtual environment for plugin '{self}' does not exist."
502        try:
503            shutil.rmtree(self.venv_path)
504        except Exception as e:
505            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
506        return True, "Success"
507
508
509    def uninstall(self, debug: bool = False) -> SuccessTuple:
510        """
511        Remove a plugin, its virtual environment, and archive file.
512        """
513        from meerschaum.utils.packages import reload_meerschaum
514        from meerschaum.plugins import sync_plugins_symlinks
515        from meerschaum.utils.warnings import warn, info
516        warnings_thrown_count: int = 0
517        max_warnings: int = 3
518
519        if not self.is_installed():
520            info(
521                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
522                + "Checking for artifacts...",
523                stack = False,
524            )
525        else:
526            real_path = pathlib.Path(os.path.realpath(self.__file__))
527            try:
528                if real_path.name == '__init__.py':
529                    shutil.rmtree(real_path.parent)
530                else:
531                    real_path.unlink()
532            except Exception as e:
533                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
534                warnings_thrown_count += 1
535            else:
536                info(f"Removed source files for plugin '{self.name}'.")
537
538        if self.venv_path.exists():
539            success, msg = self.remove_venv(debug=debug)
540            if not success:
541                warn(msg, stack=False)
542                warnings_thrown_count += 1
543            else:
544                info(f"Removed virtual environment from plugin '{self.name}'.")
545
546        success = warnings_thrown_count < max_warnings
547        sync_plugins_symlinks(debug=debug)
548        self.deactivate_venv(force=True, debug=debug)
549        reload_meerschaum(debug=debug)
550        return success, (
551            f"Successfully uninstalled plugin '{self}'." if success
552            else f"Failed to uninstall plugin '{self}'."
553        )
554
555
556    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
557        """
558        If exists, run the plugin's `setup()` function.
559
560        Parameters
561        ----------
562        *args: str
563            The positional arguments passed to the `setup()` function.
564            
565        debug: bool, default False
566            Verbosity toggle.
567
568        **kw: Any
569            The keyword arguments passed to the `setup()` function.
570
571        Returns
572        -------
573        A `SuccessTuple` or `bool` indicating success.
574
575        """
576        from meerschaum.utils.debug import dprint
577        import inspect
578        _setup = None
579        for name, fp in inspect.getmembers(self.module):
580            if name == 'setup' and inspect.isfunction(fp):
581                _setup = fp
582                break
583
584        ### assume success if no setup() is found (not necessary)
585        if _setup is None:
586            return True
587
588        sig = inspect.signature(_setup)
589        has_debug, has_kw = ('debug' in sig.parameters), False
590        for k, v in sig.parameters.items():
591            if '**' in str(v):
592                has_kw = True
593                break
594
595        _kw = {}
596        if has_kw:
597            _kw.update(kw)
598        if has_debug:
599            _kw['debug'] = debug
600
601        if debug:
602            dprint(f"Running setup for plugin '{self}'...")
603        try:
604            self.activate_venv(debug=debug)
605            return_tuple = _setup(*args, **_kw)
606            self.deactivate_venv(debug=debug)
607        except Exception as e:
608            return False, str(e)
609
610        if isinstance(return_tuple, tuple):
611            return return_tuple
612        if isinstance(return_tuple, bool):
613            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
614        if return_tuple is None:
615            return False, f"Setup for Plugin '{self.name}' returned None."
616        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"
617
618
619    def get_dependencies(
620        self,
621        debug: bool = False,
622    ) -> List[str]:
623        """
624        If the Plugin has specified dependencies in a list called `required`, return the list.
625        
626        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
627        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
628
629        Parameters
630        ----------
631        debug: bool, default False
632            Verbosity toggle.
633
634        Returns
635        -------
636        A list of required packages and plugins (str).
637
638        """
639        if '_required' in self.__dict__:
640            return self._required
641
642        ### If the plugin has not yet been imported,
643        ### infer the dependencies from the source text.
644        ### This is not super robust, and it doesn't feel right
645        ### having multiple versions of the logic.
646        ### This is necessary when determining the activation order
647        ### without having import the module.
648        ### For consistency's sake, the module-less method does not cache the requirements.
649        if self.__dict__.get('_module', None) is None:
650            file_path = self.__file__
651            if file_path is None:
652                return []
653            with open(file_path, 'r', encoding='utf-8') as f:
654                text = f.read()
655
656            if 'required' not in text:
657                return []
658
659            ### This has some limitations:
660            ### It relies on `required` being manually declared.
661            ### We lose the ability to dynamically alter the `required` list,
662            ### which is why we've kept the module-reliant method below.
663            import ast, re
664            ### NOTE: This technically would break 
665            ### if `required` was the very first line of the file.
666            req_start_match = re.search(r'\nrequired(:\s*)?.*=', text)
667            if not req_start_match:
668                return []
669            req_start = req_start_match.start()
670            equals_sign = req_start + text[req_start:].find('=')
671
672            ### Dependencies may have brackets within the strings, so push back the index.
673            first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[')
674            if first_opening_brace == -1:
675                return []
676
677            next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']')
678            if next_closing_brace == -1:
679                return []
680
681            start_ix = first_opening_brace + 1
682            end_ix = next_closing_brace
683
684            num_braces = 0
685            while True:
686                if '[' not in text[start_ix:end_ix]:
687                    break
688                num_braces += 1
689                start_ix = end_ix
690                end_ix += text[end_ix + 1:].find(']') + 1
691
692            req_end = end_ix + 1
693            req_text = (
694                text[(first_opening_brace-1):req_end]
695                .lstrip()
696                .replace('=', '', 1)
697                .lstrip()
698                .rstrip()
699            )
700            try:
701                required = ast.literal_eval(req_text)
702            except Exception as e:
703                warn(
704                    f"Unable to determine requirements for plugin '{self.name}' "
705                    + "without importing the module.\n"
706                    + "    This may be due to dynamically setting the global `required` list.\n"
707                    + f"    {e}"
708                )
709                return []
710            return required
711
712        import inspect
713        self.activate_venv(dependencies=False, debug=debug)
714        required = []
715        for name, val in inspect.getmembers(self.module):
716            if name == 'required':
717                required = val
718                break
719        self._required = required
720        self.deactivate_venv(dependencies=False, debug=debug)
721        return required
722
723
724    def get_required_plugins(self, debug: bool=False) -> List[mrsm.plugins.Plugin]:
725        """
726        Return a list of required Plugin objects.
727        """
728        from meerschaum.utils.warnings import warn
729        from meerschaum.config import get_config
730        from meerschaum._internal.static import STATIC_CONFIG
731        from meerschaum.connectors.parse import is_valid_connector_keys
732        plugins = []
733        _deps = self.get_dependencies(debug=debug)
734        sep = STATIC_CONFIG['plugins']['repo_separator']
735        plugin_names = [
736            _d[len('plugin:'):] for _d in _deps
737            if _d.startswith('plugin:') and len(_d) > len('plugin:')
738        ]
739        default_repo_keys = get_config('meerschaum', 'repository')
740        skipped_repo_keys = set()
741
742        for _plugin_name in plugin_names:
743            if sep in _plugin_name:
744                try:
745                    _plugin_name, _repo_keys = _plugin_name.split(sep)
746                except Exception:
747                    _repo_keys = default_repo_keys
748                    warn(
749                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
750                        + f"Will try to use '{_repo_keys}' instead.",
751                        stack = False,
752                    )
753            else:
754                _repo_keys = default_repo_keys
755
756            if _repo_keys in skipped_repo_keys:
757                continue
758
759            if not is_valid_connector_keys(_repo_keys):
760                warn(
761                    f"Invalid connector '{_repo_keys}'.\n"
762                    f"    Skipping required plugins from repository '{_repo_keys}'",
763                    stack=False,
764                )
765                continue
766
767            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
768
769        return plugins
770
771
772    def get_required_packages(self, debug: bool=False) -> List[str]:
773        """
774        Return the required package names (excluding plugins).
775        """
776        _deps = self.get_dependencies(debug=debug)
777        return [_d for _d in _deps if not _d.startswith('plugin:')]
778
779
780    def activate_venv(
781        self,
782        dependencies: bool = True,
783        init_if_not_exists: bool = True,
784        debug: bool = False,
785        **kw
786    ) -> bool:
787        """
788        Activate the virtual environments for the plugin and its dependencies.
789
790        Parameters
791        ----------
792        dependencies: bool, default True
793            If `True`, activate the virtual environments for required plugins.
794
795        Returns
796        -------
797        A bool indicating success.
798        """
799        from meerschaum.utils.venv import venv_target_path
800        from meerschaum.utils.packages import activate_venv
801        from meerschaum.utils.misc import make_symlink, is_symlink
802        from meerschaum.config._paths import PACKAGE_ROOT_PATH
803
804        if dependencies:
805            for plugin in self.get_required_plugins(debug=debug):
806                plugin.activate_venv(debug=debug, init_if_not_exists=init_if_not_exists, **kw)
807
808        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
809        venv_meerschaum_path = vtp / 'meerschaum'
810
811        try:
812            success, msg = True, "Success"
813            if is_symlink(venv_meerschaum_path):
814                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
815                    venv_meerschaum_path.unlink()
816                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
817        except Exception as e:
818            success, msg = False, str(e)
819        if not success:
820            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")
821
822        return activate_venv(self.name, init_if_not_exists=init_if_not_exists, debug=debug, **kw)
823
824
825    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
826        """
827        Deactivate the virtual environments for the plugin and its dependencies.
828
829        Parameters
830        ----------
831        dependencies: bool, default True
832            If `True`, deactivate the virtual environments for required plugins.
833
834        Returns
835        -------
836        A bool indicating success.
837        """
838        from meerschaum.utils.packages import deactivate_venv
839        success = deactivate_venv(self.name, debug=debug, **kw)
840        if dependencies:
841            for plugin in self.get_required_plugins(debug=debug):
842                plugin.deactivate_venv(debug=debug, **kw)
843        return success
844
845
846    def install_dependencies(
847        self,
848        force: bool = False,
849        debug: bool = False,
850    ) -> bool:
851        """
852        If specified, install dependencies.
853        
854        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
855        Meerschaum plugins from the same repository as this Plugin.
856        To install from a different repository, add the repo keys after `'@'`
857        (e.g. `'plugin:foo@api:bar'`).
858
859        Parameters
860        ----------
861        force: bool, default False
862            If `True`, continue with the installation, even if some
863            required packages fail to install.
864
865        debug: bool, default False
866            Verbosity toggle.
867
868        Returns
869        -------
870        A bool indicating success.
871        """
872        from meerschaum.utils.packages import pip_install, venv_contains_package
873        from meerschaum.utils.warnings import warn, info
874        _deps = self.get_dependencies(debug=debug)
875        if not _deps and self.requirements_file_path is None:
876            return True
877
878        plugins = self.get_required_plugins(debug=debug)
879        for _plugin in plugins:
880            if _plugin.name == self.name:
881                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
882                continue
883            _success, _msg = _plugin.repo_connector.install_plugin(
884                _plugin.name, debug=debug, force=force
885            )
886            if not _success:
887                warn(
888                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
889                    + f" for plugin '{self.name}':\n" + _msg,
890                    stack = False,
891                )
892                if not force:
893                    warn(
894                        "Try installing with the `--force` flag to continue anyway.",
895                        stack = False,
896                    )
897                    return False
898                info(
899                    "Continuing with installation despite the failure "
900                    + "(careful, things might be broken!)...",
901                    icon = False
902                )
903
904
905        ### First step: parse `requirements.txt` if it exists.
906        if self.requirements_file_path is not None:
907            if not pip_install(
908                requirements_file_path=self.requirements_file_path,
909                venv=self.name, debug=debug
910            ):
911                warn(
912                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
913                    stack = False,
914                )
915                if not force:
916                    warn(
917                        "Try installing with `--force` to continue anyway.",
918                        stack = False,
919                    )
920                    return False
921                info(
922                    "Continuing with installation despite the failure "
923                    + "(careful, things might be broken!)...",
924                    icon = False
925                )
926
927
928        ### Don't reinstall packages that are already included in required plugins.
929        packages = []
930        _packages = self.get_required_packages(debug=debug)
931        accounted_for_packages = set()
932        for package_name in _packages:
933            for plugin in plugins:
934                if venv_contains_package(package_name, plugin.name):
935                    accounted_for_packages.add(package_name)
936                    break
937        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
938
939        ### Attempt pip packages installation.
940        if packages:
941            for package in packages:
942                if not pip_install(package, venv=self.name, debug=debug):
943                    warn(
944                        f"Failed to install required package '{package}'"
945                        + f" for plugin '{self.name}'.",
946                        stack = False,
947                    )
948                    if not force:
949                        warn(
950                            "Try installing with `--force` to continue anyway.",
951                            stack = False,
952                        )
953                        return False
954                    info(
955                        "Continuing with installation despite the failure "
956                        + "(careful, things might be broken!)...",
957                        icon = False
958                    )
959        return True
960
961
962    @property
963    def full_name(self) -> str:
964        """
965        Include the repo keys with the plugin's name.
966        """
967        from meerschaum._internal.static import STATIC_CONFIG
968        sep = STATIC_CONFIG['plugins']['repo_separator']
969        return self.name + sep + str(self.repo_connector)
970
971
972    def __str__(self):
973        return self.name
974
975
976    def __repr__(self):
977        return f"Plugin('{self.name}', repo='{self.repo_connector}')"
978
979
980    def __del__(self):
981        pass

Handle packaging of Meerschaum plugins.

Plugin( name: str, version: Optional[str] = None, user_id: Optional[int] = None, required: Optional[List[str]] = None, attributes: Optional[Dict[str, Any]] = None, archive_path: Optional[pathlib.Path] = None, venv_path: Optional[pathlib.Path] = None, repo_connector: Optional[meerschaum.connectors.APIConnector] = None, repo: Union[meerschaum.connectors.APIConnector, str, NoneType] = None)
33    def __init__(
34        self,
35        name: str,
36        version: Optional[str] = None,
37        user_id: Optional[int] = None,
38        required: Optional[List[str]] = None,
39        attributes: Optional[Dict[str, Any]] = None,
40        archive_path: Optional[pathlib.Path] = None,
41        venv_path: Optional[pathlib.Path] = None,
42        repo_connector: Optional['mrsm.connectors.api.APIConnector'] = None,
43        repo: Union['mrsm.connectors.api.APIConnector', str, None] = None,
44    ):
45        from meerschaum._internal.static import STATIC_CONFIG
46        from meerschaum.config.paths import PLUGINS_ARCHIVES_RESOURCES_PATH, VIRTENV_RESOURCES_PATH
47        sep = STATIC_CONFIG['plugins']['repo_separator']
48        _repo = None
49        if sep in name:
50            try:
51                name, _repo = name.split(sep)
52            except Exception as e:
53                error(f"Invalid plugin name: '{name}'")
54        self._repo_in_name = _repo
55
56        if attributes is None:
57            attributes = {}
58        self.name = name
59        self.attributes = attributes
60        self.user_id = user_id
61        self._version = version
62        if required:
63            self._required = required
64        self.archive_path = (
65            archive_path if archive_path is not None
66            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
67        )
68        self.venv_path = (
69            venv_path if venv_path is not None
70            else VIRTENV_RESOURCES_PATH / self.name
71        )
72        self._repo_connector = repo_connector
73        self._repo_keys = repo
name
attributes
user_id
archive_path
venv_path
repo_connector
76    @property
77    def repo_connector(self):
78        """
79        Return the repository connector for this plugin.
80        NOTE: This imports the `connectors` module, which imports certain plugin modules.
81        """
82        if self._repo_connector is None:
83            from meerschaum.connectors.parse import parse_repo_keys
84
85            repo_keys = self._repo_keys or self._repo_in_name
86            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
87                error(
88                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
89                )
90            repo_connector = parse_repo_keys(repo_keys)
91            self._repo_connector = repo_connector
92        return self._repo_connector

Return the repository connector for this plugin. NOTE: This imports the connectors module, which imports certain plugin modules.

version
 95    @property
 96    def version(self):
 97        """
 98        Return the plugin's module version is defined (`__version__`) if it's defined.
 99        """
100        if self._version is None:
101            try:
102                self._version = self.module.__version__
103            except Exception as e:
104                self._version = None
105        return self._version

Return the plugin's module version (__version__) if it's defined.

module
108    @property
109    def module(self):
110        """
111        Return the Python module of the underlying plugin.
112        """
113        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
114            if self.__file__ is None:
115                return None
116
117            from meerschaum.plugins import import_plugins
118            self._module = import_plugins(str(self), warn=False)
119
120        return self._module

Return the Python module of the underlying plugin.

requirements_file_path: Optional[pathlib.Path]
148    @property
149    def requirements_file_path(self) -> Union[pathlib.Path, None]:
150        """
151        If a file named `requirements.txt` exists, return its path.
152        """
153        if self.__file__ is None:
154            return None
155        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
156        if not path.exists():
157            return None
158        return path

If a file named requirements.txt exists, return its path.

def is_installed(self, **kw) -> bool:
161    def is_installed(self, **kw) -> bool:
162        """
163        Check whether a plugin is correctly installed.
164
165        Returns
166        -------
167        A `bool` indicating whether a plugin exists and is successfully imported.
168        """
169        return self.__file__ is not None

Check whether a plugin is correctly installed.

Returns
  • A bool indicating whether a plugin exists and is successfully imported.
def make_tar(self, debug: bool = False) -> pathlib.Path:
172    def make_tar(self, debug: bool = False) -> pathlib.Path:
173        """
174        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
175
176        Parameters
177        ----------
178        debug: bool, default False
179            Verbosity toggle.
180
181        Returns
182        -------
183        A `pathlib.Path` to the archive file's path.
184
185        """
186        import tarfile, pathlib, subprocess, fnmatch
187        from meerschaum.utils.debug import dprint
188        from meerschaum.utils.packages import attempt_import
189        pathspec = attempt_import('pathspec', debug=debug)
190
191        if not self.__file__:
192            from meerschaum.utils.warnings import error
193            error(f"Could not find file for plugin '{self}'.")
194        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
195            path = self.__file__.replace('__init__.py', '')
196            is_dir = True
197        else:
198            path = self.__file__
199            is_dir = False
200
201        old_cwd = os.getcwd()
202        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
203        os.chdir(real_parent_path)
204
205        default_patterns_to_ignore = [
206            '.pyc',
207            '__pycache__/',
208            'eggs/',
209            '__pypackages__/',
210            '.git',
211        ]
212
213        def parse_gitignore() -> 'Set[str]':
214            gitignore_path = pathlib.Path(path) / '.gitignore'
215            if not gitignore_path.exists():
216                return set(default_patterns_to_ignore)
217            with open(gitignore_path, 'r', encoding='utf-8') as f:
218                gitignore_text = f.read()
219            return set(pathspec.PathSpec.from_lines(
220                pathspec.patterns.GitWildMatchPattern,
221                default_patterns_to_ignore + gitignore_text.splitlines()
222            ).match_tree(path))
223
224        patterns_to_ignore = parse_gitignore() if is_dir else set()
225
226        if debug:
227            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
228
229        with tarfile.open(self.archive_path, 'w:gz') as tarf:
230            if not is_dir:
231                tarf.add(f"{self.name}.py")
232            else:
233                for root, dirs, files in os.walk(self.name):
234                    for f in files:
235                        good_file = True
236                        fp = os.path.join(root, f)
237                        for pattern in patterns_to_ignore:
238                            if pattern in str(fp) or f.startswith('.'):
239                                good_file = False
240                                break
241                        if good_file:
242                            if debug:
243                                dprint(f"Adding '{fp}'...")
244                            tarf.add(fp)
245
246        ### clean up and change back to old directory
247        os.chdir(old_cwd)
248
249        ### change to 775 to avoid permissions issues with the API in a Docker container
250        self.archive_path.chmod(0o775)
251
252        if debug:
253            dprint(f"Created archive '{self.archive_path}'.")
254        return self.archive_path

Compress the plugin's source files into a .tar.gz archive and return the archive's path.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pathlib.Path to the archive file's path.
def install( self, skip_deps: bool = False, force: bool = False, debug: bool = False) -> Tuple[bool, str]:
257    def install(
258        self,
259        skip_deps: bool = False,
260        force: bool = False,
261        debug: bool = False,
262    ) -> SuccessTuple:
263        """
264        Extract a plugin's tar archive to the plugins directory.
265        
266        This function checks if the plugin is already installed and if the version is equal or
267        greater than the existing installation.
268
269        Parameters
270        ----------
271        skip_deps: bool, default False
272            If `True`, do not install dependencies.
273
274        force: bool, default False
275            If `True`, continue with installation, even if required packages fail to install.
276
277        debug: bool, default False
278            Verbosity toggle.
279
280        Returns
281        -------
282        A `SuccessTuple` of success (bool) and a message (str).
283
284        """
285        if self.full_name in _ongoing_installations:
286            return True, f"Already installing plugin '{self}'."
287        _ongoing_installations.add(self.full_name)
288        from meerschaum.utils.warnings import warn, error
289        if debug:
290            from meerschaum.utils.debug import dprint
291        import tarfile
292        import re
293        import ast
294        from meerschaum.plugins import sync_plugins_symlinks
295        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
296        from meerschaum.utils.venv import init_venv
297        from meerschaum.utils.misc import safely_extract_tar
298        from meerschaum.config.paths import PLUGINS_TEMP_RESOURCES_PATH, PLUGINS_DIR_PATHS
299        old_cwd = os.getcwd()
300        old_version = ''
301        new_version = ''
302        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
303        temp_dir.mkdir(exist_ok=True)
304
305        if not self.archive_path.exists():
306            return False, f"Missing archive file for plugin '{self}'."
307        if self.version is not None:
308            old_version = self.version
309            if debug:
310                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
311
312        if debug:
313            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
314
315        try:
316            with tarfile.open(self.archive_path, 'r:gz') as tarf:
317                safely_extract_tar(tarf, temp_dir)
318        except Exception as e:
319            warn(e)
320            return False, f"Failed to extract plugin '{self.name}'."
321
322        ### search for version information
323        files = os.listdir(temp_dir)
324        
325        if str(files[0]) == self.name:
326            is_dir = True
327        elif str(files[0]) == self.name + '.py':
328            is_dir = False
329        else:
330            error(f"Unknown format encountered for plugin '{self}'.")
331
332        fpath = temp_dir / files[0]
333        if is_dir:
334            fpath = fpath / '__init__.py'
335
336        init_venv(self.name, debug=debug)
337        with open(fpath, 'r', encoding='utf-8') as f:
338            init_lines = f.readlines()
339        new_version = None
340        for line in init_lines:
341            if '__version__' not in line:
342                continue
343            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
344            if not version_match:
345                continue
346            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
347            break
348        if not new_version:
349            warn(
350                f"No `__version__` defined for plugin '{self}'. "
351                + "Assuming new version...",
352                stack = False,
353            )
354
355        packaging_version = attempt_import('packaging.version')
356        try:
357            is_new_version = (not new_version and not old_version) or (
358                packaging_version.parse(old_version) < packaging_version.parse(new_version)
359            )
360            is_same_version = new_version and old_version and (
361                packaging_version.parse(old_version) == packaging_version.parse(new_version)
362            )
363        except Exception:
364            is_new_version, is_same_version = True, False
365
366        ### Determine where to permanently store the new plugin.
367        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
368        for path in PLUGINS_DIR_PATHS:
369            if not path.exists():
370                warn(f"Plugins path does not exist: {path}", stack=False)
371                continue
372
373            files_in_plugins_dir = os.listdir(path)
374            if (
375                self.name in files_in_plugins_dir
376                or
377                (self.name + '.py') in files_in_plugins_dir
378            ):
379                plugin_installation_dir_path = path
380                break
381
382        success_msg = (
383            f"Successfully installed plugin '{self}'"
384            + ("\n    (skipped dependencies)" if skip_deps else "")
385            + "."
386        )
387        success, abort = None, None
388
389        if is_same_version and not force:
390            success, msg = True, (
391                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
392                "    Install again with `-f` or `--force` to reinstall."
393            )
394            abort = True
395        elif is_new_version or force:
396            for src_dir, dirs, files in os.walk(temp_dir):
397                if success is not None:
398                    break
399                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
400                if not os.path.exists(dst_dir):
401                    os.mkdir(dst_dir)
402                for f in files:
403                    src_file = os.path.join(src_dir, f)
404                    dst_file = os.path.join(dst_dir, f)
405                    if os.path.exists(dst_file):
406                        os.remove(dst_file)
407
408                    if debug:
409                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
410                    try:
411                        shutil.move(src_file, dst_dir)
412                    except Exception:
413                        success, msg = False, (
414                            f"Failed to install plugin '{self}': " +
415                            f"Could not move file '{src_file}' to '{dst_dir}'"
416                        )
417                        print(msg)
418                        break
419            if success is None:
420                success, msg = True, success_msg
421        else:
422            success, msg = False, (
423                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
424                + f"attempted version {new_version}."
425            )
426
427        shutil.rmtree(temp_dir)
428        os.chdir(old_cwd)
429
430        ### Reload the plugin's module.
431        sync_plugins_symlinks(debug=debug)
432        if '_module' in self.__dict__:
433            del self.__dict__['_module']
434        init_venv(venv=self.name, force=True, debug=debug)
435        reload_meerschaum(debug=debug)
436
437        ### if we've already failed, return here
438        if not success or abort:
439            _ongoing_installations.remove(self.full_name)
440            return success, msg
441
442        ### attempt to install dependencies
443        dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
444        if not dependencies_installed:
445            _ongoing_installations.remove(self.full_name)
446            return False, f"Failed to install dependencies for plugin '{self}'."
447
448        ### handling success tuple, bool, or other (typically None)
449        setup_tuple = self.setup(debug=debug)
450        if isinstance(setup_tuple, tuple):
451            if not setup_tuple[0]:
452                success, msg = setup_tuple
453        elif isinstance(setup_tuple, bool):
454            if not setup_tuple:
455                success, msg = False, (
456                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
457                    f"Check `setup()` in '{self.__file__}' for more information " +
458                    "(no error message provided)."
459                )
460            else:
461                success, msg = True, success_msg
462        elif setup_tuple is None:
463            success = True
464            msg = (
465                f"Post-install for plugin '{self}' returned None. " +
466                "Assuming plugin successfully installed."
467            )
468            warn(msg)
469        else:
470            success = False
471            msg = (
472                f"Post-install for plugin '{self}' returned unexpected value " +
473                f"of type '{type(setup_tuple)}': {setup_tuple}"
474            )
475
476        _ongoing_installations.remove(self.full_name)
477        _ = self.module
478        return success, msg

Extract a plugin's tar archive to the plugins directory.

This function checks if the plugin is already installed and if the version is equal or greater than the existing installation.

Parameters
  • skip_deps (bool, default False): If True, do not install dependencies.
  • force (bool, default False): If True, continue with installation, even if required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success (bool) and a message (str).
def remove_archive(self, debug: bool = False) -> Tuple[bool, str]:
481    def remove_archive(
482        self,        
483        debug: bool = False
484    ) -> SuccessTuple:
485        """Remove a plugin's archive file."""
486        if not self.archive_path.exists():
487            return True, f"Archive file for plugin '{self}' does not exist."
488        try:
489            self.archive_path.unlink()
490        except Exception as e:
491            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
492        return True, "Success"

Remove a plugin's archive file.

def remove_venv(self, debug: bool = False) -> Tuple[bool, str]:
495    def remove_venv(
496        self,        
497        debug: bool = False
498    ) -> SuccessTuple:
499        """Remove a plugin's virtual environment."""
500        if not self.venv_path.exists():
501            return True, f"Virtual environment for plugin '{self}' does not exist."
502        try:
503            shutil.rmtree(self.venv_path)
504        except Exception as e:
505            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
506        return True, "Success"

Remove a plugin's virtual environment.

def uninstall(self, debug: bool = False) -> Tuple[bool, str]:
509    def uninstall(self, debug: bool = False) -> SuccessTuple:
510        """
511        Remove a plugin, its virtual environment, and archive file.
512        """
513        from meerschaum.utils.packages import reload_meerschaum
514        from meerschaum.plugins import sync_plugins_symlinks
515        from meerschaum.utils.warnings import warn, info
516        warnings_thrown_count: int = 0
517        max_warnings: int = 3
518
519        if not self.is_installed():
520            info(
521                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
522                + "Checking for artifacts...",
523                stack = False,
524            )
525        else:
526            real_path = pathlib.Path(os.path.realpath(self.__file__))
527            try:
528                if real_path.name == '__init__.py':
529                    shutil.rmtree(real_path.parent)
530                else:
531                    real_path.unlink()
532            except Exception as e:
533                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
534                warnings_thrown_count += 1
535            else:
536                info(f"Removed source files for plugin '{self.name}'.")
537
538        if self.venv_path.exists():
539            success, msg = self.remove_venv(debug=debug)
540            if not success:
541                warn(msg, stack=False)
542                warnings_thrown_count += 1
543            else:
544                info(f"Removed virtual environment from plugin '{self.name}'.")
545
546        success = warnings_thrown_count < max_warnings
547        sync_plugins_symlinks(debug=debug)
548        self.deactivate_venv(force=True, debug=debug)
549        reload_meerschaum(debug=debug)
550        return success, (
551            f"Successfully uninstalled plugin '{self}'." if success
552            else f"Failed to uninstall plugin '{self}'."
553        )

Remove a plugin, its virtual environment, and archive file.

def setup( self, *args: str, debug: bool = False, **kw: Any) -> Union[Tuple[bool, str], bool]:
556    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
557        """
558        If exists, run the plugin's `setup()` function.
559
560        Parameters
561        ----------
562        *args: str
563            The positional arguments passed to the `setup()` function.
564            
565        debug: bool, default False
566            Verbosity toggle.
567
568        **kw: Any
569            The keyword arguments passed to the `setup()` function.
570
571        Returns
572        -------
573        A `SuccessTuple` or `bool` indicating success.
574
575        """
576        from meerschaum.utils.debug import dprint
577        import inspect
578        _setup = None
579        for name, fp in inspect.getmembers(self.module):
580            if name == 'setup' and inspect.isfunction(fp):
581                _setup = fp
582                break
583
584        ### assume success if no setup() is found (not necessary)
585        if _setup is None:
586            return True
587
588        sig = inspect.signature(_setup)
589        has_debug, has_kw = ('debug' in sig.parameters), False
590        for k, v in sig.parameters.items():
591            if '**' in str(v):
592                has_kw = True
593                break
594
595        _kw = {}
596        if has_kw:
597            _kw.update(kw)
598        if has_debug:
599            _kw['debug'] = debug
600
601        if debug:
602            dprint(f"Running setup for plugin '{self}'...")
603        try:
604            self.activate_venv(debug=debug)
605            return_tuple = _setup(*args, **_kw)
606            self.deactivate_venv(debug=debug)
607        except Exception as e:
608            return False, str(e)
609
610        if isinstance(return_tuple, tuple):
611            return return_tuple
612        if isinstance(return_tuple, bool):
613            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
614        if return_tuple is None:
615            return False, f"Setup for Plugin '{self.name}' returned None."
616        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"

If exists, run the plugin's setup() function.

Parameters
  • *args (str): The positional arguments passed to the setup() function.
  • debug (bool, default False): Verbosity toggle.
  • **kw (Any): The keyword arguments passed to the setup() function.
Returns
  • A SuccessTuple or bool indicating success.
def get_dependencies(self, debug: bool = False) -> List[str]:
619    def get_dependencies(
620        self,
621        debug: bool = False,
622    ) -> List[str]:
623        """
624        If the Plugin has specified dependencies in a list called `required`, return the list.
625        
626        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
627        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
628
629        Parameters
630        ----------
631        debug: bool, default False
632            Verbosity toggle.
633
634        Returns
635        -------
636        A list of required packages and plugins (str).
637
638        """
639        if '_required' in self.__dict__:
640            return self._required
641
642        ### If the plugin has not yet been imported,
643        ### infer the dependencies from the source text.
644        ### This is not super robust, and it doesn't feel right
645        ### having multiple versions of the logic.
646        ### This is necessary when determining the activation order
647        ### without having import the module.
648        ### For consistency's sake, the module-less method does not cache the requirements.
649        if self.__dict__.get('_module', None) is None:
650            file_path = self.__file__
651            if file_path is None:
652                return []
653            with open(file_path, 'r', encoding='utf-8') as f:
654                text = f.read()
655
656            if 'required' not in text:
657                return []
658
659            ### This has some limitations:
660            ### It relies on `required` being manually declared.
661            ### We lose the ability to dynamically alter the `required` list,
662            ### which is why we've kept the module-reliant method below.
663            import ast, re
664            ### NOTE: This technically would break 
665            ### if `required` was the very first line of the file.
666            req_start_match = re.search(r'\nrequired(:\s*)?.*=', text)
667            if not req_start_match:
668                return []
669            req_start = req_start_match.start()
670            equals_sign = req_start + text[req_start:].find('=')
671
672            ### Dependencies may have brackets within the strings, so push back the index.
673            first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[')
674            if first_opening_brace == -1:
675                return []
676
677            next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']')
678            if next_closing_brace == -1:
679                return []
680
681            start_ix = first_opening_brace + 1
682            end_ix = next_closing_brace
683
684            num_braces = 0
685            while True:
686                if '[' not in text[start_ix:end_ix]:
687                    break
688                num_braces += 1
689                start_ix = end_ix
690                end_ix += text[end_ix + 1:].find(']') + 1
691
692            req_end = end_ix + 1
693            req_text = (
694                text[(first_opening_brace-1):req_end]
695                .lstrip()
696                .replace('=', '', 1)
697                .lstrip()
698                .rstrip()
699            )
700            try:
701                required = ast.literal_eval(req_text)
702            except Exception as e:
703                warn(
704                    f"Unable to determine requirements for plugin '{self.name}' "
705                    + "without importing the module.\n"
706                    + "    This may be due to dynamically setting the global `required` list.\n"
707                    + f"    {e}"
708                )
709                return []
710            return required
711
712        import inspect
713        self.activate_venv(dependencies=False, debug=debug)
714        required = []
715        for name, val in inspect.getmembers(self.module):
716            if name == 'required':
717                required = val
718                break
719        self._required = required
720        self.deactivate_venv(dependencies=False, debug=debug)
721        return required

If the Plugin has specified dependencies in a list called required, return the list.

NOTE: Dependencies which start with 'plugin:' are Meerschaum plugins, not pip packages. Meerschaum plugins may also specify connector keys for a repo after '@'.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of required packages and plugins (str).
def get_required_plugins(self, debug: bool = False) -> List[Plugin]:
724    def get_required_plugins(self, debug: bool=False) -> List[mrsm.plugins.Plugin]:
725        """
726        Return a list of required Plugin objects.
727        """
728        from meerschaum.utils.warnings import warn
729        from meerschaum.config import get_config
730        from meerschaum._internal.static import STATIC_CONFIG
731        from meerschaum.connectors.parse import is_valid_connector_keys
732        plugins = []
733        _deps = self.get_dependencies(debug=debug)
734        sep = STATIC_CONFIG['plugins']['repo_separator']
735        plugin_names = [
736            _d[len('plugin:'):] for _d in _deps
737            if _d.startswith('plugin:') and len(_d) > len('plugin:')
738        ]
739        default_repo_keys = get_config('meerschaum', 'repository')
740        skipped_repo_keys = set()
741
742        for _plugin_name in plugin_names:
743            if sep in _plugin_name:
744                try:
745                    _plugin_name, _repo_keys = _plugin_name.split(sep)
746                except Exception:
747                    _repo_keys = default_repo_keys
748                    warn(
749                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
750                        + f"Will try to use '{_repo_keys}' instead.",
751                        stack = False,
752                    )
753            else:
754                _repo_keys = default_repo_keys
755
756            if _repo_keys in skipped_repo_keys:
757                continue
758
759            if not is_valid_connector_keys(_repo_keys):
760                warn(
761                    f"Invalid connector '{_repo_keys}'.\n"
762                    f"    Skipping required plugins from repository '{_repo_keys}'",
763                    stack=False,
764                )
765                continue
766
767            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
768
769        return plugins

Return a list of required Plugin objects.

def get_required_packages(self, debug: bool = False) -> List[str]:
772    def get_required_packages(self, debug: bool=False) -> List[str]:
773        """
774        Return the required package names (excluding plugins).
775        """
776        _deps = self.get_dependencies(debug=debug)
777        return [_d for _d in _deps if not _d.startswith('plugin:')]

Return the required package names (excluding plugins).

def activate_venv( self, dependencies: bool = True, init_if_not_exists: bool = True, debug: bool = False, **kw) -> bool:
780    def activate_venv(
781        self,
782        dependencies: bool = True,
783        init_if_not_exists: bool = True,
784        debug: bool = False,
785        **kw
786    ) -> bool:
787        """
788        Activate the virtual environments for the plugin and its dependencies.
789
790        Parameters
791        ----------
792        dependencies: bool, default True
793            If `True`, activate the virtual environments for required plugins.
794
795        Returns
796        -------
797        A bool indicating success.
798        """
799        from meerschaum.utils.venv import venv_target_path
800        from meerschaum.utils.packages import activate_venv
801        from meerschaum.utils.misc import make_symlink, is_symlink
802        from meerschaum.config._paths import PACKAGE_ROOT_PATH
803
804        if dependencies:
805            for plugin in self.get_required_plugins(debug=debug):
806                plugin.activate_venv(debug=debug, init_if_not_exists=init_if_not_exists, **kw)
807
808        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
809        venv_meerschaum_path = vtp / 'meerschaum'
810
811        try:
812            success, msg = True, "Success"
813            if is_symlink(venv_meerschaum_path):
814                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
815                    venv_meerschaum_path.unlink()
816                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
817        except Exception as e:
818            success, msg = False, str(e)
819        if not success:
820            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")
821
822        return activate_venv(self.name, init_if_not_exists=init_if_not_exists, debug=debug, **kw)

Activate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, activate the virtual environments for required plugins.
Returns
  • A bool indicating success.
def deactivate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
825    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
826        """
827        Deactivate the virtual environments for the plugin and its dependencies.
828
829        Parameters
830        ----------
831        dependencies: bool, default True
832            If `True`, deactivate the virtual environments for required plugins.
833
834        Returns
835        -------
836        A bool indicating success.
837        """
838        from meerschaum.utils.packages import deactivate_venv
839        success = deactivate_venv(self.name, debug=debug, **kw)
840        if dependencies:
841            for plugin in self.get_required_plugins(debug=debug):
842                plugin.deactivate_venv(debug=debug, **kw)
843        return success

Deactivate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, deactivate the virtual environments for required plugins.
Returns
  • A bool indicating success.
def install_dependencies(self, force: bool = False, debug: bool = False) -> bool:
846    def install_dependencies(
847        self,
848        force: bool = False,
849        debug: bool = False,
850    ) -> bool:
851        """
852        If specified, install dependencies.
853        
854        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
855        Meerschaum plugins from the same repository as this Plugin.
856        To install from a different repository, add the repo keys after `'@'`
857        (e.g. `'plugin:foo@api:bar'`).
858
859        Parameters
860        ----------
861        force: bool, default False
862            If `True`, continue with the installation, even if some
863            required packages fail to install.
864
865        debug: bool, default False
866            Verbosity toggle.
867
868        Returns
869        -------
870        A bool indicating success.
871        """
872        from meerschaum.utils.packages import pip_install, venv_contains_package
873        from meerschaum.utils.warnings import warn, info
874        _deps = self.get_dependencies(debug=debug)
875        if not _deps and self.requirements_file_path is None:
876            return True
877
878        plugins = self.get_required_plugins(debug=debug)
879        for _plugin in plugins:
880            if _plugin.name == self.name:
881                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
882                continue
883            _success, _msg = _plugin.repo_connector.install_plugin(
884                _plugin.name, debug=debug, force=force
885            )
886            if not _success:
887                warn(
888                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
889                    + f" for plugin '{self.name}':\n" + _msg,
890                    stack = False,
891                )
892                if not force:
893                    warn(
894                        "Try installing with the `--force` flag to continue anyway.",
895                        stack = False,
896                    )
897                    return False
898                info(
899                    "Continuing with installation despite the failure "
900                    + "(careful, things might be broken!)...",
901                    icon = False
902                )
903
904
905        ### First step: parse `requirements.txt` if it exists.
906        if self.requirements_file_path is not None:
907            if not pip_install(
908                requirements_file_path=self.requirements_file_path,
909                venv=self.name, debug=debug
910            ):
911                warn(
912                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
913                    stack = False,
914                )
915                if not force:
916                    warn(
917                        "Try installing with `--force` to continue anyway.",
918                        stack = False,
919                    )
920                    return False
921                info(
922                    "Continuing with installation despite the failure "
923                    + "(careful, things might be broken!)...",
924                    icon = False
925                )
926
927
928        ### Don't reinstall packages that are already included in required plugins.
929        packages = []
930        _packages = self.get_required_packages(debug=debug)
931        accounted_for_packages = set()
932        for package_name in _packages:
933            for plugin in plugins:
934                if venv_contains_package(package_name, plugin.name):
935                    accounted_for_packages.add(package_name)
936                    break
937        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
938
939        ### Attempt pip packages installation.
940        if packages:
941            for package in packages:
942                if not pip_install(package, venv=self.name, debug=debug):
943                    warn(
944                        f"Failed to install required package '{package}'"
945                        + f" for plugin '{self.name}'.",
946                        stack = False,
947                    )
948                    if not force:
949                        warn(
950                            "Try installing with `--force` to continue anyway.",
951                            stack = False,
952                        )
953                        return False
954                    info(
955                        "Continuing with installation despite the failure "
956                        + "(careful, things might be broken!)...",
957                        icon = False
958                    )
959        return True

If specified, install dependencies.

NOTE: Dependencies that start with 'plugin:' will be installed as Meerschaum plugins from the same repository as this Plugin. To install from a different repository, add the repo keys after '@' (e.g. 'plugin:foo@api:bar').

Parameters
  • force (bool, default False): If True, continue with the installation, even if some required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating success.
full_name: str
962    @property
963    def full_name(self) -> str:
964        """
965        Include the repo keys with the plugin's name.
966        """
967        from meerschaum._internal.static import STATIC_CONFIG
968        sep = STATIC_CONFIG['plugins']['repo_separator']
969        return self.name + sep + str(self.repo_connector)

Include the repo keys with the plugin's name.

def make_action( function: Optional[Callable[[Any], Any]] = None, shell: bool = False, activate: bool = True, deactivate: bool = True, debug: bool = False, daemon: bool = True, skip_if_loaded: bool = True, _plugin_name: Optional[str] = None) -> Callable[[Any], Any]:
 60def make_action(
 61    function: Optional[Callable[[Any], Any]] = None,
 62    shell: bool = False,
 63    activate: bool = True,
 64    deactivate: bool = True,
 65    debug: bool = False,
 66    daemon: bool = True,
 67    skip_if_loaded: bool = True,
 68    _plugin_name: Optional[str] = None,
 69) -> Callable[[Any], Any]:
 70    """
 71    Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.
 72    
 73    Parameters
 74    ----------
 75    function: Callable[[Any], Any]
 76        The function to become a Meerschaum action. Must accept all keyword arguments.
 77        
 78    shell: bool, default False
 79        Not used.
 80        
 81    Returns
 82    -------
 83    Another function (this is a decorator function).
 84
 85    Examples
 86    --------
 87    >>> from meerschaum.plugins import make_action
 88    >>>
 89    >>> @make_action
 90    ... def my_action(**kw):
 91    ...     print('foo')
 92    ...     return True, "Success"
 93    >>>
 94    """
 95    def _decorator(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
 96        from meerschaum.actions import actions, _custom_actions_plugins, _plugins_actions
 97        if skip_if_loaded and func.__name__ in actions:
 98            return func
 99
100        from meerschaum.config.paths import PLUGINS_RESOURCES_PATH
101        plugin_name = (
102            func.__name__.split(f"{PLUGINS_RESOURCES_PATH.stem}.", maxsplit=1)[-1].split('.')[0]
103        )
104        plugin = Plugin(plugin_name) if plugin_name else None
105
106        if debug:
107            from meerschaum.utils.debug import dprint
108            dprint(
109                f"Adding action '{func.__name__}' from plugin "
110                f"'{plugin}'..."
111            )
112
113        actions[func.__name__] = func
114        _custom_actions_plugins[func.__name__] = plugin_name
115        if plugin_name not in _plugins_actions:
116            _plugins_actions[plugin_name] = []
117        _plugins_actions[plugin_name].append(func.__name__)
118        if not daemon:
119            _actions_daemon_enabled[func.__name__] = False
120        return func
121
122    if function is None:
123        return _decorator
124    return _decorator(function)

Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.

Parameters
  • function (Callable[[Any], Any]): The function to become a Meerschaum action. Must accept all keyword arguments.
  • shell (bool, default False): Not used.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import make_action
>>>
>>> @make_action
... def my_action(**kw):
...     print('foo')
...     return True, "Success"
>>>
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Register a function to be executed when the Meerschaum API module is initialized.
    This allows heavy plugins to be lazy-loaded only once the API is started,
    for example to modify the `meerschaum.api.app` FastAPI app.

    The FastAPI app will be passed as the only parameter.
    Functions are grouped by the name of the module which defines them,
    and the decorated function is returned unchanged.

    Examples
    --------
    >>> from meerschaum.plugins import api_plugin
    >>>
    >>> @api_plugin
    ... def initialize_plugin(app):
    ...     @app.get('/my/new/path')
    ...     def new_path():
    ...         return {'message': 'It works!'}
    >>>
    """
    with _locks['_api_plugins']:
        try:
            module_name = function.__module__
            _api_plugins.setdefault(module_name, []).append(function)
        except Exception as e:
            ### Registration problems are reported as warnings rather than raised.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function

Execute the function when initializing the Meerschaum API module. Useful for lazy-loading heavy plugins only when the API is started, such as when editing the meerschaum.api.app FastAPI app.

The FastAPI app will be passed as the only parameter.

Examples
>>> from meerschaum.plugins import api_plugin
>>>
>>> @api_plugin
>>> def initialize_plugin(app):
...     @app.get('/my/new/path')
...     def new_path():
...         return {'message': 'It works!'}
>>>
def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Register a function to be executed when the Dash application starts.

    The function is stored under the name returned by `_get_parent_plugin()`
    and is returned unchanged.
    """
    with _locks['_dash_plugins']:
        parent_plugin_name = _get_parent_plugin()
        try:
            _dash_plugins.setdefault(parent_plugin_name, []).append(function)
        except Exception as e:
            ### Registration problems are reported as warnings rather than raised.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function

Execute the function when starting the Dash application.

def web_page( page: Union[str, NoneType, Callable[[Any], Any]] = None, login_required: bool = True, skip_navbar: bool = False, page_group: Optional[str] = None, **kwargs) -> Any:
def web_page(
    page: Union[str, None, Callable[[Any], Any]] = None,
    login_required: bool = True,
    skip_navbar: bool = False,
    page_group: Optional[str] = None,
    **kwargs
) -> Any:
    """
    Quickly add pages to the dash application.

    May be used bare (`@web_page`), in which case the endpoint is derived from the
    function's name, or with an explicit endpoint (`@web_page('foo/bar')`).
    Endpoints which do not already start with `dash` are placed under `/dash/`.

    Parameters
    ----------
    page: Union[str, None, Callable[[Any], Any]], default None
        The endpoint for the page, or the function itself when used as a bare decorator.

    login_required: bool, default True
        Stored with the page under the `'login_required'` key.

    skip_navbar: bool, default False
        Stored with the page under the `'skip_navbar'` key.

    page_group: Optional[str], default None
        The group under which the page is registered.
        Defaults to the name returned by `_get_parent_plugin()`.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.plugins import web_page
    >>> html = mrsm.attempt_import('dash.html')
    >>>
    >>> @web_page('foo/bar', login_required=False)
    ... def foo_bar():
    ...     return html.Div([html.H1("Hello, World!")])
    >>>
    """
    page_str = None

    def _decorator(_func: Callable[[Any], Any]) -> Callable[[Any], Any]:
        nonlocal page_str, page_group

        @functools.wraps(_func)
        def wrapper(*_args, **_kwargs):
            return _func(*_args, **_kwargs)

        ### Fall back to the function's name when no endpoint was given.
        if page_str is None:
            page_str = _func.__name__

        page_str = page_str.strip('/').strip()
        if not page_str.startswith('dash'):
            page_str = f'/dash/{page_str}'

        ### Build a title-cased label from the endpoint (without the `/dash` prefix).
        label_source = page_str.replace('/dash', '').strip('/').strip()
        label_words = label_source.replace('-', ' ').replace('_', ' ').split(' ')
        page_key = ' '.join(word.capitalize() for word in label_words)

        plugin_name = _get_parent_plugin()
        page_group = page_group or plugin_name
        if page_group not in _plugin_endpoints_to_pages:
            _plugin_endpoints_to_pages[page_group] = {}
        page_attributes = {
            'function': _func,
            'login_required': login_required,
            'skip_navbar': skip_navbar,
            'page_key': page_key,
        }
        _plugin_endpoints_to_pages[page_group][page_str] = page_attributes
        if plugin_name not in _plugins_web_pages:
            _plugins_web_pages[plugin_name] = []
        _plugins_web_pages[plugin_name].append(_func)
        return wrapper

    ### With an endpoint (or nothing) provided, hand back the decorator itself.
    if not callable(page):
        page_str = page
        return _decorator

    ### Used as a bare decorator: register the function immediately.
    wrapped_page = _decorator(page)
    page_str = page.__name__
    return wrapped_page

Quickly add pages to the dash application.

Examples
>>> import meerschaum as mrsm
>>> from meerschaum.plugins import web_page
>>> html = mrsm.attempt_import('dash.html')
>>> 
>>> @web_page('foo/bar', login_required=False)
>>> def foo_bar():
...     return html.Div([html.H1("Hello, World!")])
>>>
def import_plugins( *plugins_to_import: Union[str, List[str], NoneType], warn: bool = True) -> "Union['ModuleType', Tuple['ModuleType', None]]":
def import_plugins(
    *plugins_to_import: Union[str, List[str], None],
    warn: bool = True,
) -> Union[
    'ModuleType', Tuple['ModuleType', None]
]:
    """
    Import the Meerschaum plugins directory.

    Parameters
    ----------
    plugins_to_import: Union[str, List[str], None]
        If provided, only import the specified plugins.
        Otherwise import the entire plugins module. May be a string, list, or `None`.
        Defaults to `None`.

    warn: bool, default True
        If `True`, emit a warning when the plugins module could not be imported.

    Returns
    -------
    A module or tuple of modules, depending on the number of plugins provided.
    When no plugins are specified, the plugins package itself is returned
    (or `None` if it failed to import). A plugin which fails to import
    is represented by `None` in its position.

    """
    import sys
    import importlib
    from meerschaum.utils.misc import flatten_list
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
    from meerschaum.utils.warnings import warn as _warn
    plugins_to_import = list(plugins_to_import)
    prepended_sys_path = False
    plugins_parent_path = str(PLUGINS_RESOURCES_PATH.parent)
    with _locks['sys.path']:

        ### Since plugins may depend on other plugins,
        ### we need to activate the virtual environments for library plugins.
        ### This logic exists in `Plugin.activate_venv()`,
        ### but that code requires the plugin's module to already be imported.
        ### It's not a guarantee of correct activation order.
        plugins_names = get_plugins_names()

        ### Remember the *names* of the venvs that were active beforehand
        ### so that only the ones activated here are deactivated afterwards.
        already_active_venvs = {
            plugin_name
            for plugin_name in plugins_names
            if is_venv_active(plugin_name)
        }

        if not sys.path or sys.path[0] != plugins_parent_path:
            prepended_sys_path = True
            sys.path.insert(0, plugins_parent_path)

        try:
            if not plugins_to_import:
                for plugin_name in plugins_names:
                    activate_venv(plugin_name)
                try:
                    imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem)
                except ImportError as e:
                    _warn(f"Failed to import the plugins module:\n    {e}")
                    import traceback
                    traceback.print_exc()
                    imported_plugins = None
                finally:
                    for plugin_name in plugins_names:
                        if plugin_name in already_active_venvs:
                            continue
                        deactivate_venv(plugin_name)

            else:
                imported_plugins = []
                for plugin_name in flatten_list(plugins_to_import):
                    plugin = Plugin(plugin_name)
                    try:
                        with Venv(plugin, init_if_not_exists=False):
                            imported_plugins.append(
                                importlib.import_module(
                                    f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
                                )
                            )
                    except Exception as e:
                        _warn(
                            f"Failed to import plugin '{plugin_name}':\n    "
                            + f"{e}\n\nHere's a stacktrace:",
                            stack = False,
                        )
                        from meerschaum.utils.formatting import get_console
                        get_console().print_exception(
                            suppress = [
                                'meerschaum/plugins/__init__.py',
                                importlib,
                                importlib._bootstrap,
                            ]
                        )
                        ### Keep positions aligned with the requested plugins.
                        imported_plugins.append(None)

            if imported_plugins is None and warn:
                _warn("Failed to import plugins.", stacklevel=3)

        finally:
            ### Always undo the temporary `sys.path` change, even on unexpected errors.
            if prepended_sys_path and plugins_parent_path in sys.path:
                sys.path.remove(plugins_parent_path)

    if isinstance(imported_plugins, list):
        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
    return imported_plugins

Import the Meerschaum plugins directory.

Parameters
  • plugins_to_import (Union[str, List[str], None]): If provided, only import the specified plugins. Otherwise import the entire plugins module. May be a string, list, or None. Defaults to None.
Returns
  • A module or tuple of modules, depending on the number of plugins provided.
def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
    """
    Pull named attributes out of a plugin's module, like `from module import x`.

    Parameters
    ----------
    plugin_import_name: str
        The dotted import name of the plugin's module
        (e.g. 'compose.utils.pipes'). A leading 'plugins.' is stripped.

    attrs: str
        Names of the attributes to return. At least one is required.

    Returns
    -------
    The requested object if a single name was given,
    otherwise a tuple of objects in the order requested.
    An attribute which cannot be accessed is returned as `None` (with a warning).
    If importing the submodule raises an `ImportError`,
    a warning and stacktrace are printed and `None` is returned.

    Raises
    ------
    ValueError
        If no attribute names are provided.
    ImportError
        If the root plugin's `module` is `None`.

    Examples
    --------
    >>> init = from_plugin_import('compose.utils', 'init')
    >>> with mrsm.Venv('compose'):
    ...     cf = init()
    >>> build_parent_pipe, get_defined_pipes = from_plugin_import(
    ...     'compose.utils.pipes',
    ...     'build_parent_pipe',
    ...     'get_defined_pipes',
    ... )
    >>> parent_pipe = build_parent_pipe(cf)
    >>> defined_pipes = get_defined_pipes(cf)
    """
    import importlib
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.warnings import warn as _warn

    # Accept both 'plugins.foo.bar' and 'foo.bar'.
    prefix = 'plugins.'
    if plugin_import_name.startswith(prefix):
        plugin_import_name = plugin_import_name[len(prefix):]

    # The first dotted component names the plugin itself.
    name_parts = plugin_import_name.split('.')
    plugin = mrsm.Plugin(name_parts[0])

    # Fully-qualified module path underneath the plugins resources package.
    submodule_import_name = '.'.join([PLUGINS_RESOURCES_PATH.stem, *name_parts])

    if not attrs:
        raise ValueError(f"Provide which attributes to return from '{submodule_import_name}'.")

    # Everything below runs inside the plugin's virtual environment.
    with mrsm.Venv(plugin):
        if plugin.module is None:
            raise ImportError(f"Unable to import plugin '{plugin}'.")

        try:
            submodule = importlib.import_module(submodule_import_name)
        except ImportError as exc:
            _warn(
                f"Failed to import plugin '{submodule_import_name}':\n    "
                + f"{exc}\n\nHere's a stacktrace:",
                stack=False,
            )
            from meerschaum.utils.formatting import get_console
            get_console().print_exception(
                suppress=[
                    'meerschaum/plugins/__init__.py',
                    importlib,
                    importlib._bootstrap,
                ]
            )
            return None

        # Collect each attribute, substituting `None` for any that fail.
        found = []
        for attr_name in attrs:
            try:
                value = getattr(submodule, attr_name)
            except Exception:
                _warn(f"Failed to access '{attr_name}' from '{submodule_import_name}'.")
                value = None
            found.append(value)

        return found[0] if len(attrs) == 1 else tuple(found)

Emulate the from module import x behavior.

Parameters
  • plugin_import_name (str): The import name of the plugin's module. Separate submodules with '.' (e.g. 'compose.utils.pipes')
  • attrs (str): Names of the attributes to return.
Returns
  • Objects from a plugin's submodule.
  • If multiple objects are provided, return a tuple.
Examples
>>> init = from_plugin_import('compose.utils', 'init')
>>> with mrsm.Venv('compose'):
...     cf = init()
>>> build_parent_pipe, get_defined_pipes = from_plugin_import(
...     'compose.utils.pipes',
...     'build_parent_pipe',
...     'get_defined_pipes',
... )
>>> parent_pipe = build_parent_pipe(cf)
>>> defined_pipes = get_defined_pipes(cf)
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
    """
    Unload plugins, re-sync the plugin symlinks, and load plugins back into memory.

    Parameters
    ----------
    plugins: Optional[List[str]], default None
        The plugins to reload (forwarded to `unload_plugins()`).
        `None` will reload all plugins.

    debug: bool, default False
        Forwarded to the unload, symlink-sync, and load steps.
    """
    global _synced_symlinks

    # 1. Drop the requested plugins from memory.
    unload_plugins(plugins, debug=debug)

    # 2. Reset the module-level marker, then sync the symlinks again.
    # NOTE(review): presumably a falsy value makes `sync_plugins_symlinks()`
    # redo its work rather than short-circuit; confirm against that function.
    _synced_symlinks = 0
    sync_plugins_symlinks(debug=debug)

    # 3. Load plugins again. `plugins` is not forwarded to this call.
    load_plugins(skip_if_loaded=False, debug=debug)

Reload plugins back into memory.

Parameters
  • plugins (Optional[List[str]], default None): The plugins to reload. None will reload all plugins.
def get_plugins( *to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
    """
    Return the installed `Plugin` objects.

    Parameters
    ----------
    to_load:
        If specified, only load specific plugins.
        Otherwise every entry in the plugins resources directory is considered.

    try_import: bool, default True
        If `True`, allow for plugins to be imported.
        Forwarded to `Plugin.is_installed()`.

    Returns
    -------
    A tuple of the plugins which report themselves as installed.
    If exactly one name was given in `to_load`, that single `Plugin` is returned.

    Raises
    ------
    ValueError
        If exactly one name was given and that plugin is not installed.
    """
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    import os
    sync_plugins_symlinks()

    if to_load:
        names = list(to_load)
    else:
        py_suffix = '.py'
        names = []
        for entry in os.listdir(PLUGINS_RESOURCES_PATH):
            if entry == '__init__.py':
                continue
            if (PLUGINS_RESOURCES_PATH / entry).is_dir():
                # Package-style plugin: the directory name is the plugin name.
                names.append(entry)
            elif entry.endswith(py_suffix):
                # Single-file plugin: drop only a genuine '.py' suffix.
                names.append(entry[:-len(py_suffix)])
            # Any other file is not a plugin. Blindly slicing three characters
            # off its name would produce a bogus (possibly empty) plugin name.

    candidates = [Plugin(name) for name in names]
    plugins = tuple(
        plugin
        for plugin in candidates
        if plugin.is_installed(try_import=try_import)
    )
    if len(to_load) == 1:
        if not plugins:
            raise ValueError(f"Plugin '{to_load[0]}' is not installed.")
        return plugins[0]
    return plugins

Return a tuple of installed Plugin objects, or a single Plugin when exactly one name is given.

Parameters
  • to_load: If specified, only load specific plugins. Otherwise return all plugins.
  • try_import (bool, default True): If True, allow for plugins to be imported.
def get_data_plugins() -> List[Plugin]:
def get_data_plugins() -> List[Plugin]:
    """
    Only return the plugins whose modules define a `fetch()` or `sync()` function.

    Returns
    -------
    A list of `Plugin` objects (from `get_plugins()`), each appearing at most once,
    whose `module` has a function named `sync` or `fetch`.
    """
    import inspect
    data_names = {'sync', 'fetch'}
    # `any()` stops at the first match, so a plugin which defines both
    # `sync` and `fetch` is included once rather than once per function.
    return [
        plugin
        for plugin in get_plugins()
        if any(
            name in data_names
            for name, _ in inspect.getmembers(plugin.module, inspect.isfunction)
        )
    ]

Only return the plugins whose modules define either a fetch() or sync() function.

def add_plugin_argument(*args, **kwargs) -> None:
def add_plugin_argument(*args, **kwargs) -> None:
    """
    Register an argparse argument in the calling plugin's options group.
    Takes the same parameters as the regular argparse `add_argument()` function.

    The group is titled "Plugin '<name>' options" when a parent plugin is detected
    and 'Custom options' otherwise. An argument whose positional `args` were
    already registered for the group is skipped. Any exception raised while
    registering is reported as a warning instead of being propagated.

    Examples
    --------
    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
    >>> 
    """
    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
    from meerschaum.utils.warnings import warn

    parent_name = _get_parent_plugin()
    if parent_name:
        title = f"Plugin '{parent_name}' options"
    else:
        title = 'Custom options'
    group_key = 'plugin_' + (parent_name or '')

    # Lazily create the argument group and its seen-arguments registry.
    if group_key not in groups:
        groups[group_key] = parser.add_argument_group(title=title)
        _seen_plugin_args[group_key] = set()

    try:
        # The stringified positional args serve as the de-duplication key.
        args_key = str(args)
        seen = _seen_plugin_args[group_key]
        if args_key in seen:
            return
        groups[group_key].add_argument(*args, **kwargs)
        seen.add(args_key)
    except Exception as e:
        warn(e)

Add argparse arguments under the calling plugin's options group ("Plugin '<name>' options", or 'Custom options' when no parent plugin is detected). Takes the same parameters as the regular argparse add_argument() function.

Examples
>>> add_plugin_argument('--foo', type=int, help="This is my help text!")
>>>
def pre_sync_hook(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
def pre_sync_hook(
    function: Callable[[Any], Any],
) -> Callable[[Any], Any]:
    r"""
    Register a function as a sync hook to be executed right before sync.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to execute right before a sync.

    Returns
    -------
    The same `function`, unmodified (this is a decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import pre_sync_hook
    >>>
    >>> @pre_sync_hook
    ... def log_sync(pipe, **kwargs):
    ...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
    >>>
    """
    # The docstring is raw so the `\n` in the example stays a literal escape
    # instead of splitting the example line in rendered documentation.
    with _locks['_pre_sync_hooks']:
        try:
            # Resolved inside the `try` (as in `post_sync_hook`) so a failure to
            # determine the parent plugin is reported as a warning.
            plugin_name = _get_parent_plugin()
            _pre_sync_hooks.setdefault(plugin_name, []).append(function)
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(e)
    return function

Register a function as a sync hook to be executed right before sync.

Parameters
----------
function: Callable[[Any], Any]
    The function to execute right before a sync.

Returns
-------
Another function (this is a decorator function).

Examples
--------
>>> from meerschaum.plugins import pre_sync_hook
>>>
>>> @pre_sync_hook
... def log_sync(pipe, **kwargs):
...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
>>>

def post_sync_hook(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
def post_sync_hook(
    function: Callable[[Any], Any],
) -> Callable[[Any], Any]:
    """
    Register a function as a sync hook to be executed upon completion of a sync.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to execute upon completion of a sync.

    Returns
    -------
    The same `function`, unmodified (this is a decorator function).
    If registration raises, the exception is reported as a warning.

    Examples
    --------
    >>> from meerschaum.plugins import post_sync_hook
    >>> from meerschaum.utils.misc import interval_str
    >>> from datetime import timedelta
    >>>
    >>> @post_sync_hook
    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
    ...     duration_delta = timedelta(seconds=duration)
    ...     duration_text = interval_str(duration_delta)
    ...     print(f"It took {duration_text} to sync {pipe}.")
    >>>
    """
    hooks_lock = _locks['_post_sync_hooks']
    with hooks_lock:
        try:
            # Hooks are grouped under the name returned by `_get_parent_plugin()`.
            owner = _get_parent_plugin()
            _post_sync_hooks.setdefault(owner, []).append(function)
        except Exception as exc:
            from meerschaum.utils.warnings import warn
            warn(exc)
    return function

Register a function as a sync hook to be executed upon completion of a sync.

Parameters
  • function (Callable[[Any], Any]): The function to execute upon completion of a sync.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import post_sync_hook
>>> from meerschaum.utils.misc import interval_str
>>> from datetime import timedelta
>>>
>>> @post_sync_hook
... def log_sync(pipe, success_tuple, duration=None, **kwargs):
...     duration_delta = timedelta(seconds=duration)
...     duration_text = interval_str(duration_delta)
...     print(f"It took {duration_text} to sync {pipe}.")
>>>