meerschaum
Meerschaum Python API
Welcome to the Meerschaum Python API technical documentation! Here you can find information about the classes and functions provided by the meerschaum
package. Visit meerschaum.io for general usage documentation.
Root Module
For your convenience, the following classes and functions may be imported from the root meerschaum
namespace:
Classes
Examples
Build a Connector
import meerschaum as mrsm
sql_conn = mrsm.get_connector(
'sql:temp',
flavor='sqlite',
database='/tmp/tmp.db',
)
df = sql_conn.read("SELECT 1 AS foo")
print(df)
# foo
# 0 1
sql_conn.to_sql(df, 'foo')
print(sql_conn.read('foo'))
# foo
# 0 1
Create a Custom Connector Class
from datetime import datetime, timezone
from random import randint
import meerschaum as mrsm
from meerschaum.utils.misc import round_time
@mrsm.make_connector
class FooConnector(mrsm.Connector):
REQUIRED_ATTRIBUTES = ['username', 'password']
def fetch(
self,
begin: datetime | None = None,
end: datetime | None = None,
):
now = begin or round_time(datetime.now(timezone.utc))
return [
{'ts': now, 'id': 1, 'vl': randint(1, 100)},
{'ts': now, 'id': 2, 'vl': randint(1, 100)},
{'ts': now, 'id': 3, 'vl': randint(1, 100)},
]
foo_conn = mrsm.get_connector(
'foo:bar',
username='foo',
password='bar',
)
docs = foo_conn.fetch()
Build a Pipe
from datetime import datetime
import meerschaum as mrsm
pipe = mrsm.Pipe(
foo_conn, 'demo',
instance=sql_conn,
columns={'datetime': 'ts', 'id': 'id'},
tags=['production'],
)
pipe.sync(begin=datetime(2024, 1, 1))
df = pipe.get_data()
print(df)
# ts id vl
# 0 2024-01-01 1 97
# 1 2024-01-01 2 18
# 2 2024-01-01 3 96
Get Registered Pipes
import meerschaum as mrsm
pipes = mrsm.get_pipes(
tags=['production'],
instance=sql_conn,
as_list=True,
)
print(pipes)
# [Pipe('foo:bar', 'demo', instance='sql:temp')]
Access a Plugin's Module
import meerschaum as mrsm
plugin = mrsm.Plugin('noaa')
with mrsm.Venv(plugin):
noaa = plugin.module
print(noaa.get_station_info('KGMU'))
# {'name': 'Greenville Downtown Airport', 'geometry': {'type': 'Point', 'coordinates': [-82.35004, 34.84873]}}
Submodules
meerschaum.actions
Access functions for actions and subactions.
meerschaum.actions.actions
meerschaum.actions.get_action()
meerschaum.actions.get_completer()
meerschaum.actions.get_main_action_name()
meerschaum.actions.get_subactions()
meerschaum.config
Read and write the Meerschaum configuration registry.
get_config()
meerschaum.config.get_plugin_config()
meerschaum.config.write_config()
meerschaum.config.write_plugin_config()
meerschaum.connectors
Build connectors to interact with databases and fetch data.
get_connector()
make_connector()
meerschaum.connectors.is_connected()
meerschaum.connectors.poll.retry_connect()
meerschaum.connectors.Connector
meerschaum.connectors.SQLConnector
meerschaum.connectors.APIConnector
meerschaum.plugins
Access plugin modules and other API utilities.
Plugin
meerschaum.plugins.api_plugin()
meerschaum.plugins.dash_plugin()
meerschaum.plugins.import_plugins()
meerschaum.plugins.reload_plugins()
meerschaum.plugins.get_plugins()
meerschaum.plugins.get_data_plugins()
meerschaum.plugins.add_plugin_argument()
meerschaum.plugins.pre_sync_hook()
meerschaum.plugins.post_sync_hook()
meerschaum.utils
Utility functions are available in several submodules:
meerschaum.utils.daemon.daemon_entry()
meerschaum.utils.daemon.daemon_action()
meerschaum.utils.daemon.get_daemons()
meerschaum.utils.daemon.get_daemon_ids()
meerschaum.utils.daemon.get_running_daemons()
meerschaum.utils.daemon.get_paused_daemons()
meerschaum.utils.daemon.get_stopped_daemons()
meerschaum.utils.daemon.get_filtered_daemons()
meerschaum.utils.daemon.run_daemon()
meerschaum.utils.daemon.Daemon
meerschaum.utils.daemon.FileDescriptorInterceptor
meerschaum.utils.daemon.RotatingFile
meerschaum.utils.daemon
Manage background jobs.
meerschaum.utils.dataframe.add_missing_cols_to_df()
meerschaum.utils.dataframe.df_is_chunk_generator()
meerschaum.utils.dataframe.enforce_dtypes()
meerschaum.utils.dataframe.filter_unseen_df()
meerschaum.utils.dataframe.get_datetime_bound_from_df()
meerschaum.utils.dataframe.get_first_valid_dask_partition()
meerschaum.utils.dataframe.get_json_cols()
meerschaum.utils.dataframe.get_numeric_cols()
meerschaum.utils.dataframe.get_unhashable_cols()
meerschaum.utils.dataframe.parse_df_datetimes()
meerschaum.utils.dataframe.query_df()
meerschaum.utils.dataframe
Manipulate dataframes.
meerschaum.utils.dtypes.are_dtypes_equal()
meerschaum.utils.dtypes.attempt_cast_to_numeric()
meerschaum.utils.dtypes.is_dtype_numeric()
meerschaum.utils.dtypes.none_if_null()
meerschaum.utils.dtypes.quantize_decimal()
meerschaum.utils.dtypes.to_pandas_dtype()
meerschaum.utils.dtypes.value_is_null()
meerschaum.utils.dtypes.sql.get_pd_type_from_db_type()
meerschaum.utils.dtypes.sql.get_db_type_from_pd_type()
meerschaum.utils.dtypes
Work with data types.
meerschaum.utils.formatting.colored()
meerschaum.utils.formatting.extract_stats_from_message()
meerschaum.utils.formatting.fill_ansi()
meerschaum.utils.formatting.get_console()
meerschaum.utils.formatting.highlight_pipes()
meerschaum.utils.formatting.make_header()
meerschaum.utils.formatting.pipe_repr()
pprint()
meerschaum.utils.formatting.pprint_pipes()
meerschaum.utils.formatting.print_options()
meerschaum.utils.formatting.print_pipes_results()
meerschaum.utils.formatting.print_tuple()
meerschaum.utils.formatting.translate_rich_to_termcolor()
meerschaum.utils.formatting
Format output text.
meerschaum.utils.misc.items_str()
meerschaum.utils.misc.round_time()
meerschaum.utils.misc.is_int()
meerschaum.utils.misc.interval_str()
meerschaum.utils.misc.filter_keywords()
meerschaum.utils.misc.generate_password()
meerschaum.utils.misc.string_to_dict()
meerschaum.utils.misc.iterate_chunks()
meerschaum.utils.misc.timed_input()
meerschaum.utils.misc.replace_pipes_in_dict()
meerschaum.utils.misc.is_valid_email()
meerschaum.utils.misc.string_width()
meerschaum.utils.misc.replace_password()
meerschaum.utils.misc.parse_config_substitution()
meerschaum.utils.misc.edit_file()
meerschaum.utils.misc.get_in_ex_params()
meerschaum.utils.misc.separate_negation_values()
meerschaum.utils.misc.flatten_list()
meerschaum.utils.misc.make_symlink()
meerschaum.utils.misc.is_symlink()
meerschaum.utils.misc.wget()
meerschaum.utils.misc.add_method_to_class()
meerschaum.utils.misc.is_pipe_registered()
meerschaum.utils.misc.get_cols_lines()
meerschaum.utils.misc.sorted_dict()
meerschaum.utils.misc.flatten_pipes_dict()
meerschaum.utils.misc.dict_from_od()
meerschaum.utils.misc.remove_ansi()
meerschaum.utils.misc.get_connector_labels()
meerschaum.utils.misc.json_serialize_datetime()
meerschaum.utils.misc.async_wrap()
meerschaum.utils.misc.is_docker_available()
meerschaum.utils.misc.is_android()
meerschaum.utils.misc.is_bcp_available()
meerschaum.utils.misc.truncate_string_sections()
meerschaum.utils.misc.safely_extract_tar()
meerschaum.utils.misc
Miscellaneous utility functions.
attempt_import()
meerschaum.utils.packages.get_module_path()
meerschaum.utils.packages.manually_import_module()
meerschaum.utils.packages.get_install_no_version()
meerschaum.utils.packages.determine_version()
meerschaum.utils.packages.need_update()
meerschaum.utils.packages.get_pip()
meerschaum.utils.packages.pip_install()
meerschaum.utils.packages.pip_uninstall()
meerschaum.utils.packages.completely_uninstall_package()
meerschaum.utils.packages.run_python_package()
meerschaum.utils.packages.lazy_import()
meerschaum.utils.packages.pandas_name()
meerschaum.utils.packages.import_pandas()
meerschaum.utils.packages.import_rich()
meerschaum.utils.packages.import_dcc()
meerschaum.utils.packages.import_html()
meerschaum.utils.packages.get_modules_from_package()
meerschaum.utils.packages.import_children()
meerschaum.utils.packages.reload_package()
meerschaum.utils.packages.reload_meerschaum()
meerschaum.utils.packages.is_installed()
meerschaum.utils.packages.venv_contains_package()
meerschaum.utils.packages.package_venv()
meerschaum.utils.packages.ensure_readline()
meerschaum.utils.packages.get_prerelease_dependencies()
meerschaum.utils.packages
Manage Python packages.
meerschaum.utils.sql.build_where()
meerschaum.utils.sql.clean()
meerschaum.utils.sql.dateadd_str()
meerschaum.utils.sql.test_connection()
meerschaum.utils.sql.get_distinct_col_count()
meerschaum.utils.sql.sql_item_name()
meerschaum.utils.sql.pg_capital()
meerschaum.utils.sql.oracle_capital()
meerschaum.utils.sql.truncate_item_name()
meerschaum.utils.sql.table_exists()
meerschaum.utils.sql.get_table_cols_types()
meerschaum.utils.sql.get_update_queries()
meerschaum.utils.sql.get_null_replacement()
meerschaum.utils.sql.get_db_version()
meerschaum.utils.sql.get_rename_table_queries()
meerschaum.utils.sql.get_create_table_query()
meerschaum.utils.sql.format_cte_subquery()
meerschaum.utils.sql.session_execute()
meerschaum.utils.sql
Build SQL queries.
Venv
meerschaum.utils.venv.activate_venv()
meerschaum.utils.venv.deactivate_venv()
meerschaum.utils.venv.get_module_venv()
meerschaum.utils.venv.get_venvs()
meerschaum.utils.venv.init_venv()
meerschaum.utils.venv.inside_venv()
meerschaum.utils.venv.is_venv_active()
meerschaum.utils.venv.venv_exec()
meerschaum.utils.venv.venv_executable()
meerschaum.utils.venv.venv_exists()
meerschaum.utils.venv.venv_target_path()
meerschaum.utils.venv.verify_venv()
meerschaum.utils.venv
Manage virtual environments.
meerschaum.utils.warnings
Print warnings, errors, info, and debug messages.
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Copyright 2023 Bennett Meares 7 8Licensed under the Apache License, Version 2.0 (the "License"); 9you may not use this file except in compliance with the License. 10You may obtain a copy of the License at 11 12 http://www.apache.org/licenses/LICENSE-2.0 13 14Unless required by applicable law or agreed to in writing, software 15distributed under the License is distributed on an "AS IS" BASIS, 16WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17See the License for the specific language governing permissions and 18limitations under the License. 19""" 20 21import atexit 22from meerschaum.utils.typing import SuccessTuple 23from meerschaum.core.Pipe import Pipe 24from meerschaum.plugins import Plugin 25from meerschaum.utils.venv import Venv 26from meerschaum.connectors import get_connector, Connector, make_connector 27from meerschaum.utils import get_pipes 28from meerschaum.utils.formatting import pprint 29from meerschaum._internal.docs import index as __doc__ 30from meerschaum.config import __version__, get_config 31from meerschaum.utils.packages import attempt_import 32from meerschaum.__main__ import _close_pools 33 34atexit.register(_close_pools) 35 36__pdoc__ = {'gui': False, 'api': False, 'core': False, '_internal': False} 37__all__ = ( 38 "get_pipes", 39 "get_connector", 40 "get_config", 41 "Pipe", 42 "Plugin", 43 "Venv", 44 "Plugin", 45 "pprint", 46 "attempt_import", 47 "actions", 48 "config", 49 "connectors", 50 "plugins", 51 "utils", 52 "SuccessTuple", 53 "Connector", 54 "make_connector", 55)
def get_pipes(
    connector_keys: Union[str, List[str], None] = None,
    metric_keys: Union[str, List[str], None] = None,
    location_keys: Union[str, List[str], None] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    mrsm_instance: Union[str, InstanceConnector, None] = None,
    instance: Union[str, InstanceConnector, None] = None,
    as_list: bool = False,
    method: str = 'registered',
    wait: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[PipesDict, List[mrsm.Pipe]]:
    """
    Return a dictionary or list of `meerschaum.Pipe` objects.

    Parameters
    ----------
    connector_keys: Union[str, List[str], None], default None
        String or list of connector keys.
        If omitted or is `'*'`, fetch all possible keys.
        If a string begins with `'_'`, select keys that do NOT match the string.

    metric_keys: Union[str, List[str], None], default None
        String or list of metric keys. See `connector_keys` for formatting.

    location_keys: Union[str, List[str], None], default None
        String or list of location keys. See `connector_keys` for formatting.

    tags: Optional[List[str]], default None
        If provided, only include pipes with these tags.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        Params are parsed into a SQL WHERE clause.
        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`

    mrsm_instance: Union[str, InstanceConnector, None], default None
        Connector keys for the Meerschaum instance of the pipes.
        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
        `meerschaum.connectors.api.APIConnector.APIConnector`.

    instance: Union[str, InstanceConnector, None], default None
        Alias for `mrsm_instance`. If both are provided, `mrsm_instance` takes precedence.

    as_list: bool, default False
        If `True`, return pipes in a list instead of a hierarchical dictionary.
        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
        `True`  : `[Pipe]`

    method: str, default 'registered'
        Available options: `['registered', 'explicit', 'all']`
        If `'registered'` (default), create pipes based on registered keys in the
        connector's pipes table (API or SQL connector, depends on mrsm_instance).
        If `'explicit'`, create pipes from provided connector_keys, metric_keys,
        and location_keys instead of consulting the pipes table.
        Useful for creating non-existent pipes.
        If `'all'`, create pipes from predefined metrics and locations.
        Requires `connector_keys`.
        **NOTE:** Method `'all'` is not implemented!

    wait: bool, default False
        Wait for a connection before getting Pipes. Should only be true for cases where the
        database might not be running (like the API).

    debug: bool, default False
        If `True`, print debug messages.

    **kw: Any
        Keyword arguments to pass to the `meerschaum.Pipe` constructor.

    Returns
    -------
    A dictionary of dictionaries and `meerschaum.Pipe` objects
    in the connector, metric, location hierarchy.
    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.

    Examples
    --------
    ```
    >>> ### Manual definition:
    >>> pipes = {
    ...     <connector_keys>: {
    ...         <metric_key>: {
    ...             <location_key>: Pipe(
    ...                 <connector_keys>,
    ...                 <metric_key>,
    ...                 <location_key>,
    ...             ),
    ...         },
    ...     },
    ... },
    >>> ### Accessing a single pipe:
    >>> pipes['sql:main']['weather'][None]
    >>> ### Return a list instead:
    >>> get_pipes(as_list=True)
    [sql_main_weather]
    >>>
    ```
    """
    from meerschaum.config import get_config
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import filter_keywords

    ### Normalize omitted arguments to empty containers.
    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    if params is None:
        params = {}
    if tags is None:
        tags = []

    ### Accept bare strings as single-element lists.
    if isinstance(connector_keys, str):
        connector_keys = [connector_keys]
    if isinstance(metric_keys, str):
        metric_keys = [metric_keys]
    if isinstance(location_keys, str):
        location_keys = [location_keys]

    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
    ### If `wait`, wait until a connection is made.
    if mrsm_instance is None:
        mrsm_instance = instance
    if mrsm_instance is None:
        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
    if isinstance(mrsm_instance, str):
        from meerschaum.connectors.parse import parse_instance_keys
        connector = parse_instance_keys(keys=mrsm_instance, wait=wait, debug=debug)
    else:
        ### NOTE: mrsm_instance MUST be a SQL or API connector for this to work.
        from meerschaum.connectors import instance_types
        if getattr(mrsm_instance, 'type', None) not in instance_types:
            error(f"Invalid instance connector: {mrsm_instance}")
        connector = mrsm_instance
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Using instance connector: {connector}")
    if not connector:
        error(f"Could not create connector from keys: '{mrsm_instance}'")

    ### Get a list of tuples for the keys needed to build pipes.
    result = fetch_pipes_keys(
        method,
        connector,
        connector_keys=connector_keys,
        metric_keys=metric_keys,
        location_keys=location_keys,
        tags=tags,
        params=params,
        debug=debug,
    )
    if result is None:
        ### Fixed: was an f-string with no placeholders.
        error("Unable to build pipes!")

    ### Populate the `pipes` dictionary with Pipes based on the keys
    ### obtained from the chosen `method`.
    from meerschaum import Pipe
    pipes = {}
    for ck, mk, lk in result:
        pipes.setdefault(ck, {}).setdefault(mk, {})[lk] = Pipe(
            ck, mk, lk,
            mrsm_instance=connector,
            debug=debug,
            **filter_keywords(Pipe, **kw)
        )

    if not as_list:
        return pipes
    from meerschaum.utils.misc import flatten_pipes_dict
    return flatten_pipes_dict(pipes)
Return a dictionary or list of Pipe
objects.
Parameters
- connector_keys (Union[str, List[str], None], default None):
String or list of connector keys.
If omitted or is
'*'
, fetch all possible keys. If a string begins with'_'
, select keys that do NOT match the string. - metric_keys (Union[str, List[str], None], default None):
String or list of metric keys. See
connector_keys
for formatting. - location_keys (Union[str, List[str], None], default None):
String or list of location keys. See
connector_keys
for formatting. - tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
- params (Optional[Dict[str, Any]], default None):
Dictionary of additional parameters to search by.
Params are parsed into a SQL WHERE clause.
E.g.
{'a': 1, 'b': 2}
equates to'WHERE a = 1 AND b = 2'
- mrsm_instance (Union[str, InstanceConnector, None], default None):
Connector keys for the Meerschaum instance of the pipes.
Must be a
meerschaum.connectors.SQLConnector
ormeerschaum.connectors.APIConnector
. - as_list (bool, default False):
If
True
, return pipes in a list instead of a hierarchical dictionary.False
:{connector_keys: {metric_key: {location_key: Pipe}}}
True
:[Pipe]
- method (str, default 'registered'):
Available options:
['registered', 'explicit', 'all']
If'registered'
(default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depends on mrsm_instance). If'explicit'
, create pipes from provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If'all'
, create pipes from predefined metrics and locations. Requires connector_keys
. NOTE: Method'all'
is not implemented! - wait (bool, default False): Wait for a connection before getting Pipes. Should only be true for cases where the database might not be running (like the API).
- **kw (Any:):
Keyword arguments to pass to the
Pipe
constructor.
Returns
- A dictionary of dictionaries and
Pipe
objects - in the connector, metric, location hierarchy.
- If
as_list
isTrue
, return a list ofPipe
objects.
Examples
>>> ### Manual definition:
>>> pipes = {
... <connector_keys>: {
... <metric_key>: {
... <location_key>: Pipe(
... <connector_keys>,
... <metric_key>,
... <location_key>,
... ),
... },
... },
... },
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[sql_main_weather]
>>>
def get_connector(
    type: str = None,
    label: str = None,
    refresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Connector:
    """
    Return existing connector or create new connection and store for reuse.

    You can create new connectors if enough parameters are provided for the given type and flavor.

    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.
        May also contain the label after a colon (e.g. `'sql:main'`),
        in which case the text after the first `':'` is used as `label`.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct new object. Defaults to `False`.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`), or `None` if the connector
    could not be built (an invalid type or bad attributes emits a warning).

    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```

    """
    ### NOTE(review): `type` shadows the builtin; kept as-is because it is part
    ### of the public keyword interface (callers pass `type=...`).
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    ### Allow the combined `'type:label'` shorthand in `type`.
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)
    ### Lazily load plugin-provided connector types exactly once
    ### (guarded by a module-level lock; `_locks` is defined elsewhere in this module).
    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _loaded_plugin_connectors = True
    ### With no arguments at all, resolve the configured default instance.
    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        ### Search every configured connector type for a matching label
        ### to build a "did you mean ...?" suggestion.
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"

        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    ### Populate the `types` registry on first use (lazy import of the
    ### built-in connector classes; `types` is a module-level dict).
    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        with _locks['types']:
            types.update({
                'api'   : APIConnector,
                'sql'   : SQLConnector,
                'plugin': PluginConnector,
            })

    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    ### Attribute is not part of the cached connector's metadata;
                    ### only warn if the constructor wouldn't accept it either.
                    import inspect
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector " +
                            f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    ### Same attribute, different value: the cached connector is stale.
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n" +
                        f" - Keyword value: '{value}'\n" +
                        f" - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else: ### connector doesn't yet exist
            refresh = True

    ### only create an object if refresh is True
    ### (can be manually specified, otherwise determined above)
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack = False,
                )
                conn = None
            except Exception as e:
                ### Print a rich traceback when a console is available.
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack = False,
                )
                conn = None
        if conn is None:
            return None

    return connectors[type][label]
Return existing connector or create new connection and store for reuse.
You can create new connectors if enough parameters are provided for the given type and flavor.
Parameters
- type (Optional[str], default None):
Connector type (sql, api, etc.).
Defaults to the type of the configured
instance_connector
. - label (Optional[str], default None):
Connector label (e.g. main). Defaults to
'main'
. - refresh (bool, default False):
Refresh the Connector instance / construct new object. Defaults to
False
. - kw (Any):
Other arguments to pass to the Connector constructor.
If the Connector has already been constructed and new arguments are provided,
refresh
is set toTrue
and the old Connector is replaced.
Returns
- A new Meerschaum connector (e.g.
meerschaum.connectors.APIConnector
, meerschaum.connectors.SQLConnector
).
Examples
The following parameters would create a new
meerschaum.connectors.SQLConnector
that isn't in the configuration file.
>>> conn = get_connector(
... type = 'sql',
... label = 'newlabel',
... flavor = 'sqlite',
... database = '/file/path/to/database.db'
... )
>>>
def get_config(
    *keys: str,
    patch: bool = True,
    substitute: bool = True,
    sync_files: bool = True,
    write_missing: bool = True,
    as_tuple: bool = False,
    warn: bool = True,
    debug: bool = False
) -> Any:
    """
    Return the Meerschaum configuration dictionary.
    If positional arguments are provided, index by the keys.
    Raises a warning if invalid keys are provided.

    Parameters
    ----------
    keys: str:
        List of strings to index.

    patch: bool, default True
        If `True`, patch missing default keys into the config directory.
        Defaults to `True`.

    sync_files: bool, default True
        If `True`, sync files if needed.
        Defaults to `True`.

    write_missing: bool, default True
        If `True`, write default values when the main config files are missing.
        Defaults to `True`.

    substitute: bool, default True
        If `True`, substitute 'MRSM{}' values.
        Defaults to `True`.

    as_tuple: bool, default False
        If `True`, return a tuple of type (success, value).
        Defaults to `False`.

    warn: bool, default True
        If `True`, raise a warning when the keys are invalid.
        Defaults to `True`.

    debug: bool, default False
        If `True`, print debug messages.
        Defaults to `False`.

    Returns
    -------
    The value in the configuration directory, indexed by the provided keys.

    Examples
    --------
    >>> get_config('meerschaum', 'instance')
    'sql:main'
    >>> get_config('does', 'not', 'exist')
    UserWarning: Invalid keys in config: ('does', 'not', 'exist')
    """
    import json

    ### `STATIC_CONFIG` and the module-level `config` cache are defined
    ### elsewhere in this module.
    symlinks_key = STATIC_CONFIG['config']['symlinks_key']
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Indexing keys: {keys}", color=False)

    ### No keys: return the entire configuration dictionary.
    if len(keys) == 0:
        _rc = _config(substitute=substitute, sync_files=sync_files, write_missing=write_missing)
        if as_tuple:
            return True, _rc
        return _rc

    ### Weird threading issues, only import if substitute is True.
    if substitute:
        from meerschaum.config._read_config import search_and_substitute_config
    ### Invalidate the cache if it was read before with substitute=False
    ### but there still exist substitutions.
    if (
        config is not None and substitute and keys[0] != symlinks_key
        and 'MRSM{' in json.dumps(config.get(keys[0]))
    ):
        try:
            _subbed = search_and_substitute_config({keys[0]: config[keys[0]]})
        except Exception as e:
            import traceback
            traceback.print_exc()
        ### NOTE(review): if the substitution above raised, `_subbed` is unbound
        ### and the next line raises NameError — confirm intended behavior.
        config[keys[0]] = _subbed[keys[0]]
        if symlinks_key in _subbed:
            if symlinks_key not in config:
                config[symlinks_key] = {}
            if keys[0] not in config[symlinks_key]:
                config[symlinks_key][keys[0]] = {}
            config[symlinks_key][keys[0]] = apply_patch_to_config(
                _subbed,
                config[symlinks_key][keys[0]]
            )

    from meerschaum.config._sync import sync_files as _sync_files
    if config is None:
        ### Populate the module-level cache before indexing.
        _config(*keys, sync_files=sync_files)

    invalid_keys = False
    ### The top-level key is missing from the cache: try reading just that
    ### key's config file before declaring the keys invalid.
    if keys[0] not in config and keys[0] != symlinks_key:
        single_key_config = read_config(
            keys=[keys[0]], substitute=substitute, write_missing=write_missing
        )
        if keys[0] not in single_key_config:
            invalid_keys = True
        else:
            config[keys[0]] = single_key_config.get(keys[0], None)
            if symlinks_key in single_key_config and keys[0] in single_key_config[symlinks_key]:
                if symlinks_key not in config:
                    config[symlinks_key] = {}
                config[symlinks_key][keys[0]] = single_key_config[symlinks_key][keys[0]]

            if sync_files:
                _sync_files(keys=[keys[0]])

    ### Walk down the nested dictionary one key at a time.
    c = config
    if len(keys) > 0:
        for k in keys:
            try:
                c = c[k]
            except Exception as e:
                invalid_keys = True
                break
        if invalid_keys:
            ### Check if the keys are in the default configuration.
            from meerschaum.config._default import default_config
            in_default = True
            patched_default_config = (
                search_and_substitute_config(default_config)
                if substitute else copy.deepcopy(default_config)
            )
            _c = patched_default_config
            for k in keys:
                try:
                    _c = _c[k]
                except Exception as e:
                    in_default = False
            if in_default:
                c = _c
                invalid_keys = False
            warning_msg = f"Invalid keys in config: {keys}"
            if not in_default:
                try:
                    if warn:
                        from meerschaum.utils.warnings import warn as _warn
                        _warn(warning_msg, stacklevel=3, color=False)
                except Exception as e:
                    ### Fall back to plain print if the warnings machinery itself fails.
                    if warn:
                        print(warning_msg)
                if as_tuple:
                    return False, None
                return None

            ### Don't write keys that we haven't yet loaded into memory.
            not_loaded_keys = [k for k in patched_default_config if k not in config]
            for k in not_loaded_keys:
                patched_default_config.pop(k, None)

            ### Merge the (pruned) defaults into the cached config.
            set_config(
                apply_patch_to_config(
                    patched_default_config,
                    config,
                )
            )
            if patch and keys[0] != symlinks_key:
                if write_missing:
                    write_config(config, debug=debug)

    if as_tuple:
        return (not invalid_keys), c
    return c
Return the Meerschaum configuration dictionary. If positional arguments are provided, index by the keys. Raises a warning if invalid keys are provided.
Parameters
- keys (str:): List of strings to index.
- patch (bool, default True):
If
True
, patch missing default keys into the config directory. Defaults toTrue
. - sync_files (bool, default True):
If
True
, sync files if needed. Defaults toTrue
. - write_missing (bool, default True):
If
True
, write default values when the main config files are missing. Defaults toTrue
. - substitute (bool, default True):
If
True
, substitute 'MRSM{}' values. Defaults to True
. - as_tuple (bool, default False):
If
True
, return a tuple of type (success, value). Defaults toFalse
.
Returns
- The value in the configuration directory, indexed by the provided keys.
Examples
>>> get_config('meerschaum', 'instance')
'sql:main'
>>> get_config('does', 'not', 'exist')
UserWarning: Invalid keys in config: ('does', 'not', 'exist')
60class Pipe: 61 """ 62 Access Meerschaum pipes via Pipe objects. 63 64 Pipes are identified by the following: 65 66 1. Connector keys (e.g. `'sql:main'`) 67 2. Metric key (e.g. `'weather'`) 68 3. Location (optional; e.g. `None`) 69 70 A pipe's connector keys correspond to a data source, and when the pipe is synced, 71 its `fetch` definition is evaluated and executed to produce new data. 72 73 Alternatively, new data may be directly synced via `pipe.sync()`: 74 75 ``` 76 >>> from meerschaum import Pipe 77 >>> pipe = Pipe('csv', 'weather') 78 >>> 79 >>> import pandas as pd 80 >>> df = pd.read_csv('weather.csv') 81 >>> pipe.sync(df) 82 ``` 83 """ 84 85 from ._fetch import ( 86 fetch, 87 get_backtrack_interval, 88 ) 89 from ._data import ( 90 get_data, 91 get_backtrack_data, 92 get_rowcount, 93 _get_data_as_iterator, 94 get_chunk_interval, 95 get_chunk_bounds, 96 ) 97 from ._register import register 98 from ._attributes import ( 99 attributes, 100 parameters, 101 columns, 102 dtypes, 103 get_columns, 104 get_columns_types, 105 get_indices, 106 tags, 107 get_id, 108 id, 109 get_val_column, 110 parents, 111 children, 112 target, 113 _target_legacy, 114 guess_datetime, 115 ) 116 from ._show import show 117 from ._edit import edit, edit_definition, update 118 from ._sync import ( 119 sync, 120 get_sync_time, 121 exists, 122 filter_existing, 123 _get_chunk_label, 124 get_num_workers, 125 ) 126 from ._verify import ( 127 verify, 128 get_bound_interval, 129 get_bound_time, 130 ) 131 from ._delete import delete 132 from ._drop import drop 133 from ._clear import clear 134 from ._deduplicate import deduplicate 135 from ._bootstrap import bootstrap 136 from ._dtypes import enforce_dtypes, infer_dtypes 137 138 def __init__( 139 self, 140 connector: str = '', 141 metric: str = '', 142 location: Optional[str] = None, 143 parameters: Optional[Dict[str, Any]] = None, 144 columns: Union[Dict[str, str], List[str], None] = None, 145 tags: Optional[List[str]] = None, 146 target: 
Optional[str] = None, 147 dtypes: Optional[Dict[str, str]] = None, 148 instance: Optional[Union[str, InstanceConnector]] = None, 149 temporary: bool = False, 150 mrsm_instance: Optional[Union[str, InstanceConnector]] = None, 151 cache: bool = False, 152 debug: bool = False, 153 connector_keys: Optional[str] = None, 154 metric_key: Optional[str] = None, 155 location_key: Optional[str] = None, 156 ): 157 """ 158 Parameters 159 ---------- 160 connector: str 161 Keys for the pipe's source connector, e.g. `'sql:main'`. 162 163 metric: str 164 Label for the pipe's contents, e.g. `'weather'`. 165 166 location: str, default None 167 Label for the pipe's location. Defaults to `None`. 168 169 parameters: Optional[Dict[str, Any]], default None 170 Optionally set a pipe's parameters from the constructor, 171 e.g. columns and other attributes. 172 You can edit these parameters with `edit pipes`. 173 174 columns: Optional[Dict[str, str]], default None 175 Set the `columns` dictionary of `parameters`. 176 If `parameters` is also provided, this dictionary is added under the `'columns'` key. 177 178 tags: Optional[List[str]], default None 179 A list of strings to be added under the `'tags'` key of `parameters`. 180 You can select pipes with certain tags using `--tags`. 181 182 dtypes: Optional[Dict[str, str]], default None 183 Set the `dtypes` dictionary of `parameters`. 184 If `parameters` is also provided, this dictionary is added under the `'dtypes'` key. 185 186 mrsm_instance: Optional[Union[str, InstanceConnector]], default None 187 Connector for the Meerschaum instance where the pipe resides. 188 Defaults to the preconfigured default instance (`'sql:main'`). 189 190 instance: Optional[Union[str, InstanceConnector]], default None 191 Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored. 192 193 temporary: bool, default False 194 If `True`, prevent instance tables (pipes, users, plugins) from being created. 
195 196 cache: bool, default False 197 If `True`, cache fetched data into a local database file. 198 Defaults to `False`. 199 """ 200 from meerschaum.utils.warnings import error, warn 201 if (not connector and not connector_keys) or (not metric and not metric_key): 202 error( 203 "Please provide strings for the connector and metric\n " 204 + "(first two positional arguments)." 205 ) 206 207 ### Fall back to legacy `location_key` just in case. 208 if not location: 209 location = location_key 210 211 if not connector: 212 connector = connector_keys 213 214 if not metric: 215 metric = metric_key 216 217 if location in ('[None]', 'None'): 218 location = None 219 220 from meerschaum.config.static import STATIC_CONFIG 221 negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] 222 for k in (connector, metric, location, *(tags or [])): 223 if str(k).startswith(negation_prefix): 224 error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.") 225 226 self.connector_keys = str(connector) 227 self.connector_key = self.connector_keys ### Alias 228 self.metric_key = metric 229 self.location_key = location 230 self.temporary = temporary 231 232 self._attributes = { 233 'connector_keys': self.connector_keys, 234 'metric_key': self.metric_key, 235 'location_key': self.location_key, 236 'parameters': {}, 237 } 238 239 ### only set parameters if values are provided 240 if isinstance(parameters, dict): 241 self._attributes['parameters'] = parameters 242 else: 243 if parameters is not None: 244 warn(f"The provided parameters are of invalid type '{type(parameters)}'.") 245 self._attributes['parameters'] = {} 246 247 columns = columns or self._attributes.get('parameters', {}).get('columns', {}) 248 if isinstance(columns, list): 249 columns = {str(col): str(col) for col in columns} 250 if isinstance(columns, dict): 251 self._attributes['parameters']['columns'] = columns 252 elif columns is not None: 253 warn(f"The provided columns are of 
invalid type '{type(columns)}'.") 254 255 if isinstance(tags, (list, tuple)): 256 self._attributes['parameters']['tags'] = tags 257 elif tags is not None: 258 warn(f"The provided tags are of invalid type '{type(tags)}'.") 259 260 if isinstance(target, str): 261 self._attributes['parameters']['target'] = target 262 elif target is not None: 263 warn(f"The provided target is of invalid type '{type(target)}'.") 264 265 if isinstance(dtypes, dict): 266 self._attributes['parameters']['dtypes'] = dtypes 267 elif dtypes is not None: 268 warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.") 269 270 ### NOTE: The parameters dictionary is {} by default. 271 ### A Pipe may be registered without parameters, then edited, 272 ### or a Pipe may be registered with parameters set in-memory first. 273 # from meerschaum.config import get_config 274 _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance 275 if _mrsm_instance is None: 276 _mrsm_instance = get_config('meerschaum', 'instance', patch=True) 277 278 if not isinstance(_mrsm_instance, str): 279 self._instance_connector = _mrsm_instance 280 self.instance_keys = str(_mrsm_instance) 281 else: ### NOTE: must be SQL or API Connector for this work 282 self.instance_keys = _mrsm_instance 283 284 self._cache = cache and get_config('system', 'experimental', 'cache') 285 286 287 @property 288 def meta(self): 289 """ 290 Return the four keys needed to reconstruct this pipe. 291 """ 292 return { 293 'connector': self.connector_keys, 294 'metric': self.metric_key, 295 'location': self.location_key, 296 'instance': self.instance_keys, 297 } 298 299 300 def keys(self) -> List[str]: 301 """ 302 Return the ordered keys for this pipe. 303 """ 304 return { 305 key: val 306 for key, val in self.meta.items() 307 if key != 'instance' 308 } 309 310 311 @property 312 def instance_connector(self) -> Union[InstanceConnector, None]: 313 """ 314 The connector to where this pipe resides. 
315 May either be of type `meerschaum.connectors.sql.SQLConnector` or 316 `meerschaum.connectors.api.APIConnector`. 317 """ 318 if '_instance_connector' not in self.__dict__: 319 from meerschaum.connectors.parse import parse_instance_keys 320 conn = parse_instance_keys(self.instance_keys) 321 if conn: 322 self._instance_connector = conn 323 else: 324 return None 325 return self._instance_connector 326 327 @property 328 def connector(self) -> Union[meerschaum.connectors.Connector, None]: 329 """ 330 The connector to the data source. 331 """ 332 if '_connector' not in self.__dict__: 333 from meerschaum.connectors.parse import parse_instance_keys 334 import warnings 335 with warnings.catch_warnings(): 336 warnings.simplefilter('ignore') 337 try: 338 conn = parse_instance_keys(self.connector_keys) 339 except Exception as e: 340 conn = None 341 if conn: 342 self._connector = conn 343 else: 344 return None 345 return self._connector 346 347 348 @property 349 def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]: 350 """ 351 If the pipe was created with `cache=True`, return the connector to the pipe's 352 SQLite database for caching. 353 """ 354 if not self._cache: 355 return None 356 357 if '_cache_connector' not in self.__dict__: 358 from meerschaum.connectors import get_connector 359 from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH 360 _resources_path = SQLITE_RESOURCES_PATH 361 self._cache_connector = get_connector( 362 'sql', '_cache_' + str(self), 363 flavor='sqlite', 364 database=str(_resources_path / ('_cache_' + str(self) + '.db')), 365 ) 366 367 return self._cache_connector 368 369 370 @property 371 def cache_pipe(self) -> Union['meerschaum.Pipe', None]: 372 """ 373 If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to 374 manage the local data. 
375 """ 376 if self.cache_connector is None: 377 return None 378 if '_cache_pipe' not in self.__dict__: 379 from meerschaum.config._patch import apply_patch_to_config 380 from meerschaum.utils.sql import sql_item_name 381 _parameters = copy.deepcopy(self.parameters) 382 _fetch_patch = { 383 'fetch': ({ 384 'definition': ( 385 f"SELECT * FROM " 386 + sql_item_name( 387 str(self.target), 388 self.instance_connector.flavor, 389 self.instance_connector.get_pipe_schema(self), 390 ) 391 ), 392 }) if self.instance_connector.type == 'sql' else ({ 393 'connector_keys': self.connector_keys, 394 'metric_key': self.metric_key, 395 'location_key': self.location_key, 396 }) 397 } 398 _parameters = apply_patch_to_config(_parameters, _fetch_patch) 399 self._cache_pipe = Pipe( 400 self.instance_keys, 401 (self.connector_keys + '_' + self.metric_key + '_cache'), 402 self.location_key, 403 mrsm_instance = self.cache_connector, 404 parameters = _parameters, 405 cache = False, 406 temporary = True, 407 ) 408 409 return self._cache_pipe 410 411 412 def __str__(self, ansi: bool=False): 413 return pipe_repr(self, ansi=ansi) 414 415 416 def __eq__(self, other): 417 try: 418 return ( 419 isinstance(self, type(other)) 420 and self.connector_keys == other.connector_keys 421 and self.metric_key == other.metric_key 422 and self.location_key == other.location_key 423 and self.instance_keys == other.instance_keys 424 ) 425 except Exception as e: 426 return False 427 428 def __hash__(self): 429 ### Using an esoteric separator to avoid collisions. 
430 sep = "[\"']" 431 return hash( 432 str(self.connector_keys) + sep 433 + str(self.metric_key) + sep 434 + str(self.location_key) + sep 435 + str(self.instance_keys) + sep 436 ) 437 438 def __repr__(self, ansi: bool=True, **kw) -> str: 439 if not hasattr(sys, 'ps1'): 440 ansi = False 441 442 return pipe_repr(self, ansi=ansi, **kw) 443 444 def __pt_repr__(self): 445 from meerschaum.utils.packages import attempt_import 446 prompt_toolkit_formatted_text = attempt_import('prompt_toolkit.formatted_text', lazy=False) 447 return prompt_toolkit_formatted_text.ANSI(pipe_repr(self, ansi=True)) 448 449 def __getstate__(self) -> Dict[str, Any]: 450 """ 451 Define the state dictionary (pickling). 452 """ 453 return { 454 'connector': self.connector_keys, 455 'metric': self.metric_key, 456 'location': self.location_key, 457 'parameters': self.parameters, 458 'instance': self.instance_keys, 459 } 460 461 def __setstate__(self, _state: Dict[str, Any]): 462 """ 463 Read the state (unpickling). 464 """ 465 self.__init__(**_state) 466 467 468 def __getitem__(self, key: str) -> Any: 469 """ 470 Index the pipe's attributes. 471 If the `key` cannot be found`, return `None`. 472 """ 473 if key in self.attributes: 474 return self.attributes.get(key, None) 475 476 aliases = { 477 'connector': 'connector_keys', 478 'connector_key': 'connector_keys', 479 'metric': 'metric_key', 480 'location': 'location_key', 481 } 482 aliased_key = aliases.get(key, None) 483 if aliased_key is not None: 484 return self.attributes.get(aliased_key, None) 485 486 property_aliases = { 487 'instance': 'instance_keys', 488 'instance_key': 'instance_keys', 489 } 490 aliased_key = property_aliases.get(key, None) 491 if aliased_key is not None: 492 key = aliased_key 493 return getattr(self, key, None)
Access Meerschaum pipes via Pipe objects.
Pipes are identified by the following:
1. Connector keys (e.g. `'sql:main'`)
2. Metric key (e.g. `'weather'`)
3. Location (optional; e.g. `None`)
A pipe's connector keys correspond to a data source, and when the pipe is synced,
its `fetch` definition is evaluated and executed to produce new data.
Alternatively, new data may be directly synced via `pipe.sync()`:
>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)
138 def __init__( 139 self, 140 connector: str = '', 141 metric: str = '', 142 location: Optional[str] = None, 143 parameters: Optional[Dict[str, Any]] = None, 144 columns: Union[Dict[str, str], List[str], None] = None, 145 tags: Optional[List[str]] = None, 146 target: Optional[str] = None, 147 dtypes: Optional[Dict[str, str]] = None, 148 instance: Optional[Union[str, InstanceConnector]] = None, 149 temporary: bool = False, 150 mrsm_instance: Optional[Union[str, InstanceConnector]] = None, 151 cache: bool = False, 152 debug: bool = False, 153 connector_keys: Optional[str] = None, 154 metric_key: Optional[str] = None, 155 location_key: Optional[str] = None, 156 ): 157 """ 158 Parameters 159 ---------- 160 connector: str 161 Keys for the pipe's source connector, e.g. `'sql:main'`. 162 163 metric: str 164 Label for the pipe's contents, e.g. `'weather'`. 165 166 location: str, default None 167 Label for the pipe's location. Defaults to `None`. 168 169 parameters: Optional[Dict[str, Any]], default None 170 Optionally set a pipe's parameters from the constructor, 171 e.g. columns and other attributes. 172 You can edit these parameters with `edit pipes`. 173 174 columns: Optional[Dict[str, str]], default None 175 Set the `columns` dictionary of `parameters`. 176 If `parameters` is also provided, this dictionary is added under the `'columns'` key. 177 178 tags: Optional[List[str]], default None 179 A list of strings to be added under the `'tags'` key of `parameters`. 180 You can select pipes with certain tags using `--tags`. 181 182 dtypes: Optional[Dict[str, str]], default None 183 Set the `dtypes` dictionary of `parameters`. 184 If `parameters` is also provided, this dictionary is added under the `'dtypes'` key. 185 186 mrsm_instance: Optional[Union[str, InstanceConnector]], default None 187 Connector for the Meerschaum instance where the pipe resides. 188 Defaults to the preconfigured default instance (`'sql:main'`). 
189 190 instance: Optional[Union[str, InstanceConnector]], default None 191 Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored. 192 193 temporary: bool, default False 194 If `True`, prevent instance tables (pipes, users, plugins) from being created. 195 196 cache: bool, default False 197 If `True`, cache fetched data into a local database file. 198 Defaults to `False`. 199 """ 200 from meerschaum.utils.warnings import error, warn 201 if (not connector and not connector_keys) or (not metric and not metric_key): 202 error( 203 "Please provide strings for the connector and metric\n " 204 + "(first two positional arguments)." 205 ) 206 207 ### Fall back to legacy `location_key` just in case. 208 if not location: 209 location = location_key 210 211 if not connector: 212 connector = connector_keys 213 214 if not metric: 215 metric = metric_key 216 217 if location in ('[None]', 'None'): 218 location = None 219 220 from meerschaum.config.static import STATIC_CONFIG 221 negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] 222 for k in (connector, metric, location, *(tags or [])): 223 if str(k).startswith(negation_prefix): 224 error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.") 225 226 self.connector_keys = str(connector) 227 self.connector_key = self.connector_keys ### Alias 228 self.metric_key = metric 229 self.location_key = location 230 self.temporary = temporary 231 232 self._attributes = { 233 'connector_keys': self.connector_keys, 234 'metric_key': self.metric_key, 235 'location_key': self.location_key, 236 'parameters': {}, 237 } 238 239 ### only set parameters if values are provided 240 if isinstance(parameters, dict): 241 self._attributes['parameters'] = parameters 242 else: 243 if parameters is not None: 244 warn(f"The provided parameters are of invalid type '{type(parameters)}'.") 245 self._attributes['parameters'] = {} 246 247 columns = columns or 
self._attributes.get('parameters', {}).get('columns', {}) 248 if isinstance(columns, list): 249 columns = {str(col): str(col) for col in columns} 250 if isinstance(columns, dict): 251 self._attributes['parameters']['columns'] = columns 252 elif columns is not None: 253 warn(f"The provided columns are of invalid type '{type(columns)}'.") 254 255 if isinstance(tags, (list, tuple)): 256 self._attributes['parameters']['tags'] = tags 257 elif tags is not None: 258 warn(f"The provided tags are of invalid type '{type(tags)}'.") 259 260 if isinstance(target, str): 261 self._attributes['parameters']['target'] = target 262 elif target is not None: 263 warn(f"The provided target is of invalid type '{type(target)}'.") 264 265 if isinstance(dtypes, dict): 266 self._attributes['parameters']['dtypes'] = dtypes 267 elif dtypes is not None: 268 warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.") 269 270 ### NOTE: The parameters dictionary is {} by default. 271 ### A Pipe may be registered without parameters, then edited, 272 ### or a Pipe may be registered with parameters set in-memory first. 273 # from meerschaum.config import get_config 274 _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance 275 if _mrsm_instance is None: 276 _mrsm_instance = get_config('meerschaum', 'instance', patch=True) 277 278 if not isinstance(_mrsm_instance, str): 279 self._instance_connector = _mrsm_instance 280 self.instance_keys = str(_mrsm_instance) 281 else: ### NOTE: must be SQL or API Connector for this work 282 self.instance_keys = _mrsm_instance 283 284 self._cache = cache and get_config('system', 'experimental', 'cache')
Parameters
- connector (str):
  Keys for the pipe's source connector, e.g. `'sql:main'`.
- metric (str):
  Label for the pipe's contents, e.g. `'weather'`.
- location (str, default None):
  Label for the pipe's location. Defaults to `None`.
- parameters (Optional[Dict[str, Any]], default None):
  Optionally set a pipe's parameters from the constructor,
  e.g. columns and other attributes.
  You can edit these parameters with `edit pipes`.
- columns (Optional[Dict[str, str]], default None):
  Set the `columns` dictionary of `parameters`.
  If `parameters` is also provided, this dictionary is added under the `'columns'` key.
- tags (Optional[List[str]], default None):
  A list of strings to be added under the `'tags'` key of `parameters`.
  You can select pipes with certain tags using `--tags`.
- dtypes (Optional[Dict[str, str]], default None):
  Set the `dtypes` dictionary of `parameters`.
  If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.
- mrsm_instance (Optional[Union[str, InstanceConnector]], default None):
  Connector for the Meerschaum instance where the pipe resides.
  Defaults to the preconfigured default instance (`'sql:main'`).
- instance (Optional[Union[str, InstanceConnector]], default None):
  Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.
- temporary (bool, default False):
  If `True`, prevent instance tables (pipes, users, plugins) from being created.
- cache (bool, default False):
  If `True`, cache fetched data into a local database file. Defaults to `False`.
287 @property 288 def meta(self): 289 """ 290 Return the four keys needed to reconstruct this pipe. 291 """ 292 return { 293 'connector': self.connector_keys, 294 'metric': self.metric_key, 295 'location': self.location_key, 296 'instance': self.instance_keys, 297 }
Return the four keys needed to reconstruct this pipe.
300 def keys(self) -> List[str]: 301 """ 302 Return the ordered keys for this pipe. 303 """ 304 return { 305 key: val 306 for key, val in self.meta.items() 307 if key != 'instance' 308 }
Return the ordered keys for this pipe.
311 @property 312 def instance_connector(self) -> Union[InstanceConnector, None]: 313 """ 314 The connector to where this pipe resides. 315 May either be of type `meerschaum.connectors.sql.SQLConnector` or 316 `meerschaum.connectors.api.APIConnector`. 317 """ 318 if '_instance_connector' not in self.__dict__: 319 from meerschaum.connectors.parse import parse_instance_keys 320 conn = parse_instance_keys(self.instance_keys) 321 if conn: 322 self._instance_connector = conn 323 else: 324 return None 325 return self._instance_connector
The connector to where this pipe resides.
May either be of type `meerschaum.connectors.SQLConnector` or
`meerschaum.connectors.APIConnector`.
327 @property 328 def connector(self) -> Union[meerschaum.connectors.Connector, None]: 329 """ 330 The connector to the data source. 331 """ 332 if '_connector' not in self.__dict__: 333 from meerschaum.connectors.parse import parse_instance_keys 334 import warnings 335 with warnings.catch_warnings(): 336 warnings.simplefilter('ignore') 337 try: 338 conn = parse_instance_keys(self.connector_keys) 339 except Exception as e: 340 conn = None 341 if conn: 342 self._connector = conn 343 else: 344 return None 345 return self._connector
The connector to the data source.
348 @property 349 def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]: 350 """ 351 If the pipe was created with `cache=True`, return the connector to the pipe's 352 SQLite database for caching. 353 """ 354 if not self._cache: 355 return None 356 357 if '_cache_connector' not in self.__dict__: 358 from meerschaum.connectors import get_connector 359 from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH 360 _resources_path = SQLITE_RESOURCES_PATH 361 self._cache_connector = get_connector( 362 'sql', '_cache_' + str(self), 363 flavor='sqlite', 364 database=str(_resources_path / ('_cache_' + str(self) + '.db')), 365 ) 366 367 return self._cache_connector
If the pipe was created with `cache=True`, return the connector to the pipe's
SQLite database for caching.
370 @property 371 def cache_pipe(self) -> Union['meerschaum.Pipe', None]: 372 """ 373 If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to 374 manage the local data. 375 """ 376 if self.cache_connector is None: 377 return None 378 if '_cache_pipe' not in self.__dict__: 379 from meerschaum.config._patch import apply_patch_to_config 380 from meerschaum.utils.sql import sql_item_name 381 _parameters = copy.deepcopy(self.parameters) 382 _fetch_patch = { 383 'fetch': ({ 384 'definition': ( 385 f"SELECT * FROM " 386 + sql_item_name( 387 str(self.target), 388 self.instance_connector.flavor, 389 self.instance_connector.get_pipe_schema(self), 390 ) 391 ), 392 }) if self.instance_connector.type == 'sql' else ({ 393 'connector_keys': self.connector_keys, 394 'metric_key': self.metric_key, 395 'location_key': self.location_key, 396 }) 397 } 398 _parameters = apply_patch_to_config(_parameters, _fetch_patch) 399 self._cache_pipe = Pipe( 400 self.instance_keys, 401 (self.connector_keys + '_' + self.metric_key + '_cache'), 402 self.location_key, 403 mrsm_instance = self.cache_connector, 404 parameters = _parameters, 405 cache = False, 406 temporary = True, 407 ) 408 409 return self._cache_pipe
If the pipe was created with `cache=True`, return another `Pipe` used to
manage the local data.
21def fetch( 22 self, 23 begin: Union[datetime, str, None] = '', 24 end: Optional[datetime] = None, 25 check_existing: bool = True, 26 sync_chunks: bool = False, 27 debug: bool = False, 28 **kw: Any 29 ) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]: 30 """ 31 Fetch a Pipe's latest data from its connector. 32 33 Parameters 34 ---------- 35 begin: Union[datetime, str, None], default '': 36 If provided, only fetch data newer than or equal to `begin`. 37 38 end: Optional[datetime], default None: 39 If provided, only fetch data older than or equal to `end`. 40 41 check_existing: bool, default True 42 If `False`, do not apply the backtrack interval. 43 44 sync_chunks: bool, default False 45 If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks while fetching 46 loads chunks into memory. 47 48 debug: bool, default False 49 Verbosity toggle. 50 51 Returns 52 ------- 53 A `pd.DataFrame` of the newest unseen data. 54 55 """ 56 if 'fetch' not in dir(self.connector): 57 warn(f"No `fetch()` function defined for connector '{self.connector}'") 58 return None 59 60 from meerschaum.connectors import custom_types, get_connector_plugin 61 from meerschaum.utils.debug import dprint, _checkpoint 62 from meerschaum.utils.misc import filter_arguments 63 64 _chunk_hook = kw.pop('chunk_hook', None) 65 kw['workers'] = self.get_num_workers(kw.get('workers', None)) 66 if sync_chunks and _chunk_hook is None: 67 68 def _chunk_hook(chunk, **_kw) -> SuccessTuple: 69 """ 70 Wrap `Pipe.sync()` with a custom chunk label prepended to the message. 
71 """ 72 from meerschaum.config._patch import apply_patch_to_config 73 kwargs = apply_patch_to_config(kw, _kw) 74 chunk_success, chunk_message = self.sync(chunk, **kwargs) 75 chunk_label = self._get_chunk_label(chunk, self.columns.get('datetime', None)) 76 if chunk_label: 77 chunk_message = '\n' + chunk_label + '\n' + chunk_message 78 return chunk_success, chunk_message 79 80 with mrsm.Venv(get_connector_plugin(self.connector)): 81 _args, _kwargs = filter_arguments( 82 self.connector.fetch, 83 self, 84 begin=_determine_begin( 85 self, 86 begin, 87 check_existing=check_existing, 88 debug=debug, 89 ), 90 end=end, 91 chunk_hook=_chunk_hook, 92 debug=debug, 93 **kw 94 ) 95 df = self.connector.fetch(*_args, **_kwargs) 96 return df
Fetch a Pipe's latest data from its connector.
Parameters
- begin (Union[datetime, str, None], default ''):
  If provided, only fetch data newer than or equal to `begin`.
- end (Optional[datetime], default None):
  If provided, only fetch data older than or equal to `end`.
- check_existing (bool, default True):
  If `False`, do not apply the backtrack interval.
- sync_chunks (bool, default False):
  If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks
  while fetching loads chunks into memory.
- debug (bool, default False):
  Verbosity toggle.
Returns
- A `pd.DataFrame` of the newest unseen data.
99def get_backtrack_interval( 100 self, 101 check_existing: bool = True, 102 debug: bool = False, 103) -> Union[timedelta, int]: 104 """ 105 Get the chunk interval to use for this pipe. 106 107 Parameters 108 ---------- 109 check_existing: bool, default True 110 If `False`, return a backtrack_interval of 0 minutes. 111 112 Returns 113 ------- 114 The backtrack interval (`timedelta` or `int`) to use with this pipe's `datetime` axis. 115 """ 116 default_backtrack_minutes = get_config('pipes', 'parameters', 'fetch', 'backtrack_minutes') 117 configured_backtrack_minutes = self.parameters.get('fetch', {}).get('backtrack_minutes', None) 118 backtrack_minutes = ( 119 configured_backtrack_minutes 120 if configured_backtrack_minutes is not None 121 else default_backtrack_minutes 122 ) if check_existing else 0 123 124 backtrack_interval = timedelta(minutes=backtrack_minutes) 125 dt_col = self.columns.get('datetime', None) 126 if dt_col is None: 127 return backtrack_interval 128 129 dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns]') 130 if 'int' in dt_dtype.lower(): 131 return backtrack_minutes 132 133 return backtrack_interval
Get the chunk interval to use for this pipe.
Parameters
- check_existing (bool, default True):
  If `False`, return a backtrack_interval of 0 minutes.
Returns
- The backtrack interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
23def get_data( 24 self, 25 select_columns: Optional[List[str]] = None, 26 omit_columns: Optional[List[str]] = None, 27 begin: Union[datetime, int, None] = None, 28 end: Union[datetime, int, None] = None, 29 params: Optional[Dict[str, Any]] = None, 30 as_iterator: bool = False, 31 as_chunks: bool = False, 32 as_dask: bool = False, 33 chunk_interval: Union[timedelta, int, None] = None, 34 fresh: bool = False, 35 debug: bool = False, 36 **kw: Any 37) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]: 38 """ 39 Get a pipe's data from the instance connector. 40 41 Parameters 42 ---------- 43 select_columns: Optional[List[str]], default None 44 If provided, only select these given columns. 45 Otherwise select all available columns (i.e. `SELECT *`). 46 47 omit_columns: Optional[List[str]], default None 48 If provided, remove these columns from the selection. 49 50 begin: Union[datetime, int, None], default None 51 Lower bound datetime to begin searching for data (inclusive). 52 Translates to a `WHERE` clause like `WHERE datetime >= begin`. 53 Defaults to `None`. 54 55 end: Union[datetime, int, None], default None 56 Upper bound datetime to stop searching for data (inclusive). 57 Translates to a `WHERE` clause like `WHERE datetime < end`. 58 Defaults to `None`. 59 60 params: Optional[Dict[str, Any]], default None 61 Filter the retrieved data by a dictionary of parameters. 62 See `meerschaum.utils.sql.build_where` for more details. 63 64 as_iterator: bool, default False 65 If `True`, return a generator of chunks of pipe data. 66 67 as_chunks: bool, default False 68 Alias for `as_iterator`. 69 70 as_dask: bool, default False 71 If `True`, return a `dask.DataFrame` 72 (which may be loaded into a Pandas DataFrame with `df.compute()`). 73 74 chunk_interval: Union[timedelta, int, None], default None 75 If `as_iterator`, then return chunks with `begin` and `end` separated by this interval. 76 This may be set under `pipe.parameters['chunk_minutes']`. 
77 By default, use a timedelta of 1440 minutes (1 day). 78 If `chunk_interval` is an integer and the `datetime` axis a timestamp, 79 the use a timedelta with the number of minutes configured to this value. 80 If the `datetime` axis is an integer, default to the configured chunksize. 81 If `chunk_interval` is a `timedelta` and the `datetime` axis an integer, 82 use the number of minutes in the `timedelta`. 83 84 fresh: bool, default True 85 If `True`, skip local cache and directly query the instance connector. 86 Defaults to `True`. 87 88 debug: bool, default False 89 Verbosity toggle. 90 Defaults to `False`. 91 92 Returns 93 ------- 94 A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. 95 96 """ 97 from meerschaum.utils.warnings import warn 98 from meerschaum.utils.venv import Venv 99 from meerschaum.connectors import get_connector_plugin 100 from meerschaum.utils.misc import iterate_chunks, items_str 101 from meerschaum.utils.dtypes import to_pandas_dtype 102 from meerschaum.utils.dataframe import add_missing_cols_to_df 103 from meerschaum.utils.packages import attempt_import 104 dd = attempt_import('dask.dataframe') if as_dask else None 105 dask = attempt_import('dask') if as_dask else None 106 107 if select_columns == '*': 108 select_columns = None 109 elif isinstance(select_columns, str): 110 select_columns = [select_columns] 111 112 if isinstance(omit_columns, str): 113 omit_columns = [omit_columns] 114 115 as_iterator = as_iterator or as_chunks 116 117 if as_iterator or as_chunks: 118 return self._get_data_as_iterator( 119 select_columns = select_columns, 120 omit_columns = omit_columns, 121 begin = begin, 122 end = end, 123 params = params, 124 chunk_interval = chunk_interval, 125 fresh = fresh, 126 debug = debug, 127 ) 128 129 if as_dask: 130 from multiprocessing.pool import ThreadPool 131 dask_pool = ThreadPool(self.get_num_workers()) 132 dask.config.set(pool=dask_pool) 133 chunk_interval = 
self.get_chunk_interval(chunk_interval, debug=debug) 134 bounds = self.get_chunk_bounds( 135 begin = begin, 136 end = end, 137 bounded = False, 138 chunk_interval = chunk_interval, 139 debug = debug, 140 ) 141 dask_chunks = [ 142 dask.delayed(self.get_data)( 143 select_columns = select_columns, 144 omit_columns = omit_columns, 145 begin = chunk_begin, 146 end = chunk_end, 147 params = params, 148 chunk_interval = chunk_interval, 149 fresh = fresh, 150 debug = debug, 151 ) 152 for (chunk_begin, chunk_end) in bounds 153 ] 154 dask_meta = { 155 col: to_pandas_dtype(typ) 156 for col, typ in self.dtypes.items() 157 } 158 return dd.from_delayed(dask_chunks, meta=dask_meta) 159 160 if not self.exists(debug=debug): 161 return None 162 163 if self.cache_pipe is not None: 164 if not fresh: 165 _sync_cache_tuple = self.cache_pipe.sync( 166 begin = begin, 167 end = end, 168 params = params, 169 debug = debug, 170 **kw 171 ) 172 if not _sync_cache_tuple[0]: 173 warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1]) 174 fresh = True 175 else: ### Successfully synced cache. 
176 return self.enforce_dtypes( 177 self.cache_pipe.get_data( 178 select_columns = select_columns, 179 omit_columns = omit_columns, 180 begin = begin, 181 end = end, 182 params = params, 183 debug = debug, 184 fresh = True, 185 **kw 186 ), 187 debug = debug, 188 ) 189 190 with Venv(get_connector_plugin(self.instance_connector)): 191 df = self.instance_connector.get_pipe_data( 192 pipe = self, 193 select_columns = select_columns, 194 omit_columns = omit_columns, 195 begin = begin, 196 end = end, 197 params = params, 198 debug = debug, 199 **kw 200 ) 201 if df is None: 202 return df 203 204 if not select_columns: 205 select_columns = [col for col in df.columns] 206 207 cols_to_omit = [ 208 col 209 for col in df.columns 210 if ( 211 col in (omit_columns or []) 212 or 213 col not in (select_columns or []) 214 ) 215 ] 216 cols_to_add = [ 217 col 218 for col in select_columns 219 if col not in df.columns 220 ] 221 if cols_to_omit: 222 warn( 223 ( 224 f"Received {len(cols_to_omit)} omitted column" 225 + ('s' if len(cols_to_omit) != 1 else '') 226 + f" for {self}. " 227 + "Consider adding `select_columns` and `omit_columns` support to " 228 + f"'{self.instance_connector.type}' connectors to improve performance." 229 ), 230 stack = False, 231 ) 232 _cols_to_select = [col for col in df.columns if col not in cols_to_omit] 233 df = df[_cols_to_select] 234 235 if cols_to_add: 236 warn( 237 ( 238 f"Specified columns {items_str(cols_to_add)} were not found on {self}. " 239 + "Adding these to the DataFrame as null columns." 240 ), 241 stack = False, 242 ) 243 df = add_missing_cols_to_df(df, {col: 'string' for col in cols_to_add}) 244 245 return self.enforce_dtypes(df, debug=debug)
Get a pipe's data from the instance connector.
Parameters
- select_columns (Optional[List[str]], default None):
If provided, only select these given columns.
Otherwise select all available columns (i.e.
SELECT *
). - omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, int, None], default None):
Lower bound datetime to begin searching for data (inclusive).
Translates to a
WHERE
clause likeWHERE datetime >= begin
. Defaults toNone
. - end (Union[datetime, int, None], default None):
Upper bound datetime to stop searching for data (inclusive).
Translates to a
WHERE
clause likeWHERE datetime < end
. Defaults toNone
. - params (Optional[Dict[str, Any]], default None):
Filter the retrieved data by a dictionary of parameters.
See
meerschaum.utils.sql.build_where
for more details. - as_iterator (bool, default False):
If
True
, return a generator of chunks of pipe data. - as_chunks (bool, default False):
Alias for
as_iterator
. - as_dask (bool, default False):
If
True
, return adask.DataFrame
(which may be loaded into a Pandas DataFrame withdf.compute()
). - chunk_interval (Union[timedelta, int, None], default None):
If
as_iterator
, then return chunks withbegin
andend
separated by this interval. This may be set underpipe.parameters['chunk_minutes']
. By default, use a timedelta of 1440 minutes (1 day). Ifchunk_interval
is an integer and thedatetime
axis is a timestamp, then use a timedelta with the number of minutes configured to this value. If the datetime
axis is an integer, default to the configured chunksize. Ifchunk_interval
is atimedelta
and thedatetime
axis an integer, use the number of minutes in thetimedelta
. - fresh (bool, default True):
If
True
, skip local cache and directly query the instance connector. Defaults toTrue
. - debug (bool, default False):
Verbosity toggle.
Defaults to
False
.
Returns
- A
pd.DataFrame
for the pipe's data corresponding to the provided parameters.
def get_backtrack_data(
    self,
    backtrack_minutes: Optional[int] = None,
    begin: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    fresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Optional['pd.DataFrame']:
    """
    Get the most recent data from the instance connector as a Pandas DataFrame.

    Parameters
    ----------
    backtrack_minutes: Optional[int], default None
        How many minutes from `begin` to select from.
        If `None`, use `pipe.parameters['fetch']['backtrack_minutes']`.

    begin: Optional[datetime], default None
        The starting point to search for data.
        If begin is `None` (default), use the most recent observed datetime
        (AKA sync_time).

        ```
        E.g. begin = 02:00

        Search this region.           Ignore this, even if there's data.
        /  /  /  /  /  /  /  /  /  |
        -----|----------|----------|----------|----------|----------|
           00:00      01:00      02:00      03:00      04:00      05:00

        ```

    params: Optional[Dict[str, Any]], default None
        The standard Meerschaum `params` query dictionary.

    fresh: bool, default False
        If `True`, ignore local cache and pull directly from the instance connector.
        Only comes into effect if a pipe was created with `cache=True`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. Backtrack data
    is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if not self.exists(debug=debug):
        return None

    backtrack_interval = self.get_backtrack_interval(debug=debug)
    if backtrack_minutes is None:
        backtrack_minutes = (
            ### BUGFIX: convert seconds to minutes by dividing
            ### (was `* 60`, which inflated the interval by 3600x).
            (backtrack_interval.total_seconds() / 60)
            if isinstance(backtrack_interval, timedelta)
            else backtrack_interval
        )

    if self.cache_pipe is not None:
        if not fresh:
            _sync_cache_tuple = self.cache_pipe.sync(begin=begin, params=params, debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
                fresh = True
            else: ### Successfully synced cache.
                return self.enforce_dtypes(
                    self.cache_pipe.get_backtrack_data(
                        fresh = True,
                        begin = begin,
                        backtrack_minutes = backtrack_minutes,
                        params = params,
                        debug = debug,   ### BUGFIX: was `deubg` (NameError).
                        **kw
                    ),
                    debug = debug,
                )

    ### Prefer a connector-native implementation when available.
    if hasattr(self.instance_connector, 'get_backtrack_data'):
        with Venv(get_connector_plugin(self.instance_connector)):
            return self.enforce_dtypes(
                self.instance_connector.get_backtrack_data(
                    pipe = self,
                    begin = begin,
                    backtrack_minutes = backtrack_minutes,
                    params = params,
                    debug = debug,
                    **kw
                ),
                debug = debug,
            )

    ### Fallback: window `get_data()` from the sync time backwards.
    if begin is None:
        begin = self.get_sync_time(params=params, debug=debug)

    backtrack_interval = (
        timedelta(minutes=backtrack_minutes)
        if isinstance(begin, datetime)
        else backtrack_minutes
    )
    if begin is not None:
        begin = begin - backtrack_interval

    return self.get_data(
        begin = begin,
        params = params,
        debug = debug,
        **kw
    )
Get the most recent data from the instance connector as a Pandas DataFrame.
Parameters
- backtrack_minutes (Optional[int], default None):
How many minutes from
begin
to select from. IfNone
, usepipe.parameters['fetch']['backtrack_minutes']
. begin (Optional[datetime], default None): The starting point to search for data. If begin is
None
(default), use the most recent observed datetime (AKA sync_time).E.g. begin = 02:00 Search this region. Ignore this, even if there's data. / / / / / / / / / | -----|----------|----------|----------|----------|----------| 00:00 01:00 02:00 03:00 04:00 05:00
params (Optional[Dict[str, Any]], default None): The standard Meerschaum
params
query dictionary.- fresh (bool, default False):
If
True
, ignore local cache and pull directly from the instance connector. Only comes into effect if a pipe was created with cache=True
. - debug (bool default False): Verbosity toggle.
Returns
- A
pd.DataFrame
for the pipe's data corresponding to the provided parameters. Backtrack data is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
456def get_rowcount( 457 self, 458 begin: Optional[datetime] = None, 459 end: Optional['datetime'] = None, 460 params: Optional[Dict[str, Any]] = None, 461 remote: bool = False, 462 debug: bool = False 463 ) -> int: 464 """ 465 Get a Pipe's instance or remote rowcount. 466 467 Parameters 468 ---------- 469 begin: Optional[datetime], default None 470 Count rows where datetime > begin. 471 472 end: Optional[datetime], default None 473 Count rows where datetime < end. 474 475 remote: bool, default False 476 Count rows from a pipe's remote source. 477 **NOTE**: This is experimental! 478 479 debug: bool, default False 480 Verbosity toggle. 481 482 Returns 483 ------- 484 An `int` of the number of rows in the pipe corresponding to the provided parameters. 485 Returned 0 if the pipe does not exist. 486 """ 487 from meerschaum.utils.warnings import warn 488 from meerschaum.utils.venv import Venv 489 from meerschaum.connectors import get_connector_plugin 490 491 connector = self.instance_connector if not remote else self.connector 492 try: 493 with Venv(get_connector_plugin(connector)): 494 rowcount = connector.get_pipe_rowcount( 495 self, 496 begin = begin, 497 end = end, 498 params = params, 499 remote = remote, 500 debug = debug, 501 ) 502 if rowcount is None: 503 return 0 504 return rowcount 505 except AttributeError as e: 506 warn(e) 507 if remote: 508 return 0 509 warn(f"Failed to get a rowcount for {self}.") 510 return 0
Get a Pipe's instance or remote rowcount.
Parameters
- begin (Optional[datetime], default None): Count rows where datetime > begin.
- end (Optional[datetime], default None): Count rows where datetime < end.
- remote (bool, default False): Count rows from a pipe's remote source. NOTE: This is experimental!
- debug (bool, default False): Verbosity toggle.
Returns
- An
int
of the number of rows in the pipe corresponding to the provided parameters. Returns 0 if the pipe does not exist.
513def get_chunk_interval( 514 self, 515 chunk_interval: Union[timedelta, int, None] = None, 516 debug: bool = False, 517 ) -> Union[timedelta, int]: 518 """ 519 Get the chunk interval to use for this pipe. 520 521 Parameters 522 ---------- 523 chunk_interval: Union[timedelta, int, None], default None 524 If provided, coerce this value into the correct type. 525 For example, if the datetime axis is an integer, then 526 return the number of minutes. 527 528 Returns 529 ------- 530 The chunk interval (`timedelta` or `int`) to use with this pipe's `datetime` axis. 531 """ 532 default_chunk_minutes = get_config('pipes', 'parameters', 'verify', 'chunk_minutes') 533 configured_chunk_minutes = self.parameters.get('verify', {}).get('chunk_minutes', None) 534 chunk_minutes = ( 535 (configured_chunk_minutes or default_chunk_minutes) 536 if chunk_interval is None 537 else ( 538 chunk_interval 539 if isinstance(chunk_interval, int) 540 else int(chunk_interval.total_seconds() / 60) 541 ) 542 ) 543 544 dt_col = self.columns.get('datetime', None) 545 if dt_col is None: 546 return timedelta(minutes=chunk_minutes) 547 548 dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns]') 549 if 'int' in dt_dtype.lower(): 550 return chunk_minutes 551 return timedelta(minutes=chunk_minutes)
Get the chunk interval to use for this pipe.
Parameters
- chunk_interval (Union[timedelta, int, None], default None): If provided, coerce this value into the correct type. For example, if the datetime axis is an integer, then return the number of minutes.
Returns
- The chunk interval (
timedelta
orint
) to use with this pipe'sdatetime
axis.
554def get_chunk_bounds( 555 self, 556 begin: Union[datetime, int, None] = None, 557 end: Union[datetime, int, None] = None, 558 bounded: bool = False, 559 chunk_interval: Union[timedelta, int, None] = None, 560 debug: bool = False, 561 ) -> List[ 562 Tuple[ 563 Union[datetime, int, None], 564 Union[datetime, int, None], 565 ] 566 ]: 567 """ 568 Return a list of datetime bounds for iterating over the pipe's `datetime` axis. 569 570 Parameters 571 ---------- 572 begin: Union[datetime, int, None], default None 573 If provided, do not select less than this value. 574 Otherwise the first chunk will be unbounded. 575 576 end: Union[datetime, int, None], default None 577 If provided, do not select greater than or equal to this value. 578 Otherwise the last chunk will be unbounded. 579 580 bounded: bool, default False 581 If `True`, do not include `None` in the first chunk. 582 583 chunk_interval: Union[timedelta, int, None], default None 584 If provided, use this interval for the size of chunk boundaries. 585 The default value for this pipe may be set 586 under `pipe.parameters['verify']['chunk_minutes']`. 587 588 debug: bool, default False 589 Verbosity toggle. 590 591 Returns 592 ------- 593 A list of chunk bounds (datetimes or integers). 594 If unbounded, the first and last chunks will include `None`. 595 """ 596 include_less_than_begin = not bounded and begin is None 597 include_greater_than_end = not bounded and end is None 598 if begin is None: 599 begin = self.get_sync_time(newest=False, debug=debug) 600 if end is None: 601 end = self.get_sync_time(newest=True, debug=debug) 602 if begin is None and end is None: 603 return [(None, None)] 604 605 ### Set the chunk interval under `pipe.parameters['verify']['chunk_minutes']`. 606 chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug) 607 608 ### Build a list of tuples containing the chunk boundaries 609 ### so that we can sync multiple chunks in parallel. 
610 ### Run `verify pipes --workers 1` to sync chunks in series. 611 chunk_bounds = [] 612 begin_cursor = begin 613 while begin_cursor < end: 614 end_cursor = begin_cursor + chunk_interval 615 chunk_bounds.append((begin_cursor, end_cursor)) 616 begin_cursor = end_cursor 617 618 ### The chunk interval might be too large. 619 if not chunk_bounds and end >= begin: 620 chunk_bounds = [(begin, end)] 621 622 ### Truncate the last chunk to the end timestamp. 623 if chunk_bounds[-1][1] > end: 624 chunk_bounds[-1] = (chunk_bounds[-1][0], end) 625 626 ### Pop the last chunk if its bounds are equal. 627 if chunk_bounds[-1][0] == chunk_bounds[-1][1]: 628 chunk_bounds = chunk_bounds[:-1] 629 630 if include_less_than_begin: 631 chunk_bounds = [(None, begin)] + chunk_bounds 632 if include_greater_than_end: 633 chunk_bounds = chunk_bounds + [(end, None)] 634 635 return chunk_bounds
Return a list of datetime bounds for iterating over the pipe's datetime
axis.
Parameters
- begin (Union[datetime, int, None], default None): If provided, do not select less than this value. Otherwise the first chunk will be unbounded.
- end (Union[datetime, int, None], default None): If provided, do not select greater than or equal to this value. Otherwise the last chunk will be unbounded.
- bounded (bool, default False):
If
True
, do not includeNone
in the first chunk. - chunk_interval (Union[timedelta, int, None], default None):
If provided, use this interval for the size of chunk boundaries.
The default value for this pipe may be set
under
pipe.parameters['verify']['chunk_minutes']
. - debug (bool, default False): Verbosity toggle.
Returns
- A list of chunk bounds (datetimes or integers).
- If unbounded, the first and last chunks will include
None
.
12def register( 13 self, 14 debug: bool = False, 15 **kw: Any 16 ) -> SuccessTuple: 17 """ 18 Register a new Pipe along with its attributes. 19 20 Parameters 21 ---------- 22 debug: bool, default False 23 Verbosity toggle. 24 25 kw: Any 26 Keyword arguments to pass to `instance_connector.register_pipe()`. 27 28 Returns 29 ------- 30 A `SuccessTuple` of success, message. 31 """ 32 if self.temporary: 33 return False, "Cannot register pipes created with `temporary=True` (read-only)." 34 35 from meerschaum.utils.formatting import get_console 36 from meerschaum.utils.venv import Venv 37 from meerschaum.connectors import get_connector_plugin, custom_types 38 from meerschaum.config._patch import apply_patch_to_config 39 40 import warnings 41 with warnings.catch_warnings(): 42 warnings.simplefilter('ignore') 43 try: 44 _conn = self.connector 45 except Exception as e: 46 _conn = None 47 48 if ( 49 _conn is not None 50 and 51 (_conn.type == 'plugin' or _conn.type in custom_types) 52 and 53 getattr(_conn, 'register', None) is not None 54 ): 55 try: 56 with Venv(get_connector_plugin(_conn), debug=debug): 57 params = self.connector.register(self) 58 except Exception as e: 59 get_console().print_exception() 60 params = None 61 params = {} if params is None else params 62 if not isinstance(params, dict): 63 from meerschaum.utils.warnings import warn 64 warn( 65 f"Invalid parameters returned from `register()` in connector {self.connector}:\n" 66 + f"{params}" 67 ) 68 else: 69 self.parameters = apply_patch_to_config(params, self.parameters) 70 71 if not self.parameters: 72 cols = self.columns if self.columns else {'datetime': None, 'id': None} 73 self.parameters = { 74 'columns': cols, 75 } 76 77 with Venv(get_connector_plugin(self.instance_connector)): 78 return self.instance_connector.register_pipe(self, debug=debug, **kw)
Register a new Pipe along with its attributes.
Parameters
- debug (bool, default False): Verbosity toggle.
- kw (Any):
Keyword arguments to pass to
instance_connector.register_pipe()
.
Returns
- A
SuccessTuple
of success, message.
14@property 15def attributes(self) -> Dict[str, Any]: 16 """ 17 Return a dictionary of a pipe's keys and parameters. 18 These values are reflected directly from the pipes table of the instance. 19 """ 20 import time 21 from meerschaum.config import get_config 22 from meerschaum.config._patch import apply_patch_to_config 23 from meerschaum.utils.venv import Venv 24 from meerschaum.connectors import get_connector_plugin 25 26 timeout_seconds = get_config('pipes', 'attributes', 'local_cache_timeout_seconds') 27 28 if '_attributes' not in self.__dict__: 29 self._attributes = {} 30 31 now = time.perf_counter() 32 last_refresh = self.__dict__.get('_attributes_sync_time', None) 33 timed_out = ( 34 last_refresh is None 35 or 36 (timeout_seconds is not None and (now - last_refresh) >= timeout_seconds) 37 ) 38 if not self.temporary and timed_out: 39 self._attributes_sync_time = now 40 local_attributes = self.__dict__.get('_attributes', {}) 41 with Venv(get_connector_plugin(self.instance_connector)): 42 instance_attributes = self.instance_connector.get_pipe_attributes(self) 43 self._attributes = apply_patch_to_config(instance_attributes, local_attributes) 44 return self._attributes
Return a dictionary of a pipe's keys and parameters. These values are reflected directly from the pipes table of the instance.
47@property 48def parameters(self) -> Optional[Dict[str, Any]]: 49 """ 50 Return the parameters dictionary of the pipe. 51 """ 52 if 'parameters' not in self.attributes: 53 self.attributes['parameters'] = {} 54 return self.attributes['parameters']
Return the parameters dictionary of the pipe.
66@property 67def columns(self) -> Union[Dict[str, str], None]: 68 """ 69 Return the `columns` dictionary defined in `meerschaum.Pipe.parameters`. 70 """ 71 if 'columns' not in self.parameters: 72 self.parameters['columns'] = {} 73 cols = self.parameters['columns'] 74 if not isinstance(cols, dict): 75 cols = {} 76 self.parameters['columns'] = cols 77 return cols
Return the columns
dictionary defined in Pipe.parameters
.
117@property 118def dtypes(self) -> Union[Dict[str, Any], None]: 119 """ 120 If defined, return the `dtypes` dictionary defined in `meerschaum.Pipe.parameters`. 121 """ 122 from meerschaum.config._patch import apply_patch_to_config 123 configured_dtypes = self.parameters.get('dtypes', {}) 124 remote_dtypes = self.infer_dtypes(persist=False) 125 patched_dtypes = apply_patch_to_config(remote_dtypes, configured_dtypes) 126 self.parameters['dtypes'] = patched_dtypes 127 return self.parameters['dtypes']
If defined, return the dtypes
dictionary defined in Pipe.parameters
.
139def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]: 140 """ 141 Check if the requested columns are defined. 142 143 Parameters 144 ---------- 145 *args: str 146 The column names to be retrieved. 147 148 error: bool, default False 149 If `True`, raise an `Exception` if the specified column is not defined. 150 151 Returns 152 ------- 153 A tuple of the same size of `args` or a `str` if `args` is a single argument. 154 155 Examples 156 -------- 157 >>> pipe = mrsm.Pipe('test', 'test') 158 >>> pipe.columns = {'datetime': 'dt', 'id': 'id'} 159 >>> pipe.get_columns('datetime', 'id') 160 ('dt', 'id') 161 >>> pipe.get_columns('value', error=True) 162 Exception: 🛑 Missing 'value' column for Pipe('test', 'test'). 163 """ 164 from meerschaum.utils.warnings import error as _error, warn 165 if not args: 166 args = tuple(self.columns.keys()) 167 col_names = [] 168 for col in args: 169 col_name = None 170 try: 171 col_name = self.columns[col] 172 if col_name is None and error: 173 _error(f"Please define the name of the '{col}' column for {self}.") 174 except Exception as e: 175 col_name = None 176 if col_name is None and error: 177 _error(f"Missing '{col}'" + f" column for {self}.") 178 col_names.append(col_name) 179 if len(col_names) == 1: 180 return col_names[0] 181 return tuple(col_names)
Check if the requested columns are defined.
Parameters
- *args (str): The column names to be retrieved.
- error (bool, default False):
If
True
, raise anException
if the specified column is not defined.
Returns
- A tuple of the same size of
args
or astr
ifargs
is a single argument.
Examples
>>> pipe = mrsm.Pipe('test', 'test')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> pipe.get_columns('datetime', 'id')
('dt', 'id')
>>> pipe.get_columns('value', error=True)
Exception: 🛑 Missing 'value' column for Pipe('test', 'test').
184def get_columns_types(self, debug: bool = False) -> Union[Dict[str, str], None]: 185 """ 186 Get a dictionary of a pipe's column names and their types. 187 188 Parameters 189 ---------- 190 debug: bool, default False: 191 Verbosity toggle. 192 193 Returns 194 ------- 195 A dictionary of column names (`str`) to column types (`str`). 196 197 Examples 198 -------- 199 >>> pipe.get_columns_types() 200 { 201 'dt': 'TIMESTAMP WITHOUT TIMEZONE', 202 'id': 'BIGINT', 203 'val': 'DOUBLE PRECISION', 204 } 205 >>> 206 """ 207 from meerschaum.utils.venv import Venv 208 from meerschaum.connectors import get_connector_plugin 209 210 with Venv(get_connector_plugin(self.instance_connector)): 211 return self.instance_connector.get_pipe_columns_types(self, debug=debug)
Get a dictionary of a pipe's column names and their types.
Parameters
- debug (bool, default False): Verbosity toggle.
Returns
- A dictionary of column names (
str
) to column types (str
).
Examples
>>> pipe.get_columns_types()
{
'dt': 'TIMESTAMP WITHOUT TIMEZONE',
'id': 'BIGINT',
'val': 'DOUBLE PRECISION',
}
>>>
436def get_indices(self) -> Dict[str, str]: 437 """ 438 Return a dictionary in the form of `pipe.columns` but map to index names. 439 """ 440 return { 441 ix: (self.target + '_' + col + '_index') 442 for ix, col in self.columns.items() if col 443 }
Return a dictionary in the form of pipe.columns
but map to index names.
214def get_id(self, **kw: Any) -> Union[int, None]: 215 """ 216 Fetch a pipe's ID from its instance connector. 217 If the pipe does not exist, return `None`. 218 """ 219 if self.temporary: 220 return None 221 from meerschaum.utils.venv import Venv 222 from meerschaum.connectors import get_connector_plugin 223 224 with Venv(get_connector_plugin(self.instance_connector)): 225 return self.instance_connector.get_pipe_id(self, **kw)
Fetch a pipe's ID from its instance connector.
If the pipe does not exist, return None
.
228@property 229def id(self) -> Union[int, None]: 230 """ 231 Fetch and cache a pipe's ID. 232 """ 233 if not ('_id' in self.__dict__ and self._id): 234 self._id = self.get_id() 235 return self._id
Fetch and cache a pipe's ID.
238def get_val_column(self, debug: bool = False) -> Union[str, None]: 239 """ 240 Return the name of the value column if it's defined, otherwise make an educated guess. 241 If not set in the `columns` dictionary, return the first numeric column that is not 242 an ID or datetime column. 243 If none may be found, return `None`. 244 245 Parameters 246 ---------- 247 debug: bool, default False: 248 Verbosity toggle. 249 250 Returns 251 ------- 252 Either a string or `None`. 253 """ 254 from meerschaum.utils.debug import dprint 255 if debug: 256 dprint('Attempting to determine the value column...') 257 try: 258 val_name = self.get_columns('value') 259 except Exception as e: 260 val_name = None 261 if val_name is not None: 262 if debug: 263 dprint(f"Value column: {val_name}") 264 return val_name 265 266 cols = self.columns 267 if cols is None: 268 if debug: 269 dprint('No columns could be determined. Returning...') 270 return None 271 try: 272 dt_name = self.get_columns('datetime', error=False) 273 except Exception as e: 274 dt_name = None 275 try: 276 id_name = self.get_columns('id', errors=False) 277 except Exception as e: 278 id_name = None 279 280 if debug: 281 dprint(f"dt_name: {dt_name}") 282 dprint(f"id_name: {id_name}") 283 284 cols_types = self.get_columns_types(debug=debug) 285 if cols_types is None: 286 return None 287 if debug: 288 dprint(f"cols_types: {cols_types}") 289 if dt_name is not None: 290 cols_types.pop(dt_name, None) 291 if id_name is not None: 292 cols_types.pop(id_name, None) 293 294 candidates = [] 295 candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric',} 296 for search_term in candidate_keywords: 297 for col, typ in cols_types.items(): 298 if search_term in typ.lower(): 299 candidates.append(col) 300 break 301 if not candidates: 302 if debug: 303 dprint(f"No value column could be determined.") 304 return None 305 306 return candidates[0]
Return the name of the value column if it's defined, otherwise make an educated guess.
If not set in the columns
dictionary, return the first numeric column that is not
an ID or datetime column.
If none may be found, return None
.
Parameters
- debug (bool, default False): Verbosity toggle.
Returns
- Either a string or
None
.
309@property 310def parents(self) -> List[meerschaum.Pipe]: 311 """ 312 Return a list of `meerschaum.Pipe` objects to be designated as parents. 313 """ 314 if 'parents' not in self.parameters: 315 return [] 316 from meerschaum.utils.warnings import warn 317 _parents_keys = self.parameters['parents'] 318 if not isinstance(_parents_keys, list): 319 warn( 320 f"Please ensure the parents for {self} are defined as a list of keys.", 321 stacklevel = 4 322 ) 323 return [] 324 from meerschaum import Pipe 325 _parents = [] 326 for keys in _parents_keys: 327 try: 328 p = Pipe(**keys) 329 except Exception as e: 330 warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}") 331 continue 332 _parents.append(p) 333 return _parents
Return a list of Pipe
objects to be designated as parents.
336@property 337def children(self) -> List[meerschaum.Pipe]: 338 """ 339 Return a list of `meerschaum.Pipe` objects to be designated as children. 340 """ 341 if 'children' not in self.parameters: 342 return [] 343 from meerschaum.utils.warnings import warn 344 _children_keys = self.parameters['children'] 345 if not isinstance(_children_keys, list): 346 warn( 347 f"Please ensure the children for {self} are defined as a list of keys.", 348 stacklevel = 4 349 ) 350 return [] 351 from meerschaum import Pipe 352 _children = [] 353 for keys in _children_keys: 354 try: 355 p = Pipe(**keys) 356 except Exception as e: 357 warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}") 358 continue 359 _children.append(p) 360 return _children
Return a list of Pipe
objects to be designated as children.
363@property 364def target(self) -> str: 365 """ 366 The target table name. 367 You can set the target name under on of the following keys 368 (checked in this order): 369 - `target` 370 - `target_name` 371 - `target_table` 372 - `target_table_name` 373 """ 374 if 'target' not in self.parameters: 375 target = self._target_legacy() 376 potential_keys = ('target_name', 'target_table', 'target_table_name') 377 for k in potential_keys: 378 if k in self.parameters: 379 target = self.parameters[k] 380 break 381 382 if self.instance_connector.type == 'sql': 383 from meerschaum.utils.sql import truncate_item_name 384 truncated_target = truncate_item_name(target, self.instance_connector.flavor) 385 if truncated_target != target: 386 warn( 387 f"The target '{target}' is too long for '{self.instance_connector.flavor}', " 388 + f"will use {truncated_target} instead." 389 ) 390 target = truncated_target 391 392 self.target = target 393 return self.parameters['target']
The target table name. You can set the target name under one of the following keys (checked in this order):
target
target_name
target_table
target_table_name
416def guess_datetime(self) -> Union[str, None]: 417 """ 418 Try to determine a pipe's datetime column. 419 """ 420 dtypes = self.dtypes 421 422 ### Abort if the user explictly disallows a datetime index. 423 if 'datetime' in dtypes: 424 if dtypes['datetime'] is None: 425 return None 426 427 dt_cols = [ 428 col for col, typ in self.dtypes.items() 429 if str(typ).startswith('datetime') 430 ] 431 if not dt_cols: 432 return None 433 return dt_cols[0]
Try to determine a pipe's datetime column.
12def show( 13 self, 14 nopretty: bool = False, 15 debug: bool = False, 16 **kw 17 ) -> SuccessTuple: 18 """ 19 Show attributes of a Pipe. 20 21 Parameters 22 ---------- 23 nopretty: bool, default False 24 If `True`, simply print the JSON of the pipe's attributes. 25 26 debug: bool, default False 27 Verbosity toggle. 28 29 Returns 30 ------- 31 A `SuccessTuple` of success, message. 32 33 """ 34 import json 35 from meerschaum.utils.formatting import ( 36 pprint, make_header, ANSI, highlight_pipes, fill_ansi, get_console, 37 ) 38 from meerschaum.utils.packages import import_rich, attempt_import 39 from meerschaum.utils.warnings import info 40 attributes_json = json.dumps(self.attributes) 41 if not nopretty: 42 _to_print = f"Attributes for {self}:" 43 if ANSI: 44 _to_print = fill_ansi(highlight_pipes(make_header(_to_print)), 'magenta') 45 print(_to_print) 46 rich = import_rich() 47 rich_json = attempt_import('rich.json') 48 get_console().print(rich_json.JSON(attributes_json)) 49 else: 50 print(_to_print) 51 else: 52 print(attributes_json) 53 54 return True, "Success"
Show attributes of a Pipe.
Parameters
- nopretty (bool, default False):
If
True
, simply print the JSON of the pipe's attributes. - debug (bool, default False): Verbosity toggle.
Returns
- A
SuccessTuple
of success, message.
def edit(
    self,
    patch: bool = False,
    interactive: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit a Pipe's configuration.

    Parameters
    ----------
    patch: bool, default False
        If `patch` is True, update parameters by cascading rather than overwriting.
    interactive: bool, default False
        If `True`, open an editor for the user to make changes to the pipe's YAML file.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.

    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if self.temporary:
        return False, "Cannot edit pipes created with `temporary=True` (read-only)."

    ### Non-interactive: delegate straight to the instance connector.
    if not interactive:
        with Venv(get_connector_plugin(self.instance_connector)):
            return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
    from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
    from meerschaum.utils.misc import edit_file
    parameters_filename = str(self) + '.yaml'
    parameters_path = PIPES_CACHE_RESOURCES_PATH / parameters_filename

    from meerschaum.utils.yaml import yaml

    ### Build a banner comment for the top of the YAML file.
    edit_text = f"Edit the parameters for {self}"
    edit_top = '#' * (len(edit_text) + 4)
    edit_header = edit_top + f'\n# {edit_text} #\n' + edit_top + '\n\n'

    from meerschaum.config import get_config
    parameters = dict(get_config('pipes', 'parameters', patch=True))
    from meerschaum.config._patch import apply_patch_to_config
    ### The pipe's own parameters override the configured defaults.
    parameters = apply_patch_to_config(parameters, self.parameters)

    ### write parameters to yaml file
    with open(parameters_path, 'w+') as f:
        f.write(edit_header)
        yaml.dump(parameters, stream=f, sort_keys=False)

    ### only quit editing if yaml is valid
    editing = True
    while editing:
        edit_file(parameters_path)
        try:
            with open(parameters_path, 'r') as f:
                file_parameters = yaml.load(f.read())
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(f"Invalid format defined for '{self}':\n\n{e}")
            input(f"Press [Enter] to correct the configuration for '{self}': ")
        else:
            editing = False

    self.parameters = file_parameters

    if debug:
        from meerschaum.utils.formatting import pprint
        pprint(self.parameters)

    ### Persist the edited parameters on the instance.
    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
Edit a Pipe's configuration.
Parameters
- patch (bool, default False):
If
patch
is True, update parameters by cascading rather than overwriting. - interactive (bool, default False):
If
True
, open an editor for the user to make changes to the pipe's YAML file. - debug (bool, default False): Verbosity toggle.
Returns
- A
SuccessTuple
of success, message.
def edit_definition(
    self,
    yes: bool = False,
    noask: bool = False,
    force: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit a pipe's definition file and update its configuration.
    **NOTE:** This function is interactive and should not be used in automated scripts!

    Parameters
    ----------
    yes: bool, default False
        If `True`, answer "yes" to confirmation prompts.

    noask: bool, default False
        If `True`, do not ask confirmation prompts (use defaults).

    force: bool, default False
        If `True`, skip confirmation prompts entirely.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    if self.temporary:
        return False, "Cannot edit pipes created with `temporary=True` (read-only)."

    ### Only 'sql' and 'api' connectors have an interactive definition editor;
    ### for anything else, fall back to editing the YAML parameters.
    from meerschaum.connectors import instance_types
    if (self.connector is None) or self.connector.type not in instance_types:
        return self.edit(interactive=True, debug=debug, **kw)

    import json
    from meerschaum.utils.warnings import info, warn
    from meerschaum.utils.debug import dprint
    from meerschaum.config._patch import apply_patch_to_config
    from meerschaum.utils.misc import edit_file

    _parameters = self.parameters
    if 'fetch' not in _parameters:
        _parameters['fetch'] = {}

    def _edit_api():
        ### Prompt for the source pipe's keys and optional filter params as JSON.
        from meerschaum.utils.prompt import prompt, yes_no
        info(
            f"Please enter the keys of the source pipe from '{self.connector}'.\n" +
            "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip."
        )

        _keys = {'connector_keys': None, 'metric_key': None, 'location_key': None}
        for k in _keys:
            _keys[k] = _parameters['fetch'].get(k, None)

        for k, v in _keys.items():
            try:
                _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v)
            except KeyboardInterrupt:
                continue
            ### Normalize the various spellings of "no value" to None.
            if _keys[k] in ('', 'None', '\'None\'', '[None]'):
                _keys[k] = None

        _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys)

        info("You may optionally specify additional filter parameters as JSON.")
        print(" Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.")
        print(" For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':")
        print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': ')))
        if force or yes_no(
            "Would you like to add additional filter parameters?",
            yes=yes, noask=noask
        ):
            from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
            definition_filename = str(self) + '.json'
            definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
            try:
                definition_path.touch()
                with open(definition_path, 'w+') as f:
                    json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2)
            except Exception as e:
                return False, f"Failed writing file '{definition_path}':\n" + str(e)

            _params = None
            ### Loop until the user provides valid JSON or gives up.
            while True:
                edit_file(definition_path)
                try:
                    with open(definition_path, 'r') as f:
                        _params = json.load(f)
                except Exception as e:
                    warn(f'Failed to read parameters JSON:\n{e}', stack=False)
                    if force or yes_no(
                        "Would you like to try again?\n "
                        + "If not, the parameters JSON file will be ignored.",
                        noask=noask, yes=yes
                    ):
                        continue
                    _params = None
                break
            if _params is not None:
                if 'fetch' not in _parameters:
                    _parameters['fetch'] = {}
                _parameters['fetch']['params'] = _params

        self.parameters = _parameters
        return True, "Success"

    def _edit_sql():
        ### Dump the current SQL definition to a cache file, open an editor,
        ### and read back the (hopefully modified) definition.
        import textwrap
        from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
        from meerschaum.utils.misc import edit_file
        definition_filename = str(self) + '.sql'
        definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename

        sql_definition = _parameters['fetch'].get('definition', None)
        if sql_definition is None:
            sql_definition = ''
        sql_definition = textwrap.dedent(sql_definition).lstrip()

        try:
            definition_path.touch()
            with open(definition_path, 'w+') as f:
                f.write(sql_definition)
        except Exception as e:
            return False, f"Failed writing file '{definition_path}':\n" + str(e)

        edit_file(definition_path)
        try:
            with open(definition_path, 'r') as f:
                file_definition = f.read()
        except Exception as e:
            return False, f"Failed reading file '{definition_path}':\n" + str(e)

        if sql_definition == file_definition:
            return False, f"No changes made to definition for {self}."

        ### Cheap sanity check: a real SQL statement contains whitespace.
        if ' ' not in file_definition:
            return False, f"Invalid SQL definition for {self}."

        if debug:
            dprint("Read SQL definition:\n\n" + file_definition)
        _parameters['fetch']['definition'] = file_definition
        self.parameters = _parameters
        return True, "Success"

    ### Explicit dispatch instead of a fragile `locals()` lookup,
    ### and propagate the editor's SuccessTuple instead of discarding it.
    _editors = {
        'api': _edit_api,
        'sql': _edit_sql,
    }
    _editor = _editors.get(str(self.connector.type), None)
    if _editor is None:
        return False, f"No definition editor for connector type '{self.connector.type}'."

    edit_success, edit_msg = _editor()
    if not edit_success:
        return edit_success, edit_msg

    return self.edit(interactive=False, debug=debug, **kw)
Edit a pipe's definition file and update its configuration. NOTE: This function is interactive and should not be used in automated scripts!
Returns
- A
SuccessTuple
of success, message.
13def update(self, *args, **kw) -> SuccessTuple: 14 """ 15 Update a pipe's parameters in its instance. 16 """ 17 kw['interactive'] = False 18 return self.edit(*args, **kw)
Update a pipe's parameters in its instance.
def sync(
    self,
    df: Union[
        pd.DataFrame,
        Dict[str, List[Any]],
        List[Dict[str, Any]],
        InferFetch
    ] = InferFetch,
    begin: Union[datetime, int, str, None] = '',
    end: Union[datetime, int, None] = None,
    force: bool = False,
    retries: int = 10,
    min_seconds: int = 1,
    check_existing: bool = True,
    blocking: bool = True,
    workers: Optional[int] = None,
    callback: Optional[Callable[[Tuple[bool, str]], Any]] = None,
    error_callback: Optional[Callable[[Exception], Any]] = None,
    chunksize: Optional[int] = -1,
    sync_chunks: bool = True,
    debug: bool = False,
    _inplace: bool = True,
    **kw: Any
) -> SuccessTuple:
    """
    Fetch new data from the source and update the pipe's table with new data.

    Get new remote data via fetch, get existing data in the same time period,
    and merge the two, only keeping the unseen data.

    Parameters
    ----------
    df: Union[None, pd.DataFrame, Dict[str, List[Any]]], default `InferFetch`
        An optional DataFrame to sync into the pipe.
        If omitted, data are fetched via the pipe's connector.

    begin: Union[datetime, int, str, None], default ''
        Optionally specify the earliest datetime to search for data.

    end: Union[datetime, int, str, None], default None
        Optionally specify the latest datetime to search for data.

    force: bool, default False
        If `True`, keep trying to sync until `retries` attempts.

    retries: int, default 10
        If `force`, how many attempts to try syncing before declaring failure.

    min_seconds: Union[int, float], default 1
        If `force`, how many seconds to sleep between retries. Defaults to `1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe.

    blocking: bool, default True
        If `True`, wait for sync to finish and return its result, otherwise
        asynchronously sync in a background thread and return success.
        Only intended for specific scenarios.

    workers: Optional[int], default None
        If provided and the instance connector is thread-safe
        (`pipe.instance_connector.IS_THREAD_SAFE is True`),
        limit concurrent sync to this many threads.

    callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
        Callback function which expects a SuccessTuple as input.
        Only applies when `blocking=False`.

    error_callback: Optional[Callable[[Exception], Any]], default None
        Callback function which expects an Exception as input.
        Only applies when `blocking=False`.

    chunksize: int, default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.

    sync_chunks: bool, default True
        If possible, sync chunks while fetching them into memory.

    debug: bool, default False
        Verbosity toggle. Defaults to False.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    from meerschaum.utils.debug import dprint, _checkpoint
    from meerschaum.connectors import custom_types
    from meerschaum.plugins import Plugin
    from meerschaum.utils.formatting import get_console
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.misc import df_is_chunk_generator, filter_keywords, filter_arguments
    from meerschaum.utils.pool import get_pool
    from meerschaum.config import get_config

    ### Callbacks only fire on the threaded (non-blocking) path.
    if (callback is not None or error_callback is not None) and blocking:
        warn("Callback functions are only executed when blocking = False. Ignoring...")

    _checkpoint(_total=2, **kw)

    ### A chunksize of 0 means "no chunking at all".
    if chunksize == 0:
        chunksize = None
        sync_chunks = False

    ### Consolidate the explicit keyword arguments into `kw`
    ### so they can be forwarded wholesale to nested sync calls.
    kw.update({
        'begin': begin,
        'end': end,
        'force': force,
        'retries': retries,
        'min_seconds': min_seconds,
        'check_existing': check_existing,
        'blocking': blocking,
        'workers': workers,
        'callback': callback,
        'error_callback': error_callback,
        'sync_chunks': sync_chunks,
        'chunksize': chunksize,
    })

    ### NOTE: Invalidate `_exists` cache before and after syncing.
    self._exists = None

    def _sync(
        p: 'meerschaum.Pipe',
        df: Union[
            'pd.DataFrame',
            Dict[str, List[Any]],
            List[Dict[str, Any]],
            InferFetch
        ] = InferFetch,
    ) -> SuccessTuple:
        ### Explicit `None` is an error; omit `df` entirely to infer fetching.
        if df is None:
            p._exists = None
            return (
                False,
                f"You passed `None` instead of data into `sync()` for {p}.\n"
                + "Omit the DataFrame to infer fetching.",
            )
        ### Ensure that Pipe is registered.
        if not p.temporary and p.get_id(debug=debug) is None:
            ### NOTE: This may trigger an interactive session for plugins!
            register_success, register_msg = p.register(debug=debug)
            if not register_success:
                if 'already' not in register_msg:
                    p._exists = None
                    return register_success, register_msg

        ### If connector is a plugin with a `sync()` method, return that instead.
        ### If the plugin does not have a `sync()` method but does have a `fetch()` method,
        ### use that instead.
        ### NOTE: The DataFrame must be omitted for the plugin sync method to apply.
        ### If a DataFrame is provided, continue as expected.
        if hasattr(df, 'MRSM_INFER_FETCH'):
            try:
                if p.connector is None:
                    ### Keyless pipes (no ':' in connector_keys) have no source to fetch from.
                    if ':' not in p.connector_keys:
                        return True, f"{p} does not support fetching; nothing to do."

                    msg = f"{p} does not have a valid connector."
                    if p.connector_keys.startswith('plugin:'):
                        msg += f"\n Perhaps {p.connector_keys} has a syntax error?"
                    p._exists = None
                    return False, msg
            except Exception:
                p._exists = None
                return False, f"Unable to create the connector for {p}."

            ### Sync in place if this is a SQL pipe.
            if (
                str(self.connector) == str(self.instance_connector)
                and
                hasattr(self.instance_connector, 'sync_pipe_inplace')
                and
                _inplace
                and
                get_config('system', 'experimental', 'inplace_sync')
            ):
                with Venv(get_connector_plugin(self.instance_connector)):
                    p._exists = None
                    _args, _kwargs = filter_arguments(
                        p.instance_connector.sync_pipe_inplace,
                        p,
                        debug=debug,
                        **kw
                    )
                    return self.instance_connector.sync_pipe_inplace(
                        *_args,
                        **_kwargs
                    )

            ### Activate and invoke `sync(pipe)` for plugin connectors with `sync` methods.
            try:
                if getattr(p.connector, 'sync', None) is not None:
                    with Venv(get_connector_plugin(p.connector), debug=debug):
                        _args, _kwargs = filter_arguments(
                            p.connector.sync,
                            p,
                            debug=debug,
                            **kw
                        )
                        return_tuple = p.connector.sync(*_args, **_kwargs)
                    p._exists = None
                    if not isinstance(return_tuple, tuple):
                        return_tuple = (
                            False,
                            f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}"
                        )
                    return return_tuple

            except Exception as e:
                get_console().print_exception()
                msg = f"Failed to sync {p} with exception: '" + str(e) + "'"
                if debug:
                    error(msg, silent=False)
                p._exists = None
                return False, msg

            ### Fetch the dataframe from the connector's `fetch()` method.
            try:
                with Venv(get_connector_plugin(p.connector), debug=debug):
                    df = p.fetch(
                        **filter_keywords(
                            p.fetch,
                            debug=debug,
                            **kw
                        )
                    )

            except Exception as e:
                get_console().print_exception(
                    suppress=[
                        'meerschaum/core/Pipe/_sync.py',
                        'meerschaum/core/Pipe/_fetch.py',
                    ]
                )
                msg = f"Failed to fetch data from {p.connector}:\n {e}"
                df = None

            if df is None:
                p._exists = None
                return False, f"No data were fetched for {p}."

            if isinstance(df, list):
                if len(df) == 0:
                    return True, f"No new rows were returned for {p}."

                ### May be a chunk hook results list.
                if isinstance(df[0], tuple):
                    success = all([_success for _success, _ in df])
                    message = '\n'.join([_message for _, _message in df])
                    return success, message

            ### TODO: Depreciate async?
            if df is True:
                p._exists = None
                return True, f"{p} is being synced in parallel."

        ### CHECKPOINT: Retrieved the DataFrame.
        _checkpoint(**kw)

        ### Allow for dataframe generators or iterables.
        if df_is_chunk_generator(df):
            kw['workers'] = p.get_num_workers(kw.get('workers', None))
            dt_col = p.columns.get('datetime', None)
            pool = get_pool(workers=kw.get('workers', 1))
            if debug:
                dprint(f"Received {type(df)}. Attempting to sync first chunk...")

            try:
                chunk = next(df)
            except StopIteration:
                return True, "Received an empty generator; nothing to do."

            ### Sync the first chunk synchronously to fail fast
            ### (e.g. on schema errors) before fanning out to the pool.
            chunk_success, chunk_msg = _sync(p, chunk)
            chunk_msg = '\n' + self._get_chunk_label(chunk, dt_col) + '\n' + chunk_msg
            if not chunk_success:
                return chunk_success, f"Unable to sync initial chunk for {p}:\n{chunk_msg}"
            if debug:
                dprint("Successfully synced the first chunk, attemping the rest...")

            failed_chunks = []
            def _process_chunk(_chunk):
                try:
                    _chunk_success, _chunk_msg = _sync(p, _chunk)
                except Exception as e:
                    _chunk_success, _chunk_msg = False, str(e)
                if not _chunk_success:
                    failed_chunks.append(_chunk)
                return (
                    _chunk_success,
                    (
                        '\n'
                        + self._get_chunk_label(_chunk, dt_col)
                        + '\n'
                        + _chunk_msg
                    )
                )

            ### If the first chunk is itself a generator (nested chunking),
            ### process children serially instead of via the pool.
            results = sorted(
                [(chunk_success, chunk_msg)] + (
                    list(pool.imap(_process_chunk, df))
                    if not df_is_chunk_generator(chunk)
                    else [
                        _process_chunk(_child_chunks)
                        for _child_chunks in df
                    ]
                )
            )
            chunk_messages = [chunk_msg for _, chunk_msg in results]
            success_bools = [chunk_success for chunk_success, _ in results]
            success = all(success_bools)
            msg = '\n'.join(chunk_messages)

            ### If some chunks succeeded, retry the failures.
            retry_success = True
            if not success and any(success_bools):
                if debug:
                    dprint("Retrying failed chunks...")
                chunks_to_retry = [c for c in failed_chunks]
                failed_chunks = []
                for chunk in chunks_to_retry:
                    chunk_success, chunk_msg = _process_chunk(chunk)
                    msg += f"\n\nRetried chunk:\n{chunk_msg}\n"
                    retry_success = retry_success and chunk_success

            success = success and retry_success
            return success, msg

        ### Cast to a dataframe and ensure datatypes are what we expect.
        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)
        if debug:
            dprint(
                "DataFrame to sync:\n"
                + (
                    str(df)[:255]
                    + '...'
                    if len(str(df)) >= 256
                    else str(df)
                ),
                **kw
            )

        ### if force, continue to sync until success
        return_tuple = False, f"Did not sync {p}."
        run = True
        _retries = 1
        while run:
            with Venv(get_connector_plugin(self.instance_connector)):
                return_tuple = p.instance_connector.sync_pipe(
                    pipe=p,
                    df=df,
                    debug=debug,
                    **kw
                )
            _retries += 1
            run = (not return_tuple[0]) and force and _retries <= retries
            if run and debug:
                dprint(f"Syncing failed for {p}. Attempt ( {_retries} / {retries} )", **kw)
                dprint(f"Sleeping for {min_seconds} seconds...", **kw)
                ### NOTE(review): the sleep only occurs when `debug` is True —
                ### confirm whether force-retries are intended to spin without sleeping otherwise.
                time.sleep(min_seconds)
            if _retries > retries:
                warn(
                    f"Unable to sync {p} within {retries} attempt" +
                    ("s" if retries != 1 else "") + "!"
                )

        ### CHECKPOINT: Finished syncing. Handle caching.
        _checkpoint(**kw)
        if self.cache_pipe is not None:
            if debug:
                dprint("Caching retrieved dataframe.", **kw)
            _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync local cache for {self}.")

        self._exists = None
        return return_tuple

    if blocking:
        self._exists = None
        return _sync(self, df = df)

    ### Non-blocking path: spawn a thread and report via callbacks.
    from meerschaum.utils.threading import Thread
    def default_callback(result_tuple: SuccessTuple):
        dprint(f"Asynchronous result from {self}: {result_tuple}", **kw)

    def default_error_callback(x: Exception):
        dprint(f"Error received for {self}: {x}", **kw)

    if callback is None and debug:
        callback = default_callback
    if error_callback is None and debug:
        error_callback = default_error_callback
    try:
        thread = Thread(
            target=_sync,
            args=(self,),
            kwargs={'df': df},
            daemon=False,
            callback=callback,
            error_callback=error_callback,
        )
        thread.start()
    except Exception as e:
        self._exists = None
        return False, str(e)

    self._exists = None
    return True, f"Spawned asyncronous sync for {self}."
Fetch new data from the source and update the pipe's table with new data.
Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.
Parameters
- df (Union[None, pd.DataFrame, Dict[str, List[Any]]], default None):
An optional DataFrame to sync into the pipe. Defaults to
None
. - begin (Union[datetime, int, str, None], default ''): Optionally specify the earliest datetime to search for data.
- end (Union[datetime, int, str, None], default None): Optionally specify the latest datetime to search for data.
- force (bool, default False):
If
True
, keep trying to sync until retries
attempts. - retries (int, default 10):
If
force
, how many attempts to try syncing before declaring failure. - min_seconds (Union[int, float], default 1):
If
force
, how many seconds to sleep between retries. Defaults to 1
. - check_existing (bool, default True):
If
True
, pull and diff with existing data from the pipe. - blocking (bool, default True):
If
True
, wait for sync to finish and return its result, otherwise asynchronously sync (oxymoron?) and return success. Defaults to True
. Only intended for specific scenarios. - workers (Optional[int], default None):
If provided and the instance connector is thread-safe
(
pipe.instance_connector.IS_THREAD_SAFE is True
), limit concurrent sync to this many threads. - callback (Optional[Callable[[Tuple[bool, str]], Any]], default None):
Callback function which expects a SuccessTuple as input.
Only applies when
blocking=False
. - error_callback (Optional[Callable[[Exception], Any]], default None):
Callback function which expects an Exception as input.
Only applies when
blocking=False
. - chunksize (int, default -1):
Specify the number of rows to sync per chunk.
If
-1
, resort to system configuration (default is 900). A chunksize of None
will sync all rows in one transaction. - sync_chunks (bool, default True): If possible, sync chunks while fetching them into memory.
- debug (bool, default False): Verbosity toggle. Defaults to False.
Returns
- A
SuccessTuple
of success (bool
) and message (str
).
454def get_sync_time( 455 self, 456 params: Optional[Dict[str, Any]] = None, 457 newest: bool = True, 458 apply_backtrack_interval: bool = False, 459 round_down: bool = False, 460 debug: bool = False 461) -> Union['datetime', None]: 462 """ 463 Get the most recent datetime value for a Pipe. 464 465 Parameters 466 ---------- 467 params: Optional[Dict[str, Any]], default None 468 Dictionary to build a WHERE clause for a specific column. 469 See `meerschaum.utils.sql.build_where`. 470 471 newest: bool, default True 472 If `True`, get the most recent datetime (honoring `params`). 473 If `False`, get the oldest datetime (`ASC` instead of `DESC`). 474 475 apply_backtrack_interval: bool, default False 476 If `True`, subtract the backtrack interval from the sync time. 477 478 round_down: bool, default False 479 If `True`, round down the datetime value to the nearest minute. 480 481 debug: bool, default False 482 Verbosity toggle. 483 484 Returns 485 ------- 486 A `datetime` object if the pipe exists, otherwise `None`. 487 488 """ 489 from meerschaum.utils.venv import Venv 490 from meerschaum.connectors import get_connector_plugin 491 from meerschaum.utils.misc import round_time 492 493 with Venv(get_connector_plugin(self.instance_connector)): 494 sync_time = self.instance_connector.get_sync_time( 495 self, 496 params=params, 497 newest=newest, 498 debug=debug, 499 ) 500 501 if round_down and isinstance(sync_time, datetime): 502 sync_time = round_time(sync_time, timedelta(minutes=1)) 503 504 if apply_backtrack_interval and sync_time is not None: 505 backtrack_interval = self.get_backtrack_interval(debug=debug) 506 try: 507 sync_time -= backtrack_interval 508 except Exception as e: 509 warn(f"Failed to apply backtrack interval:\n{e}") 510 511 return sync_time
Get the most recent datetime value for a Pipe.
Parameters
- params (Optional[Dict[str, Any]], default None):
Dictionary to build a WHERE clause for a specific column.
See
meerschaum.utils.sql.build_where
. - newest (bool, default True):
If
True
, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC
). - apply_backtrack_interval (bool, default False):
If
True
, subtract the backtrack interval from the sync time. - round_down (bool, default False):
If
True
, round down the datetime value to the nearest minute. - debug (bool, default False): Verbosity toggle.
Returns
- A
datetime
object if the pipe exists, otherwise None
.
514def exists( 515 self, 516 debug : bool = False 517 ) -> bool: 518 """ 519 See if a Pipe's table exists. 520 521 Parameters 522 ---------- 523 debug: bool, default False 524 Verbosity toggle. 525 526 Returns 527 ------- 528 A `bool` corresponding to whether a pipe's underlying table exists. 529 530 """ 531 import time 532 from meerschaum.utils.venv import Venv 533 from meerschaum.connectors import get_connector_plugin 534 from meerschaum.config import STATIC_CONFIG 535 from meerschaum.utils.debug import dprint 536 now = time.perf_counter() 537 exists_timeout_seconds = STATIC_CONFIG['pipes']['exists_timeout_seconds'] 538 539 _exists = self.__dict__.get('_exists', None) 540 if _exists: 541 exists_timestamp = self.__dict__.get('_exists_timestamp', None) 542 if exists_timestamp is not None: 543 delta = now - exists_timestamp 544 if delta < exists_timeout_seconds: 545 if debug: 546 dprint(f"Returning cached `exists` for {self} ({round(delta, 2)} seconds old).") 547 return _exists 548 549 with Venv(get_connector_plugin(self.instance_connector)): 550 _exists = self.instance_connector.pipe_exists(pipe=self, debug=debug) 551 552 self.__dict__['_exists'] = _exists 553 self.__dict__['_exists_timestamp'] = now 554 return _exists
See if a Pipe's table exists.
Parameters
- debug (bool, default False): Verbosity toggle.
Returns
- A
bool
corresponding to whether a pipe's underlying table exists.
def filter_existing(
    self,
    df: 'pd.DataFrame',
    safe_copy: bool = True,
    date_bound_only: bool = False,
    chunksize: Optional[int] = -1,
    debug: bool = False,
    **kw
) -> Tuple['pd.DataFrame', 'pd.DataFrame', 'pd.DataFrame']:
    """
    Inspect a dataframe and filter out rows which already exist in the pipe.

    Parameters
    ----------
    df: 'pd.DataFrame'
        The dataframe to inspect and filter.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    date_bound_only: bool, default False
        If `True`, only use the datetime index to fetch the sample dataframe.

    chunksize: Optional[int], default -1
        The `chunksize` used when fetching existing data.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of three pandas DataFrames: unseen, update, and delta.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.misc import round_time
    from meerschaum.utils.dataframe import (
        filter_unseen_df,
        add_missing_cols_to_df,
        get_unhashable_cols,
        get_numeric_cols,
    )
    from meerschaum.utils.dtypes import (
        to_pandas_dtype,
        none_if_null,
    )
    from meerschaum.config import get_config
    pd = import_pandas()
    pandas = attempt_import('pandas')
    ### Duck-type check: coerce non-DataFrame input (dicts, lists of docs).
    if not 'dataframe' in str(type(df)).lower():
        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)
    ### Dask dataframes need dask's merge and `.compute()` for scalars.
    is_dask = 'dask' in df.__module__
    if is_dask:
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA
    else:
        merge = pd.merge
        NA = pd.NA
    if df is None:
        return df, df, df
    if (df.empty if not is_dask else len(df) == 0):
        return df, df, df

    ### begin is the oldest data in the new dataframe
    begin, end = None, None
    dt_col = self.columns.get('datetime', None)
    dt_type = self.dtypes.get(dt_col, 'datetime64[ns]') if dt_col else None
    try:
        min_dt_val = df[dt_col].min(skipna=True) if dt_col else None
        if is_dask and min_dt_val is not None:
            min_dt_val = min_dt_val.compute()
        min_dt = (
            pandas.to_datetime(min_dt_val).to_pydatetime()
            if min_dt_val is not None and 'datetime' in str(dt_type)
            else min_dt_val
        )
    except Exception as e:
        min_dt = None
    ### Discard NaT / non-datetime, non-int minimums.
    if not ('datetime' in str(type(min_dt))) or str(min_dt) == 'NaT':
        if 'int' not in str(type(min_dt)).lower():
            min_dt = None

    if isinstance(min_dt, datetime):
        ### Widen the lower bound by a minute to catch boundary rows.
        begin = (
            round_time(
                min_dt,
                to = 'down'
            ) - timedelta(minutes=1)
        )
    elif dt_type and 'int' in dt_type.lower():
        begin = min_dt
    elif dt_col is None:
        begin = None

    ### end is the newest data in the new dataframe
    try:
        max_dt_val = df[dt_col].max(skipna=True) if dt_col else None
        if is_dask and max_dt_val is not None:
            max_dt_val = max_dt_val.compute()
        max_dt = (
            pandas.to_datetime(max_dt_val).to_pydatetime()
            if max_dt_val is not None and 'datetime' in str(dt_type)
            else max_dt_val
        )
    except Exception as e:
        import traceback
        traceback.print_exc()
        max_dt = None

    if ('datetime' not in str(type(max_dt))) or str(min_dt) == 'NaT':
        if 'int' not in str(type(max_dt)).lower():
            max_dt = None

    if isinstance(max_dt, datetime):
        ### Widen the upper bound by a minute (end bounds are exclusive).
        end = (
            round_time(
                max_dt,
                to = 'down'
            ) + timedelta(minutes=1)
        )
    elif dt_type and 'int' in dt_type.lower():
        end = max_dt + 1

    if max_dt is not None and min_dt is not None and min_dt > max_dt:
        warn(f"Detected minimum datetime greater than maximum datetime.")

    if begin is not None and end is not None and begin > end:
        if isinstance(begin, datetime):
            begin = end - timedelta(minutes=1)
        ### We might be using integers for the datetime axis.
        else:
            begin = end - 1

    ### Collect distinct values per index column to narrow the backtrack query,
    ### skipping columns with too many distinct values (see config limit below).
    unique_index_vals = {
        col: df[col].unique()
        for col in self.columns
        if col in df.columns and col != dt_col
    } if not date_bound_only else {}
    filter_params_index_limit = get_config('pipes', 'sync', 'filter_params_index_limit')
    _ = kw.pop('params', None)
    params = {
        col: [
            none_if_null(val)
            for val in unique_vals
        ]
        for col, unique_vals in unique_index_vals.items()
        if len(unique_vals) <= filter_params_index_limit
    } if not date_bound_only else {}

    if debug:
        dprint(f"Looking at data between '{begin}' and '{end}':", **kw)

    ### Pull the slice of existing data overlapping the incoming frame.
    backtrack_df = self.get_data(
        begin = begin,
        end = end,
        chunksize = chunksize,
        params = params,
        debug = debug,
        **kw
    )
    if debug:
        dprint(f"Existing data for {self}:\n" + str(backtrack_df), **kw)
        dprint(f"Existing dtypes for {self}:\n" + str(backtrack_df.dtypes))

    ### Separate new rows from changed ones.
    on_cols = [
        col for col_key, col in self.columns.items()
        if (
            col
            and
            col_key != 'value'
            and col in backtrack_df.columns
        )
    ]
    self_dtypes = self.dtypes
    on_cols_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in self_dtypes.items()
        if col in on_cols
    }

    ### Detect changes between the old target and new source dataframes.
    delta_df = add_missing_cols_to_df(
        filter_unseen_df(
            backtrack_df,
            df,
            dtypes = {
                col: to_pandas_dtype(typ)
                for col, typ in self_dtypes.items()
            },
            safe_copy = safe_copy,
            debug = debug
        ),
        on_cols_dtypes,
    )

    ### Cast dicts or lists to strings so we can merge.
    ### `default=str` makes the serializer total (e.g. for datetimes).
    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
    def deserializer(x):
        return json.loads(x) if isinstance(x, str) else x

    unhashable_delta_cols = get_unhashable_cols(delta_df)
    unhashable_backtrack_cols = get_unhashable_cols(backtrack_df)
    for col in unhashable_delta_cols:
        delta_df[col] = delta_df[col].apply(serializer)
    for col in unhashable_backtrack_cols:
        backtrack_df[col] = backtrack_df[col].apply(serializer)
    casted_cols = set(unhashable_delta_cols + unhashable_backtrack_cols)

    ### Left-join delta onto existing rows; `_merge` tells us which rows are new.
    joined_df = merge(
        delta_df.fillna(NA),
        backtrack_df.fillna(NA),
        how = 'left',
        on = on_cols,
        indicator = True,
        suffixes = ('', '_old'),
    ) if on_cols else delta_df
    ### Restore the original (unserialized) values after merging.
    for col in casted_cols:
        if col in joined_df.columns:
            joined_df[col] = joined_df[col].apply(deserializer)
        if col in delta_df.columns:
            delta_df[col] = delta_df[col].apply(deserializer)

    ### Determine which rows are completely new.
    new_rows_mask = (joined_df['_merge'] == 'left_only') if on_cols else None
    cols = list(backtrack_df.columns)

    ### NOTE(review): the dask and non-dask branches below are identical — confirm
    ### whether a dask-specific path was intended here.
    unseen_df = (
        (
            joined_df
            .where(new_rows_mask)
            .dropna(how='all')[cols]
            .reset_index(drop=True)
        ) if not is_dask else (
            joined_df
            .where(new_rows_mask)
            .dropna(how='all')[cols]
            .reset_index(drop=True)
        )
    ) if on_cols else delta_df

    ### Rows that have already been inserted but values have changed.
    update_df = (
        joined_df
        .where(~new_rows_mask)
        .dropna(how='all')[cols]
        .reset_index(drop=True)
    ) if on_cols else None

    return unseen_df, update_df, delta_df
Inspect a dataframe and filter out rows which already exist in the pipe.
Parameters
- df ('pd.DataFrame'): The dataframe to inspect and filter.
- safe_copy (bool, default True):
If
True
, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames. See meerschaum.utils.dataframe.filter_unseen_df
. - date_bound_only (bool, default False):
If
True
, only use the datetime index to fetch the sample dataframe. - chunksize (Optional[int], default -1):
The
chunksize
used when fetching existing data. - debug (bool, default False): Verbosity toggle.
Returns
- A tuple of three pandas DataFrames: unseen, update, and delta.
835def get_num_workers(self, workers: Optional[int] = None) -> int: 836 """ 837 Get the number of workers to use for concurrent syncs. 838 839 Parameters 840 ---------- 841 The number of workers passed via `--workers`. 842 843 Returns 844 ------- 845 The number of workers, capped for safety. 846 """ 847 is_thread_safe = getattr(self.instance_connector, 'IS_THREAD_SAFE', False) 848 if not is_thread_safe: 849 return 1 850 851 engine_pool_size = ( 852 self.instance_connector.engine.pool.size() 853 if self.instance_connector.type == 'sql' 854 else None 855 ) 856 current_num_threads = threading.active_count() 857 current_num_connections = ( 858 self.instance_connector.engine.pool.checkedout() 859 if engine_pool_size is not None 860 else current_num_threads 861 ) 862 desired_workers = ( 863 min(workers or engine_pool_size, engine_pool_size) 864 if engine_pool_size is not None 865 else workers 866 ) 867 if desired_workers is None: 868 desired_workers = (multiprocessing.cpu_count() if is_thread_safe else 1) 869 870 return max( 871 (desired_workers - current_num_connections), 872 1, 873 )
Get the number of workers to use for concurrent syncs.
Parameters
- workers (Optional[int], default None): The number of workers passed via --workers.
Returns
- The number of workers, capped for safety.
def verify(
    self,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    chunk_interval: Union[timedelta, int, None] = None,
    bounded: Optional[bool] = None,
    deduplicate: bool = False,
    workers: Optional[int] = None,
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Verify the contents of the pipe by resyncing its interval.

    Parameters
    ----------
    begin: Union[datetime, int, None], default None
        If specified, only verify rows greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If specified, only verify rows less than this value.

    params: Optional[Dict[str, Any]], default None
        If provided, restrict verification to rows which match this filter.

    chunk_interval: Union[timedelta, int, None], default None
        If provided, use this as the size of the chunk boundaries.
        Default to the value set in `pipe.parameters['chunk_minutes']` (1440).

    bounded: Optional[bool], default None
        If `True`, do not verify older than the oldest sync time or newer than the newest.
        If `False`, verify unbounded syncs outside of the new and old sync times.
        The default behavior (`None`) is to bound only if a bound interval is set
        (e.g. `pipe.parameters['verify']['bound_days']`).

    deduplicate: bool, default False
        If `True`, deduplicate the pipe's table after the verification syncs.

    workers: Optional[int], default None
        If provided, limit the verification to this many threads.
        Use a value of `1` to sync chunks in series.

    debug: bool, default False
        Verbosity toggle.

    kwargs: Any
        All keyword arguments are passed to `pipe.sync()`.

    Returns
    -------
    A SuccessTuple indicating whether the pipe was successfully resynced.
    """
    from meerschaum.utils.pool import get_pool
    from meerschaum.utils.misc import interval_str
    workers = self.get_num_workers(workers)

    ### Skip configured bounding in parameters
    ### if `bounded` is explicitly `False`.
    bound_time = (
        self.get_bound_time(debug=debug)
        if bounded is not False
        else None
    )
    if bounded is None:
        bounded = bound_time is not None

    if bounded and begin is None:
        begin = (
            bound_time
            if bound_time is not None
            else self.get_sync_time(newest=False, debug=debug)
        )
    if bounded and end is None:
        end = self.get_sync_time(newest=True, debug=debug)

    if bounded and end is not None:
        ### Nudge `end` forward one unit so the newest row is included
        ### (`end` bounds are exclusive).
        end += (
            timedelta(minutes=1)
            if isinstance(end, datetime)
            else 1
        )

    cannot_determine_bounds = not self.exists(debug=debug)

    if cannot_determine_bounds:
        ### Without an existing table there are no chunk boundaries to derive;
        ### resync the whole requested interval in a single call.
        sync_success, sync_msg = self.sync(
            begin=begin,
            end=end,
            params=params,
            workers=workers,
            debug=debug,
            **kwargs
        )
        if not sync_success:
            return sync_success, sync_msg
        if deduplicate:
            return self.deduplicate(
                begin=begin,
                end=end,
                params=params,
                workers=workers,
                debug=debug,
                **kwargs
            )
        return sync_success, sync_msg

    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
    chunk_bounds = self.get_chunk_bounds(
        begin=begin,
        end=end,
        chunk_interval=chunk_interval,
        bounded=bounded,
        debug=debug,
    )

    ### Consider it a success if no chunks need to be verified.
    if not chunk_bounds:
        if deduplicate:
            return self.deduplicate(
                begin=begin,
                end=end,
                params=params,
                workers=workers,
                debug=debug,
                **kwargs
            )
        return True, f"Could not determine chunks between '{begin}' and '{end}'; nothing to do."

    ### For unbounded verifies, the first/last chunks are open-ended,
    ### so print the inner edge instead.
    begin_to_print = (
        begin
        if begin is not None
        else (
            chunk_bounds[0][0]
            if bounded
            else chunk_bounds[0][1]
        )
    )
    end_to_print = (
        end
        if end is not None
        else (
            chunk_bounds[-1][1]
            if bounded
            else chunk_bounds[-1][0]
        )
    )

    info(
        f"Syncing {len(chunk_bounds)} chunk" + ('s' if len(chunk_bounds) != 1 else '')
        + f" ({'un' if not bounded else ''}bounded)"
        + f" of size '{interval_str(chunk_interval)}'"
        + f" between '{begin_to_print}' and '{end_to_print}'."
    )

    pool = get_pool(workers=workers)

    ### Dictionary of the form bounds -> success_tuple, e.g.:
    ### {
    ###     (2023-01-01, 2023-01-02): (True, "Success")
    ### }
    bounds_success_tuples = {}
    def process_chunk_bounds(
        chunk_begin_and_end: Tuple[
            Union[int, datetime],
            Union[int, datetime]
        ]
    ):
        ### Skip chunks which have already been synced (e.g. the first chunk).
        if chunk_begin_and_end in bounds_success_tuples:
            return chunk_begin_and_end, bounds_success_tuples[chunk_begin_and_end]

        chunk_begin, chunk_end = chunk_begin_and_end
        return chunk_begin_and_end, self.sync(
            begin=chunk_begin,
            end=chunk_end,
            params=params,
            workers=workers,
            debug=debug,
            **kwargs
        )

    ### If we have more than one chunk, attempt to sync the first one and return if it fails.
    if len(chunk_bounds) > 1:
        first_chunk_bounds = chunk_bounds[0]
        (
            (first_begin, first_end),
            (first_success, first_msg)
        ) = process_chunk_bounds(first_chunk_bounds)
        if not first_success:
            return (
                first_success,
                f"\n{first_begin} - {first_end}\n"
                + f"Failed to sync first chunk:\n{first_msg}"
            )
        bounds_success_tuples[first_chunk_bounds] = (first_success, first_msg)

    bounds_success_tuples.update(dict(pool.map(process_chunk_bounds, chunk_bounds)))
    bounds_success_bools = {bounds: tup[0] for bounds, tup in bounds_success_tuples.items()}

    message_header = f"{begin_to_print} - {end_to_print}"
    if all(bounds_success_bools.values()):
        msg = get_chunks_success_message(bounds_success_tuples, header=message_header)
        if deduplicate:
            deduplicate_success, deduplicate_msg = self.deduplicate(
                begin=begin,
                end=end,
                params=params,
                workers=workers,
                debug=debug,
                **kwargs
            )
            return deduplicate_success, msg + '\n\n' + deduplicate_msg
        return True, msg

    ### BUGFIX: iterate the success map's items rather than zipping with the dict
    ### (iterating a dict yields its keys, so `success` was a truthy bounds tuple
    ### and failed chunks were never queued for a retry).
    chunk_bounds_to_resync = [
        bounds
        for bounds, success in bounds_success_bools.items()
        if not success
    ]
    bounds_to_print = [
        f"{bounds[0]} - {bounds[1]}"
        for bounds in chunk_bounds_to_resync
    ]
    if bounds_to_print:
        warn(
            f"Will resync the following failed chunks:\n    "
            + '\n    '.join(bounds_to_print),
            stack=False,
        )

    retry_bounds_success_tuples = dict(pool.map(process_chunk_bounds, chunk_bounds_to_resync))
    bounds_success_tuples.update(retry_bounds_success_tuples)
    retry_bounds_success_bools = {
        bounds: tup[0]
        for bounds, tup in retry_bounds_success_tuples.items()
    }

    if all(retry_bounds_success_bools.values()):
        message = (
            get_chunks_success_message(bounds_success_tuples, header=message_header)
            + f"\nRetried {len(chunk_bounds_to_resync)} chunks."
        )
        if deduplicate:
            deduplicate_success, deduplicate_msg = self.deduplicate(
                begin=begin,
                end=end,
                params=params,
                workers=workers,
                debug=debug,
                **kwargs
            )
            return deduplicate_success, message + '\n\n' + deduplicate_msg
        return True, message

    message = get_chunks_success_message(bounds_success_tuples, header=message_header)
    if deduplicate:
        deduplicate_success, deduplicate_msg = self.deduplicate(
            begin=begin,
            end=end,
            params=params,
            workers=workers,
            debug=debug,
            **kwargs
        )
        return deduplicate_success, message + '\n\n' + deduplicate_msg
    return False, message
Verify the contents of the pipe by resyncing its interval.
Parameters
- begin (Union[datetime, int, None], default None): If specified, only verify rows greater than or equal to this value.
- end (Union[datetime, int, None], default None): If specified, only verify rows less than this value.
- chunk_interval (Union[timedelta, int, None], default None):
If provided, use this as the size of the chunk boundaries.
Default to the value set in
pipe.parameters['chunk_minutes']
(1440).
- bounded (Optional[bool], default None):
If `True`, do not verify older than the oldest sync time or newer than the newest.
If `False`, verify unbounded syncs outside of the new and old sync times.
The default behavior (`None`) is to bound only if a bound interval is set
(e.g. `pipe.parameters['verify']['bound_days']`).
- deduplicate (bool, default False):
If `True`, deduplicate the pipe's table after the verification syncs.
- workers (Optional[int], default None):
If provided, limit the verification to this many threads.
Use a value of
1
to sync chunks in series. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All keyword arguments are passed to
pipe.sync()
.
Returns
- A SuccessTuple indicating whether the pipe was successfully resynced.
353def get_bound_interval(self, debug: bool = False) -> Union[timedelta, int, None]: 354 """ 355 Return the interval used to determine the bound time (limit for verification syncs). 356 If the datetime axis is an integer, just return its value. 357 358 Below are the supported keys for the bound interval: 359 360 - `pipe.parameters['verify']['bound_minutes']` 361 - `pipe.parameters['verify']['bound_hours']` 362 - `pipe.parameters['verify']['bound_days']` 363 - `pipe.parameters['verify']['bound_weeks']` 364 - `pipe.parameters['verify']['bound_years']` 365 - `pipe.parameters['verify']['bound_seconds']` 366 367 If multiple keys are present, the first on this priority list will be used. 368 369 Returns 370 ------- 371 A `timedelta` or `int` value to be used to determine the bound time. 372 """ 373 verify_params = self.parameters.get('verify', {}) 374 prefix = 'bound_' 375 suffixes_to_check = ('minutes', 'hours', 'days', 'weeks', 'years', 'seconds') 376 keys_to_search = { 377 key: val 378 for key, val in verify_params.items() 379 if key.startswith(prefix) 380 } 381 bound_time_key, bound_time_value = None, None 382 for key, value in keys_to_search.items(): 383 for suffix in suffixes_to_check: 384 if key == prefix + suffix: 385 bound_time_key = key 386 bound_time_value = value 387 break 388 if bound_time_key is not None: 389 break 390 391 if bound_time_value is None: 392 return bound_time_value 393 394 dt_col = self.columns.get('datetime', None) 395 if not dt_col: 396 return bound_time_value 397 398 dt_typ = self.dtypes.get(dt_col, 'datetime64[ns]') 399 if 'int' in dt_typ.lower(): 400 return int(bound_time_value) 401 402 interval_type = bound_time_key.replace(prefix, '') 403 return timedelta(**{interval_type: bound_time_value})
Return the interval used to determine the bound time (limit for verification syncs). If the datetime axis is an integer, just return its value.
Below are the supported keys for the bound interval:
- `pipe.parameters['verify']['bound_minutes']`
- `pipe.parameters['verify']['bound_hours']`
- `pipe.parameters['verify']['bound_days']`
- `pipe.parameters['verify']['bound_weeks']`
- `pipe.parameters['verify']['bound_years']`
- `pipe.parameters['verify']['bound_seconds']`
If multiple keys are present, the first on this priority list will be used.
Returns
- A `timedelta` or `int` value to be used to determine the bound time.
406def get_bound_time(self, debug: bool = False) -> Union[datetime, int, None]: 407 """ 408 The bound time is the limit at which long-running verification syncs should stop. 409 A value of `None` means verification syncs should be unbounded. 410 411 Like deriving a backtrack time from `pipe.get_sync_time()`, 412 the bound time is the sync time minus a large window (e.g. 366 days). 413 414 Unbound verification syncs (i.e. `bound_time is None`) 415 if the oldest sync time is less than the bound interval. 416 417 Returns 418 ------- 419 A `datetime` or `int` corresponding to the 420 `begin` bound for verification and deduplication syncs. 421 """ 422 bound_interval = self.get_bound_interval(debug=debug) 423 if bound_interval is None: 424 return None 425 426 sync_time = self.get_sync_time(debug=debug) 427 if sync_time is None: 428 return None 429 430 bound_time = sync_time - bound_interval 431 oldest_sync_time = self.get_sync_time(newest=False, debug=debug) 432 433 return ( 434 bound_time 435 if bound_time > oldest_sync_time 436 else None 437 )
The bound time is the limit at which long-running verification syncs should stop.
A value of None
means verification syncs should be unbounded.
Like deriving a backtrack time from pipe.get_sync_time()
,
the bound time is the sync time minus a large window (e.g. 366 days).
Unbound verification syncs (i.e. bound_time is None
)
if the oldest sync time is less than the bound interval.
Returns
- A `datetime` or `int` corresponding to the `begin` bound for verification and deduplication syncs.
12def delete( 13 self, 14 drop: bool = True, 15 debug: bool = False, 16 **kw 17 ) -> SuccessTuple: 18 """ 19 Call the Pipe's instance connector's `delete_pipe()` method. 20 21 Parameters 22 ---------- 23 drop: bool, default True 24 If `True`, drop the pipes' target table. 25 26 debug : bool, default False 27 Verbosity toggle. 28 29 Returns 30 ------- 31 A `SuccessTuple` of success (`bool`), message (`str`). 32 33 """ 34 import os, pathlib 35 from meerschaum.utils.warnings import warn 36 from meerschaum.utils.venv import Venv 37 from meerschaum.connectors import get_connector_plugin 38 39 if self.temporary: 40 return ( 41 False, 42 "Cannot delete pipes created with `temporary=True` (read-only). " 43 + "You may want to call `pipe.drop()` instead." 44 ) 45 46 if self.cache_pipe is not None: 47 _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw) 48 if not _drop_cache_tuple[0]: 49 warn(_drop_cache_tuple[1]) 50 if getattr(self.cache_connector, 'flavor', None) == 'sqlite': 51 _cache_db_path = pathlib.Path(self.cache_connector.database) 52 try: 53 os.remove(_cache_db_path) 54 except Exception as e: 55 warn(f"Could not delete cache file '{_cache_db_path}' for {self}:\n{e}") 56 57 if drop: 58 drop_success, drop_msg = self.drop(debug=debug) 59 if not drop_success: 60 warn(f"Failed to drop {self}:\n{drop_msg}") 61 62 with Venv(get_connector_plugin(self.instance_connector)): 63 result = self.instance_connector.delete_pipe(self, debug=debug, **kw) 64 65 if not isinstance(result, tuple): 66 return False, f"Received an unexpected result from '{self.instance_connector}': {result}" 67 68 if result[0]: 69 to_delete = ['_id'] 70 for member in to_delete: 71 if member in self.__dict__: 72 del self.__dict__[member] 73 return result
Call the Pipe's instance connector's delete_pipe()
method.
Parameters
- drop (bool, default True):
If
True
, drop the pipes' target table. - debug (bool, default False): Verbosity toggle.
Returns
- A
SuccessTuple
of success (bool
), message (str
).
13def drop( 14 self, 15 debug: bool = False, 16 **kw: Any 17 ) -> SuccessTuple: 18 """ 19 Call the Pipe's instance connector's `drop_pipe()` method. 20 21 Parameters 22 ---------- 23 debug: bool, default False: 24 Verbosity toggle. 25 26 Returns 27 ------- 28 A `SuccessTuple` of success, message. 29 30 """ 31 self._exists = False 32 from meerschaum.utils.warnings import warn 33 from meerschaum.utils.venv import Venv 34 from meerschaum.connectors import get_connector_plugin 35 36 if self.cache_pipe is not None: 37 _drop_cache_tuple = self.cache_pipe.drop(debug=debug, **kw) 38 if not _drop_cache_tuple[0]: 39 warn(_drop_cache_tuple[1]) 40 41 with Venv(get_connector_plugin(self.instance_connector)): 42 result = self.instance_connector.drop_pipe(self, debug=debug, **kw) 43 return result
Call the Pipe's instance connector's drop_pipe()
method.
Parameters
- debug (bool, default False:): Verbosity toggle.
Returns
- A
SuccessTuple
of success, message.
13def clear( 14 self, 15 begin: Optional[datetime.datetime] = None, 16 end: Optional[datetime.datetime] = None, 17 params: Optional[Dict[str, Any]] = None, 18 debug: bool = False, 19 **kwargs: Any 20 ) -> SuccessTuple: 21 """ 22 Call the Pipe's instance connector's `clear_pipe` method. 23 24 Parameters 25 ---------- 26 begin: Optional[datetime.datetime], default None: 27 If provided, only remove rows newer than this datetime value. 28 29 end: Optional[datetime.datetime], default None: 30 If provided, only remove rows older than this datetime column (not including end). 31 32 params: Optional[Dict[str, Any]], default None 33 See `meerschaum.utils.sql.build_where`. 34 35 debug: bool, default False: 36 Verbositity toggle. 37 38 Returns 39 ------- 40 A `SuccessTuple` corresponding to whether this procedure completed successfully. 41 42 Examples 43 -------- 44 >>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local') 45 >>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]}) 46 >>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]}) 47 >>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]}) 48 >>> 49 >>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0)) 50 >>> pipe.get_data() 51 dt 52 0 2020-01-01 53 54 """ 55 from meerschaum.utils.warnings import warn 56 from meerschaum.utils.venv import Venv 57 from meerschaum.connectors import get_connector_plugin 58 59 if self.cache_pipe is not None: 60 success, msg = self.cache_pipe.clear( 61 begin = begin, 62 end = end, 63 params = params, 64 debug = debug, 65 **kwargs 66 ) 67 if not success: 68 warn(msg) 69 70 with Venv(get_connector_plugin(self.instance_connector)): 71 return self.instance_connector.clear_pipe( 72 self, 73 begin = begin, 74 end = end, 75 params = params, 76 debug = debug, 77 **kwargs 78 )
Call the Pipe's instance connector's clear_pipe
method.
Parameters
- begin (Optional[datetime.datetime], default None:): If provided, only remove rows newer than this datetime value.
- end (Optional[datetime.datetime], default None:): If provided, only remove rows older than this datetime column (not including end).
- params (Optional[Dict[str, Any]], default None):
See
meerschaum.utils.sql.build_where
. - debug (bool, default False:): Verbositity toggle.
Returns
- A
SuccessTuple
corresponding to whether this procedure completed successfully.
Examples
>>> pipe = mrsm.Pipe('test', 'test', columns={'datetime': 'dt'}, instance='sql:local')
>>> pipe.sync({'dt': [datetime.datetime(2020, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2021, 1, 1, 0, 0)]})
>>> pipe.sync({'dt': [datetime.datetime(2022, 1, 1, 0, 0)]})
>>>
>>> pipe.clear(begin=datetime.datetime(2021, 1, 1, 0, 0))
>>> pipe.get_data()
dt
0 2020-01-01
def deduplicate(
    self,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    chunk_interval: Union[timedelta, int, None] = None,
    bounded: Optional[bool] = None,
    workers: Optional[int] = None,
    debug: bool = False,
    _use_instance_method: bool = True,
    **kwargs: Any
) -> SuccessTuple:
    """
    Call the Pipe's instance connector's `delete_duplicates` method to delete duplicate rows.

    Parameters
    ----------
    begin: Union[datetime, int, None], default None
        If provided, only deduplicate rows newer than this datetime value.

    end: Union[datetime, int, None], default None
        If provided, only deduplicate rows older than this datetime column (not including end).

    params: Optional[Dict[str, Any]], default None
        Restrict deduplication to this filter (for multiplexed data streams).
        See `meerschaum.utils.sql.build_where`.

    chunk_interval: Union[timedelta, int, None], default None
        If provided, use this for the chunk bounds.
        Defaults to the value set in `pipe.parameters['chunk_minutes']` (1440).

    bounded: Optional[bool], default None
        Only check outside the oldest and newest sync times if bounded is explicitly `False`.

    workers: Optional[int], default None
        If the instance connector is thread-safe, limit concurrent syncs to this many threads.

    debug: bool, default False
        Verbosity toggle.

    kwargs: Any
        All other keyword arguments are passed to
        `pipe.sync()`, `pipe.clear()`, and `pipe.get_data()`.

    Returns
    -------
    A `SuccessTuple` corresponding to whether all of the chunks were successfully deduplicated.
    """
    from meerschaum.utils.warnings import warn, info
    from meerschaum.utils.misc import interval_str, items_str
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.pool import get_pool

    ### Mirror the deduplication onto the local cache pipe, warning on failure.
    if self.cache_pipe is not None:
        success, msg = self.cache_pipe.deduplicate(
            begin=begin,
            end=end,
            params=params,
            bounded=bounded,
            debug=debug,
            _use_instance_method=_use_instance_method,
            **kwargs
        )
        if not success:
            warn(msg)

    workers = self.get_num_workers(workers=workers)
    pool = get_pool(workers=workers)

    ### Prefer the connector's native implementation when available.
    if _use_instance_method:
        with Venv(get_connector_plugin(self.instance_connector)):
            if hasattr(self.instance_connector, 'deduplicate_pipe'):
                return self.instance_connector.deduplicate_pipe(
                    self,
                    begin=begin,
                    end=end,
                    params=params,
                    bounded=bounded,
                    debug=debug,
                    **kwargs
                )

    ### Only unbound if explicitly False.
    if bounded is None:
        bounded = True
    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)

    bound_time = self.get_bound_time(debug=debug)
    if bounded and begin is None:
        begin = (
            bound_time
            if bound_time is not None
            else self.get_sync_time(newest=False, debug=debug)
        )
    if bounded and end is None:
        end = self.get_sync_time(newest=True, debug=debug)

    if bounded and end is not None:
        ### Nudge `end` forward one unit so the newest row is included.
        end += (
            timedelta(minutes=1)
            if isinstance(end, datetime)
            else 1
        )

    chunk_bounds = self.get_chunk_bounds(
        bounded=bounded,
        begin=begin,
        end=end,
        chunk_interval=chunk_interval,
        debug=debug,
    )

    indices = [col for col in self.columns.values() if col]
    if not indices:
        return False, "Cannot deduplicate without index columns."
    dt_col = self.columns.get('datetime', None)

    def process_chunk_bounds(bounds) -> Tuple[
        Tuple[
            Union[datetime, int, None],
            Union[datetime, int, None]
        ],
        SuccessTuple
    ]:
        ### Only selecting the index values here to keep bandwidth down.
        chunk_begin, chunk_end = bounds
        chunk_df = self.get_data(
            select_columns=indices,
            begin=chunk_begin,
            end=chunk_end,
            params=params,
            debug=debug,
        )
        if chunk_df is None:
            return bounds, (True, "")
        existing_chunk_len = len(chunk_df)
        deduped_chunk_df = chunk_df.drop_duplicates(keep='last')
        deduped_chunk_len = len(deduped_chunk_df)

        ### No duplicates in this chunk: nothing to do.
        if existing_chunk_len == deduped_chunk_len:
            return bounds, (True, "")

        chunk_msg_header = f"\n{chunk_begin} - {chunk_end}"
        chunk_msg_body = ""

        ### Duplicates exist: fetch the full rows to rewrite the chunk.
        full_chunk = self.get_data(
            begin=chunk_begin,
            end=chunk_end,
            params=params,
            debug=debug,
        )
        if full_chunk is None or len(full_chunk) == 0:
            return bounds, (True, f"{chunk_msg_header}\nChunk is empty, skipping...")

        chunk_indices = [ix for ix in indices if ix in full_chunk.columns]
        if not chunk_indices:
            return bounds, (False, f"None of {items_str(indices)} were present in chunk.")
        try:
            full_chunk = full_chunk.drop_duplicates(
                subset=chunk_indices,
                keep='last'
            ).reset_index(
                drop=True,
            )
        except Exception as e:
            return (
                bounds,
                (False, f"Failed to deduplicate chunk on {items_str(chunk_indices)}:\n({e})")
            )

        ### Clear the chunk, then sync the deduplicated rows back.
        clear_success, clear_msg = self.clear(
            begin=chunk_begin,
            end=chunk_end,
            params=params,
            debug=debug,
        )
        if not clear_success:
            chunk_msg_body += f"Failed to clear chunk while deduplicating:\n{clear_msg}\n"
            warn(chunk_msg_body)

        sync_success, sync_msg = self.sync(full_chunk, debug=debug)
        if not sync_success:
            chunk_msg_body += f"Failed to sync chunk while deduplicating:\n{sync_msg}\n"

        ### Finally check if the deduplication worked.
        chunk_rowcount = self.get_rowcount(
            begin=chunk_begin,
            end=chunk_end,
            params=params,
            debug=debug,
        )
        if chunk_rowcount != deduped_chunk_len:
            return bounds, (
                False, (
                    chunk_msg_header + "\n"
                    + chunk_msg_body + ("\n" if chunk_msg_body else '')
                    + "Chunk rowcounts still differ ("
                    + f"{chunk_rowcount} rowcount vs {deduped_chunk_len} chunk length)."
                )
            )

        return bounds, (
            True, (
                chunk_msg_header + "\n"
                + chunk_msg_body + ("\n" if chunk_msg_body else '')
                + f"Deduplicated chunk from {existing_chunk_len} to {chunk_rowcount} rows."
            )
        )

    info(
        f"Deduplicating {len(chunk_bounds)} chunk"
        + ('s' if len(chunk_bounds) != 1 else '')
        + f" ({'un' if not bounded else ''}bounded)"
        + f" of size '{interval_str(chunk_interval)}'"
        + f" on {self}."
    )
    bounds_success_tuples = dict(pool.map(process_chunk_bounds, chunk_bounds))
    bounds_successes = {
        bounds: success_tuple
        for bounds, success_tuple in bounds_success_tuples.items()
        if success_tuple[0]
    }
    bounds_failures = {
        bounds: success_tuple
        for bounds, success_tuple in bounds_success_tuples.items()
        if not success_tuple[0]
    }

    ### No need to retry if everything failed.
    if len(bounds_failures) > 0 and len(bounds_successes) == 0:
        return (
            False,
            (
                f"Failed to deduplicate {len(bounds_failures)} chunk"
                + ('s' if len(bounds_failures) != 1 else '')
                + ".\n"
                + "\n".join([msg for _, (_, msg) in bounds_failures.items() if msg])
            )
        )

    retry_bounds = [bounds for bounds in bounds_failures]
    if not retry_bounds:
        return (
            True,
            (
                f"Successfully deduplicated {len(bounds_successes)} chunk"
                + ('s' if len(bounds_successes) != 1 else '')
                + ".\n"
                + "\n".join([msg for _, (_, msg) in bounds_successes.items() if msg])
            ).rstrip('\n')
        )

    info(f"Retrying {len(retry_bounds)} chunks for {self}...")
    retry_bounds_success_tuples = dict(pool.map(process_chunk_bounds, retry_bounds))
    ### BUGFIX: filter the retry results, not the stale pre-retry results.
    ### The previous code built both maps from `bounds_success_tuples`,
    ### so a successful retry was never detected.
    retry_bounds_successes = {
        bounds: success_tuple
        for bounds, success_tuple in retry_bounds_success_tuples.items()
        if success_tuple[0]
    }
    retry_bounds_failures = {
        bounds: success_tuple
        for bounds, success_tuple in retry_bounds_success_tuples.items()
        if not success_tuple[0]
    }

    bounds_successes.update(retry_bounds_successes)
    if not retry_bounds_failures:
        return (
            True,
            (
                f"Successfully deduplicated {len(bounds_successes)} chunk"
                + ('s' if len(bounds_successes) != 1 else '')
                + f" ({len(retry_bounds_successes)} retried):\n"
                + "\n".join([msg for _, (_, msg) in bounds_successes.items() if msg])
            ).rstrip('\n')
        )

    ### BUGFIX: count the chunks which are still failing after the retry
    ### (the count and the pluralization previously disagreed).
    return (
        False,
        (
            f"Failed to deduplicate {len(retry_bounds_failures)} chunk"
            + ('s' if len(retry_bounds_failures) != 1 else '')
            + ".\n"
            + "\n".join([msg for _, (_, msg) in retry_bounds_failures.items() if msg])
        ).rstrip('\n')
    )
Call the Pipe's instance connector's delete_duplicates
method to delete duplicate rows.
Parameters
- begin (Union[datetime, int, None], default None:): If provided, only deduplicate rows newer than this datetime value.
- end (Union[datetime, int, None], default None:): If provided, only deduplicate rows older than this datetime column (not including end).
- params (Optional[Dict[str, Any]], default None):
Restrict deduplication to this filter (for multiplexed data streams).
See
meerschaum.utils.sql.build_where
. - chunk_interval (Union[timedelta, int, None], default None):
If provided, use this for the chunk bounds.
Defaults to the value set in
pipe.parameters['chunk_minutes']
(1440). - bounded (Optional[bool], default None):
Only check outside the oldest and newest sync times if bounded is explicitly
False
. - workers (Optional[int], default None): If the instance connector is thread-safe, limit concurrenct syncs to this many threads.
- debug (bool, default False): Verbosity toggle.
- kwargs (Any): All other keyword arguments are passed to `pipe.sync()`, `pipe.clear()`, and `pipe.get_data()`.
Returns
- A
SuccessTuple
corresponding to whether all of the chunks were successfully deduplicated.
def bootstrap(
    self,
    debug: bool = False,
    yes: bool = False,
    force: bool = False,
    noask: bool = False,
    shell: bool = False,
    **kw
) -> SuccessTuple:
    """
    Prompt the user to create a pipe's requirements all from one method.
    This method shouldn't be used in any automated scripts because it interactively
    prompts the user and therefore may hang.

    Parameters
    ----------
    debug: bool, default False:
        Verbosity toggle.

    yes: bool, default False:
        Print the questions and automatically agree.

    force: bool, default False:
        Skip the questions and agree anyway.

    noask: bool, default False:
        Print the questions but go with the default answer.

    shell: bool, default False:
        Used to determine if we are in the interactive shell.

    Returns
    -------
    A `SuccessTuple` corresponding to the success of this procedure.

    """

    from meerschaum.utils.warnings import warn, info, error
    from meerschaum.utils.prompt import prompt, yes_no
    from meerschaum.utils.formatting import pprint
    from meerschaum.config import get_config
    from meerschaum.utils.formatting._shell import clear_screen
    from meerschaum.utils.formatting import print_tuple
    from meerschaum.actions import actions
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    _clear = get_config('shell', 'clear_screen', patch=True)

    ### If the pipe is already registered, delete its registration so it
    ### can be rebuilt from scratch below.
    if self.get_id(debug=debug) is not None:
        delete_tuple = self.delete(debug=debug)
        if not delete_tuple[0]:
            return delete_tuple

    if _clear:
        clear_screen(debug=debug)

    ### Interactively gather the pipe's parameters, then show them
    ### to the user for confirmation before registering.
    _parameters = _get_parameters(self, debug=debug)
    self.parameters = _parameters
    pprint(self.parameters)
    try:
        prompt(
            f"\n Press [Enter] to register {self} with the above configuration:",
            icon = False
        )
    except KeyboardInterrupt as e:
        ### Ctrl+C at the confirmation prompt aborts without registering.
        return False, f"Aborting bootstrapping {self}."

    with Venv(get_connector_plugin(self.instance_connector)):
        register_tuple = self.instance_connector.register_pipe(self, debug=debug)

    if not register_tuple[0]:
        return register_tuple

    if _clear:
        clear_screen(debug=debug)

    ### Optionally walk the user through editing the definition and
    ### performing a first sync.
    try:
        if yes_no(
            f"Would you like to edit the definition for {self}?", yes=yes, noask=noask
        ):
            edit_tuple = self.edit_definition(debug=debug)
            if not edit_tuple[0]:
                return edit_tuple

        if yes_no(f"Would you like to try syncing {self} now?", yes=yes, noask=noask):
            sync_tuple = actions['sync'](
                ['pipes'],
                connector_keys = [self.connector_keys],
                metric_keys = [self.metric_key],
                location_keys = [self.location_key],
                mrsm_instance = str(self.instance_connector),
                debug = debug,
                shell = shell,
            )
            if not sync_tuple[0]:
                return sync_tuple
    except Exception as e:
        return False, f"Failed to bootstrap {self}:\n" + str(e)

    print_tuple((True, f"Finished bootstrapping {self}!"))
    info(
        f"You can edit this pipe later with `edit pipes` "
        + "or set the definition with `edit pipes definition`.\n"
        + " To sync data into your pipe, run `sync pipes`."
    )

    return True, "Success"
Prompt the user to create a pipe's requirements all from one method. This method shouldn't be used in any automated scripts because it interactively prompts the user and therefore may hang.
Parameters
- debug (bool, default False:): Verbosity toggle.
- yes (bool, default False:): Print the questions and automatically agree.
- force (bool, default False:): Skip the questions and agree anyway.
- noask (bool, default False:): Print the questions but go with the default answer.
- shell (bool, default False:): Used to determine if we are in the interactive shell.
Returns
- A
SuccessTuple
corresponding to the success of this procedure.
19def enforce_dtypes( 20 self, 21 df: 'pd.DataFrame', 22 chunksize: Optional[int] = -1, 23 safe_copy: bool = True, 24 debug: bool = False, 25 ) -> 'pd.DataFrame': 26 """ 27 Cast the input dataframe to the pipe's registered data types. 28 If the pipe does not exist and dtypes are not set, return the dataframe. 29 """ 30 import traceback 31 from meerschaum.utils.warnings import warn 32 from meerschaum.utils.debug import dprint 33 from meerschaum.utils.dataframe import parse_df_datetimes, enforce_dtypes as _enforce_dtypes 34 from meerschaum.utils.packages import import_pandas 35 pd = import_pandas(debug=debug) 36 if df is None: 37 if debug: 38 dprint( 39 f"Received None instead of a DataFrame.\n" 40 + " Skipping dtype enforcement..." 41 ) 42 return df 43 44 pipe_dtypes = self.dtypes 45 46 try: 47 if isinstance(df, str): 48 df = parse_df_datetimes( 49 pd.read_json(StringIO(df)), 50 ignore_cols = [ 51 col 52 for col, dtype in pipe_dtypes.items() 53 if 'datetime' not in str(dtype) 54 ], 55 chunksize = chunksize, 56 debug = debug, 57 ) 58 else: 59 df = parse_df_datetimes( 60 df, 61 ignore_cols = [ 62 col 63 for col, dtype in pipe_dtypes.items() 64 if 'datetime' not in str(dtype) 65 ], 66 chunksize = chunksize, 67 debug = debug, 68 ) 69 except Exception as e: 70 warn(f"Unable to cast incoming data as a DataFrame...:\n{e}\n\n{traceback.format_exc()}") 71 return None 72 73 if not pipe_dtypes: 74 if debug: 75 dprint( 76 f"Could not find dtypes for {self}.\n" 77 + " Skipping dtype enforcement..." 78 ) 79 return df 80 81 return _enforce_dtypes(df, pipe_dtypes, safe_copy=safe_copy, debug=debug)
Cast the input dataframe to the pipe's registered data types. If the pipe does not exist and dtypes are not set, return the dataframe.
84def infer_dtypes(self, persist: bool=False, debug: bool=False) -> Dict[str, Any]: 85 """ 86 If `dtypes` is not set in `meerschaum.Pipe.parameters`, 87 infer the data types from the underlying table if it exists. 88 89 Parameters 90 ---------- 91 persist: bool, default False 92 If `True`, persist the inferred data types to `meerschaum.Pipe.parameters`. 93 94 Returns 95 ------- 96 A dictionary of strings containing the pandas data types for this Pipe. 97 """ 98 if not self.exists(debug=debug): 99 dtypes = {} 100 if not self.columns: 101 return {} 102 dt_col = self.columns.get('datetime', None) 103 if dt_col: 104 if not self.parameters.get('dtypes', {}).get(dt_col, None): 105 dtypes[dt_col] = 'datetime64[ns]' 106 return dtypes 107 108 from meerschaum.utils.sql import get_pd_type 109 from meerschaum.utils.misc import to_pandas_dtype 110 columns_types = self.get_columns_types(debug=debug) 111 112 ### NOTE: get_columns_types() may return either the types as 113 ### PostgreSQL- or Pandas-style. 114 dtypes = { 115 c: ( 116 get_pd_type(t, allow_custom_dtypes=True) 117 if str(t).isupper() 118 else to_pandas_dtype(t) 119 ) 120 for c, t in columns_types.items() 121 } if columns_types else {} 122 if persist: 123 self.dtypes = dtypes 124 self.edit(interactive=False, debug=debug) 125 return dtypes
If `dtypes` is not set in `Pipe.parameters`, infer the data types from the underlying table if it exists.
Parameters
- persist (bool, default False): If `True`, persist the inferred data types to `Pipe.parameters`.
Returns
- A dictionary of strings containing the pandas data types for this Pipe.
class Plugin:
    """Handle packaging of Meerschaum plugins."""

    def __init__(
        self,
        name: str,
        version: Optional[str] = None,
        user_id: Optional[int] = None,
        required: Optional[List[str]] = None,
        attributes: Optional[Dict[str, Any]] = None,
        archive_path: Optional[pathlib.Path] = None,
        venv_path: Optional[pathlib.Path] = None,
        repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None,
        repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None,
    ):
        from meerschaum.config.static import STATIC_CONFIG
        sep = STATIC_CONFIG['plugins']['repo_separator']
        ### `name` may embed repo keys after the separator (e.g. 'foo@api:mrsm').
        _repo = None
        if sep in name:
            try:
                name, _repo = name.split(sep)
            except Exception as e:
                error(f"Invalid plugin name: '{name}'")
        self._repo_in_name = _repo

        if attributes is None:
            attributes = {}
        self.name = name
        self.attributes = attributes
        self.user_id = user_id
        self._version = version
        ### Only cache `required` when explicitly provided;
        ### otherwise `get_dependencies()` discovers it lazily.
        if required:
            self._required = required
        self.archive_path = (
            archive_path if archive_path is not None
            else PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
        )
        self.venv_path = (
            venv_path if venv_path is not None
            else VIRTENV_RESOURCES_PATH / self.name
        )
        self._repo_connector = repo_connector
        self._repo_keys = repo


    @property
    def repo_connector(self):
        """
        Return the repository connector for this plugin.
        NOTE: This imports the `connectors` module, which imports certain plugin modules.
        """
        if self._repo_connector is None:
            from meerschaum.connectors.parse import parse_repo_keys

            ### Explicit keys take precedence over keys parsed from the name.
            repo_keys = self._repo_keys or self._repo_in_name
            if self._repo_in_name and self._repo_keys and self._repo_keys != self._repo_in_name:
                error(
                    f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
                )
            repo_connector = parse_repo_keys(repo_keys)
            self._repo_connector = repo_connector
        return self._repo_connector


    @property
    def version(self):
        """
        Return the plugin module's version (`__version__`) if it's defined.
        """
        if self._version is None:
            try:
                self._version = self.module.__version__
            except Exception as e:
                self._version = None
        return self._version


    @property
    def module(self):
        """
        Return the Python module of the underlying plugin.
        """
        if '_module' not in self.__dict__ or self.__dict__.get('_module', None) is None:
            if self.__file__ is None:
                return None
            from meerschaum.plugins import import_plugins
            self._module = import_plugins(str(self), warn=False)
        return self._module


    @property
    def __file__(self) -> Union[str, None]:
        """
        Return the file path (str) of the plugin if it exists, otherwise `None`.
        """
        ### Prefer the imported module's own file path when available.
        if self.__dict__.get('_module', None) is not None:
            return self.module.__file__

        ### Package-style plugin: a directory with an `__init__.py`.
        potential_dir = PLUGINS_RESOURCES_PATH / self.name
        if (
            potential_dir.exists()
            and potential_dir.is_dir()
            and (potential_dir / '__init__.py').exists()
        ):
            return str((potential_dir / '__init__.py').as_posix())

        ### Single-module plugin: `<name>.py`.
        potential_file = PLUGINS_RESOURCES_PATH / (self.name + '.py')
        if potential_file.exists() and not potential_file.is_dir():
            return str(potential_file.as_posix())

        return None


    @property
    def requirements_file_path(self) -> Union[pathlib.Path, None]:
        """
        If a file named `requirements.txt` exists, return its path.
        """
        if self.__file__ is None:
            return None
        path = pathlib.Path(self.__file__).parent / 'requirements.txt'
        if not path.exists():
            return None
        return path


    def is_installed(self, **kw) -> bool:
        """
        Check whether a plugin is correctly installed.

        Returns
        -------
        A `bool` indicating whether a plugin exists and is successfully imported.
        """
        return self.__file__ is not None


    def make_tar(self, debug: bool = False) -> pathlib.Path:
        """
        Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.

        Parameters
        ----------
        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A `pathlib.Path` to the archive file's path.

        """
        import tarfile, pathlib, subprocess, fnmatch
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.packages import attempt_import
        pathspec = attempt_import('pathspec', debug=debug)

        if not self.__file__:
            from meerschaum.utils.warnings import error
            error(f"Could not find file for plugin '{self}'.")
        ### Package-style plugins are archived as directories,
        ### single-module plugins as one `<name>.py` file.
        if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
            path = self.__file__.replace('__init__.py', '')
            is_dir = True
        else:
            path = self.__file__
            is_dir = False

        ### Work from the plugin's parent directory so archive members are relative paths.
        old_cwd = os.getcwd()
        real_parent_path = pathlib.Path(os.path.realpath(path)).parent
        os.chdir(real_parent_path)

        default_patterns_to_ignore = [
            '.pyc',
            '__pycache__/',
            'eggs/',
            '__pypackages__/',
            '.git',
        ]

        def parse_gitignore() -> 'Set[str]':
            ### Merge the default ignore patterns with the plugin's `.gitignore`, if present.
            gitignore_path = pathlib.Path(path) / '.gitignore'
            if not gitignore_path.exists():
                return set(default_patterns_to_ignore)
            with open(gitignore_path, 'r', encoding='utf-8') as f:
                gitignore_text = f.read()
            return set(pathspec.PathSpec.from_lines(
                pathspec.patterns.GitWildMatchPattern,
                default_patterns_to_ignore + gitignore_text.splitlines()
            ).match_tree(path))

        patterns_to_ignore = parse_gitignore() if is_dir else set()

        if debug:
            dprint(f"Patterns to ignore:\n{patterns_to_ignore}")

        with tarfile.open(self.archive_path, 'w:gz') as tarf:
            if not is_dir:
                tarf.add(f"{self.name}.py")
            else:
                for root, dirs, files in os.walk(self.name):
                    for f in files:
                        good_file = True
                        fp = os.path.join(root, f)
                        for pattern in patterns_to_ignore:
                            if pattern in str(fp) or f.startswith('.'):
                                good_file = False
                                break
                        if good_file:
                            if debug:
                                dprint(f"Adding '{fp}'...")
                            tarf.add(fp)

        ### clean up and change back to old directory
        os.chdir(old_cwd)

        ### change to 775 to avoid permissions issues with the API in a Docker container
        self.archive_path.chmod(0o775)

        if debug:
            dprint(f"Created archive '{self.archive_path}'.")
        return self.archive_path


    def install(
        self,
        skip_deps: bool = False,
        force: bool = False,
        debug: bool = False,
    ) -> SuccessTuple:
        """
        Extract a plugin's tar archive to the plugins directory.

        This function checks if the plugin is already installed and if the version is equal or
        greater than the existing installation.

        Parameters
        ----------
        skip_deps: bool, default False
            If `True`, do not install dependencies.

        force: bool, default False
            If `True`, continue with installation, even if required packages fail to install.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A `SuccessTuple` of success (bool) and a message (str).

        """
        if self.full_name in _ongoing_installations:
            return True, f"Already installing plugin '{self}'."
        _ongoing_installations.add(self.full_name)
        from meerschaum.utils.warnings import warn, error
        if debug:
            from meerschaum.utils.debug import dprint
        import tarfile
        import re
        import ast
        from meerschaum.plugins import sync_plugins_symlinks
        from meerschaum.utils.packages import attempt_import, determine_version, reload_meerschaum
        from meerschaum.utils.venv import init_venv
        from meerschaum.utils.misc import safely_extract_tar
        old_cwd = os.getcwd()
        old_version = ''
        new_version = ''
        temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
        temp_dir.mkdir(exist_ok=True)

        ### NOTE(review): this early return (and the extraction-failure return below)
        ### leaves `self.full_name` in `_ongoing_installations` — confirm whether
        ### subsequent install attempts are then short-circuited forever.
        if not self.archive_path.exists():
            return False, f"Missing archive file for plugin '{self}'."
        if self.version is not None:
            old_version = self.version
            if debug:
                dprint(f"Found existing version '{old_version}' for plugin '{self}'.")

        if debug:
            dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")

        try:
            with tarfile.open(self.archive_path, 'r:gz') as tarf:
                safely_extract_tar(tarf, temp_dir)
        except Exception as e:
            warn(e)
            return False, f"Failed to extract plugin '{self.name}'."

        ### search for version information
        files = os.listdir(temp_dir)

        if str(files[0]) == self.name:
            is_dir = True
        elif str(files[0]) == self.name + '.py':
            is_dir = False
        else:
            error(f"Unknown format encountered for plugin '{self}'.")

        fpath = temp_dir / files[0]
        if is_dir:
            fpath = fpath / '__init__.py'

        init_venv(self.name, debug=debug)
        with open(fpath, 'r', encoding='utf-8') as f:
            init_lines = f.readlines()
        new_version = None
        ### Parse `__version__` from the source text without importing the module.
        for line in init_lines:
            if '__version__' not in line:
                continue
            version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
            if not version_match:
                continue
            new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
            break
        if not new_version:
            warn(
                f"No `__version__` defined for plugin '{self}'. "
                + "Assuming new version...",
                stack = False,
            )

        packaging_version = attempt_import('packaging.version')
        try:
            is_new_version = (not new_version and not old_version) or (
                packaging_version.parse(old_version) < packaging_version.parse(new_version)
            )
            is_same_version = new_version and old_version and (
                packaging_version.parse(old_version) == packaging_version.parse(new_version)
            )
        except Exception as e:
            ### Unparseable versions: assume the archive is newer.
            is_new_version, is_same_version = True, False

        ### Determine where to permanently store the new plugin.
        plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
        for path in PLUGINS_DIR_PATHS:
            files_in_plugins_dir = os.listdir(path)
            if (
                self.name in files_in_plugins_dir
                or
                (self.name + '.py') in files_in_plugins_dir
            ):
                plugin_installation_dir_path = path
                break

        success_msg = (
            f"Successfully installed plugin '{self}'"
            + ("\n    (skipped dependencies)" if skip_deps else "")
            + "."
        )
        success, abort = None, None

        if is_same_version and not force:
            success, msg = True, (
                f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
                "    Install again with `-f` or `--force` to reinstall."
            )
            abort = True
        elif is_new_version or force:
            ### Move the extracted files into the permanent plugins directory,
            ### overwriting any existing files.
            for src_dir, dirs, files in os.walk(temp_dir):
                if success is not None:
                    break
                dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
                if not os.path.exists(dst_dir):
                    os.mkdir(dst_dir)
                for f in files:
                    src_file = os.path.join(src_dir, f)
                    dst_file = os.path.join(dst_dir, f)
                    if os.path.exists(dst_file):
                        os.remove(dst_file)

                    if debug:
                        dprint(f"Moving '{src_file}' to '{dst_dir}'...")
                    try:
                        shutil.move(src_file, dst_dir)
                    except Exception as e:
                        success, msg = False, (
                            f"Failed to install plugin '{self}': " +
                            f"Could not move file '{src_file}' to '{dst_dir}'"
                        )
                        print(msg)
                        break
            if success is None:
                success, msg = True, success_msg
        else:
            success, msg = False, (
                f"Your installed version of plugin '{self}' ({old_version}) is higher than "
                + f"attempted version {new_version}."
            )

        shutil.rmtree(temp_dir)
        os.chdir(old_cwd)

        ### Reload the plugin's module.
        sync_plugins_symlinks(debug=debug)
        if '_module' in self.__dict__:
            del self.__dict__['_module']
        init_venv(venv=self.name, force=True, debug=debug)
        reload_meerschaum(debug=debug)

        ### if we've already failed, return here
        if not success or abort:
            _ongoing_installations.remove(self.full_name)
            return success, msg

        ### attempt to install dependencies
        dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
        if not dependencies_installed:
            _ongoing_installations.remove(self.full_name)
            return False, f"Failed to install dependencies for plugin '{self}'."

        ### handling success tuple, bool, or other (typically None)
        setup_tuple = self.setup(debug=debug)
        if isinstance(setup_tuple, tuple):
            if not setup_tuple[0]:
                success, msg = setup_tuple
        elif isinstance(setup_tuple, bool):
            if not setup_tuple:
                success, msg = False, (
                    f"Failed to run post-install setup for plugin '{self}'." + '\n' +
                    f"Check `setup()` in '{self.__file__}' for more information " +
                    f"(no error message provided)."
                )
            else:
                success, msg = True, success_msg
        elif setup_tuple is None:
            success = True
            msg = (
                f"Post-install for plugin '{self}' returned None. " +
                f"Assuming plugin successfully installed."
            )
            warn(msg)
        else:
            success = False
            msg = (
                f"Post-install for plugin '{self}' returned unexpected value " +
                f"of type '{type(setup_tuple)}': {setup_tuple}"
            )

        _ongoing_installations.remove(self.full_name)
        module = self.module
        return success, msg


    def remove_archive(
        self,
        debug: bool = False
    ) -> SuccessTuple:
        """Remove a plugin's archive file."""
        if not self.archive_path.exists():
            return True, f"Archive file for plugin '{self}' does not exist."
        try:
            self.archive_path.unlink()
        except Exception as e:
            return False, f"Failed to remove archive for plugin '{self}':\n{e}"
        return True, "Success"


    def remove_venv(
        self,
        debug: bool = False
    ) -> SuccessTuple:
        """Remove a plugin's virtual environment."""
        if not self.venv_path.exists():
            return True, f"Virtual environment for plugin '{self}' does not exist."
        try:
            shutil.rmtree(self.venv_path)
        except Exception as e:
            return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"
        return True, "Success"


    def uninstall(self, debug: bool = False) -> SuccessTuple:
        """
        Remove a plugin, its virtual environment, and archive file.
        """
        from meerschaum.utils.packages import reload_meerschaum
        from meerschaum.plugins import sync_plugins_symlinks
        from meerschaum.utils.warnings import warn, info
        warnings_thrown_count: int = 0
        max_warnings: int = 3

        if not self.is_installed():
            info(
                f"Plugin '{self.name}' doesn't seem to be installed.\n    "
                + "Checking for artifacts...",
                stack = False,
            )
        else:
            real_path = pathlib.Path(os.path.realpath(self.__file__))
            try:
                if real_path.name == '__init__.py':
                    shutil.rmtree(real_path.parent)
                else:
                    real_path.unlink()
            except Exception as e:
                warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
                warnings_thrown_count += 1
            else:
                info(f"Removed source files for plugin '{self.name}'.")

        if self.venv_path.exists():
            success, msg = self.remove_venv(debug=debug)
            if not success:
                warn(msg, stack=False)
                warnings_thrown_count += 1
            else:
                info(f"Removed virtual environment from plugin '{self.name}'.")

        ### Uninstallation is best-effort: success means fewer than 3 warnings.
        success = warnings_thrown_count < max_warnings
        sync_plugins_symlinks(debug=debug)
        self.deactivate_venv(force=True, debug=debug)
        reload_meerschaum(debug=debug)
        return success, (
            f"Successfully uninstalled plugin '{self}'." if success
            else f"Failed to uninstall plugin '{self}'."
        )


    def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]:
        """
        If exists, run the plugin's `setup()` function.

        Parameters
        ----------
        *args: str
            The positional arguments passed to the `setup()` function.

        debug: bool, default False
            Verbosity toggle.

        **kw: Any
            The keyword arguments passed to the `setup()` function.

        Returns
        -------
        A `SuccessTuple` or `bool` indicating success.

        """
        from meerschaum.utils.debug import dprint
        import inspect
        _setup = None
        for name, fp in inspect.getmembers(self.module):
            if name == 'setup' and inspect.isfunction(fp):
                _setup = fp
                break

        ### assume success if no setup() is found (not necessary)
        if _setup is None:
            return True

        ### Only forward `debug`/`**kw` when the plugin's setup() accepts them.
        sig = inspect.signature(_setup)
        has_debug, has_kw = ('debug' in sig.parameters), False
        for k, v in sig.parameters.items():
            if '**' in str(v):
                has_kw = True
                break

        _kw = {}
        if has_kw:
            _kw.update(kw)
        if has_debug:
            _kw['debug'] = debug

        if debug:
            dprint(f"Running setup for plugin '{self}'...")
        try:
            self.activate_venv(debug=debug)
            return_tuple = _setup(*args, **_kw)
            self.deactivate_venv(debug=debug)
        except Exception as e:
            return False, str(e)

        if isinstance(return_tuple, tuple):
            return return_tuple
        if isinstance(return_tuple, bool):
            return return_tuple, f"Setup for Plugin '{self.name}' did not return a message."
        if return_tuple is None:
            return False, f"Setup for Plugin '{self.name}' returned None."
        return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"


    def get_dependencies(
        self,
        debug: bool = False,
    ) -> List[str]:
        """
        If the Plugin has specified dependencies in a list called `required`, return the list.

        **NOTE:** Dependencies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
        Meerschaum plugins may also specify connector keys for a repo after `'@'`.

        Parameters
        ----------
        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A list of required packages and plugins (str).

        """
        if '_required' in self.__dict__:
            return self._required

        ### If the plugin has not yet been imported,
        ### infer the dependencies from the source text.
        ### This is not super robust, and it doesn't feel right
        ### having multiple versions of the logic.
        ### This is necessary when determining the activation order
        ### without having import the module.
        ### For consistency's sake, the module-less method does not cache the requirements.
        if self.__dict__.get('_module', None) is None:
            file_path = self.__file__
            if file_path is None:
                return []
            with open(file_path, 'r', encoding='utf-8') as f:
                text = f.read()

            if 'required' not in text:
                return []

            ### This has some limitations:
            ### It relies on `required` being manually declared.
            ### We lose the ability to dynamically alter the `required` list,
            ### which is why we've kept the module-reliant method below.
            import ast, re
            ### NOTE: This technically would break
            ### if `required` was the very first line of the file.
            req_start_match = re.search(r'required(:\s*)?.*=', text)
            if not req_start_match:
                return []
            req_start = req_start_match.start()
            equals_sign = req_start + text[req_start:].find('=')

            ### Dependencies may have brackets within the strings, so push back the index.
            first_opening_brace = equals_sign + 1 + text[equals_sign:].find('[')
            if first_opening_brace == -1:
                return []

            next_closing_brace = equals_sign + 1 + text[equals_sign:].find(']')
            if next_closing_brace == -1:
                return []

            start_ix = first_opening_brace + 1
            end_ix = next_closing_brace

            ### Walk forward past nested '[' pairs until the true closing brace.
            num_braces = 0
            while True:
                if '[' not in text[start_ix:end_ix]:
                    break
                num_braces += 1
                start_ix = end_ix
                end_ix += text[end_ix + 1:].find(']') + 1

            req_end = end_ix + 1
            req_text = (
                text[(first_opening_brace-1):req_end]
                .lstrip()
                .replace('=', '', 1)
                .lstrip()
                .rstrip()
            )
            try:
                required = ast.literal_eval(req_text)
            except Exception as e:
                warn(
                    f"Unable to determine requirements for plugin '{self.name}' "
                    + "without importing the module.\n"
                    + "    This may be due to dynamically setting the global `required` list.\n"
                    + f"    {e}"
                )
                return []
            return required

        import inspect
        self.activate_venv(dependencies=False, debug=debug)
        required = []
        for name, val in inspect.getmembers(self.module):
            if name == 'required':
                required = val
                break
        self._required = required
        self.deactivate_venv(dependencies=False, debug=debug)
        return required


    def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
        """
        Return a list of required Plugin objects.
        """
        from meerschaum.utils.warnings import warn
        from meerschaum.config import get_config
        from meerschaum.config.static import STATIC_CONFIG
        plugins = []
        _deps = self.get_dependencies(debug=debug)
        sep = STATIC_CONFIG['plugins']['repo_separator']
        plugin_names = [
            _d[len('plugin:'):] for _d in _deps
            if _d.startswith('plugin:') and len(_d) > len('plugin:')
        ]
        default_repo_keys = get_config('meerschaum', 'default_repository')
        for _plugin_name in plugin_names:
            if sep in _plugin_name:
                try:
                    _plugin_name, _repo_keys = _plugin_name.split(sep)
                except Exception as e:
                    _repo_keys = default_repo_keys
                    warn(
                        f"Invalid repo keys for required plugin '{_plugin_name}'.\n    "
                        + f"Will try to use '{_repo_keys}' instead.",
                        stack = False,
                    )
            else:
                _repo_keys = default_repo_keys
            plugins.append(Plugin(_plugin_name, repo=_repo_keys))
        return plugins


    def get_required_packages(self, debug: bool=False) -> List[str]:
        """
        Return the required package names (excluding plugins).
        """
        _deps = self.get_dependencies(debug=debug)
        return [_d for _d in _deps if not _d.startswith('plugin:')]


    def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
        """
        Activate the virtual environments for the plugin and its dependencies.

        Parameters
        ----------
        dependencies: bool, default True
            If `True`, activate the virtual environments for required plugins.

        Returns
        -------
        A bool indicating success.
        """
        from meerschaum.utils.venv import venv_target_path
        from meerschaum.utils.packages import activate_venv
        from meerschaum.utils.misc import make_symlink, is_symlink
        from meerschaum.config._paths import PACKAGE_ROOT_PATH

        if dependencies:
            for plugin in self.get_required_plugins(debug=debug):
                plugin.activate_venv(debug=debug, **kw)

        vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
        venv_meerschaum_path = vtp / 'meerschaum'

        ### Ensure `meerschaum` inside the venv points at the real package root.
        try:
            success, msg = True, "Success"
            if is_symlink(venv_meerschaum_path):
                if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
                    venv_meerschaum_path.unlink()
                    success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
        except Exception as e:
            success, msg = False, str(e)
        if not success:
            warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")

        return activate_venv(self.name, debug=debug, **kw)


    def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
        """
        Deactivate the virtual environments for the plugin and its dependencies.

        Parameters
        ----------
        dependencies: bool, default True
            If `True`, deactivate the virtual environments for required plugins.

        Returns
        -------
        A bool indicating success.
        """
        from meerschaum.utils.packages import deactivate_venv
        success = deactivate_venv(self.name, debug=debug, **kw)
        if dependencies:
            for plugin in self.get_required_plugins(debug=debug):
                plugin.deactivate_venv(debug=debug, **kw)
        return success


    def install_dependencies(
        self,
        force: bool = False,
        debug: bool = False,
    ) -> bool:
        """
        If specified, install dependencies.

        **NOTE:** Dependencies that start with `'plugin:'` will be installed as
        Meerschaum plugins from the same repository as this Plugin.
        To install from a different repository, add the repo keys after `'@'`
        (e.g. `'plugin:foo@api:bar'`).

        Parameters
        ----------
        force: bool, default False
            If `True`, continue with the installation, even if some
            required packages fail to install.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        A bool indicating success.

        """
        from meerschaum.utils.packages import pip_install, venv_contains_package
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.warnings import warn, info
        from meerschaum.connectors.parse import parse_repo_keys
        _deps = self.get_dependencies(debug=debug)
        if not _deps and self.requirements_file_path is None:
            return True

        plugins = self.get_required_plugins(debug=debug)
        for _plugin in plugins:
            if _plugin.name == self.name:
                warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
                continue
            _success, _msg = _plugin.repo_connector.install_plugin(
                _plugin.name, debug=debug, force=force
            )
            if not _success:
                warn(
                    f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
                    + f" for plugin '{self.name}':\n" + _msg,
                    stack = False,
                )
                if not force:
                    warn(
                        "Try installing with the `--force` flag to continue anyway.",
                        stack = False,
                    )
                    return False
                info(
                    "Continuing with installation despite the failure "
                    + "(careful, things might be broken!)...",
                    icon = False
                )


        ### First step: parse `requirements.txt` if it exists.
        if self.requirements_file_path is not None:
            if not pip_install(
                requirements_file_path=self.requirements_file_path,
                venv=self.name, debug=debug
            ):
                warn(
                    f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
                    stack = False,
                )
                if not force:
                    warn(
                        "Try installing with `--force` to continue anyway.",
                        stack = False,
                    )
                    return False
                info(
                    "Continuing with installation despite the failure "
                    + "(careful, things might be broken!)...",
                    icon = False
                )


        ### Don't reinstall packages that are already included in required plugins.
        packages = []
        _packages = self.get_required_packages(debug=debug)
        accounted_for_packages = set()
        for package_name in _packages:
            for plugin in plugins:
                if venv_contains_package(package_name, plugin.name):
                    accounted_for_packages.add(package_name)
                    break
        packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]

        ### Attempt pip packages installation.
        if packages:
            for package in packages:
                if not pip_install(package, venv=self.name, debug=debug):
                    warn(
                        f"Failed to install required package '{package}'"
                        + f" for plugin '{self.name}'.",
                        stack = False,
                    )
                    if not force:
                        warn(
                            "Try installing with `--force` to continue anyway.",
                            stack = False,
                        )
                        return False
                    info(
                        "Continuing with installation despite the failure "
                        + "(careful, things might be broken!)...",
                        icon = False
                    )
        return True


    @property
    def full_name(self) -> str:
        """
        Include the repo keys with the plugin's name.
        """
        from meerschaum.config.static import STATIC_CONFIG
        sep = STATIC_CONFIG['plugins']['repo_separator']
        return self.name + sep + str(self.repo_connector)


    def __str__(self):
        return self.name


    def __repr__(self):
        return f"Plugin('{self.name}', repo='{self.repo_connector}')"


    def __del__(self):
        pass
Handle packaging of Meerschaum plugins.
def __init__(
    self,
    name: str,
    version: Optional[str] = None,
    user_id: Optional[int] = None,
    required: Optional[List[str]] = None,
    attributes: Optional[Dict[str, Any]] = None,
    archive_path: Optional[pathlib.Path] = None,
    venv_path: Optional[pathlib.Path] = None,
    repo_connector: Optional['meerschaum.connectors.api.APIConnector'] = None,
    repo: Union['meerschaum.connectors.api.APIConnector', str, None] = None,
):
    """
    Initialize a plugin, splitting repo keys out of `name` if present.
    """
    from meerschaum.config.static import STATIC_CONFIG
    separator = STATIC_CONFIG['plugins']['repo_separator']
    repo_from_name = None
    if separator in name:
        try:
            name, repo_from_name = name.split(separator)
        except Exception as e:
            error(f"Invalid plugin name: '{name}'")
    self._repo_in_name = repo_from_name

    self.name = name
    self.attributes = attributes if attributes is not None else {}
    self.user_id = user_id
    self._version = version
    ### Cache `required` only when explicitly supplied.
    if required:
        self._required = required
    default_archive_path = PLUGINS_ARCHIVES_RESOURCES_PATH / f"{self.name}.tar.gz"
    self.archive_path = default_archive_path if archive_path is None else archive_path
    default_venv_path = VIRTENV_RESOURCES_PATH / self.name
    self.venv_path = default_venv_path if venv_path is None else venv_path
    self._repo_connector = repo_connector
    self._repo_keys = repo
@property
def repo_connector(self):
    """
    Return the repository connector for this plugin.
    NOTE: This imports the `connectors` module, which imports certain plugin modules.
    """
    if self._repo_connector is not None:
        return self._repo_connector

    from meerschaum.connectors.parse import parse_repo_keys

    explicit_keys = self._repo_keys
    keys_from_name = self._repo_in_name
    ### Conflicting repo keys are a hard error.
    if keys_from_name and explicit_keys and explicit_keys != keys_from_name:
        error(
            f"Received inconsistent repos: '{self._repo_in_name}' and '{self._repo_keys}'."
        )
    self._repo_connector = parse_repo_keys(explicit_keys or keys_from_name)
    return self._repo_connector
Return the repository connector for this plugin.
NOTE: This imports the `connectors` module, which imports certain plugin modules.
@property
def version(self):
    """
    Return the plugin module's version (`__version__`) if it's defined.
    """
    if self._version is not None:
        return self._version
    try:
        self._version = self.module.__version__
    except Exception:
        ### The module may be missing or may not define `__version__`.
        self._version = None
    return self._version
Return the plugin module's version (`__version__`) if it's defined.
@property
def module(self):
    """
    Return the Python module of the underlying plugin.
    """
    cached_module = self.__dict__.get('_module', None)
    if cached_module is not None:
        return cached_module
    ### No cached module: import it fresh, unless there is no file to import.
    if self.__file__ is None:
        return None
    from meerschaum.plugins import import_plugins
    self._module = import_plugins(str(self), warn=False)
    return self._module
Return the Python module of the underlying plugin.
@property
def requirements_file_path(self) -> Union[pathlib.Path, None]:
    """
    If a file named `requirements.txt` exists, return its path.
    """
    plugin_file = self.__file__
    if plugin_file is None:
        return None
    candidate = pathlib.Path(plugin_file).parent / 'requirements.txt'
    return candidate if candidate.exists() else None
If a file named `requirements.txt` exists, return its path.
def is_installed(self, **kw) -> bool:
    """
    Check whether a plugin is correctly installed.

    Returns
    -------
    A `bool` indicating whether a plugin exists and is successfully imported.
    """
    ### A resolvable file path is the definition of "installed".
    found_file = self.__file__
    return found_file is not None
Check whether a plugin is correctly installed.
Returns
- A
bool
indicating whether a plugin exists and is successfully imported.
def make_tar(self, debug: bool = False) -> pathlib.Path:
    """
    Compress the plugin's source files into a `.tar.gz` archive and return the archive's path.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pathlib.Path` to the archive file's path.

    """
    import tarfile, pathlib, subprocess, fnmatch
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    pathspec = attempt_import('pathspec', debug=debug)

    if not self.__file__:
        from meerschaum.utils.warnings import error
        error(f"Could not find file for plugin '{self}'.")
    ### Package-style plugins are archived as directories,
    ### single-module plugins as one `<name>.py` file.
    if '__init__.py' in self.__file__ or os.path.isdir(self.__file__):
        path = self.__file__.replace('__init__.py', '')
        is_dir = True
    else:
        path = self.__file__
        is_dir = False

    ### Work from the plugin's parent directory so archive members are relative paths.
    old_cwd = os.getcwd()
    real_parent_path = pathlib.Path(os.path.realpath(path)).parent
    os.chdir(real_parent_path)

    default_patterns_to_ignore = [
        '.pyc',
        '__pycache__/',
        'eggs/',
        '__pypackages__/',
        '.git',
    ]

    def parse_gitignore() -> 'Set[str]':
        ### Merge the default ignore patterns with the plugin's `.gitignore`, if present.
        gitignore_path = pathlib.Path(path) / '.gitignore'
        if not gitignore_path.exists():
            return set(default_patterns_to_ignore)
        with open(gitignore_path, 'r', encoding='utf-8') as f:
            gitignore_text = f.read()
        return set(pathspec.PathSpec.from_lines(
            pathspec.patterns.GitWildMatchPattern,
            default_patterns_to_ignore + gitignore_text.splitlines()
        ).match_tree(path))

    patterns_to_ignore = parse_gitignore() if is_dir else set()

    if debug:
        dprint(f"Patterns to ignore:\n{patterns_to_ignore}")

    with tarfile.open(self.archive_path, 'w:gz') as tarf:
        if not is_dir:
            tarf.add(f"{self.name}.py")
        else:
            for root, dirs, files in os.walk(self.name):
                for f in files:
                    good_file = True
                    fp = os.path.join(root, f)
                    ### Skip ignored patterns and hidden files.
                    for pattern in patterns_to_ignore:
                        if pattern in str(fp) or f.startswith('.'):
                            good_file = False
                            break
                    if good_file:
                        if debug:
                            dprint(f"Adding '{fp}'...")
                        tarf.add(fp)

    ### clean up and change back to old directory
    os.chdir(old_cwd)

    ### change to 775 to avoid permissions issues with the API in a Docker container
    self.archive_path.chmod(0o775)

    if debug:
        dprint(f"Created archive '{self.archive_path}'.")
    return self.archive_path
Compress the plugin's source files into a .tar.gz
archive and return the archive's path.
Parameters
- debug (bool, default False): Verbosity toggle.
Returns
- A `pathlib.Path` to the archive file's path.
def install(
    self,
    skip_deps: bool = False,
    force: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Extract a plugin's tar archive to the plugins directory.

    This function checks if the plugin is already installed and if the version is equal or
    greater than the existing installation.

    Parameters
    ----------
    skip_deps: bool, default False
        If `True`, do not install dependencies.

    force: bool, default False
        If `True`, continue with installation, even if required packages fail to install.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (bool) and a message (str).
    """
    ### Guard against concurrent / re-entrant installations of the same plugin.
    if self.full_name in _ongoing_installations:
        return True, f"Already installing plugin '{self}'."
    _ongoing_installations.add(self.full_name)
    from meerschaum.utils.warnings import warn, error
    if debug:
        from meerschaum.utils.debug import dprint
    import tarfile
    import re
    import ast
    from meerschaum.plugins import sync_plugins_symlinks
    from meerschaum.utils.packages import attempt_import, reload_meerschaum
    from meerschaum.utils.venv import init_venv
    from meerschaum.utils.misc import safely_extract_tar
    old_cwd = os.getcwd()
    old_version = ''
    new_version = ''
    temp_dir = PLUGINS_TEMP_RESOURCES_PATH / self.name
    temp_dir.mkdir(exist_ok=True)

    if not self.archive_path.exists():
        ### NOTE: clear the in-progress marker on early exit,
        ### otherwise all later install attempts are silently skipped.
        _ongoing_installations.remove(self.full_name)
        return False, f"Missing archive file for plugin '{self}'."
    if self.version is not None:
        old_version = self.version
        if debug:
            dprint(f"Found existing version '{old_version}' for plugin '{self}'.")

    if debug:
        dprint(f"Extracting '{self.archive_path}' to '{temp_dir}'...")

    try:
        with tarfile.open(self.archive_path, 'r:gz') as tarf:
            safely_extract_tar(tarf, temp_dir)
    except Exception as e:
        warn(e)
        ### Same early-exit cleanup as above.
        _ongoing_installations.remove(self.full_name)
        return False, f"Failed to extract plugin '{self.name}'."

    ### search for version information
    files = os.listdir(temp_dir)

    if str(files[0]) == self.name:
        is_dir = True
    elif str(files[0]) == self.name + '.py':
        is_dir = False
    else:
        error(f"Unknown format encountered for plugin '{self}'.")

    fpath = temp_dir / files[0]
    if is_dir:
        fpath = fpath / '__init__.py'

    init_venv(self.name, debug=debug)
    with open(fpath, 'r', encoding='utf-8') as f:
        init_lines = f.readlines()
    new_version = None
    for line in init_lines:
        if '__version__' not in line:
            continue
        version_match = re.search(r'__version__(\s?)=', line.lstrip().rstrip())
        if not version_match:
            continue
        ### Parse the version string literal on the right-hand side of the assignment.
        new_version = ast.literal_eval(line.split('=')[1].lstrip().rstrip())
        break
    if not new_version:
        warn(
            f"No `__version__` defined for plugin '{self}'. "
            + "Assuming new version...",
            stack = False,
        )

    packaging_version = attempt_import('packaging.version')
    try:
        is_new_version = (not new_version and not old_version) or (
            packaging_version.parse(old_version) < packaging_version.parse(new_version)
        )
        is_same_version = new_version and old_version and (
            packaging_version.parse(old_version) == packaging_version.parse(new_version)
        )
    except Exception:
        ### Unparseable (or missing) versions: assume the archive is newer.
        is_new_version, is_same_version = True, False

    ### Determine where to permanently store the new plugin.
    plugin_installation_dir_path = PLUGINS_DIR_PATHS[0]
    for path in PLUGINS_DIR_PATHS:
        files_in_plugins_dir = os.listdir(path)
        if (
            self.name in files_in_plugins_dir
            or
            (self.name + '.py') in files_in_plugins_dir
        ):
            plugin_installation_dir_path = path
            break

    success_msg = (
        f"Successfully installed plugin '{self}'"
        + ("\n (skipped dependencies)" if skip_deps else "")
        + "."
    )
    success, abort = None, None

    if is_same_version and not force:
        success, msg = True, (
            f"Plugin '{self}' is up-to-date (version {old_version}).\n" +
            " Install again with `-f` or `--force` to reinstall."
        )
        abort = True
    elif is_new_version or force:
        ### Move the extracted files over the existing installation.
        for src_dir, dirs, files in os.walk(temp_dir):
            if success is not None:
                break
            dst_dir = str(src_dir).replace(str(temp_dir), str(plugin_installation_dir_path))
            if not os.path.exists(dst_dir):
                os.mkdir(dst_dir)
            for f in files:
                src_file = os.path.join(src_dir, f)
                dst_file = os.path.join(dst_dir, f)
                if os.path.exists(dst_file):
                    os.remove(dst_file)

                if debug:
                    dprint(f"Moving '{src_file}' to '{dst_dir}'...")
                try:
                    shutil.move(src_file, dst_dir)
                except Exception:
                    success, msg = False, (
                        f"Failed to install plugin '{self}': " +
                        f"Could not move file '{src_file}' to '{dst_dir}'"
                    )
                    print(msg)
                    break
        if success is None:
            success, msg = True, success_msg
    else:
        success, msg = False, (
            f"Your installed version of plugin '{self}' ({old_version}) is higher than "
            + f"attempted version {new_version}."
        )

    shutil.rmtree(temp_dir)
    os.chdir(old_cwd)

    ### Reload the plugin's module.
    sync_plugins_symlinks(debug=debug)
    if '_module' in self.__dict__:
        del self.__dict__['_module']
    init_venv(venv=self.name, force=True, debug=debug)
    reload_meerschaum(debug=debug)

    ### if we've already failed, return here
    if not success or abort:
        _ongoing_installations.remove(self.full_name)
        return success, msg

    ### attempt to install dependencies
    dependencies_installed = skip_deps or self.install_dependencies(force=force, debug=debug)
    if not dependencies_installed:
        _ongoing_installations.remove(self.full_name)
        return False, f"Failed to install dependencies for plugin '{self}'."

    ### handling success tuple, bool, or other (typically None)
    setup_tuple = self.setup(debug=debug)
    if isinstance(setup_tuple, tuple):
        if not setup_tuple[0]:
            success, msg = setup_tuple
    elif isinstance(setup_tuple, bool):
        if not setup_tuple:
            success, msg = False, (
                f"Failed to run post-install setup for plugin '{self}'." + '\n' +
                f"Check `setup()` in '{self.__file__}' for more information " +
                f"(no error message provided)."
            )
        else:
            success, msg = True, success_msg
    elif setup_tuple is None:
        success = True
        msg = (
            f"Post-install for plugin '{self}' returned None. " +
            f"Assuming plugin successfully installed."
        )
        warn(msg)
    else:
        success = False
        msg = (
            f"Post-install for plugin '{self}' returned unexpected value " +
            f"of type '{type(setup_tuple)}': {setup_tuple}"
        )

    _ongoing_installations.remove(self.full_name)
    ### Accessing `self.module` forces the freshly-installed module to import.
    module = self.module
    return success, msg
Extract a plugin's tar archive to the plugins directory.
This function checks if the plugin is already installed and if the version is equal or greater than the existing installation.
Parameters
- skip_deps (bool, default False): If `True`, do not install dependencies.
- force (bool, default False): If `True`, continue with installation, even if required packages fail to install.
- debug (bool, default False): Verbosity toggle.
Returns
- A
SuccessTuple
of success (bool) and a message (str).
def remove_archive(
    self,
    debug: bool = False
) -> SuccessTuple:
    """Delete this plugin's `.tar.gz` archive from disk, if present."""
    archive_file = self.archive_path
    if not archive_file.exists():
        return True, f"Archive file for plugin '{self}' does not exist."

    try:
        archive_file.unlink()
    except Exception as e:
        return False, f"Failed to remove archive for plugin '{self}':\n{e}"

    return True, "Success"
Remove a plugin's archive file.
def remove_venv(
    self,
    debug: bool = False
) -> SuccessTuple:
    """Delete this plugin's virtual environment directory, if present."""
    venv_dir = self.venv_path
    if not venv_dir.exists():
        return True, f"Virtual environment for plugin '{self}' does not exist."

    try:
        shutil.rmtree(venv_dir)
    except Exception as e:
        return False, f"Failed to remove virtual environment for plugin '{self}':\n{e}"

    return True, "Success"
Remove a plugin's virtual environment.
def uninstall(self, debug: bool = False) -> SuccessTuple:
    """
    Remove a plugin, its virtual environment, and archive file.

    NOTE(review): the visible body removes the source files and the virtual
    environment; the archive file itself is not touched here — confirm whether
    `remove_archive()` is expected to be called separately.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (bool) and a message (str).
    """
    from meerschaum.utils.packages import reload_meerschaum
    from meerschaum.plugins import sync_plugins_symlinks
    from meerschaum.utils.warnings import warn, info
    ### Tolerate a few failures before declaring the uninstall unsuccessful.
    ### NOTE(review): at most two warnings can accrue below, so `success` is
    ### always True with `max_warnings = 3` — confirm intent.
    warnings_thrown_count: int = 0
    max_warnings: int = 3

    if not self.is_installed():
        info(
            f"Plugin '{self.name}' doesn't seem to be installed.\n "
            + "Checking for artifacts...",
            stack = False,
        )
    else:
        real_path = pathlib.Path(os.path.realpath(self.__file__))
        try:
            ### A package plugin is a directory (with an `__init__.py`);
            ### a module plugin is a single file.
            if real_path.name == '__init__.py':
                shutil.rmtree(real_path.parent)
            else:
                real_path.unlink()
        except Exception as e:
            warn(f"Could not remove source files for plugin '{self.name}':\n{e}", stack=False)
            warnings_thrown_count += 1
        else:
            info(f"Removed source files for plugin '{self.name}'.")

    if self.venv_path.exists():
        success, msg = self.remove_venv(debug=debug)
        if not success:
            warn(msg, stack=False)
            warnings_thrown_count += 1
        else:
            info(f"Removed virtual environment from plugin '{self.name}'.")

    success = warnings_thrown_count < max_warnings
    ### Refresh the plugins namespace and reload Meerschaum
    ### so the removed plugin is no longer registered.
    sync_plugins_symlinks(debug=debug)
    self.deactivate_venv(force=True, debug=debug)
    reload_meerschaum(debug=debug)
    return success, (
        f"Successfully uninstalled plugin '{self}'." if success
        else f"Failed to uninstall plugin '{self}'."
    )
Remove a plugin, its virtual environment, and archive file.
548 def setup(self, *args: str, debug: bool = False, **kw: Any) -> Union[SuccessTuple, bool]: 549 """ 550 If exists, run the plugin's `setup()` function. 551 552 Parameters 553 ---------- 554 *args: str 555 The positional arguments passed to the `setup()` function. 556 557 debug: bool, default False 558 Verbosity toggle. 559 560 **kw: Any 561 The keyword arguments passed to the `setup()` function. 562 563 Returns 564 ------- 565 A `SuccessTuple` or `bool` indicating success. 566 567 """ 568 from meerschaum.utils.debug import dprint 569 import inspect 570 _setup = None 571 for name, fp in inspect.getmembers(self.module): 572 if name == 'setup' and inspect.isfunction(fp): 573 _setup = fp 574 break 575 576 ### assume success if no setup() is found (not necessary) 577 if _setup is None: 578 return True 579 580 sig = inspect.signature(_setup) 581 has_debug, has_kw = ('debug' in sig.parameters), False 582 for k, v in sig.parameters.items(): 583 if '**' in str(v): 584 has_kw = True 585 break 586 587 _kw = {} 588 if has_kw: 589 _kw.update(kw) 590 if has_debug: 591 _kw['debug'] = debug 592 593 if debug: 594 dprint(f"Running setup for plugin '{self}'...") 595 try: 596 self.activate_venv(debug=debug) 597 return_tuple = _setup(*args, **_kw) 598 self.deactivate_venv(debug=debug) 599 except Exception as e: 600 return False, str(e) 601 602 if isinstance(return_tuple, tuple): 603 return return_tuple 604 if isinstance(return_tuple, bool): 605 return return_tuple, f"Setup for Plugin '{self.name}' did not return a message." 606 if return_tuple is None: 607 return False, f"Setup for Plugin '{self.name}' returned None." 608 return False, f"Unknown return value from setup for Plugin '{self.name}': {return_tuple}"
If exists, run the plugin's setup()
function.
Parameters
- *args (str):
The positional arguments passed to the
setup()
function. - debug (bool, default False): Verbosity toggle.
- **kw (Any):
The keyword arguments passed to the
setup()
function.
Returns
- A `SuccessTuple` or `bool` indicating success.
def get_dependencies(
    self,
    debug: bool = False,
) -> List[str]:
    """
    If the Plugin has specified dependencies in a list called `required`, return the list.

    **NOTE:** Dependencies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
    Meerschaum plugins may also specify connector keys for a repo after `'@'`.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of required packages and plugins (str).
    """
    if '_required' in self.__dict__:
        return self._required

    ### If the plugin has not yet been imported,
    ### infer the dependencies from the source text.
    ### This is not super robust, and it doesn't feel right
    ### having multiple versions of the logic.
    ### This is necessary when determining the activation order
    ### without having to import the module.
    ### For consistency's sake, the module-less method does not cache the requirements.
    if self.__dict__.get('_module', None) is None:
        file_path = self.__file__
        if file_path is None:
            return []
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()

        if 'required' not in text:
            return []

        ### This has some limitations:
        ### It relies on `required` being manually declared.
        ### We lose the ability to dynamically alter the `required` list,
        ### which is why we've kept the module-reliant method below.
        import ast, re
        req_start_match = re.search(r'required(:\s*)?.*=', text)
        if not req_start_match:
            return []
        req_start = req_start_match.start()
        equals_sign = req_start + text[req_start:].find('=')

        ### NOTE: check `find()`'s result *before* adding the offset;
        ### otherwise a missing bracket (-1) is silently absorbed into the index.
        opening_brace_offset = text[equals_sign:].find('[')
        if opening_brace_offset == -1:
            return []
        first_opening_brace = equals_sign + 1 + opening_brace_offset

        closing_brace_offset = text[equals_sign:].find(']')
        if closing_brace_offset == -1:
            return []
        next_closing_brace = equals_sign + 1 + closing_brace_offset

        start_ix = first_opening_brace + 1
        end_ix = next_closing_brace

        ### Dependencies may have brackets within the strings,
        ### so keep extending the window until the interior has no more `[`.
        num_braces = 0
        while True:
            if '[' not in text[start_ix:end_ix]:
                break
            num_braces += 1
            start_ix = end_ix
            end_ix += text[end_ix + 1:].find(']') + 1

        req_end = end_ix + 1
        ### The slice begins at the opening `[`, so only strip a stray leading `=`.
        ### (The old `replace('=', '', 1)` removed the first `=` found *anywhere*,
        ### corrupting version specifiers like `'pkg>=1.0'`.)
        req_text = text[(first_opening_brace-1):req_end].lstrip().rstrip()
        if req_text.startswith('='):
            req_text = req_text[1:].lstrip()
        try:
            required = ast.literal_eval(req_text)
        except Exception as e:
            warn(
                f"Unable to determine requirements for plugin '{self.name}' "
                + "without importing the module.\n"
                + " This may be due to dynamically setting the global `required` list.\n"
                + f" {e}"
            )
            return []
        return required

    ### Module is available: read `required` directly from the imported module.
    import inspect
    self.activate_venv(dependencies=False, debug=debug)
    required = []
    for name, val in inspect.getmembers(self.module):
        if name == 'required':
            required = val
            break
    self._required = required
    self.deactivate_venv(dependencies=False, debug=debug)
    return required
If the Plugin has specified dependencies in a list called required
, return the list.
NOTE: Dependencies which start with `'plugin:'` are Meerschaum plugins, not pip packages.
Meerschaum plugins may also specify connector keys for a repo after '@'
.
Parameters
- debug (bool, default False): Verbosity toggle.
Returns
- A list of required packages and plugins (str).
def get_required_plugins(self, debug: bool=False) -> List[meerschaum.plugins.Plugin]:
    """
    Return a list of required Plugin objects.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG

    sep = STATIC_CONFIG['plugins']['repo_separator']
    prefix = 'plugin:'
    ### Only `plugin:` dependencies with a non-empty name are plugins.
    plugin_names = [
        dep[len(prefix):]
        for dep in self.get_dependencies(debug=debug)
        if dep.startswith(prefix) and len(dep) > len(prefix)
    ]
    default_repo_keys = get_config('meerschaum', 'default_repository')

    required_plugins = []
    for _plugin_name in plugin_names:
        _repo_keys = default_repo_keys
        ### Repo keys may be appended after the separator (e.g. `noaa@api:mrsm`).
        if sep in _plugin_name:
            try:
                _plugin_name, _repo_keys = _plugin_name.split(sep)
            except Exception as e:
                _repo_keys = default_repo_keys
                warn(
                    f"Invalid repo keys for required plugin '{_plugin_name}'.\n "
                    + f"Will try to use '{_repo_keys}' instead.",
                    stack = False,
                )
        required_plugins.append(Plugin(_plugin_name, repo=_repo_keys))
    return required_plugins
Return a list of required Plugin objects.
def get_required_packages(self, debug: bool=False) -> List[str]:
    """
    Return the required package names (excluding plugins).
    """
    dependencies = self.get_dependencies(debug=debug)
    return [
        dependency
        for dependency in dependencies
        if not dependency.startswith('plugin:')
    ]
Return the required package names (excluding plugins).
def activate_venv(self, dependencies: bool=True, debug: bool=False, **kw) -> bool:
    """
    Activate the virtual environments for the plugin and its dependencies.

    Parameters
    ----------
    dependencies: bool, default True
        If `True`, activate the virtual environments for required plugins.

    **kw: Any
        Forwarded to `meerschaum.utils.packages.activate_venv`.

    Returns
    -------
    A bool indicating success.
    """
    from meerschaum.utils.venv import venv_target_path
    from meerschaum.utils.packages import activate_venv
    from meerschaum.utils.misc import make_symlink, is_symlink
    from meerschaum.config._paths import PACKAGE_ROOT_PATH

    ### Activate required plugins' venvs first so this plugin's venv sits on top.
    if dependencies:
        for plugin in self.get_required_plugins(debug=debug):
            plugin.activate_venv(debug=debug, **kw)

    vtp = venv_target_path(self.name, debug=debug, allow_nonexistent=True)
    venv_meerschaum_path = vtp / 'meerschaum'

    ### Ensure the venv's `meerschaum` symlink resolves to the running package;
    ### if it points elsewhere, unlink and re-create it.
    try:
        success, msg = True, "Success"
        if is_symlink(venv_meerschaum_path):
            if pathlib.Path(os.path.realpath(venv_meerschaum_path)) != PACKAGE_ROOT_PATH:
                venv_meerschaum_path.unlink()
                success, msg = make_symlink(venv_meerschaum_path, PACKAGE_ROOT_PATH)
    except Exception as e:
        success, msg = False, str(e)
    if not success:
        ### Best-effort: warn but still attempt activation below.
        warn(f"Unable to create symlink {venv_meerschaum_path} to {PACKAGE_ROOT_PATH}:\n{msg}")

    return activate_venv(self.name, debug=debug, **kw)
Activate the virtual environments for the plugin and its dependencies.
Parameters
- dependencies (bool, default True): If `True`, activate the virtual environments for required plugins.
Returns
- A bool indicating success.
def deactivate_venv(self, dependencies: bool=True, debug: bool = False, **kw) -> bool:
    """
    Deactivate the virtual environments for the plugin and its dependencies.

    Parameters
    ----------
    dependencies: bool, default True
        If `True`, deactivate the virtual environments for required plugins.

    Returns
    -------
    A bool indicating success.
    """
    from meerschaum.utils.packages import deactivate_venv
    ### Deactivate this plugin's venv first, then its dependencies'.
    result = deactivate_venv(self.name, debug=debug, **kw)
    if dependencies:
        for required_plugin in self.get_required_plugins(debug=debug):
            required_plugin.deactivate_venv(debug=debug, **kw)
    return result
Deactivate the virtual environments for the plugin and its dependencies.
Parameters
- dependencies (bool, default True): If `True`, deactivate the virtual environments for required plugins.
Returns
- A bool indicating success.
def install_dependencies(
    self,
    force: bool = False,
    debug: bool = False,
) -> bool:
    """
    If specified, install dependencies.

    **NOTE:** Dependencies that start with `'plugin:'` will be installed as
    Meerschaum plugins from the same repository as this Plugin.
    To install from a different repository, add the repo keys after `'@'`
    (e.g. `'plugin:foo@api:bar'`).

    Parameters
    ----------
    force: bool, default False
        If `True`, continue with the installation, even if some
        required packages fail to install.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating success.
    """
    ### NOTE(review): `dprint` and `parse_repo_keys` are imported but unused in this body.
    from meerschaum.utils.packages import pip_install, venv_contains_package
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, info
    from meerschaum.connectors.parse import parse_repo_keys
    _deps = self.get_dependencies(debug=debug)
    ### Nothing to do: no declared dependencies and no requirements.txt.
    if not _deps and self.requirements_file_path is None:
        return True

    ### Install required plugins (`plugin:` dependencies) first.
    plugins = self.get_required_plugins(debug=debug)
    for _plugin in plugins:
        if _plugin.name == self.name:
            warn(f"Plugin '{self.name}' cannot depend on itself! Skipping...", stack=False)
            continue
        _success, _msg = _plugin.repo_connector.install_plugin(
            _plugin.name, debug=debug, force=force
        )
        if not _success:
            warn(
                f"Failed to install required plugin '{_plugin}' from '{_plugin.repo_connector}'"
                + f" for plugin '{self.name}':\n" + _msg,
                stack = False,
            )
            if not force:
                warn(
                    "Try installing with the `--force` flag to continue anyway.",
                    stack = False,
                )
                return False
            ### With `force`, failures are logged and installation continues.
            info(
                "Continuing with installation despite the failure "
                + "(careful, things might be broken!)...",
                icon = False
            )


    ### First step: parse `requirements.txt` if it exists.
    if self.requirements_file_path is not None:
        if not pip_install(
            requirements_file_path=self.requirements_file_path,
            venv=self.name, debug=debug
        ):
            warn(
                f"Failed to resolve 'requirements.txt' for plugin '{self.name}'.",
                stack = False,
            )
            if not force:
                warn(
                    "Try installing with `--force` to continue anyway.",
                    stack = False,
                )
                return False
            info(
                "Continuing with installation despite the failure "
                + "(careful, things might be broken!)...",
                icon = False
            )


    ### Don't reinstall packages that are already included in required plugins.
    packages = []
    _packages = self.get_required_packages(debug=debug)
    accounted_for_packages = set()
    for package_name in _packages:
        for plugin in plugins:
            if venv_contains_package(package_name, plugin.name):
                accounted_for_packages.add(package_name)
                break
    packages = [pkg for pkg in _packages if pkg not in accounted_for_packages]

    ### Attempt pip packages installation.
    if packages:
        for package in packages:
            if not pip_install(package, venv=self.name, debug=debug):
                warn(
                    f"Failed to install required package '{package}'"
                    + f" for plugin '{self.name}'.",
                    stack = False,
                )
                if not force:
                    warn(
                        "Try installing with `--force` to continue anyway.",
                        stack = False,
                    )
                    return False
                info(
                    "Continuing with installation despite the failure "
                    + "(careful, things might be broken!)...",
                    icon = False
                )
    return True
If specified, install dependencies.
NOTE: Dependencies that start with 'plugin:'
will be installed as
Meerschaum plugins from the same repository as this Plugin.
To install from a different repository, add the repo keys after '@'
(e.g. 'plugin:foo@api:bar'
).
Parameters
- force (bool, default False): If `True`, continue with the installation, even if some required packages fail to install.
- debug (bool, default False): Verbosity toggle.
Returns
- A bool indicating success.
@property
def full_name(self) -> str:
    """
    Include the repo keys with the plugin's name.
    """
    from meerschaum.config.static import STATIC_CONFIG
    separator = STATIC_CONFIG['plugins']['repo_separator']
    return f"{self.name}{separator}{self.repo_connector}"
Include the repo keys with the plugin's name.
class Venv:
    """
    Manage a virtual environment's activation status.

    Examples
    --------
    >>> from meerschaum.plugins import Plugin
    >>> with Venv('mrsm') as venv:
    ...     import pandas
    >>> with Venv(Plugin('noaa')) as venv:
    ...     import requests
    >>> venv = Venv('mrsm')
    >>> venv.activate()
    True
    >>> venv.deactivate()
    True
    >>>
    """

    def __init__(
        self,
        venv: Union[str, 'meerschaum.plugins.Plugin', None] = 'mrsm',
        debug: bool = False,
    ) -> None:
        from meerschaum.utils.venv import activate_venv, deactivate_venv, active_venvs
        ### For some weird threading issue,
        ### we can't use `isinstance` here.
        is_plugin = 'meerschaum.plugins._Plugin' in str(type(venv))
        if is_plugin:
            ### A Plugin manages its own (dependency-aware) venv activation.
            self._venv = venv.name
            self._activate = venv.activate_venv
            self._deactivate = venv.deactivate_venv
            self._kwargs = {}
        else:
            self._venv = venv
            self._activate = activate_venv
            self._deactivate = deactivate_venv
            self._kwargs = {'venv': venv}
        self._debug = debug
        ### In case someone calls `deactivate()` before `activate()`.
        self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)

    def activate(self, debug: bool = False) -> bool:
        """
        Activate this virtual environment.
        If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments
        will also be activated.
        """
        from meerschaum.utils.venv import active_venvs
        ### Snapshot the currently-active venvs so `deactivate()` can restore them.
        self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)
        return self._activate(debug=(debug or self._debug), **self._kwargs)

    def deactivate(self, debug: bool = False) -> bool:
        """
        Deactivate this virtual environment.
        If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments
        will also be deactivated.
        """
        return self._deactivate(debug=(debug or self._debug), **self._kwargs)

    @property
    def target_path(self) -> pathlib.Path:
        """
        Return the target site-packages path for this virtual environment.
        A `meerschaum.utils.venv.Venv` may have one virtual environment per minor Python version
        (e.g. Python 3.10 and Python 3.7).
        """
        from meerschaum.utils.venv import venv_target_path
        return venv_target_path(venv=self._venv, allow_nonexistent=True, debug=self._debug)

    @property
    def root_path(self) -> pathlib.Path:
        """
        Return the top-level path for this virtual environment.
        """
        from meerschaum.config._paths import VIRTENV_RESOURCES_PATH
        ### `venv=None` denotes the base environment (parent of the target path).
        return (
            self.target_path.parent
            if self._venv is None
            else VIRTENV_RESOURCES_PATH / self._venv
        )

    def __enter__(self) -> None:
        self.activate(debug=self._debug)

    def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
        self.deactivate(debug=self._debug)

    def __str__(self) -> str:
        ### Quote string venv names; `None` is rendered bare.
        return f"Venv('{self._venv}')" if self._venv is not None else f"Venv({self._venv})"

    def __repr__(self) -> str:
        return str(self)
Manage a virtual environment's activation status.
Examples
>>> from meerschaum.plugins import Plugin
>>> with Venv('mrsm') as venv:
... import pandas
>>> with Venv(Plugin('noaa')) as venv:
... import requests
>>> venv = Venv('mrsm')
>>> venv.activate()
True
>>> venv.deactivate()
True
>>>
37 def __init__( 38 self, 39 venv: Union[str, 'meerschaum.plugins.Plugin', None] = 'mrsm', 40 debug: bool = False, 41 ) -> None: 42 from meerschaum.utils.venv import activate_venv, deactivate_venv, active_venvs 43 ### For some weird threading issue, 44 ### we can't use `isinstance` here. 45 if 'meerschaum.plugins._Plugin' in str(type(venv)): 46 self._venv = venv.name 47 self._activate = venv.activate_venv 48 self._deactivate = venv.deactivate_venv 49 self._kwargs = {} 50 else: 51 self._venv = venv 52 self._activate = activate_venv 53 self._deactivate = deactivate_venv 54 self._kwargs = {'venv': venv} 55 self._debug = debug 56 ### In case someone calls `deactivate()` before `activate()`. 57 self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs)
60 def activate(self, debug: bool = False) -> bool: 61 """ 62 Activate this virtual environment. 63 If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments 64 will also be activated. 65 """ 66 from meerschaum.utils.venv import active_venvs 67 self._kwargs['previously_active_venvs'] = copy.deepcopy(active_venvs) 68 return self._activate(debug=(debug or self._debug), **self._kwargs)
Activate this virtual environment.
If a Plugin
was provided, its dependent virtual environments
will also be activated.
71 def deactivate(self, debug: bool = False) -> bool: 72 """ 73 Deactivate this virtual environment. 74 If a `meerschaum.plugins.Plugin` was provided, its dependent virtual environments 75 will also be deactivated. 76 """ 77 return self._deactivate(debug=(debug or self._debug), **self._kwargs)
Deactivate this virtual environment.
If a Plugin
was provided, its dependent virtual environments
will also be deactivated.
80 @property 81 def target_path(self) -> pathlib.Path: 82 """ 83 Return the target site-packages path for this virtual environment. 84 A `meerschaum.utils.venv.Venv` may have one virtual environment per minor Python version 85 (e.g. Python 3.10 and Python 3.7). 86 """ 87 from meerschaum.utils.venv import venv_target_path 88 return venv_target_path(venv=self._venv, allow_nonexistent=True, debug=self._debug)
Return the target site-packages path for this virtual environment.
A Venv
may have one virtual environment per minor Python version
(e.g. Python 3.10 and Python 3.7).
91 @property 92 def root_path(self) -> pathlib.Path: 93 """ 94 Return the top-level path for this virtual environment. 95 """ 96 from meerschaum.config._paths import VIRTENV_RESOURCES_PATH 97 if self._venv is None: 98 return self.target_path.parent 99 return VIRTENV_RESOURCES_PATH / self._venv
Return the top-level path for this virtual environment.
def pprint(
        *args,
        detect_password: bool = True,
        nopretty: bool = False,
        **kw
    ) -> None:
    """Pretty print an object according to the configured ANSI and UNICODE settings.
    If detect_password is True (default), search and replace passwords with '*' characters.
    Does not mutate objects.
    """
    from meerschaum.utils.packages import attempt_import, import_rich
    from meerschaum.utils.formatting import ANSI, UNICODE, get_console, print_tuple
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import replace_password, dict_from_od, filter_keywords
    from collections import OrderedDict
    import copy, json

    ### A single `(bool, str)` tuple is a SuccessTuple: delegate to `print_tuple`.
    if (
        len(args) == 1
        and
        isinstance(args[0], tuple)
        and
        len(args[0]) == 2
        and
        isinstance(args[0][0], bool)
        and
        isinstance(args[0][1], str)
    ):
        return print_tuple(args[0])

    modify = True
    rich_pprint = None
    if ANSI and not nopretty:
        ### Prefer `rich` for pretty-printing when ANSI is enabled.
        rich = import_rich()
        if rich is not None:
            rich_pretty = attempt_import('rich.pretty')
            if rich_pretty is not None:
                def _rich_pprint(*args, **kw):
                    """Print via the rich console, forwarding only supported kwargs."""
                    _console = get_console()
                    _kw = filter_keywords(_console.print, **kw)
                    _console.print(*args, **_kw)
                rich_pprint = _rich_pprint
    elif not nopretty:
        ### Fall back to `pprintpp`, then to the stdlib `pprint`.
        pprintpp = attempt_import('pprintpp', warn=False)
        try:
            _pprint = pprintpp.pprint
        except Exception as e:
            import pprint as _pprint_module
            _pprint = _pprint_module.pprint

    ### NOTE(review): if ANSI is enabled but `rich` / `rich.pretty` is unavailable,
    ### `_pprint` is never bound and the expression below raises NameError — confirm.
    func = (
        _pprint if rich_pprint is None else rich_pprint
    ) if not nopretty else print

    ### Deep-copy as a probe: if the args can't be copied, skip modification below.
    try:
        args_copy = copy.deepcopy(args)
    except Exception as e:
        args_copy = args
        modify = False
    _args = []
    for a in args:
        c = a
        ### convert OrderedDict into dict
        if isinstance(a, OrderedDict) or issubclass(type(a), OrderedDict):
            c = dict_from_od(copy.deepcopy(c))
        _args.append(c)
    args = _args

    _args = list(args)
    if detect_password and modify:
        _args = []
        for a in args:
            c = a
            if isinstance(c, dict):
                ### Mask password-like values on a copy (no mutation of the caller's dict).
                c = replace_password(copy.deepcopy(c))
            if nopretty:
                try:
                    c = json.dumps(c)
                    is_json = True
                except Exception as e:
                    is_json = False
                if not is_json:
                    try:
                        c = str(c)
                    except Exception as e:
                        pass
            _args.append(c)

    ### filter out unsupported keywords
    func_kw = filter_keywords(func, **kw) if not nopretty else {}
    error_msg = None
    try:
        func(*_args, **func_kw)
    except Exception as e:
        error_msg = e
    if error_msg is not None:
        error(error_msg)
Pretty print an object according to the configured ANSI and UNICODE settings. If detect_password is True (default), search and replace passwords with '*' characters. Does not mutate objects.
def attempt_import(
    *names: str,
    lazy: bool = True,
    warn: bool = True,
    install: bool = True,
    venv: Optional[str] = 'mrsm',
    precheck: bool = True,
    split: bool = True,
    check_update: bool = False,
    check_pypi: bool = False,
    check_is_installed: bool = True,
    allow_outside_venv: bool = True,
    color: bool = True,
    debug: bool = False
) -> Any:
    """
    Raise a warning if packages are not installed; otherwise import and return modules.
    If `lazy` is `True`, return lazy-imported modules.

    Returns tuple of modules if multiple names are provided, else returns one module.

    Parameters
    ----------
    names: List[str]
        The packages to be imported.

    lazy: bool, default True
        If `True`, lazily load packages.

    warn: bool, default True
        If `True`, raise a warning if a package cannot be imported.

    install: bool, default True
        If `True`, attempt to install a missing package into the designated virtual environment.
        If `check_update` is True, install updates if available.

    venv: Optional[str], default 'mrsm'
        The virtual environment in which to search for packages and to install packages into.

    precheck: bool, default True
        If `True`, attempt to find module before importing (necessary for checking if modules exist
        and retaining lazy imports), otherwise assume lazy is `False`.

    split: bool, default True
        If `True`, split packages' names on `'.'`.

    check_update: bool, default False
        If `True` and `install` is `True`, install updates if the required minimum version
        does not match.

    check_pypi: bool, default False
        If `True` and `check_update` is `True`, check PyPI when determining whether
        an update is required.

    check_is_installed: bool, default True
        If `True`, check if the package is contained in the virtual environment.

    allow_outside_venv: bool, default True
        If `True`, search outside of the specified virtual environment
        if the package cannot be found.
        Setting to `False` will reinstall the package into a virtual environment, even if it
        is installed outside.

    color: bool, default True
        If `False`, do not print ANSI colors.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The specified modules. If they're not available and `install` is `True`, it will first
    download them into a virtual environment and return the modules.

    Examples
    --------
    >>> pandas, sqlalchemy = attempt_import('pandas', 'sqlalchemy')
    >>> pandas = attempt_import('pandas')

    """

    ### NOTE(review): importlib.util appears unused in this body — confirm before removing.
    import importlib.util

    ### to prevent recursion, check if parent Meerschaum package is being imported
    if names == ('meerschaum',):
        return _import_module('meerschaum')

    ### An active import hook overrides the default 'mrsm' venv.
    if venv == 'mrsm' and _import_hook_venv is not None:
        if debug:
            print(f"Import hook for virtual environment '{_import_hook_venv}' is active.")
        venv = _import_hook_venv

    _warnings = _import_module('meerschaum.utils.warnings')
    warn_function = _warnings.warn

    def do_import(_name: str, **kw) -> Union['ModuleType', None]:
        """Import a single module inside the venv context; return None on failure."""
        with Venv(venv=venv, debug=debug):
            ### determine the import method (lazy vs normal)
            from meerschaum.utils.misc import filter_keywords
            import_method = (
                _import_module if not lazy
                else lazy_import
            )
            try:
                mod = import_method(_name, **(filter_keywords(import_method, **kw)))
            except Exception as e:
                if warn:
                    import traceback
                    traceback.print_exception(type(e), e, e.__traceback__)
                    warn_function(
                        f"Failed to import module '{_name}'.\nException:\n{e}",
                        ImportWarning,
                        stacklevel = (5 if lazy else 4),
                        color = False,
                    )
                mod = None
            return mod

    modules = []
    for name in names:
        ### Check if package is a declared dependency.
        root_name = name.split('.')[0] if split else name
        install_name = _import_to_install_name(root_name)

        if install_name is None:
            install_name = root_name
            if warn and root_name != 'plugins':
                warn_function(
                    f"Package '{root_name}' is not declared in meerschaum.utils.packages.",
                    ImportWarning,
                    stacklevel = 3,
                    color = False
                )

        ### Determine if the package exists.
        if precheck is False:
            ### Without a precheck, probe by actually importing (non-lazily checked).
            found_module = (
                do_import(
                    name, debug=debug, warn=False, venv=venv, color=color,
                    check_update=False, check_pypi=False, split=split,
                ) is not None
            )
        else:
            if check_is_installed:
                ### Cache the first installation check per name (guarded by a lock).
                with _locks['_is_installed_first_check']:
                    if not _is_installed_first_check.get(name, False):
                        package_is_installed = is_installed(
                            name,
                            venv = venv,
                            split = split,
                            allow_outside_venv = allow_outside_venv,
                            debug = debug,
                        )
                        _is_installed_first_check[name] = package_is_installed
                    else:
                        package_is_installed = _is_installed_first_check[name]
            else:
                package_is_installed = _is_installed_first_check.get(
                    name,
                    venv_contains_package(name, venv=venv, split=split, debug=debug)
                )
            found_module = package_is_installed

        if not found_module:
            if install:
                if not pip_install(
                    install_name,
                    venv = venv,
                    split = False,
                    check_update = check_update,
                    color = color,
                    debug = debug
                ) and warn:
                    warn_function(
                        f"Failed to install '{install_name}'.",
                        ImportWarning,
                        stacklevel = 3,
                        color = False,
                    )
            elif warn:
                ### Raise a warning if we can't find the package and install = False.
                warn_function(
                    (f"\n\nMissing package '{name}' from virtual environment '{venv}'; "
                    + "some features will not work correctly."
                    + f"\n\nSet install=True when calling attempt_import.\n"),
                    ImportWarning,
                    stacklevel = 3,
                    color = False,
                )

        ### Do the import. Will be lazy if lazy=True.
        m = do_import(
            name, debug=debug, warn=warn, venv=venv, color=color,
            check_update=check_update, check_pypi=check_pypi, install=install, split=split,
        )
        modules.append(m)

    modules = tuple(modules)
    if len(modules) == 1:
        return modules[0]
    return modules
Raise a warning if packages are not installed; otherwise import and return modules.
If lazy
is True
, return lazy-imported modules.
Returns tuple of modules if multiple names are provided, else returns one module.
Parameters
- names (List[str]): The packages to be imported.
- lazy (bool, default True):
If
True
, lazily load packages. - warn (bool, default True):
If
True
, raise a warning if a package cannot be imported. - install (bool, default True):
If
True
, attempt to install a missing package into the designated virtual environment. If check_update
is True, install updates if available. - venv (Optional[str], default 'mrsm'): The virtual environment in which to search for packages and to install packages into.
- precheck (bool, default True):
If
True
, attempt to find module before importing (necessary for checking if modules exist and retaining lazy imports), otherwise assume lazy is False
. - split (bool, default True):
If
True
, split packages' names on '.'
. - check_update (bool, default False):
If
True
and install is True, install updates if the required minimum version does not match. - check_pypi (bool, default False):
If
True
and check_update is True, check PyPI when determining whether an update is required. - check_is_installed (bool, default True):
If
True
, check if the package is contained in the virtual environment. - allow_outside_venv (bool, default True):
If
True
, search outside of the specified virtual environment if the package cannot be found. Setting to False
will reinstall the package into a virtual environment, even if it is installed outside. - color (bool, default True):
If
False
, do not print ANSI colors.
Returns
- The specified modules. If they're not available and install is True, it will first download them into a virtual environment and return the modules.
Examples
>>> pandas, sqlalchemy = attempt_import('pandas', 'sqlalchemy')
>>> pandas = attempt_import('pandas')
class Connector(metaclass=abc.ABCMeta):
    """
    The base connector class to hold connection attributes.
    """
    def __init__(
        self,
        type: Optional[str] = None,
        label: Optional[str] = None,
        **kw: Any
    ):
        """
        Set the given keyword arguments as attributes.

        Parameters
        ----------
        type: str
            The `type` of the connector (e.g. `sql`, `api`, `plugin`).

        label: str
            The `label` for the connector.


        Examples
        --------
        Run `mrsm edit config` to edit connectors in the YAML file:

        ```yaml
        meerschaum:
            connections:
                {type}:
                    {label}:
                        ### attributes go here
        ```

        """
        ### Snapshot the pristine __dict__ so _reset_attributes() can restore it.
        self._original_dict = copy.deepcopy(self.__dict__)
        self._set_attributes(type=type, label=label, **kw)
        ### Subclasses may declare REQUIRED_ATTRIBUTES to enforce extra keys.
        self.verify_attributes(getattr(self, 'REQUIRED_ATTRIBUTES', None))

    def _reset_attributes(self):
        """Restore the attributes captured before `_set_attributes()` ran."""
        self.__dict__ = self._original_dict

    def _set_attributes(
        self,
        *args,
        inherit_default: bool = True,
        **kw: Any
    ):
        """
        Resolve the connector's label, merge configuration layers
        (type defaults, user config, keyword overrides) into `_attributes`,
        and expose the result on `__dict__`.
        """
        from meerschaum.config.static import STATIC_CONFIG
        from meerschaum.utils.warnings import error

        self._attributes = {}

        default_label = STATIC_CONFIG['connectors']['default_label']

        ### NOTE: Support the legacy method of explicitly passing the type.
        label = kw.get('label', None)
        if label is None:
            if len(args) == 2:
                label = args[1]
            elif len(args) == 0:
                label = None
            else:
                label = args[0]

        ### 'default' is reserved for the inheritable defaults section below.
        if label == 'default':
            error(
                f"Label cannot be 'default'. Did you mean '{default_label}'?",
                InvalidAttributesError,
            )
        self.__dict__['label'] = label

        from meerschaum.config import get_config
        conn_configs = copy.deepcopy(get_config('meerschaum', 'connectors'))
        connector_config = copy.deepcopy(get_config('system', 'connectors'))

        ### inherit attributes from 'default' if exists
        if inherit_default:
            inherit_from = 'default'
            if self.type in conn_configs and inherit_from in conn_configs[self.type]:
                _inherit_dict = copy.deepcopy(conn_configs[self.type][inherit_from])
                self._attributes.update(_inherit_dict)

        ### load user config into self._attributes
        if self.type in conn_configs and self.label in conn_configs[self.type]:
            self._attributes.update(conn_configs[self.type][self.label])

        ### load system config into self._sys_config
        ### (deep copy so future Connectors don't inherit changes)
        if self.type in connector_config:
            self._sys_config = copy.deepcopy(connector_config[self.type])

        ### add additional arguments or override configuration
        self._attributes.update(kw)

        ### finally, update __dict__ with _attributes.
        self.__dict__.update(self._attributes)


    def verify_attributes(
        self,
        required_attributes: Optional[List[str]] = None,
        debug: bool = False
    ) -> None:
        """
        Ensure that the required attributes have been met.

        The Connector base class checks the minimum requirements.
        Child classes may enforce additional requirements.

        Parameters
        ----------
        required_attributes: Optional[List[str]], default None
            Attributes to be verified. If `None`, default to `['label']`.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        Don't return anything.

        Raises
        ------
        An error if any of the required attributes are missing.
        """
        from meerschaum.utils.warnings import error, warn
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.misc import items_str
        if required_attributes is None:
            required_attributes = ['label']
        missing_attributes = set()
        for a in required_attributes:
            if a not in self.__dict__:
                missing_attributes.add(a)
        if len(missing_attributes) > 0:
            error(
                (
                    f"Missing {items_str(list(missing_attributes))} "
                    + f"for connector '{self.type}:{self.label}'."
                ),
                InvalidAttributesError,
                silent = True,
                stack = False
            )


    def __str__(self):
        """
        When cast to a string, return type:label.
        """
        return f"{self.type}:{self.label}"

    def __repr__(self):
        """
        Represent the connector as type:label.
        """
        return str(self)

    @property
    def meta(self) -> Dict[str, Any]:
        """
        Return the keys needed to reconstruct this Connector.
        """
        ### Only public attributes (no leading underscore) are part of the meta.
        _meta = {
            key: value
            for key, value in self.__dict__.items()
            if not str(key).startswith('_')
        }
        _meta.update({
            'type': self.type,
            'label': self.label,
        })
        return _meta


    @property
    def type(self) -> str:
        """
        Return the type for this connector.
        """
        _type = self.__dict__.get('type', None)
        if _type is None:
            ### Derive from the class name (e.g. `FooConnector` -> 'foo') and cache.
            import re
            _type = re.sub(r'connector$', '', self.__class__.__name__.lower())
            self.__dict__['type'] = _type
        return _type


    @property
    def label(self) -> str:
        """
        Return the label for this connector.
        """
        _label = self.__dict__.get('label', None)
        if _label is None:
            ### Fall back to the configured default label and cache it.
            from meerschaum.config.static import STATIC_CONFIG
            _label = STATIC_CONFIG['connectors']['default_label']
            self.__dict__['label'] = _label
        return _label
The base connector class to hold connection attributes.
24 def __init__( 25 self, 26 type: Optional[str] = None, 27 label: Optional[str] = None, 28 **kw: Any 29 ): 30 """ 31 Set the given keyword arguments as attributes. 32 33 Parameters 34 ---------- 35 type: str 36 The `type` of the connector (e.g. `sql`, `api`, `plugin`). 37 38 label: str 39 The `label` for the connector. 40 41 42 Examples 43 -------- 44 Run `mrsm edit config` and to edit connectors in the YAML file: 45 46 ```yaml 47 meerschaum: 48 connections: 49 {type}: 50 {label}: 51 ### attributes go here 52 ``` 53 54 """ 55 self._original_dict = copy.deepcopy(self.__dict__) 56 self._set_attributes(type=type, label=label, **kw) 57 self.verify_attributes(getattr(self, 'REQUIRED_ATTRIBUTES', None))
119 def verify_attributes( 120 self, 121 required_attributes: Optional[List[str]] = None, 122 debug: bool = False 123 ) -> None: 124 """ 125 Ensure that the required attributes have been met. 126 127 The Connector base class checks the minimum requirements. 128 Child classes may enforce additional requirements. 129 130 Parameters 131 ---------- 132 required_attributes: Optional[List[str]], default None 133 Attributes to be verified. If `None`, default to `['label']`. 134 135 debug: bool, default False 136 Verbosity toggle. 137 138 Returns 139 ------- 140 Don't return anything. 141 142 Raises 143 ------ 144 An error if any of the required attributes are missing. 145 """ 146 from meerschaum.utils.warnings import error, warn 147 from meerschaum.utils.debug import dprint 148 from meerschaum.utils.misc import items_str 149 if required_attributes is None: 150 required_attributes = ['label'] 151 missing_attributes = set() 152 for a in required_attributes: 153 if a not in self.__dict__: 154 missing_attributes.add(a) 155 if len(missing_attributes) > 0: 156 error( 157 ( 158 f"Missing {items_str(list(missing_attributes))} " 159 + f"for connector '{self.type}:{self.label}'." 160 ), 161 InvalidAttributesError, 162 silent = True, 163 stack = False 164 )
Ensure that the required attributes have been met.
The Connector base class checks the minimum requirements. Child classes may enforce additional requirements.
Parameters
- required_attributes (Optional[List[str]], default None):
Attributes to be verified. If
None
, default to ['label']
. - debug (bool, default False): Verbosity toggle.
Returns
- Don't return anything.
Raises
- An error if any of the required attributes are missing.
179 @property 180 def meta(self) -> Dict[str, Any]: 181 """ 182 Return the keys needed to reconstruct this Connector. 183 """ 184 _meta = { 185 key: value 186 for key, value in self.__dict__.items() 187 if not str(key).startswith('_') 188 } 189 _meta.update({ 190 'type': self.type, 191 'label': self.label, 192 }) 193 return _meta
Return the keys needed to reconstruct this Connector.
196 @property 197 def type(self) -> str: 198 """ 199 Return the type for this connector. 200 """ 201 _type = self.__dict__.get('type', None) 202 if _type is None: 203 import re 204 _type = re.sub(r'connector$', '', self.__class__.__name__.lower()) 205 self.__dict__['type'] = _type 206 return _type
Return the type for this connector.
209 @property 210 def label(self) -> str: 211 """ 212 Return the label for this connector. 213 """ 214 _label = self.__dict__.get('label', None) 215 if _label is None: 216 from meerschaum.config.static import STATIC_CONFIG 217 _label = STATIC_CONFIG['connectors']['default_label'] 218 self.__dict__['label'] = _label 219 return _label
Return the label for this connector.
278def make_connector( 279 cls, 280 ): 281 """ 282 Register a class as a `Connector`. 283 The `type` will be the lower case of the class name, without the suffix `connector`. 284 285 Parameters 286 ---------- 287 instance: bool, default False 288 If `True`, make this connector type an instance connector. 289 This requires implementing the various pipes functions and lots of testing. 290 291 Examples 292 -------- 293 >>> import meerschaum as mrsm 294 >>> from meerschaum.connectors import make_connector, Connector 295 >>> 296 >>> @make_connector 297 >>> class FooConnector(Connector): 298 ... REQUIRED_ATTRIBUTES: list[str] = ['username', 'password'] 299 ... 300 >>> conn = mrsm.get_connector('foo:bar', username='dog', password='cat') 301 >>> print(conn.username, conn.password) 302 dog cat 303 >>> 304 """ 305 import re 306 typ = re.sub(r'connector$', '', cls.__name__.lower()) 307 with _locks['types']: 308 types[typ] = cls 309 with _locks['custom_types']: 310 custom_types.add(typ) 311 with _locks['connectors']: 312 if typ not in connectors: 313 connectors[typ] = {} 314 if getattr(cls, 'IS_INSTANCE', False): 315 with _locks['instance_types']: 316 if typ not in instance_types: 317 instance_types.append(typ) 318 319 return cls
Register a class as a Connector
.
The type
will be the lower case of the class name, without the suffix connector
.
Parameters
- instance (bool, default False):
If
True
, make this connector type an instance connector. This requires implementing the various pipes functions and lots of testing.
Examples
>>> import meerschaum as mrsm
>>> from meerschaum.connectors import make_connector, Connector
>>>
>>> @make_connector
>>> class FooConnector(Connector):
... REQUIRED_ATTRIBUTES: list[str] = ['username', 'password']
...
>>> conn = mrsm.get_connector('foo:bar', username='dog', password='cat')
>>> print(conn.username, conn.password)
dog cat
>>>