Module meerschaum.connectors
Create connectors with get_connector().
For ease of use, you can also import from the root meerschaum
module:
>>> from meerschaum import get_connector
>>> conn = get_connector()
Expand source code
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
"""
Create connectors with `meerschaum.connectors.get_connector()`.
For ease of use, you can also import from the root `meerschaum` module:
```
>>> from meerschaum import get_connector
>>> conn = get_connector()
```
"""
from __future__ import annotations
from meerschaum.utils.typing import Any, SuccessTuple, Union, Optional, List, Dict
from meerschaum.utils.threading import Lock, RLock
from meerschaum.utils.warnings import error, warn
from meerschaum.connectors.Connector import Connector, InvalidAttributesError
from meerschaum.connectors.sql.SQLConnector import SQLConnector
from meerschaum.connectors.api.APIConnector import APIConnector
from meerschaum.connectors.sql._create_engine import flavor_configs as sql_flavor_configs
__all__ = ("Connector", "SQLConnector", "APIConnector", "get_connector", "is_connected")
### Store connectors partitioned by
### type and label for reuse.
connectors: Dict[str, Dict[str, Connector]] = {
    'api'   : {},
    'sql'   : {},
    'plugin': {},
}
### Connector types which may serve as Meerschaum instances.
instance_types: List[str] = ['sql', 'api']
### Locks guarding concurrent mutation of the module-level registries above.
_locks: Dict[str, RLock] = {
    'connectors'               : RLock(),
    'types'                    : RLock(),
    'custom_types'             : RLock(),
    '_loaded_plugin_connectors': RLock(),
    'instance_types'           : RLock(),
}
### Required and default attributes per connector type
### (consulted when validating / constructing connectors).
attributes: Dict[str, Dict[str, Any]] = {
    'api': {
        'required': [
            'host',
            'username',
            'password'
        ],
        'default': {
            'protocol': 'http',
            'port'    : 8000,
        },
    },
    'sql': {
        'flavors': sql_flavor_configs,
    },
}
### Fill this with objects only when connectors are first referenced.
types: Dict[str, Any] = {}
### Connector types registered at runtime via `make_connector` (e.g. from plugins).
custom_types: set = set()
### Whether plugins defining custom connectors have been imported yet.
_loaded_plugin_connectors: bool = False
def get_connector(
    type: Optional[str] = None,
    label: Optional[str] = None,
    refresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Connector:
    """
    Return existing connector or create new connection and store for reuse.
    You can create new connectors if enough parameters are provided for the given type and flavor.

    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct new object. Defaults to `False`.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`).

    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```
    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    ### Allow `get_connector('sql:main')` shorthand.
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)
    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _loaded_plugin_connectors = True
    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)
    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"
        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    ### Lazily populate the built-in connector classes on first use.
    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        with _locks['types']:
            types.update({
                'api'   : APIConnector,
                'sql'   : SQLConnector,
                'plugin': PluginConnector,
            })

    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    import inspect
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector " +
                            f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n" +
                        f"  - Keyword value: '{value}'\n" +
                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else: ### connector doesn't yet exist
            refresh = True

    ### Seed `conn` from the registry so the no-refresh path below cannot hit
    ### an unbound local (previously `conn` was only assigned inside `if refresh:`).
    conn = connectors[type].get(label, None)

    ### only create an object if refresh is True
    ### (can be manually specified, otherwise determined above)
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack = False,
                )
                conn = None
            except Exception as e:
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack = False,
                )
                conn = None
    if conn is None:
        return None

    return connectors[type][label]
def is_connected(keys: str, **kw) -> bool:
    """
    Check if the connector keys correspond to an active connection.
    If the connector has not been created, it will immediately return `False`.
    If the connector exists but cannot communicate with the source, return `False`.

    **NOTE:** Only works with instance connectors (`SQLConnectors` and `APIConnectors`).
    Keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Parameters
    ----------
    keys:
        The keys to the connector (e.g. `'sql:main'`).

    Returns
    -------
    A `bool` corresponding to whether a successful connection may be made.
    """
    import warnings
    if ':' not in keys:
        warn(f"Invalid connector keys '{keys}'")
        ### Previously fell through and relied on the unpack below raising;
        ### return explicitly instead (same observable result).
        return False
    try:
        typ, label = keys.split(':')
    except Exception:
        return False
    if typ not in instance_types:
        return False
    if label not in connectors.get(typ, {}):
        return False

    from meerschaum.connectors.parse import parse_instance_keys
    conn = parse_instance_keys(keys)
    try:
        ### Suppress connection warnings; only the boolean outcome matters here.
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            return conn.test_connection(**kw)
    except Exception:
        return False
def make_connector(
    cls,
):
    """
    Register a class as a `Connector`.
    The `type` will be the lower case of the class name, without the suffix `connector`.

    Parameters
    ----------
    cls:
        The `Connector` subclass to register.
        If the class defines a truthy `IS_INSTANCE` attribute, its type is also
        registered as an instance connector type.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.connectors import make_connector, Connector
    >>> class FooConnector(Connector):
    ...     def __init__(self, label: str, **kw):
    ...         super().__init__('foo', label, **kw)
    ...
    >>> make_connector(FooConnector)
    >>> mrsm.get_connector('foo', 'bar')
    foo:bar
    >>>
    """
    import re
    ### e.g. class `FooConnector` registers type 'foo'.
    typ = re.sub(r'connector$', '', cls.__name__.lower())
    with _locks['types']:
        types[typ] = cls
    with _locks['custom_types']:
        custom_types.add(typ)
    with _locks['connectors']:
        if typ not in connectors:
            connectors[typ] = {}
    ### Instance connectors implement the pipes methods and may serve as instances.
    if getattr(cls, 'IS_INSTANCE', False):
        with _locks['instance_types']:
            if typ not in instance_types:
                instance_types.append(typ)

    return cls
def load_plugin_connectors():
    """
    Import the modules of any installed plugins whose source code
    references the `make_connector` decorator.
    """
    from meerschaum.plugins import get_plugins, import_plugins
    names_to_load = []
    for plugin in get_plugins():
        ### Cheap textual scan: only import plugins that mention the decorator.
        with open(plugin.__file__, encoding='utf-8') as src_file:
            source_text = src_file.read()
        if 'make_connector' in source_text:
            names_to_load.append(plugin.name)
    if names_to_load:
        import_plugins(*names_to_load)
def get_connector_plugin(
    connector: Connector,
) -> Union[str, None, 'meerschaum.Plugin']:
    """
    Determine the plugin for a connector.
    This is useful for handling virtual environments for custom instance connectors.

    Parameters
    ----------
    connector: Connector
        The connector which may require a virtual environment.

    Returns
    -------
    A Plugin, 'mrsm', or None.
    """
    if not hasattr(connector, 'type'):
        return None
    from meerschaum import Plugin
    if connector.type in custom_types:
        ### Custom connectors are defined inside a plugin's module namespace.
        plugin_name = connector.__module__.replace('plugins.', '').split('.')[0]
    elif connector.type == 'plugin':
        plugin_name = connector.label
    else:
        plugin_name = 'mrsm'
    candidate = Plugin(plugin_name)
    if not candidate.is_installed():
        return None
    return candidate
Sub-modules
meerschaum.connectors.api
-
Interact with the Meerschaum API (send commands, pull data, etc.)
meerschaum.connectors.parse
-
Utility functions for parsing connector keys.
meerschaum.connectors.plugin
-
Allow pipes to source data from installed plugins.
meerschaum.connectors.poll
-
Poll database and API connections.
meerschaum.connectors.sql
-
Subpackage for the SQLConnector subclass.
Functions
def get_connector(type: str = None, label: str = None, refresh: bool = False, debug: bool = False, **kw: Any) ‑> meerschaum.connectors.Connector.Connector
-
Return existing connector or create new connection and store for reuse.
You can create new connectors if enough parameters are provided for the given type and flavor.
Parameters
type
:Optional[str]
, defaultNone
- Connector type (sql, api, etc.).
Defaults to the type of the configured
instance_connector
. label
:Optional[str]
, defaultNone
- Connector label (e.g. main). Defaults to
'main'
. refresh
:bool
, defaultFalse
- Refresh the Connector instance / construct new object. Defaults to
False
. kw
:Any
- Other arguments to pass to the Connector constructor.
If the Connector has already been constructed and new arguments are provided,
refresh
is set toTrue
and the old Connector is replaced.
Returns
A new Meerschaum connector (e.g.
meerschaum.connectors.api.APIConnector
,meerschaum.connectors.sql.SQLConnector
).Examples
The following parameters would create a new
meerschaum.connectors.sql.SQLConnector
that isn't in the configuration file.>>> conn = get_connector( ... type = 'sql', ... label = 'newlabel', ... flavor = 'sqlite', ... database = '/file/path/to/database.db' ... ) >>>
Expand source code
def get_connector( type: str = None, label: str = None, refresh: bool = False, debug: bool = False, **kw: Any ) -> Connector: """ Return existing connector or create new connection and store for reuse. You can create new connectors if enough parameters are provided for the given type and flavor. Parameters ---------- type: Optional[str], default None Connector type (sql, api, etc.). Defaults to the type of the configured `instance_connector`. label: Optional[str], default None Connector label (e.g. main). Defaults to `'main'`. refresh: bool, default False Refresh the Connector instance / construct new object. Defaults to `False`. kw: Any Other arguments to pass to the Connector constructor. If the Connector has already been constructed and new arguments are provided, `refresh` is set to `True` and the old Connector is replaced. Returns ------- A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`, `meerschaum.connectors.sql.SQLConnector`). Examples -------- The following parameters would create a new `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file. ``` >>> conn = get_connector( ... type = 'sql', ... label = 'newlabel', ... flavor = 'sqlite', ... database = '/file/path/to/database.db' ... ) >>> ``` """ from meerschaum.connectors.parse import parse_instance_keys from meerschaum.config import get_config from meerschaum.config.static import STATIC_CONFIG from meerschaum.utils.warnings import warn global _loaded_plugin_connectors if isinstance(type, str) and not label and ':' in type: type, label = type.split(':', maxsplit=1) with _locks['_loaded_plugin_connectors']: if not _loaded_plugin_connectors: load_plugin_connectors() _loaded_plugin_connectors = True if type is None and label is None: default_instance_keys = get_config('meerschaum', 'instance', patch=True) ### recursive call to get_connector return parse_instance_keys(default_instance_keys) ### NOTE: the default instance connector may not be main. 
### Only fall back to 'main' if the type is provided by the label is omitted. label = label if label is not None else STATIC_CONFIG['connectors']['default_label'] ### type might actually be a label. Check if so and raise a warning. if type not in connectors: possibilities, poss_msg = [], "" for _type in get_config('meerschaum', 'connectors'): if type in get_config('meerschaum', 'connectors', _type): possibilities.append(f"{_type}:{type}") if len(possibilities) > 0: poss_msg = " Did you mean" for poss in possibilities[:-1]: poss_msg += f" '{poss}'," if poss_msg.endswith(','): poss_msg = poss_msg[:-1] if len(possibilities) > 1: poss_msg += " or" poss_msg += f" '{possibilities[-1]}'?" warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False) return None if 'sql' not in types: from meerschaum.connectors.plugin import PluginConnector with _locks['types']: types.update({ 'api' : APIConnector, 'sql' : SQLConnector, 'plugin': PluginConnector, }) ### determine if we need to call the constructor if not refresh: ### see if any user-supplied arguments differ from the existing instance if label in connectors[type]: warning_message = None for attribute, value in kw.items(): if attribute not in connectors[type][label].meta: import inspect cls = connectors[type][label].__class__ cls_init_signature = inspect.signature(cls) cls_init_params = cls_init_signature.parameters if attribute not in cls_init_params: warning_message = ( f"Received new attribute '{attribute}' not present in connector " + f"{connectors[type][label]}.\n" ) elif connectors[type][label].__dict__[attribute] != value: warning_message = ( f"Mismatched values for attribute '{attribute}' in connector " + f"'{connectors[type][label]}'.\n" + f" - Keyword value: '{value}'\n" + f" - Existing value: '{connectors[type][label].__dict__[attribute]}'\n" ) if warning_message is not None: warning_message += ( "\nSetting `refresh` to True and recreating connector with type:" + f" '{type}' and label '{label}'." 
) refresh = True warn(warning_message) else: ### connector doesn't yet exist refresh = True ### only create an object if refresh is True ### (can be manually specified, otherwise determined above) if refresh: with _locks['connectors']: try: ### will raise an error if configuration is incorrect / missing conn = types[type](label=label, **kw) connectors[type][label] = conn except InvalidAttributesError as ie: warn( f"Incorrect attributes for connector '{type}:{label}'.\n" + str(ie), stack = False, ) conn = None except Exception as e: from meerschaum.utils.formatting import get_console console = get_console() if console: console.print_exception() warn( f"Exception when creating connector '{type}:{label}'.\n" + str(e), stack = False, ) conn = None if conn is None: return None return connectors[type][label]
def is_connected(keys: str, **kw) ‑> bool
-
Check if the connector keys correspond to an active connection. If the connector has not been created, it will immediately return
False
. If the connector exists but cannot communicate with the source, returnFalse
.NOTE: Only works with instance connectors (
SQLConnectors
andAPIConnectors
). Keyword arguments are passed toretry_connect()
.Parameters
keys: The keys to the connector (e.g.
'sql:main'
).Returns
A
bool
corresponding to whether a successful connection may be made.Expand source code
def is_connected(keys: str, **kw) -> bool: """ Check if the connector keys correspond to an active connection. If the connector has not been created, it will immediately return `False`. If the connector exists but cannot communicate with the source, return `False`. **NOTE:** Only works with instance connectors (`SQLConnectors` and `APIConnectors`). Keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`. Parameters ---------- keys: The keys to the connector (e.g. `'sql:main'`). Returns ------- A `bool` corresponding to whether a successful connection may be made. """ import warnings if ':' not in keys: warn(f"Invalid connector keys '{keys}'") try: typ, label = keys.split(':') except Exception as e: return False if typ not in instance_types: return False if not (label in connectors.get(typ, {})): return False from meerschaum.connectors.parse import parse_instance_keys conn = parse_instance_keys(keys) try: with warnings.catch_warnings(): warnings.filterwarnings('ignore') return conn.test_connection(**kw) except Exception as e: return False
Classes
class APIConnector (label: Optional[str] = None, wait: bool = False, debug: bool = False, **kw)
-
Connect to a Meerschaum API instance.
Parameters
type
:str
- The
type
of the connector (e.g.meerschaum.connectors.sql
,meerschaum.connectors.api
,meerschaum.connectors.plugin
). label
:str
- The
label
for the connector.
Run
mrsm edit config
and to edit connectors in the YAML file:meerschaum: connections: {type}: {label}: ### attributes go here
Expand source code
class APIConnector(Connector): """ Connect to a Meerschaum API instance. """ IS_INSTANCE: bool = True IS_THREAD_SAFE: bool = False from ._delete import delete from ._post import post from ._patch import patch from ._get import get, wget from ._actions import get_actions, do_action from ._misc import get_mrsm_version, get_chaining_status from ._pipes import ( register_pipe, fetch_pipes_keys, edit_pipe, sync_pipe, delete_pipe, get_pipe_data, get_backtrack_data, get_pipe_id, get_pipe_attributes, get_sync_time, pipe_exists, create_metadata, get_pipe_rowcount, drop_pipe, clear_pipe, get_pipe_columns_types, ) from ._fetch import fetch from ._plugins import ( register_plugin, install_plugin, delete_plugin, get_plugins, get_plugin_attributes, ) from ._login import login, test_connection from ._users import ( register_user, get_user_id, get_users, edit_user, delete_user, get_user_password_hash, get_user_type, get_user_attributes, ) from ._uri import from_uri def __init__( self, label: Optional[str] = None, wait: bool = False, debug: bool = False, **kw ): if 'uri' in kw: from_uri_params = self.from_uri(kw['uri'], as_dict=True) label = label or from_uri_params.get('label', None) from_uri_params.pop('label', None) kw.update(from_uri_params) super().__init__('api', label=label, **kw) if 'protocol' not in self.__dict__: self.protocol = 'http' if 'port' not in self.__dict__: self.port = 8000 if 'uri' not in self.__dict__: self.verify_attributes(required_attributes) else: from meerschaum.connectors.sql import SQLConnector conn_attrs = SQLConnector.parse_uri(self.__dict__['uri']) if 'host' not in conn_attrs: raise Exception(f"Invalid URI for '{self}'.") self.__dict__.update(conn_attrs) self.url = ( self.protocol + '://' + self.host + ':' + str(self.port) ) self._token = None self._expires = None self._session = None @property def URI(self) -> str: """ Return the fully qualified URI. 
""" username = self.__dict__.get('username', None) password = self.__dict__.get('password', None) creds = (username + ':' + password + '@') if username and password else '' return ( self.protocol + '://' + creds + self.host + ':' + str(self.port) ) @property def session(self): if self._session is None: requests = attempt_import('requests') if requests: self._session = requests.Session() if self._session is None: error(f"Failed to import requests. Is requests installed?") return self._session @property def token(self): expired = ( True if self._expires is None else ( (self._expires < datetime.datetime.utcnow() + datetime.timedelta(minutes=1)) ) ) if self._token is None or expired: success, msg = self.login() if not success: warn(msg, stack=False) return self._token
Ancestors
- meerschaum.connectors.Connector.Connector
Class variables
var IS_INSTANCE : bool
var IS_THREAD_SAFE : bool
Static methods
def from_uri(uri: str, label: Optional[str] = None, as_dict: bool = False) ‑> Union[APIConnector, Dict[str, Union[str, int]]]
-
Create a new APIConnector from a URI string.
Parameters
uri
:str
- The URI connection string.
label
:Optional[str]
, defaultNone
- If provided, use this as the connector label. Otherwise use the determined database name.
as_dict
:bool
, defaultFalse
- If
True
, return a dictionary of the keyword arguments necessary to create a newAPIConnector
, otherwise create a new object.
Returns
A new APIConnector object or a dictionary of attributes (if
as_dict
isTrue
).Expand source code
@classmethod def from_uri( cls, uri: str, label: Optional[str] = None, as_dict: bool = False, ) -> Union[ 'meerschaum.connectors.APIConnector', Dict[str, Union[str, int]], ]: """ Create a new APIConnector from a URI string. Parameters ---------- uri: str The URI connection string. label: Optional[str], default None If provided, use this as the connector label. Otherwise use the determined database name. as_dict: bool, default False If `True`, return a dictionary of the keyword arguments necessary to create a new `APIConnector`, otherwise create a new object. Returns ------- A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`). """ from meerschaum.connectors.sql import SQLConnector params = SQLConnector.parse_uri(uri) if 'host' not in params: error("No host was found in the provided URI.") params['protocol'] = params.pop('flavor') params['label'] = label or ( ( (params['username'] + '@' if 'username' in params else '') + params['host'] ).lower() ) return cls(**params) if not as_dict else params
Instance variables
var URI : str
-
Return the fully qualified URI.
Expand source code
@property def URI(self) -> str: """ Return the fully qualified URI. """ username = self.__dict__.get('username', None) password = self.__dict__.get('password', None) creds = (username + ':' + password + '@') if username and password else '' return ( self.protocol + '://' + creds + self.host + ':' + str(self.port) )
var session
-
Expand source code
@property def session(self): if self._session is None: requests = attempt_import('requests') if requests: self._session = requests.Session() if self._session is None: error(f"Failed to import requests. Is requests installed?") return self._session
var token
-
Expand source code
@property def token(self): expired = ( True if self._expires is None else ( (self._expires < datetime.datetime.utcnow() + datetime.timedelta(minutes=1)) ) ) if self._token is None or expired: success, msg = self.login() if not success: warn(msg, stack=False) return self._token
Methods
def clear_pipe(self, pipe: Pipe, debug: bool = False, **kw) ‑> SuccessTuple
-
Delete rows in a pipe's table.
Parameters
pipe
:Pipe
- The pipe with rows to be deleted.
Returns
A success tuple.
Expand source code
def clear_pipe( self, pipe: meerschaum.Pipe, debug: bool = False, **kw ) -> SuccessTuple: """ Delete rows in a pipe's table. Parameters ---------- pipe: meerschaum.Pipe The pipe with rows to be deleted. Returns ------- A success tuple. """ kw.pop('metric_keys', None) kw.pop('connector_keys', None) kw.pop('location_keys', None) kw.pop('action', None) kw.pop('force', None) return self.do_action( ['clear', 'pipes'], connector_keys = pipe.connector_keys, metric_keys = pipe.metric_key, location_keys = pipe.location_key, force = True, debug = debug, **kw )
def create_metadata(self, debug: bool = False) ‑> bool
-
Create metadata tables.
Returns
A bool indicating success.
Expand source code
def create_metadata( self, debug: bool = False ) -> bool: """Create metadata tables. Returns ------- A bool indicating success. """ from meerschaum.utils.debug import dprint from meerschaum.config.static import STATIC_CONFIG import json r_url = STATIC_CONFIG['api']['endpoints']['metadata'] response = self.post(r_url, debug=debug) if debug: dprint("Create metadata response: {response.text}") try: metadata_response = json.loads(response.text) except Exception as e: metadata_response = False return False
def delete(self, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response
-
Wrapper for
requests.delete
.Expand source code
def delete( self, r_url : str, headers : Optional[Dict[str, Any]] = None, use_token : bool = True, debug : bool = False, **kw : Ahy, ) -> requests.Response: """Wrapper for `requests.delete`.""" if debug: from meerschaum.utils.debug import dprint if headers is None: headers = {} if use_token: if debug: dprint(f"Checking token...") headers.update({ 'Authorization': f'Bearer {self.token}' }) if debug: from meerschaum.utils.formatting import pprint dprint(f"Sending DELETE request to {self.url + r_url}.") return self.session.delete( self.url + r_url, headers = headers, **kw )
def delete_pipe(self, pipe: Optional[Pipe] = None, debug: bool = None) ‑> SuccessTuple
-
Delete a Pipe and drop its table.
Expand source code
def delete_pipe( self, pipe: Optional[meerschaum.Pipe] = None, debug: bool = None, ) -> SuccessTuple: """Delete a Pipe and drop its table.""" from meerschaum.utils.warnings import error from meerschaum.utils.debug import dprint if pipe is None: error(f"Pipe cannot be None.") r_url = pipe_r_url(pipe) response = self.delete( r_url + '/delete', debug = debug, ) if debug: dprint(response.text) if isinstance(response.json(), list): response_tuple = response.__bool__(), response.json()[1] elif 'detail' in response.json(): response_tuple = response.__bool__(), response.json()['detail'] else: response_tuple = response.__bool__(), response.text return response_tuple
def delete_plugin(self, plugin: meerschaum.core.Plugin, debug: bool = False) ‑> SuccessTuple
-
Delete a plugin from an API repository.
Expand source code
def delete_plugin( self, plugin: meerschaum.core.Plugin, debug: bool = False ) -> SuccessTuple: """Delete a plugin from an API repository.""" import json r_url = plugin_r_url(plugin) try: response = self.delete(r_url, debug=debug) except Exception as e: return False, f"Failed to delete plugin '{plugin}'." try: success, msg = json.loads(response.text) except Exception as e: return False, response.text return success, msg
def delete_user(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Delete a user.
Expand source code
def delete_user( self, user: 'meerschaum.core.User', debug: bool = False, **kw: Any ) -> SuccessTuple: """Delete a user.""" from meerschaum.config.static import _static_config import json r_url = f"{_static_config()['api']['endpoints']['users']}/{user.username}" response = self.delete(r_url, debug=debug) try: _json = json.loads(response.text) if isinstance(_json, dict) and 'detail' in _json: return False, _json['detail'] success_tuple = tuple(_json) except Exception as e: success_tuple = False, f"Failed to delete user '{user.username}'." return success_tuple
def do_action(self, action: Optional[List[str]] = None, sysargs: Optional[List[str]] = None, debug: bool = False, **kw) ‑> Tuple[bool, str]
-
Execute a Meerschaum action remotely.
If sysargs is provided, parse those instead. Otherwise infer everything from keyword arguments.
NOTE: The first index of
action
should NOT be removed! Example: action = ['show', 'config']Returns: tuple (succeeded : bool, message : str)
Parameters
action
:Optional[List[str]] :
- (Default value = None)
sysargs
:Optional[List[str]] :
- (Default value = None)
debug
:bool :
- (Default value = False)
**kw :
Returns
Expand source code
def do_action( self, action: Optional[List[str]] = None, sysargs: Optional[List[str]] = None, debug: bool = False, **kw ) -> SuccessTuple: """Execute a Meerschaum action remotely. If sysargs is provided, parse those instead. Otherwise infer everything from keyword arguments. NOTE: The first index of `action` should NOT be removed! Example: action = ['show', 'config'] Returns: tuple (succeeded : bool, message : str) Parameters ---------- action: Optional[List[str]] : (Default value = None) sysargs: Optional[List[str]] : (Default value = None) debug: bool : (Default value = False) **kw : Returns ------- """ import sys, json from meerschaum.utils.debug import dprint from meerschaum.config.static import _static_config from meerschaum.utils.misc import json_serialize_datetime if action is None: action = [] if sysargs is not None and action and action[0] == '': from meerschaum._internal.arguments import parse_arguments if debug: dprint(f"Parsing sysargs:\n{sysargs}") json_dict = parse_arguments(sysargs) else: json_dict = kw json_dict['action'] = action json_dict['debug'] = debug root_action = json_dict['action'][0] del json_dict['action'][0] r_url = f"{_static_config()['api']['endpoints']['actions']}/{root_action}" if debug: from meerschaum.utils.formatting import pprint dprint(f"Sending data to '{self.url + r_url}':") pprint(json_dict, stream=sys.stderr) response = self.post( r_url, data = json.dumps(json_dict, default=json_serialize_datetime), debug = debug, ) try: response_list = json.loads(response.text) if isinstance(response_list, dict) and 'detail' in response_list: return False, response_list['detail'] except Exception as e: print(f"Invalid response: {response}") print(e) return False, response.text if debug: dprint(response) try: return response_list[0], response_list[1] except Exception as e: return False, f"Failed to parse result from action '{root_action}'"
def drop_pipe(self, pipe: Pipe, debug: bool = False) ‑> SuccessTuple
-
Drop a pipe's table but maintain its registration.
Parameters
pipe
:meerschaum.Pipe:
- The pipe to be dropped.
Returns
A success tuple (bool, str).
Expand source code
def drop_pipe( self, pipe: meerschaum.Pipe, debug: bool = False ) -> SuccessTuple: """Drop a pipe's table but maintain its registration. Parameters ---------- pipe: meerschaum.Pipe: The pipe to be dropped. Returns ------- A success tuple (bool, str). """ return self.do_action( ['drop', 'pipes'], connector_keys = pipe.connector_keys, metric_keys = pipe.metric_key, location_keys = pipe.location_key, force = True, debug = debug )
def edit_pipe(self, pipe: Pipe, patch: bool = False, debug: bool = False) ‑> SuccessTuple
-
Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, response_dict).
Expand source code
def edit_pipe( self, pipe: meerschaum.Pipe, patch: bool = False, debug: bool = False, ) -> SuccessTuple: """Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, response_dict). """ from meerschaum.utils.debug import dprint ### NOTE: if `parameters` is supplied in the Pipe constructor, ### then `pipe.parameters` will exist and not be fetched from the database. r_url = pipe_r_url(pipe) response = self.patch( r_url + '/edit', params = {'patch': patch,}, json = pipe.parameters, debug = debug, ) if debug: dprint(response.text) if isinstance(response.json(), list): response_tuple = response.__bool__(), response.json()[1] elif 'detail' in response.json(): response_tuple = response.__bool__(), response.json()['detail'] else: response_tuple = response.__bool__(), response.text return response_tuple
def edit_user(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Edit an existing user.
Expand source code
def edit_user(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Edit an existing user.

    Parameters
    ----------
    user: 'meerschaum.core.User'
        The user whose username, password, type, email, and attributes
        are posted to the `/users/edit` endpoint.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A success tuple (bool, str).
    """
    import json
    from meerschaum.config.static import STATIC_CONFIG
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/edit"
    data = {
        'username': user.username,
        'password': user.password,
        'type': user.type,
        'email': user.email,
        'attributes': json.dumps(user.attributes),
    }
    response = self.post(r_url, data=data, debug=debug)
    try:
        _json = json.loads(response.text)
        # A dict with 'detail' is the server's error payload.
        if isinstance(_json, dict) and 'detail' in _json:
            return False, _json['detail']
        success_tuple = tuple(_json)
    except Exception as e:
        msg = response.text if response else f"Failed to edit user '{user}'."
        return False, msg
    # `success_tuple` is already a tuple; the extra tuple() is a harmless no-op.
    return tuple(success_tuple)
def fetch(self, pipe: Pipe, begin: Optional[datetime.datetime, str] = '', end: Optional[datetime.datetime] = None, params: Optional[Dict, Any] = None, debug: bool = False, **kw: Any) ‑> pandas.DataFrame
-
Get the Pipe data from the remote Pipe.
Expand source code
def fetch(
    self,
    pipe: meerschaum.Pipe,
    begin: Union[datetime.datetime, str, None] = '',
    end: Optional[datetime.datetime] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kw: Any
) -> pandas.DataFrame:
    """
    Get the Pipe data from the remote Pipe.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The local pipe whose `fetch` parameters describe the remote pipe.
    begin: Union[datetime.datetime, str, None], default ''
        Beginning of the fetch interval; an empty string means
        "use the local pipe's sync time".
    end: Optional[datetime.datetime], default None
        End of the fetch interval.
    params: Optional[Dict[str, Any]], default None
        Extra filter parameters, patched with the fetch instructions' params.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A DataFrame of the remote pipe's data, or `None` if the fetch
    parameters are missing.
    """
    ### BUGFIX: `copy` was referenced below without being imported in scope.
    import copy
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config.static import _static_config
    from meerschaum.config._patch import apply_patch_to_config
    if 'fetch' not in pipe.parameters:
        warn(f"Missing 'fetch' parameters for Pipe '{pipe}'.", stack=False)
        return None
    instructions = pipe.parameters['fetch']
    if 'connector_keys' not in instructions:
        warn(f"Missing connector_keys in fetch parameters for Pipe '{pipe}'", stack=False)
        return None
    remote_connector_keys = instructions.get('connector_keys', None)
    if 'metric_key' not in instructions:
        warn(f"Missing metric_key in fetch parameters for Pipe '{pipe}'", stack=False)
        return None
    remote_metric_key = instructions.get('metric_key', None)
    remote_location_key = instructions.get('location_key', None)
    if begin is None:
        begin = pipe.sync_time
    ### Merge the caller's params with the fetch instructions' params
    ### (instructions take precedence); deepcopy avoids mutating the caller's dict.
    _params = copy.deepcopy(params) if params is not None else {}
    _params = apply_patch_to_config(_params, instructions.get('params', {}))
    from meerschaum import Pipe
    p = Pipe(
        remote_connector_keys,
        remote_metric_key,
        remote_location_key,
        mrsm_instance = self
    )
    ### An empty-string `begin` is the sentinel for "resume from the sync time".
    begin = (
        begin if not (isinstance(begin, str) and begin == '')
        else pipe.get_sync_time(debug=debug)
    )
    return p.get_data(
        begin=begin, end=end,
        params=_params,
        debug=debug
    )
def fetch_pipes_keys(self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) ‑> List[Tuple[str, str, Optional[str]]]
-
Fetch registered Pipes' keys from the API.
Parameters
connector_keys
:Optional[List[str]]
, defaultNone
- The connector keys for the query.
metric_keys
:Optional[List[str]]
, defaultNone
- The metric keys for the query.
location_keys
:Optional[List[str]]
, defaultNone
- The location keys for the query.
tags
:Optional[List[str]]
, defaultNone
- A list of tags for the query.
params
:Optional[Dict[str, Any]]
, defaultNone
- A parameters dictionary for filtering against the
pipes
table (e.g.{'connector_keys': 'plugin:foo'}
). Not recommended to be used. debug
:bool
, defaultFalse
- Verbosity toggle.
Returns
A list of tuples containing pipes' keys.
Expand source code
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> Union[List[Tuple[str, str, Union[str, None]]]]:
    """
    Fetch registered Pipes' keys from the API.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        The connector keys for the query.
    metric_keys: Optional[List[str]], default None
        The metric keys for the query.
    location_keys: Optional[List[str]], default None
        The location keys for the query.
    tags: Optional[List[str]], default None
        A list of tags for the query.
    params: Optional[Dict[str, Any]], default None
        A parameters dictionary for filtering against the `pipes` table
        (e.g. `{'connector_keys': 'plugin:foo'}`).
        Not recommended to be used.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of tuples containing pipes' keys.
    """
    from meerschaum.utils.warnings import error
    from meerschaum.config.static import STATIC_CONFIG
    import json
    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    if tags is None:
        tags = []
    r_url = STATIC_CONFIG['api']['endpoints']['pipes'] + '/keys'
    # Each key list is JSON-encoded into the query string.
    try:
        j = self.get(
            r_url,
            params = {
                'connector_keys': json.dumps(connector_keys),
                'metric_keys': json.dumps(metric_keys),
                'location_keys': json.dumps(location_keys),
                'tags': json.dumps(tags),
                'params': json.dumps(params),
            },
            debug=debug
        ).json()
    except Exception as e:
        error(str(e))
    # A payload with 'detail' is the server's error response.
    if 'detail' in j:
        error(j['detail'], stack=False)
    return [tuple(r) for r in j]
def get(self, r_url: str, headers: Optional[Dict[str, str]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response
-
Wrapper for
requests.get
.Expand source code
def get(
    self,
    r_url: str,
    headers: Optional[Dict[str, str]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kw: Any
) -> requests.Response:
    """
    Wrapper for `requests.get`.

    Parameters
    ----------
    r_url: str
        The relative endpoint path, appended to `self.url`.
    headers: Optional[Dict[str, str]], default None
        Additional headers for the request.
    use_token: bool, default True
        If `True`, attach the bearer token as the Authorization header.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The `requests.Response` from the underlying session.
    """
    ### BUGFIX: return annotation was misspelled `requests.Reponse`.
    if debug:
        from meerschaum.utils.debug import dprint
    if headers is None:
        headers = {}
    if use_token:
        if debug:
            dprint("Checking login token.")
        headers.update({'Authorization': f'Bearer {self.token}'})
    if debug:
        from meerschaum.utils.formatting import pprint
        dprint(f"Sending GET request to {self.url + r_url}.")
    return self.session.get(
        self.url + r_url,
        headers = headers,
        **kw
    )
def get_actions(self) ‑> list
-
Get available actions from the API server
Expand source code
def get_actions(
    self,
) -> list:
    """Get available actions from the API server"""
    from meerschaum.config.static import STATIC_CONFIG
    actions_endpoint = STATIC_CONFIG['api']['endpoints']['actions']
    return self.get(actions_endpoint)
def get_backtrack_data(self, pipe: Pipe, begin: datetime.datetime, backtrack_minutes: int = 0, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw: Any) ‑> pandas.DataFrame
-
Get a Pipe's backtrack data from the API.
Expand source code
def get_backtrack_data(
    self,
    pipe: meerschaum.Pipe,
    begin: datetime.datetime,
    backtrack_minutes: int = 0,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kw: Any,
) -> pandas.DataFrame:
    """
    Get a Pipe's backtrack data from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose backtrack data to fetch.
    begin: datetime.datetime
        The datetime from which to backtrack.
    backtrack_minutes: int, default 0
        How many minutes of backtrack data to request.
    params: Optional[Dict[str, Any]], default None
        Filter parameters, JSON-encoded into the query string.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A DataFrame of the backtrack data, or `None` on failure.
    """
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe)
    try:
        response = self.get(
            r_url + "/backtrack_data",
            params = {
                'begin': begin,
                'backtrack_minutes': backtrack_minutes,
                'params': json.dumps(params),
            },
            debug = debug
        )
    except Exception as e:
        warn(f"Failed to parse backtrack data JSON for {pipe}. Exception:\n" + str(e))
        return None
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.misc import parse_df_datetimes
    if debug:
        dprint(response.text)
    pd = import_pandas()
    try:
        df = pd.read_json(response.text)
    except Exception as e:
        warn(str(e))
        return None
    ### PERF BUGFIX: reuse the already-parsed DataFrame instead of calling
    ### `pd.read_json(response.text)` a second time and discarding the first result.
    return parse_df_datetimes(df, debug=debug)
def get_chaining_status(self, **kw) ‑> Optional[bool]
-
Parameters
**kw :
Returns
type
Expand source code
def get_chaining_status(self, **kw) -> Optional[bool]:
    """
    Query the server's `chaining` endpoint.

    NOTE(review): presumably this reports whether the API instance permits
    connector chaining — confirm against the server implementation.

    Parameters
    ----------
    **kw:
        Keyword arguments passed through to `self.get`.

    Returns
    -------
    The JSON-decoded response body, or `None` if the request failed.
    """
    from meerschaum.config.static import _static_config
    try:
        response = self.get(
            _static_config()['api']['endpoints']['chaining'],
            use_token = True,
            **kw
        )
        if not response:
            return None
    except Exception as e:
        return None
    return response.json()
def get_mrsm_version(self, **kw) ‑> Optional[str]
-
Parameters
**kw :
Returns
type
Expand source code
def get_mrsm_version(self, **kw) -> Optional[str]:
    """
    Fetch the Meerschaum version reported by the API server.

    Parameters
    ----------
    **kw:
        Keyword arguments passed through to `self.get`.

    Returns
    -------
    The version string, or `None` if the request failed or the server
    returned an error payload (a dict containing 'detail').
    """
    from meerschaum.config.static import _static_config
    try:
        j = self.get(
            _static_config()['api']['endpoints']['version'] + '/mrsm',
            use_token = True,
            **kw
        ).json()
    except Exception as e:
        return None
    # A dict with 'detail' is the server's error payload, not a version string.
    if isinstance(j, dict) and 'detail' in j:
        return None
    return j
def get_pipe_attributes(self, pipe: Pipe, debug: bool = False) ‑> Dict[str, Any]
-
Get a Pipe's attributes from the API
Parameters
pipe
:Pipe
- The pipe whose attributes we are fetching.
Returns
A dictionary of a pipe's attributes. If the pipe does not exist, return an empty dictionary.
Expand source code
def get_pipe_attributes(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False,
) -> Dict[str, Any]:
    """
    Get a Pipe's attributes from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose attributes we are fetching.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of a pipe's attributes.
    If the pipe does not exist, return an empty dictionary.
    """
    import json
    attributes_response = self.get(pipe_r_url(pipe) + '/attributes', debug=debug)
    try:
        return json.loads(attributes_response.text)
    except Exception:
        return {}
def get_pipe_columns_types(self, pipe: Pipe, debug: bool = False) ‑> Union[Dict[str, str], None]
-
Fetch the columns and types of the pipe's table.
Parameters
pipe
:Pipe
- The pipe whose columns to be queried.
Returns
A dictionary mapping column names to their database types.
Examples
>>> { ... 'dt': 'TIMESTAMP WITHOUT TIMEZONE', ... 'id': 'BIGINT', ... 'val': 'DOUBLE PRECISION', ... } >>>
Expand source code
def get_pipe_columns_types(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False,
) -> Union[Dict[str, str], None]:
    """
    Fetch the columns and types of the pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose columns to be queried.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping column names to their database types,
    or `None` on failure.

    Examples
    --------
    >>> {
    ...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    ...   'id': 'BIGINT',
    ...   'val': 'DOUBLE PRECISION',
    ... }
    >>>
    """
    ### BUGFIX: import `warn` once at function scope. Previously it was only
    ### imported inside the first failure branch, so the second branch
    ### (`not isinstance(j, dict)`) raised a NameError instead of warning.
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe) + '/columns/types'
    response = self.get(
        r_url,
        debug = debug
    )
    j = response.json()
    # A single-key dict with 'detail' is the server's error payload.
    if isinstance(j, dict) and 'detail' in j and len(j.keys()) == 1:
        warn(j['detail'])
        return None
    if not isinstance(j, dict):
        warn(response.text)
        return None
    return j
def get_pipe_data(self, pipe: Pipe, begin: Union[str, datetime.datetime, int, None] = None, end: Union[str, datetime.datetime, int, None] = None, params: Optional[Dict[str, Any]] = None, as_chunks: bool = False, debug: bool = False, **kw: Any) ‑> Union[pandas.DataFrame, None]
-
Fetch data from the API.
Expand source code
def get_pipe_data(
    self,
    pipe: meerschaum.Pipe,
    begin: Union[str, datetime.datetime, int, None] = None,
    end: Union[str, datetime.datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    as_chunks: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[pandas.DataFrame, None]:
    """
    Fetch data from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose data to fetch.
    begin: Union[str, datetime.datetime, int, None], default None
        Lower bound for the query.
    end: Union[str, datetime.datetime, int, None], default None
        Upper bound for the query.
    params: Optional[Dict[str, Any]], default None
        Filter parameters, JSON-encoded into the query string.
    as_chunks: bool, default False
        Accepted for interface compatibility (unused here).
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A DataFrame of the pipe's data, or `None` on failure.
    """
    import json
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe)
    ### Cleanup: removed a `while True: ... break` loop that never iterated
    ### and an unused `chunks_list` accumulator.
    try:
        response = self.get(
            r_url + "/data",
            params = {'begin': begin, 'end': end, 'params': json.dumps(params)},
            debug = debug
        )
        if not response.ok:
            return None
        j = response.json()
    except Exception as e:
        warn(str(e))
        return None
    if isinstance(j, dict) and 'detail' in j:
        ### BUGFIX: previously returned a `(False, detail)` tuple from a
        ### DataFrame-or-None function; a truthy tuple would be mistaken for data.
        warn(j['detail'])
        return None
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.misc import parse_df_datetimes
    pd = import_pandas()
    try:
        df = pd.read_json(response.text)
    except Exception as e:
        warn(str(e))
        return None
    return parse_df_datetimes(
        df,
        ignore_cols = [
            col
            for col, dtype in pipe.dtypes.items()
            if 'datetime' not in str(dtype)
        ],
        debug = debug,
    )
def get_pipe_id(self, pipe: meerschaum.Pipe, debug: bool = False) ‑> Optional[int]
-
Get a Pipe's ID from the API.
Expand source code
def get_pipe_id(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False,
) -> Optional[int]:
    """
    Get a Pipe's ID from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose ID to fetch.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The pipe's integer ID, or `None` if the response could not be parsed.
    """
    ### BUGFIX: the annotation was misspelled `meerschuam.Pipe`, and the return
    ### annotation said `int` even though `None` is returned on failure.
    from meerschaum.utils.debug import dprint
    r_url = pipe_r_url(pipe)
    response = self.get(
        r_url + '/id',
        debug = debug
    )
    if debug:
        dprint(f"Got pipe ID: {response.text}")
    try:
        return int(response.text)
    except Exception as e:
        return None
def get_pipe_rowcount(self, pipe: "'Pipe'", begin: "Optional['datetime.datetime']" = None, end: "Optional['datetime.datetime']" = None, params: Optional[Dict[str, Any]] = None, remote: bool = False, debug: bool = False) ‑> int
-
Get a pipe's row count from the API.
Parameters
pipe
:'meerschaum.Pipe':
- The pipe whose row count we are counting.
begin
:Optional[datetime.datetime]
, defaultNone
- If provided, bound the count by this datetime.
end
:Optional[datetime.datetime]
- If provided, bound the count by this datetime.
params
:Optional[Dict[str, Any]]
, defaultNone
- If provided, bound the count by these parameters.
remote
:bool
, defaultFalse
Returns
The number of rows in the pipe's table, bound the given parameters. If the table does not exist, return 0.
Expand source code
def get_pipe_rowcount(
    self,
    pipe: 'meerschaum.Pipe',
    begin: Optional['datetime.datetime'] = None,
    end: Optional['datetime.datetime'] = None,
    params: Optional[Dict[str, Any]] = None,
    remote: bool = False,
    debug: bool = False,
) -> int:
    """Get a pipe's row count from the API.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe':
        The pipe whose row count we are counting.
    begin: Optional[datetime.datetime], default None
        If provided, bound the count by this datetime.
    end: Optional[datetime.datetime], default None
        If provided, bound the count by this datetime.
    params: Optional[Dict[str, Any]], default None
        If provided, bound the count by these parameters.
    remote: bool, default False
        Passed through to the server as a query parameter.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The number of rows in the pipe's table, bound the given parameters.
    If the table does not exist, return 0.
    """
    import json
    r_url = pipe_r_url(pipe)
    # `params` travels in the request body (json=), bounds in the query string.
    response = self.get(
        r_url + "/rowcount",
        json = params,
        params = {
            'begin' : begin,
            'end' : end,
            'remote' : remote,
        },
        debug = debug
    )
    try:
        return int(json.loads(response.text))
    except Exception as e:
        return 0
def get_plugin_attributes(self, plugin: meerschaum.core.Plugin, debug: bool = False) ‑> Mapping[str, Any]
-
Return a plugin's attributes.
Expand source code
def get_plugin_attributes(
    self,
    plugin: meerschaum.core.Plugin,
    debug: bool = False
) -> Mapping[str, Any]:
    """
    Return a plugin's attributes.

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin whose attributes to fetch.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A mapping of the plugin's attributes; an empty dict when the response
    failed with an error payload.
    """
    import json
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config.static import _static_config
    r_url = plugin_r_url(plugin) + '/attributes'
    response = self.get(r_url, use_token=True, debug=debug)
    attributes = response.json()
    # Some payloads arrive double-encoded as a JSON string; decode once more.
    if isinstance(attributes, str) and attributes and attributes[0] == '{':
        try:
            attributes = json.loads(attributes)
        except Exception as e:
            pass
    if not isinstance(attributes, dict):
        # `error` raises, so execution does not continue past this branch.
        error(response.text)
    elif not response and 'detail' in attributes:
        warn(attributes['detail'])
        return {}
    return attributes
def get_plugins(self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False) ‑> Sequence[str]
-
Return a list of registered plugin names.
Parameters
- user_id :
- If specified, return all plugins from a certain user.
user_id
:Optional[int] :
- (Default value = None)
search_term
:Optional[str] :
- (Default value = None)
debug
:bool :
- (Default value = False)
Returns
Expand source code
def get_plugins(
    self,
    user_id : Optional[int] = None,
    search_term : Optional[str] = None,
    debug : bool = False
) -> Sequence[str]:
    """Return a list of registered plugin names.

    Parameters
    ----------
    user_id: Optional[int], default None
        If specified, return all plugins from a certain user.
    search_term: Optional[str], default None
        If specified, filter plugin names by this search term.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of plugin names (empty when the request fails).
    """
    import json
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config.static import _static_config
    response = self.get(
        _static_config()['api']['endpoints']['plugins'],
        params = {'user_id' : user_id, 'search_term' : search_term},
        use_token = True,
        debug = debug
    )
    if not response:
        return []
    plugins = json.loads(response.text)
    # `error` raises if the payload is not the expected list.
    if not isinstance(plugins, list):
        error(response.text)
    return plugins
def get_sync_time(self, pipe: "'Pipe'", params: Optional[Dict[str, Any]] = None, newest: bool = True, round_down: bool = True, debug: bool = False) ‑> Union[datetime.datetime, int, None]
-
Get a Pipe's most recent datetime value from the API.
Parameters
pipe
:Pipe
- The pipe to select from.
params
:Optional[Dict[str, Any]]
, defaultNone
- Optional params dictionary to build the WHERE clause.
newest
:bool
, defaultTrue
- If
True
, get the most recent datetime (honoringparams
). IfFalse
, get the oldest datetime (ASC instead of DESC). round_down
:bool
, defaultTrue
- If
True
, round the resulting datetime value down to the nearest minute.
Returns
The most recent (or oldest if
newest
isFalse
) datetime of a pipe, rounded down to the closest minute.Expand source code
def get_sync_time(
    self,
    pipe: 'meerschaum.Pipe',
    params: Optional[Dict[str, Any]] = None,
    newest: bool = True,
    round_down: bool = True,
    debug: bool = False,
) -> Union[datetime.datetime, int, None]:
    """Get a Pipe's most recent datetime value from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to select from.
    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the WHERE clause.
    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (ASC instead of DESC).
    round_down: bool, default True
        If `True`, round the resulting datetime value down to the nearest minute.

    Returns
    -------
    The most recent (or oldest if `newest` is `False`) datetime of a pipe,
    rounded down to the closest minute.
    """
    from meerschaum.utils.misc import is_int
    from meerschaum.utils.warnings import warn
    import datetime, json
    r_url = pipe_r_url(pipe)
    # `params` travels in the request body; flags go in the query string.
    response = self.get(
        r_url + '/sync_time',
        json = params,
        params = {'newest': newest, 'debug': debug, 'round_down': round_down},
        debug = debug,
    )
    if not response:
        warn(response.text)
        return None
    j = response.json()
    if j is None:
        dt = None
    else:
        # The server returns either an ISO datetime string or an integer index.
        try:
            dt = (
                datetime.datetime.fromisoformat(j)
            ) if not is_int(j) else int(j)
        except Exception as e:
            warn(e)
            dt = None
    return dt
def get_user_attributes(self, user: "'meerschaum.core.User'", debug: bool = False, **kw) ‑> int
-
Get a user's attributes.
Expand source code
def get_user_attributes(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw
) -> Union[Dict[str, Any], None]:
    """
    Get a user's attributes.

    Parameters
    ----------
    user: 'meerschaum.core.User'
        The user whose attributes to fetch.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The JSON-decoded attributes payload, or `None` if it could not be parsed.
    """
    ### BUGFIX: the return annotation said `int`, but the function returns the
    ### decoded JSON attributes (or None on failure).
    from meerschaum.config.static import _static_config
    import json
    r_url = f"{_static_config()['api']['endpoints']['users']}/{user.username}/attributes"
    response = self.get(r_url, debug=debug, **kw)
    try:
        attributes = json.loads(response.text)
    except Exception as e:
        attributes = None
    return attributes
def get_user_id(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> Optional[int]
-
Get a user's ID.
Expand source code
def get_user_id(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> Optional[int]:
    """Get a user's ID."""
    import json
    from meerschaum.config.static import _static_config
    users_endpoint = _static_config()['api']['endpoints']['users']
    response = self.get(f"{users_endpoint}/{user.username}/id", debug=debug, **kw)
    try:
        return int(json.loads(response.text))
    except Exception:
        return None
def get_user_password_hash(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> Optional[str]
-
If configured, get a user's password hash.
Expand source code
def get_user_password_hash(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """If configured, get a user's password hash."""
    from meerschaum.config.static import _static_config
    users_endpoint = _static_config()['api']['endpoints']['users']
    response = self.get(
        users_endpoint + '/' + user.username + '/password_hash',
        debug=debug,
        **kw
    )
    return response.json() if response else None
def get_user_type(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> Optional[str]
-
If configured, get a user's type.
Expand source code
def get_user_type(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """If configured, get a user's type."""
    from meerschaum.config.static import _static_config
    users_endpoint = _static_config()['api']['endpoints']['users']
    response = self.get(
        users_endpoint + '/' + user.username + '/type',
        debug=debug,
        **kw
    )
    return response.json() if response else None
def get_users(self, debug: bool = False, **kw: Any) ‑> List[str]
-
Return a list of registered usernames.
Expand source code
def get_users(
    self,
    debug: bool = False,
    **kw : Any
) -> List[str]:
    """
    Return a list of registered usernames.
    """
    import json
    from meerschaum.config.static import _static_config
    users_endpoint = f"{_static_config()['api']['endpoints']['users']}"
    response = self.get(
        users_endpoint,
        debug = debug,
        use_token = True,
    )
    if not response:
        return []
    try:
        usernames = response.json()
    except Exception:
        usernames = []
    return usernames
def install_plugin(self, name: str, force: bool = False, debug: bool = False) ‑> Tuple[bool, str]
-
Download and attempt to install a plugin from the API.
Expand source code
def install_plugin(
    self,
    name: str,
    force: bool = False,
    debug: bool = False
) -> SuccessTuple:
    """
    Download and attempt to install a plugin from the API.

    Parameters
    ----------
    name: str
        The name of the plugin to install.
    force: bool, default False
        Passed through to `Plugin.install`.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A success tuple (bool, str).
    """
    import os, pathlib, json
    from meerschaum.core import Plugin
    from meerschaum.config._paths import PLUGINS_TEMP_RESOURCES_PATH
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    binaryornot_check = attempt_import('binaryornot.check', lazy=False)
    r_url = plugin_r_url(name)
    dest = pathlib.Path(os.path.join(PLUGINS_TEMP_RESOURCES_PATH, name + '.tar.gz'))
    if debug:
        dprint(f"Fetching from '{self.url + r_url}' to '{dest}'...")
    archive_path = self.wget(r_url, dest, debug=debug)
    is_binary = binaryornot_check.is_binary(str(archive_path))
    if not is_binary:
        ### A text (non-binary) download means the server sent an error payload
        ### instead of the plugin archive.
        fail_msg = f"Failed to download binary for plugin '{name}'."
        ### BUGFIX: initialize before parsing so `success`/`msg` can never be
        ### unbound (e.g. when the JSON payload is neither a list nor a dict
        ### containing 'detail').
        success, msg = False, fail_msg
        try:
            # `with` guarantees the file handle is closed (previously leaked).
            with open(archive_path, 'r') as f:
                j = json.load(f)
            if isinstance(j, list):
                success, msg = tuple(j)
            elif isinstance(j, dict) and 'detail' in j:
                success, msg = False, fail_msg
        except Exception as e:
            success, msg = False, fail_msg
        return success, msg
    plugin = Plugin(name, archive_path=archive_path, repo_connector=self)
    return plugin.install(force=force, debug=debug)
def login(self, debug: bool = False, warn: bool = True, **kw: Any) ‑> Tuple[bool, str]
-
Log in and set the session token.
Expand source code
def login(
    self,
    debug: bool = False,
    warn: bool = True,
    **kw: Any
) -> SuccessTuple:
    """
    Log in to the API instance and set the session token.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.
    warn: bool, default True
        If `True`, emit a warning when the login fails.
    **kw: Any
        Accepted for interface compatibility (unused).

    Returns
    -------
    A success tuple (bool, str).
    """
    from meerschaum.utils.warnings import warn as _warn, info, error
    from meerschaum.core import User
    from meerschaum.config.static import _static_config
    import json, datetime
    # Credentials come from connector attributes; missing ones mean the
    # user never ran `login` for this connector.
    try:
        login_data = {
            'username': self.username,
            'password': self.password,
        }
    except AttributeError:
        return False, f"Please login with the command `login {self}`."
    response = self.post(
        _static_config()['api']['endpoints']['login'],
        data = login_data,
        use_token = False,
        debug = debug
    )
    if response:
        msg = f"Successfully logged into '{self}' as user '{login_data['username']}'."
        # NOTE(review): `response.text` is parsed twice here — could cache one
        # json.loads() call.
        self._token = json.loads(response.text)['access_token']
        self._expires = datetime.datetime.strptime(
            json.loads(response.text)['expires'],
            '%Y-%m-%dT%H:%M:%S.%f'
        )
    else:
        msg = (
            f"Failed to log into '{self}' as user '{login_data['username']}'.\n" +
            f"   Please verify login details for connector '{self}'."
        )
        if warn:
            _warn(msg, stack=False)
    return response.__bool__(), msg
def patch(self, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response
-
Wrapper for
requests.patch
.Expand source code
def patch(
    self,
    r_url: str,
    headers: Optional[Dict[str, Any]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kw: Any
) -> requests.Response:
    """Wrapper for `requests.patch`."""
    if debug:
        from meerschaum.utils.debug import dprint
    if headers is None:
        headers = {}
    if use_token:
        if debug:
            dprint(f"Checking login token.")
        headers['Authorization'] = f'Bearer {self.token}'
    if debug:
        from meerschaum.utils.formatting import pprint
        dprint(f"Sending PATCH request to {self.url + r_url}")
    return self.session.patch(self.url + r_url, headers=headers, **kw)
def pipe_exists(self, pipe: "'Pipe'", debug: bool = False) ‑> bool
-
Check the API to see if a Pipe exists.
Parameters
pipe
:'meerschaum.Pipe'
- The pipe which were are querying.
Returns
A bool indicating whether a pipe's underlying table exists.
Expand source code
def pipe_exists(
    self,
    pipe: 'meerschaum.Pipe',
    debug: bool = False
) -> bool:
    """Check the API to see if a Pipe exists.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe'
        The pipe which were are querying.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A bool indicating whether a pipe's underlying table exists.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe)
    response = self.get(r_url + '/exists', debug=debug)
    if debug:
        dprint("Received response: " + str(response.text))
    j = response.json()
    if isinstance(j, dict) and 'detail' in j:
        warn(j['detail'])
        ### BUGFIX: an error payload is not confirmation of existence.
        ### Previously the truthy dict itself was returned, which callers
        ### would treat as `True`.
        return False
    return j
def post(self, r_url: str, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> requests.Response
-
Wrapper for
requests.post
.Expand source code
def post(
    self,
    r_url: str,
    headers: Optional[Dict[str, Any]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kw: Any
) -> requests.Response:
    """Wrapper for `requests.post`."""
    if debug:
        from meerschaum.utils.debug import dprint
    if headers is None:
        headers = {}
    if use_token:
        if debug:
            dprint(f"Checking token...")
        headers['Authorization'] = f'Bearer {self.token}'
    if debug:
        from meerschaum.utils.formatting import pprint
        dprint(f"Sending POST request to {self.url + r_url}")
    return self.session.post(self.url + r_url, headers=headers, **kw)
def register_pipe(self, pipe: Pipe, debug: bool = False) ‑> SuccessTuple
-
Submit a POST to the API to register a new Pipe object. Returns a tuple of (success_bool, response_dict).
Expand source code
def register_pipe(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False
) -> SuccessTuple:
    """Submit a POST to the API to register a new Pipe object.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose parameters are posted to the `/register` endpoint.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of (success_bool, response_dict).
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.config.static import STATIC_CONFIG
    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ### then `pipe.parameters` will exist and not be fetched from the database.
    r_url = pipe_r_url(pipe)
    response = self.post(
        r_url + '/register',
        json = pipe.parameters,
        debug = debug,
    )
    if debug:
        dprint(response.text)
    # NOTE(review): `response.json()` is re-parsed up to three times below —
    # consider caching the parsed payload.
    if isinstance(response.json(), list):
        response_tuple = response.__bool__(), response.json()[1]
    elif 'detail' in response.json():
        response_tuple = response.__bool__(), response.json()['detail']
    else:
        response_tuple = response.__bool__(), response.text
    return response_tuple
def register_plugin(self, plugin: meerschaum.core.Plugin, make_archive: bool = True, debug: bool = False) ‑> SuccessTuple
-
Register a plugin and upload its archive.
Expand source code
def register_plugin(
    self,
    plugin: meerschaum.core.Plugin,
    make_archive: bool = True,
    debug: bool = False,
) -> SuccessTuple:
    """
    Register a plugin and upload its archive.

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin to register.
    make_archive: bool, default True
        If `True`, build the plugin's tar archive before uploading;
        otherwise upload the existing `plugin.archive_path`.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A success tuple (bool, str).
    """
    import json
    archive_path = plugin.make_tar(debug=debug) if make_archive else plugin.archive_path
    metadata = {
        'version': plugin.version,
        'attributes': json.dumps(plugin.attributes),
    }
    r_url = plugin_r_url(plugin)
    try:
        ### IDIOM: `with` replaces the manual open/close + finally
        ### and still guarantees the handle is released on failure.
        with open(archive_path, 'rb') as file_pointer:
            response = self.post(
                r_url,
                files = {'archive': file_pointer},
                params = metadata,
                debug = debug,
            )
    except Exception as e:
        return False, f"Failed to register plugin '{plugin}'."
    try:
        success, msg = json.loads(response.text)
    except Exception as e:
        return False, response.text
    return success, msg
def register_user(self, user: "'meerschaum.core.User'", debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Register a new user.
Expand source code
def register_user(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Register a new user.

    Parameters
    ----------
    user: 'meerschaum.core.User'
        The user whose credentials and attributes are posted to the
        `/users/register` endpoint.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A success tuple (bool, str).
    """
    import json
    from meerschaum.config.static import _static_config
    r_url = f"{_static_config()['api']['endpoints']['users']}/register"
    data = {
        'username': user.username,
        'password': user.password,
        'attributes': json.dumps(user.attributes),
    }
    # Optional fields are only included when set.
    if user.type:
        data['type'] = user.type
    if user.email:
        data['email'] = user.email
    response = self.post(r_url, data=data, debug=debug)
    try:
        _json = json.loads(response.text)
        # A dict with 'detail' is the server's error payload.
        if isinstance(_json, dict) and 'detail' in _json:
            return False, _json['detail']
        success_tuple = tuple(_json)
    except Exception:
        msg = response.text if response else f"Failed to register user '{user}'."
        return False, msg
    # `success_tuple` is already a tuple; the extra tuple() is a harmless no-op.
    return tuple(success_tuple)
def sync_pipe(self, pipe: Optional[Pipe] = None, df: Optional[Union[pandas.DataFrame, Dict[Any, Any], str]] = None, chunksize: Optional[int] = -1, debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Append a pandas DataFrame to a Pipe. If Pipe does not exist, it is registered with supplied metadata.
Expand source code
def sync_pipe(
    self,
    pipe: Optional[meerschaum.Pipe] = None,
    df: Optional[Union[pandas.DataFrame, Dict[Any, Any], str]] = None,
    chunksize: Optional[int] = -1,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """Append a pandas DataFrame to a Pipe.
    If Pipe does not exist, it is registered with supplied metadata.

    Parameters
    ----------
    pipe: Optional[meerschaum.Pipe], default None
        The pipe to sync into.
    df: Optional[Union[pandas.DataFrame, Dict[Any, Any], str]], default None
        The data to sync: a DataFrame, a dict of equal-length arrays,
        a list of row dicts, or a JSON string of any of those.
    chunksize: Optional[int], default -1
        Rows per POSTed chunk. `-1` reads the configured SQL chunksize;
        `None` falls back to 1.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A success tuple (bool, str).
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.misc import json_serialize_datetime
    from meerschaum.config import get_config
    from meerschaum.utils.packages import attempt_import
    import json, time
    begin = time.time()
    more_itertools = attempt_import('more_itertools')
    if df is None:
        msg = f"DataFrame is `None`. Cannot sync {pipe}."
        return False, msg

    def get_json_str(c):
        ### allow syncing dict or JSON without needing to import pandas (for IOT devices)
        return (
            json.dumps(c, default=json_serialize_datetime)
            if isinstance(c, (dict, list))
            else c.to_json(date_format='iso', date_unit='ns')
        )

    df = json.loads(df) if isinstance(df, str) else df

    ### TODO Make separate chunksize for API?
    _chunksize: Optional[int] = (
        1 if chunksize is None else (
            get_config('system', 'connectors', 'sql', 'chunksize')
            if chunksize == -1
            else chunksize
        )
    )
    chunks = []
    if hasattr(df, 'index'):
        rowcount = len(df)
        # NOTE(review): `iloc` is indexed with chunks of index *labels*;
        # this assumes a default RangeIndex — confirm.
        chunks = [df.iloc[i] for i in more_itertools.chunked(df.index, _chunksize)]
    elif isinstance(df, dict):
        ### BUGFIX: compute `keys` only here — calling `df.keys()` before the
        ### type dispatch crashed for list inputs.
        keys: list = list(df.keys())
        ### `_chunks` is a dict of lists of dicts.
        ### e.g. {'a' : [ {'a':[1, 2]}, {'a':[3, 4]} ] }
        _chunks = {k: [] for k in keys}
        rowcount = len(df[keys[0]])
        for k in keys:
            if len(df[k]) != rowcount:
                return False, "Arrays must all be the same length."
            chunk_iter = more_itertools.chunked(df[k], _chunksize)
            for l in chunk_iter:
                _chunks[k].append({k: l})
        ### `chunks` is a list of dicts (e.g. orient by rows in pandas JSON).
        for k, l in _chunks.items():
            for i, c in enumerate(l):
                try:
                    chunks[i].update(c)
                except IndexError:
                    chunks.append(c)
    elif isinstance(df, list):
        ### BUGFIX: `more_itertools.chunked` yields lists of *elements*;
        ### the old `df[i]` used those lists as indices (a TypeError).
        chunks = list(more_itertools.chunked(df, _chunksize))
        ### BUGFIX: rowcount is the number of rows, not the number of chunks.
        rowcount = len(df)

    ### Send columns in case the user has defined them locally.
    if pipe.columns:
        kw['columns'] = json.dumps(pipe.columns)
    r_url = pipe_r_url(pipe) + '/data'
    for i, c in enumerate(chunks):
        if debug:
            dprint(f"Posting chunk ({i + 1} / {len(chunks)}) to {r_url}...")
        json_str = get_json_str(c)
        try:
            response = self.post(
                r_url,
                ### handles check_existing
                params = kw,
                data = json_str,
                debug = debug
            )
        except Exception as e:
            warn(str(e))
            return False, str(e)
        if not response:
            return False, f"Failed to receive response. Response text: {response.text}"
        try:
            j = json.loads(response.text)
        except Exception as e:
            return False, str(e)
        if isinstance(j, dict) and 'detail' in j:
            return False, j['detail']
        try:
            j = tuple(j)
        except Exception as e:
            return False, response.text
        if debug:
            dprint("Received response: " + str(j))
        if not j[0]:
            return j
    len_chunks = len(chunks)
    success_tuple = True, (
        f"It took {round(time.time() - begin, 2)} seconds to sync {rowcount} row"
        + ('s' if rowcount != 1 else '')
        + f" across {len_chunks} chunk" + ('s' if len_chunks != 1 else '')
        + f" to {pipe}."
    )
    return success_tuple
def test_connection(self, **kw: Any) ‑> Optional[bool]
-
Test if a successful connection to the API may be made.
Expand source code
def test_connection( self, **kw: Any ) -> Union[bool, None]: """Test if a successful connection to the API may be made.""" from meerschaum.connectors.poll import retry_connect _default_kw = { 'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self, 'enforce_chaining': False, 'enforce_login': False, } _default_kw.update(kw) try: return retry_connect(**_default_kw) except Exception as e: return False
def wget(self, r_url: str, dest: Optional[Union[str, pathlib.Path]] = None, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any) ‑> pathlib.Path
-
Mimic wget with requests.
Expand source code
def wget( self, r_url: str, dest: Optional[Union[str, pathlib.Path]] = None, headers: Optional[Dict[str, Any]] = None, use_token: bool = True, debug: bool = False, **kw: Any ) -> pathlib.Path: """Mimic wget with requests. """ from meerschaum.utils.misc import wget from meerschaum.utils.debug import dprint if headers is None: headers = {} if use_token: if debug: dprint(f"Checking login token.") headers.update({'Authorization': f'Bearer {self.token}'}) return wget(self.url + r_url, dest=dest, headers=headers, **kw)
class Connector (type: Optional[str] = None, label: Optional[str] = None, **kw: Any)
-
The base connector class to hold connection attributes.
Parameters
type
:str
- The
type
of the connector (e.g.meerschaum.connectors.sql
,meerschaum.connectors.api
,meerschaum.connectors.plugin
). label
:str
- The
label
for the connector.
Run
mrsm edit config
to edit connectors in the YAML file:meerschaum: connections: {type}: {label}: ### attributes go here
Expand source code
class Connector(metaclass=abc.ABCMeta):
    """
    The base connector class to hold connection attributes.
    """
    def __init__(
            self,
            type: Optional[str] = None,
            label: Optional[str] = None,
            **kw: Any
        ):
        """
        Parameters
        ----------
        type: str
            The `type` of the connector (e.g. `sql`, `api`, `plugin`).

        label: str
            The `label` for the connector.

        Run `mrsm edit config` to edit connectors in the YAML file:

        ```
        meerschaum:
            connections:
                {type}:
                    {label}:
                        ### attributes go here
        ```
        """
        ### Snapshot the pristine attribute dict so `_reset_attributes` can restore it.
        self._original_dict = copy.deepcopy(self.__dict__)
        self._set_attributes(type=type, label=label, **kw)
        ### Child classes may declare REQUIRED_ATTRIBUTES to enforce extra keys.
        self.verify_attributes(getattr(self, 'REQUIRED_ATTRIBUTES', None))

    def _reset_attributes(self):
        ### Restore the attribute dict captured at the start of `__init__`.
        self.__dict__ = self._original_dict

    def _set_attributes(
            self,
            *args,
            inherit_default: bool = True,
            **kw: Any
        ):
        ### Populate `self._attributes` from config (default section, then the
        ### labeled section), then keyword overrides, and mirror into `__dict__`.
        from meerschaum.config.static import STATIC_CONFIG
        from meerschaum.utils.warnings import error

        self._attributes = {}

        default_label = STATIC_CONFIG['connectors']['default_label']

        ### NOTE: Support the legacy method of explicitly passing the type.
        label = kw.get('label', None)
        if label is None:
            if len(args) == 2:
                label = args[1]
            elif len(args) == 0:
                label = None
            else:
                label = args[0]

        if label == 'default':
            error(
                f"Label cannot be 'default'. Did you mean '{default_label}'?",
                InvalidAttributesError,
            )
        self.__dict__['label'] = label

        from meerschaum.config import get_config
        conn_configs = copy.deepcopy(get_config('meerschaum', 'connectors'))
        connector_config = copy.deepcopy(get_config('system', 'connectors'))

        ### inherit attributes from 'default' if exists
        if inherit_default:
            inherit_from = 'default'
            if self.type in conn_configs and inherit_from in conn_configs[self.type]:
                _inherit_dict = copy.deepcopy(conn_configs[self.type][inherit_from])
                self._attributes.update(_inherit_dict)

        ### load user config into self._attributes
        if self.type in conn_configs and self.label in conn_configs[self.type]:
            self._attributes.update(conn_configs[self.type][self.label])

        ### load system config into self._sys_config
        ### (deep copy so future Connectors don't inherit changes)
        if self.type in connector_config:
            self._sys_config = copy.deepcopy(connector_config[self.type])

        ### add additional arguments or override configuration
        self._attributes.update(kw)

        ### finally, update __dict__ with _attributes.
        self.__dict__.update(self._attributes)

    def verify_attributes(
            self,
            required_attributes: Optional[List[str]] = None,
            debug: bool = False
        ) -> None:
        """
        Ensure that the required attributes have been met.

        The Connector base class checks the minimum requirements.
        Child classes may enforce additional requirements.

        Parameters
        ----------
        required_attributes: Optional[List[str]], default None
            Attributes to be verified. If `None`, default to `['label']`.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        Don't return anything.

        Raises
        ------
        An error if any of the required attributes are missing.
        """
        from meerschaum.utils.warnings import error, warn
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.misc import items_str
        if required_attributes is None:
            required_attributes = ['label']
        missing_attributes = set()
        for a in required_attributes:
            if a not in self.__dict__:
                missing_attributes.add(a)
        if len(missing_attributes) > 0:
            error(
                (
                    f"Missing {items_str(list(missing_attributes))} "
                    + f"for connector '{self.type}:{self.label}'."
                ),
                InvalidAttributesError,
                silent = True,
                stack = False
            )

    def __str__(self):
        """
        When cast to a string, return type:label.
        """
        return f"{self.type}:{self.label}"

    def __repr__(self):
        """
        Represent the connector as type:label.
        """
        return str(self)

    @property
    def meta(self) -> Dict[str, Any]:
        """
        Return the keys needed to reconstruct this Connector.
        """
        ### Underscore-prefixed keys are internal state and are excluded.
        _meta = {
            key: value
            for key, value in self.__dict__.items()
            if not str(key).startswith('_')
        }
        _meta.update({
            'type': self.type,
            'label': self.label,
        })
        return _meta

    @property
    def type(self) -> str:
        """
        Return the type for this connector.
        """
        _type = self.__dict__.get('type', None)
        if _type is None:
            ### Derive the type from the class name, e.g. `SQLConnector` -> 'sql'.
            import re
            _type = re.sub(r'connector$', '', self.__class__.__name__.lower())
            self.__dict__['type'] = _type
        return _type

    @property
    def label(self) -> str:
        """
        Return the label for this connector.
        """
        _label = self.__dict__.get('label', None)
        if _label is None:
            ### Fall back to the configured default label and cache it.
            from meerschaum.config.static import STATIC_CONFIG
            _label = STATIC_CONFIG['connectors']['default_label']
            self.__dict__['label'] = _label
        return _label
Subclasses
Instance variables
var label : str
-
Return the label for this connector.
Expand source code
@property def label(self) -> str: """ Return the label for this connector. """ _label = self.__dict__.get('label', None) if _label is None: from meerschaum.config.static import STATIC_CONFIG _label = STATIC_CONFIG['connectors']['default_label'] self.__dict__['label'] = _label return _label
var meta : Dict[str, Any]
-
Return the keys needed to reconstruct this Connector.
Expand source code
@property def meta(self) -> Dict[str, Any]: """ Return the keys needed to reconstruct this Connector. """ _meta = { key: value for key, value in self.__dict__.items() if not str(key).startswith('_') } _meta.update({ 'type': self.type, 'label': self.label, }) return _meta
var type : str
-
Return the type for this connector.
Expand source code
@property def type(self) -> str: """ Return the type for this connector. """ _type = self.__dict__.get('type', None) if _type is None: import re _type = re.sub(r'connector$', '', self.__class__.__name__.lower()) self.__dict__['type'] = _type return _type
Methods
def verify_attributes(self, required_attributes: Optional[List[str]] = None, debug: bool = False) ‑> None
-
Ensure that the required attributes have been met.
The Connector base class checks the minimum requirements. Child classes may enforce additional requirements.
Parameters
required_attributes
:Optional[List[str]]
, defaultNone
- Attributes to be verified. If
None
, default to['label']
. debug
:bool
, defaultFalse
- Verbosity toggle.
Returns
Don't return anything.
Raises
An error if any of the required attributes are missing.
Expand source code
def verify_attributes( self, required_attributes: Optional[List[str]] = None, debug: bool = False ) -> None: """ Ensure that the required attributes have been met. The Connector base class checks the minimum requirements. Child classes may enforce additional requirements. Parameters ---------- required_attributes: Optional[List[str]], default None Attributes to be verified. If `None`, default to `['label']`. debug: bool, default False Verbosity toggle. Returns ------- Don't return anything. Raises ------ An error if any of the required attributes are missing. """ from meerschaum.utils.warnings import error, warn from meerschaum.utils.debug import dprint from meerschaum.utils.misc import items_str if required_attributes is None: required_attributes = ['label'] missing_attributes = set() for a in required_attributes: if a not in self.__dict__: missing_attributes.add(a) if len(missing_attributes) > 0: error( ( f"Missing {items_str(list(missing_attributes))} " + f"for connector '{self.type}:{self.label}'." ), InvalidAttributesError, silent = True, stack = False )
class SQLConnector (label: Optional[str] = None, flavor: Optional[str] = None, wait: bool = False, connect: bool = False, debug: bool = False, **kw: Any)
-
Connect to SQL databases via
sqlalchemy
.SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/
Parameters
label
:str
, default'main'
- The identifying label for the connector.
E.g. for
sql:main
, 'main' is the label. Defaults to 'main'. flavor
:Optional[str]
, defaultNone
- The database flavor, e.g.
'sqlite'
,'postgresql'
,'cockroachdb'
, etc. To see supported flavors, run thebootstrap connectors
command. wait
:bool
, defaultFalse
- If
True
, block until a database connection has been made. Defaults toFalse
. connect
:bool
, defaultFalse
- If
True
, immediately attempt to connect the database and raise a warning if the connection fails. Defaults toFalse
. debug
:bool
, defaultFalse
- Verbosity toggle.
Defaults to
False
. kw
:Any
- All other arguments will be passed to the connector's attributes. Therefore, a connector may be made without being registered, as long as enough parameters are supplied to the constructor.
Expand source code
class SQLConnector(Connector):
    """
    Connect to SQL databases via `sqlalchemy`.

    SQLConnectors may be used as Meerschaum instance connectors.
    Read more about connectors and instances at
    https://meerschaum.io/reference/connectors/
    """
    ### SQLConnectors may serve as Meerschaum instances.
    IS_INSTANCE: bool = True

    ### Methods are mixed in from sibling modules to keep this file small.
    from ._create_engine import flavor_configs, create_engine
    from ._sql import read, value, exec, execute, to_sql, exec_queries
    from meerschaum.utils.sql import test_connection
    from ._fetch import fetch, get_pipe_metadef, get_pipe_backtrack_minutes
    from ._cli import cli
    from ._pipes import (
        fetch_pipes_keys,
        create_indices,
        drop_indices,
        get_create_index_queries,
        get_drop_index_queries,
        get_add_columns_queries,
        get_alter_columns_queries,
        delete_pipe,
        get_backtrack_data,
        get_pipe_data,
        get_pipe_data_query,
        register_pipe,
        edit_pipe,
        get_pipe_id,
        get_pipe_attributes,
        sync_pipe,
        sync_pipe_inplace,
        get_sync_time,
        pipe_exists,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        get_pipe_table,
        get_pipe_columns_types,
        get_to_sql_dtype,
    )
    from ._plugins import (
        register_plugin,
        delete_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugins,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
    )
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri, parse_uri

    def __init__(
            self,
            label: Optional[str] = None,
            flavor: Optional[str] = None,
            wait: bool = False,
            connect: bool = False,
            debug: bool = False,
            **kw: Any
        ):
        """
        Parameters
        ----------
        label: str, default 'main'
            The identifying label for the connector.
            E.g. for `sql:main`, 'main' is the label.
            Defaults to 'main'.

        flavor: Optional[str], default None
            The database flavor, e.g. `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
            To see supported flavors, run the `bootstrap connectors` command.

        wait: bool, default False
            If `True`, block until a database connection has been made.
            Defaults to `False`.

        connect: bool, default False
            If `True`, immediately attempt to connect the database and raise
            a warning if the connection fails.
            Defaults to `False`.

        debug: bool, default False
            Verbosity toggle.
            Defaults to `False`.

        kw: Any
            All other arguments will be passed to the connector's attributes.
            Therefore, a connector may be made without being registered,
            as long as enough parameters are supplied to the constructor.
        """
        ### When a URI is given, normalize the legacy 'postgres://' scheme and
        ### derive the remaining attributes (flavor, host, label, etc.) from it.
        if 'uri' in kw:
            uri = kw['uri']
            if uri.startswith('postgres://'):
                uri = uri.replace('postgres://', 'postgresql://', 1)
            kw['uri'] = uri
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            from_uri_params.pop('label', None)
            kw.update(from_uri_params)

        ### set __dict__ in base class
        super().__init__(
            'sql',
            label = label or self.__dict__.get('label', None),
            **kw
        )

        ### SQLite must not inherit the 'default' config section
        ### (e.g. a server host), so rebuild the attributes without inheritance.
        if self.__dict__.get('flavor', None) == 'sqlite':
            self._reset_attributes()
            self._set_attributes(
                'sql',
                label = label,
                inherit_default = False,
                **kw
            )
            ### For backwards compatability reasons, set the path for sql:local if its missing.
            if self.label == 'local' and not self.__dict__.get('database', None):
                from meerschaum.config._paths import SQLITE_DB_PATH
                self.database = str(SQLITE_DB_PATH)

        ### ensure flavor and label are set accordingly
        if 'flavor' not in self.__dict__:
            if flavor is None and 'uri' not in self.__dict__:
                raise Exception(
                    f" Missing flavor. Provide flavor as a key for '{self}'."
                )
            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

        if self.flavor == 'postgres':
            self.flavor = 'postgresql'

        self._debug = debug
        ### Store the PID and thread at initialization
        ### so we can dispose of the Pool in child processes or threads.
        import os, threading
        self._pid = os.getpid()
        self._thread_ident = threading.current_thread().ident
        self._sessions = {}
        self._locks = {'_sessions': threading.RLock(), }

        ### verify the flavor's requirements are met
        if self.flavor not in self.flavor_configs:
            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
        if not self.__dict__.get('uri'):
            self.verify_attributes(
                self.flavor_configs[self.flavor].get('requirements', set()),
                debug=debug,
            )

        if wait:
            from meerschaum.connectors.poll import retry_connect
            retry_connect(connector=self, debug=debug)

        if connect:
            if not self.test_connection(debug=debug):
                from meerschaum.utils.warnings import warn
                warn(f"Failed to connect with connector '{self}'!", stack=False)

    @property
    def Session(self):
        ### Lazily build and cache a scoped session factory bound to the engine.
        ### Returns None when no engine could be created.
        if '_Session' not in self.__dict__:
            if self.engine is None:
                return None
            from meerschaum.utils.packages import attempt_import
            sqlalchemy_orm = attempt_import('sqlalchemy.orm')
            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
            self._Session = sqlalchemy_orm.scoped_session(session_factory)
        return self._Session

    @property
    def engine(self):
        import os, threading
        ### build the sqlalchemy engine
        if '_engine' not in self.__dict__:
            self._engine, self._engine_str = self.create_engine(include_uri=True)

        same_process = os.getpid() == self._pid
        same_thread = threading.current_thread().ident == self._thread_ident

        ### handle child processes: a forked child must not reuse the parent's
        ### pooled connections, so dispose of them and re-record the PID.
        if not same_process:
            self._pid = os.getpid()
            self._thread = threading.current_thread()
            from meerschaum.utils.warnings import warn
            warn(f"Different PID detected. Disposing of connections...")
            self._engine.dispose()

        ### handle different threads
        if not same_thread:
            ### NOTE(review): intentionally a no-op? Confirm whether per-thread
            ### handling was planned here.
            pass

        return self._engine

    @property
    def DATABASE_URL(self) -> str:
        """
        Return the URI connection string (alias for `SQLConnector.URI`).
        """
        ### Touch the engine property to ensure `_engine_str` is built.
        _ = self.engine
        return str(self._engine_str)

    @property
    def URI(self) -> str:
        """
        Return the URI connection string.
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def IS_THREAD_SAFE(self) -> bool:
        """
        Return whether this connector may be multithreaded.
        """
        if self.flavor == 'duckdb':
            return False
        if self.flavor == 'sqlite':
            ### In-memory SQLite databases are per-connection, so threads can't share them.
            return ':memory:' not in self.URI
        return True

    @property
    def metadata(self):
        ### Lazily build and cache the sqlalchemy MetaData object.
        from meerschaum.utils.packages import attempt_import
        sqlalchemy = attempt_import('sqlalchemy')
        if '_metadata' not in self.__dict__:
            self._metadata = sqlalchemy.MetaData()
        return self._metadata

    @property
    def db(self) -> Optional[databases.Database]:
        ### Lazily build and cache a `databases.Database`; None for unsupported flavors.
        from meerschaum.utils.packages import attempt_import
        databases = attempt_import('databases', lazy=False, install=True)
        url = self.DATABASE_URL
        ### The `databases` package does not accept the '+pymysql' driver suffix.
        if 'mysql' in url:
            url = url.replace('+pymysql', '')
        if '_db' not in self.__dict__:
            try:
                self._db = databases.Database(url)
            except KeyError:
                ### Likely encountered an unsupported flavor.
                from meerschaum.utils.warnings import warn
                self._db = None
        return self._db

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __call__(self):
        return self
Ancestors
- meerschaum.connectors.Connector.Connector
Class variables
var IS_INSTANCE : bool
var flavor_configs
Static methods
def from_uri(uri: str, label: Optional[str] = None, as_dict: bool = False) ‑> Union[SQLConnector, Dict[str, Union[str, int]]]
-
Create a new SQLConnector from a URI string.
Parameters
uri
:str
- The URI connection string.
label
:Optional[str]
, defaultNone
- If provided, use this as the connector label. Otherwise use the determined database name.
as_dict
:bool
, defaultFalse
- If
True
, return a dictionary of the keyword arguments necessary to create a newSQLConnector
, otherwise create a new object.
Returns
A new SQLConnector object or a dictionary of attributes (if
as_dict
isTrue
).Expand source code
@classmethod def from_uri( cls, uri: str, label: Optional[str] = None, as_dict: bool = False, ) -> Union[ 'meerschaum.connectors.SQLConnector', Dict[str, Union[str, int]], ]: """ Create a new SQLConnector from a URI string. Parameters ---------- uri: str The URI connection string. label: Optional[str], default None If provided, use this as the connector label. Otherwise use the determined database name. as_dict: bool, default False If `True`, return a dictionary of the keyword arguments necessary to create a new `SQLConnector`, otherwise create a new object. Returns ------- A new SQLConnector object or a dictionary of attributes (if `as_dict` is `True`). """ params = cls.parse_uri(uri) params['uri'] = uri flavor = params.get('flavor', None) if not flavor or flavor not in cls.flavor_configs: error(f"Invalid flavor '{flavor}' detected from the provided URI.") if 'database' not in params: error("Unable to determine the database from the provided URI.") if flavor in ('sqlite', 'duckdb'): if params['database'] == ':memory:': params['label'] = label or f'memory_{flavor}' else: params['label'] = label or params['database'].split(os.path.sep)[-1].lower() else: params['label'] = label or ( ( (params['username'] + '@' if 'username' in params else '') + params.get('host', '') + ('/' if 'host' in params else '') + params.get('database', '') ).lower() ) return cls(**params) if not as_dict else params
def get_pipe_backtrack_minutes(pipe) ‑> Union[int, float]
-
Return the first available value for the following parameter keys:
- fetch, backtrack_minutes
- backtrack_minutes
Expand source code
@staticmethod def get_pipe_backtrack_minutes(pipe) -> Union[int, float]: """ Return the first available value for the following parameter keys: - fetch, backtrack_minutes - backtrack_minutes """ if pipe.parameters.get('fetch', {}).get('backtrack_minutes', None): btm = pipe.parameters['fetch']['backtrack_minutes'] elif pipe.parameters.get('backtrack_minutes', None): btm = pipe.parameters['backtrack_minutes'] else: btm = 0 return btm
def parse_uri(uri: str) ‑> Dict[str, Any]
-
Parse a URI string into a dictionary of parameters.
Parameters
uri
:str
- The database connection URI.
Returns
A dictionary of attributes.
Examples
>>> parse_uri('sqlite:////home/foo/bar.db') {'database': '/home/foo/bar.db', 'flavor': 'sqlite'} >>> parse_uri( ... 'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439' ... + '/master?driver=ODBC+Driver+17+for+SQL+Server' ... ) {'host': 'localhost', 'database': 'master', 'username': 'sa', 'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql', 'driver': 'ODBC Driver 17 for SQL Server'} >>>
Expand source code
@staticmethod def parse_uri(uri: str) -> Dict[str, Any]: """ Parse a URI string into a dictionary of parameters. Parameters ---------- uri: str The database connection URI. Returns ------- A dictionary of attributes. Examples -------- >>> parse_uri('sqlite:////home/foo/bar.db') {'database': '/home/foo/bar.db', 'flavor': 'sqlite'} >>> parse_uri( ... 'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439' ... + '/master?driver=ODBC+Driver+17+for+SQL+Server' ... ) {'host': 'localhost', 'database': 'master', 'username': 'sa', 'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql', 'driver': 'ODBC Driver 17 for SQL Server'} >>> """ from urllib.parse import parse_qs, urlparse sqlalchemy = attempt_import('sqlalchemy') parser = sqlalchemy.engine.url.make_url params = parser(uri).translate_connect_args() params['flavor'] = uri.split(':')[0].split('+')[0] if params['flavor'] == 'postgres': params['flavor'] = 'postgresql' if '?' in uri: parsed_uri = urlparse(uri) for key, value in parse_qs(parsed_uri.query).items(): params.update({key: value[0]}) return params
Instance variables
var DATABASE_URL : str
-
Return the URI connection string (alias for
SQLConnector.URI
).Expand source code
@property def DATABASE_URL(self) -> str: """ Return the URI connection string (alias for `SQLConnector.URI`. """ _ = self.engine return str(self._engine_str)
var IS_THREAD_SAFE : str
-
Return whether this connector may be multithreaded.
Expand source code
@property def IS_THREAD_SAFE(self) -> str: """ Return whether this connector may be multithreaded. """ if self.flavor == 'duckdb': return False if self.flavor == 'sqlite': return ':memory:' not in self.URI return True
var Session
-
Expand source code
@property def Session(self): if '_Session' not in self.__dict__: if self.engine is None: return None from meerschaum.utils.packages import attempt_import sqlalchemy_orm = attempt_import('sqlalchemy.orm') session_factory = sqlalchemy_orm.sessionmaker(self.engine) self._Session = sqlalchemy_orm.scoped_session(session_factory) return self._Session
var URI : str
-
Return the URI connection string.
Expand source code
@property def URI(self) -> str: """ Return the URI connection string. """ _ = self.engine return str(self._engine_str)
var db : Optional[databases.Database]
-
Expand source code
@property def db(self) -> Optional[databases.Database]: from meerschaum.utils.packages import attempt_import databases = attempt_import('databases', lazy=False, install=True) url = self.DATABASE_URL if 'mysql' in url: url = url.replace('+pymysql', '') if '_db' not in self.__dict__: try: self._db = databases.Database(url) except KeyError: ### Likely encountered an unsupported flavor. from meerschaum.utils.warnings import warn self._db = None return self._db
var engine
-
Expand source code
@property def engine(self): import os, threading ### build the sqlalchemy engine if '_engine' not in self.__dict__: self._engine, self._engine_str = self.create_engine(include_uri=True) same_process = os.getpid() == self._pid same_thread = threading.current_thread().ident == self._thread_ident ### handle child processes if not same_process: self._pid = os.getpid() self._thread = threading.current_thread() from meerschaum.utils.warnings import warn warn(f"Different PID detected. Disposing of connections...") self._engine.dispose() ### handle different threads if not same_thread: pass return self._engine
var metadata
-
Expand source code
@property def metadata(self): from meerschaum.utils.packages import attempt_import sqlalchemy = attempt_import('sqlalchemy') if '_metadata' not in self.__dict__: self._metadata = sqlalchemy.MetaData() return self._metadata
Methods
def clear_pipe(self, pipe: Pipe, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False, **kw) ‑> SuccessTuple
-
Delete a pipe's data within a bounded or unbounded interval without dropping the table.
Parameters
pipe
:Pipe
- The pipe to clear.
begin
:Optional[datetime.datetime]
, defaultNone
- Beginning datetime. Inclusive.
end
:Optional[datetime.datetime]
, defaultNone
- Ending datetime. Exclusive.
params
:Optional[Dict[str, Any]]
, defaultNone
- See
build_where()
.
Expand source code
def clear_pipe(
        self,
        pipe: meerschaum.Pipe,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Delete a pipe's data within a bounded or unbounded interval without dropping the table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to clear.

    begin: Optional[datetime.datetime], default None
        Beginning datetime. Inclusive.

    end: Optional[datetime.datetime], default None
        Ending datetime. Exclusive.

    params: Optional[Dict[str, Any]], default None
        See `meerschaum.utils.sql.build_where`.

    Returns
    -------
    A `SuccessTuple` of `(success, message)`.
    """
    if not pipe.exists(debug=debug):
        return True, f"{pipe} does not exist, so nothing was cleared."

    from meerschaum.utils.sql import sql_item_name, build_where, dateadd_str
    from meerschaum.utils.warnings import warn
    pipe_name = sql_item_name(pipe.target, self.flavor)

    ### Prefer the user-defined datetime column; otherwise fall back to a guess.
    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt_name = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt_name = sql_item_name(_dt, self.flavor)
        is_guess = False

    ### Datetime bounds only make sense when a datetime column is known;
    ### warn when relying on a guessed column, and drop bounds when none exists.
    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n Ignoring datetime bounds...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f" Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )

    ### Only filter on params whose keys are actual columns of the table.
    valid_params = {}
    if params is not None:
        existing_cols = pipe.get_columns_types(debug=debug)
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
    clear_query = (
        f"DELETE FROM {pipe_name}\nWHERE 1 = 1\n"
        + (' AND ' + build_where(valid_params, self, with_where=False) if valid_params else '')
        + (
            f' AND {dt_name} >= ' + dateadd_str(self.flavor, 'day', 0, begin)
            if begin is not None else ''
        ) + (
            f' AND {dt_name} < ' + dateadd_str(self.flavor, 'day', 0, end)
            if end is not None else ''
        )
    )
    success = self.exec(clear_query, silent=True, debug=debug) is not None
    msg = "Success" if success else f"Failed to clear {pipe}."
    return success, msg
def cli(self, debug: bool = False) ‑> Tuple[bool, str]
-
Launch an interactive CLI for the SQLConnector's flavor.
Expand source code
def cli(
        self,
        debug : bool = False
    ) -> SuccessTuple:
    """Launch an interactive CLI for the SQLConnector's flavor."""
    from meerschaum.utils.packages import venv_exec, attempt_import
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import error
    import sys, subprocess, os
    if self.flavor not in flavor_clis:
        return False, f"No CLI available for flavor '{self.flavor}'."

    ### DuckDB uses the `gadwall` shell directly rather than a generated script.
    if self.flavor == 'duckdb':
        gadwall = attempt_import('gadwall', debug = debug, lazy=False)
        gadwall_shell = gadwall.Gadwall(self.database)
        try:
            gadwall_shell.cmdloop()
        except KeyboardInterrupt:
            pass
        return True, "Success"
    elif self.flavor == 'mssql':
        ### Required by the .NET runtime underlying mssql-cli.
        if 'DOTNET_SYSTEM_GLOBALIZATION_INVARIANT' not in os.environ:
            os.environ['DOTNET_SYSTEM_GLOBALIZATION_INVARIANT'] = '1'

    cli_name = flavor_clis[self.flavor]

    ### Install the CLI package and any dependencies.
    cli, cli_main = attempt_import(cli_name, (cli_name + '.main'), lazy=False, debug=debug)
    if cli_name in cli_deps:
        for dep in cli_deps[cli_name]:
            locals()[dep] = attempt_import(dep, lazy=False, warn=False, debug=debug)

    ### NOTE: The `DATABASE_URL` property must be initialized first in case the database is not
    ### yet defined (e.g. 'sql:local').
    cli_arg_str = self.DATABASE_URL
    if self.flavor in ('sqlite', 'duckdb'):
        cli_arg_str = str(self.database)

    ### Define the script to execute to launch the CLI.
    ### The `mssqlcli` script is manually written to avoid telemetry
    ### and because `main.cli()` is not defined.
    launch_cli = f"cli_main.cli(['{cli_arg_str}'])"
    if self.flavor == 'mssql':
        launch_cli = (
            "mssqlclioptionsparser, mssql_cli = attempt_import("
            + "'mssqlcli.mssqlclioptionsparser', 'mssqlcli.mssql_cli', lazy=False)\n"
            + "ms_parser = mssqlclioptionsparser.create_parser()\n"
            + f"ms_options = ms_parser.parse_args(['--server', 'tcp:{self.host},{self.port}', "
            + f"'--database', '{self.database}', "
            + f"'--username', '{self.username}', '--password', '{self.password}'])\n"
            + "ms_object = mssql_cli.MssqlCli(ms_options)\n"
            + "try:\n"
            + "    ms_object.connect_to_database()\n"
            + "    ms_object.run()\n"
            + "finally:\n"
            + "    ms_object.shutdown()"
        )
    elif self.flavor == 'duckdb':
        ### NOTE(review): this branch looks unreachable — the duckdb case
        ### returns earlier in this function; confirm before removing.
        launch_cli = ()

    try:
        if debug:
            dprint(f'Launching CLI:\n{launch_cli}')
        exec(launch_cli)
        success, msg = True, 'Success'
    except Exception as e:
        success, msg = False, str(e)

    return success, msg
def create_engine(self, include_uri: bool = False, debug: bool = False, **kw) ‑> sqlalchemy.engine.Engine
-
Create a sqlalchemy engine by building the engine string.
Expand source code
def create_engine(
        self,
        include_uri: bool = False,
        debug: bool = False,
        **kw
    ) -> 'sqlalchemy.engine.Engine':
    """Create a sqlalchemy engine by building the engine string.

    Parameters
    ----------
    include_uri: bool, default False
        If `True`, return a tuple of `(engine, engine_str)` instead of just the engine.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A sqlalchemy engine (or `None` on failure), optionally paired with the
    connection string when `include_uri` is `True`.
    """
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import error, warn
    sqlalchemy = attempt_import('sqlalchemy')
    import urllib
    import copy
    ### Install and patch required drivers.
    if self.flavor in install_flavor_drivers:
        attempt_import(*install_flavor_drivers[self.flavor], debug=debug, lazy=False, warn=False)
    if self.flavor in require_patching_flavors:
        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
        import pathlib
        for install_name, import_name in require_patching_flavors[self.flavor]:
            pkg = attempt_import(
                import_name,
                debug = debug,
                lazy = False,
                warn = False
            )
            _monkey_patch_get_distribution(
                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
            )

    ### supplement missing values with defaults (e.g. port number)
    for a, value in flavor_configs[self.flavor]['defaults'].items():
        if a not in self.__dict__:
            self.__dict__[a] = value

    ### Verify that everything is in order.
    if self.flavor not in flavor_configs:
        error(f"Cannot create a connector with the flavor '{self.flavor}'.")

    _engine = flavor_configs[self.flavor].get('engine', None)
    _username = self.__dict__.get('username', None)
    _password = self.__dict__.get('password', None)
    _host = self.__dict__.get('host', None)
    _port = self.__dict__.get('port', None)
    _database = self.__dict__.get('database', None)
    _driver = self.__dict__.get('driver', None)
    if _driver is not None:
        ### URL-encode the driver if a space is detected.
        ### Not a bullet-proof solution, but this should work for most cases.
        _driver = urllib.parse.quote_plus(_driver) if ' ' in _driver else _driver
    _uri = self.__dict__.get('uri', None)

    ### Handle registering specific dialects (due to installing in virtual environments).
    if self.flavor in flavor_dialects:
        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])

    ### self._sys_config was deepcopied and can be updated safely
    if self.flavor in ("sqlite", "duckdb"):
        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
        ### File-based flavors must allow cross-thread use of a single connection.
        if 'create_engine' not in self._sys_config:
            self._sys_config['create_engine'] = {}
        if 'connect_args' not in self._sys_config['create_engine']:
            self._sys_config['create_engine']['connect_args'] = {}
        self._sys_config['create_engine']['connect_args'].update({"check_same_thread" : False})
    else:
        engine_str = (
            _engine + "://" + (_username if _username is not None else '')
            + ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '')
            + "@" + _host
            + ((":" + str(_port)) if _port is not None else '')
            + (("/" + _database) if _database is not None else '')
            + (("?driver=" + _driver) if _driver is not None else '')
        ) if not _uri else _uri
    if debug:
        ### Mask the password when printing the connection string.
        dprint(
            (
                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
                if _password is not None else engine_str
            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
        )

    _kw_copy = copy.deepcopy(kw)

    ### NOTE: Order of inheritance:
    ###   1. Defaults
    ###   2. System configuration
    ###   3. Connector configuration
    ###   4. Keyword arguments
    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
    def _apply_create_engine_args(update):
        ### Merge `update` into the engine args, skipping keys the flavor
        ### explicitly omits ('ALL' disables merging entirely).
        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
            _create_engine_args.update(
                { k: v for k, v in update.items()
                    if 'omit_create_engine' not in flavor_configs[self.flavor]
                        or k not in flavor_configs[self.flavor].get('omit_create_engine')
                }
            )
    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
    _apply_create_engine_args(_kw_copy)

    try:
        engine = sqlalchemy.create_engine(
            engine_str,
            ### Dynamically resolve the configured pool class: the string
            ### (e.g. 'sqlalchemy.pool.QueuePool') is split into a module path
            ### and a class name, the module is imported, and the class is
            ### looked up by attribute.
            poolclass = getattr(
                attempt_import(
                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
                ),
                self._sys_config['poolclass'].split('.')[-1]
            ),
            echo = debug,
            **_create_engine_args
        )
    except Exception as e:
        warn(e)
        warn(f"Failed to create connector '{self}'.", stack=False)
        engine = None

    if include_uri:
        return engine, engine_str
    return engine
def create_indices(self, pipe: Pipe, indices: Optional[List[str]] = None, debug: bool = False) ‑> bool
-
Create a pipe's indices.
Expand source code
def create_indices(
    self,
    pipe: meerschaum.Pipe,
    indices: Optional[List[str]] = None,
    debug: bool = False
) -> bool:
    """
    Create a pipe's indices.

    Returns `True` only if every requested index was created successfully.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    if debug:
        dprint(f"Creating indices for {pipe}...")

    ### Without column metadata there is nothing to index.
    if not pipe.columns:
        warn(f"Unable to create indices for {pipe} without columns.", stack=False)
        return False

    success = True
    for col, col_queries in self.get_create_index_queries(pipe, debug=debug).items():
        ### When `indices` is given, only build the requested subset.
        if indices is not None and col not in indices:
            continue
        if not all(self.exec_queries(col_queries, debug=debug, silent=True)):
            success = False
            if debug:
                dprint(f"Failed to create index on column: {col}")
    return success
def delete_pipe(self, pipe: Pipe, debug: bool = False) ‑> SuccessTuple
-
Delete a Pipe's registration and drop its table.
Expand source code
def delete_pipe(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False,
) -> SuccessTuple:
    """
    Delete a Pipe's registration and drop its table.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.sql import sql_item_name
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    ### Drop the pipe's table before removing the registration row.
    drop_success, drop_msg = pipe.drop(debug=debug)
    if not drop_success:
        return drop_success, drop_msg

    if not pipe.id:
        return False, f"{pipe} is not registered."

    ### Make sure the pipes table exists before issuing the DELETE.
    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    delete_query = sqlalchemy.delete(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
    if not self.exec(delete_query, debug=debug):
        return False, f"Failed to delete registration for {pipe}."
    return True, "Success"
def delete_plugin(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Delete a plugin from the plugins table.
Expand source code
def delete_plugin(
    self,
    plugin: 'meerschaum.core.Plugin',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Delete a plugin from the plugins table.

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin whose registration row should be removed.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of (success flag, message).
    Deleting a plugin which was never registered counts as a success (no-op).
    """
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    plugin_id = self.get_plugin_id(plugin, debug=debug)
    if plugin_id is None:
        ### Nothing to delete; treat as a no-op success.
        return True, f"Plugin '{plugin}' was not registered."

    ### NOTE: The previous version built an unused `bind_variables` dict;
    ### the DELETE below binds `plugin_id` directly, so it has been removed.
    query = sqlalchemy.delete(plugins_tbl).where(plugins_tbl.c.plugin_id == plugin_id)
    result = self.exec(query, debug=debug)
    ### `self.exec()` returns None on failure.
    if result is None:
        return False, f"Failed to delete plugin '{plugin}'."
    return True, f"Successfully deleted plugin '{plugin}'."
def delete_user(self, user: meerschaum.core.User, debug: bool = False) ‑> SuccessTuple
-
Delete a user's record from the users table.
Expand source code
def delete_user(
    self,
    user: meerschaum.core.User,
    debug: bool = False
) -> SuccessTuple:
    """Delete a user's record from the users table."""
    ### Ensure the users and plugins tables exist.
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    plugins = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    ### Prefer the cached user_id; fall back to a database lookup.
    if user.user_id is not None:
        user_id = user.user_id
    else:
        user_id = self.get_user_id(user, debug=debug)

    if user_id is None:
        return False, f"User '{user.username}' is not registered and cannot be deleted."

    ### Remove the user's own row first...
    users_delete = sqlalchemy.delete(users_tbl).where(users_tbl.c.user_id == user_id)
    if self.exec(users_delete, debug=debug) is None:
        return False, f"Failed to delete user '{user}'."

    ### ...then any plugins registered under that user.
    plugins_delete = sqlalchemy.delete(plugins).where(plugins.c.user_id == user_id)
    if self.exec(plugins_delete, debug=debug) is None:
        return False, f"Failed to delete plugins of user '{user}'."

    return True, f"Successfully deleted user '{user}'"
def drop_indices(self, pipe: Pipe, indices: Optional[List[str]] = None, debug: bool = False) ‑> bool
-
Drop a pipe's indices.
Expand source code
def drop_indices(
    self,
    pipe: meerschaum.Pipe,
    indices: Optional[List[str]] = None,
    debug: bool = False
) -> bool:
    """
    Drop a pipe's indices.

    Returns `True` only if every requested index was dropped successfully.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    if debug:
        dprint(f"Dropping indices for {pipe}...")

    ### Without column metadata no indices could have been created.
    if not pipe.columns:
        warn(f"Unable to drop indices for {pipe} without columns.", stack=False)
        return False

    success = True
    for col, col_queries in self.get_drop_index_queries(pipe, debug=debug).items():
        ### When `indices` is given, only drop the requested subset.
        if indices is not None and col not in indices:
            continue
        if not all(self.exec_queries(col_queries, debug=debug, silent=True)):
            success = False
            if debug:
                dprint(f"Failed to drop index on column: {col}")
    return success
def drop_pipe(self, pipe: Pipe, debug: bool = False, **kw) ‑> SuccessTuple
-
Expand source code
def drop_pipe(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Drop a pipe's tables but maintain its registration.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to drop.
    """
    from meerschaum.utils.sql import table_exists, sql_item_name

    target = pipe.target
    temp_target = '_' + pipe.target
    target_name = sql_item_name(target, self.flavor)
    temp_name = sql_item_name(temp_target, self.flavor)

    success = True
    ### Drop the main target table if it exists.
    if table_exists(target, self, debug=debug):
        drop_result = self.exec(f"DROP TABLE {target_name}", silent=True, debug=debug)
        success = drop_result is not None

    ### Also drop the staging table ('_' + target) if present.
    if table_exists(temp_target, self, debug=debug):
        temp_result = self.exec(f"DROP TABLE {temp_name}", silent=True, debug=debug)
        success = success and (temp_result is not None)

    msg = "Success" if success else f"Failed to drop {pipe}."
    return success, msg
def edit_pipe(self, pipe: Pipe = None, patch: bool = False, debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Persist a Pipe's parameters to its database.
Parameters
pipe
:Pipe
, defaultNone
- The pipe to be edited.
patch
:bool
, defaultFalse
- If patch is
True
, update the existing parameters by cascading. Otherwise overwrite the parameters (default). debug
:bool
, defaultFalse
- Verbosity toggle.
Expand source code
def edit_pipe(
    self,
    pipe : meerschaum.Pipe = None,
    patch: bool = False,
    debug: bool = False,
    **kw : Any
) -> SuccessTuple:
    """
    Persist a Pipe's parameters to its database.

    Parameters
    ----------
    pipe: meerschaum.Pipe, default None
        The pipe to be edited.
    patch: bool, default False
        If patch is `True`, update the existing parameters by cascading.
        Otherwise overwrite the parameters (default).
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of (success flag, message).
    """

    if pipe.id is None:
        return False, f"{pipe} is not registered and cannot be edited."

    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    if not patch:
        ### Overwrite mode: take the in-memory parameters verbatim.
        parameters = pipe.__dict__.get('_attributes', {}).get('parameters', {})
    else:
        ### Patch mode: fetch the registered parameters fresh from the instance
        ### and cascade the in-memory parameters on top of them.
        from meerschaum import Pipe
        from meerschaum.config._patch import apply_patch_to_config
        original_parameters = Pipe(
            pipe.connector_keys, pipe.metric_key, pipe.location_key,
            mrsm_instance=pipe.instance_keys
        ).parameters
        parameters = apply_patch_to_config(
            original_parameters,
            pipe.parameters
        )

    ### ensure pipes table exists
    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    import json
    sqlalchemy = attempt_import('sqlalchemy')

    ### Flavors with native JSON columns take the dict directly;
    ### everything else stores a serialized JSON string.
    values = {
        'parameters': (
            json.dumps(parameters)
            if self.flavor not in json_flavors
            else parameters
        ),
    }
    q = sqlalchemy.update(pipes_tbl).values(**values).where(
        pipes_tbl.c.pipe_id == pipe.id
    )

    ### `self.exec()` returns None on failure.
    result = self.exec(q, debug=debug)
    message = (
        f"Successfully edited {pipe}."
        if result is not None else f"Failed to edit {pipe}."
    )
    return (result is not None), message
def edit_user(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Update an existing user's metadata.
Expand source code
def edit_user(
    self,
    user: meerschaum.core.User,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Update an existing user's metadata.

    Only non-empty attributes (password, email, attributes, type) are
    written; empty strings / empty dicts are treated as "leave unchanged".
    """
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.sql import json_flavors
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    ### Prefer the cached user_id; fall back to a database lookup.
    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    if user_id is None:
        return False, (
            f"User '{user.username}' does not exist. " +
            f"Register user '{user.username}' before editing."
        )
    user.user_id = user_id

    import json
    valid_tuple = valid_username(user.username)
    if not valid_tuple[0]:
        return valid_tuple

    ### Build the UPDATE payload, skipping unset ('' / empty) attributes.
    bind_variables = {
        'user_id' : user_id,
        'username' : user.username,
    }
    if user.password != '':
        bind_variables['password_hash'] = user.password_hash
    if user.email != '':
        bind_variables['email'] = user.email
    if user.attributes is not None and user.attributes != {}:
        ### DuckDB stores attributes as a JSON string; other flavors take the dict.
        bind_variables['attributes'] = (
            json.dumps(user.attributes) if self.flavor in ('duckdb',)
            else user.attributes
        )
    if user.type != '':
        bind_variables['user_type'] = user.type

    query = (
        sqlalchemy
        .update(users_tbl)
        .values(**bind_variables)
        .where(users_tbl.c.user_id == user_id)
    )

    ### `self.exec()` returns None on failure.
    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to edit user '{user}'."
    return True, f"Successfully edited user '{user}'."
def exec(self, query: str, *args: Any, silent: bool = False, debug: bool = False, commit: Optional[bool] = None, close: Optional[bool] = None, with_connection: bool = False, **kw: Any) ‑> Union[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.cursor.LegacyCursorResult, Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection], Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection], None]
-
Execute SQL code and return the
sqlalchemy
result, e.g. when calling stored procedures.If inserting data, please use bind variables to avoid SQL injection!
Parameters
query
:Union[str, List[str], Tuple[str]]
- The query to execute.
If
query
is a list or tuple, callself.exec_queries()
instead. args
:Any
- Arguments passed to
sqlalchemy.engine.execute
. silent
:bool
, defaultFalse
- If
True
, suppress warnings. commit
:Optional[bool]
, defaultNone
- If
True
, commit the changes after execution. Causes issues with flavors like'mssql'
. This does not apply ifquery
is a list of strings. close
:Optional[bool]
, defaultNone
- If
True
, close the connection after execution. Causes issues with flavors like'mssql'
. This does not apply ifquery
is a list of strings. with_connection
:bool
, defaultFalse
- If
True
, return a tuple including the connection object. This does not apply ifquery
is a list of strings.
Returns
The
sqlalchemy
result object, or a tuple with the connection ifwith_connection
is provided.Expand source code
def exec(
    self,
    query: str,
    *args: Any,
    silent: bool = False,
    debug: bool = False,
    commit: Optional[bool] = None,
    close: Optional[bool] = None,
    with_connection: bool = False,
    **kw: Any
) -> Union[
        sqlalchemy.engine.result.resultProxy,
        sqlalchemy.engine.cursor.LegacyCursorResult,
        Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
        Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
        None
]:
    """
    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.

    If inserting data, please use bind variables to avoid SQL injection!

    Parameters
    ----------
    query: Union[str, List[str], Tuple[str]]
        The query to execute.
        If `query` is a list or tuple, call `self.exec_queries()` instead.

    args: Any
        Arguments passed to `sqlalchemy.engine.execute`.

    silent: bool, default False
        If `True`, suppress warnings.

    commit: Optional[bool], default None
        If `True`, commit the changes after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    close: Optional[bool], default None
        If `True`, close the connection after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    with_connection: bool, default False
        If `True`, return a tuple including the connection object.
        This does not apply if `query` is a list of strings.

    Returns
    -------
    The `sqlalchemy` result object, or a tuple with the connection
    if `with_connection` is provided.

    """
    ### Lists/tuples of queries are delegated to the transactional helper.
    if isinstance(query, (list, tuple)):
        return self.exec_queries(
            list(query),
            *args,
            silent = silent,
            debug = debug,
            **kw
        )

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import("sqlalchemy")
    if debug:
        dprint("Executing query:\n" + f"{query}")

    ### Flavor-dependent defaults: MSSQL is sensitive to explicit
    ### close/commit, and SELECTs on MSSQL must not be committed.
    _close = close if close is not None else (self.flavor != 'mssql')
    _commit = commit if commit is not None else (
        (self.flavor != 'mssql' or 'select' not in str(query).lower())
    )

    ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+).
    if not hasattr(query, 'compile'):
        query = sqlalchemy.text(query)

    connection = self.engine.connect()
    ### Only open an explicit transaction when we intend to commit.
    transaction = connection.begin() if _commit else None
    try:
        result = connection.execute(query, *args, **kw)
        if _commit:
            transaction.commit()
    except Exception as e:
        if debug:
            dprint(f"Failed to execute query:\n\n{query}\n\n{e}")
        if not silent:
            warn(str(e))
        ### Failure is signaled to callers by a None result, not an exception.
        result = None
        if _commit:
            transaction.rollback()
    finally:
        if _close:
            connection.close()

    ### NOTE(review): when `with_connection` is True and `_close` defaulted to
    ### True, the returned connection has already been closed — callers likely
    ### should pass `close=False` alongside `with_connection=True`; confirm.
    if with_connection:
        return result, connection

    return result
def exec_queries(self, queries: List[str], break_on_error: bool = False, silent: bool = False, debug: bool = False) ‑> List[sqlalchemy.engine.cursor.LegacyCursorResult]
-
Execute a list of queries in a single transaction.
Parameters
queries
:List[str]
- The queries in the transaction to be executed.
break_on_error
:bool
, defaultFalse
- If
True
, stop executing when a query fails. silent
:bool
, defaultFalse
- If
True
, suppress warnings.
Returns
A list of SQLAlchemy results.
Expand source code
def exec_queries(
    self,
    queries: List[str],
    break_on_error: bool = False,
    silent: bool = False,
    debug: bool = False,
) -> List[sqlalchemy.engine.cursor.LegacyCursorResult]:
    """
    Execute a list of queries in a single transaction.

    Parameters
    ----------
    queries: List[str]
        The queries in the transaction to be executed.

    break_on_error: bool, default False
        If `True`, stop executing when a query fails.

    silent: bool, default False
        If `True`, suppress warnings.

    Returns
    -------
    A list of SQLAlchemy results.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    results = []
    ### `engine.begin()` wraps the whole loop in one transaction.
    with self.engine.begin() as connection:
        for query in queries:
            if debug:
                dprint(query)

            ### Plain strings must be wrapped for SQLAlchemy 2.0+.
            if isinstance(query, str):
                query = sqlalchemy.text(query)

            try:
                outcome = connection.execute(query)
            except Exception as e:
                msg = (f"Encountered error while executing:\n{e}")
                if not silent:
                    warn(msg)
                elif debug:
                    dprint(msg)
                outcome = None

            results.append(outcome)
            ### A failed query appends None; optionally abort the rest.
            if outcome is None and break_on_error:
                break

    return results
def execute(self, *args: Any, **kw: Any) ‑> Optional[sqlalchemy.engine.result.resultProxy]
-
An alias for
meerschaum.connectors.sql.SQLConnector.exec
.Expand source code
def execute(
    self,
    *args : Any,
    **kw : Any
) -> Optional[sqlalchemy.engine.result.resultProxy]:
    """
    Convenience alias which delegates directly to
    `meerschaum.connectors.sql.SQLConnector.exec`.
    """
    ### Forward everything untouched.
    return self.exec(*args, **kw)
def fetch(self, pipe: Pipe, begin: Union[datetime.datetime, str, None] = '', end: Union[datetime.datetime, str, None] = None, chunk_hook: Optional[Callable[[pd.DataFrame], Any]] = None, chunksize: Optional[int] = -1, workers: Optional[int] = None, debug: bool = False, **kw: Any) ‑> Union['pd.DataFrame', List[Any], None]
-
Execute the SQL definition and return a Pandas DataFrame.
Parameters
pipe
:Pipe
-
The pipe object which contains the
fetch
metadata.- pipe.columns['datetime']: str
- Name of the datetime column for the remote table.
- pipe.parameters['fetch']: Dict[str, Any]
- Parameters necessary to execute a query.
- pipe.parameters['fetch']['definition']: str
- Raw SQL query to execute to generate the pandas DataFrame.
- pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
- How many minutes before
begin
to search for data (optional).
- How many minutes before
- pipe.columns['datetime']: str
begin
:Union[datetime.datetime, str, None]
, defaultNone
- Most recent datetime to search for data.
If
backtrack_minutes
is provided, subtractbacktrack_minutes
. end
:Union[datetime.datetime, str, None]
, defaultNone
- The latest datetime to search for data.
If
end
isNone
, do not bound chunk_hook
:Callable[[pd.DataFrame], Any]
, defaultNone
- A function to pass to
read()
that accepts a Pandas DataFrame. chunksize
:Optional[int]
, default-1
- How many rows to load into memory at once (when
chunk_hook
is provided). Otherwise the entire result set is loaded into memory. workers
:Optional[int]
, defaultNone
- How many threads to use when consuming the generator (when `chunk_hook` is provided). Defaults to the number of cores.
debug
:bool
, defaultFalse
- Verbosity toggle.
Returns
A pandas DataFrame or
None
. Ifchunk_hook
is not None, return a list of the hook function's results.Expand source code
def fetch(
    self,
    pipe: meerschaum.Pipe,
    begin: Union[datetime.datetime, str, None] = '',
    end: Union[datetime.datetime, str, None] = None,
    chunk_hook: Optional[Callable[[pd.DataFrame], Any]] = None,
    chunksize: Optional[int] = -1,
    workers: Optional[int] = None,
    debug: bool = False,
    **kw: Any
) -> Union['pd.DataFrame', List[Any], None]:
    """Execute the SQL definition and return a Pandas DataFrame.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe object which contains the `fetch` metadata.

        - pipe.columns['datetime']: str
            - Name of the datetime column for the remote table.
        - pipe.parameters['fetch']: Dict[str, Any]
            - Parameters necessary to execute a query.
        - pipe.parameters['fetch']['definition']: str
            - Raw SQL query to execute to generate the pandas DataFrame.
        - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
            - How many minutes before `begin` to search for data (*optional*).

    begin: Union[datetime.datetime, str, None], default None
        Most recent datetime to search for data.
        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.

    end: Union[datetime.datetime, str, None], default None
        The latest datetime to search for data.
        If `end` is `None`, do not bound the search.

    chunk_hook: Callable[[pd.DataFrame], Any], default None
        A function to pass to `SQLConnector.read()` that accepts a Pandas DataFrame.

    chunksize: Optional[int], default -1
        How many rows to load into memory at once (when `chunk_hook` is provided).
        Otherwise the entire result set is loaded into memory.

    workers: Optional[int], default None
        How many threads to use when consuming the generator
        (when `chunk_hook` is provided). Defaults to the number of cores.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas DataFrame or `None`.
    If `chunk_hook` is not None, return a list of the hook function's results.

    """
    ### Build the bounded SQL query from the pipe's fetch definition.
    meta_def = self.get_pipe_metadef(
        pipe,
        begin = begin,
        end = end,
        debug = debug,
        **kw
    )
    as_hook_results = chunk_hook is not None
    chunks = self.read(
        meta_def,
        chunk_hook = chunk_hook,
        as_hook_results = as_hook_results,
        chunksize = chunksize,
        workers = workers,
        debug = debug,
    )
    ### if sqlite, parse for datetimes
    ### (SQLite has no native datetime type, so datetime-like columns
    ### come back as strings and must be parsed per chunk).
    if not as_hook_results and self.flavor == 'sqlite':
        from meerschaum.utils.misc import parse_df_datetimes
        ### Skip columns whose declared dtype is not datetime-like.
        ignore_cols = [
            col
            for col, dtype in pipe.dtypes.items()
            if 'datetime' not in str(dtype)
        ]
        ### NOTE: returns a lazy generator of parsed chunks, not a DataFrame.
        return (
            parse_df_datetimes(
                chunk,
                ignore_cols = ignore_cols,
                debug = debug,
            )
            for chunk in chunks
        )
    return chunks
def fetch_pipes_keys(self, connector_keys: Optional[List[str]] = None, metric_keys: Optional[List[str]] = None, location_keys: Optional[List[str]] = None, tags: Optional[List[str]] = None, params: Optional[Dict[str, Any]] = None, debug: bool = False) ‑> Optional[List[Tuple[str, str, Optional[str]]]]
-
Return a list of tuples corresponding to the parameters provided.
Parameters
connector_keys
:Optional[List[str]]
, defaultNone
- List of connector_keys to search by.
metric_keys
:Optional[List[str]]
, defaultNone
- List of metric_keys to search by.
location_keys
:Optional[List[str]]
, defaultNone
- List of location_keys to search by.
params
:Optional[Dict[str, Any]]
, defaultNone
- Dictionary of additional parameters to search by.
E.g.
--params pipe_id:1
debug
:bool
, defaultFalse
- Verbosity toggle.
Expand source code
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> Optional[List[Tuple[str, str, Optional[str]]]]:
    """
    Return a list of tuples corresponding to the parameters provided.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        List of connector_keys to search by.

    metric_keys: Optional[List[str]], default None
        List of metric_keys to search by.

    location_keys: Optional[List[str]], default None
        List of location_keys to search by.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        E.g. `--params pipe_id:1`

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of (connector_keys, metric_key, location_key) tuples.
    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.misc import separate_negation_values
    from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists
    from meerschaum.config.static import STATIC_CONFIG
    sqlalchemy = attempt_import('sqlalchemy')
    import json
    from copy import deepcopy

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    else:
        ### Normalize string spellings of "no location" to a real None.
        location_keys = [
            (lk if lk not in ('[None]', 'None', 'null') else None)
            for lk in location_keys
        ]
    if tags is None:
        tags = []

    if params is None:
        params = {}

    ### Add three primary keys to params dictionary
    ### (separated for convenience of arguments).
    cols = {
        'connector_keys': connector_keys,
        'metric_key': metric_keys,
        'location_key': location_keys,
    }

    ### Make deep copy so we don't mutate this somewhere else.
    parameters = deepcopy(params)
    for col, vals in cols.items():
        ### Allow for IS NULL to be declared as a single-item list ([None]).
        if vals == [None]:
            vals = None
        if vals not in [[], ['*']]:
            parameters[col] = vals
    cols = {k: v for k, v in cols.items() if v != [None]}

    if not table_exists('pipes', self, debug=debug):
        return []

    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes']

    ### Dict-valued params are matched against their JSON serialization.
    _params = {}
    for k, v in parameters.items():
        _v = json.dumps(v) if isinstance(v, dict) else v
        _params[k] = _v

    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']

    ### Parse regular params.
    ### If a param begins with '_', negate it instead.
    ### NOTE(review): the negation branch compares the column to `key`
    ### (the column *name*), not to the value with the prefix stripped —
    ### this looks suspicious; confirm the intended negation semantics.
    _where = [
        (
            (pipes_tbl.c[key] == val)
            if not str(val).startswith(negation_prefix)
            else (pipes_tbl.c[key] != key)
        )
        for key, val in _params.items()
        if not isinstance(val, (list, tuple)) and key in pipes_tbl.c
    ]
    ### Only fetch the parameters column when tags must be verified later.
    select_cols = (
        [pipes_tbl.c.connector_keys, pipes_tbl.c.metric_key, pipes_tbl.c.location_key]
        + ([pipes_tbl.c.parameters] if tags else [])
    )

    q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where))

    ### Parse IN params and add OR IS NULL if None in list.
    for c, vals in cols.items():
        if not isinstance(vals, (list, tuple)) or not vals or not c in pipes_tbl.c:
            continue
        _in_vals, _ex_vals = separate_negation_values(vals)
        ### Include params (positive)
        q = (
            q.where(pipes_tbl.c[c].in_(_in_vals))
            if None not in _in_vals
            else q.where(sqlalchemy.or_(pipes_tbl.c[c].in_(_in_vals), pipes_tbl.c[c].is_(None)))
        ) if _in_vals else q
        ### Exclude params (negative)
        q = q.where(pipes_tbl.c[c].not_in(_ex_vals)) if _ex_vals else q

    ### Finally, parse tags.
    ### Tags are matched by a LIKE over the serialized parameters JSON;
    ### exact membership is re-verified in Python below.
    _in_tags, _ex_tags = separate_negation_values(tags)
    ors = []
    for nt in _in_tags:
        ors.append(
            sqlalchemy.cast(
                pipes_tbl.c['parameters'],
                sqlalchemy.String,
            ).like(f'%"tags":%"{nt}"%')
        )
    q = q.where(sqlalchemy.and_(sqlalchemy.or_(*ors).self_group())) if ors else q
    ors = []
    for xt in _ex_tags:
        ors.append(
            sqlalchemy.cast(
                pipes_tbl.c['parameters'],
                sqlalchemy.String,
            ).not_like(f'%"tags":%"{xt}"%')
        )
    q = q.where(sqlalchemy.and_(sqlalchemy.or_(*ors).self_group())) if ors else q

    ### Order deterministically; NULL locations sort first where supported.
    loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key'])
    if self.flavor not in OMIT_NULLSFIRST_FLAVORS:
        loc_asc = sqlalchemy.nullsfirst(loc_asc)
    q = q.order_by(
        sqlalchemy.asc(pipes_tbl.c['connector_keys']),
        sqlalchemy.asc(pipes_tbl.c['metric_key']),
        loc_asc,
    )

    ### execute the query and return a list of tuples
    if debug:
        dprint(q.compile(compile_kwargs={'literal_binds': True}))
    try:
        rows = self.execute(q).fetchall()
    except Exception as e:
        error(str(e))

    _keys = [(row[0], row[1], row[2]) for row in rows]
    if not tags:
        return _keys
    ### Make 100% sure that the tags are correct.
    keys = []
    for row in rows:
        ktup = (row[0], row[1], row[2])
        _actual_tags = (
            json.loads(row[3]) if isinstance(row[3], str)
            else row[3]
        ).get('tags', [])
        for nt in _in_tags:
            if nt in _actual_tags:
                keys.append(ktup)
        ### NOTE(review): the else-branch appends once per non-matching
        ### exclude tag, which can produce duplicate keys when multiple
        ### exclude tags are given — verify this is intended.
        for xt in _ex_tags:
            if xt in _actual_tags:
                keys.remove(ktup)
            else:
                keys.append(ktup)
    return keys
def get_add_columns_queries(self, pipe: mrsm.Pipe, df: Union[pd.DataFrame, Dict[str, str]], debug: bool = False) ‑> List[str]
-
Add new null columns of the correct type to a table from a dataframe.
Parameters
pipe
:mrsm.Pipe
- The pipe to be altered.
df
:Union[pd.DataFrame, Dict[str, str]]
- The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.
Returns
A list of the
ALTER TABLE
SQL query or queries to be executed on the provided connector.Expand source code
def get_add_columns_queries(
    self,
    pipe: mrsm.Pipe,
    df: Union[pd.DataFrame, Dict[str, str]],
    debug: bool = False,
) -> List[str]:
    """
    Add new null columns of the correct type to a table from a dataframe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.
    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which contains new columns.
        If a dictionary is provided, assume it maps columns to Pandas data types.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    """
    if not pipe.exists(debug=debug):
        return []
    import copy
    from meerschaum.utils.sql import (
        get_pd_type, get_db_type, sql_item_name,
        SINGLE_ALTER_TABLE_FLAVORS,
    )
    from meerschaum.utils.misc import flatten_list
    table_obj = self.get_pipe_table(pipe, debug=debug)
    ### Normalize input to a mapping of column -> Pandas dtype string.
    df_cols_types = (
        {
            col: str(typ)
            for col, typ in df.dtypes.items()
        }
        if not isinstance(df, dict)
        else copy.deepcopy(df)
    )
    ### For DataFrames, refine ambiguous 'object' dtypes by inspecting
    ### the first row's value (dict/list -> json, str -> str).
    if len(df) > 0 and not isinstance(df, dict):
        for col, typ in list(df_cols_types.items()):
            if typ != 'object':
                continue
            val = df.iloc[0][col]
            if isinstance(val, (dict, list)):
                df_cols_types[col] = 'json'
            elif isinstance(val, str):
                df_cols_types[col] = 'str'
    db_cols_types = {col: get_pd_type(str(typ.type)) for col, typ in table_obj.columns.items()}
    ### Columns present in the dataframe but absent from the table.
    new_cols = set(df_cols_types) - set(db_cols_types)
    if not new_cols:
        return []

    new_cols_types = {
        col: get_db_type(
            df_cols_types[col],
            self.flavor
        ) for col in new_cols
    }

    alter_table_query = "ALTER TABLE " + sql_item_name(pipe.target, self.flavor)
    queries = []
    for col, typ in new_cols_types.items():
        add_col_query = "\nADD " + sql_item_name(col, self.flavor) + " " + typ + ","

        if self.flavor in SINGLE_ALTER_TABLE_FLAVORS:
            ### One ALTER TABLE per column (trailing comma stripped).
            queries.append(alter_table_query + add_col_query[:-1])
        else:
            alter_table_query += add_col_query

    ### For most flavors, only one query is required.
    ### This covers SQLite which requires one query per column.
    if not queries:
        queries.append(alter_table_query[:-1])

    if self.flavor != 'duckdb':
        return queries

    ### NOTE: For DuckDB, we must drop and rebuild the indices.
    drop_index_queries = list(flatten_list(
        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
    ))
    create_index_queries = list(flatten_list(
        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
    ))

    return drop_index_queries + queries + create_index_queries
def get_alter_columns_queries(self, pipe: mrsm.Pipe, df: Union[pd.DataFrame, Dict[str, str]], debug: bool = False) ‑> List[str]
-
If we encounter a column of a different type, set the entire column to text.
Parameters
pipe
:mrsm.Pipe
- The pipe to be altered.
df
:Union[pd.DataFrame, Dict[str, str]]
- The pandas DataFrame which may contain altered columns. If a dict is provided, assume it maps columns to Pandas data types.
Returns
A list of the
ALTER TABLE
SQL query or queries to be executed on the provided connector.Expand source code
def get_alter_columns_queries(
    self,
    pipe: mrsm.Pipe,
    df: Union[pd.DataFrame, Dict[str, str]],
    debug: bool = False,
) -> List[str]:
    """
    If we encounter a column of a different type, set the entire column to text.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.
    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which may contain altered columns.
        If a dict is provided, assume it maps columns to Pandas data types.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    """
    if not pipe.exists(debug=debug):
        return []
    from meerschaum.utils.sql import get_pd_type, get_db_type, sql_item_name
    from meerschaum.utils.misc import flatten_list, generate_password
    table_obj = self.get_pipe_table(pipe, debug=debug)
    target = pipe.target
    ### Random suffix to avoid collisions with existing temp objects.
    session_id = generate_password(3)
    df_cols_types = (
        {col: str(typ) for col, typ in df.dtypes.items()}
        if not isinstance(df, dict)
        else df
    )
    db_cols_types = {col: get_pd_type(str(typ.type)) for col, typ in table_obj.columns.items()}
    ### Columns whose incoming dtype disagrees with the table's dtype
    ### ('object' on the DB side is treated as "anything goes").
    altered_cols = [
        col for col, typ in df_cols_types.items()
        if typ.lower() != db_cols_types.get(col, 'object').lower()
        and db_cols_types.get(col, 'object') != 'object'
    ]
    if not altered_cols:
        return []

    ### Conflicting columns are widened to the flavor's text type.
    text_type = get_db_type('str', self.flavor)
    altered_cols_types = {
        col: text_type
        for col in altered_cols
    }

    if self.flavor == 'sqlite':
        ### SQLite cannot ALTER a column's type, so rebuild the table:
        ### rename -> create fresh schema -> copy with CASTs -> drop temp.
        temp_table_name = '_' + session_id + '_' + target
        rename_query = (
            "ALTER TABLE "
            + sql_item_name(target, self.flavor)
            + " RENAME TO "
            + sql_item_name(temp_table_name, self.flavor)
        )
        create_query = (
            "CREATE TABLE "
            + sql_item_name(target, self.flavor)
            + " (\n"
        )
        for col_name, col_obj in table_obj.columns.items():
            create_query += (
                sql_item_name(col_name, self.flavor)
                + " "
                + (str(col_obj.type) if col_name not in altered_cols else text_type)
                + ",\n"
            )
        create_query = create_query[:-2] + "\n)"

        insert_query = (
            "INSERT INTO "
            + sql_item_name(target, self.flavor)
            + "\nSELECT\n"
        )
        for col_name, col_obj in table_obj.columns.items():
            new_col_str = (
                sql_item_name(col_name, self.flavor)
                if col_name not in altered_cols
                else f"CAST({sql_item_name(col_name, self.flavor)} AS {text_type})"
            )
            insert_query += new_col_str + ",\n"
        insert_query = insert_query[:-2] + f"\nFROM {sql_item_name(temp_table_name, self.flavor)}"

        drop_query = f"DROP TABLE {sql_item_name(temp_table_name, self.flavor)}"
        return [
            rename_query,
            create_query,
            insert_query,
            drop_query,
        ]

    queries = []
    if self.flavor == 'oracle':
        ### Oracle cannot MODIFY a populated column's type in place:
        ### add temp columns -> copy -> NULL originals -> MODIFY type ->
        ### copy back -> drop temp columns.
        add_query = "ALTER TABLE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            add_query += "\nADD " + sql_item_name(col + '_temp', self.flavor) + " " + typ + ","
        add_query = add_query[:-1]
        queries.append(add_query)

        populate_temp_query = "UPDATE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            populate_temp_query += (
                "\nSET " + sql_item_name(col + '_temp', self.flavor)
                + ' = ' + sql_item_name(col, self.flavor) + ','
            )
        populate_temp_query = populate_temp_query[:-1]
        queries.append(populate_temp_query)

        set_old_cols_to_null_query = "UPDATE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            set_old_cols_to_null_query += (
                "\nSET " + sql_item_name(col, self.flavor)
                + ' = NULL,'
            )
        set_old_cols_to_null_query = set_old_cols_to_null_query[:-1]
        queries.append(set_old_cols_to_null_query)

        alter_type_query = "ALTER TABLE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            alter_type_query += (
                "\nMODIFY " + sql_item_name(col, self.flavor) + ' ' + typ + ','
            )
        alter_type_query = alter_type_query[:-1]
        queries.append(alter_type_query)

        set_old_to_temp_query = "UPDATE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            set_old_to_temp_query += (
                "\nSET " + sql_item_name(col, self.flavor)
                + ' = ' + sql_item_name(col + '_temp', self.flavor) + ','
            )
        set_old_to_temp_query = set_old_to_temp_query[:-1]
        queries.append(set_old_to_temp_query)

        drop_temp_query = "ALTER TABLE " + sql_item_name(target, self.flavor)
        for col, typ in altered_cols_types.items():
            drop_temp_query += (
                "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor) + ','
            )
        drop_temp_query = drop_temp_query[:-1]
        queries.append(drop_temp_query)

        return queries

    ### Generic path: a single ALTER TABLE with flavor-specific keywords.
    query = "ALTER TABLE " + sql_item_name(target, self.flavor)
    for col, typ in altered_cols_types.items():
        alter_col_prefix = (
            'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle')
            else 'MODIFY'
        )
        type_prefix = (
            '' if self.flavor in ('mssql', 'mariadb', 'mysql')
            else 'TYPE '
        )
        column_str = 'COLUMN' if self.flavor != 'oracle' else ''
        query += (
            f"\n{alter_col_prefix} {column_str} "
            + sql_item_name(col, self.flavor)
            + " " + type_prefix + typ + ","
        )
    query = query[:-1]
    queries.append(query)

    if self.flavor != 'duckdb':
        return queries

    ### For DuckDB, drop and rebuild the indices around the ALTER.
    drop_index_queries = list(flatten_list(
        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
    ))
    create_index_queries = list(flatten_list(
        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
    ))

    return drop_index_queries + queries + create_index_queries
def get_backtrack_data(self, pipe: Optional[Pipe] = None, backtrack_minutes: int = 0, begin: Optional[datetime.datetime] = None, params: Optional[Dict[str, Any]] = None, limit: Optional[int] = None, chunksize: Optional[int] = -1, debug: bool = False) ‑> Union[pandas.DataFrame, None]
-
Get the most recent backtrack_minutes' worth of data from a Pipe.
Parameters
pipe
:meerschaum.Pipe:
- The pipe to get data from.
backtrack_minutes
:int
, default0
- How far into the past to look for data.
begin
:Optional[datetime.datetime]
, defaultNone
- Where to start traversing from. Defaults to
None
, which uses theget_sync_time()
value. params
:Optional[Dict[str, Any]]
, defaultNone
- Additional parameters to filter by.
See
meerschaum.connectors.sql.build_where
. limit
:Optional[int]
, defaultNone
- If specified, limit the number of rows retrieved to this value.
chunksize
:Optional[int]
, default-1
- The size of dataframe chunks to load into memory.
debug
:bool
, defaultFalse
- Verbosity toggle.
Returns
A
pd.DataFrame
of backtracked data.Expand source code
def get_backtrack_data(
    self,
    pipe: Optional[meerschaum.Pipe] = None,
    backtrack_minutes: int = 0,
    begin: Optional[datetime.datetime] = None,
    params: Optional[Dict[str, Any]] = None,
    limit: Optional[int] = None,
    chunksize: Optional[int] = -1,
    debug: bool = False
) -> Union[pandas.DataFrame, None]:
    """
    Fetch the most recent `backtrack_minutes`' worth of data from a pipe.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to get data from.

    backtrack_minutes: int, default 0
        How far into the past to look for data.

    begin: Optional[datetime.datetime], default None
        Where to start traversing from. Defaults to `None`, which uses the
        `meerschaum.Pipe.get_sync_time` value.

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.connectors.sql.build_where`.

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    chunksize: Optional[int], default -1
        The size of dataframe chunks to load into memory.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` of backtracked data.
    """
    import datetime
    from meerschaum.utils.warnings import error
    if pipe is None:
        error("Pipe must be provided.")

    ### Fall back to the pipe's sync time when no explicit starting point is given.
    start = pipe.get_sync_time(debug=debug) if begin is None else begin

    ### Delegate to `Pipe.get_data()`, walking backwards from `start`.
    return pipe.get_data(
        begin = start,
        begin_add_minutes = -backtrack_minutes,
        order = 'desc',
        params = params,
        limit = limit,
        chunksize = chunksize,
        debug = debug,
    )
def get_create_index_queries(self, pipe: Pipe, debug: bool = False) ‑> Dict[str, List[str]]
-
Return a dictionary mapping columns to a
CREATE INDEX
or equivalent query.Parameters
pipe
:Pipe
- The pipe to which the queries will correspond.
Returns
A dictionary of column names mapping to lists of queries.
Expand source code
def get_create_index_queries(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to which the queries will correspond.

    Returns
    -------
    A dictionary of column names mapping to lists of queries.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.sql import sql_item_name, get_distinct_col_count
    from meerschaum.utils.warnings import warn
    from meerschaum.config import get_config
    index_queries = {}

    indices = pipe.get_indices()

    ### Resolve (and quote) the datetime and id columns plus their index names.
    _datetime = pipe.get_columns('datetime', error=False)
    _datetime_type = pipe.dtypes.get(_datetime, 'datetime64[ns]')
    _datetime_name = sql_item_name(_datetime, self.flavor) if _datetime is not None else None
    _datetime_index_name = (
        sql_item_name(indices['datetime'], self.flavor)
        if indices.get('datetime', None)
        else None
    )
    _id = pipe.get_columns('id', error=False)
    _id_name = sql_item_name(_id, self.flavor) if _id is not None else None
    _id_index_name = sql_item_name(indices['id'], self.flavor) if indices.get('id') else None
    _pipe_name = sql_item_name(pipe.target, self.flavor)
    ### Space partitioning is gated behind an experimental config flag.
    _create_space_partition = get_config('system', 'experimental', 'space')

    ### create datetime index
    if _datetime is not None:
        if self.flavor == 'timescaledb':
            ### Hypertables need a distinct-value count for the space dimension.
            _id_count = (
                get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self)
                if (_id is not None and _create_space_partition)
                else None
            )
            ### Integer datetime axes get a numeric chunk interval instead of a SQL INTERVAL.
            chunk_time_interval = (
                pipe.parameters.get('chunk_time_interval', None)
                or
                ("INTERVAL '1 DAY'" if not 'int' in _datetime_type.lower() else '100000')
            )
            dt_query = (
                f"SELECT create_hypertable('{_pipe_name}', "
                + f"'{_datetime}', "
                + (
                    f"'{_id}', {_id_count}, "
                    if (_id is not None and _create_space_partition)
                    else ''
                )
                + f'chunk_time_interval => {chunk_time_interval}, '
                + "migrate_data => true);"
            )
        else: ### mssql, sqlite, etc.
            dt_query = (
                f"CREATE INDEX {_datetime_index_name} "
                + f"ON {_pipe_name} ({_datetime_name})"
            )

        index_queries[_datetime] = [dt_query]

    ### create id index
    if _id_name is not None:
        if self.flavor == 'timescaledb':
            ### Already created indices via create_hypertable.
            id_query = (
                None if (_id is not None and _create_space_partition)
                else (
                    f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})"
                    if _id is not None
                    else None
                )
            )
            pass
        elif self.flavor == 'citus':
            ### Citus needs the index plus a distribution call on the id column.
            ### NOTE(review): `id_query` is already a list here, so the
            ### `[id_query]` wrap below nests a list inside a list —
            ### confirm consumers of the returned dict flatten these.
            id_query = [(
                f"CREATE INDEX {_id_index_name} "
                + f"ON {_pipe_name} ({_id_name});"
            ), (
                f"SELECT create_distributed_table('{_pipe_name}', '{_id}');"
            )]
        else: ### mssql, sqlite, etc.
            id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})"

        if id_query is not None:
            index_queries[_id] = [id_query]

    ### Create indices for other labels in `pipe.columns`.
    other_indices = {
        ix_key: ix_unquoted
        for ix_key, ix_unquoted in pipe.get_indices().items()
        if ix_key not in ('datetime', 'id')
    }
    for ix_key, ix_unquoted in other_indices.items():
        ix_name = sql_item_name(ix_unquoted, self.flavor)
        col = pipe.columns[ix_key]
        col_name = sql_item_name(col, self.flavor)
        index_queries[col] = [f"CREATE INDEX {ix_name} ON {_pipe_name} ({col_name})"]

    return index_queries
def get_drop_index_queries(self, pipe: Pipe, debug: bool = False) ‑> Dict[str, List[str]]
-
Return a dictionary mapping columns to a
DROP INDEX
or equivalent query.Parameters
pipe
:Pipe
- The pipe to which the queries will correspond.
Returns
A dictionary of column names mapping to lists of queries.
Expand source code
def get_drop_index_queries(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `DROP INDEX` or equivalent query.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to which the queries will correspond.

    Returns
    -------
    A dictionary of column names mapping to lists of queries.
    """
    if not pipe.exists(debug=debug):
        return {}
    from meerschaum.utils.sql import sql_item_name, table_exists, hypertable_queries
    drop_queries = {}
    indices = pipe.get_indices()
    pipe_name = sql_item_name(pipe.target, self.flavor)

    ### Detect whether the target table is a TimescaleDB hypertable
    ### (only flavors with an entry in `hypertable_queries` can be).
    if self.flavor not in hypertable_queries:
        is_hypertable = False
    else:
        is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name)
        is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None

    if is_hypertable:
        ### A hypertable's indices can't simply be dropped in place
        ### (presumably due to the partitioning — TODO confirm), so the
        ### table is copied into a temp table, dropped, and renamed back.
        ### The order of these queries is significant.
        nuke_queries = []
        temp_table = '_' + pipe.target + '_temp_migration'
        temp_table_name = sql_item_name(temp_table, self.flavor)

        ### Clear out any leftover temp table from a previous failed migration.
        if table_exists(temp_table, self, debug=debug):
            nuke_queries.append(f"DROP TABLE {temp_table_name}")
        nuke_queries += [
            f"SELECT * INTO {temp_table_name} FROM {pipe_name}",
            f"DROP TABLE {pipe_name}",
            f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name}",
        ]
        ### Attach the migration sequence to only one of the nuked keys.
        nuke_ix_keys = ('datetime', 'id')
        nuked = False
        for ix_key in nuke_ix_keys:
            if ix_key in indices and not nuked:
                drop_queries[ix_key] = nuke_queries
                nuked = True

    ### Remaining indices get a plain DROP INDEX.
    drop_queries.update({
        ix_key: ["DROP INDEX " + sql_item_name(ix_unquoted, self.flavor)]
        for ix_key, ix_unquoted in indices.items()
        if ix_key not in drop_queries
    })
    return drop_queries
def get_pipe_attributes(self, pipe: Pipe, debug: bool = False) ‑> Dict[str, Any]
-
Get a Pipe's attributes dictionary.
Expand source code
def get_pipe_attributes(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Dict[str, Any]:
    """
    Get a Pipe's attributes dictionary.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose row in the pipes table should be fetched.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The pipe's attributes as a dictionary (with `'parameters'` decoded
    into a `dict`), or an empty dictionary if the pipe is not registered
    or the query fails.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    ### An unregistered pipe has no row to fetch.
    if pipe.get_id(debug=debug) is None:
        return {}

    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    try:
        q = sqlalchemy.select(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
        if debug:
            dprint(q)
        ### DuckDB's driver can't map the result row directly; read via pandas instead.
        attributes = (
            dict(self.exec(q, silent=True, debug=debug).first()._mapping)
            if self.flavor != 'duckdb'
            else self.read(q, debug=debug).to_dict(orient='records')[0]
        )
    except Exception as e:
        import traceback
        traceback.print_exc()
        warn(e)
        print(pipe)
        return {}

    ### handle non-PostgreSQL databases (text vs JSON):
    ### flavors without a native JSON type return `parameters` as a string,
    ### sometimes double-encoded.
    if not isinstance(attributes.get('parameters', None), dict):
        try:
            import json
            parameters = json.loads(attributes['parameters'])
            ### BUGFIX: use startswith() instead of indexing `parameters[0]`,
            ### which raised IndexError on an empty decoded string and
            ### silently reset the parameters to {} via the except below.
            if isinstance(parameters, str) and parameters.startswith('{'):
                parameters = json.loads(parameters)
            attributes['parameters'] = parameters
        except Exception as e:
            attributes['parameters'] = {}

    return attributes
def get_pipe_columns_types(self, pipe: Pipe, debug: bool = False) ‑> Optional[Dict[str, str]]
-
Get the pipe's columns and types.
Parameters
pipe
:meerschaum.Pipe:
- The pipe to get the columns for.
Returns
A dictionary of columns names (
str
) and types (str
).Examples
>>> conn.get_pipe_columns_types(pipe) { 'dt': 'TIMESTAMP WITHOUT TIMEZONE', 'id': 'BIGINT', 'val': 'DOUBLE PRECISION', } >>>
Expand source code
def get_pipe_columns_types(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Optional[Dict[str, str]]:
    """
    Get the pipe's columns and types.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe to get the columns for.

    Returns
    -------
    A dictionary of column names (`str`) mapped to SQL type strings (`str`),
    `{}` if the pipe's table does not exist, or `None` on reflection failure.

    Examples
    --------
    >>> conn.get_pipe_columns_types(pipe)
    {
      'dt': 'TIMESTAMP WITHOUT TIMEZONE',
      'id': 'BIGINT',
      'val': 'DOUBLE PRECISION',
    }
    >>>
    """
    if not pipe.exists(debug=debug):
        return {}

    try:
        reflected_table = self.get_pipe_table(pipe, debug=debug)
        if reflected_table is None:
            return {}
        ### Stringify both names and SQLAlchemy type objects.
        return {
            str(column.name): str(column.type)
            for column in reflected_table.columns
        }
    except Exception as e:
        import traceback
        traceback.print_exc()
        from meerschaum.utils.warnings import warn
        warn(e)
        return None
def get_pipe_data(self, pipe: Optional[Pipe] = None, begin: Union[datetime.datetime, str, None] = None, end: Union[datetime.datetime, str, None] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, debug: bool = False, **kw: Any) ‑> Union[pd.DataFrame, None]
-
Access a pipe's data from the SQL instance.
Parameters
pipe
:meerschaum.Pipe:
- The pipe to get data from.
begin
:Optional[datetime.datetime]
, defaultNone
- If provided, get rows newer than or equal to this value.
end
:Optional[datetime.datetime]
, defaultNone
- If provided, get rows older than or equal to this value.
params
:Optional[Dict[str, Any]]
, defaultNone
- Additional parameters to filter by.
See
meerschaum.connectors.sql.build_where
. order
:Optional[str]
, default'asc'
- The selection order for all of the indices in the query.
If
None
, omit theORDER BY
clause. limit
:Optional[int]
, defaultNone
- If specified, limit the number of rows retrieved to this value.
begin_add_minutes
:int
, default0
- The number of minutes to add to the
begin
datetime (i.e. DATEADD)
. end_add_minutes
:int
, default0
- The number of minutes to add to the
end
datetime (i.e. DATEADD)
. chunksize
:Optional[int]
, default-1
- The size of dataframe chunks to load into memory.
debug
:bool
, defaultFalse
- Verbosity toggle.
Returns
A
pd.DataFrame
of the pipe's data.Expand source code
def get_pipe_data(
        self,
        pipe: Optional[meerschaum.Pipe] = None,
        begin: Union[datetime.datetime, str, None] = None,
        end: Union[datetime.datetime, str, None] = None,
        params: Optional[Dict[str, Any]] = None,
        order: str = 'asc',
        limit: Optional[int] = None,
        begin_add_minutes: int = 0,
        end_add_minutes: int = 0,
        debug: bool = False,
        **kw: Any
    ) -> Union[pd.DataFrame, None]:
    """
    Access a pipe's data from the SQL instance.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe to get data from.

    begin: Optional[datetime.datetime], default None
        If provided, get rows newer than or equal to this value.

    end: Optional[datetime.datetime], default None
        If provided, get rows older than or equal to this value.

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.connectors.sql.build_where`.

    order: Optional[str], default 'asc'
        The selection order for all of the indices in the query.
        If `None`, omit the `ORDER BY` clause.

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    begin_add_minutes: int, default 0
        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).

    end_add_minutes: int, default 0
        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).

    chunksize: Optional[int], default -1
        The size of dataframe chunks to load into memory.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` of the pipe's data.
    """
    import json
    from meerschaum.utils.sql import sql_item_name
    dtypes = pipe.dtypes
    if dtypes:
        if self.flavor == 'sqlite':
            ### SQLite has no native datetime type, so determine the
            ### datetime axis (explicit or guessed) and coerce its dtype.
            if not pipe.columns.get('datetime', None):
                _dt = pipe.guess_datetime()
                dt = sql_item_name(_dt, self.flavor) if _dt else None
                is_guess = True
            else:
                _dt = pipe.get_columns('datetime')
                dt = sql_item_name(_dt, self.flavor)
                is_guess = False

            if _dt:
                dt_type = dtypes.get(_dt, 'object').lower()
                ### Leave integer datetime axes alone; upgrade everything
                ### else to a proper datetime64 dtype.
                if 'datetime' not in dt_type:
                    if 'int' not in dt_type:
                        dtypes[_dt] = 'datetime64[ns]'
        ### Only keep dtypes for columns that actually exist in the table.
        existing_cols = pipe.get_columns_types(debug=debug)
        if existing_cols:
            dtypes = {col: typ for col, typ in dtypes.items() if col in existing_cols}
    ### Build the SELECT query, then execute it.
    query = self.get_pipe_data_query(
        pipe,
        begin = begin,
        end = end,
        params = params,
        order = order,
        limit = limit,
        begin_add_minutes = begin_add_minutes,
        end_add_minutes = end_add_minutes,
        debug = debug,
        **kw
    )
    df = self.read(
        query,
        debug = debug,
        **kw
    )
    if self.flavor == 'sqlite':
        from meerschaum.utils.misc import parse_df_datetimes
        from meerschaum.utils.packages import import_pandas
        pd = import_pandas()
        ### NOTE: We have to consume the iterator here to ensure that datetimes are parsed correctly
        ### (`self.read` may return either a DataFrame or an iterable of chunks).
        df = (
            parse_df_datetimes(
                df,
                ignore_cols = [
                    col
                    for col, dtype in pipe.dtypes.items()
                    if 'datetime' not in str(dtype)
                ],
                debug = debug,
            ) if isinstance(df, pd.DataFrame) else (
                [
                    parse_df_datetimes(
                        c,
                        ignore_cols = [
                            col
                            for col, dtype in pipe.dtypes.items()
                            if 'datetime' not in str(dtype)
                        ],
                        debug = debug,
                    )
                    for c in df
                ]
            )
        )
    ### Deserialize `json` dtype columns, which are stored as text.
    for col, typ in dtypes.items():
        if typ != 'json':
            continue
        df[col] = df[col].apply(lambda x: json.loads(x) if x is not None else x)
    return df
def get_pipe_data_query(self, pipe: Optional[Pipe] = None, begin: Union[datetime.datetime, str, None] = None, end: Union[datetime.datetime, str, None] = None, params: Optional[Dict[str, Any]] = None, order: str = 'asc', limit: Optional[int] = None, begin_add_minutes: int = 0, end_add_minutes: int = 0, replace_nulls: Optional[str] = None, debug: bool = False, **kw: Any) ‑> Union[str, None]
-
Return the
SELECT
query for retrieving a pipe's data from its instance.Parameters
pipe
:meerschaum.Pipe:
- The pipe to get data from.
begin
:Optional[datetime.datetime]
, defaultNone
- If provided, get rows newer than or equal to this value.
end
:Optional[datetime.datetime]
, defaultNone
- If provided, get rows older than or equal to this value.
params
:Optional[Dict[str, Any]]
, defaultNone
- Additional parameters to filter by.
See
meerschaum.connectors.sql.build_where
. order
:Optional[str]
, default'asc'
- The selection order for all of the indices in the query.
If
None
, omit theORDER BY
clause. limit
:Optional[int]
, defaultNone
- If specified, limit the number of rows retrieved to this value.
begin_add_minutes
:int
, default0
- The number of minutes to add to the
begin
datetime (i.e. DATEADD)
. end_add_minutes
:int
, default0
- The number of minutes to add to the
end
datetime (i.e. DATEADD)
. chunksize
:Optional[int]
, default-1
- The size of dataframe chunks to load into memory.
replace_nulls
:Optional[str]
, defaultNone
- If provided, replace null values with this value.
debug
:bool
, defaultFalse
- Verbosity toggle.
Returns
A
SELECT
query to retrieve a pipe's data.Expand source code
def get_pipe_data_query(
        self,
        pipe: Optional[meerschaum.Pipe] = None,
        begin: Union[datetime.datetime, str, None] = None,
        end: Union[datetime.datetime, str, None] = None,
        params: Optional[Dict[str, Any]] = None,
        order: str = 'asc',
        limit: Optional[int] = None,
        begin_add_minutes: int = 0,
        end_add_minutes: int = 0,
        replace_nulls: Optional[str] = None,
        debug: bool = False,
        **kw: Any
    ) -> Union[str, None]:
    """
    Return the `SELECT` query for retrieving a pipe's data from its instance.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe to get data from.

    begin: Optional[datetime.datetime], default None
        If provided, get rows newer than or equal to this value.

    end: Optional[datetime.datetime], default None
        If provided, get rows older than or equal to this value.

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.connectors.sql.build_where`.

    order: Optional[str], default 'asc'
        The selection order for all of the indices in the query.
        If `None`, omit the `ORDER BY` clause.

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    begin_add_minutes: int, default 0
        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).

    end_add_minutes: int, default 0
        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).

    chunksize: Optional[int], default -1
        The size of dataframe chunks to load into memory.

    replace_nulls: Optional[str], default None
        If provided, replace null values with this value.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SELECT` query to retrieve a pipe's data.
    """
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import items_str
    from meerschaum.utils.sql import sql_item_name, dateadd_str
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.warnings import warn
    pd = import_pandas()

    ### Build the SELECT list, wrapping columns in COALESCE if nulls
    ### should be replaced.
    select_cols = "SELECT *"
    if replace_nulls:
        existing_cols = pipe.get_columns_types(debug=debug)
        if existing_cols:
            select_cols = "SELECT "
            for col in existing_cols:
                select_cols += (
                    f"\n    COALESCE({sql_item_name(col, self.flavor)}, "
                    + f"'{replace_nulls}') AS {sql_item_name(col, self.flavor)},"
                )
            select_cols = select_cols[:-1]
    query = f"{select_cols}\nFROM {sql_item_name(pipe.target, self.flavor)}"
    where = ""

    ### Normalize the requested order, falling back to ascending.
    if order is not None:
        default_order = 'asc'
        if order not in ('asc', 'desc'):
            warn(f"Ignoring unsupported order '{order}'. Falling back to '{default_order}'.")
            order = default_order
        order = order.upper()

    existing_cols = pipe.get_columns_types(debug=debug)

    ### Determine the datetime axis, guessing if none is configured.
    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt = sql_item_name(_dt, self.flavor)
        is_guess = False

    quoted_indices = {
        key: sql_item_name(val, self.flavor)
        for key, val in pipe.columns.items()
        if val in existing_cols
    }

    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )

    ### Apply the datetime bounds (inclusive begin, exclusive end).
    is_dt_bound = False
    if begin is not None and _dt in existing_cols:
        begin_da = dateadd_str(
            flavor = self.flavor,
            datepart = 'minute',
            number = begin_add_minutes,
            begin = begin
        )
        where += f"{dt} >= {begin_da}" + (" AND " if end is not None else "")
        is_dt_bound = True

    if end is not None and _dt in existing_cols:
        ### With an integer axis, begin == end would select nothing; widen by 1.
        if 'int' in str(type(end)).lower() and end == begin:
            end += 1
        end_da = dateadd_str(
            flavor = self.flavor,
            datepart = 'minute',
            number = end_add_minutes,
            begin = end
        )
        where += f"{dt} < {end_da}"
        is_dt_bound = True

    if params is not None:
        from meerschaum.utils.sql import build_where
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
        if valid_params:
            where += build_where(valid_params, self).replace(
                'WHERE', ('AND' if is_dt_bound else "")
            )

    if len(where) > 0:
        query += "\nWHERE " + where

    if order is not None:
        ### Sort by indices, starting with datetime.
        order_by = ""
        if quoted_indices:
            order_by += "\nORDER BY "
            if _dt and _dt in existing_cols:
                order_by += dt + ' ' + order + ','
            for key, quoted_col_name in quoted_indices.items():
                if key == 'datetime':
                    continue
                order_by += ' ' + quoted_col_name + ' ' + order + ','
            order_by = order_by[:-1]
        query += order_by

    if isinstance(limit, int):
        if self.flavor == 'mssql':
            ### NOTE: slicing by len("SELECT *") also strips "SELECT \n"
            ### (same length) when `replace_nulls` built the column list.
            query = f'SELECT TOP {limit} ' + query[len("SELECT *"):]
        elif self.flavor == 'oracle':
            ### BUGFIX: was `WHERE ROWNUM = 1`, which ignored `limit` and
            ### always returned a single row. Filtering ROWNUM over the
            ### ordered subquery is the standard Oracle top-N pattern.
            query = f"SELECT * FROM (\n    {query}\n)\nWHERE ROWNUM <= {limit}"
        else:
            query += f"\nLIMIT {limit}"

    if debug:
        to_print = (
            []
            + ([f"begin='{begin}'"] if begin else [])
            + ([f"end='{end}'"] if end else [])
            + ([f"params='{json.dumps(params)}'"] if params else [])
        )
        dprint("Getting pipe data with constraints: " + items_str(to_print, quotes=False))

    return query
def get_pipe_id(self, pipe: Pipe, debug: bool = False) ‑> Any
-
Get a Pipe's ID from the pipes table.
Expand source code
def get_pipe_id(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> Any:
    """
    Look up a pipe's ID in the pipes table by its connector, metric,
    and location keys. Return `None` for temporary or unregistered pipes.
    """
    if pipe.temporary:
        return None
    import json
    from meerschaum.utils.packages import attempt_import
    from meerschaum.connectors.sql.tables import get_tables
    sqlalchemy = attempt_import('sqlalchemy')
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    ### NULL location keys need an IS NULL comparison rather than `==`.
    location_clause = (
        (pipes_tbl.c.location_key == pipe.location_key)
        if pipe.location_key is not None
        else pipes_tbl.c.location_key.is_(None)
    )
    query = (
        sqlalchemy.select(pipes_tbl.c.pipe_id)
        .where(pipes_tbl.c.connector_keys == pipe.connector_keys)
        .where(pipes_tbl.c.metric_key == pipe.metric_key)
        .where(location_clause)
    )
    pipe_id = self.value(query, debug=debug, silent=pipe.temporary)
    return int(pipe_id) if pipe_id is not None else None
def get_pipe_metadef(self, pipe: Pipe, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, str, None] = '', end: Union[datetime.datetime, str, None] = None, debug: bool = False, **kw: Any) ‑> Union[str, None]
-
Return a pipe's meta definition fetch query (the pipe's definition wrapped with datetime bounds and parameter filters).
params: Optional[Dict[str, Any]], default None Optional params dictionary to build the
WHERE
clause. Seebuild_where()
.begin: Union[datetime.datetime, str, None], default None Most recent datatime to search for data. If
backtrack_minutes
is provided, subtractbacktrack_minutes
.end: Union[datetime.datetime, str, None], default None The latest datetime to search for data. If
end
isNone
, do not bound the search. debug: bool, default False Verbosity toggle.
Expand source code
def get_pipe_metadef(
        self,
        pipe: meerschaum.Pipe,
        params: Optional[Dict[str, Any]] = None,
        begin: Union[datetime.datetime, str, None] = '',
        end: Union[datetime.datetime, str, None] = None,
        debug: bool = False,
        **kw: Any
    ) -> Union[str, None]:
    """
    Return a pipe's meta definition fetch query: the pipe's definition
    wrapped with datetime bounds and parameter filters.

    Parameters
    ----------
    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    begin: Union[datetime.datetime, str, None], default None
        Most recent datatime to search for data.
        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.

    end: Union[datetime.datetime, str, None], default None
        The latest datetime to search for data.
        If `end` is `None`, do not bound the search.

    debug: bool, default False
        Verbosity toggle.
    """
    import datetime
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where
    from meerschaum.utils.misc import is_int
    from meerschaum.config import get_config

    definition = get_pipe_query(pipe)
    btm = self.get_pipe_backtrack_minutes(pipe)

    ### Resolve the datetime column, guessing when not configured.
    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt_name = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt_name = sql_item_name(_dt, self.flavor)
        is_guess = False

    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"Unable to determine a datetime column for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = '', None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False
                )

    ### An ORDER BY inside the definition would break the wrapping subquery
    ### (window-function OVER clauses are exempt).
    if 'order by' in definition.lower() and 'over' not in definition.lower():
        error("Cannot fetch with an ORDER clause in the definition")

    ### The sentinel '' means "default to the pipe's sync time";
    ### `None` explicitly disables the lower bound.
    begin = (
        begin if not (isinstance(begin, str) and begin == '')
        else pipe.get_sync_time(debug=debug)
    )

    da = None
    if dt_name:
        ### default: do not backtrack
        begin_da = dateadd_str(
            flavor=self.flavor,
            datepart='minute',
            number=(-1 * btm),
            begin=begin,
        ) if begin else None
        end_da = dateadd_str(
            flavor=self.flavor,
            datepart='minute',
            number=1,
            begin=end,
        ) if end else None

    ### Use the experimental join fetch query only when an id column exists
    ### and the feature flag is enabled; otherwise use the simple wrapper.
    meta_def = (
        _simple_fetch_query(pipe) if (
            (not (pipe.columns or {}).get('id', None))
            or (not get_config('system', 'experimental', 'join_fetch'))
        ) else _join_fetch_query(pipe, debug=debug, **kw)
    )

    ### Detect whether the generated query already has a WHERE clause
    ### after the "definition" alias.
    has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):]
    if dt_name and (begin_da or end_da):
        definition_dt_name = (
            dateadd_str(self.flavor, 'minute', 0, f"definition.{dt_name}")
            if not is_int((begin_da or end_da))
            else f"definition.{dt_name}"
        )
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        if begin_da:
            meta_def += f"{definition_dt_name} >= {begin_da}"
        if begin_da and end_da:
            meta_def += " AND "
        if end_da:
            meta_def += f"{definition_dt_name} < {end_da}"

    if params is not None:
        params_where = build_where(params, self, with_where=False)
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        meta_def += params_where

    return meta_def
def get_pipe_rowcount(self, pipe: Pipe, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, remote: bool = False, params: Optional[Dict[str, Any]] = None, debug: bool = False) ‑> int
-
Get the rowcount for a pipe in accordance with given parameters.
Parameters
pipe
:Pipe
- The pipe to query with.
begin
:Optional[datetime.datetime]
, defaultNone
- The beginning datetime value.
end
:Optional[datetime.datetime]
, defaultNone
- The ending datetime value.
remote
:bool
, defaultFalse
- If
True
, get the rowcount for the remote table. params
:Optional[Dict[str, Any]]
, defaultNone
- See
build_where()
. debug
:bool
, defaultFalse
- Verbosity toggle.
Returns
An
int
for the number of rows if thepipe
exists, otherwiseNone
.Expand source code
def get_pipe_rowcount(
        self,
        pipe: meerschaum.Pipe,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        remote: bool = False,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False
    ) -> int:
    """
    Get the rowcount for a pipe in accordance with given parameters.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to query with.

    begin: Optional[datetime.datetime], default None
        The beginning datetime value.

    end: Optional[datetime.datetime], default None
        The ending datetime value.

    remote: bool, default False
        If `True`, get the rowcount for the remote table.

    params: Optional[Dict[str, Any]], default None
        See `meerschaum.utils.sql.build_where`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` for the number of rows if the `pipe` exists, otherwise `None`.
    """
    from meerschaum.utils.sql import dateadd_str, sql_item_name
    from meerschaum.utils.warnings import error, warn
    from meerschaum.connectors.sql._fetch import get_pipe_query
    ### A remote rowcount requires the pipe's fetch definition.
    if remote:
        msg = f"'fetch:definition' must be an attribute of {pipe} to get a remote rowcount."
        if 'fetch' not in pipe.parameters:
            error(msg)
            return None
        if 'definition' not in pipe.parameters['fetch']:
            error(msg)
            return None
    _pipe_name = sql_item_name(pipe.target, self.flavor)

    ### Resolve the datetime column, guessing when not configured.
    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt = sql_item_name(_dt, self.flavor)
        is_guess = False

    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )

    ### Quote names with the relevant connector's flavor
    ### (the remote source may be a different flavor than the instance).
    _datetime_name = sql_item_name(
        _dt,
        pipe.instance_connector.flavor if not remote else pipe.connector.flavor
    )
    _cols_names = [
        sql_item_name(col, pipe.instance_connector.flavor if not remote else pipe.connector.flavor)
        for col in set(
            ([_dt] if _dt else [])
            + ([] if params is None else list(params.keys()))
        )
    ]
    if not _cols_names:
        _cols_names = ['*']

    src = (
        f"SELECT {', '.join(_cols_names)} FROM {_pipe_name}"
        if not remote else get_pipe_query(pipe)
    )
    ### MySQL/MariaDB lack CTE support in older versions; use a derived table.
    query = (
        f"""
        WITH src AS ({src})
        SELECT COUNT(*)
        FROM src
        """
    ) if self.flavor not in ('mysql', 'mariadb') else (
        f"""
        SELECT COUNT(*)
        FROM ({src}) AS src
        """
    )
    if begin is not None or end is not None:
        query += "WHERE"
    if begin is not None:
        query += f"""
        {dt} >= {dateadd_str(self.flavor, datepart='minute', number=0, begin=begin)}
        """
    if end is not None and begin is not None:
        query += "AND"
    if end is not None:
        query += f"""
        {dt} < {dateadd_str(self.flavor, datepart='minute', number=0, begin=end)}
        """
    if params is not None:
        from meerschaum.utils.sql import build_where
        existing_cols = pipe.get_columns_types(debug=debug)
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
        if valid_params:
            query += build_where(valid_params, self).replace('WHERE', (
                'AND' if (begin is not None or end is not None)
                else 'WHERE'
            )
        )

    result = self.value(query, debug=debug, silent=True)
    try:
        return int(result)
    except Exception as e:
        return None
def get_pipe_table(self, pipe: Pipe, debug: bool = False) ‑> sqlalchemy.Table
-
Return the
sqlalchemy.Table
object for aPipe
.Parameters
pipe
:meerschaum.Pipe:
- The pipe in question.
Returns
A
sqlalchemy.Table
object.Expand source code
def get_pipe_table(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> sqlalchemy.Table:
    """
    Reflect and return the `sqlalchemy.Table` object backing a pipe.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe in question.

    Returns
    -------
    A `sqlalchemy.Table` object, or `None` if the pipe's table does not exist.
    """
    from meerschaum.utils.sql import get_sqlalchemy_table
    ### Nothing to reflect if the table hasn't been created yet.
    if not pipe.exists(debug=debug):
        return None
    return get_sqlalchemy_table(
        pipe.target,
        connector = self,
        debug = debug,
        refresh = True,
    )
def get_plugin_attributes(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Dict[str, Any]
-
Return the attributes of a plugin.
Expand source code
def get_plugin_attributes(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Dict[str, Any]:
    """
    Return the stored attributes dictionary for a plugin
    (an empty dict if the plugin is not registered).
    """
    import json
    ### ensure plugins table exists
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    query = (
        sqlalchemy
        .select(plugins_tbl.c.attributes)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )
    raw_attributes = self.value(query, debug=debug)
    if raw_attributes is None:
        return {}
    ### Flavors without native JSON columns return the attributes as text.
    if isinstance(raw_attributes, str):
        return json.loads(raw_attributes)
    return raw_attributes
def get_plugin_id(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Optional[int]
-
Return a plugin's ID.
Expand source code
def get_plugin_id(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[int]:
    """
    Return a plugin's ID, or `None` if the plugin is not registered.
    """
    ### ensure plugins table exists
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    query = (
        sqlalchemy
        .select(plugins_tbl.c.plugin_id)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )

    ### A missing row yields None, which int() rejects.
    try:
        return int(self.value(query, debug=debug))
    except Exception as e:
        return None
def get_plugin_user_id(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Optional[int]
-
Return a plugin's user ID.
Expand source code
def get_plugin_user_id(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[int]:
    """
    Return a plugin's user ID (its owner), or `None` if not found.
    """
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    ### Ensure the plugins table exists before querying it.
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    owner_query = sqlalchemy.select(plugins_tbl.c.user_id).where(
        plugins_tbl.c.plugin_name == plugin.name
    )
    ### `self.value()` may return `None` (no row), which makes `int()` raise.
    try:
        return int(self.value(owner_query, debug=debug))
    except Exception:
        return None
def get_plugin_username(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Optional[str]
-
Return the username of a plugin's owner.
Expand source code
def get_plugin_username(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[str]:
    """
    Return the username of a plugin's owner.

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin whose owner's username to fetch.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The owner's username, or `None` if not found.
    """
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    ### Ensure the plugins and users tables exist before querying them.
    tables = get_tables(mrsm_instance=self, debug=debug)
    plugins_tbl = tables['plugins']
    users = tables['users']
    ### BUGFIX: the original combined the two conditions with the Python `and`
    ### operator, which calls `bool()` on a SQLAlchemy column comparison and
    ### raises `TypeError` (older versions silently kept only one condition).
    ### Chained `.where()` calls combine the conditions with SQL AND instead.
    query = (
        sqlalchemy.select(users.c.username)
        .where(users.c.user_id == plugins_tbl.c.user_id)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )
    return self.value(query, debug=debug)
def get_plugin_version(self, plugin: "'meerschaum.core.Plugin'", debug: bool = False) ‑> Optional[str]
-
Return a plugin's version.
Expand source code
def get_plugin_version(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[str]:
    """
    Return a plugin's version, or `None` if the plugin is not registered.
    """
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    ### Ensure the plugins table exists before querying it.
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    version_query = (
        sqlalchemy
        .select(plugins_tbl.c.version)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )
    return self.value(version_query, debug=debug)
def get_plugins(self, user_id: Optional[int] = None, search_term: Optional[str] = None, debug: bool = False, **kw: Any) ‑> List[str]
-
Return a list of all registered plugins.
Parameters
user_id
:Optional[int]
, defaultNone
- If specified, filter plugins by a specific
user_id
. search_term
:Optional[str]
, defaultNone
- If specified, add a
WHERE plugin_name LIKE '{search_term}%'
clause to filter the plugins.
Returns
A list of plugin names.
Expand source code
def get_plugins(
        self,
        user_id: Optional[int] = None,
        search_term: Optional[str] = None,
        debug: bool = False,
        **kw: Any
    ) -> List[str]:
    """
    Return a list of all registered plugins.

    Parameters
    ----------
    user_id: Optional[int], default None
        If specified, filter plugins by a specific `user_id`.

    search_term: Optional[str], default None
        If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause
        to filter the plugins.

    Returns
    -------
    A list of plugin names.
    """
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    ### Ensure the plugins table exists before querying it.
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    ### Collect the optional filters, then apply them all at once.
    conditions = []
    if user_id is not None:
        conditions.append(plugins_tbl.c.user_id == user_id)
    if search_term is not None:
        conditions.append(plugins_tbl.c.plugin_name.like(search_term + '%'))

    query = sqlalchemy.select(plugins_tbl.c.plugin_name)
    for condition in conditions:
        query = query.where(condition)

    return [row[0] for row in self.execute(query).fetchall()]
def get_sync_time(self, pipe: "'Pipe'", params: Optional[Dict[str, Any]] = None, newest: bool = True, round_down: bool = True, debug: bool = False) ‑> 'datetime.datetime'
-
Get a Pipe's most recent datetime.
Parameters
pipe
:Pipe
- The pipe to get the sync time for.
params
:Optional[Dict[str, Any]]
, defaultNone
- Optional params dictionary to build the
WHERE
clause. Seebuild_where()
. newest
:bool
, defaultTrue
- If
True
, get the most recent datetime (honoringparams
). IfFalse
, get the oldest datetime (ASC instead of DESC). round_down
:bool
, defaultTrue
- If
True
, round the resulting datetime value down to the nearest minute. Defaults toTrue
.
Returns
A
datetime.datetime
object if the pipe exists, otherwiseNone
.Expand source code
def get_sync_time(
        self,
        pipe: 'meerschaum.Pipe',
        params: Optional[Dict[str, Any]] = None,
        newest: bool = True,
        round_down: bool = True,
        debug: bool = False,
    ) -> 'datetime.datetime':
    """Get a Pipe's most recent datetime.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to get the sync time for.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (ASC instead of DESC).

    round_down: bool, default True
        If `True`, round the resulting datetime value down to the nearest minute.
        Defaults to `True`.

    Returns
    -------
    A `datetime.datetime` object if the pipe exists, otherwise `None`.
    """
    from meerschaum.utils.sql import sql_item_name, build_where
    from meerschaum.utils.warnings import warn
    import datetime
    table = sql_item_name(pipe.target, self.flavor)

    ### Determine the datetime column; fall back to guessing when unset.
    dt_col = pipe.columns.get('datetime', None)
    ### NOTE(review): `dt_type` is assigned but never used in this method.
    dt_type = pipe.dtypes.get(dt_col, 'datetime64[ns]')
    if not dt_col:
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor) if _dt else None
        is_guess = True
    else:
        _dt = dt_col
        dt = sql_item_name(_dt, self.flavor)
        is_guess = False

    ### Without a datetime column there is no sync time to fetch.
    if _dt is None:
        return None

    ASC_or_DESC = "DESC" if newest else "ASC"
    existing_cols = pipe.get_columns_types(debug=debug)
    ### Only keep params whose keys are actual columns on the table.
    valid_params = {}
    if params is not None:
        valid_params = {k: v for k, v in params.items() if k in existing_cols}

    ### If no bounds are provided for the datetime column,
    ### add IS NOT NULL to the WHERE clause.
    ### ('_None' is presumably rendered as IS NOT NULL by `build_where` — TODO confirm.)
    if _dt not in valid_params:
        valid_params[_dt] = '_None'

    where = "" if not valid_params else build_where(valid_params, self)
    ### Base query (LIMIT syntax); MSSQL and Oracle need their own row-limiting forms.
    q = f"SELECT {dt}\nFROM {table}{where}\nORDER BY {dt} {ASC_or_DESC}\nLIMIT 1"
    if self.flavor == 'mssql':
        q = f"SELECT TOP 1 {dt}\nFROM {table}{where}\nORDER BY {dt} {ASC_or_DESC}"
    elif self.flavor == 'oracle':
        q = (
            "SELECT * FROM (\n"
            + f" SELECT {dt}\nFROM {table}{where}\n ORDER BY {dt} {ASC_or_DESC}\n"
            + ") WHERE ROWNUM = 1"
        )

    try:
        from meerschaum.utils.misc import round_time
        import datetime
        db_time = self.value(q, silent=True, debug=debug)

        ### No datetime could be found.
        if db_time is None:
            return None
        ### sqlite returns str.
        if isinstance(db_time, str):
            from meerschaum.utils.packages import attempt_import
            dateutil_parser = attempt_import('dateutil.parser')
            st = dateutil_parser.parse(db_time)
        ### Do nothing if a datetime object is returned.
        elif isinstance(db_time, datetime.datetime):
            ### Pandas timestamps subclass datetime; normalize to plain datetime.
            if hasattr(db_time, 'to_pydatetime'):
                st = db_time.to_pydatetime()
            else:
                st = db_time
        ### Sometimes the datetime is actually a date.
        elif isinstance(db_time, datetime.date):
            st = datetime.datetime.combine(db_time, datetime.datetime.min.time())
        ### Adding support for an integer datetime axis.
        elif 'int' in str(type(db_time)).lower():
            st = int(db_time)
        ### Convert pandas timestamp to Python datetime.
        else:
            st = db_time.to_pydatetime()

        ### round down to smooth timestamp (integer axes are returned as-is)
        sync_time = (
            round_time(st, date_delta=datetime.timedelta(minutes=1), to='down')
            if round_down else st
        ) if not isinstance(st, int) else st
    except Exception as e:
        sync_time = None
        warn(str(e))

    return sync_time
def get_to_sql_dtype(self, pipe: "'Pipe'", df: "'pd.DataFrame'", update_dtypes: bool = True) ‑> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']
-
Given a pipe and DataFrame, return the
dtype
dictionary forto_sql()
.Parameters
pipe
:Pipe
- The pipe which may contain a
dtypes
parameter. df
:pd.DataFrame
- The DataFrame to be pushed via
to_sql()
. update_dtypes
:bool
, defaultTrue
- If
True
, patch the pipe's dtypes onto the DataFrame's dtypes.
Returns
A dictionary with
sqlalchemy
datatypes.Examples
>>> import pandas as pd >>> import meerschaum as mrsm >>> >>> conn = mrsm.get_connector('sql:memory') >>> df = pd.DataFrame([{'a': {'b': 1}}]) >>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'}) >>> get_to_sql_dtype(pipe, df) {'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
Expand source code
def get_to_sql_dtype(
        self,
        pipe: 'meerschaum.Pipe',
        df: 'pd.DataFrame',
        update_dtypes: bool = True,
    ) -> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']:
    """
    Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe which may contain a `dtypes` parameter.

    df: pd.DataFrame
        The DataFrame to be pushed via `to_sql()`.

    update_dtypes: bool, default True
        If `True`, patch the pipe's dtypes onto the DataFrame's dtypes.

    Returns
    -------
    A dictionary with `sqlalchemy` datatypes.

    Examples
    --------
    >>> import pandas as pd
    >>> import meerschaum as mrsm
    >>> conn = mrsm.get_connector('sql:memory')
    >>> df = pd.DataFrame([{'a': {'b': 1}}])
    >>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
    >>> get_to_sql_dtype(pipe, df)
    {'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
    """
    from meerschaum.utils.misc import get_json_cols
    from meerschaum.utils.sql import get_db_type

    ### Start from the DataFrame's own dtypes (stringified).
    combined_dtypes = {}
    for col, typ in df.dtypes.items():
        combined_dtypes[col] = str(typ)

    ### Columns containing dicts/lists are stored as JSON.
    for col in get_json_cols(df):
        combined_dtypes[col] = 'json'

    ### The pipe's declared dtypes take precedence over the inferred ones.
    if update_dtypes:
        combined_dtypes.update(pipe.dtypes)

    return {
        col: get_db_type(typ, self.flavor, as_sqlalchemy=True)
        for col, typ in combined_dtypes.items()
    }
def get_user_attributes(self, user: meerschaum.core.User, debug: bool = False) ‑> Union[Dict[str, Any], None]
-
Return the user's attributes.
Expand source code
def get_user_attributes(
        self,
        user: meerschaum.core.User,
        debug: bool = False
    ) -> Union[Dict[str, Any], None]:
    """
    Return the user's attributes.
    """
    import json
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import attempt_import
    from meerschaum.connectors.sql.tables import get_tables
    sqlalchemy = attempt_import('sqlalchemy')
    ### Ensure the users table exists before querying it.
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    ### Prefer the ID already on the user object; otherwise look it up.
    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    query = (
        sqlalchemy.select(users_tbl.c.attributes)
        .where(users_tbl.c.user_id == user_id)
    )
    result = self.value(query, debug=debug)
    if result is None or isinstance(result, dict):
        return result

    ### Some flavors return the attributes as another mapping type or a
    ### serialized JSON string; try each coercion in turn.
    for parser in (dict, json.loads):
        try:
            return parser(result)
        except Exception:
            continue

    warn(f"Received unexpected type for attributes: {result}")
    return result
def get_user_id(self, user: meerschaum.core.User, debug: bool = False) ‑> Optional[int]
-
If a user is registered, return the
user_id
.Expand source code
def get_user_id(
        self,
        user: meerschaum.core.User,
        debug : bool = False
    ) -> Optional[int]:
    """If a user is registered, return the `user_id`."""
    from meerschaum.utils.packages import attempt_import
    from meerschaum.connectors.sql.tables import get_tables
    sqlalchemy = attempt_import('sqlalchemy')
    ### Ensure the users table exists before querying it.
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    lookup = (
        sqlalchemy.select(users_tbl.c.user_id)
        .where(users_tbl.c.username == user.username)
    )
    raw_id = self.value(lookup, debug=debug)
    return int(raw_id) if raw_id is not None else None
def get_user_password_hash(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> Optional[str]
-
Return the password hash for a user. NOTE: This may be dangerous and is only allowed if the security settings explicitly allow it.
Expand source code
def get_user_password_hash(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """
    Return the password hash for a user.

    **NOTE**: This may be dangerous and is only allowed if the security settings
    explicitly allow it.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    ### Prefer the ID already on the user object; otherwise look it up.
    if user.user_id is not None:
        user_id = user.user_id
        if debug:
            dprint(f"Already given user_id: {user_id}")
    else:
        if debug:
            dprint(f"Fetching user_id...")
        user_id = self.get_user_id(user, debug=debug)

    if user_id is None:
        return None

    hash_query = (
        sqlalchemy.select(users_tbl.c.password_hash)
        .where(users_tbl.c.user_id == user_id)
    )
    return self.value(hash_query, debug=debug)
def get_user_type(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> Optional[str]
-
Return the user's type.
Expand source code
def get_user_type(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """
    Return the user's type, or `None` if the user is not registered.
    """
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    ### Prefer the ID already on the user object; otherwise look it up.
    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    if user_id is None:
        return None

    type_query = (
        sqlalchemy.select(users_tbl.c.user_type)
        .where(users_tbl.c.user_id == user_id)
    )
    return self.value(type_query, debug=debug)
def get_users(self, debug: bool = False, **kw: Any) ‑> List[str]
-
Get the registered usernames.
Expand source code
def get_users(
        self,
        debug: bool = False,
        **kw: Any
    ) -> List[str]:
    """
    Get the registered usernames.
    """
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    ### Ensure the users table exists before reading from it.
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    usernames_df = self.read(sqlalchemy.select(users_tbl.c.username), debug=debug)
    return list(usernames_df['username'])
def pipe_exists(self, pipe: Pipe, debug: bool = False) ‑> bool
-
Check that a Pipe's table exists.
Parameters
pipe
:meerschaum.Pipe:
- The pipe to check.
debug
:bool
, defaultFalse
- Verbosity toggle.
Returns
A
bool
corresponding to whether a pipe's table exists.Expand source code
def pipe_exists(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False
    ) -> bool:
    """
    Check that a Pipe's table exists.

    Parameters
    ----------
    pipe: meerschaum.Pipe:
        The pipe to check.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `bool` corresponding to whether a pipe's table exists.
    """
    from meerschaum.utils.sql import table_exists
    found = table_exists(pipe.target, self, debug=debug)
    if debug:
        from meerschaum.utils.debug import dprint
        status = 'exists.' if found else 'does not exist.'
        dprint(f"{pipe} " + status)
    return found
def read(self, query_or_table: Union[str, sqlalchemy.Query], params: Optional[Dict[str, Any], List[str]] = None, dtype: Optional[Dict[str, Any]] = None, chunksize: Optional[int] = -1, workers: Optional[int] = None, chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None, as_hook_results: bool = False, chunks: Optional[int] = None, as_chunks: bool = False, as_iterator: bool = False, silent: bool = False, debug: bool = False, **kw: Any) ‑> Union[pandas.DataFrame, List[pandas.DataFrame], List[Any], None]
-
Read a SQL query or table into a pandas dataframe.
Parameters
query_or_table
:Union[str, sqlalchemy.Query]
- The SQL query (sqlalchemy Query or string) or name of the table from which to select.
params
:Optional[Dict[str, Any]]
, defaultNone
List
orDict
of parameters to pass topandas.read_sql()
. See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.htmldtype
:Optional[Dict[str, Any]]
, defaultNone
- A dictionary of data types to pass to
pandas.read_sql()
. See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html chunksize
:Optional[int]
, default-1
-
How many chunks to read at a time.
None
will read everything in one large chunk. Defaults to system configuration.NOTE: DuckDB does not allow for chunking.
workers
:Optional[int]
, defaultNone
- How many threads to use when consuming the generator.
Only applies if
chunk_hook
is provided. chunk_hook
:Optional[Callable[[pandas.DataFrame], Any]]
, defaultNone
- Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
See
--sync-chunks
for an example. NOTE:as_iterator
MUST be False (default). as_hook_results
:bool
, defaultFalse
-
If
True
, return aList
of the outputs of the hook function. Only applicable ifchunk_hook
is not None.NOTE:
as_iterator
MUST beFalse
(default). chunks
:Optional[int]
, defaultNone
- Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
return into a single dataframe.
For example, to limit the returned dataframe to 100,000 rows,
you could specify a
chunksize
of1000
andchunks
of100
. as_chunks
:bool
, defaultFalse
- If
True
, return a list of DataFrames. Otherwise return a single DataFrame. Defaults toFalse
. as_iterator
:bool
, defaultFalse
- If
True
, return the pandas DataFrame iterator.chunksize
must not beNone
(falls back to 1000 if so), and hooks are not called in this case. Defaults toFalse
. silent
:bool
, defaultFalse
- If
True
, don't raise warnings in case of errors. Defaults toFalse
.
Returns
A
pd.DataFrame
(default case), or an iterator, or a list of dataframes / iterators, orNone
if something breaks.Expand source code
def read(
        self,
        query_or_table: Union[str, sqlalchemy.Query],
        params: Union[Dict[str, Any], List[str], None] = None,
        dtype: Optional[Dict[str, Any]] = None,
        chunksize: Optional[int] = -1,
        workers: Optional[int] = None,
        chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
        as_hook_results: bool = False,
        chunks: Optional[int] = None,
        as_chunks: bool = False,
        as_iterator: bool = False,
        silent: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Union[
        pandas.DataFrame,
        List[pandas.DataFrame],
        List[Any],
        None,
    ]:
    """
    Read a SQL query or table into a pandas dataframe.

    Parameters
    ----------
    query_or_table: Union[str, sqlalchemy.Query]
        The SQL query (sqlalchemy Query or string)
        or name of the table from which to select.

    params: Union[Dict[str, Any], List[str], None], default None
        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html

    dtype: Optional[Dict[str, Any]], default None
        A dictionary of data types to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html

    chunksize: Optional[int], default -1
        How many chunks to read at a time. `None` will read everything in one large chunk.
        Defaults to system configuration.

        **NOTE:** DuckDB does not allow for chunking.

    workers: Optional[int], default None
        How many threads to use when consuming the generator.
        Only applies if `chunk_hook` is provided.

    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
        See `--sync-chunks` for an example.
        **NOTE:** `as_iterator` MUST be False (default).

    as_hook_results: bool, default False
        If `True`, return a `List` of the outputs of the hook function.
        Only applicable if `chunk_hook` is not None.

        **NOTE:** `as_iterator` MUST be `False` (default).

    chunks: Optional[int], default None
        Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
        return into a single dataframe.
        For example, to limit the returned dataframe to 100,000 rows,
        you could specify a `chunksize` of `1000` and `chunks` of `100`.

    as_chunks: bool, default False
        If `True`, return a list of DataFrames.
        Otherwise return a single DataFrame.

    as_iterator: bool, default False
        If `True`, return the pandas DataFrame iterator.
        `chunksize` must not be `None` (falls back to 1000 if so),
        and hooks are not called in this case.

    silent: bool, default False
        If `True`, don't raise warnings in case of errors.

    Returns
    -------
    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
    or `None` if something breaks.
    """
    if chunks is not None and chunks <= 0:
        return []
    from meerschaum.utils.sql import sql_item_name, truncate_item_name
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.pool import get_pool
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    import warnings
    import traceback
    pd = import_pandas()
    sqlalchemy = attempt_import("sqlalchemy")

    ### Resolve the chunksize: -1 means "use the system default".
    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is None and as_iterator:
        if not silent and self.flavor not in _disallow_chunks_flavors:
            warn(
                f"An iterator may only be generated if chunksize is not None.\n"
                + "Falling back to a chunksize of 1000.", stacklevel=3,
            )
        chunksize = 1000
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel = 3,
                )
            chunksize = _max_chunks_flavors[self.flavor]

    ### NOTE: A bug in duckdb_engine does not allow for chunks.
    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
        chunksize = None

    if debug:
        import time
        start = time.perf_counter()
        dprint(query_or_table)
        dprint(f"Fetching with chunksize: {chunksize}")

    ### This might be sqlalchemy object or the string of a table name.
    ### We check for spaces and quotes to see if it might be a weird table.
    if (
        ' ' not in str(query_or_table)
        or (
            ' ' in str(query_or_table)
            and str(query_or_table).startswith('"')
            and str(query_or_table).endswith('"')
        )
    ):
        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
        if truncated_table_name != str(query_or_table) and not silent:
            ### BUGFIX: the original referenced undefined names `name` and
            ### `truncated_name`, raising NameError whenever truncation occurred.
            warn(
                f"Table '{query_or_table}' is too long for '{self.flavor}',"
                + f" will instead create the table '{truncated_table_name}'."
            )

        query_or_table = sql_item_name(str(query_or_table), self.flavor)
        if debug:
            dprint(f"Reading from table {query_or_table}")
        formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table))
    else:
        try:
            formatted_query = sqlalchemy.text(query_or_table)
        except Exception as e:
            formatted_query = query_or_table

    chunk_list = []
    chunk_hook_results = []
    try:
        ### Server-side cursors only make sense when consuming chunks eagerly.
        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            with self.engine.begin() as transaction:
                with transaction.execution_options(stream_results=stream_results) as connection:
                    chunk_generator = pd.read_sql_query(
                        formatted_query,
                        connection,
                        params = params,
                        chunksize = chunksize,
                        dtype = dtype,
                    )

                    ### `stream_results` must be False (will load everything into memory).
                    if as_iterator or chunksize is None:
                        return chunk_generator

                    ### We must consume the generator in this context if using server-side cursors.
                    if stream_results:
                        pool = get_pool(workers=workers)

                        def _process_chunk(_chunk, _retry_on_failure: bool = True):
                            """Append the chunk (unless only hook results are wanted) and run the hook."""
                            if not as_hook_results:
                                chunk_list.append(_chunk)
                            result = None
                            if chunk_hook is not None:
                                try:
                                    result = chunk_hook(
                                        _chunk,
                                        workers = workers,
                                        chunksize = chunksize,
                                        debug = debug,
                                        **kw
                                    )
                                except Exception as e:
                                    result = False, traceback.format_exc()
                                    from meerschaum.utils.formatting import get_console
                                    get_console().print_exception()

                                ### If the chunk fails to process, try it again one more time.
                                if isinstance(result, tuple) and result[0] is False:
                                    if _retry_on_failure:
                                        return _process_chunk(_chunk, _retry_on_failure=False)

                            return result

                        chunk_hook_results = list(pool.imap(_process_chunk, chunk_generator))
                        if as_hook_results:
                            return chunk_hook_results
    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
        if not silent:
            warn(str(e), stacklevel=3)
        from meerschaum.utils.formatting import get_console
        get_console().print_exception()
        return None

    ### Consume the generator (no-op if it was already consumed above),
    ### honoring the optional `chunks` cap.
    read_chunks = 0
    try:
        for chunk in chunk_generator:
            if chunk_hook is not None:
                chunk_hook_results.append(
                    chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                )
            chunk_list.append(chunk)
            read_chunks += 1
            if chunks is not None and read_chunks >= chunks:
                break
    except Exception as e:
        warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
        from meerschaum.utils.formatting import get_console
        get_console().print_exception()
        return None

    ### If no chunks returned, read without chunks
    ### to get columns
    if len(chunk_list) == 0:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            with self.engine.begin() as connection:
                chunk_list.append(
                    pd.read_sql_query(
                        formatted_query,
                        connection,
                        params = params,
                        dtype = dtype,
                    )
                )

    ### call the hook on any missed chunks.
    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
        for c in chunk_list[len(chunk_hook_results):]:
            chunk_hook_results.append(
                chunk_hook(c, chunksize=chunksize, debug=debug, **kw)
            )

    if debug:
        end = time.perf_counter()
        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")

    if as_hook_results:
        return chunk_hook_results

    ### Skip `pd.concat()` if `as_chunks` is specified.
    if as_chunks:
        for c in chunk_list:
            c.reset_index(drop=True, inplace=True)
        return chunk_list

    return pd.concat(chunk_list).reset_index(drop=True)
def register_pipe(self, pipe: Pipe, debug: bool = False) ‑> SuccessTuple
-
Register a new pipe. A pipe's attributes must be set before registering.
Expand source code
def register_pipe(
        self,
        pipe: meerschaum.Pipe,
        debug: bool = False,
    ) -> SuccessTuple:
    """
    Register a new pipe.
    A pipe's attributes must be set before registering.
    """
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    from meerschaum.connectors.sql.tables import get_tables
    sqlalchemy = attempt_import('sqlalchemy')

    ### Ensure the pipes table exists (don't create it for temporary pipes).
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    if pipe.get_id(debug=debug) is not None:
        return False, f"{pipe} is already registered."

    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ### then `pipe.parameters` will exist and not be fetched from the database.
    ### Prioritize the Pipe object's `parameters` first,
    ### e.g. if the user manually set the `parameters` property.
    try:
        parameters = pipe.parameters
    except Exception as e:
        if debug:
            dprint(str(e))
        parameters = None

    ### Ensure `parameters` is a dictionary.
    if parameters is None:
        parameters = {}

    ### Flavors without native JSON support get the parameters serialized.
    serialized_parameters = (
        json.dumps(parameters)
        if self.flavor not in json_flavors
        else parameters
    )
    insert_query = sqlalchemy.insert(pipes_tbl).values(
        connector_keys = pipe.connector_keys,
        metric_key = pipe.metric_key,
        location_key = pipe.location_key,
        parameters = serialized_parameters,
    )
    result = self.exec(insert_query, debug=debug)
    if result is None:
        return False, f"Failed to register {pipe}."
    return True, f"Successfully registered {pipe}."
def register_plugin(self, plugin: "'meerschaum.core.Plugin'", force: bool = False, debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Register a new plugin to the plugins table.
Expand source code
def register_plugin(
        self,
        plugin: 'meerschaum.core.Plugin',
        force: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Register a new plugin to the plugins table."""
    import json
    from meerschaum.utils.warnings import warn, error
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    from meerschaum.connectors.sql.tables import get_tables
    sqlalchemy = attempt_import('sqlalchemy')
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    old_id = self.get_plugin_id(plugin, debug=debug)

    ### Check for version conflict. May be overridden with `--force`.
    if old_id is not None and not force:
        old_version = self.get_plugin_version(plugin, debug=debug)
        new_version = plugin.version
        if old_version is None:
            old_version = ''
        if new_version is None:
            new_version = ''

        ### Verify that the new version is greater than the old.
        packaging_version = attempt_import('packaging.version')
        if (
            old_version and new_version
            and packaging_version.parse(old_version) >= packaging_version.parse(new_version)
        ):
            return False, (
                f"Version '{new_version}' of plugin '{plugin}' "
                + f"must be greater than existing version '{old_version}'."
            )

    bind_variables = {
        'plugin_name' : plugin.name,
        'version'     : plugin.version,
        'attributes'  : (
            json.dumps(plugin.attributes)
            if self.flavor not in json_flavors
            else plugin.attributes
        ),
        'user_id'     : plugin.user_id,
    }

    ### Insert a new row, or update the existing one when re-registering.
    query = (
        sqlalchemy.insert(plugins_tbl).values(**bind_variables)
        if old_id is None
        else (
            sqlalchemy.update(plugins_tbl)
            .values(**bind_variables)
            .where(plugins_tbl.c.plugin_id == old_id)
        )
    )
    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to register plugin '{plugin}'."
    return True, f"Successfully registered plugin '{plugin}'."
def register_user(self, user: meerschaum.core.User, debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Register a new user.
Expand source code
def register_user(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Register a new user.

    Parameters
    ----------
    user: meerschaum.core.User
        The user to register; its username must pass `valid_username()`
        and must not already exist.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    import json
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    from meerschaum.connectors.sql.tables import get_tables
    sqlalchemy = attempt_import('sqlalchemy')

    valid_tuple = valid_username(user.username)
    if not valid_tuple[0]:
        return valid_tuple

    ### NOTE: the original repeated this existence check (and an always-true
    ### `if old_id is None:` guard) after building the bind variables;
    ### the duplicates were unreachable and have been removed.
    old_id = self.get_user_id(user, debug=debug)
    if old_id is not None:
        return False, f"User '{user}' already exists."

    ### Ensure the users table exists.
    tables = get_tables(mrsm_instance=self, debug=debug)

    bind_variables = {
        'username'      : user.username,
        'email'         : user.email,
        'password_hash' : user.password_hash,
        'user_type'     : user.type,
        ### Flavors without native JSON support get the attributes serialized.
        'attributes'    : (
            json.dumps(user.attributes)
            if self.flavor not in json_flavors
            else user.attributes
        ),
    }

    query = (
        sqlalchemy.insert(tables['users']).
        values(**bind_variables)
    )
    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to register user '{user}'."
    return True, f"Successfully registered user '{user}'."
def sync_pipe(self, pipe: Pipe, df: Union[pandas.DataFrame, str, Dict[Any, Any], None] = None, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, chunksize: Optional[int] = -1, check_existing: bool = True, blocking: bool = True, debug: bool = False, **kw: Any) ‑> SuccessTuple
-
Sync a pipe using a database connection.
Parameters
pipe
:Pipe
- The Meerschaum Pipe instance into which to sync the data.
df
:Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]
- An optional DataFrame or equivalent to sync into the pipe.
Defaults to
None
. begin
:Optional[datetime.datetime]
, defaultNone
- Optionally specify the earliest datetime to search for data.
Defaults to
None
. end
:Optional[datetime.datetime]
, defaultNone
- Optionally specify the latest datetime to search for data.
Defaults to
None
. chunksize
:Optional[int]
, default-1
- Specify the number of rows to sync per chunk.
If
-1
, resort to system configuration (default is900
). Achunksize
ofNone
will sync all rows in one transaction. Defaults to-1
. check_existing
:bool
, defaultTrue
- If
True
, pull and diff with existing data from the pipe. Defaults toTrue
. blocking
:bool
, defaultTrue
- If
True
, wait for sync to finish and return its result, otherwise asynchronously sync. Defaults to
. debug
:bool
, defaultFalse
- Verbosity toggle. Defaults to False.
kw
:Any
- Catch-all for keyword arguments.
Returns
A
SuccessTuple
of success (bool
) and message (str
).Expand source code
def sync_pipe(
        self,
        pipe: meerschaum.Pipe,
        df: Union[pandas.DataFrame, str, Dict[Any, Any], None] = None,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        chunksize: Optional[int] = -1,
        check_existing: bool = True,
        blocking: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Sync a pipe using a database connection.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The Meerschaum Pipe instance into which to sync the data.

    df: Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]
        An optional DataFrame or equivalent to sync into the pipe.
        Defaults to `None`.

    begin: Optional[datetime.datetime], default None
        Optionally specify the earliest datetime to search for data.
        Defaults to `None`.

    end: Optional[datetime.datetime], default None
        Optionally specify the latest datetime to search for data.
        Defaults to `None`.

    chunksize: Optional[int], default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.
        Defaults to `-1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe. Defaults to `True`.

    blocking: bool, default True
        If `True`, wait for sync to finish and return its result, otherwise
        asynchronously sync. Defaults to `True`.

    debug: bool, default False
        Verbosity toggle. Defaults to False.

    kw: Any
        Catch-all for keyword arguments.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.sql import get_update_queries, sql_item_name, json_flavors
    from meerschaum.utils.misc import generate_password, get_json_cols
    from meerschaum import Pipe
    import time
    import copy
    pd = import_pandas()
    if df is None:
        msg = f"DataFrame is None. Cannot sync {pipe}."
        warn(msg)
        return False, msg

    start = time.perf_counter()

    ### Non-temporary pipes must be registered before the first sync.
    if not pipe.temporary and not pipe.get_id(debug=debug):
        register_tuple = pipe.register(debug=debug)
        if not register_tuple[0]:
            return register_tuple

    ### df is the dataframe returned from the remote source
    ### via the connector
    if debug:
        dprint("Fetched data:\n" + str(df))

    ### Coerce non-DataFrame inputs (dicts, JSON strings) into a DataFrame.
    if not isinstance(df, pd.DataFrame):
        df = pipe.enforce_dtypes(df, debug=debug)

    ### if table does not exist, create it with indices
    is_new = False
    add_cols_query = None  # NOTE(review): appears unused below
    if not pipe.exists(debug=debug):
        check_existing = False
        is_new = True
    else:
        ### Check for new columns.
        add_cols_queries = self.get_add_columns_queries(pipe, df, debug=debug)
        if add_cols_queries:
            if not self.exec_queries(add_cols_queries, debug=debug):
                warn(f"Failed to add new columns to {pipe}.")

        alter_cols_queries = self.get_alter_columns_queries(pipe, df, debug=debug)
        if alter_cols_queries:
            if not self.exec_queries(alter_cols_queries, debug=debug):
                warn(f"Failed to alter columns for {pipe}.")
            else:
                pipe.infer_dtypes(persist=True)

    ### Partition incoming rows: unseen (to insert), update (to upsert),
    ### delta (both). Skip the diff entirely when the table is new.
    unseen_df, update_df, delta_df = (
        pipe.filter_existing(
            df,
            chunksize = chunksize,
            debug = debug,
            **kw
        ) if check_existing else (df, None, df)
    )
    if debug:
        dprint("Delta data:\n" + str(delta_df))
        dprint("Unseen data:\n" + str(unseen_df))
        if update_df is not None:
            dprint("Update data:\n" + str(update_df))

    ### Apply updates via a temporary staging table + UPDATE ... JOIN queries.
    if update_df is not None and not update_df.empty:
        transact_id = generate_password(3)
        temp_target = '_' + transact_id + '_' + pipe.target
        update_kw = copy.deepcopy(kw)
        update_kw.update({
            'name': temp_target,
            'if_exists': 'append',
            'chunksize': chunksize,
            'dtype': self.get_to_sql_dtype(pipe, update_df, update_dtypes=False),
            'debug': debug,
        })
        self.to_sql(update_df, **update_kw)
        ### Wrap the staging table in a temporary pipe so it can be dropped cleanly.
        temp_pipe = Pipe(
            pipe.connector_keys + '_', pipe.metric_key, pipe.location_key,
            instance = pipe.instance_keys,
            columns = pipe.columns,
            target = temp_target,
            temporary = True,
        )

        ### Join on every registered index column (except 'value') that
        ### actually exists in the target table.
        existing_cols = pipe.get_columns_types(debug=debug)
        join_cols = [
            col for col_key, col in pipe.columns.items()
            if col and col_key != 'value' and col in existing_cols
        ]

        queries = get_update_queries(
            pipe.target,
            temp_target,
            self,
            join_cols,
            debug = debug
        )
        success = all(self.exec_queries(queries, break_on_error=True, debug=debug))
        ### Drop the staging table regardless of update success.
        drop_success, drop_msg = temp_pipe.drop(debug=debug)
        if not drop_success:
            warn(drop_msg)
        if not success:
            return False, f"Failed to apply update to {pipe}."

    ### Strip keys that would conflict with the explicit to_sql kwargs below.
    if_exists = kw.get('if_exists', 'append')
    if 'if_exists' in kw:
        kw.pop('if_exists')
    if 'name' in kw:
        kw.pop('name')

    ### Account for first-time syncs of JSON columns.
    unseen_json_cols = get_json_cols(unseen_df)
    update_json_cols = get_json_cols(update_df) if update_df is not None else []
    json_cols = list(set(unseen_json_cols + update_json_cols))
    existing_json_cols = [col for col, typ in pipe.dtypes.items() if typ == 'json']
    new_json_cols = [col for col in json_cols if col not in existing_json_cols]
    if new_json_cols:
        pipe.dtypes.update({col: 'json' for col in json_cols})
        edit_success, edit_msg = pipe.edit(interactive=False, debug=debug)
        if not edit_success:
            warn(f"Unable to update JSON dtypes for {pipe}:\n{edit_msg}")

    ### Insert new data into Pipe's table.
    unseen_kw = copy.deepcopy(kw)
    unseen_kw.update({
        'name': pipe.target,
        'if_exists': if_exists,
        'debug': debug,
        'as_dict': True,
        'chunksize': chunksize,
        'dtype': self.get_to_sql_dtype(pipe, unseen_df, update_dtypes=True),
    })
    stats = self.to_sql(unseen_df, **unseen_kw)

    ### Indices are only created once, right after the table first appears.
    if is_new:
        if not self.create_indices(pipe, debug=debug):
            if debug:
                dprint(f"Failed to create indices for {pipe}. Continuing...")

    end = time.perf_counter()
    success = stats['success']
    if not success:
        return success, stats['msg']
    msg = (
        f"Inserted {len(unseen_df)}, "
        + f"updated {len(update_df) if update_df is not None else 0} rows."
    )
    if debug:
        ### Replace the trailing period with table + timing details.
        msg = msg[:-1] + (
            f"\non table {sql_item_name(pipe.target, self.flavor)}\n"
            + f"in {round(end-start, 2)} seconds."
        )
    return success, msg
def sync_pipe_inplace(self, pipe: "'Pipe'", params: Optional[Dict[str, Any]] = None, begin: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, chunksize: Optional[int] = -1, check_existing: bool = True, debug: bool = False, **kw: Any) ‑> SuccessTuple
-
If a pipe's connector is the same as its instance connector, it's more efficient to sync the pipe in-place rather than reading data into Pandas.
Parameters
pipe
:Pipe
- The pipe whose connector is the same as its instance.
params
:Optional[Dict[str, Any]]
, defaultNone
- Optional params dictionary to build the
WHERE
clause. Seebuild_where()
. begin
:Optional[datetime.datetime]
, defaultNone
- Optionally specify the earliest datetime to search for data.
Defaults to
None
. end
:Optional[datetime.datetime]
, defaultNone
- Optionally specify the latest datetime to search for data.
Defaults to
None
. chunksize
:Optional[int]
, default-1
- Specify the number of rows to sync per chunk.
If
-1
, resort to system configuration (default is900
). Achunksize
ofNone
will sync all rows in one transaction. Defaults to-1
. check_existing
:bool
, defaultTrue
- If
True
, pull and diff with existing data from the pipe. Defaults toTrue
. debug
:bool
, defaultFalse
- Verbosity toggle.
Returns
A SuccessTuple.
Expand source code
def sync_pipe_inplace(
        self,
        pipe: 'meerschaum.Pipe',
        params: Optional[Dict[str, Any]] = None,
        begin: Optional[datetime.datetime] = None,
        end: Optional[datetime.datetime] = None,
        chunksize: Optional[int] = -1,
        check_existing: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    If a pipe's connector is the same as its instance connector,
    it's more efficient to sync the pipe in-place rather than reading data into Pandas.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose connector is the same as its instance.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    begin: Optional[datetime.datetime], default None
        Optionally specify the earliest datetime to search for data.
        Defaults to `None`.

    end: Optional[datetime.datetime], default None
        Optionally specify the latest datetime to search for data.
        Defaults to `None`.

    chunksize: Optional[int], default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.
        Defaults to `-1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe. Defaults to `True`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A SuccessTuple.
    """
    from meerschaum.utils.sql import (
        sql_item_name, table_exists, get_sqlalchemy_table, get_pd_type,
        get_update_queries, get_null_replacement,
    )
    from meerschaum.utils.misc import generate_password
    from meerschaum.utils.debug import dprint
    metadef = self.get_pipe_metadef(
        pipe,
        params = params,
        begin = begin,
        end = end,
        debug = debug,
    )
    ### MSSQL cannot nest a CTE in a subquery, so fold the final SELECT of the
    ### definition into an additional `metadef` CTE instead.
    if self.flavor in ('mssql',):
        final_select_ix = metadef.lower().rfind('select')
        metadef = (
            metadef[:final_select_ix].rstrip() + ',\n'
            + "metadef AS (\n"
            + metadef[final_select_ix:]
            + "\n)\n"
        )

    pipe_name = sql_item_name(pipe.target, self.flavor)
    ### First-time sync: create the target table directly from the definition.
    if not pipe.exists(debug=debug):
        if self.flavor in ('mssql',):
            create_pipe_query = metadef + f"SELECT *\nINTO {pipe_name}\nFROM metadef"
        elif self.flavor in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb'):
            create_pipe_query = (
                f"CREATE TABLE {pipe_name} AS\n"
                + f"SELECT *\nFROM ({metadef})"
                + (" AS metadef" if self.flavor in ('mysql', 'mariadb') else '')
            )
        else:
            create_pipe_query = f"SELECT *\nINTO {pipe_name}\nFROM ({metadef}) AS metadef"
        result = self.exec(create_pipe_query, debug=debug)
        if result is None:
            return False, f"Could not insert new data into {pipe} from its SQL query definition."
        if not self.create_indices(pipe, debug=debug):
            if debug:
                dprint(f"Failed to create indices for {pipe}. Continuing...")

        rowcount = pipe.get_rowcount(debug=debug)
        return True, f"Inserted {rowcount}, updated 0 rows."

    ### Generate names for the tables.
    transact_id = generate_password(3)
    def get_temp_table_name(label: str) -> str:
        """Build a transaction-scoped temporary table name for `label`."""
        return '_' + transact_id + '_' + label + '_' + pipe.target

    backtrack_table_raw = get_temp_table_name('backtrack')
    backtrack_table_name = sql_item_name(backtrack_table_raw, self.flavor)
    new_table_raw = get_temp_table_name('new')
    new_table_name = sql_item_name(new_table_raw, self.flavor)
    delta_table_raw = get_temp_table_name('delta')
    delta_table_name = sql_item_name(delta_table_raw, self.flavor)
    joined_table_raw = get_temp_table_name('joined')
    joined_table_name = sql_item_name(joined_table_raw, self.flavor)
    unseen_table_raw = get_temp_table_name('unseen')
    unseen_table_name = sql_item_name(unseen_table_raw, self.flavor)
    update_table_raw = get_temp_table_name('update')
    update_table_name = sql_item_name(update_table_raw, self.flavor)

    ### Stage the newly fetched rows into the `new` temp table.
    new_queries = []
    drop_new_query = f"DROP TABLE {new_table_name}"
    if table_exists(new_table_raw, self, debug=debug):
        new_queries.append(drop_new_query)

    if self.flavor in ('mssql',):
        create_new_query = metadef + f"SELECT *\nINTO {new_table_name}\nFROM metadef"
    elif self.flavor in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb'):
        create_new_query = (
            f"CREATE TABLE {new_table_name} AS\n"
            + f"SELECT *\nFROM ({metadef})"
            + (" AS metadef" if self.flavor in ('mysql', 'mariadb') else '')
        )
    else:
        create_new_query = f"SELECT *\nINTO {new_table_name}\nFROM ({metadef}) AS metadef"
    new_queries.append(create_new_query)

    new_success = all(self.exec_queries(new_queries, break_on_error=True, debug=debug))
    if not new_success:
        self.exec_queries([drop_new_query], break_on_error=False, debug=debug)
        return False, f"Could not fetch new data for {pipe}."

    new_table_obj = get_sqlalchemy_table(
        new_table_raw,
        connector = self,
        refresh = True,
        debug = debug,
    )
    new_cols = {str(col.name): get_pd_type(str(col.type)) for col in new_table_obj.columns}

    ### Reconcile the target table's schema with the new data's columns.
    add_cols_queries = self.get_add_columns_queries(pipe, new_cols, debug=debug)
    if add_cols_queries:
        if not self.exec_queries(add_cols_queries, debug=debug):
            warn(f"Failed to add new columns to {pipe}.")

    alter_cols_queries = self.get_alter_columns_queries(pipe, new_cols, debug=debug)
    if alter_cols_queries:
        if not self.exec_queries(alter_cols_queries, debug=debug):
            warn(f"Failed to alter columns for {pipe}.")
        else:
            pipe.infer_dtypes(persist=True)

    ### Skip the diff entirely: insert everything from `new` and stop here.
    if not check_existing:
        new_count = self.value(f"SELECT COUNT(*) FROM {new_table_name}", debug=debug)
        insert_queries = [
            (
                f"INSERT INTO {pipe_name}\n"
                + f"SELECT *\nFROM {new_table_name}"
            ),
            f"DROP TABLE {new_table_name}"
        ]
        if not self.exec_queries(insert_queries, debug=debug, break_on_error=False):
            return False, f"Failed to insert into rows into {pipe}."
        return True, f"Inserted {new_count}, updated 0 rows."

    ### Stage the recent existing rows (the "backtrack window") for diffing.
    backtrack_queries = []
    drop_backtrack_query = f"DROP TABLE {backtrack_table_name}"
    if table_exists(backtrack_table_raw, self, debug=debug):
        backtrack_queries.append(drop_backtrack_query)

    btm = max(self.get_pipe_backtrack_minutes(pipe), 1)
    backtrack_def = self.get_pipe_data_query(
        pipe,
        begin = begin,
        end = end,
        begin_add_minutes = (-1 * btm),
        end_add_minutes = 1,
        params = params,
        debug = debug,
        order = None,
    )
    create_backtrack_query = (
        (
            f"WITH backtrack_def AS (\n{backtrack_def}\n)\n"
            + f"SELECT *\nINTO {backtrack_table_name}\nFROM backtrack_def"
        ) if self.flavor not in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb')
        else (
            f"CREATE TABLE {backtrack_table_name} AS\n"
            + f"SELECT *\nFROM ({backtrack_def})"
            + (" AS backtrack" if self.flavor in ('mysql', 'mariadb') else '')
        )
    )
    backtrack_queries.append(create_backtrack_query)
    backtrack_success = all(self.exec_queries(backtrack_queries, break_on_error=True, debug=debug))
    if not backtrack_success:
        self.exec_queries([drop_new_query, drop_backtrack_query], break_on_error=False, debug=debug)
        return False, f"Could not fetch backtrack data from {pipe}."

    ### Determine which index columns are present in both tables.
    backtrack_table_obj = get_sqlalchemy_table(
        backtrack_table_raw,
        connector = self,
        refresh = True,
        debug = debug,
    )
    backtrack_cols = {str(col.name): str(col.type) for col in backtrack_table_obj.columns}
    common_cols = [col for col in new_cols if col in backtrack_cols]
    on_cols = {
        col: new_cols.get(col, 'object')
        for col_key, col in pipe.columns.items()
        if (
            col
            and col_key != 'value'
            and col in backtrack_cols
            and col in new_cols
        )
    }

    ### `delta` = rows in `new` with no NULL-safe match in `backtrack`.
    delta_queries = []
    drop_delta_query = f"DROP TABLE {delta_table_name}"
    if table_exists(delta_table_raw, self, debug=debug):
        delta_queries.append(drop_delta_query)

    create_delta_query = (
        (
            f"SELECT\n"
            + (
                ', '.join([
                    f"COALESCE(new.{sql_item_name(col, self.flavor)}, "
                    + f"{get_null_replacement(typ, self.flavor)}) AS "
                    + sql_item_name(col, self.flavor)
                    for col, typ in new_cols.items()
                ])
            ) + "\n"
            + f"INTO {delta_table_name}\n"
            + f"FROM {new_table_name} AS new\n"
            + f"LEFT OUTER JOIN {backtrack_table_name} AS old\nON\n"
            + '\nAND\n'.join([
                (
                    'COALESCE(new.' + sql_item_name(c, self.flavor)
                    + ", "
                    + get_null_replacement(new_cols[c], self.flavor)
                    + ") "
                    + ' = '
                    + 'COALESCE(old.' + sql_item_name(c, self.flavor)
                    + ", "
                    + get_null_replacement(backtrack_cols[c], self.flavor)
                    + ") "
                ) for c in common_cols
            ])
            + "\nWHERE\n"
            + '\nAND\n'.join([
                (
                    'old.' + sql_item_name(c, self.flavor) + ' IS NULL'
                ) for c in common_cols
            ])
        ) if self.flavor not in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb')
        else (
            f"CREATE TABLE {delta_table_name} AS\n"
            + f"SELECT\n"
            + (
                ', '.join([
                    f"COALESCE(new.{sql_item_name(col, self.flavor)}, "
                    + f"{get_null_replacement(typ, self.flavor)}) AS "
                    + sql_item_name(col, self.flavor)
                    for col, typ in new_cols.items()
                ])
            ) + "\n"
            + f"FROM {new_table_name} new\n"
            + f"LEFT OUTER JOIN {backtrack_table_name} old\nON\n"
            + '\nAND\n'.join([
                (
                    'COALESCE(new.' + sql_item_name(c, self.flavor)
                    + ", "
                    + get_null_replacement(new_cols[c], self.flavor)
                    + ") "
                    + ' = '
                    + 'COALESCE(old.' + sql_item_name(c, self.flavor)
                    + ", "
                    + get_null_replacement(backtrack_cols[c], self.flavor)
                    + ") "
                ) for c in common_cols
            ])
            + "\nWHERE\n"
            + '\nAND\n'.join([
                (
                    'old.' + sql_item_name(c, self.flavor) + ' IS NULL'
                ) for c in common_cols
            ])
        )
    )
    delta_queries.append(create_delta_query)

    delta_success = all(self.exec_queries(delta_queries, break_on_error=True, debug=debug))
    if not delta_success:
        self.exec_queries(
            [
                drop_new_query,
                drop_backtrack_query,
                drop_delta_query,
            ],
            break_on_error = False,
            debug = debug,
        )
        return False, f"Could not filter data for {pipe}."

    delta_table_obj = get_sqlalchemy_table(
        delta_table_raw,
        connector = self,
        refresh = True,
        debug = debug,
    )
    delta_cols = {str(col.name): get_pd_type(str(col.type)) for col in delta_table_obj.columns}

    ### `joined` = `delta` LEFT JOIN `backtrack` on the index columns,
    ### suffixing columns `_delta` / `_backtrack` to tell the sides apart.
    joined_queries = []
    drop_joined_query = f"DROP TABLE {joined_table_name}"
    if on_cols and table_exists(joined_table_raw, self, debug=debug):
        joined_queries.append(drop_joined_query)

    create_joined_query = (
        (
            "SELECT "
            + (', '.join([
                (
                    'delta.' + sql_item_name(c, self.flavor)
                    + " AS " + sql_item_name(c + '_delta', self.flavor)
                ) for c in delta_cols
            ]))
            + ", "
            + (', '.join([
                (
                    'bt.' + sql_item_name(c, self.flavor)
                    + " AS " + sql_item_name(c + '_backtrack', self.flavor)
                ) for c in backtrack_cols
            ]))
            + f"\nINTO {joined_table_name}\n"
            + f"FROM {delta_table_name} AS delta\n"
            + f"LEFT OUTER JOIN {backtrack_table_name} AS bt\nON\n"
            + '\nAND\n'.join([
                (
                    'COALESCE(delta.' + sql_item_name(c, self.flavor)
                    + ", " + get_null_replacement(typ, self.flavor) + ")"
                    + ' = '
                    + 'COALESCE(bt.' + sql_item_name(c, self.flavor)
                    + ", " + get_null_replacement(typ, self.flavor) + ")"
                ) for c, typ in on_cols.items()
            ])
        ) if self.flavor not in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb')
        else (
            f"CREATE TABLE {joined_table_name} AS\n"
            + "SELECT "
            + (', '.join([
                (
                    'delta.' + sql_item_name(c, self.flavor)
                    + " AS " + sql_item_name(c + '_delta', self.flavor)
                ) for c in delta_cols
            ]))
            + ", "
            + (', '.join([
                (
                    'bt.' + sql_item_name(c, self.flavor)
                    + " AS " + sql_item_name(c + '_backtrack', self.flavor)
                ) for c in backtrack_cols
            ]))
            + f"\nFROM {delta_table_name} delta\n"
            + f"LEFT OUTER JOIN {backtrack_table_name} bt\nON\n"
            + '\nAND\n'.join([
                (
                    'COALESCE(delta.' + sql_item_name(c, self.flavor)
                    + ", " + get_null_replacement(typ, self.flavor) + ")"
                    + ' = '
                    + 'COALESCE(bt.' + sql_item_name(c, self.flavor)
                    + ", " + get_null_replacement(typ, self.flavor) + ")"
                ) for c, typ in on_cols.items()
            ])
        )
    )
    joined_queries.append(create_joined_query)

    joined_success = (
        all(self.exec_queries(joined_queries, break_on_error=True, debug=debug))
        if on_cols else True
    )
    if not joined_success:
        self.exec_queries(
            [
                drop_new_query,
                drop_backtrack_query,
                drop_delta_query,
                drop_joined_query,
            ],
            break_on_error = False,
            debug = debug,
        )
        return False, f"Could not separate new and updated data for {pipe}."

    ### `unseen` = joined rows with no backtrack match (pure inserts),
    ### mapping the NULL replacements back to real NULLs.
    unseen_queries = []
    drop_unseen_query = f"DROP TABLE {unseen_table_name}"
    if on_cols and table_exists(unseen_table_raw, self, debug=debug):
        unseen_queries.append(drop_unseen_query)

    create_unseen_query = (
        (
            "SELECT "
            + (', '.join([
                (
                    "CASE\n    WHEN " + sql_item_name(c + '_delta', self.flavor)
                    + " != " + get_null_replacement(typ, self.flavor)
                    + " THEN " + sql_item_name(c + '_delta', self.flavor)
                    + "\n    ELSE NULL\nEND "
                    + " AS " + sql_item_name(c, self.flavor)
                ) for c, typ in delta_cols.items()
            ]))
            + f"\nINTO {unseen_table_name}\n"
            + f"\nFROM {joined_table_name} AS joined\n"
            + f"WHERE "
            + '\nAND\n'.join([
                (
                    sql_item_name(c + '_backtrack', self.flavor) + ' IS NULL'
                ) for c in delta_cols
            ])
        ) if self.flavor not in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb')
        else (
            f"CREATE TABLE {unseen_table_name} AS\n"
            + "SELECT "
            + (', '.join([
                (
                    "CASE\n    WHEN " + sql_item_name(c + '_delta', self.flavor)
                    + " != " + get_null_replacement(typ, self.flavor)
                    + " THEN " + sql_item_name(c + '_delta', self.flavor)
                    + "\n    ELSE NULL\nEND "
                    + " AS " + sql_item_name(c, self.flavor)
                ) for c, typ in delta_cols.items()
            ]))
            + f"\nFROM {joined_table_name} joined\n"
            + f"WHERE "
            + '\nAND\n'.join([
                (
                    sql_item_name(c + '_backtrack', self.flavor) + ' IS NULL'
                ) for c in delta_cols
            ])
        )
    )
    unseen_queries.append(create_unseen_query)

    unseen_success = (
        all(self.exec_queries(unseen_queries, break_on_error=True, debug=debug))
        if on_cols else True
    )
    if not unseen_success:
        self.exec_queries(
            [
                drop_new_query,
                drop_backtrack_query,
                drop_delta_query,
                drop_joined_query,
                drop_unseen_query,
            ],
            break_on_error = False,
            debug = debug,
        )
        return False, f"Could not determine new data for {pipe}."
    unseen_count = self.value(
        (
            "SELECT COUNT(*) FROM "
            + (unseen_table_name if on_cols else delta_table_name)
        ),
        debug = debug,
    )

    ### `update` = joined rows that DID match backtrack rows (upserts).
    update_queries = []
    drop_update_query = f"DROP TABLE {update_table_name}"
    if on_cols and table_exists(update_table_raw, self, debug=debug):
        ### BUGFIX: previously appended `drop_unseen_query` here, so a stale
        ### update table was never dropped and the CREATE below would fail.
        update_queries.append(drop_update_query)

    create_update_query = (
        (
            "SELECT "
            + (', '.join([
                (
                    "CASE\n    WHEN " + sql_item_name(c + '_delta', self.flavor)
                    + " != " + get_null_replacement(typ, self.flavor)
                    + " THEN " + sql_item_name(c + '_delta', self.flavor)
                    + "\n    ELSE NULL\nEND "
                    + " AS " + sql_item_name(c, self.flavor)
                ) for c, typ in delta_cols.items()
            ]))
            + f"\nINTO {update_table_name}"
            + f"\nFROM {joined_table_name} AS joined\n"
            + f"WHERE "
            + '\nOR\n'.join([
                (
                    sql_item_name(c + '_backtrack', self.flavor) + ' IS NOT NULL'
                ) for c in delta_cols
            ])
        ) if self.flavor not in ('sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb')
        else (
            f"CREATE TABLE {update_table_name} AS\n"
            + "SELECT "
            + (', '.join([
                (
                    "CASE\n    WHEN " + sql_item_name(c + '_delta', self.flavor)
                    + " != " + get_null_replacement(typ, self.flavor)
                    + " THEN " + sql_item_name(c + '_delta', self.flavor)
                    + "\n    ELSE NULL\nEND "
                    + " AS " + sql_item_name(c, self.flavor)
                ) for c, typ in delta_cols.items()
            ]))
            + f"\nFROM {joined_table_name} joined\n"
            + f"WHERE "
            + '\nOR\n'.join([
                (
                    sql_item_name(c + '_backtrack', self.flavor) + ' IS NOT NULL'
                ) for c in delta_cols
            ])
        )
    )
    update_queries.append(create_update_query)

    update_success = (
        all(self.exec_queries(update_queries, break_on_error=True, debug=debug))
        if on_cols else True
    )
    if not update_success:
        self.exec_queries(
            [
                drop_new_query,
                drop_backtrack_query,
                drop_delta_query,
                drop_joined_query,
                drop_unseen_query,
                drop_update_query,
            ],
            break_on_error = False,
            debug = debug,
        )
        ### BUGFIX: this message was missing the `f` prefix, emitting a
        ### literal "{pipe}" instead of the pipe's name.
        return False, f"Could not determine updated data for {pipe}."

    update_count = (
        self.value(f"SELECT COUNT(*) FROM {update_table_name}", debug=debug)
        if on_cols else 0
    )

    apply_update_queries = (
        get_update_queries(
            pipe.target,
            update_table_raw,
            self,
            on_cols,
            debug = debug
        ) if on_cols else []
    )

    apply_unseen_queries = [
        (
            f"INSERT INTO {pipe_name}\n"
            + f"SELECT *\nFROM "
            + (unseen_table_name if on_cols else delta_table_name)
        ),
    ]

    ### Apply the inserts and updates, then drop all temp tables.
    apply_queries = (
        (apply_unseen_queries if unseen_count > 0 else [])
        + (apply_update_queries if update_count > 0 else [])
        + [
            drop_new_query,
            drop_backtrack_query,
            drop_delta_query,
        ] + (
            [
                drop_joined_query,
                drop_unseen_query,
                drop_update_query,
            ] if on_cols else []
        )
    )
    success = all(self.exec_queries(apply_queries, break_on_error=False, debug=debug))
    msg = (
        f"Was not able to apply changes to {pipe}."
        if not success else f"Inserted {unseen_count}, updated {update_count} rows."
    )
    return success, msg
def test_connection(self, **kw: Any) ‑> Optional[bool]
-
Test if a successful connection to the database may be made.
Parameters
**kw: The keyword arguments are passed to
retry_connect()
.Returns
True
if a connection is made, otherwiseFalse
orNone
in case of failure.Expand source code
def test_connection(
        self,
        **kw: Any
    ) -> Union[bool, None]:
    """
    Test if a successful connection to the database may be made.

    Parameters
    ----------
    **kw:
        The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Returns
    -------
    `True` if a connection is made, otherwise `False` or `None` in case of failure.
    """
    import warnings
    from meerschaum.connectors.poll import retry_connect

    ### Single non-waiting attempt against this connector; caller kwargs win.
    connect_kwargs = {
        'max_retries': 1,
        'retry_wait': 0,
        'warn': False,
        'connector': self,
    }
    connect_kwargs.update(kw)

    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', 'Could not')
        try:
            return retry_connect(**connect_kwargs)
        except Exception:
            ### Best-effort check: any failure simply means "not connected".
            return False
def to_sql(self, df: pandas.DataFrame, name: str = None, index: bool = False, if_exists: str = 'replace', method: str = '', chunksize: Optional[int] = -1, silent: bool = False, debug: bool = False, as_tuple: bool = False, as_dict: bool = False, **kw) ‑> Union[bool, SuccessTuple]
-
Upload a DataFrame's contents to the SQL server.
Parameters
df
:pd.DataFrame
- The DataFrame to be uploaded.
name
:str
- The name of the table to be created.
index
:bool
, defaultFalse
- If True, creates the DataFrame's indices as columns.
if_exists
:str
, default'replace'
- Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail'].
method
:str
, default''
- The insertion method: either None or 'multi'. See the pandas DataFrame.to_sql documentation for details.
as_tuple
:bool
, defaultFalse
- If
True
, return a (success_bool, message) tuple instead of abool
. Defaults toFalse
. as_dict
:bool
, defaultFalse
- If
True
, return a dictionary of transaction information. The keys aresuccess
,msg
,start
,end
,duration
,num_rows
,chunksize
,method
, andtarget
. kw
:Any
- Additional arguments will be passed to the DataFrame's
to_sql
function
Returns
Either a
bool
or aSuccessTuple
(depends onas_tuple
).Expand source code
def to_sql(
        self,
        df: pandas.DataFrame,
        name: str = None,
        index: bool = False,
        if_exists: str = 'replace',
        method: str = "",
        chunksize: Optional[int] = -1,
        silent: bool = False,
        debug: bool = False,
        as_tuple: bool = False,
        as_dict: bool = False,
        **kw
    ) -> Union[bool, SuccessTuple]:
    """
    Upload a DataFrame's contents to the SQL server.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be uploaded.

    name: str
        The name of the table to be created.

    index: bool, default False
        If True, creates the DataFrame's indices as columns.

    if_exists: str, default 'replace'
        Drop and create the table ('replace') or append if it exists
        ('append') or raise Exception ('fail').
        Options are ['replace', 'append', 'fail'].

    method: str, default ''
        None or multi. Details on pandas.to_sql.

    as_tuple: bool, default False
        If `True`, return a (success_bool, message) tuple instead of a `bool`.
        Defaults to `False`.

    as_dict: bool, default False
        If `True`, return a dictionary of transaction information.
        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`,
        `chunksize`, `method`, and `target`.

    kw: Any
        Additional arguments will be passed to the DataFrame's `to_sql` function

    Returns
    -------
    Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
    """
    import time
    import json
    from meerschaum.utils.warnings import error, warn
    import warnings
    import functools
    if name is None:
        error(f"Name must not be `None` to insert data into {self}.")

    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
    kw.pop('name', None)

    from meerschaum.utils.sql import sql_item_name, table_exists, json_flavors, truncate_item_name
    from meerschaum.utils.misc import get_json_cols, is_bcp_available
    from meerschaum.connectors.sql._create_engine import flavor_configs
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', debug=debug)

    stats = {'target': name, }
    ### resort to defaults if None
    if method == "":
        ### NOTE: `_bulk_flavors` and `psql_insert_copy` are module-level
        ### names defined elsewhere in this file.
        if self.flavor in _bulk_flavors:
            method = psql_insert_copy
        else:
            ### Should resolve to 'multi' or `None`.
            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')
    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)

    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    ### Clamp the chunksize to the flavor's maximum (module-level table).
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel = 3,
                )
            chunksize = _max_chunks_flavors[self.flavor]
    stats['chunksize'] = chunksize

    success, msg = False, "Default to_sql message"
    start = time.perf_counter()
    if debug:
        msg = f"Inserting {len(df)} rows with chunksize: {chunksize}..."
        print(msg, end="", flush=True)
    stats['num_rows'] = len(df)

    ### filter out non-pandas args
    import inspect
    to_sql_params = inspect.signature(df.to_sql).parameters
    to_sql_kw = {}
    for k, v in kw.items():
        if k in to_sql_params:
            to_sql_kw[k] = v

    if self.flavor == 'oracle':
        ### For some reason 'replace' doesn't work properly in pandas,
        ### so try dropping first.
        if if_exists == 'replace' and table_exists(name, self, debug=debug):
            success = self.exec("DROP TABLE " + sql_item_name(name, 'oracle')) is not None
            if not success:
                warn(f"Unable to drop {name}")

        ### Enforce NVARCHAR(2000) as text instead of CLOB.
        dtype = to_sql_kw.get('dtype', {})
        for col, typ in df.dtypes.items():
            if str(typ) == 'object':
                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
            elif str(typ).lower().startswith('int'):
                dtype[col] = sqlalchemy.types.INTEGER
        to_sql_kw['dtype'] = dtype

    ### Check for JSON columns.
    if self.flavor not in json_flavors:
        json_cols = get_json_cols(df)
        if json_cols:
            for col in json_cols:
                ### Serialize unhashable cells (dicts/lists) to JSON text.
                ### `Hashable` is presumably imported at module level — TODO confirm.
                df[col] = df[col].apply(
                    (
                        lambda x: json.dumps(x, default=str, sort_keys=True)
                        if not isinstance(x, Hashable)
                        else x
                    )
                )

    ### Check if the name is too long.
    truncated_name = truncate_item_name(name, self.flavor)
    if name != truncated_name:
        warn(
            f"Table '{name}' is too long for '{self.flavor}',"
            + f" will instead create the table '{truncated_name}'."
        )

    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            df.to_sql(
                name = truncated_name,
                con = self.engine,
                index = index,
                if_exists = if_exists,
                method = method,
                chunksize = chunksize,
                **to_sql_kw
            )
        success = True
    except Exception as e:
        if not silent:
            warn(str(e))
        success, msg = False, str(e)

    end = time.perf_counter()
    if success:
        msg = f"It took {round(end - start, 2)} seconds to sync {len(df)} rows to {name}."
    stats['start'] = start
    stats['end'] = end
    stats['duration'] = end - start

    if debug:
        print(f" done.", flush=True)
        ### NOTE(review): `dprint` is not imported locally — presumably a
        ### module-level import; verify.
        dprint(msg)

    stats['success'] = success
    stats['msg'] = msg
    if as_tuple:
        return success, msg
    if as_dict:
        return stats
    return success
def value(self, query: str, *args: Any, use_pandas: bool = False, **kw: Any) ‑> Any
-
Execute the provided query and return the first value.
Parameters
query
:str
- The SQL query to execute.
*args
:Any
- The arguments passed to
meerschaum.connectors.sql.SQLConnector.exec
ifuse_pandas
isFalse
(default) or tomeerschaum.connectors.sql.SQLConnector.read
. use_pandas
:bool
, defaultFalse
- If
True
, useread()
, otherwise usemeerschaum.connectors.sql.SQLConnector.exec
(default). NOTE: This is alwaysTrue
for DuckDB. **kw
:Any
- See
args
.
Returns
Any value returned from the query.
Expand source code
def value(
        self,
        query: str,
        *args: Any,
        use_pandas: bool = False,
        **kw: Any
    ) -> Any:
    """
    Execute the provided query and return the first value.

    Parameters
    ----------
    query: str
        The SQL query to execute.

    *args: Any
        The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
        if `use_pandas` is `False` (default) or to
        `meerschaum.connectors.sql.SQLConnector.read`.

    use_pandas: bool, default False
        If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise
        use `meerschaum.connectors.sql.SQLConnector.exec` (default).
        **NOTE:** This is always `True` for DuckDB.

    **kw: Any
        See `args`.

    Returns
    -------
    Any value returned from the query.
    """
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    ### DuckDB only supports the pandas path.
    if self.flavor == 'duckdb':
        use_pandas = True

    if use_pandas:
        try:
            return self.read(query, *args, **kw).iloc[0, 0]
        except Exception:
            ### Best-effort: any read failure yields None.
            return None

    should_close = kw.get('close', True)
    should_commit = kw.get('commit', (self.flavor != 'mssql'))

    try:
        result, connection = self.exec(
            query,
            *args,
            with_connection=True,
            close=False,
            commit=should_commit,
            **kw
        )
        first_row = None if result is None else result.first()
        extracted_value = None if first_row is None else first_row[0]
    except Exception as e:
        warn(e, stacklevel=3)
        return None

    if should_close:
        try:
            connection.close()
        except Exception as e:
            warn("Failed to close connection with exception:\n" + str(e))

    return extracted_value