meerschaum.plugins

Expose plugin management APIs from the meerschaum.plugins module.

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Expose plugin management APIs from the `meerschaum.plugins` module.
  7"""
  8
  9from __future__ import annotations
 10import functools
 11from meerschaum.utils.typing import Callable, Any, Union, Optional, Dict, List, Tuple
 12from meerschaum.utils.threading import Lock, RLock
 13from meerschaum.plugins._Plugin import Plugin
 14
 15_api_plugins: Dict[str, List[Callable[['fastapi.App'], Any]]] = {}
 16_pre_sync_hooks: Dict[str, List[Callable[[Any], Any]]] = {}
 17_post_sync_hooks: Dict[str, List[Callable[[Any], Any]]] = {}
 18_locks = {
 19    '_api_plugins': RLock(),
 20    '_dash_plugins': RLock(),
 21    '_pre_sync_hooks': RLock(),
 22    '_post_sync_hooks': RLock(),
 23    '__path__': RLock(),
 24    'sys.path': RLock(),
 25    'internal_plugins': RLock(),
 26    '_synced_symlinks': RLock(),
 27    'PLUGINS_INTERNAL_LOCK_PATH': RLock(),
 28}
 29__all__ = (
 30    "Plugin", "make_action", "api_plugin", "dash_plugin", "import_plugins",
 31    "reload_plugins", "get_plugins", "get_data_plugins", "add_plugin_argument",
 32    "pre_sync_hook", "post_sync_hook",
 33)
 34__pdoc__ = {
 35    'venvs': False, 'data': False, 'stack': False, 'plugins': False,
 36}
 37
 38def make_action(
 39        function: Callable[[Any], Any],
 40        shell: bool = False,
 41        activate: bool = True,
 42        deactivate: bool = True,
 43        debug: bool = False
 44    ) -> Callable[[Any], Any]:
 45    """
 46    Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.
 47    
 48    Parameters
 49    ----------
 50    function: Callable[[Any], Any]
 51        The function to become a Meerschaum action. Must accept all keyword arguments.
 52        
 53    shell: bool, default False
 54        Not used.
 55        
 56    Returns
 57    -------
 58    Another function (this is a decorator function).
 59
 60    Examples
 61    --------
 62    >>> from meerschaum.plugins import make_action
 63    >>>
 64    >>> @make_action
 65    ... def my_action(**kw):
 66    ...     print('foo')
 67    ...     return True, "Success"
 68    >>>
 69    """
 70
 71    from meerschaum.actions import actions
 72    from meerschaum.utils.formatting import pprint
 73    package_name = function.__globals__['__name__']
 74    plugin_name = (
 75        package_name.split('.')[1]
 76        if package_name.startswith('plugins.') else None
 77    )
 78    plugin = Plugin(plugin_name) if plugin_name else None
 79
 80    if debug:
 81        from meerschaum.utils.debug import dprint
 82        dprint(
 83            f"Adding action '{function.__name__}' from plugin " +
 84            f"'{plugin}'..."
 85        )
 86
 87    actions[function.__name__] = function
 88    return function
 89
 90
 91def pre_sync_hook(
 92        function: Callable[[Any], Any],
 93    ) -> Callable[[Any], Any]:
 94    """
 95    Register a function as a sync hook to be executed right before sync.
 96    
 97    Parameters
 98    ----------
 99    function: Callable[[Any], Any]
100        The function to execute right before a sync.
101        
102    Returns
103    -------
104    Another function (this is a decorator function).
105
106    Examples
107    --------
108    >>> from meerschaum.plugins import pre_sync_hook
109    >>>
110    >>> @pre_sync_hook
111    ... def log_sync(pipe, **kwargs):
112    ...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
113    >>>
114    """
115    with _locks['_pre_sync_hooks']:
116        try:
117            if function.__module__ not in _pre_sync_hooks:
118                _pre_sync_hooks[function.__module__] = []
119            _pre_sync_hooks[function.__module__].append(function)
120        except Exception as e:
121            from meerschaum.utils.warnings import warn
122            warn(e)
123    return function
124
125
126def post_sync_hook(
127        function: Callable[[Any], Any],
128    ) -> Callable[[Any], Any]:
129    """
130    Register a function as a sync hook to be executed upon completion of a sync.
131    
132    Parameters
133    ----------
134    function: Callable[[Any], Any]
135        The function to execute upon completion of a sync.
136        
137    Returns
138    -------
139    Another function (this is a decorator function).
140
141    Examples
142    --------
143    >>> from meerschaum.plugins import post_sync_hook
144    >>>
145    >>> @post_sync_hook
146    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
147    ...     print(f"It took {round(duration, 2)} seconds to sync {pipe}.")
148    >>>
149    """
150    with _locks['_post_sync_hooks']:
151        try:
152            if function.__module__ not in _post_sync_hooks:
153                _post_sync_hooks[function.__module__] = []
154            _post_sync_hooks[function.__module__].append(function)
155        except Exception as e:
156            from meerschaum.utils.warnings import warn
157            warn(e)
158    return function
159
160
161_plugin_endpoints_to_pages = {}
162def web_page(
163        page: Union[str, None, Callable[[Any], Any]] = None,
164        login_required: bool = True,
165        **kwargs
166    ) -> Any:
167    """
168    Quickly add pages to the dash application.
169
170    Examples
171    --------
172    >>> import meerschaum as mrsm
173    >>> from meerschaum.plugins import web_page
174    >>> html = mrsm.attempt_import('dash.html')
175    >>> 
176    >>> @web_page('foo/bar', login_required=False)
177    >>> def foo_bar():
178    ...     return html.Div([html.H1("Hello, World!")])
179    >>> 
180    """
181    page_str = None
182
183    def _decorator(_func: Callable[[Any], Any]) -> Callable[[Any], Any]:
184        nonlocal page_str
185
186        @functools.wraps(_func)
187        def wrapper(*_args, **_kwargs):
188            return _func(*_args, **_kwargs)
189
190        if page_str is None:
191            page_str = _func.__name__
192
193        page_str = page_str.lstrip('/').rstrip('/').strip()
194        _plugin_endpoints_to_pages[page_str] = {
195            'function': _func,
196            'login_required': login_required,
197        }
198        return wrapper
199
200    if callable(page):
201        decorator_to_return = _decorator(page)
202        page_str = page.__name__
203    else:
204        decorator_to_return = _decorator
205        page_str = page
206
207    return decorator_to_return
208
209
210_dash_plugins = {}
211def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
212    """
213    Execute the function when starting the Dash application.
214    """
215    with _locks['_dash_plugins']:
216        try:
217            if function.__module__ not in _dash_plugins:
218                _dash_plugins[function.__module__] = []
219            _dash_plugins[function.__module__].append(function)
220        except Exception as e:
221            from meerschaum.utils.warnings import warn
222            warn(e)
223    return function
224
225
226def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
227    """
228    Execute the function when initializing the Meerschaum API module.
229    Useful for lazy-loading heavy plugins only when the API is started,
230    such as when editing the `meerschaum.api.app` FastAPI app.
231    
232    The FastAPI app will be passed as the only parameter.
233    
234    Examples
235    --------
236    >>> from meerschaum.plugins import api_plugin
237    >>>
238    >>> @api_plugin
239    >>> def initialize_plugin(app):
240    ...     @app.get('/my/new/path')
241    ...     def new_path():
242    ...         return {'message': 'It works!'}
243    >>>
244    """
245    with _locks['_api_plugins']:
246        try:
247            if function.__module__ not in _api_plugins:
248                _api_plugins[function.__module__] = []
249            _api_plugins[function.__module__].append(function)
250        except Exception as e:
251            from meerschaum.utils.warnings import warn
252            warn(e)
253    return function
254
255
256_synced_symlinks: bool = False
257def sync_plugins_symlinks(debug: bool = False, warn: bool = True) -> None:
258    """
259    Update the plugins 
260    """
261    global _synced_symlinks
262    with _locks['_synced_symlinks']:
263        if _synced_symlinks:
264            return
265
266    import sys, os, pathlib, time
267    from collections import defaultdict
268    import importlib.util
269    from meerschaum.utils.misc import flatten_list, make_symlink, is_symlink
270    from meerschaum.utils.warnings import error, warn as _warn
271    from meerschaum.config.static import STATIC_CONFIG
272    from meerschaum.utils.venv import Venv, activate_venv, deactivate_venv, is_venv_active
273    from meerschaum.config._paths import (
274        PLUGINS_RESOURCES_PATH,
275        PLUGINS_ARCHIVES_RESOURCES_PATH,
276        PLUGINS_INIT_PATH,
277        PLUGINS_DIR_PATHS,
278        PLUGINS_INTERNAL_LOCK_PATH,
279    )
280
281    ### If the lock file exists, sleep for up to a second or until it's removed before continuing.
282    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
283        if PLUGINS_INTERNAL_LOCK_PATH.exists():
284            lock_sleep_total     = STATIC_CONFIG['plugins']['lock_sleep_total']
285            lock_sleep_increment = STATIC_CONFIG['plugins']['lock_sleep_increment']
286            lock_start = time.perf_counter()
287            while (
288                (time.perf_counter() - lock_start) < lock_sleep_total
289            ):
290                time.sleep(lock_sleep_increment)
291                if not PLUGINS_INTERNAL_LOCK_PATH.exists():
292                    break
293                try:
294                    PLUGINS_INTERNAL_LOCK_PATH.unlink()
295                except Exception as e:
296                    if warn:
297                        _warn(f"Error while removing lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
298                    break
299
300        ### Begin locking from other processes.
301        try:
302            PLUGINS_INTERNAL_LOCK_PATH.touch()
303        except Exception as e:
304            if warn:
305                _warn(f"Unable to create lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
306
307    with _locks['internal_plugins']:
308
309        try:
310            from importlib.metadata import entry_points
311        except ImportError:
312            importlib_metadata = attempt_import('importlib_metadata', lazy=False)
313            entry_points = importlib_metadata.entry_points
314
315        ### NOTE: Allow plugins to be installed via `pip`.
316        packaged_plugin_paths = []
317        discovered_packaged_plugins_eps = entry_points(group='meerschaum.plugins')
318        for ep in discovered_packaged_plugins_eps:
319            module_name = ep.name
320            for package_file_path in ep.dist.files:
321                if package_file_path.suffix != '.py':
322                    continue
323                if str(package_file_path) == f'{module_name}.py':
324                    packaged_plugin_paths.append(package_file_path.locate())
325                elif str(package_file_path) == f'{module_name}/__init__.py':
326                    packaged_plugin_paths.append(package_file_path.locate().parent)
327
328        if is_symlink(PLUGINS_RESOURCES_PATH) or not PLUGINS_RESOURCES_PATH.exists():
329            try:
330                PLUGINS_RESOURCES_PATH.unlink()
331            except Exception as e:
332                pass
333
334        PLUGINS_RESOURCES_PATH.mkdir(exist_ok=True)
335
336        existing_symlinked_paths = [
337            (PLUGINS_RESOURCES_PATH / item) 
338            for item in os.listdir(PLUGINS_RESOURCES_PATH)
339        ]
340        for plugins_path in PLUGINS_DIR_PATHS:
341            if not plugins_path.exists():
342                plugins_path.mkdir(exist_ok=True, parents=True)
343        plugins_to_be_symlinked = list(flatten_list(
344            [
345                [
346                    (plugins_path / item)
347                    for item in os.listdir(plugins_path)
348                    if (
349                        not item.startswith('.')
350                    ) and (item not in ('__pycache__', '__init__.py'))
351                ]
352                for plugins_path in PLUGINS_DIR_PATHS
353            ]
354        ))
355        plugins_to_be_symlinked.extend(packaged_plugin_paths)
356
357        ### Check for duplicates.
358        seen_plugins = defaultdict(lambda: 0)
359        for plugin_path in plugins_to_be_symlinked:
360            plugin_name = plugin_path.stem
361            seen_plugins[plugin_name] += 1
362        for plugin_name, plugin_count in seen_plugins.items():
363            if plugin_count > 1:
364                if warn:
365                    _warn(f"Found duplicate plugins named '{plugin_name}'.")
366
367        for plugin_symlink_path in existing_symlinked_paths:
368            real_path = pathlib.Path(os.path.realpath(plugin_symlink_path))
369
370            ### Remove invalid symlinks.
371            if real_path not in plugins_to_be_symlinked:
372                if not is_symlink(plugin_symlink_path):
373                    continue
374                try:
375                    plugin_symlink_path.unlink()
376                except Exception as e:
377                    pass
378
379            ### Remove valid plugins from the to-be-symlinked list.
380            else:
381                plugins_to_be_symlinked.remove(real_path)
382
383        for plugin_path in plugins_to_be_symlinked:
384            plugin_symlink_path = PLUGINS_RESOURCES_PATH / plugin_path.name
385            try:
386                ### There might be duplicate folders (e.g. __pycache__).
387                if (
388                    plugin_symlink_path.exists()
389                    and
390                    plugin_symlink_path.is_dir()
391                    and
392                    not is_symlink(plugin_symlink_path)
393                ):
394                    continue
395                success, msg = make_symlink(plugin_path, plugin_symlink_path)
396            except Exception as e:
397                success, msg = False, str(e)
398            if not success:
399                if warn:
400                    _warn(
401                        f"Failed to create symlink {plugin_symlink_path} "
402                        + f"to {plugin_path}:\n    {msg}"
403                    )
404
405    ### Release symlink lock file in case other processes need it.
406    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
407        try:
408            if PLUGINS_INTERNAL_LOCK_PATH.exists():
409                PLUGINS_INTERNAL_LOCK_PATH.unlink()
410        ### Sometimes competing threads will delete the lock file at the same time.
411        except FileNotFoundError:
412            pass
413        except Exception as e:
414            if warn:
415                _warn(f"Error cleaning up lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
416
417        try:
418            if not PLUGINS_INIT_PATH.exists():
419                PLUGINS_INIT_PATH.touch()
420        except Exception as e:
421            error(f"Failed to create the file '{PLUGINS_INIT_PATH}':\n{e}")
422
423    with _locks['__path__']:
424        if str(PLUGINS_RESOURCES_PATH.parent) not in __path__:
425            __path__.append(str(PLUGINS_RESOURCES_PATH.parent))
426
427    with _locks['_synced_symlinks']:
428        _synced_symlinks = True
429
430
431def import_plugins(
432        *plugins_to_import: Union[str, List[str], None],
433        warn: bool = True,
434    ) -> Union[
435        'ModuleType', Tuple['ModuleType', None]
436    ]:
437    """
438    Import the Meerschaum plugins directory.
439
440    Parameters
441    ----------
442    plugins_to_import: Union[str, List[str], None]
443        If provided, only import the specified plugins.
444        Otherwise import the entire plugins module. May be a string, list, or `None`.
445        Defaults to `None`.
446
447    Returns
448    -------
449    A module of list of modules, depening on the number of plugins provided.
450
451    """
452    import sys
453    import os
454    import importlib
455    from meerschaum.utils.misc import flatten_list
456    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
457    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
458    from meerschaum.utils.warnings import warn as _warn
459    plugins_to_import = list(plugins_to_import)
460    with _locks['sys.path']:
461
462        ### Since plugins may depend on other plugins,
463        ### we need to activate the virtual environments for library plugins.
464        ### This logic exists in `Plugin.activate_venv()`,
465        ### but that code requires the plugin's module to already be imported.
466        ### It's not a guarantee of correct activation order,
467        ### e.g. if a library plugin pins a specific package and another 
468        plugins_names = get_plugins_names()
469        already_active_venvs = [is_venv_active(plugin_name) for plugin_name in plugins_names]
470
471        if not sys.path or sys.path[0] != str(PLUGINS_RESOURCES_PATH.parent):
472            sys.path.insert(0, str(PLUGINS_RESOURCES_PATH.parent))
473
474        if not plugins_to_import:
475            for plugin_name in plugins_names:
476                activate_venv(plugin_name)
477            try:
478                imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem)
479            except ImportError as e:
480                _warn(f"Failed to import the plugins module:\n    {e}")
481                import traceback
482                traceback.print_exc()
483                imported_plugins = None
484            for plugin_name in plugins_names:
485                if plugin_name in already_active_venvs:
486                    continue
487                deactivate_venv(plugin_name)
488
489        else:
490            imported_plugins = []
491            for plugin_name in flatten_list(plugins_to_import):
492                plugin = Plugin(plugin_name)
493                try:
494                    with Venv(plugin):
495                        imported_plugins.append(
496                            importlib.import_module(
497                                f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
498                            )
499                        )
500                except Exception as e:
501                    _warn(
502                        f"Failed to import plugin '{plugin_name}':\n    "
503                        + f"{e}\n\nHere's a stacktrace:",
504                        stack = False,
505                    )
506                    from meerschaum.utils.formatting import get_console
507                    get_console().print_exception(
508                        suppress = [
509                            'meerschaum/plugins/__init__.py',
510                            importlib,
511                            importlib._bootstrap,
512                        ]
513                    )
514                    imported_plugins.append(None)
515
516        if imported_plugins is None and warn:
517            _warn(f"Failed to import plugins.", stacklevel=3)
518
519        if str(PLUGINS_RESOURCES_PATH.parent) in sys.path:
520            sys.path.remove(str(PLUGINS_RESOURCES_PATH.parent))
521
522    if isinstance(imported_plugins, list):
523        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
524    return imported_plugins
525
526
527def load_plugins(debug: bool = False, shell: bool = False) -> None:
528    """
529    Import Meerschaum plugins and update the actions dictionary.
530    """
531    from inspect import isfunction, getmembers
532    from meerschaum.actions import __all__ as _all, modules
533    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
534    from meerschaum.utils.packages import get_modules_from_package
535    if debug:
536        from meerschaum.utils.debug import dprint
537
538    _plugins_names, plugins_modules = get_modules_from_package(
539        import_plugins(),
540        names = True,
541        recursive = True,
542        modules_venvs = True
543    )
544    ### I'm appending here to keep from redefining the modules list.
545    new_modules = (
546        [
547            mod for mod in modules
548            if not mod.__name__.startswith(PLUGINS_RESOURCES_PATH.stem + '.')
549        ]
550        + plugins_modules
551    )
552    n_mods = len(modules)
553    for mod in new_modules:
554        modules.append(mod)
555    for i in range(n_mods):
556        modules.pop(0)
557
558    for module in plugins_modules:
559        for name, func in getmembers(module):
560            if not isfunction(func):
561                continue
562            if name == module.__name__.split('.')[-1]:
563                make_action(func, **{'shell': shell, 'debug': debug})
564
565
566def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
567    """
568    Reload plugins back into memory.
569
570    Parameters
571    ----------
572    plugins: Optional[List[str]], default None
573        The plugins to reload. `None` will reload all plugins.
574
575    """
576    import sys
577    if debug:
578        from meerschaum.utils.debug import dprint
579
580    if not plugins:
581        plugins = get_plugins_names()
582    for plugin_name in plugins:
583        if debug:
584            dprint(f"Reloading plugin '{plugin_name}'...")
585        mod_name = 'plugins.' + str(plugin_name)
586        if mod_name in sys.modules:
587            del sys.modules[mod_name]
588    load_plugins(debug=debug)
589
590
591def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
592    """
593    Return a list of `Plugin` objects.
594
595    Parameters
596    ----------
597    to_load:
598        If specified, only load specific plugins.
599        Otherwise return all plugins.
600
601    try_import: bool, default True
602        If `True`, allow for plugins to be imported.
603    """
604    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
605    import os
606    sync_plugins_symlinks()
607    _plugins = [
608        Plugin(name) for name in (
609            to_load or [
610                (
611                    name if (PLUGINS_RESOURCES_PATH / name).is_dir()
612                    else name[:-3]
613                ) for name in os.listdir(PLUGINS_RESOURCES_PATH) if name != '__init__.py'
614            ]
615        )
616    ]
617    plugins = tuple(plugin for plugin in _plugins if plugin.is_installed(try_import=try_import))
618    if len(to_load) == 1:
619        if len(plugins) == 0:
620            raise ValueError(f"Plugin '{to_load[0]}' is not installed.")
621        return plugins[0]
622    return plugins
623
624
625def get_plugins_names(*to_load, **kw) -> List[str]:
626    """
627    Return a list of installed plugins.
628    """
629    return [plugin.name for plugin in get_plugins(*to_load, **kw)]
630
631
632def get_plugins_modules(*to_load, **kw) -> List['ModuleType']:
633    """
634    Return a list of modules for the installed plugins, or `None` if things break.
635    """
636    return [plugin.module for plugin in get_plugins(*to_load, **kw)]
637
638
639def get_data_plugins() -> List[Plugin]:
640    """
641    Only return the modules of plugins with either `fetch()` or `sync()` functions.
642    """
643    import inspect
644    plugins = get_plugins()
645    data_names = {'sync', 'fetch'}
646    data_plugins = []
647    for plugin in plugins:
648        for name, ob in inspect.getmembers(plugin.module):
649            if not inspect.isfunction(ob):
650                continue
651            if name not in data_names:
652                continue
653            data_plugins.append(plugin)
654    return data_plugins
655
656
657def add_plugin_argument(*args, **kwargs) -> None:
658    """
659    Add argparse arguments under the 'Plugins options' group.
660    Takes the same parameters as the regular argparse `add_argument()` function.
661
662    Examples
663    --------
664    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
665    >>> 
666    """
667    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
668    from meerschaum.utils.warnings import warn, error
669    _parent_plugin_name = _get_parent_plugin(2)
670    title = f"Plugin '{_parent_plugin_name}' options" if _parent_plugin_name else 'Custom options'
671    group_key = 'plugin_' + (_parent_plugin_name or '')
672    if group_key not in groups:
673        groups[group_key] = parser.add_argument_group(
674            title = title,
675        )
676        _seen_plugin_args[group_key] = set()
677    try:
678        if str(args) not in _seen_plugin_args[group_key]:
679            groups[group_key].add_argument(*args, **kwargs)
680            _seen_plugin_args[group_key].add(str(args))
681    except Exception as e:
682        warn(e)
683
684
685def _get_parent_plugin(stacklevel: int = 1) -> Union[str, None]:
686    """If this function is called from outside a Meerschaum plugin, it will return None."""
687    import inspect, re
688    try:
689        parent_globals = inspect.stack()[stacklevel][0].f_globals
690        parent_file = parent_globals.get('__file__', '')
691        return parent_globals['__name__'].replace('plugins.', '').split('.')[0]
692    except Exception as e:
693        return None
class Plugin:
 33class Plugin:
 34    """Handle packaging of Meerschaum plugins."""
 35    def __init__(
 36        self,
 37        name: str,
 38        version: Optional[str] = None,
 39        user_id: Optional[int] = None,
 40        required: Optional[List[str]] = None,
 41        attributes: Optional[Dict[str, Any]] = None,
 42        archive_path: Optional[pathlib.Path] = None,
 43        venv_path: Optional[pathlib.Path] = None,
 44        repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None,
 45        repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None,
 46    ):
 47        from meerschaum.config.static import STATIC_CONFIG
 48        sep = STATIC_CONFIG['plugins']['repo_separator']
 49        _repo = None
 50        if sep in name:
 51            try:
 52                name, _repo = name.split(sep)
 53            except Exception as e:
 54                error(f"Invalid plugin name: '{name}'")
 55        self._repo_in_name = _repo
 56
 57        if attributes is None:
 58            attributes = {}
 59        self.name = name
 60        self.attributes = attributes
 61        self.user_id = user_id
 62        self._version = version
 63        if required:
 64            self._required = required
 65        self.archive_path = (
 66            archive_path if archive_path is not None
 67            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
 68        )
 69        self.venv_path = (
 70            venv_path if venv_path is not None
 71            else VIRTENV_RESOURCES_PATH / self.name
 72        )
 73        self._repo_connector = repo_connector
 74        self._repo_keys = repo
 75
 76
 77    @property
 78    def repo_connector(self):
 79        """
 80        Return the repository connector for this plugin.
 81        NOTE: This imports the `connectors` module, which imports certain plugin modules.
 82        """
 83        if self._repo_connector is None:
 84            from meerschaum.connectors.parse import parse_repo_keys
 85
 86            repo_keys = self._repo_keys or self._repo_in_name
 87            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
 88                error(
 89                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
 90                )
 91            repo_connector = parse_repo_keys(repo_keys)
 92            self._repo_connector = repo_connector
 93        return self._repo_connector
 94
 95
 96    @property
 97    def version(self):
 98        """
 99        Return the plugin's module version is defined (`__version__`) if it's defined.
100        """
101        if self._version is None:
102            try:
103                self._version = self.module.__version__
104            except Exception as e:
105                self._version = None
106        return self._version
107
108
109    @property
110    def module(self):
111        """
112        Return the Python module of the underlying plugin.
113        """
114        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
115            if self.__file__ is None:
116                return None
117            from meerschaum.plugins import import_plugins
118            self._module = import_plugins(str(self), warn=False)
119        return self._module
120
121
122    @property
123    def __file__(self) -> Union[str, None]:
124        """
125        Return the file path (str) of the plugin if it exists, otherwise `None`.
126        """
127        if self.__dict__.get('_module', None) is not None:
128            return self.module.__file__
129
130        potential_dir = PLUGINS_RESOURCES_PATH / self.name
131        if (
132            potential_dir.exists()
133            and potential_dir.is_dir()
134            and (potential_dir / '__init__.py').exists()
135        ):
136            return str((potential_dir / '__init__.py').as_posix())
137
138        potential_file = PLUGINS_RESOURCES_PATH / (self.name + '.py')
139        if potential_file.exists() and not potential_file.is_dir():
140            return str(potential_file.as_posix())
141
142        return None
143
144
145    @property
146    def requirements_file_path(self) -> Union[pathlib.Path, None]:
147        """
148        If a file named `requirements.txt` exists, return its path.
149        """
150        if self.__file__ is None:
151            return None
152        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
153        if not path.exists():
154            return None
155        return path
156
157
158    def is_installed(self, **kw) -> bool:
159        """
160        Check whether a plugin is correctly installed.
161
162        Returns
163        -------
164        A `bool` indicating whether a plugin exists and is successfully imported.
165        """
166        return self.__file__ is not None
167
168
169    def make_tar(self, debug: bool = False) -> pathlib.Path:
170        """
171        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
172
173        Parameters
174        ----------
175        debug: bool, default False
176            Verbosity toggle.
177
178        Returns
179        -------
180        A `pathlib.Path` to the archive file's path.
181
182        """
183        import tarfile, pathlib, subprocess, fnmatch
184        from meerschaum.utils.debug import dprint
185        from meerschaum.utils.packages import attempt_import
186        pathspec = attempt_import('pathspec', debug=debug)
187
188        if not self.__file__:
189            from meerschaum.utils.warnings import error
190            error(f"Could not find file for plugin '{self}'.")
191        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
192            path = self.__file__.replace('__init__.py', '')
193            is_dir = True
194        else:
195            path = self.__file__
196            is_dir = False
197
198        old_cwd = os.getcwd()
199        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
200        os.chdir(real_parent_path)
201
202        default_patterns_to_ignore = [
203            '.pyc',
204            '__pycache__/',
205            'eggs/',
206            '__pypackages__/',
207            '.git',
208        ]
209
210        def parse_gitignore() -> 'Set[str]':
211            gitignore_path = pathlib.Path(path) / '.gitignore'
212            if not gitignore_path.exists():
213                return set(default_patterns_to_ignore)
214            with open(gitignore_path, 'r', encoding='utf-8') as f:
215                gitignore_text = f.read()
216            return set(pathspec.PathSpec.from_lines(
217                pathspec.patterns.GitWildMatchPattern,
218                default_patterns_to_ignore + gitignore_text.splitlines()
219            ).match_tree(path))
220
221        patterns_to_ignore = parse_gitignore() if is_dir else set()
222
223        if debug:
224            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
225
226        with tarfile.open(self.archive_path, 'w:gz') as tarf:
227            if not is_dir:
228                tarf.add(f"{self.name}.py")
229            else:
230                for root, dirs, files in os.walk(self.name):
231                    for f in files:
232                        good_file = True
233                        fp = os.path.join(root, f)
234                        for pattern in patterns_to_ignore:
235                            if pattern in str(fp) or f.startswith('.'):
236                                good_file = False
237                                break
238                        if good_file:
239                            if debug:
240                                dprint(f"Adding '{fp}'...")
241                            tarf.add(fp)
242
243        ### clean up and change back to old directory
244        os.chdir(old_cwd)
245
246        ### change to 775 to avoid permissions issues with the API in a Docker container
247        self.archive_path.chmod(0o775)
248
249        if debug:
250            dprint(f"Created archive '{self.archive_path}'.")
251        return self.archive_path
252
253
254    def install(
255            self,
256            skip_deps: bool = False,
257            force: bool = False,
258            debug: bool = False,
259        ) -> SuccessTuple:
260        """
261        Extract a plugin's tar archive to the plugins directory.
262        
263        This function checks if the plugin is already installed and if the version is equal or
264        greater than the existing installation.
265
266        Parameters
267        ----------
268        skip_deps: bool, default False
269            If `True`, do not install dependencies.
270
271        force: bool, default False
272            If `True`, continue with installation, even if required packages fail to install.
273
274        debug: bool, default False
275            Verbosity toggle.
276
277        Returns
278        -------
279        A `SuccessTuple` of success (bool) and a message (str).
280
281        """
282        if self.full_name in _ongoing_installations:
283            return True, f"Already installing plugin '{self}'."
284        _ongoing_installations.add(self.full_name)
285        from meerschaum.utils.warnings import warn, error
286        if debug:
287            from meerschaum.utils.debug import dprint
288        import tarfile
289        import re
290        import ast
291        from meerschaum.plugins import sync_plugins_symlinks
292        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
293        from meerschaum.utils.venv import init_venv
294        from meerschaum.utils.misc import safely_extract_tar
295        old_cwd = os.getcwd()
296        old_version = ''
297        new_version = ''
298        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
299        temp_dir.mkdir(exist_ok=True)
300
301        if not self.archive_path.exists():
302            return False, f"Missing archive file for plugin '{self}'."
303        if self.version is not None:
304            old_version = self.version
305            if debug:
306                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
307
308        if debug:
309            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
310
311        try:
312            with tarfile.open(self.archive_path, 'r:gz') as tarf:
313                safely_extract_tar(tarf, temp_dir)
314        except Exception as e:
315            warn(e)
316            return False, f"Failed to extract plugin '{self.name}'."
317
318        ### search for version information
319        files = os.listdir(temp_dir)
320        
321        if str(files[0]) == self.name:
322            is_dir = True
323        elif str(files[0]) == self.name + '.py':
324            is_dir = False
325        else:
326            error(f"Unknown format encountered for plugin '{self}'.")
327
328        fpath = temp_dir / files[0]
329        if is_dir:
330            fpath = fpath / '__init__.py'
331
332        init_venv(self.name, debug=debug)
333        with open(fpath, 'r', encoding='utf-8') as f:
334            init_lines = f.readlines()
335        new_version = None
336        for line in init_lines:
337            if '__version__' not in line:
338                continue
339            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
340            if not version_match:
341                continue
342            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
343            break
344        if not new_version:
345            warn(
346                f"No `__version__` defined for plugin '{self}'. "
347                + "Assuming new version...",
348                stack = False,
349            )
350
351        packaging_version = attempt_import('packaging.version')
352        try:
353            is_new_version = (not new_version and not old_version) or (
354                packaging_version.parse(old_version) < packaging_version.parse(new_version)
355            )
356            is_same_version = new_version and old_version and (
357                packaging_version.parse(old_version) == packaging_version.parse(new_version)
358            )
359        except Exception as e:
360            is_new_version, is_same_version = True, False
361
362        ### Determine where to permanently store the new plugin.
363        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
364        for path in PLUGINS_DIR_PATHS:
365            files_in_plugins_dir = os.listdir(path)
366            if (
367                self.name in files_in_plugins_dir
368                or
369                (self.name + '.py') in files_in_plugins_dir
370            ):
371                plugin_installation_dir_path = path
372                break
373
374        success_msg = (
375            f"Successfully installed plugin '{self}'"
376            + ("\n    (skipped dependencies)" if skip_deps else "")
377            + "."
378        )
379        success, abort = None, None
380
381        if is_same_version and not force:
382            success, msg = True, (
383                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
384                "    Install again with `-f` or `--force` to reinstall."
385            )
386            abort = True
387        elif is_new_version or force:
388            for src_dir, dirs, files in os.walk(temp_dir):
389                if success is not None:
390                    break
391                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
392                if not os.path.exists(dst_dir):
393                    os.mkdir(dst_dir)
394                for f in files:
395                    src_file = os.path.join(src_dir, f)
396                    dst_file = os.path.join(dst_dir, f)
397                    if os.path.exists(dst_file):
398                        os.remove(dst_file)
399
400                    if debug:
401                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
402                    try:
403                        shutil.move(src_file, dst_dir)
404                    except Exception as e:
405                        success, msg = False, (
406                            f"Failed to install plugin '{self}': " +
407                            f"Could not move file '{src_file}' to '{dst_dir}'"
408                        )
409                        print(msg)
410                        break
411            if success is None:
412                success, msg = True, success_msg
413        else:
414            success, msg = False, (
415                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
416                + f"attempted version {new_version}."
417            )
418
419        shutil.rmtree(temp_dir)
420        os.chdir(old_cwd)
421
422        ### Reload the plugin's module.
423        sync_plugins_symlinks(debug=debug)
424        if '_module' in self.__dict__:
425            del self.__dict__['_module']
426        init_venv(venv=self.name, force=True, debug=debug)
427        reload_meerschaum(debug=debug)
428
429        ### if we've already failed, return here
430        if not success or abort:
431            _ongoing_installations.remove(self.full_name)
432            return success, msg
433
434        ### attempt to install dependencies
435        dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
436        if not dependencies_installed:
437            _ongoing_installations.remove(self.full_name)
438            return False, f"Failed to install dependencies for plugin '{self}'."
439
440        ### handling success tuple, bool, or other (typically None)
441        setup_tuple = self.setup(debug=debug)
442        if isinstance(setup_tuple, tuple):
443            if not setup_tuple[0]:
444                success, msg = setup_tuple
445        elif isinstance(setup_tuple, bool):
446            if not setup_tuple:
447                success, msg = False, (
448                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
449                    f"Check `setup()` in '{self.__file__}' for more information " +
450                    f"(no error message provided)."
451                )
452            else:
453                success, msg = True, success_msg
454        elif setup_tuple is None:
455            success = True
456            msg = (
457                f"Post-install for plugin '{self}' returned None. " +
458                f"Assuming plugin successfully installed."
459            )
460            warn(msg)
461        else:
462            success = False
463            msg = (
464                f"Post-install for plugin '{self}' returned unexpected value " +
465                f"of type '{type(setup_tuple)}': {setup_tuple}"
466            )
467
468        _ongoing_installations.remove(self.full_name)
469        module = self.module
470        return success, msg
471
472
473    def remove_archive(
474            self,        
475            debug: bool = False
476        ) -> SuccessTuple:
477        """Remove a plugin's archive file."""
478        if not self.archive_path.exists():
479            return True, f"Archive file for plugin '{self}' does not exist."
480        try:
481            self.archive_path.unlink()
482        except Exception as e:
483            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
484        return True, "Success"
485
486
487    def remove_venv(
488            self,        
489            debug: bool = False
490        ) -> SuccessTuple:
491        """Remove a plugin's virtual environment."""
492        if not self.venv_path.exists():
493            return True, f"Virtual environment for plugin '{self}' does not exist."
494        try:
495            shutil.rmtree(self.venv_path)
496        except Exception as e:
497            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
498        return True, "Success"
499
500
501    def uninstall(self, debug: bool = False) -> SuccessTuple:
502        """
503        Remove a plugin, its virtual environment, and archive file.
504        """
505        from meerschaum.utils.packages import reload_meerschaum
506        from meerschaum.plugins import sync_plugins_symlinks
507        from meerschaum.utils.warnings import warn, info
508        warnings_thrown_count: int = 0
509        max_warnings: int = 3
510
511        if not self.is_installed():
512            info(
513                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
514                + "Checking for artifacts...",
515                stack = False,
516            )
517        else:
518            real_path = pathlib.Path(os.path.realpath(self.__file__))
519            try:
520                if real_path.name == '__init__.py':
521                    shutil.rmtree(real_path.parent)
522                else:
523                    real_path.unlink()
524            except Exception as e:
525                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
526                warnings_thrown_count += 1
527            else:
528                info(f"Removed source files for plugin '{self.name}'.")
529
530        if self.venv_path.exists():
531            success, msg = self.remove_venv(debug=debug)
532            if not success:
533                warn(msg, stack=False)
534                warnings_thrown_count += 1
535            else:
536                info(f"Removed virtual environment from plugin '{self.name}'.")
537
538        success = warnings_thrown_count < max_warnings
539        sync_plugins_symlinks(debug=debug)
540        self.deactivate_venv(force=True, debug=debug)
541        reload_meerschaum(debug=debug)
542        return success, (
543            f"Successfully uninstalled plugin '{self}'." if success
544            else f"Failed to uninstall plugin '{self}'."
545        )
546
547
548    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
549        """
550        If exists, run the plugin's `setup()` function.
551
552        Parameters
553        ----------
554        *args: str
555            The positional arguments passed to the `setup()` function.
556            
557        debug: bool, default False
558            Verbosity toggle.
559
560        **kw: Any
561            The keyword arguments passed to the `setup()` function.
562
563        Returns
564        -------
565        A `SuccessTuple` or `bool` indicating success.
566
567        """
568        from meerschaum.utils.debug import dprint
569        import inspect
570        _setup = None
571        for name, fp in inspect.getmembers(self.module):
572            if name == 'setup' and inspect.isfunction(fp):
573                _setup = fp
574                break
575
576        ### assume success if no setup() is found (not necessary)
577        if _setup is None:
578            return True
579
580        sig = inspect.signature(_setup)
581        has_debug, has_kw = ('debug' in sig.parameters), False
582        for k, v in sig.parameters.items():
583            if '**' in str(v):
584                has_kw = True
585                break
586
587        _kw = {}
588        if has_kw:
589            _kw.update(kw)
590        if has_debug:
591            _kw['debug'] = debug
592
593        if debug:
594            dprint(f"Running setup for plugin '{self}'...")
595        try:
596            self.activate_venv(debug=debug)
597            return_tuple = _setup(*args, **_kw)
598            self.deactivate_venv(debug=debug)
599        except Exception as e:
600            return False, str(e)
601
602        if isinstance(return_tuple, tuple):
603            return return_tuple
604        if isinstance(return_tuple, bool):
605            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
606        if return_tuple is None:
607            return False, f"Setup for Plugin '{self.name}' returned None."
608        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"
609
610
611    def get_dependencies(
612            self,
613            debug: bool = False,
614        ) -> List[str]:
615        """
616        If the Plugin has specified dependencies in a list called `required`, return the list.
617        
618        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
619        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
620
621        Parameters
622        ----------
623        debug: bool, default False
624            Verbosity toggle.
625
626        Returns
627        -------
628        A list of required packages and plugins (str).
629
630        """
631        if '_required' in self.__dict__:
632            return self._required
633
634        ### If the plugin has not yet been imported,
635        ### infer the dependencies from the source text.
636        ### This is not super robust, and it doesn't feel right
637        ### having multiple versions of the logic.
638        ### This is necessary when determining the activation order
639        ### without having import the module.
640        ### For consistency's sake, the module-less method does not cache the requirements.
641        if self.__dict__.get('_module', None) is None:
642            file_path = self.__file__
643            if file_path is None:
644                return []
645            with open(file_path, 'r', encoding='utf-8') as f:
646                text = f.read()
647
648            if 'required' not in text:
649                return []
650
651            ### This has some limitations:
652            ### It relies on `required` being manually declared.
653            ### We lose the ability to dynamically alter the `required` list,
654            ### which is why we've kept the module-reliant method below.
655            import ast, re
656            ### NOTE: This technically would break 
657            ### if `required` was the very first line of the file.
658            req_start_match = re.search(r'required(:\s*)?.*=', text)
659            if not req_start_match:
660                return []
661            req_start = req_start_match.start()
662            equals_sign = req_start + text[req_start:].find('=')
663
664            ### Dependencies may have brackets within the strings, so push back the index.
665            first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[')
666            if first_opening_brace == -1:
667                return []
668
669            next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']')
670            if next_closing_brace == -1:
671                return []
672
673            start_ix = first_opening_brace + 1
674            end_ix = next_closing_brace
675
676            num_braces = 0
677            while True:
678                if '[' not in text[start_ix:end_ix]:
679                    break
680                num_braces += 1
681                start_ix = end_ix
682                end_ix += text[end_ix + 1:].find(']') + 1
683
684            req_end = end_ix + 1
685            req_text = (
686                text[(first_opening_brace-1):req_end]
687                .lstrip()
688                .replace('=', '', 1)
689                .lstrip()
690                .rstrip()
691            )
692            try:
693                required = ast.literal_eval(req_text)
694            except Exception as e:
695                warn(
696                    f"Unable to determine requirements for plugin '{self.name}' "
697                    + "without importing the module.\n"
698                    + "    This may be due to dynamically setting the global `required` list.\n"
699                    + f"    {e}"
700                )
701                return []
702            return required
703
704        import inspect
705        self.activate_venv(dependencies=False, debug=debug)
706        required = []
707        for name, val in inspect.getmembers(self.module):
708            if name == 'required':
709                required = val
710                break
711        self._required = required
712        self.deactivate_venv(dependencies=False, debug=debug)
713        return required
714
715
716    def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
717        """
718        Return a list of required Plugin objects.
719        """
720        from meerschaum.utils.warnings import warn
721        from meerschaum.config import get_config
722        from meerschaum.config.static import STATIC_CONFIG
723        plugins = []
724        _deps = self.get_dependencies(debug=debug)
725        sep = STATIC_CONFIG['plugins']['repo_separator']
726        plugin_names = [
727            _d[len('plugin:'):] for _d in _deps
728            if _d.startswith('plugin:') and len(_d) > len('plugin:')
729        ]
730        default_repo_keys = get_config('meerschaum', 'default_repository')
731        for _plugin_name in plugin_names:
732            if sep in _plugin_name:
733                try:
734                    _plugin_name, _repo_keys = _plugin_name.split(sep)
735                except Exception as e:
736                    _repo_keys = default_repo_keys
737                    warn(
738                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
739                        + f"Will try to use '{_repo_keys}' instead.",
740                        stack = False,
741                    )
742            else:
743                _repo_keys = default_repo_keys
744            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
745        return plugins
746
747
748    def get_required_packages(self, debug: bool=False) -> List[str]:
749        """
750        Return the required package names (excluding plugins).
751        """
752        _deps = self.get_dependencies(debug=debug)
753        return [_d for _d in _deps if not _d.startswith('plugin:')]
754
755
756    def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
757        """
758        Activate the virtual environments for the plugin and its dependencies.
759
760        Parameters
761        ----------
762        dependencies: bool, default True
763            If `True`, activate the virtual environments for required plugins.
764
765        Returns
766        -------
767        A bool indicating success.
768        """
769        from meerschaum.utils.venv import venv_target_path
770        from meerschaum.utils.packages import activate_venv
771        from meerschaum.utils.misc import make_symlink, is_symlink
772        from meerschaum.config._paths import PACKAGE_ROOT_PATH
773
774        if dependencies:
775            for plugin in self.get_required_plugins(debug=debug):
776                plugin.activate_venv(debug=debug, **kw)
777
778        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
779        venv_meerschaum_path = vtp / 'meerschaum'
780
781        try:
782            success, msg = True, "Success"
783            if is_symlink(venv_meerschaum_path):
784                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
785                    venv_meerschaum_path.unlink()
786                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
787        except Exception as e:
788            success, msg = False, str(e)
789        if not success:
790            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")
791
792        return activate_venv(self.name, debug=debug, **kw)
793
794
795    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
796        """
797        Deactivate the virtual environments for the plugin and its dependencies.
798
799        Parameters
800        ----------
801        dependencies: bool, default True
802            If `True`, deactivate the virtual environments for required plugins.
803
804        Returns
805        -------
806        A bool indicating success.
807        """
808        from meerschaum.utils.packages import deactivate_venv
809        success = deactivate_venv(self.name, debug=debug, **kw)
810        if dependencies:
811            for plugin in self.get_required_plugins(debug=debug):
812                plugin.deactivate_venv(debug=debug, **kw)
813        return success
814
815
816    def install_dependencies(
817            self,
818            force: bool = False,
819            debug: bool = False,
820        ) -> bool:
821        """
822        If specified, install dependencies.
823        
824        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
825        Meerschaum plugins from the same repository as this Plugin.
826        To install from a different repository, add the repo keys after `'@'`
827        (e.g. `'plugin:foo@api:bar'`).
828
829        Parameters
830        ----------
831        force: bool, default False
832            If `True`, continue with the installation, even if some
833            required packages fail to install.
834
835        debug: bool, default False
836            Verbosity toggle.
837
838        Returns
839        -------
840        A bool indicating success.
841
842        """
843        from meerschaum.utils.packages import pip_install, venv_contains_package
844        from meerschaum.utils.debug import dprint
845        from meerschaum.utils.warnings import warn, info
846        from meerschaum.connectors.parse import parse_repo_keys
847        _deps = self.get_dependencies(debug=debug)
848        if not _deps and self.requirements_file_path is None:
849            return True
850
851        plugins = self.get_required_plugins(debug=debug)
852        for _plugin in plugins:
853            if _plugin.name == self.name:
854                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
855                continue
856            _success, _msg = _plugin.repo_connector.install_plugin(
857                _plugin.name, debug=debug, force=force
858            )
859            if not _success:
860                warn(
861                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
862                    + f" for plugin '{self.name}':\n" + _msg,
863                    stack = False,
864                )
865                if not force:
866                    warn(
867                        "Try installing with the `--force` flag to continue anyway.",
868                        stack = False,
869                    )
870                    return False
871                info(
872                    "Continuing with installation despite the failure "
873                    + "(careful, things might be broken!)...",
874                    icon = False
875                )
876
877
878        ### First step: parse `requirements.txt` if it exists.
879        if self.requirements_file_path is not None:
880            if not pip_install(
881                requirements_file_path=self.requirements_file_path,
882                venv=self.name, debug=debug
883            ):
884                warn(
885                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
886                    stack = False,
887                )
888                if not force:
889                    warn(
890                        "Try installing with `--force` to continue anyway.",
891                        stack = False,
892                    )
893                    return False
894                info(
895                    "Continuing with installation despite the failure "
896                    + "(careful, things might be broken!)...",
897                    icon = False
898                )
899
900
901        ### Don't reinstall packages that are already included in required plugins.
902        packages = []
903        _packages = self.get_required_packages(debug=debug)
904        accounted_for_packages = set()
905        for package_name in _packages:
906            for plugin in plugins:
907                if venv_contains_package(package_name, plugin.name):
908                    accounted_for_packages.add(package_name)
909                    break
910        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
911
912        ### Attempt pip packages installation.
913        if packages:
914            for package in packages:
915                if not pip_install(package, venv=self.name, debug=debug):
916                    warn(
917                        f"Failed to install required package '{package}'"
918                        + f" for plugin '{self.name}'.",
919                        stack = False,
920                    )
921                    if not force:
922                        warn(
923                            "Try installing with `--force` to continue anyway.",
924                            stack = False,
925                        )
926                        return False
927                    info(
928                        "Continuing with installation despite the failure "
929                        + "(careful, things might be broken!)...",
930                        icon = False
931                    )
932        return True
933
934
935    @property
936    def full_name(self) -> str:
937        """
938        Include the repo keys with the plugin's name.
939        """
940        from meerschaum.config.static import STATIC_CONFIG
941        sep = STATIC_CONFIG['plugins']['repo_separator']
942        return self.name + sep + str(self.repo_connector)
943
944
945    def __str__(self):
946        return self.name
947
948
949    def __repr__(self):
950        return f"Plugin('{self.name}', repo='{self.repo_connector}')"
951
952
953    def __del__(self):
954        pass

Handle packaging of Meerschaum plugins.

Plugin( name: str, version: Optional[str] = None, user_id: Optional[int] = None, required: Optional[List[str]] = None, attributes: Optional[Dict[str, Any]] = None, archive_path: Optional[pathlib.Path] = None, venv_path: Optional[pathlib.Path] = None, repo_connector: Optional[meerschaum.connectors.APIConnector] = None, repo: Union[meerschaum.connectors.APIConnector, str, NoneType] = None)
35    def __init__(
36        self,
37        name: str,
38        version: Optional[str] = None,
39        user_id: Optional[int] = None,
40        required: Optional[List[str]] = None,
41        attributes: Optional[Dict[str, Any]] = None,
42        archive_path: Optional[pathlib.Path] = None,
43        venv_path: Optional[pathlib.Path] = None,
44        repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None,
45        repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None,
46    ):
47        from meerschaum.config.static import STATIC_CONFIG
48        sep = STATIC_CONFIG['plugins']['repo_separator']
49        _repo = None
50        if sep in name:
51            try:
52                name, _repo = name.split(sep)
53            except Exception as e:
54                error(f"Invalid plugin name: '{name}'")
55        self._repo_in_name = _repo
56
57        if attributes is None:
58            attributes = {}
59        self.name = name
60        self.attributes = attributes
61        self.user_id = user_id
62        self._version = version
63        if required:
64            self._required = required
65        self.archive_path = (
66            archive_path if archive_path is not None
67            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
68        )
69        self.venv_path = (
70            venv_path if venv_path is not None
71            else VIRTENV_RESOURCES_PATH / self.name
72        )
73        self._repo_connector = repo_connector
74        self._repo_keys = repo
name
attributes
user_id
archive_path
venv_path
repo_connector
77    @property
78    def repo_connector(self):
79        """
80        Return the repository connector for this plugin.
81        NOTE: This imports the `connectors` module, which imports certain plugin modules.
82        """
83        if self._repo_connector is None:
84            from meerschaum.connectors.parse import parse_repo_keys
85
86            repo_keys = self._repo_keys or self._repo_in_name
87            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
88                error(
89                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
90                )
91            repo_connector = parse_repo_keys(repo_keys)
92            self._repo_connector = repo_connector
93        return self._repo_connector

Return the repository connector for this plugin. NOTE: This imports the connectors module, which imports certain plugin modules.

version
 96    @property
 97    def version(self):
 98        """
 99        Return the plugin's module version is defined (`__version__`) if it's defined.
100        """
101        if self._version is None:
102            try:
103                self._version = self.module.__version__
104            except Exception as e:
105                self._version = None
106        return self._version

Return the plugin's module version is defined (__version__) if it's defined.

module
109    @property
110    def module(self):
111        """
112        Return the Python module of the underlying plugin.
113        """
114        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
115            if self.__file__ is None:
116                return None
117            from meerschaum.plugins import import_plugins
118            self._module = import_plugins(str(self), warn=False)
119        return self._module

Return the Python module of the underlying plugin.

requirements_file_path: Optional[pathlib.Path]
145    @property
146    def requirements_file_path(self) -> Union[pathlib.Path, None]:
147        """
148        If a file named `requirements.txt` exists, return its path.
149        """
150        if self.__file__ is None:
151            return None
152        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
153        if not path.exists():
154            return None
155        return path

If a file named requirements.txt exists, return its path.

def is_installed(self, **kw) -> bool:
158    def is_installed(self, **kw) -> bool:
159        """
160        Check whether a plugin is correctly installed.
161
162        Returns
163        -------
164        A `bool` indicating whether a plugin exists and is successfully imported.
165        """
166        return self.__file__ is not None

Check whether a plugin is correctly installed.

Returns
  • A bool indicating whether a plugin exists and is successfully imported.
def make_tar(self, debug: bool = False) -> pathlib.Path:
169    def make_tar(self, debug: bool = False) -> pathlib.Path:
170        """
171        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
172
173        Parameters
174        ----------
175        debug: bool, default False
176            Verbosity toggle.
177
178        Returns
179        -------
180        A `pathlib.Path` to the archive file's path.
181
182        """
183        import tarfile, pathlib, subprocess, fnmatch
184        from meerschaum.utils.debug import dprint
185        from meerschaum.utils.packages import attempt_import
186        pathspec = attempt_import('pathspec', debug=debug)
187
188        if not self.__file__:
189            from meerschaum.utils.warnings import error
190            error(f"Could not find file for plugin '{self}'.")
191        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
192            path = self.__file__.replace('__init__.py', '')
193            is_dir = True
194        else:
195            path = self.__file__
196            is_dir = False
197
198        old_cwd = os.getcwd()
199        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
200        os.chdir(real_parent_path)
201
202        default_patterns_to_ignore = [
203            '.pyc',
204            '__pycache__/',
205            'eggs/',
206            '__pypackages__/',
207            '.git',
208        ]
209
210        def parse_gitignore() -> 'Set[str]':
211            gitignore_path = pathlib.Path(path) / '.gitignore'
212            if not gitignore_path.exists():
213                return set(default_patterns_to_ignore)
214            with open(gitignore_path, 'r', encoding='utf-8') as f:
215                gitignore_text = f.read()
216            return set(pathspec.PathSpec.from_lines(
217                pathspec.patterns.GitWildMatchPattern,
218                default_patterns_to_ignore + gitignore_text.splitlines()
219            ).match_tree(path))
220
221        patterns_to_ignore = parse_gitignore() if is_dir else set()
222
223        if debug:
224            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
225
226        with tarfile.open(self.archive_path, 'w:gz') as tarf:
227            if not is_dir:
228                tarf.add(f"{self.name}.py")
229            else:
230                for root, dirs, files in os.walk(self.name):
231                    for f in files:
232                        good_file = True
233                        fp = os.path.join(root, f)
234                        for pattern in patterns_to_ignore:
235                            if pattern in str(fp) or f.startswith('.'):
236                                good_file = False
237                                break
238                        if good_file:
239                            if debug:
240                                dprint(f"Adding '{fp}'...")
241                            tarf.add(fp)
242
243        ### clean up and change back to old directory
244        os.chdir(old_cwd)
245
246        ### change to 775 to avoid permissions issues with the API in a Docker container
247        self.archive_path.chmod(0o775)
248
249        if debug:
250            dprint(f"Created archive '{self.archive_path}'.")
251        return self.archive_path

Compress the plugin's source files into a .tar.gz archive and return the archive's path.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pathlib.Path to the archive file's path.
def install( self, skip_deps: bool = False, force: bool = False, debug: bool = False) -> Tuple[bool, str]:
254    def install(
255            self,
256            skip_deps: bool = False,
257            force: bool = False,
258            debug: bool = False,
259        ) -> SuccessTuple:
260        """
261        Extract a plugin's tar archive to the plugins directory.
262        
263        This function checks if the plugin is already installed and if the version is equal or
264        greater than the existing installation.
265
266        Parameters
267        ----------
268        skip_deps: bool, default False
269            If `True`, do not install dependencies.
270
271        force: bool, default False
272            If `True`, continue with installation, even if required packages fail to install.
273
274        debug: bool, default False
275            Verbosity toggle.
276
277        Returns
278        -------
279        A `SuccessTuple` of success (bool) and a message (str).
280
281        """
282        if self.full_name in _ongoing_installations:
283            return True, f"Already installing plugin '{self}'."
284        _ongoing_installations.add(self.full_name)
285        from meerschaum.utils.warnings import warn, error
286        if debug:
287            from meerschaum.utils.debug import dprint
288        import tarfile
289        import re
290        import ast
291        from meerschaum.plugins import sync_plugins_symlinks
292        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
293        from meerschaum.utils.venv import init_venv
294        from meerschaum.utils.misc import safely_extract_tar
295        old_cwd = os.getcwd()
296        old_version = ''
297        new_version = ''
298        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
299        temp_dir.mkdir(exist_ok=True)
300
301        if not self.archive_path.exists():
302            return False, f"Missing archive file for plugin '{self}'."
303        if self.version is not None:
304            old_version = self.version
305            if debug:
306                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
307
308        if debug:
309            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
310
311        try:
312            with tarfile.open(self.archive_path, 'r:gz') as tarf:
313                safely_extract_tar(tarf, temp_dir)
314        except Exception as e:
315            warn(e)
316            return False, f"Failed to extract plugin '{self.name}'."
317
318        ### search for version information
319        files = os.listdir(temp_dir)
320        
321        if str(files[0]) == self.name:
322            is_dir = True
323        elif str(files[0]) == self.name + '.py':
324            is_dir = False
325        else:
326            error(f"Unknown format encountered for plugin '{self}'.")
327
328        fpath = temp_dir / files[0]
329        if is_dir:
330            fpath = fpath / '__init__.py'
331
332        init_venv(self.name, debug=debug)
333        with open(fpath, 'r', encoding='utf-8') as f:
334            init_lines = f.readlines()
335        new_version = None
336        for line in init_lines:
337            if '__version__' not in line:
338                continue
339            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
340            if not version_match:
341                continue
342            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
343            break
344        if not new_version:
345            warn(
346                f"No `__version__` defined for plugin '{self}'. "
347                + "Assuming new version...",
348                stack = False,
349            )
350
351        packaging_version = attempt_import('packaging.version')
352        try:
353            is_new_version = (not new_version and not old_version) or (
354                packaging_version.parse(old_version) < packaging_version.parse(new_version)
355            )
356            is_same_version = new_version and old_version and (
357                packaging_version.parse(old_version) == packaging_version.parse(new_version)
358            )
359        except Exception as e:
360            is_new_version, is_same_version = True, False
361
362        ### Determine where to permanently store the new plugin.
363        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
364        for path in PLUGINS_DIR_PATHS:
365            files_in_plugins_dir = os.listdir(path)
366            if (
367                self.name in files_in_plugins_dir
368                or
369                (self.name + '.py') in files_in_plugins_dir
370            ):
371                plugin_installation_dir_path = path
372                break
373
374        success_msg = (
375            f"Successfully installed plugin '{self}'"
376            + ("\n    (skipped dependencies)" if skip_deps else "")
377            + "."
378        )
379        success, abort = None, None
380
381        if is_same_version and not force:
382            success, msg = True, (
383                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
384                "    Install again with `-f` or `--force` to reinstall."
385            )
386            abort = True
387        elif is_new_version or force:
388            for src_dir, dirs, files in os.walk(temp_dir):
389                if success is not None:
390                    break
391                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
392                if not os.path.exists(dst_dir):
393                    os.mkdir(dst_dir)
394                for f in files:
395                    src_file = os.path.join(src_dir, f)
396                    dst_file = os.path.join(dst_dir, f)
397                    if os.path.exists(dst_file):
398                        os.remove(dst_file)
399
400                    if debug:
401                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
402                    try:
403                        shutil.move(src_file, dst_dir)
404                    except Exception as e:
405                        success, msg = False, (
406                            f"Failed to install plugin '{self}': " +
407                            f"Could not move file '{src_file}' to '{dst_dir}'"
408                        )
409                        print(msg)
410                        break
411            if success is None:
412                success, msg = True, success_msg
413        else:
414            success, msg = False, (
415                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
416                + f"attempted version {new_version}."
417            )
418
419        shutil.rmtree(temp_dir)
420        os.chdir(old_cwd)
421
422        ### Reload the plugin's module.
423        sync_plugins_symlinks(debug=debug)
424        if '_module' in self.__dict__:
425            del self.__dict__['_module']
426        init_venv(venv=self.name, force=True, debug=debug)
427        reload_meerschaum(debug=debug)
428
429        ### if we've already failed, return here
430        if not success or abort:
431            _ongoing_installations.remove(self.full_name)
432            return success, msg
433
434        ### attempt to install dependencies
435        dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
436        if not dependencies_installed:
437            _ongoing_installations.remove(self.full_name)
438            return False, f"Failed to install dependencies for plugin '{self}'."
439
440        ### handling success tuple, bool, or other (typically None)
441        setup_tuple = self.setup(debug=debug)
442        if isinstance(setup_tuple, tuple):
443            if not setup_tuple[0]:
444                success, msg = setup_tuple
445        elif isinstance(setup_tuple, bool):
446            if not setup_tuple:
447                success, msg = False, (
448                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
449                    f"Check `setup()` in '{self.__file__}' for more information " +
450                    f"(no error message provided)."
451                )
452            else:
453                success, msg = True, success_msg
454        elif setup_tuple is None:
455            success = True
456            msg = (
457                f"Post-install for plugin '{self}' returned None. " +
458                f"Assuming plugin successfully installed."
459            )
460            warn(msg)
461        else:
462            success = False
463            msg = (
464                f"Post-install for plugin '{self}' returned unexpected value " +
465                f"of type '{type(setup_tuple)}': {setup_tuple}"
466            )
467
468        _ongoing_installations.remove(self.full_name)
469        module = self.module
470        return success, msg

Extract a plugin's tar archive to the plugins directory.

This function checks if the plugin is already installed and if the version is equal or greater than the existing installation.

Parameters
  • skip_deps (bool, default False): If True, do not install dependencies.
  • force (bool, default False): If True, continue with installation, even if required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success (bool) and a message (str).
def remove_archive(self, debug: bool = False) -> Tuple[bool, str]:
473    def remove_archive(
474            self,        
475            debug: bool = False
476        ) -> SuccessTuple:
477        """Remove a plugin's archive file."""
478        if not self.archive_path.exists():
479            return True, f"Archive file for plugin '{self}' does not exist."
480        try:
481            self.archive_path.unlink()
482        except Exception as e:
483            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
484        return True, "Success"

Remove a plugin's archive file.

def remove_venv(self, debug: bool = False) -> Tuple[bool, str]:
487    def remove_venv(
488            self,        
489            debug: bool = False
490        ) -> SuccessTuple:
491        """Remove a plugin's virtual environment."""
492        if not self.venv_path.exists():
493            return True, f"Virtual environment for plugin '{self}' does not exist."
494        try:
495            shutil.rmtree(self.venv_path)
496        except Exception as e:
497            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
498        return True, "Success"

Remove a plugin's virtual environment.

def uninstall(self, debug: bool = False) -> Tuple[bool, str]:
501    def uninstall(self, debug: bool = False) -> SuccessTuple:
502        """
503        Remove a plugin, its virtual environment, and archive file.
504        """
505        from meerschaum.utils.packages import reload_meerschaum
506        from meerschaum.plugins import sync_plugins_symlinks
507        from meerschaum.utils.warnings import warn, info
508        warnings_thrown_count: int = 0
509        max_warnings: int = 3
510
511        if not self.is_installed():
512            info(
513                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
514                + "Checking for artifacts...",
515                stack = False,
516            )
517        else:
518            real_path = pathlib.Path(os.path.realpath(self.__file__))
519            try:
520                if real_path.name == '__init__.py':
521                    shutil.rmtree(real_path.parent)
522                else:
523                    real_path.unlink()
524            except Exception as e:
525                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
526                warnings_thrown_count += 1
527            else:
528                info(f"Removed source files for plugin '{self.name}'.")
529
530        if self.venv_path.exists():
531            success, msg = self.remove_venv(debug=debug)
532            if not success:
533                warn(msg, stack=False)
534                warnings_thrown_count += 1
535            else:
536                info(f"Removed virtual environment from plugin '{self.name}'.")
537
538        success = warnings_thrown_count < max_warnings
539        sync_plugins_symlinks(debug=debug)
540        self.deactivate_venv(force=True, debug=debug)
541        reload_meerschaum(debug=debug)
542        return success, (
543            f"Successfully uninstalled plugin '{self}'." if success
544            else f"Failed to uninstall plugin '{self}'."
545        )

Remove a plugin, its virtual environment, and archive file.

def setup( self, *args: str, debug: bool = False, **kw: Any) -> Union[Tuple[bool, str], bool]:
548    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
549        """
550        If exists, run the plugin's `setup()` function.
551
552        Parameters
553        ----------
554        *args: str
555            The positional arguments passed to the `setup()` function.
556            
557        debug: bool, default False
558            Verbosity toggle.
559
560        **kw: Any
561            The keyword arguments passed to the `setup()` function.
562
563        Returns
564        -------
565        A `SuccessTuple` or `bool` indicating success.
566
567        """
568        from meerschaum.utils.debug import dprint
569        import inspect
570        _setup = None
571        for name, fp in inspect.getmembers(self.module):
572            if name == 'setup' and inspect.isfunction(fp):
573                _setup = fp
574                break
575
576        ### assume success if no setup() is found (not necessary)
577        if _setup is None:
578            return True
579
580        sig = inspect.signature(_setup)
581        has_debug, has_kw = ('debug' in sig.parameters), False
582        for k, v in sig.parameters.items():
583            if '**' in str(v):
584                has_kw = True
585                break
586
587        _kw = {}
588        if has_kw:
589            _kw.update(kw)
590        if has_debug:
591            _kw['debug'] = debug
592
593        if debug:
594            dprint(f"Running setup for plugin '{self}'...")
595        try:
596            self.activate_venv(debug=debug)
597            return_tuple = _setup(*args, **_kw)
598            self.deactivate_venv(debug=debug)
599        except Exception as e:
600            return False, str(e)
601
602        if isinstance(return_tuple, tuple):
603            return return_tuple
604        if isinstance(return_tuple, bool):
605            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
606        if return_tuple is None:
607            return False, f"Setup for Plugin '{self.name}' returned None."
608        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"

If exists, run the plugin's setup() function.

Parameters
  • *args (str): The positional arguments passed to the setup() function.
  • debug (bool, default False): Verbosity toggle.
  • **kw (Any): The keyword arguments passed to the setup() function.
Returns
  • A SuccessTuple or bool indicating success.
def get_dependencies(self, debug: bool = False) -> List[str]:
611    def get_dependencies(
612            self,
613            debug: bool = False,
614        ) -> List[str]:
615        """
616        If the Plugin has specified dependencies in a list called `required`, return the list.
617        
618        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
619        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
620
621        Parameters
622        ----------
623        debug: bool, default False
624            Verbosity toggle.
625
626        Returns
627        -------
628        A list of required packages and plugins (str).
629
630        """
631        if '_required' in self.__dict__:
632            return self._required
633
634        ### If the plugin has not yet been imported,
635        ### infer the dependencies from the source text.
636        ### This is not super robust, and it doesn't feel right
637        ### having multiple versions of the logic.
638        ### This is necessary when determining the activation order
639        ### without having import the module.
640        ### For consistency's sake, the module-less method does not cache the requirements.
641        if self.__dict__.get('_module', None) is None:
642            file_path = self.__file__
643            if file_path is None:
644                return []
645            with open(file_path, 'r', encoding='utf-8') as f:
646                text = f.read()
647
648            if 'required' not in text:
649                return []
650
651            ### This has some limitations:
652            ### It relies on `required` being manually declared.
653            ### We lose the ability to dynamically alter the `required` list,
654            ### which is why we've kept the module-reliant method below.
655            import ast, re
656            ### NOTE: This technically would break 
657            ### if `required` was the very first line of the file.
658            req_start_match = re.search(r'required(:\s*)?.*=', text)
659            if not req_start_match:
660                return []
661            req_start = req_start_match.start()
662            equals_sign = req_start + text[req_start:].find('=')
663
664            ### Dependencies may have brackets within the strings, so push back the index.
665            first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[')
666            if first_opening_brace == -1:
667                return []
668
669            next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']')
670            if next_closing_brace == -1:
671                return []
672
673            start_ix = first_opening_brace + 1
674            end_ix = next_closing_brace
675
676            num_braces = 0
677            while True:
678                if '[' not in text[start_ix:end_ix]:
679                    break
680                num_braces += 1
681                start_ix = end_ix
682                end_ix += text[end_ix + 1:].find(']') + 1
683
684            req_end = end_ix + 1
685            req_text = (
686                text[(first_opening_brace-1):req_end]
687                .lstrip()
688                .replace('=', '', 1)
689                .lstrip()
690                .rstrip()
691            )
692            try:
693                required = ast.literal_eval(req_text)
694            except Exception as e:
695                warn(
696                    f"Unable to determine requirements for plugin '{self.name}' "
697                    + "without importing the module.\n"
698                    + "    This may be due to dynamically setting the global `required` list.\n"
699                    + f"    {e}"
700                )
701                return []
702            return required
703
704        import inspect
705        self.activate_venv(dependencies=False, debug=debug)
706        required = []
707        for name, val in inspect.getmembers(self.module):
708            if name == 'required':
709                required = val
710                break
711        self._required = required
712        self.deactivate_venv(dependencies=False, debug=debug)
713        return required

If the Plugin has specified dependencies in a list called required, return the list.

NOTE: Dependecies which start with 'plugin:' are Meerschaum plugins, not pip packages. Meerschaum plugins may also specify connector keys for a repo after '@'.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of required packages and plugins (str).
def get_required_plugins(self, debug: bool = False) -> List[Plugin]:
716    def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
717        """
718        Return a list of required Plugin objects.
719        """
720        from meerschaum.utils.warnings import warn
721        from meerschaum.config import get_config
722        from meerschaum.config.static import STATIC_CONFIG
723        plugins = []
724        _deps = self.get_dependencies(debug=debug)
725        sep = STATIC_CONFIG['plugins']['repo_separator']
726        plugin_names = [
727            _d[len('plugin:'):] for _d in _deps
728            if _d.startswith('plugin:') and len(_d) > len('plugin:')
729        ]
730        default_repo_keys = get_config('meerschaum', 'default_repository')
731        for _plugin_name in plugin_names:
732            if sep in _plugin_name:
733                try:
734                    _plugin_name, _repo_keys = _plugin_name.split(sep)
735                except Exception as e:
736                    _repo_keys = default_repo_keys
737                    warn(
738                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
739                        + f"Will try to use '{_repo_keys}' instead.",
740                        stack = False,
741                    )
742            else:
743                _repo_keys = default_repo_keys
744            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
745        return plugins

Return a list of required Plugin objects.

def get_required_packages(self, debug: bool = False) -> List[str]:
748    def get_required_packages(self, debug: bool=False) -> List[str]:
749        """
750        Return the required package names (excluding plugins).
751        """
752        _deps = self.get_dependencies(debug=debug)
753        return [_d for _d in _deps if not _d.startswith('plugin:')]

Return the required package names (excluding plugins).

def activate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
756    def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
757        """
758        Activate the virtual environments for the plugin and its dependencies.
759
760        Parameters
761        ----------
762        dependencies: bool, default True
763            If `True`, activate the virtual environments for required plugins.
764
765        Returns
766        -------
767        A bool indicating success.
768        """
769        from meerschaum.utils.venv import venv_target_path
770        from meerschaum.utils.packages import activate_venv
771        from meerschaum.utils.misc import make_symlink, is_symlink
772        from meerschaum.config._paths import PACKAGE_ROOT_PATH
773
774        if dependencies:
775            for plugin in self.get_required_plugins(debug=debug):
776                plugin.activate_venv(debug=debug, **kw)
777
778        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
779        venv_meerschaum_path = vtp / 'meerschaum'
780
781        try:
782            success, msg = True, "Success"
783            if is_symlink(venv_meerschaum_path):
784                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
785                    venv_meerschaum_path.unlink()
786                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
787        except Exception as e:
788            success, msg = False, str(e)
789        if not success:
790            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")
791
792        return activate_venv(self.name, debug=debug, **kw)

Activate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, activate the virtual environments for required plugins.
Returns
  • A bool indicating success.
def deactivate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
795    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
796        """
797        Deactivate the virtual environments for the plugin and its dependencies.
798
799        Parameters
800        ----------
801        dependencies: bool, default True
802            If `True`, deactivate the virtual environments for required plugins.
803
804        Returns
805        -------
806        A bool indicating success.
807        """
808        from meerschaum.utils.packages import deactivate_venv
809        success = deactivate_venv(self.name, debug=debug, **kw)
810        if dependencies:
811            for plugin in self.get_required_plugins(debug=debug):
812                plugin.deactivate_venv(debug=debug, **kw)
813        return success

Deactivate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, deactivate the virtual environments for required plugins.
Returns
  • A bool indicating success.
def install_dependencies(self, force: bool = False, debug: bool = False) -> bool:
816    def install_dependencies(
817            self,
818            force: bool = False,
819            debug: bool = False,
820        ) -> bool:
821        """
822        If specified, install dependencies.
823        
824        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
825        Meerschaum plugins from the same repository as this Plugin.
826        To install from a different repository, add the repo keys after `'@'`
827        (e.g. `'plugin:foo@api:bar'`).
828
829        Parameters
830        ----------
831        force: bool, default False
832            If `True`, continue with the installation, even if some
833            required packages fail to install.
834
835        debug: bool, default False
836            Verbosity toggle.
837
838        Returns
839        -------
840        A bool indicating success.
841
842        """
843        from meerschaum.utils.packages import pip_install, venv_contains_package
844        from meerschaum.utils.debug import dprint
845        from meerschaum.utils.warnings import warn, info
846        from meerschaum.connectors.parse import parse_repo_keys
847        _deps = self.get_dependencies(debug=debug)
848        if not _deps and self.requirements_file_path is None:
849            return True
850
851        plugins = self.get_required_plugins(debug=debug)
852        for _plugin in plugins:
853            if _plugin.name == self.name:
854                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
855                continue
856            _success, _msg = _plugin.repo_connector.install_plugin(
857                _plugin.name, debug=debug, force=force
858            )
859            if not _success:
860                warn(
861                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
862                    + f" for plugin '{self.name}':\n" + _msg,
863                    stack = False,
864                )
865                if not force:
866                    warn(
867                        "Try installing with the `--force` flag to continue anyway.",
868                        stack = False,
869                    )
870                    return False
871                info(
872                    "Continuing with installation despite the failure "
873                    + "(careful, things might be broken!)...",
874                    icon = False
875                )
876
877
878        ### First step: parse `requirements.txt` if it exists.
879        if self.requirements_file_path is not None:
880            if not pip_install(
881                requirements_file_path=self.requirements_file_path,
882                venv=self.name, debug=debug
883            ):
884                warn(
885                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
886                    stack = False,
887                )
888                if not force:
889                    warn(
890                        "Try installing with `--force` to continue anyway.",
891                        stack = False,
892                    )
893                    return False
894                info(
895                    "Continuing with installation despite the failure "
896                    + "(careful, things might be broken!)...",
897                    icon = False
898                )
899
900
901        ### Don't reinstall packages that are already included in required plugins.
902        packages = []
903        _packages = self.get_required_packages(debug=debug)
904        accounted_for_packages = set()
905        for package_name in _packages:
906            for plugin in plugins:
907                if venv_contains_package(package_name, plugin.name):
908                    accounted_for_packages.add(package_name)
909                    break
910        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
911
912        ### Attempt pip packages installation.
913        if packages:
914            for package in packages:
915                if not pip_install(package, venv=self.name, debug=debug):
916                    warn(
917                        f"Failed to install required package '{package}'"
918                        + f" for plugin '{self.name}'.",
919                        stack = False,
920                    )
921                    if not force:
922                        warn(
923                            "Try installing with `--force` to continue anyway.",
924                            stack = False,
925                        )
926                        return False
927                    info(
928                        "Continuing with installation despite the failure "
929                        + "(careful, things might be broken!)...",
930                        icon = False
931                    )
932        return True

If specified, install dependencies.

NOTE: Dependencies that start with 'plugin:' will be installed as Meerschaum plugins from the same repository as this Plugin. To install from a different repository, add the repo keys after '@' (e.g. 'plugin:foo@api:bar').

Parameters
  • force (bool, default False): If True, continue with the installation, even if some required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating success.
full_name: str
935    @property
936    def full_name(self) -> str:
937        """
938        Include the repo keys with the plugin's name.
939        """
940        from meerschaum.config.static import STATIC_CONFIG
941        sep = STATIC_CONFIG['plugins']['repo_separator']
942        return self.name + sep + str(self.repo_connector)

Include the repo keys with the plugin's name.

def make_action( function: Callable[[Any], Any], shell: bool = False, activate: bool = True, deactivate: bool = True, debug: bool = False) -> Callable[[Any], Any]:
39def make_action(
40        function: Callable[[Any], Any],
41        shell: bool = False,
42        activate: bool = True,
43        deactivate: bool = True,
44        debug: bool = False
45    ) -> Callable[[Any], Any]:
46    """
47    Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.
48    
49    Parameters
50    ----------
51    function: Callable[[Any], Any]
52        The function to become a Meerschaum action. Must accept all keyword arguments.
53        
54    shell: bool, default False
55        Not used.
56        
57    Returns
58    -------
59    Another function (this is a decorator function).
60
61    Examples
62    --------
63    >>> from meerschaum.plugins import make_action
64    >>>
65    >>> @make_action
66    ... def my_action(**kw):
67    ...     print('foo')
68    ...     return True, "Success"
69    >>>
70    """
71
72    from meerschaum.actions import actions
73    from meerschaum.utils.formatting import pprint
74    package_name = function.__globals__['__name__']
75    plugin_name = (
76        package_name.split('.')[1]
77        if package_name.startswith('plugins.') else None
78    )
79    plugin = Plugin(plugin_name) if plugin_name else None
80
81    if debug:
82        from meerschaum.utils.debug import dprint
83        dprint(
84            f"Adding action '{function.__name__}' from plugin " +
85            f"'{plugin}'..."
86        )
87
88    actions[function.__name__] = function
89    return function

Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.

Parameters
  • function (Callable[[Any], Any]): The function to become a Meerschaum action. Must accept all keyword arguments.
  • shell (bool, default False): Not used.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import make_action
>>>
>>> @make_action
... def my_action(**kw):
...     print('foo')
...     return True, "Success"
>>>
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
227def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
228    """
229    Execute the function when initializing the Meerschaum API module.
230    Useful for lazy-loading heavy plugins only when the API is started,
231    such as when editing the `meerschaum.api.app` FastAPI app.
232    
233    The FastAPI app will be passed as the only parameter.
234    
235    Examples
236    --------
237    >>> from meerschaum.plugins import api_plugin
238    >>>
239    >>> @api_plugin
240    >>> def initialize_plugin(app):
241    ...     @app.get('/my/new/path')
242    ...     def new_path():
243    ...         return {'message': 'It works!'}
244    >>>
245    """
246    with _locks['_api_plugins']:
247        try:
248            if function.__module__ not in _api_plugins:
249                _api_plugins[function.__module__] = []
250            _api_plugins[function.__module__].append(function)
251        except Exception as e:
252            from meerschaum.utils.warnings import warn
253            warn(e)
254    return function

Execute the function when initializing the Meerschaum API module. Useful for lazy-loading heavy plugins only when the API is started, such as when editing the meerschaum.api.app FastAPI app.

The FastAPI app will be passed as the only parameter.

Examples
>>> from meerschaum.plugins import api_plugin
>>>
>>> @api_plugin
>>> def initialize_plugin(app):
...     @app.get('/my/new/path')
...     def new_path():
...         return {'message': 'It works!'}
>>>
def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
212def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
213    """
214    Execute the function when starting the Dash application.
215    """
216    with _locks['_dash_plugins']:
217        try:
218            if function.__module__ not in _dash_plugins:
219                _dash_plugins[function.__module__] = []
220            _dash_plugins[function.__module__].append(function)
221        except Exception as e:
222            from meerschaum.utils.warnings import warn
223            warn(e)
224    return function

Execute the function when starting the Dash application.

def import_plugins( *plugins_to_import: Union[str, List[str], NoneType], warn: bool = True) -> "Union['ModuleType', Tuple['ModuleType', None]]":
432def import_plugins(
433        *plugins_to_import: Union[str, List[str], None],
434        warn: bool = True,
435    ) -> Union[
436        'ModuleType', Tuple['ModuleType', None]
437    ]:
438    """
439    Import the Meerschaum plugins directory.
440
441    Parameters
442    ----------
443    plugins_to_import: Union[str, List[str], None]
444        If provided, only import the specified plugins.
445        Otherwise import the entire plugins module. May be a string, list, or `None`.
446        Defaults to `None`.
447
448    Returns
449    -------
450    A module of list of modules, depening on the number of plugins provided.
451
452    """
453    import sys
454    import os
455    import importlib
456    from meerschaum.utils.misc import flatten_list
457    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
458    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
459    from meerschaum.utils.warnings import warn as _warn
460    plugins_to_import = list(plugins_to_import)
461    with _locks['sys.path']:
462
463        ### Since plugins may depend on other plugins,
464        ### we need to activate the virtual environments for library plugins.
465        ### This logic exists in `Plugin.activate_venv()`,
466        ### but that code requires the plugin's module to already be imported.
467        ### It's not a guarantee of correct activation order,
468        ### e.g. if a library plugin pins a specific package and another 
469        plugins_names = get_plugins_names()
470        already_active_venvs = [is_venv_active(plugin_name) for plugin_name in plugins_names]
471
472        if not sys.path or sys.path[0] != str(PLUGINS_RESOURCES_PATH.parent):
473            sys.path.insert(0, str(PLUGINS_RESOURCES_PATH.parent))
474
475        if not plugins_to_import:
476            for plugin_name in plugins_names:
477                activate_venv(plugin_name)
478            try:
479                imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem)
480            except ImportError as e:
481                _warn(f"Failed to import the plugins module:\n    {e}")
482                import traceback
483                traceback.print_exc()
484                imported_plugins = None
485            for plugin_name in plugins_names:
486                if plugin_name in already_active_venvs:
487                    continue
488                deactivate_venv(plugin_name)
489
490        else:
491            imported_plugins = []
492            for plugin_name in flatten_list(plugins_to_import):
493                plugin = Plugin(plugin_name)
494                try:
495                    with Venv(plugin):
496                        imported_plugins.append(
497                            importlib.import_module(
498                                f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
499                            )
500                        )
501                except Exception as e:
502                    _warn(
503                        f"Failed to import plugin '{plugin_name}':\n    "
504                        + f"{e}\n\nHere's a stacktrace:",
505                        stack = False,
506                    )
507                    from meerschaum.utils.formatting import get_console
508                    get_console().print_exception(
509                        suppress = [
510                            'meerschaum/plugins/__init__.py',
511                            importlib,
512                            importlib._bootstrap,
513                        ]
514                    )
515                    imported_plugins.append(None)
516
517        if imported_plugins is None and warn:
518            _warn(f"Failed to import plugins.", stacklevel=3)
519
520        if str(PLUGINS_RESOURCES_PATH.parent) in sys.path:
521            sys.path.remove(str(PLUGINS_RESOURCES_PATH.parent))
522
523    if isinstance(imported_plugins, list):
524        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
525    return imported_plugins

Import the Meerschaum plugins directory.

Parameters
  • plugins_to_import (Union[str, List[str], None]): If provided, only import the specified plugins. Otherwise import the entire plugins module. May be a string, list, or None. Defaults to None.
Returns
  • A module of list of modules, depening on the number of plugins provided.
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
567def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
568    """
569    Reload plugins back into memory.
570
571    Parameters
572    ----------
573    plugins: Optional[List[str]], default None
574        The plugins to reload. `None` will reload all plugins.
575
576    """
577    import sys
578    if debug:
579        from meerschaum.utils.debug import dprint
580
581    if not plugins:
582        plugins = get_plugins_names()
583    for plugin_name in plugins:
584        if debug:
585            dprint(f"Reloading plugin '{plugin_name}'...")
586        mod_name = 'plugins.' + str(plugin_name)
587        if mod_name in sys.modules:
588            del sys.modules[mod_name]
589    load_plugins(debug=debug)

Reload plugins back into memory.

Parameters
  • plugins (Optional[List[str]], default None): The plugins to reload. None will reload all plugins.
def get_plugins( *to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
592def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
593    """
594    Return a list of `Plugin` objects.
595
596    Parameters
597    ----------
598    to_load:
599        If specified, only load specific plugins.
600        Otherwise return all plugins.
601
602    try_import: bool, default True
603        If `True`, allow for plugins to be imported.
604    """
605    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
606    import os
607    sync_plugins_symlinks()
608    _plugins = [
609        Plugin(name) for name in (
610            to_load or [
611                (
612                    name if (PLUGINS_RESOURCES_PATH / name).is_dir()
613                    else name[:-3]
614                ) for name in os.listdir(PLUGINS_RESOURCES_PATH) if name != '__init__.py'
615            ]
616        )
617    ]
618    plugins = tuple(plugin for plugin in _plugins if plugin.is_installed(try_import=try_import))
619    if len(to_load) == 1:
620        if len(plugins) == 0:
621            raise ValueError(f"Plugin '{to_load[0]}' is not installed.")
622        return plugins[0]
623    return plugins

Return a list of Plugin objects.

Parameters
  • to_load:: If specified, only load specific plugins. Otherwise return all plugins.
  • try_import (bool, default True): If True, allow for plugins to be imported.
def get_data_plugins() -> List[Plugin]:
640def get_data_plugins() -> List[Plugin]:
641    """
642    Only return the modules of plugins with either `fetch()` or `sync()` functions.
643    """
644    import inspect
645    plugins = get_plugins()
646    data_names = {'sync', 'fetch'}
647    data_plugins = []
648    for plugin in plugins:
649        for name, ob in inspect.getmembers(plugin.module):
650            if not inspect.isfunction(ob):
651                continue
652            if name not in data_names:
653                continue
654            data_plugins.append(plugin)
655    return data_plugins

Only return the modules of plugins with either fetch() or sync() functions.

def add_plugin_argument(*args, **kwargs) -> None:
658def add_plugin_argument(*args, **kwargs) -> None:
659    """
660    Add argparse arguments under the 'Plugins options' group.
661    Takes the same parameters as the regular argparse `add_argument()` function.
662
663    Examples
664    --------
665    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
666    >>> 
667    """
668    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
669    from meerschaum.utils.warnings import warn, error
670    _parent_plugin_name = _get_parent_plugin(2)
671    title = f"Plugin '{_parent_plugin_name}' options" if _parent_plugin_name else 'Custom options'
672    group_key = 'plugin_' + (_parent_plugin_name or '')
673    if group_key not in groups:
674        groups[group_key] = parser.add_argument_group(
675            title = title,
676        )
677        _seen_plugin_args[group_key] = set()
678    try:
679        if str(args) not in _seen_plugin_args[group_key]:
680            groups[group_key].add_argument(*args, **kwargs)
681            _seen_plugin_args[group_key].add(str(args))
682    except Exception as e:
683        warn(e)

Add argparse arguments under the 'Plugins options' group. Takes the same parameters as the regular argparse add_argument() function.

Examples
>>> add_plugin_argument('--foo', type=int, help="This is my help text!")
>>>
def pre_sync_hook(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
 92def pre_sync_hook(
 93        function: Callable[[Any], Any],
 94    ) -> Callable[[Any], Any]:
 95    """
 96    Register a function as a sync hook to be executed right before sync.
 97    
 98    Parameters
 99    ----------
100    function: Callable[[Any], Any]
101        The function to execute right before a sync.
102        
103    Returns
104    -------
105    Another function (this is a decorator function).
106
107    Examples
108    --------
109    >>> from meerschaum.plugins import pre_sync_hook
110    >>>
111    >>> @pre_sync_hook
112    ... def log_sync(pipe, **kwargs):
113    ...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
114    >>>
115    """
116    with _locks['_pre_sync_hooks']:
117        try:
118            if function.__module__ not in _pre_sync_hooks:
119                _pre_sync_hooks[function.__module__] = []
120            _pre_sync_hooks[function.__module__].append(function)
121        except Exception as e:
122            from meerschaum.utils.warnings import warn
123            warn(e)
124    return function

Register a function as a sync hook to be executed right before sync.

Parameters
----------
function: Callable[[Any], Any]
    The function to execute right before a sync.

Returns
-------
Another function (this is a decorator function).

Examples
--------
>>> from meerschaum.plugins import pre_sync_hook
>>>
>>> @pre_sync_hook
... def log_sync(pipe, **kwargs):
...     print(f"About to sync {pipe} with kwargs:

{kwargs}.")

>

def post_sync_hook(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
127def post_sync_hook(
128        function: Callable[[Any], Any],
129    ) -> Callable[[Any], Any]:
130    """
131    Register a function as a sync hook to be executed upon completion of a sync.
132    
133    Parameters
134    ----------
135    function: Callable[[Any], Any]
136        The function to execute upon completion of a sync.
137        
138    Returns
139    -------
140    Another function (this is a decorator function).
141
142    Examples
143    --------
144    >>> from meerschaum.plugins import post_sync_hook
145    >>>
146    >>> @post_sync_hook
147    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
148    ...     print(f"It took {round(duration, 2)} seconds to sync {pipe}.")
149    >>>
150    """
151    with _locks['_post_sync_hooks']:
152        try:
153            if function.__module__ not in _post_sync_hooks:
154                _post_sync_hooks[function.__module__] = []
155            _post_sync_hooks[function.__module__].append(function)
156        except Exception as e:
157            from meerschaum.utils.warnings import warn
158            warn(e)
159    return function

Register a function as a sync hook to be executed upon completion of a sync.

Parameters
  • function (Callable[[Any], Any]): The function to execute upon completion of a sync.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import post_sync_hook
>>>
>>> @post_sync_hook
... def log_sync(pipe, success_tuple, duration=None, **kwargs):
...     print(f"It took {round(duration, 2)} seconds to sync {pipe}.")
>>>