meerschaum.plugins

Expose plugin management APIs from the meerschaum.plugins module.

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Expose plugin management APIs from the `meerschaum.plugins` module.
  7"""
  8
  9from __future__ import annotations
 10import functools
 11import meerschaum as mrsm
 12from meerschaum.utils.typing import Callable, Any, Union, Optional, Dict, List, Tuple
 13from meerschaum.utils.threading import Lock, RLock
 14from meerschaum.plugins._Plugin import Plugin
 15
 16_api_plugins: Dict[str, List[Callable[['fastapi.App'], Any]]] = {}
 17_pre_sync_hooks: Dict[str, List[Callable[[Any], Any]]] = {}
 18_post_sync_hooks: Dict[str, List[Callable[[Any], Any]]] = {}
 19_locks = {
 20    '_api_plugins': RLock(),
 21    '_dash_plugins': RLock(),
 22    '_pre_sync_hooks': RLock(),
 23    '_post_sync_hooks': RLock(),
 24    '__path__': RLock(),
 25    'sys.path': RLock(),
 26    'internal_plugins': RLock(),
 27    '_synced_symlinks': RLock(),
 28    'PLUGINS_INTERNAL_LOCK_PATH': RLock(),
 29}
 30__all__ = (
 31    "Plugin", "make_action", "api_plugin", "dash_plugin", "web_page",
 32    "import_plugins", "from_plugin_import",
 33    "reload_plugins", "get_plugins", "get_data_plugins", "add_plugin_argument",
 34    "pre_sync_hook", "post_sync_hook",
 35)
 36__pdoc__ = {
 37    'venvs': False, 'data': False, 'stack': False, 'plugins': False,
 38}
 39
 40def make_action(
 41    function: Callable[[Any], Any],
 42    shell: bool = False,
 43    activate: bool = True,
 44    deactivate: bool = True,
 45    debug: bool = False
 46) -> Callable[[Any], Any]:
 47    """
 48    Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.
 49    
 50    Parameters
 51    ----------
 52    function: Callable[[Any], Any]
 53        The function to become a Meerschaum action. Must accept all keyword arguments.
 54        
 55    shell: bool, default False
 56        Not used.
 57        
 58    Returns
 59    -------
 60    Another function (this is a decorator function).
 61
 62    Examples
 63    --------
 64    >>> from meerschaum.plugins import make_action
 65    >>>
 66    >>> @make_action
 67    ... def my_action(**kw):
 68    ...     print('foo')
 69    ...     return True, "Success"
 70    >>>
 71    """
 72
 73    from meerschaum.actions import actions
 74    from meerschaum.utils.formatting import pprint
 75    package_name = function.__globals__['__name__']
 76    plugin_name = (
 77        package_name.split('.')[1]
 78        if package_name.startswith('plugins.') else None
 79    )
 80    plugin = Plugin(plugin_name) if plugin_name else None
 81
 82    if debug:
 83        from meerschaum.utils.debug import dprint
 84        dprint(
 85            f"Adding action '{function.__name__}' from plugin " +
 86            f"'{plugin}'..."
 87        )
 88
 89    actions[function.__name__] = function
 90    return function
 91
 92
 93def pre_sync_hook(
 94        function: Callable[[Any], Any],
 95    ) -> Callable[[Any], Any]:
 96    """
 97    Register a function as a sync hook to be executed right before sync.
 98    
 99    Parameters
100    ----------
101    function: Callable[[Any], Any]
102        The function to execute right before a sync.
103        
104    Returns
105    -------
106    Another function (this is a decorator function).
107
108    Examples
109    --------
110    >>> from meerschaum.plugins import pre_sync_hook
111    >>>
112    >>> @pre_sync_hook
113    ... def log_sync(pipe, **kwargs):
114    ...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
115    >>>
116    """
117    with _locks['_pre_sync_hooks']:
118        try:
119            if function.__module__ not in _pre_sync_hooks:
120                _pre_sync_hooks[function.__module__] = []
121            _pre_sync_hooks[function.__module__].append(function)
122        except Exception as e:
123            from meerschaum.utils.warnings import warn
124            warn(e)
125    return function
126
127
def post_sync_hook(
    function: Callable[[Any], Any],
) -> Callable[[Any], Any]:
    """
    Register a callback to be run once a sync has completed.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The callback to run upon completion of a sync.

    Returns
    -------
    The same function, unchanged (so this may be used as a decorator).

    Examples
    --------
    >>> from meerschaum.plugins import post_sync_hook
    >>> from meerschaum.utils.misc import interval_str
    >>> from datetime import timedelta
    >>>
    >>> @post_sync_hook
    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
    ...     duration_delta = timedelta(seconds=duration)
    ...     duration_text = interval_str(duration_delta)
    ...     print(f"It took {duration_text} to sync {pipe}.")
    >>>
    """
    with _locks['_post_sync_hooks']:
        try:
            ### Hooks are grouped by the module which defined them.
            _post_sync_hooks.setdefault(function.__module__, []).append(function)
        except Exception as e:
            ### Registration is best-effort: warn instead of raising.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function
165
166
### Maps a normalized endpoint (no leading or trailing slashes) to a dictionary
### holding the page's function and whether a login is required.
_plugin_endpoints_to_pages = {}
def web_page(
    page: Union[str, None, Callable[[Any], Any]] = None,
    login_required: bool = True,
    **kwargs
) -> Any:
    """
    Quickly add pages to the dash application.

    May be used bare (`@web_page`), in which case the endpoint is the function's name,
    or with an explicit endpoint (`@web_page('foo/bar')`).

    Parameters
    ----------
    page: Union[str, None, Callable[[Any], Any]], default None
        The endpoint for the page, or the page function itself when used as a bare decorator.
        If `None`, the decorated function's name is used.

    login_required: bool, default True
        Stored alongside the page function in the registry.

    **kwargs
        Not used.

    Returns
    -------
    A decorator when `page` is a string or `None`;
    otherwise the wrapped page function.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.plugins import web_page
    >>> html = mrsm.attempt_import('dash.html')
    >>>
    >>> @web_page('foo/bar', login_required=False)
    ... def foo_bar():
    ...     return html.Div([html.H1("Hello, World!")])
    >>>
    """
    endpoint = None

    def _register(_func: Callable[[Any], Any]) -> Callable[[Any], Any]:
        nonlocal endpoint

        @functools.wraps(_func)
        def _page(*_args, **_kwargs):
            return _func(*_args, **_kwargs)

        ### Fall back to the function's name when no endpoint was given.
        raw_endpoint = _func.__name__ if endpoint is None else endpoint
        endpoint = raw_endpoint.lstrip('/').rstrip('/').strip()

        ### The undecorated function is what gets stored in the registry.
        _plugin_endpoints_to_pages[endpoint] = {
            'function': _func,
            'login_required': login_required,
        }
        return _page

    ### Called as `@web_page(...)`: hand back the decorator itself.
    if not callable(page):
        endpoint = page
        return _register

    ### Called as a bare `@web_page`: register the function right away.
    registered = _register(page)
    endpoint = page.__name__
    return registered
214
215
### Maps a module's name to the functions it registered via `dash_plugin`.
_dash_plugins = {}
def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Register a function to be executed when the Dash application starts.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to register.

    Returns
    -------
    The same function, unchanged (so this may be used as a decorator).
    """
    with _locks['_dash_plugins']:
        try:
            ### Functions are grouped by the module which defined them.
            _dash_plugins.setdefault(function.__module__, []).append(function)
        except Exception as e:
            ### Registration is best-effort: warn instead of raising.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function
230
231
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Register a function to be executed when the Meerschaum API module is initialized.
    Useful for lazy-loading heavy plugins only when the API is started,
    such as when editing the `meerschaum.api.app` FastAPI app.

    The FastAPI app will be passed as the only parameter.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to register.

    Returns
    -------
    The same function, unchanged (so this may be used as a decorator).

    Examples
    --------
    >>> from meerschaum.plugins import api_plugin
    >>>
    >>> @api_plugin
    ... def initialize_plugin(app):
    ...     @app.get('/my/new/path')
    ...     def new_path():
    ...         return {'message': 'It works!'}
    >>>
    """
    with _locks['_api_plugins']:
        try:
            ### Functions are grouped by the module which defined them.
            _api_plugins.setdefault(function.__module__, []).append(function)
        except Exception as e:
            ### Registration is best-effort: warn instead of raising.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function
260
261
_synced_symlinks: bool = False
def sync_plugins_symlinks(debug: bool = False, warn: bool = True) -> None:
    """
    Bring the symlinks in the internal plugins directory in line with the plugins on disk.

    Plugins are discovered in each of `PLUGINS_DIR_PATHS` and among `pip`-installed
    packages registered under the `meerschaum.plugins` entry-point group.
    Stale symlinks inside `PLUGINS_RESOURCES_PATH` are removed and missing ones are created.
    Once a run has completed, later calls return immediately.

    Parameters
    ----------
    debug: bool, default False
        Not used.

    warn: bool, default True
        If `True`, emit warnings for lockfile problems, duplicate plugin names,
        and symlinks which could not be created.
    """
    global _synced_symlinks
    with _locks['_synced_symlinks']:
        if _synced_symlinks:
            return

    import sys, os, pathlib, time
    from collections import defaultdict
    import importlib.util
    from meerschaum.utils.misc import flatten_list, make_symlink, is_symlink
    from meerschaum.utils.warnings import error, warn as _warn
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.venv import Venv, activate_venv, deactivate_venv, is_venv_active
    from meerschaum.config._paths import (
        PLUGINS_RESOURCES_PATH,
        PLUGINS_ARCHIVES_RESOURCES_PATH,
        PLUGINS_INIT_PATH,
        PLUGINS_DIR_PATHS,
        PLUGINS_INTERNAL_LOCK_PATH,
    )

    ### If the lock file exists, sleep for up to a second or until it's removed before continuing.
    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
        if PLUGINS_INTERNAL_LOCK_PATH.exists():
            lock_sleep_total     = STATIC_CONFIG['plugins']['lock_sleep_total']
            lock_sleep_increment = STATIC_CONFIG['plugins']['lock_sleep_increment']
            lock_start = time.perf_counter()
            while (
                (time.perf_counter() - lock_start) < lock_sleep_total
            ):
                time.sleep(lock_sleep_increment)
                if not PLUGINS_INTERNAL_LOCK_PATH.exists():
                    break
                ### NOTE: A lock which survives one sleep increment is removed here.
                try:
                    PLUGINS_INTERNAL_LOCK_PATH.unlink()
                except Exception as e:
                    if warn:
                        _warn(f"Error while removing lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
                    break

        ### Begin locking from other processes.
        try:
            PLUGINS_INTERNAL_LOCK_PATH.touch()
        except Exception as e:
            if warn:
                _warn(f"Unable to create lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")

    with _locks['internal_plugins']:

        try:
            from importlib.metadata import entry_points
        except ImportError:
            ### Fall back to the backport. `attempt_import` must be imported here:
            ### it is not otherwise in scope in this function.
            from meerschaum.utils.packages import attempt_import
            importlib_metadata = attempt_import('importlib_metadata', lazy=False)
            entry_points = importlib_metadata.entry_points

        ### NOTE: Allow plugins to be installed via `pip`.
        packaged_plugin_paths = []
        try:
            discovered_packaged_plugins_eps = entry_points(group='meerschaum.plugins')
        except TypeError:
            ### Older `entry_points()` signatures do not accept the `group` keyword.
            discovered_packaged_plugins_eps = []

        for ep in discovered_packaged_plugins_eps:
            module_name = ep.name
            ep_dist = getattr(ep, 'dist', None)
            ### `files` is `None` when the distribution does not record its files.
            package_file_paths = (ep_dist.files if ep_dist is not None else None) or []
            for package_file_path in package_file_paths:
                if package_file_path.suffix != '.py':
                    continue
                if str(package_file_path) == f'{module_name}.py':
                    packaged_plugin_paths.append(package_file_path.locate())
                elif str(package_file_path) == f'{module_name}/__init__.py':
                    packaged_plugin_paths.append(package_file_path.locate().parent)

        ### The resources path must be a real directory, so discard a symlink in its place.
        if is_symlink(PLUGINS_RESOURCES_PATH) or not PLUGINS_RESOURCES_PATH.exists():
            try:
                PLUGINS_RESOURCES_PATH.unlink()
            except Exception:
                pass

        PLUGINS_RESOURCES_PATH.mkdir(exist_ok=True)

        existing_symlinked_paths = [
            (PLUGINS_RESOURCES_PATH / item)
            for item in os.listdir(PLUGINS_RESOURCES_PATH)
        ]
        for plugins_path in PLUGINS_DIR_PATHS:
            if not plugins_path.exists():
                plugins_path.mkdir(exist_ok=True, parents=True)
        plugins_to_be_symlinked = list(flatten_list(
            [
                [
                    (plugins_path / item)
                    for item in os.listdir(plugins_path)
                    if (
                        not item.startswith('.')
                    ) and (item not in ('__pycache__', '__init__.py'))
                ]
                for plugins_path in PLUGINS_DIR_PATHS
            ]
        ))
        plugins_to_be_symlinked.extend(packaged_plugin_paths)

        ### Check for duplicates.
        seen_plugins = defaultdict(lambda: 0)
        for plugin_path in plugins_to_be_symlinked:
            plugin_name = plugin_path.stem
            seen_plugins[plugin_name] += 1
        for plugin_name, plugin_count in seen_plugins.items():
            if plugin_count > 1:
                if warn:
                    _warn(f"Found duplicate plugins named '{plugin_name}'.")

        for plugin_symlink_path in existing_symlinked_paths:
            real_path = pathlib.Path(os.path.realpath(plugin_symlink_path))

            ### Remove invalid symlinks.
            if real_path not in plugins_to_be_symlinked:
                if not is_symlink(plugin_symlink_path):
                    continue
                try:
                    plugin_symlink_path.unlink()
                except Exception as e:
                    pass

            ### Remove valid plugins from the to-be-symlinked list.
            else:
                plugins_to_be_symlinked.remove(real_path)

        for plugin_path in plugins_to_be_symlinked:
            plugin_symlink_path = PLUGINS_RESOURCES_PATH / plugin_path.name
            try:
                ### There might be duplicate folders (e.g. __pycache__).
                if (
                    plugin_symlink_path.exists()
                    and
                    plugin_symlink_path.is_dir()
                    and
                    not is_symlink(plugin_symlink_path)
                ):
                    continue
                success, msg = make_symlink(plugin_path, plugin_symlink_path)
            except Exception as e:
                success, msg = False, str(e)
            if not success:
                if warn:
                    _warn(
                        f"Failed to create symlink {plugin_symlink_path} "
                        + f"to {plugin_path}:\n    {msg}"
                    )

    ### Release symlink lock file in case other processes need it.
    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
        try:
            if PLUGINS_INTERNAL_LOCK_PATH.exists():
                PLUGINS_INTERNAL_LOCK_PATH.unlink()
        ### Sometimes competing threads will delete the lock file at the same time.
        except FileNotFoundError:
            pass
        except Exception as e:
            if warn:
                _warn(f"Error cleaning up lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")

        try:
            if not PLUGINS_INIT_PATH.exists():
                PLUGINS_INIT_PATH.touch()
        except Exception as e:
            error(f"Failed to create the file '{PLUGINS_INIT_PATH}':\n{e}")

    ### Make the parent of the resources path searchable from this package's `__path__`.
    with _locks['__path__']:
        if str(PLUGINS_RESOURCES_PATH.parent) not in __path__:
            __path__.append(str(PLUGINS_RESOURCES_PATH.parent))

    with _locks['_synced_symlinks']:
        _synced_symlinks = True
439
440
def import_plugins(
    *plugins_to_import: Union[str, List[str], None],
    warn: bool = True,
) -> Union[
    'ModuleType', Tuple['ModuleType', None]
]:
    """
    Import the Meerschaum plugins directory.

    Parameters
    ----------
    plugins_to_import: Union[str, List[str], None]
        If provided, only import the specified plugins.
        Otherwise import the entire plugins module. May be a string, list, or `None`.
        Defaults to `None`.

    warn: bool, default True
        If `True`, emit a warning when the plugins module could not be imported.

    Returns
    -------
    A module or a tuple of modules, depending on the number of plugins provided.
    A plugin which fails to import is represented by `None`.

    """
    import sys
    import os
    import importlib
    from meerschaum.utils.misc import flatten_list
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
    from meerschaum.utils.warnings import warn as _warn
    plugins_to_import = list(plugins_to_import)
    with _locks['sys.path']:

        ### Since plugins may depend on other plugins,
        ### we need to activate the virtual environments for library plugins.
        ### This logic exists in `Plugin.activate_venv()`,
        ### but that code requires the plugin's module to already be imported.
        ### It's not a guarantee of correct activation order,
        ### e.g. if a library plugin pins a specific package and another
        ### plugin (presumably) needs a conflicting version.
        plugins_names = get_plugins_names()

        ### Remember the names (not booleans) of the venvs which were active
        ### beforehand so that they are left active afterwards.
        already_active_venvs = {
            plugin_name
            for plugin_name in plugins_names
            if is_venv_active(plugin_name)
        }

        if not sys.path or sys.path[0] != str(PLUGINS_RESOURCES_PATH.parent):
            sys.path.insert(0, str(PLUGINS_RESOURCES_PATH.parent))

        if not plugins_to_import:
            for plugin_name in plugins_names:
                activate_venv(plugin_name)
            try:
                imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem)
            except ImportError as e:
                _warn(f"Failed to import the plugins module:\n    {e}")
                import traceback
                traceback.print_exc()
                imported_plugins = None
            for plugin_name in plugins_names:
                if plugin_name in already_active_venvs:
                    continue
                deactivate_venv(plugin_name)

        else:
            imported_plugins = []
            for plugin_name in flatten_list(plugins_to_import):
                plugin = Plugin(plugin_name)
                try:
                    with Venv(plugin):
                        imported_plugins.append(
                            importlib.import_module(
                                f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
                            )
                        )
                except Exception as e:
                    _warn(
                        f"Failed to import plugin '{plugin_name}':\n    "
                        + f"{e}\n\nHere's a stacktrace:",
                        stack = False,
                    )
                    from meerschaum.utils.formatting import get_console
                    get_console().print_exception(
                        suppress = [
                            'meerschaum/plugins/__init__.py',
                            importlib,
                            importlib._bootstrap,
                        ]
                    )
                    ### Keep positions aligned with the requested plugins.
                    imported_plugins.append(None)

        if imported_plugins is None and warn:
            _warn("Failed to import plugins.", stacklevel=3)

        if str(PLUGINS_RESOURCES_PATH.parent) in sys.path:
            sys.path.remove(str(PLUGINS_RESOURCES_PATH.parent))

    ### A single requested plugin is returned bare rather than in a tuple.
    if isinstance(imported_plugins, list):
        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
    return imported_plugins
535
536
def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
    """
    Emulate the `from module import x` behavior for plugins.

    Parameters
    ----------
    plugin_import_name: str
        The import name of the plugin's module.
        Separate submodules with '.' (e.g. 'compose.utils.pipes').
        A leading 'plugins.' is ignored.

    attrs: str
        Names of the attributes to return.

    Returns
    -------
    Objects from a plugin's submodule.
    If multiple objects are provided, return a tuple.
    Attributes which cannot be accessed are returned as `None`,
    and `None` is returned if the plugin or submodule cannot be imported.

    Raises
    ------
    ValueError
        If no attribute names are provided.

    Examples
    --------
    >>> init = from_plugin_import('compose.utils', 'init')
    >>> with mrsm.Venv('compose'):
    ...     cf = init()
    >>> build_parent_pipe, get_defined_pipes = from_plugin_import(
    ...     'compose.utils.pipes',
    ...     'build_parent_pipe',
    ...     'get_defined_pipes',
    ... )
    >>> parent_pipe = build_parent_pipe(cf)
    >>> defined_pipes = get_defined_pipes(cf)
    """
    import importlib
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.warnings import warn as _warn

    prefix = 'plugins.'
    if plugin_import_name.startswith(prefix):
        plugin_import_name = plugin_import_name[len(prefix):]

    ### The first component names the plugin itself.
    import_parts = plugin_import_name.split('.')
    plugin = mrsm.Plugin(import_parts[0])
    submodule_import_name = '.'.join([PLUGINS_RESOURCES_PATH.stem, *import_parts])

    if not attrs:
        raise ValueError(f"Provide which attributes to return from '{submodule_import_name}'.")

    with mrsm.Venv(plugin):
        if plugin.module is None:
            return None

        try:
            submodule = importlib.import_module(submodule_import_name)
        except ImportError as e:
            _warn(
                f"Failed to import plugin '{submodule_import_name}':\n    "
                + f"{e}\n\nHere's a stacktrace:",
                stack=False,
            )
            from meerschaum.utils.formatting import get_console
            get_console().print_exception(
                suppress=[
                    'meerschaum/plugins/__init__.py',
                    importlib,
                    importlib._bootstrap,
                ]
            )
            return None

        found = []
        for attr in attrs:
            try:
                value = getattr(submodule, attr)
            except Exception:
                _warn(f"Failed to access '{attr}' from '{submodule_import_name}'.")
                value = None
            found.append(value)

        return found[0] if len(attrs) == 1 else tuple(found)
618
619
def load_plugins(debug: bool = False, shell: bool = False) -> None:
    """
    Import the Meerschaum plugins and refresh the actions' modules list and dictionary.

    Parameters
    ----------
    debug: bool, default False
        Passed along to `make_action()`.

    shell: bool, default False
        Passed along to `make_action()`.
    """
    from inspect import isfunction, getmembers
    from meerschaum.actions import __all__ as _all, modules
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.packages import get_modules_from_package
    if debug:
        from meerschaum.utils.debug import dprint

    _plugins_names, plugins_modules = get_modules_from_package(
        import_plugins(),
        names=True,
        recursive=True,
        modules_venvs=True,
    )

    ### Swap out the previously loaded plugin modules for the fresh ones.
    ### The list is updated in place so that other references to it stay valid.
    modules[:] = (
        [
            existing_module
            for existing_module in modules
            if not existing_module.__name__.startswith(PLUGINS_RESOURCES_PATH.stem + '.')
        ]
        + plugins_modules
    )

    ### A function named the same as its module becomes an action.
    for plugin_module in plugins_modules:
        for member_name, member in getmembers(plugin_module):
            if isfunction(member) and member_name == plugin_module.__name__.split('.')[-1]:
                make_action(member, shell=shell, debug=debug)
657
658
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
    """
    Drop plugins' modules from `sys.modules` and load the plugins again.

    Parameters
    ----------
    plugins: Optional[List[str]], default None
        The plugins to reload. `None` (or an empty list) will reload all plugins.

    debug: bool, default False
        If `True`, print a message for each plugin being reloaded.

    """
    import sys
    if debug:
        from meerschaum.utils.debug import dprint

    names_to_reload = plugins if plugins else get_plugins_names()
    for plugin_name in names_to_reload:
        if debug:
            dprint(f"Reloading plugin '{plugin_name}'...")
        ### Forget the cached top-level module so the next import re-executes it.
        sys.modules.pop('plugins.' + str(plugin_name), None)

    load_plugins(debug=debug)
682
683
def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
    """
    Return the installed plugins as `Plugin` objects.

    Parameters
    ----------
    to_load:
        If specified, only load specific plugins.
        Otherwise return all plugins.

    try_import: bool, default True
        If `True`, allow for plugins to be imported.

    Returns
    -------
    A tuple of the installed `Plugin` objects,
    or a single `Plugin` when exactly one name is provided.

    Raises
    ------
    ValueError
        If exactly one name is provided and that plugin is not installed.
    """
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    import os
    sync_plugins_symlinks()

    if to_load:
        names = to_load
    else:
        ### Directories are plugins as-is; for files, drop the last three characters
        ### (i.e. the `.py` extension).
        names = [
            (entry if (PLUGINS_RESOURCES_PATH / entry).is_dir() else entry[:-3])
            for entry in os.listdir(PLUGINS_RESOURCES_PATH)
            if entry != '__init__.py'
        ]

    candidates = [Plugin(name) for name in names]
    installed = tuple(
        candidate
        for candidate in candidates
        if candidate.is_installed(try_import=try_import)
    )

    if len(to_load) != 1:
        return installed

    ### A single requested plugin is returned bare and must be installed.
    if len(installed) == 0:
        raise ValueError(f"Plugin '{to_load[0]}' is not installed.")
    return installed[0]
716
717
def get_plugins_names(*to_load, **kw) -> List[str]:
    """
    Return a list of the names of the installed plugins.

    Parameters
    ----------
    to_load:
        If specified, only consider these plugins.

    **kw
        Keyword arguments passed along to `get_plugins()`.

    Returns
    -------
    A list of plugin names.
    """
    plugins = get_plugins(*to_load, **kw)

    ### `get_plugins()` returns a bare `Plugin` (not a tuple) for a single name.
    if isinstance(plugins, Plugin):
        plugins = (plugins,)
    return [plugin.name for plugin in plugins]
723
724
def get_plugins_modules(*to_load, **kw) -> List['ModuleType']:
    """
    Return a list of the `module` attributes of the installed plugins.

    Parameters
    ----------
    to_load:
        If specified, only consider these plugins.

    **kw
        Keyword arguments passed along to `get_plugins()`.

    Returns
    -------
    A list with one entry per installed plugin
    (NOTE(review): an entry is presumably `None` when a plugin's module fails to import).
    """
    plugins = get_plugins(*to_load, **kw)

    ### `get_plugins()` returns a bare `Plugin` (not a tuple) for a single name.
    if isinstance(plugins, Plugin):
        plugins = (plugins,)
    return [plugin.module for plugin in plugins]
730
731
def get_data_plugins() -> List[Plugin]:
    """
    Only return the plugins whose modules define a `fetch()` or `sync()` function.

    Returns
    -------
    A list of `Plugin` objects, each appearing once
    (even if a plugin defines both `fetch()` and `sync()`).
    """
    import inspect
    data_names = {'sync', 'fetch'}

    def _is_data_plugin(plugin) -> bool:
        """Return whether the plugin's module has a function named in `data_names`."""
        return any(
            name in data_names and inspect.isfunction(ob)
            for name, ob in inspect.getmembers(plugin.module)
        )

    return [plugin for plugin in get_plugins() if _is_data_plugin(plugin)]
748
749
def add_plugin_argument(*args, **kwargs) -> None:
    """
    Add an argparse argument under the calling plugin's options group.
    Takes the same parameters as the regular argparse `add_argument()` function.

    The group is created on first use, and an argument whose positional
    parameters were already added to the group is skipped.
    Failures are reported as warnings rather than raised.

    Examples
    --------
    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
    >>>
    """
    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
    from meerschaum.utils.warnings import warn, error

    ### NOTE: Call this directly from this frame: the stack level of 2
    ### refers to the caller of `add_plugin_argument()`.
    caller_plugin_name = _get_parent_plugin(2)

    if caller_plugin_name:
        title = f"Plugin '{caller_plugin_name}' options"
    else:
        title = 'Custom options'
    group_key = 'plugin_' + (caller_plugin_name or '')

    if group_key not in groups:
        groups[group_key] = parser.add_argument_group(title=title)
        _seen_plugin_args[group_key] = set()

    try:
        ### The stringified positional parameters identify an argument.
        args_key = str(args)
        if args_key not in _seen_plugin_args[group_key]:
            groups[group_key].add_argument(*args, **kwargs)
            _seen_plugin_args[group_key].add(args_key)
    except Exception as e:
        warn(e)
776
777
def _get_parent_plugin(stacklevel: int = 1) -> Union[str, None]:
    """
    Return the plugin name implied by the module of a calling frame.

    A leading `plugins.` is stripped from the frame's module name
    and the first remaining dotted component is returned
    (e.g. `plugins.foo.bar` yields `foo`).

    NOTE: For callers outside of the `plugins` package, the first component
    of the caller's module name is returned; `None` is only returned
    if the frame or its module name cannot be determined.

    Parameters
    ----------
    stacklevel: int, default 1
        How many frames above this function to inspect
        (1 is the function which called `_get_parent_plugin()`).

    Returns
    -------
    The first component of the module name (without the `plugins.` prefix), or `None`.
    """
    import inspect
    prefix = 'plugins.'
    try:
        parent_globals = inspect.stack()[stacklevel][0].f_globals
        module_name = parent_globals['__name__']
        ### Only strip the leading prefix: replacing every occurrence
        ### would mangle names such as `plugins.myplugins.sub`.
        if module_name.startswith(prefix):
            module_name = module_name[len(prefix):]
        return module_name.split('.')[0]
    except Exception:
        return None
class Plugin:
 37class Plugin:
 38    """Handle packaging of Meerschaum plugins."""
 39    def __init__(
 40        self,
 41        name: str,
 42        version: Optional[str] = None,
 43        user_id: Optional[int] = None,
 44        required: Optional[List[str]] = None,
 45        attributes: Optional[Dict[str, Any]] = None,
 46        archive_path: Optional[pathlib.Path] = None,
 47        venv_path: Optional[pathlib.Path] = None,
 48        repo_connector: Optional['mrsm.connectors.api.APIConnector'] = None,
 49        repo: Union['mrsm.connectors.api.APIConnector', str, None] = None,
 50    ):
 51        from meerschaum.config.static import STATIC_CONFIG
 52        sep = STATIC_CONFIG['plugins']['repo_separator']
 53        _repo = None
 54        if sep in name:
 55            try:
 56                name, _repo = name.split(sep)
 57            except Exception as e:
 58                error(f"Invalid plugin name: '{name}'")
 59        self._repo_in_name = _repo
 60
 61        if attributes is None:
 62            attributes = {}
 63        self.name = name
 64        self.attributes = attributes
 65        self.user_id = user_id
 66        self._version = version
 67        if required:
 68            self._required = required
 69        self.archive_path = (
 70            archive_path if archive_path is not None
 71            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
 72        )
 73        self.venv_path = (
 74            venv_path if venv_path is not None
 75            else VIRTENV_RESOURCES_PATH / self.name
 76        )
 77        self._repo_connector = repo_connector
 78        self._repo_keys = repo
 79
 80
 81    @property
 82    def repo_connector(self):
 83        """
 84        Return the repository connector for this plugin.
 85        NOTE: This imports the `connectors` module, which imports certain plugin modules.
 86        """
 87        if self._repo_connector is None:
 88            from meerschaum.connectors.parse import parse_repo_keys
 89
 90            repo_keys = self._repo_keys or self._repo_in_name
 91            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
 92                error(
 93                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
 94                )
 95            repo_connector = parse_repo_keys(repo_keys)
 96            self._repo_connector = repo_connector
 97        return self._repo_connector
 98
 99
100    @property
101    def version(self):
102        """
103        Return the plugin's module version is defined (`__version__`) if it's defined.
104        """
105        if self._version is None:
106            try:
107                self._version = self.module.__version__
108            except Exception as e:
109                self._version = None
110        return self._version
111
112
113    @property
114    def module(self):
115        """
116        Return the Python module of the underlying plugin.
117        """
118        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
119            if self.__file__ is None:
120                return None
121            from meerschaum.plugins import import_plugins
122            self._module = import_plugins(str(self), warn=False)
123        return self._module
124
125
126    @property
127    def __file__(self) -> Union[str, None]:
128        """
129        Return the file path (str) of the plugin if it exists, otherwise `None`.
130        """
131        if self.__dict__.get('_module', None) is not None:
132            return self.module.__file__
133
134        potential_dir = PLUGINS_RESOURCES_PATH / self.name
135        if (
136            potential_dir.exists()
137            and potential_dir.is_dir()
138            and (potential_dir / '__init__.py').exists()
139        ):
140            return str((potential_dir / '__init__.py').as_posix())
141
142        potential_file = PLUGINS_RESOURCES_PATH / (self.name + '.py')
143        if potential_file.exists() and not potential_file.is_dir():
144            return str(potential_file.as_posix())
145
146        return None
147
148
149    @property
150    def requirements_file_path(self) -> Union[pathlib.Path, None]:
151        """
152        If a file named `requirements.txt` exists, return its path.
153        """
154        if self.__file__ is None:
155            return None
156        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
157        if not path.exists():
158            return None
159        return path
160
161
162    def is_installed(self, **kw) -> bool:
163        """
164        Check whether a plugin is correctly installed.
165
166        Returns
167        -------
168        A `bool` indicating whether a plugin exists and is successfully imported.
169        """
170        return self.__file__ is not None
171
172
173    def make_tar(self, debug: bool = False) -> pathlib.Path:
174        """
175        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
176
177        Parameters
178        ----------
179        debug: bool, default False
180            Verbosity toggle.
181
182        Returns
183        -------
184        A `pathlib.Path` to the archive file's path.
185
186        """
187        import tarfile, pathlib, subprocess, fnmatch
188        from meerschaum.utils.debug import dprint
189        from meerschaum.utils.packages import attempt_import
190        pathspec = attempt_import('pathspec', debug=debug)
191
192        if not self.__file__:
193            from meerschaum.utils.warnings import error
194            error(f"Could not find file for plugin '{self}'.")
195        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
196            path = self.__file__.replace('__init__.py', '')
197            is_dir = True
198        else:
199            path = self.__file__
200            is_dir = False
201
202        old_cwd = os.getcwd()
203        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
204        os.chdir(real_parent_path)
205
206        default_patterns_to_ignore = [
207            '.pyc',
208            '__pycache__/',
209            'eggs/',
210            '__pypackages__/',
211            '.git',
212        ]
213
214        def parse_gitignore() -> 'Set[str]':
215            gitignore_path = pathlib.Path(path) / '.gitignore'
216            if not gitignore_path.exists():
217                return set(default_patterns_to_ignore)
218            with open(gitignore_path, 'r', encoding='utf-8') as f:
219                gitignore_text = f.read()
220            return set(pathspec.PathSpec.from_lines(
221                pathspec.patterns.GitWildMatchPattern,
222                default_patterns_to_ignore + gitignore_text.splitlines()
223            ).match_tree(path))
224
225        patterns_to_ignore = parse_gitignore() if is_dir else set()
226
227        if debug:
228            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
229
230        with tarfile.open(self.archive_path, 'w:gz') as tarf:
231            if not is_dir:
232                tarf.add(f"{self.name}.py")
233            else:
234                for root, dirs, files in os.walk(self.name):
235                    for f in files:
236                        good_file = True
237                        fp = os.path.join(root, f)
238                        for pattern in patterns_to_ignore:
239                            if pattern in str(fp) or f.startswith('.'):
240                                good_file = False
241                                break
242                        if good_file:
243                            if debug:
244                                dprint(f"Adding '{fp}'...")
245                            tarf.add(fp)
246
247        ### clean up and change back to old directory
248        os.chdir(old_cwd)
249
250        ### change to 775 to avoid permissions issues with the API in a Docker container
251        self.archive_path.chmod(0o775)
252
253        if debug:
254            dprint(f"Created archive '{self.archive_path}'.")
255        return self.archive_path
256
257
258    def install(
259        self,
260        skip_deps: bool = False,
261        force: bool = False,
262        debug: bool = False,
263    ) -> SuccessTuple:
264        """
265        Extract a plugin's tar archive to the plugins directory.
266        
267        This function checks if the plugin is already installed and if the version is equal or
268        greater than the existing installation.
269
270        Parameters
271        ----------
272        skip_deps: bool, default False
273            If `True`, do not install dependencies.
274
275        force: bool, default False
276            If `True`, continue with installation, even if required packages fail to install.
277
278        debug: bool, default False
279            Verbosity toggle.
280
281        Returns
282        -------
283        A `SuccessTuple` of success (bool) and a message (str).
284
285        """
286        if self.full_name in _ongoing_installations:
287            return True, f"Already installing plugin '{self}'."
288        _ongoing_installations.add(self.full_name)
289        from meerschaum.utils.warnings import warn, error
290        if debug:
291            from meerschaum.utils.debug import dprint
292        import tarfile
293        import re
294        import ast
295        from meerschaum.plugins import sync_plugins_symlinks
296        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
297        from meerschaum.utils.venv import init_venv
298        from meerschaum.utils.misc import safely_extract_tar
299        old_cwd = os.getcwd()
300        old_version = ''
301        new_version = ''
302        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
303        temp_dir.mkdir(exist_ok=True)
304
305        if not self.archive_path.exists():
306            return False, f"Missing archive file for plugin '{self}'."
307        if self.version is not None:
308            old_version = self.version
309            if debug:
310                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
311
312        if debug:
313            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
314
315        try:
316            with tarfile.open(self.archive_path, 'r:gz') as tarf:
317                safely_extract_tar(tarf, temp_dir)
318        except Exception as e:
319            warn(e)
320            return False, f"Failed to extract plugin '{self.name}'."
321
322        ### search for version information
323        files = os.listdir(temp_dir)
324        
325        if str(files[0]) == self.name:
326            is_dir = True
327        elif str(files[0]) == self.name + '.py':
328            is_dir = False
329        else:
330            error(f"Unknown format encountered for plugin '{self}'.")
331
332        fpath = temp_dir / files[0]
333        if is_dir:
334            fpath = fpath / '__init__.py'
335
336        init_venv(self.name, debug=debug)
337        with open(fpath, 'r', encoding='utf-8') as f:
338            init_lines = f.readlines()
339        new_version = None
340        for line in init_lines:
341            if '__version__' not in line:
342                continue
343            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
344            if not version_match:
345                continue
346            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
347            break
348        if not new_version:
349            warn(
350                f"No `__version__` defined for plugin '{self}'. "
351                + "Assuming new version...",
352                stack = False,
353            )
354
355        packaging_version = attempt_import('packaging.version')
356        try:
357            is_new_version = (not new_version and not old_version) or (
358                packaging_version.parse(old_version) < packaging_version.parse(new_version)
359            )
360            is_same_version = new_version and old_version and (
361                packaging_version.parse(old_version) == packaging_version.parse(new_version)
362            )
363        except Exception:
364            is_new_version, is_same_version = True, False
365
366        ### Determine where to permanently store the new plugin.
367        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
368        for path in PLUGINS_DIR_PATHS:
369            files_in_plugins_dir = os.listdir(path)
370            if (
371                self.name in files_in_plugins_dir
372                or
373                (self.name + '.py') in files_in_plugins_dir
374            ):
375                plugin_installation_dir_path = path
376                break
377
378        success_msg = (
379            f"Successfully installed plugin '{self}'"
380            + ("\n    (skipped dependencies)" if skip_deps else "")
381            + "."
382        )
383        success, abort = None, None
384
385        if is_same_version and not force:
386            success, msg = True, (
387                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
388                "    Install again with `-f` or `--force` to reinstall."
389            )
390            abort = True
391        elif is_new_version or force:
392            for src_dir, dirs, files in os.walk(temp_dir):
393                if success is not None:
394                    break
395                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
396                if not os.path.exists(dst_dir):
397                    os.mkdir(dst_dir)
398                for f in files:
399                    src_file = os.path.join(src_dir, f)
400                    dst_file = os.path.join(dst_dir, f)
401                    if os.path.exists(dst_file):
402                        os.remove(dst_file)
403
404                    if debug:
405                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
406                    try:
407                        shutil.move(src_file, dst_dir)
408                    except Exception:
409                        success, msg = False, (
410                            f"Failed to install plugin '{self}': " +
411                            f"Could not move file '{src_file}' to '{dst_dir}'"
412                        )
413                        print(msg)
414                        break
415            if success is None:
416                success, msg = True, success_msg
417        else:
418            success, msg = False, (
419                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
420                + f"attempted version {new_version}."
421            )
422
423        shutil.rmtree(temp_dir)
424        os.chdir(old_cwd)
425
426        ### Reload the plugin's module.
427        sync_plugins_symlinks(debug=debug)
428        if '_module' in self.__dict__:
429            del self.__dict__['_module']
430        init_venv(venv=self.name, force=True, debug=debug)
431        reload_meerschaum(debug=debug)
432
433        ### if we've already failed, return here
434        if not success or abort:
435            _ongoing_installations.remove(self.full_name)
436            return success, msg
437
438        ### attempt to install dependencies
439        dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
440        if not dependencies_installed:
441            _ongoing_installations.remove(self.full_name)
442            return False, f"Failed to install dependencies for plugin '{self}'."
443
444        ### handling success tuple, bool, or other (typically None)
445        setup_tuple = self.setup(debug=debug)
446        if isinstance(setup_tuple, tuple):
447            if not setup_tuple[0]:
448                success, msg = setup_tuple
449        elif isinstance(setup_tuple, bool):
450            if not setup_tuple:
451                success, msg = False, (
452                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
453                    f"Check `setup()` in '{self.__file__}' for more information " +
454                    f"(no error message provided)."
455                )
456            else:
457                success, msg = True, success_msg
458        elif setup_tuple is None:
459            success = True
460            msg = (
461                f"Post-install for plugin '{self}' returned None. " +
462                f"Assuming plugin successfully installed."
463            )
464            warn(msg)
465        else:
466            success = False
467            msg = (
468                f"Post-install for plugin '{self}' returned unexpected value " +
469                f"of type '{type(setup_tuple)}': {setup_tuple}"
470            )
471
472        _ongoing_installations.remove(self.full_name)
473        module = self.module
474        return success, msg
475
476
477    def remove_archive(
478        self,        
479        debug: bool = False
480    ) -> SuccessTuple:
481        """Remove a plugin's archive file."""
482        if not self.archive_path.exists():
483            return True, f"Archive file for plugin '{self}' does not exist."
484        try:
485            self.archive_path.unlink()
486        except Exception as e:
487            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
488        return True, "Success"
489
490
491    def remove_venv(
492        self,        
493        debug: bool = False
494    ) -> SuccessTuple:
495        """Remove a plugin's virtual environment."""
496        if not self.venv_path.exists():
497            return True, f"Virtual environment for plugin '{self}' does not exist."
498        try:
499            shutil.rmtree(self.venv_path)
500        except Exception as e:
501            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
502        return True, "Success"
503
504
505    def uninstall(self, debug: bool = False) -> SuccessTuple:
506        """
507        Remove a plugin, its virtual environment, and archive file.
508        """
509        from meerschaum.utils.packages import reload_meerschaum
510        from meerschaum.plugins import sync_plugins_symlinks
511        from meerschaum.utils.warnings import warn, info
512        warnings_thrown_count: int = 0
513        max_warnings: int = 3
514
515        if not self.is_installed():
516            info(
517                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
518                + "Checking for artifacts...",
519                stack = False,
520            )
521        else:
522            real_path = pathlib.Path(os.path.realpath(self.__file__))
523            try:
524                if real_path.name == '__init__.py':
525                    shutil.rmtree(real_path.parent)
526                else:
527                    real_path.unlink()
528            except Exception as e:
529                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
530                warnings_thrown_count += 1
531            else:
532                info(f"Removed source files for plugin '{self.name}'.")
533
534        if self.venv_path.exists():
535            success, msg = self.remove_venv(debug=debug)
536            if not success:
537                warn(msg, stack=False)
538                warnings_thrown_count += 1
539            else:
540                info(f"Removed virtual environment from plugin '{self.name}'.")
541
542        success = warnings_thrown_count < max_warnings
543        sync_plugins_symlinks(debug=debug)
544        self.deactivate_venv(force=True, debug=debug)
545        reload_meerschaum(debug=debug)
546        return success, (
547            f"Successfully uninstalled plugin '{self}'." if success
548            else f"Failed to uninstall plugin '{self}'."
549        )
550
551
552    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
553        """
554        If exists, run the plugin's `setup()` function.
555
556        Parameters
557        ----------
558        *args: str
559            The positional arguments passed to the `setup()` function.
560            
561        debug: bool, default False
562            Verbosity toggle.
563
564        **kw: Any
565            The keyword arguments passed to the `setup()` function.
566
567        Returns
568        -------
569        A `SuccessTuple` or `bool` indicating success.
570
571        """
572        from meerschaum.utils.debug import dprint
573        import inspect
574        _setup = None
575        for name, fp in inspect.getmembers(self.module):
576            if name == 'setup' and inspect.isfunction(fp):
577                _setup = fp
578                break
579
580        ### assume success if no setup() is found (not necessary)
581        if _setup is None:
582            return True
583
584        sig = inspect.signature(_setup)
585        has_debug, has_kw = ('debug' in sig.parameters), False
586        for k, v in sig.parameters.items():
587            if '**' in str(v):
588                has_kw = True
589                break
590
591        _kw = {}
592        if has_kw:
593            _kw.update(kw)
594        if has_debug:
595            _kw['debug'] = debug
596
597        if debug:
598            dprint(f"Running setup for plugin '{self}'...")
599        try:
600            self.activate_venv(debug=debug)
601            return_tuple = _setup(*args, **_kw)
602            self.deactivate_venv(debug=debug)
603        except Exception as e:
604            return False, str(e)
605
606        if isinstance(return_tuple, tuple):
607            return return_tuple
608        if isinstance(return_tuple, bool):
609            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
610        if return_tuple is None:
611            return False, f"Setup for Plugin '{self.name}' returned None."
612        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"
613
614
615    def get_dependencies(
616        self,
617        debug: bool = False,
618    ) -> List[str]:
619        """
620        If the Plugin has specified dependencies in a list called `required`, return the list.
621        
622        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
623        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
624
625        Parameters
626        ----------
627        debug: bool, default False
628            Verbosity toggle.
629
630        Returns
631        -------
632        A list of required packages and plugins (str).
633
634        """
635        if '_required' in self.__dict__:
636            return self._required
637
638        ### If the plugin has not yet been imported,
639        ### infer the dependencies from the source text.
640        ### This is not super robust, and it doesn't feel right
641        ### having multiple versions of the logic.
642        ### This is necessary when determining the activation order
643        ### without having import the module.
644        ### For consistency's sake, the module-less method does not cache the requirements.
645        if self.__dict__.get('_module', None) is None:
646            file_path = self.__file__
647            if file_path is None:
648                return []
649            with open(file_path, 'r', encoding='utf-8') as f:
650                text = f.read()
651
652            if 'required' not in text:
653                return []
654
655            ### This has some limitations:
656            ### It relies on `required` being manually declared.
657            ### We lose the ability to dynamically alter the `required` list,
658            ### which is why we've kept the module-reliant method below.
659            import ast, re
660            ### NOTE: This technically would break 
661            ### if `required` was the very first line of the file.
662            req_start_match = re.search(r'\nrequired(:\s*)?.*=', text)
663            if not req_start_match:
664                return []
665            req_start = req_start_match.start()
666            equals_sign = req_start + text[req_start:].find('=')
667
668            ### Dependencies may have brackets within the strings, so push back the index.
669            first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[')
670            if first_opening_brace == -1:
671                return []
672
673            next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']')
674            if next_closing_brace == -1:
675                return []
676
677            start_ix = first_opening_brace + 1
678            end_ix = next_closing_brace
679
680            num_braces = 0
681            while True:
682                if '[' not in text[start_ix:end_ix]:
683                    break
684                num_braces += 1
685                start_ix = end_ix
686                end_ix += text[end_ix + 1:].find(']') + 1
687
688            req_end = end_ix + 1
689            req_text = (
690                text[(first_opening_brace-1):req_end]
691                .lstrip()
692                .replace('=', '', 1)
693                .lstrip()
694                .rstrip()
695            )
696            try:
697                required = ast.literal_eval(req_text)
698            except Exception as e:
699                warn(
700                    f"Unable to determine requirements for plugin '{self.name}' "
701                    + "without importing the module.\n"
702                    + "    This may be due to dynamically setting the global `required` list.\n"
703                    + f"    {e}"
704                )
705                return []
706            return required
707
708        import inspect
709        self.activate_venv(dependencies=False, debug=debug)
710        required = []
711        for name, val in inspect.getmembers(self.module):
712            if name == 'required':
713                required = val
714                break
715        self._required = required
716        self.deactivate_venv(dependencies=False, debug=debug)
717        return required
718
719
720    def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
721        """
722        Return a list of required Plugin objects.
723        """
724        from meerschaum.utils.warnings import warn
725        from meerschaum.config import get_config
726        from meerschaum.config.static import STATIC_CONFIG
727        plugins = []
728        _deps = self.get_dependencies(debug=debug)
729        sep = STATIC_CONFIG['plugins']['repo_separator']
730        plugin_names = [
731            _d[len('plugin:'):] for _d in _deps
732            if _d.startswith('plugin:') and len(_d) > len('plugin:')
733        ]
734        default_repo_keys = get_config('meerschaum', 'default_repository')
735        for _plugin_name in plugin_names:
736            if sep in _plugin_name:
737                try:
738                    _plugin_name, _repo_keys = _plugin_name.split(sep)
739                except Exception as e:
740                    _repo_keys = default_repo_keys
741                    warn(
742                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
743                        + f"Will try to use '{_repo_keys}' instead.",
744                        stack = False,
745                    )
746            else:
747                _repo_keys = default_repo_keys
748            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
749        return plugins
750
751
752    def get_required_packages(self, debug: bool=False) -> List[str]:
753        """
754        Return the required package names (excluding plugins).
755        """
756        _deps = self.get_dependencies(debug=debug)
757        return [_d for _d in _deps if not _d.startswith('plugin:')]
758
759
760    def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
761        """
762        Activate the virtual environments for the plugin and its dependencies.
763
764        Parameters
765        ----------
766        dependencies: bool, default True
767            If `True`, activate the virtual environments for required plugins.
768
769        Returns
770        -------
771        A bool indicating success.
772        """
773        from meerschaum.utils.venv import venv_target_path
774        from meerschaum.utils.packages import activate_venv
775        from meerschaum.utils.misc import make_symlink, is_symlink
776        from meerschaum.config._paths import PACKAGE_ROOT_PATH
777
778        if dependencies:
779            for plugin in self.get_required_plugins(debug=debug):
780                plugin.activate_venv(debug=debug, **kw)
781
782        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
783        venv_meerschaum_path = vtp / 'meerschaum'
784
785        try:
786            success, msg = True, "Success"
787            if is_symlink(venv_meerschaum_path):
788                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
789                    venv_meerschaum_path.unlink()
790                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
791        except Exception as e:
792            success, msg = False, str(e)
793        if not success:
794            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")
795
796        return activate_venv(self.name, debug=debug, **kw)
797
798
799    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
800        """
801        Deactivate the virtual environments for the plugin and its dependencies.
802
803        Parameters
804        ----------
805        dependencies: bool, default True
806            If `True`, deactivate the virtual environments for required plugins.
807
808        Returns
809        -------
810        A bool indicating success.
811        """
812        from meerschaum.utils.packages import deactivate_venv
813        success = deactivate_venv(self.name, debug=debug, **kw)
814        if dependencies:
815            for plugin in self.get_required_plugins(debug=debug):
816                plugin.deactivate_venv(debug=debug, **kw)
817        return success
818
819
820    def install_dependencies(
821        self,
822        force: bool = False,
823        debug: bool = False,
824    ) -> bool:
825        """
826        If specified, install dependencies.
827        
828        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
829        Meerschaum plugins from the same repository as this Plugin.
830        To install from a different repository, add the repo keys after `'@'`
831        (e.g. `'plugin:foo@api:bar'`).
832
833        Parameters
834        ----------
835        force: bool, default False
836            If `True`, continue with the installation, even if some
837            required packages fail to install.
838
839        debug: bool, default False
840            Verbosity toggle.
841
842        Returns
843        -------
844        A bool indicating success.
845        """
846        from meerschaum.utils.packages import pip_install, venv_contains_package
847        from meerschaum.utils.warnings import warn, info
848        _deps = self.get_dependencies(debug=debug)
849        if not _deps and self.requirements_file_path is None:
850            return True
851
852        plugins = self.get_required_plugins(debug=debug)
853        for _plugin in plugins:
854            if _plugin.name == self.name:
855                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
856                continue
857            _success, _msg = _plugin.repo_connector.install_plugin(
858                _plugin.name, debug=debug, force=force
859            )
860            if not _success:
861                warn(
862                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
863                    + f" for plugin '{self.name}':\n" + _msg,
864                    stack = False,
865                )
866                if not force:
867                    warn(
868                        "Try installing with the `--force` flag to continue anyway.",
869                        stack = False,
870                    )
871                    return False
872                info(
873                    "Continuing with installation despite the failure "
874                    + "(careful, things might be broken!)...",
875                    icon = False
876                )
877
878
879        ### First step: parse `requirements.txt` if it exists.
880        if self.requirements_file_path is not None:
881            if not pip_install(
882                requirements_file_path=self.requirements_file_path,
883                venv=self.name, debug=debug
884            ):
885                warn(
886                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
887                    stack = False,
888                )
889                if not force:
890                    warn(
891                        "Try installing with `--force` to continue anyway.",
892                        stack = False,
893                    )
894                    return False
895                info(
896                    "Continuing with installation despite the failure "
897                    + "(careful, things might be broken!)...",
898                    icon = False
899                )
900
901
902        ### Don't reinstall packages that are already included in required plugins.
903        packages = []
904        _packages = self.get_required_packages(debug=debug)
905        accounted_for_packages = set()
906        for package_name in _packages:
907            for plugin in plugins:
908                if venv_contains_package(package_name, plugin.name):
909                    accounted_for_packages.add(package_name)
910                    break
911        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
912
913        ### Attempt pip packages installation.
914        if packages:
915            for package in packages:
916                if not pip_install(package, venv=self.name, debug=debug):
917                    warn(
918                        f"Failed to install required package '{package}'"
919                        + f" for plugin '{self.name}'.",
920                        stack = False,
921                    )
922                    if not force:
923                        warn(
924                            "Try installing with `--force` to continue anyway.",
925                            stack = False,
926                        )
927                        return False
928                    info(
929                        "Continuing with installation despite the failure "
930                        + "(careful, things might be broken!)...",
931                        icon = False
932                    )
933        return True
934
935
936    @property
937    def full_name(self) -> str:
938        """
939        Include the repo keys with the plugin's name.
940        """
941        from meerschaum.config.static import STATIC_CONFIG
942        sep = STATIC_CONFIG['plugins']['repo_separator']
943        return self.name + sep + str(self.repo_connector)
944
945
946    def __str__(self):
947        return self.name
948
949
950    def __repr__(self):
951        return f"Plugin('{self.name}', repo='{self.repo_connector}')"
952
953
954    def __del__(self):
955        pass

Handle packaging of Meerschaum plugins.

Plugin( name: str, version: Optional[str] = None, user_id: Optional[int] = None, required: Optional[List[str]] = None, attributes: Optional[Dict[str, Any]] = None, archive_path: Optional[pathlib.Path] = None, venv_path: Optional[pathlib.Path] = None, repo_connector: Optional[meerschaum.connectors.APIConnector] = None, repo: Union[meerschaum.connectors.APIConnector, str, NoneType] = None)
39    def __init__(
40        self,
41        name: str,
42        version: Optional[str] = None,
43        user_id: Optional[int] = None,
44        required: Optional[List[str]] = None,
45        attributes: Optional[Dict[str, Any]] = None,
46        archive_path: Optional[pathlib.Path] = None,
47        venv_path: Optional[pathlib.Path] = None,
48        repo_connector: Optional['mrsm.connectors.api.APIConnector'] = None,
49        repo: Union['mrsm.connectors.api.APIConnector', str, None] = None,
50    ):
51        from meerschaum.config.static import STATIC_CONFIG
52        sep = STATIC_CONFIG['plugins']['repo_separator']
53        _repo = None
54        if sep in name:
55            try:
56                name, _repo = name.split(sep)
57            except Exception as e:
58                error(f"Invalid plugin name: '{name}'")
59        self._repo_in_name = _repo
60
61        if attributes is None:
62            attributes = {}
63        self.name = name
64        self.attributes = attributes
65        self.user_id = user_id
66        self._version = version
67        if required:
68            self._required = required
69        self.archive_path = (
70            archive_path if archive_path is not None
71            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
72        )
73        self.venv_path = (
74            venv_path if venv_path is not None
75            else VIRTENV_RESOURCES_PATH / self.name
76        )
77        self._repo_connector = repo_connector
78        self._repo_keys = repo
name
attributes
user_id
archive_path
venv_path
repo_connector
81    @property
82    def repo_connector(self):
83        """
84        Return the repository connector for this plugin.
85        NOTE: This imports the `connectors` module, which imports certain plugin modules.
86        """
87        if self._repo_connector is None:
88            from meerschaum.connectors.parse import parse_repo_keys
89
90            repo_keys = self._repo_keys or self._repo_in_name
91            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
92                error(
93                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
94                )
95            repo_connector = parse_repo_keys(repo_keys)
96            self._repo_connector = repo_connector
97        return self._repo_connector

Return the repository connector for this plugin. NOTE: This imports the connectors module, which imports certain plugin modules.

version
100    @property
101    def version(self):
102        """
103        Return the plugin's module version is defined (`__version__`) if it's defined.
104        """
105        if self._version is None:
106            try:
107                self._version = self.module.__version__
108            except Exception as e:
109                self._version = None
110        return self._version

Return the plugin's module version (__version__) if it's defined.

module
113    @property
114    def module(self):
115        """
116        Return the Python module of the underlying plugin.
117        """
118        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
119            if self.__file__ is None:
120                return None
121            from meerschaum.plugins import import_plugins
122            self._module = import_plugins(str(self), warn=False)
123        return self._module

Return the Python module of the underlying plugin.

requirements_file_path: Optional[pathlib.Path]
149    @property
150    def requirements_file_path(self) -> Union[pathlib.Path, None]:
151        """
152        If a file named `requirements.txt` exists, return its path.
153        """
154        if self.__file__ is None:
155            return None
156        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
157        if not path.exists():
158            return None
159        return path

If a file named requirements.txt exists, return its path.

def is_installed(self, **kw) -> bool:
162    def is_installed(self, **kw) -> bool:
163        """
164        Check whether a plugin is correctly installed.
165
166        Returns
167        -------
168        A `bool` indicating whether a plugin exists and is successfully imported.
169        """
170        return self.__file__ is not None

Check whether a plugin is correctly installed.

Returns
  • A bool indicating whether a plugin exists and is successfully imported.
def make_tar(self, debug: bool = False) -> pathlib.Path:
173    def make_tar(self, debug: bool = False) -> pathlib.Path:
174        """
175        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
176
177        Parameters
178        ----------
179        debug: bool, default False
180            Verbosity toggle.
181
182        Returns
183        -------
184        A `pathlib.Path` to the archive file's path.
185
186        """
187        import tarfile, pathlib, subprocess, fnmatch
188        from meerschaum.utils.debug import dprint
189        from meerschaum.utils.packages import attempt_import
190        pathspec = attempt_import('pathspec', debug=debug)
191
192        if not self.__file__:
193            from meerschaum.utils.warnings import error
194            error(f"Could not find file for plugin '{self}'.")
195        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
196            path = self.__file__.replace('__init__.py', '')
197            is_dir = True
198        else:
199            path = self.__file__
200            is_dir = False
201
202        old_cwd = os.getcwd()
203        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
204        os.chdir(real_parent_path)
205
206        default_patterns_to_ignore = [
207            '.pyc',
208            '__pycache__/',
209            'eggs/',
210            '__pypackages__/',
211            '.git',
212        ]
213
214        def parse_gitignore() -> 'Set[str]':
215            gitignore_path = pathlib.Path(path) / '.gitignore'
216            if not gitignore_path.exists():
217                return set(default_patterns_to_ignore)
218            with open(gitignore_path, 'r', encoding='utf-8') as f:
219                gitignore_text = f.read()
220            return set(pathspec.PathSpec.from_lines(
221                pathspec.patterns.GitWildMatchPattern,
222                default_patterns_to_ignore + gitignore_text.splitlines()
223            ).match_tree(path))
224
225        patterns_to_ignore = parse_gitignore() if is_dir else set()
226
227        if debug:
228            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
229
230        with tarfile.open(self.archive_path, 'w:gz') as tarf:
231            if not is_dir:
232                tarf.add(f"{self.name}.py")
233            else:
234                for root, dirs, files in os.walk(self.name):
235                    for f in files:
236                        good_file = True
237                        fp = os.path.join(root, f)
238                        for pattern in patterns_to_ignore:
239                            if pattern in str(fp) or f.startswith('.'):
240                                good_file = False
241                                break
242                        if good_file:
243                            if debug:
244                                dprint(f"Adding '{fp}'...")
245                            tarf.add(fp)
246
247        ### clean up and change back to old directory
248        os.chdir(old_cwd)
249
250        ### change to 775 to avoid permissions issues with the API in a Docker container
251        self.archive_path.chmod(0o775)
252
253        if debug:
254            dprint(f"Created archive '{self.archive_path}'.")
255        return self.archive_path

Compress the plugin's source files into a .tar.gz archive and return the archive's path.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pathlib.Path to the archive file's path.
def install( self, skip_deps: bool = False, force: bool = False, debug: bool = False) -> Tuple[bool, str]:
258    def install(
259        self,
260        skip_deps: bool = False,
261        force: bool = False,
262        debug: bool = False,
263    ) -> SuccessTuple:
264        """
265        Extract a plugin's tar archive to the plugins directory.
266        
267        This function checks if the plugin is already installed and if the version is equal or
268        greater than the existing installation.
269
270        Parameters
271        ----------
272        skip_deps: bool, default False
273            If `True`, do not install dependencies.
274
275        force: bool, default False
276            If `True`, continue with installation, even if required packages fail to install.
277
278        debug: bool, default False
279            Verbosity toggle.
280
281        Returns
282        -------
283        A `SuccessTuple` of success (bool) and a message (str).
284
285        """
286        if self.full_name in _ongoing_installations:
287            return True, f"Already installing plugin '{self}'."
288        _ongoing_installations.add(self.full_name)
289        from meerschaum.utils.warnings import warn, error
290        if debug:
291            from meerschaum.utils.debug import dprint
292        import tarfile
293        import re
294        import ast
295        from meerschaum.plugins import sync_plugins_symlinks
296        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
297        from meerschaum.utils.venv import init_venv
298        from meerschaum.utils.misc import safely_extract_tar
299        old_cwd = os.getcwd()
300        old_version = ''
301        new_version = ''
302        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
303        temp_dir.mkdir(exist_ok=True)
304
305        if not self.archive_path.exists():
306            return False, f"Missing archive file for plugin '{self}'."
307        if self.version is not None:
308            old_version = self.version
309            if debug:
310                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
311
312        if debug:
313            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
314
315        try:
316            with tarfile.open(self.archive_path, 'r:gz') as tarf:
317                safely_extract_tar(tarf, temp_dir)
318        except Exception as e:
319            warn(e)
320            return False, f"Failed to extract plugin '{self.name}'."
321
322        ### search for version information
323        files = os.listdir(temp_dir)
324        
325        if str(files[0]) == self.name:
326            is_dir = True
327        elif str(files[0]) == self.name + '.py':
328            is_dir = False
329        else:
330            error(f"Unknown format encountered for plugin '{self}'.")
331
332        fpath = temp_dir / files[0]
333        if is_dir:
334            fpath = fpath / '__init__.py'
335
336        init_venv(self.name, debug=debug)
337        with open(fpath, 'r', encoding='utf-8') as f:
338            init_lines = f.readlines()
339        new_version = None
340        for line in init_lines:
341            if '__version__' not in line:
342                continue
343            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
344            if not version_match:
345                continue
346            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
347            break
348        if not new_version:
349            warn(
350                f"No `__version__` defined for plugin '{self}'. "
351                + "Assuming new version...",
352                stack = False,
353            )
354
355        packaging_version = attempt_import('packaging.version')
356        try:
357            is_new_version = (not new_version and not old_version) or (
358                packaging_version.parse(old_version) < packaging_version.parse(new_version)
359            )
360            is_same_version = new_version and old_version and (
361                packaging_version.parse(old_version) == packaging_version.parse(new_version)
362            )
363        except Exception:
364            is_new_version, is_same_version = True, False
365
366        ### Determine where to permanently store the new plugin.
367        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
368        for path in PLUGINS_DIR_PATHS:
369            files_in_plugins_dir = os.listdir(path)
370            if (
371                self.name in files_in_plugins_dir
372                or
373                (self.name + '.py') in files_in_plugins_dir
374            ):
375                plugin_installation_dir_path = path
376                break
377
378        success_msg = (
379            f"Successfully installed plugin '{self}'"
380            + ("\n    (skipped dependencies)" if skip_deps else "")
381            + "."
382        )
383        success, abort = None, None
384
385        if is_same_version and not force:
386            success, msg = True, (
387                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
388                "    Install again with `-f` or `--force` to reinstall."
389            )
390            abort = True
391        elif is_new_version or force:
392            for src_dir, dirs, files in os.walk(temp_dir):
393                if success is not None:
394                    break
395                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
396                if not os.path.exists(dst_dir):
397                    os.mkdir(dst_dir)
398                for f in files:
399                    src_file = os.path.join(src_dir, f)
400                    dst_file = os.path.join(dst_dir, f)
401                    if os.path.exists(dst_file):
402                        os.remove(dst_file)
403
404                    if debug:
405                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
406                    try:
407                        shutil.move(src_file, dst_dir)
408                    except Exception:
409                        success, msg = False, (
410                            f"Failed to install plugin '{self}': " +
411                            f"Could not move file '{src_file}' to '{dst_dir}'"
412                        )
413                        print(msg)
414                        break
415            if success is None:
416                success, msg = True, success_msg
417        else:
418            success, msg = False, (
419                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
420                + f"attempted version {new_version}."
421            )
422
423        shutil.rmtree(temp_dir)
424        os.chdir(old_cwd)
425
426        ### Reload the plugin's module.
427        sync_plugins_symlinks(debug=debug)
428        if '_module' in self.__dict__:
429            del self.__dict__['_module']
430        init_venv(venv=self.name, force=True, debug=debug)
431        reload_meerschaum(debug=debug)
432
433        ### if we've already failed, return here
434        if not success or abort:
435            _ongoing_installations.remove(self.full_name)
436            return success, msg
437
438        ### attempt to install dependencies
439        dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
440        if not dependencies_installed:
441            _ongoing_installations.remove(self.full_name)
442            return False, f"Failed to install dependencies for plugin '{self}'."
443
444        ### handling success tuple, bool, or other (typically None)
445        setup_tuple = self.setup(debug=debug)
446        if isinstance(setup_tuple, tuple):
447            if not setup_tuple[0]:
448                success, msg = setup_tuple
449        elif isinstance(setup_tuple, bool):
450            if not setup_tuple:
451                success, msg = False, (
452                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
453                    f"Check `setup()` in '{self.__file__}' for more information " +
454                    f"(no error message provided)."
455                )
456            else:
457                success, msg = True, success_msg
458        elif setup_tuple is None:
459            success = True
460            msg = (
461                f"Post-install for plugin '{self}' returned None. " +
462                f"Assuming plugin successfully installed."
463            )
464            warn(msg)
465        else:
466            success = False
467            msg = (
468                f"Post-install for plugin '{self}' returned unexpected value " +
469                f"of type '{type(setup_tuple)}': {setup_tuple}"
470            )
471
472        _ongoing_installations.remove(self.full_name)
473        module = self.module
474        return success, msg

Extract a plugin's tar archive to the plugins directory.

This function checks if the plugin is already installed and if the version is equal or greater than the existing installation.

Parameters
  • skip_deps (bool, default False): If True, do not install dependencies.
  • force (bool, default False): If True, continue with installation, even if required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success (bool) and a message (str).
def remove_archive(self, debug: bool = False) -> Tuple[bool, str]:
477    def remove_archive(
478        self,        
479        debug: bool = False
480    ) -> SuccessTuple:
481        """Remove a plugin's archive file."""
482        if not self.archive_path.exists():
483            return True, f"Archive file for plugin '{self}' does not exist."
484        try:
485            self.archive_path.unlink()
486        except Exception as e:
487            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
488        return True, "Success"

Remove a plugin's archive file.

def remove_venv(self, debug: bool = False) -> Tuple[bool, str]:
491    def remove_venv(
492        self,        
493        debug: bool = False
494    ) -> SuccessTuple:
495        """Remove a plugin's virtual environment."""
496        if not self.venv_path.exists():
497            return True, f"Virtual environment for plugin '{self}' does not exist."
498        try:
499            shutil.rmtree(self.venv_path)
500        except Exception as e:
501            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
502        return True, "Success"

Remove a plugin's virtual environment.

def uninstall(self, debug: bool = False) -> Tuple[bool, str]:
505    def uninstall(self, debug: bool = False) -> SuccessTuple:
506        """
507        Remove a plugin, its virtual environment, and archive file.
508        """
509        from meerschaum.utils.packages import reload_meerschaum
510        from meerschaum.plugins import sync_plugins_symlinks
511        from meerschaum.utils.warnings import warn, info
512        warnings_thrown_count: int = 0
513        max_warnings: int = 3
514
515        if not self.is_installed():
516            info(
517                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
518                + "Checking for artifacts...",
519                stack = False,
520            )
521        else:
522            real_path = pathlib.Path(os.path.realpath(self.__file__))
523            try:
524                if real_path.name == '__init__.py':
525                    shutil.rmtree(real_path.parent)
526                else:
527                    real_path.unlink()
528            except Exception as e:
529                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
530                warnings_thrown_count += 1
531            else:
532                info(f"Removed source files for plugin '{self.name}'.")
533
534        if self.venv_path.exists():
535            success, msg = self.remove_venv(debug=debug)
536            if not success:
537                warn(msg, stack=False)
538                warnings_thrown_count += 1
539            else:
540                info(f"Removed virtual environment from plugin '{self.name}'.")
541
542        success = warnings_thrown_count < max_warnings
543        sync_plugins_symlinks(debug=debug)
544        self.deactivate_venv(force=True, debug=debug)
545        reload_meerschaum(debug=debug)
546        return success, (
547            f"Successfully uninstalled plugin '{self}'." if success
548            else f"Failed to uninstall plugin '{self}'."
549        )

Remove a plugin, its virtual environment, and archive file.

def setup( self, *args: str, debug: bool = False, **kw: Any) -> Union[Tuple[bool, str], bool]:
552    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
553        """
554        If exists, run the plugin's `setup()` function.
555
556        Parameters
557        ----------
558        *args: str
559            The positional arguments passed to the `setup()` function.
560            
561        debug: bool, default False
562            Verbosity toggle.
563
564        **kw: Any
565            The keyword arguments passed to the `setup()` function.
566
567        Returns
568        -------
569        A `SuccessTuple` or `bool` indicating success.
570
571        """
572        from meerschaum.utils.debug import dprint
573        import inspect
574        _setup = None
575        for name, fp in inspect.getmembers(self.module):
576            if name == 'setup' and inspect.isfunction(fp):
577                _setup = fp
578                break
579
580        ### assume success if no setup() is found (not necessary)
581        if _setup is None:
582            return True
583
584        sig = inspect.signature(_setup)
585        has_debug, has_kw = ('debug' in sig.parameters), False
586        for k, v in sig.parameters.items():
587            if '**' in str(v):
588                has_kw = True
589                break
590
591        _kw = {}
592        if has_kw:
593            _kw.update(kw)
594        if has_debug:
595            _kw['debug'] = debug
596
597        if debug:
598            dprint(f"Running setup for plugin '{self}'...")
599        try:
600            self.activate_venv(debug=debug)
601            return_tuple = _setup(*args, **_kw)
602            self.deactivate_venv(debug=debug)
603        except Exception as e:
604            return False, str(e)
605
606        if isinstance(return_tuple, tuple):
607            return return_tuple
608        if isinstance(return_tuple, bool):
609            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
610        if return_tuple is None:
611            return False, f"Setup for Plugin '{self.name}' returned None."
612        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"

If exists, run the plugin's setup() function.

Parameters
  • *args (str): The positional arguments passed to the setup() function.
  • debug (bool, default False): Verbosity toggle.
  • **kw (Any): The keyword arguments passed to the setup() function.
Returns
  • A SuccessTuple or bool indicating success.
def get_dependencies(self, debug: bool = False) -> List[str]:
615    def get_dependencies(
616        self,
617        debug: bool = False,
618    ) -> List[str]:
619        """
620        If the Plugin has specified dependencies in a list called `required`, return the list.
621        
622        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
623        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
624
625        Parameters
626        ----------
627        debug: bool, default False
628            Verbosity toggle.
629
630        Returns
631        -------
632        A list of required packages and plugins (str).
633
634        """
635        if '_required' in self.__dict__:
636            return self._required
637
638        ### If the plugin has not yet been imported,
639        ### infer the dependencies from the source text.
640        ### This is not super robust, and it doesn't feel right
641        ### having multiple versions of the logic.
642        ### This is necessary when determining the activation order
643        ### without having import the module.
644        ### For consistency's sake, the module-less method does not cache the requirements.
645        if self.__dict__.get('_module', None) is None:
646            file_path = self.__file__
647            if file_path is None:
648                return []
649            with open(file_path, 'r', encoding='utf-8') as f:
650                text = f.read()
651
652            if 'required' not in text:
653                return []
654
655            ### This has some limitations:
656            ### It relies on `required` being manually declared.
657            ### We lose the ability to dynamically alter the `required` list,
658            ### which is why we've kept the module-reliant method below.
659            import ast, re
660            ### NOTE: This technically would break 
661            ### if `required` was the very first line of the file.
662            req_start_match = re.search(r'\nrequired(:\s*)?.*=', text)
663            if not req_start_match:
664                return []
665            req_start = req_start_match.start()
666            equals_sign = req_start + text[req_start:].find('=')
667
668            ### Dependencies may have brackets within the strings, so push back the index.
669            first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[')
670            if first_opening_brace == -1:
671                return []
672
673            next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']')
674            if next_closing_brace == -1:
675                return []
676
677            start_ix = first_opening_brace + 1
678            end_ix = next_closing_brace
679
680            num_braces = 0
681            while True:
682                if '[' not in text[start_ix:end_ix]:
683                    break
684                num_braces += 1
685                start_ix = end_ix
686                end_ix += text[end_ix + 1:].find(']') + 1
687
688            req_end = end_ix + 1
689            req_text = (
690                text[(first_opening_brace-1):req_end]
691                .lstrip()
692                .replace('=', '', 1)
693                .lstrip()
694                .rstrip()
695            )
696            try:
697                required = ast.literal_eval(req_text)
698            except Exception as e:
699                warn(
700                    f"Unable to determine requirements for plugin '{self.name}' "
701                    + "without importing the module.\n"
702                    + "    This may be due to dynamically setting the global `required` list.\n"
703                    + f"    {e}"
704                )
705                return []
706            return required
707
708        import inspect
709        self.activate_venv(dependencies=False, debug=debug)
710        required = []
711        for name, val in inspect.getmembers(self.module):
712            if name == 'required':
713                required = val
714                break
715        self._required = required
716        self.deactivate_venv(dependencies=False, debug=debug)
717        return required

If the Plugin has specified dependencies in a list called required, return the list.

NOTE: Dependencies which start with 'plugin:' are Meerschaum plugins, not pip packages. Meerschaum plugins may also specify connector keys for a repo after '@'.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of required packages and plugins (str).
def get_required_plugins(self, debug: bool = False) -> List[Plugin]:
720    def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
721        """
722        Return a list of required Plugin objects.
723        """
724        from meerschaum.utils.warnings import warn
725        from meerschaum.config import get_config
726        from meerschaum.config.static import STATIC_CONFIG
727        plugins = []
728        _deps = self.get_dependencies(debug=debug)
729        sep = STATIC_CONFIG['plugins']['repo_separator']
730        plugin_names = [
731            _d[len('plugin:'):] for _d in _deps
732            if _d.startswith('plugin:') and len(_d) > len('plugin:')
733        ]
734        default_repo_keys = get_config('meerschaum', 'default_repository')
735        for _plugin_name in plugin_names:
736            if sep in _plugin_name:
737                try:
738                    _plugin_name, _repo_keys = _plugin_name.split(sep)
739                except Exception as e:
740                    _repo_keys = default_repo_keys
741                    warn(
742                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
743                        + f"Will try to use '{_repo_keys}' instead.",
744                        stack = False,
745                    )
746            else:
747                _repo_keys = default_repo_keys
748            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
749        return plugins

Return a list of required Plugin objects.

def get_required_packages(self, debug: bool = False) -> List[str]:
752    def get_required_packages(self, debug: bool=False) -> List[str]:
753        """
754        Return the required package names (excluding plugins).
755        """
756        _deps = self.get_dependencies(debug=debug)
757        return [_d for _d in _deps if not _d.startswith('plugin:')]

Return the required package names (excluding plugins).

def activate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
760    def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
761        """
762        Activate the virtual environments for the plugin and its dependencies.
763
764        Parameters
765        ----------
766        dependencies: bool, default True
767            If `True`, activate the virtual environments for required plugins.
768
769        Returns
770        -------
771        A bool indicating success.
772        """
773        from meerschaum.utils.venv import venv_target_path
774        from meerschaum.utils.packages import activate_venv
775        from meerschaum.utils.misc import make_symlink, is_symlink
776        from meerschaum.config._paths import PACKAGE_ROOT_PATH
777
778        if dependencies:
779            for plugin in self.get_required_plugins(debug=debug):
780                plugin.activate_venv(debug=debug, **kw)
781
782        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
783        venv_meerschaum_path = vtp / 'meerschaum'
784
785        try:
786            success, msg = True, "Success"
787            if is_symlink(venv_meerschaum_path):
788                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
789                    venv_meerschaum_path.unlink()
790                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
791        except Exception as e:
792            success, msg = False, str(e)
793        if not success:
794            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")
795
796        return activate_venv(self.name, debug=debug, **kw)

Activate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, activate the virtual environments for required plugins.
Returns
  • A bool indicating success.
def deactivate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
799    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
800        """
801        Deactivate the virtual environments for the plugin and its dependencies.
802
803        Parameters
804        ----------
805        dependencies: bool, default True
806            If `True`, deactivate the virtual environments for required plugins.
807
808        Returns
809        -------
810        A bool indicating success.
811        """
812        from meerschaum.utils.packages import deactivate_venv
813        success = deactivate_venv(self.name, debug=debug, **kw)
814        if dependencies:
815            for plugin in self.get_required_plugins(debug=debug):
816                plugin.deactivate_venv(debug=debug, **kw)
817        return success

Deactivate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, deactivate the virtual environments for required plugins.
Returns
  • A bool indicating success.
def install_dependencies(self, force: bool = False, debug: bool = False) -> bool:
820    def install_dependencies(
821        self,
822        force: bool = False,
823        debug: bool = False,
824    ) -> bool:
825        """
826        If specified, install dependencies.
827        
828        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
829        Meerschaum plugins from the same repository as this Plugin.
830        To install from a different repository, add the repo keys after `'@'`
831        (e.g. `'plugin:foo@api:bar'`).
832
833        Parameters
834        ----------
835        force: bool, default False
836            If `True`, continue with the installation, even if some
837            required packages fail to install.
838
839        debug: bool, default False
840            Verbosity toggle.
841
842        Returns
843        -------
844        A bool indicating success.
845        """
846        from meerschaum.utils.packages import pip_install, venv_contains_package
847        from meerschaum.utils.warnings import warn, info
848        _deps = self.get_dependencies(debug=debug)
849        if not _deps and self.requirements_file_path is None:
850            return True
851
852        plugins = self.get_required_plugins(debug=debug)
853        for _plugin in plugins:
854            if _plugin.name == self.name:
855                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
856                continue
857            _success, _msg = _plugin.repo_connector.install_plugin(
858                _plugin.name, debug=debug, force=force
859            )
860            if not _success:
861                warn(
862                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
863                    + f" for plugin '{self.name}':\n" + _msg,
864                    stack = False,
865                )
866                if not force:
867                    warn(
868                        "Try installing with the `--force` flag to continue anyway.",
869                        stack = False,
870                    )
871                    return False
872                info(
873                    "Continuing with installation despite the failure "
874                    + "(careful, things might be broken!)...",
875                    icon = False
876                )
877
878
879        ### First step: parse `requirements.txt` if it exists.
880        if self.requirements_file_path is not None:
881            if not pip_install(
882                requirements_file_path=self.requirements_file_path,
883                venv=self.name, debug=debug
884            ):
885                warn(
886                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
887                    stack = False,
888                )
889                if not force:
890                    warn(
891                        "Try installing with `--force` to continue anyway.",
892                        stack = False,
893                    )
894                    return False
895                info(
896                    "Continuing with installation despite the failure "
897                    + "(careful, things might be broken!)...",
898                    icon = False
899                )
900
901
902        ### Don't reinstall packages that are already included in required plugins.
903        packages = []
904        _packages = self.get_required_packages(debug=debug)
905        accounted_for_packages = set()
906        for package_name in _packages:
907            for plugin in plugins:
908                if venv_contains_package(package_name, plugin.name):
909                    accounted_for_packages.add(package_name)
910                    break
911        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
912
913        ### Attempt pip packages installation.
914        if packages:
915            for package in packages:
916                if not pip_install(package, venv=self.name, debug=debug):
917                    warn(
918                        f"Failed to install required package '{package}'"
919                        + f" for plugin '{self.name}'.",
920                        stack = False,
921                    )
922                    if not force:
923                        warn(
924                            "Try installing with `--force` to continue anyway.",
925                            stack = False,
926                        )
927                        return False
928                    info(
929                        "Continuing with installation despite the failure "
930                        + "(careful, things might be broken!)...",
931                        icon = False
932                    )
933        return True

If specified, install dependencies.

NOTE: Dependencies that start with 'plugin:' will be installed as Meerschaum plugins from the same repository as this Plugin. To install from a different repository, add the repo keys after '@' (e.g. 'plugin:foo@api:bar').

Parameters
  • force (bool, default False): If True, continue with the installation, even if some required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating success.
full_name: str
936    @property
937    def full_name(self) -> str:
938        """
939        Include the repo keys with the plugin's name.
940        """
941        from meerschaum.config.static import STATIC_CONFIG
942        sep = STATIC_CONFIG['plugins']['repo_separator']
943        return self.name + sep + str(self.repo_connector)

Include the repo keys with the plugin's name.

def make_action( function: Callable[[Any], Any], shell: bool = False, activate: bool = True, deactivate: bool = True, debug: bool = False) -> Callable[[Any], Any]:
def make_action(
    function: Callable[[Any], Any],
    shell: bool = False,
    activate: bool = True,
    deactivate: bool = True,
    debug: bool = False
) -> Callable[[Any], Any]:
    """
    Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.

    The function is registered in `meerschaum.actions.actions` under its `__name__`
    (replacing any action previously stored under that name) and returned unchanged.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to become a Meerschaum action. Must accept all keyword arguments.

    shell: bool, default False
        Not used.

    activate: bool, default True
        Not used.

    deactivate: bool, default True
        Not used.

    debug: bool, default False
        If `True`, print a message naming the action and the plugin it came from.

    Returns
    -------
    The same function (this is a decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import make_action
    >>>
    >>> @make_action
    ... def my_action(**kw):
    ...     print('foo')
    ...     return True, "Success"
    >>>
    """

    from meerschaum.actions import actions

    ### Functions defined within a plugin live in a module named `plugins.<name>[...]`.
    package_name = function.__globals__['__name__']
    plugin_name = (
        package_name.split('.')[1]
        if package_name.startswith('plugins.') else None
    )
    plugin = Plugin(plugin_name) if plugin_name else None

    if debug:
        from meerschaum.utils.debug import dprint
        dprint(
            f"Adding action '{function.__name__}' from plugin " +
            f"'{plugin}'..."
        )

    actions[function.__name__] = function
    return function

Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.

Parameters
  • function (Callable[[Any], Any]): The function to become a Meerschaum action. Must accept all keyword arguments.
  • shell (bool, default False): Not used.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import make_action
>>>
>>> @make_action
... def my_action(**kw):
...     print('foo')
...     return True, "Success"
>>>
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Register a function to be executed when the Meerschaum API module is initialized.
    Useful for lazy-loading heavy plugins only when the API is started,
    such as when editing the `meerschaum.api.app` FastAPI app.

    The FastAPI app will be passed as the only parameter.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to register. It is stored under its `__module__`.

    Returns
    -------
    The same function (this is a decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import api_plugin
    >>>
    >>> @api_plugin
    ... def initialize_plugin(app):
    ...     @app.get('/my/new/path')
    ...     def new_path():
    ...         return {'message': 'It works!'}
    >>>
    """
    with _locks['_api_plugins']:
        try:
            ### Functions are grouped by the module which defines them.
            _api_plugins.setdefault(function.__module__, []).append(function)
        except Exception as e:
            ### Registration problems are reported as warnings rather than raised.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function

Execute the function when initializing the Meerschaum API module. Useful for lazy-loading heavy plugins only when the API is started, such as when editing the meerschaum.api.app FastAPI app.

The FastAPI app will be passed as the only parameter.

Examples
>>> from meerschaum.plugins import api_plugin
>>>
>>> @api_plugin
... def initialize_plugin(app):
...     @app.get('/my/new/path')
...     def new_path():
...         return {'message': 'It works!'}
>>>
def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Register a function to be executed when starting the Dash application.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to register. It is stored under its `__module__`.

    Returns
    -------
    The same function (this is a decorator function).
    """
    with _locks['_dash_plugins']:
        try:
            ### Functions are grouped by the module which defines them.
            _dash_plugins.setdefault(function.__module__, []).append(function)
        except Exception as e:
            ### Registration problems are reported as warnings rather than raised.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function

Execute the function when starting the Dash application.

def web_page( page: Union[str, NoneType, Callable[[Any], Any]] = None, login_required: bool = True, **kwargs) -> Any:
def web_page(
    page: Union[str, None, Callable[[Any], Any]] = None,
    login_required: bool = True,
    **kwargs
) -> Any:
    """
    Quickly add pages to the dash application.

    May be used bare (`@web_page`) or with arguments (`@web_page('foo/bar')`).

    Parameters
    ----------
    page: Union[str, None, Callable[[Any], Any]], default None
        The endpoint at which to register the page.
        If `None` (or if the decorator is used bare),
        the decorated function's name is used.
        Leading and trailing slashes are removed.

    login_required: bool, default True
        Stored alongside the function in the page's registry entry.

    **kwargs:
        Not used.

    Returns
    -------
    The wrapped function when used bare, otherwise a decorator.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.plugins import web_page
    >>> html = mrsm.attempt_import('dash.html')
    >>> 
    >>> @web_page('foo/bar', login_required=False)
    ... def foo_bar():
    ...     return html.Div([html.H1("Hello, World!")])
    >>> 
    """
    ### When used bare, `page` is the decorated function itself, not an endpoint.
    explicit_page = None if callable(page) else page

    def _decorator(_func: Callable[[Any], Any]) -> Callable[[Any], Any]:

        @functools.wraps(_func)
        def wrapper(*_args, **_kwargs):
            return _func(*_args, **_kwargs)

        ### Resolve the endpoint for each decorated function (instead of through shared
        ### state) so that one decorator may safely be applied to several functions.
        page_str = _func.__name__ if explicit_page is None else explicit_page
        page_str = page_str.lstrip('/').rstrip('/').strip()
        _plugin_endpoints_to_pages[page_str] = {
            'function': _func,
            'login_required': login_required,
        }
        return wrapper

    if callable(page):
        return _decorator(page)
    return _decorator

Quickly add pages to the dash application.

Examples
>>> import meerschaum as mrsm
>>> from meerschaum.plugins import web_page
>>> html = mrsm.attempt_import('dash.html')
>>> 
>>> @web_page('foo/bar', login_required=False)
... def foo_bar():
...     return html.Div([html.H1("Hello, World!")])
>>>
def import_plugins( *plugins_to_import: Union[str, List[str], NoneType], warn: bool = True) -> "Union['ModuleType', Tuple['ModuleType', None]]":
def import_plugins(
    *plugins_to_import: Union[str, List[str], None],
    warn: bool = True,
) -> Union[
    'ModuleType', Tuple['ModuleType', None]
]:
    """
    Import the Meerschaum plugins directory.

    Parameters
    ----------
    plugins_to_import: Union[str, List[str], None]
        If provided, only import the specified plugins.
        Otherwise import the entire plugins module. May be a string, list, or `None`.
        Defaults to `None`.

    warn: bool, default True
        If `True`, emit a warning when the plugins module could not be imported.

    Returns
    -------
    A module or a tuple of modules, depending on the number of plugins provided.
    Plugins which fail to import are returned as `None`.

    """
    import sys
    import importlib
    from meerschaum.utils.misc import flatten_list
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
    from meerschaum.utils.warnings import warn as _warn
    plugins_to_import = list(plugins_to_import)
    plugins_parent_path = str(PLUGINS_RESOURCES_PATH.parent)
    with _locks['sys.path']:

        ### Since plugins may depend on other plugins,
        ### we need to activate the virtual environments for library plugins.
        ### This logic exists in `Plugin.activate_venv()`,
        ### but that code requires the plugin's module to already be imported.
        ### It's not a guarantee of correct activation order.
        plugins_names = get_plugins_names()

        ### Remember the names (not booleans) of the venvs which were active beforehand
        ### so that they are left active afterwards.
        already_active_venvs = {
            plugin_name for plugin_name in plugins_names
            if is_venv_active(plugin_name)
        }

        if not sys.path or sys.path[0] != plugins_parent_path:
            sys.path.insert(0, plugins_parent_path)

        try:
            if not plugins_to_import:
                for plugin_name in plugins_names:
                    activate_venv(plugin_name)
                try:
                    imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem)
                except ImportError as e:
                    _warn(f"Failed to import the plugins module:\n    {e}")
                    import traceback
                    traceback.print_exc()
                    imported_plugins = None
                finally:
                    ### Only deactivate the venvs which were not already active.
                    for plugin_name in plugins_names:
                        if plugin_name in already_active_venvs:
                            continue
                        deactivate_venv(plugin_name)

            else:
                imported_plugins = []
                for plugin_name in flatten_list(plugins_to_import):
                    plugin = Plugin(plugin_name)
                    try:
                        with Venv(plugin):
                            imported_plugins.append(
                                importlib.import_module(
                                    f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
                                )
                            )
                    except Exception as e:
                        _warn(
                            f"Failed to import plugin '{plugin_name}':\n    "
                            + f"{e}\n\nHere's a stacktrace:",
                            stack = False,
                        )
                        from meerschaum.utils.formatting import get_console
                        get_console().print_exception(
                            suppress = [
                                'meerschaum/plugins/__init__.py',
                                importlib,
                                importlib._bootstrap,
                            ]
                        )
                        ### Keep the positions aligned with the requested plugins.
                        imported_plugins.append(None)

            if imported_plugins is None and warn:
                _warn("Failed to import plugins.", stacklevel=3)

        finally:
            ### Always restore `sys.path`, even if an import raised unexpectedly.
            if plugins_parent_path in sys.path:
                sys.path.remove(plugins_parent_path)

    if isinstance(imported_plugins, list):
        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
    return imported_plugins

Import the Meerschaum plugins directory.

Parameters
  • plugins_to_import (Union[str, List[str], None]): If provided, only import the specified plugins. Otherwise import the entire plugins module. May be a string, list, or None. Defaults to None.
Returns
  • A module or a tuple of modules, depending on the number of plugins provided.
def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
    """
    Emulate the `from module import x` behavior for a plugin's (sub)module.

    Parameters
    ----------
    plugin_import_name: str
        The import name of the plugin's module.
        Separate submodules with '.' (e.g. 'compose.utils.pipes').
        A leading 'plugins.' is ignored.

    attrs: str
        Names of the attributes to return. At least one is required.

    Returns
    -------
    Objects from a plugin's submodule.
    If multiple objects are provided, return a tuple.
    Attributes which cannot be accessed are returned as `None`,
    and `None` is returned if the plugin's module or the submodule cannot be imported.

    Raises
    ------
    ValueError
        If no attribute names are provided.

    Examples
    --------
    >>> init = from_plugin_import('compose.utils', 'init')
    >>> with mrsm.Venv('compose'):
    ...     cf = init()
    >>> build_parent_pipe, get_defined_pipes = from_plugin_import(
    ...     'compose.utils.pipes',
    ...     'build_parent_pipe',
    ...     'get_defined_pipes',
    ... )
    >>> parent_pipe = build_parent_pipe(cf)
    >>> defined_pipes = get_defined_pipes(cf)
    """
    import importlib
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.warnings import warn as _warn

    plugins_prefix = 'plugins.'
    if plugin_import_name.startswith(plugins_prefix):
        plugin_import_name = plugin_import_name[len(plugins_prefix):]

    ### The first part of the import name is the plugin itself.
    import_parts = plugin_import_name.split('.')
    plugin = mrsm.Plugin(import_parts[0])

    submodule_import_name = '.'.join([PLUGINS_RESOURCES_PATH.stem, *import_parts])
    if not attrs:
        raise ValueError(f"Provide which attributes to return from '{submodule_import_name}'.")

    with mrsm.Venv(plugin):
        if plugin.module is None:
            return None

        try:
            submodule = importlib.import_module(submodule_import_name)
        except ImportError as e:
            _warn(
                f"Failed to import plugin '{submodule_import_name}':\n    "
                + f"{e}\n\nHere's a stacktrace:",
                stack=False,
            )
            from meerschaum.utils.formatting import get_console
            get_console().print_exception(
                suppress=[
                    'meerschaum/plugins/__init__.py',
                    importlib,
                    importlib._bootstrap,
                ]
            )
            return None

        found_attrs = []
        for attr in attrs:
            try:
                value = getattr(submodule, attr)
            except Exception:
                _warn(f"Failed to access '{attr}' from '{submodule_import_name}'.")
                value = None
            found_attrs.append(value)

        ### A single requested attribute is returned bare instead of as a 1-tuple.
        return found_attrs[0] if len(attrs) == 1 else tuple(found_attrs)

Emulate the from module import x behavior.

Parameters
  • plugin_import_name (str): The import name of the plugin's module. Separate submodules with '.' (e.g. 'compose.utils.pipes')
  • attrs (str): Names of the attributes to return.
Returns
  • Objects from a plugin's submodule.
  • If multiple objects are provided, return a tuple.
Examples
>>> init = from_plugin_import('compose.utils', 'init')
>>> with mrsm.Venv('compose'):
...     cf = init()
>>> build_parent_pipe, get_defined_pipes = from_plugin_import(
...     'compose.utils.pipes',
...     'build_parent_pipe',
...     'get_defined_pipes',
... )
>>> parent_pipe = build_parent_pipe(cf)
>>> defined_pipes = get_defined_pipes(cf)
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
    """
    Reload plugins back into memory.

    The `plugins.<name>` entries are removed from `sys.modules`,
    then `load_plugins()` is called.

    Parameters
    ----------
    plugins: Optional[List[str]], default None
        The plugins to reload. `None` (or an empty list) will reload all plugins.

    debug: bool, default False
        Verbosity toggle.

    """
    import sys
    if debug:
        from meerschaum.utils.debug import dprint

    names_to_reload = plugins if plugins else get_plugins_names()
    for plugin_name in names_to_reload:
        if debug:
            dprint(f"Reloading plugin '{plugin_name}'...")
        ### Forget the cached module (if any) so that it may be imported afresh.
        sys.modules.pop('plugins.' + str(plugin_name), None)
    load_plugins(debug=debug)

Reload plugins back into memory.

Parameters
  • plugins (Optional[List[str]], default None): The plugins to reload. None will reload all plugins.
def get_plugins( *to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
    """
    Return the installed `Plugin` objects.

    Parameters
    ----------
    to_load:
        If specified, only load specific plugins.
        Otherwise return all plugins found in the plugins resources directory.

    try_import: bool, default True
        If `True`, allow for plugins to be imported.

    Returns
    -------
    A tuple of installed `Plugin` objects,
    or a single `Plugin` if exactly one name was provided.

    Raises
    ------
    ValueError
        If exactly one name was provided and that plugin is not installed.
    """
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    import os
    sync_plugins_symlinks()

    if to_load:
        names = to_load
    else:
        names = []
        for entry in os.listdir(PLUGINS_RESOURCES_PATH):
            if entry == '__init__.py':
                continue
            ### Directories are named after the plugin;
            ### other entries have their last three characters removed.
            is_directory = (PLUGINS_RESOURCES_PATH / entry).is_dir()
            names.append(entry if is_directory else entry[:-3])

    candidates = [Plugin(name) for name in names]
    installed = tuple(
        candidate for candidate in candidates
        if candidate.is_installed(try_import=try_import)
    )

    if len(to_load) != 1:
        return installed
    if not installed:
        raise ValueError(f"Plugin '{to_load[0]}' is not installed.")
    return installed[0]

Return a tuple of installed Plugin objects (or a single Plugin when exactly one name is provided).

Parameters
  • to_load: If specified, only load specific plugins. Otherwise return all plugins.
  • try_import (bool, default True): If True, allow for plugins to be imported.
def get_data_plugins() -> List[Plugin]:
def get_data_plugins() -> List[Plugin]:
    """
    Only return the plugins whose modules define a `fetch()` or `sync()` function.

    Returns
    -------
    A list of `Plugin` objects. Each plugin appears at most once,
    even if its module defines both `fetch()` and `sync()`.
    """
    import inspect
    data_names = {'sync', 'fetch'}
    return [
        plugin for plugin in get_plugins()
        if any(
            name in data_names
            for name, _ in inspect.getmembers(plugin.module, inspect.isfunction)
        )
    ]

Only return the plugins whose modules define either fetch() or sync() functions.

def add_plugin_argument(*args, **kwargs) -> None:
def add_plugin_argument(*args, **kwargs) -> None:
    """
    Add argparse arguments under an argument group for the calling plugin
    (titled "Plugin '<name>' options"), or under 'Custom options'
    if no parent plugin is found.
    Takes the same parameters as the regular argparse `add_argument()` function.

    The same positional arguments are only added once per group,
    and failures are emitted as warnings instead of being raised.

    Examples
    --------
    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
    >>> 
    """
    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
    from meerschaum.utils.warnings import warn, error

    ### NOTE(review): the argument `2` is presumably a stack depth relative to this frame;
    ### confirm before moving this call into a helper.
    _parent_plugin_name = _get_parent_plugin(2)
    if _parent_plugin_name:
        title = f"Plugin '{_parent_plugin_name}' options"
    else:
        title = 'Custom options'

    group_key = 'plugin_' + (_parent_plugin_name or '')
    if group_key not in groups:
        groups[group_key] = parser.add_argument_group(
            title = title,
        )
        _seen_plugin_args[group_key] = set()

    try:
        args_key = str(args)
        seen_args = _seen_plugin_args[group_key]
        if args_key not in seen_args:
            groups[group_key].add_argument(*args, **kwargs)
            seen_args.add(args_key)
    except Exception as e:
        warn(e)

Add argparse arguments under the 'Plugins options' group. Takes the same parameters as the regular argparse add_argument() function.

Examples
>>> add_plugin_argument('--foo', type=int, help="This is my help text!")
>>>
def pre_sync_hook(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
 94def pre_sync_hook(
 95        function: Callable[[Any], Any],
 96    ) -> Callable[[Any], Any]:
 97    """
 98    Register a function as a sync hook to be executed right before sync.
 99    
100    Parameters
101    ----------
102    function: Callable[[Any], Any]
103        The function to execute right before a sync.
104        
105    Returns
106    -------
107    Another function (this is a decorator function).
108
109    Examples
110    --------
111    >>> from meerschaum.plugins import pre_sync_hook
112    >>>
113    >>> @pre_sync_hook
114    ... def log_sync(pipe, **kwargs):
115    ...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
116    >>>
117    """
118    with _locks['_pre_sync_hooks']:
119        try:
120            if function.__module__ not in _pre_sync_hooks:
121                _pre_sync_hooks[function.__module__] = []
122            _pre_sync_hooks[function.__module__].append(function)
123        except Exception as e:
124            from meerschaum.utils.warnings import warn
125            warn(e)
126    return function

Register a function as a sync hook to be executed right before sync.

Parameters
----------
function: Callable[[Any], Any]
    The function to execute right before a sync.

Returns
-------
Another function (this is a decorator function).

Examples
--------
>>> from meerschaum.plugins import pre_sync_hook
>>>
>>> @pre_sync_hook
... def log_sync(pipe, **kwargs):
...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
>>>

def post_sync_hook(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
def post_sync_hook(
    function: Callable[[Any], Any],
) -> Callable[[Any], Any]:
    """
    Add a function to the hooks which are executed upon completion of a sync.

    The function is stored in `_post_sync_hooks` under the name of the module
    which defines it (`function.__module__`).

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to execute upon completion of a sync.

    Returns
    -------
    The same `function`, unmodified, so this may be used as a decorator.

    Examples
    --------
    >>> from meerschaum.plugins import post_sync_hook
    >>> from meerschaum.utils.misc import interval_str
    >>> from datetime import timedelta
    >>>
    >>> @post_sync_hook
    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
    ...     duration_delta = timedelta(seconds=duration)
    ...     duration_text = interval_str(duration_delta)
    ...     print(f"It took {duration_text} to sync {pipe}.")
    >>>
    """
    with _locks['_post_sync_hooks']:
        try:
            # Fetch (creating if necessary) the list of hooks for this module.
            module_hooks = _post_sync_hooks.setdefault(function.__module__, [])
            module_hooks.append(function)
        except Exception as registration_error:
            # A failed registration is reported as a warning, not raised.
            from meerschaum.utils.warnings import warn
            warn(registration_error)
    return function

Register a function as a sync hook to be executed upon completion of a sync.

Parameters
  • function (Callable[[Any], Any]): The function to execute upon completion of a sync.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import post_sync_hook
>>> from meerschaum.utils.misc import interval_str
>>> from datetime import timedelta
>>>
>>> @post_sync_hook
... def log_sync(pipe, success_tuple, duration=None, **kwargs):
...     duration_delta = timedelta(seconds=duration)
...     duration_text = interval_str(duration_delta)
...     print(f"It took {duration_text} to sync {pipe}.")
>>>