meerschaum.plugins

Expose plugin management APIs from the meerschaum.plugins module.

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Expose plugin management APIs from the `meerschaum.plugins` module.
  7"""
  8
  9from __future__ import annotations
 10import functools
 11import meerschaum as mrsm
 12from meerschaum.utils.typing import Callable, Any, Union, Optional, Dict, List, Tuple
 13from meerschaum.utils.threading import Lock, RLock
 14from meerschaum.plugins._Plugin import Plugin
 15
 16_api_plugins: Dict[str, List[Callable[['fastapi.App'], Any]]] = {}
 17_pre_sync_hooks: Dict[str, List[Callable[[Any], Any]]] = {}
 18_post_sync_hooks: Dict[str, List[Callable[[Any], Any]]] = {}
 19_locks = {
 20    '_api_plugins': RLock(),
 21    '_dash_plugins': RLock(),
 22    '_pre_sync_hooks': RLock(),
 23    '_post_sync_hooks': RLock(),
 24    '__path__': RLock(),
 25    'sys.path': RLock(),
 26    'internal_plugins': RLock(),
 27    '_synced_symlinks': RLock(),
 28    'PLUGINS_INTERNAL_LOCK_PATH': RLock(),
 29}
 30__all__ = (
 31    "Plugin", "make_action", "api_plugin", "dash_plugin", "web_page",
 32    "import_plugins", "from_plugin_import",
 33    "reload_plugins", "get_plugins", "get_data_plugins", "add_plugin_argument",
 34    "pre_sync_hook", "post_sync_hook",
 35)
 36__pdoc__ = {
 37    'venvs': False, 'data': False, 'stack': False, 'plugins': False,
 38}
 39
 40def make_action(
 41    function: Callable[[Any], Any],
 42    shell: bool = False,
 43    activate: bool = True,
 44    deactivate: bool = True,
 45    debug: bool = False
 46) -> Callable[[Any], Any]:
 47    """
 48    Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.
 49    
 50    Parameters
 51    ----------
 52    function: Callable[[Any], Any]
 53        The function to become a Meerschaum action. Must accept all keyword arguments.
 54        
 55    shell: bool, default False
 56        Not used.
 57        
 58    Returns
 59    -------
 60    Another function (this is a decorator function).
 61
 62    Examples
 63    --------
 64    >>> from meerschaum.plugins import make_action
 65    >>>
 66    >>> @make_action
 67    ... def my_action(**kw):
 68    ...     print('foo')
 69    ...     return True, "Success"
 70    >>>
 71    """
 72
 73    from meerschaum.actions import actions
 74    from meerschaum.utils.formatting import pprint
 75    package_name = function.__globals__['__name__']
 76    plugin_name = (
 77        package_name.split('.')[1]
 78        if package_name.startswith('plugins.') else None
 79    )
 80    plugin = Plugin(plugin_name) if plugin_name else None
 81
 82    if debug:
 83        from meerschaum.utils.debug import dprint
 84        dprint(
 85            f"Adding action '{function.__name__}' from plugin " +
 86            f"'{plugin}'..."
 87        )
 88
 89    actions[function.__name__] = function
 90    return function
 91
 92
 93def pre_sync_hook(
 94        function: Callable[[Any], Any],
 95    ) -> Callable[[Any], Any]:
 96    """
 97    Register a function as a sync hook to be executed right before sync.
 98    
 99    Parameters
100    ----------
101    function: Callable[[Any], Any]
102        The function to execute right before a sync.
103        
104    Returns
105    -------
106    Another function (this is a decorator function).
107
108    Examples
109    --------
110    >>> from meerschaum.plugins import pre_sync_hook
111    >>>
112    >>> @pre_sync_hook
113    ... def log_sync(pipe, **kwargs):
114    ...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
115    >>>
116    """
117    with _locks['_pre_sync_hooks']:
118        try:
119            if function.__module__ not in _pre_sync_hooks:
120                _pre_sync_hooks[function.__module__] = []
121            _pre_sync_hooks[function.__module__].append(function)
122        except Exception as e:
123            from meerschaum.utils.warnings import warn
124            warn(e)
125    return function
126
127
def post_sync_hook(
    function: Callable[[Any], Any],
) -> Callable[[Any], Any]:
    """
    Register a function to be run once a sync has completed.

    The function is stored in the post-sync registry under its `__module__`.
    If registration fails, a warning is emitted and the function is still returned.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to execute upon completion of a sync.

    Returns
    -------
    The same function, unchanged (this is a decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import post_sync_hook
    >>>
    >>> @post_sync_hook
    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
    ...     print(f"It took {round(duration, 2)} seconds to sync {pipe}.")
    >>>
    """
    with _locks['_post_sync_hooks']:
        try:
            module_hooks = _post_sync_hooks.setdefault(function.__module__, [])
            module_hooks.append(function)
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(e)
    return function
161
162
_plugin_endpoints_to_pages = {}
def web_page(
    page: Union[str, None, Callable[[Any], Any]] = None,
    login_required: bool = True,
    **kwargs
) -> Any:
    """
    Quickly add pages to the dash application.

    May be used bare (`@web_page`) or with arguments (`@web_page('foo/bar')`).
    The decorated function is recorded in `_plugin_endpoints_to_pages`
    under its endpoint, with surrounding slashes removed.

    Parameters
    ----------
    page: Union[str, None, Callable[[Any], Any]], default None
        The endpoint for the page.
        If `None` (or if the decorator is used bare), the function's name is used.

    login_required: bool, default True
        Stored alongside the function under the `'login_required'` key.

    **kwargs
        Accepted but not used.

    Returns
    -------
    The wrapped function when used bare, otherwise a decorator.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.plugins import web_page
    >>> html = mrsm.attempt_import('dash.html')
    >>>
    >>> @web_page('foo/bar', login_required=False)
    ... def foo_bar():
    ...     return html.Div([html.H1("Hello, World!")])
    >>>
    """
    used_bare = callable(page)
    ### When used bare, `page` is the decorated function, so there is no explicit endpoint.
    explicit_endpoint = None if used_bare else page

    def _decorator(_func: Callable[[Any], Any]) -> Callable[[Any], Any]:

        @functools.wraps(_func)
        def wrapper(*_args, **_kwargs):
            return _func(*_args, **_kwargs)

        ### Resolve the endpoint per decorated function rather than in shared state,
        ### so reusing one decorator never registers a function under another's name.
        endpoint = explicit_endpoint if explicit_endpoint is not None else _func.__name__
        endpoint = endpoint.lstrip('/').rstrip('/').strip()
        _plugin_endpoints_to_pages[endpoint] = {
            'function': _func,
            'login_required': login_required,
        }
        return wrapper

    return _decorator(page) if used_bare else _decorator
210
211
_dash_plugins = {}
def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Register a function to be executed when the Dash application starts.

    The function is stored in `_dash_plugins` under its `__module__`.
    If registration fails, a warning is emitted and the function is still returned.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to register.

    Returns
    -------
    The same function, unchanged (this is a decorator function).
    """
    with _locks['_dash_plugins']:
        try:
            module_functions = _dash_plugins.setdefault(function.__module__, [])
            module_functions.append(function)
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(e)
    return function
226
227
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Register a function to be executed when the Meerschaum API module is initialized.
    Useful for lazy-loading heavy plugins only when the API is started,
    such as when editing the `meerschaum.api.app` FastAPI app.

    The FastAPI app will be passed as the only parameter.
    The function is stored in `_api_plugins` under its `__module__`.
    If registration fails, a warning is emitted and the function is still returned.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to register.

    Returns
    -------
    The same function, unchanged (this is a decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import api_plugin
    >>>
    >>> @api_plugin
    ... def initialize_plugin(app):
    ...     @app.get('/my/new/path')
    ...     def new_path():
    ...         return {'message': 'It works!'}
    >>>
    """
    with _locks['_api_plugins']:
        try:
            module_functions = _api_plugins.setdefault(function.__module__, [])
            module_functions.append(function)
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(e)
    return function
256
257
_synced_symlinks: bool = False
def sync_plugins_symlinks(debug: bool = False, warn: bool = True) -> None:
    """
    Update the symlinks in the internal plugins directory (`PLUGINS_RESOURCES_PATH`)
    so that they point at the plugins found in `PLUGINS_DIR_PATHS`
    and at plugins installed as packages under the `meerschaum.plugins` entry-point group.

    Stale symlinks are removed, missing ones are created,
    and the parent of the internal plugins directory is appended to this package's `__path__`.
    Once a call has completed, later calls in the same process return immediately.

    Parameters
    ----------
    debug: bool, default False
        Not used.

    warn: bool, default True
        If `True`, emit warnings for lock file problems,
        duplicate plugin names, and symlinks that could not be created.
    """
    global _synced_symlinks
    with _locks['_synced_symlinks']:
        if _synced_symlinks:
            return

    import sys, os, pathlib, time
    from collections import defaultdict
    import importlib.util
    from meerschaum.utils.misc import flatten_list, make_symlink, is_symlink
    from meerschaum.utils.warnings import error, warn as _warn
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.venv import Venv, activate_venv, deactivate_venv, is_venv_active
    from meerschaum.config._paths import (
        PLUGINS_RESOURCES_PATH,
        PLUGINS_ARCHIVES_RESOURCES_PATH,
        PLUGINS_INIT_PATH,
        PLUGINS_DIR_PATHS,
        PLUGINS_INTERNAL_LOCK_PATH,
    )

    ### If the lock file exists, sleep for up to a second or until it's removed before continuing.
    ### NOTE(review): a lock file that still exists after one sleep increment is unlinked
    ### right away rather than after the full timeout; confirm this is intended.
    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
        if PLUGINS_INTERNAL_LOCK_PATH.exists():
            lock_sleep_total     = STATIC_CONFIG['plugins']['lock_sleep_total']
            lock_sleep_increment = STATIC_CONFIG['plugins']['lock_sleep_increment']
            lock_start = time.perf_counter()
            while (
                (time.perf_counter() - lock_start) < lock_sleep_total
            ):
                time.sleep(lock_sleep_increment)
                if not PLUGINS_INTERNAL_LOCK_PATH.exists():
                    break
                try:
                    PLUGINS_INTERNAL_LOCK_PATH.unlink()
                except Exception as e:
                    if warn:
                        _warn(f"Error while removing lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
                    break

        ### Begin locking from other processes.
        try:
            PLUGINS_INTERNAL_LOCK_PATH.touch()
        except Exception as e:
            if warn:
                _warn(f"Unable to create lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")

    with _locks['internal_plugins']:

        try:
            from importlib.metadata import entry_points
        except ImportError:
            ### `attempt_import` is not imported at module level, so reach it through `mrsm`.
            importlib_metadata = mrsm.attempt_import('importlib_metadata', lazy=False)
            entry_points = importlib_metadata.entry_points

        ### NOTE: Allow plugins to be installed via `pip`.
        packaged_plugin_paths = []
        try:
            discovered_packaged_plugins_eps = entry_points(group='meerschaum.plugins')
        except TypeError:
            ### Older `entry_points()` implementations do not accept selection keywords.
            discovered_packaged_plugins_eps = []

        for ep in discovered_packaged_plugins_eps:
            module_name = ep.name
            ### `files` is `None` when the distribution has no file listing metadata.
            for package_file_path in (ep.dist.files or []):
                if package_file_path.suffix != '.py':
                    continue
                if str(package_file_path) == f'{module_name}.py':
                    packaged_plugin_paths.append(package_file_path.locate())
                elif str(package_file_path) == f'{module_name}/__init__.py':
                    packaged_plugin_paths.append(package_file_path.locate().parent)

        ### The internal plugins path must be a real directory, not a (possibly broken) symlink.
        if is_symlink(PLUGINS_RESOURCES_PATH) or not PLUGINS_RESOURCES_PATH.exists():
            try:
                PLUGINS_RESOURCES_PATH.unlink()
            except Exception:
                pass

        PLUGINS_RESOURCES_PATH.mkdir(exist_ok=True)

        existing_symlinked_paths = [
            (PLUGINS_RESOURCES_PATH / item)
            for item in os.listdir(PLUGINS_RESOURCES_PATH)
        ]
        for plugins_path in PLUGINS_DIR_PATHS:
            if not plugins_path.exists():
                plugins_path.mkdir(exist_ok=True, parents=True)
        plugins_to_be_symlinked = list(flatten_list(
            [
                [
                    (plugins_path / item)
                    for item in os.listdir(plugins_path)
                    if (
                        not item.startswith('.')
                    ) and (item not in ('__pycache__', '__init__.py'))
                ]
                for plugins_path in PLUGINS_DIR_PATHS
            ]
        ))
        plugins_to_be_symlinked.extend(packaged_plugin_paths)

        ### Check for duplicates.
        seen_plugins = defaultdict(lambda: 0)
        for plugin_path in plugins_to_be_symlinked:
            plugin_name = plugin_path.stem
            seen_plugins[plugin_name] += 1
        for plugin_name, plugin_count in seen_plugins.items():
            if plugin_count > 1:
                if warn:
                    _warn(f"Found duplicate plugins named '{plugin_name}'.")

        for plugin_symlink_path in existing_symlinked_paths:
            real_path = pathlib.Path(os.path.realpath(plugin_symlink_path))

            ### Remove invalid symlinks.
            if real_path not in plugins_to_be_symlinked:
                ### Leave real files and directories alone; only symlinks are managed here.
                if not is_symlink(plugin_symlink_path):
                    continue
                try:
                    plugin_symlink_path.unlink()
                except Exception as e:
                    pass

            ### Remove valid plugins from the to-be-symlinked list.
            else:
                plugins_to_be_symlinked.remove(real_path)

        for plugin_path in plugins_to_be_symlinked:
            plugin_symlink_path = PLUGINS_RESOURCES_PATH / plugin_path.name
            try:
                ### There might be duplicate folders (e.g. __pycache__).
                if (
                    plugin_symlink_path.exists()
                    and
                    plugin_symlink_path.is_dir()
                    and
                    not is_symlink(plugin_symlink_path)
                ):
                    continue
                success, msg = make_symlink(plugin_path, plugin_symlink_path)
            except Exception as e:
                success, msg = False, str(e)
            if not success:
                if warn:
                    _warn(
                        f"Failed to create symlink {plugin_symlink_path} "
                        + f"to {plugin_path}:\n    {msg}"
                    )

    ### Release symlink lock file in case other processes need it.
    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
        try:
            if PLUGINS_INTERNAL_LOCK_PATH.exists():
                PLUGINS_INTERNAL_LOCK_PATH.unlink()
        ### Sometimes competing threads will delete the lock file at the same time.
        except FileNotFoundError:
            pass
        except Exception as e:
            if warn:
                _warn(f"Error cleaning up lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")

        try:
            if not PLUGINS_INIT_PATH.exists():
                PLUGINS_INIT_PATH.touch()
        except Exception as e:
            error(f"Failed to create the file '{PLUGINS_INIT_PATH}':\n{e}")

    with _locks['__path__']:
        if str(PLUGINS_RESOURCES_PATH.parent) not in __path__:
            __path__.append(str(PLUGINS_RESOURCES_PATH.parent))

    with _locks['_synced_symlinks']:
        _synced_symlinks = True
435
436
def import_plugins(
    *plugins_to_import: Union[str, List[str], None],
    warn: bool = True,
) -> Union[
    'ModuleType', Tuple['ModuleType', None]
]:
    """
    Import the Meerschaum plugins directory.

    Parameters
    ----------
    plugins_to_import: Union[str, List[str], None]
        If provided, only import the specified plugins.
        Otherwise import the entire plugins module. May be a string, list, or `None`.
        Defaults to `None`.

    warn: bool, default True
        If `True`, emit a warning when the plugins module as a whole could not be imported.

    Returns
    -------
    A module or a tuple of modules, depending on the number of plugins provided.
    If no plugins are named, the plugins module itself is returned
    (`None` if it could not be imported).
    A named plugin that fails to import is represented by `None`.

    """
    import sys
    import os
    import importlib
    from meerschaum.utils.misc import flatten_list
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
    from meerschaum.utils.warnings import warn as _warn
    plugins_to_import = list(plugins_to_import)
    plugins_parent_path = str(PLUGINS_RESOURCES_PATH.parent)
    with _locks['sys.path']:

        ### Since plugins may depend on other plugins,
        ### we need to activate the virtual environments for library plugins.
        ### This logic exists in `Plugin.activate_venv()`,
        ### but that code requires the plugin's module to already be imported.
        ### It's not a guarantee of correct activation order.
        plugins_names = get_plugins_names()

        ### Remember which venvs were active beforehand (by name),
        ### so that only the venvs activated here are deactivated afterwards.
        already_active_venvs = {
            plugin_name
            for plugin_name in plugins_names
            if is_venv_active(plugin_name)
        }

        if not sys.path or sys.path[0] != plugins_parent_path:
            sys.path.insert(0, plugins_parent_path)

        if not plugins_to_import:
            for plugin_name in plugins_names:
                activate_venv(plugin_name)
            try:
                imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem)
            except ImportError as e:
                _warn(f"Failed to import the plugins module:\n    {e}")
                import traceback
                traceback.print_exc()
                imported_plugins = None
            for plugin_name in plugins_names:
                if plugin_name in already_active_venvs:
                    continue
                deactivate_venv(plugin_name)

        else:
            imported_plugins = []
            for plugin_name in flatten_list(plugins_to_import):
                plugin = Plugin(plugin_name)
                try:
                    with Venv(plugin):
                        imported_plugins.append(
                            importlib.import_module(
                                f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
                            )
                        )
                except Exception as e:
                    _warn(
                        f"Failed to import plugin '{plugin_name}':\n    "
                        + f"{e}\n\nHere's a stacktrace:",
                        stack = False,
                    )
                    from meerschaum.utils.formatting import get_console
                    get_console().print_exception(
                        suppress = [
                            'meerschaum/plugins/__init__.py',
                            importlib,
                            importlib._bootstrap,
                        ]
                    )
                    ### Keep positions aligned with the requested plugins.
                    imported_plugins.append(None)

        if imported_plugins is None and warn:
            _warn("Failed to import plugins.", stacklevel=3)

        if plugins_parent_path in sys.path:
            sys.path.remove(plugins_parent_path)

    if isinstance(imported_plugins, list):
        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
    return imported_plugins
531
532
def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
    """
    Emulate the `from module import x` behavior for a plugin's module.

    Parameters
    ----------
    plugin_import_name: str
        The import name of the plugin's module.
        Separate submodules with '.' (e.g. 'compose.utils.pipes').
        A leading 'plugins.' is ignored.

    attrs: str
        Names of the attributes to return.

    Returns
    -------
    Objects from a plugin's submodule.
    If multiple objects are provided, return a tuple.
    An attribute which cannot be accessed is returned as `None`.
    `None` is returned if the plugin's module or the submodule cannot be imported.

    Raises
    ------
    ValueError
        If no attribute names are provided.

    Examples
    --------
    >>> init = from_plugin_import('compose.utils', 'init')
    >>> with mrsm.Venv('compose'):
    ...     cf = init()
    >>> build_parent_pipe, get_defined_pipes = from_plugin_import(
    ...     'compose.utils.pipes',
    ...     'build_parent_pipe',
    ...     'get_defined_pipes',
    ... )
    >>> parent_pipe = build_parent_pipe(cf)
    >>> defined_pipes = get_defined_pipes(cf)
    """
    import importlib
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.warnings import warn as _warn

    plugins_prefix = 'plugins.'
    if plugin_import_name.startswith(plugins_prefix):
        plugin_import_name = plugin_import_name[len(plugins_prefix):]

    name_parts = plugin_import_name.split('.')
    plugin = mrsm.Plugin(name_parts[0])
    submodule_import_name = '.'.join([PLUGINS_RESOURCES_PATH.stem, *name_parts])

    if not attrs:
        raise ValueError(f"Provide which attributes to return from '{submodule_import_name}'.")

    ### Import within the plugin's virtual environment so its dependencies resolve.
    with mrsm.Venv(plugin):
        if plugin.module is None:
            return None

        try:
            submodule = importlib.import_module(submodule_import_name)
        except ImportError as e:
            _warn(
                f"Failed to import plugin '{submodule_import_name}':\n    "
                + f"{e}\n\nHere's a stacktrace:",
                stack=False,
            )
            from meerschaum.utils.formatting import get_console
            get_console().print_exception(
                suppress=[
                    'meerschaum/plugins/__init__.py',
                    importlib,
                    importlib._bootstrap,
                ]
            )
            return None

        found_objects = []
        for attr in attrs:
            try:
                found_object = getattr(submodule, attr)
            except Exception:
                _warn(f"Failed to access '{attr}' from '{submodule_import_name}'.")
                found_object = None
            found_objects.append(found_object)

        return found_objects[0] if len(attrs) == 1 else tuple(found_objects)
614
615
def load_plugins(debug: bool = False, shell: bool = False) -> None:
    """
    Import Meerschaum plugins and update the actions dictionary.

    The shared `modules` list from `meerschaum.actions` is updated in place:
    previously loaded plugin modules are dropped and the freshly imported ones are added.
    In each plugin module, a function named after the module is registered as an action.

    Parameters
    ----------
    debug: bool, default False
        Passed along to `make_action()`.

    shell: bool, default False
        Passed along to `make_action()`.
    """
    from inspect import isfunction, getmembers
    from meerschaum.actions import __all__ as _all, modules
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.packages import get_modules_from_package
    if debug:
        from meerschaum.utils.debug import dprint

    _plugins_names, plugins_modules = get_modules_from_package(
        import_plugins(),
        names=True,
        recursive=True,
        modules_venvs=True,
    )

    plugins_prefix = PLUGINS_RESOURCES_PATH.stem + '.'
    non_plugin_modules = [
        existing_module
        for existing_module in modules
        if not existing_module.__name__.startswith(plugins_prefix)
    ]

    ### Mutate the shared list instead of rebinding it,
    ### so that other references to `modules` see the new contents.
    previous_count = len(modules)
    modules.extend(non_plugin_modules + plugins_modules)
    del modules[:previous_count]

    for module in plugins_modules:
        for name, func in getmembers(module, isfunction):
            if name == module.__name__.split('.')[-1]:
                make_action(func, shell=shell, debug=debug)
653
654
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
    """
    Reload plugins back into memory.

    Each plugin's top-level module is dropped from `sys.modules`,
    then `load_plugins()` is called to import the plugins again.

    Parameters
    ----------
    plugins: Optional[List[str]], default None
        The plugins to reload. `None` will reload all plugins.

    debug: bool, default False
        If `True`, print a message for each plugin and pass `debug` on to `load_plugins()`.

    """
    import sys
    if debug:
        from meerschaum.utils.debug import dprint

    names_to_reload = plugins or get_plugins_names()
    for plugin_name in names_to_reload:
        if debug:
            dprint(f"Reloading plugin '{plugin_name}'...")
        ### Forget the cached module so that the next import reads it from disk again.
        sys.modules.pop('plugins.' + str(plugin_name), None)

    load_plugins(debug=debug)
678
679
def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
    """
    Return the installed `Plugin` objects.

    Parameters
    ----------
    to_load:
        If specified, only load specific plugins.
        Otherwise return all plugins found in the internal plugins directory.

    try_import: bool, default True
        If `True`, allow for plugins to be imported.

    Returns
    -------
    A tuple of the installed plugins,
    or a single `Plugin` if exactly one name was provided.

    Raises
    ------
    ValueError
        If exactly one name was provided and that plugin is not installed.
    """
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    import os
    sync_plugins_symlinks()

    if to_load:
        plugin_names = to_load
    else:
        plugin_names = []
        for entry in os.listdir(PLUGINS_RESOURCES_PATH):
            if entry == '__init__.py':
                continue
            ### Directories are named after the plugin; files carry a three-character suffix.
            if (PLUGINS_RESOURCES_PATH / entry).is_dir():
                plugin_names.append(entry)
            else:
                plugin_names.append(entry[:-3])

    candidates = [Plugin(plugin_name) for plugin_name in plugin_names]
    plugins = tuple(
        candidate
        for candidate in candidates
        if candidate.is_installed(try_import=try_import)
    )

    if len(to_load) != 1:
        return plugins
    if not plugins:
        raise ValueError(f"Plugin '{to_load[0]}' is not installed.")
    return plugins[0]
712
713
def get_plugins_names(*to_load, **kw) -> List[str]:
    """
    Return a list of the names of installed plugins.

    Parameters
    ----------
    to_load:
        Passed to `get_plugins()`; if specified, only these plugins are considered.

    **kw
        Passed to `get_plugins()`.

    Returns
    -------
    A list of plugin names, even when only one plugin is requested.
    """
    plugins = get_plugins(*to_load, **kw)
    ### `get_plugins()` returns a bare `Plugin` (not a tuple) when given exactly one name.
    if isinstance(plugins, Plugin):
        plugins = (plugins,)
    return [plugin.name for plugin in plugins]
719
720
def get_plugins_modules(*to_load, **kw) -> List['ModuleType']:
    """
    Return a list of the `module` attributes of the installed plugins.

    Parameters
    ----------
    to_load:
        Passed to `get_plugins()`; if specified, only these plugins are considered.

    **kw
        Passed to `get_plugins()`.

    Returns
    -------
    A list of modules, even when only one plugin is requested.
    """
    plugins = get_plugins(*to_load, **kw)
    ### `get_plugins()` returns a bare `Plugin` (not a tuple) when given exactly one name.
    if isinstance(plugins, Plugin):
        plugins = (plugins,)
    return [plugin.module for plugin in plugins]
726
727
def get_data_plugins() -> List[Plugin]:
    """
    Only return the plugins whose modules define a `fetch()` or `sync()` function.

    Returns
    -------
    A list of `Plugin` objects, each appearing once
    (even if its module defines both `fetch()` and `sync()`).
    """
    import inspect
    data_names = {'sync', 'fetch'}
    return [
        plugin
        for plugin in get_plugins()
        if any(
            name in data_names and inspect.isfunction(ob)
            for name, ob in inspect.getmembers(plugin.module)
        )
    ]
744
745
def add_plugin_argument(*args, **kwargs) -> None:
    """
    Add an argparse argument to the options group of the calling plugin.
    Takes the same parameters as the regular argparse `add_argument()` function.

    The group is created the first time it is needed.
    An argument whose positional parameters were already added to the group is skipped,
    and errors raised while adding the argument are emitted as warnings.

    Examples
    --------
    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
    >>>
    """
    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
    from meerschaum.utils.warnings import warn, error

    ### Must be called directly from this frame: stack level 2 is this function's caller.
    _parent_plugin_name = _get_parent_plugin(2)
    if _parent_plugin_name:
        title = f"Plugin '{_parent_plugin_name}' options"
    else:
        title = 'Custom options'
    group_key = 'plugin_' + (_parent_plugin_name or '')

    if group_key not in groups:
        groups[group_key] = parser.add_argument_group(title=title)
        _seen_plugin_args[group_key] = set()

    try:
        args_key = str(args)
        seen_args = _seen_plugin_args[group_key]
        if args_key not in seen_args:
            groups[group_key].add_argument(*args, **kwargs)
            seen_args.add(args_key)
    except Exception as e:
        warn(e)
772
773
def _get_parent_plugin(stacklevel: int = 1) -> Union[str, None]:
    """
    Return the name of the plugin which owns the frame `stacklevel` levels up the call stack.
    If this function is called from outside a Meerschaum plugin, it will return None.

    Parameters
    ----------
    stacklevel: int, default 1
        How many frames up the stack to inspect (1 is the direct caller).

    Returns
    -------
    The plugin's name if the frame's module name begins with 'plugins.',
    otherwise `None` (including when the frame cannot be inspected).
    """
    import inspect
    plugins_prefix = 'plugins.'
    try:
        parent_globals = inspect.stack()[stacklevel][0].f_globals
        module_name = parent_globals['__name__']
        if not module_name.startswith(plugins_prefix):
            return None
        ### Only strip the leading prefix: a plugin's own name may contain 'plugins.'.
        return module_name[len(plugins_prefix):].split('.')[0] or None
    except Exception:
        return None
class Plugin:
 37class Plugin:
 38    """Handle packaging of Meerschaum plugins."""
 39    def __init__(
 40        self,
 41        name: str,
 42        version: Optional[str] = None,
 43        user_id: Optional[int] = None,
 44        required: Optional[List[str]] = None,
 45        attributes: Optional[Dict[str, Any]] = None,
 46        archive_path: Optional[pathlib.Path] = None,
 47        venv_path: Optional[pathlib.Path] = None,
 48        repo_connector: Optional['mrsm.connectors.api.APIConnector'] = None,
 49        repo: Union['mrsm.connectors.api.APIConnector', str, None] = None,
 50    ):
 51        from meerschaum.config.static import STATIC_CONFIG
 52        sep = STATIC_CONFIG['plugins']['repo_separator']
 53        _repo = None
 54        if sep in name:
 55            try:
 56                name, _repo = name.split(sep)
 57            except Exception as e:
 58                error(f"Invalid plugin name: '{name}'")
 59        self._repo_in_name = _repo
 60
 61        if attributes is None:
 62            attributes = {}
 63        self.name = name
 64        self.attributes = attributes
 65        self.user_id = user_id
 66        self._version = version
 67        if required:
 68            self._required = required
 69        self.archive_path = (
 70            archive_path if archive_path is not None
 71            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
 72        )
 73        self.venv_path = (
 74            venv_path if venv_path is not None
 75            else VIRTENV_RESOURCES_PATH / self.name
 76        )
 77        self._repo_connector = repo_connector
 78        self._repo_keys = repo
 79
 80
 81    @property
 82    def repo_connector(self):
 83        """
 84        Return the repository connector for this plugin.
 85        NOTE: This imports the `connectors` module, which imports certain plugin modules.
 86        """
 87        if self._repo_connector is None:
 88            from meerschaum.connectors.parse import parse_repo_keys
 89
 90            repo_keys = self._repo_keys or self._repo_in_name
 91            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
 92                error(
 93                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
 94                )
 95            repo_connector = parse_repo_keys(repo_keys)
 96            self._repo_connector = repo_connector
 97        return self._repo_connector
 98
 99
100    @property
101    def version(self):
102        """
103        Return the plugin's module version is defined (`__version__`) if it's defined.
104        """
105        if self._version is None:
106            try:
107                self._version = self.module.__version__
108            except Exception as e:
109                self._version = None
110        return self._version
111
112
113    @property
114    def module(self):
115        """
116        Return the Python module of the underlying plugin.
117        """
118        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
119            if self.__file__ is None:
120                return None
121            from meerschaum.plugins import import_plugins
122            self._module = import_plugins(str(self), warn=False)
123        return self._module
124
125
126    @property
127    def __file__(self) -> Union[str, None]:
128        """
129        Return the file path (str) of the plugin if it exists, otherwise `None`.
130        """
131        if self.__dict__.get('_module', None) is not None:
132            return self.module.__file__
133
134        potential_dir = PLUGINS_RESOURCES_PATH / self.name
135        if (
136            potential_dir.exists()
137            and potential_dir.is_dir()
138            and (potential_dir / '__init__.py').exists()
139        ):
140            return str((potential_dir / '__init__.py').as_posix())
141
142        potential_file = PLUGINS_RESOURCES_PATH / (self.name + '.py')
143        if potential_file.exists() and not potential_file.is_dir():
144            return str(potential_file.as_posix())
145
146        return None
147
148
149    @property
150    def requirements_file_path(self) -> Union[pathlib.Path, None]:
151        """
152        If a file named `requirements.txt` exists, return its path.
153        """
154        if self.__file__ is None:
155            return None
156        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
157        if not path.exists():
158            return None
159        return path
160
161
162    def is_installed(self, **kw) -> bool:
163        """
164        Check whether a plugin is correctly installed.
165
166        Returns
167        -------
168        A `bool` indicating whether a plugin exists and is successfully imported.
169        """
170        return self.__file__ is not None
171
172
173    def make_tar(self, debug: bool = False) -> pathlib.Path:
174        """
175        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
176
177        Parameters
178        ----------
179        debug: bool, default False
180            Verbosity toggle.
181
182        Returns
183        -------
184        A `pathlib.Path` to the archive file's path.
185
186        """
187        import tarfile, pathlib, subprocess, fnmatch
188        from meerschaum.utils.debug import dprint
189        from meerschaum.utils.packages import attempt_import
190        pathspec = attempt_import('pathspec', debug=debug)
191
192        if not self.__file__:
193            from meerschaum.utils.warnings import error
194            error(f"Could not find file for plugin '{self}'.")
195        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
196            path = self.__file__.replace('__init__.py', '')
197            is_dir = True
198        else:
199            path = self.__file__
200            is_dir = False
201
202        old_cwd = os.getcwd()
203        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
204        os.chdir(real_parent_path)
205
206        default_patterns_to_ignore = [
207            '.pyc',
208            '__pycache__/',
209            'eggs/',
210            '__pypackages__/',
211            '.git',
212        ]
213
214        def parse_gitignore() -> 'Set[str]':
215            gitignore_path = pathlib.Path(path) / '.gitignore'
216            if not gitignore_path.exists():
217                return set(default_patterns_to_ignore)
218            with open(gitignore_path, 'r', encoding='utf-8') as f:
219                gitignore_text = f.read()
220            return set(pathspec.PathSpec.from_lines(
221                pathspec.patterns.GitWildMatchPattern,
222                default_patterns_to_ignore + gitignore_text.splitlines()
223            ).match_tree(path))
224
225        patterns_to_ignore = parse_gitignore() if is_dir else set()
226
227        if debug:
228            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
229
230        with tarfile.open(self.archive_path, 'w:gz') as tarf:
231            if not is_dir:
232                tarf.add(f"{self.name}.py")
233            else:
234                for root, dirs, files in os.walk(self.name):
235                    for f in files:
236                        good_file = True
237                        fp = os.path.join(root, f)
238                        for pattern in patterns_to_ignore:
239                            if pattern in str(fp) or f.startswith('.'):
240                                good_file = False
241                                break
242                        if good_file:
243                            if debug:
244                                dprint(f"Adding '{fp}'...")
245                            tarf.add(fp)
246
247        ### clean up and change back to old directory
248        os.chdir(old_cwd)
249
250        ### change to 775 to avoid permissions issues with the API in a Docker container
251        self.archive_path.chmod(0o775)
252
253        if debug:
254            dprint(f"Created archive '{self.archive_path}'.")
255        return self.archive_path
256
257
258    def install(
259            self,
260            skip_deps: bool = False,
261            force: bool = False,
262            debug: bool = False,
263        ) -> SuccessTuple:
264        """
265        Extract a plugin's tar archive to the plugins directory.
266        
267        This function checks if the plugin is already installed and if the version is equal or
268        greater than the existing installation.
269
270        Parameters
271        ----------
272        skip_deps: bool, default False
273            If `True`, do not install dependencies.
274
275        force: bool, default False
276            If `True`, continue with installation, even if required packages fail to install.
277
278        debug: bool, default False
279            Verbosity toggle.
280
281        Returns
282        -------
283        A `SuccessTuple` of success (bool) and a message (str).
284
285        """
286        if self.full_name in _ongoing_installations:
287            return True, f"Already installing plugin '{self}'."
288        _ongoing_installations.add(self.full_name)
289        from meerschaum.utils.warnings import warn, error
290        if debug:
291            from meerschaum.utils.debug import dprint
292        import tarfile
293        import re
294        import ast
295        from meerschaum.plugins import sync_plugins_symlinks
296        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
297        from meerschaum.utils.venv import init_venv
298        from meerschaum.utils.misc import safely_extract_tar
299        old_cwd = os.getcwd()
300        old_version = ''
301        new_version = ''
302        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
303        temp_dir.mkdir(exist_ok=True)
304
305        if not self.archive_path.exists():
306            return False, f"Missing archive file for plugin '{self}'."
307        if self.version is not None:
308            old_version = self.version
309            if debug:
310                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
311
312        if debug:
313            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
314
315        try:
316            with tarfile.open(self.archive_path, 'r:gz') as tarf:
317                safely_extract_tar(tarf, temp_dir)
318        except Exception as e:
319            warn(e)
320            return False, f"Failed to extract plugin '{self.name}'."
321
322        ### search for version information
323        files = os.listdir(temp_dir)
324        
325        if str(files[0]) == self.name:
326            is_dir = True
327        elif str(files[0]) == self.name + '.py':
328            is_dir = False
329        else:
330            error(f"Unknown format encountered for plugin '{self}'.")
331
332        fpath = temp_dir / files[0]
333        if is_dir:
334            fpath = fpath / '__init__.py'
335
336        init_venv(self.name, debug=debug)
337        with open(fpath, 'r', encoding='utf-8') as f:
338            init_lines = f.readlines()
339        new_version = None
340        for line in init_lines:
341            if '__version__' not in line:
342                continue
343            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
344            if not version_match:
345                continue
346            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
347            break
348        if not new_version:
349            warn(
350                f"No `__version__` defined for plugin '{self}'. "
351                + "Assuming new version...",
352                stack = False,
353            )
354
355        packaging_version = attempt_import('packaging.version')
356        try:
357            is_new_version = (not new_version and not old_version) or (
358                packaging_version.parse(old_version) < packaging_version.parse(new_version)
359            )
360            is_same_version = new_version and old_version and (
361                packaging_version.parse(old_version) == packaging_version.parse(new_version)
362            )
363        except Exception as e:
364            is_new_version, is_same_version = True, False
365
366        ### Determine where to permanently store the new plugin.
367        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
368        for path in PLUGINS_DIR_PATHS:
369            files_in_plugins_dir = os.listdir(path)
370            if (
371                self.name in files_in_plugins_dir
372                or
373                (self.name + '.py') in files_in_plugins_dir
374            ):
375                plugin_installation_dir_path = path
376                break
377
378        success_msg = (
379            f"Successfully installed plugin '{self}'"
380            + ("\n    (skipped dependencies)" if skip_deps else "")
381            + "."
382        )
383        success, abort = None, None
384
385        if is_same_version and not force:
386            success, msg = True, (
387                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
388                "    Install again with `-f` or `--force` to reinstall."
389            )
390            abort = True
391        elif is_new_version or force:
392            for src_dir, dirs, files in os.walk(temp_dir):
393                if success is not None:
394                    break
395                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
396                if not os.path.exists(dst_dir):
397                    os.mkdir(dst_dir)
398                for f in files:
399                    src_file = os.path.join(src_dir, f)
400                    dst_file = os.path.join(dst_dir, f)
401                    if os.path.exists(dst_file):
402                        os.remove(dst_file)
403
404                    if debug:
405                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
406                    try:
407                        shutil.move(src_file, dst_dir)
408                    except Exception as e:
409                        success, msg = False, (
410                            f"Failed to install plugin '{self}': " +
411                            f"Could not move file '{src_file}' to '{dst_dir}'"
412                        )
413                        print(msg)
414                        break
415            if success is None:
416                success, msg = True, success_msg
417        else:
418            success, msg = False, (
419                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
420                + f"attempted version {new_version}."
421            )
422
423        shutil.rmtree(temp_dir)
424        os.chdir(old_cwd)
425
426        ### Reload the plugin's module.
427        sync_plugins_symlinks(debug=debug)
428        if '_module' in self.__dict__:
429            del self.__dict__['_module']
430        init_venv(venv=self.name, force=True, debug=debug)
431        reload_meerschaum(debug=debug)
432
433        ### if we've already failed, return here
434        if not success or abort:
435            _ongoing_installations.remove(self.full_name)
436            return success, msg
437
438        ### attempt to install dependencies
439        dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
440        if not dependencies_installed:
441            _ongoing_installations.remove(self.full_name)
442            return False, f"Failed to install dependencies for plugin '{self}'."
443
444        ### handling success tuple, bool, or other (typically None)
445        setup_tuple = self.setup(debug=debug)
446        if isinstance(setup_tuple, tuple):
447            if not setup_tuple[0]:
448                success, msg = setup_tuple
449        elif isinstance(setup_tuple, bool):
450            if not setup_tuple:
451                success, msg = False, (
452                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
453                    f"Check `setup()` in '{self.__file__}' for more information " +
454                    f"(no error message provided)."
455                )
456            else:
457                success, msg = True, success_msg
458        elif setup_tuple is None:
459            success = True
460            msg = (
461                f"Post-install for plugin '{self}' returned None. " +
462                f"Assuming plugin successfully installed."
463            )
464            warn(msg)
465        else:
466            success = False
467            msg = (
468                f"Post-install for plugin '{self}' returned unexpected value " +
469                f"of type '{type(setup_tuple)}': {setup_tuple}"
470            )
471
472        _ongoing_installations.remove(self.full_name)
473        module = self.module
474        return success, msg
475
476
477    def remove_archive(
478        self,        
479        debug: bool = False
480    ) -> SuccessTuple:
481        """Remove a plugin's archive file."""
482        if not self.archive_path.exists():
483            return True, f"Archive file for plugin '{self}' does not exist."
484        try:
485            self.archive_path.unlink()
486        except Exception as e:
487            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
488        return True, "Success"
489
490
491    def remove_venv(
492        self,        
493        debug: bool = False
494    ) -> SuccessTuple:
495        """Remove a plugin's virtual environment."""
496        if not self.venv_path.exists():
497            return True, f"Virtual environment for plugin '{self}' does not exist."
498        try:
499            shutil.rmtree(self.venv_path)
500        except Exception as e:
501            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
502        return True, "Success"
503
504
505    def uninstall(self, debug: bool = False) -> SuccessTuple:
506        """
507        Remove a plugin, its virtual environment, and archive file.
508        """
509        from meerschaum.utils.packages import reload_meerschaum
510        from meerschaum.plugins import sync_plugins_symlinks
511        from meerschaum.utils.warnings import warn, info
512        warnings_thrown_count: int = 0
513        max_warnings: int = 3
514
515        if not self.is_installed():
516            info(
517                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
518                + "Checking for artifacts...",
519                stack = False,
520            )
521        else:
522            real_path = pathlib.Path(os.path.realpath(self.__file__))
523            try:
524                if real_path.name == '__init__.py':
525                    shutil.rmtree(real_path.parent)
526                else:
527                    real_path.unlink()
528            except Exception as e:
529                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
530                warnings_thrown_count += 1
531            else:
532                info(f"Removed source files for plugin '{self.name}'.")
533
534        if self.venv_path.exists():
535            success, msg = self.remove_venv(debug=debug)
536            if not success:
537                warn(msg, stack=False)
538                warnings_thrown_count += 1
539            else:
540                info(f"Removed virtual environment from plugin '{self.name}'.")
541
542        success = warnings_thrown_count < max_warnings
543        sync_plugins_symlinks(debug=debug)
544        self.deactivate_venv(force=True, debug=debug)
545        reload_meerschaum(debug=debug)
546        return success, (
547            f"Successfully uninstalled plugin '{self}'." if success
548            else f"Failed to uninstall plugin '{self}'."
549        )
550
551
552    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
553        """
554        If exists, run the plugin's `setup()` function.
555
556        Parameters
557        ----------
558        *args: str
559            The positional arguments passed to the `setup()` function.
560            
561        debug: bool, default False
562            Verbosity toggle.
563
564        **kw: Any
565            The keyword arguments passed to the `setup()` function.
566
567        Returns
568        -------
569        A `SuccessTuple` or `bool` indicating success.
570
571        """
572        from meerschaum.utils.debug import dprint
573        import inspect
574        _setup = None
575        for name, fp in inspect.getmembers(self.module):
576            if name == 'setup' and inspect.isfunction(fp):
577                _setup = fp
578                break
579
580        ### assume success if no setup() is found (not necessary)
581        if _setup is None:
582            return True
583
584        sig = inspect.signature(_setup)
585        has_debug, has_kw = ('debug' in sig.parameters), False
586        for k, v in sig.parameters.items():
587            if '**' in str(v):
588                has_kw = True
589                break
590
591        _kw = {}
592        if has_kw:
593            _kw.update(kw)
594        if has_debug:
595            _kw['debug'] = debug
596
597        if debug:
598            dprint(f"Running setup for plugin '{self}'...")
599        try:
600            self.activate_venv(debug=debug)
601            return_tuple = _setup(*args, **_kw)
602            self.deactivate_venv(debug=debug)
603        except Exception as e:
604            return False, str(e)
605
606        if isinstance(return_tuple, tuple):
607            return return_tuple
608        if isinstance(return_tuple, bool):
609            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
610        if return_tuple is None:
611            return False, f"Setup for Plugin '{self.name}' returned None."
612        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"
613
614
615    def get_dependencies(
616        self,
617        debug: bool = False,
618    ) -> List[str]:
619        """
620        If the Plugin has specified dependencies in a list called `required`, return the list.
621        
622        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
623        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
624
625        Parameters
626        ----------
627        debug: bool, default False
628            Verbosity toggle.
629
630        Returns
631        -------
632        A list of required packages and plugins (str).
633
634        """
635        if '_required' in self.__dict__:
636            return self._required
637
638        ### If the plugin has not yet been imported,
639        ### infer the dependencies from the source text.
640        ### This is not super robust, and it doesn't feel right
641        ### having multiple versions of the logic.
642        ### This is necessary when determining the activation order
643        ### without having import the module.
644        ### For consistency's sake, the module-less method does not cache the requirements.
645        if self.__dict__.get('_module', None) is None:
646            file_path = self.__file__
647            if file_path is None:
648                return []
649            with open(file_path, 'r', encoding='utf-8') as f:
650                text = f.read()
651
652            if 'required' not in text:
653                return []
654
655            ### This has some limitations:
656            ### It relies on `required` being manually declared.
657            ### We lose the ability to dynamically alter the `required` list,
658            ### which is why we've kept the module-reliant method below.
659            import ast, re
660            ### NOTE: This technically would break 
661            ### if `required` was the very first line of the file.
662            req_start_match = re.search(r'\nrequired(:\s*)?.*=', text)
663            if not req_start_match:
664                return []
665            req_start = req_start_match.start()
666            equals_sign = req_start + text[req_start:].find('=')
667
668            ### Dependencies may have brackets within the strings, so push back the index.
669            first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[')
670            if first_opening_brace == -1:
671                return []
672
673            next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']')
674            if next_closing_brace == -1:
675                return []
676
677            start_ix = first_opening_brace + 1
678            end_ix = next_closing_brace
679
680            num_braces = 0
681            while True:
682                if '[' not in text[start_ix:end_ix]:
683                    break
684                num_braces += 1
685                start_ix = end_ix
686                end_ix += text[end_ix + 1:].find(']') + 1
687
688            req_end = end_ix + 1
689            req_text = (
690                text[(first_opening_brace-1):req_end]
691                .lstrip()
692                .replace('=', '', 1)
693                .lstrip()
694                .rstrip()
695            )
696            try:
697                required = ast.literal_eval(req_text)
698            except Exception as e:
699                warn(
700                    f"Unable to determine requirements for plugin '{self.name}' "
701                    + "without importing the module.\n"
702                    + "    This may be due to dynamically setting the global `required` list.\n"
703                    + f"    {e}"
704                )
705                return []
706            return required
707
708        import inspect
709        self.activate_venv(dependencies=False, debug=debug)
710        required = []
711        for name, val in inspect.getmembers(self.module):
712            if name == 'required':
713                required = val
714                break
715        self._required = required
716        self.deactivate_venv(dependencies=False, debug=debug)
717        return required
718
719
720    def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
721        """
722        Return a list of required Plugin objects.
723        """
724        from meerschaum.utils.warnings import warn
725        from meerschaum.config import get_config
726        from meerschaum.config.static import STATIC_CONFIG
727        plugins = []
728        _deps = self.get_dependencies(debug=debug)
729        sep = STATIC_CONFIG['plugins']['repo_separator']
730        plugin_names = [
731            _d[len('plugin:'):] for _d in _deps
732            if _d.startswith('plugin:') and len(_d) > len('plugin:')
733        ]
734        default_repo_keys = get_config('meerschaum', 'default_repository')
735        for _plugin_name in plugin_names:
736            if sep in _plugin_name:
737                try:
738                    _plugin_name, _repo_keys = _plugin_name.split(sep)
739                except Exception as e:
740                    _repo_keys = default_repo_keys
741                    warn(
742                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
743                        + f"Will try to use '{_repo_keys}' instead.",
744                        stack = False,
745                    )
746            else:
747                _repo_keys = default_repo_keys
748            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
749        return plugins
750
751
752    def get_required_packages(self, debug: bool=False) -> List[str]:
753        """
754        Return the required package names (excluding plugins).
755        """
756        _deps = self.get_dependencies(debug=debug)
757        return [_d for _d in _deps if not _d.startswith('plugin:')]
758
759
760    def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
761        """
762        Activate the virtual environments for the plugin and its dependencies.
763
764        Parameters
765        ----------
766        dependencies: bool, default True
767            If `True`, activate the virtual environments for required plugins.
768
769        Returns
770        -------
771        A bool indicating success.
772        """
773        from meerschaum.utils.venv import venv_target_path
774        from meerschaum.utils.packages import activate_venv
775        from meerschaum.utils.misc import make_symlink, is_symlink
776        from meerschaum.config._paths import PACKAGE_ROOT_PATH
777
778        if dependencies:
779            for plugin in self.get_required_plugins(debug=debug):
780                plugin.activate_venv(debug=debug, **kw)
781
782        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
783        venv_meerschaum_path = vtp / 'meerschaum'
784
785        try:
786            success, msg = True, "Success"
787            if is_symlink(venv_meerschaum_path):
788                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
789                    venv_meerschaum_path.unlink()
790                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
791        except Exception as e:
792            success, msg = False, str(e)
793        if not success:
794            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")
795
796        return activate_venv(self.name, debug=debug, **kw)
797
798
799    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
800        """
801        Deactivate the virtual environments for the plugin and its dependencies.
802
803        Parameters
804        ----------
805        dependencies: bool, default True
806            If `True`, deactivate the virtual environments for required plugins.
807
808        Returns
809        -------
810        A bool indicating success.
811        """
812        from meerschaum.utils.packages import deactivate_venv
813        success = deactivate_venv(self.name, debug=debug, **kw)
814        if dependencies:
815            for plugin in self.get_required_plugins(debug=debug):
816                plugin.deactivate_venv(debug=debug, **kw)
817        return success
818
819
820    def install_dependencies(
821            self,
822            force: bool = False,
823            debug: bool = False,
824        ) -> bool:
825        """
826        If specified, install dependencies.
827        
828        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
829        Meerschaum plugins from the same repository as this Plugin.
830        To install from a different repository, add the repo keys after `'@'`
831        (e.g. `'plugin:foo@api:bar'`).
832
833        Parameters
834        ----------
835        force: bool, default False
836            If `True`, continue with the installation, even if some
837            required packages fail to install.
838
839        debug: bool, default False
840            Verbosity toggle.
841
842        Returns
843        -------
844        A bool indicating success.
845
846        """
847        from meerschaum.utils.packages import pip_install, venv_contains_package
848        from meerschaum.utils.debug import dprint
849        from meerschaum.utils.warnings import warn, info
850        from meerschaum.connectors.parse import parse_repo_keys
851        _deps = self.get_dependencies(debug=debug)
852        if not _deps and self.requirements_file_path is None:
853            return True
854
855        plugins = self.get_required_plugins(debug=debug)
856        for _plugin in plugins:
857            if _plugin.name == self.name:
858                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
859                continue
860            _success, _msg = _plugin.repo_connector.install_plugin(
861                _plugin.name, debug=debug, force=force
862            )
863            if not _success:
864                warn(
865                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
866                    + f" for plugin '{self.name}':\n" + _msg,
867                    stack = False,
868                )
869                if not force:
870                    warn(
871                        "Try installing with the `--force` flag to continue anyway.",
872                        stack = False,
873                    )
874                    return False
875                info(
876                    "Continuing with installation despite the failure "
877                    + "(careful, things might be broken!)...",
878                    icon = False
879                )
880
881
882        ### First step: parse `requirements.txt` if it exists.
883        if self.requirements_file_path is not None:
884            if not pip_install(
885                requirements_file_path=self.requirements_file_path,
886                venv=self.name, debug=debug
887            ):
888                warn(
889                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
890                    stack = False,
891                )
892                if not force:
893                    warn(
894                        "Try installing with `--force` to continue anyway.",
895                        stack = False,
896                    )
897                    return False
898                info(
899                    "Continuing with installation despite the failure "
900                    + "(careful, things might be broken!)...",
901                    icon = False
902                )
903
904
905        ### Don't reinstall packages that are already included in required plugins.
906        packages = []
907        _packages = self.get_required_packages(debug=debug)
908        accounted_for_packages = set()
909        for package_name in _packages:
910            for plugin in plugins:
911                if venv_contains_package(package_name, plugin.name):
912                    accounted_for_packages.add(package_name)
913                    break
914        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
915
916        ### Attempt pip packages installation.
917        if packages:
918            for package in packages:
919                if not pip_install(package, venv=self.name, debug=debug):
920                    warn(
921                        f"Failed to install required package '{package}'"
922                        + f" for plugin '{self.name}'.",
923                        stack = False,
924                    )
925                    if not force:
926                        warn(
927                            "Try installing with `--force` to continue anyway.",
928                            stack = False,
929                        )
930                        return False
931                    info(
932                        "Continuing with installation despite the failure "
933                        + "(careful, things might be broken!)...",
934                        icon = False
935                    )
936        return True
937
938
939    @property
940    def full_name(self) -> str:
941        """
942        Include the repo keys with the plugin's name.
943        """
944        from meerschaum.config.static import STATIC_CONFIG
945        sep = STATIC_CONFIG['plugins']['repo_separator']
946        return self.name + sep + str(self.repo_connector)
947
948
949    def __str__(self):
950        return self.name
951
952
953    def __repr__(self):
954        return f"Plugin('{self.name}', repo='{self.repo_connector}')"
955
956
957    def __del__(self):
958        pass

Handle packaging of Meerschaum plugins.

Plugin( name: str, version: Optional[str] = None, user_id: Optional[int] = None, required: Optional[List[str]] = None, attributes: Optional[Dict[str, Any]] = None, archive_path: Optional[pathlib.Path] = None, venv_path: Optional[pathlib.Path] = None, repo_connector: Optional[meerschaum.connectors.APIConnector] = None, repo: Union[meerschaum.connectors.APIConnector, str, NoneType] = None)
39    def __init__(
40        self,
41        name: str,
42        version: Optional[str] = None,
43        user_id: Optional[int] = None,
44        required: Optional[List[str]] = None,
45        attributes: Optional[Dict[str, Any]] = None,
46        archive_path: Optional[pathlib.Path] = None,
47        venv_path: Optional[pathlib.Path] = None,
48        repo_connector: Optional['mrsm.connectors.api.APIConnector'] = None,
49        repo: Union['mrsm.connectors.api.APIConnector', str, None] = None,
50    ):
51        from meerschaum.config.static import STATIC_CONFIG
52        sep = STATIC_CONFIG['plugins']['repo_separator']
53        _repo = None
54        if sep in name:
55            try:
56                name, _repo = name.split(sep)
57            except Exception as e:
58                error(f"Invalid plugin name: '{name}'")
59        self._repo_in_name = _repo
60
61        if attributes is None:
62            attributes = {}
63        self.name = name
64        self.attributes = attributes
65        self.user_id = user_id
66        self._version = version
67        if required:
68            self._required = required
69        self.archive_path = (
70            archive_path if archive_path is not None
71            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
72        )
73        self.venv_path = (
74            venv_path if venv_path is not None
75            else VIRTENV_RESOURCES_PATH / self.name
76        )
77        self._repo_connector = repo_connector
78        self._repo_keys = repo
name
attributes
user_id
archive_path
venv_path
repo_connector
81    @property
82    def repo_connector(self):
83        """
84        Return the repository connector for this plugin.
85        NOTE: This imports the `connectors` module, which imports certain plugin modules.
86        """
87        if self._repo_connector is None:
88            from meerschaum.connectors.parse import parse_repo_keys
89
90            repo_keys = self._repo_keys or self._repo_in_name
91            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
92                error(
93                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
94                )
95            repo_connector = parse_repo_keys(repo_keys)
96            self._repo_connector = repo_connector
97        return self._repo_connector

Return the repository connector for this plugin. NOTE: This imports the connectors module, which imports certain plugin modules.

version
100    @property
101    def version(self):
102        """
103        Return the plugin's module version is defined (`__version__`) if it's defined.
104        """
105        if self._version is None:
106            try:
107                self._version = self.module.__version__
108            except Exception as e:
109                self._version = None
110        return self._version

Return the plugin's module version (__version__) if it's defined.

module
113    @property
114    def module(self):
115        """
116        Return the Python module of the underlying plugin.
117        """
118        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
119            if self.__file__ is None:
120                return None
121            from meerschaum.plugins import import_plugins
122            self._module = import_plugins(str(self), warn=False)
123        return self._module

Return the Python module of the underlying plugin.

requirements_file_path: Optional[pathlib.Path]
149    @property
150    def requirements_file_path(self) -> Union[pathlib.Path, None]:
151        """
152        If a file named `requirements.txt` exists, return its path.
153        """
154        if self.__file__ is None:
155            return None
156        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
157        if not path.exists():
158            return None
159        return path

If a file named requirements.txt exists, return its path.

def is_installed(self, **kw) -> bool:
162    def is_installed(self, **kw) -> bool:
163        """
164        Check whether a plugin is correctly installed.
165
166        Returns
167        -------
168        A `bool` indicating whether a plugin exists and is successfully imported.
169        """
170        return self.__file__ is not None

Check whether a plugin is correctly installed.

Returns
  • A bool indicating whether a plugin exists and is successfully imported.
def make_tar(self, debug: bool = False) -> pathlib.Path:
173    def make_tar(self, debug: bool = False) -> pathlib.Path:
174        """
175        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
176
177        Parameters
178        ----------
179        debug: bool, default False
180            Verbosity toggle.
181
182        Returns
183        -------
184        A `pathlib.Path` to the archive file's path.
185
186        """
187        import tarfile, pathlib, subprocess, fnmatch
188        from meerschaum.utils.debug import dprint
189        from meerschaum.utils.packages import attempt_import
190        pathspec = attempt_import('pathspec', debug=debug)
191
192        if not self.__file__:
193            from meerschaum.utils.warnings import error
194            error(f"Could not find file for plugin '{self}'.")
195        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
196            path = self.__file__.replace('__init__.py', '')
197            is_dir = True
198        else:
199            path = self.__file__
200            is_dir = False
201
202        old_cwd = os.getcwd()
203        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
204        os.chdir(real_parent_path)
205
206        default_patterns_to_ignore = [
207            '.pyc',
208            '__pycache__/',
209            'eggs/',
210            '__pypackages__/',
211            '.git',
212        ]
213
214        def parse_gitignore() -> 'Set[str]':
215            gitignore_path = pathlib.Path(path) / '.gitignore'
216            if not gitignore_path.exists():
217                return set(default_patterns_to_ignore)
218            with open(gitignore_path, 'r', encoding='utf-8') as f:
219                gitignore_text = f.read()
220            return set(pathspec.PathSpec.from_lines(
221                pathspec.patterns.GitWildMatchPattern,
222                default_patterns_to_ignore + gitignore_text.splitlines()
223            ).match_tree(path))
224
225        patterns_to_ignore = parse_gitignore() if is_dir else set()
226
227        if debug:
228            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
229
230        with tarfile.open(self.archive_path, 'w:gz') as tarf:
231            if not is_dir:
232                tarf.add(f"{self.name}.py")
233            else:
234                for root, dirs, files in os.walk(self.name):
235                    for f in files:
236                        good_file = True
237                        fp = os.path.join(root, f)
238                        for pattern in patterns_to_ignore:
239                            if pattern in str(fp) or f.startswith('.'):
240                                good_file = False
241                                break
242                        if good_file:
243                            if debug:
244                                dprint(f"Adding '{fp}'...")
245                            tarf.add(fp)
246
247        ### clean up and change back to old directory
248        os.chdir(old_cwd)
249
250        ### change to 775 to avoid permissions issues with the API in a Docker container
251        self.archive_path.chmod(0o775)
252
253        if debug:
254            dprint(f"Created archive '{self.archive_path}'.")
255        return self.archive_path

Compress the plugin's source files into a .tar.gz archive and return the archive's path.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pathlib.Path to the archive file's path.
def install( self, skip_deps: bool = False, force: bool = False, debug: bool = False) -> Tuple[bool, str]:
258    def install(
259            self,
260            skip_deps: bool = False,
261            force: bool = False,
262            debug: bool = False,
263        ) -> SuccessTuple:
264        """
265        Extract a plugin's tar archive to the plugins directory.
266        
267        This function checks if the plugin is already installed and if the version is equal or
268        greater than the existing installation.
269
270        Parameters
271        ----------
272        skip_deps: bool, default False
273            If `True`, do not install dependencies.
274
275        force: bool, default False
276            If `True`, continue with installation, even if required packages fail to install.
277
278        debug: bool, default False
279            Verbosity toggle.
280
281        Returns
282        -------
283        A `SuccessTuple` of success (bool) and a message (str).
284
285        """
286        if self.full_name in _ongoing_installations:
287            return True, f"Already installing plugin '{self}'."
288        _ongoing_installations.add(self.full_name)
289        from meerschaum.utils.warnings import warn, error
290        if debug:
291            from meerschaum.utils.debug import dprint
292        import tarfile
293        import re
294        import ast
295        from meerschaum.plugins import sync_plugins_symlinks
296        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
297        from meerschaum.utils.venv import init_venv
298        from meerschaum.utils.misc import safely_extract_tar
299        old_cwd = os.getcwd()
300        old_version = ''
301        new_version = ''
302        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
303        temp_dir.mkdir(exist_ok=True)
304
305        if not self.archive_path.exists():
306            return False, f"Missing archive file for plugin '{self}'."
307        if self.version is not None:
308            old_version = self.version
309            if debug:
310                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
311
312        if debug:
313            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
314
315        try:
316            with tarfile.open(self.archive_path, 'r:gz') as tarf:
317                safely_extract_tar(tarf, temp_dir)
318        except Exception as e:
319            warn(e)
320            return False, f"Failed to extract plugin '{self.name}'."
321
322        ### search for version information
323        files = os.listdir(temp_dir)
324        
325        if str(files[0]) == self.name:
326            is_dir = True
327        elif str(files[0]) == self.name + '.py':
328            is_dir = False
329        else:
330            error(f"Unknown format encountered for plugin '{self}'.")
331
332        fpath = temp_dir / files[0]
333        if is_dir:
334            fpath = fpath / '__init__.py'
335
336        init_venv(self.name, debug=debug)
337        with open(fpath, 'r', encoding='utf-8') as f:
338            init_lines = f.readlines()
339        new_version = None
340        for line in init_lines:
341            if '__version__' not in line:
342                continue
343            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
344            if not version_match:
345                continue
346            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
347            break
348        if not new_version:
349            warn(
350                f"No `__version__` defined for plugin '{self}'. "
351                + "Assuming new version...",
352                stack = False,
353            )
354
355        packaging_version = attempt_import('packaging.version')
356        try:
357            is_new_version = (not new_version and not old_version) or (
358                packaging_version.parse(old_version) < packaging_version.parse(new_version)
359            )
360            is_same_version = new_version and old_version and (
361                packaging_version.parse(old_version) == packaging_version.parse(new_version)
362            )
363        except Exception as e:
364            is_new_version, is_same_version = True, False
365
366        ### Determine where to permanently store the new plugin.
367        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
368        for path in PLUGINS_DIR_PATHS:
369            files_in_plugins_dir = os.listdir(path)
370            if (
371                self.name in files_in_plugins_dir
372                or
373                (self.name + '.py') in files_in_plugins_dir
374            ):
375                plugin_installation_dir_path = path
376                break
377
378        success_msg = (
379            f"Successfully installed plugin '{self}'"
380            + ("\n    (skipped dependencies)" if skip_deps else "")
381            + "."
382        )
383        success, abort = None, None
384
385        if is_same_version and not force:
386            success, msg = True, (
387                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
388                "    Install again with `-f` or `--force` to reinstall."
389            )
390            abort = True
391        elif is_new_version or force:
392            for src_dir, dirs, files in os.walk(temp_dir):
393                if success is not None:
394                    break
395                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
396                if not os.path.exists(dst_dir):
397                    os.mkdir(dst_dir)
398                for f in files:
399                    src_file = os.path.join(src_dir, f)
400                    dst_file = os.path.join(dst_dir, f)
401                    if os.path.exists(dst_file):
402                        os.remove(dst_file)
403
404                    if debug:
405                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
406                    try:
407                        shutil.move(src_file, dst_dir)
408                    except Exception as e:
409                        success, msg = False, (
410                            f"Failed to install plugin '{self}': " +
411                            f"Could not move file '{src_file}' to '{dst_dir}'"
412                        )
413                        print(msg)
414                        break
415            if success is None:
416                success, msg = True, success_msg
417        else:
418            success, msg = False, (
419                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
420                + f"attempted version {new_version}."
421            )
422
423        shutil.rmtree(temp_dir)
424        os.chdir(old_cwd)
425
426        ### Reload the plugin's module.
427        sync_plugins_symlinks(debug=debug)
428        if '_module' in self.__dict__:
429            del self.__dict__['_module']
430        init_venv(venv=self.name, force=True, debug=debug)
431        reload_meerschaum(debug=debug)
432
433        ### if we've already failed, return here
434        if not success or abort:
435            _ongoing_installations.remove(self.full_name)
436            return success, msg
437
438        ### attempt to install dependencies
439        dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
440        if not dependencies_installed:
441            _ongoing_installations.remove(self.full_name)
442            return False, f"Failed to install dependencies for plugin '{self}'."
443
444        ### handling success tuple, bool, or other (typically None)
445        setup_tuple = self.setup(debug=debug)
446        if isinstance(setup_tuple, tuple):
447            if not setup_tuple[0]:
448                success, msg = setup_tuple
449        elif isinstance(setup_tuple, bool):
450            if not setup_tuple:
451                success, msg = False, (
452                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
453                    f"Check `setup()` in '{self.__file__}' for more information " +
454                    f"(no error message provided)."
455                )
456            else:
457                success, msg = True, success_msg
458        elif setup_tuple is None:
459            success = True
460            msg = (
461                f"Post-install for plugin '{self}' returned None. " +
462                f"Assuming plugin successfully installed."
463            )
464            warn(msg)
465        else:
466            success = False
467            msg = (
468                f"Post-install for plugin '{self}' returned unexpected value " +
469                f"of type '{type(setup_tuple)}': {setup_tuple}"
470            )
471
472        _ongoing_installations.remove(self.full_name)
473        module = self.module
474        return success, msg

Extract a plugin's tar archive to the plugins directory.

This function checks if the plugin is already installed and if the version is equal or greater than the existing installation.

Parameters
  • skip_deps (bool, default False): If True, do not install dependencies.
  • force (bool, default False): If True, continue with installation, even if required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success (bool) and a message (str).
def remove_archive(self, debug: bool = False) -> Tuple[bool, str]:
477    def remove_archive(
478        self,        
479        debug: bool = False
480    ) -> SuccessTuple:
481        """Remove a plugin's archive file."""
482        if not self.archive_path.exists():
483            return True, f"Archive file for plugin '{self}' does not exist."
484        try:
485            self.archive_path.unlink()
486        except Exception as e:
487            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
488        return True, "Success"

Remove a plugin's archive file.

def remove_venv(self, debug: bool = False) -> Tuple[bool, str]:
491    def remove_venv(
492        self,        
493        debug: bool = False
494    ) -> SuccessTuple:
495        """Remove a plugin's virtual environment."""
496        if not self.venv_path.exists():
497            return True, f"Virtual environment for plugin '{self}' does not exist."
498        try:
499            shutil.rmtree(self.venv_path)
500        except Exception as e:
501            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
502        return True, "Success"

Remove a plugin's virtual environment.

def uninstall(self, debug: bool = False) -> Tuple[bool, str]:
505    def uninstall(self, debug: bool = False) -> SuccessTuple:
506        """
507        Remove a plugin, its virtual environment, and archive file.
508        """
509        from meerschaum.utils.packages import reload_meerschaum
510        from meerschaum.plugins import sync_plugins_symlinks
511        from meerschaum.utils.warnings import warn, info
512        warnings_thrown_count: int = 0
513        max_warnings: int = 3
514
515        if not self.is_installed():
516            info(
517                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
518                + "Checking for artifacts...",
519                stack = False,
520            )
521        else:
522            real_path = pathlib.Path(os.path.realpath(self.__file__))
523            try:
524                if real_path.name == '__init__.py':
525                    shutil.rmtree(real_path.parent)
526                else:
527                    real_path.unlink()
528            except Exception as e:
529                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
530                warnings_thrown_count += 1
531            else:
532                info(f"Removed source files for plugin '{self.name}'.")
533
534        if self.venv_path.exists():
535            success, msg = self.remove_venv(debug=debug)
536            if not success:
537                warn(msg, stack=False)
538                warnings_thrown_count += 1
539            else:
540                info(f"Removed virtual environment from plugin '{self.name}'.")
541
542        success = warnings_thrown_count < max_warnings
543        sync_plugins_symlinks(debug=debug)
544        self.deactivate_venv(force=True, debug=debug)
545        reload_meerschaum(debug=debug)
546        return success, (
547            f"Successfully uninstalled plugin '{self}'." if success
548            else f"Failed to uninstall plugin '{self}'."
549        )

Remove a plugin, its virtual environment, and archive file.

def setup( self, *args: str, debug: bool = False, **kw: Any) -> Union[Tuple[bool, str], bool]:
552    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
553        """
554        If exists, run the plugin's `setup()` function.
555
556        Parameters
557        ----------
558        *args: str
559            The positional arguments passed to the `setup()` function.
560            
561        debug: bool, default False
562            Verbosity toggle.
563
564        **kw: Any
565            The keyword arguments passed to the `setup()` function.
566
567        Returns
568        -------
569        A `SuccessTuple` or `bool` indicating success.
570
571        """
572        from meerschaum.utils.debug import dprint
573        import inspect
574        _setup = None
575        for name, fp in inspect.getmembers(self.module):
576            if name == 'setup' and inspect.isfunction(fp):
577                _setup = fp
578                break
579
580        ### assume success if no setup() is found (not necessary)
581        if _setup is None:
582            return True
583
584        sig = inspect.signature(_setup)
585        has_debug, has_kw = ('debug' in sig.parameters), False
586        for k, v in sig.parameters.items():
587            if '**' in str(v):
588                has_kw = True
589                break
590
591        _kw = {}
592        if has_kw:
593            _kw.update(kw)
594        if has_debug:
595            _kw['debug'] = debug
596
597        if debug:
598            dprint(f"Running setup for plugin '{self}'...")
599        try:
600            self.activate_venv(debug=debug)
601            return_tuple = _setup(*args, **_kw)
602            self.deactivate_venv(debug=debug)
603        except Exception as e:
604            return False, str(e)
605
606        if isinstance(return_tuple, tuple):
607            return return_tuple
608        if isinstance(return_tuple, bool):
609            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
610        if return_tuple is None:
611            return False, f"Setup for Plugin '{self.name}' returned None."
612        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"

If exists, run the plugin's setup() function.

Parameters
  • *args (str): The positional arguments passed to the setup() function.
  • debug (bool, default False): Verbosity toggle.
  • **kw (Any): The keyword arguments passed to the setup() function.
Returns
  • A SuccessTuple or bool indicating success.
def get_dependencies(self, debug: bool = False) -> List[str]:
615    def get_dependencies(
616        self,
617        debug: bool = False,
618    ) -> List[str]:
619        """
620        If the Plugin has specified dependencies in a list called `required`, return the list.
621        
622        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
623        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
624
625        Parameters
626        ----------
627        debug: bool, default False
628            Verbosity toggle.
629
630        Returns
631        -------
632        A list of required packages and plugins (str).
633
634        """
635        if '_required' in self.__dict__:
636            return self._required
637
638        ### If the plugin has not yet been imported,
639        ### infer the dependencies from the source text.
640        ### This is not super robust, and it doesn't feel right
641        ### having multiple versions of the logic.
642        ### This is necessary when determining the activation order
643        ### without having import the module.
644        ### For consistency's sake, the module-less method does not cache the requirements.
645        if self.__dict__.get('_module', None) is None:
646            file_path = self.__file__
647            if file_path is None:
648                return []
649            with open(file_path, 'r', encoding='utf-8') as f:
650                text = f.read()
651
652            if 'required' not in text:
653                return []
654
655            ### This has some limitations:
656            ### It relies on `required` being manually declared.
657            ### We lose the ability to dynamically alter the `required` list,
658            ### which is why we've kept the module-reliant method below.
659            import ast, re
660            ### NOTE: This technically would break 
661            ### if `required` was the very first line of the file.
662            req_start_match = re.search(r'\nrequired(:\s*)?.*=', text)
663            if not req_start_match:
664                return []
665            req_start = req_start_match.start()
666            equals_sign = req_start + text[req_start:].find('=')
667
668            ### Dependencies may have brackets within the strings, so push back the index.
669            first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[')
670            if first_opening_brace == -1:
671                return []
672
673            next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']')
674            if next_closing_brace == -1:
675                return []
676
677            start_ix = first_opening_brace + 1
678            end_ix = next_closing_brace
679
680            num_braces = 0
681            while True:
682                if '[' not in text[start_ix:end_ix]:
683                    break
684                num_braces += 1
685                start_ix = end_ix
686                end_ix += text[end_ix + 1:].find(']') + 1
687
688            req_end = end_ix + 1
689            req_text = (
690                text[(first_opening_brace-1):req_end]
691                .lstrip()
692                .replace('=', '', 1)
693                .lstrip()
694                .rstrip()
695            )
696            try:
697                required = ast.literal_eval(req_text)
698            except Exception as e:
699                warn(
700                    f"Unable to determine requirements for plugin '{self.name}' "
701                    + "without importing the module.\n"
702                    + "    This may be due to dynamically setting the global `required` list.\n"
703                    + f"    {e}"
704                )
705                return []
706            return required
707
708        import inspect
709        self.activate_venv(dependencies=False, debug=debug)
710        required = []
711        for name, val in inspect.getmembers(self.module):
712            if name == 'required':
713                required = val
714                break
715        self._required = required
716        self.deactivate_venv(dependencies=False, debug=debug)
717        return required

If the Plugin has specified dependencies in a list called required, return the list.

NOTE: Dependencies which start with 'plugin:' are Meerschaum plugins, not pip packages. Meerschaum plugins may also specify connector keys for a repo after '@'.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of required packages and plugins (str).
def get_required_plugins(self, debug: bool = False) -> List[Plugin]:
720    def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
721        """
722        Return a list of required Plugin objects.
723        """
724        from meerschaum.utils.warnings import warn
725        from meerschaum.config import get_config
726        from meerschaum.config.static import STATIC_CONFIG
727        plugins = []
728        _deps = self.get_dependencies(debug=debug)
729        sep = STATIC_CONFIG['plugins']['repo_separator']
730        plugin_names = [
731            _d[len('plugin:'):] for _d in _deps
732            if _d.startswith('plugin:') and len(_d) > len('plugin:')
733        ]
734        default_repo_keys = get_config('meerschaum', 'default_repository')
735        for _plugin_name in plugin_names:
736            if sep in _plugin_name:
737                try:
738                    _plugin_name, _repo_keys = _plugin_name.split(sep)
739                except Exception as e:
740                    _repo_keys = default_repo_keys
741                    warn(
742                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
743                        + f"Will try to use '{_repo_keys}' instead.",
744                        stack = False,
745                    )
746            else:
747                _repo_keys = default_repo_keys
748            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
749        return plugins

Return a list of required Plugin objects.

def get_required_packages(self, debug: bool = False) -> List[str]:
752    def get_required_packages(self, debug: bool=False) -> List[str]:
753        """
754        Return the required package names (excluding plugins).
755        """
756        _deps = self.get_dependencies(debug=debug)
757        return [_d for _d in _deps if not _d.startswith('plugin:')]

Return the required package names (excluding plugins).

def activate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
760    def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
761        """
762        Activate the virtual environments for the plugin and its dependencies.
763
764        Parameters
765        ----------
766        dependencies: bool, default True
767            If `True`, activate the virtual environments for required plugins.
768
769        Returns
770        -------
771        A bool indicating success.
772        """
773        from meerschaum.utils.venv import venv_target_path
774        from meerschaum.utils.packages import activate_venv
775        from meerschaum.utils.misc import make_symlink, is_symlink
776        from meerschaum.config._paths import PACKAGE_ROOT_PATH
777
778        if dependencies:
779            for plugin in self.get_required_plugins(debug=debug):
780                plugin.activate_venv(debug=debug, **kw)
781
782        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
783        venv_meerschaum_path = vtp / 'meerschaum'
784
785        try:
786            success, msg = True, "Success"
787            if is_symlink(venv_meerschaum_path):
788                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
789                    venv_meerschaum_path.unlink()
790                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
791        except Exception as e:
792            success, msg = False, str(e)
793        if not success:
794            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")
795
796        return activate_venv(self.name, debug=debug, **kw)

Activate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, activate the virtual environments for required plugins.
Returns
  • A bool indicating success.
def deactivate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
799    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
800        """
801        Deactivate the virtual environments for the plugin and its dependencies.
802
803        Parameters
804        ----------
805        dependencies: bool, default True
806            If `True`, deactivate the virtual environments for required plugins.
807
808        Returns
809        -------
810        A bool indicating success.
811        """
812        from meerschaum.utils.packages import deactivate_venv
813        success = deactivate_venv(self.name, debug=debug, **kw)
814        if dependencies:
815            for plugin in self.get_required_plugins(debug=debug):
816                plugin.deactivate_venv(debug=debug, **kw)
817        return success

Deactivate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, deactivate the virtual environments for required plugins.
Returns
  • A bool indicating success.
def install_dependencies(self, force: bool = False, debug: bool = False) -> bool:
820    def install_dependencies(
821            self,
822            force: bool = False,
823            debug: bool = False,
824        ) -> bool:
825        """
826        If specified, install dependencies.
827        
828        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
829        Meerschaum plugins from the same repository as this Plugin.
830        To install from a different repository, add the repo keys after `'@'`
831        (e.g. `'plugin:foo@api:bar'`).
832
833        Parameters
834        ----------
835        force: bool, default False
836            If `True`, continue with the installation, even if some
837            required packages fail to install.
838
839        debug: bool, default False
840            Verbosity toggle.
841
842        Returns
843        -------
844        A bool indicating success.
845
846        """
847        from meerschaum.utils.packages import pip_install, venv_contains_package
848        from meerschaum.utils.debug import dprint
849        from meerschaum.utils.warnings import warn, info
850        from meerschaum.connectors.parse import parse_repo_keys
851        _deps = self.get_dependencies(debug=debug)
852        if not _deps and self.requirements_file_path is None:
853            return True
854
855        plugins = self.get_required_plugins(debug=debug)
856        for _plugin in plugins:
857            if _plugin.name == self.name:
858                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
859                continue
860            _success, _msg = _plugin.repo_connector.install_plugin(
861                _plugin.name, debug=debug, force=force
862            )
863            if not _success:
864                warn(
865                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
866                    + f" for plugin '{self.name}':\n" + _msg,
867                    stack = False,
868                )
869                if not force:
870                    warn(
871                        "Try installing with the `--force` flag to continue anyway.",
872                        stack = False,
873                    )
874                    return False
875                info(
876                    "Continuing with installation despite the failure "
877                    + "(careful, things might be broken!)...",
878                    icon = False
879                )
880
881
882        ### First step: parse `requirements.txt` if it exists.
883        if self.requirements_file_path is not None:
884            if not pip_install(
885                requirements_file_path=self.requirements_file_path,
886                venv=self.name, debug=debug
887            ):
888                warn(
889                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
890                    stack = False,
891                )
892                if not force:
893                    warn(
894                        "Try installing with `--force` to continue anyway.",
895                        stack = False,
896                    )
897                    return False
898                info(
899                    "Continuing with installation despite the failure "
900                    + "(careful, things might be broken!)...",
901                    icon = False
902                )
903
904
905        ### Don't reinstall packages that are already included in required plugins.
906        packages = []
907        _packages = self.get_required_packages(debug=debug)
908        accounted_for_packages = set()
909        for package_name in _packages:
910            for plugin in plugins:
911                if venv_contains_package(package_name, plugin.name):
912                    accounted_for_packages.add(package_name)
913                    break
914        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
915
916        ### Attempt pip packages installation.
917        if packages:
918            for package in packages:
919                if not pip_install(package, venv=self.name, debug=debug):
920                    warn(
921                        f"Failed to install required package '{package}'"
922                        + f" for plugin '{self.name}'.",
923                        stack = False,
924                    )
925                    if not force:
926                        warn(
927                            "Try installing with `--force` to continue anyway.",
928                            stack = False,
929                        )
930                        return False
931                    info(
932                        "Continuing with installation despite the failure "
933                        + "(careful, things might be broken!)...",
934                        icon = False
935                    )
936        return True

If specified, install dependencies.

NOTE: Dependencies that start with 'plugin:' will be installed as Meerschaum plugins from the same repository as this Plugin. To install from a different repository, add the repo keys after '@' (e.g. 'plugin:foo@api:bar').

Parameters
  • force (bool, default False): If True, continue with the installation, even if some required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating success.
full_name: str
939    @property
940    def full_name(self) -> str:
941        """
942        Include the repo keys with the plugin's name.
943        """
944        from meerschaum.config.static import STATIC_CONFIG
945        sep = STATIC_CONFIG['plugins']['repo_separator']
946        return self.name + sep + str(self.repo_connector)

Include the repo keys with the plugin's name.

def make_action( function: Callable[[Any], Any], shell: bool = False, activate: bool = True, deactivate: bool = True, debug: bool = False) -> Callable[[Any], Any]:
def make_action(
    function: Callable[[Any], Any],
    shell: bool = False,
    activate: bool = True,
    deactivate: bool = True,
    debug: bool = False
) -> Callable[[Any], Any]:
    """
    Register a function as a Meerschaum action under the function's own name.
    Useful for plugins that are adding multiple actions.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to become a Meerschaum action. Must accept all keyword arguments.

    shell: bool, default False
        Not used.

    activate: bool, default True
        Not referenced by this function's body.

    deactivate: bool, default True
        Not referenced by this function's body.

    debug: bool, default False
        If `True`, print which action is being added and from which plugin.

    Returns
    -------
    The same function that was passed in (this is a decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import make_action
    >>>
    >>> @make_action
    ... def my_action(**kw):
    ...     print('foo')
    ...     return True, "Success"
    >>>
    """

    from meerschaum.actions import actions
    from meerschaum.utils.formatting import pprint

    ### The name of the defining module reveals whether the function lives in a plugin.
    module_name = function.__globals__['__name__']
    plugin = None
    if module_name.startswith('plugins.'):
        plugin_name = module_name.split('.')[1]
        if plugin_name:
            plugin = Plugin(plugin_name)

    if debug:
        from meerschaum.utils.debug import dprint
        dprint(
            f"Adding action '{function.__name__}' from plugin " +
            f"'{plugin}'..."
        )

    actions[function.__name__] = function
    return function

Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.

Parameters
  • function (Callable[[Any], Any]): The function to become a Meerschaum action. Must accept all keyword arguments.
  • shell (bool, default False): Not used.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import make_action
>>>
>>> @make_action
... def my_action(**kw):
...     print('foo')
...     return True, "Success"
>>>
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Register a function to be executed when initializing the Meerschaum API module.
    Useful for lazy-loading heavy plugins only when the API is started,
    such as when editing the `meerschaum.api.app` FastAPI app.

    The FastAPI app will be passed as the only parameter.

    The function is stored in `_api_plugins` under its module's name
    and is returned unchanged.

    Examples
    --------
    >>> from meerschaum.plugins import api_plugin
    >>>
    >>> @api_plugin
    ... def initialize_plugin(app):
    ...     @app.get('/my/new/path')
    ...     def new_path():
    ...         return {'message': 'It works!'}
    >>>
    """
    with _locks['_api_plugins']:
        try:
            _api_plugins.setdefault(function.__module__, []).append(function)
        except Exception as e:
            ### Registration is best-effort: warn instead of raising.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function

Execute the function when initializing the Meerschaum API module. Useful for lazy-loading heavy plugins only when the API is started, such as when editing the meerschaum.api.app FastAPI app.

The FastAPI app will be passed as the only parameter.

Examples
>>> from meerschaum.plugins import api_plugin
>>>
>>> @api_plugin
>>> def initialize_plugin(app):
...     @app.get('/my/new/path')
...     def new_path():
...         return {'message': 'It works!'}
>>>
def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Register a function to be executed when starting the Dash application.

    The function is stored in `_dash_plugins` under its module's name
    and is returned unchanged.
    """
    with _locks['_dash_plugins']:
        try:
            _dash_plugins.setdefault(function.__module__, []).append(function)
        except Exception as e:
            ### Registration is best-effort: warn instead of raising.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function

Execute the function when starting the Dash application.

def web_page( page: Union[str, NoneType, Callable[[Any], Any]] = None, login_required: bool = True, **kwargs) -> Any:
def web_page(
        page: Union[str, None, Callable[[Any], Any]] = None,
        login_required: bool = True,
        **kwargs
    ) -> Any:
    """
    Quickly add pages to the dash application.

    May be used bare (`@web_page`) or with arguments (`@web_page('foo/bar')`).
    The page function is recorded in `_plugin_endpoints_to_pages` under the
    endpoint, with surrounding slashes and whitespace removed.

    Parameters
    ----------
    page: Union[str, None, Callable[[Any], Any]], default None
        The endpoint for the page, or the page function itself when used bare.
        If no endpoint is given, the function's name is used.

    login_required: bool, default True
        Stored alongside the page function under the key `'login_required'`.

    Returns
    -------
    The wrapped function when used bare, otherwise a decorator.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.plugins import web_page
    >>> html = mrsm.attempt_import('dash.html')
    >>>
    >>> @web_page('foo/bar', login_required=False)
    ... def foo_bar():
    ...     return html.Div([html.H1("Hello, World!")])
    >>>
    """
    page_str = None

    def _decorator(_func: Callable[[Any], Any]) -> Callable[[Any], Any]:
        nonlocal page_str

        @functools.wraps(_func)
        def wrapper(*_args, **_kwargs):
            return _func(*_args, **_kwargs)

        ### Fall back to the function's name when no endpoint was given.
        endpoint = _func.__name__ if page_str is None else page_str
        page_str = endpoint.strip('/').strip()
        _plugin_endpoints_to_pages[page_str] = {
            'function': _func,
            'login_required': login_required,
        }
        return wrapper

    if not callable(page):
        page_str = page
        return _decorator

    ### Used as a bare decorator: register the function right away.
    registered = _decorator(page)
    page_str = page.__name__
    return registered

Quickly add pages to the dash application.

Examples
>>> import meerschaum as mrsm
>>> from meerschaum.plugins import web_page
>>> html = mrsm.attempt_import('dash.html')
>>> 
>>> @web_page('foo/bar', login_required=False)
>>> def foo_bar():
...     return html.Div([html.H1("Hello, World!")])
>>>
def import_plugins( *plugins_to_import: Union[str, List[str], NoneType], warn: bool = True) -> "Union['ModuleType', Tuple['ModuleType', None]]":
def import_plugins(
    *plugins_to_import: Union[str, List[str], None],
    warn: bool = True,
) -> Union[
    'ModuleType', Tuple['ModuleType', None]
]:
    """
    Import the Meerschaum plugins directory.

    Parameters
    ----------
    plugins_to_import: Union[str, List[str], None]
        If provided, only import the specified plugins.
        Otherwise import the entire plugins module. May be a string, list, or `None`.
        Defaults to `None`.

    warn: bool, default True
        If `True`, emit a warning when the entire plugins module fails to import.

    Returns
    -------
    A module or a tuple of modules, depending on the number of plugins provided.
    A plugin which fails to import is represented by `None`.

    """
    import sys
    import importlib
    from meerschaum.utils.misc import flatten_list
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
    from meerschaum.utils.warnings import warn as _warn
    plugins_to_import = list(plugins_to_import)
    plugins_parent_path = str(PLUGINS_RESOURCES_PATH.parent)
    with _locks['sys.path']:

        ### Since plugins may depend on other plugins,
        ### we need to activate the virtual environments for library plugins.
        ### This logic exists in `Plugin.activate_venv()`,
        ### but that code requires the plugin's module to already be imported.
        ### It's not a guarantee of correct activation order.
        plugins_names = get_plugins_names()

        ### Remember the *names* of venvs which were active before this call
        ### so that they are left active afterwards.
        already_active_venvs = {
            plugin_name
            for plugin_name in plugins_names
            if is_venv_active(plugin_name)
        }

        if not sys.path or sys.path[0] != plugins_parent_path:
            sys.path.insert(0, plugins_parent_path)

        try:
            if not plugins_to_import:
                for plugin_name in plugins_names:
                    activate_venv(plugin_name)
                try:
                    imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem)
                except ImportError as e:
                    _warn(f"Failed to import the plugins module:\n    {e}")
                    import traceback
                    traceback.print_exc()
                    imported_plugins = None
                for plugin_name in plugins_names:
                    if plugin_name in already_active_venvs:
                        continue
                    deactivate_venv(plugin_name)

            else:
                imported_plugins = []
                for plugin_name in flatten_list(plugins_to_import):
                    plugin = Plugin(plugin_name)
                    try:
                        with Venv(plugin):
                            imported_plugins.append(
                                importlib.import_module(
                                    f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
                                )
                            )
                    except Exception as e:
                        _warn(
                            f"Failed to import plugin '{plugin_name}':\n    "
                            + f"{e}\n\nHere's a stacktrace:",
                            stack = False,
                        )
                        from meerschaum.utils.formatting import get_console
                        get_console().print_exception(
                            suppress = [
                                'meerschaum/plugins/__init__.py',
                                importlib,
                                importlib._bootstrap,
                            ]
                        )
                        ### Keep the positions aligned with the requested plugins.
                        imported_plugins.append(None)

            if imported_plugins is None and warn:
                _warn("Failed to import plugins.", stacklevel=3)

        finally:
            ### Always restore `sys.path`, even if an unexpected exception escapes.
            if plugins_parent_path in sys.path:
                sys.path.remove(plugins_parent_path)

    if isinstance(imported_plugins, list):
        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
    return imported_plugins

Import the Meerschaum plugins directory.

Parameters
  • plugins_to_import (Union[str, List[str], None]): If provided, only import the specified plugins. Otherwise import the entire plugins module. May be a string, list, or None. Defaults to None.
Returns
  • A module or a tuple of modules, depending on the number of plugins provided.
def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any:
    """
    Emulate the `from module import x` behavior for a plugin's (sub)module.

    Parameters
    ----------
    plugin_import_name: str
        The import name of the plugin's module.
        Separate submodules with '.' (e.g. 'compose.utils.pipes').
        A leading `'plugins.'` is ignored.

    attrs: str
        Names of the attributes to return. At least one is required.

    Returns
    -------
    Objects from a plugin's submodule.
    If multiple objects are provided, return a tuple.
    Attributes which cannot be accessed are returned as `None`,
    and `None` is returned if the plugin's module or the submodule cannot be imported.

    Raises
    ------
    A `ValueError` if no attribute names are given.

    Examples
    --------
    >>> init = from_plugin_import('compose.utils', 'init')
    >>> with mrsm.Venv('compose'):
    ...     cf = init()
    >>> build_parent_pipe, get_defined_pipes = from_plugin_import(
    ...     'compose.utils.pipes',
    ...     'build_parent_pipe',
    ...     'get_defined_pipes',
    ... )
    >>> parent_pipe = build_parent_pipe(cf)
    >>> defined_pipes = get_defined_pipes(cf)
    """
    import importlib
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.warnings import warn as _warn
    if plugin_import_name.startswith('plugins.'):
        plugin_import_name = plugin_import_name[len('plugins.'):]
    name_parts = plugin_import_name.split('.')
    plugin = mrsm.Plugin(name_parts[0])

    submodule_import_name = '.'.join([PLUGINS_RESOURCES_PATH.stem, *name_parts])
    if not attrs:
        raise ValueError(f"Provide which attributes to return from '{submodule_import_name}'.")

    ### The import happens inside the plugin's virtual environment.
    with mrsm.Venv(plugin):
        if plugin.module is None:
            return None

        try:
            submodule = importlib.import_module(submodule_import_name)
        except ImportError as e:
            _warn(
                f"Failed to import plugin '{submodule_import_name}':\n    "
                + f"{e}\n\nHere's a stacktrace:",
                stack=False,
            )
            from meerschaum.utils.formatting import get_console
            get_console().print_exception(
                suppress=[
                    'meerschaum/plugins/__init__.py',
                    importlib,
                    importlib._bootstrap,
                ]
            )
            return None

        found = []
        for attr in attrs:
            try:
                found.append(getattr(submodule, attr))
            except Exception:
                _warn(f"Failed to access '{attr}' from '{submodule_import_name}'.")
                found.append(None)

        return found[0] if len(attrs) == 1 else tuple(found)

Emulate the from module import x behavior.

Parameters
  • plugin_import_name (str): The import name of the plugin's module. Separate submodules with '.' (e.g. 'compose.utils.pipes')
  • attrs (str): Names of the attributes to return.
Returns
  • Objects from a plugin's submodule.
  • If multiple objects are provided, return a tuple.
Examples
>>> init = from_plugin_import('compose.utils', 'init')
>>> with mrsm.Venv('compose'):
...     cf = init()
>>> build_parent_pipe, get_defined_pipes = from_plugin_import(
...     'compose.utils.pipes',
...     'build_parent_pipe',
...     'get_defined_pipes',
... )
>>> parent_pipe = build_parent_pipe(cf)
>>> defined_pipes = get_defined_pipes(cf)
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
    """
    Reload plugins back into memory.

    Each plugin's `'plugins.<name>'` entry is removed from `sys.modules`,
    then `load_plugins()` is called.

    Parameters
    ----------
    plugins: Optional[List[str]], default None
        The plugins to reload. `None` (or an empty list) will reload all plugins.

    debug: bool, default False
        Verbosity toggle.

    """
    import sys
    if debug:
        from meerschaum.utils.debug import dprint

    names_to_reload = plugins if plugins else get_plugins_names()
    for plugin_name in names_to_reload:
        if debug:
            dprint(f"Reloading plugin '{plugin_name}'...")
        ### Drop the cached module (if any) before `load_plugins()` runs.
        sys.modules.pop('plugins.' + str(plugin_name), None)
    load_plugins(debug=debug)

Reload plugins back into memory.

Parameters
  • plugins (Optional[List[str]], default None): The plugins to reload. None will reload all plugins.
def get_plugins( *to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
    """
    Return the installed `Plugin` objects.

    Parameters
    ----------
    to_load:
        If specified, only load specific plugins.
        Otherwise return all plugins found in the plugins resources directory.

    try_import: bool, default True
        If `True`, allow for plugins to be imported.

    Returns
    -------
    A single `Plugin` if exactly one name was given,
    otherwise a tuple of the installed `Plugin` objects.

    Raises
    ------
    A `ValueError` if exactly one name was given and that plugin is not installed.
    """
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    import os
    sync_plugins_symlinks()

    if to_load:
        names = to_load
    else:
        names = []
        for entry in os.listdir(PLUGINS_RESOURCES_PATH):
            if entry == '__init__.py':
                continue
            is_package = (PLUGINS_RESOURCES_PATH / entry).is_dir()
            ### Non-directories have their last three characters dropped
            ### (presumably the '.py' extension).
            names.append(entry if is_package else entry[:-3])

    candidates = [Plugin(name) for name in names]
    installed = tuple(
        candidate for candidate in candidates
        if candidate.is_installed(try_import=try_import)
    )
    if len(to_load) != 1:
        return installed
    if not installed:
        raise ValueError(f"Plugin '{to_load[0]}' is not installed.")
    return installed[0]

Return a list of Plugin objects.

Parameters
  • to_load: If specified, only load specific plugins. Otherwise return all plugins.
  • try_import (bool, default True): If True, allow for plugins to be imported.
def get_data_plugins() -> List[Plugin]:
def get_data_plugins() -> List[Plugin]:
    """
    Only return the plugins whose modules define a `fetch()` or `sync()` function.

    Returns
    -------
    A list of `Plugin` objects. Each plugin appears at most once,
    even if its module defines both `fetch()` and `sync()`.
    """
    import inspect
    data_names = {'sync', 'fetch'}
    data_plugins = []
    for plugin in get_plugins():
        ### `any()` stops at the first match, so a plugin is never added twice.
        is_data_plugin = any(
            name in data_names and inspect.isfunction(ob)
            for name, ob in inspect.getmembers(plugin.module)
        )
        if is_data_plugin:
            data_plugins.append(plugin)
    return data_plugins

Only return the plugins whose modules define either fetch() or sync() functions.

def add_plugin_argument(*args, **kwargs) -> None:
def add_plugin_argument(*args, **kwargs) -> None:
    """
    Add argparse arguments under a plugin-specific options group.
    Takes the same parameters as the regular argparse `add_argument()` function.

    The group is titled after the calling plugin when one is detected,
    otherwise `'Custom options'`. The same positional arguments are only
    added once per group, and any failure is emitted as a warning.

    Examples
    --------
    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
    >>>
    """
    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
    from meerschaum.utils.warnings import warn, error

    ### NOTE(review): the `2` is presumably a stack depth,
    ### so keep this call directly in this function's body; confirm against `_get_parent_plugin`.
    parent_plugin_name = _get_parent_plugin(2)
    if parent_plugin_name:
        title = f"Plugin '{parent_plugin_name}' options"
    else:
        title = 'Custom options'

    group_key = 'plugin_' + (parent_plugin_name or '')
    if group_key not in groups:
        groups[group_key] = parser.add_argument_group(
            title = title,
        )
        _seen_plugin_args[group_key] = set()

    try:
        args_key = str(args)
        if args_key not in _seen_plugin_args[group_key]:
            groups[group_key].add_argument(*args, **kwargs)
            _seen_plugin_args[group_key].add(args_key)
    except Exception as e:
        warn(e)

Add argparse arguments under the 'Plugins options' group. Takes the same parameters as the regular argparse add_argument() function.

Examples
>>> add_plugin_argument('--foo', type=int, help="This is my help text!")
>>>
def pre_sync_hook(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
 94def pre_sync_hook(
 95        function: Callable[[Any], Any],
 96    ) -> Callable[[Any], Any]:
 97    """
 98    Register a function as a sync hook to be executed right before sync.
 99    
100    Parameters
101    ----------
102    function: Callable[[Any], Any]
103        The function to execute right before a sync.
104        
105    Returns
106    -------
107    Another function (this is a decorator function).
108
109    Examples
110    --------
111    >>> from meerschaum.plugins import pre_sync_hook
112    >>>
113    >>> @pre_sync_hook
114    ... def log_sync(pipe, **kwargs):
115    ...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
116    >>>
117    """
118    with _locks['_pre_sync_hooks']:
119        try:
120            if function.__module__ not in _pre_sync_hooks:
121                _pre_sync_hooks[function.__module__] = []
122            _pre_sync_hooks[function.__module__].append(function)
123        except Exception as e:
124            from meerschaum.utils.warnings import warn
125            warn(e)
126    return function

Register a function as a sync hook to be executed right before sync.

Parameters
  • function (Callable[[Any], Any]): The function to execute right before a sync.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import pre_sync_hook
>>>
>>> @pre_sync_hook
... def log_sync(pipe, **kwargs):
...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
>>>

def post_sync_hook(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
def post_sync_hook(
        function: Callable[[Any], Any],
    ) -> Callable[[Any], Any]:
    """
    Register a function as a sync hook to be executed upon completion of a sync.

    The function is stored in `_post_sync_hooks` under its module's name
    and is returned unchanged.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to execute upon completion of a sync.

    Returns
    -------
    Another function (this is a decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import post_sync_hook
    >>>
    >>> @post_sync_hook
    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
    ...     print(f"It took {round(duration, 2)} seconds to sync {pipe}.")
    >>>
    """
    with _locks['_post_sync_hooks']:
        try:
            _post_sync_hooks.setdefault(function.__module__, []).append(function)
        except Exception as e:
            ### Registration is best-effort: warn instead of raising.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function

Register a function as a sync hook to be executed upon completion of a sync.

Parameters
  • function (Callable[[Any], Any]): The function to execute upon completion of a sync.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import post_sync_hook
>>>
>>> @post_sync_hook
... def log_sync(pipe, success_tuple, duration=None, **kwargs):
...     print(f"It took {round(duration, 2)} seconds to sync {pipe}.")
>>>