meerschaum.connectors

Create connectors with `meerschaum.connectors.get_connector()`.
For ease of use, you can also import from the root `meerschaum` module:

>>> from meerschaum import get_connector
>>> conn = get_connector()
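A couple of common call patterns, sketched under the assumption that the default `sql:main` connector is configured (the SQLite path below is hypothetical):

```
from meerschaum import get_connector

conn_a = get_connector('sql', 'main')   # separate type and label arguments
conn_b = get_connector('sql:main')      # or a single 'type:label' string
assert conn_a is conn_b                 # connectors are cached per (type, label)

### Enough keyword arguments will build a connector not present in the config:
temp = get_connector('sql', 'temp', flavor='sqlite', database='/tmp/temp.db')
```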
````
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Create connectors with `meerschaum.connectors.get_connector()`.
For ease of use, you can also import from the root `meerschaum` module:
```
>>> from meerschaum import get_connector
>>> conn = get_connector()
```
"""

from __future__ import annotations

import meerschaum as mrsm
from meerschaum.utils.typing import Any, Union, List, Dict
from meerschaum.utils.threading import RLock
from meerschaum.utils.warnings import warn

from meerschaum.connectors._Connector import Connector, InvalidAttributesError
from meerschaum.connectors.sql._SQLConnector import SQLConnector
from meerschaum.connectors.api._APIConnector import APIConnector
from meerschaum.connectors.sql._create_engine import flavor_configs as sql_flavor_configs

__all__ = (
    "make_connector",
    "Connector",
    "SQLConnector",
    "APIConnector",
    "get_connector",
    "is_connected",
    "poll",
    "api",
    "sql",
    "valkey",
)

### store connectors partitioned by
### type, label for reuse
connectors: Dict[str, Dict[str, Connector]] = {
    'api'   : {},
    'sql'   : {},
    'plugin': {},
    'valkey': {},
}
instance_types: List[str] = ['sql', 'api']
_locks: Dict[str, RLock] = {
    'connectors'               : RLock(),
    'types'                    : RLock(),
    'custom_types'             : RLock(),
    '_loaded_plugin_connectors': RLock(),
    'instance_types'           : RLock(),
}
attributes: Dict[str, Dict[str, Any]] = {
    'api': {
        'required': [
            'host',
            'username',
            'password',
        ],
        'optional': [
            'port',
        ],
        'default': {
            'protocol': 'http',
        },
    },
    'sql': {
        'flavors': sql_flavor_configs,
    },
}
### Fill this with objects only when connectors are first referenced.
types: Dict[str, Any] = {}
custom_types: set = set()
_loaded_plugin_connectors: bool = False


def get_connector(
    type: str = None,
    label: str = None,
    refresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Connector:
    """
    Return existing connector or create new connection and store for reuse.

    You can create new connectors if enough parameters are provided for the given type and flavor.

    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct new object. Defaults to `False`.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`).

    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```

    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)

    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _load_builtin_custom_connectors()
            _loaded_plugin_connectors = True

    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"

        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        from meerschaum.connectors.valkey import ValkeyConnector
        with _locks['types']:
            types.update({
                'api'   : APIConnector,
                'sql'   : SQLConnector,
                'plugin': PluginConnector,
                'valkey': ValkeyConnector,
            })

    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    import inspect
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector "
                            + f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n"
                        + f"  - Keyword value: '{value}'\n"
                        + f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else:  ### connector doesn't yet exist
            refresh = True

    ### only create an object if refresh is True
    ### (can be manually specified, otherwise determined above)
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack = False,
                )
                conn = None
            except Exception as e:
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack = False,
                )
                conn = None
        if conn is None:
            return None

    return connectors[type][label]


def is_connected(keys: str, **kw) -> bool:
    """
    Check if the connector keys correspond to an active connection.
    If the connector has not been created, it will immediately return `False`.
    If the connector exists but cannot communicate with the source, return `False`.

    **NOTE:** Only works with instance connectors (`SQLConnectors` and `APIConnectors`).
    Keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Parameters
    ----------
    keys:
        The keys to the connector (e.g. `'sql:main'`).

    Returns
    -------
    A `bool` corresponding to whether a successful connection may be made.

    """
    import warnings
    if ':' not in keys:
        warn(f"Invalid connector keys '{keys}'")

    try:
        typ, label = keys.split(':')
    except Exception:
        return False
    if typ not in instance_types:
        return False
    if label not in connectors.get(typ, {}):
        return False

    from meerschaum.connectors.parse import parse_instance_keys
    conn = parse_instance_keys(keys)
    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            return conn.test_connection(**kw)
    except Exception:
        return False


def make_connector(cls, _is_executor: bool = False):
    """
    Register a class as a `Connector`.
    The `type` will be the lower case of the class name, without the suffix `connector`.

    Parameters
    ----------
    instance: bool, default False
        If `True`, make this connector type an instance connector.
        This requires implementing the various pipes functions and lots of testing.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.connectors import make_connector, Connector
    >>>
    >>> @make_connector
    >>> class FooConnector(Connector):
    ...     REQUIRED_ATTRIBUTES: list[str] = ['username', 'password']
    ...
    >>> conn = mrsm.get_connector('foo:bar', username='dog', password='cat')
    >>> print(conn.username, conn.password)
    dog cat
    >>>
    """
    import re
    suffix_regex = (
        r'connector$'
        if not _is_executor
        else r'executor$'
    )
    typ = re.sub(suffix_regex, '', cls.__name__.lower())
    with _locks['types']:
        types[typ] = cls
    with _locks['custom_types']:
        custom_types.add(typ)
    with _locks['connectors']:
        if typ not in connectors:
            connectors[typ] = {}
    if getattr(cls, 'IS_INSTANCE', False):
        with _locks['instance_types']:
            if typ not in instance_types:
                instance_types.append(typ)

    return cls


def load_plugin_connectors():
    """
    If a plugin makes use of the `make_connector` decorator,
    load its module.
    """
    from meerschaum.plugins import get_plugins, import_plugins
    to_import = []
    for plugin in get_plugins():
        if plugin is None:
            continue
        with open(plugin.__file__, encoding='utf-8') as f:
            text = f.read()
        if 'make_connector' in text or 'Connector' in text:
            to_import.append(plugin.name)
    if not to_import:
        return
    import_plugins(*to_import)


def get_connector_plugin(
    connector: Connector,
) -> Union[str, None, mrsm.Plugin]:
    """
    Determine the plugin for a connector.
    This is useful for handling virtual environments for custom instance connectors.

    Parameters
    ----------
    connector: Connector
        The connector which may require a virtual environment.

    Returns
    -------
    A Plugin, 'mrsm', or None.
    """
    if not hasattr(connector, 'type'):
        return None
    plugin_name = (
        connector.__module__.replace('plugins.', '').split('.')[0]
        if connector.type in custom_types else (
            connector.label
            if connector.type == 'plugin'
            else 'mrsm'
        )
    )
    plugin = mrsm.Plugin(plugin_name)
    return plugin if plugin.is_installed() else None


def _load_builtin_custom_connectors():
    """
    Import custom connectors decorated with `@make_connector` or `@make_executor`.
    """
    import meerschaum.jobs.systemd
    import meerschaum.connectors.valkey
````
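`is_connected()` (defined in the source above) only answers for instance connectors that have already been constructed and can reach their source; a small sketch, again assuming `sql:main`:

```
from meerschaum.connectors import get_connector, is_connected

print(is_connected('sql:main'))    # False until the connector has been constructed
conn = get_connector('sql:main')   # build and cache the connector
print(is_connected('sql:main'))    # True only if the database actually responds
```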
make_connector(cls, _is_executor: bool = False)

Register a class as a `Connector`.
The `type` will be the lower case of the class name, without the suffix `connector`.

Parameters
- instance (bool, default False):
  If `True`, make this connector type an instance connector.
  This requires implementing the various pipes functions and lots of testing.

Examples
>>> import meerschaum as mrsm
>>> from meerschaum.connectors import make_connector, Connector
>>>
>>> @make_connector
>>> class FooConnector(Connector):
...     REQUIRED_ATTRIBUTES: list[str] = ['username', 'password']
...
>>> conn = mrsm.get_connector('foo:bar', username='dog', password='cat')
>>> print(conn.username, conn.password)
dog cat
>>>
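The decorator itself only registers the class and its type string; whether the new type also counts as an instance connector is read from the class's `IS_INSTANCE` attribute (see the source above). A hypothetical sketch (a real instance connector must also implement the pipes methods):

```
from meerschaum.connectors import make_connector, Connector

@make_connector
class BarConnector(Connector):
    ### Hypothetical connector: because of `IS_INSTANCE`,
    ### 'bar' is also appended to `instance_types`.
    IS_INSTANCE: bool = True
    REQUIRED_ATTRIBUTES: list = ['host']
```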
````
### Excerpt from `meerschaum/connectors/_Connector.py`
### (the file's top-level imports — `abc`, `copy`, typing helpers — are omitted here).
class Connector(metaclass=abc.ABCMeta):
    """
    The base connector class to hold connection attributes.
    """
    def __init__(
        self,
        type: Optional[str] = None,
        label: Optional[str] = None,
        **kw: Any
    ):
        """
        Set the given keyword arguments as attributes.

        Parameters
        ----------
        type: str
            The `type` of the connector (e.g. `sql`, `api`, `plugin`).

        label: str
            The `label` for the connector.


        Examples
        --------
        Run `mrsm edit config` to edit connectors in the YAML file:

        ```yaml
        meerschaum:
            connectors:
                {type}:
                    {label}:
                        ### attributes go here
        ```

        """
        self._original_dict = copy.deepcopy(self.__dict__)
        self._set_attributes(type=type, label=label, **kw)

        ### NOTE: Override `REQUIRED_ATTRIBUTES` if `uri` is set.
        self.verify_attributes(
            ['uri']
            if 'uri' in self.__dict__
            else getattr(self, 'REQUIRED_ATTRIBUTES', None)
        )

    def _reset_attributes(self):
        self.__dict__ = self._original_dict

    def _set_attributes(
        self,
        *args,
        inherit_default: bool = True,
        **kw: Any
    ):
        from meerschaum.config.static import STATIC_CONFIG
        from meerschaum.utils.warnings import error

        self._attributes = {}

        default_label = STATIC_CONFIG['connectors']['default_label']

        ### NOTE: Support the legacy method of explicitly passing the type.
        label = kw.get('label', None)
        if label is None:
            if len(args) == 2:
                label = args[1]
            elif len(args) == 0:
                label = None
            else:
                label = args[0]

        if label == 'default':
            error(
                f"Label cannot be 'default'. Did you mean '{default_label}'?",
                InvalidAttributesError,
            )
        self.__dict__['label'] = label

        from meerschaum.config import get_config
        conn_configs = copy.deepcopy(get_config('meerschaum', 'connectors'))
        connector_config = copy.deepcopy(get_config('system', 'connectors'))

        ### inherit attributes from 'default' if exists
        if inherit_default:
            inherit_from = 'default'
            if self.type in conn_configs and inherit_from in conn_configs[self.type]:
                _inherit_dict = copy.deepcopy(conn_configs[self.type][inherit_from])
                self._attributes.update(_inherit_dict)

        ### load user config into self._attributes
        if self.type in conn_configs and self.label in conn_configs[self.type]:
            self._attributes.update(conn_configs[self.type][self.label] or {})

        ### load system config into self._sys_config
        ### (deep copy so future Connectors don't inherit changes)
        if self.type in connector_config:
            self._sys_config = copy.deepcopy(connector_config[self.type])

        ### add additional arguments or override configuration
        self._attributes.update(kw)

        ### finally, update __dict__ with _attributes.
        self.__dict__.update(self._attributes)

    def verify_attributes(
        self,
        required_attributes: Optional[List[str]] = None,
        debug: bool = False,
    ) -> None:
        """
        Ensure that the required attributes have been met.

        The Connector base class checks the minimum requirements.
        Child classes may enforce additional requirements.

        Parameters
        ----------
        required_attributes: Optional[List[str]], default None
            Attributes to be verified. If `None`, default to `['label']`.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        Don't return anything.

        Raises
        ------
        An error if any of the required attributes are missing.
        """
        from meerschaum.utils.warnings import error, warn
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.misc import items_str
        if required_attributes is None:
            required_attributes = ['label']

        missing_attributes = set()
        for a in required_attributes:
            if a not in self.__dict__:
                missing_attributes.add(a)
        if len(missing_attributes) > 0:
            error(
                (
                    f"Missing {items_str(list(missing_attributes))} "
                    + f"for connector '{self.type}:{self.label}'."
                ),
                InvalidAttributesError,
                silent=True,
                stack=False
            )


    def __str__(self):
        """
        When cast to a string, return type:label.
        """
        return f"{self.type}:{self.label}"

    def __repr__(self):
        """
        Represent the connector as type:label.
        """
        return str(self)

    @property
    def meta(self) -> Dict[str, Any]:
        """
        Return the keys needed to reconstruct this Connector.
        """
        _meta = {
            key: value
            for key, value in self.__dict__.items()
            if not str(key).startswith('_')
        }
        _meta.update({
            'type': self.type,
            'label': self.label,
        })
        return _meta


    @property
    def type(self) -> str:
        """
        Return the type for this connector.
        """
        _type = self.__dict__.get('type', None)
        if _type is None:
            import re
            is_executor = self.__class__.__name__.lower().endswith('executor')
            suffix_regex = (
                r'connector$'
                if not is_executor
                else r'executor$'
            )
            _type = re.sub(suffix_regex, '', self.__class__.__name__.lower())
            self.__dict__['type'] = _type
        return _type


    @property
    def label(self) -> str:
        """
        Return the label for this connector.
        """
        _label = self.__dict__.get('label', None)
        if _label is None:
            from meerschaum.config.static import STATIC_CONFIG
            _label = STATIC_CONFIG['connectors']['default_label']
            self.__dict__['label'] = _label
        return _label
````
The base connector class to hold connection attributes.
Connector.__init__(type: Optional[str] = None, label: Optional[str] = None, **kw: Any)

Set the given keyword arguments as attributes.
Connector.verify_attributes(required_attributes: Optional[List[str]] = None, debug: bool = False) -> None
Ensure that the required attributes have been met.

The Connector base class checks the minimum requirements.
Child classes may enforce additional requirements.

Parameters
- required_attributes (Optional[List[str]], default None):
  Attributes to be verified. If `None`, default to `['label']`.
- debug (bool, default False):
  Verbosity toggle.

Returns
- Don't return anything.

Raises
- An error if any of the required attributes are missing.
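In practice this surfaces when constructing a connector without its required attributes; a sketch, assuming `error()` raises the given exception class as the source above suggests:

```
from meerschaum.connectors import make_connector, Connector, InvalidAttributesError

@make_connector
class FooConnector(Connector):
    REQUIRED_ATTRIBUTES: list = ['username', 'password']

try:
    FooConnector(label='test', username='dog')   # 'password' is missing
except InvalidAttributesError:
    print("Missing required attributes.")
```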
Connector.meta -> Dict[str, Any] (property)
Return the keys needed to reconstruct this Connector.
Connector.type -> str (property)
Return the type for this connector.
Connector.label -> str (property)
Return the label for this connector.
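Taken together, `type`, `label`, and `meta` are enough to serialize and rebuild a connector. A sketch with hypothetical attribute values:

```
import meerschaum as mrsm

conn = mrsm.get_connector('sql', 'app', flavor='sqlite', database='/tmp/app.db')
print(conn)           # 'sql:app'  (__str__ returns type:label)
meta = conn.meta      # public attributes plus 'type' and 'label'
conn2 = mrsm.get_connector(meta.pop('type'), meta.pop('label'), **meta)
assert str(conn2) == 'sql:app'
```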
```
### Excerpt from `meerschaum/connectors/sql/_SQLConnector.py`
### (module-level imports such as `error` and typing helpers are omitted here).
class SQLConnector(Connector):
    """
    Connect to SQL databases via `sqlalchemy`.

    SQLConnectors may be used as Meerschaum instance connectors.
    Read more about connectors and instances at
    https://meerschaum.io/reference/connectors/

    """

    IS_INSTANCE: bool = True

    from ._create_engine import flavor_configs, create_engine
    from ._sql import (
        read,
        value,
        exec,
        execute,
        to_sql,
        exec_queries,
        get_connection,
        _cleanup_connections,
    )
    from meerschaum.utils.sql import test_connection
    from ._fetch import fetch, get_pipe_metadef
    from ._cli import cli, _cli_exit
    from ._pipes import (
        fetch_pipes_keys,
        create_indices,
        drop_indices,
        get_create_index_queries,
        get_drop_index_queries,
        get_add_columns_queries,
        get_alter_columns_queries,
        delete_pipe,
        get_pipe_data,
        get_pipe_data_query,
        register_pipe,
        edit_pipe,
        get_pipe_id,
        get_pipe_attributes,
        sync_pipe,
        sync_pipe_inplace,
        get_sync_time,
        pipe_exists,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        deduplicate_pipe,
        get_pipe_table,
        get_pipe_columns_types,
        get_to_sql_dtype,
        get_pipe_schema,
    )
    from ._plugins import (
        register_plugin,
        delete_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugins,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
    )
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri, parse_uri
    from ._instance import (
        _log_temporary_tables_creation,
        _drop_temporary_table,
        _drop_temporary_tables,
        _drop_old_temporary_tables,
    )

    def __init__(
        self,
        label: Optional[str] = None,
        flavor: Optional[str] = None,
        wait: bool = False,
        connect: bool = False,
        debug: bool = False,
        **kw: Any
    ):
        """
        Parameters
        ----------
        label: str, default 'main'
            The identifying label for the connector.
            E.g. for `sql:main`, 'main' is the label.
            Defaults to 'main'.

        flavor: Optional[str], default None
            The database flavor, e.g.
            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
            To see supported flavors, run the `bootstrap connectors` command.

        wait: bool, default False
            If `True`, block until a database connection has been made.
            Defaults to `False`.

        connect: bool, default False
            If `True`, immediately attempt to connect the database and raise
            a warning if the connection fails.
            Defaults to `False`.

        debug: bool, default False
            Verbosity toggle.
            Defaults to `False`.

        kw: Any
            All other arguments will be passed to the connector's attributes.
            Therefore, a connector may be made without being registered,
            as long as enough parameters are supplied to the constructor.
        """
        if 'uri' in kw:
            uri = kw['uri']
            if uri.startswith('postgres') and not uri.startswith('postgresql'):
                uri = uri.replace('postgres', 'postgresql', 1)
            if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
                uri = uri.replace('postgresql://', 'postgresql+psycopg', 1)
            if uri.startswith('timescaledb://'):
                uri = uri.replace('timescaledb://', 'postgresql://', 1)
                flavor = 'timescaledb'
            kw['uri'] = uri
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            _ = from_uri_params.pop('label', None)

            ### Sometimes the flavor may be provided with a URI.
            kw.update(from_uri_params)
            if flavor:
                kw['flavor'] = flavor


        ### set __dict__ in base class
        super().__init__(
            'sql',
            label = label or self.__dict__.get('label', None),
            **kw
        )

        if self.__dict__.get('flavor', None) == 'sqlite':
            self._reset_attributes()
            self._set_attributes(
                'sql',
                label = label,
                inherit_default = False,
                **kw
            )
            ### For backwards compatibility reasons, set the path for sql:local if it's missing.
            if self.label == 'local' and not self.__dict__.get('database', None):
                from meerschaum.config._paths import SQLITE_DB_PATH
                self.database = str(SQLITE_DB_PATH)

        ### ensure flavor and label are set accordingly
        if 'flavor' not in self.__dict__:
            if flavor is None and 'uri' not in self.__dict__:
                raise Exception(
                    f"Missing flavor. Provide flavor as a key for '{self}'."
                )
            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

        if self.flavor == 'postgres':
            self.flavor = 'postgresql'

        self._debug = debug
        ### Store the PID and thread at initialization
        ### so we can dispose of the Pool in child processes or threads.
        import os, threading
        self._pid = os.getpid()
        self._thread_ident = threading.current_thread().ident
        self._sessions = {}
        self._locks = {'_sessions': threading.RLock(), }

        ### verify the flavor's requirements are met
        if self.flavor not in self.flavor_configs:
            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
        if not self.__dict__.get('uri'):
            self.verify_attributes(
                self.flavor_configs[self.flavor].get('requirements', set()),
                debug=debug,
            )

        if wait:
            from meerschaum.connectors.poll import retry_connect
            retry_connect(connector=self, debug=debug)

        if connect:
            if not self.test_connection(debug=debug):
                from meerschaum.utils.warnings import warn
                warn(f"Failed to connect with connector '{self}'!", stack=False)

    @property
    def Session(self):
        if '_Session' not in self.__dict__:
            if self.engine is None:
                return None

            from meerschaum.utils.packages import attempt_import
            sqlalchemy_orm = attempt_import('sqlalchemy.orm')
            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
            self._Session = sqlalchemy_orm.scoped_session(session_factory)

        return self._Session

    @property
    def engine(self):
        import os, threading
        ### build the sqlalchemy engine
        if '_engine' not in self.__dict__:
            self._engine, self._engine_str = self.create_engine(include_uri=True)

        same_process = os.getpid() == self._pid
        same_thread = threading.current_thread().ident == self._thread_ident

        ### handle child processes
        if not same_process:
            self._pid = os.getpid()
            self._thread = threading.current_thread()
            from meerschaum.utils.warnings import warn
            warn(f"Different PID detected. Disposing of connections...")
            self._engine.dispose()

        ### handle different threads
        if not same_thread:
            pass

        return self._engine

    @property
    def DATABASE_URL(self) -> str:
        """
        Return the URI connection string (alias for `SQLConnector.URI`).
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def URI(self) -> str:
        """
        Return the URI connection string.
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def IS_THREAD_SAFE(self) -> bool:
        """
        Return whether this connector may be multithreaded.
        """
        if self.flavor in ('duckdb', 'oracle'):
            return False
        if self.flavor == 'sqlite':
            return ':memory:' not in self.URI
        return True


    @property
    def metadata(self):
        """
        Return the metadata bound to this configured schema.
        """
        from meerschaum.utils.packages import attempt_import
        sqlalchemy = attempt_import('sqlalchemy')
        if '_metadata' not in self.__dict__:
            self._metadata = sqlalchemy.MetaData(schema=self.schema)
        return self._metadata


    @property
    def instance_schema(self):
        """
        Return the schema name for Meerschaum tables.
        """
        return self.schema


    @property
    def internal_schema(self):
        """
        Return the schema name for internal tables.
        """
        from meerschaum.config.static import STATIC_CONFIG
        from meerschaum.utils.packages import attempt_import
        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
        schema_name = self.__dict__.get('internal_schema', None) or (
            STATIC_CONFIG['sql']['internal_schema']
            if self.flavor not in NO_SCHEMA_FLAVORS
            else self.schema
        )

        if '_internal_schema' not in self.__dict__:
            self._internal_schema = schema_name
        return self._internal_schema


    @property
    def db(self) -> Optional[databases.Database]:
        from meerschaum.utils.packages import attempt_import
        databases = attempt_import('databases', lazy=False, install=True)
        url = self.DATABASE_URL
        if 'mysql' in url:
            url = url.replace('+pymysql', '')
        if '_db' not in self.__dict__:
            try:
                self._db = databases.Database(url)
            except KeyError:
                ### Likely encountered an unsupported flavor.
                from meerschaum.utils.warnings import warn
                self._db = None
        return self._db


    @property
    def db_version(self) -> Union[str, None]:
        """
        Return the database version.
        """
        _db_version = self.__dict__.get('_db_version', None)
        if _db_version is not None:
            return _db_version

        from meerschaum.utils.sql import get_db_version
        self._db_version = get_db_version(self)
        return self._db_version


    @property
    def schema(self) -> Union[str, None]:
        """
        Return the default schema to use.
        A value of `None` will not prepend a schema.
        """
        if 'schema' in self.__dict__:
            return self.__dict__['schema']

        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
        if self.flavor in NO_SCHEMA_FLAVORS:
            self.__dict__['schema'] = None
            return None

        sqlalchemy = mrsm.attempt_import('sqlalchemy')
        _schema = sqlalchemy.inspect(self.engine).default_schema_name
        self.__dict__['schema'] = _schema
        return _schema


    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __call__(self):
        return self
```
Connect to SQL databases via `sqlalchemy`.

SQLConnectors may be used as Meerschaum instance connectors.
Read more about connectors and instances at https://meerschaum.io/reference/connectors/
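Beyond keyword arguments, a `SQLConnector` can be built from a URI (parsed by `from_uri`, imported in the class body above); `__init__` also normalizes `postgres://` prefixes. A sketch with a made-up URI:

```
from meerschaum.connectors import SQLConnector

### Hypothetical URI; __init__ rewrites 'postgres://' to 'postgresql+psycopg://'.
conn = SQLConnector('remote', uri='postgres://user:pass@localhost:5432/db')
print(conn.flavor)   # 'postgresql'
```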
SQLConnector.__init__(label: Optional[str] = None, flavor: Optional[str] = None, wait: bool = False, connect: bool = False, debug: bool = False, **kw: Any)
Parameters
- label (str, default 'main'):
  The identifying label for the connector.
  E.g. for `sql:main`, 'main' is the label.
- flavor (Optional[str], default None):
  The database flavor, e.g. `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
  To see supported flavors, run the `bootstrap connectors` command.
- wait (bool, default False):
  If `True`, block until a database connection has been made.
- connect (bool, default False):
  If `True`, immediately attempt to connect the database and raise a warning
  if the connection fails.
- debug (bool, default False):
  Verbosity toggle.
- kw (Any):
  All other arguments will be passed to the connector's attributes.
  Therefore, a connector may be made without being registered,
  as long as enough parameters are supplied to the constructor.
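The `wait` and `connect` flags give two startup strategies; a sketch, assuming `sql:main` is configured (note that these flags only take effect when the connector is first constructed, since `get_connector` caches instances):

```
from meerschaum import get_connector

### Either block at startup until the database accepts connections...
conn = get_connector('sql', 'main', wait=True)

### ...or attempt a single test and emit a warning on failure:
conn = get_connector('sql', 'main', connect=True)
```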
SQLConnector.Session (property)
SQLConnector.engine (property)
SQLConnector.DATABASE_URL -> str (property)
Return the URI connection string (alias for `SQLConnector.URI`).
SQLConnector.URI -> str (property)
Return the URI connection string.
SQLConnector.IS_THREAD_SAFE -> bool (property)
Return whether this connector may be multithreaded.
SQLConnector.metadata (property)
Return the metadata bound to this configured schema.
SQLConnector.instance_schema (property)
Return the schema name for Meerschaum tables.
SQLConnector.internal_schema (property)
Return the schema name for internal tables.
SQLConnector.db -> Optional[databases.Database] (property)
SQLConnector.db_version -> Union[str, None] (property)
Return the database version.
SQLConnector.schema -> Union[str, None] (property)
Return the default schema to use.
A value of `None` will not prepend a schema.
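How the three schema properties relate, assuming `conn` is an existing `SQLConnector` (values depend on the flavor):

```
### For flavors in NO_SCHEMA_FLAVORS (e.g. SQLite), `schema` resolves to None;
### otherwise it is inspected from the engine's default schema.
print(conn.schema)            # e.g. 'public' on PostgreSQL, None on SQLite
print(conn.instance_schema)   # where Meerschaum's instance tables live
print(conn.internal_schema)   # where temporary/internal tables live
```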
```
### Excerpt from `meerschaum/connectors/sql/_create_engine.py`: module-level names like
### `install_flavor_drivers`, `require_patching_flavors`, `flavor_configs`,
### `flavor_dialects`, `traceback`, and `dprint` are defined at the top of that file.
def create_engine(
    self,
    include_uri: bool = False,
    debug: bool = False,
    **kw
) -> 'sqlalchemy.engine.Engine':
    """Create a sqlalchemy engine by building the engine string."""
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import error, warn
    sqlalchemy = attempt_import('sqlalchemy')
    import urllib
    import copy
    ### Install and patch required drivers.
    if self.flavor in install_flavor_drivers:
        attempt_import(*install_flavor_drivers[self.flavor], debug=debug, lazy=False, warn=False)
        if self.flavor == 'mssql':
            pyodbc = attempt_import('pyodbc', debug=debug, lazy=False, warn=False)
            pyodbc.pooling = False
    if self.flavor in require_patching_flavors:
        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
        import pathlib
        for install_name, import_name in require_patching_flavors[self.flavor]:
            pkg = attempt_import(
                import_name,
                debug=debug,
                lazy=False,
                warn=False
            )
            _monkey_patch_get_distribution(
                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
            )

    ### supplement missing values with defaults (e.g. port number)
    for a, value in flavor_configs[self.flavor]['defaults'].items():
        if a not in self.__dict__:
            self.__dict__[a] = value

    ### Verify that everything is in order.
    if self.flavor not in flavor_configs:
        error(f"Cannot create a connector with the flavor '{self.flavor}'.")

    _engine = flavor_configs[self.flavor].get('engine', None)
    _username = self.__dict__.get('username', None)
    _password = self.__dict__.get('password', None)
    _host = self.__dict__.get('host', None)
    _port = self.__dict__.get('port', None)
    _database = self.__dict__.get('database', None)
    _options = self.__dict__.get('options', {})
    if isinstance(_options, str):
        _options = dict(urllib.parse.parse_qsl(_options))
    _uri = self.__dict__.get('uri', None)

    ### Handle registering specific dialects (due to installing in virtual environments).
    if self.flavor in flavor_dialects:
        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])

    ### self._sys_config was deepcopied and can be updated safely
    if self.flavor in ("sqlite", "duckdb"):
        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
        if 'create_engine' not in self._sys_config:
            self._sys_config['create_engine'] = {}
        if 'connect_args' not in self._sys_config['create_engine']:
            self._sys_config['create_engine']['connect_args'] = {}
        self._sys_config['create_engine']['connect_args'].update({"check_same_thread": False})
    else:
        engine_str = (
            _engine + "://" + (_username if _username is not None else '') +
            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
            (("/" + _database) if _database is not None else '')
            + (("?" + urllib.parse.urlencode(_options)) if _options else '')
        ) if not _uri else _uri

    ### Sometimes the timescaledb:// flavor can slip in.
    if _uri and self.flavor in ('timescaledb',) and self.flavor in _uri:
        engine_str = engine_str.replace(f'{self.flavor}', 'postgresql', 1)

    if debug:
        dprint(
            (
                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
                if _password is not None else engine_str
            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
        )

    _kw_copy = copy.deepcopy(kw)

    ### NOTE: Order of inheritance:
    ###   1. Defaults
    ###   2. System configuration
    ###   3. Connector configuration
    ###   4. Keyword arguments
    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
    def _apply_create_engine_args(update):
        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
            _create_engine_args.update(
                { k: v for k, v in update.items()
                    if 'omit_create_engine' not in flavor_configs[self.flavor]
                    or k not in flavor_configs[self.flavor].get('omit_create_engine')
                }
            )
    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
    _apply_create_engine_args(_kw_copy)

    try:
        engine = sqlalchemy.create_engine(
            engine_str,
            ### I know this looks confusing, and maybe it's bad code,
            ### but it's simple. It dynamically parses the config string
            ### and splits it to separate the class name (QueuePool)
            ### from the module name (sqlalchemy.pool).
            poolclass = getattr(
                attempt_import(
                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
                ),
                self._sys_config['poolclass'].split('.')[-1]
            ),
            echo = debug,
            **_create_engine_args
        )
    except Exception as e:
        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
        engine = None

    if include_uri:
        return engine, engine_str
    return engine
```
Create a sqlalchemy engine by building the engine string.
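Note the inheritance order spelled out in the source (defaults, then system configuration, then connector configuration, then keyword arguments). A sketch, assuming `conn` is an existing `SQLConnector`; `pool_pre_ping` is a standard `sqlalchemy.create_engine` argument used here purely as an illustration:

```
### include_uri=True returns the engine along with the rendered connection string:
engine, engine_str = conn.create_engine(include_uri=True)

### Extra keyword arguments flow into sqlalchemy.create_engine
### (unless listed in the flavor's 'omit_create_engine'):
engine = conn.create_engine(pool_pre_ping=True)
```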
```
### Excerpt from `meerschaum/connectors/sql/_sql.py`: module-level names like
### `_max_chunks_flavors`, `_disallow_chunks_flavors`, `SKIP_READ_TRANSACTION_FLAVORS`,
### `format_sql_query_for_dask`, `warn`, and `dprint` are defined elsewhere in that file.
def read(
    self,
    query_or_table: Union[str, sqlalchemy.Query],
    params: Union[Dict[str, Any], List[str], None] = None,
    dtype: Optional[Dict[str, Any]] = None,
    coerce_float: bool = True,
    chunksize: Optional[int] = -1,
    workers: Optional[int] = None,
    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
    as_hook_results: bool = False,
    chunks: Optional[int] = None,
    schema: Optional[str] = None,
    as_chunks: bool = False,
    as_iterator: bool = False,
    as_dask: bool = False,
    index_col: Optional[str] = None,
    silent: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[
    pandas.DataFrame,
    dask.DataFrame,
    List[pandas.DataFrame],
    List[Any],
    None,
]:
    """
    Read a SQL query or table into a pandas dataframe.

    Parameters
    ----------
    query_or_table: Union[str, sqlalchemy.Query]
        The SQL query (sqlalchemy Query or string) or name of the table from which to select.

    params: Optional[Dict[str, Any]], default None
        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html

    dtype: Optional[Dict[str, Any]], default None
        A dictionary of data types to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html

    chunksize: Optional[int], default -1
        How many chunks to read at a time. `None` will read everything in one large chunk.
        Defaults to system configuration.

        **NOTE:** DuckDB does not allow for chunking.

    workers: Optional[int], default None
        How many threads to use when consuming the generator.
        Only applies if `chunk_hook` is provided.

    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
        See `--sync-chunks` for an example.
        **NOTE:** `as_iterator` MUST be False (default).

    as_hook_results: bool, default False
        If `True`, return a `List` of the outputs of the hook function.
        Only applicable if `chunk_hook` is not None.

        **NOTE:** `as_iterator` MUST be `False` (default).

    chunks: Optional[int], default None
        Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
        return into a single dataframe.
        For example, to limit the returned dataframe to 100,000 rows,
        you could specify a `chunksize` of `1000` and `chunks` of `100`.

    schema: Optional[str], default None
        If just a table name is provided, optionally specify the table schema.
        Defaults to `SQLConnector.schema`.

    as_chunks: bool, default False
        If `True`, return a list of DataFrames.
        Otherwise return a single DataFrame.

    as_iterator: bool, default False
        If `True`, return the pandas DataFrame iterator.
        `chunksize` must not be `None` (falls back to 1000 if so),
        and hooks are not called in this case.

    index_col: Optional[str], default None
        If using Dask, use this column as the index column.
        If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.

    silent: bool, default False
        If `True`, don't raise warnings in case of errors.
        Defaults to `False`.

    Returns
    -------
    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
    or `None` if something breaks.

    """
    if chunks is not None and chunks <= 0:
        return []
    from meerschaum.utils.sql import sql_item_name, truncate_item_name
    from meerschaum.utils.dtypes.sql import NUMERIC_PRECISION_FLAVORS
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.pool import get_pool
    from meerschaum.utils.dataframe import chunksize_to_npartitions, get_numeric_cols
    import warnings
    import traceback
    from decimal import Decimal
    pd = import_pandas()
    dd = None
    is_dask = 'dask' in pd.__name__
    pd = attempt_import('pandas')
    is_dask = dd is not None
    npartitions = chunksize_to_npartitions(chunksize)
    if is_dask:
        chunksize = None
    schema = schema or self.schema

    pool = get_pool(workers=workers)
    sqlalchemy = attempt_import("sqlalchemy")
    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is None and as_iterator:
        if not silent and self.flavor not in _disallow_chunks_flavors:
            warn(
                "An iterator may only be generated if chunksize is not None.\n"
                + "Falling back to a chunksize of 1000.", stacklevel=3,
            )
        chunksize = 1000
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel=3,
                )
            chunksize = _max_chunks_flavors[self.flavor]

    ### NOTE: A bug in duckdb_engine does not allow for chunks.
    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
        chunksize = None

    if debug:
        import time
        start = time.perf_counter()
        dprint(f"[{self}]\n{query_or_table}")
        dprint(f"[{self}] Fetching with chunksize: {chunksize}")

    ### This might be sqlalchemy object or the string of a table name.
    ### We check for spaces and quotes to see if it might be a weird table.
    if (
        ' ' not in str(query_or_table)
        or (
            ' ' in str(query_or_table)
            and str(query_or_table).startswith('"')
            and str(query_or_table).endswith('"')
        )
    ):
        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
        if truncated_table_name != str(query_or_table) and not silent:
            warn(
                f"Table '{query_or_table}' is too long for '{self.flavor}',"
                + f" will instead read the table '{truncated_table_name}'."
            )

        query_or_table = sql_item_name(str(query_or_table), self.flavor, schema)
        if debug:
            dprint(f"[{self}] Reading from table {query_or_table}")
        formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table))
        str_query = f"SELECT * FROM {query_or_table}"
    else:
        str_query = query_or_table

    formatted_query = (
        sqlalchemy.text(str_query)
        if not is_dask and isinstance(str_query, str)
        else format_sql_query_for_dask(str_query)
    )

    chunk_list = []
    chunk_hook_results = []
    def _process_chunk(_chunk, _retry_on_failure: bool = True):
        if not as_hook_results:
            chunk_list.append(_chunk)
        if chunk_hook is None:
            return None

        result = None
        try:
            result = chunk_hook(
                _chunk,
                workers=workers,
                chunksize=chunksize,
                debug=debug,
                **kw
            )
        except Exception:
            result = False, traceback.format_exc()
            from meerschaum.utils.formatting import get_console
            if not silent:
                get_console().print_exception()

        ### If the chunk fails to process, try it again one more time.
        if isinstance(result, tuple) and result[0] is False:
            if _retry_on_failure:
                return _process_chunk(_chunk, _retry_on_failure=False)

        return result

    try:
        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')

            read_sql_query_kwargs = {
                'params': params,
                'dtype': dtype,
                'coerce_float': coerce_float,
                'index_col': index_col,
            }
            if is_dask:
                if index_col is None:
                    dd = None
                    pd = attempt_import('pandas')
                    read_sql_query_kwargs.update({
                        'chunksize': chunksize,
                    })
            else:
                read_sql_query_kwargs.update({
                    'chunksize': chunksize,
                })

            if is_dask and dd is not None:
                ddf = dd.read_sql_query(
                    formatted_query,
                    self.URI,
                    **read_sql_query_kwargs
                )
            else:

                def get_chunk_generator(connectable):
                    ### NOTE: reads from `self.engine`, not `connectable`, in this version.
                    chunk_generator = pd.read_sql_query(
                        formatted_query,
                        self.engine,
                        **read_sql_query_kwargs
                    )
                    to_return = (
                        chunk_generator
                        if as_iterator or chunksize is None
                        else (
                            list(pool.imap(_process_chunk, chunk_generator))
                            if as_hook_results
                            else None
                        )
                    )
                    return chunk_generator, to_return

                if self.flavor in SKIP_READ_TRANSACTION_FLAVORS:
                    chunk_generator, to_return = get_chunk_generator(self.engine)
                else:
                    with self.engine.begin() as transaction:
                        with transaction.execution_options(stream_results=stream_results) as connection:
                            chunk_generator, to_return = get_chunk_generator(connection)

                if to_return is not None:
                    return to_return

    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
        if not silent:
            warn(str(e), stacklevel=3)
        from meerschaum.utils.formatting import get_console
        if not silent:
            get_console().print_exception()

        return None

    if is_dask and dd is not None:
        ddf = ddf.reset_index()
        return ddf

    chunk_list = []
    read_chunks = 0
    chunk_hook_results = []
    if chunksize is None:
        chunk_list.append(chunk_generator)
    elif as_iterator:
        return chunk_generator
    else:
        try:
            for chunk in chunk_generator:
                if chunk_hook is not None:
                    chunk_hook_results.append(
                        chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                    )
                chunk_list.append(chunk)
                read_chunks += 1
                if chunks is not None and read_chunks >= chunks:
                    break
        except Exception as e:
            warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
            from meerschaum.utils.formatting import get_console
            if not silent:
                get_console().print_exception()

            ### Retry the iteration once more before giving up.
            read_chunks = 0
            try:
                for chunk in chunk_generator:
                    if chunk_hook is not None:
                        chunk_hook_results.append(
                            chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                        )
                    chunk_list.append(chunk)
                    read_chunks += 1
                    if chunks is not None and read_chunks >= chunks:
                        break
            except Exception as e:
                warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
                from meerschaum.utils.formatting import get_console
                if not silent:
                    get_console().print_exception()

                return None

    ### If no chunks returned, read without chunks
    ### to get columns
    if len(chunk_list) == 0:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            _ = read_sql_query_kwargs.pop('chunksize', None)
            with self.engine.begin() as connection:
                chunk_list.append(
                    pd.read_sql_query(
                        formatted_query,
                        connection,
                        **read_sql_query_kwargs
                    )
                )

    ### call the hook on any missed chunks.
    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
        for c in chunk_list[len(chunk_hook_results):]:
            chunk_hook_results.append(
                chunk_hook(c, chunksize=chunksize, debug=debug, **kw)
            )

    ### chunksize is not None so must iterate
    if debug:
        end = time.perf_counter()
        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")

    if as_hook_results:
        return chunk_hook_results

    ### Skip `pd.concat()` if `as_chunks` is specified.
    if as_chunks:
        for c in chunk_list:
            c.reset_index(drop=True, inplace=True)
            for col in get_numeric_cols(c):
                c[col] = c[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
        return chunk_list

    df = pd.concat(chunk_list).reset_index(drop=True)
    ### NOTE: The calls to `canonical()` are to drop leading and trailing zeroes.
    for col in get_numeric_cols(df):
        df[col] = df[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)

    return df
```
Read a SQL query or table into a pandas dataframe.

Parameters
- query_or_table (Union[str, sqlalchemy.Query]): The SQL query (sqlalchemy Query or string) or name of the table from which to select.
- params (Optional[Dict[str, Any]], default None): `List` or `Dict` of parameters to pass to `pandas.read_sql()`. See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
- dtype (Optional[Dict[str, Any]], default None): A dictionary of data types to pass to `pandas.read_sql()`. See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
- chunksize (Optional[int], default -1): How many rows to read per chunk. `None` will read everything in one large chunk. Defaults to the system configuration. NOTE: DuckDB does not allow for chunking.
- workers (Optional[int], default None): How many threads to use when consuming the generator. Only applies if `chunk_hook` is provided.
- chunk_hook (Optional[Callable[[pandas.DataFrame], Any]], default None): Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. See `--sync-chunks` for an example. NOTE: `as_iterator` MUST be `False` (default).
- as_hook_results (bool, default False): If `True`, return a `List` of the outputs of the hook function. Only applicable if `chunk_hook` is not `None`. NOTE: `as_iterator` MUST be `False` (default).
- chunks (Optional[int], default None): Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and return into a single dataframe. For example, to limit the returned dataframe to 100,000 rows, you could specify a `chunksize` of `1000` and `chunks` of `100`.
- schema (Optional[str], default None): If just a table name is provided, optionally specify the table schema. Defaults to `SQLConnector.schema`.
- as_chunks (bool, default False): If `True`, return a list of DataFrames. Otherwise return a single DataFrame.
- as_iterator (bool, default False): If `True`, return the pandas DataFrame iterator. `chunksize` must not be `None` (falls back to 1000 if so), and hooks are not called in this case.
- index_col (Optional[str], default None): If using Dask, use this column as the index column. If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
- silent (bool, default False): If `True`, don't raise warnings in case of errors. Defaults to `False`.

Returns
- A `pd.DataFrame` (default case), an iterator, a list of dataframes / iterators, or `None` if something breaks.
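For instance, a minimal sketch of reading a whole table and then streaming it in chunks (the `sql:local` connector, database path, and `weather` table are hypothetical):
```
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector(
...     'sql', 'local',
...     flavor='sqlite',
...     database='/tmp/demo.db',
... )
>>> df = conn.read('weather')
>>> for chunk in conn.read('SELECT * FROM weather', chunksize=1000, as_iterator=True):
...     print(len(chunk))
```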
399def value( 400 self, 401 query: str, 402 *args: Any, 403 use_pandas: bool = False, 404 **kw: Any 405) -> Any: 406 """ 407 Execute the provided query and return the first value. 408 409 Parameters 410 ---------- 411 query: str 412 The SQL query to execute. 413 414 *args: Any 415 The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec` 416 if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`. 417 418 use_pandas: bool, default False 419 If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use 420 `meerschaum.connectors.sql.SQLConnector.exec` (default). 421 **NOTE:** This is always `True` for DuckDB. 422 423 **kw: Any 424 See `args`. 425 426 Returns 427 ------- 428 Any value returned from the query. 429 430 """ 431 from meerschaum.utils.packages import attempt_import 432 sqlalchemy = attempt_import('sqlalchemy') 433 if self.flavor == 'duckdb': 434 use_pandas = True 435 if use_pandas: 436 try: 437 return self.read(query, *args, **kw).iloc[0, 0] 438 except Exception: 439 return None 440 441 _close = kw.get('close', True) 442 _commit = kw.get('commit', (self.flavor != 'mssql')) 443 444 # _close = True 445 # _commit = True 446 447 try: 448 result, connection = self.exec( 449 query, 450 *args, 451 with_connection=True, 452 close=False, 453 commit=_commit, 454 **kw 455 ) 456 first = result.first() if result is not None else None 457 _val = first[0] if first is not None else None 458 except Exception as e: 459 warn(e, stacklevel=3) 460 return None 461 if _close: 462 try: 463 connection.close() 464 except Exception as e: 465 warn("Failed to close connection with exception:\n" + str(e)) 466 return _val
Execute the provided query and return the first value.

Parameters
- query (str): The SQL query to execute.
- *args (Any): The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec` if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.
- use_pandas (bool, default False): If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use `meerschaum.connectors.sql.SQLConnector.exec` (default). NOTE: This is always `True` for DuckDB.
- **kw (Any): See `args`.

Returns
- Any value returned from the query.
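A short sketch, assuming the connector and hypothetical `weather` table from the `read()` example above:
```
>>> num_rows = conn.value("SELECT COUNT(*) FROM weather")
>>> latest_dt = conn.value("SELECT MAX(dt) FROM weather", use_pandas=True)
```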
480def exec( 481 self, 482 query: str, 483 *args: Any, 484 silent: bool = False, 485 debug: bool = False, 486 commit: Optional[bool] = None, 487 close: Optional[bool] = None, 488 with_connection: bool = False, 489 **kw: Any 490) -> Union[ 491 sqlalchemy.engine.result.resultProxy, 492 sqlalchemy.engine.cursor.LegacyCursorResult, 493 Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection], 494 Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection], 495 None 496]: 497 """ 498 Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures. 499 500 If inserting data, please use bind variables to avoid SQL injection! 501 502 Parameters 503 ---------- 504 query: Union[str, List[str], Tuple[str]] 505 The query to execute. 506 If `query` is a list or tuple, call `self.exec_queries()` instead. 507 508 args: Any 509 Arguments passed to `sqlalchemy.engine.execute`. 510 511 silent: bool, default False 512 If `True`, suppress warnings. 513 514 commit: Optional[bool], default None 515 If `True`, commit the changes after execution. 516 Causes issues with flavors like `'mssql'`. 517 This does not apply if `query` is a list of strings. 518 519 close: Optional[bool], default None 520 If `True`, close the connection after execution. 521 Causes issues with flavors like `'mssql'`. 522 This does not apply if `query` is a list of strings. 523 524 with_connection: bool, default False 525 If `True`, return a tuple including the connection object. 526 This does not apply if `query` is a list of strings. 527 528 Returns 529 ------- 530 The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided. 531 532 """ 533 if isinstance(query, (list, tuple)): 534 return self.exec_queries( 535 list(query), 536 *args, 537 silent=silent, 538 debug=debug, 539 **kw 540 ) 541 542 from meerschaum.utils.packages import attempt_import 543 sqlalchemy = attempt_import("sqlalchemy") 544 if debug: 545 dprint(f"[{self}] Executing query:\n{query}") 546 547 _close = close if close is not None else (self.flavor != 'mssql') 548 _commit = commit if commit is not None else ( 549 (self.flavor != 'mssql' or 'select' not in str(query).lower()) 550 ) 551 552 ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+). 553 if not hasattr(query, 'compile'): 554 query = sqlalchemy.text(query) 555 556 connection = self.get_connection() 557 558 try: 559 transaction = connection.begin() if _commit else None 560 except sqlalchemy.exc.InvalidRequestError: 561 connection = self.get_connection(rebuild=True) 562 transaction = connection.begin() 563 564 if transaction is not None and not transaction.is_active: 565 connection = self.get_connection(rebuild=True) 566 transaction = connection.begin() if _commit else None 567 568 result = None 569 try: 570 result = connection.execute(query, *args, **kw) 571 if _commit: 572 transaction.commit() 573 except Exception as e: 574 if debug: 575 dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{e}") 576 if not silent: 577 warn(str(e), stacklevel=3) 578 result = None 579 if _commit: 580 transaction.rollback() 581 connection = self.get_connection(rebuild=True) 582 finally: 583 if _close: 584 connection.close() 585 586 if with_connection: 587 return result, connection 588 589 return result
Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.

If inserting data, please use bind variables to avoid SQL injection!

Parameters
- query (Union[str, List[str], Tuple[str]]): The query to execute. If `query` is a list or tuple, call `self.exec_queries()` instead.
- args (Any): Arguments passed to `sqlalchemy.engine.execute`.
- silent (bool, default False): If `True`, suppress warnings.
- commit (Optional[bool], default None): If `True`, commit the changes after execution. Causes issues with flavors like `'mssql'`. This does not apply if `query` is a list of strings.
- close (Optional[bool], default None): If `True`, close the connection after execution. Causes issues with flavors like `'mssql'`. This does not apply if `query` is a list of strings.
- with_connection (bool, default False): If `True`, return a tuple including the connection object. This does not apply if `query` is a list of strings.

Returns
- The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
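For example, a sketch of a parameterized insert using bind variables (the table and column names are hypothetical):
```
>>> result = conn.exec(
...     "INSERT INTO weather (station, temp_f) VALUES (:station, :temp_f)",
...     {'station': 'KATL', 'temp_f': 74.1},
... )
>>> result, connection = conn.exec("SELECT 1", with_connection=True, close=False)
```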
469def execute( 470 self, 471 *args : Any, 472 **kw : Any 473) -> Optional[sqlalchemy.engine.result.resultProxy]: 474 """ 475 An alias for `meerschaum.connectors.sql.SQLConnector.exec`. 476 """ 477 return self.exec(*args, **kw)
An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
686def to_sql( 687 self, 688 df: pandas.DataFrame, 689 name: str = None, 690 index: bool = False, 691 if_exists: str = 'replace', 692 method: str = "", 693 chunksize: Optional[int] = -1, 694 schema: Optional[str] = None, 695 silent: bool = False, 696 debug: bool = False, 697 as_tuple: bool = False, 698 as_dict: bool = False, 699 **kw 700) -> Union[bool, SuccessTuple]: 701 """ 702 Upload a DataFrame's contents to the SQL server. 703 704 Parameters 705 ---------- 706 df: pd.DataFrame 707 The DataFrame to be uploaded. 708 709 name: str 710 The name of the table to be created. 711 712 index: bool, default False 713 If True, creates the DataFrame's indices as columns. 714 715 if_exists: str, default 'replace' 716 Drop and create the table ('replace') or append if it exists 717 ('append') or raise Exception ('fail'). 718 Options are ['replace', 'append', 'fail']. 719 720 method: str, default '' 721 None or multi. Details on pandas.to_sql. 722 723 chunksize: Optional[int], default -1 724 How many rows to insert at a time. 725 726 schema: Optional[str], default None 727 Optionally override the schema for the table. 728 Defaults to `SQLConnector.schema`. 729 730 as_tuple: bool, default False 731 If `True`, return a (success_bool, message) tuple instead of a `bool`. 732 Defaults to `False`. 733 734 as_dict: bool, default False 735 If `True`, return a dictionary of transaction information. 736 The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`, 737 `method`, and `target`. 738 739 kw: Any 740 Additional arguments will be passed to the DataFrame's `to_sql` function 741 742 Returns 743 ------- 744 Either a `bool` or a `SuccessTuple` (depends on `as_tuple`). 745 """ 746 import time 747 import json 748 import decimal 749 from decimal import Decimal, Context 750 from meerschaum.utils.warnings import error, warn 751 import warnings 752 import functools 753 if name is None: 754 error(f"Name must not be `None` to insert data into {self}.") 755 756 ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs. 757 kw.pop('name', None) 758 759 schema = schema or self.schema 760 761 from meerschaum.utils.sql import ( 762 sql_item_name, 763 table_exists, 764 json_flavors, 765 truncate_item_name, 766 DROP_IF_EXISTS_FLAVORS, 767 ) 768 from meerschaum.utils.dataframe import get_json_cols, get_numeric_cols, get_uuid_cols 769 from meerschaum.utils.dtypes import are_dtypes_equal, quantize_decimal 770 from meerschaum.utils.dtypes.sql import ( 771 NUMERIC_PRECISION_FLAVORS, 772 PD_TO_SQLALCHEMY_DTYPES_FLAVORS, 773 ) 774 from meerschaum.connectors.sql._create_engine import flavor_configs 775 from meerschaum.utils.packages import attempt_import, import_pandas 776 sqlalchemy = attempt_import('sqlalchemy', debug=debug) 777 pd = import_pandas() 778 is_dask = 'dask' in df.__module__ 779 780 stats = {'target': name, } 781 ### resort to defaults if None 782 if method == "": 783 if self.flavor in _bulk_flavors: 784 method = functools.partial(psql_insert_copy, schema=self.schema) 785 else: 786 ### Should resolve to 'multi' or `None`. 
787 method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi') 788 stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method) 789 790 default_chunksize = self._sys_config.get('chunksize', None) 791 chunksize = chunksize if chunksize != -1 else default_chunksize 792 if chunksize is not None and self.flavor in _max_chunks_flavors: 793 if chunksize > _max_chunks_flavors[self.flavor]: 794 if chunksize != default_chunksize: 795 warn( 796 f"The specified chunksize of {chunksize} exceeds the maximum of " 797 + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n" 798 + f" Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.", 799 stacklevel = 3, 800 ) 801 chunksize = _max_chunks_flavors[self.flavor] 802 stats['chunksize'] = chunksize 803 804 success, msg = False, "Default to_sql message" 805 start = time.perf_counter() 806 if debug: 807 msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..." 808 print(msg, end="", flush=True) 809 stats['num_rows'] = len(df) 810 811 ### Check if the name is too long. 812 truncated_name = truncate_item_name(name, self.flavor) 813 if name != truncated_name: 814 warn( 815 f"Table '{name}' is too long for '{self.flavor}'," 816 + f" will instead create the table '{truncated_name}'." 817 ) 818 819 ### filter out non-pandas args 820 import inspect 821 to_sql_params = inspect.signature(df.to_sql).parameters 822 to_sql_kw = {} 823 for k, v in kw.items(): 824 if k in to_sql_params: 825 to_sql_kw[k] = v 826 827 to_sql_kw.update({ 828 'name': truncated_name, 829 'schema': schema, 830 ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI), 831 'index': index, 832 'if_exists': if_exists, 833 'method': method, 834 'chunksize': chunksize, 835 }) 836 if is_dask: 837 to_sql_kw.update({ 838 'parallel': True, 839 }) 840 841 if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else "" 842 if self.flavor == 'oracle': 843 ### For some reason 'replace' doesn't work properly in pandas, 844 ### so try dropping first. 845 if if_exists == 'replace' and table_exists(name, self, schema=schema, debug=debug): 846 success = self.exec( 847 f"DROP TABLE {if_exists_str}" + sql_item_name(name, 'oracle', schema) 848 ) is not None 849 if not success: 850 warn(f"Unable to drop {name}") 851 852 853 ### Enforce NVARCHAR(2000) as text instead of CLOB. 854 dtype = to_sql_kw.get('dtype', {}) 855 for col, typ in df.dtypes.items(): 856 if are_dtypes_equal(str(typ), 'object'): 857 dtype[col] = sqlalchemy.types.NVARCHAR(2000) 858 elif are_dtypes_equal(str(typ), 'int'): 859 dtype[col] = sqlalchemy.types.INTEGER 860 to_sql_kw['dtype'] = dtype 861 elif self.flavor == 'mssql': 862 dtype = to_sql_kw.get('dtype', {}) 863 for col, typ in df.dtypes.items(): 864 if are_dtypes_equal(str(typ), 'bool'): 865 dtype[col] = sqlalchemy.types.INTEGER 866 to_sql_kw['dtype'] = dtype 867 868 ### Check for JSON columns. 869 if self.flavor not in json_flavors: 870 json_cols = get_json_cols(df) 871 if json_cols: 872 for col in json_cols: 873 df[col] = df[col].apply( 874 ( 875 lambda x: json.dumps(x, default=str, sort_keys=True) 876 if not isinstance(x, Hashable) 877 else x 878 ) 879 ) 880 881 ### Check for numeric columns. 
882 numeric_scale, numeric_precision = NUMERIC_PRECISION_FLAVORS.get(self.flavor, (None, None)) 883 if numeric_precision is not None and numeric_scale is not None: 884 numeric_cols = get_numeric_cols(df) 885 for col in numeric_cols: 886 df[col] = df[col].apply( 887 lambda x: ( 888 quantize_decimal(x, numeric_scale, numeric_precision) 889 if isinstance(x, Decimal) 890 else x 891 ) 892 ) 893 894 if PD_TO_SQLALCHEMY_DTYPES_FLAVORS['uuid'].get(self.flavor, None) != 'Uuid': 895 uuid_cols = get_uuid_cols(df) 896 for col in uuid_cols: 897 df[col] = df[col].astype(str) 898 899 try: 900 with warnings.catch_warnings(): 901 warnings.filterwarnings('ignore', 'case sensitivity issues') 902 df.to_sql(**to_sql_kw) 903 success = True 904 except Exception as e: 905 if not silent: 906 warn(str(e)) 907 success, msg = False, str(e) 908 909 end = time.perf_counter() 910 if success: 911 msg = f"It took {round(end - start, 2)} seconds to sync {len(df)} rows to {name}." 912 stats['start'] = start 913 stats['end'] = end 914 stats['duration'] = end - start 915 916 if debug: 917 print(f" done.", flush=True) 918 dprint(msg) 919 920 stats['success'] = success 921 stats['msg'] = msg 922 if as_tuple: 923 return success, msg 924 if as_dict: 925 return stats 926 return success
Upload a DataFrame's contents to the SQL server.

Parameters
- df (pd.DataFrame): The DataFrame to be uploaded.
- name (str): The name of the table to be created.
- index (bool, default False): If `True`, creates the DataFrame's indices as columns.
- if_exists (str, default 'replace'): Drop and create the table ('replace'), append if it exists ('append'), or raise an Exception ('fail'). Options are ['replace', 'append', 'fail'].
- method (str, default ''): `None` or 'multi'. See the pandas `to_sql()` documentation for details.
- chunksize (Optional[int], default -1): How many rows to insert at a time.
- schema (Optional[str], default None): Optionally override the schema for the table. Defaults to `SQLConnector.schema`.
- as_tuple (bool, default False): If `True`, return a (success_bool, message) tuple instead of a `bool`. Defaults to `False`.
- as_dict (bool, default False): If `True`, return a dictionary of transaction information. The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`, `method`, and `target`.
- kw (Any): Additional arguments will be passed to the DataFrame's `to_sql` function.

Returns
- Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
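A minimal sketch, assuming the connector from the earlier examples (the table name is hypothetical):
```
>>> import pandas as pd
>>> df = pd.DataFrame({'id': [1, 2], 'color': ['red', 'blue']})
>>> success, msg = conn.to_sql(df, name='colors', if_exists='append', as_tuple=True)
>>> stats = conn.to_sql(df, name='colors', if_exists='replace', as_dict=True)
```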
592def exec_queries( 593 self, 594 queries: List[ 595 Union[ 596 str, 597 Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]] 598 ] 599 ], 600 break_on_error: bool = False, 601 rollback: bool = True, 602 silent: bool = False, 603 debug: bool = False, 604) -> List[sqlalchemy.engine.cursor.LegacyCursorResult]: 605 """ 606 Execute a list of queries in a single transaction. 607 608 Parameters 609 ---------- 610 queries: List[ 611 Union[ 612 str, 613 Tuple[str, Callable[[], List[str]]] 614 ] 615 ] 616 The queries in the transaction to be executed. 617 If a query is a tuple, the second item of the tuple 618 will be considered a callable hook that returns a list of queries to be executed 619 before the next item in the list. 620 621 break_on_error: bool, default True 622 If `True`, stop executing when a query fails. 623 624 rollback: bool, default True 625 If `break_on_error` is `True`, rollback the transaction if a query fails. 626 627 silent: bool, default False 628 If `True`, suppress warnings. 629 630 Returns 631 ------- 632 A list of SQLAlchemy results. 633 """ 634 from meerschaum.utils.warnings import warn 635 from meerschaum.utils.debug import dprint 636 from meerschaum.utils.packages import attempt_import 637 sqlalchemy, sqlalchemy_orm = attempt_import('sqlalchemy', 'sqlalchemy.orm') 638 session = sqlalchemy_orm.Session(self.engine) 639 640 result = None 641 results = [] 642 with session.begin(): 643 for query in queries: 644 hook = None 645 result = None 646 647 if isinstance(query, tuple): 648 query, hook = query 649 if isinstance(query, str): 650 query = sqlalchemy.text(query) 651 652 if debug: 653 dprint(f"[{self}]\n" + str(query)) 654 655 try: 656 result = session.execute(query) 657 session.flush() 658 except Exception as e: 659 msg = (f"Encountered error while executing:\n{e}") 660 if not silent: 661 warn(msg) 662 elif debug: 663 dprint(f"[{self}]\n" + str(msg)) 664 result = None 665 if result is None and break_on_error: 666 if rollback: 667 session.rollback() 668 break 669 elif result is not None and hook is not None: 670 hook_queries = hook(session) 671 if hook_queries: 672 hook_results = self.exec_queries( 673 hook_queries, 674 break_on_error = break_on_error, 675 rollback=rollback, 676 silent=silent, 677 debug=debug, 678 ) 679 result = (result, hook_results) 680 681 results.append(result) 682 683 return results
Execute a list of queries in a single transaction.

Parameters
- queries (List[Union[str, Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]]]): The queries in the transaction to be executed. If a query is a tuple, the second item of the tuple will be considered a callable hook that returns a list of queries to be executed before the next item in the list.
- break_on_error (bool, default False): If `True`, stop executing when a query fails.
- rollback (bool, default True): If `break_on_error` is `True`, rollback the transaction if a query fails.
- silent (bool, default False): If `True`, suppress warnings.

Returns
- A list of SQLAlchemy results.
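For instance, a sketch of a small transaction (the table name is hypothetical):
```
>>> results = conn.exec_queries(
...     [
...         "CREATE TABLE tmp_example (a INT)",
...         "INSERT INTO tmp_example (a) VALUES (1)",
...         "DROP TABLE tmp_example",
...     ],
...     break_on_error=True,
... )
```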
1025def get_connection(self, rebuild: bool = False) -> 'sqlalchemy.engine.base.Connection': 1026 """ 1027 Return the current alive connection. 1028 1029 Parameters 1030 ---------- 1031 rebuild: bool, default False 1032 If `True`, close the previous connection and open a new one. 1033 1034 Returns 1035 ------- 1036 A `sqlalchemy.engine.base.Connection` object. 1037 """ 1038 import threading 1039 if '_thread_connections' not in self.__dict__: 1040 self.__dict__['_thread_connections'] = {} 1041 1042 self._cleanup_connections() 1043 1044 thread_id = threading.get_ident() 1045 1046 thread_connections = self.__dict__.get('_thread_connections', {}) 1047 connection = thread_connections.get(thread_id, None) 1048 1049 if rebuild and connection is not None: 1050 try: 1051 connection.close() 1052 except Exception: 1053 pass 1054 1055 _ = thread_connections.pop(thread_id, None) 1056 connection = None 1057 1058 if connection is None or connection.closed: 1059 connection = self.engine.connect() 1060 thread_connections[thread_id] = connection 1061 1062 return connection
Return the current alive connection.

Parameters
- rebuild (bool, default False): If `True`, close the previous connection and open a new one.

Returns
- A `sqlalchemy.engine.base.Connection` object.
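Connections are stored per thread and reused; a short sketch:
```
>>> connection = conn.get_connection()
>>> connection = conn.get_connection(rebuild=True)  # close the old connection and open a new one
```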
428def test_connection( 429 self, 430 **kw: Any 431 ) -> Union[bool, None]: 432 """ 433 Test if a successful connection to the database may be made. 434 435 Parameters 436 ---------- 437 **kw: 438 The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`. 439 440 Returns 441 ------- 442 `True` if a connection is made, otherwise `False` or `None` in case of failure. 443 444 """ 445 import warnings 446 from meerschaum.connectors.poll import retry_connect 447 _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self} 448 _default_kw.update(kw) 449 with warnings.catch_warnings(): 450 warnings.filterwarnings('ignore', 'Could not') 451 try: 452 return retry_connect(**_default_kw) 453 except Exception as e: 454 return False
Test if a successful connection to the database may be made.

Parameters
- **kw (Any): The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

Returns
- `True` if a connection is made, otherwise `False` or `None` in case of failure.
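A sketch; keyword arguments such as `max_retries` and `retry_wait` are forwarded to `retry_connect`:
```
>>> conn.test_connection()
>>> conn.test_connection(max_retries=3, retry_wait=1)
```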
17def fetch( 18 self, 19 pipe: mrsm.Pipe, 20 begin: Union[datetime, int, str, None] = '', 21 end: Union[datetime, int, str, None] = None, 22 check_existing: bool = True, 23 chunk_hook: Optional[Callable[['pd.DataFrame'], Any]] = None, 24 chunksize: Optional[int] = -1, 25 workers: Optional[int] = None, 26 debug: bool = False, 27 **kw: Any 28) -> Union['pd.DataFrame', List[Any], None]: 29 """Execute the SQL definition and return a Pandas DataFrame. 30 31 Parameters 32 ---------- 33 pipe: mrsm.Pipe 34 The pipe object which contains the `fetch` metadata. 35 36 - pipe.columns['datetime']: str 37 - Name of the datetime column for the remote table. 38 - pipe.parameters['fetch']: Dict[str, Any] 39 - Parameters necessary to execute a query. 40 - pipe.parameters['fetch']['definition']: str 41 - Raw SQL query to execute to generate the pandas DataFrame. 42 - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float] 43 - How many minutes before `begin` to search for data (*optional*). 44 45 begin: Union[datetime, int, str, None], default None 46 Most recent datatime to search for data. 47 If `backtrack_minutes` is provided, subtract `backtrack_minutes`. 48 49 end: Union[datetime, int, str, None], default None 50 The latest datetime to search for data. 51 If `end` is `None`, do not bound 52 53 check_existing: bool, defult True 54 If `False`, use a backtrack interval of 0 minutes. 55 56 chunk_hook: Callable[[pd.DataFrame], Any], default None 57 A function to pass to `SQLConnector.read()` that accepts a Pandas DataFrame. 58 59 chunksize: Optional[int], default -1 60 How many rows to load into memory at once (when `chunk_hook` is provided). 61 Otherwise the entire result set is loaded into memory. 62 63 workers: Optional[int], default None 64 How many threads to use when consuming the generator (when `chunk_hook is provided). 65 Defaults to the number of cores. 66 67 debug: bool, default False 68 Verbosity toggle. 69 70 Returns 71 ------- 72 A pandas DataFrame or `None`. 73 If `chunk_hook` is not None, return a list of the hook function's results. 74 """ 75 meta_def = self.get_pipe_metadef( 76 pipe, 77 begin=begin, 78 end=end, 79 check_existing=check_existing, 80 debug=debug, 81 **kw 82 ) 83 as_hook_results = chunk_hook is not None 84 chunks = self.read( 85 meta_def, 86 chunk_hook=chunk_hook, 87 as_hook_results=as_hook_results, 88 chunksize=chunksize, 89 workers=workers, 90 debug=debug, 91 ) 92 ### if sqlite, parse for datetimes 93 if not as_hook_results and self.flavor == 'sqlite': 94 from meerschaum.utils.misc import parse_df_datetimes 95 ignore_cols = [ 96 col 97 for col, dtype in pipe.dtypes.items() 98 if 'datetime' not in str(dtype) 99 ] 100 return ( 101 parse_df_datetimes( 102 chunk, 103 ignore_cols=ignore_cols, 104 debug=debug, 105 ) 106 for chunk in chunks 107 ) 108 return chunks
Execute the SQL definition and return a Pandas DataFrame.

Parameters
- pipe (mrsm.Pipe): The pipe object which contains the `fetch` metadata.
  - pipe.columns['datetime']: str
    - Name of the datetime column for the remote table.
  - pipe.parameters['fetch']: Dict[str, Any]
    - Parameters necessary to execute a query.
  - pipe.parameters['fetch']['definition']: str
    - Raw SQL query to execute to generate the pandas DataFrame.
  - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
    - How many minutes before `begin` to search for data (*optional*).
- begin (Union[datetime, int, str, None], default None): Most recent datetime to search for data. If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
- end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If `end` is `None`, do not bound the search.
- check_existing (bool, default True): If `False`, use a backtrack interval of 0 minutes.
- chunk_hook (Callable[[pd.DataFrame], Any], default None): A function to pass to `SQLConnector.read()` that accepts a Pandas DataFrame.
- chunksize (Optional[int], default -1): How many rows to load into memory at once (when `chunk_hook` is provided). Otherwise the entire result set is loaded into memory.
- workers (Optional[int], default None): How many threads to use when consuming the generator (when `chunk_hook` is provided). Defaults to the number of cores.
- debug (bool, default False): Verbosity toggle.

Returns
- A pandas DataFrame or `None`. If `chunk_hook` is not `None`, return a list of the hook function's results.
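A sketch of fetching through a pipe (the pipe's keys, definition, and datetime column are hypothetical):
```
>>> pipe = mrsm.Pipe(
...     'sql:local', 'weather',
...     columns={'datetime': 'dt'},
...     parameters={'fetch': {'definition': 'SELECT * FROM weather'}},
... )
>>> result = conn.fetch(pipe, begin='2024-01-01')
```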
111def get_pipe_metadef( 112 self, 113 pipe: mrsm.Pipe, 114 params: Optional[Dict[str, Any]] = None, 115 begin: Union[datetime, int, str, None] = '', 116 end: Union[datetime, int, str, None] = None, 117 check_existing: bool = True, 118 debug: bool = False, 119 **kw: Any 120) -> Union[str, None]: 121 """ 122 Return a pipe's meta definition fetch query. 123 124 params: Optional[Dict[str, Any]], default None 125 Optional params dictionary to build the `WHERE` clause. 126 See `meerschaum.utils.sql.build_where`. 127 128 begin: Union[datetime, int, str, None], default None 129 Most recent datatime to search for data. 130 If `backtrack_minutes` is provided, subtract `backtrack_minutes`. 131 132 end: Union[datetime, int, str, None], default None 133 The latest datetime to search for data. 134 If `end` is `None`, do not bound 135 136 check_existing: bool, default True 137 If `True`, apply the backtrack interval. 138 139 debug: bool, default False 140 Verbosity toggle. 141 142 Returns 143 ------- 144 A pipe's meta definition fetch query string. 145 """ 146 from meerschaum.utils.debug import dprint 147 from meerschaum.utils.warnings import warn, error 148 from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where 149 from meerschaum.utils.misc import is_int 150 from meerschaum.config import get_config 151 152 definition = get_pipe_query(pipe) 153 154 if not pipe.columns.get('datetime', None): 155 _dt = pipe.guess_datetime() 156 dt_name = sql_item_name(_dt, self.flavor, None) if _dt else None 157 is_guess = True 158 else: 159 _dt = pipe.get_columns('datetime') 160 dt_name = sql_item_name(_dt, self.flavor, None) 161 is_guess = False 162 163 if begin not in (None, '') or end is not None: 164 if is_guess: 165 if _dt is None: 166 warn( 167 f"Unable to determine a datetime column for {pipe}." 
168 + "\n Ignoring begin and end...", 169 stack = False, 170 ) 171 begin, end = '', None 172 else: 173 warn( 174 f"A datetime wasn't specified for {pipe}.\n" 175 + f" Using column \"{_dt}\" for datetime bounds...", 176 stack = False 177 ) 178 179 apply_backtrack = begin == '' and check_existing 180 backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug) 181 btm = ( 182 int(backtrack_interval.total_seconds() / 60) 183 if isinstance(backtrack_interval, timedelta) 184 else backtrack_interval 185 ) 186 begin = ( 187 pipe.get_sync_time(debug=debug) 188 if begin == '' 189 else begin 190 ) 191 192 if begin and end and begin >= end: 193 begin = None 194 195 if dt_name: 196 begin_da = ( 197 dateadd_str( 198 flavor=self.flavor, 199 datepart='minute', 200 number=((-1 * btm) if apply_backtrack else 0), 201 begin=begin, 202 ) 203 if begin 204 else None 205 ) 206 end_da = ( 207 dateadd_str( 208 flavor=self.flavor, 209 datepart='minute', 210 number=0, 211 begin=end, 212 ) 213 if end 214 else None 215 ) 216 217 meta_def = ( 218 _simple_fetch_query(pipe, self.flavor) if ( 219 (not (pipe.columns or {}).get('id', None)) 220 or (not get_config('system', 'experimental', 'join_fetch')) 221 ) else _join_fetch_query(pipe, self.flavor, debug=debug, **kw) 222 ) 223 224 has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):] 225 if dt_name and (begin_da or end_da): 226 definition_dt_name = ( 227 dateadd_str(self.flavor, 'minute', 0, f"definition.{dt_name}") 228 if not is_int((begin_da or end_da)) 229 else f"definition.{dt_name}" 230 ) 231 meta_def += "\n" + ("AND" if has_where else "WHERE") + " " 232 has_where = True 233 if begin_da: 234 meta_def += f"{definition_dt_name} >= {begin_da}" 235 if begin_da and end_da: 236 meta_def += " AND " 237 if end_da: 238 meta_def += f"{definition_dt_name} < {end_da}" 239 240 if params is not None: 241 params_where = build_where(params, self, with_where=False) 242 meta_def += "\n" + ("AND" if has_where else "WHERE") + " " 243 has_where = True 244 meta_def += params_where 245 246 return meta_def
Return a pipe's meta definition fetch query.

Parameters
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the `WHERE` clause. See `meerschaum.utils.sql.build_where`.
- begin (Union[datetime, int, str, None], default None): Most recent datetime to search for data. If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
- end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If `end` is `None`, do not bound the search.
- check_existing (bool, default True): If `True`, apply the backtrack interval.
- debug (bool, default False): Verbosity toggle.

Returns
- A pipe's meta definition fetch query string.
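Continuing the hypothetical pipe from the `fetch` sketch above, the returned query string can be passed straight to `read()`:
```
>>> meta_def = conn.get_pipe_metadef(pipe, begin='2024-01-01', end='2024-02-01')
>>> df = conn.read(meta_def)
```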
35def cli( 36 self, 37 debug: bool = False, 38 ) -> SuccessTuple: 39 """ 40 Launch a subprocess for an interactive CLI. 41 """ 42 from meerschaum.utils.venv import venv_exec 43 env = copy.deepcopy(dict(os.environ)) 44 env[f'MRSM_SQL_{self.label.upper()}'] = json.dumps(self.meta) 45 cli_code = ( 46 "import sys\n" 47 "import meerschaum as mrsm\n" 48 f"conn = mrsm.get_connector('sql:{self.label}')\n" 49 "success, msg = conn._cli_exit()\n" 50 "mrsm.pprint((success, msg))\n" 51 "if not success:\n" 52 " raise Exception(msg)" 53 ) 54 try: 55 _ = venv_exec(cli_code, venv=None, debug=debug, capture_output=False) 56 except Exception as e: 57 return False, f"[{self}] Failed to start CLI:\n{e}" 58 return True, "Success"
Launch a subprocess for an interactive CLI.
144def fetch_pipes_keys( 145 self, 146 connector_keys: Optional[List[str]] = None, 147 metric_keys: Optional[List[str]] = None, 148 location_keys: Optional[List[str]] = None, 149 tags: Optional[List[str]] = None, 150 params: Optional[Dict[str, Any]] = None, 151 debug: bool = False 152) -> Optional[List[Tuple[str, str, Optional[str]]]]: 153 """ 154 Return a list of tuples corresponding to the parameters provided. 155 156 Parameters 157 ---------- 158 connector_keys: Optional[List[str]], default None 159 List of connector_keys to search by. 160 161 metric_keys: Optional[List[str]], default None 162 List of metric_keys to search by. 163 164 location_keys: Optional[List[str]], default None 165 List of location_keys to search by. 166 167 params: Optional[Dict[str, Any]], default None 168 Dictionary of additional parameters to search by. 169 E.g. `--params pipe_id:1` 170 171 debug: bool, default False 172 Verbosity toggle. 173 """ 174 from meerschaum.utils.debug import dprint 175 from meerschaum.utils.packages import attempt_import 176 from meerschaum.utils.misc import separate_negation_values, flatten_list 177 from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists 178 from meerschaum.config.static import STATIC_CONFIG 179 import json 180 from copy import deepcopy 181 sqlalchemy, sqlalchemy_sql_functions = attempt_import('sqlalchemy', 'sqlalchemy.sql.functions') 182 coalesce = sqlalchemy_sql_functions.coalesce 183 184 if connector_keys is None: 185 connector_keys = [] 186 if metric_keys is None: 187 metric_keys = [] 188 if location_keys is None: 189 location_keys = [] 190 else: 191 location_keys = [ 192 ( 193 lk 194 if lk not in ('[None]', 'None', 'null') 195 else 'None' 196 ) 197 for lk in location_keys 198 ] 199 if tags is None: 200 tags = [] 201 202 if params is None: 203 params = {} 204 205 ### Add three primary keys to params dictionary 206 ### (separated for convenience of arguments). 207 cols = { 208 'connector_keys': [str(ck) for ck in connector_keys], 209 'metric_key': [str(mk) for mk in metric_keys], 210 'location_key': [str(lk) for lk in location_keys], 211 } 212 213 ### Make deep copy so we don't mutate this somewhere else. 214 parameters = deepcopy(params) 215 for col, vals in cols.items(): 216 if vals not in [[], ['*']]: 217 parameters[col] = vals 218 219 if not table_exists('mrsm_pipes', self, schema=self.instance_schema, debug=debug): 220 return [] 221 222 from meerschaum.connectors.sql.tables import get_tables 223 pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes'] 224 225 _params = {} 226 for k, v in parameters.items(): 227 _v = json.dumps(v) if isinstance(v, dict) else v 228 _params[k] = _v 229 230 negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] 231 ### Parse regular params. 232 ### If a param begins with '_', negate it instead. 
233 _where = [ 234 ( 235 (coalesce(pipes_tbl.c[key], 'None') == val) 236 if not str(val).startswith(negation_prefix) 237 else (pipes_tbl.c[key] != key) 238 ) for key, val in _params.items() 239 if not isinstance(val, (list, tuple)) and key in pipes_tbl.c 240 ] 241 select_cols = ( 242 [ 243 pipes_tbl.c.connector_keys, 244 pipes_tbl.c.metric_key, 245 pipes_tbl.c.location_key, 246 ] 247 ) 248 249 q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where)) 250 for c, vals in cols.items(): 251 if not isinstance(vals, (list, tuple)) or not vals or not c in pipes_tbl.c: 252 continue 253 _in_vals, _ex_vals = separate_negation_values(vals) 254 q = q.where(coalesce(pipes_tbl.c[c], 'None').in_(_in_vals)) if _in_vals else q 255 q = q.where(coalesce(pipes_tbl.c[c], 'None').not_in(_ex_vals)) if _ex_vals else q 256 257 ### Finally, parse tags. 258 tag_groups = [tag.split(',') for tag in tags] 259 in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups] 260 261 ors, nands = [], [] 262 for _in_tags, _ex_tags in in_ex_tag_groups: 263 sub_ands = [] 264 for nt in _in_tags: 265 sub_ands.append( 266 sqlalchemy.cast( 267 pipes_tbl.c['parameters'], 268 sqlalchemy.String, 269 ).like(f'%"tags":%"{nt}"%') 270 ) 271 if sub_ands: 272 ors.append(sqlalchemy.and_(*sub_ands)) 273 274 for xt in _ex_tags: 275 nands.append( 276 sqlalchemy.cast( 277 pipes_tbl.c['parameters'], 278 sqlalchemy.String, 279 ).not_like(f'%"tags":%"{xt}"%') 280 ) 281 282 q = q.where(sqlalchemy.and_(*nands)) if nands else q 283 q = q.where(sqlalchemy.or_(*ors)) if ors else q 284 loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key']) 285 if self.flavor not in OMIT_NULLSFIRST_FLAVORS: 286 loc_asc = sqlalchemy.nullsfirst(loc_asc) 287 q = q.order_by( 288 sqlalchemy.asc(pipes_tbl.c['connector_keys']), 289 sqlalchemy.asc(pipes_tbl.c['metric_key']), 290 loc_asc, 291 ) 292 293 ### execute the query and return a list of tuples 294 if debug: 295 dprint(q.compile(compile_kwargs={'literal_binds': True})) 296 try: 297 rows = ( 298 self.execute(q).fetchall() 299 if self.flavor != 'duckdb' 300 else [ 301 (row['connector_keys'], row['metric_key'], row['location_key']) 302 for row in self.read(q).to_dict(orient='records') 303 ] 304 ) 305 except Exception as e: 306 error(str(e)) 307 308 return [(row[0], row[1], row[2]) for row in rows]
Return a list of tuples corresponding to the parameters provided.

Parameters
- connector_keys (Optional[List[str]], default None): List of connector_keys to search by.
- metric_keys (Optional[List[str]], default None): List of metric_keys to search by.
- location_keys (Optional[List[str]], default None): List of location_keys to search by.
- tags (Optional[List[str]], default None): List of tags to search by.
- params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. E.g. `--params pipe_id:1`
- debug (bool, default False): Verbosity toggle.
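A sketch (the keys and tag are hypothetical); the result is a list of `(connector_keys, metric_key, location_key)` tuples:
```
>>> keys = conn.fetch_pipes_keys(
...     connector_keys=['sql:local'],
...     tags=['production'],
... )
```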
311def create_indices( 312 self, 313 pipe: mrsm.Pipe, 314 indices: Optional[List[str]] = None, 315 debug: bool = False 316) -> bool: 317 """ 318 Create a pipe's indices. 319 """ 320 from meerschaum.utils.sql import sql_item_name, update_queries 321 from meerschaum.utils.debug import dprint 322 if debug: 323 dprint(f"Creating indices for {pipe}...") 324 if not pipe.columns: 325 warn(f"{pipe} has no index columns; skipping index creation.", stack=False) 326 return True 327 328 ix_queries = { 329 ix: queries 330 for ix, queries in self.get_create_index_queries(pipe, debug=debug).items() 331 if indices is None or ix in indices 332 } 333 success = True 334 for ix, queries in ix_queries.items(): 335 ix_success = all(self.exec_queries(queries, debug=debug, silent=False)) 336 success = success and ix_success 337 if not ix_success: 338 warn(f"Failed to create index on column: {ix}") 339 340 return success
Create a pipe's indices.
343def drop_indices( 344 self, 345 pipe: mrsm.Pipe, 346 indices: Optional[List[str]] = None, 347 debug: bool = False 348) -> bool: 349 """ 350 Drop a pipe's indices. 351 """ 352 from meerschaum.utils.debug import dprint 353 if debug: 354 dprint(f"Dropping indices for {pipe}...") 355 if not pipe.columns: 356 warn(f"Unable to drop indices for {pipe} without columns.", stack=False) 357 return False 358 ix_queries = { 359 ix: queries 360 for ix, queries in self.get_drop_index_queries(pipe, debug=debug).items() 361 if indices is None or ix in indices 362 } 363 success = True 364 for ix, queries in ix_queries.items(): 365 ix_success = all(self.exec_queries(queries, debug=debug, silent=True)) 366 if not ix_success: 367 success = False 368 if debug: 369 dprint(f"Failed to drop index on column: {ix}") 370 return success
Drop a pipe's indices.
373def get_create_index_queries( 374 self, 375 pipe: mrsm.Pipe, 376 debug: bool = False, 377) -> Dict[str, List[str]]: 378 """ 379 Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query. 380 381 Parameters 382 ---------- 383 pipe: mrsm.Pipe 384 The pipe to which the queries will correspond. 385 386 Returns 387 ------- 388 A dictionary of index names mapping to lists of queries. 389 """ 390 ### NOTE: Due to recent breaking changes in DuckDB, indices don't behave properly. 391 if self.flavor == 'duckdb': 392 return {} 393 from meerschaum.utils.sql import ( 394 sql_item_name, 395 get_distinct_col_count, 396 update_queries, 397 get_null_replacement, 398 COALESCE_UNIQUE_INDEX_FLAVORS, 399 ) 400 from meerschaum.config import get_config 401 index_queries = {} 402 403 upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in update_queries 404 index_names = pipe.get_indices() 405 indices = pipe.indices 406 407 _datetime = pipe.get_columns('datetime', error=False) 408 _datetime_type = pipe.dtypes.get(_datetime, 'datetime64[ns]') 409 _datetime_name = ( 410 sql_item_name(_datetime, self.flavor, None) 411 if _datetime is not None else None 412 ) 413 _datetime_index_name = ( 414 sql_item_name(index_names['datetime'], self.flavor, None) 415 if index_names.get('datetime', None) 416 else None 417 ) 418 _id = pipe.get_columns('id', error=False) 419 _id_name = ( 420 sql_item_name(_id, self.flavor, None) 421 if _id is not None 422 else None 423 ) 424 425 _id_index_name = ( 426 sql_item_name(index_names['id'], self.flavor, None) 427 if index_names.get('id', None) 428 else None 429 ) 430 _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 431 _create_space_partition = get_config('system', 'experimental', 'space') 432 433 ### create datetime index 434 if _datetime is not None: 435 if self.flavor == 'timescaledb' and pipe.parameters.get('hypertable', True): 436 _id_count = ( 437 get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self) 438 if (_id is not None and _create_space_partition) else None 439 ) 440 441 chunk_interval = pipe.get_chunk_interval(debug=debug) 442 chunk_interval_minutes = ( 443 chunk_interval 444 if isinstance(chunk_interval, int) 445 else int(chunk_interval.total_seconds() / 60) 446 ) 447 chunk_time_interval = ( 448 f"INTERVAL '{chunk_interval_minutes} MINUTES'" 449 if isinstance(chunk_interval, timedelta) 450 else f'{chunk_interval_minutes}' 451 ) 452 453 dt_query = ( 454 f"SELECT public.create_hypertable('{_pipe_name}', " + 455 f"'{_datetime}', " 456 + ( 457 f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition) 458 else '' 459 ) 460 + f'chunk_time_interval => {chunk_time_interval}, ' 461 + 'if_not_exists => true, ' 462 + "migrate_data => true);" 463 ) 464 elif self.flavor == 'mssql': 465 dt_query = ( 466 f"CREATE CLUSTERED INDEX {_datetime_index_name} " 467 f"ON {_pipe_name} ({_datetime_name})" 468 ) 469 else: ### mssql, sqlite, etc. 470 dt_query = ( 471 f"CREATE INDEX {_datetime_index_name} " 472 + f"ON {_pipe_name} ({_datetime_name})" 473 ) 474 475 index_queries[_datetime] = [dt_query] 476 477 ### create id index 478 if _id_name is not None: 479 if self.flavor == 'timescaledb': 480 ### Already created indices via create_hypertable. 
481 id_query = ( 482 None if (_id is not None and _create_space_partition) 483 else ( 484 f"CREATE INDEX IF NOT EXISTS {_id_index_name} ON {_pipe_name} ({_id_name})" 485 if _id is not None 486 else None 487 ) 488 ) 489 pass 490 else: ### mssql, sqlite, etc. 491 id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})" 492 493 if id_query is not None: 494 index_queries[_id] = id_query if isinstance(id_query, list) else [id_query] 495 496 ### Create indices for other labels in `pipe.columns`. 497 other_index_names = { 498 ix_key: ix_unquoted 499 for ix_key, ix_unquoted in index_names.items() 500 if ix_key not in ('datetime', 'id') 501 } 502 for ix_key, ix_unquoted in other_index_names.items(): 503 ix_name = sql_item_name(ix_unquoted, self.flavor, None) 504 cols = indices[ix_key] 505 if not isinstance(cols, (list, tuple)): 506 cols = [cols] 507 cols_names = [sql_item_name(col, self.flavor, None) for col in cols if col] 508 if not cols_names: 509 continue 510 cols_names_str = ", ".join(cols_names) 511 index_queries[ix_key] = [f"CREATE INDEX {ix_name} ON {_pipe_name} ({cols_names_str})"] 512 513 existing_cols_types = pipe.get_columns_types(debug=debug) 514 indices_cols_str = ', '.join( 515 [ 516 sql_item_name(ix, self.flavor) 517 for ix_key, ix in pipe.columns.items() 518 if ix and ix in existing_cols_types 519 ] 520 ) 521 coalesce_indices_cols_str = ', '.join( 522 [ 523 ( 524 "COALESCE(" 525 + sql_item_name(ix, self.flavor) 526 + ", " 527 + get_null_replacement(existing_cols_types[ix], self.flavor) 528 + ") " 529 ) if ix_key != 'datetime' else (sql_item_name(ix, self.flavor)) 530 for ix_key, ix in pipe.columns.items() 531 if ix and ix in existing_cols_types 532 ] 533 ) 534 unique_index_name = sql_item_name(pipe.target + '_unique_index', self.flavor) 535 constraint_name = sql_item_name(pipe.target + '_constraint', self.flavor) 536 add_constraint_query = ( 537 f"ALTER TABLE {_pipe_name} ADD CONSTRAINT {constraint_name} UNIQUE ({indices_cols_str})" 538 ) 539 unique_index_cols_str = ( 540 indices_cols_str 541 if self.flavor not in COALESCE_UNIQUE_INDEX_FLAVORS 542 else coalesce_indices_cols_str 543 ) 544 create_unique_index_query = ( 545 f"CREATE UNIQUE INDEX {unique_index_name} ON {_pipe_name} ({unique_index_cols_str})" 546 ) 547 constraint_queries = [create_unique_index_query] 548 if self.flavor != 'sqlite': 549 constraint_queries.append(add_constraint_query) 550 if upsert and indices_cols_str: 551 index_queries[unique_index_name] = constraint_queries 552 return index_queries
Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.

Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.

Returns
- A dictionary of index names mapping to lists of queries.
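A sketch mirroring how `create_indices()` above consumes this method, assuming the hypothetical pipe from earlier:
```
>>> index_queries = conn.get_create_index_queries(pipe)
>>> for ix, queries in index_queries.items():
...     results = conn.exec_queries(queries, debug=True)
```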
555def get_drop_index_queries( 556 self, 557 pipe: mrsm.Pipe, 558 debug: bool = False, 559) -> Dict[str, List[str]]: 560 """ 561 Return a dictionary mapping columns to a `DROP INDEX` or equivalent query. 562 563 Parameters 564 ---------- 565 pipe: mrsm.Pipe 566 The pipe to which the queries will correspond. 567 568 Returns 569 ------- 570 A dictionary of column names mapping to lists of queries. 571 """ 572 ### NOTE: Due to breaking changes within DuckDB, indices must be skipped. 573 if self.flavor == 'duckdb': 574 return {} 575 if not pipe.exists(debug=debug): 576 return {} 577 from meerschaum.utils.sql import ( 578 sql_item_name, 579 table_exists, 580 hypertable_queries, 581 DROP_IF_EXISTS_FLAVORS, 582 ) 583 drop_queries = {} 584 schema = self.get_pipe_schema(pipe) 585 schema_prefix = (schema + '_') if schema else '' 586 indices = { 587 col: schema_prefix + ix 588 for col, ix in pipe.get_indices().items() 589 } 590 pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 591 pipe_name_no_schema = sql_item_name(pipe.target, self.flavor, None) 592 593 if self.flavor not in hypertable_queries: 594 is_hypertable = False 595 else: 596 is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name) 597 is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None 598 599 if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else "" 600 if is_hypertable: 601 nuke_queries = [] 602 temp_table = '_' + pipe.target + '_temp_migration' 603 temp_table_name = sql_item_name(temp_table, self.flavor, self.get_pipe_schema(pipe)) 604 605 if table_exists(temp_table, self, schema=self.get_pipe_schema(pipe), debug=debug): 606 nuke_queries.append(f"DROP TABLE {if_exists_str} {temp_table_name}") 607 nuke_queries += [ 608 f"SELECT * INTO {temp_table_name} FROM {pipe_name}", 609 f"DROP TABLE {if_exists_str} {pipe_name}", 610 f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name_no_schema}", 611 ] 612 nuke_ix_keys = ('datetime', 'id') 613 nuked = False 614 for ix_key in nuke_ix_keys: 615 if ix_key in indices and not nuked: 616 drop_queries[ix_key] = nuke_queries 617 nuked = True 618 619 drop_queries.update({ 620 ix_key: ["DROP INDEX " + sql_item_name(ix_unquoted, self.flavor, None)] 621 for ix_key, ix_unquoted in indices.items() 622 if ix_key not in drop_queries 623 }) 624 return drop_queries
Return a dictionary mapping columns to a `DROP INDEX` or equivalent query.

Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.

Returns
- A dictionary of column names mapping to lists of queries.
2413def get_add_columns_queries( 2414 self, 2415 pipe: mrsm.Pipe, 2416 df: Union[pd.DataFrame, Dict[str, str]], 2417 _is_db_types: bool = False, 2418 debug: bool = False, 2419) -> List[str]: 2420 """ 2421 Add new null columns of the correct type to a table from a dataframe. 2422 2423 Parameters 2424 ---------- 2425 pipe: mrsm.Pipe 2426 The pipe to be altered. 2427 2428 df: Union[pd.DataFrame, Dict[str, str]] 2429 The pandas DataFrame which contains new columns. 2430 If a dictionary is provided, assume it maps columns to Pandas data types. 2431 2432 _is_db_types: bool, default False 2433 If `True`, assume `df` is a dictionary mapping columns to SQL native dtypes. 2434 2435 Returns 2436 ------- 2437 A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector. 2438 """ 2439 if not pipe.exists(debug=debug): 2440 return [] 2441 2442 from decimal import Decimal 2443 import copy 2444 from meerschaum.utils.sql import ( 2445 sql_item_name, 2446 SINGLE_ALTER_TABLE_FLAVORS, 2447 get_table_cols_types, 2448 ) 2449 from meerschaum.utils.dtypes.sql import ( 2450 get_pd_type_from_db_type, 2451 get_db_type_from_pd_type, 2452 ) 2453 from meerschaum.utils.misc import flatten_list 2454 table_obj = self.get_pipe_table(pipe, debug=debug) 2455 is_dask = 'dask' in df.__module__ if not isinstance(df, dict) else False 2456 if is_dask: 2457 df = df.partitions[0].compute() 2458 df_cols_types = ( 2459 { 2460 col: str(typ) 2461 for col, typ in df.dtypes.items() 2462 } 2463 if not isinstance(df, dict) 2464 else copy.deepcopy(df) 2465 ) 2466 if not isinstance(df, dict) and len(df.index) > 0: 2467 for col, typ in list(df_cols_types.items()): 2468 if typ != 'object': 2469 continue 2470 val = df.iloc[0][col] 2471 if isinstance(val, (dict, list)): 2472 df_cols_types[col] = 'json' 2473 elif isinstance(val, Decimal): 2474 df_cols_types[col] = 'numeric' 2475 elif isinstance(val, str): 2476 df_cols_types[col] = 'str' 2477 db_cols_types = { 2478 col: get_pd_type_from_db_type(str(typ.type)) 2479 for col, typ in table_obj.columns.items() 2480 } if table_obj is not None else { 2481 col: get_pd_type_from_db_type(typ) 2482 for col, typ in get_table_cols_types( 2483 pipe.target, 2484 self, 2485 schema=self.get_pipe_schema(pipe), 2486 debug=debug, 2487 ).items() 2488 } 2489 new_cols = set(df_cols_types) - set(db_cols_types) 2490 if not new_cols: 2491 return [] 2492 2493 new_cols_types = { 2494 col: get_db_type_from_pd_type( 2495 df_cols_types[col], 2496 self.flavor 2497 ) for col in new_cols 2498 } 2499 2500 alter_table_query = "ALTER TABLE " + sql_item_name( 2501 pipe.target, self.flavor, self.get_pipe_schema(pipe) 2502 ) 2503 queries = [] 2504 for col, typ in new_cols_types.items(): 2505 add_col_query = ( 2506 "\nADD " 2507 + sql_item_name(col, self.flavor, None) 2508 + " " + typ + "," 2509 ) 2510 2511 if self.flavor in SINGLE_ALTER_TABLE_FLAVORS: 2512 queries.append(alter_table_query + add_col_query[:-1]) 2513 else: 2514 alter_table_query += add_col_query 2515 2516 ### For most flavors, only one query is required. 2517 ### This covers SQLite which requires one query per column. 2518 if not queries: 2519 queries.append(alter_table_query[:-1]) 2520 2521 if self.flavor != 'duckdb': 2522 return queries 2523 2524 ### NOTE: For DuckDB, we must drop and rebuild the indices. 
2525 drop_index_queries = list(flatten_list( 2526 [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()] 2527 )) 2528 create_index_queries = list(flatten_list( 2529 [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()] 2530 )) 2531 2532 return drop_index_queries + queries + create_index_queries
Add new null columns of the correct type to a table from a dataframe.

Parameters
- pipe (mrsm.Pipe): The pipe to be altered.
- df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.
- _is_db_types (bool, default False): If `True`, assume `df` is a dictionary mapping columns to SQL native dtypes.

Returns
- A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
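A sketch, assuming the hypothetical pipe from earlier whose target table already exists, plus a DataFrame carrying a new column:
```
>>> df_new = pd.DataFrame({'dt': ['2024-01-01'], 'humidity': [0.4]})
>>> queries = conn.get_add_columns_queries(pipe, df_new)
>>> results = conn.exec_queries(queries)
```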
def get_alter_columns_queries(
    self,
    pipe: mrsm.Pipe,
    df: Union[pd.DataFrame, Dict[str, str]],
    debug: bool = False,
) -> List[str]:
    """
    If we encounter a column of a different type, set the entire column to text.
    If the altered columns are numeric, alter to numeric instead.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.

    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which may contain altered columns.
        If a dict is provided, assume it maps columns to Pandas data types.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    """
    if not pipe.exists(debug=debug):
        return []

    from meerschaum.utils.sql import sql_item_name, DROP_IF_EXISTS_FLAVORS, get_table_cols_types
    from meerschaum.utils.dataframe import get_numeric_cols
    from meerschaum.utils.dtypes import are_dtypes_equal
    from meerschaum.utils.dtypes.sql import (
        get_pd_type_from_db_type,
        get_db_type_from_pd_type,
    )
    from meerschaum.utils.misc import flatten_list, generate_password, items_str
    table_obj = self.get_pipe_table(pipe, debug=debug)
    target = pipe.target
    session_id = generate_password(3)
    numeric_cols = (
        get_numeric_cols(df)
        if not isinstance(df, dict)
        else [
            col
            for col, typ in df.items()
            if typ == 'numeric'
        ]
    )
    df_cols_types = (
        {
            col: str(typ)
            for col, typ in df.dtypes.items()
        }
        if not isinstance(df, dict)
        else df
    )
    db_cols_types = {
        col: get_pd_type_from_db_type(str(typ.type))
        for col, typ in table_obj.columns.items()
    } if table_obj is not None else {
        col: get_pd_type_from_db_type(typ)
        for col, typ in get_table_cols_types(
            pipe.target,
            self,
            schema=self.get_pipe_schema(pipe),
            debug=debug,
        ).items()
    }
    pipe_bool_cols = [col for col, typ in pipe.dtypes.items() if are_dtypes_equal(str(typ), 'bool')]
    pd_db_df_aliases = {
        'int': 'bool',
        'float': 'bool',
        'numeric': 'bool',
        'guid': 'object',
    }
    if self.flavor == 'oracle':
        pd_db_df_aliases['int'] = 'numeric'

    altered_cols = {
        col: (db_cols_types.get(col, 'object'), typ)
        for col, typ in df_cols_types.items()
        if not are_dtypes_equal(typ, db_cols_types.get(col, 'object').lower())
        and not are_dtypes_equal(db_cols_types.get(col, 'object'), 'string')
    }

    ### NOTE: Sometimes bools are coerced into ints or floats.
    altered_cols_to_ignore = set()
    for col, (db_typ, df_typ) in altered_cols.items():
        for db_alias, df_alias in pd_db_df_aliases.items():
            if db_alias in db_typ.lower() and df_alias in df_typ.lower():
                altered_cols_to_ignore.add(col)

    ### Oracle's bool handling sometimes mixes NUMBER and INT.
    for bool_col in pipe_bool_cols:
        if bool_col not in altered_cols:
            continue
        db_is_bool_compatible = (
            are_dtypes_equal('int', altered_cols[bool_col][0])
            or are_dtypes_equal('float', altered_cols[bool_col][0])
            or are_dtypes_equal('numeric', altered_cols[bool_col][0])
            or are_dtypes_equal('bool', altered_cols[bool_col][0])
        )
        df_is_bool_compatible = (
            are_dtypes_equal('int', altered_cols[bool_col][1])
            or are_dtypes_equal('float', altered_cols[bool_col][1])
            or are_dtypes_equal('numeric', altered_cols[bool_col][1])
            or are_dtypes_equal('bool', altered_cols[bool_col][1])
        )
        if db_is_bool_compatible and df_is_bool_compatible:
            altered_cols_to_ignore.add(bool_col)

    for col in altered_cols_to_ignore:
        _ = altered_cols.pop(col, None)
    if not altered_cols:
        return []

    if numeric_cols:
        pipe.dtypes.update({col: 'numeric' for col in numeric_cols})
        edit_success, edit_msg = pipe.edit(debug=debug)
        if not edit_success:
            warn(
                f"Failed to update dtypes for numeric columns {items_str(numeric_cols)}:\n"
                + f"{edit_msg}"
            )
    else:
        numeric_cols.extend([col for col, typ in pipe.dtypes.items() if typ == 'numeric'])

    numeric_type = get_db_type_from_pd_type('numeric', self.flavor, as_sqlalchemy=False)
    text_type = get_db_type_from_pd_type('str', self.flavor, as_sqlalchemy=False)
    altered_cols_types = {
        col: (
            numeric_type
            if col in numeric_cols
            else text_type
        )
        for col, (db_typ, typ) in altered_cols.items()
    }

    if self.flavor == 'sqlite':
        temp_table_name = '-' + session_id + '_' + target
        rename_query = (
            "ALTER TABLE "
            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
            + " RENAME TO "
            + sql_item_name(temp_table_name, self.flavor, None)
        )
        create_query = (
            "CREATE TABLE "
            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
            + " (\n"
        )
        for col_name, col_obj in table_obj.columns.items():
            create_query += (
                sql_item_name(col_name, self.flavor, None)
                + " "
                + (
                    str(col_obj.type)
                    if col_name not in altered_cols
                    else altered_cols_types[col_name]
                )
                + ",\n"
            )
        create_query = create_query[:-2] + "\n)"

        insert_query = (
            "INSERT INTO "
            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
            + ' ('
            + ', '.join([
                sql_item_name(col_name, self.flavor, None)
                for col_name, _ in table_obj.columns.items()
            ])
            + ')'
            + "\nSELECT\n"
        )
        for col_name, col_obj in table_obj.columns.items():
            new_col_str = (
                sql_item_name(col_name, self.flavor, None)
                if col_name not in altered_cols
                else (
                    "CAST("
                    + sql_item_name(col_name, self.flavor, None)
                    + " AS "
                    + altered_cols_types[col_name]
                    + ")"
                )
            )
            insert_query += new_col_str + ",\n"
        insert_query = insert_query[:-2] + (
            f"\nFROM {sql_item_name(temp_table_name, self.flavor, self.get_pipe_schema(pipe))}"
        )

        if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""

        drop_query = f"DROP TABLE {if_exists_str} " + sql_item_name(
            temp_table_name, self.flavor, self.get_pipe_schema(pipe)
        )
        return [
            rename_query,
            create_query,
            insert_query,
            drop_query,
        ]

    queries = []
    if self.flavor == 'oracle':
        for col, typ in altered_cols_types.items():
            add_query = (
                "ALTER TABLE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nADD " + sql_item_name(col + '_temp', self.flavor, None)
                + " " + typ
            )
            queries.append(add_query)

        for col, typ in altered_cols_types.items():
            populate_temp_query = (
                "UPDATE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nSET " + sql_item_name(col + '_temp', self.flavor, None)
                + ' = ' + sql_item_name(col, self.flavor, None)
            )
            queries.append(populate_temp_query)

        for col, typ in altered_cols_types.items():
            set_old_cols_to_null_query = (
                "UPDATE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nSET " + sql_item_name(col, self.flavor, None)
                + ' = NULL'
            )
            queries.append(set_old_cols_to_null_query)

        for col, typ in altered_cols_types.items():
            alter_type_query = (
                "ALTER TABLE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nMODIFY " + sql_item_name(col, self.flavor, None) + ' '
                + typ
            )
            queries.append(alter_type_query)

        for col, typ in altered_cols_types.items():
            set_old_to_temp_query = (
                "UPDATE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nSET " + sql_item_name(col, self.flavor, None)
                + ' = ' + sql_item_name(col + '_temp', self.flavor, None)
            )
            queries.append(set_old_to_temp_query)

        for col, typ in altered_cols_types.items():
            drop_temp_query = (
                "ALTER TABLE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor, None)
            )
            queries.append(drop_temp_query)

        return queries

    query = "ALTER TABLE " + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
    for col, typ in altered_cols_types.items():
        alter_col_prefix = (
            'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle')
            else 'MODIFY'
        )
        type_prefix = (
            '' if self.flavor in ('mssql', 'mariadb', 'mysql')
            else 'TYPE '
        )
        column_str = 'COLUMN' if self.flavor != 'oracle' else ''
        query += (
            f"\n{alter_col_prefix} {column_str} "
            + sql_item_name(col, self.flavor, None)
            + " " + type_prefix + typ + ","
        )

    query = query[:-1]
    queries.append(query)
    if self.flavor != 'duckdb':
        return queries

    drop_index_queries = list(flatten_list(
        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
    ))
    create_index_queries = list(flatten_list(
        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
    ))

    return drop_index_queries + queries + create_index_queries
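To round out the section, here is a minimal sketch of calling `get_alter_columns_queries` directly; the connector label, pipe keys, and DataFrame are hypothetical, and the target table is assumed to exist (the method returns `[]` otherwise).

```
>>> import pandas as pd
>>> import meerschaum as mrsm
>>>
>>> conn = mrsm.get_connector('sql', 'main')            # hypothetical label
>>> pipe = mrsm.Pipe('demo', 'weather', instance=conn)  # hypothetical keys
>>>
>>> ### `station_id` now arrives as text although the table stored it as an
>>> ### integer, so the whole column is altered to the flavor's text type.
>>> df = pd.DataFrame({'station_id': ['KATL-01']})
>>>
>>> queries = conn.get_alter_columns_queries(pipe, df)  # [] if nothing changed
>>> for query in queries:
...     conn.exec(query)
```

The shape of the result depends on the flavor, as the source above shows: SQLite gets the four-query rename/create/insert/drop sequence because it cannot change a column's type in place; Oracle takes the temp-column detour (add `_temp` columns, copy, null out, `MODIFY`, copy back, drop) because it refuses to change the type of a column holding non-null values; most other flavors get a single multi-column `ALTER TABLE`, and on DuckDB that statement is additionally bracketed by the drop-index and create-index queries.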