meerschaum.plugins

Expose plugin management APIs from the meerschaum.plugins module.

   1#! /usr/bin/env python
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Expose plugin management APIs from the `meerschaum.plugins` module.
   7"""
   8
   9from __future__ import annotations
  10
  11import pathlib
  12import functools
  13from collections import defaultdict
  14
  15import meerschaum as mrsm
  16from meerschaum.utils.typing import Callable, Any, Union, Optional, Dict, List, Tuple
  17from meerschaum.utils.threading import RLock
  18from meerschaum.core.Plugin import Plugin
  19
  20_api_plugins: Dict[str, List[Callable[['fastapi.App'], Any]]] = {}
  21_pre_sync_hooks: Dict[Union[str, None], List[Callable[[Any], Any]]] = {}
  22_post_sync_hooks: Dict[Union[str, None], List[Callable[[Any], Any]]] = {}
  23_actions_daemon_enabled: Dict[str, bool] = {}
  24_locks = {
  25    '_api_plugins': RLock(),
  26    '_dash_plugins': RLock(),
  27    '_pre_sync_hooks': RLock(),
  28    '_post_sync_hooks': RLock(),
  29    '_actions_daemon_enabled': RLock(),
  30    '__path__': RLock(),
  31    'sys.path': RLock(),
  32    'internal_plugins': RLock(),
  33    '_synced_symlinks': RLock(),
  34    'PLUGINS_INTERNAL_LOCK_PATH': RLock(),
  35}
  36__all__ = (
  37    "Plugin",
  38    "make_action",
  39    "api_plugin",
  40    "dash_plugin",
  41    "web_page",
  42    "import_plugins",
  43    "from_plugin_import",
  44    "reload_plugins",
  45    "get_plugins",
  46    "get_data_plugins",
  47    "add_plugin_argument",
  48    "pre_sync_hook",
  49    "post_sync_hook",
  50)
  51__pdoc__ = {
  52    'venvs': False,
  53    'data': False,
  54    'stack': False,
  55    'plugins': False,
  56}
  57
  58
  59def make_action(
  60    function: Optional[Callable[[Any], Any]] = None,
  61    shell: bool = False,
  62    activate: bool = True,
  63    deactivate: bool = True,
  64    debug: bool = False,
  65    daemon: bool = True,
  66    skip_if_loaded: bool = True,
  67    _plugin_name: Optional[str] = None,
  68) -> Callable[[Any], Any]:
  69    """
  70    Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.
  71    
  72    Parameters
  73    ----------
  74    function: Callable[[Any], Any]
  75        The function to become a Meerschaum action. Must accept all keyword arguments.
  76        
  77    shell: bool, default False
  78        Not used.
  79        
  80    Returns
  81    -------
  82    Another function (this is a decorator function).
  83
  84    Examples
  85    --------
  86    >>> from meerschaum.plugins import make_action
  87    >>>
  88    >>> @make_action
  89    ... def my_action(**kw):
  90    ...     print('foo')
  91    ...     return True, "Success"
  92    >>>
  93    """
  94    def _decorator(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
  95        from meerschaum.actions import actions, _custom_actions_plugins, _plugins_actions
  96        if skip_if_loaded and func.__name__ in actions:
  97            return func
  98
  99        import meerschaum.config.paths as paths
 100        plugin_name = (
 101            func.__name__.split(
 102                f"{paths.PLUGINS_RESOURCES_PATH.stem}.",
 103                maxsplit=1,
 104            )[-1].split('.')[0]
 105        )
 106        plugin = Plugin(plugin_name) if plugin_name else None
 107
 108        if debug:
 109            from meerschaum.utils.debug import dprint
 110            dprint(
 111                f"Adding action '{func.__name__}' from plugin "
 112                f"'{plugin}'..."
 113            )
 114
 115        actions[func.__name__] = func
 116        _custom_actions_plugins[func.__name__] = plugin_name
 117        if plugin_name not in _plugins_actions:
 118            _plugins_actions[plugin_name] = []
 119        _plugins_actions[plugin_name].append(func.__name__)
 120        if not daemon:
 121            _actions_daemon_enabled[func.__name__] = False
 122        return func
 123
 124    if function is None:
 125        return _decorator
 126    return _decorator(function)
 127
 128
 129def pre_sync_hook(
 130    function: Callable[[Any], Any],
 131) -> Callable[[Any], Any]:
 132    """
 133    Register a function as a sync hook to be executed right before sync.
 134    
 135    Parameters
 136    ----------
 137    function: Callable[[Any], Any]
 138        The function to execute right before a sync.
 139        
 140    Returns
 141    -------
 142    Another function (this is a decorator function).
 143
 144    Examples
 145    --------
 146    >>> from meerschaum.plugins import pre_sync_hook
 147    >>>
 148    >>> @pre_sync_hook
 149    ... def log_sync(pipe, **kwargs):
 150    ...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
 151    >>>
 152    """
 153    with _locks['_pre_sync_hooks']:
 154        plugin_name = _get_parent_plugin()
 155        try:
 156            if plugin_name not in _pre_sync_hooks:
 157                _pre_sync_hooks[plugin_name] = []
 158            _pre_sync_hooks[plugin_name].append(function)
 159        except Exception as e:
 160            from meerschaum.utils.warnings import warn
 161            warn(e)
 162    return function
 163
 164
 165def post_sync_hook(
 166    function: Callable[[Any], Any],
 167) -> Callable[[Any], Any]:
 168    """
 169    Register a function as a sync hook to be executed upon completion of a sync.
 170    
 171    Parameters
 172    ----------
 173    function: Callable[[Any], Any]
 174        The function to execute upon completion of a sync.
 175        
 176    Returns
 177    -------
 178    Another function (this is a decorator function).
 179
 180    Examples
 181    --------
 182    >>> from meerschaum.plugins import post_sync_hook
 183    >>> from meerschaum.utils.misc import interval_str
 184    >>> from datetime import timedelta
 185    >>>
 186    >>> @post_sync_hook
 187    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
 188    ...     duration_delta = timedelta(seconds=duration)
 189    ...     duration_text = interval_str(duration_delta)
 190    ...     print(f"It took {duration_text} to sync {pipe}.")
 191    >>>
 192    """
 193    with _locks['_post_sync_hooks']:
 194        try:
 195            plugin_name = _get_parent_plugin()
 196            if plugin_name not in _post_sync_hooks:
 197                _post_sync_hooks[plugin_name] = []
 198            _post_sync_hooks[plugin_name].append(function)
 199        except Exception as e:
 200            from meerschaum.utils.warnings import warn
 201            warn(e)
 202    return function
 203
 204
 205_plugin_endpoints_to_pages = {}
 206_plugins_web_pages = {}
 207def web_page(
 208    page: Union[str, None, Callable[[Any], Any]] = None,
 209    login_required: bool = True,
 210    skip_navbar: bool = False,
 211    page_group: Optional[str] = None,
 212    dark_theme: bool = True,
 213    **kwargs
 214) -> Any:
 215    """
 216    Quickly add pages to the dash application.
 217
 218    Parameters
 219    ----------
 220    dark_theme: bool, default True
 221        If `True`, apply the Web Console's `dbc_dark` component theme to this page
 222        (the default — most plugins want this). Set to `False` to opt out: the
 223        `dbc_dark` class is removed from `<body>` while this page is active, so a
 224        plugin's own styling applies without competing with the theme's overrides.
 225        Note the global base Bootstrap theme (dark background) still applies, so an
 226        opted-out page should paint its own background.
 227
 228    Examples
 229    --------
 230    >>> import meerschaum as mrsm
 231    >>> from meerschaum.plugins import web_page
 232    >>> html = mrsm.attempt_import('dash.html')
 233    >>>
 234    >>> @web_page('foo/bar', login_required=False)
 235    >>> def foo_bar():
 236    ...     return html.Div([html.H1("Hello, World!")])
 237    >>>
 238    """
 239    page_str = None
 240
 241    def _decorator(_func: Callable[[Any], Any]) -> Callable[[Any], Any]:
 242        nonlocal page_str, page_group
 243
 244        @functools.wraps(_func)
 245        def wrapper(*_args, **_kwargs):
 246            return _func(*_args, **_kwargs)
 247
 248        if page_str is None:
 249            page_str = _func.__name__
 250
 251        page_str = page_str.lstrip('/').rstrip('/').strip()
 252        if not page_str.startswith('dash'):
 253            page_str = f'/dash/{page_str}'
 254        page_key = (
 255            ' '.join(
 256                [
 257                    word.capitalize()
 258                    for word in (
 259                        page_str.replace('/dash', '').lstrip('/').rstrip('/').strip()
 260                        .replace('-', ' ').replace('_', ' ').split(' ')
 261                    )
 262                ]
 263            )
 264        )
 265 
 266        plugin_name = _get_parent_plugin()
 267        page_group = page_group or plugin_name
 268        if page_group not in _plugin_endpoints_to_pages:
 269            _plugin_endpoints_to_pages[page_group] = {}
 270        _plugin_endpoints_to_pages[page_group][page_str] = {
 271            'function': _func,
 272            'login_required': login_required,
 273            'skip_navbar': skip_navbar,
 274            'page_key': page_key,
 275            'dark_theme': dark_theme,
 276        }
 277        if plugin_name not in _plugins_web_pages:
 278            _plugins_web_pages[plugin_name] = []
 279        _plugins_web_pages[plugin_name].append(_func)
 280        return wrapper
 281
 282    if callable(page):
 283        decorator_to_return = _decorator(page)
 284        page_str = page.__name__
 285    else:
 286        decorator_to_return = _decorator
 287        page_str = page
 288
 289    return decorator_to_return
 290
 291
 292_dash_plugins = {}
 293def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
 294    """
 295    Execute the function when starting the Dash application.
 296    """
 297    with _locks['_dash_plugins']:
 298        plugin_name = _get_parent_plugin()
 299        try:
 300            if plugin_name not in _dash_plugins:
 301                _dash_plugins[plugin_name] = []
 302            _dash_plugins[plugin_name].append(function)
 303        except Exception as e:
 304            from meerschaum.utils.warnings import warn
 305            warn(e)
 306    return function
 307
 308
 309def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
 310    """
 311    Execute the function when initializing the Meerschaum API module.
 312    Useful for lazy-loading heavy plugins only when the API is started,
 313    such as when editing the `meerschaum.api.app` FastAPI app.
 314    
 315    The FastAPI app will be passed as the only parameter.
 316    
 317    Examples
 318    --------
 319    >>> from meerschaum.plugins import api_plugin
 320    >>>
 321    >>> @api_plugin
 322    >>> def initialize_plugin(app):
 323    ...     @app.get('/my/new/path')
 324    ...     def new_path():
 325    ...         return {'message': 'It works!'}
 326    >>>
 327    """
 328    with _locks['_api_plugins']:
 329        try:
 330            if function.__module__ not in _api_plugins:
 331                _api_plugins[function.__module__] = []
 332            _api_plugins[function.__module__].append(function)
 333        except Exception as e:
 334            from meerschaum.utils.warnings import warn
 335            warn(e)
 336    return function
 337
 338
 339_synced_symlinks: int = 0
 340_injected_plugin_symlinks = defaultdict(lambda: 0)
 341def sync_plugins_symlinks(debug: bool = False, warn: bool = True) -> None:
 342    """
 343    Update the plugins' internal symlinks. 
 344    """
 345    from meerschaum.utils.warnings import error, warn as _warn, dprint
 346    global _synced_symlinks
 347    with _locks['_synced_symlinks']:
 348        if _synced_symlinks > 1:
 349            if debug:
 350                dprint("Skip syncing symlinks...")
 351            return
 352
 353    import os
 354    import pathlib
 355    import time
 356    from collections import defaultdict
 357    from meerschaum.utils.misc import flatten_list, make_symlink, is_symlink
 358    from meerschaum._internal.static import STATIC_CONFIG
 359    import meerschaum.config.paths as paths
 360
 361    ### If the lock file exists, sleep for up to a second or until it's removed before continuing.
 362    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
 363        if paths.PLUGINS_INTERNAL_LOCK_PATH.exists():
 364            lock_sleep_total = STATIC_CONFIG['plugins']['lock_sleep_total']
 365            lock_sleep_increment = STATIC_CONFIG['plugins']['lock_sleep_increment']
 366            lock_start = time.perf_counter()
 367            while (
 368                (time.perf_counter() - lock_start) < lock_sleep_total
 369            ):
 370                time.sleep(lock_sleep_increment)
 371                if not paths.PLUGINS_INTERNAL_LOCK_PATH.exists():
 372                    break
 373                try:
 374                    paths.PLUGINS_INTERNAL_LOCK_PATH.unlink()
 375                except Exception as e:
 376                    if warn:
 377                        _warn(
 378                            f"Error while removing lockfile {paths.PLUGINS_INTERNAL_LOCK_PATH}:\n"
 379                            f"{e}"
 380                        )
 381                    break
 382
 383        ### Begin locking from other processes.
 384        try:
 385            paths.PLUGINS_INTERNAL_LOCK_PATH.touch()
 386        except Exception as e:
 387            if warn:
 388                _warn(f"Unable to create lockfile {paths.PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
 389
 390    with _locks['internal_plugins']:
 391
 392        try:
 393            from importlib.metadata import entry_points
 394        except ImportError:
 395            importlib_metadata = mrsm.attempt_import('importlib_metadata', lazy=False)
 396            entry_points = importlib_metadata.entry_points
 397
 398        ### NOTE: Allow plugins to be installed via `pip`.
 399        packaged_plugin_paths = []
 400        try:
 401            discovered_packaged_plugins_eps = entry_points(group='meerschaum.plugins')
 402        except TypeError:
 403            discovered_packaged_plugins_eps = []
 404
 405        for ep in discovered_packaged_plugins_eps:
 406            module_name = ep.name
 407            for package_file_path in ep.dist.files:
 408                if package_file_path.suffix != '.py':
 409                    continue
 410                if str(package_file_path) == f'{module_name}.py':
 411                    packaged_plugin_paths.append(package_file_path.locate())
 412                elif str(package_file_path) == f'{module_name}/__init__.py':
 413                    packaged_plugin_paths.append(package_file_path.locate().parent)
 414
 415        if is_symlink(paths.PLUGINS_RESOURCES_PATH) or not paths.PLUGINS_RESOURCES_PATH.exists():
 416            try:
 417                paths.PLUGINS_RESOURCES_PATH.unlink()
 418            except Exception:
 419                pass
 420
 421        paths.PLUGINS_RESOURCES_PATH.mkdir(exist_ok=True)
 422
 423        existing_symlinked_paths = {
 424            _existing_symlink: pathlib.Path(os.path.realpath(_existing_symlink))
 425            for item in os.listdir(paths.PLUGINS_RESOURCES_PATH)
 426            if is_symlink(_existing_symlink := (paths.PLUGINS_RESOURCES_PATH / item))
 427        }
 428        injected_symlinked_paths = {
 429            _injected_symlink: pathlib.Path(os.path.realpath(_injected_symlink))
 430            for item in os.listdir(paths.PLUGINS_INJECTED_RESOURCES_PATH)
 431            if is_symlink(_injected_symlink := (paths.PLUGINS_INJECTED_RESOURCES_PATH / item))
 432        }
 433        plugins_to_be_symlinked = list(flatten_list(
 434            [
 435                [
 436                    pathlib.Path(os.path.realpath(plugins_path / item))
 437                    for item in os.listdir(plugins_path)
 438                    if (
 439                        not item.startswith('.')
 440                    ) and (item not in ('__pycache__', '__init__.py'))
 441                ]
 442                for plugins_path in paths.PLUGINS_DIR_PATHS
 443                if plugins_path.exists()
 444            ]
 445        ))
 446        plugins_to_be_symlinked.extend(packaged_plugin_paths)
 447
 448        ### Check for duplicates.
 449        seen_plugins = defaultdict(lambda: 0)
 450        for plugin_path in plugins_to_be_symlinked:
 451            plugin_name = plugin_path.stem
 452            seen_plugins[plugin_name] += 1
 453        for plugin_name, plugin_count in seen_plugins.items():
 454            if plugin_count > 1:
 455                if warn:
 456                    _warn(f"Found duplicate plugins named '{plugin_name}'.")
 457
 458        for plugin_symlink_path, real_path in existing_symlinked_paths.items():
 459
 460            ### Remove invalid symlinks.
 461            if real_path not in plugins_to_be_symlinked:
 462                if _injected_plugin_symlinks[plugin_symlink_path] > 1:
 463                    continue
 464                if plugin_symlink_path in injected_symlinked_paths:
 465                    continue
 466                if real_path in injected_symlinked_paths.values():
 467                    continue
 468                try:
 469                    plugin_symlink_path.unlink()
 470                except Exception:
 471                    pass
 472
 473            ### Remove valid plugins from the to-be-symlinked list.
 474            else:
 475                plugins_to_be_symlinked.remove(real_path)
 476
 477        for plugin_path in plugins_to_be_symlinked:
 478            plugin_symlink_path = paths.PLUGINS_RESOURCES_PATH / plugin_path.name
 479            try:
 480                ### There might be duplicate folders (e.g. __pycache__).
 481                if (
 482                    plugin_symlink_path.exists()
 483                    and
 484                    plugin_symlink_path.is_dir()
 485                    and
 486                    not is_symlink(plugin_symlink_path)
 487                ):
 488                    continue
 489                success, msg = make_symlink(plugin_path, plugin_symlink_path)
 490            except Exception as e:
 491                success, msg = False, str(e)
 492            if not success:
 493                if warn:
 494                    _warn(
 495                        f"Failed to create symlink {plugin_symlink_path} "
 496                        + f"to {plugin_path}:\n    {msg}"
 497                    )
 498
 499    ### Release symlink lock file in case other processes need it.
 500    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
 501        try:
 502            if paths.PLUGINS_INTERNAL_LOCK_PATH.exists():
 503                paths.PLUGINS_INTERNAL_LOCK_PATH.unlink()
 504        ### Sometimes competing threads will delete the lock file at the same time.
 505        except FileNotFoundError:
 506            pass
 507        except Exception as e:
 508            if warn:
 509                _warn(f"Error cleaning up lockfile {paths.PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
 510
 511        try:
 512            if not paths.PLUGINS_INIT_PATH.exists():
 513                paths.PLUGINS_INIT_PATH.touch()
 514        except Exception as e:
 515            error(f"Failed to create the file '{paths.PLUGINS_INIT_PATH}':\n{e}")
 516
 517    with _locks['__path__']:
 518        if str(paths.PLUGINS_RESOURCES_PATH.parent) not in __path__:
 519            __path__.append(str(paths.PLUGINS_RESOURCES_PATH.parent))
 520
 521    with _locks['_synced_symlinks']:
 522        _synced_symlinks += 1
 523
 524
 525def import_plugins(
 526    *plugins_to_import: Union[str, List[str], None],
 527    warn: bool = True,
 528) -> Union[
 529    'ModuleType', Tuple['ModuleType', None]
 530]:
 531    """
 532    Import the Meerschaum plugins directory.
 533
 534    Parameters
 535    ----------
 536    plugins_to_import: Union[str, List[str], None]
 537        If provided, only import the specified plugins.
 538        Otherwise import the entire plugins module. May be a string, list, or `None`.
 539        Defaults to `None`.
 540
 541    Returns
 542    -------
 543    A module of list of modules, depening on the number of plugins provided.
 544
 545    """
 546    import sys
 547    import importlib
 548    import meerschaum.config.paths as paths
 549    from meerschaum.utils.misc import flatten_list
 550    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
 551    from meerschaum.utils.warnings import warn as _warn
 552    plugins_to_import = list(plugins_to_import)
 553    prepended_sys_path = False
 554    with _locks['sys.path']:
 555
 556        ### Since plugins may depend on other plugins,
 557        ### we need to activate the virtual environments for library plugins.
 558        ### This logic exists in `Plugin.activate_venv()`,
 559        ### but that code requires the plugin's module to already be imported.
 560        ### It's not a guarantee of correct activation order,
 561        ### e.g. if a library plugin pins a specific package and another 
 562        plugins_names = get_plugins_names()
 563        already_active_venvs = [is_venv_active(plugin_name) for plugin_name in plugins_names]
 564
 565        if not sys.path or sys.path[0] != str(paths.PLUGINS_RESOURCES_PATH.parent):
 566            prepended_sys_path = True
 567            sys.path.insert(0, str(paths.PLUGINS_RESOURCES_PATH.parent))
 568
 569        if not plugins_to_import:
 570            for plugin_name in plugins_names:
 571                activate_venv(plugin_name)
 572            try:
 573                imported_plugins = importlib.import_module(paths.PLUGINS_RESOURCES_PATH.stem)
 574            except ImportError as e:
 575                _warn(f"Failed to import the plugins module:\n    {e}")
 576                import traceback
 577                traceback.print_exc()
 578                imported_plugins = None
 579            for plugin_name in plugins_names:
 580                if plugin_name in already_active_venvs:
 581                    continue
 582                deactivate_venv(plugin_name)
 583
 584        else:
 585            imported_plugins = []
 586            for plugin_name in flatten_list(plugins_to_import):
 587                plugin = Plugin(plugin_name)
 588                try:
 589                    with Venv(plugin, init_if_not_exists=False):
 590                        imported_plugins.append(
 591                            importlib.import_module(
 592                                f'{paths.PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
 593                            )
 594                        )
 595                except Exception as e:
 596                    _warn(
 597                        f"Failed to import plugin '{plugin_name}':\n    "
 598                        + f"{e}\n\nHere's a stacktrace:",
 599                        stack = False,
 600                    )
 601                    from meerschaum.utils.formatting import get_console
 602                    get_console().print_exception(
 603                        suppress = [
 604                            'meerschaum/plugins/__init__.py',
 605                            importlib,
 606                            importlib._bootstrap,
 607                        ]
 608                    )
 609                    imported_plugins.append(None)
 610
 611        if imported_plugins is None and warn:
 612            _warn("Failed to import plugins.", stacklevel=3)
 613
 614        if prepended_sys_path and str(paths.PLUGINS_RESOURCES_PATH.parent) in sys.path:
 615            sys.path.remove(str(paths.PLUGINS_RESOURCES_PATH.parent))
 616
 617    if isinstance(imported_plugins, list):
 618        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
 619    return imported_plugins
 620
 621
 622def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
 623    """
 624    Emulate the `from module import x` behavior.
 625
 626    Parameters
 627    ----------
 628    plugin_import_name: str
 629        The import name of the plugin's module.
 630        Separate submodules with '.' (e.g. 'compose.utils.pipes')
 631
 632    attrs: str
 633        Names of the attributes to return.
 634
 635    Returns
 636    -------
 637    Objects from a plugin's submodule.
 638    If multiple objects are provided, return a tuple.
 639
 640    Examples
 641    --------
 642    >>> init = from_plugin_import('compose.utils', 'init')
 643    >>> with mrsm.Venv('compose'):
 644    ...     cf = init()
 645    >>> build_parent_pipe, get_defined_pipes = from_plugin_import(
 646    ...     'compose.utils.pipes',
 647    ...     'build_parent_pipe',
 648    ...     'get_defined_pipes',
 649    ... )
 650    >>> parent_pipe = build_parent_pipe(cf)
 651    >>> defined_pipes = get_defined_pipes(cf)
 652    """
 653    import importlib
 654    import meerschaum.config.paths as paths
 655    from meerschaum.utils.warnings import warn as _warn
 656    if plugin_import_name.startswith('plugins.'):
 657        plugin_import_name = plugin_import_name[len('plugins.'):]
 658    plugin_import_parts = plugin_import_name.split('.')
 659    plugin_root_name = plugin_import_parts[0]
 660    plugin = mrsm.Plugin(plugin_root_name)
 661
 662    submodule_import_name = '.'.join(
 663        [paths.PLUGINS_RESOURCES_PATH.stem]
 664        + plugin_import_parts
 665    )
 666    if len(attrs) == 0:
 667        raise ValueError(f"Provide which attributes to return from '{submodule_import_name}'.")
 668
 669    attrs_to_return = []
 670    with mrsm.Venv(plugin):
 671        if plugin.module is None:
 672            raise ImportError(f"Unable to import plugin '{plugin}'.")
 673
 674        try:
 675            submodule = importlib.import_module(submodule_import_name)
 676        except ImportError as e:
 677            _warn(
 678                f"Failed to import plugin '{submodule_import_name}':\n    "
 679                + f"{e}\n\nHere's a stacktrace:",
 680                stack=False,
 681            )
 682            from meerschaum.utils.formatting import get_console
 683            get_console().print_exception(
 684                suppress=[
 685                    'meerschaum/plugins/__init__.py',
 686                    importlib,
 687                    importlib._bootstrap,
 688                ]
 689            )
 690            return None
 691
 692        for attr in attrs:
 693            try:
 694                attrs_to_return.append(getattr(submodule, attr))
 695            except Exception:
 696                _warn(f"Failed to access '{attr}' from '{submodule_import_name}'.")
 697                attrs_to_return.append(None)
 698        
 699        if len(attrs) == 1:
 700            return attrs_to_return[0]
 701
 702        return tuple(attrs_to_return)
 703
 704
 705_loaded_plugins: bool = False
 706def load_plugins(
 707    skip_if_loaded: bool = True,
 708    shell: bool = False,
 709    debug: bool = False,
 710) -> None:
 711    """
 712    Import Meerschaum plugins and update the actions dictionary.
 713    """
 714    global _loaded_plugins
 715    from meerschaum.utils.warnings import dprint
 716
 717    if skip_if_loaded and _loaded_plugins:
 718        if debug:
 719            dprint("Skip loading plugins...")
 720        return
 721
 722    from inspect import isfunction, getmembers
 723    import meerschaum.config.paths as paths
 724    from meerschaum.actions import __all__ as _all, modules
 725    from meerschaum.utils.packages import get_modules_from_package
 726
 727    _plugins_names, plugins_modules = get_modules_from_package(
 728        import_plugins(),
 729        names = True,
 730        recursive = True,
 731        modules_venvs = True
 732    )
 733
 734    ### I'm appending here to keep from redefining the modules list.
 735    new_modules = (
 736        [
 737            mod
 738            for mod in modules
 739            if not mod.__name__.startswith(paths.PLUGINS_RESOURCES_PATH.stem + '.')
 740        ]
 741        + plugins_modules
 742    )
 743    n_mods = len(modules)
 744    for mod in new_modules:
 745        modules.append(mod)
 746    for i in range(n_mods):
 747        modules.pop(0)
 748
 749    for module in plugins_modules:
 750        for name, func in getmembers(module):
 751            if not isfunction(func):
 752                continue
 753            if name == module.__name__.split('.')[-1]:
 754                make_action(
 755                    func,
 756                    **{'shell': shell, 'debug': debug},
 757                    _plugin_name=name,
 758                    skip_if_loaded=True,
 759                )
 760
 761    _loaded_plugins = True
 762
 763
 764def unload_custom_actions(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
 765    """
 766    Unload the custom actions added by plugins.
 767    """
 768    from meerschaum.actions import (
 769        actions,
 770        _custom_actions_plugins,
 771        _plugins_actions,
 772    )
 773    from meerschaum._internal.entry import _shell
 774    import meerschaum._internal.shell as shell_pkg
 775
 776    plugins = plugins if plugins is not None else list(_plugins_actions)
 777
 778    for plugin in plugins:
 779        action_names = _plugins_actions.get(plugin, [])
 780        actions_to_remove = {
 781            action_name: actions.get(action_name, None)
 782            for action_name in action_names
 783        }
 784        for action_name in action_names:
 785            _ = actions.pop(action_name, None)
 786            _ = _custom_actions_plugins.pop(action_name, None)
 787            _ = _actions_daemon_enabled.pop(action_name, None)
 788
 789        _ = _plugins_actions.pop(plugin, None)
 790        shell_pkg._remove_shell_actions(
 791            _shell=_shell,
 792            actions=actions_to_remove,
 793        )
 794    
 795
 796def unload_plugins(
 797    plugins: Optional[List[str]] = None,
 798    remove_symlinks: bool = True,
 799    debug: bool = False,
 800) -> None:
 801    """
 802    Unload the specified plugins from memory.
 803    """
 804    global _loaded_plugins, _synced_symlinks
 805    import sys
 806    import meerschaum.config.paths as paths
 807    from meerschaum.connectors import unload_plugin_connectors
 808    if debug:
 809        from meerschaum.utils.warnings import dprint
 810
 811    _loaded_plugins = False
 812    _synced_symlinks = 0
 813
 814    all_plugins = get_plugins_names()
 815    plugins = plugins if plugins is not None else all_plugins
 816    if debug:
 817        dprint(f"Unloading plugins: {plugins}")
 818
 819    unload_custom_actions(plugins, debug=debug)
 820    unload_plugin_connectors(plugins, debug=debug)
 821
 822    module_prefix = f"{paths.PLUGINS_RESOURCES_PATH.stem}."
 823    loaded_modules = [mod_name for mod_name in sys.modules if mod_name.startswith(module_prefix)]
 824
 825    root_plugins_mod = (
 826        sys.modules.get(paths.PLUGINS_RESOURCES_PATH.stem, None)
 827        if sorted(plugins) != sorted(all_plugins)
 828        else sys.modules.pop(paths.PLUGINS_RESOURCES_PATH, None)
 829    )
 830
 831    for plugin_name in plugins:
 832        for mod_name in loaded_modules:
 833            if (
 834                mod_name[len(module_prefix):].startswith(plugin_name + '.')
 835                or mod_name[len(module_prefix):] == plugin_name
 836            ):
 837                _ = sys.modules.pop(mod_name, None)
 838
 839        if root_plugins_mod is not None and plugin_name in root_plugins_mod.__dict__:
 840            try:
 841                delattr(root_plugins_mod, plugin_name)
 842            except Exception:
 843                pass
 844
 845        ### Unload sync hooks.
 846        _ = _pre_sync_hooks.pop(plugin_name, None)
 847        _ = _post_sync_hooks.pop(plugin_name, None)
 848
 849        ### Unload API endpoints and pages.
 850        _ = _dash_plugins.pop(plugin_name, None)
 851        web_page_funcs = _plugins_web_pages.pop(plugin_name, None) or []
 852        page_groups_to_pop = []
 853        for page_group, page_functions in _plugin_endpoints_to_pages.items():
 854            page_functions_to_pop = [
 855                page_str
 856                for page_str, page_payload in page_functions.items()
 857                if page_payload.get('function', None) in web_page_funcs
 858            ]
 859            for page_str in page_functions_to_pop:
 860                page_functions.pop(page_str, None)
 861            if not page_functions:
 862                page_groups_to_pop.append(page_group)
 863        
 864        for page_group in page_groups_to_pop:
 865            _plugin_endpoints_to_pages.pop(page_group, None)
 866
 867        ### Remove all but injected symlinks.
 868        if remove_symlinks:
 869            dir_symlink_path = paths.PLUGINS_RESOURCES_PATH / plugin_name
 870            dir_symlink_injected_path = paths.PLUGINS_INJECTED_RESOURCES_PATH / plugin_name
 871            file_symlink_path = paths.PLUGINS_RESOURCES_PATH / f"{plugin_name}.py"
 872            file_symlink_injected_path = paths.PLUGINS_INJECTED_RESOURCES_PATH / f"{plugin_name}.py"
 873
 874            try:
 875                if dir_symlink_path.exists() and not dir_symlink_injected_path.exists():
 876                    dir_symlink_path.unlink()
 877            except Exception:
 878                pass
 879
 880            try:
 881                if file_symlink_path.exists() and not file_symlink_injected_path.exists():
 882                    file_symlink_path.unlink()
 883            except Exception:
 884                pass
 885
 886
 887def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
 888    """
 889    Reload plugins back into memory.
 890
 891    Parameters
 892    ----------
 893    plugins: Optional[List[str]], default None
 894        The plugins to reload. `None` will reload all plugins.
 895
 896    """
 897    global _synced_symlinks
 898    unload_plugins(plugins, debug=debug)
 899    _synced_symlinks = 0
 900    sync_plugins_symlinks(debug=debug)
 901    load_plugins(skip_if_loaded=False, debug=debug)
 902
 903
 904def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
 905    """
 906    Return a list of `Plugin` objects.
 907
 908    Parameters
 909    ----------
 910    to_load:
 911        If specified, only load specific plugins.
 912        Otherwise return all plugins.
 913
 914    try_import: bool, default True
 915        If `True`, allow for plugins to be imported.
 916    """
 917    import meerschaum.config.paths as paths
 918    import os
 919    sync_plugins_symlinks()
 920    _plugins = [
 921        Plugin(name)
 922        for name in (
 923            to_load or [
 924                (
 925                    name if (paths.PLUGINS_RESOURCES_PATH / name).is_dir()
 926                    else name[:-3]
 927                )
 928                for name in os.listdir(paths.PLUGINS_RESOURCES_PATH)
 929                if name != '__init__.py'
 930            ]
 931        )
 932    ]
 933    plugins = tuple(plugin for plugin in _plugins if plugin.is_installed(try_import=try_import))
 934    if len(to_load) == 1:
 935        if len(plugins) == 0:
 936            raise ValueError(f"Plugin '{to_load[0]}' is not installed.")
 937        return plugins[0]
 938    return plugins
 939
 940
 941def get_plugins_names(*to_load, **kw) -> List[str]:
 942    """
 943    Return a list of installed plugins.
 944    """
 945    return [plugin.name for plugin in get_plugins(*to_load, **kw)]
 946
 947
 948def get_plugins_modules(*to_load, **kw) -> List['ModuleType']:
 949    """
 950    Return a list of modules for the installed plugins, or `None` if things break.
 951    """
 952    return [plugin.module for plugin in get_plugins(*to_load, **kw)]
 953
 954
 955def get_data_plugins() -> List[Plugin]:
 956    """
 957    Only return the modules of plugins with either `fetch()` or `sync()` functions.
 958    """
 959    import inspect
 960    plugins = get_plugins()
 961    data_names = {'sync', 'fetch'}
 962    data_plugins = []
 963    for plugin in plugins:
 964        for name, ob in inspect.getmembers(plugin.module):
 965            if not inspect.isfunction(ob):
 966                continue
 967            if name not in data_names:
 968                continue
 969            data_plugins.append(plugin)
 970    return data_plugins
 971
 972
 973def add_plugin_argument(*args, **kwargs) -> None:
 974    """
 975    Add argparse arguments under the 'Plugins options' group.
 976    Takes the same parameters as the regular argparse `add_argument()` function.
 977
 978    Examples
 979    --------
 980    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
 981    >>> 
 982    """
 983    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
 984    from meerschaum.utils.warnings import warn
 985    _parent_plugin_name = _get_parent_plugin()
 986    title = f"Plugin '{_parent_plugin_name}' options" if _parent_plugin_name else 'Custom options'
 987    group_key = 'plugin_' + (_parent_plugin_name or '')
 988    if group_key not in groups:
 989        groups[group_key] = parser.add_argument_group(
 990            title = title,
 991        )
 992        _seen_plugin_args[group_key] = set()
 993    try:
 994        if str(args) not in _seen_plugin_args[group_key]:
 995            groups[group_key].add_argument(*args, **kwargs)
 996            _seen_plugin_args[group_key].add(str(args))
 997    except Exception as e:
 998        warn(e)
 999
1000
1001def inject_plugin_path(
1002    plugin_path: pathlib.Path,
1003    plugins_resources_path: Optional[pathlib.Path] = None) -> None:
1004    """
1005    Inject a plugin as a symlink into the internal `plugins` directory.
1006
1007    Parameters
1008    ----------
1009    plugin_path: pathlib.Path
1010        The path to the plugin's source module.
1011    """
1012    import meerschaum.config.paths as paths
1013    from meerschaum.utils.misc import make_symlink
1014    if plugins_resources_path is None:
1015        plugins_resources_path = paths.PLUGINS_RESOURCES_PATH
1016        plugins_injected_resources_path = paths.PLUGINS_INJECTED_RESOURCES_PATH
1017    else:
1018        plugins_injected_resources_path = plugins_resources_path / '.injected'
1019
1020    if plugin_path.is_dir():
1021        plugin_name = plugin_path.name
1022        dest_path = plugins_resources_path / plugin_name
1023        injected_path = plugins_injected_resources_path / plugin_name
1024    elif plugin_path.name == '__init__.py':
1025        plugin_name = plugin_path.parent.name
1026        dest_path = plugins_resources_path / plugin_name
1027        injected_path = plugins_injected_resources_path / plugin_name
1028    elif plugin_path.name.endswith('.py'):
1029        plugin_name = plugin_path.name[:(-1 * len('.py'))]
1030        dest_path = plugins_resources_path / plugin_path.name
1031        injected_path = plugins_injected_resources_path / plugin_path.name
1032    else:
1033        raise ValueError(f"Cannot deduce plugin name from path '{plugin_path}'.")
1034
1035    _injected_plugin_symlinks[dest_path] += 1
1036    make_symlink(plugin_path, dest_path)
1037    make_symlink(plugin_path, injected_path)
1038
1039
1040def _get_parent_plugin(stacklevel: Union[int, Tuple[int, ...]] = (1, 2, 3, 4)) -> Union[str, None]:
1041    """If this function is called from outside a Meerschaum plugin, it will return None."""
1042    import inspect
1043    if not isinstance(stacklevel, tuple):
1044        stacklevel = (stacklevel,)
1045
1046    for _level in stacklevel:
1047        try:
1048            parent_globals = inspect.stack()[_level][0].f_globals
1049            global_name = parent_globals.get('__name__', '')
1050            if global_name.startswith('meerschaum.'):
1051                continue
1052            plugin_name = global_name.replace('plugins.', '').split('.')[0]
1053            if plugin_name.startswith('_') or plugin_name == 'importlib':
1054                continue
1055            return plugin_name
1056        except Exception:
1057            continue
1058
1059    return None
class Plugin:
 30class Plugin:
 31    """Handle packaging of Meerschaum plugins."""
 32
 33    def __init__(
 34        self,
 35        name: str,
 36        version: Optional[str] = None,
 37        user_id: Optional[int] = None,
 38        required: Optional[List[str]] = None,
 39        attributes: Optional[Dict[str, Any]] = None,
 40        archive_path: Optional[pathlib.Path] = None,
 41        venv_path: Optional[pathlib.Path] = None,
 42        repo_connector: Optional['mrsm.connectors.api.APIConnector'] = None,
 43        repo: Union['mrsm.connectors.api.APIConnector', str, None] = None,
 44    ):
 45        import meerschaum.config.paths as paths
 46        from meerschaum._internal.static import STATIC_CONFIG
 47        sep = STATIC_CONFIG['plugins']['repo_separator']
 48        _repo = None
 49        if sep in name:
 50            try:
 51                name, _repo = name.split(sep)
 52            except Exception as e:
 53                error(f"Invalid plugin name: '{name}'")
 54        self._repo_in_name = _repo
 55
 56        if attributes is None:
 57            attributes = {}
 58        self.name = name
 59        self.attributes = attributes
 60        self.user_id = user_id
 61        self._version = version
 62        if required:
 63            self._required = required
 64        self.archive_path = (
 65            archive_path if archive_path is not None
 66            else paths.PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
 67        )
 68        self.venv_path = (
 69            venv_path if venv_path is not None
 70            else paths.VIRTENV_RESOURCES_PATH / self.name
 71        )
 72        self._repo_connector = repo_connector
 73        self._repo_keys = repo
 74
 75
 76    @property
 77    def repo_connector(self):
 78        """
 79        Return the repository connector for this plugin.
 80        NOTE: This imports the `connectors` module, which imports certain plugin modules.
 81        """
 82        if self._repo_connector is None:
 83            from meerschaum.connectors.parse import parse_repo_keys
 84
 85            repo_keys = self._repo_keys or self._repo_in_name
 86            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
 87                error(
 88                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
 89                )
 90            repo_connector = parse_repo_keys(repo_keys)
 91            self._repo_connector = repo_connector
 92        return self._repo_connector
 93
 94
 95    @property
 96    def version(self):
 97        """
 98        Return the plugin's module version is defined (`__version__`) if it's defined.
 99        """
100        if self._version is None:
101            try:
102                self._version = self.module.__version__
103            except Exception as e:
104                self._version = None
105        return self._version
106
107
108    @property
109    def module(self):
110        """
111        Return the Python module of the underlying plugin.
112        """
113        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
114            if self.__file__ is None:
115                return None
116
117            from meerschaum.plugins import import_plugins
118            self._module = import_plugins(str(self), warn=False)
119
120        return self._module
121
122
123    @property
124    def __file__(self) -> Union[str, None]:
125        """
126        Return the file path (str) of the plugin if it exists, otherwise `None`.
127        """
128        if self.__dict__.get('_module', None) is not None:
129            return self.module.__file__
130
131        import meerschaum.config.paths as paths
132
133        potential_dir = paths.PLUGINS_RESOURCES_PATH / self.name
134        if (
135            potential_dir.exists()
136            and potential_dir.is_dir()
137            and (potential_dir / '__init__.py').exists()
138        ):
139            return str((potential_dir / '__init__.py').as_posix())
140
141        potential_file = paths.PLUGINS_RESOURCES_PATH / (self.name + '.py')
142        if potential_file.exists() and not potential_file.is_dir():
143            return str(potential_file.as_posix())
144
145        return None
146
147
148    @property
149    def requirements_file_path(self) -> Union[pathlib.Path, None]:
150        """
151        If a file named `requirements.txt` exists, return its path.
152        """
153        if self.__file__ is None:
154            return None
155        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
156        if not path.exists():
157            return None
158        return path
159
160
161    def is_installed(self, **kw) -> bool:
162        """
163        Check whether a plugin is correctly installed.
164
165        Returns
166        -------
167        A `bool` indicating whether a plugin exists and is successfully imported.
168        """
169        return self.__file__ is not None
170
171
172    def make_tar(self, debug: bool = False) -> pathlib.Path:
173        """
174        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
175
176        Parameters
177        ----------
178        debug: bool, default False
179            Verbosity toggle.
180
181        Returns
182        -------
183        A `pathlib.Path` to the archive file's path.
184
185        """
186        import tarfile, pathlib, subprocess, fnmatch
187        from meerschaum.utils.debug import dprint
188        from meerschaum.utils.packages import attempt_import
189        pathspec = attempt_import('pathspec', debug=debug)
190
191        if not self.__file__:
192            from meerschaum.utils.warnings import error
193            error(f"Could not find file for plugin '{self}'.")
194        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
195            path = self.__file__.replace('__init__.py', '')
196            is_dir = True
197        else:
198            path = self.__file__
199            is_dir = False
200
201        old_cwd = os.getcwd()
202        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
203        os.chdir(real_parent_path)
204
205        default_patterns_to_ignore = [
206            '.pyc',
207            '__pycache__/',
208            'eggs/',
209            '__pypackages__/',
210            '.git',
211        ]
212
213        def parse_gitignore() -> 'Set[str]':
214            gitignore_path = pathlib.Path(path) / '.gitignore'
215            if not gitignore_path.exists():
216                return set(default_patterns_to_ignore)
217            with open(gitignore_path, 'r', encoding='utf-8') as f:
218                gitignore_text = f.read()
219            return set(pathspec.PathSpec.from_lines(
220                pathspec.patterns.GitWildMatchPattern,
221                default_patterns_to_ignore + gitignore_text.splitlines()
222            ).match_tree(path))
223
224        patterns_to_ignore = parse_gitignore() if is_dir else set()
225
226        if debug:
227            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
228
229        with tarfile.open(self.archive_path, 'w:gz') as tarf:
230            if not is_dir:
231                tarf.add(f"{self.name}.py")
232            else:
233                for root, dirs, files in os.walk(self.name):
234                    for f in files:
235                        good_file = True
236                        fp = os.path.join(root, f)
237                        for pattern in patterns_to_ignore:
238                            if pattern in str(fp) or f.startswith('.'):
239                                good_file = False
240                                break
241                        if good_file:
242                            if debug:
243                                dprint(f"Adding '{fp}'...")
244                            tarf.add(fp)
245
246        ### clean up and change back to old directory
247        os.chdir(old_cwd)
248
249        ### change to 775 to avoid permissions issues with the API in a Docker container
250        self.archive_path.chmod(0o775)
251
252        if debug:
253            dprint(f"Created archive '{self.archive_path}'.")
254        return self.archive_path
255
256
257    def install(
258        self,
259        skip_deps: bool = False,
260        force: bool = False,
261        debug: bool = False,
262    ) -> SuccessTuple:
263        """
264        Extract a plugin's tar archive to the plugins directory.
265        
266        This function checks if the plugin is already installed and if the version is equal or
267        greater than the existing installation.
268
269        Parameters
270        ----------
271        skip_deps: bool, default False
272            If `True`, do not install dependencies.
273
274        force: bool, default False
275            If `True`, continue with installation, even if required packages fail to install.
276
277        debug: bool, default False
278            Verbosity toggle.
279
280        Returns
281        -------
282        A `SuccessTuple` of success (bool) and a message (str).
283
284        """
285        if self.full_name in _ongoing_installations:
286            return True, f"Already installing plugin '{self}'."
287        _ongoing_installations.add(self.full_name)
288
289        import meerschaum.config.paths as paths
290        from meerschaum.utils.warnings import warn, error
291        if debug:
292            from meerschaum.utils.debug import dprint
293        import tarfile
294        import re
295        import ast
296        from meerschaum.plugins import sync_plugins_symlinks
297        from meerschaum.utils.packages import attempt_import, reload_meerschaum
298        from meerschaum.utils.venv import init_venv
299        from meerschaum.utils.misc import safely_extract_tar
300        old_cwd = os.getcwd()
301        old_version = ''
302        new_version = ''
303        temp_dir = paths.PLUGINS_TEMP_RESOURCES_PATH / self.name
304        temp_dir.mkdir(exist_ok=True)
305
306        if not self.archive_path.exists():
307            return False, f"Missing archive file for plugin '{self}'."
308        if self.version is not None:
309            old_version = self.version
310            if debug:
311                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
312
313        if debug:
314            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
315
316        try:
317            with tarfile.open(self.archive_path, 'r:gz') as tarf:
318                safely_extract_tar(tarf, temp_dir)
319        except Exception as e:
320            warn(e)
321            return False, f"Failed to extract plugin '{self.name}'."
322
323        ### search for version information
324        files = os.listdir(temp_dir)
325        
326        if str(files[0]) == self.name:
327            is_dir = True
328        elif str(files[0]) == self.name + '.py':
329            is_dir = False
330        else:
331            error(f"Unknown format encountered for plugin '{self}'.")
332
333        fpath = temp_dir / files[0]
334        if is_dir:
335            fpath = fpath / '__init__.py'
336
337        init_venv(self.name, debug=debug)
338        with open(fpath, 'r', encoding='utf-8') as f:
339            init_lines = f.readlines()
340        new_version = None
341        for line in init_lines:
342            if '__version__' not in line:
343                continue
344            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
345            if not version_match:
346                continue
347            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
348            break
349        if not new_version:
350            warn(
351                f"No `__version__` defined for plugin '{self}'. "
352                + "Assuming new version...",
353                stack = False,
354            )
355
356        packaging_version = attempt_import('packaging.version')
357        try:
358            is_new_version = (not new_version and not old_version) or (
359                packaging_version.parse(old_version) < packaging_version.parse(new_version)
360            )
361            is_same_version = new_version and old_version and (
362                packaging_version.parse(old_version) == packaging_version.parse(new_version)
363            )
364        except Exception:
365            is_new_version, is_same_version = True, False
366
367        ### Determine where to permanently store the new plugin.
368        plugin_installation_dir_path = paths.PLUGINS_DIR_PATHS[0]
369        for path in paths.PLUGINS_DIR_PATHS:
370            if not path.exists():
371                warn(f"Plugins path does not exist: {path}", stack=False)
372                continue
373
374            files_in_plugins_dir = os.listdir(path)
375            if (
376                self.name in files_in_plugins_dir
377                or
378                (self.name + '.py') in files_in_plugins_dir
379            ):
380                plugin_installation_dir_path = path
381                break
382
383        success_msg = (
384            f"Successfully installed plugin '{self}'"
385            + ("\n    (skipped dependencies)" if skip_deps else "")
386            + "."
387        )
388        success, abort = None, None
389
390        if is_same_version and not force:
391            success, msg = True, (
392                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
393                "    Install again with `-f` or `--force` to reinstall."
394            )
395            abort = True
396        elif is_new_version or force:
397            for src_dir, dirs, files in os.walk(temp_dir):
398                if success is not None:
399                    break
400                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
401                if not os.path.exists(dst_dir):
402                    os.mkdir(dst_dir)
403                for f in files:
404                    src_file = os.path.join(src_dir, f)
405                    dst_file = os.path.join(dst_dir, f)
406                    if os.path.exists(dst_file):
407                        os.remove(dst_file)
408
409                    if debug:
410                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
411                    try:
412                        shutil.move(src_file, dst_dir)
413                    except Exception:
414                        success, msg = False, (
415                            f"Failed to install plugin '{self}': " +
416                            f"Could not move file '{src_file}' to '{dst_dir}'"
417                        )
418                        print(msg)
419                        break
420            if success is None:
421                success, msg = True, success_msg
422        else:
423            success, msg = False, (
424                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
425                + f"attempted version {new_version}."
426            )
427
428        shutil.rmtree(temp_dir)
429        os.chdir(old_cwd)
430
431        ### Reload the plugin's module.
432        sync_plugins_symlinks(debug=debug)
433        if '_module' in self.__dict__:
434            del self.__dict__['_module']
435        init_venv(venv=self.name, force=True, debug=debug)
436        reload_meerschaum(debug=debug)
437
438        ### if we've already failed, return here
439        if not success or abort:
440            _ongoing_installations.remove(self.full_name)
441            return success, msg
442
443        ### attempt to install dependencies
444        dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
445        if not dependencies_installed:
446            _ongoing_installations.remove(self.full_name)
447            return False, f"Failed to install dependencies for plugin '{self}'."
448
449        ### handling success tuple, bool, or other (typically None)
450        setup_tuple = self.setup(debug=debug)
451        if isinstance(setup_tuple, tuple):
452            if not setup_tuple[0]:
453                success, msg = setup_tuple
454        elif isinstance(setup_tuple, bool):
455            if not setup_tuple:
456                success, msg = False, (
457                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
458                    f"Check `setup()` in '{self.__file__}' for more information " +
459                    "(no error message provided)."
460                )
461            else:
462                success, msg = True, success_msg
463        elif setup_tuple is None:
464            success = True
465            msg = (
466                f"Post-install for plugin '{self}' returned None. " +
467                "Assuming plugin successfully installed."
468            )
469            warn(msg)
470        else:
471            success = False
472            msg = (
473                f"Post-install for plugin '{self}' returned unexpected value " +
474                f"of type '{type(setup_tuple)}': {setup_tuple}"
475            )
476
477        _ongoing_installations.remove(self.full_name)
478        _ = self.module
479        return success, msg
480
481
482    def remove_archive(
483        self,        
484        debug: bool = False
485    ) -> SuccessTuple:
486        """Remove a plugin's archive file."""
487        if not self.archive_path.exists():
488            return True, f"Archive file for plugin '{self}' does not exist."
489        try:
490            self.archive_path.unlink()
491        except Exception as e:
492            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
493        return True, "Success"
494
495
496    def remove_venv(
497        self,        
498        debug: bool = False
499    ) -> SuccessTuple:
500        """Remove a plugin's virtual environment."""
501        if not self.venv_path.exists():
502            return True, f"Virtual environment for plugin '{self}' does not exist."
503        try:
504            shutil.rmtree(self.venv_path)
505        except Exception as e:
506            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
507        return True, "Success"
508
509
510    def uninstall(self, debug: bool = False) -> SuccessTuple:
511        """
512        Remove a plugin, its virtual environment, and archive file.
513        """
514        from meerschaum.utils.packages import reload_meerschaum
515        from meerschaum.plugins import sync_plugins_symlinks
516        from meerschaum.utils.warnings import warn, info
517        warnings_thrown_count: int = 0
518        max_warnings: int = 3
519
520        if not self.is_installed():
521            info(
522                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
523                + "Checking for artifacts...",
524                stack = False,
525            )
526        else:
527            real_path = pathlib.Path(os.path.realpath(self.__file__))
528            try:
529                if real_path.name == '__init__.py':
530                    shutil.rmtree(real_path.parent)
531                else:
532                    real_path.unlink()
533            except Exception as e:
534                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
535                warnings_thrown_count += 1
536            else:
537                info(f"Removed source files for plugin '{self.name}'.")
538
539        if self.venv_path.exists():
540            success, msg = self.remove_venv(debug=debug)
541            if not success:
542                warn(msg, stack=False)
543                warnings_thrown_count += 1
544            else:
545                info(f"Removed virtual environment from plugin '{self.name}'.")
546
547        success = warnings_thrown_count < max_warnings
548        sync_plugins_symlinks(debug=debug)
549        self.deactivate_venv(force=True, debug=debug)
550        reload_meerschaum(debug=debug)
551        return success, (
552            f"Successfully uninstalled plugin '{self}'." if success
553            else f"Failed to uninstall plugin '{self}'."
554        )
555
556
557    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
558        """
559        If exists, run the plugin's `setup()` function.
560
561        Parameters
562        ----------
563        *args: str
564            The positional arguments passed to the `setup()` function.
565            
566        debug: bool, default False
567            Verbosity toggle.
568
569        **kw: Any
570            The keyword arguments passed to the `setup()` function.
571
572        Returns
573        -------
574        A `SuccessTuple` or `bool` indicating success.
575
576        """
577        from meerschaum.utils.debug import dprint
578        import inspect
579        _setup = None
580        for name, fp in inspect.getmembers(self.module):
581            if name == 'setup' and inspect.isfunction(fp):
582                _setup = fp
583                break
584
585        ### assume success if no setup() is found (not necessary)
586        if _setup is None:
587            return True
588
589        sig = inspect.signature(_setup)
590        has_debug, has_kw = ('debug' in sig.parameters), False
591        for k, v in sig.parameters.items():
592            if '**' in str(v):
593                has_kw = True
594                break
595
596        _kw = {}
597        if has_kw:
598            _kw.update(kw)
599        if has_debug:
600            _kw['debug'] = debug
601
602        if debug:
603            dprint(f"Running setup for plugin '{self}'...")
604        try:
605            self.activate_venv(debug=debug)
606            return_tuple = _setup(*args, **_kw)
607            self.deactivate_venv(debug=debug)
608        except Exception as e:
609            return False, str(e)
610
611        if isinstance(return_tuple, tuple):
612            return return_tuple
613        if isinstance(return_tuple, bool):
614            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
615        if return_tuple is None:
616            return False, f"Setup for Plugin '{self.name}' returned None."
617        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"
618
619
620    def get_dependencies(
621        self,
622        debug: bool = False,
623    ) -> List[str]:
624        """
625        If the Plugin has specified dependencies in a list called `required`, return the list.
626        
627        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
628        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
629
630        Parameters
631        ----------
632        debug: bool, default False
633            Verbosity toggle.
634
635        Returns
636        -------
637        A list of required packages and plugins (str).
638
639        """
640        if '_required' in self.__dict__:
641            return self._required
642
643        ### If the plugin has not yet been imported,
644        ### infer the dependencies from the source text.
645        ### This is not super robust, and it doesn't feel right
646        ### having multiple versions of the logic.
647        ### This is necessary when determining the activation order
648        ### without having import the module.
649        ### For consistency's sake, the module-less method does not cache the requirements.
650        if self.__dict__.get('_module', None) is None:
651            file_path = self.__file__
652            if file_path is None:
653                return []
654            with open(file_path, 'r', encoding='utf-8') as f:
655                text = f.read()
656
657            if 'required' not in text:
658                return []
659
660            ### This has some limitations:
661            ### It relies on `required` being manually declared.
662            ### We lose the ability to dynamically alter the `required` list,
663            ### which is why we've kept the module-reliant method below.
664            import ast, re
665            ### NOTE: This technically would break 
666            ### if `required` was the very first line of the file.
667            req_start_match = re.search(r'\nrequired(:\s*)?.*=', text)
668            if not req_start_match:
669                return []
670            req_start = req_start_match.start()
671            equals_sign = req_start + text[req_start:].find('=')
672
673            ### Dependencies may have brackets within the strings, so push back the index.
674            first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[')
675            if first_opening_brace == -1:
676                return []
677
678            next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']')
679            if next_closing_brace == -1:
680                return []
681
682            start_ix = first_opening_brace + 1
683            end_ix = next_closing_brace
684
685            num_braces = 0
686            while True:
687                if '[' not in text[start_ix:end_ix]:
688                    break
689                num_braces += 1
690                start_ix = end_ix
691                end_ix += text[end_ix + 1:].find(']') + 1
692
693            req_end = end_ix + 1
694            req_text = (
695                text[(first_opening_brace-1):req_end]
696                .lstrip()
697                .replace('=', '', 1)
698                .lstrip()
699                .rstrip()
700            )
701            try:
702                required = ast.literal_eval(req_text)
703            except Exception as e:
704                warn(
705                    f"Unable to determine requirements for plugin '{self.name}' "
706                    + "without importing the module.\n"
707                    + "    This may be due to dynamically setting the global `required` list.\n"
708                    + f"    {e}"
709                )
710                return []
711            return required
712
713        import inspect
714        self.activate_venv(dependencies=False, debug=debug)
715        required = []
716        for name, val in inspect.getmembers(self.module):
717            if name == 'required':
718                required = val
719                break
720        self._required = required
721        self.deactivate_venv(dependencies=False, debug=debug)
722        return required
723
724
725    def get_required_plugins(self, debug: bool=False) -> List[mrsm.plugins.Plugin]:
726        """
727        Return a list of required Plugin objects.
728        """
729        from meerschaum.utils.warnings import warn
730        from meerschaum.config import get_config
731        from meerschaum._internal.static import STATIC_CONFIG
732        from meerschaum.connectors.parse import is_valid_connector_keys
733        plugins = []
734        _deps = self.get_dependencies(debug=debug)
735        sep = STATIC_CONFIG['plugins']['repo_separator']
736        plugin_names = [
737            _d[len('plugin:'):] for _d in _deps
738            if _d.startswith('plugin:') and len(_d) > len('plugin:')
739        ]
740        default_repo_keys = get_config('meerschaum', 'repository')
741        skipped_repo_keys = set()
742
743        for _plugin_name in plugin_names:
744            if sep in _plugin_name:
745                try:
746                    _plugin_name, _repo_keys = _plugin_name.split(sep)
747                except Exception:
748                    _repo_keys = default_repo_keys
749                    warn(
750                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
751                        + f"Will try to use '{_repo_keys}' instead.",
752                        stack = False,
753                    )
754            else:
755                _repo_keys = default_repo_keys
756
757            if _repo_keys in skipped_repo_keys:
758                continue
759
760            if not is_valid_connector_keys(_repo_keys):
761                warn(
762                    f"Invalid connector '{_repo_keys}'.\n"
763                    f"    Skipping required plugins from repository '{_repo_keys}'",
764                    stack=False,
765                )
766                continue
767
768            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
769
770        return plugins
771
772
773    def get_required_packages(self, debug: bool=False) -> List[str]:
774        """
775        Return the required package names (excluding plugins).
776        """
777        _deps = self.get_dependencies(debug=debug)
778        return [_d for _d in _deps if not _d.startswith('plugin:')]
779
780
781    def activate_venv(
782        self,
783        dependencies: bool = True,
784        init_if_not_exists: bool = True,
785        debug: bool = False,
786        **kw
787    ) -> bool:
788        """
789        Activate the virtual environments for the plugin and its dependencies.
790
791        Parameters
792        ----------
793        dependencies: bool, default True
794            If `True`, activate the virtual environments for required plugins.
795
796        Returns
797        -------
798        A bool indicating success.
799        """
800        import meerschaum.config.paths as paths
801        from meerschaum.utils.venv import venv_target_path
802        from meerschaum.utils.packages import activate_venv
803        from meerschaum.utils.misc import make_symlink, is_symlink
804
805        if dependencies:
806            for plugin in self.get_required_plugins(debug=debug):
807                plugin.activate_venv(debug=debug, init_if_not_exists=init_if_not_exists, **kw)
808
809        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
810        venv_meerschaum_path = vtp / 'meerschaum'
811
812        try:
813            success, msg = True, "Success"
814            if is_symlink(venv_meerschaum_path):
815                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != paths.PACKAGE_ROOT_PATH:
816                    venv_meerschaum_path.unlink()
817                    success, msg = make_symlink(venv_meerschaum_path, paths.PACKAGE_ROOT_PATH)
818        except Exception as e:
819            success, msg = False, str(e)
820        if not success:
821            warn(
822                f"Unable to create symlink {venv_meerschaum_path} to {paths.PACKAGE_ROOT_PATH}:\n"
823                f"{msg}"
824            )
825
826        return activate_venv(self.name, init_if_not_exists=init_if_not_exists, debug=debug, **kw)
827
828
829    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
830        """
831        Deactivate the virtual environments for the plugin and its dependencies.
832
833        Parameters
834        ----------
835        dependencies: bool, default True
836            If `True`, deactivate the virtual environments for required plugins.
837
838        Returns
839        -------
840        A bool indicating success.
841        """
842        from meerschaum.utils.packages import deactivate_venv
843        success = deactivate_venv(self.name, debug=debug, **kw)
844        if dependencies:
845            for plugin in self.get_required_plugins(debug=debug):
846                plugin.deactivate_venv(debug=debug, **kw)
847        return success
848
849
850    def install_dependencies(
851        self,
852        force: bool = False,
853        debug: bool = False,
854    ) -> bool:
855        """
856        If specified, install dependencies.
857        
858        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
859        Meerschaum plugins from the same repository as this Plugin.
860        To install from a different repository, add the repo keys after `'@'`
861        (e.g. `'plugin:foo@api:bar'`).
862
863        Parameters
864        ----------
865        force: bool, default False
866            If `True`, continue with the installation, even if some
867            required packages fail to install.
868
869        debug: bool, default False
870            Verbosity toggle.
871
872        Returns
873        -------
874        A bool indicating success.
875        """
876        from meerschaum.utils.packages import pip_install, venv_contains_package
877        from meerschaum.utils.warnings import warn, info
878        _deps = self.get_dependencies(debug=debug)
879        if not _deps and self.requirements_file_path is None:
880            return True
881
882        plugins = self.get_required_plugins(debug=debug)
883        for _plugin in plugins:
884            if _plugin.name == self.name:
885                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
886                continue
887            _success, _msg = _plugin.repo_connector.install_plugin(
888                _plugin.name, debug=debug, force=force
889            )
890            if not _success:
891                warn(
892                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
893                    + f" for plugin '{self.name}':\n" + _msg,
894                    stack = False,
895                )
896                if not force:
897                    warn(
898                        "Try installing with the `--force` flag to continue anyway.",
899                        stack = False,
900                    )
901                    return False
902                info(
903                    "Continuing with installation despite the failure "
904                    + "(careful, things might be broken!)...",
905                    icon = False
906                )
907
908
909        ### First step: parse `requirements.txt` if it exists.
910        if self.requirements_file_path is not None:
911            if not pip_install(
912                requirements_file_path=self.requirements_file_path,
913                venv=self.name, debug=debug
914            ):
915                warn(
916                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
917                    stack = False,
918                )
919                if not force:
920                    warn(
921                        "Try installing with `--force` to continue anyway.",
922                        stack = False,
923                    )
924                    return False
925                info(
926                    "Continuing with installation despite the failure "
927                    + "(careful, things might be broken!)...",
928                    icon = False
929                )
930
931
932        ### Don't reinstall packages that are already included in required plugins.
933        packages = []
934        _packages = self.get_required_packages(debug=debug)
935        accounted_for_packages = set()
936        for package_name in _packages:
937            for plugin in plugins:
938                if venv_contains_package(package_name, plugin.name):
939                    accounted_for_packages.add(package_name)
940                    break
941        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
942
943        ### Attempt pip packages installation.
944        if packages:
945            for package in packages:
946                if not pip_install(package, venv=self.name, debug=debug):
947                    warn(
948                        f"Failed to install required package '{package}'"
949                        + f" for plugin '{self.name}'.",
950                        stack = False,
951                    )
952                    if not force:
953                        warn(
954                            "Try installing with `--force` to continue anyway.",
955                            stack = False,
956                        )
957                        return False
958                    info(
959                        "Continuing with installation despite the failure "
960                        + "(careful, things might be broken!)...",
961                        icon = False
962                    )
963        return True
964
965
966    @property
967    def full_name(self) -> str:
968        """
969        Include the repo keys with the plugin's name.
970        """
971        from meerschaum._internal.static import STATIC_CONFIG
972        sep = STATIC_CONFIG['plugins']['repo_separator']
973        return self.name + sep + str(self.repo_connector)
974
975
976    def __str__(self):
977        return self.name
978
979
980    def __repr__(self):
981        return f"Plugin('{self.name}', repo='{self.repo_connector}')"
982
983
984    def __del__(self):
985        pass

Handle packaging of Meerschaum plugins.

Plugin( name: str, version: Optional[str] = None, user_id: Optional[int] = None, required: Optional[List[str]] = None, attributes: Optional[Dict[str, Any]] = None, archive_path: Optional[pathlib.Path] = None, venv_path: Optional[pathlib.Path] = None, repo_connector: Optional[meerschaum.connectors.APIConnector] = None, repo: Union[meerschaum.connectors.APIConnector, str, NoneType] = None)
33    def __init__(
34        self,
35        name: str,
36        version: Optional[str] = None,
37        user_id: Optional[int] = None,
38        required: Optional[List[str]] = None,
39        attributes: Optional[Dict[str, Any]] = None,
40        archive_path: Optional[pathlib.Path] = None,
41        venv_path: Optional[pathlib.Path] = None,
42        repo_connector: Optional['mrsm.connectors.api.APIConnector'] = None,
43        repo: Union['mrsm.connectors.api.APIConnector', str, None] = None,
44    ):
45        import meerschaum.config.paths as paths
46        from meerschaum._internal.static import STATIC_CONFIG
47        sep = STATIC_CONFIG['plugins']['repo_separator']
48        _repo = None
49        if sep in name:
50            try:
51                name, _repo = name.split(sep)
52            except Exception as e:
53                error(f"Invalid plugin name: '{name}'")
54        self._repo_in_name = _repo
55
56        if attributes is None:
57            attributes = {}
58        self.name = name
59        self.attributes = attributes
60        self.user_id = user_id
61        self._version = version
62        if required:
63            self._required = required
64        self.archive_path = (
65            archive_path if archive_path is not None
66            else paths.PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
67        )
68        self.venv_path = (
69            venv_path if venv_path is not None
70            else paths.VIRTENV_RESOURCES_PATH / self.name
71        )
72        self._repo_connector = repo_connector
73        self._repo_keys = repo
name
attributes
user_id
archive_path
venv_path
repo_connector
76    @property
77    def repo_connector(self):
78        """
79        Return the repository connector for this plugin.
80        NOTE: This imports the `connectors` module, which imports certain plugin modules.
81        """
82        if self._repo_connector is None:
83            from meerschaum.connectors.parse import parse_repo_keys
84
85            repo_keys = self._repo_keys or self._repo_in_name
86            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
87                error(
88                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
89                )
90            repo_connector = parse_repo_keys(repo_keys)
91            self._repo_connector = repo_connector
92        return self._repo_connector

Return the repository connector for this plugin. NOTE: This imports the connectors module, which imports certain plugin modules.

version
 95    @property
 96    def version(self):
 97        """
 98        Return the plugin's module version is defined (`__version__`) if it's defined.
 99        """
100        if self._version is None:
101            try:
102                self._version = self.module.__version__
103            except Exception as e:
104                self._version = None
105        return self._version

Return the plugin's module version is defined (__version__) if it's defined.

module
108    @property
109    def module(self):
110        """
111        Return the Python module of the underlying plugin.
112        """
113        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
114            if self.__file__ is None:
115                return None
116
117            from meerschaum.plugins import import_plugins
118            self._module = import_plugins(str(self), warn=False)
119
120        return self._module

Return the Python module of the underlying plugin.

requirements_file_path: Optional[pathlib.Path]
148    @property
149    def requirements_file_path(self) -> Union[pathlib.Path, None]:
150        """
151        If a file named `requirements.txt` exists, return its path.
152        """
153        if self.__file__ is None:
154            return None
155        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
156        if not path.exists():
157            return None
158        return path

If a file named requirements.txt exists, return its path.

def is_installed(self, **kw) -> bool:
161    def is_installed(self, **kw) -> bool:
162        """
163        Check whether a plugin is correctly installed.
164
165        Returns
166        -------
167        A `bool` indicating whether a plugin exists and is successfully imported.
168        """
169        return self.__file__ is not None

Check whether a plugin is correctly installed.

Returns
  • A bool indicating whether a plugin exists and is successfully imported.
def make_tar(self, debug: bool = False) -> pathlib.Path:
172    def make_tar(self, debug: bool = False) -> pathlib.Path:
173        """
174        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
175
176        Parameters
177        ----------
178        debug: bool, default False
179            Verbosity toggle.
180
181        Returns
182        -------
183        A `pathlib.Path` to the archive file's path.
184
185        """
186        import tarfile, pathlib, subprocess, fnmatch
187        from meerschaum.utils.debug import dprint
188        from meerschaum.utils.packages import attempt_import
189        pathspec = attempt_import('pathspec', debug=debug)
190
191        if not self.__file__:
192            from meerschaum.utils.warnings import error
193            error(f"Could not find file for plugin '{self}'.")
194        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
195            path = self.__file__.replace('__init__.py', '')
196            is_dir = True
197        else:
198            path = self.__file__
199            is_dir = False
200
201        old_cwd = os.getcwd()
202        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
203        os.chdir(real_parent_path)
204
205        default_patterns_to_ignore = [
206            '.pyc',
207            '__pycache__/',
208            'eggs/',
209            '__pypackages__/',
210            '.git',
211        ]
212
213        def parse_gitignore() -> 'Set[str]':
214            gitignore_path = pathlib.Path(path) / '.gitignore'
215            if not gitignore_path.exists():
216                return set(default_patterns_to_ignore)
217            with open(gitignore_path, 'r', encoding='utf-8') as f:
218                gitignore_text = f.read()
219            return set(pathspec.PathSpec.from_lines(
220                pathspec.patterns.GitWildMatchPattern,
221                default_patterns_to_ignore + gitignore_text.splitlines()
222            ).match_tree(path))
223
224        patterns_to_ignore = parse_gitignore() if is_dir else set()
225
226        if debug:
227            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
228
229        with tarfile.open(self.archive_path, 'w:gz') as tarf:
230            if not is_dir:
231                tarf.add(f"{self.name}.py")
232            else:
233                for root, dirs, files in os.walk(self.name):
234                    for f in files:
235                        good_file = True
236                        fp = os.path.join(root, f)
237                        for pattern in patterns_to_ignore:
238                            if pattern in str(fp) or f.startswith('.'):
239                                good_file = False
240                                break
241                        if good_file:
242                            if debug:
243                                dprint(f"Adding '{fp}'...")
244                            tarf.add(fp)
245
246        ### clean up and change back to old directory
247        os.chdir(old_cwd)
248
249        ### change to 775 to avoid permissions issues with the API in a Docker container
250        self.archive_path.chmod(0o775)
251
252        if debug:
253            dprint(f"Created archive '{self.archive_path}'.")
254        return self.archive_path

Compress the plugin's source files into a .tar.gz archive and return the archive's path.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pathlib.Path to the archive file's path.
def install( self, skip_deps: bool = False, force: bool = False, debug: bool = False) -> Tuple[bool, str]:
257    def install(
258        self,
259        skip_deps: bool = False,
260        force: bool = False,
261        debug: bool = False,
262    ) -> SuccessTuple:
263        """
264        Extract a plugin's tar archive to the plugins directory.
265        
266        This function checks if the plugin is already installed and if the version is equal or
267        greater than the existing installation.
268
269        Parameters
270        ----------
271        skip_deps: bool, default False
272            If `True`, do not install dependencies.
273
274        force: bool, default False
275            If `True`, continue with installation, even if required packages fail to install.
276
277        debug: bool, default False
278            Verbosity toggle.
279
280        Returns
281        -------
282        A `SuccessTuple` of success (bool) and a message (str).
283
284        """
285        if self.full_name in _ongoing_installations:
286            return True, f"Already installing plugin '{self}'."
287        _ongoing_installations.add(self.full_name)
288
289        import meerschaum.config.paths as paths
290        from meerschaum.utils.warnings import warn, error
291        if debug:
292            from meerschaum.utils.debug import dprint
293        import tarfile
294        import re
295        import ast
296        from meerschaum.plugins import sync_plugins_symlinks
297        from meerschaum.utils.packages import attempt_import, reload_meerschaum
298        from meerschaum.utils.venv import init_venv
299        from meerschaum.utils.misc import safely_extract_tar
300        old_cwd = os.getcwd()
301        old_version = ''
302        new_version = ''
303        temp_dir = paths.PLUGINS_TEMP_RESOURCES_PATH / self.name
304        temp_dir.mkdir(exist_ok=True)
305
306        if not self.archive_path.exists():
307            return False, f"Missing archive file for plugin '{self}'."
308        if self.version is not None:
309            old_version = self.version
310            if debug:
311                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
312
313        if debug:
314            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
315
316        try:
317            with tarfile.open(self.archive_path, 'r:gz') as tarf:
318                safely_extract_tar(tarf, temp_dir)
319        except Exception as e:
320            warn(e)
321            return False, f"Failed to extract plugin '{self.name}'."
322
323        ### search for version information
324        files = os.listdir(temp_dir)
325        
326        if str(files[0]) == self.name:
327            is_dir = True
328        elif str(files[0]) == self.name + '.py':
329            is_dir = False
330        else:
331            error(f"Unknown format encountered for plugin '{self}'.")
332
333        fpath = temp_dir / files[0]
334        if is_dir:
335            fpath = fpath / '__init__.py'
336
337        init_venv(self.name, debug=debug)
338        with open(fpath, 'r', encoding='utf-8') as f:
339            init_lines = f.readlines()
340        new_version = None
341        for line in init_lines:
342            if '__version__' not in line:
343                continue
344            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
345            if not version_match:
346                continue
347            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
348            break
349        if not new_version:
350            warn(
351                f"No `__version__` defined for plugin '{self}'. "
352                + "Assuming new version...",
353                stack = False,
354            )
355
356        packaging_version = attempt_import('packaging.version')
357        try:
358            is_new_version = (not new_version and not old_version) or (
359                packaging_version.parse(old_version) < packaging_version.parse(new_version)
360            )
361            is_same_version = new_version and old_version and (
362                packaging_version.parse(old_version) == packaging_version.parse(new_version)
363            )
364        except Exception:
365            is_new_version, is_same_version = True, False
366
367        ### Determine where to permanently store the new plugin.
368        plugin_installation_dir_path = paths.PLUGINS_DIR_PATHS[0]
369        for path in paths.PLUGINS_DIR_PATHS:
370            if not path.exists():
371                warn(f"Plugins path does not exist: {path}", stack=False)
372                continue
373
374            files_in_plugins_dir = os.listdir(path)
375            if (
376                self.name in files_in_plugins_dir
377                or
378                (self.name + '.py') in files_in_plugins_dir
379            ):
380                plugin_installation_dir_path = path
381                break
382
383        success_msg = (
384            f"Successfully installed plugin '{self}'"
385            + ("\n    (skipped dependencies)" if skip_deps else "")
386            + "."
387        )
388        success, abort = None, None
389
390        if is_same_version and not force:
391            success, msg = True, (
392                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
393                "    Install again with `-f` or `--force` to reinstall."
394            )
395            abort = True
396        elif is_new_version or force:
397            for src_dir, dirs, files in os.walk(temp_dir):
398                if success is not None:
399                    break
400                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
401                if not os.path.exists(dst_dir):
402                    os.mkdir(dst_dir)
403                for f in files:
404                    src_file = os.path.join(src_dir, f)
405                    dst_file = os.path.join(dst_dir, f)
406                    if os.path.exists(dst_file):
407                        os.remove(dst_file)
408
409                    if debug:
410                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
411                    try:
412                        shutil.move(src_file, dst_dir)
413                    except Exception:
414                        success, msg = False, (
415                            f"Failed to install plugin '{self}': " +
416                            f"Could not move file '{src_file}' to '{dst_dir}'"
417                        )
418                        print(msg)
419                        break
420            if success is None:
421                success, msg = True, success_msg
422        else:
423            success, msg = False, (
424                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
425                + f"attempted version {new_version}."
426            )
427
428        shutil.rmtree(temp_dir)
429        os.chdir(old_cwd)
430
431        ### Reload the plugin's module.
432        sync_plugins_symlinks(debug=debug)
433        if '_module' in self.__dict__:
434            del self.__dict__['_module']
435        init_venv(venv=self.name, force=True, debug=debug)
436        reload_meerschaum(debug=debug)
437
438        ### if we've already failed, return here
439        if not success or abort:
440            _ongoing_installations.remove(self.full_name)
441            return success, msg
442
443        ### attempt to install dependencies
444        dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
445        if not dependencies_installed:
446            _ongoing_installations.remove(self.full_name)
447            return False, f"Failed to install dependencies for plugin '{self}'."
448
449        ### handling success tuple, bool, or other (typically None)
450        setup_tuple = self.setup(debug=debug)
451        if isinstance(setup_tuple, tuple):
452            if not setup_tuple[0]:
453                success, msg = setup_tuple
454        elif isinstance(setup_tuple, bool):
455            if not setup_tuple:
456                success, msg = False, (
457                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
458                    f"Check `setup()` in '{self.__file__}' for more information " +
459                    "(no error message provided)."
460                )
461            else:
462                success, msg = True, success_msg
463        elif setup_tuple is None:
464            success = True
465            msg = (
466                f"Post-install for plugin '{self}' returned None. " +
467                "Assuming plugin successfully installed."
468            )
469            warn(msg)
470        else:
471            success = False
472            msg = (
473                f"Post-install for plugin '{self}' returned unexpected value " +
474                f"of type '{type(setup_tuple)}': {setup_tuple}"
475            )
476
477        _ongoing_installations.remove(self.full_name)
478        _ = self.module
479        return success, msg

Extract a plugin's tar archive to the plugins directory.

This function checks if the plugin is already installed and if the version is equal or greater than the existing installation.

Parameters
  • skip_deps (bool, default False): If True, do not install dependencies.
  • force (bool, default False): If True, continue with installation, even if required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success (bool) and a message (str).
def remove_archive(self, debug: bool = False) -> Tuple[bool, str]:
482    def remove_archive(
483        self,        
484        debug: bool = False
485    ) -> SuccessTuple:
486        """Remove a plugin's archive file."""
487        if not self.archive_path.exists():
488            return True, f"Archive file for plugin '{self}' does not exist."
489        try:
490            self.archive_path.unlink()
491        except Exception as e:
492            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
493        return True, "Success"

Remove a plugin's archive file.

def remove_venv(self, debug: bool = False) -> Tuple[bool, str]:
496    def remove_venv(
497        self,        
498        debug: bool = False
499    ) -> SuccessTuple:
500        """Remove a plugin's virtual environment."""
501        if not self.venv_path.exists():
502            return True, f"Virtual environment for plugin '{self}' does not exist."
503        try:
504            shutil.rmtree(self.venv_path)
505        except Exception as e:
506            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
507        return True, "Success"

Remove a plugin's virtual environment.

def uninstall(self, debug: bool = False) -> Tuple[bool, str]:
510    def uninstall(self, debug: bool = False) -> SuccessTuple:
511        """
512        Remove a plugin, its virtual environment, and archive file.
513        """
514        from meerschaum.utils.packages import reload_meerschaum
515        from meerschaum.plugins import sync_plugins_symlinks
516        from meerschaum.utils.warnings import warn, info
517        warnings_thrown_count: int = 0
518        max_warnings: int = 3
519
520        if not self.is_installed():
521            info(
522                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
523                + "Checking for artifacts...",
524                stack = False,
525            )
526        else:
527            real_path = pathlib.Path(os.path.realpath(self.__file__))
528            try:
529                if real_path.name == '__init__.py':
530                    shutil.rmtree(real_path.parent)
531                else:
532                    real_path.unlink()
533            except Exception as e:
534                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
535                warnings_thrown_count += 1
536            else:
537                info(f"Removed source files for plugin '{self.name}'.")
538
539        if self.venv_path.exists():
540            success, msg = self.remove_venv(debug=debug)
541            if not success:
542                warn(msg, stack=False)
543                warnings_thrown_count += 1
544            else:
545                info(f"Removed virtual environment from plugin '{self.name}'.")
546
547        success = warnings_thrown_count < max_warnings
548        sync_plugins_symlinks(debug=debug)
549        self.deactivate_venv(force=True, debug=debug)
550        reload_meerschaum(debug=debug)
551        return success, (
552            f"Successfully uninstalled plugin '{self}'." if success
553            else f"Failed to uninstall plugin '{self}'."
554        )

Remove a plugin, its virtual environment, and archive file.

def setup( self, *args: str, debug: bool = False, **kw: Any) -> Union[Tuple[bool, str], bool]:
557    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
558        """
559        If exists, run the plugin's `setup()` function.
560
561        Parameters
562        ----------
563        *args: str
564            The positional arguments passed to the `setup()` function.
565            
566        debug: bool, default False
567            Verbosity toggle.
568
569        **kw: Any
570            The keyword arguments passed to the `setup()` function.
571
572        Returns
573        -------
574        A `SuccessTuple` or `bool` indicating success.
575
576        """
577        from meerschaum.utils.debug import dprint
578        import inspect
579        _setup = None
580        for name, fp in inspect.getmembers(self.module):
581            if name == 'setup' and inspect.isfunction(fp):
582                _setup = fp
583                break
584
585        ### assume success if no setup() is found (not necessary)
586        if _setup is None:
587            return True
588
589        sig = inspect.signature(_setup)
590        has_debug, has_kw = ('debug' in sig.parameters), False
591        for k, v in sig.parameters.items():
592            if '**' in str(v):
593                has_kw = True
594                break
595
596        _kw = {}
597        if has_kw:
598            _kw.update(kw)
599        if has_debug:
600            _kw['debug'] = debug
601
602        if debug:
603            dprint(f"Running setup for plugin '{self}'...")
604        try:
605            self.activate_venv(debug=debug)
606            return_tuple = _setup(*args, **_kw)
607            self.deactivate_venv(debug=debug)
608        except Exception as e:
609            return False, str(e)
610
611        if isinstance(return_tuple, tuple):
612            return return_tuple
613        if isinstance(return_tuple, bool):
614            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
615        if return_tuple is None:
616            return False, f"Setup for Plugin '{self.name}' returned None."
617        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"

If exists, run the plugin's setup() function.

Parameters
  • *args (str): The positional arguments passed to the setup() function.
  • debug (bool, default False): Verbosity toggle.
  • **kw (Any): The keyword arguments passed to the setup() function.
Returns
  • A SuccessTuple or bool indicating success.
def get_dependencies(self, debug: bool = False) -> List[str]:
620    def get_dependencies(
621        self,
622        debug: bool = False,
623    ) -> List[str]:
624        """
625        If the Plugin has specified dependencies in a list called `required`, return the list.
626        
627        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
628        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
629
630        Parameters
631        ----------
632        debug: bool, default False
633            Verbosity toggle.
634
635        Returns
636        -------
637        A list of required packages and plugins (str).
638
639        """
640        if '_required' in self.__dict__:
641            return self._required
642
643        ### If the plugin has not yet been imported,
644        ### infer the dependencies from the source text.
645        ### This is not super robust, and it doesn't feel right
646        ### having multiple versions of the logic.
647        ### This is necessary when determining the activation order
648        ### without having import the module.
649        ### For consistency's sake, the module-less method does not cache the requirements.
650        if self.__dict__.get('_module', None) is None:
651            file_path = self.__file__
652            if file_path is None:
653                return []
654            with open(file_path, 'r', encoding='utf-8') as f:
655                text = f.read()
656
657            if 'required' not in text:
658                return []
659
660            ### This has some limitations:
661            ### It relies on `required` being manually declared.
662            ### We lose the ability to dynamically alter the `required` list,
663            ### which is why we've kept the module-reliant method below.
664            import ast, re
665            ### NOTE: This technically would break 
666            ### if `required` was the very first line of the file.
667            req_start_match = re.search(r'\nrequired(:\s*)?.*=', text)
668            if not req_start_match:
669                return []
670            req_start = req_start_match.start()
671            equals_sign = req_start + text[req_start:].find('=')
672
673            ### Dependencies may have brackets within the strings, so push back the index.
674            first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[')
675            if first_opening_brace == -1:
676                return []
677
678            next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']')
679            if next_closing_brace == -1:
680                return []
681
682            start_ix = first_opening_brace + 1
683            end_ix = next_closing_brace
684
685            num_braces = 0
686            while True:
687                if '[' not in text[start_ix:end_ix]:
688                    break
689                num_braces += 1
690                start_ix = end_ix
691                end_ix += text[end_ix + 1:].find(']') + 1
692
693            req_end = end_ix + 1
694            req_text = (
695                text[(first_opening_brace-1):req_end]
696                .lstrip()
697                .replace('=', '', 1)
698                .lstrip()
699                .rstrip()
700            )
701            try:
702                required = ast.literal_eval(req_text)
703            except Exception as e:
704                warn(
705                    f"Unable to determine requirements for plugin '{self.name}' "
706                    + "without importing the module.\n"
707                    + "    This may be due to dynamically setting the global `required` list.\n"
708                    + f"    {e}"
709                )
710                return []
711            return required
712
713        import inspect
714        self.activate_venv(dependencies=False, debug=debug)
715        required = []
716        for name, val in inspect.getmembers(self.module):
717            if name == 'required':
718                required = val
719                break
720        self._required = required
721        self.deactivate_venv(dependencies=False, debug=debug)
722        return required

If the Plugin has specified dependencies in a list called required, return the list.

NOTE: Dependecies which start with 'plugin:' are Meerschaum plugins, not pip packages. Meerschaum plugins may also specify connector keys for a repo after '@'.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of required packages and plugins (str).
def get_required_plugins(self, debug: bool = False) -> List[Plugin]:
725    def get_required_plugins(self, debug: bool=False) -> List[mrsm.plugins.Plugin]:
726        """
727        Return a list of required Plugin objects.
728        """
729        from meerschaum.utils.warnings import warn
730        from meerschaum.config import get_config
731        from meerschaum._internal.static import STATIC_CONFIG
732        from meerschaum.connectors.parse import is_valid_connector_keys
733        plugins = []
734        _deps = self.get_dependencies(debug=debug)
735        sep = STATIC_CONFIG['plugins']['repo_separator']
736        plugin_names = [
737            _d[len('plugin:'):] for _d in _deps
738            if _d.startswith('plugin:') and len(_d) > len('plugin:')
739        ]
740        default_repo_keys = get_config('meerschaum', 'repository')
741        skipped_repo_keys = set()
742
743        for _plugin_name in plugin_names:
744            if sep in _plugin_name:
745                try:
746                    _plugin_name, _repo_keys = _plugin_name.split(sep)
747                except Exception:
748                    _repo_keys = default_repo_keys
749                    warn(
750                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
751                        + f"Will try to use '{_repo_keys}' instead.",
752                        stack = False,
753                    )
754            else:
755                _repo_keys = default_repo_keys
756
757            if _repo_keys in skipped_repo_keys:
758                continue
759
760            if not is_valid_connector_keys(_repo_keys):
761                warn(
762                    f"Invalid connector '{_repo_keys}'.\n"
763                    f"    Skipping required plugins from repository '{_repo_keys}'",
764                    stack=False,
765                )
766                continue
767
768            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
769
770        return plugins

Return a list of required Plugin objects.

def get_required_packages(self, debug: bool = False) -> List[str]:
773    def get_required_packages(self, debug: bool=False) -> List[str]:
774        """
775        Return the required package names (excluding plugins).
776        """
777        _deps = self.get_dependencies(debug=debug)
778        return [_d for _d in _deps if not _d.startswith('plugin:')]

Return the required package names (excluding plugins).

def activate_venv( self, dependencies: bool = True, init_if_not_exists: bool = True, debug: bool = False, **kw) -> bool:
781    def activate_venv(
782        self,
783        dependencies: bool = True,
784        init_if_not_exists: bool = True,
785        debug: bool = False,
786        **kw
787    ) -> bool:
788        """
789        Activate the virtual environments for the plugin and its dependencies.
790
791        Parameters
792        ----------
793        dependencies: bool, default True
794            If `True`, activate the virtual environments for required plugins.
795
796        Returns
797        -------
798        A bool indicating success.
799        """
800        import meerschaum.config.paths as paths
801        from meerschaum.utils.venv import venv_target_path
802        from meerschaum.utils.packages import activate_venv
803        from meerschaum.utils.misc import make_symlink, is_symlink
804
805        if dependencies:
806            for plugin in self.get_required_plugins(debug=debug):
807                plugin.activate_venv(debug=debug, init_if_not_exists=init_if_not_exists, **kw)
808
809        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
810        venv_meerschaum_path = vtp / 'meerschaum'
811
812        try:
813            success, msg = True, "Success"
814            if is_symlink(venv_meerschaum_path):
815                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != paths.PACKAGE_ROOT_PATH:
816                    venv_meerschaum_path.unlink()
817                    success, msg = make_symlink(venv_meerschaum_path, paths.PACKAGE_ROOT_PATH)
818        except Exception as e:
819            success, msg = False, str(e)
820        if not success:
821            warn(
822                f"Unable to create symlink {venv_meerschaum_path} to {paths.PACKAGE_ROOT_PATH}:\n"
823                f"{msg}"
824            )
825
826        return activate_venv(self.name, init_if_not_exists=init_if_not_exists, debug=debug, **kw)

Activate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, activate the virtual environments for required plugins.
Returns
  • A bool indicating success.
def deactivate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
829    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
830        """
831        Deactivate the virtual environments for the plugin and its dependencies.
832
833        Parameters
834        ----------
835        dependencies: bool, default True
836            If `True`, deactivate the virtual environments for required plugins.
837
838        Returns
839        -------
840        A bool indicating success.
841        """
842        from meerschaum.utils.packages import deactivate_venv
843        success = deactivate_venv(self.name, debug=debug, **kw)
844        if dependencies:
845            for plugin in self.get_required_plugins(debug=debug):
846                plugin.deactivate_venv(debug=debug, **kw)
847        return success

Deactivate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, deactivate the virtual environments for required plugins.
Returns
  • A bool indicating success.
def install_dependencies(self, force: bool = False, debug: bool = False) -> bool:
850    def install_dependencies(
851        self,
852        force: bool = False,
853        debug: bool = False,
854    ) -> bool:
855        """
856        If specified, install dependencies.
857        
858        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
859        Meerschaum plugins from the same repository as this Plugin.
860        To install from a different repository, add the repo keys after `'@'`
861        (e.g. `'plugin:foo@api:bar'`).
862
863        Parameters
864        ----------
865        force: bool, default False
866            If `True`, continue with the installation, even if some
867            required packages fail to install.
868
869        debug: bool, default False
870            Verbosity toggle.
871
872        Returns
873        -------
874        A bool indicating success.
875        """
876        from meerschaum.utils.packages import pip_install, venv_contains_package
877        from meerschaum.utils.warnings import warn, info
878        _deps = self.get_dependencies(debug=debug)
879        if not _deps and self.requirements_file_path is None:
880            return True
881
882        plugins = self.get_required_plugins(debug=debug)
883        for _plugin in plugins:
884            if _plugin.name == self.name:
885                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
886                continue
887            _success, _msg = _plugin.repo_connector.install_plugin(
888                _plugin.name, debug=debug, force=force
889            )
890            if not _success:
891                warn(
892                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
893                    + f" for plugin '{self.name}':\n" + _msg,
894                    stack = False,
895                )
896                if not force:
897                    warn(
898                        "Try installing with the `--force` flag to continue anyway.",
899                        stack = False,
900                    )
901                    return False
902                info(
903                    "Continuing with installation despite the failure "
904                    + "(careful, things might be broken!)...",
905                    icon = False
906                )
907
908
909        ### First step: parse `requirements.txt` if it exists.
910        if self.requirements_file_path is not None:
911            if not pip_install(
912                requirements_file_path=self.requirements_file_path,
913                venv=self.name, debug=debug
914            ):
915                warn(
916                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
917                    stack = False,
918                )
919                if not force:
920                    warn(
921                        "Try installing with `--force` to continue anyway.",
922                        stack = False,
923                    )
924                    return False
925                info(
926                    "Continuing with installation despite the failure "
927                    + "(careful, things might be broken!)...",
928                    icon = False
929                )
930
931
932        ### Don't reinstall packages that are already included in required plugins.
933        packages = []
934        _packages = self.get_required_packages(debug=debug)
935        accounted_for_packages = set()
936        for package_name in _packages:
937            for plugin in plugins:
938                if venv_contains_package(package_name, plugin.name):
939                    accounted_for_packages.add(package_name)
940                    break
941        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
942
943        ### Attempt pip packages installation.
944        if packages:
945            for package in packages:
946                if not pip_install(package, venv=self.name, debug=debug):
947                    warn(
948                        f"Failed to install required package '{package}'"
949                        + f" for plugin '{self.name}'.",
950                        stack = False,
951                    )
952                    if not force:
953                        warn(
954                            "Try installing with `--force` to continue anyway.",
955                            stack = False,
956                        )
957                        return False
958                    info(
959                        "Continuing with installation despite the failure "
960                        + "(careful, things might be broken!)...",
961                        icon = False
962                    )
963        return True

If specified, install dependencies.

NOTE: Dependencies that start with 'plugin:' will be installed as Meerschaum plugins from the same repository as this Plugin. To install from a different repository, add the repo keys after '@' (e.g. 'plugin:foo@api:bar').

Parameters
  • force (bool, default False): If True, continue with the installation, even if some required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating success.
full_name: str
966    @property
967    def full_name(self) -> str:
968        """
969        Include the repo keys with the plugin's name.
970        """
971        from meerschaum._internal.static import STATIC_CONFIG
972        sep = STATIC_CONFIG['plugins']['repo_separator']
973        return self.name + sep + str(self.repo_connector)

Include the repo keys with the plugin's name.

def make_action( function: Optional[Callable[[Any], Any]] = None, shell: bool = False, activate: bool = True, deactivate: bool = True, debug: bool = False, daemon: bool = True, skip_if_loaded: bool = True, _plugin_name: Optional[str] = None) -> Callable[[Any], Any]:
 60def make_action(
 61    function: Optional[Callable[[Any], Any]] = None,
 62    shell: bool = False,
 63    activate: bool = True,
 64    deactivate: bool = True,
 65    debug: bool = False,
 66    daemon: bool = True,
 67    skip_if_loaded: bool = True,
 68    _plugin_name: Optional[str] = None,
 69) -> Callable[[Any], Any]:
 70    """
 71    Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.
 72    
 73    Parameters
 74    ----------
 75    function: Callable[[Any], Any]
 76        The function to become a Meerschaum action. Must accept all keyword arguments.
 77        
 78    shell: bool, default False
 79        Not used.
 80        
 81    Returns
 82    -------
 83    Another function (this is a decorator function).
 84
 85    Examples
 86    --------
 87    >>> from meerschaum.plugins import make_action
 88    >>>
 89    >>> @make_action
 90    ... def my_action(**kw):
 91    ...     print('foo')
 92    ...     return True, "Success"
 93    >>>
 94    """
 95    def _decorator(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
 96        from meerschaum.actions import actions, _custom_actions_plugins, _plugins_actions
 97        if skip_if_loaded and func.__name__ in actions:
 98            return func
 99
100        import meerschaum.config.paths as paths
101        plugin_name = (
102            func.__name__.split(
103                f"{paths.PLUGINS_RESOURCES_PATH.stem}.",
104                maxsplit=1,
105            )[-1].split('.')[0]
106        )
107        plugin = Plugin(plugin_name) if plugin_name else None
108
109        if debug:
110            from meerschaum.utils.debug import dprint
111            dprint(
112                f"Adding action '{func.__name__}' from plugin "
113                f"'{plugin}'..."
114            )
115
116        actions[func.__name__] = func
117        _custom_actions_plugins[func.__name__] = plugin_name
118        if plugin_name not in _plugins_actions:
119            _plugins_actions[plugin_name] = []
120        _plugins_actions[plugin_name].append(func.__name__)
121        if not daemon:
122            _actions_daemon_enabled[func.__name__] = False
123        return func
124
125    if function is None:
126        return _decorator
127    return _decorator(function)

Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.

Parameters
  • function (Callable[[Any], Any]): The function to become a Meerschaum action. Must accept all keyword arguments.
  • shell (bool, default False): Not used.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import make_action
>>>
>>> @make_action
... def my_action(**kw):
...     print('foo')
...     return True, "Success"
>>>
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
310def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
311    """
312    Execute the function when initializing the Meerschaum API module.
313    Useful for lazy-loading heavy plugins only when the API is started,
314    such as when editing the `meerschaum.api.app` FastAPI app.
315    
316    The FastAPI app will be passed as the only parameter.
317    
318    Examples
319    --------
320    >>> from meerschaum.plugins import api_plugin
321    >>>
322    >>> @api_plugin
323    >>> def initialize_plugin(app):
324    ...     @app.get('/my/new/path')
325    ...     def new_path():
326    ...         return {'message': 'It works!'}
327    >>>
328    """
329    with _locks['_api_plugins']:
330        try:
331            if function.__module__ not in _api_plugins:
332                _api_plugins[function.__module__] = []
333            _api_plugins[function.__module__].append(function)
334        except Exception as e:
335            from meerschaum.utils.warnings import warn
336            warn(e)
337    return function

Execute the function when initializing the Meerschaum API module. Useful for lazy-loading heavy plugins only when the API is started, such as when editing the meerschaum.api.app FastAPI app.

The FastAPI app will be passed as the only parameter.

Examples
>>> from meerschaum.plugins import api_plugin
>>>
>>> @api_plugin
>>> def initialize_plugin(app):
...     @app.get('/my/new/path')
...     def new_path():
...         return {'message': 'It works!'}
>>>
def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
294def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
295    """
296    Execute the function when starting the Dash application.
297    """
298    with _locks['_dash_plugins']:
299        plugin_name = _get_parent_plugin()
300        try:
301            if plugin_name not in _dash_plugins:
302                _dash_plugins[plugin_name] = []
303            _dash_plugins[plugin_name].append(function)
304        except Exception as e:
305            from meerschaum.utils.warnings import warn
306            warn(e)
307    return function

Execute the function when starting the Dash application.

def web_page( page: Union[str, NoneType, Callable[[Any], Any]] = None, login_required: bool = True, skip_navbar: bool = False, page_group: Optional[str] = None, dark_theme: bool = True, **kwargs) -> Any:
208def web_page(
209    page: Union[str, None, Callable[[Any], Any]] = None,
210    login_required: bool = True,
211    skip_navbar: bool = False,
212    page_group: Optional[str] = None,
213    dark_theme: bool = True,
214    **kwargs
215) -> Any:
216    """
217    Quickly add pages to the dash application.
218
219    Parameters
220    ----------
221    dark_theme: bool, default True
222        If `True`, apply the Web Console's `dbc_dark` component theme to this page
223        (the default — most plugins want this). Set to `False` to opt out: the
224        `dbc_dark` class is removed from `<body>` while this page is active, so a
225        plugin's own styling applies without competing with the theme's overrides.
226        Note the global base Bootstrap theme (dark background) still applies, so an
227        opted-out page should paint its own background.
228
229    Examples
230    --------
231    >>> import meerschaum as mrsm
232    >>> from meerschaum.plugins import web_page
233    >>> html = mrsm.attempt_import('dash.html')
234    >>>
235    >>> @web_page('foo/bar', login_required=False)
236    >>> def foo_bar():
237    ...     return html.Div([html.H1("Hello, World!")])
238    >>>
239    """
240    page_str = None
241
242    def _decorator(_func: Callable[[Any], Any]) -> Callable[[Any], Any]:
243        nonlocal page_str, page_group
244
245        @functools.wraps(_func)
246        def wrapper(*_args, **_kwargs):
247            return _func(*_args, **_kwargs)
248
249        if page_str is None:
250            page_str = _func.__name__
251
252        page_str = page_str.lstrip('/').rstrip('/').strip()
253        if not page_str.startswith('dash'):
254            page_str = f'/dash/{page_str}'
255        page_key = (
256            ' '.join(
257                [
258                    word.capitalize()
259                    for word in (
260                        page_str.replace('/dash', '').lstrip('/').rstrip('/').strip()
261                        .replace('-', ' ').replace('_', ' ').split(' ')
262                    )
263                ]
264            )
265        )
266 
267        plugin_name = _get_parent_plugin()
268        page_group = page_group or plugin_name
269        if page_group not in _plugin_endpoints_to_pages:
270            _plugin_endpoints_to_pages[page_group] = {}
271        _plugin_endpoints_to_pages[page_group][page_str] = {
272            'function': _func,
273            'login_required': login_required,
274            'skip_navbar': skip_navbar,
275            'page_key': page_key,
276            'dark_theme': dark_theme,
277        }
278        if plugin_name not in _plugins_web_pages:
279            _plugins_web_pages[plugin_name] = []
280        _plugins_web_pages[plugin_name].append(_func)
281        return wrapper
282
283    if callable(page):
284        decorator_to_return = _decorator(page)
285        page_str = page.__name__
286    else:
287        decorator_to_return = _decorator
288        page_str = page
289
290    return decorator_to_return

Quickly add pages to the dash application.

Parameters
  • dark_theme (bool, default True): If True, apply the Web Console's dbc_dark component theme to this page (the default — most plugins want this). Set to False to opt out: the dbc_dark class is removed from <body> while this page is active, so a plugin's own styling applies without competing with the theme's overrides. Note the global base Bootstrap theme (dark background) still applies, so an opted-out page should paint its own background.
Examples
>>> import meerschaum as mrsm
>>> from meerschaum.plugins import web_page
>>> html = mrsm.attempt_import('dash.html')
>>>
>>> @web_page('foo/bar', login_required=False)
>>> def foo_bar():
...     return html.Div([html.H1("Hello, World!")])
>>>
def import_plugins( *plugins_to_import: Union[str, List[str], NoneType], warn: bool = True) -> "Union['ModuleType', Tuple['ModuleType', None]]":
526def import_plugins(
527    *plugins_to_import: Union[str, List[str], None],
528    warn: bool = True,
529) -> Union[
530    'ModuleType', Tuple['ModuleType', None]
531]:
532    """
533    Import the Meerschaum plugins directory.
534
535    Parameters
536    ----------
537    plugins_to_import: Union[str, List[str], None]
538        If provided, only import the specified plugins.
539        Otherwise import the entire plugins module. May be a string, list, or `None`.
540        Defaults to `None`.
541
542    Returns
543    -------
544    A module of list of modules, depening on the number of plugins provided.
545
546    """
547    import sys
548    import importlib
549    import meerschaum.config.paths as paths
550    from meerschaum.utils.misc import flatten_list
551    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
552    from meerschaum.utils.warnings import warn as _warn
553    plugins_to_import = list(plugins_to_import)
554    prepended_sys_path = False
555    with _locks['sys.path']:
556
557        ### Since plugins may depend on other plugins,
558        ### we need to activate the virtual environments for library plugins.
559        ### This logic exists in `Plugin.activate_venv()`,
560        ### but that code requires the plugin's module to already be imported.
561        ### It's not a guarantee of correct activation order,
562        ### e.g. if a library plugin pins a specific package and another 
563        plugins_names = get_plugins_names()
564        already_active_venvs = [is_venv_active(plugin_name) for plugin_name in plugins_names]
565
566        if not sys.path or sys.path[0] != str(paths.PLUGINS_RESOURCES_PATH.parent):
567            prepended_sys_path = True
568            sys.path.insert(0, str(paths.PLUGINS_RESOURCES_PATH.parent))
569
570        if not plugins_to_import:
571            for plugin_name in plugins_names:
572                activate_venv(plugin_name)
573            try:
574                imported_plugins = importlib.import_module(paths.PLUGINS_RESOURCES_PATH.stem)
575            except ImportError as e:
576                _warn(f"Failed to import the plugins module:\n    {e}")
577                import traceback
578                traceback.print_exc()
579                imported_plugins = None
580            for plugin_name in plugins_names:
581                if plugin_name in already_active_venvs:
582                    continue
583                deactivate_venv(plugin_name)
584
585        else:
586            imported_plugins = []
587            for plugin_name in flatten_list(plugins_to_import):
588                plugin = Plugin(plugin_name)
589                try:
590                    with Venv(plugin, init_if_not_exists=False):
591                        imported_plugins.append(
592                            importlib.import_module(
593                                f'{paths.PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
594                            )
595                        )
596                except Exception as e:
597                    _warn(
598                        f"Failed to import plugin '{plugin_name}':\n    "
599                        + f"{e}\n\nHere's a stacktrace:",
600                        stack = False,
601                    )
602                    from meerschaum.utils.formatting import get_console
603                    get_console().print_exception(
604                        suppress = [
605                            'meerschaum/plugins/__init__.py',
606                            importlib,
607                            importlib._bootstrap,
608                        ]
609                    )
610                    imported_plugins.append(None)
611
612        if imported_plugins is None and warn:
613            _warn("Failed to import plugins.", stacklevel=3)
614
615        if prepended_sys_path and str(paths.PLUGINS_RESOURCES_PATH.parent) in sys.path:
616            sys.path.remove(str(paths.PLUGINS_RESOURCES_PATH.parent))
617
618    if isinstance(imported_plugins, list):
619        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
620    return imported_plugins

Import the Meerschaum plugins directory.

Parameters
  • plugins_to_import (Union[str, List[str], None]): If provided, only import the specified plugins. Otherwise import the entire plugins module. May be a string, list, or None. Defaults to None.
Returns
  • A module of list of modules, depening on the number of plugins provided.
def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
623def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
624    """
625    Emulate the `from module import x` behavior.
626
627    Parameters
628    ----------
629    plugin_import_name: str
630        The import name of the plugin's module.
631        Separate submodules with '.' (e.g. 'compose.utils.pipes')
632
633    attrs: str
634        Names of the attributes to return.
635
636    Returns
637    -------
638    Objects from a plugin's submodule.
639    If multiple objects are provided, return a tuple.
640
641    Examples
642    --------
643    >>> init = from_plugin_import('compose.utils', 'init')
644    >>> with mrsm.Venv('compose'):
645    ...     cf = init()
646    >>> build_parent_pipe, get_defined_pipes = from_plugin_import(
647    ...     'compose.utils.pipes',
648    ...     'build_parent_pipe',
649    ...     'get_defined_pipes',
650    ... )
651    >>> parent_pipe = build_parent_pipe(cf)
652    >>> defined_pipes = get_defined_pipes(cf)
653    """
654    import importlib
655    import meerschaum.config.paths as paths
656    from meerschaum.utils.warnings import warn as _warn
657    if plugin_import_name.startswith('plugins.'):
658        plugin_import_name = plugin_import_name[len('plugins.'):]
659    plugin_import_parts = plugin_import_name.split('.')
660    plugin_root_name = plugin_import_parts[0]
661    plugin = mrsm.Plugin(plugin_root_name)
662
663    submodule_import_name = '.'.join(
664        [paths.PLUGINS_RESOURCES_PATH.stem]
665        + plugin_import_parts
666    )
667    if len(attrs) == 0:
668        raise ValueError(f"Provide which attributes to return from '{submodule_import_name}'.")
669
670    attrs_to_return = []
671    with mrsm.Venv(plugin):
672        if plugin.module is None:
673            raise ImportError(f"Unable to import plugin '{plugin}'.")
674
675        try:
676            submodule = importlib.import_module(submodule_import_name)
677        except ImportError as e:
678            _warn(
679                f"Failed to import plugin '{submodule_import_name}':\n    "
680                + f"{e}\n\nHere's a stacktrace:",
681                stack=False,
682            )
683            from meerschaum.utils.formatting import get_console
684            get_console().print_exception(
685                suppress=[
686                    'meerschaum/plugins/__init__.py',
687                    importlib,
688                    importlib._bootstrap,
689                ]
690            )
691            return None
692
693        for attr in attrs:
694            try:
695                attrs_to_return.append(getattr(submodule, attr))
696            except Exception:
697                _warn(f"Failed to access '{attr}' from '{submodule_import_name}'.")
698                attrs_to_return.append(None)
699        
700        if len(attrs) == 1:
701            return attrs_to_return[0]
702
703        return tuple(attrs_to_return)

Emulate the from module import x behavior.

Parameters
  • plugin_import_name (str): The import name of the plugin's module. Separate submodules with '.' (e.g. 'compose.utils.pipes')
  • attrs (str): Names of the attributes to return.
Returns
  • Objects from a plugin's submodule.
  • If multiple objects are provided, return a tuple.
Examples
>>> init = from_plugin_import('compose.utils', 'init')
>>> with mrsm.Venv('compose'):
...     cf = init()
>>> build_parent_pipe, get_defined_pipes = from_plugin_import(
...     'compose.utils.pipes',
...     'build_parent_pipe',
...     'get_defined_pipes',
... )
>>> parent_pipe = build_parent_pipe(cf)
>>> defined_pipes = get_defined_pipes(cf)
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
888def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
889    """
890    Reload plugins back into memory.
891
892    Parameters
893    ----------
894    plugins: Optional[List[str]], default None
895        The plugins to reload. `None` will reload all plugins.
896
897    """
898    global _synced_symlinks
899    unload_plugins(plugins, debug=debug)
900    _synced_symlinks = 0
901    sync_plugins_symlinks(debug=debug)
902    load_plugins(skip_if_loaded=False, debug=debug)

Reload plugins back into memory.

Parameters
  • plugins (Optional[List[str]], default None): The plugins to reload. None will reload all plugins.
def get_plugins( *to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
905def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
906    """
907    Return a list of `Plugin` objects.
908
909    Parameters
910    ----------
911    to_load:
912        If specified, only load specific plugins.
913        Otherwise return all plugins.
914
915    try_import: bool, default True
916        If `True`, allow for plugins to be imported.
917    """
918    import meerschaum.config.paths as paths
919    import os
920    sync_plugins_symlinks()
921    _plugins = [
922        Plugin(name)
923        for name in (
924            to_load or [
925                (
926                    name if (paths.PLUGINS_RESOURCES_PATH / name).is_dir()
927                    else name[:-3]
928                )
929                for name in os.listdir(paths.PLUGINS_RESOURCES_PATH)
930                if name != '__init__.py'
931            ]
932        )
933    ]
934    plugins = tuple(plugin for plugin in _plugins if plugin.is_installed(try_import=try_import))
935    if len(to_load) == 1:
936        if len(plugins) == 0:
937            raise ValueError(f"Plugin '{to_load[0]}' is not installed.")
938        return plugins[0]
939    return plugins

Return a list of Plugin objects.

Parameters
  • to_load:: If specified, only load specific plugins. Otherwise return all plugins.
  • try_import (bool, default True): If True, allow for plugins to be imported.
def get_data_plugins() -> List[Plugin]:
956def get_data_plugins() -> List[Plugin]:
957    """
958    Only return the modules of plugins with either `fetch()` or `sync()` functions.
959    """
960    import inspect
961    plugins = get_plugins()
962    data_names = {'sync', 'fetch'}
963    data_plugins = []
964    for plugin in plugins:
965        for name, ob in inspect.getmembers(plugin.module):
966            if not inspect.isfunction(ob):
967                continue
968            if name not in data_names:
969                continue
970            data_plugins.append(plugin)
971    return data_plugins

Only return the modules of plugins with either fetch() or sync() functions.

def add_plugin_argument(*args, **kwargs) -> None:
974def add_plugin_argument(*args, **kwargs) -> None:
975    """
976    Add argparse arguments under the 'Plugins options' group.
977    Takes the same parameters as the regular argparse `add_argument()` function.
978
979    Examples
980    --------
981    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
982    >>> 
983    """
984    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
985    from meerschaum.utils.warnings import warn
986    _parent_plugin_name = _get_parent_plugin()
987    title = f"Plugin '{_parent_plugin_name}' options" if _parent_plugin_name else 'Custom options'
988    group_key = 'plugin_' + (_parent_plugin_name or '')
989    if group_key not in groups:
990        groups[group_key] = parser.add_argument_group(
991            title = title,
992        )
993        _seen_plugin_args[group_key] = set()
994    try:
995        if str(args) not in _seen_plugin_args[group_key]:
996            groups[group_key].add_argument(*args, **kwargs)
997            _seen_plugin_args[group_key].add(str(args))
998    except Exception as e:
999        warn(e)

Add argparse arguments under the 'Plugins options' group. Takes the same parameters as the regular argparse add_argument() function.

Examples
>>> add_plugin_argument('--foo', type=int, help="This is my help text!")
>>>
def pre_sync_hook(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
130def pre_sync_hook(
131    function: Callable[[Any], Any],
132) -> Callable[[Any], Any]:
133    """
134    Register a function as a sync hook to be executed right before sync.
135    
136    Parameters
137    ----------
138    function: Callable[[Any], Any]
139        The function to execute right before a sync.
140        
141    Returns
142    -------
143    Another function (this is a decorator function).
144
145    Examples
146    --------
147    >>> from meerschaum.plugins import pre_sync_hook
148    >>>
149    >>> @pre_sync_hook
150    ... def log_sync(pipe, **kwargs):
151    ...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
152    >>>
153    """
154    with _locks['_pre_sync_hooks']:
155        plugin_name = _get_parent_plugin()
156        try:
157            if plugin_name not in _pre_sync_hooks:
158                _pre_sync_hooks[plugin_name] = []
159            _pre_sync_hooks[plugin_name].append(function)
160        except Exception as e:
161            from meerschaum.utils.warnings import warn
162            warn(e)
163    return function

Register a function as a sync hook to be executed right before sync.

Parameters
----------
function: Callable[[Any], Any]
    The function to execute right before a sync.

Returns
-------
Another function (this is a decorator function).

Examples
--------
>>> from meerschaum.plugins import pre_sync_hook
>>>
>>> @pre_sync_hook
... def log_sync(pipe, **kwargs):
...     print(f"About to sync {pipe} with kwargs:

{kwargs}.")

>

def post_sync_hook(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
166def post_sync_hook(
167    function: Callable[[Any], Any],
168) -> Callable[[Any], Any]:
169    """
170    Register a function as a sync hook to be executed upon completion of a sync.
171    
172    Parameters
173    ----------
174    function: Callable[[Any], Any]
175        The function to execute upon completion of a sync.
176        
177    Returns
178    -------
179    Another function (this is a decorator function).
180
181    Examples
182    --------
183    >>> from meerschaum.plugins import post_sync_hook
184    >>> from meerschaum.utils.misc import interval_str
185    >>> from datetime import timedelta
186    >>>
187    >>> @post_sync_hook
188    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
189    ...     duration_delta = timedelta(seconds=duration)
190    ...     duration_text = interval_str(duration_delta)
191    ...     print(f"It took {duration_text} to sync {pipe}.")
192    >>>
193    """
194    with _locks['_post_sync_hooks']:
195        try:
196            plugin_name = _get_parent_plugin()
197            if plugin_name not in _post_sync_hooks:
198                _post_sync_hooks[plugin_name] = []
199            _post_sync_hooks[plugin_name].append(function)
200        except Exception as e:
201            from meerschaum.utils.warnings import warn
202            warn(e)
203    return function

Register a function as a sync hook to be executed upon completion of a sync.

Parameters
  • function (Callable[[Any], Any]): The function to execute upon completion of a sync.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import post_sync_hook
>>> from meerschaum.utils.misc import interval_str
>>> from datetime import timedelta
>>>
>>> @post_sync_hook
... def log_sync(pipe, success_tuple, duration=None, **kwargs):
...     duration_delta = timedelta(seconds=duration)
...     duration_text = interval_str(duration_delta)
...     print(f"It took {duration_text} to sync {pipe}.")
>>>