Meerschaum Python API
Welcome to the Meerschaum Python API technical documentation! Here you can find information about the classes and functions provided by the `meerschaum` package. Visit meerschaum.io for general usage documentation.
Root Module
For your convenience, the following classes and functions may be imported from the root `meerschaum` namespace:

- Functions: `get_pipes()`, `get_connector()`, `get_config()`, `pprint()`, `attempt_import()`, `make_connector()`, `entry()`
- Classes and types: `Pipe`, `Plugin`, `Venv`, `Job`, `Connector`, `SuccessTuple`
Examples
Build a Connector
Get existing connectors or build a new one in-memory with the `get_connector()` factory function:

```python
import meerschaum as mrsm

sql_conn = mrsm.get_connector(
    'sql:temp',
    flavor='sqlite',
    database='/tmp/tmp.db',
)

df = sql_conn.read("SELECT 1 AS foo")
print(df)
#    foo
# 0    1

sql_conn.to_sql(df, 'foo')
print(sql_conn.read('foo'))
#    foo
# 0    1
```
Create a Custom Connector Class
Decorate your connector classes with `make_connector()` to designate them as custom connectors:

```python
from datetime import datetime, timezone
from random import randint
import meerschaum as mrsm
from meerschaum.utils.misc import round_time

@mrsm.make_connector
class FooConnector(mrsm.Connector):
    REQUIRED_ATTRIBUTES = ['username', 'password']

    def fetch(
        self,
        begin: datetime | None = None,
        end: datetime | None = None,
    ):
        now = begin or round_time(datetime.now(timezone.utc))
        return [
            {'ts': now, 'id': 1, 'vl': randint(1, 100)},
            {'ts': now, 'id': 2, 'vl': randint(1, 100)},
            {'ts': now, 'id': 3, 'vl': randint(1, 100)},
        ]

foo_conn = mrsm.get_connector(
    'foo:bar',
    username='foo',
    password='bar',
)
docs = foo_conn.fetch()
```
Build a Pipe
Build a `Pipe` in-memory:

```python
from datetime import datetime
import meerschaum as mrsm

pipe = mrsm.Pipe(
    foo_conn, 'demo',
    instance=sql_conn,
    columns={'datetime': 'ts', 'id': 'id'},
    tags=['production'],
)
pipe.sync(begin=datetime(2024, 1, 1))
df = pipe.get_data()
print(df)
#           ts  id  vl
# 0 2024-01-01   1  97
# 1 2024-01-01   2  18
# 2 2024-01-01   3  96
```

Add `temporary=True` to skip registering the pipe in the pipes table.
Get Registered Pipes
The `get_pipes()` function returns a dictionary hierarchy of pipes by connector, metric, and location:
```python
import meerschaum as mrsm

pipes = mrsm.get_pipes(instance='sql:temp')
pipe = pipes['foo:bar']['demo'][None]
```
Add `as_list=True` to flatten the hierarchy:

```python
import meerschaum as mrsm

pipes = mrsm.get_pipes(
    tags=['production'],
    instance=sql_conn,
    as_list=True,
)
print(pipes)
# [Pipe('foo:bar', 'demo', instance='sql:temp')]
```
Import Plugins
You can import a plugin's module through `Plugin.module`:

```python
import meerschaum as mrsm

plugin = mrsm.Plugin('noaa')
with mrsm.Venv(plugin):
    noaa = plugin.module
```
If your plugin has submodules, use `meerschaum.plugins.from_plugin_import`:

```python
from meerschaum.plugins import from_plugin_import
get_defined_pipes = from_plugin_import('compose.utils.pipes', 'get_defined_pipes')
```
Import multiple plugins with `meerschaum.plugins.import_plugins`:

```python
from meerschaum.plugins import import_plugins
noaa, compose = import_plugins('noaa', 'compose')
```
Create a Job
Create a `Job` with `name` and `sysargs`:

```python
import meerschaum as mrsm

job = mrsm.Job('syncing-engine', 'sync pipes --loop')
success, msg = job.start()
```
Pass `executor_keys` as the connector keys of an API instance to create a remote job:

```python
import meerschaum as mrsm

job = mrsm.Job(
    'foo',
    'sync pipes -s daily',
    executor_keys='api:main',
)
```
Import from a Virtual Environment
Use the `Venv` context manager to activate a virtual environment:

```python
import meerschaum as mrsm

with mrsm.Venv('noaa'):
    import requests

print(requests.__file__)
# /home/bmeares/.config/meerschaum/venvs/noaa/lib/python3.12/site-packages/requests/__init__.py
```
To import packages which may not be installed, use `attempt_import()`:

```python
import meerschaum as mrsm

requests = mrsm.attempt_import('requests', venv='noaa')
print(requests.__file__)
# /home/bmeares/.config/meerschaum/venvs/noaa/lib/python3.12/site-packages/requests/__init__.py
```
Run Actions
Run `sysargs` with `entry()`:

```python
import meerschaum as mrsm

success, msg = mrsm.entry('show pipes + show version : x2')
```
Use `meerschaum.actions.get_action()` to access an action function directly:

```python
from meerschaum.actions import get_action

show_pipes = get_action(['show', 'pipes'])
success, msg = show_pipes(connector_keys=['plugin:noaa'])
```
Get a dictionary of available subactions with `meerschaum.actions.get_subactions()`:

```python
from meerschaum.actions import get_subactions

subactions = get_subactions('show')
success, msg = subactions['pipes']()
```
Create a Plugin
Run `bootstrap plugin` to create a new plugin:

```
mrsm bootstrap plugin example
```

This will create `example.py` in your plugins directory (default `~/.config/meerschaum/plugins/`; Windows: `%APPDATA%\Meerschaum\plugins`).

Open your plugin with `edit plugin` and paste the example code from the "Create a Custom Action" section below to try out the features:

```
mrsm edit plugin example
```
See the writing plugins guide for more in-depth documentation.
Create a Custom Action
Decorate a function with `meerschaum.actions.make_action` to designate it as an action. Subactions will be automatically detected if not decorated:

```python
from meerschaum.actions import make_action

@make_action
def sing():
    print('What would you like me to sing?')
    return True, "Success"

def sing_tune():
    return False, "I don't know that song!"

def sing_song():
    print('Hello, World!')
    return True, "Success"
```
Use `meerschaum.plugins.add_plugin_argument()` to create new parameters for your action:

```python
from meerschaum.plugins import make_action, add_plugin_argument

add_plugin_argument(
    '--song', type=str, help='What song to sing.',
)

@make_action
def sing_melody(action=None, song=None):
    to_sing = action[0] if action else song
    if not to_sing:
        return False, "Please tell me what to sing!"
    return True, f'~I am singing {to_sing}~'
```

```
mrsm sing melody lalala
mrsm sing melody --song do-re-mi
```
Add a Page to the Web Dashboard
Use the decorators `meerschaum.plugins.dash_plugin()` and `meerschaum.plugins.web_page()` to add new pages to the web dashboard:

```python
from meerschaum.plugins import dash_plugin, web_page

@dash_plugin
def init_dash(dash_app):

    import dash.html as html
    import dash_bootstrap_components as dbc
    from dash import Input, Output, no_update

    ### Routes to '/dash/my-page'
    @web_page('/my-page', login_required=False)
    def my_page():
        return dbc.Container([
            html.H1("Hello, World!"),
            dbc.Button("Click me", id='my-button'),
            html.Div(id="my-output-div"),
        ])

    @dash_app.callback(
        Output('my-output-div', 'children'),
        Input('my-button', 'n_clicks'),
    )
    def my_button_click(n_clicks):
        if not n_clicks:
            return no_update
        return html.P(f'You clicked {n_clicks} times!')
```
Submodules
meerschaum.actions
Access functions for actions and subactions.
meerschaum.actions.actions
meerschaum.actions.get_action()
meerschaum.actions.get_completer()
meerschaum.actions.get_main_action_name()
meerschaum.actions.get_subactions()
meerschaum.config
Read and write the Meerschaum configuration registry.
get_config()
meerschaum.config.get_plugin_config()
meerschaum.config.write_config()
meerschaum.config.write_plugin_config()
meerschaum.connectors
Build connectors to interact with databases and fetch data.
get_connector()
make_connector()
meerschaum.connectors.is_connected()
meerschaum.connectors.poll.retry_connect()
Connector
meerschaum.connectors.SQLConnector
meerschaum.connectors.APIConnector
meerschaum.connectors.valkey.ValkeyConnector
meerschaum.jobs
Start background jobs.
Job
meerschaum.jobs.Executor
meerschaum.jobs.systemd.SystemdExecutor
meerschaum.jobs.get_jobs()
meerschaum.jobs.get_filtered_jobs()
meerschaum.jobs.get_running_jobs()
meerschaum.jobs.get_stopped_jobs()
meerschaum.jobs.get_paused_jobs()
meerschaum.jobs.get_restart_jobs()
meerschaum.jobs.make_executor()
meerschaum.jobs.check_restart_jobs()
meerschaum.jobs.start_check_jobs_thread()
meerschaum.jobs.stop_check_jobs_thread()
meerschaum.plugins
Access plugin modules and other API utilities.
Plugin
meerschaum.plugins.api_plugin()
meerschaum.plugins.dash_plugin()
meerschaum.plugins.import_plugins()
meerschaum.plugins.reload_plugins()
meerschaum.plugins.get_plugins()
meerschaum.plugins.get_data_plugins()
meerschaum.plugins.add_plugin_argument()
meerschaum.plugins.pre_sync_hook()
meerschaum.plugins.post_sync_hook()
meerschaum.utils
Utility functions are available in several submodules:
meerschaum.utils.daemon
Manage background jobs.
meerschaum.utils.daemon.daemon_entry()
meerschaum.utils.daemon.daemon_action()
meerschaum.utils.daemon.get_daemons()
meerschaum.utils.daemon.get_daemon_ids()
meerschaum.utils.daemon.get_running_daemons()
meerschaum.utils.daemon.get_paused_daemons()
meerschaum.utils.daemon.get_stopped_daemons()
meerschaum.utils.daemon.get_filtered_daemons()
meerschaum.utils.daemon.run_daemon()
meerschaum.utils.daemon.Daemon
meerschaum.utils.daemon.FileDescriptorInterceptor
meerschaum.utils.daemon.RotatingFile
meerschaum.utils.dataframe
Manipulate dataframes.
meerschaum.utils.dataframe.add_missing_cols_to_df()
meerschaum.utils.dataframe.df_is_chunk_generator()
meerschaum.utils.dataframe.enforce_dtypes()
meerschaum.utils.dataframe.filter_unseen_df()
meerschaum.utils.dataframe.get_datetime_bound_from_df()
meerschaum.utils.dataframe.get_first_valid_dask_partition()
meerschaum.utils.dataframe.get_json_cols()
meerschaum.utils.dataframe.get_numeric_cols()
meerschaum.utils.dataframe.get_unhashable_cols()
meerschaum.utils.dataframe.parse_df_datetimes()
meerschaum.utils.dataframe.query_df()
meerschaum.utils.dtypes
Work with data types.
meerschaum.utils.dtypes.are_dtypes_equal()
meerschaum.utils.dtypes.attempt_cast_to_numeric()
meerschaum.utils.dtypes.is_dtype_numeric()
meerschaum.utils.dtypes.none_if_null()
meerschaum.utils.dtypes.quantize_decimal()
meerschaum.utils.dtypes.to_pandas_dtype()
meerschaum.utils.dtypes.value_is_null()
meerschaum.utils.dtypes.sql.get_pd_type_from_db_type()
meerschaum.utils.dtypes.sql.get_db_type_from_pd_type()
meerschaum.utils.formatting
Format output text.
meerschaum.utils.formatting.colored()
meerschaum.utils.formatting.extract_stats_from_message()
meerschaum.utils.formatting.fill_ansi()
meerschaum.utils.formatting.get_console()
meerschaum.utils.formatting.highlight_pipes()
meerschaum.utils.formatting.make_header()
meerschaum.utils.formatting.pipe_repr()
pprint()
meerschaum.utils.formatting.pprint_pipes()
meerschaum.utils.formatting.print_options()
meerschaum.utils.formatting.print_pipes_results()
meerschaum.utils.formatting.print_tuple()
meerschaum.utils.formatting.translate_rich_to_termcolor()
meerschaum.utils.misc
Miscellaneous utility functions.
meerschaum.utils.misc.items_str()
meerschaum.utils.misc.round_time()
meerschaum.utils.misc.is_int()
meerschaum.utils.misc.interval_str()
meerschaum.utils.misc.filter_keywords()
meerschaum.utils.misc.generate_password()
meerschaum.utils.misc.string_to_dict()
meerschaum.utils.misc.iterate_chunks()
meerschaum.utils.misc.timed_input()
meerschaum.utils.misc.replace_pipes_in_dict()
meerschaum.utils.misc.is_valid_email()
meerschaum.utils.misc.string_width()
meerschaum.utils.misc.replace_password()
meerschaum.utils.misc.parse_config_substitution()
meerschaum.utils.misc.edit_file()
meerschaum.utils.misc.get_in_ex_params()
meerschaum.utils.misc.separate_negation_values()
meerschaum.utils.misc.flatten_list()
meerschaum.utils.misc.make_symlink()
meerschaum.utils.misc.is_symlink()
meerschaum.utils.misc.wget()
meerschaum.utils.misc.add_method_to_class()
meerschaum.utils.misc.is_pipe_registered()
meerschaum.utils.misc.get_cols_lines()
meerschaum.utils.misc.sorted_dict()
meerschaum.utils.misc.flatten_pipes_dict()
meerschaum.utils.misc.dict_from_od()
meerschaum.utils.misc.remove_ansi()
meerschaum.utils.misc.get_connector_labels()
meerschaum.utils.misc.json_serialize_datetime()
meerschaum.utils.misc.async_wrap()
meerschaum.utils.misc.is_docker_available()
meerschaum.utils.misc.is_android()
meerschaum.utils.misc.is_bcp_available()
meerschaum.utils.misc.truncate_string_sections()
meerschaum.utils.misc.safely_extract_tar()
meerschaum.utils.packages
Manage Python packages.
attempt_import()
meerschaum.utils.packages.get_module_path()
meerschaum.utils.packages.manually_import_module()
meerschaum.utils.packages.get_install_no_version()
meerschaum.utils.packages.determine_version()
meerschaum.utils.packages.need_update()
meerschaum.utils.packages.get_pip()
meerschaum.utils.packages.pip_install()
meerschaum.utils.packages.pip_uninstall()
meerschaum.utils.packages.completely_uninstall_package()
meerschaum.utils.packages.run_python_package()
meerschaum.utils.packages.lazy_import()
meerschaum.utils.packages.pandas_name()
meerschaum.utils.packages.import_pandas()
meerschaum.utils.packages.import_rich()
meerschaum.utils.packages.import_dcc()
meerschaum.utils.packages.import_html()
meerschaum.utils.packages.get_modules_from_package()
meerschaum.utils.packages.import_children()
meerschaum.utils.packages.reload_package()
meerschaum.utils.packages.reload_meerschaum()
meerschaum.utils.packages.is_installed()
meerschaum.utils.packages.venv_contains_package()
meerschaum.utils.packages.package_venv()
meerschaum.utils.packages.ensure_readline()
meerschaum.utils.packages.get_prerelease_dependencies()
meerschaum.utils.sql
Build SQL queries.
meerschaum.utils.sql.build_where()
meerschaum.utils.sql.clean()
meerschaum.utils.sql.dateadd_str()
meerschaum.utils.sql.test_connection()
meerschaum.utils.sql.get_distinct_col_count()
meerschaum.utils.sql.sql_item_name()
meerschaum.utils.sql.pg_capital()
meerschaum.utils.sql.oracle_capital()
meerschaum.utils.sql.truncate_item_name()
meerschaum.utils.sql.table_exists()
meerschaum.utils.sql.get_table_cols_types()
meerschaum.utils.sql.get_update_queries()
meerschaum.utils.sql.get_null_replacement()
meerschaum.utils.sql.get_db_version()
meerschaum.utils.sql.get_rename_table_queries()
meerschaum.utils.sql.get_create_table_query()
meerschaum.utils.sql.format_cte_subquery()
meerschaum.utils.sql.session_execute()
meerschaum.utils.venv
Manage virtual environments.
Venv
meerschaum.utils.venv.activate_venv()
meerschaum.utils.venv.deactivate_venv()
meerschaum.utils.venv.get_module_venv()
meerschaum.utils.venv.get_venvs()
meerschaum.utils.venv.init_venv()
meerschaum.utils.venv.inside_venv()
meerschaum.utils.venv.is_venv_active()
meerschaum.utils.venv.venv_exec()
meerschaum.utils.venv.venv_executable()
meerschaum.utils.venv.venv_exists()
meerschaum.utils.venv.venv_target_path()
meerschaum.utils.venv.verify_venv()
meerschaum.utils.warnings
Print warnings, errors, info, and debug messages.
```python
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Copyright 2023 Bennett Meares

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import atexit
from meerschaum.utils.typing import SuccessTuple
from meerschaum.utils.packages import attempt_import
from meerschaum.core.Pipe import Pipe
from meerschaum.plugins import Plugin
from meerschaum.utils.venv import Venv
from meerschaum.jobs import Job, make_executor
from meerschaum.connectors import get_connector, Connector, make_connector
from meerschaum.utils import get_pipes
from meerschaum.utils.formatting import pprint
from meerschaum._internal.docs import index as __doc__
from meerschaum.config import __version__, get_config
from meerschaum._internal.entry import entry
from meerschaum.__main__ import _close_pools

atexit.register(_close_pools)

__pdoc__ = {'gui': False, 'api': False, 'core': False, '_internal': False}
__all__ = (
    "get_pipes",
    "get_connector",
    "get_config",
    "Pipe",
    "Plugin",
    "Venv",
    "Job",
    "pprint",
    "attempt_import",
    "actions",
    "config",
    "connectors",
    "jobs",
    "plugins",
    "utils",
    "SuccessTuple",
    "Connector",
    "make_connector",
    "entry",
)
```
````python
def get_pipes(
    connector_keys: Union[str, List[str], None] = None,
    metric_keys: Union[str, List[str], None] = None,
    location_keys: Union[str, List[str], None] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    mrsm_instance: Union[str, InstanceConnector, None] = None,
    instance: Union[str, InstanceConnector, None] = None,
    as_list: bool = False,
    method: str = 'registered',
    debug: bool = False,
    **kw: Any
) -> Union[PipesDict, List[mrsm.Pipe]]:
    """
    Return a dictionary or list of `meerschaum.Pipe` objects.

    Parameters
    ----------
    connector_keys: Union[str, List[str], None], default None
        String or list of connector keys.
        If omitted or is `'*'`, fetch all possible keys.
        If a string begins with `'_'`, select keys that do NOT match the string.

    metric_keys: Union[str, List[str], None], default None
        String or list of metric keys. See `connector_keys` for formatting.

    location_keys: Union[str, List[str], None], default None
        String or list of location keys. See `connector_keys` for formatting.

    tags: Optional[List[str]], default None
        If provided, only include pipes with these tags.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        Params are parsed into a SQL WHERE clause.
        E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`

    mrsm_instance: Union[str, InstanceConnector, None], default None
        Connector keys for the Meerschaum instance of the pipes.
        Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or
        `meerschaum.connectors.api.APIConnector.APIConnector`.

    as_list: bool, default False
        If `True`, return pipes in a list instead of a hierarchical dictionary.
        `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}`
        `True`  : `[Pipe]`

    method: str, default 'registered'
        Available options: `['registered', 'explicit', 'all']`
        If `'registered'` (default), create pipes based on registered keys in the connector's pipes table
        (API or SQL connector, depends on mrsm_instance).
        If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys
        instead of consulting the pipes table. Useful for creating non-existent pipes.
        If `'all'`, create pipes from predefined metrics and locations. Requires `connector_keys`.
        **NOTE:** Method `'all'` is not implemented!

    **kw: Any
        Keyword arguments to pass to the `meerschaum.Pipe` constructor.

    Returns
    -------
    A dictionary of dictionaries and `meerschaum.Pipe` objects
    in the connector, metric, location hierarchy.
    If `as_list` is `True`, return a list of `meerschaum.Pipe` objects.

    Examples
    --------
    ```
    >>> ### Manual definition:
    >>> pipes = {
    ...     <connector_keys>: {
    ...         <metric_key>: {
    ...             <location_key>: Pipe(
    ...                 <connector_keys>,
    ...                 <metric_key>,
    ...                 <location_key>,
    ...             ),
    ...         },
    ...     },
    ... }
    >>> ### Accessing a single pipe:
    >>> pipes['sql:main']['weather'][None]
    >>> ### Return a list instead:
    >>> get_pipes(as_list=True)
    [sql_main_weather]
    >>>
    ```
    """
    from meerschaum.config import get_config
    from meerschaum.utils.warnings import error
    from meerschaum.utils.misc import filter_keywords

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    if params is None:
        params = {}
    if tags is None:
        tags = []

    if isinstance(connector_keys, str):
        connector_keys = [connector_keys]
    if isinstance(metric_keys, str):
        metric_keys = [metric_keys]
    if isinstance(location_keys, str):
        location_keys = [location_keys]

    ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`).
    if mrsm_instance is None:
        mrsm_instance = instance
    if mrsm_instance is None:
        mrsm_instance = get_config('meerschaum', 'instance', patch=True)
    if isinstance(mrsm_instance, str):
        from meerschaum.connectors.parse import parse_instance_keys
        connector = parse_instance_keys(keys=mrsm_instance, debug=debug)
    else:
        from meerschaum.connectors import instance_types
        valid_connector = False
        if hasattr(mrsm_instance, 'type'):
            if mrsm_instance.type in instance_types:
                valid_connector = True
        if not valid_connector:
            error(f"Invalid instance connector: {mrsm_instance}")
        connector = mrsm_instance
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Using instance connector: {connector}")
    if not connector:
        error(f"Could not create connector from keys: '{mrsm_instance}'")

    ### Get a list of tuples for the keys needed to build pipes.
    result = fetch_pipes_keys(
        method,
        connector,
        connector_keys=connector_keys,
        metric_keys=metric_keys,
        location_keys=location_keys,
        tags=tags,
        params=params,
        debug=debug,
    )
    if result is None:
        error("Unable to build pipes!")

    ### Populate the `pipes` dictionary with Pipes based on the keys
    ### obtained from the chosen `method`.
    from meerschaum import Pipe
    pipes = {}
    for ck, mk, lk in result:
        if ck not in pipes:
            pipes[ck] = {}

        if mk not in pipes[ck]:
            pipes[ck][mk] = {}

        pipes[ck][mk][lk] = Pipe(
            ck, mk, lk,
            mrsm_instance=connector,
            debug=debug,
            **filter_keywords(Pipe, **kw)
        )

    if not as_list:
        return pipes
    from meerschaum.utils.misc import flatten_pipes_dict
    return flatten_pipes_dict(pipes)
````
Return a dictionary or list of `Pipe` objects.

Parameters
- connector_keys (Union[str, List[str], None], default None): String or list of connector keys. If omitted or `'*'`, fetch all possible keys. If a string begins with `'_'`, select keys that do NOT match the string.
- metric_keys (Union[str, List[str], None], default None): String or list of metric keys. See `connector_keys` for formatting.
- location_keys (Union[str, List[str], None], default None): String or list of location keys. See `connector_keys` for formatting.
- tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
- params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. Params are parsed into a SQL WHERE clause. E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'`.
- mrsm_instance (Union[str, InstanceConnector, None], default None): Connector keys for the Meerschaum instance of the pipes. Must be a `meerschaum.connectors.SQLConnector` or `meerschaum.connectors.APIConnector`.
- as_list (bool, default False): If `True`, return pipes in a list instead of a hierarchical dictionary. `False`: `{connector_keys: {metric_key: {location_key: Pipe}}}`; `True`: `[Pipe]`.
- method (str, default 'registered'): Available options: `['registered', 'explicit', 'all']`. If `'registered'` (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depending on `mrsm_instance`). If `'explicit'`, create pipes from the provided `connector_keys`, `metric_keys`, and `location_keys` instead of consulting the pipes table (useful for creating non-existent pipes). If `'all'`, create pipes from predefined metrics and locations; requires `connector_keys`. NOTE: method `'all'` is not implemented!
- **kw (Any): Keyword arguments to pass to the `Pipe` constructor.

Returns
A dictionary of dictionaries and `Pipe` objects in the connector, metric, location hierarchy. If `as_list` is `True`, return a list of `Pipe` objects.

Examples
```
>>> ### Manual definition:
>>> pipes = {
...     <connector_keys>: {
...         <metric_key>: {
...             <location_key>: Pipe(
...                 <connector_keys>,
...                 <metric_key>,
...                 <location_key>,
...             ),
...         },
...     },
... }
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[sql_main_weather]
>>>
```
````python
def get_connector(
    type: str = None,
    label: str = None,
    refresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Connector:
    """
    Return existing connector or create new connection and store for reuse.

    You can create new connectors if enough parameters are provided for the given type and flavor.

    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct new object. Defaults to `False`.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`).

    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```
    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)

    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _load_builtin_custom_connectors()
            _loaded_plugin_connectors = True

    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"

        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        from meerschaum.connectors.valkey import ValkeyConnector
        with _locks['types']:
            types.update({
                'api': APIConnector,
                'sql': SQLConnector,
                'plugin': PluginConnector,
                'valkey': ValkeyConnector,
            })

    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    import inspect
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector "
                            + f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n"
                        + f"  - Keyword value: '{value}'\n"
                        + f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else:  ### connector doesn't yet exist
            refresh = True

    ### only create an object if refresh is True
    ### (can be manually specified, otherwise determined above)
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack=False,
                )
                conn = None
            except Exception as e:
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack=False,
                )
                conn = None
        if conn is None:
            return None

    return connectors[type][label]
````
Return existing connector or create new connection and store for reuse.

You can create new connectors if enough parameters are provided for the given type and flavor.

Parameters
- type (Optional[str], default None): Connector type (sql, api, etc.). Defaults to the type of the configured `instance_connector`.
- label (Optional[str], default None): Connector label (e.g. main). Defaults to `'main'`.
- refresh (bool, default False): Refresh the Connector instance / construct a new object. Defaults to `False`.
- kw (Any): Other arguments to pass to the Connector constructor. If the Connector has already been constructed and new arguments are provided, `refresh` is set to `True` and the old Connector is replaced.

Returns
A new Meerschaum connector (e.g. `meerschaum.connectors.APIConnector`, `meerschaum.connectors.SQLConnector`).

Examples
The following parameters would create a new `meerschaum.connectors.SQLConnector` that isn't in the configuration file.

```
>>> conn = get_connector(
...     type = 'sql',
...     label = 'newlabel',
...     flavor = 'sqlite',
...     database = '/file/path/to/database.db'
... )
>>>
```
```python
def get_config(
    *keys: str,
    patch: bool = True,
    substitute: bool = True,
    sync_files: bool = True,
    write_missing: bool = True,
    as_tuple: bool = False,
    warn: bool = True,
    debug: bool = False
) -> Any:
    """
    Return the Meerschaum configuration dictionary.
    If positional arguments are provided, index by the keys.
    Raises a warning if invalid keys are provided.

    Parameters
    ----------
    keys: str
        List of strings to index.

    patch: bool, default True
        If `True`, patch missing default keys into the config directory.
        Defaults to `True`.

    sync_files: bool, default True
        If `True`, sync files if needed.
        Defaults to `True`.

    write_missing: bool, default True
        If `True`, write default values when the main config files are missing.
        Defaults to `True`.

    substitute: bool, default True
        If `True`, substitute 'MRSM{}' values.
        Defaults to `True`.

    as_tuple: bool, default False
        If `True`, return a tuple of type (success, value).
        Defaults to `False`.

    Returns
    -------
    The value in the configuration directory, indexed by the provided keys.

    Examples
    --------
    >>> get_config('meerschaum', 'instance')
    'sql:main'
    >>> get_config('does', 'not', 'exist')
    UserWarning: Invalid keys in config: ('does', 'not', 'exist')
    """
    import json

    symlinks_key = STATIC_CONFIG['config']['symlinks_key']
    if debug:
        from meerschaum.utils.debug import dprint
        dprint(f"Indexing keys: {keys}", color=False)

    if len(keys) == 0:
        _rc = _config(substitute=substitute, sync_files=sync_files, write_missing=write_missing)
        if as_tuple:
            return True, _rc
        return _rc

    ### Weird threading issues, only import if substitute is True.
    if substitute:
        from meerschaum.config._read_config import search_and_substitute_config
    ### Invalidate the cache if it was read before with substitute=False
    ### but there still exist substitutions.
    if (
        config is not None and substitute and keys[0] != symlinks_key
        and 'MRSM{' in json.dumps(config.get(keys[0]))
    ):
        try:
            _subbed = search_and_substitute_config({keys[0]: config[keys[0]]})
        except Exception as e:
            import traceback
            traceback.print_exc()
        config[keys[0]] = _subbed[keys[0]]
        if symlinks_key in _subbed:
            if symlinks_key not in config:
                config[symlinks_key] = {}
            if keys[0] not in config[symlinks_key]:
                config[symlinks_key][keys[0]] = {}
            config[symlinks_key][keys[0]] = apply_patch_to_config(
                _subbed,
                config[symlinks_key][keys[0]]
            )

    from meerschaum.config._sync import sync_files as _sync_files
    if config is None:
        _config(*keys, sync_files=sync_files)

    invalid_keys = False
    if keys[0] not in config and keys[0] != symlinks_key:
        single_key_config = read_config(
            keys=[keys[0]], substitute=substitute, write_missing=write_missing
        )
        if keys[0] not in single_key_config:
            invalid_keys = True
        else:
            config[keys[0]] = single_key_config.get(keys[0], None)
            if symlinks_key in single_key_config and keys[0] in single_key_config[symlinks_key]:
                if symlinks_key not in config:
                    config[symlinks_key] = {}
                config[symlinks_key][keys[0]] = single_key_config[symlinks_key][keys[0]]

        if sync_files:
            _sync_files(keys=[keys[0]])

    c = config
    if len(keys) > 0:
        for k in keys:
            try:
                c = c[k]
            except Exception as e:
                invalid_keys = True
                break
        if invalid_keys:
            ### Check if the keys are in the default configuration.
            from meerschaum.config._default import default_config
            in_default = True
            patched_default_config = (
                search_and_substitute_config(default_config)
                if substitute else copy.deepcopy(default_config)
            )
            _c = patched_default_config
            for k in keys:
                try:
                    _c = _c[k]
                except Exception as e:
                    in_default = False
            if in_default:
                c = _c
                invalid_keys = False
            warning_msg = f"Invalid keys in config: {keys}"
            if not in_default:
                try:
                    if warn:
                        from meerschaum.utils.warnings import warn as _warn
                        _warn(warning_msg, stacklevel=3, color=False)
                except Exception as e:
                    if warn:
                        print(warning_msg)
                if as_tuple:
                    return False, None
                return None

            ### Don't write keys that we haven't yet loaded into memory.
            not_loaded_keys = [k for k in patched_default_config if k not in config]
            for k in not_loaded_keys:
                patched_default_config.pop(k, None)

            set_config(
                apply_patch_to_config(
                    patched_default_config,
                    config,
                )
            )
            if patch and keys[0] != symlinks_key:
                if write_missing:
                    write_config(config, debug=debug)

    if as_tuple:
        return (not invalid_keys), c
    return c
```
Return the Meerschaum configuration dictionary. If positional arguments are provided, index by the keys. Raises a warning if invalid keys are provided.

Parameters
- keys (str): List of strings to index.
- patch (bool, default True): If `True`, patch missing default keys into the config directory.
- sync_files (bool, default True): If `True`, sync files if needed.
- write_missing (bool, default True): If `True`, write default values when the main config files are missing.
- substitute (bool, default True): If `True`, substitute 'MRSM{}' values.
- as_tuple (bool, default False): If `True`, return a tuple of type (success, value).

Returns
- The value in the configuration directory, indexed by the provided keys.

Examples
>>> get_config('meerschaum', 'instance')
'sql:main'
>>> get_config('does', 'not', 'exist')
UserWarning: Invalid keys in config: ('does', 'not', 'exist')
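The key-indexing behavior above can be sketched with plain dictionaries. This is a minimal illustration (not the library's implementation, and without the patching or file-sync machinery) of how positional keys walk the config and how `as_tuple` reports success:

```python
from typing import Any

def get_nested(config: dict, *keys: str, as_tuple: bool = False) -> Any:
    """Walk `config` by successive keys, mimicking get_config()'s indexing."""
    c: Any = config
    for k in keys:
        try:
            c = c[k]
        except (KeyError, TypeError):
            # get_config() would warn: Invalid keys in config: (...)
            return (False, None) if as_tuple else None
    return (True, c) if as_tuple else c

config = {'meerschaum': {'instance': 'sql:main'}}
print(get_nested(config, 'meerschaum', 'instance'))               # sql:main
print(get_nested(config, 'does', 'not', 'exist', as_tuple=True))  # (False, None)
```

The real `get_config()` additionally falls back to the default configuration before warning, which is why a fresh installation can resolve keys that were never written to disk.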
````python
class Pipe:
    """
    Access Meerschaum pipes via Pipe objects.

    Pipes are identified by the following:

    1. Connector keys (e.g. `'sql:main'`)
    2. Metric key (e.g. `'weather'`)
    3. Location (optional; e.g. `None`)

    A pipe's connector keys correspond to a data source, and when the pipe is synced,
    its `fetch` definition is evaluated and executed to produce new data.

    Alternatively, new data may be directly synced via `pipe.sync()`:

    ```
    >>> from meerschaum import Pipe
    >>> pipe = Pipe('csv', 'weather')
    >>>
    >>> import pandas as pd
    >>> df = pd.read_csv('weather.csv')
    >>> pipe.sync(df)
    ```
    """

    from ._fetch import (
        fetch,
        get_backtrack_interval,
    )
    from ._data import (
        get_data,
        get_backtrack_data,
        get_rowcount,
        _get_data_as_iterator,
        get_chunk_interval,
        get_chunk_bounds,
    )
    from ._register import register
    from ._attributes import (
        attributes,
        parameters,
        columns,
        dtypes,
        get_columns,
        get_columns_types,
        get_indices,
        tags,
        get_id,
        id,
        get_val_column,
        parents,
        children,
        target,
        _target_legacy,
        guess_datetime,
    )
    from ._show import show
    from ._edit import edit, edit_definition, update
    from ._sync import (
        sync,
        get_sync_time,
        exists,
        filter_existing,
        _get_chunk_label,
        get_num_workers,
        _persist_new_json_columns,
        _persist_new_numeric_columns,
    )
    from ._verify import (
        verify,
        get_bound_interval,
        get_bound_time,
    )
    from ._delete import delete
    from ._drop import drop
    from ._clear import clear
    from ._deduplicate import deduplicate
    from ._bootstrap import bootstrap
    from ._dtypes import enforce_dtypes, infer_dtypes
    from ._copy import copy_to

    def __init__(
        self,
        connector: str = '',
        metric: str = '',
        location: Optional[str] = None,
        parameters: Optional[Dict[str, Any]] = None,
        columns: Union[Dict[str, str], List[str], None] = None,
        tags: Optional[List[str]] = None,
        target: Optional[str] = None,
        dtypes: Optional[Dict[str, str]] = None,
        instance: Optional[Union[str, InstanceConnector]] = None,
        temporary: bool = False,
        mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
        cache: bool = False,
        debug: bool = False,
        connector_keys: Optional[str] = None,
        metric_key: Optional[str] = None,
        location_key: Optional[str] = None,
    ):
        """
        Parameters
        ----------
        connector: str
            Keys for the pipe's source connector, e.g. `'sql:main'`.

        metric: str
            Label for the pipe's contents, e.g. `'weather'`.

        location: str, default None
            Label for the pipe's location. Defaults to `None`.

        parameters: Optional[Dict[str, Any]], default None
            Optionally set a pipe's parameters from the constructor,
            e.g. columns and other attributes.
            You can edit these parameters with `edit pipes`.

        columns: Optional[Dict[str, str]], default None
            Set the `columns` dictionary of `parameters`.
            If `parameters` is also provided, this dictionary is added under the `'columns'` key.

        tags: Optional[List[str]], default None
            A list of strings to be added under the `'tags'` key of `parameters`.
            You can select pipes with certain tags using `--tags`.

        dtypes: Optional[Dict[str, str]], default None
            Set the `dtypes` dictionary of `parameters`.
            If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.

        mrsm_instance: Optional[Union[str, InstanceConnector]], default None
            Connector for the Meerschaum instance where the pipe resides.
            Defaults to the preconfigured default instance (`'sql:main'`).

        instance: Optional[Union[str, InstanceConnector]], default None
            Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.

        temporary: bool, default False
            If `True`, prevent instance tables (pipes, users, plugins) from being created.

        cache: bool, default False
            If `True`, cache fetched data into a local database file.
            Defaults to `False`.
        """
        from meerschaum.utils.warnings import error, warn
        if (not connector and not connector_keys) or (not metric and not metric_key):
            error(
                "Please provide strings for the connector and metric\n "
                + "(first two positional arguments)."
            )

        ### Fall back to legacy `location_key` just in case.
        if not location:
            location = location_key

        if not connector:
            connector = connector_keys

        if not metric:
            metric = metric_key

        if location in ('[None]', 'None'):
            location = None

        from meerschaum.config.static import STATIC_CONFIG
        negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
        for k in (connector, metric, location, *(tags or [])):
            if str(k).startswith(negation_prefix):
                error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.")

        self.connector_keys = str(connector)
        self.connector_key = self.connector_keys  ### Alias
        self.metric_key = metric
        self.location_key = location
        self.temporary = temporary

        self._attributes = {
            'connector_keys': self.connector_keys,
            'metric_key': self.metric_key,
            'location_key': self.location_key,
            'parameters': {},
        }

        ### only set parameters if values are provided
        if isinstance(parameters, dict):
            self._attributes['parameters'] = parameters
        else:
            if parameters is not None:
                warn(f"The provided parameters are of invalid type '{type(parameters)}'.")
            self._attributes['parameters'] = {}

        columns = columns or self._attributes.get('parameters', {}).get('columns', {})
        if isinstance(columns, list):
            columns = {str(col): str(col) for col in columns}
        if isinstance(columns, dict):
            self._attributes['parameters']['columns'] = columns
        elif columns is not None:
            warn(f"The provided columns are of invalid type '{type(columns)}'.")

        if isinstance(tags, (list, tuple)):
            self._attributes['parameters']['tags'] = tags
        elif tags is not None:
            warn(f"The provided tags are of invalid type '{type(tags)}'.")

        if isinstance(target, str):
            self._attributes['parameters']['target'] = target
        elif target is not None:
            warn(f"The provided target is of invalid type '{type(target)}'.")

        if isinstance(dtypes, dict):
            self._attributes['parameters']['dtypes'] = dtypes
        elif dtypes is not None:
            warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.")

        ### NOTE: The parameters dictionary is {} by default.
        ### A Pipe may be registered without parameters, then edited,
        ### or a Pipe may be registered with parameters set in-memory first.
        _mrsm_instance = mrsm_instance if mrsm_instance is not None else instance
        if _mrsm_instance is None:
            _mrsm_instance = get_config('meerschaum', 'instance', patch=True)

        if not isinstance(_mrsm_instance, str):
            self._instance_connector = _mrsm_instance
            self.instance_keys = str(_mrsm_instance)
        else:  ### NOTE: must be a SQL or API connector for this to work
            self.instance_keys = _mrsm_instance

        self._cache = cache and get_config('system', 'experimental', 'cache')

    @property
    def meta(self):
        """
        Return the four keys needed to reconstruct this pipe.
        """
        return {
            'connector': self.connector_keys,
            'metric': self.metric_key,
            'location': self.location_key,
            'instance': self.instance_keys,
        }

    def keys(self) -> List[str]:
        """
        Return the ordered keys for this pipe.
        """
        return {
            key: val
            for key, val in self.meta.items()
            if key != 'instance'
        }

    @property
    def instance_connector(self) -> Union[InstanceConnector, None]:
        """
        The connector to where this pipe resides.
        May either be of type `meerschaum.connectors.sql.SQLConnector` or
        `meerschaum.connectors.api.APIConnector`.
        """
        if '_instance_connector' not in self.__dict__:
            from meerschaum.connectors.parse import parse_instance_keys
            conn = parse_instance_keys(self.instance_keys)
            if conn:
                self._instance_connector = conn
            else:
                return None
        return self._instance_connector

    @property
    def connector(self) -> Union[meerschaum.connectors.Connector, None]:
        """
        The connector to the data source.
        """
        if '_connector' not in self.__dict__:
            from meerschaum.connectors.parse import parse_instance_keys
            import warnings
            with warnings.catch_warnings():
                warnings.simplefilter('ignore')
                try:
                    conn = parse_instance_keys(self.connector_keys)
                except Exception as e:
                    conn = None
            if conn:
                self._connector = conn
            else:
                return None
        return self._connector

    @property
    def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
        """
        If the pipe was created with `cache=True`, return the connector to the pipe's
        SQLite database for caching.
        """
        if not self._cache:
            return None

        if '_cache_connector' not in self.__dict__:
            from meerschaum.connectors import get_connector
            from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
            _resources_path = SQLITE_RESOURCES_PATH
            self._cache_connector = get_connector(
                'sql', '_cache_' + str(self),
                flavor='sqlite',
                database=str(_resources_path / ('_cache_' + str(self) + '.db')),
            )

        return self._cache_connector

    @property
    def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
        """
        If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
        manage the local data.
        """
        if self.cache_connector is None:
            return None
        if '_cache_pipe' not in self.__dict__:
            from meerschaum.config._patch import apply_patch_to_config
            from meerschaum.utils.sql import sql_item_name
            _parameters = copy.deepcopy(self.parameters)
            _fetch_patch = {
                'fetch': ({
                    'definition': (
                        f"SELECT * FROM "
                        + sql_item_name(
                            str(self.target),
                            self.instance_connector.flavor,
                            self.instance_connector.get_pipe_schema(self),
                        )
                    ),
                }) if self.instance_connector.type == 'sql' else ({
                    'connector_keys': self.connector_keys,
                    'metric_key': self.metric_key,
                    'location_key': self.location_key,
                })
            }
            _parameters = apply_patch_to_config(_parameters, _fetch_patch)
            self._cache_pipe = Pipe(
                self.instance_keys,
                (self.connector_keys + '_' + self.metric_key + '_cache'),
                self.location_key,
                mrsm_instance=self.cache_connector,
                parameters=_parameters,
                cache=False,
                temporary=True,
            )

        return self._cache_pipe

    def __str__(self, ansi: bool = False):
        return pipe_repr(self, ansi=ansi)

    def __eq__(self, other):
        try:
            return (
                isinstance(self, type(other))
                and self.connector_keys == other.connector_keys
                and self.metric_key == other.metric_key
                and self.location_key == other.location_key
                and self.instance_keys == other.instance_keys
            )
        except Exception as e:
            return False

    def __hash__(self):
        ### Using an esoteric separator to avoid collisions.
        sep = "[\"']"
        return hash(
            str(self.connector_keys) + sep
            + str(self.metric_key) + sep
            + str(self.location_key) + sep
            + str(self.instance_keys) + sep
        )

    def __repr__(self, ansi: bool = True, **kw) -> str:
        if not hasattr(sys, 'ps1'):
            ansi = False

        return pipe_repr(self, ansi=ansi, **kw)

    def __pt_repr__(self):
        from meerschaum.utils.packages import attempt_import
        prompt_toolkit_formatted_text = attempt_import('prompt_toolkit.formatted_text', lazy=False)
        return prompt_toolkit_formatted_text.ANSI(pipe_repr(self, ansi=True))

    def __getstate__(self) -> Dict[str, Any]:
        """
        Define the state dictionary (pickling).
        """
        return {
            'connector': self.connector_keys,
            'metric': self.metric_key,
            'location': self.location_key,
            'parameters': self.parameters,
            'instance': self.instance_keys,
        }

    def __setstate__(self, _state: Dict[str, Any]):
        """
        Read the state (unpickling).
        """
        self.__init__(**_state)

    def __getitem__(self, key: str) -> Any:
        """
        Index the pipe's attributes.
        If the `key` cannot be found, return `None`.
        """
        if key in self.attributes:
            return self.attributes.get(key, None)

        aliases = {
            'connector': 'connector_keys',
            'connector_key': 'connector_keys',
            'metric': 'metric_key',
            'location': 'location_key',
        }
        aliased_key = aliases.get(key, None)
        if aliased_key is not None:
            return self.attributes.get(aliased_key, None)

        property_aliases = {
            'instance': 'instance_keys',
            'instance_key': 'instance_keys',
        }
        aliased_key = property_aliases.get(key, None)
        if aliased_key is not None:
            key = aliased_key
        return getattr(self, key, None)
````
Access Meerschaum pipes via Pipe objects.

Pipes are identified by the following:

1. Connector keys (e.g. `'sql:main'`)
2. Metric key (e.g. `'weather'`)
3. Location (optional; e.g. `None`)

A pipe's connector keys correspond to a data source, and when the pipe is synced,
its `fetch` definition is evaluated and executed to produce new data.

Alternatively, new data may be directly synced via `pipe.sync()`:

>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)
```python
def __init__(
    self,
    connector: str = '',
    metric: str = '',
    location: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
    columns: Union[Dict[str, str], List[str], None] = None,
    tags: Optional[List[str]] = None,
    target: Optional[str] = None,
    dtypes: Optional[Dict[str, str]] = None,
    instance: Optional[Union[str, InstanceConnector]] = None,
    temporary: bool = False,
    mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
    cache: bool = False,
    debug: bool = False,
    connector_keys: Optional[str] = None,
    metric_key: Optional[str] = None,
    location_key: Optional[str] = None,
):
    ### Full implementation shown in the class source above.
    ...
```
Parameters
- connector (str): Keys for the pipe's source connector, e.g. `'sql:main'`.
- metric (str): Label for the pipe's contents, e.g. `'weather'`.
- location (str, default None): Label for the pipe's location.
- parameters (Optional[Dict[str, Any]], default None): Optionally set a pipe's parameters from the constructor, e.g. columns and other attributes. You can edit these parameters with `edit pipes`.
- columns (Optional[Dict[str, str]], default None): Set the `columns` dictionary of `parameters`. If `parameters` is also provided, this dictionary is added under the `'columns'` key.
- tags (Optional[List[str]], default None): A list of strings to be added under the `'tags'` key of `parameters`. You can select pipes with certain tags using `--tags`.
- dtypes (Optional[Dict[str, str]], default None): Set the `dtypes` dictionary of `parameters`. If `parameters` is also provided, this dictionary is added under the `'dtypes'` key.
- mrsm_instance (Optional[Union[str, InstanceConnector]], default None): Connector for the Meerschaum instance where the pipe resides. Defaults to the preconfigured default instance (`'sql:main'`).
- instance (Optional[Union[str, InstanceConnector]], default None): Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored.
- temporary (bool, default False): If `True`, prevent instance tables (pipes, users, plugins) from being created.
- cache (bool, default False): If `True`, cache fetched data into a local database file.
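One constructor detail worth calling out: a `columns` list is normalized into a dictionary mapping each column name to itself. A standalone sketch of that normalization (this helper is illustrative only, and it raises instead of warning as the real constructor does):

```python
def normalize_columns(columns):
    """Mirror Pipe.__init__'s handling: lists become self-referential dicts."""
    if isinstance(columns, list):
        return {str(col): str(col) for col in columns}
    if isinstance(columns, dict) or columns is None:
        return columns or {}
    raise TypeError(f"The provided columns are of invalid type '{type(columns)}'.")

print(normalize_columns(['ts', 'id']))  # {'ts': 'ts', 'id': 'id'}
```

This is why `columns=['ts', 'id']` and `columns={'ts': 'ts', 'id': 'id'}` are interchangeable when constructing a pipe, though only the dictionary form lets you map roles like `'datetime'` to differently named columns.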
```python
@property
def meta(self):
    """
    Return the four keys needed to reconstruct this pipe.
    """
    return {
        'connector': self.connector_keys,
        'metric': self.metric_key,
        'location': self.location_key,
        'instance': self.instance_keys,
    }
```
Return the four keys needed to reconstruct this pipe.
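Because `meta` carries the four identifying keys as constructor-compatible keywords, a pipe can be rebuilt from it (e.g. `mrsm.Pipe(**pipe.meta)`). A minimal round-trip sketch using a stand-in class rather than meerschaum's `Pipe`:

```python
class MiniPipe:
    """Stand-in illustrating the meta round-trip; not meerschaum's Pipe."""
    def __init__(self, connector, metric, location=None, instance='sql:main'):
        self.connector_keys, self.metric_key = connector, metric
        self.location_key, self.instance_keys = location, instance

    @property
    def meta(self):
        return {
            'connector': self.connector_keys,
            'metric': self.metric_key,
            'location': self.location_key,
            'instance': self.instance_keys,
        }

pipe = MiniPipe('plugin:noaa', 'weather')
clone = MiniPipe(**pipe.meta)
print(clone.meta == pipe.meta)  # True
```

This round-trip is also what `__getstate__` / `__setstate__` rely on for pickling, with `parameters` added to the state dictionary.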
```python
def keys(self) -> List[str]:
    """
    Return the ordered keys for this pipe.
    """
    return {
        key: val
        for key, val in self.meta.items()
        if key != 'instance'
    }
```
Return the ordered keys for this pipe.
```python
@property
def instance_connector(self) -> Union[InstanceConnector, None]:
    """
    The connector to where this pipe resides.
    May either be of type `meerschaum.connectors.sql.SQLConnector` or
    `meerschaum.connectors.api.APIConnector`.
    """
    if '_instance_connector' not in self.__dict__:
        from meerschaum.connectors.parse import parse_instance_keys
        conn = parse_instance_keys(self.instance_keys)
        if conn:
            self._instance_connector = conn
        else:
            return None
    return self._instance_connector
```
The connector to where this pipe resides. May either be of type `meerschaum.connectors.SQLConnector` or `meerschaum.connectors.APIConnector`.
```python
@property
def connector(self) -> Union[meerschaum.connectors.Connector, None]:
    """
    The connector to the data source.
    """
    if '_connector' not in self.__dict__:
        from meerschaum.connectors.parse import parse_instance_keys
        import warnings
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            try:
                conn = parse_instance_keys(self.connector_keys)
            except Exception as e:
                conn = None
        if conn:
            self._connector = conn
        else:
            return None
    return self._connector
```
The connector to the data source.
```python
@property
def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]:
    """
    If the pipe was created with `cache=True`, return the connector to the pipe's
    SQLite database for caching.
    """
    if not self._cache:
        return None

    if '_cache_connector' not in self.__dict__:
        from meerschaum.connectors import get_connector
        from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH
        _resources_path = SQLITE_RESOURCES_PATH
        self._cache_connector = get_connector(
            'sql', '_cache_' + str(self),
            flavor='sqlite',
            database=str(_resources_path / ('_cache_' + str(self) + '.db')),
        )

    return self._cache_connector
```
If the pipe was created with `cache=True`, return the connector to the pipe's SQLite database for caching.
```python
@property
def cache_pipe(self) -> Union['meerschaum.Pipe', None]:
    """
    If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to
    manage the local data.
    """
    if self.cache_connector is None:
        return None
    if '_cache_pipe' not in self.__dict__:
        from meerschaum.config._patch import apply_patch_to_config
        from meerschaum.utils.sql import sql_item_name
        _parameters = copy.deepcopy(self.parameters)
        _fetch_patch = {
            'fetch': ({
                'definition': (
                    f"SELECT * FROM "
                    + sql_item_name(
                        str(self.target),
                        self.instance_connector.flavor,
                        self.instance_connector.get_pipe_schema(self),
                    )
                ),
            }) if self.instance_connector.type == 'sql' else ({
                'connector_keys': self.connector_keys,
                'metric_key': self.metric_key,
                'location_key': self.location_key,
            })
        }
        _parameters = apply_patch_to_config(_parameters, _fetch_patch)
        self._cache_pipe = Pipe(
            self.instance_keys,
            (self.connector_keys + '_' + self.metric_key + '_cache'),
            self.location_key,
            mrsm_instance=self.cache_connector,
            parameters=_parameters,
            cache=False,
            temporary=True,
        )

    return self._cache_pipe
```
If the pipe was created with `cache=True`, return another `Pipe` used to manage the local data.
```python
def fetch(
    self,
    begin: Union[datetime, str, None] = '',
    end: Optional[datetime] = None,
    check_existing: bool = True,
    sync_chunks: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]:
    """
    Fetch a Pipe's latest data from its connector.

    Parameters
    ----------
    begin: Union[datetime, str, None], default ''
        If provided, only fetch data newer than or equal to `begin`.

    end: Optional[datetime], default None
        If provided, only fetch data older than or equal to `end`.

    check_existing: bool, default True
        If `False`, do not apply the backtrack interval.

    sync_chunks: bool, default False
        If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks while fetching
        loads chunks into memory.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` of the newest unseen data.
    """
    if 'fetch' not in dir(self.connector):
        warn(f"No `fetch()` function defined for connector '{self.connector}'")
        return None

    from meerschaum.connectors import custom_types, get_connector_plugin
    from meerschaum.utils.debug import dprint, _checkpoint
    from meerschaum.utils.misc import filter_arguments

    _chunk_hook = kw.pop('chunk_hook', None)
    kw['workers'] = self.get_num_workers(kw.get('workers', None))
    if sync_chunks and _chunk_hook is None:

        def _chunk_hook(chunk, **_kw) -> SuccessTuple:
            """
            Wrap `Pipe.sync()` with a custom chunk label prepended to the message.
            """
            from meerschaum.config._patch import apply_patch_to_config
            kwargs = apply_patch_to_config(kw, _kw)
            chunk_success, chunk_message = self.sync(chunk, **kwargs)
            chunk_label = self._get_chunk_label(chunk, self.columns.get('datetime', None))
            if chunk_label:
                chunk_message = '\n' + chunk_label + '\n' + chunk_message
            return chunk_success, chunk_message

    with mrsm.Venv(get_connector_plugin(self.connector)):
        _args, _kwargs = filter_arguments(
            self.connector.fetch,
            self,
            begin=_determine_begin(
                self,
                begin,
                check_existing=check_existing,
                debug=debug,
            ),
            end=end,
            chunk_hook=_chunk_hook,
            debug=debug,
            **kw
        )
        df = self.connector.fetch(*_args, **_kwargs)
    return df
```
Fetch a Pipe's latest data from its connector.

Parameters
- begin (Union[datetime, str, None], default ''): If provided, only fetch data newer than or equal to `begin`.
- end (Optional[datetime], default None): If provided, only fetch data older than or equal to `end`.
- check_existing (bool, default True): If `False`, do not apply the backtrack interval.
- sync_chunks (bool, default False): If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks as they are fetched rather than loading all chunks into memory first.
- debug (bool, default False): Verbosity toggle.

Returns
- A `pd.DataFrame` of the newest unseen data.
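When `sync_chunks=True` and no `chunk_hook` is supplied, `fetch()` builds one that syncs each chunk immediately and prefixes the result message with a chunk label. The shape of that hook can be sketched in isolation (the `sync` and `label_for` callables here are stand-ins, not the library's internals):

```python
def make_chunk_hook(sync, label_for):
    """Build a per-chunk callback that syncs and labels each chunk (sketch)."""
    def _chunk_hook(chunk, **kw):
        success, message = sync(chunk, **kw)
        label = label_for(chunk)
        if label:
            message = '\n' + label + '\n' + message
        return success, message
    return _chunk_hook

hook = make_chunk_hook(
    sync=lambda chunk, **kw: (True, f"Synced {len(chunk)} rows."),
    label_for=lambda chunk: f"chunk of {len(chunk)}",
)
print(hook([1, 2, 3]))
```

The real hook patches the outer keyword arguments into each chunk's call and derives the label from the chunk's `datetime` column bounds via `Pipe._get_chunk_label()`.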
```python
def get_backtrack_interval(
    self,
    check_existing: bool = True,
    debug: bool = False,
) -> Union[timedelta, int]:
    """
    Get the backtrack interval to use for this pipe.

    Parameters
    ----------
    check_existing: bool, default True
        If `False`, return a backtrack interval of 0 minutes.

    Returns
    -------
    The backtrack interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
    """
    default_backtrack_minutes = get_config('pipes', 'parameters', 'fetch', 'backtrack_minutes')
    configured_backtrack_minutes = self.parameters.get('fetch', {}).get('backtrack_minutes', None)
    backtrack_minutes = (
        configured_backtrack_minutes
        if configured_backtrack_minutes is not None
        else default_backtrack_minutes
    ) if check_existing else 0

    backtrack_interval = timedelta(minutes=backtrack_minutes)
    dt_col = self.columns.get('datetime', None)
    if dt_col is None:
        return backtrack_interval

    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns]')
    if 'int' in dt_dtype.lower():
        return backtrack_minutes

    return backtrack_interval
```
Get the backtrack interval to use for this pipe.

Parameters
- check_existing (bool, default True): If `False`, return a backtrack interval of 0 minutes.

Returns
- The backtrack interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
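The resolution order above (configured value, else default, else 0, with the return type matching the `datetime` axis) can be sketched standalone. The function name and flags here are illustrative, not Meerschaum API:

```python
from datetime import timedelta

def resolve_backtrack_interval(configured_minutes, default_minutes, dt_is_integer, check_existing=True):
    """Pick the configured minutes, else the default; 0 when check_existing is False."""
    minutes = (
        configured_minutes if configured_minutes is not None else default_minutes
    ) if check_existing else 0
    # An integer `datetime` axis gets plain minutes; a timestamp axis gets a timedelta.
    return minutes if dt_is_integer else timedelta(minutes=minutes)

print(resolve_backtrack_interval(None, 1440, dt_is_integer=False))  # 1 day, 0:00:00
print(resolve_backtrack_interval(90, 1440, dt_is_integer=True))     # 90
```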
```python
def get_data(
    self,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    as_iterator: bool = False,
    as_chunks: bool = False,
    as_dask: bool = False,
    chunk_interval: Union[timedelta, int, None] = None,
    order: Optional[str] = 'asc',
    limit: Optional[int] = None,
    fresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]:
    """
    Get a pipe's data from the instance connector.

    Parameters
    ----------
    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, int, None], default None
        Lower bound datetime to begin searching for data (inclusive).
        Translates to a `WHERE` clause like `WHERE datetime >= begin`.

    end: Union[datetime, int, None], default None
        Upper bound datetime to stop searching for data (exclusive).
        Translates to a `WHERE` clause like `WHERE datetime < end`.

    params: Optional[Dict[str, Any]], default None
        Filter the retrieved data by a dictionary of parameters.
        See `meerschaum.utils.sql.build_where` for more details.

    as_iterator: bool, default False
        If `True`, return a generator of chunks of pipe data.

    as_chunks: bool, default False
        Alias for `as_iterator`.

    as_dask: bool, default False
        If `True`, return a `dask.DataFrame`
        (which may be loaded into a Pandas DataFrame with `df.compute()`).

    chunk_interval: Union[timedelta, int, None], default None
        If `as_iterator`, then return chunks with `begin` and `end` separated by this interval.
        This may be set under `pipe.parameters['chunk_minutes']`.
        By default, use a timedelta of 1440 minutes (1 day).
        If `chunk_interval` is an integer and the `datetime` axis is a timestamp,
        then use a timedelta with the number of minutes configured to this value.
        If the `datetime` axis is an integer, default to the configured chunksize.
        If `chunk_interval` is a `timedelta` and the `datetime` axis is an integer,
        use the number of minutes in the `timedelta`.

    order: Optional[str], default 'asc'
        If `order` is not `None`, sort the resulting dataframe by indices.

    limit: Optional[int], default None
        If provided, cap the dataframe to this many rows.

    fresh: bool, default False
        If `True`, skip local cache and directly query the instance connector.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.misc import iterate_chunks, items_str
    from meerschaum.utils.dtypes import to_pandas_dtype
    from meerschaum.utils.dataframe import add_missing_cols_to_df, df_is_chunk_generator
    from meerschaum.utils.packages import attempt_import
    dd = attempt_import('dask.dataframe') if as_dask else None
    dask = attempt_import('dask') if as_dask else None

    if select_columns == '*':
        select_columns = None
    elif isinstance(select_columns, str):
        select_columns = [select_columns]

    if isinstance(omit_columns, str):
        omit_columns = [omit_columns]

    as_iterator = as_iterator or as_chunks

    def _sort_df(_df):
        if df_is_chunk_generator(_df):
            return _df
        dt_col = self.columns.get('datetime', None)
        indices = [] if dt_col not in _df.columns else [dt_col]
        non_dt_cols = [
            col
            for col_ix, col in self.columns.items()
            if col_ix != 'datetime' and col in _df.columns
        ]
        indices.extend(non_dt_cols)
        if 'dask' not in _df.__module__:
            _df.sort_values(
                by=indices,
                inplace=True,
                ascending=(str(order).lower() == 'asc'),
            )
            _df.reset_index(drop=True, inplace=True)
        else:
            _df = _df.sort_values(
                by=indices,
                ascending=(str(order).lower() == 'asc'),
            )
            _df = _df.reset_index(drop=True)
        if limit is not None and len(_df) > limit:
            return _df.head(limit)
        return _df

    if as_iterator or as_chunks:
        df = self._get_data_as_iterator(
            select_columns=select_columns,
            omit_columns=omit_columns,
            begin=begin,
            end=end,
            params=params,
            chunk_interval=chunk_interval,
            limit=limit,
            order=order,
            fresh=fresh,
            debug=debug,
        )
        return _sort_df(df)

    if as_dask:
        from multiprocessing.pool import ThreadPool
        dask_pool = ThreadPool(self.get_num_workers())
        dask.config.set(pool=dask_pool)
        chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
        bounds = self.get_chunk_bounds(
            begin=begin,
            end=end,
            bounded=False,
            chunk_interval=chunk_interval,
            debug=debug,
        )
        dask_chunks = [
            dask.delayed(self.get_data)(
                select_columns=select_columns,
                omit_columns=omit_columns,
                begin=chunk_begin,
                end=chunk_end,
                params=params,
                chunk_interval=chunk_interval,
                order=order,
                limit=limit,
                fresh=fresh,
                debug=debug,
            )
            for (chunk_begin, chunk_end) in bounds
        ]
        dask_meta = {
            col: to_pandas_dtype(typ)
            for col, typ in self.dtypes.items()
        }
        return _sort_df(dd.from_delayed(dask_chunks, meta=dask_meta))

    if not self.exists(debug=debug):
        return None

    if self.cache_pipe is not None:
        if not fresh:
            _sync_cache_tuple = self.cache_pipe.sync(
                begin=begin,
                end=end,
                params=params,
                debug=debug,
                **kw
            )
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
                fresh = True
            else:  ### Successfully synced cache.
                return self.enforce_dtypes(
                    self.cache_pipe.get_data(
                        select_columns=select_columns,
                        omit_columns=omit_columns,
                        begin=begin,
                        end=end,
                        params=params,
                        order=order,
                        limit=limit,
                        debug=debug,
                        fresh=True,
                        **kw
                    ),
                    debug=debug,
                )

    with Venv(get_connector_plugin(self.instance_connector)):
        df = self.instance_connector.get_pipe_data(
            pipe=self,
            select_columns=select_columns,
            omit_columns=omit_columns,
            begin=begin,
            end=end,
            params=params,
            limit=limit,
            order=order,
            debug=debug,
            **kw
        )
    if df is None:
        return df

    if not select_columns:
        select_columns = [col for col in df.columns]

    cols_to_omit = [
        col
        for col in df.columns
        if (
            col in (omit_columns or [])
            or
            col not in (select_columns or [])
        )
    ]
    cols_to_add = [
        col
        for col in select_columns
        if col not in df.columns
    ]
    if cols_to_omit:
        warn(
            (
                f"Received {len(cols_to_omit)} omitted column"
                + ('s' if len(cols_to_omit) != 1 else '')
                + f" for {self}. "
                + "Consider adding `select_columns` and `omit_columns` support to "
                + f"'{self.instance_connector.type}' connectors to improve performance."
            ),
            stack=False,
        )
        _cols_to_select = [col for col in df.columns if col not in cols_to_omit]
        df = df[_cols_to_select]

    if cols_to_add:
        warn(
            (
                f"Specified columns {items_str(cols_to_add)} were not found on {self}. "
                + "Adding these to the DataFrame as null columns."
            ),
            stack=False,
        )
        df = add_missing_cols_to_df(df, {col: 'string' for col in cols_to_add})

    enforced_df = self.enforce_dtypes(df, debug=debug)

    if order:
        return _sort_df(enforced_df)
    return enforced_df
```
Get a pipe's data from the instance connector.

Parameters
- select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. `SELECT *`).
- omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, int, None], default None): Lower bound datetime to begin searching for data (inclusive). Translates to a `WHERE` clause like `WHERE datetime >= begin`.
- end (Union[datetime, int, None], default None): Upper bound datetime to stop searching for data (exclusive). Translates to a `WHERE` clause like `WHERE datetime < end`.
- params (Optional[Dict[str, Any]], default None): Filter the retrieved data by a dictionary of parameters. See `meerschaum.utils.sql.build_where` for more details.
- as_iterator (bool, default False): If `True`, return a generator of chunks of pipe data.
- as_chunks (bool, default False): Alias for `as_iterator`.
- as_dask (bool, default False): If `True`, return a `dask.DataFrame` (which may be loaded into a Pandas DataFrame with `df.compute()`).
- chunk_interval (Union[timedelta, int, None], default None): If `as_iterator`, return chunks with `begin` and `end` separated by this interval. This may be set under `pipe.parameters['chunk_minutes']`. By default, use a timedelta of 1440 minutes (1 day). If `chunk_interval` is an integer and the `datetime` axis is a timestamp, use a timedelta with this many minutes. If the `datetime` axis is an integer, default to the configured chunksize. If `chunk_interval` is a `timedelta` and the `datetime` axis is an integer, use the number of minutes in the `timedelta`.
- order (Optional[str], default 'asc'): If `order` is not `None`, sort the resulting dataframe by indices.
- limit (Optional[int], default None): If provided, cap the dataframe to this many rows.
- fresh (bool, default False): If `True`, skip local cache and directly query the instance connector.
- debug (bool, default False): Verbosity toggle.

Returns
- A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.
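The `select_columns` / `omit_columns` filtering can be emulated on plain row dictionaries, independent of any connector. This is a simplified sketch (the real method also adds missing `select_columns` as null columns, which is omitted here):

```python
def apply_column_filters(rows, select_columns=None, omit_columns=None):
    """Keep only the selected columns and drop the omitted ones from each row dict."""
    out = []
    for row in rows:
        keep = select_columns if select_columns else list(row)
        out.append({
            col: row.get(col)
            for col in keep
            if col not in (omit_columns or [])
        })
    return out

rows = [{'ts': 1, 'id': 1, 'vl': 10}, {'ts': 2, 'id': 2, 'vl': 20}]
print(apply_column_filters(rows, select_columns=['ts', 'vl']))
# [{'ts': 1, 'vl': 10}, {'ts': 2, 'vl': 20}]
```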
```python
def get_backtrack_data(
    self,
    backtrack_minutes: Optional[int] = None,
    begin: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    limit: Optional[int] = None,
    fresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Optional['pd.DataFrame']:
    """
    Get the most recent data from the instance connector as a Pandas DataFrame.

    Parameters
    ----------
    backtrack_minutes: Optional[int], default None
        How many minutes from `begin` to select from.
        If `None`, use `pipe.parameters['fetch']['backtrack_minutes']`.

    begin: Optional[datetime], default None
        The starting point to search for data.
        If begin is `None` (default), use the most recent observed datetime
        (AKA sync_time).

        E.g. begin = 02:00

        Search this region.            Ignore this, even if there's data.
        / / / / / / / / / |
        -----|----------|----------|----------|----------|----------|
        00:00      01:00      02:00      03:00      04:00      05:00

    params: Optional[Dict[str, Any]], default None
        The standard Meerschaum `params` query dictionary.

    limit: Optional[int], default None
        If provided, cap the number of rows to be returned.

    fresh: bool, default False
        If `True`, ignore local cache and pull directly from the instance connector.
        Only comes into effect if a pipe was created with `cache=True`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` for the pipe's data corresponding to the provided parameters.
    Backtrack data is a convenient way to get a pipe's data "backtracked"
    from the most recent datetime.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if not self.exists(debug=debug):
        return None

    backtrack_interval = self.get_backtrack_interval(debug=debug)
    if backtrack_minutes is None:
        backtrack_minutes = (
            (backtrack_interval.total_seconds() / 60)
            if isinstance(backtrack_interval, timedelta)
            else backtrack_interval
        )

    if self.cache_pipe is not None:
        if not fresh:
            _sync_cache_tuple = self.cache_pipe.sync(begin=begin, params=params, debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1])
                fresh = True
            else:  ### Successfully synced cache.
                return self.enforce_dtypes(
                    self.cache_pipe.get_backtrack_data(
                        fresh=True,
                        begin=begin,
                        backtrack_minutes=backtrack_minutes,
                        params=params,
                        limit=limit,
                        order=kw.get('order', 'desc'),
                        debug=debug,
                        **kw
                    ),
                    debug=debug,
                )

    if hasattr(self.instance_connector, 'get_backtrack_data'):
        with Venv(get_connector_plugin(self.instance_connector)):
            return self.enforce_dtypes(
                self.instance_connector.get_backtrack_data(
                    pipe=self,
                    begin=begin,
                    backtrack_minutes=backtrack_minutes,
                    params=params,
                    limit=limit,
                    debug=debug,
                    **kw
                ),
                debug=debug,
            )

    if begin is None:
        begin = self.get_sync_time(params=params, debug=debug)

    backtrack_interval = (
        timedelta(minutes=backtrack_minutes)
        if isinstance(begin, datetime)
        else backtrack_minutes
    )
    if begin is not None:
        begin = begin - backtrack_interval

    return self.get_data(
        begin=begin,
        params=params,
        debug=debug,
        limit=limit,
        order=kw.get('order', 'desc'),
        **kw
    )
```
Get the most recent data from the instance connector as a Pandas DataFrame.

Parameters
- backtrack_minutes (Optional[int], default None): How many minutes from `begin` to select from. If `None`, use `pipe.parameters['fetch']['backtrack_minutes']`.
- begin (Optional[datetime], default None): The starting point to search for data. If `begin` is `None` (default), use the most recent observed datetime (AKA sync_time).

```
E.g. begin = 02:00

Search this region.            Ignore this, even if there's data.
/ / / / / / / / / |
-----|----------|----------|----------|----------|----------|
00:00      01:00      02:00      03:00      04:00      05:00
```

- params (Optional[Dict[str, Any]], default None): The standard Meerschaum `params` query dictionary.
- limit (Optional[int], default None): If provided, cap the number of rows to be returned.
- fresh (bool, default False): If `True`, ignore local cache and pull directly from the instance connector. Only comes into effect if a pipe was created with `cache=True`.
- debug (bool, default False): Verbosity toggle.

Returns
- A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. Backtrack data is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
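The fallback path of the method above steps `begin` back from the sync time, handling both timestamp and integer `datetime` axes. A minimal sketch (the helper name `backtrack_begin` is illustrative):

```python
from datetime import datetime, timedelta

def backtrack_begin(sync_time, backtrack_minutes):
    """Step the search window back from the most recent sync time."""
    if sync_time is None:
        return None
    interval = (
        timedelta(minutes=backtrack_minutes)
        if isinstance(sync_time, datetime)
        else backtrack_minutes  # integer `datetime` axis: plain subtraction
    )
    return sync_time - interval

print(backtrack_begin(datetime(2024, 1, 1, 2, 0), 60))  # 2024-01-01 01:00:00
print(backtrack_begin(100, 10))  # 90
```

This matches the diagram above: with `begin = 02:00` and 60 backtrack minutes, the search region starts at 01:00.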
```python
def get_rowcount(
    self,
    begin: Optional[datetime] = None,
    end: Optional[datetime] = None,
    params: Optional[Dict[str, Any]] = None,
    remote: bool = False,
    debug: bool = False
) -> int:
    """
    Get a Pipe's instance or remote rowcount.

    Parameters
    ----------
    begin: Optional[datetime], default None
        Count rows where datetime > begin.

    end: Optional[datetime], default None
        Count rows where datetime < end.

    remote: bool, default False
        Count rows from a pipe's remote source.
        **NOTE**: This is experimental!

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` of the number of rows in the pipe corresponding to the provided parameters.
    Returns 0 if the pipe does not exist.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    connector = self.instance_connector if not remote else self.connector
    try:
        with Venv(get_connector_plugin(connector)):
            rowcount = connector.get_pipe_rowcount(
                self,
                begin=begin,
                end=end,
                params=params,
                remote=remote,
                debug=debug,
            )
            if rowcount is None:
                return 0
            return rowcount
    except AttributeError as e:
        warn(e)
        if remote:
            return 0
    warn(f"Failed to get a rowcount for {self}.")
    return 0
```
Get a Pipe's instance or remote rowcount.

Parameters
- begin (Optional[datetime], default None): Count rows where datetime > begin.
- end (Optional[datetime], default None): Count rows where datetime < end.
- remote (bool, default False): Count rows from a pipe's remote source. **NOTE**: This is experimental!
- debug (bool, default False): Verbosity toggle.

Returns
- An `int` of the number of rows in the pipe corresponding to the provided parameters. Returns 0 if the pipe does not exist.
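The bound semantics documented above (`datetime > begin`, `datetime < end`, either bound optional) can be illustrated on a plain list of timestamps; `count_rows` is a hypothetical stand-in, not the Meerschaum API:

```python
from datetime import datetime

def count_rows(timestamps, begin=None, end=None):
    """Count timestamps strictly inside the (begin, end) window, per the bounds above."""
    return sum(
        1 for ts in timestamps
        if (begin is None or ts > begin) and (end is None or ts < end)
    )

stamps = [datetime(2024, 1, d) for d in (1, 2, 3, 4)]
print(count_rows(stamps, begin=datetime(2024, 1, 1), end=datetime(2024, 1, 4)))  # 2
```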
```python
def get_chunk_interval(
    self,
    chunk_interval: Union[timedelta, int, None] = None,
    debug: bool = False,
) -> Union[timedelta, int]:
    """
    Get the chunk interval to use for this pipe.

    Parameters
    ----------
    chunk_interval: Union[timedelta, int, None], default None
        If provided, coerce this value into the correct type.
        For example, if the datetime axis is an integer, then
        return the number of minutes.

    Returns
    -------
    The chunk interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
    """
    default_chunk_minutes = get_config('pipes', 'parameters', 'verify', 'chunk_minutes')
    configured_chunk_minutes = self.parameters.get('verify', {}).get('chunk_minutes', None)
    chunk_minutes = (
        (configured_chunk_minutes or default_chunk_minutes)
        if chunk_interval is None
        else (
            chunk_interval
            if isinstance(chunk_interval, int)
            else int(chunk_interval.total_seconds() / 60)
        )
    )

    dt_col = self.columns.get('datetime', None)
    if dt_col is None:
        return timedelta(minutes=chunk_minutes)

    dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns]')
    if 'int' in dt_dtype.lower():
        return chunk_minutes
    return timedelta(minutes=chunk_minutes)
```
Get the chunk interval to use for this pipe.

Parameters
- chunk_interval (Union[timedelta, int, None], default None): If provided, coerce this value into the correct type. For example, if the datetime axis is an integer, then return the number of minutes.

Returns
- The chunk interval (`timedelta` or `int`) to use with this pipe's `datetime` axis.
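The coercion rules above reduce everything to minutes first, then pick the return type from the `datetime` axis. A standalone sketch (names are illustrative; the default of 1440 minutes comes from the `get_data` docstring):

```python
from datetime import timedelta

def resolve_chunk_interval(chunk_interval, dt_is_integer, default_minutes=1440):
    """Coerce a chunk interval to match the pipe's `datetime` axis type."""
    if chunk_interval is None:
        minutes = default_minutes
    elif isinstance(chunk_interval, int):
        minutes = chunk_interval
    else:
        minutes = int(chunk_interval.total_seconds() / 60)
    # Integer axis: plain minutes. Timestamp axis: a timedelta.
    return minutes if dt_is_integer else timedelta(minutes=minutes)

print(resolve_chunk_interval(timedelta(hours=2), dt_is_integer=True))  # 120
print(resolve_chunk_interval(90, dt_is_integer=False))                 # 1:30:00
```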
```python
def get_chunk_bounds(
    self,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    bounded: bool = False,
    chunk_interval: Union[timedelta, int, None] = None,
    debug: bool = False,
) -> List[
    Tuple[
        Union[datetime, int, None],
        Union[datetime, int, None],
    ]
]:
    """
    Return a list of datetime bounds for iterating over the pipe's `datetime` axis.

    Parameters
    ----------
    begin: Union[datetime, int, None], default None
        If provided, do not select less than this value.
        Otherwise the first chunk will be unbounded.

    end: Union[datetime, int, None], default None
        If provided, do not select greater than or equal to this value.
        Otherwise the last chunk will be unbounded.

    bounded: bool, default False
        If `True`, do not include `None` in the first chunk.

    chunk_interval: Union[timedelta, int, None], default None
        If provided, use this interval for the size of chunk boundaries.
        The default value for this pipe may be set
        under `pipe.parameters['verify']['chunk_minutes']`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of chunk bounds (datetimes or integers).
    If unbounded, the first and last chunks will include `None`.
    """
    include_less_than_begin = not bounded and begin is None
    include_greater_than_end = not bounded and end is None
    if begin is None:
        begin = self.get_sync_time(newest=False, debug=debug)
    if end is None:
        end = self.get_sync_time(newest=True, debug=debug)
    if begin is None and end is None:
        return [(None, None)]

    ### Set the chunk interval under `pipe.parameters['verify']['chunk_minutes']`.
    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)

    ### Build a list of tuples containing the chunk boundaries
    ### so that we can sync multiple chunks in parallel.
    ### Run `verify pipes --workers 1` to sync chunks in series.
    chunk_bounds = []
    begin_cursor = begin
    while begin_cursor < end:
        end_cursor = begin_cursor + chunk_interval
        chunk_bounds.append((begin_cursor, end_cursor))
        begin_cursor = end_cursor

    ### The chunk interval might be too large.
    if not chunk_bounds and end >= begin:
        chunk_bounds = [(begin, end)]

    ### Truncate the last chunk to the end timestamp.
    if chunk_bounds[-1][1] > end:
        chunk_bounds[-1] = (chunk_bounds[-1][0], end)

    ### Pop the last chunk if its bounds are equal.
    if chunk_bounds[-1][0] == chunk_bounds[-1][1]:
        chunk_bounds = chunk_bounds[:-1]

    if include_less_than_begin:
        chunk_bounds = [(None, begin)] + chunk_bounds
    if include_greater_than_end:
        chunk_bounds = chunk_bounds + [(end, None)]

    return chunk_bounds
```
Return a list of datetime bounds for iterating over the pipe's `datetime` axis.

Parameters
- begin (Union[datetime, int, None], default None): If provided, do not select less than this value. Otherwise the first chunk will be unbounded.
- end (Union[datetime, int, None], default None): If provided, do not select greater than or equal to this value. Otherwise the last chunk will be unbounded.
- bounded (bool, default False): If `True`, do not include `None` in the first chunk.
- chunk_interval (Union[timedelta, int, None], default None): If provided, use this interval for the size of chunk boundaries. The default value for this pipe may be set under `pipe.parameters['verify']['chunk_minutes']`.
- debug (bool, default False): Verbosity toggle.

Returns
- A list of chunk bounds (datetimes or integers). If unbounded, the first and last chunks will include `None`.
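The chunking loop above (step by the interval, truncate the final chunk, optionally add unbounded edge chunks) can be sketched without a pipe. `build_chunk_bounds` is illustrative, not the Meerschaum API:

```python
from datetime import datetime, timedelta

def build_chunk_bounds(begin, end, chunk_interval, bounded=True):
    """Walk from `begin` to `end` in `chunk_interval` steps, as described above."""
    bounds = []
    cursor = begin
    while cursor < end:
        bounds.append((cursor, cursor + chunk_interval))
        cursor += chunk_interval
    # Truncate the final chunk to `end`.
    if bounds and bounds[-1][1] > end:
        bounds[-1] = (bounds[-1][0], end)
    # Unbounded mode: add open-ended chunks on both edges.
    if not bounded:
        bounds = [(None, begin)] + bounds + [(end, None)]
    return bounds

bounds = build_chunk_bounds(
    datetime(2024, 1, 1), datetime(2024, 1, 3), timedelta(days=1)
)
print(bounds)  # two day-long chunks: Jan 1-2 and Jan 2-3
```

Because each `(begin, end)` pair is independent, the chunks may be synced in parallel, which is exactly why the real method builds this list.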
```python
def register(
    self,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Register a new Pipe along with its attributes.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    kw: Any
        Keyword arguments to pass to `instance_connector.register_pipe()`.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    if self.temporary:
        return False, "Cannot register pipes created with `temporary=True` (read-only)."

    from meerschaum.utils.formatting import get_console
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin, custom_types
    from meerschaum.config._patch import apply_patch_to_config

    import warnings
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        try:
            _conn = self.connector
        except Exception as e:
            _conn = None

    if (
        _conn is not None
        and
        (_conn.type == 'plugin' or _conn.type in custom_types)
        and
        getattr(_conn, 'register', None) is not None
    ):
        try:
            with Venv(get_connector_plugin(_conn), debug=debug):
                params = self.connector.register(self)
        except Exception as e:
            get_console().print_exception()
            params = None
        params = {} if params is None else params
        if not isinstance(params, dict):
            from meerschaum.utils.warnings import warn
            warn(
                f"Invalid parameters returned from `register()` in connector {self.connector}:\n"
                + f"{params}"
            )
        else:
            self.parameters = apply_patch_to_config(params, self.parameters)

    if not self.parameters:
        cols = self.columns if self.columns else {'datetime': None, 'id': None}
        self.parameters = {
            'columns': cols,
        }

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.register_pipe(self, debug=debug, **kw)
```
Register a new Pipe along with its attributes.

Parameters
- debug (bool, default False): Verbosity toggle.
- kw (Any): Keyword arguments to pass to `instance_connector.register_pipe()`.

Returns
- A `SuccessTuple` of success, message.
```python
@property
def attributes(self) -> Dict[str, Any]:
    """
    Return a dictionary of a pipe's keys and parameters.
    These values are reflected directly from the pipes table of the instance.
    """
    import time
    from meerschaum.config import get_config
    from meerschaum.config._patch import apply_patch_to_config
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    timeout_seconds = get_config('pipes', 'attributes', 'local_cache_timeout_seconds')

    if '_attributes' not in self.__dict__:
        self._attributes = {}

    now = time.perf_counter()
    last_refresh = self.__dict__.get('_attributes_sync_time', None)
    timed_out = (
        last_refresh is None
        or
        (timeout_seconds is not None and (now - last_refresh) >= timeout_seconds)
    )
    if not self.temporary and timed_out:
        self._attributes_sync_time = now
        local_attributes = self.__dict__.get('_attributes', {})
        with Venv(get_connector_plugin(self.instance_connector)):
            instance_attributes = self.instance_connector.get_pipe_attributes(self)
        self._attributes = apply_patch_to_config(instance_attributes, local_attributes)
    return self._attributes
```
Return a dictionary of a pipe's keys and parameters. These values are reflected directly from the pipes table of the instance.
```python
@property
def parameters(self) -> Optional[Dict[str, Any]]:
    """
    Return the parameters dictionary of the pipe.
    """
    if 'parameters' not in self.attributes:
        self.attributes['parameters'] = {}
    return self.attributes['parameters']
```
Return the parameters dictionary of the pipe.
```python
@property
def columns(self) -> Union[Dict[str, str], None]:
    """
    Return the `columns` dictionary defined in `meerschaum.Pipe.parameters`.
    """
    if 'columns' not in self.parameters:
        self.parameters['columns'] = {}
    cols = self.parameters['columns']
    if not isinstance(cols, dict):
        cols = {}
        self.parameters['columns'] = cols
    return cols
```
Return the `columns` dictionary defined in `meerschaum.Pipe.parameters`.
```python
@property
def dtypes(self) -> Union[Dict[str, Any], None]:
    """
    If defined, return the `dtypes` dictionary defined in `meerschaum.Pipe.parameters`.
    """
    from meerschaum.config._patch import apply_patch_to_config
    configured_dtypes = self.parameters.get('dtypes', {})
    remote_dtypes = self.infer_dtypes(persist=False)
    patched_dtypes = apply_patch_to_config(remote_dtypes, configured_dtypes)
    return patched_dtypes
```
If defined, return the `dtypes` dictionary defined in `meerschaum.Pipe.parameters`.
```python
def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]:
    """
    Check if the requested columns are defined.

    Parameters
    ----------
    *args: str
        The column names to be retrieved.

    error: bool, default False
        If `True`, raise an `Exception` if the specified column is not defined.

    Returns
    -------
    A tuple of the same size as `args`, or a `str` if `args` is a single argument.

    Examples
    --------
    >>> pipe = mrsm.Pipe('test', 'test')
    >>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
    >>> pipe.get_columns('datetime', 'id')
    ('dt', 'id')
    >>> pipe.get_columns('value', error=True)
    Exception: 🛑 Missing 'value' column for Pipe('test', 'test').
    """
    from meerschaum.utils.warnings import error as _error, warn
    if not args:
        args = tuple(self.columns.keys())
    col_names = []
    for col in args:
        col_name = None
        try:
            col_name = self.columns[col]
            if col_name is None and error:
                _error(f"Please define the name of the '{col}' column for {self}.")
        except Exception as e:
            col_name = None
        if col_name is None and error:
            _error(f"Missing '{col}' column for {self}.")
        col_names.append(col_name)
    if len(col_names) == 1:
        return col_names[0]
    return tuple(col_names)
```
Check if the requested columns are defined.

Parameters
- *args (str): The column names to be retrieved.
- error (bool, default False): If `True`, raise an `Exception` if the specified column is not defined.

Returns
- A tuple of the same size as `args`, or a `str` if `args` is a single argument.

Examples
```python
>>> pipe = mrsm.Pipe('test', 'test')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> pipe.get_columns('datetime', 'id')
('dt', 'id')
>>> pipe.get_columns('value', error=True)
Exception: 🛑 Missing 'value' column for Pipe('test', 'test').
```
```python
def get_columns_types(self, debug: bool = False) -> Union[Dict[str, str], None]:
    """
    Get a dictionary of a pipe's column names and their types.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of column names (`str`) to column types (`str`).

    Examples
    --------
    >>> pipe.get_columns_types()
    {
        'dt': 'TIMESTAMP WITHOUT TIMEZONE',
        'id': 'BIGINT',
        'val': 'DOUBLE PRECISION',
    }
    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.get_pipe_columns_types(self, debug=debug)
```
Get a dictionary of a pipe's column names and their types.

Parameters
- debug (bool, default False): Verbosity toggle.

Returns
- A dictionary of column names (`str`) to column types (`str`).

Examples
```python
>>> pipe.get_columns_types()
{
    'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    'id': 'BIGINT',
    'val': 'DOUBLE PRECISION',
}
```
```python
def get_indices(self) -> Dict[str, str]:
    """
    Return a dictionary in the form of `pipe.columns` but mapped to index names.
    """
    return {
        ix: (self.target + '_' + col + '_index')
        for ix, col in self.columns.items() if col
    }
```
Return a dictionary in the form of `pipe.columns` but mapped to index names.
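As the source above shows, each defined column maps to an index named `<target>_<column>_index`, and unset columns are skipped. A standalone sketch of that mapping (the function name is illustrative):

```python
def build_index_names(target, columns):
    """Map index keys to index names, following the `<target>_<column>_index` pattern."""
    return {
        ix: f"{target}_{col}_index"
        for ix, col in columns.items()
        if col  # skip index keys whose column is unset
    }

print(build_index_names('weather', {'datetime': 'ts', 'id': 'station', 'value': None}))
# {'datetime': 'weather_ts_index', 'id': 'weather_station_index'}
```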
```python
def get_id(self, **kw: Any) -> Union[int, None]:
    """
    Fetch a pipe's ID from its instance connector.
    If the pipe does not exist, return `None`.
    """
    if self.temporary:
        return None
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.get_pipe_id(self, **kw)
```
Fetch a pipe's ID from its instance connector.
If the pipe does not exist, return `None`.
```python
@property
def id(self) -> Union[int, None]:
    """
    Fetch and cache a pipe's ID.
    """
    if not ('_id' in self.__dict__ and self._id):
        self._id = self.get_id()
    return self._id
```
Fetch and cache a pipe's ID.
```python
def get_val_column(self, debug: bool = False) -> Union[str, None]:
    """
    Return the name of the value column if it's defined; otherwise make an educated guess.
    If not set in the `columns` dictionary, return the first numeric column that is not
    an ID or datetime column.
    If none may be found, return `None`.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    Either a string or `None`.
    """
    from meerschaum.utils.debug import dprint
    if debug:
        dprint('Attempting to determine the value column...')
    try:
        val_name = self.get_columns('value')
    except Exception as e:
        val_name = None
    if val_name is not None:
        if debug:
            dprint(f"Value column: {val_name}")
        return val_name

    cols = self.columns
    if cols is None:
        if debug:
            dprint('No columns could be determined. Returning...')
        return None
    try:
        dt_name = self.get_columns('datetime', error=False)
    except Exception as e:
        dt_name = None
    try:
        id_name = self.get_columns('id', error=False)
    except Exception as e:
        id_name = None

    if debug:
        dprint(f"dt_name: {dt_name}")
        dprint(f"id_name: {id_name}")

    cols_types = self.get_columns_types(debug=debug)
    if cols_types is None:
        return None
    if debug:
        dprint(f"cols_types: {cols_types}")
    if dt_name is not None:
        cols_types.pop(dt_name, None)
    if id_name is not None:
        cols_types.pop(id_name, None)

    candidates = []
    candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric'}
    for search_term in candidate_keywords:
        for col, typ in cols_types.items():
            if search_term in typ.lower():
                candidates.append(col)
                break
    if not candidates:
        if debug:
            dprint("No value column could be determined.")
        return None

    return candidates[0]
```
Return the name of the value column if it's defined, otherwise make an educated guess.
If not set in the `columns` dictionary, return the first numeric column that is not an ID or datetime column.
If none may be found, return `None`.

Parameters
- debug (bool, default False): Verbosity toggle.

Returns
- Either a string or `None`.
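The guessing heuristic can be illustrated standalone. This is only a sketch of the behavior described above, not the library's implementation; the column names and database types here are hypothetical:

```python
# Sketch of the value-column guess: drop the datetime and ID columns,
# then return the first remaining column whose type looks numeric.
NUMERIC_KEYWORDS = ('float', 'double', 'precision', 'int', 'numeric')

def guess_val_column(cols_types, dt_col=None, id_col=None):
    candidates = {
        col: typ for col, typ in cols_types.items()
        if col not in (dt_col, id_col)
    }
    for col, typ in candidates.items():
        if any(kw in typ.lower() for kw in NUMERIC_KEYWORDS):
            return col
    return None

cols_types = {'ts': 'TIMESTAMP', 'id': 'BIGINT', 'temperature': 'DOUBLE PRECISION'}
print(guess_val_column(cols_types, dt_col='ts', id_col='id'))
# temperature
```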
@property
def parents(self) -> List[meerschaum.Pipe]:
    """
    Return a list of `meerschaum.Pipe` objects to be designated as parents.
    """
    if 'parents' not in self.parameters:
        return []
    from meerschaum.utils.warnings import warn
    _parents_keys = self.parameters['parents']
    if not isinstance(_parents_keys, list):
        warn(
            f"Please ensure the parents for {self} are defined as a list of keys.",
            stacklevel=4,
        )
        return []
    from meerschaum import Pipe
    _parents = []
    for keys in _parents_keys:
        try:
            p = Pipe(**keys)
        except Exception as e:
            warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}")
            continue
        _parents.append(p)
    return _parents
Return a list of `meerschaum.Pipe` objects to be designated as parents.
@property
def children(self) -> List[meerschaum.Pipe]:
    """
    Return a list of `meerschaum.Pipe` objects to be designated as children.
    """
    if 'children' not in self.parameters:
        return []
    from meerschaum.utils.warnings import warn
    _children_keys = self.parameters['children']
    if not isinstance(_children_keys, list):
        warn(
            f"Please ensure the children for {self} are defined as a list of keys.",
            stacklevel=4,
        )
        return []
    from meerschaum import Pipe
    _children = []
    for keys in _children_keys:
        try:
            p = Pipe(**keys)
        except Exception as e:
            warn(f"Unable to build child with keys '{keys}' for {self}:\n{e}")
            continue
        _children.append(p)
    return _children
Return a list of `meerschaum.Pipe` objects to be designated as children.
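As shown in the source above, parents and children are stored under `parameters` as lists of keyword dictionaries from which pipes are rebuilt. The validate-and-rebuild pattern can be sketched standalone; the `factory` callable and key names below are illustrative stand-ins, not the library's internals:

```python
# Each entry is a dict of keyword arguments used to rebuild a pipe,
# e.g. {'connector': 'sql:main', 'metric': 'weather'}.
def build_from_keys(parents_keys, factory):
    if not isinstance(parents_keys, list):
        # Mirrors the warning path: a non-list definition yields no pipes.
        return []
    built = []
    for keys in parents_keys:
        try:
            built.append(factory(**keys))
        except Exception:
            continue  # Skip unbuildable entries rather than failing outright.
    return built

keys = [{'connector': 'sql:main', 'metric': 'weather'}]
print(build_from_keys(keys, factory=dict))
# [{'connector': 'sql:main', 'metric': 'weather'}]
```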
@property
def target(self) -> str:
    """
    The target table name.
    You can set the target name under one of the following keys
    (checked in this order):
    - `target`
    - `target_name`
    - `target_table`
    - `target_table_name`
    """
    if 'target' not in self.parameters:
        target = self._target_legacy()
        potential_keys = ('target_name', 'target_table', 'target_table_name')
        for k in potential_keys:
            if k in self.parameters:
                target = self.parameters[k]
                break

        if self.instance_connector.type == 'sql':
            from meerschaum.utils.sql import truncate_item_name
            truncated_target = truncate_item_name(target, self.instance_connector.flavor)
            if truncated_target != target:
                warn(
                    f"The target '{target}' is too long for '{self.instance_connector.flavor}', "
                    + f"will use {truncated_target} instead."
                )
                target = truncated_target

        self.target = target
    return self.parameters['target']
The target table name. You can set the target name under one of the following keys (checked in this order):
- `target`
- `target_name`
- `target_table`
- `target_table_name`
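The key precedence above amounts to a simple ordered lookup. A minimal sketch (the parameter values and the `default` fallback here are hypothetical):

```python
# Sketch of the target-name lookup order: an explicit 'target' wins,
# otherwise the first legacy key found is used, else the default name.
def resolve_target(parameters, default):
    if 'target' in parameters:
        return parameters['target']
    for key in ('target_name', 'target_table', 'target_table_name'):
        if key in parameters:
            return parameters[key]
    return default

print(resolve_target({'target_table': 'weather_tbl'}, default='plugin_weather'))
# weather_tbl
```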
def guess_datetime(self) -> Union[str, None]:
    """
    Try to determine a pipe's datetime column.
    """
    dtypes = self.dtypes

    ### Abort if the user explicitly disallows a datetime index.
    if 'datetime' in dtypes:
        if dtypes['datetime'] is None:
            return None

    dt_cols = [
        col for col, typ in self.dtypes.items()
        if str(typ).startswith('datetime')
    ]
    if not dt_cols:
        return None
    return dt_cols[0]
Try to determine a pipe's datetime column.
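The logic above is small enough to sketch on a plain dictionary of dtypes (column names here are hypothetical):

```python
# Sketch of the datetime-column guess: an explicit `None` under the
# 'datetime' key disables the index; otherwise the first column whose
# dtype string starts with 'datetime' wins.
def guess_datetime_col(dtypes):
    if dtypes.get('datetime', '') is None:
        return None
    dt_cols = [col for col, typ in dtypes.items() if str(typ).startswith('datetime')]
    return dt_cols[0] if dt_cols else None

print(guess_datetime_col({'ts': 'datetime64[ns, UTC]', 'vl': 'float64'}))
# ts
print(guess_datetime_col({'datetime': None, 'ts': 'datetime64[ns]'}))
# None
```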
def show(
    self,
    nopretty: bool = False,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Show attributes of a Pipe.

    Parameters
    ----------
    nopretty: bool, default False
        If `True`, simply print the JSON of the pipe's attributes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    import json
    from meerschaum.utils.formatting import (
        pprint, make_header, ANSI, highlight_pipes, fill_ansi, get_console,
    )
    from meerschaum.utils.packages import import_rich, attempt_import
    from meerschaum.utils.warnings import info
    attributes_json = json.dumps(self.attributes)
    if not nopretty:
        _to_print = f"Attributes for {self}:"
        if ANSI:
            _to_print = fill_ansi(highlight_pipes(make_header(_to_print)), 'magenta')
            print(_to_print)
            rich = import_rich()
            rich_json = attempt_import('rich.json')
            get_console().print(rich_json.JSON(attributes_json))
        else:
            print(_to_print)
    else:
        print(attributes_json)

    return True, "Success"
Show attributes of a Pipe.

Parameters
- nopretty (bool, default False): If `True`, simply print the JSON of the pipe's attributes.
- debug (bool, default False): Verbosity toggle.

Returns
- A `SuccessTuple` of success, message.
def edit(
    self,
    patch: bool = False,
    interactive: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit a Pipe's configuration.

    Parameters
    ----------
    patch: bool, default False
        If `patch` is True, update parameters by cascading rather than overwriting.

    interactive: bool, default False
        If `True`, open an editor for the user to make changes to the pipe's YAML file.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if self.temporary:
        return False, "Cannot edit pipes created with `temporary=True` (read-only)."

    if not interactive:
        with Venv(get_connector_plugin(self.instance_connector)):
            return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)

    from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
    from meerschaum.utils.misc import edit_file
    parameters_filename = str(self) + '.yaml'
    parameters_path = PIPES_CACHE_RESOURCES_PATH / parameters_filename

    from meerschaum.utils.yaml import yaml

    edit_text = f"Edit the parameters for {self}"
    edit_top = '#' * (len(edit_text) + 4)
    edit_header = edit_top + f'\n# {edit_text} #\n' + edit_top + '\n\n'

    from meerschaum.config import get_config
    parameters = dict(get_config('pipes', 'parameters', patch=True))
    from meerschaum.config._patch import apply_patch_to_config
    parameters = apply_patch_to_config(parameters, self.parameters)

    ### Write the parameters to the YAML file.
    with open(parameters_path, 'w+') as f:
        f.write(edit_header)
        yaml.dump(parameters, stream=f, sort_keys=False)

    ### Only quit editing once the YAML is valid.
    editing = True
    while editing:
        edit_file(parameters_path)
        try:
            with open(parameters_path, 'r') as f:
                file_parameters = yaml.load(f.read())
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(f"Invalid format defined for '{self}':\n\n{e}")
            input(f"Press [Enter] to correct the configuration for '{self}': ")
        else:
            editing = False

    self.parameters = file_parameters

    if debug:
        from meerschaum.utils.formatting import pprint
        pprint(self.parameters)

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
Edit a Pipe's configuration.

Parameters
- patch (bool, default False): If `patch` is `True`, update parameters by cascading rather than overwriting.
- interactive (bool, default False): If `True`, open an editor for the user to make changes to the pipe's YAML file.
- debug (bool, default False): Verbosity toggle.

Returns
- A `SuccessTuple` of success, message.
def edit_definition(
    self,
    yes: bool = False,
    noask: bool = False,
    force: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit a pipe's definition file and update its configuration.
    **NOTE:** This function is interactive and should not be used in automated scripts!

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    if self.temporary:
        return False, "Cannot edit pipes created with `temporary=True` (read-only)."

    from meerschaum.connectors import instance_types
    if (self.connector is None) or self.connector.type not in instance_types:
        return self.edit(interactive=True, debug=debug, **kw)

    import json
    from meerschaum.utils.warnings import info, warn
    from meerschaum.utils.debug import dprint
    from meerschaum.config._patch import apply_patch_to_config
    from meerschaum.utils.misc import edit_file

    _parameters = self.parameters
    if 'fetch' not in _parameters:
        _parameters['fetch'] = {}

    def _edit_api():
        from meerschaum.utils.prompt import prompt, yes_no
        info(
            f"Please enter the keys of the source pipe from '{self.connector}'.\n" +
            "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip."
        )

        _keys = {'connector_keys': None, 'metric_key': None, 'location_key': None}
        for k in _keys:
            _keys[k] = _parameters['fetch'].get(k, None)

        for k, v in _keys.items():
            try:
                _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v)
            except KeyboardInterrupt:
                continue
            if _keys[k] in ('', 'None', '\'None\'', '[None]'):
                _keys[k] = None

        _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys)

        info("You may optionally specify additional filter parameters as JSON.")
        print("    Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.")
        print("    For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':")
        print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': ')))
        if force or yes_no(
            "Would you like to add additional filter parameters?",
            yes=yes, noask=noask
        ):
            from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
            definition_filename = str(self) + '.json'
            definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
            try:
                definition_path.touch()
                with open(definition_path, 'w+') as f:
                    json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2)
            except Exception as e:
                return False, f"Failed writing file '{definition_path}':\n" + str(e)

            _params = None
            while True:
                edit_file(definition_path)
                try:
                    with open(definition_path, 'r') as f:
                        _params = json.load(f)
                except Exception as e:
                    warn(f'Failed to read parameters JSON:\n{e}', stack=False)
                    if force or yes_no(
                        "Would you like to try again?\n  "
                        + "If not, the parameters JSON file will be ignored.",
                        noask=noask, yes=yes
                    ):
                        continue
                    _params = None
                break
            if _params is not None:
                if 'fetch' not in _parameters:
                    _parameters['fetch'] = {}
                _parameters['fetch']['params'] = _params

        self.parameters = _parameters
        return True, "Success"

    def _edit_sql():
        import pathlib, os, textwrap
        from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
        from meerschaum.utils.misc import edit_file
        definition_filename = str(self) + '.sql'
        definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename

        sql_definition = _parameters['fetch'].get('definition', None)
        if sql_definition is None:
            sql_definition = ''
        sql_definition = textwrap.dedent(sql_definition).lstrip()

        try:
            definition_path.touch()
            with open(definition_path, 'w+') as f:
                f.write(sql_definition)
        except Exception as e:
            return False, f"Failed writing file '{definition_path}':\n" + str(e)

        edit_file(definition_path)
        try:
            with open(definition_path, 'r') as f:
                file_definition = f.read()
        except Exception as e:
            return False, f"Failed reading file '{definition_path}':\n" + str(e)

        if sql_definition == file_definition:
            return False, f"No changes made to definition for {self}."

        if ' ' not in file_definition:
            return False, f"Invalid SQL definition for {self}."

        if debug:
            dprint("Read SQL definition:\n\n" + file_definition)
        _parameters['fetch']['definition'] = file_definition
        self.parameters = _parameters
        return True, "Success"

    locals()['_edit_' + str(self.connector.type)]()
    return self.edit(interactive=False, debug=debug, **kw)
Edit a pipe's definition file and update its configuration.
NOTE: This function is interactive and should not be used in automated scripts!

Returns
- A `SuccessTuple` of success, message.
def update(self, *args, **kw) -> SuccessTuple:
    """
    Update a pipe's parameters in its instance.
    """
    kw['interactive'] = False
    return self.edit(*args, **kw)
Update a pipe's parameters in its instance.
def sync(
    self,
    df: Union[
        pd.DataFrame,
        Dict[str, List[Any]],
        List[Dict[str, Any]],
        InferFetch
    ] = InferFetch,
    begin: Union[datetime, int, str, None] = '',
    end: Union[datetime, int, None] = None,
    force: bool = False,
    retries: int = 10,
    min_seconds: int = 1,
    check_existing: bool = True,
    blocking: bool = True,
    workers: Optional[int] = None,
    callback: Optional[Callable[[Tuple[bool, str]], Any]] = None,
    error_callback: Optional[Callable[[Exception], Any]] = None,
    chunksize: Optional[int] = -1,
    sync_chunks: bool = True,
    debug: bool = False,
    _inplace: bool = True,
    **kw: Any
) -> SuccessTuple:
    """
    Fetch new data from the source and update the pipe's table with new data.

    Get new remote data via fetch, get existing data in the same time period,
    and merge the two, only keeping the unseen data.

    Parameters
    ----------
    df: Union[None, pd.DataFrame, Dict[str, List[Any]]], default None
        An optional DataFrame to sync into the pipe. Defaults to `None`.

    begin: Union[datetime, int, str, None], default ''
        Optionally specify the earliest datetime to search for data.

    end: Union[datetime, int, str, None], default None
        Optionally specify the latest datetime to search for data.

    force: bool, default False
        If `True`, keep trying to sync until `retries` attempts.

    retries: int, default 10
        If `force`, how many attempts to try syncing before declaring failure.

    min_seconds: Union[int, float], default 1
        If `force`, how many seconds to sleep between retries. Defaults to `1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe.

    blocking: bool, default True
        If `True`, wait for the sync to finish and return its result;
        otherwise sync asynchronously and return success. Defaults to `True`.
        Only intended for specific scenarios.

    workers: Optional[int], default None
        If provided and the instance connector is thread-safe
        (`pipe.instance_connector.IS_THREAD_SAFE is True`),
        limit concurrent sync to this many threads.

    callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
        Callback function which expects a SuccessTuple as input.
        Only applies when `blocking=False`.

    error_callback: Optional[Callable[[Exception], Any]], default None
        Callback function which expects an Exception as input.
        Only applies when `blocking=False`.

    chunksize: int, default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.

    sync_chunks: bool, default True
        If possible, sync chunks while fetching them into memory.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    from meerschaum.utils.debug import dprint, _checkpoint
    from meerschaum.connectors import custom_types
    from meerschaum.plugins import Plugin
    from meerschaum.utils.formatting import get_console
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.misc import df_is_chunk_generator, filter_keywords, filter_arguments
    from meerschaum.utils.pool import get_pool
    from meerschaum.config import get_config

    if (callback is not None or error_callback is not None) and blocking:
        warn("Callback functions are only executed when blocking = False. Ignoring...")

    _checkpoint(_total=2, **kw)

    if chunksize == 0:
        chunksize = None
        sync_chunks = False

    kw.update({
        'begin': begin,
        'end': end,
        'force': force,
        'retries': retries,
        'min_seconds': min_seconds,
        'check_existing': check_existing,
        'blocking': blocking,
        'workers': workers,
        'callback': callback,
        'error_callback': error_callback,
        'sync_chunks': sync_chunks,
        'chunksize': chunksize,
    })

    ### NOTE: Invalidate the `_exists` cache before and after syncing.
    self._exists = None

    def _sync(
        p: 'meerschaum.Pipe',
        df: Union[
            'pd.DataFrame',
            Dict[str, List[Any]],
            List[Dict[str, Any]],
            InferFetch
        ] = InferFetch,
    ) -> SuccessTuple:
        if df is None:
            p._exists = None
            return (
                False,
                f"You passed `None` instead of data into `sync()` for {p}.\n"
                + "Omit the DataFrame to infer fetching.",
            )
        ### Ensure that the Pipe is registered.
        if not p.temporary and p.get_id(debug=debug) is None:
            ### NOTE: This may trigger an interactive session for plugins!
            register_success, register_msg = p.register(debug=debug)
            if not register_success:
                if 'already' not in register_msg:
                    p._exists = None
                    return register_success, register_msg

        ### If the connector is a plugin with a `sync()` method, return that instead.
        ### If the plugin does not have a `sync()` method but does have a `fetch()` method,
        ### use that instead.
        ### NOTE: The DataFrame must be omitted for the plugin sync method to apply.
        ### If a DataFrame is provided, continue as expected.
        if hasattr(df, 'MRSM_INFER_FETCH'):
            try:
                if p.connector is None:
                    if ':' not in p.connector_keys:
                        return True, f"{p} does not support fetching; nothing to do."

                    msg = f"{p} does not have a valid connector."
                    if p.connector_keys.startswith('plugin:'):
                        msg += f"\n    Perhaps {p.connector_keys} has a syntax error?"
                    p._exists = None
                    return False, msg
            except Exception:
                p._exists = None
                return False, f"Unable to create the connector for {p}."

            ### Sync in place if this is a SQL pipe.
            if (
                str(self.connector) == str(self.instance_connector)
                and
                hasattr(self.instance_connector, 'sync_pipe_inplace')
                and
                _inplace
                and
                get_config('system', 'experimental', 'inplace_sync')
            ):
                with Venv(get_connector_plugin(self.instance_connector)):
                    p._exists = None
                    _args, _kwargs = filter_arguments(
                        p.instance_connector.sync_pipe_inplace,
                        p,
                        debug=debug,
                        **kw
                    )
                    return self.instance_connector.sync_pipe_inplace(
                        *_args,
                        **_kwargs
                    )

            ### Activate and invoke `sync(pipe)` for plugin connectors with `sync` methods.
            try:
                if getattr(p.connector, 'sync', None) is not None:
                    with Venv(get_connector_plugin(p.connector), debug=debug):
                        _args, _kwargs = filter_arguments(
                            p.connector.sync,
                            p,
                            debug=debug,
                            **kw
                        )
                        return_tuple = p.connector.sync(*_args, **_kwargs)
                    p._exists = None
                    if not isinstance(return_tuple, tuple):
                        return_tuple = (
                            False,
                            f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}"
                        )
                    return return_tuple

            except Exception as e:
                get_console().print_exception()
                msg = f"Failed to sync {p} with exception: '" + str(e) + "'"
                if debug:
                    error(msg, silent=False)
                p._exists = None
                return False, msg

            ### Fetch the dataframe from the connector's `fetch()` method.
            try:
                with Venv(get_connector_plugin(p.connector), debug=debug):
                    df = p.fetch(
                        **filter_keywords(
                            p.fetch,
                            debug=debug,
                            **kw
                        )
                    )
            except Exception as e:
                get_console().print_exception(
                    suppress=[
                        'meerschaum/core/Pipe/_sync.py',
                        'meerschaum/core/Pipe/_fetch.py',
                    ]
                )
                msg = f"Failed to fetch data from {p.connector}:\n    {e}"
                df = None

            if df is None:
                p._exists = None
                return False, f"No data were fetched for {p}."

        if isinstance(df, list):
            if len(df) == 0:
                return True, f"No new rows were returned for {p}."

            ### May be a chunk hook results list.
            if isinstance(df[0], tuple):
                success = all([_success for _success, _ in df])
                message = '\n'.join([_message for _, _message in df])
                return success, message

        ### TODO: Deprecate async?
        if df is True:
            p._exists = None
            return True, f"{p} is being synced in parallel."

        ### CHECKPOINT: Retrieved the DataFrame.
        _checkpoint(**kw)

        ### Allow for dataframe generators or iterables.
        if df_is_chunk_generator(df):
            kw['workers'] = p.get_num_workers(kw.get('workers', None))
            dt_col = p.columns.get('datetime', None)
            pool = get_pool(workers=kw.get('workers', 1))
            if debug:
                dprint(f"Received {type(df)}. Attempting to sync first chunk...")

            try:
                chunk = next(df)
            except StopIteration:
                return True, "Received an empty generator; nothing to do."

            chunk_success, chunk_msg = _sync(p, chunk)
            chunk_msg = '\n' + self._get_chunk_label(chunk, dt_col) + '\n' + chunk_msg
            if not chunk_success:
                return chunk_success, f"Unable to sync initial chunk for {p}:\n{chunk_msg}"
            if debug:
                dprint("Successfully synced the first chunk, attempting the rest...")

            failed_chunks = []
            def _process_chunk(_chunk):
                try:
                    _chunk_success, _chunk_msg = _sync(p, _chunk)
                except Exception as e:
                    _chunk_success, _chunk_msg = False, str(e)
                if not _chunk_success:
                    failed_chunks.append(_chunk)
                return (
                    _chunk_success,
                    (
                        '\n'
                        + self._get_chunk_label(_chunk, dt_col)
                        + '\n'
                        + _chunk_msg
                    )
                )

            results = sorted(
                [(chunk_success, chunk_msg)] + (
                    list(pool.imap(_process_chunk, df))
                    if not df_is_chunk_generator(chunk)
                    else [
                        _process_chunk(_child_chunks)
                        for _child_chunks in df
                    ]
                )
            )
            chunk_messages = [chunk_msg for _, chunk_msg in results]
            success_bools = [chunk_success for chunk_success, _ in results]
            success = all(success_bools)
            msg = '\n'.join(chunk_messages)

            ### If some chunks succeeded, retry the failures.
            retry_success = True
            if not success and any(success_bools):
                if debug:
                    dprint("Retrying failed chunks...")
                chunks_to_retry = [c for c in failed_chunks]
                failed_chunks = []
                for chunk in chunks_to_retry:
                    chunk_success, chunk_msg = _process_chunk(chunk)
                    msg += f"\n\nRetried chunk:\n{chunk_msg}\n"
                    retry_success = retry_success and chunk_success

            success = success and retry_success
            return success, msg

        ### Cast to a dataframe and ensure datatypes are what we expect.
        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)

        ### Capture `numeric` and `json` columns.
        self._persist_new_json_columns(df, debug=debug)
        self._persist_new_numeric_columns(df, debug=debug)

        if debug:
            dprint(
                "DataFrame to sync:\n"
                + (
                    str(df)[:255]
                    + '...'
                    if len(str(df)) >= 256
                    else str(df)
                ),
                **kw
            )

        ### If `force`, continue to sync until success.
        return_tuple = False, f"Did not sync {p}."
        run = True
        _retries = 1
        while run:
            with Venv(get_connector_plugin(self.instance_connector)):
                return_tuple = p.instance_connector.sync_pipe(
                    pipe=p,
                    df=df,
                    debug=debug,
                    **kw
                )
            _retries += 1
            run = (not return_tuple[0]) and force and _retries <= retries
            if run and debug:
                dprint(f"Syncing failed for {p}. Attempt ( {_retries} / {retries} )", **kw)
                dprint(f"Sleeping for {min_seconds} seconds...", **kw)
                time.sleep(min_seconds)
            if _retries > retries:
                warn(
                    f"Unable to sync {p} within {retries} attempt" +
                    ("s" if retries != 1 else "") + "!"
                )

        ### CHECKPOINT: Finished syncing. Handle caching.
        _checkpoint(**kw)
        if self.cache_pipe is not None:
            if debug:
                dprint("Caching retrieved dataframe.", **kw)
            _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync local cache for {self}.")

        self._exists = None
        return return_tuple

    if blocking:
        self._exists = None
        return _sync(self, df=df)

    from meerschaum.utils.threading import Thread
    def default_callback(result_tuple: SuccessTuple):
        dprint(f"Asynchronous result from {self}: {result_tuple}", **kw)

    def default_error_callback(x: Exception):
        dprint(f"Error received for {self}: {x}", **kw)

    if callback is None and debug:
        callback = default_callback
    if error_callback is None and debug:
        error_callback = default_error_callback
    try:
        thread = Thread(
            target=_sync,
            args=(self,),
            kwargs={'df': df},
            daemon=False,
            callback=callback,
            error_callback=error_callback,
        )
        thread.start()
    except Exception as e:
        self._exists = None
        return False, str(e)

    self._exists = None
    return True, f"Spawned asynchronous sync for {self}."
Fetch new data from the source and update the pipe's table with new data.

Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.

Parameters
- df (Union[None, pd.DataFrame, Dict[str, List[Any]]], default None): An optional DataFrame to sync into the pipe. Defaults to `None`.
- begin (Union[datetime, int, str, None], default ''): Optionally specify the earliest datetime to search for data.
- end (Union[datetime, int, str, None], default None): Optionally specify the latest datetime to search for data.
- force (bool, default False): If `True`, keep trying to sync until `retries` attempts.
- retries (int, default 10): If `force`, how many attempts to try syncing before declaring failure.
- min_seconds (Union[int, float], default 1): If `force`, how many seconds to sleep between retries. Defaults to `1`.
- check_existing (bool, default True): If `True`, pull and diff with existing data from the pipe.
- blocking (bool, default True): If `True`, wait for the sync to finish and return its result; otherwise sync asynchronously and return success. Defaults to `True`. Only intended for specific scenarios.
- workers (Optional[int], default None): If provided and the instance connector is thread-safe (`pipe.instance_connector.IS_THREAD_SAFE is True`), limit concurrent sync to this many threads.
- callback (Optional[Callable[[Tuple[bool, str]], Any]], default None): Callback function which expects a SuccessTuple as input. Only applies when `blocking=False`.
- error_callback (Optional[Callable[[Exception], Any]], default None): Callback function which expects an Exception as input. Only applies when `blocking=False`.
- chunksize (int, default -1): Specify the number of rows to sync per chunk. If `-1`, resort to system configuration (default is `900`). A `chunksize` of `None` will sync all rows in one transaction.
- sync_chunks (bool, default True): If possible, sync chunks while fetching them into memory.
- debug (bool, default False): Verbosity toggle.

Returns
- A `SuccessTuple` of success (`bool`) and message (`str`).
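The `force`/`retries`/`min_seconds` behavior amounts to a bounded retry loop around a single sync attempt. A standalone sketch of that loop; the `sync_once` callable below is a hypothetical stand-in for the instance connector's sync:

```python
import time

def sync_with_retries(sync_once, force=False, retries=10, min_seconds=1):
    # Try once; if `force` is set, keep retrying until success
    # or until `retries` attempts have been made.
    attempt = 0
    success, msg = False, "Did not sync."
    run = True
    while run:
        success, msg = sync_once()
        attempt += 1
        run = (not success) and force and attempt < retries
        if run:
            time.sleep(min_seconds)
    return success, msg

outcomes = iter([(False, "fail"), (False, "fail"), (True, "Success")])
print(sync_with_retries(lambda: next(outcomes), force=True, retries=10, min_seconds=0))
# (True, 'Success')
```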
def get_sync_time(
    self,
    params: Optional[Dict[str, Any]] = None,
    newest: bool = True,
    apply_backtrack_interval: bool = False,
    round_down: bool = False,
    debug: bool = False
) -> Union['datetime', None]:
    """
    Get the most recent datetime value for a Pipe.

    Parameters
    ----------
    params: Optional[Dict[str, Any]], default None
        Dictionary to build a WHERE clause for a specific column.
        See `meerschaum.utils.sql.build_where`.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (`ASC` instead of `DESC`).

    apply_backtrack_interval: bool, default False
        If `True`, subtract the backtrack interval from the sync time.

    round_down: bool, default False
        If `True`, round down the datetime value to the nearest minute.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `datetime` object if the pipe exists, otherwise `None`.
    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.misc import round_time

    with Venv(get_connector_plugin(self.instance_connector)):
        sync_time = self.instance_connector.get_sync_time(
            self,
            params=params,
            newest=newest,
            debug=debug,
        )

    if round_down and isinstance(sync_time, datetime):
        sync_time = round_time(sync_time, timedelta(minutes=1))

    if apply_backtrack_interval and sync_time is not None:
        backtrack_interval = self.get_backtrack_interval(debug=debug)
        try:
            sync_time -= backtrack_interval
        except Exception as e:
            warn(f"Failed to apply backtrack interval:\n{e}")

    return sync_time
Get the most recent datetime value for a Pipe.

Parameters
- params (Optional[Dict[str, Any]], default None): Dictionary to build a WHERE clause for a specific column. See `meerschaum.utils.sql.build_where`.
- newest (bool, default True): If `True`, get the most recent datetime (honoring `params`). If `False`, get the oldest datetime (`ASC` instead of `DESC`).
- apply_backtrack_interval (bool, default False): If `True`, subtract the backtrack interval from the sync time.
- round_down (bool, default False): If `True`, round down the datetime value to the nearest minute.
- debug (bool, default False): Verbosity toggle.

Returns
- A `datetime` object if the pipe exists, otherwise `None`.
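The `round_down` and `apply_backtrack_interval` post-processing can be sketched standalone with plain `datetime` arithmetic. This is an illustrative sketch of the behavior described above (the real implementation uses `meerschaum.utils.misc.round_time`), not the library's code:

```python
from datetime import datetime, timedelta

def adjust_sync_time(sync_time, round_down=False, backtrack_interval=None):
    # Round down to the minute, then subtract the backtrack
    # interval if one was requested.
    if sync_time is None:
        return None
    if round_down:
        sync_time = sync_time.replace(second=0, microsecond=0)
    if backtrack_interval is not None:
        sync_time -= backtrack_interval
    return sync_time

st = datetime(2024, 1, 1, 12, 30, 45)
print(adjust_sync_time(st, round_down=True, backtrack_interval=timedelta(minutes=10)))
# 2024-01-01 12:20:00
```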
def exists(
    self,
    debug: bool = False
) -> bool:
    """
    See if a Pipe's table exists.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `bool` corresponding to whether a pipe's underlying table exists.
    """
    import time
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.config import STATIC_CONFIG
    from meerschaum.utils.debug import dprint
    now = time.perf_counter()
    exists_timeout_seconds = STATIC_CONFIG['pipes']['exists_timeout_seconds']

    _exists = self.__dict__.get('_exists', None)
    if _exists:
        exists_timestamp = self.__dict__.get('_exists_timestamp', None)
        if exists_timestamp is not None:
            delta = now - exists_timestamp
            if delta < exists_timeout_seconds:
                if debug:
                    dprint(f"Returning cached `exists` for {self} ({round(delta, 2)} seconds old).")
                return _exists

    with Venv(get_connector_plugin(self.instance_connector)):
        _exists = self.instance_connector.pipe_exists(pipe=self, debug=debug)

    self.__dict__['_exists'] = _exists
    self.__dict__['_exists_timestamp'] = now
    return _exists
See if a Pipe's table exists.

Parameters
- debug (bool, default False): Verbosity toggle.

Returns
- A `bool` corresponding to whether a pipe's underlying table exists.
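As the source above shows, `exists()` caches a truthy result for a short window before asking the backend again. The pattern is a small time-to-live cache, sketched here standalone (the class and parameter names are illustrative, not the library's internals):

```python
import time

class ExistsCache:
    """Sketch of a short-lived `exists` cache: reuse a truthy result
    until it is older than the timeout, then ask the backend again."""
    def __init__(self, check_backend, timeout_seconds=5.0):
        self.check_backend = check_backend
        self.timeout_seconds = timeout_seconds
        self._exists = None
        self._exists_timestamp = None

    def exists(self):
        now = time.perf_counter()
        if self._exists and self._exists_timestamp is not None:
            if now - self._exists_timestamp < self.timeout_seconds:
                return self._exists  # Serve from the cache.
        self._exists = self.check_backend()
        self._exists_timestamp = now
        return self._exists

calls = []
cache = ExistsCache(lambda: calls.append(1) or True, timeout_seconds=60)
cache.exists()
cache.exists()
print(len(calls))
# 1: the second call was served from the cache
```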
```python
def filter_existing(
    self,
    df: 'pd.DataFrame',
    safe_copy: bool = True,
    date_bound_only: bool = False,
    include_unchanged_columns: bool = False,
    chunksize: Optional[int] = -1,
    debug: bool = False,
    **kw
) -> Tuple['pd.DataFrame', 'pd.DataFrame', 'pd.DataFrame']:
    """
    Inspect a dataframe and filter out rows which already exist in the pipe.

    Parameters
    ----------
    df: 'pd.DataFrame'
        The dataframe to inspect and filter.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    date_bound_only: bool, default False
        If `True`, only use the datetime index to fetch the sample dataframe.

    include_unchanged_columns: bool, default False
        If `True`, include the backtrack columns which haven't changed in the update dataframe.
        This is useful if you can't update individual keys.

    chunksize: Optional[int], default -1
        The `chunksize` used when fetching existing data.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of three pandas DataFrames: unseen, update, and delta.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.misc import round_time
    from meerschaum.utils.dataframe import (
        filter_unseen_df,
        add_missing_cols_to_df,
        get_unhashable_cols,
        get_numeric_cols,
    )
    from meerschaum.utils.dtypes import (
        to_pandas_dtype,
        none_if_null,
    )
    from meerschaum.config import get_config
    pd = import_pandas()
    pandas = attempt_import('pandas')
    if 'dataframe' not in str(type(df)).lower():
        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)
    is_dask = 'dask' in df.__module__
    if is_dask:
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA
    else:
        merge = pd.merge
        NA = pd.NA

    def get_empty_df():
        empty_df = pd.DataFrame([])
        dtypes = dict(df.dtypes) if df is not None else {}
        dtypes.update(self.dtypes)
        pd_dtypes = {
            col: to_pandas_dtype(str(typ))
            for col, typ in dtypes.items()
        }
        return add_missing_cols_to_df(empty_df, pd_dtypes)

    if df is None:
        empty_df = get_empty_df()
        return empty_df, empty_df, empty_df

    if (df.empty if not is_dask else len(df) == 0):
        return df, df, df

    ### begin is the oldest data in the new dataframe
    begin, end = None, None
    dt_col = self.columns.get('datetime', None)
    dt_type = self.dtypes.get(dt_col, 'datetime64[ns]') if dt_col else None
    try:
        min_dt_val = df[dt_col].min(skipna=True) if dt_col else None
        if is_dask and min_dt_val is not None:
            min_dt_val = min_dt_val.compute()
        min_dt = (
            pandas.to_datetime(min_dt_val).to_pydatetime()
            if min_dt_val is not None and 'datetime' in str(dt_type)
            else min_dt_val
        )
    except Exception:
        min_dt = None
    if not ('datetime' in str(type(min_dt))) or str(min_dt) == 'NaT':
        if 'int' not in str(type(min_dt)).lower():
            min_dt = None

    if isinstance(min_dt, datetime):
        begin = (
            round_time(
                min_dt,
                to='down'
            ) - timedelta(minutes=1)
        )
    elif dt_type and 'int' in dt_type.lower():
        begin = min_dt
    elif dt_col is None:
        begin = None

    ### end is the newest data in the new dataframe
    try:
        max_dt_val = df[dt_col].max(skipna=True) if dt_col else None
        if is_dask and max_dt_val is not None:
            max_dt_val = max_dt_val.compute()
        max_dt = (
            pandas.to_datetime(max_dt_val).to_pydatetime()
            if max_dt_val is not None and 'datetime' in str(dt_type)
            else max_dt_val
        )
    except Exception:
        import traceback
        traceback.print_exc()
        max_dt = None

    if ('datetime' not in str(type(max_dt))) or str(min_dt) == 'NaT':
        if 'int' not in str(type(max_dt)).lower():
            max_dt = None

    if isinstance(max_dt, datetime):
        end = (
            round_time(
                max_dt,
                to='down'
            ) + timedelta(minutes=1)
        )
    elif dt_type and 'int' in dt_type.lower():
        end = max_dt + 1

    if max_dt is not None and min_dt is not None and min_dt > max_dt:
        warn("Detected minimum datetime greater than maximum datetime.")

    if begin is not None and end is not None and begin > end:
        if isinstance(begin, datetime):
            begin = end - timedelta(minutes=1)
        ### We might be using integers for the datetime axis.
        else:
            begin = end - 1

    unique_index_vals = {
        col: df[col].unique()
        for col in self.columns
        if col in df.columns and col != dt_col
    } if not date_bound_only else {}
    filter_params_index_limit = get_config('pipes', 'sync', 'filter_params_index_limit')
    _ = kw.pop('params', None)
    params = {
        col: [
            none_if_null(val)
            for val in unique_vals
        ]
        for col, unique_vals in unique_index_vals.items()
        if len(unique_vals) <= filter_params_index_limit
    } if not date_bound_only else {}

    if debug:
        dprint(f"Looking at data between '{begin}' and '{end}':", **kw)

    backtrack_df = self.get_data(
        begin=begin,
        end=end,
        chunksize=chunksize,
        params=params,
        debug=debug,
        **kw
    )
    if backtrack_df is None:
        if debug:
            dprint(f"No backtrack data was found for {self}.")
        return df, get_empty_df(), df

    if debug:
        dprint(f"Existing data for {self}:\n" + str(backtrack_df), **kw)
        dprint(f"Existing dtypes for {self}:\n" + str(backtrack_df.dtypes))

    ### Separate new rows from changed ones.
    on_cols = [
        col for col_key, col in self.columns.items()
        if (
            col
            and
            col_key != 'value'
            and col in backtrack_df.columns
        )
    ]
    self_dtypes = self.dtypes
    on_cols_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in self_dtypes.items()
        if col in on_cols
    }

    ### Detect changes between the old target and new source dataframes.
    delta_df = add_missing_cols_to_df(
        filter_unseen_df(
            backtrack_df,
            df,
            dtypes={
                col: to_pandas_dtype(typ)
                for col, typ in self_dtypes.items()
            },
            safe_copy=safe_copy,
            debug=debug
        ),
        on_cols_dtypes,
    )

    ### Cast dicts or lists to strings so we can merge.
    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)

    def deserializer(x):
        return json.loads(x) if isinstance(x, str) else x

    unhashable_delta_cols = get_unhashable_cols(delta_df)
    unhashable_backtrack_cols = get_unhashable_cols(backtrack_df)
    for col in unhashable_delta_cols:
        delta_df[col] = delta_df[col].apply(serializer)
    for col in unhashable_backtrack_cols:
        backtrack_df[col] = backtrack_df[col].apply(serializer)
    casted_cols = set(unhashable_delta_cols + unhashable_backtrack_cols)

    joined_df = merge(
        delta_df.infer_objects(copy=False).fillna(NA),
        backtrack_df.infer_objects(copy=False).fillna(NA),
        how='left',
        on=on_cols,
        indicator=True,
        suffixes=('', '_old'),
    ) if on_cols else delta_df
    for col in casted_cols:
        if col in joined_df.columns:
            joined_df[col] = joined_df[col].apply(deserializer)
        if col in delta_df.columns:
            delta_df[col] = delta_df[col].apply(deserializer)

    ### Determine which rows are completely new.
    new_rows_mask = (joined_df['_merge'] == 'left_only') if on_cols else None
    cols = list(delta_df.columns)

    unseen_df = (
        joined_df
        .where(new_rows_mask)
        .dropna(how='all')[cols]
        .reset_index(drop=True)
    ) if on_cols else delta_df

    ### Rows that have already been inserted but values have changed.
    update_df = (
        joined_df
        .where(~new_rows_mask)
        .dropna(how='all')[cols]
        .reset_index(drop=True)
    ) if on_cols else get_empty_df()

    if include_unchanged_columns and on_cols:
        unchanged_backtrack_cols = [
            col
            for col in backtrack_df.columns
            if col in on_cols or col not in update_df.columns
        ]
        update_df = merge(
            backtrack_df[unchanged_backtrack_cols],
            update_df,
            how='inner',
            on=on_cols,
        )

    return unseen_df, update_df, delta_df
```
Inspect a dataframe and filter out rows which already exist in the pipe.

Parameters
- df ('pd.DataFrame'): The dataframe to inspect and filter.
- safe_copy (bool, default True): If `True`, create a copy before comparing and modifying the dataframes. Setting to `False` may mutate the DataFrames. See `meerschaum.utils.dataframe.filter_unseen_df`.
- date_bound_only (bool, default False): If `True`, only use the datetime index to fetch the sample dataframe.
- include_unchanged_columns (bool, default False): If `True`, include the backtrack columns which haven't changed in the update dataframe. This is useful if you can't update individual keys.
- chunksize (Optional[int], default -1): The `chunksize` used when fetching existing data.
- debug (bool, default False): Verbosity toggle.

Returns
- A tuple of three pandas DataFrames: unseen, update, and delta.
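The heart of the unseen/update split above is a left merge with `indicator=True` on the index columns: rows marked `left_only` exist only in the incoming delta, so they are new; the rest match existing keys and become updates. A minimal pandas sketch of that one step, with illustrative column names and simplified masking (the real method uses `.where(...).dropna(how='all')`):

```python
import pandas as pd

# `delta_df` plays the role of the filtered incoming rows;
# `backtrack_df` is the slice of existing pipe data for the same interval.
delta_df = pd.DataFrame({'id': [1, 2, 3], 'vl': [10, 20, 30]})
backtrack_df = pd.DataFrame({'id': [2, 3], 'vl': [2, 3]})
on_cols = ['id']

# Left merge with an indicator column to classify each delta row.
joined_df = pd.merge(
    delta_df,
    backtrack_df,
    how='left',
    on=on_cols,
    indicator=True,
    suffixes=('', '_old'),
)
new_rows_mask = joined_df['_merge'] == 'left_only'
cols = list(delta_df.columns)

unseen_df = joined_df[new_rows_mask][cols].reset_index(drop=True)   # brand-new keys
update_df = joined_df[~new_rows_mask][cols].reset_index(drop=True)  # changed existing keys
```

Here `id` 1 lands in `unseen_df`, while `id`s 2 and 3 (present in the backtrack data but with different values) land in `update_df` with their incoming values.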
```python
def get_num_workers(self, workers: Optional[int] = None) -> int:
    """
    Get the number of workers to use for concurrent syncs.

    Parameters
    ----------
    workers: Optional[int], default None
        The number of workers passed via `--workers`.

    Returns
    -------
    The number of workers, capped for safety.
    """
    is_thread_safe = getattr(self.instance_connector, 'IS_THREAD_SAFE', False)
    if not is_thread_safe:
        return 1

    engine_pool_size = (
        self.instance_connector.engine.pool.size()
        if self.instance_connector.type == 'sql'
        else None
    )
    current_num_threads = threading.active_count()
    current_num_connections = (
        self.instance_connector.engine.pool.checkedout()
        if engine_pool_size is not None
        else current_num_threads
    )
    desired_workers = (
        min(workers or engine_pool_size, engine_pool_size)
        if engine_pool_size is not None
        else workers
    )
    if desired_workers is None:
        desired_workers = (multiprocessing.cpu_count() if is_thread_safe else 1)

    return max(
        (desired_workers - current_num_connections),
        1,
    )
```
Get the number of workers to use for concurrent syncs.

Parameters
- workers (Optional[int], default None): The number of workers passed via `--workers`.

Returns
- The number of workers, capped for safety.
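The capping logic boils down to: non-thread-safe connectors get exactly one worker; otherwise the desired count is limited by the SQL engine's pool size and reduced by the connections already checked out, with a floor of 1. A standalone sketch of that arithmetic, with an illustrative signature (the real method reads these numbers from the connector and `threading`, and the CPU-count fallback of 4 here is a stand-in for `multiprocessing.cpu_count()`):

```python
def cap_workers(workers, is_thread_safe, pool_size=None, checked_out=0):
    """Mirror the capping arithmetic of get_num_workers (illustrative helper)."""
    if not is_thread_safe:
        # Non-thread-safe connectors must sync in series.
        return 1
    if pool_size is not None:
        # Never request more workers than the connection pool can serve.
        desired = min(workers or pool_size, pool_size)
    else:
        desired = workers
    if desired is None:
        desired = 4  # stand-in for multiprocessing.cpu_count()
    # Leave room for connections already in use, but keep at least one worker.
    return max(desired - checked_out, 1)
```

For example, asking for 8 workers against a pool of 5 with 2 connections checked out yields 3, and a fully checked-out pool still yields the floor of 1.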
```python
def verify(
    self,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    chunk_interval: Union[timedelta, int, None] = None,
    bounded: Optional[bool] = None,
    deduplicate: bool = False,
    workers: Optional[int] = None,
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Verify the contents of the pipe by resyncing its interval.

    Parameters
    ----------
    begin: Union[datetime, int, None], default None
        If specified, only verify rows greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If specified, only verify rows less than this value.

    chunk_interval: Union[timedelta, int, None], default None
        If provided, use this as the size of the chunk boundaries.
        Default to the value set in `pipe.parameters['chunk_minutes']` (1440).

    bounded: Optional[bool], default None
        If `True`, do not verify older than the oldest sync time or newer than the newest.
        If `False`, verify unbounded syncs outside of the new and old sync times.
        The default behavior (`None`) is to bound only if a bound interval is set
        (e.g. `pipe.parameters['verify']['bound_days']`).

    deduplicate: bool, default False
        If `True`, deduplicate the pipe's table after the verification syncs.

    workers: Optional[int], default None
        If provided, limit the verification to this many threads.
        Use a value of `1` to sync chunks in series.

    debug: bool, default False
        Verbosity toggle.

    kwargs: Any
        All keyword arguments are passed to `pipe.sync()`.

    Returns
    -------
    A SuccessTuple indicating whether the pipe was successfully resynced.
    """
    from meerschaum.utils.pool import get_pool
    from meerschaum.utils.misc import interval_str
    workers = self.get_num_workers(workers)

    ### Skip configured bounding in parameters
    ### if `bounded` is explicitly `False`.
    bound_time = (
        self.get_bound_time(debug=debug)
        if bounded is not False
        else None
    )
    if bounded is None:
        bounded = bound_time is not None

    if bounded and begin is None:
        begin = (
            bound_time
            if bound_time is not None
            else self.get_sync_time(newest=False, debug=debug)
        )
    if bounded and end is None:
        end = self.get_sync_time(newest=True, debug=debug)

    if bounded and end is not None:
        end += (
            timedelta(minutes=1)
            if isinstance(end, datetime)
            else 1
        )

    sync_less_than_begin = not bounded and begin is None
    sync_greater_than_end = not bounded and end is None

    cannot_determine_bounds = not self.exists(debug=debug)

    if cannot_determine_bounds:
        sync_success, sync_msg = self.sync(
            begin=begin,
            end=end,
            params=params,
            workers=workers,
            debug=debug,
            **kwargs
        )
        if not sync_success:
            return sync_success, sync_msg
        if deduplicate:
            return self.deduplicate(
                begin=begin,
                end=end,
                params=params,
                workers=workers,
                debug=debug,
                **kwargs
            )
        return sync_success, sync_msg

    chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug)
    chunk_bounds = self.get_chunk_bounds(
        begin=begin,
        end=end,
        chunk_interval=chunk_interval,
        bounded=bounded,
        debug=debug,
    )

    ### Consider it a success if no chunks need to be verified.
    if not chunk_bounds:
        if deduplicate:
            return self.deduplicate(
                begin=begin,
                end=end,
                params=params,
                workers=workers,
                debug=debug,
                **kwargs
            )
        return True, f"Could not determine chunks between '{begin}' and '{end}'; nothing to do."

    begin_to_print = (
        begin
        if begin is not None
        else (
            chunk_bounds[0][0]
            if bounded
            else chunk_bounds[0][1]
        )
    )
    end_to_print = (
        end
        if end is not None
        else (
            chunk_bounds[-1][1]
            if bounded
            else chunk_bounds[-1][0]
        )
    )

    info(
        f"Syncing {len(chunk_bounds)} chunk" + ('s' if len(chunk_bounds) != 1 else '')
        + f" ({'un' if not bounded else ''}bounded)"
        + f" of size '{interval_str(chunk_interval)}'"
        + f" between '{begin_to_print}' and '{end_to_print}'."
    )

    pool = get_pool(workers=workers)

    ### Dictionary of the form bounds -> success_tuple, e.g.:
    ### {
    ###     (2023-01-01, 2023-01-02): (True, "Success")
    ### }
    bounds_success_tuples = {}
    def process_chunk_bounds(
        chunk_begin_and_end: Tuple[
            Union[int, datetime],
            Union[int, datetime]
        ]
    ):
        if chunk_begin_and_end in bounds_success_tuples:
            return chunk_begin_and_end, bounds_success_tuples[chunk_begin_and_end]

        chunk_begin, chunk_end = chunk_begin_and_end
        return
```
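Verification walks the interval in fixed-size chunks (via `get_chunk_bounds`) and syncs each chunk, optionally across a worker pool. The chunking itself can be sketched as follows; `make_chunk_bounds` is a hypothetical helper for illustration, not the meerschaum implementation:

```python
from datetime import datetime, timedelta

def make_chunk_bounds(begin, end, chunk_interval):
    """Split [begin, end) into consecutive (chunk_begin, chunk_end) pairs
    of at most chunk_interval each (illustrative sketch)."""
    bounds = []
    chunk_begin = begin
    while chunk_begin < end:
        # The final chunk is clipped to `end` so bounds never overshoot.
        chunk_end = min(chunk_begin + chunk_interval, end)
        bounds.append((chunk_begin, chunk_end))
        chunk_begin = chunk_end
    return bounds

bounds = make_chunk_bounds(
    datetime(2024, 1, 1),
    datetime(2024, 1, 4),
    timedelta(days=1),
)
```

Each resulting pair would then be resynced independently, which is what makes bounded verification parallelizable across workers.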