Package meerschaum
What is Meerschaum?
Meerschaum is a tool for quickly synchronizing time-series data streams called pipes. With Meerschaum, you can have a data visualization stack running in minutes.
Why Meerschaum?
If you've worked with time-series data, you know the headaches that come with ETL. Data engineering often gets in analysts' way, and when work needs to get done, every minute spent on pipelining is time taken away from real analysis.
Rather than copy/pasting your ETL scripts, simply build pipes with Meerschaum! Meerschaum gives you the tools to design your data streams how you like, and don't worry: you can always incorporate Meerschaum into your existing systems!
Want to Learn More?
You can find a wealth of information at meerschaum.io!
Additionally, below are several articles published about Meerschaum:
- Interview featured in Console 100 - The Open Source Newsletter
- A Data Scientist's Guide to Fetching COVID-19 Data in 2022 (Towards Data Science)
- Time-Series ETL with Meerschaum (Towards Data Science)
- How I automatically extract my M1 Finance transactions
Features
- 📊 Built for Data Scientists and Analysts
- Integrate with Pandas, Grafana and other popular data analysis tools.
- Persist your dataframes and always get the latest data.
- ⚡️ Production-Ready, Batteries Included
- Synchronization engine concurrently updates many time-series data streams.
- One-click deploy a TimescaleDB and Grafana stack for prototyping.
- Serve data to your entire organization through the power of `uvicorn`, `gunicorn`, and `FastAPI`.
- 🔌 Easily Expandable
- Ingest any data source with a simple plugin. Just return a DataFrame, and Meerschaum handles the rest.
- Add any function as a command to the Meerschaum system.
- Include Meerschaum in your projects with its easy-to-use Python API.
- ✨ Tailored for Your Experience
- Rich CLI makes managing your data streams surprisingly enjoyable!
- Web dashboard for those who prefer a more graphical experience.
- Manage your database connections with Meerschaum connectors.
- Utility commands with sensible syntax let you control many pipes with grace.
- 💼 Portable from the Start
- The environment variable `$MRSM_ROOT_DIR` lets you emulate multiple installations and group together your instances.
- No dependencies required; anything needed will be installed into a virtual environment.
- Specify required packages for your plugins, and users will get those packages in a virtual environment.
Installation
For a more thorough setup guide, visit the Getting Started page at meerschaum.io.
TL;DR
pip install -U --user meerschaum
mrsm stack up -d db grafana
mrsm bootstrap pipes
Usage Documentation
Please visit meerschaum.io for setup, usage, and troubleshooting information. You can find technical documentation at docs.meerschaum.io, and here is a complete list of the Meerschaum actions.
>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe("plugin:noaa", "weather")
>>> df = pipe.get_data(begin='2022-02-02')
>>> df[['timestamp', 'station', 'temperature (wmoUnit:degC)']]
timestamp station temperature (wmoUnit:degC)
0 2022-03-29 09:54:00 KCEU 8.3
1 2022-03-29 09:52:00 KATL 10.6
2 2022-03-29 09:52:00 KCLT 7.2
3 2022-03-29 08:54:00 KCEU 8.3
4 2022-03-29 08:52:00 KATL 11.1
... ... ... ...
1626 2022-02-02 01:52:00 KATL 10.0
1627 2022-02-02 01:52:00 KCLT 7.8
1628 2022-02-02 00:54:00 KCEU 8.3
1629 2022-02-02 00:52:00 KATL 10.0
1630 2022-02-02 00:52:00 KCLT 8.3
[1631 rows x 3 columns]
>>>
Plugins
Here is the list of community plugins and the public plugins repository.
For details on installing, using, and writing plugins, check out the plugins documentation at meerschaum.io.
Example Plugin
# ~/.config/meerschaum/plugins/example.py

__version__ = '0.0.1'
required = []

def register(pipe, **kw):
    return {
        'columns': {
            'datetime': 'dt',
            'id': 'id',
            'value': 'val',
        }
    }

def fetch(pipe, **kw):
    import datetime, random
    return {
        'dt': [datetime.datetime.utcnow()],
        'id': [1],
        'val': [random.randint(0, 100)],
    }
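Because `fetch` returns a plain dictionary of lists, you can sanity-check the plugin's logic with ordinary Python before registering it with Meerschaum. A minimal sketch (the `fetch` function below is copied from the example plugin above):

```python
import datetime
import random

# Copied from the example plugin: fetch() returns a dict of equal-length lists,
# which Meerschaum turns into a DataFrame when the pipe is synced.
def fetch(pipe, **kw):
    return {
        'dt': [datetime.datetime.utcnow()],
        'id': [1],
        'val': [random.randint(0, 100)],
    }

docs = fetch(None)
assert set(docs) == {'dt', 'id', 'val'}          # columns match register()'s mapping
assert all(len(v) == 1 for v in docs.values())   # one row per sync in this example
```

The keys of the returned dictionary line up with the `columns` mapping declared in `register`, which is how Meerschaum knows which column is the datetime axis.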
Support Meerschaum's Development
For consulting services and to support Meerschaum's development, please consider sponsoring me on GitHub Sponsors.
Additionally, you can always buy me a coffee☕!
Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
"""
Copyright 2023 Bennett Meares
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from meerschaum.core.Pipe import Pipe
from meerschaum.plugins import Plugin
from meerschaum.utils import get_pipes
from meerschaum.utils.venv import Venv
from meerschaum.utils.formatting import pprint
from meerschaum._internal.docs import index as __doc__
from meerschaum.connectors import get_connector
from meerschaum.config import __version__, get_config
from meerschaum.utils.packages import attempt_import
__pdoc__ = {'gui': False, 'api': False, 'core': False,}
__all__ = (
    "Pipe",
    "get_pipes",
    "get_connector",
    "Plugin",
    "Venv",
    "pprint",
    "attempt_import",
)
Sub-modules
meerschaum.actions
- Default actions available to the mrsm CLI.

meerschaum.config
- Meerschaum v1.6.13

meerschaum.connectors
- Create connectors with `get_connector()`. For ease of use, you can also import from the root `meerschaum` module: `>>> from …`

meerschaum.plugins
- Expose plugin management APIs from the `meerschaum.plugins` module.

meerschaum.utils
- The utils module contains utility functions. These include tools from primary utilities (`get_pipes`) to miscellaneous helper functions.
Functions
def attempt_import(*names: List[str], lazy: bool = True, warn: bool = True, install: bool = True, venv: Optional[str] = 'mrsm', precheck: bool = True, split: bool = True, check_update: bool = False, check_pypi: bool = False, check_is_installed: bool = True, color: bool = True, debug: bool = False) -> Union['ModuleType', Tuple['ModuleType']]

Raise a warning if packages are not installed; otherwise import and return modules. If `lazy` is `True`, return lazy-imported modules. Returns a tuple of modules if multiple names are provided, else returns one module.

Parameters

names: List[str]
- The packages to be imported.

lazy: bool, default True
- If `True`, lazily load packages.

warn: bool, default True
- If `True`, raise a warning if a package cannot be imported.

install: bool, default True
- If `True`, attempt to install a missing package into the designated virtual environment. If `check_update` is `True`, install updates if available.

venv: Optional[str], default 'mrsm'
- The virtual environment in which to search for packages and to install packages into.

precheck: bool, default True
- If `True`, attempt to find the module before importing (necessary for checking if modules exist and retaining lazy imports); otherwise assume `lazy` is `False`.

split: bool, default True
- If `True`, split packages' names on `'.'`.

check_update: bool, default False
- If `True` and `install` is `True`, install updates if the required minimum version does not match.

check_pypi: bool, default False
- If `True` and `check_update` is `True`, check PyPI when determining whether an update is required.

check_is_installed: bool, default True
- If `True`, check if the package is contained in the virtual environment.

Returns

The specified modules. If they're not available and `install` is `True`, it will first download them into a virtual environment and return the modules.

Examples

>>> pandas, sqlalchemy = attempt_import('pandas', 'sqlalchemy')
>>> pandas = attempt_import('pandas')
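The core pattern (check availability first, warn instead of raising, return one module or a tuple) can be illustrated with the standard library alone. This is a simplified sketch for illustration, not Meerschaum's actual implementation, which additionally handles virtual environments, lazy imports, and installation:

```python
import importlib
import importlib.util
import warnings

def attempt_import_sketch(*names):
    """Return each module if importable, else None (with a warning).
    Mimics attempt_import's shape: one module for one name, a tuple for several."""
    modules = []
    for name in names:
        # find_spec() checks availability without importing the package.
        if importlib.util.find_spec(name) is None:
            warnings.warn(f"Missing package '{name}'; some features will not work correctly.")
            modules.append(None)
        else:
            modules.append(importlib.import_module(name))
    return modules[0] if len(modules) == 1 else tuple(modules)
```

With this sketch, `attempt_import_sketch('json')` returns the module directly, while a request for several names yields a tuple, with `None` standing in for anything that isn't installed.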
Expand source code

def attempt_import(
        *names: List[str],
        lazy: bool = True,
        warn: bool = True,
        install: bool = True,
        venv: Optional[str] = 'mrsm',
        precheck: bool = True,
        split: bool = True,
        check_update: bool = False,
        check_pypi: bool = False,
        check_is_installed: bool = True,
        color: bool = True,
        debug: bool = False
    ) -> Union['ModuleType', Tuple['ModuleType']]:
    """
    Raise a warning if packages are not installed; otherwise import and return modules.
    If `lazy` is `True`, return lazy-imported modules.

    Returns tuple of modules if multiple names are provided, else returns one module.

    Parameters
    ----------
    names: List[str]
        The packages to be imported.

    lazy: bool, default True
        If `True`, lazily load packages.

    warn: bool, default True
        If `True`, raise a warning if a package cannot be imported.

    install: bool, default True
        If `True`, attempt to install a missing package into the designated virtual environment.
        If `check_update` is True, install updates if available.

    venv: Optional[str], default 'mrsm'
        The virtual environment in which to search for packages and to install packages into.

    precheck: bool, default True
        If `True`, attempt to find module before importing (necessary for checking if modules exist
        and retaining lazy imports), otherwise assume lazy is `False`.

    split: bool, default True
        If `True`, split packages' names on `'.'`.

    check_update: bool, default False
        If `True` and `install` is `True`, install updates if the required minimum version
        does not match.

    check_pypi: bool, default False
        If `True` and `check_update` is `True`, check PyPI when determining whether
        an update is required.

    check_is_installed: bool, default True
        If `True`, check if the package is contained in the virtual environment.

    Returns
    -------
    The specified modules. If they're not available and `install` is `True`, it will first
    download them into a virtual environment and return the modules.

    Examples
    --------
    >>> pandas, sqlalchemy = attempt_import('pandas', 'sqlalchemy')
    >>> pandas = attempt_import('pandas')
    """
    import importlib.util

    ### to prevent recursion, check if parent Meerschaum package is being imported
    if names == ('meerschaum',):
        return _import_module('meerschaum')

    if venv == 'mrsm' and _import_hook_venv is not None:
        if debug:
            print(f"Import hook for virtual environment '{_import_hook_venv}' is active.")
        venv = _import_hook_venv

    _warnings = _import_module('meerschaum.utils.warnings')
    warn_function = _warnings.warn

    def do_import(_name: str, **kw) -> Union['ModuleType', None]:
        with Venv(venv=venv, debug=debug):
            ### determine the import method (lazy vs normal)
            from meerschaum.utils.misc import filter_keywords
            import_method = (
                _import_module if not lazy
                else lazy_import
            )
            try:
                mod = import_method(_name, **(filter_keywords(import_method, **kw)))
            except Exception as e:
                if warn:
                    import traceback
                    traceback.print_exception(type(e), e, e.__traceback__)
                    warn_function(
                        f"Failed to import module '{_name}'.\nException:\n{e}",
                        ImportWarning,
                        stacklevel = (5 if lazy else 4),
                        color = False,
                    )
                mod = None
        return mod

    modules = []
    for name in names:
        ### Check if package is a declared dependency.
        root_name = name.split('.')[0] if split else name
        install_name = _import_to_install_name(root_name)

        if install_name is None:
            install_name = root_name
            if warn and root_name != 'plugins':
                warn_function(
                    f"Package '{root_name}' is not declared in meerschaum.utils.packages.",
                    ImportWarning,
                    stacklevel = 3,
                    color = False
                )

        ### Determine if the package exists.
        if precheck is False:
            found_module = (
                do_import(
                    name, debug=debug, warn=False, venv=venv, color=color,
                    check_update=False, check_pypi=False, split=split,
                ) is not None
            )
        else:
            if check_is_installed:
                with _locks['_is_installed_first_check']:
                    if not _is_installed_first_check.get(name, False):
                        package_is_installed = is_installed(
                            name,
                            venv = venv,
                            split = split,
                            debug = debug,
                        )
                        _is_installed_first_check[name] = package_is_installed
                    else:
                        package_is_installed = _is_installed_first_check[name]
            else:
                package_is_installed = _is_installed_first_check.get(
                    name,
                    venv_contains_package(name, venv=venv, split=split, debug=debug)
                )
            found_module = package_is_installed

        if not found_module:
            if install:
                if not pip_install(
                    install_name,
                    venv = venv,
                    split = False,
                    check_update = check_update,
                    color = color,
                    debug = debug
                ) and warn:
                    warn_function(
                        f"Failed to install '{install_name}'.",
                        ImportWarning,
                        stacklevel = 3,
                        color = False,
                    )
            elif warn:
                ### Raise a warning if we can't find the package and install = False.
                warn_function(
                    (f"\n\nMissing package '{name}' from virtual environment '{venv}'; "
                     + "some features will not work correctly."
                     + f"\n\nSet install=True when calling attempt_import.\n"),
                    ImportWarning,
                    stacklevel = 3,
                    color = False,
                )

        ### Do the import. Will be lazy if lazy=True.
        m = do_import(
            name, debug=debug, warn=warn, venv=venv, color=color,
            check_update=check_update, check_pypi=check_pypi, install=install, split=split,
        )
        modules.append(m)

    modules = tuple(modules)
    if len(modules) == 1:
        return modules[0]
    return modules
def get_connector(type: str = None, label: str = None, refresh: bool = False, debug: bool = False, **kw: Any) -> meerschaum.connectors.Connector.Connector

Return an existing connector or create a new connection and store it for reuse.

You can create new connectors if enough parameters are provided for the given type and flavor.

Parameters

type: Optional[str], default None
- Connector type (sql, api, etc.). Defaults to the type of the configured `instance_connector`.

label: Optional[str], default None
- Connector label (e.g. main). Defaults to `'main'`.

refresh: bool, default False
- Refresh the Connector instance / construct a new object. Defaults to `False`.

kw: Any
- Other arguments to pass to the Connector constructor. If the Connector has already been constructed and new arguments are provided, `refresh` is set to `True` and the old Connector is replaced.

Returns

A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`, `meerschaum.connectors.sql.SQLConnector`).

Examples

The following parameters would create a new `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

>>> conn = get_connector(
...     type = 'sql',
...     label = 'newlabel',
...     flavor = 'sqlite',
...     database = '/file/path/to/database.db'
... )
>>>
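The caching behavior described above (connectors keyed by type and label, rebuilt when `refresh` is set) can be sketched with a plain dictionary. This is a hypothetical illustration; the real `get_connector` performs considerably more validation and constructs actual connector objects:

```python
# Hypothetical registry: {(type, label): connector_object}
connectors = {}

def get_connector_sketch(type: str, label: str = 'main', refresh: bool = False, **kw):
    """Return an existing connector for (type, label), or build and cache a new one."""
    key = (type, label)
    if refresh or key not in connectors:
        # In Meerschaum this would construct e.g. a SQLConnector or APIConnector;
        # here we store a dict of attributes just to show the reuse behavior.
        connectors[key] = {'type': type, 'label': label, **kw}
    return connectors[key]
```

Two calls with the same keys return the same cached object; passing `refresh=True` replaces it, mirroring the documented behavior when new constructor arguments are supplied.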
Expand source code

def get_connector(
        type: str = None,
        label: str = None,
        refresh: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Connector:
    """
    Return existing connector or create new connection and store for reuse.

    You can create new connectors if enough parameters are provided for the given type and flavor.

    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct new object. Defaults to `False`.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`).

    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```
    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)
    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _loaded_plugin_connectors = True
    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"
        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        with _locks['types']:
            types.update({
                'api'   : APIConnector,
                'sql'   : SQLConnector,
                'plugin': PluginConnector,
            })

    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    import inspect
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector "
                            + f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n"
                        + f"  - Keyword value: '{value}'\n"
                        + f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else:
            ### connector doesn't yet exist
            refresh = True

    ### only create an object if refresh is True
    ### (can be manually specified, otherwise determined above)
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack = False,
                )
                conn = None
            except Exception as e:
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack = False,
                )
                conn = None
        if conn is None:
            return None

    return connectors[type][label]
def get_pipes(connector_keys: Union[str, List[str], None] = None, metric_keys: Union[str, List[str], None] = None, location_keys: Union[str, List[str], None] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, mrsm_instance: Union[str, InstanceConnector, None] = None, instance: Union[str, InstanceConnector, None] = None, as_list: bool = False, method: str = 'registered', wait: bool = False, debug: bool = False, **kw: Any) -> Union[PipesDict, List['Pipe']]

Return a dictionary or list of `Pipe` objects.

Parameters

connector_keys: Union[str, List[str], None], default None
- String or list of connector keys. If omitted or `'*'`, fetch all possible keys. If a string begins with `'_'`, select keys that do NOT match the string.

metric_keys: Union[str, List[str], None], default None
- String or list of metric keys. See `connector_keys` for formatting.

location_keys: Union[str, List[str], None], default None
- String or list of location keys. See `connector_keys` for formatting.

tags: Optional[List[str]], default None
- If provided, only include pipes with these tags.

params: Optional[Dict[str, Any]], default None
- Dictionary of additional parameters to search by. Params are parsed into a SQL WHERE clause, e.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`.

mrsm_instance: Union[str, InstanceConnector, None], default None
- Connector keys for the Meerschaum instance of the pipes. Must be a `SQLConnector` or `APIConnector`.

as_list: bool, default False
- If `True`, return pipes in a list instead of a hierarchical dictionary.
  `False`: `{connector_keys: {metric_key: {location_key: Pipe}}}`
  `True`: `[Pipe]`

method: str, default 'registered'
- Available options: `['registered', 'explicit', 'all']`.
  If `'registered'` (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depends on `mrsm_instance`).
  If `'explicit'`, create pipes from the provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes.
  If `'all'`, create pipes from predefined metrics and locations. Requires `connector_keys`. NOTE: Method `'all'` is not implemented!

wait: bool, default False
- Wait for a connection before getting Pipes. Should only be true for cases where the database might not be running (like the API).

**kw: Any
- Keyword arguments to pass to the `Pipe` constructor.

Returns

A dictionary of dictionaries and `Pipe` objects in the connector, metric, location hierarchy. If `as_list` is `True`, return a list of `Pipe` objects.

Examples

>>> ### Manual definition:
>>> pipes = {
...     <connector_keys>: {
...         <metric_key>: {
...             <location_key>: Pipe(
...                 <connector_keys>,
...                 <metric_key>,
...                 <location_key>,
...             ),
...         },
...     },
... }
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[sql_main_weather]
>>>
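The `{connector: {metric: {location: Pipe}}}` hierarchy and the `as_list` flattening can be sketched with plain dictionaries. This is a hypothetical illustration using tuples in place of real `Pipe` objects:

```python
# Key tuples as they might come back from the instance connector's pipes table.
keys = [('sql:main', 'weather', None), ('sql:main', 'power', 'NC')]

# Build the connector -> metric -> location hierarchy, as get_pipes does by default.
pipes = {}
for ck, mk, lk in keys:
    pipes.setdefault(ck, {}).setdefault(mk, {})[lk] = (ck, mk, lk)

# as_list=True corresponds to flattening the nested dictionary into a flat list.
flat = [
    pipe
    for metrics in pipes.values()
    for locations in metrics.values()
    for pipe in locations.values()
]
```

Accessing a single entry mirrors the example above: `pipes['sql:main']['weather'][None]` yields the pipe registered without a location key.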
Expand source code

def get_pipes(
        connector_keys: Union[str, List[str], None] = None,
        metric_keys: Union[str, List[str], None] = None,
        location_keys: Union[str, List[str], None] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        mrsm_instance: Union[str, InstanceConnector, None] = None,
        instance: Union[str, InstanceConnector, None] = None,
        as_list: bool = False,
        method: str = 'registered',
        wait: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Union[PipesDict, List['meerschaum.Pipe']]:
    """
    Return a dictionary or list of `meerschaum.Pipe` objects.

    Parameters
    ----------
    connector_keys: Union[str, List[str], None], default None
        String or list of connector keys.
        If omitted or is `'*'`, fetch all possible keys.
        If a string begins with `'_'`, select keys that do NOT match the string.

    metric_keys: Union[str, List[str], None], default None
        String or list of metric keys. See `connector_keys` for formatting.

    location_keys: Union[str, List[str], None], default None
        String or list of location keys. See `connector_keys` for formatting.

    tags: Optional[List[str]], default None
        If provided, only include pipes with these tags.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        Params are parsed into a SQL WHERE clause.
        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`

    mrsm_instance: Union[str, InstanceConnector, None], default None
        Connector keys for the Meerschaum instance of the pipes.
        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
        `meerschaum.connectors.api.APIConnector.APIConnector`.

    as_list: bool, default False
        If `True`, return pipes in a list instead of a hierarchical dictionary.
        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
        `True`  : `[Pipe]`

    method: str, default 'registered'
        Available options: `['registered', 'explicit', 'all']`
        If `'registered'` (default), create pipes based on registered keys in the connector's
        pipes table (API or SQL connector, depends on mrsm_instance).
        If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys
        instead of consulting the pipes table. Useful for creating non-existent pipes.
        If `'all'`, create pipes from predefined metrics and locations. Requires `connector_keys`.
        **NOTE:** Method `'all'` is not implemented!

    wait: bool, default False
        Wait for a connection before getting Pipes. Should only be true for cases where the
        database might not be running (like the API).

    **kw: Any:
        Keyword arguments to pass to the `meerschaum.Pipe` constructor.

    Returns
    -------
    A dictionary of dictionaries and `meerschaum.Pipe` objects
    in the connector, metric, location hierarchy.
    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.

    Examples
    --------
    ```
    >>> ### Manual definition:
    >>> pipes = {
    ...     <connector_keys>: {
    ...         <metric_key>: {
    ...             <location_key>: Pipe(
    ...                 <connector_keys>,
    ...                 <metric_key>,
    ...                 <location_key>,
    ...             ),
    ...         },
    ...     },
    ... }
    >>> ### Accessing a single pipe:
    >>> pipes['sql:main']['weather'][None]
    >>> ### Return a list instead:
    >>> get_pipes(as_list=True)
    [sql_main_weather]
    >>>
    ```
    """
    from meerschaum.config import get_config
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import filter_keywords

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    if params is None:
        params = {}
    if tags is None:
        tags = []

    if isinstance(connector_keys, str):
        connector_keys = [connector_keys]
    if isinstance(metric_keys, str):
        metric_keys = [metric_keys]
    if isinstance(location_keys, str):
        location_keys = [location_keys]

    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
    ### If `wait`, wait until a connection is made
    if mrsm_instance is None:
        mrsm_instance = instance
    if mrsm_instance is None:
        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
    if isinstance(mrsm_instance, str):
        from meerschaum.connectors.parse import parse_instance_keys
        connector = parse_instance_keys(keys=mrsm_instance, wait=wait, debug=debug)
    else:
        ### NOTE: mrsm_instance MUST be a SQL or API connector for this to work
        from meerschaum.connectors import instance_types
        valid_connector = False
        if hasattr(mrsm_instance, 'type'):
            if mrsm_instance.type in instance_types:
                valid_connector = True
        if not valid_connector:
            error(f"Invalid instance connector: {mrsm_instance}")
        connector = mrsm_instance
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Using instance connector: {connector}")
    if not connector:
        error(f"Could not create connector from keys: '{mrsm_instance}'")

    ### Get a list of tuples for the keys needed to build pipes.
    result = fetch_pipes_keys(
        method,
        connector,
        connector_keys = connector_keys,
        metric_keys = metric_keys,
        location_keys = location_keys,
        tags = tags,
        params = params,
        debug = debug
    )
    if result is None:
        error(f"Unable to build pipes!")

    ### Populate the `pipes` dictionary with Pipes based on the keys
    ### obtained from the chosen `method`.
    from meerschaum import Pipe
    pipes = {}
    for ck, mk, lk in result:
        if ck not in pipes:
            pipes[ck] = {}
        if mk not in pipes[ck]:
            pipes[ck][mk] = {}
        pipes[ck][mk][lk] = Pipe(
            ck, mk, lk,
            mrsm_instance=connector,
            debug=debug,
            **filter_keywords(Pipe, **kw)
        )

    if not as_list:
        return pipes
    from meerschaum.utils.misc import flatten_pipes_dict
    return flatten_pipes_dict(pipes)
def pprint(*args, detect_password: bool = True, nopretty: bool = False, **kw)

Pretty print an object according to the configured ANSI and UNICODE settings. If `detect_password` is `True` (default), search and replace passwords with `'*'` characters. Does not mutate objects.

Parameters

*args
detect_password: bool, default True
nopretty: bool, default False
**kw
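The `detect_password` behavior (masking password values without mutating the original object) can be illustrated as follows. This is a hedged sketch; the real helper is `replace_password` in `meerschaum.utils.misc`, whose exact matching rules may differ:

```python
import copy

def replace_password_sketch(d: dict) -> dict:
    """Return a deep copy with any value under a 'password'-like key masked with '*'."""
    masked = copy.deepcopy(d)  # never mutate the caller's object
    for key, value in masked.items():
        if isinstance(value, dict):
            masked[key] = replace_password_sketch(value)
        elif 'password' in str(key).lower():
            masked[key] = '*' * len(str(value))
    return masked

conf = {'username': 'mrsm', 'password': 'hunter2'}
safe = replace_password_sketch(conf)
# safe['password'] == '*******' while conf is left untouched
```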
Expand source code

def pprint(
        *args,
        detect_password: bool = True,
        nopretty: bool = False,
        **kw
    ):
    """Pretty print an object according to the configured ANSI and UNICODE settings.
    If detect_password is True (default), search and replace passwords with '*' characters.
    Does not mutate objects.

    Parameters
    ----------
    *args :
    detect_password : bool :
        (Default value = True)
    nopretty : bool :
        (Default value = False)
    **kw :

    Returns
    -------
    """
    from meerschaum.utils.packages import attempt_import, import_rich
    from meerschaum.utils.formatting import ANSI, UNICODE, get_console
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import replace_password, dict_from_od, filter_keywords
    from collections import OrderedDict
    import copy, json
    modify = True
    rich_pprint = None
    if ANSI and not nopretty:
        rich = import_rich()
        if rich is not None:
            rich_pretty = attempt_import('rich.pretty')
            if rich_pretty is not None:
                def _rich_pprint(*args, **kw):
                    _console = get_console()
                    _kw = filter_keywords(_console.print, **kw)
                    _console.print(*args, **_kw)
                rich_pprint = _rich_pprint
    elif not nopretty:
        pprintpp = attempt_import('pprintpp', warn=False)
        try:
            _pprint = pprintpp.pprint
        except Exception as e:
            import pprint as _pprint_module
            _pprint = _pprint_module.pprint

    func = (
        _pprint if rich_pprint is None else rich_pprint
    ) if not nopretty else print

    try:
        args_copy = copy.deepcopy(args)
    except Exception as e:
        args_copy = args
        modify = False
    _args = []
    for a in args:
        c = a
        ### convert OrderedDict into dict
        if isinstance(a, OrderedDict) or issubclass(type(a), OrderedDict):
            c = dict_from_od(copy.deepcopy(c))
        _args.append(c)
    args = _args

    _args = list(args)
    if detect_password and modify:
        _args = []
        for a in args:
            c = a
            if isinstance(c, dict):
                c = replace_password(copy.deepcopy(c))
            if nopretty:
                try:
                    c = json.dumps(c)
                    is_json = True
                except Exception as e:
                    is_json = False
                if not is_json:
                    try:
                        c = str(c)
                    except Exception as e:
                        pass
            _args.append(c)

    ### filter out unsupported keywords
    func_kw = filter_keywords(func, **kw) if not nopretty else {}
    error_msg = None
    try:
        func(*_args, **func_kw)
    except Exception as e:
        error_msg = e
    if error_msg is not None:
        error(error_msg)
Classes
class Pipe (connector: str = '', metric: str = '', location: Optional[str] = None, parameters: Optional[Dict[str, Any]] = None, columns: Optional[Dict[str, str]] = None, tags: Optional[List[str]] = None, target: Optional[str] = None, dtypes: Optional[Dict[str, str]] = None, instance: Optional[Union[str, InstanceConnector]] = None, temporary: bool = False, mrsm_instance: Optional[Union[str, InstanceConnector]] = None, cache: bool = False, debug: bool = False, connector_keys: Optional[str] = None, metric_key: Optional[str] = None, location_key: Optional[str] = None)

Access Meerschaum pipes via Pipe objects.

Pipes are identified by the following:
- Connector keys (e.g. `'sql:main'`)
- Metric key (e.g. `'weather'`)
- Location (optional; e.g. `None`)

A pipe's connector keys correspond to a data source, and when the pipe is synced, its `fetch` definition is evaluated and executed to produce new data.

Alternatively, new data may be directly synced via `pipe.sync()`:

>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)
Parameters
- `connector`: `str`
  Keys for the pipe's source connector, e.g. `'sql:main'`.
- `metric`: `str`
  Label for the pipe's contents, e.g. `'weather'`.
- `location`: `Optional[str]`, default `None`
  Label for the pipe's location. Defaults to `None`.
- `parameters`: `Optional[Dict[str, Any]]`, default `None`
  Optionally set a pipe's parameters from the constructor, e.g. columns and other attributes. You can edit these parameters with `edit pipes`.
- `columns`: `Optional[Dict[str, str]]`, default `None`
  Set the `columns` dictionary of `parameters`. If `parameters` is also provided, this dictionary is added under the `'columns'` key.
- `tags`: `Optional[List[str]]`, default `None`
  A list of strings to be added under the `'tags'` key of `parameters`. You can select pipes with certain tags using `--tags`.
- `dtypes`: `Optional[Dict[str, str]]`, default `None`
  Set the `dtypes` dictionary of `parameters`. If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.
- `mrsm_instance`: `Optional[Union[str, InstanceConnector]]`, default `None`
  Connector for the Meerschaum instance where the pipe resides. Defaults to the preconfigured default instance (`'sql:main'`).
- `instance`: `Optional[Union[str, InstanceConnector]]`, default `None`
  Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.
- `temporary`: `bool`, default `False`
  If `True`, prevent instance tables (pipes, users, plugins) from being created.
- `cache`: `bool`, default `False`
  If `True`, cache fetched data into a local database file. Defaults to `False`.
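The `columns`, `tags`, `target`, and `dtypes` keyword arguments are all folded into the single `parameters` dictionary. A minimal pure-Python sketch of that merge, mirroring the constructor logic shown below (the helper name `build_parameters` is hypothetical and not part of Meerschaum's API):

```python
from typing import Any, Dict, List, Optional

def build_parameters(
    parameters: Optional[Dict[str, Any]] = None,
    columns: Optional[Dict[str, str]] = None,
    tags: Optional[List[str]] = None,
    target: Optional[str] = None,
    dtypes: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Sketch of how Pipe.__init__ folds keyword arguments into `parameters`."""
    params = dict(parameters) if isinstance(parameters, dict) else {}
    ### Each keyword argument, if provided with the right type, lands under its own key.
    if isinstance(columns, dict):
        params['columns'] = columns
    if isinstance(tags, (list, tuple)):
        params['tags'] = tags
    if isinstance(target, str):
        params['target'] = target
    if isinstance(dtypes, dict):
        params['dtypes'] = dtypes
    return params

params = build_parameters(
    parameters={'fetch': {'definition': 'SELECT * FROM weather'}},
    columns={'datetime': 'dt', 'id': 'station_id'},
    tags=['production'],
)
```

This is why providing both `parameters` and `columns` is safe: the `columns` dictionary is simply added under the `'columns'` key of the merged result.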
Expand source code
````python
class Pipe:
    """
    Access Meerschaum pipes via Pipe objects.

    Pipes are identified by the following:

    1. Connector keys (e.g. `'sql:main'`)
    2. Metric key (e.g. `'weather'`)
    3. Location (optional; e.g. `None`)

    A pipe's connector keys correspond to a data source, and when the pipe is synced,
    its `fetch` definition is evaluated and executed to produce new data.

    Alternatively, new data may be directly synced via `pipe.sync()`:

    ```
    >>> from meerschaum import Pipe
    >>> pipe = Pipe('csv', 'weather')
    >>>
    >>> import pandas as pd
    >>> df = pd.read_csv('weather.csv')
    >>> pipe.sync(df)
    ```
    """

    from ._fetch import fetch
    from ._data import get_data, get_backtrack_data, get_rowcount, _get_data_as_iterator
    from ._register import register
    from ._attributes import (
        attributes,
        parameters,
        columns,
        dtypes,
        get_columns,
        get_columns_types,
        get_indices,
        tags,
        get_id,
        id,
        get_val_column,
        parents,
        children,
        target,
        _target_legacy,
        guess_datetime,
    )
    from ._show import show
    from ._edit import edit, edit_definition, update
    from ._sync import sync, get_sync_time, exists, filter_existing, _get_chunk_label
    from ._delete import delete
    from ._drop import drop
    from ._clear import clear
    from ._bootstrap import bootstrap
    from ._dtypes import enforce_dtypes, infer_dtypes

    def __init__(
        self,
        connector: str = '',
        metric: str = '',
        location: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        columns: Optional[Dict[str, str]] = None,
        tags: Optional[List[str]] = None,
        target: Optional[str] = None,
        dtypes: Optional[Dict[str, str]] = None,
        instance: Optional[Union[str, InstanceConnector]] = None,
        temporary: bool = False,
        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
        cache: bool = False,
        debug: bool = False,
        connector_keys: Optional[str] = None,
        metric_key: Optional[str] = None,
        location_key: Optional[str] = None,
    ):
        """
        Parameters
        ----------
        connector: str
            Keys for the pipe's source connector, e.g. `'sql:main'`.

        metric: str
            Label for the pipe's contents, e.g. `'weather'`.

        location: str, default None
            Label for the pipe's location. Defaults to `None`.

        parameters: Optional[Dict[str, Any]], default None
            Optionally set a pipe's parameters from the constructor,
            e.g. columns and other attributes.
            You can edit these parameters with `edit pipes`.

        columns: Optional[Dict[str, str]], default None
            Set the `columns` dictionary of `parameters`.
            If `parameters` is also provided, this dictionary is added under the `'columns'` key.

        tags: Optional[List[str]], default None
            A list of strings to be added under the `'tags'` key of `parameters`.
            You can select pipes with certain tags using `--tags`.

        dtypes: Optional[Dict[str, str]], default None
            Set the `dtypes` dictionary of `parameters`.
            If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.

        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
            Connector for the Meerschaum instance where the pipe resides.
            Defaults to the preconfigured default instance (`'sql:main'`).

        instance: Optional[Union[str, InstanceConnector]], default None
            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.

        temporary: bool, default False
            If `True`, prevent instance tables (pipes, users, plugins) from being created.

        cache: bool, default False
            If `True`, cache fetched data into a local database file.
            Defaults to `False`.
        """
        from meerschaum.utils.warnings import error, warn
        if (not connector and not connector_keys) or (not metric and not metric_key):
            error(
                "Please provide strings for the connector and metric\n    "
                + "(first two positional arguments)."
            )

        ### Fall back to legacy `location_key` just in case.
        if not location:
            location = location_key
        if not connector:
            connector = connector_keys
        if not metric:
            metric = metric_key
        if location in ('[None]', 'None'):
            location = None

        from meerschaum.config.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
        for k in (connector, metric, location, *(tags or [])):
            if str(k).startswith(negation_prefix):
                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")

        self.connector_keys = str(connector)
        self.connector_key = self.connector_keys  ### Alias
        self.metric_key = metric
        self.location_key = location
        self.temporary = temporary

        self._attributes = {
            'connector_keys': self.connector_keys,
            'metric_key': self.metric_key,
            'location_key': self.location_key,
            'parameters': {},
        }

        ### only set parameters if values are provided
        if isinstance(parameters, dict):
            self._attributes['parameters'] = parameters
        else:
            if parameters is not None:
                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
            self._attributes['parameters'] = {}

        if isinstance(columns, dict):
            self._attributes['parameters']['columns'] = columns
        elif columns is not None:
            warn(f"The provided columns are of invalid type '{type(columns)}'.")

        if isinstance(tags, (list, tuple)):
            self._attributes['parameters']['tags'] = tags
        elif tags is not None:
            warn(f"The provided tags are of invalid type '{type(tags)}'.")

        if isinstance(target, str):
            self._attributes['parameters']['target'] = target
        elif target is not None:
            warn(f"The provided target is of invalid type '{type(target)}'.")

        if isinstance(dtypes, dict):
            self._attributes['parameters']['dtypes'] = dtypes
        elif dtypes is not None:
            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")

        ### NOTE: The parameters dictionary is {} by default.
        ###       A Pipe may be registered without parameters, then edited,
        ###       or a Pipe may be registered with parameters set in-memory first.
        # from meerschaum.config import get_config
        _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
        if _mrsm_instance is None:
            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)

        if not isinstance(_mrsm_instance, str):
            self._instance_connector = _mrsm_instance
            self.instance_keys = str(_mrsm_instance)
        else:  ### NOTE: must be a SQL or API Connector for this to work
            self.instance_keys = _mrsm_instance

        self._cache = cache and get_config('system', 'experimental', 'cache')

    @property
    def meta(self):
        """Simulate the MetaPipe model without importing FastAPI."""
        if '_meta' not in self.__dict__:
            self._meta = {
                'connector_keys' : self.connector_keys,
                'metric_key'     : self.metric_key,
                'location_key'   : self.location_key,
                'instance'       : self.instance_keys,
            }
        return self._meta

    @property
    def instance_connector(self) -> Union[InstanceConnector, None]:
        """
        The connector to where this pipe resides.
        May either be of type `meerschaum.connectors.sql.SQLConnector` or
        `meerschaum.connectors.api.APIConnector`.
        """
        if '_instance_connector' not in self.__dict__:
            from meerschaum.connectors.parse import parse_instance_keys
            conn = parse_instance_keys(self.instance_keys)
            if conn:
                self._instance_connector = conn
            else:
                return None
        return self._instance_connector

    @property
    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
        """
        The connector to the data source.
        """
        if '_connector' not in self.__dict__:
            from meerschaum.connectors.parse import parse_instance_keys
            import warnings
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                try:
                    conn = parse_instance_keys(self.connector_keys)
                except Exception as e:
                    conn = None
            if conn:
                self._connector = conn
            else:
                return None
        return self._connector

    @property
    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
        """
        If the pipe was created with `cache=True`, return the connector to the pipe's
        SQLite database for caching.
        """
        if not self._cache:
            return None
        if '_cache_connector' not in self.__dict__:
            from meerschaum.connectors import get_connector
            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
            _resources_path = SQLITE_RESOURCES_PATH
            self._cache_connector = get_connector(
                'sql', '_cache_' + str(self),
                flavor='sqlite',
                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
            )
        return self._cache_connector

    @property
    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
        """
        If the pipe was created with `cache=True`, return another `meerschaum.Pipe`
        used to manage the local data.
        """
        if self.cache_connector is None:
            return None
        if '_cache_pipe' not in self.__dict__:
            from meerschaum.config._patch import apply_patch_to_config
            from meerschaum.utils.sql import sql_item_name
            _parameters = copy.deepcopy(self.parameters)
            _fetch_patch = {
                'fetch': ({
                    'definition': (
                        f"SELECT * "
                        + f"FROM {sql_item_name(str(self.target), self.instance_connector.flavor)}"
                    ),
                }) if self.instance_connector.type == 'sql' else ({
                    'connector_keys': self.connector_keys,
                    'metric_key': self.metric_key,
                    'location_key': self.location_key,
                })
            }
            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
            self._cache_pipe = Pipe(
                self.instance_keys,
                (self.connector_keys + '_' + self.metric_key + '_cache'),
                self.location_key,
                mrsm_instance = self.cache_connector,
                parameters = _parameters,
                cache = False,
                temporary = True,
            )
        return self._cache_pipe

    @property
    def sync_time(self) -> Union['datetime.datetime', None]:
        """
        Convenience property to get the pipe's latest datetime.
        Use `meerschaum.Pipe.get_sync_time()` instead.
        """
        return self.get_sync_time()

    def __str__(self, ansi: bool=False):
        return pipe_repr(self, ansi=ansi)

    def __eq__(self, other):
        try:
            return (
                isinstance(self, type(other))
                and self.connector_keys == other.connector_keys
                and self.metric_key == other.metric_key
                and self.location_key == other.location_key
                and self.instance_keys == other.instance_keys
            )
        except Exception as e:
            return False

    def __hash__(self):
        ### Using an esoteric separator to avoid collisions.
        sep = "[\"']"
        return hash(
            str(self.connector_keys) + sep
            + str(self.metric_key) + sep
            + str(self.location_key) + sep
            + str(self.instance_keys) + sep
        )

    def __repr__(self, **kw) -> str:
        return pipe_repr(self, **kw)

    def __getstate__(self) -> Dict[str, Any]:
        """
        Define the state dictionary (pickling).
        """
        return {
            'connector_keys': self.connector_keys,
            'metric_key': self.metric_key,
            'location_key': self.location_key,
            'parameters': self.parameters,
            'mrsm_instance': self.instance_keys,
        }

    def __setstate__(self, _state: Dict[str, Any]):
        """
        Read the state (unpickling).
        """
        connector_keys = _state.pop('connector_keys')
        metric_key = _state.pop('metric_key')
        location_key = _state.pop('location_key')
        self.__init__(connector_keys, metric_key, location_key, **_state)
````
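As the `__eq__` and `__hash__` implementations above show, a pipe's identity is derived entirely from its four key strings, so two `Pipe` objects with the same keys compare equal and collapse into one set or dictionary entry. A stdlib-only sketch of the same key-based identity scheme (the class name `PipeKeys` is illustrative, not part of Meerschaum):

```python
class PipeKeys:
    """Sketch of Pipe's key-based identity; attribute names mirror the source above."""
    def __init__(self, connector_keys, metric_key, location_key=None, instance_keys='sql:main'):
        self.connector_keys = connector_keys
        self.metric_key = metric_key
        self.location_key = location_key
        self.instance_keys = instance_keys

    def __eq__(self, other):
        return (
            isinstance(self, type(other))
            and self.connector_keys == other.connector_keys
            and self.metric_key == other.metric_key
            and self.location_key == other.location_key
            and self.instance_keys == other.instance_keys
        )

    def __hash__(self):
        ### An unusual separator avoids collisions between adjacent key strings.
        sep = "[\"']"
        return hash(sep.join(
            str(k) for k in (
                self.connector_keys, self.metric_key, self.location_key, self.instance_keys,
            )
        ))

a = PipeKeys('sql:main', 'weather')
b = PipeKeys('sql:main', 'weather')
```

Defining `__hash__` alongside `__eq__` keeps the objects usable as dictionary keys, which would otherwise be lost because defining `__eq__` alone sets `__hash__` to `None`.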
Instance variables
var attributes : Dict[str, Any]
-
Return a dictionary of a pipe's keys and parameters. These values are reflected directly from the pipes table of the instance.
Expand source code
```python
@property
def attributes(self) -> Dict[str, Any]:
    """
    Return a dictionary of a pipe's keys and parameters.
    These values are reflected directly from the pipes table of the instance.
    """
    import time
    from meerschaum.config import get_config
    from meerschaum.config._patch import apply_patch_to_config
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    timeout_seconds = get_config('pipes', 'attributes', 'local_cache_timeout_seconds')

    if '_attributes' not in self.__dict__:
        self._attributes = {}

    now = time.perf_counter()
    last_refresh = self.__dict__.get('_attributes_sync_time', None)
    timed_out = (
        last_refresh is None
        or (timeout_seconds is not None and (now - last_refresh) >= timeout_seconds)
    )
    if not self.temporary and timed_out:
        self._attributes_sync_time = now
        local_attributes = self.__dict__.get('_attributes', {})
        with Venv(get_connector_plugin(self.instance_connector)):
            instance_attributes = self.instance_connector.get_pipe_attributes(self)
        self._attributes = apply_patch_to_config(instance_attributes, local_attributes)
    return self._attributes
```
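The refresh logic above amounts to a time-to-live cache: attributes are only re-fetched from the instance once `local_cache_timeout_seconds` has elapsed since the last refresh. A stdlib sketch of that check (the function name `is_timed_out` is illustrative):

```python
import time

def is_timed_out(last_refresh, timeout_seconds, now=None):
    """Return True when cached attributes should be re-fetched, as in `attributes` above."""
    now = time.perf_counter() if now is None else now
    return (
        last_refresh is None                      # never refreshed yet
        or (timeout_seconds is not None           # a timeout is configured...
            and (now - last_refresh) >= timeout_seconds)  # ...and it has elapsed
    )
```

Note that a `None` timeout disables expiry entirely: once fetched, the cached attributes are reused for the life of the object.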
var cache_connector : Union[meerschaum.connectors.sql.SQLConnector, None]
-
If the pipe was created with `cache=True`, return the connector to the pipe's SQLite database for caching.
Expand source code
```python
@property
def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
    """
    If the pipe was created with `cache=True`, return the connector to the pipe's
    SQLite database for caching.
    """
    if not self._cache:
        return None
    if '_cache_connector' not in self.__dict__:
        from meerschaum.connectors import get_connector
        from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
        _resources_path = SQLITE_RESOURCES_PATH
        self._cache_connector = get_connector(
            'sql', '_cache_' + str(self),
            flavor='sqlite',
            database=str(_resources_path / ('_cache_' + str(self) + '.db')),
        )
    return self._cache_connector
```
var cache_pipe : Union['Pipe', None]
-
If the pipe was created with `cache=True`, return another `Pipe` used to manage the local data.
Expand source code
```python
@property
def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
    """
    If the pipe was created with `cache=True`, return another `meerschaum.Pipe`
    used to manage the local data.
    """
    if self.cache_connector is None:
        return None
    if '_cache_pipe' not in self.__dict__:
        from meerschaum.config._patch import apply_patch_to_config
        from meerschaum.utils.sql import sql_item_name
        _parameters = copy.deepcopy(self.parameters)
        _fetch_patch = {
            'fetch': ({
                'definition': (
                    f"SELECT * "
                    + f"FROM {sql_item_name(str(self.target), self.instance_connector.flavor)}"
                ),
            }) if self.instance_connector.type == 'sql' else ({
                'connector_keys': self.connector_keys,
                'metric_key': self.metric_key,
                'location_key': self.location_key,
            })
        }
        _parameters = apply_patch_to_config(_parameters, _fetch_patch)
        self._cache_pipe = Pipe(
            self.instance_keys,
            (self.connector_keys + '_' + self.metric_key + '_cache'),
            self.location_key,
            mrsm_instance = self.cache_connector,
            parameters = _parameters,
            cache = False,
            temporary = True,
        )
    return self._cache_pipe
```
var children : List[Pipe]
-
Return a list of `Pipe` objects to be designated as children.
Expand source code
```python
@property
def children(self) -> List[meerschaum.Pipe]:
    """
    Return a list of `meerschaum.Pipe` objects to be designated as children.
    """
    if 'children' not in self.parameters:
        return []
    from meerschaum.utils.warnings import warn
    _children_keys = self.parameters['children']
    if not isinstance(_children_keys, list):
        warn(
            f"Please ensure the children for {self} are defined as a list of keys.",
            stacklevel = 4
        )
        return []
    from meerschaum import Pipe
    _children = []
    for keys in _children_keys:
        try:
            p = Pipe(**keys)
        except Exception as e:
            warn(f"Unable to build child with keys '{keys}' for {self}:\n{e}")
            continue
        _children.append(p)
    return _children
```
var columns : Optional[Dict[str, str]]
-
Return the `columns` dictionary defined in `Pipe.parameters`.
Expand source code
```python
@property
def columns(self) -> Union[Dict[str, str], None]:
    """
    Return the `columns` dictionary defined in `meerschaum.Pipe.parameters`.
    """
    if 'columns' not in self.parameters:
        self.parameters['columns'] = {}
    cols = self.parameters['columns']
    if not isinstance(cols, dict):
        cols = {}
        self.parameters['columns'] = cols
    return cols
```
var connector : Union[Connector, None]
-
The connector to the data source.
Expand source code
```python
@property
def connector(self) -> Union[meerschaum.connectors.Connector, None]:
    """
    The connector to the data source.
    """
    if '_connector' not in self.__dict__:
        from meerschaum.connectors.parse import parse_instance_keys
        import warnings
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            try:
                conn = parse_instance_keys(self.connector_keys)
            except Exception as e:
                conn = None
        if conn:
            self._connector = conn
        else:
            return None
    return self._connector
```
var dtypes : Optional[Dict[str, Any]]
-
If defined, return the `dtypes` dictionary defined in `Pipe.parameters`.
Expand source code
```python
@property
def dtypes(self) -> Union[Dict[str, Any], None]:
    """
    If defined, return the `dtypes` dictionary defined in `meerschaum.Pipe.parameters`.
    """
    from meerschaum.config._patch import apply_patch_to_config
    configured_dtypes = self.parameters.get('dtypes', {})
    remote_dtypes = self.infer_dtypes(persist=False)
    patched_dtypes = apply_patch_to_config(remote_dtypes, configured_dtypes)
    self.parameters['dtypes'] = patched_dtypes
    return self.parameters['dtypes']
```
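In the property above, `apply_patch_to_config` merges two dictionaries with the patch (the user's configured dtypes) taking precedence over the base (the dtypes inferred from remote data). A stdlib sketch of that precedence for flat dtype dictionaries (the real helper also merges nested dictionaries recursively; `patch_dtypes` is an illustrative name):

```python
def patch_dtypes(remote_dtypes, configured_dtypes):
    """Configured dtypes override inferred remote dtypes, as in the `dtypes` property."""
    patched = dict(remote_dtypes)       # start from what was inferred remotely
    patched.update(configured_dtypes)   # explicit user configuration wins
    return patched

patched = patch_dtypes(
    {'dt': 'datetime64[ns]', 'temp': 'float64'},  # inferred from the data
    {'temp': 'float32'},                          # the user's explicit choice
)
```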
var id : Optional[int]
-
Fetch and cache a pipe's ID.
Expand source code
```python
@property
def id(self) -> Union[int, None]:
    """
    Fetch and cache a pipe's ID.
    """
    if not ('_id' in self.__dict__ and self._id):
        self._id = self.get_id()
    return self._id
```
var instance_connector : Union[InstanceConnector, None]
-
The connector to where this pipe resides. May either be of type `meerschaum.connectors.sql.SQLConnector` or `meerschaum.connectors.api.APIConnector`.
Expand source code
```python
@property
def instance_connector(self) -> Union[InstanceConnector, None]:
    """
    The connector to where this pipe resides.
    May either be of type `meerschaum.connectors.sql.SQLConnector` or
    `meerschaum.connectors.api.APIConnector`.
    """
    if '_instance_connector' not in self.__dict__:
        from meerschaum.connectors.parse import parse_instance_keys
        conn = parse_instance_keys(self.instance_keys)
        if conn:
            self._instance_connector = conn
        else:
            return None
    return self._instance_connector
```
var meta
-
Simulate the MetaPipe model without importing FastAPI.
Expand source code
```python
@property
def meta(self):
    """Simulate the MetaPipe model without importing FastAPI."""
    if '_meta' not in self.__dict__:
        self._meta = {
            'connector_keys' : self.connector_keys,
            'metric_key'     : self.metric_key,
            'location_key'   : self.location_key,
            'instance'       : self.instance_keys,
        }
    return self._meta
```
var parameters : Optional[Dict[str, Any]]
-
Return the parameters dictionary of the pipe.
Expand source code
```python
@property
def parameters(self) -> Optional[Dict[str, Any]]:
    """
    Return the parameters dictionary of the pipe.
    """
    if 'parameters' not in self.attributes:
        self.attributes['parameters'] = {}
    return self.attributes['parameters']
```
var parents : List[Pipe]
-
Return a list of `Pipe` objects to be designated as parents.
Expand source code
```python
@property
def parents(self) -> List[meerschaum.Pipe]:
    """
    Return a list of `meerschaum.Pipe` objects to be designated as parents.
    """
    if 'parents' not in self.parameters:
        return []
    from meerschaum.utils.warnings import warn
    _parents_keys = self.parameters['parents']
    if not isinstance(_parents_keys, list):
        warn(
            f"Please ensure the parents for {self} are defined as a list of keys.",
            stacklevel = 4
        )
        return []
    from meerschaum import Pipe
    _parents = []
    for keys in _parents_keys:
        try:
            p = Pipe(**keys)
        except Exception as e:
            warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}")
            continue
        _parents.append(p)
    return _parents
```
var sync_time : Union['datetime.datetime', None]
-
Convenience property to get the pipe's latest datetime. Use `get_sync_time()` instead.
Expand source code
```python
@property
def sync_time(self) -> Union['datetime.datetime', None]:
    """
    Convenience property to get the pipe's latest datetime.
    Use `meerschaum.Pipe.get_sync_time()` instead.
    """
    return self.get_sync_time()
```
var tags : Optional[List[str]]
-
If defined, return the `tags` list defined in `Pipe.parameters`.
Expand source code
```python
@property
def tags(self) -> Union[List[str], None]:
    """
    If defined, return the `tags` list defined in `meerschaum.Pipe.parameters`.
    """
    if 'tags' not in self.parameters:
        self.parameters['tags'] = []
    return self.parameters['tags']
```
var target : str
-
The target table name. You may set the target name under one of the following keys (checked in this order):
- `target`
- `target_name`
- `target_table`
- `target_table_name`
```python
@property
def target(self) -> str:
    """
    The target table name.
    You may set the target name under one of the following keys
    (checked in this order):
      - `target`
      - `target_name`
      - `target_table`
      - `target_table_name`
    """
    if 'target' not in self.parameters:
        target = self._target_legacy()
        potential_keys = ('target_name', 'target_table', 'target_table_name')
        for k in potential_keys:
            if k in self.parameters:
                target = self.parameters[k]
                break

        if self.instance_connector.type == 'sql':
            from meerschaum.utils.sql import truncate_item_name
            truncated_target = truncate_item_name(target, self.instance_connector.flavor)
            if truncated_target != target:
                warn(
                    f"The target '{target}' is too long for '{self.instance_connector.flavor}', "
                    + f"will use {truncated_target} instead."
                )
                target = truncated_target

        self.target = target
    return self.parameters['target']
```
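The precedence among those keys reduces to a first-match lookup over the parameters dictionary, falling back to a legacy default. A stdlib sketch of that lookup (the function name `resolve_target` and the `legacy_default` argument, which stands in for `_target_legacy()`, are illustrative):

```python
def resolve_target(parameters, legacy_default):
    """Return the target table name using the key precedence from the property above."""
    for key in ('target', 'target_name', 'target_table', 'target_table_name'):
        if key in parameters:
            return parameters[key]  # first matching key wins
    return legacy_default           # fall back to the legacy naming scheme

name = resolve_target({'target_table': 'weather_wide'}, legacy_default='plugin_weather')
```

Note the real property additionally truncates the resolved name when the instance connector's SQL flavor imposes an identifier length limit.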
Methods
def bootstrap(self, debug: bool = False, yes: bool = False, force: bool = False, noask: bool = False, shell: bool = False, **kw) ‑> Tuple[bool, str]
-
Prompt the user to create a pipe's requirements all from one method. This method shouldn't be used in any automated scripts because it interactively prompts the user and therefore may hang.
Parameters
- `debug`: `bool`, default `False`
  Verbosity toggle.
- `yes`: `bool`, default `False`
  Print the questions and automatically agree.
- `force`: `bool`, default `False`
  Skip the questions and agree anyway.
- `noask`: `bool`, default `False`
  Print the questions but go with the default answer.
- `shell`: `bool`, default `False`
  Used to determine if we are in the interactive shell.

Returns
A `SuccessTuple` corresponding to the success of this procedure.
Expand source code
```python
def bootstrap(
    self,
    debug: bool = False,
    yes: bool = False,
    force: bool = False,
    noask: bool = False,
    shell: bool = False,
    **kw
) -> SuccessTuple:
    """
    Prompt the user to create a pipe's requirements all from one method.
    This method shouldn't be used in any automated scripts because it interactively
    prompts the user and therefore may hang.

    Parameters
    ----------
    debug: bool, default False:
        Verbosity toggle.

    yes: bool, default False:
        Print the questions and automatically agree.

    force: bool, default False:
        Skip the questions and agree anyway.

    noask: bool, default False:
        Print the questions but go with the default answer.

    shell: bool, default False:
        Used to determine if we are in the interactive shell.

    Returns
    -------
    A `SuccessTuple` corresponding to the success of this procedure.
    """
    from meerschaum.utils.warnings import warn, info, error
    from meerschaum.utils.prompt import prompt, yes_no
    from meerschaum.utils.formatting import pprint
    from meerschaum.config import get_config
    from meerschaum.utils.formatting._shell import clear_screen
    from meerschaum.utils.formatting import print_tuple
    from meerschaum.actions import actions
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    _clear = get_config('shell', 'clear_screen', patch=True)

    if self.get_id(debug=debug) is not None:
        delete_tuple = self.delete(debug=debug)
        if not delete_tuple[0]:
            return delete_tuple

    if _clear:
        clear_screen(debug=debug)

    _parameters = _get_parameters(self, debug=debug)
    self.parameters = _parameters
    pprint(self.parameters)
    try:
        prompt(
            f"\n    Press [Enter] to register {self} with the above configuration:",
            icon = False
        )
    except KeyboardInterrupt as e:
        return False, f"Aborting bootstrapping {self}."

    with Venv(get_connector_plugin(self.instance_connector)):
        register_tuple = self.instance_connector.register_pipe(self, debug=debug)

    if not register_tuple[0]:
        return register_tuple

    if _clear:
        clear_screen(debug=debug)

    try:
        if yes_no(
            f"Would you like to edit the definition for {self}?", yes=yes, noask=noask
        ):
            edit_tuple = self.edit_definition(debug=debug)
            if not edit_tuple[0]:
                return edit_tuple

        if yes_no(f"Would you like to try syncing {self} now?", yes=yes, noask=noask):
            sync_tuple = actions['sync'](
                ['pipes'],
                connector_keys = [self.connector_keys],
                metric_keys = [self.metric_key],
                location_keys = [self.location_key],
                mrsm_instance = str(self.instance_connector),
                debug = debug,
                shell = shell,
            )
            if not sync_tuple[0]:
                return sync_tuple
    except Exception as e:
        return False, f"Failed to bootstrap {self}:\n" + str(e)

    print_tuple((True, f"Finished bootstrapping {self}!"))
    info(
        f"You can edit this pipe later with `edit pipes` "
        + "or set the definition with `edit pipes definition`.\n"
        + "    To sync data into your pipe, run `sync pipes`."
    )
    return True, "Success"
```
def clear(self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Call the Pipe's instance connector's `clear_pipe` method.

Parameters
- `begin`: `Optional[datetime.datetime]`, default `None`
  If provided, only remove rows newer than this datetime value.
- `end`: `Optional[datetime.datetime]`, default `None`
  If provided, only remove rows older than this datetime value (not including `end`).
- `params`: `Optional[Dict[str, Any]]`, default `None`
  See `build_where()`.
- `debug`: `bool`, default `False`
  Verbosity toggle.

Returns
A `SuccessTuple` corresponding to whether this procedure completed successfully.

Examples

```python
>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
>>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
>>>
>>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
>>> pipe.get_data()
           dt
0  2020-01-01
```
Expand source code
```python
def clear(
    self,
    begin: Optional[datetime.datetime] = None,
    end: Optional[datetime.datetime] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `clear_pipe` method.

    Parameters
    ----------
    begin: Optional[datetime.datetime], default None:
        If provided, only remove rows newer than this datetime value.

    end: Optional[datetime.datetime], default None:
        If provided, only remove rows older than this datetime value (not including end).

    params: Optional[Dict[str, Any]], default None
        See `meerschaum.utils.sql.build_where`.

    debug: bool, default False:
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` corresponding to whether this procedure completed successfully.

    Examples
    --------
    >>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
    >>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
    >>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
    >>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
    >>>
    >>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
    >>> pipe.get_data()
               dt
    0  2020-01-01
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if self.cache_pipe is not None:
        success, msg = self.cache_pipe.clear(begin=begin, end=end, debug=debug, **kw)
        if not success:
            warn(msg)

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.clear_pipe(
            self, begin=begin, end=end, params=params, debug=debug, **kw
        )
```
def delete(self, debug: bool = False, **kw) ‑> Tuple[bool, str]
-
Call the Pipe's instance connector's `delete_pipe()` method.

Parameters
- `debug`: `bool`, default `False`
  Verbosity toggle.

Returns
A `SuccessTuple` of success (`bool`), message (`str`).
Expand source code
```python
def delete(
    self,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `delete_pipe()` method.

    Parameters
    ----------
    debug : bool, default False:
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (`bool`), message (`str`).
    """
    import os, pathlib
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if self.temporary:
        return (
            False,
            "Cannot delete pipes created with `temporary=True` (read-only). "
            + "You may want to call `pipe.drop()` instead."
        )

    if self.cache_pipe is not None:
        _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw)
        if not _drop_cache_tuple[0]:
            warn(_drop_cache_tuple[1])
        if getattr(self.cache_connector, 'flavor', None) == 'sqlite':
            _cache_db_path = pathlib.Path(self.cache_connector.database)
            try:
                os.remove(_cache_db_path)
            except Exception as e:
                warn(f"Could not delete cache file '{_cache_db_path}' for {self}:\n{e}")

    with Venv(get_connector_plugin(self.instance_connector)):
        result = self.instance_connector.delete_pipe(self, debug=debug, **kw)

    if not isinstance(result, tuple):
        return False, f"Received unexpected result from '{self.instance_connector}': {result}"

    if result[0]:
        to_delete = ['_id']
        for member in to_delete:
            if member in self.__dict__:
                del self.__dict__[member]
    return result
```
def drop(self, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]
-
Call the Pipe's instance connector's `drop_pipe()` method.

Parameters
- `debug`: `bool`, default `False`
  Verbosity toggle.

Returns
A `SuccessTuple` of success, message.
Expand source code
```python
def drop(
    self,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `drop_pipe()` method.

    Parameters
    ----------
    debug: bool, default False:
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    self._exists = False
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if self.cache_pipe is not None:
        _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw)
        if not _drop_cache_tuple[0]:
            warn(_drop_cache_tuple[1])

    with Venv(get_connector_plugin(self.instance_connector)):
        result = self.instance_connector.drop_pipe(self, debug=debug, **kw)
    return result
```
def edit(self, patch: bool = False, interactive: bool = False, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]
-
Edit a Pipe's configuration.

Parameters
- `patch`: `bool`, default `False`
  If `patch` is `True`, update parameters by cascading rather than overwriting.
- `interactive`: `bool`, default `False`
  If `True`, open an editor for the user to make changes to the pipe's YAML file.
- `debug`: `bool`, default `False`
  Verbosity toggle.

Returns
A `SuccessTuple` of success, message.
Expand source code
```python
def edit(
    self,
    patch: bool = False,
    interactive: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit a Pipe's configuration.

    Parameters
    ----------
    patch: bool, default False
        If `patch` is True, update parameters by cascading rather than overwriting.

    interactive: bool, default False
        If `True`, open an editor for the user to make changes to the pipe's YAML file.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if self.temporary:
        return False, "Cannot edit pipes created with `temporary=True` (read-only)."

    if not interactive:
        with Venv(get_connector_plugin(self.instance_connector)):
            return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)

    from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
    from meerschaum.utils.misc import edit_file
    parameters_filename = str(self) + '.yaml'
    parameters_path = PIPES_CACHE_RESOURCES_PATH / parameters_filename

    from meerschaum.utils.yaml import yaml

    edit_text = f"Edit the parameters for {self}"
    edit_top = '#' * (len(edit_text) + 4)
    edit_header = edit_top + f'\n# {edit_text} #\n' + edit_top + '\n\n'

    from meerschaum.config import get_config
    parameters = dict(get_config('pipes', 'parameters', patch=True))
    from meerschaum.config._patch import apply_patch_to_config
    parameters = apply_patch_to_config(parameters, self.parameters)

    ### write parameters to yaml file
    with open(parameters_path, 'w+') as f:
        f.write(edit_header)
        yaml.dump(parameters, stream=f, sort_keys=False)

    ### only quit editing if yaml is valid
    editing = True
    while editing:
        edit_file(parameters_path)
        try:
            with open(parameters_path, 'r') as f:
                file_parameters = yaml.load(f.read())
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(f"Invalid format defined for '{self}':\n\n{e}")
            input(f"Press [Enter] to correct the configuration for '{self}': ")
        else:
            editing = False

    self.parameters = file_parameters

    if debug:
        from meerschaum.utils.formatting import pprint
        pprint(self.parameters)

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
```
def edit_definition(self, yes: bool = False, noask: bool = False, force: bool = False, debug: bool = False, **kw: Any) -> Tuple[bool, str]

Edit a pipe's definition file and update its configuration.

**NOTE:** This function is interactive and should not be used in automated scripts!

Returns
-------
A `SuccessTuple` of success, message.

Source code:

```python
def edit_definition(
    self,
    yes: bool = False,
    noask: bool = False,
    force: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit a pipe's definition file and update its configuration.
    **NOTE:** This function is interactive and should not be used in automated scripts!

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    if self.temporary:
        return False, "Cannot edit pipes created with `temporary=True` (read-only)."

    from meerschaum.connectors import instance_types
    if (self.connector is None) or self.connector.type not in instance_types:
        return self.edit(interactive=True, debug=debug, **kw)

    import json
    from meerschaum.utils.warnings import info, warn
    from meerschaum.utils.debug import dprint
    from meerschaum.config._patch import apply_patch_to_config
    from meerschaum.utils.misc import edit_file

    _parameters = self.parameters
    if 'fetch' not in _parameters:
        _parameters['fetch'] = {}

    def _edit_api():
        from meerschaum.utils.prompt import prompt, yes_no
        info(
            f"Please enter the keys of the source pipe from '{self.connector}'.\n" +
            "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip."
        )
        _keys = {'connector_keys': None, 'metric_key': None, 'location_key': None}
        for k in _keys:
            _keys[k] = _parameters['fetch'].get(k, None)
        for k, v in _keys.items():
            try:
                _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v)
            except KeyboardInterrupt:
                continue
            if _keys[k] in ('', 'None', '\'None\'', '[None]'):
                _keys[k] = None
        _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys)

        info("You may optionally specify additional filter parameters as JSON.")
        print("  Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.")
        print("  For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':")
        print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': ')))
        if force or yes_no(
            "Would you like to add additional filter parameters?",
            yes=yes, noask=noask
        ):
            from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
            definition_filename = str(self) + '.json'
            definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
            try:
                definition_path.touch()
                with open(definition_path, 'w+') as f:
                    json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2)
            except Exception as e:
                return False, f"Failed writing file '{definition_path}':\n" + str(e)
            _params = None
            while True:
                edit_file(definition_path)
                try:
                    with open(definition_path, 'r') as f:
                        _params = json.load(f)
                except Exception as e:
                    warn(f'Failed to read parameters JSON:\n{e}', stack=False)
                    if force or yes_no(
                        "Would you like to try again?\n  " +
                        "If not, the parameters JSON file will be ignored.",
                        noask=noask, yes=yes
                    ):
                        continue
                    _params = None
                break
            if _params is not None:
                if 'fetch' not in _parameters:
                    _parameters['fetch'] = {}
                _parameters['fetch']['params'] = _params

        self.parameters = _parameters
        return True, "Success"

    def _edit_sql():
        import pathlib, os, textwrap
        from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
        from meerschaum.utils.misc import edit_file
        definition_filename = str(self) + '.sql'
        definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename

        sql_definition = _parameters['fetch'].get('definition', None)
        if sql_definition is None:
            sql_definition = ''
        sql_definition = textwrap.dedent(sql_definition).lstrip()

        try:
            definition_path.touch()
            with open(definition_path, 'w+') as f:
                f.write(sql_definition)
        except Exception as e:
            return False, f"Failed writing file '{definition_path}':\n" + str(e)

        edit_file(definition_path)
        try:
            with open(definition_path, 'r') as f:
                file_definition = f.read()
        except Exception as e:
            return False, f"Failed reading file '{definition_path}':\n" + str(e)

        if sql_definition == file_definition:
            return False, f"No changes made to definition for {self}."

        if ' ' not in file_definition:
            return False, f"Invalid SQL definition for {self}."

        if debug:
            dprint("Read SQL definition:\n\n" + file_definition)

        _parameters['fetch']['definition'] = file_definition
        self.parameters = _parameters
        return True, "Success"

    locals()['_edit_' + str(self.connector.type)]()
    return self.edit(interactive=False, debug=debug, **kw)
```
def enforce_dtypes(self, df: 'pd.DataFrame', debug: bool = False) -> 'pd.DataFrame'

Cast the input dataframe to the pipe's registered data types.
If the pipe does not exist and dtypes are not set, return the dataframe.

Source code:

```python
def enforce_dtypes(self, df: 'pd.DataFrame', debug: bool = False) -> 'pd.DataFrame':
    """
    Cast the input dataframe to the pipe's registered data types.
    If the pipe does not exist and dtypes are not set, return the dataframe.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import parse_df_datetimes, enforce_dtypes as _enforce_dtypes
    from meerschaum.utils.packages import import_pandas
    pd = import_pandas(debug=debug)
    if df is None:
        if debug:
            dprint(
                "Received None instead of a DataFrame.\n"
                + "    Skipping dtype enforcement..."
            )
        return df

    pipe_dtypes = self.dtypes

    try:
        if isinstance(df, str):
            df = parse_df_datetimes(
                pd.read_json(df),
                ignore_cols = [
                    col
                    for col, dtype in pipe_dtypes.items()
                    if 'datetime' not in str(dtype)
                ],
                debug = debug,
            )
        else:
            df = parse_df_datetimes(
                df,
                ignore_cols = [
                    col
                    for col, dtype in pipe_dtypes.items()
                    if 'datetime' not in str(dtype)
                ],
                debug = debug,
            )
    except Exception as e:
        warn(f"Unable to cast incoming data as a DataFrame...:\n{e}")
        return df

    if not pipe_dtypes:
        if debug:
            dprint(
                f"Could not find dtypes for {self}.\n"
                + "    Skipping dtype enforcement..."
            )
        return df

    return _enforce_dtypes(df, pipe_dtypes, debug=debug)
```
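The `ignore_cols` logic above determines which columns are excluded from datetime detection: any column whose registered dtype is not datetime-like is skipped. A standalone sketch (the example dtypes are hypothetical, not from a real pipe):

```python
# Columns with non-datetime registered dtypes are excluded from
# datetime parsing, mirroring the list comprehension in enforce_dtypes().
pipe_dtypes = {'dt': 'datetime64[ns]', 'id': 'int64', 'val': 'float64'}

ignore_cols = [
    col for col, dtype in pipe_dtypes.items()
    if 'datetime' not in str(dtype)
]
print(ignore_cols)  # ['id', 'val']
```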
def exists(self, debug: bool = False) -> bool

See if a Pipe's table exists.

Parameters
----------
debug: bool, default False
    Verbosity toggle.

Returns
-------
A `bool` corresponding to whether a pipe's underlying table exists.

Source code:

```python
def exists(
    self,
    debug: bool = False
) -> bool:
    """
    See if a Pipe's table exists.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `bool` corresponding to whether a pipe's underlying table exists.
    """
    import time
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.config import STATIC_CONFIG
    from meerschaum.utils.debug import dprint
    now = time.perf_counter()
    exists_timeout_seconds = STATIC_CONFIG['pipes']['exists_timeout_seconds']

    _exists = self.__dict__.get('_exists', None)
    if _exists:
        exists_timestamp = self.__dict__.get('_exists_timestamp', None)
        if exists_timestamp is not None:
            delta = now - exists_timestamp
            if delta < exists_timeout_seconds:
                if debug:
                    dprint(f"Returning cached `exists` for {self} ({round(delta, 2)} seconds old).")
                return _exists

    with Venv(get_connector_plugin(self.instance_connector)):
        _exists = self.instance_connector.pipe_exists(pipe=self, debug=debug)

    self.__dict__['_exists'] = _exists
    self.__dict__['_exists_timestamp'] = now
    return _exists
```
def fetch(self, begin: Union[datetime.datetime, str, None] = '', end: Optional[datetime.datetime] = None, sync_chunks: bool = False, deactivate_plugin_venv: bool = True, debug: bool = False, **kw: Any) -> Union['pd.DataFrame', None]

Fetch a Pipe's latest data from its connector.

Parameters
----------
begin: Union[datetime.datetime, str, None], default ''
    If provided, only fetch data newer than or equal to `begin`.
end: Optional[datetime.datetime], default None
    If provided, only fetch data older than or equal to `end`.
sync_chunks: bool, default False
    If `True` and the pipe's connector is of type `'sql'`,
    begin syncing chunks while fetching loads chunks into memory.
debug: bool, default False
    Verbosity toggle.

Returns
-------
A `pd.DataFrame` of the newest unseen data.

Source code:

```python
def fetch(
    self,
    begin: Union[datetime.datetime, str, None] = '',
    end: Optional[datetime.datetime] = None,
    sync_chunks: bool = False,
    deactivate_plugin_venv: bool = True,
    debug: bool = False,
    **kw: Any
) -> Union['pd.DataFrame', None]:
    """
    Fetch a Pipe's latest data from its connector.

    Parameters
    ----------
    begin: Union[datetime.datetime, str, None], default ''
        If provided, only fetch data newer than or equal to `begin`.
    end: Optional[datetime.datetime], default None
        If provided, only fetch data older than or equal to `end`.
    sync_chunks: bool, default False
        If `True` and the pipe's connector is of type `'sql'`,
        begin syncing chunks while fetching loads chunks into memory.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` of the newest unseen data.
    """
    if 'fetch' not in dir(self.connector):
        from meerschaum.utils.warnings import warn
        warn(f"No `fetch()` function defined for connector '{self.connector}'")
        return None

    from meerschaum.connectors import custom_types
    from meerschaum.utils.debug import dprint, _checkpoint
    if (
        self.connector.type == 'plugin'
        or self.connector.type in custom_types
    ):
        from meerschaum.plugins import Plugin
        from meerschaum.utils.packages import activate_venv, deactivate_venv
        plugin_name = (
            self.connector.label
            if self.connector.type == 'plugin'
            else self.connector.__module__.replace('plugins.', '').split('.')[0]
        )
        connector_plugin = Plugin(plugin_name)
        connector_plugin.activate_venv(debug=debug)

    _chunk_hook = kw.pop('chunk_hook', None)
    if sync_chunks and _chunk_hook is None:

        def _chunk_hook(chunk, **_kw) -> SuccessTuple:
            """
            Wrap `Pipe.sync()` with a custom chunk label prepended to the message.
            """
            from meerschaum.config._patch import apply_patch_to_config
            kwargs = apply_patch_to_config(kw, _kw)
            chunk_success, chunk_message = self.sync(chunk, **kwargs)
            chunk_label = self._get_chunk_label(chunk, self.columns.get('datetime', None))
            if chunk_label:
                chunk_message = '\n' + chunk_label + '\n' + chunk_message
            return chunk_success, chunk_message

    workers = kw.get('workers', None)
    if workers is None and not getattr(self.instance_connector, 'IS_THREAD_SAFE', False):
        workers = 1
    kw['workers'] = workers

    df = self.connector.fetch(
        self,
        begin = begin,
        end = end,
        chunk_hook = _chunk_hook,
        debug = debug,
        **kw
    )
    if (
        self.connector.type == 'plugin'
        or self.connector.type in custom_types
    ):
        connector_plugin.deactivate_venv(debug=debug)

    return df
```
def filter_existing(self, df: 'pd.DataFrame', chunksize: Optional[int] = -1, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw) -> Tuple['pd.DataFrame', 'pd.DataFrame', 'pd.DataFrame']

Inspect a dataframe and filter out rows which already exist in the pipe.

Parameters
----------
df: 'pd.DataFrame'
    The dataframe to inspect and filter.
chunksize: Optional[int], default -1
    The `chunksize` used when fetching existing data.
params: Optional[Dict[str, Any]], default None
    If provided, use this filter when searching for existing data.
debug: bool, default False
    Verbosity toggle.

Returns
-------
A tuple of three pandas DataFrames: unseen, update, and delta.

Source code:

```python
def filter_existing(
    self,
    df: 'pd.DataFrame',
    chunksize: Optional[int] = -1,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kw
) -> Tuple['pd.DataFrame', 'pd.DataFrame', 'pd.DataFrame']:
    """
    Inspect a dataframe and filter out rows which already exist in the pipe.

    Parameters
    ----------
    df: 'pd.DataFrame'
        The dataframe to inspect and filter.
    chunksize: Optional[int], default -1
        The `chunksize` used when fetching existing data.
    params: Optional[Dict[str, Any]], default None
        If provided, use this filter when searching for existing data.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of three pandas DataFrames: unseen, update, and delta.
    """
    import datetime
    import json  ### Needed for casting unhashable columns below.
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.misc import (
        round_time,
        filter_unseen_df,
        add_missing_cols_to_df,
        to_pandas_dtype,
        get_unhashable_cols,
    )
    pd = import_pandas()
    if not isinstance(df, pd.DataFrame):
        df = self.enforce_dtypes(df, debug=debug)
    if df.empty:
        return df, df, df

    ### `begin` is the oldest data in the new dataframe.
    begin, end = None, None
    dt_col = self.columns.get('datetime', None)
    dt_type = self.dtypes.get(dt_col, 'datetime64[ns]') if dt_col else None
    try:
        min_dt_val = df[dt_col].min(skipna=True) if dt_col else None
        min_dt = (
            pd.to_datetime(min_dt_val).to_pydatetime()
            if min_dt_val is not None and 'datetime' in str(dt_type)
            else min_dt_val
        )
    except Exception as e:
        min_dt = None
    if not ('datetime' in str(type(min_dt))) or str(min_dt) == 'NaT':
        if 'int' not in str(type(min_dt)).lower():
            min_dt = None

    if isinstance(min_dt, datetime.datetime):
        begin = (
            round_time(
                min_dt,
                to = 'down'
            ) - datetime.timedelta(minutes=1)
        )
    elif dt_type and 'int' in dt_type.lower():
        begin = min_dt
    elif dt_col is None:
        begin = None

    ### `end` is the newest data in the new dataframe.
    try:
        max_dt_val = df[dt_col].max(skipna=True) if dt_col else None
        max_dt = (
            pd.to_datetime(max_dt_val).to_pydatetime()
            if max_dt_val is not None and 'datetime' in str(dt_type)
            else max_dt_val
        )
    except Exception as e:
        import traceback
        traceback.print_exc()
        max_dt = None

    if not ('datetime' in str(type(max_dt))) or str(min_dt) == 'NaT':
        if 'int' not in str(type(max_dt)).lower():
            max_dt = None

    if isinstance(max_dt, datetime.datetime):
        end = (
            round_time(
                max_dt,
                to = 'down'
            ) + datetime.timedelta(minutes=1)
        )
    elif dt_type and 'int' in dt_type.lower():
        end = max_dt + 1

    if max_dt is not None and min_dt is not None and min_dt > max_dt:
        warn("Detected minimum datetime greater than maximum datetime.")

    if begin is not None and end is not None and begin > end:
        if isinstance(begin, datetime.datetime):
            begin = end - datetime.timedelta(minutes=1)
        ### We might be using integers for our datetime axis.
        else:
            begin = end - 1

    if debug:
        dprint(f"Looking at data between '{begin}' and '{end}'.", **kw)

    backtrack_df = self.get_data(
        begin = begin,
        end = end,
        chunksize = chunksize,
        params = params,
        debug = debug,
        **kw
    )
    if debug:
        dprint("Existing data:\n" + str(backtrack_df), **kw)
        dprint("Existing dtypes:\n" + str(backtrack_df.dtypes))

    ### Separate new rows from changed ones.
    on_cols = [
        col for col_key, col in self.columns.items()
        if (
            col
            and col_key != 'value'
            and col in backtrack_df.columns
        )
    ]
    on_cols_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in self.dtypes.items()
        if col in on_cols
    }

    ### Detect changes between the old target and new source dataframes.
    delta_df = add_missing_cols_to_df(
        filter_unseen_df(
            backtrack_df,
            df,
            dtypes = {
                col: to_pandas_dtype(typ)
                for col, typ in self.dtypes.items()
            },
            debug = debug
        ),
        on_cols_dtypes,
    )

    ### Cast dicts or lists to strings so we can merge.
    unhashable_delta_cols = get_unhashable_cols(delta_df)
    unhashable_backtrack_cols = get_unhashable_cols(backtrack_df)
    for col in unhashable_delta_cols:
        delta_df[col] = delta_df[col].apply(json.dumps)
    for col in unhashable_backtrack_cols:
        backtrack_df[col] = backtrack_df[col].apply(json.dumps)
    casted_cols = set(unhashable_delta_cols + unhashable_backtrack_cols)

    joined_df = pd.merge(
        delta_df,
        backtrack_df,
        how = 'left',
        on = on_cols,
        indicator = True,
        suffixes = ('', '_old'),
    ) if on_cols else delta_df
    for col in casted_cols:
        if col in joined_df.columns:
            joined_df[col] = joined_df[col].apply(
                lambda x: (json.loads(x) if isinstance(x, str) else x)
            )

    ### Determine which rows are completely new.
    new_rows_mask = (joined_df['_merge'] == 'left_only') if on_cols else None
    cols = list(backtrack_df.columns)

    unseen_df = (
        joined_df
        .where(new_rows_mask)
        .dropna(how='all')[cols]
        .reset_index(drop=True)
    ) if on_cols else delta_df

    ### Rows that have already been inserted but values have changed.
    update_df = (
        joined_df
        .where(~new_rows_mask)
        .dropna(how='all')[cols]
        .reset_index(drop=True)
    ) if on_cols else None

    return unseen_df, update_df, delta_df
```
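The three-way split above (`delta` = rows that differ from what is stored, `unseen` = delta rows under a new key, `update` = delta rows under an existing key) can be illustrated with plain dictionaries. The real implementation uses a pandas left merge with `indicator=True`; this pure-Python sketch only mirrors the semantics:

```python
def split_rows(existing, incoming, key):
    """Split incoming rows into (unseen, update, delta) by a key column."""
    existing_by_key = {row[key]: row for row in existing}
    # delta: rows that differ from the stored row with the same key (or have no stored row)
    delta = [row for row in incoming if existing_by_key.get(row[key]) != row]
    unseen = [row for row in delta if row[key] not in existing_by_key]
    update = [row for row in delta if row[key] in existing_by_key]
    return unseen, update, delta

existing = [{'id': 1, 'val': 10}, {'id': 2, 'val': 20}]
incoming = [{'id': 2, 'val': 21}, {'id': 3, 'val': 30}]
unseen, update, delta = split_rows(existing, incoming, 'id')
print(unseen)  # [{'id': 3, 'val': 30}]
print(update)  # [{'id': 2, 'val': 21}]
```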
def get_backtrack_data(self, backtrack_minutes: int = 0, begin: Optional['datetime.datetime'] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> Optional['pd.DataFrame']

Get the most recent data from the instance connector as a Pandas DataFrame.
Backtrack data is a convenient way to get a pipe's data "backtracked" from the most recent datetime.

Parameters
----------
backtrack_minutes: int, default 0
    How many minutes from `begin` to select from.
    This may return a few extra rows due to a rounding quirk.
begin: Optional[datetime.datetime], default None
    The starting point to search for data.
    If begin is `None` (default), use the most recent observed datetime (AKA sync_time).

    ```
    E.g. begin = 02:00
    Search this region.           Ignore this, even if there's data.
     /  /  /  /  /  /  /  /  / |
    -----|----------|----------|----------|----------|----------|
       00:00      01:00      02:00      03:00      04:00      05:00
    ```

fresh: bool, default False
    If `True`, ignore local cache and pull directly from the instance connector.
    Only comes into effect if a pipe was created with `cache=True`.
debug: bool, default False
    Verbosity toggle.

Returns
-------
A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.

Source code:

```python
def get_backtrack_data(
    self,
    backtrack_minutes: int = 0,
    begin: Optional['datetime.datetime'] = None,
    fresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Optional['pd.DataFrame']:
    """
    Get the most recent data from the instance connector as a Pandas DataFrame.
    Backtrack data is a convenient way to get a pipe's data "backtracked"
    from the most recent datetime.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    kw.update({'backtrack_minutes': backtrack_minutes, 'begin': begin,})

    if not self.exists(debug=debug):
        return None

    if self.cache_pipe is not None:
        if not fresh:
            _sync_cache_tuple = self.cache_pipe.sync(debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
                fresh = True
            else:
                ### Successfully synced cache.
                return self.enforce_dtypes(
                    self.cache_pipe.get_backtrack_data(debug=debug, fresh=True, **kw),
                    debug = debug,
                )

    ### If `fresh` or the syncing failed, directly pull from the instance connector.
    with Venv(get_connector_plugin(self.instance_connector)):
        return self.enforce_dtypes(
            self.instance_connector.get_backtrack_data(
                pipe = self,
                debug = debug,
                **kw
            ),
            debug = debug,
        )
```
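The backtrack window starts `backtrack_minutes` before `begin` (which defaults to the pipe's most recent sync time). A minimal sketch of that arithmetic, using illustrative values:

```python
import datetime

def backtrack_begin(begin, backtrack_minutes):
    """Compute the start of the backtrack window: `backtrack_minutes` before `begin`."""
    return begin - datetime.timedelta(minutes=backtrack_minutes)

sync_time = datetime.datetime(2023, 1, 1, 2, 0)
window_start = backtrack_begin(sync_time, 60)
print(window_start)  # 2023-01-01 01:00:00
```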
def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]

Check if the requested columns are defined.

Parameters
----------
*args: str
    The column names to be retrieved.
error: bool, default False
    If `True`, raise an `Exception` if the specified column is not defined.

Returns
-------
A tuple of the same size as `args`, or a `str` if `args` is a single argument.

Examples
--------
>>> pipe = mrsm.Pipe('test', 'test')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> pipe.get_columns('datetime', 'id')
('dt', 'id')
>>> pipe.get_columns('value', error=True)
Exception:  🛑 Missing 'value' column for Pipe('test', 'test').

Source code:

```python
def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
    """
    Check if the requested columns are defined.

    Parameters
    ----------
    *args: str
        The column names to be retrieved.
    error: bool, default False
        If `True`, raise an `Exception` if the specified column is not defined.

    Returns
    -------
    A tuple of the same size as `args`, or a `str` if `args` is a single argument.
    """
    from meerschaum.utils.warnings import error as _error, warn
    if not args:
        args = tuple(self.columns.keys())
    col_names = []
    for col in args:
        col_name = None
        try:
            col_name = self.columns[col]
            if col_name is None and error:
                _error(f"Please define the name of the '{col}' column for {self}.")
        except Exception as e:
            col_name = None
        if col_name is None and error:
            _error(f"Missing '{col}' column for {self}.")
        col_names.append(col_name)
    if len(col_names) == 1:
        return col_names[0]
    return tuple(col_names)
```
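The single-argument vs. multi-argument return behavior can be reproduced without a pipe instance (the standalone helper here is a hypothetical mimic, not the Meerschaum API):

```python
def lookup_columns(columns, *args):
    """Mimic get_columns(): a str for one argument, a tuple for several."""
    names = tuple(columns.get(col) for col in (args or columns.keys()))
    return names[0] if len(names) == 1 else names

columns = {'datetime': 'dt', 'id': 'id'}
print(lookup_columns(columns, 'datetime'))        # 'dt'
print(lookup_columns(columns, 'datetime', 'id'))  # ('dt', 'id')
```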
def get_columns_types(self, debug: bool = False) -> Optional[Dict[str, str]]

Get a dictionary of a pipe's column names and their types.

Parameters
----------
debug: bool, default False
    Verbosity toggle.

Returns
-------
A dictionary of column names (`str`) to column types (`str`).

Examples
--------
>>> pipe.get_columns_types()
{
    'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    'id': 'BIGINT',
    'val': 'DOUBLE PRECISION',
}
>>>

Source code:

```python
def get_columns_types(self, debug: bool = False) -> Union[Dict[str, str], None]:
    """
    Get a dictionary of a pipe's column names and their types.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of column names (`str`) to column types (`str`).
    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.get_pipe_columns_types(self, debug=debug)
```
def get_data(self, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, as_iterator: bool = False, as_chunks: bool = False, chunk_interval: Union[datetime.datetime, int, None] = None, fresh: bool = False, debug: bool = False, **kw: Any) -> Union['pd.DataFrame', Generator['pd.DataFrame'], None]

Get a pipe's data from the instance connector.

Parameters
----------
begin: Optional[datetime.datetime], default None
    Lower bound datetime to begin searching for data (inclusive).
    Translates to a `WHERE` clause like `WHERE datetime >= begin`.
end: Optional[datetime.datetime], default None
    Upper bound datetime to stop searching for data (exclusive).
    Translates to a `WHERE` clause like `WHERE datetime < end`.
params: Optional[Dict[str, Any]], default None
    Filter the retrieved data by a dictionary of parameters.
    See `meerschaum.utils.sql.build_where` for more details.
as_iterator: bool, default False
    If `True`, return a generator of chunks of pipe data.
as_chunks: bool, default False
    Alias for `as_iterator`.
chunk_interval: int, default None
    If `as_iterator`, return chunks with `begin` and `end` separated by this interval.
    By default, use a timedelta of 1 day.
    If the `datetime` axis is an integer, default to the configured chunksize.
    Note that because `end` is always non-inclusive, there will be
    `chunk_interval - 1` rows per chunk for integers.
fresh: bool, default False
    If `True`, skip local cache and directly query the instance connector.
debug: bool, default False
    Verbosity toggle.

Returns
-------
A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.

Source code:

```python
def get_data(
    self,
    begin: Optional[datetime.datetime] = None,
    end: Optional[datetime.datetime] = None,
    params: Optional[Dict[str, Any]] = None,
    as_iterator: bool = False,
    as_chunks: bool = False,
    chunk_interval: Union[datetime.datetime, int, None] = None,
    fresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union['pd.DataFrame', Generator['pd.DataFrame'], None]:
    """
    Get a pipe's data from the instance connector.

    Parameters
    ----------
    begin: Optional[datetime.datetime], default None
        Lower bound datetime to begin searching for data (inclusive).
        Translates to a `WHERE` clause like `WHERE datetime >= begin`.
    end: Optional[datetime.datetime], default None
        Upper bound datetime to stop searching for data (exclusive).
        Translates to a `WHERE` clause like `WHERE datetime < end`.
    params: Optional[Dict[str, Any]], default None
        Filter the retrieved data by a dictionary of parameters.
        See `meerschaum.utils.sql.build_where` for more details.
    as_iterator: bool, default False
        If `True`, return a generator of chunks of pipe data.
    as_chunks: bool, default False
        Alias for `as_iterator`.
    chunk_interval: int, default None
        If `as_iterator`, return chunks with `begin` and `end`
        separated by this interval. By default, use a timedelta of 1 day.
        If the `datetime` axis is an integer, default to the configured chunksize.
        Note that because `end` is always non-inclusive, there will be
        `chunk_interval - 1` rows per chunk for integers.
    fresh: bool, default False
        If `True`, skip local cache and directly query the instance connector.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.misc import iterate_chunks
    from meerschaum.config import get_config

    kw.update({'begin': begin, 'end': end, 'params': params,})

    as_iterator = as_iterator or as_chunks
    if as_iterator or as_chunks:
        return self._get_data_as_iterator(
            begin = begin,
            end = end,
            params = params,
            chunk_interval = chunk_interval,
            fresh = fresh,
            debug = debug,
        )

    if not self.exists(debug=debug):
        return None

    if self.cache_pipe is not None:
        if not fresh:
            _sync_cache_tuple = self.cache_pipe.sync(debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
                fresh = True
            else:
                ### Successfully synced cache.
                return self.enforce_dtypes(
                    self.cache_pipe.get_data(debug=debug, fresh=True, **kw),
                    debug = debug,
                )

    ### If `fresh` or the syncing failed, directly pull from the instance connector.
    with Venv(get_connector_plugin(self.instance_connector)):
        return self.enforce_dtypes(
            self.instance_connector.get_pipe_data(
                pipe = self,
                debug = debug,
                **kw
            ),
            debug = debug,
        )
```
def get_id(self, **kw: Any) -> Optional[int]

Fetch a pipe's ID from its instance connector.
If the pipe does not exist, return `None`.

Source code:

```python
def get_id(self, **kw: Any) -> Union[int, None]:
    """
    Fetch a pipe's ID from its instance connector.
    If the pipe does not exist, return `None`.
    """
    if self.temporary:
        return None
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.get_pipe_id(self, **kw)
```
def get_indices(self) -> Dict[str, str]

Return a dictionary in the form of `pipe.columns` but map to index names.

Source code:

```python
def get_indices(self) -> Dict[str, str]:
    """
    Return a dictionary in the form of `pipe.columns` but map to index names.
    """
    return {
        ix: (self.target + '_' + col + '_index')
        for ix, col in self.columns.items()
        if col
    }
```
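The naming scheme boils down to `{target}_{column}_index`, skipping unset columns. Reproduced here with a hypothetical target table and column map:

```python
target = 'plugin_stats'                             # hypothetical target table
columns = {'datetime': 'dt', 'id': 'id', 'value': None}

# Unset (None) columns are skipped, just as in get_indices().
indices = {
    ix: f"{target}_{col}_index"
    for ix, col in columns.items()
    if col
}
print(indices)  # {'datetime': 'plugin_stats_dt_index', 'id': 'plugin_stats_id_index'}
```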
def get_rowcount(self, begin: Optional['datetime.datetime'] = None, end: Optional['datetime.datetime'] = None, remote: bool = False, params: Optional[Dict[str, Any]] = None, debug: bool = False) -> Optional[int]

Get a Pipe's instance or remote rowcount.

Parameters
----------
begin: Optional[datetime.datetime], default None
    Count rows where datetime > begin.
end: Optional[datetime.datetime], default None
    Count rows where datetime < end.
remote: bool, default False
    Count rows from a pipe's remote source.
    **NOTE:** This is experimental!
params: Optional[Dict[str, Any]], default None
    If provided, use this filter when counting rows.
debug: bool, default False
    Verbosity toggle.

Returns
-------
An `int` of the number of rows in the pipe corresponding to the provided parameters.
`None` is returned if the pipe does not exist.

Source code:

```python
def get_rowcount(
    self,
    begin: Optional['datetime.datetime'] = None,
    end: Optional['datetime.datetime'] = None,
    remote: bool = False,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> Union[int, None]:
    """
    Get a Pipe's instance or remote rowcount.

    Parameters
    ----------
    begin: Optional[datetime.datetime], default None
        Count rows where datetime > begin.
    end: Optional[datetime.datetime], default None
        Count rows where datetime < end.
    remote: bool, default False
        Count rows from a pipe's remote source.
        **NOTE:** This is experimental!
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` of the number of rows in the pipe corresponding to the provided parameters.
    `None` is returned if the pipe does not exist.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    connector = self.instance_connector if not remote else self.connector
    try:
        with Venv(get_connector_plugin(connector)):
            return connector.get_pipe_rowcount(
                self, begin=begin, end=end, remote=remote, params=params, debug=debug
            )
    except AttributeError as e:
        warn(e)
        if remote:
            return None
    warn(f"Failed to get a rowcount for {self}.")
    return None
```
def get_sync_time(self, params: Optional[Dict[str, Any]] = None, newest: bool = True, round_down: bool = True, debug: bool = False) -> Optional[datetime.datetime]

Get the most recent datetime value for a Pipe.

Parameters
----------
params: Optional[Dict[str, Any]], default None
    Dictionary to build a WHERE clause for a specific column.
    See `meerschaum.utils.sql.build_where`.
newest: bool, default True
    If `True`, get the most recent datetime (honoring `params`).
    If `False`, get the oldest datetime (`ASC` instead of `DESC`).
round_down: bool, default True
    If `True`, round down the sync time to the nearest minute.
debug: bool, default False
    Verbosity toggle.

Returns
-------
A `datetime.datetime` object if the pipe exists, otherwise `None`.

Source code:

```python
def get_sync_time(
    self,
    params: Optional[Dict[str, Any]] = None,
    newest: bool = True,
    round_down: bool = True,
    debug: bool = False
) -> Union['datetime.datetime', None]:
    """
    Get the most recent datetime value for a Pipe.

    Parameters
    ----------
    params: Optional[Dict[str, Any]], default None
        Dictionary to build a WHERE clause for a specific column.
        See `meerschaum.utils.sql.build_where`.
    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (`ASC` instead of `DESC`).
    round_down: bool, default True
        If `True`, round down the sync time to the nearest minute.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `datetime.datetime` object if the pipe exists, otherwise `None`.
    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.get_sync_time(
            self,
            params = params,
            newest = newest,
            round_down = round_down,
            debug = debug,
        )
```
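The effect of `round_down=True` is to truncate the sync time to the minute. A minimal sketch of that rounding (the real helper is `meerschaum.utils.misc.round_time`, which is more general):

```python
import datetime

def round_down_minute(dt):
    """Truncate a datetime to the start of its minute."""
    return dt.replace(second=0, microsecond=0)

rounded = round_down_minute(datetime.datetime(2023, 1, 1, 12, 30, 45))
print(rounded)  # 2023-01-01 12:30:00
```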
def get_val_column(self, debug: bool = False) -> Optional[str]

Return the name of the value column if it's defined, otherwise make an educated guess.
If not set in the `columns` dictionary, return the first numeric column that is not
an ID or datetime column. If none may be found, return `None`.

Parameters
----------
debug: bool, default False
    Verbosity toggle.

Returns
-------
Either a string or `None`.

Source code:

```python
def get_val_column(self, debug: bool = False) -> Union[str, None]:
    """
    Return the name of the value column if it's defined, otherwise make an educated guess.
    If not set in the `columns` dictionary, return the first numeric column that is not
    an ID or datetime column. If none may be found, return `None`.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    Either a string or `None`.
    """
    from meerschaum.utils.debug import dprint
    if debug:
        dprint('Attempting to determine the value column...')
    try:
        val_name = self.get_columns('value')
    except Exception as e:
        val_name = None
    if val_name is not None:
        if debug:
            dprint(f"Value column: {val_name}")
        return val_name

    cols = self.columns
    if cols is None:
        if debug:
            dprint('No columns could be determined. Returning...')
        return None

    try:
        dt_name = self.get_columns('datetime', error=False)
    except Exception as e:
        dt_name = None
    try:
        id_name = self.get_columns('id', error=False)
    except Exception as e:
        id_name = None

    if debug:
        dprint(f"dt_name: {dt_name}")
        dprint(f"id_name: {id_name}")

    cols_types = self.get_columns_types(debug=debug)
    if cols_types is None:
        return None
    if debug:
        dprint(f"cols_types: {cols_types}")
    if dt_name is not None:
        cols_types.pop(dt_name, None)
    if id_name is not None:
        cols_types.pop(id_name, None)

    candidates = []
    candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric',}
    for search_term in candidate_keywords:
        for col, typ in cols_types.items():
            if search_term in typ.lower():
                candidates.append(col)
                break
    if not candidates:
        if debug:
            dprint("No value column could be determined.")
        return None

    return candidates[0]
```
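The guessing heuristic above can be shown in isolation: drop the datetime and id columns, then pick the first column whose type looks numeric. The column types here are illustrative:

```python
cols_types = {
    'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    'id': 'BIGINT',
    'val': 'DOUBLE PRECISION',
}
# Remove the known datetime and id columns before scanning.
for col in ('dt', 'id'):
    cols_types.pop(col, None)

candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric'}
candidates = [
    col for col, typ in cols_types.items()
    if any(kw in typ.lower() for kw in candidate_keywords)
]
val_column = candidates[0] if candidates else None
print(val_column)  # 'val'
```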
def guess_datetime(self) ‑> Optional[str]
-
Try to determine a pipe's datetime column.
Expand source code
def guess_datetime(self) -> Union[str, None]:
    """
    Try to determine a pipe's datetime column.
    """
    dtypes = self.dtypes

    ### Abort if the user explicitly disallows a datetime index.
    if 'datetime' in dtypes:
        if dtypes['datetime'] is None:
            return None

    dt_cols = [
        col for col, typ in self.dtypes.items()
        if str(typ).startswith('datetime')
    ]
    if not dt_cols:
        return None
    return dt_cols[0]
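The scan above boils down to finding the first column whose pandas dtype string starts with `'datetime'`, with an explicit `None` under the `'datetime'` key acting as an opt-out. A standalone sketch (the `guess_datetime_col` helper and the dtype dicts are hypothetical, not part of Meerschaum):

```python
from typing import Dict, Optional

def guess_datetime_col(dtypes: Dict[str, Optional[str]]) -> Optional[str]:
    """Return the first column whose dtype string starts with 'datetime'."""
    # An explicit `None` under the 'datetime' key disables the datetime index.
    if 'datetime' in dtypes and dtypes['datetime'] is None:
        return None
    dt_cols = [col for col, typ in dtypes.items() if str(typ).startswith('datetime')]
    return dt_cols[0] if dt_cols else None

print(guess_datetime_col({'ts': 'datetime64[ns]', 'value': 'float64'}))  # → ts
```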
def infer_dtypes(self, persist: bool = False, debug: bool = False) ‑> Dict[str, Any]
-
If `dtypes` is not set in `Pipe.parameters`, infer the data types from the underlying table if it exists.
Parameters
persist: bool, default False
- If `True`, persist the inferred data types to `Pipe.parameters`.
Returns
A dictionary of strings containing the pandas data types for this Pipe.
Expand source code
def infer_dtypes(self, persist: bool = False, debug: bool = False) -> Dict[str, Any]:
    """
    If `dtypes` is not set in `meerschaum.Pipe.parameters`,
    infer the data types from the underlying table if it exists.

    Parameters
    ----------
    persist: bool, default False
        If `True`, persist the inferred data types to `meerschaum.Pipe.parameters`.

    Returns
    -------
    A dictionary of strings containing the pandas data types for this Pipe.
    """
    if not self.exists(debug=debug):
        dtypes = {}
        if not self.columns:
            return {}
        dt_col = self.columns.get('datetime', None)
        if dt_col:
            if not self.parameters.get('dtypes', {}).get(dt_col, None):
                dtypes[dt_col] = 'datetime64[ns]'
        return dtypes

    from meerschaum.utils.sql import get_pd_type
    columns_types = self.get_columns_types(debug=debug)
    dtypes = {
        c: get_pd_type(t, allow_custom_dtypes=True)
        for c, t in columns_types.items()
    } if columns_types else {}
    if persist:
        self.dtypes = dtypes
        self.edit(interactive=False, debug=debug)
    return dtypes
def register(self, debug: bool = False, **kw: Any) ‑> Tuple[bool, str]
-
Register a new Pipe along with its attributes.
Parameters
debug: bool, default False
- Verbosity toggle.
kw: Any
- Keyword arguments to pass to `instance_connector.register_pipe()`.
Returns
A `SuccessTuple` of success, message.
Expand source code
def register(
        self,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Register a new Pipe along with its attributes.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    kw: Any
        Keyword arguments to pass to `instance_connector.register_pipe()`.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    if self.temporary:
        return False, "Cannot register pipes created with `temporary=True` (read-only)."

    from meerschaum.utils.formatting import get_console
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin, custom_types
    from meerschaum.config._patch import apply_patch_to_config

    import warnings
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        try:
            _conn = self.connector
        except Exception as e:
            _conn = None

    if (
        _conn is not None
        and (_conn.type == 'plugin' or _conn.type in custom_types)
        and getattr(_conn, 'register', None) is not None
    ):
        try:
            with Venv(get_connector_plugin(_conn), debug=debug):
                params = self.connector.register(self)
        except Exception as e:
            get_console().print_exception()
            params = None
        params = {} if params is None else params
        if not isinstance(params, dict):
            from meerschaum.utils.warnings import warn
            warn(
                f"Invalid parameters returned from `register()` in connector {self.connector}:\n"
                + f"{params}"
            )
        else:
            self.parameters = apply_patch_to_config(params, self.parameters)

    if not self.parameters:
        cols = self.columns if self.columns else {'datetime': None, 'id': None}
        self.parameters = {
            'columns': cols,
        }

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.register_pipe(self, debug=debug, **kw)
def show(self, nopretty: bool = False, debug: bool = False, **kw) ‑> Tuple[bool, str]
-
Show attributes of a Pipe.
Parameters
nopretty: bool, default False
- If `True`, simply print the JSON of the pipe's attributes.
debug: bool, default False
- Verbosity toggle.
Returns
A `SuccessTuple` of success, message.
Expand source code
def show(
        self,
        nopretty: bool = False,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Show attributes of a Pipe.

    Parameters
    ----------
    nopretty: bool, default False
        If `True`, simply print the JSON of the pipe's attributes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    import json
    from meerschaum.utils.formatting import (
        pprint, make_header, ANSI, highlight_pipes, fill_ansi, get_console,
    )
    from meerschaum.utils.packages import import_rich, attempt_import
    from meerschaum.utils.warnings import info
    attributes_json = json.dumps(self.attributes)
    if not nopretty:
        _to_print = f"Attributes for {self}:"
        if ANSI:
            _to_print = fill_ansi(highlight_pipes(make_header(_to_print)), 'magenta')
            print(_to_print)
            rich = import_rich()
            rich_json = attempt_import('rich.json')
            get_console().print(rich_json.JSON(attributes_json))
        else:
            print(_to_print)
    else:
        print(attributes_json)

    return True, "Success"
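Stripped of the rich-rendering branch, the `nopretty` flag simply switches between raw and readable JSON. A stdlib-only sketch of that behavior (the `show_attributes` helper and the attributes dict are hypothetical illustrations, not Meerschaum API):

```python
import json

# Hypothetical attributes, mirroring the general shape of `Pipe.attributes`.
attributes = {
    'connector_keys': 'plugin:noaa',
    'metric_key': 'weather',
    'parameters': {'columns': {'datetime': 'timestamp', 'id': 'station_id'}},
}

def show_attributes(attributes: dict, nopretty: bool = False) -> str:
    """Render attributes as compact JSON (nopretty) or indented JSON for reading."""
    if nopretty:
        return json.dumps(attributes)
    return json.dumps(attributes, indent=4)

print(show_attributes(attributes))
```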
def sync(self, df: Union[pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch] = meerschaum.core.Pipe._sync.InferFetch, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, force: bool = False, retries: int = 10, min_seconds: int = 1, check_existing: bool = True, blocking: bool = True, workers: Optional[int] = None, callback: Optional[Callable[[Tuple[bool, str]], Any]] = None, error_callback: Optional[Callable[[Exception], Any]] = None, chunksize: Optional[int] = -1, sync_chunks: bool = True, debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Fetch new data from the source and update the pipe's table with new data.
Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.
Parameters
df: Union[None, pd.DataFrame, Dict[str, List[Any]]], default None
- An optional DataFrame to sync into the pipe. Defaults to `None`.
begin: Optional[datetime.datetime], default None
- Optionally specify the earliest datetime to search for data. Defaults to `None`.
end: Optional[datetime.datetime], default None
- Optionally specify the latest datetime to search for data. Defaults to `None`.
force: bool, default False
- If `True`, keep retrying the sync up to `retries` attempts. Defaults to `False`.
retries: int, default 10
- If `force`, how many attempts to try syncing before declaring failure. Defaults to `10`.
min_seconds: Union[int, float], default 1
- If `force`, how many seconds to sleep between retries. Defaults to `1`.
check_existing: bool, default True
- If `True`, pull and diff with existing data from the pipe. Defaults to `True`.
blocking: bool, default True
- If `True`, wait for the sync to finish and return its result; otherwise sync asynchronously and return success. Defaults to `True`. Only intended for specific scenarios.
workers: Optional[int], default None
- Not used directly within `sync()`. Instead it is passed on to instance connectors' `sync_pipe()` methods (e.g. `meerschaum.connectors.plugin.PluginConnector`). Defaults to `None`.
callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
- Callback function which expects a SuccessTuple as input. Only applies when `blocking=False`.
error_callback: Optional[Callable[[Exception], Any]], default None
- Callback function which expects an Exception as input. Only applies when `blocking=False`.
chunksize: int, default -1
- Specify the number of rows to sync per chunk. If `-1`, resort to system configuration (default is `900`). A `chunksize` of `None` will sync all rows in one transaction. Defaults to `-1`.
sync_chunks: bool, default True
- If possible, sync chunks while fetching them into memory.
debug: bool, default False
- Verbosity toggle. Defaults to `False`.
Returns
A `SuccessTuple` of success (`bool`) and message (`str`).
Expand source code
def sync(
        self,
        df: Union[
            pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch
        ] = InferFetch,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        force: bool = False,
        retries: int = 10,
        min_seconds: int = 1,
        check_existing: bool = True,
        blocking: bool = True,
        workers: Optional[int] = None,
        callback: Optional[Callable[[Tuple[bool, str]], Any]] = None,
        error_callback: Optional[Callable[[Exception], Any]] = None,
        chunksize: Optional[int] = -1,
        sync_chunks: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Fetch new data from the source and update the pipe's table with new data.

    Get new remote data via fetch, get existing data in the same time period,
    and merge the two, only keeping the unseen data.

    Parameters
    ----------
    df: Union[None, pd.DataFrame, Dict[str, List[Any]]], default None
        An optional DataFrame to sync into the pipe. Defaults to `None`.

    begin: Optional[datetime.datetime], default None
        Optionally specify the earliest datetime to search for data.
        Defaults to `None`.

    end: Optional[datetime.datetime], default None
        Optionally specify the latest datetime to search for data.
        Defaults to `None`.

    force: bool, default False
        If `True`, keep retrying the sync up to `retries` attempts.
        Defaults to `False`.

    retries: int, default 10
        If `force`, how many attempts to try syncing before declaring failure.
        Defaults to `10`.

    min_seconds: Union[int, float], default 1
        If `force`, how many seconds to sleep between retries.
        Defaults to `1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe.
        Defaults to `True`.

    blocking: bool, default True
        If `True`, wait for the sync to finish and return its result;
        otherwise sync asynchronously and return success.
        Defaults to `True`. Only intended for specific scenarios.

    workers: Optional[int], default None
        Not used directly within `Pipe.sync()`. Instead it is passed on to
        instance connectors' `sync_pipe()` methods (e.g.
        `meerschaum.connectors.plugin.PluginConnector`). Defaults to `None`.

    callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
        Callback function which expects a SuccessTuple as input.
        Only applies when `blocking=False`.

    error_callback: Optional[Callable[[Exception], Any]], default None
        Callback function which expects an Exception as input.
        Only applies when `blocking=False`.

    chunksize: int, default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.
        Defaults to `-1`.

    sync_chunks: bool, default True
        If possible, sync chunks while fetching them into memory.

    debug: bool, default False
        Verbosity toggle. Defaults to `False`.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    from meerschaum.utils.debug import dprint, _checkpoint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.connectors import custom_types
    from meerschaum.plugins import Plugin
    from meerschaum.utils.formatting import get_console
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.misc import df_is_chunk_generator
    from meerschaum.utils.pool import get_pool
    from meerschaum.config import get_config

    if (callback is not None or error_callback is not None) and blocking:
        warn("Callback functions are only executed when blocking = False. Ignoring...")

    _checkpoint(_total=2, **kw)

    if chunksize == 0:
        chunksize = None
        sync_chunks = False

    ### TODO: Add flag for specifying syncing method.
    begin = _determine_begin(self, begin, debug=debug)
    kw.update({
        'begin': begin,
        'end': end,
        'force': force,
        'retries': retries,
        'min_seconds': min_seconds,
        'check_existing': check_existing,
        'blocking': blocking,
        'workers': workers,
        'callback': callback,
        'error_callback': error_callback,
        'sync_chunks': sync_chunks,
        'chunksize': chunksize,
    })

    ### NOTE: Invalidate `_exists` cache before and after syncing.
    self._exists = None

    def _sync(
        p: 'meerschaum.Pipe',
        df: Union[
            'pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]], InferFetch
        ] = InferFetch,
    ) -> SuccessTuple:
        if df is None:
            p._exists = None
            return (
                False,
                f"You passed `None` instead of data into `sync()` for {p}.\n"
                + "Omit the DataFrame to infer fetching.",
            )

        ### Ensure that Pipe is registered.
        if not p.temporary and p.get_id(debug=debug) is None:
            ### NOTE: This may trigger an interactive session for plugins!
            register_tuple = p.register(debug=debug)
            if not register_tuple[0]:
                p._exists = None
                return register_tuple

        ### If connector is a plugin with a `sync()` method, return that instead.
        ### If the plugin does not have a `sync()` method but does have a `fetch()` method,
        ### use that instead.
        ### NOTE: The DataFrame must be omitted for the plugin sync method to apply.
        ### If a DataFrame is provided, continue as expected.
        if hasattr(df, 'MRSM_INFER_FETCH'):
            try:
                if p.connector is None:
                    msg = f"{p} does not have a valid connector."
                    if p.connector_keys.startswith('plugin:'):
                        msg += f"\n    Perhaps {p.connector_keys} has a syntax error?"
                    p._exists = None
                    return False, msg
            except Exception as e:
                p._exists = None
                return False, f"Unable to create the connector for {p}."

            ### Sync in place if this is a SQL pipe.
            if (
                str(self.connector) == str(self.instance_connector)
                and hasattr(self.instance_connector, 'sync_pipe_inplace')
                and get_config('system', 'experimental', 'inplace_sync')
            ):
                with Venv(get_connector_plugin(self.instance_connector)):
                    p._exists = None
                    return self.instance_connector.sync_pipe_inplace(p, debug=debug, **kw)

            ### Activate and invoke `sync(pipe)` for plugin connectors with `sync` methods.
            try:
                if p.connector.type == 'plugin' and p.connector.sync is not None:
                    connector_plugin = Plugin(p.connector.label)
                    with Venv(connector_plugin, debug=debug):
                        return_tuple = p.connector.sync(p, debug=debug, **kw)
                    p._exists = None
                    if not isinstance(return_tuple, tuple):
                        return_tuple = (
                            False,
                            f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}"
                        )
                    return return_tuple
            except Exception as e:
                get_console().print_exception()
                msg = f"Failed to sync {p} with exception: '" + str(e) + "'"
                if debug:
                    error(msg, silent=False)
                p._exists = None
                return False, msg

            ### Fetch the dataframe from the connector's `fetch()` method.
            try:
                ### If it was added by `make_connector`, activate the plugin's virtual environment.
                is_custom = p.connector.type in custom_types
                plugin = (
                    Plugin(p.connector.__module__.replace('plugins.', '').split('.')[0])
                    if is_custom else (
                        Plugin(p.connector.label) if p.connector.type == 'plugin'
                        else None
                    )
                )
                with Venv(plugin, debug=debug):
                    df = p.fetch(debug=debug, **kw)
            except Exception as e:
                get_console().print_exception(
                    suppress = [
                        'meerschaum/core/Pipe/_sync.py',
                        'meerschaum/core/Pipe/_fetch.py',
                    ]
                )
                msg = f"Failed to fetch data from {p.connector}:\n    {e}"
                df = None

            if df is None:
                p._exists = None
                return False, f"No data were fetched for {p}."

            if isinstance(df, list):
                if len(df) == 0:
                    return True, f"No new rows were returned for {p}."

                ### May be a chunk hook results list.
                if isinstance(df[0], tuple):
                    success = all([_success for _success, _ in df])
                    message = '\n'.join([_message for _, _message in df])
                    return success, message

            ### TODO: Deprecate async?
            if df is True:
                p._exists = None
                return True, f"{p} is being synced in parallel."

        ### CHECKPOINT: Retrieved the DataFrame.
        _checkpoint(**kw)

        ### Allow for dataframe generators or iterables.
        if df_is_chunk_generator(df):
            is_thread_safe = getattr(self.instance_connector, 'IS_THREAD_SAFE', False)
            if is_thread_safe:
                engine_pool_size = (
                    p.instance_connector.engine.pool.size()
                    if p.instance_connector.type == 'sql'
                    else None
                )
                current_num_threads = len(threading.enumerate())
                workers = kw.get('workers', None)
                desired_workers = (
                    min(workers or engine_pool_size, engine_pool_size)
                    if engine_pool_size is not None
                    else (workers if is_thread_safe else 1)
                )
                if desired_workers is None:
                    desired_workers = (current_num_threads if is_thread_safe else 1)
                kw['workers'] = max(
                    (desired_workers - current_num_threads),
                    1,
                )
            else:
                kw['workers'] = 1
            dt_col = p.columns.get('datetime', None)
            pool = get_pool(workers=kw.get('workers', 1))
            if debug:
                dprint(f"Received {type(df)}. Attempting to sync first chunk...")

            try:
                chunk = next(df)
            except StopIteration:
                return True, "Received an empty generator; nothing to do."

            chunk_success, chunk_msg = _sync(p, chunk)
            chunk_msg = '\n' + self._get_chunk_label(chunk, dt_col) + '\n' + chunk_msg
            if not chunk_success:
                return chunk_success, f"Unable to sync initial chunk for {p}:\n{chunk_msg}"
            if debug:
                dprint("Successfully synced the first chunk, attempting the rest...")

            failed_chunks = []
            def _process_chunk(_chunk):
                try:
                    _chunk_success, _chunk_msg = _sync(p, _chunk)
                except Exception as e:
                    _chunk_success, _chunk_msg = False, str(e)
                if not _chunk_success:
                    failed_chunks.append(_chunk)
                return (
                    _chunk_success,
                    (
                        '\n'
                        + self._get_chunk_label(_chunk, dt_col)
                        + '\n'
                        + _chunk_msg
                    )
                )

            results = sorted(
                [(chunk_success, chunk_msg)] + (
                    list(pool.imap(_process_chunk, df))
                    if not df_is_chunk_generator(chunk)
                    else [
                        _process_chunk(_child_chunks)
                        for _child_chunks in df
                    ]
                )
            )
            chunk_messages = [chunk_msg for _, chunk_msg in results]
            success_bools = [chunk_success for chunk_success, _ in results]
            success = all(success_bools)
            msg = '\n'.join(chunk_messages)

            ### If some chunks succeeded, retry the failures.
            retry_success = True
            if not success and any(success_bools):
                if debug:
                    dprint("Retrying failed chunks...")
                chunks_to_retry = [c for c in failed_chunks]
                failed_chunks = []
                for chunk in chunks_to_retry:
                    chunk_success, chunk_msg = _process_chunk(chunk)
                    msg += f"\n\nRetried chunk:\n{chunk_msg}\n"
                    retry_success = retry_success and chunk_success

            success = success and retry_success
            return success, msg

        ### Cast to a dataframe and ensure datatypes are what we expect.
        df = self.enforce_dtypes(df, debug=debug)

        if debug:
            dprint(
                "DataFrame to sync:\n"
                + (str(df)[:255] + '...' if len(str(df)) >= 256 else str(df)),
                **kw
            )

        ### If force, continue to sync until success.
        return_tuple = False, f"Did not sync {p}."
        run = True
        _retries = 1
        while run:
            with Venv(get_connector_plugin(self.instance_connector)):
                return_tuple = p.instance_connector.sync_pipe(
                    pipe = p,
                    df = df,
                    debug = debug,
                    **kw
                )
            _retries += 1
            run = (not return_tuple[0]) and force and _retries <= retries
            if run and debug:
                dprint(f"Syncing failed for {p}. Attempt ( {_retries} / {retries} )", **kw)
                dprint(f"Sleeping for {min_seconds} seconds...", **kw)
                time.sleep(min_seconds)
            if _retries > retries:
                warn(
                    f"Unable to sync {p} within {retries} attempt"
                    + ("s" if retries != 1 else "")
                    + "!"
                )

        ### CHECKPOINT: Finished syncing. Handle caching.
        _checkpoint(**kw)
        if self.cache_pipe is not None:
            if debug:
                dprint("Caching retrieved dataframe.", **kw)
            _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync local cache for {self}.")

        self._exists = None
        return return_tuple

    if blocking:
        self._exists = None
        return _sync(self, df = df)

    ### TODO: Implement concurrent syncing (split DataFrame? mimic the functionality of modin?)
    from meerschaum.utils.threading import Thread
    def default_callback(result_tuple: SuccessTuple):
        dprint(f"Asynchronous result from {self}: {result_tuple}", **kw)
    def default_error_callback(x: Exception):
        dprint(f"Error received for {self}: {x}", **kw)

    if callback is None and debug:
        callback = default_callback
    if error_callback is None and debug:
        error_callback = default_error_callback

    try:
        thread = Thread(
            target = _sync,
            args = (self,),
            kwargs = {'df': df},
            daemon = False,
            callback = callback,
            error_callback = error_callback,
        )
        thread.start()
    except Exception as e:
        self._exists = None
        return False, str(e)

    self._exists = None
    return True, f"Spawned asynchronous sync for {self}."
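The `force` / `retries` / `min_seconds` loop near the end of the method is a common retry pattern. A standalone sketch of just that loop (the `sync_with_retries` and `flaky_sync` names are hypothetical illustrations, not Meerschaum API):

```python
import time
from typing import Callable, Tuple

SuccessTuple = Tuple[bool, str]

def sync_with_retries(
    attempt_sync: Callable[[], SuccessTuple],
    force: bool = False,
    retries: int = 10,
    min_seconds: float = 1.0,
) -> SuccessTuple:
    """Call `attempt_sync` until it succeeds or `retries` attempts are exhausted."""
    return_tuple: SuccessTuple = (False, "Did not sync.")
    _retries = 1
    run = True
    while run:
        return_tuple = attempt_sync()
        _retries += 1
        # Retry only on failure, only when forced, and only within the budget.
        run = (not return_tuple[0]) and force and _retries <= retries
        if run:
            time.sleep(min_seconds)
    return return_tuple

# A flaky sync that fails twice before succeeding.
attempts = []
def flaky_sync() -> SuccessTuple:
    attempts.append(1)
    return (len(attempts) >= 3, f"Attempt {len(attempts)}")

success, msg = sync_with_retries(flaky_sync, force=True, retries=10, min_seconds=0.0)
print(success, msg)  # → True Attempt 3
```

Without `force=True`, the loop exits after the first attempt regardless of the result, matching the documented default behavior.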
def update(self, *args, **kw) ‑> Tuple[bool, str]
-
Update a pipe's parameters in its instance.
Expand source code
def update(self, *args, **kw) -> SuccessTuple:
    """
    Update a pipe's parameters in its instance.
    """
    kw['interactive'] = False
    return self.edit(*args, **kw)
class Plugin (name: str, version: Optional[str] = None, user_id: Optional[int] = None, required: Optional[List[str]] = None, attributes: Optional[Dict[str, Any]] = None, archive_path: Optional[pathlib.Path] = None, venv_path: Optional[pathlib.Path] = None, repo_connector: "Optional['meerschaum.connectors.api.APIConnector']" = None, repo: "Union['meerschaum.connectors.api.APIConnector', str, None]" = None)
-
Handle packaging of Meerschaum plugins.
Expand source code
class Plugin:
    """Handle packaging of Meerschaum plugins."""

    def __init__(
        self,
        name: str,
        version: Optional[str] = None,
        user_id: Optional[int] = None,
        required: Optional[List[str]] = None,
        attributes: Optional[Dict[str, Any]] = None,
        archive_path: Optional[pathlib.Path] = None,
        venv_path: Optional[pathlib.Path] = None,
        repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None,
        repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None,
    ):
        from meerschaum.config.static import STATIC_CONFIG
        sep = STATIC_CONFIG['plugins']['repo_separator']
        _repo = None
        if sep in name:
            try:
                name, _repo = name.split(sep)
            except Exception as e:
                error(f"Invalid plugin name: '{name}'")
        self._repo_in_name = _repo

        if attributes is None:
            attributes = {}
        self.name = name
        self.attributes = attributes
        self.user_id = user_id
        self._version = version
        if required:
            self._required = required
        self.archive_path = (
            archive_path if archive_path is not None
            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
        )
        self.venv_path = (
            venv_path if venv_path is not None
            else VIRTENV_RESOURCES_PATH / self.name
        )
        self._repo_connector = repo_connector
        self._repo_keys = repo

    @property
    def repo_connector(self):
        """
        Return the repository connector for this plugin.
        NOTE: This imports the `connectors` module, which imports certain plugin modules.
        """
        if self._repo_connector is None:
            from meerschaum.connectors.parse import parse_repo_keys

            repo_keys = self._repo_keys or self._repo_in_name
            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
                error(
                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
                )
            repo_connector = parse_repo_keys(repo_keys)
            self._repo_connector = repo_connector
        return self._repo_connector

    @property
    def version(self):
        """
        Return the plugin's module version (`__version__`) if it's defined.
        """
        if self._version is None:
            try:
                self._version = self.module.__version__
            except Exception as e:
                self._version = None
        return self._version

    @property
    def module(self):
        """
        Return the Python module of the underlying plugin.
        """
        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
            if self.__file__ is None:
                return None
            from meerschaum.plugins import import_plugins
            self._module = import_plugins(str(self), warn=False)
        return self._module

    @property
    def __file__(self) -> Union[str, None]:
        """
        Return the file path (str) of the plugin if it exists, otherwise `None`.
        """
        if self.__dict__.get('_module', None) is not None:
            return self.module.__file__

        potential_dir = PLUGINS_RESOURCES_PATH / self.name
        if (
            potential_dir.exists()
            and potential_dir.is_dir()
            and (potential_dir / '__init__.py').exists()
        ):
            return str((potential_dir / '__init__.py').as_posix())

        potential_file = PLUGINS_RESOURCES_PATH / (self.name + '.py')
        if potential_file.exists() and not potential_file.is_dir():
            return str(potential_file.as_posix())

        return None

    @property
    def requirements_file_path(self) -> Union[pathlib.Path, None]:
        """
        If a file named `requirements.txt` exists, return its path.
        """
        if self.__file__ is None:
            return None
        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
        if not path.exists():
            return None
        return path

    def is_installed(self, **kw) -> bool:
        """
        Check whether a plugin is correctly installed.

        Returns
        -------
        A `bool` indicating whether a plugin exists and is successfully imported.
        """
        return self.__file__ is not None

    def make_tar(self, debug: bool = False) -> pathlib.Path:
        """
        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.

        Parameters
        ----------
        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A `pathlib.Path` to the archive file's path.
        """
        import tarfile, pathlib, subprocess, fnmatch
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.packages import attempt_import
        pathspec = attempt_import('pathspec', debug=debug)

        if not self.__file__:
            from meerschaum.utils.warnings import error
            error(f"Could not find file for plugin '{self}'.")
        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
            path = self.__file__.replace('__init__.py', '')
            is_dir = True
        else:
            path = self.__file__
            is_dir = False

        old_cwd = os.getcwd()
        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
        os.chdir(real_parent_path)

        default_patterns_to_ignore = [
            '.pyc',
            '__pycache__/',
            'eggs/',
            '__pypackages__/',
            '.git',
        ]

        def parse_gitignore() -> 'Set[str]':
            gitignore_path = pathlib.Path(path) / '.gitignore'
            if not gitignore_path.exists():
                return set()
            with open(gitignore_path, 'r', encoding='utf-8') as f:
                gitignore_text = f.read()
            return set(pathspec.PathSpec.from_lines(
                pathspec.patterns.GitWildMatchPattern,
                default_patterns_to_ignore + gitignore_text.splitlines()
            ).match_tree(path))

        patterns_to_ignore = parse_gitignore() if is_dir else set()

        if debug:
            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")

        with tarfile.open(self.archive_path, 'w:gz') as tarf:
            if not is_dir:
                tarf.add(f"{self.name}.py")
            else:
                for root, dirs, files in os.walk(self.name):
                    for f in files:
                        good_file = True
                        fp = os.path.join(root, f)
                        for pattern in patterns_to_ignore:
                            if pattern in str(fp) or f.startswith('.'):
                                good_file = False
                                break
                        if good_file:
                            if debug:
                                dprint(f"Adding '{fp}'...")
                            tarf.add(fp)

        ### clean up and change back to old directory
        os.chdir(old_cwd)

        ### change to 775 to avoid permissions issues with the API in a Docker container
        self.archive_path.chmod(0o775)

        if debug:
            dprint(f"Created archive '{self.archive_path}'.")
        return self.archive_path

    def install(
        self,
        force: bool = False,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Extract a plugin's tar archive to the plugins directory.
        This function checks if the plugin is already installed and if the version
        is equal or greater than the existing installation.

        Parameters
        ----------
        force: bool, default False
            If `True`, continue with installation, even if required packages fail to install.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A `SuccessTuple` of success (bool) and a message (str).
        """
        if self.full_name in _ongoing_installations:
            return True, f"Already installing plugin '{self}'."
        _ongoing_installations.add(self.full_name)
        from meerschaum.utils.warnings import warn, error
        if debug:
            from meerschaum.utils.debug import dprint
        import tarfile
        import re
        import ast
        from meerschaum.plugins import reload_plugins, sync_plugins_symlinks
        from meerschaum.utils.packages import attempt_import, determine_version, reload_package
        from meerschaum.utils.venv import init_venv
        from meerschaum.utils.misc import safely_extract_tar
        old_cwd = os.getcwd()
        old_version = ''
        new_version = ''
        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
        temp_dir.mkdir(exist_ok=True)

        if not self.archive_path.exists():
            return False, f"Missing archive file for plugin '{self}'."
        if self.version is not None:
            old_version = self.version
            if debug:
                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")

        if debug:
            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")

        try:
            with tarfile.open(self.archive_path, 'r:gz') as tarf:
                safely_extract_tar(tarf, temp_dir)
        except Exception as e:
            warn(e)
            return False, f"Failed to extract plugin '{self.name}'."
        ### search for version information
        files = os.listdir(temp_dir)

        if str(files[0]) == self.name:
            is_dir = True
        elif str(files[0]) == self.name + '.py':
            is_dir = False
        else:
            error(f"Unknown format encountered for plugin '{self}'.")

        fpath = temp_dir / files[0]
        if is_dir:
            fpath = fpath / '__init__.py'

        init_venv(self.name, debug=debug)
        with open(fpath, 'r', encoding='utf-8') as f:
            init_lines = f.readlines()
        new_version = None
        for line in init_lines:
            if '__version__' not in line:
                continue
            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
            if not version_match:
                continue
            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
            break
        if not new_version:
            warn(
                f"No `__version__` defined for plugin '{self}'. "
                + "Assuming new version...",
                stack = False,
            )

        packaging_version = attempt_import('packaging.version')
        try:
            is_new_version = (not new_version and not old_version) or (
                packaging_version.parse(old_version) < packaging_version.parse(new_version)
            )
            is_same_version = new_version and old_version and (
                packaging_version.parse(old_version) == packaging_version.parse(new_version)
            )
        except Exception as e:
            is_new_version, is_same_version = True, False

        ### Determine where to permanently store the new plugin.
        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
        for path in PLUGINS_DIR_PATHS:
            files_in_plugins_dir = os.listdir(path)
            if (
                self.name in files_in_plugins_dir
                or (self.name + '.py') in files_in_plugins_dir
            ):
                plugin_installation_dir_path = path
                break

        success_msg = f"Successfully installed plugin '{self}'."
        success, abort = None, None

        if is_same_version and not force:
            success, msg = True, (
                f"Plugin '{self}' is up-to-date (version {old_version}).\n"
                + "    Install again with `-f` or `--force` to reinstall."
            )
            abort = True
        elif is_new_version or force:
            for src_dir, dirs, files in os.walk(temp_dir):
                if success is not None:
                    break
                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
                if not os.path.exists(dst_dir):
                    os.mkdir(dst_dir)
                for f in files:
                    src_file = os.path.join(src_dir, f)
                    dst_file = os.path.join(dst_dir, f)
                    if os.path.exists(dst_file):
                        os.remove(dst_file)
                    if debug:
                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
                    try:
                        shutil.move(src_file, dst_dir)
                    except Exception as e:
                        success, msg = False, (
                            f"Failed to install plugin '{self}': "
                            + f"Could not move file '{src_file}' to '{dst_dir}'"
                        )
                        print(msg)
                        break
            if success is None:
                success, msg = True, success_msg
        else:
            success, msg = False, (
                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
                + f"attempted version {new_version}."
            )

        shutil.rmtree(temp_dir)
        os.chdir(old_cwd)

        ### Reload the plugin's module.
        sync_plugins_symlinks(debug=debug)
        if '_module' in self.__dict__:
            del self.__dict__['_module']
        init_venv(venv=self.name, force=True, debug=debug)
        reload_package('meerschaum')
        reload_plugins([self.name], debug=debug)

        ### if we've already failed, return here
        if not success or abort:
            _ongoing_installations.remove(self.full_name)
            return success, msg

        ### attempt to install dependencies
        if not self.install_dependencies(force=force, debug=debug):
            _ongoing_installations.remove(self.full_name)
            return False, f"Failed to install dependencies for plugin '{self}'."

        ### handling success tuple, bool, or other (typically None)
        setup_tuple = self.setup(debug=debug)
        if isinstance(setup_tuple, tuple):
            if not setup_tuple[0]:
                success, msg = setup_tuple
        elif isinstance(setup_tuple, bool):
            if not setup_tuple:
                success, msg = False, (
                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
                    f"Check `setup()` in '{self.__file__}' for more information "
                    + f"(no error message provided)."
                )
            else:
                success, msg = True, success_msg
        elif setup_tuple is None:
            success = True
            msg = (
                f"Post-install for plugin '{self}' returned None. "
                + f"Assuming plugin successfully installed."
            )
            warn(msg)
        else:
            success = False
            msg = (
                f"Post-install for plugin '{self}' returned unexpected value "
                + f"of type '{type(setup_tuple)}': {setup_tuple}"
            )

        _ongoing_installations.remove(self.full_name)
        module = self.module
        return success, msg

    def remove_archive(
        self,
        debug: bool = False
    ) -> SuccessTuple:
        """Remove a plugin's archive file."""
        if not self.archive_path.exists():
            return True, f"Archive file for plugin '{self}' does not exist."
        try:
            self.archive_path.unlink()
        except Exception as e:
            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
        return True, "Success"

    def remove_venv(
        self,
        debug: bool = False
    ) -> SuccessTuple:
        """Remove a plugin's virtual environment."""
        if not self.venv_path.exists():
            return True, f"Virtual environment for plugin '{self}' does not exist."
        try:
            shutil.rmtree(self.venv_path)
        except Exception as e:
            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
        return True, "Success"

    def uninstall(self, debug: bool = False) -> SuccessTuple:
        """
        Remove a plugin, its virtual environment, and archive file.
""" from meerschaum.utils.packages import reload_package from meerschaum.plugins import reload_plugins, sync_plugins_symlinks from meerschaum.utils.warnings import warn, info warnings_thrown_count: int = 0 max_warnings: int = 3 if not self.is_installed(): info( f"Plugin '{self.name}' doesn't seem to be installed.\n " + "Checking for artifacts...", stack = False, ) else: real_path = pathlib.Path(os.path.realpath(self.__file__)) try: if real_path.name == '__init__.py': shutil.rmtree(real_path.parent) else: real_path.unlink() except Exception as e: warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False) warnings_thrown_count += 1 else: info(f"Removed source files for plugin '{self.name}'.") if self.venv_path.exists(): success, msg = self.remove_venv(debug=debug) if not success: warn(msg, stack=False) warnings_thrown_count += 1 else: info(f"Removed virtual environment from plugin '{self.name}'.") success = warnings_thrown_count < max_warnings sync_plugins_symlinks(debug=debug) self.deactivate_venv(force=True, debug=debug) reload_package('meerschaum') reload_plugins(debug=debug) return success, ( f"Successfully uninstalled plugin '{self}'." if success else f"Failed to uninstall plugin '{self}'." ) def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]: """ If exists, run the plugin's `setup()` function. Parameters ---------- *args: str The positional arguments passed to the `setup()` function. debug: bool, default False Verbosity toggle. **kw: Any The keyword arguments passed to the `setup()` function. Returns ------- A `SuccessTuple` or `bool` indicating success. 
""" from meerschaum.utils.debug import dprint import inspect _setup = None for name, fp in inspect.getmembers(self.module): if name == 'setup' and inspect.isfunction(fp): _setup = fp break ### assume success if no setup() is found (not necessary) if _setup is None: return True sig = inspect.signature(_setup) has_debug, has_kw = ('debug' in sig.parameters), False for k, v in sig.parameters.items(): if '**' in str(v): has_kw = True break _kw = {} if has_kw: _kw.update(kw) if has_debug: _kw['debug'] = debug if debug: dprint(f"Running setup for plugin '{self}'...") try: self.activate_venv(debug=debug) return_tuple = _setup(*args, **_kw) self.deactivate_venv(debug=debug) except Exception as e: return False, str(e) if isinstance(return_tuple, tuple): return return_tuple if isinstance(return_tuple, bool): return return_tuple, f"Setup for Plugin '{self.name}' did not return a message." if return_tuple is None: return False, f"Setup for Plugin '{self.name}' returned None." return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}" def get_dependencies( self, debug: bool = False, ) -> List[str]: """ If the Plugin has specified dependencies in a list called `required`, return the list. **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages. Meerschaum plugins may also specify connector keys for a repo after `'@'`. Parameters ---------- debug: bool, default False Verbosity toggle. Returns ------- A list of required packages and plugins (str). """ if '_required' in self.__dict__: return self._required ### If the plugin has not yet been imported, ### infer the dependencies from the source text. ### This is not super robust, and it doesn't feel right ### having multiple versions of the logic. ### This is necessary when determining the activation order ### without having import the module. ### For consistency's sake, the module-less method does not cache the requirements. 
if self.__dict__.get('_module', None) is None: file_path = self.__file__ if file_path is None: return [] with open(file_path, 'r', encoding='utf-8') as f: text = f.read() if 'required' not in text: return [] ### This has some limitations: ### It relies on `required` being manually declared. ### We lose the ability to dynamically alter the `required` list, ### which is why we've kept the module-reliant method below. import ast, re ### NOTE: This technically would break ### if `required` was the very first line of the file. req_start_match = re.search(r'\nrequired(\s?)=', text) if not req_start_match: return [] req_start = req_start_match.start() ### Dependencies may have brackets within the strings, so push back the index. first_opening_brace = req_start + 1 + text[req_start:].find('[') if first_opening_brace == -1: return [] next_closing_brace = req_start + 1 + text[req_start:].find(']') if next_closing_brace == -1: return [] start_ix = first_opening_brace + 1 end_ix = next_closing_brace num_braces = 0 while True: if '[' not in text[start_ix:end_ix]: break num_braces += 1 start_ix = end_ix end_ix += text[end_ix + 1:].find(']') + 1 req_end = end_ix + 1 req_text = ( text[req_start:req_end] .lstrip() .replace('required', '', 1) .lstrip() .replace('=', '', 1) .lstrip() ) try: required = ast.literal_eval(req_text) except Exception as e: warn( f"Unable to determine requirements for plugin '{self.name}' " + "without importing the module.\n" + " This may be due to dynamically setting the global `required` list.\n" + f" {e}" ) return [] return required import inspect self.activate_venv(dependencies=False, debug=debug) required = [] for name, val in inspect.getmembers(self.module): if name == 'required': required = val break self._required = required self.deactivate_venv(dependencies=False, debug=debug) return required def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]: """ Return a list of required Plugin objects. 
""" from meerschaum.utils.warnings import warn from meerschaum.config import get_config from meerschaum.config.static import STATIC_CONFIG plugins = [] _deps = self.get_dependencies(debug=debug) sep = STATIC_CONFIG['plugins']['repo_separator'] plugin_names = [ _d[len('plugin:'):] for _d in _deps if _d.startswith('plugin:') and len(_d) > len('plugin:') ] default_repo_keys = get_config('meerschaum', 'default_repository') for _plugin_name in plugin_names: if sep in _plugin_name: try: _plugin_name, _repo_keys = _plugin_name.split(sep) except Exception as e: _repo_keys = default_repo_keys warn( f"Invalid repo keys for required plugin '{_plugin_name}'.\n " + f"Will try to use '{_repo_keys}' instead.", stack = False, ) else: _repo_keys = default_repo_keys plugins.append(Plugin(_plugin_name, repo=_repo_keys)) return plugins def get_required_packages(self, debug: bool=False) -> List[str]: """ Return the required package names (excluding plugins). """ _deps = self.get_dependencies(debug=debug) return [_d for _d in _deps if not _d.startswith('plugin:')] def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool: """ Activate the virtual environments for the plugin and its dependencies. Parameters ---------- dependencies: bool, default True If `True`, activate the virtual environments for required plugins. Returns ------- A bool indicating success. 
""" from meerschaum.utils.venv import venv_target_path from meerschaum.utils.packages import activate_venv from meerschaum.utils.misc import make_symlink, is_symlink from meerschaum.config._paths import PACKAGE_ROOT_PATH if dependencies: for plugin in self.get_required_plugins(debug=debug): plugin.activate_venv(debug=debug, **kw) vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True) venv_meerschaum_path = vtp / 'meerschaum' try: success, msg = True, "Success" if is_symlink(venv_meerschaum_path): if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH: venv_meerschaum_path.unlink() success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH) except Exception as e: success, msg = False, str(e) if not success: warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}") return activate_venv(self.name, debug=debug, **kw) def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool: """ Deactivate the virtual environments for the plugin and its dependencies. Parameters ---------- dependencies: bool, default True If `True`, deactivate the virtual environments for required plugins. Returns ------- A bool indicating success. """ from meerschaum.utils.packages import deactivate_venv success = deactivate_venv(self.name, debug=debug, **kw) if dependencies: for plugin in self.get_required_plugins(debug=debug): plugin.deactivate_venv(debug=debug, **kw) return success def install_dependencies( self, force: bool = False, debug: bool = False, ) -> bool: """ If specified, install dependencies. **NOTE:** Dependencies that start with `'plugin:'` will be installed as Meerschaum plugins from the same repository as this Plugin. To install from a different repository, add the repo keys after `'@'` (e.g. `'plugin:foo@api:bar'`). Parameters ---------- force: bool, default False If `True`, continue with the installation, even if some required packages fail to install. 
debug: bool, default False Verbosity toggle. Returns ------- A bool indicating success. """ from meerschaum.utils.packages import pip_install, venv_contains_package from meerschaum.utils.debug import dprint from meerschaum.utils.warnings import warn, info from meerschaum.connectors.parse import parse_repo_keys _deps = self.get_dependencies(debug=debug) if not _deps and self.requirements_file_path is None: return True plugins = self.get_required_plugins(debug=debug) for _plugin in plugins: if _plugin.name == self.name: warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False) continue _success, _msg = _plugin.repo_connector.install_plugin( _plugin.name, debug=debug, force=force ) if not _success: warn( f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'" + f" for plugin '{self.name}':\n" + _msg, stack = False, ) if not force: warn( "Try installing with the `--force` flag to continue anyway.", stack = False, ) return False info( "Continuing with installation despite the failure " + "(careful, things might be broken!)...", icon = False ) ### First step: parse `requirements.txt` if it exists. if self.requirements_file_path is not None: if not pip_install( requirements_file_path=self.requirements_file_path, venv=self.name, debug=debug ): warn( f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.", stack = False, ) if not force: warn( "Try installing with `--force` to continue anyway.", stack = False, ) return False info( "Continuing with installation despite the failure " + "(careful, things might be broken!)...", icon = False ) ### Don't reinstall packages that are already included in required plugins. 
packages = [] _packages = self.get_required_packages(debug=debug) accounted_for_packages = set() for package_name in _packages: for plugin in plugins: if venv_contains_package(package_name, plugin.name): accounted_for_packages.add(package_name) break packages = [pkg for pkg in _packages if pkg not in accounted_for_packages] ### Attempt pip packages installation. if packages: for package in packages: if not pip_install(package, venv=self.name, debug=debug): warn( f"Failed to install required package '{package}'" + f" for plugin '{self.name}'.", stack = False, ) if not force: warn( "Try installing with `--force` to continue anyway.", stack = False, ) return False info( "Continuing with installation despite the failure " + "(careful, things might be broken!)...", icon = False ) return True @property def full_name(self) -> str: """ Include the repo keys with the plugin's name. """ from meerschaum.config.static import STATIC_CONFIG sep = STATIC_CONFIG['plugins']['repo_separator'] return self.name + sep + str(self.repo_connector) def __str__(self): return self.name def __repr__(self): return f"Plugin('{self.name}', repo='{self.repo_connector}')" def __del__(self): pass
Instance variables
var full_name : str
-
Include the repo keys with the plugin's name.
var module
-
Return the Python module of the underlying plugin.
@property
def module(self):
    """
    Return the Python module of the underlying plugin.
    """
    if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
        if self.__file__ is None:
            return None
        from meerschaum.plugins import import_plugins
        self._module = import_plugins(str(self), warn=False)
    return self._module
var repo_connector
-
Return the repository connector for this plugin.
NOTE: This imports the meerschaum.connectors module, which imports certain plugin modules.
@property
def repo_connector(self):
    """
    Return the repository connector for this plugin.
    NOTE: This imports the `connectors` module, which imports certain plugin modules.
    """
    if self._repo_connector is None:
        from meerschaum.connectors.parse import parse_repo_keys

        repo_keys = self._repo_keys or self._repo_in_name
        if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
            error(
                f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
            )
        repo_connector = parse_repo_keys(repo_keys)
        self._repo_connector = repo_connector

    return self._repo_connector
var requirements_file_path : Optional[pathlib.Path]
-
If a file named requirements.txt exists, return its path.
@property
def requirements_file_path(self) -> Union[pathlib.Path, None]:
    """
    If a file named `requirements.txt` exists, return its path.
    """
    if self.__file__ is None:
        return None
    path = pathlib.Path(self.__file__).parent / 'requirements.txt'
    if not path.exists():
        return None
    return path
var version
-
Return the plugin's module version (__version__) if it's defined.
@property
def version(self):
    """
    Return the plugin's module version (`__version__`) if it's defined.
    """
    if self._version is None:
        try:
            self._version = self.module.__version__
        except Exception as e:
            self._version = None
    return self._version
Methods
def activate_venv(self, dependencies: bool = True, debug: bool = False, **kw) ‑> bool
-
Activate the virtual environments for the plugin and its dependencies.
Parameters
dependencies: bool, default True
    If True, activate the virtual environments for required plugins.
Returns
A bool indicating success.
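The symlink maintenance inside activate_venv (make sure the venv's meerschaum entry resolves to the installed package, relinking if stale) can be sketched with the standard library alone. ensure_symlink is an illustrative name, not part of the Meerschaum API:

```python
import os
import pathlib

def ensure_symlink(link_path: pathlib.Path, target: pathlib.Path) -> bool:
    """Ensure `link_path` is a symlink resolving to `target`, relinking if stale."""
    if link_path.is_symlink():
        if pathlib.Path(os.path.realpath(link_path)) == target.resolve():
            return True  # already points at the right place
        link_path.unlink()  # stale link: remove it and recreate below
    try:
        link_path.symlink_to(target)
        return True
    except OSError:
        return False
```

As in the method above, the check compares resolved real paths rather than the raw link text, so relative and absolute links to the same target are treated as equivalent.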
def deactivate_venv(self, dependencies: bool = True, debug: bool = False, **kw) ‑> bool
-
Deactivate the virtual environments for the plugin and its dependencies.
Parameters
dependencies: bool, default True
    If True, deactivate the virtual environments for required plugins.
Returns
A bool indicating success.
def get_dependencies(self, debug: bool = False) ‑> List[str]
-
If the Plugin has specified dependencies in a list called required, return the list.
NOTE: Dependencies which start with 'plugin:' are Meerschaum plugins, not pip packages. Meerschaum plugins may also specify connector keys for a repo after '@'.
Parameters
debug: bool, default False
    Verbosity toggle.
Returns
A list of required packages and plugins (str).
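The source-scanning branch above can alternatively be sketched with ast.parse, which sidesteps the manual bracket counting (brackets inside dependency strings come for free). infer_required is a hypothetical helper, not part of the Meerschaum API:

```python
import ast

def infer_required(source: str) -> list:
    """Extract a literal `required` list from plugin source without importing it."""
    try:
        tree = ast.parse(source)
    except SyntaxError:
        return []
    for node in tree.body:
        if isinstance(node, ast.Assign):
            targets = [t.id for t in node.targets if isinstance(t, ast.Name)]
            if 'required' in targets:
                try:
                    # literal_eval accepts AST nodes as well as strings.
                    return ast.literal_eval(node.value)
                except (ValueError, TypeError):
                    return []  # `required` is built dynamically, not a literal
    return []
```

Like the real method, this only works when `required` is declared as a literal; a dynamically computed list still requires importing the module.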
def get_required_packages(self, debug: bool = False) ‑> List[str]
-
Return the required package names (excluding plugins).
def get_required_plugins(self, debug: bool = False) ‑> List[Plugin]
-
Return a list of required Plugin objects.
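The 'plugin:<name>@<repo_keys>' convention handled above can be illustrated with a small parser. parse_plugin_dep and the 'api:mrsm' default repo keys are illustrative assumptions, not the library's actual helpers:

```python
def parse_plugin_dep(dep: str, default_repo: str = 'api:mrsm', sep: str = '@'):
    """Split a 'plugin:<name>[@<repo_keys>]' dependency into (name, repo_keys)."""
    if not dep.startswith('plugin:'):
        return None  # a pip package, not a plugin dependency
    name = dep[len('plugin:'):]
    if sep in name:
        name, repo_keys = name.split(sep, 1)
    else:
        repo_keys = default_repo
    return name, repo_keys
```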
def install(self, force: bool = False, debug: bool = False) ‑> Tuple[bool, str]
-
Extract a plugin's tar archive to the plugins directory.
This function checks whether the plugin is already installed and whether the new version is equal to or greater than the existing installation.
Parameters
force: bool, default False
    If True, continue with installation, even if required packages fail to install.
debug: bool, default False
    Verbosity toggle.
Returns
A SuccessTuple of success (bool) and a message (str).
def install(
    self,
    force: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Extract a plugin's tar archive to the plugins directory.

    This function checks if the plugin is already installed and if the version is equal or
    greater than the existing installation.

    Parameters
    ----------
    force: bool, default False
        If `True`, continue with installation, even if required packages fail to install.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (bool) and a message (str).
    """
    if self.full_name in _ongoing_installations:
        return True, f"Already installing plugin '{self}'."
    _ongoing_installations.add(self.full_name)
    from meerschaum.utils.warnings import warn, error
    if debug:
        from meerschaum.utils.debug import dprint
    import tarfile
    import re
    import ast
    from meerschaum.plugins import reload_plugins, sync_plugins_symlinks
    from meerschaum.utils.packages import attempt_import, determine_version, reload_package
    from meerschaum.utils.venv import init_venv
    from meerschaum.utils.misc import safely_extract_tar
    old_cwd = os.getcwd()
    old_version = ''
    new_version = ''
    temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
    temp_dir.mkdir(exist_ok=True)

    if not self.archive_path.exists():
        return False, f"Missing archive file for plugin '{self}'."
    if self.version is not None:
        old_version = self.version
        if debug:
            dprint(f"Found existing version '{old_version}' for plugin '{self}'.")

    if debug:
        dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")

    try:
        with tarfile.open(self.archive_path, 'r:gz') as tarf:
            safely_extract_tar(tarf, temp_dir)
    except Exception as e:
        warn(e)
        return False, f"Failed to extract plugin '{self.name}'."

    ### search for version information
    files = os.listdir(temp_dir)

    if str(files[0]) == self.name:
        is_dir = True
    elif str(files[0]) == self.name + '.py':
        is_dir = False
    else:
        error(f"Unknown format encountered for plugin '{self}'.")

    fpath = temp_dir / files[0]
    if is_dir:
        fpath = fpath / '__init__.py'

    init_venv(self.name, debug=debug)
    with open(fpath, 'r', encoding='utf-8') as f:
        init_lines = f.readlines()
    new_version = None
    for line in init_lines:
        if '__version__' not in line:
            continue
        version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
        if not version_match:
            continue
        new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
        break
    if not new_version:
        warn(
            f"No `__version__` defined for plugin '{self}'. "
            + "Assuming new version...",
            stack = False,
        )

    packaging_version = attempt_import('packaging.version')
    try:
        is_new_version = (not new_version and not old_version) or (
            packaging_version.parse(old_version) < packaging_version.parse(new_version)
        )
        is_same_version = new_version and old_version and (
            packaging_version.parse(old_version) == packaging_version.parse(new_version)
        )
    except Exception as e:
        is_new_version, is_same_version = True, False

    ### Determine where to permanently store the new plugin.
    plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
    for path in PLUGINS_DIR_PATHS:
        files_in_plugins_dir = os.listdir(path)
        if (
            self.name in files_in_plugins_dir
            or
            (self.name + '.py') in files_in_plugins_dir
        ):
            plugin_installation_dir_path = path
            break

    success_msg = f"Successfully installed plugin '{self}'."
    success, abort = None, None

    if is_same_version and not force:
        success, msg = True, (
            f"Plugin '{self}' is up-to-date (version {old_version}).\n"
            + "    Install again with `-f` or `--force` to reinstall."
        )
        abort = True
    elif is_new_version or force:
        for src_dir, dirs, files in os.walk(temp_dir):
            if success is not None:
                break
            dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
            if not os.path.exists(dst_dir):
                os.mkdir(dst_dir)
            for f in files:
                src_file = os.path.join(src_dir, f)
                dst_file = os.path.join(dst_dir, f)
                if os.path.exists(dst_file):
                    os.remove(dst_file)
                if debug:
                    dprint(f"Moving '{src_file}' to '{dst_dir}'...")
                try:
                    shutil.move(src_file, dst_dir)
                except Exception as e:
                    success, msg = False, (
                        f"Failed to install plugin '{self}': "
                        + f"Could not move file '{src_file}' to '{dst_dir}'"
                    )
                    print(msg)
                    break
        if success is None:
            success, msg = True, success_msg
    else:
        success, msg = False, (
            f"Your installed version of plugin '{self}' ({old_version}) is higher than "
            + f"attempted version {new_version}."
        )

    shutil.rmtree(temp_dir)
    os.chdir(old_cwd)

    ### Reload the plugin's module.
    sync_plugins_symlinks(debug=debug)
    if '_module' in self.__dict__:
        del self.__dict__['_module']
    init_venv(venv=self.name, force=True, debug=debug)
    reload_package('meerschaum')
    reload_plugins([self.name], debug=debug)

    ### if we've already failed, return here
    if not success or abort:
        _ongoing_installations.remove(self.full_name)
        return success, msg

    ### attempt to install dependencies
    if not self.install_dependencies(force=force, debug=debug):
        _ongoing_installations.remove(self.full_name)
        return False, f"Failed to install dependencies for plugin '{self}'."

    ### handling success tuple, bool, or other (typically None)
    setup_tuple = self.setup(debug=debug)
    if isinstance(setup_tuple, tuple):
        if not setup_tuple[0]:
            success, msg = setup_tuple
    elif isinstance(setup_tuple, bool):
        if not setup_tuple:
            success, msg = False, (
                f"Failed to run post-install setup for plugin '{self}'." + '\n'
                + f"Check `setup()` in '{self.__file__}' for more information "
                + f"(no error message provided)."
            )
        else:
            success, msg = True, success_msg
    elif setup_tuple is None:
        success = True
        msg = (
            f"Post-install for plugin '{self}' returned None. "
            + f"Assuming plugin successfully installed."
        )
        warn(msg)
    else:
        success = False
        msg = (
            f"Post-install for plugin '{self}' returned unexpected value "
            + f"of type '{type(setup_tuple)}': {setup_tuple}"
        )

    _ongoing_installations.remove(self.full_name)
    module = self.module
    return success, msg
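The version-sniffing step inside install() (scan the extracted __init__.py for a literal __version__ assignment) can be isolated as a small sketch; scan_version is an illustrative name, not part of the Meerschaum API:

```python
import ast
import re

def scan_version(init_lines) -> 'str | None':
    """Find a literal `__version__ = '...'` assignment among source lines."""
    for line in init_lines:
        if '__version__' not in line:
            continue
        if not re.search(r'__version__\s?=', line.strip()):
            continue
        # Evaluate only the right-hand side as a literal.
        return ast.literal_eval(line.split('=', 1)[1].strip())
    return None
```

Returning None mirrors the method above, which then warns and assumes the archive carries a new version.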
def install_dependencies(self, force: bool = False, debug: bool = False) ‑> bool
-
If specified, install dependencies.
NOTE: Dependencies that start with 'plugin:' will be installed as Meerschaum plugins from the same repository as this Plugin. To install from a different repository, add the repo keys after '@' (e.g. 'plugin:foo@api:bar').
Parameters
force: bool, default False
    If True, continue with the installation, even if some required packages fail to install.
debug: bool, default False
    Verbosity toggle.
Returns
A bool indicating success.
def is_installed(self, **kw) ‑> bool
-
Check whether a plugin is correctly installed.
Returns
A bool indicating whether a plugin exists and is successfully imported.
def is_installed(self, **kw) -> bool:
    """
    Check whether a plugin is correctly installed.

    Returns
    -------
    A `bool` indicating whether a plugin exists and is successfully imported.
    """
    return self.__file__ is not None
def make_tar(self, debug: bool = False) ‑> pathlib.Path
-
Compress the plugin's source files into a .tar.gz archive and return the archive's path.
Parameters
debug: bool, default False
    Verbosity toggle.
Returns
A pathlib.Path to the archive file's path.
```python
def make_tar(self, debug: bool = False) -> pathlib.Path:
    """
    Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pathlib.Path` to the archive file's path.
    """
    import tarfile, pathlib, subprocess, fnmatch
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    pathspec = attempt_import('pathspec', debug=debug)

    if not self.__file__:
        from meerschaum.utils.warnings import error
        error(f"Could not find file for plugin '{self}'.")
    if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
        path = self.__file__.replace('__init__.py', '')
        is_dir = True
    else:
        path = self.__file__
        is_dir = False

    old_cwd = os.getcwd()
    real_parent_path = pathlib.Path(os.path.realpath(path)).parent
    os.chdir(real_parent_path)

    default_patterns_to_ignore = [
        '.pyc',
        '__pycache__/',
        'eggs/',
        '__pypackages__/',
        '.git',
    ]

    def parse_gitignore() -> 'Set[str]':
        gitignore_path = pathlib.Path(path) / '.gitignore'
        if not gitignore_path.exists():
            return set()
        with open(gitignore_path, 'r', encoding='utf-8') as f:
            gitignore_text = f.read()
        return set(pathspec.PathSpec.from_lines(
            pathspec.patterns.GitWildMatchPattern,
            default_patterns_to_ignore + gitignore_text.splitlines()
        ).match_tree(path))

    patterns_to_ignore = parse_gitignore() if is_dir else set()

    if debug:
        dprint(f"Patterns to ignore:\n{patterns_to_ignore}")

    with tarfile.open(self.archive_path, 'w:gz') as tarf:
        if not is_dir:
            tarf.add(f"{self.name}.py")
        else:
            for root, dirs, files in os.walk(self.name):
                for f in files:
                    good_file = True
                    fp = os.path.join(root, f)
                    for pattern in patterns_to_ignore:
                        if pattern in str(fp) or f.startswith('.'):
                            good_file = False
                            break
                    if good_file:
                        if debug:
                            dprint(f"Adding '{fp}'...")
                        tarf.add(fp)

    ### clean up and change back to old directory
    os.chdir(old_cwd)

    ### change to 775 to avoid permissions issues with the API in a Docker container
    self.archive_path.chmod(0o775)

    if debug:
        dprint(f"Created archive '{self.archive_path}'.")
    return self.archive_path
```
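The filtering loop in `make_tar()` can be sketched with only the standard library (the `pathspec` gitignore matching is replaced by the same substring check the loop uses, and the paths and patterns below are hypothetical). The hidden-file check is hoisted out of the pattern loop so that it also applies when no ignore patterns exist:

```python
import os
import tarfile
import tempfile

# Hypothetical plugin layout: a package directory with one real file
# and one compiled artifact that should be filtered out.
root = tempfile.mkdtemp()
plugin_dir = os.path.join(root, 'example_plugin')
os.makedirs(os.path.join(plugin_dir, '__pycache__'))
with open(os.path.join(plugin_dir, '__init__.py'), 'w') as f:
    f.write("# plugin code\n")
with open(os.path.join(plugin_dir, '__pycache__', 'junk.pyc'), 'w') as f:
    f.write("junk")

patterns_to_ignore = ['.pyc', '__pycache__/']
archive_path = os.path.join(root, 'example_plugin.tar.gz')

old_cwd = os.getcwd()
os.chdir(root)
try:
    with tarfile.open(archive_path, 'w:gz') as tarf:
        for walk_root, dirs, files in os.walk('example_plugin'):
            for fname in files:
                fp = os.path.join(walk_root, fname)
                # Skip hidden files and anything matching an ignore pattern,
                # mirroring the substring check in `make_tar()`.
                if fname.startswith('.') or any(p in fp for p in patterns_to_ignore):
                    continue
                tarf.add(fp)
finally:
    os.chdir(old_cwd)

with tarfile.open(archive_path) as tarf:
    names = tarf.getnames()
print(names)  # only '__init__.py' survives the filter
```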
def remove_archive(self, debug: bool = False) -> Tuple[bool, str]
-
Remove a plugin's archive file.
```python
def remove_archive(
    self,
    debug: bool = False
) -> SuccessTuple:
    """Remove a plugin's archive file."""
    if not self.archive_path.exists():
        return True, f"Archive file for plugin '{self}' does not exist."
    try:
        self.archive_path.unlink()
    except Exception as e:
        return False, f"Failed to remove archive for plugin '{self}':\n{e}"
    return True, "Success"
```
def remove_venv(self, debug: bool = False) -> Tuple[bool, str]
-
Remove a plugin's virtual environment.
```python
def remove_venv(
    self,
    debug: bool = False
) -> SuccessTuple:
    """Remove a plugin's virtual environment."""
    if not self.venv_path.exists():
        return True, f"Virtual environment for plugin '{self}' does not exist."
    try:
        shutil.rmtree(self.venv_path)
    except Exception as e:
        return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
    return True, "Success"
```
def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[Tuple[bool, str], bool]
-
If it exists, run the plugin's `setup()` function.

Parameters

`*args` : `str`
- The positional arguments passed to the `setup()` function.

`debug` : `bool`, default `False`
- Verbosity toggle.

`**kw` : `Any`
- The keyword arguments passed to the `setup()` function.

Returns

A `SuccessTuple` or `bool` indicating success.
```python
def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
    """
    If exists, run the plugin's `setup()` function.

    Parameters
    ----------
    *args: str
        The positional arguments passed to the `setup()` function.

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        The keyword arguments passed to the `setup()` function.

    Returns
    -------
    A `SuccessTuple` or `bool` indicating success.
    """
    from meerschaum.utils.debug import dprint
    import inspect
    _setup = None
    for name, fp in inspect.getmembers(self.module):
        if name == 'setup' and inspect.isfunction(fp):
            _setup = fp
            break

    ### assume success if no setup() is found (not necessary)
    if _setup is None:
        return True

    sig = inspect.signature(_setup)
    has_debug, has_kw = ('debug' in sig.parameters), False
    for k, v in sig.parameters.items():
        if '**' in str(v):
            has_kw = True
            break

    _kw = {}
    if has_kw:
        _kw.update(kw)
    if has_debug:
        _kw['debug'] = debug

    if debug:
        dprint(f"Running setup for plugin '{self}'...")
    try:
        self.activate_venv(debug=debug)
        return_tuple = _setup(*args, **_kw)
        self.deactivate_venv(debug=debug)
    except Exception as e:
        return False, str(e)

    if isinstance(return_tuple, tuple):
        return return_tuple
    if isinstance(return_tuple, bool):
        return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
    if return_tuple is None:
        return False, f"Setup for Plugin '{self.name}' returned None."
    return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"
```
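The signature inspection in `setup()` (forwarding `debug` and extra keyword arguments only when the target function can accept them) is a reusable dispatch pattern. A generic sketch, with hypothetical helper and sample functions; note it checks `Parameter.VAR_KEYWORD` directly rather than the `'**' in str(v)` string match used in the source:

```python
import inspect

def call_with_supported_kwargs(func, *args, debug=False, **kw):
    """
    Call `func`, forwarding `debug` and extra keyword arguments
    only if its signature accepts them.
    """
    sig = inspect.signature(func)
    # True when the function declares a **kwargs catch-all parameter.
    accepts_kw = any(
        p.kind is inspect.Parameter.VAR_KEYWORD
        for p in sig.parameters.values()
    )
    _kw = dict(kw) if accepts_kw else {}
    if 'debug' in sig.parameters:
        _kw['debug'] = debug
    return func(*args, **_kw)

def setup_plain():
    return True

def setup_verbose(debug=False, **kw):
    return (True, f"debug={debug}, extras={sorted(kw)}")

print(call_with_supported_kwargs(setup_plain))
print(call_with_supported_kwargs(setup_verbose, debug=True, force=True))
```

This lets plugin authors declare only the parameters they care about: a bare `setup()` is called with no arguments, while one that accepts `debug` or `**kw` receives them.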
def uninstall(self, debug: bool = False) -> Tuple[bool, str]
-
Remove a plugin, its virtual environment, and archive file.
```python
def uninstall(self, debug: bool = False) -> SuccessTuple:
    """
    Remove a plugin, its virtual environment, and archive file.
    """
    from meerschaum.utils.packages import reload_package
    from meerschaum.plugins import reload_plugins, sync_plugins_symlinks
    from meerschaum.utils.warnings import warn, info
    warnings_thrown_count: int = 0
    max_warnings: int = 3

    if not self.is_installed():
        info(
            f"Plugin '{self.name}' doesn't seem to be installed.\n"
            + "Checking for artifacts...",
            stack = False,
        )
    else:
        real_path = pathlib.Path(os.path.realpath(self.__file__))
        try:
            if real_path.name == '__init__.py':
                shutil.rmtree(real_path.parent)
            else:
                real_path.unlink()
        except Exception as e:
            warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
            warnings_thrown_count += 1
        else:
            info(f"Removed source files for plugin '{self.name}'.")

    if self.venv_path.exists():
        success, msg = self.remove_venv(debug=debug)
        if not success:
            warn(msg, stack=False)
            warnings_thrown_count += 1
        else:
            info(f"Removed virtual environment from plugin '{self.name}'.")

    success = warnings_thrown_count < max_warnings
    sync_plugins_symlinks(debug=debug)
    self.deactivate_venv(force=True, debug=debug)
    reload_package('meerschaum')
    reload_plugins(debug=debug)
    return success, (
        f"Successfully uninstalled plugin '{self}'." if success
        else f"Failed to uninstall plugin '{self}'."
    )
```
class Venv (venv: "Union[str, 'Plugin', None]" = 'mrsm', debug: bool = False)
-
Manage a virtual environment's activation status.
Examples
```python
>>> from meerschaum.plugins import Plugin
>>> with Venv('mrsm') as venv:
...     import pandas
>>> with Venv(Plugin('noaa')) as venv:
...     import requests
>>> venv = Venv('mrsm')
>>> venv.activate()
True
>>> venv.deactivate()
True
>>>
```
```python
class Venv:
    """
    Manage a virtual environment's activation status.

    Examples
    --------
    >>> from meerschaum.plugins import Plugin
    >>> with Venv('mrsm') as venv:
    ...     import pandas
    >>> with Venv(Plugin('noaa')) as venv:
    ...     import requests
    >>> venv = Venv('mrsm')
    >>> venv.activate()
    True
    >>> venv.deactivate()
    True
    >>>
    """

    def __init__(
        self,
        venv: Union[str, 'meerschaum.plugins.Plugin', None] = 'mrsm',
        debug: bool = False,
    ) -> None:
        from meerschaum.utils.venv import activate_venv, deactivate_venv, active_venvs
        ### For some weird threading issue,
        ### we can't use `isinstance` here.
        if 'meerschaum.plugins._Plugin' in str(type(venv)):
            self._venv = venv.name
            self._activate = venv.activate_venv
            self._deactivate = venv.deactivate_venv
            self._kwargs = {}
        else:
            self._venv = venv
            self._activate = activate_venv
            self._deactivate = deactivate_venv
            self._kwargs = {'venv': venv}
        self._debug = debug
        ### In case someone calls `deactivate()` before `activate()`.
        self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)

    def activate(self, debug: bool = False) -> bool:
        """
        Activate this virtual environment.
        If a `meerschaum.plugins.Plugin` was provided,
        its dependent virtual environments will also be activated.
        """
        from meerschaum.utils.venv import active_venvs
        self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)
        return self._activate(debug=(debug or self._debug), **self._kwargs)

    def deactivate(self, debug: bool = False) -> bool:
        """
        Deactivate this virtual environment.
        If a `meerschaum.plugins.Plugin` was provided,
        its dependent virtual environments will also be deactivated.
        """
        return self._deactivate(debug=(debug or self._debug), **self._kwargs)

    @property
    def target_path(self) -> pathlib.Path:
        """
        Return the target site-packages path for this virtual environment.
        A `meerschaum.utils.venv.Venv` may have one virtual environment
        per minor Python version (e.g. Python 3.10 and Python 3.7).
        """
        from meerschaum.utils.venv import venv_target_path
        return venv_target_path(venv=self._venv, allow_nonexistent=True, debug=self._debug)

    @property
    def root_path(self) -> pathlib.Path:
        """
        Return the top-level path for this virtual environment.
        """
        from meerschaum.config._paths import VIRTENV_RESOURCES_PATH
        return VIRTENV_RESOURCES_PATH / self._venv

    def __enter__(self) -> None:
        self.activate(debug=self._debug)

    def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
        self.deactivate(debug=self._debug)

    def __str__(self) -> str:
        quote = "'" if self._venv is not None else ""
        return "Venv(" + quote + str(self._venv) + quote + ")"

    def __repr__(self) -> str:
        return self.__str__()
```
Instance variables
var root_path : pathlib.Path
-
Return the top-level path for this virtual environment.
Expand source code
```python
@property
def root_path(self) -> pathlib.Path:
    """
    Return the top-level path for this virtual environment.
    """
    from meerschaum.config._paths import VIRTENV_RESOURCES_PATH
    return VIRTENV_RESOURCES_PATH / self._venv
```
var target_path : pathlib.Path
-
Return the target site-packages path for this virtual environment. A `Venv` may have one virtual environment per minor Python version (e.g. Python 3.10 and Python 3.7).
```python
@property
def target_path(self) -> pathlib.Path:
    """
    Return the target site-packages path for this virtual environment.
    A `meerschaum.utils.venv.Venv` may have one virtual environment
    per minor Python version (e.g. Python 3.10 and Python 3.7).
    """
    from meerschaum.utils.venv import venv_target_path
    return venv_target_path(venv=self._venv, allow_nonexistent=True, debug=self._debug)
```
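The per-minor-version layout that `target_path` resolves can be illustrated with a small helper. The directory scheme shown is the conventional POSIX venv layout and is an assumption for illustration, not a copy of `venv_target_path`:

```python
import pathlib
import sys

def guess_site_packages(venv_root: pathlib.Path) -> pathlib.Path:
    """
    Build the conventional site-packages path for the running interpreter,
    e.g. <venv>/lib/python3.10/site-packages. One such directory may exist
    per minor Python version under the same venv root.
    """
    pyver = f"python{sys.version_info.major}.{sys.version_info.minor}"
    return venv_root / 'lib' / pyver / 'site-packages'

path = guess_site_packages(pathlib.Path('/tmp/venvs/mrsm'))
print(path)  # e.g. /tmp/venvs/mrsm/lib/python3.11/site-packages
```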
Methods
def activate(self, debug: bool = False) -> bool
-
Activate this virtual environment. If a `Plugin` was provided, its dependent virtual environments will also be activated.
```python
def activate(self, debug: bool = False) -> bool:
    """
    Activate this virtual environment.
    If a `meerschaum.plugins.Plugin` was provided,
    its dependent virtual environments will also be activated.
    """
    from meerschaum.utils.venv import active_venvs
    self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)
    return self._activate(debug=(debug or self._debug), **self._kwargs)
```
def deactivate(self, debug: bool = False) -> bool
-
Deactivate this virtual environment. If a `Plugin` was provided, its dependent virtual environments will also be deactivated.
```python
def deactivate(self, debug: bool = False) -> bool:
    """
    Deactivate this virtual environment.
    If a `meerschaum.plugins.Plugin` was provided,
    its dependent virtual environments will also be deactivated.
    """
    return self._deactivate(debug=(debug or self._debug), **self._kwargs)
```