meerschaum.plugins

Expose plugin management APIs from the meerschaum.plugins module.

   1#! /usr/bin/env python
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Expose plugin management APIs from the `meerschaum.plugins` module.
   7"""
   8
   9from __future__ import annotations
  10
  11import pathlib
  12import functools
  13from collections import defaultdict
  14
  15import meerschaum as mrsm
  16from meerschaum.utils.typing import Callable, Any, Union, Optional, Dict, List, Tuple
  17from meerschaum.utils.threading import RLock
  18from meerschaum.core.Plugin import Plugin
  19
  20_api_plugins: Dict[str, List[Callable[['fastapi.App'], Any]]] = {}
  21_pre_sync_hooks: Dict[Union[str, None], List[Callable[[Any], Any]]] = {}
  22_post_sync_hooks: Dict[Union[str, None], List[Callable[[Any], Any]]] = {}
  23_actions_daemon_enabled: Dict[str, bool] = {}
  24_locks = {
  25    '_api_plugins': RLock(),
  26    '_dash_plugins': RLock(),
  27    '_pre_sync_hooks': RLock(),
  28    '_post_sync_hooks': RLock(),
  29    '_actions_daemon_enabled': RLock(),
  30    '__path__': RLock(),
  31    'sys.path': RLock(),
  32    'internal_plugins': RLock(),
  33    '_synced_symlinks': RLock(),
  34    'PLUGINS_INTERNAL_LOCK_PATH': RLock(),
  35}
  36__all__ = (
  37    "Plugin",
  38    "make_action",
  39    "api_plugin",
  40    "dash_plugin",
  41    "web_page",
  42    "import_plugins",
  43    "from_plugin_import",
  44    "reload_plugins",
  45    "get_plugins",
  46    "get_data_plugins",
  47    "add_plugin_argument",
  48    "pre_sync_hook",
  49    "post_sync_hook",
  50)
  51__pdoc__ = {
  52    'venvs': False,
  53    'data': False,
  54    'stack': False,
  55    'plugins': False,
  56}
  57
  58
  59def make_action(
  60    function: Optional[Callable[[Any], Any]] = None,
  61    shell: bool = False,
  62    activate: bool = True,
  63    deactivate: bool = True,
  64    debug: bool = False,
  65    daemon: bool = True,
  66    skip_if_loaded: bool = True,
  67    _plugin_name: Optional[str] = None,
  68) -> Callable[[Any], Any]:
  69    """
  70    Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.
  71    
  72    Parameters
  73    ----------
  74    function: Callable[[Any], Any]
  75        The function to become a Meerschaum action. Must accept all keyword arguments.
  76        
  77    shell: bool, default False
  78        Not used.
  79        
  80    Returns
  81    -------
  82    Another function (this is a decorator function).
  83
  84    Examples
  85    --------
  86    >>> from meerschaum.plugins import make_action
  87    >>>
  88    >>> @make_action
  89    ... def my_action(**kw):
  90    ...     print('foo')
  91    ...     return True, "Success"
  92    >>>
  93    """
  94    def _decorator(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
  95        from meerschaum.actions import actions, _custom_actions_plugins, _plugins_actions
  96        if skip_if_loaded and func.__name__ in actions:
  97            return func
  98
  99        import meerschaum.config.paths as paths
 100        plugin_name = (
 101            func.__name__.split(
 102                f"{paths.PLUGINS_RESOURCES_PATH.stem}.",
 103                maxsplit=1,
 104            )[-1].split('.')[0]
 105        )
 106        plugin = Plugin(plugin_name) if plugin_name else None
 107
 108        if debug:
 109            from meerschaum.utils.debug import dprint
 110            dprint(
 111                f"Adding action '{func.__name__}' from plugin "
 112                f"'{plugin}'..."
 113            )
 114
 115        actions[func.__name__] = func
 116        _custom_actions_plugins[func.__name__] = plugin_name
 117        if plugin_name not in _plugins_actions:
 118            _plugins_actions[plugin_name] = []
 119        _plugins_actions[plugin_name].append(func.__name__)
 120        if not daemon:
 121            _actions_daemon_enabled[func.__name__] = False
 122        return func
 123
 124    if function is None:
 125        return _decorator
 126    return _decorator(function)
 127
 128
 129def pre_sync_hook(
 130    function: Callable[[Any], Any],
 131) -> Callable[[Any], Any]:
 132    """
 133    Register a function as a sync hook to be executed right before sync.
 134    
 135    Parameters
 136    ----------
 137    function: Callable[[Any], Any]
 138        The function to execute right before a sync.
 139        
 140    Returns
 141    -------
 142    Another function (this is a decorator function).
 143
 144    Examples
 145    --------
 146    >>> from meerschaum.plugins import pre_sync_hook
 147    >>>
 148    >>> @pre_sync_hook
 149    ... def log_sync(pipe, **kwargs):
 150    ...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
 151    >>>
 152    """
 153    with _locks['_pre_sync_hooks']:
 154        plugin_name = _get_parent_plugin()
 155        try:
 156            if plugin_name not in _pre_sync_hooks:
 157                _pre_sync_hooks[plugin_name] = []
 158            _pre_sync_hooks[plugin_name].append(function)
 159        except Exception as e:
 160            from meerschaum.utils.warnings import warn
 161            warn(e)
 162    return function
 163
 164
 165def post_sync_hook(
 166    function: Callable[[Any], Any],
 167) -> Callable[[Any], Any]:
 168    """
 169    Register a function as a sync hook to be executed upon completion of a sync.
 170    
 171    Parameters
 172    ----------
 173    function: Callable[[Any], Any]
 174        The function to execute upon completion of a sync.
 175        
 176    Returns
 177    -------
 178    Another function (this is a decorator function).
 179
 180    Examples
 181    --------
 182    >>> from meerschaum.plugins import post_sync_hook
 183    >>> from meerschaum.utils.misc import interval_str
 184    >>> from datetime import timedelta
 185    >>>
 186    >>> @post_sync_hook
 187    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
 188    ...     duration_delta = timedelta(seconds=duration)
 189    ...     duration_text = interval_str(duration_delta)
 190    ...     print(f"It took {duration_text} to sync {pipe}.")
 191    >>>
 192    """
 193    with _locks['_post_sync_hooks']:
 194        try:
 195            plugin_name = _get_parent_plugin()
 196            if plugin_name not in _post_sync_hooks:
 197                _post_sync_hooks[plugin_name] = []
 198            _post_sync_hooks[plugin_name].append(function)
 199        except Exception as e:
 200            from meerschaum.utils.warnings import warn
 201            warn(e)
 202    return function
 203
 204
 205_plugin_endpoints_to_pages = {}
 206_plugins_web_pages = {}
 207def web_page(
 208    page: Union[str, None, Callable[[Any], Any]] = None,
 209    login_required: bool = True,
 210    skip_navbar: bool = False,
 211    page_group: Optional[str] = None,
 212    **kwargs
 213) -> Any:
 214    """
 215    Quickly add pages to the dash application.
 216
 217    Examples
 218    --------
 219    >>> import meerschaum as mrsm
 220    >>> from meerschaum.plugins import web_page
 221    >>> html = mrsm.attempt_import('dash.html')
 222    >>> 
 223    >>> @web_page('foo/bar', login_required=False)
 224    >>> def foo_bar():
 225    ...     return html.Div([html.H1("Hello, World!")])
 226    >>> 
 227    """
 228    page_str = None
 229
 230    def _decorator(_func: Callable[[Any], Any]) -> Callable[[Any], Any]:
 231        nonlocal page_str, page_group
 232
 233        @functools.wraps(_func)
 234        def wrapper(*_args, **_kwargs):
 235            return _func(*_args, **_kwargs)
 236
 237        if page_str is None:
 238            page_str = _func.__name__
 239
 240        page_str = page_str.lstrip('/').rstrip('/').strip()
 241        if not page_str.startswith('dash'):
 242            page_str = f'/dash/{page_str}'
 243        page_key = (
 244            ' '.join(
 245                [
 246                    word.capitalize()
 247                    for word in (
 248                        page_str.replace('/dash', '').lstrip('/').rstrip('/').strip()
 249                        .replace('-', ' ').replace('_', ' ').split(' ')
 250                    )
 251                ]
 252            )
 253        )
 254 
 255        plugin_name = _get_parent_plugin()
 256        page_group = page_group or plugin_name
 257        if page_group not in _plugin_endpoints_to_pages:
 258            _plugin_endpoints_to_pages[page_group] = {}
 259        _plugin_endpoints_to_pages[page_group][page_str] = {
 260            'function': _func,
 261            'login_required': login_required,
 262            'skip_navbar': skip_navbar,
 263            'page_key': page_key,
 264        }
 265        if plugin_name not in _plugins_web_pages:
 266            _plugins_web_pages[plugin_name] = []
 267        _plugins_web_pages[plugin_name].append(_func)
 268        return wrapper
 269
 270    if callable(page):
 271        decorator_to_return = _decorator(page)
 272        page_str = page.__name__
 273    else:
 274        decorator_to_return = _decorator
 275        page_str = page
 276
 277    return decorator_to_return
 278
 279
 280_dash_plugins = {}
 281def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
 282    """
 283    Execute the function when starting the Dash application.
 284    """
 285    with _locks['_dash_plugins']:
 286        plugin_name = _get_parent_plugin()
 287        try:
 288            if plugin_name not in _dash_plugins:
 289                _dash_plugins[plugin_name] = []
 290            _dash_plugins[plugin_name].append(function)
 291        except Exception as e:
 292            from meerschaum.utils.warnings import warn
 293            warn(e)
 294    return function
 295
 296
 297def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
 298    """
 299    Execute the function when initializing the Meerschaum API module.
 300    Useful for lazy-loading heavy plugins only when the API is started,
 301    such as when editing the `meerschaum.api.app` FastAPI app.
 302    
 303    The FastAPI app will be passed as the only parameter.
 304    
 305    Examples
 306    --------
 307    >>> from meerschaum.plugins import api_plugin
 308    >>>
 309    >>> @api_plugin
 310    >>> def initialize_plugin(app):
 311    ...     @app.get('/my/new/path')
 312    ...     def new_path():
 313    ...         return {'message': 'It works!'}
 314    >>>
 315    """
 316    with _locks['_api_plugins']:
 317        try:
 318            if function.__module__ not in _api_plugins:
 319                _api_plugins[function.__module__] = []
 320            _api_plugins[function.__module__].append(function)
 321        except Exception as e:
 322            from meerschaum.utils.warnings import warn
 323            warn(e)
 324    return function
 325
 326
 327_synced_symlinks: int = 0
 328_injected_plugin_symlinks = defaultdict(lambda: 0)
 329def sync_plugins_symlinks(debug: bool = False, warn: bool = True) -> None:
 330    """
 331    Update the plugins' internal symlinks. 
 332    """
 333    from meerschaum.utils.warnings import error, warn as _warn, dprint
 334    global _synced_symlinks
 335    with _locks['_synced_symlinks']:
 336        if _synced_symlinks > 1:
 337            if debug:
 338                dprint("Skip syncing symlinks...")
 339            return
 340
 341    import os
 342    import pathlib
 343    import time
 344    from collections import defaultdict
 345    from meerschaum.utils.misc import flatten_list, make_symlink, is_symlink
 346    from meerschaum._internal.static import STATIC_CONFIG
 347    import meerschaum.config.paths as paths
 348
 349    ### If the lock file exists, sleep for up to a second or until it's removed before continuing.
 350    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
 351        if paths.PLUGINS_INTERNAL_LOCK_PATH.exists():
 352            lock_sleep_total = STATIC_CONFIG['plugins']['lock_sleep_total']
 353            lock_sleep_increment = STATIC_CONFIG['plugins']['lock_sleep_increment']
 354            lock_start = time.perf_counter()
 355            while (
 356                (time.perf_counter() - lock_start) < lock_sleep_total
 357            ):
 358                time.sleep(lock_sleep_increment)
 359                if not paths.PLUGINS_INTERNAL_LOCK_PATH.exists():
 360                    break
 361                try:
 362                    paths.PLUGINS_INTERNAL_LOCK_PATH.unlink()
 363                except Exception as e:
 364                    if warn:
 365                        _warn(
 366                            f"Error while removing lockfile {paths.PLUGINS_INTERNAL_LOCK_PATH}:\n"
 367                            f"{e}"
 368                        )
 369                    break
 370
 371        ### Begin locking from other processes.
 372        try:
 373            paths.PLUGINS_INTERNAL_LOCK_PATH.touch()
 374        except Exception as e:
 375            if warn:
 376                _warn(f"Unable to create lockfile {paths.PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
 377
 378    with _locks['internal_plugins']:
 379
 380        try:
 381            from importlib.metadata import entry_points
 382        except ImportError:
 383            importlib_metadata = mrsm.attempt_import('importlib_metadata', lazy=False)
 384            entry_points = importlib_metadata.entry_points
 385
 386        ### NOTE: Allow plugins to be installed via `pip`.
 387        packaged_plugin_paths = []
 388        try:
 389            discovered_packaged_plugins_eps = entry_points(group='meerschaum.plugins')
 390        except TypeError:
 391            discovered_packaged_plugins_eps = []
 392
 393        for ep in discovered_packaged_plugins_eps:
 394            module_name = ep.name
 395            for package_file_path in ep.dist.files:
 396                if package_file_path.suffix != '.py':
 397                    continue
 398                if str(package_file_path) == f'{module_name}.py':
 399                    packaged_plugin_paths.append(package_file_path.locate())
 400                elif str(package_file_path) == f'{module_name}/__init__.py':
 401                    packaged_plugin_paths.append(package_file_path.locate().parent)
 402
 403        if is_symlink(paths.PLUGINS_RESOURCES_PATH) or not paths.PLUGINS_RESOURCES_PATH.exists():
 404            try:
 405                paths.PLUGINS_RESOURCES_PATH.unlink()
 406            except Exception:
 407                pass
 408
 409        paths.PLUGINS_RESOURCES_PATH.mkdir(exist_ok=True)
 410
 411        existing_symlinked_paths = {
 412            _existing_symlink: pathlib.Path(os.path.realpath(_existing_symlink))
 413            for item in os.listdir(paths.PLUGINS_RESOURCES_PATH)
 414            if is_symlink(_existing_symlink := (paths.PLUGINS_RESOURCES_PATH / item))
 415        }
 416        injected_symlinked_paths = {
 417            _injected_symlink: pathlib.Path(os.path.realpath(_injected_symlink))
 418            for item in os.listdir(paths.PLUGINS_INJECTED_RESOURCES_PATH)
 419            if is_symlink(_injected_symlink := (paths.PLUGINS_INJECTED_RESOURCES_PATH / item))
 420        }
 421        plugins_to_be_symlinked = list(flatten_list(
 422            [
 423                [
 424                    pathlib.Path(os.path.realpath(plugins_path / item))
 425                    for item in os.listdir(plugins_path)
 426                    if (
 427                        not item.startswith('.')
 428                    ) and (item not in ('__pycache__', '__init__.py'))
 429                ]
 430                for plugins_path in paths.PLUGINS_DIR_PATHS
 431                if plugins_path.exists()
 432            ]
 433        ))
 434        plugins_to_be_symlinked.extend(packaged_plugin_paths)
 435
 436        ### Check for duplicates.
 437        seen_plugins = defaultdict(lambda: 0)
 438        for plugin_path in plugins_to_be_symlinked:
 439            plugin_name = plugin_path.stem
 440            seen_plugins[plugin_name] += 1
 441        for plugin_name, plugin_count in seen_plugins.items():
 442            if plugin_count > 1:
 443                if warn:
 444                    _warn(f"Found duplicate plugins named '{plugin_name}'.")
 445
 446        for plugin_symlink_path, real_path in existing_symlinked_paths.items():
 447
 448            ### Remove invalid symlinks.
 449            if real_path not in plugins_to_be_symlinked:
 450                if _injected_plugin_symlinks[plugin_symlink_path] > 1:
 451                    continue
 452                if plugin_symlink_path in injected_symlinked_paths:
 453                    continue
 454                if real_path in injected_symlinked_paths.values():
 455                    continue
 456                try:
 457                    plugin_symlink_path.unlink()
 458                except Exception:
 459                    pass
 460
 461            ### Remove valid plugins from the to-be-symlinked list.
 462            else:
 463                plugins_to_be_symlinked.remove(real_path)
 464
 465        for plugin_path in plugins_to_be_symlinked:
 466            plugin_symlink_path = paths.PLUGINS_RESOURCES_PATH / plugin_path.name
 467            try:
 468                ### There might be duplicate folders (e.g. __pycache__).
 469                if (
 470                    plugin_symlink_path.exists()
 471                    and
 472                    plugin_symlink_path.is_dir()
 473                    and
 474                    not is_symlink(plugin_symlink_path)
 475                ):
 476                    continue
 477                success, msg = make_symlink(plugin_path, plugin_symlink_path)
 478            except Exception as e:
 479                success, msg = False, str(e)
 480            if not success:
 481                if warn:
 482                    _warn(
 483                        f"Failed to create symlink {plugin_symlink_path} "
 484                        + f"to {plugin_path}:\n    {msg}"
 485                    )
 486
 487    ### Release symlink lock file in case other processes need it.
 488    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
 489        try:
 490            if paths.PLUGINS_INTERNAL_LOCK_PATH.exists():
 491                paths.PLUGINS_INTERNAL_LOCK_PATH.unlink()
 492        ### Sometimes competing threads will delete the lock file at the same time.
 493        except FileNotFoundError:
 494            pass
 495        except Exception as e:
 496            if warn:
 497                _warn(f"Error cleaning up lockfile {paths.PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
 498
 499        try:
 500            if not paths.PLUGINS_INIT_PATH.exists():
 501                paths.PLUGINS_INIT_PATH.touch()
 502        except Exception as e:
 503            error(f"Failed to create the file '{paths.PLUGINS_INIT_PATH}':\n{e}")
 504
 505    with _locks['__path__']:
 506        if str(paths.PLUGINS_RESOURCES_PATH.parent) not in __path__:
 507            __path__.append(str(paths.PLUGINS_RESOURCES_PATH.parent))
 508
 509    with _locks['_synced_symlinks']:
 510        _synced_symlinks += 1
 511
 512
 513def import_plugins(
 514    *plugins_to_import: Union[str, List[str], None],
 515    warn: bool = True,
 516) -> Union[
 517    'ModuleType', Tuple['ModuleType', None]
 518]:
 519    """
 520    Import the Meerschaum plugins directory.
 521
 522    Parameters
 523    ----------
 524    plugins_to_import: Union[str, List[str], None]
 525        If provided, only import the specified plugins.
 526        Otherwise import the entire plugins module. May be a string, list, or `None`.
 527        Defaults to `None`.
 528
 529    Returns
 530    -------
 531    A module of list of modules, depening on the number of plugins provided.
 532
 533    """
 534    import sys
 535    import importlib
 536    import meerschaum.config.paths as paths
 537    from meerschaum.utils.misc import flatten_list
 538    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
 539    from meerschaum.utils.warnings import warn as _warn
 540    plugins_to_import = list(plugins_to_import)
 541    prepended_sys_path = False
 542    with _locks['sys.path']:
 543
 544        ### Since plugins may depend on other plugins,
 545        ### we need to activate the virtual environments for library plugins.
 546        ### This logic exists in `Plugin.activate_venv()`,
 547        ### but that code requires the plugin's module to already be imported.
 548        ### It's not a guarantee of correct activation order,
 549        ### e.g. if a library plugin pins a specific package and another 
 550        plugins_names = get_plugins_names()
 551        already_active_venvs = [is_venv_active(plugin_name) for plugin_name in plugins_names]
 552
 553        if not sys.path or sys.path[0] != str(paths.PLUGINS_RESOURCES_PATH.parent):
 554            prepended_sys_path = True
 555            sys.path.insert(0, str(paths.PLUGINS_RESOURCES_PATH.parent))
 556
 557        if not plugins_to_import:
 558            for plugin_name in plugins_names:
 559                activate_venv(plugin_name)
 560            try:
 561                imported_plugins = importlib.import_module(paths.PLUGINS_RESOURCES_PATH.stem)
 562            except ImportError as e:
 563                _warn(f"Failed to import the plugins module:\n    {e}")
 564                import traceback
 565                traceback.print_exc()
 566                imported_plugins = None
 567            for plugin_name in plugins_names:
 568                if plugin_name in already_active_venvs:
 569                    continue
 570                deactivate_venv(plugin_name)
 571
 572        else:
 573            imported_plugins = []
 574            for plugin_name in flatten_list(plugins_to_import):
 575                plugin = Plugin(plugin_name)
 576                try:
 577                    with Venv(plugin, init_if_not_exists=False):
 578                        imported_plugins.append(
 579                            importlib.import_module(
 580                                f'{paths.PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
 581                            )
 582                        )
 583                except Exception as e:
 584                    _warn(
 585                        f"Failed to import plugin '{plugin_name}':\n    "
 586                        + f"{e}\n\nHere's a stacktrace:",
 587                        stack = False,
 588                    )
 589                    from meerschaum.utils.formatting import get_console
 590                    get_console().print_exception(
 591                        suppress = [
 592                            'meerschaum/plugins/__init__.py',
 593                            importlib,
 594                            importlib._bootstrap,
 595                        ]
 596                    )
 597                    imported_plugins.append(None)
 598
 599        if imported_plugins is None and warn:
 600            _warn("Failed to import plugins.", stacklevel=3)
 601
 602        if prepended_sys_path and str(paths.PLUGINS_RESOURCES_PATH.parent) in sys.path:
 603            sys.path.remove(str(paths.PLUGINS_RESOURCES_PATH.parent))
 604
 605    if isinstance(imported_plugins, list):
 606        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
 607    return imported_plugins
 608
 609
 610def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
 611    """
 612    Emulate the `from module import x` behavior.
 613
 614    Parameters
 615    ----------
 616    plugin_import_name: str
 617        The import name of the plugin's module.
 618        Separate submodules with '.' (e.g. 'compose.utils.pipes')
 619
 620    attrs: str
 621        Names of the attributes to return.
 622
 623    Returns
 624    -------
 625    Objects from a plugin's submodule.
 626    If multiple objects are provided, return a tuple.
 627
 628    Examples
 629    --------
 630    >>> init = from_plugin_import('compose.utils', 'init')
 631    >>> with mrsm.Venv('compose'):
 632    ...     cf = init()
 633    >>> build_parent_pipe, get_defined_pipes = from_plugin_import(
 634    ...     'compose.utils.pipes',
 635    ...     'build_parent_pipe',
 636    ...     'get_defined_pipes',
 637    ... )
 638    >>> parent_pipe = build_parent_pipe(cf)
 639    >>> defined_pipes = get_defined_pipes(cf)
 640    """
 641    import importlib
 642    import meerschaum.config.paths as paths
 643    from meerschaum.utils.warnings import warn as _warn
 644    if plugin_import_name.startswith('plugins.'):
 645        plugin_import_name = plugin_import_name[len('plugins.'):]
 646    plugin_import_parts = plugin_import_name.split('.')
 647    plugin_root_name = plugin_import_parts[0]
 648    plugin = mrsm.Plugin(plugin_root_name)
 649
 650    submodule_import_name = '.'.join(
 651        [paths.PLUGINS_RESOURCES_PATH.stem]
 652        + plugin_import_parts
 653    )
 654    if len(attrs) == 0:
 655        raise ValueError(f"Provide which attributes to return from '{submodule_import_name}'.")
 656
 657    attrs_to_return = []
 658    with mrsm.Venv(plugin):
 659        if plugin.module is None:
 660            raise ImportError(f"Unable to import plugin '{plugin}'.")
 661
 662        try:
 663            submodule = importlib.import_module(submodule_import_name)
 664        except ImportError as e:
 665            _warn(
 666                f"Failed to import plugin '{submodule_import_name}':\n    "
 667                + f"{e}\n\nHere's a stacktrace:",
 668                stack=False,
 669            )
 670            from meerschaum.utils.formatting import get_console
 671            get_console().print_exception(
 672                suppress=[
 673                    'meerschaum/plugins/__init__.py',
 674                    importlib,
 675                    importlib._bootstrap,
 676                ]
 677            )
 678            return None
 679
 680        for attr in attrs:
 681            try:
 682                attrs_to_return.append(getattr(submodule, attr))
 683            except Exception:
 684                _warn(f"Failed to access '{attr}' from '{submodule_import_name}'.")
 685                attrs_to_return.append(None)
 686        
 687        if len(attrs) == 1:
 688            return attrs_to_return[0]
 689
 690        return tuple(attrs_to_return)
 691
 692
 693_loaded_plugins: bool = False
 694def load_plugins(
 695    skip_if_loaded: bool = True,
 696    shell: bool = False,
 697    debug: bool = False,
 698) -> None:
 699    """
 700    Import Meerschaum plugins and update the actions dictionary.
 701    """
 702    global _loaded_plugins
 703    from meerschaum.utils.warnings import dprint
 704
 705    if skip_if_loaded and _loaded_plugins:
 706        if debug:
 707            dprint("Skip loading plugins...")
 708        return
 709
 710    from inspect import isfunction, getmembers
 711    import meerschaum.config.paths as paths
 712    from meerschaum.actions import __all__ as _all, modules
 713    from meerschaum.utils.packages import get_modules_from_package
 714
 715    _plugins_names, plugins_modules = get_modules_from_package(
 716        import_plugins(),
 717        names = True,
 718        recursive = True,
 719        modules_venvs = True
 720    )
 721
 722    ### I'm appending here to keep from redefining the modules list.
 723    new_modules = (
 724        [
 725            mod
 726            for mod in modules
 727            if not mod.__name__.startswith(paths.PLUGINS_RESOURCES_PATH.stem + '.')
 728        ]
 729        + plugins_modules
 730    )
 731    n_mods = len(modules)
 732    for mod in new_modules:
 733        modules.append(mod)
 734    for i in range(n_mods):
 735        modules.pop(0)
 736
 737    for module in plugins_modules:
 738        for name, func in getmembers(module):
 739            if not isfunction(func):
 740                continue
 741            if name == module.__name__.split('.')[-1]:
 742                make_action(
 743                    func,
 744                    **{'shell': shell, 'debug': debug},
 745                    _plugin_name=name,
 746                    skip_if_loaded=True,
 747                )
 748
 749    _loaded_plugins = True
 750
 751
 752def unload_custom_actions(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
 753    """
 754    Unload the custom actions added by plugins.
 755    """
 756    from meerschaum.actions import (
 757        actions,
 758        _custom_actions_plugins,
 759        _plugins_actions,
 760    )
 761    from meerschaum._internal.entry import _shell
 762    import meerschaum._internal.shell as shell_pkg
 763
 764    plugins = plugins if plugins is not None else list(_plugins_actions)
 765
 766    for plugin in plugins:
 767        action_names = _plugins_actions.get(plugin, [])
 768        actions_to_remove = {
 769            action_name: actions.get(action_name, None)
 770            for action_name in action_names
 771        }
 772        for action_name in action_names:
 773            _ = actions.pop(action_name, None)
 774            _ = _custom_actions_plugins.pop(action_name, None)
 775            _ = _actions_daemon_enabled.pop(action_name, None)
 776
 777        _ = _plugins_actions.pop(plugin, None)
 778        shell_pkg._remove_shell_actions(
 779            _shell=_shell,
 780            actions=actions_to_remove,
 781        )
 782    
 783
 784def unload_plugins(
 785    plugins: Optional[List[str]] = None,
 786    remove_symlinks: bool = True,
 787    debug: bool = False,
 788) -> None:
 789    """
 790    Unload the specified plugins from memory.
 791    """
 792    global _loaded_plugins, _synced_symlinks
 793    import sys
 794    import meerschaum.config.paths as paths
 795    from meerschaum.connectors import unload_plugin_connectors
 796    if debug:
 797        from meerschaum.utils.warnings import dprint
 798
 799    _loaded_plugins = False
 800    _synced_symlinks = 0
 801
 802    all_plugins = get_plugins_names()
 803    plugins = plugins if plugins is not None else all_plugins
 804    if debug:
 805        dprint(f"Unloading plugins: {plugins}")
 806
 807    unload_custom_actions(plugins, debug=debug)
 808    unload_plugin_connectors(plugins, debug=debug)
 809
 810    module_prefix = f"{paths.PLUGINS_RESOURCES_PATH.stem}."
 811    loaded_modules = [mod_name for mod_name in sys.modules if mod_name.startswith(module_prefix)]
 812
 813    root_plugins_mod = (
 814        sys.modules.get(paths.PLUGINS_RESOURCES_PATH.stem, None)
 815        if sorted(plugins) != sorted(all_plugins)
 816        else sys.modules.pop(paths.PLUGINS_RESOURCES_PATH, None)
 817    )
 818
 819    for plugin_name in plugins:
 820        for mod_name in loaded_modules:
 821            if (
 822                mod_name[len(paths.PLUGINS_RESOURCES_PATH.stem):].startswith(plugin_name + '.')
 823                or mod_name[len(paths.PLUGINS_RESOURCES_PATH.stem):] == plugin_name
 824            ):
 825                _ = sys.modules.pop(mod_name, None)
 826
 827        if root_plugins_mod is not None and plugin_name in root_plugins_mod.__dict__:
 828            try:
 829                delattr(root_plugins_mod, plugin_name)
 830            except Exception:
 831                pass
 832
 833        ### Unload sync hooks.
 834        _ = _pre_sync_hooks.pop(plugin_name, None)
 835        _ = _post_sync_hooks.pop(plugin_name, None)
 836
 837        ### Unload API endpoints and pages.
 838        _ = _dash_plugins.pop(plugin_name, None)
 839        web_page_funcs = _plugins_web_pages.pop(plugin_name, None) or []
 840        page_groups_to_pop = []
 841        for page_group, page_functions in _plugin_endpoints_to_pages.items():
 842            page_functions_to_pop = [
 843                page_str
 844                for page_str, page_payload in page_functions.items()
 845                if page_payload.get('function', None) in web_page_funcs
 846            ]
 847            for page_str in page_functions_to_pop:
 848                page_functions.pop(page_str, None)
 849            if not page_functions:
 850                page_groups_to_pop.append(page_group)
 851        
 852        for page_group in page_groups_to_pop:
 853            _plugin_endpoints_to_pages.pop(page_group, None)
 854
 855        ### Remove all but injected symlinks.
 856        if remove_symlinks:
 857            dir_symlink_path = paths.PLUGINS_RESOURCES_PATH / plugin_name
 858            dir_symlink_injected_path = paths.PLUGINS_INJECTED_RESOURCES_PATH / plugin_name
 859            file_symlink_path = paths.PLUGINS_RESOURCES_PATH / f"{plugin_name}.py"
 860            file_symlink_injected_path = paths.PLUGINS_INJECTED_RESOURCES_PATH / f"{plugin_name}.py"
 861
 862            try:
 863                if dir_symlink_path.exists() and not dir_symlink_injected_path.exists():
 864                    dir_symlink_path.unlink()
 865            except Exception:
 866                pass
 867
 868            try:
 869                if file_symlink_path.exists() and not file_symlink_injected_path.exists():
 870                    file_symlink_path.unlink()
 871            except Exception:
 872                pass
 873
 874
 875def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
 876    """
 877    Reload plugins back into memory.
 878
 879    Parameters
 880    ----------
 881    plugins: Optional[List[str]], default None
 882        The plugins to reload. `None` will reload all plugins.
 883
 884    """
 885    global _synced_symlinks
 886    unload_plugins(plugins, debug=debug)
 887    _synced_symlinks = 0
 888    sync_plugins_symlinks(debug=debug)
 889    load_plugins(skip_if_loaded=False, debug=debug)
 890
 891
 892def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
 893    """
 894    Return a list of `Plugin` objects.
 895
 896    Parameters
 897    ----------
 898    to_load:
 899        If specified, only load specific plugins.
 900        Otherwise return all plugins.
 901
 902    try_import: bool, default True
 903        If `True`, allow for plugins to be imported.
 904    """
 905    import meerschaum.config.paths as paths
 906    import os
 907    sync_plugins_symlinks()
 908    _plugins = [
 909        Plugin(name)
 910        for name in (
 911            to_load or [
 912                (
 913                    name if (paths.PLUGINS_RESOURCES_PATH / name).is_dir()
 914                    else name[:-3]
 915                )
 916                for name in os.listdir(paths.PLUGINS_RESOURCES_PATH)
 917                if name != '__init__.py'
 918            ]
 919        )
 920    ]
 921    plugins = tuple(plugin for plugin in _plugins if plugin.is_installed(try_import=try_import))
 922    if len(to_load) == 1:
 923        if len(plugins) == 0:
 924            raise ValueError(f"Plugin '{to_load[0]}' is not installed.")
 925        return plugins[0]
 926    return plugins
 927
 928
 929def get_plugins_names(*to_load, **kw) -> List[str]:
 930    """
 931    Return a list of installed plugins.
 932    """
 933    return [plugin.name for plugin in get_plugins(*to_load, **kw)]
 934
 935
 936def get_plugins_modules(*to_load, **kw) -> List['ModuleType']:
 937    """
 938    Return a list of modules for the installed plugins, or `None` if things break.
 939    """
 940    return [plugin.module for plugin in get_plugins(*to_load, **kw)]
 941
 942
 943def get_data_plugins() -> List[Plugin]:
 944    """
 945    Only return the modules of plugins with either `fetch()` or `sync()` functions.
 946    """
 947    import inspect
 948    plugins = get_plugins()
 949    data_names = {'sync', 'fetch'}
 950    data_plugins = []
 951    for plugin in plugins:
 952        for name, ob in inspect.getmembers(plugin.module):
 953            if not inspect.isfunction(ob):
 954                continue
 955            if name not in data_names:
 956                continue
 957            data_plugins.append(plugin)
 958    return data_plugins
 959
 960
 961def add_plugin_argument(*args, **kwargs) -> None:
 962    """
 963    Add argparse arguments under the 'Plugins options' group.
 964    Takes the same parameters as the regular argparse `add_argument()` function.
 965
 966    Examples
 967    --------
 968    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
 969    >>> 
 970    """
 971    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
 972    from meerschaum.utils.warnings import warn
 973    _parent_plugin_name = _get_parent_plugin()
 974    title = f"Plugin '{_parent_plugin_name}' options" if _parent_plugin_name else 'Custom options'
 975    group_key = 'plugin_' + (_parent_plugin_name or '')
 976    if group_key not in groups:
 977        groups[group_key] = parser.add_argument_group(
 978            title = title,
 979        )
 980        _seen_plugin_args[group_key] = set()
 981    try:
 982        if str(args) not in _seen_plugin_args[group_key]:
 983            groups[group_key].add_argument(*args, **kwargs)
 984            _seen_plugin_args[group_key].add(str(args))
 985    except Exception as e:
 986        warn(e)
 987
 988
 989def inject_plugin_path(
 990    plugin_path: pathlib.Path,
 991    plugins_resources_path: Optional[pathlib.Path] = None) -> None:
 992    """
 993    Inject a plugin as a symlink into the internal `plugins` directory.
 994
 995    Parameters
 996    ----------
 997    plugin_path: pathlib.Path
 998        The path to the plugin's source module.
 999    """
1000    import meerschaum.config.paths as paths
1001    from meerschaum.utils.misc import make_symlink
1002    if plugins_resources_path is None:
1003        plugins_resources_path = paths.PLUGINS_RESOURCES_PATH
1004        plugins_injected_resources_path = paths.PLUGINS_INJECTED_RESOURCES_PATH
1005    else:
1006        plugins_injected_resources_path = plugins_resources_path / '.injected'
1007
1008    if plugin_path.is_dir():
1009        plugin_name = plugin_path.name
1010        dest_path = plugins_resources_path / plugin_name
1011        injected_path = plugins_injected_resources_path / plugin_name
1012    elif plugin_path.name == '__init__.py':
1013        plugin_name = plugin_path.parent.name
1014        dest_path = plugins_resources_path / plugin_name
1015        injected_path = plugins_injected_resources_path / plugin_name
1016    elif plugin_path.name.endswith('.py'):
1017        plugin_name = plugin_path.name[:(-1 * len('.py'))]
1018        dest_path = plugins_resources_path / plugin_path.name
1019        injected_path = plugins_injected_resources_path / plugin_path.name
1020    else:
1021        raise ValueError(f"Cannot deduce plugin name from path '{plugin_path}'.")
1022
1023    _injected_plugin_symlinks[dest_path] += 1
1024    make_symlink(plugin_path, dest_path)
1025    make_symlink(plugin_path, injected_path)
1026
1027
1028def _get_parent_plugin(stacklevel: Union[int, Tuple[int, ...]] = (1, 2, 3, 4)) -> Union[str, None]:
1029    """If this function is called from outside a Meerschaum plugin, it will return None."""
1030    import inspect
1031    if not isinstance(stacklevel, tuple):
1032        stacklevel = (stacklevel,)
1033
1034    for _level in stacklevel:
1035        try:
1036            parent_globals = inspect.stack()[_level][0].f_globals
1037            global_name = parent_globals.get('__name__', '')
1038            if global_name.startswith('meerschaum.'):
1039                continue
1040            plugin_name = global_name.replace('plugins.', '').split('.')[0]
1041            if plugin_name.startswith('_') or plugin_name == 'importlib':
1042                continue
1043            return plugin_name
1044        except Exception:
1045            continue
1046
1047    return None
class Plugin:
 30class Plugin:
 31    """Handle packaging of Meerschaum plugins."""
 32
 33    def __init__(
 34        self,
 35        name: str,
 36        version: Optional[str] = None,
 37        user_id: Optional[int] = None,
 38        required: Optional[List[str]] = None,
 39        attributes: Optional[Dict[str, Any]] = None,
 40        archive_path: Optional[pathlib.Path] = None,
 41        venv_path: Optional[pathlib.Path] = None,
 42        repo_connector: Optional['mrsm.connectors.api.APIConnector'] = None,
 43        repo: Union['mrsm.connectors.api.APIConnector', str, None] = None,
 44    ):
 45        import meerschaum.config.paths as paths
 46        from meerschaum._internal.static import STATIC_CONFIG
 47        sep = STATIC_CONFIG['plugins']['repo_separator']
 48        _repo = None
 49        if sep in name:
 50            try:
 51                name, _repo = name.split(sep)
 52            except Exception as e:
 53                error(f"Invalid plugin name: '{name}'")
 54        self._repo_in_name = _repo
 55
 56        if attributes is None:
 57            attributes = {}
 58        self.name = name
 59        self.attributes = attributes
 60        self.user_id = user_id
 61        self._version = version
 62        if required:
 63            self._required = required
 64        self.archive_path = (
 65            archive_path if archive_path is not None
 66            else paths.PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
 67        )
 68        self.venv_path = (
 69            venv_path if venv_path is not None
 70            else paths.VIRTENV_RESOURCES_PATH / self.name
 71        )
 72        self._repo_connector = repo_connector
 73        self._repo_keys = repo
 74
 75
 76    @property
 77    def repo_connector(self):
 78        """
 79        Return the repository connector for this plugin.
 80        NOTE: This imports the `connectors` module, which imports certain plugin modules.
 81        """
 82        if self._repo_connector is None:
 83            from meerschaum.connectors.parse import parse_repo_keys
 84
 85            repo_keys = self._repo_keys or self._repo_in_name
 86            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
 87                error(
 88                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
 89                )
 90            repo_connector = parse_repo_keys(repo_keys)
 91            self._repo_connector = repo_connector
 92        return self._repo_connector
 93
 94
 95    @property
 96    def version(self):
 97        """
 98        Return the plugin's module version is defined (`__version__`) if it's defined.
 99        """
100        if self._version is None:
101            try:
102                self._version = self.module.__version__
103            except Exception as e:
104                self._version = None
105        return self._version
106
107
108    @property
109    def module(self):
110        """
111        Return the Python module of the underlying plugin.
112        """
113        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
114            if self.__file__ is None:
115                return None
116
117            from meerschaum.plugins import import_plugins
118            self._module = import_plugins(str(self), warn=False)
119
120        return self._module
121
122
123    @property
124    def __file__(self) -> Union[str, None]:
125        """
126        Return the file path (str) of the plugin if it exists, otherwise `None`.
127        """
128        if self.__dict__.get('_module', None) is not None:
129            return self.module.__file__
130
131        import meerschaum.config.paths as paths
132
133        potential_dir = paths.PLUGINS_RESOURCES_PATH / self.name
134        if (
135            potential_dir.exists()
136            and potential_dir.is_dir()
137            and (potential_dir / '__init__.py').exists()
138        ):
139            return str((potential_dir / '__init__.py').as_posix())
140
141        potential_file = paths.PLUGINS_RESOURCES_PATH / (self.name + '.py')
142        if potential_file.exists() and not potential_file.is_dir():
143            return str(potential_file.as_posix())
144
145        return None
146
147
148    @property
149    def requirements_file_path(self) -> Union[pathlib.Path, None]:
150        """
151        If a file named `requirements.txt` exists, return its path.
152        """
153        if self.__file__ is None:
154            return None
155        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
156        if not path.exists():
157            return None
158        return path
159
160
161    def is_installed(self, **kw) -> bool:
162        """
163        Check whether a plugin is correctly installed.
164
165        Returns
166        -------
167        A `bool` indicating whether a plugin exists and is successfully imported.
168        """
169        return self.__file__ is not None
170
171
172    def make_tar(self, debug: bool = False) -> pathlib.Path:
173        """
174        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
175
176        Parameters
177        ----------
178        debug: bool, default False
179            Verbosity toggle.
180
181        Returns
182        -------
183        A `pathlib.Path` to the archive file's path.
184
185        """
186        import tarfile, pathlib, subprocess, fnmatch
187        from meerschaum.utils.debug import dprint
188        from meerschaum.utils.packages import attempt_import
189        pathspec = attempt_import('pathspec', debug=debug)
190
191        if not self.__file__:
192            from meerschaum.utils.warnings import error
193            error(f"Could not find file for plugin '{self}'.")
194        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
195            path = self.__file__.replace('__init__.py', '')
196            is_dir = True
197        else:
198            path = self.__file__
199            is_dir = False
200
201        old_cwd = os.getcwd()
202        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
203        os.chdir(real_parent_path)
204
205        default_patterns_to_ignore = [
206            '.pyc',
207            '__pycache__/',
208            'eggs/',
209            '__pypackages__/',
210            '.git',
211        ]
212
213        def parse_gitignore() -> 'Set[str]':
214            gitignore_path = pathlib.Path(path) / '.gitignore'
215            if not gitignore_path.exists():
216                return set(default_patterns_to_ignore)
217            with open(gitignore_path, 'r', encoding='utf-8') as f:
218                gitignore_text = f.read()
219            return set(pathspec.PathSpec.from_lines(
220                pathspec.patterns.GitWildMatchPattern,
221                default_patterns_to_ignore + gitignore_text.splitlines()
222            ).match_tree(path))
223
224        patterns_to_ignore = parse_gitignore() if is_dir else set()
225
226        if debug:
227            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
228
229        with tarfile.open(self.archive_path, 'w:gz') as tarf:
230            if not is_dir:
231                tarf.add(f"{self.name}.py")
232            else:
233                for root, dirs, files in os.walk(self.name):
234                    for f in files:
235                        good_file = True
236                        fp = os.path.join(root, f)
237                        for pattern in patterns_to_ignore:
238                            if pattern in str(fp) or f.startswith('.'):
239                                good_file = False
240                                break
241                        if good_file:
242                            if debug:
243                                dprint(f"Adding '{fp}'...")
244                            tarf.add(fp)
245
246        ### clean up and change back to old directory
247        os.chdir(old_cwd)
248
249        ### change to 775 to avoid permissions issues with the API in a Docker container
250        self.archive_path.chmod(0o775)
251
252        if debug:
253            dprint(f"Created archive '{self.archive_path}'.")
254        return self.archive_path
255
256
257    def install(
258        self,
259        skip_deps: bool = False,
260        force: bool = False,
261        debug: bool = False,
262    ) -> SuccessTuple:
263        """
264        Extract a plugin's tar archive to the plugins directory.
265        
266        This function checks if the plugin is already installed and if the version is equal or
267        greater than the existing installation.
268
269        Parameters
270        ----------
271        skip_deps: bool, default False
272            If `True`, do not install dependencies.
273
274        force: bool, default False
275            If `True`, continue with installation, even if required packages fail to install.
276
277        debug: bool, default False
278            Verbosity toggle.
279
280        Returns
281        -------
282        A `SuccessTuple` of success (bool) and a message (str).
283
284        """
285        if self.full_name in _ongoing_installations:
286            return True, f"Already installing plugin '{self}'."
287        _ongoing_installations.add(self.full_name)
288
289        import meerschaum.config.paths as paths
290        from meerschaum.utils.warnings import warn, error
291        if debug:
292            from meerschaum.utils.debug import dprint
293        import tarfile
294        import re
295        import ast
296        from meerschaum.plugins import sync_plugins_symlinks
297        from meerschaum.utils.packages import attempt_import, reload_meerschaum
298        from meerschaum.utils.venv import init_venv
299        from meerschaum.utils.misc import safely_extract_tar
300        old_cwd = os.getcwd()
301        old_version = ''
302        new_version = ''
303        temp_dir = paths.PLUGINS_TEMP_RESOURCES_PATH / self.name
304        temp_dir.mkdir(exist_ok=True)
305
306        if not self.archive_path.exists():
307            return False, f"Missing archive file for plugin '{self}'."
308        if self.version is not None:
309            old_version = self.version
310            if debug:
311                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
312
313        if debug:
314            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
315
316        try:
317            with tarfile.open(self.archive_path, 'r:gz') as tarf:
318                safely_extract_tar(tarf, temp_dir)
319        except Exception as e:
320            warn(e)
321            return False, f"Failed to extract plugin '{self.name}'."
322
323        ### search for version information
324        files = os.listdir(temp_dir)
325        
326        if str(files[0]) == self.name:
327            is_dir = True
328        elif str(files[0]) == self.name + '.py':
329            is_dir = False
330        else:
331            error(f"Unknown format encountered for plugin '{self}'.")
332
333        fpath = temp_dir / files[0]
334        if is_dir:
335            fpath = fpath / '__init__.py'
336
337        init_venv(self.name, debug=debug)
338        with open(fpath, 'r', encoding='utf-8') as f:
339            init_lines = f.readlines()
340        new_version = None
341        for line in init_lines:
342            if '__version__' not in line:
343                continue
344            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
345            if not version_match:
346                continue
347            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
348            break
349        if not new_version:
350            warn(
351                f"No `__version__` defined for plugin '{self}'. "
352                + "Assuming new version...",
353                stack = False,
354            )
355
356        packaging_version = attempt_import('packaging.version')
357        try:
358            is_new_version = (not new_version and not old_version) or (
359                packaging_version.parse(old_version) < packaging_version.parse(new_version)
360            )
361            is_same_version = new_version and old_version and (
362                packaging_version.parse(old_version) == packaging_version.parse(new_version)
363            )
364        except Exception:
365            is_new_version, is_same_version = True, False
366
367        ### Determine where to permanently store the new plugin.
368        plugin_installation_dir_path = paths.PLUGINS_DIR_PATHS[0]
369        for path in paths.PLUGINS_DIR_PATHS:
370            if not path.exists():
371                warn(f"Plugins path does not exist: {path}", stack=False)
372                continue
373
374            files_in_plugins_dir = os.listdir(path)
375            if (
376                self.name in files_in_plugins_dir
377                or
378                (self.name + '.py') in files_in_plugins_dir
379            ):
380                plugin_installation_dir_path = path
381                break
382
383        success_msg = (
384            f"Successfully installed plugin '{self}'"
385            + ("\n    (skipped dependencies)" if skip_deps else "")
386            + "."
387        )
388        success, abort = None, None
389
390        if is_same_version and not force:
391            success, msg = True, (
392                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
393                "    Install again with `-f` or `--force` to reinstall."
394            )
395            abort = True
396        elif is_new_version or force:
397            for src_dir, dirs, files in os.walk(temp_dir):
398                if success is not None:
399                    break
400                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
401                if not os.path.exists(dst_dir):
402                    os.mkdir(dst_dir)
403                for f in files:
404                    src_file = os.path.join(src_dir, f)
405                    dst_file = os.path.join(dst_dir, f)
406                    if os.path.exists(dst_file):
407                        os.remove(dst_file)
408
409                    if debug:
410                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
411                    try:
412                        shutil.move(src_file, dst_dir)
413                    except Exception:
414                        success, msg = False, (
415                            f"Failed to install plugin '{self}': " +
416                            f"Could not move file '{src_file}' to '{dst_dir}'"
417                        )
418                        print(msg)
419                        break
420            if success is None:
421                success, msg = True, success_msg
422        else:
423            success, msg = False, (
424                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
425                + f"attempted version {new_version}."
426            )
427
428        shutil.rmtree(temp_dir)
429        os.chdir(old_cwd)
430
431        ### Reload the plugin's module.
432        sync_plugins_symlinks(debug=debug)
433        if '_module' in self.__dict__:
434            del self.__dict__['_module']
435        init_venv(venv=self.name, force=True, debug=debug)
436        reload_meerschaum(debug=debug)
437
438        ### if we've already failed, return here
439        if not success or abort:
440            _ongoing_installations.remove(self.full_name)
441            return success, msg
442
443        ### attempt to install dependencies
444        dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
445        if not dependencies_installed:
446            _ongoing_installations.remove(self.full_name)
447            return False, f"Failed to install dependencies for plugin '{self}'."
448
449        ### handling success tuple, bool, or other (typically None)
450        setup_tuple = self.setup(debug=debug)
451        if isinstance(setup_tuple, tuple):
452            if not setup_tuple[0]:
453                success, msg = setup_tuple
454        elif isinstance(setup_tuple, bool):
455            if not setup_tuple:
456                success, msg = False, (
457                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
458                    f"Check `setup()` in '{self.__file__}' for more information " +
459                    "(no error message provided)."
460                )
461            else:
462                success, msg = True, success_msg
463        elif setup_tuple is None:
464            success = True
465            msg = (
466                f"Post-install for plugin '{self}' returned None. " +
467                "Assuming plugin successfully installed."
468            )
469            warn(msg)
470        else:
471            success = False
472            msg = (
473                f"Post-install for plugin '{self}' returned unexpected value " +
474                f"of type '{type(setup_tuple)}': {setup_tuple}"
475            )
476
477        _ongoing_installations.remove(self.full_name)
478        _ = self.module
479        return success, msg
480
481
482    def remove_archive(
483        self,        
484        debug: bool = False
485    ) -> SuccessTuple:
486        """Remove a plugin's archive file."""
487        if not self.archive_path.exists():
488            return True, f"Archive file for plugin '{self}' does not exist."
489        try:
490            self.archive_path.unlink()
491        except Exception as e:
492            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
493        return True, "Success"
494
495
496    def remove_venv(
497        self,        
498        debug: bool = False
499    ) -> SuccessTuple:
500        """Remove a plugin's virtual environment."""
501        if not self.venv_path.exists():
502            return True, f"Virtual environment for plugin '{self}' does not exist."
503        try:
504            shutil.rmtree(self.venv_path)
505        except Exception as e:
506            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
507        return True, "Success"
508
509
510    def uninstall(self, debug: bool = False) -> SuccessTuple:
511        """
512        Remove a plugin, its virtual environment, and archive file.
513        """
514        from meerschaum.utils.packages import reload_meerschaum
515        from meerschaum.plugins import sync_plugins_symlinks
516        from meerschaum.utils.warnings import warn, info
517        warnings_thrown_count: int = 0
518        max_warnings: int = 3
519
520        if not self.is_installed():
521            info(
522                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
523                + "Checking for artifacts...",
524                stack = False,
525            )
526        else:
527            real_path = pathlib.Path(os.path.realpath(self.__file__))
528            try:
529                if real_path.name == '__init__.py':
530                    shutil.rmtree(real_path.parent)
531                else:
532                    real_path.unlink()
533            except Exception as e:
534                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
535                warnings_thrown_count += 1
536            else:
537                info(f"Removed source files for plugin '{self.name}'.")
538
539        if self.venv_path.exists():
540            success, msg = self.remove_venv(debug=debug)
541            if not success:
542                warn(msg, stack=False)
543                warnings_thrown_count += 1
544            else:
545                info(f"Removed virtual environment from plugin '{self.name}'.")
546
547        success = warnings_thrown_count < max_warnings
548        sync_plugins_symlinks(debug=debug)
549        self.deactivate_venv(force=True, debug=debug)
550        reload_meerschaum(debug=debug)
551        return success, (
552            f"Successfully uninstalled plugin '{self}'." if success
553            else f"Failed to uninstall plugin '{self}'."
554        )
555
556
557    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
558        """
559        If exists, run the plugin's `setup()` function.
560
561        Parameters
562        ----------
563        *args: str
564            The positional arguments passed to the `setup()` function.
565            
566        debug: bool, default False
567            Verbosity toggle.
568
569        **kw: Any
570            The keyword arguments passed to the `setup()` function.
571
572        Returns
573        -------
574        A `SuccessTuple` or `bool` indicating success.
575
576        """
577        from meerschaum.utils.debug import dprint
578        import inspect
579        _setup = None
580        for name, fp in inspect.getmembers(self.module):
581            if name == 'setup' and inspect.isfunction(fp):
582                _setup = fp
583                break
584
585        ### assume success if no setup() is found (not necessary)
586        if _setup is None:
587            return True
588
589        sig = inspect.signature(_setup)
590        has_debug, has_kw = ('debug' in sig.parameters), False
591        for k, v in sig.parameters.items():
592            if '**' in str(v):
593                has_kw = True
594                break
595
596        _kw = {}
597        if has_kw:
598            _kw.update(kw)
599        if has_debug:
600            _kw['debug'] = debug
601
602        if debug:
603            dprint(f"Running setup for plugin '{self}'...")
604        try:
605            self.activate_venv(debug=debug)
606            return_tuple = _setup(*args, **_kw)
607            self.deactivate_venv(debug=debug)
608        except Exception as e:
609            return False, str(e)
610
611        if isinstance(return_tuple, tuple):
612            return return_tuple
613        if isinstance(return_tuple, bool):
614            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
615        if return_tuple is None:
616            return False, f"Setup for Plugin '{self.name}' returned None."
617        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"
618
619
620    def get_dependencies(
621        self,
622        debug: bool = False,
623    ) -> List[str]:
624        """
625        If the Plugin has specified dependencies in a list called `required`, return the list.
626        
627        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
628        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
629
630        Parameters
631        ----------
632        debug: bool, default False
633            Verbosity toggle.
634
635        Returns
636        -------
637        A list of required packages and plugins (str).
638
639        """
640        if '_required' in self.__dict__:
641            return self._required
642
643        ### If the plugin has not yet been imported,
644        ### infer the dependencies from the source text.
645        ### This is not super robust, and it doesn't feel right
646        ### having multiple versions of the logic.
647        ### This is necessary when determining the activation order
648        ### without having import the module.
649        ### For consistency's sake, the module-less method does not cache the requirements.
650        if self.__dict__.get('_module', None) is None:
651            file_path = self.__file__
652            if file_path is None:
653                return []
654            with open(file_path, 'r', encoding='utf-8') as f:
655                text = f.read()
656
657            if 'required' not in text:
658                return []
659
660            ### This has some limitations:
661            ### It relies on `required` being manually declared.
662            ### We lose the ability to dynamically alter the `required` list,
663            ### which is why we've kept the module-reliant method below.
664            import ast, re
665            ### NOTE: This technically would break 
666            ### if `required` was the very first line of the file.
667            req_start_match = re.search(r'\nrequired(:\s*)?.*=', text)
668            if not req_start_match:
669                return []
670            req_start = req_start_match.start()
671            equals_sign = req_start + text[req_start:].find('=')
672
673            ### Dependencies may have brackets within the strings, so push back the index.
674            first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[')
675            if first_opening_brace == -1:
676                return []
677
678            next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']')
679            if next_closing_brace == -1:
680                return []
681
682            start_ix = first_opening_brace + 1
683            end_ix = next_closing_brace
684
685            num_braces = 0
686            while True:
687                if '[' not in text[start_ix:end_ix]:
688                    break
689                num_braces += 1
690                start_ix = end_ix
691                end_ix += text[end_ix + 1:].find(']') + 1
692
693            req_end = end_ix + 1
694            req_text = (
695                text[(first_opening_brace-1):req_end]
696                .lstrip()
697                .replace('=', '', 1)
698                .lstrip()
699                .rstrip()
700            )
701            try:
702                required = ast.literal_eval(req_text)
703            except Exception as e:
704                warn(
705                    f"Unable to determine requirements for plugin '{self.name}' "
706                    + "without importing the module.\n"
707                    + "    This may be due to dynamically setting the global `required` list.\n"
708                    + f"    {e}"
709                )
710                return []
711            return required
712
713        import inspect
714        self.activate_venv(dependencies=False, debug=debug)
715        required = []
716        for name, val in inspect.getmembers(self.module):
717            if name == 'required':
718                required = val
719                break
720        self._required = required
721        self.deactivate_venv(dependencies=False, debug=debug)
722        return required
723
724
725    def get_required_plugins(self, debug: bool=False) -> List[mrsm.plugins.Plugin]:
726        """
727        Return a list of required Plugin objects.
728        """
729        from meerschaum.utils.warnings import warn
730        from meerschaum.config import get_config
731        from meerschaum._internal.static import STATIC_CONFIG
732        from meerschaum.connectors.parse import is_valid_connector_keys
733        plugins = []
734        _deps = self.get_dependencies(debug=debug)
735        sep = STATIC_CONFIG['plugins']['repo_separator']
736        plugin_names = [
737            _d[len('plugin:'):] for _d in _deps
738            if _d.startswith('plugin:') and len(_d) > len('plugin:')
739        ]
740        default_repo_keys = get_config('meerschaum', 'repository')
741        skipped_repo_keys = set()
742
743        for _plugin_name in plugin_names:
744            if sep in _plugin_name:
745                try:
746                    _plugin_name, _repo_keys = _plugin_name.split(sep)
747                except Exception:
748                    _repo_keys = default_repo_keys
749                    warn(
750                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
751                        + f"Will try to use '{_repo_keys}' instead.",
752                        stack = False,
753                    )
754            else:
755                _repo_keys = default_repo_keys
756
757            if _repo_keys in skipped_repo_keys:
758                continue
759
760            if not is_valid_connector_keys(_repo_keys):
761                warn(
762                    f"Invalid connector '{_repo_keys}'.\n"
763                    f"    Skipping required plugins from repository '{_repo_keys}'",
764                    stack=False,
765                )
766                continue
767
768            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
769
770        return plugins
771
772
773    def get_required_packages(self, debug: bool=False) -> List[str]:
774        """
775        Return the required package names (excluding plugins).
776        """
777        _deps = self.get_dependencies(debug=debug)
778        return [_d for _d in _deps if not _d.startswith('plugin:')]
779
780
781    def activate_venv(
782        self,
783        dependencies: bool = True,
784        init_if_not_exists: bool = True,
785        debug: bool = False,
786        **kw
787    ) -> bool:
788        """
789        Activate the virtual environments for the plugin and its dependencies.
790
791        Parameters
792        ----------
793        dependencies: bool, default True
794            If `True`, activate the virtual environments for required plugins.
795
796        Returns
797        -------
798        A bool indicating success.
799        """
800        import meerschaum.config.paths as paths
801        from meerschaum.utils.venv import venv_target_path
802        from meerschaum.utils.packages import activate_venv
803        from meerschaum.utils.misc import make_symlink, is_symlink
804
805        if dependencies:
806            for plugin in self.get_required_plugins(debug=debug):
807                plugin.activate_venv(debug=debug, init_if_not_exists=init_if_not_exists, **kw)
808
809        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
810        venv_meerschaum_path = vtp / 'meerschaum'
811
812        try:
813            success, msg = True, "Success"
814            if is_symlink(venv_meerschaum_path):
815                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != paths.PACKAGE_ROOT_PATH:
816                    venv_meerschaum_path.unlink()
817                    success, msg = make_symlink(venv_meerschaum_path, paths.PACKAGE_ROOT_PATH)
818        except Exception as e:
819            success, msg = False, str(e)
820        if not success:
821            warn(
822                f"Unable to create symlink {venv_meerschaum_path} to {paths.PACKAGE_ROOT_PATH}:\n"
823                f"{msg}"
824            )
825
826        return activate_venv(self.name, init_if_not_exists=init_if_not_exists, debug=debug, **kw)
827
828
829    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
830        """
831        Deactivate the virtual environments for the plugin and its dependencies.
832
833        Parameters
834        ----------
835        dependencies: bool, default True
836            If `True`, deactivate the virtual environments for required plugins.
837
838        Returns
839        -------
840        A bool indicating success.
841        """
842        from meerschaum.utils.packages import deactivate_venv
843        success = deactivate_venv(self.name, debug=debug, **kw)
844        if dependencies:
845            for plugin in self.get_required_plugins(debug=debug):
846                plugin.deactivate_venv(debug=debug, **kw)
847        return success
848
849
850    def install_dependencies(
851        self,
852        force: bool = False,
853        debug: bool = False,
854    ) -> bool:
855        """
856        If specified, install dependencies.
857        
858        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
859        Meerschaum plugins from the same repository as this Plugin.
860        To install from a different repository, add the repo keys after `'@'`
861        (e.g. `'plugin:foo@api:bar'`).
862
863        Parameters
864        ----------
865        force: bool, default False
866            If `True`, continue with the installation, even if some
867            required packages fail to install.
868
869        debug: bool, default False
870            Verbosity toggle.
871
872        Returns
873        -------
874        A bool indicating success.
875        """
876        from meerschaum.utils.packages import pip_install, venv_contains_package
877        from meerschaum.utils.warnings import warn, info
878        _deps = self.get_dependencies(debug=debug)
879        if not _deps and self.requirements_file_path is None:
880            return True
881
882        plugins = self.get_required_plugins(debug=debug)
883        for _plugin in plugins:
884            if _plugin.name == self.name:
885                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
886                continue
887            _success, _msg = _plugin.repo_connector.install_plugin(
888                _plugin.name, debug=debug, force=force
889            )
890            if not _success:
891                warn(
892                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
893                    + f" for plugin '{self.name}':\n" + _msg,
894                    stack = False,
895                )
896                if not force:
897                    warn(
898                        "Try installing with the `--force` flag to continue anyway.",
899                        stack = False,
900                    )
901                    return False
902                info(
903                    "Continuing with installation despite the failure "
904                    + "(careful, things might be broken!)...",
905                    icon = False
906                )
907
908
909        ### First step: parse `requirements.txt` if it exists.
910        if self.requirements_file_path is not None:
911            if not pip_install(
912                requirements_file_path=self.requirements_file_path,
913                venv=self.name, debug=debug
914            ):
915                warn(
916                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
917                    stack = False,
918                )
919                if not force:
920                    warn(
921                        "Try installing with `--force` to continue anyway.",
922                        stack = False,
923                    )
924                    return False
925                info(
926                    "Continuing with installation despite the failure "
927                    + "(careful, things might be broken!)...",
928                    icon = False
929                )
930
931
932        ### Don't reinstall packages that are already included in required plugins.
933        packages = []
934        _packages = self.get_required_packages(debug=debug)
935        accounted_for_packages = set()
936        for package_name in _packages:
937            for plugin in plugins:
938                if venv_contains_package(package_name, plugin.name):
939                    accounted_for_packages.add(package_name)
940                    break
941        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
942
943        ### Attempt pip packages installation.
944        if packages:
945            for package in packages:
946                if not pip_install(package, venv=self.name, debug=debug):
947                    warn(
948                        f"Failed to install required package '{package}'"
949                        + f" for plugin '{self.name}'.",
950                        stack = False,
951                    )
952                    if not force:
953                        warn(
954                            "Try installing with `--force` to continue anyway.",
955                            stack = False,
956                        )
957                        return False
958                    info(
959                        "Continuing with installation despite the failure "
960                        + "(careful, things might be broken!)...",
961                        icon = False
962                    )
963        return True
964
965
966    @property
967    def full_name(self) -> str:
968        """
969        Include the repo keys with the plugin's name.
970        """
971        from meerschaum._internal.static import STATIC_CONFIG
972        sep = STATIC_CONFIG['plugins']['repo_separator']
973        return self.name + sep + str(self.repo_connector)
974
975
976    def __str__(self):
977        return self.name
978
979
980    def __repr__(self):
981        return f"Plugin('{self.name}', repo='{self.repo_connector}')"
982
983
984    def __del__(self):
985        pass

Handle packaging of Meerschaum plugins.

Plugin( name: str, version: Optional[str] = None, user_id: Optional[int] = None, required: Optional[List[str]] = None, attributes: Optional[Dict[str, Any]] = None, archive_path: Optional[pathlib.Path] = None, venv_path: Optional[pathlib.Path] = None, repo_connector: Optional[meerschaum.connectors.APIConnector] = None, repo: Union[meerschaum.connectors.APIConnector, str, NoneType] = None)
33    def __init__(
34        self,
35        name: str,
36        version: Optional[str] = None,
37        user_id: Optional[int] = None,
38        required: Optional[List[str]] = None,
39        attributes: Optional[Dict[str, Any]] = None,
40        archive_path: Optional[pathlib.Path] = None,
41        venv_path: Optional[pathlib.Path] = None,
42        repo_connector: Optional['mrsm.connectors.api.APIConnector'] = None,
43        repo: Union['mrsm.connectors.api.APIConnector', str, None] = None,
44    ):
45        import meerschaum.config.paths as paths
46        from meerschaum._internal.static import STATIC_CONFIG
47        sep = STATIC_CONFIG['plugins']['repo_separator']
48        _repo = None
49        if sep in name:
50            try:
51                name, _repo = name.split(sep)
52            except Exception as e:
53                error(f"Invalid plugin name: '{name}'")
54        self._repo_in_name = _repo
55
56        if attributes is None:
57            attributes = {}
58        self.name = name
59        self.attributes = attributes
60        self.user_id = user_id
61        self._version = version
62        if required:
63            self._required = required
64        self.archive_path = (
65            archive_path if archive_path is not None
66            else paths.PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
67        )
68        self.venv_path = (
69            venv_path if venv_path is not None
70            else paths.VIRTENV_RESOURCES_PATH / self.name
71        )
72        self._repo_connector = repo_connector
73        self._repo_keys = repo
name
attributes
user_id
archive_path
venv_path
repo_connector
76    @property
77    def repo_connector(self):
78        """
79        Return the repository connector for this plugin.
80        NOTE: This imports the `connectors` module, which imports certain plugin modules.
81        """
82        if self._repo_connector is None:
83            from meerschaum.connectors.parse import parse_repo_keys
84
85            repo_keys = self._repo_keys or self._repo_in_name
86            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
87                error(
88                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
89                )
90            repo_connector = parse_repo_keys(repo_keys)
91            self._repo_connector = repo_connector
92        return self._repo_connector

Return the repository connector for this plugin. NOTE: This imports the connectors module, which imports certain plugin modules.

version
 95    @property
 96    def version(self):
 97        """
 98        Return the plugin's module version is defined (`__version__`) if it's defined.
 99        """
100        if self._version is None:
101            try:
102                self._version = self.module.__version__
103            except Exception as e:
104                self._version = None
105        return self._version

Return the plugin's module version is defined (__version__) if it's defined.

module
108    @property
109    def module(self):
110        """
111        Return the Python module of the underlying plugin.
112        """
113        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
114            if self.__file__ is None:
115                return None
116
117            from meerschaum.plugins import import_plugins
118            self._module = import_plugins(str(self), warn=False)
119
120        return self._module

Return the Python module of the underlying plugin.

requirements_file_path: Optional[pathlib.Path]
148    @property
149    def requirements_file_path(self) -> Union[pathlib.Path, None]:
150        """
151        If a file named `requirements.txt` exists, return its path.
152        """
153        if self.__file__ is None:
154            return None
155        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
156        if not path.exists():
157            return None
158        return path

If a file named requirements.txt exists, return its path.

def is_installed(self, **kw) -> bool:
161    def is_installed(self, **kw) -> bool:
162        """
163        Check whether a plugin is correctly installed.
164
165        Returns
166        -------
167        A `bool` indicating whether a plugin exists and is successfully imported.
168        """
169        return self.__file__ is not None

Check whether a plugin is correctly installed.

Returns
  • A bool indicating whether a plugin exists and is successfully imported.
def make_tar(self, debug: bool = False) -> pathlib.Path:
172    def make_tar(self, debug: bool = False) -> pathlib.Path:
173        """
174        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
175
176        Parameters
177        ----------
178        debug: bool, default False
179            Verbosity toggle.
180
181        Returns
182        -------
183        A `pathlib.Path` to the archive file's path.
184
185        """
186        import tarfile, pathlib, subprocess, fnmatch
187        from meerschaum.utils.debug import dprint
188        from meerschaum.utils.packages import attempt_import
189        pathspec = attempt_import('pathspec', debug=debug)
190
191        if not self.__file__:
192            from meerschaum.utils.warnings import error
193            error(f"Could not find file for plugin '{self}'.")
194        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
195            path = self.__file__.replace('__init__.py', '')
196            is_dir = True
197        else:
198            path = self.__file__
199            is_dir = False
200
201        old_cwd = os.getcwd()
202        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
203        os.chdir(real_parent_path)
204
205        default_patterns_to_ignore = [
206            '.pyc',
207            '__pycache__/',
208            'eggs/',
209            '__pypackages__/',
210            '.git',
211        ]
212
213        def parse_gitignore() -> 'Set[str]':
214            gitignore_path = pathlib.Path(path) / '.gitignore'
215            if not gitignore_path.exists():
216                return set(default_patterns_to_ignore)
217            with open(gitignore_path, 'r', encoding='utf-8') as f:
218                gitignore_text = f.read()
219            return set(pathspec.PathSpec.from_lines(
220                pathspec.patterns.GitWildMatchPattern,
221                default_patterns_to_ignore + gitignore_text.splitlines()
222            ).match_tree(path))
223
224        patterns_to_ignore = parse_gitignore() if is_dir else set()
225
226        if debug:
227            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
228
229        with tarfile.open(self.archive_path, 'w:gz') as tarf:
230            if not is_dir:
231                tarf.add(f"{self.name}.py")
232            else:
233                for root, dirs, files in os.walk(self.name):
234                    for f in files:
235                        good_file = True
236                        fp = os.path.join(root, f)
237                        for pattern in patterns_to_ignore:
238                            if pattern in str(fp) or f.startswith('.'):
239                                good_file = False
240                                break
241                        if good_file:
242                            if debug:
243                                dprint(f"Adding '{fp}'...")
244                            tarf.add(fp)
245
246        ### clean up and change back to old directory
247        os.chdir(old_cwd)
248
249        ### change to 775 to avoid permissions issues with the API in a Docker container
250        self.archive_path.chmod(0o775)
251
252        if debug:
253            dprint(f"Created archive '{self.archive_path}'.")
254        return self.archive_path

Compress the plugin's source files into a .tar.gz archive and return the archive's path.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pathlib.Path to the archive file's path.
def install( self, skip_deps: bool = False, force: bool = False, debug: bool = False) -> Tuple[bool, str]:
257    def install(
258        self,
259        skip_deps: bool = False,
260        force: bool = False,
261        debug: bool = False,
262    ) -> SuccessTuple:
263        """
264        Extract a plugin's tar archive to the plugins directory.
265        
266        This function checks if the plugin is already installed and if the version is equal or
267        greater than the existing installation.
268
269        Parameters
270        ----------
271        skip_deps: bool, default False
272            If `True`, do not install dependencies.
273
274        force: bool, default False
275            If `True`, continue with installation, even if required packages fail to install.
276
277        debug: bool, default False
278            Verbosity toggle.
279
280        Returns
281        -------
282        A `SuccessTuple` of success (bool) and a message (str).
283
284        """
285        if self.full_name in _ongoing_installations:
286            return True, f"Already installing plugin '{self}'."
287        _ongoing_installations.add(self.full_name)
288
289        import meerschaum.config.paths as paths
290        from meerschaum.utils.warnings import warn, error
291        if debug:
292            from meerschaum.utils.debug import dprint
293        import tarfile
294        import re
295        import ast
296        from meerschaum.plugins import sync_plugins_symlinks
297        from meerschaum.utils.packages import attempt_import, reload_meerschaum
298        from meerschaum.utils.venv import init_venv
299        from meerschaum.utils.misc import safely_extract_tar
300        old_cwd = os.getcwd()
301        old_version = ''
302        new_version = ''
303        temp_dir = paths.PLUGINS_TEMP_RESOURCES_PATH / self.name
304        temp_dir.mkdir(exist_ok=True)
305
306        if not self.archive_path.exists():
307            return False, f"Missing archive file for plugin '{self}'."
308        if self.version is not None:
309            old_version = self.version
310            if debug:
311                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
312
313        if debug:
314            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
315
316        try:
317            with tarfile.open(self.archive_path, 'r:gz') as tarf:
318                safely_extract_tar(tarf, temp_dir)
319        except Exception as e:
320            warn(e)
321            return False, f"Failed to extract plugin '{self.name}'."
322
323        ### search for version information
324        files = os.listdir(temp_dir)
325        
326        if str(files[0]) == self.name:
327            is_dir = True
328        elif str(files[0]) == self.name + '.py':
329            is_dir = False
330        else:
331            error(f"Unknown format encountered for plugin '{self}'.")
332
333        fpath = temp_dir / files[0]
334        if is_dir:
335            fpath = fpath / '__init__.py'
336
337        init_venv(self.name, debug=debug)
338        with open(fpath, 'r', encoding='utf-8') as f:
339            init_lines = f.readlines()
340        new_version = None
341        for line in init_lines:
342            if '__version__' not in line:
343                continue
344            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
345            if not version_match:
346                continue
347            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
348            break
349        if not new_version:
350            warn(
351                f"No `__version__` defined for plugin '{self}'. "
352                + "Assuming new version...",
353                stack = False,
354            )
355
356        packaging_version = attempt_import('packaging.version')
357        try:
358            is_new_version = (not new_version and not old_version) or (
359                packaging_version.parse(old_version) < packaging_version.parse(new_version)
360            )
361            is_same_version = new_version and old_version and (
362                packaging_version.parse(old_version) == packaging_version.parse(new_version)
363            )
364        except Exception:
365            is_new_version, is_same_version = True, False
366
367        ### Determine where to permanently store the new plugin.
368        plugin_installation_dir_path = paths.PLUGINS_DIR_PATHS[0]
369        for path in paths.PLUGINS_DIR_PATHS:
370            if not path.exists():
371                warn(f"Plugins path does not exist: {path}", stack=False)
372                continue
373
374            files_in_plugins_dir = os.listdir(path)
375            if (
376                self.name in files_in_plugins_dir
377                or
378                (self.name + '.py') in files_in_plugins_dir
379            ):
380                plugin_installation_dir_path = path
381                break
382
383        success_msg = (
384            f"Successfully installed plugin '{self}'"
385            + ("\n    (skipped dependencies)" if skip_deps else "")
386            + "."
387        )
388        success, abort = None, None
389
390        if is_same_version and not force:
391            success, msg = True, (
392                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
393                "    Install again with `-f` or `--force` to reinstall."
394            )
395            abort = True
396        elif is_new_version or force:
397            for src_dir, dirs, files in os.walk(temp_dir):
398                if success is not None:
399                    break
400                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
401                if not os.path.exists(dst_dir):
402                    os.mkdir(dst_dir)
403                for f in files:
404                    src_file = os.path.join(src_dir, f)
405                    dst_file = os.path.join(dst_dir, f)
406                    if os.path.exists(dst_file):
407                        os.remove(dst_file)
408
409                    if debug:
410                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
411                    try:
412                        shutil.move(src_file, dst_dir)
413                    except Exception:
414                        success, msg = False, (
415                            f"Failed to install plugin '{self}': " +
416                            f"Could not move file '{src_file}' to '{dst_dir}'"
417                        )
418                        print(msg)
419                        break
420            if success is None:
421                success, msg = True, success_msg
422        else:
423            success, msg = False, (
424                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
425                + f"attempted version {new_version}."
426            )
427
428        shutil.rmtree(temp_dir)
429        os.chdir(old_cwd)
430
431        ### Reload the plugin's module.
432        sync_plugins_symlinks(debug=debug)
433        if '_module' in self.__dict__:
434            del self.__dict__['_module']
435        init_venv(venv=self.name, force=True, debug=debug)
436        reload_meerschaum(debug=debug)
437
438        ### if we've already failed, return here
439        if not success or abort:
440            _ongoing_installations.remove(self.full_name)
441            return success, msg
442
443        ### attempt to install dependencies
444        dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
445        if not dependencies_installed:
446            _ongoing_installations.remove(self.full_name)
447            return False, f"Failed to install dependencies for plugin '{self}'."
448
449        ### handling success tuple, bool, or other (typically None)
450        setup_tuple = self.setup(debug=debug)
451        if isinstance(setup_tuple, tuple):
452            if not setup_tuple[0]:
453                success, msg = setup_tuple
454        elif isinstance(setup_tuple, bool):
455            if not setup_tuple:
456                success, msg = False, (
457                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
458                    f"Check `setup()` in '{self.__file__}' for more information " +
459                    "(no error message provided)."
460                )
461            else:
462                success, msg = True, success_msg
463        elif setup_tuple is None:
464            success = True
465            msg = (
466                f"Post-install for plugin '{self}' returned None. " +
467                "Assuming plugin successfully installed."
468            )
469            warn(msg)
470        else:
471            success = False
472            msg = (
473                f"Post-install for plugin '{self}' returned unexpected value " +
474                f"of type '{type(setup_tuple)}': {setup_tuple}"
475            )
476
477        _ongoing_installations.remove(self.full_name)
478        _ = self.module
479        return success, msg

Extract a plugin's tar archive to the plugins directory.

This function checks if the plugin is already installed and if the version is equal or greater than the existing installation.

Parameters
  • skip_deps (bool, default False): If True, do not install dependencies.
  • force (bool, default False): If True, continue with installation, even if required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success (bool) and a message (str).
def remove_archive(self, debug: bool = False) -> Tuple[bool, str]:
482    def remove_archive(
483        self,        
484        debug: bool = False
485    ) -> SuccessTuple:
486        """Remove a plugin's archive file."""
487        if not self.archive_path.exists():
488            return True, f"Archive file for plugin '{self}' does not exist."
489        try:
490            self.archive_path.unlink()
491        except Exception as e:
492            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
493        return True, "Success"

Remove a plugin's archive file.

def remove_venv(self, debug: bool = False) -> Tuple[bool, str]:
496    def remove_venv(
497        self,        
498        debug: bool = False
499    ) -> SuccessTuple:
500        """Remove a plugin's virtual environment."""
501        if not self.venv_path.exists():
502            return True, f"Virtual environment for plugin '{self}' does not exist."
503        try:
504            shutil.rmtree(self.venv_path)
505        except Exception as e:
506            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
507        return True, "Success"

Remove a plugin's virtual environment.

def uninstall(self, debug: bool = False) -> Tuple[bool, str]:
510    def uninstall(self, debug: bool = False) -> SuccessTuple:
511        """
512        Remove a plugin, its virtual environment, and archive file.
513        """
514        from meerschaum.utils.packages import reload_meerschaum
515        from meerschaum.plugins import sync_plugins_symlinks
516        from meerschaum.utils.warnings import warn, info
517        warnings_thrown_count: int = 0
518        max_warnings: int = 3
519
520        if not self.is_installed():
521            info(
522                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
523                + "Checking for artifacts...",
524                stack = False,
525            )
526        else:
527            real_path = pathlib.Path(os.path.realpath(self.__file__))
528            try:
529                if real_path.name == '__init__.py':
530                    shutil.rmtree(real_path.parent)
531                else:
532                    real_path.unlink()
533            except Exception as e:
534                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
535                warnings_thrown_count += 1
536            else:
537                info(f"Removed source files for plugin '{self.name}'.")
538
539        if self.venv_path.exists():
540            success, msg = self.remove_venv(debug=debug)
541            if not success:
542                warn(msg, stack=False)
543                warnings_thrown_count += 1
544            else:
545                info(f"Removed virtual environment from plugin '{self.name}'.")
546
547        success = warnings_thrown_count < max_warnings
548        sync_plugins_symlinks(debug=debug)
549        self.deactivate_venv(force=True, debug=debug)
550        reload_meerschaum(debug=debug)
551        return success, (
552            f"Successfully uninstalled plugin '{self}'." if success
553            else f"Failed to uninstall plugin '{self}'."
554        )

Remove a plugin, its virtual environment, and archive file.

def setup( self, *args: str, debug: bool = False, **kw: Any) -> Union[Tuple[bool, str], bool]:
557    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
558        """
559        If exists, run the plugin's `setup()` function.
560
561        Parameters
562        ----------
563        *args: str
564            The positional arguments passed to the `setup()` function.
565            
566        debug: bool, default False
567            Verbosity toggle.
568
569        **kw: Any
570            The keyword arguments passed to the `setup()` function.
571
572        Returns
573        -------
574        A `SuccessTuple` or `bool` indicating success.
575
576        """
577        from meerschaum.utils.debug import dprint
578        import inspect
579        _setup = None
580        for name, fp in inspect.getmembers(self.module):
581            if name == 'setup' and inspect.isfunction(fp):
582                _setup = fp
583                break
584
585        ### assume success if no setup() is found (not necessary)
586        if _setup is None:
587            return True
588
589        sig = inspect.signature(_setup)
590        has_debug, has_kw = ('debug' in sig.parameters), False
591        for k, v in sig.parameters.items():
592            if '**' in str(v):
593                has_kw = True
594                break
595
596        _kw = {}
597        if has_kw:
598            _kw.update(kw)
599        if has_debug:
600            _kw['debug'] = debug
601
602        if debug:
603            dprint(f"Running setup for plugin '{self}'...")
604        try:
605            self.activate_venv(debug=debug)
606            return_tuple = _setup(*args, **_kw)
607            self.deactivate_venv(debug=debug)
608        except Exception as e:
609            return False, str(e)
610
611        if isinstance(return_tuple, tuple):
612            return return_tuple
613        if isinstance(return_tuple, bool):
614            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
615        if return_tuple is None:
616            return False, f"Setup for Plugin '{self.name}' returned None."
617        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"

If exists, run the plugin's setup() function.

Parameters
  • *args (str): The positional arguments passed to the setup() function.
  • debug (bool, default False): Verbosity toggle.
  • **kw (Any): The keyword arguments passed to the setup() function.
Returns
  • A SuccessTuple or bool indicating success.
def get_dependencies(self, debug: bool = False) -> List[str]:
620    def get_dependencies(
621        self,
622        debug: bool = False,
623    ) -> List[str]:
624        """
625        If the Plugin has specified dependencies in a list called `required`, return the list.
626        
627        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
628        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
629
630        Parameters
631        ----------
632        debug: bool, default False
633            Verbosity toggle.
634
635        Returns
636        -------
637        A list of required packages and plugins (str).
638
639        """
640        if '_required' in self.__dict__:
641            return self._required
642
643        ### If the plugin has not yet been imported,
644        ### infer the dependencies from the source text.
645        ### This is not super robust, and it doesn't feel right
646        ### having multiple versions of the logic.
647        ### This is necessary when determining the activation order
648        ### without having import the module.
649        ### For consistency's sake, the module-less method does not cache the requirements.
650        if self.__dict__.get('_module', None) is None:
651            file_path = self.__file__
652            if file_path is None:
653                return []
654            with open(file_path, 'r', encoding='utf-8') as f:
655                text = f.read()
656
657            if 'required' not in text:
658                return []
659
660            ### This has some limitations:
661            ### It relies on `required` being manually declared.
662            ### We lose the ability to dynamically alter the `required` list,
663            ### which is why we've kept the module-reliant method below.
664            import ast, re
665            ### NOTE: This technically would break 
666            ### if `required` was the very first line of the file.
667            req_start_match = re.search(r'\nrequired(:\s*)?.*=', text)
668            if not req_start_match:
669                return []
670            req_start = req_start_match.start()
671            equals_sign = req_start + text[req_start:].find('=')
672
673            ### Dependencies may have brackets within the strings, so push back the index.
674            first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[')
675            if first_opening_brace == -1:
676                return []
677
678            next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']')
679            if next_closing_brace == -1:
680                return []
681
682            start_ix = first_opening_brace + 1
683            end_ix = next_closing_brace
684
685            num_braces = 0
686            while True:
687                if '[' not in text[start_ix:end_ix]:
688                    break
689                num_braces += 1
690                start_ix = end_ix
691                end_ix += text[end_ix + 1:].find(']') + 1
692
693            req_end = end_ix + 1
694            req_text = (
695                text[(first_opening_brace-1):req_end]
696                .lstrip()
697                .replace('=', '', 1)
698                .lstrip()
699                .rstrip()
700            )
701            try:
702                required = ast.literal_eval(req_text)
703            except Exception as e:
704                warn(
705                    f"Unable to determine requirements for plugin '{self.name}' "
706                    + "without importing the module.\n"
707                    + "    This may be due to dynamically setting the global `required` list.\n"
708                    + f"    {e}"
709                )
710                return []
711            return required
712
713        import inspect
714        self.activate_venv(dependencies=False, debug=debug)
715        required = []
716        for name, val in inspect.getmembers(self.module):
717            if name == 'required':
718                required = val
719                break
720        self._required = required
721        self.deactivate_venv(dependencies=False, debug=debug)
722        return required

If the Plugin has specified dependencies in a list called required, return the list.

NOTE: Dependecies which start with 'plugin:' are Meerschaum plugins, not pip packages. Meerschaum plugins may also specify connector keys for a repo after '@'.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of required packages and plugins (str).
def get_required_plugins(self, debug: bool = False) -> List[Plugin]:
725    def get_required_plugins(self, debug: bool=False) -> List[mrsm.plugins.Plugin]:
726        """
727        Return a list of required Plugin objects.
728        """
729        from meerschaum.utils.warnings import warn
730        from meerschaum.config import get_config
731        from meerschaum._internal.static import STATIC_CONFIG
732        from meerschaum.connectors.parse import is_valid_connector_keys
733        plugins = []
734        _deps = self.get_dependencies(debug=debug)
735        sep = STATIC_CONFIG['plugins']['repo_separator']
736        plugin_names = [
737            _d[len('plugin:'):] for _d in _deps
738            if _d.startswith('plugin:') and len(_d) > len('plugin:')
739        ]
740        default_repo_keys = get_config('meerschaum', 'repository')
741        skipped_repo_keys = set()
742
743        for _plugin_name in plugin_names:
744            if sep in _plugin_name:
745                try:
746                    _plugin_name, _repo_keys = _plugin_name.split(sep)
747                except Exception:
748                    _repo_keys = default_repo_keys
749                    warn(
750                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
751                        + f"Will try to use '{_repo_keys}' instead.",
752                        stack = False,
753                    )
754            else:
755                _repo_keys = default_repo_keys
756
757            if _repo_keys in skipped_repo_keys:
758                continue
759
760            if not is_valid_connector_keys(_repo_keys):
761                warn(
762                    f"Invalid connector '{_repo_keys}'.\n"
763                    f"    Skipping required plugins from repository '{_repo_keys}'",
764                    stack=False,
765                )
766                continue
767
768            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
769
770        return plugins

Return a list of required Plugin objects.

def get_required_packages(self, debug: bool = False) -> List[str]:
773    def get_required_packages(self, debug: bool=False) -> List[str]:
774        """
775        Return the required package names (excluding plugins).
776        """
777        _deps = self.get_dependencies(debug=debug)
778        return [_d for _d in _deps if not _d.startswith('plugin:')]

Return the required package names (excluding plugins).

def activate_venv( self, dependencies: bool = True, init_if_not_exists: bool = True, debug: bool = False, **kw) -> bool:
781    def activate_venv(
782        self,
783        dependencies: bool = True,
784        init_if_not_exists: bool = True,
785        debug: bool = False,
786        **kw
787    ) -> bool:
788        """
789        Activate the virtual environments for the plugin and its dependencies.
790
791        Parameters
792        ----------
793        dependencies: bool, default True
794            If `True`, activate the virtual environments for required plugins.
795
796        Returns
797        -------
798        A bool indicating success.
799        """
800        import meerschaum.config.paths as paths
801        from meerschaum.utils.venv import venv_target_path
802        from meerschaum.utils.packages import activate_venv
803        from meerschaum.utils.misc import make_symlink, is_symlink
804
805        if dependencies:
806            for plugin in self.get_required_plugins(debug=debug):
807                plugin.activate_venv(debug=debug, init_if_not_exists=init_if_not_exists, **kw)
808
809        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
810        venv_meerschaum_path = vtp / 'meerschaum'
811
812        try:
813            success, msg = True, "Success"
814            if is_symlink(venv_meerschaum_path):
815                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != paths.PACKAGE_ROOT_PATH:
816                    venv_meerschaum_path.unlink()
817                    success, msg = make_symlink(venv_meerschaum_path, paths.PACKAGE_ROOT_PATH)
818        except Exception as e:
819            success, msg = False, str(e)
820        if not success:
821            warn(
822                f"Unable to create symlink {venv_meerschaum_path} to {paths.PACKAGE_ROOT_PATH}:\n"
823                f"{msg}"
824            )
825
826        return activate_venv(self.name, init_if_not_exists=init_if_not_exists, debug=debug, **kw)

Activate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, activate the virtual environments for required plugins.
Returns
  • A bool indicating success.
def deactivate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
829    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
830        """
831        Deactivate the virtual environments for the plugin and its dependencies.
832
833        Parameters
834        ----------
835        dependencies: bool, default True
836            If `True`, deactivate the virtual environments for required plugins.
837
838        Returns
839        -------
840        A bool indicating success.
841        """
842        from meerschaum.utils.packages import deactivate_venv
843        success = deactivate_venv(self.name, debug=debug, **kw)
844        if dependencies:
845            for plugin in self.get_required_plugins(debug=debug):
846                plugin.deactivate_venv(debug=debug, **kw)
847        return success

Deactivate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, deactivate the virtual environments for required plugins.
Returns
  • A bool indicating success.
def install_dependencies(self, force: bool = False, debug: bool = False) -> bool:
850    def install_dependencies(
851        self,
852        force: bool = False,
853        debug: bool = False,
854    ) -> bool:
855        """
856        If specified, install dependencies.
857        
858        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
859        Meerschaum plugins from the same repository as this Plugin.
860        To install from a different repository, add the repo keys after `'@'`
861        (e.g. `'plugin:foo@api:bar'`).
862
863        Parameters
864        ----------
865        force: bool, default False
866            If `True`, continue with the installation, even if some
867            required packages fail to install.
868
869        debug: bool, default False
870            Verbosity toggle.
871
872        Returns
873        -------
874        A bool indicating success.
875        """
876        from meerschaum.utils.packages import pip_install, venv_contains_package
877        from meerschaum.utils.warnings import warn, info
878        _deps = self.get_dependencies(debug=debug)
879        if not _deps and self.requirements_file_path is None:
880            return True
881
882        plugins = self.get_required_plugins(debug=debug)
883        for _plugin in plugins:
884            if _plugin.name == self.name:
885                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
886                continue
887            _success, _msg = _plugin.repo_connector.install_plugin(
888                _plugin.name, debug=debug, force=force
889            )
890            if not _success:
891                warn(
892                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
893                    + f" for plugin '{self.name}':\n" + _msg,
894                    stack = False,
895                )
896                if not force:
897                    warn(
898                        "Try installing with the `--force` flag to continue anyway.",
899                        stack = False,
900                    )
901                    return False
902                info(
903                    "Continuing with installation despite the failure "
904                    + "(careful, things might be broken!)...",
905                    icon = False
906                )
907
908
909        ### First step: parse `requirements.txt` if it exists.
910        if self.requirements_file_path is not None:
911            if not pip_install(
912                requirements_file_path=self.requirements_file_path,
913                venv=self.name, debug=debug
914            ):
915                warn(
916                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
917                    stack = False,
918                )
919                if not force:
920                    warn(
921                        "Try installing with `--force` to continue anyway.",
922                        stack = False,
923                    )
924                    return False
925                info(
926                    "Continuing with installation despite the failure "
927                    + "(careful, things might be broken!)...",
928                    icon = False
929                )
930
931
932        ### Don't reinstall packages that are already included in required plugins.
933        packages = []
934        _packages = self.get_required_packages(debug=debug)
935        accounted_for_packages = set()
936        for package_name in _packages:
937            for plugin in plugins:
938                if venv_contains_package(package_name, plugin.name):
939                    accounted_for_packages.add(package_name)
940                    break
941        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
942
943        ### Attempt pip packages installation.
944        if packages:
945            for package in packages:
946                if not pip_install(package, venv=self.name, debug=debug):
947                    warn(
948                        f"Failed to install required package '{package}'"
949                        + f" for plugin '{self.name}'.",
950                        stack = False,
951                    )
952                    if not force:
953                        warn(
954                            "Try installing with `--force` to continue anyway.",
955                            stack = False,
956                        )
957                        return False
958                    info(
959                        "Continuing with installation despite the failure "
960                        + "(careful, things might be broken!)...",
961                        icon = False
962                    )
963        return True

If specified, install dependencies.

NOTE: Dependencies that start with 'plugin:' will be installed as Meerschaum plugins from the same repository as this Plugin. To install from a different repository, add the repo keys after '@' (e.g. 'plugin:foo@api:bar').

Parameters
  • force (bool, default False): If True, continue with the installation, even if some required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating success.
full_name: str
966    @property
967    def full_name(self) -> str:
968        """
969        Include the repo keys with the plugin's name.
970        """
971        from meerschaum._internal.static import STATIC_CONFIG
972        sep = STATIC_CONFIG['plugins']['repo_separator']
973        return self.name + sep + str(self.repo_connector)

Include the repo keys with the plugin's name.

def make_action( function: Optional[Callable[[Any], Any]] = None, shell: bool = False, activate: bool = True, deactivate: bool = True, debug: bool = False, daemon: bool = True, skip_if_loaded: bool = True, _plugin_name: Optional[str] = None) -> Callable[[Any], Any]:
 60def make_action(
 61    function: Optional[Callable[[Any], Any]] = None,
 62    shell: bool = False,
 63    activate: bool = True,
 64    deactivate: bool = True,
 65    debug: bool = False,
 66    daemon: bool = True,
 67    skip_if_loaded: bool = True,
 68    _plugin_name: Optional[str] = None,
 69) -> Callable[[Any], Any]:
 70    """
 71    Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.
 72    
 73    Parameters
 74    ----------
 75    function: Callable[[Any], Any]
 76        The function to become a Meerschaum action. Must accept all keyword arguments.
 77        
 78    shell: bool, default False
 79        Not used.
 80        
 81    Returns
 82    -------
 83    Another function (this is a decorator function).
 84
 85    Examples
 86    --------
 87    >>> from meerschaum.plugins import make_action
 88    >>>
 89    >>> @make_action
 90    ... def my_action(**kw):
 91    ...     print('foo')
 92    ...     return True, "Success"
 93    >>>
 94    """
 95    def _decorator(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
 96        from meerschaum.actions import actions, _custom_actions_plugins, _plugins_actions
 97        if skip_if_loaded and func.__name__ in actions:
 98            return func
 99
100        import meerschaum.config.paths as paths
101        plugin_name = (
102            func.__name__.split(
103                f"{paths.PLUGINS_RESOURCES_PATH.stem}.",
104                maxsplit=1,
105            )[-1].split('.')[0]
106        )
107        plugin = Plugin(plugin_name) if plugin_name else None
108
109        if debug:
110            from meerschaum.utils.debug import dprint
111            dprint(
112                f"Adding action '{func.__name__}' from plugin "
113                f"'{plugin}'..."
114            )
115
116        actions[func.__name__] = func
117        _custom_actions_plugins[func.__name__] = plugin_name
118        if plugin_name not in _plugins_actions:
119            _plugins_actions[plugin_name] = []
120        _plugins_actions[plugin_name].append(func.__name__)
121        if not daemon:
122            _actions_daemon_enabled[func.__name__] = False
123        return func
124
125    if function is None:
126        return _decorator
127    return _decorator(function)

Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.

Parameters
  • function (Callable[[Any], Any]): The function to become a Meerschaum action. Must accept all keyword arguments.
  • shell (bool, default False): Not used.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import make_action
>>>
>>> @make_action
... def my_action(**kw):
...     print('foo')
...     return True, "Success"
>>>
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
298def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
299    """
300    Execute the function when initializing the Meerschaum API module.
301    Useful for lazy-loading heavy plugins only when the API is started,
302    such as when editing the `meerschaum.api.app` FastAPI app.
303    
304    The FastAPI app will be passed as the only parameter.
305    
306    Examples
307    --------
308    >>> from meerschaum.plugins import api_plugin
309    >>>
310    >>> @api_plugin
311    >>> def initialize_plugin(app):
312    ...     @app.get('/my/new/path')
313    ...     def new_path():
314    ...         return {'message': 'It works!'}
315    >>>
316    """
317    with _locks['_api_plugins']:
318        try:
319            if function.__module__ not in _api_plugins:
320                _api_plugins[function.__module__] = []
321            _api_plugins[function.__module__].append(function)
322        except Exception as e:
323            from meerschaum.utils.warnings import warn
324            warn(e)
325    return function

Execute the function when initializing the Meerschaum API module. Useful for lazy-loading heavy plugins only when the API is started, such as when editing the meerschaum.api.app FastAPI app.

The FastAPI app will be passed as the only parameter.

Examples
>>> from meerschaum.plugins import api_plugin
>>>
>>> @api_plugin
>>> def initialize_plugin(app):
...     @app.get('/my/new/path')
...     def new_path():
...         return {'message': 'It works!'}
>>>
def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
282def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
283    """
284    Execute the function when starting the Dash application.
285    """
286    with _locks['_dash_plugins']:
287        plugin_name = _get_parent_plugin()
288        try:
289            if plugin_name not in _dash_plugins:
290                _dash_plugins[plugin_name] = []
291            _dash_plugins[plugin_name].append(function)
292        except Exception as e:
293            from meerschaum.utils.warnings import warn
294            warn(e)
295    return function

Execute the function when starting the Dash application.

def web_page( page: Union[str, NoneType, Callable[[Any], Any]] = None, login_required: bool = True, skip_navbar: bool = False, page_group: Optional[str] = None, **kwargs) -> Any:
208def web_page(
209    page: Union[str, None, Callable[[Any], Any]] = None,
210    login_required: bool = True,
211    skip_navbar: bool = False,
212    page_group: Optional[str] = None,
213    **kwargs
214) -> Any:
215    """
216    Quickly add pages to the dash application.
217
218    Examples
219    --------
220    >>> import meerschaum as mrsm
221    >>> from meerschaum.plugins import web_page
222    >>> html = mrsm.attempt_import('dash.html')
223    >>> 
224    >>> @web_page('foo/bar', login_required=False)
225    >>> def foo_bar():
226    ...     return html.Div([html.H1("Hello, World!")])
227    >>> 
228    """
229    page_str = None
230
231    def _decorator(_func: Callable[[Any], Any]) -> Callable[[Any], Any]:
232        nonlocal page_str, page_group
233
234        @functools.wraps(_func)
235        def wrapper(*_args, **_kwargs):
236            return _func(*_args, **_kwargs)
237
238        if page_str is None:
239            page_str = _func.__name__
240
241        page_str = page_str.lstrip('/').rstrip('/').strip()
242        if not page_str.startswith('dash'):
243            page_str = f'/dash/{page_str}'
244        page_key = (
245            ' '.join(
246                [
247                    word.capitalize()
248                    for word in (
249                        page_str.replace('/dash', '').lstrip('/').rstrip('/').strip()
250                        .replace('-', ' ').replace('_', ' ').split(' ')
251                    )
252                ]
253            )
254        )
255 
256        plugin_name = _get_parent_plugin()
257        page_group = page_group or plugin_name
258        if page_group not in _plugin_endpoints_to_pages:
259            _plugin_endpoints_to_pages[page_group] = {}
260        _plugin_endpoints_to_pages[page_group][page_str] = {
261            'function': _func,
262            'login_required': login_required,
263            'skip_navbar': skip_navbar,
264            'page_key': page_key,
265        }
266        if plugin_name not in _plugins_web_pages:
267            _plugins_web_pages[plugin_name] = []
268        _plugins_web_pages[plugin_name].append(_func)
269        return wrapper
270
271    if callable(page):
272        decorator_to_return = _decorator(page)
273        page_str = page.__name__
274    else:
275        decorator_to_return = _decorator
276        page_str = page
277
278    return decorator_to_return

Quickly add pages to the dash application.

Examples
>>> import meerschaum as mrsm
>>> from meerschaum.plugins import web_page
>>> html = mrsm.attempt_import('dash.html')
>>> 
>>> @web_page('foo/bar', login_required=False)
>>> def foo_bar():
...     return html.Div([html.H1("Hello, World!")])
>>>
def import_plugins( *plugins_to_import: Union[str, List[str], NoneType], warn: bool = True) -> "Union['ModuleType', Tuple['ModuleType', None]]":
514def import_plugins(
515    *plugins_to_import: Union[str, List[str], None],
516    warn: bool = True,
517) -> Union[
518    'ModuleType', Tuple['ModuleType', None]
519]:
520    """
521    Import the Meerschaum plugins directory.
522
523    Parameters
524    ----------
525    plugins_to_import: Union[str, List[str], None]
526        If provided, only import the specified plugins.
527        Otherwise import the entire plugins module. May be a string, list, or `None`.
528        Defaults to `None`.
529
530    Returns
531    -------
532    A module of list of modules, depening on the number of plugins provided.
533
534    """
535    import sys
536    import importlib
537    import meerschaum.config.paths as paths
538    from meerschaum.utils.misc import flatten_list
539    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
540    from meerschaum.utils.warnings import warn as _warn
541    plugins_to_import = list(plugins_to_import)
542    prepended_sys_path = False
543    with _locks['sys.path']:
544
545        ### Since plugins may depend on other plugins,
546        ### we need to activate the virtual environments for library plugins.
547        ### This logic exists in `Plugin.activate_venv()`,
548        ### but that code requires the plugin's module to already be imported.
549        ### It's not a guarantee of correct activation order,
550        ### e.g. if a library plugin pins a specific package and another 
551        plugins_names = get_plugins_names()
552        already_active_venvs = [is_venv_active(plugin_name) for plugin_name in plugins_names]
553
554        if not sys.path or sys.path[0] != str(paths.PLUGINS_RESOURCES_PATH.parent):
555            prepended_sys_path = True
556            sys.path.insert(0, str(paths.PLUGINS_RESOURCES_PATH.parent))
557
558        if not plugins_to_import:
559            for plugin_name in plugins_names:
560                activate_venv(plugin_name)
561            try:
562                imported_plugins = importlib.import_module(paths.PLUGINS_RESOURCES_PATH.stem)
563            except ImportError as e:
564                _warn(f"Failed to import the plugins module:\n    {e}")
565                import traceback
566                traceback.print_exc()
567                imported_plugins = None
568            for plugin_name in plugins_names:
569                if plugin_name in already_active_venvs:
570                    continue
571                deactivate_venv(plugin_name)
572
573        else:
574            imported_plugins = []
575            for plugin_name in flatten_list(plugins_to_import):
576                plugin = Plugin(plugin_name)
577                try:
578                    with Venv(plugin, init_if_not_exists=False):
579                        imported_plugins.append(
580                            importlib.import_module(
581                                f'{paths.PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
582                            )
583                        )
584                except Exception as e:
585                    _warn(
586                        f"Failed to import plugin '{plugin_name}':\n    "
587                        + f"{e}\n\nHere's a stacktrace:",
588                        stack = False,
589                    )
590                    from meerschaum.utils.formatting import get_console
591                    get_console().print_exception(
592                        suppress = [
593                            'meerschaum/plugins/__init__.py',
594                            importlib,
595                            importlib._bootstrap,
596                        ]
597                    )
598                    imported_plugins.append(None)
599
600        if imported_plugins is None and warn:
601            _warn("Failed to import plugins.", stacklevel=3)
602
603        if prepended_sys_path and str(paths.PLUGINS_RESOURCES_PATH.parent) in sys.path:
604            sys.path.remove(str(paths.PLUGINS_RESOURCES_PATH.parent))
605
606    if isinstance(imported_plugins, list):
607        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
608    return imported_plugins

Import the Meerschaum plugins directory.

Parameters
  • plugins_to_import (Union[str, List[str], None]): If provided, only import the specified plugins. Otherwise import the entire plugins module. May be a string, list, or None. Defaults to None.
Returns
  • A module of list of modules, depening on the number of plugins provided.
def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
611def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
612    """
613    Emulate the `from module import x` behavior.
614
615    Parameters
616    ----------
617    plugin_import_name: str
618        The import name of the plugin's module.
619        Separate submodules with '.' (e.g. 'compose.utils.pipes')
620
621    attrs: str
622        Names of the attributes to return.
623
624    Returns
625    -------
626    Objects from a plugin's submodule.
627    If multiple objects are provided, return a tuple.
628
629    Examples
630    --------
631    >>> init = from_plugin_import('compose.utils', 'init')
632    >>> with mrsm.Venv('compose'):
633    ...     cf = init()
634    >>> build_parent_pipe, get_defined_pipes = from_plugin_import(
635    ...     'compose.utils.pipes',
636    ...     'build_parent_pipe',
637    ...     'get_defined_pipes',
638    ... )
639    >>> parent_pipe = build_parent_pipe(cf)
640    >>> defined_pipes = get_defined_pipes(cf)
641    """
642    import importlib
643    import meerschaum.config.paths as paths
644    from meerschaum.utils.warnings import warn as _warn
645    if plugin_import_name.startswith('plugins.'):
646        plugin_import_name = plugin_import_name[len('plugins.'):]
647    plugin_import_parts = plugin_import_name.split('.')
648    plugin_root_name = plugin_import_parts[0]
649    plugin = mrsm.Plugin(plugin_root_name)
650
651    submodule_import_name = '.'.join(
652        [paths.PLUGINS_RESOURCES_PATH.stem]
653        + plugin_import_parts
654    )
655    if len(attrs) == 0:
656        raise ValueError(f"Provide which attributes to return from '{submodule_import_name}'.")
657
658    attrs_to_return = []
659    with mrsm.Venv(plugin):
660        if plugin.module is None:
661            raise ImportError(f"Unable to import plugin '{plugin}'.")
662
663        try:
664            submodule = importlib.import_module(submodule_import_name)
665        except ImportError as e:
666            _warn(
667                f"Failed to import plugin '{submodule_import_name}':\n    "
668                + f"{e}\n\nHere's a stacktrace:",
669                stack=False,
670            )
671            from meerschaum.utils.formatting import get_console
672            get_console().print_exception(
673                suppress=[
674                    'meerschaum/plugins/__init__.py',
675                    importlib,
676                    importlib._bootstrap,
677                ]
678            )
679            return None
680
681        for attr in attrs:
682            try:
683                attrs_to_return.append(getattr(submodule, attr))
684            except Exception:
685                _warn(f"Failed to access '{attr}' from '{submodule_import_name}'.")
686                attrs_to_return.append(None)
687        
688        if len(attrs) == 1:
689            return attrs_to_return[0]
690
691        return tuple(attrs_to_return)

Emulate the from module import x behavior.

Parameters
  • plugin_import_name (str): The import name of the plugin's module. Separate submodules with '.' (e.g. 'compose.utils.pipes')
  • attrs (str): Names of the attributes to return.
Returns
  • Objects from a plugin's submodule.
  • If multiple objects are provided, return a tuple.
Examples
>>> init = from_plugin_import('compose.utils', 'init')
>>> with mrsm.Venv('compose'):
...     cf = init()
>>> build_parent_pipe, get_defined_pipes = from_plugin_import(
...     'compose.utils.pipes',
...     'build_parent_pipe',
...     'get_defined_pipes',
... )
>>> parent_pipe = build_parent_pipe(cf)
>>> defined_pipes = get_defined_pipes(cf)
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
876def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
877    """
878    Reload plugins back into memory.
879
880    Parameters
881    ----------
882    plugins: Optional[List[str]], default None
883        The plugins to reload. `None` will reload all plugins.
884
885    """
886    global _synced_symlinks
887    unload_plugins(plugins, debug=debug)
888    _synced_symlinks = 0
889    sync_plugins_symlinks(debug=debug)
890    load_plugins(skip_if_loaded=False, debug=debug)

Reload plugins back into memory.

Parameters
  • plugins (Optional[List[str]], default None): The plugins to reload. None will reload all plugins.
def get_plugins( *to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
893def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
894    """
895    Return a list of `Plugin` objects.
896
897    Parameters
898    ----------
899    to_load:
900        If specified, only load specific plugins.
901        Otherwise return all plugins.
902
903    try_import: bool, default True
904        If `True`, allow for plugins to be imported.
905    """
906    import meerschaum.config.paths as paths
907    import os
908    sync_plugins_symlinks()
909    _plugins = [
910        Plugin(name)
911        for name in (
912            to_load or [
913                (
914                    name if (paths.PLUGINS_RESOURCES_PATH / name).is_dir()
915                    else name[:-3]
916                )
917                for name in os.listdir(paths.PLUGINS_RESOURCES_PATH)
918                if name != '__init__.py'
919            ]
920        )
921    ]
922    plugins = tuple(plugin for plugin in _plugins if plugin.is_installed(try_import=try_import))
923    if len(to_load) == 1:
924        if len(plugins) == 0:
925            raise ValueError(f"Plugin '{to_load[0]}' is not installed.")
926        return plugins[0]
927    return plugins

Return a list of Plugin objects.

Parameters
  • to_load:: If specified, only load specific plugins. Otherwise return all plugins.
  • try_import (bool, default True): If True, allow for plugins to be imported.
def get_data_plugins() -> List[Plugin]:
944def get_data_plugins() -> List[Plugin]:
945    """
946    Only return the modules of plugins with either `fetch()` or `sync()` functions.
947    """
948    import inspect
949    plugins = get_plugins()
950    data_names = {'sync', 'fetch'}
951    data_plugins = []
952    for plugin in plugins:
953        for name, ob in inspect.getmembers(plugin.module):
954            if not inspect.isfunction(ob):
955                continue
956            if name not in data_names:
957                continue
958            data_plugins.append(plugin)
959    return data_plugins

Only return the modules of plugins with either fetch() or sync() functions.

def add_plugin_argument(*args, **kwargs) -> None:
962def add_plugin_argument(*args, **kwargs) -> None:
963    """
964    Add argparse arguments under the 'Plugins options' group.
965    Takes the same parameters as the regular argparse `add_argument()` function.
966
967    Examples
968    --------
969    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
970    >>> 
971    """
972    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
973    from meerschaum.utils.warnings import warn
974    _parent_plugin_name = _get_parent_plugin()
975    title = f"Plugin '{_parent_plugin_name}' options" if _parent_plugin_name else 'Custom options'
976    group_key = 'plugin_' + (_parent_plugin_name or '')
977    if group_key not in groups:
978        groups[group_key] = parser.add_argument_group(
979            title = title,
980        )
981        _seen_plugin_args[group_key] = set()
982    try:
983        if str(args) not in _seen_plugin_args[group_key]:
984            groups[group_key].add_argument(*args, **kwargs)
985            _seen_plugin_args[group_key].add(str(args))
986    except Exception as e:
987        warn(e)

Add argparse arguments under the 'Plugins options' group. Takes the same parameters as the regular argparse add_argument() function.

Examples
>>> add_plugin_argument('--foo', type=int, help="This is my help text!")
>>>
def pre_sync_hook(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
130def pre_sync_hook(
131    function: Callable[[Any], Any],
132) -> Callable[[Any], Any]:
133    """
134    Register a function as a sync hook to be executed right before sync.
135    
136    Parameters
137    ----------
138    function: Callable[[Any], Any]
139        The function to execute right before a sync.
140        
141    Returns
142    -------
143    Another function (this is a decorator function).
144
145    Examples
146    --------
147    >>> from meerschaum.plugins import pre_sync_hook
148    >>>
149    >>> @pre_sync_hook
150    ... def log_sync(pipe, **kwargs):
151    ...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
152    >>>
153    """
154    with _locks['_pre_sync_hooks']:
155        plugin_name = _get_parent_plugin()
156        try:
157            if plugin_name not in _pre_sync_hooks:
158                _pre_sync_hooks[plugin_name] = []
159            _pre_sync_hooks[plugin_name].append(function)
160        except Exception as e:
161            from meerschaum.utils.warnings import warn
162            warn(e)
163    return function

Register a function as a sync hook to be executed right before sync.

Parameters
----------
function: Callable[[Any], Any]
    The function to execute right before a sync.

Returns
-------
Another function (this is a decorator function).

Examples
--------
>>> from meerschaum.plugins import pre_sync_hook
>>>
>>> @pre_sync_hook
... def log_sync(pipe, **kwargs):
...     print(f"About to sync {pipe} with kwargs:

{kwargs}.")

>

def post_sync_hook(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
166def post_sync_hook(
167    function: Callable[[Any], Any],
168) -> Callable[[Any], Any]:
169    """
170    Register a function as a sync hook to be executed upon completion of a sync.
171    
172    Parameters
173    ----------
174    function: Callable[[Any], Any]
175        The function to execute upon completion of a sync.
176        
177    Returns
178    -------
179    Another function (this is a decorator function).
180
181    Examples
182    --------
183    >>> from meerschaum.plugins import post_sync_hook
184    >>> from meerschaum.utils.misc import interval_str
185    >>> from datetime import timedelta
186    >>>
187    >>> @post_sync_hook
188    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
189    ...     duration_delta = timedelta(seconds=duration)
190    ...     duration_text = interval_str(duration_delta)
191    ...     print(f"It took {duration_text} to sync {pipe}.")
192    >>>
193    """
194    with _locks['_post_sync_hooks']:
195        try:
196            plugin_name = _get_parent_plugin()
197            if plugin_name not in _post_sync_hooks:
198                _post_sync_hooks[plugin_name] = []
199            _post_sync_hooks[plugin_name].append(function)
200        except Exception as e:
201            from meerschaum.utils.warnings import warn
202            warn(e)
203    return function

Register a function as a sync hook to be executed upon completion of a sync.

Parameters
  • function (Callable[[Any], Any]): The function to execute upon completion of a sync.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import post_sync_hook
>>> from meerschaum.utils.misc import interval_str
>>> from datetime import timedelta
>>>
>>> @post_sync_hook
... def log_sync(pipe, success_tuple, duration=None, **kwargs):
...     duration_delta = timedelta(seconds=duration)
...     duration_text = interval_str(duration_delta)
...     print(f"It took {duration_text} to sync {pipe}.")
>>>