meerschaum.plugins
Expose plugin management APIs from the meerschaum.plugins
module.
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Expose plugin management APIs from the `meerschaum.plugins` module. 7""" 8 9from __future__ import annotations 10import functools 11import meerschaum as mrsm 12from meerschaum.utils.typing import Callable, Any, Union, Optional, Dict, List, Tuple 13from meerschaum.utils.threading import Lock, RLock 14from meerschaum.plugins._Plugin import Plugin 15 16_api_plugins: Dict[str, List[Callable[['fastapi.App'], Any]]] = {} 17_pre_sync_hooks: Dict[str, List[Callable[[Any], Any]]] = {} 18_post_sync_hooks: Dict[str, List[Callable[[Any], Any]]] = {} 19_locks = { 20 '_api_plugins': RLock(), 21 '_dash_plugins': RLock(), 22 '_pre_sync_hooks': RLock(), 23 '_post_sync_hooks': RLock(), 24 '__path__': RLock(), 25 'sys.path': RLock(), 26 'internal_plugins': RLock(), 27 '_synced_symlinks': RLock(), 28 'PLUGINS_INTERNAL_LOCK_PATH': RLock(), 29} 30__all__ = ( 31 "Plugin", "make_action", "api_plugin", "dash_plugin", "web_page", 32 "import_plugins", "from_plugin_import", 33 "reload_plugins", "get_plugins", "get_data_plugins", "add_plugin_argument", 34 "pre_sync_hook", "post_sync_hook", 35) 36__pdoc__ = { 37 'venvs': False, 'data': False, 'stack': False, 'plugins': False, 38} 39 40def make_action( 41 function: Callable[[Any], Any], 42 shell: bool = False, 43 activate: bool = True, 44 deactivate: bool = True, 45 debug: bool = False 46) -> Callable[[Any], Any]: 47 """ 48 Make a function a Meerschaum action. Useful for plugins that are adding multiple actions. 49 50 Parameters 51 ---------- 52 function: Callable[[Any], Any] 53 The function to become a Meerschaum action. Must accept all keyword arguments. 54 55 shell: bool, default False 56 Not used. 57 58 Returns 59 ------- 60 Another function (this is a decorator function). 61 62 Examples 63 -------- 64 >>> from meerschaum.plugins import make_action 65 >>> 66 >>> @make_action 67 ... def my_action(**kw): 68 ... print('foo') 69 ... return True, "Success" 70 >>> 71 """ 72 73 from meerschaum.actions import actions 74 from meerschaum.utils.formatting import pprint 75 package_name = function.__globals__['__name__'] 76 plugin_name = ( 77 package_name.split('.')[1] 78 if package_name.startswith('plugins.') else None 79 ) 80 plugin = Plugin(plugin_name) if plugin_name else None 81 82 if debug: 83 from meerschaum.utils.debug import dprint 84 dprint( 85 f"Adding action '{function.__name__}' from plugin " + 86 f"'{plugin}'..." 87 ) 88 89 actions[function.__name__] = function 90 return function 91 92 93def pre_sync_hook( 94 function: Callable[[Any], Any], 95) -> Callable[[Any], Any]: 96 """ 97 Register a function as a sync hook to be executed right before sync. 98 99 Parameters 100 ---------- 101 function: Callable[[Any], Any] 102 The function to execute right before a sync. 103 104 Returns 105 ------- 106 Another function (this is a decorator function). 107 108 Examples 109 -------- 110 >>> from meerschaum.plugins import pre_sync_hook 111 >>> 112 >>> @pre_sync_hook 113 ... def log_sync(pipe, **kwargs): 114 ... print(f"About to sync {pipe} with kwargs:\n{kwargs}.") 115 >>> 116 """ 117 with _locks['_pre_sync_hooks']: 118 try: 119 if function.__module__ not in _pre_sync_hooks: 120 _pre_sync_hooks[function.__module__] = [] 121 _pre_sync_hooks[function.__module__].append(function) 122 except Exception as e: 123 from meerschaum.utils.warnings import warn 124 warn(e) 125 return function 126 127 128def post_sync_hook( 129 function: Callable[[Any], Any], 130) -> Callable[[Any], Any]: 131 """ 132 Register a function as a sync hook to be executed upon completion of a sync. 133 134 Parameters 135 ---------- 136 function: Callable[[Any], Any] 137 The function to execute upon completion of a sync. 138 139 Returns 140 ------- 141 Another function (this is a decorator function). 142 143 Examples 144 -------- 145 >>> from meerschaum.plugins import post_sync_hook 146 >>> from meerschaum.utils.misc import interval_str 147 >>> from datetime import timedelta 148 >>> 149 >>> @post_sync_hook 150 ... def log_sync(pipe, success_tuple, duration=None, **kwargs): 151 ... duration_delta = timedelta(seconds=duration) 152 ... duration_text = interval_str(duration_delta) 153 ... print(f"It took {duration_text} to sync {pipe}.") 154 >>> 155 """ 156 with _locks['_post_sync_hooks']: 157 try: 158 if function.__module__ not in _post_sync_hooks: 159 _post_sync_hooks[function.__module__] = [] 160 _post_sync_hooks[function.__module__].append(function) 161 except Exception as e: 162 from meerschaum.utils.warnings import warn 163 warn(e) 164 return function 165 166 167_plugin_endpoints_to_pages = {} 168def web_page( 169 page: Union[str, None, Callable[[Any], Any]] = None, 170 login_required: bool = True, 171 skip_navbar: bool = False, 172 page_group: Optional[str] = None, 173 **kwargs 174) -> Any: 175 """ 176 Quickly add pages to the dash application. 177 178 Examples 179 -------- 180 >>> import meerschaum as mrsm 181 >>> from meerschaum.plugins import web_page 182 >>> html = mrsm.attempt_import('dash.html') 183 >>> 184 >>> @web_page('foo/bar', login_required=False) 185 >>> def foo_bar(): 186 ... return html.Div([html.H1("Hello, World!")]) 187 >>> 188 """ 189 page_str = None 190 191 def _decorator(_func: Callable[[Any], Any]) -> Callable[[Any], Any]: 192 nonlocal page_str, page_group 193 194 @functools.wraps(_func) 195 def wrapper(*_args, **_kwargs): 196 return _func(*_args, **_kwargs) 197 198 if page_str is None: 199 page_str = _func.__name__ 200 201 page_str = page_str.lstrip('/').rstrip('/').strip() 202 page_key = ( 203 ' '.join( 204 [ 205 word.capitalize() 206 for word in ( 207 page_str.replace('/dash', '').lstrip('/').rstrip('/').strip() 208 .replace('-', ' ').replace('_', ' ').split(' ') 209 ) 210 ] 211 ) 212 ) 213 214 package_name = _func.__globals__['__name__'] 215 plugin_name = ( 216 package_name.split('.')[1] 217 if package_name.startswith('plugins.') else None 218 ) 219 page_group = page_group or plugin_name 220 if page_group not in _plugin_endpoints_to_pages: 221 _plugin_endpoints_to_pages[page_group] = {} 222 _plugin_endpoints_to_pages[page_group][page_str] = { 223 'function': _func, 224 'login_required': login_required, 225 'skip_navbar': skip_navbar, 226 'page_key': page_key, 227 } 228 return wrapper 229 230 if callable(page): 231 decorator_to_return = _decorator(page) 232 page_str = page.__name__ 233 else: 234 decorator_to_return = _decorator 235 page_str = page 236 237 return decorator_to_return 238 239 240_dash_plugins = {} 241def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]: 242 """ 243 Execute the function when starting the Dash application. 244 """ 245 with _locks['_dash_plugins']: 246 try: 247 if function.__module__ not in _dash_plugins: 248 _dash_plugins[function.__module__] = [] 249 _dash_plugins[function.__module__].append(function) 250 except Exception as e: 251 from meerschaum.utils.warnings import warn 252 warn(e) 253 return function 254 255 256def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]: 257 """ 258 Execute the function when initializing the Meerschaum API module. 259 Useful for lazy-loading heavy plugins only when the API is started, 260 such as when editing the `meerschaum.api.app` FastAPI app. 261 262 The FastAPI app will be passed as the only parameter. 263 264 Examples 265 -------- 266 >>> from meerschaum.plugins import api_plugin 267 >>> 268 >>> @api_plugin 269 >>> def initialize_plugin(app): 270 ... @app.get('/my/new/path') 271 ... def new_path(): 272 ... return {'message': 'It works!'} 273 >>> 274 """ 275 with _locks['_api_plugins']: 276 try: 277 if function.__module__ not in _api_plugins: 278 _api_plugins[function.__module__] = [] 279 _api_plugins[function.__module__].append(function) 280 except Exception as e: 281 from meerschaum.utils.warnings import warn 282 warn(e) 283 return function 284 285 286_synced_symlinks: bool = False 287def sync_plugins_symlinks(debug: bool = False, warn: bool = True) -> None: 288 """ 289 Update the plugins 290 """ 291 global _synced_symlinks 292 with _locks['_synced_symlinks']: 293 if _synced_symlinks: 294 return 295 296 import sys, os, pathlib, time 297 from collections import defaultdict 298 import importlib.util 299 from meerschaum.utils.misc import flatten_list, make_symlink, is_symlink 300 from meerschaum.utils.warnings import error, warn as _warn 301 from meerschaum.config.static import STATIC_CONFIG 302 from meerschaum.utils.venv import Venv, activate_venv, deactivate_venv, is_venv_active 303 from meerschaum.config._paths import ( 304 PLUGINS_RESOURCES_PATH, 305 PLUGINS_ARCHIVES_RESOURCES_PATH, 306 PLUGINS_INIT_PATH, 307 PLUGINS_DIR_PATHS, 308 PLUGINS_INTERNAL_LOCK_PATH, 309 ) 310 311 ### If the lock file exists, sleep for up to a second or until it's removed before continuing. 312 with _locks['PLUGINS_INTERNAL_LOCK_PATH']: 313 if PLUGINS_INTERNAL_LOCK_PATH.exists(): 314 lock_sleep_total = STATIC_CONFIG['plugins']['lock_sleep_total'] 315 lock_sleep_increment = STATIC_CONFIG['plugins']['lock_sleep_increment'] 316 lock_start = time.perf_counter() 317 while ( 318 (time.perf_counter() - lock_start) < lock_sleep_total 319 ): 320 time.sleep(lock_sleep_increment) 321 if not PLUGINS_INTERNAL_LOCK_PATH.exists(): 322 break 323 try: 324 PLUGINS_INTERNAL_LOCK_PATH.unlink() 325 except Exception as e: 326 if warn: 327 _warn(f"Error while removing lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}") 328 break 329 330 ### Begin locking from other processes. 331 try: 332 PLUGINS_INTERNAL_LOCK_PATH.touch() 333 except Exception as e: 334 if warn: 335 _warn(f"Unable to create lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}") 336 337 with _locks['internal_plugins']: 338 339 try: 340 from importlib.metadata import entry_points 341 except ImportError: 342 importlib_metadata = attempt_import('importlib_metadata', lazy=False) 343 entry_points = importlib_metadata.entry_points 344 345 ### NOTE: Allow plugins to be installed via `pip`. 346 packaged_plugin_paths = [] 347 try: 348 discovered_packaged_plugins_eps = entry_points(group='meerschaum.plugins') 349 except TypeError: 350 discovered_packaged_plugins_eps = [] 351 352 for ep in discovered_packaged_plugins_eps: 353 module_name = ep.name 354 for package_file_path in ep.dist.files: 355 if package_file_path.suffix != '.py': 356 continue 357 if str(package_file_path) == f'{module_name}.py': 358 packaged_plugin_paths.append(package_file_path.locate()) 359 elif str(package_file_path) == f'{module_name}/__init__.py': 360 packaged_plugin_paths.append(package_file_path.locate().parent) 361 362 if is_symlink(PLUGINS_RESOURCES_PATH) or not PLUGINS_RESOURCES_PATH.exists(): 363 try: 364 PLUGINS_RESOURCES_PATH.unlink() 365 except Exception: 366 pass 367 368 PLUGINS_RESOURCES_PATH.mkdir(exist_ok=True) 369 370 existing_symlinked_paths = [ 371 (PLUGINS_RESOURCES_PATH / item) 372 for item in os.listdir(PLUGINS_RESOURCES_PATH) 373 ] 374 for plugins_path in PLUGINS_DIR_PATHS: 375 if not plugins_path.exists(): 376 plugins_path.mkdir(exist_ok=True, parents=True) 377 plugins_to_be_symlinked = list(flatten_list( 378 [ 379 [ 380 (plugins_path / item) 381 for item in os.listdir(plugins_path) 382 if ( 383 not item.startswith('.') 384 ) and (item not in ('__pycache__', '__init__.py')) 385 ] 386 for plugins_path in PLUGINS_DIR_PATHS 387 ] 388 )) 389 plugins_to_be_symlinked.extend(packaged_plugin_paths) 390 391 ### Check for duplicates. 392 seen_plugins = defaultdict(lambda: 0) 393 for plugin_path in plugins_to_be_symlinked: 394 plugin_name = plugin_path.stem 395 seen_plugins[plugin_name] += 1 396 for plugin_name, plugin_count in seen_plugins.items(): 397 if plugin_count > 1: 398 if warn: 399 _warn(f"Found duplicate plugins named '{plugin_name}'.") 400 401 for plugin_symlink_path in existing_symlinked_paths: 402 real_path = pathlib.Path(os.path.realpath(plugin_symlink_path)) 403 404 ### Remove invalid symlinks. 405 if real_path not in plugins_to_be_symlinked: 406 if not is_symlink(plugin_symlink_path): 407 continue 408 try: 409 plugin_symlink_path.unlink() 410 except Exception as e: 411 pass 412 413 ### Remove valid plugins from the to-be-symlinked list. 414 else: 415 plugins_to_be_symlinked.remove(real_path) 416 417 for plugin_path in plugins_to_be_symlinked: 418 plugin_symlink_path = PLUGINS_RESOURCES_PATH / plugin_path.name 419 try: 420 ### There might be duplicate folders (e.g. __pycache__). 421 if ( 422 plugin_symlink_path.exists() 423 and 424 plugin_symlink_path.is_dir() 425 and 426 not is_symlink(plugin_symlink_path) 427 ): 428 continue 429 success, msg = make_symlink(plugin_path, plugin_symlink_path) 430 except Exception as e: 431 success, msg = False, str(e) 432 if not success: 433 if warn: 434 _warn( 435 f"Failed to create symlink {plugin_symlink_path} " 436 + f"to {plugin_path}:\n {msg}" 437 ) 438 439 ### Release symlink lock file in case other processes need it. 440 with _locks['PLUGINS_INTERNAL_LOCK_PATH']: 441 try: 442 if PLUGINS_INTERNAL_LOCK_PATH.exists(): 443 PLUGINS_INTERNAL_LOCK_PATH.unlink() 444 ### Sometimes competing threads will delete the lock file at the same time. 445 except FileNotFoundError: 446 pass 447 except Exception as e: 448 if warn: 449 _warn(f"Error cleaning up lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}") 450 451 try: 452 if not PLUGINS_INIT_PATH.exists(): 453 PLUGINS_INIT_PATH.touch() 454 except Exception as e: 455 error(f"Failed to create the file '{PLUGINS_INIT_PATH}':\n{e}") 456 457 with _locks['__path__']: 458 if str(PLUGINS_RESOURCES_PATH.parent) not in __path__: 459 __path__.append(str(PLUGINS_RESOURCES_PATH.parent)) 460 461 with _locks['_synced_symlinks']: 462 _synced_symlinks = True 463 464 465def import_plugins( 466 *plugins_to_import: Union[str, List[str], None], 467 warn: bool = True, 468) -> Union[ 469 'ModuleType', Tuple['ModuleType', None] 470]: 471 """ 472 Import the Meerschaum plugins directory. 473 474 Parameters 475 ---------- 476 plugins_to_import: Union[str, List[str], None] 477 If provided, only import the specified plugins. 478 Otherwise import the entire plugins module. May be a string, list, or `None`. 479 Defaults to `None`. 480 481 Returns 482 ------- 483 A module of list of modules, depening on the number of plugins provided. 484 485 """ 486 import sys 487 import os 488 import importlib 489 from meerschaum.utils.misc import flatten_list 490 from meerschaum.config._paths import PLUGINS_RESOURCES_PATH 491 from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv 492 from meerschaum.utils.warnings import warn as _warn 493 plugins_to_import = list(plugins_to_import) 494 with _locks['sys.path']: 495 496 ### Since plugins may depend on other plugins, 497 ### we need to activate the virtual environments for library plugins. 498 ### This logic exists in `Plugin.activate_venv()`, 499 ### but that code requires the plugin's module to already be imported. 500 ### It's not a guarantee of correct activation order, 501 ### e.g. if a library plugin pins a specific package and another 502 plugins_names = get_plugins_names() 503 already_active_venvs = [is_venv_active(plugin_name) for plugin_name in plugins_names] 504 505 if not sys.path or sys.path[0] != str(PLUGINS_RESOURCES_PATH.parent): 506 sys.path.insert(0, str(PLUGINS_RESOURCES_PATH.parent)) 507 508 if not plugins_to_import: 509 for plugin_name in plugins_names: 510 activate_venv(plugin_name) 511 try: 512 imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem) 513 except ImportError as e: 514 _warn(f"Failed to import the plugins module:\n {e}") 515 import traceback 516 traceback.print_exc() 517 imported_plugins = None 518 for plugin_name in plugins_names: 519 if plugin_name in already_active_venvs: 520 continue 521 deactivate_venv(plugin_name) 522 523 else: 524 imported_plugins = [] 525 for plugin_name in flatten_list(plugins_to_import): 526 plugin = Plugin(plugin_name) 527 try: 528 with Venv(plugin): 529 imported_plugins.append( 530 importlib.import_module( 531 f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}' 532 ) 533 ) 534 except Exception as e: 535 _warn( 536 f"Failed to import plugin '{plugin_name}':\n " 537 + f"{e}\n\nHere's a stacktrace:", 538 stack = False, 539 ) 540 from meerschaum.utils.formatting import get_console 541 get_console().print_exception( 542 suppress = [ 543 'meerschaum/plugins/__init__.py', 544 importlib, 545 importlib._bootstrap, 546 ] 547 ) 548 imported_plugins.append(None) 549 550 if imported_plugins is None and warn: 551 _warn(f"Failed to import plugins.", stacklevel=3) 552 553 if str(PLUGINS_RESOURCES_PATH.parent) in sys.path: 554 sys.path.remove(str(PLUGINS_RESOURCES_PATH.parent)) 555 556 if isinstance(imported_plugins, list): 557 return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins)) 558 return imported_plugins 559 560 561def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any: 562 """ 563 Emulate the `from module import x` behavior. 564 565 Parameters 566 ---------- 567 plugin_import_name: str 568 The import name of the plugin's module. 569 Separate submodules with '.' (e.g. 'compose.utils.pipes') 570 571 attrs: str 572 Names of the attributes to return. 573 574 Returns 575 ------- 576 Objects from a plugin's submodule. 577 If multiple objects are provided, return a tuple. 578 579 Examples 580 -------- 581 >>> init = from_plugin_import('compose.utils', 'init') 582 >>> with mrsm.Venv('compose'): 583 ... cf = init() 584 >>> build_parent_pipe, get_defined_pipes = from_plugin_import( 585 ... 'compose.utils.pipes', 586 ... 'build_parent_pipe', 587 ... 'get_defined_pipes', 588 ... ) 589 >>> parent_pipe = build_parent_pipe(cf) 590 >>> defined_pipes = get_defined_pipes(cf) 591 """ 592 import importlib 593 from meerschaum.config._paths import PLUGINS_RESOURCES_PATH 594 from meerschaum.utils.warnings import warn as _warn 595 if plugin_import_name.startswith('plugins.'): 596 plugin_import_name = plugin_import_name[len('plugins.'):] 597 plugin_import_parts = plugin_import_name.split('.') 598 plugin_root_name = plugin_import_parts[0] 599 plugin = mrsm.Plugin(plugin_root_name) 600 601 submodule_import_name = '.'.join( 602 [PLUGINS_RESOURCES_PATH.stem] 603 + plugin_import_parts 604 ) 605 if len(attrs) == 0: 606 raise ValueError(f"Provide which attributes to return from '{submodule_import_name}'.") 607 608 attrs_to_return = [] 609 with mrsm.Venv(plugin): 610 if plugin.module is None: 611 return None 612 613 try: 614 submodule = importlib.import_module(submodule_import_name) 615 except ImportError as e: 616 _warn( 617 f"Failed to import plugin '{submodule_import_name}':\n " 618 + f"{e}\n\nHere's a stacktrace:", 619 stack=False, 620 ) 621 from meerschaum.utils.formatting import get_console 622 get_console().print_exception( 623 suppress=[ 624 'meerschaum/plugins/__init__.py', 625 importlib, 626 importlib._bootstrap, 627 ] 628 ) 629 return None 630 631 for attr in attrs: 632 try: 633 attrs_to_return.append(getattr(submodule, attr)) 634 except Exception: 635 _warn(f"Failed to access '{attr}' from '{submodule_import_name}'.") 636 attrs_to_return.append(None) 637 638 if len(attrs) == 1: 639 return attrs_to_return[0] 640 641 return tuple(attrs_to_return) 642 643 644def load_plugins(debug: bool = False, shell: bool = False) -> None: 645 """ 646 Import Meerschaum plugins and update the actions dictionary. 647 """ 648 from inspect import isfunction, getmembers 649 from meerschaum.actions import __all__ as _all, modules 650 from meerschaum.config._paths import PLUGINS_RESOURCES_PATH 651 from meerschaum.utils.packages import get_modules_from_package 652 if debug: 653 from meerschaum.utils.debug import dprint 654 655 _plugins_names, plugins_modules = get_modules_from_package( 656 import_plugins(), 657 names = True, 658 recursive = True, 659 modules_venvs = True 660 ) 661 ### I'm appending here to keep from redefining the modules list. 662 new_modules = ( 663 [ 664 mod for mod in modules 665 if not mod.__name__.startswith(PLUGINS_RESOURCES_PATH.stem + '.') 666 ] 667 + plugins_modules 668 ) 669 n_mods = len(modules) 670 for mod in new_modules: 671 modules.append(mod) 672 for i in range(n_mods): 673 modules.pop(0) 674 675 for module in plugins_modules: 676 for name, func in getmembers(module): 677 if not isfunction(func): 678 continue 679 if name == module.__name__.split('.')[-1]: 680 make_action(func, **{'shell': shell, 'debug': debug}) 681 682 683def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None: 684 """ 685 Reload plugins back into memory. 686 687 Parameters 688 ---------- 689 plugins: Optional[List[str]], default None 690 The plugins to reload. `None` will reload all plugins. 691 692 """ 693 import sys 694 if debug: 695 from meerschaum.utils.debug import dprint 696 697 if not plugins: 698 plugins = get_plugins_names() 699 for plugin_name in plugins: 700 if debug: 701 dprint(f"Reloading plugin '{plugin_name}'...") 702 mod_name = 'plugins.' + str(plugin_name) 703 if mod_name in sys.modules: 704 del sys.modules[mod_name] 705 load_plugins(debug=debug) 706 707 708def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]: 709 """ 710 Return a list of `Plugin` objects. 711 712 Parameters 713 ---------- 714 to_load: 715 If specified, only load specific plugins. 716 Otherwise return all plugins. 717 718 try_import: bool, default True 719 If `True`, allow for plugins to be imported. 720 """ 721 from meerschaum.config._paths import PLUGINS_RESOURCES_PATH 722 import os 723 sync_plugins_symlinks() 724 _plugins = [ 725 Plugin(name) for name in ( 726 to_load or [ 727 ( 728 name if (PLUGINS_RESOURCES_PATH / name).is_dir() 729 else name[:-3] 730 ) for name in os.listdir(PLUGINS_RESOURCES_PATH) if name != '__init__.py' 731 ] 732 ) 733 ] 734 plugins = tuple(plugin for plugin in _plugins if plugin.is_installed(try_import=try_import)) 735 if len(to_load) == 1: 736 if len(plugins) == 0: 737 raise ValueError(f"Plugin '{to_load[0]}' is not installed.") 738 return plugins[0] 739 return plugins 740 741 742def get_plugins_names(*to_load, **kw) -> List[str]: 743 """ 744 Return a list of installed plugins. 745 """ 746 return [plugin.name for plugin in get_plugins(*to_load, **kw)] 747 748 749def get_plugins_modules(*to_load, **kw) -> List['ModuleType']: 750 """ 751 Return a list of modules for the installed plugins, or `None` if things break. 752 """ 753 return [plugin.module for plugin in get_plugins(*to_load, **kw)] 754 755 756def get_data_plugins() -> List[Plugin]: 757 """ 758 Only return the modules of plugins with either `fetch()` or `sync()` functions. 759 """ 760 import inspect 761 plugins = get_plugins() 762 data_names = {'sync', 'fetch'} 763 data_plugins = [] 764 for plugin in plugins: 765 for name, ob in inspect.getmembers(plugin.module): 766 if not inspect.isfunction(ob): 767 continue 768 if name not in data_names: 769 continue 770 data_plugins.append(plugin) 771 return data_plugins 772 773 774def add_plugin_argument(*args, **kwargs) -> None: 775 """ 776 Add argparse arguments under the 'Plugins options' group. 777 Takes the same parameters as the regular argparse `add_argument()` function. 778 779 Examples 780 -------- 781 >>> add_plugin_argument('--foo', type=int, help="This is my help text!") 782 >>> 783 """ 784 from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser 785 from meerschaum.utils.warnings import warn, error 786 _parent_plugin_name = _get_parent_plugin(2) 787 title = f"Plugin '{_parent_plugin_name}' options" if _parent_plugin_name else 'Custom options' 788 group_key = 'plugin_' + (_parent_plugin_name or '') 789 if group_key not in groups: 790 groups[group_key] = parser.add_argument_group( 791 title = title, 792 ) 793 _seen_plugin_args[group_key] = set() 794 try: 795 if str(args) not in _seen_plugin_args[group_key]: 796 groups[group_key].add_argument(*args, **kwargs) 797 _seen_plugin_args[group_key].add(str(args)) 798 except Exception as e: 799 warn(e) 800 801 802def _get_parent_plugin(stacklevel: int = 1) -> Union[str, None]: 803 """If this function is called from outside a Meerschaum plugin, it will return None.""" 804 import inspect, re 805 try: 806 parent_globals = inspect.stack()[stacklevel][0].f_globals 807 parent_file = parent_globals.get('__file__', '') 808 return parent_globals['__name__'].replace('plugins.', '').split('.')[0] 809 except Exception as e: 810 return None
37class Plugin: 38 """Handle packaging of Meerschaum plugins.""" 39 def __init__( 40 self, 41 name: str, 42 version: Optional[str] = None, 43 user_id: Optional[int] = None, 44 required: Optional[List[str]] = None, 45 attributes: Optional[Dict[str, Any]] = None, 46 archive_path: Optional[pathlib.Path] = None, 47 venv_path: Optional[pathlib.Path] = None, 48 repo_connector: Optional['mrsm.connectors.api.APIConnector'] = None, 49 repo: Union['mrsm.connectors.api.APIConnector', str, None] = None, 50 ): 51 from meerschaum.config.static import STATIC_CONFIG 52 sep = STATIC_CONFIG['plugins']['repo_separator'] 53 _repo = None 54 if sep in name: 55 try: 56 name, _repo = name.split(sep) 57 except Exception as e: 58 error(f"Invalid plugin name: '{name}'") 59 self._repo_in_name = _repo 60 61 if attributes is None: 62 attributes = {} 63 self.name = name 64 self.attributes = attributes 65 self.user_id = user_id 66 self._version = version 67 if required: 68 self._required = required 69 self.archive_path = ( 70 archive_path if archive_path is not None 71 else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz" 72 ) 73 self.venv_path = ( 74 venv_path if venv_path is not None 75 else VIRTENV_RESOURCES_PATH / self.name 76 ) 77 self._repo_connector = repo_connector 78 self._repo_keys = repo 79 80 81 @property 82 def repo_connector(self): 83 """ 84 Return the repository connector for this plugin. 85 NOTE: This imports the `connectors` module, which imports certain plugin modules. 86 """ 87 if self._repo_connector is None: 88 from meerschaum.connectors.parse import parse_repo_keys 89 90 repo_keys = self._repo_keys or self._repo_in_name 91 if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name: 92 error( 93 f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'." 94 ) 95 repo_connector = parse_repo_keys(repo_keys) 96 self._repo_connector = repo_connector 97 return self._repo_connector 98 99 100 @property 101 def version(self): 102 """ 103 Return the plugin's module version is defined (`__version__`) if it's defined. 104 """ 105 if self._version is None: 106 try: 107 self._version = self.module.__version__ 108 except Exception as e: 109 self._version = None 110 return self._version 111 112 113 @property 114 def module(self): 115 """ 116 Return the Python module of the underlying plugin. 117 """ 118 if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None: 119 if self.__file__ is None: 120 return None 121 from meerschaum.plugins import import_plugins 122 self._module = import_plugins(str(self), warn=False) 123 return self._module 124 125 126 @property 127 def __file__(self) -> Union[str, None]: 128 """ 129 Return the file path (str) of the plugin if it exists, otherwise `None`. 130 """ 131 if self.__dict__.get('_module', None) is not None: 132 return self.module.__file__ 133 134 potential_dir = PLUGINS_RESOURCES_PATH / self.name 135 if ( 136 potential_dir.exists() 137 and potential_dir.is_dir() 138 and (potential_dir / '__init__.py').exists() 139 ): 140 return str((potential_dir / '__init__.py').as_posix()) 141 142 potential_file = PLUGINS_RESOURCES_PATH / (self.name + '.py') 143 if potential_file.exists() and not potential_file.is_dir(): 144 return str(potential_file.as_posix()) 145 146 return None 147 148 149 @property 150 def requirements_file_path(self) -> Union[pathlib.Path, None]: 151 """ 152 If a file named `requirements.txt` exists, return its path. 153 """ 154 if self.__file__ is None: 155 return None 156 path = pathlib.Path(self.__file__).parent / 'requirements.txt' 157 if not path.exists(): 158 return None 159 return path 160 161 162 def is_installed(self, **kw) -> bool: 163 """ 164 Check whether a plugin is correctly installed. 165 166 Returns 167 ------- 168 A `bool` indicating whether a plugin exists and is successfully imported. 169 """ 170 return self.__file__ is not None 171 172 173 def make_tar(self, debug: bool = False) -> pathlib.Path: 174 """ 175 Compress the plugin's source files into a `.tar.gz` archive and return the archive's path. 176 177 Parameters 178 ---------- 179 debug: bool, default False 180 Verbosity toggle. 181 182 Returns 183 ------- 184 A `pathlib.Path` to the archive file's path. 185 186 """ 187 import tarfile, pathlib, subprocess, fnmatch 188 from meerschaum.utils.debug import dprint 189 from meerschaum.utils.packages import attempt_import 190 pathspec = attempt_import('pathspec', debug=debug) 191 192 if not self.__file__: 193 from meerschaum.utils.warnings import error 194 error(f"Could not find file for plugin '{self}'.") 195 if '__init__.py' in self.__file__ or os.path.isdir(self.__file__): 196 path = self.__file__.replace('__init__.py', '') 197 is_dir = True 198 else: 199 path = self.__file__ 200 is_dir = False 201 202 old_cwd = os.getcwd() 203 real_parent_path = pathlib.Path(os.path.realpath(path)).parent 204 os.chdir(real_parent_path) 205 206 default_patterns_to_ignore = [ 207 '.pyc', 208 '__pycache__/', 209 'eggs/', 210 '__pypackages__/', 211 '.git', 212 ] 213 214 def parse_gitignore() -> 'Set[str]': 215 gitignore_path = pathlib.Path(path) / '.gitignore' 216 if not gitignore_path.exists(): 217 return set(default_patterns_to_ignore) 218 with open(gitignore_path, 'r', encoding='utf-8') as f: 219 gitignore_text = f.read() 220 return set(pathspec.PathSpec.from_lines( 221 pathspec.patterns.GitWildMatchPattern, 222 default_patterns_to_ignore + gitignore_text.splitlines() 223 ).match_tree(path)) 224 225 patterns_to_ignore = parse_gitignore() if is_dir else set() 226 227 if debug: 228 dprint(f"Patterns to ignore:\n{patterns_to_ignore}") 229 230 with tarfile.open(self.archive_path, 'w:gz') as tarf: 231 if not is_dir: 232 tarf.add(f"{self.name}.py") 233 else: 234 for root, dirs, files in os.walk(self.name): 235 for f in files: 236 good_file = True 237 fp = os.path.join(root, f) 238 for pattern in patterns_to_ignore: 239 if pattern in str(fp) or f.startswith('.'): 240 good_file = False 241 break 242 if good_file: 243 if debug: 244 dprint(f"Adding '{fp}'...") 245 tarf.add(fp) 246 247 ### clean up and change back to old directory 248 os.chdir(old_cwd) 249 250 ### change to 775 to avoid permissions issues with the API in a Docker container 251 self.archive_path.chmod(0o775) 252 253 if debug: 254 dprint(f"Created archive '{self.archive_path}'.") 255 return self.archive_path 256 257 258 def install( 259 self, 260 skip_deps: bool = False, 261 force: bool = False, 262 debug: bool = False, 263 ) -> SuccessTuple: 264 """ 265 Extract a plugin's tar archive to the plugins directory. 266 267 This function checks if the plugin is already installed and if the version is equal or 268 greater than the existing installation. 269 270 Parameters 271 ---------- 272 skip_deps: bool, default False 273 If `True`, do not install dependencies. 274 275 force: bool, default False 276 If `True`, continue with installation, even if required packages fail to install. 277 278 debug: bool, default False 279 Verbosity toggle. 280 281 Returns 282 ------- 283 A `SuccessTuple` of success (bool) and a message (str). 284 285 """ 286 if self.full_name in _ongoing_installations: 287 return True, f"Already installing plugin '{self}'." 288 _ongoing_installations.add(self.full_name) 289 from meerschaum.utils.warnings import warn, error 290 if debug: 291 from meerschaum.utils.debug import dprint 292 import tarfile 293 import re 294 import ast 295 from meerschaum.plugins import sync_plugins_symlinks 296 from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum 297 from meerschaum.utils.venv import init_venv 298 from meerschaum.utils.misc import safely_extract_tar 299 old_cwd = os.getcwd() 300 old_version = '' 301 new_version = '' 302 temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name 303 temp_dir.mkdir(exist_ok=True) 304 305 if not self.archive_path.exists(): 306 return False, f"Missing archive file for plugin '{self}'." 307 if self.version is not None: 308 old_version = self.version 309 if debug: 310 dprint(f"Found existing version '{old_version}' for plugin '{self}'.") 311 312 if debug: 313 dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...") 314 315 try: 316 with tarfile.open(self.archive_path, 'r:gz') as tarf: 317 safely_extract_tar(tarf, temp_dir) 318 except Exception as e: 319 warn(e) 320 return False, f"Failed to extract plugin '{self.name}'." 321 322 ### search for version information 323 files = os.listdir(temp_dir) 324 325 if str(files[0]) == self.name: 326 is_dir = True 327 elif str(files[0]) == self.name + '.py': 328 is_dir = False 329 else: 330 error(f"Unknown format encountered for plugin '{self}'.") 331 332 fpath = temp_dir / files[0] 333 if is_dir: 334 fpath = fpath / '__init__.py' 335 336 init_venv(self.name, debug=debug) 337 with open(fpath, 'r', encoding='utf-8') as f: 338 init_lines = f.readlines() 339 new_version = None 340 for line in init_lines: 341 if '__version__' not in line: 342 continue 343 version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip()) 344 if not version_match: 345 continue 346 new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip()) 347 break 348 if not new_version: 349 warn( 350 f"No `__version__` defined for plugin '{self}'. " 351 + "Assuming new version...", 352 stack = False, 353 ) 354 355 packaging_version = attempt_import('packaging.version') 356 try: 357 is_new_version = (not new_version and not old_version) or ( 358 packaging_version.parse(old_version) < packaging_version.parse(new_version) 359 ) 360 is_same_version = new_version and old_version and ( 361 packaging_version.parse(old_version) == packaging_version.parse(new_version) 362 ) 363 except Exception: 364 is_new_version, is_same_version = True, False 365 366 ### Determine where to permanently store the new plugin. 367 plugin_installation_dir_path = PLUGINS_DIR_PATHS[0] 368 for path in PLUGINS_DIR_PATHS: 369 files_in_plugins_dir = os.listdir(path) 370 if ( 371 self.name in files_in_plugins_dir 372 or 373 (self.name + '.py') in files_in_plugins_dir 374 ): 375 plugin_installation_dir_path = path 376 break 377 378 success_msg = ( 379 f"Successfully installed plugin '{self}'" 380 + ("\n (skipped dependencies)" if skip_deps else "") 381 + "." 382 ) 383 success, abort = None, None 384 385 if is_same_version and not force: 386 success, msg = True, ( 387 f"Plugin '{self}' is up-to-date (version {old_version}).\n" + 388 " Install again with `-f` or `--force` to reinstall." 389 ) 390 abort = True 391 elif is_new_version or force: 392 for src_dir, dirs, files in os.walk(temp_dir): 393 if success is not None: 394 break 395 dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path)) 396 if not os.path.exists(dst_dir): 397 os.mkdir(dst_dir) 398 for f in files: 399 src_file = os.path.join(src_dir, f) 400 dst_file = os.path.join(dst_dir, f) 401 if os.path.exists(dst_file): 402 os.remove(dst_file) 403 404 if debug: 405 dprint(f"Moving '{src_file}' to '{dst_dir}'...") 406 try: 407 shutil.move(src_file, dst_dir) 408 except Exception: 409 success, msg = False, ( 410 f"Failed to install plugin '{self}': " + 411 f"Could not move file '{src_file}' to '{dst_dir}'" 412 ) 413 print(msg) 414 break 415 if success is None: 416 success, msg = True, success_msg 417 else: 418 success, msg = False, ( 419 f"Your installed version of plugin '{self}' ({old_version}) is higher than " 420 + f"attempted version {new_version}." 421 ) 422 423 shutil.rmtree(temp_dir) 424 os.chdir(old_cwd) 425 426 ### Reload the plugin's module. 427 sync_plugins_symlinks(debug=debug) 428 if '_module' in self.__dict__: 429 del self.__dict__['_module'] 430 init_venv(venv=self.name, force=True, debug=debug) 431 reload_meerschaum(debug=debug) 432 433 ### if we've already failed, return here 434 if not success or abort: 435 _ongoing_installations.remove(self.full_name) 436 return success, msg 437 438 ### attempt to install dependencies 439 dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug) 440 if not dependencies_installed: 441 _ongoing_installations.remove(self.full_name) 442 return False, f"Failed to install dependencies for plugin '{self}'." 443 444 ### handling success tuple, bool, or other (typically None) 445 setup_tuple = self.setup(debug=debug) 446 if isinstance(setup_tuple, tuple): 447 if not setup_tuple[0]: 448 success, msg = setup_tuple 449 elif isinstance(setup_tuple, bool): 450 if not setup_tuple: 451 success, msg = False, ( 452 f"Failed to run post-install setup for plugin '{self}'." + '\n' + 453 f"Check `setup()` in '{self.__file__}' for more information " + 454 "(no error message provided)." 455 ) 456 else: 457 success, msg = True, success_msg 458 elif setup_tuple is None: 459 success = True 460 msg = ( 461 f"Post-install for plugin '{self}' returned None. " + 462 "Assuming plugin successfully installed." 463 ) 464 warn(msg) 465 else: 466 success = False 467 msg = ( 468 f"Post-install for plugin '{self}' returned unexpected value " + 469 f"of type '{type(setup_tuple)}': {setup_tuple}" 470 ) 471 472 _ongoing_installations.remove(self.full_name) 473 _ = self.module 474 return success, msg 475 476 477 def remove_archive( 478 self, 479 debug: bool = False 480 ) -> SuccessTuple: 481 """Remove a plugin's archive file.""" 482 if not self.archive_path.exists(): 483 return True, f"Archive file for plugin '{self}' does not exist." 484 try: 485 self.archive_path.unlink() 486 except Exception as e: 487 return False, f"Failed to remove archive for plugin '{self}':\n{e}" 488 return True, "Success" 489 490 491 def remove_venv( 492 self, 493 debug: bool = False 494 ) -> SuccessTuple: 495 """Remove a plugin's virtual environment.""" 496 if not self.venv_path.exists(): 497 return True, f"Virtual environment for plugin '{self}' does not exist." 498 try: 499 shutil.rmtree(self.venv_path) 500 except Exception as e: 501 return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}" 502 return True, "Success" 503 504 505 def uninstall(self, debug: bool = False) -> SuccessTuple: 506 """ 507 Remove a plugin, its virtual environment, and archive file. 508 """ 509 from meerschaum.utils.packages import reload_meerschaum 510 from meerschaum.plugins import sync_plugins_symlinks 511 from meerschaum.utils.warnings import warn, info 512 warnings_thrown_count: int = 0 513 max_warnings: int = 3 514 515 if not self.is_installed(): 516 info( 517 f"Plugin '{self.name}' doesn't seem to be installed.\n " 518 + "Checking for artifacts...", 519 stack = False, 520 ) 521 else: 522 real_path = pathlib.Path(os.path.realpath(self.__file__)) 523 try: 524 if real_path.name == '__init__.py': 525 shutil.rmtree(real_path.parent) 526 else: 527 real_path.unlink() 528 except Exception as e: 529 warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False) 530 warnings_thrown_count += 1 531 else: 532 info(f"Removed source files for plugin '{self.name}'.") 533 534 if self.venv_path.exists(): 535 success, msg = self.remove_venv(debug=debug) 536 if not success: 537 warn(msg, stack=False) 538 warnings_thrown_count += 1 539 else: 540 info(f"Removed virtual environment from plugin '{self.name}'.") 541 542 success = warnings_thrown_count < max_warnings 543 sync_plugins_symlinks(debug=debug) 544 self.deactivate_venv(force=True, debug=debug) 545 reload_meerschaum(debug=debug) 546 return success, ( 547 f"Successfully uninstalled plugin '{self}'." if success 548 else f"Failed to uninstall plugin '{self}'." 549 ) 550 551 552 def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]: 553 """ 554 If exists, run the plugin's `setup()` function. 555 556 Parameters 557 ---------- 558 *args: str 559 The positional arguments passed to the `setup()` function. 560 561 debug: bool, default False 562 Verbosity toggle. 563 564 **kw: Any 565 The keyword arguments passed to the `setup()` function. 566 567 Returns 568 ------- 569 A `SuccessTuple` or `bool` indicating success. 570 571 """ 572 from meerschaum.utils.debug import dprint 573 import inspect 574 _setup = None 575 for name, fp in inspect.getmembers(self.module): 576 if name == 'setup' and inspect.isfunction(fp): 577 _setup = fp 578 break 579 580 ### assume success if no setup() is found (not necessary) 581 if _setup is None: 582 return True 583 584 sig = inspect.signature(_setup) 585 has_debug, has_kw = ('debug' in sig.parameters), False 586 for k, v in sig.parameters.items(): 587 if '**' in str(v): 588 has_kw = True 589 break 590 591 _kw = {} 592 if has_kw: 593 _kw.update(kw) 594 if has_debug: 595 _kw['debug'] = debug 596 597 if debug: 598 dprint(f"Running setup for plugin '{self}'...") 599 try: 600 self.activate_venv(debug=debug) 601 return_tuple = _setup(*args, **_kw) 602 self.deactivate_venv(debug=debug) 603 except Exception as e: 604 return False, str(e) 605 606 if isinstance(return_tuple, tuple): 607 return return_tuple 608 if isinstance(return_tuple, bool): 609 return return_tuple, f"Setup for Plugin '{self.name}' did not return a message." 610 if return_tuple is None: 611 return False, f"Setup for Plugin '{self.name}' returned None." 612 return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}" 613 614 615 def get_dependencies( 616 self, 617 debug: bool = False, 618 ) -> List[str]: 619 """ 620 If the Plugin has specified dependencies in a list called `required`, return the list. 621 622 **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages. 623 Meerschaum plugins may also specify connector keys for a repo after `'@'`. 624 625 Parameters 626 ---------- 627 debug: bool, default False 628 Verbosity toggle. 629 630 Returns 631 ------- 632 A list of required packages and plugins (str). 633 634 """ 635 if '_required' in self.__dict__: 636 return self._required 637 638 ### If the plugin has not yet been imported, 639 ### infer the dependencies from the source text. 640 ### This is not super robust, and it doesn't feel right 641 ### having multiple versions of the logic. 642 ### This is necessary when determining the activation order 643 ### without having import the module. 644 ### For consistency's sake, the module-less method does not cache the requirements. 645 if self.__dict__.get('_module', None) is None: 646 file_path = self.__file__ 647 if file_path is None: 648 return [] 649 with open(file_path, 'r', encoding='utf-8') as f: 650 text = f.read() 651 652 if 'required' not in text: 653 return [] 654 655 ### This has some limitations: 656 ### It relies on `required` being manually declared. 657 ### We lose the ability to dynamically alter the `required` list, 658 ### which is why we've kept the module-reliant method below. 659 import ast, re 660 ### NOTE: This technically would break 661 ### if `required` was the very first line of the file. 662 req_start_match = re.search(r'\nrequired(:\s*)?.*=', text) 663 if not req_start_match: 664 return [] 665 req_start = req_start_match.start() 666 equals_sign = req_start + text[req_start:].find('=') 667 668 ### Dependencies may have brackets within the strings, so push back the index. 669 first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[') 670 if first_opening_brace == -1: 671 return [] 672 673 next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']') 674 if next_closing_brace == -1: 675 return [] 676 677 start_ix = first_opening_brace + 1 678 end_ix = next_closing_brace 679 680 num_braces = 0 681 while True: 682 if '[' not in text[start_ix:end_ix]: 683 break 684 num_braces += 1 685 start_ix = end_ix 686 end_ix += text[end_ix + 1:].find(']') + 1 687 688 req_end = end_ix + 1 689 req_text = ( 690 text[(first_opening_brace-1):req_end] 691 .lstrip() 692 .replace('=', '', 1) 693 .lstrip() 694 .rstrip() 695 ) 696 try: 697 required = ast.literal_eval(req_text) 698 except Exception as e: 699 warn( 700 f"Unable to determine requirements for plugin '{self.name}' " 701 + "without importing the module.\n" 702 + " This may be due to dynamically setting the global `required` list.\n" 703 + f" {e}" 704 ) 705 return [] 706 return required 707 708 import inspect 709 self.activate_venv(dependencies=False, debug=debug) 710 required = [] 711 for name, val in inspect.getmembers(self.module): 712 if name == 'required': 713 required = val 714 break 715 self._required = required 716 self.deactivate_venv(dependencies=False, debug=debug) 717 return required 718 719 720 def get_required_plugins(self, debug: bool=False) -> List[mrsm.plugins.Plugin]: 721 """ 722 Return a list of required Plugin objects. 723 """ 724 from meerschaum.utils.warnings import warn 725 from meerschaum.config import get_config 726 from meerschaum.config.static import STATIC_CONFIG 727 from meerschaum.connectors.parse import is_valid_connector_keys 728 plugins = [] 729 _deps = self.get_dependencies(debug=debug) 730 sep = STATIC_CONFIG['plugins']['repo_separator'] 731 plugin_names = [ 732 _d[len('plugin:'):] for _d in _deps 733 if _d.startswith('plugin:') and len(_d) > len('plugin:') 734 ] 735 default_repo_keys = get_config('meerschaum', 'default_repository') 736 skipped_repo_keys = set() 737 738 for _plugin_name in plugin_names: 739 if sep in _plugin_name: 740 try: 741 _plugin_name, _repo_keys = _plugin_name.split(sep) 742 except Exception: 743 _repo_keys = default_repo_keys 744 warn( 745 f"Invalid repo keys for required plugin '{_plugin_name}'.\n " 746 + f"Will try to use '{_repo_keys}' instead.", 747 stack = False, 748 ) 749 else: 750 _repo_keys = default_repo_keys 751 752 if _repo_keys in skipped_repo_keys: 753 continue 754 755 if not is_valid_connector_keys(_repo_keys): 756 warn( 757 f"Invalid connector '{_repo_keys}'.\n" 758 f" Skipping required plugins from repository '{_repo_keys}'", 759 stack=False, 760 ) 761 continue 762 763 plugins.append(Plugin(_plugin_name, repo=_repo_keys)) 764 765 return plugins 766 767 768 def get_required_packages(self, debug: bool=False) -> List[str]: 769 """ 770 Return the required package names (excluding plugins). 771 """ 772 _deps = self.get_dependencies(debug=debug) 773 return [_d for _d in _deps if not _d.startswith('plugin:')] 774 775 776 def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool: 777 """ 778 Activate the virtual environments for the plugin and its dependencies. 779 780 Parameters 781 ---------- 782 dependencies: bool, default True 783 If `True`, activate the virtual environments for required plugins. 784 785 Returns 786 ------- 787 A bool indicating success. 788 """ 789 from meerschaum.utils.venv import venv_target_path 790 from meerschaum.utils.packages import activate_venv 791 from meerschaum.utils.misc import make_symlink, is_symlink 792 from meerschaum.config._paths import PACKAGE_ROOT_PATH 793 794 if dependencies: 795 for plugin in self.get_required_plugins(debug=debug): 796 plugin.activate_venv(debug=debug, **kw) 797 798 vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True) 799 venv_meerschaum_path = vtp / 'meerschaum' 800 801 try: 802 success, msg = True, "Success" 803 if is_symlink(venv_meerschaum_path): 804 if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH: 805 venv_meerschaum_path.unlink() 806 success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH) 807 except Exception as e: 808 success, msg = False, str(e) 809 if not success: 810 warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}") 811 812 return activate_venv(self.name, debug=debug, **kw) 813 814 815 def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool: 816 """ 817 Deactivate the virtual environments for the plugin and its dependencies. 818 819 Parameters 820 ---------- 821 dependencies: bool, default True 822 If `True`, deactivate the virtual environments for required plugins. 823 824 Returns 825 ------- 826 A bool indicating success. 827 """ 828 from meerschaum.utils.packages import deactivate_venv 829 success = deactivate_venv(self.name, debug=debug, **kw) 830 if dependencies: 831 for plugin in self.get_required_plugins(debug=debug): 832 plugin.deactivate_venv(debug=debug, **kw) 833 return success 834 835 836 def install_dependencies( 837 self, 838 force: bool = False, 839 debug: bool = False, 840 ) -> bool: 841 """ 842 If specified, install dependencies. 843 844 **NOTE:** Dependencies that start with `'plugin:'` will be installed as 845 Meerschaum plugins from the same repository as this Plugin. 846 To install from a different repository, add the repo keys after `'@'` 847 (e.g. `'plugin:foo@api:bar'`). 848 849 Parameters 850 ---------- 851 force: bool, default False 852 If `True`, continue with the installation, even if some 853 required packages fail to install. 854 855 debug: bool, default False 856 Verbosity toggle. 857 858 Returns 859 ------- 860 A bool indicating success. 861 """ 862 from meerschaum.utils.packages import pip_install, venv_contains_package 863 from meerschaum.utils.warnings import warn, info 864 _deps = self.get_dependencies(debug=debug) 865 if not _deps and self.requirements_file_path is None: 866 return True 867 868 plugins = self.get_required_plugins(debug=debug) 869 for _plugin in plugins: 870 if _plugin.name == self.name: 871 warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False) 872 continue 873 _success, _msg = _plugin.repo_connector.install_plugin( 874 _plugin.name, debug=debug, force=force 875 ) 876 if not _success: 877 warn( 878 f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'" 879 + f" for plugin '{self.name}':\n" + _msg, 880 stack = False, 881 ) 882 if not force: 883 warn( 884 "Try installing with the `--force` flag to continue anyway.", 885 stack = False, 886 ) 887 return False 888 info( 889 "Continuing with installation despite the failure " 890 + "(careful, things might be broken!)...", 891 icon = False 892 ) 893 894 895 ### First step: parse `requirements.txt` if it exists. 896 if self.requirements_file_path is not None: 897 if not pip_install( 898 requirements_file_path=self.requirements_file_path, 899 venv=self.name, debug=debug 900 ): 901 warn( 902 f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.", 903 stack = False, 904 ) 905 if not force: 906 warn( 907 "Try installing with `--force` to continue anyway.", 908 stack = False, 909 ) 910 return False 911 info( 912 "Continuing with installation despite the failure " 913 + "(careful, things might be broken!)...", 914 icon = False 915 ) 916 917 918 ### Don't reinstall packages that are already included in required plugins. 919 packages = [] 920 _packages = self.get_required_packages(debug=debug) 921 accounted_for_packages = set() 922 for package_name in _packages: 923 for plugin in plugins: 924 if venv_contains_package(package_name, plugin.name): 925 accounted_for_packages.add(package_name) 926 break 927 packages = [pkg for pkg in _packages if pkg not in accounted_for_packages] 928 929 ### Attempt pip packages installation. 930 if packages: 931 for package in packages: 932 if not pip_install(package, venv=self.name, debug=debug): 933 warn( 934 f"Failed to install required package '{package}'" 935 + f" for plugin '{self.name}'.", 936 stack = False, 937 ) 938 if not force: 939 warn( 940 "Try installing with `--force` to continue anyway.", 941 stack = False, 942 ) 943 return False 944 info( 945 "Continuing with installation despite the failure " 946 + "(careful, things might be broken!)...", 947 icon = False 948 ) 949 return True 950 951 952 @property 953 def full_name(self) -> str: 954 """ 955 Include the repo keys with the plugin's name. 956 """ 957 from meerschaum.config.static import STATIC_CONFIG 958 sep = STATIC_CONFIG['plugins']['repo_separator'] 959 return self.name + sep + str(self.repo_connector) 960 961 962 def __str__(self): 963 return self.name 964 965 966 def __repr__(self): 967 return f"Plugin('{self.name}', repo='{self.repo_connector}')" 968 969 970 def __del__(self): 971 pass
Handle packaging of Meerschaum plugins.
39 def __init__( 40 self, 41 name: str, 42 version: Optional[str] = None, 43 user_id: Optional[int] = None, 44 required: Optional[List[str]] = None, 45 attributes: Optional[Dict[str, Any]] = None, 46 archive_path: Optional[pathlib.Path] = None, 47 venv_path: Optional[pathlib.Path] = None, 48 repo_connector: Optional['mrsm.connectors.api.APIConnector'] = None, 49 repo: Union['mrsm.connectors.api.APIConnector', str, None] = None, 50 ): 51 from meerschaum.config.static import STATIC_CONFIG 52 sep = STATIC_CONFIG['plugins']['repo_separator'] 53 _repo = None 54 if sep in name: 55 try: 56 name, _repo = name.split(sep) 57 except Exception as e: 58 error(f"Invalid plugin name: '{name}'") 59 self._repo_in_name = _repo 60 61 if attributes is None: 62 attributes = {} 63 self.name = name 64 self.attributes = attributes 65 self.user_id = user_id 66 self._version = version 67 if required: 68 self._required = required 69 self.archive_path = ( 70 archive_path if archive_path is not None 71 else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz" 72 ) 73 self.venv_path = ( 74 venv_path if venv_path is not None 75 else VIRTENV_RESOURCES_PATH / self.name 76 ) 77 self._repo_connector = repo_connector 78 self._repo_keys = repo
81 @property 82 def repo_connector(self): 83 """ 84 Return the repository connector for this plugin. 85 NOTE: This imports the `connectors` module, which imports certain plugin modules. 86 """ 87 if self._repo_connector is None: 88 from meerschaum.connectors.parse import parse_repo_keys 89 90 repo_keys = self._repo_keys or self._repo_in_name 91 if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name: 92 error( 93 f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'." 94 ) 95 repo_connector = parse_repo_keys(repo_keys) 96 self._repo_connector = repo_connector 97 return self._repo_connector
Return the repository connector for this plugin.
NOTE: This imports the connectors
module, which imports certain plugin modules.
100 @property 101 def version(self): 102 """ 103 Return the plugin's module version is defined (`__version__`) if it's defined. 104 """ 105 if self._version is None: 106 try: 107 self._version = self.module.__version__ 108 except Exception as e: 109 self._version = None 110 return self._version
Return the plugin's module version is defined (__version__
) if it's defined.
113 @property 114 def module(self): 115 """ 116 Return the Python module of the underlying plugin. 117 """ 118 if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None: 119 if self.__file__ is None: 120 return None 121 from meerschaum.plugins import import_plugins 122 self._module = import_plugins(str(self), warn=False) 123 return self._module
Return the Python module of the underlying plugin.
149 @property 150 def requirements_file_path(self) -> Union[pathlib.Path, None]: 151 """ 152 If a file named `requirements.txt` exists, return its path. 153 """ 154 if self.__file__ is None: 155 return None 156 path = pathlib.Path(self.__file__).parent / 'requirements.txt' 157 if not path.exists(): 158 return None 159 return path
If a file named requirements.txt
exists, return its path.
162 def is_installed(self, **kw) -> bool: 163 """ 164 Check whether a plugin is correctly installed. 165 166 Returns 167 ------- 168 A `bool` indicating whether a plugin exists and is successfully imported. 169 """ 170 return self.__file__ is not None
Check whether a plugin is correctly installed.
Returns
- A
bool
indicating whether a plugin exists and is successfully imported.
173 def make_tar(self, debug: bool = False) -> pathlib.Path: 174 """ 175 Compress the plugin's source files into a `.tar.gz` archive and return the archive's path. 176 177 Parameters 178 ---------- 179 debug: bool, default False 180 Verbosity toggle. 181 182 Returns 183 ------- 184 A `pathlib.Path` to the archive file's path. 185 186 """ 187 import tarfile, pathlib, subprocess, fnmatch 188 from meerschaum.utils.debug import dprint 189 from meerschaum.utils.packages import attempt_import 190 pathspec = attempt_import('pathspec', debug=debug) 191 192 if not self.__file__: 193 from meerschaum.utils.warnings import error 194 error(f"Could not find file for plugin '{self}'.") 195 if '__init__.py' in self.__file__ or os.path.isdir(self.__file__): 196 path = self.__file__.replace('__init__.py', '') 197 is_dir = True 198 else: 199 path = self.__file__ 200 is_dir = False 201 202 old_cwd = os.getcwd() 203 real_parent_path = pathlib.Path(os.path.realpath(path)).parent 204 os.chdir(real_parent_path) 205 206 default_patterns_to_ignore = [ 207 '.pyc', 208 '__pycache__/', 209 'eggs/', 210 '__pypackages__/', 211 '.git', 212 ] 213 214 def parse_gitignore() -> 'Set[str]': 215 gitignore_path = pathlib.Path(path) / '.gitignore' 216 if not gitignore_path.exists(): 217 return set(default_patterns_to_ignore) 218 with open(gitignore_path, 'r', encoding='utf-8') as f: 219 gitignore_text = f.read() 220 return set(pathspec.PathSpec.from_lines( 221 pathspec.patterns.GitWildMatchPattern, 222 default_patterns_to_ignore + gitignore_text.splitlines() 223 ).match_tree(path)) 224 225 patterns_to_ignore = parse_gitignore() if is_dir else set() 226 227 if debug: 228 dprint(f"Patterns to ignore:\n{patterns_to_ignore}") 229 230 with tarfile.open(self.archive_path, 'w:gz') as tarf: 231 if not is_dir: 232 tarf.add(f"{self.name}.py") 233 else: 234 for root, dirs, files in os.walk(self.name): 235 for f in files: 236 good_file = True 237 fp = os.path.join(root, f) 238 for pattern in patterns_to_ignore: 239 if pattern in str(fp) or f.startswith('.'): 240 good_file = False 241 break 242 if good_file: 243 if debug: 244 dprint(f"Adding '{fp}'...") 245 tarf.add(fp) 246 247 ### clean up and change back to old directory 248 os.chdir(old_cwd) 249 250 ### change to 775 to avoid permissions issues with the API in a Docker container 251 self.archive_path.chmod(0o775) 252 253 if debug: 254 dprint(f"Created archive '{self.archive_path}'.") 255 return self.archive_path
Compress the plugin's source files into a .tar.gz
archive and return the archive's path.
Parameters
- debug (bool, default False): Verbosity toggle.
Returns
- A
pathlib.Path
to the archive file's path.
258 def install( 259 self, 260 skip_deps: bool = False, 261 force: bool = False, 262 debug: bool = False, 263 ) -> SuccessTuple: 264 """ 265 Extract a plugin's tar archive to the plugins directory. 266 267 This function checks if the plugin is already installed and if the version is equal or 268 greater than the existing installation. 269 270 Parameters 271 ---------- 272 skip_deps: bool, default False 273 If `True`, do not install dependencies. 274 275 force: bool, default False 276 If `True`, continue with installation, even if required packages fail to install. 277 278 debug: bool, default False 279 Verbosity toggle. 280 281 Returns 282 ------- 283 A `SuccessTuple` of success (bool) and a message (str). 284 285 """ 286 if self.full_name in _ongoing_installations: 287 return True, f"Already installing plugin '{self}'." 288 _ongoing_installations.add(self.full_name) 289 from meerschaum.utils.warnings import warn, error 290 if debug: 291 from meerschaum.utils.debug import dprint 292 import tarfile 293 import re 294 import ast 295 from meerschaum.plugins import sync_plugins_symlinks 296 from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum 297 from meerschaum.utils.venv import init_venv 298 from meerschaum.utils.misc import safely_extract_tar 299 old_cwd = os.getcwd() 300 old_version = '' 301 new_version = '' 302 temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name 303 temp_dir.mkdir(exist_ok=True) 304 305 if not self.archive_path.exists(): 306 return False, f"Missing archive file for plugin '{self}'." 307 if self.version is not None: 308 old_version = self.version 309 if debug: 310 dprint(f"Found existing version '{old_version}' for plugin '{self}'.") 311 312 if debug: 313 dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...") 314 315 try: 316 with tarfile.open(self.archive_path, 'r:gz') as tarf: 317 safely_extract_tar(tarf, temp_dir) 318 except Exception as e: 319 warn(e) 320 return False, f"Failed to extract plugin '{self.name}'." 321 322 ### search for version information 323 files = os.listdir(temp_dir) 324 325 if str(files[0]) == self.name: 326 is_dir = True 327 elif str(files[0]) == self.name + '.py': 328 is_dir = False 329 else: 330 error(f"Unknown format encountered for plugin '{self}'.") 331 332 fpath = temp_dir / files[0] 333 if is_dir: 334 fpath = fpath / '__init__.py' 335 336 init_venv(self.name, debug=debug) 337 with open(fpath, 'r', encoding='utf-8') as f: 338 init_lines = f.readlines() 339 new_version = None 340 for line in init_lines: 341 if '__version__' not in line: 342 continue 343 version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip()) 344 if not version_match: 345 continue 346 new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip()) 347 break 348 if not new_version: 349 warn( 350 f"No `__version__` defined for plugin '{self}'. " 351 + "Assuming new version...", 352 stack = False, 353 ) 354 355 packaging_version = attempt_import('packaging.version') 356 try: 357 is_new_version = (not new_version and not old_version) or ( 358 packaging_version.parse(old_version) < packaging_version.parse(new_version) 359 ) 360 is_same_version = new_version and old_version and ( 361 packaging_version.parse(old_version) == packaging_version.parse(new_version) 362 ) 363 except Exception: 364 is_new_version, is_same_version = True, False 365 366 ### Determine where to permanently store the new plugin. 367 plugin_installation_dir_path = PLUGINS_DIR_PATHS[0] 368 for path in PLUGINS_DIR_PATHS: 369 files_in_plugins_dir = os.listdir(path) 370 if ( 371 self.name in files_in_plugins_dir 372 or 373 (self.name + '.py') in files_in_plugins_dir 374 ): 375 plugin_installation_dir_path = path 376 break 377 378 success_msg = ( 379 f"Successfully installed plugin '{self}'" 380 + ("\n (skipped dependencies)" if skip_deps else "") 381 + "." 382 ) 383 success, abort = None, None 384 385 if is_same_version and not force: 386 success, msg = True, ( 387 f"Plugin '{self}' is up-to-date (version {old_version}).\n" + 388 " Install again with `-f` or `--force` to reinstall." 389 ) 390 abort = True 391 elif is_new_version or force: 392 for src_dir, dirs, files in os.walk(temp_dir): 393 if success is not None: 394 break 395 dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path)) 396 if not os.path.exists(dst_dir): 397 os.mkdir(dst_dir) 398 for f in files: 399 src_file = os.path.join(src_dir, f) 400 dst_file = os.path.join(dst_dir, f) 401 if os.path.exists(dst_file): 402 os.remove(dst_file) 403 404 if debug: 405 dprint(f"Moving '{src_file}' to '{dst_dir}'...") 406 try: 407 shutil.move(src_file, dst_dir) 408 except Exception: 409 success, msg = False, ( 410 f"Failed to install plugin '{self}': " + 411 f"Could not move file '{src_file}' to '{dst_dir}'" 412 ) 413 print(msg) 414 break 415 if success is None: 416 success, msg = True, success_msg 417 else: 418 success, msg = False, ( 419 f"Your installed version of plugin '{self}' ({old_version}) is higher than " 420 + f"attempted version {new_version}." 421 ) 422 423 shutil.rmtree(temp_dir) 424 os.chdir(old_cwd) 425 426 ### Reload the plugin's module. 427 sync_plugins_symlinks(debug=debug) 428 if '_module' in self.__dict__: 429 del self.__dict__['_module'] 430 init_venv(venv=self.name, force=True, debug=debug) 431 reload_meerschaum(debug=debug) 432 433 ### if we've already failed, return here 434 if not success or abort: 435 _ongoing_installations.remove(self.full_name) 436 return success, msg 437 438 ### attempt to install dependencies 439 dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug) 440 if not dependencies_installed: 441 _ongoing_installations.remove(self.full_name) 442 return False, f"Failed to install dependencies for plugin '{self}'." 443 444 ### handling success tuple, bool, or other (typically None) 445 setup_tuple = self.setup(debug=debug) 446 if isinstance(setup_tuple, tuple): 447 if not setup_tuple[0]: 448 success, msg = setup_tuple 449 elif isinstance(setup_tuple, bool): 450 if not setup_tuple: 451 success, msg = False, ( 452 f"Failed to run post-install setup for plugin '{self}'." + '\n' + 453 f"Check `setup()` in '{self.__file__}' for more information " + 454 "(no error message provided)." 455 ) 456 else: 457 success, msg = True, success_msg 458 elif setup_tuple is None: 459 success = True 460 msg = ( 461 f"Post-install for plugin '{self}' returned None. " + 462 "Assuming plugin successfully installed." 463 ) 464 warn(msg) 465 else: 466 success = False 467 msg = ( 468 f"Post-install for plugin '{self}' returned unexpected value " + 469 f"of type '{type(setup_tuple)}': {setup_tuple}" 470 ) 471 472 _ongoing_installations.remove(self.full_name) 473 _ = self.module 474 return success, msg
Extract a plugin's tar archive to the plugins directory.
This function checks if the plugin is already installed and if the version is equal or greater than the existing installation.
Parameters
- skip_deps (bool, default False):
If
True
, do not install dependencies. - force (bool, default False):
If
True
, continue with installation, even if required packages fail to install. - debug (bool, default False): Verbosity toggle.
Returns
- A
SuccessTuple
of success (bool) and a message (str).
477 def remove_archive( 478 self, 479 debug: bool = False 480 ) -> SuccessTuple: 481 """Remove a plugin's archive file.""" 482 if not self.archive_path.exists(): 483 return True, f"Archive file for plugin '{self}' does not exist." 484 try: 485 self.archive_path.unlink() 486 except Exception as e: 487 return False, f"Failed to remove archive for plugin '{self}':\n{e}" 488 return True, "Success"
Remove a plugin's archive file.
491 def remove_venv( 492 self, 493 debug: bool = False 494 ) -> SuccessTuple: 495 """Remove a plugin's virtual environment.""" 496 if not self.venv_path.exists(): 497 return True, f"Virtual environment for plugin '{self}' does not exist." 498 try: 499 shutil.rmtree(self.venv_path) 500 except Exception as e: 501 return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}" 502 return True, "Success"
Remove a plugin's virtual environment.
505 def uninstall(self, debug: bool = False) -> SuccessTuple: 506 """ 507 Remove a plugin, its virtual environment, and archive file. 508 """ 509 from meerschaum.utils.packages import reload_meerschaum 510 from meerschaum.plugins import sync_plugins_symlinks 511 from meerschaum.utils.warnings import warn, info 512 warnings_thrown_count: int = 0 513 max_warnings: int = 3 514 515 if not self.is_installed(): 516 info( 517 f"Plugin '{self.name}' doesn't seem to be installed.\n " 518 + "Checking for artifacts...", 519 stack = False, 520 ) 521 else: 522 real_path = pathlib.Path(os.path.realpath(self.__file__)) 523 try: 524 if real_path.name == '__init__.py': 525 shutil.rmtree(real_path.parent) 526 else: 527 real_path.unlink() 528 except Exception as e: 529 warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False) 530 warnings_thrown_count += 1 531 else: 532 info(f"Removed source files for plugin '{self.name}'.") 533 534 if self.venv_path.exists(): 535 success, msg = self.remove_venv(debug=debug) 536 if not success: 537 warn(msg, stack=False) 538 warnings_thrown_count += 1 539 else: 540 info(f"Removed virtual environment from plugin '{self.name}'.") 541 542 success = warnings_thrown_count < max_warnings 543 sync_plugins_symlinks(debug=debug) 544 self.deactivate_venv(force=True, debug=debug) 545 reload_meerschaum(debug=debug) 546 return success, ( 547 f"Successfully uninstalled plugin '{self}'." if success 548 else f"Failed to uninstall plugin '{self}'." 549 )
Remove a plugin, its virtual environment, and archive file.
552 def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]: 553 """ 554 If exists, run the plugin's `setup()` function. 555 556 Parameters 557 ---------- 558 *args: str 559 The positional arguments passed to the `setup()` function. 560 561 debug: bool, default False 562 Verbosity toggle. 563 564 **kw: Any 565 The keyword arguments passed to the `setup()` function. 566 567 Returns 568 ------- 569 A `SuccessTuple` or `bool` indicating success. 570 571 """ 572 from meerschaum.utils.debug import dprint 573 import inspect 574 _setup = None 575 for name, fp in inspect.getmembers(self.module): 576 if name == 'setup' and inspect.isfunction(fp): 577 _setup = fp 578 break 579 580 ### assume success if no setup() is found (not necessary) 581 if _setup is None: 582 return True 583 584 sig = inspect.signature(_setup) 585 has_debug, has_kw = ('debug' in sig.parameters), False 586 for k, v in sig.parameters.items(): 587 if '**' in str(v): 588 has_kw = True 589 break 590 591 _kw = {} 592 if has_kw: 593 _kw.update(kw) 594 if has_debug: 595 _kw['debug'] = debug 596 597 if debug: 598 dprint(f"Running setup for plugin '{self}'...") 599 try: 600 self.activate_venv(debug=debug) 601 return_tuple = _setup(*args, **_kw) 602 self.deactivate_venv(debug=debug) 603 except Exception as e: 604 return False, str(e) 605 606 if isinstance(return_tuple, tuple): 607 return return_tuple 608 if isinstance(return_tuple, bool): 609 return return_tuple, f"Setup for Plugin '{self.name}' did not return a message." 610 if return_tuple is None: 611 return False, f"Setup for Plugin '{self.name}' returned None." 612 return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"
615 def get_dependencies( 616 self, 617 debug: bool = False, 618 ) -> List[str]: 619 """ 620 If the Plugin has specified dependencies in a list called `required`, return the list. 621 622 **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages. 623 Meerschaum plugins may also specify connector keys for a repo after `'@'`. 624 625 Parameters 626 ---------- 627 debug: bool, default False 628 Verbosity toggle. 629 630 Returns 631 ------- 632 A list of required packages and plugins (str). 633 634 """ 635 if '_required' in self.__dict__: 636 return self._required 637 638 ### If the plugin has not yet been imported, 639 ### infer the dependencies from the source text. 640 ### This is not super robust, and it doesn't feel right 641 ### having multiple versions of the logic. 642 ### This is necessary when determining the activation order 643 ### without having import the module. 644 ### For consistency's sake, the module-less method does not cache the requirements. 645 if self.__dict__.get('_module', None) is None: 646 file_path = self.__file__ 647 if file_path is None: 648 return [] 649 with open(file_path, 'r', encoding='utf-8') as f: 650 text = f.read() 651 652 if 'required' not in text: 653 return [] 654 655 ### This has some limitations: 656 ### It relies on `required` being manually declared. 657 ### We lose the ability to dynamically alter the `required` list, 658 ### which is why we've kept the module-reliant method below. 659 import ast, re 660 ### NOTE: This technically would break 661 ### if `required` was the very first line of the file. 662 req_start_match = re.search(r'\nrequired(:\s*)?.*=', text) 663 if not req_start_match: 664 return [] 665 req_start = req_start_match.start() 666 equals_sign = req_start + text[req_start:].find('=') 667 668 ### Dependencies may have brackets within the strings, so push back the index. 669 first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[') 670 if first_opening_brace == -1: 671 return [] 672 673 next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']') 674 if next_closing_brace == -1: 675 return [] 676 677 start_ix = first_opening_brace + 1 678 end_ix = next_closing_brace 679 680 num_braces = 0 681 while True: 682 if '[' not in text[start_ix:end_ix]: 683 break 684 num_braces += 1 685 start_ix = end_ix 686 end_ix += text[end_ix + 1:].find(']') + 1 687 688 req_end = end_ix + 1 689 req_text = ( 690 text[(first_opening_brace-1):req_end] 691 .lstrip() 692 .replace('=', '', 1) 693 .lstrip() 694 .rstrip() 695 ) 696 try: 697 required = ast.literal_eval(req_text) 698 except Exception as e: 699 warn( 700 f"Unable to determine requirements for plugin '{self.name}' " 701 + "without importing the module.\n" 702 + " This may be due to dynamically setting the global `required` list.\n" 703 + f" {e}" 704 ) 705 return [] 706 return required 707 708 import inspect 709 self.activate_venv(dependencies=False, debug=debug) 710 required = [] 711 for name, val in inspect.getmembers(self.module): 712 if name == 'required': 713 required = val 714 break 715 self._required = required 716 self.deactivate_venv(dependencies=False, debug=debug) 717 return required
If the Plugin has specified dependencies in a list called required
, return the list.
NOTE: Dependecies which start with 'plugin:'
are Meerschaum plugins, not pip packages.
Meerschaum plugins may also specify connector keys for a repo after '@'
.
Parameters
- debug (bool, default False): Verbosity toggle.
Returns
- A list of required packages and plugins (str).
720 def get_required_plugins(self, debug: bool=False) -> List[mrsm.plugins.Plugin]: 721 """ 722 Return a list of required Plugin objects. 723 """ 724 from meerschaum.utils.warnings import warn 725 from meerschaum.config import get_config 726 from meerschaum.config.static import STATIC_CONFIG 727 from meerschaum.connectors.parse import is_valid_connector_keys 728 plugins = [] 729 _deps = self.get_dependencies(debug=debug) 730 sep = STATIC_CONFIG['plugins']['repo_separator'] 731 plugin_names = [ 732 _d[len('plugin:'):] for _d in _deps 733 if _d.startswith('plugin:') and len(_d) > len('plugin:') 734 ] 735 default_repo_keys = get_config('meerschaum', 'default_repository') 736 skipped_repo_keys = set() 737 738 for _plugin_name in plugin_names: 739 if sep in _plugin_name: 740 try: 741 _plugin_name, _repo_keys = _plugin_name.split(sep) 742 except Exception: 743 _repo_keys = default_repo_keys 744 warn( 745 f"Invalid repo keys for required plugin '{_plugin_name}'.\n " 746 + f"Will try to use '{_repo_keys}' instead.", 747 stack = False, 748 ) 749 else: 750 _repo_keys = default_repo_keys 751 752 if _repo_keys in skipped_repo_keys: 753 continue 754 755 if not is_valid_connector_keys(_repo_keys): 756 warn( 757 f"Invalid connector '{_repo_keys}'.\n" 758 f" Skipping required plugins from repository '{_repo_keys}'", 759 stack=False, 760 ) 761 continue 762 763 plugins.append(Plugin(_plugin_name, repo=_repo_keys)) 764 765 return plugins
Return a list of required Plugin objects.
768 def get_required_packages(self, debug: bool=False) -> List[str]: 769 """ 770 Return the required package names (excluding plugins). 771 """ 772 _deps = self.get_dependencies(debug=debug) 773 return [_d for _d in _deps if not _d.startswith('plugin:')]
Return the required package names (excluding plugins).
776 def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool: 777 """ 778 Activate the virtual environments for the plugin and its dependencies. 779 780 Parameters 781 ---------- 782 dependencies: bool, default True 783 If `True`, activate the virtual environments for required plugins. 784 785 Returns 786 ------- 787 A bool indicating success. 788 """ 789 from meerschaum.utils.venv import venv_target_path 790 from meerschaum.utils.packages import activate_venv 791 from meerschaum.utils.misc import make_symlink, is_symlink 792 from meerschaum.config._paths import PACKAGE_ROOT_PATH 793 794 if dependencies: 795 for plugin in self.get_required_plugins(debug=debug): 796 plugin.activate_venv(debug=debug, **kw) 797 798 vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True) 799 venv_meerschaum_path = vtp / 'meerschaum' 800 801 try: 802 success, msg = True, "Success" 803 if is_symlink(venv_meerschaum_path): 804 if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH: 805 venv_meerschaum_path.unlink() 806 success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH) 807 except Exception as e: 808 success, msg = False, str(e) 809 if not success: 810 warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}") 811 812 return activate_venv(self.name, debug=debug, **kw)
Activate the virtual environments for the plugin and its dependencies.
Parameters
- dependencies (bool, default True):
If
True
, activate the virtual environments for required plugins.
Returns
- A bool indicating success.
815 def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool: 816 """ 817 Deactivate the virtual environments for the plugin and its dependencies. 818 819 Parameters 820 ---------- 821 dependencies: bool, default True 822 If `True`, deactivate the virtual environments for required plugins. 823 824 Returns 825 ------- 826 A bool indicating success. 827 """ 828 from meerschaum.utils.packages import deactivate_venv 829 success = deactivate_venv(self.name, debug=debug, **kw) 830 if dependencies: 831 for plugin in self.get_required_plugins(debug=debug): 832 plugin.deactivate_venv(debug=debug, **kw) 833 return success
Deactivate the virtual environments for the plugin and its dependencies.
Parameters
- dependencies (bool, default True):
If
True
, deactivate the virtual environments for required plugins.
Returns
- A bool indicating success.
836 def install_dependencies( 837 self, 838 force: bool = False, 839 debug: bool = False, 840 ) -> bool: 841 """ 842 If specified, install dependencies. 843 844 **NOTE:** Dependencies that start with `'plugin:'` will be installed as 845 Meerschaum plugins from the same repository as this Plugin. 846 To install from a different repository, add the repo keys after `'@'` 847 (e.g. `'plugin:foo@api:bar'`). 848 849 Parameters 850 ---------- 851 force: bool, default False 852 If `True`, continue with the installation, even if some 853 required packages fail to install. 854 855 debug: bool, default False 856 Verbosity toggle. 857 858 Returns 859 ------- 860 A bool indicating success. 861 """ 862 from meerschaum.utils.packages import pip_install, venv_contains_package 863 from meerschaum.utils.warnings import warn, info 864 _deps = self.get_dependencies(debug=debug) 865 if not _deps and self.requirements_file_path is None: 866 return True 867 868 plugins = self.get_required_plugins(debug=debug) 869 for _plugin in plugins: 870 if _plugin.name == self.name: 871 warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False) 872 continue 873 _success, _msg = _plugin.repo_connector.install_plugin( 874 _plugin.name, debug=debug, force=force 875 ) 876 if not _success: 877 warn( 878 f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'" 879 + f" for plugin '{self.name}':\n" + _msg, 880 stack = False, 881 ) 882 if not force: 883 warn( 884 "Try installing with the `--force` flag to continue anyway.", 885 stack = False, 886 ) 887 return False 888 info( 889 "Continuing with installation despite the failure " 890 + "(careful, things might be broken!)...", 891 icon = False 892 ) 893 894 895 ### First step: parse `requirements.txt` if it exists. 896 if self.requirements_file_path is not None: 897 if not pip_install( 898 requirements_file_path=self.requirements_file_path, 899 venv=self.name, debug=debug 900 ): 901 warn( 902 f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.", 903 stack = False, 904 ) 905 if not force: 906 warn( 907 "Try installing with `--force` to continue anyway.", 908 stack = False, 909 ) 910 return False 911 info( 912 "Continuing with installation despite the failure " 913 + "(careful, things might be broken!)...", 914 icon = False 915 ) 916 917 918 ### Don't reinstall packages that are already included in required plugins. 919 packages = [] 920 _packages = self.get_required_packages(debug=debug) 921 accounted_for_packages = set() 922 for package_name in _packages: 923 for plugin in plugins: 924 if venv_contains_package(package_name, plugin.name): 925 accounted_for_packages.add(package_name) 926 break 927 packages = [pkg for pkg in _packages if pkg not in accounted_for_packages] 928 929 ### Attempt pip packages installation. 930 if packages: 931 for package in packages: 932 if not pip_install(package, venv=self.name, debug=debug): 933 warn( 934 f"Failed to install required package '{package}'" 935 + f" for plugin '{self.name}'.", 936 stack = False, 937 ) 938 if not force: 939 warn( 940 "Try installing with `--force` to continue anyway.", 941 stack = False, 942 ) 943 return False 944 info( 945 "Continuing with installation despite the failure " 946 + "(careful, things might be broken!)...", 947 icon = False 948 ) 949 return True
If specified, install dependencies.
NOTE: Dependencies that start with 'plugin:'
will be installed as
Meerschaum plugins from the same repository as this Plugin.
To install from a different repository, add the repo keys after '@'
(e.g. 'plugin:foo@api:bar'
).
Parameters
- force (bool, default False):
If
True
, continue with the installation, even if some required packages fail to install. - debug (bool, default False): Verbosity toggle.
Returns
- A bool indicating success.
952 @property 953 def full_name(self) -> str: 954 """ 955 Include the repo keys with the plugin's name. 956 """ 957 from meerschaum.config.static import STATIC_CONFIG 958 sep = STATIC_CONFIG['plugins']['repo_separator'] 959 return self.name + sep + str(self.repo_connector)
Include the repo keys with the plugin's name.
41def make_action( 42 function: Callable[[Any], Any], 43 shell: bool = False, 44 activate: bool = True, 45 deactivate: bool = True, 46 debug: bool = False 47) -> Callable[[Any], Any]: 48 """ 49 Make a function a Meerschaum action. Useful for plugins that are adding multiple actions. 50 51 Parameters 52 ---------- 53 function: Callable[[Any], Any] 54 The function to become a Meerschaum action. Must accept all keyword arguments. 55 56 shell: bool, default False 57 Not used. 58 59 Returns 60 ------- 61 Another function (this is a decorator function). 62 63 Examples 64 -------- 65 >>> from meerschaum.plugins import make_action 66 >>> 67 >>> @make_action 68 ... def my_action(**kw): 69 ... print('foo') 70 ... return True, "Success" 71 >>> 72 """ 73 74 from meerschaum.actions import actions 75 from meerschaum.utils.formatting import pprint 76 package_name = function.__globals__['__name__'] 77 plugin_name = ( 78 package_name.split('.')[1] 79 if package_name.startswith('plugins.') else None 80 ) 81 plugin = Plugin(plugin_name) if plugin_name else None 82 83 if debug: 84 from meerschaum.utils.debug import dprint 85 dprint( 86 f"Adding action '{function.__name__}' from plugin " + 87 f"'{plugin}'..." 88 ) 89 90 actions[function.__name__] = function 91 return function
Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.
Parameters
- function (Callable[[Any], Any]): The function to become a Meerschaum action. Must accept all keyword arguments.
- shell (bool, default False): Not used.
Returns
- Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import make_action
>>>
>>> @make_action
... def my_action(**kw):
... print('foo')
... return True, "Success"
>>>
257def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]: 258 """ 259 Execute the function when initializing the Meerschaum API module. 260 Useful for lazy-loading heavy plugins only when the API is started, 261 such as when editing the `meerschaum.api.app` FastAPI app. 262 263 The FastAPI app will be passed as the only parameter. 264 265 Examples 266 -------- 267 >>> from meerschaum.plugins import api_plugin 268 >>> 269 >>> @api_plugin 270 >>> def initialize_plugin(app): 271 ... @app.get('/my/new/path') 272 ... def new_path(): 273 ... return {'message': 'It works!'} 274 >>> 275 """ 276 with _locks['_api_plugins']: 277 try: 278 if function.__module__ not in _api_plugins: 279 _api_plugins[function.__module__] = [] 280 _api_plugins[function.__module__].append(function) 281 except Exception as e: 282 from meerschaum.utils.warnings import warn 283 warn(e) 284 return function
Execute the function when initializing the Meerschaum API module.
Useful for lazy-loading heavy plugins only when the API is started,
such as when editing the meerschaum.api.app
FastAPI app.
The FastAPI app will be passed as the only parameter.
Examples
>>> from meerschaum.plugins import api_plugin
>>>
>>> @api_plugin
>>> def initialize_plugin(app):
... @app.get('/my/new/path')
... def new_path():
... return {'message': 'It works!'}
>>>
242def dash_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]: 243 """ 244 Execute the function when starting the Dash application. 245 """ 246 with _locks['_dash_plugins']: 247 try: 248 if function.__module__ not in _dash_plugins: 249 _dash_plugins[function.__module__] = [] 250 _dash_plugins[function.__module__].append(function) 251 except Exception as e: 252 from meerschaum.utils.warnings import warn 253 warn(e) 254 return function
Execute the function when starting the Dash application.
169def web_page( 170 page: Union[str, None, Callable[[Any], Any]] = None, 171 login_required: bool = True, 172 skip_navbar: bool = False, 173 page_group: Optional[str] = None, 174 **kwargs 175) -> Any: 176 """ 177 Quickly add pages to the dash application. 178 179 Examples 180 -------- 181 >>> import meerschaum as mrsm 182 >>> from meerschaum.plugins import web_page 183 >>> html = mrsm.attempt_import('dash.html') 184 >>> 185 >>> @web_page('foo/bar', login_required=False) 186 >>> def foo_bar(): 187 ... return html.Div([html.H1("Hello, World!")]) 188 >>> 189 """ 190 page_str = None 191 192 def _decorator(_func: Callable[[Any], Any]) -> Callable[[Any], Any]: 193 nonlocal page_str, page_group 194 195 @functools.wraps(_func) 196 def wrapper(*_args, **_kwargs): 197 return _func(*_args, **_kwargs) 198 199 if page_str is None: 200 page_str = _func.__name__ 201 202 page_str = page_str.lstrip('/').rstrip('/').strip() 203 page_key = ( 204 ' '.join( 205 [ 206 word.capitalize() 207 for word in ( 208 page_str.replace('/dash', '').lstrip('/').rstrip('/').strip() 209 .replace('-', ' ').replace('_', ' ').split(' ') 210 ) 211 ] 212 ) 213 ) 214 215 package_name = _func.__globals__['__name__'] 216 plugin_name = ( 217 package_name.split('.')[1] 218 if package_name.startswith('plugins.') else None 219 ) 220 page_group = page_group or plugin_name 221 if page_group not in _plugin_endpoints_to_pages: 222 _plugin_endpoints_to_pages[page_group] = {} 223 _plugin_endpoints_to_pages[page_group][page_str] = { 224 'function': _func, 225 'login_required': login_required, 226 'skip_navbar': skip_navbar, 227 'page_key': page_key, 228 } 229 return wrapper 230 231 if callable(page): 232 decorator_to_return = _decorator(page) 233 page_str = page.__name__ 234 else: 235 decorator_to_return = _decorator 236 page_str = page 237 238 return decorator_to_return
Quickly add pages to the dash application.
Examples
>>> import meerschaum as mrsm
>>> from meerschaum.plugins import web_page
>>> html = mrsm.attempt_import('dash.html')
>>>
>>> @web_page('foo/bar', login_required=False)
>>> def foo_bar():
... return html.Div([html.H1("Hello, World!")])
>>>
466def import_plugins( 467 *plugins_to_import: Union[str, List[str], None], 468 warn: bool = True, 469) -> Union[ 470 'ModuleType', Tuple['ModuleType', None] 471]: 472 """ 473 Import the Meerschaum plugins directory. 474 475 Parameters 476 ---------- 477 plugins_to_import: Union[str, List[str], None] 478 If provided, only import the specified plugins. 479 Otherwise import the entire plugins module. May be a string, list, or `None`. 480 Defaults to `None`. 481 482 Returns 483 ------- 484 A module of list of modules, depening on the number of plugins provided. 485 486 """ 487 import sys 488 import os 489 import importlib 490 from meerschaum.utils.misc import flatten_list 491 from meerschaum.config._paths import PLUGINS_RESOURCES_PATH 492 from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv 493 from meerschaum.utils.warnings import warn as _warn 494 plugins_to_import = list(plugins_to_import) 495 with _locks['sys.path']: 496 497 ### Since plugins may depend on other plugins, 498 ### we need to activate the virtual environments for library plugins. 499 ### This logic exists in `Plugin.activate_venv()`, 500 ### but that code requires the plugin's module to already be imported. 501 ### It's not a guarantee of correct activation order, 502 ### e.g. if a library plugin pins a specific package and another 503 plugins_names = get_plugins_names() 504 already_active_venvs = [is_venv_active(plugin_name) for plugin_name in plugins_names] 505 506 if not sys.path or sys.path[0] != str(PLUGINS_RESOURCES_PATH.parent): 507 sys.path.insert(0, str(PLUGINS_RESOURCES_PATH.parent)) 508 509 if not plugins_to_import: 510 for plugin_name in plugins_names: 511 activate_venv(plugin_name) 512 try: 513 imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem) 514 except ImportError as e: 515 _warn(f"Failed to import the plugins module:\n {e}") 516 import traceback 517 traceback.print_exc() 518 imported_plugins = None 519 for plugin_name in plugins_names: 520 if plugin_name in already_active_venvs: 521 continue 522 deactivate_venv(plugin_name) 523 524 else: 525 imported_plugins = [] 526 for plugin_name in flatten_list(plugins_to_import): 527 plugin = Plugin(plugin_name) 528 try: 529 with Venv(plugin): 530 imported_plugins.append( 531 importlib.import_module( 532 f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}' 533 ) 534 ) 535 except Exception as e: 536 _warn( 537 f"Failed to import plugin '{plugin_name}':\n " 538 + f"{e}\n\nHere's a stacktrace:", 539 stack = False, 540 ) 541 from meerschaum.utils.formatting import get_console 542 get_console().print_exception( 543 suppress = [ 544 'meerschaum/plugins/__init__.py', 545 importlib, 546 importlib._bootstrap, 547 ] 548 ) 549 imported_plugins.append(None) 550 551 if imported_plugins is None and warn: 552 _warn(f"Failed to import plugins.", stacklevel=3) 553 554 if str(PLUGINS_RESOURCES_PATH.parent) in sys.path: 555 sys.path.remove(str(PLUGINS_RESOURCES_PATH.parent)) 556 557 if isinstance(imported_plugins, list): 558 return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins)) 559 return imported_plugins
Import the Meerschaum plugins directory.
Parameters
- plugins_to_import (Union[str, List[str], None]):
If provided, only import the specified plugins.
Otherwise import the entire plugins module. May be a string, list, or
None
. Defaults toNone
.
Returns
- A module of list of modules, depening on the number of plugins provided.
562def from_plugin_import(plugin_import_name: str, *attrs: str) -> Any: 563 """ 564 Emulate the `from module import x` behavior. 565 566 Parameters 567 ---------- 568 plugin_import_name: str 569 The import name of the plugin's module. 570 Separate submodules with '.' (e.g. 'compose.utils.pipes') 571 572 attrs: str 573 Names of the attributes to return. 574 575 Returns 576 ------- 577 Objects from a plugin's submodule. 578 If multiple objects are provided, return a tuple. 579 580 Examples 581 -------- 582 >>> init = from_plugin_import('compose.utils', 'init') 583 >>> with mrsm.Venv('compose'): 584 ... cf = init() 585 >>> build_parent_pipe, get_defined_pipes = from_plugin_import( 586 ... 'compose.utils.pipes', 587 ... 'build_parent_pipe', 588 ... 'get_defined_pipes', 589 ... ) 590 >>> parent_pipe = build_parent_pipe(cf) 591 >>> defined_pipes = get_defined_pipes(cf) 592 """ 593 import importlib 594 from meerschaum.config._paths import PLUGINS_RESOURCES_PATH 595 from meerschaum.utils.warnings import warn as _warn 596 if plugin_import_name.startswith('plugins.'): 597 plugin_import_name = plugin_import_name[len('plugins.'):] 598 plugin_import_parts = plugin_import_name.split('.') 599 plugin_root_name = plugin_import_parts[0] 600 plugin = mrsm.Plugin(plugin_root_name) 601 602 submodule_import_name = '.'.join( 603 [PLUGINS_RESOURCES_PATH.stem] 604 + plugin_import_parts 605 ) 606 if len(attrs) == 0: 607 raise ValueError(f"Provide which attributes to return from '{submodule_import_name}'.") 608 609 attrs_to_return = [] 610 with mrsm.Venv(plugin): 611 if plugin.module is None: 612 return None 613 614 try: 615 submodule = importlib.import_module(submodule_import_name) 616 except ImportError as e: 617 _warn( 618 f"Failed to import plugin '{submodule_import_name}':\n " 619 + f"{e}\n\nHere's a stacktrace:", 620 stack=False, 621 ) 622 from meerschaum.utils.formatting import get_console 623 get_console().print_exception( 624 suppress=[ 625 'meerschaum/plugins/__init__.py', 626 importlib, 627 importlib._bootstrap, 628 ] 629 ) 630 return None 631 632 for attr in attrs: 633 try: 634 attrs_to_return.append(getattr(submodule, attr)) 635 except Exception: 636 _warn(f"Failed to access '{attr}' from '{submodule_import_name}'.") 637 attrs_to_return.append(None) 638 639 if len(attrs) == 1: 640 return attrs_to_return[0] 641 642 return tuple(attrs_to_return)
Emulate the from module import x
behavior.
Parameters
- plugin_import_name (str): The import name of the plugin's module. Separate submodules with '.' (e.g. 'compose.utils.pipes')
- attrs (str): Names of the attributes to return.
Returns
- Objects from a plugin's submodule.
- If multiple objects are provided, return a tuple.
Examples
>>> init = from_plugin_import('compose.utils', 'init')
>>> with mrsm.Venv('compose'):
... cf = init()
>>> build_parent_pipe, get_defined_pipes = from_plugin_import(
... 'compose.utils.pipes',
... 'build_parent_pipe',
... 'get_defined_pipes',
... )
>>> parent_pipe = build_parent_pipe(cf)
>>> defined_pipes = get_defined_pipes(cf)
684def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None: 685 """ 686 Reload plugins back into memory. 687 688 Parameters 689 ---------- 690 plugins: Optional[List[str]], default None 691 The plugins to reload. `None` will reload all plugins. 692 693 """ 694 import sys 695 if debug: 696 from meerschaum.utils.debug import dprint 697 698 if not plugins: 699 plugins = get_plugins_names() 700 for plugin_name in plugins: 701 if debug: 702 dprint(f"Reloading plugin '{plugin_name}'...") 703 mod_name = 'plugins.' + str(plugin_name) 704 if mod_name in sys.modules: 705 del sys.modules[mod_name] 706 load_plugins(debug=debug)
Reload plugins back into memory.
Parameters
- plugins (Optional[List[str]], default None):
The plugins to reload.
None
will reload all plugins.
709def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]: 710 """ 711 Return a list of `Plugin` objects. 712 713 Parameters 714 ---------- 715 to_load: 716 If specified, only load specific plugins. 717 Otherwise return all plugins. 718 719 try_import: bool, default True 720 If `True`, allow for plugins to be imported. 721 """ 722 from meerschaum.config._paths import PLUGINS_RESOURCES_PATH 723 import os 724 sync_plugins_symlinks() 725 _plugins = [ 726 Plugin(name) for name in ( 727 to_load or [ 728 ( 729 name if (PLUGINS_RESOURCES_PATH / name).is_dir() 730 else name[:-3] 731 ) for name in os.listdir(PLUGINS_RESOURCES_PATH) if name != '__init__.py' 732 ] 733 ) 734 ] 735 plugins = tuple(plugin for plugin in _plugins if plugin.is_installed(try_import=try_import)) 736 if len(to_load) == 1: 737 if len(plugins) == 0: 738 raise ValueError(f"Plugin '{to_load[0]}' is not installed.") 739 return plugins[0] 740 return plugins
Return a list of Plugin
objects.
Parameters
- to_load:: If specified, only load specific plugins. Otherwise return all plugins.
- try_import (bool, default True):
If
True
, allow for plugins to be imported.
757def get_data_plugins() -> List[Plugin]: 758 """ 759 Only return the modules of plugins with either `fetch()` or `sync()` functions. 760 """ 761 import inspect 762 plugins = get_plugins() 763 data_names = {'sync', 'fetch'} 764 data_plugins = [] 765 for plugin in plugins: 766 for name, ob in inspect.getmembers(plugin.module): 767 if not inspect.isfunction(ob): 768 continue 769 if name not in data_names: 770 continue 771 data_plugins.append(plugin) 772 return data_plugins
Only return the modules of plugins with either fetch()
or sync()
functions.
775def add_plugin_argument(*args, **kwargs) -> None: 776 """ 777 Add argparse arguments under the 'Plugins options' group. 778 Takes the same parameters as the regular argparse `add_argument()` function. 779 780 Examples 781 -------- 782 >>> add_plugin_argument('--foo', type=int, help="This is my help text!") 783 >>> 784 """ 785 from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser 786 from meerschaum.utils.warnings import warn, error 787 _parent_plugin_name = _get_parent_plugin(2) 788 title = f"Plugin '{_parent_plugin_name}' options" if _parent_plugin_name else 'Custom options' 789 group_key = 'plugin_' + (_parent_plugin_name or '') 790 if group_key not in groups: 791 groups[group_key] = parser.add_argument_group( 792 title = title, 793 ) 794 _seen_plugin_args[group_key] = set() 795 try: 796 if str(args) not in _seen_plugin_args[group_key]: 797 groups[group_key].add_argument(*args, **kwargs) 798 _seen_plugin_args[group_key].add(str(args)) 799 except Exception as e: 800 warn(e)
Add argparse arguments under the 'Plugins options' group.
Takes the same parameters as the regular argparse add_argument()
function.
Examples
>>> add_plugin_argument('--foo', type=int, help="This is my help text!")
>>>
94def pre_sync_hook( 95 function: Callable[[Any], Any], 96) -> Callable[[Any], Any]: 97 """ 98 Register a function as a sync hook to be executed right before sync. 99 100 Parameters 101 ---------- 102 function: Callable[[Any], Any] 103 The function to execute right before a sync. 104 105 Returns 106 ------- 107 Another function (this is a decorator function). 108 109 Examples 110 -------- 111 >>> from meerschaum.plugins import pre_sync_hook 112 >>> 113 >>> @pre_sync_hook 114 ... def log_sync(pipe, **kwargs): 115 ... print(f"About to sync {pipe} with kwargs:\n{kwargs}.") 116 >>> 117 """ 118 with _locks['_pre_sync_hooks']: 119 try: 120 if function.__module__ not in _pre_sync_hooks: 121 _pre_sync_hooks[function.__module__] = [] 122 _pre_sync_hooks[function.__module__].append(function) 123 except Exception as e: 124 from meerschaum.utils.warnings import warn 125 warn(e) 126 return function
Register a function as a sync hook to be executed right before sync.
Parameters
----------
function: Callable[[Any], Any]
The function to execute right before a sync.
Returns
-------
Another function (this is a decorator function).
Examples
--------
>>> from meerschaum.plugins import pre_sync_hook
>>>
>>> @pre_sync_hook
... def log_sync(pipe, **kwargs):
... print(f"About to sync {pipe} with kwargs:
{kwargs}.")
>
129def post_sync_hook( 130 function: Callable[[Any], Any], 131) -> Callable[[Any], Any]: 132 """ 133 Register a function as a sync hook to be executed upon completion of a sync. 134 135 Parameters 136 ---------- 137 function: Callable[[Any], Any] 138 The function to execute upon completion of a sync. 139 140 Returns 141 ------- 142 Another function (this is a decorator function). 143 144 Examples 145 -------- 146 >>> from meerschaum.plugins import post_sync_hook 147 >>> from meerschaum.utils.misc import interval_str 148 >>> from datetime import timedelta 149 >>> 150 >>> @post_sync_hook 151 ... def log_sync(pipe, success_tuple, duration=None, **kwargs): 152 ... duration_delta = timedelta(seconds=duration) 153 ... duration_text = interval_str(duration_delta) 154 ... print(f"It took {duration_text} to sync {pipe}.") 155 >>> 156 """ 157 with _locks['_post_sync_hooks']: 158 try: 159 if function.__module__ not in _post_sync_hooks: 160 _post_sync_hooks[function.__module__] = [] 161 _post_sync_hooks[function.__module__].append(function) 162 except Exception as e: 163 from meerschaum.utils.warnings import warn 164 warn(e) 165 return function
Register a function as a sync hook to be executed upon completion of a sync.
Parameters
- function (Callable[[Any], Any]): The function to execute upon completion of a sync.
Returns
- Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import post_sync_hook
>>> from meerschaum.utils.misc import interval_str
>>> from datetime import timedelta
>>>
>>> @post_sync_hook
... def log_sync(pipe, success_tuple, duration=None, **kwargs):
... duration_delta = timedelta(seconds=duration)
... duration_text = interval_str(duration_delta)
... print(f"It took {duration_text} to sync {pipe}.")
>>>