meerschaum.plugins

Expose plugin management APIs from the meerschaum.plugins module.

   1#! /usr/bin/env python
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Expose plugin management APIs from the `meerschaum.plugins` module.
   7"""
   8
   9from __future__ import annotations
  10
  11import pathlib
  12import functools
  13
  14import meerschaum as mrsm
  15from meerschaum.utils.typing import Callable, Any, Union, Optional, Dict, List, Tuple
  16from meerschaum.utils.threading import RLock
  17from meerschaum.core.Plugin import Plugin
  18
  19_api_plugins: Dict[str, List[Callable[['fastapi.App'], Any]]] = {}
  20_pre_sync_hooks: Dict[Union[str, None], List[Callable[[Any], Any]]] = {}
  21_post_sync_hooks: Dict[Union[str, None], List[Callable[[Any], Any]]] = {}
  22_actions_daemon_enabled: Dict[str, bool] = {}
  23_locks = {
  24    '_api_plugins': RLock(),
  25    '_dash_plugins': RLock(),
  26    '_pre_sync_hooks': RLock(),
  27    '_post_sync_hooks': RLock(),
  28    '_actions_daemon_enabled': RLock(),
  29    '__path__': RLock(),
  30    'sys.path': RLock(),
  31    'internal_plugins': RLock(),
  32    '_synced_symlinks': RLock(),
  33    'PLUGINS_INTERNAL_LOCK_PATH': RLock(),
  34}
  35__all__ = (
  36    "Plugin",
  37    "make_action",
  38    "api_plugin",
  39    "dash_plugin",
  40    "web_page",
  41    "import_plugins",
  42    "from_plugin_import",
  43    "reload_plugins",
  44    "get_plugins",
  45    "get_data_plugins",
  46    "add_plugin_argument",
  47    "pre_sync_hook",
  48    "post_sync_hook",
  49)
  50__pdoc__ = {
  51    'venvs': False,
  52    'data': False,
  53    'stack': False,
  54    'plugins': False,
  55}
  56
  57
  58def make_action(
  59    function: Optional[Callable[[Any], Any]] = None,
  60    shell: bool = False,
  61    activate: bool = True,
  62    deactivate: bool = True,
  63    debug: bool = False,
  64    daemon: bool = True,
  65    skip_if_loaded: bool = True,
  66    _plugin_name: Optional[str] = None,
  67) -> Callable[[Any], Any]:
  68    """
  69    Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.
  70    
  71    Parameters
  72    ----------
  73    function: Callable[[Any], Any]
  74        The function to become a Meerschaum action. Must accept all keyword arguments.
  75        
  76    shell: bool, default False
  77        Not used.
  78        
  79    Returns
  80    -------
  81    Another function (this is a decorator function).
  82
  83    Examples
  84    --------
  85    >>> from meerschaum.plugins import make_action
  86    >>>
  87    >>> @make_action
  88    ... def my_action(**kw):
  89    ...     print('foo')
  90    ...     return True, "Success"
  91    >>>
  92    """
  93    def _decorator(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
  94        from meerschaum.actions import actions, _custom_actions_plugins, _plugins_actions
  95        if skip_if_loaded and func.__name__ in actions:
  96            return func
  97
  98        from meerschaum.config.paths import PLUGINS_RESOURCES_PATH
  99        plugin_name = (
 100            func.__name__.split(f"{PLUGINS_RESOURCES_PATH.stem}.", maxsplit=1)[-1].split('.')[0]
 101        )
 102        plugin = Plugin(plugin_name) if plugin_name else None
 103
 104        if debug:
 105            from meerschaum.utils.debug import dprint
 106            dprint(
 107                f"Adding action '{func.__name__}' from plugin "
 108                f"'{plugin}'..."
 109            )
 110
 111        actions[func.__name__] = func
 112        _custom_actions_plugins[func.__name__] = plugin_name
 113        if plugin_name not in _plugins_actions:
 114            _plugins_actions[plugin_name] = []
 115        _plugins_actions[plugin_name].append(func.__name__)
 116        if not daemon:
 117            _actions_daemon_enabled[func.__name__] = False
 118        return func
 119
 120    if function is None:
 121        return _decorator
 122    return _decorator(function)
 123
 124
 125def pre_sync_hook(
 126    function: Callable[[Any], Any],
 127) -> Callable[[Any], Any]:
 128    """
 129    Register a function as a sync hook to be executed right before sync.
 130    
 131    Parameters
 132    ----------
 133    function: Callable[[Any], Any]
 134        The function to execute right before a sync.
 135        
 136    Returns
 137    -------
 138    Another function (this is a decorator function).
 139
 140    Examples
 141    --------
 142    >>> from meerschaum.plugins import pre_sync_hook
 143    >>>
 144    >>> @pre_sync_hook
 145    ... def log_sync(pipe, **kwargs):
 146    ...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
 147    >>>
 148    """
 149    with _locks['_pre_sync_hooks']:
 150        plugin_name = _get_parent_plugin()
 151        try:
 152            if plugin_name not in _pre_sync_hooks:
 153                _pre_sync_hooks[plugin_name] = []
 154            _pre_sync_hooks[plugin_name].append(function)
 155        except Exception as e:
 156            from meerschaum.utils.warnings import warn
 157            warn(e)
 158    return function
 159
 160
 161def post_sync_hook(
 162    function: Callable[[Any], Any],
 163) -> Callable[[Any], Any]:
 164    """
 165    Register a function as a sync hook to be executed upon completion of a sync.
 166    
 167    Parameters
 168    ----------
 169    function: Callable[[Any], Any]
 170        The function to execute upon completion of a sync.
 171        
 172    Returns
 173    -------
 174    Another function (this is a decorator function).
 175
 176    Examples
 177    --------
 178    >>> from meerschaum.plugins import post_sync_hook
 179    >>> from meerschaum.utils.misc import interval_str
 180    >>> from datetime import timedelta
 181    >>>
 182    >>> @post_sync_hook
 183    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
 184    ...     duration_delta = timedelta(seconds=duration)
 185    ...     duration_text = interval_str(duration_delta)
 186    ...     print(f"It took {duration_text} to sync {pipe}.")
 187    >>>
 188    """
 189    with _locks['_post_sync_hooks']:
 190        try:
 191            plugin_name = _get_parent_plugin()
 192            if plugin_name not in _post_sync_hooks:
 193                _post_sync_hooks[plugin_name] = []
 194            _post_sync_hooks[plugin_name].append(function)
 195        except Exception as e:
 196            from meerschaum.utils.warnings import warn
 197            warn(e)
 198    return function
 199
 200
 201_plugin_endpoints_to_pages = {}
 202_plugins_web_pages = {}
 203def web_page(
 204    page: Union[str, None, Callable[[Any], Any]] = None,
 205    login_required: bool = True,
 206    skip_navbar: bool = False,
 207    page_group: Optional[str] = None,
 208    **kwargs
 209) -> Any:
 210    """
 211    Quickly add pages to the dash application.
 212
 213    Examples
 214    --------
 215    >>> import meerschaum as mrsm
 216    >>> from meerschaum.plugins import web_page
 217    >>> html = mrsm.attempt_import('dash.html')
 218    >>> 
 219    >>> @web_page('foo/bar', login_required=False)
 220    >>> def foo_bar():
 221    ...     return html.Div([html.H1("Hello, World!")])
 222    >>> 
 223    """
 224    page_str = None
 225
 226    def _decorator(_func: Callable[[Any], Any]) -> Callable[[Any], Any]:
 227        nonlocal page_str, page_group
 228
 229        @functools.wraps(_func)
 230        def wrapper(*_args, **_kwargs):
 231            return _func(*_args, **_kwargs)
 232
 233        if page_str is None:
 234            page_str = _func.__name__
 235
 236        page_str = page_str.lstrip('/').rstrip('/').strip()
 237        if not page_str.startswith('dash'):
 238            page_str = f'/dash/{page_str}'
 239        page_key = (
 240            ' '.join(
 241                [
 242                    word.capitalize()
 243                    for word in (
 244                        page_str.replace('/dash', '').lstrip('/').rstrip('/').strip()
 245                        .replace('-', ' ').replace('_', ' ').split(' ')
 246                    )
 247                ]
 248            )
 249        )
 250 
 251        plugin_name = _get_parent_plugin()
 252        page_group = page_group or plugin_name
 253        if page_group not in _plugin_endpoints_to_pages:
 254            _plugin_endpoints_to_pages[page_group] = {}
 255        _plugin_endpoints_to_pages[page_group][page_str] = {
 256            'function': _func,
 257            'login_required': login_required,
 258            'skip_navbar': skip_navbar,
 259            'page_key': page_key,
 260        }
 261        if plugin_name not in _plugins_web_pages:
 262            _plugins_web_pages[plugin_name] = []
 263        _plugins_web_pages[plugin_name].append(_func)
 264        return wrapper
 265
 266    if callable(page):
 267        decorator_to_return = _decorator(page)
 268        page_str = page.__name__
 269    else:
 270        decorator_to_return = _decorator
 271        page_str = page
 272
 273    return decorator_to_return
 274
 275
 276_dash_plugins = {}
 277def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
 278    """
 279    Execute the function when starting the Dash application.
 280    """
 281    with _locks['_dash_plugins']:
 282        plugin_name = _get_parent_plugin()
 283        try:
 284            if plugin_name not in _dash_plugins:
 285                _dash_plugins[plugin_name] = []
 286            _dash_plugins[plugin_name].append(function)
 287        except Exception as e:
 288            from meerschaum.utils.warnings import warn
 289            warn(e)
 290    return function
 291
 292
 293def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
 294    """
 295    Execute the function when initializing the Meerschaum API module.
 296    Useful for lazy-loading heavy plugins only when the API is started,
 297    such as when editing the `meerschaum.api.app` FastAPI app.
 298    
 299    The FastAPI app will be passed as the only parameter.
 300    
 301    Examples
 302    --------
 303    >>> from meerschaum.plugins import api_plugin
 304    >>>
 305    >>> @api_plugin
 306    >>> def initialize_plugin(app):
 307    ...     @app.get('/my/new/path')
 308    ...     def new_path():
 309    ...         return {'message': 'It works!'}
 310    >>>
 311    """
 312    with _locks['_api_plugins']:
 313        try:
 314            if function.__module__ not in _api_plugins:
 315                _api_plugins[function.__module__] = []
 316            _api_plugins[function.__module__].append(function)
 317        except Exception as e:
 318            from meerschaum.utils.warnings import warn
 319            warn(e)
 320    return function
 321
 322
 323_synced_symlinks: bool = False
 324_injected_plugin_symlinks = set()
 325def sync_plugins_symlinks(debug: bool = False, warn: bool = True) -> None:
 326    """
 327    Update the plugins' internal symlinks. 
 328    """
 329    from meerschaum.utils.warnings import error, warn as _warn, dprint
 330    global _synced_symlinks
 331    with _locks['_synced_symlinks']:
 332        if _synced_symlinks:
 333            if debug:
 334                dprint("Skip syncing symlinks...")
 335            return
 336
 337    import os
 338    import pathlib
 339    import time
 340    from collections import defaultdict
 341    from meerschaum.utils.misc import flatten_list, make_symlink, is_symlink
 342    from meerschaum._internal.static import STATIC_CONFIG
 343    from meerschaum.config._paths import (
 344        PLUGINS_RESOURCES_PATH,
 345        PLUGINS_INIT_PATH,
 346        PLUGINS_DIR_PATHS,
 347        PLUGINS_INTERNAL_LOCK_PATH,
 348        PLUGINS_INJECTED_RESOURCES_PATH,
 349    )
 350
 351    ### If the lock file exists, sleep for up to a second or until it's removed before continuing.
 352    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
 353        if PLUGINS_INTERNAL_LOCK_PATH.exists():
 354            lock_sleep_total     = STATIC_CONFIG['plugins']['lock_sleep_total']
 355            lock_sleep_increment = STATIC_CONFIG['plugins']['lock_sleep_increment']
 356            lock_start = time.perf_counter()
 357            while (
 358                (time.perf_counter() - lock_start) < lock_sleep_total
 359            ):
 360                time.sleep(lock_sleep_increment)
 361                if not PLUGINS_INTERNAL_LOCK_PATH.exists():
 362                    break
 363                try:
 364                    PLUGINS_INTERNAL_LOCK_PATH.unlink()
 365                except Exception as e:
 366                    if warn:
 367                        _warn(f"Error while removing lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
 368                    break
 369
 370        ### Begin locking from other processes.
 371        try:
 372            PLUGINS_INTERNAL_LOCK_PATH.touch()
 373        except Exception as e:
 374            if warn:
 375                _warn(f"Unable to create lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
 376
 377    with _locks['internal_plugins']:
 378
 379        try:
 380            from importlib.metadata import entry_points
 381        except ImportError:
 382            importlib_metadata = mrsm.attempt_import('importlib_metadata', lazy=False)
 383            entry_points = importlib_metadata.entry_points
 384
 385        ### NOTE: Allow plugins to be installed via `pip`.
 386        packaged_plugin_paths = []
 387        try:
 388            discovered_packaged_plugins_eps = entry_points(group='meerschaum.plugins')
 389        except TypeError:
 390            discovered_packaged_plugins_eps = []
 391
 392        for ep in discovered_packaged_plugins_eps:
 393            module_name = ep.name
 394            for package_file_path in ep.dist.files:
 395                if package_file_path.suffix != '.py':
 396                    continue
 397                if str(package_file_path) == f'{module_name}.py':
 398                    packaged_plugin_paths.append(package_file_path.locate())
 399                elif str(package_file_path) == f'{module_name}/__init__.py':
 400                    packaged_plugin_paths.append(package_file_path.locate().parent)
 401
 402        if is_symlink(PLUGINS_RESOURCES_PATH) or not PLUGINS_RESOURCES_PATH.exists():
 403            try:
 404                PLUGINS_RESOURCES_PATH.unlink()
 405            except Exception:
 406                pass
 407
 408        PLUGINS_RESOURCES_PATH.mkdir(exist_ok=True)
 409
 410        existing_symlinked_paths = {
 411            _existing_symlink: pathlib.Path(os.path.realpath(_existing_symlink))
 412            for item in os.listdir(PLUGINS_RESOURCES_PATH)
 413            if is_symlink(_existing_symlink := (PLUGINS_RESOURCES_PATH / item))
 414        }
 415        injected_symlinked_paths = {
 416            _injected_symlink: pathlib.Path(os.path.realpath(_injected_symlink))
 417            for item in os.listdir(PLUGINS_INJECTED_RESOURCES_PATH)
 418            if is_symlink(_injected_symlink := (PLUGINS_INJECTED_RESOURCES_PATH / item))
 419        }
 420        plugins_to_be_symlinked = list(flatten_list(
 421            [
 422                [
 423                    pathlib.Path(os.path.realpath(plugins_path / item))
 424                    for item in os.listdir(plugins_path)
 425                    if (
 426                        not item.startswith('.')
 427                    ) and (item not in ('__pycache__', '__init__.py'))
 428                ]
 429                for plugins_path in PLUGINS_DIR_PATHS
 430                if plugins_path.exists()
 431            ]
 432        ))
 433        plugins_to_be_symlinked.extend(packaged_plugin_paths)
 434
 435        ### Check for duplicates.
 436        seen_plugins = defaultdict(lambda: 0)
 437        for plugin_path in plugins_to_be_symlinked:
 438            plugin_name = plugin_path.stem
 439            seen_plugins[plugin_name] += 1
 440        for plugin_name, plugin_count in seen_plugins.items():
 441            if plugin_count > 1:
 442                if warn:
 443                    _warn(f"Found duplicate plugins named '{plugin_name}'.")
 444
 445        for plugin_symlink_path, real_path in existing_symlinked_paths.items():
 446
 447            ### Remove invalid symlinks.
 448            if real_path not in plugins_to_be_symlinked:
 449                if plugin_symlink_path in _injected_plugin_symlinks:
 450                    continue
 451                if plugin_symlink_path in injected_symlinked_paths:
 452                    continue
 453                if real_path in injected_symlinked_paths.values():
 454                    continue
 455                try:
 456                    plugin_symlink_path.unlink()
 457                except Exception:
 458                    pass
 459
 460            ### Remove valid plugins from the to-be-symlinked list.
 461            else:
 462                plugins_to_be_symlinked.remove(real_path)
 463
 464        for plugin_path in plugins_to_be_symlinked:
 465            plugin_symlink_path = PLUGINS_RESOURCES_PATH / plugin_path.name
 466            try:
 467                ### There might be duplicate folders (e.g. __pycache__).
 468                if (
 469                    plugin_symlink_path.exists()
 470                    and
 471                    plugin_symlink_path.is_dir()
 472                    and
 473                    not is_symlink(plugin_symlink_path)
 474                ):
 475                    continue
 476                success, msg = make_symlink(plugin_path, plugin_symlink_path)
 477            except Exception as e:
 478                success, msg = False, str(e)
 479            if not success:
 480                if warn:
 481                    _warn(
 482                        f"Failed to create symlink {plugin_symlink_path} "
 483                        + f"to {plugin_path}:\n    {msg}"
 484                    )
 485
 486    ### Release symlink lock file in case other processes need it.
 487    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
 488        try:
 489            if PLUGINS_INTERNAL_LOCK_PATH.exists():
 490                PLUGINS_INTERNAL_LOCK_PATH.unlink()
 491        ### Sometimes competing threads will delete the lock file at the same time.
 492        except FileNotFoundError:
 493            pass
 494        except Exception as e:
 495            if warn:
 496                _warn(f"Error cleaning up lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
 497
 498        try:
 499            if not PLUGINS_INIT_PATH.exists():
 500                PLUGINS_INIT_PATH.touch()
 501        except Exception as e:
 502            error(f"Failed to create the file '{PLUGINS_INIT_PATH}':\n{e}")
 503
 504    with _locks['__path__']:
 505        if str(PLUGINS_RESOURCES_PATH.parent) not in __path__:
 506            __path__.append(str(PLUGINS_RESOURCES_PATH.parent))
 507
 508    with _locks['_synced_symlinks']:
 509        _synced_symlinks = True
 510
 511
 512def import_plugins(
 513    *plugins_to_import: Union[str, List[str], None],
 514    warn: bool = True,
 515) -> Union[
 516    'ModuleType', Tuple['ModuleType', None]
 517]:
 518    """
 519    Import the Meerschaum plugins directory.
 520
 521    Parameters
 522    ----------
 523    plugins_to_import: Union[str, List[str], None]
 524        If provided, only import the specified plugins.
 525        Otherwise import the entire plugins module. May be a string, list, or `None`.
 526        Defaults to `None`.
 527
 528    Returns
 529    -------
 530    A module of list of modules, depening on the number of plugins provided.
 531
 532    """
 533    import sys
 534    import importlib
 535    from meerschaum.utils.misc import flatten_list
 536    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
 537    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
 538    from meerschaum.utils.warnings import warn as _warn
 539    plugins_to_import = list(plugins_to_import)
 540    prepended_sys_path = False
 541    with _locks['sys.path']:
 542
 543        ### Since plugins may depend on other plugins,
 544        ### we need to activate the virtual environments for library plugins.
 545        ### This logic exists in `Plugin.activate_venv()`,
 546        ### but that code requires the plugin's module to already be imported.
 547        ### It's not a guarantee of correct activation order,
 548        ### e.g. if a library plugin pins a specific package and another 
 549        plugins_names = get_plugins_names()
 550        already_active_venvs = [is_venv_active(plugin_name) for plugin_name in plugins_names]
 551
 552        if not sys.path or sys.path[0] != str(PLUGINS_RESOURCES_PATH.parent):
 553            prepended_sys_path = True
 554            sys.path.insert(0, str(PLUGINS_RESOURCES_PATH.parent))
 555
 556        if not plugins_to_import:
 557            for plugin_name in plugins_names:
 558                activate_venv(plugin_name)
 559            try:
 560                imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem)
 561            except ImportError as e:
 562                _warn(f"Failed to import the plugins module:\n    {e}")
 563                import traceback
 564                traceback.print_exc()
 565                imported_plugins = None
 566            for plugin_name in plugins_names:
 567                if plugin_name in already_active_venvs:
 568                    continue
 569                deactivate_venv(plugin_name)
 570
 571        else:
 572            imported_plugins = []
 573            for plugin_name in flatten_list(plugins_to_import):
 574                plugin = Plugin(plugin_name)
 575                try:
 576                    with Venv(plugin, init_if_not_exists=False):
 577                        imported_plugins.append(
 578                            importlib.import_module(
 579                                f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
 580                            )
 581                        )
 582                except Exception as e:
 583                    _warn(
 584                        f"Failed to import plugin '{plugin_name}':\n    "
 585                        + f"{e}\n\nHere's a stacktrace:",
 586                        stack = False,
 587                    )
 588                    from meerschaum.utils.formatting import get_console
 589                    get_console().print_exception(
 590                        suppress = [
 591                            'meerschaum/plugins/__init__.py',
 592                            importlib,
 593                            importlib._bootstrap,
 594                        ]
 595                    )
 596                    imported_plugins.append(None)
 597
 598        if imported_plugins is None and warn:
 599            _warn("Failed to import plugins.", stacklevel=3)
 600
 601        if prepended_sys_path and str(PLUGINS_RESOURCES_PATH.parent) in sys.path:
 602            sys.path.remove(str(PLUGINS_RESOURCES_PATH.parent))
 603
 604    if isinstance(imported_plugins, list):
 605        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
 606    return imported_plugins
 607
 608
 609def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
 610    """
 611    Emulate the `from module import x` behavior.
 612
 613    Parameters
 614    ----------
 615    plugin_import_name: str
 616        The import name of the plugin's module.
 617        Separate submodules with '.' (e.g. 'compose.utils.pipes')
 618
 619    attrs: str
 620        Names of the attributes to return.
 621
 622    Returns
 623    -------
 624    Objects from a plugin's submodule.
 625    If multiple objects are provided, return a tuple.
 626
 627    Examples
 628    --------
 629    >>> init = from_plugin_import('compose.utils', 'init')
 630    >>> with mrsm.Venv('compose'):
 631    ...     cf = init()
 632    >>> build_parent_pipe, get_defined_pipes = from_plugin_import(
 633    ...     'compose.utils.pipes',
 634    ...     'build_parent_pipe',
 635    ...     'get_defined_pipes',
 636    ... )
 637    >>> parent_pipe = build_parent_pipe(cf)
 638    >>> defined_pipes = get_defined_pipes(cf)
 639    """
 640    import importlib
 641    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
 642    from meerschaum.utils.warnings import warn as _warn
 643    if plugin_import_name.startswith('plugins.'):
 644        plugin_import_name = plugin_import_name[len('plugins.'):]
 645    plugin_import_parts = plugin_import_name.split('.')
 646    plugin_root_name = plugin_import_parts[0]
 647    plugin = mrsm.Plugin(plugin_root_name)
 648
 649    submodule_import_name = '.'.join(
 650        [PLUGINS_RESOURCES_PATH.stem]
 651        + plugin_import_parts
 652    )
 653    if len(attrs) == 0:
 654        raise ValueError(f"Provide which attributes to return from '{submodule_import_name}'.")
 655
 656    attrs_to_return = []
 657    with mrsm.Venv(plugin):
 658        if plugin.module is None:
 659            raise ImportError(f"Unable to import plugin '{plugin}'.")
 660
 661        try:
 662            submodule = importlib.import_module(submodule_import_name)
 663        except ImportError as e:
 664            _warn(
 665                f"Failed to import plugin '{submodule_import_name}':\n    "
 666                + f"{e}\n\nHere's a stacktrace:",
 667                stack=False,
 668            )
 669            from meerschaum.utils.formatting import get_console
 670            get_console().print_exception(
 671                suppress=[
 672                    'meerschaum/plugins/__init__.py',
 673                    importlib,
 674                    importlib._bootstrap,
 675                ]
 676            )
 677            return None
 678
 679        for attr in attrs:
 680            try:
 681                attrs_to_return.append(getattr(submodule, attr))
 682            except Exception:
 683                _warn(f"Failed to access '{attr}' from '{submodule_import_name}'.")
 684                attrs_to_return.append(None)
 685        
 686        if len(attrs) == 1:
 687            return attrs_to_return[0]
 688
 689        return tuple(attrs_to_return)
 690
 691
 692_loaded_plugins: bool = False
 693def load_plugins(
 694    skip_if_loaded: bool = True,
 695    shell: bool = False,
 696    debug: bool = False,
 697) -> None:
 698    """
 699    Import Meerschaum plugins and update the actions dictionary.
 700    """
 701    global _loaded_plugins
 702    from meerschaum.utils.warnings import dprint
 703
 704    if skip_if_loaded and _loaded_plugins:
 705        if debug:
 706            dprint("Skip loading plugins...")
 707        return
 708
 709    from inspect import isfunction, getmembers
 710    from meerschaum.actions import __all__ as _all, modules
 711    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
 712    from meerschaum.utils.packages import get_modules_from_package
 713
 714    _plugins_names, plugins_modules = get_modules_from_package(
 715        import_plugins(),
 716        names = True,
 717        recursive = True,
 718        modules_venvs = True
 719    )
 720
 721    ### I'm appending here to keep from redefining the modules list.
 722    new_modules = (
 723        [
 724            mod
 725            for mod in modules
 726            if not mod.__name__.startswith(PLUGINS_RESOURCES_PATH.stem + '.')
 727        ]
 728        + plugins_modules
 729    )
 730    n_mods = len(modules)
 731    for mod in new_modules:
 732        modules.append(mod)
 733    for i in range(n_mods):
 734        modules.pop(0)
 735
 736    for module in plugins_modules:
 737        for name, func in getmembers(module):
 738            if not isfunction(func):
 739                continue
 740            if name == module.__name__.split('.')[-1]:
 741                make_action(
 742                    func,
 743                    **{'shell': shell, 'debug': debug},
 744                    _plugin_name=name,
 745                    skip_if_loaded=True,
 746                )
 747
 748    _loaded_plugins = True
 749
 750
 751def unload_custom_actions(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
 752    """
 753    Unload the custom actions added by plugins.
 754    """
 755    from meerschaum.actions import (
 756        actions,
 757        _custom_actions_plugins,
 758        _plugins_actions,
 759    )
 760    from meerschaum._internal.entry import _shell
 761    import meerschaum._internal.shell as shell_pkg
 762
 763    plugins = plugins if plugins is not None else list(_plugins_actions)
 764
 765    for plugin in plugins:
 766        action_names = _plugins_actions.get(plugin, [])
 767        actions_to_remove = {
 768            action_name: actions.get(action_name, None)
 769            for action_name in action_names
 770        }
 771        for action_name in action_names:
 772            _ = actions.pop(action_name, None)
 773            _ = _custom_actions_plugins.pop(action_name, None)
 774            _ = _actions_daemon_enabled.pop(action_name, None)
 775
 776        _ = _plugins_actions.pop(plugin, None)
 777        shell_pkg._remove_shell_actions(
 778            _shell=_shell,
 779            actions=actions_to_remove,
 780        )
 781    
 782
 783def unload_plugins(
 784    plugins: Optional[List[str]] = None,
 785    remove_symlinks: bool = True,
 786    debug: bool = False,
 787) -> None:
 788    """
 789    Unload the specified plugins from memory.
 790    """
 791    global _loaded_plugins, _synced_symlinks
 792    import sys
 793    from meerschaum.config.paths import PLUGINS_RESOURCES_PATH, PLUGINS_INJECTED_RESOURCES_PATH
 794    from meerschaum.connectors import unload_plugin_connectors
 795    if debug:
 796        from meerschaum.utils.warnings import dprint
 797
 798    _loaded_plugins = False
 799    _synced_symlinks = False
 800
 801    all_plugins = get_plugins_names()
 802    plugins = plugins if plugins is not None else all_plugins
 803    if debug:
 804        dprint(f"Unloading plugins: {plugins}")
 805
 806    unload_custom_actions(plugins, debug=debug)
 807    unload_plugin_connectors(plugins, debug=debug)
 808
 809    module_prefix = f"{PLUGINS_RESOURCES_PATH.stem}."
 810    loaded_modules = [mod_name for mod_name in sys.modules if mod_name.startswith(module_prefix)]
 811
 812    root_plugins_mod = (
 813        sys.modules.get(PLUGINS_RESOURCES_PATH.stem, None)
 814        if sorted(plugins) != sorted(all_plugins)
 815        else sys.modules.pop(PLUGINS_RESOURCES_PATH, None)
 816    )
 817
 818    for plugin_name in plugins:
 819        for mod_name in loaded_modules:
 820            if (
 821                mod_name[len(PLUGINS_RESOURCES_PATH.stem):].startswith(plugin_name + '.')
 822                or mod_name[len(PLUGINS_RESOURCES_PATH.stem):] == plugin_name
 823            ):
 824                _ = sys.modules.pop(mod_name, None)
 825
 826        if root_plugins_mod is not None and plugin_name in root_plugins_mod.__dict__:
 827            try:
 828                delattr(root_plugins_mod, plugin_name)
 829            except Exception:
 830                pass
 831
 832        ### Unload sync hooks.
 833        _ = _pre_sync_hooks.pop(plugin_name, None)
 834        _ = _post_sync_hooks.pop(plugin_name, None)
 835
 836        ### Unload API endpoints and pages.
 837        _ = _dash_plugins.pop(plugin_name, None)
 838        web_page_funcs = _plugins_web_pages.pop(plugin_name, None) or []
 839        page_groups_to_pop = []
 840        for page_group, page_functions in _plugin_endpoints_to_pages.items():
 841            page_functions_to_pop = [
 842                page_str
 843                for page_str, page_payload in page_functions.items()
 844                if page_payload.get('function', None) in web_page_funcs
 845            ]
 846            for page_str in page_functions_to_pop:
 847                page_functions.pop(page_str, None)
 848            if not page_functions:
 849                page_groups_to_pop.append(page_group)
 850        
 851        for page_group in page_groups_to_pop:
 852            _plugin_endpoints_to_pages.pop(page_group, None)
 853
 854        ### Remove all but injected symlinks.
 855        if remove_symlinks:
 856            dir_symlink_path = PLUGINS_RESOURCES_PATH / plugin_name
 857            dir_symlink_injected_path = PLUGINS_INJECTED_RESOURCES_PATH / plugin_name
 858            file_symlink_path = PLUGINS_RESOURCES_PATH / f"{plugin_name}.py"
 859            file_symlink_injected_path = PLUGINS_INJECTED_RESOURCES_PATH / f"{plugin_name}.py"
 860
 861            try:
 862                if dir_symlink_path.exists() and not dir_symlink_injected_path.exists():
 863                    dir_symlink_path.unlink()
 864            except Exception:
 865                pass
 866
 867            try:
 868                if file_symlink_path.exists() and not file_symlink_injected_path.exists():
 869                    file_symlink_path.unlink()
 870            except Exception:
 871                pass
 872
 873
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
    """
    Reload plugins back into memory.

    The given plugins are unloaded, the plugins symlinks are re-synced,
    and `load_plugins()` is then called with `skip_if_loaded=False`.

    Parameters
    ----------
    plugins: Optional[List[str]], default None
        The plugins to unload before loading again. `None` will unload all plugins.

    debug: bool, default False
        Verbosity toggle, forwarded to the unload, symlink-sync, and load steps.
    """
    global _synced_symlinks
    unload_plugins(plugins, debug=debug)
    ### Clear the module-level flag before re-syncing the symlinks.
    _synced_symlinks = False
    sync_plugins_symlinks(debug=debug)
    ### NOTE: `plugins` is not forwarded here; `load_plugins()` receives no plugin list.
    load_plugins(skip_if_loaded=False, debug=debug)
 889
 890
 891def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
 892    """
 893    Return a list of `Plugin` objects.
 894
 895    Parameters
 896    ----------
 897    to_load:
 898        If specified, only load specific plugins.
 899        Otherwise return all plugins.
 900
 901    try_import: bool, default True
 902        If `True`, allow for plugins to be imported.
 903    """
 904    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
 905    import os
 906    sync_plugins_symlinks()
 907    _plugins = [
 908        Plugin(name)
 909        for name in (
 910            to_load or [
 911                (
 912                    name if (PLUGINS_RESOURCES_PATH / name).is_dir()
 913                    else name[:-3]
 914                )
 915                for name in os.listdir(PLUGINS_RESOURCES_PATH)
 916                if name != '__init__.py'
 917            ]
 918        )
 919    ]
 920    plugins = tuple(plugin for plugin in _plugins if plugin.is_installed(try_import=try_import))
 921    if len(to_load) == 1:
 922        if len(plugins) == 0:
 923            raise ValueError(f"Plugin '{to_load[0]}' is not installed.")
 924        return plugins[0]
 925    return plugins
 926
 927
 928def get_plugins_names(*to_load, **kw) -> List[str]:
 929    """
 930    Return a list of installed plugins.
 931    """
 932    return [plugin.name for plugin in get_plugins(*to_load, **kw)]
 933
 934
 935def get_plugins_modules(*to_load, **kw) -> List['ModuleType']:
 936    """
 937    Return a list of modules for the installed plugins, or `None` if things break.
 938    """
 939    return [plugin.module for plugin in get_plugins(*to_load, **kw)]
 940
 941
 942def get_data_plugins() -> List[Plugin]:
 943    """
 944    Only return the modules of plugins with either `fetch()` or `sync()` functions.
 945    """
 946    import inspect
 947    plugins = get_plugins()
 948    data_names = {'sync', 'fetch'}
 949    data_plugins = []
 950    for plugin in plugins:
 951        for name, ob in inspect.getmembers(plugin.module):
 952            if not inspect.isfunction(ob):
 953                continue
 954            if name not in data_names:
 955                continue
 956            data_plugins.append(plugin)
 957    return data_plugins
 958
 959
 960def add_plugin_argument(*args, **kwargs) -> None:
 961    """
 962    Add argparse arguments under the 'Plugins options' group.
 963    Takes the same parameters as the regular argparse `add_argument()` function.
 964
 965    Examples
 966    --------
 967    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
 968    >>> 
 969    """
 970    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
 971    from meerschaum.utils.warnings import warn
 972    _parent_plugin_name = _get_parent_plugin()
 973    title = f"Plugin '{_parent_plugin_name}' options" if _parent_plugin_name else 'Custom options'
 974    group_key = 'plugin_' + (_parent_plugin_name or '')
 975    if group_key not in groups:
 976        groups[group_key] = parser.add_argument_group(
 977            title = title,
 978        )
 979        _seen_plugin_args[group_key] = set()
 980    try:
 981        if str(args) not in _seen_plugin_args[group_key]:
 982            groups[group_key].add_argument(*args, **kwargs)
 983            _seen_plugin_args[group_key].add(str(args))
 984    except Exception as e:
 985        warn(e)
 986
 987
 988def inject_plugin_path(
 989    plugin_path: pathlib.Path,
 990    plugins_resources_path: Optional[pathlib.Path] = None) -> None:
 991    """
 992    Inject a plugin as a symlink into the internal `plugins` directory.
 993
 994    Parameters
 995    ----------
 996    plugin_path: pathlib.Path
 997        The path to the plugin's source module.
 998    """
 999    from meerschaum.utils.misc import make_symlink
1000    if plugins_resources_path is None:
1001        from meerschaum.config.paths import PLUGINS_RESOURCES_PATH, PLUGINS_INJECTED_RESOURCES_PATH
1002        plugins_resources_path = PLUGINS_RESOURCES_PATH
1003        plugins_injected_resources_path = PLUGINS_INJECTED_RESOURCES_PATH
1004    else:
1005        plugins_injected_resources_path = plugins_resources_path / '.injected'
1006
1007    if plugin_path.is_dir():
1008        plugin_name = plugin_path.name
1009        dest_path = plugins_resources_path / plugin_name
1010        injected_path = plugins_injected_resources_path / plugin_name
1011    elif plugin_path.name == '__init__.py':
1012        plugin_name = plugin_path.parent.name
1013        dest_path = plugins_resources_path / plugin_name
1014        injected_path = plugins_injected_resources_path / plugin_name
1015    elif plugin_path.name.endswith('.py'):
1016        plugin_name = plugin_path.name[:(-1 * len('.py'))]
1017        dest_path = plugins_resources_path / plugin_path.name
1018        injected_path = plugins_injected_resources_path / plugin_path.name
1019    else:
1020        raise ValueError(f"Cannot deduce plugin name from path '{plugin_path}'.")
1021
1022    _injected_plugin_symlinks.add(dest_path)
1023    make_symlink(plugin_path, dest_path)
1024    make_symlink(plugin_path, injected_path)
1025
1026
def _get_parent_plugin(stacklevel: Union[int, Tuple[int, ...]] = (1, 2, 3, 4)) -> Union[str, None]:
    """
    Infer the name of the plugin which called into this module by inspecting the call stack.

    Parameters
    ----------
    stacklevel: Union[int, Tuple[int, ...]], default (1, 2, 3, 4)
        The stack level(s) to inspect, in order (level 1 is the direct caller).

    Returns
    -------
    The first path component of the calling module's `__name__`
    (with a leading `plugins.` prefix removed).
    Frames from `meerschaum.*` modules, from `importlib`, and from modules whose name
    starts with an underscore are skipped.
    Returns `None` if no inspected level yields a name.
    """
    import inspect
    if not isinstance(stacklevel, tuple):
        stacklevel = (stacklevel,)

    prefix = 'plugins.'
    try:
        ### Capture the stack once (without source context) instead of once per level.
        frames = inspect.stack(0)
    except Exception:
        return None

    for _level in stacklevel:
        try:
            global_name = frames[_level][0].f_globals.get('__name__', '')
            if global_name.startswith('meerschaum.'):
                continue
            ### Only strip a leading `plugins.` so that names which merely contain
            ### the substring (e.g. `myplugins.foo`) are left intact.
            if global_name.startswith(prefix):
                global_name = global_name[len(prefix):]
            plugin_name = global_name.split('.')[0]
            if plugin_name.startswith('_') or plugin_name == 'importlib':
                continue
            return plugin_name
        except Exception:
            continue

    return None
class Plugin:
 30class Plugin:
 31    """Handle packaging of Meerschaum plugins."""
 32
 33    def __init__(
 34        self,
 35        name: str,
 36        version: Optional[str] = None,
 37        user_id: Optional[int] = None,
 38        required: Optional[List[str]] = None,
 39        attributes: Optional[Dict[str, Any]] = None,
 40        archive_path: Optional[pathlib.Path] = None,
 41        venv_path: Optional[pathlib.Path] = None,
 42        repo_connector: Optional['mrsm.connectors.api.APIConnector'] = None,
 43        repo: Union['mrsm.connectors.api.APIConnector', str, None] = None,
 44    ):
 45        from meerschaum._internal.static import STATIC_CONFIG
 46        from meerschaum.config.paths import PLUGINS_ARCHIVES_RESOURCES_PATH, VIRTENV_RESOURCES_PATH
 47        sep = STATIC_CONFIG['plugins']['repo_separator']
 48        _repo = None
 49        if sep in name:
 50            try:
 51                name, _repo = name.split(sep)
 52            except Exception as e:
 53                error(f"Invalid plugin name: '{name}'")
 54        self._repo_in_name = _repo
 55
 56        if attributes is None:
 57            attributes = {}
 58        self.name = name
 59        self.attributes = attributes
 60        self.user_id = user_id
 61        self._version = version
 62        if required:
 63            self._required = required
 64        self.archive_path = (
 65            archive_path if archive_path is not None
 66            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
 67        )
 68        self.venv_path = (
 69            venv_path if venv_path is not None
 70            else VIRTENV_RESOURCES_PATH / self.name
 71        )
 72        self._repo_connector = repo_connector
 73        self._repo_keys = repo
 74
 75
 76    @property
 77    def repo_connector(self):
 78        """
 79        Return the repository connector for this plugin.
 80        NOTE: This imports the `connectors` module, which imports certain plugin modules.
 81        """
 82        if self._repo_connector is None:
 83            from meerschaum.connectors.parse import parse_repo_keys
 84
 85            repo_keys = self._repo_keys or self._repo_in_name
 86            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
 87                error(
 88                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
 89                )
 90            repo_connector = parse_repo_keys(repo_keys)
 91            self._repo_connector = repo_connector
 92        return self._repo_connector
 93
 94
 95    @property
 96    def version(self):
 97        """
 98        Return the plugin's module version is defined (`__version__`) if it's defined.
 99        """
100        if self._version is None:
101            try:
102                self._version = self.module.__version__
103            except Exception as e:
104                self._version = None
105        return self._version
106
107
108    @property
109    def module(self):
110        """
111        Return the Python module of the underlying plugin.
112        """
113        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
114            if self.__file__ is None:
115                return None
116
117            from meerschaum.plugins import import_plugins
118            self._module = import_plugins(str(self), warn=False)
119
120        return self._module
121
122
123    @property
124    def __file__(self) -> Union[str, None]:
125        """
126        Return the file path (str) of the plugin if it exists, otherwise `None`.
127        """
128        if self.__dict__.get('_module', None) is not None:
129            return self.module.__file__
130
131        from meerschaum.config.paths import PLUGINS_RESOURCES_PATH
132
133        potential_dir = PLUGINS_RESOURCES_PATH / self.name
134        if (
135            potential_dir.exists()
136            and potential_dir.is_dir()
137            and (potential_dir / '__init__.py').exists()
138        ):
139            return str((potential_dir / '__init__.py').as_posix())
140
141        potential_file = PLUGINS_RESOURCES_PATH / (self.name + '.py')
142        if potential_file.exists() and not potential_file.is_dir():
143            return str(potential_file.as_posix())
144
145        return None
146
147
148    @property
149    def requirements_file_path(self) -> Union[pathlib.Path, None]:
150        """
151        If a file named `requirements.txt` exists, return its path.
152        """
153        if self.__file__ is None:
154            return None
155        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
156        if not path.exists():
157            return None
158        return path
159
160
    def is_installed(self, **kw) -> bool:
        """
        Check whether the plugin's source file can be found.

        Parameters
        ----------
        **kw:
            Accepted but not used by this method.

        Returns
        -------
        A `bool` indicating whether `self.__file__` is not `None`.
        """
        return self.__file__ is not None
170
171
172    def make_tar(self, debug: bool = False) -> pathlib.Path:
173        """
174        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
175
176        Parameters
177        ----------
178        debug: bool, default False
179            Verbosity toggle.
180
181        Returns
182        -------
183        A `pathlib.Path` to the archive file's path.
184
185        """
186        import tarfile, pathlib, subprocess, fnmatch
187        from meerschaum.utils.debug import dprint
188        from meerschaum.utils.packages import attempt_import
189        pathspec = attempt_import('pathspec', debug=debug)
190
191        if not self.__file__:
192            from meerschaum.utils.warnings import error
193            error(f"Could not find file for plugin '{self}'.")
194        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
195            path = self.__file__.replace('__init__.py', '')
196            is_dir = True
197        else:
198            path = self.__file__
199            is_dir = False
200
201        old_cwd = os.getcwd()
202        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
203        os.chdir(real_parent_path)
204
205        default_patterns_to_ignore = [
206            '.pyc',
207            '__pycache__/',
208            'eggs/',
209            '__pypackages__/',
210            '.git',
211        ]
212
213        def parse_gitignore() -> 'Set[str]':
214            gitignore_path = pathlib.Path(path) / '.gitignore'
215            if not gitignore_path.exists():
216                return set(default_patterns_to_ignore)
217            with open(gitignore_path, 'r', encoding='utf-8') as f:
218                gitignore_text = f.read()
219            return set(pathspec.PathSpec.from_lines(
220                pathspec.patterns.GitWildMatchPattern,
221                default_patterns_to_ignore + gitignore_text.splitlines()
222            ).match_tree(path))
223
224        patterns_to_ignore = parse_gitignore() if is_dir else set()
225
226        if debug:
227            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
228
229        with tarfile.open(self.archive_path, 'w:gz') as tarf:
230            if not is_dir:
231                tarf.add(f"{self.name}.py")
232            else:
233                for root, dirs, files in os.walk(self.name):
234                    for f in files:
235                        good_file = True
236                        fp = os.path.join(root, f)
237                        for pattern in patterns_to_ignore:
238                            if pattern in str(fp) or f.startswith('.'):
239                                good_file = False
240                                break
241                        if good_file:
242                            if debug:
243                                dprint(f"Adding '{fp}'...")
244                            tarf.add(fp)
245
246        ### clean up and change back to old directory
247        os.chdir(old_cwd)
248
249        ### change to 775 to avoid permissions issues with the API in a Docker container
250        self.archive_path.chmod(0o775)
251
252        if debug:
253            dprint(f"Created archive '{self.archive_path}'.")
254        return self.archive_path
255
256
257    def install(
258        self,
259        skip_deps: bool = False,
260        force: bool = False,
261        debug: bool = False,
262    ) -> SuccessTuple:
263        """
264        Extract a plugin's tar archive to the plugins directory.
265        
266        This function checks if the plugin is already installed and if the version is equal or
267        greater than the existing installation.
268
269        Parameters
270        ----------
271        skip_deps: bool, default False
272            If `True`, do not install dependencies.
273
274        force: bool, default False
275            If `True`, continue with installation, even if required packages fail to install.
276
277        debug: bool, default False
278            Verbosity toggle.
279
280        Returns
281        -------
282        A `SuccessTuple` of success (bool) and a message (str).
283
284        """
285        if self.full_name in _ongoing_installations:
286            return True, f"Already installing plugin '{self}'."
287        _ongoing_installations.add(self.full_name)
288        from meerschaum.utils.warnings import warn, error
289        if debug:
290            from meerschaum.utils.debug import dprint
291        import tarfile
292        import re
293        import ast
294        from meerschaum.plugins import sync_plugins_symlinks
295        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
296        from meerschaum.utils.venv import init_venv
297        from meerschaum.utils.misc import safely_extract_tar
298        from meerschaum.config.paths import PLUGINS_TEMP_RESOURCES_PATH, PLUGINS_DIR_PATHS
299        old_cwd = os.getcwd()
300        old_version = ''
301        new_version = ''
302        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
303        temp_dir.mkdir(exist_ok=True)
304
305        if not self.archive_path.exists():
306            return False, f"Missing archive file for plugin '{self}'."
307        if self.version is not None:
308            old_version = self.version
309            if debug:
310                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
311
312        if debug:
313            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
314
315        try:
316            with tarfile.open(self.archive_path, 'r:gz') as tarf:
317                safely_extract_tar(tarf, temp_dir)
318        except Exception as e:
319            warn(e)
320            return False, f"Failed to extract plugin '{self.name}'."
321
322        ### search for version information
323        files = os.listdir(temp_dir)
324        
325        if str(files[0]) == self.name:
326            is_dir = True
327        elif str(files[0]) == self.name + '.py':
328            is_dir = False
329        else:
330            error(f"Unknown format encountered for plugin '{self}'.")
331
332        fpath = temp_dir / files[0]
333        if is_dir:
334            fpath = fpath / '__init__.py'
335
336        init_venv(self.name, debug=debug)
337        with open(fpath, 'r', encoding='utf-8') as f:
338            init_lines = f.readlines()
339        new_version = None
340        for line in init_lines:
341            if '__version__' not in line:
342                continue
343            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
344            if not version_match:
345                continue
346            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
347            break
348        if not new_version:
349            warn(
350                f"No `__version__` defined for plugin '{self}'. "
351                + "Assuming new version...",
352                stack = False,
353            )
354
355        packaging_version = attempt_import('packaging.version')
356        try:
357            is_new_version = (not new_version and not old_version) or (
358                packaging_version.parse(old_version) < packaging_version.parse(new_version)
359            )
360            is_same_version = new_version and old_version and (
361                packaging_version.parse(old_version) == packaging_version.parse(new_version)
362            )
363        except Exception:
364            is_new_version, is_same_version = True, False
365
366        ### Determine where to permanently store the new plugin.
367        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
368        for path in PLUGINS_DIR_PATHS:
369            if not path.exists():
370                warn(f"Plugins path does not exist: {path}", stack=False)
371                continue
372
373            files_in_plugins_dir = os.listdir(path)
374            if (
375                self.name in files_in_plugins_dir
376                or
377                (self.name + '.py') in files_in_plugins_dir
378            ):
379                plugin_installation_dir_path = path
380                break
381
382        success_msg = (
383            f"Successfully installed plugin '{self}'"
384            + ("\n    (skipped dependencies)" if skip_deps else "")
385            + "."
386        )
387        success, abort = None, None
388
389        if is_same_version and not force:
390            success, msg = True, (
391                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
392                "    Install again with `-f` or `--force` to reinstall."
393            )
394            abort = True
395        elif is_new_version or force:
396            for src_dir, dirs, files in os.walk(temp_dir):
397                if success is not None:
398                    break
399                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
400                if not os.path.exists(dst_dir):
401                    os.mkdir(dst_dir)
402                for f in files:
403                    src_file = os.path.join(src_dir, f)
404                    dst_file = os.path.join(dst_dir, f)
405                    if os.path.exists(dst_file):
406                        os.remove(dst_file)
407
408                    if debug:
409                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
410                    try:
411                        shutil.move(src_file, dst_dir)
412                    except Exception:
413                        success, msg = False, (
414                            f"Failed to install plugin '{self}': " +
415                            f"Could not move file '{src_file}' to '{dst_dir}'"
416                        )
417                        print(msg)
418                        break
419            if success is None:
420                success, msg = True, success_msg
421        else:
422            success, msg = False, (
423                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
424                + f"attempted version {new_version}."
425            )
426
427        shutil.rmtree(temp_dir)
428        os.chdir(old_cwd)
429
430        ### Reload the plugin's module.
431        sync_plugins_symlinks(debug=debug)
432        if '_module' in self.__dict__:
433            del self.__dict__['_module']
434        init_venv(venv=self.name, force=True, debug=debug)
435        reload_meerschaum(debug=debug)
436
437        ### if we've already failed, return here
438        if not success or abort:
439            _ongoing_installations.remove(self.full_name)
440            return success, msg
441
442        ### attempt to install dependencies
443        dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
444        if not dependencies_installed:
445            _ongoing_installations.remove(self.full_name)
446            return False, f"Failed to install dependencies for plugin '{self}'."
447
448        ### handling success tuple, bool, or other (typically None)
449        setup_tuple = self.setup(debug=debug)
450        if isinstance(setup_tuple, tuple):
451            if not setup_tuple[0]:
452                success, msg = setup_tuple
453        elif isinstance(setup_tuple, bool):
454            if not setup_tuple:
455                success, msg = False, (
456                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
457                    f"Check `setup()` in '{self.__file__}' for more information " +
458                    "(no error message provided)."
459                )
460            else:
461                success, msg = True, success_msg
462        elif setup_tuple is None:
463            success = True
464            msg = (
465                f"Post-install for plugin '{self}' returned None. " +
466                "Assuming plugin successfully installed."
467            )
468            warn(msg)
469        else:
470            success = False
471            msg = (
472                f"Post-install for plugin '{self}' returned unexpected value " +
473                f"of type '{type(setup_tuple)}': {setup_tuple}"
474            )
475
476        _ongoing_installations.remove(self.full_name)
477        _ = self.module
478        return success, msg
479
480
481    def remove_archive(
482        self,        
483        debug: bool = False
484    ) -> SuccessTuple:
485        """Remove a plugin's archive file."""
486        if not self.archive_path.exists():
487            return True, f"Archive file for plugin '{self}' does not exist."
488        try:
489            self.archive_path.unlink()
490        except Exception as e:
491            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
492        return True, "Success"
493
494
495    def remove_venv(
496        self,        
497        debug: bool = False
498    ) -> SuccessTuple:
499        """Remove a plugin's virtual environment."""
500        if not self.venv_path.exists():
501            return True, f"Virtual environment for plugin '{self}' does not exist."
502        try:
503            shutil.rmtree(self.venv_path)
504        except Exception as e:
505            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
506        return True, "Success"
507
508
509    def uninstall(self, debug: bool = False) -> SuccessTuple:
510        """
511        Remove a plugin, its virtual environment, and archive file.
512        """
513        from meerschaum.utils.packages import reload_meerschaum
514        from meerschaum.plugins import sync_plugins_symlinks
515        from meerschaum.utils.warnings import warn, info
516        warnings_thrown_count: int = 0
517        max_warnings: int = 3
518
519        if not self.is_installed():
520            info(
521                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
522                + "Checking for artifacts...",
523                stack = False,
524            )
525        else:
526            real_path = pathlib.Path(os.path.realpath(self.__file__))
527            try:
528                if real_path.name == '__init__.py':
529                    shutil.rmtree(real_path.parent)
530                else:
531                    real_path.unlink()
532            except Exception as e:
533                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
534                warnings_thrown_count += 1
535            else:
536                info(f"Removed source files for plugin '{self.name}'.")
537
538        if self.venv_path.exists():
539            success, msg = self.remove_venv(debug=debug)
540            if not success:
541                warn(msg, stack=False)
542                warnings_thrown_count += 1
543            else:
544                info(f"Removed virtual environment from plugin '{self.name}'.")
545
546        success = warnings_thrown_count < max_warnings
547        sync_plugins_symlinks(debug=debug)
548        self.deactivate_venv(force=True, debug=debug)
549        reload_meerschaum(debug=debug)
550        return success, (
551            f"Successfully uninstalled plugin '{self}'." if success
552            else f"Failed to uninstall plugin '{self}'."
553        )
554
555
556    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
557        """
558        If exists, run the plugin's `setup()` function.
559
560        Parameters
561        ----------
562        *args: str
563            The positional arguments passed to the `setup()` function.
564            
565        debug: bool, default False
566            Verbosity toggle.
567
568        **kw: Any
569            The keyword arguments passed to the `setup()` function.
570
571        Returns
572        -------
573        A `SuccessTuple` or `bool` indicating success.
574
575        """
576        from meerschaum.utils.debug import dprint
577        import inspect
578        _setup = None
579        for name, fp in inspect.getmembers(self.module):
580            if name == 'setup' and inspect.isfunction(fp):
581                _setup = fp
582                break
583
584        ### assume success if no setup() is found (not necessary)
585        if _setup is None:
586            return True
587
588        sig = inspect.signature(_setup)
589        has_debug, has_kw = ('debug' in sig.parameters), False
590        for k, v in sig.parameters.items():
591            if '**' in str(v):
592                has_kw = True
593                break
594
595        _kw = {}
596        if has_kw:
597            _kw.update(kw)
598        if has_debug:
599            _kw['debug'] = debug
600
601        if debug:
602            dprint(f"Running setup for plugin '{self}'...")
603        try:
604            self.activate_venv(debug=debug)
605            return_tuple = _setup(*args, **_kw)
606            self.deactivate_venv(debug=debug)
607        except Exception as e:
608            return False, str(e)
609
610        if isinstance(return_tuple, tuple):
611            return return_tuple
612        if isinstance(return_tuple, bool):
613            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
614        if return_tuple is None:
615            return False, f"Setup for Plugin '{self.name}' returned None."
616        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"
617
618
619    def get_dependencies(
620        self,
621        debug: bool = False,
622    ) -> List[str]:
623        """
624        If the Plugin has specified dependencies in a list called `required`, return the list.
625        
626        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
627        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
628
629        Parameters
630        ----------
631        debug: bool, default False
632            Verbosity toggle.
633
634        Returns
635        -------
636        A list of required packages and plugins (str).
637
638        """
639        if '_required' in self.__dict__:
640            return self._required
641
642        ### If the plugin has not yet been imported,
643        ### infer the dependencies from the source text.
644        ### This is not super robust, and it doesn't feel right
645        ### having multiple versions of the logic.
646        ### This is necessary when determining the activation order
647        ### without having import the module.
648        ### For consistency's sake, the module-less method does not cache the requirements.
649        if self.__dict__.get('_module', None) is None:
650            file_path = self.__file__
651            if file_path is None:
652                return []
653            with open(file_path, 'r', encoding='utf-8') as f:
654                text = f.read()
655
656            if 'required' not in text:
657                return []
658
659            ### This has some limitations:
660            ### It relies on `required` being manually declared.
661            ### We lose the ability to dynamically alter the `required` list,
662            ### which is why we've kept the module-reliant method below.
663            import ast, re
664            ### NOTE: This technically would break 
665            ### if `required` was the very first line of the file.
666            req_start_match = re.search(r'\nrequired(:\s*)?.*=', text)
667            if not req_start_match:
668                return []
669            req_start = req_start_match.start()
670            equals_sign = req_start + text[req_start:].find('=')
671
672            ### Dependencies may have brackets within the strings, so push back the index.
673            first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[')
674            if first_opening_brace == -1:
675                return []
676
677            next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']')
678            if next_closing_brace == -1:
679                return []
680
681            start_ix = first_opening_brace + 1
682            end_ix = next_closing_brace
683
684            num_braces = 0
685            while True:
686                if '[' not in text[start_ix:end_ix]:
687                    break
688                num_braces += 1
689                start_ix = end_ix
690                end_ix += text[end_ix + 1:].find(']') + 1
691
692            req_end = end_ix + 1
693            req_text = (
694                text[(first_opening_brace-1):req_end]
695                .lstrip()
696                .replace('=', '', 1)
697                .lstrip()
698                .rstrip()
699            )
700            try:
701                required = ast.literal_eval(req_text)
702            except Exception as e:
703                warn(
704                    f"Unable to determine requirements for plugin '{self.name}' "
705                    + "without importing the module.\n"
706                    + "    This may be due to dynamically setting the global `required` list.\n"
707                    + f"    {e}"
708                )
709                return []
710            return required
711
712        import inspect
713        self.activate_venv(dependencies=False, debug=debug)
714        required = []
715        for name, val in inspect.getmembers(self.module):
716            if name == 'required':
717                required = val
718                break
719        self._required = required
720        self.deactivate_venv(dependencies=False, debug=debug)
721        return required
722
723
724    def get_required_plugins(self, debug: bool=False) -> List[mrsm.plugins.Plugin]:
725        """
726        Return a list of required Plugin objects.
727        """
728        from meerschaum.utils.warnings import warn
729        from meerschaum.config import get_config
730        from meerschaum._internal.static import STATIC_CONFIG
731        from meerschaum.connectors.parse import is_valid_connector_keys
732        plugins = []
733        _deps = self.get_dependencies(debug=debug)
734        sep = STATIC_CONFIG['plugins']['repo_separator']
735        plugin_names = [
736            _d[len('plugin:'):] for _d in _deps
737            if _d.startswith('plugin:') and len(_d) > len('plugin:')
738        ]
739        default_repo_keys = get_config('meerschaum', 'repository')
740        skipped_repo_keys = set()
741
742        for _plugin_name in plugin_names:
743            if sep in _plugin_name:
744                try:
745                    _plugin_name, _repo_keys = _plugin_name.split(sep)
746                except Exception:
747                    _repo_keys = default_repo_keys
748                    warn(
749                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
750                        + f"Will try to use '{_repo_keys}' instead.",
751                        stack = False,
752                    )
753            else:
754                _repo_keys = default_repo_keys
755
756            if _repo_keys in skipped_repo_keys:
757                continue
758
759            if not is_valid_connector_keys(_repo_keys):
760                warn(
761                    f"Invalid connector '{_repo_keys}'.\n"
762                    f"    Skipping required plugins from repository '{_repo_keys}'",
763                    stack=False,
764                )
765                continue
766
767            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
768
769        return plugins
770
771
772    def get_required_packages(self, debug: bool=False) -> List[str]:
773        """
774        Return the required package names (excluding plugins).
775        """
776        _deps = self.get_dependencies(debug=debug)
777        return [_d for _d in _deps if not _d.startswith('plugin:')]
778
779
780    def activate_venv(
781        self,
782        dependencies: bool = True,
783        init_if_not_exists: bool = True,
784        debug: bool = False,
785        **kw
786    ) -> bool:
787        """
788        Activate the virtual environments for the plugin and its dependencies.
789
790        Parameters
791        ----------
792        dependencies: bool, default True
793            If `True`, activate the virtual environments for required plugins.
794
795        Returns
796        -------
797        A bool indicating success.
798        """
799        from meerschaum.utils.venv import venv_target_path
800        from meerschaum.utils.packages import activate_venv
801        from meerschaum.utils.misc import make_symlink, is_symlink
802        from meerschaum.config._paths import PACKAGE_ROOT_PATH
803
804        if dependencies:
805            for plugin in self.get_required_plugins(debug=debug):
806                plugin.activate_venv(debug=debug, init_if_not_exists=init_if_not_exists, **kw)
807
808        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
809        venv_meerschaum_path = vtp / 'meerschaum'
810
811        try:
812            success, msg = True, "Success"
813            if is_symlink(venv_meerschaum_path):
814                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
815                    venv_meerschaum_path.unlink()
816                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
817        except Exception as e:
818            success, msg = False, str(e)
819        if not success:
820            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")
821
822        return activate_venv(self.name, init_if_not_exists=init_if_not_exists, debug=debug, **kw)
823
824
825    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
826        """
827        Deactivate the virtual environments for the plugin and its dependencies.
828
829        Parameters
830        ----------
831        dependencies: bool, default True
832            If `True`, deactivate the virtual environments for required plugins.
833
834        Returns
835        -------
836        A bool indicating success.
837        """
838        from meerschaum.utils.packages import deactivate_venv
839        success = deactivate_venv(self.name, debug=debug, **kw)
840        if dependencies:
841            for plugin in self.get_required_plugins(debug=debug):
842                plugin.deactivate_venv(debug=debug, **kw)
843        return success
844
845
846    def install_dependencies(
847        self,
848        force: bool = False,
849        debug: bool = False,
850    ) -> bool:
851        """
852        If specified, install dependencies.
853        
854        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
855        Meerschaum plugins from the same repository as this Plugin.
856        To install from a different repository, add the repo keys after `'@'`
857        (e.g. `'plugin:foo@api:bar'`).
858
859        Parameters
860        ----------
861        force: bool, default False
862            If `True`, continue with the installation, even if some
863            required packages fail to install.
864
865        debug: bool, default False
866            Verbosity toggle.
867
868        Returns
869        -------
870        A bool indicating success.
871        """
872        from meerschaum.utils.packages import pip_install, venv_contains_package
873        from meerschaum.utils.warnings import warn, info
874        _deps = self.get_dependencies(debug=debug)
875        if not _deps and self.requirements_file_path is None:
876            return True
877
878        plugins = self.get_required_plugins(debug=debug)
879        for _plugin in plugins:
880            if _plugin.name == self.name:
881                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
882                continue
883            _success, _msg = _plugin.repo_connector.install_plugin(
884                _plugin.name, debug=debug, force=force
885            )
886            if not _success:
887                warn(
888                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
889                    + f" for plugin '{self.name}':\n" + _msg,
890                    stack = False,
891                )
892                if not force:
893                    warn(
894                        "Try installing with the `--force` flag to continue anyway.",
895                        stack = False,
896                    )
897                    return False
898                info(
899                    "Continuing with installation despite the failure "
900                    + "(careful, things might be broken!)...",
901                    icon = False
902                )
903
904
905        ### First step: parse `requirements.txt` if it exists.
906        if self.requirements_file_path is not None:
907            if not pip_install(
908                requirements_file_path=self.requirements_file_path,
909                venv=self.name, debug=debug
910            ):
911                warn(
912                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
913                    stack = False,
914                )
915                if not force:
916                    warn(
917                        "Try installing with `--force` to continue anyway.",
918                        stack = False,
919                    )
920                    return False
921                info(
922                    "Continuing with installation despite the failure "
923                    + "(careful, things might be broken!)...",
924                    icon = False
925                )
926
927
928        ### Don't reinstall packages that are already included in required plugins.
929        packages = []
930        _packages = self.get_required_packages(debug=debug)
931        accounted_for_packages = set()
932        for package_name in _packages:
933            for plugin in plugins:
934                if venv_contains_package(package_name, plugin.name):
935                    accounted_for_packages.add(package_name)
936                    break
937        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
938
939        ### Attempt pip packages installation.
940        if packages:
941            for package in packages:
942                if not pip_install(package, venv=self.name, debug=debug):
943                    warn(
944                        f"Failed to install required package '{package}'"
945                        + f" for plugin '{self.name}'.",
946                        stack = False,
947                    )
948                    if not force:
949                        warn(
950                            "Try installing with `--force` to continue anyway.",
951                            stack = False,
952                        )
953                        return False
954                    info(
955                        "Continuing with installation despite the failure "
956                        + "(careful, things might be broken!)...",
957                        icon = False
958                    )
959        return True
960
961
962    @property
963    def full_name(self) -> str:
964        """
965        Include the repo keys with the plugin's name.
966        """
967        from meerschaum._internal.static import STATIC_CONFIG
968        sep = STATIC_CONFIG['plugins']['repo_separator']
969        return self.name + sep + str(self.repo_connector)
970
971
972    def __str__(self):
973        return self.name
974
975
976    def __repr__(self):
977        return f"Plugin('{self.name}', repo='{self.repo_connector}')"
978
979
980    def __del__(self):
981        pass

Handle packaging of Meerschaum plugins.

Plugin( name: str, version: Optional[str] = None, user_id: Optional[int] = None, required: Optional[List[str]] = None, attributes: Optional[Dict[str, Any]] = None, archive_path: Optional[pathlib.Path] = None, venv_path: Optional[pathlib.Path] = None, repo_connector: Optional[meerschaum.connectors.APIConnector] = None, repo: Union[meerschaum.connectors.APIConnector, str, NoneType] = None)
33    def __init__(
34        self,
35        name: str,
36        version: Optional[str] = None,
37        user_id: Optional[int] = None,
38        required: Optional[List[str]] = None,
39        attributes: Optional[Dict[str, Any]] = None,
40        archive_path: Optional[pathlib.Path] = None,
41        venv_path: Optional[pathlib.Path] = None,
42        repo_connector: Optional['mrsm.connectors.api.APIConnector'] = None,
43        repo: Union['mrsm.connectors.api.APIConnector', str, None] = None,
44    ):
45        from meerschaum._internal.static import STATIC_CONFIG
46        from meerschaum.config.paths import PLUGINS_ARCHIVES_RESOURCES_PATH, VIRTENV_RESOURCES_PATH
47        sep = STATIC_CONFIG['plugins']['repo_separator']
48        _repo = None
49        if sep in name:
50            try:
51                name, _repo = name.split(sep)
52            except Exception as e:
53                error(f"Invalid plugin name: '{name}'")
54        self._repo_in_name = _repo
55
56        if attributes is None:
57            attributes = {}
58        self.name = name
59        self.attributes = attributes
60        self.user_id = user_id
61        self._version = version
62        if required:
63            self._required = required
64        self.archive_path = (
65            archive_path if archive_path is not None
66            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
67        )
68        self.venv_path = (
69            venv_path if venv_path is not None
70            else VIRTENV_RESOURCES_PATH / self.name
71        )
72        self._repo_connector = repo_connector
73        self._repo_keys = repo
name
attributes
user_id
archive_path
venv_path
repo_connector
76    @property
77    def repo_connector(self):
78        """
79        Return the repository connector for this plugin.
80        NOTE: This imports the `connectors` module, which imports certain plugin modules.
81        """
82        if self._repo_connector is None:
83            from meerschaum.connectors.parse import parse_repo_keys
84
85            repo_keys = self._repo_keys or self._repo_in_name
86            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
87                error(
88                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
89                )
90            repo_connector = parse_repo_keys(repo_keys)
91            self._repo_connector = repo_connector
92        return self._repo_connector

Return the repository connector for this plugin. NOTE: This imports the connectors module, which imports certain plugin modules.

version
 95    @property
 96    def version(self):
 97        """
 98        Return the plugin's module version is defined (`__version__`) if it's defined.
 99        """
100        if self._version is None:
101            try:
102                self._version = self.module.__version__
103            except Exception as e:
104                self._version = None
105        return self._version

Return the version of the plugin's module (__version__) if it's defined.

module
108    @property
109    def module(self):
110        """
111        Return the Python module of the underlying plugin.
112        """
113        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
114            if self.__file__ is None:
115                return None
116
117            from meerschaum.plugins import import_plugins
118            self._module = import_plugins(str(self), warn=False)
119
120        return self._module

Return the Python module of the underlying plugin.

requirements_file_path: Optional[pathlib.Path]
148    @property
149    def requirements_file_path(self) -> Union[pathlib.Path, None]:
150        """
151        If a file named `requirements.txt` exists, return its path.
152        """
153        if self.__file__ is None:
154            return None
155        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
156        if not path.exists():
157            return None
158        return path

If a file named requirements.txt exists, return its path.

def is_installed(self, **kw) -> bool:
161    def is_installed(self, **kw) -> bool:
162        """
163        Check whether a plugin is correctly installed.
164
165        Returns
166        -------
167        A `bool` indicating whether a plugin exists and is successfully imported.
168        """
169        return self.__file__ is not None

Check whether a plugin is correctly installed.

Returns
  • A bool indicating whether a plugin exists and is successfully imported.
def make_tar(self, debug: bool = False) -> pathlib.Path:
172    def make_tar(self, debug: bool = False) -> pathlib.Path:
173        """
174        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
175
176        Parameters
177        ----------
178        debug: bool, default False
179            Verbosity toggle.
180
181        Returns
182        -------
183        A `pathlib.Path` to the archive file's path.
184
185        """
186        import tarfile, pathlib, subprocess, fnmatch
187        from meerschaum.utils.debug import dprint
188        from meerschaum.utils.packages import attempt_import
189        pathspec = attempt_import('pathspec', debug=debug)
190
191        if not self.__file__:
192            from meerschaum.utils.warnings import error
193            error(f"Could not find file for plugin '{self}'.")
194        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
195            path = self.__file__.replace('__init__.py', '')
196            is_dir = True
197        else:
198            path = self.__file__
199            is_dir = False
200
201        old_cwd = os.getcwd()
202        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
203        os.chdir(real_parent_path)
204
205        default_patterns_to_ignore = [
206            '.pyc',
207            '__pycache__/',
208            'eggs/',
209            '__pypackages__/',
210            '.git',
211        ]
212
213        def parse_gitignore() -> 'Set[str]':
214            gitignore_path = pathlib.Path(path) / '.gitignore'
215            if not gitignore_path.exists():
216                return set(default_patterns_to_ignore)
217            with open(gitignore_path, 'r', encoding='utf-8') as f:
218                gitignore_text = f.read()
219            return set(pathspec.PathSpec.from_lines(
220                pathspec.patterns.GitWildMatchPattern,
221                default_patterns_to_ignore + gitignore_text.splitlines()
222            ).match_tree(path))
223
224        patterns_to_ignore = parse_gitignore() if is_dir else set()
225
226        if debug:
227            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
228
229        with tarfile.open(self.archive_path, 'w:gz') as tarf:
230            if not is_dir:
231                tarf.add(f"{self.name}.py")
232            else:
233                for root, dirs, files in os.walk(self.name):
234                    for f in files:
235                        good_file = True
236                        fp = os.path.join(root, f)
237                        for pattern in patterns_to_ignore:
238                            if pattern in str(fp) or f.startswith('.'):
239                                good_file = False
240                                break
241                        if good_file:
242                            if debug:
243                                dprint(f"Adding '{fp}'...")
244                            tarf.add(fp)
245
246        ### clean up and change back to old directory
247        os.chdir(old_cwd)
248
249        ### change to 775 to avoid permissions issues with the API in a Docker container
250        self.archive_path.chmod(0o775)
251
252        if debug:
253            dprint(f"Created archive '{self.archive_path}'.")
254        return self.archive_path

Compress the plugin's source files into a .tar.gz archive and return the archive's path.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pathlib.Path to the archive file's path.
def install( self, skip_deps: bool = False, force: bool = False, debug: bool = False) -> Tuple[bool, str]:
257    def install(
258        self,
259        skip_deps: bool = False,
260        force: bool = False,
261        debug: bool = False,
262    ) -> SuccessTuple:
263        """
264        Extract a plugin's tar archive to the plugins directory.
265        
266        This function checks if the plugin is already installed and if the version is equal or
267        greater than the existing installation.
268
269        Parameters
270        ----------
271        skip_deps: bool, default False
272            If `True`, do not install dependencies.
273
274        force: bool, default False
275            If `True`, continue with installation, even if required packages fail to install.
276
277        debug: bool, default False
278            Verbosity toggle.
279
280        Returns
281        -------
282        A `SuccessTuple` of success (bool) and a message (str).
283
284        """
285        if self.full_name in _ongoing_installations:
286            return True, f"Already installing plugin '{self}'."
287        _ongoing_installations.add(self.full_name)
288        from meerschaum.utils.warnings import warn, error
289        if debug:
290            from meerschaum.utils.debug import dprint
291        import tarfile
292        import re
293        import ast
294        from meerschaum.plugins import sync_plugins_symlinks
295        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
296        from meerschaum.utils.venv import init_venv
297        from meerschaum.utils.misc import safely_extract_tar
298        from meerschaum.config.paths import PLUGINS_TEMP_RESOURCES_PATH, PLUGINS_DIR_PATHS
299        old_cwd = os.getcwd()
300        old_version = ''
301        new_version = ''
302        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
303        temp_dir.mkdir(exist_ok=True)
304
305        if not self.archive_path.exists():
306            return False, f"Missing archive file for plugin '{self}'."
307        if self.version is not None:
308            old_version = self.version
309            if debug:
310                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
311
312        if debug:
313            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
314
315        try:
316            with tarfile.open(self.archive_path, 'r:gz') as tarf:
317                safely_extract_tar(tarf, temp_dir)
318        except Exception as e:
319            warn(e)
320            return False, f"Failed to extract plugin '{self.name}'."
321
322        ### search for version information
323        files = os.listdir(temp_dir)
324        
325        if str(files[0]) == self.name:
326            is_dir = True
327        elif str(files[0]) == self.name + '.py':
328            is_dir = False
329        else:
330            error(f"Unknown format encountered for plugin '{self}'.")
331
332        fpath = temp_dir / files[0]
333        if is_dir:
334            fpath = fpath / '__init__.py'
335
336        init_venv(self.name, debug=debug)
337        with open(fpath, 'r', encoding='utf-8') as f:
338            init_lines = f.readlines()
339        new_version = None
340        for line in init_lines:
341            if '__version__' not in line:
342                continue
343            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
344            if not version_match:
345                continue
346            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
347            break
348        if not new_version:
349            warn(
350                f"No `__version__` defined for plugin '{self}'. "
351                + "Assuming new version...",
352                stack = False,
353            )
354
355        packaging_version = attempt_import('packaging.version')
356        try:
357            is_new_version = (not new_version and not old_version) or (
358                packaging_version.parse(old_version) < packaging_version.parse(new_version)
359            )
360            is_same_version = new_version and old_version and (
361                packaging_version.parse(old_version) == packaging_version.parse(new_version)
362            )
363        except Exception:
364            is_new_version, is_same_version = True, False
365
366        ### Determine where to permanently store the new plugin.
367        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
368        for path in PLUGINS_DIR_PATHS:
369            if not path.exists():
370                warn(f"Plugins path does not exist: {path}", stack=False)
371                continue
372
373            files_in_plugins_dir = os.listdir(path)
374            if (
375                self.name in files_in_plugins_dir
376                or
377                (self.name + '.py') in files_in_plugins_dir
378            ):
379                plugin_installation_dir_path = path
380                break
381
382        success_msg = (
383            f"Successfully installed plugin '{self}'"
384            + ("\n    (skipped dependencies)" if skip_deps else "")
385            + "."
386        )
387        success, abort = None, None
388
389        if is_same_version and not force:
390            success, msg = True, (
391                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
392                "    Install again with `-f` or `--force` to reinstall."
393            )
394            abort = True
395        elif is_new_version or force:
396            for src_dir, dirs, files in os.walk(temp_dir):
397                if success is not None:
398                    break
399                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
400                if not os.path.exists(dst_dir):
401                    os.mkdir(dst_dir)
402                for f in files:
403                    src_file = os.path.join(src_dir, f)
404                    dst_file = os.path.join(dst_dir, f)
405                    if os.path.exists(dst_file):
406                        os.remove(dst_file)
407
408                    if debug:
409                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
410                    try:
411                        shutil.move(src_file, dst_dir)
412                    except Exception:
413                        success, msg = False, (
414                            f"Failed to install plugin '{self}': " +
415                            f"Could not move file '{src_file}' to '{dst_dir}'"
416                        )
417                        print(msg)
418                        break
419            if success is None:
420                success, msg = True, success_msg
421        else:
422            success, msg = False, (
423                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
424                + f"attempted version {new_version}."
425            )
426
427        shutil.rmtree(temp_dir)
428        os.chdir(old_cwd)
429
430        ### Reload the plugin's module.
431        sync_plugins_symlinks(debug=debug)
432        if '_module' in self.__dict__:
433            del self.__dict__['_module']
434        init_venv(venv=self.name, force=True, debug=debug)
435        reload_meerschaum(debug=debug)
436
437        ### if we've already failed, return here
438        if not success or abort:
439            _ongoing_installations.remove(self.full_name)
440            return success, msg
441
442        ### attempt to install dependencies
443        dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
444        if not dependencies_installed:
445            _ongoing_installations.remove(self.full_name)
446            return False, f"Failed to install dependencies for plugin '{self}'."
447
448        ### handling success tuple, bool, or other (typically None)
449        setup_tuple = self.setup(debug=debug)
450        if isinstance(setup_tuple, tuple):
451            if not setup_tuple[0]:
452                success, msg = setup_tuple
453        elif isinstance(setup_tuple, bool):
454            if not setup_tuple:
455                success, msg = False, (
456                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
457                    f"Check `setup()` in '{self.__file__}' for more information " +
458                    "(no error message provided)."
459                )
460            else:
461                success, msg = True, success_msg
462        elif setup_tuple is None:
463            success = True
464            msg = (
465                f"Post-install for plugin '{self}' returned None. " +
466                "Assuming plugin successfully installed."
467            )
468            warn(msg)
469        else:
470            success = False
471            msg = (
472                f"Post-install for plugin '{self}' returned unexpected value " +
473                f"of type '{type(setup_tuple)}': {setup_tuple}"
474            )
475
476        _ongoing_installations.remove(self.full_name)
477        _ = self.module
478        return success, msg

Extract a plugin's tar archive to the plugins directory.

This function checks if the plugin is already installed and if the version is equal or greater than the existing installation.

Parameters
  • skip_deps (bool, default False): If True, do not install dependencies.
  • force (bool, default False): If True, continue with installation, even if required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success (bool) and a message (str).
def remove_archive(self, debug: bool = False) -> Tuple[bool, str]:
481    def remove_archive(
482        self,        
483        debug: bool = False
484    ) -> SuccessTuple:
485        """Remove a plugin's archive file."""
486        if not self.archive_path.exists():
487            return True, f"Archive file for plugin '{self}' does not exist."
488        try:
489            self.archive_path.unlink()
490        except Exception as e:
491            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
492        return True, "Success"

Remove a plugin's archive file.

def remove_venv(self, debug: bool = False) -> Tuple[bool, str]:
495    def remove_venv(
496        self,        
497        debug: bool = False
498    ) -> SuccessTuple:
499        """Remove a plugin's virtual environment."""
500        if not self.venv_path.exists():
501            return True, f"Virtual environment for plugin '{self}' does not exist."
502        try:
503            shutil.rmtree(self.venv_path)
504        except Exception as e:
505            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
506        return True, "Success"

Remove a plugin's virtual environment.

def uninstall(self, debug: bool = False) -> Tuple[bool, str]:
509    def uninstall(self, debug: bool = False) -> SuccessTuple:
510        """
511        Remove a plugin, its virtual environment, and archive file.
512        """
513        from meerschaum.utils.packages import reload_meerschaum
514        from meerschaum.plugins import sync_plugins_symlinks
515        from meerschaum.utils.warnings import warn, info
516        warnings_thrown_count: int = 0
517        max_warnings: int = 3
518
519        if not self.is_installed():
520            info(
521                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
522                + "Checking for artifacts...",
523                stack = False,
524            )
525        else:
526            real_path = pathlib.Path(os.path.realpath(self.__file__))
527            try:
528                if real_path.name == '__init__.py':
529                    shutil.rmtree(real_path.parent)
530                else:
531                    real_path.unlink()
532            except Exception as e:
533                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
534                warnings_thrown_count += 1
535            else:
536                info(f"Removed source files for plugin '{self.name}'.")
537
538        if self.venv_path.exists():
539            success, msg = self.remove_venv(debug=debug)
540            if not success:
541                warn(msg, stack=False)
542                warnings_thrown_count += 1
543            else:
544                info(f"Removed virtual environment from plugin '{self.name}'.")
545
546        success = warnings_thrown_count < max_warnings
547        sync_plugins_symlinks(debug=debug)
548        self.deactivate_venv(force=True, debug=debug)
549        reload_meerschaum(debug=debug)
550        return success, (
551            f"Successfully uninstalled plugin '{self}'." if success
552            else f"Failed to uninstall plugin '{self}'."
553        )

Remove a plugin, its virtual environment, and archive file.

def setup( self, *args: str, debug: bool = False, **kw: Any) -> Union[Tuple[bool, str], bool]:
556    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
557        """
558        If exists, run the plugin's `setup()` function.
559
560        Parameters
561        ----------
562        *args: str
563            The positional arguments passed to the `setup()` function.
564            
565        debug: bool, default False
566            Verbosity toggle.
567
568        **kw: Any
569            The keyword arguments passed to the `setup()` function.
570
571        Returns
572        -------
573        A `SuccessTuple` or `bool` indicating success.
574
575        """
576        from meerschaum.utils.debug import dprint
577        import inspect
578        _setup = None
579        for name, fp in inspect.getmembers(self.module):
580            if name == 'setup' and inspect.isfunction(fp):
581                _setup = fp
582                break
583
584        ### assume success if no setup() is found (not necessary)
585        if _setup is None:
586            return True
587
588        sig = inspect.signature(_setup)
589        has_debug, has_kw = ('debug' in sig.parameters), False
590        for k, v in sig.parameters.items():
591            if '**' in str(v):
592                has_kw = True
593                break
594
595        _kw = {}
596        if has_kw:
597            _kw.update(kw)
598        if has_debug:
599            _kw['debug'] = debug
600
601        if debug:
602            dprint(f"Running setup for plugin '{self}'...")
603        try:
604            self.activate_venv(debug=debug)
605            return_tuple = _setup(*args, **_kw)
606            self.deactivate_venv(debug=debug)
607        except Exception as e:
608            return False, str(e)
609
610        if isinstance(return_tuple, tuple):
611            return return_tuple
612        if isinstance(return_tuple, bool):
613            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
614        if return_tuple is None:
615            return False, f"Setup for Plugin '{self.name}' returned None."
616        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"

If exists, run the plugin's setup() function.

Parameters
  • *args (str): The positional arguments passed to the setup() function.
  • debug (bool, default False): Verbosity toggle.
  • **kw (Any): The keyword arguments passed to the setup() function.
Returns
  • A SuccessTuple or bool indicating success.
def get_dependencies(self, debug: bool = False) -> List[str]:
619    def get_dependencies(
620        self,
621        debug: bool = False,
622    ) -> List[str]:
623        """
624        If the Plugin has specified dependencies in a list called `required`, return the list.
625        
626        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
627        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
628
629        Parameters
630        ----------
631        debug: bool, default False
632            Verbosity toggle.
633
634        Returns
635        -------
636        A list of required packages and plugins (str).
637
638        """
639        if '_required' in self.__dict__:
640            return self._required
641
642        ### If the plugin has not yet been imported,
643        ### infer the dependencies from the source text.
644        ### This is not super robust, and it doesn't feel right
645        ### having multiple versions of the logic.
646        ### This is necessary when determining the activation order
647        ### without having import the module.
648        ### For consistency's sake, the module-less method does not cache the requirements.
649        if self.__dict__.get('_module', None) is None:
650            file_path = self.__file__
651            if file_path is None:
652                return []
653            with open(file_path, 'r', encoding='utf-8') as f:
654                text = f.read()
655
656            if 'required' not in text:
657                return []
658
659            ### This has some limitations:
660            ### It relies on `required` being manually declared.
661            ### We lose the ability to dynamically alter the `required` list,
662            ### which is why we've kept the module-reliant method below.
663            import ast, re
664            ### NOTE: This technically would break 
665            ### if `required` was the very first line of the file.
666            req_start_match = re.search(r'\nrequired(:\s*)?.*=', text)
667            if not req_start_match:
668                return []
669            req_start = req_start_match.start()
670            equals_sign = req_start + text[req_start:].find('=')
671
672            ### Dependencies may have brackets within the strings, so push back the index.
673            first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[')
674            if first_opening_brace == -1:
675                return []
676
677            next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']')
678            if next_closing_brace == -1:
679                return []
680
681            start_ix = first_opening_brace + 1
682            end_ix = next_closing_brace
683
684            num_braces = 0
685            while True:
686                if '[' not in text[start_ix:end_ix]:
687                    break
688                num_braces += 1
689                start_ix = end_ix
690                end_ix += text[end_ix + 1:].find(']') + 1
691
692            req_end = end_ix + 1
693            req_text = (
694                text[(first_opening_brace-1):req_end]
695                .lstrip()
696                .replace('=', '', 1)
697                .lstrip()
698                .rstrip()
699            )
700            try:
701                required = ast.literal_eval(req_text)
702            except Exception as e:
703                warn(
704                    f"Unable to determine requirements for plugin '{self.name}' "
705                    + "without importing the module.\n"
706                    + "    This may be due to dynamically setting the global `required` list.\n"
707                    + f"    {e}"
708                )
709                return []
710            return required
711
712        import inspect
713        self.activate_venv(dependencies=False, debug=debug)
714        required = []
715        for name, val in inspect.getmembers(self.module):
716            if name == 'required':
717                required = val
718                break
719        self._required = required
720        self.deactivate_venv(dependencies=False, debug=debug)
721        return required

If the Plugin has specified dependencies in a list called required, return the list.

NOTE: Dependencies which start with 'plugin:' are Meerschaum plugins, not pip packages. Meerschaum plugins may also specify connector keys for a repo after '@'.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of required packages and plugins (str).
def get_required_plugins(self, debug: bool = False) -> List[Plugin]:
724    def get_required_plugins(self, debug: bool=False) -> List[mrsm.plugins.Plugin]:
725        """
726        Return a list of required Plugin objects.
727        """
728        from meerschaum.utils.warnings import warn
729        from meerschaum.config import get_config
730        from meerschaum._internal.static import STATIC_CONFIG
731        from meerschaum.connectors.parse import is_valid_connector_keys
732        plugins = []
733        _deps = self.get_dependencies(debug=debug)
734        sep = STATIC_CONFIG['plugins']['repo_separator']
735        plugin_names = [
736            _d[len('plugin:'):] for _d in _deps
737            if _d.startswith('plugin:') and len(_d) > len('plugin:')
738        ]
739        default_repo_keys = get_config('meerschaum', 'repository')
740        skipped_repo_keys = set()
741
742        for _plugin_name in plugin_names:
743            if sep in _plugin_name:
744                try:
745                    _plugin_name, _repo_keys = _plugin_name.split(sep)
746                except Exception:
747                    _repo_keys = default_repo_keys
748                    warn(
749                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
750                        + f"Will try to use '{_repo_keys}' instead.",
751                        stack = False,
752                    )
753            else:
754                _repo_keys = default_repo_keys
755
756            if _repo_keys in skipped_repo_keys:
757                continue
758
759            if not is_valid_connector_keys(_repo_keys):
760                warn(
761                    f"Invalid connector '{_repo_keys}'.\n"
762                    f"    Skipping required plugins from repository '{_repo_keys}'",
763                    stack=False,
764                )
765                continue
766
767            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
768
769        return plugins

Return a list of required Plugin objects.

def get_required_packages(self, debug: bool = False) -> List[str]:
772    def get_required_packages(self, debug: bool=False) -> List[str]:
773        """
774        Return the required package names (excluding plugins).
775        """
776        _deps = self.get_dependencies(debug=debug)
777        return [_d for _d in _deps if not _d.startswith('plugin:')]

Return the required package names (excluding plugins).

def activate_venv( self, dependencies: bool = True, init_if_not_exists: bool = True, debug: bool = False, **kw) -> bool:
780    def activate_venv(
781        self,
782        dependencies: bool = True,
783        init_if_not_exists: bool = True,
784        debug: bool = False,
785        **kw
786    ) -> bool:
787        """
788        Activate the virtual environments for the plugin and its dependencies.
789
790        Parameters
791        ----------
792        dependencies: bool, default True
793            If `True`, activate the virtual environments for required plugins.
794
795        Returns
796        -------
797        A bool indicating success.
798        """
799        from meerschaum.utils.venv import venv_target_path
800        from meerschaum.utils.packages import activate_venv
801        from meerschaum.utils.misc import make_symlink, is_symlink
802        from meerschaum.config._paths import PACKAGE_ROOT_PATH
803
804        if dependencies:
805            for plugin in self.get_required_plugins(debug=debug):
806                plugin.activate_venv(debug=debug, init_if_not_exists=init_if_not_exists, **kw)
807
808        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
809        venv_meerschaum_path = vtp / 'meerschaum'
810
811        try:
812            success, msg = True, "Success"
813            if is_symlink(venv_meerschaum_path):
814                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
815                    venv_meerschaum_path.unlink()
816                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
817        except Exception as e:
818            success, msg = False, str(e)
819        if not success:
820            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")
821
822        return activate_venv(self.name, init_if_not_exists=init_if_not_exists, debug=debug, **kw)

Activate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, activate the virtual environments for required plugins.
Returns
  • A bool indicating success.
def deactivate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
825    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
826        """
827        Deactivate the virtual environments for the plugin and its dependencies.
828
829        Parameters
830        ----------
831        dependencies: bool, default True
832            If `True`, deactivate the virtual environments for required plugins.
833
834        Returns
835        -------
836        A bool indicating success.
837        """
838        from meerschaum.utils.packages import deactivate_venv
839        success = deactivate_venv(self.name, debug=debug, **kw)
840        if dependencies:
841            for plugin in self.get_required_plugins(debug=debug):
842                plugin.deactivate_venv(debug=debug, **kw)
843        return success

Deactivate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, deactivate the virtual environments for required plugins.
Returns
  • A bool indicating success.
def install_dependencies(self, force: bool = False, debug: bool = False) -> bool:
846    def install_dependencies(
847        self,
848        force: bool = False,
849        debug: bool = False,
850    ) -> bool:
851        """
852        If specified, install dependencies.
853        
854        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
855        Meerschaum plugins from the same repository as this Plugin.
856        To install from a different repository, add the repo keys after `'@'`
857        (e.g. `'plugin:foo@api:bar'`).
858
859        Parameters
860        ----------
861        force: bool, default False
862            If `True`, continue with the installation, even if some
863            required packages fail to install.
864
865        debug: bool, default False
866            Verbosity toggle.
867
868        Returns
869        -------
870        A bool indicating success.
871        """
872        from meerschaum.utils.packages import pip_install, venv_contains_package
873        from meerschaum.utils.warnings import warn, info
874        _deps = self.get_dependencies(debug=debug)
875        if not _deps and self.requirements_file_path is None:
876            return True
877
878        plugins = self.get_required_plugins(debug=debug)
879        for _plugin in plugins:
880            if _plugin.name == self.name:
881                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
882                continue
883            _success, _msg = _plugin.repo_connector.install_plugin(
884                _plugin.name, debug=debug, force=force
885            )
886            if not _success:
887                warn(
888                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
889                    + f" for plugin '{self.name}':\n" + _msg,
890                    stack = False,
891                )
892                if not force:
893                    warn(
894                        "Try installing with the `--force` flag to continue anyway.",
895                        stack = False,
896                    )
897                    return False
898                info(
899                    "Continuing with installation despite the failure "
900                    + "(careful, things might be broken!)...",
901                    icon = False
902                )
903
904
905        ### First step: parse `requirements.txt` if it exists.
906        if self.requirements_file_path is not None:
907            if not pip_install(
908                requirements_file_path=self.requirements_file_path,
909                venv=self.name, debug=debug
910            ):
911                warn(
912                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
913                    stack = False,
914                )
915                if not force:
916                    warn(
917                        "Try installing with `--force` to continue anyway.",
918                        stack = False,
919                    )
920                    return False
921                info(
922                    "Continuing with installation despite the failure "
923                    + "(careful, things might be broken!)...",
924                    icon = False
925                )
926
927
928        ### Don't reinstall packages that are already included in required plugins.
929        packages = []
930        _packages = self.get_required_packages(debug=debug)
931        accounted_for_packages = set()
932        for package_name in _packages:
933            for plugin in plugins:
934                if venv_contains_package(package_name, plugin.name):
935                    accounted_for_packages.add(package_name)
936                    break
937        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
938
939        ### Attempt pip packages installation.
940        if packages:
941            for package in packages:
942                if not pip_install(package, venv=self.name, debug=debug):
943                    warn(
944                        f"Failed to install required package '{package}'"
945                        + f" for plugin '{self.name}'.",
946                        stack = False,
947                    )
948                    if not force:
949                        warn(
950                            "Try installing with `--force` to continue anyway.",
951                            stack = False,
952                        )
953                        return False
954                    info(
955                        "Continuing with installation despite the failure "
956                        + "(careful, things might be broken!)...",
957                        icon = False
958                    )
959        return True

If specified, install dependencies.

NOTE: Dependencies that start with 'plugin:' will be installed as Meerschaum plugins from the same repository as this Plugin. To install from a different repository, add the repo keys after '@' (e.g. 'plugin:foo@api:bar').

Parameters
  • force (bool, default False): If True, continue with the installation, even if some required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating success.
full_name: str
962    @property
963    def full_name(self) -> str:
964        """
965        Include the repo keys with the plugin's name.
966        """
967        from meerschaum._internal.static import STATIC_CONFIG
968        sep = STATIC_CONFIG['plugins']['repo_separator']
969        return self.name + sep + str(self.repo_connector)

Include the repo keys with the plugin's name.

def make_action( function: Optional[Callable[[Any], Any]] = None, shell: bool = False, activate: bool = True, deactivate: bool = True, debug: bool = False, daemon: bool = True, skip_if_loaded: bool = True, _plugin_name: Optional[str] = None) -> Callable[[Any], Any]:
 59def make_action(
 60    function: Optional[Callable[[Any], Any]] = None,
 61    shell: bool = False,
 62    activate: bool = True,
 63    deactivate: bool = True,
 64    debug: bool = False,
 65    daemon: bool = True,
 66    skip_if_loaded: bool = True,
 67    _plugin_name: Optional[str] = None,
 68) -> Callable[[Any], Any]:
 69    """
 70    Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.
 71    
 72    Parameters
 73    ----------
 74    function: Callable[[Any], Any]
 75        The function to become a Meerschaum action. Must accept all keyword arguments.
 76        
 77    shell: bool, default False
 78        Not used.
 79        
 80    Returns
 81    -------
 82    Another function (this is a decorator function).
 83
 84    Examples
 85    --------
 86    >>> from meerschaum.plugins import make_action
 87    >>>
 88    >>> @make_action
 89    ... def my_action(**kw):
 90    ...     print('foo')
 91    ...     return True, "Success"
 92    >>>
 93    """
 94    def _decorator(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
 95        from meerschaum.actions import actions, _custom_actions_plugins, _plugins_actions
 96        if skip_if_loaded and func.__name__ in actions:
 97            return func
 98
 99        from meerschaum.config.paths import PLUGINS_RESOURCES_PATH
100        plugin_name = (
101            func.__name__.split(f"{PLUGINS_RESOURCES_PATH.stem}.", maxsplit=1)[-1].split('.')[0]
102        )
103        plugin = Plugin(plugin_name) if plugin_name else None
104
105        if debug:
106            from meerschaum.utils.debug import dprint
107            dprint(
108                f"Adding action '{func.__name__}' from plugin "
109                f"'{plugin}'..."
110            )
111
112        actions[func.__name__] = func
113        _custom_actions_plugins[func.__name__] = plugin_name
114        if plugin_name not in _plugins_actions:
115            _plugins_actions[plugin_name] = []
116        _plugins_actions[plugin_name].append(func.__name__)
117        if not daemon:
118            _actions_daemon_enabled[func.__name__] = False
119        return func
120
121    if function is None:
122        return _decorator
123    return _decorator(function)

Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.

Parameters
  • function (Callable[[Any], Any]): The function to become a Meerschaum action. Must accept all keyword arguments.
  • shell (bool, default False): Not used.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import make_action
>>>
>>> @make_action
... def my_action(**kw):
...     print('foo')
...     return True, "Success"
>>>
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Register a function to be executed when the Meerschaum API module is initialized.
    Useful for lazy-loading heavy plugins only when the API is started,
    such as when editing the `meerschaum.api.app` FastAPI app.

    The FastAPI app will be passed as the only parameter.
    Functions are grouped by their `__module__`;
    a failure to register only emits a warning.

    Examples
    --------
    >>> from meerschaum.plugins import api_plugin
    >>>
    >>> @api_plugin
    ... def initialize_plugin(app):
    ...     @app.get('/my/new/path')
    ...     def new_path():
    ...         return {'message': 'It works!'}
    >>>
    """
    with _locks['_api_plugins']:
        try:
            _api_plugins.setdefault(function.__module__, []).append(function)
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(e)
    return function

Execute the function when initializing the Meerschaum API module. Useful for lazy-loading heavy plugins only when the API is started, such as when editing the meerschaum.api.app FastAPI app.

The FastAPI app will be passed as the only parameter.

Examples
>>> from meerschaum.plugins import api_plugin
>>>
>>> @api_plugin
... def initialize_plugin(app):
...     @app.get('/my/new/path')
...     def new_path():
...         return {'message': 'It works!'}
>>>
def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Register a function to be executed when starting the Dash application.

    The function is stored under the name returned by `_get_parent_plugin()`
    and is returned unchanged. A failure to register only emits a warning.
    """
    with _locks['_dash_plugins']:
        parent_plugin_name = _get_parent_plugin()
        try:
            _dash_plugins.setdefault(parent_plugin_name, []).append(function)
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(e)
    return function

Execute the function when starting the Dash application.

def web_page( page: Union[str, NoneType, Callable[[Any], Any]] = None, login_required: bool = True, skip_navbar: bool = False, page_group: Optional[str] = None, **kwargs) -> Any:
def web_page(
    page: Union[str, None, Callable[[Any], Any]] = None,
    login_required: bool = True,
    skip_navbar: bool = False,
    page_group: Optional[str] = None,
    **kwargs
) -> Any:
    """
    Quickly add pages to the dash application.

    May be used bare (`@web_page`) or with arguments (`@web_page('foo/bar')`).
    Endpoints are normalized to begin with `/dash/`.

    Parameters
    ----------
    page: Union[str, None, Callable[[Any], Any]], default None
        The endpoint for the page. If omitted (or if the decorator is used bare),
        the decorated function's name is used.

    login_required: bool, default True
        Stored alongside the page under the `'login_required'` key.

    skip_navbar: bool, default False
        Stored alongside the page under the `'skip_navbar'` key.

    page_group: Optional[str], default None
        The group under which to register the page.
        Defaults to the name returned by `_get_parent_plugin()`.

    **kwargs
        Accepted but not used by this function.

    Returns
    -------
    The wrapped function when used bare, otherwise a decorator.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.plugins import web_page
    >>> html = mrsm.attempt_import('dash.html')
    >>>
    >>> @web_page('foo/bar', login_required=False)
    ... def foo_bar():
    ...     return html.Div([html.H1("Hello, World!")])
    >>>
    """
    dash_prefix = '/dash'

    def _decorator(_func: Callable[[Any], Any]) -> Callable[[Any], Any]:

        @functools.wraps(_func)
        def wrapper(*_args, **_kwargs):
            return _func(*_args, **_kwargs)

        ### Use locals (not `nonlocal`) so reusing the decorator
        ### does not leak the endpoint or group of a previous function.
        endpoint = page if isinstance(page, str) else _func.__name__
        endpoint = endpoint.lstrip('/').rstrip('/').strip()

        ### Only treat `dash` as the prefix when it is a whole path segment,
        ### and always store the endpoint with a leading slash.
        if endpoint == 'dash' or endpoint.startswith('dash/'):
            endpoint = '/' + endpoint
        else:
            endpoint = f'{dash_prefix}/{endpoint}'

        ### Build a title from the path, removing only the leading prefix.
        title_source = endpoint[len(dash_prefix):].lstrip('/').rstrip('/').strip()
        page_key = ' '.join(
            word.capitalize()
            for word in title_source.replace('-', ' ').replace('_', ' ').split(' ')
        )

        plugin_name = _get_parent_plugin()
        group = page_group or plugin_name
        if group not in _plugin_endpoints_to_pages:
            _plugin_endpoints_to_pages[group] = {}
        _plugin_endpoints_to_pages[group][endpoint] = {
            'function': _func,
            'login_required': login_required,
            'skip_navbar': skip_navbar,
            'page_key': page_key,
        }
        if plugin_name not in _plugins_web_pages:
            _plugins_web_pages[plugin_name] = []
        _plugins_web_pages[plugin_name].append(_func)
        return wrapper

    ### Bare usage: `page` is the decorated function itself.
    if callable(page):
        return _decorator(page)
    return _decorator

Quickly add pages to the dash application.

Examples
>>> import meerschaum as mrsm
>>> from meerschaum.plugins import web_page
>>> html = mrsm.attempt_import('dash.html')
>>> 
>>> @web_page('foo/bar', login_required=False)
... def foo_bar():
...     return html.Div([html.H1("Hello, World!")])
>>>
def import_plugins( *plugins_to_import: Union[str, List[str], NoneType], warn: bool = True) -> "Union['ModuleType', Tuple['ModuleType', None]]":
def import_plugins(
    *plugins_to_import: Union[str, List[str], None],
    warn: bool = True,
) -> Union[
    'ModuleType', Tuple['ModuleType', None]
]:
    """
    Import the Meerschaum plugins directory.

    Parameters
    ----------
    plugins_to_import: Union[str, List[str], None]
        If provided, only import the specified plugins.
        Otherwise import the entire plugins module. May be a string, list, or `None`.
        Defaults to `None`.

    warn: bool, default True
        If `True`, emit a warning when the plugins module could not be imported.

    Returns
    -------
    A module or tuple of modules, depending on the number of plugins provided.
    With no plugins specified, the plugins module itself is returned
    (`None` if it failed to import).
    Plugins which fail to import are represented by `None`.

    """
    import sys
    import importlib
    from meerschaum.utils.misc import flatten_list
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
    from meerschaum.utils.warnings import warn as _warn
    plugins_to_import = list(plugins_to_import)
    plugins_parent_path = str(PLUGINS_RESOURCES_PATH.parent)
    prepended_sys_path = False
    with _locks['sys.path']:

        ### Since plugins may depend on other plugins,
        ### we need to activate the virtual environments for library plugins.
        ### This logic exists in `Plugin.activate_venv()`,
        ### but that code requires the plugin's module to already be imported.
        ### It's not a guarantee of correct activation order
        ### (presumably an issue when plugins pin conflicting versions of a package).
        plugins_names = get_plugins_names()

        ### Remember which venvs were active beforehand so they are left active afterwards.
        already_active_venvs = {
            plugin_name
            for plugin_name in plugins_names
            if is_venv_active(plugin_name)
        }

        if not sys.path or sys.path[0] != plugins_parent_path:
            prepended_sys_path = True
            sys.path.insert(0, plugins_parent_path)

        try:
            if not plugins_to_import:
                for plugin_name in plugins_names:
                    activate_venv(plugin_name)
                try:
                    imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem)
                except ImportError as e:
                    _warn(f"Failed to import the plugins module:\n    {e}")
                    import traceback
                    traceback.print_exc()
                    imported_plugins = None
                finally:
                    for plugin_name in plugins_names:
                        if plugin_name in already_active_venvs:
                            continue
                        deactivate_venv(plugin_name)

            else:
                imported_plugins = []
                for plugin_name in flatten_list(plugins_to_import):
                    plugin = Plugin(plugin_name)
                    try:
                        with Venv(plugin, init_if_not_exists=False):
                            imported_plugins.append(
                                importlib.import_module(
                                    f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
                                )
                            )
                    except Exception as e:
                        _warn(
                            f"Failed to import plugin '{plugin_name}':\n    "
                            + f"{e}\n\nHere's a stacktrace:",
                            stack = False,
                        )
                        from meerschaum.utils.formatting import get_console
                        get_console().print_exception(
                            suppress = [
                                'meerschaum/plugins/__init__.py',
                                importlib,
                                importlib._bootstrap,
                            ]
                        )
                        ### Keep positions aligned with the requested plugins.
                        imported_plugins.append(None)

            if imported_plugins is None and warn:
                _warn("Failed to import plugins.", stacklevel=3)

        finally:
            ### Always restore `sys.path`, even if an import raised unexpectedly.
            if prepended_sys_path and plugins_parent_path in sys.path:
                sys.path.remove(plugins_parent_path)

    if isinstance(imported_plugins, list):
        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
    return imported_plugins

Import the Meerschaum plugins directory.

Parameters
  • plugins_to_import (Union[str, List[str], None]): If provided, only import the specified plugins. Otherwise import the entire plugins module. May be a string, list, or None. Defaults to None.
Returns
  • A module or tuple of modules, depending on the number of plugins provided.
def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
    """
    Fetch attributes from a plugin's module, similar to `from module import x`.

    Parameters
    ----------
    plugin_import_name: str
        The dotted import name of the plugin's module
        (e.g. 'compose.utils.pipes').
        A leading 'plugins.' prefix is stripped.

    attrs: str
        The names of the attributes to look up on the imported module.

    Returns
    -------
    The requested object if a single name is given,
    otherwise a tuple of the objects in the order requested.
    An attribute which cannot be accessed is returned as `None` (with a warning).
    `None` is returned if the submodule itself fails to import.

    Raises
    ------
    ValueError
        If no attribute names are provided.

    ImportError
        If the root plugin's module is `None`.

    Examples
    --------
    >>> init = from_plugin_import('compose.utils', 'init')
    >>> with mrsm.Venv('compose'):
    ...     cf = init()
    >>> build_parent_pipe, get_defined_pipes = from_plugin_import(
    ...     'compose.utils.pipes',
    ...     'build_parent_pipe',
    ...     'get_defined_pipes',
    ... )
    >>> parent_pipe = build_parent_pipe(cf)
    >>> defined_pipes = get_defined_pipes(cf)
    """
    import importlib
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.warnings import warn as _warn

    prefix = 'plugins.'
    if plugin_import_name.startswith(prefix):
        plugin_import_name = plugin_import_name[len(prefix):]

    import_parts = plugin_import_name.split('.')
    # The first component names the plugin itself; the rest are submodules.
    plugin = mrsm.Plugin(import_parts[0])
    submodule_import_name = '.'.join([PLUGINS_RESOURCES_PATH.stem, *import_parts])

    if not attrs:
        raise ValueError(f"Provide which attributes to return from '{submodule_import_name}'.")

    with mrsm.Venv(plugin):
        if plugin.module is None:
            raise ImportError(f"Unable to import plugin '{plugin}'.")

        try:
            submodule = importlib.import_module(submodule_import_name)
        except ImportError as e:
            _warn(
                f"Failed to import plugin '{submodule_import_name}':\n    "
                + f"{e}\n\nHere's a stacktrace:",
                stack=False,
            )
            from meerschaum.utils.formatting import get_console
            get_console().print_exception(
                suppress=[
                    'meerschaum/plugins/__init__.py',
                    importlib,
                    importlib._bootstrap,
                ]
            )
            return None

        def _lookup(attr: str) -> Any:
            # A missing attribute is reported but does not abort the other lookups.
            try:
                return getattr(submodule, attr)
            except Exception:
                _warn(f"Failed to access '{attr}' from '{submodule_import_name}'.")
                return None

        found = [_lookup(attr) for attr in attrs]
        return found[0] if len(attrs) == 1 else tuple(found)

Emulate the from module import x behavior.

Parameters
  • plugin_import_name (str): The import name of the plugin's module. Separate submodules with '.' (e.g. 'compose.utils.pipes')
  • attrs (str): Names of the attributes to return.
Returns
  • Objects from a plugin's submodule.
  • If multiple objects are provided, return a tuple.
Examples
>>> init = from_plugin_import('compose.utils', 'init')
>>> with mrsm.Venv('compose'):
...     cf = init()
>>> build_parent_pipe, get_defined_pipes = from_plugin_import(
...     'compose.utils.pipes',
...     'build_parent_pipe',
...     'get_defined_pipes',
... )
>>> parent_pipe = build_parent_pipe(cf)
>>> defined_pipes = get_defined_pipes(cf)
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
    """
    Unload plugins, re-sync the plugins symlinks, and load plugins again.

    Parameters
    ----------
    plugins: Optional[List[str]], default None
        The plugins to reload (passed to `unload_plugins()`).
        `None` will reload all plugins.

    debug: bool, default False
        Forwarded to `unload_plugins()`, `sync_plugins_symlinks()`,
        and `load_plugins()`.
    """
    global _synced_symlinks

    unload_plugins(plugins, debug=debug)

    # Clear the module-level flag before syncing the symlinks again
    # (presumably so the sync is not skipped; confirm in `sync_plugins_symlinks()`).
    _synced_symlinks = False
    sync_plugins_symlinks(debug=debug)

    # Force loading even if plugins were previously marked as loaded.
    load_plugins(skip_if_loaded=False, debug=debug)

Reload plugins back into memory.

Parameters
  • plugins (Optional[List[str]], default None): The plugins to reload. None will reload all plugins.
def get_plugins( *to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
    """
    Return the installed `Plugin` objects.

    Parameters
    ----------
    to_load:
        If specified, only load specific plugins.
        Otherwise discover plugins from the entries of the plugins resources
        directory (package directories and `.py` files).

    try_import: bool, default True
        If `True`, allow for plugins to be imported
        (passed to `Plugin.is_installed()`).

    Returns
    -------
    A tuple of the `Plugin` objects which are installed,
    or a single `Plugin` if exactly one name was provided.

    Raises
    ------
    ValueError
        If exactly one name was provided and that plugin is not installed.
    """
    import os
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    sync_plugins_symlinks()

    if to_load:
        names = to_load
    else:
        names = []
        for entry in os.listdir(PLUGINS_RESOURCES_PATH):
            if entry == '__init__.py':
                continue
            if (PLUGINS_RESOURCES_PATH / entry).is_dir():
                names.append(entry)
                continue
            # Only strip the extension from real `.py` files; blindly dropping the
            # last three characters would turn unrelated files into bogus names.
            stem, extension = os.path.splitext(entry)
            if extension == '.py':
                names.append(stem)

    plugins = tuple(
        plugin
        for plugin in (Plugin(name) for name in names)
        if plugin.is_installed(try_import=try_import)
    )
    if len(to_load) == 1:
        if not plugins:
            raise ValueError(f"Plugin '{to_load[0]}' is not installed.")
        return plugins[0]
    return plugins

Return a list of Plugin objects.

Parameters
  • to_load: If specified, only load specific plugins. Otherwise return all plugins.
  • try_import (bool, default True): If True, allow for plugins to be imported.
def get_data_plugins() -> List[Plugin]:
def get_data_plugins() -> List[Plugin]:
    """
    Only return the plugins whose modules define a `fetch()` or `sync()` function.

    Returns
    -------
    A list of `Plugin` objects taken from `get_plugins()`.
    A plugin which defines both functions is only included once.
    """
    import inspect
    data_names = {'sync', 'fetch'}
    return [
        plugin
        for plugin in get_plugins()
        # `any()` stops at the first match, so a plugin defining both
        # `fetch()` and `sync()` is not added twice.
        if any(
            name in data_names and inspect.isfunction(ob)
            for name, ob in inspect.getmembers(plugin.module)
        )
    ]

Only return the modules of plugins with either fetch() or sync() functions.

def add_plugin_argument(*args, **kwargs) -> None:
def add_plugin_argument(*args, **kwargs) -> None:
    """
    Register a command-line argument under the calling plugin's options group.
    Takes the same parameters as the regular argparse `add_argument()` function.

    The group is titled "Plugin '<name>' options" when a parent plugin name is found,
    otherwise 'Custom options'. An argument whose positional `args` were already
    registered in the group is skipped, and any exception is emitted as a warning.

    Examples
    --------
    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
    """
    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
    from meerschaum.utils.warnings import warn

    plugin_name = _get_parent_plugin()
    if plugin_name:
        title = f"Plugin '{plugin_name}' options"
    else:
        title = 'Custom options'
    group_key = 'plugin_' + (plugin_name or '')

    # Lazily create the argument group (and its de-duplication set) on first use.
    if group_key not in groups:
        groups[group_key] = parser.add_argument_group(title=title)
        _seen_plugin_args[group_key] = set()

    try:
        args_key = str(args)
        seen_args = _seen_plugin_args[group_key]
        if args_key in seen_args:
            return
        groups[group_key].add_argument(*args, **kwargs)
        seen_args.add(args_key)
    except Exception as e:
        warn(e)

Add argparse arguments under the 'Plugins options' group. Takes the same parameters as the regular argparse add_argument() function.

Examples
>>> add_plugin_argument('--foo', type=int, help="This is my help text!")
>>>
def pre_sync_hook(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
def pre_sync_hook(
    function: Callable[[Any], Any],
) -> Callable[[Any], Any]:
    r"""
    Register a function as a sync hook to be executed right before sync.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to execute right before a sync.

    Returns
    -------
    The same `function`, unmodified (this is a decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import pre_sync_hook
    >>>
    >>> @pre_sync_hook
    ... def log_sync(pipe, **kwargs):
    ...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
    >>>
    """
    # NOTE: The docstring is raw so the `\n` in the example stays a literal escape
    # sequence instead of splitting the example across lines.
    with _locks['_pre_sync_hooks']:
        try:
            # Resolve the plugin name inside the `try` so a failed lookup
            # is reported as a warning, matching `post_sync_hook()`.
            plugin_name = _get_parent_plugin()
            if plugin_name not in _pre_sync_hooks:
                _pre_sync_hooks[plugin_name] = []
            _pre_sync_hooks[plugin_name].append(function)
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(e)
    return function

Register a function as a sync hook to be executed right before sync.

Parameters
  • function (Callable[[Any], Any]): The function to execute right before a sync.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import pre_sync_hook
>>>
>>> @pre_sync_hook
... def log_sync(pipe, **kwargs):
...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
>>>

def post_sync_hook(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
def post_sync_hook(
    function: Callable[[Any], Any],
) -> Callable[[Any], Any]:
    """
    Register a function as a sync hook to run once a sync has completed.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to execute upon completion of a sync.

    Returns
    -------
    The same `function`, unmodified (this is a decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import post_sync_hook
    >>> from meerschaum.utils.misc import interval_str
    >>> from datetime import timedelta
    >>>
    >>> @post_sync_hook
    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
    ...     duration_delta = timedelta(seconds=duration)
    ...     duration_text = interval_str(duration_delta)
    ...     print(f"It took {duration_text} to sync {pipe}.")
    >>>
    """
    with _locks['_post_sync_hooks']:
        try:
            # Hooks are grouped by the name of the plugin which registered them.
            hooks = _post_sync_hooks.setdefault(_get_parent_plugin(), [])
            hooks.append(function)
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(e)
    return function

Register a function as a sync hook to be executed upon completion of a sync.

Parameters
  • function (Callable[[Any], Any]): The function to execute upon completion of a sync.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import post_sync_hook
>>> from meerschaum.utils.misc import interval_str
>>> from datetime import timedelta
>>>
>>> @post_sync_hook
... def log_sync(pipe, success_tuple, duration=None, **kwargs):
...     duration_delta = timedelta(seconds=duration)
...     duration_text = interval_str(duration_delta)
...     print(f"It took {duration_text} to sync {pipe}.")
>>>