meerschaum
Meerschaum Python API
Welcome to the Meerschaum Python API technical documentation! Here you can find information about the classes and functions provided by the meerschaum
package. Visit meerschaum.io for general usage documentation.
Root Module
For your convenience, the following classes and functions may be imported from the root meerschaum
namespace:
Classes
meerschaum.Pipe
meerschaum.Plugin
meerschaum.Venv
meerschaum.Job
meerschaum.Connector
Examples
Build a Connector
Get existing connectors or build a new one in-memory with the meerschaum.get_connector()
factory function:
import meerschaum as mrsm
sql_conn = mrsm.get_connector(
'sql:temp',
flavor='sqlite',
database='/tmp/tmp.db',
)
df = sql_conn.read("SELECT 1 AS foo")
print(df)
# foo
# 0 1
sql_conn.to_sql(df, 'foo')
print(sql_conn.read('foo'))
# foo
# 0 1
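Connectors are stored for reuse, so calling the factory again with the same keys should return the stored instance rather than building a new connection:
import meerschaum as mrsm

# 'sql:temp' was built above, so the factory should hand back the same object.
same_conn = mrsm.get_connector('sql:temp')
print(same_conn is sql_conn)
# True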
Create a Custom Connector Class
Decorate your connector class with meerschaum.make_connector()
to designate it as a custom connector:
from datetime import datetime, timezone
from random import randint
import meerschaum as mrsm
from meerschaum.utils.misc import round_time
@mrsm.make_connector
class FooConnector(mrsm.Connector):
REQUIRED_ATTRIBUTES = ['username', 'password']
def fetch(
self,
begin: datetime | None = None,
end: datetime | None = None,
):
now = begin or round_time(datetime.now(timezone.utc))
return [
{'ts': now, 'id': 1, 'vl': randint(1, 100)},
{'ts': now, 'id': 2, 'vl': randint(1, 100)},
{'ts': now, 'id': 3, 'vl': randint(1, 100)},
]
foo_conn = mrsm.get_connector(
'foo:bar',
username='foo',
password='bar',
)
docs = foo_conn.fetch()
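REQUIRED_ATTRIBUTES tells the factory which keyword arguments must be supplied; building an instance without them should produce a warning and no connector. A hedged sketch, with illustrative keys:
import meerschaum as mrsm

# Missing the required 'password' attribute, so this should warn and return None.
bad_conn = mrsm.get_connector('foo:baz', username='foo')
print(bad_conn)
# None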
Build a Pipe
Build a meerschaum.Pipe
in-memory:
from datetime import datetime
import meerschaum as mrsm
pipe = mrsm.Pipe(
foo_conn, 'demo',
instance=sql_conn,
columns={'datetime': 'ts', 'id': 'id'},
tags=['production'],
)
pipe.sync(begin=datetime(2024, 1, 1))
df = pipe.get_data()
print(df)
# ts id vl
# 0 2024-01-01 1 97
# 1 2024-01-01 2 18
# 2 2024-01-01 3 96
Add temporary=True
to skip registering the pipe in the pipes table.
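For example, a throwaway pipe for ad-hoc work might look like this (the location key is illustrative):
import meerschaum as mrsm

temp_pipe = mrsm.Pipe(
    'foo:bar', 'demo', 'scratch',
    instance=sql_conn,
    temporary=True,
)
# Data may still be synced, but the pipe is never added to the pipes table.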
Get Registered Pipes
The meerschaum.get_pipes()
function returns a dictionary hierarchy of pipes by connector, metric, and location:
import meerschaum as mrsm
pipes = mrsm.get_pipes(instance='sql:temp')
pipe = pipes['foo:bar']['demo'][None]
Add as_list=True
to flatten the hierarchy:
import meerschaum as mrsm
pipes = mrsm.get_pipes(
tags=['production'],
instance=sql_conn,
as_list=True,
)
print(pipes)
# [Pipe('foo:bar', 'demo', instance='sql:temp')]
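The keyword arguments mirror the get_pipes() parameters documented later on this page; for example, explicit keys can build pipes that aren't registered yet (the keys here are illustrative):
import meerschaum as mrsm

pipes = mrsm.get_pipes(
    connector_keys='foo:bar',
    metric_keys='demo',
    method='explicit',
    instance='sql:temp',
    as_list=True,
)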
Import Plugins
You can import a plugin's module through meerschaum.Plugin.module
:
import meerschaum as mrsm
plugin = mrsm.Plugin('noaa')
with mrsm.Venv(plugin):
noaa = plugin.module
If your plugin has submodules, use meerschaum.plugins.from_plugin_import
:
from meerschaum.plugins import from_plugin_import
get_defined_pipes = from_plugin_import('compose.utils.pipes', 'get_defined_pipes')
Import multiple plugins with meerschaum.plugins.import_plugins
:
from meerschaum.plugins import import_plugins
noaa, compose = import_plugins('noaa', 'compose')
Create a Job
Create a meerschaum.Job
with name
and sysargs
:
import meerschaum as mrsm
job = mrsm.Job('syncing-engine', 'sync pipes --loop')
success, msg = job.start()
Pass executor_keys
as the connector keys of an API instance to create a remote job:
import meerschaum as mrsm
job = mrsm.Job(
'foo',
'sync pipes -s daily',
executor_keys='api:main',
)
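Remote jobs are controlled the same way as local ones; assuming the usual lifecycle methods on Job, a quick sketch:
success, msg = job.start()
print(success, msg)

# Later, stop the job (a paired Job.stop() is assumed here).
success, msg = job.stop()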
Import from a Virtual Environment
Use the meerschaum.Venv
context manager to activate a virtual environment:
import meerschaum as mrsm
with mrsm.Venv('noaa'):
import requests
print(requests.__file__)
# /home/bmeares/.config/meerschaum/venvs/noaa/lib/python3.12/site-packages/requests/__init__.py
To import packages which may not be installed, use meerschaum.attempt_import()
:
import meerschaum as mrsm
requests = mrsm.attempt_import('requests', venv='noaa')
print(requests.__file__)
# /home/bmeares/.config/meerschaum/venvs/noaa/lib/python3.12/site-packages/requests/__init__.py
Run Actions
Run sysargs
with meerschaum.entry()
:
import meerschaum as mrsm
success, msg = mrsm.entry('show pipes + show version : x2')
Use meerschaum.actions.get_action()
to access an action function directly:
from meerschaum.actions import get_action
show_pipes = get_action(['show', 'pipes'])
success, msg = show_pipes(connector_keys=['plugin:noaa'])
Get a dictionary of available subactions with meerschaum.actions.get_subactions()
:
from meerschaum.actions import get_subactions
subactions = get_subactions('show')
success, msg = subactions['pipes']()
Create a Plugin
Run bootstrap plugin
to create a new plugin:
mrsm bootstrap plugin example
This will create example.py in your plugins directory (default ~/.config/meerschaum/plugins/; on Windows, %APPDATA%\Meerschaum\plugins).
Open your plugin with edit plugin and paste in the example code from the "Create a Custom Action" section below to try out its features:
mrsm edit plugin example
See the writing plugins guide for more in-depth documentation.
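Plugins can also provide data rather than actions; a minimal sketch of what a fetch-style example.py might contain (the required packages and returned columns are illustrative):
# ~/.config/meerschaum/plugins/example.py
__version__ = '0.0.1'

# Packages to install into the plugin's virtual environment (illustrative).
required = ['requests']

def fetch(pipe, **kwargs):
    """Return rows for `sync pipes` to insert."""
    return [
        {'ts': '2024-01-01', 'id': 1, 'vl': 100},
    ]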
Create a Custom Action
Decorate a function with meerschaum.actions.make_action
to designate it as an action. Subactions will be automatically detected if not decorated:
from meerschaum.actions import make_action
@make_action
def sing():
print('What would you like me to sing?')
return True, "Success"
def sing_tune():
return False, "I don't know that song!"
def sing_song():
print('Hello, World!')
return True, "Success"
Use meerschaum.plugins.add_plugin_argument()
to create new parameters for your action:
from meerschaum.plugins import make_action, add_plugin_argument
add_plugin_argument(
'--song', type=str, help='What song to sing.',
)
@make_action
def sing_melody(action=None, song=None):
to_sing = action[0] if action else song
if not to_sing:
return False, "Please tell me what to sing!"
return True, f'~I am singing {to_sing}~'
Then invoke the new action and its flag from the command line:
mrsm sing melody lalala
mrsm sing melody --song do-re-mi
Add a Page to the Web Dashboard
Use the decorators meerschaum.plugins.dash_plugin()
and meerschaum.plugins.web_page()
to add new pages to the web dashboard:
from meerschaum.plugins import dash_plugin, web_page
@dash_plugin
def init_dash(dash_app):
import dash.html as html
import dash_bootstrap_components as dbc
from dash import Input, Output, no_update
### Routes to '/dash/my-page'
@web_page('/my-page', login_required=False)
def my_page():
return dbc.Container([
html.H1("Hello, World!"),
dbc.Button("Click me", id='my-button'),
html.Div(id="my-output-div"),
])
@dash_app.callback(
Output('my-output-div', 'children'),
Input('my-button', 'n_clicks'),
)
def my_button_click(n_clicks):
if not n_clicks:
return no_update
return html.P(f'You clicked {n_clicks} times!')
Submodules
meerschaum.actions
Access functions for actions and subactions.
meerschaum.actions.actions
meerschaum.actions.get_action()
meerschaum.actions.get_completer()
meerschaum.actions.get_main_action_name()
meerschaum.actions.get_subactions()
meerschaum.config
Read and write the Meerschaum configuration registry.
meerschaum.config.get_config()
meerschaum.config.get_plugin_config()
meerschaum.config.write_config()
meerschaum.config.write_plugin_config()
meerschaum.connectors
Build connectors to interact with databases and fetch data.
meerschaum.connectors.get_connector()
meerschaum.connectors.make_connector()
meerschaum.connectors.is_connected()
meerschaum.connectors.poll.retry_connect()
meerschaum.connectors.Connector
meerschaum.connectors.sql.SQLConnector
meerschaum.connectors.api.APIConnector
meerschaum.connectors.valkey.ValkeyConnector
meerschaum.jobs
Start background jobs.
meerschaum.jobs.Job
meerschaum.jobs.Executor
meerschaum.jobs.systemd.SystemdExecutor
meerschaum.jobs.get_jobs()
meerschaum.jobs.get_filtered_jobs()
meerschaum.jobs.get_running_jobs()
meerschaum.jobs.get_stopped_jobs()
meerschaum.jobs.get_paused_jobs()
meerschaum.jobs.get_restart_jobs()
meerschaum.jobs.make_executor()
meerschaum.jobs.check_restart_jobs()
meerschaum.jobs.start_check_jobs_thread()
meerschaum.jobs.stop_check_jobs_thread()
meerschaum.plugins
Access plugin modules and other API utilities.
meerschaum.plugins.Plugin
meerschaum.plugins.api_plugin()
meerschaum.plugins.dash_plugin()
meerschaum.plugins.import_plugins()
meerschaum.plugins.reload_plugins()
meerschaum.plugins.get_plugins()
meerschaum.plugins.get_data_plugins()
meerschaum.plugins.add_plugin_argument()
meerschaum.plugins.pre_sync_hook()
meerschaum.plugins.post_sync_hook()
meerschaum.utils
Utility functions are available in several submodules:
meerschaum.utils.daemon
Manage background jobs.
meerschaum.utils.daemon.daemon_entry()
meerschaum.utils.daemon.daemon_action()
meerschaum.utils.daemon.get_daemons()
meerschaum.utils.daemon.get_daemon_ids()
meerschaum.utils.daemon.get_running_daemons()
meerschaum.utils.daemon.get_paused_daemons()
meerschaum.utils.daemon.get_stopped_daemons()
meerschaum.utils.daemon.get_filtered_daemons()
meerschaum.utils.daemon.run_daemon()
meerschaum.utils.daemon.Daemon
meerschaum.utils.daemon.FileDescriptorInterceptor
meerschaum.utils.daemon.RotatingFile
meerschaum.utils.dataframe
Manipulate dataframes.
meerschaum.utils.dataframe.add_missing_cols_to_df()
meerschaum.utils.dataframe.df_is_chunk_generator()
meerschaum.utils.dataframe.enforce_dtypes()
meerschaum.utils.dataframe.filter_unseen_df()
meerschaum.utils.dataframe.get_datetime_bound_from_df()
meerschaum.utils.dataframe.get_first_valid_dask_partition()
meerschaum.utils.dataframe.get_json_cols()
meerschaum.utils.dataframe.get_numeric_cols()
meerschaum.utils.dataframe.get_unhashable_cols()
meerschaum.utils.dataframe.parse_df_datetimes()
meerschaum.utils.dataframe.query_df()
meerschaum.utils.dataframe.to_json()
meerschaum.utils.dtypes
Work with data types.
meerschaum.utils.dtypes.are_dtypes_equal()
meerschaum.utils.dtypes.attempt_cast_to_numeric()
meerschaum.utils.dtypes.is_dtype_numeric()
meerschaum.utils.dtypes.none_if_null()
meerschaum.utils.dtypes.quantize_decimal()
meerschaum.utils.dtypes.to_pandas_dtype()
meerschaum.utils.dtypes.value_is_null()
meerschaum.utils.dtypes.sql.get_pd_type_from_db_type()
meerschaum.utils.dtypes.sql.get_db_type_from_pd_type()
meerschaum.utils.formatting
Format output text.
meerschaum.utils.formatting.colored()
meerschaum.utils.formatting.extract_stats_from_message()
meerschaum.utils.formatting.fill_ansi()
meerschaum.utils.formatting.get_console()
meerschaum.utils.formatting.highlight_pipes()
meerschaum.utils.formatting.make_header()
meerschaum.utils.formatting.pipe_repr()
meerschaum.utils.formatting.pprint()
meerschaum.utils.formatting.pprint_pipes()
meerschaum.utils.formatting.print_options()
meerschaum.utils.formatting.print_pipes_results()
meerschaum.utils.formatting.print_tuple()
meerschaum.utils.formatting.translate_rich_to_termcolor()
meerschaum.utils.misc
Miscellaneous utility functions.
meerschaum.utils.misc.items_str()
meerschaum.utils.misc.round_time()
meerschaum.utils.misc.is_int()
meerschaum.utils.misc.interval_str()
meerschaum.utils.misc.filter_keywords()
meerschaum.utils.misc.generate_password()
meerschaum.utils.misc.string_to_dict()
meerschaum.utils.misc.iterate_chunks()
meerschaum.utils.misc.timed_input()
meerschaum.utils.misc.replace_pipes_in_dict()
meerschaum.utils.misc.is_valid_email()
meerschaum.utils.misc.string_width()
meerschaum.utils.misc.replace_password()
meerschaum.utils.misc.parse_config_substitution()
meerschaum.utils.misc.edit_file()
meerschaum.utils.misc.get_in_ex_params()
meerschaum.utils.misc.separate_negation_values()
meerschaum.utils.misc.flatten_list()
meerschaum.utils.misc.make_symlink()
meerschaum.utils.misc.is_symlink()
meerschaum.utils.misc.wget()
meerschaum.utils.misc.add_method_to_class()
meerschaum.utils.misc.is_pipe_registered()
meerschaum.utils.misc.get_cols_lines()
meerschaum.utils.misc.sorted_dict()
meerschaum.utils.misc.flatten_pipes_dict()
meerschaum.utils.misc.dict_from_od()
meerschaum.utils.misc.remove_ansi()
meerschaum.utils.misc.get_connector_labels()
meerschaum.utils.misc.json_serialize_datetime()
meerschaum.utils.misc.async_wrap()
meerschaum.utils.misc.is_docker_available()
meerschaum.utils.misc.is_android()
meerschaum.utils.misc.is_bcp_available()
meerschaum.utils.misc.truncate_string_sections()
meerschaum.utils.misc.safely_extract_tar()
meerschaum.utils.packages
Manage Python packages.
meerschaum.utils.packages.attempt_import()
meerschaum.utils.packages.get_module_path()
meerschaum.utils.packages.manually_import_module()
meerschaum.utils.packages.get_install_no_version()
meerschaum.utils.packages.determine_version()
meerschaum.utils.packages.need_update()
meerschaum.utils.packages.get_pip()
meerschaum.utils.packages.pip_install()
meerschaum.utils.packages.pip_uninstall()
meerschaum.utils.packages.completely_uninstall_package()
meerschaum.utils.packages.run_python_package()
meerschaum.utils.packages.lazy_import()
meerschaum.utils.packages.pandas_name()
meerschaum.utils.packages.import_pandas()
meerschaum.utils.packages.import_rich()
meerschaum.utils.packages.import_dcc()
meerschaum.utils.packages.import_html()
meerschaum.utils.packages.get_modules_from_package()
meerschaum.utils.packages.import_children()
meerschaum.utils.packages.reload_package()
meerschaum.utils.packages.reload_meerschaum()
meerschaum.utils.packages.is_installed()
meerschaum.utils.packages.venv_contains_package()
meerschaum.utils.packages.package_venv()
meerschaum.utils.packages.ensure_readline()
meerschaum.utils.packages.get_prerelease_dependencies()
meerschaum.utils.sql
Build SQL queries.
meerschaum.utils.sql.build_where()
meerschaum.utils.sql.clean()
meerschaum.utils.sql.dateadd_str()
meerschaum.utils.sql.test_connection()
meerschaum.utils.sql.get_distinct_col_count()
meerschaum.utils.sql.sql_item_name()
meerschaum.utils.sql.pg_capital()
meerschaum.utils.sql.oracle_capital()
meerschaum.utils.sql.truncate_item_name()
meerschaum.utils.sql.table_exists()
meerschaum.utils.sql.get_table_cols_types()
meerschaum.utils.sql.get_update_queries()
meerschaum.utils.sql.get_null_replacement()
meerschaum.utils.sql.get_db_version()
meerschaum.utils.sql.get_rename_table_queries()
meerschaum.utils.sql.get_create_table_queries()
meerschaum.utils.sql.wrap_query_with_cte()
meerschaum.utils.sql.format_cte_subquery()
meerschaum.utils.sql.session_execute()
meerschaum.utils.sql.get_reset_autoincrement_queries()
meerschaum.utils.venv
Manage virtual environments.
meerschaum.utils.venv.Venv
meerschaum.utils.venv.activate_venv()
meerschaum.utils.venv.deactivate_venv()
meerschaum.utils.venv.get_module_venv()
meerschaum.utils.venv.get_venvs()
meerschaum.utils.venv.init_venv()
meerschaum.utils.venv.inside_venv()
meerschaum.utils.venv.is_venv_active()
meerschaum.utils.venv.venv_exec()
meerschaum.utils.venv.venv_executable()
meerschaum.utils.venv.venv_exists()
meerschaum.utils.venv.venv_target_path()
meerschaum.utils.venv.verify_venv()
meerschaum.utils.warnings
Print warnings, errors, info, and debug messages.
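For example, warn() prints a formatted warning and execution continues, while error() raises with a formatted message; a minimal sketch:
from meerschaum.utils.warnings import warn, error

warn("Something looks off, but execution continues.", stack=False)
error("This is fatal; error() raises an exception.")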
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Copyright 2023 Bennett Meares

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import atexit
from meerschaum.utils.typing import SuccessTuple
from meerschaum.utils.packages import attempt_import
from meerschaum.core.Pipe import Pipe
from meerschaum.plugins import Plugin
from meerschaum.utils.venv import Venv
from meerschaum.jobs import Job, make_executor
from meerschaum.connectors import get_connector, Connector, make_connector
from meerschaum.utils import get_pipes
from meerschaum.utils.formatting import pprint
from meerschaum._internal.docs import index as __doc__
from meerschaum.config import __version__, get_config
from meerschaum._internal.entry import entry
from meerschaum.__main__ import _close_pools

atexit.register(_close_pools)

__pdoc__ = {'gui': False, 'api': False, 'core': False, '_internal': False}
__all__ = (
    "get_pipes",
    "get_connector",
    "get_config",
    "Pipe",
    "Plugin",
    "Venv",
    "Plugin",
    "Job",
    "pprint",
    "attempt_import",
    "actions",
    "config",
    "connectors",
    "jobs",
    "plugins",
    "utils",
    "SuccessTuple",
    "Connector",
    "make_connector",
    "entry",
)
def get_pipes(
    connector_keys: Union[str, List[str], None] = None,
    metric_keys: Union[str, List[str], None] = None,
    location_keys: Union[str, List[str], None] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    mrsm_instance: Union[str, InstanceConnector, None] = None,
    instance: Union[str, InstanceConnector, None] = None,
    as_list: bool = False,
    method: str = 'registered',
    debug: bool = False,
    **kw: Any
) -> Union[PipesDict, List[mrsm.Pipe]]:
Return a dictionary or list of meerschaum.Pipe objects.
Parameters
- connector_keys (Union[str, List[str], None], default None):
  String or list of connector keys.
  If omitted or '*', fetch all possible keys.
  If a string begins with '_', select keys that do NOT match the string.
- metric_keys (Union[str, List[str], None], default None):
  String or list of metric keys. See connector_keys for formatting.
- location_keys (Union[str, List[str], None], default None):
  String or list of location keys. See connector_keys for formatting.
- tags (Optional[List[str]], default None):
  If provided, only include pipes with these tags.
- params (Optional[Dict[str, Any]], default None):
  Dictionary of additional parameters to search by.
  Params are parsed into a SQL WHERE clause.
  E.g. {'a': 1, 'b': 2} equates to 'WHERE a = 1 AND b = 2'.
- mrsm_instance (Union[str, InstanceConnector, None], default None):
  Connector keys for the Meerschaum instance of the pipes.
  Must be a meerschaum.connectors.sql.SQLConnector.SQLConnector or meerschaum.connectors.api.APIConnector.APIConnector.
- as_list (bool, default False):
  If True, return pipes in a list instead of a hierarchical dictionary.
  False: {connector_keys: {metric_key: {location_key: Pipe}}}
  True: [Pipe]
- method (str, default 'registered'):
  Available options: ['registered', 'explicit', 'all'].
  If 'registered' (default), create pipes based on registered keys in the connector's pipes table (API or SQL connector, depending on mrsm_instance).
  If 'explicit', create pipes from the provided connector_keys, metric_keys, and location_keys instead of consulting the pipes table. Useful for creating non-existent pipes.
  If 'all', create pipes from predefined metrics and locations. Requires connector_keys.
  NOTE: Method 'all' is not implemented!
- **kw (Any):
  Keyword arguments to pass to the meerschaum.Pipe constructor.
Returns
- A dictionary of dictionaries and meerschaum.Pipe objects in the connector, metric, location hierarchy.
- If as_list is True, return a list of meerschaum.Pipe objects.
Examples
>>> ### Manual definition:
>>> pipes = {
...     <connector_keys>: {
...         <metric_key>: {
...             <location_key>: Pipe(
...                 <connector_keys>,
...                 <metric_key>,
...                 <location_key>,
...             ),
...         },
...     },
... }
>>> ### Accessing a single pipe:
>>> pipes['sql:main']['weather'][None]
>>> ### Return a list instead:
>>> get_pipes(as_list=True)
[sql_main_weather]
>>>
def get_connector(
    type: str = None,
    label: str = None,
    refresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Connector:
Return existing connector or create new connection and store for reuse.
You can create new connectors if enough parameters are provided for the given type and flavor.
Parameters
- type (Optional[str], default None):
  Connector type (sql, api, etc.).
  Defaults to the type of the configured instance_connector.
- label (Optional[str], default None):
  Connector label (e.g. main). Defaults to 'main'.
- refresh (bool, default False):
  Refresh the Connector instance / construct new object. Defaults to False.
- kw (Any):
  Other arguments to pass to the Connector constructor.
  If the Connector has already been constructed and new arguments are provided, refresh is set to True and the old Connector is replaced.
Returns
- A new Meerschaum connector (e.g. meerschaum.connectors.api.APIConnector, meerschaum.connectors.sql.SQLConnector).
Examples
The following parameters would create a new meerschaum.connectors.sql.SQLConnector that isn't in the configuration file.
>>> conn = get_connector(
... type = 'sql',
... label = 'newlabel',
... flavor = 'sqlite',
... database = '/file/path/to/database.db'
... )
>>>
def get_config(
    *keys: str,
    patch: bool = True,
    substitute: bool = True,
    sync_files: bool = True,
    write_missing: bool = True,
    as_tuple: bool = False,
    warn: bool = True,
    debug: bool = False
) -> Any:
Return the Meerschaum configuration dictionary. If positional arguments are provided, index by the keys. Raises a warning if invalid keys are provided.
Parameters
- keys (str): List of strings to index.
- patch (bool, default True):
  If True, patch missing default keys into the config directory.
- sync_files (bool, default True):
  If True, sync files if needed.
- write_missing (bool, default True):
  If True, write default values when the main config files are missing.
- substitute (bool, default True):
  If True, substitute 'MRSM{}' values.
- as_tuple (bool, default False):
  If True, return a tuple of type (success, value).
Returns
- The value in the configuration directory, indexed by the provided keys.
Examples
>>> get_config('meerschaum', 'instance')
'sql:main'
>>> get_config('does', 'not', 'exist')
UserWarning: Invalid keys in config: ('does', 'not', 'exist')
class Pipe:
Access Meerschaum pipes via Pipe objects.
Pipes are identified by the following:
1. Connector keys (e.g. 'sql:main')
2. Metric key (e.g. 'weather')
3. Location (optional; e.g. None)
A pipe's connector keys correspond to a data source, and when the pipe is synced, its fetch definition is evaluated and executed to produce new data.
Alternatively, new data may be directly synced via pipe.sync():
>>> from meerschaum import Pipe
>>> pipe = Pipe('csv', 'weather')
>>>
>>> import pandas as pd
>>> df = pd.read_csv('weather.csv')
>>> pipe.sync(df)
def __init__(
    self,
    connector: str = '',
    metric: str = '',
    location: Optional[str] = None,
    parameters: Optional[Dict[str, Any]] = None,
    columns: Union[Dict[str, str], List[str], None] = None,
    indices: Optional[Dict[str, Union[str, List[str]]]] = None,
    tags: Optional[List[str]] = None,
    target: Optional[str] = None,
    dtypes: Optional[Dict[str, str]] = None,
    instance: Optional[Union[str, InstanceConnector]] = None,
    temporary: bool = False,
    upsert: Optional[bool] = None,
    autoincrement: Optional[bool] = None,
    static: Optional[bool] = None,
    mrsm_instance: Optional[Union[str, InstanceConnector]] = None,
    cache: bool = False,
    debug: bool = False,
    connector_keys: Optional[str] = None,
    metric_key: Optional[str] = None,
    location_key: Optional[str] = None,
    indexes: Union[Dict[str, str], List[str], None] = None,
):
Parameters
- connector (str): Keys for the pipe's source connector, e.g. 'sql:main'.
- metric (str): Label for the pipe's contents, e.g. 'weather'.
- location (str, default None): Label for the pipe's location. Defaults to None.
- parameters (Optional[Dict[str, Any]], default None): Optionally set a pipe's parameters from the constructor, e.g. columns and other attributes. You can edit these parameters with edit pipes.
- columns (Union[Dict[str, str], List[str], None], default None): Set the columns dictionary of parameters. If parameters is also provided, this dictionary is added under the 'columns' key.
- indices (Optional[Dict[str, Union[str, List[str]]]], default None): Set the indices dictionary of parameters. If parameters is also provided, this dictionary is added under the 'indices' key.
- tags (Optional[List[str]], default None): A list of strings to be added under the 'tags' key of parameters. You can select pipes with certain tags using --tags.
- dtypes (Optional[Dict[str, str]], default None): Set the dtypes dictionary of parameters. If parameters is also provided, this dictionary is added under the 'dtypes' key.
- mrsm_instance (Optional[Union[str, InstanceConnector]], default None): Connector for the Meerschaum instance where the pipe resides. Defaults to the preconfigured default instance ('sql:main').
- instance (Optional[Union[str, InstanceConnector]], default None): Alias for mrsm_instance. If mrsm_instance is supplied, this value is ignored.
- upsert (Optional[bool], default None): If True, set upsert to True in the parameters.
- autoincrement (Optional[bool], default None): If True, set autoincrement in the parameters.
- static (Optional[bool], default None): If True, set static in the parameters.
- temporary (bool, default False): If True, prevent instance tables (pipes, users, plugins) from being created.
- cache (bool, default False): If True, cache fetched data into a local database file. Defaults to False.
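For example, the following sketch builds an in-memory pipe with its columns, dtypes, and tags set from the constructor (the connector keys, metric, and column names here are illustrative):
import meerschaum as mrsm
pipe = mrsm.Pipe(
    'plugin:stocks', 'price', 'NYSE',
    instance='sql:main',
    columns={'datetime': 'ts', 'id': 'symbol'},
    dtypes={'ts': 'datetime64[ns, UTC]', 'price': 'float64'},
    tags=['finance'],
    temporary=True,
)
print(pipe.columns)
# {'datetime': 'ts', 'id': 'symbol'}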
339 @property 340 def meta(self): 341 """ 342 Return the four keys needed to reconstruct this pipe. 343 """ 344 return { 345 'connector': self.connector_keys, 346 'metric': self.metric_key, 347 'location': self.location_key, 348 'instance': self.instance_keys, 349 }
Return the four keys needed to reconstruct this pipe.
351 def keys(self) -> List[str]: 352 """ 353 Return the ordered keys for this pipe. 354 """ 355 return { 356 key: val 357 for key, val in self.meta.items() 358 if key != 'instance' 359 }
Return the ordered keys for this pipe.
361 @property 362 def instance_connector(self) -> Union[InstanceConnector, None]: 363 """ 364 The connector to where this pipe resides. 365 May either be of type `meerschaum.connectors.sql.SQLConnector` or 366 `meerschaum.connectors.api.APIConnector`. 367 """ 368 if '_instance_connector' not in self.__dict__: 369 from meerschaum.connectors.parse import parse_instance_keys 370 conn = parse_instance_keys(self.instance_keys) 371 if conn: 372 self._instance_connector = conn 373 else: 374 return None 375 return self._instance_connector
The connector to where this pipe resides.
May either be of type meerschaum.connectors.sql.SQLConnector or meerschaum.connectors.api.APIConnector.
377 @property 378 def connector(self) -> Union[meerschaum.connectors.Connector, None]: 379 """ 380 The connector to the data source. 381 """ 382 if '_connector' not in self.__dict__: 383 from meerschaum.connectors.parse import parse_instance_keys 384 import warnings 385 with warnings.catch_warnings(): 386 warnings.simplefilter('ignore') 387 try: 388 conn = parse_instance_keys(self.connector_keys) 389 except Exception as e: 390 conn = None 391 if conn: 392 self._connector = conn 393 else: 394 return None 395 return self._connector
The connector to the data source.
397 @property 398 def cache_connector(self) -> Union[meerschaum.connectors.sql.SQLConnector, None]: 399 """ 400 If the pipe was created with `cache=True`, return the connector to the pipe's 401 SQLite database for caching. 402 """ 403 if not self._cache: 404 return None 405 406 if '_cache_connector' not in self.__dict__: 407 from meerschaum.connectors import get_connector 408 from meerschaum.config._paths import DUCKDB_RESOURCES_PATH, SQLITE_RESOURCES_PATH 409 _resources_path = SQLITE_RESOURCES_PATH 410 self._cache_connector = get_connector( 411 'sql', '_cache_' + str(self), 412 flavor='sqlite', 413 database=str(_resources_path / ('_cache_' + str(self) + '.db')), 414 ) 415 416 return self._cache_connector
If the pipe was created with cache=True, return the connector to the pipe's SQLite database for caching.
418 @property 419 def cache_pipe(self) -> Union['meerschaum.Pipe', None]: 420 """ 421 If the pipe was created with `cache=True`, return another `meerschaum.Pipe` used to 422 manage the local data. 423 """ 424 if self.cache_connector is None: 425 return None 426 if '_cache_pipe' not in self.__dict__: 427 from meerschaum.config._patch import apply_patch_to_config 428 from meerschaum.utils.sql import sql_item_name 429 _parameters = copy.deepcopy(self.parameters) 430 _fetch_patch = { 431 'fetch': ({ 432 'definition': ( 433 f"SELECT * FROM " 434 + sql_item_name( 435 str(self.target), 436 self.instance_connector.flavor, 437 self.instance_connector.get_pipe_schema(self), 438 ) 439 ), 440 }) if self.instance_connector.type == 'sql' else ({ 441 'connector_keys': self.connector_keys, 442 'metric_key': self.metric_key, 443 'location_key': self.location_key, 444 }) 445 } 446 _parameters = apply_patch_to_config(_parameters, _fetch_patch) 447 self._cache_pipe = Pipe( 448 self.instance_keys, 449 (self.connector_keys + '_' + self.metric_key + '_cache'), 450 self.location_key, 451 mrsm_instance = self.cache_connector, 452 parameters = _parameters, 453 cache = False, 454 temporary = True, 455 ) 456 457 return self._cache_pipe
If the pipe was created with cache=True, return another meerschaum.Pipe used to manage the local data.
21def fetch( 22 self, 23 begin: Union[datetime, int, str, None] = '', 24 end: Union[datetime, int, None] = None, 25 check_existing: bool = True, 26 sync_chunks: bool = False, 27 debug: bool = False, 28 **kw: Any 29) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]: 30 """ 31 Fetch a Pipe's latest data from its connector. 32 33 Parameters 34 ---------- 35 begin: Union[datetime, str, None], default '': 36 If provided, only fetch data newer than or equal to `begin`. 37 38 end: Optional[datetime], default None: 39 If provided, only fetch data older than or equal to `end`. 40 41 check_existing: bool, default True 42 If `False`, do not apply the backtrack interval. 43 44 sync_chunks: bool, default False 45 If `True` and the pipe's connector is of type `'sql'`, begin syncing chunks while fetching 46 loads chunks into memory. 47 48 debug: bool, default False 49 Verbosity toggle. 50 51 Returns 52 ------- 53 A `pd.DataFrame` of the newest unseen data. 54 55 """ 56 if 'fetch' not in dir(self.connector): 57 warn(f"No `fetch()` function defined for connector '{self.connector}'") 58 return None 59 60 from meerschaum.connectors import custom_types, get_connector_plugin 61 from meerschaum.utils.debug import dprint, _checkpoint 62 from meerschaum.utils.misc import filter_arguments 63 64 _chunk_hook = kw.pop('chunk_hook', None) 65 kw['workers'] = self.get_num_workers(kw.get('workers', None)) 66 if sync_chunks and _chunk_hook is None: 67 68 def _chunk_hook(chunk, **_kw) -> SuccessTuple: 69 """ 70 Wrap `Pipe.sync()` with a custom chunk label prepended to the message. 71 """ 72 from meerschaum.config._patch import apply_patch_to_config 73 kwargs = apply_patch_to_config(kw, _kw) 74 chunk_success, chunk_message = self.sync(chunk, **kwargs) 75 chunk_label = self._get_chunk_label(chunk, self.columns.get('datetime', None)) 76 if chunk_label: 77 chunk_message = '\n' + chunk_label + '\n' + chunk_message 78 return chunk_success, chunk_message 79 80 begin, end = self.parse_date_bounds(begin, end) 81 82 with mrsm.Venv(get_connector_plugin(self.connector)): 83 _args, _kwargs = filter_arguments( 84 self.connector.fetch, 85 self, 86 begin=_determine_begin( 87 self, 88 begin, 89 check_existing=check_existing, 90 debug=debug, 91 ), 92 end=end, 93 chunk_hook=_chunk_hook, 94 debug=debug, 95 **kw 96 ) 97 df = self.connector.fetch(*_args, **_kwargs) 98 return df
Fetch a Pipe's latest data from its connector.
Parameters
- begin (Union[datetime, str, None], default ''): If provided, only fetch data newer than or equal to begin.
- end (Optional[datetime], default None): If provided, only fetch data older than or equal to end.
- check_existing (bool, default True): If False, do not apply the backtrack interval.
- sync_chunks (bool, default False): If True and the pipe's connector is of type 'sql', sync chunks as they are fetched rather than loading all chunks into memory first.
- debug (bool, default False): Verbosity toggle.
Returns
- A pd.DataFrame of the newest unseen data.
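A minimal sketch of fetching and then syncing the result (the connector keys are illustrative and assume the noaa plugin is installed; any source connector which defines fetch() will work):
import meerschaum as mrsm
pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')
df = pipe.fetch(begin='2024-01-01')
if df is not None:
    success, msg = pipe.sync(df)
    print(success, msg)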
101def get_backtrack_interval( 102 self, 103 check_existing: bool = True, 104 debug: bool = False, 105) -> Union[timedelta, int]: 106 """ 107 Get the chunk interval to use for this pipe. 108 109 Parameters 110 ---------- 111 check_existing: bool, default True 112 If `False`, return a backtrack_interval of 0 minutes. 113 114 Returns 115 ------- 116 The backtrack interval (`timedelta` or `int`) to use with this pipe's `datetime` axis. 117 """ 118 default_backtrack_minutes = get_config('pipes', 'parameters', 'fetch', 'backtrack_minutes') 119 configured_backtrack_minutes = self.parameters.get('fetch', {}).get('backtrack_minutes', None) 120 backtrack_minutes = ( 121 configured_backtrack_minutes 122 if configured_backtrack_minutes is not None 123 else default_backtrack_minutes 124 ) if check_existing else 0 125 126 backtrack_interval = timedelta(minutes=backtrack_minutes) 127 dt_col = self.columns.get('datetime', None) 128 if dt_col is None: 129 return backtrack_interval 130 131 dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns, UTC]') 132 if 'int' in dt_dtype.lower(): 133 return backtrack_minutes 134 135 return backtrack_interval
Get the backtrack interval to use for this pipe.
Parameters
- check_existing (bool, default True): If False, return a backtrack interval of 0 minutes.
Returns
- The backtrack interval (timedelta or int) to use with this pipe's datetime axis.
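The backtrack interval is derived from pipe.parameters['fetch']['backtrack_minutes'] (or the configured default). A small sketch, using an in-memory pipe with illustrative keys:
import meerschaum as mrsm
pipe = mrsm.Pipe(
    'sql:main', 'demo',
    parameters={'fetch': {'backtrack_minutes': 1440}},
    temporary=True,
)
print(pipe.get_backtrack_interval())
# 1 day, 0:00:00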
23def get_data( 24 self, 25 select_columns: Optional[List[str]] = None, 26 omit_columns: Optional[List[str]] = None, 27 begin: Union[datetime, int, str, None] = None, 28 end: Union[datetime, int, str, None] = None, 29 params: Optional[Dict[str, Any]] = None, 30 as_iterator: bool = False, 31 as_chunks: bool = False, 32 as_dask: bool = False, 33 chunk_interval: Union[timedelta, int, None] = None, 34 order: Optional[str] = 'asc', 35 limit: Optional[int] = None, 36 fresh: bool = False, 37 debug: bool = False, 38 **kw: Any 39) -> Union['pd.DataFrame', Iterator['pd.DataFrame'], None]: 40 """ 41 Get a pipe's data from the instance connector. 42 43 Parameters 44 ---------- 45 select_columns: Optional[List[str]], default None 46 If provided, only select these given columns. 47 Otherwise select all available columns (i.e. `SELECT *`). 48 49 omit_columns: Optional[List[str]], default None 50 If provided, remove these columns from the selection. 51 52 begin: Union[datetime, int, str, None], default None 53 Lower bound datetime to begin searching for data (inclusive). 54 Translates to a `WHERE` clause like `WHERE datetime >= begin`. 55 Defaults to `None`. 56 57 end: Union[datetime, int, str, None], default None 58 Upper bound datetime to stop searching for data (inclusive). 59 Translates to a `WHERE` clause like `WHERE datetime < end`. 60 Defaults to `None`. 61 62 params: Optional[Dict[str, Any]], default None 63 Filter the retrieved data by a dictionary of parameters. 64 See `meerschaum.utils.sql.build_where` for more details. 65 66 as_iterator: bool, default False 67 If `True`, return a generator of chunks of pipe data. 68 69 as_chunks: bool, default False 70 Alias for `as_iterator`. 71 72 as_dask: bool, default False 73 If `True`, return a `dask.DataFrame` 74 (which may be loaded into a Pandas DataFrame with `df.compute()`). 75 76 chunk_interval: Union[timedelta, int, None], default None 77 If `as_iterator`, then return chunks with `begin` and `end` separated by this interval. 78 This may be set under `pipe.parameters['chunk_minutes']`. 79 By default, use a timedelta of 1440 minutes (1 day). 80 If `chunk_interval` is an integer and the `datetime` axis a timestamp, 81 the use a timedelta with the number of minutes configured to this value. 82 If the `datetime` axis is an integer, default to the configured chunksize. 83 If `chunk_interval` is a `timedelta` and the `datetime` axis an integer, 84 use the number of minutes in the `timedelta`. 85 86 order: Optional[str], default 'asc' 87 If `order` is not `None`, sort the resulting dataframe by indices. 88 89 limit: Optional[int], default None 90 If provided, cap the dataframe to this many rows. 91 92 fresh: bool, default True 93 If `True`, skip local cache and directly query the instance connector. 94 Defaults to `True`. 95 96 debug: bool, default False 97 Verbosity toggle. 98 Defaults to `False`. 99 100 Returns 101 ------- 102 A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. 
103 104 """ 105 from meerschaum.utils.warnings import warn 106 from meerschaum.utils.venv import Venv 107 from meerschaum.connectors import get_connector_plugin 108 from meerschaum.utils.misc import iterate_chunks, items_str 109 from meerschaum.utils.dtypes import to_pandas_dtype, coerce_timezone 110 from meerschaum.utils.dataframe import add_missing_cols_to_df, df_is_chunk_generator 111 from meerschaum.utils.packages import attempt_import 112 dd = attempt_import('dask.dataframe') if as_dask else None 113 dask = attempt_import('dask') if as_dask else None 114 dateutil_parser = attempt_import('dateutil.parser') 115 116 if select_columns == '*': 117 select_columns = None 118 elif isinstance(select_columns, str): 119 select_columns = [select_columns] 120 121 if isinstance(omit_columns, str): 122 omit_columns = [omit_columns] 123 124 begin, end = self.parse_date_bounds(begin, end) 125 as_iterator = as_iterator or as_chunks 126 dt_col = self.columns.get('datetime', None) 127 128 def _sort_df(_df): 129 if df_is_chunk_generator(_df): 130 return _df 131 indices = [] if dt_col not in _df.columns else [dt_col] 132 non_dt_cols = [ 133 col 134 for col_ix, col in self.columns.items() 135 if col_ix != 'datetime' and col in _df.columns 136 ] 137 indices.extend(non_dt_cols) 138 if 'dask' not in _df.__module__: 139 _df.sort_values( 140 by=indices, 141 inplace=True, 142 ascending=(str(order).lower() == 'asc'), 143 ) 144 _df.reset_index(drop=True, inplace=True) 145 else: 146 _df = _df.sort_values( 147 by=indices, 148 ascending=(str(order).lower() == 'asc'), 149 ) 150 _df = _df.reset_index(drop=True) 151 if limit is not None and len(_df) > limit: 152 return _df.head(limit) 153 return _df 154 155 if as_iterator or as_chunks: 156 df = self._get_data_as_iterator( 157 select_columns=select_columns, 158 omit_columns=omit_columns, 159 begin=begin, 160 end=end, 161 params=params, 162 chunk_interval=chunk_interval, 163 limit=limit, 164 order=order, 165 fresh=fresh, 166 debug=debug, 167 ) 168 return _sort_df(df) 169 170 if as_dask: 171 from multiprocessing.pool import ThreadPool 172 dask_pool = ThreadPool(self.get_num_workers()) 173 dask.config.set(pool=dask_pool) 174 chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug) 175 bounds = self.get_chunk_bounds( 176 begin=begin, 177 end=end, 178 bounded=False, 179 chunk_interval=chunk_interval, 180 debug=debug, 181 ) 182 dask_chunks = [ 183 dask.delayed(self.get_data)( 184 select_columns=select_columns, 185 omit_columns=omit_columns, 186 begin=chunk_begin, 187 end=chunk_end, 188 params=params, 189 chunk_interval=chunk_interval, 190 order=order, 191 limit=limit, 192 fresh=fresh, 193 debug=debug, 194 ) 195 for (chunk_begin, chunk_end) in bounds 196 ] 197 dask_meta = { 198 col: to_pandas_dtype(typ) 199 for col, typ in self.dtypes.items() 200 } 201 return _sort_df(dd.from_delayed(dask_chunks, meta=dask_meta)) 202 203 if not self.exists(debug=debug): 204 return None 205 206 if self.cache_pipe is not None: 207 if not fresh: 208 _sync_cache_tuple = self.cache_pipe.sync( 209 begin=begin, 210 end=end, 211 params=params, 212 debug=debug, 213 **kw 214 ) 215 if not _sync_cache_tuple[0]: 216 warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1]) 217 fresh = True 218 else: ### Successfully synced cache. 
219 return self.enforce_dtypes( 220 self.cache_pipe.get_data( 221 select_columns=select_columns, 222 omit_columns=omit_columns, 223 begin=begin, 224 end=end, 225 params=params, 226 order=order, 227 limit=limit, 228 debug=debug, 229 fresh=True, 230 **kw 231 ), 232 debug=debug, 233 ) 234 235 with Venv(get_connector_plugin(self.instance_connector)): 236 df = self.instance_connector.get_pipe_data( 237 pipe=self, 238 select_columns=select_columns, 239 omit_columns=omit_columns, 240 begin=begin, 241 end=end, 242 params=params, 243 limit=limit, 244 order=order, 245 debug=debug, 246 **kw 247 ) 248 if df is None: 249 return df 250 251 if not select_columns: 252 select_columns = [col for col in df.columns] 253 254 cols_to_omit = [ 255 col 256 for col in df.columns 257 if ( 258 col in (omit_columns or []) 259 or 260 col not in (select_columns or []) 261 ) 262 ] 263 cols_to_add = [ 264 col 265 for col in select_columns 266 if col not in df.columns 267 ] 268 if cols_to_omit: 269 warn( 270 ( 271 f"Received {len(cols_to_omit)} omitted column" 272 + ('s' if len(cols_to_omit) != 1 else '') 273 + f" for {self}. " 274 + "Consider adding `select_columns` and `omit_columns` support to " 275 + f"'{self.instance_connector.type}' connectors to improve performance." 276 ), 277 stack=False, 278 ) 279 _cols_to_select = [col for col in df.columns if col not in cols_to_omit] 280 df = df[_cols_to_select] 281 282 if cols_to_add: 283 warn( 284 ( 285 f"Specified columns {items_str(cols_to_add)} were not found on {self}. " 286 + "Adding these to the DataFrame as null columns." 287 ), 288 stack=False, 289 ) 290 df = add_missing_cols_to_df(df, {col: 'string' for col in cols_to_add}) 291 292 enforced_df = self.enforce_dtypes(df, debug=debug) 293 294 if order: 295 return _sort_df(enforced_df) 296 return enforced_df
Get a pipe's data from the instance connector.
Parameters
- select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. SELECT *).
- omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, int, str, None], default None): Lower bound datetime to begin searching for data (inclusive). Translates to a WHERE clause like WHERE datetime >= begin. Defaults to None.
- end (Union[datetime, int, str, None], default None): Upper bound datetime to stop searching for data (exclusive). Translates to a WHERE clause like WHERE datetime < end. Defaults to None.
- params (Optional[Dict[str, Any]], default None): Filter the retrieved data by a dictionary of parameters. See meerschaum.utils.sql.build_where for more details.
- as_iterator (bool, default False): If True, return a generator of chunks of pipe data.
- as_chunks (bool, default False): Alias for as_iterator.
- as_dask (bool, default False): If True, return a dask.DataFrame (which may be loaded into a Pandas DataFrame with df.compute()).
- chunk_interval (Union[timedelta, int, None], default None): If as_iterator, then return chunks with begin and end separated by this interval. This may be set under pipe.parameters['verify']['chunk_minutes']. By default, use a timedelta of 1440 minutes (1 day). If chunk_interval is an integer and the datetime axis is a timestamp, then use a timedelta with this number of minutes. If the datetime axis is an integer, default to the configured chunksize. If chunk_interval is a timedelta and the datetime axis an integer, use the number of minutes in the timedelta.
- order (Optional[str], default 'asc'): If order is not None, sort the resulting dataframe by indices.
- limit (Optional[int], default None): If provided, cap the dataframe to this many rows.
- fresh (bool, default False): If True, skip the local cache and query the instance connector directly. Defaults to False.
- debug (bool, default False): Verbosity toggle. Defaults to False.
Returns
- A pd.DataFrame for the pipe's data corresponding to the provided parameters.
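For example, a sketch of a filtered query (the connector keys, column names, and station IDs are illustrative):
from datetime import datetime
import meerschaum as mrsm
pipe = mrsm.Pipe('plugin:noaa', 'weather', 'atlanta', instance='sql:main')
df = pipe.get_data(
    select_columns=['timestamp', 'station', 'temperature'],
    begin=datetime(2023, 1, 1),
    end=datetime(2024, 1, 1),
    params={'station': ['KATL', 'KGMU']},
    limit=1000,
    order='desc',
)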
388def get_backtrack_data( 389 self, 390 backtrack_minutes: Optional[int] = None, 391 begin: Union[datetime, int, None] = None, 392 params: Optional[Dict[str, Any]] = None, 393 limit: Optional[int] = None, 394 fresh: bool = False, 395 debug: bool = False, 396 **kw: Any 397) -> Optional['pd.DataFrame']: 398 """ 399 Get the most recent data from the instance connector as a Pandas DataFrame. 400 401 Parameters 402 ---------- 403 backtrack_minutes: Optional[int], default None 404 How many minutes from `begin` to select from. 405 If `None`, use `pipe.parameters['fetch']['backtrack_minutes']`. 406 407 begin: Optional[datetime], default None 408 The starting point to search for data. 409 If begin is `None` (default), use the most recent observed datetime 410 (AKA sync_time). 411 412 ``` 413 E.g. begin = 02:00 414 415 Search this region. Ignore this, even if there's data. 416 / / / / / / / / / | 417 -----|----------|----------|----------|----------|----------| 418 00:00 01:00 02:00 03:00 04:00 05:00 419 420 ``` 421 422 params: Optional[Dict[str, Any]], default None 423 The standard Meerschaum `params` query dictionary. 424 425 limit: Optional[int], default None 426 If provided, cap the number of rows to be returned. 427 428 fresh: bool, default False 429 If `True`, Ignore local cache and pull directly from the instance connector. 430 Only comes into effect if a pipe was created with `cache=True`. 431 432 debug: bool default False 433 Verbosity toggle. 434 435 Returns 436 ------- 437 A `pd.DataFrame` for the pipe's data corresponding to the provided parameters. Backtrack data 438 is a convenient way to get a pipe's data "backtracked" from the most recent datetime. 439 """ 440 from meerschaum.utils.warnings import warn 441 from meerschaum.utils.venv import Venv 442 from meerschaum.connectors import get_connector_plugin 443 444 if not self.exists(debug=debug): 445 return None 446 447 begin = self.parse_date_bounds(begin) 448 449 backtrack_interval = self.get_backtrack_interval(debug=debug) 450 if backtrack_minutes is None: 451 backtrack_minutes = ( 452 (backtrack_interval.total_seconds() / 60) 453 if isinstance(backtrack_interval, timedelta) 454 else backtrack_interval 455 ) 456 457 if self.cache_pipe is not None: 458 if not fresh: 459 _sync_cache_tuple = self.cache_pipe.sync(begin=begin, params=params, debug=debug, **kw) 460 if not _sync_cache_tuple[0]: 461 warn(f"Failed to sync cache for {self}:\n" + _sync_cache_tuple[1]) 462 fresh = True 463 else: ### Successfully synced cache. 
464 return self.enforce_dtypes( 465 self.cache_pipe.get_backtrack_data( 466 fresh=True, 467 begin=begin, 468 backtrack_minutes=backtrack_minutes, 469 params=params, 470 limit=limit, 471 order=kw.get('order', 'desc'), 472 debug=debug, 473 **kw 474 ), 475 debug=debug, 476 ) 477 478 if hasattr(self.instance_connector, 'get_backtrack_data'): 479 with Venv(get_connector_plugin(self.instance_connector)): 480 return self.enforce_dtypes( 481 self.instance_connector.get_backtrack_data( 482 pipe=self, 483 begin=begin, 484 backtrack_minutes=backtrack_minutes, 485 params=params, 486 limit=limit, 487 debug=debug, 488 **kw 489 ), 490 debug=debug, 491 ) 492 493 if begin is None: 494 begin = self.get_sync_time(params=params, debug=debug) 495 496 backtrack_interval = ( 497 timedelta(minutes=backtrack_minutes) 498 if isinstance(begin, datetime) 499 else backtrack_minutes 500 ) 501 if begin is not None: 502 begin = begin - backtrack_interval 503 504 return self.get_data( 505 begin=begin, 506 params=params, 507 debug=debug, 508 limit=limit, 509 order=kw.get('order', 'desc'), 510 **kw 511 )
Get the most recent data from the instance connector as a Pandas DataFrame.
Parameters
- backtrack_minutes (Optional[int], default None): How many minutes from begin to select from. If None, use pipe.parameters['fetch']['backtrack_minutes'].
- begin (Optional[datetime], default None): The starting point to search for data. If begin is None (default), use the most recent observed datetime (AKA sync_time).
  E.g. begin = 02:00
  Search this region.             Ignore this, even if there's data.
  / / / / / / / / / |
  -----|----------|----------|----------|----------|----------|
  00:00      01:00      02:00      03:00      04:00      05:00
- params (Optional[Dict[str, Any]], default None): The standard Meerschaum params query dictionary.
- limit (Optional[int], default None): If provided, cap the number of rows to be returned.
- fresh (bool, default False): If True, ignore the local cache and pull directly from the instance connector. Only comes into effect if a pipe was created with cache=True.
- debug (bool, default False): Verbosity toggle.
Returns
- A pd.DataFrame for the pipe's data corresponding to the provided parameters. Backtrack data is a convenient way to get a pipe's data "backtracked" from the most recent datetime.
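A brief sketch (illustrative keys), assuming the pipe has already been synced:
import meerschaum as mrsm
pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')
# Select rows within the backtrack window relative to the latest sync time.
recent_df = pipe.get_backtrack_data(backtrack_minutes=60)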
514def get_rowcount( 515 self, 516 begin: Union[datetime, int, None] = None, 517 end: Union[datetime, int, None] = None, 518 params: Optional[Dict[str, Any]] = None, 519 remote: bool = False, 520 debug: bool = False 521) -> int: 522 """ 523 Get a Pipe's instance or remote rowcount. 524 525 Parameters 526 ---------- 527 begin: Optional[datetime], default None 528 Count rows where datetime > begin. 529 530 end: Optional[datetime], default None 531 Count rows where datetime < end. 532 533 remote: bool, default False 534 Count rows from a pipe's remote source. 535 **NOTE**: This is experimental! 536 537 debug: bool, default False 538 Verbosity toggle. 539 540 Returns 541 ------- 542 An `int` of the number of rows in the pipe corresponding to the provided parameters. 543 Returned 0 if the pipe does not exist. 544 """ 545 from meerschaum.utils.warnings import warn 546 from meerschaum.utils.venv import Venv 547 from meerschaum.connectors import get_connector_plugin 548 549 begin, end = self.parse_date_bounds(begin, end) 550 connector = self.instance_connector if not remote else self.connector 551 try: 552 with Venv(get_connector_plugin(connector)): 553 rowcount = connector.get_pipe_rowcount( 554 self, 555 begin=begin, 556 end=end, 557 params=params, 558 remote=remote, 559 debug=debug, 560 ) 561 if rowcount is None: 562 return 0 563 return rowcount 564 except AttributeError as e: 565 warn(e) 566 if remote: 567 return 0 568 warn(f"Failed to get a rowcount for {self}.") 569 return 0
Get a Pipe's instance or remote rowcount.
Parameters
- begin (Optional[datetime], default None): Count rows where datetime > begin.
- end (Optional[datetime], default None): Count rows where datetime < end.
- remote (bool, default False): Count rows from a pipe's remote source. NOTE: This is experimental!
- debug (bool, default False): Verbosity toggle.
Returns
- An int of the number of rows in the pipe corresponding to the provided parameters. Returns 0 if the pipe does not exist.
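For example (illustrative keys):
from datetime import datetime
import meerschaum as mrsm
pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')
count = pipe.get_rowcount(
    begin=datetime(2024, 1, 1),
    end=datetime(2024, 2, 1),
)
print(count)
# 0 if the pipe does not exist, otherwise the number of rows in that window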
572def get_chunk_interval( 573 self, 574 chunk_interval: Union[timedelta, int, None] = None, 575 debug: bool = False, 576) -> Union[timedelta, int]: 577 """ 578 Get the chunk interval to use for this pipe. 579 580 Parameters 581 ---------- 582 chunk_interval: Union[timedelta, int, None], default None 583 If provided, coerce this value into the correct type. 584 For example, if the datetime axis is an integer, then 585 return the number of minutes. 586 587 Returns 588 ------- 589 The chunk interval (`timedelta` or `int`) to use with this pipe's `datetime` axis. 590 """ 591 default_chunk_minutes = get_config('pipes', 'parameters', 'verify', 'chunk_minutes') 592 configured_chunk_minutes = self.parameters.get('verify', {}).get('chunk_minutes', None) 593 chunk_minutes = ( 594 (configured_chunk_minutes or default_chunk_minutes) 595 if chunk_interval is None 596 else ( 597 chunk_interval 598 if isinstance(chunk_interval, int) 599 else int(chunk_interval.total_seconds() / 60) 600 ) 601 ) 602 603 dt_col = self.columns.get('datetime', None) 604 if dt_col is None: 605 return timedelta(minutes=chunk_minutes) 606 607 dt_dtype = self.dtypes.get(dt_col, 'datetime64[ns, UTC]') 608 if 'int' in dt_dtype.lower(): 609 return chunk_minutes 610 return timedelta(minutes=chunk_minutes)
Get the chunk interval to use for this pipe.
Parameters
- chunk_interval (Union[timedelta, int, None], default None): If provided, coerce this value into the correct type. For example, if the datetime axis is an integer, then return the number of minutes.
Returns
- The chunk interval (timedelta or int) to use with this pipe's datetime axis.
613def get_chunk_bounds( 614 self, 615 begin: Union[datetime, int, None] = None, 616 end: Union[datetime, int, None] = None, 617 bounded: bool = False, 618 chunk_interval: Union[timedelta, int, None] = None, 619 debug: bool = False, 620) -> List[ 621 Tuple[ 622 Union[datetime, int, None], 623 Union[datetime, int, None], 624 ] 625]: 626 """ 627 Return a list of datetime bounds for iterating over the pipe's `datetime` axis. 628 629 Parameters 630 ---------- 631 begin: Union[datetime, int, None], default None 632 If provided, do not select less than this value. 633 Otherwise the first chunk will be unbounded. 634 635 end: Union[datetime, int, None], default None 636 If provided, do not select greater than or equal to this value. 637 Otherwise the last chunk will be unbounded. 638 639 bounded: bool, default False 640 If `True`, do not include `None` in the first chunk. 641 642 chunk_interval: Union[timedelta, int, None], default None 643 If provided, use this interval for the size of chunk boundaries. 644 The default value for this pipe may be set 645 under `pipe.parameters['verify']['chunk_minutes']`. 646 647 debug: bool, default False 648 Verbosity toggle. 649 650 Returns 651 ------- 652 A list of chunk bounds (datetimes or integers). 653 If unbounded, the first and last chunks will include `None`. 654 """ 655 include_less_than_begin = not bounded and begin is None 656 include_greater_than_end = not bounded and end is None 657 if begin is None: 658 begin = self.get_sync_time(newest=False, debug=debug) 659 if end is None: 660 end = self.get_sync_time(newest=True, debug=debug) 661 if begin is None and end is None: 662 return [(None, None)] 663 664 begin, end = self.parse_date_bounds(begin, end) 665 666 ### Set the chunk interval under `pipe.parameters['verify']['chunk_minutes']`. 667 chunk_interval = self.get_chunk_interval(chunk_interval, debug=debug) 668 669 ### Build a list of tuples containing the chunk boundaries 670 ### so that we can sync multiple chunks in parallel. 671 ### Run `verify pipes --workers 1` to sync chunks in series. 672 chunk_bounds = [] 673 begin_cursor = begin 674 while begin_cursor < end: 675 end_cursor = begin_cursor + chunk_interval 676 chunk_bounds.append((begin_cursor, end_cursor)) 677 begin_cursor = end_cursor 678 679 ### The chunk interval might be too large. 680 if not chunk_bounds and end >= begin: 681 chunk_bounds = [(begin, end)] 682 683 ### Truncate the last chunk to the end timestamp. 684 if chunk_bounds[-1][1] > end: 685 chunk_bounds[-1] = (chunk_bounds[-1][0], end) 686 687 ### Pop the last chunk if its bounds are equal. 688 if chunk_bounds[-1][0] == chunk_bounds[-1][1]: 689 chunk_bounds = chunk_bounds[:-1] 690 691 if include_less_than_begin: 692 chunk_bounds = [(None, begin)] + chunk_bounds 693 if include_greater_than_end: 694 chunk_bounds = chunk_bounds + [(end, None)] 695 696 return chunk_bounds
Return a list of datetime bounds for iterating over the pipe's datetime axis.
Parameters
- begin (Union[datetime, int, None], default None): If provided, do not select less than this value. Otherwise the first chunk will be unbounded.
- end (Union[datetime, int, None], default None): If provided, do not select greater than or equal to this value. Otherwise the last chunk will be unbounded.
- bounded (bool, default False): If True, do not include None in the first or last chunk.
- chunk_interval (Union[timedelta, int, None], default None): If provided, use this interval for the size of chunk boundaries. The default value for this pipe may be set under pipe.parameters['verify']['chunk_minutes'].
- debug (bool, default False): Verbosity toggle.
Returns
- A list of chunk bounds (datetimes or integers). If unbounded, the first and last chunks will include None.
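Chunk bounds are useful for processing a pipe's data in manageable pieces, e.g. when verifying or back-filling. A sketch with illustrative keys:
from datetime import datetime, timedelta
import meerschaum as mrsm
pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')
bounds = pipe.get_chunk_bounds(
    begin=datetime(2024, 1, 1),
    end=datetime(2024, 1, 8),
    bounded=True,
    chunk_interval=timedelta(days=1),
)
for chunk_begin, chunk_end in bounds:
    # Process one day at a time, e.g. inspect or re-verify this window.
    chunk_df = pipe.get_data(begin=chunk_begin, end=chunk_end)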
699def parse_date_bounds(self, *dt_vals: Union[datetime, int, None]) -> Union[ 700 datetime, 701 int, 702 str, 703 None, 704 Tuple[Union[datetime, int, str, None]] 705]: 706 """ 707 Given a date bound (begin, end), coerce a timezone if necessary. 708 """ 709 from meerschaum.utils.misc import is_int 710 from meerschaum.utils.dtypes import coerce_timezone 711 from meerschaum.utils.warnings import warn 712 dateutil_parser = mrsm.attempt_import('dateutil.parser') 713 714 def _parse_date_bound(dt_val): 715 if dt_val is None: 716 return None 717 718 if isinstance(dt_val, int): 719 return dt_val 720 721 if dt_val == '': 722 return '' 723 724 if is_int(dt_val): 725 return int(dt_val) 726 727 if isinstance(dt_val, str): 728 try: 729 dt_val = dateutil_parser.parse(dt_val) 730 except Exception as e: 731 warn(f"Could not parse '{dt_val}' as datetime:\n{e}") 732 return None 733 734 dt_col = self.columns.get('datetime', None) 735 dt_typ = str(self.dtypes.get(dt_col, 'datetime64[ns, UTC]')) 736 if dt_typ == 'datetime': 737 dt_typ = 'datetime64[ns, UTC]' 738 return coerce_timezone(dt_val, strip_utc=('utc' not in dt_typ.lower())) 739 740 bounds = tuple(_parse_date_bound(dt_val) for dt_val in dt_vals) 741 if len(bounds) == 1: 742 return bounds[0] 743 return bounds
Given a date bound (begin, end), coerce a timezone if necessary.
12def register( 13 self, 14 debug: bool = False, 15 **kw: Any 16 ) -> SuccessTuple: 17 """ 18 Register a new Pipe along with its attributes. 19 20 Parameters 21 ---------- 22 debug: bool, default False 23 Verbosity toggle. 24 25 kw: Any 26 Keyword arguments to pass to `instance_connector.register_pipe()`. 27 28 Returns 29 ------- 30 A `SuccessTuple` of success, message. 31 """ 32 if self.temporary: 33 return False, "Cannot register pipes created with `temporary=True` (read-only)." 34 35 from meerschaum.utils.formatting import get_console 36 from meerschaum.utils.venv import Venv 37 from meerschaum.connectors import get_connector_plugin, custom_types 38 from meerschaum.config._patch import apply_patch_to_config 39 40 import warnings 41 with warnings.catch_warnings(): 42 warnings.simplefilter('ignore') 43 try: 44 _conn = self.connector 45 except Exception as e: 46 _conn = None 47 48 if ( 49 _conn is not None 50 and 51 (_conn.type == 'plugin' or _conn.type in custom_types) 52 and 53 getattr(_conn, 'register', None) is not None 54 ): 55 try: 56 with Venv(get_connector_plugin(_conn), debug=debug): 57 params = self.connector.register(self) 58 except Exception as e: 59 get_console().print_exception() 60 params = None 61 params = {} if params is None else params 62 if not isinstance(params, dict): 63 from meerschaum.utils.warnings import warn 64 warn( 65 f"Invalid parameters returned from `register()` in connector {self.connector}:\n" 66 + f"{params}" 67 ) 68 else: 69 self.parameters = apply_patch_to_config(params, self.parameters) 70 71 if not self.parameters: 72 cols = self.columns if self.columns else {'datetime': None, 'id': None} 73 self.parameters = { 74 'columns': cols, 75 } 76 77 with Venv(get_connector_plugin(self.instance_connector)): 78 return self.instance_connector.register_pipe(self, debug=debug, **kw)
Register a new Pipe along with its attributes.
Parameters
- debug (bool, default False): Verbosity toggle.
- kw (Any): Keyword arguments to pass to instance_connector.register_pipe().
Returns
- A SuccessTuple of success, message.
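A minimal sketch of registering an in-memory pipe on the default instance (the keys and columns are illustrative):
import meerschaum as mrsm
pipe = mrsm.Pipe(
    'sql:main', 'demo',
    instance='sql:main',
    columns={'datetime': 'ts', 'id': 'id'},
)
success, msg = pipe.register()
print(success, msg)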
19@property 20def attributes(self) -> Dict[str, Any]: 21 """ 22 Return a dictionary of a pipe's keys and parameters. 23 These values are reflected directly from the pipes table of the instance. 24 """ 25 import time 26 from meerschaum.config import get_config 27 from meerschaum.config._patch import apply_patch_to_config 28 from meerschaum.utils.venv import Venv 29 from meerschaum.connectors import get_connector_plugin 30 31 timeout_seconds = get_config('pipes', 'attributes', 'local_cache_timeout_seconds') 32 33 if '_attributes' not in self.__dict__: 34 self._attributes = {} 35 36 now = time.perf_counter() 37 last_refresh = self.__dict__.get('_attributes_sync_time', None) 38 timed_out = ( 39 last_refresh is None 40 or 41 (timeout_seconds is not None and (now - last_refresh) >= timeout_seconds) 42 ) 43 if not self.temporary and timed_out: 44 self._attributes_sync_time = now 45 local_attributes = self.__dict__.get('_attributes', {}) 46 with Venv(get_connector_plugin(self.instance_connector)): 47 instance_attributes = self.instance_connector.get_pipe_attributes(self) 48 self._attributes = apply_patch_to_config(instance_attributes, local_attributes) 49 return self._attributes
Return a dictionary of a pipe's keys and parameters. These values are reflected directly from the pipes table of the instance.
52@property 53def parameters(self) -> Optional[Dict[str, Any]]: 54 """ 55 Return the parameters dictionary of the pipe. 56 """ 57 if 'parameters' not in self.attributes: 58 self.attributes['parameters'] = {} 59 _parameters = self.attributes['parameters'] 60 dt_col = _parameters.get('columns', {}).get('datetime', None) 61 dt_typ = _parameters.get('dtypes', {}).get(dt_col, None) if dt_col else None 62 if dt_col and not dt_typ: 63 if 'dtypes' not in _parameters: 64 self.attributes['parameters']['dtypes'] = {} 65 self.attributes['parameters']['dtypes'][dt_col] = 'datetime' 66 return self.attributes['parameters']
Return the parameters dictionary of the pipe.
78@property 79def columns(self) -> Union[Dict[str, str], None]: 80 """ 81 Return the `columns` dictionary defined in `meerschaum.Pipe.parameters`. 82 """ 83 if 'columns' not in self.parameters: 84 self.parameters['columns'] = {} 85 cols = self.parameters['columns'] 86 if not isinstance(cols, dict): 87 cols = {} 88 self.parameters['columns'] = cols 89 return {col_ix: col for col_ix, col in cols.items() if col}
Return the columns dictionary defined in meerschaum.Pipe.parameters.
106@property 107def indices(self) -> Union[Dict[str, Union[str, List[str]]], None]: 108 """ 109 Return the `indices` dictionary defined in `meerschaum.Pipe.parameters`. 110 """ 111 indices_key = ( 112 'indexes' 113 if 'indexes' in self.parameters 114 else 'indices' 115 ) 116 if indices_key not in self.parameters: 117 self.parameters[indices_key] = {} 118 _indices = self.parameters[indices_key] 119 _columns = self.columns 120 dt_col = _columns.get('datetime', None) 121 if not isinstance(_indices, dict): 122 _indices = {} 123 self.parameters[indices_key] = _indices 124 unique_cols = list(set(( 125 [dt_col] 126 if dt_col 127 else [] 128 ) + [ 129 col 130 for col_ix, col in _columns.items() 131 if col and col_ix != 'datetime' 132 ])) 133 return { 134 **({'unique': unique_cols} if len(unique_cols) > 1 else {}), 135 **{col_ix: col for col_ix, col in _columns.items() if col}, 136 **_indices 137 }
Return the indices dictionary defined in meerschaum.Pipe.parameters.
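Note that the configured index columns and a derived unique index are returned alongside any explicitly configured indices. A sketch (illustrative keys; the order of the unique columns may vary):
import meerschaum as mrsm
pipe = mrsm.Pipe(
    'sql:main', 'demo',
    columns={'datetime': 'ts', 'id': 'station'},
    indices={'station_ts': ['station', 'ts']},
    temporary=True,
)
print(pipe.indices)
# {'unique': ['ts', 'station'], 'datetime': 'ts', 'id': 'station', 'station_ts': ['station', 'ts']}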
140@property 141def indexes(self) -> Union[Dict[str, Union[str, List[str]]], None]: 142 """ 143 Alias for `meerschaum.Pipe.indices`. 144 """ 145 return self.indices
Alias for meerschaum.Pipe.indices.
198@property 199def dtypes(self) -> Union[Dict[str, Any], None]: 200 """ 201 If defined, return the `dtypes` dictionary defined in `meerschaum.Pipe.parameters`. 202 """ 203 from meerschaum.config._patch import apply_patch_to_config 204 configured_dtypes = self.parameters.get('dtypes', {}) 205 remote_dtypes = self.infer_dtypes(persist=False) 206 patched_dtypes = apply_patch_to_config(remote_dtypes, configured_dtypes) 207 return patched_dtypes
If defined, return the dtypes dictionary defined in meerschaum.Pipe.parameters.
255@property 256def autoincrement(self) -> bool: 257 """ 258 Return the `autoincrement` parameter for the pipe. 259 """ 260 if 'autoincrement' not in self.parameters: 261 self.parameters['autoincrement'] = False 262 263 return self.parameters['autoincrement']
Return the autoincrement parameter for the pipe.
219@property 220def upsert(self) -> bool: 221 """ 222 Return whether `upsert` is set for the pipe. 223 """ 224 if 'upsert' not in self.parameters: 225 self.parameters['upsert'] = False 226 return self.parameters['upsert']
Return whether upsert is set for the pipe.
237@property 238def static(self) -> bool: 239 """ 240 Return whether `static` is set for the pipe. 241 """ 242 if 'static' not in self.parameters: 243 self.parameters['static'] = False 244 return self.parameters['static']
Return whether static is set for the pipe.
274@property 275def tzinfo(self) -> Union[None, timezone]: 276 """ 277 Return `timezone.utc` if the pipe is timezone-aware. 278 """ 279 dt_col = self.columns.get('datetime', None) 280 if not dt_col: 281 return None 282 283 dt_typ = str(self.dtypes.get(dt_col, 'datetime64[ns, UTC]')) 284 if 'utc' in dt_typ.lower() or dt_typ == 'datetime': 285 return timezone.utc 286 287 if dt_typ == 'datetime64[ns]': 288 return None 289 290 return None
Return timezone.utc if the pipe is timezone-aware.
293def get_columns(self, *args: str, error: bool = False) -> Union[str, Tuple[str]]: 294 """ 295 Check if the requested columns are defined. 296 297 Parameters 298 ---------- 299 *args: str 300 The column names to be retrieved. 301 302 error: bool, default False 303 If `True`, raise an `Exception` if the specified column is not defined. 304 305 Returns 306 ------- 307 A tuple of the same size of `args` or a `str` if `args` is a single argument. 308 309 Examples 310 -------- 311 >>> pipe = mrsm.Pipe('test', 'test') 312 >>> pipe.columns = {'datetime': 'dt', 'id': 'id'} 313 >>> pipe.get_columns('datetime', 'id') 314 ('dt', 'id') 315 >>> pipe.get_columns('value', error=True) 316 Exception: 🛑 Missing 'value' column for Pipe('test', 'test'). 317 """ 318 from meerschaum.utils.warnings import error as _error, warn 319 if not args: 320 args = tuple(self.columns.keys()) 321 col_names = [] 322 for col in args: 323 col_name = None 324 try: 325 col_name = self.columns[col] 326 if col_name is None and error: 327 _error(f"Please define the name of the '{col}' column for {self}.") 328 except Exception as e: 329 col_name = None 330 if col_name is None and error: 331 _error(f"Missing '{col}'" + f" column for {self}.") 332 col_names.append(col_name) 333 if len(col_names) == 1: 334 return col_names[0] 335 return tuple(col_names)
Check if the requested columns are defined.
Parameters
- *args (str): The column names to be retrieved.
- error (bool, default False): If True, raise an Exception if the specified column is not defined.
Returns
- A tuple of the same size as args, or a str if args is a single argument.
Examples
>>> pipe = mrsm.Pipe('test', 'test')
>>> pipe.columns = {'datetime': 'dt', 'id': 'id'}
>>> pipe.get_columns('datetime', 'id')
('dt', 'id')
>>> pipe.get_columns('value', error=True)
Exception: 🛑 Missing 'value' column for Pipe('test', 'test').
338def get_columns_types( 339 self, 340 refresh: bool = False, 341 debug: bool = False, 342) -> Union[Dict[str, str], None]: 343 """ 344 Get a dictionary of a pipe's column names and their types. 345 346 Parameters 347 ---------- 348 refresh: bool, default False 349 If `True`, invalidate the cache and fetch directly from the instance connector. 350 351 debug: bool, default False: 352 Verbosity toggle. 353 354 Returns 355 ------- 356 A dictionary of column names (`str`) to column types (`str`). 357 358 Examples 359 -------- 360 >>> pipe.get_columns_types() 361 { 362 'dt': 'TIMESTAMP WITH TIMEZONE', 363 'id': 'BIGINT', 364 'val': 'DOUBLE PRECISION', 365 } 366 >>> 367 """ 368 import time 369 from meerschaum.connectors import get_connector_plugin 370 from meerschaum.config.static import STATIC_CONFIG 371 from meerschaum.utils.warnings import dprint 372 373 now = time.perf_counter() 374 cache_seconds = STATIC_CONFIG['pipes']['static_schema_cache_seconds'] 375 if not self.static: 376 refresh = True 377 if refresh: 378 _ = self.__dict__.pop('_columns_types_timestamp', None) 379 _ = self.__dict__.pop('_columns_types', None) 380 _columns_types = self.__dict__.get('_columns_types', None) 381 if _columns_types: 382 columns_types_timestamp = self.__dict__.get('_columns_types_timestamp', None) 383 if columns_types_timestamp is not None: 384 delta = now - columns_types_timestamp 385 if delta < cache_seconds: 386 if debug: 387 dprint( 388 f"Returning cached `columns_types` for {self} " 389 f"({round(delta, 2)} seconds old)." 390 ) 391 return _columns_types 392 393 with mrsm.Venv(get_connector_plugin(self.instance_connector)): 394 _columns_types = ( 395 self.instance_connector.get_pipe_columns_types(self, debug=debug) 396 if hasattr(self.instance_connector, 'get_pipe_columns_types') 397 else None 398 ) 399 400 self.__dict__['_columns_types'] = _columns_types 401 self.__dict__['_columns_types_timestamp'] = now 402 return _columns_types or {}
Get a dictionary of a pipe's column names and their types.
Parameters
- refresh (bool, default False): If True, invalidate the cache and fetch directly from the instance connector.
- debug (bool, default False): Verbosity toggle.
Returns
- A dictionary of column names (str) to column types (str).
Examples
>>> pipe.get_columns_types()
{
'dt': 'TIMESTAMP WITH TIMEZONE',
'id': 'BIGINT',
'val': 'DOUBLE PRECISION',
}
>>>
405def get_columns_indices( 406 self, 407 debug: bool = False, 408 refresh: bool = False, 409) -> Dict[str, List[Dict[str, str]]]: 410 """ 411 Return a dictionary mapping columns to index information. 412 """ 413 import time 414 from meerschaum.connectors import get_connector_plugin 415 from meerschaum.config.static import STATIC_CONFIG 416 from meerschaum.utils.warnings import dprint 417 418 now = time.perf_counter() 419 cache_seconds = ( 420 STATIC_CONFIG['pipes']['static_schema_cache_seconds'] 421 if self.static 422 else STATIC_CONFIG['pipes']['exists_timeout_seconds'] 423 ) 424 if refresh: 425 _ = self.__dict__.pop('_columns_indices_timestamp', None) 426 _ = self.__dict__.pop('_columns_indices', None) 427 _columns_indices = self.__dict__.get('_columns_indices', None) 428 if _columns_indices: 429 columns_indices_timestamp = self.__dict__.get('_columns_indices_timestamp', None) 430 if columns_indices_timestamp is not None: 431 delta = now - columns_indices_timestamp 432 if delta < cache_seconds: 433 if debug: 434 dprint( 435 f"Returning cached `columns_indices` for {self} " 436 f"({round(delta, 2)} seconds old)." 437 ) 438 return _columns_indices 439 440 with mrsm.Venv(get_connector_plugin(self.instance_connector)): 441 _columns_indices = ( 442 self.instance_connector.get_pipe_columns_indices(self, debug=debug) 443 if hasattr(self.instance_connector, 'get_pipe_columns_indices') 444 else None 445 ) 446 447 self.__dict__['_columns_indices'] = _columns_indices 448 self.__dict__['_columns_indices_timestamp'] = now 449 return _columns_indices or {}
Return a dictionary mapping columns to index information.
689def get_indices(self) -> Dict[str, str]: 690 """ 691 Return a dictionary mapping index keys to their names on the database. 692 693 Returns 694 ------- 695 A dictionary of index keys to column names. 696 """ 697 _parameters = self.parameters 698 _index_template = _parameters.get('index_template', "IX_{target}_{column_names}") 699 _indices = self.indices 700 _target = self.target 701 _column_names = { 702 ix: ( 703 '_'.join(cols) 704 if isinstance(cols, (list, tuple)) 705 else str(cols) 706 ) 707 for ix, cols in _indices.items() 708 if cols 709 } 710 _index_names = { 711 ix: _index_template.format( 712 target=_target, 713 column_names=column_names, 714 connector_keys=self.connector_keys, 715 metric_key=self.connector_key, 716 location_key=self.location_key, 717 ) 718 for ix, column_names in _column_names.items() 719 } 720 ### NOTE: Skip any duplicate indices. 721 seen_index_names = {} 722 for ix, index_name in _index_names.items(): 723 if index_name in seen_index_names: 724 continue 725 seen_index_names[index_name] = ix 726 return { 727 ix: index_name 728 for index_name, ix in seen_index_names.items() 729 }
Return a dictionary mapping index keys to their names on the database.
Returns
- A dictionary of index keys to their index names on the database.
452def get_id(self, **kw: Any) -> Union[int, None]: 453 """ 454 Fetch a pipe's ID from its instance connector. 455 If the pipe does not exist, return `None`. 456 """ 457 if self.temporary: 458 return None 459 from meerschaum.utils.venv import Venv 460 from meerschaum.connectors import get_connector_plugin 461 462 with Venv(get_connector_plugin(self.instance_connector)): 463 if hasattr(self.instance_connector, 'get_pipe_id'): 464 return self.instance_connector.get_pipe_id(self, **kw) 465 466 return None
Fetch a pipe's ID from its instance connector.
If the pipe does not exist, return None.
469@property 470def id(self) -> Union[int, None]: 471 """ 472 Fetch and cache a pipe's ID. 473 """ 474 if not ('_id' in self.__dict__ and self._id): 475 self._id = self.get_id() 476 return self._id
Fetch and cache a pipe's ID.
479def get_val_column(self, debug: bool = False) -> Union[str, None]: 480 """ 481 Return the name of the value column if it's defined, otherwise make an educated guess. 482 If not set in the `columns` dictionary, return the first numeric column that is not 483 an ID or datetime column. 484 If none may be found, return `None`. 485 486 Parameters 487 ---------- 488 debug: bool, default False: 489 Verbosity toggle. 490 491 Returns 492 ------- 493 Either a string or `None`. 494 """ 495 from meerschaum.utils.debug import dprint 496 if debug: 497 dprint('Attempting to determine the value column...') 498 try: 499 val_name = self.get_columns('value') 500 except Exception as e: 501 val_name = None 502 if val_name is not None: 503 if debug: 504 dprint(f"Value column: {val_name}") 505 return val_name 506 507 cols = self.columns 508 if cols is None: 509 if debug: 510 dprint('No columns could be determined. Returning...') 511 return None 512 try: 513 dt_name = self.get_columns('datetime', error=False) 514 except Exception as e: 515 dt_name = None 516 try: 517 id_name = self.get_columns('id', errors=False) 518 except Exception as e: 519 id_name = None 520 521 if debug: 522 dprint(f"dt_name: {dt_name}") 523 dprint(f"id_name: {id_name}") 524 525 cols_types = self.get_columns_types(debug=debug) 526 if cols_types is None: 527 return None 528 if debug: 529 dprint(f"cols_types: {cols_types}") 530 if dt_name is not None: 531 cols_types.pop(dt_name, None) 532 if id_name is not None: 533 cols_types.pop(id_name, None) 534 535 candidates = [] 536 candidate_keywords = {'float', 'double', 'precision', 'int', 'numeric',} 537 for search_term in candidate_keywords: 538 for col, typ in cols_types.items(): 539 if search_term in typ.lower(): 540 candidates.append(col) 541 break 542 if not candidates: 543 if debug: 544 dprint("No value column could be determined.") 545 return None 546 547 return candidates[0]
Return the name of the value column if it's defined, otherwise make an educated guess.
If not set in the columns dictionary, return the first numeric column that is not an ID or datetime column.
If none may be found, return None.
Parameters
- debug (bool, default False): Verbosity toggle.
Returns
- Either a string or None.
550@property 551def parents(self) -> List[meerschaum.Pipe]: 552 """ 553 Return a list of `meerschaum.Pipe` objects to be designated as parents. 554 """ 555 if 'parents' not in self.parameters: 556 return [] 557 from meerschaum.utils.warnings import warn 558 _parents_keys = self.parameters['parents'] 559 if not isinstance(_parents_keys, list): 560 warn( 561 f"Please ensure the parents for {self} are defined as a list of keys.", 562 stacklevel = 4 563 ) 564 return [] 565 from meerschaum import Pipe 566 _parents = [] 567 for keys in _parents_keys: 568 try: 569 p = Pipe(**keys) 570 except Exception as e: 571 warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}") 572 continue 573 _parents.append(p) 574 return _parents
Return a list of meerschaum.Pipe objects to be designated as parents.
577@property 578def children(self) -> List[meerschaum.Pipe]: 579 """ 580 Return a list of `meerschaum.Pipe` objects to be designated as children. 581 """ 582 if 'children' not in self.parameters: 583 return [] 584 from meerschaum.utils.warnings import warn 585 _children_keys = self.parameters['children'] 586 if not isinstance(_children_keys, list): 587 warn( 588 f"Please ensure the children for {self} are defined as a list of keys.", 589 stacklevel = 4 590 ) 591 return [] 592 from meerschaum import Pipe 593 _children = [] 594 for keys in _children_keys: 595 try: 596 p = Pipe(**keys) 597 except Exception as e: 598 warn(f"Unable to build parent with keys '{keys}' for {self}:\n{e}") 599 continue 600 _children.append(p) 601 return _children
Return a list of meerschaum.Pipe objects to be designated as children.
604@property 605def target(self) -> str: 606 """ 607 The target table name. 608 You can set the target name under on of the following keys 609 (checked in this order): 610 - `target` 611 - `target_name` 612 - `target_table` 613 - `target_table_name` 614 """ 615 if 'target' not in self.parameters: 616 default_target = self._target_legacy() 617 default_targets = {default_target} 618 potential_keys = ('target_name', 'target_table', 'target_table_name') 619 _target = None 620 for k in potential_keys: 621 if k in self.parameters: 622 _target = self.parameters[k] 623 break 624 625 _target = _target or default_target 626 627 if self.instance_connector.type == 'sql': 628 from meerschaum.utils.sql import truncate_item_name 629 truncated_target = truncate_item_name(_target, self.instance_connector.flavor) 630 default_targets.add(truncated_target) 631 warned_target = self.__dict__.get('_warned_target', False) 632 if truncated_target != _target and not warned_target: 633 if not warned_target: 634 warn( 635 f"The target '{_target}' is too long for '{self.instance_connector.flavor}', " 636 + f"will use {truncated_target} instead." 637 ) 638 self.__dict__['_warned_target'] = True 639 _target = truncated_target 640 641 if _target in default_targets: 642 return _target 643 self.target = _target 644 return self.parameters['target']
The target table name. You can set the target name under one of the following keys (checked in this order):
- target
- target_name
- target_table
- target_table_name
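For example, a sketch of overriding the default target table name (the keys are illustrative):
import meerschaum as mrsm
pipe = mrsm.Pipe(
    'sql:main', 'demo',
    target='demo_table',
    temporary=True,
)
print(pipe.target)
# demo_table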
667def guess_datetime(self) -> Union[str, None]: 668 """ 669 Try to determine a pipe's datetime column. 670 """ 671 _dtypes = self.dtypes 672 673 ### Abort if the user explictly disallows a datetime index. 674 if 'datetime' in _dtypes: 675 if _dtypes['datetime'] is None: 676 return None 677 678 from meerschaum.utils.dtypes import are_dtypes_equal 679 dt_cols = [ 680 col 681 for col, typ in _dtypes.items() 682 if are_dtypes_equal(typ, 'datetime') 683 ] 684 if not dt_cols: 685 return None 686 return dt_cols[0]
Try to determine a pipe's datetime column.
12def show( 13 self, 14 nopretty: bool = False, 15 debug: bool = False, 16 **kw 17 ) -> SuccessTuple: 18 """ 19 Show attributes of a Pipe. 20 21 Parameters 22 ---------- 23 nopretty: bool, default False 24 If `True`, simply print the JSON of the pipe's attributes. 25 26 debug: bool, default False 27 Verbosity toggle. 28 29 Returns 30 ------- 31 A `SuccessTuple` of success, message. 32 33 """ 34 import json 35 from meerschaum.utils.formatting import ( 36 pprint, make_header, ANSI, highlight_pipes, fill_ansi, get_console, 37 ) 38 from meerschaum.utils.packages import import_rich, attempt_import 39 from meerschaum.utils.warnings import info 40 attributes_json = json.dumps(self.attributes) 41 if not nopretty: 42 _to_print = f"Attributes for {self}:" 43 if ANSI: 44 _to_print = fill_ansi(highlight_pipes(make_header(_to_print)), 'magenta') 45 print(_to_print) 46 rich = import_rich() 47 rich_json = attempt_import('rich.json') 48 get_console().print(rich_json.JSON(attributes_json)) 49 else: 50 print(_to_print) 51 else: 52 print(attributes_json) 53 54 return True, "Success"
Show attributes of a Pipe.
Parameters
- nopretty (bool, default False): If True, simply print the JSON of the pipe's attributes.
- debug (bool, default False): Verbosity toggle.
Returns
- A SuccessTuple of success, message.
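For example:

import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:foo', 'demo', instance='sql:temp', columns={'datetime': 'ts'})
success, msg = pipe.show(nopretty=True)   # prints the pipe's attributes as a single JSON string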
def edit(
    self,
    patch: bool = False,
    interactive: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit a Pipe's configuration.

    Parameters
    ----------
    patch: bool, default False
        If `patch` is True, update parameters by cascading rather than overwriting.
    interactive: bool, default False
        If `True`, open an editor for the user to make changes to the pipe's YAML file.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin

    if self.temporary:
        return False, "Cannot edit pipes created with `temporary=True` (read-only)."

    if not interactive:
        with Venv(get_connector_plugin(self.instance_connector)):
            return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)

    from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
    from meerschaum.utils.misc import edit_file
    parameters_filename = str(self) + '.yaml'
    parameters_path = PIPES_CACHE_RESOURCES_PATH / parameters_filename

    from meerschaum.utils.yaml import yaml

    edit_text = f"Edit the parameters for {self}"
    edit_top = '#' * (len(edit_text) + 4)
    edit_header = edit_top + f'\n# {edit_text} #\n' + edit_top + '\n\n'

    from meerschaum.config import get_config
    parameters = dict(get_config('pipes', 'parameters', patch=True))
    from meerschaum.config._patch import apply_patch_to_config
    parameters = apply_patch_to_config(parameters, self.parameters)

    ### write parameters to yaml file
    with open(parameters_path, 'w+') as f:
        f.write(edit_header)
        yaml.dump(parameters, stream=f, sort_keys=False)

    ### only quit editing if yaml is valid
    editing = True
    while editing:
        edit_file(parameters_path)
        try:
            with open(parameters_path, 'r') as f:
                file_parameters = yaml.load(f.read())
        except Exception as e:
            from meerschaum.utils.warnings import warn
            warn(f"Invalid format defined for '{self}':\n\n{e}")
            input(f"Press [Enter] to correct the configuration for '{self}': ")
        else:
            editing = False

    self.parameters = file_parameters

    if debug:
        from meerschaum.utils.formatting import pprint
        pprint(self.parameters)

    with Venv(get_connector_plugin(self.instance_connector)):
        return self.instance_connector.edit_pipe(self, patch=patch, debug=debug, **kw)
Edit a Pipe's configuration.
Parameters
- patch (bool, default False): If patch is True, update parameters by cascading rather than overwriting.
- interactive (bool, default False): If True, open an editor for the user to make changes to the pipe's YAML file.
- debug (bool, default False): Verbosity toggle.
Returns
- A SuccessTuple of success, message.
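For example, a sketch of persisting changed parameters without opening an editor (the pipe keys below are hypothetical):

import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:foo', 'demo', instance='sql:temp')
pipe.parameters['tags'] = ['production']
success, msg = pipe.edit()                    # non-interactive: writes self.parameters to the instance
success, msg = pipe.edit(interactive=True)    # opens the pipe's parameters as YAML in your editor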
def edit_definition(
    self,
    yes: bool = False,
    noask: bool = False,
    force: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit a pipe's definition file and update its configuration.
    **NOTE:** This function is interactive and should not be used in automated scripts!

    Returns
    -------
    A `SuccessTuple` of success, message.
    """
    if self.temporary:
        return False, "Cannot edit pipes created with `temporary=True` (read-only)."

    from meerschaum.connectors import instance_types
    if (self.connector is None) or self.connector.type not in instance_types:
        return self.edit(interactive=True, debug=debug, **kw)

    import json
    from meerschaum.utils.warnings import info, warn
    from meerschaum.utils.debug import dprint
    from meerschaum.config._patch import apply_patch_to_config
    from meerschaum.utils.misc import edit_file

    _parameters = self.parameters
    if 'fetch' not in _parameters:
        _parameters['fetch'] = {}

    def _edit_api():
        from meerschaum.utils.prompt import prompt, yes_no
        info(
            f"Please enter the keys of the source pipe from '{self.connector}'.\n" +
            "Type 'None' for None, or empty when there is no default. Press [CTRL+C] to skip."
        )

        _keys = {'connector_keys': None, 'metric_key': None, 'location_key': None}
        for k in _keys:
            _keys[k] = _parameters['fetch'].get(k, None)

        for k, v in _keys.items():
            try:
                _keys[k] = prompt(k.capitalize().replace('_', ' ') + ':', icon=True, default=v)
            except KeyboardInterrupt:
                continue
            if _keys[k] in ('', 'None', '\'None\'', '[None]'):
                _keys[k] = None

        _parameters['fetch'] = apply_patch_to_config(_parameters['fetch'], _keys)

        info("You may optionally specify additional filter parameters as JSON.")
        print(" Parameters are translated into a 'WHERE x AND y' clause, and lists are IN clauses.")
        print(" For example, the following JSON would correspond to 'WHERE x = 1 AND y IN (2, 3)':")
        print(json.dumps({'x': 1, 'y': [2, 3]}, indent=2, separators=(',', ': ')))
        if force or yes_no(
            "Would you like to add additional filter parameters?",
            yes=yes, noask=noask
        ):
            from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
            definition_filename = str(self) + '.json'
            definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename
            try:
                definition_path.touch()
                with open(definition_path, 'w+') as f:
                    json.dump(_parameters.get('fetch', {}).get('params', {}), f, indent=2)
            except Exception as e:
                return False, f"Failed writing file '{definition_path}':\n" + str(e)

            _params = None
            while True:
                edit_file(definition_path)
                try:
                    with open(definition_path, 'r') as f:
                        _params = json.load(f)
                except Exception as e:
                    warn(f'Failed to read parameters JSON:\n{e}', stack=False)
                    if force or yes_no(
                        "Would you like to try again?\n  "
                        + "If not, the parameters JSON file will be ignored.",
                        noask=noask, yes=yes
                    ):
                        continue
                    _params = None
                break
            if _params is not None:
                if 'fetch' not in _parameters:
                    _parameters['fetch'] = {}
                _parameters['fetch']['params'] = _params

        self.parameters = _parameters
        return True, "Success"

    def _edit_sql():
        import pathlib, os, textwrap
        from meerschaum.config._paths import PIPES_CACHE_RESOURCES_PATH
        from meerschaum.utils.misc import edit_file
        definition_filename = str(self) + '.sql'
        definition_path = PIPES_CACHE_RESOURCES_PATH / definition_filename

        sql_definition = _parameters['fetch'].get('definition', None)
        if sql_definition is None:
            sql_definition = ''
        sql_definition = textwrap.dedent(sql_definition).lstrip()

        try:
            definition_path.touch()
            with open(definition_path, 'w+') as f:
                f.write(sql_definition)
        except Exception as e:
            return False, f"Failed writing file '{definition_path}':\n" + str(e)

        edit_file(definition_path)
        try:
            with open(definition_path, 'r') as f:
                file_definition = f.read()
        except Exception as e:
            return False, f"Failed reading file '{definition_path}':\n" + str(e)

        if sql_definition == file_definition:
            return False, f"No changes made to definition for {self}."

        if ' ' not in file_definition:
            return False, f"Invalid SQL definition for {self}."

        if debug:
            dprint("Read SQL definition:\n\n" + file_definition)
        _parameters['fetch']['definition'] = file_definition
        self.parameters = _parameters
        return True, "Success"

    locals()['_edit_' + str(self.connector.type)]()
    return self.edit(interactive=False, debug=debug, **kw)
Edit a pipe's definition file and update its configuration. NOTE: This function is interactive and should not be used in automated scripts!
Returns
- A SuccessTuple of success, message.
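A sketch of the intended interactive use from a Python shell rather than a script (assuming the pipe's connector is a SQL instance connector, so the definition opens as a .sql file):

import meerschaum as mrsm

pipe = mrsm.Pipe('sql:temp', 'demo', instance='sql:temp')
success, msg = pipe.edit_definition()   # opens your editor, then persists the new definition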
def update(self, *args, **kw) -> SuccessTuple:
    """
    Update a pipe's parameters in its instance.
    """
    kw['interactive'] = False
    return self.edit(*args, **kw)
Update a pipe's parameters in its instance.
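update() is a thin, non-interactive wrapper around edit(). For example (hypothetical pipe keys):

import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:foo', 'demo', instance='sql:temp')
pipe.parameters['columns'] = {'datetime': 'ts', 'id': 'id'}
success, msg = pipe.update()   # equivalent to pipe.edit(interactive=False)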
def sync(
    self,
    df: Union[
        pd.DataFrame,
        Dict[str, List[Any]],
        List[Dict[str, Any]],
        InferFetch
    ] = InferFetch,
    begin: Union[datetime, int, str, None] = '',
    end: Union[datetime, int, None] = None,
    force: bool = False,
    retries: int = 10,
    min_seconds: int = 1,
    check_existing: bool = True,
    blocking: bool = True,
    workers: Optional[int] = None,
    callback: Optional[Callable[[Tuple[bool, str]], Any]] = None,
    error_callback: Optional[Callable[[Exception], Any]] = None,
    chunksize: Optional[int] = -1,
    sync_chunks: bool = True,
    debug: bool = False,
    _inplace: bool = True,
    **kw: Any
) -> SuccessTuple:
    """
    Fetch new data from the source and update the pipe's table with new data.

    Get new remote data via fetch, get existing data in the same time period,
    and merge the two, only keeping the unseen data.

    Parameters
    ----------
    df: Union[pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch], default InferFetch
        An optional DataFrame to sync into the pipe.
        If omitted, fetch new data from the pipe's connector.

    begin: Union[datetime, int, str, None], default ''
        Optionally specify the earliest datetime to search for data.

    end: Union[datetime, int, str, None], default None
        Optionally specify the latest datetime to search for data.

    force: bool, default False
        If `True`, keep trying to sync until `retries` attempts.

    retries: int, default 10
        If `force`, how many attempts to try syncing before declaring failure.

    min_seconds: Union[int, float], default 1
        If `force`, how many seconds to sleep between retries. Defaults to `1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe.

    blocking: bool, default True
        If `True`, wait for sync to finish and return its result, otherwise
        sync asynchronously and return success. Defaults to `True`.
        Only intended for specific scenarios.

    workers: Optional[int], default None
        If provided and the instance connector is thread-safe
        (`pipe.instance_connector.IS_THREAD_SAFE is True`),
        limit concurrent sync to this many threads.

    callback: Optional[Callable[[Tuple[bool, str]], Any]], default None
        Callback function which expects a SuccessTuple as input.
        Only applies when `blocking=False`.

    error_callback: Optional[Callable[[Exception], Any]], default None
        Callback function which expects an Exception as input.
        Only applies when `blocking=False`.

    chunksize: int, default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.

    sync_chunks: bool, default True
        If possible, sync chunks while fetching them into memory.

    debug: bool, default False
        Verbosity toggle. Defaults to False.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    from meerschaum.utils.debug import dprint, _checkpoint
    from meerschaum.connectors import custom_types
    from meerschaum.plugins import Plugin
    from meerschaum.utils.formatting import get_console
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.misc import df_is_chunk_generator, filter_keywords, filter_arguments
    from meerschaum.utils.pool import get_pool
    from meerschaum.config import get_config

    if (callback is not None or error_callback is not None) and blocking:
        warn("Callback functions are only executed when blocking = False. Ignoring...")

    _checkpoint(_total=2, **kw)

    if chunksize == 0:
        chunksize = None
        sync_chunks = False

    begin, end = self.parse_date_bounds(begin, end)
    kw.update({
        'begin': begin,
        'end': end,
        'force': force,
        'retries': retries,
        'min_seconds': min_seconds,
        'check_existing': check_existing,
        'blocking': blocking,
        'workers': workers,
        'callback': callback,
        'error_callback': error_callback,
        'sync_chunks': sync_chunks,
        'chunksize': chunksize,
    })

    ### NOTE: Invalidate `_exists` cache before and after syncing.
    self._exists = None

    def _sync(
        p: 'meerschaum.Pipe',
        df: Union[
            'pd.DataFrame',
            Dict[str, List[Any]],
            List[Dict[str, Any]],
            InferFetch
        ] = InferFetch,
    ) -> SuccessTuple:
        if df is None:
            p._exists = None
            return (
                False,
                f"You passed `None` instead of data into `sync()` for {p}.\n"
                + "Omit the DataFrame to infer fetching.",
            )
        ### Ensure that Pipe is registered.
        if not p.temporary and p.get_id(debug=debug) is None:
            ### NOTE: This may trigger an interactive session for plugins!
            register_success, register_msg = p.register(debug=debug)
            if not register_success:
                if 'already' not in register_msg:
                    p._exists = None
                    return register_success, register_msg

        ### If connector is a plugin with a `sync()` method, return that instead.
        ### If the plugin does not have a `sync()` method but does have a `fetch()` method,
        ### use that instead.
        ### NOTE: The DataFrame must be omitted for the plugin sync method to apply.
        ### If a DataFrame is provided, continue as expected.
        if hasattr(df, 'MRSM_INFER_FETCH'):
            try:
                if p.connector is None:
                    if ':' not in p.connector_keys:
                        return True, f"{p} does not support fetching; nothing to do."

                    msg = f"{p} does not have a valid connector."
                    if p.connector_keys.startswith('plugin:'):
                        msg += f"\n Perhaps {p.connector_keys} has a syntax error?"
                    p._exists = None
                    return False, msg
            except Exception:
                p._exists = None
                return False, f"Unable to create the connector for {p}."

            ### Sync in place if this is a SQL pipe.
            if (
                str(self.connector) == str(self.instance_connector)
                and
                hasattr(self.instance_connector, 'sync_pipe_inplace')
                and
                _inplace
                and
                get_config('system', 'experimental', 'inplace_sync')
            ):
                with Venv(get_connector_plugin(self.instance_connector)):
                    p._exists = None
                    _args, _kwargs = filter_arguments(
                        p.instance_connector.sync_pipe_inplace,
                        p,
                        debug=debug,
                        **kw
                    )
                    return self.instance_connector.sync_pipe_inplace(
                        *_args,
                        **_kwargs
                    )

            ### Activate and invoke `sync(pipe)` for plugin connectors with `sync` methods.
            try:
                if getattr(p.connector, 'sync', None) is not None:
                    with Venv(get_connector_plugin(p.connector), debug=debug):
                        _args, _kwargs = filter_arguments(
                            p.connector.sync,
                            p,
                            debug=debug,
                            **kw
                        )
                        return_tuple = p.connector.sync(*_args, **_kwargs)
                    p._exists = None
                    if not isinstance(return_tuple, tuple):
                        return_tuple = (
                            False,
                            f"Plugin '{p.connector.label}' returned non-tuple value: {return_tuple}"
                        )
                    return return_tuple

            except Exception as e:
                get_console().print_exception()
                msg = f"Failed to sync {p} with exception: '" + str(e) + "'"
                if debug:
                    error(msg, silent=False)
                p._exists = None
                return False, msg

            ### Fetch the dataframe from the connector's `fetch()` method.
            try:
                with Venv(get_connector_plugin(p.connector), debug=debug):
                    df = p.fetch(
                        **filter_keywords(
                            p.fetch,
                            debug=debug,
                            **kw
                        )
                    )
            except Exception as e:
                get_console().print_exception(
                    suppress=[
                        'meerschaum/core/Pipe/_sync.py',
                        'meerschaum/core/Pipe/_fetch.py',
                    ]
                )
                msg = f"Failed to fetch data from {p.connector}:\n {e}"
                df = None

            if df is None:
                p._exists = None
                return False, f"No data were fetched for {p}."

        if isinstance(df, list):
            if len(df) == 0:
                return True, f"No new rows were returned for {p}."

            ### May be a chunk hook results list.
            if isinstance(df[0], tuple):
                success = all([_success for _success, _ in df])
                message = '\n'.join([_message for _, _message in df])
                return success, message

        ### TODO: Deprecate async?
        if df is True:
            p._exists = None
            return True, f"{p} is being synced in parallel."

        ### CHECKPOINT: Retrieved the DataFrame.
        _checkpoint(**kw)

        ### Allow for dataframe generators or iterables.
        if df_is_chunk_generator(df):
            kw['workers'] = p.get_num_workers(kw.get('workers', None))
            dt_col = p.columns.get('datetime', None)
            pool = get_pool(workers=kw.get('workers', 1))
            if debug:
                dprint(f"Received {type(df)}. Attempting to sync first chunk...")

            try:
                chunk = next(df)
            except StopIteration:
                return True, "Received an empty generator; nothing to do."

            chunk_success, chunk_msg = _sync(p, chunk)
            chunk_msg = '\n' + self._get_chunk_label(chunk, dt_col) + '\n' + chunk_msg
            if not chunk_success:
                return chunk_success, f"Unable to sync initial chunk for {p}:\n{chunk_msg}"
            if debug:
                dprint("Successfully synced the first chunk, attempting the rest...")

            failed_chunks = []
            def _process_chunk(_chunk):
                try:
                    _chunk_success, _chunk_msg = _sync(p, _chunk)
                except Exception as e:
                    _chunk_success, _chunk_msg = False, str(e)
                if not _chunk_success:
                    failed_chunks.append(_chunk)
                return (
                    _chunk_success,
                    (
                        '\n'
                        + self._get_chunk_label(_chunk, dt_col)
                        + '\n'
                        + _chunk_msg
                    )
                )

            results = sorted(
                [(chunk_success, chunk_msg)] + (
                    list(pool.imap(_process_chunk, df))
                    if not df_is_chunk_generator(chunk)
                    else [
                        _process_chunk(_child_chunks)
                        for _child_chunks in df
                    ]
                )
            )
            chunk_messages = [chunk_msg for _, chunk_msg in results]
            success_bools = [chunk_success for chunk_success, _ in results]
            success = all(success_bools)
            msg = '\n'.join(chunk_messages)

            ### If some chunks succeeded, retry the failures.
            retry_success = True
            if not success and any(success_bools):
                if debug:
                    dprint("Retrying failed chunks...")
                chunks_to_retry = [c for c in failed_chunks]
                failed_chunks = []
                for chunk in chunks_to_retry:
                    chunk_success, chunk_msg = _process_chunk(chunk)
                    msg += f"\n\nRetried chunk:\n{chunk_msg}\n"
                    retry_success = retry_success and chunk_success

            success = success and retry_success
            return success, msg

        ### Cast to a dataframe and ensure datatypes are what we expect.
        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)

        ### Capture `numeric`, `uuid`, and `json` columns.
        self._persist_new_json_columns(df, debug=debug)
        self._persist_new_numeric_columns(df, debug=debug)
        self._persist_new_uuid_columns(df, debug=debug)

        if debug:
            dprint(
                "DataFrame to sync:\n"
                + (
                    str(df)[:255]
                    + '...'
                    if len(str(df)) >= 256
                    else str(df)
                ),
                **kw
            )

        ### if force, continue to sync until success
        return_tuple = False, f"Did not sync {p}."
        run = True
        _retries = 1
        while run:
            with Venv(get_connector_plugin(self.instance_connector)):
                return_tuple = p.instance_connector.sync_pipe(
                    pipe=p,
                    df=df,
                    debug=debug,
                    **kw
                )
            _retries += 1
            run = (not return_tuple[0]) and force and _retries <= retries
            if run and debug:
                dprint(f"Syncing failed for {p}. Attempt ( {_retries} / {retries} )", **kw)
                dprint(f"Sleeping for {min_seconds} seconds...", **kw)
                time.sleep(min_seconds)
            if _retries > retries:
                warn(
                    f"Unable to sync {p} within {retries} attempt" +
                    ("s" if retries != 1 else "") + "!"
                )

        ### CHECKPOINT: Finished syncing. Handle caching.
        _checkpoint(**kw)
        if self.cache_pipe is not None:
            if debug:
                dprint("Caching retrieved dataframe.", **kw)
            _sync_cache_tuple = self.cache_pipe.sync(df, debug=debug, **kw)
            if not _sync_cache_tuple[0]:
                warn(f"Failed to sync local cache for {self}.")

        self._exists = None
        return return_tuple

    if blocking:
        self._exists = None
        return _sync(self, df=df)

    from meerschaum.utils.threading import Thread
    def default_callback(result_tuple: SuccessTuple):
        dprint(f"Asynchronous result from {self}: {result_tuple}", **kw)

    def default_error_callback(x: Exception):
        dprint(f"Error received for {self}: {x}", **kw)

    if callback is None and debug:
        callback = default_callback
    if error_callback is None and debug:
        error_callback = default_error_callback
    try:
        thread = Thread(
            target=_sync,
            args=(self,),
            kwargs={'df': df},
            daemon=False,
            callback=callback,
            error_callback=error_callback,
        )
        thread.start()
    except Exception as e:
        self._exists = None
        return False, str(e)

    self._exists = None
    return True, f"Spawned asynchronous sync for {self}."
Fetch new data from the source and update the pipe's table with new data.
Get new remote data via fetch, get existing data in the same time period, and merge the two, only keeping the unseen data.
Parameters
- df (Union[pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]], InferFetch], default InferFetch): An optional DataFrame to sync into the pipe. If omitted, fetch new data from the pipe's connector.
- begin (Union[datetime, int, str, None], default ''): Optionally specify the earliest datetime to search for data.
- end (Union[datetime, int, str, None], default None): Optionally specify the latest datetime to search for data.
- force (bool, default False): If True, keep trying to sync until retries attempts.
- retries (int, default 10): If force, how many attempts to try syncing before declaring failure.
- min_seconds (Union[int, float], default 1): If force, how many seconds to sleep between retries. Defaults to 1.
- check_existing (bool, default True): If True, pull and diff with existing data from the pipe.
- blocking (bool, default True): If True, wait for sync to finish and return its result; otherwise sync asynchronously and return success. Defaults to True. Only intended for specific scenarios.
- workers (Optional[int], default None): If provided and the instance connector is thread-safe (pipe.instance_connector.IS_THREAD_SAFE is True), limit concurrent sync to this many threads.
- callback (Optional[Callable[[Tuple[bool, str]], Any]], default None): Callback function which expects a SuccessTuple as input. Only applies when blocking=False.
- error_callback (Optional[Callable[[Exception], Any]], default None): Callback function which expects an Exception as input. Only applies when blocking=False.
- chunksize (int, default -1): Specify the number of rows to sync per chunk. If -1, resort to system configuration (default is 900). A chunksize of None will sync all rows in one transaction.
- sync_chunks (bool, default True): If possible, sync chunks while fetching them into memory.
- debug (bool, default False): Verbosity toggle. Defaults to False.
Returns
- A SuccessTuple of success (bool) and message (str).
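For example, a minimal sketch of syncing explicit rows and then letting a later call fetch from the pipe's connector (the 'plugin:foo' connector is hypothetical and assumed to define fetch()):

from datetime import datetime
import meerschaum as mrsm

pipe = mrsm.Pipe(
    'plugin:foo', 'demo',
    instance='sql:temp',
    columns={'datetime': 'ts', 'id': 'id'},
)

# Sync explicit rows (a DataFrame or a dict of lists also works).
success, msg = pipe.sync(
    [
        {'ts': datetime(2024, 1, 1), 'id': 1, 'vl': 10},
        {'ts': datetime(2024, 1, 1), 'id': 2, 'vl': 20},
    ],
    chunksize=1000,
)

# Omit the DataFrame to fetch new data from the pipe's connector instead.
success, msg = pipe.sync(begin=datetime(2024, 1, 1), end=datetime(2024, 2, 1))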
def get_sync_time(
    self,
    params: Optional[Dict[str, Any]] = None,
    newest: bool = True,
    apply_backtrack_interval: bool = False,
    round_down: bool = False,
    debug: bool = False
) -> Union['datetime', int, None]:
    """
    Get the most recent datetime value for a Pipe.

    Parameters
    ----------
    params: Optional[Dict[str, Any]], default None
        Dictionary to build a WHERE clause for a specific column.
        See `meerschaum.utils.sql.build_where`.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (`ASC` instead of `DESC`).

    apply_backtrack_interval: bool, default False
        If `True`, subtract the backtrack interval from the sync time.

    round_down: bool, default False
        If `True`, round down the datetime value to the nearest minute.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `datetime` or int, if the pipe exists, otherwise `None`.
    """
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.utils.misc import round_time

    if not self.columns.get('datetime', None):
        return None

    with Venv(get_connector_plugin(self.instance_connector)):
        sync_time = self.instance_connector.get_sync_time(
            self,
            params=params,
            newest=newest,
            debug=debug,
        )

    if round_down and isinstance(sync_time, datetime):
        sync_time = round_time(sync_time, timedelta(minutes=1))

    if apply_backtrack_interval and sync_time is not None:
        backtrack_interval = self.get_backtrack_interval(debug=debug)
        try:
            sync_time -= backtrack_interval
        except Exception as e:
            warn(f"Failed to apply backtrack interval:\n{e}")

    return self.parse_date_bounds(sync_time)
Get the most recent datetime value for a Pipe.
Parameters
- params (Optional[Dict[str, Any]], default None): Dictionary to build a WHERE clause for a specific column. See meerschaum.utils.sql.build_where.
- newest (bool, default True): If True, get the most recent datetime (honoring params). If False, get the oldest datetime (ASC instead of DESC).
- apply_backtrack_interval (bool, default False): If True, subtract the backtrack interval from the sync time.
- round_down (bool, default False): If True, round down the datetime value to the nearest minute.
- debug (bool, default False): Verbosity toggle.
Returns
- A datetime or int, if the pipe exists, otherwise None.
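For example (a sketch; the pipe keys are hypothetical, and None is returned if no datetime column is configured):

import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:foo', 'demo', instance='sql:temp', columns={'datetime': 'ts'})
newest = pipe.get_sync_time()
oldest = pipe.get_sync_time(newest=False)
filtered = pipe.get_sync_time(params={'id': 1})
print(newest)
# 2024-01-01 00:00:00+00:00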
def exists(
    self,
    debug: bool = False
) -> bool:
    """
    See if a Pipe's table exists.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `bool` corresponding to whether a pipe's underlying table exists.
    """
    import time
    from meerschaum.utils.venv import Venv
    from meerschaum.connectors import get_connector_plugin
    from meerschaum.config import STATIC_CONFIG
    from meerschaum.utils.debug import dprint
    now = time.perf_counter()
    exists_timeout_seconds = STATIC_CONFIG['pipes']['exists_timeout_seconds']

    _exists = self.__dict__.get('_exists', None)
    if _exists:
        exists_timestamp = self.__dict__.get('_exists_timestamp', None)
        if exists_timestamp is not None:
            delta = now - exists_timestamp
            if delta < exists_timeout_seconds:
                if debug:
                    dprint(f"Returning cached `exists` for {self} ({round(delta, 2)} seconds old).")
                return _exists

    with Venv(get_connector_plugin(self.instance_connector)):
        _exists = (
            self.instance_connector.pipe_exists(pipe=self, debug=debug)
            if hasattr(self.instance_connector, 'pipe_exists')
            else False
        )

    self.__dict__['_exists'] = _exists
    self.__dict__['_exists_timestamp'] = now
    return _exists
See if a Pipe's table exists.
Parameters
- debug (bool, default False): Verbosity toggle.
Returns
- A bool corresponding to whether a pipe's underlying table exists.
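For example (hypothetical pipe keys):

import meerschaum as mrsm

pipe = mrsm.Pipe('plugin:foo', 'demo', instance='sql:temp')
print(pipe.exists())
# False
pipe.sync([{'ts': '2024-01-01', 'id': 1}])
print(pipe.exists())
# True

Note that the result is cached for a short window (exists_timeout_seconds), so back-to-back calls may return the cached value.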
def filter_existing(
    self,
    df: 'pd.DataFrame',
    safe_copy: bool = True,
    date_bound_only: bool = False,
    include_unchanged_columns: bool = False,
    enforce_dtypes: bool = False,
    chunksize: Optional[int] = -1,
    debug: bool = False,
    **kw
) -> Tuple['pd.DataFrame', 'pd.DataFrame', 'pd.DataFrame']:
    """
    Inspect a dataframe and filter out rows which already exist in the pipe.

    Parameters
    ----------
    df: 'pd.DataFrame'
        The dataframe to inspect and filter.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    date_bound_only: bool, default False
        If `True`, only use the datetime index to fetch the sample dataframe.

    include_unchanged_columns: bool, default False
        If `True`, include the backtrack columns which haven't changed in the update dataframe.
        This is useful if you can't update individual keys.

    enforce_dtypes: bool, default False
        If `True`, ensure the given and intermediate dataframes are enforced to the correct dtypes.
        Setting `enforce_dtypes=True` may impact performance.

    chunksize: Optional[int], default -1
        The `chunksize` used when fetching existing data.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of three pandas DataFrames: unseen, update, and delta.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.misc import round_time
    from meerschaum.utils.dataframe import (
        filter_unseen_df,
        add_missing_cols_to_df,
        get_unhashable_cols,
        get_numeric_cols,
    )
    from meerschaum.utils.dtypes import (
        to_pandas_dtype,
        none_if_null,
    )
    from meerschaum.config import get_config
    pd = import_pandas()
    pandas = attempt_import('pandas')
    if enforce_dtypes or 'dataframe' not in str(type(df)).lower():
        df = self.enforce_dtypes(df, chunksize=chunksize, debug=debug)
    is_dask = hasattr(df, '__module__') and 'dask' in df.__module__
    if is_dask:
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA