meerschaum.connectors
Create connectors with `get_connector()`.
For ease of use, you can also import from the root meerschaum
module:
>>> from meerschaum import get_connector
>>> conn = get_connector()
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Create connectors with `meerschaum.connectors.get_connector()`.
For ease of use, you can also import from the root `meerschaum` module:
```
>>> from meerschaum import get_connector
>>> conn = get_connector()
```
"""

from __future__ import annotations

import meerschaum as mrsm
from meerschaum.utils.typing import Any, SuccessTuple, Union, Optional, List, Dict
from meerschaum.utils.threading import Lock, RLock
from meerschaum.utils.warnings import error, warn

from meerschaum.connectors.Connector import Connector, InvalidAttributesError
from meerschaum.connectors.sql.SQLConnector import SQLConnector
from meerschaum.connectors.api.APIConnector import APIConnector
from meerschaum.connectors.sql._create_engine import flavor_configs as sql_flavor_configs

__all__ = ("Connector", "SQLConnector", "APIConnector", "get_connector", "is_connected")

### Store connectors partitioned by type, then label, for reuse.
connectors: Dict[str, Dict[str, Connector]] = {
    'api'   : {},
    'sql'   : {},
    'plugin': {},
}
### Connector types which may serve as Meerschaum instance connectors.
instance_types: List[str] = ['sql', 'api']
_locks: Dict[str, RLock] = {
    'connectors'               : RLock(),
    'types'                    : RLock(),
    'custom_types'             : RLock(),
    '_loaded_plugin_connectors': RLock(),
    'instance_types'           : RLock(),
}
attributes: Dict[str, Dict[str, Any]] = {
    'api': {
        'required': [
            'host',
            'username',
            'password'
        ],
        'default': {
            'protocol': 'http',
        },
    },
    'sql': {
        'flavors': sql_flavor_configs,
    },
}
### Fill this with objects only when connectors are first referenced.
types: Dict[str, Any] = {}
custom_types: set = set()
_loaded_plugin_connectors: bool = False


def get_connector(
        type: Optional[str] = None,
        label: Optional[str] = None,
        refresh: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Connector:
    """
    Return existing connector or create new connection and store for reuse.

    You can create new connectors if enough parameters are provided for the given type and flavor.

    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct new object. Defaults to `False`.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`).

    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```

    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    ### Allow passing combined keys (e.g. 'sql:main') as `type`.
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)
    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _loaded_plugin_connectors = True
    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"

        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    ### Lazily populate the type -> class registry on first use.
    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        with _locks['types']:
            types.update({
                'api'   : APIConnector,
                'sql'   : SQLConnector,
                'plugin': PluginConnector,
            })

    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    import inspect
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector " +
                            f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n" +
                        f"  - Keyword value: '{value}'\n" +
                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else:
            ### connector doesn't yet exist
            refresh = True

    ### Only create an object if refresh is True
    ### (can be manually specified, otherwise determined above).
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack = False,
                )
                conn = None
            except Exception as e:
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack = False,
                )
                conn = None
        if conn is None:
            return None

    return connectors[type][label]


def is_connected(keys: str, **kw) -> bool:
    """
    Check if the connector keys correspond to an active connection.
    If the connector has not been created, it will immediately return `False`.
    If the connector exists but cannot communicate with the source, return `False`.

    **NOTE:** Only works with instance connectors (`SQLConnectors` and `APIConnectors`).
    Keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Parameters
    ----------
    keys:
        The keys to the connector (e.g. `'sql:main'`).

    Returns
    -------
    A `bool` corresponding to whether a successful connection may be made.

    """
    import warnings
    if ':' not in keys:
        warn(f"Invalid connector keys '{keys}'")

    try:
        ### NOTE: `maxsplit=1` matches `get_connector()` and tolerates
        ### labels which themselves contain ':'.
        typ, label = keys.split(':', maxsplit=1)
    except Exception:
        return False
    if typ not in instance_types:
        return False
    if label not in connectors.get(typ, {}):
        return False

    from meerschaum.connectors.parse import parse_instance_keys
    conn = parse_instance_keys(keys)
    try:
        ### Suppress connection warnings; only the boolean result matters here.
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            return conn.test_connection(**kw)
    except Exception:
        return False


def make_connector(cls):
    """
    Register a class as a `Connector`.
    The `type` will be the lower case of the class name, without the suffix `connector`.

    Parameters
    ----------
    cls:
        The connector class to register.
        Set the class attribute `IS_INSTANCE` to `True` to mark it as an
        instance connector (this requires implementing the various pipes
        functions and lots of testing).

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.connectors import make_connector, Connector
    >>> class FooConnector(Connector):
    ...     def __init__(self, label: str, **kw):
    ...         super().__init__('foo', label, **kw)
    ...
    >>> make_connector(FooConnector)
    >>> mrsm.get_connector('foo', 'bar')
    foo:bar
    >>>
    """
    import re
    typ = re.sub(r'connector$', '', cls.__name__.lower())
    with _locks['types']:
        types[typ] = cls
    with _locks['custom_types']:
        custom_types.add(typ)
    with _locks['connectors']:
        if typ not in connectors:
            connectors[typ] = {}
    if getattr(cls, 'IS_INSTANCE', False):
        with _locks['instance_types']:
            if typ not in instance_types:
                instance_types.append(typ)

    return cls


def load_plugin_connectors():
    """
    If a plugin makes use of the `make_connector` decorator,
    load its module.
    """
    from meerschaum.plugins import get_plugins, import_plugins
    to_import = []
    ### Cheap text scan avoids importing plugins which define no connectors.
    for plugin in get_plugins():
        with open(plugin.__file__, encoding='utf-8') as f:
            text = f.read()
        if 'make_connector' in text:
            to_import.append(plugin.name)
    if not to_import:
        return
    import_plugins(*to_import)


def get_connector_plugin(
        connector: Connector,
    ) -> Union[str, None, mrsm.Plugin]:
    """
    Determine the plugin for a connector.
    This is useful for handling virtual environments for custom instance connectors.

    Parameters
    ----------
    connector: Connector
        The connector which may require a virtual environment.

    Returns
    -------
    A Plugin, 'mrsm', or None.
    """
    if not hasattr(connector, 'type'):
        return None
    plugin_name = (
        connector.__module__.replace('plugins.', '').split('.')[0]
        if connector.type in custom_types else (
            connector.label
            if connector.type == 'plugin'
            else 'mrsm'
        )
    )
    plugin = mrsm.Plugin(plugin_name)
    return plugin if plugin.is_installed() else None
class Connector(metaclass=abc.ABCMeta):
    """
    The base connector class to hold connection attributes.
    """
    def __init__(
        self,
        type: Optional[str] = None,
        label: Optional[str] = None,
        **kw: Any
    ):
        """
        Set the connector's attributes and verify its requirements.

        Parameters
        ----------
        type: str
            The `type` of the connector (e.g. `sql`, `api`, `plugin`).

        label: str
            The `label` for the connector.

        Run `mrsm edit config` to edit connectors in the YAML file:

        ```
        meerschaum:
            connections:
                {type}:
                    {label}:
                        ### attributes go here
        ```

        """
        ### Snapshot the pristine __dict__ so `_reset_attributes()` can roll back.
        self._original_dict = copy.deepcopy(self.__dict__)
        self._set_attributes(type=type, label=label, **kw)
        ### Child classes may declare REQUIRED_ATTRIBUTES for their own contracts.
        self.verify_attributes(getattr(self, 'REQUIRED_ATTRIBUTES', None))

    def _reset_attributes(self):
        """Restore `__dict__` to the snapshot taken before `_set_attributes()`."""
        self.__dict__ = self._original_dict

    def _set_attributes(
        self,
        *args,
        inherit_default: bool = True,
        **kw: Any
    ):
        """Populate the connector's attributes from configuration and keyword arguments."""
        from meerschaum.config.static import STATIC_CONFIG
        from meerschaum.utils.warnings import error

        self._attributes = {}

        default_label = STATIC_CONFIG['connectors']['default_label']

        ### NOTE: Support the legacy method of explicitly passing the type.
        label = kw.get('label', None)
        if label is None:
            if len(args) == 2:
                label = args[1]
            elif len(args) == 0:
                label = None
            else:
                label = args[0]

        if label == 'default':
            error(
                f"Label cannot be 'default'. Did you mean '{default_label}'?",
                InvalidAttributesError,
            )
        self.__dict__['label'] = label

        from meerschaum.config import get_config
        conn_configs = copy.deepcopy(get_config('meerschaum', 'connectors'))
        connector_config = copy.deepcopy(get_config('system', 'connectors'))

        ### Inherit attributes from a 'default' connector of the same type, if defined.
        if inherit_default:
            inherit_from = 'default'
            if self.type in conn_configs and inherit_from in conn_configs[self.type]:
                _inherit_dict = copy.deepcopy(conn_configs[self.type][inherit_from])
                self._attributes.update(_inherit_dict)

        ### Layer the user's configuration for this type:label on top.
        if self.type in conn_configs and self.label in conn_configs[self.type]:
            self._attributes.update(conn_configs[self.type][self.label])

        ### Load system config into self._sys_config
        ### (deep copy so future Connectors don't inherit changes).
        if self.type in connector_config:
            self._sys_config = copy.deepcopy(connector_config[self.type])

        ### Keyword arguments take highest precedence.
        self._attributes.update(kw)

        ### Finally, expose the merged attributes on the instance itself.
        self.__dict__.update(self._attributes)


    def verify_attributes(
        self,
        required_attributes: Optional[List[str]] = None,
        debug: bool = False
    ) -> None:
        """
        Ensure that the required attributes have been met.

        The Connector base class checks the minimum requirements.
        Child classes may enforce additional requirements.

        Parameters
        ----------
        required_attributes: Optional[List[str]], default None
            Attributes to be verified. If `None`, default to `['label']`.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        Don't return anything.

        Raises
        ------
        An error if any of the required attributes are missing.
        """
        from meerschaum.utils.warnings import error, warn
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.misc import items_str
        required = required_attributes if required_attributes is not None else ['label']
        ### Gather every absent attribute so the error reports them all at once.
        missing = {attr for attr in required if attr not in self.__dict__}
        if missing:
            error(
                (
                    f"Missing {items_str(list(missing))} "
                    + f"for connector '{self.type}:{self.label}'."
                ),
                InvalidAttributesError,
                silent = True,
                stack = False
            )


    def __str__(self):
        """
        When cast to a string, return type:label.
        """
        return f"{self.type}:{self.label}"

    def __repr__(self):
        """
        Represent the connector as type:label.
        """
        return str(self)

    @property
    def meta(self) -> Dict[str, Any]:
        """
        Return the keys needed to reconstruct this Connector.
        """
        ### Private ('_'-prefixed) attributes are implementation details; omit them.
        _meta = {
            key: value
            for key, value in self.__dict__.items()
            if not str(key).startswith('_')
        }
        _meta.update({
            'type': self.type,
            'label': self.label,
        })
        return _meta


    @property
    def type(self) -> str:
        """
        Return the type for this connector.
        """
        _type = self.__dict__.get('type', None)
        if _type is None:
            ### Derive the type from the class name (e.g. FooConnector -> 'foo').
            import re
            _type = re.sub(r'connector$', '', self.__class__.__name__.lower())
            self.__dict__['type'] = _type
        return _type


    @property
    def label(self) -> str:
        """
        Return the label for this connector.
        """
        _label = self.__dict__.get('label', None)
        if _label is None:
            from meerschaum.config.static import STATIC_CONFIG
            _label = STATIC_CONFIG['connectors']['default_label']
            self.__dict__['label'] = _label
        return _label
The base connector class to hold connection attributes.
def __init__(
    self,
    type: Optional[str] = None,
    label: Optional[str] = None,
    **kw: Any
):
    """
    Set the connector's attributes and verify its requirements.

    Parameters
    ----------
    type: str
        The `type` of the connector (e.g. `sql`, `api`, `plugin`).

    label: str
        The `label` for the connector.

    Run `mrsm edit config` to edit connectors in the YAML file:

    ```
    meerschaum:
        connections:
            {type}:
                {label}:
                    ### attributes go here
    ```

    """
    ### Snapshot __dict__ first so `_reset_attributes()` can restore it later.
    self._original_dict = copy.deepcopy(self.__dict__)
    self._set_attributes(type=type, label=label, **kw)
    ### Subclasses may declare REQUIRED_ATTRIBUTES to extend the base contract.
    self.verify_attributes(getattr(self, 'REQUIRED_ATTRIBUTES', None))
def verify_attributes(
    self,
    required_attributes: Optional[List[str]] = None,
    debug: bool = False
) -> None:
    """
    Ensure that the required attributes have been met.

    The Connector base class checks the minimum requirements.
    Child classes may enforce additional requirements.

    Parameters
    ----------
    required_attributes: Optional[List[str]], default None
        Attributes to be verified. If `None`, default to `['label']`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    Don't return anything.

    Raises
    ------
    An error if any of the required attributes are missing.
    """
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import items_str
    required = required_attributes if required_attributes is not None else ['label']
    ### Collect all missing attributes so a single error can name every one.
    missing = {attr for attr in required if attr not in self.__dict__}
    if missing:
        error(
            (
                f"Missing {items_str(list(missing))} "
                + f"for connector '{self.type}:{self.label}'."
            ),
            InvalidAttributesError,
            silent = True,
            stack = False
        )
Ensure that the required attributes have been met.
The Connector base class checks the minimum requirements. Child classes may enforce additional requirements.
Parameters
- required_attributes (Optional[List[str]], default None):
    Attributes to be verified. If `None`, default to `['label']`.
- debug (bool, default False):
    Verbosity toggle.
Returns
- Don't return anything.
Raises
- An error if any of the required attributes are missing.
class SQLConnector(Connector):
    """
    Connect to SQL databases via `sqlalchemy`.

    SQLConnectors may be used as Meerschaum instance connectors.
    Read more about connectors and instances at
    https://meerschaum.io/reference/connectors/

    """

    IS_INSTANCE: bool = True

    ### Bind the implementation functions from sibling modules as methods.
    from ._create_engine import flavor_configs, create_engine
    from ._sql import read, value, exec, execute, to_sql, exec_queries
    from meerschaum.utils.sql import test_connection
    from ._fetch import fetch, get_pipe_metadef
    from ._cli import cli, _cli_exit
    from ._pipes import (
        fetch_pipes_keys,
        create_indices,
        drop_indices,
        get_create_index_queries,
        get_drop_index_queries,
        get_add_columns_queries,
        get_alter_columns_queries,
        delete_pipe,
        get_pipe_data,
        get_pipe_data_query,
        register_pipe,
        edit_pipe,
        get_pipe_id,
        get_pipe_attributes,
        sync_pipe,
        sync_pipe_inplace,
        get_sync_time,
        pipe_exists,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        deduplicate_pipe,
        get_pipe_table,
        get_pipe_columns_types,
        get_to_sql_dtype,
        get_pipe_schema,
    )
    from ._plugins import (
        register_plugin,
        delete_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugins,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
    )
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri, parse_uri
    from ._instance import (
        _log_temporary_tables_creation,
        _drop_temporary_table,
        _drop_temporary_tables,
        _drop_old_temporary_tables,
    )

    def __init__(
        self,
        label: Optional[str] = None,
        flavor: Optional[str] = None,
        wait: bool = False,
        connect: bool = False,
        debug: bool = False,
        **kw: Any
    ):
        """
        Parameters
        ----------
        label: str, default 'main'
            The identifying label for the connector.
            E.g. for `sql:main`, 'main' is the label.
            Defaults to 'main'.

        flavor: Optional[str], default None
            The database flavor, e.g.
            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
            To see supported flavors, run the `bootstrap connectors` command.

        wait: bool, default False
            If `True`, block until a database connection has been made.
            Defaults to `False`.

        connect: bool, default False
            If `True`, immediately attempt to connect the database and raise
            a warning if the connection fails.
            Defaults to `False`.

        debug: bool, default False
            Verbosity toggle.
            Defaults to `False`.

        kw: Any
            All other arguments will be passed to the connector's attributes.
            Therefore, a connector may be made without being registered,
            as long as enough parameters are supplied to the constructor.
        """
        if 'uri' in kw:
            uri = kw['uri']
            ### Normalize legacy scheme names to what sqlalchemy expects.
            if uri.startswith('postgres://'):
                uri = uri.replace('postgres://', 'postgresql://', 1)
            if uri.startswith('timescaledb://'):
                uri = uri.replace('timescaledb://', 'postgresql://', 1)
                flavor = 'timescaledb'
            kw['uri'] = uri
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            _ = from_uri_params.pop('label', None)

            ### Sometimes the flavor may be provided with a URI.
            kw.update(from_uri_params)
            if flavor:
                kw['flavor'] = flavor

        ### set __dict__ in base class
        super().__init__(
            'sql',
            label = label or self.__dict__.get('label', None),
            **kw
        )

        if self.__dict__.get('flavor', None) == 'sqlite':
            ### SQLite databases are file-local and should not inherit the
            ### 'default' connector's attributes; rebuild from scratch.
            self._reset_attributes()
            self._set_attributes(
                'sql',
                label = label,
                inherit_default = False,
                **kw
            )
            ### For backwards compatibility reasons, set the path for sql:local if it's missing.
            if self.label == 'local' and not self.__dict__.get('database', None):
                from meerschaum.config._paths import SQLITE_DB_PATH
                self.database = str(SQLITE_DB_PATH)

        ### ensure flavor and label are set accordingly
        if 'flavor' not in self.__dict__:
            if flavor is None and 'uri' not in self.__dict__:
                raise Exception(
                    f" Missing flavor. Provide flavor as a key for '{self}'."
                )
            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

        if self.flavor == 'postgres':
            self.flavor = 'postgresql'

        self._debug = debug
        ### Store the PID and thread at initialization
        ### so we can dispose of the Pool in child processes or threads.
        import os
        import threading
        self._pid = os.getpid()
        self._thread_ident = threading.current_thread().ident
        self._sessions = {}
        self._locks = {'_sessions': threading.RLock(), }

        ### verify the flavor's requirements are met
        if self.flavor not in self.flavor_configs:
            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
        if not self.__dict__.get('uri'):
            self.verify_attributes(
                self.flavor_configs[self.flavor].get('requirements', set()),
                debug=debug,
            )

        if wait:
            from meerschaum.connectors.poll import retry_connect
            retry_connect(connector=self, debug=debug)

        if connect:
            if not self.test_connection(debug=debug):
                from meerschaum.utils.warnings import warn
                warn(f"Failed to connect with connector '{self}'!", stack=False)

    @property
    def Session(self):
        """
        Return a lazily-constructed scoped session factory bound to this engine,
        or `None` if no engine could be built.
        """
        if '_Session' not in self.__dict__:
            if self.engine is None:
                return None

            from meerschaum.utils.packages import attempt_import
            sqlalchemy_orm = attempt_import('sqlalchemy.orm')
            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
            self._Session = sqlalchemy_orm.scoped_session(session_factory)

        return self._Session

    @property
    def engine(self):
        """
        Return the `sqlalchemy` engine, building it on first access and
        disposing of pooled connections after a fork.
        """
        import os
        import threading

        ### build the sqlalchemy engine
        if '_engine' not in self.__dict__:
            self._engine, self._engine_str = self.create_engine(include_uri=True)

        same_process = os.getpid() == self._pid
        same_thread = threading.current_thread().ident == self._thread_ident

        ### Handle child processes: pooled connections must not survive a fork.
        if not same_process:
            self._pid = os.getpid()
            ### NOTE: fixed from assigning the unused `self._thread`, which left
            ### `_thread_ident` stale in the child process.
            self._thread_ident = threading.current_thread().ident
            from meerschaum.utils.warnings import warn
            warn("Different PID detected. Disposing of connections...")
            self._engine.dispose()

        ### Handle different threads (no action currently required).
        if not same_thread:
            pass

        return self._engine

    @property
    def DATABASE_URL(self) -> str:
        """
        Return the URI connection string (alias for `SQLConnector.URI`).
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def URI(self) -> str:
        """
        Return the URI connection string.
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def IS_THREAD_SAFE(self) -> bool:
        """
        Return whether this connector may be multithreaded.
        """
        ### NOTE: fixed return annotation from `str` to `bool`.
        if self.flavor == 'duckdb':
            return False
        if self.flavor == 'sqlite':
            ### In-memory SQLite databases are per-connection and cannot be shared.
            return ':memory:' not in self.URI
        return True


    @property
    def metadata(self):
        """
        Return the metadata bound to this configured schema.
        """
        from meerschaum.utils.packages import attempt_import
        sqlalchemy = attempt_import('sqlalchemy')
        if '_metadata' not in self.__dict__:
            self._metadata = sqlalchemy.MetaData(schema=self.schema)
        return self._metadata


    @property
    def instance_schema(self):
        """
        Return the schema name for Meerschaum tables.
        """
        return self.schema


    @property
    def internal_schema(self):
        """
        Return the schema name for internal tables.
        """
        from meerschaum.config.static import STATIC_CONFIG
        from meerschaum.utils.packages import attempt_import
        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
        schema_name = self.__dict__.get('internal_schema', None) or (
            STATIC_CONFIG['sql']['internal_schema']
            if self.flavor not in NO_SCHEMA_FLAVORS
            else self.schema
        )

        if '_internal_schema' not in self.__dict__:
            self._internal_schema = schema_name
        return self._internal_schema


    @property
    def db(self) -> "Optional[databases.Database]":
        """
        Return a `databases.Database` built from this connector's URL,
        or `None` if the flavor is unsupported.
        """
        from meerschaum.utils.packages import attempt_import
        databases = attempt_import('databases', lazy=False, install=True)
        url = self.DATABASE_URL
        if 'mysql' in url:
            url = url.replace('+pymysql', '')
        if '_db' not in self.__dict__:
            try:
                self._db = databases.Database(url)
            except KeyError:
                ### Likely encountered an unsupported flavor; fall back to None.
                from meerschaum.utils.warnings import warn
                self._db = None
        return self._db


    @property
    def db_version(self) -> Union[str, None]:
        """
        Return the database version.
        """
        _db_version = self.__dict__.get('_db_version', None)
        if _db_version is not None:
            return _db_version

        from meerschaum.utils.sql import get_db_version
        self._db_version = get_db_version(self)
        return self._db_version


    @property
    def schema(self) -> Union[str, None]:
        """
        Return the default schema to use.
        A value of `None` will not prepend a schema.
        """
        if 'schema' in self.__dict__:
            return self.__dict__['schema']

        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
        if self.flavor in NO_SCHEMA_FLAVORS:
            self.__dict__['schema'] = None
            return None

        ### Ask the database itself for its default schema and cache the answer.
        sqlalchemy = mrsm.attempt_import('sqlalchemy')
        _schema = sqlalchemy.inspect(self.engine).default_schema_name
        self.__dict__['schema'] = _schema
        return _schema


    def __getstate__(self):
        """Pickle support: serialize `__dict__` directly."""
        return self.__dict__

    def __setstate__(self, d):
        """Pickle support: restore state from `d`."""
        self.__dict__.update(d)

    def __call__(self):
        """Return the connector itself when called."""
        return self
Connect to SQL databases via `sqlalchemy`.
SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/
def __init__(
    self,
    label: Optional[str] = None,
    flavor: Optional[str] = None,
    wait: bool = False,
    connect: bool = False,
    debug: bool = False,
    **kw: Any
):
    """
    Build a SQLConnector from a label, flavor, and/or URI.

    Parameters
    ----------
    label: str, default 'main'
        The identifying label for the connector.
        E.g. for `sql:main`, 'main' is the label.

    flavor: Optional[str], default None
        The database flavor, e.g. `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
        To see supported flavors, run the `bootstrap connectors` command.

    wait: bool, default False
        If `True`, block until a database connection has been made.

    connect: bool, default False
        If `True`, immediately attempt to connect the database and raise
        a warning if the connection fails.

    debug: bool, default False
        Verbosity toggle.

    kw: Any
        All other arguments will be passed to the connector's attributes.
        Therefore, a connector may be made without being registered,
        as long as enough parameters are supplied to the constructor.
    """
    if 'uri' in kw:
        ### Normalize legacy scheme names before handing the URI to sqlalchemy.
        normalized_uri = kw['uri']
        if normalized_uri.startswith('postgres://'):
            normalized_uri = normalized_uri.replace('postgres://', 'postgresql://', 1)
        if normalized_uri.startswith('timescaledb://'):
            normalized_uri = normalized_uri.replace('timescaledb://', 'postgresql://', 1)
            flavor = 'timescaledb'
        kw['uri'] = normalized_uri
        uri_attributes = self.from_uri(kw['uri'], as_dict=True)
        label = label or uri_attributes.get('label', None)
        _ = uri_attributes.pop('label', None)

        ### Sometimes the flavor may be provided with a URI.
        kw.update(uri_attributes)
        if flavor:
            kw['flavor'] = flavor

    ### Let the base class populate __dict__ from configuration and kwargs.
    super().__init__(
        'sql',
        label = label or self.__dict__.get('label', None),
        **kw
    )

    if self.__dict__.get('flavor', None) == 'sqlite':
        ### SQLite connectors must not inherit the 'default' connector's
        ### attributes, so rebuild from scratch without inheritance.
        self._reset_attributes()
        self._set_attributes(
            'sql',
            label = label,
            inherit_default = False,
            **kw
        )
        ### For backwards compatibility reasons, set the path for sql:local if it's missing.
        if self.label == 'local' and not self.__dict__.get('database', None):
            from meerschaum.config._paths import SQLITE_DB_PATH
            self.database = str(SQLITE_DB_PATH)

    ### Ensure flavor and label are set accordingly.
    if 'flavor' not in self.__dict__:
        if flavor is None and 'uri' not in self.__dict__:
            raise Exception(
                f" Missing flavor. Provide flavor as a key for '{self}'."
            )
        self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

    if self.flavor == 'postgres':
        self.flavor = 'postgresql'

    self._debug = debug
    ### Store the PID and thread at initialization
    ### so we can dispose of the Pool in child processes or threads.
    import os, threading
    self._pid = os.getpid()
    self._thread_ident = threading.current_thread().ident
    self._sessions = {}
    self._locks = {'_sessions': threading.RLock(), }

    ### Verify the flavor's requirements are met.
    if self.flavor not in self.flavor_configs:
        error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
    if not self.__dict__.get('uri'):
        self.verify_attributes(
            self.flavor_configs[self.flavor].get('requirements', set()),
            debug=debug,
        )

    if wait:
        from meerschaum.connectors.poll import retry_connect
        retry_connect(connector=self, debug=debug)

    if connect:
        if not self.test_connection(debug=debug):
            from meerschaum.utils.warnings import warn
            warn(f"Failed to connect with connector '{self}'!", stack=False)
Parameters
----------
- label (str, default 'main'):
  The identifying label for the connector.
  E.g. for `sql:main`, 'main' is the label.
- flavor (Optional[str], default None):
  The database flavor, e.g. `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
  To see supported flavors, run the `bootstrap connectors` command.
- wait (bool, default False):
  If `True`, block until a database connection has been made.
- connect (bool, default False):
  If `True`, immediately attempt to connect to the database
  and raise a warning if the connection fails.
- debug (bool, default False):
  Verbosity toggle.
- kw (Any):
  All other arguments will be passed to the connector's attributes.
  Therefore, a connector may be made without being registered,
  as long as enough parameters are supplied to the constructor.
def create_engine(
    self,
    include_uri: bool = False,
    debug: bool = False,
    **kw
) -> 'sqlalchemy.engine.Engine':
    """
    Create a SQLAlchemy engine by building the engine string.

    Parameters
    ----------
    include_uri: bool, default False
        If `True`, return a tuple of the engine and the engine string.

    debug: bool, default False
        Verbosity toggle.

    **kw: Any
        Additional keyword arguments merged into the `sqlalchemy.create_engine()` call
        (subject to the flavor's `omit_create_engine` configuration).

    Returns
    -------
    A `sqlalchemy.engine.Engine` (or `None` on failure),
    or an `(engine, engine_str)` tuple if `include_uri` is `True`.
    """
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import error, warn
    sqlalchemy = attempt_import('sqlalchemy')
    import urllib
    import copy

    ### Verify the flavor is supported BEFORE indexing into `flavor_configs`.
    ### (Previously this check ran after the defaults lookup below, so an
    ### unsupported flavor raised a bare KeyError instead of this error.)
    if self.flavor not in flavor_configs:
        error(f"Cannot create a connector with the flavor '{self.flavor}'.")

    ### Install and patch required drivers.
    if self.flavor in install_flavor_drivers:
        attempt_import(*install_flavor_drivers[self.flavor], debug=debug, lazy=False, warn=False)
    if self.flavor in require_patching_flavors:
        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
        import pathlib
        for install_name, import_name in require_patching_flavors[self.flavor]:
            pkg = attempt_import(
                import_name,
                debug = debug,
                lazy = False,
                warn = False
            )
            _monkey_patch_get_distribution(
                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
            )

    ### Supplement missing values with defaults (e.g. port number).
    for a, value in flavor_configs[self.flavor]['defaults'].items():
        if a not in self.__dict__:
            self.__dict__[a] = value

    _engine = flavor_configs[self.flavor].get('engine', None)
    _username = self.__dict__.get('username', None)
    _password = self.__dict__.get('password', None)
    _host = self.__dict__.get('host', None)
    _port = self.__dict__.get('port', None)
    _database = self.__dict__.get('database', None)
    _options = self.__dict__.get('options', {})
    if isinstance(_options, str):
        _options = dict(urllib.parse.parse_qsl(_options))
    _uri = self.__dict__.get('uri', None)

    ### Handle registering specific dialects (due to installing in virtual environments).
    if self.flavor in flavor_dialects:
        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])

    ### self._sys_config was deepcopied and can be updated safely.
    if self.flavor in ("sqlite", "duckdb"):
        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
        if 'create_engine' not in self._sys_config:
            self._sys_config['create_engine'] = {}
        if 'connect_args' not in self._sys_config['create_engine']:
            self._sys_config['create_engine']['connect_args'] = {}
        self._sys_config['create_engine']['connect_args'].update({"check_same_thread" : False})
    else:
        engine_str = (
            _engine + "://" + (_username if _username is not None else '') +
            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
            (("/" + _database) if _database is not None else '')
            + (("?" + urllib.parse.urlencode(_options)) if _options else '')
        ) if not _uri else _uri

    ### Sometimes the timescaledb:// flavor can slip in.
    if _uri and self.flavor in ('timescaledb',) and self.flavor in _uri:
        engine_str = engine_str.replace(f'{self.flavor}://', 'postgresql://')

    if debug:
        dprint(
            (
                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
                if _password is not None else engine_str
            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
        )

    _kw_copy = copy.deepcopy(kw)

    ### NOTE: Order of inheritance:
    ###   1. Defaults
    ###   2. System configuration
    ###   3. Connector configuration
    ###   4. Keyword arguments
    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
    def _apply_create_engine_args(update):
        ### Filter out keys the flavor explicitly omits ('ALL' disables updates entirely).
        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
            _create_engine_args.update(
                { k: v for k, v in update.items()
                    if 'omit_create_engine' not in flavor_configs[self.flavor]
                    or k not in flavor_configs[self.flavor].get('omit_create_engine')
                }
            )
    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
    _apply_create_engine_args(_kw_copy)

    try:
        engine = sqlalchemy.create_engine(
            engine_str,
            ### Dynamically parse the configured poolclass string,
            ### splitting the class name (e.g. QueuePool)
            ### from the module name (e.g. sqlalchemy.pool).
            poolclass = getattr(
                attempt_import(
                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
                ),
                self._sys_config['poolclass'].split('.')[-1]
            ),
            echo = debug,
            **_create_engine_args
        )
    except Exception as e:
        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
        engine = None

    if include_uri:
        return engine, engine_str
    return engine
Create a SQLAlchemy engine by building the engine string.
def read(
    self,
    query_or_table: Union[str, sqlalchemy.Query],
    params: Union[Dict[str, Any], List[str], None] = None,
    dtype: Optional[Dict[str, Any]] = None,
    coerce_float: bool = True,
    chunksize: Optional[int] = -1,
    workers: Optional[int] = None,
    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
    as_hook_results: bool = False,
    chunks: Optional[int] = None,
    schema: Optional[str] = None,
    as_chunks: bool = False,
    as_iterator: bool = False,
    as_dask: bool = False,
    index_col: Optional[str] = None,
    silent: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[
    pandas.DataFrame,
    dask.DataFrame,
    List[pandas.DataFrame],
    List[Any],
    None,
]:
    """
    Read a SQL query or table into a pandas dataframe.

    Parameters
    ----------
    query_or_table: Union[str, sqlalchemy.Query]
        The SQL query (sqlalchemy Query or string) or name of the table from which to select.

    params: Optional[Dict[str, Any]], default None
        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.

    dtype: Optional[Dict[str, Any]], default None
        A dictionary of data types to pass to `pandas.read_sql()`.

    coerce_float: bool, default True
        Passed through to `pandas.read_sql_query()`.

    chunksize: Optional[int], default -1
        How many chunks to read at a time. `None` will read everything in one large chunk.
        Defaults to system configuration.

        **NOTE:** DuckDB does not allow for chunking.

    workers: Optional[int], default None
        How many threads to use when consuming the generator.
        Only applies if `chunk_hook` is provided.

    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
        **NOTE:** `as_iterator` MUST be `False` (default).

    as_hook_results: bool, default False
        If `True`, return a `List` of the outputs of the hook function.
        Only applicable if `chunk_hook` is not `None`.

    chunks: Optional[int], default None
        Limit the number of chunks to read into memory.

    schema: Optional[str], default None
        If just a table name is provided, optionally specify the table schema.
        Defaults to `SQLConnector.schema`.

    as_chunks: bool, default False
        If `True`, return a list of DataFrames. Otherwise return a single DataFrame.

    as_iterator: bool, default False
        If `True`, return the pandas DataFrame iterator.
        `chunksize` must not be `None` (falls back to 1000 if so),
        and hooks are not called in this case.

    index_col: Optional[str], default None
        If using Dask, use this column as the index column.

    silent: bool, default False
        If `True`, don't raise warnings in case of errors.

    Returns
    -------
    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
    or `None` if something breaks.
    """
    if chunks is not None and chunks <= 0:
        return []
    from meerschaum.utils.sql import sql_item_name, truncate_item_name
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.pool import get_pool
    from meerschaum.utils.dataframe import chunksize_to_npartitions, get_numeric_cols
    import warnings
    import traceback
    from decimal import Decimal
    pd = import_pandas()
    dd = None
    is_dask = 'dask' in pd.__name__
    pd = attempt_import('pandas')
    ### NOTE(review): `dd` is never rebound to the dask.dataframe module here,
    ### so `is_dask` is always False and the dask branches below appear
    ### unreachable — looks like a dropped `dd = attempt_import(...)` line;
    ### confirm against upstream before relying on the dask path.
    is_dask = dd is not None
    npartitions = chunksize_to_npartitions(chunksize)
    if is_dask:
        chunksize = None
    schema = schema or self.schema

    sqlalchemy = attempt_import("sqlalchemy")
    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is None and as_iterator:
        if not silent and self.flavor not in _disallow_chunks_flavors:
            warn(
                f"An iterator may only be generated if chunksize is not None.\n"
                + "Falling back to a chunksize of 1000.", stacklevel=3,
            )
        chunksize = 1000
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel = 3,
                )
            chunksize = _max_chunks_flavors[self.flavor]

    ### NOTE: A bug in duckdb_engine does not allow for chunks.
    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
        chunksize = None

    if debug:
        import time
        start = time.perf_counter()
        dprint(f"[{self}]\n{query_or_table}")
        dprint(f"[{self}] Fetching with chunksize: {chunksize}")

    ### This might be a sqlalchemy object or the string of a table name.
    ### We check for spaces and quotes to see if it might be a weird table.
    if (
        ' ' not in str(query_or_table)
        or (
            ' ' in str(query_or_table)
            and str(query_or_table).startswith('"')
            and str(query_or_table).endswith('"')
        )
    ):
        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
        if truncated_table_name != str(query_or_table) and not silent:
            ### BUG FIX: previously referenced undefined names `name` and
            ### `truncated_name`, raising NameError whenever truncation occurred.
            warn(
                f"Table '{query_or_table}' is too long for '{self.flavor}',"
                + f" will instead create the table '{truncated_table_name}'."
            )

        query_or_table = sql_item_name(str(query_or_table), self.flavor, schema)
        if debug:
            dprint(f"[{self}] Reading from table {query_or_table}")
        str_query = f"SELECT * FROM {query_or_table}"
    else:
        str_query = query_or_table

    formatted_query = (
        sqlalchemy.text(str_query)
        if not is_dask and isinstance(str_query, str)
        else format_sql_query_for_dask(str_query)
    )

    chunk_list = []
    chunk_hook_results = []
    try:
        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')

            read_sql_query_kwargs = {
                'params': params,
                'dtype': dtype,
                'coerce_float': coerce_float,
                'index_col': index_col,
            }
            if is_dask:
                if index_col is None:
                    ### Dask requires an index column; fall back to pandas.
                    dd = None
                    pd = attempt_import('pandas')
                    read_sql_query_kwargs.update({
                        'chunksize': chunksize,
                    })
            else:
                read_sql_query_kwargs.update({
                    'chunksize': chunksize,
                })

            if is_dask and dd is not None:
                ddf = dd.read_sql_query(
                    formatted_query,
                    self.URI,
                    **read_sql_query_kwargs
                )
            else:
                with self.engine.begin() as transaction:
                    with transaction.execution_options(stream_results=stream_results) as connection:
                        chunk_generator = pd.read_sql_query(
                            formatted_query,
                            connection,
                            **read_sql_query_kwargs
                        )

                        ### `stream_results` must be False (will load everything into memory).
                        if as_iterator or chunksize is None:
                            return chunk_generator

                        ### We must consume the generator in this context if using server-side cursors.
                        if stream_results:
                            pool = get_pool(workers=workers)

                            def _process_chunk(_chunk, _retry_on_failure: bool = True):
                                ### Collect chunks unless only hook outputs were requested.
                                if not as_hook_results:
                                    chunk_list.append(_chunk)
                                result = None
                                if chunk_hook is not None:
                                    try:
                                        result = chunk_hook(
                                            _chunk,
                                            workers = workers,
                                            chunksize = chunksize,
                                            debug = debug,
                                            **kw
                                        )
                                    except Exception:
                                        result = False, traceback.format_exc()
                                        from meerschaum.utils.formatting import get_console
                                        if not silent:
                                            get_console().print_exception()

                                ### If the chunk fails to process, try it again one more time.
                                if isinstance(result, tuple) and result[0] is False:
                                    if _retry_on_failure:
                                        return _process_chunk(_chunk, _retry_on_failure=False)

                                return result

                            chunk_hook_results = list(pool.imap(_process_chunk, chunk_generator))
                            if as_hook_results:
                                return chunk_hook_results

    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
        if not silent:
            warn(str(e), stacklevel=3)
        from meerschaum.utils.formatting import get_console
        if not silent:
            get_console().print_exception()

        return None

    if is_dask and dd is not None:
        ddf = ddf.reset_index()
        return ddf

    ### NOTE(review): this reset discards any chunks collected by the
    ### `stream_results` pool above, forcing the fallback full read below —
    ### confirm whether that is intended upstream.
    chunk_list = []
    read_chunks = 0
    chunk_hook_results = []
    if chunksize is None:
        chunk_list.append(chunk_generator)
    elif as_iterator:
        return chunk_generator
    else:
        ### BUG FIX: the original contained a duplicated copy of this loop in
        ### its `except` branch which re-iterated the exhausted generator;
        ### a single attempt with `return None` on failure preserves the
        ### original net behavior.
        try:
            for chunk in chunk_generator:
                if chunk_hook is not None:
                    chunk_hook_results.append(
                        chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                    )
                chunk_list.append(chunk)
                read_chunks += 1
                if chunks is not None and read_chunks >= chunks:
                    break
        except Exception as e:
            warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
            from meerschaum.utils.formatting import get_console
            if not silent:
                get_console().print_exception()

            return None

    ### If no chunks returned, read without chunks to get columns.
    if len(chunk_list) == 0:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            _ = read_sql_query_kwargs.pop('chunksize', None)
            with self.engine.begin() as connection:
                chunk_list.append(
                    pd.read_sql_query(
                        formatted_query,
                        connection,
                        **read_sql_query_kwargs
                    )
                )

    ### Call the hook on any missed chunks.
    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
        for c in chunk_list[len(chunk_hook_results):]:
            chunk_hook_results.append(
                chunk_hook(c, chunksize=chunksize, debug=debug, **kw)
            )

    if debug:
        end = time.perf_counter()
        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")

    if as_hook_results:
        return chunk_hook_results

    ### Skip `pd.concat()` if `as_chunks` is specified.
    if as_chunks:
        for c in chunk_list:
            c.reset_index(drop=True, inplace=True)
            for col in get_numeric_cols(c):
                c[col] = c[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
        return chunk_list

    df = pd.concat(chunk_list).reset_index(drop=True)
    ### NOTE: The calls to `canonical()` are to drop leading and trailing zeroes.
    for col in get_numeric_cols(df):
        df[col] = df[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)

    return df
Read a SQL query or table into a pandas dataframe.
Parameters
----------
- query_or_table (Union[str, sqlalchemy.Query]):
  The SQL query (sqlalchemy Query or string) or name of the table from which to select.
- params (Optional[Dict[str, Any]], default None):
  `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
  See the pandas documentation for more information:
  https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
- dtype (Optional[Dict[str, Any]], default None):
  A dictionary of data types to pass to `pandas.read_sql()`.
  See the pandas documentation for more information:
  https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
- chunksize (Optional[int], default -1):
  How many chunks to read at a time. `None` will read everything in one large chunk.
  Defaults to the system configuration.
  **NOTE:** DuckDB does not allow for chunking.
- workers (Optional[int], default None):
  How many threads to use when consuming the generator.
  Only applies if `chunk_hook` is provided.
- chunk_hook (Optional[Callable[[pandas.DataFrame], Any]], default None):
  Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
  See `--sync-chunks` for an example.
  **NOTE:** `as_iterator` MUST be `False` (default).
- as_hook_results (bool, default False):
  If `True`, return a `List` of the outputs of the hook function.
  Only applicable if `chunk_hook` is not `None`.
  **NOTE:** `as_iterator` MUST be `False` (default).
- chunks (Optional[int], default None):
  Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
  return into a single dataframe.
  For example, to limit the returned dataframe to 100,000 rows,
  you could specify a `chunksize` of `1000` and `chunks` of `100`.
- schema (Optional[str], default None):
  If just a table name is provided, optionally specify the table schema.
  Defaults to `SQLConnector.schema`.
- as_chunks (bool, default False):
  If `True`, return a list of DataFrames. Otherwise return a single DataFrame.
- as_iterator (bool, default False):
  If `True`, return the pandas DataFrame iterator.
  `chunksize` must not be `None` (falls back to 1000 if so),
  and hooks are not called in this case.
- index_col (Optional[str], default None):
  If using Dask, use this column as the index column.
  If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
- silent (bool, default False):
  If `True`, don't raise warnings in case of errors.

Returns
-------
A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
or `None` if something breaks.
389def value( 390 self, 391 query: str, 392 *args: Any, 393 use_pandas: bool = False, 394 **kw: Any 395 ) -> Any: 396 """ 397 Execute the provided query and return the first value. 398 399 Parameters 400 ---------- 401 query: str 402 The SQL query to execute. 403 404 *args: Any 405 The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec` 406 if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`. 407 408 use_pandas: bool, default False 409 If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use 410 `meerschaum.connectors.sql.SQLConnector.exec` (default). 411 **NOTE:** This is always `True` for DuckDB. 412 413 **kw: Any 414 See `args`. 415 416 Returns 417 ------- 418 Any value returned from the query. 419 420 """ 421 from meerschaum.utils.packages import attempt_import 422 sqlalchemy = attempt_import('sqlalchemy') 423 if self.flavor == 'duckdb': 424 use_pandas = True 425 if use_pandas: 426 try: 427 return self.read(query, *args, **kw).iloc[0, 0] 428 except Exception as e: 429 return None 430 431 _close = kw.get('close', True) 432 _commit = kw.get('commit', (self.flavor != 'mssql')) 433 try: 434 result, connection = self.exec( 435 query, 436 *args, 437 with_connection = True, 438 close = False, 439 commit = _commit, 440 **kw 441 ) 442 first = result.first() if result is not None else None 443 _val = first[0] if first is not None else None 444 except Exception as e: 445 warn(e, stacklevel=3) 446 return None 447 if _close: 448 try: 449 connection.close() 450 except Exception as e: 451 warn("Failed to close connection with exception:\n" + str(e)) 452 return _val
Execute the provided query and return the first value.
Parameters
----------
- query (str): The SQL query to execute.
- *args (Any):
  The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
  if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.
- use_pandas (bool, default False):
  If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use
  `meerschaum.connectors.sql.SQLConnector.exec` (default).
  **NOTE:** This is always `True` for DuckDB.
- **kw (Any): See `args`.
Returns
- Any value returned from the query.
def exec(
    self,
    query: str,
    *args: Any,
    silent: bool = False,
    debug: bool = False,
    commit: Optional[bool] = None,
    close: Optional[bool] = None,
    with_connection: bool = False,
    **kw: Any
) -> Union[
    sqlalchemy.engine.result.resultProxy,
    sqlalchemy.engine.cursor.LegacyCursorResult,
    Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
    Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
    None
]:
    """
    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.

    If inserting data, please use bind variables to avoid SQL injection!

    Parameters
    ----------
    query: Union[str, List[str], Tuple[str]]
        The query to execute.
        If `query` is a list or tuple, call `self.exec_queries()` instead.

    args: Any
        Arguments passed to `sqlalchemy.engine.execute`.

    silent: bool, default False
        If `True`, suppress warnings.

    commit: Optional[bool], default None
        If `True`, commit the changes after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    close: Optional[bool], default None
        If `True`, close the connection after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    with_connection: bool, default False
        If `True`, return a tuple including the connection object.
        This does not apply if `query` is a list of strings.

    Returns
    -------
    The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
    """
    ### A list/tuple of queries is delegated wholesale; the per-query
    ### commit/close/with_connection options do not apply in that case.
    if isinstance(query, (list, tuple)):
        return self.exec_queries(
            list(query),
            *args,
            silent = silent,
            debug = debug,
            **kw
        )

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import("sqlalchemy")
    if debug:
        dprint(f"[{self}] Executing query:\n{query}")

    ### Committing/closing around certain statements causes issues on MSSQL,
    ### so both default to off for that flavor (and commit is also skipped
    ### for SELECT statements there).
    _close = close if close is not None else (self.flavor != 'mssql')
    _commit = commit if commit is not None else (
        (self.flavor != 'mssql' or 'select' not in str(query).lower())
    )

    ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+).
    if not hasattr(query, 'compile'):
        query = sqlalchemy.text(query)

    connection = self.engine.connect()
    ### Only open an explicit transaction when we intend to commit.
    transaction = connection.begin() if _commit else None
    try:
        result = connection.execute(query, *args, **kw)
        if _commit:
            transaction.commit()
    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{e}")
        if not silent:
            warn(str(e), stacklevel=3)
        result = None
        if _commit:
            transaction.rollback()
    finally:
        if _close:
            connection.close()

    ### NOTE(review): when `with_connection` is True and `close` is left at its
    ### default, the returned connection has already been closed above —
    ### callers appear expected to pass `close=False` to keep it usable
    ### (as `value()` does); confirm the intended contract.
    if with_connection:
        return result, connection

    return result
Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.
If inserting data, please use bind variables to avoid SQL injection!
Parameters
----------
- query (Union[str, List[str], Tuple[str]]):
  The query to execute.
  If `query` is a list or tuple, call `self.exec_queries()` instead.
- args (Any): Arguments passed to `sqlalchemy.engine.execute`.
- silent (bool, default False): If `True`, suppress warnings.
- commit (Optional[bool], default None):
  If `True`, commit the changes after execution.
  Causes issues with flavors like `'mssql'`.
  This does not apply if `query` is a list of strings.
- close (Optional[bool], default None):
  If `True`, close the connection after execution.
  Causes issues with flavors like `'mssql'`.
  This does not apply if `query` is a list of strings.
- with_connection (bool, default False):
  If `True`, return a tuple including the connection object.
  This does not apply if `query` is a list of strings.
Returns
-------
The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
455def execute( 456 self, 457 *args : Any, 458 **kw : Any 459 ) -> Optional[sqlalchemy.engine.result.resultProxy]: 460 """ 461 An alias for `meerschaum.connectors.sql.SQLConnector.exec`. 462 """ 463 return self.exec(*args, **kw)
An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
def to_sql(
    self,
    df: pandas.DataFrame,
    name: str = None,
    index: bool = False,
    if_exists: str = 'replace',
    method: str = "",
    chunksize: Optional[int] = -1,
    schema: Optional[str] = None,
    silent: bool = False,
    debug: bool = False,
    as_tuple: bool = False,
    as_dict: bool = False,
    **kw
) -> Union[bool, SuccessTuple]:
    """
    Upload a DataFrame's contents to the SQL server.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be uploaded.

    name: str
        The name of the table to be created.

    index: bool, default False
        If `True`, creates the DataFrame's indices as columns.

    if_exists: str, default 'replace'
        Drop and create the table ('replace'), append if it exists
        ('append'), or raise an Exception ('fail').
        Options are ['replace', 'append', 'fail'].

    method: str, default ''
        `None` or 'multi'. See `pandas.DataFrame.to_sql` for details.

    chunksize: Optional[int], default -1
        How many rows to insert at a time.

    schema: Optional[str], default None
        Optionally override the schema for the table.
        Defaults to `SQLConnector.schema`.

    as_tuple: bool, default False
        If `True`, return a (success_bool, message) tuple instead of a `bool`.

    as_dict: bool, default False
        If `True`, return a dictionary of transaction information.
        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
        `method`, and `target`.

    kw: Any
        Additional arguments will be passed to the DataFrame's `to_sql` function.

    Returns
    -------
    Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
    """
    import time
    import json
    import inspect
    import functools
    import warnings
    from decimal import Decimal
    from meerschaum.utils.warnings import error, warn
    if name is None:
        error(f"Name must not be `None` to insert data into {self}.")

    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
    kw.pop('name', None)

    schema = schema or self.schema

    from meerschaum.utils.sql import (
        sql_item_name,
        table_exists,
        json_flavors,
        truncate_item_name,
    )
    from meerschaum.utils.dataframe import get_json_cols, get_numeric_cols
    from meerschaum.utils.dtypes import are_dtypes_equal, quantize_decimal
    from meerschaum.utils.dtypes.sql import NUMERIC_PRECISION_FLAVORS
    from meerschaum.connectors.sql._create_engine import flavor_configs
    from meerschaum.utils.packages import attempt_import, import_pandas
    sqlalchemy = attempt_import('sqlalchemy', debug=debug)
    pd = import_pandas()
    is_dask = 'dask' in df.__module__

    stats = {'target': name, }
    ### Resort to defaults if None.
    if method == "":
        if self.flavor in _bulk_flavors:
            ### BUG FIX: respect the `schema` parameter (previously hard-coded
            ### `self.schema`, silently ignoring a caller-supplied override).
            method = functools.partial(psql_insert_copy, schema=schema)
        else:
            ### Should resolve to 'multi' or `None`.
            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')
    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)

    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel = 3,
                )
            chunksize = _max_chunks_flavors[self.flavor]
    stats['chunksize'] = chunksize

    success, msg = False, "Default to_sql message"
    start = time.perf_counter()
    if debug:
        msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..."
        print(msg, end="", flush=True)
    stats['num_rows'] = len(df)

    ### Check if the name is too long.
    truncated_name = truncate_item_name(name, self.flavor)
    if name != truncated_name:
        warn(
            f"Table '{name}' is too long for '{self.flavor}',"
            + f" will instead create the table '{truncated_name}'."
        )

    ### Filter out args that pandas' `to_sql` doesn't accept.
    to_sql_params = inspect.signature(df.to_sql).parameters
    to_sql_kw = {k: v for k, v in kw.items() if k in to_sql_params}

    to_sql_kw.update({
        'name': truncated_name,
        'schema': schema,
        ### Dask's `to_sql` takes a URI string instead of an engine.
        ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI),
        'index': index,
        'if_exists': if_exists,
        'method': method,
        'chunksize': chunksize,
    })
    if is_dask:
        to_sql_kw.update({
            'parallel': True,
        })

    if self.flavor == 'oracle':
        ### For some reason 'replace' doesn't work properly in pandas,
        ### so try dropping first.
        if if_exists == 'replace' and table_exists(name, self, schema=schema, debug=debug):
            success = self.exec(
                "DROP TABLE " + sql_item_name(name, 'oracle', schema)
            ) is not None
            if not success:
                warn(f"Unable to drop {name}")

        ### Enforce NVARCHAR(2000) as text instead of CLOB.
        dtype = to_sql_kw.get('dtype', {})
        for col, typ in df.dtypes.items():
            if are_dtypes_equal(str(typ), 'object'):
                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
            elif are_dtypes_equal(str(typ), 'int'):
                dtype[col] = sqlalchemy.types.INTEGER
        to_sql_kw['dtype'] = dtype
    elif self.flavor == 'mssql':
        ### MSSQL has no native boolean type; store as INTEGER.
        dtype = to_sql_kw.get('dtype', {})
        for col, typ in df.dtypes.items():
            if are_dtypes_equal(str(typ), 'bool'):
                dtype[col] = sqlalchemy.types.INTEGER
        to_sql_kw['dtype'] = dtype

    ### Check for JSON columns.
    if self.flavor not in json_flavors:
        json_cols = get_json_cols(df)
        for col in json_cols:
            df[col] = df[col].apply(
                (
                    lambda x: json.dumps(x, default=str, sort_keys=True)
                    if not isinstance(x, Hashable)
                    else x
                )
            )

    ### Check for numeric columns.
    numeric_scale, numeric_precision = NUMERIC_PRECISION_FLAVORS.get(self.flavor, (None, None))
    if numeric_precision is not None and numeric_scale is not None:
        numeric_cols = get_numeric_cols(df)
        for col in numeric_cols:
            df[col] = df[col].apply(
                lambda x: (
                    quantize_decimal(x, numeric_scale, numeric_precision)
                    if isinstance(x, Decimal)
                    else x
                )
            )

    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            df.to_sql(**to_sql_kw)
        success = True
    except Exception as e:
        if not silent:
            warn(str(e))
        success, msg = False, str(e)

    end = time.perf_counter()
    if success:
        msg = f"It took {round(end - start, 2)} seconds to sync {len(df)} rows to {name}."
    stats['start'] = start
    stats['end'] = end
    stats['duration'] = end - start

    if debug:
        print(" done.", flush=True)
        dprint(msg)

    stats['success'] = success
    stats['msg'] = msg
    if as_tuple:
        return success, msg
    if as_dict:
        return stats
    return success
Upload a DataFrame's contents to the SQL server.
Parameters
- df (pd.DataFrame): The DataFrame to be uploaded.
- name (str): The name of the table to be created.
- index (bool, default False): If True, creates the DataFrame's indices as columns.
- if_exists (str, default 'replace'): Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail'].
- method (str, default ''): None or multi. Details on pandas.to_sql.
- chunksize (Optional[int], default -1): How many rows to insert at a time.
- schema (Optional[str], default None): Optionally override the schema for the table. Defaults to `SQLConnector.schema`.
- as_tuple (bool, default False): If `True`, return a `(success_bool, message)` tuple instead of a `bool`. Defaults to `False`.
- as_dict (bool, default False): If `True`, return a dictionary of transaction information. The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`, `method`, and `target`.
- kw (Any): Additional arguments will be passed to the DataFrame's `to_sql` function.

Returns
- Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
def exec_queries(
        self,
        queries: List[
            Union[
                str,
                Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]
            ]
        ],
        break_on_error: bool = False,
        rollback: bool = True,
        silent: bool = False,
        debug: bool = False,
    ) -> List[sqlalchemy.engine.cursor.LegacyCursorResult]:
    """
    Execute a list of queries in a single transaction.

    Parameters
    ----------
    queries: List[
        Union[
            str,
            Tuple[str, Callable[[], List[str]]]
        ]
    ]
        The queries in the transaction to be executed.
        If a query is a tuple, the second item of the tuple
        will be considered a callable hook that returns a list of queries to be executed
        before the next item in the list.

    break_on_error: bool, default False
        If `True`, stop executing when a query fails.

    rollback: bool, default True
        If `break_on_error` is `True`, rollback the transaction if a query fails.

    silent: bool, default False
        If `True`, suppress warnings.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of SQLAlchemy results. A failed query contributes `None`
    (or is the last element if `break_on_error` stops the loop early),
    and a query whose hook ran contributes a `(result, hook_results)` tuple.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    sqlalchemy, sqlalchemy_orm = attempt_import('sqlalchemy', 'sqlalchemy.orm')
    session = sqlalchemy_orm.Session(self.engine)

    result = None
    results = []
    ### All queries share one session-scoped transaction.
    with session.begin():
        for query in queries:
            hook = None
            result = None

            ### A tuple pairs the query with a post-execution hook.
            if isinstance(query, tuple):
                query, hook = query
            if isinstance(query, str):
                query = sqlalchemy.text(query)

            if debug:
                dprint(f"[{self}]\n" + str(query))

            try:
                result = session.execute(query)
                session.flush()
            except Exception as e:
                msg = (f"Encountered error while executing:\n{e}")
                if not silent:
                    warn(msg)
                elif debug:
                    dprint(f"[{self}]\n" + str(msg))
                result = None
            ### `result is None` marks a failed query.
            if result is None and break_on_error:
                if rollback:
                    session.rollback()
                break
            elif result is not None and hook is not None:
                ### The hook may produce follow-up queries, executed recursively
                ### (note: in their own session/transaction, not this one).
                hook_queries = hook(session)
                if hook_queries:
                    hook_results = self.exec_queries(
                        hook_queries,
                        break_on_error = break_on_error,
                        rollback = rollback,
                        silent = silent,
                        debug = debug,
                    )
                    result = (result, hook_results)

            results.append(result)

    return results
Execute a list of queries in a single transaction.
Parameters
- queries (List[Union[str, Tuple[str, Callable[[], List[str]]]]]): The queries in the transaction to be executed. If a query is a tuple, the second item of the tuple will be considered a callable hook that returns a list of queries to be executed before the next item in the list.
- break_on_error (bool, default False): If `True`, stop executing when a query fails.
- rollback (bool, default True): If `break_on_error` is `True`, rollback the transaction if a query fails.
- silent (bool, default False): If `True`, suppress warnings.
Returns
- A list of SQLAlchemy results.
414def test_connection( 415 self, 416 **kw: Any 417 ) -> Union[bool, None]: 418 """ 419 Test if a successful connection to the database may be made. 420 421 Parameters 422 ---------- 423 **kw: 424 The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`. 425 426 Returns 427 ------- 428 `True` if a connection is made, otherwise `False` or `None` in case of failure. 429 430 """ 431 import warnings 432 from meerschaum.connectors.poll import retry_connect 433 _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self} 434 _default_kw.update(kw) 435 with warnings.catch_warnings(): 436 warnings.filterwarnings('ignore', 'Could not') 437 try: 438 return retry_connect(**_default_kw) 439 except Exception as e: 440 return False
Test if a successful connection to the database may be made.
Parameters
- **kw: The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

Returns
- `True` if a connection is made, otherwise `False` or `None` in case of failure.
def fetch(
        self,
        pipe: meerschaum.Pipe,
        begin: Union[datetime, int, str, None] = '',
        end: Union[datetime, int, str, None] = None,
        check_existing: bool = True,
        chunk_hook: Optional[Callable[[pd.DataFrame], Any]] = None,
        chunksize: Optional[int] = -1,
        workers: Optional[int] = None,
        debug: bool = False,
        **kw: Any
    ) -> Union['pd.DataFrame', List[Any], None]:
    """Execute the SQL definition and return a Pandas DataFrame.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe object which contains the `fetch` metadata.

        - pipe.columns['datetime']: str
          - Name of the datetime column for the remote table.
        - pipe.parameters['fetch']: Dict[str, Any]
          - Parameters necessary to execute a query.
        - pipe.parameters['fetch']['definition']: str
          - Raw SQL query to execute to generate the pandas DataFrame.
        - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
          - How many minutes before `begin` to search for data (*optional*).

    begin: Union[datetime, int, str, None], default ''
        Most recent datetime to search for data.
        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.

    end: Union[datetime, int, str, None], default None
        The latest datetime to search for data.
        If `end` is `None`, do not bound.

    check_existing: bool, default True
        If `False`, use a backtrack interval of 0 minutes.

    chunk_hook: Callable[[pd.DataFrame], Any], default None
        A function to pass to `SQLConnector.read()` that accepts a Pandas DataFrame.

    chunksize: Optional[int], default -1
        How many rows to load into memory at once (when `chunk_hook` is provided).
        Otherwise the entire result set is loaded into memory.

    workers: Optional[int], default None
        How many threads to use when consuming the generator (when `chunk_hook` is provided).
        Defaults to the number of cores.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas DataFrame or `None`.
    If `chunk_hook` is not None, return a list of the hook function's results.
    """
    ### Build the bounded fetch query from the pipe's definition.
    meta_def = self.get_pipe_metadef(
        pipe,
        begin = begin,
        end = end,
        check_existing = check_existing,
        debug = debug,
        **kw
    )
    ### With a chunk hook, collect the hook's return values instead of frames.
    as_hook_results = chunk_hook is not None
    chunks = self.read(
        meta_def,
        chunk_hook = chunk_hook,
        as_hook_results = as_hook_results,
        chunksize = chunksize,
        workers = workers,
        debug = debug,
    )
    ### if sqlite, parse for datetimes
    ### NOTE(review): this branch returns a generator expression over `chunks`,
    ### not a DataFrame — confirm callers handle an iterable in the SQLite case.
    if not as_hook_results and self.flavor == 'sqlite':
        from meerschaum.utils.misc import parse_df_datetimes
        ### Only parse columns the pipe does not already type as datetime.
        ignore_cols = [
            col
            for col, dtype in pipe.dtypes.items()
            if 'datetime' not in str(dtype)
        ]
        return (
            parse_df_datetimes(
                chunk,
                ignore_cols = ignore_cols,
                debug = debug,
            )
            for chunk in chunks
        )
    return chunks
Execute the SQL definition and return a Pandas DataFrame.
Parameters
- pipe (meerschaum.Pipe): The pipe object which contains the `fetch` metadata.
  - pipe.columns['datetime']: str — Name of the datetime column for the remote table.
  - pipe.parameters['fetch']: Dict[str, Any] — Parameters necessary to execute a query.
  - pipe.parameters['fetch']['definition']: str — Raw SQL query to execute to generate the pandas DataFrame.
  - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float] — How many minutes before `begin` to search for data (*optional*).
- begin (Union[datetime, int, str, None], default ''): Most recent datetime to search for data. If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
- end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If `end` is `None`, do not bound.
- check_existing (bool, default True): If `False`, use a backtrack interval of 0 minutes.
- chunk_hook (Callable[[pd.DataFrame], Any], default None): A function to pass to `SQLConnector.read()` that accepts a Pandas DataFrame.
- chunksize (Optional[int], default -1): How many rows to load into memory at once (when `chunk_hook` is provided). Otherwise the entire result set is loaded into memory.
- workers (Optional[int], default None): How many threads to use when consuming the generator (when `chunk_hook` is provided). Defaults to the number of cores.
- debug (bool, default False): Verbosity toggle.

Returns
- A pandas DataFrame or `None`. If `chunk_hook` is not None, return a list of the hook function's results.
def get_pipe_metadef(
        self,
        pipe: meerschaum.Pipe,
        params: Optional[Dict[str, Any]] = None,
        begin: Union[datetime, int, str, None] = '',
        end: Union[datetime, int, str, None] = None,
        check_existing: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> Union[str, None]:
    """
    Return a pipe's meta definition fetch query.

    Parameters
    ----------
    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    begin: Union[datetime, int, str, None], default ''
        Most recent datetime to search for data.
        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.

    end: Union[datetime, int, str, None], default None
        The latest datetime to search for data.
        If `end` is `None`, do not bound.

    check_existing: bool, default True
        If `True`, apply the backtrack interval.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pipe's meta definition fetch query string.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where
    from meerschaum.utils.misc import is_int
    from meerschaum.config import get_config

    ### NOTE(review): `definition` is assigned but not referenced below — confirm intent.
    definition = get_pipe_query(pipe)

    ### Resolve the datetime column, guessing when it isn't configured.
    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt_name = sql_item_name(_dt, self.flavor, None) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt_name = sql_item_name(_dt, self.flavor, None)
        is_guess = False

    ### Datetime bounds were requested but we only have a guessed (or missing) column.
    if begin not in (None, '') or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"Unable to determine a datetime column for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = '', None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False
                )


    ### `begin == ''` is the sentinel for "resume from the last sync time".
    apply_backtrack = begin == '' and check_existing
    backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug)
    btm = (
        int(backtrack_interval.total_seconds() / 60)
        if isinstance(backtrack_interval, timedelta)
        else backtrack_interval
    )
    begin = (
        pipe.get_sync_time(debug=debug)
        if begin == ''
        else begin
    )

    ### An inverted window is meaningless; drop the lower bound.
    if begin and end and begin >= end:
        begin = None

    da = None
    if dt_name:
        ### Render the bounds as flavor-specific DATEADD expressions.
        begin_da = (
            dateadd_str(
                flavor = self.flavor,
                datepart = 'minute',
                number = ((-1 * btm) if apply_backtrack else 0),
                begin = begin,
            )
            if begin
            else None
        )
        end_da = (
            dateadd_str(
                flavor = self.flavor,
                datepart = 'minute',
                number = 0,
                begin = end,
            )
            if end
            else None
        )

    meta_def = (
        _simple_fetch_query(pipe) if (
            (not (pipe.columns or {}).get('id', None))
            or (not get_config('system', 'experimental', 'join_fetch'))
        ) else _join_fetch_query(pipe, debug=debug, **kw)
    )

    ### Only inspect the query text after the inner 'definition' alias
    ### so a WHERE inside the user's definition isn't miscounted.
    has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):]
    if dt_name and (begin_da or end_da):
        definition_dt_name = (
            dateadd_str(self.flavor, 'minute', 0, f"definition.{dt_name}")
            if not is_int((begin_da or end_da))
            else f"definition.{dt_name}"
        )
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        if begin_da:
            meta_def += f"{definition_dt_name} >= {begin_da}"
        if begin_da and end_da:
            meta_def += " AND "
        if end_da:
            meta_def += f"{definition_dt_name} < {end_da}"

    if params is not None:
        params_where = build_where(params, self, with_where=False)
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        meta_def += params_where

    return meta_def
Return a pipe's meta definition fetch query.
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the `WHERE` clause. See `meerschaum.utils.sql.build_where`.
- begin (Union[datetime, int, str, None], default ''): Most recent datetime to search for data. If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
- end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If `end` is `None`, do not bound.
- check_existing (bool, default True): If `True`, apply the backtrack interval.
- debug (bool, default False): Verbosity toggle.
Returns
- A pipe's meta definition fetch query string.
35def cli( 36 self, 37 debug: bool = False, 38 ) -> SuccessTuple: 39 """ 40 Launch a subprocess for an interactive CLI. 41 """ 42 from meerschaum.utils.venv import venv_exec 43 env = copy.deepcopy(dict(os.environ)) 44 env[f'MRSM_SQL_{self.label.upper()}'] = json.dumps(self.meta) 45 cli_code = ( 46 "import meerschaum as mrsm\n" 47 f"conn = mrsm.get_connector('sql:{self.label}')\n" 48 f"conn._cli_exit()" 49 ) 50 try: 51 _ = venv_exec(cli_code, venv=None, debug=debug, capture_output=False) 52 except Exception as e: 53 return False, f"[{self}] Failed to start CLI:\n{e}" 54 return True, "Success"
Launch a subprocess for an interactive CLI.
def fetch_pipes_keys(
        self,
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False
    ) -> Optional[List[Tuple[str, str, Optional[str]]]]:
    """
    Return a list of tuples corresponding to the parameters provided.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        List of connector_keys to search by.

    metric_keys: Optional[List[str]], default None
        List of metric_keys to search by.

    location_keys: Optional[List[str]], default None
        List of location_keys to search by.

    tags: Optional[List[str]], default None
        List of tags to search by. Comma-joined tags within one string
        are combined with AND; separate strings are combined with OR.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        E.g. `--params pipe_id:1`

    debug: bool, default False
        Verbosity toggle.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.misc import separate_negation_values, flatten_list
    from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists
    from meerschaum.config.static import STATIC_CONFIG
    import json
    from copy import deepcopy
    sqlalchemy, sqlalchemy_sql_functions = attempt_import('sqlalchemy', 'sqlalchemy.sql.functions')
    coalesce = sqlalchemy_sql_functions.coalesce

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    else:
        ### Normalize the various spellings of a NULL location to 'None'.
        location_keys = [
            (
                lk
                if lk not in ('[None]', 'None', 'null')
                else 'None'
            )
            for lk in location_keys
        ]
    if tags is None:
        tags = []

    if params is None:
        params = {}

    ### Add three primary keys to params dictionary
    ### (separated for convenience of arguments).
    cols = {
        'connector_keys': [str(ck) for ck in connector_keys],
        'metric_key': [str(mk) for mk in metric_keys],
        'location_key': [str(lk) for lk in location_keys],
    }

    ### Make deep copy so we don't mutate this somewhere else.
    parameters = deepcopy(params)
    for col, vals in cols.items():
        if vals not in [[], ['*']]:
            parameters[col] = vals

    if not table_exists('mrsm_pipes', self, schema=self.instance_schema, debug=debug):
        return []

    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes']

    ### Serialize dict-valued params so they compare against the stored JSON text.
    _params = {}
    for k, v in parameters.items():
        _v = json.dumps(v) if isinstance(v, dict) else v
        _params[k] = _v

    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
    ### Parse regular params.
    ### If a param begins with '_', negate it instead.
    _where = [
        (
            (coalesce(pipes_tbl.c[key], 'None') == val)
            if not str(val).startswith(negation_prefix)
            else (pipes_tbl.c[key] != key)
        ) for key, val in _params.items()
        if not isinstance(val, (list, tuple)) and key in pipes_tbl.c
    ]
    select_cols = (
        [
            pipes_tbl.c.connector_keys,
            pipes_tbl.c.metric_key,
            pipes_tbl.c.location_key,
        ]
    )

    q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where))
    ### Apply IN / NOT IN filters for the three key lists.
    for c, vals in cols.items():
        if not isinstance(vals, (list, tuple)) or not vals or not c in pipes_tbl.c:
            continue
        _in_vals, _ex_vals = separate_negation_values(vals)
        q = q.where(coalesce(pipes_tbl.c[c], 'None').in_(_in_vals)) if _in_vals else q
        q = q.where(coalesce(pipes_tbl.c[c], 'None').not_in(_ex_vals)) if _ex_vals else q

    ### Finally, parse tags.
    tag_groups = [tag.split(',') for tag in tags]
    in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups]

    ### Tags are matched by LIKE against the serialized parameters JSON:
    ### inclusion groups are OR'd together; exclusions are AND'd.
    ors, nands = [], []
    for _in_tags, _ex_tags in in_ex_tag_groups:
        sub_ands = []
        for nt in _in_tags:
            sub_ands.append(
                sqlalchemy.cast(
                    pipes_tbl.c['parameters'],
                    sqlalchemy.String,
                ).like(f'%"tags":%"{nt}"%')
            )
        if sub_ands:
            ors.append(sqlalchemy.and_(*sub_ands))

        for xt in _ex_tags:
            nands.append(
                sqlalchemy.cast(
                    pipes_tbl.c['parameters'],
                    sqlalchemy.String,
                ).not_like(f'%"tags":%"{xt}"%')
            )

    q = q.where(sqlalchemy.and_(*nands)) if nands else q
    q = q.where(sqlalchemy.or_(*ors)) if ors else q
    loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key'])
    if self.flavor not in OMIT_NULLSFIRST_FLAVORS:
        loc_asc = sqlalchemy.nullsfirst(loc_asc)
    q = q.order_by(
        sqlalchemy.asc(pipes_tbl.c['connector_keys']),
        sqlalchemy.asc(pipes_tbl.c['metric_key']),
        loc_asc,
    )

    ### execute the query and return a list of tuples
    if debug:
        dprint(q.compile(compile_kwargs={'literal_binds': True}))
    try:
        rows = (
            self.execute(q).fetchall()
            if self.flavor != 'duckdb'
            else [
                (row['connector_keys'], row['metric_key'], row['location_key'])
                for row in self.read(q).to_dict(orient='records')
            ]
        )
    except Exception as e:
        ### NOTE(review): `error` presumably raises; otherwise `rows` would be
        ### unbound on the return below — confirm.
        error(str(e))

    return [(row[0], row[1], row[2]) for row in rows]
Return a list of tuples corresponding to the parameters provided.
Parameters
- connector_keys (Optional[List[str]], default None): List of connector_keys to search by.
- metric_keys (Optional[List[str]], default None): List of metric_keys to search by.
- location_keys (Optional[List[str]], default None): List of location_keys to search by.
- params (Optional[Dict[str, Any]], default None):
Dictionary of additional parameters to search by.
E.g.
--params pipe_id:1
- debug (bool, default False): Verbosity toggle.
310def create_indices( 311 self, 312 pipe: mrsm.Pipe, 313 indices: Optional[List[str]] = None, 314 debug: bool = False 315 ) -> bool: 316 """ 317 Create a pipe's indices. 318 """ 319 from meerschaum.utils.sql import sql_item_name, update_queries 320 from meerschaum.utils.debug import dprint 321 if debug: 322 dprint(f"Creating indices for {pipe}...") 323 if not pipe.columns: 324 warn(f"{pipe} has no index columns; skipping index creation.", stack=False) 325 return True 326 327 ix_queries = { 328 ix: queries 329 for ix, queries in self.get_create_index_queries(pipe, debug=debug).items() 330 if indices is None or ix in indices 331 } 332 success = True 333 for ix, queries in ix_queries.items(): 334 ix_success = all(self.exec_queries(queries, debug=debug, silent=False)) 335 success = success and ix_success 336 if not ix_success: 337 warn(f"Failed to create index on column: {ix}") 338 339 return success
Create a pipe's indices.
342def drop_indices( 343 self, 344 pipe: mrsm.Pipe, 345 indices: Optional[List[str]] = None, 346 debug: bool = False 347 ) -> bool: 348 """ 349 Drop a pipe's indices. 350 """ 351 from meerschaum.utils.debug import dprint 352 if debug: 353 dprint(f"Dropping indices for {pipe}...") 354 if not pipe.columns: 355 warn(f"Unable to drop indices for {pipe} without columns.", stack=False) 356 return False 357 ix_queries = { 358 ix: queries 359 for ix, queries in self.get_drop_index_queries(pipe, debug=debug).items() 360 if indices is None or ix in indices 361 } 362 success = True 363 for ix, queries in ix_queries.items(): 364 ix_success = all(self.exec_queries(queries, debug=debug, silent=True)) 365 if not ix_success: 366 success = False 367 if debug: 368 dprint(f"Failed to drop index on column: {ix}") 369 return success
Drop a pipe's indices.
def get_create_index_queries(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to which the queries will correspond.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of column names mapping to lists of queries.
    """
    from meerschaum.utils.sql import (
        sql_item_name,
        get_distinct_col_count,
        update_queries,
        get_null_replacement,
        COALESCE_UNIQUE_INDEX_FLAVORS,
    )
    from meerschaum.config import get_config
    index_queries = {}

    ### Upsert only applies when this flavor has an upsert template.
    upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in update_queries
    indices = pipe.get_indices()

    _datetime = pipe.get_columns('datetime', error=False)
    ### NOTE(review): `_datetime_type` is unused below — confirm intent.
    _datetime_type = pipe.dtypes.get(_datetime, 'datetime64[ns]')
    _datetime_name = (
        sql_item_name(_datetime, self.flavor, None)
        if _datetime is not None else None
    )
    _datetime_index_name = (
        sql_item_name(indices['datetime'], self.flavor, None)
        if indices.get('datetime', None)
        else None
    )
    _id = pipe.get_columns('id', error=False)
    _id_name = (
        sql_item_name(_id, self.flavor, None)
        if _id is not None
        else None
    )

    _id_index_name = (
        sql_item_name(indices['id'], self.flavor, None)
        if indices.get('id', None)
        else None
    )
    _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
    _create_space_partition = get_config('system', 'experimental', 'space')

    ### create datetime index
    if _datetime is not None:
        if self.flavor == 'timescaledb' and pipe.parameters.get('hypertable', True):
            ### Space partitioning requires the distinct count of the id column.
            _id_count = (
                get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self)
                if (_id is not None and _create_space_partition) else None
            )

            chunk_interval = pipe.get_chunk_interval(debug=debug)
            chunk_interval_minutes = (
                chunk_interval
                if isinstance(chunk_interval, int)
                else int(chunk_interval.total_seconds() / 60)
            )
            ### Integer datetime axes use a plain integer interval;
            ### true datetimes use an INTERVAL literal.
            chunk_time_interval = (
                f"INTERVAL '{chunk_interval_minutes} MINUTES'"
                if isinstance(chunk_interval, timedelta)
                else f'{chunk_interval_minutes}'
            )

            dt_query = (
                f"SELECT public.create_hypertable('{_pipe_name}', " +
                f"'{_datetime}', "
                + (
                    f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition)
                    else ''
                )
                + f'chunk_time_interval => {chunk_time_interval}, '
                + 'if_not_exists => true, '
                + "migrate_data => true);"
            )
        else: ### mssql, sqlite, etc.
            dt_query = (
                f"CREATE INDEX {_datetime_index_name} "
                + f"ON {_pipe_name} ({_datetime_name})"
            )

        index_queries[_datetime] = [dt_query]

    ### create id index
    if _id_name is not None:
        if self.flavor == 'timescaledb':
            ### Already created indices via create_hypertable.
            id_query = (
                None if (_id is not None and _create_space_partition)
                else (
                    f"CREATE INDEX IF NOT EXISTS {_id_index_name} ON {_pipe_name} ({_id_name})"
                    if _id is not None
                    else None
                )
            )
            pass
        else: ### mssql, sqlite, etc.
            id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})"

        if id_query is not None:
            index_queries[_id] = id_query if isinstance(id_query, list) else [id_query]


    ### Create indices for other labels in `pipe.columns`.
    other_indices = {
        ix_key: ix_unquoted
        for ix_key, ix_unquoted in pipe.get_indices().items()
        if ix_key not in ('datetime', 'id')
    }
    for ix_key, ix_unquoted in other_indices.items():
        ix_name = sql_item_name(ix_unquoted, self.flavor, None)
        col = pipe.columns[ix_key]
        col_name = sql_item_name(col, self.flavor, None)
        index_queries[col] = [f"CREATE INDEX {ix_name} ON {_pipe_name} ({col_name})"]

    ### Build the composite unique index / constraint used for upserts.
    existing_cols_types = pipe.get_columns_types(debug=debug)
    indices_cols_str = ', '.join(
        [
            sql_item_name(ix, self.flavor)
            for ix_key, ix in pipe.columns.items()
            if ix and ix in existing_cols_types
        ]
    )
    ### Some flavors treat NULLs as distinct in unique indices,
    ### so wrap nullable columns in COALESCE.
    coalesce_indices_cols_str = ', '.join(
        [
            (
                "COALESCE("
                + sql_item_name(ix, self.flavor)
                + ", "
                + get_null_replacement(existing_cols_types[ix], self.flavor)
                + ") "
            ) if ix_key != 'datetime' else (sql_item_name(ix, self.flavor))
            for ix_key, ix in pipe.columns.items()
            if ix and ix in existing_cols_types
        ]
    )
    unique_index_name = sql_item_name(pipe.target + '_unique_index', self.flavor)
    constraint_name = sql_item_name(pipe.target + '_constraint', self.flavor)
    add_constraint_query = (
        f"ALTER TABLE {_pipe_name} ADD CONSTRAINT {constraint_name} UNIQUE ({indices_cols_str})"
    )
    unique_index_cols_str = (
        indices_cols_str
        if self.flavor not in COALESCE_UNIQUE_INDEX_FLAVORS
        else coalesce_indices_cols_str
    )
    create_unique_index_query = (
        f"CREATE UNIQUE INDEX {unique_index_name} ON {_pipe_name} ({unique_index_cols_str})"
    )
    constraint_queries = [create_unique_index_query]
    ### SQLite gets only the unique index, not the constraint.
    if self.flavor != 'sqlite':
        constraint_queries.append(add_constraint_query)
    if upsert and indices_cols_str:
        index_queries[unique_index_name] = constraint_queries
    return index_queries
Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.
Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
- A dictionary of column names mapping to lists of queries.
def get_drop_index_queries(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `DROP INDEX` or equivalent query.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to which the queries will correspond.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of column names mapping to lists of queries.
    """
    if not pipe.exists(debug=debug):
        return {}
    from meerschaum.utils.sql import sql_item_name, table_exists, hypertable_queries
    drop_queries = {}
    schema = self.get_pipe_schema(pipe)
    schema_prefix = (schema + '_') if schema else ''
    indices = {
        col: schema_prefix + ix
        for col, ix in pipe.get_indices().items()
    }
    pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
    pipe_name_no_schema = sql_item_name(pipe.target, self.flavor, None)

    ### Flavors without a hypertable check can never be hypertables.
    if self.flavor not in hypertable_queries:
        is_hypertable = False
    else:
        is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name)
        is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None

    if is_hypertable:
        ### A hypertable's partitioning can't simply be dropped:
        ### copy the data into a temp table, drop, and rename back.
        nuke_queries = []
        temp_table = '_' + pipe.target + '_temp_migration'
        temp_table_name = sql_item_name(temp_table, self.flavor, self.get_pipe_schema(pipe))

        if table_exists(temp_table, self, schema=self.get_pipe_schema(pipe), debug=debug):
            nuke_queries.append(f"DROP TABLE {temp_table_name}")
        nuke_queries += [
            f"SELECT * INTO {temp_table_name} FROM {pipe_name}",
            f"DROP TABLE {pipe_name}",
            f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name_no_schema}",
        ]
        ### Attach the rebuild to whichever of datetime/id exists first, only once.
        nuke_ix_keys = ('datetime', 'id')
        nuked = False
        for ix_key in nuke_ix_keys:
            if ix_key in indices and not nuked:
                drop_queries[ix_key] = nuke_queries
                nuked = True

    ### Remaining indices get a plain DROP INDEX.
    drop_queries.update({
        ix_key: ["DROP INDEX " + sql_item_name(ix_unquoted, self.flavor, None)]
        for ix_key, ix_unquoted in indices.items()
        if ix_key not in drop_queries
    })
    return drop_queries
Return a dictionary mapping columns to a `DROP INDEX` or equivalent query.
Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
- A dictionary of column names mapping to lists of queries.
def get_add_columns_queries(
        self,
        pipe: mrsm.Pipe,
        df: Union[pd.DataFrame, Dict[str, str]],
        debug: bool = False,
    ) -> List[str]:
    """
    Add new null columns of the correct type to a table from a dataframe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.

    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which contains new columns.
        If a dictionary is provided, assume it maps columns to Pandas data types.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    """
    if not pipe.exists(debug=debug):
        return []

    from decimal import Decimal
    import copy
    from meerschaum.utils.sql import (
        sql_item_name,
        SINGLE_ALTER_TABLE_FLAVORS,
    )
    from meerschaum.utils.dtypes.sql import (
        get_pd_type_from_db_type,
        get_db_type_from_pd_type,
    )
    from meerschaum.utils.misc import flatten_list
    table_obj = self.get_pipe_table(pipe, debug=debug)
    ### Dask frames are sampled via their first partition.
    is_dask = 'dask' in df.__module__ if not isinstance(df, dict) else False
    if is_dask:
        df = df.partitions[0].compute()
    df_cols_types = (
        {
            col: str(typ)
            for col, typ in df.dtypes.items()
        }
        if not isinstance(df, dict)
        else copy.deepcopy(df)
    )
    ### Refine 'object' dtypes by inspecting the first row's values.
    if not isinstance(df, dict) and len(df.index) > 0:
        for col, typ in list(df_cols_types.items()):
            if typ != 'object':
                continue
            val = df.iloc[0][col]
            if isinstance(val, (dict, list)):
                df_cols_types[col] = 'json'
            elif isinstance(val, Decimal):
                df_cols_types[col] = 'numeric'
            elif isinstance(val, str):
                df_cols_types[col] = 'str'
    db_cols_types = {
        col: get_pd_type_from_db_type(str(typ.type))
        for col, typ in table_obj.columns.items()
    }
    ### Columns present in the frame but absent from the table.
    new_cols = set(df_cols_types) - set(db_cols_types)
    if not new_cols:
        return []

    new_cols_types = {
        col: get_db_type_from_pd_type(
            df_cols_types[col],
            self.flavor
        ) for col in new_cols
    }

    alter_table_query = "ALTER TABLE " + sql_item_name(
        pipe.target, self.flavor, self.get_pipe_schema(pipe)
    )
    queries = []
    for col, typ in new_cols_types.items():
        ### Trailing comma is stripped when the clause terminates a query.
        add_col_query = (
            "\nADD "
            + sql_item_name(col, self.flavor, None)
            + " " + typ + ","
        )

        if self.flavor in SINGLE_ALTER_TABLE_FLAVORS:
            queries.append(alter_table_query + add_col_query[:-1])
        else:
            alter_table_query += add_col_query

    ### For most flavors, only one query is required.
    ### This covers SQLite which requires one query per column.
    if not queries:
        queries.append(alter_table_query[:-1])

    if self.flavor != 'duckdb':
        return queries

    ### NOTE: For DuckDB, we must drop and rebuild the indices.
    drop_index_queries = list(flatten_list(
        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
    ))
    create_index_queries = list(flatten_list(
        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
    ))

    return drop_index_queries + queries + create_index_queries
Add new null columns of the correct type to a table from a dataframe.
Parameters
- pipe (mrsm.Pipe): The pipe to be altered.
- df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.
Returns
- A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
def get_alter_columns_queries(
        self,
        pipe: mrsm.Pipe,
        df: Union[pd.DataFrame, Dict[str, str]],
        debug: bool = False,
    ) -> List[str]:
    """
    If we encounter a column of a different type, set the entire column to text.
    If the altered columns are numeric, alter to numeric instead.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.

    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which may contain altered columns.
        If a dict is provided, assume it maps columns to Pandas data types.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    """
    ### Nothing to alter if the target table doesn't exist yet.
    if not pipe.exists(debug=debug):
        return []
    from meerschaum.utils.sql import sql_item_name
    from meerschaum.utils.dataframe import get_numeric_cols
    from meerschaum.utils.dtypes import are_dtypes_equal
    from meerschaum.utils.dtypes.sql import (
        get_pd_type_from_db_type,
        get_db_type_from_pd_type,
    )
    from meerschaum.utils.misc import flatten_list, generate_password, items_str
    table_obj = self.get_pipe_table(pipe, debug=debug)
    target = pipe.target
    ### Short random suffix to avoid collisions when renaming tables (SQLite path below).
    session_id = generate_password(3)
    numeric_cols = (
        get_numeric_cols(df)
        if not isinstance(df, dict)
        else [
            col
            for col, typ in df.items()
            if typ == 'numeric'
        ]
    )
    df_cols_types = (
        {
            col: str(typ)
            for col, typ in df.dtypes.items()
        }
        if not isinstance(df, dict)
        else df
    )
    db_cols_types = {
        col: get_pd_type_from_db_type(str(typ.type))
        for col, typ in table_obj.columns.items()
    }
    pipe_bool_cols = [col for col, typ in pipe.dtypes.items() if are_dtypes_equal(str(typ), 'bool')]
    ### Known benign db-type / df-type pairs (e.g. bools stored as ints) to skip below.
    pd_db_df_aliases = {
        'int': 'bool',
        'float': 'bool',
        'numeric': 'bool',
    }
    if self.flavor == 'oracle':
        pd_db_df_aliases['int'] = 'numeric'

    ### Columns whose dataframe dtype disagrees with the table's column type.
    ### Columns already stored as text ('string') are never altered.
    altered_cols = {
        col: (db_cols_types.get(col, 'object'), typ)
        for col, typ in df_cols_types.items()
        if not are_dtypes_equal(typ, db_cols_types.get(col, 'object').lower())
        and not are_dtypes_equal(db_cols_types.get(col, 'object'), 'string')
    }

    ### NOTE: Sometimes bools are coerced into ints or floats.
    altered_cols_to_ignore = set()
    for col, (db_typ, df_typ) in altered_cols.items():
        for db_alias, df_alias in pd_db_df_aliases.items():
            if db_alias in db_typ.lower() and df_alias in df_typ.lower():
                altered_cols_to_ignore.add(col)

    ### Oracle's bool handling sometimes mixes NUMBER and INT.
    for bool_col in pipe_bool_cols:
        if bool_col not in altered_cols:
            continue
        db_is_bool_compatible = (
            are_dtypes_equal('int', altered_cols[bool_col][0])
            or are_dtypes_equal('float', altered_cols[bool_col][0])
            or are_dtypes_equal('numeric', altered_cols[bool_col][0])
            or are_dtypes_equal('bool', altered_cols[bool_col][0])
        )
        df_is_bool_compatible = (
            are_dtypes_equal('int', altered_cols[bool_col][1])
            or are_dtypes_equal('float', altered_cols[bool_col][1])
            or are_dtypes_equal('numeric', altered_cols[bool_col][1])
            or are_dtypes_equal('bool', altered_cols[bool_col][1])
        )
        if db_is_bool_compatible and df_is_bool_compatible:
            altered_cols_to_ignore.add(bool_col)

    for col in altered_cols_to_ignore:
        _ = altered_cols.pop(col, None)
    if not altered_cols:
        return []

    ### Persist newly-detected numeric columns to the pipe's registered dtypes;
    ### otherwise pick up any already-registered numeric columns.
    if numeric_cols:
        pipe.dtypes.update({col: 'numeric' for col in numeric_cols})
        edit_success, edit_msg = pipe.edit(debug=debug)
        if not edit_success:
            warn(
                f"Failed to update dtypes for numeric columns {items_str(numeric_cols)}:\n"
                + f"{edit_msg}"
            )
    else:
        numeric_cols.extend([col for col, typ in pipe.dtypes.items() if typ == 'numeric'])

    pipe_dtypes = pipe.dtypes
    numeric_type = get_db_type_from_pd_type('numeric', self.flavor, as_sqlalchemy=False)
    text_type = get_db_type_from_pd_type('str', self.flavor, as_sqlalchemy=False)
    ### Altered columns become numeric if flagged as such, else fall back to text.
    altered_cols_types = {
        col: (
            numeric_type
            if col in numeric_cols
            else text_type
        )
        for col, (db_typ, typ) in altered_cols.items()
    }

    ### SQLite cannot alter column types in place:
    ### rename the table aside, recreate it with the new types,
    ### copy the rows back with CASTs, then drop the renamed table.
    if self.flavor == 'sqlite':
        temp_table_name = '-' + session_id + '_' + target
        rename_query = (
            "ALTER TABLE "
            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
            + " RENAME TO "
            + sql_item_name(temp_table_name, self.flavor, None)
        )
        create_query = (
            "CREATE TABLE "
            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
            + " (\n"
        )
        for col_name, col_obj in table_obj.columns.items():
            create_query += (
                sql_item_name(col_name, self.flavor, None)
                + " "
                + (
                    str(col_obj.type)
                    if col_name not in altered_cols
                    else altered_cols_types[col_name]
                )
                + ",\n"
            )
        create_query = create_query[:-2] + "\n)"

        insert_query = (
            "INSERT INTO "
            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
            + ' ('
            + ', '.join([
                sql_item_name(col_name, self.flavor, None)
                for col_name, _ in table_obj.columns.items()
            ])
            + ')'
            + "\nSELECT\n"
        )
        for col_name, col_obj in table_obj.columns.items():
            new_col_str = (
                sql_item_name(col_name, self.flavor, None)
                if col_name not in altered_cols
                else (
                    "CAST("
                    + sql_item_name(col_name, self.flavor, None)
                    + " AS "
                    + altered_cols_types[col_name]
                    + ")"
                )
            )
            insert_query += new_col_str + ",\n"
        insert_query = insert_query[:-2] + (
            f"\nFROM {sql_item_name(temp_table_name, self.flavor, self.get_pipe_schema(pipe))}"
        )

        drop_query = "DROP TABLE " + sql_item_name(
            temp_table_name, self.flavor, self.get_pipe_schema(pipe)
        )
        return [
            rename_query,
            create_query,
            insert_query,
            drop_query,
        ]

    queries = []
    ### Oracle cannot directly MODIFY a populated column to an incompatible type:
    ### add a temp column, copy values over, NULL the original, MODIFY it,
    ### copy back from the temp column, then drop the temp column.
    if self.flavor == 'oracle':
        for col, typ in altered_cols_types.items():
            add_query = (
                "ALTER TABLE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nADD " + sql_item_name(col + '_temp', self.flavor, None)
                + " " + typ
            )
            queries.append(add_query)

        for col, typ in altered_cols_types.items():
            populate_temp_query = (
                "UPDATE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nSET " + sql_item_name(col + '_temp', self.flavor, None)
                + ' = ' + sql_item_name(col, self.flavor, None)
            )
            queries.append(populate_temp_query)

        for col, typ in altered_cols_types.items():
            set_old_cols_to_null_query = (
                "UPDATE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nSET " + sql_item_name(col, self.flavor, None)
                + ' = NULL'
            )
            queries.append(set_old_cols_to_null_query)

        for col, typ in altered_cols_types.items():
            alter_type_query = (
                "ALTER TABLE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nMODIFY " + sql_item_name(col, self.flavor, None) + ' '
                + typ
            )
            queries.append(alter_type_query)

        for col, typ in altered_cols_types.items():
            set_old_to_temp_query = (
                "UPDATE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nSET " + sql_item_name(col, self.flavor, None)
                + ' = ' + sql_item_name(col + '_temp', self.flavor, None)
            )
            queries.append(set_old_to_temp_query)

        for col, typ in altered_cols_types.items():
            drop_temp_query = (
                "ALTER TABLE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor, None)
            )
            queries.append(drop_temp_query)

        return queries

    ### Generic path: one ALTER TABLE with flavor-specific keywords.
    query = "ALTER TABLE " + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
    for col, typ in altered_cols_types.items():
        alter_col_prefix = (
            'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle')
            else 'MODIFY'
        )
        type_prefix = (
            '' if self.flavor in ('mssql', 'mariadb', 'mysql')
            else 'TYPE '
        )
        column_str = 'COLUMN' if self.flavor != 'oracle' else ''
        query += (
            f"\n{alter_col_prefix} {column_str} "
            + sql_item_name(col, self.flavor, None)
            + " " + type_prefix + typ + ","
        )

    ### Strip the trailing comma from the last column clause.
    query = query[:-1]
    queries.append(query)
    if self.flavor != 'duckdb':
        return queries

    ### NOTE: For DuckDB, we must drop and rebuild the indices.
    drop_index_queries = list(flatten_list(
        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
    ))
    create_index_queries = list(flatten_list(
        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
    ))

    return drop_index_queries + queries + create_index_queries
If we encounter a column of a different type, set the entire column to text. If the altered columns are numeric, alter to numeric instead.
Parameters
- pipe (mrsm.Pipe): The pipe to be altered.
- df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which may contain altered columns. If a dict is provided, assume it maps columns to Pandas data types.
Returns
- A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
def delete_pipe(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> SuccessTuple:
    """
    Delete a Pipe's registration.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose registration row to delete.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    ### NOTE: Removed unused imports (`sql_item_name`, `dprint`).
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    ### A pipe without an ID was never registered.
    if not pipe.id:
        return False, f"{pipe} is not registered."

    ### ensure pipes table exists
    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    q = sqlalchemy.delete(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
    if not self.exec(q, debug=debug):
        return False, f"Failed to delete registration for {pipe}."

    return True, "Success"
Delete a Pipe's registration.
def get_pipe_data(
        self,
        pipe: mrsm.Pipe,
        select_columns: Optional[List[str]] = None,
        omit_columns: Optional[List[str]] = None,
        begin: Union[datetime, str, None] = None,
        end: Union[datetime, str, None] = None,
        params: Optional[Dict[str, Any]] = None,
        order: str = 'asc',
        limit: Optional[int] = None,
        begin_add_minutes: int = 0,
        end_add_minutes: int = 0,
        debug: bool = False,
        **kw: Any
    ) -> Union[pd.DataFrame, None]:
    """
    Access a pipe's data from the SQL instance.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to get data from.

    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, str, None], default None
        If provided, get rows newer than or equal to this value.

    end: Union[datetime, str, None], default None
        If provided, get rows older than or equal to this value.

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.connectors.sql.build_where`.

    order: Optional[str], default 'asc'
        The selection order for all of the indices in the query.
        If `None`, omit the `ORDER BY` clause.

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    begin_add_minutes: int, default 0
        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).

    end_add_minutes: int, default 0
        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).

    chunksize: Optional[int], default -1
        The size of dataframe chunks to load into memory.
        NOTE(review): passed via `**kw` to `self.read()`, not a named parameter here.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` of the pipe's data.

    """
    import json
    from meerschaum.utils.sql import sql_item_name
    from meerschaum.utils.misc import parse_df_datetimes, to_pandas_dtype
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.dtypes import attempt_cast_to_numeric
    pd = import_pandas()
    is_dask = 'dask' in pd.__name__

    dtypes = pipe.dtypes
    if dtypes:
        ### SQLite stores datetimes as text; coerce the datetime column's dtype
        ### so the read below parses it (integer datetime columns are left alone).
        if self.flavor == 'sqlite':
            if not pipe.columns.get('datetime', None):
                _dt = pipe.guess_datetime()
                dt = sql_item_name(_dt, self.flavor, None) if _dt else None
                is_guess = True
            else:
                _dt = pipe.get_columns('datetime')
                dt = sql_item_name(_dt, self.flavor, None)
                is_guess = False

            if _dt:
                dt_type = dtypes.get(_dt, 'object').lower()
                if 'datetime' not in dt_type:
                    if 'int' not in dt_type:
                        dtypes[_dt] = 'datetime64[ns]'
    ### Restrict the selection to columns that actually exist on the table.
    existing_cols = pipe.get_columns_types(debug=debug)
    select_columns = (
        [
            col
            for col in existing_cols
            if col not in (omit_columns or [])
        ]
        if not select_columns
        else [
            col
            for col in select_columns
            if col in existing_cols
            and col not in (omit_columns or [])
        ]
    )
    if select_columns:
        dtypes = {col: typ for col, typ in dtypes.items() if col in select_columns}
    ### Translate pipe dtypes into Pandas dtypes for `self.read()`.
    dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
        if col in select_columns and col not in (omit_columns or [])
    }
    query = self.get_pipe_data_query(
        pipe,
        select_columns = select_columns,
        omit_columns = omit_columns,
        begin = begin,
        end = end,
        params = params,
        order = order,
        limit = limit,
        begin_add_minutes = begin_add_minutes,
        end_add_minutes = end_add_minutes,
        debug = debug,
        **kw
    )

    ### Dask's read_sql requires an index column; use the datetime column if set.
    if is_dask:
        index_col = pipe.columns.get('datetime', None)
        kw['index_col'] = index_col

    numeric_columns = [
        col
        for col, typ in pipe.dtypes.items()
        if typ == 'numeric' and col in dtypes
    ]
    ### Disable float coercion when numeric (Decimal) columns must be preserved.
    kw['coerce_float'] = kw.get('coerce_float', (len(numeric_columns) == 0))
    df = self.read(
        query,
        dtype = dtypes,
        debug = debug,
        **kw
    )
    for col in numeric_columns:
        if col not in df.columns:
            continue
        df[col] = df[col].apply(attempt_cast_to_numeric)

    if self.flavor == 'sqlite':
        ### NOTE: We have to consume the iterator here to ensure that datetimes are parsed correctly
        df = (
            parse_df_datetimes(
                df,
                ignore_cols = [
                    col
                    for col, dtype in pipe.dtypes.items()
                    if 'datetime' not in str(dtype)
                ],
                chunksize = kw.get('chunksize', None),
                debug = debug,
            ) if isinstance(df, pd.DataFrame) else (
                [
                    parse_df_datetimes(
                        c,
                        ignore_cols = [
                            col
                            for col, dtype in pipe.dtypes.items()
                            if 'datetime' not in str(dtype)
                        ],
                        chunksize = kw.get('chunksize', None),
                        debug = debug,
                    )
                    for c in df
                ]
            )
        )
    ### Decode JSON columns stored as text back into Python objects.
    for col, typ in dtypes.items():
        if typ != 'json':
            continue
        df[col] = df[col].apply(lambda x: json.loads(x) if x is not None else x)
    return df
Access a pipe's data from the SQL instance.
Parameters
- pipe (mrsm.Pipe): The pipe to get data from.
- select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. `SELECT *`).
- omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, str, None], default None): If provided, get rows newer than or equal to this value.
- end (Union[datetime, str, None], default None): If provided, get rows older than or equal to this value.
- params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See `meerschaum.connectors.sql.build_where`.
- order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If `None`, omit the `ORDER BY` clause.
- limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
- begin_add_minutes (int, default 0): The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
- end_add_minutes (int, default 0): The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
- chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory.
- debug (bool, default False): Verbosity toggle.
Returns
- A `pd.DataFrame` of the pipe's data.
def get_pipe_data_query(
        self,
        pipe: mrsm.Pipe,
        select_columns: Optional[List[str]] = None,
        omit_columns: Optional[List[str]] = None,
        begin: Union[datetime, int, str, None] = None,
        end: Union[datetime, int, str, None] = None,
        params: Optional[Dict[str, Any]] = None,
        order: str = 'asc',
        limit: Optional[int] = None,
        begin_add_minutes: int = 0,
        end_add_minutes: int = 0,
        replace_nulls: Optional[str] = None,
        debug: bool = False,
        **kw: Any
    ) -> Union[str, None]:
    """
    Return the `SELECT` query for retrieving a pipe's data from its instance.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to get data from.

    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, int, str, None], default None
        If provided, get rows newer than or equal to this value.

    end: Union[datetime, str, None], default None
        If provided, get rows older than or equal to this value.

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.connectors.sql.build_where`.

    order: Optional[str], default 'asc'
        The selection order for all of the indices in the query.
        If `None`, omit the `ORDER BY` clause.

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    begin_add_minutes: int, default 0
        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).

    end_add_minutes: int, default 0
        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).

    replace_nulls: Optional[str], default None
        If provided, replace null values with this value.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SELECT` query to retrieve a pipe's data.
    """
    ### NOTE: Removed unused `import json` and unused `pd = import_pandas()`.
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import items_str
    from meerschaum.utils.sql import sql_item_name, dateadd_str
    existing_cols = pipe.get_columns_types(debug=debug)
    select_columns = (
        [col for col in existing_cols]
        if not select_columns
        else [col for col in select_columns if col in existing_cols]
    )
    if omit_columns:
        select_columns = [col for col in select_columns if col not in omit_columns]

    ### An empty-string `begin` means "start from the sync time minus the backtrack interval".
    if begin == '':
        begin = pipe.get_sync_time(debug=debug)
        backtrack_interval = pipe.get_backtrack_interval(debug=debug)
        if begin is not None:
            begin -= backtrack_interval

    cols_names = [sql_item_name(col, self.flavor, None) for col in select_columns]
    select_cols_str = (
        'SELECT\n'
        + ',\n '.join(
            [
                (
                    col_name
                    ### BUGFIX: interpolate the column name into COALESCE
                    ### (previously emitted the literal text `col_name`).
                    if not replace_nulls
                    else f"COALESCE({col_name}, '{replace_nulls}') AS {col_name}"
                )
                for col_name in cols_names
            ]
        )
    )
    pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
    query = f"{select_cols_str}\nFROM {pipe_table_name}"
    where = ""

    if order is not None:
        default_order = 'asc'
        if order not in ('asc', 'desc'):
            warn(f"Ignoring unsupported order '{order}'. Falling back to '{default_order}'.")
            order = default_order
        order = order.upper()

    ### Determine the datetime column, guessing if one isn't configured.
    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor, None) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt = sql_item_name(_dt, self.flavor, None)
        is_guess = False

    quoted_indices = {
        key: sql_item_name(val, self.flavor, None)
        for key, val in pipe.columns.items()
        if val in existing_cols
    }

    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )

    is_dt_bound = False
    if begin is not None and _dt in existing_cols:
        begin_da = dateadd_str(
            flavor = self.flavor,
            datepart = 'minute',
            number = begin_add_minutes,
            begin = begin
        )
        where += f"{dt} >= {begin_da}" + (" AND " if end is not None else "")
        is_dt_bound = True

    if end is not None and _dt in existing_cols:
        ### Integer datetime axes: an equal begin/end would select nothing,
        ### so widen the half-open interval by one.
        if 'int' in str(type(end)).lower() and end == begin:
            end += 1
        end_da = dateadd_str(
            flavor = self.flavor,
            datepart = 'minute',
            number = end_add_minutes,
            begin = end
        )
        where += f"{dt} < {end_da}"
        is_dt_bound = True

    if params is not None:
        from meerschaum.utils.sql import build_where
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
        if valid_params:
            where += build_where(valid_params, self).replace(
                'WHERE', ('AND' if is_dt_bound else "")
            )

    if len(where) > 0:
        query += "\nWHERE " + where

    if order is not None:
        ### Sort by indices, starting with datetime.
        order_by = ""
        if quoted_indices:
            order_by += "\nORDER BY "
            if _dt and _dt in existing_cols:
                order_by += dt + ' ' + order + ','
            for key, quoted_col_name in quoted_indices.items():
                if key == 'datetime':
                    continue
                order_by += ' ' + quoted_col_name + ' ' + order + ','
            order_by = order_by[:-1]

        query += order_by

    if isinstance(limit, int):
        if self.flavor == 'mssql':
            ### BUGFIX: the query begins with "SELECT\n" (7 chars), so slicing
            ### `len("SELECT *")` (8) chopped the first character of the first
            ### quoted column name. Strip exactly the "SELECT" keyword instead.
            query = f"SELECT TOP {limit}" + query[len("SELECT"):]
        elif self.flavor == 'oracle':
            ### BUGFIX: was `WHERE ROWNUM = 1`, which ignored `limit` entirely.
            query = f"SELECT * FROM (\n    {query}\n)\nWHERE ROWNUM <= {limit}"
        else:
            query += f"\nLIMIT {limit}"

    if debug:
        to_print = (
            []
            + ([f"begin='{begin}'"] if begin else [])
            + ([f"end='{end}'"] if end else [])
            + ([f"params={params}"] if params else [])
        )
        dprint("Getting pipe data with constraints: " + items_str(to_print, quotes=False))

    return query
Return the `SELECT` query for retrieving a pipe's data from its instance.
Parameters
- pipe (mrsm.Pipe): The pipe to get data from.
- select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. `SELECT *`).
- omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, int, str, None], default None): If provided, get rows newer than or equal to this value.
- end (Union[datetime, str, None], default None): If provided, get rows older than or equal to this value.
- params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See `meerschaum.connectors.sql.build_where`.
- order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If `None`, omit the `ORDER BY` clause.
- limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
- begin_add_minutes (int, default 0): The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
- end_add_minutes (int, default 0): The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
- chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory.
- replace_nulls (Optional[str], default None): If provided, replace null values with this value.
- debug (bool, default False): Verbosity toggle.
Returns
- A `SELECT` query to retrieve a pipe's data.
def register_pipe(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> SuccessTuple:
    """
    Register a new pipe.
    A pipe's attributes must be set before registering.
    """
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    from meerschaum.connectors.sql.tables import get_tables

    ### Fetching the tables dictionary also creates the pipes table if needed
    ### (skipped for temporary pipes).
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    if pipe.get_id(debug=debug) is not None:
        return False, f"{pipe} is already registered."

    ### Prefer the parameters already set on the Pipe object
    ### (e.g. supplied to the constructor); fall back to an empty dictionary
    ### when they cannot be read.
    try:
        parameters = pipe.parameters
    except Exception as e:
        if debug:
            dprint(str(e))
        parameters = None
    if parameters is None:
        parameters = {}

    sqlalchemy = attempt_import('sqlalchemy')

    ### JSON-capable flavors store the parameters natively;
    ### every other flavor stores serialized text.
    serialized_parameters = (
        parameters
        if self.flavor in json_flavors
        else json.dumps(parameters)
    )
    insert_query = sqlalchemy.insert(pipes_tbl).values(
        connector_keys=pipe.connector_keys,
        metric_key=pipe.metric_key,
        location_key=pipe.location_key,
        parameters=serialized_parameters,
    )
    result = self.exec(insert_query, debug=debug)
    if result is None:
        return False, f"Failed to register {pipe}."
    return True, f"Successfully registered {pipe}."
Register a new pipe. A pipe's attributes must be set before registering.
def edit_pipe(
        self,
        pipe : mrsm.Pipe = None,
        patch: bool = False,
        debug: bool = False,
        **kw : Any
    ) -> SuccessTuple:
    """
    Persist a Pipe's parameters to its database.

    Parameters
    ----------
    pipe: mrsm.Pipe, default None
        The pipe to be edited.
    patch: bool, default False
        If patch is `True`, update the existing parameters by cascading.
        Otherwise overwrite the parameters (default).
    debug: bool, default False
        Verbosity toggle.
    """

    ### Only registered pipes have a row to update.
    if pipe.id is None:
        return False, f"{pipe} is not registered and cannot be edited."

    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    from meerschaum.connectors.sql.tables import get_tables

    if patch:
        ### Cascade the in-memory parameters on top of those currently registered.
        from meerschaum import Pipe
        from meerschaum.config._patch import apply_patch_to_config
        registered_parameters = Pipe(
            pipe.connector_keys, pipe.metric_key, pipe.location_key,
            mrsm_instance=pipe.instance_keys
        ).parameters
        parameters = apply_patch_to_config(
            registered_parameters,
            pipe.parameters
        )
    else:
        parameters = pipe.__dict__.get('_attributes', {}).get('parameters', {})

    ### Fetching the tables dictionary also creates the pipes table if needed.
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    sqlalchemy = attempt_import('sqlalchemy')

    ### JSON-capable flavors store the parameters natively;
    ### every other flavor stores serialized text.
    serialized_parameters = (
        parameters
        if self.flavor in json_flavors
        else json.dumps(parameters)
    )
    update_query = sqlalchemy.update(pipes_tbl).values(
        parameters=serialized_parameters,
    ).where(
        pipes_tbl.c.pipe_id == pipe.id
    )

    result = self.exec(update_query, debug=debug)
    message = (
        f"Successfully edited {pipe}."
        if result is not None else f"Failed to edit {pipe}."
    )
    return (result is not None), message
Persist a Pipe's parameters to its database.
Parameters
- pipe (mrsm.Pipe, default None): The pipe to be edited.
- patch (bool, default False): If patch is `True`, update the existing parameters by cascading. Otherwise overwrite the parameters (default).
- debug (bool, default False): Verbosity toggle.
def get_pipe_id(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> Any:
    """
    Get a Pipe's ID from the pipes table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose ID to fetch.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The pipe's ID as an `int`, or `None` if the pipe is temporary
    or no matching registration row exists.
    """
    ### Temporary pipes are never registered, so they have no ID.
    if pipe.temporary:
        return None
    ### NOTE: Removed unused `import json`.
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    ### A NULL location key requires an explicit `IS NULL` comparison.
    query = sqlalchemy.select(pipes_tbl.c.pipe_id).where(
        pipes_tbl.c.connector_keys == pipe.connector_keys
    ).where(
        pipes_tbl.c.metric_key == pipe.metric_key
    ).where(
        (pipes_tbl.c.location_key == pipe.location_key) if pipe.location_key is not None
        else pipes_tbl.c.location_key.is_(None)
    )
    _id = self.value(query, debug=debug, silent=pipe.temporary)
    if _id is not None:
        _id = int(_id)
    return _id
Get a Pipe's ID from the pipes table.
def get_pipe_attributes(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> Dict[str, Any]:
    """
    Get a Pipe's attributes dictionary.
    """
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    ### An unregistered pipe has no row to read.
    if pipe.get_id(debug=debug) is None:
        return {}

    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    try:
        select_query = sqlalchemy.select(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
        if debug:
            dprint(select_query)
        if self.flavor != 'duckdb':
            attrs = dict(self.exec(select_query, silent=True, debug=debug).first()._mapping)
        else:
            ### DuckDB rows are fetched via a DataFrame rather than a result proxy.
            attrs = self.read(select_query, debug=debug).to_dict(orient='records')[0]
    except Exception as e:
        import traceback
        traceback.print_exc()
        warn(e)
        print(pipe)
        return {}

    ### handle non-PostgreSQL databases (text vs JSON)
    if not isinstance(attrs.get('parameters', None), dict):
        import json
        try:
            decoded = json.loads(attrs['parameters'])
            ### Handle double-encoded JSON by decoding a second time.
            if isinstance(decoded, str) and decoded[0] == '{':
                decoded = json.loads(decoded)
            attrs['parameters'] = decoded
        except Exception:
            attrs['parameters'] = {}

    return attrs
Get a Pipe's attributes dictionary.
def sync_pipe(
    self,
    pipe: mrsm.Pipe,
    df: Union[pd.DataFrame, str, Dict[Any, Any], None] = None,
    begin: Optional[datetime] = None,
    end: Optional[datetime] = None,
    chunksize: Optional[int] = -1,
    check_existing: bool = True,
    blocking: bool = True,
    debug: bool = False,
    _check_temporary_tables: bool = True,
    **kw: Any
) -> SuccessTuple:
    """
    Sync a pipe using a database connection.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The Meerschaum Pipe instance into which to sync the data.

    df: Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]
        An optional DataFrame or equivalent to sync into the pipe.
        Defaults to `None`.

    begin: Optional[datetime], default None
        Optionally specify the earliest datetime to search for data.
        Defaults to `None`.

    end: Optional[datetime], default None
        Optionally specify the latest datetime to search for data.
        Defaults to `None`.

    chunksize: Optional[int], default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.
        Defaults to `-1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe. Defaults to `True`.

    blocking: bool, default True
        If `True`, wait for sync to finish and return its result, otherwise asyncronously sync.
        Defaults to `True`.

    debug: bool, default False
        Verbosity toggle. Defaults to False.

    _check_temporary_tables: bool, default True
        If `True`, drop any stale internal temporary tables after the sync completes.

    kw: Any
        Catch-all for keyword arguments.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.sql import get_update_queries, sql_item_name, json_flavors, update_queries
    from meerschaum.utils.misc import generate_password
    from meerschaum.utils.dataframe import get_json_cols, get_numeric_cols
    from meerschaum.utils.dtypes import are_dtypes_equal
    from meerschaum import Pipe
    import time
    import copy
    pd = import_pandas()
    if df is None:
        msg = f"DataFrame is None. Cannot sync {pipe}."
        warn(msg)
        return False, msg

    start = time.perf_counter()

    ### Register the pipe on first sync so it has an ID in the pipes table.
    if not pipe.temporary and not pipe.get_id(debug=debug):
        register_tuple = pipe.register(debug=debug)
        if not register_tuple[0]:
            return register_tuple

    ### df is the dataframe returned from the remote source
    ### via the connector
    if debug:
        dprint("Fetched data:\n" + str(df))

    ### Non-DataFrame payloads (str / dict / list) are coerced to a DataFrame here.
    if not isinstance(df, pd.DataFrame):
        df = pipe.enforce_dtypes(
            df,
            chunksize = chunksize,
            safe_copy = kw.get('safe_copy', False),
            debug = debug,
        )

    ### if table does not exist, create it with indices
    is_new = False
    add_cols_query = None
    if not pipe.exists(debug=debug):
        check_existing = False
        is_new = True
    else:
        ### Check for new columns.
        add_cols_queries = self.get_add_columns_queries(pipe, df, debug=debug)
        if add_cols_queries:
            if not self.exec_queries(add_cols_queries, debug=debug):
                warn(f"Failed to add new columns to {pipe}.")

        alter_cols_queries = self.get_alter_columns_queries(pipe, df, debug=debug)
        if alter_cols_queries:
            if not self.exec_queries(alter_cols_queries, debug=debug):
                warn(f"Failed to alter columns for {pipe}.")
            else:
                _ = pipe.infer_dtypes(persist=True)

    ### NOTE: Oracle SQL < 23c (2023) and SQLite does not support booleans,
    ### so infer bools and persist them to `dtypes`.
    ### MSSQL supports `BIT` for booleans, but we coerce bools to int for MSSQL
    ### to avoid merge issues.
    if self.flavor in ('oracle', 'sqlite', 'mssql', 'mysql', 'mariadb'):
        pipe_dtypes = pipe.dtypes
        new_bool_cols = {
            col: 'bool[pyarrow]'
            for col, typ in df.dtypes.items()
            if col not in pipe_dtypes
            and are_dtypes_equal(str(typ), 'bool')
        }
        pipe_dtypes.update(new_bool_cols)
        pipe.dtypes = pipe_dtypes
        if not pipe.temporary:
            ### Persist the inferred bool dtypes to the pipe's registration.
            infer_bool_success, infer_bool_msg = pipe.edit(debug=debug)
            if not infer_bool_success:
                return infer_bool_success, infer_bool_msg

    ### Upserts skip the filter-existing diff and merge everything in SQL instead.
    upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in update_queries
    if upsert:
        check_existing = False
    kw['safe_copy'] = kw.get('safe_copy', False)

    unseen_df, update_df, delta_df = (
        pipe.filter_existing(
            df,
            chunksize = chunksize,
            debug = debug,
            **kw
        ) if check_existing else (df, None, df)
    )
    if upsert:
        ### For upserts, treat the entire frame as the update set.
        unseen_df, update_df, delta_df = (df.head(0), df, df)

    if debug:
        dprint("Delta data:\n" + str(delta_df))
        dprint("Unseen data:\n" + str(unseen_df))
        if update_df is not None:
            dprint(("Update" if not upsert else "Upsert") + " data:\n" + str(update_df))

    ### `name` and `if_exists` are set explicitly below, so strip caller-provided ones.
    if_exists = kw.get('if_exists', 'append')
    if 'if_exists' in kw:
        kw.pop('if_exists')
    if 'name' in kw:
        kw.pop('name')

    ### Account for first-time syncs of JSON columns.
    unseen_json_cols = get_json_cols(unseen_df)
    update_json_cols = get_json_cols(update_df) if update_df is not None else []
    json_cols = list(set(unseen_json_cols + update_json_cols))
    existing_json_cols = [col for col, typ in pipe.dtypes.items() if typ == 'json']
    new_json_cols = [col for col in json_cols if col not in existing_json_cols]
    if new_json_cols:
        pipe.dtypes.update({col: 'json' for col in json_cols})
        if not pipe.temporary:
            edit_success, edit_msg = pipe.edit(interactive=False, debug=debug)
            if not edit_success:
                warn(f"Unable to update JSON dtypes for {pipe}:\n{edit_msg}")

    ### Likewise persist first-seen NUMERIC (Decimal) columns.
    unseen_numeric_cols = get_numeric_cols(unseen_df)
    update_numeric_cols = get_numeric_cols(update_df) if update_df is not None else []
    numeric_cols = list(set(unseen_numeric_cols + update_numeric_cols))
    existing_numeric_cols = [col for col, typ in pipe.dtypes.items() if typ == 'numeric']
    new_numeric_cols = [col for col in numeric_cols if col not in existing_numeric_cols]
    if new_numeric_cols:
        pipe.dtypes.update({col: 'numeric' for col in numeric_cols})
        if not pipe.temporary:
            edit_success, edit_msg = pipe.edit(interactive=False, debug=debug)
            if not edit_success:
                warn(f"Unable to update NUMERIC dtypes for {pipe}:\n{edit_msg}")

    ### Insert new data into Pipe's table.
    unseen_kw = copy.deepcopy(kw)
    unseen_kw.update({
        'name': pipe.target,
        'if_exists': if_exists,
        'debug': debug,
        'as_dict': True,
        'chunksize': chunksize,
        'dtype': self.get_to_sql_dtype(pipe, unseen_df, update_dtypes=True),
        'schema': self.get_pipe_schema(pipe),
    })

    stats = self.to_sql(unseen_df, **unseen_kw)
    if is_new:
        if not self.create_indices(pipe, debug=debug):
            warn(f"Failed to create indices for {pipe}. Continuing...")

    ### Apply the update (or upsert) rows by staging them in a temporary pipe
    ### and merging into the target table with SQL.
    if update_df is not None and len(update_df) > 0:
        dt_col = pipe.columns.get('datetime', None)
        dt_typ = pipe.dtypes.get(dt_col, None)
        ### NOTE(review): `dt_name`, `update_begin`, and `update_end` are computed
        ### but not used below — likely leftovers from an earlier bounded-update
        ### implementation; confirm before removing.
        dt_name = sql_item_name(dt_col, self.flavor) if dt_col else None
        update_min = update_df[dt_col].min() if dt_col and dt_col in update_df.columns else None
        update_max = update_df[dt_col].max() if dt_col and dt_col in update_df.columns else None
        update_begin = update_min
        update_end = (
            update_max
            + (
                timedelta(minutes=1)
                if are_dtypes_equal(str(dt_typ), 'datetime')
                else 1
            )
        ) if dt_col else None

        ### Stage the update rows in a uniquely-named temporary table.
        transact_id = generate_password(3)
        temp_target = '-' + transact_id + '_' + pipe.target
        self._log_temporary_tables_creation(temp_target, create=(not pipe.temporary), debug=debug)
        temp_pipe = Pipe(
            pipe.connector_keys.replace(':', '_') + '_', pipe.metric_key, pipe.location_key,
            instance = pipe.instance_keys,
            columns = {
                ix_key: ix
                for ix_key, ix in pipe.columns.items()
                if ix and ix in update_df.columns
            },
            dtypes = pipe.dtypes,
            target = temp_target,
            temporary = True,
            parameters = {
                'schema': self.internal_schema,
                'hypertable': False,
            },
        )
        temp_pipe.sync(update_df, check_existing=False, debug=debug)
        existing_cols = pipe.get_columns_types(debug=debug)
        join_cols = [
            col
            for col_key, col in pipe.columns.items()
            if col and col in existing_cols
        ]
        update_queries = get_update_queries(
            pipe.target,
            temp_target,
            self,
            join_cols,
            upsert = upsert,
            schema = self.get_pipe_schema(pipe),
            patch_schema = self.internal_schema,
            datetime_col = pipe.columns.get('datetime', None),
            debug = debug,
        )
        update_success = all(
            self.exec_queries(update_queries, break_on_error=True, rollback=True, debug=debug)
        )
        ### Mark the staging table as safe to drop.
        self._log_temporary_tables_creation(
            temp_target,
            ready_to_drop = True,
            create = (not pipe.temporary),
            debug = debug,
        )
        if not update_success:
            warn(f"Failed to apply update to {pipe}.")

    stop = time.perf_counter()
    success = stats['success']
    if not success:
        return success, stats['msg']

    unseen_count = len(unseen_df.index) if unseen_df is not None else 0
    update_count = len(update_df.index) if update_df is not None else 0
    msg = (
        (
            f"Inserted {unseen_count}, "
            + f"updated {update_count} rows."
        )
        if not upsert
        else (
            f"Upserted {update_count} row"
            + ('s' if update_count != 1 else '')
            + "."
        )
    )
    if debug:
        msg = msg[:-1] + (
            f"\non table {sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))}\n"
            + f"in {round(stop - start, 2)} seconds."
        )

    if _check_temporary_tables:
        drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables(
            refresh=False, debug=debug
        )
        if not drop_stale_success:
            warn(drop_stale_msg)

    return success, msg
Sync a pipe using a database connection.

Parameters
- pipe (mrsm.Pipe): The Meerschaum Pipe instance into which to sync the data.
- df (Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]): An optional DataFrame or equivalent to sync into the pipe. Defaults to `None`.
- begin (Optional[datetime], default None): Optionally specify the earliest datetime to search for data. Defaults to `None`.
- end (Optional[datetime], default None): Optionally specify the latest datetime to search for data. Defaults to `None`.
- chunksize (Optional[int], default -1): Specify the number of rows to sync per chunk. If `-1`, resort to system configuration (default is `900`). A `chunksize` of `None` will sync all rows in one transaction. Defaults to `-1`.
- check_existing (bool, default True): If `True`, pull and diff with existing data from the pipe. Defaults to `True`.
- blocking (bool, default True): If `True`, wait for the sync to finish and return its result; otherwise sync asynchronously. Defaults to `True`.
- debug (bool, default False): Verbosity toggle. Defaults to False.
- kw (Any): Catch-all for keyword arguments.

Returns
- A `SuccessTuple` of success (`bool`) and message (`str`).
def sync_pipe_inplace(
    self,
    pipe: 'mrsm.Pipe',
    params: Optional[Dict[str, Any]] = None,
    begin: Optional[datetime] = None,
    end: Optional[datetime] = None,
    chunksize: Optional[int] = -1,
    check_existing: bool = True,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    If a pipe's connector is the same as its instance connector,
    it's more efficient to sync the pipe in-place rather than reading data into Pandas.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose connector is the same as its instance.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    begin: Optional[datetime], default None
        Optionally specify the earliest datetime to search for data.
        Defaults to `None`.

    end: Optional[datetime], default None
        Optionally specify the latest datetime to search for data.
        Defaults to `None`.

    chunksize: Optional[int], default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.
        Defaults to `-1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe.

    debug: bool, default False
        Verbosity toggle.

    kw: Any
        Catch-all for keyword arguments.

    Returns
    -------
    A SuccessTuple.
    """
    ### DuckDB cannot use ORM sessions, so fall back to the regular Pandas sync.
    if self.flavor == 'duckdb':
        return pipe.sync(
            params = params,
            begin = begin,
            end = end,
            chunksize = chunksize,
            check_existing = check_existing,
            debug = debug,
            _inplace = False,
            **kw
        )
    from meerschaum.utils.sql import (
        sql_item_name,
        get_update_queries,
        get_null_replacement,
        format_cte_subquery,
        get_create_table_query,
        get_table_cols_types,
        session_execute,
        update_queries,
    )
    from meerschaum.utils.dtypes.sql import (
        get_pd_type_from_db_type,
    )
    from meerschaum.utils.misc import generate_password

    sqlalchemy, sqlalchemy_orm = mrsm.attempt_import('sqlalchemy', 'sqlalchemy.orm')
    metadef = self.get_pipe_metadef(
        pipe,
        params = params,
        begin = begin,
        end = end,
        check_existing = check_existing,
        debug = debug,
    )
    pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
    upsert = pipe.parameters.get('upsert', False) and f'{self.flavor}-upsert' in update_queries
    internal_schema = self.internal_schema
    database = getattr(self, 'database', self.parse_uri(self.URI).get('database', None))

    ### First sync: create the target table straight from the query definition.
    if not pipe.exists(debug=debug):
        create_pipe_query = get_create_table_query(
            metadef,
            pipe.target,
            self.flavor,
            schema = self.get_pipe_schema(pipe),
        )
        result = self.exec(create_pipe_query, debug=debug)
        if result is None:
            return False, f"Could not insert new data into {pipe} from its SQL query definition."
        if not self.create_indices(pipe, debug=debug):
            warn(f"Failed to create indices for {pipe}. Continuing...")

        rowcount = pipe.get_rowcount(debug=debug)
        return True, f"Inserted {rowcount}, updated 0 rows."

    session = sqlalchemy_orm.Session(self.engine)
    connectable = session if self.flavor != 'duckdb' else self

    ### All temporary tables for this sync share a random transaction ID.
    transact_id = generate_password(3)
    def get_temp_table_name(label: str) -> str:
        return '-' + transact_id + '_' + label + '_' + pipe.target

    temp_table_roots = ['backtrack', 'new', 'delta', 'joined', 'unseen', 'update']
    temp_tables = {
        table_root: get_temp_table_name(table_root)
        for table_root in temp_table_roots
    }
    temp_table_names = {
        table_root: sql_item_name(
            table_name_raw,
            self.flavor,
            internal_schema,
        )
        for table_root, table_name_raw in temp_tables.items()
    }

    def clean_up_temp_tables(ready_to_drop: bool = False):
        """Log this sync's temp tables (optionally marking them droppable)."""
        log_success, log_msg = self._log_temporary_tables_creation(
            [
                table
                for table in temp_tables.values()
            ] if not upsert else [temp_tables['update']],
            ready_to_drop = ready_to_drop,
            create = (not pipe.temporary),
            debug = debug,
        )
        if not log_success:
            warn(log_msg)

    ### Materialize the newly fetched rows ('update' table when upserting).
    create_new_query = get_create_table_query(
        metadef,
        temp_tables[('new') if not upsert else 'update'],
        self.flavor,
        schema = internal_schema,
    )
    (create_new_success, create_new_msg), create_new_results = session_execute(
        session,
        create_new_query,
        with_results = True,
        debug = debug,
    )
    if not create_new_success:
        _ = clean_up_temp_tables()
        return create_new_success, create_new_msg
    new_count = create_new_results[0].rowcount if create_new_results else 0

    new_cols_types = get_table_cols_types(
        temp_tables[('new' if not upsert else 'update')],
        connectable = connectable,
        flavor = self.flavor,
        schema = internal_schema,
        database = database,
        debug = debug,
    )
    if not new_cols_types:
        return False, f"Failed to get new columns for {pipe}."

    new_cols = {
        str(col_name): get_pd_type_from_db_type(str(col_type))
        for col_name, col_type in new_cols_types.items()
    }
    new_cols_str = ', '.join([
        sql_item_name(col, self.flavor)
        for col in new_cols
    ])

    ### Migrate the target table's schema to match the fetched columns.
    add_cols_queries = self.get_add_columns_queries(pipe, new_cols, debug=debug)
    if add_cols_queries:
        self.exec_queries(add_cols_queries, debug=debug)

    alter_cols_queries = self.get_alter_columns_queries(pipe, new_cols, debug=debug)
    if alter_cols_queries:
        self.exec_queries(alter_cols_queries, debug=debug)

    ### Without diffing or upserting, everything fetched is inserted directly.
    insert_queries = [
        (
            f"INSERT INTO {pipe_name} ({new_cols_str})\n"
            + f"SELECT {new_cols_str}\nFROM {temp_table_names['new']}"
        )
    ] if not check_existing and not upsert else []

    new_queries = insert_queries
    new_success, new_msg = (
        session_execute(session, new_queries, debug=debug)
        if new_queries
        else (True, "Success")
    )
    if not new_success:
        _ = clean_up_temp_tables()
        return new_success, new_msg

    if not check_existing:
        session.commit()
        _ = clean_up_temp_tables()
        return True, f"Inserted {new_count}, updated 0 rows."

    ### Snapshot the currently-synced interval to diff against the new rows.
    backtrack_def = self.get_pipe_data_query(
        pipe,
        begin = begin,
        end = end,
        begin_add_minutes = 0,
        end_add_minutes = 1,
        params = params,
        debug = debug,
        order = None,
    )

    ### NOTE(review): `select_backtrack_query` is built but never used below
    ### (`get_create_table_query` receives `backtrack_def` directly) — confirm
    ### whether it may be removed.
    select_backtrack_query = format_cte_subquery(
        backtrack_def,
        self.flavor,
        sub_name = 'backtrack_def',
    )
    create_backtrack_query = get_create_table_query(
        backtrack_def,
        temp_tables['backtrack'],
        self.flavor,
        schema = internal_schema,
    )
    ### BUGFIX: the previous form `... = session_execute(...) if not upsert
    ### else (True, "Success"), None` parsed as the tuple `(expr, None)`, so the
    ### results list was always `None` and the success/msg pair was mis-unpacked
    ### in the non-upsert branch. Parenthesize the conditional expression.
    (create_backtrack_success, create_backtrack_msg), create_backtrack_results = (
        session_execute(
            session,
            create_backtrack_query,
            with_results = True,
            debug = debug,
        )
        if not upsert
        else ((True, "Success"), None)
    )

    if not create_backtrack_success:
        _ = clean_up_temp_tables()
        return create_backtrack_success, create_backtrack_msg
    backtrack_count = create_backtrack_results[0].rowcount if create_backtrack_results else 0

    backtrack_cols_types = get_table_cols_types(
        temp_tables['backtrack'],
        connectable = connectable,
        flavor = self.flavor,
        schema = internal_schema,
        database = database,
        debug = debug,
    ) if not upsert else new_cols_types

    common_cols = [col for col in new_cols if col in backtrack_cols_types]
    on_cols = {
        col: new_cols.get(col, 'object')
        for col_key, col in pipe.columns.items()
        if (
            col
            and
            col_key != 'value'
            and col in backtrack_cols_types
            and col in new_cols
        )
    }

    ### NULLs break equality joins, so substitute flavor-specific placeholders.
    null_replace_new_cols_str = (
        ', '.join([
            f"COALESCE({temp_table_names['new']}.{sql_item_name(col, self.flavor, None)}, "
            + f"{get_null_replacement(typ, self.flavor)}) AS "
            + sql_item_name(col, self.flavor, None)
            for col, typ in new_cols.items()
        ])
    )

    ### Delta = fetched rows with no exact match in the backtrack snapshot.
    select_delta_query = (
        f"SELECT\n"
        + null_replace_new_cols_str + "\n"
        + f"\nFROM {temp_table_names['new']}\n"
        + f"LEFT OUTER JOIN {temp_table_names['backtrack']}\nON\n"
        + '\nAND\n'.join([
            (
                f"COALESCE({temp_table_names['new']}."
                + sql_item_name(c, self.flavor, None)
                + ", "
                + get_null_replacement(new_cols[c], self.flavor)
                + ") "
                + ' = '
                + f"COALESCE({temp_table_names['backtrack']}."
                + sql_item_name(c, self.flavor, None)
                + ", "
                + get_null_replacement(backtrack_cols_types[c], self.flavor)
                + ") "
            ) for c in common_cols
        ])
        + "\nWHERE\n"
        + '\nAND\n'.join([
            (
                f"{temp_table_names['backtrack']}." + sql_item_name(c, self.flavor, None) + ' IS NULL'
            ) for c in common_cols
        ])
    )
    create_delta_query = get_create_table_query(
        select_delta_query,
        temp_tables['delta'],
        self.flavor,
        schema = internal_schema,
    )
    create_delta_success, create_delta_msg = session_execute(
        session,
        create_delta_query,
        debug = debug,
    ) if not upsert else (True, "Success")
    if not create_delta_success:
        _ = clean_up_temp_tables()
        return create_delta_success, create_delta_msg

    delta_cols_types = get_table_cols_types(
        temp_tables['delta'],
        connectable = connectable,
        flavor = self.flavor,
        schema = internal_schema,
        database = database,
        debug = debug,
    ) if not upsert else new_cols_types

    ### This is a weird bug on SQLite.
    ### Sometimes the backtrack dtypes are all empty strings.
    if not all(delta_cols_types.values()):
        delta_cols_types = new_cols_types

    delta_cols = {
        col: get_pd_type_from_db_type(typ)
        for col, typ in delta_cols_types.items()
    }
    delta_cols_str = ', '.join([
        sql_item_name(col, self.flavor)
        for col in delta_cols
    ])

    ### Join deltas back to the snapshot on the index columns to split
    ### brand-new rows from rows that update existing ones.
    select_joined_query = (
        "SELECT "
        + (', '.join([
            (
                f"{temp_table_names['delta']}." + sql_item_name(c, self.flavor, None)
                + " AS " + sql_item_name(c + '_delta', self.flavor, None)
            ) for c in delta_cols
        ]))
        + ", "
        + (', '.join([
            (
                f"{temp_table_names['backtrack']}." + sql_item_name(c, self.flavor, None)
                + " AS " + sql_item_name(c + '_backtrack', self.flavor, None)
            ) for c in backtrack_cols_types
        ]))
        + f"\nFROM {temp_table_names['delta']}\n"
        + f"LEFT OUTER JOIN {temp_table_names['backtrack']}\nON\n"
        + '\nAND\n'.join([
            (
                f"COALESCE({temp_table_names['delta']}." + sql_item_name(c, self.flavor, None)
                + ", " + get_null_replacement(typ, self.flavor) + ")"
                + ' = '
                + f"COALESCE({temp_table_names['backtrack']}." + sql_item_name(c, self.flavor, None)
                + ", " + get_null_replacement(typ, self.flavor) + ")"
            ) for c, typ in on_cols.items()
        ])
    )

    create_joined_query = get_create_table_query(
        select_joined_query,
        temp_tables['joined'],
        self.flavor,
        schema = internal_schema,
    )
    create_joined_success, create_joined_msg = session_execute(
        session,
        create_joined_query,
        debug = debug,
    ) if on_cols and not upsert else (True, "Success")
    if not create_joined_success:
        _ = clean_up_temp_tables()
        return create_joined_success, create_joined_msg

    ### Unseen rows: no match in the snapshot (map placeholders back to NULL).
    select_unseen_query = (
        "SELECT "
        + (', '.join([
            (
                "CASE\n    WHEN " + sql_item_name(c + '_delta', self.flavor, None)
                + " != " + get_null_replacement(typ, self.flavor)
                + " THEN " + sql_item_name(c + '_delta', self.flavor, None)
                + "\n    ELSE NULL\nEND "
                + " AS " + sql_item_name(c, self.flavor, None)
            ) for c, typ in delta_cols.items()
        ]))
        + f"\nFROM {temp_table_names['joined']}\n"
        + f"WHERE "
        + '\nAND\n'.join([
            (
                sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NULL'
            ) for c in delta_cols
        ])
    )
    create_unseen_query = get_create_table_query(
        select_unseen_query,
        temp_tables['unseen'],
        self.flavor,
        internal_schema,
    )
    ### BUGFIX: same trailing-`, None` precedence bug as above.
    (create_unseen_success, create_unseen_msg), create_unseen_results = (
        session_execute(
            session,
            create_unseen_query,
            with_results = True,
            debug = debug
        )
        if not upsert
        else ((True, "Success"), None)
    )
    if not create_unseen_success:
        _ = clean_up_temp_tables()
        return create_unseen_success, create_unseen_msg

    ### Update rows: matched the snapshot on at least one index column.
    select_update_query = (
        "SELECT "
        + (', '.join([
            (
                "CASE\n    WHEN " + sql_item_name(c + '_delta', self.flavor, None)
                + " != " + get_null_replacement(typ, self.flavor)
                + " THEN " + sql_item_name(c + '_delta', self.flavor, None)
                + "\n    ELSE NULL\nEND "
                + " AS " + sql_item_name(c, self.flavor, None)
            ) for c, typ in delta_cols.items()
        ]))
        + f"\nFROM {temp_table_names['joined']}\n"
        + f"WHERE "
        + '\nOR\n'.join([
            (
                sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NOT NULL'
            ) for c in delta_cols
        ])
    )

    create_update_query = get_create_table_query(
        select_update_query,
        temp_tables['update'],
        self.flavor,
        internal_schema,
    )
    (create_update_success, create_update_msg), create_update_results = session_execute(
        session,
        create_update_query,
        with_results = True,
        debug = debug,
    ) if on_cols and not upsert else ((True, "Success"), [])
    apply_update_queries = (
        get_update_queries(
            pipe.target,
            temp_tables['update'],
            session,
            on_cols,
            upsert = upsert,
            schema = self.get_pipe_schema(pipe),
            patch_schema = internal_schema,
            datetime_col = pipe.columns.get('datetime', None),
            flavor = self.flavor,
            debug = debug
        )
        if on_cols else []
    )

    apply_unseen_queries = [
        (
            f"INSERT INTO {pipe_name} ({delta_cols_str})\n"
            + f"SELECT {delta_cols_str}\nFROM "
            + (
                temp_table_names['unseen']
                if on_cols
                else temp_table_names['delta']
            )
        ),
    ]

    ### BUGFIX: same trailing-`, None` precedence bug as above — previously
    ### `apply_unseen_results` was always `None`, so `unseen_count` was always 0.
    (apply_unseen_success, apply_unseen_msg), apply_unseen_results = (
        session_execute(
            session,
            apply_unseen_queries,
            with_results = True,
            debug = debug,
        )
        if not upsert
        else ((True, "Success"), None)
    )
    if not apply_unseen_success:
        _ = clean_up_temp_tables()
        return apply_unseen_success, apply_unseen_msg
    unseen_count = apply_unseen_results[0].rowcount if apply_unseen_results else 0

    (apply_update_success, apply_update_msg), apply_update_results = session_execute(
        session,
        apply_update_queries,
        with_results = True,
        debug = debug,
    )
    if not apply_update_success:
        _ = clean_up_temp_tables()
        return apply_update_success, apply_update_msg
    update_count = apply_update_results[0].rowcount if apply_update_results else 0

    session.commit()

    msg = (
        f"Inserted {unseen_count}, updated {update_count} rows."
        if not upsert
        else f"Upserted {update_count} row" + ('s' if update_count != 1 else '') + "."
    )
    _ = clean_up_temp_tables(ready_to_drop=True)

    drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables(refresh=False, debug=debug)
    if not drop_stale_success:
        warn(drop_stale_msg)

    return True, msg
If a pipe's connector is the same as its instance connector, it's more efficient to sync the pipe in-place rather than reading data into Pandas.

Parameters
- pipe (mrsm.Pipe): The pipe whose connector is the same as its instance.
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the `WHERE` clause. See `meerschaum.utils.sql.build_where`.
- begin (Optional[datetime], default None): Optionally specify the earliest datetime to search for data. Defaults to `None`.
- end (Optional[datetime], default None): Optionally specify the latest datetime to search for data. Defaults to `None`.
- chunksize (Optional[int], default -1): Specify the number of rows to sync per chunk. If `-1`, resort to system configuration (default is `900`). A `chunksize` of `None` will sync all rows in one transaction. Defaults to `-1`.
- check_existing (bool, default True): If `True`, pull and diff with existing data from the pipe.
- debug (bool, default False): Verbosity toggle.

Returns
- A SuccessTuple.
def get_sync_time(
    self,
    pipe: 'mrsm.Pipe',
    params: Optional[Dict[str, Any]] = None,
    newest: bool = True,
    debug: bool = False,
) -> Union[datetime, int, None]:
    """Get a Pipe's most recent datetime value.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to get the sync time for.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (ASC instead of DESC).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `datetime` object (or `int` if using an integer axis) if the pipe exists, otherwise `None`.
    """
    from meerschaum.utils.sql import sql_item_name, build_where
    table = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))

    ### Fall back to guessing the datetime axis when none is configured.
    dt_col = pipe.columns.get('datetime', None)
    _dt = dt_col if dt_col else pipe.guess_datetime()
    dt = sql_item_name(_dt, self.flavor, None) if _dt else None

    if _dt is None:
        return None

    ASC_or_DESC = "DESC" if newest else "ASC"
    existing_cols = pipe.get_columns_types(debug=debug)
    valid_params = {}
    if params is not None:
        valid_params = {k: v for k, v in params.items() if k in existing_cols}

    ### If no bounds are provided for the datetime column,
    ### add IS NOT NULL to the WHERE clause.
    if _dt not in valid_params:
        valid_params[_dt] = '_None'
    where = "" if not valid_params else build_where(valid_params, self)
    q = f"SELECT {dt}\nFROM {table}{where}\nORDER BY {dt} {ASC_or_DESC}\nLIMIT 1"
    if self.flavor == 'mssql':
        q = f"SELECT TOP 1 {dt}\nFROM {table}{where}\nORDER BY {dt} {ASC_or_DESC}"
    elif self.flavor == 'oracle':
        q = (
            "SELECT * FROM (\n"
            + f"    SELECT {dt}\nFROM {table}{where}\n    ORDER BY {dt} {ASC_or_DESC}\n"
            + ") WHERE ROWNUM = 1"
        )

    try:
        db_time = self.value(q, silent=True, debug=debug)

        ### No datetime could be found.
        if db_time is None:
            return None
        ### sqlite returns str.
        if isinstance(db_time, str):
            from meerschaum.utils.packages import attempt_import
            dateutil_parser = attempt_import('dateutil.parser')
            st = dateutil_parser.parse(db_time)
        ### Do nothing if a datetime object is returned.
        elif isinstance(db_time, datetime):
            if hasattr(db_time, 'to_pydatetime'):
                st = db_time.to_pydatetime()
            else:
                st = db_time
        ### Sometimes the datetime is actually a date.
        elif isinstance(db_time, date):
            st = datetime.combine(db_time, datetime.min.time())
        ### Adding support for an integer datetime axis.
        elif 'int' in str(type(db_time)).lower():
            st = int(db_time)
        ### Convert pandas timestamp to Python datetime.
        else:
            st = db_time.to_pydatetime()

        sync_time = st

    except Exception as e:
        sync_time = None
        warn(str(e))

    return sync_time
Get a Pipe's most recent datetime value.

Parameters
- pipe (mrsm.Pipe): The pipe to get the sync time for.
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the `WHERE` clause. See `meerschaum.utils.sql.build_where`.
- newest (bool, default True): If `True`, get the most recent datetime (honoring `params`). If `False`, get the oldest datetime (ASC instead of DESC).

Returns
- A `datetime` object (or `int` if using an integer axis) if the pipe exists, otherwise `None`.
def pipe_exists(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False
    ) -> bool:
    """
    Check that a Pipe's table exists.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to check.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `bool` corresponding to whether a pipe's table exists.
    """
    from meerschaum.utils.sql import table_exists
    table_is_present = table_exists(
        pipe.target,
        self,
        schema = self.get_pipe_schema(pipe),
        debug = debug,
    )
    if debug:
        from meerschaum.utils.debug import dprint
        status = 'exists.' if table_is_present else 'does not exist.'
        dprint(f"{pipe} {status}")
    return table_is_present
Check that a Pipe's table exists.

Parameters
- pipe (mrsm.Pipe): The pipe to check.
- debug (bool, default False): Verbosity toggle.

Returns
- A `bool` corresponding to whether a pipe's table exists.
def get_pipe_rowcount(
        self,
        pipe: mrsm.Pipe,
        begin: Union[datetime, int, None] = None,
        end: Union[datetime, int, None] = None,
        params: Optional[Dict[str, Any]] = None,
        remote: bool = False,
        debug: bool = False
    ) -> Union[int, None]:
    """
    Get the rowcount for a pipe in accordance with given parameters.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to query with.

    begin: Union[datetime, int, None], default None
        The begin datetime value (inclusive lower bound on the datetime axis).

    end: Union[datetime, int, None], default None
        The end datetime value (exclusive upper bound on the datetime axis).

    params: Optional[Dict[str, Any]], default None
        See `meerschaum.utils.sql.build_where`.

    remote: bool, default False
        If `True`, get the rowcount for the remote table
        (requires `pipe.parameters['fetch']['definition']`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` for the number of rows if the `pipe` exists, otherwise `None`.
    """
    from meerschaum.utils.sql import dateadd_str, sql_item_name, NO_CTE_FLAVORS
    from meerschaum.connectors.sql.fetch import get_pipe_query
    if remote:
        ### A remote rowcount counts the rows the fetch definition would return,
        ### so the definition must be present.
        msg = f"'fetch:definition' must be an attribute of {pipe} to get a remote rowcount."
        if 'fetch' not in pipe.parameters:
            error(msg)
            return None
        if 'definition' not in pipe.parameters['fetch']:
            error(msg)
            return None

    _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))

    ### Resolve the datetime column: prefer the configured index, else guess one.
    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor, None) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt = sql_item_name(_dt, self.flavor, None)
        is_guess = False

    ### Datetime bounds require a datetime column; drop the bounds (with a warning)
    ### when only a guessed column (or none at all) is available.
    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )


    ### NOTE(review): `_datetime_name` is computed but never used below — candidate
    ### for removal; likewise `NO_CTE_FLAVORS` is imported but unused. TODO confirm.
    _datetime_name = sql_item_name(
        _dt,
        (
            pipe.instance_connector.flavor
            if not remote
            else pipe.connector.flavor
        ),
        None,
    )
    ### Only select the columns the COUNT actually needs: the datetime axis
    ### plus any columns referenced by `params`.
    _cols_names = [
        sql_item_name(
            col,
            (
                pipe.instance_connector.flavor
                if not remote
                else pipe.connector.flavor
            ),
            None,
        )
        for col in set(
            (
                [_dt]
                if _dt
                else []
            )
            + (
                []
                if params is None
                else list(params.keys())
            )
        )
    ]
    if not _cols_names:
        _cols_names = ['*']

    ### For a remote rowcount, count over the fetch definition instead of the table.
    src = (
        f"SELECT {', '.join(_cols_names)} FROM {_pipe_name}"
        if not remote
        else get_pipe_query(pipe)
    )
    ### MySQL/MariaDB (<8) lack CTE support, so fall back to a derived table.
    query = (
        f"""
        WITH src AS ({src})
        SELECT COUNT(*)
        FROM src
        """
    ) if self.flavor not in ('mysql', 'mariadb') else (
        f"""
        SELECT COUNT(*)
        FROM ({src}) AS src
        """
    )
    ### Append optional bounds; the f-strings start with a newline, so the bare
    ### "WHERE"/"AND" tokens stay separated from the conditions.
    if begin is not None or end is not None:
        query += "WHERE"
    if begin is not None:
        query += f"""
        {dt} >= {dateadd_str(self.flavor, datepart='minute', number=0, begin=begin)}
        """
    if end is not None and begin is not None:
        query += "AND"
    if end is not None:
        query += f"""
        {dt} < {dateadd_str(self.flavor, datepart='minute', number=0, begin=end)}
        """
    if params is not None:
        from meerschaum.utils.sql import build_where
        ### Only filter on params whose keys are actual columns of the table.
        existing_cols = pipe.get_columns_types(debug=debug)
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
        if valid_params:
            ### `build_where` emits a leading WHERE; swap it for AND if a
            ### datetime bound already opened the WHERE clause.
            query += build_where(valid_params, self).replace('WHERE', (
                'AND' if (begin is not None or end is not None)
                else 'WHERE'
            )
        )

    result = self.value(query, debug=debug, silent=True)
    ### `result` may be `None` (query failed) or a non-numeric value; treat both as `None`.
    try:
        return int(result)
    except Exception as e:
        return None
Get the rowcount for a pipe in accordance with given parameters.

Parameters
- pipe (mrsm.Pipe): The pipe to query with.
- begin (Union[datetime, int, None], default None): The begin datetime value.
- end (Union[datetime, int, None], default None): The end datetime value.
- params (Optional[Dict[str, Any]], default None): See `meerschaum.utils.sql.build_where`.
- remote (bool, default False): If `True`, get the rowcount for the remote table.
- debug (bool, default False): Verbosity toggle.

Returns
- An `int` for the number of rows if the `pipe` exists, otherwise `None`.
def drop_pipe(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Drop a pipe's tables but maintain its registration.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to drop.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    from meerschaum.utils.sql import table_exists, sql_item_name
    success = True
    target = pipe.target
    target_name = (
        sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
    )
    ### Check existence in the pipe's configured schema (consistent with
    ### `pipe_exists()`); the original omitted `schema` here, so the check
    ### could look in the wrong schema while DROP targeted the qualified name.
    if table_exists(target, self, schema=self.get_pipe_schema(pipe), debug=debug):
        success = self.exec(f"DROP TABLE {target_name}", silent=True, debug=debug) is not None

    msg = "Success" if success else f"Failed to drop {pipe}."
    return success, msg
Drop a pipe's tables but maintain its registration.
Parameters
- pipe (mrsm.Pipe): The pipe to drop.
def clear_pipe(
        self,
        pipe: mrsm.Pipe,
        begin: Union[datetime, int, None] = None,
        end: Union[datetime, int, None] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Delete a pipe's data within a bounded or unbounded interval without dropping the table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to clear.

    begin: Union[datetime, int, None], default None
        Beginning datetime. Inclusive.

    end: Union[datetime, int, None], default None
        Ending datetime. Exclusive.

    params: Optional[Dict[str, Any]], default None
        See `meerschaum.utils.sql.build_where`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    if not pipe.exists(debug=debug):
        return True, f"{pipe} does not exist, so nothing was cleared."

    from meerschaum.utils.sql import sql_item_name, build_where, dateadd_str
    pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))

    ### Resolve the datetime column: prefer the configured index, else guess one.
    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt_name = sql_item_name(_dt, self.flavor, None) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt_name = sql_item_name(_dt, self.flavor, None)
        is_guess = False

    ### Datetime bounds require a datetime column; drop the bounds (with a warning)
    ### when only a guessed column (or none at all) is available.
    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n    Ignoring datetime bounds...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )

    ### Only filter on params whose keys are actual columns of the table.
    valid_params = {}
    if params is not None:
        existing_cols = pipe.get_columns_types(debug=debug)
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
    ### `WHERE 1 = 1` lets each optional condition be appended uniformly with `AND`.
    clear_query = (
        f"DELETE FROM {pipe_name}\nWHERE 1 = 1\n"
        + (' AND ' + build_where(valid_params, self, with_where=False) if valid_params else '')
        + (
            f' AND {dt_name} >= ' + dateadd_str(self.flavor, 'day', 0, begin)
            if begin is not None else ''
        ) + (
            f' AND {dt_name} < ' + dateadd_str(self.flavor, 'day', 0, end)
            if end is not None else ''
        )
    )
    success = self.exec(clear_query, silent=True, debug=debug) is not None
    msg = "Success" if success else f"Failed to clear {pipe}."
    return success, msg
Delete a pipe's data within a bounded or unbounded interval without dropping the table.
Parameters
- pipe (mrsm.Pipe): The pipe to clear.
- begin (Union[datetime, int, None], default None): Beginning datetime. Inclusive.
- end (Union[datetime, int, None], default None): Ending datetime. Exclusive.
- params (Optional[Dict[str, Any]], default None): See `meerschaum.utils.sql.build_where`.
def deduplicate_pipe(
        self,
        pipe: mrsm.Pipe,
        begin: Union[datetime, int, None] = None,
        end: Union[datetime, int, None] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kwargs: Any
    ) -> SuccessTuple:
    """
    Delete duplicate values within a pipe's table.

    Strategy: build a deduplicated copy of the table (keeping the first row of
    each index partition via `ROW_NUMBER()`, or a session-variable emulation on
    MySQL < 8), then atomically swap it in via renames and drop the old table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose table to deduplicate.

    begin: Union[datetime, int, None], default None
        If provided, only deduplicate values greater than or equal to this value.
        NOTE(review): `begin`, `end`, and `params` are accepted but not referenced
        below — the whole table is deduplicated regardless. TODO confirm.

    end: Union[datetime, int, None], default None
        If provided, only deduplicate values less than this value.

    params: Optional[Dict[str, Any]], default None
        If provided, further limit deduplication to values which match this query dictionary.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    from meerschaum.utils.sql import (
        sql_item_name,
        NO_CTE_FLAVORS,
        get_rename_table_queries,
        NO_SELECT_INTO_FLAVORS,
        get_create_table_query,
        format_cte_subquery,
        get_null_replacement,
    )
    from meerschaum.utils.misc import generate_password, flatten_list

    pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))

    if not pipe.exists(debug=debug):
        return False, f"Table {pipe_table_name} does not exist."

    ### TODO: Handle deleting duplicates without a datetime axis.
    ### NOTE(review): if `dt_col` is None, `sql_item_name(None, ...)` is still
    ### called here — verify that this is handled upstream.
    dt_col = pipe.columns.get('datetime', None)
    dt_col_name = sql_item_name(dt_col, self.flavor, None)
    ### NOTE(review): `cols_types` and `existing_cols` are the same call made
    ### twice — one fetch would suffice.
    cols_types = pipe.get_columns_types(debug=debug)
    existing_cols = pipe.get_columns_types(debug=debug)

    get_rowcount_query = f"SELECT COUNT(*) FROM {pipe_table_name}"
    old_rowcount = self.value(get_rowcount_query, debug=debug)
    if old_rowcount is None:
        return False, f"Failed to get rowcount for table {pipe_table_name}."

    ### Non-datetime indices that in fact exist.
    indices = [
        col
        for key, col in pipe.columns.items()
        if col and col != dt_col and col in cols_types
    ]
    indices_names = [sql_item_name(index_col, self.flavor, None) for index_col in indices]
    existing_cols_names = [sql_item_name(col, self.flavor, None) for col in existing_cols]
    duplicates_cte_name = sql_item_name('dups', self.flavor, None)
    duplicate_row_number_name = sql_item_name('dup_row_num', self.flavor, None)
    previous_row_number_name = sql_item_name('prev_row_num', self.flavor, None)

    ### Partition/order column lists: the datetime axis first (DESC for ordering),
    ### then the remaining index columns.
    index_list_str = (
        sql_item_name(dt_col, self.flavor, None)
        if dt_col
        else ''
    )
    index_list_str_ordered = (
        (
            sql_item_name(dt_col, self.flavor, None) + " DESC"
        )
        if dt_col
        else ''
    )
    if indices:
        index_list_str += ', ' + ', '.join(indices_names)
        index_list_str_ordered += ', ' + ', '.join(indices_names)
    ### Strip the leading comma left behind when there is no datetime column.
    if index_list_str.startswith(','):
        index_list_str = index_list_str.lstrip(',').lstrip()
    if index_list_str_ordered.startswith(','):
        index_list_str_ordered = index_list_str_ordered.lstrip(',').lstrip()

    cols_list_str = ', '.join(existing_cols_names)

    try:
        ### NOTE: MySQL 5 and below does not support window functions (ROW_NUMBER()).
        is_old_mysql = (
            self.flavor in ('mysql', 'mariadb')
            and
            int(self.db_version.split('.')[0]) < 8
        )
    except Exception as e:
        is_old_mysql = False

    ### Number each row within its index partition; row 1 is the row to keep.
    src_query = f"""
        SELECT
            {cols_list_str},
            ROW_NUMBER() OVER (
                PARTITION BY
                {index_list_str}
                ORDER BY {index_list_str_ordered}
            ) AS {duplicate_row_number_name}
        FROM {pipe_table_name}
    """
    duplicates_cte_subquery = format_cte_subquery(
        src_query,
        self.flavor,
        sub_name = 'src',
        cols_to_select = cols_list_str,
    ) + f"""
        WHERE {duplicate_row_number_name} = 1
    """
    ### Emulate ROW_NUMBER() with session variables for MySQL < 8.
    old_mysql_query = (
        f"""
        SELECT
            {index_list_str}
        FROM (
            SELECT
                {index_list_str},
                IF(
                    @{previous_row_number_name} <> {index_list_str.replace(', ', ' + ')},
                    @{duplicate_row_number_name} := 0,
                    @{duplicate_row_number_name}
                ),
                @{previous_row_number_name} := {index_list_str.replace(', ', ' + ')},
                @{duplicate_row_number_name} := @{duplicate_row_number_name} + 1 AS """
        + f"""{duplicate_row_number_name}
            FROM
                {pipe_table_name},
                (
                    SELECT @{duplicate_row_number_name} := 0
                ) AS {duplicate_row_number_name},
                (
                    SELECT @{previous_row_number_name} := '{get_null_replacement('str', 'mysql')}'
                ) AS {previous_row_number_name}
            ORDER BY {index_list_str_ordered}
        ) AS t
        WHERE {duplicate_row_number_name} = 1
        """
    )
    if is_old_mysql:
        duplicates_cte_subquery = old_mysql_query

    ### Short random session ID keeps the temporary table names unique.
    session_id = generate_password(3)

    dedup_table = '-' + session_id + f'_dedup_{pipe.target}'
    temp_old_table = '-' + session_id + f"_old_{pipe.target}"

    dedup_table_name = sql_item_name(dedup_table, self.flavor, self.get_pipe_schema(pipe))
    temp_old_table_name = sql_item_name(temp_old_table, self.flavor, self.get_pipe_schema(pipe))

    create_temporary_table_query = get_create_table_query(
        duplicates_cte_subquery,
        dedup_table,
        self.flavor,
    ) + f"""
    ORDER BY {index_list_str_ordered}
    """
    ### Swap: current -> temp_old, dedup -> current, then drop temp_old.
    alter_queries = flatten_list([
        get_rename_table_queries(
            pipe.target, temp_old_table, self.flavor, schema=self.get_pipe_schema(pipe)
        ),
        get_rename_table_queries(
            dedup_table, pipe.target, self.flavor, schema=self.get_pipe_schema(pipe)
        ),
        f"""
        DROP TABLE {temp_old_table_name}
        """,
    ])

    create_temporary_result = self.execute(create_temporary_table_query, debug=debug)
    if create_temporary_result is None:
        return False, f"Failed to deduplicate table {pipe_table_name}."

    ### Run the swap transactionally: stop at the first failure and roll back.
    results = self.exec_queries(
        alter_queries,
        break_on_error = True,
        rollback = True,
        debug = debug,
    )

    fail_query = None
    for result, query in zip(results, alter_queries):
        if result is None:
            fail_query = query
            break
    success = fail_query is None

    new_rowcount = (
        self.value(get_rowcount_query, debug=debug)
        if success
        else None
    )

    msg = (
        (
            f"Successfully deduplicated table {pipe_table_name}"
            + (
                f"\nfrom {old_rowcount} to {new_rowcount} rows"
                if old_rowcount != new_rowcount
                else ''
            )
            + '.'
        )
        if success
        else f"Failed to execute query:\n{fail_query}"
    )
    return success, msg
Delete duplicate values within a pipe's table.
Parameters
- pipe (mrsm.Pipe): The pipe whose table to deduplicate.
- begin (Union[datetime, int, None], default None): If provided, only deduplicate values greater than or equal to this value.
- end (Union[datetime, int, None], default None): If provided, only deduplicate values less than this value.
- params (Optional[Dict[str, Any]], default None): If provided, further limit deduplication to values which match this query dictionary.
- debug (bool, default False): Verbosity toggle.
Returns
- A `SuccessTuple` indicating success.
def get_pipe_table(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> sqlalchemy.Table:
    """
    Return the `sqlalchemy.Table` object for a `mrsm.Pipe`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe in question.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `sqlalchemy.Table` object, or `None` if the pipe's table does not exist.
    """
    from meerschaum.utils.sql import get_sqlalchemy_table
    pipe_is_present = pipe.exists(debug=debug)
    if not pipe_is_present:
        return None
    return get_sqlalchemy_table(
        pipe.target,
        connector = self,
        schema = self.get_pipe_schema(pipe),
        refresh = True,
        debug = debug,
    )
Return the `sqlalchemy.Table` object for a `mrsm.Pipe`.

Parameters
- pipe (mrsm.Pipe): The pipe in question.

Returns
- A `sqlalchemy.Table` object.
def get_pipe_columns_types(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> Dict[str, str]:
    """
    Get the pipe's columns and types.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to get the columns for.

    Returns
    -------
    A dictionary of columns names (`str`) and types (`str`).
    Returns an empty dictionary if the pipe does not exist or reflection fails.

    Examples
    --------
    >>> conn.get_pipe_columns_types(pipe)
    {
      'dt': 'TIMESTAMP WITHOUT TIMEZONE',
      'id': 'BIGINT',
      'val': 'DOUBLE PRECISION',
    }
    >>>
    """
    if not pipe.exists(debug=debug):
        return {}
    columns_types = {}
    try:
        reflected_table = self.get_pipe_table(pipe, debug=debug)
        if reflected_table is None:
            return {}
        columns_types = {
            str(column.name): str(column.type)
            for column in reflected_table.columns
        }
    except Exception as e:
        import traceback
        traceback.print_exc()
        warn(e)
        columns_types = {}
    return columns_types
Get the pipe's columns and types.
Parameters
- pipe (mrsm.Pipe): The pipe to get the columns for.
Returns
- A dictionary of column names (`str`) and types (`str`).
Examples
>>> conn.get_pipe_columns_types(pipe)
{
'dt': 'TIMESTAMP WITHOUT TIMEZONE',
'id': 'BIGINT',
'val': 'DOUBLE PRECISION',
}
>>>
def get_to_sql_dtype(
        self,
        pipe: 'mrsm.Pipe',
        df: 'pd.DataFrame',
        update_dtypes: bool = True,
    ) -> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']:
    """
    Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe which may contain a `dtypes` parameter.

    df: pd.DataFrame
        The DataFrame to be pushed via `to_sql()`.

    update_dtypes: bool, default True
        If `True`, patch the pipe's dtypes onto the DataFrame's dtypes.

    Returns
    -------
    A dictionary with `sqlalchemy` datatypes.

    Examples
    --------
    >>> import pandas as pd
    >>> import meerschaum as mrsm
    >>> 
    >>> conn = mrsm.get_connector('sql:memory')
    >>> df = pd.DataFrame([{'a': {'b': 1}}])
    >>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
    >>> get_to_sql_dtype(pipe, df)
    {'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
    """
    from meerschaum.utils.dataframe import get_json_cols, get_numeric_cols
    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
    ### Start from the DataFrame's own dtypes, stringified.
    resolved_dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
    ### Override with special Meerschaum types detected from the values.
    for json_col in get_json_cols(df):
        resolved_dtypes[json_col] = 'json'
    for numeric_col in get_numeric_cols(df):
        resolved_dtypes[numeric_col] = 'numeric'
    ### The pipe's configured dtypes take highest precedence.
    if update_dtypes:
        resolved_dtypes.update(pipe.dtypes)
    return {
        col: get_db_type_from_pd_type(typ, self.flavor, as_sqlalchemy=True)
        for col, typ in resolved_dtypes.items()
    }
Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`.

Parameters
- pipe (mrsm.Pipe): The pipe which may contain a `dtypes` parameter.
- df (pd.DataFrame): The DataFrame to be pushed via `to_sql()`.
- update_dtypes (bool, default True): If `True`, patch the pipe's dtypes onto the DataFrame's dtypes.

Returns
- A dictionary with `sqlalchemy` datatypes.
Examples
>>> import pandas as pd
>>> import meerschaum as mrsm
>>>
>>> conn = mrsm.get_connector('sql:memory')
>>> df = pd.DataFrame([{'a': {'b': 1}}])
>>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
>>> get_to_sql_dtype(pipe, df)
{'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
def get_pipe_schema(self, pipe: mrsm.Pipe) -> Union[str, None]:
    """
    Return the schema to use for this pipe.
    First check `pipe.parameters['schema']`, then check `self.schema`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe which may contain a configured schema.

    Returns
    -------
    A schema string or `None` if nothing is configured.
    """
    ### A pipe-level schema (even an explicit `None`) overrides the connector's.
    if 'schema' in pipe.parameters:
        return pipe.parameters['schema']
    return self.schema
Return the schema to use for this pipe.
First check `pipe.parameters['schema']`, then check `self.schema`.

Parameters
- pipe (mrsm.Pipe): The pipe which may contain a configured schema.

Returns
- A schema string or `None` if nothing is configured.
def register_plugin(
        self,
        plugin: 'meerschaum.core.Plugin',
        force: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Register a new plugin to the plugins table.

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin to register (insert a new row or update the existing one).

    force: bool, default False
        If `True`, skip the version-conflict check.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    import json
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.utils.sql import json_flavors
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    old_id = self.get_plugin_id(plugin, debug=debug)

    ### Check for version conflict. May be overridden with `--force`.
    if old_id is not None and not force:
        old_version = self.get_plugin_version(plugin, debug=debug)
        new_version = plugin.version
        if old_version is None:
            old_version = ''
        if new_version is None:
            new_version = ''

        ### Verify that the new version is greater than the old.
        packaging_version = attempt_import('packaging.version')
        if (
            old_version and new_version
            and packaging_version.parse(old_version) >= packaging_version.parse(new_version)
        ):
            return False, (
                f"Version '{new_version}' of plugin '{plugin}' " +
                f"must be greater than existing version '{old_version}'."
            )

    ### Serialize attributes for flavors without native JSON columns.
    bind_variables = {
        'plugin_name' : plugin.name,
        'version'     : plugin.version,
        'attributes'  : (
            json.dumps(plugin.attributes) if self.flavor not in json_flavors else plugin.attributes
        ),
        'user_id'     : plugin.user_id,
    }

    ### Insert a new row, or update the existing registration in-place.
    if old_id is None:
        query = sqlalchemy.insert(plugins_tbl).values(**bind_variables)
    else:
        query = (
            sqlalchemy.update(plugins_tbl)
            .values(**bind_variables)
            .where(plugins_tbl.c.plugin_id == old_id)
        )

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to register plugin '{plugin}'."
    return True, f"Successfully registered plugin '{plugin}'."
Register a new plugin to the plugins table.
def delete_plugin(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Delete a plugin from the plugins table.

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin to delete.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success (`True` if the plugin was not registered).
    """
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    plugin_id = self.get_plugin_id(plugin, debug=debug)
    if plugin_id is None:
        return True, f"Plugin '{plugin}' was not registered."

    ### NOTE: The original built an unused `bind_variables` dict here — removed
    ### as dead code (the DELETE binds `plugin_id` directly in the expression).
    query = sqlalchemy.delete(plugins_tbl).where(plugins_tbl.c.plugin_id == plugin_id)
    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to delete plugin '{plugin}'."
    return True, f"Successfully deleted plugin '{plugin}'."
Delete a plugin from the plugins table.
def get_plugin_id(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[int]:
    """
    Return a plugin's ID, or `None` if the plugin is not registered.
    """
    ### Ensure the plugins table exists.
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    id_query = (
        sqlalchemy
        .select(plugins_tbl.c.plugin_id)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )

    ### A missing row (or non-integer value) yields `None`.
    try:
        return int(self.value(id_query, debug=debug))
    except Exception:
        return None
Return a plugin's ID.
def get_plugin_version(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[str]:
    """
    Return a plugin's registered version string, or `None` if not registered.
    """
    ### Ensure the plugins table exists.
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    version_query = (
        sqlalchemy
        .select(plugins_tbl.c.version)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )
    return self.value(version_query, debug=debug)
Return a plugin's version.
def get_plugins(
        self,
        user_id: Optional[int] = None,
        search_term: Optional[str] = None,
        debug: bool = False,
        **kw: Any
    ) -> List[str]:
    """
    Return a list of all registered plugins.

    Parameters
    ----------
    user_id: Optional[int], default None
        If specified, filter plugins by a specific `user_id`.

    search_term: Optional[str], default None
        If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins.

    Returns
    -------
    A list of plugin names.
    """
    ### Ensure the plugins table exists.
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    query = sqlalchemy.select(plugins_tbl.c.plugin_name)
    if user_id is not None:
        query = query.where(plugins_tbl.c.user_id == user_id)
    if search_term is not None:
        query = query.where(plugins_tbl.c.plugin_name.like(search_term + '%'))

    ### DuckDB cannot fetch rows from `execute()`, so read into a DataFrame instead.
    if self.flavor != 'duckdb':
        rows = self.execute(query).fetchall()
    else:
        rows = [
            (doc['plugin_name'],)
            for doc in self.read(query).to_dict(orient='records')
        ]

    return [row[0] for row in rows]
Return a list of all registered plugins.
Parameters
- user_id (Optional[int], default None): If specified, filter plugins by a specific `user_id`.
- search_term (Optional[str], default None): If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins.
Returns
- A list of plugin names.
def get_plugin_user_id(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[int]:
    """
    Return a plugin's user ID, or `None` if the plugin is not registered.
    """
    ### Ensure the plugins table exists.
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    user_id_query = (
        sqlalchemy
        .select(plugins_tbl.c.user_id)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )

    ### A missing row (or non-integer value) yields `None`.
    try:
        return int(self.value(user_id_query, debug=debug))
    except Exception:
        return None
Return a plugin's user ID.
def get_plugin_username(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[str]:
    """
    Return the username of a plugin's owner.

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin whose owner's username to look up.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The owner's username (`str`), or `None` if not found.
    """
    ### Ensure the plugins and users tables exist.
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    users = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    ### BUGFIX: The original combined the two conditions with the Python `and`
    ### operator inside a single `.where(...)`, which evaluates truthiness
    ### instead of building a SQL `AND` and silently drops the join condition.
    ### Chained `.where()` calls apply both predicates.
    query = (
        sqlalchemy.select(users.c.username)
        .where(users.c.user_id == plugins_tbl.c.user_id)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )

    return self.value(query, debug=debug)
Return the username of a plugin's owner.
def get_plugin_attributes(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Dict[str, Any]:
    """
    Return the attributes of a plugin (an empty dict if none are stored).
    """
    import json
    ### Ensure the plugins table exists.
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    attributes_query = (
        sqlalchemy
        .select(plugins_tbl.c.attributes)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )

    stored_attributes = self.value(attributes_query, debug=debug)
    if stored_attributes is None:
        return {}
    if isinstance(stored_attributes, str):
        ### Flavors without native JSON columns store attributes as a string.
        return json.loads(stored_attributes)
    return stored_attributes
Return the attributes of a plugin.
def register_user(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Register a new user.

    Parameters
    ----------
    user: meerschaum.core.User
        The user to register.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    import json
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    sqlalchemy = attempt_import('sqlalchemy')

    valid_tuple = valid_username(user.username)
    if not valid_tuple[0]:
        return valid_tuple

    ### NOTE: The original re-checked `old_id` twice more below and guarded the
    ### INSERT behind `if old_id is None:` — dead code (we return here when the
    ### user exists), removed so `query` is always bound.
    old_id = self.get_user_id(user, debug=debug)
    if old_id is not None:
        return False, f"User '{user}' already exists."

    ### Ensure the users table exists.
    from meerschaum.connectors.sql.tables import get_tables
    tables = get_tables(mrsm_instance=self, debug=debug)

    ### Serialize attributes for flavors without native JSON columns.
    bind_variables = {
        'username': user.username,
        'email': user.email,
        'password_hash': user.password_hash,
        'user_type': user.type,
        'attributes': (
            json.dumps(user.attributes) if self.flavor not in json_flavors else user.attributes
        ),
    }

    query = (
        sqlalchemy.insert(tables['users']).
        values(**bind_variables)
    )

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to register user '{user}'."
    return True, f"Successfully registered user '{user}'."
Register a new user.
def get_user_id(
        self,
        user: meerschaum.core.User,
        debug : bool = False
    ) -> Optional[int]:
    """If a user is registered, return the `user_id`; otherwise return `None`."""
    ### Ensure the users table exists.
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    user_id_query = (
        sqlalchemy.select(users_tbl.c.user_id)
        .where(users_tbl.c.username == user.username)
    )

    raw_user_id = self.value(user_id_query, debug=debug)
    return int(raw_user_id) if raw_user_id is not None else None
If a user is registered, return the `user_id`.
def get_users(
        self,
        debug: bool = False,
        **kw: Any
    ) -> List[str]:
    """
    Get the registered usernames.
    """
    ### Ensure the users table exists.
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    usernames_query = sqlalchemy.select(users_tbl.c.username)

    usernames_frame = self.read(usernames_query, debug=debug)
    return [username for username in usernames_frame['username']]
Get the registered usernames.
def edit_user(
    self,
    user: meerschaum.core.User,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Update an existing user's metadata.

    Only fields which were explicitly set on `user` (non-empty password,
    email, attributes, type) are written.

    Parameters
    ----------
    user: meerschaum.core.User
        The user whose record should be updated.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of (success, message).
    """
    import json
    from meerschaum.utils.packages import attempt_import
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.sql import json_flavors
    sqlalchemy = attempt_import('sqlalchemy')
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    if user_id is None:
        return False, (
            f"User '{user.username}' does not exist. "
            + f"Register user '{user.username}' before editing."
        )
    user.user_id = user_id

    valid_tuple = valid_username(user.username)
    if not valid_tuple[0]:
        return valid_tuple

    ### Only overwrite fields which were explicitly provided.
    bind_variables = {
        'user_id': user_id,
        'username': user.username,
    }
    if user.password != '':
        bind_variables['password_hash'] = user.password_hash
    if user.email != '':
        bind_variables['email'] = user.email
    if user.attributes is not None and user.attributes != {}:
        ### Serialize attributes for flavors without native JSON support,
        ### consistent with `register_user()`.
        ### (Previously hard-coded `('duckdb',)` and ignored the imported
        ### `json_flavors`, so other non-JSON flavors received a raw dict.)
        bind_variables['attributes'] = (
            json.dumps(user.attributes)
            if self.flavor not in json_flavors
            else user.attributes
        )
    if user.type != '':
        bind_variables['user_type'] = user.type

    query = (
        sqlalchemy
        .update(users_tbl)
        .values(**bind_variables)
        .where(users_tbl.c.user_id == user_id)
    )

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to edit user '{user}'."
    return True, f"Successfully edited user '{user}'."
Update an existing user's metadata.
def delete_user(
    self,
    user: meerschaum.core.User,
    debug: bool = False
) -> SuccessTuple:
    """
    Delete a user's record from the users table, along with any plugins
    registered to that user.

    Parameters
    ----------
    user: meerschaum.core.User
        The user to delete.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of (success, message).
    """
    from meerschaum.utils.packages import attempt_import
    from meerschaum.connectors.sql.tables import get_tables
    sqlalchemy = attempt_import('sqlalchemy')

    ### Fetch both tables from a single call (this also ensures they exist).
    ### The original called `get_tables()` twice back-to-back.
    tables = get_tables(mrsm_instance=self, debug=debug)
    users_tbl, plugins_tbl = tables['users'], tables['plugins']

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    if user_id is None:
        return False, f"User '{user.username}' is not registered and cannot be deleted."

    query = sqlalchemy.delete(users_tbl).where(users_tbl.c.user_id == user_id)
    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to delete user '{user}'."

    ### Also remove the user's registered plugins.
    query = sqlalchemy.delete(plugins_tbl).where(plugins_tbl.c.user_id == user_id)
    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to delete plugins of user '{user}'."

    return True, f"Successfully deleted user '{user}'"
Delete a user's record from the users table.
def get_user_password_hash(
    self,
    user: meerschaum.core.User,
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """
    Return the password hash for a user.

    **NOTE**: This may be dangerous and is only allowed if the security
    settings explicitly allow it.

    Parameters
    ----------
    user: meerschaum.core.User
        The user whose password hash should be fetched.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The stored password hash, or `None` if the user is not registered.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.connectors.sql.tables import get_tables
    sqlalchemy = attempt_import('sqlalchemy')
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    if user.user_id is not None:
        user_id = user.user_id
        if debug:
            dprint(f"Already given user_id: {user_id}")
    else:
        if debug:
            ### Plain string (was a needless f-string with no placeholders).
            dprint("Fetching user_id...")
        user_id = self.get_user_id(user, debug=debug)

    if user_id is None:
        return None

    query = sqlalchemy.select(users_tbl.c.password_hash).where(users_tbl.c.user_id == user_id)
    return self.value(query, debug=debug)
Return the password hash for a user. NOTE: This may be dangerous and is only allowed if the security settings explicitly allow it.
def get_user_type(
    self,
    user: meerschaum.core.User,
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """Return the user's type string, or `None` if the user is not registered."""
    from meerschaum.utils.packages import attempt_import
    from meerschaum.connectors.sql.tables import get_tables
    sqlalchemy = attempt_import('sqlalchemy')

    ### Fetching the table also ensures the users table exists.
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    if user_id is None:
        return None

    query = sqlalchemy.select(users_tbl.c.user_type).where(users_tbl.c.user_id == user_id)
    return self.value(query, debug=debug)
Return the user's type.
def get_user_attributes(
    self,
    user: meerschaum.core.User,
    debug: bool = False
) -> Union[Dict[str, Any], None]:
    """
    Return the user's attributes dictionary.

    Non-dict results from the driver are coerced: first via `dict()`,
    then via JSON parsing; if both fail, a warning is emitted and the
    raw value is returned as-is.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import attempt_import
    from meerschaum.connectors.sql.tables import get_tables
    sqlalchemy = attempt_import('sqlalchemy')

    ### Fetching the table also ensures the users table exists.
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)

    query = sqlalchemy.select(users_tbl.c.attributes).where(
        users_tbl.c.user_id == user_id
    )

    result = self.value(query, debug=debug)
    if result is None or isinstance(result, dict):
        return result

    ### The driver may hand back a mapping-like row or a raw JSON string:
    ### try a dict cast first, then fall back to JSON parsing.
    try:
        return dict(result)
    except Exception:
        pass
    try:
        import json
        return json.loads(result)
    except Exception:
        pass
    warn(f"Received unexpected type for attributes: {result}")
    return result
Return the user's attributes.
@classmethod
def from_uri(
    cls,
    uri: str,
    label: Optional[str] = None,
    as_dict: bool = False,
) -> Union[
    'meerschaum.connectors.SQLConnector',
    Dict[str, Union[str, int]],
]:
    """
    Create a new SQLConnector from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string.

    label: Optional[str], default None
        If provided, use this as the connector label.
        Otherwise use the determined database name.

    as_dict: bool, default False
        If `True`, return a dictionary of the keyword arguments
        necessary to create a new `SQLConnector`, otherwise create a new object.

    Returns
    -------
    A new SQLConnector object (or a dict of attributes when `as_dict` is `True`).
    """
    params = cls.parse_uri(uri)
    params['uri'] = uri

    flavor = params.get('flavor', None)
    if not flavor or flavor not in cls.flavor_configs:
        error(f"Invalid flavor '{flavor}' detected from the provided URI.")
    if 'database' not in params:
        error("Unable to determine the database from the provided URI.")

    ### Derive a default label: the file name for file-based flavors,
    ### otherwise "user@host/database".
    if flavor in ('sqlite', 'duckdb'):
        if params['database'] == ':memory:':
            default_label = f'memory_{flavor}'
        else:
            default_label = params['database'].split(os.path.sep)[-1].lower()
    else:
        username_part = (params['username'] + '@') if 'username' in params else ''
        host_part = params.get('host', '')
        slash_part = '/' if 'host' in params else ''
        database_part = params.get('database', '')
        default_label = (username_part + host_part + slash_part + database_part).lower()
    params['label'] = label or default_label

    return cls(**params) if not as_dict else params
Create a new SQLConnector from a URI string.
Parameters
- uri (str): The URI connection string.
- label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the determined database name.
- as_dict (bool, default False): If `True`, return a dictionary of the keyword arguments necessary to create a new `SQLConnector`; otherwise create a new object.
Returns
- A new SQLConnector object, or a dictionary of attributes (if `as_dict` is `True`).
@staticmethod
def parse_uri(uri: str) -> Dict[str, Any]:
    """
    Parse a URI string into a dictionary of connection parameters.

    Parameters
    ----------
    uri: str
        The database connection URI.

    Returns
    -------
    A dictionary of attributes.

    Examples
    --------
    >>> parse_uri('sqlite:////home/foo/bar.db')
    {'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
    >>> parse_uri(
    ...     'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
    ...     + '/master?driver=ODBC+Driver+17+for+SQL+Server'
    ... )
    {'host': 'localhost', 'database': 'master', 'username': 'sa',
    'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
    'driver': 'ODBC Driver 17 for SQL Server'}
    >>>
    """
    from urllib.parse import parse_qs, urlparse
    sqlalchemy = attempt_import('sqlalchemy')

    ### Let SQLAlchemy do the heavy lifting of splitting the URI.
    params = sqlalchemy.engine.url.make_url(uri).translate_connect_args()

    ### The flavor is the dialect minus any driver suffix (e.g. 'mssql+pyodbc').
    flavor = uri.split(':', maxsplit=1)[0].split('+', maxsplit=1)[0]
    params['flavor'] = 'postgresql' if flavor == 'postgres' else flavor

    ### Fold query-string options (e.g. `?driver=...`) into the params.
    if '?' in uri:
        for key, values in parse_qs(urlparse(uri).query).items():
            params[key] = values[0]

    ### A `--search_path` option implies a schema.
    if '--search_path' in params.get('options', ''):
        params['schema'] = params['options'].replace('--search_path=', '', 1)
    return params
Parse a URI string into a dictionary of parameters.
Parameters
- uri (str): The database connection URI.
Returns
- A dictionary of attributes.
Examples
>>> parse_uri('sqlite:////home/foo/bar.db')
{'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
>>> parse_uri(
... 'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
... + '/master?driver=ODBC+Driver+17+for+SQL+Server'
... )
{'host': 'localhost', 'database': 'master', 'username': 'sa',
'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
'driver': 'ODBC Driver 17 for SQL Server'}
>>>
Inherited Members
class APIConnector(Connector):
    """
    Connect to a Meerschaum API instance.
    """

    IS_INSTANCE: bool = True
    IS_THREAD_SAFE: bool = False

    ### Methods are defined in sibling modules and attached to the class here.
    from ._request import (
        make_request,
        get,
        post,
        put,
        patch,
        delete,
        wget,
    )
    from ._actions import get_actions, do_action
    from ._misc import get_mrsm_version, get_chaining_status
    from ._pipes import (
        register_pipe,
        fetch_pipes_keys,
        edit_pipe,
        sync_pipe,
        delete_pipe,
        get_pipe_data,
        get_pipe_id,
        get_pipe_attributes,
        get_sync_time,
        pipe_exists,
        create_metadata,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        get_pipe_columns_types,
    )
    from ._fetch import fetch
    from ._plugins import (
        register_plugin,
        install_plugin,
        delete_plugin,
        get_plugins,
        get_plugin_attributes,
    )
    from ._login import login, test_connection
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri

    def __init__(
        self,
        label: Optional[str] = None,
        wait: bool = False,
        debug: bool = False,
        **kw
    ):
        """Parse the keyword arguments (or a URI) and build the base URL."""
        ### A full URI may be given in place of individual attributes.
        if 'uri' in kw:
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            from_uri_params.pop('label', None)
            kw.update(from_uri_params)

        super().__init__('api', label=label, **kw)
        if 'protocol' not in self.__dict__:
            self.protocol = 'http'

        if 'uri' not in self.__dict__:
            self.verify_attributes(required_attributes)
        else:
            ### Reuse the SQLConnector's URI parser to split out the attributes.
            from meerschaum.connectors.sql import SQLConnector
            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
            if 'host' not in conn_attrs:
                raise Exception(f"Invalid URI for '{self}'.")
            self.__dict__.update(conn_attrs)

        port_suffix = (':' + str(self.port)) if self.__dict__.get('port', None) else ''
        self.url = self.protocol + '://' + self.host + port_suffix

        ### Login state is populated lazily by the `token` property.
        self._token = None
        self._expires = None
        self._session = None

    @property
    def URI(self) -> str:
        """
        Return the fully qualified URI, including credentials when available.
        """
        username = self.__dict__.get('username', None)
        password = self.__dict__.get('password', None)
        creds = (username + ':' + password + '@') if username and password else ''
        port_suffix = (':' + str(self.port)) if self.__dict__.get('port', None) else ''
        return self.protocol + '://' + creds + self.host + port_suffix

    @property
    def session(self):
        """Return the in-memory `requests.Session`, creating it on first access."""
        if self._session is None:
            requests = attempt_import('requests')
            if requests:
                self._session = requests.Session()
            if self._session is None:
                error(f"Failed to import requests. Is requests installed?")
        return self._session

    @property
    def token(self):
        """Return the authorization token, logging in again if it has expired."""
        ### Treat tokens within one minute of expiry as already expired.
        expired = (
            True if self._expires is None else (
                self._expires
                < datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
            )
        )
        if self._token is None or expired:
            success, msg = self.login()
            if not success:
                warn(msg, stack=False)
        return self._token
Connect to a Meerschaum API instance.
def __init__(
    self,
    label: Optional[str] = None,
    wait: bool = False,
    debug: bool = False,
    **kw
):
    """Parse the keyword arguments (or a URI) and build the base URL."""
    ### A full URI may be given in place of individual attributes.
    if 'uri' in kw:
        from_uri_params = self.from_uri(kw['uri'], as_dict=True)
        label = label or from_uri_params.get('label', None)
        from_uri_params.pop('label', None)
        kw.update(from_uri_params)

    super().__init__('api', label=label, **kw)
    if 'protocol' not in self.__dict__:
        self.protocol = 'http'

    if 'uri' not in self.__dict__:
        self.verify_attributes(required_attributes)
    else:
        ### Reuse the SQLConnector's URI parser to split out the attributes.
        from meerschaum.connectors.sql import SQLConnector
        conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
        if 'host' not in conn_attrs:
            raise Exception(f"Invalid URI for '{self}'.")
        self.__dict__.update(conn_attrs)

    port_suffix = (':' + str(self.port)) if self.__dict__.get('port', None) else ''
    self.url = self.protocol + '://' + self.host + port_suffix

    ### Login state is populated lazily by the `token` property.
    self._token = None
    self._expires = None
    self._session = None
def make_request(
    self,
    method: str,
    r_url: str,
    headers: Optional[Dict[str, Any]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kwargs: Any
) -> 'requests.Response':
    """
    Make a request to this APIConnector's endpoint using the in-memory session.

    Parameters
    ----------
    method: str
        The kind of request to make.
        Accepted values: `'GET'`, `'OPTIONS'`, `'HEAD'`, `'POST'`,
        `'PUT'`, `'PATCH'`, `'DELETE'`.

    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    headers: Optional[Dict[str, Any]], default None
        The headers to use for the request.
        If `use_token` is `True`, the authorization token will be added
        to a copy of these headers.

    use_token: bool, default True
        If `True`, add the authorization token to the headers.

    debug: bool, default False
        Verbosity toggle.

    kwargs: Any
        All other keyword arguments are passed to `requests.request`.

    Returns
    -------
    A `requests.Response` object.
    """
    if method.upper() not in METHODS:
        raise ValueError(f"Method '{method}' is not supported.")

    ### Honor a connector-level `verify` setting unless the caller overrides it.
    verify = self.__dict__.get('verify', None)
    if 'verify' not in kwargs and isinstance(verify, bool):
        kwargs['verify'] = verify

    ### Deep-copy so the caller's headers dict is never mutated.
    request_headers = copy.deepcopy(headers) if isinstance(headers, dict) else {}
    if use_token:
        request_headers.update({'Authorization': f'Bearer {self.token}'})

    request_url = urllib.parse.urljoin(self.url, r_url)
    if debug:
        dprint(f"[{self}] Sending a '{method.upper()}' request to {request_url}")

    return self.session.request(
        method.upper(),
        request_url,
        headers = request_headers,
        **kwargs
    )
Make a request to this APIConnector's endpoint using the in-memory session.
Parameters
- method (str):
The kind of request to make.
Accepted values:
'GET'
'OPTIONS'
'HEAD'
'POST'
'PUT'
'PATCH'
'DELETE'
- r_url (str):
The relative URL for the endpoint (e.g. `'/pipes'`).
- headers (Optional[Dict[str, Any]], default None): The headers to use for the request. If `use_token` is `True`, the authorization token will be added to a copy of these headers.
- use_token (bool, default True): If `True`, add the authorization token to the headers.
- debug (bool, default False): Verbosity toggle.
- kwargs (Any): All other keyword arguments are passed to `requests.request`.

Returns
- A `requests.Response` object.
def get(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Wrapper for `requests.get`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        All other keyword arguments are passed through `make_request()`
        (e.g. `headers`, `use_token`, `debug`) to `requests.request`.

    Returns
    -------
    A `requests.Response` object.
    """
    return self.make_request('GET', r_url, **kwargs)
Wrapper for requests.get
.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'
). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token
isTrue
, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True
, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request
.
Returns
- A `requests.Response` object.
def post(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Wrapper for `requests.post`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        All other keyword arguments are passed through `make_request()`
        (e.g. `headers`, `use_token`, `debug`) to `requests.request`.

    Returns
    -------
    A `requests.Response` object.
    """
    return self.make_request('POST', r_url, **kwargs)
Wrapper for requests.post
.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'
). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token
isTrue
, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True
, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request
.
Returns
- A `requests.Response` object.
def put(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Wrapper for `requests.put`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        All other keyword arguments are passed through `make_request()`
        (e.g. `headers`, `use_token`, `debug`) to `requests.request`.

    Returns
    -------
    A `requests.Response` object.
    """
    return self.make_request('PUT', r_url, **kwargs)
Wrapper for requests.put
.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'
). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token
isTrue
, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True
, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request
.
Returns
- A `requests.Response` object.
def patch(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Wrapper for `requests.patch`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        All other keyword arguments are passed through `make_request()`
        (e.g. `headers`, `use_token`, `debug`) to `requests.request`.

    Returns
    -------
    A `requests.Response` object.
    """
    return self.make_request('PATCH', r_url, **kwargs)
Wrapper for requests.patch
.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'
). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token
isTrue
, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True
, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request
.
Returns
- A `requests.Response` object.
def delete(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Wrapper for `requests.delete`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        All other keyword arguments are passed through `make_request()`
        (e.g. `headers`, `use_token`, `debug`) to `requests.request`.

    Returns
    -------
    A `requests.Response` object.
    """
    return self.make_request('DELETE', r_url, **kwargs)
Wrapper for requests.delete
.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'
). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token
isTrue
, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True
, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request
.
Returns
- A `requests.Response` object.
def wget(
    self,
    r_url: str,
    dest: Optional[Union[str, pathlib.Path]] = None,
    headers: Optional[Dict[str, Any]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kw: Any
) -> pathlib.Path:
    """
    Mimic `wget` with requests: download `r_url` and return the destination path.
    """
    from meerschaum.utils.misc import wget
    if headers is None:
        headers = {}
    if use_token:
        ### NOTE(review): this mutates a caller-supplied headers dict in place
        ### (unlike `make_request`, which copies) — confirm this is intended.
        headers.update({'Authorization': f'Bearer {self.token}'})

    request_url = urllib.parse.urljoin(self.url, r_url)
    if debug:
        dest_suffix = f' to {dest}' if dest is not None else ''
        dprint(f"[{self}] Downloading {request_url}" + dest_suffix + "...")
    return wget(request_url, dest=dest, headers=headers, **kw)
Mimic wget with requests.
def get_actions(
    self,
) -> list:
    """
    Request the available actions from the API server.

    NOTE(review): despite the `list` annotation, this returns the raw
    response of the GET request — confirm against callers.
    """
    from meerschaum.config.static import STATIC_CONFIG
    actions_endpoint = STATIC_CONFIG['api']['endpoints']['actions']
    return self.get(actions_endpoint)
Get available actions from the API server
def do_action(
    self,
    action: Optional[List[str]] = None,
    sysargs: Optional[List[str]] = None,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Execute a Meerschaum action remotely.

    If `sysargs` is provided (and `action` begins with an empty string),
    parse those instead; otherwise infer everything from keyword arguments.

    NOTE: The first index of `action` should NOT be removed!
    Example: action = ['show', 'config']

    Parameters
    ----------
    action: Optional[List[str]], default None
        The action list, e.g. `['show', 'config']`.

    sysargs: Optional[List[str]], default None
        Raw command-line-style arguments to parse into keyword arguments.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of (succeeded: bool, message: str).
    """
    import sys
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.misc import json_serialize_datetime

    if action is None:
        action = []

    ### Build the request payload either from sysargs or from keyword args.
    if sysargs is not None and action and action[0] == '':
        from meerschaum._internal.arguments import parse_arguments
        if debug:
            dprint(f"Parsing sysargs:\n{sysargs}")
        json_dict = parse_arguments(sysargs)
    else:
        json_dict = kw
        json_dict['action'] = action
        json_dict['debug'] = debug

    ### The root action becomes part of the endpoint; the rest rides in the body.
    root_action = json_dict['action'][0]
    del json_dict['action'][0]
    r_url = f"{STATIC_CONFIG['api']['endpoints']['actions']}/{root_action}"

    if debug:
        from meerschaum.utils.formatting import pprint
        dprint(f"Sending data to '{self.url + r_url}':")
        pprint(json_dict, stream=sys.stderr)

    response = self.post(
        r_url,
        data = json.dumps(json_dict, default=json_serialize_datetime),
        debug = debug,
    )

    ### A dict payload containing 'detail' indicates an error response.
    try:
        response_list = json.loads(response.text)
        if isinstance(response_list, dict) and 'detail' in response_list:
            return False, response_list['detail']
    except Exception as e:
        print(f"Invalid response: {response}")
        print(e)
        return False, response.text

    if debug:
        dprint(response)

    try:
        return response_list[0], response_list[1]
    except Exception as e:
        return False, f"Failed to parse result from action '{root_action}'"
Execute a Meerschaum action remotely.
If sysargs is provided, parse those instead. Otherwise infer everything from keyword arguments.
NOTE: The first index of `action` should NOT be removed!
Example: action = ['show', 'config']
Returns: tuple (succeeded : bool, message : str)
Parameters
- action (Optional[List[str]], default None): The action list, e.g. `['show', 'config']`.
- sysargs (Optional[List[str]], default None): Raw command-line-style arguments to parse instead of keyword arguments.
- debug (bool, default False): Verbosity toggle.
- **kw: Additional keyword arguments to include in the request body.

Returns
- A tuple of (succeeded: bool, message: str).
def get_mrsm_version(self, **kw) -> Optional[str]:
    """
    Return the Meerschaum version of the API instance, or `None` on failure.
    """
    from meerschaum.config.static import STATIC_CONFIG
    try:
        response_json = self.get(
            STATIC_CONFIG['api']['endpoints']['version'] + '/mrsm',
            use_token = True,
            **kw
        ).json()
    except Exception:
        return None
    ### A dict payload containing 'detail' indicates an error response.
    if isinstance(response_json, dict) and 'detail' in response_json:
        return None
    return response_json
Return the Meerschaum version of the API instance.
def get_chaining_status(self, **kw) -> Optional[bool]:
    """
    Fetch the chaining status of the API instance, or `None` on failure.
    """
    from meerschaum.config.static import STATIC_CONFIG
    try:
        response = self.get(
            STATIC_CONFIG['api']['endpoints']['chaining'],
            use_token = True,
            **kw
        )
        if not response:
            return None
    except Exception:
        return None
    return response.json()
Fetch the chaining status of the API instance.
def register_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> SuccessTuple:
    """
    Submit a POST to the API to register a new Pipe object.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to register.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of (success_bool, message).
    """
    from meerschaum.utils.debug import dprint
    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ### then `pipe.parameters` will exist and not be fetched from the database.
    r_url = pipe_r_url(pipe)
    response = self.post(
        r_url + '/register',
        json = pipe.parameters,
        debug = debug,
    )
    if debug:
        dprint(response.text)

    ### Parse the JSON payload once (the original re-parsed it per branch).
    payload = response.json()
    if isinstance(payload, list):
        msg = payload[1]
    elif 'detail' in payload:
        msg = payload['detail']
    else:
        msg = response.text
    return response.__bool__(), msg
Submit a POST to the API to register a new Pipe object. Returns a tuple of (success_bool, response_dict).
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> Union[List[Tuple[str, str, Union[str, None]]]]:
    """
    Fetch registered Pipes' keys from the API.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        The connector keys for the query.

    metric_keys: Optional[List[str]], default None
        The metric keys for the query.

    location_keys: Optional[List[str]], default None
        The location keys for the query.

    tags: Optional[List[str]], default None
        A list of tags for the query.

    params: Optional[Dict[str, Any]], default None
        A parameters dictionary for filtering against the `pipes` table
        (e.g. `{'connector_keys': 'plugin:foo'}`).
        Not recommended to be used.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of tuples containing pipes' keys.
    """
    from meerschaum.config.static import STATIC_CONFIG
    connector_keys = connector_keys if connector_keys is not None else []
    metric_keys = metric_keys if metric_keys is not None else []
    location_keys = location_keys if location_keys is not None else []
    tags = tags if tags is not None else []

    r_url = STATIC_CONFIG['api']['endpoints']['pipes'] + '/keys'
    ### Keys are JSON-encoded so lists survive the query string.
    request_params = {
        'connector_keys': json.dumps(connector_keys),
        'metric_keys': json.dumps(metric_keys),
        'location_keys': json.dumps(location_keys),
        'tags': json.dumps(tags),
        'params': json.dumps(params),
    }
    try:
        j = self.get(r_url, params=request_params, debug=debug).json()
    except Exception as e:
        error(str(e))

    ### An error response is a dict containing 'detail'.
    if 'detail' in j:
        error(j['detail'], stack=False)
    return [tuple(r) for r in j]
Fetch registered Pipes' keys from the API.
Parameters
- connector_keys (Optional[List[str]], default None): The connector keys for the query.
- metric_keys (Optional[List[str]], default None): The metric keys for the query.
- location_keys (Optional[List[str]], default None): The location keys for the query.
- tags (Optional[List[str]], default None): A list of tags for the query.
- params (Optional[Dict[str, Any]], default None):
A parameters dictionary for filtering against the `pipes` table (e.g. `{'connector_keys': 'plugin:foo'}`). Not recommended to be used.
- debug (bool, default False): Verbosity toggle.
Returns
- A list of tuples containing pipes' keys.
def edit_pipe(
    self,
    pipe: mrsm.Pipe,
    patch: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """
    Submit a PATCH to the API to edit an existing Pipe object.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose parameters should be updated.

    patch: bool, default False
        If `True`, patch (merge) the parameters rather than replacing them.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A tuple of (success_bool, message).
    """
    from meerschaum.utils.debug import dprint
    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ### then `pipe.parameters` will exist and not be fetched from the database.
    r_url = pipe_r_url(pipe)
    response = self.patch(
        r_url + '/edit',
        params = {'patch': patch,},
        json = pipe.parameters,
        debug = debug,
    )
    if debug:
        dprint(response.text)

    ### Parse the JSON payload once (the original re-parsed it per branch).
    payload = response.json()
    if isinstance(payload, list):
        msg = payload[1]
    elif 'detail' in payload:
        msg = payload['detail']
    else:
        msg = response.text
    return response.__bool__(), msg
Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, response_dict).
def sync_pipe(
    self,
    pipe: mrsm.Pipe,
    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]] = None,
    chunksize: Optional[int] = -1,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """Sync a DataFrame into a Pipe.

    Chunks the input (DataFrame, dict-of-lists, JSON string, or list of rows)
    and POSTs each chunk to the pipe's `/data` endpoint.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to receive the data.

    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]], default None
        The data to sync. A JSON string is decoded first.

    chunksize: Optional[int], default -1
        Rows per POST. `-1` reads the configured SQL chunksize; `None` means 1.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of (success, message).
    """
    from decimal import Decimal
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import json_serialize_datetime, items_str
    from meerschaum.config import get_config
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.dataframe import get_numeric_cols
    begin = time.time()
    more_itertools = attempt_import('more_itertools')
    if df is None:
        msg = f"DataFrame is `None`. Cannot sync {pipe}."
        return False, msg

    def get_json_str(c):
        ### allow syncing dict or JSON without needing to import pandas (for IOT devices)
        if isinstance(c, (dict, list)):
            return json.dumps(c, default=json_serialize_datetime)
        pd = import_pandas()
        return c.fillna(pd.NA).to_json(date_format='iso', date_unit='ns')

    df = json.loads(df) if isinstance(df, str) else df

    ### `None` -> 1 row per chunk; `-1` -> configured default; else as given.
    _chunksize: Optional[int] = (1 if chunksize is None else (
        get_config('system', 'connectors', 'sql', 'chunksize') if chunksize == -1
        else chunksize
    ))
    ### NOTE(review): dict inputs have no `.columns` attribute — confirm the
    ### dict branch below is reachable with this line in place.
    keys: List[str] = list(df.columns)
    chunks = []
    if hasattr(df, 'index'):
        ### DataFrame path: positional chunking over a reset index.
        df = df.reset_index(drop=True)
        is_dask = 'dask' in df.__module__
        chunks = (
            (df.iloc[i] for i in more_itertools.chunked(df.index, _chunksize))
            if not is_dask
            else [partition.compute() for partition in df.partitions]
        )
        numeric_cols = get_numeric_cols(df)
        if numeric_cols:
            ### Serialize Decimals as strings so JSON round-trips losslessly.
            for col in numeric_cols:
                df[col] = df[col].apply(lambda x: f'{x:f}' if isinstance(x, Decimal) else x)
            pipe_dtypes = pipe.dtypes
            new_numeric_cols = [
                col
                for col in numeric_cols
                if pipe_dtypes.get(col, None) != 'numeric'
            ]
            pipe.dtypes.update({
                col: 'numeric'
                for col in new_numeric_cols
            })
            ### Persist newly-detected numeric dtypes on the pipe's registration.
            edit_success, edit_msg = pipe.edit(debug=debug)
            if not edit_success:
                warn(
                    "Failed to update new numeric columns "
                    + f"{items_str(new_numeric_cols)}:\n{edit_msg}"
                )
    elif isinstance(df, dict):
        ### `_chunks` is a dict of lists of dicts.
        ### e.g. {'a' : [ {'a':[1, 2]}, {'a':[3, 4]} ] }
        _chunks = {k: [] for k in keys}
        for k in keys:
            chunk_iter = more_itertools.chunked(df[k], _chunksize)
            for l in chunk_iter:
                _chunks[k].append({k: l})

        ### `chunks` is a list of dicts (e.g. orient by rows in pandas JSON).
        for k, l in _chunks.items():
            for i, c in enumerate(l):
                try:
                    chunks[i].update(c)
                except IndexError:
                    chunks.append(c)
    elif isinstance(df, list):
        ### NOTE(review): `more_itertools.chunked` yields lists, so `df[i]`
        ### indexes a list with a list — verify this branch against callers.
        chunks = (df[i] for i in more_itertools.chunked(df, _chunksize))

    ### Send columns in case the user has defined them locally.
    if pipe.columns:
        kw['columns'] = json.dumps(pipe.columns)
    r_url = pipe_r_url(pipe) + '/data'

    rowcount = 0
    num_success_chunks = 0
    for i, c in enumerate(chunks):
        if debug:
            dprint(f"[{self}] Posting chunk {i} to {r_url}...")
        if len(c) == 0:
            if debug:
                dprint(f"[{self}] Skipping empty chunk...")
            continue
        json_str = get_json_str(c)

        try:
            response = self.post(
                r_url,
                ### handles check_existing
                params = kw,
                data = json_str,
                debug = debug
            )
        except Exception as e:
            msg = f"Failed to post a chunk to {pipe}:\n{e}"
            warn(msg)
            return False, msg

        if not response:
            return False, f"Failed to sync a chunk:\n{response.text}"

        try:
            j = json.loads(response.text)
        except Exception as e:
            return False, f"Failed to parse response from syncing {pipe}:\n{e}"

        ### 'detail' marks a FastAPI-style error body.
        if isinstance(j, dict) and 'detail' in j:
            return False, j['detail']

        try:
            j = tuple(j)
        except Exception as e:
            return False, response.text

        if debug:
            dprint("Received response: " + str(j))
        ### Abort on the first failed chunk, propagating the server's tuple.
        if not j[0]:
            return j

        rowcount += len(c)
        num_success_chunks += 1

    success_tuple = True, (
        f"It took {round(time.time() - begin, 2)} seconds to sync {rowcount} row"
        + ('s' if rowcount != 1 else '')
        + f" across {num_success_chunks} chunk" + ('s' if num_success_chunks != 1 else '') +
        f" to {pipe}."
    )
    return success_tuple
Sync a DataFrame into a Pipe.
def delete_pipe(
    self,
    pipe: Optional[meerschaum.Pipe] = None,
    debug: bool = None,
) -> SuccessTuple:
    """Delete a Pipe and drop its table."""
    if pipe is None:
        error(f"Pipe cannot be None.")
    response = self.delete(
        pipe_r_url(pipe) + '/delete',
        debug = debug,
    )
    if debug:
        dprint(response.text)

    ### Parse the body once; shape the message from whatever came back.
    success = response.__bool__()
    payload = response.json()
    if isinstance(payload, list):
        return success, payload[1]
    if 'detail' in payload:
        return success, payload['detail']
    return success, response.text
Delete a Pipe and drop its table.
def get_pipe_data(
    self,
    pipe: meerschaum.Pipe,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[str, datetime, int, None] = None,
    end: Union[str, datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    as_chunks: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[pandas.DataFrame, None]:
    """Fetch data from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose data to fetch.

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, exclude these columns.

    begin: Union[str, datetime, int, None], default None
        Lower datetime bound for the query.

    end: Union[str, datetime, int, None], default None
        Upper datetime bound for the query.

    params: Optional[Dict[str, Any]], default None
        Additional filter parameters.

    Returns
    -------
    A DataFrame of the pipe's data, or `None` on failure.
    """
    r_url = pipe_r_url(pipe)
    try:
        response = self.get(
            r_url + "/data",
            params = {
                'select_columns': json.dumps(select_columns),
                'omit_columns': json.dumps(omit_columns),
                'begin': begin,
                'end': end,
                'params': json.dumps(params)
            },
            debug = debug
        )
        if not response.ok:
            return None
        j = response.json()
    except Exception as e:
        warn(f"Failed to get data for {pipe}:\n{e}")
        return None

    ### BUG FIX: previously returned a `(False, detail)` tuple here,
    ### contradicting the DataFrame-or-None return contract. Warn instead.
    if isinstance(j, dict) and 'detail' in j:
        warn(j['detail'])
        return None

    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.dataframe import parse_df_datetimes, add_missing_cols_to_df
    pd = import_pandas()
    try:
        df = pd.read_json(StringIO(response.text))
    except Exception as e:
        warn(f"Failed to parse response for {pipe}:\n{e}")
        return None

    ### An empty frame still needs the pipe's expected columns.
    if len(df.columns) == 0:
        return add_missing_cols_to_df(df, pipe.dtypes)

    ### Only parse datetimes for columns the pipe declares as datetime-like.
    df = parse_df_datetimes(
        df,
        ignore_cols = [
            col
            for col, dtype in pipe.dtypes.items()
            if 'datetime' not in str(dtype)
        ],
        debug = debug,
    )
    return df
Fetch data from the API.
def get_pipe_id(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False,
) -> Optional[int]:
    """Get a Pipe's ID from the API.

    Returns
    -------
    The integer ID, or `None` if it could not be determined.
    """
    ### BUG FIX: the annotation previously misspelled `meerschaum` as
    ### `meerschuam` and claimed `-> int` despite the `None` fallthrough.
    from meerschaum.utils.misc import is_int
    r_url = pipe_r_url(pipe)
    response = self.get(
        r_url + '/id',
        debug = debug
    )
    if debug:
        dprint(f"Got pipe ID: {response.text}")
    try:
        if is_int(response.text):
            return int(response.text)
    except Exception as e:
        warn(f"Failed to get the ID for {pipe}:\n{e}")
    return None
Get a Pipe's ID from the API.
def get_pipe_attributes(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False,
) -> Dict[str, Any]:
    """Get a Pipe's attributes from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose attributes we are fetching.

    Returns
    -------
    A dictionary of a pipe's attributes.
    If the pipe does not exist, return an empty dictionary.
    """
    response = self.get(pipe_r_url(pipe) + '/attributes', debug=debug)
    try:
        return json.loads(response.text)
    except Exception as e:
        warn(f"Failed to get the attributes for {pipe}:\n{e}")
    return {}
Get a Pipe's attributes from the API
Parameters
- pipe (meerschaum.Pipe): The pipe whose attributes we are fetching.
Returns
- A dictionary of a pipe's attributes.
- If the pipe does not exist, return an empty dictionary.
def get_sync_time(
    self,
    pipe: 'meerschaum.Pipe',
    params: Optional[Dict[str, Any]] = None,
    newest: bool = True,
    debug: bool = False,
) -> Union[datetime, int, None]:
    """Get a Pipe's most recent datetime value from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to select from.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the WHERE clause.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (ASC instead of DESC).

    Returns
    -------
    The most recent (or oldest if `newest` is `False`) datetime of a pipe,
    rounded down to the closest minute.
    """
    from meerschaum.utils.misc import is_int
    from meerschaum.utils.warnings import warn
    response = self.get(
        pipe_r_url(pipe) + '/sync_time',
        json = params,
        params = {'newest': newest, 'debug': debug},
        debug = debug,
    )
    if not response:
        warn(f"Failed to get the sync time for {pipe}:\n" + response.text)
        return None

    payload = response.json()
    if payload is None:
        return None

    ### Integer epochs come back as ints; everything else is ISO 8601.
    try:
        return int(payload) if is_int(payload) else datetime.fromisoformat(payload)
    except Exception as e:
        warn(f"Failed to parse the sync time '{payload}' for {pipe}:\n{e}")
    return None
Get a Pipe's most recent datetime value from the API.
Parameters
- pipe (meerschaum.Pipe): The pipe to select from.
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause.
- newest (bool, default True): If `True`, get the most recent datetime (honoring `params`). If `False`, get the oldest datetime (ASC instead of DESC).
Returns
- The most recent (or oldest if `newest` is `False`) datetime of a pipe, rounded down to the closest minute.
def pipe_exists(
    self,
    pipe: 'meerschaum.Pipe',
    debug: bool = False
) -> bool:
    """Check the API to see if a Pipe exists.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe which we are querying.

    Returns
    -------
    A bool indicating whether a pipe's underlying table exists.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    response = self.get(pipe_r_url(pipe) + '/exists', debug=debug)
    if not response:
        warn(f"Failed to check if {pipe} exists:\n{response.text}")
        return False
    if debug:
        dprint("Received response: " + str(response.text))
    payload = response.json()
    ### A 'detail' key marks a FastAPI-style error body; surface it.
    if isinstance(payload, dict) and 'detail' in payload:
        warn(payload['detail'])
    return payload
Check the API to see if a Pipe exists.
Parameters
- pipe ('meerschaum.Pipe'): The pipe which were are querying.
Returns
- A bool indicating whether a pipe's underlying table exists.
def create_metadata(
    self,
    debug: bool = False
) -> bool:
    """Create metadata tables.

    Returns
    -------
    A bool indicating success.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.config.static import STATIC_CONFIG
    r_url = STATIC_CONFIG['api']['endpoints']['metadata']
    response = self.post(r_url, debug=debug)
    if debug:
        ### BUG FIX: this message was missing its `f` prefix and printed
        ### the literal placeholder instead of the response body.
        dprint(f"Create metadata response: {response.text}")
    try:
        metadata_response = json.loads(response.text)
    except Exception as e:
        warn(f"Failed to create metadata on {self}:\n{e}")
        metadata_response = False
    ### BUG FIX: previously `return False` unconditionally, discarding the
    ### parsed response and contradicting the documented return value.
    return bool(metadata_response)
Create metadata tables.
Returns
- A bool indicating success.
def get_pipe_rowcount(
    self,
    pipe: 'meerschaum.Pipe',
    begin: Optional[datetime] = None,
    end: Optional[datetime] = None,
    params: Optional[Dict[str, Any]] = None,
    remote: bool = False,
    debug: bool = False,
) -> int:
    """Get a pipe's row count from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose row count we are counting.

    begin: Optional[datetime], default None
        If provided, bound the count by this datetime.

    end: Optional[datetime], default None
        If provided, bound the count by this datetime.

    params: Optional[Dict[str, Any]], default None
        If provided, bound the count by these parameters.

    remote: bool, default False
        Passed through as a query parameter.

    Returns
    -------
    The number of rows in the pipe's table, bound by the given parameters.
    If the table does not exist, return 0.
    """
    response = self.get(
        pipe_r_url(pipe) + "/rowcount",
        json = params,
        params = {
            'begin': begin,
            'end': end,
            'remote': remote,
        },
        debug = debug
    )
    if not response:
        warn(f"Failed to get the rowcount for {pipe}:\n{response.text}")
        return 0
    try:
        return int(json.loads(response.text))
    except Exception as e:
        warn(f"Failed to get the rowcount for {pipe}:\n{e}")
    return 0
Get a pipe's row count from the API.
Parameters
- pipe (meerschaum.Pipe): The pipe whose row count we are counting.
- begin (Optional[datetime], default None): If provided, bound the count by this datetime.
- end (Optional[datetime]): If provided, bound the count by this datetime.
- params (Optional[Dict[str, Any]], default None): If provided, bound the count by these parameters.
- remote (bool, default False):
Returns
- The number of rows in the pipe's table, bound the given parameters.
- If the table does not exist, return 0.
def drop_pipe(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False
) -> SuccessTuple:
    """
    Drop a pipe's table but maintain its registration.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to be dropped.

    Returns
    -------
    A success tuple (bool, str).
    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    if pipe is None:
        error(f"Pipe cannot be None.")
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/drop',
        debug = debug,
    )
    if debug:
        dprint(response.text)

    try:
        data = response.json()
    except Exception as e:
        return False, f"Failed to drop {pipe}."

    success = response.__bool__()
    if isinstance(data, list):
        return success, data[1]
    ### BUG FIX: previously re-parsed `response.json()` here and checked
    ### `'detail' in` it without guarding against non-dict payloads.
    if isinstance(data, dict) and 'detail' in data:
        return success, data['detail']
    return success, response.text
Drop a pipe's table but maintain its registration.
Parameters
- pipe (meerschaum.Pipe:): The pipe to be dropped.
Returns
- A success tuple (bool, str).
def clear_pipe(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Delete rows in a pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe with rows to be deleted.

    Returns
    -------
    A success tuple.
    """
    ### Strip keys we set explicitly below so they aren't passed twice.
    for overridden_key in ('metric_keys', 'connector_keys', 'location_keys', 'action', 'force'):
        kw.pop(overridden_key, None)
    return self.do_action(
        ['clear', 'pipes'],
        connector_keys = pipe.connector_keys,
        metric_keys = pipe.metric_key,
        location_keys = pipe.location_key,
        force = True,
        debug = debug,
        **kw
    )
Delete rows in a pipe's table.
Parameters
- pipe (meerschaum.Pipe): The pipe with rows to be deleted.
Returns
- A success tuple.
def get_pipe_columns_types(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False,
) -> Union[Dict[str, str], None]:
    """
    Fetch the columns and types of the pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose columns are to be queried.

    Returns
    -------
    A dictionary mapping column names to their database types,
    or `None` on failure.

    Examples
    --------
    >>> {
    ...     'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    ...     'id': 'BIGINT',
    ...     'val': 'DOUBLE PRECISION',
    ... }
    >>> 
    """
    response = self.get(
        pipe_r_url(pipe) + '/columns/types',
        debug = debug
    )
    payload = response.json()
    ### A dict whose only key is 'detail' is a FastAPI-style error body.
    if isinstance(payload, dict) and 'detail' in payload and len(payload.keys()) == 1:
        from meerschaum.utils.warnings import warn
        warn(payload['detail'])
        return None
    if not isinstance(payload, dict):
        warn(response.text)
        return None
    return payload
Fetch the columns and types of the pipe's table.
Parameters
- pipe (meerschaum.Pipe): The pipe whose columns to be queried.
Returns
- A dictionary mapping column names to their database types.
Examples
>>> {
... 'dt': 'TIMESTAMP WITHOUT TIMEZONE',
... 'id': 'BIGINT',
... 'val': 'DOUBLE PRECISION',
... }
>>>
def fetch(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, str, int] = '',
    end: Union[datetime, int] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kw: Any
) -> Iterator['pd.DataFrame']:
    """Get the Pipe data from the remote Pipe.

    Builds a source pipe from the `fetch:pipe` parameters (falling back to
    legacy root-level keys) and returns its data as a chunk iterator.
    Returns `None` (with a warning) if the fetch parameters are missing.

    ### BUG FIX: `params` was annotated `Optional[Dict, Any]`, which is
    ### invalid typing syntax (`Optional` takes a single argument).
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config._patch import apply_patch_to_config

    fetch_params = pipe.parameters.get('fetch', {})
    if not fetch_params:
        warn(f"Missing 'fetch' parameters for {pipe}.", stack=False)
        return None

    pipe_meta = fetch_params.get('pipe', {})
    ### Legacy: check for `connector_keys`, etc. at the root.
    if not pipe_meta:
        ck, mk, lk = (
            fetch_params.get('connector_keys', None),
            fetch_params.get('metric_key', None),
            fetch_params.get('location_key', None),
        )
        if not ck or not mk:
            warn(f"Missing `fetch:pipe` keys for {pipe}.", stack=False)
            return None

        pipe_meta.update({
            'connector': ck,
            'metric': mk,
            'location': lk,
        })

    pipe_meta['instance'] = self
    source_pipe = mrsm.Pipe(**pipe_meta)

    ### Merge caller params over the configured fetch params.
    _params = copy.deepcopy(params) if params is not None else {}
    _params = apply_patch_to_config(_params, fetch_params.get('params', {}))
    select_columns = fetch_params.get('select_columns', [])
    omit_columns = fetch_params.get('omit_columns', [])

    return source_pipe.get_data(
        select_columns = select_columns,
        omit_columns = omit_columns,
        begin = begin,
        end = end,
        params = _params,
        debug = debug,
        as_iterator = True,
    )
Get the Pipe data from the remote Pipe.
def register_plugin(
    self,
    plugin: meerschaum.core.Plugin,
    make_archive: bool = True,
    debug: bool = False,
) -> SuccessTuple:
    """Register a plugin and upload its archive."""
    import json
    archive_path = plugin.make_tar(debug=debug) if make_archive else plugin.archive_path
    metadata = {
        'version': plugin.version,
        'attributes': json.dumps(plugin.attributes),
    }
    r_url = plugin_r_url(plugin)
    ### The context manager guarantees the archive handle is closed,
    ### matching the original try/finally behavior.
    with open(archive_path, 'rb') as file_pointer:
        try:
            response = self.post(
                r_url,
                files = {'archive': file_pointer},
                params = metadata,
                debug = debug,
            )
        except Exception as e:
            return False, f"Failed to register plugin '{plugin}'."

    try:
        success, msg = json.loads(response.text)
    except Exception as e:
        return False, response.text

    return success, msg
Register a plugin and upload its archive.
def install_plugin(
    self,
    name: str,
    force: bool = False,
    debug: bool = False
) -> SuccessTuple:
    """Download and attempt to install a plugin from the API.

    Parameters
    ----------
    name: str
        The name of the plugin to install.

    force: bool, default False
        Passed through to `Plugin.install()`.

    Returns
    -------
    A success tuple (bool, str).
    """
    import os, pathlib, json
    from meerschaum.core import Plugin
    from meerschaum.config._paths import PLUGINS_TEMP_RESOURCES_PATH
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    binaryornot_check = attempt_import('binaryornot.check', lazy=False)
    r_url = plugin_r_url(name)
    dest = pathlib.Path(os.path.join(PLUGINS_TEMP_RESOURCES_PATH, name + '.tar.gz'))
    if debug:
        dprint(f"Fetching from '{self.url + r_url}' to '{dest}'...")
    archive_path = self.wget(r_url, dest, debug=debug)
    is_binary = binaryornot_check.is_binary(str(archive_path))
    if not is_binary:
        ### A non-binary download is an error payload, not an archive.
        fail_msg = f"Failed to download binary for plugin '{name}'."
        ### BUG FIX: default before parsing — previously a dict without
        ### 'detail' (or any other JSON shape) left `success`/`msg` unbound
        ### and raised NameError at the return.
        success, msg = False, fail_msg
        try:
            with open(archive_path, 'r') as f:
                j = json.load(f)
            if isinstance(j, list):
                success, msg = tuple(j)
        except Exception as e:
            success, msg = False, fail_msg
        return success, msg
    plugin = Plugin(name, archive_path=archive_path, repo_connector=self)
    return plugin.install(force=force, debug=debug)
Download and attempt to install a plugin from the API.
def delete_plugin(
    self,
    plugin: meerschaum.core.Plugin,
    debug: bool = False
) -> SuccessTuple:
    """Delete a plugin from an API repository."""
    import json
    r_url = plugin_r_url(plugin)
    try:
        response = self.delete(r_url, debug=debug)
    except Exception as e:
        return False, f"Failed to delete plugin '{plugin}'."

    ### The server replies with a JSON-encoded success tuple.
    try:
        return tuple(json.loads(response.text))
    except Exception as e:
        return False, response.text
Delete a plugin from an API repository.
def get_plugins(
    self,
    user_id : Optional[int] = None,
    search_term : Optional[str] = None,
    debug : bool = False
) -> Sequence[str]:
    """Return a list of registered plugin names.

    Parameters
    ----------
    user_id: Optional[int], default None
        If specified, return all plugins from a certain user.

    search_term: Optional[str], default None
        If specified, sent as the `search_term` query parameter
        (presumably filters plugin names server-side — verify against the API).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of plugin names. Returns an empty list on a failed response.
    """
    import json
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config.static import STATIC_CONFIG
    response = self.get(
        STATIC_CONFIG['api']['endpoints']['plugins'],
        params = {'user_id' : user_id, 'search_term' : search_term},
        use_token = True,
        debug = debug
    )
    if not response:
        return []
    plugins = json.loads(response.text)
    ### Anything other than a list is an error body; `error()` raises.
    if not isinstance(plugins, list):
        error(response.text)
    return plugins
Return a list of registered plugin names.
Parameters
- user_id (Optional[int], default None): If specified, return all plugins from a certain user.
- search_term (Optional[str], default None): An optional search term for the query.
- debug (bool, default False): Verbosity toggle.
Returns
- A list of registered plugin names.
def get_plugin_attributes(
    self,
    plugin: meerschaum.core.Plugin,
    debug: bool = False
) -> Mapping[str, Any]:
    """
    Return a plugin's attributes.
    """
    import json
    from meerschaum.utils.warnings import warn, error
    response = self.get(plugin_r_url(plugin) + '/attributes', use_token=True, debug=debug)
    payload = response.json()
    ### The server sometimes double-encodes the attributes dict as a string.
    if isinstance(payload, str) and payload and payload[0] == '{':
        try:
            payload = json.loads(payload)
        except Exception as e:
            pass
    if not isinstance(payload, dict):
        ### `error()` raises, so execution stops here for malformed bodies.
        error(response.text)
    elif not response and 'detail' in payload:
        warn(payload['detail'])
        return {}
    return payload
Return a plugin's attributes.
def login(
    self,
    debug: bool = False,
    warn: bool = True,
    **kw: Any
) -> SuccessTuple:
    """Log in and set the session token."""
    from meerschaum.utils.warnings import warn as _warn, info, error
    from meerschaum.core import User
    from meerschaum.config.static import STATIC_CONFIG
    import json, datetime
    ### Missing credentials raise AttributeError on the attribute access.
    try:
        login_data = {
            'username': self.username,
            'password': self.password,
        }
    except AttributeError:
        return False, f"Please login with the command `login {self}`."
    response = self.post(
        STATIC_CONFIG['api']['endpoints']['login'],
        data = login_data,
        use_token = False,
        debug = debug
    )
    if not response:
        msg = (
            f"Failed to log into '{self}' as user '{login_data['username']}'.\n" +
            f" Please verify login details for connector '{self}'."
        )
        if warn:
            _warn(msg, stack=False)
        return response.__bool__(), msg

    ### Cache the bearer token and its expiry for subsequent requests.
    payload = json.loads(response.text)
    self._token = payload['access_token']
    self._expires = datetime.datetime.strptime(
        payload['expires'],
        '%Y-%m-%dT%H:%M:%S.%f'
    )
    msg = f"Successfully logged into '{self}' as user '{login_data['username']}'."
    return response.__bool__(), msg
Log in and set the session token.
def test_connection(
    self,
    **kw: Any
) -> Union[bool, None]:
    """Test if a successful connection to the API may be made."""
    from meerschaum.connectors.poll import retry_connect
    ### Single attempt, no warnings or chaining; caller kwargs win.
    connect_kw = {
        'max_retries': 1,
        'retry_wait': 0,
        'warn': False,
        'connector': self,
        'enforce_chaining': False,
        'enforce_login': False,
    }
    connect_kw.update(kw)
    try:
        return retry_connect(**connect_kw)
    except Exception as e:
        return False
Test if a successful connection to the API may be made.
def register_user(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """Register a new user."""
    import json
    from meerschaum.config.static import STATIC_CONFIG
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/register"
    data = {
        'username': user.username,
        'password': user.password,
        'attributes': json.dumps(user.attributes),
    }
    ### Only send optional fields when they are truthy.
    if user.type:
        data['type'] = user.type
    if user.email:
        data['email'] = user.email
    response = self.post(r_url, data=data, debug=debug)
    try:
        payload = json.loads(response.text)
        if isinstance(payload, dict) and 'detail' in payload:
            return False, payload['detail']
        success_tuple = tuple(payload)
    except Exception:
        fallback = response.text if response else f"Failed to register user '{user}'."
        return False, fallback

    return tuple(success_tuple)
Register a new user.
def get_user_id(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> Optional[int]:
    """Get a user's ID."""
    from meerschaum.config.static import STATIC_CONFIG
    import json
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/id"
    response = self.get(r_url, debug=debug, **kw)
    ### Any parse failure (error body, empty response) yields None.
    try:
        return int(json.loads(response.text))
    except Exception as e:
        return None
Get a user's ID.
def get_users(
    self,
    debug: bool = False,
    **kw : Any
) -> List[str]:
    """
    Return a list of registered usernames.
    """
    from meerschaum.config.static import STATIC_CONFIG
    import json
    response = self.get(
        f"{STATIC_CONFIG['api']['endpoints']['users']}",
        debug = debug,
        use_token = True,
    )
    ### Fall back to an empty list on any failure.
    usernames: List[str] = []
    if response:
        try:
            usernames = response.json()
        except Exception as e:
            usernames = []
    return usernames
Return a list of registered usernames.
def edit_user(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """Edit an existing user."""
    import json
    from meerschaum.config.static import STATIC_CONFIG
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/edit"
    data = {
        'username': user.username,
        'password': user.password,
        'type': user.type,
        'email': user.email,
        'attributes': json.dumps(user.attributes),
    }
    response = self.post(r_url, data=data, debug=debug)
    try:
        payload = json.loads(response.text)
        if isinstance(payload, dict) and 'detail' in payload:
            return False, payload['detail']
        success_tuple = tuple(payload)
    except Exception as e:
        fallback = response.text if response else f"Failed to edit user '{user}'."
        return False, fallback

    return tuple(success_tuple)
Edit an existing user.
def delete_user(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """Delete a user."""
    from meerschaum.config.static import STATIC_CONFIG
    import json
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}"
    response = self.delete(r_url, debug=debug)
    try:
        payload = json.loads(response.text)
        if isinstance(payload, dict) and 'detail' in payload:
            return False, payload['detail']
        return tuple(payload)
    except Exception as e:
        return False, f"Failed to delete user '{user.username}'."
Delete a user.
def get_user_password_hash(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """If configured, get a user's password hash."""
    from meerschaum.config.static import STATIC_CONFIG
    r_url = STATIC_CONFIG['api']['endpoints']['users'] + '/' + user.username + '/password_hash'
    response = self.get(r_url, debug=debug, **kw)
    ### A falsy response (error status) yields None.
    return response.json() if response else None
If configured, get a user's password hash.
def get_user_type(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """If configured, get a user's type."""
    from meerschaum.config.static import STATIC_CONFIG
    r_url = STATIC_CONFIG['api']['endpoints']['users'] + '/' + user.username + '/type'
    response = self.get(r_url, debug=debug, **kw)
    ### A falsy response (error status) yields None.
    return response.json() if response else None
If configured, get a user's type.
def get_user_attributes(
    self,
    user: 'meerschaum.core.User',
    debug: bool = False,
    **kw
) -> Optional[Dict[str, Any]]:
    """Get a user's attributes.

    Returns
    -------
    The attributes dictionary, or `None` if the response could not be parsed.
    """
    ### BUG FIX: the return annotation was `-> int`, but this function
    ### returns the parsed attributes (a dict) or `None`.
    from meerschaum.config.static import STATIC_CONFIG
    import json
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/attributes"
    response = self.get(r_url, debug=debug, **kw)
    try:
        attributes = json.loads(response.text)
    except Exception as e:
        attributes = None
    return attributes
Get a user's attributes.
@classmethod
def from_uri(
    cls,
    uri: str,
    label: Optional[str] = None,
    as_dict: bool = False,
) -> Union[
    'meerschaum.connectors.APIConnector',
    Dict[str, Union[str, int]],
]:
    """
    Create a new APIConnector from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string.

    label: Optional[str], default None
        If provided, use this as the connector label.
        Otherwise derive one from the URI's username and host
        (lowercased `'username@host'`, or just the host).

    as_dict: bool, default False
        If `True`, return a dictionary of the keyword arguments
        necessary to create a new `APIConnector`, otherwise create a new object.

    Returns
    -------
    A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
    """
    from meerschaum.connectors.sql import SQLConnector
    ### Reuse the SQL URI parser; its 'flavor' field maps to the API 'protocol'.
    params = SQLConnector.parse_uri(uri)
    if 'host' not in params:
        error("No host was found in the provided URI.")
    params['protocol'] = params.pop('flavor')
    params['label'] = label or (
        (
            (params['username'] + '@' if 'username' in params else '')
            + params['host']
        ).lower()
    )

    return cls(**params) if not as_dict else params
Create a new APIConnector from a URI string.
Parameters
- uri (str): The URI connection string.
- label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the determined database name.
- as_dict (bool, default False):
If
True
, return a dictionary of the keyword arguments necessary to create a newAPIConnector
, otherwise create a new object.
Returns
- A new APIConnector object or a dictionary of attributes (if
as_dict
isTrue
).
Inherited Members
def get_connector(
    type: Optional[str] = None,
    label: Optional[str] = None,
    refresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Connector:
    """
    Return existing connector or create new connection and store for reuse.

    You can create new connectors if enough parameters are provided for the given type and flavor.


    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.
        May also be given as combined keys (e.g. `'sql:main'`) with `label` omitted.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct new object. Defaults to `False`.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`), or `None` if construction fails
    (a warning is emitted instead of raising).

    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```

    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    ### Accept combined keys (e.g. 'sql:main') in `type` when no label is given.
    ### maxsplit=1 allows labels which themselves contain ':'.
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)
    ### Lazily load plugin-provided connector types exactly once (thread-safe).
    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _loaded_plugin_connectors = True
    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        ### Search each configured connector type for this label
        ### to build a "Did you mean 'sql:foo'?" suggestion.
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"

        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    ### Populate the type -> class registry on first use
    ### (deferred so PluginConnector import happens lazily).
    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        with _locks['types']:
            types.update({
                'api'   : APIConnector,
                'sql'   : SQLConnector,
                'plugin': PluginConnector,
            })

    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    ### Unknown attribute: only warn if the constructor
                    ### wouldn't accept it either.
                    import inspect
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector " +
                            f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n" +
                        f" - Keyword value: '{value}'\n" +
                        f" - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                ### New / differing kwargs force a rebuild of the cached connector.
                refresh = True
                warn(warning_message)
        else: ### connector doesn't yet exist
            refresh = True

    ### only create an object if refresh is True
    ### (can be manually specified, otherwise determined above)
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack = False,
                )
                conn = None
            except Exception as e:
                ### Print a rich traceback when available, then degrade to a warning.
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack = False,
                )
                conn = None
        if conn is None:
            return None

    return connectors[type][label]
Return existing connector or create new connection and store for reuse.
You can create new connectors if enough parameters are provided for the given type and flavor.
Parameters
- type (Optional[str], default None): Connector type (sql, api, etc.). Defaults to the type of the configured instance_connector.
- label (Optional[str], default None): Connector label (e.g. main). Defaults to 'main'.
- refresh (bool, default False): Refresh the Connector instance / construct a new object. Defaults to False.
- kw (Any): Other arguments to pass to the Connector constructor. If the Connector has already been constructed and new arguments are provided, refresh is set to True and the old Connector is replaced.
Returns
- A new Meerschaum connector (e.g. meerschaum.connectors.api.APIConnector, meerschaum.connectors.sql.SQLConnector).
Examples
The following parameters would create a new
meerschaum.connectors.sql.SQLConnector
that isn't in the configuration file.
>>> conn = get_connector(
... type = 'sql',
... label = 'newlabel',
... flavor = 'sqlite',
... database = '/file/path/to/database.db'
... )
>>>
def is_connected(keys: str, **kw) -> bool:
    """
    Check if the connector keys correspond to an active connection.
    If the connector has not been created, it will immediately return `False`.
    If the connector exists but cannot communicate with the source, return `False`.

    **NOTE:** Only works with instance connectors (`SQLConnectors` and `APIConnectors`).
    Keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Parameters
    ----------
    keys:
        The keys to the connector (e.g. `'sql:main'`).

    Returns
    -------
    A `bool` corresponding to whether a successful connection may be made.

    """
    import warnings
    if ':' not in keys:
        warn(f"Invalid connector keys '{keys}'")

    try:
        ### maxsplit=1 matches get_connector(), which permits labels
        ### that themselves contain ':' (e.g. 'sql:a:b' -> label 'a:b').
        typ, label = keys.split(':', maxsplit=1)
    except Exception:
        return False
    if typ not in instance_types:
        return False
    if label not in connectors.get(typ, {}):
        ### Connector was never constructed; don't attempt a connection.
        return False

    from meerschaum.connectors.parse import parse_instance_keys
    conn = parse_instance_keys(keys)
    try:
        ### Suppress retry warnings; the boolean result is the only signal.
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            return conn.test_connection(**kw)
    except Exception:
        return False
Check if the connector keys correspond to an active connection.
If the connector has not been created, it will immediately return False.
If the connector exists but cannot communicate with the source, return False.
NOTE: Only works with instance connectors (SQLConnectors and APIConnectors).
Keyword arguments are passed to meerschaum.connectors.poll.retry_connect.
Parameters
- keys (str): The keys to the connector (e.g. 'sql:main').
Returns
- A bool corresponding to whether a successful connection may be made.