meerschaum
Meerschaum Python API
Welcome to the Meerschaum Python API technical documentation! Here you can find information about the classes and functions provided by the meerschaum
package. Visit meerschaum.io for general usage documentation.
Root Module
For your convenience, the following classes and functions may be imported from the root meerschaum
namespace:
Classes
Examples
Build a Connector
Get existing connectors or build a new one in-memory with the meerschaum.get_connector()
factory function:
import meerschaum as mrsm
sql_conn = mrsm.get_connector(
'sql:temp',
flavor='sqlite',
database='/tmp/tmp.db',
)
df = sql_conn.read("SELECT 1 AS foo")
print(df)
# foo
# 0 1
sql_conn.to_sql(df, 'foo')
print(sql_conn.read('foo'))
# foo
# 0 1
Create a Custom Connector Class
Decorate your connector classes with meerschaum.make_connector()
to designate it as a custom connector:
from datetime import datetime, timezone
from random import randint
import meerschaum as mrsm
from meerschaum.utils.misc import round_time
@mrsm.make_connector
class FooConnector(mrsm.Connector):
REQUIRED_ATTRIBUTES = ['username', 'password']
def fetch(
self,
begin: datetime | None = None,
end: datetime | None = None,
):
now = begin or round_time(datetime.now(timezone.utc))
return [
{'ts': now, 'id': 1, 'vl': randint(1, 100)},
{'ts': now, 'id': 2, 'vl': randint(1, 100)},
{'ts': now, 'id': 3, 'vl': randint(1, 100)},
]
foo_conn = mrsm.get_connector(
'foo:bar',
username='foo',
password='bar',
)
docs = foo_conn.fetch()
Build a Pipe
Build a meerschaum.Pipe
in-memory:
from datetime import datetime
import meerschaum as mrsm
pipe = mrsm.Pipe(
foo_conn, 'demo',
instance=sql_conn,
columns={'datetime': 'ts', 'id': 'id'},
tags=['production'],
)
pipe.sync(begin=datetime(2024, 1, 1))
df = pipe.get_data()
print(df)
# ts id vl
# 0 2024-01-01 1 97
# 1 2024-01-01 2 18
# 2 2024-01-01 3 96
Add temporary=True
to skip registering the pipe in the pipes table.
Get Registered Pipes
The meerschaum.get_pipes()
function returns a dictionary hierarchy of pipes by connector, metric, and location:
import meerschaum as mrsm
pipes = mrsm.get_pipes(instance='sql:temp')
pipe = pipes['foo:bar']['demo'][None]
Add as_list=True
to flatten the hierarchy:
import meerschaum as mrsm
pipes = mrsm.get_pipes(
tags=['production'],
instance=sql_conn,
as_list=True,
)
print(pipes)
# [Pipe('foo:bar', 'demo', instance='sql:temp')]
Import Plugins
You can import a plugin's module through meerschaum.Plugin.module
:
import meerschaum as mrsm
plugin = mrsm.Plugin('noaa')
with mrsm.Venv(plugin):
noaa = plugin.module
If your plugin has submodules, use meerschaum.plugins.from_plugin_import
:
from meerschaum.plugins import from_plugin_import
get_defined_pipes = from_plugin_import('compose.utils.pipes', 'get_defined_pipes')
Import multiple plugins with meerschaum.plugins.import_plugins
:
from meerschaum.plugins import import_plugins
noaa, compose = import_plugins('noaa', 'compose')
Create a Job
Create a meerschaum.Job
with name
and sysargs
:
import meerschaum as mrsm
job = mrsm.Job('syncing-engine', 'sync pipes --loop')
success, msg = job.start()
Pass executor_keys
as the connectors keys of an API instance to create a remote job:
import meerschaum as mrsm
job = mrsm.Job(
'foo',
'sync pipes -s daily',
executor_keys='api:main',
)
Import from a Virtual Environment
Use the meerschaum.Venv
context manager to activate a virtual environment:
import meerschaum as mrsm
with mrsm.Venv('noaa'):
import requests
print(requests.__file__)
# /home/bmeares/.config/meerschaum/venvs/noaa/lib/python3.12/site-packages/requests/__init__.py
To import packages which may not be installed, use meerschaum.attempt_import()
:
import meerschaum as mrsm
requests = mrsm.attempt_import('requests', venv='noaa')
print(requests.__file__)
# /home/bmeares/.config/meerschaum/venvs/noaa/lib/python3.12/site-packages/requests/__init__.py
Run Actions
Run sysargs
with meerschaum.entry()
:
import meerschaum as mrsm
success, msg = mrsm.entry('show pipes + show version : x2')
Use meerschaum.actions.get_action()
to access an action function directly:
from meerschaum.actions import get_action
show_pipes = get_action(['show', 'pipes'])
success, msg = show_pipes(connector_keys=['plugin:noaa'])
Get a dictionary of available subactions with meerschaum.actions.get_subactions()
:
from meerschaum.actions import get_subactions
subactions = get_subactions('show')
success, msg = subactions['pipes']()
Create a Plugin
Run bootstrap plugin
to create a new plugin:
mrsm bootstrap plugin example
This will create example.py
in your plugins directory (default ~/.config/meerschaum/plugins/
, Windows: %APPDATA%\Meerschaum\plugins
). You may paste the example code from the "Create a Custom Action" example below.
Open your plugin with edit plugin
:
mrsm edit plugin example
Run edit plugin
and paste the example code below to try out the features.
See the writing plugins guide for more in-depth documentation.
Create a Custom Action
Decorate a function with meerschaum.actions.make_action
to designate it as an action. Subactions will be automatically detected if not decorated:
from meerschaum.actions import make_action
@make_action
def sing():
print('What would you like me to sing?')
return True, "Success"
def sing_tune():
return False, "I don't know that song!"
def sing_song():
print('Hello, World!')
return True, "Success"
Use meerschaum.plugins.add_plugin_argument()
to create new parameters for your action:
from meerschaum.plugins import make_action, add_plugin_argument
add_plugin_argument(
'--song', type=str, help='What song to sing.',
)
@make_action
def sing_melody(action=None, song=None):
to_sing = action[0] if action else song
if not to_sing:
return False, "Please tell me what to sing!"
return True, f'~I am singing {to_sing}~'
mrsm sing melody lalala
mrsm sing melody --song do-re-mi
Add a Page to the Web Dashboard
Use the decorators meerschaum.plugins.dash_plugin()
and meerschaum.plugins.web_page()
to add new pages to the web dashboard:
from meerschaum.plugins import dash_plugin, web_page
@dash_plugin
def init_dash(dash_app):
import dash.html as html
import dash_bootstrap_components as dbc
from dash import Input, Output, no_update
### Routes to '/dash/my-page'
@web_page('/my-page', login_required=False)
def my_page():
return dbc.Container([
html.H1("Hello, World!"),
dbc.Button("Click me", id='my-button'),
html.Div(id="my-output-div"),
])
@dash_app.callback(
Output('my-output-div', 'children'),
Input('my-button', 'n_clicks'),
)
def my_button_click(n_clicks):
if not n_clicks:
return no_update
return html.P(f'You clicked {n_clicks} times!')
Submodules
meerschaum.actions
Access functions for actions and subactions.
meerschaum.actions.actions
meerschaum.actions.get_action()
meerschaum.actions.get_completer()
meerschaum.actions.get_main_action_name()
meerschaum.actions.get_subactions()
meerschaum.config
Read and write the Meerschaum configuration registry.
meerschaum.config.get_config()
meerschaum.config.get_plugin_config()
meerschaum.config.write_config()
meerschaum.config.write_plugin_config()
meerschaum.connectors
Build connectors to interact with databases and fetch data.
meerschaum.connectors.get_connector()
meerschaum.connectors.make_connector()
meerschaum.connectors.is_connected()
meerschaum.connectors.poll.retry_connect()
meerschaum.connectors.Connector
meerschaum.connectors.sql.SQLConnector
meerschaum.connectors.api.APIConnector
meerschaum.connectors.valkey.ValkeyConnector
meerschaum.jobs
Start background jobs.
meerschaum.jobs.Job
meerschaum.jobs.Executor
meerschaum.jobs.systemd.SystemdExecutor
meerschaum.jobs.get_jobs()
meerschaum.jobs.get_filtered_jobs()
meerschaum.jobs.get_running_jobs()
meerschaum.jobs.get_stopped_jobs()
meerschaum.jobs.get_paused_jobs()
meerschaum.jobs.get_restart_jobs()
meerschaum.jobs.make_executor()
meerschaum.jobs.check_restart_jobs()
meerschaum.jobs.start_check_jobs_thread()
meerschaum.jobs.stop_check_jobs_thread()
meerschaum.plugins
Access plugin modules and other API utilties.
meerschaum.plugins.Plugin
meerschaum.plugins.api_plugin()
meerschaum.plugins.dash_plugin()
meerschaum.plugins.import_plugins()
meerschaum.plugins.reload_plugins()
meerschaum.plugins.get_plugins()
meerschaum.plugins.get_data_plugins()
meerschaum.plugins.add_plugin_argument()
meerschaum.plugins.pre_sync_hook()
meerschaum.plugins.post_sync_hook()
meerschaum.utils
Utility functions are available in several submodules:
meerschaum.utils.daemon.daemon_entry()
meerschaum.utils.daemon.daemon_action()
meerschaum.utils.daemon.get_daemons()
meerschaum.utils.daemon.get_daemon_ids()
meerschaum.utils.daemon.get_running_daemons()
meerschaum.utils.daemon.get_paused_daemons()
meerschaum.utils.daemon.get_stopped_daemons()
meerschaum.utils.daemon.get_filtered_daemons()
meerschaum.utils.daemon.run_daemon()
meerschaum.utils.daemon.Daemon
meerschaum.utils.daemon.FileDescriptorInterceptor
meerschaum.utils.daemon.RotatingFile
meerschaum.utils.daemon
Manage background jobs.
meerschaum.utils.dataframe.add_missing_cols_to_df()
meerschaum.utils.dataframe.df_is_chunk_generator()
meerschaum.utils.dataframe.enforce_dtypes()
meerschaum.utils.dataframe.filter_unseen_df()
meerschaum.utils.dataframe.get_datetime_bound_from_df()
meerschaum.utils.dataframe.get_first_valid_dask_partition()
meerschaum.utils.dataframe.get_json_cols()
meerschaum.utils.dataframe.get_numeric_cols()
meerschaum.utils.dataframe.get_unhashable_cols()
meerschaum.utils.dataframe.parse_df_datetimes()
meerschaum.utils.dataframe.query_df()
meerschaum.utils.dataframe.to_json()
meerschaum.utils.dataframe
Manipulate dataframes.
meerschaum.utils.dtypes.are_dtypes_equal()
meerschaum.utils.dtypes.attempt_cast_to_numeric()
meerschaum.utils.dtypes.is_dtype_numeric()
meerschaum.utils.dtypes.none_if_null()
meerschaum.utils.dtypes.quantize_decimal()
meerschaum.utils.dtypes.to_pandas_dtype()
meerschaum.utils.dtypes.value_is_null()
meerschaum.utils.dtypes.sql.get_pd_type_from_db_type()
meerschaum.utils.dtypes.sql.get_db_type_from_pd_type()
meerschaum.utils.dtypes
Work with data types.
meerschaum.utils.formatting.colored()
meerschaum.utils.formatting.extract_stats_from_message()
meerschaum.utils.formatting.fill_ansi()
meerschaum.utils.formatting.get_console()
meerschaum.utils.formatting.highlight_pipes()
meerschaum.utils.formatting.make_header()
meerschaum.utils.formatting.pipe_repr()
meerschaum.utils.formatting.pprint()
meerschaum.utils.formatting.pprint_pipes()
meerschaum.utils.formatting.print_options()
meerschaum.utils.formatting.print_pipes_results()
meerschaum.utils.formatting.print_tuple()
meerschaum.utils.formatting.translate_rich_to_termcolor()
meerschaum.utils.formatting
Format output text.
meerschaum.utils.misc.items_str()
meerschaum.utils.misc.round_time()
meerschaum.utils.misc.is_int()
meerschaum.utils.misc.interval_str()
meerschaum.utils.misc.filter_keywords()
meerschaum.utils.misc.generate_password()
meerschaum.utils.misc.string_to_dict()
meerschaum.utils.misc.iterate_chunks()
meerschaum.utils.misc.timed_input()
meerschaum.utils.misc.replace_pipes_in_dict()
meerschaum.utils.misc.is_valid_email()
meerschaum.utils.misc.string_width()
meerschaum.utils.misc.replace_password()
meerschaum.utils.misc.parse_config_substitution()
meerschaum.utils.misc.edit_file()
meerschaum.utils.misc.get_in_ex_params()
meerschaum.utils.misc.separate_negation_values()
meerschaum.utils.misc.flatten_list()
meerschaum.utils.misc.make_symlink()
meerschaum.utils.misc.is_symlink()
meerschaum.utils.misc.wget()
meerschaum.utils.misc.add_method_to_class()
meerschaum.utils.misc.is_pipe_registered()
meerschaum.utils.misc.get_cols_lines()
meerschaum.utils.misc.sorted_dict()
meerschaum.utils.misc.flatten_pipes_dict()
meerschaum.utils.misc.dict_from_od()
meerschaum.utils.misc.remove_ansi()
meerschaum.utils.misc.get_connector_labels()
meerschaum.utils.misc.json_serialize_datetime()
meerschaum.utils.misc.async_wrap()
meerschaum.utils.misc.is_docker_available()
meerschaum.utils.misc.is_android()
meerschaum.utils.misc.is_bcp_available()
meerschaum.utils.misc.truncate_string_sections()
meerschaum.utils.misc.safely_extract_tar()
meerschaum.utils.misc
Miscellaneous utility functions.
meerschaum.utils.packages.attempt_import()
meerschaum.utils.packages.get_module_path()
meerschaum.utils.packages.manually_import_module()
meerschaum.utils.packages.get_install_no_version()
meerschaum.utils.packages.determine_version()
meerschaum.utils.packages.need_update()
meerschaum.utils.packages.get_pip()
meerschaum.utils.packages.pip_install()
meerschaum.utils.packages.pip_uninstall()
meerschaum.utils.packages.completely_uninstall_package()
meerschaum.utils.packages.run_python_package()
meerschaum.utils.packages.lazy_import()
meerschaum.utils.packages.pandas_name()
meerschaum.utils.packages.import_pandas()
meerschaum.utils.packages.import_rich()
meerschaum.utils.packages.import_dcc()
meerschaum.utils.packages.import_html()
meerschaum.utils.packages.get_modules_from_package()
meerschaum.utils.packages.import_children()
meerschaum.utils.packages.reload_package()
meerschaum.utils.packages.reload_meerschaum()
meerschaum.utils.packages.is_installed()
meerschaum.utils.packages.venv_contains_package()
meerschaum.utils.packages.package_venv()
meerschaum.utils.packages.ensure_readline()
meerschaum.utils.packages.get_prerelease_dependencies()
meerschaum.utils.packages
Manage Python packages.
meerschaum.utils.sql.build_where()
meerschaum.utils.sql.clean()
meerschaum.utils.sql.dateadd_str()
meerschaum.utils.sql.test_connection()
meerschaum.utils.sql.get_distinct_col_count()
meerschaum.utils.sql.sql_item_name()
meerschaum.utils.sql.pg_capital()
meerschaum.utils.sql.oracle_capital()
meerschaum.utils.sql.truncate_item_name()
meerschaum.utils.sql.table_exists()
meerschaum.utils.sql.get_table_cols_types()
meerschaum.utils.sql.get_update_queries()
meerschaum.utils.sql.get_null_replacement()
meerschaum.utils.sql.get_db_version()
meerschaum.utils.sql.get_rename_table_queries()
meerschaum.utils.sql.get_create_table_queries()
meerschaum.utils.sql.wrap_query_with_cte()
meerschaum.utils.sql.format_cte_subquery()
meerschaum.utils.sql.session_execute()
meerschaum.utils.sql.get_reset_autoincrement_queries()
meerschaum.utils.sql
Build SQL queries.
meerschaum.utils.venv.Venv
meerschaum.utils.venv.activate_venv()
meerschaum.utils.venv.deactivate_venv()
meerschaum.utils.venv.get_module_venv()
meerschaum.utils.venv.get_venvs()
meerschaum.utils.venv.init_venv()
meerschaum.utils.venv.inside_venv()
meerschaum.utils.venv.is_venv_active()
meerschaum.utils.venv.venv_exec()
meerschaum.utils.venv.venv_executable()
meerschaum.utils.venv.venv_exists()
meerschaum.utils.venv.venv_target_path()
meerschaum.utils.venv.verify_venv()
meerschaum.utils.venv
Manage virtual environments.
meerschaum.utils.warnings
Print warnings, errors, info, and debug messages.
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Copyright 2023 Bennett Meares 7 8Licensed under the Apache License, Version 2.0 (the "License"); 9you may not use this file except in compliance with the License. 10You may obtain a copy of the License at 11 12 http://www.apache.org/licenses/LICENSE-2.0 13 14Unless required by applicable law or agreed to in writing, software 15distributed under the License is distributed on an "AS IS" BASIS, 16WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 17See the License for the specific language governing permissions and 18limitations under the License. 19""" 20 21import atexit 22from meerschaum.utils.typing import SuccessTuple 23from meerschaum.utils.packages import attempt_import 24from meerschaum.core.Pipe import Pipe 25from meerschaum.plugins import Plugin 26from meerschaum.utils.venv import Venv 27from meerschaum.jobs import Job, make_executor 28from meerschaum.connectors import get_connector, Connector, make_connector 29from meerschaum.utils import get_pipes 30from meerschaum.utils.formatting import pprint 31from meerschaum._internal.docs import index as __doc__ 32from meerschaum.config import __version__, get_config 33from meerschaum._internal.entry import entry 34from meerschaum.__main__ import _close_pools 35 36atexit.register(_close_pools) 37 38__pdoc__ = {'gui': False, 'api': False, 'core': False, '_internal': False} 39__all__ = ( 40 "get_pipes", 41 "get_connector", 42 "get_config", 43 "Pipe", 44 "Plugin", 45 "Venv", 46 "Plugin", 47 "Job", 48 "pprint", 49 "attempt_import", 50 "actions", 51 "config", 52 "connectors", 53 "jobs", 54 "plugins", 55 "utils", 56 "SuccessTuple", 57 "Connector", 58 "make_connector", 59 "entry", 60)
19def get_pipes( 20 connector_keys: Union[str, List[str], None] = None, 21 metric_keys: Union[str, List[str], None] = None, 22 location_keys: Union[str, List[str], None] = None, 23 tags: Optional[List[str]] = None, 24 params: Optional[Dict[str, Any]] = None, 25 mrsm_instance: Union[str, InstanceConnector, None] = None, 26 instance: Union[str, InstanceConnector, None] = None, 27 as_list: bool = False, 28 method: str = 'registered', 29 debug: bool = False, 30 **kw: Any 31) -> Union[PipesDict, List[mrsm.Pipe]]: 32 """ 33 Return a dictionary or list of `meerschaum.Pipe` objects. 34 35 Parameters 36 ---------- 37 connector_keys: Union[str, List[str], None], default None 38 String or list of connector keys. 39 If omitted or is `'*'`, fetch all possible keys. 40 If a string begins with `'_'`, select keys that do NOT match the string. 41 42 metric_keys: Union[str, List[str], None], default None 43 String or list of metric keys. See `connector_keys` for formatting. 44 45 location_keys: Union[str, List[str], None], default None 46 String or list of location keys. See `connector_keys` for formatting. 47 48 tags: Optional[List[str]], default None 49 If provided, only include pipes with these tags. 50 51 params: Optional[Dict[str, Any]], default None 52 Dictionary of additional parameters to search by. 53 Params are parsed into a SQL WHERE clause. 54 E.g. `{'a': 1, 'b': 2}` equates to `'WHERE a = 1 AND b = 2'` 55 56 mrsm_instance: Union[str, InstanceConnector, None], default None 57 Connector keys for the Meerschaum instance of the pipes. 58 Must be a `meerschaum.connectors.sql.SQLConnector.SQLConnector` or 59 `meerschaum.connectors.api.APIConnector.APIConnector`. 60 61 as_list: bool, default False 62 If `True`, return pipes in a list instead of a hierarchical dictionary. 63 `False` : `{connector_keys: {metric_key: {location_key: Pipe}}}` 64 `True` : `[Pipe]` 65 66 method: str, default 'registered' 67 Available options: `['registered', 'explicit', 'all']` 68 If `'registered'` (default), create pipes based on registered keys in the connector's pipes table 69 (API or SQL connector, depends on mrsm_instance). 70 If `'explicit'`, create pipes from provided connector_keys, metric_keys, and location_keys 71 instead of consulting the pipes table. Useful for creating non-existent pipes. 72 If `'all'`, create pipes from predefined metrics and locations. Required `connector_keys`. 73 **NOTE:** Method `'all'` is not implemented! 74 75 **kw: Any: 76 Keyword arguments to pass to the `meerschaum.Pipe` constructor. 77 78 79 Returns 80 ------- 81 A dictionary of dictionaries and `meerschaum.Pipe` objects 82 in the connector, metric, location hierarchy. 83 If `as_list` is `True`, return a list of `meerschaum.Pipe` objects. 84 85 Examples 86 -------- 87 ``` 88 >>> ### Manual definition: 89 >>> pipes = { 90 ... <connector_keys>: { 91 ... <metric_key>: { 92 ... <location_key>: Pipe( 93 ... <connector_keys>, 94 ... <metric_key>, 95 ... <location_key>, 96 ... ), 97 ... }, 98 ... }, 99 ... }, 100 >>> ### Accessing a single pipe: 101 >>> pipes['sql:main']['weather'][None] 102 >>> ### Return a list instead: 103 >>> get_pipes(as_list=True) 104 [sql_main_weather] 105 >>> 106 ``` 107 """ 108 109 from meerschaum.config import get_config 110 from meerschaum.utils.warnings import error 111 from meerschaum.utils.misc import filter_keywords 112 113 if connector_keys is None: 114 connector_keys = [] 115 if metric_keys is None: 116 metric_keys = [] 117 if location_keys is None: 118 location_keys = [] 119 if params is None: 120 params = {} 121 if tags is None: 122 tags = [] 123 124 if isinstance(connector_keys, str): 125 connector_keys = [connector_keys] 126 if isinstance(metric_keys, str): 127 metric_keys = [metric_keys] 128 if isinstance(location_keys, str): 129 location_keys = [location_keys] 130 131 ### Get SQL or API connector (keys come from `connector.fetch_pipes_keys()`). 132 if mrsm_instance is None: 133 mrsm_instance = instance 134 if mrsm_instance is None: 135 mrsm_instance = get_config('meerschaum', 'instance', patch=True) 136 if isinstance(mrsm_instance, str): 137 from meerschaum.connectors.parse import parse_instance_keys 138 connector = parse_instance_keys(keys=mrsm_instance, debug=debug) 139 else: 140 from meerschaum.connectors import instance_types 141 valid_connector = False 142 if hasattr(mrsm_instance, 'type'): 143 if mrsm_instance.type in instance_types: 144 valid_connector = True 145 if not valid_connector: 146 error(f"Invalid instance connector: {mrsm_instance}") 147 connector = mrsm_instance 148 if debug: 149 from meerschaum.utils.debug import dprint 150 dprint(f"Using instance connector: {connector}") 151 if not connector: 152 error(f"Could not create connector from keys: '{mrsm_instance}'") 153 154 ### Get a list of tuples for the keys needed to build pipes. 155 result = fetch_pipes_keys( 156 method, 157 connector, 158 connector_keys = connector_keys, 159 metric_keys = metric_keys, 160 location_keys = location_keys, 161 tags = tags, 162 params = params, 163 debug = debug 164 ) 165 if result is None: 166 error(f"Unable to build pipes!") 167 168 ### Populate the `pipes` dictionary with Pipes based on the keys 169 ### obtained from the chosen `method`. 170 from meerschaum import Pipe 171 pipes = {} 172 for ck, mk, lk in result: 173 if ck not in pipes: 174 pipes[ck] = {} 175 176 if mk not in pipes[ck]: 177 pipes[ck][mk] = {} 178 179 pipes[ck][mk][lk] = Pipe( 180 ck, mk, lk, 181 mrsm_instance = connector, 182 debug = debug, 183 **filter_keywords(Pipe, **kw) 184 ) 185 186 if not as_list: 187 return pipes 188 from meerschaum.utils.misc import flatten_pipes_dict 189 return flatten_pipes_dict(pipes)
Return a dictionary or list of meerschaum.Pipe
objects.
Parameters
- connector_keys (Union[str, List[str], None], default None):
String or list of connector keys.
If omitted or is
'*'
, fetch all possible keys. If a string begins with'_'
, select keys that do NOT match the string. - metric_keys (Union[str, List[str], None], default None):
String or list of metric keys. See
connector_keys
for formatting. - location_keys (Union[str, List[str], None], default None):
String or list of location keys. See
connector_keys
for formatting. - tags (Optional[List[str]], default None): If provided, only include pipes with these tags.
- params (Optional[Dict[str, Any]], default None):
Dictionary of additional parameters to search by.
Params are parsed into a SQL WHERE clause.
E.g.
{'a': 1, 'b': 2}
equates to'WHERE a = 1 AND b = 2'
- mrsm_instance (Union[str, InstanceConnector, None], default None):
Connector keys for the Meerschaum instance of the pipes.
Must be a
meerschaum.connectors.sql.SQLConnector.SQLConnector
ormeerschaum.connectors.api.APIConnector.APIConnector
. - as_list (bool, default False):
If
True
, return pipes in a list instead of a hierarchical dictionary.False
:{connector_keys: {metric_key: {location_key: Pipe}}}
True
:[Pipe]
- method (str, default 'registered'):
Available options:
['registered', 'explicit', 'all']
If'registered'
(default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depends on mrsm_instance). If'explicit'
, create pipes from provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes. If'all'
, create pipes from predefined metrics and locations. Requiredconnector_keys
. NOTE: Method'all'
is not implemented! - **kw (Any:):
Keyword arguments to pass to the
meerschaum.Pipe
constructor.
Returns
- A dictionary of dictionaries and
meerschaum.Pipe
objects - in the connector, metric, location hierarchy.
- If
as_list
isTrue
, return a list ofmeerschaum.Pipe
objects.
Examples
>>> ### Manual definition:
>>> pipes = {
... <connector_keys>: {
... <metric_key>: {
... <location_key>: Pipe(
... <connector_keys>,
... <metric_key>,
... <location_key>,
... ),
... },
... },
... },
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[sql_main_weather]
>>>
80def get_connector( 81 type: str = None, 82 label: str = None, 83 refresh: bool = False, 84 debug: bool = False, 85 **kw: Any 86) -> Connector: 87 """ 88 Return existing connector or create new connection and store for reuse. 89 90 You can create new connectors if enough parameters are provided for the given type and flavor. 91 92 93 Parameters 94 ---------- 95 type: Optional[str], default None 96 Connector type (sql, api, etc.). 97 Defaults to the type of the configured `instance_connector`. 98 99 label: Optional[str], default None 100 Connector label (e.g. main). Defaults to `'main'`. 101 102 refresh: bool, default False 103 Refresh the Connector instance / construct new object. Defaults to `False`. 104 105 kw: Any 106 Other arguments to pass to the Connector constructor. 107 If the Connector has already been constructed and new arguments are provided, 108 `refresh` is set to `True` and the old Connector is replaced. 109 110 Returns 111 ------- 112 A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`, 113 `meerschaum.connectors.sql.SQLConnector`). 114 115 Examples 116 -------- 117 The following parameters would create a new 118 `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file. 119 120 ``` 121 >>> conn = get_connector( 122 ... type = 'sql', 123 ... label = 'newlabel', 124 ... flavor = 'sqlite', 125 ... database = '/file/path/to/database.db' 126 ... ) 127 >>> 128 ``` 129 130 """ 131 from meerschaum.connectors.parse import parse_instance_keys 132 from meerschaum.config import get_config 133 from meerschaum.config.static import STATIC_CONFIG 134 from meerschaum.utils.warnings import warn 135 global _loaded_plugin_connectors 136 if isinstance(type, str) and not label and ':' in type: 137 type, label = type.split(':', maxsplit=1) 138 139 with _locks['_loaded_plugin_connectors']: 140 if not _loaded_plugin_connectors: 141 load_plugin_connectors() 142 _load_builtin_custom_connectors() 143 _loaded_plugin_connectors = True 144 145 if type is None and label is None: 146 default_instance_keys = get_config('meerschaum', 'instance', patch=True) 147 ### recursive call to get_connector 148 return parse_instance_keys(default_instance_keys) 149 150 ### NOTE: the default instance connector may not be main. 151 ### Only fall back to 'main' if the type is provided by the label is omitted. 152 label = label if label is not None else STATIC_CONFIG['connectors']['default_label'] 153 154 ### type might actually be a label. Check if so and raise a warning. 155 if type not in connectors: 156 possibilities, poss_msg = [], "" 157 for _type in get_config('meerschaum', 'connectors'): 158 if type in get_config('meerschaum', 'connectors', _type): 159 possibilities.append(f"{_type}:{type}") 160 if len(possibilities) > 0: 161 poss_msg = " Did you mean" 162 for poss in possibilities[:-1]: 163 poss_msg += f" '{poss}'," 164 if poss_msg.endswith(','): 165 poss_msg = poss_msg[:-1] 166 if len(possibilities) > 1: 167 poss_msg += " or" 168 poss_msg += f" '{possibilities[-1]}'?" 169 170 warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False) 171 return None 172 173 if 'sql' not in types: 174 from meerschaum.connectors.plugin import PluginConnector 175 from meerschaum.connectors.valkey import ValkeyConnector 176 with _locks['types']: 177 types.update({ 178 'api': APIConnector, 179 'sql': SQLConnector, 180 'plugin': PluginConnector, 181 'valkey': ValkeyConnector, 182 }) 183 184 ### determine if we need to call the constructor 185 if not refresh: 186 ### see if any user-supplied arguments differ from the existing instance 187 if label in connectors[type]: 188 warning_message = None 189 for attribute, value in kw.items(): 190 if attribute not in connectors[type][label].meta: 191 import inspect 192 cls = connectors[type][label].__class__ 193 cls_init_signature = inspect.signature(cls) 194 cls_init_params = cls_init_signature.parameters 195 if attribute not in cls_init_params: 196 warning_message = ( 197 f"Received new attribute '{attribute}' not present in connector " + 198 f"{connectors[type][label]}.\n" 199 ) 200 elif connectors[type][label].__dict__[attribute] != value: 201 warning_message = ( 202 f"Mismatched values for attribute '{attribute}' in connector " 203 + f"'{connectors[type][label]}'.\n" + 204 f" - Keyword value: '{value}'\n" + 205 f" - Existing value: '{connectors[type][label].__dict__[attribute]}'\n" 206 ) 207 if warning_message is not None: 208 warning_message += ( 209 "\nSetting `refresh` to True and recreating connector with type:" 210 + f" '{type}' and label '{label}'." 211 ) 212 refresh = True 213 warn(warning_message) 214 else: ### connector doesn't yet exist 215 refresh = True 216 217 ### only create an object if refresh is True 218 ### (can be manually specified, otherwise determined above) 219 if refresh: 220 with _locks['connectors']: 221 try: 222 ### will raise an error if configuration is incorrect / missing 223 conn = types[type](label=label, **kw) 224 connectors[type][label] = conn 225 except InvalidAttributesError as ie: 226 warn( 227 f"Incorrect attributes for connector '{type}:{label}'.\n" 228 + str(ie), 229 stack = False, 230 ) 231 conn = None 232 except Exception as e: 233 from meerschaum.utils.formatting import get_console 234 console = get_console() 235 if console: 236 console.print_exception() 237 warn( 238 f"Exception when creating connector '{type}:{label}'.\n" + str(e), 239 stack = False, 240 ) 241 conn = None 242 if conn is None: 243 return None 244 245 return connectors[type][label]
Return existing connector or create new connection and store for reuse.
You can create new connectors if enough parameters are provided for the given type and flavor.
Parameters
- type (Optional[str], default None):
Connector type (sql, api, etc.).
Defaults to the type of the configured
instance_connector
. - label (Optional[str], default None):
Connector label (e.g. main). Defaults to
'main'
. - refresh (bool, default False):
Refresh the Connector instance / construct new object. Defaults to
False
. - kw (Any):
Other arguments to pass to the Connector constructor.
If the Connector has already been constructed and new arguments are provided,
refresh
is set toTrue
and the old Connector is replaced.
Returns
- A new Meerschaum connector (e.g.
meerschaum.connectors.api.APIConnector
, meerschaum.connectors.sql.SQLConnector
).
Examples
The following parameters would create a new
meerschaum.connectors.sql.SQLConnector
that isn't in the configuration file.
>>> conn = get_connector(
... type = 'sql',
... label = 'newlabel',
... flavor = 'sqlite',
... database = '/file/path/to/database.db'
... )
>>>
82def get_config( 83 *keys: str, 84 patch: bool = True, 85 substitute: bool = True, 86 sync_files: bool = True, 87 write_missing: bool = True, 88 as_tuple: bool = False, 89 warn: bool = True, 90 debug: bool = False 91) -> Any: 92 """ 93 Return the Meerschaum configuration dictionary. 94 If positional arguments are provided, index by the keys. 95 Raises a warning if invalid keys are provided. 96 97 Parameters 98 ---------- 99 keys: str: 100 List of strings to index. 101 102 patch: bool, default True 103 If `True`, patch missing default keys into the config directory. 104 Defaults to `True`. 105 106 sync_files: bool, default True 107 If `True`, sync files if needed. 108 Defaults to `True`. 109 110 write_missing: bool, default True 111 If `True`, write default values when the main config files are missing. 112 Defaults to `True`. 113 114 substitute: bool, default True 115 If `True`, subsitute 'MRSM{}' values. 116 Defaults to `True`. 117 118 as_tuple: bool, default False 119 If `True`, return a tuple of type (success, value). 120 Defaults to `False`. 121 122 Returns 123 ------- 124 The value in the configuration directory, indexed by the provided keys. 125 126 Examples 127 -------- 128 >>> get_config('meerschaum', 'instance') 129 'sql:main' 130 >>> get_config('does', 'not', 'exist') 131 UserWarning: Invalid keys in config: ('does', 'not', 'exist') 132 """ 133 import json 134 135 symlinks_key = STATIC_CONFIG['config']['symlinks_key'] 136 if debug: 137 from meerschaum.utils.debug import dprint 138 dprint(f"Indexing keys: {keys}", color=False) 139 140 if len(keys) == 0: 141 _rc = _config(substitute=substitute, sync_files=sync_files, write_missing=write_missing) 142 if as_tuple: 143 return True, _rc 144 return _rc 145 146 ### Weird threading issues, only import if substitute is True. 147 if substitute: 148 from meerschaum.config._read_config import search_and_substitute_config 149 ### Invalidate the cache if it was read before with substitute=False 150 ### but there still exist substitutions. 151 if ( 152 config is not None and substitute and keys[0] != symlinks_key 153 and 'MRSM{' in json.dumps(config.get(keys[0])) 154 ): 155 try: 156 _subbed = search_and_substitute_config({keys[0]: config[keys[0]]}) 157 except Exception as e: 158 import traceback 159 traceback.print_exc() 160 config[keys[0]] = _subbed[keys[0]] 161 if symlinks_key in _subbed: 162 if symlinks_key not in config: 163 config[symlinks_key] = {} 164 if keys[0] not in config[symlinks_key]: 165 config[symlinks_key][keys[0]] = {} 166 config[symlinks_key][keys[0]] = apply_patch_to_config( 167 _subbed, 168 config[symlinks_key][keys[0]] 169 ) 170 171 from meerschaum.config._sync import sync_files as _sync_files 172 if config is None: 173 _config(*keys, sync_files=sync_files) 174 175 invalid_keys = False 176 if keys[0] not in config and keys[0] != symlinks_key: 177 single_key_config = read_config( 178 keys=[keys[0]], substitute=substitute, write_missing=write_missing 179 ) 180 if keys[0] not in single_key_config: 181 invalid_keys = True 182 else: 183 config[keys[0]] = single_key_config.get(keys[0], None) 184 if symlinks_key in single_key_config and keys[0] in single_key_config[symlinks_key]: 185 if symlinks_key not in config: 186 config[symlinks_key] = {} 187 config[symlinks_key][keys[0]] = single_key_config[symlinks_key][keys[0]] 188 189 if sync_files: 190 _sync_files(keys=[keys[0]]) 191 192 c = config 193 if len(keys) > 0: 194 for k in keys: 195 try: 196 c = c[k] 197 except Exception as e: 198 invalid_keys = True 199 break 200 if invalid_keys: 201 ### Check if the keys are in the default configuration. 202 from meerschaum.config._default import default_config 203 in_default = True 204 patched_default_config = ( 205 search_and_substitute_config(default_config) 206 if substitute else copy.deepcopy(default_config) 207 ) 208 _c = patched_default_config 209 for k in keys: 210 try: 211 _c = _c[k] 212 except Exception as e: 213 in_default = False 214 if in_default: 215 c = _c 216 invalid_keys = False 217 warning_msg = f"Invalid keys in config: {keys}" 218 if not in_default: 219 try: 220 if warn: 221 from meerschaum.utils.warnings import warn as _warn 222 _warn(warning_msg, stacklevel=3, color=False) 223 except Exception as e: 224 if warn: 225 print(warning_msg) 226 if as_tuple: 227 return False, None 228 return None 229 230 ### Don't write keys that we haven't yet loaded into memory. 231 not_loaded_keys = [k for k in patched_default_config if k not in config] 232 for k in not_loaded_keys: 233 patched_default_config.pop(k, None) 234 235 set_config( 236 apply_patch_to_config( 237 patched_default_config, 238 config, 239 ) 240 ) 241 if patch and keys[0] != symlinks_key: 242 if write_missing: 243 write_config(config, debug=debug) 244 245 if as_tuple: 246 return (not invalid_keys), c 247 return c
Return the Meerschaum configuration dictionary. If positional arguments are provided, index by the keys. Raises a warning if invalid keys are provided.
Parameters
- keys (str:): List of strings to index.
- patch (bool, default True):
If
True
, patch missing default keys into the config directory. Defaults toTrue
. - sync_files (bool, default True):
If
True
, sync files if needed. Defaults toTrue
. - write_missing (bool, default True):
If
True
, write default values when the main config files are missing. Defaults toTrue
. - substitute (bool, default True):
If
True
, subsitute 'MRSM{}' values. Defaults toTrue
. - as_tuple (bool, default False):
If
True
, return a tuple of type (success, value). Defaults toFalse
.
Returns
- The value in the configuration directory, indexed by the provided keys.
Examples
>>> get_config('meerschaum', 'instance')
'sql:main'
>>> get_config('does', 'not', 'exist')
UserWarning: Invalid keys in config: ('does', 'not', 'exist')
60class Pipe: 61 """ 62 Access Meerschaum pipes via Pipe objects. 63 64 Pipes are identified by the following: 65 66 1. Connector keys (e.g. `'sql:main'`) 67 2. Metric key (e.g. `'weather'`) 68 3. Location (optional; e.g. `None`) 69 70 A pipe's connector keys correspond to a data source, and when the pipe is synced, 71 its `fetch` definition is evaluated and executed to produce new data. 72 73 Alternatively, new data may be directly synced via `pipe.sync()`: 74 75 ``` 76 >>> from meerschaum import Pipe 77 >>> pipe = Pipe('csv', 'weather') 78 >>> 79 >>> import pandas as pd 80 >>> df = pd.read_csv('weather.csv') 81 >>> pipe.sync(df) 82 ``` 83 """ 84 85 from ._fetch import ( 86 fetch, 87 get_backtrack_interval, 88 ) 89 from ._data import ( 90 get_data, 91 get_backtrack_data, 92 get_rowcount, 93 _get_data_as_iterator, 94 get_chunk_interval, 95 get_chunk_bounds, 96 parse_date_bounds, 97 ) 98 from ._register import register 99 from ._attributes import ( 100 attributes, 101 parameters, 102 columns, 103 indices, 104 indexes, 105 dtypes, 106 autoincrement, 107 upsert, 108 static, 109 tzinfo, 110 enforce, 111 null_indices, 112 get_columns, 113 get_columns_types, 114 get_columns_indices, 115 get_indices, 116 tags, 117 get_id, 118 id, 119 get_val_column, 120 parents, 121 children, 122 target, 123 _target_legacy, 124 guess_datetime, 125 ) 126 from ._show import show 127 from ._edit import edit, edit_definition, update 128 from ._sync import ( 129 sync, 130 get_sync_time, 131 exists, 132 filter_existing, 133 _get_chunk_label, 134 get_num_workers, 135 _persist_new_json_columns, 136 _persist_new_numeric_columns, 137 _persist_new_uuid_columns, 138 _persist_new_bytes_columns, 139 ) 140 from ._verify import ( 141 verify, 142 get_bound_interval, 143 get_bound_time, 144 ) 145 from ._delete import delete 146 from ._drop import drop, drop_indices 147 from ._index import create_indices 148 from ._clear import clear 149 from ._deduplicate import deduplicate 150 from ._bootstrap import bootstrap 151 from ._dtypes import enforce_dtypes, infer_dtypes 152 from ._copy import copy_to 153 154 def __init__( 155 self, 156 connector: str = '', 157 metric: str = '', 158 location: Optional[str] = None, 159 parameters: Optional[Dict[str, Any]] = None, 160 columns: Union[Dict[str, str], List[str], None] = None, 161 indices: Optional[Dict[str, Union[str, List[str]]]] = None, 162 tags: Optional[List[str]] = None, 163 target: Optional[str] = None, 164 dtypes: Optional[Dict[str, str]] = None, 165 instance: Optional[Union[str, InstanceConnector]] = None, 166 temporary: bool = False, 167 upsert: Optional[bool] = None, 168 autoincrement: Optional[bool] = None, 169 static: Optional[bool] = None, 170 enforce: Optional[bool] = None, 171 null_indices: Optional[bool] = None, 172 mrsm_instance: Optional[Union[str, InstanceConnector]] = None, 173 cache: bool = False, 174 debug: bool = False, 175 connector_keys: Optional[str] = None, 176 metric_key: Optional[str] = None, 177 location_key: Optional[str] = None, 178 instance_keys: Optional[str] = None, 179 indexes: Union[Dict[str, str], List[str], None] = None, 180 ): 181 """ 182 Parameters 183 ---------- 184 connector: str 185 Keys for the pipe's source connector, e.g. `'sql:main'`. 186 187 metric: str 188 Label for the pipe's contents, e.g. `'weather'`. 189 190 location: str, default None 191 Label for the pipe's location. Defaults to `None`. 192 193 parameters: Optional[Dict[str, Any]], default None 194 Optionally set a pipe's parameters from the constructor, 195 e.g. columns and other attributes. 196 You can edit these parameters with `edit pipes`. 197 198 columns: Union[Dict[str, str], List[str], None], default None 199 Set the `columns` dictionary of `parameters`. 200 If `parameters` is also provided, this dictionary is added under the `'columns'` key. 201 202 indices: Optional[Dict[str, Union[str, List[str]]]], default None 203 Set the `indices` dictionary of `parameters`. 204 If `parameters` is also provided, this dictionary is added under the `'indices'` key. 205 206 tags: Optional[List[str]], default None 207 A list of strings to be added under the `'tags'` key of `parameters`. 208 You can select pipes with certain tags using `--tags`. 209 210 dtypes: Optional[Dict[str, str]], default None 211 Set the `dtypes` dictionary of `parameters`. 212 If `parameters` is also provided, this dictionary is added under the `'dtypes'` key. 213 214 mrsm_instance: Optional[Union[str, InstanceConnector]], default None 215 Connector for the Meerschaum instance where the pipe resides. 216 Defaults to the preconfigured default instance (`'sql:main'`). 217 218 instance: Optional[Union[str, InstanceConnector]], default None 219 Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored. 220 221 upsert: Optional[bool], default None 222 If `True`, set `upsert` to `True` in the parameters. 223 224 autoincrement: Optional[bool], default None 225 If `True`, set `autoincrement` in the parameters. 226 227 static: Optional[bool], default None 228 If `True`, set `static` in the parameters. 229 230 enforce: Optional[bool], default None 231 If `False`, skip data type enforcement. 232 Default behavior is `True`. 233 234 null_indices: Optional[bool], default None 235 Set to `False` if there will be no null values in the index columns. 236 Defaults to `True`. 237 238 temporary: bool, default False 239 If `True`, prevent instance tables (pipes, users, plugins) from being created. 240 241 cache: bool, default False 242 If `True`, cache fetched data into a local database file. 243 Defaults to `False`. 244 """ 245 from meerschaum.utils.warnings import error, warn 246 if (not connector and not connector_keys) or (not metric and not metric_key): 247 error( 248 "Please provide strings for the connector and metric\n " 249 + "(first two positional arguments)." 250 ) 251 252 ### Fall back to legacy `location_key` just in case. 253 if not location: 254 location = location_key 255 256 if not connector: 257 connector = connector_keys 258 259 if not metric: 260 metric = metric_key 261 262 if location in ('[None]', 'None'): 263 location = None 264 265 from meerschaum.config.static import STATIC_CONFIG 266 negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] 267 for k in (connector, metric, location, *(tags or [])): 268 if str(k).startswith(negation_prefix): 269 error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.") 270 271 self.connector_keys = str(connector) 272 self.connector_key = self.connector_keys ### Alias 273 self.metric_key = metric 274 self.location_key = location 275 self.temporary = temporary 276 277 self._attributes = { 278 'connector_keys': self.connector_keys, 279 'metric_key': self.metric_key, 280 'location_key': self.location_key, 281 'parameters': {}, 282 } 283 284 ### only set parameters if values are provided 285 if isinstance(parameters, dict): 286 self._attributes['parameters'] = parameters 287 else: 288 if parameters is not None: 289 warn(f"The provided parameters are of invalid type '{type(parameters)}'.") 290 self._attributes['parameters'] = {} 291 292 columns = columns or self._attributes.get('parameters', {}).get('columns', {}) 293 if isinstance(columns, list): 294 columns = {str(col): str(col) for col in columns} 295 if isinstance(columns, dict): 296 self._attributes['parameters']['columns'] = columns 297 elif columns is not None: 298 warn(f"The provided columns are of invalid type '{type(columns)}'.") 299 300 indices = ( 301 indices 302 or indexes 303 or self._attributes.get('parameters', {}).get('indices', None) 304 or self._attributes.get('parameters', {}).get('indexes', None) 305 ) 306 if isinstance(indices, dict): 307 indices_key = ( 308 'indexes' 309 if 'indexes' in self._attributes['parameters'] 310 else 'indices' 311 ) 312 self._attributes['parameters'][indices_key] = indices 313 314 if isinstance(tags, (list, tuple)): 315 self._attributes['parameters']['tags'] = tags 316 elif tags is not None: 317 warn(f"The provided tags are of invalid type '{type(tags)}'.") 318 319 if isinstance(target, str): 320 self._attributes['parameters']['target'] = target 321 elif target is not None: 322 warn(f"The provided target is of invalid type '{type(target)}'.") 323 324 if isinstance(dtypes, dict): 325 self._attributes['parameters']['dtypes'] = dtypes 326 elif dtypes is not None: 327 warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.") 328 329 if isinstance(upsert, bool): 330 self._attributes['parameters']['upsert'] = upsert 331 332 if isinstance(autoincrement, bool): 333 self._attributes['parameters']['autoincrement'] = autoincrement 334 335 if isinstance(static, bool): 336 self._attributes['parameters']['static'] = static 337 338 if isinstance(enforce, bool): 339 self._attributes['parameters']['enforce'] = enforce 340 341 if isinstance(null_indices, bool): 342 self._attributes['parameters']['null_indices'] = null_indices 343 344 ### NOTE: The parameters dictionary is {} by default. 345 ### A Pipe may be registered without parameters, then edited, 346 ### or a Pipe may be registered with parameters set in-memory first. 347 _mrsm_instance = mrsm_instance if mrsm_instance is not None else (instance or instance_keys) 348 if _mrsm_instance is None: 349 _mrsm_instance = get_config('meerschaum', 'instance', patch=True) 350 351 if not isinstance(_mrsm_instance, str): 352 self._instance_connector = _mrsm_instance 353 self.instance_keys = str(_mrsm_instance) 354 else: ### NOTE: must be SQL or API Connector for this work 355 self.instance_keys = _mrsm_instance 356 357 self._cache = cache and get_config('system', 'experimental', 'cache') 358 359 @property 360 def meta(self): 361 """ 362 Return the four keys needed to reconstruct this pipe. 363 """ 364 return { 365 'connector_keys': self.connector_keys, 366 'metric_key': self.metric_key, 367 'location_key': self.location_key, 368 'instance_keys': self.instance_keys, 369 } 370 371 def keys(self) -> List[str]: 372 """ 373 Return the ordered keys for this pipe. 374 """ 375 return { 376 key: val 377 for key, val in self.meta.items() 378 if key != 'instance' 379 } 380 381 @property 382 def instance_connector(self) -> Union[InstanceConnector, None]: 383 """ 384 The connector to where this pipe resides. 385 May either be of type `meerschaum.connectors.sql.SQLConnector` or 386 `meerschaum.connectors.api.APIConnector`. 387 """ 388 if '_instance_connector' not in self.__dict__: 389 from meerschaum.connectors.parse import parse_instance_keys 390 conn = parse_instance_keys(self.instance_keys) 391 if conn: 392 self._instance_connector = conn 393 else: 394 return None 395 return self._instance_connector 396 397 @property 398 def connector(self) -> Union[meerschaum.connectors.Connector, None]: 399 """ 400 The connector to the data source. 401 """ 402 if '_connector' not in self.__dict__: 403 from meerschaum.connectors.parse import parse_instance_keys 404 import warnings 405 with warnings.catch_warnings(): 406 warnings.simplefilter('ignore') 407 try: 408 conn = parse_instance_keys(self.connector_keys) 409 except Exception: 410 conn = None 411 if conn: 412 self._connector = conn 413 else: 414 return None 415 return self._connector 416 417 @property 418 def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]: 419 """ 420 If the pipe was created with `cache=True`, return the connector to the pipe's 421 SQLite database for caching. 422 """ 423 if not self._cache: 424 return None 425 426 if '_cache_connector' not in self.__dict__: 427 from meerschaum.connectors import get_connector 428 from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH 429 _resources_path = SQLITE_RESOURCES_PATH 430 self._cache_connector = get_connector( 431 'sql', '_cache_' + str(self), 432 flavor='sqlite', 433 database=str(_resources_path / ('_cache_' + str(self) + '.db')), 434 ) 435 436 return self._cache_connector 437 438 @property 439 def cache_pipe(self) -> Union['meerschaum.Pipe', None]: 440 """ 441 If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to 442 manage the local data. 443 """ 444 if self.cache_connector is None: 445 return None 446 if '_cache_pipe' not in self.__dict__: 447 from meerschaum.config._patch import apply_patch_to_config 448 from meerschaum.utils.sql import sql_item_name 449 _parameters = copy.deepcopy(self.parameters) 450 _fetch_patch = { 451 'fetch': ({ 452 'definition': ( 453 "SELECT * FROM " 454 + sql_item_name( 455 str(self.target), 456 self.instance_connector.flavor, 457 self.instance_connector.get_pipe_schema(self), 458 ) 459 ), 460 }) if self.instance_connector.type == 'sql' else ({ 461 'connector_keys': self.connector_keys, 462 'metric_key': self.metric_key, 463 'location_key': self.location_key, 464 }) 465 } 466 _parameters = apply_patch_to_config(_parameters, _fetch_patch) 467 self._cache_pipe = Pipe( 468 self.instance_keys, 469 (self.connector_keys + '_' + self.metric_key + '_cache'), 470 self.location_key, 471 mrsm_instance = self.cache_connector, 472 parameters = _parameters, 473 cache = False, 474 temporary = True, 475 ) 476 477 return self._cache_pipe 478 479 def __str__(self, ansi: bool=False): 480 return pipe_repr(self, ansi=ansi) 481 482 def __eq__(self, other): 483 try: 484 return ( 485 isinstance(self, type(other)) 486 and self.connector_keys == other.connector_keys 487 and self.metric_key == other.metric_key 488 and self.location_key == other.location_key 489 and self.instance_keys == other.instance_keys 490 ) 491 except Exception: 492 return False 493 494 def __hash__(self): 495 ### Using an esoteric separator to avoid collisions. 496 sep = "[\"']" 497 return hash( 498 str(self.connector_keys) + sep 499 + str(self.metric_key) + sep 500 + str(self.location_key) + sep 501 + str(self.instance_keys) + sep 502 ) 503 504 def __repr__(self, ansi: bool=True, **kw) -> str: 505 if not hasattr(sys, 'ps1'): 506 ansi = False 507 508 return pipe_repr(self, ansi=ansi, **kw) 509 510 def __pt_repr__(self): 511 from meerschaum.utils.packages import attempt_import 512 prompt_toolkit_formatted_text = attempt_import('prompt_toolkit.formatted_text', lazy=False) 513 return prompt_toolkit_formatted_text.ANSI(pipe_repr(self, ansi=True)) 514 515 def __getstate__(self) -> Dict[str, Any]: 516 """ 517 Define the state dictionary (pickling). 518 """ 519 return { 520 'connector_keys': self.connector_keys, 521 'metric_key': self.metric_key, 522 'location_key': self.location_key, 523 'parameters': self.parameters, 524 'instance_keys': self.instance_keys, 525 } 526 527 def __setstate__(self, _state: Dict[str, Any]): 528 """ 529 Read the state (unpickling). 530 """ 531 self.__init__(**_state) 532 533 def __getitem__(self, key: str) -> Any: 534 """ 535 Index the pipe's attributes. 536 If the `key` cannot be found`, return `None`. 537 """ 538 if key in self.attributes: 539 return self.attributes.get(key, None) 540 541 aliases = { 542 'connector': 'connector_keys', 543 'connector_key': 'connector_keys', 544 'metric': 'metric_key', 545 'location': 'location_key', 546 } 547 aliased_key = aliases.get(key, None) 548 if aliased_key is not None: 549 return self.attributes.get(aliased_key, None) 550 551 property_aliases = { 552 'instance': 'instance_keys', 553 'instance_key': 'instance_keys', 554 } 555 aliased_key = property_aliases.get(key, None) 556 if aliased_key is not None: 557 key = aliased_key 558 return getattr(self, key, None)
Access Meerschaum pipes via Pipe objects.
Pipes are identified by the following:
- Connector keys (e.g.
'sql:main'
) - Metric key (e.g.
'weather'
) - Location (optional; e.g.
None
)
A pipe's connector keys correspond to a data source, and when the pipe is synced,
its fetch
definition is evaluated and executed to produce new data.
Alternatively, new data may be directly synced via pipe.sync()
:
>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)
154 def __init__( 155 self, 156 connector: str = '', 157 metric: str = '', 158 location: Optional[str] = None, 159 parameters: Optional[Dict[str, Any]] = None, 160 columns: Union[Dict[str, str], List[str], None] = None, 161 indices: Optional[Dict[str, Union[str, List[str]]]] = None, 162 tags: Optional[List[str]] = None, 163 target: Optional[str] = None, 164 dtypes: Optional[Dict[str, str]] = None, 165 instance: Optional[Union[str, InstanceConnector]] = None, 166 temporary: bool = False, 167 upsert: Optional[bool] = None, 168 autoincrement: Optional[bool] = None, 169 static: Optional[bool] = None, 170 enforce: Optional[bool] = None, 171 null_indices: Optional[bool] = None, 172 mrsm_instance: Optional[Union[str, InstanceConnector]] = None, 173 cache: bool = False, 174 debug: bool = False, 175 connector_keys: Optional[str] = None, 176 metric_key: Optional[str] = None, 177 location_key: Optional[str] = None, 178 instance_keys: Optional[str] = None, 179 indexes: Union[Dict[str, str], List[str], None] = None, 180 ): 181 """ 182 Parameters 183 ---------- 184 connector: str 185 Keys for the pipe's source connector, e.g. `'sql:main'`. 186 187 metric: str 188 Label for the pipe's contents, e.g. `'weather'`. 189 190 location: str, default None 191 Label for the pipe's location. Defaults to `None`. 192 193 parameters: Optional[Dict[str, Any]], default None 194 Optionally set a pipe's parameters from the constructor, 195 e.g. columns and other attributes. 196 You can edit these parameters with `edit pipes`. 197 198 columns: Union[Dict[str, str], List[str], None], default None 199 Set the `columns` dictionary of `parameters`. 200 If `parameters` is also provided, this dictionary is added under the `'columns'` key. 201 202 indices: Optional[Dict[str, Union[str, List[str]]]], default None 203 Set the `indices` dictionary of `parameters`. 204 If `parameters` is also provided, this dictionary is added under the `'indices'` key. 205 206 tags: Optional[List[str]], default None 207 A list of strings to be added under the `'tags'` key of `parameters`. 208 You can select pipes with certain tags using `--tags`. 209 210 dtypes: Optional[Dict[str, str]], default None 211 Set the `dtypes` dictionary of `parameters`. 212 If `parameters` is also provided, this dictionary is added under the `'dtypes'` key. 213 214 mrsm_instance: Optional[Union[str, InstanceConnector]], default None 215 Connector for the Meerschaum instance where the pipe resides. 216 Defaults to the preconfigured default instance (`'sql:main'`). 217 218 instance: Optional[Union[str, InstanceConnector]], default None 219 Alias for `mrsm_instance`. If `mrsm_instance` is supplied, this value is ignored. 220 221 upsert: Optional[bool], default None 222 If `True`, set `upsert` to `True` in the parameters. 223 224 autoincrement: Optional[bool], default None 225 If `True`, set `autoincrement` in the parameters. 226 227 static: Optional[bool], default None 228 If `True`, set `static` in the parameters. 229 230 enforce: Optional[bool], default None 231 If `False`, skip data type enforcement. 232 Default behavior is `True`. 233 234 null_indices: Optional[bool], default None 235 Set to `False` if there will be no null values in the index columns. 236 Defaults to `True`. 237 238 temporary: bool, default False 239 If `True`, prevent instance tables (pipes, users, plugins) from being created. 240 241 cache: bool, default False 242 If `True`, cache fetched data into a local database file. 243 Defaults to `False`. 244 """ 245 from meerschaum.utils.warnings import error, warn 246 if (not connector and not connector_keys) or (not metric and not metric_key): 247 error( 248 "Please provide strings for the connector and metric\n " 249 + "(first two positional arguments)." 250 ) 251 252 ### Fall back to legacy `location_key` just in case. 253 if not location: 254 location = location_key 255 256 if not connector: 257 connector = connector_keys 258 259 if not metric: 260 metric = metric_key 261 262 if location in ('[None]', 'None'): 263 location = None 264 265 from meerschaum.config.static import STATIC_CONFIG 266 negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] 267 for k in (connector, metric, location, *(tags or [])): 268 if str(k).startswith(negation_prefix): 269 error(f"A pipe's keys and tags cannot start with the prefix '{negation_prefix}'.") 270 271 self.connector_keys = str(connector) 272 self.connector_key = self.connector_keys ### Alias 273 self.metric_key = metric 274 self.location_key = location 275 self.temporary = temporary 276 277 self._attributes = { 278 'connector_keys': self.connector_keys, 279 'metric_key': self.metric_key, 280 'location_key': self.location_key, 281 'parameters': {}, 282 } 283 284 ### only set parameters if values are provided 285 if isinstance(parameters, dict): 286 self._attributes['parameters'] = parameters 287 else: 288 if parameters is not None: 289 warn(f"The provided parameters are of invalid type '{type(parameters)}'.") 290 self._attributes['parameters'] = {} 291 292 columns = columns or self._attributes.get('parameters', {}).get('columns', {}) 293 if isinstance(columns, list): 294 columns = {str(col): str(col) for col in columns} 295 if isinstance(columns, dict): 296 self._attributes['parameters']['columns'] = columns 297 elif columns is not None: 298 warn(f"The provided columns are of invalid type '{type(columns)}'.") 299 300 indices = ( 301 indices 302 or indexes 303 or self._attributes.get('parameters', {}).get('indices', None) 304 or self._attributes.get('parameters', {}).get('indexes', None) 305 ) 306 if isinstance(indices, dict): 307 indices_key = ( 308 'indexes' 309 if 'indexes' in self._attributes['parameters'] 310 else 'indices' 311 ) 312 self._attributes['parameters'][indices_key] = indices 313 314 if isinstance(tags, (list, tuple)): 315 self._attributes['parameters']['tags'] = tags 316 elif tags is not None: 317 warn(f"The provided tags are of invalid type '{type(tags)}'.") 318 319 if isinstance(target, str): 320 self._attributes['parameters']['target'] = target 321 elif target is not None: 322 warn(f"The provided target is of invalid type '{type(target)}'.") 323 324 if isinstance(dtypes, dict): 325 self._attributes['parameters']['dtypes'] = dtypes 326 elif dtypes is not None: 327 warn(f"The provided dtypes are of invalid type '{type(dtypes)}'.") 328 329 if isinstance(upsert, bool): 330 self._attributes['parameters']['upsert'] = upsert 331 332 if isinstance(autoincrement, bool): 333 self._attributes['parameters']['autoincrement'] = autoincrement 334 335 if isinstance(static, bool): 336 self._attributes['parameters']['static'] = static 337 338 if isinstance(enforce, bool): 339 self._attributes['parameters']['enforce'] = enforce 340 341 if isinstance(null_indices, bool): 342 self._attributes['parameters']['null_indices'] = null_indices 343 344 ### NOTE: The parameters dictionary is {} by default. 345 ### A Pipe may be registered without parameters, then edited, 346 ### or a Pipe may be registered with parameters set in-memory first. 347 _mrsm_instance = mrsm_instance if mrsm_instance is not None else (instance or instance_keys) 348 if _mrsm_instance is None: 349 _mrsm_instance = get_config('meerschaum', 'instance', patch=True) 350 351 if not isinstance(_mrsm_instance, str): 352 self._instance_connector = _mrsm_instance 353 self.instance_keys = str(_mrsm_instance) 354 else: ### NOTE: must be SQL or API Connector for this work 355 self.instance_keys = _mrsm_instance 356 357 self._cache = cache and get_config('system', 'experimental', 'cache')
Parameters
- connector (str):
Keys for the pipe's source connector, e.g.
'sql:main'
. - metric (str):
Label for the pipe's contents, e.g.
'weather'
. - location (str, default None):
Label for the pipe's location. Defaults to
None
. - parameters (Optional[Dict[str, Any]], default None):
Optionally set a pipe's parameters from the constructor,
e.g. columns and other attributes.
You can edit these parameters with
edit pipes
. - columns (Union[Dict[str, str], List[str], None], default None):
Set the
columns
dictionary ofparameters
. Ifparameters
is also provided, this dictionary is added under the'columns'
key. - indices (Optional[Dict[str, Union[str, List[str]]]], default None):
Set the
indices
dictionary ofparameters
. Ifparameters
is also provided, this dictionary is added under the'indices'
key. - tags (Optional[List[str]], default None):
A list of strings to be added under the
'tags'
key ofparameters
. You can select pipes with certain tags using--tags
. - dtypes (Optional[Dict[str, str]], default None):
Set the
dtypes
dictionary ofparameters
. Ifparameters
is also provided, this dictionary is added under the'dtypes'
key. - mrsm_instance (Optional[Union[str, InstanceConnector]], default None):
Connector for the Meerschaum instance where the pipe resides.
Defaults to the preconfigured default instance (
'sql:main'
). - instance (Optional[Union[str, InstanceConnector]], default None):
Alias for
mrsm_instance
. Ifmrsm_instance
is supplied, this value is ignored. - upsert (Optional[bool], default None):
If
True
, setupsert
toTrue
in the parameters. - autoincrement (Optional[bool], default None):
If
True
, setautoincrement
in the parameters. - static (Optional[bool], default None):
If
True
, setstatic
in the parameters. - enforce (Optional[bool], default None):
If
False
, skip data type enforcement. Default behavior isTrue
. - null_indices (Optional[bool], default None):
Set to
False
if there will be no null values in the index columns. Defaults toTrue
. - temporary (bool, default False):
If
True
, prevent instance tables (pipes, users, plugins) from being created. - cache (bool, default False):
If
True
, cache fetched data into a local database file. Defaults toFalse
.
359 @property 360 def meta(self): 361 """ 362 Return the four keys needed to reconstruct this pipe. 363 """ 364 return { 365 'connector_keys': self.connector_keys, 366 'metric_key': self.metric_key, 367 'location_key': self.location_key, 368 'instance_keys': self.instance_keys, 369 }
Return the four keys needed to reconstruct this pipe.
371 def keys(self) -> List[str]: 372 """ 373 Return the ordered keys for this pipe. 374 """ 375 return { 376 key: val 377 for key, val in self.meta.items() 378 if key != 'instance' 379 }
Return the ordered keys for this pipe.
381 @property 382 def instance_connector(self) -> Union[InstanceConnector, None]: 383 """ 384 The connector to where this pipe resides. 385 May either be of type `meerschaum.connectors.sql.SQLConnector` or 386 `meerschaum.connectors.api.APIConnector`. 387 """ 388 if '_instance_connector' not in self.__dict__: 389 from meerschaum.connectors.parse import parse_instance_keys 390 conn = parse_instance_keys(self.instance_keys) 391 if conn: 392 self._instance_connector = conn 393 else: 394 return None 395 return self._instance_connector
The connector to where this pipe resides.
May either be of type meerschaum.connectors.sql.SQLConnector
or
meerschaum.connectors.api.APIConnector
.
397 @property 398 def connector(self) -> Union[meerschaum.connectors.Connector, None]: 399 """ 400 The connector to the data source. 401 """ 402 if '_connector' not in self.__dict__: 403 from meerschaum.connectors.parse import parse_instance_keys 404 import warnings 405 with warnings.catch_warnings(): 406 warnings.simplefilter('ignore') 407 try: 408 conn = parse_instance_keys(self.connector_keys) 409 except Exception: 410 conn = None 411 if conn: 412 self._connector = conn 413 else: 414 return None 415 return self._connector
The connector to the data source.
417 @property 418 def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]: 419 """ 420 If the pipe was created with `cache=True`, return the connector to the pipe's 421 SQLite database for caching. 422 """ 423 if not self._cache: 424 return None 425 426 if '_cache_connector' not in self.__dict__: 427 from meerschaum.connectors import get_connector 428 from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH 429 _resources_path = SQLITE_RESOURCES_PATH 430 self._cache_connector = get_connector( 431 'sql', '_cache_' + str(self), 432 flavor='sqlite', 433 database=str(_resources_path / ('_cache_' + str(self) + '.db')), 434 ) 435 436 return self._cache_connector
If the pipe was created with cache=True
, return the connector to the pipe's
SQLite database for caching.
438 @property 439 def cache_pipe(self) -> Union['meerschaum.Pipe', None]: 440 """ 441 If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to 442 manage the local data. 443 """ 444 if self.cache_connector is None: 445 return None 446 if '_cache_pipe' not in self.__dict__: 447 from meerschaum.config._patch import apply_patch_to_config 448 from meerschaum.utils.sql import sql_item_name 449 _parameters = copy.deepcopy(self.parameters) 450 _fetch_patch = { 451 'fetch': ({ 452 'definition': ( 453 "SELECT * FROM " 454 + sql_item_name( 455 str(self.target), 456 self.instance_connector.flavor, 457 self.instance_connector.get_pipe_schema(self), 458 ) 459 ), 460 }) if self.instance_connector.type == 'sql' else ({ 461 'connector_keys': self.connector_keys, 462 'metric_key': self.metric_key, 463 'location_key': self.location_key, 464 }) 465 } 466 _parameters = apply_patch_to_config(_parameters, _fetch_patch) 467 self._cache_pipe = Pipe( 468 self.instance_keys, 469 (self.connector_keys + '_' + self.metric_key + '_cache'), 470 self.location_key, 471 mrsm_instance = self.cache_connector, 472 parameters = _parameters, 473 cache = False, 474 temporary = True, 475 ) 476 477 return self._cache_pipe
If the pipe was created with cache=True
, return another meerschaum.Pipe
used to
manage the local data.
21def fetch( 22 self, 23 begin: Union[datetime, int, str, None] = '', 24 end: Union[datetime, int, None] = None, 25 check_existing: bool = True, 26 sync_chunks: bool = False, 27 debug: bool = False, 28 **kw: Any 29) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]: 30 """ 31 Fetch a Pipe's latest data from its connector. 32 33 Parameters 34 ---------- 35 begin: Union[datetime, str, None], default '': 36 If provided, only fetch data newer than or equal to `begin`. 37 38 end: Optional[datetime], default None: 39 If provided, only fetch data older than or equal to `end`. 40 41 check_existing: bool, default True 42 If `False`, do not apply the backtrack interval. 43 44 sync_chunks: bool, default False 45 If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks while fetching 46 loads chunks into memory. 47 48 debug: bool, default False 49 Verbosity toggle. 50 51 Returns 52 ------- 53 A `pd.DataFrame` of the newest unseen data. 54 55 """ 56 if 'fetch' not in dir(self.connector): 57 warn(f"No `fetch()` function defined for connector '{self.connector}'") 58 return None 59 60 from meerschaum.connectors import get_connector_plugin 61 from meerschaum.utils.misc import filter_arguments 62 63 _chunk_hook = kw.pop('chunk_hook', None) 64 kw['workers'] = self.get_num_workers(kw.get('workers', None)) 65 if sync_chunks and _chunk_hook is None: 66 67 def _chunk_hook(chunk, **_kw) -> SuccessTuple: 68 """ 69 Wrap `Pipe.sync()` with a custom chunk label prepended to the message. 70 """ 71 from meerschaum.config._patch import apply_patch_to_config 72 kwargs = apply_patch_to_config(kw, _kw) 73 chunk_success, chunk_message = self.sync(chunk, **kwargs) 74 chunk_label = self._get_chunk_label(chunk, self.columns.get('datetime', None)) 75 if chunk_label: 76 chunk_message = '\n' + chunk_label + '\n' + chunk_message 77 return chunk_success, chunk_message 78 79 begin, end = self.parse_date_bounds(begin, end) 80 81 with mrsm.Venv(get_connector_plugin(self.connector)): 82 _args, _kwargs = filter_arguments( 83 self.connector.fetch, 84 self, 85 begin=_determine_begin( 86 self, 87 begin, 88 end, 89 check_existing=check_existing, 90 debug=debug, 91 ), 92 end=end, 93 chunk_hook=_chunk_hook, 94 debug=debug, 95 **kw 96 ) 97 df = self.connector.fetch(*_args, **_kwargs) 98 return df
Fetch a Pipe's latest data from its connector.
Parameters
- begin (Union[datetime, str, None], default '':):
If provided, only fetch data newer than or equal to
begin
. - end (Optional[datetime], default None:):
If provided, only fetch data older than or equal to
end
. - check_existing (bool, default True):
If
False
, do not apply the backtrack interval. - sync_chunks (bool, default False):
If
True
and the pipe's connector is of type'sql'
, begin syncing chunks while fetching loads chunks into memory. - debug (bool, default False): Verbosity toggle.
Returns
- A
pd.DataFrame
of the newest unseen data.
101def get_backtrack_interval( 102 self, 103 check_existing: bool = True, 104 debug: bool = False, 105) -> Union[timedelta, int]: 106 """ 107 Get the chunk interval to use for this pipe. 108 109 Parameters 110 ---------- 111 check_existing: bool, default True 112 If `False`, return a backtrack_interval of 0 minutes. 113 114 Returns 115 ------- 116 The backtrack interval (`timedelta` or `int`) to use with this pipe's `datetime` axis. 117 """ 118 default_backtrack_minutes = get_config('pipes', 'parameters', 'fetch', 'backtrack_minutes') 119 configured_backtrack_minutes = self.parameters.get('fetch', {}).get('backtrack_minutes', None) 120 backtrack_minutes = ( 121 configured_backtrack_minutes 122 if configured_backtrack_minutes is not None 123 else default_backtrack_minutes 124 ) if check_existing else 0 125 126 backtrack_interval = timedelta(minutes=backtrack_minutes) 127 dt_col = self.columns.get('datetime', None) 128 if dt_col is None: 129 return backtrack_interval 130 131 dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns, UTC]') 132 if 'int' in dt_dtype.lower(): 133 return backtrack_minutes 134 135 return backtrack_interval
Get the chunk interval to use for this pipe.
Parameters
- check_existing (bool, default True):
If
False
, return a backtrack_interval of 0 minutes.
Returns
- The backtrack interval (
timedelta
orint
) to use with this pipe'sdatetime
axis.
23def get_data( 24 self, 25 select_columns: Optional[List[str]] = None, 26 omit_columns: Optional[List[str]] = None, 27 begin: Union[datetime, int, str, None] = None, 28 end: Union[datetime, int, str, None] = None, 29 params: Optional[Dict[str, Any]] = None, 30 as_iterator: bool = False, 31 as_chunks: bool = False, 32 as_dask: bool = False, 33 chunk_interval: Union[timedelta, int, None] = None, 34 order: Optional[str] = 'asc', 35 limit: Optional[int] = None, 36 fresh: bool = False, 37 debug: bool = False, 38 **kw: Any 39) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]: 40 """ 41 Get a pipe's data from the instance connector. 42 43 Parameters 44 ---------- 45 select_columns: Optional[List[str]], default None 46 If provided, only select these given columns. 47 Otherwise select all available columns (i.e. `SELECT *`). 48 49 omit_columns: Optional[List[str]], default None 50 If provided, remove these columns from the selection. 51 52 begin: Union[datetime, int, str, None], default None 53 Lower bound datetime to begin searching for data (inclusive). 54 Translates to a `WHERE` clause like `WHERE datetime >= begin`. 55 Defaults to `None`. 56 57 end: Union[datetime, int, str, None], default None 58 Upper bound datetime to stop searching for data (inclusive). 59 Translates to a `WHERE` clause like `WHERE datetime < end`. 60 Defaults to `None`. 61 62 params: Optional[Dict[str, Any]], default None 63 Filter the retrieved data by a dictionary of parameters. 64 See `meerschaum.utils.sql.build_where` for more details. 65 66 as_iterator: bool, default False 67 If `True`, return a generator of chunks of pipe data. 68 69 as_chunks: bool, default False 70 Alias for `as_iterator`. 71 72 as_dask: bool, default False 73 If `True`, return a `dask.DataFrame` 74 (which may be loaded into a Pandas DataFrame with `df.compute()`). 75 76 chunk_interval: Union[timedelta, int, None], default None 77 If `as_iterator`, then return chunks with `begin` and `end` separated by this interval. 78 This may be set under `pipe.parameters['chunk_minutes']`. 79 By default, use a timedelta of 1440 minutes (1 day). 80 If `chunk_interval` is an integer and the `datetime` axis a timestamp, 81 the use a timedelta with the number of minutes configured to this value. 82 If the `datetime` axis is an integer, default to the configured chunksize. 83 If `chunk_interval` is a `timedelta` and the `datetime` axis an integer, 84 use the number of minutes in the `timedelta`. 85 86 order: Optional[str], default 'asc' 87 If `order` is not `None`, sort the resulting dataframe by indices. 88 89 limit: Optional[int], default None 90 If provided, cap the dataframe to this many rows. 91 92 fresh: bool, default True 93 If `True`, skip local cache and directly query the instance connector. 94 Defaults to `True`. 95 96 debug: bool, default False 97 Verbosity toggle. 98 Defaults to `False`. 99 100 Returns 101 ------- 102 A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. 103 104 """ 105 from meerschaum.utils.warnings import warn 106 from meerschaum.utils.venv import Venv 107 from meerschaum.connectors import get_connector_plugin 108 from meerschaum.utils.misc import iterate_chunks, items_str 109 from meerschaum.utils.dtypes import to_pandas_dtype, coerce_timezone 110 from meerschaum.utils.dataframe import add_missing_cols_to_df, df_is_chunk_generator 111 from meerschaum.utils.packages import attempt_import 112 dd = attempt_import('dask.dataframe') if as_dask else None 113 dask = attempt_import('dask') if as_dask else None 114 dateutil_parser = attempt_import('dateutil.parser') 115 116 if select_columns == '*': 117 select_columns = None 118 elif isinstance(select_columns, str): 119 select_columns = [select_columns] 120 121 if isinstance(omit_columns, str): 122 omit_columns = [omit_columns] 123 124 begin, end = self.parse_date_bounds(begin, end) 125 as_iterator = as_iterator or as_chunks 126 dt_col = self.columns.get('datetime', None) 127 128 def _sort_df(_df): 129 if df_is_chunk_generator(_df): 130 return _df 131 indices = [] if dt_col not in _df.columns else [dt_col] 132 non_dt_cols = [ 133 col 134 for col_ix, col in self.columns.items() 135 if col_ix != 'datetime' and col in _df.columns 136 ] 137 indices.extend(non_dt_cols) 138 if 'dask' not in _df.__module__: 139 _df.sort_values( 140 by=indices, 141 inplace=True, 142 ascending=(str(order).lower() == 'asc'), 143 ) 144 _df.reset_index(drop=True, inplace=True) 145 else: 146 _df = _df.sort_values( 147 by=indices, 148 ascending=(str(order).lower() == 'asc'), 149 ) 150 _df = _df.reset_index(drop=True) 151 if limit is not None and len(_df) > limit: 152 return _df.head(limit) 153 return _df 154 155 if as_iterator or as_chunks: 156 df = self._get_data_as_iterator( 157 select_columns=select_columns, 158 omit_columns=omit_columns, 159 begin=begin, 160 end=end, 161 params=params, 162 chunk_interval=chunk_interval, 163 limit=limit, 164 order=order, 165 fresh=fresh, 166 debug=debug, 167 ) 168 return _sort_df(df) 169 170 if as_dask: 171 from multiprocessing.pool import ThreadPool 172 dask_pool = ThreadPool(self.get_num_workers()) 173 dask.config.set(pool=dask_pool) 174 chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug) 175 bounds = self.get_chunk_bounds( 176 begin=begin, 177 end=end, 178 bounded=False, 179 chunk_interval=chunk_interval, 180 debug=debug, 181 ) 182 dask_chunks = [ 183 dask.delayed(self.get_data)( 184 select_columns=select_columns, 185 omit_columns=omit_columns, 186 begin=chunk_begin, 187 end=chunk_end, 188 params=params, 189 chunk_interval=chunk_interval, 190 order=order, 191 limit=limit, 192 fresh=fresh, 193 debug=debug, 194 ) 195 for (chunk_begin, chunk_end) in bounds 196 ] 197 dask_meta = { 198 col: to_pandas_dtype(typ) 199 for col, typ in self.dtypes.items() 200 } 201 return _sort_df(dd.from_delayed(dask_chunks, meta=dask_meta)) 202 203 if not self.exists(debug=debug): 204 return None 205 206 if self.cache_pipe is not None: 207 if not fresh: 208 _sync_cache_tuple = self.cache_pipe.sync( 209 begin=begin, 210 end=end, 211 params=params, 212 debug=debug, 213 **kw 214 ) 215 if not _sync_cache_tuple[0]: 216 warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1]) 217 fresh = True 218 else: ### Successfully synced cache. 219 return self.enforce_dtypes( 220 self.cache_pipe.get_data( 221 select_columns=select_columns, 222 omit_columns=omit_columns, 223 begin=begin, 224 end=end, 225 params=params, 226 order=order, 227 limit=limit, 228 debug=debug, 229 fresh=True, 230 **kw 231 ), 232 debug=debug, 233 ) 234 235 with Venv(get_connector_plugin(self.instance_connector)): 236 df = self.instance_connector.get_pipe_data( 237 pipe=self, 238 select_columns=select_columns, 239 omit_columns=omit_columns, 240 begin=begin, 241 end=end, 242 params=params, 243 limit=limit, 244 order=order, 245 debug=debug, 246 **kw 247 ) 248 if df is None: 249 return df 250 251 if not select_columns: 252 select_columns = [col for col in df.columns] 253 254 cols_to_omit = [ 255 col 256 for col in df.columns 257 if ( 258 col in (omit_columns or []) 259 or 260 col not in (select_columns or []) 261 ) 262 ] 263 cols_to_add = [ 264 col 265 for col in select_columns 266 if col not in df.columns 267 ] 268 if cols_to_omit: 269 warn( 270 ( 271 f"Received {len(cols_to_omit)} omitted column" 272 + ('s' if len(cols_to_omit) != 1 else '') 273 + f" for {self}. " 274 + "Consider adding `select_columns` and `omit_columns` support to " 275 + f"'{self.instance_connector.type}' connectors to improve performance." 276 ), 277 stack=False, 278 ) 279 _cols_to_select = [col for col in df.columns if col not in cols_to_omit] 280 df = df[_cols_to_select] 281 282 if cols_to_add: 283 warn( 284 ( 285 f"Specified columns {items_str(cols_to_add)} were not found on {self}. " 286 + "Adding these to the DataFrame as null columns." 287 ), 288 stack=False, 289 ) 290 df = add_missing_cols_to_df(df, {col: 'string' for col in cols_to_add}) 291 292 enforced_df = self.enforce_dtypes(df, debug=debug) 293 294 if order: 295 return _sort_df(enforced_df) 296 return enforced_df
Get a pipe's data from the instance connector.
Parameters
- select_columns (Optional[List[str]], default None):
If provided, only select these given columns.
Otherwise select all available columns (i.e.
SELECT *
). - omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, int, str, None], default None):
Lower bound datetime to begin searching for data (inclusive).
Translates to a
WHERE
clause likeWHERE datetime >= begin
. Defaults toNone
. - end (Union[datetime, int, str, None], default None):
Upper bound datetime to stop searching for data (inclusive).
Translates to a
WHERE
clause likeWHERE datetime < end
. Defaults toNone
. - params (Optional[Dict[str, Any]], default None):
Filter the retrieved data by a dictionary of parameters.
See
meerschaum.utils.sql.build_where
for more details. - as_iterator (bool, default False):
If
True
, return a generator of chunks of pipe data. - as_chunks (bool, default False):
Alias for
as_iterator
. - as_dask (bool, default False):
If
True
, return adask.DataFrame
(which may be loaded into a Pandas DataFrame withdf.compute()
). - chunk_interval (Union[timedelta, int, None], default None):
If
as_iterator
, then return chunks withbegin
andend
separated by this interval. This may be set underpipe.parameters['chunk_minutes']
. By default, use a timedelta of 1440 minutes (1 day). Ifchunk_interval
is an integer and thedatetime
axis a timestamp, the use a timedelta with the number of minutes configured to this value. If thedatetime
axis is an integer, default to the configured chunksize. Ifchunk_interval
is atimedelta
and thedatetime
axis an integer, use the number of minutes in thetimedelta
. - order (Optional[str], default 'asc'):
If
order
is notNone
, sort the resulting dataframe by indices. - limit (Optional[int], default None): If provided, cap the dataframe to this many rows.
- fresh (bool, default True):
If
True
, skip local cache and directly query the instance connector. Defaults toTrue
. - debug (bool, default False):
Verbosity toggle.
Defaults to
False
.
Returns
- A
pd.DataFrame
for the pipe's data corresponding to the provided parameters.
388def get_backtrack_data( 389 self, 390 backtrack_minutes: Optional[int] = None, 391 begin: Union[datetime, int, None] = None, 392 params: Optional[Dict[str, Any]] = None, 393 limit: Optional[int] = None, 394 fresh: bool = False, 395 debug: bool = False, 396 **kw: Any 397) -> Optional['pd.DataFrame']: 398 """ 399 Get the most recent data from the instance connector as a Pandas DataFrame. 400 401 Parameters 402 ---------- 403 backtrack_minutes: Optional[int], default None 404 How many minutes from `begin` to select from. 405 If `None`, use `pipe.parameters['fetch']['backtrack_minutes']`. 406 407 begin: Optional[datetime], default None 408 The starting point to search for data. 409 If begin is `None` (default), use the most recent observed datetime 410 (AKA sync_time). 411 412 ``` 413 E.g. begin = 02:00 414 415 Search this region. Ignore this, even if there's data. 416 / / / / / / / / / | 417 -----|----------|----------|----------|----------|----------| 418 00:00 01:00 02:00 03:00 04:00 05:00 419 420 ``` 421 422 params: Optional[Dict[str, Any]], default None 423 The standard Meerschaum `params` query dictionary. 424 425 limit: Optional[int], default None 426 If provided, cap the number of rows to be returned. 427 428 fresh: bool, default False 429 If `True`, Ignore local cache and pull directly from the instance connector. 430 Only comes into effect if a pipe was created with `cache=True`. 431 432 debug: bool default False 433 Verbosity toggle. 434 435 Returns 436 ------- 437 A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. Backtrack data 438 is a convenient way to get a pipe's data "backtracked" from the most recent datetime. 439 """ 440 from meerschaum.utils.warnings import warn 441 from meerschaum.utils.venv import Venv 442 from meerschaum.connectors import get_connector_plugin 443 444 if not self.exists(debug=debug): 445 return None 446 447 begin = self.parse_date_bounds(begin) 448 449 backtrack_interval = self.get_backtrack_interval(debug=debug) 450 if backtrack_minutes is None: 451 backtrack_minutes = ( 452 (backtrack_interval.total_seconds() / 60) 453 if isinstance(backtrack_interval, timedelta) 454 else backtrack_interval 455 ) 456 457 if self.cache_pipe is not None: 458 if not fresh: 459 _sync_cache_tuple = self.cache_pipe.sync(begin=begin, params=params, debug=debug, **kw) 460 if not _sync_cache_tuple[0]: 461 warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1]) 462 fresh = True 463 else: ### Successfully synced cache. 464 return self.enforce_dtypes( 465 self.cache_pipe.get_backtrack_data( 466 fresh=True, 467 begin=begin, 468 backtrack_minutes=backtrack_minutes, 469 params=params, 470 limit=limit, 471 order=kw.get('order', 'desc'), 472 debug=debug, 473 **kw 474 ), 475 debug=debug, 476 ) 477 478 if hasattr(self.instance_connector, 'get_backtrack_data'): 479 with Venv(get_connector_plugin(self.instance_connector)): 480 return self.enforce_dtypes( 481 self.instance_connector.get_backtrack_data( 482 pipe=self, 483 begin=begin, 484 backtrack_minutes=backtrack_minutes, 485 params=params, 486 limit=limit, 487 debug=debug, 488 **kw 489 ), 490 debug=debug, 491 ) 492 493 if begin is None: 494 begin = self.get_sync_time(params=params, debug=debug) 495 496 backtrack_interval = ( 497 timedelta(minutes=backtrack_minutes) 498 if isinstance(begin, datetime) 499 else backtrack_minutes 500 ) 501 if begin is not None: 502 begin = begin - backtrack_interval 503 504 return self.get_data( 505 begin=begin, 506 params=params, 507 debug=debug, 508 limit=limit, 509 order=kw.get('order', 'desc'), 510 **kw 511 )
Get the most recent data from the instance connector as a Pandas DataFrame.
Parameters
- backtrack_minutes (Optional[int], default None):
How many minutes from
begin
to select from. IfNone
, usepipe.parameters['fetch']['backtrack_minutes']
. begin (Optional[datetime], default None): The starting point to search for data. If begin is
None
(default), use the most recent observed datetime (AKA sync_time).E.g. begin = 02:00 Search this region. Ignore this, even if there's data. / / / / / / / / / | -----|----------|----------|----------|----------|----------| 00:00 01:00 02:00 03:00 04:00 05:00
params (Optional[Dict[str, Any]], default None): The standard Meerschaum
params
query dictionary.- limit (Optional[int], default None): If provided, cap the number of rows to be returned.
- fresh (bool, default False):
If
True
, Ignore local cache and pull directly from the instance connector. Only comes into effect if a pipe was created withcache=True
. - debug (bool default False): Verbosity toggle.
Returns
- A
pd.DataFrame
for the pipe's data corresponding to the provided parameters. Backtrack data - is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
514def get_rowcount( 515 self, 516 begin: Union[datetime, int, None] = None, 517 end: Union[datetime, int, None] = None, 518 params: Optional[Dict[str, Any]] = None, 519 remote: bool = False, 520 debug: bool = False 521) -> int: 522 """ 523 Get a Pipe's instance or remote rowcount. 524 525 Parameters 526 ---------- 527 begin: Optional[datetime], default None 528 Count rows where datetime > begin. 529 530 end: Optional[datetime], default None 531 Count rows where datetime < end. 532 533 remote: bool, default False 534 Count rows from a pipe's remote source. 535 **NOTE**: This is experimental! 536 537 debug: bool, default False 538 Verbosity toggle. 539 540 Returns 541 ------- 542 An `int` of the number of rows in the pipe corresponding to the provided parameters. 543 Returned 0 if the pipe does not exist. 544 """ 545 from meerschaum.utils.warnings import warn 546 from meerschaum.utils.venv import Venv 547 from meerschaum.connectors import get_connector_plugin 548 549 begin, end = self.parse_date_bounds(begin, end) 550 connector = self.instance_connector if not remote else self.connector 551 try: 552 with Venv(get_connector_plugin(connector)): 553 rowcount = connector.get_pipe_rowcount( 554 self, 555 begin=begin, 556 end=end, 557 params=params, 558 remote=remote, 559 debug=debug, 560 ) 561 if rowcount is None: 562 return 0 563 return rowcount 564 except AttributeError as e: 565 warn(e) 566 if remote: 567 return 0 568 warn(f"Failed to get a rowcount for {self}.") 569 return 0
Get a Pipe's instance or remote rowcount.
Parameters
- begin (Optional[datetime], default None): Count rows where datetime > begin.
- end (Optional[datetime], default None): Count rows where datetime < end.
- remote (bool, default False): Count rows from a pipe's remote source. NOTE: This is experimental!
- debug (bool, default False): Verbosity toggle.
Returns
- An
int
of the number of rows in the pipe corresponding to the provided parameters. - Returned 0 if the pipe does not exist.
572def get_chunk_interval( 573 self, 574 chunk_interval: Union[timedelta, int, None] = None, 575 debug: bool = False, 576) -> Union[timedelta, int]: 577 """ 578 Get the chunk interval to use for this pipe. 579 580 Parameters 581 ---------- 582 chunk_interval: Union[timedelta, int, None], default None 583 If provided, coerce this value into the correct type. 584 For example, if the datetime axis is an integer, then 585 return the number of minutes. 586 587 Returns 588 ------- 589 The chunk interval (`timedelta` or `int`) to use with this pipe's `datetime` axis. 590 """ 591 default_chunk_minutes = get_config('pipes', 'parameters', 'verify', 'chunk_minutes') 592 configured_chunk_minutes = self.parameters.get('verify', {}).get('chunk_minutes', None) 593 chunk_minutes = ( 594 (configured_chunk_minutes or default_chunk_minutes) 595 if chunk_interval is None 596 else ( 597 chunk_interval 598 if isinstance(chunk_interval, int) 599 else int(chunk_interval.total_seconds() / 60) 600 ) 601 ) 602 603 dt_col = self.columns.get('datetime', None) 604 if dt_col is None: 605 return timedelta(minutes=chunk_minutes) 606 607 dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns, UTC]') 608 if 'int' in dt_dtype.lower(): 609 return chunk_minutes 610 return timedelta(minutes=chunk_minutes)
Get the chunk interval to use for this pipe.
Parameters
- chunk_interval (Union[timedelta, int, None], default None): If provided, coerce this value into the correct type. For example, if the datetime axis is an integer, then return the number of minutes.
Returns
- The chunk interval (
timedelta
orint
) to use with this pipe'sdatetime
axis.
613def get_chunk_bounds( 614 self, 615 begin: Union[datetime, int, None] = None, 616 end: Union[datetime, int, None] = None, 617 bounded: bool = False, 618 chunk_interval: Union[timedelta, int, None] = None, 619 debug: bool = False, 620) -> List[ 621 Tuple[ 622 Union[datetime, int, None], 623 Union[datetime, int, None], 624 ] 625]: 626 """ 627 Return a list of datetime bounds for iterating over the pipe's `datetime` axis. 628 629 Parameters 630 ---------- 631 begin: Union[datetime, int, None], default None 632 If provided, do not select less than this value. 633 Otherwise the first chunk will be unbounded. 634 635 end: Union[datetime, int, None], default None 636 If provided, do not select greater than or equal to this value. 637 Otherwise the last chunk will be unbounded. 638 639 bounded: bool, default False 640 If `True`, do not include `None` in the first chunk. 641 642 chunk_interval: Union[timedelta, int, None], default None 643 If provided, use this interval for the size of chunk boundaries. 644 The default value for this pipe may be set 645 under `pipe.parameters['verify']['chunk_minutes']`. 646 647 debug: bool, default False 648 Verbosity toggle. 649 650 Returns 651 ------- 652 A list of chunk bounds (datetimes or integers). 653 If unbounded, the first and last chunks will include `None`. 654 """ 655 include_less_than_begin = not bounded and begin is None 656 include_greater_than_end = not bounded and end is None 657 if begin is None: 658 begin = self.get_sync_time(newest=False, debug=debug) 659 if end is None: 660 end = self.get_sync_time(newest=True, debug=debug) 661 if begin is None and end is None: 662 return [(None, None)] 663 664 begin, end = self.parse_date_bounds(begin, end) 665 666 ### Set the chunk interval under `pipe.parameters['verify']['chunk_minutes']`. 667 chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug) 668 669 ### Build a list of tuples containing the chunk boundaries 670 ### so that we can sync multiple chunks in parallel. 671 ### Run `verify pipes --workers 1` to sync chunks in series. 672 chunk_bounds = [] 673 begin_cursor = begin 674 while begin_cursor < end: 675 end_cursor = begin_cursor + chunk_interval 676 chunk_bounds.append((begin_cursor, end_cursor)) 677 begin_cursor = end_cursor 678 679 ### The chunk interval might be too large. 680 if not chunk_bounds and end >= begin: 681 chunk_bounds = [(begin, end)] 682 683 ### Truncate the last chunk to the end timestamp. 684 if chunk_bounds[-1][1] > end: 685 chunk_bounds[-1] = (chunk_bounds[-1][0], end) 686 687 ### Pop the last chunk if its bounds are equal. 688 if chunk_bounds[-1][0] == chunk_bounds[-1][1]: 689 chunk_bounds = chunk_bounds[:-1] 690 691 if include_less_than_begin: 692 chunk_bounds = [(None, begin)] + chunk_bounds 693 if include_greater_than_end: 694 chunk_bounds = chunk_bounds + [(end, None)] 695 696 return chunk_bounds
Return a list of datetime bounds for iterating over the pipe's datetime
axis.
Parameters
- begin (Union[datetime, int, None], default None): If provided, do not select less than this value. Otherwise the first chunk will be unbounded.
- end (Union[datetime, int, None], default None): If provided, do not select greater than or equal to this value. Otherwise the last chunk will be unbounded.
- bounded (bool, default False):
If
True
, do not includeNone
in the first chunk. - chunk_interval (Union[timedelta, int, None], default None):
If provided, use this interval for the size of chunk boundaries.
The default value for this pipe may be set
under
pipe.parameters['verify']['chunk_minutes']
. - debug (bool, default False): Verbosity toggle.
Returns
- A list of chunk bounds (datetimes or integers).
- If unbounded, the first and last chunks will include
None
.
699def parse_date_bounds(self, *dt_vals: Union[datetime, int, None]) -> Union[ 700 datetime, 701 int, 702 str, 703 None, 704 Tuple[Union[datetime, int, str, None]] 705]: 706 """ 707 Given a date bound (begin, end), coerce a timezone if necessary. 708 """ 709 from meerschaum.utils.misc import is_int 710 from meerschaum.utils.dtypes import coerce_timezone 711 from meerschaum.utils.warnings import warn 712 dateutil_parser = mrsm.attempt_import('dateutil.parser') 713 714 def _parse_date_bound(dt_val): 715 if dt_val is None: 716 return None 717 718 if isinstance(dt_val, int): 719 return dt_val 720 721 if dt_val == '': 722 return '' 723 724 if is_int(dt_val): 725 return int(dt_val) 726 727 if isinstance(dt_val, str): 728 try: 729 dt_val = dateutil_parser.parse(dt_val) 730 except Exception as e: 731 warn(f"Could not parse '{dt_val}' as datetime:\n{e}") 732 return None 733 734 dt_col = self.columns.get('datetime', None) 735 dt_typ = str(self.dtypes.get(dt_col, 'datetime64[ns, UTC]')) 736 if dt_typ == 'datetime': 737 dt_typ = 'datetime64[ns, UTC]' 738 return coerce_timezone(dt_val, strip_utc=('utc' not in dt_typ.lower())) 739 740 bounds = tuple(_parse_date_bound(dt_val) for dt_val in dt_vals) 741 if len(bounds) == 1: 742 return bounds[0] 743 return bounds
Given a date bound (begin, end), coerce a timezone if necessary.
12def register( 13 self, 14 debug: bool = False, 15 **kw: Any 16 ) -> SuccessTuple: 17 """ 18 Register a new Pipe along with its attributes. 19 20 Parameters 21 ---------- 22 debug: bool, default False 23 Verbosity toggle. 24 25 kw: Any 26 Keyword arguments to pass to `instance_connector.register_pipe()`. 27 28 Returns 29 ------- 30 A `SuccessTuple` of success, message. 31 """ 32 if self.temporary: 33 return False, "Cannot register pipes created with `temporary=True` (read-only)." 34 35 from meerschaum.utils.formatting import get_console 36 from meerschaum.utils.venv import Venv 37 from meerschaum.connectors import get_connector_plugin, custom_types 38 from meerschaum.config._patch import apply_patch_to_config 39 40 import warnings 41 with warnings.catch_warnings(): 42 warnings.simplefilter('ignore') 43 try: 44 _conn = self.connector 45 except Exception as e: 46 _conn = None 47 48 if ( 49 _conn is not None 50 and 51 (_conn.type == 'plugin' or _conn.type in custom_types) 52 and 53 getattr(_conn, 'register', None) is not None 54 ): 55 try: 56 with Venv(get_connector_plugin(_conn), debug=debug): 57 params = self.connector.register(self) 58 except Exception as e: 59 get_console().print_exception() 60 params = None 61 params = {} if params is None else params 62 if not isinstance(params, dict): 63 from meerschaum.utils.warnings import warn 64 warn( 65 f"Invalid parameters returned from `register()` in connector {self.connector}:\n" 66 + f"{params}" 67 ) 68 else: 69 self.parameters = apply_patch_to_config(params, self.parameters) 70 71 if not self.parameters: 72 cols = self.columns if self.columns else {'datetime': None, 'id': None} 73 self.parameters = { 74 'columns': cols, 75 } 76 77 with Venv(get_connector_plugin(self.instance_connector)): 78 return self.instance_connector.register_pipe(self, debug=debug, **kw)
Register a new Pipe along with its attributes.
Parameters
- debug (bool, default False): Verbosity toggle.
- kw (Any):
Keyword arguments to pass to
instance_connector.register_pipe()
.
Returns
- A
SuccessTuple
of success, message.
19@property 20def attributes(self) -> Dict[str, Any]: 21 """ 22 Return a dictionary of a pipe's keys and parameters. 23 These values are reflected directly from the pipes table of the instance. 24 """ 25 import time 26 from meerschaum.config import get_config 27 from meerschaum.config._patch import apply_patch_to_config 28 from meerschaum.utils.venv import Venv 29 from meerschaum.connectors import get_connector_plugin 30 31 timeout_seconds = get_config('pipes', 'attributes', 'local_cache_timeout_seconds') 32 33 if '_attributes' not in self.__dict__: 34 self._attributes = {} 35 36 now = time.perf_counter() 37 last_refresh = self.__dict__.get('_attributes_sync_time', None) 38 timed_out = ( 39 last_refresh is None 40 or 41 (timeout_seconds is not None and (now - last_refresh) >= timeout_seconds) 42 ) 43 if not self.temporary and timed_out: 44 self._attributes_sync_time = now 45 local_attributes = self.__dict__.get('_attributes', {}) 46 with Venv(get_connector_plugin(self.instance_connector)): 47 instance_attributes = self.instance_connector.get_pipe_attributes(self) 48 self._attributes = apply_patch_to_config(instance_attributes, local_attributes) 49 return self._attributes
Return a dictionary of a pipe's keys and parameters. These values are reflected directly from the pipes table of the instance.
52@property 53def parameters(self) -> Optional[Dict[str, Any]]: 54 """ 55 Return the parameters dictionary of the pipe. 56 """ 57 if 'parameters' not in self.attributes: 58 self.attributes['parameters'] = {} 59 _parameters = self.attributes['parameters'] 60 dt_col = _parameters.get('columns', {}).get('datetime', None) 61 dt_typ = _parameters.get('dtypes', {}).get(dt_col, None) if dt_col else None 62 if dt_col and not dt_typ: 63 if 'dtypes' not in _parameters: 64 self.attributes['parameters']['dtypes'] = {} 65 self.attributes['parameters']['dtypes'][dt_col] = 'datetime' 66 return self.attributes['parameters']
Return the parameters dictionary of the pipe.
78@property 79def columns(self) -> Union[Dict[str, str], None]: 80 """ 81 Return the `columns` dictionary defined in `meerschaum.Pipe.parameters`. 82 """ 83 if 'columns' not in self.parameters: 84 self.parameters['columns'] = {} 85 cols = self.parameters['columns'] 86 if not isinstance(cols, dict): 87 cols = {} 88 self.parameters['columns'] = cols 89 return {col_ix: col for col_ix, col in cols.items() if col}
Return the columns
dictionary defined in meerschaum.Pipe.parameters
.
106@property 107def indices(self) -> Union[Dict[str, Union[str, List[str]]], None]: 108 """ 109 Return the `indices` dictionary defined in `meerschaum.Pipe.parameters`. 110 """ 111 indices_key = ( 112 'indexes' 113 if 'indexes' in self.parameters 114 else 'indices' 115 ) 116 if indices_key not in self.parameters: 117 self.parameters[indices_key] = {} 118 _indices = self.parameters[indices_key] 119 _columns = self.columns 120 dt_col = _columns.get('datetime', None) 121 if not isinstance(_indices, dict): 122 _indices = {} 123 self.parameters[indices_key] = _indices 124 unique_cols = list(set(( 125 [dt_col] 126 if dt_col 127 else [] 128 ) + [ 129 col 130 for col_ix, col in _columns.items() 131 if col and col_ix != 'datetime' 132 ])) 133 return { 134 **({'unique': unique_cols} if len(unique_cols) > 1 else {}), 135 **{col_ix: col for col_ix, col in _columns.items() if col}, 136 **_indices 137 }
Return the indices
dictionary defined in meerschaum.Pipe.parameters
.
140@property 141def indexes(self) -> Union[Dict[str, Union[str, List[str]]], None]: 142 """ 143 Alias for `meerschaum.Pipe.indices`. 144 """ 145 return self.indices
Alias for meerschaum.Pipe.indices
.
198@property 199def dtypes(self) -> Union[Dict[str, Any], None]: 200 """ 201 If defined, return the `dtypes` dictionary defined in `meerschaum.Pipe.parameters`. 202 """ 203 from meerschaum.config._patch import apply_patch_to_config 204 from meerschaum.utils.dtypes import MRSM_ALIAS_DTYPES 205 configured_dtypes = self.parameters.get('dtypes', {}) 206 remote_dtypes = self.infer_dtypes(persist=False) 207 patched_dtypes = apply_patch_to_config(remote_dtypes, configured_dtypes) 208 return { 209 col: MRSM_ALIAS_DTYPES.get(typ, typ) 210 for col, typ in patched_dtypes.items() 211 if col and typ 212 }
If defined, return the dtypes
dictionary defined in meerschaum.Pipe.parameters
.
260@property 261def autoincrement(self) -> bool: 262 """ 263 Return the `autoincrement` parameter for the pipe. 264 """ 265 if 'autoincrement' not in self.parameters: 266 self.parameters['autoincrement'] = False 267 268 return self.parameters['autoincrement']
Return the autoincrement
parameter for the pipe.
224@property 225def upsert(self) -> bool: 226 """ 227 Return whether `upsert` is set for the pipe. 228 """ 229 if 'upsert' not in self.parameters: 230 self.parameters['upsert'] = False 231 return self.parameters['upsert']
Return whether upsert
is set for the pipe.
242@property 243def static(self) -> bool: 244 """ 245 Return whether `static` is set for the pipe. 246 """ 247 if 'static' not in self.parameters: 248 self.parameters['static'] = False 249 return self.parameters['static']
Return whether static
is set for the pipe.
279@property 280def tzinfo(self) -> Union[None, timezone]: 281 """ 282 Return `timezone.utc` if the pipe is timezone-aware. 283 """ 284 dt_col = self.columns.get('datetime', None) 285 if not dt_col: 286 return None 287 288 dt_typ = str(self.dtypes.get(dt_col, 'datetime64[ns, UTC]')) 289 if 'utc' in dt_typ.lower() or dt_typ == 'datetime': 290 return timezone.utc 291 292 if dt_typ == 'datetime64[ns]': 293 return None 294 295 return None
Return timezone.utc
if the pipe is timezone-aware.
298@property 299def enforce(self) -> bool: 300 """ 301 Return the `enforce` parameter for the pipe. 302 """ 303 if 'enforce' not in self.parameters: 304 self.parameters['enforce'] = True 305 306 return self.parameters['enforce']
Return the enforce
parameter for the pipe.
317@property 318def null_indices(self) -> bool: 319 """ 320 Return the `null_indices` parameter for the pipe. 321 """ 322 if 'null_indices' not in self.parameters: 323 self.parameters['null_indices'] = True 324 325 return self.parameters['null_indices']
Return the null_indices
parameter for the pipe.
336def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]: 337 """ 338 Check if the requested columns are defined. 339 340 Parameters 341 ---------- 342 *args: str 343 The column names to be retrieved. 344 345 error: bool, default False 346 If `True`, raise an `Exception` if the specified column is not defined. 347 348 Returns 349 ------- 350 A tuple of the same size of `args` or a `str` if `args` is a single argument. 351 352 Examples 353 -------- 354 >>> pipe = mrsm.Pipe('test', 'test') 355 >>> pipe.columns = {'datetime': 'dt', 'id': 'id'} 356 >>> pipe.get_columns('datetime', 'id') 357 ('dt', 'id') 358 >>> pipe.get_columns('value', error=True) 359 Exception: 🛑 Missing 'value' column for Pipe('test', 'test'). 360 """ 361 from meerschaum.utils.warnings import error as _error, warn 362 if not args: 363 args = tuple(self.columns.keys()) 364 col_names = [] 365 for col in args: 366 col_name = None 367 try: 368 col_name = self.columns[col] 369 if col_name is None and error: 370 _error(f"Please define the name of the '{col}' column for {self}.") 371 except Exception as e: 372 col_name = None 373 if col_name is None and error: 374 _error(f"Missing '{col}'" + f" column for {self}.") 375 col_names.append(col_name) 376 if len(col_names) == 1: 377 return col_names[0] 378 return tuple(col_names)
Check if the requested columns are defined.
Parameters
- *args (str): The column names to be retrieved.
- error (bool, default False):
If
True
, raise anException
if the specified column is not defined.
Returns
- A tuple of the same size of
args
or astr
ifargs
is a single argument.
Examples
>>> pipe = mrsm.Pipe('test', 'test')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> pipe.get_columns('datetime', 'id')
('dt', 'id')
>>> pipe.get_columns('value', error=True)
Exception: 🛑 Missing 'value' column for Pipe('test', 'test').
381def get_columns_types( 382 self, 383 refresh: bool = False, 384 debug: bool = False, 385) -> Union[Dict[str, str], None]: 386 """ 387 Get a dictionary of a pipe's column names and their types. 388 389 Parameters 390 ---------- 391 refresh: bool, default False 392 If `True`, invalidate the cache and fetch directly from the instance connector. 393 394 debug: bool, default False: 395 Verbosity toggle. 396 397 Returns 398 ------- 399 A dictionary of column names (`str`) to column types (`str`). 400 401 Examples 402 -------- 403 >>> pipe.get_columns_types() 404 { 405 'dt': 'TIMESTAMP WITH TIMEZONE', 406 'id': 'BIGINT', 407 'val': 'DOUBLE PRECISION', 408 } 409 >>> 410 """ 411 import time 412 from meerschaum.connectors import get_connector_plugin 413 from meerschaum.config.static import STATIC_CONFIG 414 from meerschaum.utils.warnings import dprint 415 416 now = time.perf_counter() 417 cache_seconds = STATIC_CONFIG['pipes']['static_schema_cache_seconds'] 418 if not self.static: 419 refresh = True 420 if refresh: 421 _ = self.__dict__.pop('_columns_types_timestamp', None) 422 _ = self.__dict__.pop('_columns_types', None) 423 _columns_types = self.__dict__.get('_columns_types', None) 424 if _columns_types: 425 columns_types_timestamp = self.__dict__.get('_columns_types_timestamp', None) 426 if columns_types_timestamp is not None: 427 delta = now - columns_types_timestamp 428 if delta < cache_seconds: 429 if debug: 430 dprint( 431 f"Returning cached `columns_types` for {self} " 432 f"({round(delta, 2)} seconds old)." 433 ) 434 return _columns_types 435 436 with mrsm.Venv(get_connector_plugin(self.instance_connector)): 437 _columns_types = ( 438 self.instance_connector.get_pipe_columns_types(self, debug=debug) 439 if hasattr(self.instance_connector, 'get_pipe_columns_types') 440 else None 441 ) 442 443 self.__dict__['_columns_types'] = _columns_types 444 self.__dict__['_columns_types_timestamp'] = now 445 return _columns_types or {}
Get a dictionary of a pipe's column names and their types.
Parameters
- refresh (bool, default False):
If
True
, invalidate the cache and fetch directly from the instance connector. - debug (bool, default False:): Verbosity toggle.
Returns
- A dictionary of column names (
str
) to column types (str
).
Examples
>>> pipe.get_columns_types()
{
'dt': 'TIMESTAMP WITH TIMEZONE',
'id': 'BIGINT',
'val': 'DOUBLE PRECISION',
}
>>>
448def get_columns_indices( 449 self, 450 debug: bool = False, 451 refresh: bool = False, 452) -> Dict[str, List[Dict[str, str]]]: 453 """ 454 Return a dictionary mapping columns to index information. 455 """ 456 import time 457 from meerschaum.connectors import get_connector_plugin 458 from meerschaum.config.static import STATIC_CONFIG 459 from meerschaum.utils.warnings import dprint 460 461 now = time.perf_counter() 462 cache_seconds = ( 463 STATIC_CONFIG['pipes']['static_schema_cache_seconds'] 464 if self.static 465 else STATIC_CONFIG['pipes']['exists_timeout_seconds'] 466 ) 467 if refresh: 468 _ = self.__dict__.pop('_columns_indices_timestamp', None) 469 _ = self.__dict__.pop('_columns_indices', None) 470 _columns_indices = self.__dict__.get('_columns_indices', None) 471 if _columns_indices: 472 columns_indices_timestamp = self.__dict__.get('_columns_indices_timestamp', None) 473 if columns_indices_timestamp is not None: 474 delta = now - columns_indices_timestamp 475 if delta < cache_seconds: 476 if debug: 477 dprint( 478 f"Returning cached `columns_indices` for {self} " 479 f"({round(delta, 2)} seconds old)." 480 ) 481 return _columns_indices 482 483 with mrsm.Venv(get_connector_plugin(self.instance_connector)): 484 _columns_indices = ( 485 self.instance_connector.get_pipe_columns_indices(self, debug=debug) 486 if hasattr(self.instance_connector, 'get_pipe_columns_indices') 487 else None 488 ) 489 490 self.__dict__['_columns_indices'] = _columns_indices 491 self.__dict__['_columns_indices_timestamp'] = now 492 return {k: v for k, v in _columns_indices.items() if k and v} or {}
Return a dictionary mapping columns to index information.
732def get_indices(self) -> Dict[str, str]: 733 """ 734 Return a dictionary mapping index keys to their names in the database. 735 736 Returns 737 ------- 738 A dictionary of index keys to index names. 739 """ 740 from meerschaum.connectors import get_connector_plugin 741 with mrsm.Venv(get_connector_plugin(self.instance_connector)): 742 if hasattr(self.instance_connector, 'get_pipe_index_names'): 743 result = self.instance_connector.get_pipe_index_names(self) 744 else: 745 result = {} 746 747 return result
Return a dictionary mapping index keys to their names in the database.
Returns
- A dictionary of index keys to index names.
495def get_id(self, **kw: Any) -> Union[int, None]: 496 """ 497 Fetch a pipe's ID from its instance connector. 498 If the pipe does not exist, return `None`. 499 """ 500 if self.temporary: 501 return None 502 from meerschaum.utils.venv import Venv 503 from meerschaum.connectors import get_connector_plugin 504 505 with Venv(get_connector_plugin(self.instance_connector)): 506 if hasattr(self.instance_connector, 'get_pipe_id'): 507 return self.instance_connector.get_pipe_id(self, **kw) 508 509 return None
Fetch a pipe's ID from its instance connector.
If the pipe does not exist, return None
.
512@property 513def id(self) -> Union[int, None]: 514 """ 515 Fetch and cache a pipe's ID. 516 """ 517 if not ('_id' in self.__dict__ and self._id): 518 self._id = self.get_id() 519 return self._id
Fetch and cache a pipe's ID.
522def get_val_column(self, debug: bool = False) -> Union[str, None]: 523 """ 524 Return the name of the value column if it's defined, otherwise make an educated guess. 525 If not set in the `columns` dictionary, return the first numeric column that is not 526 an ID or datetime column. 527 If none may be found, return `None`. 528 529 Parameters 530 ---------- 531 debug: bool, default False: 532 Verbosity toggle. 533 534 Returns 535 ------- 536 Either a string or `None`. 537 """ 538 from meerschaum.utils.debug import dprint 539 if debug: 540 dprint('Attempting to determine the value column...') 541 try: 542 val_name = self.get_columns('value') 543 except Exception as e: 544 val_name = None 545 if val_name is not None: 546 if debug: 547 dprint(f"Value column: {val_name}") 548 return val_name 549 550 cols = self.columns 551 if cols is None: 552 if debug: 553 dprint('No columns could be determined. Returning...') 554 return None 555 try: 556 dt_name = self.get_columns('datetime', error=False) 557 except Exception as e: 558 dt_name = None 559 try: 560 id_name = self.get_columns('id', errors=False) 561 except Exception as e: 562 id_name = None 563 564 if debug: 565 dprint(f"dt_name: {dt_name}") 566 dprint(f"id_name: {id_name}") 567 568 cols_types = self.get_columns_types(debug=debug) 569 if cols_types is None: 570 return None 571 if debug: 572 dprint(f"cols_types: {cols_types}") 573 if dt_name is not None: 574 cols_types.pop(dt_name, None) 575 if id_name is not None: 576 cols_types.pop(id_name, None) 577 578 candidates = [] 579 candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric',} 580 for search_term in candidate_keywords: 581 for col, typ in cols_types.items(): 582 if search_term in typ.lower(): 583 candidates.append(col) 584 break 585 if not candidates: 586 if debug: 587 dprint("No value column could be determined.") 588 return None 589 590 return candidates[0]
Return the name of the value column if it's defined, otherwise make an educated guess.
If not set in the columns
dictionary, return the first numeric column that is not
an ID or datetime column.
If none may be found, return None
.
Parameters
- debug (bool, default False:): Verbosity toggle.
Returns
- Either a string or
None
.
593@property 594def parents(self) -> List[meerschaum.Pipe]: 595 """ 596 Return a list of `meerschaum.Pipe` objects to be designated as parents. 597 """ 598 if 'parents' not in self.parameters: 599 return [] 600 from meerschaum.utils.warnings import warn 601 _parents_keys = self.parameters['parents'] 602 if not isinstance(_parents_keys, list): 603 warn( 604 f"Please ensure the parents for {self} are defined as a list of keys.", 605 stacklevel = 4 606 ) 607 return [] 608 from meerschaum import Pipe 609 _parents = [] 610 for keys in _parents_keys: 611 try: 612 p = Pipe(**keys) 613 except Exception as e: 614 warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}") 615 continue 616 _parents.append(p) 617 return _parents
Return a list of meerschaum.Pipe
objects to be designated as parents.
620@property 621def children(self) -> List[meerschaum.Pipe]: 622 """ 623 Return a list of `meerschaum.Pipe` objects to be designated as children. 624 """ 625 if 'children' not in self.parameters: 626 return [] 627 from meerschaum.utils.warnings import warn 628 _children_keys = self.parameters['children'] 629 if not isinstance(_children_keys, list): 630 warn( 631 f"Please ensure the children for {self} are defined as a list of keys.", 632 stacklevel = 4 633 ) 634 return [] 635 from meerschaum import Pipe 636 _children = [] 637 for keys in _children_keys: 638 try: 639 p = Pipe(**keys) 640 except Exception as e: 641 warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}") 642 continue 643 _children.append(p) 644 return _children
Return a list of meerschaum.Pipe
objects to be designated as children.
647@property 648def target(self) -> str: 649 """ 650 The target table name. 651 You can set the target name under on of the following keys 652 (checked in this order): 653 - `target` 654 - `target_name` 655 - `target_table` 656 - `target_table_name` 657 """ 658 if 'target' not in self.parameters: 659 default_target = self._target_legacy() 660 default_targets = {default_target} 661 potential_keys = ('target_name', 'target_table', 'target_table_name') 662 _target = None 663 for k in potential_keys: 664 if k in self.parameters: 665 _target = self.parameters[k] 666 break 667 668 _target = _target or default_target 669 670 if self.instance_connector.type == 'sql': 671 from meerschaum.utils.sql import truncate_item_name 672 truncated_target = truncate_item_name(_target, self.instance_connector.flavor) 673 default_targets.add(truncated_target) 674 warned_target = self.__dict__.get('_warned_target', False) 675 if truncated_target != _target and not warned_target: 676 if not warned_target: 677 warn( 678 f"The target '{_target}' is too long for '{self.instance_connector.flavor}', " 679 + f"will use {truncated_target} instead." 680 ) 681 self.__dict__['_warned_target'] = True 682 _target = truncated_target 683 684 if _target in default_targets: 685 return _target 686 self.target = _target 687 return self.parameters['target']
The target table name. You can set the target name under on of the following keys (checked in this order):
target
target_name
target_table
target_table_name
710def guess_datetime(self) -> Union[str, None]: 711 """ 712 Try to determine a pipe's datetime column. 713 """ 714 _dtypes = self.dtypes 715 716 ### Abort if the user explictly disallows a datetime index. 717 if 'datetime' in _dtypes: 718 if _dtypes['datetime'] is None: 719 return None 720 721 from meerschaum.utils.dtypes import are_dtypes_equal 722 dt_cols = [ 723 col 724 for col, typ in _dtypes.items() 725 if are_dtypes_equal(typ, 'datetime') 726 ] 727 if not dt_cols: 728 return None 729 return dt_cols[0]
Try to determine a pipe's datetime column.
12def show( 13 self, 14 nopretty: bool = False, 15 debug: bool = False, 16 **kw 17 ) -> SuccessTuple: 18 """ 19 Show attributes of a Pipe. 20 21 Parameters 22 ---------- 23 nopretty: bool, default False 24 If `True`, simply print the JSON of the pipe's attributes. 25 26 debug: bool, default False 27 Verbosity toggle. 28 29 Returns 30 ------- 31 A `SuccessTuple` of success, message. 32 33 """ 34 import json 35 from meerschaum.utils.formatting import ( 36 pprint, make_header, ANSI, highlight_pipes, fill_ansi, get_console, 37 ) 38 from meerschaum.utils.packages import import_rich, attempt_import 39 from meerschaum.utils.warnings import info 40 attributes_json = json.dumps(self.attributes) 41 if not nopretty: 42 _to_print = f"Attributes for {self}:" 43 if ANSI: 44 _to_print = fill_ansi(highlight_pipes(make_header(_to_print)), 'magenta') 45 print(_to_print) 46 rich = import_rich() 47 rich_json = attempt_import('rich.json') 48 get_console().print(rich_json.JSON(attributes_json)) 49 else: 50 print(_to_print) 51 else: 52 print(attributes_json) 53 54 return True, "Success"
Show attributes of a Pipe.
Parameters
- nopretty (bool, default False):
If
True
, simply print the JSON of the pipe's attributes. - debug (bool, default False): Verbosity toggle.
Returns
- A
SuccessTuple
of success, message.
21def edit( 22 self, 23 patch: bool = False, 24 interactive: bool = False, 25 debug: bool = False, 26 **kw: Any 27) -> SuccessTuple: 28 """ 29 Edit a Pipe's configuration. 30 31 Parameters 32 ---------- 33 patch: bool, default False 34 If `patch` is True, update parameters by cascading rather than overwriting. 35 interactive: bool, default False 36 If `True`, open an editor for the user to make changes to the pipe's YAML file. 37 debug: bool, default False 38 Verbosity toggle. 39 40 Returns 41 ------- 42 A `SuccessTuple` of success, message. 43 44 """ 45 from meerschaum.utils.venv import Venv 46 from meerschaum.connectors import get_connector_plugin 47 48 if self.temporary: 49 return False, "Cannot edit pipes created with `temporary=True` (read-only)." 50 51 if not interactive: 52 with Venv(get_connector_plugin(self.instance_connector)): 53 return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw) 54 55 from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH 56 from meerschaum.utils.misc import edit_file 57 parameters_filename = str(self) + '.yaml' 58 parameters_path = PIPES_CACHE_RESOURCES_PATH / parameters_filename 59 60 from meerschaum.utils.yaml import yaml 61 62 edit_text = f"Edit the parameters for {self}" 63 edit_top = '#' * (len(edit_text) + 4) 64 edit_header = edit_top + f'\n# {edit_text} #\n' + edit_top + '\n\n' 65 66 from meerschaum.config import get_config 67 parameters = dict(get_config('pipes', 'parameters', patch=True)) 68 from meerschaum.config._patch import apply_patch_to_config 69 parameters = apply_patch_to_config(parameters, self.parameters) 70 71 ### write parameters to yaml file 72 with open(parameters_path, 'w+') as f: 73 f.write(edit_header) 74 yaml.dump(parameters, stream=f, sort_keys=False) 75 76 ### only quit editing if yaml is valid 77 editing = True 78 while editing: 79 edit_file(parameters_path) 80 try: 81 with open(parameters_path, 'r') as f: 82 file_parameters = yaml.load(f.read()) 83 except Exception as e: 84 from meerschaum.utils.warnings import warn 85 warn(f"Invalid format defined for '{self}':\n\n{e}") 86 input(f"Press [Enter] to correct the configuration for '{self}': ") 87 else: 88 editing = False 89 90 self.parameters = file_parameters 91 92 if debug: 93 from meerschaum.utils.formatting import pprint 94 pprint(self.parameters) 95 96 with Venv(get_connector_plugin(self.instance_connector)): 97 return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
Edit a Pipe's configuration.
Parameters
- patch (bool, default False):
If
patch
is True, update parameters by cascading rather than overwriting. - interactive (bool, default False):
If
True
, open an editor for the user to make changes to the pipe's YAML file. - debug (bool, default False): Verbosity toggle.
Returns
- A
SuccessTuple
of success, message.
100def edit_definition( 101 self, 102 yes: bool = False, 103 noask: bool = False, 104 force: bool = False, 105 debug : bool = False, 106 **kw : Any 107) -> SuccessTuple: 108 """ 109 Edit a pipe's definition file and update its configuration. 110 **NOTE:** This function is interactive and should not be used in automated scripts! 111 112 Returns 113 ------- 114 A `SuccessTuple` of success, message. 115 116 """ 117 if self.temporary: 118 return False, "Cannot edit pipes created with `temporary=True` (read-only)." 119 120 from meerschaum.connectors import instance_types 121 if (self.connector is None) or self.connector.type not in instance_types: 122 return self.edit(interactive=True, debug=debug, **kw) 123 124 import json 125 from meerschaum.utils.warnings import info, warn 126 from meerschaum.utils.debug import dprint 127 from meerschaum.config._patch import apply_patch_to_config 128 from meerschaum.utils.misc import edit_file 129 130 _parameters = self.parameters 131 if 'fetch' not in _parameters: 132 _parameters['fetch'] = {} 133 134 def _edit_api(): 135 from meerschaum.utils.prompt import prompt, yes_no 136 info( 137 f"Please enter the keys of the source pipe from '{self.connector}'.\n" + 138 "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip." 139 ) 140 141 _keys = { 'connector_keys' : None, 'metric_key' : None, 'location_key' : None } 142 for k in _keys: 143 _keys[k] = _parameters['fetch'].get(k, None) 144 145 for k, v in _keys.items(): 146 try: 147 _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v) 148 except KeyboardInterrupt: 149 continue 150 if _keys[k] in ('', 'None', '\'None\'', '[None]'): 151 _keys[k] = None 152 153 _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys) 154 155 info("You may optionally specify additional filter parameters as JSON.") 156 print(" Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.") 157 print(" For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':") 158 print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': '))) 159 if force or yes_no( 160 "Would you like to add additional filter parameters?", 161 yes=yes, noask=noask 162 ): 163 from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH 164 definition_filename = str(self) + '.json' 165 definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename 166 try: 167 definition_path.touch() 168 with open(definition_path, 'w+') as f: 169 json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2) 170 except Exception as e: 171 return False, f"Failed writing file '{definition_path}':\n" + str(e) 172 173 _params = None 174 while True: 175 edit_file(definition_path) 176 try: 177 with open(definition_path, 'r') as f: 178 _params = json.load(f) 179 except Exception as e: 180 warn(f'Failed to read parameters JSON:\n{e}', stack=False) 181 if force or yes_no( 182 "Would you like to try again?\n " 183 + "If not, the parameters JSON file will be ignored.", 184 noask=noask, yes=yes 185 ): 186 continue 187 _params = None 188 break 189 if _params is not None: 190 if 'fetch' not in _parameters: 191 _parameters['fetch'] = {} 192 _parameters['fetch']['params'] = _params 193 194 self.parameters = _parameters 195 return True, "Success" 196 197 def _edit_sql(): 198 import pathlib, os, textwrap 199 from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH 200 from meerschaum.utils.misc import edit_file 201 definition_filename = str(self) + '.sql' 202 definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename 203 204 sql_definition = _parameters['fetch'].get('definition', None) 205 if sql_definition is None: 206 sql_definition = '' 207 sql_definition = textwrap.dedent(sql_definition).lstrip() 208 209 try: 210 definition_path.touch() 211 with open(definition_path, 'w+') as f: 212 f.write(sql_definition) 213 except Exception as e: 214 return False, f"Failed writing file '{definition_path}':\n" + str(e) 215 216 edit_file(definition_path) 217 try: 218 with open(definition_path, 'r') as f: 219 file_definition = f.read() 220 except Exception as e: 221 return False, f"Failed reading file '{definition_path}':\n" + str(e) 222 223 if sql_definition == file_definition: 224 return False, f"No changes made to definition for {self}." 225 226 if ' ' not in file_definition: 227 return False, f"Invalid SQL definition for {self}." 228 229 if debug: 230 dprint("Read SQL definition:\n\n" + file_definition) 231 _parameters['fetch']['definition'] = file_definition 232 self.parameters = _parameters 233 return True, "Success" 234 235 locals()['_edit_' + str(self.connector.type)]() 236 return self.edit(interactive=False, debug=debug, **kw)
Edit a pipe's definition file and update its configuration. NOTE: This function is interactive and should not be used in automated scripts!
Returns
- A
SuccessTuple
of success, message.
13def update(self, *args, **kw) -> SuccessTuple: 14 """ 15 Update a pipe's parameters in its instance. 16 """ 17 kw['interactive'] = False 18 return self.edit(*args, **kw)
Update a pipe's parameters in its instance.
40def sync( 41 self, 42 df: Union[ 43 pd.DataFrame, 44 Dict[str, List[Any]], 45 List[Dict[str, Any]], 46 InferFetch 47 ] = InferFetch, 48 begin: Union[datetime, int, str, None] = '', 49 end: Union[datetime, int, None] = None, 50 force: bool = False, 51 retries: int = 10, 52 min_seconds: int = 1, 53 check_existing: bool = True, 54 enforce_dtypes: bool = True, 55 blocking: bool = True, 56 workers: Optional[int] = None, 57 callback: Optional[Callable[[Tuple[bool, str]], Any]] = None, 58 error_callback: Optional[Callable[[Exception], Any]] = None, 59 chunksize: Optional[int] = -1, 60 sync_chunks: bool = True, 61 debug: bool = False, 62 _inplace: bool = True, 63 **kw: Any 64) -> SuccessTuple: 65 """ 66 Fetch new data from the source and update the pipe's table with new data. 67 68 Get new remote data via fetch, get existing data in the same time period, 69 and merge the two, only keeping the unseen data. 70 71 Parameters 72 ---------- 73 df: Union[None, pd.DataFrame, Dict[str, List[Any]]], default None 74 An optional DataFrame to sync into the pipe. Defaults to `None`. 75 76 begin: Union[datetime, int, str, None], default '' 77 Optionally specify the earliest datetime to search for data. 78 79 end: Union[datetime, int, str, None], default None 80 Optionally specify the latest datetime to search for data. 81 82 force: bool, default False 83 If `True`, keep trying to sync untul `retries` attempts. 84 85 retries: int, default 10 86 If `force`, how many attempts to try syncing before declaring failure. 87 88 min_seconds: Union[int, float], default 1 89 If `force`, how many seconds to sleep between retries. Defaults to `1`. 90 91 check_existing: bool, default True 92 If `True`, pull and diff with existing data from the pipe. 93 94 enforce_dtypes: bool, default True 95 If `True`, enforce dtypes on incoming data. 96 Set this to `False` if the incoming rows are expected to be of the correct dtypes. 97 98 blocking: bool, default True 99 If `True`, wait for sync to finish and return its result, otherwise 100 asyncronously sync (oxymoron?) and return success. Defaults to `True`. 101 Only intended for specific scenarios. 102 103 workers: Optional[int], default None 104 If provided and the instance connector is thread-safe 105 (`pipe.instance_connector.IS_THREAD_SAFE is True`), 106 limit concurrent sync to this many threads. 107 108 callback: Optional[Callable[[Tuple[bool, str]], Any]], default None 109 Callback function which expects a SuccessTuple as input. 110 Only applies when `blocking=False`. 111 112 error_callback: Optional[Callable[[Exception], Any]], default None 113 Callback function which expects an Exception as input. 114 Only applies when `blocking=False`. 115 116 chunksize: int, default -1 117 Specify the number of rows to sync per chunk. 118 If `-1`, resort to system configuration (default is `900`). 119 A `chunksize` of `None` will sync all rows in one transaction. 120 121 sync_chunks: bool, default True 122 If possible, sync chunks while fetching them into memory. 123 124 debug: bool, default False 125 Verbosity toggle. Defaults to False. 126 127 Returns 128 ------- 129 A `SuccessTuple` of success (`bool`) and message (`str`). 130 """ 131 from meerschaum.utils.debug import dprint, _checkpoint 132 from meerschaum.utils.formatting import get_console 133 from meerschaum.utils.venv import Venv 134 from meerschaum.connectors import get_connector_plugin 135 from meerschaum.utils.misc import df_is_chunk_generator, filter_keywords, filter_arguments 136 from meerschaum.utils.pool import get_pool 137 from meerschaum.config import get_config 138 139 if (callback is not None or error_callback is not None) and blocking: 140 warn("Callback functions are only executed when blocking = False. Ignoring...") 141 142 _checkpoint(_total=2, **kw) 143 144 if chunksize == 0: 145 chunksize = None 146 sync_chunks = False 147 148 begin, end = self.parse_date_bounds(begin, end) 149 kw.update({ 150 'begin': begin, 151 'end': end, 152 'force': force, 153 'retries': retries, 154 'min_seconds': min_seconds, 155 'check_existing': check_existing, 156 'blocking': blocking, 157 'workers': workers, 158 'callback': callback, 159 'error_callback': error_callback, 160 'sync_chunks': sync_chunks, 161 'chunksize': chunksize, 162 }) 163 164 ### NOTE: Invalidate `_exists` cache before and after syncing. 165 self._exists = None 166 167 def _sync( 168 p: mrsm.Pipe, 169 df: Union[ 170 'pd.DataFrame', 171 Dict[str, List[Any]], 172 List[Dict[str, Any]], 173 InferFetch 174 ] = InferFetch, 175 ) -> SuccessTuple: 176 if df is None: 177 p._exists = None 178 return ( 179 False, 180 f"You passed `None` instead of data into `sync()` for {p}.\n" 181 + "Omit the DataFrame to infer fetching.", 182 ) 183 ### Ensure that Pipe is registered. 184 if not p.temporary and p.get_id(debug=debug) is None: 185 ### NOTE: This may trigger an interactive session for plugins! 186 register_success, register_msg = p.register(debug=debug) 187 if not register_success: 188 if 'already' not in register_msg: 189 p._exists = None 190 return register_success, register_msg 191 192 ### If connector is a plugin with a `sync()` method, return that instead. 193 ### If the plugin does not have a `sync()` method but does have a `fetch()` method, 194 ### use that instead. 195 ### NOTE: The DataFrame must be omitted for the plugin sync method to apply. 196 ### If a DataFrame is provided, continue as expected. 197 if hasattr(df, 'MRSM_INFER_FETCH'): 198 try: 199 if p.connector is None: 200 if ':' not in p.connector_keys: 201 return True, f"{p} does not support fetching; nothing to do." 202 203 msg = f"{p} does not have a valid connector." 204 if p.connector_keys.startswith('plugin:'): 205 msg += f"\n Perhaps {p.connector_keys} has a syntax error?" 206 p._exists = None 207 return False, msg 208 except Exception: 209 p._exists = None 210 return False, f"Unable to create the connector for {p}." 211 212 ### Sync in place if this is a SQL pipe. 213 if ( 214 str(self.connector) == str(self.instance_connector) 215 and 216 hasattr(self.instance_connector, 'sync_pipe_inplace') 217 and 218 _inplace 219 and 220 get_config('system', 'experimental', 'inplace_sync') 221 ): 222 with Venv(get_connector_plugin(self.instance_connector)): 223 p._exists = None 224 _args, _kwargs = filter_arguments( 225 p.instance_connector.sync_pipe_inplace, 226 p, 227 debug=debug, 228 **kw 229 ) 230 return self.instance_connector.sync_pipe_inplace( 231 *_args, 232 **_kwargs 233 ) 234 235 ### Activate and invoke `sync(pipe)` for plugin connectors with `sync` methods. 236 try: 237 if getattr(p.connector, 'sync', None) is not None: 238 with Venv(get_connector_plugin(p.connector), debug=debug): 239 _args, _kwargs = filter_arguments( 240 p.connector.sync, 241 p, 242 debug=debug, 243 **kw 244 ) 245 return_tuple = p.connector.sync(*_args, **_kwargs) 246 p._exists = None 247 if not isinstance(return_tuple, tuple): 248 return_tuple = ( 249 False, 250 f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}" 251 ) 252 return return_tuple 253 254 except Exception as e: 255 get_console().print_exception() 256 msg = f"Failed to sync {p} with exception: '" + str(e) + "'" 257 if debug: 258 error(msg, silent=False) 259 p._exists = None 260 return False, msg 261 262 ### Fetch the dataframe from the connector's `fetch()` method. 263 try: 264 with Venv(get_connector_plugin(p.connector), debug=debug): 265 df = p.fetch( 266 **filter_keywords( 267 p.fetch, 268 debug=debug, 269 **kw 270 ) 271 ) 272 except Exception as e: 273 get_console().print_exception( 274 suppress=[ 275 'meerschaum/core/Pipe/_sync.py', 276 'meerschaum/core/Pipe/_fetch.py', 277 ] 278 ) 279 msg = f"Failed to fetch data from {p.connector}:\n {e}" 280 df = None 281 282 if df is None: 283 p._exists = None 284 return False, f"No data were fetched for {p}." 285 286 if isinstance(df, list): 287 if len(df) == 0: 288 return True, f"No new rows were returned for {p}." 289 290 ### May be a chunk hook results list. 291 if isinstance(df[0], tuple): 292 success = all([_success for _success, _ in df]) 293 message = '\n'.join([_message for _, _message in df]) 294 return success, message 295 296 if df is True: 297 p._exists = None 298 return True, f"{p} is being synced in parallel." 299 300 ### CHECKPOINT: Retrieved the DataFrame. 301 _checkpoint(**kw) 302 303 ### Allow for dataframe generators or iterables. 304 if df_is_chunk_generator(df): 305 kw['workers'] = p.get_num_workers(kw.get('workers', None)) 306 dt_col = p.columns.get('datetime', None) 307 pool = get_pool(workers=kw.get('workers', 1)) 308 if debug: 309 dprint(f"Received {type(df)}. Attempting to sync first chunk...") 310 311 try: 312 chunk = next(df) 313 except StopIteration: 314 return True, "Received an empty generator; nothing to do." 315 316 chunk_success, chunk_msg = _sync(p, chunk) 317 chunk_msg = '\n' + self._get_chunk_label(chunk, dt_col) + '\n' + chunk_msg 318 if not chunk_success: 319 return chunk_success, f"Unable to sync initial chunk for {p}:\n{chunk_msg}" 320 if debug: 321 dprint("Successfully synced the first chunk, attemping the rest...") 322 323 failed_chunks = [] 324 def _process_chunk(_chunk): 325 try: 326 _chunk_success, _chunk_msg = _sync(p, _chunk) 327 except Exception as e: 328 _chunk_success, _chunk_msg = False, str(e) 329 if not _chunk_success: 330 failed_chunks.append(_chunk) 331 _chunk_msg = ( 332 self._get_chunk_label(_chunk, dt_col) 333 + '\n' 334 + _chunk_msg 335 ) 336 337 mrsm.pprint((_chunk_success, _chunk_msg), calm=True) 338 return _chunk_success, _chunk_msg 339 340 results = sorted( 341 [(chunk_success, chunk_msg)] + ( 342 list(pool.imap(_process_chunk, df)) 343 if ( 344 not df_is_chunk_generator(chunk) # Handle nested generators. 345 and kw.get('workers', 1) != 1 346 ) 347 else list( 348 _process_chunk(_child_chunks) 349 for _child_chunks in df 350 ) 351 ) 352 ) 353 chunk_messages = [chunk_msg for _, chunk_msg in results] 354 success_bools = [chunk_success for chunk_success, _ in results] 355 success = all(success_bools) 356 msg = ( 357 f'Synced {len(chunk_messages)} chunk' 358 + ('s' if len(chunk_messages) != 1 else '') 359 + f' to {p}:\n\n' 360 + '\n\n'.join(chunk_messages).lstrip().rstrip() 361 ).lstrip().rstrip() 362 363 ### If some chunks succeeded, retry the failures. 364 retry_success = True 365 if not success and any(success_bools): 366 if debug: 367 dprint("Retrying failed chunks...") 368 chunks_to_retry = [c for c in failed_chunks] 369 failed_chunks = [] 370 for chunk in chunks_to_retry: 371 chunk_success, chunk_msg = _process_chunk(chunk) 372 msg += f"\n\nRetried chunk:\n{chunk_msg}\n" 373 retry_success = retry_success and chunk_success 374 375 success = success and retry_success 376 return success, msg 377 378 ### Cast to a dataframe and ensure datatypes are what we expect. 379 df = self.enforce_dtypes( 380 df, 381 chunksize=chunksize, 382 enforce=enforce_dtypes, 383 debug=debug, 384 ) 385 386 ### Capture `numeric`, `uuid`, `json`, and `bytes` columns. 387 self._persist_new_json_columns(df, debug=debug) 388 self._persist_new_numeric_columns(df, debug=debug) 389 self._persist_new_uuid_columns(df, debug=debug) 390 self._persist_new_bytes_columns(df, debug=debug) 391 392 if debug: 393 dprint( 394 "DataFrame to sync:\n" 395 + ( 396 str(df)[:255] 397 + '...' 398 if len(str(df)) >= 256 399 else str(df) 400 ), 401 **kw 402 ) 403 404 ### if force, continue to sync until success 405 return_tuple = False, f"Did not sync {p}." 406 run = True 407 _retries = 1 408 while run: 409 with Venv(get_connector_plugin(self.instance_connector)): 410 return_tuple = p.instance_connector.sync_pipe( 411 pipe=p, 412 df=df, 413 debug=debug, 414 **kw 415 ) 416 _retries += 1 417 run = (not return_tuple[0]) and force and _retries <= retries 418 if run and debug: 419 dprint(f"Syncing failed for {p}. Attempt ( {_retries} / {retries} )", **kw) 420 dprint(f"Sleeping for {min_seconds} seconds...", **kw) 421 time.sleep(min_seconds) 422 if _retries > retries: 423 warn( 424 f"Unable to sync {p} within {retries} attempt" + 425 ("s" if retries != 1 else "") + "!" 426 ) 427 428 ### CHECKPOINT: Finished syncing. Handle caching. 429 _checkpoint(**kw) 430 if self.cache_pipe is not None: 431 if debug: 432 dprint("Caching retrieved dataframe.", **kw) 433 _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw) 434 if not _sync_cache_tuple[0]: 435 warn(f"Failed to sync local cache for {self}.") 436 437 self._exists = None 438 return return_tuple 439 440 if blocking: 441 self._exists = None 442 return _sync(self, df=df) 443 444 from meerschaum.utils.threading import Thread 445 def default_callback(result_tuple: SuccessTuple): 446 dprint(f"Asynchronous result from {self}: {result_tuple}", **kw) 447 448 def default_error_callback(x: Exception): 449 dprint(f"Error received for {self}: {x}", **kw) 450 451 if callback is None and debug: 452 callback = default_callback 453 if error_callback is None and debug: 454 error_callback = default_error_callback 455 try: 456 thread = Thread( 457 target=_sync, 458 args=(self,), 459 kwargs={'df': df}, 460 daemon=False, 461 callback=callback, 462 error_callback=error_callback, 463 ) 464 thread.start() 465 except Exception as e: 466 self._exists = None 467 return False, str(e) 468 469 self._exists = None 470 return True, f"Spawned asyncronous sync for {self}."
Fetch new data from the source and update the pipe's table with new data.
Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.
Parameters
- df (Union[None, pd.DataFrame, Dict[str, List[Any]]], default None):
An optional DataFrame to sync into the pipe. Defaults to
None
. - begin (Union[datetime, int, str, None], default ''): Optionally specify the earliest datetime to search for data.
- end (Union[datetime, int, str, None], default None): Optionally specify the latest datetime to search for data.
- force (bool, default False):
If
True
, keep trying to sync untulretries
attempts. - retries (int, default 10):
If
force
, how many attempts to try syncing before declaring failure. - min_seconds (Union[int, float], default 1):
If
force
, how many seconds to sleep between retries. Defaults to1
. - check_existing (bool, default True):
If
True
, pull and diff with existing data from the pipe. - enforce_dtypes (bool, default True):
If
True
, enforce dtypes on incoming data. Set this toFalse
if the incoming rows are expected to be of the correct dtypes. - blocking (bool, default True):
If
True
, wait for sync to finish and return its result, otherwise asyncronously sync (oxymoron?) and return success. Defaults toTrue
. Only intended for specific scenarios. - workers (Optional[int], default None):
If provided and the instance connector is thread-safe
(
pipe.instance_connector.IS_THREAD_SAFE is True
), limit concurrent sync to this many threads. - callback (Optional[Callable[[Tuple[bool, str]], Any]], default None):
Callback function which expects a SuccessTuple as input.
Only applies when
blocking=False
. - error_callback (Optional[Callable[[Exception], Any]], default None):
Callback function which expects an Exception as input.
Only applies when
blocking=False
. - chunksize (int, default -1):
Specify the number of rows to sync per chunk.
If
-1
, resort to system configuration (default is900
). Achunksize
ofNone
will sync all rows in one transaction. - sync_chunks (bool, default True): If possible, sync chunks while fetching them into memory.
- debug (bool, default False): Verbosity toggle. Defaults to False.
Returns
- A
SuccessTuple
of success (bool
) and message (str
).
473def get_sync_time( 474 self, 475 params: Optional[Dict[str, Any]] = None, 476 newest: bool = True, 477 apply_backtrack_interval: bool = False, 478 round_down: bool = False, 479 debug: bool = False 480) -> Union['datetime', int, None]: 481 """ 482 Get the most recent datetime value for a Pipe. 483 484 Parameters 485 ---------- 486 params: Optional[Dict[str, Any]], default None 487 Dictionary to build a WHERE clause for a specific column. 488 See `meerschaum.utils.sql.build_where`. 489 490 newest: bool, default True 491 If `True`, get the most recent datetime (honoring `params`). 492 If `False`, get the oldest datetime (`ASC` instead of `DESC`). 493 494 apply_backtrack_interval: bool, default False 495 If `True`, subtract the backtrack interval from the sync time. 496 497 round_down: bool, default False 498 If `True`, round down the datetime value to the nearest minute. 499 500 debug: bool, default False 501 Verbosity toggle. 502 503 Returns 504 ------- 505 A `datetime` or int, if the pipe exists, otherwise `None`. 506 507 """ 508 from meerschaum.utils.venv import Venv 509 from meerschaum.connectors import get_connector_plugin 510 from meerschaum.utils.misc import round_time 511 512 if not self.columns.get('datetime', None): 513 return None 514 515 with Venv(get_connector_plugin(self.instance_connector)): 516 sync_time = self.instance_connector.get_sync_time( 517 self, 518 params=params, 519 newest=newest, 520 debug=debug, 521 ) 522 523 if round_down and isinstance(sync_time, datetime): 524 sync_time = round_time(sync_time, timedelta(minutes=1)) 525 526 if apply_backtrack_interval and sync_time is not None: 527 backtrack_interval = self.get_backtrack_interval(debug=debug) 528 try: 529 sync_time -= backtrack_interval 530 except Exception as e: 531 warn(f"Failed to apply backtrack interval:\n{e}") 532 533 return self.parse_date_bounds(sync_time)
Get the most recent datetime value for a Pipe.
Parameters
- params (Optional[Dict[str, Any]], default None):
Dictionary to build a WHERE clause for a specific column.
See
meerschaum.utils.sql.build_where
. - newest (bool, default True):
If
True
, get the most recent datetime (honoringparams
). IfFalse
, get the oldest datetime (ASC
instead ofDESC
). - apply_backtrack_interval (bool, default False):
If
True
, subtract the backtrack interval from the sync time. - round_down (bool, default False):
If
True
, round down the datetime value to the nearest minute. - debug (bool, default False): Verbosity toggle.
Returns
- A
datetime
or int, if the pipe exists, otherwiseNone
.
536def exists( 537 self, 538 debug: bool = False 539) -> bool: 540 """ 541 See if a Pipe's table exists. 542 543 Parameters 544 ---------- 545 debug: bool, default False 546 Verbosity toggle. 547 548 Returns 549 ------- 550 A `bool` corresponding to whether a pipe's underlying table exists. 551 552 """ 553 import time 554 from meerschaum.utils.venv import Venv 555 from meerschaum.connectors import get_connector_plugin 556 from meerschaum.config import STATIC_CONFIG 557 from meerschaum.utils.debug import dprint 558 now = time.perf_counter() 559 exists_timeout_seconds = STATIC_CONFIG['pipes']['exists_timeout_seconds'] 560 561 _exists = self.__dict__.get('_exists', None) 562 if _exists: 563 exists_timestamp = self.__dict__.get('_exists_timestamp', None) 564 if exists_timestamp is not None: 565 delta = now - exists_timestamp 566 if delta < exists_timeout_seconds: 567 if debug: 568 dprint(f"Returning cached `exists` for {self} ({round(delta, 2)} seconds old).") 569 return _exists 570 571 with Venv(get_connector_plugin(self.instance_connector)): 572 _exists = ( 573 self.instance_connector.pipe_exists(pipe=self, debug=debug) 574 if hasattr(self.instance_connector, 'pipe_exists') 575 else False 576 ) 577 578 self.__dict__['_exists'] = _exists 579 self.__dict__['_exists_timestamp'] = now 580 return _exists
See if a Pipe's table exists.
Parameters
- debug (bool, default False): Verbosity toggle.
Returns
- A
bool
corresponding to whether a pipe's underlying table exists.