meerschaum.plugins
Expose plugin management APIs from the meerschaum.plugins
module.
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Expose plugin management APIs from the `meerschaum.plugins` module.
"""

from __future__ import annotations
from meerschaum.utils.typing import Callable, Any, Union, Optional, Dict, List, Tuple
from meerschaum.utils.threading import Lock, RLock
from meerschaum.plugins._Plugin import Plugin

### Registries of decorated functions, keyed by the defining function's `__module__`.
_api_plugins: Dict[str, List[Callable[['fastapi.App'], Any]]] = {}
_pre_sync_hooks: Dict[str, List[Callable[[Any], Any]]] = {}
_post_sync_hooks: Dict[str, List[Callable[[Any], Any]]] = {}
### One re-entrant lock per piece of shared state mutated in this module.
_locks = {
    '_api_plugins': RLock(),
    '_pre_sync_hooks': RLock(),
    '_post_sync_hooks': RLock(),
    '__path__': RLock(),
    'sys.path': RLock(),
    'internal_plugins': RLock(),
    '_synced_symlinks': RLock(),
    'PLUGINS_INTERNAL_LOCK_PATH': RLock(),
}
__all__ = (
    "Plugin", "make_action", "api_plugin", "import_plugins",
    "reload_plugins", "get_plugins", "get_data_plugins", "add_plugin_argument",
    "pre_sync_hook", "post_sync_hook",
)
__pdoc__ = {
    'venvs': False, 'data': False, 'stack': False, 'plugins': False,
}

def make_action(
        function: Callable[[Any], Any],
        shell: bool = False,
        activate: bool = True,
        deactivate: bool = True,
        debug: bool = False
    ) -> Callable[[Any], Any]:
    """
    Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to become a Meerschaum action. Must accept all keyword arguments.
        It is registered in the `actions` dictionary under its `__name__`.

    shell: bool, default False
        Not used.

    activate: bool, default True
        Not used by this function.

    deactivate: bool, default True
        Not used by this function.

    debug: bool, default False
        If `True`, print which plugin the action is being added from.

    Returns
    -------
    Another function (this is a decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import make_action
    >>>
    >>> @make_action
    ... def my_action(**kw):
    ...     print('foo')
    ...     return True, "Success"
    >>>
    """

    from meerschaum.actions import actions
    package_name = function.__globals__['__name__']
    ### Plugin modules are imported as `plugins.<name>[.<submodule>...]`.
    plugin_name = (
        package_name.split('.')[1]
        if package_name.startswith('plugins.') else None
    )
    plugin = Plugin(plugin_name) if plugin_name else None

    if debug:
        from meerschaum.utils.debug import dprint
        dprint(
            f"Adding action '{function.__name__}' from plugin " +
            f"'{plugin}'..."
        )

    actions[function.__name__] = function
    return function


def pre_sync_hook(
        function: Callable[[Any], Any],
    ) -> Callable[[Any], Any]:
    """
    Register a function as a sync hook to be executed right before sync.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to execute right before a sync.

    Returns
    -------
    Another function (this is a decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import pre_sync_hook
    >>>
    >>> @pre_sync_hook
    ... def log_sync(pipe, **kwargs):
    ...     print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
    >>>
    """
    with _locks['_pre_sync_hooks']:
        try:
            _pre_sync_hooks.setdefault(function.__module__, []).append(function)
        except Exception as e:
            ### Registration is best-effort: warn instead of breaking the plugin's import.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function


def post_sync_hook(
        function: Callable[[Any], Any],
    ) -> Callable[[Any], Any]:
    """
    Register a function as a sync hook to be executed upon completion of a sync.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to execute upon completion of a sync.

    Returns
    -------
    Another function (this is a decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import post_sync_hook
    >>>
    >>> @post_sync_hook
    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
    ...     print(f"It took {round(duration, 2)} seconds to sync {pipe}.")
    >>>
    """
    with _locks['_post_sync_hooks']:
        try:
            _post_sync_hooks.setdefault(function.__module__, []).append(function)
        except Exception as e:
            ### Registration is best-effort: warn instead of breaking the plugin's import.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function


def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Execute the function when initializing the Meerschaum API module.
    Useful for lazy-loading heavy plugins only when the API is started,
    such as when editing the `meerschaum.api.app` FastAPI app.

    The FastAPI app will be passed as the only parameter.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to be called before starting the Meerschaum API.

    Returns
    -------
    Another function (decorator function).

    Examples
    --------
    >>> from meerschaum.plugins import api_plugin
    >>>
    >>> @api_plugin
    ... def initialize_plugin(app):
    ...     @app.get('/my/new/path')
    ...     def new_path():
    ...         return {'message': 'It works!'}
    >>>
    """
    with _locks['_api_plugins']:
        try:
            _api_plugins.setdefault(function.__module__, []).append(function)
        except Exception as e:
            ### Registration is best-effort: warn instead of breaking the plugin's import.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function


_synced_symlinks: bool = False
def sync_plugins_symlinks(debug: bool = False, warn: bool = True) -> None:
    """
    Update the symlinks in the internal plugins resources directory
    so that they point at the plugins found in each of the plugins directories,
    then make sure the internal plugins package is importable.

    The work is only performed once per process: subsequent calls return immediately
    while the module-level `_synced_symlinks` flag is set.

    Parameters
    ----------
    debug: bool, default False
        Not used by this function.

    warn: bool, default True
        If `True`, emit warnings for duplicate plugins and for lock file
        or symlink failures.
    """
    global _synced_symlinks
    with _locks['_synced_symlinks']:
        if _synced_symlinks:
            return

    import os, pathlib, time
    from collections import Counter
    from meerschaum.utils.misc import make_symlink, is_symlink
    from meerschaum.utils.warnings import error, warn as _warn
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.config._paths import (
        PLUGINS_RESOURCES_PATH,
        PLUGINS_INIT_PATH,
        PLUGINS_DIR_PATHS,
        PLUGINS_INTERNAL_LOCK_PATH,
    )

    ### If the lock file exists, sleep for up to a second or until it's removed before continuing.
    with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
        if PLUGINS_INTERNAL_LOCK_PATH.exists():
            lock_sleep_total = STATIC_CONFIG['plugins']['lock_sleep_total']
            lock_sleep_increment = STATIC_CONFIG['plugins']['lock_sleep_increment']
            lock_start = time.perf_counter()
            ### NOTE(review): as written, the loop exits after a single sleep increment
            ### and force-removes a lingering lock file. Confirm this is intended
            ### rather than waiting for the full `lock_sleep_total`.
            while (time.perf_counter() - lock_start) < lock_sleep_total:
                time.sleep(lock_sleep_increment)
                if not PLUGINS_INTERNAL_LOCK_PATH.exists():
                    break
                try:
                    PLUGINS_INTERNAL_LOCK_PATH.unlink()
                except Exception as e:
                    if warn:
                        _warn(f"Error while removing lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")
                break

        ### Begin locking from other processes.
        try:
            PLUGINS_INTERNAL_LOCK_PATH.touch()
        except Exception as e:
            if warn:
                _warn(f"Unable to create lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")

    try:
        with _locks['internal_plugins']:
            if is_symlink(PLUGINS_RESOURCES_PATH) or not PLUGINS_RESOURCES_PATH.exists():
                ### Best-effort: the path may not exist at all.
                try:
                    PLUGINS_RESOURCES_PATH.unlink()
                except Exception as e:
                    pass

            PLUGINS_RESOURCES_PATH.mkdir(exist_ok=True)

            existing_symlinked_paths = [
                (PLUGINS_RESOURCES_PATH / item)
                for item in os.listdir(PLUGINS_RESOURCES_PATH)
            ]
            for plugins_path in PLUGINS_DIR_PATHS:
                if not plugins_path.exists():
                    plugins_path.mkdir(exist_ok=True, parents=True)
            plugins_to_be_symlinked = [
                (plugins_path / item)
                for plugins_path in PLUGINS_DIR_PATHS
                for item in os.listdir(plugins_path)
                if (
                    not item.startswith('.')
                ) and (item not in ('__pycache__', '__init__.py'))
            ]

            ### Check for duplicates.
            seen_plugins = Counter(
                plugin_path.stem for plugin_path in plugins_to_be_symlinked
            )
            for plugin_name, plugin_count in seen_plugins.items():
                if plugin_count > 1 and warn:
                    _warn(f"Found duplicate plugins named '{plugin_name}'.")

            for plugin_symlink_path in existing_symlinked_paths:
                real_path = pathlib.Path(os.path.realpath(plugin_symlink_path))

                ### Remove invalid symlinks.
                if real_path not in plugins_to_be_symlinked:
                    if not is_symlink(plugin_symlink_path):
                        continue
                    try:
                        plugin_symlink_path.unlink()
                    except Exception as e:
                        pass

                ### Remove valid plugins from the to-be-symlinked list.
                else:
                    plugins_to_be_symlinked.remove(real_path)

            for plugin_path in plugins_to_be_symlinked:
                plugin_symlink_path = PLUGINS_RESOURCES_PATH / plugin_path.name
                try:
                    ### There might be duplicate folders (e.g. __pycache__).
                    if (
                        plugin_symlink_path.exists()
                        and
                        plugin_symlink_path.is_dir()
                        and
                        not is_symlink(plugin_symlink_path)
                    ):
                        continue
                    success, msg = make_symlink(plugin_path, plugin_symlink_path)
                except Exception as e:
                    success, msg = False, str(e)
                if not success and warn:
                    _warn(
                        f"Failed to create symlink {plugin_symlink_path} "
                        + f"to {plugin_path}:\n {msg}"
                    )
    finally:
        ### Release symlink lock file in case other processes need it.
        ### This runs even if symlinking raised so the lock file is not left behind.
        with _locks['PLUGINS_INTERNAL_LOCK_PATH']:
            try:
                if PLUGINS_INTERNAL_LOCK_PATH.exists():
                    PLUGINS_INTERNAL_LOCK_PATH.unlink()
            ### Sometimes competing threads will delete the lock file at the same time.
            except FileNotFoundError:
                pass
            except Exception as e:
                if warn:
                    _warn(f"Error cleaning up lockfile {PLUGINS_INTERNAL_LOCK_PATH}:\n{e}")

    try:
        if not PLUGINS_INIT_PATH.exists():
            PLUGINS_INIT_PATH.touch()
    except Exception as e:
        error(f"Failed to create the file '{PLUGINS_INIT_PATH}':\n{e}")

    with _locks['__path__']:
        if str(PLUGINS_RESOURCES_PATH.parent) not in __path__:
            __path__.append(str(PLUGINS_RESOURCES_PATH.parent))

    with _locks['_synced_symlinks']:
        _synced_symlinks = True


def import_plugins(
        *plugins_to_import: Union[str, List[str], None],
        warn: bool = True,
    ) -> Union[
        'ModuleType', Tuple['ModuleType', None]
    ]:
    """
    Import the Meerschaum plugins directory.

    Parameters
    ----------
    plugins_to_import: Union[str, List[str], None]
        If provided, only import the specified plugins.
        Otherwise import the entire plugins module. May be a string, list, or `None`.
        Defaults to `None`.

    warn: bool, default True
        If `True`, warn when the entire plugins module fails to import.

    Returns
    -------
    A module or tuple of modules, depending on the number of plugins provided.
    When specific plugins are requested, a plugin which fails to import yields `None`.
    When the entire plugins module fails to import, `None` is returned.

    """
    import sys
    import importlib
    from meerschaum.utils.misc import flatten_list
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
    from meerschaum.utils.warnings import warn as _warn
    plugins_to_import = list(plugins_to_import)
    plugins_parent_path_str = str(PLUGINS_RESOURCES_PATH.parent)
    with _locks['sys.path']:

        ### Since plugins may depend on other plugins,
        ### we need to activate the virtual environments for library plugins.
        ### This logic exists in `Plugin.activate_venv()`,
        ### but that code requires the plugin's module to already be imported.
        ### It's not a guarantee of correct activation order,
        ### e.g. if a library plugin pins a specific package and another
        plugins_names = get_plugins_names()
        ### Remember which venvs were active before this call so they are left active.
        already_active_venvs = {
            plugin_name for plugin_name in plugins_names
            if is_venv_active(plugin_name)
        }

        if not sys.path or sys.path[0] != plugins_parent_path_str:
            sys.path.insert(0, plugins_parent_path_str)

        if not plugins_to_import:
            for plugin_name in plugins_names:
                activate_venv(plugin_name)
            try:
                imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem)
            except ImportError as e:
                _warn(f"Failed to import the plugins module:\n {e}")
                import traceback
                traceback.print_exc()
                imported_plugins = None
            for plugin_name in plugins_names:
                if plugin_name in already_active_venvs:
                    continue
                deactivate_venv(plugin_name)

        else:
            imported_plugins = []
            for plugin_name in flatten_list(plugins_to_import):
                plugin = Plugin(plugin_name)
                try:
                    with Venv(plugin):
                        imported_plugins.append(
                            importlib.import_module(
                                f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
                            )
                        )
                except Exception as e:
                    _warn(
                        f"Failed to import plugin '{plugin_name}':\n "
                        + f"{e}\n\nHere's a stacktrace:",
                        stack = False,
                    )
                    from meerschaum.utils.formatting import get_console
                    get_console().print_exception(
                        suppress = [
                            'meerschaum/plugins/__init__.py',
                            importlib,
                            importlib._bootstrap,
                        ]
                    )
                    ### Keep positions aligned with the requested plugins.
                    imported_plugins.append(None)

        if imported_plugins is None and warn:
            _warn(f"Failed to import plugins.", stacklevel=3)

        if plugins_parent_path_str in sys.path:
            sys.path.remove(plugins_parent_path_str)

    if isinstance(imported_plugins, list):
        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
    return imported_plugins


def load_plugins(debug: bool = False, shell: bool = False) -> None:
    """
    Import Meerschaum plugins and update the actions dictionary.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle (passed along to `make_action()`).

    shell: bool, default False
        Passed along to `make_action()`.
    """
    from inspect import isfunction, getmembers
    from meerschaum.actions import modules
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.packages import get_modules_from_package

    _plugins_names, plugins_modules = get_modules_from_package(
        import_plugins(),
        names = True,
        recursive = True,
        modules_venvs = True
    )
    plugins_prefix = PLUGINS_RESOURCES_PATH.stem + '.'
    ### Replace the contents in place to keep from redefining the modules list.
    modules[:] = (
        [mod for mod in modules if not mod.__name__.startswith(plugins_prefix)]
        + plugins_modules
    )

    ### A plugin module's action is the function which shares the module's name.
    for module in plugins_modules:
        action_name = module.__name__.split('.')[-1]
        for name, func in getmembers(module):
            if not isfunction(func):
                continue
            if name == action_name:
                make_action(func, shell=shell, debug=debug)


def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
    """
    Reload plugins back into memory.

    Parameters
    ----------
    plugins: Optional[List[str]], default None
        The plugins to reload. `None` will reload all plugins.

    debug: bool, default False
        Verbosity toggle.

    """
    import sys
    if debug:
        from meerschaum.utils.debug import dprint

    if not plugins:
        plugins = get_plugins_names()
    for plugin_name in plugins:
        if debug:
            dprint(f"Reloading plugin '{plugin_name}'...")
        ### Drop the cached module so that `load_plugins()` imports it afresh.
        sys.modules.pop('plugins.' + str(plugin_name), None)
    load_plugins(debug=debug)


def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
    """
    Return the installed `Plugin` objects.

    Parameters
    ----------
    to_load:
        If specified, only load specific plugins.
        Otherwise return all plugins.

    try_import: bool, default True
        If `True`, allow for plugins to be imported.

    Returns
    -------
    A tuple of the installed `Plugin` objects,
    or a single `Plugin` if exactly one name was provided.

    Raises
    ------
    An `IndexError` if exactly one name was provided and that plugin is not installed.
    """
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    import os
    sync_plugins_symlinks()
    names = to_load or [
        ### Directories are package plugins; anything else has its `.py` suffix removed.
        (name if (PLUGINS_RESOURCES_PATH / name).is_dir() else name[:-3])
        for name in os.listdir(PLUGINS_RESOURCES_PATH)
        if name != '__init__.py'
    ]
    _plugins = [Plugin(name) for name in names]
    plugins = tuple(plugin for plugin in _plugins if plugin.is_installed(try_import=try_import))
    if len(to_load) == 1:
        if not plugins:
            raise IndexError(f"Plugin '{to_load[0]}' is not installed.")
        return plugins[0]
    return plugins


def get_plugins_names(*to_load, **kw) -> List[str]:
    """
    Return a list of the names of installed plugins.
    Accepts the same arguments as `get_plugins()`.
    """
    plugins = get_plugins(*to_load, **kw)
    ### `get_plugins()` returns a bare `Plugin` when exactly one name is requested.
    if isinstance(plugins, Plugin):
        plugins = (plugins,)
    return [plugin.name for plugin in plugins]


def get_plugins_modules(*to_load, **kw) -> List['ModuleType']:
    """
    Return a list of modules for the installed plugins.
    Accepts the same arguments as `get_plugins()`.
    """
    plugins = get_plugins(*to_load, **kw)
    ### `get_plugins()` returns a bare `Plugin` when exactly one name is requested.
    if isinstance(plugins, Plugin):
        plugins = (plugins,)
    return [plugin.module for plugin in plugins]


def get_data_plugins() -> List[Plugin]:
    """
    Only return the plugins with either `fetch()` or `sync()` functions.
    Each plugin appears at most once, even if it defines both functions.
    """
    import inspect
    data_names = {'sync', 'fetch'}
    data_plugins = []
    for plugin in get_plugins():
        for name, ob in inspect.getmembers(plugin.module):
            if name in data_names and inspect.isfunction(ob):
                data_plugins.append(plugin)
                ### One match is enough; don't add the plugin again for the other function.
                break
    return data_plugins


def add_plugin_argument(*args, **kwargs) -> None:
    """
    Add argparse arguments under the 'Plugins options' group.
    Takes the same parameters as the regular argparse `add_argument()` function.

    Arguments are grouped by the plugin which calls this function,
    and an argument which has already been added to its group is skipped.

    Examples
    --------
    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
    >>>
    """
    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
    from meerschaum.utils.warnings import warn
    ### Stack level 2 is the caller of `add_plugin_argument()`.
    _parent_plugin_name = _get_parent_plugin(2)
    title = f"Plugin '{_parent_plugin_name}' options" if _parent_plugin_name else 'Custom options'
    group_key = 'plugin_' + (_parent_plugin_name or '')
    if group_key not in groups:
        groups[group_key] = parser.add_argument_group(
            title = title,
        )
        _seen_plugin_args[group_key] = set()
    try:
        if str(args) not in _seen_plugin_args[group_key]:
            groups[group_key].add_argument(*args, **kwargs)
            _seen_plugin_args[group_key].add(str(args))
    except Exception as e:
        warn(e)


def _get_parent_plugin(stacklevel: int = 1) -> Union[str, None]:
    """
    Return the name of the plugin whose module is at `stacklevel` frames up the call stack.
    If this function is called from outside a Meerschaum plugin, it will return None.
    """
    import inspect
    try:
        parent_name = inspect.stack()[stacklevel][0].f_globals['__name__']
    except Exception as e:
        return None
    ### Plugin modules are imported as `plugins.<name>[.<submodule>...]`
    ### (the same check used by `make_action()`).
    if not parent_name.startswith('plugins.'):
        return None
    return parent_name.split('.')[1]
33class Plugin: 34 """Handle packaging of Meerschaum plugins.""" 35 def __init__( 36 self, 37 name: str, 38 version: Optional[str] = None, 39 user_id: Optional[int] = None, 40 required: Optional[List[str]] = None, 41 attributes: Optional[Dict[str, Any]] = None, 42 archive_path: Optional[pathlib.Path] = None, 43 venv_path: Optional[pathlib.Path] = None, 44 repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None, 45 repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None, 46 ): 47 from meerschaum.config.static import STATIC_CONFIG 48 sep = STATIC_CONFIG['plugins']['repo_separator'] 49 _repo = None 50 if sep in name: 51 try: 52 name, _repo = name.split(sep) 53 except Exception as e: 54 error(f"Invalid plugin name: '{name}'") 55 self._repo_in_name = _repo 56 57 if attributes is None: 58 attributes = {} 59 self.name = name 60 self.attributes = attributes 61 self.user_id = user_id 62 self._version = version 63 if required: 64 self._required = required 65 self.archive_path = ( 66 archive_path if archive_path is not None 67 else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz" 68 ) 69 self.venv_path = ( 70 venv_path if venv_path is not None 71 else VIRTENV_RESOURCES_PATH / self.name 72 ) 73 self._repo_connector = repo_connector 74 self._repo_keys = repo 75 76 77 @property 78 def repo_connector(self): 79 """ 80 Return the repository connector for this plugin. 81 NOTE: This imports the `connectors` module, which imports certain plugin modules. 82 """ 83 if self._repo_connector is None: 84 from meerschaum.connectors.parse import parse_repo_keys 85 86 repo_keys = self._repo_keys or self._repo_in_name 87 if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name: 88 error( 89 f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'." 
90 ) 91 repo_connector = parse_repo_keys(repo_keys) 92 self._repo_connector = repo_connector 93 return self._repo_connector 94 95 96 @property 97 def version(self): 98 """ 99 Return the plugin's module version is defined (`__version__`) if it's defined. 100 """ 101 if self._version is None: 102 try: 103 self._version = self.module.__version__ 104 except Exception as e: 105 self._version = None 106 return self._version 107 108 109 @property 110 def module(self): 111 """ 112 Return the Python module of the underlying plugin. 113 """ 114 if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None: 115 if self.__file__ is None: 116 return None 117 from meerschaum.plugins import import_plugins 118 self._module = import_plugins(str(self), warn=False) 119 return self._module 120 121 122 @property 123 def __file__(self) -> Union[str, None]: 124 """ 125 Return the file path (str) of the plugin if it exists, otherwise `None`. 126 """ 127 if self.__dict__.get('_module', None) is not None: 128 return self.module.__file__ 129 130 potential_dir = PLUGINS_RESOURCES_PATH / self.name 131 if ( 132 potential_dir.exists() 133 and potential_dir.is_dir() 134 and (potential_dir / '__init__.py').exists() 135 ): 136 return str((potential_dir / '__init__.py').as_posix()) 137 138 potential_file = PLUGINS_RESOURCES_PATH / (self.name + '.py') 139 if potential_file.exists() and not potential_file.is_dir(): 140 return str(potential_file.as_posix()) 141 142 return None 143 144 145 @property 146 def requirements_file_path(self) -> Union[pathlib.Path, None]: 147 """ 148 If a file named `requirements.txt` exists, return its path. 149 """ 150 if self.__file__ is None: 151 return None 152 path = pathlib.Path(self.__file__).parent / 'requirements.txt' 153 if not path.exists(): 154 return None 155 return path 156 157 158 def is_installed(self, **kw) -> bool: 159 """ 160 Check whether a plugin is correctly installed. 
161 162 Returns 163 ------- 164 A `bool` indicating whether a plugin exists and is successfully imported. 165 """ 166 return self.__file__ is not None 167 168 169 def make_tar(self, debug: bool = False) -> pathlib.Path: 170 """ 171 Compress the plugin's source files into a `.tar.gz` archive and return the archive's path. 172 173 Parameters 174 ---------- 175 debug: bool, default False 176 Verbosity toggle. 177 178 Returns 179 ------- 180 A `pathlib.Path` to the archive file's path. 181 182 """ 183 import tarfile, pathlib, subprocess, fnmatch 184 from meerschaum.utils.debug import dprint 185 from meerschaum.utils.packages import attempt_import 186 pathspec = attempt_import('pathspec', debug=debug) 187 188 if not self.__file__: 189 from meerschaum.utils.warnings import error 190 error(f"Could not find file for plugin '{self}'.") 191 if '__init__.py' in self.__file__ or os.path.isdir(self.__file__): 192 path = self.__file__.replace('__init__.py', '') 193 is_dir = True 194 else: 195 path = self.__file__ 196 is_dir = False 197 198 old_cwd = os.getcwd() 199 real_parent_path = pathlib.Path(os.path.realpath(path)).parent 200 os.chdir(real_parent_path) 201 202 default_patterns_to_ignore = [ 203 '.pyc', 204 '__pycache__/', 205 'eggs/', 206 '__pypackages__/', 207 '.git', 208 ] 209 210 def parse_gitignore() -> 'Set[str]': 211 gitignore_path = pathlib.Path(path) / '.gitignore' 212 if not gitignore_path.exists(): 213 return set() 214 with open(gitignore_path, 'r', encoding='utf-8') as f: 215 gitignore_text = f.read() 216 return set(pathspec.PathSpec.from_lines( 217 pathspec.patterns.GitWildMatchPattern, 218 default_patterns_to_ignore + gitignore_text.splitlines() 219 ).match_tree(path)) 220 221 patterns_to_ignore = parse_gitignore() if is_dir else set() 222 223 if debug: 224 dprint(f"Patterns to ignore:\n{patterns_to_ignore}") 225 226 with tarfile.open(self.archive_path, 'w:gz') as tarf: 227 if not is_dir: 228 tarf.add(f"{self.name}.py") 229 else: 230 for root, dirs, files in 
os.walk(self.name): 231 for f in files: 232 good_file = True 233 fp = os.path.join(root, f) 234 for pattern in patterns_to_ignore: 235 if pattern in str(fp) or f.startswith('.'): 236 good_file = False 237 break 238 if good_file: 239 if debug: 240 dprint(f"Adding '{fp}'...") 241 tarf.add(fp) 242 243 ### clean up and change back to old directory 244 os.chdir(old_cwd) 245 246 ### change to 775 to avoid permissions issues with the API in a Docker container 247 self.archive_path.chmod(0o775) 248 249 if debug: 250 dprint(f"Created archive '{self.archive_path}'.") 251 return self.archive_path 252 253 254 def install( 255 self, 256 force: bool = False, 257 debug: bool = False, 258 ) -> SuccessTuple: 259 """ 260 Extract a plugin's tar archive to the plugins directory. 261 262 This function checks if the plugin is already installed and if the version is equal or 263 greater than the existing installation. 264 265 Parameters 266 ---------- 267 force: bool, default False 268 If `True`, continue with installation, even if required packages fail to install. 269 270 debug: bool, default False 271 Verbosity toggle. 272 273 Returns 274 ------- 275 A `SuccessTuple` of success (bool) and a message (str). 276 277 """ 278 if self.full_name in _ongoing_installations: 279 return True, f"Already installing plugin '{self}'." 
280 _ongoing_installations.add(self.full_name) 281 from meerschaum.utils.warnings import warn, error 282 if debug: 283 from meerschaum.utils.debug import dprint 284 import tarfile 285 import re 286 import ast 287 from meerschaum.plugins import sync_plugins_symlinks 288 from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum 289 from meerschaum.utils.venv import init_venv 290 from meerschaum.utils.misc import safely_extract_tar 291 old_cwd = os.getcwd() 292 old_version = '' 293 new_version = '' 294 temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name 295 temp_dir.mkdir(exist_ok=True) 296 297 if not self.archive_path.exists(): 298 return False, f"Missing archive file for plugin '{self}'." 299 if self.version is not None: 300 old_version = self.version 301 if debug: 302 dprint(f"Found existing version '{old_version}' for plugin '{self}'.") 303 304 if debug: 305 dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...") 306 307 try: 308 with tarfile.open(self.archive_path, 'r:gz') as tarf: 309 safely_extract_tar(tarf, temp_dir) 310 except Exception as e: 311 warn(e) 312 return False, f"Failed to extract plugin '{self.name}'." 
313 314 ### search for version information 315 files = os.listdir(temp_dir) 316 317 if str(files[0]) == self.name: 318 is_dir = True 319 elif str(files[0]) == self.name + '.py': 320 is_dir = False 321 else: 322 error(f"Unknown format encountered for plugin '{self}'.") 323 324 fpath = temp_dir / files[0] 325 if is_dir: 326 fpath = fpath / '__init__.py' 327 328 init_venv(self.name, debug=debug) 329 with open(fpath, 'r', encoding='utf-8') as f: 330 init_lines = f.readlines() 331 new_version = None 332 for line in init_lines: 333 if '__version__' not in line: 334 continue 335 version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip()) 336 if not version_match: 337 continue 338 new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip()) 339 break 340 if not new_version: 341 warn( 342 f"No `__version__` defined for plugin '{self}'. " 343 + "Assuming new version...", 344 stack = False, 345 ) 346 347 packaging_version = attempt_import('packaging.version') 348 try: 349 is_new_version = (not new_version and not old_version) or ( 350 packaging_version.parse(old_version) < packaging_version.parse(new_version) 351 ) 352 is_same_version = new_version and old_version and ( 353 packaging_version.parse(old_version) == packaging_version.parse(new_version) 354 ) 355 except Exception as e: 356 is_new_version, is_same_version = True, False 357 358 ### Determine where to permanently store the new plugin. 359 plugin_installation_dir_path = PLUGINS_DIR_PATHS[0] 360 for path in PLUGINS_DIR_PATHS: 361 files_in_plugins_dir = os.listdir(path) 362 if ( 363 self.name in files_in_plugins_dir 364 or 365 (self.name + '.py') in files_in_plugins_dir 366 ): 367 plugin_installation_dir_path = path 368 break 369 370 success_msg = f"Successfully installed plugin '{self}'." 
371 success, abort = None, None 372 373 if is_same_version and not force: 374 success, msg = True, ( 375 f"Plugin '{self}' is up-to-date (version {old_version}).\n" + 376 " Install again with `-f` or `--force` to reinstall." 377 ) 378 abort = True 379 elif is_new_version or force: 380 for src_dir, dirs, files in os.walk(temp_dir): 381 if success is not None: 382 break 383 dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path)) 384 if not os.path.exists(dst_dir): 385 os.mkdir(dst_dir) 386 for f in files: 387 src_file = os.path.join(src_dir, f) 388 dst_file = os.path.join(dst_dir, f) 389 if os.path.exists(dst_file): 390 os.remove(dst_file) 391 392 if debug: 393 dprint(f"Moving '{src_file}' to '{dst_dir}'...") 394 try: 395 shutil.move(src_file, dst_dir) 396 except Exception as e: 397 success, msg = False, ( 398 f"Failed to install plugin '{self}': " + 399 f"Could not move file '{src_file}' to '{dst_dir}'" 400 ) 401 print(msg) 402 break 403 if success is None: 404 success, msg = True, success_msg 405 else: 406 success, msg = False, ( 407 f"Your installed version of plugin '{self}' ({old_version}) is higher than " 408 + f"attempted version {new_version}." 409 ) 410 411 shutil.rmtree(temp_dir) 412 os.chdir(old_cwd) 413 414 ### Reload the plugin's module. 415 sync_plugins_symlinks(debug=debug) 416 if '_module' in self.__dict__: 417 del self.__dict__['_module'] 418 init_venv(venv=self.name, force=True, debug=debug) 419 reload_meerschaum(debug=debug) 420 421 ### if we've already failed, return here 422 if not success or abort: 423 _ongoing_installations.remove(self.full_name) 424 return success, msg 425 426 ### attempt to install dependencies 427 if not self.install_dependencies(force=force, debug=debug): 428 _ongoing_installations.remove(self.full_name) 429 return False, f"Failed to install dependencies for plugin '{self}'." 
430 431 ### handling success tuple, bool, or other (typically None) 432 setup_tuple = self.setup(debug=debug) 433 if isinstance(setup_tuple, tuple): 434 if not setup_tuple[0]: 435 success, msg = setup_tuple 436 elif isinstance(setup_tuple, bool): 437 if not setup_tuple: 438 success, msg = False, ( 439 f"Failed to run post-install setup for plugin '{self}'." + '\n' + 440 f"Check `setup()` in '{self.__file__}' for more information " + 441 f"(no error message provided)." 442 ) 443 else: 444 success, msg = True, success_msg 445 elif setup_tuple is None: 446 success = True 447 msg = ( 448 f"Post-install for plugin '{self}' returned None. " + 449 f"Assuming plugin successfully installed." 450 ) 451 warn(msg) 452 else: 453 success = False 454 msg = ( 455 f"Post-install for plugin '{self}' returned unexpected value " + 456 f"of type '{type(setup_tuple)}': {setup_tuple}" 457 ) 458 459 _ongoing_installations.remove(self.full_name) 460 module = self.module 461 return success, msg 462 463 464 def remove_archive( 465 self, 466 debug: bool = False 467 ) -> SuccessTuple: 468 """Remove a plugin's archive file.""" 469 if not self.archive_path.exists(): 470 return True, f"Archive file for plugin '{self}' does not exist." 471 try: 472 self.archive_path.unlink() 473 except Exception as e: 474 return False, f"Failed to remove archive for plugin '{self}':\n{e}" 475 return True, "Success" 476 477 478 def remove_venv( 479 self, 480 debug: bool = False 481 ) -> SuccessTuple: 482 """Remove a plugin's virtual environment.""" 483 if not self.venv_path.exists(): 484 return True, f"Virtual environment for plugin '{self}' does not exist." 485 try: 486 shutil.rmtree(self.venv_path) 487 except Exception as e: 488 return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}" 489 return True, "Success" 490 491 492 def uninstall(self, debug: bool = False) -> SuccessTuple: 493 """ 494 Remove a plugin, its virtual environment, and archive file. 
495 """ 496 from meerschaum.utils.packages import reload_meerschaum 497 from meerschaum.plugins import sync_plugins_symlinks 498 from meerschaum.utils.warnings import warn, info 499 warnings_thrown_count: int = 0 500 max_warnings: int = 3 501 502 if not self.is_installed(): 503 info( 504 f"Plugin '{self.name}' doesn't seem to be installed.\n " 505 + "Checking for artifacts...", 506 stack = False, 507 ) 508 else: 509 real_path = pathlib.Path(os.path.realpath(self.__file__)) 510 try: 511 if real_path.name == '__init__.py': 512 shutil.rmtree(real_path.parent) 513 else: 514 real_path.unlink() 515 except Exception as e: 516 warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False) 517 warnings_thrown_count += 1 518 else: 519 info(f"Removed source files for plugin '{self.name}'.") 520 521 if self.venv_path.exists(): 522 success, msg = self.remove_venv(debug=debug) 523 if not success: 524 warn(msg, stack=False) 525 warnings_thrown_count += 1 526 else: 527 info(f"Removed virtual environment from plugin '{self.name}'.") 528 529 success = warnings_thrown_count < max_warnings 530 sync_plugins_symlinks(debug=debug) 531 self.deactivate_venv(force=True, debug=debug) 532 reload_meerschaum(debug=debug) 533 return success, ( 534 f"Successfully uninstalled plugin '{self}'." if success 535 else f"Failed to uninstall plugin '{self}'." 536 ) 537 538 539 def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]: 540 """ 541 If exists, run the plugin's `setup()` function. 542 543 Parameters 544 ---------- 545 *args: str 546 The positional arguments passed to the `setup()` function. 547 548 debug: bool, default False 549 Verbosity toggle. 550 551 **kw: Any 552 The keyword arguments passed to the `setup()` function. 553 554 Returns 555 ------- 556 A `SuccessTuple` or `bool` indicating success. 
557 558 """ 559 from meerschaum.utils.debug import dprint 560 import inspect 561 _setup = None 562 for name, fp in inspect.getmembers(self.module): 563 if name == 'setup' and inspect.isfunction(fp): 564 _setup = fp 565 break 566 567 ### assume success if no setup() is found (not necessary) 568 if _setup is None: 569 return True 570 571 sig = inspect.signature(_setup) 572 has_debug, has_kw = ('debug' in sig.parameters), False 573 for k, v in sig.parameters.items(): 574 if '**' in str(v): 575 has_kw = True 576 break 577 578 _kw = {} 579 if has_kw: 580 _kw.update(kw) 581 if has_debug: 582 _kw['debug'] = debug 583 584 if debug: 585 dprint(f"Running setup for plugin '{self}'...") 586 try: 587 self.activate_venv(debug=debug) 588 return_tuple = _setup(*args, **_kw) 589 self.deactivate_venv(debug=debug) 590 except Exception as e: 591 return False, str(e) 592 593 if isinstance(return_tuple, tuple): 594 return return_tuple 595 if isinstance(return_tuple, bool): 596 return return_tuple, f"Setup for Plugin '{self.name}' did not return a message." 597 if return_tuple is None: 598 return False, f"Setup for Plugin '{self.name}' returned None." 599 return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}" 600 601 602 def get_dependencies( 603 self, 604 debug: bool = False, 605 ) -> List[str]: 606 """ 607 If the Plugin has specified dependencies in a list called `required`, return the list. 608 609 **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages. 610 Meerschaum plugins may also specify connector keys for a repo after `'@'`. 611 612 Parameters 613 ---------- 614 debug: bool, default False 615 Verbosity toggle. 616 617 Returns 618 ------- 619 A list of required packages and plugins (str). 620 621 """ 622 if '_required' in self.__dict__: 623 return self._required 624 625 ### If the plugin has not yet been imported, 626 ### infer the dependencies from the source text. 
627 ### This is not super robust, and it doesn't feel right 628 ### having multiple versions of the logic. 629 ### This is necessary when determining the activation order 630 ### without having import the module. 631 ### For consistency's sake, the module-less method does not cache the requirements. 632 if self.__dict__.get('_module', None) is None: 633 file_path = self.__file__ 634 if file_path is None: 635 return [] 636 with open(file_path, 'r', encoding='utf-8') as f: 637 text = f.read() 638 639 if 'required' not in text: 640 return [] 641 642 ### This has some limitations: 643 ### It relies on `required` being manually declared. 644 ### We lose the ability to dynamically alter the `required` list, 645 ### which is why we've kept the module-reliant method below. 646 import ast, re 647 ### NOTE: This technically would break 648 ### if `required` was the very first line of the file. 649 req_start_match = re.search(r'\nrequired(\s?)=', text) 650 if not req_start_match: 651 return [] 652 req_start = req_start_match.start() 653 654 ### Dependencies may have brackets within the strings, so push back the index. 
655 first_opening_brace = req_start + 1 + text[req_start:].find('[') 656 if first_opening_brace == -1: 657 return [] 658 659 next_closing_brace = req_start + 1 + text[req_start:].find(']') 660 if next_closing_brace == -1: 661 return [] 662 663 start_ix = first_opening_brace + 1 664 end_ix = next_closing_brace 665 666 num_braces = 0 667 while True: 668 if '[' not in text[start_ix:end_ix]: 669 break 670 num_braces += 1 671 start_ix = end_ix 672 end_ix += text[end_ix + 1:].find(']') + 1 673 674 req_end = end_ix + 1 675 req_text = ( 676 text[req_start:req_end] 677 .lstrip() 678 .replace('required', '', 1) 679 .lstrip() 680 .replace('=', '', 1) 681 .lstrip() 682 ) 683 try: 684 required = ast.literal_eval(req_text) 685 except Exception as e: 686 warn( 687 f"Unable to determine requirements for plugin '{self.name}' " 688 + "without importing the module.\n" 689 + " This may be due to dynamically setting the global `required` list.\n" 690 + f" {e}" 691 ) 692 return [] 693 return required 694 695 import inspect 696 self.activate_venv(dependencies=False, debug=debug) 697 required = [] 698 for name, val in inspect.getmembers(self.module): 699 if name == 'required': 700 required = val 701 break 702 self._required = required 703 self.deactivate_venv(dependencies=False, debug=debug) 704 return required 705 706 707 def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]: 708 """ 709 Return a list of required Plugin objects. 
710 """ 711 from meerschaum.utils.warnings import warn 712 from meerschaum.config import get_config 713 from meerschaum.config.static import STATIC_CONFIG 714 plugins = [] 715 _deps = self.get_dependencies(debug=debug) 716 sep = STATIC_CONFIG['plugins']['repo_separator'] 717 plugin_names = [ 718 _d[len('plugin:'):] for _d in _deps 719 if _d.startswith('plugin:') and len(_d) > len('plugin:') 720 ] 721 default_repo_keys = get_config('meerschaum', 'default_repository') 722 for _plugin_name in plugin_names: 723 if sep in _plugin_name: 724 try: 725 _plugin_name, _repo_keys = _plugin_name.split(sep) 726 except Exception as e: 727 _repo_keys = default_repo_keys 728 warn( 729 f"Invalid repo keys for required plugin '{_plugin_name}'.\n " 730 + f"Will try to use '{_repo_keys}' instead.", 731 stack = False, 732 ) 733 else: 734 _repo_keys = default_repo_keys 735 plugins.append(Plugin(_plugin_name, repo=_repo_keys)) 736 return plugins 737 738 739 def get_required_packages(self, debug: bool=False) -> List[str]: 740 """ 741 Return the required package names (excluding plugins). 742 """ 743 _deps = self.get_dependencies(debug=debug) 744 return [_d for _d in _deps if not _d.startswith('plugin:')] 745 746 747 def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool: 748 """ 749 Activate the virtual environments for the plugin and its dependencies. 750 751 Parameters 752 ---------- 753 dependencies: bool, default True 754 If `True`, activate the virtual environments for required plugins. 755 756 Returns 757 ------- 758 A bool indicating success. 
759 """ 760 from meerschaum.utils.venv import venv_target_path 761 from meerschaum.utils.packages import activate_venv 762 from meerschaum.utils.misc import make_symlink, is_symlink 763 from meerschaum.config._paths import PACKAGE_ROOT_PATH 764 765 if dependencies: 766 for plugin in self.get_required_plugins(debug=debug): 767 plugin.activate_venv(debug=debug, **kw) 768 769 vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True) 770 venv_meerschaum_path = vtp / 'meerschaum' 771 772 try: 773 success, msg = True, "Success" 774 if is_symlink(venv_meerschaum_path): 775 if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH: 776 venv_meerschaum_path.unlink() 777 success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH) 778 except Exception as e: 779 success, msg = False, str(e) 780 if not success: 781 warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}") 782 783 return activate_venv(self.name, debug=debug, **kw) 784 785 786 def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool: 787 """ 788 Deactivate the virtual environments for the plugin and its dependencies. 789 790 Parameters 791 ---------- 792 dependencies: bool, default True 793 If `True`, deactivate the virtual environments for required plugins. 794 795 Returns 796 ------- 797 A bool indicating success. 798 """ 799 from meerschaum.utils.packages import deactivate_venv 800 success = deactivate_venv(self.name, debug=debug, **kw) 801 if dependencies: 802 for plugin in self.get_required_plugins(debug=debug): 803 plugin.deactivate_venv(debug=debug, **kw) 804 return success 805 806 807 def install_dependencies( 808 self, 809 force: bool = False, 810 debug: bool = False, 811 ) -> bool: 812 """ 813 If specified, install dependencies. 814 815 **NOTE:** Dependencies that start with `'plugin:'` will be installed as 816 Meerschaum plugins from the same repository as this Plugin. 
817 To install from a different repository, add the repo keys after `'@'` 818 (e.g. `'plugin:foo@api:bar'`). 819 820 Parameters 821 ---------- 822 force: bool, default False 823 If `True`, continue with the installation, even if some 824 required packages fail to install. 825 826 debug: bool, default False 827 Verbosity toggle. 828 829 Returns 830 ------- 831 A bool indicating success. 832 833 """ 834 from meerschaum.utils.packages import pip_install, venv_contains_package 835 from meerschaum.utils.debug import dprint 836 from meerschaum.utils.warnings import warn, info 837 from meerschaum.connectors.parse import parse_repo_keys 838 _deps = self.get_dependencies(debug=debug) 839 if not _deps and self.requirements_file_path is None: 840 return True 841 842 plugins = self.get_required_plugins(debug=debug) 843 for _plugin in plugins: 844 if _plugin.name == self.name: 845 warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False) 846 continue 847 _success, _msg = _plugin.repo_connector.install_plugin( 848 _plugin.name, debug=debug, force=force 849 ) 850 if not _success: 851 warn( 852 f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'" 853 + f" for plugin '{self.name}':\n" + _msg, 854 stack = False, 855 ) 856 if not force: 857 warn( 858 "Try installing with the `--force` flag to continue anyway.", 859 stack = False, 860 ) 861 return False 862 info( 863 "Continuing with installation despite the failure " 864 + "(careful, things might be broken!)...", 865 icon = False 866 ) 867 868 869 ### First step: parse `requirements.txt` if it exists. 
870 if self.requirements_file_path is not None: 871 if not pip_install( 872 requirements_file_path=self.requirements_file_path, 873 venv=self.name, debug=debug 874 ): 875 warn( 876 f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.", 877 stack = False, 878 ) 879 if not force: 880 warn( 881 "Try installing with `--force` to continue anyway.", 882 stack = False, 883 ) 884 return False 885 info( 886 "Continuing with installation despite the failure " 887 + "(careful, things might be broken!)...", 888 icon = False 889 ) 890 891 892 ### Don't reinstall packages that are already included in required plugins. 893 packages = [] 894 _packages = self.get_required_packages(debug=debug) 895 accounted_for_packages = set() 896 for package_name in _packages: 897 for plugin in plugins: 898 if venv_contains_package(package_name, plugin.name): 899 accounted_for_packages.add(package_name) 900 break 901 packages = [pkg for pkg in _packages if pkg not in accounted_for_packages] 902 903 ### Attempt pip packages installation. 904 if packages: 905 for package in packages: 906 if not pip_install(package, venv=self.name, debug=debug): 907 warn( 908 f"Failed to install required package '{package}'" 909 + f" for plugin '{self.name}'.", 910 stack = False, 911 ) 912 if not force: 913 warn( 914 "Try installing with `--force` to continue anyway.", 915 stack = False, 916 ) 917 return False 918 info( 919 "Continuing with installation despite the failure " 920 + "(careful, things might be broken!)...", 921 icon = False 922 ) 923 return True 924 925 926 @property 927 def full_name(self) -> str: 928 """ 929 Include the repo keys with the plugin's name. 
930 """ 931 from meerschaum.config.static import STATIC_CONFIG 932 sep = STATIC_CONFIG['plugins']['repo_separator'] 933 return self.name + sep + str(self.repo_connector) 934 935 936 def __str__(self): 937 return self.name 938 939 940 def __repr__(self): 941 return f"Plugin('{self.name}', repo='{self.repo_connector}')" 942 943 944 def __del__(self): 945 pass
Handle packaging of Meerschaum plugins.
def __init__(
    self,
    name: str,
    version: Optional[str] = None,
    user_id: Optional[int] = None,
    required: Optional[List[str]] = None,
    attributes: Optional[Dict[str, Any]] = None,
    archive_path: Optional[pathlib.Path] = None,
    venv_path: Optional[pathlib.Path] = None,
    repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None,
    repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None,
):
    """
    Store the plugin's name, metadata, and resource paths.

    Parameters
    ----------
    name: str
        The plugin's name. If it contains the configured repo separator,
        the text after the separator is stored as `_repo_in_name`.

    version: Optional[str], default None
        Stored as `_version`.

    user_id: Optional[int], default None
        Stored as `user_id`.

    required: Optional[List[str]], default None
        If truthy, stored as `_required`.

    attributes: Optional[Dict[str, Any]], default None
        Stored as `attributes` (an empty dictionary when omitted).

    archive_path: Optional[pathlib.Path], default None
        Defaults to `<name>.tar.gz` under `PLUGINS_ARCHIVES_RESOURCES_PATH`.

    venv_path: Optional[pathlib.Path], default None
        Defaults to `<name>` under `VIRTENV_RESOURCES_PATH`.

    repo_connector: Optional[APIConnector], default None
        Stored as `_repo_connector`.

    repo: Union[APIConnector, str, None], default None
        Stored as `_repo_keys`.
    """
    from meerschaum.config.static import STATIC_CONFIG
    separator = STATIC_CONFIG['plugins']['repo_separator']

    repo_from_name = None
    if separator in name:
        ### Unpacking fails when the separator appears more than once.
        try:
            name, repo_from_name = name.split(separator)
        except Exception as e:
            error(f"Invalid plugin name: '{name}'")
    self._repo_in_name = repo_from_name

    self.name = name
    self.attributes = {} if attributes is None else attributes
    self.user_id = user_id
    self._version = version
    if required:
        self._required = required

    if archive_path is None:
        archive_path = PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
    self.archive_path = archive_path

    if venv_path is None:
        venv_path = VIRTENV_RESOURCES_PATH / self.name
    self.venv_path = venv_path

    self._repo_connector = repo_connector
    self._repo_keys = repo
Return the repository connector for this plugin.
NOTE: This imports the connectors
module, which imports certain plugin modules.
If a file named requirements.txt
exists, return its path.
def is_installed(self, **kw) -> bool:
    """
    Report whether a source file could be located for this plugin.

    Returns
    -------
    `True` when the plugin's `__file__` is not `None`, otherwise `False`.
    """
    source_file = self.__file__
    return source_file is not None
Check whether a plugin is correctly installed.
Returns
- A
bool
indicating whether a plugin exists and is successfully imported.
def make_tar(self, debug: bool = False) -> pathlib.Path:
    """
    Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.

    The working directory is temporarily changed to the parent of the plugin's
    source so that archive members are stored relative to it.
    It is always restored, even if archiving fails.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pathlib.Path` to the archive file's path.

    """
    import tarfile
    import pathlib
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    pathspec = attempt_import('pathspec', debug=debug)

    if not self.__file__:
        from meerschaum.utils.warnings import error
        error(f"Could not find file for plugin '{self}'.")
    if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
        path = self.__file__.replace('__init__.py', '')
        is_dir = True
    else:
        path = self.__file__
        is_dir = False

    default_patterns_to_ignore = [
        '.pyc',
        '__pycache__/',
        'eggs/',
        '__pypackages__/',
        '.git',
    ]

    def parse_gitignore() -> 'Set[str]':
        """Return the files under `path` matched by `.gitignore` plus the default patterns."""
        gitignore_path = pathlib.Path(path) / '.gitignore'
        ### NOTE(review): the default patterns only apply when a `.gitignore` exists.
        if not gitignore_path.exists():
            return set()
        with open(gitignore_path, 'r', encoding='utf-8') as f:
            gitignore_text = f.read()
        return set(pathspec.PathSpec.from_lines(
            pathspec.patterns.GitWildMatchPattern,
            default_patterns_to_ignore + gitignore_text.splitlines()
        ).match_tree(path))

    old_cwd = os.getcwd()
    real_parent_path = pathlib.Path(os.path.realpath(path)).parent
    os.chdir(real_parent_path)
    try:
        ### Despite the name, these are matched file paths, compared by substring below.
        patterns_to_ignore = parse_gitignore() if is_dir else set()

        if debug:
            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")

        with tarfile.open(self.archive_path, 'w:gz') as tarf:
            if not is_dir:
                tarf.add(f"{self.name}.py")
            else:
                for root, dirs, files in os.walk(self.name):
                    for f in files:
                        fp = os.path.join(root, f)
                        ### NOTE(review): hidden files are only skipped when at least
                        ### one ignored path exists (the check lives inside the scan).
                        skip_file = any(
                            (pattern in str(fp) or f.startswith('.'))
                            for pattern in patterns_to_ignore
                        )
                        if skip_file:
                            continue
                        if debug:
                            dprint(f"Adding '{fp}'...")
                        tarf.add(fp)
    finally:
        ### Always change back to the old directory, even when archiving fails.
        os.chdir(old_cwd)

    ### change to 775 to avoid permissions issues with the API in a Docker container
    self.archive_path.chmod(0o775)

    if debug:
        dprint(f"Created archive '{self.archive_path}'.")
    return self.archive_path
Compress the plugin's source files into a .tar.gz
archive and return the archive's path.
Parameters
- debug (bool, default False): Verbosity toggle.
Returns
- A
pathlib.Path
to the archive file's path.
def install(
    self,
    force: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Extract a plugin's tar archive to the plugins directory.

    This function checks if the plugin is already installed and if the version is equal or
    greater than the existing installation.

    The plugin is tracked in `_ongoing_installations` for the duration of the call
    and is always untracked on exit (including early returns and raised errors),
    so a failed attempt does not block later installations.

    Parameters
    ----------
    force: bool, default False
        If `True`, continue with installation, even if required packages fail to install.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (bool) and a message (str).

    """
    full_name = self.full_name
    if full_name in _ongoing_installations:
        return True, f"Already installing plugin '{self}'."
    _ongoing_installations.add(full_name)
    try:
        from meerschaum.utils.warnings import warn, error
        if debug:
            from meerschaum.utils.debug import dprint
        import tarfile
        import re
        import ast
        from meerschaum.plugins import sync_plugins_symlinks
        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
        from meerschaum.utils.venv import init_venv
        from meerschaum.utils.misc import safely_extract_tar
        old_cwd = os.getcwd()
        old_version = ''
        new_version = ''
        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
        temp_dir.mkdir(exist_ok=True)

        if not self.archive_path.exists():
            return False, f"Missing archive file for plugin '{self}'."
        if self.version is not None:
            old_version = self.version
            if debug:
                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")

        if debug:
            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")

        try:
            with tarfile.open(self.archive_path, 'r:gz') as tarf:
                safely_extract_tar(tarf, temp_dir)
        except Exception as e:
            warn(e)
            return False, f"Failed to extract plugin '{self.name}'."

        ### search for version information
        files = os.listdir(temp_dir)

        if str(files[0]) == self.name:
            is_dir = True
        elif str(files[0]) == self.name + '.py':
            is_dir = False
        else:
            error(f"Unknown format encountered for plugin '{self}'.")

        fpath = temp_dir / files[0]
        if is_dir:
            fpath = fpath / '__init__.py'

        init_venv(self.name, debug=debug)
        with open(fpath, 'r', encoding='utf-8') as f:
            init_lines = f.readlines()
        new_version = None
        for line in init_lines:
            if '__version__' not in line:
                continue
            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
            if not version_match:
                continue
            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
            break
        if not new_version:
            warn(
                f"No `__version__` defined for plugin '{self}'. "
                + "Assuming new version...",
                stack = False,
            )

        packaging_version = attempt_import('packaging.version')
        try:
            is_new_version = (not new_version and not old_version) or (
                packaging_version.parse(old_version) < packaging_version.parse(new_version)
            )
            is_same_version = new_version and old_version and (
                packaging_version.parse(old_version) == packaging_version.parse(new_version)
            )
        except Exception as e:
            ### Unparsable versions (e.g. a missing one) are treated as a new version.
            is_new_version, is_same_version = True, False

        ### Determine where to permanently store the new plugin.
        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
        for path in PLUGINS_DIR_PATHS:
            files_in_plugins_dir = os.listdir(path)
            if (
                self.name in files_in_plugins_dir
                or
                (self.name + '.py') in files_in_plugins_dir
            ):
                plugin_installation_dir_path = path
                break

        success_msg = f"Successfully installed plugin '{self}'."
        success, abort = None, None

        if is_same_version and not force:
            success, msg = True, (
                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
                " Install again with `-f` or `--force` to reinstall."
            )
            abort = True
        elif is_new_version or force:
            for src_dir, dirs, files in os.walk(temp_dir):
                if success is not None:
                    break
                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
                if not os.path.exists(dst_dir):
                    os.mkdir(dst_dir)
                for f in files:
                    src_file = os.path.join(src_dir, f)
                    dst_file = os.path.join(dst_dir, f)
                    if os.path.exists(dst_file):
                        os.remove(dst_file)

                    if debug:
                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
                    try:
                        shutil.move(src_file, dst_dir)
                    except Exception as e:
                        success, msg = False, (
                            f"Failed to install plugin '{self}': " +
                            f"Could not move file '{src_file}' to '{dst_dir}'"
                        )
                        print(msg)
                        break
            if success is None:
                success, msg = True, success_msg
        else:
            success, msg = False, (
                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
                + f"attempted version {new_version}."
            )

        shutil.rmtree(temp_dir)
        os.chdir(old_cwd)

        ### Reload the plugin's module.
        sync_plugins_symlinks(debug=debug)
        if '_module' in self.__dict__:
            del self.__dict__['_module']
        init_venv(venv=self.name, force=True, debug=debug)
        reload_meerschaum(debug=debug)

        ### if we've already failed, return here
        if not success or abort:
            return success, msg

        ### attempt to install dependencies
        if not self.install_dependencies(force=force, debug=debug):
            return False, f"Failed to install dependencies for plugin '{self}'."

        ### handling success tuple, bool, or other (typically None)
        setup_tuple = self.setup(debug=debug)
        if isinstance(setup_tuple, tuple):
            if not setup_tuple[0]:
                success, msg = setup_tuple
        elif isinstance(setup_tuple, bool):
            if not setup_tuple:
                success, msg = False, (
                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
                    f"Check `setup()` in '{self.__file__}' for more information " +
                    f"(no error message provided)."
                )
            else:
                success, msg = True, success_msg
        elif setup_tuple is None:
            success = True
            msg = (
                f"Post-install for plugin '{self}' returned None. " +
                f"Assuming plugin successfully installed."
            )
            warn(msg)
        else:
            success = False
            msg = (
                f"Post-install for plugin '{self}' returned unexpected value " +
                f"of type '{type(setup_tuple)}': {setup_tuple}"
            )
    finally:
        ### Never leave the plugin marked as "installing", whichever way we exit.
        _ongoing_installations.discard(full_name)

    ### Access the module after untracking the installation (as before).
    module = self.module
    return success, msg
Extract a plugin's tar archive to the plugins directory.
This function checks if the plugin is already installed and if the version is equal or greater than the existing installation.
Parameters
- force (bool, default False):
If
True
, continue with installation, even if required packages fail to install. - debug (bool, default False): Verbosity toggle.
Returns
- A
SuccessTuple
of success (bool) and a message (str).
def remove_archive(
    self,
    debug: bool = False
) -> SuccessTuple:
    """
    Delete the plugin's archive file if it exists.

    Parameters
    ----------
    debug: bool, default False
        Not used.

    Returns
    -------
    A `SuccessTuple`; a missing archive counts as success.
    """
    archive = self.archive_path
    if not archive.exists():
        return True, f"Archive file for plugin '{self}' does not exist."

    try:
        archive.unlink()
    except Exception as e:
        return False, f"Failed to remove archive for plugin '{self}':\n{e}"
    else:
        return True, "Success"
Remove a plugin's archive file.
def remove_venv(
    self,
    debug: bool = False
) -> SuccessTuple:
    """
    Delete the plugin's virtual environment directory if it exists.

    Parameters
    ----------
    debug: bool, default False
        Not used.

    Returns
    -------
    A `SuccessTuple`; a missing virtual environment counts as success.
    """
    venv_dir = self.venv_path
    if not venv_dir.exists():
        return True, f"Virtual environment for plugin '{self}' does not exist."

    try:
        shutil.rmtree(venv_dir)
    except Exception as e:
        return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
    else:
        return True, "Success"
Remove a plugin's virtual environment.
def uninstall(self, debug: bool = False) -> SuccessTuple:
    """
    Remove a plugin's source files and virtual environment, then reload Meerschaum.

    Failures to remove either artifact are emitted as warnings and counted.

    NOTE(review): at most two warnings can be counted here but the failure
    threshold is three, so the "Failed to uninstall" message looks unreachable.
    The archive file is also not removed by this method. Confirm the intent.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (bool) and a message (str).
    """
    from meerschaum.utils.packages import reload_meerschaum
    from meerschaum.plugins import sync_plugins_symlinks
    from meerschaum.utils.warnings import warn, info
    failure_limit: int = 3
    failures: int = 0

    if self.is_installed():
        source_path = pathlib.Path(os.path.realpath(self.__file__))
        ### Directory-style plugins resolve to their `__init__.py`.
        is_package = source_path.name == '__init__.py'
        try:
            if is_package:
                shutil.rmtree(source_path.parent)
            else:
                source_path.unlink()
        except Exception as e:
            warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
            failures += 1
        else:
            info(f"Removed source files for plugin '{self.name}'.")
    else:
        info(
            f"Plugin '{self.name}' doesn't seem to be installed.\n "
            + "Checking for artifacts...",
            stack = False,
        )

    if self.venv_path.exists():
        venv_removed, venv_msg = self.remove_venv(debug=debug)
        if venv_removed:
            info(f"Removed virtual environment from plugin '{self.name}'.")
        else:
            warn(venv_msg, stack=False)
            failures += 1

    success = failures < failure_limit
    sync_plugins_symlinks(debug=debug)
    self.deactivate_venv(force=True, debug=debug)
    reload_meerschaum(debug=debug)
    if success:
        return success, f"Successfully uninstalled plugin '{self}'."
    return success, f"Failed to uninstall plugin '{self}'."
Remove a plugin, its virtual environment, and archive file.
539 def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]: 540 """ 541 If exists, run the plugin's `setup()` function. 542 543 Parameters 544 ---------- 545 *args: str 546 The positional arguments passed to the `setup()` function. 547 548 debug: bool, default False 549 Verbosity toggle. 550 551 **kw: Any 552 The keyword arguments passed to the `setup()` function. 553 554 Returns 555 ------- 556 A `SuccessTuple` or `bool` indicating success. 557 558 """ 559 from meerschaum.utils.debug import dprint 560 import inspect 561 _setup = None 562 for name, fp in inspect.getmembers(self.module): 563 if name == 'setup' and inspect.isfunction(fp): 564 _setup = fp 565 break 566 567 ### assume success if no setup() is found (not necessary) 568 if _setup is None: 569 return True 570 571 sig = inspect.signature(_setup) 572 has_debug, has_kw = ('debug' in sig.parameters), False 573 for k, v in sig.parameters.items(): 574 if '**' in str(v): 575 has_kw = True 576 break 577 578 _kw = {} 579 if has_kw: 580 _kw.update(kw) 581 if has_debug: 582 _kw['debug'] = debug 583 584 if debug: 585 dprint(f"Running setup for plugin '{self}'...") 586 try: 587 self.activate_venv(debug=debug) 588 return_tuple = _setup(*args, **_kw) 589 self.deactivate_venv(debug=debug) 590 except Exception as e: 591 return False, str(e) 592 593 if isinstance(return_tuple, tuple): 594 return return_tuple 595 if isinstance(return_tuple, bool): 596 return return_tuple, f"Setup for Plugin '{self.name}' did not return a message." 597 if return_tuple is None: 598 return False, f"Setup for Plugin '{self.name}' returned None." 599 return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"
def get_dependencies(
    self,
    debug: bool = False,
) -> List[str]:
    """
    If the Plugin has specified dependencies in a list called `required`, return the list.

    **NOTE:** Dependencies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
    Meerschaum plugins may also specify connector keys for a repo after `'@'`.

    If the plugin's module has not been imported yet, the dependencies are read from
    the first top-level `required = ...` assignment in the source file
    (which must be a literal) and are not cached.
    Otherwise the module's `required` attribute is returned and cached.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of required packages and plugins (str).

    """
    if '_required' in self.__dict__:
        return self._required

    ### If the plugin has not yet been imported,
    ### infer the dependencies from the source text.
    ### This is necessary when determining the activation order
    ### without having to import the module.
    ### For consistency's sake, the module-less method does not cache the requirements.
    if self.__dict__.get('_module', None) is None:
        file_path = self.__file__
        if file_path is None:
            return []
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()

        ### Cheap check to avoid parsing files that never mention `required`.
        if 'required' not in text:
            return []

        ### This has some limitations:
        ### It relies on `required` being declared as a top-level literal.
        ### We lose the ability to dynamically alter the `required` list,
        ### which is why we've kept the module-reliant method below.
        import ast
        try:
            required_node = None
            for node in ast.parse(text).body:
                if isinstance(node, ast.Assign):
                    targets = node.targets
                elif isinstance(node, ast.AnnAssign) and node.value is not None:
                    targets = [node.target]
                else:
                    continue
                if any(
                    isinstance(target, ast.Name) and target.id == 'required'
                    for target in targets
                ):
                    required_node = node.value
                    break
            if required_node is None:
                return []
            return ast.literal_eval(required_node)
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(
                f"Unable to determine requirements for plugin '{self.name}' "
                + "without importing the module.\n"
                + " This may be due to dynamically setting the global `required` list.\n"
                + f" {e}"
            )
            return []

    import inspect
    self.activate_venv(dependencies=False, debug=debug)
    try:
        required = []
        for name, val in inspect.getmembers(self.module):
            if name == 'required':
                required = val
                break
        self._required = required
    finally:
        ### Don't leave the venv activated if reading the module fails.
        self.deactivate_venv(dependencies=False, debug=debug)
    return required
If the Plugin has specified dependencies in a list called required
, return the list.
NOTE: Dependencies which start with 'plugin:'
are Meerschaum plugins, not pip packages.
Meerschaum plugins may also specify connector keys for a repo after '@'
.
Parameters
- debug (bool, default False): Verbosity toggle.
Returns
- A list of required packages and plugins (str).
def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
    """
    Return a list of required Plugin objects.

    Dependencies of the form `'plugin:<name>'` use the configured default repository,
    and `'plugin:<name>@<repo keys>'` use the given repository keys.
    If the separator appears more than once, a warning is emitted and the name
    before the first separator is used with the default repository.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of `Plugin` objects, in the order they are declared.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    prefix = 'plugin:'
    plugins = []
    _deps = self.get_dependencies(debug=debug)
    sep = STATIC_CONFIG['plugins']['repo_separator']
    default_repo_keys = get_config('meerschaum', 'default_repository')
    for _dep in _deps:
        ### Skip pip packages and the bare `'plugin:'` prefix.
        if not _dep.startswith(prefix) or len(_dep) <= len(prefix):
            continue
        _full_name = _dep[len(prefix):]
        _plugin_name, *_repo_parts = _full_name.split(sep)
        if not _repo_parts:
            _repo_keys = default_repo_keys
        elif len(_repo_parts) == 1:
            _repo_keys = _repo_parts[0]
        else:
            ### Keep only the name so the fallback below can actually construct the Plugin
            ### (a name which still contains the separator is rejected by `Plugin`).
            _repo_keys = default_repo_keys
            warn(
                f"Invalid repo keys for required plugin '{_full_name}'.\n "
                + f"Will try to use '{_repo_keys}' instead.",
                stack = False,
            )
        plugins.append(Plugin(_plugin_name, repo=_repo_keys))
    return plugins
Return a list of required Plugin objects.
def get_required_packages(self, debug: bool=False) -> List[str]:
    """
    List the plugin's dependencies which are not Meerschaum plugins.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle (passed to `get_dependencies()`).

    Returns
    -------
    The dependencies which do not start with `'plugin:'`, in their original order.
    """
    packages = []
    for dependency in self.get_dependencies(debug=debug):
        if dependency.startswith('plugin:'):
            continue
        packages.append(dependency)
    return packages
Return the required package names (excluding plugins).
def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
    """
    Activate the virtual environments for the plugin and its dependencies.

    Before activating, a `meerschaum` symlink to the package root is (re)created
    inside the plugin's virtual environment target path.
    A failure to create the symlink only emits a warning.

    Parameters
    ----------
    dependencies: bool, default True
        If `True`, activate the virtual environments for required plugins.

    debug: bool, default False
        Verbosity toggle.

    **kw
        Additional keyword arguments passed to the underlying `activate_venv()` calls.

    Returns
    -------
    A bool indicating success.
    """
    from meerschaum.utils.venv import venv_target_path
    from meerschaum.utils.packages import activate_venv
    from meerschaum.utils.misc import make_symlink, is_symlink
    from meerschaum.config._paths import PACKAGE_ROOT_PATH

    ### Required plugins are activated first.
    required_plugins = self.get_required_plugins(debug=debug) if dependencies else []
    for required_plugin in required_plugins:
        required_plugin.activate_venv(debug=debug, **kw)

    target_path = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
    link_path = target_path / 'meerschaum'

    linked, link_msg = True, "Success"
    try:
        ### Remove a symlink which points somewhere other than the package root.
        if (
            is_symlink(link_path)
            and pathlib.Path(os.path.realpath(link_path)) != PACKAGE_ROOT_PATH
        ):
            link_path.unlink()
        linked, link_msg = make_symlink(link_path, PACKAGE_ROOT_PATH)
    except Exception as e:
        linked, link_msg = False, str(e)

    if not linked:
        warn(f"Unable to create symlink {link_path} to {PACKAGE_ROOT_PATH}:\n{link_msg}")

    return activate_venv(self.name, debug=debug, **kw)
Activate the virtual environments for the plugin and its dependencies.
Parameters
- dependencies (bool, default True):
If
True
, activate the virtual environments for required plugins.
Returns
- A bool indicating success.
def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
    """
    Deactivate the virtual environments for the plugin and its dependencies.

    The plugin's own environment is deactivated first, then those of its required plugins.

    Parameters
    ----------
    dependencies: bool, default True
        If `True`, deactivate the virtual environments for required plugins.

    debug: bool, default False
        Verbosity toggle.

    **kw
        Additional keyword arguments passed to the underlying `deactivate_venv()` calls.

    Returns
    -------
    A bool indicating success (for this plugin's own environment).
    """
    from meerschaum.utils.packages import deactivate_venv
    own_result = deactivate_venv(self.name, debug=debug, **kw)
    if not dependencies:
        return own_result

    for required_plugin in self.get_required_plugins(debug=debug):
        required_plugin.deactivate_venv(debug=debug, **kw)
    return own_result
Deactivate the virtual environments for the plugin and its dependencies.
Parameters
- dependencies (bool, default True):
If
True
, deactivate the virtual environments for required plugins.
Returns
- A bool indicating success.
def install_dependencies(
        self,
        force: bool = False,
        debug: bool = False,
    ) -> bool:
    """
    If specified, install dependencies.

    **NOTE:** Dependencies that start with `'plugin:'` will be installed as
    Meerschaum plugins from the same repository as this Plugin.
    To install from a different repository, add the repo keys after `'@'`
    (e.g. `'plugin:foo@api:bar'`).

    The steps run in this order: required plugins, then `requirements.txt`
    (when the plugin has one), then the remaining required packages.

    Parameters
    ----------
    force: bool, default False
        If `True`, continue with the installation, even if some
        required packages fail to install.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    `True` if there is nothing to install or every step either succeeded or was
    forced past; `False` as soon as a step fails while `force` is `False`.
    """
    from meerschaum.utils.packages import pip_install, venv_contains_package
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, info
    from meerschaum.connectors.parse import parse_repo_keys

    continue_msg = "Continuing with installation despite the failure (careful, things might be broken!)..."

    dependencies = self.get_dependencies(debug=debug)
    if not dependencies and self.requirements_file_path is None:
        return True

    ### Step one: install the plugins this plugin depends upon.
    plugins = self.get_required_plugins(debug=debug)
    for required_plugin in plugins:
        if required_plugin.name == self.name:
            warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
            continue

        installed, install_msg = required_plugin.repo_connector.install_plugin(
            required_plugin.name, debug=debug, force=force
        )
        if installed:
            continue

        warn(
            f"Failed to install required plugin '{required_plugin}'"
            + f" from '{required_plugin.repo_connector}'"
            + f" for plugin '{self.name}':\n" + install_msg,
            stack = False,
        )
        if not force:
            warn(
                "Try installing with the `--force` flag to continue anyway.",
                stack = False,
            )
            return False
        info(continue_msg, icon=False)

    ### Step two: resolve `requirements.txt` if it exists.
    if self.requirements_file_path is not None and not pip_install(
        requirements_file_path = self.requirements_file_path,
        venv = self.name,
        debug = debug,
    ):
        warn(
            f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
            stack = False,
        )
        if not force:
            warn(
                "Try installing with `--force` to continue anyway.",
                stack = False,
            )
            return False
        info(continue_msg, icon=False)

    ### Step three: install the required packages,
    ### skipping those already present in a required plugin's virtual environment.
    required_packages = self.get_required_packages(debug=debug)
    already_provided = {
        package_name
        for package_name in required_packages
        if any(
            venv_contains_package(package_name, required_plugin.name)
            for required_plugin in plugins
        )
    }
    for package in required_packages:
        if package in already_provided:
            continue
        if pip_install(package, venv=self.name, debug=debug):
            continue

        warn(
            f"Failed to install required package '{package}'"
            + f" for plugin '{self.name}'.",
            stack = False,
        )
        if not force:
            warn(
                "Try installing with `--force` to continue anyway.",
                stack = False,
            )
            return False
        info(continue_msg, icon=False)

    return True
If specified, install dependencies.
NOTE: Dependencies that start with 'plugin:'
will be installed as
Meerschaum plugins from the same repository as this Plugin.
To install from a different repository, add the repo keys after '@'
(e.g. 'plugin:foo@api:bar'
).
Parameters
- force (bool, default False):
If
True
, continue with the installation, even if some required packages fail to install. - debug (bool, default False): Verbosity toggle.
Returns
- A bool indicating success.
def make_action(
        function: Callable[[Any], Any],
        shell: bool = False,
        activate: bool = True,
        deactivate: bool = True,
        debug: bool = False
    ) -> Callable[[Any], Any]:
    """
    Register a function as a Meerschaum action under the function's own name.
    Useful for plugins that are adding multiple actions.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to become a Meerschaum action. Must accept all keyword arguments.

    shell: bool, default False
        Not used.

    activate: bool, default True
        Not used by this function.

    deactivate: bool, default True
        Not used by this function.

    debug: bool, default False
        If `True`, print which action is being added and from which plugin.

    Returns
    -------
    The same function that was passed in (so this may be used as a decorator).

    Examples
    --------
    >>> from meerschaum.plugins import make_action
    >>>
    >>> @make_action
    ... def my_action(**kw):
    ...     print('foo')
    ...     return True, "Success"
    >>>
    """

    from meerschaum.actions import actions
    from meerschaum.utils.formatting import pprint

    ### Functions defined in modules named `plugins.<name>...` belong to the plugin `<name>`.
    module_name = function.__globals__['__name__']
    plugin = None
    if module_name.startswith('plugins.'):
        owner_name = module_name.split('.')[1]
        if owner_name:
            plugin = Plugin(owner_name)

    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Adding action '{function.__name__}' from plugin '{plugin}'...")

    actions[function.__name__] = function
    return function
Make a function a Meerschaum action. Useful for plugins that are adding multiple actions.
Parameters
- function (Callable[[Any], Any]): The function to become a Meerschaum action. Must accept all keyword arguments.
- shell (bool, default False): Not used.
Returns
- Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import make_action
>>>
>>> @make_action
... def my_action(**kw):
... print('foo')
... return True, "Success"
>>>
def api_plugin(function: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """
    Execute the function when initializing the Meerschaum API module.
    Useful for lazy-loading heavy plugins only when the API is started,
    such as when editing the `meerschaum.api.app` FastAPI app.

    The FastAPI app will be passed as the only parameter.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to be called before starting the Meerschaum API.

    Returns
    -------
    The same function that was passed in (so this may be used as a decorator).

    Examples
    --------
    >>> from meerschaum.plugins import api_plugin
    >>>
    >>> @api_plugin
    ... def initialize_plugin(app):
    ...     @app.get('/my/new/path')
    ...     def new_path():
    ...         return {'message': 'It works!'}
    >>>
    """
    with _locks['_api_plugins']:
        try:
            ### Functions are grouped by the module in which they were defined.
            _api_plugins.setdefault(function.__module__, []).append(function)
        except Exception as e:
            ### Registration is best-effort: warn rather than raise.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function
Execute the function when initializing the Meerschaum API module.
Useful for lazy-loading heavy plugins only when the API is started,
such as when editing the meerschaum.api.app
FastAPI app.
The FastAPI app will be passed as the only parameter.
Parameters
- function (Callable[[Any], Any]): The function to be called before starting the Meerschaum API.
Returns
- Another function (decorator function).
Examples
>>> from meerschaum.plugins import api_plugin
>>>
>>> @api_plugin
... def initialize_plugin(app):
... @app.get('/my/new/path')
... def new_path():
... return {'message': 'It works!'}
>>>
def import_plugins(
        *plugins_to_import: Union[str, List[str], None],
        warn: bool = True,
    ) -> Union[
        'ModuleType', Tuple['ModuleType', None]
    ]:
    """
    Import the Meerschaum plugins directory.

    Parameters
    ----------
    plugins_to_import: Union[str, List[str], None]
        If provided, only import the specified plugins
        (strings and lists of strings are flattened together).
        Otherwise import the entire plugins module.

    warn: bool, default True
        If `True`, emit a warning when the plugins module as a whole fails to import.

    Returns
    -------
    With no plugins specified, the plugins module (or `None` if the import failed).
    With one plugin specified, that plugin's module (or `None` if the import failed).
    Otherwise a tuple of modules, with `None` in place of every failed import.
    """
    import sys
    import importlib
    from meerschaum.utils.misc import flatten_list
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    from meerschaum.utils.venv import is_venv_active, activate_venv, deactivate_venv, Venv
    from meerschaum.utils.warnings import warn as _warn
    plugins_to_import = list(plugins_to_import)
    plugins_parent_path = str(PLUGINS_RESOURCES_PATH.parent)
    with _locks['sys.path']:

        ### Since plugins may depend on other plugins,
        ### we need to activate the virtual environments for library plugins.
        ### This logic exists in `Plugin.activate_venv()`,
        ### but that code requires the plugin's module to already be imported.
        ### It's not a guarantee of correct activation order.
        plugins_names = get_plugins_names()

        ### Remember the *names* of the venvs which were active beforehand
        ### so that they are left active once the import has finished.
        ### (A list of booleans here would never match a plugin name below.)
        already_active_venvs = {
            plugin_name
            for plugin_name in plugins_names
            if is_venv_active(plugin_name)
        }

        ### The parent of the plugins directory must come first on `sys.path`
        ### for the duration of the import.
        if not sys.path or sys.path[0] != plugins_parent_path:
            sys.path.insert(0, plugins_parent_path)

        if not plugins_to_import:
            for plugin_name in plugins_names:
                activate_venv(plugin_name)
            try:
                imported_plugins = importlib.import_module(PLUGINS_RESOURCES_PATH.stem)
            except ImportError as e:
                _warn(f"Failed to import the plugins module:\n {e}")
                import traceback
                traceback.print_exc()
                imported_plugins = None
            for plugin_name in plugins_names:
                if plugin_name in already_active_venvs:
                    continue
                deactivate_venv(plugin_name)

        else:
            imported_plugins = []
            for plugin_name in flatten_list(plugins_to_import):
                plugin = Plugin(plugin_name)
                try:
                    with Venv(plugin):
                        imported_plugins.append(
                            importlib.import_module(
                                f'{PLUGINS_RESOURCES_PATH.stem}.{plugin_name}'
                            )
                        )
                except Exception as e:
                    _warn(
                        f"Failed to import plugin '{plugin_name}':\n "
                        + f"{e}\n\nHere's a stacktrace:",
                        stack = False,
                    )
                    from meerschaum.utils.formatting import get_console
                    get_console().print_exception(
                        suppress = [
                            'meerschaum/plugins/__init__.py',
                            importlib,
                            importlib._bootstrap,
                        ]
                    )
                    ### Keep positions aligned with the requested plugins.
                    imported_plugins.append(None)

        if imported_plugins is None and warn:
            _warn("Failed to import plugins.", stacklevel=3)

        if plugins_parent_path in sys.path:
            sys.path.remove(plugins_parent_path)

    if isinstance(imported_plugins, list):
        return (imported_plugins[0] if len(imported_plugins) == 1 else tuple(imported_plugins))
    return imported_plugins
Import the Meerschaum plugins directory.
Parameters
- plugins_to_import (Union[str, List[str], None]):
If provided, only import the specified plugins.
Otherwise import the entire plugins module. May be a string, list, or
None
. Defaults to None
.
Returns
- A module or tuple of modules, depending on the number of plugins provided.
def reload_plugins(plugins: Optional[List[str]] = None, debug: bool = False) -> None:
    """
    Reload plugins back into memory.

    The modules `plugins.<name>` of the chosen plugins are dropped from `sys.modules`,
    then `load_plugins()` is called.

    Parameters
    ----------
    plugins: Optional[List[str]], default None
        The plugins to reload. `None` (or an empty list) will reload all plugins.

    debug: bool, default False
        Verbosity toggle.
    """
    import sys
    if debug:
        from meerschaum.utils.debug import dprint

    names_to_reload = plugins or get_plugins_names()
    for plugin_name in names_to_reload:
        if debug:
            dprint(f"Reloading plugin '{plugin_name}'...")
        ### Forget the cached module (if any) so that it is imported afresh.
        sys.modules.pop('plugins.' + str(plugin_name), None)

    load_plugins(debug=debug)
Reload plugins back into memory.
Parameters
- plugins (Optional[List[str]], default None):
The plugins to reload.
None
will reload all plugins.
def get_plugins(*to_load, try_import: bool = True) -> Union[Tuple[Plugin], Plugin]:
    """
    Return the installed `Plugin` objects.

    Parameters
    ----------
    to_load:
        If specified, only load specific plugins.
        Otherwise return all plugins found in the plugins resources directory.

    try_import: bool, default True
        If `True`, allow for plugins to be imported
        (passed to `Plugin.is_installed()`).

    Returns
    -------
    A tuple of the installed `Plugin` objects,
    or a single `Plugin` if exactly one name was given.

    Raises
    ------
    IndexError
        If exactly one name was given and that plugin is not installed.
    """
    from meerschaum.config._paths import PLUGINS_RESOURCES_PATH
    import os
    sync_plugins_symlinks()

    if to_load:
        names = to_load
    else:
        names = []
        for entry in os.listdir(PLUGINS_RESOURCES_PATH):
            if entry == '__init__.py':
                continue
            ### Directories are used as-is; other entries lose their
            ### last three characters (i.e. the `.py` extension).
            is_directory = (PLUGINS_RESOURCES_PATH / entry).is_dir()
            names.append(entry if is_directory else entry[:-3])

    candidates = [Plugin(name) for name in names]
    installed = tuple(
        candidate
        for candidate in candidates
        if candidate.is_installed(try_import=try_import)
    )
    return installed[0] if len(to_load) == 1 else installed
Return a tuple of Plugin
objects.
Parameters
- to_load: If specified, only load specific plugins. Otherwise return all plugins.
- try_import (bool, default True):
If
True
, allow for plugins to be imported.
def get_data_plugins() -> List[Plugin]:
    """
    Only return the plugins whose modules define a `fetch()` or `sync()` function.

    Returns
    -------
    A list of `Plugin` objects, in the order returned by `get_plugins()`.
    Each plugin appears at most once, even if it defines both functions.
    """
    import inspect
    data_names = {'sync', 'fetch'}
    return [
        plugin
        for plugin in get_plugins()
        ### `any()` stops at the first match, so a plugin with both
        ### `sync()` and `fetch()` is not listed twice.
        if any(
            name in data_names
            for name, _ in inspect.getmembers(plugin.module, inspect.isfunction)
        )
    ]
Only return the modules of plugins with either fetch()
or sync()
functions.
def add_plugin_argument(*args, **kwargs) -> None:
    """
    Add argparse arguments under the calling plugin's options group
    (titled "Plugin '<name>' options", or 'Custom options' when no parent plugin is found).
    Takes the same parameters as the regular argparse `add_argument()` function.

    Arguments with the same positional parameters are only added once per group,
    and any exception raised while adding is reported as a warning.

    Examples
    --------
    >>> add_plugin_argument('--foo', type=int, help="This is my help text!")
    >>>
    """
    from meerschaum._internal.arguments._parser import groups, _seen_plugin_args, parser
    from meerschaum.utils.warnings import warn, error

    ### NOTE: keep this call directly in this function's body;
    ### the argument is presumably a stack depth relative to this frame (verify).
    parent_plugin_name = _get_parent_plugin(2)
    if parent_plugin_name:
        title = f"Plugin '{parent_plugin_name}' options"
    else:
        title = 'Custom options'
    group_key = 'plugin_' + (parent_plugin_name or '')

    ### Lazily create the group (and its set of seen arguments) on first use.
    if group_key not in groups:
        groups[group_key] = parser.add_argument_group(title=title)
        _seen_plugin_args[group_key] = set()

    try:
        arg_signature = str(args)
        if arg_signature not in _seen_plugin_args[group_key]:
            groups[group_key].add_argument(*args, **kwargs)
            _seen_plugin_args[group_key].add(arg_signature)
    except Exception as e:
        warn(e)
Add argparse arguments under the 'Plugins options' group.
Takes the same parameters as the regular argparse add_argument()
function.
Examples
>>> add_plugin_argument('--foo', type=int, help="This is my help text!")
>>>
def pre_sync_hook(
        function: Callable[[Any], Any],
    ) -> Callable[[Any], Any]:
    """
    Register a function as a sync hook to be executed right before sync.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to execute right before a sync.

    Returns
    -------
    The same function that was passed in (so this may be used as a decorator).

    Examples
    --------
    >>> from meerschaum.plugins import pre_sync_hook
    >>>
    >>> @pre_sync_hook
    ... def log_sync(pipe, **kwargs):
    ...     print(f"About to sync {pipe} with kwargs:\\n{kwargs}.")
    >>>
    """
    with _locks['_pre_sync_hooks']:
        try:
            ### Hooks are grouped by the module in which they were defined.
            module_hooks = _pre_sync_hooks.setdefault(function.__module__, [])
            module_hooks.append(function)
        except Exception as e:
            ### Registration is best-effort: warn rather than raise.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function
Register a function as a sync hook to be executed right before sync.
Parameters
- function (Callable[[Any], Any]): The function to execute right before a sync.
Returns
- Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import pre_sync_hook
>>>
>>> @pre_sync_hook
... def log_sync(pipe, **kwargs):
... print(f"About to sync {pipe} with kwargs:\n{kwargs}.")
>>>
def post_sync_hook(
        function: Callable[[Any], Any],
    ) -> Callable[[Any], Any]:
    """
    Register a function as a sync hook to be executed upon completion of a sync.

    Parameters
    ----------
    function: Callable[[Any], Any]
        The function to execute upon completion of a sync.

    Returns
    -------
    The same function that was passed in (so this may be used as a decorator).

    Examples
    --------
    >>> from meerschaum.plugins import post_sync_hook
    >>>
    >>> @post_sync_hook
    ... def log_sync(pipe, success_tuple, duration=None, **kwargs):
    ...     print(f"It took {round(duration, 2)} seconds to sync {pipe}.")
    >>>
    """
    with _locks['_post_sync_hooks']:
        try:
            ### Hooks are grouped by the module in which they were defined.
            hooks_for_module = _post_sync_hooks.setdefault(function.__module__, [])
            hooks_for_module.append(function)
        except Exception as e:
            ### Registration is best-effort: warn rather than raise.
            from meerschaum.utils.warnings import warn
            warn(e)
    return function
Register a function as a sync hook to be executed upon completion of a sync.
Parameters
- function (Callable[[Any], Any]): The function to execute upon completion of a sync.
Returns
- Another function (this is a decorator function).
Examples
>>> from meerschaum.plugins import post_sync_hook
>>>
>>> @post_sync_hook
... def log_sync(pipe, success_tuple, duration=None, **kwargs):
... print(f"It took {round(duration, 2)} seconds to sync {pipe}.")
>>>