meerschaum.connectors
Create connectors with `meerschaum.connectors.get_connector()`.
For ease of use, you can also import from the root `meerschaum` module:
>>> from meerschaum import get_connector
>>> conn = get_connector()
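`get_connector()` also accepts combined `'type:label'` keys (it splits on the first colon) and caches connectors by type and label, so repeated calls return the same object. A short sketch, assuming a configured `sql:main`:

>>> conn_a = get_connector('sql', 'main')
>>> conn_b = get_connector('sql:main')  # 'type:label' keys are split on the first ':'
>>> conn_a is conn_b                    # connectors are cached by type and label
True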
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Create connectors with `meerschaum.connectors.get_connector()`.
For ease of use, you can also import from the root `meerschaum` module:
```
>>> from meerschaum import get_connector
>>> conn = get_connector()
```
"""

from __future__ import annotations

import meerschaum as mrsm
from meerschaum.utils.typing import Any, Union, List, Dict
from meerschaum.utils.threading import RLock
from meerschaum.utils.warnings import warn

from meerschaum.connectors._Connector import Connector, InvalidAttributesError
from meerschaum.connectors.sql._SQLConnector import SQLConnector
from meerschaum.connectors.api._APIConnector import APIConnector
from meerschaum.connectors.sql._create_engine import flavor_configs as sql_flavor_configs

__all__ = (
    "make_connector",
    "Connector",
    "SQLConnector",
    "APIConnector",
    "get_connector",
    "is_connected",
    "poll",
    "api",
    "sql",
    "valkey",
)

### store connectors partitioned by
### type, label for reuse
connectors: Dict[str, Dict[str, Connector]] = {
    'api'   : {},
    'sql'   : {},
    'plugin': {},
    'valkey': {},
}
instance_types: List[str] = ['sql', 'api']
_locks: Dict[str, RLock] = {
    'connectors'               : RLock(),
    'types'                    : RLock(),
    'custom_types'             : RLock(),
    '_loaded_plugin_connectors': RLock(),
    'instance_types'           : RLock(),
}
attributes: Dict[str, Dict[str, Any]] = {
    'api': {
        'required': [
            'host',
            'username',
            'password',
        ],
        'optional': [
            'port',
        ],
        'default': {
            'protocol': 'http',
        },
    },
    'sql': {
        'flavors': sql_flavor_configs,
    },
}
### Fill this with objects only when connectors are first referenced.
types: Dict[str, Any] = {}
custom_types: set = set()
_loaded_plugin_connectors: bool = False


def get_connector(
    type: str = None,
    label: str = None,
    refresh: bool = False,
    debug: bool = False,
    **kw: Any
) -> Connector:
    """
    Return existing connector or create new connection and store for reuse.

    You can create new connectors if enough parameters are provided for the given type and flavor.


    Parameters
    ----------
    type: Optional[str], default None
        Connector type (sql, api, etc.).
        Defaults to the type of the configured `instance_connector`.

    label: Optional[str], default None
        Connector label (e.g. main). Defaults to `'main'`.

    refresh: bool, default False
        Refresh the Connector instance / construct new object. Defaults to `False`.

    kw: Any
        Other arguments to pass to the Connector constructor.
        If the Connector has already been constructed and new arguments are provided,
        `refresh` is set to `True` and the old Connector is replaced.

    Returns
    -------
    A new Meerschaum connector (e.g. `meerschaum.connectors.api.APIConnector`,
    `meerschaum.connectors.sql.SQLConnector`).

    Examples
    --------
    The following parameters would create a new
    `meerschaum.connectors.sql.SQLConnector` that isn't in the configuration file.

    ```
    >>> conn = get_connector(
    ...     type = 'sql',
    ...     label = 'newlabel',
    ...     flavor = 'sqlite',
    ...     database = '/file/path/to/database.db'
    ... )
    >>>
    ```

    """
    from meerschaum.connectors.parse import parse_instance_keys
    from meerschaum.config import get_config
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    global _loaded_plugin_connectors
    if isinstance(type, str) and not label and ':' in type:
        type, label = type.split(':', maxsplit=1)

    with _locks['_loaded_plugin_connectors']:
        if not _loaded_plugin_connectors:
            load_plugin_connectors()
            _load_builtin_custom_connectors()
            _loaded_plugin_connectors = True

    if type is None and label is None:
        default_instance_keys = get_config('meerschaum', 'instance', patch=True)
        ### recursive call to get_connector
        return parse_instance_keys(default_instance_keys)

    ### NOTE: the default instance connector may not be main.
    ### Only fall back to 'main' if the type is provided but the label is omitted.
    label = label if label is not None else STATIC_CONFIG['connectors']['default_label']

    ### type might actually be a label. Check if so and raise a warning.
    if type not in connectors:
        possibilities, poss_msg = [], ""
        for _type in get_config('meerschaum', 'connectors'):
            if type in get_config('meerschaum', 'connectors', _type):
                possibilities.append(f"{_type}:{type}")
        if len(possibilities) > 0:
            poss_msg = " Did you mean"
            for poss in possibilities[:-1]:
                poss_msg += f" '{poss}',"
            if poss_msg.endswith(','):
                poss_msg = poss_msg[:-1]
            if len(possibilities) > 1:
                poss_msg += " or"
            poss_msg += f" '{possibilities[-1]}'?"

        warn(f"Cannot create Connector of type '{type}'." + poss_msg, stack=False)
        return None

    if 'sql' not in types:
        from meerschaum.connectors.plugin import PluginConnector
        from meerschaum.connectors.valkey import ValkeyConnector
        with _locks['types']:
            types.update({
                'api': APIConnector,
                'sql': SQLConnector,
                'plugin': PluginConnector,
                'valkey': ValkeyConnector,
            })

    ### determine if we need to call the constructor
    if not refresh:
        ### see if any user-supplied arguments differ from the existing instance
        if label in connectors[type]:
            warning_message = None
            for attribute, value in kw.items():
                if attribute not in connectors[type][label].meta:
                    import inspect
                    cls = connectors[type][label].__class__
                    cls_init_signature = inspect.signature(cls)
                    cls_init_params = cls_init_signature.parameters
                    if attribute not in cls_init_params:
                        warning_message = (
                            f"Received new attribute '{attribute}' not present in connector " +
                            f"{connectors[type][label]}.\n"
                        )
                elif connectors[type][label].__dict__[attribute] != value:
                    warning_message = (
                        f"Mismatched values for attribute '{attribute}' in connector "
                        + f"'{connectors[type][label]}'.\n" +
                        f"  - Keyword value: '{value}'\n" +
                        f"  - Existing value: '{connectors[type][label].__dict__[attribute]}'\n"
                    )
            if warning_message is not None:
                warning_message += (
                    "\nSetting `refresh` to True and recreating connector with type:"
                    + f" '{type}' and label '{label}'."
                )
                refresh = True
                warn(warning_message)
        else: ### connector doesn't yet exist
            refresh = True

    ### only create an object if refresh is True
    ### (can be manually specified, otherwise determined above)
    if refresh:
        with _locks['connectors']:
            try:
                ### will raise an error if configuration is incorrect / missing
                conn = types[type](label=label, **kw)
                connectors[type][label] = conn
            except InvalidAttributesError as ie:
                warn(
                    f"Incorrect attributes for connector '{type}:{label}'.\n"
                    + str(ie),
                    stack = False,
                )
                conn = None
            except Exception as e:
                from meerschaum.utils.formatting import get_console
                console = get_console()
                if console:
                    console.print_exception()
                warn(
                    f"Exception when creating connector '{type}:{label}'.\n" + str(e),
                    stack = False,
                )
                conn = None
        if conn is None:
            return None

    return connectors[type][label]


def is_connected(keys: str, **kw) -> bool:
    """
    Check if the connector keys correspond to an active connection.
    If the connector has not been created, it will immediately return `False`.
    If the connector exists but cannot communicate with the source, return `False`.

    **NOTE:** Only works with instance connectors (`SQLConnectors` and `APIConnectors`).
    Keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Parameters
    ----------
    keys:
        The keys to the connector (e.g. `'sql:main'`).

    Returns
    -------
    A `bool` corresponding to whether a successful connection may be made.

    """
    import warnings
    if ':' not in keys:
        warn(f"Invalid connector keys '{keys}'")

    try:
        typ, label = keys.split(':')
    except Exception:
        return False
    if typ not in instance_types:
        return False
    if label not in connectors.get(typ, {}):
        return False

    from meerschaum.connectors.parse import parse_instance_keys
    conn = parse_instance_keys(keys)
    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            return conn.test_connection(**kw)
    except Exception:
        return False


def make_connector(cls, _is_executor: bool = False):
    """
    Register a class as a `Connector`.
    The `type` will be the lower case of the class name, without the suffix `connector`.

    Parameters
    ----------
    instance: bool, default False
        If `True`, make this connector type an instance connector.
        This requires implementing the various pipes functions and lots of testing.

    Examples
    --------
    >>> import meerschaum as mrsm
    >>> from meerschaum.connectors import make_connector, Connector
    >>>
    >>> @make_connector
    >>> class FooConnector(Connector):
    ...     REQUIRED_ATTRIBUTES: list[str] = ['username', 'password']
    ...
    >>> conn = mrsm.get_connector('foo:bar', username='dog', password='cat')
    >>> print(conn.username, conn.password)
    dog cat
    >>>
    """
    import re
    suffix_regex = (
        r'connector$'
        if not _is_executor
        else r'executor$'
    )
    typ = re.sub(suffix_regex, '', cls.__name__.lower())
    with _locks['types']:
        types[typ] = cls
    with _locks['custom_types']:
        custom_types.add(typ)
    with _locks['connectors']:
        if typ not in connectors:
            connectors[typ] = {}
    if getattr(cls, 'IS_INSTANCE', False):
        with _locks['instance_types']:
            if typ not in instance_types:
                instance_types.append(typ)

    return cls


def load_plugin_connectors():
    """
    If a plugin makes use of the `make_connector` decorator,
    load its module.
    """
    from meerschaum.plugins import get_plugins, import_plugins
    to_import = []
    for plugin in get_plugins():
        if plugin is None:
            continue
        with open(plugin.__file__, encoding='utf-8') as f:
            text = f.read()
        if 'make_connector' in text or 'Connector' in text:
            to_import.append(plugin.name)
    if not to_import:
        return
    import_plugins(*to_import)


def get_connector_plugin(
    connector: Connector,
) -> Union[str, None, mrsm.Plugin]:
    """
    Determine the plugin for a connector.
    This is useful for handling virtual environments for custom instance connectors.

    Parameters
    ----------
    connector: Connector
        The connector which may require a virtual environment.

    Returns
    -------
    A Plugin, 'mrsm', or None.
    """
    if not hasattr(connector, 'type'):
        return None
    plugin_name = (
        connector.__module__.replace('plugins.', '').split('.')[0]
        if connector.type in custom_types else (
            connector.label
            if connector.type == 'plugin'
            else 'mrsm'
        )
    )
    plugin = mrsm.Plugin(plugin_name)
    return plugin if plugin.is_installed() else None


def _load_builtin_custom_connectors():
    """
    Import custom connectors decorated with `@make_connector` or `@make_executor`.
    """
    import meerschaum.jobs.systemd
    import meerschaum.connectors.valkey
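Since `is_connected()` only applies to instance connectors and returns `False` for connectors that have not yet been constructed, a brief sketch (assuming a reachable `sql:main`):

>>> from meerschaum.connectors import get_connector, is_connected
>>> conn = get_connector('sql:main')  # construct the connector first
>>> is_connected('sql:main')          # delegates to conn.test_connection()
True
>>> is_connected('sql:never_built')   # not yet constructed, so immediately False
False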
def make_connector(cls, _is_executor: bool = False)
Register a class as a `Connector`.
The `type` will be the lower case of the class name, without the suffix `connector`.

Parameters
- instance (bool, default False):
  If `True`, make this connector type an instance connector.
  This requires implementing the various pipes functions and lots of testing.

Examples
>>> import meerschaum as mrsm
>>> from meerschaum.connectors import make_connector, Connector
>>>
>>> @make_connector
>>> class FooConnector(Connector):
... REQUIRED_ATTRIBUTES: list[str] = ['username', 'password']
...
>>> conn = mrsm.get_connector('foo:bar', username='dog', password='cat')
>>> print(conn.username, conn.password)
dog cat
>>>
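Because `make_connector` records classes whose `IS_INSTANCE` attribute is `True` as instance connectors, a custom connector may opt in at registration time. A minimal sketch (the class and its attributes are hypothetical, and a real instance connector must implement the various pipes methods):

>>> from meerschaum.connectors import make_connector, Connector
>>>
>>> @make_connector
... class ExampleConnector(Connector):
...     REQUIRED_ATTRIBUTES = ['username', 'password']
...     IS_INSTANCE = True   # appended to instance_types during registration
...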
class Connector(metaclass=abc.ABCMeta):
    """
    The base connector class to hold connection attributes.
    """
    def __init__(
        self,
        type: Optional[str] = None,
        label: Optional[str] = None,
        **kw: Any
    ):
        """
        Set the given keyword arguments as attributes.

        Parameters
        ----------
        type: str
            The `type` of the connector (e.g. `sql`, `api`, `plugin`).

        label: str
            The `label` for the connector.


        Examples
        --------
        Run `mrsm edit config` to edit connectors in the YAML file:

        ```yaml
        meerschaum:
            connectors:
                {type}:
                    {label}:
                        ### attributes go here
        ```

        """
        self._original_dict = copy.deepcopy(self.__dict__)
        self._set_attributes(type=type, label=label, **kw)

        ### NOTE: Override `REQUIRED_ATTRIBUTES` if `uri` is set.
        self.verify_attributes(
            ['uri']
            if 'uri' in self.__dict__
            else getattr(self, 'REQUIRED_ATTRIBUTES', None)
        )

    def _reset_attributes(self):
        self.__dict__ = self._original_dict

    def _set_attributes(
        self,
        *args,
        inherit_default: bool = True,
        **kw: Any
    ):
        from meerschaum.config.static import STATIC_CONFIG
        from meerschaum.utils.warnings import error

        self._attributes = {}

        default_label = STATIC_CONFIG['connectors']['default_label']

        ### NOTE: Support the legacy method of explicitly passing the type.
        label = kw.get('label', None)
        if label is None:
            if len(args) == 2:
                label = args[1]
            elif len(args) == 0:
                label = None
            else:
                label = args[0]

        if label == 'default':
            error(
                f"Label cannot be 'default'. Did you mean '{default_label}'?",
                InvalidAttributesError,
            )
        self.__dict__['label'] = label

        from meerschaum.config import get_config
        conn_configs = copy.deepcopy(get_config('meerschaum', 'connectors'))
        connector_config = copy.deepcopy(get_config('system', 'connectors'))

        ### inherit attributes from 'default' if exists
        if inherit_default:
            inherit_from = 'default'
            if self.type in conn_configs and inherit_from in conn_configs[self.type]:
                _inherit_dict = copy.deepcopy(conn_configs[self.type][inherit_from])
                self._attributes.update(_inherit_dict)

        ### load user config into self._attributes
        if self.type in conn_configs and self.label in conn_configs[self.type]:
            self._attributes.update(conn_configs[self.type][self.label] or {})

        ### load system config into self._sys_config
        ### (deep copy so future Connectors don't inherit changes)
        if self.type in connector_config:
            self._sys_config = copy.deepcopy(connector_config[self.type])

        ### add additional arguments or override configuration
        self._attributes.update(kw)

        ### finally, update __dict__ with _attributes.
        self.__dict__.update(self._attributes)

    def verify_attributes(
        self,
        required_attributes: Optional[List[str]] = None,
        debug: bool = False,
    ) -> None:
        """
        Ensure that the required attributes have been met.

        The Connector base class checks the minimum requirements.
        Child classes may enforce additional requirements.

        Parameters
        ----------
        required_attributes: Optional[List[str]], default None
            Attributes to be verified. If `None`, default to `['label']`.

        debug: bool, default False
            Verbosity toggle.

        Returns
        -------
        Don't return anything.

        Raises
        ------
        An error if any of the required attributes are missing.
        """
        from meerschaum.utils.warnings import error, warn
        from meerschaum.utils.debug import dprint
        from meerschaum.utils.misc import items_str
        if required_attributes is None:
            required_attributes = ['label']

        missing_attributes = set()
        for a in required_attributes:
            if a not in self.__dict__:
                missing_attributes.add(a)
        if len(missing_attributes) > 0:
            error(
                (
                    f"Missing {items_str(list(missing_attributes))} "
                    + f"for connector '{self.type}:{self.label}'."
                ),
                InvalidAttributesError,
                silent=True,
                stack=False
            )


    def __str__(self):
        """
        When cast to a string, return type:label.
        """
        return f"{self.type}:{self.label}"

    def __repr__(self):
        """
        Represent the connector as type:label.
        """
        return str(self)

    @property
    def meta(self) -> Dict[str, Any]:
        """
        Return the keys needed to reconstruct this Connector.
        """
        _meta = {
            key: value
            for key, value in self.__dict__.items()
            if not str(key).startswith('_')
        }
        _meta.update({
            'type': self.type,
            'label': self.label,
        })
        return _meta


    @property
    def type(self) -> str:
        """
        Return the type for this connector.
        """
        _type = self.__dict__.get('type', None)
        if _type is None:
            import re
            is_executor = self.__class__.__name__.lower().endswith('executor')
            suffix_regex = (
                r'connector$'
                if not is_executor
                else r'executor$'
            )
            _type = re.sub(suffix_regex, '', self.__class__.__name__.lower())
            self.__dict__['type'] = _type
        return _type


    @property
    def label(self) -> str:
        """
        Return the label for this connector.
        """
        _label = self.__dict__.get('label', None)
        if _label is None:
            from meerschaum.config.static import STATIC_CONFIG
            _label = STATIC_CONFIG['connectors']['default_label']
            self.__dict__['label'] = _label
        return _label
The base connector class to hold connection attributes.
def __init__(self, type: Optional[str] = None, label: Optional[str] = None, **kw: Any)
def verify_attributes(self, required_attributes: Optional[List[str]] = None, debug: bool = False) -> None
Ensure that the required attributes have been met.
The Connector base class checks the minimum requirements. Child classes may enforce additional requirements.
Parameters
- required_attributes (Optional[List[str]], default None):
  Attributes to be verified. If `None`, default to `['label']`.
- debug (bool, default False):
  Verbosity toggle.
Returns
- Don't return anything.
Raises
- An error if any of the required attributes are missing.
@property
def meta(self) -> Dict[str, Any]
Return the keys needed to reconstruct this Connector.
@property
def type(self) -> str
Return the type for this connector.
@property
def label(self) -> str
Return the label for this connector.
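A small sketch of the base-class behavior described above (assuming a configured `sql:main`):

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql', 'main')
>>> str(conn)                              # __str__ renders as 'type:label'
'sql:main'
>>> conn.meta['type'], conn.meta['label']  # meta always includes type and label
('sql', 'main')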
class SQLConnector(Connector):
    """
    Connect to SQL databases via `sqlalchemy`.

    SQLConnectors may be used as Meerschaum instance connectors.
    Read more about connectors and instances at
    https://meerschaum.io/reference/connectors/

    """

    IS_INSTANCE: bool = True

    from ._create_engine import flavor_configs, create_engine
    from ._sql import (
        read,
        value,
        exec,
        execute,
        to_sql,
        exec_queries,
        get_connection,
        _cleanup_connections,
    )
    from meerschaum.utils.sql import test_connection
    from ._fetch import fetch, get_pipe_metadef
    from ._cli import cli, _cli_exit
    from ._pipes import (
        fetch_pipes_keys,
        create_indices,
        drop_indices,
        get_create_index_queries,
        get_drop_index_queries,
        get_add_columns_queries,
        get_alter_columns_queries,
        delete_pipe,
        get_pipe_data,
        get_pipe_data_query,
        register_pipe,
        edit_pipe,
        get_pipe_id,
        get_pipe_attributes,
        sync_pipe,
        sync_pipe_inplace,
        get_sync_time,
        pipe_exists,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        deduplicate_pipe,
        get_pipe_table,
        get_pipe_columns_types,
        get_to_sql_dtype,
        get_pipe_schema,
        create_pipe_table_from_df,
        get_pipe_columns_indices,
        get_temporary_target,
        create_pipe_indices,
        drop_pipe_indices,
        get_pipe_index_names,
    )
    from ._plugins import (
        register_plugin,
        delete_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugins,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
    )
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri, parse_uri
    from ._instance import (
        _log_temporary_tables_creation,
        _drop_temporary_table,
        _drop_temporary_tables,
        _drop_old_temporary_tables,
    )

    def __init__(
        self,
        label: Optional[str] = None,
        flavor: Optional[str] = None,
        wait: bool = False,
        connect: bool = False,
        debug: bool = False,
        **kw: Any
    ):
        """
        Parameters
        ----------
        label: str, default 'main'
            The identifying label for the connector.
            E.g. for `sql:main`, 'main' is the label.
            Defaults to 'main'.

        flavor: Optional[str], default None
            The database flavor, e.g.
            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
            To see supported flavors, run the `bootstrap connectors` command.

        wait: bool, default False
            If `True`, block until a database connection has been made.
            Defaults to `False`.

        connect: bool, default False
            If `True`, immediately attempt to connect the database and raise
            a warning if the connection fails.
            Defaults to `False`.

        debug: bool, default False
            Verbosity toggle.
            Defaults to `False`.

        kw: Any
            All other arguments will be passed to the connector's attributes.
            Therefore, a connector may be made without being registered,
            as long as enough parameters are supplied to the constructor.
        """
        if 'uri' in kw:
            uri = kw['uri']
            if uri.startswith('postgres') and not uri.startswith('postgresql'):
                uri = uri.replace('postgres', 'postgresql', 1)
            if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
                uri = uri.replace('postgresql://', 'postgresql+psycopg://', 1)
            if uri.startswith('timescaledb://'):
                uri = uri.replace('timescaledb://', 'postgresql+psycopg://', 1)
                flavor = 'timescaledb'
            kw['uri'] = uri
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            _ = from_uri_params.pop('label', None)

            ### Sometimes the flavor may be provided with a URI.
            kw.update(from_uri_params)
            if flavor:
                kw['flavor'] = flavor

        ### set __dict__ in base class
        super().__init__(
            'sql',
            label = label or self.__dict__.get('label', None),
            **kw
        )

        if self.__dict__.get('flavor', None) == 'sqlite':
            self._reset_attributes()
            self._set_attributes(
                'sql',
                label = label,
                inherit_default = False,
                **kw
            )
            ### For backwards compatibility reasons, set the path for sql:local if it's missing.
            if self.label == 'local' and not self.__dict__.get('database', None):
                from meerschaum.config._paths import SQLITE_DB_PATH
                self.database = str(SQLITE_DB_PATH)

        ### ensure flavor and label are set accordingly
        if 'flavor' not in self.__dict__:
            if flavor is None and 'uri' not in self.__dict__:
                raise Exception(
                    f" Missing flavor. Provide flavor as a key for '{self}'."
                )
            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

        if self.flavor == 'postgres':
            self.flavor = 'postgresql'

        self._debug = debug
        ### Store the PID and thread at initialization
        ### so we can dispose of the Pool in child processes or threads.
        import os
        import threading
        self._pid = os.getpid()
        self._thread_ident = threading.current_thread().ident
        self._sessions = {}
        self._locks = {'_sessions': threading.RLock(), }

        ### verify the flavor's requirements are met
        if self.flavor not in self.flavor_configs:
            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
        if not self.__dict__.get('uri'):
            self.verify_attributes(
                self.flavor_configs[self.flavor].get('requirements', set()),
                debug=debug,
            )

        if wait:
            from meerschaum.connectors.poll import retry_connect
            retry_connect(connector=self, debug=debug)

        if connect:
            if not self.test_connection(debug=debug):
                warn(f"Failed to connect with connector '{self}'!", stack=False)

    @property
    def Session(self):
        if '_Session' not in self.__dict__:
            if self.engine is None:
                return None

            from meerschaum.utils.packages import attempt_import
            sqlalchemy_orm = attempt_import('sqlalchemy.orm', lazy=False)
            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
            self._Session = sqlalchemy_orm.scoped_session(session_factory)

        return self._Session

    @property
    def engine(self):
        """
        Return the SQLAlchemy engine connected to the configured database.
        """
        import os
        import threading
        if '_engine' not in self.__dict__:
            self._engine, self._engine_str = self.create_engine(include_uri=True)

        same_process = os.getpid() == self._pid
        same_thread = threading.current_thread().ident == self._thread_ident

        ### handle child processes
        if not same_process:
            self._pid = os.getpid()
            self._thread = threading.current_thread()
            warn("Different PID detected. Disposing of connections...")
            self._engine.dispose()

        ### handle different threads
        if not same_thread:
            if self.flavor == 'duckdb':
                warn("Different thread detected.")
                self._engine.dispose()

        return self._engine

    @property
    def DATABASE_URL(self) -> str:
        """
        Return the URI connection string (alias for `SQLConnector.URI`).
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def URI(self) -> str:
        """
        Return the URI connection string.
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def IS_THREAD_SAFE(self) -> bool:
        """
        Return whether this connector may be multithreaded.
        """
        if self.flavor in ('duckdb', 'oracle'):
            return False
        if self.flavor == 'sqlite':
            return ':memory:' not in self.URI
        return True

    @property
    def metadata(self):
        """
        Return the metadata bound to this configured schema.
        """
        from meerschaum.utils.packages import attempt_import
        sqlalchemy = attempt_import('sqlalchemy', lazy=False)
        if '_metadata' not in self.__dict__:
            self._metadata = sqlalchemy.MetaData(schema=self.schema)
        return self._metadata

    @property
    def instance_schema(self):
        """
        Return the schema name for Meerschaum tables.
        """
        return self.schema

    @property
    def internal_schema(self):
        """
        Return the schema name for internal tables.
        """
        from meerschaum.config.static import STATIC_CONFIG
        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
        schema_name = self.__dict__.get('internal_schema', None) or (
            STATIC_CONFIG['sql']['internal_schema']
            if self.flavor not in NO_SCHEMA_FLAVORS
            else self.schema
        )

        if '_internal_schema' not in self.__dict__:
            self._internal_schema = schema_name
        return self._internal_schema

    @property
    def db(self) -> Optional[databases.Database]:
        from meerschaum.utils.packages import attempt_import
        databases = attempt_import('databases', lazy=False, install=True)
        url = self.DATABASE_URL
        if 'mysql' in url:
            url = url.replace('+pymysql', '')
        if '_db' not in self.__dict__:
            try:
                self._db = databases.Database(url)
            except KeyError:
                ### Likely encountered an unsupported flavor.
                from meerschaum.utils.warnings import warn
                self._db = None
        return self._db

    @property
    def db_version(self) -> Union[str, None]:
        """
        Return the database version.
        """
        _db_version = self.__dict__.get('_db_version', None)
        if _db_version is not None:
            return _db_version

        from meerschaum.utils.sql import get_db_version
        self._db_version = get_db_version(self)
        return self._db_version

    @property
    def schema(self) -> Union[str, None]:
        """
        Return the default schema to use.
        A value of `None` will not prepend a schema.
        """
        if 'schema' in self.__dict__:
            return self.__dict__['schema']

        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
        if self.flavor in NO_SCHEMA_FLAVORS:
            self.__dict__['schema'] = None
            return None

        sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
        _schema = sqlalchemy.inspect(self.engine).default_schema_name
        self.__dict__['schema'] = _schema
        return _schema

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __call__(self):
        return self
Connect to SQL databases via `sqlalchemy`.
SQLConnectors may be used as Meerschaum instance connectors.
Read more about connectors and instances at https://meerschaum.io/reference/connectors/.
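For example, a pipe may target a specific SQLConnector as its instance (a sketch; `mrsm.Pipe`'s `instance` keyword accepts connector keys, and `sql:main` is assumed to be configured):

>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('demo', 'weather', instance='sql:main')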
def __init__(
    self,
    label: Optional[str] = None,
    flavor: Optional[str] = None,
    wait: bool = False,
    connect: bool = False,
    debug: bool = False,
    **kw: Any
)
Parameters
- label (str, default 'main'):
  The identifying label for the connector.
  E.g. for `sql:main`, 'main' is the label.
- flavor (Optional[str], default None):
  The database flavor, e.g. `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
  To see supported flavors, run the `bootstrap connectors` command.
- wait (bool, default False):
  If `True`, block until a database connection has been made.
- connect (bool, default False):
  If `True`, immediately attempt to connect the database and raise a warning if the connection fails.
- debug (bool, default False):
  Verbosity toggle.
- kw (Any):
  All other arguments will be passed to the connector's attributes.
  Therefore, a connector may be made without being registered,
  as long as enough parameters are supplied to the constructor.
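Because the constructor normalizes URIs (`postgres://` becomes `postgresql+psycopg://`, and `timescaledb://` implies `flavor='timescaledb'`), a connector may be built ad hoc from a URI alone. A sketch with hypothetical credentials:

>>> from meerschaum.connectors import SQLConnector
>>> conn = SQLConnector(label='temp', uri='postgres://user:pass@localhost:5432/db')
>>> conn.flavor   # parsed from the normalized URI
'postgresql'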
@property
def Session(self)
@property
def engine(self)
Return the SQLAlchemy engine connected to the configured database.
@property
def DATABASE_URL(self) -> str
Return the URI connection string (alias for `SQLConnector.URI`).
@property
def URI(self) -> str
Return the URI connection string.
@property
def IS_THREAD_SAFE(self) -> bool
Return whether this connector may be multithreaded.
@property
def metadata(self)
Return the metadata bound to this configured schema.
@property
def instance_schema(self)
Return the schema name for Meerschaum tables.
@property
def internal_schema(self)
Return the schema name for internal tables.
@property
def db(self) -> Optional[databases.Database]
@property
def db_version(self) -> Union[str, None]
Return the database version.
@property
def schema(self) -> Union[str, None]
Return the default schema to use.
A value of `None` will not prepend a schema.
def create_engine(
    self,
    include_uri: bool = False,
    debug: bool = False,
    **kw
) -> 'sqlalchemy.engine.Engine':
    """Create a sqlalchemy engine by building the engine string."""
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import error, warn
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    import urllib
    import copy
    ### Install and patch required drivers.
    if self.flavor in install_flavor_drivers:
        _ = attempt_import(
            *install_flavor_drivers[self.flavor],
            debug=debug,
            lazy=False,
            warn=False,
        )
        if self.flavor == 'mssql':
            pyodbc = attempt_import('pyodbc', debug=debug, lazy=False, warn=False)
            pyodbc.pooling = False
    if self.flavor in require_patching_flavors:
        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
        import pathlib
        for install_name, import_name in require_patching_flavors[self.flavor]:
            pkg = attempt_import(
                import_name,
                debug=debug,
                lazy=False,
                warn=False
            )
            _monkey_patch_get_distribution(
                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
            )

    ### supplement missing values with defaults (e.g. port number)
    for a, value in flavor_configs[self.flavor]['defaults'].items():
        if a not in self.__dict__:
            self.__dict__[a] = value

    ### Verify that everything is in order.
    if self.flavor not in flavor_configs:
        error(f"Cannot create a connector with the flavor '{self.flavor}'.")

    _engine = flavor_configs[self.flavor].get('engine', None)
    _username = self.__dict__.get('username', None)
    _password = self.__dict__.get('password', None)
    _host = self.__dict__.get('host', None)
    _port = self.__dict__.get('port', None)
    _database = self.__dict__.get('database', None)
    _options = self.__dict__.get('options', {})
    if isinstance(_options, str):
        _options = dict(urllib.parse.parse_qsl(_options))
    _uri = self.__dict__.get('uri', None)

    ### Handle registering specific dialects (due to installing in virtual environments).
    if self.flavor in flavor_dialects:
        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])

    ### self._sys_config was deepcopied and can be updated safely
    if self.flavor in ("sqlite", "duckdb"):
        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
        if 'create_engine' not in self._sys_config:
            self._sys_config['create_engine'] = {}
        if 'connect_args' not in self._sys_config['create_engine']:
            self._sys_config['create_engine']['connect_args'] = {}
        self._sys_config['create_engine']['connect_args'].update({"check_same_thread": False})
    else:
        engine_str = (
            _engine + "://" + (_username if _username is not None else '') +
            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
            (("/" + _database) if _database is not None else '')
            + (("?" + urllib.parse.urlencode(_options)) if _options else '')
        ) if not _uri else _uri

    ### Sometimes the timescaledb:// flavor can slip in.
    if _uri and self.flavor in ('timescaledb',) and self.flavor in _uri:
        engine_str = engine_str.replace(f'{self.flavor}', 'postgresql', 1)

    if debug:
        dprint(
            (
                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
                if _password is not None else engine_str
            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
        )

    _kw_copy = copy.deepcopy(kw)

    ### NOTE: Order of inheritance:
    ###   1. Defaults
    ###   2. System configuration
    ###   3. Connector configuration
    ###   4. Keyword arguments
    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
    def _apply_create_engine_args(update):
        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
            _create_engine_args.update(
                { k: v for k, v in update.items()
                    if 'omit_create_engine' not in flavor_configs[self.flavor]
                    or k not in flavor_configs[self.flavor].get('omit_create_engine')
                }
            )
    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
    _apply_create_engine_args(_kw_copy)

    try:
        engine = sqlalchemy.create_engine(
            engine_str,
            ### I know this looks confusing, and maybe it's bad code,
            ### but it's simple. It dynamically parses the config string
            ### and splits it to separate the class name (QueuePool)
            ### from the module name (sqlalchemy.pool).
            poolclass = getattr(
                attempt_import(
                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
                ),
                self._sys_config['poolclass'].split('.')[-1]
            ),
            echo = debug,
            **_create_engine_args
        )
    except Exception as e:
        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
        engine = None

    if include_uri:
        return engine, engine_str
    return engine
Create a sqlalchemy engine by building the engine string.
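As an illustration of the string the builder assembles on the attribute-based path (no `uri` given), assuming the postgresql flavor's `engine` prefix is `postgresql+psycopg`:

>>> from urllib.parse import quote_plus
>>> user, pwd, host, port, db = 'mrsm', 'p@ss', 'localhost', 5432, 'meerschaum'
>>> 'postgresql+psycopg://' + user + ':' + quote_plus(pwd) + '@' + host + ':' + str(port) + '/' + db
'postgresql+psycopg://mrsm:p%40ss@localhost:5432/meerschaum'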
def read(
    self,
    query_or_table: Union[str, sqlalchemy.Query],
    params: Union[Dict[str, Any], List[str], None] = None,
    dtype: Optional[Dict[str, Any]] = None,
    coerce_float: bool = True,
    chunksize: Optional[int] = -1,
    workers: Optional[int] = None,
    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
    as_hook_results: bool = False,
    chunks: Optional[int] = None,
    schema: Optional[str] = None,
    as_chunks: bool = False,
    as_iterator: bool = False,
    as_dask: bool = False,
    index_col: Optional[str] = None,
    silent: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[
    pandas.DataFrame,
    dask.DataFrame,
    List[pandas.DataFrame],
    List[Any],
    None,
]:
    """
    Read a SQL query or table into a pandas dataframe.

    Parameters
    ----------
    query_or_table: Union[str, sqlalchemy.Query]
        The SQL query (sqlalchemy Query or string) or name of the table from which to select.

    params: Optional[Dict[str, Any]], default None
        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html

    dtype: Optional[Dict[str, Any]], default None
        A dictionary of data types to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html

    chunksize: Optional[int], default -1
        How many rows to read per chunk. `None` will read everything in one large chunk.
        Defaults to system configuration.

        **NOTE:** DuckDB does not allow for chunking.

    workers: Optional[int], default None
        How many threads to use when consuming the generator.
        Only applies if `chunk_hook` is provided.

    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
        See `--sync-chunks` for an example.
        **NOTE:** `as_iterator` MUST be False (default).

    as_hook_results: bool, default False
        If `True`, return a `List` of the outputs of the hook function.
        Only applicable if `chunk_hook` is not None.

        **NOTE:** `as_iterator` MUST be `False` (default).

    chunks: Optional[int], default None
        Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
        return into a single dataframe.
        For example, to limit the returned dataframe to 100,000 rows,
        you could specify a `chunksize` of `1000` and `chunks` of `100`.

    schema: Optional[str], default None
        If just a table name is provided, optionally specify the table schema.
        Defaults to `SQLConnector.schema`.

    as_chunks: bool, default False
        If `True`, return a list of DataFrames.
        Otherwise return a single DataFrame.

    as_iterator: bool, default False
        If `True`, return the pandas DataFrame iterator.
        `chunksize` must not be `None` (falls back to 1000 if so),
        and hooks are not called in this case.

    index_col: Optional[str], default None
        If using Dask, use this column as the index column.
        If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.

    silent: bool, default False
        If `True`, don't raise warnings in case of errors.
        Defaults to `False`.

    Returns
    -------
    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
    or `None` if something breaks.

    """
    if chunks is not None and chunks <= 0:
        return []
    from meerschaum.utils.sql import sql_item_name, truncate_item_name
    from meerschaum.utils.dtypes import are_dtypes_equal, coerce_timezone
    from meerschaum.utils.dtypes.sql import TIMEZONE_NAIVE_FLAVORS
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.pool import get_pool
    from meerschaum.utils.dataframe import chunksize_to_npartitions, get_numeric_cols
    import warnings
    import traceback
    from decimal import Decimal
    pd = import_pandas()
    dd = None
    is_dask = 'dask' in pd.__name__
    pandas = attempt_import('pandas')
    is_dask = dd is not None
    npartitions = chunksize_to_npartitions(chunksize)
    if is_dask:
        chunksize = None
    schema = schema or self.schema
    utc_dt_cols = [
        col
        for col, typ in dtype.items()
        if are_dtypes_equal(typ, 'datetime') and 'utc' in typ.lower()
    ] if dtype else []

    if dtype and utc_dt_cols and self.flavor in TIMEZONE_NAIVE_FLAVORS:
        dtype = dtype.copy()
        for col in utc_dt_cols:
            dtype[col] = 'datetime64[ns]'

    pool = get_pool(workers=workers)
    sqlalchemy = attempt_import("sqlalchemy", lazy=False)
    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is None and as_iterator:
        if not silent and self.flavor not in _disallow_chunks_flavors:
            warn(
                "An iterator may only be generated if chunksize is not None.\n"
                + "Falling back to a chunksize of 1000.", stacklevel=3,
            )
        chunksize = 1000
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel=3,
                )
            chunksize = _max_chunks_flavors[self.flavor]

    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
        chunksize = None

    if debug:
        import time
        start = time.perf_counter()
        dprint(f"[{self}]\n{query_or_table}")
        dprint(f"[{self}] Fetching with chunksize: {chunksize}")

    ### This might be sqlalchemy object or the string of a table name.
    ### We check for spaces and quotes to see if it might be a weird table.
    if (
        ' ' not in str(query_or_table)
        or (
            ' ' in str(query_or_table)
            and str(query_or_table).startswith('"')
            and str(query_or_table).endswith('"')
        )
    ):
        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
        if truncated_table_name != str(query_or_table) and not silent:
            warn(
                f"Table '{query_or_table}' is too long for '{self.flavor}',"
                + f" will instead read the table '{truncated_table_name}'."
            )

        query_or_table = sql_item_name(str(query_or_table), self.flavor, schema)
        if debug:
            dprint(f"[{self}] Reading from table {query_or_table}")
        formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table))
        str_query = f"SELECT * FROM {query_or_table}"
    else:
        str_query = query_or_table

    formatted_query = (
        sqlalchemy.text(str_query)
        if not is_dask and isinstance(str_query, str)
        else format_sql_query_for_dask(str_query)
    )

    chunk_list = []
    chunk_hook_results = []
    def _process_chunk(_chunk, _retry_on_failure: bool = True):
        if self.flavor in TIMEZONE_NAIVE_FLAVORS:
            for col in utc_dt_cols:
                _chunk[col] = coerce_timezone(_chunk[col], strip_timezone=False)
        if not as_hook_results:
            chunk_list.append(_chunk)
        if chunk_hook is None:
            return None

        result = None
        try:
            result = chunk_hook(
                _chunk,
                workers=workers,
                chunksize=chunksize,
                debug=debug,
                **kw
            )
        except Exception:
            result = False, traceback.format_exc()
            from meerschaum.utils.formatting import get_console
            if not silent:
                get_console().print_exception()

        ### If the chunk fails to process, try it again one more time.
        if isinstance(result, tuple) and result[0] is False:
            if _retry_on_failure:
                return _process_chunk(_chunk, _retry_on_failure=False)

        return result

    try:
        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')

            read_sql_query_kwargs = {
                'params': params,
                'dtype': dtype,
                'coerce_float': coerce_float,
                'index_col': index_col,
            }
            if is_dask:
                if index_col is None:
                    dd = None
                    pd = attempt_import('pandas')
                    read_sql_query_kwargs.update({
                        'chunksize': chunksize,
                    })
            else:
                read_sql_query_kwargs.update({
                    'chunksize': chunksize,
                })

            if is_dask and dd is not None:
                ddf = dd.read_sql_query(
                    formatted_query,
                    self.URI,
                    **read_sql_query_kwargs
                )
            else:

                def get_chunk_generator(connectable):
                    chunk_generator = pd.read_sql_query(
                        formatted_query,
                        self.engine,
                        **read_sql_query_kwargs
                    )
                    to_return = (
                        chunk_generator
                        if as_iterator or chunksize is None
                        else (
                            list(pool.imap(_process_chunk, chunk_generator))
                            if as_hook_results
                            else None
                        )
                    )
                    return chunk_generator, to_return

                if self.flavor in SKIP_READ_TRANSACTION_FLAVORS:
                    chunk_generator, to_return = get_chunk_generator(self.engine)
                else:
                    with self.engine.begin() as transaction:
                        with transaction.execution_options(stream_results=stream_results) as connection:
                            chunk_generator, to_return = get_chunk_generator(connection)

                if to_return is not None:
                    return to_return

    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
        if not silent:
            warn(str(e), stacklevel=3)
        from meerschaum.utils.formatting import get_console
        if not silent:
            get_console().print_exception()

        return None

    if is_dask and dd is not None:
        ddf = ddf.reset_index()
        return ddf

    chunk_list = []
    read_chunks = 0
    chunk_hook_results = []
    if chunksize is None:
        chunk_list.append(chunk_generator)
    elif as_iterator:
        return chunk_generator
    else:
        try:
            for chunk in chunk_generator:
                if chunk_hook is not None:
                    chunk_hook_results.append(
                        chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                    )
                chunk_list.append(chunk)
                read_chunks += 1
                if chunks is not None and read_chunks >= chunks:
                    break
        except Exception as e:
            warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
            from meerschaum.utils.formatting import get_console
            if not silent:
                get_console().print_exception()

            read_chunks = 0
            try:
                for chunk in chunk_generator:
                    if chunk_hook is not None:
                        chunk_hook_results.append(
                            chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                        )
                    chunk_list.append(chunk)
                    read_chunks += 1
                    if chunks is not None and read_chunks >= chunks:
                        break
            except Exception as e:
                warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
                from meerschaum.utils.formatting import get_console
                if not silent:
                    get_console().print_exception()

                return None

    ### If no chunks returned, read without chunks
    ### to get columns
    if len(chunk_list) == 0:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            _ = read_sql_query_kwargs.pop('chunksize', None)
            with self.engine.begin() as connection:
                chunk_list.append(
                    pd.read_sql_query(
                        formatted_query,
                        connection,
                        **read_sql_query_kwargs
                    )
                )

    ### call the hook on any missed chunks.
    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
        for c in chunk_list[len(chunk_hook_results):]:
            chunk_hook_results.append(
                chunk_hook(c, chunksize=chunksize, debug=debug, **kw)
            )

    ### chunksize is not None so must iterate
    if debug:
        end = time.perf_counter()
        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")

    if as_hook_results:
        return chunk_hook_results

    ### Skip `pd.concat()` if `as_chunks` is specified.
    if as_chunks:
        for c in chunk_list:
            c.reset_index(drop=True, inplace=True)
            for col in get_numeric_cols(c):
                c[col] = c[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
        return chunk_list

    df = pd.concat(chunk_list).reset_index(drop=True)
    ### NOTE: The calls to `canonical()` are to drop leading and trailing zeroes.
    for col in get_numeric_cols(df):
        df[col] = df[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)

    return df
Read a SQL query or table into a pandas dataframe.

Parameters
- query_or_table (Union[str, sqlalchemy.Query]): The SQL query (sqlalchemy Query or string) or name of the table from which to select.
- params (Optional[Dict[str, Any]], default None): List or Dict of parameters to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
- dtype (Optional[Dict[str, Any]], default None): A dictionary of data types to pass to pandas.read_sql(). See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
- chunksize (Optional[int], default -1): How many chunks to read at a time. None will read everything in one large chunk. Defaults to system configuration. NOTE: DuckDB does not allow for chunking.
- workers (Optional[int], default None): How many threads to use when consuming the generator. Only applies if chunk_hook is provided.
- chunk_hook (Optional[Callable[[pandas.DataFrame], Any]], default None): Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. See --sync-chunks for an example. NOTE: as_iterator MUST be False (default).
- as_hook_results (bool, default False): If True, return a List of the outputs of the hook function. Only applicable if chunk_hook is not None. NOTE: as_iterator MUST be False (default).
- chunks (Optional[int], default None): Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and return into a single dataframe. For example, to limit the returned dataframe to 100,000 rows, you could specify a chunksize of 1000 and chunks of 100.
- schema (Optional[str], default None): If just a table name is provided, optionally specify the table schema. Defaults to SQLConnector.schema.
- as_chunks (bool, default False): If True, return a list of DataFrames. Otherwise return a single DataFrame.
- as_iterator (bool, default False): If True, return the pandas DataFrame iterator. chunksize must not be None (falls back to 1000 if so), and hooks are not called in this case.
- index_col (Optional[str], default None): If using Dask, use this column as the index column. If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
- silent (bool, default False): If True, don't raise warnings in case of errors. Defaults to False.

Returns
- A pd.DataFrame (default case), or an iterator, or a list of dataframes / iterators, or None if something breaks.
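For illustration, a minimal sketch of common calls, assuming a configured 'sql:main' connector and a hypothetical table my_table:

```
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> df = conn.read('my_table')                       # read a whole table
>>> df = conn.read(
...     "SELECT * FROM my_table WHERE id > :id",     # bind variables via params
...     params={'id': 100},
... )
>>> def count_rows(chunk, **kw):                     # hooks must accept keyword arguments
...     return len(chunk)
>>> lens = conn.read(
...     'my_table',
...     chunksize=1000,
...     chunk_hook=count_rows,
...     as_hook_results=True,
... )
```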
414def value( 415 self, 416 query: str, 417 *args: Any, 418 use_pandas: bool = False, 419 **kw: Any 420) -> Any: 421 """ 422 Execute the provided query and return the first value. 423 424 Parameters 425 ---------- 426 query: str 427 The SQL query to execute. 428 429 *args: Any 430 The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec` 431 if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`. 432 433 use_pandas: bool, default False 434 If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use 435 `meerschaum.connectors.sql.SQLConnector.exec` (default). 436 **NOTE:** This is always `True` for DuckDB. 437 438 **kw: Any 439 See `args`. 440 441 Returns 442 ------- 443 Any value returned from the query. 444 445 """ 446 from meerschaum.utils.packages import attempt_import 447 if self.flavor == 'duckdb': 448 use_pandas = True 449 if use_pandas: 450 try: 451 return self.read(query, *args, **kw).iloc[0, 0] 452 except Exception: 453 return None 454 455 _close = kw.get('close', True) 456 _commit = kw.get('commit', (self.flavor != 'mssql')) 457 458 try: 459 result, connection = self.exec( 460 query, 461 *args, 462 with_connection=True, 463 close=False, 464 commit=_commit, 465 **kw 466 ) 467 first = result.first() if result is not None else None 468 _val = first[0] if first is not None else None 469 except Exception as e: 470 warn(e, stacklevel=3) 471 return None 472 if _close: 473 try: 474 connection.close() 475 except Exception as e: 476 warn("Failed to close connection with exception:\n" + str(e)) 477 return _val
Execute the provided query and return the first value.

Parameters
- query (str): The SQL query to execute.
- *args (Any): The arguments passed to meerschaum.connectors.sql.SQLConnector.exec if use_pandas is False (default) or to meerschaum.connectors.sql.SQLConnector.read.
- use_pandas (bool, default False): If True, use meerschaum.connectors.SQLConnector.read, otherwise use meerschaum.connectors.sql.SQLConnector.exec (default). NOTE: This is always True for DuckDB.
- **kw (Any): See args.

Returns
- Any value returned from the query.
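A short sketch of fetching scalars, continuing the hypothetical my_table example above:

```
>>> num_rows = conn.value("SELECT COUNT(*) FROM my_table")
>>> latest = conn.value(
...     "SELECT MAX(dt) FROM my_table WHERE id = :id",
...     {'id': 1},                      # positional args are forwarded to exec()
... )
```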
491def exec( 492 self, 493 query: str, 494 *args: Any, 495 silent: bool = False, 496 debug: bool = False, 497 commit: Optional[bool] = None, 498 close: Optional[bool] = None, 499 with_connection: bool = False, 500 _connection=None, 501 _transaction=None, 502 **kw: Any 503) -> Union[ 504 sqlalchemy.engine.result.resultProxy, 505 sqlalchemy.engine.cursor.LegacyCursorResult, 506 Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection], 507 Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection], 508 None 509]: 510 """ 511 Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures. 512 513 If inserting data, please use bind variables to avoid SQL injection! 514 515 Parameters 516 ---------- 517 query: Union[str, List[str], Tuple[str]] 518 The query to execute. 519 If `query` is a list or tuple, call `self.exec_queries()` instead. 520 521 args: Any 522 Arguments passed to `sqlalchemy.engine.execute`. 523 524 silent: bool, default False 525 If `True`, suppress warnings. 526 527 commit: Optional[bool], default None 528 If `True`, commit the changes after execution. 529 Causes issues with flavors like `'mssql'`. 530 This does not apply if `query` is a list of strings. 531 532 close: Optional[bool], default None 533 If `True`, close the connection after execution. 534 Causes issues with flavors like `'mssql'`. 535 This does not apply if `query` is a list of strings. 536 537 with_connection: bool, default False 538 If `True`, return a tuple including the connection object. 539 This does not apply if `query` is a list of strings. 540 541 Returns 542 ------- 543 The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided. 544 545 """ 546 if isinstance(query, (list, tuple)): 547 return self.exec_queries( 548 list(query), 549 *args, 550 silent=silent, 551 debug=debug, 552 **kw 553 ) 554 555 from meerschaum.utils.packages import attempt_import 556 sqlalchemy = attempt_import("sqlalchemy", lazy=False) 557 if debug: 558 dprint(f"[{self}] Executing query:\n{query}") 559 560 _close = close if close is not None else (self.flavor != 'mssql') 561 _commit = commit if commit is not None else ( 562 (self.flavor != 'mssql' or 'select' not in str(query).lower()) 563 ) 564 565 ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+). 
566 if not hasattr(query, 'compile'): 567 query = sqlalchemy.text(query) 568 569 connection = _connection if _connection is not None else self.get_connection() 570 571 try: 572 transaction = ( 573 _transaction 574 if _transaction is not None else ( 575 connection.begin() 576 if _commit 577 else None 578 ) 579 ) 580 except sqlalchemy.exc.InvalidRequestError as e: 581 if _connection is not None or _transaction is not None: 582 raise e 583 connection = self.get_connection(rebuild=True) 584 transaction = connection.begin() 585 586 if transaction is not None and not transaction.is_active and _transaction is not None: 587 connection = self.get_connection(rebuild=True) 588 transaction = connection.begin() if _commit else None 589 590 result = None 591 try: 592 result = connection.execute(query, *args, **kw) 593 if _commit: 594 transaction.commit() 595 except Exception as e: 596 if debug: 597 dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{e}") 598 if not silent: 599 warn(str(e), stacklevel=3) 600 result = None 601 if _commit: 602 transaction.rollback() 603 connection = self.get_connection(rebuild=True) 604 finally: 605 if _close: 606 connection.close() 607 608 if with_connection: 609 return result, connection 610 611 return result
Execute SQL code and return the sqlalchemy result, e.g. when calling stored procedures.

If inserting data, please use bind variables to avoid SQL injection!

Parameters
- query (Union[str, List[str], Tuple[str]]): The query to execute. If query is a list or tuple, call self.exec_queries() instead.
- args (Any): Arguments passed to sqlalchemy.engine.execute.
- silent (bool, default False): If True, suppress warnings.
- commit (Optional[bool], default None): If True, commit the changes after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
- close (Optional[bool], default None): If True, close the connection after execution. Causes issues with flavors like 'mssql'. This does not apply if query is a list of strings.
- with_connection (bool, default False): If True, return a tuple including the connection object. This does not apply if query is a list of strings.

Returns
- The sqlalchemy result object, or a tuple with the connection if with_connection is provided.
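In keeping with the note on bind variables, a hedged sketch of a parameterized insert (the table and column names are assumptions):

```
>>> result = conn.exec(
...     "INSERT INTO my_table (id, val) VALUES (:id, :val)",
...     {'id': 1, 'val': 'hello'},      # bind variables, never string formatting
... )
>>> result.rowcount if result is not None else None   # None indicates failure
```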
480def execute( 481 self, 482 *args : Any, 483 **kw : Any 484) -> Optional[sqlalchemy.engine.result.resultProxy]: 485 """ 486 An alias for `meerschaum.connectors.sql.SQLConnector.exec`. 487 """ 488 return self.exec(*args, **kw)
An alias for meerschaum.connectors.sql.SQLConnector.exec.
709def to_sql( 710 self, 711 df: pandas.DataFrame, 712 name: str = None, 713 index: bool = False, 714 if_exists: str = 'replace', 715 method: str = "", 716 chunksize: Optional[int] = -1, 717 schema: Optional[str] = None, 718 safe_copy: bool = True, 719 silent: bool = False, 720 debug: bool = False, 721 as_tuple: bool = False, 722 as_dict: bool = False, 723 _connection=None, 724 _transaction=None, 725 **kw 726) -> Union[bool, SuccessTuple]: 727 """ 728 Upload a DataFrame's contents to the SQL server. 729 730 Parameters 731 ---------- 732 df: pd.DataFrame 733 The DataFrame to be inserted. 734 735 name: str 736 The name of the table to be created. 737 738 index: bool, default False 739 If True, creates the DataFrame's indices as columns. 740 741 if_exists: str, default 'replace' 742 Drop and create the table ('replace') or append if it exists 743 ('append') or raise Exception ('fail'). 744 Options are ['replace', 'append', 'fail']. 745 746 method: str, default '' 747 None or multi. Details on pandas.to_sql. 748 749 chunksize: Optional[int], default -1 750 How many rows to insert at a time. 751 752 schema: Optional[str], default None 753 Optionally override the schema for the table. 754 Defaults to `SQLConnector.schema`. 755 756 safe_copy: bool, defaul True 757 If `True`, copy the dataframe before making any changes. 758 759 as_tuple: bool, default False 760 If `True`, return a (success_bool, message) tuple instead of a `bool`. 761 Defaults to `False`. 762 763 as_dict: bool, default False 764 If `True`, return a dictionary of transaction information. 765 The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`, 766 `method`, and `target`. 767 768 kw: Any 769 Additional arguments will be passed to the DataFrame's `to_sql` function 770 771 Returns 772 ------- 773 Either a `bool` or a `SuccessTuple` (depends on `as_tuple`). 774 """ 775 import time 776 import json 777 from decimal import Decimal 778 from datetime import timedelta 779 from meerschaum.utils.warnings import error, warn 780 import warnings 781 import functools 782 783 if name is None: 784 error(f"Name must not be `None` to insert data into {self}.") 785 786 ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs. 
787 kw.pop('name', None) 788 789 schema = schema or self.schema 790 791 from meerschaum.utils.sql import ( 792 sql_item_name, 793 table_exists, 794 json_flavors, 795 truncate_item_name, 796 DROP_IF_EXISTS_FLAVORS, 797 ) 798 from meerschaum.utils.dataframe import ( 799 get_json_cols, 800 get_numeric_cols, 801 get_uuid_cols, 802 get_bytes_cols, 803 ) 804 from meerschaum.utils.dtypes import ( 805 are_dtypes_equal, 806 coerce_timezone, 807 encode_bytes_for_bytea, 808 serialize_bytes, 809 serialize_decimal, 810 json_serialize_value, 811 ) 812 from meerschaum.utils.dtypes.sql import ( 813 PD_TO_SQLALCHEMY_DTYPES_FLAVORS, 814 get_db_type_from_pd_type, 815 get_pd_type_from_db_type, 816 get_numeric_precision_scale, 817 ) 818 from meerschaum.utils.misc import interval_str 819 from meerschaum.connectors.sql._create_engine import flavor_configs 820 from meerschaum.utils.packages import attempt_import, import_pandas 821 sqlalchemy = attempt_import('sqlalchemy', debug=debug, lazy=False) 822 pd = import_pandas() 823 is_dask = 'dask' in df.__module__ 824 825 bytes_cols = get_bytes_cols(df) 826 numeric_cols = get_numeric_cols(df) 827 numeric_cols_dtypes = { 828 col: typ 829 for col, typ in kw.get('dtype', {}).items() 830 if ( 831 col in df.columns 832 and 'numeric' in str(typ).lower() 833 ) 834 835 } 836 numeric_cols.extend([col for col in numeric_cols_dtypes if col not in numeric_cols]) 837 838 enable_bulk_insert = mrsm.get_config( 839 'system', 'connectors', 'sql', 'bulk_insert' 840 ).get(self.flavor, False) 841 stats = {'target': name} 842 ### resort to defaults if None 843 copied = False 844 use_bulk_insert = False 845 if method == "": 846 if enable_bulk_insert: 847 method = ( 848 functools.partial(mssql_insert_json, debug=debug) 849 if self.flavor == 'mssql' 850 else functools.partial(psql_insert_copy, debug=debug) 851 ) 852 use_bulk_insert = True 853 else: 854 ### Should resolve to 'multi' or `None`. 855 method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi') 856 857 if bytes_cols and (use_bulk_insert or self.flavor == 'oracle'): 858 if safe_copy and not copied: 859 df = df.copy() 860 copied = True 861 bytes_serializer = ( 862 functools.partial(encode_bytes_for_bytea, with_prefix=(self.flavor != 'oracle')) 863 if self.flavor != 'mssql' 864 else serialize_bytes 865 ) 866 for col in bytes_cols: 867 df[col] = df[col].apply(bytes_serializer) 868 869 ### Check for numeric columns. 
870 for col in numeric_cols: 871 typ = numeric_cols_dtypes.get(col, None) 872 873 precision, scale = ( 874 (typ.precision, typ.scale) 875 if hasattr(typ, 'precision') 876 else get_numeric_precision_scale(self.flavor) 877 ) 878 879 df[col] = df[col].apply( 880 functools.partial( 881 serialize_decimal, 882 quantize=True, 883 precision=precision, 884 scale=scale, 885 ) 886 ) 887 888 stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method) 889 890 default_chunksize = self._sys_config.get('chunksize', None) 891 chunksize = chunksize if chunksize != -1 else default_chunksize 892 if chunksize is not None and self.flavor in _max_chunks_flavors: 893 if chunksize > _max_chunks_flavors[self.flavor]: 894 if chunksize != default_chunksize: 895 warn( 896 f"The specified chunksize of {chunksize} exceeds the maximum of " 897 + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n" 898 + f" Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.", 899 stacklevel = 3, 900 ) 901 chunksize = _max_chunks_flavors[self.flavor] 902 stats['chunksize'] = chunksize 903 904 success, msg = False, "Default to_sql message" 905 start = time.perf_counter() 906 if debug: 907 msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..." 908 print(msg, end="", flush=True) 909 stats['num_rows'] = len(df) 910 911 ### Check if the name is too long. 912 truncated_name = truncate_item_name(name, self.flavor) 913 if name != truncated_name: 914 warn( 915 f"Table '{name}' is too long for '{self.flavor}'," 916 f" will instead create the table '{truncated_name}'." 917 ) 918 919 ### filter out non-pandas args 920 import inspect 921 to_sql_params = inspect.signature(df.to_sql).parameters 922 to_sql_kw = {} 923 for k, v in kw.items(): 924 if k in to_sql_params: 925 to_sql_kw[k] = v 926 927 to_sql_kw.update({ 928 'name': truncated_name, 929 'schema': schema, 930 ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI), 931 'index': index, 932 'if_exists': if_exists, 933 'method': method, 934 'chunksize': chunksize, 935 }) 936 if is_dask: 937 to_sql_kw.update({ 938 'parallel': True, 939 }) 940 elif _connection is not None: 941 to_sql_kw['con'] = _connection 942 943 if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else "" 944 if self.flavor == 'oracle': 945 ### For some reason 'replace' doesn't work properly in pandas, 946 ### so try dropping first. 947 if if_exists == 'replace' and table_exists(name, self, schema=schema, debug=debug): 948 success = self.exec( 949 f"DROP TABLE {if_exists_str}" + sql_item_name(name, 'oracle', schema) 950 ) is not None 951 if not success: 952 warn(f"Unable to drop {name}") 953 954 ### Enforce NVARCHAR(2000) as text instead of CLOB. 
955 dtype = to_sql_kw.get('dtype', {}) 956 for col, typ in df.dtypes.items(): 957 if are_dtypes_equal(str(typ), 'object'): 958 dtype[col] = sqlalchemy.types.NVARCHAR(2000) 959 elif are_dtypes_equal(str(typ), 'int'): 960 dtype[col] = sqlalchemy.types.INTEGER 961 to_sql_kw['dtype'] = dtype 962 elif self.flavor == 'duckdb': 963 dtype = to_sql_kw.get('dtype', {}) 964 dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')] 965 for col in dt_cols: 966 df[col] = coerce_timezone(df[col], strip_utc=False) 967 elif self.flavor == 'mssql': 968 dtype = to_sql_kw.get('dtype', {}) 969 dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')] 970 new_dtype = {} 971 for col in dt_cols: 972 if col in dtype: 973 continue 974 dt_typ = get_db_type_from_pd_type(str(df.dtypes[col]), self.flavor, as_sqlalchemy=True) 975 if col not in dtype: 976 new_dtype[col] = dt_typ 977 978 dtype.update(new_dtype) 979 to_sql_kw['dtype'] = dtype 980 981 ### Check for JSON columns. 982 if self.flavor not in json_flavors: 983 json_cols = get_json_cols(df) 984 for col in json_cols: 985 df[col] = df[col].apply( 986 ( 987 lambda x: json.dumps(x, default=json_serialize_value, sort_keys=True) 988 if not isinstance(x, Hashable) 989 else x 990 ) 991 ) 992 993 if PD_TO_SQLALCHEMY_DTYPES_FLAVORS['uuid'].get(self.flavor, None) != 'Uuid': 994 uuid_cols = get_uuid_cols(df) 995 for col in uuid_cols: 996 df[col] = df[col].astype(str) 997 998 try: 999 with warnings.catch_warnings(): 1000 warnings.filterwarnings('ignore') 1001 df.to_sql(**to_sql_kw) 1002 success = True 1003 except Exception as e: 1004 if not silent: 1005 warn(str(e)) 1006 success, msg = False, str(e) 1007 1008 end = time.perf_counter() 1009 if success: 1010 num_rows = len(df) 1011 msg = ( 1012 f"It took {interval_str(timedelta(seconds=(end - start)))} " 1013 + f"to sync {num_rows:,} row" 1014 + ('s' if num_rows != 1 else '') 1015 + f" to {name}." 1016 ) 1017 stats['start'] = start 1018 stats['end'] = end 1019 stats['duration'] = end - start 1020 1021 if debug: 1022 print(" done.", flush=True) 1023 dprint(msg) 1024 1025 stats['success'] = success 1026 stats['msg'] = msg 1027 if as_tuple: 1028 return success, msg 1029 if as_dict: 1030 return stats 1031 return success
Upload a DataFrame's contents to the SQL server.

Parameters
- df (pd.DataFrame): The DataFrame to be inserted.
- name (str): The name of the table to be created.
- index (bool, default False): If True, creates the DataFrame's indices as columns.
- if_exists (str, default 'replace'): Drop and create the table ('replace'), append if it exists ('append'), or raise an Exception ('fail'). Options are ['replace', 'append', 'fail'].
- method (str, default ''): None or 'multi'. See pandas.to_sql for details.
- chunksize (Optional[int], default -1): How many rows to insert at a time.
- schema (Optional[str], default None): Optionally override the schema for the table. Defaults to SQLConnector.schema.
- safe_copy (bool, default True): If True, copy the dataframe before making any changes.
- as_tuple (bool, default False): If True, return a (success_bool, message) tuple instead of a bool.
- as_dict (bool, default False): If True, return a dictionary of transaction information. The keys are success, msg, start, end, duration, num_rows, chunksize, method, and target.
- kw (Any): Additional arguments will be passed to the DataFrame's to_sql function.

Returns
- Either a bool or a SuccessTuple (depends on as_tuple).
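A minimal sketch of uploading a dataframe (the table name is an assumption):

```
>>> import pandas as pd
>>> df = pd.DataFrame({'id': [1, 2], 'val': ['a', 'b']})
>>> success = conn.to_sql(df, name='my_table', if_exists='append')
>>> success, msg = conn.to_sql(df, name='my_table', as_tuple=True)
```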
614def exec_queries( 615 self, 616 queries: List[ 617 Union[ 618 str, 619 Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]] 620 ] 621 ], 622 break_on_error: bool = False, 623 rollback: bool = True, 624 silent: bool = False, 625 debug: bool = False, 626) -> List[Union[sqlalchemy.engine.cursor.CursorResult, None]]: 627 """ 628 Execute a list of queries in a single transaction. 629 630 Parameters 631 ---------- 632 queries: List[ 633 Union[ 634 str, 635 Tuple[str, Callable[[], List[str]]] 636 ] 637 ] 638 The queries in the transaction to be executed. 639 If a query is a tuple, the second item of the tuple 640 will be considered a callable hook that returns a list of queries to be executed 641 before the next item in the list. 642 643 break_on_error: bool, default False 644 If `True`, stop executing when a query fails. 645 646 rollback: bool, default True 647 If `break_on_error` is `True`, rollback the transaction if a query fails. 648 649 silent: bool, default False 650 If `True`, suppress warnings. 651 652 Returns 653 ------- 654 A list of SQLAlchemy results. 655 """ 656 from meerschaum.utils.warnings import warn 657 from meerschaum.utils.debug import dprint 658 from meerschaum.utils.packages import attempt_import 659 sqlalchemy, sqlalchemy_orm = attempt_import('sqlalchemy', 'sqlalchemy.orm', lazy=False) 660 session = sqlalchemy_orm.Session(self.engine) 661 662 result = None 663 results = [] 664 with session.begin(): 665 for query in queries: 666 hook = None 667 result = None 668 669 if isinstance(query, tuple): 670 query, hook = query 671 if isinstance(query, str): 672 query = sqlalchemy.text(query) 673 674 if debug: 675 dprint(f"[{self}]\n" + str(query)) 676 677 try: 678 result = session.execute(query) 679 session.flush() 680 except Exception as e: 681 msg = (f"Encountered error while executing:\n{e}") 682 if not silent: 683 warn(msg) 684 elif debug: 685 dprint(f"[{self}]\n" + str(msg)) 686 result = None 687 if result is None and break_on_error: 688 if rollback: 689 session.rollback() 690 results.append(result) 691 break 692 elif result is not None and hook is not None: 693 hook_queries = hook(session) 694 if hook_queries: 695 hook_results = self.exec_queries( 696 hook_queries, 697 break_on_error = break_on_error, 698 rollback=rollback, 699 silent=silent, 700 debug=debug, 701 ) 702 result = (result, hook_results) 703 704 results.append(result) 705 706 return results
Execute a list of queries in a single transaction.

Parameters
- queries (List[Union[str, Tuple[str, Callable[[], List[str]]]]]): The queries in the transaction to be executed. If a query is a tuple, the second item of the tuple will be considered a callable hook that returns a list of queries to be executed before the next item in the list.
- break_on_error (bool, default False): If True, stop executing when a query fails.
- rollback (bool, default True): If break_on_error is True, rollback the transaction if a query fails.
- silent (bool, default False): If True, suppress warnings.

Returns
- A list of SQLAlchemy results.
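A sketch of a transaction with a hook (the table name is hypothetical); per the source above, the hook receives the active session and returns follow-up queries:

```
>>> results = conn.exec_queries(
...     [
...         "CREATE TABLE tmp_tbl (id INT)",
...         (
...             "INSERT INTO tmp_tbl (id) VALUES (1)",
...             lambda session: ["INSERT INTO tmp_tbl (id) VALUES (2)"],
...         ),
...     ],
...     break_on_error=True,
... )
```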
1214def get_connection(self, rebuild: bool = False) -> 'sqlalchemy.engine.base.Connection': 1215 """ 1216 Return the current alive connection. 1217 1218 Parameters 1219 ---------- 1220 rebuild: bool, default False 1221 If `True`, close the previous connection and open a new one. 1222 1223 Returns 1224 ------- 1225 A `sqlalchemy.engine.base.Connection` object. 1226 """ 1227 import threading 1228 if '_thread_connections' not in self.__dict__: 1229 self.__dict__['_thread_connections'] = {} 1230 1231 self._cleanup_connections() 1232 1233 thread_id = threading.get_ident() 1234 1235 thread_connections = self.__dict__.get('_thread_connections', {}) 1236 connection = thread_connections.get(thread_id, None) 1237 1238 if rebuild and connection is not None: 1239 try: 1240 connection.close() 1241 except Exception: 1242 pass 1243 1244 _ = thread_connections.pop(thread_id, None) 1245 connection = None 1246 1247 if connection is None or connection.closed: 1248 connection = self.engine.connect() 1249 thread_connections[thread_id] = connection 1250 1251 return connection
Return the current alive connection.

Parameters
- rebuild (bool, default False): If True, close the previous connection and open a new one.

Returns
- A sqlalchemy.engine.base.Connection object.
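As the source above shows, connections are cached per thread, so repeated calls reuse the same object until it is closed or rebuilt:

```
>>> connection = conn.get_connection()
>>> connection is conn.get_connection()        # reused within the same thread
True
>>> fresh = conn.get_connection(rebuild=True)  # close the old connection, open a new one
```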
707def test_connection( 708 self, 709 **kw: Any 710) -> Union[bool, None]: 711 """ 712 Test if a successful connection to the database may be made. 713 714 Parameters 715 ---------- 716 **kw: 717 The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`. 718 719 Returns 720 ------- 721 `True` if a connection is made, otherwise `False` or `None` in case of failure. 722 723 """ 724 import warnings 725 from meerschaum.connectors.poll import retry_connect 726 _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self} 727 _default_kw.update(kw) 728 with warnings.catch_warnings(): 729 warnings.filterwarnings('ignore', 'Could not') 730 try: 731 return retry_connect(**_default_kw) 732 except Exception: 733 return False
Test if a successful connection to the database may be made.

Parameters
- **kw (Any): The keyword arguments are passed to meerschaum.connectors.poll.retry_connect.

Returns
- True if a connection is made, otherwise False or None in case of failure.
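A sketch of guarding work behind a connectivity check (the retry arguments are forwarded to retry_connect):

```
>>> if not conn.test_connection(max_retries=3, retry_wait=1):
...     print(f"Unable to connect to {conn}.")
```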
18def fetch( 19 self, 20 pipe: mrsm.Pipe, 21 begin: Union[datetime, int, str, None] = '', 22 end: Union[datetime, int, str, None] = None, 23 check_existing: bool = True, 24 chunksize: Optional[int] = -1, 25 workers: Optional[int] = None, 26 debug: bool = False, 27 **kw: Any 28) -> Union['pd.DataFrame', List[Any], None]: 29 """Execute the SQL definition and return a Pandas DataFrame. 30 31 Parameters 32 ---------- 33 pipe: mrsm.Pipe 34 The pipe object which contains the `fetch` metadata. 35 36 - pipe.columns['datetime']: str 37 - Name of the datetime column for the remote table. 38 - pipe.parameters['fetch']: Dict[str, Any] 39 - Parameters necessary to execute a query. 40 - pipe.parameters['fetch']['definition']: str 41 - Raw SQL query to execute to generate the pandas DataFrame. 42 - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float] 43 - How many minutes before `begin` to search for data (*optional*). 44 45 begin: Union[datetime, int, str, None], default None 46 Most recent datatime to search for data. 47 If `backtrack_minutes` is provided, subtract `backtrack_minutes`. 48 49 end: Union[datetime, int, str, None], default None 50 The latest datetime to search for data. 51 If `end` is `None`, do not bound 52 53 check_existing: bool, defult True 54 If `False`, use a backtrack interval of 0 minutes. 55 56 chunksize: Optional[int], default -1 57 How many rows to load into memory at once. 58 Otherwise the entire result set is loaded into memory. 59 60 workers: Optional[int], default None 61 How many threads to use when consuming the generator. 62 Defaults to the number of cores. 63 64 debug: bool, default False 65 Verbosity toggle. 66 67 Returns 68 ------- 69 A pandas DataFrame generator. 70 """ 71 meta_def = self.get_pipe_metadef( 72 pipe, 73 begin=begin, 74 end=end, 75 check_existing=check_existing, 76 debug=debug, 77 **kw 78 ) 79 chunks = self.read( 80 meta_def, 81 chunksize=chunksize, 82 workers=workers, 83 as_iterator=True, 84 debug=debug, 85 ) 86 return chunks
Execute the SQL definition and return a Pandas DataFrame.

Parameters
- pipe (mrsm.Pipe): The pipe object which contains the fetch metadata.
  - pipe.columns['datetime'] (str): Name of the datetime column for the remote table.
  - pipe.parameters['fetch'] (Dict[str, Any]): Parameters necessary to execute a query.
  - pipe.parameters['fetch']['definition'] (str): Raw SQL query to execute to generate the pandas DataFrame.
  - pipe.parameters['fetch']['backtrack_minutes'] (Union[int, float]): How many minutes before begin to search for data (optional).
- begin (Union[datetime, int, str, None], default None): Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
- end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the search.
- check_existing (bool, default True): If False, use a backtrack interval of 0 minutes.
- chunksize (Optional[int], default -1): How many rows to load into memory at once. Otherwise the entire result set is loaded into memory.
- workers (Optional[int], default None): How many threads to use when consuming the generator. Defaults to the number of cores.
- debug (bool, default False): Verbosity toggle.

Returns
- A pandas DataFrame generator.
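fetch is normally driven by a pipe's sync; a minimal sketch of the metadata it expects (the connector labels, metric name, and remote table are assumptions):

```
>>> pipe = mrsm.Pipe(
...     'sql:main', 'energy',
...     instance='sql:main',
...     columns={'datetime': 'dt'},
...     parameters={'fetch': {'definition': 'SELECT * FROM remote_table'}},
... )
>>> chunks = conn.fetch(pipe)   # a chunk generator (read() is called with as_iterator=True)
```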
89def get_pipe_metadef( 90 self, 91 pipe: mrsm.Pipe, 92 params: Optional[Dict[str, Any]] = None, 93 begin: Union[datetime, int, str, None] = '', 94 end: Union[datetime, int, str, None] = None, 95 check_existing: bool = True, 96 debug: bool = False, 97 **kw: Any 98) -> Union[str, None]: 99 """ 100 Return a pipe's meta definition fetch query. 101 102 params: Optional[Dict[str, Any]], default None 103 Optional params dictionary to build the `WHERE` clause. 104 See `meerschaum.utils.sql.build_where`. 105 106 begin: Union[datetime, int, str, None], default None 107 Most recent datatime to search for data. 108 If `backtrack_minutes` is provided, subtract `backtrack_minutes`. 109 110 end: Union[datetime, int, str, None], default None 111 The latest datetime to search for data. 112 If `end` is `None`, do not bound 113 114 check_existing: bool, default True 115 If `True`, apply the backtrack interval. 116 117 debug: bool, default False 118 Verbosity toggle. 119 120 Returns 121 ------- 122 A pipe's meta definition fetch query string. 123 """ 124 from meerschaum.utils.warnings import warn 125 from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where 126 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 127 from meerschaum.config import get_config 128 129 dt_col = pipe.columns.get('datetime', None) 130 if not dt_col: 131 dt_col = pipe.guess_datetime() 132 dt_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None 133 is_guess = True 134 else: 135 dt_name = sql_item_name(dt_col, self.flavor, None) 136 is_guess = False 137 dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None 138 db_dt_typ = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None 139 140 if begin not in (None, '') or end is not None: 141 if is_guess: 142 if dt_col is None: 143 warn( 144 f"Unable to determine a datetime column for {pipe}." 
145 + "\n Ignoring begin and end...", 146 stack=False, 147 ) 148 begin, end = '', None 149 else: 150 warn( 151 f"A datetime wasn't specified for {pipe}.\n" 152 + f" Using column \"{dt_col}\" for datetime bounds...", 153 stack=False 154 ) 155 156 apply_backtrack = begin == '' and check_existing 157 backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug) 158 btm = ( 159 int(backtrack_interval.total_seconds() / 60) 160 if isinstance(backtrack_interval, timedelta) 161 else backtrack_interval 162 ) 163 begin = ( 164 pipe.get_sync_time(debug=debug) 165 if begin == '' 166 else begin 167 ) 168 169 if begin not in (None, '') and end is not None and begin >= end: 170 begin = None 171 172 if dt_name: 173 begin_da = ( 174 dateadd_str( 175 flavor=self.flavor, 176 datepart='minute', 177 number=((-1 * btm) if apply_backtrack else 0), 178 begin=begin, 179 db_type=db_dt_typ, 180 ) 181 if begin not in ('', None) 182 else None 183 ) 184 end_da = ( 185 dateadd_str( 186 flavor=self.flavor, 187 datepart='minute', 188 number=0, 189 begin=end, 190 db_type=db_dt_typ, 191 ) 192 if end is not None 193 else None 194 ) 195 196 definition_name = sql_item_name('definition', self.flavor, None) 197 meta_def = ( 198 _simple_fetch_query(pipe, self.flavor) if ( 199 (not (pipe.columns or {}).get('id', None)) 200 or (not get_config('system', 'experimental', 'join_fetch')) 201 ) else _join_fetch_query(pipe, self.flavor, debug=debug, **kw) 202 ) 203 204 has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):] 205 if dt_name and (begin_da or end_da): 206 definition_dt_name = f"{definition_name}.{dt_name}" 207 meta_def += "\n" + ("AND" if has_where else "WHERE") + " " 208 has_where = True 209 if begin_da: 210 meta_def += f"\n {definition_dt_name}\n >=\n {begin_da}\n" 211 if begin_da and end_da: 212 meta_def += " AND" 213 if end_da: 214 meta_def += f"\n {definition_dt_name}\n <\n {end_da}\n" 215 216 if params is not None: 217 params_where = build_where(params, self, with_where=False) 218 meta_def += "\n " + ("AND" if has_where else "WHERE") + " " 219 has_where = True 220 meta_def += params_where 221 222 return meta_def.rstrip()
Return a pipe's meta definition fetch query.

Parameters
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause. See meerschaum.utils.sql.build_where.
- begin (Union[datetime, int, str, None], default None): Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
- end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the search.
- check_existing (bool, default True): If True, apply the backtrack interval.
- debug (bool, default False): Verbosity toggle.

Returns
- A pipe's meta definition fetch query string.
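Continuing the hypothetical pipe above, a sketch of inspecting the generated query:

```
>>> meta_def = conn.get_pipe_metadef(pipe, begin='2024-01-01', end='2024-02-01')
>>> print(meta_def)   # the fetch definition wrapped with datetime bounds
```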
36def cli( 37 self, 38 debug: bool = False, 39) -> SuccessTuple: 40 """ 41 Launch a subprocess for an interactive CLI. 42 """ 43 from meerschaum.utils.warnings import dprint 44 from meerschaum.utils.venv import venv_exec 45 env = copy.deepcopy(dict(os.environ)) 46 env_key = f"MRSM_SQL_{self.label.upper()}" 47 env_val = json.dumps(self.meta) 48 env[env_key] = env_val 49 cli_code = ( 50 "import sys\n" 51 "import meerschaum as mrsm\n" 52 "import os\n" 53 f"conn = mrsm.get_connector('sql:{self.label}')\n" 54 "success, msg = conn._cli_exit()\n" 55 "mrsm.pprint((success, msg))\n" 56 "if not success:\n" 57 " raise Exception(msg)" 58 ) 59 if debug: 60 dprint(cli_code) 61 try: 62 _ = venv_exec(cli_code, venv=None, env=env, debug=debug, capture_output=False) 63 except Exception as e: 64 return False, f"[{self}] Failed to start CLI:\n{e}" 65 return True, "Success"
Launch a subprocess for an interactive CLI.
143def fetch_pipes_keys( 144 self, 145 connector_keys: Optional[List[str]] = None, 146 metric_keys: Optional[List[str]] = None, 147 location_keys: Optional[List[str]] = None, 148 tags: Optional[List[str]] = None, 149 params: Optional[Dict[str, Any]] = None, 150 debug: bool = False 151) -> Optional[List[Tuple[str, str, Optional[str]]]]: 152 """ 153 Return a list of tuples corresponding to the parameters provided. 154 155 Parameters 156 ---------- 157 connector_keys: Optional[List[str]], default None 158 List of connector_keys to search by. 159 160 metric_keys: Optional[List[str]], default None 161 List of metric_keys to search by. 162 163 location_keys: Optional[List[str]], default None 164 List of location_keys to search by. 165 166 params: Optional[Dict[str, Any]], default None 167 Dictionary of additional parameters to search by. 168 E.g. `--params pipe_id:1` 169 170 debug: bool, default False 171 Verbosity toggle. 172 """ 173 from meerschaum.utils.debug import dprint 174 from meerschaum.utils.packages import attempt_import 175 from meerschaum.utils.misc import separate_negation_values 176 from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists 177 from meerschaum.config.static import STATIC_CONFIG 178 import json 179 from copy import deepcopy 180 sqlalchemy, sqlalchemy_sql_functions = attempt_import( 181 'sqlalchemy', 182 'sqlalchemy.sql.functions', lazy=False, 183 ) 184 coalesce = sqlalchemy_sql_functions.coalesce 185 186 if connector_keys is None: 187 connector_keys = [] 188 if metric_keys is None: 189 metric_keys = [] 190 if location_keys is None: 191 location_keys = [] 192 else: 193 location_keys = [ 194 ( 195 lk 196 if lk not in ('[None]', 'None', 'null') 197 else 'None' 198 ) 199 for lk in location_keys 200 ] 201 if tags is None: 202 tags = [] 203 204 if params is None: 205 params = {} 206 207 ### Add three primary keys to params dictionary 208 ### (separated for convenience of arguments). 209 cols = { 210 'connector_keys': [str(ck) for ck in connector_keys], 211 'metric_key': [str(mk) for mk in metric_keys], 212 'location_key': [str(lk) for lk in location_keys], 213 } 214 215 ### Make deep copy so we don't mutate this somewhere else. 216 parameters = deepcopy(params) 217 for col, vals in cols.items(): 218 if vals not in [[], ['*']]: 219 parameters[col] = vals 220 221 if not table_exists('mrsm_pipes', self, schema=self.instance_schema, debug=debug): 222 return [] 223 224 from meerschaum.connectors.sql.tables import get_tables 225 pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes'] 226 227 _params = {} 228 for k, v in parameters.items(): 229 _v = json.dumps(v) if isinstance(v, dict) else v 230 _params[k] = _v 231 232 negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] 233 ### Parse regular params. 234 ### If a param begins with '_', negate it instead. 
235 _where = [ 236 ( 237 (coalesce(pipes_tbl.c[key], 'None') == val) 238 if not str(val).startswith(negation_prefix) 239 else (pipes_tbl.c[key] != key) 240 ) for key, val in _params.items() 241 if not isinstance(val, (list, tuple)) and key in pipes_tbl.c 242 ] 243 select_cols = ( 244 [ 245 pipes_tbl.c.connector_keys, 246 pipes_tbl.c.metric_key, 247 pipes_tbl.c.location_key, 248 ] 249 ) 250 251 q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where)) 252 for c, vals in cols.items(): 253 if not isinstance(vals, (list, tuple)) or not vals or c not in pipes_tbl.c: 254 continue 255 _in_vals, _ex_vals = separate_negation_values(vals) 256 q = q.where(coalesce(pipes_tbl.c[c], 'None').in_(_in_vals)) if _in_vals else q 257 q = q.where(coalesce(pipes_tbl.c[c], 'None').not_in(_ex_vals)) if _ex_vals else q 258 259 ### Finally, parse tags. 260 tag_groups = [tag.split(',') for tag in tags] 261 in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups] 262 263 ors, nands = [], [] 264 for _in_tags, _ex_tags in in_ex_tag_groups: 265 sub_ands = [] 266 for nt in _in_tags: 267 sub_ands.append( 268 sqlalchemy.cast( 269 pipes_tbl.c['parameters'], 270 sqlalchemy.String, 271 ).like(f'%"tags":%"{nt}"%') 272 ) 273 if sub_ands: 274 ors.append(sqlalchemy.and_(*sub_ands)) 275 276 for xt in _ex_tags: 277 nands.append( 278 sqlalchemy.cast( 279 pipes_tbl.c['parameters'], 280 sqlalchemy.String, 281 ).not_like(f'%"tags":%"{xt}"%') 282 ) 283 284 q = q.where(sqlalchemy.and_(*nands)) if nands else q 285 q = q.where(sqlalchemy.or_(*ors)) if ors else q 286 loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key']) 287 if self.flavor not in OMIT_NULLSFIRST_FLAVORS: 288 loc_asc = sqlalchemy.nullsfirst(loc_asc) 289 q = q.order_by( 290 sqlalchemy.asc(pipes_tbl.c['connector_keys']), 291 sqlalchemy.asc(pipes_tbl.c['metric_key']), 292 loc_asc, 293 ) 294 295 ### execute the query and return a list of tuples 296 if debug: 297 dprint(q.compile(compile_kwargs={'literal_binds': True})) 298 try: 299 rows = ( 300 self.execute(q).fetchall() 301 if self.flavor != 'duckdb' 302 else [ 303 (row['connector_keys'], row['metric_key'], row['location_key']) 304 for row in self.read(q).to_dict(orient='records') 305 ] 306 ) 307 except Exception as e: 308 error(str(e)) 309 310 return [(row[0], row[1], row[2]) for row in rows]
Return a list of tuples corresponding to the parameters provided.

Parameters
- connector_keys (Optional[List[str]], default None): List of connector_keys to search by.
- metric_keys (Optional[List[str]], default None): List of metric_keys to search by.
- location_keys (Optional[List[str]], default None): List of location_keys to search by.
- tags (Optional[List[str]], default None): List of tags to search by.
- params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. E.g. --params pipe_id:1
- debug (bool, default False): Verbosity toggle.
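A sketch of filtering registered pipes (the keys and tag are hypothetical):

```
>>> keys = conn.fetch_pipes_keys(
...     connector_keys=['sql:main'],
...     tags=['production'],
... )
>>> # each element is a (connector_keys, metric_key, location_key) tuple
```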
331def create_indices( 332 self, 333 pipe: mrsm.Pipe, 334 columns: Optional[List[str]] = None, 335 indices: Optional[List[str]] = None, 336 debug: bool = False 337) -> bool: 338 """ 339 Create a pipe's indices. 340 """ 341 from meerschaum.utils.debug import dprint 342 if debug: 343 dprint(f"Creating indices for {pipe}...") 344 345 if not pipe.indices: 346 warn(f"{pipe} has no index columns; skipping index creation.", stack=False) 347 return True 348 349 cols_to_include = set((columns or []) + (indices or [])) or None 350 351 _ = pipe.__dict__.pop('_columns_indices', None) 352 ix_queries = { 353 col: queries 354 for col, queries in self.get_create_index_queries(pipe, debug=debug).items() 355 if cols_to_include is None or col in cols_to_include 356 } 357 success = True 358 for col, queries in ix_queries.items(): 359 ix_success = all(self.exec_queries(queries, debug=debug, silent=False)) 360 success = success and ix_success 361 if not ix_success: 362 warn(f"Failed to create index on column: {col}") 363 364 return success
Create a pipe's indices.
385def drop_indices( 386 self, 387 pipe: mrsm.Pipe, 388 columns: Optional[List[str]] = None, 389 indices: Optional[List[str]] = None, 390 debug: bool = False 391) -> bool: 392 """ 393 Drop a pipe's indices. 394 """ 395 from meerschaum.utils.debug import dprint 396 if debug: 397 dprint(f"Dropping indices for {pipe}...") 398 399 if not pipe.indices: 400 warn(f"No indices to drop for {pipe}.", stack=False) 401 return False 402 403 cols_to_include = set((columns or []) + (indices or [])) or None 404 405 ix_queries = { 406 col: queries 407 for col, queries in self.get_drop_index_queries(pipe, debug=debug).items() 408 if cols_to_include is None or col in cols_to_include 409 } 410 success = True 411 for col, queries in ix_queries.items(): 412 ix_success = all(self.exec_queries(queries, debug=debug, silent=(not debug))) 413 if not ix_success: 414 success = False 415 if debug: 416 dprint(f"Failed to drop index on column: {col}") 417 return success
Drop a pipe's indices.
473def get_create_index_queries( 474 self, 475 pipe: mrsm.Pipe, 476 debug: bool = False, 477) -> Dict[str, List[str]]: 478 """ 479 Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query. 480 481 Parameters 482 ---------- 483 pipe: mrsm.Pipe 484 The pipe to which the queries will correspond. 485 486 Returns 487 ------- 488 A dictionary of index names mapping to lists of queries. 489 """ 490 ### NOTE: Due to recent breaking changes in DuckDB, indices don't behave properly. 491 if self.flavor == 'duckdb': 492 return {} 493 from meerschaum.utils.sql import ( 494 sql_item_name, 495 get_distinct_col_count, 496 UPDATE_QUERIES, 497 get_null_replacement, 498 get_create_table_queries, 499 get_rename_table_queries, 500 COALESCE_UNIQUE_INDEX_FLAVORS, 501 ) 502 from meerschaum.utils.dtypes.sql import ( 503 get_db_type_from_pd_type, 504 get_pd_type_from_db_type, 505 AUTO_INCREMENT_COLUMN_FLAVORS, 506 ) 507 from meerschaum.config import get_config 508 index_queries = {} 509 510 upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in UPDATE_QUERIES 511 static = pipe.parameters.get('static', False) 512 null_indices = pipe.parameters.get('null_indices', True) 513 index_names = pipe.get_indices() 514 unique_index_name_unquoted = index_names.get('unique', None) or f'IX_{pipe.target}_unique' 515 if upsert: 516 _ = index_names.pop('unique', None) 517 indices = pipe.indices 518 existing_cols_types = pipe.get_columns_types(debug=debug) 519 existing_cols_pd_types = { 520 col: get_pd_type_from_db_type(typ) 521 for col, typ in existing_cols_types.items() 522 } 523 existing_cols_indices = self.get_pipe_columns_indices(pipe, debug=debug) 524 existing_ix_names = set() 525 existing_primary_keys = [] 526 existing_clustered_primary_keys = [] 527 for col, col_indices in existing_cols_indices.items(): 528 for col_ix_doc in col_indices: 529 existing_ix_names.add(col_ix_doc.get('name', '').lower()) 530 if col_ix_doc.get('type', None) == 'PRIMARY KEY': 531 existing_primary_keys.append(col.lower()) 532 if col_ix_doc.get('clustered', True): 533 existing_clustered_primary_keys.append(col.lower()) 534 535 _datetime = pipe.get_columns('datetime', error=False) 536 _datetime_name = ( 537 sql_item_name(_datetime, self.flavor, None) 538 if _datetime is not None else None 539 ) 540 _datetime_index_name = ( 541 sql_item_name(index_names['datetime'], flavor=self.flavor, schema=None) 542 if index_names.get('datetime', None) 543 else None 544 ) 545 _id = pipe.get_columns('id', error=False) 546 _id_name = ( 547 sql_item_name(_id, self.flavor, None) 548 if _id is not None 549 else None 550 ) 551 primary_key = pipe.columns.get('primary', None) 552 primary_key_name = ( 553 sql_item_name(primary_key, flavor=self.flavor, schema=None) 554 if primary_key 555 else None 556 ) 557 autoincrement = ( 558 pipe.parameters.get('autoincrement', False) 559 or ( 560 primary_key is not None 561 and primary_key not in existing_cols_pd_types 562 ) 563 ) 564 primary_key_db_type = ( 565 get_db_type_from_pd_type(pipe.dtypes.get(primary_key, 'int') or 'int', self.flavor) 566 if primary_key 567 else None 568 ) 569 primary_key_constraint_name = ( 570 sql_item_name(f'PK_{pipe.target}', self.flavor, None) 571 if primary_key is not None 572 else None 573 ) 574 primary_key_clustered = "CLUSTERED" if _datetime is None else "NONCLUSTERED" 575 datetime_clustered = ( 576 "CLUSTERED" 577 if not existing_clustered_primary_keys and _datetime is not None 578 else "NONCLUSTERED" 579 ) 580 include_columns_str = "\n ,".join( 581 [ 582 
sql_item_name(col, flavor=self.flavor) for col in existing_cols_types 583 if col != _datetime 584 ] 585 ).rstrip(',') 586 include_clause = ( 587 ( 588 f"\nINCLUDE (\n {include_columns_str}\n)" 589 ) 590 if datetime_clustered == 'NONCLUSTERED' 591 else '' 592 ) 593 594 _id_index_name = ( 595 sql_item_name(index_names['id'], self.flavor, None) 596 if index_names.get('id', None) 597 else None 598 ) 599 _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 600 _create_space_partition = get_config('system', 'experimental', 'space') 601 602 ### create datetime index 603 dt_query = None 604 if _datetime is not None: 605 if self.flavor == 'timescaledb' and pipe.parameters.get('hypertable', True): 606 _id_count = ( 607 get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self) 608 if (_id is not None and _create_space_partition) else None 609 ) 610 611 chunk_interval = pipe.get_chunk_interval(debug=debug) 612 chunk_interval_minutes = ( 613 chunk_interval 614 if isinstance(chunk_interval, int) 615 else int(chunk_interval.total_seconds() / 60) 616 ) 617 chunk_time_interval = ( 618 f"INTERVAL '{chunk_interval_minutes} MINUTES'" 619 if isinstance(chunk_interval, timedelta) 620 else f'{chunk_interval_minutes}' 621 ) 622 623 dt_query = ( 624 f"SELECT public.create_hypertable('{_pipe_name}', " + 625 f"'{_datetime}', " 626 + ( 627 f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition) 628 else '' 629 ) 630 + f'chunk_time_interval => {chunk_time_interval}, ' 631 + 'if_not_exists => true, ' 632 + "migrate_data => true);" 633 ) 634 elif _datetime_index_name and _datetime != primary_key: 635 if self.flavor == 'mssql': 636 dt_query = ( 637 f"CREATE {datetime_clustered} INDEX {_datetime_index_name} " 638 f"\nON {_pipe_name} ({_datetime_name}){include_clause}" 639 ) 640 else: 641 dt_query = ( 642 f"CREATE INDEX {_datetime_index_name} " 643 + f"ON {_pipe_name} ({_datetime_name})" 644 ) 645 646 if dt_query: 647 index_queries[_datetime] = [dt_query] 648 649 primary_queries = [] 650 if ( 651 primary_key is not None 652 and primary_key.lower() not in existing_primary_keys 653 and not static 654 ): 655 if autoincrement and primary_key not in existing_cols_pd_types: 656 autoincrement_str = AUTO_INCREMENT_COLUMN_FLAVORS.get( 657 self.flavor, 658 AUTO_INCREMENT_COLUMN_FLAVORS['default'] 659 ) 660 primary_queries.extend([ 661 ( 662 f"ALTER TABLE {_pipe_name}\n" 663 f"ADD {primary_key_name} {primary_key_db_type} {autoincrement_str}" 664 ), 665 ]) 666 elif not autoincrement and primary_key in existing_cols_pd_types: 667 if self.flavor == 'sqlite': 668 new_table_name = sql_item_name( 669 f'_new_{pipe.target}', 670 self.flavor, 671 self.get_pipe_schema(pipe) 672 ) 673 select_cols_str = ', '.join( 674 [ 675 sql_item_name(col, self.flavor, None) 676 for col in existing_cols_types 677 ] 678 ) 679 primary_queries.extend( 680 get_create_table_queries( 681 existing_cols_pd_types, 682 f'_new_{pipe.target}', 683 self.flavor, 684 schema=self.get_pipe_schema(pipe), 685 primary_key=primary_key, 686 ) + [ 687 ( 688 f"INSERT INTO {new_table_name} ({select_cols_str})\n" 689 f"SELECT {select_cols_str}\nFROM {_pipe_name}" 690 ), 691 f"DROP TABLE {_pipe_name}", 692 ] + get_rename_table_queries( 693 f'_new_{pipe.target}', 694 pipe.target, 695 self.flavor, 696 schema=self.get_pipe_schema(pipe), 697 ) 698 ) 699 elif self.flavor == 'oracle': 700 primary_queries.extend([ 701 ( 702 f"ALTER TABLE {_pipe_name}\n" 703 f"MODIFY {primary_key_name} NOT NULL" 704 ), 705 ( 706 f"ALTER TABLE 
{_pipe_name}\n" 707 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})" 708 ) 709 ]) 710 elif self.flavor in ('mysql', 'mariadb'): 711 primary_queries.extend([ 712 ( 713 f"ALTER TABLE {_pipe_name}\n" 714 f"MODIFY {primary_key_name} {primary_key_db_type} NOT NULL" 715 ), 716 ( 717 f"ALTER TABLE {_pipe_name}\n" 718 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})" 719 ) 720 ]) 721 elif self.flavor == 'timescaledb': 722 primary_queries.extend([ 723 ( 724 f"ALTER TABLE {_pipe_name}\n" 725 f"ALTER COLUMN {primary_key_name} SET NOT NULL" 726 ), 727 ( 728 f"ALTER TABLE {_pipe_name}\n" 729 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY (" + ( 730 f"{_datetime_name}, " if _datetime_name else "" 731 ) + f"{primary_key_name})" 732 ), 733 ]) 734 elif self.flavor in ('citus', 'postgresql', 'duckdb'): 735 primary_queries.extend([ 736 ( 737 f"ALTER TABLE {_pipe_name}\n" 738 f"ALTER COLUMN {primary_key_name} SET NOT NULL" 739 ), 740 ( 741 f"ALTER TABLE {_pipe_name}\n" 742 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})" 743 ), 744 ]) 745 else: 746 primary_queries.extend([ 747 ( 748 f"ALTER TABLE {_pipe_name}\n" 749 f"ALTER COLUMN {primary_key_name} {primary_key_db_type} NOT NULL" 750 ), 751 ( 752 f"ALTER TABLE {_pipe_name}\n" 753 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY {primary_key_clustered} ({primary_key_name})" 754 ), 755 ]) 756 index_queries[primary_key] = primary_queries 757 758 ### create id index 759 if _id_name is not None: 760 if self.flavor == 'timescaledb': 761 ### Already created indices via create_hypertable. 762 id_query = ( 763 None if (_id is not None and _create_space_partition) 764 else ( 765 f"CREATE INDEX IF NOT EXISTS {_id_index_name} ON {_pipe_name} ({_id_name})" 766 if _id is not None 767 else None 768 ) 769 ) 770 pass 771 else: ### mssql, sqlite, etc. 772 id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})" 773 774 if id_query is not None: 775 index_queries[_id] = id_query if isinstance(id_query, list) else [id_query] 776 777 ### Create indices for other labels in `pipe.columns`. 
778 other_index_names = { 779 ix_key: ix_unquoted 780 for ix_key, ix_unquoted in index_names.items() 781 if ( 782 ix_key not in ('datetime', 'id', 'primary') 783 and ix_unquoted.lower() not in existing_ix_names 784 ) 785 } 786 for ix_key, ix_unquoted in other_index_names.items(): 787 ix_name = sql_item_name(ix_unquoted, self.flavor, None) 788 cols = indices[ix_key] 789 if not isinstance(cols, (list, tuple)): 790 cols = [cols] 791 if ix_key == 'unique' and upsert: 792 continue 793 cols_names = [sql_item_name(col, self.flavor, None) for col in cols if col] 794 if not cols_names: 795 continue 796 cols_names_str = ", ".join(cols_names) 797 index_queries[ix_key] = [f"CREATE INDEX {ix_name} ON {_pipe_name} ({cols_names_str})"] 798 799 indices_cols_str = ', '.join( 800 list({ 801 sql_item_name(ix, self.flavor) 802 for ix_key, ix in pipe.columns.items() 803 if ix and ix in existing_cols_types 804 }) 805 ) 806 coalesce_indices_cols_str = ', '.join( 807 [ 808 ( 809 ( 810 "COALESCE(" 811 + sql_item_name(ix, self.flavor) 812 + ", " 813 + get_null_replacement(existing_cols_types[ix], self.flavor) 814 + ") " 815 ) 816 if ix_key != 'datetime' and null_indices 817 else sql_item_name(ix, self.flavor) 818 ) 819 for ix_key, ix in pipe.columns.items() 820 if ix and ix in existing_cols_types 821 ] 822 ) 823 unique_index_name = sql_item_name(unique_index_name_unquoted, self.flavor) 824 constraint_name_unquoted = unique_index_name_unquoted.replace('IX_', 'UQ_') 825 constraint_name = sql_item_name(constraint_name_unquoted, self.flavor) 826 add_constraint_query = ( 827 f"ALTER TABLE {_pipe_name} ADD CONSTRAINT {constraint_name} UNIQUE ({indices_cols_str})" 828 ) 829 unique_index_cols_str = ( 830 indices_cols_str 831 if self.flavor not in COALESCE_UNIQUE_INDEX_FLAVORS or not null_indices 832 else coalesce_indices_cols_str 833 ) 834 create_unique_index_query = ( 835 f"CREATE UNIQUE INDEX {unique_index_name} ON {_pipe_name} ({unique_index_cols_str})" 836 ) 837 constraint_queries = [create_unique_index_query] 838 if self.flavor != 'sqlite': 839 constraint_queries.append(add_constraint_query) 840 if upsert and indices_cols_str: 841 index_queries[unique_index_name] = constraint_queries 842 return index_queries
Return a dictionary mapping columns to a CREATE INDEX or equivalent query.
Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
- A dictionary of index names mapping to lists of queries.
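A hedged sketch of generating and applying a pipe's index queries via exec_queries:

```
>>> index_queries = conn.get_create_index_queries(pipe)
>>> for ix_name, queries in index_queries.items():
...     results = conn.exec_queries(queries, debug=True)
```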
845def get_drop_index_queries( 846 self, 847 pipe: mrsm.Pipe, 848 debug: bool = False, 849) -> Dict[str, List[str]]: 850 """ 851 Return a dictionary mapping columns to a `DROP INDEX` or equivalent query. 852 853 Parameters 854 ---------- 855 pipe: mrsm.Pipe 856 The pipe to which the queries will correspond. 857 858 Returns 859 ------- 860 A dictionary of column names mapping to lists of queries. 861 """ 862 ### NOTE: Due to breaking changes within DuckDB, indices must be skipped. 863 if self.flavor == 'duckdb': 864 return {} 865 if not pipe.exists(debug=debug): 866 return {} 867 868 from collections import defaultdict 869 from meerschaum.utils.sql import ( 870 sql_item_name, 871 table_exists, 872 hypertable_queries, 873 DROP_INDEX_IF_EXISTS_FLAVORS, 874 ) 875 drop_queries = defaultdict(lambda: []) 876 schema = self.get_pipe_schema(pipe) 877 index_schema = schema if self.flavor != 'mssql' else None 878 indices = { 879 ix_key: ix 880 for ix_key, ix in pipe.get_indices().items() 881 } 882 cols_indices = pipe.get_columns_indices(debug=debug) 883 existing_indices = set() 884 clustered_ix = None 885 for col, ix_metas in cols_indices.items(): 886 for ix_meta in ix_metas: 887 ix_name = ix_meta.get('name', None) 888 if ix_meta.get('clustered', False): 889 clustered_ix = ix_name 890 existing_indices.add(ix_name.lower()) 891 pipe_name = sql_item_name(pipe.target, self.flavor, schema) 892 pipe_name_no_schema = sql_item_name(pipe.target, self.flavor, None) 893 upsert = pipe.upsert 894 895 if self.flavor not in hypertable_queries: 896 is_hypertable = False 897 else: 898 is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name) 899 is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None 900 901 if_exists_str = "IF EXISTS " if self.flavor in DROP_INDEX_IF_EXISTS_FLAVORS else "" 902 if is_hypertable: 903 nuke_queries = [] 904 temp_table = '_' + pipe.target + '_temp_migration' 905 temp_table_name = sql_item_name(temp_table, self.flavor, self.get_pipe_schema(pipe)) 906 907 if table_exists(temp_table, self, schema=self.get_pipe_schema(pipe), debug=debug): 908 nuke_queries.append(f"DROP TABLE {if_exists_str} {temp_table_name}") 909 nuke_queries += [ 910 f"SELECT * INTO {temp_table_name} FROM {pipe_name}", 911 f"DROP TABLE {if_exists_str}{pipe_name}", 912 f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name_no_schema}", 913 ] 914 nuke_ix_keys = ('datetime', 'id') 915 nuked = False 916 for ix_key in nuke_ix_keys: 917 if ix_key in indices and not nuked: 918 drop_queries[ix_key].extend(nuke_queries) 919 nuked = True 920 921 for ix_key, ix_unquoted in indices.items(): 922 if ix_key in drop_queries: 923 continue 924 if ix_unquoted.lower() not in existing_indices: 925 continue 926 927 if ix_key == 'unique' and upsert and self.flavor not in ('sqlite',) and not is_hypertable: 928 constraint_name_unquoted = ix_unquoted.replace('IX_', 'UQ_') 929 constraint_name = sql_item_name(constraint_name_unquoted, self.flavor) 930 constraint_or_index = ( 931 "CONSTRAINT" 932 if self.flavor not in ('mysql', 'mariadb') 933 else 'INDEX' 934 ) 935 drop_queries[ix_key].append( 936 f"ALTER TABLE {pipe_name}\n" 937 f"DROP {constraint_or_index} {constraint_name}" 938 ) 939 940 query = ( 941 ( 942 f"ALTER TABLE {pipe_name}\n" 943 if self.flavor in ('mysql', 'mariadb') 944 else '' 945 ) 946 + f"DROP INDEX {if_exists_str}" 947 + sql_item_name(ix_unquoted, self.flavor, index_schema) 948 ) 949 if self.flavor == 'mssql': 950 query += f"\nON {pipe_name}" 951 if ix_unquoted == clustered_ix: 952 
query += "\nWITH (ONLINE = ON, MAXDOP = 4)" 953 drop_queries[ix_key].append(query) 954 955 956 return drop_queries
Return a dictionary mapping columns to a DROP INDEX or equivalent query.
Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
- A dictionary of column names mapping to lists of queries.
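Similarly, a sketch of dropping a pipe's indices, e.g. ahead of a heavy bulk load (a judgment call, not prescribed by this docstring):

```
>>> drop_queries = conn.get_drop_index_queries(pipe)
>>> for col, queries in drop_queries.items():
...     results = conn.exec_queries(queries, silent=True)
```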
3105def get_add_columns_queries( 3106 self, 3107 pipe: mrsm.Pipe, 3108 df: Union[pd.DataFrame, Dict[str, str]], 3109 _is_db_types: bool = False, 3110 debug: bool = False, 3111) -> List[str]: 3112 """ 3113 Add new null columns of the correct type to a table from a dataframe. 3114 3115 Parameters 3116 ---------- 3117 pipe: mrsm.Pipe 3118 The pipe to be altered. 3119 3120 df: Union[pd.DataFrame, Dict[str, str]] 3121 The pandas DataFrame which contains new columns. 3122 If a dictionary is provided, assume it maps columns to Pandas data types. 3123 3124 _is_db_types: bool, default False 3125 If `True`, assume `df` is a dictionary mapping columns to SQL native dtypes. 3126 3127 Returns 3128 ------- 3129 A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector. 3130 """ 3131 if not pipe.exists(debug=debug): 3132 return [] 3133 3134 if pipe.parameters.get('static', False): 3135 return [] 3136 3137 from decimal import Decimal 3138 import copy 3139 from meerschaum.utils.sql import ( 3140 sql_item_name, 3141 SINGLE_ALTER_TABLE_FLAVORS, 3142 get_table_cols_types, 3143 ) 3144 from meerschaum.utils.dtypes.sql import ( 3145 get_pd_type_from_db_type, 3146 get_db_type_from_pd_type, 3147 ) 3148 from meerschaum.utils.misc import flatten_list 3149 table_obj = self.get_pipe_table(pipe, debug=debug) 3150 is_dask = 'dask' in df.__module__ if not isinstance(df, dict) else False 3151 if is_dask: 3152 df = df.partitions[0].compute() 3153 df_cols_types = ( 3154 { 3155 col: str(typ) 3156 for col, typ in df.dtypes.items() 3157 } 3158 if not isinstance(df, dict) 3159 else copy.deepcopy(df) 3160 ) 3161 if not isinstance(df, dict) and len(df.index) > 0: 3162 for col, typ in list(df_cols_types.items()): 3163 if typ != 'object': 3164 continue 3165 val = df.iloc[0][col] 3166 if isinstance(val, (dict, list)): 3167 df_cols_types[col] = 'json' 3168 elif isinstance(val, Decimal): 3169 df_cols_types[col] = 'numeric' 3170 elif isinstance(val, str): 3171 df_cols_types[col] = 'str' 3172 db_cols_types = { 3173 col: get_pd_type_from_db_type(str(typ.type)) 3174 for col, typ in table_obj.columns.items() 3175 } if table_obj is not None else { 3176 col: get_pd_type_from_db_type(typ) 3177 for col, typ in get_table_cols_types( 3178 pipe.target, 3179 self, 3180 schema=self.get_pipe_schema(pipe), 3181 debug=debug, 3182 ).items() 3183 } 3184 new_cols = set(df_cols_types) - set(db_cols_types) 3185 if not new_cols: 3186 return [] 3187 3188 new_cols_types = { 3189 col: get_db_type_from_pd_type( 3190 df_cols_types[col], 3191 self.flavor