meerschaum.connectors

Create connectors with `get_connector()`.

For ease of use, you can also import from the root `meerschaum` module:

>>> from meerschaum import get_connector
>>> conn = get_connector()
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Create connectors with `meerschaum.connectors.get_connector()`.
For ease of use, you can also import from the root `meerschaum` module:
```
>>> from meerschaum import get_connector
>>> conn = get_connector()
```
"""

from __future__ import annotations

import meerschaum as mrsm
from meerschaum.utils.typing import Any, SuccessTuple, Union, Optional, List, Dict
from meerschaum.utils.threading import Lock, RLock
from meerschaum.utils.warnings import error, warn

from meerschaum.connectors.Connector import Connector, InvalidAttributesError
from meerschaum.connectors.sql.SQLConnector import SQLConnector
from meerschaum.connectors.api.APIConnector import APIConnector
from meerschaum.connectors.sql._create_engine import flavor_configs as sql_flavor_configs

__all__ = (
    "make_connector",
    "Connector",
    "SQLConnector",
    "APIConnector",
    "get_connector",
    "is_connected",
    "poll",
)

### Store constructed connectors for reuse,
### partitioned by type, then by label.
connectors: Dict[str, Dict[str, Connector]] = {
    'api'   : {},
    'sql'   : {},
    'plugin': {},
}
### Connector types which may serve as Meerschaum instances.
instance_types: List[str] = ['sql', 'api']
### One reentrant lock per module-level registry to keep mutation thread-safe.
_locks: Dict[str, RLock] = {
    'connectors'               : RLock(),
    'types'                    : RLock(),
    'custom_types'             : RLock(),
    '_loaded_plugin_connectors': RLock(),
    'instance_types'           : RLock(),
}
### Per-type attribute requirements and defaults.
attributes: Dict[str, Dict[str, Any]] = {
    'api': {
        'required': [
            'host',
            'username',
            'password'
        ],
        'default': {
            'protocol': 'http',
        },
    },
    'sql': {
        'flavors': sql_flavor_configs,
    },
}
### Fill this with objects only when connectors are first referenced.
types: Dict[str, Any] = {}
### Types registered via `make_connector` (i.e. defined by plugins).
custom_types: set = set()
### Guard so plugin connector modules are only imported once.
_loaded_plugin_connectors: bool = False


def get_connector(
        type: Optional[str] = None,
        label: Optional[str] = None,
        refresh: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Connector:
    """
    Return existing connector or create new connection and store for reuse.

    You can create new connectors if enough parameters are provided for the given type and flavor.

    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.
        May also contain the label (e.g. `'sql:main'`).

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct new object. Defaults to `False`.

    debug: bool, default False
        Verbosity toggle.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`), or `None` on failure.

    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```

    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    ### Allow combined keys like 'sql:main' in the `type` argument.
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)
    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _loaded_plugin_connectors = True
    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"

        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    ### Lazily populate the built-in types on first use
    ### (PluginConnector is imported late to avoid circular imports).
    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        with _locks['types']:
            types.update({
                'api'   : APIConnector,
                'sql'   : SQLConnector,
                'plugin': PluginConnector,
            })

    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    import inspect
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    ### Only warn for attributes the constructor wouldn't accept.
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector " +
                            f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n" +
                        f"  - Keyword value: '{value}'\n" +
                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else: ### connector doesn't yet exist
            refresh = True

    ### only create an object if refresh is True
    ### (can be manually specified, otherwise determined above)
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack = False,
                )
                conn = None
            except Exception as e:
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack = False,
                )
                conn = None
        if conn is None:
            return None

    return connectors[type][label]


def is_connected(keys: str, **kw) -> bool:
    """
    Check if the connector keys correspond to an active connection.
    If the connector has not been created, it will immediately return `False`.
    If the connector exists but cannot communicate with the source, return `False`.

    **NOTE:** Only works with instance connectors (`SQLConnectors` and `APIConnectors`).
    Keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Parameters
    ----------
    keys:
        The keys to the connector (e.g. `'sql:main'`).

    Returns
    -------
    A `bool` corresponding to whether a successful connection may be made.

    """
    import warnings
    if ':' not in keys:
        warn(f"Invalid connector keys '{keys}'")

    try:
        typ, label = keys.split(':')
    except Exception as e:
        return False
    if typ not in instance_types:
        return False
    ### Never construct a new connector here; only test existing ones.
    if not (label in connectors.get(typ, {})):
        return False

    from meerschaum.connectors.parse import parse_instance_keys
    conn = parse_instance_keys(keys)
    try:
        ### Suppress connection warnings; only the boolean result matters.
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            return conn.test_connection(**kw)
    except Exception as e:
        return False


def make_connector(
        cls,
    ):
    """
    Register a class as a `Connector`.
    The `type` will be the lower case of the class name, without the suffix `connector`.

    Parameters
    ----------
    cls:
        The connector class to register.
        If the class defines `IS_INSTANCE = True`, its type is also registered
        as an instance connector type (this requires implementing the various
        pipes functions and lots of testing).

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.connectors import make_connector, Connector
    >>>
    >>> @make_connector
    ... class FooConnector(Connector):
    ...     REQUIRED_ATTRIBUTES: list[str] = ['username', 'password']
    ...
    >>> conn = mrsm.get_connector('foo:bar', username='dog', password='cat')
    >>> print(conn.username, conn.password)
    dog cat
    >>>
    """
    import re
    ### e.g. `FooConnector` -> 'foo'.
    typ = re.sub(r'connector$', '', cls.__name__.lower())
    with _locks['types']:
        types[typ] = cls
    with _locks['custom_types']:
        custom_types.add(typ)
    with _locks['connectors']:
        if typ not in connectors:
            connectors[typ] = {}
    if getattr(cls, 'IS_INSTANCE', False):
        with _locks['instance_types']:
            if typ not in instance_types:
                instance_types.append(typ)

    return cls


def load_plugin_connectors():
    """
    If a plugin makes use of the `make_connector` decorator,
    load its module.
    """
    from meerschaum.plugins import get_plugins, import_plugins
    to_import = []
    for plugin in get_plugins():
        if plugin is None:
            continue
        ### Cheap text scan to avoid importing plugins which
        ### clearly don't define connectors.
        with open(plugin.__file__, encoding='utf-8') as f:
            text = f.read()
        if 'make_connector' in text or 'Connector' in text:
            to_import.append(plugin.name)
    if not to_import:
        return
    import_plugins(*to_import)


def get_connector_plugin(
        connector: Connector,
    ) -> Union[str, None, mrsm.Plugin]:
    """
    Determine the plugin for a connector.
    This is useful for handling virtual environments for custom instance connectors.

    Parameters
    ----------
    connector: Connector
        The connector which may require a virtual environment.

    Returns
    -------
    A Plugin, 'mrsm', or None.
    """
    if not hasattr(connector, 'type'):
        return None
    ### Custom types live in `plugins.<name>...` modules;
    ### 'plugin' connectors are labeled by plugin name; everything else is built-in.
    plugin_name = (
        connector.__module__.replace('plugins.', '').split('.')[0]
        if connector.type in custom_types else (
            connector.label
            if connector.type == 'plugin'
            else 'mrsm'
        )
    )
    plugin = mrsm.Plugin(plugin_name)
    return plugin if plugin.is_installed() else None
def make_connector(
        cls,
    ):
    """
    Register a class as a custom `Connector` type.

    The connector ``type`` is derived from the class name: lower-cased,
    with a trailing ``connector`` suffix stripped. Classes defining
    ``IS_INSTANCE = True`` are additionally registered as instance
    connector types.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.connectors import make_connector, Connector
    >>>
    >>> @make_connector
    ... class FooConnector(Connector):
    ...     REQUIRED_ATTRIBUTES: list[str] = ['username', 'password']
    ...
    >>> conn = mrsm.get_connector('foo:bar', username='dog', password='cat')
    >>> print(conn.username, conn.password)
    dog cat
    >>>
    """
    import re
    typ = re.sub(r'connector$', '', cls.__name__.lower())

    with _locks['types']:
        types[typ] = cls

    with _locks['custom_types']:
        custom_types.add(typ)

    with _locks['connectors']:
        connectors.setdefault(typ, {})

    if getattr(cls, 'IS_INSTANCE', False):
        with _locks['instance_types']:
            if typ not in instance_types:
                instance_types.append(typ)

    return cls
Register a class as a `Connector`.

The `type` will be the lower case of the class name, without the suffix `connector`.

Parameters

- instance (bool, default False):
  If `True`, make this connector type an instance connector.
  This requires implementing the various pipes functions and lots of testing.
Examples
>>> import meerschaum as mrsm
>>> from meerschaum.connectors import make_connector, Connector
>>>
>>> @make_connector
... class FooConnector(Connector):
... REQUIRED_ATTRIBUTES: list[str] = ['username', 'password']
...
>>> conn = mrsm.get_connector('foo:bar', username='dog', password='cat')
>>> print(conn.username, conn.password)
dog cat
>>>
class Connector(metaclass=abc.ABCMeta):
    """
    The base connector class to hold connection attributes.
    """
    def __init__(
        self,
        type: Optional[str] = None,
        label: Optional[str] = None,
        **kw: Any
    ):
        """
        Set the given keyword arguments as attributes.

        Parameters
        ----------
        type: str
            The `type` of the connector (e.g. `sql`, `api`, `plugin`).

        label: str
            The `label` for the connector.


        Examples
        --------
        Run `mrsm edit config` to edit connectors in the YAML file:

        ```yaml
        meerschaum:
            connectors:
                {type}:
                    {label}:
                        ### attributes go here
        ```

        """
        ### Snapshot __dict__ so `_reset_attributes()` can restore it.
        self._original_dict = copy.deepcopy(self.__dict__)
        self._set_attributes(type=type, label=label, **kw)
        self.verify_attributes(getattr(self, 'REQUIRED_ATTRIBUTES', None))

    def _reset_attributes(self):
        ### Restore the attributes captured before `_set_attributes()` ran.
        self.__dict__ = self._original_dict

    def _set_attributes(
        self,
        *args,
        inherit_default: bool = True,
        **kw: Any
    ):
        ### Merge configuration and keyword arguments into this instance's
        ### attributes (later sources override earlier ones):
        ### 1. the type's 'default' config (if `inherit_default`),
        ### 2. the user's config for this type:label,
        ### 3. the keyword arguments.
        from meerschaum.config.static import STATIC_CONFIG
        from meerschaum.utils.warnings import error

        self._attributes = {}

        default_label = STATIC_CONFIG['connectors']['default_label']

        ### NOTE: Support the legacy method of explicitly passing the type.
        label = kw.get('label', None)
        if label is None:
            if len(args) == 2:
                label = args[1]
            elif len(args) == 0:
                label = None
            else:
                label = args[0]

        ### 'default' is reserved for the inheritable config section.
        if label == 'default':
            error(
                f"Label cannot be 'default'. Did you mean '{default_label}'?",
                InvalidAttributesError,
            )
        self.__dict__['label'] = label

        from meerschaum.config import get_config
        conn_configs = copy.deepcopy(get_config('meerschaum', 'connectors'))
        connector_config = copy.deepcopy(get_config('system', 'connectors'))

        ### inherit attributes from 'default' if exists
        if inherit_default:
            inherit_from = 'default'
            if self.type in conn_configs and inherit_from in conn_configs[self.type]:
                _inherit_dict = copy.deepcopy(conn_configs[self.type][inherit_from])
                self._attributes.update(_inherit_dict)

        ### load user config into self._attributes
        if self.type in conn_configs and self.label in conn_configs[self.type]:
            self._attributes.update(conn_configs[self.type][self.label])

        ### load system config into self._sys_config
        ### (deep copy so future Connectors don't inherit changes)
        if self.type in connector_config:
            self._sys_config = copy.deepcopy(connector_config[self.type])

        ### add additional arguments or override configuration
        self._attributes.update(kw)

        ### finally, update __dict__ with _attributes.
        self.__dict__.update(self._attributes)


    def verify_attributes(
        self,
        required_attributes: Optional[List[str]] = None,
        debug: bool = False
    ) -> None:
        """
        Ensure that the required attributes have been met.

        The Connector base class checks the minimum requirements.
        Child classes may enforce additional requirements.

        Parameters
        ----------
        required_attributes: Optional[List[str]], default None
            Attributes to be verified. If `None`, default to `['label']`.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        Don't return anything.

        Raises
        ------
        An error if any of the required attributes are missing.
        """
        from meerschaum.utils.warnings import error, warn
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.misc import items_str
        if required_attributes is None:
            required_attributes = ['label']
        missing_attributes = set()
        for a in required_attributes:
            if a not in self.__dict__:
                missing_attributes.add(a)
        if len(missing_attributes) > 0:
            error(
                (
                    f"Missing {items_str(list(missing_attributes))} "
                    + f"for connector '{self.type}:{self.label}'."
                ),
                InvalidAttributesError,
                silent = True,
                stack = False
            )


    def __str__(self):
        """
        When cast to a string, return type:label.
        """
        return f"{self.type}:{self.label}"

    def __repr__(self):
        """
        Represent the connector as type:label.
        """
        return str(self)

    @property
    def meta(self) -> Dict[str, Any]:
        """
        Return the keys needed to reconstruct this Connector.
        """
        ### Public (non-underscore) attributes only; private state is omitted.
        _meta = {
            key: value
            for key, value in self.__dict__.items()
            if not str(key).startswith('_')
        }
        ### `type` and `label` are lazily-cached properties, so include
        ### them explicitly even if not yet present in __dict__.
        _meta.update({
            'type': self.type,
            'label': self.label,
        })
        return _meta


    @property
    def type(self) -> str:
        """
        Return the type for this connector.
        """
        _type = self.__dict__.get('type', None)
        if _type is None:
            import re
            ### Derive the type from the class name, e.g. `FooConnector` -> 'foo'.
            _type = re.sub(r'connector$', '', self.__class__.__name__.lower())
            self.__dict__['type'] = _type
        return _type


    @property
    def label(self) -> str:
        """
        Return the label for this connector.
        """
        _label = self.__dict__.get('label', None)
        if _label is None:
            from meerschaum.config.static import STATIC_CONFIG
            ### Fall back to the configured default label (e.g. 'main').
            _label = STATIC_CONFIG['connectors']['default_label']
            self.__dict__['label'] = _label
        return _label
The base connector class to hold connection attributes.
    def __init__(
        self,
        type: Optional[str] = None,
        label: Optional[str] = None,
        **kw: Any
    ):
        """
        Set the given keyword arguments as attributes.

        Parameters
        ----------
        type: str
            The `type` of the connector (e.g. `sql`, `api`, `plugin`).

        label: str
            The `label` for the connector.


        Examples
        --------
        Run `mrsm edit config` to edit connectors in the YAML file:

        ```yaml
        meerschaum:
            connectors:
                {type}:
                    {label}:
                        ### attributes go here
        ```

        """
        ### Snapshot __dict__ so `_reset_attributes()` can restore it.
        self._original_dict = copy.deepcopy(self.__dict__)
        self._set_attributes(type=type, label=label, **kw)
        ### Subclasses may declare REQUIRED_ATTRIBUTES to enforce their contract.
        self.verify_attributes(getattr(self, 'REQUIRED_ATTRIBUTES', None))
119 def verify_attributes( 120 self, 121 required_attributes: Optional[List[str]] = None, 122 debug: bool = False 123 ) -> None: 124 """ 125 Ensure that the required attributes have been met. 126 127 The Connector base class checks the minimum requirements. 128 Child classes may enforce additional requirements. 129 130 Parameters 131 ---------- 132 required_attributes: Optional[List[str]], default None 133 Attributes to be verified. If `None`, default to `['label']`. 134 135 debug: bool, default False 136 Verbosity toggle. 137 138 Returns 139 ------- 140 Don't return anything. 141 142 Raises 143 ------ 144 An error if any of the required attributes are missing. 145 """ 146 from meerschaum.utils.warnings import error, warn 147 from meerschaum.utils.debug import dprint 148 from meerschaum.utils.misc import items_str 149 if required_attributes is None: 150 required_attributes = ['label'] 151 missing_attributes = set() 152 for a in required_attributes: 153 if a not in self.__dict__: 154 missing_attributes.add(a) 155 if len(missing_attributes) > 0: 156 error( 157 ( 158 f"Missing {items_str(list(missing_attributes))} " 159 + f"for connector '{self.type}:{self.label}'." 160 ), 161 InvalidAttributesError, 162 silent = True, 163 stack = False 164 )
Ensure that the required attributes have been met.
The Connector base class checks the minimum requirements. Child classes may enforce additional requirements.
Parameters
- required_attributes (Optional[List[str]], default None):
  Attributes to be verified. If `None`, default to `['label']`.

- debug (bool, default False):
  Verbosity toggle.
Returns
- Don't return anything.
Raises
- An error if any of the required attributes are missing.
    @property
    def meta(self) -> Dict[str, Any]:
        """
        Return the keys needed to reconstruct this Connector.
        """
        ### Public (non-underscore) attributes only; private state is omitted.
        _meta = {
            key: value
            for key, value in self.__dict__.items()
            if not str(key).startswith('_')
        }
        ### `type` and `label` are lazily-cached properties, so include
        ### them explicitly even if not yet present in __dict__.
        _meta.update({
            'type': self.type,
            'label': self.label,
        })
        return _meta
Return the keys needed to reconstruct this Connector.
    @property
    def type(self) -> str:
        """
        Return the type for this connector.
        """
        _type = self.__dict__.get('type', None)
        if _type is None:
            import re
            ### Derive the type from the class name, e.g. `FooConnector` -> 'foo',
            ### and cache the result on the instance.
            _type = re.sub(r'connector$', '', self.__class__.__name__.lower())
            self.__dict__['type'] = _type
        return _type
Return the type for this connector.
    @property
    def label(self) -> str:
        """
        Return the label for this connector.
        """
        _label = self.__dict__.get('label', None)
        if _label is None:
            from meerschaum.config.static import STATIC_CONFIG
            ### Fall back to the configured default label (e.g. 'main')
            ### and cache it on the instance.
            _label = STATIC_CONFIG['connectors']['default_label']
            self.__dict__['label'] = _label
        return _label
Return the label for this connector.
class SQLConnector(Connector):
    """
    Connect to SQL databases via `sqlalchemy`.

    SQLConnectors may be used as Meerschaum instance connectors.
    Read more about connectors and instances at
    https://meerschaum.io/reference/connectors/

    """

    IS_INSTANCE: bool = True

    ### Method implementations live in sibling modules and are bound here.
    from ._create_engine import flavor_configs, create_engine
    from ._sql import read, value, exec, execute, to_sql, exec_queries
    from meerschaum.utils.sql import test_connection
    from ._fetch import fetch, get_pipe_metadef
    from ._cli import cli, _cli_exit
    from ._pipes import (
        fetch_pipes_keys,
        create_indices,
        drop_indices,
        get_create_index_queries,
        get_drop_index_queries,
        get_add_columns_queries,
        get_alter_columns_queries,
        delete_pipe,
        get_pipe_data,
        get_pipe_data_query,
        register_pipe,
        edit_pipe,
        get_pipe_id,
        get_pipe_attributes,
        sync_pipe,
        sync_pipe_inplace,
        get_sync_time,
        pipe_exists,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        deduplicate_pipe,
        get_pipe_table,
        get_pipe_columns_types,
        get_to_sql_dtype,
        get_pipe_schema,
    )
    from ._plugins import (
        register_plugin,
        delete_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugins,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
    )
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri, parse_uri
    from ._instance import (
        _log_temporary_tables_creation,
        _drop_temporary_table,
        _drop_temporary_tables,
        _drop_old_temporary_tables,
    )

    def __init__(
        self,
        label: Optional[str] = None,
        flavor: Optional[str] = None,
        wait: bool = False,
        connect: bool = False,
        debug: bool = False,
        **kw: Any
    ):
        """
        Parameters
        ----------
        label: str, default 'main'
            The identifying label for the connector.
            E.g. for `sql:main`, 'main' is the label.
            Defaults to 'main'.

        flavor: Optional[str], default None
            The database flavor, e.g.
            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
            To see supported flavors, run the `bootstrap connectors` command.

        wait: bool, default False
            If `True`, block until a database connection has been made.
            Defaults to `False`.

        connect: bool, default False
            If `True`, immediately attempt to connect the database and raise
            a warning if the connection fails.
            Defaults to `False`.

        debug: bool, default False
            Verbosity toggle.
            Defaults to `False`.

        kw: Any
            All other arguments will be passed to the connector's attributes.
            Therefore, a connector may be made without being registered,
            as long enough parameters are supplied to the constructor.
        """
        if 'uri' in kw:
            uri = kw['uri']
            ### Normalize legacy / shorthand URI schemes before parsing.
            if uri.startswith('postgres') and not uri.startswith('postgresql'):
                uri = uri.replace('postgres', 'postgresql', 1)
            if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
                uri = uri.replace('postgresql://', 'postgresql+psycopg', 1)
            if uri.startswith('timescaledb://'):
                uri = uri.replace('timescaledb://', 'postgresql://', 1)
                flavor = 'timescaledb'
            kw['uri'] = uri
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            _ = from_uri_params.pop('label', None)

            ### Sometimes the flavor may be provided with a URI.
            kw.update(from_uri_params)
            if flavor:
                kw['flavor'] = flavor


        ### set __dict__ in base class
        super().__init__(
            'sql',
            label = label or self.__dict__.get('label', None),
            **kw
        )

        if self.__dict__.get('flavor', None) == 'sqlite':
            ### SQLite connectors skip the 'default' config inheritance,
            ### so rebuild the attributes without it.
            self._reset_attributes()
            self._set_attributes(
                'sql',
                label = label,
                inherit_default = False,
                **kw
            )
            ### For backwards compatability reasons, set the path for sql:local if its missing.
            if self.label == 'local' and not self.__dict__.get('database', None):
                from meerschaum.config._paths import SQLITE_DB_PATH
                self.database = str(SQLITE_DB_PATH)

        ### ensure flavor and label are set accordingly
        if 'flavor' not in self.__dict__:
            if flavor is None and 'uri' not in self.__dict__:
                raise Exception(
                    f" Missing flavor. Provide flavor as a key for '{self}'."
                )
            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

        if self.flavor == 'postgres':
            self.flavor = 'postgresql'

        self._debug = debug
        ### Store the PID and thread at initialization
        ### so we can dispose of the Pool in child processes or threads.
        import os, threading
        self._pid = os.getpid()
        self._thread_ident = threading.current_thread().ident
        self._sessions = {}
        self._locks = {'_sessions': threading.RLock(), }

        ### verify the flavor's requirements are met
        if self.flavor not in self.flavor_configs:
            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
        if not self.__dict__.get('uri'):
            self.verify_attributes(
                self.flavor_configs[self.flavor].get('requirements', set()),
                debug=debug,
            )

        if wait:
            from meerschaum.connectors.poll import retry_connect
            retry_connect(connector=self, debug=debug)

        if connect:
            if not self.test_connection(debug=debug):
                from meerschaum.utils.warnings import warn
                warn(f"Failed to connect with connector '{self}'!", stack=False)

    @property
    def Session(self):
        ### Lazily build a thread-local (scoped) session factory.
        if '_Session' not in self.__dict__:
            if self.engine is None:
                return None

            from meerschaum.utils.packages import attempt_import
            sqlalchemy_orm = attempt_import('sqlalchemy.orm')
            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
            self._Session = sqlalchemy_orm.scoped_session(session_factory)

        return self._Session

    @property
    def engine(self):
        import os, threading
        ### build the sqlalchemy engine
        if '_engine' not in self.__dict__:
            self._engine, self._engine_str = self.create_engine(include_uri=True)

        same_process = os.getpid() == self._pid
        same_thread = threading.current_thread().ident == self._thread_ident

        ### handle child processes: connection pools must not be shared
        ### across a fork, so dispose and let the engine reconnect.
        if not same_process:
            self._pid = os.getpid()
            self._thread = threading.current_thread()
            from meerschaum.utils.warnings import warn
            warn(f"Different PID detected. Disposing of connections...")
            self._engine.dispose()

        ### handle different threads
        ### (intentionally a no-op for now; scoped sessions handle threading)
        if not same_thread:
            pass

        return self._engine

    @property
    def DATABASE_URL(self) -> str:
        """
        Return the URI connection string (alias for `SQLConnector.URI`).
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def URI(self) -> str:
        """
        Return the URI connection string.
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def IS_THREAD_SAFE(self) -> bool:
        """
        Return whether this connector may be multithreaded.
        """
        if self.flavor == 'duckdb':
            return False
        if self.flavor == 'sqlite':
            ### In-memory SQLite databases are per-connection; file-backed are OK.
            return ':memory:' not in self.URI
        return True


    @property
    def metadata(self):
        """
        Return the metadata bound to this configured schema.
        """
        from meerschaum.utils.packages import attempt_import
        sqlalchemy = attempt_import('sqlalchemy')
        if '_metadata' not in self.__dict__:
            self._metadata = sqlalchemy.MetaData(schema=self.schema)
        return self._metadata


    @property
    def instance_schema(self):
        """
        Return the schema name for Meerschaum tables.
        """
        return self.schema


    @property
    def internal_schema(self):
        """
        Return the schema name for internal tables.
        """
        from meerschaum.config.static import STATIC_CONFIG
        from meerschaum.utils.packages import attempt_import
        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
        schema_name = self.__dict__.get('internal_schema', None) or (
            STATIC_CONFIG['sql']['internal_schema']
            if self.flavor not in NO_SCHEMA_FLAVORS
            else self.schema
        )

        if '_internal_schema' not in self.__dict__:
            self._internal_schema = schema_name
        return self._internal_schema


    @property
    def db(self) -> Optional['databases.Database']:
        ### NOTE(review): the return annotation references `databases`, which is
        ### only imported inside this property — quoted to avoid evaluation issues.
        from meerschaum.utils.packages import attempt_import
        databases = attempt_import('databases', lazy=False, install=True)
        url = self.DATABASE_URL
        if 'mysql' in url:
            url = url.replace('+pymysql', '')
        if '_db' not in self.__dict__:
            try:
                self._db = databases.Database(url)
            except KeyError:
                ### Likely encountered an unsupported flavor.
                from meerschaum.utils.warnings import warn
                self._db = None
        return self._db


    @property
    def db_version(self) -> Union[str, None]:
        """
        Return the database version.
        """
        _db_version = self.__dict__.get('_db_version', None)
        if _db_version is not None:
            return _db_version

        from meerschaum.utils.sql import get_db_version
        self._db_version = get_db_version(self)
        return self._db_version


    @property
    def schema(self) -> Union[str, None]:
        """
        Return the default schema to use.
        A value of `None` will not prepend a schema.
        """
        if 'schema' in self.__dict__:
            return self.__dict__['schema']

        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
        if self.flavor in NO_SCHEMA_FLAVORS:
            self.__dict__['schema'] = None
            return None

        ### Ask the database for its default schema and cache the answer.
        sqlalchemy = mrsm.attempt_import('sqlalchemy')
        _schema = sqlalchemy.inspect(self.engine).default_schema_name
        self.__dict__['schema'] = _schema
        return _schema


    def __getstate__(self):
        ### Pickle support: the engine and sessions are rebuilt lazily.
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __call__(self):
        return self
Connect to SQL databases via `sqlalchemy`.

SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/
def __init__(
    self,
    label: Optional[str] = None,
    flavor: Optional[str] = None,
    wait: bool = False,
    connect: bool = False,
    debug: bool = False,
    **kw: Any
):
    """
    Build a `sql` connector, optionally from a URI.

    Parameters
    ----------
    label: Optional[str], default None
        The identifying label for the connector.
        E.g. for `sql:main`, 'main' is the label. Defaults to 'main'.

    flavor: Optional[str], default None
        The database flavor, e.g. `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
        To see supported flavors, run the `bootstrap connectors` command.

    wait: bool, default False
        If `True`, block until a database connection has been made.

    connect: bool, default False
        If `True`, immediately attempt to connect to the database
        and raise a warning if the connection fails.

    debug: bool, default False
        Verbosity toggle.

    kw: Any
        All other arguments will be passed to the connector's attributes.
        Therefore, a connector may be made without being registered,
        as long enough parameters are supplied to the constructor.
    """
    ### Normalize legacy / shorthand URI schemes before parsing.
    if 'uri' in kw:
        uri = kw['uri']
        ### `postgres://` -> `postgresql://` (sqlalchemy 1.4+ dropped the alias).
        if uri.startswith('postgres') and not uri.startswith('postgresql'):
            uri = uri.replace('postgres', 'postgresql', 1)
        ### Pin the psycopg driver when none is specified.
        if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
            uri = uri.replace('postgresql://', 'postgresql+psycopg', 1)
        ### TimescaleDB is PostgreSQL under the hood; remember the real flavor.
        if uri.startswith('timescaledb://'):
            uri = uri.replace('timescaledb://', 'postgresql://', 1)
            flavor = 'timescaledb'
        kw['uri'] = uri
        from_uri_params = self.from_uri(kw['uri'], as_dict=True)
        label = label or from_uri_params.get('label', None)
        _ = from_uri_params.pop('label', None)

        ### Sometimes the flavor may be provided with a URI.
        kw.update(from_uri_params)
        if flavor:
            kw['flavor'] = flavor

    ### set __dict__ in base class
    super().__init__(
        'sql',
        label = label or self.__dict__.get('label', None),
        **kw
    )

    ### SQLite does not inherit defaults (e.g. username/port make no sense).
    if self.__dict__.get('flavor', None) == 'sqlite':
        self._reset_attributes()
        self._set_attributes(
            'sql',
            label = label,
            inherit_default = False,
            **kw
        )
        ### For backwards compatability reasons, set the path for sql:local if its missing.
        if self.label == 'local' and not self.__dict__.get('database', None):
            from meerschaum.config._paths import SQLITE_DB_PATH
            self.database = str(SQLITE_DB_PATH)

    ### ensure flavor and label are set accordingly
    if 'flavor' not in self.__dict__:
        if flavor is None and 'uri' not in self.__dict__:
            raise Exception(
                f" Missing flavor. Provide flavor as a key for '{self}'."
            )
        self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

    if self.flavor == 'postgres':
        self.flavor = 'postgresql'

    self._debug = debug
    ### Store the PID and thread at initialization
    ### so we can dispose of the Pool in child processes or threads.
    import os, threading
    self._pid = os.getpid()
    self._thread_ident = threading.current_thread().ident
    self._sessions = {}
    self._locks = {'_sessions': threading.RLock(), }

    ### verify the flavor's requirements are met
    if self.flavor not in self.flavor_configs:
        error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
    if not self.__dict__.get('uri'):
        self.verify_attributes(
            self.flavor_configs[self.flavor].get('requirements', set()),
            debug=debug,
        )

    ### Optionally block (wait) or eagerly test (connect) the connection.
    if wait:
        from meerschaum.connectors.poll import retry_connect
        retry_connect(connector=self, debug=debug)

    if connect:
        if not self.test_connection(debug=debug):
            from meerschaum.utils.warnings import warn
            warn(f"Failed to connect with connector '{self}'!", stack=False)
Parameters
- label (str, default 'main'):
  The identifying label for the connector.
  E.g. for `sql:main`, 'main' is the label. Defaults to 'main'.
- flavor (Optional[str], default None):
  The database flavor, e.g. `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
  To see supported flavors, run the `bootstrap connectors` command.
- wait (bool, default False):
  If `True`, block until a database connection has been made. Defaults to `False`.
- connect (bool, default False):
  If `True`, immediately attempt to connect to the database and raise a warning if the connection fails. Defaults to `False`.
- debug (bool, default False):
  Verbosity toggle. Defaults to `False`.
- kw (Any):
  All other arguments will be passed to the connector's attributes.
  Therefore, a connector may be made without being registered, as long as enough parameters are supplied to the constructor.
208 @property 209 def Session(self): 210 if '_Session' not in self.__dict__: 211 if self.engine is None: 212 return None 213 214 from meerschaum.utils.packages import attempt_import 215 sqlalchemy_orm = attempt_import('sqlalchemy.orm') 216 session_factory = sqlalchemy_orm.sessionmaker(self.engine) 217 self._Session = sqlalchemy_orm.scoped_session(session_factory) 218 219 return self._Session
221 @property 222 def engine(self): 223 import os, threading 224 ### build the sqlalchemy engine 225 if '_engine' not in self.__dict__: 226 self._engine, self._engine_str = self.create_engine(include_uri=True) 227 228 same_process = os.getpid() == self._pid 229 same_thread = threading.current_thread().ident == self._thread_ident 230 231 ### handle child processes 232 if not same_process: 233 self._pid = os.getpid() 234 self._thread = threading.current_thread() 235 from meerschaum.utils.warnings import warn 236 warn(f"Different PID detected. Disposing of connections...") 237 self._engine.dispose() 238 239 ### handle different threads 240 if not same_thread: 241 pass 242 243 return self._engine
245 @property 246 def DATABASE_URL(self) -> str: 247 """ 248 Return the URI connection string (alias for `SQLConnector.URI`. 249 """ 250 _ = self.engine 251 return str(self._engine_str)
Return the URI connection string (alias for `SQLConnector.URI`).
253 @property 254 def URI(self) -> str: 255 """ 256 Return the URI connection string. 257 """ 258 _ = self.engine 259 return str(self._engine_str)
Return the URI connection string.
261 @property 262 def IS_THREAD_SAFE(self) -> str: 263 """ 264 Return whether this connector may be multithreaded. 265 """ 266 if self.flavor == 'duckdb': 267 return False 268 if self.flavor == 'sqlite': 269 return ':memory:' not in self.URI 270 return True
Return whether this connector may be multithreaded.
273 @property 274 def metadata(self): 275 """ 276 Return the metadata bound to this configured schema. 277 """ 278 from meerschaum.utils.packages import attempt_import 279 sqlalchemy = attempt_import('sqlalchemy') 280 if '_metadata' not in self.__dict__: 281 self._metadata = sqlalchemy.MetaData(schema=self.schema) 282 return self._metadata
Return the metadata bound to this configured schema.
285 @property 286 def instance_schema(self): 287 """ 288 Return the schema name for Meerschaum tables. 289 """ 290 return self.schema
Return the schema name for Meerschaum tables.
293 @property 294 def internal_schema(self): 295 """ 296 Return the schema name for internal tables. 297 """ 298 from meerschaum.config.static import STATIC_CONFIG 299 from meerschaum.utils.packages import attempt_import 300 from meerschaum.utils.sql import NO_SCHEMA_FLAVORS 301 schema_name = self.__dict__.get('internal_schema', None) or ( 302 STATIC_CONFIG['sql']['internal_schema'] 303 if self.flavor not in NO_SCHEMA_FLAVORS 304 else self.schema 305 ) 306 307 if '_internal_schema' not in self.__dict__: 308 self._internal_schema = schema_name 309 return self._internal_schema
Return the schema name for internal tables.
312 @property 313 def db(self) -> Optional[databases.Database]: 314 from meerschaum.utils.packages import attempt_import 315 databases = attempt_import('databases', lazy=False, install=True) 316 url = self.DATABASE_URL 317 if 'mysql' in url: 318 url = url.replace('+pymysql', '') 319 if '_db' not in self.__dict__: 320 try: 321 self._db = databases.Database(url) 322 except KeyError: 323 ### Likely encountered an unsupported flavor. 324 from meerschaum.utils.warnings import warn 325 self._db = None 326 return self._db
329 @property 330 def db_version(self) -> Union[str, None]: 331 """ 332 Return the database version. 333 """ 334 _db_version = self.__dict__.get('_db_version', None) 335 if _db_version is not None: 336 return _db_version 337 338 from meerschaum.utils.sql import get_db_version 339 self._db_version = get_db_version(self) 340 return self._db_version
Return the database version.
343 @property 344 def schema(self) -> Union[str, None]: 345 """ 346 Return the default schema to use. 347 A value of `None` will not prepend a schema. 348 """ 349 if 'schema' in self.__dict__: 350 return self.__dict__['schema'] 351 352 from meerschaum.utils.sql import NO_SCHEMA_FLAVORS 353 if self.flavor in NO_SCHEMA_FLAVORS: 354 self.__dict__['schema'] = None 355 return None 356 357 sqlalchemy = mrsm.attempt_import('sqlalchemy') 358 _schema = sqlalchemy.inspect(self.engine).default_schema_name 359 self.__dict__['schema'] = _schema 360 return _schema
Return the default schema to use.
A value of `None` will not prepend a schema.
def create_engine(
    self,
    include_uri: bool = False,
    debug: bool = False,
    **kw
) -> 'sqlalchemy.engine.Engine':
    """
    Create a sqlalchemy engine by building the engine string.

    Parameters
    ----------
    include_uri: bool, default False
        If `True`, return a tuple of the engine and the engine string.

    debug: bool, default False
        Verbosity toggle (also enables sqlalchemy echo).

    kw: Any
        Additional keyword arguments merged into `sqlalchemy.create_engine()`
        (subject to the flavor's `omit_create_engine` configuration).

    Returns
    -------
    A `sqlalchemy.engine.Engine` (or `None` on failure), optionally paired
    with the engine string when `include_uri` is `True`.
    """
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import error, warn
    sqlalchemy = attempt_import('sqlalchemy')
    import urllib
    import copy
    import traceback  ### BUGFIX: `traceback` is used below; import it locally.

    ### Install and patch required drivers.
    if self.flavor in install_flavor_drivers:
        attempt_import(*install_flavor_drivers[self.flavor], debug=debug, lazy=False, warn=False)
    if self.flavor in require_patching_flavors:
        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
        import pathlib
        for install_name, import_name in require_patching_flavors[self.flavor]:
            pkg = attempt_import(
                import_name,
                debug = debug,
                lazy = False,
                warn = False
            )
            _monkey_patch_get_distribution(
                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
            )

    ### BUGFIX: validate the flavor BEFORE indexing into `flavor_configs`;
    ### previously an unknown flavor raised a bare KeyError on the defaults
    ### lookup before this friendly error could fire.
    if self.flavor not in flavor_configs:
        error(f"Cannot create a connector with the flavor '{self.flavor}'.")

    ### supplement missing values with defaults (e.g. port number)
    for a, value in flavor_configs[self.flavor]['defaults'].items():
        if a not in self.__dict__:
            self.__dict__[a] = value

    _engine = flavor_configs[self.flavor].get('engine', None)
    _username = self.__dict__.get('username', None)
    _password = self.__dict__.get('password', None)
    _host = self.__dict__.get('host', None)
    _port = self.__dict__.get('port', None)
    _database = self.__dict__.get('database', None)
    _options = self.__dict__.get('options', {})
    if isinstance(_options, str):
        _options = dict(urllib.parse.parse_qsl(_options))
    _uri = self.__dict__.get('uri', None)

    ### Handle registering specific dialects (due to installing in virtual environments).
    if self.flavor in flavor_dialects:
        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])

    ### self._sys_config was deepcopied and can be updated safely
    if self.flavor in ("sqlite", "duckdb"):
        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
        if 'create_engine' not in self._sys_config:
            self._sys_config['create_engine'] = {}
        if 'connect_args' not in self._sys_config['create_engine']:
            self._sys_config['create_engine']['connect_args'] = {}
        self._sys_config['create_engine']['connect_args'].update({"check_same_thread" : False})
    else:
        engine_str = (
            _engine + "://" + (_username if _username is not None else '') +
            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
            (("/" + _database) if _database is not None else '')
            + (("?" + urllib.parse.urlencode(_options)) if _options else '')
        ) if not _uri else _uri

    ### Sometimes the timescaledb:// flavor can slip in.
    if _uri and self.flavor in ('timescaledb',) and self.flavor in _uri:
        engine_str = engine_str.replace(f'{self.flavor}', 'postgresql', 1)

    if debug:
        ### BUGFIX: mask the URL-quoted form of the password as well — the
        ### engine string contains `quote_plus(_password)`, not the raw value.
        _shown_str = engine_str
        if _password is not None:
            _stars = '*' * len(_password)
            _shown_str = _shown_str.replace(':' + urllib.parse.quote_plus(_password), ':' + _stars)
            _shown_str = _shown_str.replace(':' + _password, ':' + _stars)
        dprint(
            _shown_str + '\n'
            + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
        )

    _kw_copy = copy.deepcopy(kw)

    ### NOTE: Order of inheritance:
    ###   1. Defaults
    ###   2. System configuration
    ###   3. Connector configuration
    ###   4. Keyword arguments
    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
    def _apply_create_engine_args(update):
        """Merge `update` into the engine args, honoring `omit_create_engine`."""
        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
            _create_engine_args.update(
                { k: v for k, v in update.items()
                    if 'omit_create_engine' not in flavor_configs[self.flavor]
                    or k not in flavor_configs[self.flavor].get('omit_create_engine')
                }
            )
    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
    _apply_create_engine_args(_kw_copy)

    try:
        engine = sqlalchemy.create_engine(
            engine_str,
            ### Dynamically resolve the pool class (e.g. 'sqlalchemy.pool.QueuePool')
            ### by splitting the configured dotted path into module + class name.
            poolclass = getattr(
                attempt_import(
                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
                ),
                self._sys_config['poolclass'].split('.')[-1]
            ),
            echo = debug,
            **_create_engine_args
        )
    except Exception:
        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
        engine = None

    if include_uri:
        return engine, engine_str
    return engine
Create a sqlalchemy engine by building the engine string.
def read(
    self,
    query_or_table: Union[str, sqlalchemy.Query],
    params: Union[Dict[str, Any], List[str], None] = None,
    dtype: Optional[Dict[str, Any]] = None,
    coerce_float: bool = True,
    chunksize: Optional[int] = -1,
    workers: Optional[int] = None,
    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
    as_hook_results: bool = False,
    chunks: Optional[int] = None,
    schema: Optional[str] = None,
    as_chunks: bool = False,
    as_iterator: bool = False,
    as_dask: bool = False,
    index_col: Optional[str] = None,
    silent: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[
    pandas.DataFrame,
    dask.DataFrame,
    List[pandas.DataFrame],
    List[Any],
    None,
]:
    """
    Read a SQL query or table into a pandas dataframe.

    Parameters
    ----------
    query_or_table: Union[str, sqlalchemy.Query]
        The SQL query (sqlalchemy Query or string) or name of the table from which to select.

    params: Optional[Dict[str, Any]], default None
        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.

    dtype: Optional[Dict[str, Any]], default None
        A dictionary of data types to pass to `pandas.read_sql()`.

    chunksize: Optional[int], default -1
        How many chunks to read at a time. `None` will read everything in one large chunk.
        Defaults to system configuration. **NOTE:** DuckDB does not allow for chunking.

    workers: Optional[int], default None
        How many threads to use when consuming the generator.
        Only applies if `chunk_hook` is provided.

    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
        Hook function to execute once per chunk.
        **NOTE:** `as_iterator` MUST be `False` (default).

    as_hook_results: bool, default False
        If `True`, return a `List` of the outputs of the hook function.
        Only applicable if `chunk_hook` is not None.

    chunks: Optional[int], default None
        Limit the number of chunks to read into memory.

    schema: Optional[str], default None
        If just a table name is provided, optionally specify the table schema.
        Defaults to `SQLConnector.schema`.

    as_chunks: bool, default False
        If `True`, return a list of DataFrames; otherwise a single DataFrame.

    as_iterator: bool, default False
        If `True`, return the pandas DataFrame iterator (hooks are not called).

    index_col: Optional[str], default None
        If using Dask, use this column as the index column.

    silent: bool, default False
        If `True`, don't raise warnings in case of errors.

    Returns
    -------
    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
    or `None` if something breaks.
    """
    if chunks is not None and chunks <= 0:
        return []
    ### NOTE: removed unused local imports (`inspect`, `NUMERIC_PRECISION_FLAVORS`).
    from meerschaum.utils.sql import sql_item_name, truncate_item_name
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.pool import get_pool
    from meerschaum.utils.dataframe import chunksize_to_npartitions, get_numeric_cols
    import warnings
    import traceback
    from decimal import Decimal

    ### NOTE(review): `dd` is never reassigned, so `is_dask` ends up False and
    ### `as_dask` is effectively ignored — a `dask.dataframe` import appears to
    ### have been dropped here; confirm against upstream before changing.
    pd = import_pandas()
    dd = None
    is_dask = 'dask' in pd.__name__
    pd = attempt_import('pandas')
    is_dask = dd is not None
    npartitions = chunksize_to_npartitions(chunksize)
    if is_dask:
        chunksize = None
    schema = schema or self.schema

    sqlalchemy = attempt_import("sqlalchemy")
    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is None and as_iterator:
        if not silent and self.flavor not in _disallow_chunks_flavors:
            warn(
                "An iterator may only be generated if chunksize is not None.\n"
                + "Falling back to a chunksize of 1000.", stacklevel=3,
            )
        chunksize = 1000
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel = 3,
                )
            chunksize = _max_chunks_flavors[self.flavor]

    ### NOTE: A bug in duckdb_engine does not allow for chunks.
    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
        chunksize = None

    if debug:
        import time
        start = time.perf_counter()
        dprint(f"[{self}]\n{query_or_table}")
        dprint(f"[{self}] Fetching with chunksize: {chunksize}")

    ### This might be a sqlalchemy object or the string of a table name.
    ### We check for spaces and quotes to see if it might be a weird table.
    if (
        ' ' not in str(query_or_table)
        or (
            ' ' in str(query_or_table)
            and str(query_or_table).startswith('"')
            and str(query_or_table).endswith('"')
        )
    ):
        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
        if truncated_table_name != str(query_or_table) and not silent:
            ### BUGFIX: this warning referenced the undefined names `name` and
            ### `truncated_name` (a NameError whenever it was triggered).
            warn(
                f"Table '{query_or_table}' is too long for '{self.flavor}',"
                + f" will instead read the table '{truncated_table_name}'."
            )

        query_or_table = sql_item_name(str(query_or_table), self.flavor, schema)
        if debug:
            dprint(f"[{self}] Reading from table {query_or_table}")
        formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table))
        str_query = f"SELECT * FROM {query_or_table}"
    else:
        str_query = query_or_table

    formatted_query = (
        sqlalchemy.text(str_query)
        if not is_dask and isinstance(str_query, str)
        else format_sql_query_for_dask(str_query)
    )

    chunk_list = []
    chunk_hook_results = []
    try:
        ### Server-side cursors only make sense when consuming chunks eagerly.
        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')

            read_sql_query_kwargs = {
                'params': params,
                'dtype': dtype,
                'coerce_float': coerce_float,
                'index_col': index_col,
            }
            if is_dask:
                if index_col is None:
                    ### Dask requires an index column; fall back to pandas.
                    dd = None
                    pd = attempt_import('pandas')
                    read_sql_query_kwargs.update({
                        'chunksize': chunksize,
                    })
            else:
                read_sql_query_kwargs.update({
                    'chunksize': chunksize,
                })

            if is_dask and dd is not None:
                ddf = dd.read_sql_query(
                    formatted_query,
                    self.URI,
                    **read_sql_query_kwargs
                )
            else:
                with self.engine.begin() as transaction:
                    with transaction.execution_options(stream_results=stream_results) as connection:
                        chunk_generator = pd.read_sql_query(
                            formatted_query,
                            connection,
                            **read_sql_query_kwargs
                        )

                        ### `stream_results` must be False (will load everything into memory).
                        if as_iterator or chunksize is None:
                            return chunk_generator

                        ### We must consume the generator in this context if using server-side cursors.
                        if stream_results:
                            pool = get_pool(workers=workers)

                            def _process_chunk(_chunk, _retry_on_failure: bool = True):
                                """Append the chunk and run the hook, retrying once on failure."""
                                if not as_hook_results:
                                    chunk_list.append(_chunk)
                                result = None
                                if chunk_hook is not None:
                                    try:
                                        result = chunk_hook(
                                            _chunk,
                                            workers = workers,
                                            chunksize = chunksize,
                                            debug = debug,
                                            **kw
                                        )
                                    except Exception:
                                        result = False, traceback.format_exc()
                                        from meerschaum.utils.formatting import get_console
                                        if not silent:
                                            get_console().print_exception()

                                ### If the chunk fails to process, try it again one more time.
                                if isinstance(result, tuple) and result[0] is False:
                                    if _retry_on_failure:
                                        return _process_chunk(_chunk, _retry_on_failure=False)

                                return result

                            chunk_hook_results = list(pool.imap(_process_chunk, chunk_generator))
                            if as_hook_results:
                                return chunk_hook_results

    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
        if not silent:
            warn(str(e), stacklevel=3)
        from meerschaum.utils.formatting import get_console
        if not silent:
            get_console().print_exception()

        return None

    if is_dask and dd is not None:
        ddf = ddf.reset_index()
        return ddf

    ### NOTE(review): `chunk_list` is reset here, discarding chunks collected
    ### in the `stream_results` branch above; the empty-chunk fallback below
    ### re-reads in that case — confirm against upstream.
    chunk_list = []
    read_chunks = 0
    chunk_hook_results = []
    if chunksize is None:
        chunk_list.append(chunk_generator)
    elif as_iterator:
        return chunk_generator
    else:
        try:
            for chunk in chunk_generator:
                if chunk_hook is not None:
                    chunk_hook_results.append(
                        chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                    )
                chunk_list.append(chunk)
                read_chunks += 1
                if chunks is not None and read_chunks >= chunks:
                    break
        except Exception as e:
            warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
            from meerschaum.utils.formatting import get_console
            if not silent:
                get_console().print_exception()

            ### Retry the generator once before giving up.
            read_chunks = 0
            try:
                for chunk in chunk_generator:
                    if chunk_hook is not None:
                        chunk_hook_results.append(
                            chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                        )
                    chunk_list.append(chunk)
                    read_chunks += 1
                    if chunks is not None and read_chunks >= chunks:
                        break
            except Exception as e:
                warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
                from meerschaum.utils.formatting import get_console
                if not silent:
                    get_console().print_exception()

                return None

    ### If no chunks returned, read without chunks
    ### to get columns.
    if len(chunk_list) == 0:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            _ = read_sql_query_kwargs.pop('chunksize', None)
            with self.engine.begin() as connection:
                chunk_list.append(
                    pd.read_sql_query(
                        formatted_query,
                        connection,
                        **read_sql_query_kwargs
                    )
                )

    ### call the hook on any missed chunks.
    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
        for c in chunk_list[len(chunk_hook_results):]:
            chunk_hook_results.append(
                chunk_hook(c, chunksize=chunksize, debug=debug, **kw)
            )

    if debug:
        end = time.perf_counter()
        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")

    if as_hook_results:
        return chunk_hook_results

    ### Skip `pd.concat()` if `as_chunks` is specified.
    if as_chunks:
        for c in chunk_list:
            c.reset_index(drop=True, inplace=True)
            for col in get_numeric_cols(c):
                c[col] = c[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
        return chunk_list

    df = pd.concat(chunk_list).reset_index(drop=True)
    ### NOTE: The calls to `canonical()` are to drop leading and trailing zeroes.
    for col in get_numeric_cols(df):
        df[col] = df[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)

    return df
Read a SQL query or table into a pandas dataframe.
Parameters
- query_or_table (Union[str, sqlalchemy.Query]): The SQL query (sqlalchemy Query or string) or name of the table from which to select.
- params (Optional[Dict[str, Any]], default None):
List
orDict
of parameters to pass topandas.read_sql()
. See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html - dtype (Optional[Dict[str, Any]], default None):
A dictionary of data types to pass to
pandas.read_sql()
. See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html chunksize (Optional[int], default -1): How many chunks to read at a time.
None
will read everything in one large chunk. Defaults to system configuration.NOTE: DuckDB does not allow for chunking.
- workers (Optional[int], default None):
How many threads to use when consuming the generator.
Only applies if
chunk_hook
is provided. - chunk_hook (Optional[Callable[[pandas.DataFrame], Any]], default None):
Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
See
--sync-chunks
for an example. NOTE:as_iterator
MUST be False (default). as_hook_results (bool, default False): If
True
, return aList
of the outputs of the hook function. Only applicable ifchunk_hook
is not None.NOTE:
as_iterator
MUST beFalse
(default).- chunks (Optional[int], default None):
Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
return into a single dataframe.
For example, to limit the returned dataframe to 100,000 rows,
you could specify a
chunksize
of1000
andchunks
of100
. - schema (Optional[str], default None):
If just a table name is provided, optionally specify the table schema.
Defaults to
SQLConnector.schema
. - as_chunks (bool, default False):
If
True
, return a list of DataFrames. Otherwise return a single DataFrame. - as_iterator (bool, default False):
If
True
, return the pandas DataFrame iterator.chunksize
must not beNone
(falls back to 1000 if so), and hooks are not called in this case. - index_col (Optional[str], default None): If using Dask, use this column as the index column. If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
- silent (bool, default False):
If
True
, don't raise warnings in case of errors. Defaults toFalse
.
Returns
- A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators, or `None` if something breaks.
389def value( 390 self, 391 query: str, 392 *args: Any, 393 use_pandas: bool = False, 394 **kw: Any 395 ) -> Any: 396 """ 397 Execute the provided query and return the first value. 398 399 Parameters 400 ---------- 401 query: str 402 The SQL query to execute. 403 404 *args: Any 405 The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec` 406 if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`. 407 408 use_pandas: bool, default False 409 If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use 410 `meerschaum.connectors.sql.SQLConnector.exec` (default). 411 **NOTE:** This is always `True` for DuckDB. 412 413 **kw: Any 414 See `args`. 415 416 Returns 417 ------- 418 Any value returned from the query. 419 420 """ 421 from meerschaum.utils.packages import attempt_import 422 sqlalchemy = attempt_import('sqlalchemy') 423 if self.flavor == 'duckdb': 424 use_pandas = True 425 if use_pandas: 426 try: 427 return self.read(query, *args, **kw).iloc[0, 0] 428 except Exception as e: 429 return None 430 431 _close = kw.get('close', True) 432 _commit = kw.get('commit', (self.flavor != 'mssql')) 433 try: 434 result, connection = self.exec( 435 query, 436 *args, 437 with_connection = True, 438 close = False, 439 commit = _commit, 440 **kw 441 ) 442 first = result.first() if result is not None else None 443 _val = first[0] if first is not None else None 444 except Exception as e: 445 warn(e, stacklevel=3) 446 return None 447 if _close: 448 try: 449 connection.close() 450 except Exception as e: 451 warn("Failed to close connection with exception:\n" + str(e)) 452 return _val
Execute the provided query and return the first value.
Parameters
- query (str): The SQL query to execute.
- *args (Any):
The arguments passed to
SQLConnector.exec
ifuse_pandas
isFalse
(default) or toSQLConnector.read
. - use_pandas (bool, default False):
If
True
, useSQLConnector.read
, otherwise useSQLConnector.exec
(default). NOTE: This is alwaysTrue
for DuckDB. - **kw (Any):
See
args
.
Returns
- Any value returned from the query.
def exec(
    self,
    query: str,
    *args: Any,
    silent: bool = False,
    debug: bool = False,
    commit: Optional[bool] = None,
    close: Optional[bool] = None,
    with_connection: bool = False,
    **kw: Any
) -> Union[
    sqlalchemy.engine.result.resultProxy,
    sqlalchemy.engine.cursor.LegacyCursorResult,
    Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
    Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
    None
]:
    """
    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.

    If inserting data, please use bind variables to avoid SQL injection!

    Parameters
    ----------
    query: Union[str, List[str], Tuple[str]]
        The query to execute.
        If `query` is a list or tuple, call `self.exec_queries()` instead.

    args: Any
        Arguments passed to `sqlalchemy.engine.execute`.

    silent: bool, default False
        If `True`, suppress warnings.

    commit: Optional[bool], default None
        If `True`, commit the changes after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    close: Optional[bool], default None
        If `True`, close the connection after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    with_connection: bool, default False
        If `True`, return a tuple including the connection object.
        NOTE: when `close` also resolves to `True`, the returned
        connection has already been closed.

    Returns
    -------
    The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
    """
    ### Multiple queries are delegated to `exec_queries()` (no commit/close/with_connection).
    if isinstance(query, (list, tuple)):
        return self.exec_queries(
            list(query),
            *args,
            silent = silent,
            debug = debug,
            **kw
        )

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import("sqlalchemy")
    if debug:
        dprint(f"[{self}] Executing query:\n{query}")

    ### MSSQL misbehaves with per-statement close/commit, hence the flavor checks.
    _close = close if close is not None else (self.flavor != 'mssql')
    _commit = commit if commit is not None else (
        (self.flavor != 'mssql' or 'select' not in str(query).lower())
    )

    ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+).
    if not hasattr(query, 'compile'):
        query = sqlalchemy.text(query)

    connection = self.engine.connect()
    ### Only open an explicit transaction when we intend to commit.
    transaction = connection.begin() if _commit else None
    try:
        result = connection.execute(query, *args, **kw)
        if _commit:
            transaction.commit()
    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{e}")
        if not silent:
            warn(str(e), stacklevel=3)
        result = None
        if _commit:
            transaction.rollback()
    finally:
        if _close:
            connection.close()

    if with_connection:
        return result, connection

    return result
Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.

If inserting data, please use bind variables to avoid SQL injection!

Parameters
- query (Union[str, List[str], Tuple[str]]):
  The query to execute. If `query` is a list or tuple, call `self.exec_queries()` instead.
- args (Any): Arguments passed to `sqlalchemy.engine.execute`.
- silent (bool, default False): If `True`, suppress warnings.
- commit (Optional[bool], default None):
  If `True`, commit the changes after execution.
  Causes issues with flavors like `'mssql'`.
  This does not apply if `query` is a list of strings.
- close (Optional[bool], default None):
  If `True`, close the connection after execution.
  Causes issues with flavors like `'mssql'`.
  This does not apply if `query` is a list of strings.
- with_connection (bool, default False):
  If `True`, return a tuple including the connection object.
  This does not apply if `query` is a list of strings.

Returns
- The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
455def execute( 456 self, 457 *args : Any, 458 **kw : Any 459 ) -> Optional[sqlalchemy.engine.result.resultProxy]: 460 """ 461 An alias for `meerschaum.connectors.sql.SQLConnector.exec`. 462 """ 463 return self.exec(*args, **kw)
An alias for `SQLConnector.exec`.
def to_sql(
        self,
        df: pandas.DataFrame,
        name: str = None,
        index: bool = False,
        if_exists: str = 'replace',
        method: str = "",
        chunksize: Optional[int] = -1,
        schema: Optional[str] = None,
        silent: bool = False,
        debug: bool = False,
        as_tuple: bool = False,
        as_dict: bool = False,
        **kw
    ) -> Union[bool, SuccessTuple]:
    """
    Upload a DataFrame's contents to the SQL server.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be uploaded.

    name: str
        The name of the table to be created.

    index: bool, default False
        If True, creates the DataFrame's indices as columns.

    if_exists: str, default 'replace'
        Drop and create the table ('replace') or append if it exists
        ('append') or raise Exception ('fail').
        Options are ['replace', 'append', 'fail'].

    method: str, default ''
        None or multi. Details on pandas.to_sql.

    chunksize: Optional[int], default -1
        How many rows to insert at a time.
        `-1` falls back to the configured default chunksize.

    schema: Optional[str], default None
        Optionally override the schema for the table.
        Defaults to `SQLConnector.schema`.

    silent: bool, default False
        If `True`, suppress warnings on failure.

    debug: bool, default False
        Verbosity toggle.

    as_tuple: bool, default False
        If `True`, return a (success_bool, message) tuple instead of a `bool`.
        Defaults to `False`.

    as_dict: bool, default False
        If `True`, return a dictionary of transaction information.
        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
        `method`, and `target`.

    kw: Any
        Additional arguments will be passed to the DataFrame's `to_sql` function

    Returns
    -------
    Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
    """
    import time
    import json
    import decimal
    from decimal import Decimal, Context
    from meerschaum.utils.warnings import error, warn
    import warnings
    import functools
    if name is None:
        error(f"Name must not be `None` to insert data into {self}.")

    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
    kw.pop('name', None)

    schema = schema or self.schema

    from meerschaum.utils.sql import (
        sql_item_name,
        table_exists,
        json_flavors,
        truncate_item_name,
    )
    from meerschaum.utils.dataframe import get_json_cols, get_numeric_cols
    from meerschaum.utils.dtypes import are_dtypes_equal, quantize_decimal
    from meerschaum.utils.dtypes.sql import NUMERIC_PRECISION_FLAVORS
    from meerschaum.connectors.sql._create_engine import flavor_configs
    from meerschaum.utils.packages import attempt_import, import_pandas
    sqlalchemy = attempt_import('sqlalchemy', debug=debug)
    pd = import_pandas()
    ### Dask DataFrames take a URI instead of an engine (see `to_sql_kw` below).
    is_dask = 'dask' in df.__module__

    stats = {'target': name, }
    ### resort to defaults if None
    if method == "":
        if self.flavor in _bulk_flavors:
            ### Bulk-capable flavors use the COPY-based insert helper.
            method = functools.partial(psql_insert_copy, schema=self.schema)
        else:
            ### Should resolve to 'multi' or `None`.
            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')
    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)

    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    ### Cap the chunksize at the flavor's maximum, warning only if the user
    ### explicitly requested a larger value.
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel = 3,
                )
            chunksize = _max_chunks_flavors[self.flavor]
    stats['chunksize'] = chunksize

    success, msg = False, "Default to_sql message"
    start = time.perf_counter()
    if debug:
        msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..."
        print(msg, end="", flush=True)
    stats['num_rows'] = len(df)

    ### Check if the name is too long.
    truncated_name = truncate_item_name(name, self.flavor)
    if name != truncated_name:
        warn(
            f"Table '{name}' is too long for '{self.flavor}',"
            + f" will instead create the table '{truncated_name}'."
        )

    ### filter out non-pandas args
    import inspect
    to_sql_params = inspect.signature(df.to_sql).parameters
    to_sql_kw = {}
    for k, v in kw.items():
        if k in to_sql_params:
            to_sql_kw[k] = v

    to_sql_kw.update({
        'name': truncated_name,
        'schema': schema,
        ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI),
        'index': index,
        'if_exists': if_exists,
        'method': method,
        'chunksize': chunksize,
    })
    if is_dask:
        to_sql_kw.update({
            'parallel': True,
        })

    if self.flavor == 'oracle':
        ### For some reason 'replace' doesn't work properly in pandas,
        ### so try dropping first.
        if if_exists == 'replace' and table_exists(name, self, schema=schema, debug=debug):
            success = self.exec(
                "DROP TABLE " + sql_item_name(name, 'oracle', schema)
            ) is not None
            if not success:
                warn(f"Unable to drop {name}")


        ### Enforce NVARCHAR(2000) as text instead of CLOB.
        dtype = to_sql_kw.get('dtype', {})
        for col, typ in df.dtypes.items():
            if are_dtypes_equal(str(typ), 'object'):
                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
            elif are_dtypes_equal(str(typ), 'int'):
                dtype[col] = sqlalchemy.types.INTEGER
        to_sql_kw['dtype'] = dtype
    elif self.flavor == 'mssql':
        ### MSSQL lacks a native boolean type; store bools as integers.
        dtype = to_sql_kw.get('dtype', {})
        for col, typ in df.dtypes.items():
            if are_dtypes_equal(str(typ), 'bool'):
                dtype[col] = sqlalchemy.types.INTEGER
        to_sql_kw['dtype'] = dtype

    ### Check for JSON columns.
    if self.flavor not in json_flavors:
        json_cols = get_json_cols(df)
        if json_cols:
            for col in json_cols:
                ### Serialize unhashable values (dicts, lists) to JSON strings
                ### for flavors without a native JSON type.
                df[col] = df[col].apply(
                    (
                        lambda x: json.dumps(x, default=str, sort_keys=True)
                        if not isinstance(x, Hashable)
                        else x
                    )
                )

    ### Check for numeric columns.
    ### NOTE(review): the unpack order `numeric_scale, numeric_precision` is the
    ### reverse of the usual (precision, scale) convention — confirm against
    ### `NUMERIC_PRECISION_FLAVORS` and the `quantize_decimal` signature.
    numeric_scale, numeric_precision = NUMERIC_PRECISION_FLAVORS.get(self.flavor, (None, None))
    if numeric_precision is not None and numeric_scale is not None:
        numeric_cols = get_numeric_cols(df)
        for col in numeric_cols:
            df[col] = df[col].apply(
                lambda x: (
                    quantize_decimal(x, numeric_scale, numeric_precision)
                    if isinstance(x, Decimal)
                    else x
                )
            )

    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            df.to_sql(**to_sql_kw)
        success = True
    except Exception as e:
        if not silent:
            warn(str(e))
        success, msg = False, str(e)

    end = time.perf_counter()
    if success:
        msg = f"It took {round(end - start, 2)} seconds to sync {len(df)} rows to {name}."
    stats['start'] = start
    stats['end'] = end
    stats['duration'] = end - start

    if debug:
        print(f" done.", flush=True)
        dprint(msg)

    stats['success'] = success
    stats['msg'] = msg
    if as_tuple:
        return success, msg
    if as_dict:
        return stats
    return success
Upload a DataFrame's contents to the SQL server.
Parameters
- df (pd.DataFrame): The DataFrame to be uploaded.
- name (str): The name of the table to be created.
- index (bool, default False): If True, creates the DataFrame's indices as columns.
- if_exists (str, default 'replace'): Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail'].
- method (str, default ''): None or multi. Details on pandas.to_sql.
- chunksize (Optional[int], default -1): How many rows to insert at a time.
- schema (Optional[str], default None):
  Optionally override the schema for the table. Defaults to `SQLConnector.schema`.
- as_tuple (bool, default False):
  If `True`, return a (success_bool, message) tuple instead of a `bool`. Defaults to `False`.
- as_dict (bool, default False):
  If `True`, return a dictionary of transaction information.
  The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
  `method`, and `target`.
- kw (Any): Additional arguments will be passed to the DataFrame's `to_sql` function.

Returns
- Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
def exec_queries(
        self,
        queries: List[
            Union[
                str,
                Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]
            ]
        ],
        break_on_error: bool = False,
        rollback: bool = True,
        silent: bool = False,
        debug: bool = False,
    ) -> List[sqlalchemy.engine.cursor.LegacyCursorResult]:
    """
    Execute a list of queries in a single transaction.

    Parameters
    ----------
    queries: List[
        Union[
            str,
            Tuple[str, Callable[[], List[str]]]
        ]
    ]
        The queries in the transaction to be executed.
        If a query is a tuple, the second item of the tuple
        will be considered a callable hook that returns a list of queries to be executed
        before the next item in the list.

    break_on_error: bool, default False
        If `True`, stop executing when a query fails.

    rollback: bool, default True
        If `break_on_error` is `True`, rollback the transaction if a query fails.

    silent: bool, default False
        If `True`, suppress warnings.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of SQLAlchemy results (`None` for queries that failed).
    A hook query's entry becomes a `(result, hook_results)` tuple.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    sqlalchemy, sqlalchemy_orm = attempt_import('sqlalchemy', 'sqlalchemy.orm')
    session = sqlalchemy_orm.Session(self.engine)

    result = None
    results = []
    ### `session.begin()` commits on clean exit and rolls back on exception.
    with session.begin():
        for query in queries:
            hook = None
            result = None

            if isinstance(query, tuple):
                query, hook = query
            if isinstance(query, str):
                ### Raw strings must be wrapped for SQLAlchemy 2.0+.
                query = sqlalchemy.text(query)

            if debug:
                dprint(f"[{self}]\n" + str(query))

            try:
                result = session.execute(query)
                session.flush()
            except Exception as e:
                msg = (f"Encountered error while executing:\n{e}")
                if not silent:
                    warn(msg)
                elif debug:
                    dprint(f"[{self}]\n" + str(msg))
                result = None
            if result is None and break_on_error:
                if rollback:
                    session.rollback()
                break
            elif result is not None and hook is not None:
                ### Hooks may generate follow-up queries (executed recursively
                ### in their own transaction).
                hook_queries = hook(session)
                if hook_queries:
                    hook_results = self.exec_queries(
                        hook_queries,
                        break_on_error = break_on_error,
                        rollback = rollback,
                        silent = silent,
                        debug = debug,
                    )
                    result = (result, hook_results)

            results.append(result)

    return results
Execute a list of queries in a single transaction.
Parameters
- queries (List[Union[str, Tuple[str, Callable[[Session], List[str]]]]]):
  The queries in the transaction to be executed.
  If a query is a tuple, the second item of the tuple will be considered a callable
  hook that returns a list of queries to be executed before the next item in the list.
- break_on_error (bool, default False): If `True`, stop executing when a query fails.
- rollback (bool, default True):
  If `break_on_error` is `True`, rollback the transaction if a query fails.
- silent (bool, default False): If `True`, suppress warnings.
Returns
- A list of SQLAlchemy results.
414def test_connection( 415 self, 416 **kw: Any 417 ) -> Union[bool, None]: 418 """ 419 Test if a successful connection to the database may be made. 420 421 Parameters 422 ---------- 423 **kw: 424 The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`. 425 426 Returns 427 ------- 428 `True` if a connection is made, otherwise `False` or `None` in case of failure. 429 430 """ 431 import warnings 432 from meerschaum.connectors.poll import retry_connect 433 _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self} 434 _default_kw.update(kw) 435 with warnings.catch_warnings(): 436 warnings.filterwarnings('ignore', 'Could not') 437 try: 438 return retry_connect(**_default_kw) 439 except Exception as e: 440 return False
Test if a successful connection to the database may be made.
Parameters
- **kw (Any): The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

Returns
- `True` if a connection is made, otherwise `False` or `None` in case of failure.
def fetch(
        self,
        pipe: meerschaum.Pipe,
        begin: Union[datetime, int, str, None] = '',
        end: Union[datetime, int, str, None] = None,
        check_existing: bool = True,
        chunk_hook: Optional[Callable[[pd.DataFrame], Any]] = None,
        chunksize: Optional[int] = -1,
        workers: Optional[int] = None,
        debug: bool = False,
        **kw: Any
    ) -> Union['pd.DataFrame', List[Any], None]:
    """Execute the SQL definition and return a Pandas DataFrame.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe object which contains the `fetch` metadata.

        - pipe.columns['datetime']: str
            - Name of the datetime column for the remote table.
        - pipe.parameters['fetch']: Dict[str, Any]
            - Parameters necessary to execute a query.
        - pipe.parameters['fetch']['definition']: str
            - Raw SQL query to execute to generate the pandas DataFrame.
        - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
            - How many minutes before `begin` to search for data (*optional*).

    begin: Union[datetime, int, str, None], default ''
        Most recent datetime to search for data.
        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.

    end: Union[datetime, int, str, None], default None
        The latest datetime to search for data.
        If `end` is `None`, do not bound.

    check_existing: bool, default True
        If `False`, use a backtrack interval of 0 minutes.

    chunk_hook: Callable[[pd.DataFrame], Any], default None
        A function to pass to `SQLConnector.read()` that accepts a Pandas DataFrame.

    chunksize: Optional[int], default -1
        How many rows to load into memory at once (when `chunk_hook` is provided).
        Otherwise the entire result set is loaded into memory.

    workers: Optional[int], default None
        How many threads to use when consuming the generator (when `chunk_hook` is provided).
        Defaults to the number of cores.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas DataFrame or `None`.
    If `chunk_hook` is not None, return a list of the hook function's results.
    For SQLite (without a chunk hook), a generator of datetime-parsed chunks is returned.
    """
    ### Build the bounded fetch query from the pipe's definition.
    meta_def = self.get_pipe_metadef(
        pipe,
        begin = begin,
        end = end,
        check_existing = check_existing,
        debug = debug,
        **kw
    )
    as_hook_results = chunk_hook is not None
    chunks = self.read(
        meta_def,
        chunk_hook = chunk_hook,
        as_hook_results = as_hook_results,
        chunksize = chunksize,
        workers = workers,
        debug = debug,
    )
    ### if sqlite, parse for datetimes
    ### (SQLite stores datetimes as text, so parse all non-datetime-typed columns lazily).
    if not as_hook_results and self.flavor == 'sqlite':
        from meerschaum.utils.misc import parse_df_datetimes
        ignore_cols = [
            col
            for col, dtype in pipe.dtypes.items()
            if 'datetime' not in str(dtype)
        ]
        return (
            parse_df_datetimes(
                chunk,
                ignore_cols = ignore_cols,
                debug = debug,
            )
            for chunk in chunks
        )
    return chunks
Execute the SQL definition and return a Pandas DataFrame.
Parameters
- pipe (meerschaum.Pipe): The pipe object which contains the `fetch` metadata.
  - pipe.columns['datetime']: str
    - Name of the datetime column for the remote table.
  - pipe.parameters['fetch']: Dict[str, Any]
    - Parameters necessary to execute a query.
  - pipe.parameters['fetch']['definition']: str
    - Raw SQL query to execute to generate the pandas DataFrame.
  - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
    - How many minutes before `begin` to search for data (*optional*).
- begin (Union[datetime, int, str, None], default ''):
  Most recent datetime to search for data.
  If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
- end (Union[datetime, int, str, None], default None):
  The latest datetime to search for data. If `end` is `None`, do not bound.
- check_existing (bool, default True): If `False`, use a backtrack interval of 0 minutes.
- chunk_hook (Callable[[pd.DataFrame], Any], default None):
  A function to pass to `SQLConnector.read()` that accepts a Pandas DataFrame.
- chunksize (Optional[int], default -1):
  How many rows to load into memory at once (when `chunk_hook` is provided).
  Otherwise the entire result set is loaded into memory.
- workers (Optional[int], default None):
  How many threads to use when consuming the generator (when `chunk_hook` is provided).
  Defaults to the number of cores.
- debug (bool, default False): Verbosity toggle.

Returns
- A pandas DataFrame or `None`.
- If `chunk_hook` is not None, return a list of the hook function's results.
def get_pipe_metadef(
        self,
        pipe: meerschaum.Pipe,
        params: Optional[Dict[str, Any]] = None,
        begin: Union[datetime, int, str, None] = '',
        end: Union[datetime, int, str, None] = None,
        check_existing: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> Union[str, None]:
    """
    Return a pipe's meta definition fetch query.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose fetch definition should be bounded.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    begin: Union[datetime, int, str, None], default ''
        Most recent datatime to search for data.
        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.

    end: Union[datetime, int, str, None], default None
        The latest datetime to search for data.
        If `end` is `None`, do not bound

    check_existing: bool, default True
        If `True`, apply the backtrack interval.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pipe's meta definition fetch query string.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where
    from meerschaum.utils.misc import is_int
    from meerschaum.config import get_config

    definition = get_pipe_query(pipe)

    ### Resolve the datetime column: prefer the configured one, otherwise guess.
    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt_name = sql_item_name(_dt, self.flavor, None) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt_name = sql_item_name(_dt, self.flavor, None)
        is_guess = False

    ### Warn (or give up on bounding) when bounds were requested but the
    ### datetime column was only guessed.
    if begin not in (None, '') or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"Unable to determine a datetime column for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = '', None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False
                )


    ### `begin == ''` is a sentinel meaning "resume from the last sync time".
    apply_backtrack = begin == '' and check_existing
    backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug)
    btm = (
        int(backtrack_interval.total_seconds() / 60)
        if isinstance(backtrack_interval, timedelta)
        else backtrack_interval
    )
    begin = (
        pipe.get_sync_time(debug=debug)
        if begin == ''
        else begin
    )

    ### Drop an inverted window rather than emitting an impossible bound.
    if begin and end and begin >= end:
        begin = None

    da = None
    if dt_name:
        ### Apply the backtrack interval (in minutes) to the lower bound only.
        begin_da = (
            dateadd_str(
                flavor = self.flavor,
                datepart = 'minute',
                number = ((-1 * btm) if apply_backtrack else 0),
                begin = begin,
            )
            if begin
            else None
        )
        end_da = (
            dateadd_str(
                flavor = self.flavor,
                datepart = 'minute',
                number = 0,
                begin = end,
            )
            if end
            else None
        )

    meta_def = (
        _simple_fetch_query(pipe) if (
            (not (pipe.columns or {}).get('id', None))
            or (not get_config('system', 'experimental', 'join_fetch'))
        ) else _join_fetch_query(pipe, debug=debug, **kw)
    )

    ### Only inspect the outer (aliased) query for an existing WHERE clause.
    has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):]
    if dt_name and (begin_da or end_da):
        definition_dt_name = (
            dateadd_str(self.flavor, 'minute', 0, f"definition.{dt_name}")
            if not is_int((begin_da or end_da))
            else f"definition.{dt_name}"
        )
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        if begin_da:
            meta_def += f"{definition_dt_name} >= {begin_da}"
        if begin_da and end_da:
            meta_def += " AND "
        if end_da:
            meta_def += f"{definition_dt_name} < {end_da}"

    if params is not None:
        params_where = build_where(params, self, with_where=False)
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        meta_def += params_where

    return meta_def
Return a pipe's meta definition fetch query.
params: Optional[Dict[str, Any]], default None
    Optional params dictionary to build the `WHERE` clause.
    See `meerschaum.utils.sql.build_where`.

begin: Union[datetime, int, str, None], default ''
    Most recent datetime to search for data.
    If `backtrack_minutes` is provided, subtract `backtrack_minutes`.

end: Union[datetime, int, str, None], default None
    The latest datetime to search for data.
    If `end` is `None`, do not bound.

check_existing: bool, default True
    If `True`, apply the backtrack interval.

debug: bool, default False
    Verbosity toggle.
Returns
- A pipe's meta definition fetch query string.
35def cli( 36 self, 37 debug: bool = False, 38 ) -> SuccessTuple: 39 """ 40 Launch a subprocess for an interactive CLI. 41 """ 42 from meerschaum.utils.venv import venv_exec 43 env = copy.deepcopy(dict(os.environ)) 44 env[f'MRSM_SQL_{self.label.upper()}'] = json.dumps(self.meta) 45 cli_code = ( 46 "import sys\n" 47 "import meerschaum as mrsm\n" 48 f"conn = mrsm.get_connector('sql:{self.label}')\n" 49 "success, msg = conn._cli_exit()\n" 50 "mrsm.pprint((success, msg))\n" 51 "if not success:\n" 52 " raise Exception(msg)" 53 ) 54 try: 55 _ = venv_exec(cli_code, venv=None, debug=debug, capture_output=False) 56 except Exception as e: 57 return False, f"[{self}] Failed to start CLI:\n{e}" 58 return True, "Success"
Launch a subprocess for an interactive CLI.
def fetch_pipes_keys(
        self,
        connector_keys: Optional[List[str]] = None,
        metric_keys: Optional[List[str]] = None,
        location_keys: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False
    ) -> Optional[List[Tuple[str, str, Optional[str]]]]:
    """
    Return a list of tuples corresponding to the parameters provided.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        List of connector_keys to search by.

    metric_keys: Optional[List[str]], default None
        List of metric_keys to search by.

    location_keys: Optional[List[str]], default None
        List of location_keys to search by.

    tags: Optional[List[str]], default None
        List of tags to search by (a comma-separated tag string forms an AND group).

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        E.g. `--params pipe_id:1`

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of `(connector_keys, metric_key, location_key)` tuples,
    or `[]` if the pipes table does not exist.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.misc import separate_negation_values, flatten_list
    from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists
    from meerschaum.config.static import STATIC_CONFIG
    import json
    from copy import deepcopy
    sqlalchemy, sqlalchemy_sql_functions = attempt_import('sqlalchemy', 'sqlalchemy.sql.functions')
    coalesce = sqlalchemy_sql_functions.coalesce

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    else:
        ### Normalize the various "null location" spellings to the literal 'None'.
        location_keys = [
            (
                lk
                if lk not in ('[None]', 'None', 'null')
                else 'None'
            )
            for lk in location_keys
        ]
    if tags is None:
        tags = []

    if params is None:
        params = {}

    ### Add three primary keys to params dictionary
    ### (separated for convenience of arguments).
    cols = {
        'connector_keys': [str(ck) for ck in connector_keys],
        'metric_key': [str(mk) for mk in metric_keys],
        'location_key': [str(lk) for lk in location_keys],
    }

    ### Make deep copy so we don't mutate this somewhere else.
    parameters = deepcopy(params)
    for col, vals in cols.items():
        if vals not in [[], ['*']]:
            parameters[col] = vals

    if not table_exists('mrsm_pipes', self, schema=self.instance_schema, debug=debug):
        return []

    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes']

    ### Serialize dict values so they can be compared as strings.
    _params = {}
    for k, v in parameters.items():
        _v = json.dumps(v) if isinstance(v, dict) else v
        _params[k] = _v

    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
    ### Parse regular params.
    ### If a param begins with '_', negate it instead.
    ### NOTE(review): the negation branch compares the column to `key` (the
    ### column name) rather than to the value with the prefix stripped —
    ### looks like a bug; confirm against the negation-prefix convention.
    _where = [
        (
            (coalesce(pipes_tbl.c[key], 'None') == val)
            if not str(val).startswith(negation_prefix)
            else (pipes_tbl.c[key] != key)
        ) for key, val in _params.items()
        if not isinstance(val, (list, tuple)) and key in pipes_tbl.c
    ]
    select_cols = (
        [
            pipes_tbl.c.connector_keys,
            pipes_tbl.c.metric_key,
            pipes_tbl.c.location_key,
        ]
    )

    q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where))
    ### Apply IN / NOT IN filters for the three key lists.
    for c, vals in cols.items():
        if not isinstance(vals, (list, tuple)) or not vals or not c in pipes_tbl.c:
            continue
        _in_vals, _ex_vals = separate_negation_values(vals)
        q = q.where(coalesce(pipes_tbl.c[c], 'None').in_(_in_vals)) if _in_vals else q
        q = q.where(coalesce(pipes_tbl.c[c], 'None').not_in(_ex_vals)) if _ex_vals else q

    ### Finally, parse tags.
    ### Tags within one comma-separated group are ANDed; groups are ORed.
    tag_groups = [tag.split(',') for tag in tags]
    in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups]

    ors, nands = [], []
    for _in_tags, _ex_tags in in_ex_tag_groups:
        sub_ands = []
        for nt in _in_tags:
            ### Match tags via a LIKE pattern on the serialized parameters JSON.
            sub_ands.append(
                sqlalchemy.cast(
                    pipes_tbl.c['parameters'],
                    sqlalchemy.String,
                ).like(f'%"tags":%"{nt}"%')
            )
        if sub_ands:
            ors.append(sqlalchemy.and_(*sub_ands))

        for xt in _ex_tags:
            nands.append(
                sqlalchemy.cast(
                    pipes_tbl.c['parameters'],
                    sqlalchemy.String,
                ).not_like(f'%"tags":%"{xt}"%')
            )

    q = q.where(sqlalchemy.and_(*nands)) if nands else q
    q = q.where(sqlalchemy.or_(*ors)) if ors else q
    loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key'])
    if self.flavor not in OMIT_NULLSFIRST_FLAVORS:
        loc_asc = sqlalchemy.nullsfirst(loc_asc)
    q = q.order_by(
        sqlalchemy.asc(pipes_tbl.c['connector_keys']),
        sqlalchemy.asc(pipes_tbl.c['metric_key']),
        loc_asc,
    )

    ### execute the query and return a list of tuples
    if debug:
        dprint(q.compile(compile_kwargs={'literal_binds': True}))
    try:
        ### DuckDB results don't support `fetchall()` here, so read into a DataFrame.
        rows = (
            self.execute(q).fetchall()
            if self.flavor != 'duckdb'
            else [
                (row['connector_keys'], row['metric_key'], row['location_key'])
                for row in self.read(q).to_dict(orient='records')
            ]
        )
    except Exception as e:
        ### `error()` raises, so `rows` is never unbound past this point.
        error(str(e))

    return [(row[0], row[1], row[2]) for row in rows]
Return a list of tuples corresponding to the parameters provided.
Parameters
- connector_keys (Optional[List[str]], default None): List of connector_keys to search by.
- metric_keys (Optional[List[str]], default None): List of metric_keys to search by.
- location_keys (Optional[List[str]], default None): List of location_keys to search by.
- params (Optional[Dict[str, Any]], default None):
  Dictionary of additional parameters to search by, e.g. `--params pipe_id:1`.
- debug (bool, default False): Verbosity toggle.
310def create_indices( 311 self, 312 pipe: mrsm.Pipe, 313 indices: Optional[List[str]] = None, 314 debug: bool = False 315 ) -> bool: 316 """ 317 Create a pipe's indices. 318 """ 319 from meerschaum.utils.sql import sql_item_name, update_queries 320 from meerschaum.utils.debug import dprint 321 if debug: 322 dprint(f"Creating indices for {pipe}...") 323 if not pipe.columns: 324 warn(f"{pipe} has no index columns; skipping index creation.", stack=False) 325 return True 326 327 ix_queries = { 328 ix: queries 329 for ix, queries in self.get_create_index_queries(pipe, debug=debug).items() 330 if indices is None or ix in indices 331 } 332 success = True 333 for ix, queries in ix_queries.items(): 334 ix_success = all(self.exec_queries(queries, debug=debug, silent=False)) 335 success = success and ix_success 336 if not ix_success: 337 warn(f"Failed to create index on column: {ix}") 338 339 return success
Create a pipe's indices.
342def drop_indices( 343 self, 344 pipe: mrsm.Pipe, 345 indices: Optional[List[str]] = None, 346 debug: bool = False 347 ) -> bool: 348 """ 349 Drop a pipe's indices. 350 """ 351 from meerschaum.utils.debug import dprint 352 if debug: 353 dprint(f"Dropping indices for {pipe}...") 354 if not pipe.columns: 355 warn(f"Unable to drop indices for {pipe} without columns.", stack=False) 356 return False 357 ix_queries = { 358 ix: queries 359 for ix, queries in self.get_drop_index_queries(pipe, debug=debug).items() 360 if indices is None or ix in indices 361 } 362 success = True 363 for ix, queries in ix_queries.items(): 364 ix_success = all(self.exec_queries(queries, debug=debug, silent=True)) 365 if not ix_success: 366 success = False 367 if debug: 368 dprint(f"Failed to drop index on column: {ix}") 369 return success
Drop a pipe's indices.
def get_create_index_queries(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.

    For TimescaleDB, the datetime "index" is instead a `create_hypertable()` call
    (optionally space-partitioned on the `id` column). For upsert pipes, an extra
    unique index (and, except on SQLite, a unique constraint) is added.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to which the queries will correspond.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of column names mapping to lists of queries.
    """
    ### NOTE: Due to recent breaking changes in DuckDB, indices don't behave properly.
    if self.flavor == 'duckdb':
        return {}
    from meerschaum.utils.sql import (
        sql_item_name,
        get_distinct_col_count,
        update_queries,
        get_null_replacement,
        COALESCE_UNIQUE_INDEX_FLAVORS,
    )
    from meerschaum.config import get_config
    index_queries = {}

    ### Only build the unique index/constraint when this flavor has an upsert-style update query.
    upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in update_queries
    indices = pipe.get_indices()

    ### Resolve the datetime and id columns plus their quoted names and index names.
    ### `_datetime` / `_id` may be None when the pipe doesn't define them (error=False).
    _datetime = pipe.get_columns('datetime', error=False)
    _datetime_type = pipe.dtypes.get(_datetime, 'datetime64[ns]')
    _datetime_name = (
        sql_item_name(_datetime, self.flavor, None)
        if _datetime is not None else None
    )
    _datetime_index_name = (
        sql_item_name(indices['datetime'], self.flavor, None)
        if indices.get('datetime', None)
        else None
    )
    _id = pipe.get_columns('id', error=False)
    _id_name = (
        sql_item_name(_id, self.flavor, None)
        if _id is not None
        else None
    )

    _id_index_name = (
        sql_item_name(indices['id'], self.flavor, None)
        if indices.get('id', None)
        else None
    )
    _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
    ### Experimental feature flag: space-partition hypertables on the id column.
    _create_space_partition = get_config('system', 'experimental', 'space')

    ### create datetime index
    if _datetime is not None:
        if self.flavor == 'timescaledb' and pipe.parameters.get('hypertable', True):
            ### Distinct id count sizes the space partitions (only when enabled).
            _id_count = (
                get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self)
                if (_id is not None and _create_space_partition) else None
            )

            ### The chunk interval may be an int (integer datetime axis) or a timedelta.
            chunk_interval = pipe.get_chunk_interval(debug=debug)
            chunk_interval_minutes = (
                chunk_interval
                if isinstance(chunk_interval, int)
                else int(chunk_interval.total_seconds() / 60)
            )
            chunk_time_interval = (
                f"INTERVAL '{chunk_interval_minutes} MINUTES'"
                if isinstance(chunk_interval, timedelta)
                else f'{chunk_interval_minutes}'
            )

            ### Instead of a plain index, convert the table into a hypertable.
            dt_query = (
                f"SELECT public.create_hypertable('{_pipe_name}', " +
                f"'{_datetime}', "
                + (
                    f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition)
                    else ''
                )
                + f'chunk_time_interval => {chunk_time_interval}, '
                + 'if_not_exists => true, '
                + "migrate_data => true);"
            )
        else: ### mssql, sqlite, etc.
            dt_query = (
                f"CREATE INDEX {_datetime_index_name} "
                + f"ON {_pipe_name} ({_datetime_name})"
            )

        index_queries[_datetime] = [dt_query]

    ### create id index
    if _id_name is not None:
        if self.flavor == 'timescaledb':
            ### Already created indices via create_hypertable.
            id_query = (
                None if (_id is not None and _create_space_partition)
                else (
                    f"CREATE INDEX IF NOT EXISTS {_id_index_name} ON {_pipe_name} ({_id_name})"
                    if _id is not None
                    else None
                )
            )
            pass  # NOTE(review): no-op statement; appears vestigial.
        else: ### mssql, sqlite, etc.
            id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})"

        if id_query is not None:
            index_queries[_id] = id_query if isinstance(id_query, list) else [id_query]


    ### Create indices for other labels in `pipe.columns`.
    other_indices = {
        ix_key: ix_unquoted
        for ix_key, ix_unquoted in pipe.get_indices().items()
        if ix_key not in ('datetime', 'id')
    }
    for ix_key, ix_unquoted in other_indices.items():
        ix_name = sql_item_name(ix_unquoted, self.flavor, None)
        col = pipe.columns[ix_key]
        col_name = sql_item_name(col, self.flavor, None)
        index_queries[col] = [f"CREATE INDEX {ix_name} ON {_pipe_name} ({col_name})"]

    ### Build the upsert unique index over all existing index columns.
    existing_cols_types = pipe.get_columns_types(debug=debug)
    indices_cols_str = ', '.join(
        [
            sql_item_name(ix, self.flavor)
            for ix_key, ix in pipe.columns.items()
            if ix and ix in existing_cols_types
        ]
    )
    ### Some flavors treat NULLs as distinct in unique indices,
    ### so wrap nullable columns in COALESCE with a type-appropriate sentinel.
    coalesce_indices_cols_str = ', '.join(
        [
            (
                "COALESCE("
                + sql_item_name(ix, self.flavor)
                + ", "
                + get_null_replacement(existing_cols_types[ix], self.flavor)
                + ") "
            ) if ix_key != 'datetime' else (sql_item_name(ix, self.flavor))
            for ix_key, ix in pipe.columns.items()
            if ix and ix in existing_cols_types
        ]
    )
    unique_index_name = sql_item_name(pipe.target + '_unique_index', self.flavor)
    constraint_name = sql_item_name(pipe.target + '_constraint', self.flavor)
    add_constraint_query = (
        f"ALTER TABLE {_pipe_name} ADD CONSTRAINT {constraint_name} UNIQUE ({indices_cols_str})"
    )
    unique_index_cols_str = (
        indices_cols_str
        if self.flavor not in COALESCE_UNIQUE_INDEX_FLAVORS
        else coalesce_indices_cols_str
    )
    create_unique_index_query = (
        f"CREATE UNIQUE INDEX {unique_index_name} ON {_pipe_name} ({unique_index_cols_str})"
    )
    constraint_queries = [create_unique_index_query]
    if self.flavor != 'sqlite':
        constraint_queries.append(add_constraint_query)
    ### NOTE(review): this key is the quoted index name, unlike the other keys
    ### which are raw column names — confirm downstream consumers expect this.
    if upsert and indices_cols_str:
        index_queries[unique_index_name] = constraint_queries
    return index_queries
Return a dictionary mapping column names to `CREATE INDEX` (or equivalent) queries.

Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.

Returns
- A dictionary of column names mapping to lists of queries.
def get_drop_index_queries(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> Dict[str, List[str]]:
    """
    Build the queries required to drop a pipe's indices.

    TimescaleDB hypertables cannot simply drop their indices; instead the data
    is migrated into a temporary table which is renamed back to the target.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to which the queries will correspond.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of column names mapping to lists of queries.
    """
    ### DuckDB's index handling is broken, and a nonexistent table has nothing to drop.
    if self.flavor == 'duckdb' or not pipe.exists(debug=debug):
        return {}
    from meerschaum.utils.sql import sql_item_name, table_exists, hypertable_queries

    schema = self.get_pipe_schema(pipe)
    prefix = (schema + '_') if schema else ''
    indices = {col: prefix + ix for col, ix in pipe.get_indices().items()}
    pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
    pipe_name_no_schema = sql_item_name(pipe.target, self.flavor, None)
    queries = {}

    ### Detect whether the target table is a hypertable (flavors without a
    ### detection query can never be hypertables).
    hypertable_query_template = hypertable_queries.get(self.flavor)
    is_hypertable = (
        hypertable_query_template is not None
        and self.value(
            hypertable_query_template.format(table_name=pipe_name),
            silent=True,
            debug=debug,
        ) is not None
    )

    if is_hypertable:
        temp_table = '_' + pipe.target + '_temp_migration'
        temp_table_name = sql_item_name(temp_table, self.flavor, self.get_pipe_schema(pipe))

        ### Clear out any leftover migration table before reusing its name.
        nuke_queries = (
            [f"DROP TABLE {temp_table_name}"]
            if table_exists(temp_table, self, schema=self.get_pipe_schema(pipe), debug=debug)
            else []
        )
        nuke_queries += [
            f"SELECT * INTO {temp_table_name} FROM {pipe_name}",
            f"DROP TABLE {pipe_name}",
            f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name_no_schema}",
        ]
        ### Attach the migration to the first of ('datetime', 'id') present.
        for ix_key in ('datetime', 'id'):
            if ix_key in indices:
                queries[ix_key] = nuke_queries
                break

    ### Everything else gets a plain DROP INDEX.
    for ix_key, ix_unquoted in indices.items():
        if ix_key not in queries:
            queries[ix_key] = ["DROP INDEX " + sql_item_name(ix_unquoted, self.flavor, None)]
    return queries
Return a dictionary mapping column names to `DROP INDEX` (or equivalent) queries.

Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.

Returns
- A dictionary of column names mapping to lists of queries.
def get_add_columns_queries(
        self,
        pipe: mrsm.Pipe,
        df: Union[pd.DataFrame, Dict[str, str]],
        debug: bool = False,
    ) -> List[str]:
    """
    Add new null columns of the correct type to a table from a dataframe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.

    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which contains new columns.
        If a dictionary is provided, assume it maps columns to Pandas data types.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    """
    if not pipe.exists(debug=debug):
        return []

    from decimal import Decimal
    import copy
    from meerschaum.utils.sql import (
        sql_item_name,
        SINGLE_ALTER_TABLE_FLAVORS,
    )
    from meerschaum.utils.dtypes.sql import (
        get_pd_type_from_db_type,
        get_db_type_from_pd_type,
    )
    from meerschaum.utils.misc import flatten_list
    table_obj = self.get_pipe_table(pipe, debug=debug)
    ### Dask frames are materialized from the first partition for dtype inspection.
    is_dask = 'dask' in df.__module__ if not isinstance(df, dict) else False
    if is_dask:
        df = df.partitions[0].compute()
    ### Normalize input into a mapping of column -> pandas dtype string.
    df_cols_types = (
        {
            col: str(typ)
            for col, typ in df.dtypes.items()
        }
        if not isinstance(df, dict)
        else copy.deepcopy(df)
    )
    ### Sniff ambiguous 'object' columns from the first row's value:
    ### dict/list -> json, Decimal -> numeric, str -> str.
    if not isinstance(df, dict) and len(df.index) > 0:
        for col, typ in list(df_cols_types.items()):
            if typ != 'object':
                continue
            val = df.iloc[0][col]
            if isinstance(val, (dict, list)):
                df_cols_types[col] = 'json'
            elif isinstance(val, Decimal):
                df_cols_types[col] = 'numeric'
            elif isinstance(val, str):
                df_cols_types[col] = 'str'
    db_cols_types = {
        col: get_pd_type_from_db_type(str(typ.type))
        for col, typ in table_obj.columns.items()
    }
    ### Columns present in the dataframe but missing from the table.
    new_cols = set(df_cols_types) - set(db_cols_types)
    if not new_cols:
        return []

    new_cols_types = {
        col: get_db_type_from_pd_type(
            df_cols_types[col],
            self.flavor
        ) for col in new_cols
    }

    alter_table_query = "ALTER TABLE " + sql_item_name(
        pipe.target, self.flavor, self.get_pipe_schema(pipe)
    )
    queries = []
    for col, typ in new_cols_types.items():
        add_col_query = (
            "\nADD "
            + sql_item_name(col, self.flavor, None)
            + " " + typ + ","
        )

        ### Flavors which only allow one ADD per statement get one query per column;
        ### otherwise all ADD clauses accumulate onto a single ALTER TABLE.
        if self.flavor in SINGLE_ALTER_TABLE_FLAVORS:
            queries.append(alter_table_query + add_col_query[:-1])
        else:
            alter_table_query += add_col_query

    ### For most flavors, only one query is required.
    ### This covers SQLite which requires one query per column.
    if not queries:
        queries.append(alter_table_query[:-1])

    if self.flavor != 'duckdb':
        return queries

    ### NOTE: For DuckDB, we must drop and rebuild the indices.
    drop_index_queries = list(flatten_list(
        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
    ))
    create_index_queries = list(flatten_list(
        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
    ))

    return drop_index_queries + queries + create_index_queries
Add new null columns of the correct type to a table from a dataframe.

Parameters
- pipe (mrsm.Pipe): The pipe to be altered.
- df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.

Returns
- A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
def get_alter_columns_queries(
        self,
        pipe: mrsm.Pipe,
        df: Union[pd.DataFrame, Dict[str, str]],
        debug: bool = False,
    ) -> List[str]:
    """
    If we encounter a column of a different type, set the entire column to text.
    If the altered columns are numeric, alter to numeric instead.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.

    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which may contain altered columns.
        If a dict is provided, assume it maps columns to Pandas data types.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    """
    if not pipe.exists(debug=debug):
        return []
    from meerschaum.utils.sql import sql_item_name
    from meerschaum.utils.dataframe import get_numeric_cols
    from meerschaum.utils.dtypes import are_dtypes_equal
    from meerschaum.utils.dtypes.sql import (
        get_pd_type_from_db_type,
        get_db_type_from_pd_type,
    )
    from meerschaum.utils.misc import flatten_list, generate_password, items_str
    table_obj = self.get_pipe_table(pipe, debug=debug)
    target = pipe.target
    ### Short random suffix used to build a unique temp table name (SQLite path).
    session_id = generate_password(3)
    numeric_cols = (
        get_numeric_cols(df)
        if not isinstance(df, dict)
        else [
            col
            for col, typ in df.items()
            if typ == 'numeric'
        ]
    )
    ### Normalize input into a mapping of column -> pandas dtype string.
    df_cols_types = (
        {
            col: str(typ)
            for col, typ in df.dtypes.items()
        }
        if not isinstance(df, dict)
        else df
    )
    db_cols_types = {
        col: get_pd_type_from_db_type(str(typ.type))
        for col, typ in table_obj.columns.items()
    }
    pipe_bool_cols = [col for col, typ in pipe.dtypes.items() if are_dtypes_equal(str(typ), 'bool')]
    ### db-type substring -> df-type substring pairs which should NOT count as altered.
    pd_db_df_aliases = {
        'int': 'bool',
        'float': 'bool',
        'numeric': 'bool',
    }
    if self.flavor == 'oracle':
        pd_db_df_aliases['int'] = 'numeric'

    ### Columns whose dataframe dtype disagrees with the table's dtype
    ### (string columns in the DB are left alone — text can hold anything).
    altered_cols = {
        col: (db_cols_types.get(col, 'object'), typ)
        for col, typ in df_cols_types.items()
        if not are_dtypes_equal(typ, db_cols_types.get(col, 'object').lower())
        and not are_dtypes_equal(db_cols_types.get(col, 'object'), 'string')
    }

    ### NOTE: Sometimes bools are coerced into ints or floats.
    altered_cols_to_ignore = set()
    for col, (db_typ, df_typ) in altered_cols.items():
        for db_alias, df_alias in pd_db_df_aliases.items():
            if db_alias in db_typ.lower() and df_alias in df_typ.lower():
                altered_cols_to_ignore.add(col)

    ### Oracle's bool handling sometimes mixes NUMBER and INT.
    for bool_col in pipe_bool_cols:
        if bool_col not in altered_cols:
            continue
        db_is_bool_compatible = (
            are_dtypes_equal('int', altered_cols[bool_col][0])
            or are_dtypes_equal('float', altered_cols[bool_col][0])
            or are_dtypes_equal('numeric', altered_cols[bool_col][0])
            or are_dtypes_equal('bool', altered_cols[bool_col][0])
        )
        df_is_bool_compatible = (
            are_dtypes_equal('int', altered_cols[bool_col][1])
            or are_dtypes_equal('float', altered_cols[bool_col][1])
            or are_dtypes_equal('numeric', altered_cols[bool_col][1])
            or are_dtypes_equal('bool', altered_cols[bool_col][1])
        )
        if db_is_bool_compatible and df_is_bool_compatible:
            altered_cols_to_ignore.add(bool_col)

    for col in altered_cols_to_ignore:
        _ = altered_cols.pop(col, None)
    if not altered_cols:
        return []

    ### Persist newly-detected numeric columns onto the pipe's registered dtypes;
    ### otherwise pick up numeric columns already registered on the pipe.
    if numeric_cols:
        pipe.dtypes.update({col: 'numeric' for col in numeric_cols})
        edit_success, edit_msg = pipe.edit(debug=debug)
        if not edit_success:
            warn(
                f"Failed to update dtypes for numeric columns {items_str(numeric_cols)}:\n"
                + f"{edit_msg}"
            )
    else:
        numeric_cols.extend([col for col, typ in pipe.dtypes.items() if typ == 'numeric'])

    pipe_dtypes = pipe.dtypes  # NOTE(review): appears unused below.
    numeric_type = get_db_type_from_pd_type('numeric', self.flavor, as_sqlalchemy=False)
    text_type = get_db_type_from_pd_type('str', self.flavor, as_sqlalchemy=False)
    ### Altered columns become numeric when flagged as such, otherwise text.
    altered_cols_types = {
        col: (
            numeric_type
            if col in numeric_cols
            else text_type
        )
        for col, (db_typ, typ) in altered_cols.items()
    }

    if self.flavor == 'sqlite':
        ### SQLite cannot ALTER a column's type; rebuild the table instead:
        ### rename -> create fresh schema -> INSERT ... SELECT with CASTs -> drop temp.
        temp_table_name = '-' + session_id + '_' + target
        rename_query = (
            "ALTER TABLE "
            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
            + " RENAME TO "
            + sql_item_name(temp_table_name, self.flavor, None)
        )
        create_query = (
            "CREATE TABLE "
            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
            + " (\n"
        )
        for col_name, col_obj in table_obj.columns.items():
            create_query += (
                sql_item_name(col_name, self.flavor, None)
                + " "
                + (
                    str(col_obj.type)
                    if col_name not in altered_cols
                    else altered_cols_types[col_name]
                )
                + ",\n"
            )
        create_query = create_query[:-2] + "\n)"

        insert_query = (
            "INSERT INTO "
            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
            + ' ('
            + ', '.join([
                sql_item_name(col_name, self.flavor, None)
                for col_name, _ in table_obj.columns.items()
            ])
            + ')'
            + "\nSELECT\n"
        )
        for col_name, col_obj in table_obj.columns.items():
            new_col_str = (
                sql_item_name(col_name, self.flavor, None)
                if col_name not in altered_cols
                else (
                    "CAST("
                    + sql_item_name(col_name, self.flavor, None)
                    + " AS "
                    + altered_cols_types[col_name]
                    + ")"
                )
            )
            insert_query += new_col_str + ",\n"
        insert_query = insert_query[:-2] + (
            f"\nFROM {sql_item_name(temp_table_name, self.flavor, self.get_pipe_schema(pipe))}"
        )

        drop_query = "DROP TABLE " + sql_item_name(
            temp_table_name, self.flavor, self.get_pipe_schema(pipe)
        )
        return [
            rename_query,
            create_query,
            insert_query,
            drop_query,
        ]

    queries = []
    if self.flavor == 'oracle':
        ### Oracle cannot MODIFY a populated column's type directly.
        ### Six phases: add *_temp columns, copy values in, null the originals,
        ### MODIFY the originals' types, copy back, drop the temps.
        for col, typ in altered_cols_types.items():
            add_query = (
                "ALTER TABLE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nADD " + sql_item_name(col + '_temp', self.flavor, None)
                + " " + typ
            )
            queries.append(add_query)

        for col, typ in altered_cols_types.items():
            populate_temp_query = (
                "UPDATE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nSET " + sql_item_name(col + '_temp', self.flavor, None)
                + ' = ' + sql_item_name(col, self.flavor, None)
            )
            queries.append(populate_temp_query)

        for col, typ in altered_cols_types.items():
            set_old_cols_to_null_query = (
                "UPDATE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nSET " + sql_item_name(col, self.flavor, None)
                + ' = NULL'
            )
            queries.append(set_old_cols_to_null_query)

        for col, typ in altered_cols_types.items():
            alter_type_query = (
                "ALTER TABLE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nMODIFY " + sql_item_name(col, self.flavor, None) + ' '
                + typ
            )
            queries.append(alter_type_query)

        for col, typ in altered_cols_types.items():
            set_old_to_temp_query = (
                "UPDATE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nSET " + sql_item_name(col, self.flavor, None)
                + ' = ' + sql_item_name(col + '_temp', self.flavor, None)
            )
            queries.append(set_old_to_temp_query)

        for col, typ in altered_cols_types.items():
            drop_temp_query = (
                "ALTER TABLE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor, None)
            )
            queries.append(drop_temp_query)

        return queries


    ### Generic path: one ALTER TABLE with per-flavor keyword variations.
    query = "ALTER TABLE " + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
    for col, typ in altered_cols_types.items():
        alter_col_prefix = (
            'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle')
            else 'MODIFY'
        )
        type_prefix = (
            '' if self.flavor in ('mssql', 'mariadb', 'mysql')
            else 'TYPE '
        )
        column_str = 'COLUMN' if self.flavor != 'oracle' else ''
        query += (
            f"\n{alter_col_prefix} {column_str} "
            + sql_item_name(col, self.flavor, None)
            + " " + type_prefix + typ + ","
        )

    query = query[:-1]
    queries.append(query)
    if self.flavor != 'duckdb':
        return queries

    ### DuckDB requires dropping and rebuilding indices around the ALTER.
    drop_index_queries = list(flatten_list(
        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
    ))
    create_index_queries = list(flatten_list(
        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
    ))

    return drop_index_queries + queries + create_index_queries
If we encounter a column of a different type, set the entire column to text. If the altered columns are numeric, alter to numeric instead.

Parameters
- pipe (mrsm.Pipe): The pipe to be altered.
- df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which may contain altered columns. If a dict is provided, assume it maps columns to Pandas data types.

Returns
- A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
def delete_pipe(
        self,
        pipe: mrsm.Pipe,
        debug: bool = False,
    ) -> SuccessTuple:
    """
    Delete a Pipe's registration from the instance's pipes table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose registration row should be removed.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of `(success, message)`.
    """
    ### NOTE: Removed unused local imports (`sql_item_name`, `dprint`).
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    ### An unregistered pipe has no `id`, so there is nothing to delete.
    if not pipe.id:
        return False, f"{pipe} is not registered."

    ### ensure pipes table exists (skip creation for temporary pipes)
    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    q = sqlalchemy.delete(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
    if not self.exec(q, debug=debug):
        return False, f"Failed to delete registration for {pipe}."

    return True, "Success"
Delete a Pipe's registration.
def get_pipe_data(
        self,
        pipe: mrsm.Pipe,
        select_columns: Optional[List[str]] = None,
        omit_columns: Optional[List[str]] = None,
        begin: Union[datetime, str, None] = None,
        end: Union[datetime, str, None] = None,
        params: Optional[Dict[str, Any]] = None,
        order: str = 'asc',
        limit: Optional[int] = None,
        begin_add_minutes: int = 0,
        end_add_minutes: int = 0,
        debug: bool = False,
        **kw: Any
    ) -> Union[pd.DataFrame, None]:
    """
    Access a pipe's data from the SQL instance.

    Parameters
    ----------
    pipe: mrsm.Pipe:
        The pipe to get data from.

    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, str, None], default None
        If provided, get rows newer than or equal to this value.

    end: Union[datetime, str, None], default None
        If provided, get rows older than or equal to this value.

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.connectors.sql.build_where`.

    order: Optional[str], default 'asc'
        The selection order for all of the indices in the query.
        If `None`, omit the `ORDER BY` clause.

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    begin_add_minutes: int, default 0
        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).

    end_add_minutes: int, default 0
        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).

    chunksize: Optional[int], default -1
        The size of dataframe chunks to load into memory.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` of the pipe's data.

    """
    import json
    from meerschaum.utils.sql import sql_item_name
    from meerschaum.utils.misc import parse_df_datetimes, to_pandas_dtype
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.dtypes import attempt_cast_to_numeric
    pd = import_pandas()
    is_dask = 'dask' in pd.__name__

    dtypes = pipe.dtypes
    if dtypes:
        if self.flavor == 'sqlite':
            ### SQLite has no real datetime type, so determine (or guess) the
            ### datetime column and coerce its dtype for parsing below.
            if not pipe.columns.get('datetime', None):
                _dt = pipe.guess_datetime()
                dt = sql_item_name(_dt, self.flavor, None) if _dt else None
                is_guess = True
            else:
                _dt = pipe.get_columns('datetime')
                dt = sql_item_name(_dt, self.flavor, None)
                is_guess = False

            if _dt:
                dt_type = dtypes.get(_dt, 'object').lower()
                if 'datetime' not in dt_type:
                    ### Integer datetime axes are left as-is.
                    if 'int' not in dt_type:
                        dtypes[_dt] = 'datetime64[ns]'
    existing_cols = pipe.get_columns_types(debug=debug)
    ### Restrict the selection to columns which actually exist in the table
    ### and were not explicitly omitted.
    select_columns = (
        [
            col
            for col in existing_cols
            if col not in (omit_columns or [])
        ]
        if not select_columns
        else [
            col
            for col in select_columns
            if col in existing_cols
            and col not in (omit_columns or [])
        ]
    )
    if select_columns:
        dtypes = {col: typ for col, typ in dtypes.items() if col in select_columns}
    dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
        if col in select_columns and col not in (omit_columns or [])
    }
    query = self.get_pipe_data_query(
        pipe,
        select_columns = select_columns,
        omit_columns = omit_columns,
        begin = begin,
        end = end,
        params = params,
        order = order,
        limit = limit,
        begin_add_minutes = begin_add_minutes,
        end_add_minutes = end_add_minutes,
        debug = debug,
        **kw
    )

    if is_dask:
        index_col = pipe.columns.get('datetime', None)
        kw['index_col'] = index_col

    numeric_columns = [
        col
        for col, typ in pipe.dtypes.items()
        if typ == 'numeric' and col in dtypes
    ]
    ### Don't coerce floats when numeric (Decimal) columns must stay exact.
    kw['coerce_float'] = kw.get('coerce_float', (len(numeric_columns) == 0))

    df = self.read(
        query,
        dtype = dtypes,
        debug = debug,
        **kw
    )
    for col in numeric_columns:
        if col not in df.columns:
            continue
        df[col] = df[col].apply(attempt_cast_to_numeric)

    if self.flavor == 'sqlite':
        ### NOTE: We have to consume the iterator here to ensure that datetimes are parsed correctly
        df = (
            parse_df_datetimes(
                df,
                ignore_cols = [
                    col
                    for col, dtype in pipe.dtypes.items()
                    if 'datetime' not in str(dtype)
                ],
                chunksize = kw.get('chunksize', None),
                debug = debug,
            ) if isinstance(df, pd.DataFrame) else (
                [
                    parse_df_datetimes(
                        c,
                        ignore_cols = [
                            col
                            for col, dtype in pipe.dtypes.items()
                            if 'datetime' not in str(dtype)
                        ],
                        chunksize = kw.get('chunksize', None),
                        debug = debug,
                    )
                    for c in df
                ]
            )
        )
        ### SQLite stores JSON as TEXT, so decode json columns back into objects.
        for col, typ in dtypes.items():
            if typ != 'json':
                continue
            df[col] = df[col].apply(lambda x: json.loads(x) if x is not None else x)
    return df
Access a pipe's data from the SQL instance.

Parameters
- pipe (mrsm.Pipe): The pipe to get data from.
- select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. `SELECT *`).
- omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, str, None], default None): If provided, get rows newer than or equal to this value.
- end (Union[datetime, str, None], default None): If provided, get rows older than or equal to this value.
- params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See `meerschaum.connectors.sql.build_where`.
- order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If `None`, omit the `ORDER BY` clause.
- limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
- begin_add_minutes (int, default 0): The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
- end_add_minutes (int, default 0): The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
- chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory.
- debug (bool, default False): Verbosity toggle.

Returns
- A `pd.DataFrame` of the pipe's data.
def get_pipe_data_query(
        self,
        pipe: mrsm.Pipe,
        select_columns: Optional[List[str]] = None,
        omit_columns: Optional[List[str]] = None,
        begin: Union[datetime, int, str, None] = None,
        end: Union[datetime, int, str, None] = None,
        params: Optional[Dict[str, Any]] = None,
        order: str = 'asc',
        limit: Optional[int] = None,
        begin_add_minutes: int = 0,
        end_add_minutes: int = 0,
        replace_nulls: Optional[str] = None,
        debug: bool = False,
        **kw: Any
    ) -> Union[str, None]:
    """
    Return the `SELECT` query for retrieving a pipe's data from its instance.

    Parameters
    ----------
    pipe: mrsm.Pipe:
        The pipe to get data from.

    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, int, str, None], default None
        If provided, get rows newer than or equal to this value.
        An empty string means "last sync time minus the backtrack interval".

    end: Union[datetime, str, None], default None
        If provided, get rows older than or equal to this value.

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.connectors.sql.build_where`.

    order: Optional[str], default 'asc'
        The selection order for all of the indices in the query.
        If `None`, omit the `ORDER BY` clause.

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    begin_add_minutes: int, default 0
        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).

    end_add_minutes: int, default 0
        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).

    replace_nulls: Optional[str], default None
        If provided, replace null values with this value.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SELECT` query to retrieve a pipe's data.
    """
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import items_str
    from meerschaum.utils.sql import sql_item_name, dateadd_str
    from meerschaum.utils.packages import import_pandas
    pd = import_pandas()
    existing_cols = pipe.get_columns_types(debug=debug)
    select_columns = (
        [col for col in existing_cols]
        if not select_columns
        else [col for col in select_columns if col in existing_cols]
    )
    if omit_columns:
        select_columns = [col for col in select_columns if col not in omit_columns]

    ### An empty-string `begin` is a sentinel meaning "resume from the last
    ### sync time, backed up by the pipe's backtrack interval".
    if begin == '':
        begin = pipe.get_sync_time(debug=debug)
        backtrack_interval = pipe.get_backtrack_interval(debug=debug)
        if begin is not None:
            begin -= backtrack_interval

    cols_names = [sql_item_name(col, self.flavor, None) for col in select_columns]
    select_cols_str = (
        'SELECT\n'
        + ',\n '.join(
            [
                (
                    col_name
                    if not replace_nulls
                    ### BUGFIX: the column name was previously emitted as the
                    ### literal text `col_name` instead of being interpolated.
                    else f"COALESCE({col_name}, '{replace_nulls}') AS {col_name}"
                )
                for col_name in cols_names
            ]
        )
    )
    pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
    query = f"{select_cols_str}\nFROM {pipe_table_name}"
    where = ""

    if order is not None:
        default_order = 'asc'
        if order not in ('asc', 'desc'):
            warn(f"Ignoring unsupported order '{order}'. Falling back to '{default_order}'.")
            order = default_order
        order = order.upper()

    ### Determine (or guess) the datetime column for bounding.
    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor, None) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt = sql_item_name(_dt, self.flavor, None)
        is_guess = False

    quoted_indices = {
        key: sql_item_name(val, self.flavor, None)
        for key, val in pipe.columns.items()
        if val in existing_cols
    }

    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n Ignoring begin and end...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f" Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )

    ### Datetime bounds: inclusive lower (`>=`), exclusive upper (`<`).
    is_dt_bound = False
    if begin is not None and _dt in existing_cols:
        begin_da = dateadd_str(
            flavor = self.flavor,
            datepart = 'minute',
            number = begin_add_minutes,
            begin = begin
        )
        where += f"{dt} >= {begin_da}" + (" AND " if end is not None else "")
        is_dt_bound = True

    if end is not None and _dt in existing_cols:
        ### Avoid an empty integer window when begin == end.
        if 'int' in str(type(end)).lower() and end == begin:
            end += 1
        end_da = dateadd_str(
            flavor = self.flavor,
            datepart = 'minute',
            number = end_add_minutes,
            begin = end
        )
        where += f"{dt} < {end_da}"
        is_dt_bound = True

    if params is not None:
        from meerschaum.utils.sql import build_where
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
        if valid_params:
            where += build_where(valid_params, self).replace(
                'WHERE', ('AND' if is_dt_bound else "")
            )

    if len(where) > 0:
        query += "\nWHERE " + where

    if order is not None:
        ### Sort by indices, starting with datetime.
        order_by = ""
        if quoted_indices:
            order_by += "\nORDER BY "
            if _dt and _dt in existing_cols:
                order_by += dt + ' ' + order + ','
            for key, quoted_col_name in quoted_indices.items():
                if key == 'datetime':
                    continue
                order_by += ' ' + quoted_col_name + ' ' + order + ','
            order_by = order_by[:-1]

        query += order_by

    if isinstance(limit, int):
        if self.flavor == 'mssql':
            ### BUGFIX: slicing off len("SELECT *") (8 chars) also consumed the
            ### first character of the quoted column list, since the query
            ### begins with "SELECT\n" (7 chars). Slice off only "SELECT".
            query = f'SELECT TOP {limit}' + query[len("SELECT"):]
        elif self.flavor == 'oracle':
            ### BUGFIX: `ROWNUM = 1` ignored `limit` and always returned one row.
            query = f"SELECT * FROM (\n {query}\n)\nWHERE ROWNUM <= {limit}"
        else:
            query += f"\nLIMIT {limit}"

    if debug:
        to_print = (
            []
            + ([f"begin='{begin}'"] if begin else [])
            + ([f"end='{end}'"] if end else [])
            + ([f"params={params}"] if params else [])
        )
        dprint("Getting pipe data with constraints: " + items_str(to_print, quotes=False))

    return query
Return the SELECT
query for retrieving a pipe's data from its instance.
Parameters
- pipe (mrsm.Pipe:): The pipe to get data from.
- select_columns (Optional[List[str]], default None):
If provided, only select these given columns.
Otherwise select all available columns (i.e.
SELECT *
). - omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, int, str, None], default None): If provided, get rows newer than or equal to this value.
- end (Union[datetime, str, None], default None): If provided, get rows older than or equal to this value.
- params (Optional[Dict[str, Any]], default None):
Additional parameters to filter by.
See
meerschaum.connectors.sql.build_where
. - order (Optional[str], default 'asc'):
The selection order for all of the indices in the query.
If `None`, omit the `ORDER BY` clause.
- limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
- begin_add_minutes (int, default 0):
The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
- end_add_minutes (int, default 0):
The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
- chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory.
- replace_nulls (Optional[str], default None): If provided, replace null values with this value.
- debug (bool, default False): Verbosity toggle.
Returns
- A
SELECT
query to retrieve a pipe's data.
def register_pipe(
        self,
        pipe: 'mrsm.Pipe',
        debug: bool = False,
    ) -> 'SuccessTuple':
    """
    Register a new pipe in the instance's pipes table.

    A pipe's attributes must be set before registering.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to register.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    from meerschaum.connectors.sql.tables import get_tables

    ### Ensure the pipes table exists before attempting the insert.
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    if pipe.get_id(debug=debug) is not None:
        return False, f"{pipe} is already registered."

    ### NOTE: if `parameters` was supplied to the Pipe constructor,
    ### `pipe.parameters` already exists and is not fetched from the database.
    ### Prioritize the in-memory `parameters`, falling back to an empty dict.
    try:
        parameters = pipe.parameters
    except Exception as e:
        if debug:
            dprint(str(e))
        parameters = None

    if parameters is None:
        parameters = {}

    sqlalchemy = attempt_import('sqlalchemy')
    insert_query = sqlalchemy.insert(pipes_tbl).values(
        connector_keys = pipe.connector_keys,
        metric_key = pipe.metric_key,
        location_key = pipe.location_key,
        ### Flavors with native JSON support take the dict directly;
        ### everything else stores serialized text.
        parameters = (
            parameters
            if self.flavor in json_flavors
            else json.dumps(parameters)
        ),
    )
    result = self.exec(insert_query, debug=debug)
    if result is None:
        return False, f"Failed to register {pipe}."
    return True, f"Successfully registered {pipe}."
Register a new pipe. A pipe's attributes must be set before registering.
def edit_pipe(
        self,
        pipe: 'mrsm.Pipe' = None,
        patch: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> 'SuccessTuple':
    """
    Persist a Pipe's parameters to its database.

    Parameters
    ----------
    pipe: mrsm.Pipe, default None
        The pipe to be edited.
    patch: bool, default False
        If patch is `True`, update the existing parameters by cascading.
        Otherwise overwrite the parameters (default).
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    ### Guard against the default `None`: dereferencing `pipe.id` below
    ### would otherwise raise an `AttributeError`.
    if pipe is None:
        return False, "Cannot edit a pipe without a Pipe object."

    if pipe.id is None:
        return False, f"{pipe} is not registered and cannot be edited."

    import json
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors

    if not patch:
        parameters = pipe.__dict__.get('_attributes', {}).get('parameters', {})
    else:
        ### Cascade the in-memory parameters over the registered parameters.
        from meerschaum import Pipe
        from meerschaum.config._patch import apply_patch_to_config
        original_parameters = Pipe(
            pipe.connector_keys, pipe.metric_key, pipe.location_key,
            mrsm_instance=pipe.instance_keys
        ).parameters
        parameters = apply_patch_to_config(
            original_parameters,
            pipe.parameters
        )

    ### Ensure the pipes table exists.
    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    sqlalchemy = attempt_import('sqlalchemy')

    values = {
        'parameters': (
            json.dumps(parameters)
            if self.flavor not in json_flavors
            else parameters
        ),
    }
    q = sqlalchemy.update(pipes_tbl).values(**values).where(
        pipes_tbl.c.pipe_id == pipe.id
    )

    result = self.exec(q, debug=debug)
    message = (
        f"Successfully edited {pipe}."
        if result is not None else f"Failed to edit {pipe}."
    )
    return (result is not None), message
Persist a Pipe's parameters to its database.
Parameters
- pipe (mrsm.Pipe, default None): The pipe to be edited.
- patch (bool, default False):
If patch is `True`, update the existing parameters by cascading; otherwise overwrite the parameters (default).
- debug (bool, default False): Verbosity toggle.
def get_pipe_id(
        self,
        pipe: 'mrsm.Pipe',
        debug: bool = False,
    ) -> Any:
    """
    Look up a Pipe's ID in the pipes table.

    Returns `None` immediately for temporary pipes.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose ID to fetch.
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The pipe's ID as an `int`, or `None`.
    """
    if pipe.temporary:
        return None
    import json
    from meerschaum.utils.packages import attempt_import
    from meerschaum.connectors.sql.tables import get_tables
    sqlalchemy = attempt_import('sqlalchemy')
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    ### NULL location keys require `IS NULL` rather than an equality check.
    location_clause = (
        (pipes_tbl.c.location_key == pipe.location_key)
        if pipe.location_key is not None
        else pipes_tbl.c.location_key.is_(None)
    )
    query = (
        sqlalchemy.select(pipes_tbl.c.pipe_id)
        .where(pipes_tbl.c.connector_keys == pipe.connector_keys)
        .where(pipes_tbl.c.metric_key == pipe.metric_key)
        .where(location_clause)
    )
    pipe_id = self.value(query, debug=debug, silent=pipe.temporary)
    return int(pipe_id) if pipe_id is not None else pipe_id
Get a Pipe's ID from the pipes table.
def get_pipe_attributes(
        self,
        pipe: 'mrsm.Pipe',
        debug: bool = False,
    ) -> Dict[str, Any]:
    """
    Fetch a Pipe's row from the pipes table as an attributes dictionary.

    Returns an empty dictionary if the pipe is not registered
    or if the lookup fails.
    """
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    if pipe.get_id(debug=debug) is None:
        return {}

    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    try:
        select_query = sqlalchemy.select(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
        if debug:
            dprint(select_query)
        ### DuckDB rows are fetched via a dataframe read; other flavors
        ### map the first result row directly.
        if self.flavor == 'duckdb':
            attributes = self.read(select_query, debug=debug).to_dict(orient='records')[0]
        else:
            attributes = dict(self.exec(select_query, silent=True, debug=debug).first()._mapping)
    except Exception as e:
        import traceback
        traceback.print_exc()
        warn(e)
        print(pipe)
        return {}

    ### Flavors without native JSON store `parameters` as text,
    ### so decode it (twice, for doubly-encoded strings).
    if not isinstance(attributes.get('parameters', None), dict):
        try:
            import json
            parameters = json.loads(attributes['parameters'])
            if isinstance(parameters, str) and parameters[0] == '{':
                parameters = json.loads(parameters)
            attributes['parameters'] = parameters
        except Exception as e:
            attributes['parameters'] = {}

    return attributes
Get a Pipe's attributes dictionary.
def sync_pipe(
        self,
        pipe: mrsm.Pipe,
        df: Union[pd.DataFrame, str, Dict[Any, Any], None] = None,
        begin: Optional[datetime] = None,
        end: Optional[datetime] = None,
        chunksize: Optional[int] = -1,
        check_existing: bool = True,
        blocking: bool = True,
        debug: bool = False,
        _check_temporary_tables: bool = True,
        **kw: Any
    ) -> SuccessTuple:
    """
    Sync a pipe using a database connection.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The Meerschaum Pipe instance into which to sync the data.

    df: Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]
        An optional DataFrame or equivalent to sync into the pipe.
        Defaults to `None`.

    begin: Optional[datetime], default None
        Optionally specify the earliest datetime to search for data.
        Defaults to `None`.

    end: Optional[datetime], default None
        Optionally specify the latest datetime to search for data.
        Defaults to `None`.

    chunksize: Optional[int], default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.
        Defaults to `-1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe. Defaults to `True`.

    blocking: bool, default True
        If `True`, wait for sync to finish and return its result, otherwise asynchronously sync.
        Defaults to `True`.

    debug: bool, default False
        Verbosity toggle. Defaults to False.

    _check_temporary_tables: bool, default True
        If `True`, drop any stale temporary tables after the sync completes.

    kw: Any
        Catch-all for keyword arguments.

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    from meerschaum.utils.packages import import_pandas
    ### NOTE(review): the imported name `update_queries` is shadowed below by a
    ### local variable of the same name; the imported mapping is only read
    ### before that assignment (in the `upsert` check).
    from meerschaum.utils.sql import get_update_queries, sql_item_name, json_flavors, update_queries
    from meerschaum.utils.misc import generate_password
    from meerschaum.utils.dataframe import get_json_cols, get_numeric_cols
    from meerschaum.utils.dtypes import are_dtypes_equal
    from meerschaum import Pipe
    import time
    import copy
    pd = import_pandas()
    if df is None:
        msg = f"DataFrame is None. Cannot sync {pipe}."
        warn(msg)
        return False, msg

    start = time.perf_counter()

    ### Register the pipe on first sync (temporary pipes are never registered).
    if not pipe.temporary and not pipe.get_id(debug=debug):
        register_tuple = pipe.register(debug=debug)
        if not register_tuple[0]:
            return register_tuple

    ### df is the dataframe returned from the remote source
    ### via the connector
    if debug:
        dprint("Fetched data:\n" + str(df))

    if not isinstance(df, pd.DataFrame):
        df = pipe.enforce_dtypes(
            df,
            chunksize = chunksize,
            safe_copy = kw.get('safe_copy', False),
            debug = debug,
        )

    ### if table does not exist, create it with indices
    is_new = False
    add_cols_query = None  # NOTE(review): assigned but never read below.
    if not pipe.exists(debug=debug):
        check_existing = False
        is_new = True
    else:
        ### Check for new columns.
        add_cols_queries = self.get_add_columns_queries(pipe, df, debug=debug)
        if add_cols_queries:
            if not self.exec_queries(add_cols_queries, debug=debug):
                warn(f"Failed to add new columns to {pipe}.")

        alter_cols_queries = self.get_alter_columns_queries(pipe, df, debug=debug)
        if alter_cols_queries:
            if not self.exec_queries(alter_cols_queries, debug=debug):
                warn(f"Failed to alter columns for {pipe}.")
            else:
                _ = pipe.infer_dtypes(persist=True)

    ### NOTE: Oracle SQL < 23c (2023) and SQLite does not support booleans,
    ### so infer bools and persist them to `dtypes`.
    ### MSSQL supports `BIT` for booleans, but we coerce bools to int for MSSQL
    ### to avoid merge issues.
    if self.flavor in ('oracle', 'sqlite', 'mssql', 'mysql', 'mariadb'):
        pipe_dtypes = pipe.dtypes
        new_bool_cols = {
            col: 'bool[pyarrow]'
            for col, typ in df.dtypes.items()
            if col not in pipe_dtypes
            and are_dtypes_equal(str(typ), 'bool')
        }
        pipe_dtypes.update(new_bool_cols)
        pipe.dtypes = pipe_dtypes
        if not pipe.temporary:
            infer_bool_success, infer_bool_msg = pipe.edit(debug=debug)
            if not infer_bool_success:
                return infer_bool_success, infer_bool_msg

    ### Upserts skip the diff step and rely on flavor-specific merge queries.
    upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in update_queries
    if upsert:
        check_existing = False
    kw['safe_copy'] = kw.get('safe_copy', False)

    unseen_df, update_df, delta_df = (
        pipe.filter_existing(
            df,
            chunksize = chunksize,
            debug = debug,
            **kw
        ) if check_existing else (df, None, df)
    )
    if upsert:
        ### Upsert treats the entire frame as updates (no "unseen" inserts).
        unseen_df, update_df, delta_df = (df.head(0), df, df)

    if debug:
        dprint("Delta data:\n" + str(delta_df))
        dprint("Unseen data:\n" + str(unseen_df))
        if update_df is not None:
            dprint(("Update" if not upsert else "Upsert") + " data:\n" + str(update_df))

    ### `name` and `if_exists` are forwarded to `to_sql` explicitly below,
    ### so strip caller-provided values to avoid duplicate kwargs.
    if_exists = kw.get('if_exists', 'append')
    if 'if_exists' in kw:
        kw.pop('if_exists')
    if 'name' in kw:
        kw.pop('name')

    ### Account for first-time syncs of JSON columns.
    unseen_json_cols = get_json_cols(unseen_df)
    update_json_cols = get_json_cols(update_df) if update_df is not None else []
    json_cols = list(set(unseen_json_cols + update_json_cols))
    existing_json_cols = [col for col, typ in pipe.dtypes.items() if typ == 'json']
    new_json_cols = [col for col in json_cols if col not in existing_json_cols]
    if new_json_cols:
        pipe.dtypes.update({col: 'json' for col in json_cols})
        if not pipe.temporary:
            edit_success, edit_msg = pipe.edit(interactive=False, debug=debug)
            if not edit_success:
                warn(f"Unable to update JSON dtypes for {pipe}:\n{edit_msg}")

    ### Likewise persist first-seen NUMERIC columns.
    unseen_numeric_cols = get_numeric_cols(unseen_df)
    update_numeric_cols = get_numeric_cols(update_df) if update_df is not None else []
    numeric_cols = list(set(unseen_numeric_cols + update_numeric_cols))
    existing_numeric_cols = [col for col, typ in pipe.dtypes.items() if typ == 'numeric']
    new_numeric_cols = [col for col in numeric_cols if col not in existing_numeric_cols]
    if new_numeric_cols:
        pipe.dtypes.update({col: 'numeric' for col in numeric_cols})
        if not pipe.temporary:
            edit_success, edit_msg = pipe.edit(interactive=False, debug=debug)
            if not edit_success:
                warn(f"Unable to update NUMERIC dtypes for {pipe}:\n{edit_msg}")

    ### Insert new data into Pipe's table.
    unseen_kw = copy.deepcopy(kw)
    unseen_kw.update({
        'name': pipe.target,
        'if_exists': if_exists,
        'debug': debug,
        'as_dict': True,
        'chunksize': chunksize,
        'dtype': self.get_to_sql_dtype(pipe, unseen_df, update_dtypes=True),
        'schema': self.get_pipe_schema(pipe),
    })

    stats = self.to_sql(unseen_df, **unseen_kw)
    if is_new:
        if not self.create_indices(pipe, debug=debug):
            warn(f"Failed to create indices for {pipe}. Continuing...")

    ### Apply updates by staging them in a temporary pipe and merging.
    if update_df is not None and len(update_df) > 0:
        dt_col = pipe.columns.get('datetime', None)
        dt_typ = pipe.dtypes.get(dt_col, None)
        # NOTE(review): `dt_name`, `update_begin`, and `update_end` are computed
        # but not referenced later in this function.
        dt_name = sql_item_name(dt_col, self.flavor) if dt_col else None
        update_min = update_df[dt_col].min() if dt_col and dt_col in update_df.columns else None
        update_max = update_df[dt_col].max() if dt_col and dt_col in update_df.columns else None
        update_begin = update_min
        update_end = (
            update_max
            + (
                timedelta(minutes=1)
                if are_dtypes_equal(str(dt_typ), 'datetime')
                else 1
            )
        ) if dt_col else None

        transact_id = generate_password(3)
        temp_target = '-' + transact_id + '_' + pipe.target
        self._log_temporary_tables_creation(temp_target, create=(not pipe.temporary), debug=debug)
        temp_pipe = Pipe(
            pipe.connector_keys.replace(':', '_') + '_', pipe.metric_key, pipe.location_key,
            instance = pipe.instance_keys,
            columns = {
                ix_key: ix
                for ix_key, ix in pipe.columns.items()
                if ix and ix in update_df.columns
            },
            dtypes = pipe.dtypes,
            target = temp_target,
            temporary = True,
            parameters = {
                'schema': self.internal_schema,
                'hypertable': False,
            },
        )
        temp_pipe.sync(update_df, check_existing=False, debug=debug)
        existing_cols = pipe.get_columns_types(debug=debug)
        join_cols = [
            col
            for col_key, col in pipe.columns.items()
            if col and col in existing_cols
        ]
        update_queries = get_update_queries(
            pipe.target,
            temp_target,
            self,
            join_cols,
            upsert = upsert,
            schema = self.get_pipe_schema(pipe),
            patch_schema = self.internal_schema,
            datetime_col = pipe.columns.get('datetime', None),
            debug = debug,
        )
        update_success = all(
            self.exec_queries(update_queries, break_on_error=True, rollback=True, debug=debug)
        )
        self._log_temporary_tables_creation(
            temp_target,
            ready_to_drop = True,
            create = (not pipe.temporary),
            debug = debug,
        )
        if not update_success:
            warn(f"Failed to apply update to {pipe}.")

    stop = time.perf_counter()
    success = stats['success']
    if not success:
        return success, stats['msg']

    unseen_count = len(unseen_df.index) if unseen_df is not None else 0
    update_count = len(update_df.index) if update_df is not None else 0
    msg = (
        (
            f"Inserted {unseen_count}, "
            + f"updated {update_count} rows."
        )
        if not upsert
        else (
            f"Upserted {update_count} row"
            + ('s' if update_count != 1 else '')
            + "."
        )
    )
    if debug:
        msg = msg[:-1] + (
            f"\non table {sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))}\n"
            + f"in {round(stop - start, 2)} seconds."
        )

    if _check_temporary_tables:
        drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables(
            refresh=False, debug=debug
        )
        if not drop_stale_success:
            warn(drop_stale_msg)

    return success, msg
Sync a pipe using a database connection.
Parameters
- pipe (mrsm.Pipe): The Meerschaum Pipe instance into which to sync the data.
- df (Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]):
An optional DataFrame or equivalent to sync into the pipe.
Defaults to
None
. - begin (Optional[datetime], default None):
Optionally specify the earliest datetime to search for data.
Defaults to
None
. - end (Optional[datetime], default None):
Optionally specify the latest datetime to search for data.
Defaults to
None
. - chunksize (Optional[int], default -1):
Specify the number of rows to sync per chunk.
If `-1`, resort to system configuration (default is `900`).
A `chunksize` of `None` will sync all rows in one transaction. Defaults to `-1`.
- check_existing (bool, default True):
If
True
, pull and diff with existing data from the pipe. Defaults toTrue
. - blocking (bool, default True):
If `True`, wait for sync to finish and return its result; otherwise asynchronously sync. Defaults to `True`.
- debug (bool, default False): Verbosity toggle. Defaults to False.
- kw (Any): Catch-all for keyword arguments.
Returns
- A
SuccessTuple
of success (bool
) and message (str
).
def sync_pipe_inplace(
        self,
        pipe: 'mrsm.Pipe',
        params: Optional[Dict[str, Any]] = None,
        begin: Optional[datetime] = None,
        end: Optional[datetime] = None,
        chunksize: Optional[int] = -1,
        check_existing: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> 'SuccessTuple':
    """
    If a pipe's connector is the same as its instance connector,
    it's more efficient to sync the pipe in-place rather than reading data into Pandas.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose connector is the same as its instance.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    begin: Optional[datetime], default None
        Optionally specify the earliest datetime to search for data.
        Defaults to `None`.

    end: Optional[datetime], default None
        Optionally specify the latest datetime to search for data.
        Defaults to `None`.

    chunksize: Optional[int], default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.
        Defaults to `-1`.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A SuccessTuple.
    """
    ### DuckDB cannot sync in-place; fall back to a regular (non-inplace) sync.
    if self.flavor == 'duckdb':
        return pipe.sync(
            params = params,
            begin = begin,
            end = end,
            chunksize = chunksize,
            check_existing = check_existing,
            debug = debug,
            _inplace = False,
            **kw
        )
    from meerschaum.utils.sql import (
        sql_item_name,
        get_sqlalchemy_table,
        get_update_queries,
        get_null_replacement,
        NO_CTE_FLAVORS,
        NO_SELECT_INTO_FLAVORS,
        format_cte_subquery,
        get_create_table_query,
        get_table_cols_types,
        truncate_item_name,
        session_execute,
        table_exists,
        update_queries,
    )
    from meerschaum.utils.dtypes.sql import (
        get_pd_type_from_db_type,
    )
    from meerschaum.utils.misc import generate_password
    from meerschaum.utils.debug import dprint

    transact_id = generate_password(3)
    def get_temp_table_name(label: str) -> str:
        """Build a transaction-scoped temporary table name for this pipe."""
        return '-' + transact_id + '_' + label + '_' + pipe.target

    internal_schema = self.internal_schema
    temp_table_roots = ['backtrack', 'new', 'delta', 'joined', 'unseen', 'update']
    temp_tables = {
        table_root: get_temp_table_name(table_root)
        for table_root in temp_table_roots
    }
    temp_table_names = {
        table_root: sql_item_name(
            table_name_raw,
            self.flavor,
            internal_schema,
        )
        for table_root, table_name_raw in temp_tables.items()
    }
    metadef = self.get_pipe_metadef(
        pipe,
        params = params,
        begin = begin,
        end = end,
        check_existing = check_existing,
        debug = debug,
    )
    pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
    upsert = pipe.parameters.get('upsert', False) and f'{self.flavor}-upsert' in update_queries
    database = getattr(self, 'database', self.parse_uri(self.URI).get('database', None))

    def clean_up_temp_tables(ready_to_drop: bool = False):
        """Log this transaction's temp tables and drop any stale ones."""
        log_success, log_msg = self._log_temporary_tables_creation(
            [
                table
                for table in temp_tables.values()
            ] if not upsert else [temp_tables['update']],
            ready_to_drop = ready_to_drop,
            create = (not pipe.temporary),
            debug = debug,
        )
        if not log_success:
            warn(log_msg)
        drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables(
            refresh = False,
            debug = debug,
        )
        if not drop_stale_success:
            warn(drop_stale_msg)
        return drop_stale_success, drop_stale_msg

    sqlalchemy, sqlalchemy_orm = mrsm.attempt_import('sqlalchemy', 'sqlalchemy.orm')

    ### First sync: create the target table directly from the definition query.
    if not pipe.exists(debug=debug):
        create_pipe_query = get_create_table_query(
            metadef,
            pipe.target,
            self.flavor,
            schema = self.get_pipe_schema(pipe),
        )
        result = self.exec(create_pipe_query, debug=debug)
        if result is None:
            _ = clean_up_temp_tables()
            return False, f"Could not insert new data into {pipe} from its SQL query definition."

        if not self.create_indices(pipe, debug=debug):
            warn(f"Failed to create indices for {pipe}. Continuing...")

        rowcount = pipe.get_rowcount(debug=debug)
        _ = clean_up_temp_tables()
        return True, f"Inserted {rowcount}, updated 0 rows."

    session = sqlalchemy_orm.Session(self.engine)
    connectable = session if self.flavor != 'duckdb' else self

    ### Materialize the definition query into the 'new' (or 'update' for upserts) table.
    create_new_query = get_create_table_query(
        metadef,
        temp_tables[('new') if not upsert else 'update'],
        self.flavor,
        schema = internal_schema,
    )
    (create_new_success, create_new_msg), create_new_results = session_execute(
        session,
        create_new_query,
        with_results = True,
        debug = debug,
    )
    if not create_new_success:
        _ = clean_up_temp_tables()
        return create_new_success, create_new_msg
    new_count = create_new_results[0].rowcount if create_new_results else 0

    new_cols_types = get_table_cols_types(
        temp_tables[('new' if not upsert else 'update')],
        connectable = connectable,
        flavor = self.flavor,
        schema = internal_schema,
        database = database,
        debug = debug,
    )
    if not new_cols_types:
        return False, f"Failed to get new columns for {pipe}."

    new_cols = {
        str(col_name): get_pd_type_from_db_type(str(col_type))
        for col_name, col_type in new_cols_types.items()
    }
    new_cols_str = ', '.join([
        sql_item_name(col, self.flavor)
        for col in new_cols
    ])

    add_cols_queries = self.get_add_columns_queries(pipe, new_cols, debug=debug)
    if add_cols_queries:
        self.exec_queries(add_cols_queries, debug=debug)

    alter_cols_queries = self.get_alter_columns_queries(pipe, new_cols, debug=debug)
    if alter_cols_queries:
        self.exec_queries(alter_cols_queries, debug=debug)

    ### Without `check_existing`, everything in 'new' is inserted directly.
    insert_queries = [
        (
            f"INSERT INTO {pipe_name} ({new_cols_str})\n"
            + f"SELECT {new_cols_str}\nFROM {temp_table_names['new']}"
        )
    ] if not check_existing and not upsert else []

    new_queries = insert_queries
    new_success, new_msg = (
        session_execute(session, new_queries, debug=debug)
        if new_queries
        else (True, "Success")
    )
    if not new_success:
        _ = clean_up_temp_tables()
        return new_success, new_msg

    if not check_existing:
        session.commit()
        _ = clean_up_temp_tables()
        return True, f"Inserted {new_count}, updated 0 rows."

    ### Build the backtrack window of existing rows to diff against.
    backtrack_def = self.get_pipe_data_query(
        pipe,
        begin = begin,
        end = end,
        begin_add_minutes = 0,
        end_add_minutes = 1,
        params = params,
        debug = debug,
        order = None,
    )

    # NOTE(review): `select_backtrack_query` is not referenced below;
    # `backtrack_def` is passed to `get_create_table_query` directly.
    select_backtrack_query = format_cte_subquery(
        backtrack_def,
        self.flavor,
        sub_name = 'backtrack_def',
    )
    create_backtrack_query = get_create_table_query(
        backtrack_def,
        temp_tables['backtrack'],
        self.flavor,
        schema = internal_schema,
    )
    ### BUGFIX: the fallback must be the nested tuple `((True, "Success"), None)`.
    ### Previously the trailing `, None` bound outside the conditional, so on the
    ### non-upsert path the success flag received the whole `(success, msg)` tuple
    ### (always truthy, masking failures) and the results landed in the wrong slot.
    (create_backtrack_success, create_backtrack_msg), create_backtrack_results = session_execute(
        session,
        create_backtrack_query,
        with_results = True,
        debug = debug,
    ) if not upsert else ((True, "Success"), None)

    if not create_backtrack_success:
        _ = clean_up_temp_tables()
        return create_backtrack_success, create_backtrack_msg
    # (Typo fixed from `bactrack_count`; currently unused but kept for debugging.)
    backtrack_count = create_backtrack_results[0].rowcount if create_backtrack_results else 0

    backtrack_cols_types = get_table_cols_types(
        temp_tables['backtrack'],
        connectable = connectable,
        flavor = self.flavor,
        schema = internal_schema,
        database = database,
        debug = debug,
    ) if not upsert else new_cols_types

    common_cols = [col for col in new_cols if col in backtrack_cols_types]
    ### Index columns (except 'value') present on both sides form the join keys.
    on_cols = {
        col: new_cols.get(col, 'object')
        for col_key, col in pipe.columns.items()
        if (
            col
            and
            col_key != 'value'
            and col in backtrack_cols_types
            and col in new_cols
        )
    }

    ### COALESCE NULLs to sentinel values so NULL == NULL during the join.
    null_replace_new_cols_str = (
        ', '.join([
            f"COALESCE({temp_table_names['new']}.{sql_item_name(col, self.flavor, None)}, "
            + f"{get_null_replacement(typ, self.flavor)}) AS "
            + sql_item_name(col, self.flavor, None)
            for col, typ in new_cols.items()
        ])
    )

    ### Delta = rows in 'new' with no match in 'backtrack'.
    select_delta_query = (
        f"SELECT\n"
        + null_replace_new_cols_str + "\n"
        + f"\nFROM {temp_table_names['new']}\n"
        + f"LEFT OUTER JOIN {temp_table_names['backtrack']}\nON\n"
        + '\nAND\n'.join([
            (
                f"COALESCE({temp_table_names['new']}."
                + sql_item_name(c, self.flavor, None)
                + ", "
                + get_null_replacement(new_cols[c], self.flavor)
                + ") "
                + ' = '
                + f"COALESCE({temp_table_names['backtrack']}."
                + sql_item_name(c, self.flavor, None)
                + ", "
                + get_null_replacement(backtrack_cols_types[c], self.flavor)
                + ") "
            ) for c in common_cols
        ])
        + "\nWHERE\n"
        + '\nAND\n'.join([
            (
                f"{temp_table_names['backtrack']}." + sql_item_name(c, self.flavor, None) + ' IS NULL'
            ) for c in common_cols
        ])
    )
    create_delta_query = get_create_table_query(
        select_delta_query,
        temp_tables['delta'],
        self.flavor,
        schema = internal_schema,
    )
    create_delta_success, create_delta_msg = session_execute(
        session,
        create_delta_query,
        debug = debug,
    ) if not upsert else (True, "Success")
    if not create_delta_success:
        _ = clean_up_temp_tables()
        return create_delta_success, create_delta_msg

    delta_cols_types = get_table_cols_types(
        temp_tables['delta'],
        connectable = connectable,
        flavor = self.flavor,
        schema = internal_schema,
        database = database,
        debug = debug,
    ) if not upsert else new_cols_types

    ### This is a weird bug on SQLite.
    ### Sometimes the backtrack dtypes are all empty strings.
    if not all(delta_cols_types.values()):
        delta_cols_types = new_cols_types

    delta_cols = {
        col: get_pd_type_from_db_type(typ)
        for col, typ in delta_cols_types.items()
    }
    delta_cols_str = ', '.join([
        sql_item_name(col, self.flavor)
        for col in delta_cols
    ])

    ### Join delta rows back to backtrack rows to split inserts from updates.
    select_joined_query = (
        "SELECT "
        + (', '.join([
            (
                f"{temp_table_names['delta']}." + sql_item_name(c, self.flavor, None)
                + " AS " + sql_item_name(c + '_delta', self.flavor, None)
            ) for c in delta_cols
        ]))
        + ", "
        + (', '.join([
            (
                f"{temp_table_names['backtrack']}." + sql_item_name(c, self.flavor, None)
                + " AS " + sql_item_name(c + '_backtrack', self.flavor, None)
            ) for c in backtrack_cols_types
        ]))
        + f"\nFROM {temp_table_names['delta']}\n"
        + f"LEFT OUTER JOIN {temp_table_names['backtrack']}\nON\n"
        + '\nAND\n'.join([
            (
                f"COALESCE({temp_table_names['delta']}." + sql_item_name(c, self.flavor, None)
                + ", " + get_null_replacement(typ, self.flavor) + ")"
                + ' = '
                + f"COALESCE({temp_table_names['backtrack']}." + sql_item_name(c, self.flavor, None)
                + ", " + get_null_replacement(typ, self.flavor) + ")"
            ) for c, typ in on_cols.items()
        ])
    )

    create_joined_query = get_create_table_query(
        select_joined_query,
        temp_tables['joined'],
        self.flavor,
        schema = internal_schema,
    )
    create_joined_success, create_joined_msg = session_execute(
        session,
        create_joined_query,
        debug = debug,
    ) if on_cols and not upsert else (True, "Success")
    if not create_joined_success:
        _ = clean_up_temp_tables()
        return create_joined_success, create_joined_msg

    ### Unseen = joined rows with no backtrack match (sentinels mapped back to NULL).
    select_unseen_query = (
        "SELECT "
        + (', '.join([
            (
                "CASE\n WHEN " + sql_item_name(c + '_delta', self.flavor, None)
                + " != " + get_null_replacement(typ, self.flavor)
                + " THEN " + sql_item_name(c + '_delta', self.flavor, None)
                + "\n ELSE NULL\nEND "
                + " AS " + sql_item_name(c, self.flavor, None)
            ) for c, typ in delta_cols.items()
        ]))
        + f"\nFROM {temp_table_names['joined']}\n"
        + f"WHERE "
        + '\nAND\n'.join([
            (
                sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NULL'
            ) for c in delta_cols
        ])
    )
    create_unseen_query = get_create_table_query(
        select_unseen_query,
        temp_tables['unseen'],
        self.flavor,
        internal_schema,
    )
    ### BUGFIX: fallback nested as `((True, "Success"), None)` (see note above).
    (create_unseen_success, create_unseen_msg), create_unseen_results = session_execute(
        session,
        create_unseen_query,
        with_results = True,
        debug = debug
    ) if not upsert else ((True, "Success"), None)
    if not create_unseen_success:
        _ = clean_up_temp_tables()
        return create_unseen_success, create_unseen_msg

    ### Update = joined rows which DO have a backtrack match.
    select_update_query = (
        "SELECT "
        + (', '.join([
            (
                "CASE\n WHEN " + sql_item_name(c + '_delta', self.flavor, None)
                + " != " + get_null_replacement(typ, self.flavor)
                + " THEN " + sql_item_name(c + '_delta', self.flavor, None)
                + "\n ELSE NULL\nEND "
                + " AS " + sql_item_name(c, self.flavor, None)
            ) for c, typ in delta_cols.items()
        ]))
        + f"\nFROM {temp_table_names['joined']}\n"
        + f"WHERE "
        + '\nOR\n'.join([
            (
                sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NOT NULL'
            ) for c in delta_cols
        ])
    )

    create_update_query = get_create_table_query(
        select_update_query,
        temp_tables['update'],
        self.flavor,
        internal_schema,
    )
    (create_update_success, create_update_msg), create_update_results = session_execute(
        session,
        create_update_query,
        with_results = True,
        debug = debug,
    ) if on_cols and not upsert else ((True, "Success"), [])
    apply_update_queries = (
        get_update_queries(
            pipe.target,
            temp_tables['update'],
            session,
            on_cols,
            upsert = upsert,
            schema = self.get_pipe_schema(pipe),
            patch_schema = internal_schema,
            datetime_col = pipe.columns.get('datetime', None),
            flavor = self.flavor,
            debug = debug
        )
        if on_cols else []
    )

    apply_unseen_queries = [
        (
            f"INSERT INTO {pipe_name} ({delta_cols_str})\n"
            + f"SELECT {delta_cols_str}\nFROM "
            + (
                temp_table_names['unseen']
                if on_cols
                else temp_table_names['delta']
            )
        ),
    ]

    ### BUGFIX: fallback nested as `((True, "Success"), None)` (see note above).
    (apply_unseen_success, apply_unseen_msg), apply_unseen_results = session_execute(
        session,
        apply_unseen_queries,
        with_results = True,
        debug = debug,
    ) if not upsert else ((True, "Success"), None)
    if not apply_unseen_success:
        _ = clean_up_temp_tables()
        return apply_unseen_success, apply_unseen_msg
    unseen_count = apply_unseen_results[0].rowcount if apply_unseen_results else 0

    (apply_update_success, apply_update_msg), apply_update_results = session_execute(
        session,
        apply_update_queries,
        with_results = True,
        debug = debug,
    )
    if not apply_update_success:
        _ = clean_up_temp_tables()
        return apply_update_success, apply_update_msg
    update_count = apply_update_results[0].rowcount if apply_update_results else 0

    session.commit()

    msg = (
        f"Inserted {unseen_count}, updated {update_count} rows."
        if not upsert
        else f"Upserted {update_count} row" + ('s' if update_count != 1 else '') + "."
    )
    _ = clean_up_temp_tables(ready_to_drop=True)

    return True, msg
If a pipe's connector is the same as its instance connector, it's more efficient to sync the pipe in-place rather than reading data into Pandas.
Parameters
- pipe (mrsm.Pipe): The pipe whose connector is the same as its instance.
- params (Optional[Dict[str, Any]], default None):
  Optional params dictionary to build the `WHERE` clause.
  See `meerschaum.utils.sql.build_where`.
- begin (Optional[datetime], default None):
  Optionally specify the earliest datetime to search for data.
  Defaults to `None`.
- end (Optional[datetime], default None):
  Optionally specify the latest datetime to search for data.
  Defaults to `None`.
- chunksize (Optional[int], default -1):
  Specify the number of rows to sync per chunk.
  If `-1`, resort to system configuration (default is `900`).
  A `chunksize` of `None` will sync all rows in one transaction.
  Defaults to `-1`.
- check_existing (bool, default True):
  If `True`, pull and diff with existing data from the pipe.
- debug (bool, default False): Verbosity toggle.
Returns
- A SuccessTuple.
def get_sync_time(
        self,
        pipe: 'mrsm.Pipe',
        params: Optional[Dict[str, Any]] = None,
        newest: bool = True,
        debug: bool = False,
    ) -> Union[datetime, int, None]:
    """Get a Pipe's most recent datetime value.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to get the sync time for.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.
        Keys not present in the pipe's existing columns are silently dropped.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (ASC instead of DESC).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `datetime` object (or `int` if using an integer axis) if the pipe exists,
    otherwise `None`.
    """
    from meerschaum.utils.sql import sql_item_name, build_where
    table = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))

    dt_col = pipe.columns.get('datetime', None)
    # NOTE(review): `dt_type` is computed but never read below — candidate for removal.
    dt_type = pipe.dtypes.get(dt_col, 'datetime64[ns]')
    if not dt_col:
        ### No explicit datetime column: fall back to the pipe's best guess.
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor, None) if _dt else None
        is_guess = True
    else:
        _dt = dt_col
        dt = sql_item_name(_dt, self.flavor, None)
        is_guess = False

    ### Without any usable datetime axis, there is no sync time.
    if _dt is None:
        return None

    ASC_or_DESC = "DESC" if newest else "ASC"
    existing_cols = pipe.get_columns_types(debug=debug)
    valid_params = {}
    if params is not None:
        ### Only keep params whose keys are actual columns.
        valid_params = {k: v for k, v in params.items() if k in existing_cols}

    ### If no bounds are provided for the datetime column,
    ### add IS NOT NULL to the WHERE clause.
    if _dt not in valid_params:
        valid_params[_dt] = '_None'
    where = "" if not valid_params else build_where(valid_params, self)
    ### Default query (LIMIT syntax); mssql and oracle need their own forms.
    q = f"SELECT {dt}\nFROM {table}{where}\nORDER BY {dt} {ASC_or_DESC}\nLIMIT 1"
    if self.flavor == 'mssql':
        q = f"SELECT TOP 1 {dt}\nFROM {table}{where}\nORDER BY {dt} {ASC_or_DESC}"
    elif self.flavor == 'oracle':
        q = (
            "SELECT * FROM (\n"
            + f" SELECT {dt}\nFROM {table}{where}\n ORDER BY {dt} {ASC_or_DESC}\n"
            + ") WHERE ROWNUM = 1"
        )

    try:
        db_time = self.value(q, silent=True, debug=debug)

        ### No datetime could be found.
        if db_time is None:
            return None
        ### sqlite returns str.
        if isinstance(db_time, str):
            from meerschaum.utils.packages import attempt_import
            dateutil_parser = attempt_import('dateutil.parser')
            st = dateutil_parser.parse(db_time)
        ### Do nothing if a datetime object is returned.
        elif isinstance(db_time, datetime):
            ### Unwrap pandas Timestamps which subclass datetime.
            if hasattr(db_time, 'to_pydatetime'):
                st = db_time.to_pydatetime()
            else:
                st = db_time
        ### Sometimes the datetime is actually a date.
        elif isinstance(db_time, date):
            st = datetime.combine(db_time, datetime.min.time())
        ### Adding support for an integer datetime axis.
        elif 'int' in str(type(db_time)).lower():
            st = int(db_time)
        ### Convert pandas timestamp to Python datetime.
        else:
            st = db_time.to_pydatetime()

        sync_time = st

    except Exception as e:
        ### Best-effort: surface the failure as a warning and return None.
        sync_time = None
        warn(str(e))

    return sync_time
Get a Pipe's most recent datetime value.
Parameters
- pipe (mrsm.Pipe): The pipe to get the sync time for.
- params (Optional[Dict[str, Any]], default None):
  Optional params dictionary to build the `WHERE` clause.
  See `meerschaum.utils.sql.build_where`.
- newest (bool, default True):
  If `True`, get the most recent datetime (honoring `params`).
  If `False`, get the oldest datetime (ASC instead of DESC).

Returns
- A `datetime` object (or `int` if using an integer axis) if the pipe exists, otherwise `None`.
2031def pipe_exists( 2032 self, 2033 pipe: mrsm.Pipe, 2034 debug: bool = False 2035 ) -> bool: 2036 """ 2037 Check that a Pipe's table exists. 2038 2039 Parameters 2040 ---------- 2041 pipe: mrsm.Pipe: 2042 The pipe to check. 2043 2044 debug: bool, default False 2045 Verbosity toggle. 2046 2047 Returns 2048 ------- 2049 A `bool` corresponding to whether a pipe's table exists. 2050 2051 """ 2052 from meerschaum.utils.sql import table_exists 2053 exists = table_exists( 2054 pipe.target, 2055 self, 2056 schema = self.get_pipe_schema(pipe), 2057 debug = debug, 2058 ) 2059 if debug: 2060 from meerschaum.utils.debug import dprint 2061 dprint(f"{pipe} " + ('exists.' if exists else 'does not exist.')) 2062 return exists
Check that a Pipe's table exists.
Parameters
- pipe (mrsm.Pipe): The pipe to check.
- debug (bool, default False): Verbosity toggle.
Returns
- A `bool` corresponding to whether a pipe's table exists.
def get_pipe_rowcount(
        self,
        pipe: mrsm.Pipe,
        begin: Union[datetime, int, None] = None,
        end: Union[datetime, int, None] = None,
        params: Optional[Dict[str, Any]] = None,
        remote: bool = False,
        debug: bool = False
    ) -> Union[int, None]:
    """
    Get the rowcount for a pipe in accordance with given parameters.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to query with.

    begin: Union[datetime, int, None], default None
        The begin datetime value (inclusive).

    end: Union[datetime, int, None], default None
        The end datetime value (exclusive).

    params: Optional[Dict[str, Any]], default None
        See `meerschaum.utils.sql.build_where`.

    remote: bool, default False
        If `True`, get the rowcount for the remote table
        (requires `pipe.parameters['fetch']['definition']`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` for the number of rows if the `pipe` exists, otherwise `None`.
    """
    ### NOTE: Removed the unused `NO_CTE_FLAVORS` import and the dead
    ### `_datetime_name` local from the original implementation.
    from meerschaum.utils.sql import dateadd_str, sql_item_name
    from meerschaum.connectors.sql._fetch import get_pipe_query
    if remote:
        ### A remote rowcount needs the fetch definition to build the source query.
        msg = f"'fetch:definition' must be an attribute of {pipe} to get a remote rowcount."
        if 'fetch' not in pipe.parameters:
            error(msg)
            return None
        if 'definition' not in pipe.parameters['fetch']:
            error(msg)
            return None

    _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))

    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor, None) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt = sql_item_name(_dt, self.flavor, None)
        is_guess = False

    if begin is not None or end is not None:
        ### Warn when datetime bounds were given but the axis had to be guessed.
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n    Ignoring begin and end...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )

    ### Only select the columns the count actually depends on
    ### (datetime axis + any param keys); fall back to '*'.
    _cols_names = [
        sql_item_name(
            col,
            (
                pipe.instance_connector.flavor
                if not remote
                else pipe.connector.flavor
            ),
            None,
        )
        for col in set(
            (
                [_dt]
                if _dt
                else []
            )
            + (
                []
                if params is None
                else list(params.keys())
            )
        )
    ]
    if not _cols_names:
        _cols_names = ['*']

    src = (
        f"SELECT {', '.join(_cols_names)} FROM {_pipe_name}"
        if not remote
        else get_pipe_query(pipe)
    )
    ### MySQL/MariaDB lack reliable CTE support here; use a subquery instead.
    query = (
        f"""
        WITH src AS ({src})
        SELECT COUNT(*)
        FROM src
        """
    ) if self.flavor not in ('mysql', 'mariadb') else (
        f"""
        SELECT COUNT(*)
        FROM ({src}) AS src
        """
    )
    if begin is not None or end is not None:
        query += "WHERE"
    if begin is not None:
        query += f"""
        {dt} >= {dateadd_str(self.flavor, datepart='minute', number=0, begin=begin)}
        """
    if end is not None and begin is not None:
        query += "AND"
    if end is not None:
        query += f"""
        {dt} < {dateadd_str(self.flavor, datepart='minute', number=0, begin=end)}
        """
    if params is not None:
        from meerschaum.utils.sql import build_where
        existing_cols = pipe.get_columns_types(debug=debug)
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
        if valid_params:
            ### build_where() emits a leading WHERE; swap for AND if one exists already.
            query += build_where(valid_params, self).replace('WHERE', (
                'AND' if (begin is not None or end is not None)
                    else 'WHERE'
                )
            )

    result = self.value(query, debug=debug, silent=True)
    try:
        return int(result)
    except Exception:
        return None
Get the rowcount for a pipe in accordance with given parameters.
Parameters
- pipe (mrsm.Pipe): The pipe to query with.
- begin (Union[datetime, int, None], default None): The begin datetime value.
- end (Union[datetime, int, None], default None): The end datetime value.
- params (Optional[Dict[str, Any]], default None):
  See `meerschaum.utils.sql.build_where`.
- remote (bool, default False):
  If `True`, get the rowcount for the remote table.
- debug (bool, default False): Verbosity toggle.

Returns
- An `int` for the number of rows if the `pipe` exists, otherwise `None`.
2223def drop_pipe( 2224 self, 2225 pipe: mrsm.Pipe, 2226 debug: bool = False, 2227 **kw 2228 ) -> SuccessTuple: 2229 """ 2230 Drop a pipe's tables but maintain its registration. 2231 2232 Parameters 2233 ---------- 2234 pipe: mrsm.Pipe 2235 The pipe to drop. 2236 2237 """ 2238 from meerschaum.utils.sql import table_exists, sql_item_name 2239 success = True 2240 target = pipe.target 2241 target_name = ( 2242 sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 2243 ) 2244 if table_exists(target, self, debug=debug): 2245 success = self.exec(f"DROP TABLE {target_name}", silent=True, debug=debug) is not None 2246 2247 msg = "Success" if success else f"Failed to drop {pipe}." 2248 return success, msg
Drop a pipe's tables but maintain its registration.
Parameters
- pipe (mrsm.Pipe): The pipe to drop.
def clear_pipe(
        self,
        pipe: mrsm.Pipe,
        begin: Union[datetime, int, None] = None,
        end: Union[datetime, int, None] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kw
    ) -> SuccessTuple:
    """
    Delete a pipe's data within a bounded or unbounded interval without dropping the table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to clear.

    begin: Union[datetime, int, None], default None
        Beginning datetime. Inclusive.

    end: Union[datetime, int, None], default None
        Ending datetime. Exclusive.

    params: Optional[Dict[str, Any]], default None
        See `meerschaum.utils.sql.build_where`.
        Keys not present in the pipe's existing columns are silently dropped.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating whether the DELETE statement executed.
    """
    ### Nothing to delete if the table was never created.
    if not pipe.exists(debug=debug):
        return True, f"{pipe} does not exist, so nothing was cleared."

    from meerschaum.utils.sql import sql_item_name, build_where, dateadd_str
    pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))

    if not pipe.columns.get('datetime', None):
        ### No configured datetime column: fall back to the pipe's best guess.
        _dt = pipe.guess_datetime()
        dt_name = sql_item_name(_dt, self.flavor, None) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt_name = sql_item_name(_dt, self.flavor, None)
        is_guess = False

    if begin is not None or end is not None:
        ### Warn when datetime bounds were given but the axis had to be guessed.
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n    Ignoring datetime bounds...",
                    stack = False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f"    Using column \"{_dt}\" for datetime bounds...",
                    stack = False,
                )

    valid_params = {}
    if params is not None:
        ### Only keep params whose keys are actual columns.
        existing_cols = pipe.get_columns_types(debug=debug)
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
    ### `WHERE 1 = 1` lets every filter below be appended uniformly with AND.
    clear_query = (
        f"DELETE FROM {pipe_name}\nWHERE 1 = 1\n"
        + (' AND ' + build_where(valid_params, self, with_where=False) if valid_params else '')
        + (
            f' AND {dt_name} >= ' + dateadd_str(self.flavor, 'day', 0, begin)
            if begin is not None else ''
        ) + (
            f' AND {dt_name} < ' + dateadd_str(self.flavor, 'day', 0, end)
            if end is not None else ''
        )
    )
    success = self.exec(clear_query, silent=True, debug=debug) is not None
    msg = "Success" if success else f"Failed to clear {pipe}."
    return success, msg
Delete a pipe's data within a bounded or unbounded interval without dropping the table.
Parameters
- pipe (mrsm.Pipe): The pipe to clear.
- begin (Union[datetime, int, None], default None): Beginning datetime. Inclusive.
- end (Union[datetime, int, None], default None): Ending datetime. Exclusive.
- params (Optional[Dict[str, Any]], default None):
  See `meerschaum.utils.sql.build_where`.
def deduplicate_pipe(
        self,
        pipe: mrsm.Pipe,
        begin: Union[datetime, int, None] = None,
        end: Union[datetime, int, None] = None,
        params: Optional[Dict[str, Any]] = None,
        debug: bool = False,
        **kwargs: Any
    ) -> SuccessTuple:
    """
    Delete duplicate values within a pipe's table.

    Strategy: build a deduplicated copy of the table (keeping the first row per
    index combination via ROW_NUMBER(), or a user-variable emulation on MySQL < 8),
    then swap it in place of the original via renames.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose table to deduplicate.

    begin: Union[datetime, int, None], default None
        If provided, only deduplicate values greater than or equal to this value.
        NOTE(review): `begin` is currently not applied in the body — the whole
        table is deduplicated regardless. Confirm intended behavior.

    end: Union[datetime, int, None], default None
        If provided, only deduplicate values less than this value.
        NOTE(review): `end` is currently not applied in the body either.

    params: Optional[Dict[str, Any]], default None
        If provided, further limit deduplication to values which match this query dictionary.
        NOTE(review): `params` is currently not applied in the body either.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    from meerschaum.utils.sql import (
        sql_item_name,
        NO_CTE_FLAVORS,
        get_rename_table_queries,
        NO_SELECT_INTO_FLAVORS,
        get_create_table_query,
        format_cte_subquery,
        get_null_replacement,
    )
    from meerschaum.utils.misc import generate_password, flatten_list

    pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))

    if not pipe.exists(debug=debug):
        return False, f"Table {pipe_table_name} does not exist."

    ### TODO: Handle deleting duplicates without a datetime axis.
    dt_col = pipe.columns.get('datetime', None)
    dt_col_name = sql_item_name(dt_col, self.flavor, None)
    cols_types = pipe.get_columns_types(debug=debug)
    existing_cols = pipe.get_columns_types(debug=debug)

    ### Record the rowcount before deduplication for the final message.
    get_rowcount_query = f"SELECT COUNT(*) FROM {pipe_table_name}"
    old_rowcount = self.value(get_rowcount_query, debug=debug)
    if old_rowcount is None:
        return False, f"Failed to get rowcount for table {pipe_table_name}."

    ### Non-datetime indices that in fact exist.
    indices = [
        col
        for key, col in pipe.columns.items()
        if col and col != dt_col and col in cols_types
    ]
    indices_names = [sql_item_name(index_col, self.flavor, None) for index_col in indices]
    existing_cols_names = [sql_item_name(col, self.flavor, None) for col in existing_cols]
    duplicates_cte_name = sql_item_name('dups', self.flavor, None)
    duplicate_row_number_name = sql_item_name('dup_row_num', self.flavor, None)
    previous_row_number_name = sql_item_name('prev_row_num', self.flavor, None)

    ### Partition by datetime + index columns; order newest-first so the
    ### most recent row per combination is kept (row number 1).
    index_list_str = (
        sql_item_name(dt_col, self.flavor, None)
        if dt_col
        else ''
    )
    index_list_str_ordered = (
        (
            sql_item_name(dt_col, self.flavor, None) + " DESC"
        )
        if dt_col
        else ''
    )
    if indices:
        index_list_str += ', ' + ', '.join(indices_names)
        index_list_str_ordered += ', ' + ', '.join(indices_names)
    if index_list_str.startswith(','):
        index_list_str = index_list_str.lstrip(',').lstrip()
    if index_list_str_ordered.startswith(','):
        index_list_str_ordered = index_list_str_ordered.lstrip(',').lstrip()

    cols_list_str = ', '.join(existing_cols_names)

    try:
        ### NOTE: MySQL 5 and below does not support window functions (ROW_NUMBER()).
        is_old_mysql = (
            self.flavor in ('mysql', 'mariadb')
            and
            int(self.db_version.split('.')[0]) < 8
        )
    except Exception as e:
        is_old_mysql = False

    src_query = f"""
        SELECT
            {cols_list_str},
            ROW_NUMBER() OVER (
                PARTITION BY
                {index_list_str}
                ORDER BY {index_list_str_ordered}
            ) AS {duplicate_row_number_name}
        FROM {pipe_table_name}
    """
    duplicates_cte_subquery = format_cte_subquery(
        src_query,
        self.flavor,
        sub_name = 'src',
        cols_to_select = cols_list_str,
    ) + f"""
        WHERE {duplicate_row_number_name} = 1
    """
    ### MySQL < 8 fallback: emulate ROW_NUMBER() with user variables.
    old_mysql_query = (
        f"""
        SELECT
            {index_list_str}
        FROM (
            SELECT
                {index_list_str},
                IF(
                    @{previous_row_number_name} <> {index_list_str.replace(', ', ' + ')},
                    @{duplicate_row_number_name} := 0,
                    @{duplicate_row_number_name}
                ),
                @{previous_row_number_name} := {index_list_str.replace(', ', ' + ')},
                @{duplicate_row_number_name} := @{duplicate_row_number_name} + 1 AS """
        + f"""{duplicate_row_number_name}
            FROM
                {pipe_table_name},
                (
                    SELECT @{duplicate_row_number_name} := 0
                ) AS {duplicate_row_number_name},
                (
                    SELECT @{previous_row_number_name} := '{get_null_replacement('str', 'mysql')}'
                ) AS {previous_row_number_name}
            ORDER BY {index_list_str_ordered}
        ) AS t
        WHERE {duplicate_row_number_name} = 1
        """
    )
    if is_old_mysql:
        duplicates_cte_subquery = old_mysql_query

    ### Random session id keeps concurrent deduplications from colliding.
    session_id = generate_password(3)

    dedup_table = '-' + session_id + f'_dedup_{pipe.target}'
    temp_old_table = '-' + session_id + f"_old_{pipe.target}"

    dedup_table_name = sql_item_name(dedup_table, self.flavor, self.get_pipe_schema(pipe))
    temp_old_table_name = sql_item_name(temp_old_table, self.flavor, self.get_pipe_schema(pipe))

    create_temporary_table_query = get_create_table_query(
        duplicates_cte_subquery,
        dedup_table,
        self.flavor,
    ) + f"""
    ORDER BY {index_list_str_ordered}
    """
    ### Swap tables: original -> temp_old, dedup -> original, drop temp_old.
    alter_queries = flatten_list([
        get_rename_table_queries(
            pipe.target, temp_old_table, self.flavor, schema=self.get_pipe_schema(pipe)
        ),
        get_rename_table_queries(
            dedup_table, pipe.target, self.flavor, schema=self.get_pipe_schema(pipe)
        ),
        f"""
        DROP TABLE {temp_old_table_name}
        """,
    ])

    create_temporary_result = self.execute(create_temporary_table_query, debug=debug)
    if create_temporary_result is None:
        return False, f"Failed to deduplicate table {pipe_table_name}."

    results = self.exec_queries(
        alter_queries,
        break_on_error = True,
        rollback = True,
        debug = debug,
    )

    ### Find the first failed rename/drop query (if any).
    fail_query = None
    for result, query in zip(results, alter_queries):
        if result is None:
            fail_query = query
            break
    success = fail_query is None

    new_rowcount = (
        self.value(get_rowcount_query, debug=debug)
        if success
        else None
    )

    msg = (
        (
            f"Successfully deduplicated table {pipe_table_name}"
            + (
                f"\nfrom {old_rowcount} to {new_rowcount} rows"
                if old_rowcount != new_rowcount
                else ''
            )
            + '.'
        )
        if success
        else f"Failed to execute query:\n{fail_query}"
    )
    return success, msg
Delete duplicate values within a pipe's table.
Parameters
- pipe (mrsm.Pipe): The pipe whose table to deduplicate.
- begin (Union[datetime, int, None], default None): If provided, only deduplicate values greater than or equal to this value.
- end (Union[datetime, int, None], default None): If provided, only deduplicate values less than this value.
- params (Optional[Dict[str, Any]], default None): If provided, further limit deduplication to values which match this query dictionary.
- debug (bool, default False): Verbosity toggle.
Returns
- A `SuccessTuple` indicating success.
2329def get_pipe_table( 2330 self, 2331 pipe: mrsm.Pipe, 2332 debug: bool = False, 2333 ) -> sqlalchemy.Table: 2334 """ 2335 Return the `sqlalchemy.Table` object for a `mrsm.Pipe`. 2336 2337 Parameters 2338 ---------- 2339 pipe: mrsm.Pipe: 2340 The pipe in question. 2341 2342 Returns 2343 ------- 2344 A `sqlalchemy.Table` object. 2345 2346 """ 2347 from meerschaum.utils.sql import get_sqlalchemy_table 2348 if not pipe.exists(debug=debug): 2349 return None 2350 return get_sqlalchemy_table( 2351 pipe.target, 2352 connector = self, 2353 schema = self.get_pipe_schema(pipe), 2354 debug = debug, 2355 refresh = True, 2356 )
Return the `sqlalchemy.Table` object for a `mrsm.Pipe`.
Parameters
- pipe (mrsm.Pipe): The pipe in question.
Returns
- A `sqlalchemy.Table` object.
2359def get_pipe_columns_types( 2360 self, 2361 pipe: mrsm.Pipe, 2362 debug: bool = False, 2363 ) -> Dict[str, str]: 2364 """ 2365 Get the pipe's columns and types. 2366 2367 Parameters 2368 ---------- 2369 pipe: mrsm.Pipe: 2370 The pipe to get the columns for. 2371 2372 Returns 2373 ------- 2374 A dictionary of columns names (`str`) and types (`str`). 2375 2376 Examples 2377 -------- 2378 >>> conn.get_pipe_columns_types(pipe) 2379 { 2380 'dt': 'TIMESTAMP WITHOUT TIMEZONE', 2381 'id': 'BIGINT', 2382 'val': 'DOUBLE PRECISION', 2383 } 2384 >>> 2385 """ 2386 if not pipe.exists(debug=debug): 2387 return {} 2388 2389 if self.flavor == 'duckdb': 2390 from meerschaum.utils.sql import get_table_cols_types 2391 return get_table_cols_types( 2392 pipe.target, 2393 self, 2394 flavor = self.flavor, 2395 schema = self.schema, 2396 ) 2397 2398 table_columns = {} 2399 try: 2400 pipe_table = self.get_pipe_table(pipe, debug=debug) 2401 if pipe_table is None: 2402 return {} 2403 for col in pipe_table.columns: 2404 table_columns[str(col.name)] = str(col.type) 2405 except Exception as e: 2406 import traceback 2407 traceback.print_exc() 2408 warn(e) 2409 table_columns = {} 2410 2411 return table_columns
Get the pipe's columns and types.
Parameters
- pipe (mrsm.Pipe): The pipe to get the columns for.
Returns
- A dictionary of column names (`str`) and types (`str`).
Examples
>>> conn.get_pipe_columns_types(pipe)
{
'dt': 'TIMESTAMP WITHOUT TIMEZONE',
'id': 'BIGINT',
'val': 'DOUBLE PRECISION',
}
>>>
2804def get_to_sql_dtype( 2805 self, 2806 pipe: 'mrsm.Pipe', 2807 df: 'pd.DataFrame', 2808 update_dtypes: bool = True, 2809 ) -> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']: 2810 """ 2811 Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`. 2812 2813 Parameters 2814 ---------- 2815 pipe: mrsm.Pipe 2816 The pipe which may contain a `dtypes` parameter. 2817 2818 df: pd.DataFrame 2819 The DataFrame to be pushed via `to_sql()`. 2820 2821 update_dtypes: bool, default True 2822 If `True`, patch the pipe's dtypes onto the DataFrame's dtypes. 2823 2824 Returns 2825 ------- 2826 A dictionary with `sqlalchemy` datatypes. 2827 2828 Examples 2829 -------- 2830 >>> import pandas as pd 2831 >>> import meerschaum as mrsm 2832 >>> 2833 >>> conn = mrsm.get_connector('sql:memory') 2834 >>> df = pd.DataFrame([{'a': {'b': 1}}]) 2835 >>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'}) 2836 >>> get_to_sql_dtype(pipe, df) 2837 {'a': <class 'sqlalchemy.sql.sqltypes.JSON'>} 2838 """ 2839 from meerschaum.utils.dataframe import get_json_cols, get_numeric_cols 2840 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 2841 df_dtypes = { 2842 col: str(typ) 2843 for col, typ in df.dtypes.items() 2844 } 2845 json_cols = get_json_cols(df) 2846 numeric_cols = get_numeric_cols(df) 2847 df_dtypes.update({col: 'json' for col in json_cols}) 2848 df_dtypes.update({col: 'numeric' for col in numeric_cols}) 2849 if update_dtypes: 2850 df_dtypes.update(pipe.dtypes) 2851 return { 2852 col: get_db_type_from_pd_type(typ, self.flavor, as_sqlalchemy=True) 2853 for col, typ in df_dtypes.items() 2854 }
Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`.

Parameters
- pipe (mrsm.Pipe):
  The pipe which may contain a `dtypes` parameter.
- df (pd.DataFrame):
  The DataFrame to be pushed via `to_sql()`.
- update_dtypes (bool, default True):
  If `True`, patch the pipe's dtypes onto the DataFrame's dtypes.

Returns
- A dictionary with `sqlalchemy` datatypes.
Examples
>>> import pandas as pd
>>> import meerschaum as mrsm
>>>
>>> conn = mrsm.get_connector('sql:memory')
>>> df = pd.DataFrame([{'a': {'b': 1}}])
>>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
>>> get_to_sql_dtype(pipe, df)
{'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
3077def get_pipe_schema(self, pipe: mrsm.Pipe) -> Union[str, None]: 3078 """ 3079 Return the schema to use for this pipe. 3080 First check `pipe.parameters['schema']`, then check `self.schema`. 3081 3082 Parameters 3083 ---------- 3084 pipe: mrsm.Pipe 3085 The pipe which may contain a configured schema. 3086 3087 Returns 3088 ------- 3089 A schema string or `None` if nothing is configured. 3090 """ 3091 return pipe.parameters.get('schema', self.schema)
Return the schema to use for this pipe.
First check `pipe.parameters['schema']`, then check `self.schema`.
Parameters
- pipe (mrsm.Pipe): The pipe which may contain a configured schema.
Returns
- A schema string or `None` if nothing is configured.
13def register_plugin( 14 self, 15 plugin: 'meerschaum.core.Plugin', 16 force: bool = False, 17 debug: bool = False, 18 **kw: Any 19 ) -> SuccessTuple: 20 """Register a new plugin to the plugins table.""" 21 from meerschaum.utils.warnings import warn, error 22 from meerschaum.utils.packages import attempt_import 23 sqlalchemy = attempt_import('sqlalchemy') 24 from meerschaum.utils.sql import json_flavors 25 from meerschaum.connectors.sql.tables import get_tables 26 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 27 28 old_id = self.get_plugin_id(plugin, debug=debug) 29 30 ### Check for version conflict. May be overridden with `--force`. 31 if old_id is not None and not force: 32 old_version = self.get_plugin_version(plugin, debug=debug) 33 new_version = plugin.version 34 if old_version is None: 35 old_version = '' 36 if new_version is None: 37 new_version = '' 38 39 ### verify that the new version is greater than the old 40 packaging_version = attempt_import('packaging.version') 41 if ( 42 old_version and new_version 43 and packaging_version.parse(old_version) >= packaging_version.parse(new_version) 44 ): 45 return False, ( 46 f"Version '{new_version}' of plugin '{plugin}' " + 47 f"must be greater than existing version '{old_version}'." 48 ) 49 50 import json 51 bind_variables = { 52 'plugin_name' : plugin.name, 53 'version' : plugin.version, 54 'attributes' : ( 55 json.dumps(plugin.attributes) if self.flavor not in json_flavors else plugin.attributes 56 ), 57 'user_id' : plugin.user_id, 58 } 59 60 if old_id is None: 61 query = sqlalchemy.insert(plugins_tbl).values(**bind_variables) 62 else: 63 query = ( 64 sqlalchemy.update(plugins_tbl) 65 .values(**bind_variables) 66 .where(plugins_tbl.c.plugin_id == old_id) 67 ) 68 69 result = self.exec(query, debug=debug) 70 if result is None: 71 return False, f"Failed to register plugin '{plugin}'." 72 return True, f"Successfully registered plugin '{plugin}'."
Register a new plugin to the plugins table.
241def delete_plugin( 242 self, 243 plugin: 'meerschaum.core.Plugin', 244 debug: bool = False, 245 **kw: Any 246 ) -> SuccessTuple: 247 """Delete a plugin from the plugins table.""" 248 from meerschaum.utils.warnings import warn, error 249 from meerschaum.utils.packages import attempt_import 250 sqlalchemy = attempt_import('sqlalchemy') 251 from meerschaum.connectors.sql.tables import get_tables 252 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 253 254 plugin_id = self.get_plugin_id(plugin, debug=debug) 255 if plugin_id is None: 256 return True, f"Plugin '{plugin}' was not registered." 257 258 bind_variables = { 259 'plugin_id' : plugin_id, 260 } 261 262 query = sqlalchemy.delete(plugins_tbl).where(plugins_tbl.c.plugin_id == plugin_id) 263 result = self.exec(query, debug=debug) 264 if result is None: 265 return False, f"Failed to delete plugin '{plugin}'." 266 return True, f"Successfully deleted plugin '{plugin}'."
Delete a plugin from the plugins table.
74def get_plugin_id( 75 self, 76 plugin: 'meerschaum.core.Plugin', 77 debug: bool = False 78 ) -> Optional[int]: 79 """ 80 Return a plugin's ID. 81 """ 82 ### ensure plugins table exists 83 from meerschaum.connectors.sql.tables import get_tables 84 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 85 from meerschaum.utils.packages import attempt_import 86 sqlalchemy = attempt_import('sqlalchemy') 87 88 query = ( 89 sqlalchemy 90 .select(plugins_tbl.c.plugin_id) 91 .where(plugins_tbl.c.plugin_name == plugin.name) 92 ) 93 94 try: 95 return int(self.value(query, debug=debug)) 96 except Exception as e: 97 return None
Return a plugin's ID.
def get_plugin_version(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[str]:
    """
    Fetch the registered version string of a plugin (`None` if not registered).
    """
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    ### Fetching the tables also ensures the plugins table exists.
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    version_query = (
        sqlalchemy
        .select(plugins_tbl.c.version)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )
    return self.value(version_query, debug=debug)
Return a plugin's version.
def get_plugins(
        self,
        user_id: Optional[int] = None,
        search_term: Optional[str] = None,
        debug: bool = False,
        **kw: Any
    ) -> List[str]:
    """
    Return a list of all registered plugin names.

    Parameters
    ----------
    user_id: Optional[int], default None
        If specified, only return plugins owned by this `user_id`.

    search_term: Optional[str], default None
        If specified, only return plugin names beginning with this prefix
        (`WHERE plugin_name LIKE '{search_term}%'`).

    Returns
    -------
    A list of plugin names.
    """
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    ### Fetching the tables also ensures the plugins table exists.
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    query = sqlalchemy.select(plugins_tbl.c.plugin_name)
    if user_id is not None:
        query = query.where(plugins_tbl.c.user_id == user_id)
    if search_term is not None:
        query = query.where(plugins_tbl.c.plugin_name.like(search_term + '%'))

    ### DuckDB takes the dataframe path (`self.read`) instead of `self.execute`.
    if self.flavor != 'duckdb':
        rows = self.execute(query).fetchall()
    else:
        rows = [
            (row['plugin_name'],)
            for row in self.read(query).to_dict(orient='records')
        ]

    return [row[0] for row in rows]
Return a list of all registered plugins.
Parameters
- user_id (Optional[int], default None):
If specified, filter plugins by a specific
user_id
. - search_term (Optional[str], default None):
If specified, add a
WHERE plugin_name LIKE '{search_term}%'
clause to filter the plugins.
Returns
- A list of plugin names.
def get_plugin_user_id(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[int]:
    """
    Look up the owner `user_id` of a registered plugin, or `None` if unavailable.
    """
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    ### Fetching the tables also ensures the plugins table exists.
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    owner_query = (
        sqlalchemy
        .select(plugins_tbl.c.user_id)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )

    ### Missing row (None) or non-numeric value both yield None.
    try:
        return int(self.value(owner_query, debug=debug))
    except Exception:
        return None
Return a plugin's user ID.
def get_plugin_username(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Optional[str]:
    """
    Return the username of a plugin's owner (`None` if not found).

    Parameters
    ----------
    plugin: meerschaum.core.Plugin
        The plugin whose owner's username to fetch.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The owner's username, or `None`.
    """
    ### Fetching the tables also ensures the plugins and users tables exist.
    from meerschaum.connectors.sql.tables import get_tables
    tables = get_tables(mrsm_instance=self, debug=debug)
    plugins_tbl = tables['plugins']
    users = tables['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    ### BUGFIX: the previous version combined the two conditions with the Python
    ### `and` operator, which does not produce a SQL AND on SQLAlchemy clause
    ### objects. Chained `.where()` calls are implicitly ANDed together.
    query = (
        sqlalchemy.select(users.c.username)
        .where(users.c.user_id == plugins_tbl.c.user_id)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )

    return self.value(query, debug=debug)
Return the username of a plugin's owner.
def get_plugin_attributes(
        self,
        plugin: 'meerschaum.core.Plugin',
        debug: bool = False
    ) -> Dict[str, Any]:
    """
    Return the attributes dictionary of a plugin (empty dict if unregistered).
    """
    import json
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    ### Fetching the tables also ensures the plugins table exists.
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    attributes_query = (
        sqlalchemy
        .select(plugins_tbl.c.attributes)
        .where(plugins_tbl.c.plugin_name == plugin.name)
    )

    _attr = self.value(attributes_query, debug=debug)
    ### Flavors without native JSON columns return attributes as a serialized string.
    if isinstance(_attr, str):
        return json.loads(_attr)
    return _attr if _attr is not None else {}
Return the attributes of a plugin.
def register_user(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Register a new user.

    Parameters
    ----------
    user: meerschaum.core.User
        The user to insert into the users table.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of (success flag, message).
    """
    import json
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    sqlalchemy = attempt_import('sqlalchemy')

    valid_tuple = valid_username(user.username)
    if not valid_tuple[0]:
        return valid_tuple

    ### A user may only be registered once; further `old_id` checks below the
    ### early return were dead code and have been removed.
    old_id = self.get_user_id(user, debug=debug)
    if old_id is not None:
        return False, f"User '{user}' already exists."

    ### ensure users table exists
    from meerschaum.connectors.sql.tables import get_tables
    tables = get_tables(mrsm_instance=self, debug=debug)

    ### Flavors without native JSON columns store attributes as a serialized string.
    bind_variables = {
        'username': user.username,
        'email': user.email,
        'password_hash': user.password_hash,
        'user_type': user.type,
        'attributes': (
            json.dumps(user.attributes) if self.flavor not in json_flavors else user.attributes
        ),
    }

    query = (
        sqlalchemy.insert(tables['users']).
        values(**bind_variables)
    )

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to register user '{user}'."
    return True, f"Successfully registered user '{user}'."
Register a new user.
def get_user_id(
        self,
        user: meerschaum.core.User,
        debug : bool = False
    ) -> Optional[int]:
    """
    If a user is registered, return the `user_id`; otherwise return `None`.
    """
    from meerschaum.utils.packages import attempt_import
    from meerschaum.connectors.sql.tables import get_tables
    sqlalchemy = attempt_import('sqlalchemy')
    ### Fetching the tables also ensures the users table exists.
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    id_query = (
        sqlalchemy.select(users_tbl.c.user_id)
        .where(users_tbl.c.username == user.username)
    )

    result = self.value(id_query, debug=debug)
    return int(result) if result is not None else None
If a user is registered, return the `user_id`; otherwise return `None`.
def get_users(
        self,
        debug: bool = False,
        **kw: Any
    ) -> List[str]:
    """
    Return a list of all registered usernames.
    """
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    ### Fetching the tables also ensures the users table exists.
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    usernames_query = sqlalchemy.select(users_tbl.c.username)
    return list(self.read(usernames_query, debug=debug)['username'])
Get the registered usernames.
def edit_user(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Update an existing user's metadata.

    Only non-empty fields (password, email, attributes, type) are written;
    empty values on the `user` object leave the stored columns untouched.

    Parameters
    ----------
    user: meerschaum.core.User
        The user whose row should be updated.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of (success flag, message).
    """
    import json
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.sql import json_flavors
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    if user_id is None:
        return False, (
            f"User '{user.username}' does not exist. " +
            f"Register user '{user.username}' before editing."
        )
    user.user_id = user_id

    valid_tuple = valid_username(user.username)
    if not valid_tuple[0]:
        return valid_tuple

    bind_variables = {
        'user_id' : user_id,
        'username' : user.username,
    }
    if user.password != '':
        bind_variables['password_hash'] = user.password_hash
    if user.email != '':
        bind_variables['email'] = user.email
    if user.attributes is not None and user.attributes != {}:
        ### CONSISTENCY FIX: serialize attributes for every flavor without native
        ### JSON support (matching `register_user`), not only for DuckDB.
        bind_variables['attributes'] = (
            json.dumps(user.attributes) if self.flavor not in json_flavors
            else user.attributes
        )
    if user.type != '':
        bind_variables['user_type'] = user.type

    query = (
        sqlalchemy
        .update(users_tbl)
        .values(**bind_variables)
        .where(users_tbl.c.user_id == user_id)
    )

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to edit user '{user}'."
    return True, f"Successfully edited user '{user}'."
Update an existing user's metadata.
def delete_user(
        self,
        user: meerschaum.core.User,
        debug: bool = False
    ) -> SuccessTuple:
    """
    Delete a user's record from the users table, along with the plugins they own.
    """
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    ### Fetching the tables also ensures the users and plugins tables exist.
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    plugins = get_tables(mrsm_instance=self, debug=debug)['plugins']

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    if user_id is None:
        return False, f"User '{user.username}' is not registered and cannot be deleted."

    ### First remove the user row itself.
    delete_user_query = sqlalchemy.delete(users_tbl).where(users_tbl.c.user_id == user_id)
    if self.exec(delete_user_query, debug=debug) is None:
        return False, f"Failed to delete user '{user}'."

    ### Then remove any plugins registered under this user.
    delete_plugins_query = sqlalchemy.delete(plugins).where(plugins.c.user_id == user_id)
    if self.exec(delete_plugins_query, debug=debug) is None:
        return False, f"Failed to delete plugins of user '{user}'."

    return True, f"Successfully deleted user '{user}'"
Delete a user's record from the users table.
def get_user_password_hash(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """
    Return the password hash for a user.

    **NOTE**: This may be dangerous and is only allowed if the security settings
    explicitly allow it.

    Parameters
    ----------
    user: meerschaum.core.User
        The user whose hash to fetch. If `user.user_id` is set it is used directly;
        otherwise the ID is resolved from the username.

    Returns
    -------
    The stored password hash string, or `None` if the user is not registered.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.connectors.sql.tables import get_tables
    ### Fetching the tables also ensures the users table exists.
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')

    if user.user_id is not None:
        user_id = user.user_id
        if debug:
            dprint(f"Already given user_id: {user_id}")
    else:
        if debug:
            dprint("Fetching user_id...")
        user_id = self.get_user_id(user, debug=debug)

    if user_id is None:
        ### Unregistered user: nothing to fetch.
        return None

    query = sqlalchemy.select(users_tbl.c.password_hash).where(users_tbl.c.user_id == user_id)

    return self.value(query, debug=debug)
Return the password hash for a user. NOTE: This may be dangerous and is only allowed if the security settings explicitly allow it.
def get_user_type(
        self,
        user: meerschaum.core.User,
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """
    Return the user's type string (`None` if the user is not registered).
    """
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    ### Fetching the tables also ensures the users table exists.
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    if user_id is None:
        return None

    type_query = sqlalchemy.select(users_tbl.c.user_type).where(users_tbl.c.user_id == user_id)
    return self.value(type_query, debug=debug)
Return the user's type.
def get_user_attributes(
        self,
        user: meerschaum.core.User,
        debug: bool = False
    ) -> Union[Dict[str, Any], None]:
    """
    Return the user's attributes, coercing mapping-like or JSON-string values to a dict.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    ### Fetching the tables also ensures the users table exists.
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)

    attributes_query = (
        sqlalchemy.select(users_tbl.c.attributes)
        .where(users_tbl.c.user_id == user_id)
    )

    result = self.value(attributes_query, debug=debug)
    if result is None or isinstance(result, dict):
        return result

    ### Some flavors return a mapping-like row, others a JSON string;
    ### try both conversions before warning and returning the raw value.
    try:
        return dict(result)
    except Exception:
        pass
    try:
        import json
        return json.loads(result)
    except Exception:
        pass
    warn(f"Received unexpected type for attributes: {result}")
    return result
Return the user's attributes.
@classmethod
def from_uri(
        cls,
        uri: str,
        label: Optional[str] = None,
        as_dict: bool = False,
    ) -> Union[
        'meerschaum.connectors.SQLConnector',
        Dict[str, Union[str, int]],
    ]:
    """
    Build a `SQLConnector` (or its constructor kwargs) from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string.

    label: Optional[str], default None
        If provided, use this as the connector label.
        Otherwise derive one from the database name.

    as_dict: bool, default False
        If `True`, return the dictionary of keyword arguments needed to build
        a new `SQLConnector` instead of constructing the object.

    Returns
    -------
    A new `SQLConnector` or a dictionary of attributes (if `as_dict` is `True`).
    """
    params = cls.parse_uri(uri)
    params['uri'] = uri

    flavor = params.get('flavor', None)
    if not flavor or flavor not in cls.flavor_configs:
        error(f"Invalid flavor '{flavor}' detected from the provided URI.")
    if 'database' not in params:
        error("Unable to determine the database from the provided URI.")

    ### File-based flavors are labeled by file name; server flavors by user@host/database.
    if flavor in ('sqlite', 'duckdb'):
        if params['database'] == ':memory:':
            default_label = f'memory_{flavor}'
        else:
            default_label = params['database'].split(os.path.sep)[-1].lower()
    else:
        user_prefix = (params['username'] + '@') if 'username' in params else ''
        host_part = params.get('host', '') + ('/' if 'host' in params else '')
        default_label = (user_prefix + host_part + params.get('database', '')).lower()
    params['label'] = label or default_label

    return params if as_dict else cls(**params)
Create a new SQLConnector from a URI string.
Parameters
- uri (str): The URI connection string.
- label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the determined database name.
- as_dict (bool, default False): If `True`, return a dictionary of the keyword arguments necessary to create a new `SQLConnector`; otherwise create a new object.
Returns
- A new SQLConnector object or a dictionary of attributes (if
as_dict
isTrue
).
@staticmethod
def parse_uri(uri: str) -> Dict[str, Any]:
    """
    Parse a URI string into a dictionary of parameters.

    Parameters
    ----------
    uri: str
        The database connection URI.

    Returns
    -------
    A dictionary of attributes.

    Examples
    --------
    >>> parse_uri('sqlite:////home/foo/bar.db')
    {'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
    >>> parse_uri(
    ...     'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
    ...     + '/master?driver=ODBC+Driver+17+for+SQL+Server'
    ... )
    {'host': 'localhost', 'database': 'master', 'username': 'sa',
    'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
    'driver': 'ODBC Driver 17 for SQL Server'}
    >>>
    """
    from urllib.parse import parse_qs, urlparse
    sqlalchemy = attempt_import('sqlalchemy')
    ### Let SQLAlchemy's URL parser split host/port/user/password/database.
    parser = sqlalchemy.engine.url.make_url
    params = parser(uri).translate_connect_args()
    ### The flavor is the dialect before any '+driver' suffix (e.g. 'mssql+pyodbc' -> 'mssql').
    params['flavor'] = uri.split(':')[0].split('+')[0]
    if params['flavor'] == 'postgres':
        params['flavor'] = 'postgresql'
    ### Merge query-string options (e.g. `?driver=...`) into the parameters;
    ### only the first value of each key is kept.
    if '?' in uri:
        parsed_uri = urlparse(uri)
        for key, value in parse_qs(parsed_uri.query).items():
            params.update({key: value[0]})

    ### Promote a `--search_path` option to an explicit `schema` parameter.
    if '--search_path' in params.get('options', ''):
        params.update({'schema': params['options'].replace('--search_path=', '', 1)})
    return params
Parse a URI string into a dictionary of parameters.
Parameters
- uri (str): The database connection URI.
Returns
- A dictionary of attributes.
Examples
>>> parse_uri('sqlite:////home/foo/bar.db')
{'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
>>> parse_uri(
... 'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
... + '/master?driver=ODBC+Driver+17+for+SQL+Server'
... )
{'host': 'localhost', 'database': 'master', 'username': 'sa',
'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
'driver': 'ODBC Driver 17 for SQL Server'}
>>>
Inherited Members
class APIConnector(Connector):
    """
    Connect to a Meerschaum API instance.
    """

    ### This connector may serve as a Meerschaum instance connector.
    IS_INSTANCE: bool = True
    ### NOTE(review): marked not thread-safe — presumably due to the shared
    ### requests.Session / token state; confirm before sharing across threads.
    IS_THREAD_SAFE: bool = False

    ### Methods are implemented in sibling modules and attached to the class here.
    from ._request import (
        make_request,
        get,
        post,
        put,
        patch,
        delete,
        wget,
    )
    from ._actions import get_actions, do_action
    from ._misc import get_mrsm_version, get_chaining_status
    from ._pipes import (
        register_pipe,
        fetch_pipes_keys,
        edit_pipe,
        sync_pipe,
        delete_pipe,
        get_pipe_data,
        get_pipe_id,
        get_pipe_attributes,
        get_sync_time,
        pipe_exists,
        create_metadata,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        get_pipe_columns_types,
    )
    from ._fetch import fetch
    from ._plugins import (
        register_plugin,
        install_plugin,
        delete_plugin,
        get_plugins,
        get_plugin_attributes,
    )
    from ._login import login, test_connection
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri

    def __init__(
        self,
        label: Optional[str] = None,
        wait: bool = False,
        debug: bool = False,
        **kw
    ):
        """
        Build the connector's attributes (host, port, protocol, URL) from
        keyword arguments or a provided `uri`.
        """
        ### A `uri` keyword takes precedence: derive the attributes (and a
        ### default label) from it.
        if 'uri' in kw:
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            _ = from_uri_params.pop('label', None)
            kw.update(from_uri_params)

        super().__init__('api', label=label, **kw)
        ### Default the protocol from the URI scheme when not set explicitly.
        if 'protocol' not in self.__dict__:
            self.protocol = (
                'https' if self.__dict__.get('uri', '').startswith('https')
                else 'http'
            )

        if 'uri' not in self.__dict__:
            ### Without a URI, the required attributes must have been provided directly.
            self.verify_attributes(required_attributes)
        else:
            ### Reuse the SQL URI parser to split host/port/credentials.
            from meerschaum.connectors.sql import SQLConnector
            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
            if 'host' not in conn_attrs:
                raise Exception(f"Invalid URI for '{self}'.")
            self.__dict__.update(conn_attrs)

        ### Base URL for all requests (credentials are NOT embedded here; see `URI`).
        self.url = (
            self.protocol + '://' +
            self.host
            + (
                (':' + str(self.port))
                if self.__dict__.get('port', None)
                else ''
            )
        )
        ### Lazily-populated auth and session state.
        self._token = None
        self._expires = None
        self._session = None


    @property
    def URI(self) -> str:
        """
        Return the fully qualified URI (including credentials when available).
        """
        username = self.__dict__.get('username', None)
        password = self.__dict__.get('password', None)
        ### Embed `user:pass@` only when both are present.
        creds = (username + ':' + password + '@') if username and password else ''
        return (
            self.protocol
            + '://'
            + creds
            + self.host
            + (
                (':' + str(self.port))
                if self.__dict__.get('port', None)
                else ''
            )
        )


    @property
    def session(self):
        """
        Lazily create and cache a `requests.Session` for connection reuse.
        """
        if self._session is None:
            requests = attempt_import('requests')
            if requests:
                self._session = requests.Session()
            if self._session is None:
                error(f"Failed to import requests. Is requests installed?")
        return self._session

    @property
    def token(self):
        """
        Return the auth token, logging in again when it is missing or near expiry.
        """
        ### Treat a missing expiry as expired; otherwise refresh when fewer than
        ### 60 seconds remain (comparison in naive UTC).
        expired = (
            True if self._expires is None else (
                (
                    self._expires
                    <
                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
                )
            )
        )

        if self._token is None or expired:
            success, msg = self.login()
            if not success:
                warn(msg, stack=False)
        return self._token
Connect to a Meerschaum API instance.
    def __init__(
        self,
        label: Optional[str] = None,
        wait: bool = False,
        debug: bool = False,
        **kw
    ):
        """
        Build the connector's attributes (host, port, protocol, URL) from
        keyword arguments or a provided `uri`.
        """
        ### A `uri` keyword takes precedence: derive attributes and a default
        ### label from it before calling the base constructor.
        if 'uri' in kw:
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            _ = from_uri_params.pop('label', None)
            kw.update(from_uri_params)

        super().__init__('api', label=label, **kw)
        ### Default the protocol from the URI scheme when not set explicitly.
        if 'protocol' not in self.__dict__:
            self.protocol = (
                'https' if self.__dict__.get('uri', '').startswith('https')
                else 'http'
            )

        if 'uri' not in self.__dict__:
            ### Without a URI, the required attributes must have been provided directly.
            self.verify_attributes(required_attributes)
        else:
            ### Reuse the SQL URI parser to split host/port/credentials.
            from meerschaum.connectors.sql import SQLConnector
            conn_attrs = SQLConnector.parse_uri(self.__dict__['uri'])
            if 'host' not in conn_attrs:
                raise Exception(f"Invalid URI for '{self}'.")
            self.__dict__.update(conn_attrs)

        ### Base URL for all requests (credentials are NOT embedded here).
        self.url = (
            self.protocol + '://' +
            self.host
            + (
                (':' + str(self.port))
                if self.__dict__.get('port', None)
                else ''
            )
        )
        ### Lazily-populated auth and session state.
        self._token = None
        self._expires = None
        self._session = None
120 @property 121 def URI(self) -> str: 122 """ 123 Return the fully qualified URI. 124 """ 125 username = self.__dict__.get('username', None) 126 password = self.__dict__.get('password', None) 127 creds = (username + ':' + password + '@') if username and password else '' 128 return ( 129 self.protocol 130 + '://' 131 + creds 132 + self.host 133 + ( 134 (':' + str(self.port)) 135 if self.__dict__.get('port', None) 136 else '' 137 ) 138 )
Return the fully qualified URI.
    @property
    def token(self):
        """
        Return the current authentication token, logging in again when it is
        missing or within one minute of expiring.
        """
        ### A missing expiry counts as expired; otherwise refresh when fewer
        ### than 60 seconds remain (comparison is done in naive UTC).
        expired = (
            True if self._expires is None else (
                (
                    self._expires
                    <
                    datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(minutes=1)
                )
            )
        )

        if self._token is None or expired:
            success, msg = self.login()
            if not success:
                ### Surface the login failure but still return whatever token we have.
                warn(msg, stack=False)
        return self._token
def make_request(
    self,
    method: str,
    r_url: str,
    headers: Optional[Dict[str, Any]] = None,
    use_token: bool = True,
    debug: bool = False,
    **kwargs: Any
) -> 'requests.Response':
    """
    Make a request to this APIConnector's endpoint using the in-memory session.

    Parameters
    ----------
    method: str
        The kind of request to make.
        Accepted values:
        - `'GET'`
        - `'OPTIONS'`
        - `'HEAD'`
        - `'POST'`
        - `'PUT'`
        - `'PATCH'`
        - `'DELETE'`

    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    headers: Optional[Dict[str, Any]], default None
        The headers to use for the request.
        If `use_token` is `True`, the authorization token will be added to a copy of these headers.

    use_token: bool, default True
        If `True`, add the authorization token to the headers.

    debug: bool, default False
        Verbosity toggle.

    kwargs: Any
        All other keyword arguments are passed to `requests.request`.

    Returns
    -------
    A `requests.Response` object.
    """
    if method.upper() not in METHODS:
        raise ValueError(f"Method '{method}' is not supported.")

    ### Honor a connector-level TLS `verify` flag unless overridden per-call.
    verify = self.__dict__.get('verify', None)
    if 'verify' not in kwargs and isinstance(verify, bool):
        kwargs['verify'] = verify

    ### Deep-copy the headers so adding the token never mutates the caller's dict.
    headers = (
        copy.deepcopy(headers)
        if isinstance(headers, dict)
        else {}
    )

    if use_token:
        headers.update({'Authorization': f'Bearer {self.token}'})

    if 'timeout' not in kwargs:
        kwargs['timeout'] = STATIC_CONFIG['api']['default_timeout']

    request_url = urllib.parse.urljoin(self.url, r_url)
    if debug:
        dprint(f"[{self}] Sending a '{method.upper()}' request to {request_url}")

    return self.session.request(
        method.upper(),
        request_url,
        headers = headers,
        **kwargs
    )
Make a request to this APIConnector's endpoint using the in-memory session.
Parameters
- method (str):
The kind of request to make.
Accepted values:
'GET'
'OPTIONS'
'HEAD'
'POST'
'PUT'
'PATCH'
'DELETE'
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'
). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token
isTrue
, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True
, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request
.
Returns
- A `requests.Response` object.
def get(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Wrapper for `requests.get` via `make_request`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded to `make_request` (e.g. `headers`, `use_token`, `debug`),
        and from there to `requests.request`.

    Returns
    -------
    A `requests.Response` object.
    """
    return self.make_request('GET', r_url, **kwargs)
Wrapper for requests.get
.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'
). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token
isTrue
, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True
, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request
.
Returns
- A `requests.Response` object.
def post(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Wrapper for `requests.post` via `make_request`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded to `make_request` (e.g. `headers`, `use_token`, `debug`),
        and from there to `requests.request`.

    Returns
    -------
    A `requests.Response` object.
    """
    return self.make_request('POST', r_url, **kwargs)
Wrapper for requests.post
.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'
). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token
isTrue
, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True
, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request
.
Returns
- A `requests.Response` object.
def put(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Wrapper for `requests.put` via `make_request`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded to `make_request` (e.g. `headers`, `use_token`, `debug`),
        and from there to `requests.request`.

    Returns
    -------
    A `requests.Response` object.
    """
    return self.make_request('PUT', r_url, **kwargs)
Wrapper for requests.put
.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'
). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token
isTrue
, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True
, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request
.
Returns
- A `requests.Response` object.
def patch(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Wrapper for `requests.patch` via `make_request`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded to `make_request` (e.g. `headers`, `use_token`, `debug`),
        and from there to `requests.request`.

    Returns
    -------
    A `requests.Response` object.
    """
    return self.make_request('PATCH', r_url, **kwargs)
Wrapper for requests.patch
.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'
). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token
isTrue
, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True
, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request
.
Returns
- A `requests.Response` object.
def delete(self, r_url: str, **kwargs: Any) -> 'requests.Response':
    """
    Wrapper for `requests.delete` via `make_request`.

    Parameters
    ----------
    r_url: str
        The relative URL for the endpoint (e.g. `'/pipes'`).

    kwargs: Any
        Forwarded to `make_request` (e.g. `headers`, `use_token`, `debug`),
        and from there to `requests.request`.

    Returns
    -------
    A `requests.Response` object.
    """
    return self.make_request('DELETE', r_url, **kwargs)
Wrapper for requests.delete
.
Parameters
- r_url (str):
The relative URL for the endpoint (e.g.
'/pipes'
). - headers (Optional[Dict[str, Any]], default None):
The headers to use for the request.
If
use_token
isTrue
, the authorization token will be added to a copy of these headers. - use_token (bool, default True):
If
True
, add the authorization token to the headers. - debug (bool, default False): Verbosity toggle.
- kwargs (Any):
All other keyword arguments are passed to
requests.request
.
Returns
- A `requests.Response` object.
def wget(
        self,
        r_url: str,
        dest: Optional[Union[str, pathlib.Path]] = None,
        headers: Optional[Dict[str, Any]] = None,
        use_token: bool = True,
        debug: bool = False,
        **kw: Any
    ) -> pathlib.Path:
    """
    Download a file from the API instance (mimic `wget` with `requests`).
    """
    from meerschaum.utils.misc import wget
    if headers is None:
        headers = {}
    ### Attach the bearer token to the headers when requested.
    if use_token:
        headers.update({'Authorization': f'Bearer {self.token}'})
    request_url = urllib.parse.urljoin(self.url, r_url)
    if debug:
        dest_suffix = f' to {dest}' if dest is not None else ''
        dprint(f"[{self}] Downloading {request_url}" + dest_suffix + "...")
    return wget(request_url, dest=dest, headers=headers, **kw)
Mimic wget with requests.
def get_actions(self) -> list:
    """Get available actions from the API instance."""
    from meerschaum.config.static import STATIC_CONFIG
    actions_endpoint = STATIC_CONFIG['api']['endpoints']['actions']
    return self.get(actions_endpoint)
Get available actions from the API instance.
def do_action(
    self,
    action: Optional[List[str]] = None,
    sysargs: Optional[List[str]] = None,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """Execute a Meerschaum action remotely.

    If `sysargs` are provided, parse those instead.
    Otherwise infer everything from keyword arguments.

    Examples
    --------
    >>> conn = mrsm.get_connector('api:main')
    >>> conn.do_action(['show', 'pipes'])
    (True, "Success")
    >>> conn.do_action(['show', 'arguments'], name='test')
    (True, "Success")
    """
    import sys, json
    from meerschaum.utils.debug import dprint
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.misc import json_serialize_datetime
    if action is None:
        action = []

    ### NOTE(review): sysargs are only parsed when the first action word is
    ### the empty string — presumably the CLI's "no explicit action" marker;
    ### confirm against the caller in `_internal.arguments`.
    if sysargs is not None and action and action[0] == '':
        from meerschaum._internal.arguments import parse_arguments
        if debug:
            dprint(f"Parsing sysargs:\n{sysargs}")
        json_dict = parse_arguments(sysargs)
    else:
        ### Build the request payload from the keyword arguments,
        ### defaulting to non-interactive flags unless overridden.
        json_dict = kw
        json_dict['action'] = action
        if 'noask' not in kw:
            json_dict['noask'] = True
        if 'yes' not in kw:
            json_dict['yes'] = True
        if debug:
            json_dict['debug'] = debug

    ### The first action word becomes part of the endpoint URL;
    ### the remainder is sent in the body.
    root_action = json_dict['action'][0]
    del json_dict['action'][0]
    r_url = f"{STATIC_CONFIG['api']['endpoints']['actions']}/{root_action}"

    if debug:
        from meerschaum.utils.formatting import pprint
        dprint(f"Sending data to '{self.url + r_url}':")
        pprint(json_dict, stream=sys.stderr)

    response = self.post(
        r_url,
        data = json.dumps(json_dict, default=json_serialize_datetime),
        debug = debug,
    )
    try:
        response_list = json.loads(response.text)
        ### A dict with 'detail' is FastAPI's error shape.
        if isinstance(response_list, dict) and 'detail' in response_list:
            return False, response_list['detail']
    except Exception as e:
        print(f"Invalid response: {response}")
        print(e)
        return False, response.text
    if debug:
        dprint(response)
    try:
        return response_list[0], response_list[1]
    except Exception as e:
        return False, f"Failed to parse result from action '{root_action}'"
Execute a Meerschaum action remotely.
If sysargs
are provided, parse those instead.
Otherwise infer everything from keyword arguments.
Examples
>>> conn = mrsm.get_connector('api:main')
>>> conn.do_action(['show', 'pipes'])
(True, "Success")
>>> conn.do_action(['show', 'arguments'], name='test')
(True, "Success")
def get_mrsm_version(self, **kw) -> Optional[str]:
    """
    Return the Meerschaum version of the API instance.
    """
    from meerschaum.config.static import STATIC_CONFIG
    version_endpoint = STATIC_CONFIG['api']['endpoints']['version'] + '/mrsm'
    try:
        j = self.get(version_endpoint, use_token=False, **kw).json()
    except Exception:
        return None
    ### A dict with 'detail' indicates an API error payload.
    if isinstance(j, dict) and 'detail' in j:
        return None
    return j
Return the Meerschaum version of the API instance.
def get_chaining_status(self, **kw) -> Optional[bool]:
    """
    Fetch the chaining status of the API instance.
    """
    from meerschaum.config.static import STATIC_CONFIG
    chaining_endpoint = STATIC_CONFIG['api']['endpoints']['chaining']
    try:
        response = self.get(chaining_endpoint, use_token=True, **kw)
        if not response:
            return None
    except Exception:
        return None

    return response.json()
Fetch the chaining status of the API instance.
def register_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> SuccessTuple:
    """Submit a POST to the API to register a new Pipe object.
    Returns a tuple of (success_bool, response_dict).
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.config.static import STATIC_CONFIG
    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ### then `pipe.parameters` will exist and not be fetched from the database.
    r_url = pipe_r_url(pipe)
    response = self.post(
        r_url + '/register',
        json = pipe.parameters,
        debug = debug,
    )
    if debug:
        dprint(response.text)
    ### Parse the body once instead of calling `response.json()`
    ### (which re-parses the text) in every branch.
    payload = response.json()
    if isinstance(payload, list):
        return response.__bool__(), payload[1]
    if isinstance(payload, dict) and 'detail' in payload:
        return response.__bool__(), payload['detail']
    return response.__bool__(), response.text
Submit a POST to the API to register a new Pipe object. Returns a tuple of (success_bool, response_dict).
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False
) -> Union[List[Tuple[str, str, Union[str, None]]]]:
    """
    Fetch registered Pipes' keys from the API.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        The connector keys for the query.

    metric_keys: Optional[List[str]], default None
        The metric keys for the query.

    location_keys: Optional[List[str]], default None
        The location keys for the query.

    tags: Optional[List[str]], default None
        A list of tags for the query.

    params: Optional[Dict[str, Any]], default None
        A parameters dictionary for filtering against the `pipes` table
        (e.g. `{'connector_keys': 'plugin:foo'}`).
        Not recommended to be used.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of tuples containing pipes' keys.
    """
    ### Fixed docstring typo ("recommeded").
    from meerschaum.config.static import STATIC_CONFIG
    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    if tags is None:
        tags = []

    r_url = STATIC_CONFIG['api']['endpoints']['pipes'] + '/keys'
    try:
        ### Keys are JSON-encoded into query parameters.
        j = self.get(
            r_url,
            params = {
                'connector_keys': json.dumps(connector_keys),
                'metric_keys': json.dumps(metric_keys),
                'location_keys': json.dumps(location_keys),
                'tags': json.dumps(tags),
                'params': json.dumps(params),
            },
            debug=debug
        ).json()
    except Exception as e:
        error(str(e))

    if 'detail' in j:
        error(j['detail'], stack=False)
    return [tuple(r) for r in j]
Fetch registered Pipes' keys from the API.
Parameters
- connector_keys (Optional[List[str]], default None): The connector keys for the query.
- metric_keys (Optional[List[str]], default None): The metric keys for the query.
- location_keys (Optional[List[str]], default None): The location keys for the query.
- tags (Optional[List[str]], default None): A list of tags for the query.
- params (Optional[Dict[str, Any]], default None):
    A parameters dictionary for filtering against the `pipes` table
    (e.g. `{'connector_keys': 'plugin:foo'}`). Not recommended to be used.
- debug (bool, default False): Verbosity toggle.
Returns
- A list of tuples containing pipes' keys.
def edit_pipe(
    self,
    pipe: mrsm.Pipe,
    patch: bool = False,
    debug: bool = False,
) -> SuccessTuple:
    """Submit a PATCH to the API to edit an existing Pipe object.
    Returns a tuple of (success_bool, response_dict).
    """
    from meerschaum.utils.debug import dprint
    ### NOTE: if `parameters` is supplied in the Pipe constructor,
    ### then `pipe.parameters` will exist and not be fetched from the database.
    r_url = pipe_r_url(pipe)
    response = self.patch(
        r_url + '/edit',
        params = {'patch': patch,},
        json = pipe.parameters,
        debug = debug,
    )
    if debug:
        dprint(response.text)
    ### Parse the body once instead of calling `response.json()`
    ### (which re-parses the text) in every branch.
    payload = response.json()
    if isinstance(payload, list):
        return response.__bool__(), payload[1]
    if isinstance(payload, dict) and 'detail' in payload:
        return response.__bool__(), payload['detail']
    return response.__bool__(), response.text
Submit a PATCH to the API to edit an existing Pipe object. Returns a tuple of (success_bool, response_dict).
def sync_pipe(
    self,
    pipe: mrsm.Pipe,
    df: Optional[Union['pd.DataFrame', Dict[Any, Any], str]] = None,
    chunksize: Optional[int] = -1,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """Sync a DataFrame into a Pipe.

    Accepts a pandas/dask DataFrame, a dict of columns, a list of row dicts,
    or a JSON string of any of these. The data is split into chunks of
    `chunksize` rows and POSTed chunk-by-chunk to the pipe's `/data` endpoint.
    Returns a `SuccessTuple`; syncing stops at the first failed chunk.
    """
    from decimal import Decimal
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import json_serialize_datetime, items_str
    from meerschaum.config import get_config
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.dataframe import get_numeric_cols
    begin = time.time()
    more_itertools = attempt_import('more_itertools')
    if df is None:
        msg = f"DataFrame is `None`. Cannot sync {pipe}."
        return False, msg

    def get_json_str(c):
        ### allow syncing dict or JSON without needing to import pandas (for IOT devices)
        if isinstance(c, (dict, list)):
            return json.dumps(c, default=json_serialize_datetime)
        pd = import_pandas()
        return c.fillna(pd.NA).to_json(date_format='iso', date_unit='ns')

    ### A JSON string payload is decoded first.
    df = json.loads(df) if isinstance(df, str) else df

    ### chunksize=None means one row per chunk; -1 means the configured default.
    _chunksize: Optional[int] = (1 if chunksize is None else (
        get_config('system', 'connectors', 'sql', 'chunksize') if chunksize == -1
        else chunksize
    ))
    ### NOTE(review): `df.columns` assumes a DataFrame here even though dict
    ### and list payloads are handled below — confirm upstream callers.
    keys: List[str] = list(df.columns)
    chunks = []
    if hasattr(df, 'index'):
        df = df.reset_index(drop=True)
        is_dask = 'dask' in df.__module__
        chunks = (
            (df.iloc[i] for i in more_itertools.chunked(df.index, _chunksize))
            if not is_dask
            else [partition.compute() for partition in df.partitions]
        )
        numeric_cols = get_numeric_cols(df)
        if numeric_cols:
            ### Serialize Decimals as fixed-point strings so they survive JSON.
            for col in numeric_cols:
                df[col] = df[col].apply(lambda x: f'{x:f}' if isinstance(x, Decimal) else x)
            pipe_dtypes = pipe.dtypes
            new_numeric_cols = [
                col
                for col in numeric_cols
                if pipe_dtypes.get(col, None) != 'numeric'
            ]
            pipe.dtypes.update({
                col: 'numeric'
                for col in new_numeric_cols
            })
            ### Persist any newly-detected numeric dtypes to the registration.
            edit_success, edit_msg = pipe.edit(debug=debug)
            if not edit_success:
                warn(
                    "Failed to update new numeric columns "
                    + f"{items_str(new_numeric_cols)}:\n{edit_msg}"
                )
    elif isinstance(df, dict):
        ### `_chunks` is a dict of lists of dicts.
        ### e.g. {'a' : [ {'a':[1, 2]}, {'a':[3, 4]} ] }
        _chunks = {k: [] for k in keys}
        for k in keys:
            chunk_iter = more_itertools.chunked(df[k], _chunksize)
            for l in chunk_iter:
                _chunks[k].append({k: l})

        ### `chunks` is a list of dicts (e.g. orient by rows in pandas JSON).
        for k, l in _chunks.items():
            for i, c in enumerate(l):
                try:
                    chunks[i].update(c)
                except IndexError:
                    chunks.append(c)
    elif isinstance(df, list):
        ### NOTE(review): `df[i]` indexes the list with a chunk (itself a
        ### list) from `more_itertools.chunked` — verify this branch.
        chunks = (df[i] for i in more_itertools.chunked(df, _chunksize))

    ### Send columns in case the user has defined them locally.
    if pipe.columns:
        kw['columns'] = json.dumps(pipe.columns)
    r_url = pipe_r_url(pipe) + '/data'

    rowcount = 0
    num_success_chunks = 0
    for i, c in enumerate(chunks):
        if debug:
            dprint(f"[{self}] Posting chunk {i} to {r_url}...")
        if len(c) == 0:
            if debug:
                dprint(f"[{self}] Skipping empty chunk...")
            continue
        json_str = get_json_str(c)

        try:
            response = self.post(
                r_url,
                ### handles check_existing
                params = kw,
                data = json_str,
                debug = debug
            )
        except Exception as e:
            msg = f"Failed to post a chunk to {pipe}:\n{e}"
            warn(msg)
            return False, msg

        if not response:
            return False, f"Failed to sync a chunk:\n{response.text}"

        try:
            j = json.loads(response.text)
        except Exception as e:
            return False, f"Failed to parse response from syncing {pipe}:\n{e}"

        ### A dict with 'detail' is the API's error shape.
        if isinstance(j, dict) and 'detail' in j:
            return False, j['detail']

        try:
            j = tuple(j)
        except Exception as e:
            return False, response.text

        if debug:
            dprint("Received response: " + str(j))
        ### Stop at the first unsuccessful chunk and propagate its tuple.
        if not j[0]:
            return j

        rowcount += len(c)
        num_success_chunks += 1

    success_tuple = True, (
        f"It took {round(time.time() - begin, 2)} seconds to sync {rowcount} row"
        + ('s' if rowcount != 1 else '')
        + f" across {num_success_chunks} chunk" + ('s' if num_success_chunks != 1 else '') +
        f" to {pipe}."
    )
    return success_tuple
Sync a DataFrame into a Pipe.
def delete_pipe(
    self,
    pipe: Optional[meerschaum.Pipe] = None,
    debug: bool = None,
) -> SuccessTuple:
    """Delete a Pipe and drop its table."""
    if pipe is None:
        error(f"Pipe cannot be None.")
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/delete',
        debug = debug,
    )
    if debug:
        dprint(response.text)
    ### Parse the body once instead of calling `response.json()`
    ### (which re-parses the text) in every branch.
    payload = response.json()
    if isinstance(payload, list):
        return response.__bool__(), payload[1]
    if isinstance(payload, dict) and 'detail' in payload:
        return response.__bool__(), payload['detail']
    return response.__bool__(), response.text
Delete a Pipe and drop its table.
def get_pipe_data(
    self,
    pipe: meerschaum.Pipe,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[str, datetime, int, None] = None,
    end: Union[str, datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    as_chunks: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[pandas.DataFrame, None]:
    """Fetch data from the API.

    Returns the pipe's data as a DataFrame, or `None` on failure.
    """
    r_url = pipe_r_url(pipe)
    try:
        response = self.get(
            r_url + "/data",
            params = {
                'select_columns': json.dumps(select_columns),
                'omit_columns': json.dumps(omit_columns),
                'begin': begin,
                'end': end,
                'params': json.dumps(params)
            },
            debug = debug
        )
        if not response.ok:
            return None
        j = response.json()
    except Exception as e:
        warn(f"Failed to get data for {pipe}:\n{e}")
        return None
    if isinstance(j, dict) and 'detail' in j:
        ### BUG FIX: previously returned a `(False, msg)` tuple from a
        ### function annotated to return a DataFrame or None.
        ### (Also removed a `while True: ... break` wrapper that never looped.)
        warn(j['detail'])
        return None

    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.dataframe import parse_df_datetimes, add_missing_cols_to_df
    pd = import_pandas()
    try:
        df = pd.read_json(StringIO(response.text))
    except Exception as e:
        warn(f"Failed to parse response for {pipe}:\n{e}")
        return None

    ### An empty frame still needs the pipe's expected columns.
    if len(df.columns) == 0:
        return add_missing_cols_to_df(df, pipe.dtypes)

    df = parse_df_datetimes(
        df,
        ignore_cols = [
            col
            for col, dtype in pipe.dtypes.items()
            if 'datetime' not in str(dtype)
        ],
        debug = debug,
    )
    return df
Fetch data from the API.
def get_pipe_id(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False,
) -> int:
    """Get a Pipe's ID from the API."""
    ### Fixed annotation typo: `meerschuam.Pipe` -> `meerschaum.Pipe`.
    from meerschaum.utils.misc import is_int
    r_url = pipe_r_url(pipe)
    response = self.get(
        r_url + '/id',
        debug = debug
    )
    if debug:
        dprint(f"Got pipe ID: {response.text}")
    try:
        if is_int(response.text):
            return int(response.text)
    except Exception as e:
        warn(f"Failed to get the ID for {pipe}:\n{e}")
    ### Fall through to None when the response is not an integer.
    return None
Get a Pipe's ID from the API.
def get_pipe_attributes(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False,
) -> Dict[str, Any]:
    """Get a Pipe's attributes from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose attributes we are fetching.

    Returns
    -------
    A dictionary of a pipe's attributes.
    If the pipe does not exist, return an empty dictionary.
    """
    response = self.get(pipe_r_url(pipe) + '/attributes', debug=debug)
    try:
        return json.loads(response.text)
    except Exception as e:
        warn(f"Failed to get the attributes for {pipe}:\n{e}")
    return {}
Get a Pipe's attributes from the API
Parameters
- pipe (meerschaum.Pipe): The pipe whose attributes we are fetching.
Returns
- A dictionary of a pipe's attributes.
- If the pipe does not exist, return an empty dictionary.
def get_sync_time(
    self,
    pipe: 'meerschaum.Pipe',
    params: Optional[Dict[str, Any]] = None,
    newest: bool = True,
    debug: bool = False,
) -> Union[datetime, int, None]:
    """Get a Pipe's most recent datetime value from the API.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to select from.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the WHERE clause.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (ASC instead of DESC).

    Returns
    -------
    The most recent (or oldest if `newest` is `False`) datetime of a pipe,
    rounded down to the closest minute.
    """
    from meerschaum.utils.misc import is_int
    from meerschaum.utils.warnings import warn
    response = self.get(
        pipe_r_url(pipe) + '/sync_time',
        json = params,
        params = {'newest': newest, 'debug': debug},
        debug = debug,
    )
    if not response:
        warn(f"Failed to get the sync time for {pipe}:\n" + response.text)
        return None

    payload = response.json()
    if payload is None:
        return None
    try:
        ### Integer indices come back as ints; datetimes as ISO strings.
        return int(payload) if is_int(payload) else datetime.fromisoformat(payload)
    except Exception as e:
        warn(f"Failed to parse the sync time '{payload}' for {pipe}:\n{e}")
        return None
Get a Pipe's most recent datetime value from the API.
Parameters
- pipe (meerschaum.Pipe): The pipe to select from.
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause.
- newest (bool, default True):
    If `True`, get the most recent datetime (honoring `params`).
    If `False`, get the oldest datetime (ASC instead of DESC).
Returns
- The most recent (or oldest if
newest
isFalse
) datetime of a pipe, - rounded down to the closest minute.
def pipe_exists(
    self,
    pipe: 'meerschaum.Pipe',
    debug: bool = False
) -> bool:
    """Check the API to see if a Pipe exists.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe'
        The pipe which we are querying.

    Returns
    -------
    A bool indicating whether a pipe's underlying table exists.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    response = self.get(pipe_r_url(pipe) + '/exists', debug=debug)
    if not response:
        warn(f"Failed to check if {pipe} exists:\n{response.text}")
        return False
    if debug:
        dprint("Received response: " + str(response.text))
    payload = response.json()
    ### Surface the API's error detail but still return the payload.
    if isinstance(payload, dict) and 'detail' in payload:
        warn(payload['detail'])
    return payload
Check the API to see if a Pipe exists.
Parameters
- pipe ('meerschaum.Pipe'): The pipe which we are querying.
Returns
- A bool indicating whether a pipe's underlying table exists.
def create_metadata(
    self,
    debug: bool = False
) -> bool:
    """Create metadata tables.

    Returns
    -------
    A bool indicating success.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.config.static import STATIC_CONFIG
    r_url = STATIC_CONFIG['api']['endpoints']['metadata']
    response = self.post(r_url, debug=debug)
    if debug:
        ### BUG FIX: was missing the `f` prefix, printing literal braces.
        dprint(f"Create metadata response: {response.text}")
    try:
        metadata_response = json.loads(response.text)
    except Exception as e:
        warn(f"Failed to create metadata on {self}:\n{e}")
        metadata_response = False
    ### BUG FIX: previously always returned False, discarding the result.
    return bool(metadata_response)
Create metadata tables.
Returns
- A bool indicating success.
def get_pipe_rowcount(
    self,
    pipe: 'meerschaum.Pipe',
    begin: Optional[datetime] = None,
    end: Optional[datetime] = None,
    params: Optional[Dict[str, Any]] = None,
    remote: bool = False,
    debug: bool = False,
) -> int:
    """Get a pipe's row count from the API.

    Parameters
    ----------
    pipe: 'meerschaum.Pipe'
        The pipe whose row count we are counting.

    begin: Optional[datetime], default None
        If provided, bound the count by this datetime.

    end: Optional[datetime], default None
        If provided, bound the count by this datetime.

    params: Optional[Dict[str, Any]], default None
        If provided, bound the count by these parameters.

    remote: bool, default False
        Forwarded to the endpoint as the `remote` query parameter.

    Returns
    -------
    The number of rows in the pipe's table, bound the given parameters.
    If the table does not exist, return 0.
    """
    response = self.get(
        pipe_r_url(pipe) + "/rowcount",
        json = params,
        params = {'begin': begin, 'end': end, 'remote': remote},
        debug = debug
    )
    if not response:
        warn(f"Failed to get the rowcount for {pipe}:\n{response.text}")
        return 0
    try:
        return int(json.loads(response.text))
    except Exception as e:
        warn(f"Failed to get the rowcount for {pipe}:\n{e}")
    return 0
Get a pipe's row count from the API.
Parameters
- pipe ('meerschaum.Pipe':): The pipe whose row count we are counting.
- begin (Optional[datetime], default None): If provided, bound the count by this datetime.
- end (Optional[datetime]): If provided, bound the count by this datetime.
- params (Optional[Dict[str, Any]], default None): If provided, bound the count by these parameters.
- remote (bool, default False):
Returns
- The number of rows in the pipe's table, bound the given parameters.
- If the table does not exist, return 0.
def drop_pipe(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False
) -> SuccessTuple:
    """
    Drop a pipe's table but maintain its registration.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe to be dropped.

    Returns
    -------
    A success tuple (bool, str).
    """
    from meerschaum.utils.warnings import error
    from meerschaum.utils.debug import dprint
    if pipe is None:
        error(f"Pipe cannot be None.")
    r_url = pipe_r_url(pipe)
    response = self.delete(
        r_url + '/drop',
        debug = debug,
    )
    if debug:
        dprint(response.text)

    try:
        data = response.json()
    except Exception as e:
        return False, f"Failed to drop {pipe}."

    ### BUG FIX: use the already-parsed `data` instead of re-calling
    ### `response.json()` (re-parsing the body) in the 'detail' check.
    if isinstance(data, list):
        return response.__bool__(), data[1]
    if isinstance(data, dict) and 'detail' in data:
        return response.__bool__(), data['detail']
    return response.__bool__(), response.text
Drop a pipe's table but maintain its registration.
Parameters
- pipe (meerschaum.Pipe:): The pipe to be dropped.
Returns
- A success tuple (bool, str).
def clear_pipe(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Delete rows in a pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe with rows to be deleted.

    Returns
    -------
    A success tuple.
    """
    ### Strip keys that are set explicitly in the do_action call below.
    for reserved_key in ('metric_keys', 'connector_keys', 'location_keys', 'action', 'force'):
        kw.pop(reserved_key, None)
    return self.do_action(
        ['clear', 'pipes'],
        connector_keys = pipe.connector_keys,
        metric_keys = pipe.metric_key,
        location_keys = pipe.location_key,
        force = True,
        debug = debug,
        **kw
    )
Delete rows in a pipe's table.
Parameters
- pipe (meerschaum.Pipe): The pipe with rows to be deleted.
Returns
- A success tuple.
def get_pipe_columns_types(
    self,
    pipe: meerschaum.Pipe,
    debug: bool = False,
) -> Union[Dict[str, str], None]:
    """
    Fetch the columns and types of the pipe's table.

    Parameters
    ----------
    pipe: meerschaum.Pipe
        The pipe whose columns to be queried.

    Returns
    -------
    A dictionary mapping column names to their database types.

    Examples
    --------
    >>> {
    ...   'dt': 'TIMESTAMP WITHOUT TIMEZONE',
    ...   'id': 'BIGINT',
    ...   'val': 'DOUBLE PRECISION',
    ... }
    >>>
    """
    ### BUG FIX: `warn` was imported only inside the first error branch but
    ### also used in the second; import it once up front for both.
    from meerschaum.utils.warnings import warn
    r_url = pipe_r_url(pipe) + '/columns/types'
    response = self.get(
        r_url,
        debug = debug
    )
    j = response.json()
    ### A bare {'detail': ...} payload is the API's error shape.
    if isinstance(j, dict) and 'detail' in j and len(j.keys()) == 1:
        warn(j['detail'])
        return None
    if not isinstance(j, dict):
        warn(response.text)
        return None
    return j
Fetch the columns and types of the pipe's table.
Parameters
- pipe (meerschaum.Pipe): The pipe whose columns to be queried.
Returns
- A dictionary mapping column names to their database types.
Examples
>>> {
... 'dt': 'TIMESTAMP WITHOUT TIMEZONE',
... 'id': 'BIGINT',
... 'val': 'DOUBLE PRECISION',
... }
>>>
def fetch(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, str, int] = '',
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kw: Any
) -> Iterator['pd.DataFrame']:
    """Get the Pipe data from the remote Pipe.

    Reads the `fetch` section of the pipe's parameters to locate the
    source pipe on this instance and returns an iterator of DataFrames.
    Returns `None` (with a warning) when the fetch definition is missing.
    """
    ### BUG FIX: annotations were malformed — `Optional[Dict, Any]` is not a
    ### valid subscript (now `Optional[Dict[str, Any]]`), and `end` defaults
    ### to None so its Union now includes None.
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config._patch import apply_patch_to_config

    fetch_params = pipe.parameters.get('fetch', {})
    if not fetch_params:
        warn(f"Missing 'fetch' parameters for {pipe}.", stack=False)
        return None

    pipe_meta = fetch_params.get('pipe', {})
    ### Legacy: check for `connector_keys`, etc. at the root.
    if not pipe_meta:
        ck, mk, lk = (
            fetch_params.get('connector_keys', None),
            fetch_params.get('metric_key', None),
            fetch_params.get('location_key', None),
        )
        if not ck or not mk:
            warn(f"Missing `fetch:pipe` keys for {pipe}.", stack=False)
            return None

        pipe_meta.update({
            'connector': ck,
            'metric': mk,
            'location': lk,
        })

    pipe_meta['instance'] = self
    source_pipe = mrsm.Pipe(**pipe_meta)

    ### Deep-copy so patching never mutates the caller's params.
    _params = copy.deepcopy(params) if params is not None else {}
    _params = apply_patch_to_config(_params, fetch_params.get('params', {}))
    select_columns = fetch_params.get('select_columns', [])
    omit_columns = fetch_params.get('omit_columns', [])

    return source_pipe.get_data(
        select_columns = select_columns,
        omit_columns = omit_columns,
        begin = begin,
        end = end,
        params = _params,
        debug = debug,
        as_iterator = True,
    )
Get the Pipe data from the remote Pipe.
def register_plugin(
    self,
    plugin: meerschaum.core.Plugin,
    make_archive: bool = True,
    debug: bool = False,
) -> SuccessTuple:
    """Register a plugin and upload its archive."""
    import json
    archive_path = plugin.make_tar(debug=debug) if make_archive else plugin.archive_path
    metadata = {
        'version': plugin.version,
        'attributes': json.dumps(plugin.attributes),
    }
    r_url = plugin_r_url(plugin)
    ### Use a context manager so the archive handle is always closed,
    ### replacing the manual open()/close() + finally.
    with open(archive_path, 'rb') as file_pointer:
        files = {'archive': file_pointer}
        try:
            response = self.post(r_url, files=files, params=metadata, debug=debug)
        except Exception as e:
            return False, f"Failed to register plugin '{plugin}'."

    try:
        success, msg = json.loads(response.text)
    except Exception as e:
        return False, response.text

    return success, msg
Register a plugin and upload its archive.
def install_plugin(
    self,
    name: str,
    skip_deps: bool = False,
    force: bool = False,
    debug: bool = False
) -> SuccessTuple:
    """Download and attempt to install a plugin from the API."""
    import os, pathlib, json
    from meerschaum.core import Plugin
    from meerschaum.config._paths import PLUGINS_TEMP_RESOURCES_PATH
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    binaryornot_check = attempt_import('binaryornot.check', lazy=False)
    r_url = plugin_r_url(name)
    dest = pathlib.Path(os.path.join(PLUGINS_TEMP_RESOURCES_PATH, name + '.tar.gz'))
    if debug:
        dprint(f"Fetching from '{self.url + r_url}' to '{dest}'...")
    archive_path = self.wget(r_url, dest, debug=debug)
    is_binary = binaryornot_check.is_binary(str(archive_path))
    if not is_binary:
        ### A non-binary download is an error payload, not an archive.
        fail_msg = f"Failed to download binary for plugin '{name}'."
        ### BUG FIX: initialize the result so a dict payload without
        ### 'detail' no longer raises NameError at the return below.
        success, msg = False, fail_msg
        try:
            with open(archive_path, 'r') as f:
                j = json.load(f)
            if isinstance(j, list):
                success, msg = tuple(j)
        except Exception as e:
            success, msg = False, fail_msg
        return success, msg
    plugin = Plugin(name, archive_path=archive_path, repo_connector=self)
    return plugin.install(skip_deps=skip_deps, force=force, debug=debug)
Download and attempt to install a plugin from the API.
def delete_plugin(
    self,
    plugin: meerschaum.core.Plugin,
    debug: bool = False
) -> SuccessTuple:
    """Delete a plugin from an API repository."""
    import json
    plugin_endpoint = plugin_r_url(plugin)
    try:
        response = self.delete(plugin_endpoint, debug=debug)
    except Exception:
        return False, f"Failed to delete plugin '{plugin}'."

    ### Expect a [success, message] pair; anything else is an error body.
    try:
        success, msg = json.loads(response.text)
    except Exception:
        return False, response.text

    return success, msg
Delete a plugin from an API repository.
def get_plugins(
    self,
    user_id : Optional[int] = None,
    search_term : Optional[str] = None,
    debug : bool = False
) -> Sequence[str]:
    """Return a list of registered plugin names.

    Parameters
    ----------
    user_id : Optional[int], default None
        If specified, return all plugins from a certain user.

    search_term : Optional[str], default None
        Forwarded to the endpoint's `search_term` query parameter.

    debug : bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of plugin names. Returns an empty list when the request fails.
    """
    import json
    from meerschaum.utils.warnings import warn, error
    from meerschaum.config.static import STATIC_CONFIG
    response = self.get(
        STATIC_CONFIG['api']['endpoints']['plugins'],
        params = {'user_id' : user_id, 'search_term' : search_term},
        use_token = True,
        debug = debug
    )
    if not response:
        return []
    plugins = json.loads(response.text)
    ### Anything but a list is an unexpected payload; error() raises.
    if not isinstance(plugins, list):
        error(response.text)
    return plugins
Return a list of registered plugin names.
Parameters
- user_id (Optional[int], default None): If specified, return all plugins from a certain user.
- search_term (Optional[str], default None): Forwarded to the endpoint's `search_term` query parameter.
- debug (bool, default False): Verbosity toggle.

Returns
- A list of plugin names.
def get_plugin_attributes(
    self,
    plugin: meerschaum.core.Plugin,
    debug: bool = False
) -> Mapping[str, Any]:
    """
    Return a plugin's attributes.
    """
    import json
    from meerschaum.utils.warnings import warn, error
    attributes_endpoint = plugin_r_url(plugin) + '/attributes'
    response = self.get(attributes_endpoint, use_token=True, debug=debug)
    attributes = response.json()
    ### The payload may itself be a JSON-encoded string of the dict.
    if isinstance(attributes, str) and attributes and attributes[0] == '{':
        try:
            attributes = json.loads(attributes)
        except Exception:
            pass
    if not isinstance(attributes, dict):
        error(response.text)
    elif not response and 'detail' in attributes:
        warn(attributes['detail'])
        return {}
    return attributes
Return a plugin's attributes.
def login(
    self,
    debug: bool = False,
    warn: bool = True,
    **kw: Any
) -> SuccessTuple:
    """Log in and set the session token."""
    from meerschaum.utils.warnings import warn as _warn, info, error
    from meerschaum.core import User
    from meerschaum.config.static import STATIC_CONFIG
    import json, datetime
    try:
        login_data = {
            'username': self.username,
            'password': self.password,
        }
    except AttributeError:
        ### Credentials were never configured for this connector.
        return False, f"Please login with the command `login {self}`."
    response = self.post(
        STATIC_CONFIG['api']['endpoints']['login'],
        data = login_data,
        use_token = False,
        debug = debug
    )
    success = response.__bool__()
    if success:
        msg = f"Successfully logged into '{self}' as user '{login_data['username']}'."
        payload = json.loads(response.text)
        self._token = payload['access_token']
        self._expires = datetime.datetime.strptime(
            payload['expires'],
            '%Y-%m-%dT%H:%M:%S.%f'
        )
    else:
        msg = (
            f"Failed to log into '{self}' as user '{login_data['username']}'.\n" +
            f" Please verify login details for connector '{self}'."
        )
        if warn:
            _warn(msg, stack=False)

    return success, msg
Log in and set the session token.
def test_connection(
        self,
        **kw: Any
    ) -> Union[bool, None]:
    """Test if a successful connection to the API may be made."""
    from meerschaum.connectors.poll import retry_connect
    ### Single attempt, no warnings, and no login/chaining enforcement
    ### unless the caller overrides these defaults.
    connect_kwargs = {
        'max_retries': 1,
        'retry_wait': 0,
        'warn': False,
        'connector': self,
        'enforce_chaining': False,
        'enforce_login': False,
    }
    connect_kwargs.update(kw)
    try:
        return retry_connect(**connect_kwargs)
    except Exception:
        return False
Test if a successful connection to the API may be made.
def register_user(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Register a new user.

    Parameters
    ----------
    user: meerschaum.core.User
        The user to register.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and a message from the API instance.
    """
    import json
    from meerschaum.config.static import STATIC_CONFIG
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/register"
    data = {
        'username': user.username,
        'password': user.password,
        'attributes': json.dumps(user.attributes),
    }
    if user.type:
        data['type'] = user.type
    if user.email:
        data['email'] = user.email
    response = self.post(r_url, data=data, debug=debug)
    try:
        _json = json.loads(response.text)
        if isinstance(_json, dict) and 'detail' in _json:
            return False, _json['detail']
        ### Already a tuple here; the original redundantly re-wrapped it below.
        success_tuple = tuple(_json)
    except Exception:
        msg = response.text if response else f"Failed to register user '{user}'."
        return False, msg

    return success_tuple
Register a new user.
def get_user_id(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> Optional[int]:
    """Get a user's ID."""
    from meerschaum.config.static import STATIC_CONFIG
    import json
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/id"
    response = self.get(r_url, debug=debug, **kw)
    ### Return `None` if the response cannot be parsed as an integer.
    try:
        return int(json.loads(response.text))
    except Exception:
        return None
Get a user's ID.
def get_users(
        self,
        debug: bool = False,
        **kw : Any
    ) -> List[str]:
    """
    Return a list of registered usernames.
    """
    from meerschaum.config.static import STATIC_CONFIG
    import json
    users_endpoint = f"{STATIC_CONFIG['api']['endpoints']['users']}"
    response = self.get(
        users_endpoint,
        debug = debug,
        use_token = True,
    )
    ### Fall through to an empty list on any failure.
    if response:
        try:
            return response.json()
        except Exception:
            pass
    return []
Return a list of registered usernames.
def edit_user(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """
    Edit an existing user.

    Parameters
    ----------
    user: meerschaum.core.User
        The user whose attributes to update.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and a message from the API instance.
    """
    import json
    from meerschaum.config.static import STATIC_CONFIG
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/edit"
    data = {
        'username': user.username,
        'password': user.password,
        'type': user.type,
        'email': user.email,
        'attributes': json.dumps(user.attributes),
    }
    response = self.post(r_url, data=data, debug=debug)
    try:
        _json = json.loads(response.text)
        if isinstance(_json, dict) and 'detail' in _json:
            return False, _json['detail']
        ### Already a tuple here; the original redundantly re-wrapped it below.
        success_tuple = tuple(_json)
    except Exception:
        msg = response.text if response else f"Failed to edit user '{user}'."
        return False, msg

    return success_tuple
Edit an existing user.
def delete_user(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> SuccessTuple:
    """Delete a user."""
    from meerschaum.config.static import STATIC_CONFIG
    import json
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}"
    response = self.delete(r_url, debug=debug)
    ### Surface the server's `detail` message on rejection;
    ### otherwise relay the returned success tuple.
    try:
        payload = json.loads(response.text)
        if isinstance(payload, dict) and 'detail' in payload:
            return False, payload['detail']
        return tuple(payload)
    except Exception:
        return False, f"Failed to delete user '{user.username}'."
Delete a user.
def get_user_password_hash(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """
    If configured, get a user's password hash.

    Parameters
    ----------
    user: meerschaum.core.User
        The user whose password hash to fetch.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The password hash string, or `None` if the request failed.
    """
    from meerschaum.config.static import STATIC_CONFIG
    ### f-string for consistency with the other user-endpoint methods.
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/password_hash"
    response = self.get(r_url, debug=debug, **kw)
    if not response:
        return None
    return response.json()
If configured, get a user's password hash.
def get_user_type(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw: Any
    ) -> Optional[str]:
    """
    If configured, get a user's type.

    Parameters
    ----------
    user: meerschaum.core.User
        The user whose type to fetch.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The user's type string, or `None` if the request failed.
    """
    from meerschaum.config.static import STATIC_CONFIG
    ### f-string for consistency with the other user-endpoint methods.
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/type"
    response = self.get(r_url, debug=debug, **kw)
    if not response:
        return None
    return response.json()
If configured, get a user's type.
def get_user_attributes(
        self,
        user: 'meerschaum.core.User',
        debug: bool = False,
        **kw
    ) -> Optional[Dict[str, Any]]:
    """
    Get a user's attributes.

    Parameters
    ----------
    user: meerschaum.core.User
        The user whose attributes to fetch.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The user's attributes dictionary, or `None` if the response
    could not be parsed.
    (NOTE: the original annotation `-> int` was incorrect.)
    """
    from meerschaum.config.static import STATIC_CONFIG
    import json
    r_url = f"{STATIC_CONFIG['api']['endpoints']['users']}/{user.username}/attributes"
    response = self.get(r_url, debug=debug, **kw)
    try:
        attributes = json.loads(response.text)
    except Exception:
        attributes = None
    return attributes
Get a user's attributes.
@classmethod
def from_uri(
    cls,
    uri: str,
    label: Optional[str] = None,
    as_dict: bool = False,
) -> Union[
    'meerschaum.connectors.APIConnector',
    Dict[str, Union[str, int]],
    ]:
    """
    Create a new APIConnector from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string.

    label: Optional[str], default None
        If provided, use this as the connector label.
        Otherwise derive one from the username and host.

    as_dict: bool, default False
        If `True`, return the keyword arguments needed to build
        an `APIConnector` instead of constructing one.

    Returns
    -------
    A new `APIConnector`, or a dict of attributes when `as_dict` is `True`.
    """
    from meerschaum.connectors.sql import SQLConnector
    ### Reuse the SQL URI parser; the scheme becomes the API protocol.
    params = SQLConnector.parse_uri(uri)
    if 'host' not in params:
        error("No host was found in the provided URI.")
    params['protocol'] = params.pop('flavor')
    if label:
        params['label'] = label
    else:
        user_prefix = (params['username'] + '@') if 'username' in params else ''
        params['label'] = (user_prefix + params['host']).lower()

    if as_dict:
        return params
    return cls(**params)
Create a new APIConnector from a URI string.
Parameters
- uri (str): The URI connection string.
- label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the determined database name.
- as_dict (bool, default False): If `True`, return a dictionary of the keyword arguments necessary to create a new `APIConnector`; otherwise create a new object.

Returns
- A new APIConnector object or a dictionary of attributes (if `as_dict` is `True`).
Inherited Members
def get_connector(
        type: Optional[str] = None,
        label: Optional[str] = None,
        refresh: bool = False,
        debug: bool = False,
        **kw: Any
    ) -> Connector:
    """
    Return existing connector or create new connection and store for reuse.

    You can create new connectors if enough parameters are provided for the given type and flavor.


    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct new object. Defaults to `False`.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`).

    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```

    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    ### Allow 'type:label' shorthand in the `type` argument.
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)
    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _loaded_plugin_connectors = True
    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if possibilities:
            ### Build " Did you mean 'a', 'b' or 'c'?" from the candidates.
            quoted = [f"'{poss}'" for poss in possibilities]
            if len(quoted) == 1:
                poss_msg = f" Did you mean {quoted[0]}?"
            else:
                poss_msg = (
                    " Did you mean "
                    + ", ".join(quoted[:-1])
                    + f" or {quoted[-1]}?"
                )

        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    ### Lazily populate the type -> class registry on first use.
    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        with _locks['types']:
            types.update({
                'api'   : APIConnector,
                'sql'   : SQLConnector,
                'plugin': PluginConnector,
            })

    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            import inspect
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector " +
                            f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n" +
                        f" - Keyword value: '{value}'\n" +
                        f" - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else: ### connector doesn't yet exist
            refresh = True

    ### only create an object if refresh is True
    ### (can be manually specified, otherwise determined above)
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack = False,
                )
                conn = None
            except Exception as e:
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack = False,
                )
                conn = None
        if conn is None:
            return None

    return connectors[type][label]
Return existing connector or create new connection and store for reuse.
You can create new connectors if enough parameters are provided for the given type and flavor.
Parameters
- type (Optional[str], default None): Connector type (sql, api, etc.). Defaults to the type of the configured `instance_connector`.
- label (Optional[str], default None): Connector label (e.g. main). Defaults to `'main'`.
- refresh (bool, default False): Refresh the Connector instance / construct a new object. Defaults to `False`.
- kw (Any): Other arguments to pass to the Connector constructor. If the Connector has already been constructed and new arguments are provided, `refresh` is set to `True` and the old Connector is replaced.

Returns
- A new Meerschaum connector (e.g. `APIConnector`, `SQLConnector`).

Examples
The following parameters would create a new `SQLConnector` that isn't in the configuration file.
>>> conn = get_connector(
... type = 'sql',
... label = 'newlabel',
... flavor = 'sqlite',
... database = '/file/path/to/database.db'
... )
>>>
def is_connected(keys: str, **kw) -> bool:
    """
    Check if the connector keys correspond to an active connection.
    If the connector has not been created, it will immediately return `False`.
    If the connector exists but cannot communicate with the source, return `False`.

    **NOTE:** Only works with instance connectors (`SQLConnectors` and `APIConnectors`).
    Keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Parameters
    ----------
    keys:
        The keys to the connector (e.g. `'sql:main'`).

    Returns
    -------
    A `bool` corresponding to whether a successful connection may be made.

    """
    import warnings
    if ':' not in keys:
        warn(f"Invalid connector keys '{keys}'")

    ### maxsplit=1 matches how `get_connector()` splits keys,
    ### so labels containing ':' are handled consistently.
    try:
        typ, label = keys.split(':', maxsplit=1)
    except Exception:
        return False
    if typ not in instance_types:
        return False
    if label not in connectors.get(typ, {}):
        return False

    from meerschaum.connectors.parse import parse_instance_keys
    conn = parse_instance_keys(keys)
    try:
        ### Suppress connection warnings while probing.
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            return conn.test_connection(**kw)
    except Exception:
        return False
Check if the connector keys correspond to an active connection.
If the connector has not been created, it will immediately return `False`.
If the connector exists but cannot communicate with the source, return `False`.

NOTE: Only works with instance connectors (`SQLConnectors` and `APIConnectors`).
Keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

Parameters
- keys: The keys to the connector (e.g. `'sql:main'`).

Returns
- A `bool` corresponding to whether a successful connection may be made.