meerschaum
What is Meerschaum?
Meerschaum is a tool for quickly synchronizing time-series data streams called pipes. With Meerschaum, you can have a data visualization stack running in minutes.
Why Meerschaum?
If you've worked with time-series data, you know the headaches that come with ETL. Data engineering often gets in analysts' way, and when work needs to get done, every minute spent on pipelining is time taken away from real analysis.
Rather than copy/pasting your ETL scripts, simply build pipes with Meerschaum! Meerschaum gives you the tools to design your data streams how you like, and don't worry: you can always incorporate Meerschaum into your existing systems!
Features
- Built for Data Scientists and Analysts
- Integrate with Pandas, Grafana and other popular data analysis tools.
- Persist your dataframes and always get the latest data.
- Production-Ready, Batteries Included
- Synchronization engine concurrently updates many time-series data streams.
- One-click deploy a TimescaleDB and Grafana stack for prototyping.
- Serve data to your entire organization through the power of uvicorn, gunicorn, and FastAPI.
- Easily Expandable
- Ingest any data source with a simple plugin. Just return a DataFrame, and Meerschaum handles the rest.
- Add any function as a command to the Meerschaum system.
- Include Meerschaum in your projects with its easy-to-use Python API.
- Tailored for Your Experience
- Rich CLI makes managing your data streams surprisingly enjoyable!
- Web dashboard for those who prefer a more graphical experience.
- Manage your database connections with Meerschaum connectors.
- Utility commands with sensible syntax let you control many pipes with grace.
- Portable from the Start
- The environment variables $MRSM_ROOT_DIR, $MRSM_PLUGINS_DIR, and $MRSM_VENVS_DIR let you emulate multiple installations and group together your instances.
- No dependencies required; anything needed will be installed into virtual environments.
- Specify required packages for your plugins, and users will get those packages in a virtual environment.
Installation
For a more thorough setup guide, visit the Getting Started page at meerschaum.io.
TL;DR
pip install -U --user meerschaum
mrsm stack up -d db grafana
mrsm bootstrap pipes
Usage Documentation
Please visit meerschaum.io for setup, usage, and troubleshooting information. You can find technical documentation at docs.meerschaum.io, and here is a complete list of the Meerschaum actions.
>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe("plugin:noaa", "weather")
>>> cols_to_select = ['timestamp', 'station', 'temperature (degC)']
>>> df = pipe.get_data(cols_to_select, begin='2023-11-15', end='2023-11-20')
>>> df
timestamp station temperature (degC)
0 2023-11-15 00:52:00 KATL 16.1
1 2023-11-15 00:52:00 KCLT 11.7
2 2023-11-15 00:53:00 KGMU 15.0
3 2023-11-15 00:54:00 KCEU 13.9
4 2023-11-15 01:52:00 KATL 15.6
.. ... ... ...
535 2023-11-19 22:54:00 KCEU 15.6
536 2023-11-19 23:52:00 KATL 16.7
537 2023-11-19 23:52:00 KCLT 13.9
538 2023-11-19 23:53:00 KGMU 15.6
539 2023-11-19 23:54:00 KCEU 15.0
[540 rows x 3 columns]
>>>
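The REPL output above suggests that `begin` is inclusive and `end` is exclusive (rows run from 00:52 on the 15th through 23:54 on the 19th). As a minimal standalone sketch of that date-range behavior (plain Python, not Meerschaum's actual implementation):

```python
from datetime import datetime

def filter_range(docs, begin, end):
    # Keep docs whose 'timestamp' falls in [begin, end),
    # mirroring the end-exclusive bounds suggested by the REPL output above.
    return [d for d in docs if begin <= d['timestamp'] < end]

docs = [
    {'timestamp': datetime(2023, 11, 14, 23, 59), 'station': 'KATL'},
    {'timestamp': datetime(2023, 11, 15, 0, 52), 'station': 'KATL'},
    {'timestamp': datetime(2023, 11, 19, 23, 54), 'station': 'KCEU'},
    {'timestamp': datetime(2023, 11, 20, 0, 0), 'station': 'KCLT'},
]
in_range = filter_range(docs, datetime(2023, 11, 15), datetime(2023, 11, 20))
print(len(in_range))  # 2
```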
Plugins
Check out the Awesome Meerschaum list for community plugins as well as the public plugins repository.
For details on installing, using, and writing plugins, check out the plugins documentation at meerschaum.io.
Example Plugin
```python
# ~/.config/meerschaum/plugins/example.py

__version__ = '0.0.1'
required = []

def register(pipe, **kw):
    return {
        'columns': {
            'datetime': 'dt',
            'id': 'id',
            'value': 'val',
        }
    }

def fetch(pipe, **kw):
    import random
    from datetime import datetime
    docs = [
        {
            'dt': datetime.now(),
            'id': i,
            'val': random.randint(0, 200),
        }
        for i in range(random.randint(0, 100))
    ]
    return docs
```
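The contract between `register` and `fetch` can be checked in plain Python: every document `fetch` returns should use exactly the column names declared under `register`'s `'columns'`. A self-contained sketch (the plugin functions are copied from the example above; the `pipe` argument is unused here):

```python
import random
from datetime import datetime

def register(pipe=None, **kw):
    return {'columns': {'datetime': 'dt', 'id': 'id', 'value': 'val'}}

def fetch(pipe=None, **kw):
    return [
        {'dt': datetime.now(), 'id': i, 'val': random.randint(0, 200)}
        for i in range(random.randint(1, 100))
    ]

columns = register()['columns']
docs = fetch()
# Each doc's keys match the declared column names ('dt', 'id', 'val').
assert all(set(doc) == set(columns.values()) for doc in docs)
```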
Support Meerschaum's Development
For consulting services and to support Meerschaum's development, please consider sponsoring me on GitHub Sponsors.
Additionally, you can always buy me a coffee!
```python
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Copyright 2023 Bennett Meares

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import atexit
from meerschaum.utils.typing import SuccessTuple
from meerschaum.core.Pipe import Pipe
from meerschaum.plugins import Plugin
from meerschaum.utils.venv import Venv
from meerschaum.connectors import get_connector
from meerschaum.utils import get_pipes
from meerschaum.utils.formatting import pprint
from meerschaum._internal.docs import index as __doc__
from meerschaum.config import __version__, get_config
from meerschaum.utils.packages import attempt_import
from meerschaum.__main__ import _close_pools

atexit.register(_close_pools)

__pdoc__ = {'gui': False, 'api': False, 'core': False, '_internal': False}
__all__ = (
    "get_pipes",
    "get_connector",
    "get_config",
    "Pipe",
    "Plugin",
    "Venv",
    "pprint",
    "attempt_import",
    "actions",
    "config",
    "connectors",
    "plugins",
    "utils",
)
```
````python
def get_pipes(
    connector_keys: Union[str, List[str], None] = None,
    metric_keys: Union[str, List[str], None] = None,
    location_keys: Union[str, List[str], None] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    mrsm_instance: Union[str, InstanceConnector, None] = None,
    instance: Union[str, InstanceConnector, None] = None,
    as_list: bool = False,
    method: str = 'registered',
    wait: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[PipesDict, List[mrsm.Pipe]]:
    """
    Return a dictionary or list of `meerschaum.Pipe` objects.

    Parameters
    ----------
    connector_keys: Union[str, List[str], None], default None
        String or list of connector keys.
        If omitted or is `'*'`, fetch all possible keys.
        If a string begins with `'_'`, select keys that do NOT match the string.

    metric_keys: Union[str, List[str], None], default None
        String or list of metric keys. See `connector_keys` for formatting.

    location_keys: Union[str, List[str], None], default None
        String or list of location keys. See `connector_keys` for formatting.

    tags: Optional[List[str]], default None
        If provided, only include pipes with these tags.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        Params are parsed into a SQL WHERE clause.
        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`

    mrsm_instance: Union[str, InstanceConnector, None], default None
        Connector keys for the Meerschaum instance of the pipes.
        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
        `meerschaum.connectors.api.APIConnector.APIConnector`.

    as_list: bool, default False
        If `True`, return pipes in a list instead of a hierarchical dictionary.
        `False`: `{connector_keys: {metric_key: {location_key: Pipe}}}`
        `True`: `[Pipe]`

    method: str, default 'registered'
        Available options: `['registered', 'explicit', 'all']`
        If `'registered'` (default), create pipes based on registered keys in the
        connector's pipes table (API or SQL connector, depending on `mrsm_instance`).
        If `'explicit'`, create pipes from the provided connector_keys, metric_keys,
        and location_keys instead of consulting the pipes table.
        Useful for creating non-existent pipes.
        If `'all'`, create pipes from predefined metrics and locations.
        Requires `connector_keys`.
        **NOTE:** Method `'all'` is not implemented!

    wait: bool, default False
        Wait for a connection before getting Pipes. Should only be true for cases where the
        database might not be running (like the API).

    **kw: Any
        Keyword arguments to pass to the `meerschaum.Pipe` constructor.

    Returns
    -------
    A dictionary of dictionaries and `meerschaum.Pipe` objects
    in the connector, metric, location hierarchy.
    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.

    Examples
    --------
    ```
    >>> ### Manual definition:
    >>> pipes = {
    ...     <connector_keys>: {
    ...         <metric_key>: {
    ...             <location_key>: Pipe(
    ...                 <connector_keys>,
    ...                 <metric_key>,
    ...                 <location_key>,
    ...             ),
    ...         },
    ...     },
    ... }
    >>> ### Accessing a single pipe:
    >>> pipes['sql:main']['weather'][None]
    >>> ### Return a list instead:
    >>> get_pipes(as_list=True)
    [sql_main_weather]
    >>>
    ```
    """
    from meerschaum.config import get_config
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import filter_keywords

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    if params is None:
        params = {}
    if tags is None:
        tags = []

    if isinstance(connector_keys, str):
        connector_keys = [connector_keys]
    if isinstance(metric_keys, str):
        metric_keys = [metric_keys]
    if isinstance(location_keys, str):
        location_keys = [location_keys]

    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
    ### If `wait`, wait until a connection is made.
    if mrsm_instance is None:
        mrsm_instance = instance
    if mrsm_instance is None:
        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
    if isinstance(mrsm_instance, str):
        from meerschaum.connectors.parse import parse_instance_keys
        connector = parse_instance_keys(keys=mrsm_instance, wait=wait, debug=debug)
    else:  ### NOTE: mrsm_instance MUST be a SQL or API connector for this to work.
        from meerschaum.connectors import instance_types
        valid_connector = False
        if hasattr(mrsm_instance, 'type'):
            if mrsm_instance.type in instance_types:
                valid_connector = True
        if not valid_connector:
            error(f"Invalid instance connector: {mrsm_instance}")
        connector = mrsm_instance
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Using instance connector: {connector}")
    if not connector:
        error(f"Could not create connector from keys: '{mrsm_instance}'")

    ### Get a list of tuples for the keys needed to build pipes.
    result = fetch_pipes_keys(
        method,
        connector,
        connector_keys=connector_keys,
        metric_keys=metric_keys,
        location_keys=location_keys,
        tags=tags,
        params=params,
        debug=debug,
    )
    if result is None:
        error("Unable to build pipes!")

    ### Populate the `pipes` dictionary with Pipes based on the keys
    ### obtained from the chosen `method`.
    from meerschaum import Pipe
    pipes = {}
    for ck, mk, lk in result:
        if ck not in pipes:
            pipes[ck] = {}

        if mk not in pipes[ck]:
            pipes[ck][mk] = {}

        pipes[ck][mk][lk] = Pipe(
            ck, mk, lk,
            mrsm_instance=connector,
            debug=debug,
            **filter_keywords(Pipe, **kw)
        )

    if not as_list:
        return pipes
    from meerschaum.utils.misc import flatten_pipes_dict
    return flatten_pipes_dict(pipes)
````
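The `as_list=True` branch above hands the nested `{connector: {metric: {location: Pipe}}}` dictionary to `flatten_pipes_dict`. As a standalone sketch of that collapse (a plain-Python stand-in, not the library's actual `meerschaum.utils.misc.flatten_pipes_dict`):

```python
def flatten_pipes_dict(pipes_dict):
    # Collapse {connector: {metric: {location: pipe}}} into a flat list,
    # mirroring the `as_list=True` behavior described above.
    return [
        pipe
        for metrics in pipes_dict.values()
        for locations in metrics.values()
        for pipe in locations.values()
    ]

pipes = {'sql:main': {'weather': {None: 'sql_main_weather'}}}
print(flatten_pipes_dict(pipes))  # ['sql_main_weather']
```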
````python
def get_connector(
    type: str = None,
    label: str = None,
    refresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Connector:
    """
    Return existing connector or create new connection and store for reuse.

    You can create new connectors if enough parameters are provided for the given type and flavor.

    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct a new object. Defaults to `False`.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`).

    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```
    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)
    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _loaded_plugin_connectors = True
    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"

        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        with _locks['types']:
            types.update({
                'api'   : APIConnector,
                'sql'   : SQLConnector,
                'plugin': PluginConnector,
            })

    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    import inspect
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector "
                            + f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n"
                        + f"  - Keyword value: '{value}'\n"
                        + f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else:  ### connector doesn't yet exist
            refresh = True

    ### only create an object if refresh is True
    ### (can be manually specified, otherwise determined above)
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack=False,
                )
                conn = None
            except Exception as e:
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack=False,
                )
                conn = None
        if conn is None:
            return None

    return connectors[type][label]
````
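Near the top of `get_connector`, combined keys like `'sql:main'` are split into a type and label with `type.split(':', maxsplit=1)`, and a missing label falls back to a configured default. A standalone sketch of just that parsing step (the `'main'` default mirrors the docstring; the helper name is hypothetical):

```python
def split_connector_keys(keys, default_label='main'):
    # Split 'type:label' into its parts; a bare type falls back to the
    # default label, as get_connector does via STATIC_CONFIG.
    if ':' in keys:
        type_, label = keys.split(':', maxsplit=1)
    else:
        type_, label = keys, default_label
    return type_, label

print(split_connector_keys('sql:newlabel'))  # ('sql', 'newlabel')
print(split_connector_keys('api'))           # ('api', 'main')
```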
```python
def get_config(
    *keys: str,
    patch: bool = True,
    substitute: bool = True,
    sync_files: bool = True,
    write_missing: bool = True,
    as_tuple: bool = False,
    warn: bool = True,
    debug: bool = False
) -> Any:
    """
    Return the Meerschaum configuration dictionary.
    If positional arguments are provided, index by the keys.
    Raises a warning if invalid keys are provided.

    Parameters
    ----------
    keys: str
        List of strings to index.

    patch: bool, default True
        If `True`, patch missing default keys into the config directory.

    sync_files: bool, default True
        If `True`, sync files if needed.

    write_missing: bool, default True
        If `True`, write default values when the main config files are missing.

    substitute: bool, default True
        If `True`, substitute 'MRSM{}' values.

    as_tuple: bool, default False
        If `True`, return a tuple of type (success, value).

    Returns
    -------
    The value in the configuration directory, indexed by the provided keys.

    Examples
    --------
    >>> get_config('meerschaum', 'instance')
    'sql:main'
    >>> get_config('does', 'not', 'exist')
    UserWarning: Invalid keys in config: ('does', 'not', 'exist')
    """
    import json

    symlinks_key = STATIC_CONFIG['config']['symlinks_key']
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Indexing keys: {keys}", color=False)

    if len(keys) == 0:
        _rc = _config(substitute=substitute, sync_files=sync_files, write_missing=write_missing)
        if as_tuple:
            return True, _rc
        return _rc

    ### Weird threading issues, only import if substitute is True.
    if substitute:
        from meerschaum.config._read_config import search_and_substitute_config
    ### Invalidate the cache if it was read before with substitute=False
    ### but there still exist substitutions.
    if (
        config is not None and substitute and keys[0] != symlinks_key
        and 'MRSM{' in json.dumps(config.get(keys[0]))
    ):
        try:
            _subbed = search_and_substitute_config({keys[0]: config[keys[0]]})
        except Exception as e:
            import traceback
            traceback.print_exc()
        config[keys[0]] = _subbed[keys[0]]
        if symlinks_key in _subbed:
            if symlinks_key not in config:
                config[symlinks_key] = {}
            if keys[0] not in config[symlinks_key]:
                config[symlinks_key][keys[0]] = {}
            config[symlinks_key][keys[0]] = apply_patch_to_config(
                _subbed,
                config[symlinks_key][keys[0]]
            )

    from meerschaum.config._sync import sync_files as _sync_files
    if config is None:
        _config(*keys, sync_files=sync_files)

    invalid_keys = False
    if keys[0] not in config and keys[0] != symlinks_key:
        single_key_config = read_config(
            keys=[keys[0]], substitute=substitute, write_missing=write_missing
        )
        if keys[0] not in single_key_config:
            invalid_keys = True
        else:
            config[keys[0]] = single_key_config.get(keys[0], None)
            if symlinks_key in single_key_config and keys[0] in single_key_config[symlinks_key]:
                if symlinks_key not in config:
                    config[symlinks_key] = {}
                config[symlinks_key][keys[0]] = single_key_config[symlinks_key][keys[0]]

            if sync_files:
                _sync_files(keys=[keys[0]])

    c = config
    if len(keys) > 0:
        for k in keys:
            try:
                c = c[k]
            except Exception as e:
                invalid_keys = True
                break
        if invalid_keys:
            ### Check if the keys are in the default configuration.
            from meerschaum.config._default import default_config
            in_default = True
            patched_default_config = (
                search_and_substitute_config(default_config)
                if substitute else copy.deepcopy(default_config)
            )
            _c = patched_default_config
            for k in keys:
                try:
                    _c = _c[k]
                except Exception as e:
                    in_default = False
            if in_default:
                c = _c
                invalid_keys = False
            warning_msg = f"Invalid keys in config: {keys}"
            if not in_default:
                try:
                    if warn:
                        from meerschaum.utils.warnings import warn as _warn
                        _warn(warning_msg, stacklevel=3, color=False)
                except Exception as e:
                    if warn:
                        print(warning_msg)
                if as_tuple:
                    return False, None
                return None

            ### Don't write keys that we haven't yet loaded into memory.
            not_loaded_keys = [k for k in patched_default_config if k not in config]
            for k in not_loaded_keys:
                patched_default_config.pop(k, None)

            set_config(
                apply_patch_to_config(
                    patched_default_config,
                    config,
                )
            )
    if patch and keys[0] != symlinks_key:
        if write_missing:
            write_config(config, debug=debug)

    if as_tuple:
        return (not invalid_keys), c
    return c
```
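The heart of `get_config` is walking nested dictionaries by successive positional keys, falling back to the default configuration when the keys are missing. A simplified standalone model of that lookup (the helper name and the fallback shape are assumptions, not the library's API):

```python
def index_config(config, keys, default_config=None):
    # Walk nested dicts by successive keys; on a miss, retry against the
    # default config (a simplified model of get_config's fallback).
    c = config
    for k in keys:
        try:
            c = c[k]
        except (KeyError, TypeError):
            if default_config is None:
                return None
            return index_config(default_config, keys)
    return c

cfg = {'meerschaum': {'instance': 'sql:main'}}
defaults = {'system': {'experimental': {'cache': False}}}
print(index_config(cfg, ('meerschaum', 'instance'), defaults))        # sql:main
print(index_config(cfg, ('system', 'experimental', 'cache'), defaults))  # False
print(index_config(cfg, ('does', 'not', 'exist'), defaults))          # None
```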
````python
class Pipe:
    """
    Access Meerschaum pipes via Pipe objects.

    Pipes are identified by the following:

    1. Connector keys (e.g. `'sql:main'`)
    2. Metric key (e.g. `'weather'`)
    3. Location (optional; e.g. `None`)

    A pipe's connector keys correspond to a data source, and when the pipe is synced,
    its `fetch` definition is evaluated and executed to produce new data.

    Alternatively, new data may be directly synced via `pipe.sync()`:

    ```
    >>> from meerschaum import Pipe
    >>> pipe = Pipe('csv', 'weather')
    >>>
    >>> import pandas as pd
    >>> df = pd.read_csv('weather.csv')
    >>> pipe.sync(df)
    ```
    """

    from ._fetch import (
        fetch,
        get_backtrack_interval,
    )
    from ._data import (
        get_data,
        get_backtrack_data,
        get_rowcount,
        _get_data_as_iterator,
        get_chunk_interval,
        get_chunk_bounds,
    )
    from ._register import register
    from ._attributes import (
        attributes,
        parameters,
        columns,
        dtypes,
        get_columns,
        get_columns_types,
        get_indices,
        tags,
        get_id,
        id,
        get_val_column,
        parents,
        children,
        target,
        _target_legacy,
        guess_datetime,
    )
    from ._show import show
    from ._edit import edit, edit_definition, update
    from ._sync import (
        sync,
        get_sync_time,
        exists,
        filter_existing,
        _get_chunk_label,
        get_num_workers,
    )
    from ._verify import (
        verify,
        get_bound_interval,
        get_bound_time,
    )
    from ._delete import delete
    from ._drop import drop
    from ._clear import clear
    from ._deduplicate import deduplicate
    from ._bootstrap import bootstrap
    from ._dtypes import enforce_dtypes, infer_dtypes

    def __init__(
        self,
        connector: str = '',
        metric: str = '',
        location: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        columns: Union[Dict[str, str], List[str], None] = None,
        tags: Optional[List[str]] = None,
        target: Optional[str] = None,
        dtypes: Optional[Dict[str, str]] = None,
        instance: Optional[Union[str, InstanceConnector]] = None,
        temporary: bool = False,
        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
        cache: bool = False,
        debug: bool = False,
        connector_keys: Optional[str] = None,
        metric_key: Optional[str] = None,
        location_key: Optional[str] = None,
    ):
        """
        Parameters
        ----------
        connector: str
            Keys for the pipe's source connector, e.g. `'sql:main'`.

        metric: str
            Label for the pipe's contents, e.g. `'weather'`.

        location: str, default None
            Label for the pipe's location. Defaults to `None`.

        parameters: Optional[Dict[str, Any]], default None
            Optionally set a pipe's parameters from the constructor,
            e.g. columns and other attributes.
            You can edit these parameters with `edit pipes`.

        columns: Optional[Dict[str, str]], default None
            Set the `columns` dictionary of `parameters`.
            If `parameters` is also provided, this dictionary is added under the `'columns'` key.

        tags: Optional[List[str]], default None
            A list of strings to be added under the `'tags'` key of `parameters`.
            You can select pipes with certain tags using `--tags`.

        dtypes: Optional[Dict[str, str]], default None
            Set the `dtypes` dictionary of `parameters`.
            If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.

        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
            Connector for the Meerschaum instance where the pipe resides.
            Defaults to the preconfigured default instance (`'sql:main'`).

        instance: Optional[Union[str, InstanceConnector]], default None
            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.

        temporary: bool, default False
            If `True`, prevent instance tables (pipes, users, plugins) from being created.

        cache: bool, default False
            If `True`, cache fetched data into a local database file.
            Defaults to `False`.
        """
        from meerschaum.utils.warnings import error, warn
        if (not connector and not connector_keys) or (not metric and not metric_key):
            error(
                "Please provide strings for the connector and metric\n    "
                + "(first two positional arguments)."
            )

        ### Fall back to legacy `location_key` just in case.
        if not location:
            location = location_key

        if not connector:
            connector = connector_keys

        if not metric:
            metric = metric_key

        if location in ('[None]', 'None'):
            location = None

        from meerschaum.config.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
        for k in (connector, metric, location, *(tags or [])):
            if str(k).startswith(negation_prefix):
                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")

        self.connector_keys = str(connector)
        self.connector_key = self.connector_keys  ### Alias
        self.metric_key = metric
        self.location_key = location
        self.temporary = temporary

        self._attributes = {
            'connector_keys': self.connector_keys,
            'metric_key': self.metric_key,
            'location_key': self.location_key,
            'parameters': {},
        }

        ### only set parameters if values are provided
        if isinstance(parameters, dict):
            self._attributes['parameters'] = parameters
        else:
            if parameters is not None:
                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
            self._attributes['parameters'] = {}

        columns = columns or self._attributes.get('parameters', {}).get('columns', {})
        if isinstance(columns, list):
            columns = {str(col): str(col) for col in columns}
        if isinstance(columns, dict):
            self._attributes['parameters']['columns'] = columns
        elif columns is not None:
            warn(f"The provided columns are of invalid type '{type(columns)}'.")

        if isinstance(tags, (list, tuple)):
            self._attributes['parameters']['tags'] = tags
        elif tags is not None:
            warn(f"The provided tags are of invalid type '{type(tags)}'.")

        if isinstance(target, str):
            self._attributes['parameters']['target'] = target
        elif target is not None:
            warn(f"The provided target is of invalid type '{type(target)}'.")

        if isinstance(dtypes, dict):
            self._attributes['parameters']['dtypes'] = dtypes
        elif dtypes is not None:
            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")

        ### NOTE: The parameters dictionary is {} by default.
        ### A Pipe may be registered without parameters, then edited,
        ### or a Pipe may be registered with parameters set in-memory first.
        _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
        if _mrsm_instance is None:
            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)

        if not isinstance(_mrsm_instance, str):
            self._instance_connector = _mrsm_instance
            self.instance_keys = str(_mrsm_instance)
        else:  ### NOTE: must be a SQL or API Connector for this to work
            self.instance_keys = _mrsm_instance

        self._cache = cache and get_config('system', 'experimental', 'cache')

    @property
    def meta(self):
        """
        Return the four keys needed to reconstruct this pipe.
        """
        return {
            'connector': self.connector_keys,
            'metric': self.metric_key,
            'location': self.location_key,
            'instance': self.instance_keys,
        }

    def keys(self) -> List[str]:
        """
        Return the ordered keys for this pipe.
        """
        return {
            key: val
            for key, val in self.meta.items()
            if key != 'instance'
        }

    @property
    def instance_connector(self) -> Union[InstanceConnector, None]:
        """
        The connector to where this pipe resides.
        May either be of type `meerschaum.connectors.sql.SQLConnector` or
        `meerschaum.connectors.api.APIConnector`.
        """
        if '_instance_connector' not in self.__dict__:
            from meerschaum.connectors.parse import parse_instance_keys
            conn = parse_instance_keys(self.instance_keys)
            if conn:
                self._instance_connector = conn
            else:
                return None
        return self._instance_connector

    @property
    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
        """
        The connector to the data source.
        """
        if '_connector' not in self.__dict__:
            from meerschaum.connectors.parse import parse_instance_keys
            import warnings
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                try:
                    conn = parse_instance_keys(self.connector_keys)
                except Exception as e:
                    conn = None
            if conn:
                self._connector = conn
            else:
                return None
        return self._connector

    @property
    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
        """
        If the pipe was created with `cache=True`, return the connector to the pipe's
        SQLite database for caching.
        """
        if not self._cache:
            return None

        if '_cache_connector' not in self.__dict__:
            from meerschaum.connectors import get_connector
            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
            _resources_path = SQLITE_RESOURCES_PATH
            self._cache_connector = get_connector(
                'sql', '_cache_' + str(self),
                flavor='sqlite',
                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
            )

        return self._cache_connector

    @property
    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
        """
        If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
        manage the local data.
````
374 """ 375 if self.cache_connector is None: 376 return None 377 if '_cache_pipe' not in self.__dict__: 378 from meerschaum.config._patch import apply_patch_to_config 379 from meerschaum.utils.sql import sql_item_name 380 _parameters = copy.deepcopy(self.parameters) 381 _fetch_patch = { 382 'fetch': ({ 383 'definition': ( 384 f"SELECT * FROM " 385 + sql_item_name( 386 str(self.target), 387 self.instance_connector.flavor, 388 self.instance_connector.get_pipe_schema(self), 389 ) 390 ), 391 }) if self.instance_connector.type == 'sql' else ({ 392 'connector_keys': self.connector_keys, 393 'metric_key': self.metric_key, 394 'location_key': self.location_key, 395 }) 396 } 397 _parameters = apply_patch_to_config(_parameters, _fetch_patch) 398 self._cache_pipe = Pipe( 399 self.instance_keys, 400 (self.connector_keys + '_' + self.metric_key + '_cache'), 401 self.location_key, 402 mrsm_instance = self.cache_connector, 403 parameters = _parameters, 404 cache = False, 405 temporary = True, 406 ) 407 408 return self._cache_pipe 409 410 411 def __str__(self, ansi: bool=False): 412 return pipe_repr(self, ansi=ansi) 413 414 415 def __eq__(self, other): 416 try: 417 return ( 418 isinstance(self, type(other)) 419 and self.connector_keys == other.connector_keys 420 and self.metric_key == other.metric_key 421 and self.location_key == other.location_key 422 and self.instance_keys == other.instance_keys 423 ) 424 except Exception as e: 425 return False 426 427 def __hash__(self): 428 ### Using an esoteric separator to avoid collisions. 429 sep = "[\"']" 430 return hash( 431 str(self.connector_keys) + sep 432 + str(self.metric_key) + sep 433 + str(self.location_key) + sep 434 + str(self.instance_keys) + sep 435 ) 436 437 def __repr__(self, **kw) -> str: 438 return pipe_repr(self, **kw) 439 440 def __getstate__(self) -> Dict[str, Any]: 441 """ 442 Define the state dictionary (pickling). 
443 """ 444 return { 445 'connector': self.connector_keys, 446 'metric': self.metric_key, 447 'location': self.location_key, 448 'parameters': self.parameters, 449 'instance': self.instance_keys, 450 } 451 452 def __setstate__(self, _state: Dict[str, Any]): 453 """ 454 Read the state (unpickling). 455 """ 456 self.__init__(**_state) 457 458 459 def __getitem__(self, key: str) -> Any: 460 """ 461 Index the pipe's attributes. 462 If the `key` cannot be found`, return `None`. 463 """ 464 if key in self.attributes: 465 return self.attributes.get(key, None) 466 467 aliases = { 468 'connector': 'connector_keys', 469 'connector_key': 'connector_keys', 470 'metric': 'metric_key', 471 'location': 'location_key', 472 } 473 aliased_key = aliases.get(key, None) 474 if aliased_key is not None: 475 return self.attributes.get(aliased_key, None) 476 477 property_aliases = { 478 'instance': 'instance_keys', 479 'instance_key': 'instance_keys', 480 } 481 aliased_key = property_aliases.get(key, None) 482 if aliased_key is not None: 483 key = aliased_key 484 return getattr(self, key, None)
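The `__hash__` above joins the four identifying keys with an unusual separator string before hashing. A standalone sketch (plain Python, no Meerschaum imports) shows why naive concatenation would collide:

```python
# Sketch: why Pipe.__hash__ joins its keys with an esoteric separator.
# Naive concatenation can collide: two distinct key pairs, one string.
assert "sql:a" + "bweather" == "sql:ab" + "weather"  # both 'sql:abweather'

SEP = "[\"']"  # unlikely to appear inside real connector/metric/location keys

def pipe_hash(connector_keys, metric_key, location_key, instance_keys):
    """Mimic Pipe.__hash__: hash the four identifying keys joined by SEP."""
    return hash(
        str(connector_keys) + SEP
        + str(metric_key) + SEP
        + str(location_key) + SEP
        + str(instance_keys) + SEP
    )

# With the separator, the two colliding pipes above hash differently,
# while pipes with identical keys still hash equal (the __eq__ contract).
h1 = pipe_hash("sql:a", "bweather", None, "sql:main")
h2 = pipe_hash("sql:ab", "weather", None, "sql:main")
```

Because `__eq__` compares exactly these four keys, hashing the same four keys keeps equal pipes in the same hash bucket, so pipes behave correctly as `dict` keys and `set` members.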
Access Meerschaum pipes via `Pipe` objects.

Pipes are identified by the following:

- Connector keys (e.g. `'sql:main'`)
- Metric key (e.g. `'weather'`)
- Location (optional; e.g. `None`)

A pipe's connector keys correspond to a data source, and when the pipe is synced,
its `fetch` definition is evaluated and executed to produce new data.

Alternatively, new data may be directly synced via `pipe.sync()`:

```python
>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)
```
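The constructor's normalization of `columns` and its fallback to the legacy `*_key` keywords can be sketched in isolation. This is a minimal sketch in plain Python; `normalize_columns` and `resolve_key` are hypothetical helper names, not part of the Meerschaum API:

```python
from typing import Dict, List, Optional, Union

def normalize_columns(
    columns: Union[Dict[str, str], List[str], None],
) -> Optional[Dict[str, str]]:
    """A list of column names becomes an identity mapping, as in Pipe.__init__."""
    if isinstance(columns, list):
        return {str(col): str(col) for col in columns}
    if isinstance(columns, dict):
        return columns
    return None  # other types trigger a warning in the real constructor

def resolve_key(modern: Optional[str], legacy: Optional[str]) -> Optional[str]:
    """Prefer the modern positional argument, falling back to the legacy keyword."""
    return modern if modern else legacy

cols = normalize_columns(['datetime', 'id'])
metric = resolve_key('', 'weather')  # legacy `metric_key` still works
```

This mirrors why `Pipe('sql:main', 'weather', columns=['datetime', 'id'])` and the older `Pipe(connector_keys='sql:main', metric_key='weather')` both construct valid pipes.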
```python
def fetch(
    self,
    begin: Union[datetime, str, None] = '',
    end: Optional[datetime] = None,
    check_existing: bool = True,
    sync_chunks: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]:
    """
    Fetch a Pipe's latest data from its connector.

    Parameters
    ----------
    begin: Union[datetime, str, None], default ''
        If provided, only fetch data newer than or equal to `begin`.

    end: Optional[datetime], default None
        If provided, only fetch data older than or equal to `end`.

    check_existing: bool, default True
        If `False`, do not apply the backtrack interval.

    sync_chunks: bool, default False
        If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks
        while the fetch loads chunks into memory.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` of the newest unseen data.
    """
    if 'fetch' not in dir(self.connector):
        warn(f"No `fetch()` function defined for connector '{self.connector}'")
        return None

    from meerschaum.connectors import custom_types, get_connector_plugin
    from meerschaum.utils.debug import dprint, _checkpoint

    _chunk_hook = kw.pop('chunk_hook', None)
    kw['workers'] = self.get_num_workers(kw.get('workers', None))
    if sync_chunks and _chunk_hook is None:

        def _chunk_hook(chunk, **_kw) -> SuccessTuple:
            """
            Wrap `Pipe.sync()` with a custom chunk label prepended to the message.
            """
            from meerschaum.config._patch import apply_patch_to_config
            kwargs = apply_patch_to_config(kw, _kw)
            chunk_success, chunk_message = self.sync(chunk, **kwargs)
            chunk_label = self._get_chunk_label(chunk, self.columns.get('datetime', None))
            if chunk_label:
                chunk_message = '\n' + chunk_label + '\n' + chunk_message
            return chunk_success, chunk_message

    with mrsm.Venv(get_connector_plugin(self.connector)):
        df = self.connector.fetch(
            self,
            begin=_determine_begin(
                self,
                begin,
                check_existing=check_existing,
                debug=debug,
            ),
            end=end,
            chunk_hook=_chunk_hook,
            debug=debug,
            **kw
        )
    return df
```
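The `_chunk_hook` closure above follows a simple pattern: sync each chunk, then prepend a human-readable label to the resulting message. A self-contained sketch of that pattern (the `sync` callable here is a stand-in, not the real `Pipe.sync`, and the row-count label replaces the real hook's datetime-based label):

```python
from typing import Callable, List, Tuple

SuccessTuple = Tuple[bool, str]

def make_chunk_hook(
    sync: Callable[[List[int]], SuccessTuple],
) -> Callable[[List[int]], SuccessTuple]:
    """Wrap a sync callable, prepending a chunk label to its message."""
    def _chunk_hook(chunk: List[int]) -> SuccessTuple:
        success, message = sync(chunk)
        # The real hook derives this label from the chunk's datetime axis.
        label = f"chunk of {len(chunk)} rows"
        return success, '\n' + label + '\n' + message

    return _chunk_hook

# A stand-in sync function which always succeeds.
hook = make_chunk_hook(lambda chunk: (True, "Synced."))
result = hook([1, 2, 3])
```

Passing such a hook lets each fetched chunk be synced as soon as it arrives instead of after the entire fetch completes, which is the point of `sync_chunks=True`.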
```python
def get_backtrack_interval(
    self,
    check_existing: bool = True,
    debug: bool = False,
) -> Union[timedelta, int]:
    """
    Get the backtrack interval to use for this pipe.

    Parameters
    ----------
    check_existing: bool, default True
        If `False`, return a backtrack interval of 0 minutes.

    Returns
    -------
    The backtrack interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
    """
    default_backtrack_minutes = get_config('pipes', 'parameters', 'fetch', 'backtrack_minutes')
    configured_backtrack_minutes = self.parameters.get('fetch', {}).get('backtrack_minutes', None)
    backtrack_minutes = (
        configured_backtrack_minutes
        if configured_backtrack_minutes is not None
        else default_backtrack_minutes
    ) if check_existing else 0

    backtrack_interval = timedelta(minutes=backtrack_minutes)
    dt_col = self.columns.get('datetime', None)
    if dt_col is None:
        return backtrack_interval

    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns]')
    if 'int' in dt_dtype.lower():
        return backtrack_minutes

    return backtrack_interval
```
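Stripped of the configuration lookups, the logic above reduces to three rules: use the configured minutes (or a default), collapse to zero when `check_existing` is `False`, and return a plain `int` when the `datetime` axis is integer-typed. A minimal stdlib sketch (the default of 1440 minutes is an assumption for illustration; the real default comes from `get_config()`):

```python
from datetime import timedelta
from typing import Optional, Union

# Assumed default for illustration only; Meerschaum reads this from config.
DEFAULT_BACKTRACK_MINUTES = 1440

def backtrack_interval(
    configured_minutes: Optional[int],
    dt_dtype: str = 'datetime64[ns]',
    check_existing: bool = True,
) -> Union[timedelta, int]:
    """Mirror Pipe.get_backtrack_interval() without the config machinery."""
    minutes = (
        configured_minutes if configured_minutes is not None
        else DEFAULT_BACKTRACK_MINUTES
    ) if check_existing else 0
    if 'int' in dt_dtype.lower():
        return minutes  # integer axes use a plain numeric offset
    return timedelta(minutes=minutes)
```

Returning an `int` for integer axes matters because subtracting a `timedelta` from an integer `datetime` value would raise a `TypeError`.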
```python
def get_data(
    self,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    as_iterator: bool = False,
    as_chunks: bool = False,
    as_dask: bool = False,
    chunk_interval: Union[timedelta, int, None] = None,
    fresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union['pd.DataFrame', Generator['pd.DataFrame'], None]:
    """
    Get a pipe's data from the instance connector.

    Parameters
    ----------
    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, int, None], default None
        Lower bound datetime to begin searching for data (inclusive).
        Translates to a `WHERE` clause like `WHERE datetime >= begin`.
        Defaults to `None`.

    end: Union[datetime, int, None], default None
        Upper bound datetime to stop searching for data (exclusive).
        Translates to a `WHERE` clause like `WHERE datetime < end`.
        Defaults to `None`.

    params: Optional[Dict[str, Any]], default None
        Filter the retrieved data by a dictionary of parameters.
        See `meerschaum.utils.sql.build_where` for more details.

    as_iterator: bool, default False
        If `True`, return a generator of chunks of pipe data.

    as_chunks: bool, default False
        Alias for `as_iterator`.

    as_dask: bool, default False
        If `True`, return a `dask.DataFrame`
        (which may be loaded into a Pandas DataFrame with `df.compute()`).

    chunk_interval: Union[timedelta, int, None], default None
        If `as_iterator`, then return chunks with `begin` and `end` separated by this interval.
        This may be set under `pipe.parameters['chunk_minutes']`.
        By default, use a timedelta of 1440 minutes (1 day).
        If `chunk_interval` is an integer and the `datetime` axis a timestamp,
        then use a timedelta with the number of minutes configured to this value.
        If the `datetime` axis is an integer, default to the configured chunksize.
        If `chunk_interval` is a `timedelta` and the `datetime` axis an integer,
        use the number of minutes in the `timedelta`.

    fresh: bool, default False
        If `True`, skip local cache and directly query the instance connector.
        Defaults to `False`.

    debug: bool, default False
        Verbosity toggle.
        Defaults to `False`.

    Returns
    -------
    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.misc import iterate_chunks, items_str
    from meerschaum.utils.dtypes import to_pandas_dtype
    from meerschaum.utils.dataframe import add_missing_cols_to_df
    from meerschaum.utils.packages import attempt_import
    dd = attempt_import('dask.dataframe') if as_dask else None
    dask = attempt_import('dask') if as_dask else None

    if select_columns == '*':
        select_columns = None
    elif isinstance(select_columns, str):
        select_columns = [select_columns]

    if isinstance(omit_columns, str):
        omit_columns = [omit_columns]

    as_iterator = as_iterator or as_chunks
    if as_iterator:
        return self._get_data_as_iterator(
            select_columns=select_columns,
            omit_columns=omit_columns,
            begin=begin,
            end=end,
            params=params,
            chunk_interval=chunk_interval,
            fresh=fresh,
            debug=debug,
        )

    if as_dask:
        from multiprocessing.pool import ThreadPool
        dask_pool = ThreadPool(self.get_num_workers())
        dask.config.set(pool=dask_pool)
        chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
        bounds = self.get_chunk_bounds(
            begin=begin,
            end=end,
            bounded=False,
            chunk_interval=chunk_interval,
            debug=debug,
        )
        dask_chunks = [
            dask.delayed(self.get_data)(
                select_columns=select_columns,
                omit_columns=omit_columns,
                begin=chunk_begin,
                end=chunk_end,
                params=params,
                chunk_interval=chunk_interval,
                fresh=fresh,
                debug=debug,
            )
            for (chunk_begin, chunk_end) in bounds
        ]
        dask_meta = {
            col: to_pandas_dtype(typ)
            for col, typ in self.dtypes.items()
        }
        return dd.from_delayed(dask_chunks, meta=dask_meta)

    if not self.exists(debug=debug):
        return None

    if self.cache_pipe is not None:
        if not fresh:
            _sync_cache_tuple = self.cache_pipe.sync(
                begin=begin,
                end=end,
                params=params,
                debug=debug,
                **kw
            )
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
                fresh = True
            else:  ### Successfully synced cache.
                return self.enforce_dtypes(
                    self.cache_pipe.get_data(
                        select_columns=select_columns,
                        omit_columns=omit_columns,
                        begin=begin,
                        end=end,
                        params=params,
                        debug=debug,
                        fresh=True,
                        **kw
                    ),
                    debug=debug,
                )

    with Venv(get_connector_plugin(self.instance_connector)):
        df = self.instance_connector.get_pipe_data(
            pipe=self,
            select_columns=select_columns,
            omit_columns=omit_columns,
            begin=begin,
            end=end,
            params=params,
            debug=debug,
            **kw
        )
    if df is None:
        return df

    if not select_columns:
        select_columns = [col for col in df.columns]

    cols_to_omit = [
        col
        for col in df.columns
        if (
            col in (omit_columns or [])
            or col not in (select_columns or [])
        )
    ]
    cols_to_add = [
        col
        for col in select_columns
        if col not in df.columns
    ]
    if cols_to_omit:
        warn(
            (
                f"Received {len(cols_to_omit)} omitted column"
                + ('s' if len(cols_to_omit) != 1 else '')
                + f" for {self}. "
                + "Consider adding `select_columns` and `omit_columns` support to "
                + f"'{self.instance_connector.type}' connectors to improve performance."
            ),
            stack=False,
        )
        _cols_to_select = [col for col in df.columns if col not in cols_to_omit]
        df = df[_cols_to_select]

    if cols_to_add:
        warn(
            (
                f"Specified columns {items_str(cols_to_add)} were not found on {self}. "
                + "Adding these to the DataFrame as null columns."
            ),
            stack=False,
        )
        df = add_missing_cols_to_df(df, {col: 'string' for col in cols_to_add})

    return self.enforce_dtypes(df, debug=debug)
```
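When `as_dask` (or `as_iterator`) is set, the date range is split into `(begin, end)` chunk bounds separated by `chunk_interval`, and each bound becomes its own delayed query. A standalone sketch of that splitting (a hypothetical helper, not the real `Pipe.get_chunk_bounds`):

```python
from datetime import datetime, timedelta
from typing import List, Tuple

def chunk_bounds(
    begin: datetime,
    end: datetime,
    interval: timedelta,
) -> List[Tuple[datetime, datetime]]:
    """Split [begin, end) into consecutive half-open chunks of width `interval`."""
    bounds = []
    chunk_begin = begin
    while chunk_begin < end:
        # Clamp the final chunk so it never extends past `end`.
        chunk_end = min(chunk_begin + interval, end)
        bounds.append((chunk_begin, chunk_end))
        chunk_begin = chunk_end
    return bounds

bounds = chunk_bounds(
    datetime(2023, 1, 1),
    datetime(2023, 1, 3, 12),
    timedelta(minutes=1440),  # the documented default of one day
)
# Yields two full-day chunks plus a final half-day remainder.
```

Half-open chunks (`begin` inclusive, `end` exclusive) match the `WHERE datetime >= begin` / `WHERE datetime < end` semantics above, so consecutive chunks never double-count a row.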
````python
def get_backtrack_data(
    self,
    backtrack_minutes: Optional[int] = None,
    begin: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    fresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Optional['pd.DataFrame']:
    """
    Get the most recent data from the instance connector as a Pandas DataFrame.

    Parameters
    ----------
    backtrack_minutes: Optional[int], default None
        How many minutes from `begin` to select from.
        If `None`, use `pipe.parameters['fetch']['backtrack_minutes']`.

    begin: Union[datetime, int, None], default None
        The starting point to search for data.
        If begin is `None` (default), use the most recent observed datetime
        (AKA sync_time).

        ```
        E.g. begin = 02:00

        Search this region.        Ignore this, even if there's data.
        /  /  /  /  /  /  /  /  / |
        -----|----------|----------|----------|----------|----------|
        00:00      01:00      02:00      03:00      04:00      05:00
        ```

    params: Optional[Dict[str, Any]], default None
        The standard Meerschaum `params` query dictionary.

    fresh: bool, default False
        If `True`, ignore local cache and pull directly from the instance connector.
        Only comes into effect if a pipe was created with `cache=True`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.
    Backtrack data is a convenient way to get a pipe's data "backtracked"
    from the most recent datetime.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if not self.exists(debug=debug):
        return None

    backtrack_interval = self.get_backtrack_interval(debug=debug)
    if backtrack_minutes is None:
        backtrack_minutes = (
            (backtrack_interval.total_seconds() / 60)
            if isinstance(backtrack_interval, timedelta)
            else backtrack_interval
        )

    if self.cache_pipe is not None:
        if not fresh:
            _sync_cache_tuple = self.cache_pipe.sync(begin=begin, params=params, debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
                fresh = True
            else:  ### Successfully synced cache.
                return self.enforce_dtypes(
                    self.cache_pipe.get_backtrack_data(
                        fresh=True,
                        begin=begin,
                        backtrack_minutes=backtrack_minutes,
                        params=params,
                        debug=debug,
                        **kw
                    ),
                    debug=debug,
                )

    if hasattr(self.instance_connector, 'get_backtrack_data'):
        with Venv(get_connector_plugin(self.instance_connector)):
            return self.enforce_dtypes(
                self.instance_connector.get_backtrack_data(
                    pipe=self,
                    begin=begin,
                    backtrack_minutes=backtrack_minutes,
                    params=params,
                    debug=debug,
                    **kw
                ),
                debug=debug,
            )

    if begin is None:
        begin = self.get_sync_time(params=params, debug=debug)

    backtrack_interval = (
        timedelta(minutes=backtrack_minutes)
        if isinstance(begin, datetime)
        else backtrack_minutes
    )
    if begin is not None:
        begin = begin - backtrack_interval

    return self.get_data(
        begin=begin,
        params=params,
        debug=debug,
        **kw
    )
````
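The tail of `get_backtrack_data()` boils down to subtracting the backtrack interval from the sync time before delegating to `get_data()`. A stdlib sketch matching the diagram in the docstring (sync time 02:00, a backtrack of 120 minutes):

```python
from datetime import datetime, timedelta
from typing import Optional

def backtrack_begin(
    sync_time: Optional[datetime],
    backtrack_minutes: int,
) -> Optional[datetime]:
    """Shift the search window back from the most recent observed datetime."""
    if sync_time is None:
        return None  # an empty pipe has no sync time; search unbounded
    return sync_time - timedelta(minutes=backtrack_minutes)

begin = backtrack_begin(datetime(2023, 1, 1, 2, 0), 120)
# Searches from 00:00 onward, re-covering the last two hours of data.
```

Re-covering a window behind the sync time is what lets a pipe pick up late-arriving or revised rows without re-fetching its entire history.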
Get the most recent data from the instance connector as a Pandas DataFrame.

Parameters
- backtrack_minutes (Optional[int], default None): How many minutes from `begin` to select from. If `None`, use `pipe.parameters['fetch']['backtrack_minutes']`.
- begin (Optional[datetime], default None): The starting point to search for data. If begin is `None` (default), use the most recent observed datetime (AKA sync_time).

```
E.g. begin = 02:00

Search this region.         Ignore this, even if there's data.
/  /  /  /  /  /  /  /  /  |
-----|----------|----------|----------|----------|----------|
   00:00      01:00      02:00      03:00      04:00      05:00
```

- params (Optional[Dict[str, Any]], default None): The standard Meerschaum `params` query dictionary.
- fresh (bool, default False): If `True`, ignore the local cache and pull directly from the instance connector. Only takes effect if the pipe was created with `cache=True`.
- debug (bool, default False): Verbosity toggle.

Returns
- A `pd.DataFrame` of the pipe's data corresponding to the provided parameters.

Backtrack data is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
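The fallback at the end of the method is simple arithmetic: step `backtrack_minutes` back from the sync time, using a raw integer offset when the datetime axis is an integer. A minimal pure-Python sketch of that computation (the function name is ours, not part of the Meerschaum API):

```python
from datetime import datetime, timedelta
from typing import Union


def compute_backtrack_begin(
    sync_time: Union[datetime, int, None],
    backtrack_minutes: int,
) -> Union[datetime, int, None]:
    """Step `backtrack_minutes` back from the most recent sync time."""
    if sync_time is None:
        return None
    # Integer datetime axes backtrack by raw units rather than minutes.
    interval = (
        timedelta(minutes=backtrack_minutes)
        if isinstance(sync_time, datetime)
        else backtrack_minutes
    )
    return sync_time - interval
```

With a sync time of `02:00` and `backtrack_minutes=120`, the returned window starts at `00:00`.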
```python
def get_rowcount(
    self,
    begin: Optional[datetime] = None,
    end: Optional[datetime] = None,
    params: Optional[Dict[str, Any]] = None,
    remote: bool = False,
    debug: bool = False
) -> int:
    """
    Get a Pipe's instance or remote rowcount.

    Parameters
    ----------
    begin: Optional[datetime], default None
        Count rows where datetime > begin.

    end: Optional[datetime], default None
        Count rows where datetime < end.

    params: Optional[Dict[str, Any]], default None
        The standard Meerschaum `params` query dictionary.

    remote: bool, default False
        Count rows from a pipe's remote source.
        **NOTE**: This is experimental!

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` of the number of rows in the pipe corresponding to the provided parameters.
    Returns 0 if the pipe does not exist.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    connector = self.instance_connector if not remote else self.connector
    try:
        with Venv(get_connector_plugin(connector)):
            rowcount = connector.get_pipe_rowcount(
                self,
                begin=begin,
                end=end,
                params=params,
                remote=remote,
                debug=debug,
            )
            if rowcount is None:
                return 0
            return rowcount
    except AttributeError as e:
        warn(e)
        if remote:
            return 0
        warn(f"Failed to get a rowcount for {self}.")
        return 0
```
Get a Pipe's instance or remote rowcount.

Parameters
- begin (Optional[datetime], default None): Count rows where datetime > begin.
- end (Optional[datetime], default None): Count rows where datetime < end.
- params (Optional[Dict[str, Any]], default None): The standard Meerschaum `params` query dictionary.
- remote (bool, default False): Count rows from a pipe's remote source. **NOTE**: This is experimental!
- debug (bool, default False): Verbosity toggle.

Returns
- An `int` of the number of rows in the pipe corresponding to the provided parameters. Returns 0 if the pipe does not exist.
```python
def get_chunk_interval(
    self,
    chunk_interval: Union[timedelta, int, None] = None,
    debug: bool = False,
) -> Union[timedelta, int]:
    """
    Get the chunk interval to use for this pipe.

    Parameters
    ----------
    chunk_interval: Union[timedelta, int, None], default None
        If provided, coerce this value into the correct type.
        For example, if the datetime axis is an integer, then
        return the number of minutes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The chunk interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
    """
    default_chunk_minutes = get_config('pipes', 'parameters', 'verify', 'chunk_minutes')
    configured_chunk_minutes = self.parameters.get('verify', {}).get('chunk_minutes', None)
    chunk_minutes = (
        (configured_chunk_minutes or default_chunk_minutes)
        if chunk_interval is None
        else (
            chunk_interval
            if isinstance(chunk_interval, int)
            else int(chunk_interval.total_seconds() / 60)
        )
    )

    dt_col = self.columns.get('datetime', None)
    if dt_col is None:
        return timedelta(minutes=chunk_minutes)

    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns]')
    if 'int' in dt_dtype.lower():
        return chunk_minutes
    return timedelta(minutes=chunk_minutes)
```
Get the chunk interval to use for this pipe.

Parameters
- chunk_interval (Union[timedelta, int, None], default None): If provided, coerce this value into the correct type. For example, if the datetime axis is an integer, then return the number of minutes.
- debug (bool, default False): Verbosity toggle.

Returns
- The chunk interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
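The coercion logic can be sketched independently of a pipe. Note the 1440-minute default below is an assumption for illustration; the real default comes from `get_config('pipes', 'parameters', 'verify', 'chunk_minutes')`:

```python
from datetime import timedelta
from typing import Union


def coerce_chunk_interval(
    chunk_interval: Union[timedelta, int, None],
    default_minutes: int = 1440,       # assumption; the real default comes from config
    int_datetime_axis: bool = False,   # True when the pipe's datetime axis is an integer
) -> Union[timedelta, int]:
    """Resolve the interval to minutes, then return an int for integer
    datetime axes and a timedelta otherwise."""
    chunk_minutes = (
        default_minutes
        if chunk_interval is None
        else (
            chunk_interval
            if isinstance(chunk_interval, int)
            else int(chunk_interval.total_seconds() / 60)
        )
    )
    return chunk_minutes if int_datetime_axis else timedelta(minutes=chunk_minutes)
```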
```python
def get_chunk_bounds(
    self,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    bounded: bool = False,
    chunk_interval: Union[timedelta, int, None] = None,
    debug: bool = False,
) -> List[
    Tuple[
        Union[datetime, int, None],
        Union[datetime, int, None],
    ]
]:
    """
    Return a list of datetime bounds for iterating over the pipe's `datetime` axis.

    Parameters
    ----------
    begin: Union[datetime, int, None], default None
        If provided, do not select less than this value.
        Otherwise the first chunk will be unbounded.

    end: Union[datetime, int, None], default None
        If provided, do not select greater than or equal to this value.
        Otherwise the last chunk will be unbounded.

    bounded: bool, default False
        If `True`, do not include `None` in the first chunk.

    chunk_interval: Union[timedelta, int, None], default None
        If provided, use this interval for the size of chunk boundaries.
        The default value for this pipe may be set
        under `pipe.parameters['verify']['chunk_minutes']`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of chunk bounds (datetimes or integers).
    If unbounded, the first and last chunks will include `None`.
    """
    include_less_than_begin = not bounded and begin is None
    include_greater_than_end = not bounded and end is None
    if begin is None:
        begin = self.get_sync_time(newest=False, debug=debug)
    if end is None:
        end = self.get_sync_time(newest=True, debug=debug)
    if begin is None and end is None:
        return [(None, None)]

    ### Set the chunk interval under `pipe.parameters['verify']['chunk_minutes']`.
    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)

    ### Build a list of tuples containing the chunk boundaries
    ### so that we can sync multiple chunks in parallel.
    ### Run `verify pipes --workers 1` to sync chunks in series.
    chunk_bounds = []
    begin_cursor = begin
    while begin_cursor < end:
        end_cursor = begin_cursor + chunk_interval
        chunk_bounds.append((begin_cursor, end_cursor))
        begin_cursor = end_cursor

    ### The chunk interval might be too large.
    if not chunk_bounds and end >= begin:
        chunk_bounds = [(begin, end)]

    ### Truncate the last chunk to the end timestamp.
    if chunk_bounds[-1][1] > end:
        chunk_bounds[-1] = (chunk_bounds[-1][0], end)

    ### Pop the last chunk if its bounds are equal.
    if chunk_bounds[-1][0] == chunk_bounds[-1][1]:
        chunk_bounds = chunk_bounds[:-1]

    if include_less_than_begin:
        chunk_bounds = [(None, begin)] + chunk_bounds
    if include_greater_than_end:
        chunk_bounds = chunk_bounds + [(end, None)]

    return chunk_bounds
```
Return a list of datetime bounds for iterating over the pipe's `datetime` axis.

Parameters
- begin (Union[datetime, int, None], default None): If provided, do not select less than this value. Otherwise the first chunk will be unbounded.
- end (Union[datetime, int, None], default None): If provided, do not select greater than or equal to this value. Otherwise the last chunk will be unbounded.
- bounded (bool, default False): If `True`, do not include `None` in the first chunk.
- chunk_interval (Union[timedelta, int, None], default None): If provided, use this interval for the size of chunk boundaries. The default value for this pipe may be set under `pipe.parameters['verify']['chunk_minutes']`.
- debug (bool, default False): Verbosity toggle.

Returns
- A list of chunk bounds (datetimes or integers). If unbounded, the first and last chunks will include `None`.
```python
def register(
    self,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Register a new Pipe along with its attributes.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    kw: Any
        Keyword arguments to pass to `instance_connector.register_pipe()`.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    if self.temporary:
        return False, "Cannot register pipes created with `temporary=True` (read-only)."

    from meerschaum.utils.formatting import get_console
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin, custom_types
    from meerschaum.config._patch import apply_patch_to_config

    import warnings
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        try:
            _conn = self.connector
        except Exception as e:
            _conn = None

    if (
        _conn is not None
        and (_conn.type == 'plugin' or _conn.type in custom_types)
        and getattr(_conn, 'register', None) is not None
    ):
        try:
            with Venv(get_connector_plugin(_conn), debug=debug):
                params = self.connector.register(self)
        except Exception as e:
            get_console().print_exception()
            params = None
        params = {} if params is None else params
        if not isinstance(params, dict):
            from meerschaum.utils.warnings import warn
            warn(
                f"Invalid parameters returned from `register()` in connector {self.connector}:\n"
                + f"{params}"
            )
        else:
            self.parameters = apply_patch_to_config(params, self.parameters)

    if not self.parameters:
        cols = self.columns if self.columns else {'datetime': None, 'id': None}
        self.parameters = {
            'columns': cols,
        }

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.register_pipe(self, debug=debug, **kw)
```
Register a new Pipe along with its attributes.

Parameters
- debug (bool, default False): Verbosity toggle.
- kw (Any): Keyword arguments to pass to `instance_connector.register_pipe()`.

Returns
- A `SuccessTuple` of success, message.
Return a dictionary of a pipe's keys and parameters. These values are reflected directly from the pipes table of the instance.
Return the `columns` dictionary defined in `meerschaum.Pipe.parameters`.
If defined, return the `dtypes` dictionary defined in `meerschaum.Pipe.parameters`.
```python
def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
    """
    Check if the requested columns are defined.

    Parameters
    ----------
    *args: str
        The column names to be retrieved.

    error: bool, default False
        If `True`, raise an `Exception` if the specified column is not defined.

    Returns
    -------
    A tuple of the same size as `args`, or a `str` if `args` is a single argument.

    Examples
    --------
    >>> pipe = mrsm.Pipe('test', 'test')
    >>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
    >>> pipe.get_columns('datetime', 'id')
    ('dt', 'id')
    >>> pipe.get_columns('value', error=True)
    Exception: Missing 'value' column for Pipe('test', 'test').
    """
    from meerschaum.utils.warnings import error as _error, warn
    if not args:
        args = tuple(self.columns.keys())
    col_names = []
    for col in args:
        col_name = None
        try:
            col_name = self.columns[col]
            if col_name is None and error:
                _error(f"Please define the name of the '{col}' column for {self}.")
        except Exception as e:
            col_name = None
        if col_name is None and error:
            _error(f"Missing '{col}' column for {self}.")
        col_names.append(col_name)
    if len(col_names) == 1:
        return col_names[0]
    return tuple(col_names)
```
Check if the requested columns are defined.

Parameters
- *args (str): The column names to be retrieved.
- error (bool, default False): If `True`, raise an `Exception` if the specified column is not defined.

Returns
- A tuple of the same size as `args`, or a `str` if `args` is a single argument.
Examples
>>> pipe = mrsm.Pipe('test', 'test')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> pipe.get_columns('datetime', 'id')
('dt', 'id')
>>> pipe.get_columns('value', error=True)
Exception: Missing 'value' column for Pipe('test', 'test').
```python
def get_columns_types(self, debug: bool = False) -> Union[Dict[str, str], None]:
    """
    Get a dictionary of a pipe's column names and their types.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of column names (`str`) to column types (`str`).

    Examples
    --------
    >>> pipe.get_columns_types()
    {
      'dt': 'TIMESTAMP WITHOUT TIMEZONE',
      'id': 'BIGINT',
      'val': 'DOUBLE PRECISION',
    }
    >>>
    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.get_pipe_columns_types(self, debug=debug)
```
Get a dictionary of a pipe's column names and their types.

Parameters
- debug (bool, default False): Verbosity toggle.

Returns
- A dictionary of column names (`str`) to column types (`str`).
Examples
>>> pipe.get_columns_types()
{
'dt': 'TIMESTAMP WITHOUT TIMEZONE',
'id': 'BIGINT',
'val': 'DOUBLE PRECISION',
}
>>>
```python
def get_indices(self) -> Dict[str, str]:
    """
    Return a dictionary in the form of `pipe.columns`, but mapped to index names.
    """
    return {
        ix: (self.target + '_' + col + '_index')
        for ix, col in self.columns.items() if col
    }
```
Return a dictionary in the form of `pipe.columns`, but mapped to index names.
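The naming scheme can be demonstrated standalone (the target name below is hypothetical):

```python
from typing import Dict, Optional


def index_names(target: str, columns: Dict[str, Optional[str]]) -> Dict[str, str]:
    """Mirror the naming scheme: '<target>_<column>_index' for each defined column."""
    return {ix: f"{target}_{col}_index" for ix, col in columns.items() if col}
```

Columns set to `None` are skipped, so only defined columns receive index names.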
```python
def get_id(self, **kw: Any) -> Union[int, None]:
    """
    Fetch a pipe's ID from its instance connector.
    If the pipe does not exist, return `None`.
    """
    if self.temporary:
        return None
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.get_pipe_id(self, **kw)
```
Fetch a pipe's ID from its instance connector. If the pipe does not exist, return `None`.
```python
def get_val_column(self, debug: bool = False) -> Union[str, None]:
    """
    Return the name of the value column if it's defined; otherwise make an educated guess.
    If not set in the `columns` dictionary, return the first numeric column that is not
    an ID or datetime column.
    If none can be found, return `None`.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    Either a string or `None`.
    """
    from meerschaum.utils.debug import dprint
    if debug:
        dprint('Attempting to determine the value column...')
    try:
        val_name = self.get_columns('value')
    except Exception as e:
        val_name = None
    if val_name is not None:
        if debug:
            dprint(f"Value column: {val_name}")
        return val_name

    cols = self.columns
    if cols is None:
        if debug:
            dprint('No columns could be determined. Returning...')
        return None
    try:
        dt_name = self.get_columns('datetime', error=False)
    except Exception as e:
        dt_name = None
    try:
        id_name = self.get_columns('id', error=False)
    except Exception as e:
        id_name = None

    if debug:
        dprint(f"dt_name: {dt_name}")
        dprint(f"id_name: {id_name}")

    cols_types = self.get_columns_types(debug=debug)
    if cols_types is None:
        return None
    if debug:
        dprint(f"cols_types: {cols_types}")
    if dt_name is not None:
        cols_types.pop(dt_name, None)
    if id_name is not None:
        cols_types.pop(id_name, None)

    candidates = []
    candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric'}
    for search_term in candidate_keywords:
        for col, typ in cols_types.items():
            if search_term in typ.lower():
                candidates.append(col)
                break
    if not candidates:
        if debug:
            dprint("No value column could be determined.")
        return None

    return candidates[0]
```
Return the name of the value column if it's defined; otherwise make an educated guess.
If not set in the `columns` dictionary, return the first numeric column that is not an ID or datetime column.
If none can be found, return `None`.

Parameters
- debug (bool, default False): Verbosity toggle.

Returns
- Either a string or `None`.
The target table name. You can set the target name under one of the following keys (checked in this order):

- `target`
- `target_name`
- `target_table`
- `target_table_name`
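A sketch of that lookup order (the default-name fallback parameter is our own simplification; in practice Meerschaum derives the default table name from the pipe's keys):

```python
from typing import Dict

TARGET_KEYS = ('target', 'target_name', 'target_table', 'target_table_name')


def resolve_target(parameters: Dict[str, str], default: str) -> str:
    """Return the first of the documented keys that is set,
    else fall back to a default table name."""
    for key in TARGET_KEYS:
        value = parameters.get(key)
        if value:
            return value
    return default
```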
```python
def guess_datetime(self) -> Union[str, None]:
    """
    Try to determine a pipe's datetime column.
    """
    dtypes = self.dtypes

    ### Abort if the user explicitly disallows a datetime index.
    if 'datetime' in dtypes:
        if dtypes['datetime'] is None:
            return None

    dt_cols = [
        col for col, typ in self.dtypes.items()
        if str(typ).startswith('datetime')
    ]
    if not dt_cols:
        return None
    return dt_cols[0]
```
Try to determine a pipe's datetime column.
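The same heuristic, extracted as a standalone function over a `dtypes` dictionary:

```python
from typing import Dict, Optional


def guess_datetime_column(dtypes: Dict[str, Optional[str]]) -> Optional[str]:
    """Honor an explicit opt-out (a 'datetime' entry set to None),
    else return the first column whose dtype string starts with 'datetime'."""
    if 'datetime' in dtypes and dtypes['datetime'] is None:
        return None
    for col, typ in dtypes.items():
        if str(typ).startswith('datetime'):
            return col
    return None
```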
```python
def show(
    self,
    nopretty: bool = False,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Show attributes of a Pipe.

    Parameters
    ----------
    nopretty: bool, default False
        If `True`, simply print the JSON of the pipe's attributes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    import json
    from meerschaum.utils.formatting import (
        pprint, make_header, ANSI, highlight_pipes, fill_ansi, get_console,
    )
    from meerschaum.utils.packages import import_rich, attempt_import
    from meerschaum.utils.warnings import info
    attributes_json = json.dumps(self.attributes)
    if not nopretty:
        _to_print = f"Attributes for {self}:"
        if ANSI:
            _to_print = fill_ansi(highlight_pipes(make_header(_to_print)), 'magenta')
            print(_to_print)
            rich = import_rich()
            rich_json = attempt_import('rich.json')
            get_console().print(rich_json.JSON(attributes_json))
        else:
            print(_to_print)
    else:
        print(attributes_json)

    return True, "Success"
```
Show attributes of a Pipe.

Parameters
- nopretty (bool, default False): If `True`, simply print the JSON of the pipe's attributes.
- debug (bool, default False): Verbosity toggle.

Returns
- A `SuccessTuple` of success, message.
```python
def edit(
    self,
    patch: bool = False,
    interactive: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit a Pipe's configuration.

    Parameters
    ----------
    patch: bool, default False
        If `patch` is `True`, update parameters by cascading rather than overwriting.
    interactive: bool, default False
        If `True`, open an editor for the user to make changes to the pipe's YAML file.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if self.temporary:
        return False, "Cannot edit pipes created with `temporary=True` (read-only)."

    if not interactive:
        with Venv(get_connector_plugin(self.instance_connector)):
            return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)

    from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
    from meerschaum.utils.misc import edit_file
    parameters_filename = str(self) + '.yaml'
    parameters_path = PIPES_CACHE_RESOURCES_PATH / parameters_filename

    from meerschaum.utils.yaml import yaml

    edit_text = f"Edit the parameters for {self}"
    edit_top = '#' * (len(edit_text) + 4)
    edit_header = edit_top + f'\n# {edit_text} #\n' + edit_top + '\n\n'

    from meerschaum.config import get_config
    parameters = dict(get_config('pipes', 'parameters', patch=True))
    from meerschaum.config._patch import apply_patch_to_config
    parameters = apply_patch_to_config(parameters, self.parameters)

    ### Write the parameters to the YAML file.
    with open(parameters_path, 'w+') as f:
        f.write(edit_header)
        yaml.dump(parameters, stream=f, sort_keys=False)

    ### Only quit editing if the YAML is valid.
    editing = True
    while editing:
        edit_file(parameters_path)
        try:
            with open(parameters_path, 'r') as f:
                file_parameters = yaml.load(f.read())
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(f"Invalid format defined for '{self}':\n\n{e}")
            input(f"Press [Enter] to correct the configuration for '{self}': ")
        else:
            editing = False

    self.parameters = file_parameters

    if debug:
        from meerschaum.utils.formatting import pprint
        pprint(self.parameters)

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
```
Edit a Pipe's configuration.

Parameters
- patch (bool, default False): If `patch` is `True`, update parameters by cascading rather than overwriting.
- interactive (bool, default False): If `True`, open an editor for the user to make changes to the pipe's YAML file.
- debug (bool, default False): Verbosity toggle.

Returns
- A `SuccessTuple` of success, message.
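The `patch=True` behavior (cascading rather than overwriting) can be illustrated with a minimal recursive merge, a hypothetical stand-in for `meerschaum.config._patch.apply_patch_to_config`:

```python
def apply_patch(config: dict, patch: dict) -> dict:
    """Cascade a patch into a config: nested dicts merge key-by-key
    instead of being replaced wholesale."""
    out = dict(config)
    for key, val in patch.items():
        if isinstance(val, dict) and isinstance(out.get(key), dict):
            out[key] = apply_patch(out[key], val)  # recurse into nested dicts
        else:
            out[key] = val
    return out
```

Patching `{'columns': {'id': 'station'}}` into existing parameters updates only the `id` column, leaving sibling keys like `columns.datetime` intact.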
```python
def edit_definition(
    self,
    yes: bool = False,
    noask: bool = False,
    force: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit a pipe's definition file and update its configuration.
    **NOTE:** This function is interactive and should not be used in automated scripts!

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    if self.temporary:
        return False, "Cannot edit pipes created with `temporary=True` (read-only)."

    from meerschaum.connectors import instance_types
    if (self.connector is None) or self.connector.type not in instance_types:
        return self.edit(interactive=True, debug=debug, **kw)

    import json
    from meerschaum.utils.warnings import info, warn
    from meerschaum.utils.debug import dprint
    from meerschaum.config._patch import apply_patch_to_config
    from meerschaum.utils.misc import edit_file

    _parameters = self.parameters
    if 'fetch' not in _parameters:
        _parameters['fetch'] = {}

    def _edit_api():
        from meerschaum.utils.prompt import prompt, yes_no
        info(
            f"Please enter the keys of the source pipe from '{self.connector}'.\n"
            + "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip."
        )

        _keys = {'connector_keys': None, 'metric_key': None, 'location_key': None}
        for k in _keys:
            _keys[k] = _parameters['fetch'].get(k, None)

        for k, v in _keys.items():
            try:
                _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v)
            except KeyboardInterrupt:
                continue
            if _keys[k] in ('', 'None', '\'None\'', '[None]'):
                _keys[k] = None

        _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys)

        info("You may optionally specify additional filter parameters as JSON.")
        print("  Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.")
        print("  For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':")
        print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': ')))
        if force or yes_no(
            "Would you like to add additional filter parameters?",
            yes=yes, noask=noask
        ):
            from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
            definition_filename = str(self) + '.json'
            definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
            try:
                definition_path.touch()
                with open(definition_path, 'w+') as f:
                    json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2)
            except Exception as e:
                return False, f"Failed writing file '{definition_path}':\n" + str(e)

            _params = None
            while True:
                edit_file(definition_path)
                try:
                    with open(definition_path, 'r') as f:
                        _params = json.load(f)
                except Exception as e:
                    warn(f'Failed to read parameters JSON:\n{e}', stack=False)
                    if force or yes_no(
                        "Would you like to try again?\n  "
                        + "If not, the parameters JSON file will be ignored.",
                        noask=noask, yes=yes
                    ):
                        continue
                    _params = None
                break
            if _params is not None:
                if 'fetch' not in _parameters:
                    _parameters['fetch'] = {}
                _parameters['fetch']['params'] = _params

        self.parameters = _parameters
        return True, "Success"

    def _edit_sql():
        import pathlib, os, textwrap
        from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
        from meerschaum.utils.misc import edit_file
        definition_filename = str(self) + '.sql'
        definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename

        sql_definition = _parameters['fetch'].get('definition', None)
        if sql_definition is None:
            sql_definition = ''
        sql_definition = textwrap.dedent(sql_definition).lstrip()

        try:
            definition_path.touch()
            with open(definition_path, 'w+') as f:
                f.write(sql_definition)
        except Exception as e:
            return False, f"Failed writing file '{definition_path}':\n" + str(e)

        edit_file(definition_path)
        try:
            with open(definition_path, 'r') as f:
                file_definition = f.read()
        except Exception as e:
            return False, f"Failed reading file '{definition_path}':\n" + str(e)

        if sql_definition == file_definition:
            return False, f"No changes made to definition for {self}."

        if ' ' not in file_definition:
            return False, f"Invalid SQL definition for {self}."

        if debug:
            dprint("Read SQL definition:\n\n" + file_definition)
        _parameters['fetch']['definition'] = file_definition
        self.parameters = _parameters
        return True, "Success"

    locals()['_edit_' + str(self.connector.type)]()
    return self.edit(interactive=False, debug=debug, **kw)
```
Edit a pipe's definition file and update its configuration.
**NOTE:** This function is interactive and should not be used in automated scripts!

Returns
- A `SuccessTuple` of success, message.
```python
def update(self, *args, **kw) -> SuccessTuple:
    """
    Update a pipe's parameters in its instance.
    """
    kw['interactive'] = False
    return self.edit(*args, **kw)
```
Update a pipe's parameters in its instance.
```python
def sync(
    self,
    df: Union[
        pd.DataFrame,
        Dict[str, List[Any]],
        List[Dict[str, Any]],
        InferFetch
    ] = InferFetch,
    begin: Union[datetime, int, str, None] = '',
    end: Union[datetime, int, None] = None,
    force: bool = False,
    retries: int = 10,
    min_seconds: int = 1,
    check_existing: bool = True,
    blocking: bool = True,
    workers: Optional[int] = None,
    callback: Optional[Callable[[Tuple[bool, str]], Any]] = None,
    error_callback: Optional[Callable[[Exception], Any]] = None,
    chunksize: Optional[int] = -1,
    sync_chunks: bool = True,
    debug: bool = False,
    _inplace: bool = True,
    **kw: Any
) -> SuccessTuple:
    """
    Fetch new data from the source and update the pipe's table with new data.

    Get new remote data via fetch, get existing data in the same time period,
    and merge the two, only keeping the unseen data.

    Parameters
    ----------
    df: Union[pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch], default InferFetch
        An optional DataFrame to sync into the pipe.
        If omitted, fetch new data from the pipe's connector.

    begin: Union[datetime, int, str, None], default ''
        Optionally specify the earliest datetime to search for data.

    end: Union[datetime, int, str, None], default None
        Optionally specify the latest datetime to search for data.

    force: bool, default False
        If `True`, keep trying to sync until `retries` attempts have been made.

    retries: int, default 10
        If `force`, how many attempts to try syncing before declaring failure.

    min_seconds: Union[int, float], default 1
        If `force`, how many seconds to sleep between retries. Defaults to `1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe.

    blocking: bool, default True
        If `True`, wait for the sync to finish and return its result; otherwise
        asynchronously sync (oxymoron?) and return success. Defaults to `True`.
        Only intended for specific scenarios.

    workers: Optional[int], default None
        If provided and the instance connector is thread-safe
        (`pipe.instance_connector.IS_THREAD_SAFE is True`),
        limit concurrent sync to this many threads.

    callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
        Callback function which expects a SuccessTuple as input.
        Only applies when `blocking=False`.

    error_callback: Optional[Callable[[Exception], Any]], default None
        Callback function which expects an Exception as input.
        Only applies when `blocking=False`.

    chunksize: Optional[int], default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to the system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.

    sync_chunks: bool, default True
        If possible, sync chunks while fetching them into memory.

    debug: bool, default False
        Verbosity toggle. Defaults to False.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    from meerschaum.utils.debug import dprint, _checkpoint
    from meerschaum.connectors import custom_types
    from meerschaum.plugins import Plugin
    from meerschaum.utils.formatting import get_console
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.misc import df_is_chunk_generator
    from meerschaum.utils.pool import get_pool
    from meerschaum.config import get_config

    if (callback is not None or error_callback is not None) and blocking:
        warn("Callback functions are only executed when `blocking=False`. Ignoring...")

    _checkpoint(_total=2, **kw)

    if chunksize == 0:
        chunksize = None
        sync_chunks = False

    kw.update({
        'begin': begin,
        'end': end,
        'force': force,
        'retries': retries,
        'min_seconds': min_seconds,
        'check_existing': check_existing,
        'blocking': blocking,
        'workers': workers,
        'callback': callback,
        'error_callback': error_callback,
        'sync_chunks': sync_chunks,
        'chunksize': chunksize,
    })

    ### NOTE: Invalidate the `_exists` cache before and after syncing.
    self._exists = None

    def _sync(
        p: 'meerschaum.Pipe',
        df: Union[
            'pd.DataFrame',
            Dict[str, List[Any]],
            List[Dict[str, Any]],
            InferFetch
        ] = InferFetch,
    ) -> SuccessTuple:
        if df is None:
            p._exists = None
            return (
                False,
                f"You passed `None` instead of data into `sync()` for {p}.\n"
                + "Omit the DataFrame to infer fetching.",
            )
        ### Ensure that the Pipe is registered.
        if not p.temporary and p.get_id(debug=debug) is None:
            ### NOTE: This may trigger an interactive session for plugins!
            register_success, register_msg = p.register(debug=debug)
            if not register_success:
                if 'already' not in register_msg:
                    p._exists = None
                    return register_success, register_msg

        ### If the connector is a plugin with a `sync()` method, return that instead.
        ### If the plugin does not have a `sync()` method but does have a `fetch()` method,
        ### use that instead.
        ### NOTE: The DataFrame must be omitted for the plugin sync method to apply.
        ### If a DataFrame is provided, continue as expected.
        if hasattr(df, 'MRSM_INFER_FETCH'):
            try:
                if p.connector is None:
                    msg = f"{p} does not have a valid connector."
                    if p.connector_keys.startswith('plugin:'):
                        msg += f"\n    Perhaps {p.connector_keys} has a syntax error?"
                    p._exists = None
                    return False, msg
            except Exception as e:
                p._exists = None
                return False, f"Unable to create the connector for {p}."

            ### Sync in place if this is a SQL pipe.
            if (
                str(self.connector) == str(self.instance_connector)
                and hasattr(self.instance_connector, 'sync_pipe_inplace')
                and _inplace
                and get_config('system', 'experimental', 'inplace_sync')
            ):
                with Venv(get_connector_plugin(self.instance_connector)):
                    p._exists = None
                    return self.instance_connector.sync_pipe_inplace(p, debug=debug, **kw)

            ### Activate and invoke `sync(pipe)` for plugin connectors with `sync` methods.
            try:
                if p.connector.type == 'plugin' and p.connector.sync is not None:
                    connector_plugin = Plugin(p.connector.label)
                    with Venv(connector_plugin, debug=debug):
                        return_tuple = p.connector.sync(p, debug=debug, **kw)
                    p._exists = None
                    if not isinstance(return_tuple, tuple):
                        return_tuple = (
                            False,
                            f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}"
                        )
                    return return_tuple
            except Exception as e:
                get_console().print_exception()
                msg = f"Failed to sync {p} with exception: '" + str(e) + "'"
                if debug:
                    error(msg, silent=False)
                p._exists = None
                return False, msg

            ### Fetch the dataframe from the connector's `fetch()` method.
            try:
                with Venv(get_connector_plugin(p.connector), debug=debug):
                    df = p.fetch(debug=debug, **kw)
            except Exception as e:
                get_console().print_exception(
                    suppress=[
                        'meerschaum/core/Pipe/_sync.py',
                        'meerschaum/core/Pipe/_fetch.py',
                    ]
                )
                msg = f"Failed to fetch data from {p.connector}:\n    {e}"
                df = None

            if df is None:
                p._exists = None
                return False, f"No data were fetched for {p}."

            if isinstance(df, list):
                if len(df) == 0:
                    return True, f"No new rows were returned for {p}."

                ### May be a chunk hook results list.
                if isinstance(df[0], tuple):
                    success = all([_success for _success, _ in df])
                    message = '\n'.join([_message for _, _message in df])
                    return success, message

            ### TODO: Deprecate async?
            if df is True:
                p._exists = None
                return True, f"{p} is being synced in parallel."

        ### CHECKPOINT: Retrieved the DataFrame.
        _checkpoint(**kw)

        ### Allow for dataframe generators or iterables.
        if df_is_chunk_generator(df):
            kw['workers'] = p.get_num_workers(kw.get('workers', None))
            dt_col = p.columns.get('datetime', None)
            pool = get_pool(workers=kw.get('workers', 1))
            if debug:
                dprint(f"Received {type(df)}. Attempting to sync first chunk...")

            try:
                chunk = next(df)
            except StopIteration:
                return True, "Received an empty generator; nothing to do."

            chunk_success, chunk_msg = _sync(p, chunk)
            chunk_msg = '\n' + self._get_chunk_label(chunk, dt_col) + '\n' + chunk_msg
            if not chunk_success:
                return chunk_success, f"Unable to sync initial chunk for {p}:\n{chunk_msg}"
            if debug:
                dprint("Successfully synced the first chunk, attempting the rest...")

            failed_chunks = []
            def _process_chunk(_chunk):
                try:
                    _chunk_success, _chunk_msg = _sync(p, _chunk)
                except Exception as e:
                    _chunk_success, _chunk_msg = False, str(e)
                if not _chunk_success:
                    failed_chunks.append(_chunk)
                return (
                    _chunk_success,
                    (
                        '\n'
                        + self._get_chunk_label(_chunk, dt_col)
                        + '\n'
                        + _chunk_msg
                    )
                )

            results = sorted(
                [(chunk_success, chunk_msg)] + (
                    list(pool.imap(_process_chunk, df))
                    if not df_is_chunk_generator(chunk)
                    else [
                        _process_chunk(_child_chunks)
                        for _child_chunks in df
                    ]
                )
            )
            chunk_messages = [chunk_msg for _, chunk_msg in results]
            success_bools = [chunk_success for chunk_success, _ in results]
            success = all(success_bools)
            msg = '\n'.join(chunk_messages)

            ### If some chunks succeeded, retry the failures.
            retry_success = True
            if not success and any(success_bools):
                if debug:
                    dprint("Retrying failed chunks...")
                chunks_to_retry = [c for c in failed_chunks]
                failed_chunks = []
                for chunk in chunks_to_retry:
                    chunk_success, chunk_msg = _process_chunk(chunk)
                    msg += f"\n\nRetried chunk:\n{chunk_msg}\n"
                    retry_success = retry_success and chunk_success

            success = success and retry_success
            return success, msg

        ### Cast to a dataframe and ensure the datatypes are what we expect.
        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)
        if debug:
            dprint(
                "DataFrame to sync:\n"
                + (
                    str(df)[:255] + '...'
                    if len(str(df)) >= 256
                    else str(df)
                ),
                **kw
            )

        ### If `force`, continue to sync until success.
        return_tuple = False, f"Did not sync {p}."
        run = True
        _retries = 1
        while run:
            with Venv(get_connector_plugin(self.instance_connector)):
                return_tuple = p.instance_connector.sync_pipe(
                    pipe=p,
                    df=df,
                    debug=debug,
                    **kw
                )
            _retries += 1
            run = (not return_tuple[0]) and force and _retries <= retries
            if run and debug:
                dprint(f"Syncing failed for {p}. Attempt ( {_retries} / {retries} )", **kw)
                dprint(f"Sleeping for {min_seconds} seconds...", **kw)
            if run:
                time.sleep(min_seconds)
        if _retries > retries:
            warn(
                f"Unable to sync {p} within {retries} attempt"
                + ("s" if retries != 1 else "") + "!"
            )

        ### CHECKPOINT: Finished syncing. Handle caching.
```
384 _checkpoint(**kw) 385 if self.cache_pipe is not None: 386 if debug: 387 dprint(f"Caching retrieved dataframe.", **kw) 388 _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw) 389 if not _sync_cache_tuple[0]: 390 warn(f"Failed to sync local cache for {self}.") 391 392 self._exists = None 393 return return_tuple 394 395 if blocking: 396 self._exists = None 397 return _sync(self, df = df) 398 399 from meerschaum.utils.threading import Thread 400 def default_callback(result_tuple : SuccessTuple): 401 dprint(f"Asynchronous result from {self}: {result_tuple}", **kw) 402 403 def default_error_callback(x : Exception): 404 dprint(f"Error received for {self}: {x}", **kw) 405 406 if callback is None and debug: 407 callback = default_callback 408 if error_callback is None and debug: 409 error_callback = default_error_callback 410 try: 411 thread = Thread( 412 target = _sync, 413 args = (self,), 414 kwargs = {'df' : df}, 415 daemon = False, 416 callback = callback, 417 error_callback = error_callback 418 ) 419 thread.start() 420 except Exception as e: 421 self._exists = None 422 return False, str(e) 423 424 self._exists = None 425 return True, f"Spawned asyncronous sync for {self}."
Fetch new data from the source and update the pipe's table with new data.
Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.
Parameters
- df (Union[None, pd.DataFrame, Dict[str, List[Any]]], default None): An optional DataFrame to sync into the pipe. Defaults to `None`.
- begin (Union[datetime, int, str, None], default ''): Optionally specify the earliest datetime to search for data.
- end (Union[datetime, int, str, None], default None): Optionally specify the latest datetime to search for data.
- force (bool, default False): If `True`, keep trying to sync until `retries` attempts are exhausted.
- retries (int, default 10): If `force`, how many attempts to try syncing before declaring failure.
- min_seconds (Union[int, float], default 1): If `force`, how many seconds to sleep between retries. Defaults to `1`.
- check_existing (bool, default True): If `True`, pull and diff with existing data from the pipe.
- blocking (bool, default True): If `True`, wait for the sync to finish and return its result; otherwise sync asynchronously and return success. Defaults to `True`. Only intended for specific scenarios.
- workers (Optional[int], default None): If provided and the instance connector is thread-safe (`pipe.instance_connector.IS_THREAD_SAFE is True`), limit concurrent sync to this many threads.
- callback (Optional[Callable[[Tuple[bool, str]], Any]], default None): Callback function which expects a SuccessTuple as input. Only applies when `blocking=False`.
- error_callback (Optional[Callable[[Exception], Any]], default None): Callback function which expects an Exception as input. Only applies when `blocking=False`.
- chunksize (int, default -1): Specify the number of rows to sync per chunk. If `-1`, resort to system configuration (default is `900`). A `chunksize` of `None` will sync all rows in one transaction.
- sync_chunks (bool, default True): If possible, sync chunks while fetching them into memory.
- debug (bool, default False): Verbosity toggle. Defaults to `False`.
Returns
- A `SuccessTuple` of success (`bool`) and message (`str`).
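The `force` / `retries` / `min_seconds` parameters describe a simple retry loop around the underlying sync call. A minimal sketch of that behavior (the helper names `sync_with_retries` and `flaky_sync` are illustrative, not part of the Meerschaum API):

```python
import time

def sync_with_retries(sync_once, force=False, retries=10, min_seconds=0.0):
    """Call `sync_once()` until it succeeds or `retries` attempts elapse."""
    return_tuple = (False, "Did not sync.")
    attempt = 1
    while True:
        return_tuple = sync_once()
        attempt += 1
        # Stop on success, when `force` is off, or when attempts run out.
        if return_tuple[0] or not force or attempt > retries:
            break
        time.sleep(min_seconds)  # back off between failed attempts
    return return_tuple

# A flaky source which fails twice before succeeding:
attempts = {'n': 0}
def flaky_sync():
    attempts['n'] += 1
    return (attempts['n'] >= 3, f"attempt {attempts['n']}")

success, msg = sync_with_retries(flaky_sync, force=True, retries=10, min_seconds=0.0)
```

With `force=False`, the loop exits after the first failure, which matches the default behavior of `pipe.sync()`.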
```python
def get_sync_time(
    self,
    params: Optional[Dict[str, Any]] = None,
    newest: bool = True,
    round_down: bool = False,
    debug: bool = False
) -> Union['datetime', None]:
    """
    Get the most recent datetime value for a Pipe.

    Parameters
    ----------
    params: Optional[Dict[str, Any]], default None
        Dictionary to build a WHERE clause for a specific column.
        See `meerschaum.utils.sql.build_where`.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (`ASC` instead of `DESC`).

    round_down: bool, default False
        If `True`, round down the datetime value to the nearest minute.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `datetime` object if the pipe exists, otherwise `None`.
    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.misc import round_time

    with Venv(get_connector_plugin(self.instance_connector)):
        sync_time = self.instance_connector.get_sync_time(
            self,
            params=params,
            newest=newest,
            debug=debug,
        )

    if not round_down or not isinstance(sync_time, datetime):
        return sync_time

    return round_time(sync_time, timedelta(minutes=1))
```
Get the most recent datetime value for a Pipe.
Parameters
- params (Optional[Dict[str, Any]], default None): Dictionary to build a WHERE clause for a specific column. See `meerschaum.utils.sql.build_where`.
- newest (bool, default True): If `True`, get the most recent datetime (honoring `params`). If `False`, get the oldest datetime (`ASC` instead of `DESC`).
- round_down (bool, default False): If `True`, round down the datetime value to the nearest minute.
- debug (bool, default False): Verbosity toggle.
Returns
- A `datetime` object if the pipe exists, otherwise `None`.
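The `round_down=True` behavior truncates the sync time to the start of its minute. A minimal sketch with the standard library (the name `round_down_to_minute` is illustrative; the real implementation delegates to `meerschaum.utils.misc.round_time`):

```python
from datetime import datetime

def round_down_to_minute(dt: datetime) -> datetime:
    """Truncate a datetime to the start of its minute."""
    return dt.replace(second=0, microsecond=0)

sync_time = datetime(2023, 1, 1, 12, 34, 56, 789000)
rounded = round_down_to_minute(sync_time)  # 2023-01-01 12:34:00
```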
```python
def exists(
    self,
    debug: bool = False
) -> bool:
    """
    See if a Pipe's table exists.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `bool` corresponding to whether a pipe's underlying table exists.
    """
    import time
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.config import STATIC_CONFIG
    from meerschaum.utils.debug import dprint
    now = time.perf_counter()
    exists_timeout_seconds = STATIC_CONFIG['pipes']['exists_timeout_seconds']

    _exists = self.__dict__.get('_exists', None)
    if _exists:
        exists_timestamp = self.__dict__.get('_exists_timestamp', None)
        if exists_timestamp is not None:
            delta = now - exists_timestamp
            if delta < exists_timeout_seconds:
                if debug:
                    dprint(f"Returning cached `exists` for {self} ({round(delta, 2)} seconds old).")
                return _exists

    with Venv(get_connector_plugin(self.instance_connector)):
        _exists = self.instance_connector.pipe_exists(pipe=self, debug=debug)

    self.__dict__['_exists'] = _exists
    self.__dict__['_exists_timestamp'] = now
    return _exists
```
See if a Pipe's table exists.
Parameters
- debug (bool, default False): Verbosity toggle.
Returns
- A `bool` corresponding to whether a pipe's underlying table exists.
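`Pipe.exists()` caches its result for a short window to avoid repeated round trips to the instance connector. The pattern can be sketched in isolation like this (the class `CachedExists` and the timeout value are illustrative, not Meerschaum API):

```python
import time

class CachedExists:
    """Cache a truthy `exists` result for a few seconds before rechecking."""
    def __init__(self, check, timeout_seconds=5.0):
        self.check = check
        self.timeout_seconds = timeout_seconds
        self._exists = None
        self._exists_timestamp = None

    def exists(self):
        now = time.perf_counter()
        if self._exists and self._exists_timestamp is not None:
            if (now - self._exists_timestamp) < self.timeout_seconds:
                return self._exists  # cached result is still fresh
        self._exists = self.check()  # otherwise perform the real check
        self._exists_timestamp = now
        return self._exists

calls = {'n': 0}
def expensive_check():
    calls['n'] += 1
    return True

cached = CachedExists(expensive_check, timeout_seconds=60.0)
first, second = cached.exists(), cached.exists()
```

Note that, like the original, this only caches truthy results: a table that does not exist yet is rechecked on every call.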
```python
def filter_existing(
    self,
    df: 'pd.DataFrame',
    safe_copy: bool = True,
    date_bound_only: bool = False,
    chunksize: Optional[int] = -1,
    debug: bool = False,
    **kw
) -> Tuple['pd.DataFrame', 'pd.DataFrame', 'pd.DataFrame']:
    """
    Inspect a dataframe and filter out rows which already exist in the pipe.

    Parameters
    ----------
    df: 'pd.DataFrame'
        The dataframe to inspect and filter.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    date_bound_only: bool, default False
        If `True`, only use the datetime index to fetch the sample dataframe.

    chunksize: Optional[int], default -1
        The `chunksize` used when fetching existing data.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of three pandas DataFrames: unseen, update, and delta.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.misc import round_time
    from meerschaum.utils.dataframe import (
        filter_unseen_df,
        add_missing_cols_to_df,
        get_unhashable_cols,
        get_numeric_cols,
    )
    from meerschaum.utils.dtypes import (
        to_pandas_dtype,
        none_if_null,
    )
    from meerschaum.config import get_config
    pd = import_pandas()
    pandas = attempt_import('pandas')
    if 'dataframe' not in str(type(df)).lower():
        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)
    is_dask = 'dask' in df.__module__
    if is_dask:
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA
    else:
        merge = pd.merge
        NA = pd.NA
    if df is None:
        return df, df, df
    if (df.empty if not is_dask else len(df) == 0):
        return df, df, df

    ### `begin` is the oldest data in the new dataframe.
    begin, end = None, None
    dt_col = self.columns.get('datetime', None)
    dt_type = self.dtypes.get(dt_col, 'datetime64[ns]') if dt_col else None
    try:
        min_dt_val = df[dt_col].min(skipna=True) if dt_col else None
        if is_dask and min_dt_val is not None:
            min_dt_val = min_dt_val.compute()
        min_dt = (
            pandas.to_datetime(min_dt_val).to_pydatetime()
            if min_dt_val is not None and 'datetime' in str(dt_type)
            else min_dt_val
        )
    except Exception as e:
        min_dt = None
    if not ('datetime' in str(type(min_dt))) or str(min_dt) == 'NaT':
        if 'int' not in str(type(min_dt)).lower():
            min_dt = None

    if isinstance(min_dt, datetime):
        begin = round_time(min_dt, to='down') - timedelta(minutes=1)
    elif dt_type and 'int' in dt_type.lower():
        begin = min_dt
    elif dt_col is None:
        begin = None

    ### `end` is the newest data in the new dataframe.
    try:
        max_dt_val = df[dt_col].max(skipna=True) if dt_col else None
        if is_dask and max_dt_val is not None:
            max_dt_val = max_dt_val.compute()
        max_dt = (
            pandas.to_datetime(max_dt_val).to_pydatetime()
            if max_dt_val is not None and 'datetime' in str(dt_type)
            else max_dt_val
        )
    except Exception as e:
        import traceback
        traceback.print_exc()
        max_dt = None

    if ('datetime' not in str(type(max_dt))) or str(min_dt) == 'NaT':
        if 'int' not in str(type(max_dt)).lower():
            max_dt = None

    if isinstance(max_dt, datetime):
        end = round_time(max_dt, to='down') + timedelta(minutes=1)
    elif dt_type and 'int' in dt_type.lower():
        end = max_dt + 1

    if max_dt is not None and min_dt is not None and min_dt > max_dt:
        warn(f"Detected minimum datetime greater than maximum datetime.")

    if begin is not None and end is not None and begin > end:
        if isinstance(begin, datetime):
            begin = end - timedelta(minutes=1)
        ### We might be using integers for the datetime axis.
        else:
            begin = end - 1

    unique_index_vals = {
        col: df[col].unique()
        for col in self.columns
        if col in df.columns and col != dt_col
    } if not date_bound_only else {}
    filter_params_index_limit = get_config('pipes', 'sync', 'filter_params_index_limit')
    _ = kw.pop('params', None)
    params = {
        col: [
            none_if_null(val)
            for val in unique_vals
        ]
        for col, unique_vals in unique_index_vals.items()
        if len(unique_vals) <= filter_params_index_limit
    } if not date_bound_only else {}

    if debug:
        dprint(f"Looking at data between '{begin}' and '{end}':", **kw)

    backtrack_df = self.get_data(
        begin=begin,
        end=end,
        chunksize=chunksize,
        params=params,
        debug=debug,
        **kw
    )
    if debug:
        dprint(f"Existing data for {self}:\n" + str(backtrack_df), **kw)
        dprint(f"Existing dtypes for {self}:\n" + str(backtrack_df.dtypes))

    ### Separate new rows from changed ones.
    on_cols = [
        col for col_key, col in self.columns.items()
        if (
            col
            and col_key != 'value'
            and col in backtrack_df.columns
        )
    ]
    self_dtypes = self.dtypes
    on_cols_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in self_dtypes.items()
        if col in on_cols
    }

    ### Detect changes between the old target and new source dataframes.
    delta_df = add_missing_cols_to_df(
        filter_unseen_df(
            backtrack_df,
            df,
            dtypes={
                col: to_pandas_dtype(typ)
                for col, typ in self_dtypes.items()
            },
            safe_copy=safe_copy,
            debug=debug
        ),
        on_cols_dtypes,
    )

    ### Cast dicts or lists to strings so we can merge.
    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
    def deserializer(x):
        return json.loads(x) if isinstance(x, str) else x

    unhashable_delta_cols = get_unhashable_cols(delta_df)
    unhashable_backtrack_cols = get_unhashable_cols(backtrack_df)
    for col in unhashable_delta_cols:
        delta_df[col] = delta_df[col].apply(serializer)
    for col in unhashable_backtrack_cols:
        backtrack_df[col] = backtrack_df[col].apply(serializer)
    casted_cols = set(unhashable_delta_cols + unhashable_backtrack_cols)

    joined_df = merge(
        delta_df.fillna(NA),
        backtrack_df.fillna(NA),
        how='left',
        on=on_cols,
        indicator=True,
        suffixes=('', '_old'),
    ) if on_cols else delta_df
    for col in casted_cols:
        if col in joined_df.columns:
            joined_df[col] = joined_df[col].apply(deserializer)
        if col in delta_df.columns:
            delta_df[col] = delta_df[col].apply(deserializer)

    ### Determine which rows are completely new.
    new_rows_mask = (joined_df['_merge'] == 'left_only') if on_cols else None
    cols = list(backtrack_df.columns)

    unseen_df = (
        joined_df
        .where(new_rows_mask)
        .dropna(how='all')[cols]
        .reset_index(drop=True)
    ) if on_cols else delta_df

    ### Rows that have already been inserted but values have changed.
    update_df = (
        joined_df
        .where(~new_rows_mask)
        .dropna(how='all')[cols]
        .reset_index(drop=True)
    ) if on_cols else None

    return unseen_df, update_df, delta_df
```
Inspect a dataframe and filter out rows which already exist in the pipe.
Parameters
- df ('pd.DataFrame'): The dataframe to inspect and filter.
- safe_copy (bool, default True): If `True`, create a copy before comparing and modifying the dataframes. Setting to `False` may mutate the DataFrames. See `meerschaum.utils.dataframe.filter_unseen_df`.
- date_bound_only (bool, default False): If `True`, only use the datetime index to fetch the sample dataframe.
- chunksize (Optional[int], default -1): The `chunksize` used when fetching existing data.
- debug (bool, default False): Verbosity toggle.
Returns
- A tuple of three pandas DataFrames: unseen, update, and delta.
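The core idea is to partition incoming rows by whether their index values already exist in the pipe: brand-new rows are inserted, changed rows are updated, and identical rows are dropped. The real implementation does this with a pandas left merge (`indicator=True`); a minimal pure-Python sketch of the same semantics (the helper `split_unseen_update` is illustrative, not Meerschaum API):

```python
def split_unseen_update(existing_rows, new_rows, key):
    """Partition `new_rows` into unseen rows (key absent from `existing_rows`)
    and update rows (key present but values changed)."""
    existing_by_key = {row[key]: row for row in existing_rows}
    unseen, update = [], []
    for row in new_rows:
        old = existing_by_key.get(row[key])
        if old is None:
            unseen.append(row)   # brand-new key: insert
        elif old != row:
            update.append(row)   # known key, changed values: update
        # identical rows are filtered out entirely
    return unseen, update

existing = [{'id': 1, 'val': 10}, {'id': 2, 'val': 20}]
incoming = [{'id': 2, 'val': 25}, {'id': 3, 'val': 30}, {'id': 1, 'val': 10}]
unseen, update = split_unseen_update(existing, incoming, key='id')
```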
```python
def get_num_workers(self, workers: Optional[int] = None) -> int:
    """
    Get the number of workers to use for concurrent syncs.

    Parameters
    ----------
    workers: Optional[int], default None
        The number of workers passed via `--workers`.

    Returns
    -------
    The number of workers, capped for safety.
    """
    is_thread_safe = getattr(self.instance_connector, 'IS_THREAD_SAFE', False)
    if not is_thread_safe:
        return 1

    engine_pool_size = (
        self.instance_connector.engine.pool.size()
        if self.instance_connector.type == 'sql'
        else None
    )
    current_num_threads = threading.active_count()
    current_num_connections = (
        self.instance_connector.engine.pool.checkedout()
        if engine_pool_size is not None
        else current_num_threads
    )
    desired_workers = (
        min(workers or engine_pool_size, engine_pool_size)
        if engine_pool_size is not None
        else workers
    )
    if desired_workers is None:
        desired_workers = (multiprocessing.cpu_count() if is_thread_safe else 1)

    return max(
        (desired_workers - current_num_connections),
        1,
    )
```
Get the number of workers to use for concurrent syncs.
Parameters
- workers (Optional[int], default None): The number of workers passed via `--workers`.
Returns
- The number of workers, capped for safety.
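The capping logic clamps the requested worker count to the connection pool's size, subtracts connections already checked out, and never drops below one. A minimal sketch of that arithmetic (the function `cap_workers` is illustrative, not Meerschaum API):

```python
def cap_workers(requested, pool_size, checked_out, cpu_count, thread_safe=True):
    """Cap a requested worker count by connection-pool capacity."""
    if not thread_safe:
        return 1  # non-thread-safe connectors always sync serially
    desired = (
        min(requested or pool_size, pool_size)
        if pool_size is not None
        else requested
    )
    if desired is None:
        desired = cpu_count  # fall back to the CPU count
    # Leave room for connections already in use, but never go below 1.
    return max(desired - checked_out, 1)

workers = cap_workers(requested=8, pool_size=5, checked_out=2, cpu_count=4)
```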
```python
def verify(
    self,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    chunk_interval: Union[timedelta, int, None] = None,
    bounded: Optional[bool] = None,
    deduplicate: bool = False,
    workers: Optional[int] = None,
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Verify the contents of the pipe by resyncing its interval.

    Parameters
    ----------
    begin: Union[datetime, int, None], default None
        If specified, only verify rows greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If specified, only verify rows less than this value.

    chunk_interval: Union[timedelta, int, None], default None
        If provided, use this as the size of the chunk boundaries.
        Defaults to the value set in `pipe.parameters['chunk_minutes']` (1440).

    bounded: Optional[bool], default None
        If `True`, do not verify older than the oldest sync time or newer than the newest.
        If `False`, verify unbounded syncs outside of the new and old sync times.
        The default behavior (`None`) is to bound only if a bound interval is set
        (e.g. `pipe.parameters['verify']['bound_days']`).

    deduplicate: bool, default False
        If `True`, deduplicate the pipe's table after the verification syncs.

    workers: Optional[int], default None
        If provided, limit the verification to this many threads.
        Use a value of `1` to sync chunks in series.

    debug: bool, default False
        Verbosity toggle.

    kwargs: Any
        All keyword arguments are passed to `pipe.sync()`.

    Returns
    -------
    A SuccessTuple indicating whether the pipe was successfully resynced.
    """
    from meerschaum.utils.pool import get_pool
    from meerschaum.utils.misc import interval_str
    workers = self.get_num_workers(workers)

    ### Skip configured bounding in parameters
    ### if `bounded` is explicitly `False`.
    bound_time = (
        self.get_bound_time(debug=debug)
        if bounded is not False
        else None
    )
    if bounded is None:
        bounded = bound_time is not None

    if bounded and begin is None:
        begin = (
            bound_time
            if bound_time is not None
            else self.get_sync_time(newest=False, debug=debug)
        )
    if bounded and end is None:
        end = self.get_sync_time(newest=True, debug=debug)

    if bounded and end is not None:
        end += (
            timedelta(minutes=1)
            if isinstance(end, datetime)
            else 1
        )

    sync_less_than_begin = not bounded and begin is None
    sync_greater_than_end = not bounded and end is None

    cannot_determine_bounds = not self.exists(debug=debug)

    if cannot_determine_bounds:
        sync_success, sync_msg = self.sync(
            begin=begin,
            end=end,
            params=params,
            workers=workers,
            debug=debug,
            **kwargs
        )
        if not sync_success:
            return sync_success, sync_msg
        if deduplicate:
            return self.deduplicate(
                begin=begin,
                end=end,
                params=params,
                workers=workers,
                debug=debug,
                **kwargs
            )
        return sync_success, sync_msg

    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
    chunk_bounds = self.get_chunk_bounds(
        begin=begin,
        end=end,
        chunk_interval=chunk_interval,
        bounded=bounded,
        debug=debug,
    )

    ### Consider it a success if no chunks need to be verified.
    if not chunk_bounds:
        if deduplicate:
            return self.deduplicate(
                begin=begin,
                end=end,
                params=params,
                workers=workers,
                debug=debug,
                **kwargs
            )
        return True, f"Could not determine chunks between '{begin}' and '{end}'; nothing to do."

    begin_to_print = (
        begin
        if begin is not None
        else (
            chunk_bounds[0][0]
            if bounded
            else chunk_bounds[0][1]
        )
    )
    end_to_print = (
        end
        if end is not None
        else (
            chunk_bounds[-1][1]
            if bounded
            else chunk_bounds[-1][0]
        )
    )

    info(
        f"Syncing {len(chunk_bounds)} chunk" + ('s' if len(chunk_bounds) != 1 else '')
        + f" ({'un' if not bounded else ''}bounded)"
        + f" of size '{interval_str(chunk_interval)}'"
        + f" between '{begin_to_print}' and '{end_to_print}'."
    )

    pool = get_pool(workers=workers)

    ### Dictionary of the form bounds -> success_tuple, e.g.:
    ### {
    ###     (2023-01-01, 2023-01-02): (True, "Success")
    ### }
    bounds_success_tuples = {}
    def process_chunk_bounds(
        chunk_begin_and_end: Tuple[
            Union[int, datetime],
            Union[int, datetime]
        ]
    ):
        if chunk_begin_and_end in bounds_success_tuples:
            return chunk_begin_and_end, bounds_success_tuples[chunk_begin_and_end]

        chunk_begin, chunk_end = chunk_begin_and_end
        return chunk_begin_and_end, self.sync(
            begin=chunk_begin,
            end=chunk_end,
            params=params,
            workers=workers,
            debug=debug,
            **kwargs
        )

    ### If we have more than one chunk, attempt to sync the first one and return if it fails.
    if len(chunk_bounds) > 1:
        first_chunk_bounds = chunk_bounds[0]
        (
            (first_begin, first_end),
            (first_success, first_msg)
        ) = process_chunk_bounds(first_chunk_bounds)
        if not first_success:
            return (
                first_success,
                f"\n{first_begin} - {first_end}\n"
                + f"Failed to sync first chunk:\n{first_msg}"
            )
        bounds_success_tuples[first_chunk_bounds] = (first_success, first_msg)

    bounds_success_tuples.update(dict(pool.map(process_chunk_bounds, chunk_bounds)))
    bounds_success_bools = {bounds: tup[0] for bounds, tup in bounds_success_tuples.items()}

    message_header = f"{begin_to_print} - {end_to_print}"
    if all(bounds_success_bools.values()):
        msg = get_chunks_success_message(bounds_success_tuples, header=message_header)
        if deduplicate:
            deduplicate_success, deduplicate_msg = self.deduplicate(
                begin=begin,
                end=end,
                params=params,
                workers=workers,
                debug=debug,
                **kwargs
            )
            return deduplicate_success, msg + '\n\n' + deduplicate_msg
        return True, msg

    chunk_bounds_to_resync = [
        bounds
        for bounds, success in zip(chunk_bounds, bounds_success_bools)
        if not success
    ]
    bounds_to_print = [
        f"{bounds[0]} - {bounds[1]}"
        for bounds in chunk_bounds_to_resync
    ]
    if bounds_to_print:
        warn(
            f"Will resync the following failed chunks:\n    "
            + '\n    '.join(bounds_to_print),
            stack=False,
        )

    retry_bounds_success_tuples = dict(pool.map(process_chunk_bounds, chunk_bounds_to_resync))
    bounds_success_tuples.update(retry_bounds_success_tuples)
    retry_bounds_success_bools = {
        bounds: tup[0]
        for bounds, tup in retry_bounds_success_tuples.items()
    }

    if all(retry_bounds_success_bools.values()):
        message = (
            get_chunks_success_message(bounds_success_tuples, header=message_header)
            + f"\nRetried {len(chunk_bounds_to_resync)} chunks."
        )
        if deduplicate:
            deduplicate_success, deduplicate_msg = self.deduplicate(
                begin=begin,
                end=end,
                params=params,
                workers=workers,
                debug=debug,
                **kwargs
            )
            return deduplicate_success, message + '\n\n' + deduplicate_msg
        return True, message

    message = get_chunks_success_message(bounds_success_tuples, header=message_header)
    if deduplicate:
        deduplicate_success, deduplicate_msg = self.deduplicate(
            begin=begin,
            end=end,
            params=params,
            workers=workers,
            debug=debug,
            **kwargs
        )
        return deduplicate_success, message + '\n\n' + deduplicate_msg
    return False, message
```
Verify the contents of the pipe by resyncing its interval.
Parameters
- begin (Union[datetime, int, None], default None): If specified, only verify rows greater than or equal to this value.
- end (Union[datetime, int, None], default None): If specified, only verify rows less than this value.
- chunk_interval (Union[timedelta, int, None], default None): If provided, use this as the size of the chunk boundaries. Defaults to the value set in `pipe.parameters['chunk_minutes']` (1440).
- bounded (Optional[bool], default None): If `True`, do not verify older than the oldest sync time or newer than the newest. If `False`, verify unbounded syncs outside of the new and old sync times. The default behavior (`None`) is to bound only if a bound interval is set (e.g. `pipe.parameters['verify']['bound_days']`).
- deduplicate (bool, default False): If `True`, deduplicate the pipe's table after the verification syncs.
- workers (Optional[int], default None): If provided, limit the verification to this many threads. Use a value of `1` to sync chunks in series.
- debug (bool, default False): Verbosity toggle.
- kwargs (Any): All keyword arguments are passed to `pipe.sync()`.
Returns
- A SuccessTuple indicating whether the pipe was successfully resynced.
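Verification works by splitting the interval `[begin, end)` into consecutive chunks of `chunk_interval` and resyncing each one. The chunking itself can be sketched with the standard library (the helper `get_chunk_bounds` here is a simplified stand-in for the pipe method of the same name):

```python
from datetime import datetime, timedelta

def get_chunk_bounds(begin, end, chunk_interval):
    """Split [begin, end) into consecutive (chunk_begin, chunk_end) pairs."""
    bounds = []
    chunk_begin = begin
    while chunk_begin < end:
        chunk_end = min(chunk_begin + chunk_interval, end)
        bounds.append((chunk_begin, chunk_end))
        chunk_begin = chunk_end  # chunks are contiguous and non-overlapping
    return bounds

bounds = get_chunk_bounds(
    datetime(2023, 1, 1),
    datetime(2023, 1, 4),
    timedelta(days=1),
)
# three daily chunks: (Jan 1, Jan 2), (Jan 2, Jan 3), (Jan 3, Jan 4)
```

Each pair is then passed as `begin` / `end` to `pipe.sync()`, and failed chunks are retried once before the verification is declared a failure.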
```python
def get_bound_interval(self, debug: bool = False) -> Union[timedelta, int, None]:
    """
    Return the interval used to determine the bound time (limit for verification syncs).
    If the datetime axis is an integer, just return its value.

    Below are the supported keys for the bound interval:

    - `pipe.parameters['verify']['bound_minutes']`
    - `pipe.parameters['verify']['bound_hours']`
    - `pipe.parameters['verify']['bound_days']`
    - `pipe.parameters['verify']['bound_weeks']`
    - `pipe.parameters['verify']['bound_years']`
    - `pipe.parameters['verify']['bound_seconds']`

    If multiple keys are present, the first on this priority list will be used.

    Returns
    -------
    A `timedelta` or `int` value to be used to determine the bound time.
    """
    verify_params = self.parameters.get('verify', {})
    prefix = 'bound_'
    suffixes_to_check = ('minutes', 'hours', 'days', 'weeks', 'years', 'seconds')
    keys_to_search = {
        key: val
        for key, val in verify_params.items()
        if key.startswith(prefix)
    }
    bound_time_key, bound_time_value = None, None
    for key, value in keys_to_search.items():
        for suffix in suffixes_to_check:
            if key == prefix + suffix:
                bound_time_key = key
                bound_time_value = value
                break
        if bound_time_key is not None:
            break

    if bound_time_value is None:
        return bound_time_value

    dt_col = self.columns.get('datetime', None)
    if not dt_col:
        return bound_time_value

    dt_typ = self.dtypes.get(dt_col, 'datetime64[ns]')
    if 'int' in dt_typ.lower():
        return int(bound_time_value)

    interval_type = bound_time_key.replace(prefix, '')
    return timedelta(**{interval_type: bound_time_value})
```
Return the interval used to determine the bound time (limit for verification syncs). If the datetime axis is an integer, just return its value.
Below are the supported keys for the bound interval:
- `pipe.parameters['verify']['bound_minutes']`
- `pipe.parameters['verify']['bound_hours']`
- `pipe.parameters['verify']['bound_days']`
- `pipe.parameters['verify']['bound_weeks']`
- `pipe.parameters['verify']['bound_years']`
- `pipe.parameters['verify']['bound_seconds']`
If multiple keys are present, the first on this priority list will be used.
Returns
- A `timedelta` or `int` value to be used to determine the bound time.
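The key lookup can be sketched in isolation. This is a simplified stand-in, not the Meerschaum implementation: it checks suffixes in the documented priority order and approximates `years` as 365 days, since Python's `timedelta` has no `years` keyword:

```python
from datetime import timedelta

def pick_bound_interval(verify_params):
    """Pick the first configured `bound_*` key by priority and build a timedelta."""
    prefix = 'bound_'
    suffixes = ('minutes', 'hours', 'days', 'weeks', 'years', 'seconds')
    for suffix in suffixes:  # documented priority order
        value = verify_params.get(prefix + suffix)
        if value is None:
            continue
        if suffix == 'years':
            return timedelta(days=365 * value)  # timedelta lacks a 'years' kwarg
        return timedelta(**{suffix: value})
    return None

interval = pick_bound_interval({'bound_days': 366})
```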
```python
def get_bound_time(self, debug: bool = False) -> Union[datetime, int, None]:
    """
    The bound time is the limit at which long-running verification syncs should stop.
    A value of `None` means verification syncs should be unbounded.

    Like deriving a backtrack time from `pipe.get_sync_time()`,
    the bound time is the sync time minus a large window (e.g. 366 days).

    Verification syncs are unbounded (i.e. `bound_time is None`)
    when the oldest sync time falls within the bound interval.

    Returns
    -------
    A `datetime` or `int` corresponding to the
    `begin` bound for verification and deduplication syncs.
    """
    bound_interval = self.get_bound_interval(debug=debug)
    if bound_interval is None:
        return None

    sync_time = self.get_sync_time(debug=debug)
    if sync_time is None:
        return None

    bound_time = sync_time - bound_interval
    oldest_sync_time = self.get_sync_time(newest=False, debug=debug)

    return (
        bound_time
        if bound_time > oldest_sync_time
        else None
    )
```
The bound time is the limit at which long-running verification syncs should stop. A value of `None` means verification syncs should be unbounded.

Like deriving a backtrack time from `pipe.get_sync_time()`, the bound time is the sync time minus a large window (e.g. 366 days).

Verification syncs are unbounded (i.e. `bound_time is None`) if the oldest sync time is more recent than the computed bound time.

Returns

- A `datetime` or `int` corresponding to the `begin` bound for verification and deduplication syncs.
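The derivation above can be sketched with plain datetimes (the dates below are illustrative):

```python
from datetime import datetime, timedelta

def bound_time(sync_time, oldest_sync_time, bound_interval):
    """Return sync_time - bound_interval, or None when all data falls inside the window."""
    if bound_interval is None or sync_time is None:
        return None
    bound = sync_time - bound_interval
    # If even the oldest row is newer than the bound, the sync is unbounded.
    return bound if bound > oldest_sync_time else None

newest = datetime(2024, 1, 1)
print(bound_time(newest, datetime(2020, 1, 1), timedelta(days=366)))  # 2022-12-31 00:00:00
print(bound_time(newest, datetime(2023, 6, 1), timedelta(days=366)))  # None (all data is recent)
```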
```python
def delete(
    self,
    drop: bool = True,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `delete_pipe()` method.

    Parameters
    ----------
    drop: bool, default True
        If `True`, drop the pipe's target table.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (`bool`), message (`str`).
    """
    import os, pathlib
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if self.temporary:
        return (
            False,
            "Cannot delete pipes created with `temporary=True` (read-only). "
            + "You may want to call `pipe.drop()` instead."
        )

    if self.cache_pipe is not None:
        _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw)
        if not _drop_cache_tuple[0]:
            warn(_drop_cache_tuple[1])
        if getattr(self.cache_connector, 'flavor', None) == 'sqlite':
            _cache_db_path = pathlib.Path(self.cache_connector.database)
            try:
                os.remove(_cache_db_path)
            except Exception as e:
                warn(f"Could not delete cache file '{_cache_db_path}' for {self}:\n{e}")

    if drop:
        drop_success, drop_msg = self.drop(debug=debug)
        if not drop_success:
            warn(f"Failed to drop {self}:\n{drop_msg}")

    with Venv(get_connector_plugin(self.instance_connector)):
        result = self.instance_connector.delete_pipe(self, debug=debug, **kw)

    if not isinstance(result, tuple):
        return False, f"Received an unexpected result from '{self.instance_connector}': {result}"

    if result[0]:
        to_delete = ['_id']
        for member in to_delete:
            if member in self.__dict__:
                del self.__dict__[member]
    return result
```
Call the Pipe's instance connector's `delete_pipe()` method.

Parameters

- drop (bool, default True): If `True`, drop the pipe's target table.
- debug (bool, default False): Verbosity toggle.

Returns

- A `SuccessTuple` of success (`bool`), message (`str`).
```python
def drop(
    self,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `drop_pipe()` method.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    self._exists = False
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if self.cache_pipe is not None:
        _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw)
        if not _drop_cache_tuple[0]:
            warn(_drop_cache_tuple[1])

    with Venv(get_connector_plugin(self.instance_connector)):
        result = self.instance_connector.drop_pipe(self, debug=debug, **kw)
    return result
```
Call the Pipe's instance connector's `drop_pipe()` method.

Parameters

- debug (bool, default False): Verbosity toggle.

Returns

- A `SuccessTuple` of success, message.
```python
def clear(
    self,
    begin: Optional[datetime.datetime] = None,
    end: Optional[datetime.datetime] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `clear_pipe` method.

    Parameters
    ----------
    begin: Optional[datetime.datetime], default None
        If provided, only remove rows newer than this datetime value.

    end: Optional[datetime.datetime], default None
        If provided, only remove rows older than this datetime value (not including `end`).

    params: Optional[Dict[str, Any]], default None
        See `meerschaum.utils.sql.build_where`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` corresponding to whether this procedure completed successfully.

    Examples
    --------
    >>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
    >>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
    >>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
    >>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
    >>>
    >>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
    >>> pipe.get_data()
              dt
    0 2020-01-01
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if self.cache_pipe is not None:
        success, msg = self.cache_pipe.clear(
            begin = begin,
            end = end,
            params = params,
            debug = debug,
            **kwargs
        )
        if not success:
            warn(msg)

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.clear_pipe(
            self,
            begin = begin,
            end = end,
            params = params,
            debug = debug,
            **kwargs
        )
```
Call the Pipe's instance connector's `clear_pipe` method.

Parameters

- begin (Optional[datetime.datetime], default None): If provided, only remove rows newer than this datetime value.
- end (Optional[datetime.datetime], default None): If provided, only remove rows older than this datetime value (not including `end`).
- params (Optional[Dict[str, Any]], default None): See `meerschaum.utils.sql.build_where`.
- debug (bool, default False): Verbosity toggle.

Returns

- A `SuccessTuple` corresponding to whether this procedure completed successfully.

Examples

```python
>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
>>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
>>>
>>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
>>> pipe.get_data()
          dt
0 2020-01-01
```
```python
def deduplicate(
    self,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    chunk_interval: Union[datetime, int, None] = None,
    bounded: Optional[bool] = None,
    workers: Optional[int] = None,
    debug: bool = False,
    _use_instance_method: bool = True,
    **kwargs: Any
) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `delete_duplicates` method to delete duplicate rows.

    Parameters
    ----------
    begin: Union[datetime, int, None], default None
        If provided, only deduplicate rows newer than this datetime value.

    end: Union[datetime, int, None], default None
        If provided, only deduplicate rows older than this datetime value (not including `end`).

    params: Optional[Dict[str, Any]], default None
        Restrict deduplication to this filter (for multiplexed data streams).
        See `meerschaum.utils.sql.build_where`.

    chunk_interval: Union[timedelta, int, None], default None
        If provided, use this for the chunk bounds.
        Defaults to the value set in `pipe.parameters['chunk_minutes']` (1440).

    bounded: Optional[bool], default None
        Only check outside the oldest and newest sync times if bounded is explicitly `False`.

    workers: Optional[int], default None
        If the instance connector is thread-safe, limit concurrent syncs to this many threads.

    debug: bool, default False
        Verbosity toggle.

    kwargs: Any
        All other keyword arguments are passed to
        `pipe.sync()`, `pipe.clear()`, and `pipe.get_data()`.

    Returns
    -------
    A `SuccessTuple` corresponding to whether all of the chunks were successfully deduplicated.
    """
    from meerschaum.utils.warnings import warn, info
    from meerschaum.utils.misc import interval_str, items_str
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.pool import get_pool

    if self.cache_pipe is not None:
        success, msg = self.cache_pipe.deduplicate(
            begin = begin,
            end = end,
            params = params,
            bounded = bounded,
            debug = debug,
            _use_instance_method = _use_instance_method,
            **kwargs
        )
        if not success:
            warn(msg)

    workers = self.get_num_workers(workers=workers)
    pool = get_pool(workers=workers)

    if _use_instance_method:
        with Venv(get_connector_plugin(self.instance_connector)):
            if hasattr(self.instance_connector, 'deduplicate_pipe'):
                return self.instance_connector.deduplicate_pipe(
                    self,
                    begin = begin,
                    end = end,
                    params = params,
                    bounded = bounded,
                    debug = debug,
                    **kwargs
                )

    ### Only unbound if explicitly False.
    if bounded is None:
        bounded = True
    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)

    bound_time = self.get_bound_time(debug=debug)
    if bounded and begin is None:
        begin = (
            bound_time
            if bound_time is not None
            else self.get_sync_time(newest=False, debug=debug)
        )
    if bounded and end is None:
        end = self.get_sync_time(newest=True, debug=debug)

    if bounded and end is not None:
        end += (
            timedelta(minutes=1)
            if isinstance(end, datetime)
            else 1
        )

    chunk_bounds = self.get_chunk_bounds(
        bounded = bounded,
        begin = begin,
        end = end,
        chunk_interval = chunk_interval,
        debug = debug,
    )

    indices = [col for col in self.columns.values() if col]
    if not indices:
        return False, "Cannot deduplicate without index columns."

    dt_col = self.columns.get('datetime', None)

    def process_chunk_bounds(bounds) -> Tuple[
        Tuple[
            Union[datetime, int, None],
            Union[datetime, int, None]
        ],
        SuccessTuple
    ]:
        ### Only selecting the index values here to keep bandwidth down.
        chunk_begin, chunk_end = bounds
        chunk_df = self.get_data(
            select_columns = indices,
            begin = chunk_begin,
            end = chunk_end,
            params = params,
            debug = debug,
        )
        if chunk_df is None:
            return bounds, (True, "")
        existing_chunk_len = len(chunk_df)
        deduped_chunk_df = chunk_df.drop_duplicates(keep='last')
        deduped_chunk_len = len(deduped_chunk_df)

        if existing_chunk_len == deduped_chunk_len:
            return bounds, (True, "")

        chunk_msg_header = f"\n{chunk_begin} - {chunk_end}"
        chunk_msg_body = ""

        full_chunk = self.get_data(
            begin = chunk_begin,
            end = chunk_end,
            params = params,
            debug = debug,
        )
        if full_chunk is None or len(full_chunk) == 0:
            return bounds, (True, f"{chunk_msg_header}\nChunk is empty, skipping...")

        chunk_indices = [ix for ix in indices if ix in full_chunk.columns]
        if not chunk_indices:
            return bounds, (False, f"None of {items_str(indices)} were present in chunk.")
        try:
            full_chunk = full_chunk.drop_duplicates(
                subset = chunk_indices,
                keep = 'last'
            ).reset_index(
                drop = True,
            )
        except Exception as e:
            return (
                bounds,
                (False, f"Failed to deduplicate chunk on {items_str(chunk_indices)}:\n({e})")
            )

        clear_success, clear_msg = self.clear(
            begin = chunk_begin,
            end = chunk_end,
            params = params,
            debug = debug,
        )
        if not clear_success:
            chunk_msg_body += f"Failed to clear chunk while deduplicating:\n{clear_msg}\n"
            warn(chunk_msg_body)

        sync_success, sync_msg = self.sync(full_chunk, debug=debug)
        if not sync_success:
            chunk_msg_body += f"Failed to sync chunk while deduplicating:\n{sync_msg}\n"

        ### Finally check if the deduplication worked.
        chunk_rowcount = self.get_rowcount(
            begin = chunk_begin,
            end = chunk_end,
            params = params,
            debug = debug,
        )
        if chunk_rowcount != deduped_chunk_len:
            return bounds, (
                False, (
                    chunk_msg_header + "\n"
                    + chunk_msg_body + ("\n" if chunk_msg_body else '')
                    + "Chunk rowcounts still differ ("
                    + f"{chunk_rowcount} rowcount vs {deduped_chunk_len} chunk length)."
                )
            )

        return bounds, (
            True, (
                chunk_msg_header + "\n"
                + chunk_msg_body + ("\n" if chunk_msg_body else '')
                + f"Deduplicated chunk from {existing_chunk_len} to {chunk_rowcount} rows."
            )
        )

    info(
        f"Deduplicating {len(chunk_bounds)} chunk"
        + ('s' if len(chunk_bounds) != 1 else '')
        + f" ({'un' if not bounded else ''}bounded)"
        + f" of size '{interval_str(chunk_interval)}'"
        + f" on {self}."
    )
    bounds_success_tuples = dict(pool.map(process_chunk_bounds, chunk_bounds))
    bounds_successes = {
        bounds: success_tuple
        for bounds, success_tuple in bounds_success_tuples.items()
        if success_tuple[0]
    }
    bounds_failures = {
        bounds: success_tuple
        for bounds, success_tuple in bounds_success_tuples.items()
        if not success_tuple[0]
    }

    ### No need to retry if everything failed.
    if len(bounds_failures) > 0 and len(bounds_successes) == 0:
        return (
            False,
            (
                f"Failed to deduplicate {len(bounds_failures)} chunk"
                + ('s' if len(bounds_failures) != 1 else '')
                + ".\n"
                + "\n".join([msg for _, (_, msg) in bounds_failures.items() if msg])
            )
        )

    retry_bounds = [bounds for bounds in bounds_failures]
    if not retry_bounds:
        return (
            True,
            (
                f"Successfully deduplicated {len(bounds_successes)} chunk"
                + ('s' if len(bounds_successes) != 1 else '')
                + ".\n"
                + "\n".join([msg for _, (_, msg) in bounds_successes.items() if msg])
            ).rstrip('\n')
        )

    info(f"Retrying {len(retry_bounds)} chunks for {self}...")
    retry_bounds_success_tuples = dict(pool.map(process_chunk_bounds, retry_bounds))
    ### NOTE: Filter the retry results (not the original results) into successes and failures.
    retry_bounds_successes = {
        bounds: success_tuple
        for bounds, success_tuple in retry_bounds_success_tuples.items()
        if success_tuple[0]
    }
    retry_bounds_failures = {
        bounds: success_tuple
        for bounds, success_tuple in retry_bounds_success_tuples.items()
        if not success_tuple[0]
    }

    bounds_successes.update(retry_bounds_successes)
    if not retry_bounds_failures:
        return (
            True,
            (
                f"Successfully deduplicated {len(bounds_successes)} chunk"
                + ('s' if len(bounds_successes) != 1 else '')
                + f" ({len(retry_bounds_successes)} retried):\n"
                + "\n".join([msg for _, (_, msg) in bounds_successes.items() if msg])
            ).rstrip('\n')
        )

    return (
        False,
        (
            f"Failed to deduplicate {len(retry_bounds_failures)} chunk"
            + ('s' if len(retry_bounds_failures) != 1 else '')
            + ".\n"
            + "\n".join([msg for _, (_, msg) in retry_bounds_failures.items() if msg])
        ).rstrip('\n')
    )
```
Call the Pipe's instance connector's `delete_duplicates` method to delete duplicate rows.

Parameters

- begin (Union[datetime, int, None], default None): If provided, only deduplicate rows newer than this datetime value.
- end (Union[datetime, int, None], default None): If provided, only deduplicate rows older than this datetime value (not including `end`).
- params (Optional[Dict[str, Any]], default None): Restrict deduplication to this filter (for multiplexed data streams). See `meerschaum.utils.sql.build_where`.
- chunk_interval (Union[timedelta, int, None], default None): If provided, use this for the chunk bounds. Defaults to the value set in `pipe.parameters['chunk_minutes']` (1440).
- bounded (Optional[bool], default None): Only check outside the oldest and newest sync times if bounded is explicitly `False`.
- workers (Optional[int], default None): If the instance connector is thread-safe, limit concurrent syncs to this many threads.
- debug (bool, default False): Verbosity toggle.
- kwargs (Any): All other keyword arguments are passed to `pipe.sync()`, `pipe.clear()`, and `pipe.get_data()`.

Returns

- A `SuccessTuple` corresponding to whether all of the chunks were successfully deduplicated.
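The per-chunk, keep-last deduplication described above can be sketched with plain pandas (the `dt`/`id`/`val` columns and values here are illustrative, not from the source):

```python
import pandas as pd

# A hypothetical chunk with a duplicate ('dt', 'id') pair.
chunk = pd.DataFrame({
    'dt': ['2023-01-01', '2023-01-01', '2023-01-02'],
    'id': [1, 1, 2],
    'val': [10, 11, 20],
})

# Deduplicate on the index columns, keeping the most recent row,
# mirroring the `drop_duplicates(subset=chunk_indices, keep='last')` call above.
indices = ['dt', 'id']
deduped = chunk.drop_duplicates(subset=indices, keep='last').reset_index(drop=True)

print(len(chunk), '->', len(deduped))  # 3 -> 2
print(deduped['val'].tolist())         # [11, 20] (the later duplicate wins)
```

After a chunk is deduplicated in memory, the method clears the chunk's rows from the pipe and syncs the deduplicated frame back, then compares rowcounts to verify the result.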
```python
def bootstrap(
    self,
    debug: bool = False,
    yes: bool = False,
    force: bool = False,
    noask: bool = False,
    shell: bool = False,
    **kw
) -> SuccessTuple:
    """
    Prompt the user to create a pipe's requirements all from one method.
    This method shouldn't be used in any automated scripts because it interactively
    prompts the user and therefore may hang.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    yes: bool, default False
        Print the questions and automatically agree.

    force: bool, default False
        Skip the questions and agree anyway.

    noask: bool, default False
        Print the questions but go with the default answer.

    shell: bool, default False
        Used to determine if we are in the interactive shell.

    Returns
    -------
    A `SuccessTuple` corresponding to the success of this procedure.
    """
    from meerschaum.utils.warnings import warn, info, error
    from meerschaum.utils.prompt import prompt, yes_no
    from meerschaum.utils.formatting import pprint
    from meerschaum.config import get_config
    from meerschaum.utils.formatting._shell import clear_screen
    from meerschaum.utils.formatting import print_tuple
    from meerschaum.actions import actions
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    _clear = get_config('shell', 'clear_screen', patch=True)

    if self.get_id(debug=debug) is not None:
        delete_tuple = self.delete(debug=debug)
        if not delete_tuple[0]:
            return delete_tuple

    if _clear:
        clear_screen(debug=debug)

    _parameters = _get_parameters(self, debug=debug)
    self.parameters = _parameters
    pprint(self.parameters)
    try:
        prompt(
            f"\n    Press [Enter] to register {self} with the above configuration:",
            icon = False
        )
    except KeyboardInterrupt as e:
        return False, f"Aborting bootstrapping {self}."

    with Venv(get_connector_plugin(self.instance_connector)):
        register_tuple = self.instance_connector.register_pipe(self, debug=debug)

    if not register_tuple[0]:
        return register_tuple

    if _clear:
        clear_screen(debug=debug)

    try:
        if yes_no(
            f"Would you like to edit the definition for {self}?", yes=yes, noask=noask
        ):
            edit_tuple = self.edit_definition(debug=debug)
            if not edit_tuple[0]:
                return edit_tuple

        if yes_no(f"Would you like to try syncing {self} now?", yes=yes, noask=noask):
            sync_tuple = actions['sync'](
                ['pipes'],
                connector_keys = [self.connector_keys],
                metric_keys = [self.metric_key],
                location_keys = [self.location_key],
                mrsm_instance = str(self.instance_connector),
                debug = debug,
                shell = shell,
            )
            if not sync_tuple[0]:
                return sync_tuple
    except Exception as e:
        return False, f"Failed to bootstrap {self}:\n" + str(e)

    print_tuple((True, f"Finished bootstrapping {self}!"))
    info(
        f"You can edit this pipe later with `edit pipes` "
        + "or set the definition with `edit pipes definition`.\n"
        + "    To sync data into your pipe, run `sync pipes`."
    )

    return True, "Success"
```
Prompt the user to create a pipe's requirements all from one method. This method shouldn't be used in any automated scripts because it interactively prompts the user and therefore may hang.

Parameters

- debug (bool, default False): Verbosity toggle.
- yes (bool, default False): Print the questions and automatically agree.
- force (bool, default False): Skip the questions and agree anyway.
- noask (bool, default False): Print the questions but go with the default answer.
- shell (bool, default False): Used to determine if we are in the interactive shell.

Returns

- A `SuccessTuple` corresponding to the success of this procedure.
```python
def enforce_dtypes(
    self,
    df: 'pd.DataFrame',
    chunksize: Optional[int] = -1,
    safe_copy: bool = True,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Cast the input dataframe to the pipe's registered data types.
    If the pipe does not exist and dtypes are not set, return the dataframe.
    """
    import traceback
    from io import StringIO
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.dataframe import parse_df_datetimes, enforce_dtypes as _enforce_dtypes
    from meerschaum.utils.packages import import_pandas
    pd = import_pandas(debug=debug)
    if df is None:
        if debug:
            dprint(
                "Received None instead of a DataFrame.\n"
                + "    Skipping dtype enforcement..."
            )
        return df

    pipe_dtypes = self.dtypes

    try:
        if isinstance(df, str):
            df = parse_df_datetimes(
                pd.read_json(StringIO(df)),
                ignore_cols = [
                    col
                    for col, dtype in pipe_dtypes.items()
                    if 'datetime' not in str(dtype)
                ],
                chunksize = chunksize,
                debug = debug,
            )
        else:
            df = parse_df_datetimes(
                df,
                ignore_cols = [
                    col
                    for col, dtype in pipe_dtypes.items()
                    if 'datetime' not in str(dtype)
                ],
                chunksize = chunksize,
                debug = debug,
            )
    except Exception as e:
        warn(f"Unable to cast incoming data as a DataFrame...:\n{e}\n\n{traceback.format_exc()}")
        return None

    if not pipe_dtypes:
        if debug:
            dprint(
                f"Could not find dtypes for {self}.\n"
                + "    Skipping dtype enforcement..."
            )
        return df

    return _enforce_dtypes(df, pipe_dtypes, safe_copy=safe_copy, debug=debug)
```
Cast the input dataframe to the pipe's registered data types. If the pipe does not exist and dtypes are not set, return the dataframe.
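As a rough sketch of what dtype enforcement amounts to, here is a plain-pandas cast restricted to the columns both sides share. This is an illustrative stand-in, not Meerschaum's `meerschaum.utils.dataframe.enforce_dtypes`:

```python
import pandas as pd

def cast_to_dtypes(df: pd.DataFrame, dtypes: dict) -> pd.DataFrame:
    """Cast only the columns present in both the frame and the dtype mapping."""
    common = {col: typ for col, typ in dtypes.items() if col in df.columns}
    return df.astype(common)

df = pd.DataFrame({'id': ['1', '2'], 'val': ['1.5', '2.5']})
# The 'missing' key is silently skipped because the frame has no such column.
casted = cast_to_dtypes(df, {'id': 'int64', 'val': 'float64', 'missing': 'object'})
print(casted.dtypes.tolist())  # [dtype('int64'), dtype('float64')]
```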
```python
def infer_dtypes(self, persist: bool = False, debug: bool = False) -> Dict[str, Any]:
    """
    If `dtypes` is not set in `meerschaum.Pipe.parameters`,
    infer the data types from the underlying table if it exists.

    Parameters
    ----------
    persist: bool, default False
        If `True`, persist the inferred data types to `meerschaum.Pipe.parameters`.

    Returns
    -------
    A dictionary of strings containing the pandas data types for this Pipe.
    """
    if not self.exists(debug=debug):
        dtypes = {}
        if not self.columns:
            return {}
        dt_col = self.columns.get('datetime', None)
        if dt_col:
            if not self.parameters.get('dtypes', {}).get(dt_col, None):
                dtypes[dt_col] = 'datetime64[ns]'
        return dtypes

    from meerschaum.utils.sql import get_pd_type
    from meerschaum.utils.misc import to_pandas_dtype
    columns_types = self.get_columns_types(debug=debug)

    ### NOTE: get_columns_types() may return the types as either
    ### PostgreSQL- or Pandas-style.
    dtypes = {
        c: (
            get_pd_type(t, allow_custom_dtypes=True)
            if str(t).isupper()
            else to_pandas_dtype(t)
        )
        for c, t in columns_types.items()
    } if columns_types else {}
    if persist:
        self.dtypes = dtypes
        self.edit(interactive=False, debug=debug)
    return dtypes
```
If `dtypes` is not set in `meerschaum.Pipe.parameters`, infer the data types from the underlying table if it exists.

Parameters

- persist (bool, default False): If `True`, persist the inferred data types to `meerschaum.Pipe.parameters`.

Returns

- A dictionary of strings containing the pandas data types for this Pipe.
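The SQL-vs-pandas dispatch above relies on database types typically being reported in uppercase. A sketch of that heuristic follows; `SQL_TO_PANDAS` and `to_pd_dtype` are hypothetical stand-ins for `meerschaum.utils.sql.get_pd_type` and `meerschaum.utils.misc.to_pandas_dtype`:

```python
# Hypothetical mini-mapping; the real conversion covers many more SQL flavors.
SQL_TO_PANDAS = {
    'TIMESTAMP': 'datetime64[ns]',
    'BIGINT': 'int64',
    'DOUBLE PRECISION': 'float64',
}

def to_pd_dtype(db_type: str) -> str:
    """Uppercase types are assumed to be SQL flavors; others pass through as pandas dtypes."""
    if db_type.isupper():
        return SQL_TO_PANDAS.get(db_type, 'object')
    return db_type

print(to_pd_dtype('BIGINT'))          # int64
print(to_pd_dtype('datetime64[ns]'))  # datetime64[ns]
```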
```python
class Plugin:
    """Handle packaging of Meerschaum plugins."""
    def __init__(
        self,
        name: str,
        version: Optional[str] = None,
        user_id: Optional[int] = None,
        required: Optional[List[str]] = None,
        attributes: Optional[Dict[str, Any]] = None,
        archive_path: Optional[pathlib.Path] = None,
        venv_path: Optional[pathlib.Path] = None,
        repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None,
        repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None,
    ):
        from meerschaum.config.static import STATIC_CONFIG
        sep = STATIC_CONFIG['plugins']['repo_separator']
        _repo = None
        if sep in name:
            try:
                name, _repo = name.split(sep)
            except Exception as e:
                error(f"Invalid plugin name: '{name}'")
        self._repo_in_name = _repo

        if attributes is None:
            attributes = {}
        self.name = name
        self.attributes = attributes
        self.user_id = user_id
        self._version = version
        if required:
            self._required = required
        self.archive_path = (
            archive_path if archive_path is not None
            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
        )
        self.venv_path = (
            venv_path if venv_path is not None
            else VIRTENV_RESOURCES_PATH / self.name
        )
        self._repo_connector = repo_connector
        self._repo_keys = repo


    @property
    def repo_connector(self):
        """
        Return the repository connector for this plugin.
        NOTE: This imports the `connectors` module, which imports certain plugin modules.
        """
        if self._repo_connector is None:
            from meerschaum.connectors.parse import parse_repo_keys

            repo_keys = self._repo_keys or self._repo_in_name
            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
                error(
                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
                )
            repo_connector = parse_repo_keys(repo_keys)
            self._repo_connector = repo_connector
        return self._repo_connector


    @property
    def version(self):
        """
        Return the plugin's module version (`__version__`) if it's defined.
        """
        if self._version is None:
            try:
                self._version = self.module.__version__
            except Exception as e:
                self._version = None
        return self._version


    @property
    def module(self):
        """
        Return the Python module of the underlying plugin.
        """
        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
            if self.__file__ is None:
                return None
            from meerschaum.plugins import import_plugins
            self._module = import_plugins(str(self), warn=False)
        return self._module


    @property
    def __file__(self) -> Union[str, None]:
        """
        Return the file path (str) of the plugin if it exists, otherwise `None`.
        """
        if self.__dict__.get('_module', None) is not None:
            return self.module.__file__

        potential_dir = PLUGINS_RESOURCES_PATH / self.name
        if (
            potential_dir.exists()
            and potential_dir.is_dir()
            and (potential_dir / '__init__.py').exists()
        ):
            return str((potential_dir / '__init__.py').as_posix())

        potential_file = PLUGINS_RESOURCES_PATH / (self.name + '.py')
        if potential_file.exists() and not potential_file.is_dir():
            return str(potential_file.as_posix())

        return None


    @property
    def requirements_file_path(self) -> Union[pathlib.Path, None]:
        """
        If a file named `requirements.txt` exists, return its path.
        """
        if self.__file__ is None:
            return None
        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
        if not path.exists():
            return None
        return path


    def is_installed(self, **kw) -> bool:
        """
        Check whether a plugin is correctly installed.

        Returns
        -------
        A `bool` indicating whether a plugin exists and is successfully imported.
        """
        return self.__file__ is not None


    def make_tar(self, debug: bool = False) -> pathlib.Path:
        """
        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.

        Parameters
        ----------
        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A `pathlib.Path` to the archive file's path.
        """
        import tarfile, pathlib, subprocess, fnmatch
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.packages import attempt_import
        pathspec = attempt_import('pathspec', debug=debug)

        if not self.__file__:
            from meerschaum.utils.warnings import error
            error(f"Could not find file for plugin '{self}'.")
        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
            path = self.__file__.replace('__init__.py', '')
            is_dir = True
        else:
            path = self.__file__
            is_dir = False

        old_cwd = os.getcwd()
        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
        os.chdir(real_parent_path)

        default_patterns_to_ignore = [
            '.pyc',
            '__pycache__/',
            'eggs/',
            '__pypackages__/',
            '.git',
        ]

        def parse_gitignore() -> 'Set[str]':
            gitignore_path = pathlib.Path(path) / '.gitignore'
            if not gitignore_path.exists():
                return set()
            with open(gitignore_path, 'r', encoding='utf-8') as f:
                gitignore_text = f.read()
            return set(pathspec.PathSpec.from_lines(
                pathspec.patterns.GitWildMatchPattern,
                default_patterns_to_ignore + gitignore_text.splitlines()
            ).match_tree(path))

        patterns_to_ignore = parse_gitignore() if is_dir else set()

        if debug:
            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")

        with tarfile.open(self.archive_path, 'w:gz') as tarf:
            if not is_dir:
                tarf.add(f"{self.name}.py")
            else:
                for root, dirs, files in os.walk(self.name):
                    for f in files:
                        good_file = True
                        fp = os.path.join(root, f)
                        for pattern in patterns_to_ignore:
                            if pattern in str(fp) or f.startswith('.'):
                                good_file = False
                                break
                        if good_file:
                            if debug:
                                dprint(f"Adding '{fp}'...")
                            tarf.add(fp)

        ### Clean up and change back to the old directory.
        os.chdir(old_cwd)

        ### Change to 775 to avoid permissions issues with the API in a Docker container.
        self.archive_path.chmod(0o775)

        if debug:
            dprint(f"Created archive '{self.archive_path}'.")
        return self.archive_path


    def install(
        self,
        force: bool = False,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Extract a plugin's tar archive to the plugins directory.

        This function checks if the plugin is already installed and if the version is equal or
        greater than the existing installation.

        Parameters
        ----------
        force: bool, default False
            If `True`, continue with installation, even if required packages fail to install.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A `SuccessTuple` of success (bool) and a message (str).
        """
        if self.full_name in _ongoing_installations:
            return True, f"Already installing plugin '{self}'."
        _ongoing_installations.add(self.full_name)
        from meerschaum.utils.warnings import warn, error
        if debug:
            from meerschaum.utils.debug import dprint
        import tarfile
        import re
        import ast
        from meerschaum.plugins import sync_plugins_symlinks
        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
        from meerschaum.utils.venv import init_venv
        from meerschaum.utils.misc import safely_extract_tar
        old_cwd = os.getcwd()
        old_version = ''
        new_version = ''
        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
        temp_dir.mkdir(exist_ok=True)

        if not self.archive_path.exists():
            return False, f"Missing archive file for plugin '{self}'."
        if self.version is not None:
            old_version = self.version
            if debug:
                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")

        if debug:
            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")

        try:
            with tarfile.open(self.archive_path, 'r:gz') as tarf:
                safely_extract_tar(tarf, temp_dir)
        except Exception as e:
            warn(e)
            return False, f"Failed to extract plugin '{self.name}'."

        ### Search for version information.
        files = os.listdir(temp_dir)

        if str(files[0]) == self.name:
            is_dir = True
        elif str(files[0]) == self.name + '.py':
            is_dir = False
        else:
            error(f"Unknown format encountered for plugin '{self}'.")

        fpath = temp_dir / files[0]
        if is_dir:
            fpath = fpath / '__init__.py'

        init_venv(self.name, debug=debug)
        with open(fpath, 'r', encoding='utf-8') as f:
            init_lines = f.readlines()
        new_version = None
        for line in init_lines:
            if '__version__' not in line:
                continue
            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
            if not version_match:
                continue
            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
            break
        if not new_version:
            warn(
                f"No `__version__` defined for plugin '{self}'. "
                + "Assuming new version...",
                stack = False,
            )

        packaging_version = attempt_import('packaging.version')
        try:
            is_new_version = (not new_version and not old_version) or (
                packaging_version.parse(old_version) < packaging_version.parse(new_version)
            )
            is_same_version = new_version and old_version and (
                packaging_version.parse(old_version) == packaging_version.parse(new_version)
            )
        except Exception as e:
            is_new_version, is_same_version = True, False

        ### Determine where to permanently store the new plugin.
        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
        for path in PLUGINS_DIR_PATHS:
            files_in_plugins_dir = os.listdir(path)
            if (
                self.name in files_in_plugins_dir
                or
                (self.name + '.py') in files_in_plugins_dir
            ):
                plugin_installation_dir_path = path
                break

        success_msg = f"Successfully installed plugin '{self}'."
        success, abort = None, None

        if is_same_version and not force:
            success, msg = True, (
                f"Plugin '{self}' is up-to-date (version {old_version}).\n"
                + "    Install again with `-f` or `--force` to reinstall."
            )
            abort = True
        elif is_new_version or force:
            for src_dir, dirs, files in os.walk(temp_dir):
                if success is not None:
                    break
                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
                if not os.path.exists(dst_dir):
                    os.mkdir(dst_dir)
                for f in files:
                    src_file = os.path.join(src_dir, f)
                    dst_file = os.path.join(dst_dir, f)
                    if os.path.exists(dst_file):
                        os.remove(dst_file)

                    if debug:
                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
                    try:
                        shutil.move(src_file, dst_dir)
                    except Exception as e:
                        success, msg = False, (
                            f"Failed to install plugin '{self}': "
                            + f"Could not move file '{src_file}' to '{dst_dir}'"
                        )
                        print(msg)
                        break
            if success is None:
                success, msg = True, success_msg
        else:
            success, msg = False, (
                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
                + f"attempted version {new_version}."
            )

        shutil.rmtree(temp_dir)
        os.chdir(old_cwd)

        ### Reload the plugin's module.
        sync_plugins_symlinks(debug=debug)
        if '_module' in self.__dict__:
            del self.__dict__['_module']
        init_venv(venv=self.name, force=True, debug=debug)
        reload_meerschaum(debug=debug)

        ### If we've already failed, return here.
        if not success or abort:
            _ongoing_installations.remove(self.full_name)
            return success, msg

        ### Attempt to install dependencies.
        if not self.install_dependencies(force=force, debug=debug):
            _ongoing_installations.remove(self.full_name)
            return False, f"Failed to install dependencies for plugin '{self}'."

        ### Handle a success tuple, bool, or other value (typically None) from `setup()`.
        setup_tuple = self.setup(debug=debug)
        if isinstance(setup_tuple, tuple):
            if not setup_tuple[0]:
                success, msg = setup_tuple
        elif isinstance(setup_tuple, bool):
            if not setup_tuple:
                success, msg = False, (
                    f"Failed to run post-install setup for plugin '{self}'.\n"
                    + f"Check `setup()` in '{self.__file__}' for more information "
                    + "(no error message provided)."
                )
            else:
                success, msg = True, success_msg
        elif setup_tuple is None:
            success = True
            msg = (
                f"Post-install for plugin '{self}' returned None. "
                + "Assuming plugin successfully installed."
            )
            warn(msg)
        else:
            success = False
            msg = (
                f"Post-install for plugin '{self}' returned unexpected value "
                + f"of type '{type(setup_tuple)}': {setup_tuple}"
            )

        _ongoing_installations.remove(self.full_name)
        module = self.module
        return success, msg


    def remove_archive(
        self,
        debug: bool = False
    ) -> SuccessTuple:
        """Remove a plugin's archive file."""
        if not self.archive_path.exists():
            return True, f"Archive file for plugin '{self}' does not exist."
        try:
            self.archive_path.unlink()
        except Exception as e:
            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
        return True, "Success"


    def remove_venv(
        self,
        debug: bool = False
    ) -> SuccessTuple:
        """Remove a plugin's virtual environment."""
        if not self.venv_path.exists():
            return True, f"Virtual environment for plugin '{self}' does not exist."
        try:
            shutil.rmtree(self.venv_path)
        except Exception as e:
            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
        return True, "Success"


    def uninstall(self, debug: bool = False) -> SuccessTuple:
        """
        Remove a plugin, its virtual environment, and archive file.
```
495 """ 496 from meerschaum.utils.packages import reload_meerschaum 497 from meerschaum.plugins import sync_plugins_symlinks 498 from meerschaum.utils.warnings import warn, info 499 warnings_thrown_count: int = 0 500 max_warnings: int = 3 501 502 if not self.is_installed(): 503 info( 504 f"Plugin '{self.name}' doesn't seem to be installed.\n " 505 + "Checking for artifacts...", 506 stack = False, 507 ) 508 else: 509 real_path = pathlib.Path(os.path.realpath(self.__file__)) 510 try: 511 if real_path.name == '__init__.py': 512 shutil.rmtree(real_path.parent) 513 else: 514 real_path.unlink() 515 except Exception as e: 516 warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False) 517 warnings_thrown_count += 1 518 else: 519 info(f"Removed source files for plugin '{self.name}'.") 520 521 if self.venv_path.exists(): 522 success, msg = self.remove_venv(debug=debug) 523 if not success: 524 warn(msg, stack=False) 525 warnings_thrown_count += 1 526 else: 527 info(f"Removed virtual environment from plugin '{self.name}'.") 528 529 success = warnings_thrown_count < max_warnings 530 sync_plugins_symlinks(debug=debug) 531 self.deactivate_venv(force=True, debug=debug) 532 reload_meerschaum(debug=debug) 533 return success, ( 534 f"Successfully uninstalled plugin '{self}'." if success 535 else f"Failed to uninstall plugin '{self}'." 536 ) 537 538 539 def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]: 540 """ 541 If exists, run the plugin's `setup()` function. 542 543 Parameters 544 ---------- 545 *args: str 546 The positional arguments passed to the `setup()` function. 547 548 debug: bool, default False 549 Verbosity toggle. 550 551 **kw: Any 552 The keyword arguments passed to the `setup()` function. 553 554 Returns 555 ------- 556 A `SuccessTuple` or `bool` indicating success. 
557 558 """ 559 from meerschaum.utils.debug import dprint 560 import inspect 561 _setup = None 562 for name, fp in inspect.getmembers(self.module): 563 if name == 'setup' and inspect.isfunction(fp): 564 _setup = fp 565 break 566 567 ### assume success if no setup() is found (not necessary) 568 if _setup is None: 569 return True 570 571 sig = inspect.signature(_setup) 572 has_debug, has_kw = ('debug' in sig.parameters), False 573 for k, v in sig.parameters.items(): 574 if '**' in str(v): 575 has_kw = True 576 break 577 578 _kw = {} 579 if has_kw: 580 _kw.update(kw) 581 if has_debug: 582 _kw['debug'] = debug 583 584 if debug: 585 dprint(f"Running setup for plugin '{self}'...") 586 try: 587 self.activate_venv(debug=debug) 588 return_tuple = _setup(*args, **_kw) 589 self.deactivate_venv(debug=debug) 590 except Exception as e: 591 return False, str(e) 592 593 if isinstance(return_tuple, tuple): 594 return return_tuple 595 if isinstance(return_tuple, bool): 596 return return_tuple, f"Setup for Plugin '{self.name}' did not return a message." 597 if return_tuple is None: 598 return False, f"Setup for Plugin '{self.name}' returned None." 599 return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}" 600 601 602 def get_dependencies( 603 self, 604 debug: bool = False, 605 ) -> List[str]: 606 """ 607 If the Plugin has specified dependencies in a list called `required`, return the list. 608 609 **NOTE:** Dependecies which start with `'plugin:'` are Meerschaum plugins, not pip packages. 610 Meerschaum plugins may also specify connector keys for a repo after `'@'`. 611 612 Parameters 613 ---------- 614 debug: bool, default False 615 Verbosity toggle. 616 617 Returns 618 ------- 619 A list of required packages and plugins (str). 620 621 """ 622 if '_required' in self.__dict__: 623 return self._required 624 625 ### If the plugin has not yet been imported, 626 ### infer the dependencies from the source text. 
627 ### This is not super robust, and it doesn't feel right 628 ### having multiple versions of the logic. 629 ### This is necessary when determining the activation order 630 ### without having import the module. 631 ### For consistency's sake, the module-less method does not cache the requirements. 632 if self.__dict__.get('_module', None) is None: 633 file_path = self.__file__ 634 if file_path is None: 635 return [] 636 with open(file_path, 'r', encoding='utf-8') as f: 637 text = f.read() 638 639 if 'required' not in text: 640 return [] 641 642 ### This has some limitations: 643 ### It relies on `required` being manually declared. 644 ### We lose the ability to dynamically alter the `required` list, 645 ### which is why we've kept the module-reliant method below. 646 import ast, re 647 ### NOTE: This technically would break 648 ### if `required` was the very first line of the file. 649 req_start_match = re.search(r'\nrequired(\s?)=', text) 650 if not req_start_match: 651 return [] 652 req_start = req_start_match.start() 653 654 ### Dependencies may have brackets within the strings, so push back the index. 
655 first_opening_brace = req_start + 1 + text[req_start:].find('[') 656 if first_opening_brace == -1: 657 return [] 658 659 next_closing_brace = req_start + 1 + text[req_start:].find(']') 660 if next_closing_brace == -1: 661 return [] 662 663 start_ix = first_opening_brace + 1 664 end_ix = next_closing_brace 665 666 num_braces = 0 667 while True: 668 if '[' not in text[start_ix:end_ix]: 669 break 670 num_braces += 1 671 start_ix = end_ix 672 end_ix += text[end_ix + 1:].find(']') + 1 673 674 req_end = end_ix + 1 675 req_text = ( 676 text[req_start:req_end] 677 .lstrip() 678 .replace('required', '', 1) 679 .lstrip() 680 .replace('=', '', 1) 681 .lstrip() 682 ) 683 try: 684 required = ast.literal_eval(req_text) 685 except Exception as e: 686 warn( 687 f"Unable to determine requirements for plugin '{self.name}' " 688 + "without importing the module.\n" 689 + " This may be due to dynamically setting the global `required` list.\n" 690 + f" {e}" 691 ) 692 return [] 693 return required 694 695 import inspect 696 self.activate_venv(dependencies=False, debug=debug) 697 required = [] 698 for name, val in inspect.getmembers(self.module): 699 if name == 'required': 700 required = val 701 break 702 self._required = required 703 self.deactivate_venv(dependencies=False, debug=debug) 704 return required 705 706 707 def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]: 708 """ 709 Return a list of required Plugin objects. 
710 """ 711 from meerschaum.utils.warnings import warn 712 from meerschaum.config import get_config 713 from meerschaum.config.static import STATIC_CONFIG 714 plugins = [] 715 _deps = self.get_dependencies(debug=debug) 716 sep = STATIC_CONFIG['plugins']['repo_separator'] 717 plugin_names = [ 718 _d[len('plugin:'):] for _d in _deps 719 if _d.startswith('plugin:') and len(_d) > len('plugin:') 720 ] 721 default_repo_keys = get_config('meerschaum', 'default_repository') 722 for _plugin_name in plugin_names: 723 if sep in _plugin_name: 724 try: 725 _plugin_name, _repo_keys = _plugin_name.split(sep) 726 except Exception as e: 727 _repo_keys = default_repo_keys 728 warn( 729 f"Invalid repo keys for required plugin '{_plugin_name}'.\n " 730 + f"Will try to use '{_repo_keys}' instead.", 731 stack = False, 732 ) 733 else: 734 _repo_keys = default_repo_keys 735 plugins.append(Plugin(_plugin_name, repo=_repo_keys)) 736 return plugins 737 738 739 def get_required_packages(self, debug: bool=False) -> List[str]: 740 """ 741 Return the required package names (excluding plugins). 742 """ 743 _deps = self.get_dependencies(debug=debug) 744 return [_d for _d in _deps if not _d.startswith('plugin:')] 745 746 747 def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool: 748 """ 749 Activate the virtual environments for the plugin and its dependencies. 750 751 Parameters 752 ---------- 753 dependencies: bool, default True 754 If `True`, activate the virtual environments for required plugins. 755 756 Returns 757 ------- 758 A bool indicating success. 
759 """ 760 from meerschaum.utils.venv import venv_target_path 761 from meerschaum.utils.packages import activate_venv 762 from meerschaum.utils.misc import make_symlink, is_symlink 763 from meerschaum.config._paths import PACKAGE_ROOT_PATH 764 765 if dependencies: 766 for plugin in self.get_required_plugins(debug=debug): 767 plugin.activate_venv(debug=debug, **kw) 768 769 vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True) 770 venv_meerschaum_path = vtp / 'meerschaum' 771 772 try: 773 success, msg = True, "Success" 774 if is_symlink(venv_meerschaum_path): 775 if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH: 776 venv_meerschaum_path.unlink() 777 success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH) 778 except Exception as e: 779 success, msg = False, str(e) 780 if not success: 781 warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}") 782 783 return activate_venv(self.name, debug=debug, **kw) 784 785 786 def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool: 787 """ 788 Deactivate the virtual environments for the plugin and its dependencies. 789 790 Parameters 791 ---------- 792 dependencies: bool, default True 793 If `True`, deactivate the virtual environments for required plugins. 794 795 Returns 796 ------- 797 A bool indicating success. 798 """ 799 from meerschaum.utils.packages import deactivate_venv 800 success = deactivate_venv(self.name, debug=debug, **kw) 801 if dependencies: 802 for plugin in self.get_required_plugins(debug=debug): 803 plugin.deactivate_venv(debug=debug, **kw) 804 return success 805 806 807 def install_dependencies( 808 self, 809 force: bool = False, 810 debug: bool = False, 811 ) -> bool: 812 """ 813 If specified, install dependencies. 814 815 **NOTE:** Dependencies that start with `'plugin:'` will be installed as 816 Meerschaum plugins from the same repository as this Plugin. 
817 To install from a different repository, add the repo keys after `'@'` 818 (e.g. `'plugin:foo@api:bar'`). 819 820 Parameters 821 ---------- 822 force: bool, default False 823 If `True`, continue with the installation, even if some 824 required packages fail to install. 825 826 debug: bool, default False 827 Verbosity toggle. 828 829 Returns 830 ------- 831 A bool indicating success. 832 833 """ 834 from meerschaum.utils.packages import pip_install, venv_contains_package 835 from meerschaum.utils.debug import dprint 836 from meerschaum.utils.warnings import warn, info 837 from meerschaum.connectors.parse import parse_repo_keys 838 _deps = self.get_dependencies(debug=debug) 839 if not _deps and self.requirements_file_path is None: 840 return True 841 842 plugins = self.get_required_plugins(debug=debug) 843 for _plugin in plugins: 844 if _plugin.name == self.name: 845 warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False) 846 continue 847 _success, _msg = _plugin.repo_connector.install_plugin( 848 _plugin.name, debug=debug, force=force 849 ) 850 if not _success: 851 warn( 852 f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'" 853 + f" for plugin '{self.name}':\n" + _msg, 854 stack = False, 855 ) 856 if not force: 857 warn( 858 "Try installing with the `--force` flag to continue anyway.", 859 stack = False, 860 ) 861 return False 862 info( 863 "Continuing with installation despite the failure " 864 + "(careful, things might be broken!)...", 865 icon = False 866 ) 867 868 869 ### First step: parse `requirements.txt` if it exists. 
870 if self.requirements_file_path is not None: 871 if not pip_install( 872 requirements_file_path=self.requirements_file_path, 873 venv=self.name, debug=debug 874 ): 875 warn( 876 f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.", 877 stack = False, 878 ) 879 if not force: 880 warn( 881 "Try installing with `--force` to continue anyway.", 882 stack = False, 883 ) 884 return False 885 info( 886 "Continuing with installation despite the failure " 887 + "(careful, things might be broken!)...", 888 icon = False 889 ) 890 891 892 ### Don't reinstall packages that are already included in required plugins. 893 packages = [] 894 _packages = self.get_required_packages(debug=debug) 895 accounted_for_packages = set() 896 for package_name in _packages: 897 for plugin in plugins: 898 if venv_contains_package(package_name, plugin.name): 899 accounted_for_packages.add(package_name) 900 break 901 packages = [pkg for pkg in _packages if pkg not in accounted_for_packages] 902 903 ### Attempt pip packages installation. 904 if packages: 905 for package in packages: 906 if not pip_install(package, venv=self.name, debug=debug): 907 warn( 908 f"Failed to install required package '{package}'" 909 + f" for plugin '{self.name}'.", 910 stack = False, 911 ) 912 if not force: 913 warn( 914 "Try installing with `--force` to continue anyway.", 915 stack = False, 916 ) 917 return False 918 info( 919 "Continuing with installation despite the failure " 920 + "(careful, things might be broken!)...", 921 icon = False 922 ) 923 return True 924 925 926 @property 927 def full_name(self) -> str: 928 """ 929 Include the repo keys with the plugin's name. 
930 """ 931 from meerschaum.config.static import STATIC_CONFIG 932 sep = STATIC_CONFIG['plugins']['repo_separator'] 933 return self.name + sep + str(self.repo_connector) 934 935 936 def __str__(self): 937 return self.name 938 939 940 def __repr__(self): 941 return f"Plugin('{self.name}', repo='{self.repo_connector}')" 942 943 944 def __del__(self): 945 pass
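The `__version__` scan inside `install()` above can be exercised in isolation. Below is a minimal sketch of the same regex-plus-`ast.literal_eval` approach; the helper name `extract_version` is mine, not Meerschaum's, and like the original it assumes the version literal contains no `=`:

```python
import ast
import re

def extract_version(source_text: str):
    """Return a module's `__version__` string without importing it, or None."""
    for line in source_text.splitlines():
        if '__version__' not in line:
            continue
        # Match assignments like `__version__ = '1.2.3'`.
        if not re.search(r'__version__(\s?)=', line.strip()):
            continue
        # literal_eval safely parses the quoted string; no import required.
        return ast.literal_eval(line.split('=')[1].strip())
    return None
```

Reading the version statically keeps `install()` from executing untrusted plugin code just to compare versions.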
Handle packaging of Meerschaum plugins.
```python
    def __init__(
        self,
        name: str,
        version: Optional[str] = None,
        user_id: Optional[int] = None,
        required: Optional[List[str]] = None,
        attributes: Optional[Dict[str, Any]] = None,
        archive_path: Optional[pathlib.Path] = None,
        venv_path: Optional[pathlib.Path] = None,
        repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None,
        repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None,
    ):
        from meerschaum.config.static import STATIC_CONFIG
        sep = STATIC_CONFIG['plugins']['repo_separator']
        _repo = None
        if sep in name:
            try:
                name, _repo = name.split(sep)
            except Exception as e:
                error(f"Invalid plugin name: '{name}'")
        self._repo_in_name = _repo

        if attributes is None:
            attributes = {}
        self.name = name
        self.attributes = attributes
        self.user_id = user_id
        self._version = version
        if required:
            self._required = required
        self.archive_path = (
            archive_path if archive_path is not None
            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
        )
        self.venv_path = (
            venv_path if venv_path is not None
            else VIRTENV_RESOURCES_PATH / self.name
        )
        self._repo_connector = repo_connector
        self._repo_keys = repo
```
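The constructor's name parsing can be sketched on its own. Elsewhere in this module the repo separator appears as `'@'` (e.g. `'plugin:foo@api:bar'`), so this illustrative helper (`split_plugin_name` is not a Meerschaum function) assumes that value:

```python
def split_plugin_name(name: str, sep: str = '@'):
    """Split 'name@repo:keys' into (name, repo_keys); repo_keys is None if absent."""
    if sep not in name:
        return name, None
    # Like the constructor, this raises if more than one separator is present.
    plugin_name, repo_keys = name.split(sep)
    return plugin_name, repo_keys
```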
Return the repository connector for this plugin.

**NOTE:** This imports the `connectors` module, which imports certain plugin modules.
If a file named `requirements.txt` exists, return its path.
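For a package-style plugin (one whose `__file__` points at an `__init__.py`), the expected location can be computed directly. A small sketch (`requirements_file_path` here is an illustrative standalone function, and unlike the real property it skips the existence check):

```python
import pathlib

def requirements_file_path(plugin_file: str) -> pathlib.Path:
    """Compute where a package-style plugin's requirements.txt would live."""
    # The plugin's requirements.txt sits beside its __init__.py.
    return pathlib.Path(plugin_file).parent / 'requirements.txt'
```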
```python
    def is_installed(self, **kw) -> bool:
        """
        Check whether a plugin is correctly installed.

        Returns
        -------
        A `bool` indicating whether a plugin exists and is successfully imported.
        """
        return self.__file__ is not None
```
Check whether a plugin is correctly installed.

Returns

- A `bool` indicating whether a plugin exists and is successfully imported.
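The `__file__`-based check can be emulated for ordinary importable modules with `importlib`. A sketch (using a stdlib module as a stand-in for a plugin; `module_file` is my name):

```python
import importlib.util

def module_file(name: str):
    """Return the resolved source file for an importable module, or None."""
    spec = importlib.util.find_spec(name)
    if spec is None:
        return None
    return spec.origin
```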
```python
    def make_tar(self, debug: bool = False) -> pathlib.Path:
        """
        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.

        Parameters
        ----------
        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A `pathlib.Path` to the archive file's path.

        """
        import tarfile, pathlib, subprocess, fnmatch
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.packages import attempt_import
        pathspec = attempt_import('pathspec', debug=debug)

        if not self.__file__:
            from meerschaum.utils.warnings import error
            error(f"Could not find file for plugin '{self}'.")
        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
            path = self.__file__.replace('__init__.py', '')
            is_dir = True
        else:
            path = self.__file__
            is_dir = False

        old_cwd = os.getcwd()
        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
        os.chdir(real_parent_path)

        default_patterns_to_ignore = [
            '.pyc',
            '__pycache__/',
            'eggs/',
            '__pypackages__/',
            '.git',
        ]

        def parse_gitignore() -> 'Set[str]':
            gitignore_path = pathlib.Path(path) / '.gitignore'
            if not gitignore_path.exists():
                return set()
            with open(gitignore_path, 'r', encoding='utf-8') as f:
                gitignore_text = f.read()
            return set(pathspec.PathSpec.from_lines(
                pathspec.patterns.GitWildMatchPattern,
                default_patterns_to_ignore + gitignore_text.splitlines()
            ).match_tree(path))

        patterns_to_ignore = parse_gitignore() if is_dir else set()

        if debug:
            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")

        with tarfile.open(self.archive_path, 'w:gz') as tarf:
            if not is_dir:
                tarf.add(f"{self.name}.py")
            else:
                for root, dirs, files in os.walk(self.name):
                    for f in files:
                        good_file = True
                        fp = os.path.join(root, f)
                        for pattern in patterns_to_ignore:
                            if pattern in str(fp) or f.startswith('.'):
                                good_file = False
                                break
                        if good_file:
                            if debug:
                                dprint(f"Adding '{fp}'...")
                            tarf.add(fp)

        ### clean up and change back to old directory
        os.chdir(old_cwd)

        ### change to 775 to avoid permissions issues with the API in a Docker container
        self.archive_path.chmod(0o775)

        if debug:
            dprint(f"Created archive '{self.archive_path}'.")
        return self.archive_path
```
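The inner file filter of `make_tar()` can be tested on its own. A simplified sketch of just the substring and dot-file checks (leaving aside the `pathspec`/`.gitignore` handling); note that, mirroring the loop above, the dot-file check only runs when at least one ignore pattern exists:

```python
import os

def should_include(fp: str, patterns_to_ignore) -> bool:
    """Mirror make_tar's filter: drop ignored patterns and hidden files."""
    fname = os.path.basename(fp)
    for pattern in patterns_to_ignore:
        # Hidden files are only skipped inside this loop, as in the original.
        if pattern in str(fp) or fname.startswith('.'):
            return False
    return True
```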
Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.

Parameters

- `debug` (bool, default `False`): Verbosity toggle.

Returns

- A `pathlib.Path` to the archive file.
```python
    def install(
        self,
        force: bool = False,
        debug: bool = False,
    ) -> SuccessTuple:
```
Extract a plugin's tar archive to the plugins directory.

This function checks whether the plugin is already installed and whether the new version is equal to or greater than the existing installation.

Parameters

- `force` (bool, default `False`): If `True`, continue with installation even if required packages fail to install.
- `debug` (bool, default `False`): Verbosity toggle.

Returns

- A `SuccessTuple` of success (bool) and a message (str).
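The version gate described above (skip when up-to-date, install when newer or forced, refuse a downgrade) can be condensed into a small decision function. Meerschaum itself compares versions with `packaging.version`; the integer-tuple parser below is a simplified stand-in for illustration, and `version_action` is my name:

```python
def version_action(old: str, new: str, force: bool = False) -> str:
    """Decide whether to 'skip', 'install', or 'refuse' based on dotted versions."""
    parse = lambda v: tuple(int(part) for part in v.split('.'))
    if parse(old) == parse(new):
        # Same version: reinstall only when forced.
        return 'install' if force else 'skip'
    if parse(new) > parse(old) or force:
        return 'install'
    return 'refuse'  # the installed version is higher than the attempted one
```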
```python
    def remove_archive(self, debug: bool = False) -> SuccessTuple:
```
Remove a plugin's archive file.
```python
    def remove_venv(self, debug: bool = False) -> SuccessTuple:
```
Remove a plugin's virtual environment.
```python
    def uninstall(self, debug: bool = False) -> SuccessTuple:
```
Remove a plugin, its virtual environment, and archive file.
```python
    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
```

If it exists, run the plugin's `setup()` function.

Parameters

- `*args` (str): The positional arguments passed to the `setup()` function.
- `debug` (bool, default `False`): Verbosity toggle.
- `**kw` (Any): The keyword arguments passed to the `setup()` function.

Returns

- A `SuccessTuple` or `bool` indicating success.
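The argument filtering in `setup()` (only forward `debug` and `**kw` when the target function accepts them) is a reusable pattern. A standalone sketch using `inspect.signature`; `call_with_supported_kwargs` is an illustrative name, not a Meerschaum API:

```python
import inspect

def call_with_supported_kwargs(func, *args, **kwargs):
    """Pass only the keyword arguments the function's signature can accept."""
    sig = inspect.signature(func)
    has_var_kw = any(
        p.kind == inspect.Parameter.VAR_KEYWORD
        for p in sig.parameters.values()
    )
    if has_var_kw:
        # A **kwargs catch-all accepts everything.
        return func(*args, **kwargs)
    supported = {k: v for k, v in kwargs.items() if k in sig.parameters}
    return func(*args, **supported)
```

This lets plugin authors declare `setup()` with whatever subset of parameters they care about.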
```python
def get_dependencies(
    self,
    debug: bool = False,
) -> List[str]:
    """
    If the Plugin has specified dependencies in a list called `required`, return the list.

    **NOTE:** Dependencies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
    Meerschaum plugins may also specify connector keys for a repo after `'@'`.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of required packages and plugins (str).
    """
    if '_required' in self.__dict__:
        return self._required

    ### If the plugin has not yet been imported,
    ### infer the dependencies from the source text.
    ### This is not super robust, and it doesn't feel right
    ### having multiple versions of the logic.
    ### This is necessary when determining the activation order
    ### without having imported the module.
    ### For consistency's sake, the module-less method does not cache the requirements.
    if self.__dict__.get('_module', None) is None:
        file_path = self.__file__
        if file_path is None:
            return []
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()

        if 'required' not in text:
            return []

        ### This has some limitations:
        ### It relies on `required` being manually declared.
        ### We lose the ability to dynamically alter the `required` list,
        ### which is why we've kept the module-reliant method below.
        import ast, re
        ### NOTE: This technically would break
        ### if `required` was the very first line of the file.
        req_start_match = re.search(r'\nrequired(\s?)=', text)
        if not req_start_match:
            return []
        req_start = req_start_match.start()

        ### Dependencies may have brackets within the strings, so push back the index.
        opening_brace_offset = text[req_start:].find('[')
        if opening_brace_offset == -1:
            return []
        first_opening_brace = req_start + 1 + opening_brace_offset

        closing_brace_offset = text[req_start:].find(']')
        if closing_brace_offset == -1:
            return []
        next_closing_brace = req_start + 1 + closing_brace_offset

        start_ix = first_opening_brace + 1
        end_ix = next_closing_brace

        num_braces = 0
        while True:
            if '[' not in text[start_ix:end_ix]:
                break
            num_braces += 1
            start_ix = end_ix
            end_ix += text[end_ix + 1:].find(']') + 1

        req_end = end_ix + 1
        req_text = (
            text[req_start:req_end]
            .lstrip()
            .replace('required', '', 1)
            .lstrip()
            .replace('=', '', 1)
            .lstrip()
        )
        try:
            required = ast.literal_eval(req_text)
        except Exception as e:
            warn(
                f"Unable to determine requirements for plugin '{self.name}' "
                + "without importing the module.\n"
                + "    This may be due to dynamically setting the global `required` list.\n"
                + f"    {e}"
            )
            return []
        return required

    import inspect
    self.activate_venv(dependencies=False, debug=debug)
    required = []
    for name, val in inspect.getmembers(self.module):
        if name == 'required':
            required = val
            break
    self._required = required
    self.deactivate_venv(dependencies=False, debug=debug)
    return required
```
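The module-less path above slices the `required = [...]` assignment out of the raw source and evaluates it with `ast.literal_eval`. A simpler sketch of the same idea parses the whole file with `ast` instead of counting brackets by hand; it shares the original's limitation that a dynamically built `required` list cannot be evaluated statically. The plugin source below is hypothetical:

```python
import ast

PLUGIN_SOURCE = """
required = ['pandas', 'plugin:noaa@api:mrsm', 'requests[socks]']

def register(pipe):
    return {'columns': {'datetime': 'timestamp'}}
"""

def infer_required(source: str):
    """Statically read a top-level `required = [...]` list without importing.

    A simplified sketch of the module-less branch of `get_dependencies()`:
    only literal lists can be recovered; anything dynamic yields [].
    """
    try:
        tree = ast.parse(source)
    except SyntaxError:
        return []
    for node in tree.body:
        if isinstance(node, ast.Assign):
            for target in node.targets:
                if isinstance(target, ast.Name) and target.id == 'required':
                    try:
                        return ast.literal_eval(node.value)
                    except Exception:
                        return []
    return []

print(infer_required(PLUGIN_SOURCE))
# → ['pandas', 'plugin:noaa@api:mrsm', 'requests[socks]']
```

Note how pip extras like `requests[socks]` put brackets inside the strings — exactly the case the original's brace-counting loop guards against.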
```python
def get_required_plugins(self, debug: bool = False) -> List[meerschaum.plugins.Plugin]:
    """
    Return a list of required Plugin objects.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    plugins = []
    _deps = self.get_dependencies(debug=debug)
    sep = STATIC_CONFIG['plugins']['repo_separator']
    plugin_names = [
        _d[len('plugin:'):] for _d in _deps
        if _d.startswith('plugin:') and len(_d) > len('plugin:')
    ]
    default_repo_keys = get_config('meerschaum', 'default_repository')
    for _plugin_name in plugin_names:
        if sep in _plugin_name:
            try:
                _plugin_name, _repo_keys = _plugin_name.split(sep)
            except Exception as e:
                _repo_keys = default_repo_keys
                warn(
                    f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
                    + f"Will try to use '{_repo_keys}' instead.",
                    stack=False,
                )
        else:
            _repo_keys = default_repo_keys
        plugins.append(Plugin(_plugin_name, repo=_repo_keys))
    return plugins
```
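The `'plugin:name@repo:keys'` parsing above can be sketched as a standalone helper. The separator and default repo keys below are assumptions for illustration; the real values come from `STATIC_CONFIG['plugins']['repo_separator']` and `get_config('meerschaum', 'default_repository')`:

```python
def split_plugin_dependency(dep: str, default_repo: str = 'api:mrsm', sep: str = '@'):
    """Split a 'plugin:name@repo:keys' dependency into (name, repo_keys).

    Sketch of the logic in `get_required_plugins()`. Using `partition`
    instead of `split` tolerates extra separators in the repo keys.
    """
    if not dep.startswith('plugin:') or len(dep) <= len('plugin:'):
        return None  # not a plugin dependency (i.e. a pip package)
    name = dep[len('plugin:'):]
    if sep in name:
        name, _, repo_keys = name.partition(sep)
    else:
        repo_keys = default_repo
    return name, repo_keys

print(split_plugin_dependency('plugin:noaa@api:bar'))  # → ('noaa', 'api:bar')
print(split_plugin_dependency('plugin:noaa'))          # → ('noaa', 'api:mrsm')
print(split_plugin_dependency('pandas'))               # → None
```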
```python
def get_required_packages(self, debug: bool = False) -> List[str]:
    """
    Return the required package names (excluding plugins).
    """
    _deps = self.get_dependencies(debug=debug)
    return [_d for _d in _deps if not _d.startswith('plugin:')]
```
```python
def activate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
    """
    Activate the virtual environments for the plugin and its dependencies.

    Parameters
    ----------
    dependencies: bool, default True
        If `True`, activate the virtual environments for required plugins.

    Returns
    -------
    A bool indicating success.
    """
    from meerschaum.utils.venv import venv_target_path
    from meerschaum.utils.packages import activate_venv
    from meerschaum.utils.misc import make_symlink, is_symlink
    from meerschaum.config._paths import PACKAGE_ROOT_PATH

    if dependencies:
        for plugin in self.get_required_plugins(debug=debug):
            plugin.activate_venv(debug=debug, **kw)

    vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
    venv_meerschaum_path = vtp / 'meerschaum'

    try:
        success, msg = True, "Success"
        if is_symlink(venv_meerschaum_path):
            if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
                venv_meerschaum_path.unlink()
                success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
    except Exception as e:
        success, msg = False, str(e)
    if not success:
        warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")

    return activate_venv(self.name, debug=debug, **kw)
```
```python
def deactivate_venv(self, dependencies: bool = True, debug: bool = False, **kw) -> bool:
    """
    Deactivate the virtual environments for the plugin and its dependencies.

    Parameters
    ----------
    dependencies: bool, default True
        If `True`, deactivate the virtual environments for required plugins.

    Returns
    -------
    A bool indicating success.
    """
    from meerschaum.utils.packages import deactivate_venv
    success = deactivate_venv(self.name, debug=debug, **kw)
    if dependencies:
        for plugin in self.get_required_plugins(debug=debug):
            plugin.deactivate_venv(debug=debug, **kw)
    return success
```
```python
def install_dependencies(
    self,
    force: bool = False,
    debug: bool = False,
) -> bool:
    """
    If specified, install dependencies.

    **NOTE:** Dependencies that start with `'plugin:'` will be installed as
    Meerschaum plugins from the same repository as this Plugin.
    To install from a different repository, add the repo keys after `'@'`
    (e.g. `'plugin:foo@api:bar'`).

    Parameters
    ----------
    force: bool, default False
        If `True`, continue with the installation, even if some
        required packages fail to install.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating success.
    """
    from meerschaum.utils.packages import pip_install, venv_contains_package
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, info
    from meerschaum.connectors.parse import parse_repo_keys
    _deps = self.get_dependencies(debug=debug)
    if not _deps and self.requirements_file_path is None:
        return True

    plugins = self.get_required_plugins(debug=debug)
    for _plugin in plugins:
        if _plugin.name == self.name:
            warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
            continue
        _success, _msg = _plugin.repo_connector.install_plugin(
            _plugin.name, debug=debug, force=force
        )
        if not _success:
            warn(
                f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
                + f" for plugin '{self.name}':\n" + _msg,
                stack=False,
            )
            if not force:
                warn(
                    "Try installing with the `--force` flag to continue anyway.",
                    stack=False,
                )
                return False
            info(
                "Continuing with installation despite the failure "
                + "(careful, things might be broken!)...",
                icon=False
            )

    ### First step: parse `requirements.txt` if it exists.
    if self.requirements_file_path is not None:
        if not pip_install(
            requirements_file_path=self.requirements_file_path,
            venv=self.name, debug=debug
        ):
            warn(
                f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
                stack=False,
            )
            if not force:
                warn(
                    "Try installing with `--force` to continue anyway.",
                    stack=False,
                )
                return False
            info(
                "Continuing with installation despite the failure "
                + "(careful, things might be broken!)...",
                icon=False
            )

    ### Don't reinstall packages that are already included in required plugins.
    packages = []
    _packages = self.get_required_packages(debug=debug)
    accounted_for_packages = set()
    for package_name in _packages:
        for plugin in plugins:
            if venv_contains_package(package_name, plugin.name):
                accounted_for_packages.add(package_name)
                break
    packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]

    ### Attempt pip packages installation.
    if packages:
        for package in packages:
            if not pip_install(package, venv=self.name, debug=debug):
                warn(
                    f"Failed to install required package '{package}'"
                    + f" for plugin '{self.name}'.",
                    stack=False,
                )
                if not force:
                    warn(
                        "Try installing with `--force` to continue anyway.",
                        stack=False,
                    )
                    return False
                info(
                    "Continuing with installation despite the failure "
                    + "(careful, things might be broken!)...",
                    icon=False
                )
    return True
```
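One detail worth noting above: packages already provided by a required plugin's virtual environment are not reinstalled. That "accounted for" filtering can be sketched in isolation; `plugin_packages` below is a hypothetical stand-in for repeated `venv_contains_package()` calls:

```python
def packages_to_install(required_packages, plugin_packages):
    """Drop pip packages already provided by a required plugin's venv.

    Mirrors the filtering step in `install_dependencies()`:
    `plugin_packages` maps plugin name -> set of package names found
    in that plugin's virtual environment.
    """
    accounted_for = {
        pkg for pkg in required_packages
        if any(pkg in pkgs for pkgs in plugin_packages.values())
    }
    return [pkg for pkg in required_packages if pkg not in accounted_for]

print(packages_to_install(
    ['pandas', 'requests'],
    {'noaa': {'requests'}},
))
# → ['pandas']
```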
```python
class Venv:
    """
    Manage a virtual environment's activation status.

    Examples
    --------
    >>> from meerschaum.plugins import Plugin
    >>> with Venv('mrsm') as venv:
    ...     import pandas
    >>> with Venv(Plugin('noaa')) as venv:
    ...     import requests
    >>> venv = Venv('mrsm')
    >>> venv.activate()
    True
    >>> venv.deactivate()
    True
    >>>
    """

    def __init__(
        self,
        venv: Union[str, 'meerschaum.plugins.Plugin', None] = 'mrsm',
        debug: bool = False,
    ) -> None:
        from meerschaum.utils.venv import activate_venv, deactivate_venv, active_venvs
        ### Because of a weird threading issue,
        ### we can't use `isinstance` here.
        if 'meerschaum.plugins._Plugin' in str(type(venv)):
            self._venv = venv.name
            self._activate = venv.activate_venv
            self._deactivate = venv.deactivate_venv
            self._kwargs = {}
        else:
            self._venv = venv
            self._activate = activate_venv
            self._deactivate = deactivate_venv
            self._kwargs = {'venv': venv}
        self._debug = debug
        ### In case someone calls `deactivate()` before `activate()`.
        self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)

    def activate(self, debug: bool = False) -> bool:
        """
        Activate this virtual environment.
        If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments
        will also be activated.
        """
        from meerschaum.utils.venv import active_venvs
        self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)
        return self._activate(debug=(debug or self._debug), **self._kwargs)

    def deactivate(self, debug: bool = False) -> bool:
        """
        Deactivate this virtual environment.
        If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments
        will also be deactivated.
        """
        return self._deactivate(debug=(debug or self._debug), **self._kwargs)

    @property
    def target_path(self) -> pathlib.Path:
        """
        Return the target site-packages path for this virtual environment.
        A `meerschaum.utils.venv.Venv` may have one virtual environment per minor Python version
        (e.g. Python 3.10 and Python 3.7).
        """
        from meerschaum.utils.venv import venv_target_path
        return venv_target_path(venv=self._venv, allow_nonexistent=True, debug=self._debug)

    @property
    def root_path(self) -> pathlib.Path:
        """
        Return the top-level path for this virtual environment.
        """
        from meerschaum.config._paths import VIRTENV_RESOURCES_PATH
        if self._venv is None:
            return self.target_path.parent
        return VIRTENV_RESOURCES_PATH / self._venv

    def __enter__(self) -> None:
        self.activate(debug=self._debug)

    def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
        self.deactivate(debug=self._debug)

    def __str__(self) -> str:
        quote = "'" if self._venv is not None else ""
        return "Venv(" + quote + str(self._venv) + quote + ")"

    def __repr__(self) -> str:
        return self.__str__()
```
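The `__enter__`/`__exit__` pair above is what makes `with Venv(...)` work: activation on entry, deactivation on exit, even if the body raises. A minimal, hypothetical stand-in (not Meerschaum code) illustrates the pattern; note that unlike `Venv`, this sketch returns `self` from `__enter__` so the bound name is usable:

```python
class ManagedVenv:
    """Minimal stand-in for the Venv context-manager pattern:
    activate on `__enter__`, deactivate on `__exit__` (even on error)."""

    def __init__(self, name: str):
        self.name = name
        self.active = False

    def activate(self) -> bool:
        self.active = True
        return True

    def deactivate(self) -> bool:
        self.active = False
        return True

    def __enter__(self):
        self.activate()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.deactivate()

venv = ManagedVenv('mrsm')
with venv:
    print(venv.active)  # → True
print(venv.active)      # → False
```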
```python
def pprint(
    *args,
    detect_password: bool = True,
    nopretty: bool = False,
    **kw
) -> None:
    """
    Pretty print an object according to the configured ANSI and UNICODE settings.
    If `detect_password` is `True` (the default), search for and replace passwords
    with '*' characters.
    Does not mutate objects.
    """
    from meerschaum.utils.packages import attempt_import, import_rich
    from meerschaum.utils.formatting import ANSI, UNICODE, get_console, print_tuple
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import replace_password, dict_from_od, filter_keywords
    from collections import OrderedDict
    import copy, json

    if (
        len(args) == 1
        and isinstance(args[0], tuple)
        and len(args[0]) == 2
        and isinstance(args[0][0], bool)
        and isinstance(args[0][1], str)
    ):
        return print_tuple(args[0])

    modify = True
    rich_pprint = None
    if ANSI and not nopretty:
        rich = import_rich()
        if rich is not None:
            rich_pretty = attempt_import('rich.pretty')
            if rich_pretty is not None:
                def _rich_pprint(*args, **kw):
                    _console = get_console()
                    _kw = filter_keywords(_console.print, **kw)
                    _console.print(*args, **_kw)
                rich_pprint = _rich_pprint
    elif not nopretty:
        pprintpp = attempt_import('pprintpp', warn=False)
        try:
            _pprint = pprintpp.pprint
        except Exception as e:
            import pprint as _pprint_module
            _pprint = _pprint_module.pprint

    func = (
        _pprint if rich_pprint is None else rich_pprint
    ) if not nopretty else print

    try:
        args_copy = copy.deepcopy(args)
    except Exception as e:
        args_copy = args
        modify = False
    _args = []
    for a in args:
        c = a
        ### Convert OrderedDict into dict.
        if isinstance(a, OrderedDict) or issubclass(type(a), OrderedDict):
            c = dict_from_od(copy.deepcopy(c))
        _args.append(c)
    args = _args

    _args = list(args)
    if detect_password and modify:
        _args = []
        for a in args:
            c = a
            if isinstance(c, dict):
                c = replace_password(copy.deepcopy(c))
            if nopretty:
                try:
                    c = json.dumps(c)
                    is_json = True
                except Exception as e:
                    is_json = False
                if not is_json:
                    try:
                        c = str(c)
                    except Exception as e:
                        pass
            _args.append(c)

    ### Filter out unsupported keywords.
    func_kw = filter_keywords(func, **kw) if not nopretty else {}
    error_msg = None
    try:
        func(*_args, **func_kw)
    except Exception as e:
        error_msg = e
    if error_msg is not None:
        error(error_msg)
```
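The first branch above special-cases a lone `(bool, str)` SuccessTuple, routing it to `print_tuple()` instead of the pretty-printer. That detection check can be lifted out as a predicate (the helper name is illustrative):

```python
def is_success_tuple(args: tuple) -> bool:
    """Mirror `pprint`'s check for a single `(bool, str)` SuccessTuple
    passed as the only positional argument."""
    return (
        len(args) == 1
        and isinstance(args[0], tuple)
        and len(args[0]) == 2
        and isinstance(args[0][0], bool)
        and isinstance(args[0][1], str)
    )

print(is_success_tuple(((True, "Success"),)))   # → True
print(is_success_tuple(((True, "a", "b"),)))    # → False
print(is_success_tuple(({'a': 1},)))            # → False
```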
```python
def attempt_import(
    *names: str,
    lazy: bool = True,
    warn: bool = True,
    install: bool = True,
    venv: Optional[str] = 'mrsm',
    precheck: bool = True,
    split: bool = True,
    check_update: bool = False,
    check_pypi: bool = False,
    check_is_installed: bool = True,
    color: bool = True,
    debug: bool = False
) -> Union[Any, Tuple[Any]]:
    """
    Raise a warning if packages are not installed; otherwise import and return modules.
    If `lazy` is `True`, return lazy-imported modules.

    Returns a tuple of modules if multiple names are provided, else returns one module.

    Parameters
    ----------
    names: List[str]
        The packages to be imported.

    lazy: bool, default True
        If `True`, lazily load packages.

    warn: bool, default True
        If `True`, raise a warning if a package cannot be imported.

    install: bool, default True
        If `True`, attempt to install a missing package into the designated virtual environment.
        If `check_update` is `True`, install updates if available.

    venv: Optional[str], default 'mrsm'
        The virtual environment in which to search for packages and to install packages into.

    precheck: bool, default True
        If `True`, attempt to find the module before importing (necessary for checking
        if modules exist and retaining lazy imports); otherwise assume `lazy` is `False`.

    split: bool, default True
        If `True`, split packages' names on `'.'`.

    check_update: bool, default False
        If `True` and `install` is `True`, install updates if the required minimum version
        does not match.

    check_pypi: bool, default False
        If `True` and `check_update` is `True`, check PyPI when determining whether
        an update is required.

    check_is_installed: bool, default True
        If `True`, check if the package is contained in the virtual environment.

    Returns
    -------
    The specified modules. If they're not available and `install` is `True`, it will first
    download them into a virtual environment and return the modules.

    Examples
    --------
    >>> pandas, sqlalchemy = attempt_import('pandas', 'sqlalchemy')
    >>> pandas = attempt_import('pandas')
    """
    import importlib.util

    ### To prevent recursion, check if the parent Meerschaum package is being imported.
    if names == ('meerschaum',):
        return _import_module('meerschaum')

    if venv == 'mrsm' and _import_hook_venv is not None:
        if debug:
            print(f"Import hook for virtual environment '{_import_hook_venv}' is active.")
        venv = _import_hook_venv

    _warnings = _import_module('meerschaum.utils.warnings')
    warn_function = _warnings.warn

    def do_import(_name: str, **kw) -> Union['ModuleType', None]:
        with Venv(venv=venv, debug=debug):
            ### Determine the import method (lazy vs normal).
            from meerschaum.utils.misc import filter_keywords
            import_method = (
                _import_module if not lazy
                else lazy_import
            )
            try:
                mod = import_method(_name, **(filter_keywords(import_method, **kw)))
            except Exception as e:
                if warn:
                    import traceback
                    traceback.print_exception(type(e), e, e.__traceback__)
                    warn_function(
                        f"Failed to import module '{_name}'.\nException:\n{e}",
                        ImportWarning,
                        stacklevel=(5 if lazy else 4),
                        color=False,
                    )
                mod = None
            return mod

    modules = []
    for name in names:
        ### Check if the package is a declared dependency.
        root_name = name.split('.')[0] if split else name
        install_name = _import_to_install_name(root_name)

        if install_name is None:
            install_name = root_name
            if warn and root_name != 'plugins':
                warn_function(
                    f"Package '{root_name}' is not declared in meerschaum.utils.packages.",
                    ImportWarning,
                    stacklevel=3,
                    color=False
                )

        ### Determine if the package exists.
        if precheck is False:
            found_module = (
                do_import(
                    name, debug=debug, warn=False, venv=venv, color=color,
                    check_update=False, check_pypi=False, split=split,
                ) is not None
            )
        else:
            if check_is_installed:
                with _locks['_is_installed_first_check']:
                    if not _is_installed_first_check.get(name, False):
                        package_is_installed = is_installed(
                            name,
                            venv=venv,
                            split=split,
                            debug=debug,
                        )
                        _is_installed_first_check[name] = package_is_installed
                    else:
                        package_is_installed = _is_installed_first_check[name]
            else:
                package_is_installed = _is_installed_first_check.get(
                    name,
                    venv_contains_package(name, venv=venv, split=split, debug=debug)
                )
            found_module = package_is_installed

        if not found_module:
            if install:
                if not pip_install(
                    install_name,
                    venv=venv,
                    split=False,
                    check_update=check_update,
                    color=color,
                    debug=debug
                ) and warn:
                    warn_function(
                        f"Failed to install '{install_name}'.",
                        ImportWarning,
                        stacklevel=3,
                        color=False,
                    )
            elif warn:
                ### Raise a warning if we can't find the package and install=False.
                warn_function(
                    (f"\n\nMissing package '{name}' from virtual environment '{venv}'; "
                     + "some features will not work correctly."
                     + "\n\nSet install=True when calling attempt_import.\n"),
                    ImportWarning,
                    stacklevel=3,
                    color=False,
                )

        ### Do the import. Will be lazy if lazy=True.
        m = do_import(
            name, debug=debug, warn=warn, venv=venv, color=color,
            check_update=check_update, check_pypi=check_pypi, install=install, split=split,
        )
        modules.append(m)

    modules = tuple(modules)
    if len(modules) == 1:
        return modules[0]
    return modules
```
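At its core, `attempt_import` prechecks whether a module can be found before importing it and warns (or installs) when it cannot. A stripped-down sketch of that precheck-then-import flow, without virtual environments, lazy loading, or automatic pip installs (the helper name `soft_import` is illustrative, not part of Meerschaum's API):

```python
import importlib
import importlib.util
import warnings

def soft_import(name: str):
    """Return the module if importable, else warn and return None.

    `find_spec` returns None for a missing top-level package, which lets
    us detect absence without triggering an ImportError.
    """
    if importlib.util.find_spec(name) is None:
        warnings.warn(f"Missing package '{name}'.", ImportWarning)
        return None
    return importlib.import_module(name)

print(soft_import('json') is not None)                # → True
print(soft_import('not_a_real_package_xyz') is None)  # → True
```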