meerschaum.plugins

Expose plugin management APIs from the meerschaum.plugins module.

  1#! /usr/bin/env python
  2# -*- coding: utf-8 -*-
  3# vim:fenc=utf-8
  4
  5"""
  6Expose plugin management APIs from the `meerschaum.plugins` module.
  7"""
  8
  9from __future__ import annotations
 10from meerschaum.utils.typing import Callable, Any, Union, Optional, Dict, List, Tuple
 11from meerschaum.utils.threading import Lock, RLock
 12from meerschaum.plugins._Plugin import Plugin
 13
### Callbacks registered through `api_plugin`, keyed by the registering function's module name.
_api_plugins: Dict[str, List[Callable[['fastapi.App'], Any]]] = {}
### Callbacks registered through `pre_sync_hook`, keyed by the function's module name.
_pre_sync_hooks: Dict[str, List[Callable[[Any], Any]]] = {}
### Callbacks registered through `post_sync_hook`, keyed by the function's module name.
_post_sync_hooks: Dict[str, List[Callable[[Any], Any]]] = {}
### One re-entrant lock per piece of shared state touched by this module
### (the registries above, `__path__`, `sys.path`, the symlinks flag, and the lock file).
_locks = {
    '_api_plugins': RLock(),
    '_pre_sync_hooks': RLock(),
    '_post_sync_hooks': RLock(),
    '__path__': RLock(),
    'sys.path': RLock(),
    'internal_plugins': RLock(),
    '_synced_symlinks': RLock(),
    'PLUGINS_INTERNAL_LOCK_PATH': RLock(),
}
### Names exported as this module's public API.
__all__ = (
    "Plugin", "make_action", "api_plugin", "import_plugins",
    "reload_plugins", "get_plugins", "get_data_plugins", "add_plugin_argument",
    "pre_sync_hook", "post_sync_hook",
)
### pdoc overrides; NOTE(review): `False` presumably hides these names from
### the generated documentation -- confirm against the pdoc version in use.
__pdoc__ = {
    'venvs': False, 'data': False, 'stack': False, 'plugins': False,
}
 35
 36def make_action(
 37        function: Callable[[Any], Any],
 38        shell: bool = False,
 39        activate: bool = True,
 40        deactivate: bool = True,
 41        debug: bool = False
 42    ) -> Callable[[Any], Any]:
 43    """
 44    Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.
 45    
 46    Parameters
 47    ----------
 48    function: Callable[[Any], Any]
 49        The function to become a Meerschaum action. Must accept all keyword arguments.
 50        
 51    shell: bool, default False
 52        Not used.
 53        
 54    Returns
 55    -------
 56    Another function (this is a decorator function).
 57
 58    Examples
 59    --------
 60    >>> from meerschaum.plugins import make_action
 61    >>>
 62    >>> @make_action
 63    ... def my_action(**kw):
 64    ...     print('foo')
 65    ...     return True, "Success"
 66    >>>
 67    """
 68
 69    from meerschaum.actions import actions
 70    from meerschaum.utils.formatting import pprint
 71    package_name = function.__globals__['__name__']
 72    plugin_name = (
 73        package_name.split('.')[1]
 74        if package_name.startswith('plugins.') else None
 75    )
 76    plugin = Plugin(plugin_name) if plugin_name else None
 77
 78    if debug:
 79        from meerschaum.utils.debug import dprint
 80        dprint(
 81            f"Adding action '{function.__name__}' from plugin " +
 82            f"'{plugin}'..."
 83        )
 84
 85    actions[function.__name__] = function
 86    return function
 87
 88
 89def pre_sync_hook(
 90        function: Callable[[Any], Any],
 91    ) -> Callable[[Any], Any]:
 92    """
 93    Register a function as a sync hook to be executed right before sync.
 94    
 95    Parameters
 96    ----------
 97    function: Callable[[Any], Any]
 98        The function to execute right before a sync.
 99        
100    Returns
101    -------
102    Another function (this is a decorator function).
103
104    Examples
105    --------
106    >>> from meerschaum.plugins import pre_sync_hook
107    >>>
108    >>> @pre_sync_hook
109    ... def log_sync(pipe, **kwargs):
110    ...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
111    >>>
112    """
113    with _locks['_pre_sync_hooks']:
114        try:
115            if function.__module__ not in _pre_sync_hooks:
116                _pre_sync_hooks[function.__module__] = []
117            _pre_sync_hooks[function.__module__].append(function)
118        except Exception as e:
119            from meerschaum.utils.warnings import warn
120            warn(e)
121    return function
122
123
def post_sync_hook(
        function: Callable[[Any], Any],
    ) -> Callable[[Any], Any]:
    """
    Add a function to the registry of hooks to be run once a sync has completed.
    Hooks are grouped by the name of the module which defines them.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The hook to run upon completion of a sync.

    Returns
    -------
    The same function, unchanged (so this may be used as a decorator).
    Registration failures are reported as warnings rather than raised.

    Examples
    --------
    >>> from meerschaum.plugins import post_sync_hook
    >>>
    >>> @post_sync_hook
    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
    ...     print(f"It took {round(duration, 2)} seconds to sync {pipe}.")
    >>>
    """
    with _locks['_post_sync_hooks']:
        try:
            _post_sync_hooks.setdefault(function.__module__, []).append(function)
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(e)
    return function
157
158
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Add a function to the registry of callbacks to run when the Meerschaum API
    module is initialized. This allows heavy plugins to be lazy-loaded only when
    the API is started, e.g. when editing the `meerschaum.api.app` FastAPI app.

    The FastAPI app will be passed as the only parameter.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to be called before starting the Meerschaum API.

    Returns
    -------
    The same function, unchanged (so this may be used as a decorator).
    Registration failures are reported as warnings rather than raised.

    Examples
    --------
    >>> from meerschaum.plugins import api_plugin
    >>>
    >>> @api_plugin
    ... def initialize_plugin(app):
    ...     @app.get('/my/new/path')
    ...     def new_path():
    ...         return {'message': 'It works!'}
    >>>
    """
    with _locks['_api_plugins']:
        try:
            _api_plugins.setdefault(function.__module__, []).append(function)
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(e)
    return function
196
197
### Flag set once the symlinks have been synced so that later calls return immediately.
_synced_symlinks: bool = False
def sync_plugins_symlinks(debug: bool = False, warn: bool = True) -> None:
    """
    Bring the symlinks inside `PLUGINS_RESOURCES_PATH` in line with the contents
    of the directories listed in `PLUGINS_DIR_PATHS`.

    Stale symlinks are removed, missing ones are created, the plugins `__init__`
    file is created if absent, and the parent of the resources directory is
    appended to this module's `__path__`. The work is only done once per process:
    after a successful run, `_synced_symlinks` is set and later calls are no-ops.

    Parameters
    ----------
    debug: bool, default False
        Not referenced in the function body.

    warn: bool, default True
        If `True`, emit warnings for lock file problems, duplicate plugin names,
        and symlinks which could not be created.
    """
    global _synced_symlinks
    ### Skip all work if a previous call already completed.
    ### NOTE(review): the lock is released before the work below begins, so two threads
    ### may both pass this check; the later sections are guarded by their own locks.
    with _locks['_synced_symlinks']:
        if _synced_symlinks:
            return

    import sys, os, pathlib, time
    from collections import defaultdict
    import importlib.util
    from meerschaum.utils.misc import flatten_list, make_symlink, is_symlink
    from meerschaum.utils.warnings import error, warn as _warn
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.venv import Venv, activate_venv, deactivate_venv, is_venv_active
    from meerschaum.config._paths import (
        PLUGINS_RESOURCES_PATH,
        PLUGINS_ARCHIVES_RESOURCES_PATH,
        PLUGINS_INIT_PATH,
        PLUGINS_DIR_PATHS,
        PLUGINS_INTERNAL_LOCK_PATH,
    )

    ### If the lock file exists, wait (bounded by the configured `lock_sleep_total`)
    ### for it to disappear before continuing.
    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
        if PLUGINS_INTERNAL_LOCK_PATH.exists():
            lock_sleep_total     = STATIC_CONFIG['plugins']['lock_sleep_total']
            lock_sleep_increment = STATIC_CONFIG['plugins']['lock_sleep_increment']
            lock_start = time.perf_counter()
            while (
                (time.perf_counter() - lock_start) < lock_sleep_total
            ):
                time.sleep(lock_sleep_increment)
                if not PLUGINS_INTERNAL_LOCK_PATH.exists():
                    break
                ### NOTE(review): the lock file is removed after a single sleep increment
                ### if it still exists, rather than after the full timeout -- confirm intended.
                try:
                    PLUGINS_INTERNAL_LOCK_PATH.unlink()
                except Exception as e:
                    if warn:
                        _warn(f"Error while removing lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
                    break

        ### Begin locking from other processes.
        try:
            PLUGINS_INTERNAL_LOCK_PATH.touch()
        except Exception as e:
            if warn:
                _warn(f"Unable to create lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")

    with _locks['internal_plugins']:
        ### The resources path must be a real directory: try to remove it if it is a symlink
        ### (the unlink is also attempted, and its failure ignored, when the path is missing).
        if is_symlink(PLUGINS_RESOURCES_PATH) or not PLUGINS_RESOURCES_PATH.exists():
            try:
                PLUGINS_RESOURCES_PATH.unlink()
            except Exception as e:
                pass

        PLUGINS_RESOURCES_PATH.mkdir(exist_ok=True)


        ### Everything currently inside the resources directory (symlinks or otherwise).
        existing_symlinked_paths = [
            (PLUGINS_RESOURCES_PATH / item) 
            for item in os.listdir(PLUGINS_RESOURCES_PATH)
        ]
        ### Make sure every configured plugins directory exists before listing it.
        for plugins_path in PLUGINS_DIR_PATHS:
            if not plugins_path.exists():
                plugins_path.mkdir(exist_ok=True, parents=True)
        ### Candidate plugin paths: every entry of every plugins directory,
        ### except hidden items, `__pycache__`, and `__init__.py`.
        plugins_to_be_symlinked = list(flatten_list(
            [
                [
                    (plugins_path / item)
                    for item in os.listdir(plugins_path)
                    if (
                        not item.startswith('.')
                    ) and (item not in ('__pycache__', '__init__.py'))
                ]
                for plugins_path in PLUGINS_DIR_PATHS
            ]
        ))

        ### Check for duplicates (the same stem appearing more than once across directories).
        seen_plugins = defaultdict(lambda: 0)
        for plugin_path in plugins_to_be_symlinked:
            plugin_name = plugin_path.stem
            seen_plugins[plugin_name] += 1
        for plugin_name, plugin_count in seen_plugins.items():
            if plugin_count > 1:
                if warn:
                    _warn(f"Found duplicate plugins named '{plugin_name}'.")

        for plugin_symlink_path in existing_symlinked_paths:
            real_path = pathlib.Path(os.path.realpath(plugin_symlink_path))

            ### Remove invalid symlinks (real files and directories are left alone).
            if real_path not in plugins_to_be_symlinked:
                if not is_symlink(plugin_symlink_path):
                    continue
                try:
                    plugin_symlink_path.unlink()
                except Exception as e:
                    pass

            ### Remove valid plugins from the to-be-symlinked list.
            else:
                plugins_to_be_symlinked.remove(real_path)

        ### Whatever remains has no symlink yet, so create one.
        for plugin_path in plugins_to_be_symlinked:
            plugin_symlink_path = PLUGINS_RESOURCES_PATH / plugin_path.name
            try:
                ### There might be duplicate folders (e.g. __pycache__).
                if (
                    plugin_symlink_path.exists()
                    and
                    plugin_symlink_path.is_dir()
                    and
                    not is_symlink(plugin_symlink_path)
                ):
                    continue
                success, msg = make_symlink(plugin_path, plugin_symlink_path)
            except Exception as e:
                success, msg = False, str(e)
            if not success:
                if warn:
                    _warn(
                        f"Failed to create symlink {plugin_symlink_path} "
                        + f"to {plugin_path}:\n    {msg}"
                    )

    ### Release symlink lock file in case other processes need it.
    ### NOTE(review): this is not in a `finally`, so an exception raised above
    ### would leave the lock file in place -- confirm whether that is acceptable.
    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
        try:
            if PLUGINS_INTERNAL_LOCK_PATH.exists():
                PLUGINS_INTERNAL_LOCK_PATH.unlink()
        ### Sometimes competing threads will delete the lock file at the same time.
        except FileNotFoundError:
            pass
        except Exception as e:
            if warn:
                _warn(f"Error cleaning up lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")

        ### Ensure the plugins `__init__` file exists; failure here is reported via `error()`.
        try:
            if not PLUGINS_INIT_PATH.exists():
                PLUGINS_INIT_PATH.touch()
        except Exception as e:
            error(f"Failed to create the file '{PLUGINS_INIT_PATH}':\n{e}")

    ### Add the parent of the resources directory to this package's search path.
    with _locks['__path__']:
        if str(PLUGINS_RESOURCES_PATH.parent) not in __path__:
            __path__.append(str(PLUGINS_RESOURCES_PATH.parent))

    ### Mark the sync as done so subsequent calls return early.
    with _locks['_synced_symlinks']:
        _synced_symlinks = True
351
352
def import_plugins(
        *plugins_to_import: Union[str, List[str], None],
        warn: bool = True,
    ) -> Union[
        'ModuleType', Tuple['ModuleType', None]
    ]:
    """
    Import the Meerschaum plugins directory.

    Parameters
    ----------
    plugins_to_import: Union[str, List[str], None]
        If provided, only import the specified plugins.
        Otherwise import the entire plugins module. May be a string, list, or `None`
        (`None` values are ignored).

    warn: bool, default True
        If `True`, emit a warning when the plugins module as a whole fails to import.

    Returns
    -------
    The plugins module if no plugins are specified, a single module if exactly one
    plugin is specified, otherwise a tuple of modules.
    A plugin (or the plugins module) which fails to import is represented by `None`.

    """
    import sys
    import os
    import importlib
    from meerschaum.utils.misc import flatten_list
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
    from meerschaum.utils.warnings import warn as _warn

    ### `None` is documented as "import everything", so drop it instead of
    ### trying to build a `Plugin` from it.
    plugins_to_import = [item for item in plugins_to_import if item is not None]
    plugins_parent_path = str(PLUGINS_RESOURCES_PATH.parent)

    with _locks['sys.path']:

        ### Since plugins may depend on other plugins,
        ### we need to activate the virtual environments for library plugins.
        ### This logic exists in `Plugin.activate_venv()`,
        ### but that code requires the plugin's module to already be imported.
        ### It's not a guarantee of correct activation order
        ### (e.g. if a library plugin pins a specific package version
        ### and another plugin expects a different one).
        plugins_names = get_plugins_names()

        ### Remember the *names* of the venvs which were active beforehand
        ### so that we only deactivate the ones activated here.
        already_active_venvs = {
            plugin_name
            for plugin_name in plugins_names
            if is_venv_active(plugin_name)
        }

        if not sys.path or sys.path[0] != plugins_parent_path:
            sys.path.insert(0, plugins_parent_path)

        try:
            if not plugins_to_import:
                for plugin_name in plugins_names:
                    activate_venv(plugin_name)
                try:
                    imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem)
                except ImportError as e:
                    _warn(f"Failed to import the plugins module:\n    {e}")
                    import traceback
                    traceback.print_exc()
                    imported_plugins = None
                finally:
                    ### Leave previously active venvs untouched.
                    for plugin_name in plugins_names:
                        if plugin_name in already_active_venvs:
                            continue
                        deactivate_venv(plugin_name)

            else:
                imported_plugins = []
                for plugin_name in flatten_list(plugins_to_import):
                    plugin = Plugin(plugin_name)
                    try:
                        with Venv(plugin):
                            imported_plugins.append(
                                importlib.import_module(
                                    f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
                                )
                            )
                    except Exception as e:
                        _warn(
                            f"Failed to import plugin '{plugin_name}':\n    "
                            + f"{e}\n\nHere's a stacktrace:",
                            stack = False,
                        )
                        from meerschaum.utils.formatting import get_console
                        get_console().print_exception(
                            suppress = [
                                'meerschaum/plugins/__init__.py',
                                importlib,
                                importlib._bootstrap,
                            ]
                        )
                        ### Keep positions aligned with the requested plugins.
                        imported_plugins.append(None)

            if imported_plugins is None and warn:
                _warn(f"Failed to import plugins.", stacklevel=3)

        finally:
            ### Always undo the `sys.path` change, even if an import raised unexpectedly.
            if plugins_parent_path in sys.path:
                sys.path.remove(plugins_parent_path)

    if isinstance(imported_plugins, list):
        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
    return imported_plugins
447
448
def load_plugins(debug: bool = False, shell: bool = False) -> None:
    """
    Import Meerschaum plugins and update the actions dictionary.

    The shared `meerschaum.actions.modules` list is updated in place: modules which
    belong to the plugins package are dropped and the freshly imported plugin
    modules are appended. Every plugin module which defines a function with the
    same name as the module itself has that function registered as an action.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle (passed along to `make_action()`).

    shell: bool, default False
        Passed along to `make_action()`.
    """
    from inspect import isfunction, getmembers
    from meerschaum.actions import modules
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.packages import get_modules_from_package

    _plugins_names, plugins_modules = get_modules_from_package(
        import_plugins(),
        names = True,
        recursive = True,
        modules_venvs = True
    )

    ### Replace the contents in place to keep from redefining the modules list
    ### (other modules hold references to this exact list object).
    plugins_prefix = PLUGINS_RESOURCES_PATH.stem + '.'
    modules[:] = (
        [mod for mod in modules if not mod.__name__.startswith(plugins_prefix)]
        + plugins_modules
    )

    for module in plugins_modules:
        ### An action is a function named after the module which defines it.
        action_name = module.__name__.split('.')[-1]
        for name, func in getmembers(module, isfunction):
            if name == action_name:
                make_action(func, shell=shell, debug=debug)
486
487
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
    """
    Forget the given plugins' top-level modules and load all plugins again.

    Parameters
    ----------
    plugins: Optional[List[str]], default None
        The names of the plugins to reload.
        If `None` (or empty), every installed plugin is reloaded.

    debug: bool, default False
        Verbosity toggle.

    """
    import sys
    if debug:
        from meerschaum.utils.debug import dprint

    names_to_reload = plugins if plugins else get_plugins_names()
    for plugin_name in names_to_reload:
        if debug:
            dprint(f"Reloading plugin '{plugin_name}'...")
        ### Dropping the cached module forces a fresh import below.
        sys.modules.pop('plugins.' + str(plugin_name), None)

    load_plugins(debug=debug)
511
512
def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
    """
    Return the installed `Plugin` objects.

    Parameters
    ----------
    to_load:
        If specified, only load specific plugins.
        Otherwise return all plugins found in the plugins resources directory.

    try_import: bool, default True
        If `True`, allow for plugins to be imported.

    Returns
    -------
    A single `Plugin` if exactly one name was given, otherwise a tuple of `Plugin`
    objects. Plugins which are not installed are left out of the tuple.

    Raises
    ------
    IndexError
        If exactly one name was given and that plugin is not installed.
    """
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    import os
    sync_plugins_symlinks()

    if to_load:
        names = list(to_load)
    else:
        names = []
        for item in os.listdir(PLUGINS_RESOURCES_PATH):
            if item == '__init__.py':
                continue
            if (PLUGINS_RESOURCES_PATH / item).is_dir():
                name = item
            elif item.endswith('.py'):
                name = item[:-len('.py')]
            else:
                ### Not a package or a module, so it cannot be a plugin.
                continue
            ### A package and a module may share a name; only report it once.
            if name not in names:
                names.append(name)

    plugins = tuple(
        plugin
        for plugin in (Plugin(name) for name in names)
        if plugin.is_installed(try_import=try_import)
    )
    if len(to_load) == 1:
        if not plugins:
            ### Same exception type as indexing the empty tuple, but with a useful message.
            raise IndexError(f"Plugin '{to_load[0]}' is not installed.")
        return plugins[0]
    return plugins
543
544
def get_plugins_names(*to_load, **kw) -> List[str]:
    """
    Return a list of the names of installed plugins.

    Positional and keyword arguments are passed along to `get_plugins()`.
    """
    plugins = get_plugins(*to_load, **kw)
    ### `get_plugins()` returns a bare `Plugin` when given exactly one name.
    if isinstance(plugins, Plugin):
        plugins = (plugins,)
    return [plugin.name for plugin in plugins]
550
551
def get_plugins_modules(*to_load, **kw) -> List['ModuleType']:
    """
    Return a list of the `module` attributes of the installed plugins.

    Positional and keyword arguments are passed along to `get_plugins()`.
    """
    plugins = get_plugins(*to_load, **kw)
    ### `get_plugins()` returns a bare `Plugin` when given exactly one name.
    if isinstance(plugins, Plugin):
        plugins = (plugins,)
    return [plugin.module for plugin in plugins]
557
558
def get_data_plugins() -> List[Plugin]:
    """
    Only return the plugins whose modules define a `fetch()` or `sync()` function.

    Returns
    -------
    A list of `Plugin` objects, each appearing once
    (even if its module defines both functions).
    """
    import inspect
    data_names = {'sync', 'fetch'}
    return [
        plugin
        for plugin in get_plugins()
        if any(
            name in data_names
            for name, _ in inspect.getmembers(plugin.module, inspect.isfunction)
        )
    ]
575
576
def add_plugin_argument(*args, **kwargs) -> None:
    """
    Register an argparse argument in an options group for the calling plugin.
    Accepts the same parameters as the regular argparse `add_argument()` function.

    The group is created on first use and is titled after the parent plugin
    when one is found, otherwise 'Custom options'.
    The same positional arguments are only added once per group,
    and failures are reported as warnings rather than raised.

    Examples
    --------
    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
    >>> 
    """
    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
    from meerschaum.utils.warnings import warn, error

    ### Must be called directly from this frame: level 2 is our caller.
    parent_plugin_name = _get_parent_plugin(2)
    group_key = 'plugin_' + (parent_plugin_name or '')

    if group_key not in groups:
        if parent_plugin_name:
            title = f"Plugin '{parent_plugin_name}' options"
        else:
            title = 'Custom options'
        groups[group_key] = parser.add_argument_group(title=title)
        _seen_plugin_args[group_key] = set()

    try:
        args_key = str(args)
        seen_args = _seen_plugin_args[group_key]
        if args_key not in seen_args:
            groups[group_key].add_argument(*args, **kwargs)
            seen_args.add(args_key)
    except Exception as e:
        warn(e)
603
604
605def _get_parent_plugin(stacklevel: int = 1) -> Union[str, None]:
606    """If this function is called from outside a Meerschaum plugin, it will return None."""
607    import inspect, re
608    try:
609        parent_globals = inspect.stack()[stacklevel][0].f_globals
610        parent_file = parent_globals.get('__file__', '')
611        return parent_globals['__name__'].replace('plugins.', '').split('.')[0]
612    except Exception as e:
613        return None
class Plugin:
 33class Plugin:
 34    """Handle packaging of Meerschaum plugins."""
 35    def __init__(
 36        self,
 37        name: str,
 38        version: Optional[str] = None,
 39        user_id: Optional[int] = None,
 40        required: Optional[List[str]] = None,
 41        attributes: Optional[Dict[str, Any]] = None,
 42        archive_path: Optional[pathlib.Path] = None,
 43        venv_path: Optional[pathlib.Path] = None,
 44        repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None,
 45        repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None,
 46    ):
 47        from meerschaum.config.static import STATIC_CONFIG
 48        sep = STATIC_CONFIG['plugins']['repo_separator']
 49        _repo = None
 50        if sep in name:
 51            try:
 52                name, _repo = name.split(sep)
 53            except Exception as e:
 54                error(f"Invalid plugin name: '{name}'")
 55        self._repo_in_name = _repo
 56
 57        if attributes is None:
 58            attributes = {}
 59        self.name = name
 60        self.attributes = attributes
 61        self.user_id = user_id
 62        self._version = version
 63        if required:
 64            self._required = required
 65        self.archive_path = (
 66            archive_path if archive_path is not None
 67            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
 68        )
 69        self.venv_path = (
 70            venv_path if venv_path is not None
 71            else VIRTENV_RESOURCES_PATH / self.name
 72        )
 73        self._repo_connector = repo_connector
 74        self._repo_keys = repo
 75
 76
 77    @property
 78    def repo_connector(self):
 79        """
 80        Return the repository connector for this plugin.
 81        NOTE: This imports the `connectors` module, which imports certain plugin modules.
 82        """
 83        if self._repo_connector is None:
 84            from meerschaum.connectors.parse import parse_repo_keys
 85
 86            repo_keys = self._repo_keys or self._repo_in_name
 87            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
 88                error(
 89                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
 90                )
 91            repo_connector = parse_repo_keys(repo_keys)
 92            self._repo_connector = repo_connector
 93        return self._repo_connector
 94
 95
 96    @property
 97    def version(self):
 98        """
 99        Return the plugin's module version is defined (`__version__`) if it's defined.
100        """
101        if self._version is None:
102            try:
103                self._version = self.module.__version__
104            except Exception as e:
105                self._version = None
106        return self._version
107
108
109    @property
110    def module(self):
111        """
112        Return the Python module of the underlying plugin.
113        """
114        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
115            if self.__file__ is None:
116                return None
117            from meerschaum.plugins import import_plugins
118            self._module = import_plugins(str(self), warn=False)
119        return self._module
120
121
122    @property
123    def __file__(self) -> Union[str, None]:
124        """
125        Return the file path (str) of the plugin if it exists, otherwise `None`.
126        """
127        if self.__dict__.get('_module', None) is not None:
128            return self.module.__file__
129
130        potential_dir = PLUGINS_RESOURCES_PATH / self.name
131        if (
132            potential_dir.exists()
133            and potential_dir.is_dir()
134            and (potential_dir / '__init__.py').exists()
135        ):
136            return str((potential_dir / '__init__.py').as_posix())
137
138        potential_file = PLUGINS_RESOURCES_PATH / (self.name + '.py')
139        if potential_file.exists() and not potential_file.is_dir():
140            return str(potential_file.as_posix())
141
142        return None
143
144
145    @property
146    def requirements_file_path(self) -> Union[pathlib.Path, None]:
147        """
148        If a file named `requirements.txt` exists, return its path.
149        """
150        if self.__file__ is None:
151            return None
152        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
153        if not path.exists():
154            return None
155        return path
156
157
158    def is_installed(self, **kw) -> bool:
159        """
160        Check whether a plugin is correctly installed.
161
162        Returns
163        -------
164        A `bool` indicating whether a plugin exists and is successfully imported.
165        """
166        return self.__file__ is not None
167
168
169    def make_tar(self, debug: bool = False) -> pathlib.Path:
170        """
171        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
172
173        Parameters
174        ----------
175        debug: bool, default False
176            Verbosity toggle.
177
178        Returns
179        -------
180        A `pathlib.Path` to the archive file's path.
181
182        """
183        import tarfile, pathlib, subprocess, fnmatch
184        from meerschaum.utils.debug import dprint
185        from meerschaum.utils.packages import attempt_import
186        pathspec = attempt_import('pathspec', debug=debug)
187
188        if not self.__file__:
189            from meerschaum.utils.warnings import error
190            error(f"Could not find file for plugin '{self}'.")
191        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
192            path = self.__file__.replace('__init__.py', '')
193            is_dir = True
194        else:
195            path = self.__file__
196            is_dir = False
197
198        old_cwd = os.getcwd()
199        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
200        os.chdir(real_parent_path)
201
202        default_patterns_to_ignore = [
203            '.pyc',
204            '__pycache__/',
205            'eggs/',
206            '__pypackages__/',
207            '.git',
208        ]
209
210        def parse_gitignore() -> 'Set[str]':
211            gitignore_path = pathlib.Path(path) / '.gitignore'
212            if not gitignore_path.exists():
213                return set()
214            with open(gitignore_path, 'r', encoding='utf-8') as f:
215                gitignore_text = f.read()
216            return set(pathspec.PathSpec.from_lines(
217                pathspec.patterns.GitWildMatchPattern,
218                default_patterns_to_ignore + gitignore_text.splitlines()
219            ).match_tree(path))
220
221        patterns_to_ignore = parse_gitignore() if is_dir else set()
222
223        if debug:
224            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
225
226        with tarfile.open(self.archive_path, 'w:gz') as tarf:
227            if not is_dir:
228                tarf.add(f"{self.name}.py")
229            else:
230                for root, dirs, files in os.walk(self.name):
231                    for f in files:
232                        good_file = True
233                        fp = os.path.join(root, f)
234                        for pattern in patterns_to_ignore:
235                            if pattern in str(fp) or f.startswith('.'):
236                                good_file = False
237                                break
238                        if good_file:
239                            if debug:
240                                dprint(f"Adding '{fp}'...")
241                            tarf.add(fp)
242
243        ### clean up and change back to old directory
244        os.chdir(old_cwd)
245
246        ### change to 775 to avoid permissions issues with the API in a Docker container
247        self.archive_path.chmod(0o775)
248
249        if debug:
250            dprint(f"Created archive '{self.archive_path}'.")
251        return self.archive_path
252
253
254    def install(
255            self,
256            force: bool = False,
257            debug: bool = False,
258        ) -> SuccessTuple:
259        """
260        Extract a plugin's tar archive to the plugins directory.
261        
262        This function checks if the plugin is already installed and if the version is equal or
263        greater than the existing installation.
264
265        Parameters
266        ----------
267        force: bool, default False
268            If `True`, continue with installation, even if required packages fail to install.
269
270        debug: bool, default False
271            Verbosity toggle.
272
273        Returns
274        -------
275        A `SuccessTuple` of success (bool) and a message (str).
276
277        """
278        if self.full_name in _ongoing_installations:
279            return True, f"Already installing plugin '{self}'."
280        _ongoing_installations.add(self.full_name)
281        from meerschaum.utils.warnings import warn, error
282        if debug:
283            from meerschaum.utils.debug import dprint
284        import tarfile
285        import re
286        import ast
287        from meerschaum.plugins import sync_plugins_symlinks
288        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
289        from meerschaum.utils.venv import init_venv
290        from meerschaum.utils.misc import safely_extract_tar
291        old_cwd = os.getcwd()
292        old_version = ''
293        new_version = ''
294        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
295        temp_dir.mkdir(exist_ok=True)
296
297        if not self.archive_path.exists():
298            return False, f"Missing archive file for plugin '{self}'."
299        if self.version is not None:
300            old_version = self.version
301            if debug:
302                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
303
304        if debug:
305            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
306
307        try:
308            with tarfile.open(self.archive_path, 'r:gz') as tarf:
309                safely_extract_tar(tarf, temp_dir)
310        except Exception as e:
311            warn(e)
312            return False, f"Failed to extract plugin '{self.name}'."
313
314        ### search for version information
315        files = os.listdir(temp_dir)
316        
317        if str(files[0]) == self.name:
318            is_dir = True
319        elif str(files[0]) == self.name + '.py':
320            is_dir = False
321        else:
322            error(f"Unknown format encountered for plugin '{self}'.")
323
324        fpath = temp_dir / files[0]
325        if is_dir:
326            fpath = fpath / '__init__.py'
327
328        init_venv(self.name, debug=debug)
329        with open(fpath, 'r', encoding='utf-8') as f:
330            init_lines = f.readlines()
331        new_version = None
332        for line in init_lines:
333            if '__version__' not in line:
334                continue
335            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
336            if not version_match:
337                continue
338            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
339            break
340        if not new_version:
341            warn(
342                f"No `__version__` defined for plugin '{self}'. "
343                + "Assuming new version...",
344                stack = False,
345            )
346
347        packaging_version = attempt_import('packaging.version')
348        try:
349            is_new_version = (not new_version and not old_version) or (
350                packaging_version.parse(old_version) < packaging_version.parse(new_version)
351            )
352            is_same_version = new_version and old_version and (
353                packaging_version.parse(old_version) == packaging_version.parse(new_version)
354            )
355        except Exception as e:
356            is_new_version, is_same_version = True, False
357
358        ### Determine where to permanently store the new plugin.
359        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
360        for path in PLUGINS_DIR_PATHS:
361            files_in_plugins_dir = os.listdir(path)
362            if (
363                self.name in files_in_plugins_dir
364                or
365                (self.name + '.py') in files_in_plugins_dir
366            ):
367                plugin_installation_dir_path = path
368                break
369
370        success_msg = f"Successfully installed plugin '{self}'."
371        success, abort = None, None
372
373        if is_same_version and not force:
374            success, msg = True, (
375                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
376                "    Install again with `-f` or `--force` to reinstall."
377            )
378            abort = True
379        elif is_new_version or force:
380            for src_dir, dirs, files in os.walk(temp_dir):
381                if success is not None:
382                    break
383                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
384                if not os.path.exists(dst_dir):
385                    os.mkdir(dst_dir)
386                for f in files:
387                    src_file = os.path.join(src_dir, f)
388                    dst_file = os.path.join(dst_dir, f)
389                    if os.path.exists(dst_file):
390                        os.remove(dst_file)
391
392                    if debug:
393                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
394                    try:
395                        shutil.move(src_file, dst_dir)
396                    except Exception as e:
397                        success, msg = False, (
398                            f"Failed to install plugin '{self}': " +
399                            f"Could not move file '{src_file}' to '{dst_dir}'"
400                        )
401                        print(msg)
402                        break
403            if success is None:
404                success, msg = True, success_msg
405        else:
406            success, msg = False, (
407                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
408                + f"attempted version {new_version}."
409            )
410
411        shutil.rmtree(temp_dir)
412        os.chdir(old_cwd)
413
414        ### Reload the plugin's module.
415        sync_plugins_symlinks(debug=debug)
416        if '_module' in self.__dict__:
417            del self.__dict__['_module']
418        init_venv(venv=self.name, force=True, debug=debug)
419        reload_meerschaum(debug=debug)
420
421        ### if we've already failed, return here
422        if not success or abort:
423            _ongoing_installations.remove(self.full_name)
424            return success, msg
425
426        ### attempt to install dependencies
427        if not self.install_dependencies(force=force, debug=debug):
428            _ongoing_installations.remove(self.full_name)
429            return False, f"Failed to install dependencies for plugin '{self}'."
430
431        ### handling success tuple, bool, or other (typically None)
432        setup_tuple = self.setup(debug=debug)
433        if isinstance(setup_tuple, tuple):
434            if not setup_tuple[0]:
435                success, msg = setup_tuple
436        elif isinstance(setup_tuple, bool):
437            if not setup_tuple:
438                success, msg = False, (
439                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
440                    f"Check `setup()` in '{self.__file__}' for more information " +
441                    f"(no error message provided)."
442                )
443            else:
444                success, msg = True, success_msg
445        elif setup_tuple is None:
446            success = True
447            msg = (
448                f"Post-install for plugin '{self}' returned None. " +
449                f"Assuming plugin successfully installed."
450            )
451            warn(msg)
452        else:
453            success = False
454            msg = (
455                f"Post-install for plugin '{self}' returned unexpected value " +
456                f"of type '{type(setup_tuple)}': {setup_tuple}"
457            )
458
459        _ongoing_installations.remove(self.full_name)
460        module = self.module
461        return success, msg
462
463
464    def remove_archive(
465            self,        
466            debug: bool = False
467        ) -> SuccessTuple:
468        """Remove a plugin's archive file."""
469        if not self.archive_path.exists():
470            return True, f"Archive file for plugin '{self}' does not exist."
471        try:
472            self.archive_path.unlink()
473        except Exception as e:
474            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
475        return True, "Success"
476
477
478    def remove_venv(
479            self,        
480            debug: bool = False
481        ) -> SuccessTuple:
482        """Remove a plugin's virtual environment."""
483        if not self.venv_path.exists():
484            return True, f"Virtual environment for plugin '{self}' does not exist."
485        try:
486            shutil.rmtree(self.venv_path)
487        except Exception as e:
488            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
489        return True, "Success"
490
491
492    def uninstall(self, debug: bool = False) -> SuccessTuple:
493        """
494        Remove a plugin, its virtual environment, and archive file.
495        """
496        from meerschaum.utils.packages import reload_meerschaum
497        from meerschaum.plugins import sync_plugins_symlinks
498        from meerschaum.utils.warnings import warn, info
499        warnings_thrown_count: int = 0
500        max_warnings: int = 3
501
502        if not self.is_installed():
503            info(
504                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
505                + "Checking for artifacts...",
506                stack = False,
507            )
508        else:
509            real_path = pathlib.Path(os.path.realpath(self.__file__))
510            try:
511                if real_path.name == '__init__.py':
512                    shutil.rmtree(real_path.parent)
513                else:
514                    real_path.unlink()
515            except Exception as e:
516                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
517                warnings_thrown_count += 1
518            else:
519                info(f"Removed source files for plugin '{self.name}'.")
520
521        if self.venv_path.exists():
522            success, msg = self.remove_venv(debug=debug)
523            if not success:
524                warn(msg, stack=False)
525                warnings_thrown_count += 1
526            else:
527                info(f"Removed virtual environment from plugin '{self.name}'.")
528
529        success = warnings_thrown_count < max_warnings
530        sync_plugins_symlinks(debug=debug)
531        self.deactivate_venv(force=True, debug=debug)
532        reload_meerschaum(debug=debug)
533        return success, (
534            f"Successfully uninstalled plugin '{self}'." if success
535            else f"Failed to uninstall plugin '{self}'."
536        )
537
538
539    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
540        """
541        If exists, run the plugin's `setup()` function.
542
543        Parameters
544        ----------
545        *args: str
546            The positional arguments passed to the `setup()` function.
547            
548        debug: bool, default False
549            Verbosity toggle.
550
551        **kw: Any
552            The keyword arguments passed to the `setup()` function.
553
554        Returns
555        -------
556        A `SuccessTuple` or `bool` indicating success.
557
558        """
559        from meerschaum.utils.debug import dprint
560        import inspect
561        _setup = None
562        for name, fp in inspect.getmembers(self.module):
563            if name == 'setup' and inspect.isfunction(fp):
564                _setup = fp
565                break
566
567        ### assume success if no setup() is found (not necessary)
568        if _setup is None:
569            return True
570
571        sig = inspect.signature(_setup)
572        has_debug, has_kw = ('debug' in sig.parameters), False
573        for k, v in sig.parameters.items():
574            if '**' in str(v):
575                has_kw = True
576                break
577
578        _kw = {}
579        if has_kw:
580            _kw.update(kw)
581        if has_debug:
582            _kw['debug'] = debug
583
584        if debug:
585            dprint(f"Running setup for plugin '{self}'...")
586        try:
587            self.activate_venv(debug=debug)
588            return_tuple = _setup(*args, **_kw)
589            self.deactivate_venv(debug=debug)
590        except Exception as e:
591            return False, str(e)
592
593        if isinstance(return_tuple, tuple):
594            return return_tuple
595        if isinstance(return_tuple, bool):
596            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
597        if return_tuple is None:
598            return False, f"Setup for Plugin '{self.name}' returned None."
599        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"
600
601
602    def get_dependencies(
603            self,
604            debug: bool = False,
605        ) -> List[str]:
606        """
607        If the Plugin has specified dependencies in a list called `required`, return the list.
608        
609        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
610        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
611
612        Parameters
613        ----------
614        debug: bool, default False
615            Verbosity toggle.
616
617        Returns
618        -------
619        A list of required packages and plugins (str).
620
621        """
622        if '_required' in self.__dict__:
623            return self._required
624
625        ### If the plugin has not yet been imported,
626        ### infer the dependencies from the source text.
627        ### This is not super robust, and it doesn't feel right
628        ### having multiple versions of the logic.
629        ### This is necessary when determining the activation order
630        ### without having import the module.
631        ### For consistency's sake, the module-less method does not cache the requirements.
632        if self.__dict__.get('_module', None) is None:
633            file_path = self.__file__
634            if file_path is None:
635                return []
636            with open(file_path, 'r', encoding='utf-8') as f:
637                text = f.read()
638
639            if 'required' not in text:
640                return []
641
642            ### This has some limitations:
643            ### It relies on `required` being manually declared.
644            ### We lose the ability to dynamically alter the `required` list,
645            ### which is why we've kept the module-reliant method below.
646            import ast, re
647            ### NOTE: This technically would break 
648            ### if `required` was the very first line of the file.
649            req_start_match = re.search(r'\nrequired(\s?)=', text)
650            if not req_start_match:
651                return []
652            req_start = req_start_match.start()
653
654            ### Dependencies may have brackets within the strings, so push back the index.
655            first_opening_brace = req_start + 1 + text[req_start:].find('[')
656            if first_opening_brace == -1:
657                return []
658
659            next_closing_brace = req_start + 1 + text[req_start:].find(']')
660            if next_closing_brace == -1:
661                return []
662
663            start_ix = first_opening_brace + 1
664            end_ix = next_closing_brace
665
666            num_braces = 0
667            while True:
668                if '[' not in text[start_ix:end_ix]:
669                    break
670                num_braces += 1
671                start_ix = end_ix
672                end_ix += text[end_ix + 1:].find(']') + 1
673
674            req_end = end_ix + 1
675            req_text = (
676                text[req_start:req_end]
677                .lstrip()
678                .replace('required', '', 1)
679                .lstrip()
680                .replace('=', '', 1)
681                .lstrip()
682            )
683            try:
684                required = ast.literal_eval(req_text)
685            except Exception as e:
686                warn(
687                    f"Unable to determine requirements for plugin '{self.name}' "
688                    + "without importing the module.\n"
689                    + "    This may be due to dynamically setting the global `required` list.\n"
690                    + f"    {e}"
691                )
692                return []
693            return required
694
695        import inspect
696        self.activate_venv(dependencies=False, debug=debug)
697        required = []
698        for name, val in inspect.getmembers(self.module):
699            if name == 'required':
700                required = val
701                break
702        self._required = required
703        self.deactivate_venv(dependencies=False, debug=debug)
704        return required
705
706
707    def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
708        """
709        Return a list of required Plugin objects.
710        """
711        from meerschaum.utils.warnings import warn
712        from meerschaum.config import get_config
713        from meerschaum.config.static import STATIC_CONFIG
714        plugins = []
715        _deps = self.get_dependencies(debug=debug)
716        sep = STATIC_CONFIG['plugins']['repo_separator']
717        plugin_names = [
718            _d[len('plugin:'):] for _d in _deps
719            if _d.startswith('plugin:') and len(_d) > len('plugin:')
720        ]
721        default_repo_keys = get_config('meerschaum', 'default_repository')
722        for _plugin_name in plugin_names:
723            if sep in _plugin_name:
724                try:
725                    _plugin_name, _repo_keys = _plugin_name.split(sep)
726                except Exception as e:
727                    _repo_keys = default_repo_keys
728                    warn(
729                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
730                        + f"Will try to use '{_repo_keys}' instead.",
731                        stack = False,
732                    )
733            else:
734                _repo_keys = default_repo_keys
735            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
736        return plugins
737
738
739    def get_required_packages(self, debug: bool=False) -> List[str]:
740        """
741        Return the required package names (excluding plugins).
742        """
743        _deps = self.get_dependencies(debug=debug)
744        return [_d for _d in _deps if not _d.startswith('plugin:')]
745
746
747    def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
748        """
749        Activate the virtual environments for the plugin and its dependencies.
750
751        Parameters
752        ----------
753        dependencies: bool, default True
754            If `True`, activate the virtual environments for required plugins.
755
756        Returns
757        -------
758        A bool indicating success.
759        """
760        from meerschaum.utils.venv import venv_target_path
761        from meerschaum.utils.packages import activate_venv
762        from meerschaum.utils.misc import make_symlink, is_symlink
763        from meerschaum.config._paths import PACKAGE_ROOT_PATH
764
765        if dependencies:
766            for plugin in self.get_required_plugins(debug=debug):
767                plugin.activate_venv(debug=debug, **kw)
768
769        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
770        venv_meerschaum_path = vtp / 'meerschaum'
771
772        try:
773            success, msg = True, "Success"
774            if is_symlink(venv_meerschaum_path):
775                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
776                    venv_meerschaum_path.unlink()
777                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
778        except Exception as e:
779            success, msg = False, str(e)
780        if not success:
781            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")
782
783        return activate_venv(self.name, debug=debug, **kw)
784
785
786    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
787        """
788        Deactivate the virtual environments for the plugin and its dependencies.
789
790        Parameters
791        ----------
792        dependencies: bool, default True
793            If `True`, deactivate the virtual environments for required plugins.
794
795        Returns
796        -------
797        A bool indicating success.
798        """
799        from meerschaum.utils.packages import deactivate_venv
800        success = deactivate_venv(self.name, debug=debug, **kw)
801        if dependencies:
802            for plugin in self.get_required_plugins(debug=debug):
803                plugin.deactivate_venv(debug=debug, **kw)
804        return success
805
806
807    def install_dependencies(
808            self,
809            force: bool = False,
810            debug: bool = False,
811        ) -> bool:
812        """
813        If specified, install dependencies.
814        
815        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
816        Meerschaum plugins from the same repository as this Plugin.
817        To install from a different repository, add the repo keys after `'@'`
818        (e.g. `'plugin:foo@api:bar'`).
819
820        Parameters
821        ----------
822        force: bool, default False
823            If `True`, continue with the installation, even if some
824            required packages fail to install.
825
826        debug: bool, default False
827            Verbosity toggle.
828
829        Returns
830        -------
831        A bool indicating success.
832
833        """
834        from meerschaum.utils.packages import pip_install, venv_contains_package
835        from meerschaum.utils.debug import dprint
836        from meerschaum.utils.warnings import warn, info
837        from meerschaum.connectors.parse import parse_repo_keys
838        _deps = self.get_dependencies(debug=debug)
839        if not _deps and self.requirements_file_path is None:
840            return True
841
842        plugins = self.get_required_plugins(debug=debug)
843        for _plugin in plugins:
844            if _plugin.name == self.name:
845                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
846                continue
847            _success, _msg = _plugin.repo_connector.install_plugin(
848                _plugin.name, debug=debug, force=force
849            )
850            if not _success:
851                warn(
852                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
853                    + f" for plugin '{self.name}':\n" + _msg,
854                    stack = False,
855                )
856                if not force:
857                    warn(
858                        "Try installing with the `--force` flag to continue anyway.",
859                        stack = False,
860                    )
861                    return False
862                info(
863                    "Continuing with installation despite the failure "
864                    + "(careful, things might be broken!)...",
865                    icon = False
866                )
867
868
869        ### First step: parse `requirements.txt` if it exists.
870        if self.requirements_file_path is not None:
871            if not pip_install(
872                requirements_file_path=self.requirements_file_path,
873                venv=self.name, debug=debug
874            ):
875                warn(
876                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
877                    stack = False,
878                )
879                if not force:
880                    warn(
881                        "Try installing with `--force` to continue anyway.",
882                        stack = False,
883                    )
884                    return False
885                info(
886                    "Continuing with installation despite the failure "
887                    + "(careful, things might be broken!)...",
888                    icon = False
889                )
890
891
892        ### Don't reinstall packages that are already included in required plugins.
893        packages = []
894        _packages = self.get_required_packages(debug=debug)
895        accounted_for_packages = set()
896        for package_name in _packages:
897            for plugin in plugins:
898                if venv_contains_package(package_name, plugin.name):
899                    accounted_for_packages.add(package_name)
900                    break
901        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
902
903        ### Attempt pip packages installation.
904        if packages:
905            for package in packages:
906                if not pip_install(package, venv=self.name, debug=debug):
907                    warn(
908                        f"Failed to install required package '{package}'"
909                        + f" for plugin '{self.name}'.",
910                        stack = False,
911                    )
912                    if not force:
913                        warn(
914                            "Try installing with `--force` to continue anyway.",
915                            stack = False,
916                        )
917                        return False
918                    info(
919                        "Continuing with installation despite the failure "
920                        + "(careful, things might be broken!)...",
921                        icon = False
922                    )
923        return True
924
925
926    @property
927    def full_name(self) -> str:
928        """
929        Include the repo keys with the plugin's name.
930        """
931        from meerschaum.config.static import STATIC_CONFIG
932        sep = STATIC_CONFIG['plugins']['repo_separator']
933        return self.name + sep + str(self.repo_connector)
934
935
936    def __str__(self):
937        return self.name
938
939
940    def __repr__(self):
941        return f"Plugin('{self.name}', repo='{self.repo_connector}')"
942
943
944    def __del__(self):
945        pass

Handle packaging of Meerschaum plugins.

Plugin( name: str, version: Optional[str] = None, user_id: Optional[int] = None, required: Optional[List[str]] = None, attributes: Optional[Dict[str, Any]] = None, archive_path: Optional[pathlib.Path] = None, venv_path: Optional[pathlib.Path] = None, repo_connector: Optional[meerschaum.connectors.api.APIConnector.APIConnector] = None, repo: Union[meerschaum.connectors.api.APIConnector.APIConnector, str, NoneType] = None)
35    def __init__(
36        self,
37        name: str,
38        version: Optional[str] = None,
39        user_id: Optional[int] = None,
40        required: Optional[List[str]] = None,
41        attributes: Optional[Dict[str, Any]] = None,
42        archive_path: Optional[pathlib.Path] = None,
43        venv_path: Optional[pathlib.Path] = None,
44        repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None,
45        repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None,
46    ):
47        from meerschaum.config.static import STATIC_CONFIG
48        sep = STATIC_CONFIG['plugins']['repo_separator']
49        _repo = None
50        if sep in name:
51            try:
52                name, _repo = name.split(sep)
53            except Exception as e:
54                error(f"Invalid plugin name: '{name}'")
55        self._repo_in_name = _repo
56
57        if attributes is None:
58            attributes = {}
59        self.name = name
60        self.attributes = attributes
61        self.user_id = user_id
62        self._version = version
63        if required:
64            self._required = required
65        self.archive_path = (
66            archive_path if archive_path is not None
67            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
68        )
69        self.venv_path = (
70            venv_path if venv_path is not None
71            else VIRTENV_RESOURCES_PATH / self.name
72        )
73        self._repo_connector = repo_connector
74        self._repo_keys = repo
name
attributes
user_id
archive_path
venv_path
repo_connector

Return the repository connector for this plugin. NOTE: This imports the connectors module, which imports certain plugin modules.

version

Return the plugin's module version (__version__) if it's defined.

module

Return the Python module of the underlying plugin.

requirements_file_path: Optional[pathlib.Path]

If a file named requirements.txt exists, return its path.

def is_installed(self, **kw) -> bool:
158    def is_installed(self, **kw) -> bool:
159        """
160        Check whether a plugin is correctly installed.
161
162        Returns
163        -------
164        A `bool` indicating whether a plugin exists and is successfully imported.
165        """
166        return self.__file__ is not None

Check whether a plugin is correctly installed.

Returns
  • A bool indicating whether a plugin exists and is successfully imported.
def make_tar(self, debug: bool = False) -> pathlib.Path:
169    def make_tar(self, debug: bool = False) -> pathlib.Path:
170        """
171        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.
172
173        Parameters
174        ----------
175        debug: bool, default False
176            Verbosity toggle.
177
178        Returns
179        -------
180        A `pathlib.Path` to the archive file's path.
181
182        """
183        import tarfile, pathlib, subprocess, fnmatch
184        from meerschaum.utils.debug import dprint
185        from meerschaum.utils.packages import attempt_import
186        pathspec = attempt_import('pathspec', debug=debug)
187
188        if not self.__file__:
189            from meerschaum.utils.warnings import error
190            error(f"Could not find file for plugin '{self}'.")
191        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
192            path = self.__file__.replace('__init__.py', '')
193            is_dir = True
194        else:
195            path = self.__file__
196            is_dir = False
197
198        old_cwd = os.getcwd()
199        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
200        os.chdir(real_parent_path)
201
202        default_patterns_to_ignore = [
203            '.pyc',
204            '__pycache__/',
205            'eggs/',
206            '__pypackages__/',
207            '.git',
208        ]
209
210        def parse_gitignore() -> 'Set[str]':
211            gitignore_path = pathlib.Path(path) / '.gitignore'
212            if not gitignore_path.exists():
213                return set()
214            with open(gitignore_path, 'r', encoding='utf-8') as f:
215                gitignore_text = f.read()
216            return set(pathspec.PathSpec.from_lines(
217                pathspec.patterns.GitWildMatchPattern,
218                default_patterns_to_ignore + gitignore_text.splitlines()
219            ).match_tree(path))
220
221        patterns_to_ignore = parse_gitignore() if is_dir else set()
222
223        if debug:
224            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")
225
226        with tarfile.open(self.archive_path, 'w:gz') as tarf:
227            if not is_dir:
228                tarf.add(f"{self.name}.py")
229            else:
230                for root, dirs, files in os.walk(self.name):
231                    for f in files:
232                        good_file = True
233                        fp = os.path.join(root, f)
234                        for pattern in patterns_to_ignore:
235                            if pattern in str(fp) or f.startswith('.'):
236                                good_file = False
237                                break
238                        if good_file:
239                            if debug:
240                                dprint(f"Adding '{fp}'...")
241                            tarf.add(fp)
242
243        ### clean up and change back to old directory
244        os.chdir(old_cwd)
245
246        ### change to 775 to avoid permissions issues with the API in a Docker container
247        self.archive_path.chmod(0o775)
248
249        if debug:
250            dprint(f"Created archive '{self.archive_path}'.")
251        return self.archive_path

Compress the plugin's source files into a .tar.gz archive and return the archive's path.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pathlib.Path to the archive file's path.
def install(self, force: bool = False, debug: bool = False) -> Tuple[bool, str]:
254    def install(
255            self,
256            force: bool = False,
257            debug: bool = False,
258        ) -> SuccessTuple:
259        """
260        Extract a plugin's tar archive to the plugins directory.
261        
262        This function checks if the plugin is already installed and if the version is equal or
263        greater than the existing installation.
264
265        Parameters
266        ----------
267        force: bool, default False
268            If `True`, continue with installation, even if required packages fail to install.
269
270        debug: bool, default False
271            Verbosity toggle.
272
273        Returns
274        -------
275        A `SuccessTuple` of success (bool) and a message (str).
276
277        """
278        if self.full_name in _ongoing_installations:
279            return True, f"Already installing plugin '{self}'."
280        _ongoing_installations.add(self.full_name)
281        from meerschaum.utils.warnings import warn, error
282        if debug:
283            from meerschaum.utils.debug import dprint
284        import tarfile
285        import re
286        import ast
287        from meerschaum.plugins import sync_plugins_symlinks
288        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
289        from meerschaum.utils.venv import init_venv
290        from meerschaum.utils.misc import safely_extract_tar
291        old_cwd = os.getcwd()
292        old_version = ''
293        new_version = ''
294        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
295        temp_dir.mkdir(exist_ok=True)
296
297        if not self.archive_path.exists():
298            return False, f"Missing archive file for plugin '{self}'."
299        if self.version is not None:
300            old_version = self.version
301            if debug:
302                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")
303
304        if debug:
305            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")
306
307        try:
308            with tarfile.open(self.archive_path, 'r:gz') as tarf:
309                safely_extract_tar(tarf, temp_dir)
310        except Exception as e:
311            warn(e)
312            return False, f"Failed to extract plugin '{self.name}'."
313
314        ### search for version information
315        files = os.listdir(temp_dir)
316        
317        if str(files[0]) == self.name:
318            is_dir = True
319        elif str(files[0]) == self.name + '.py':
320            is_dir = False
321        else:
322            error(f"Unknown format encountered for plugin '{self}'.")
323
324        fpath = temp_dir / files[0]
325        if is_dir:
326            fpath = fpath / '__init__.py'
327
328        init_venv(self.name, debug=debug)
329        with open(fpath, 'r', encoding='utf-8') as f:
330            init_lines = f.readlines()
331        new_version = None
332        for line in init_lines:
333            if '__version__' not in line:
334                continue
335            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
336            if not version_match:
337                continue
338            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
339            break
340        if not new_version:
341            warn(
342                f"No `__version__` defined for plugin '{self}'. "
343                + "Assuming new version...",
344                stack = False,
345            )
346
347        packaging_version = attempt_import('packaging.version')
348        try:
349            is_new_version = (not new_version and not old_version) or (
350                packaging_version.parse(old_version) < packaging_version.parse(new_version)
351            )
352            is_same_version = new_version and old_version and (
353                packaging_version.parse(old_version) == packaging_version.parse(new_version)
354            )
355        except Exception as e:
356            is_new_version, is_same_version = True, False
357
358        ### Determine where to permanently store the new plugin.
359        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
360        for path in PLUGINS_DIR_PATHS:
361            files_in_plugins_dir = os.listdir(path)
362            if (
363                self.name in files_in_plugins_dir
364                or
365                (self.name + '.py') in files_in_plugins_dir
366            ):
367                plugin_installation_dir_path = path
368                break
369
370        success_msg = f"Successfully installed plugin '{self}'."
371        success, abort = None, None
372
373        if is_same_version and not force:
374            success, msg = True, (
375                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
376                "    Install again with `-f` or `--force` to reinstall."
377            )
378            abort = True
379        elif is_new_version or force:
380            for src_dir, dirs, files in os.walk(temp_dir):
381                if success is not None:
382                    break
383                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
384                if not os.path.exists(dst_dir):
385                    os.mkdir(dst_dir)
386                for f in files:
387                    src_file = os.path.join(src_dir, f)
388                    dst_file = os.path.join(dst_dir, f)
389                    if os.path.exists(dst_file):
390                        os.remove(dst_file)
391
392                    if debug:
393                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
394                    try:
395                        shutil.move(src_file, dst_dir)
396                    except Exception as e:
397                        success, msg = False, (
398                            f"Failed to install plugin '{self}': " +
399                            f"Could not move file '{src_file}' to '{dst_dir}'"
400                        )
401                        print(msg)
402                        break
403            if success is None:
404                success, msg = True, success_msg
405        else:
406            success, msg = False, (
407                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
408                + f"attempted version {new_version}."
409            )
410
411        shutil.rmtree(temp_dir)
412        os.chdir(old_cwd)
413
414        ### Reload the plugin's module.
415        sync_plugins_symlinks(debug=debug)
416        if '_module' in self.__dict__:
417            del self.__dict__['_module']
418        init_venv(venv=self.name, force=True, debug=debug)
419        reload_meerschaum(debug=debug)
420
421        ### if we've already failed, return here
422        if not success or abort:
423            _ongoing_installations.remove(self.full_name)
424            return success, msg
425
426        ### attempt to install dependencies
427        if not self.install_dependencies(force=force, debug=debug):
428            _ongoing_installations.remove(self.full_name)
429            return False, f"Failed to install dependencies for plugin '{self}'."
430
431        ### handling success tuple, bool, or other (typically None)
432        setup_tuple = self.setup(debug=debug)
433        if isinstance(setup_tuple, tuple):
434            if not setup_tuple[0]:
435                success, msg = setup_tuple
436        elif isinstance(setup_tuple, bool):
437            if not setup_tuple:
438                success, msg = False, (
439                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
440                    f"Check `setup()` in '{self.__file__}' for more information " +
441                    f"(no error message provided)."
442                )
443            else:
444                success, msg = True, success_msg
445        elif setup_tuple is None:
446            success = True
447            msg = (
448                f"Post-install for plugin '{self}' returned None. " +
449                f"Assuming plugin successfully installed."
450            )
451            warn(msg)
452        else:
453            success = False
454            msg = (
455                f"Post-install for plugin '{self}' returned unexpected value " +
456                f"of type '{type(setup_tuple)}': {setup_tuple}"
457            )
458
459        _ongoing_installations.remove(self.full_name)
460        module = self.module
461        return success, msg

Extract a plugin's tar archive to the plugins directory.

This function checks if the plugin is already installed and if the version is equal or greater than the existing installation.

Parameters
  • force (bool, default False): If True, continue with installation, even if required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A SuccessTuple of success (bool) and a message (str).
def remove_archive(self, debug: bool = False) -> Tuple[bool, str]:
464    def remove_archive(
465            self,        
466            debug: bool = False
467        ) -> SuccessTuple:
468        """Remove a plugin's archive file."""
469        if not self.archive_path.exists():
470            return True, f"Archive file for plugin '{self}' does not exist."
471        try:
472            self.archive_path.unlink()
473        except Exception as e:
474            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
475        return True, "Success"

Remove a plugin's archive file.

def remove_venv(self, debug: bool = False) -> Tuple[bool, str]:
478    def remove_venv(
479            self,        
480            debug: bool = False
481        ) -> SuccessTuple:
482        """Remove a plugin's virtual environment."""
483        if not self.venv_path.exists():
484            return True, f"Virtual environment for plugin '{self}' does not exist."
485        try:
486            shutil.rmtree(self.venv_path)
487        except Exception as e:
488            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
489        return True, "Success"

Remove a plugin's virtual environment.

def uninstall(self, debug: bool = False) -> Tuple[bool, str]:
492    def uninstall(self, debug: bool = False) -> SuccessTuple:
493        """
494        Remove a plugin, its virtual environment, and archive file.
495        """
496        from meerschaum.utils.packages import reload_meerschaum
497        from meerschaum.plugins import sync_plugins_symlinks
498        from meerschaum.utils.warnings import warn, info
499        warnings_thrown_count: int = 0
500        max_warnings: int = 3
501
502        if not self.is_installed():
503            info(
504                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
505                + "Checking for artifacts...",
506                stack = False,
507            )
508        else:
509            real_path = pathlib.Path(os.path.realpath(self.__file__))
510            try:
511                if real_path.name == '__init__.py':
512                    shutil.rmtree(real_path.parent)
513                else:
514                    real_path.unlink()
515            except Exception as e:
516                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
517                warnings_thrown_count += 1
518            else:
519                info(f"Removed source files for plugin '{self.name}'.")
520
521        if self.venv_path.exists():
522            success, msg = self.remove_venv(debug=debug)
523            if not success:
524                warn(msg, stack=False)
525                warnings_thrown_count += 1
526            else:
527                info(f"Removed virtual environment from plugin '{self.name}'.")
528
529        success = warnings_thrown_count < max_warnings
530        sync_plugins_symlinks(debug=debug)
531        self.deactivate_venv(force=True, debug=debug)
532        reload_meerschaum(debug=debug)
533        return success, (
534            f"Successfully uninstalled plugin '{self}'." if success
535            else f"Failed to uninstall plugin '{self}'."
536        )

Remove a plugin, its virtual environment, and archive file.

def setup( self, *args: str, debug: bool = False, **kw: Any) -> Union[Tuple[bool, str], bool]:
539    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
540        """
541        If exists, run the plugin's `setup()` function.
542
543        Parameters
544        ----------
545        *args: str
546            The positional arguments passed to the `setup()` function.
547            
548        debug: bool, default False
549            Verbosity toggle.
550
551        **kw: Any
552            The keyword arguments passed to the `setup()` function.
553
554        Returns
555        -------
556        A `SuccessTuple` or `bool` indicating success.
557
558        """
559        from meerschaum.utils.debug import dprint
560        import inspect
561        _setup = None
562        for name, fp in inspect.getmembers(self.module):
563            if name == 'setup' and inspect.isfunction(fp):
564                _setup = fp
565                break
566
567        ### assume success if no setup() is found (not necessary)
568        if _setup is None:
569            return True
570
571        sig = inspect.signature(_setup)
572        has_debug, has_kw = ('debug' in sig.parameters), False
573        for k, v in sig.parameters.items():
574            if '**' in str(v):
575                has_kw = True
576                break
577
578        _kw = {}
579        if has_kw:
580            _kw.update(kw)
581        if has_debug:
582            _kw['debug'] = debug
583
584        if debug:
585            dprint(f"Running setup for plugin '{self}'...")
586        try:
587            self.activate_venv(debug=debug)
588            return_tuple = _setup(*args, **_kw)
589            self.deactivate_venv(debug=debug)
590        except Exception as e:
591            return False, str(e)
592
593        if isinstance(return_tuple, tuple):
594            return return_tuple
595        if isinstance(return_tuple, bool):
596            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
597        if return_tuple is None:
598            return False, f"Setup for Plugin '{self.name}' returned None."
599        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"

If exists, run the plugin's setup() function.

Parameters
  • *args (str): The positional arguments passed to the setup() function.
  • debug (bool, default False): Verbosity toggle.
  • **kw (Any): The keyword arguments passed to the setup() function.
Returns
  • A SuccessTuple or bool indicating success.
def get_dependencies(self, debug: bool = False) -> List[str]:
602    def get_dependencies(
603            self,
604            debug: bool = False,
605        ) -> List[str]:
606        """
607        If the Plugin has specified dependencies in a list called `required`, return the list.
608        
609        **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
610        Meerschaum plugins may also specify connector keys for a repo after `'@'`.
611
612        Parameters
613        ----------
614        debug: bool, default False
615            Verbosity toggle.
616
617        Returns
618        -------
619        A list of required packages and plugins (str).
620
621        """
622        if '_required' in self.__dict__:
623            return self._required
624
625        ### If the plugin has not yet been imported,
626        ### infer the dependencies from the source text.
627        ### This is not super robust, and it doesn't feel right
628        ### having multiple versions of the logic.
629        ### This is necessary when determining the activation order
630        ### without having import the module.
631        ### For consistency's sake, the module-less method does not cache the requirements.
632        if self.__dict__.get('_module', None) is None:
633            file_path = self.__file__
634            if file_path is None:
635                return []
636            with open(file_path, 'r', encoding='utf-8') as f:
637                text = f.read()
638
639            if 'required' not in text:
640                return []
641
642            ### This has some limitations:
643            ### It relies on `required` being manually declared.
644            ### We lose the ability to dynamically alter the `required` list,
645            ### which is why we've kept the module-reliant method below.
646            import ast, re
647            ### NOTE: This technically would break 
648            ### if `required` was the very first line of the file.
649            req_start_match = re.search(r'\nrequired(\s?)=', text)
650            if not req_start_match:
651                return []
652            req_start = req_start_match.start()
653
654            ### Dependencies may have brackets within the strings, so push back the index.
655            first_opening_brace = req_start + 1 + text[req_start:].find('[')
656            if first_opening_brace == -1:
657                return []
658
659            next_closing_brace = req_start + 1 + text[req_start:].find(']')
660            if next_closing_brace == -1:
661                return []
662
663            start_ix = first_opening_brace + 1
664            end_ix = next_closing_brace
665
666            num_braces = 0
667            while True:
668                if '[' not in text[start_ix:end_ix]:
669                    break
670                num_braces += 1
671                start_ix = end_ix
672                end_ix += text[end_ix + 1:].find(']') + 1
673
674            req_end = end_ix + 1
675            req_text = (
676                text[req_start:req_end]
677                .lstrip()
678                .replace('required', '', 1)
679                .lstrip()
680                .replace('=', '', 1)
681                .lstrip()
682            )
683            try:
684                required = ast.literal_eval(req_text)
685            except Exception as e:
686                warn(
687                    f"Unable to determine requirements for plugin '{self.name}' "
688                    + "without importing the module.\n"
689                    + "    This may be due to dynamically setting the global `required` list.\n"
690                    + f"    {e}"
691                )
692                return []
693            return required
694
695        import inspect
696        self.activate_venv(dependencies=False, debug=debug)
697        required = []
698        for name, val in inspect.getmembers(self.module):
699            if name == 'required':
700                required = val
701                break
702        self._required = required
703        self.deactivate_venv(dependencies=False, debug=debug)
704        return required

If the Plugin has specified dependencies in a list called required, return the list.

NOTE: Dependencies which start with 'plugin:' are Meerschaum plugins, not pip packages. Meerschaum plugins may also specify connector keys for a repo after '@'.

Parameters
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of required packages and plugins (str).
def get_required_plugins(self, debug: bool = False) -> List[Plugin]:
707    def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
708        """
709        Return a list of required Plugin objects.
710        """
711        from meerschaum.utils.warnings import warn
712        from meerschaum.config import get_config
713        from meerschaum.config.static import STATIC_CONFIG
714        plugins = []
715        _deps = self.get_dependencies(debug=debug)
716        sep = STATIC_CONFIG['plugins']['repo_separator']
717        plugin_names = [
718            _d[len('plugin:'):] for _d in _deps
719            if _d.startswith('plugin:') and len(_d) > len('plugin:')
720        ]
721        default_repo_keys = get_config('meerschaum', 'default_repository')
722        for _plugin_name in plugin_names:
723            if sep in _plugin_name:
724                try:
725                    _plugin_name, _repo_keys = _plugin_name.split(sep)
726                except Exception as e:
727                    _repo_keys = default_repo_keys
728                    warn(
729                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
730                        + f"Will try to use '{_repo_keys}' instead.",
731                        stack = False,
732                    )
733            else:
734                _repo_keys = default_repo_keys
735            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
736        return plugins

Return a list of required Plugin objects.

def get_required_packages(self, debug: bool = False) -> List[str]:
739    def get_required_packages(self, debug: bool=False) -> List[str]:
740        """
741        Return the required package names (excluding plugins).
742        """
743        _deps = self.get_dependencies(debug=debug)
744        return [_d for _d in _deps if not _d.startswith('plugin:')]

Return the required package names (excluding plugins).

def activate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
747    def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
748        """
749        Activate the virtual environments for the plugin and its dependencies.
750
751        Parameters
752        ----------
753        dependencies: bool, default True
754            If `True`, activate the virtual environments for required plugins.
755
756        Returns
757        -------
758        A bool indicating success.
759        """
760        from meerschaum.utils.venv import venv_target_path
761        from meerschaum.utils.packages import activate_venv
762        from meerschaum.utils.misc import make_symlink, is_symlink
763        from meerschaum.config._paths import PACKAGE_ROOT_PATH
764
765        if dependencies:
766            for plugin in self.get_required_plugins(debug=debug):
767                plugin.activate_venv(debug=debug, **kw)
768
769        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
770        venv_meerschaum_path = vtp / 'meerschaum'
771
772        try:
773            success, msg = True, "Success"
774            if is_symlink(venv_meerschaum_path):
775                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
776                    venv_meerschaum_path.unlink()
777                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
778        except Exception as e:
779            success, msg = False, str(e)
780        if not success:
781            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")
782
783        return activate_venv(self.name, debug=debug, **kw)

Activate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, activate the virtual environments for required plugins.
Returns
  • A bool indicating success.
def deactivate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
786    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
787        """
788        Deactivate the virtual environments for the plugin and its dependencies.
789
790        Parameters
791        ----------
792        dependencies: bool, default True
793            If `True`, deactivate the virtual environments for required plugins.
794
795        Returns
796        -------
797        A bool indicating success.
798        """
799        from meerschaum.utils.packages import deactivate_venv
800        success = deactivate_venv(self.name, debug=debug, **kw)
801        if dependencies:
802            for plugin in self.get_required_plugins(debug=debug):
803                plugin.deactivate_venv(debug=debug, **kw)
804        return success

Deactivate the virtual environments for the plugin and its dependencies.

Parameters
  • dependencies (bool, default True): If True, deactivate the virtual environments for required plugins.
Returns
  • A bool indicating success.
def install_dependencies(self, force: bool = False, debug: bool = False) -> bool:
807    def install_dependencies(
808            self,
809            force: bool = False,
810            debug: bool = False,
811        ) -> bool:
812        """
813        If specified, install dependencies.
814        
815        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
816        Meerschaum plugins from the same repository as this Plugin.
817        To install from a different repository, add the repo keys after `'@'`
818        (e.g. `'plugin:foo@api:bar'`).
819
820        Parameters
821        ----------
822        force: bool, default False
823            If `True`, continue with the installation, even if some
824            required packages fail to install.
825
826        debug: bool, default False
827            Verbosity toggle.
828
829        Returns
830        -------
831        A bool indicating success.
832
833        """
834        from meerschaum.utils.packages import pip_install, venv_contains_package
835        from meerschaum.utils.debug import dprint
836        from meerschaum.utils.warnings import warn, info
837        from meerschaum.connectors.parse import parse_repo_keys
838        _deps = self.get_dependencies(debug=debug)
839        if not _deps and self.requirements_file_path is None:
840            return True
841
842        plugins = self.get_required_plugins(debug=debug)
843        for _plugin in plugins:
844            if _plugin.name == self.name:
845                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
846                continue
847            _success, _msg = _plugin.repo_connector.install_plugin(
848                _plugin.name, debug=debug, force=force
849            )
850            if not _success:
851                warn(
852                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
853                    + f" for plugin '{self.name}':\n" + _msg,
854                    stack = False,
855                )
856                if not force:
857                    warn(
858                        "Try installing with the `--force` flag to continue anyway.",
859                        stack = False,
860                    )
861                    return False
862                info(
863                    "Continuing with installation despite the failure "
864                    + "(careful, things might be broken!)...",
865                    icon = False
866                )
867
868
869        ### First step: parse `requirements.txt` if it exists.
870        if self.requirements_file_path is not None:
871            if not pip_install(
872                requirements_file_path=self.requirements_file_path,
873                venv=self.name, debug=debug
874            ):
875                warn(
876                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
877                    stack = False,
878                )
879                if not force:
880                    warn(
881                        "Try installing with `--force` to continue anyway.",
882                        stack = False,
883                    )
884                    return False
885                info(
886                    "Continuing with installation despite the failure "
887                    + "(careful, things might be broken!)...",
888                    icon = False
889                )
890
891
892        ### Don't reinstall packages that are already included in required plugins.
893        packages = []
894        _packages = self.get_required_packages(debug=debug)
895        accounted_for_packages = set()
896        for package_name in _packages:
897            for plugin in plugins:
898                if venv_contains_package(package_name, plugin.name):
899                    accounted_for_packages.add(package_name)
900                    break
901        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]
902
903        ### Attempt pip packages installation.
904        if packages:
905            for package in packages:
906                if not pip_install(package, venv=self.name, debug=debug):
907                    warn(
908                        f"Failed to install required package '{package}'"
909                        + f" for plugin '{self.name}'.",
910                        stack = False,
911                    )
912                    if not force:
913                        warn(
914                            "Try installing with `--force` to continue anyway.",
915                            stack = False,
916                        )
917                        return False
918                    info(
919                        "Continuing with installation despite the failure "
920                        + "(careful, things might be broken!)...",
921                        icon = False
922                    )
923        return True

If specified, install dependencies.

NOTE: Dependencies that start with 'plugin:' will be installed as Meerschaum plugins from the same repository as this Plugin. To install from a different repository, add the repo keys after '@' (e.g. 'plugin:foo@api:bar').

Parameters
  • force (bool, default False): If True, continue with the installation, even if some required packages fail to install.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A bool indicating success.
full_name: str

Include the repo keys with the plugin's name.

def make_action( function: Callable[[Any], Any], shell: bool = False, activate: bool = True, deactivate: bool = True, debug: bool = False) -> Callable[[Any], Any]:
def make_action(
        function: Callable[[Any], Any],
        shell: bool = False,
        activate: bool = True,
        deactivate: bool = True,
        debug: bool = False
    ) -> Callable[[Any], Any]:
    """
    Register a function as a Meerschaum action under the function's own name.
    Useful for plugins that are adding multiple actions.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to become a Meerschaum action. Must accept all keyword arguments.

    shell: bool, default False
        Not used.

    Returns
    -------
    The same function, unchanged (so this may be used as a decorator).

    Examples
    --------
    >>> from meerschaum.plugins import make_action
    >>>
    >>> @make_action
    ... def my_action(**kw):
    ...     print('foo')
    ...     return True, "Success"
    >>>
    """

    from meerschaum.actions import actions
    from meerschaum.utils.formatting import pprint

    ### Plugin modules are named 'plugins.<name>[...]'; anything else has no plugin.
    module_name = function.__globals__['__name__']
    plugin_name = None
    if module_name.startswith('plugins.'):
        plugin_name = module_name.split('.')[1]
    plugin = None if not plugin_name else Plugin(plugin_name)

    if debug:
        from meerschaum.utils.debug import dprint
        dprint(
            f"Adding action '{function.__name__}' from plugin " +
            f"'{plugin}'..."
        )

    actions[function.__name__] = function
    return function

Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.

Parameters
  • function (Callable[[Any], Any]): The function to become a Meerschaum action. Must accept all keyword arguments.
  • shell (bool, default False): Not used.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import make_action
>>>
>>> @make_action
... def my_action(**kw):
...     print('foo')
...     return True, "Success"
>>>
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Register a function to be run when the Meerschaum API module is initialized.
    Useful for lazy-loading heavy plugins only when the API is started,
    such as when editing the `meerschaum.api.app` FastAPI app.

    The FastAPI app will be passed as the only parameter.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to be called before starting the Meerschaum API.

    Returns
    -------
    The same function that was passed in (decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import api_plugin
    >>>
    >>> @api_plugin
    ... def initialize_plugin(app):
    ...     @app.get('/my/new/path')
    ...     def new_path():
    ...         return {'message': 'It works!'}
    >>>
    """
    with _locks['_api_plugins']:
        try:
            ### Functions are grouped by the module which defined them.
            _api_plugins.setdefault(function.__module__, []).append(function)
        except Exception as e:
            ### Registration is best-effort: warn instead of raising.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function

Execute the function when initializing the Meerschaum API module. Useful for lazy-loading heavy plugins only when the API is started, such as when editing the meerschaum.api.app FastAPI app.

The FastAPI app will be passed as the only parameter.

Parameters
  • function (Callable[[Any], Any]): The function to be called before starting the Meerschaum API.
Returns
  • Another function (decorator function).
Examples
>>> from meerschaum.plugins import api_plugin
>>>
>>> @api_plugin
... def initialize_plugin(app):
...     @app.get('/my/new/path')
...     def new_path():
...         return {'message': 'It works!'}
>>>
def import_plugins( *plugins_to_import: Union[str, List[str], NoneType], warn: bool = True) -> "Union['ModuleType', Tuple['ModuleType', None]]":
def import_plugins(
        *plugins_to_import: Union[str, List[str], None],
        warn: bool = True,
    ) -> Union[
        'ModuleType', Tuple['ModuleType', None]
    ]:
    """
    Import the Meerschaum plugins directory.

    Parameters
    ----------
    plugins_to_import: Union[str, List[str], None]
        If provided, only import the specified plugins.
        Otherwise import the entire plugins module. May be a string, list, or `None`.
        Defaults to `None`.

    warn: bool, default True
        If `True`, emit a warning when the entire plugins module fails to import.

    Returns
    -------
    A module or a tuple of modules, depending on the number of plugins provided.
    When no plugins are specified, the plugins module itself is returned
    (or `None` if the import failed).
    When specific plugins are requested, a plugin which fails to import
    is represented by `None`.

    """
    import sys
    import importlib
    from meerschaum.utils.misc import flatten_list
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
    from meerschaum.utils.warnings import warn as _warn
    plugins_to_import = list(plugins_to_import)
    plugins_parent_path = str(PLUGINS_RESOURCES_PATH.parent)
    with _locks['sys.path']:

        ### Since plugins may depend on other plugins,
        ### we need to activate the virtual environments for library plugins.
        ### This logic exists in `Plugin.activate_venv()`,
        ### but that code requires the plugin's module to already be imported.
        ### It's not a guarantee of correct activation order
        ### (NOTE(review): presumably an issue when plugins pin conflicting packages — confirm).
        plugins_names = get_plugins_names()

        ### Remember the *names* of the venvs which were active before we started
        ### so that only the venvs activated below are deactivated afterwards.
        already_active_venvs = {
            plugin_name
            for plugin_name in plugins_names
            if is_venv_active(plugin_name)
        }

        ### The parent of the plugins directory must come first on `sys.path`
        ### for the plugins package to be importable by its directory name.
        if not sys.path or sys.path[0] != plugins_parent_path:
            sys.path.insert(0, plugins_parent_path)

        if not plugins_to_import:
            for plugin_name in plugins_names:
                activate_venv(plugin_name)
            try:
                imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem)
            except ImportError as e:
                _warn(f"Failed to import the plugins module:\n    {e}")
                import traceback
                traceback.print_exc()
                imported_plugins = None
            for plugin_name in plugins_names:
                if plugin_name in already_active_venvs:
                    continue
                deactivate_venv(plugin_name)

        else:
            imported_plugins = []
            for plugin_name in flatten_list(plugins_to_import):
                plugin = Plugin(plugin_name)
                try:
                    with Venv(plugin):
                        imported_plugins.append(
                            importlib.import_module(
                                f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
                            )
                        )
                except Exception as e:
                    _warn(
                        f"Failed to import plugin '{plugin_name}':\n    "
                        + f"{e}\n\nHere's a stacktrace:",
                        stack = False,
                    )
                    from meerschaum.utils.formatting import get_console
                    get_console().print_exception(
                        suppress = [
                            'meerschaum/plugins/__init__.py',
                            importlib,
                            importlib._bootstrap,
                        ]
                    )
                    ### Keep a placeholder so positions still line up with the requested names.
                    imported_plugins.append(None)

        if imported_plugins is None and warn:
            _warn("Failed to import plugins.", stacklevel=3)

        ### Undo the `sys.path` modification from above.
        if plugins_parent_path in sys.path:
            sys.path.remove(plugins_parent_path)

    if isinstance(imported_plugins, list):
        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
    return imported_plugins

Import the Meerschaum plugins directory.

Parameters
  • plugins_to_import (Union[str, List[str], None]): If provided, only import the specified plugins. Otherwise import the entire plugins module. May be a string, list, or None. Defaults to None.
Returns
  • A module or a tuple of modules, depending on the number of plugins provided.
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
    """
    Drop the given plugins' modules from `sys.modules` and load plugins again.

    Parameters
    ----------
    plugins: Optional[List[str]], default None
        The plugins to reload. `None` (or an empty list) will reload all plugins.

    debug: bool, default False
        If `True`, print a message for each plugin being reloaded.

    """
    import sys
    if debug:
        from meerschaum.utils.debug import dprint

    names_to_reload = plugins if plugins else get_plugins_names()
    for name in names_to_reload:
        if debug:
            dprint(f"Reloading plugin '{name}'...")
        ### Forget the cached module (if any) so the next import re-executes it.
        sys.modules.pop('plugins.' + str(name), None)
    load_plugins(debug=debug)

Reload plugins back into memory.

Parameters
  • plugins (Optional[List[str]], default None): The plugins to reload. None will reload all plugins.
def get_plugins( *to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
    """
    Return the installed `Plugin` objects.

    Parameters
    ----------
    to_load:
        If specified, only load specific plugins.
        Otherwise return all plugins found in the plugins resources directory.

    try_import: bool, default True
        If `True`, allow for plugins to be imported.

    Returns
    -------
    A tuple of installed `Plugin` objects,
    or a single `Plugin` if exactly one name was provided.

    Raises
    ------
    IndexError
        If exactly one plugin was requested and it is not installed.
    """
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    import os
    sync_plugins_symlinks()

    ### Directories are used by name; other entries have their last three
    ### characters removed (i.e. the `.py` suffix of single-file plugins).
    names = to_load or [
        (
            entry if (PLUGINS_RESOURCES_PATH / entry).is_dir()
            else entry[:-3]
        ) for entry in os.listdir(PLUGINS_RESOURCES_PATH) if entry != '__init__.py'
    ]
    candidates = [Plugin(name) for name in names]
    plugins = tuple(
        plugin for plugin in candidates
        if plugin.is_installed(try_import=try_import)
    )

    if len(to_load) == 1:
        if not plugins:
            ### Same exception type as the former bare `plugins[0]` lookup,
            ### but with a message which explains what went wrong.
            raise IndexError(f"Plugin '{to_load[0]}' is not installed.")
        return plugins[0]
    return plugins

Return a tuple of installed Plugin objects (or a single Plugin if exactly one name is given).

Parameters
  • to_load: If specified, only load specific plugins. Otherwise return all plugins.
  • try_import (bool, default True): If True, allow for plugins to be imported.
def get_data_plugins() -> List[Plugin]:
def get_data_plugins() -> List[Plugin]:
    """
    Only return the plugins whose modules define a `fetch()` or `sync()` function.

    Returns
    -------
    A list of `Plugin` objects.
    Each plugin appears at most once, even if it defines both functions.
    """
    import inspect
    data_names = {'sync', 'fetch'}
    return [
        plugin
        for plugin in get_plugins()
        ### `any()` stops at the first match, so a plugin with both
        ### `fetch()` and `sync()` is not added twice.
        if any(
            name in data_names and inspect.isfunction(ob)
            for name, ob in inspect.getmembers(plugin.module)
        )
    ]

Only return the plugins whose modules define either fetch() or sync() functions.

def add_plugin_argument(*args, **kwargs) -> None:
def add_plugin_argument(*args, **kwargs) -> None:
    """
    Add argparse arguments under the calling plugin's options group
    (titled "Plugin '<name>' options"), or under 'Custom options'
    when no parent plugin is found.
    Takes the same parameters as the regular argparse `add_argument()` function.

    Arguments whose positional parameters were already added to the same group
    are skipped, and any exception raised while adding is emitted as a warning.

    Examples
    --------
    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
    >>> 
    """
    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
    from meerschaum.utils.warnings import warn, error

    ### NOTE(review): the `2` is presumably a stack depth relative to this frame,
    ### so this call must stay directly inside this function — confirm against `_get_parent_plugin`.
    _parent_plugin_name = _get_parent_plugin(2)

    if _parent_plugin_name:
        title = f"Plugin '{_parent_plugin_name}' options"
    else:
        title = 'Custom options'
    group_key = 'plugin_' + (_parent_plugin_name or '')

    ### Lazily create the argument group the first time it is needed.
    if group_key not in groups:
        groups[group_key] = parser.add_argument_group(title=title)
        _seen_plugin_args[group_key] = set()

    try:
        ### The string form of the positional arguments is the de-duplication key.
        args_key = str(args)
        seen_args = _seen_plugin_args[group_key]
        if args_key not in seen_args:
            groups[group_key].add_argument(*args, **kwargs)
            seen_args.add(args_key)
    except Exception as e:
        warn(e)

Add argparse arguments under the 'Plugins options' group. Takes the same parameters as the regular argparse add_argument() function.

Examples
>>> add_plugin_argument('--foo', type=int, help="This is my help text!")
>>>
def pre_sync_hook(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
 90def pre_sync_hook(
 91        function: Callable[[Any], Any],
 92    ) -> Callable[[Any], Any]:
 93    """
 94    Register a function as a sync hook to be executed right before sync.
 95    
 96    Parameters
 97    ----------
 98    function: Callable[[Any], Any]
 99        The function to execute right before a sync.
100        
101    Returns
102    -------
103    Another function (this is a decorator function).
104
105    Examples
106    --------
107    >>> from meerschaum.plugins import pre_sync_hook
108    >>>
109    >>> @pre_sync_hook
110    ... def log_sync(pipe, **kwargs):
111    ...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
112    >>>
113    """
114    with _locks['_pre_sync_hooks']:
115        try:
116            if function.__module__ not in _pre_sync_hooks:
117                _pre_sync_hooks[function.__module__] = []
118            _pre_sync_hooks[function.__module__].append(function)
119        except Exception as e:
120            from meerschaum.utils.warnings import warn
121            warn(e)
122    return function

Register a function as a sync hook to be executed right before sync.

Parameters
  • function (Callable[[Any], Any]): The function to execute right before a sync.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import pre_sync_hook
>>>
>>> @pre_sync_hook
... def log_sync(pipe, **kwargs):
...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
>>>

def post_sync_hook(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
def post_sync_hook(
        function: Callable[[Any], Any],
    ) -> Callable[[Any], Any]:
    """
    Register a function as a sync hook to be executed upon completion of a sync.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to execute upon completion of a sync.

    Returns
    -------
    The same function that was passed in (this is a decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import post_sync_hook
    >>>
    >>> @post_sync_hook
    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
    ...     print(f"It took {round(duration, 2)} seconds to sync {pipe}.")
    >>>
    """
    with _locks['_post_sync_hooks']:
        try:
            ### Hooks are grouped by the module which defined them.
            _post_sync_hooks.setdefault(function.__module__, []).append(function)
        except Exception as e:
            ### Registration is best-effort: warn instead of raising.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function

Register a function as a sync hook to be executed upon completion of a sync.

Parameters
  • function (Callable[[Any], Any]): The function to execute upon completion of a sync.
Returns
  • Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import post_sync_hook
>>>
>>> @post_sync_hook
... def log_sync(pipe, success_tuple, duration=None, **kwargs):
...     print(f"It took {round(duration, 2)} seconds to sync {pipe}.")
>>>