meerschaum.connectors.sql
Subpackage for the `SQLConnector` subclass.
```python
class SQLConnector(Connector):
    """
    Connect to SQL databases via `sqlalchemy`.

    SQLConnectors may be used as Meerschaum instance connectors.
    Read more about connectors and instances at
    https://meerschaum.io/reference/connectors/
    """

    IS_INSTANCE: bool = True

    from ._create_engine import flavor_configs, create_engine
    from ._sql import (
        read,
        value,
        exec,
        execute,
        to_sql,
        exec_queries,
        get_connection,
        _cleanup_connections,
    )
    from meerschaum.utils.sql import test_connection
    from ._fetch import fetch, get_pipe_metadef
    from ._cli import cli, _cli_exit
    from ._pipes import (
        fetch_pipes_keys,
        create_indices,
        drop_indices,
        get_create_index_queries,
        get_drop_index_queries,
        get_add_columns_queries,
        get_alter_columns_queries,
        delete_pipe,
        get_pipe_data,
        get_pipe_data_query,
        register_pipe,
        edit_pipe,
        get_pipe_id,
        get_pipe_attributes,
        sync_pipe,
        sync_pipe_inplace,
        get_sync_time,
        pipe_exists,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        deduplicate_pipe,
        get_pipe_table,
        get_pipe_columns_types,
        get_to_sql_dtype,
        get_pipe_schema,
        create_pipe_table_from_df,
        get_pipe_columns_indices,
    )
    from ._plugins import (
        register_plugin,
        delete_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugins,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
    )
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri, parse_uri
    from ._instance import (
        _log_temporary_tables_creation,
        _drop_temporary_table,
        _drop_temporary_tables,
        _drop_old_temporary_tables,
    )

    def __init__(
        self,
        label: Optional[str] = None,
        flavor: Optional[str] = None,
        wait: bool = False,
        connect: bool = False,
        debug: bool = False,
        **kw: Any
    ):
        """
        Parameters
        ----------
        label: str, default 'main'
            The identifying label for the connector.
            E.g. for `sql:main`, 'main' is the label.

        flavor: Optional[str], default None
            The database flavor, e.g.
            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
            To see supported flavors, run the `bootstrap connectors` command.

        wait: bool, default False
            If `True`, block until a database connection has been made.

        connect: bool, default False
            If `True`, immediately attempt to connect to the database and raise
            a warning if the connection fails.

        debug: bool, default False
            Verbosity toggle.

        kw: Any
            All other arguments will be passed to the connector's attributes.
            Therefore, a connector may be made without being registered,
            as long as enough parameters are supplied to the constructor.
        """
        if 'uri' in kw:
            uri = kw['uri']
            if uri.startswith('postgres') and not uri.startswith('postgresql'):
                uri = uri.replace('postgres', 'postgresql', 1)
            if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
                uri = uri.replace('postgresql://', 'postgresql+psycopg://', 1)
            if uri.startswith('timescaledb://'):
                uri = uri.replace('timescaledb://', 'postgresql+psycopg://', 1)
                flavor = 'timescaledb'
            kw['uri'] = uri
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            _ = from_uri_params.pop('label', None)

            ### Sometimes the flavor may be provided with a URI.
            kw.update(from_uri_params)
            if flavor:
                kw['flavor'] = flavor

        ### set __dict__ in base class
        super().__init__(
            'sql',
            label = label or self.__dict__.get('label', None),
            **kw
        )

        if self.__dict__.get('flavor', None) == 'sqlite':
            self._reset_attributes()
            self._set_attributes(
                'sql',
                label = label,
                inherit_default = False,
                **kw
            )
            ### For backwards compatibility reasons, set the path for sql:local if it's missing.
            if self.label == 'local' and not self.__dict__.get('database', None):
                from meerschaum.config._paths import SQLITE_DB_PATH
                self.database = str(SQLITE_DB_PATH)

        ### ensure flavor and label are set accordingly
        if 'flavor' not in self.__dict__:
            if flavor is None and 'uri' not in self.__dict__:
                raise Exception(
                    f"Missing flavor. Provide flavor as a key for '{self}'."
                )
            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

        if self.flavor == 'postgres':
            self.flavor = 'postgresql'

        self._debug = debug
        ### Store the PID and thread at initialization
        ### so we can dispose of the Pool in child processes or threads.
        import os, threading
        self._pid = os.getpid()
        self._thread_ident = threading.current_thread().ident
        self._sessions = {}
        self._locks = {'_sessions': threading.RLock(), }

        ### verify the flavor's requirements are met
        if self.flavor not in self.flavor_configs:
            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
        if not self.__dict__.get('uri'):
            self.verify_attributes(
                self.flavor_configs[self.flavor].get('requirements', set()),
                debug=debug,
            )

        if wait:
            from meerschaum.connectors.poll import retry_connect
            retry_connect(connector=self, debug=debug)

        if connect:
            if not self.test_connection(debug=debug):
                warn(f"Failed to connect with connector '{self}'!", stack=False)

    @property
    def Session(self):
        if '_Session' not in self.__dict__:
            if self.engine is None:
                return None

            from meerschaum.utils.packages import attempt_import
            sqlalchemy_orm = attempt_import('sqlalchemy.orm')
            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
            self._Session = sqlalchemy_orm.scoped_session(session_factory)

        return self._Session

    @property
    def engine(self):
        """
        Return the SQLAlchemy engine connected to the configured database.
        """
        import os
        import threading
        if '_engine' not in self.__dict__:
            self._engine, self._engine_str = self.create_engine(include_uri=True)

        same_process = os.getpid() == self._pid
        same_thread = threading.current_thread().ident == self._thread_ident

        ### handle child processes
        if not same_process:
            self._pid = os.getpid()
            self._thread = threading.current_thread()
            warn("Different PID detected. Disposing of connections...")
            self._engine.dispose()

        ### handle different threads
        if not same_thread:
            if self.flavor == 'duckdb':
                warn("Different thread detected.")
                self._engine.dispose()

        return self._engine

    @property
    def DATABASE_URL(self) -> str:
        """
        Return the URI connection string (alias for `SQLConnector.URI`).
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def URI(self) -> str:
        """
        Return the URI connection string.
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def IS_THREAD_SAFE(self) -> bool:
        """
        Return whether this connector may be multithreaded.
        """
        if self.flavor in ('duckdb', 'oracle'):
            return False
        if self.flavor == 'sqlite':
            return ':memory:' not in self.URI
        return True

    @property
    def metadata(self):
        """
        Return the metadata bound to this configured schema.
        """
        from meerschaum.utils.packages import attempt_import
        sqlalchemy = attempt_import('sqlalchemy')
        if '_metadata' not in self.__dict__:
            self._metadata = sqlalchemy.MetaData(schema=self.schema)
        return self._metadata

    @property
    def instance_schema(self):
        """
        Return the schema name for Meerschaum tables.
        """
        return self.schema

    @property
    def internal_schema(self):
        """
        Return the schema name for internal tables.
        """
        from meerschaum.config.static import STATIC_CONFIG
        from meerschaum.utils.packages import attempt_import
        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
        schema_name = self.__dict__.get('internal_schema', None) or (
            STATIC_CONFIG['sql']['internal_schema']
            if self.flavor not in NO_SCHEMA_FLAVORS
            else self.schema
        )

        if '_internal_schema' not in self.__dict__:
            self._internal_schema = schema_name
        return self._internal_schema

    @property
    def db(self) -> Optional[databases.Database]:
        from meerschaum.utils.packages import attempt_import
        databases = attempt_import('databases', lazy=False, install=True)
        url = self.DATABASE_URL
        if 'mysql' in url:
            url = url.replace('+pymysql', '')
        if '_db' not in self.__dict__:
            try:
                self._db = databases.Database(url)
            except KeyError:
                ### Likely encountered an unsupported flavor.
                from meerschaum.utils.warnings import warn
                self._db = None
        return self._db

    @property
    def db_version(self) -> Union[str, None]:
        """
        Return the database version.
        """
        _db_version = self.__dict__.get('_db_version', None)
        if _db_version is not None:
            return _db_version

        from meerschaum.utils.sql import get_db_version
        self._db_version = get_db_version(self)
        return self._db_version

    @property
    def schema(self) -> Union[str, None]:
        """
        Return the default schema to use.
        A value of `None` will not prepend a schema.
        """
        if 'schema' in self.__dict__:
            return self.__dict__['schema']

        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
        if self.flavor in NO_SCHEMA_FLAVORS:
            self.__dict__['schema'] = None
            return None

        sqlalchemy = mrsm.attempt_import('sqlalchemy')
        _schema = sqlalchemy.inspect(self.engine).default_schema_name
        self.__dict__['schema'] = _schema
        return _schema

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __call__(self):
        return self
```
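As a quick orientation, here is a minimal sketch of constructing ad-hoc connectors from keyword arguments or a URI, per the constructor's docstring above; the labels, file path, and credentials are hypothetical:

```python
from meerschaum.connectors.sql import SQLConnector

# Build an ad-hoc connector without registering it
# (the label and database path are hypothetical).
conn = SQLConnector('temp', flavor='sqlite', database='/tmp/mrsm_example.db')

# Attributes (flavor, username, host, ...) may also be parsed from a URI.
conn2 = SQLConnector(uri='postgresql://user:pass@localhost:5432/db')

print(conn2.flavor)   # 'postgresql', parsed from the URI
print(conn2.URI)      # the constructed connection string (driver suffix added)
```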
`SQLConnector.create_engine`, from `meerschaum.connectors.sql._create_engine`:

```python
def create_engine(
    self,
    include_uri: bool = False,
    debug: bool = False,
    **kw
) -> 'sqlalchemy.engine.Engine':
    """Create a sqlalchemy engine by building the engine string."""
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import error, warn
    sqlalchemy = attempt_import('sqlalchemy')
    import urllib
    import copy
    ### Install and patch required drivers.
    if self.flavor in install_flavor_drivers:
        attempt_import(*install_flavor_drivers[self.flavor], debug=debug, lazy=False, warn=False)
        if self.flavor == 'mssql':
            pyodbc = attempt_import('pyodbc', debug=debug, lazy=False, warn=False)
            pyodbc.pooling = False
    if self.flavor in require_patching_flavors:
        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
        import pathlib
        for install_name, import_name in require_patching_flavors[self.flavor]:
            pkg = attempt_import(
                import_name,
                debug=debug,
                lazy=False,
                warn=False
            )
            _monkey_patch_get_distribution(
                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
            )

    ### supplement missing values with defaults (e.g. port number)
    for a, value in flavor_configs[self.flavor]['defaults'].items():
        if a not in self.__dict__:
            self.__dict__[a] = value

    ### Verify that everything is in order.
    if self.flavor not in flavor_configs:
        error(f"Cannot create a connector with the flavor '{self.flavor}'.")

    _engine = flavor_configs[self.flavor].get('engine', None)
    _username = self.__dict__.get('username', None)
    _password = self.__dict__.get('password', None)
    _host = self.__dict__.get('host', None)
    _port = self.__dict__.get('port', None)
    _database = self.__dict__.get('database', None)
    _options = self.__dict__.get('options', {})
    if isinstance(_options, str):
        _options = dict(urllib.parse.parse_qsl(_options))
    _uri = self.__dict__.get('uri', None)

    ### Handle registering specific dialects (due to installing in virtual environments).
    if self.flavor in flavor_dialects:
        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])

    ### self._sys_config was deepcopied and can be updated safely
    if self.flavor in ("sqlite", "duckdb"):
        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
        if 'create_engine' not in self._sys_config:
            self._sys_config['create_engine'] = {}
        if 'connect_args' not in self._sys_config['create_engine']:
            self._sys_config['create_engine']['connect_args'] = {}
        self._sys_config['create_engine']['connect_args'].update({"check_same_thread": False})
    else:
        engine_str = (
            _engine + "://" + (_username if _username is not None else '') +
            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
            (("/" + _database) if _database is not None else '')
            + (("?" + urllib.parse.urlencode(_options)) if _options else '')
        ) if not _uri else _uri

    ### Sometimes the timescaledb:// flavor can slip in.
    if _uri and self.flavor in ('timescaledb',) and self.flavor in _uri:
        engine_str = engine_str.replace(f'{self.flavor}', 'postgresql', 1)

    if debug:
        dprint(
            (
                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
                if _password is not None else engine_str
            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
        )

    _kw_copy = copy.deepcopy(kw)

    ### NOTE: Order of inheritance:
    ###   1. Defaults
    ###   2. System configuration
    ###   3. Connector configuration
    ###   4. Keyword arguments
    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
    def _apply_create_engine_args(update):
        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
            _create_engine_args.update(
                { k: v for k, v in update.items()
                    if 'omit_create_engine' not in flavor_configs[self.flavor]
                    or k not in flavor_configs[self.flavor].get('omit_create_engine')
                }
            )
    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
    _apply_create_engine_args(_kw_copy)

    try:
        engine = sqlalchemy.create_engine(
            engine_str,
            ### I know this looks confusing, and maybe it's bad code,
            ### but it's simple. It dynamically parses the config string
            ### and splits it to separate the class name (QueuePool)
            ### from the module name (sqlalchemy.pool).
            poolclass = getattr(
                attempt_import(
                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
                ),
                self._sys_config['poolclass'].split('.')[-1]
            ),
            echo = debug,
            **_create_engine_args
        )
    except Exception as e:
        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
        engine = None

    if include_uri:
        return engine, engine_str
    return engine
```
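For the non-SQLite branch above, the engine string follows the usual SQLAlchemy URL shape, `engine://user:password@host:port/database?options`. A standalone sketch of that assembly rule with hypothetical credentials (not a call into the library itself):

```python
import urllib.parse

# Hypothetical connector attributes, mirroring the fields create_engine() reads.
_engine, _username, _password = 'postgresql+psycopg', 'mrsm', 'p@ss w0rd'
_host, _port, _database = 'localhost', 5432, 'meerschaum'
_options = {'connect_timeout': '10'}

engine_str = (
    _engine + "://" + _username
    + ":" + urllib.parse.quote_plus(_password)   # special characters must be URL-encoded
    + "@" + _host + ":" + str(_port)
    + "/" + _database
    + "?" + urllib.parse.urlencode(_options)
)
print(engine_str)
# postgresql+psycopg://mrsm:p%40ss+w0rd@localhost:5432/meerschaum?connect_timeout=10
```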
`SQLConnector.read`, from `meerschaum.connectors.sql._sql`:

```python
def read(
    self,
    query_or_table: Union[str, sqlalchemy.Query],
    params: Union[Dict[str, Any], List[str], None] = None,
    dtype: Optional[Dict[str, Any]] = None,
    coerce_float: bool = True,
    chunksize: Optional[int] = -1,
    workers: Optional[int] = None,
    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
    as_hook_results: bool = False,
    chunks: Optional[int] = None,
    schema: Optional[str] = None,
    as_chunks: bool = False,
    as_iterator: bool = False,
    as_dask: bool = False,
    index_col: Optional[str] = None,
    silent: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[
    pandas.DataFrame,
    dask.DataFrame,
    List[pandas.DataFrame],
    List[Any],
    None,
]:
    """
    Read a SQL query or table into a pandas dataframe.

    Parameters
    ----------
    query_or_table: Union[str, sqlalchemy.Query]
        The SQL query (sqlalchemy Query or string) or name of the table from which to select.

    params: Optional[Dict[str, Any]], default None
        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html

    dtype: Optional[Dict[str, Any]], default None
        A dictionary of data types to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html

    chunksize: Optional[int], default -1
        How many chunks to read at a time. `None` will read everything in one large chunk.
        Defaults to system configuration.

        **NOTE:** DuckDB does not allow for chunking.

    workers: Optional[int], default None
        How many threads to use when consuming the generator.
        Only applies if `chunk_hook` is provided.

    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
        See `--sync-chunks` for an example.
        **NOTE:** `as_iterator` MUST be `False` (default).

    as_hook_results: bool, default False
        If `True`, return a `List` of the outputs of the hook function.
        Only applicable if `chunk_hook` is not None.

        **NOTE:** `as_iterator` MUST be `False` (default).

    chunks: Optional[int], default None
        Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
        return into a single dataframe.
        For example, to limit the returned dataframe to 100,000 rows,
        you could specify a `chunksize` of `1000` and `chunks` of `100`.

    schema: Optional[str], default None
        If just a table name is provided, optionally specify the table schema.
        Defaults to `SQLConnector.schema`.

    as_chunks: bool, default False
        If `True`, return a list of DataFrames.
        Otherwise return a single DataFrame.

    as_iterator: bool, default False
        If `True`, return the pandas DataFrame iterator.
        `chunksize` must not be `None` (falls back to 1000 if so),
        and hooks are not called in this case.

    index_col: Optional[str], default None
        If using Dask, use this column as the index column.
        If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.

    silent: bool, default False
        If `True`, don't raise warnings in case of errors.

    Returns
    -------
    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
    or `None` if something breaks.
    """
    if chunks is not None and chunks <= 0:
        return []
    from meerschaum.utils.sql import sql_item_name, truncate_item_name
    from meerschaum.utils.dtypes import are_dtypes_equal, coerce_timezone
    from meerschaum.utils.dtypes.sql import NUMERIC_PRECISION_FLAVORS, TIMEZONE_NAIVE_FLAVORS
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.pool import get_pool
    from meerschaum.utils.dataframe import chunksize_to_npartitions, get_numeric_cols
    import warnings
    import traceback
    from decimal import Decimal
    pd = import_pandas()
    dd = None
    is_dask = 'dask' in pd.__name__
    pandas = attempt_import('pandas')
    is_dask = dd is not None
    npartitions = chunksize_to_npartitions(chunksize)
    if is_dask:
        chunksize = None
    schema = schema or self.schema
    utc_dt_cols = [
        col
        for col, typ in dtype.items()
        if are_dtypes_equal(typ, 'datetime') and 'utc' in typ.lower()
    ] if dtype else []

    if dtype and utc_dt_cols and self.flavor in TIMEZONE_NAIVE_FLAVORS:
        dtype = dtype.copy()
        for col in utc_dt_cols:
            dtype[col] = 'datetime64[ns]'

    pool = get_pool(workers=workers)
    sqlalchemy = attempt_import("sqlalchemy")
    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is None and as_iterator:
        if not silent and self.flavor not in _disallow_chunks_flavors:
            warn(
                "An iterator may only be generated if chunksize is not None.\n"
                + "Falling back to a chunksize of 1000.", stacklevel=3,
            )
        chunksize = 1000
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel=3,
                )
            chunksize = _max_chunks_flavors[self.flavor]

    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
        chunksize = None

    if debug:
        import time
        start = time.perf_counter()
        dprint(f"[{self}]\n{query_or_table}")
        dprint(f"[{self}] Fetching with chunksize: {chunksize}")

    ### This might be sqlalchemy object or the string of a table name.
    ### We check for spaces and quotes to see if it might be a weird table.
    if (
        ' ' not in str(query_or_table)
        or (
            ' ' in str(query_or_table)
            and str(query_or_table).startswith('"')
            and str(query_or_table).endswith('"')
        )
    ):
        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
        if truncated_table_name != str(query_or_table) and not silent:
            warn(
                f"Table '{query_or_table}' is too long for '{self.flavor}',"
                + f" will instead read the table '{truncated_table_name}'."
            )

        query_or_table = sql_item_name(str(query_or_table), self.flavor, schema)
        if debug:
            dprint(f"[{self}] Reading from table {query_or_table}")
        formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table))
        str_query = f"SELECT * FROM {query_or_table}"
    else:
        str_query = query_or_table

    formatted_query = (
        sqlalchemy.text(str_query)
        if not is_dask and isinstance(str_query, str)
        else format_sql_query_for_dask(str_query)
    )

    chunk_list = []
    chunk_hook_results = []
    def _process_chunk(_chunk, _retry_on_failure: bool = True):
        if self.flavor in TIMEZONE_NAIVE_FLAVORS:
            for col in utc_dt_cols:
                _chunk[col] = coerce_timezone(_chunk[col], strip_timezone=False)
        if not as_hook_results:
            chunk_list.append(_chunk)
        if chunk_hook is None:
            return None

        result = None
        try:
            result = chunk_hook(
                _chunk,
                workers=workers,
                chunksize=chunksize,
                debug=debug,
                **kw
            )
        except Exception:
            result = False, traceback.format_exc()
            from meerschaum.utils.formatting import get_console
            if not silent:
                get_console().print_exception()

        ### If the chunk fails to process, try it again one more time.
        if isinstance(result, tuple) and result[0] is False:
            if _retry_on_failure:
                return _process_chunk(_chunk, _retry_on_failure=False)

        return result

    try:
        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')

            read_sql_query_kwargs = {
                'params': params,
                'dtype': dtype,
                'coerce_float': coerce_float,
                'index_col': index_col,
            }
            if is_dask:
                if index_col is None:
                    dd = None
                    pd = attempt_import('pandas')
                    read_sql_query_kwargs.update({
                        'chunksize': chunksize,
                    })
            else:
                read_sql_query_kwargs.update({
                    'chunksize': chunksize,
                })

            if is_dask and dd is not None:
                ddf = dd.read_sql_query(
                    formatted_query,
                    self.URI,
                    **read_sql_query_kwargs
                )
            else:

                def get_chunk_generator(connectable):
                    chunk_generator = pd.read_sql_query(
                        formatted_query,
                        self.engine,
                        **read_sql_query_kwargs
                    )
                    to_return = (
                        chunk_generator
                        if as_iterator or chunksize is None
                        else (
                            list(pool.imap(_process_chunk, chunk_generator))
                            if as_hook_results
                            else None
                        )
                    )
                    return chunk_generator, to_return

                if self.flavor in SKIP_READ_TRANSACTION_FLAVORS:
                    chunk_generator, to_return = get_chunk_generator(self.engine)
                else:
                    with self.engine.begin() as transaction:
                        with transaction.execution_options(stream_results=stream_results) as connection:
                            chunk_generator, to_return = get_chunk_generator(connection)

                if to_return is not None:
                    return to_return

    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
        if not silent:
            warn(str(e), stacklevel=3)
        from meerschaum.utils.formatting import get_console
        if not silent:
            get_console().print_exception()

        return None

    if is_dask and dd is not None:
        ddf = ddf.reset_index()
        return ddf

    chunk_list = []
    read_chunks = 0
    chunk_hook_results = []
    if chunksize is None:
        chunk_list.append(chunk_generator)
    elif as_iterator:
        return chunk_generator
    else:
        try:
            for chunk in chunk_generator:
                if chunk_hook is not None:
                    chunk_hook_results.append(
                        chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                    )
                chunk_list.append(chunk)
                read_chunks += 1
                if chunks is not None and read_chunks >= chunks:
                    break
        except Exception as e:
            warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
            from meerschaum.utils.formatting import get_console
            if not silent:
                get_console().print_exception()

            read_chunks = 0
            try:
                for chunk in chunk_generator:
                    if chunk_hook is not None:
                        chunk_hook_results.append(
                            chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                        )
                    chunk_list.append(chunk)
                    read_chunks += 1
                    if chunks is not None and read_chunks >= chunks:
                        break
            except Exception as e:
                warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
                from meerschaum.utils.formatting import get_console
                if not silent:
                    get_console().print_exception()

                return None

    ### If no chunks returned, read without chunks
    ### to get columns
    if len(chunk_list) == 0:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            _ = read_sql_query_kwargs.pop('chunksize', None)
            with self.engine.begin() as connection:
                chunk_list.append(
                    pd.read_sql_query(
                        formatted_query,
                        connection,
                        **read_sql_query_kwargs
                    )
                )

    ### call the hook on any missed chunks.
    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
        for c in chunk_list[len(chunk_hook_results):]:
            chunk_hook_results.append(
                chunk_hook(c, chunksize=chunksize, debug=debug, **kw)
            )

    ### chunksize is not None so must iterate
    if debug:
        end = time.perf_counter()
        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")

    if as_hook_results:
        return chunk_hook_results

    ### Skip `pd.concat()` if `as_chunks` is specified.
    if as_chunks:
        for c in chunk_list:
            c.reset_index(drop=True, inplace=True)
            for col in get_numeric_cols(c):
                c[col] = c[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
        return chunk_list

    df = pd.concat(chunk_list).reset_index(drop=True)
    ### NOTE: The calls to `canonical()` are to drop leading and trailing zeroes.
    for col in get_numeric_cols(df):
        df[col] = df[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)

    return df
```
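A minimal usage sketch, assuming `conn` is a reachable `SQLConnector` and a table named `weather` exists (both hypothetical):

```python
# Read a whole table (the name is quoted and schema-qualified internally).
df = conn.read('weather')

# Bind parameters are forwarded to pandas.read_sql(); string queries are
# wrapped in sqlalchemy.text(), so named binds use the ':param' style.
df = conn.read(
    "SELECT * FROM weather WHERE station = :station",
    params={'station': 'KATL'},
)

# Stream chunks through a hook instead of holding the full result in memory.
# The hook must accept extra keyword arguments (workers, chunksize, debug).
row_counts = conn.read(
    'weather',
    chunksize=10_000,
    chunk_hook=lambda chunk, **kw: len(chunk),
    as_hook_results=True,
)
```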
`SQLConnector.value`, from `meerschaum.connectors.sql._sql`:

```python
def value(
    self,
    query: str,
    *args: Any,
    use_pandas: bool = False,
    **kw: Any
) -> Any:
    """
    Execute the provided query and return the first value.

    Parameters
    ----------
    query: str
        The SQL query to execute.

    *args: Any
        The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
        if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.

    use_pandas: bool, default False
        If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use
        `meerschaum.connectors.sql.SQLConnector.exec` (default).
        **NOTE:** This is always `True` for DuckDB.

    **kw: Any
        See `args`.

    Returns
    -------
    Any value returned from the query.
    """
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    if self.flavor == 'duckdb':
        use_pandas = True
    if use_pandas:
        try:
            return self.read(query, *args, **kw).iloc[0, 0]
        except Exception:
            return None

    _close = kw.get('close', True)
    _commit = kw.get('commit', (self.flavor != 'mssql'))

    try:
        result, connection = self.exec(
            query,
            *args,
            with_connection=True,
            close=False,
            commit=_commit,
            **kw
        )
        first = result.first() if result is not None else None
        _val = first[0] if first is not None else None
    except Exception as e:
        warn(e, stacklevel=3)
        return None
    if _close:
        try:
            connection.close()
        except Exception as e:
            warn("Failed to close connection with exception:\n" + str(e))
    return _val
```
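For example, fetching a scalar (the table name is hypothetical):

```python
# Returns the first column of the first row, or None on failure.
row_count = conn.value("SELECT COUNT(*) FROM weather")
```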
`SQLConnector.exec`, from `meerschaum.connectors.sql._sql`:

```python
def exec(
    self,
    query: str,
    *args: Any,
    silent: bool = False,
    debug: bool = False,
    commit: Optional[bool] = None,
    close: Optional[bool] = None,
    with_connection: bool = False,
    _connection=None,
    _transaction=None,
    **kw: Any
) -> Union[
    sqlalchemy.engine.result.resultProxy,
    sqlalchemy.engine.cursor.LegacyCursorResult,
    Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
    Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
    None
]:
    """
    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.

    If inserting data, please use bind variables to avoid SQL injection!

    Parameters
    ----------
    query: Union[str, List[str], Tuple[str]]
        The query to execute.
        If `query` is a list or tuple, call `self.exec_queries()` instead.

    args: Any
        Arguments passed to `sqlalchemy.engine.execute`.

    silent: bool, default False
        If `True`, suppress warnings.

    commit: Optional[bool], default None
        If `True`, commit the changes after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    close: Optional[bool], default None
        If `True`, close the connection after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    with_connection: bool, default False
        If `True`, return a tuple including the connection object.
        This does not apply if `query` is a list of strings.

    Returns
    -------
    The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
    """
    if isinstance(query, (list, tuple)):
        return self.exec_queries(
            list(query),
            *args,
            silent=silent,
            debug=debug,
            **kw
        )

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import("sqlalchemy")
    if debug:
        dprint(f"[{self}] Executing query:\n{query}")

    _close = close if close is not None else (self.flavor != 'mssql')
    _commit = commit if commit is not None else (
        (self.flavor != 'mssql' or 'select' not in str(query).lower())
    )

    ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+).
    if not hasattr(query, 'compile'):
        query = sqlalchemy.text(query)

    connection = _connection if _connection is not None else self.get_connection()

    try:
        transaction = (
            _transaction
            if _transaction is not None else (
                connection.begin()
                if _commit
                else None
            )
        )
    except sqlalchemy.exc.InvalidRequestError as e:
        if _connection is not None or _transaction is not None:
            raise e
        connection = self.get_connection(rebuild=True)
        transaction = connection.begin()

    if transaction is not None and not transaction.is_active and _transaction is not None:
        connection = self.get_connection(rebuild=True)
        transaction = connection.begin() if _commit else None

    result = None
    try:
        result = connection.execute(query, *args, **kw)
        if _commit:
            transaction.commit()
    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{e}")
        if not silent:
            warn(str(e), stacklevel=3)
        result = None
        if _commit:
            transaction.rollback()
            connection = self.get_connection(rebuild=True)
    finally:
        if _close:
            connection.close()

    if with_connection:
        return result, connection

    return result
```
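Per the docstring's advice about bind variables, a usage sketch (the table and values are hypothetical):

```python
import sqlalchemy

# Named binds keep user-supplied values out of the SQL string itself.
insert_query = sqlalchemy.text(
    "INSERT INTO weather (station, temp_f) VALUES (:station, :temp_f)"
)
result = conn.exec(insert_query, {'station': 'KATL', 'temp_f': 88.1})
```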
`SQLConnector.execute`, from `meerschaum.connectors.sql._sql`:

```python
def execute(
    self,
    *args: Any,
    **kw: Any
) -> Optional[sqlalchemy.engine.result.resultProxy]:
    """
    An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
    """
    return self.exec(*args, **kw)
```
`SQLConnector.to_sql`, from `meerschaum.connectors.sql._sql`:

```python
def to_sql(
    self,
    df: pandas.DataFrame,
    name: str = None,
    index: bool = False,
    if_exists: str = 'replace',
    method: str = "",
    chunksize: Optional[int] = -1,
    schema: Optional[str] = None,
    silent: bool = False,
    debug: bool = False,
    as_tuple: bool = False,
    as_dict: bool = False,
    _connection=None,
    _transaction=None,
    **kw
) -> Union[bool, SuccessTuple]:
    """
    Upload a DataFrame's contents to the SQL server.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be uploaded.

    name: str
        The name of the table to be created.

    index: bool, default False
        If `True`, creates the DataFrame's indices as columns.

    if_exists: str, default 'replace'
        Drop and create the table ('replace') or append if it exists
        ('append') or raise Exception ('fail').
        Options are ['replace', 'append', 'fail'].

    method: str, default ''
        None or 'multi'. See `pandas.to_sql()` for details.

    chunksize: Optional[int], default -1
        How many rows to insert at a time.

    schema: Optional[str], default None
        Optionally override the schema for the table.
        Defaults to `SQLConnector.schema`.

    as_tuple: bool, default False
        If `True`, return a (success_bool, message) tuple instead of a `bool`.

    as_dict: bool, default False
        If `True`, return a dictionary of transaction information.
        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
        `method`, and `target`.

    kw: Any
        Additional arguments will be passed to the DataFrame's `to_sql` function.

    Returns
    -------
    Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
    """
    import time
    import json
    import decimal
    from decimal import Decimal, Context
    from meerschaum.utils.warnings import error, warn
    import warnings
    import functools
    if name is None:
        error(f"Name must not be `None` to insert data into {self}.")

    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
    kw.pop('name', None)

    schema = schema or self.schema

    from meerschaum.utils.sql import (
        sql_item_name,
        table_exists,
        json_flavors,
        truncate_item_name,
        DROP_IF_EXISTS_FLAVORS,
    )
    from meerschaum.utils.dataframe import get_json_cols, get_numeric_cols, get_uuid_cols
    from meerschaum.utils.dtypes import are_dtypes_equal, quantize_decimal, coerce_timezone
    from meerschaum.utils.dtypes.sql import (
        NUMERIC_PRECISION_FLAVORS,
        PD_TO_SQLALCHEMY_DTYPES_FLAVORS,
        get_db_type_from_pd_type,
    )
    from meerschaum.connectors.sql._create_engine import flavor_configs
    from meerschaum.utils.packages import attempt_import, import_pandas
    sqlalchemy = attempt_import('sqlalchemy', debug=debug)
    pd = import_pandas()
    is_dask = 'dask' in df.__module__

    stats = {'target': name, }
    ### resort to defaults if None
    if method == "":
        if self.flavor in _bulk_flavors:
            method = functools.partial(psql_insert_copy, schema=self.schema)
        else:
            ### Should resolve to 'multi' or `None`.
            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')
    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)

    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel = 3,
                )
            chunksize = _max_chunks_flavors[self.flavor]
    stats['chunksize'] = chunksize

    success, msg = False, "Default to_sql message"
    start = time.perf_counter()
    if debug:
        msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..."
        print(msg, end="", flush=True)
    stats['num_rows'] = len(df)

    ### Check if the name is too long.
    truncated_name = truncate_item_name(name, self.flavor)
    if name != truncated_name:
        warn(
            f"Table '{name}' is too long for '{self.flavor}',"
            + f" will instead create the table '{truncated_name}'."
        )

    ### filter out non-pandas args
    import inspect
    to_sql_params = inspect.signature(df.to_sql).parameters
    to_sql_kw = {}
    for k, v in kw.items():
        if k in to_sql_params:
            to_sql_kw[k] = v

    to_sql_kw.update({
        'name': truncated_name,
        'schema': schema,
        ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI),
        'index': index,
        'if_exists': if_exists,
        'method': method,
        'chunksize': chunksize,
    })
    if is_dask:
        to_sql_kw.update({
            'parallel': True,
        })
    elif _connection is not None:
        to_sql_kw['con'] = _connection

    if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
    if self.flavor == 'oracle':
        ### For some reason 'replace' doesn't work properly in pandas,
        ### so try dropping first.
        if if_exists == 'replace' and table_exists(name, self, schema=schema, debug=debug):
            success = self.exec(
                f"DROP TABLE {if_exists_str}" + sql_item_name(name, 'oracle', schema)
            ) is not None
            if not success:
                warn(f"Unable to drop {name}")

        ### Enforce NVARCHAR(2000) as text instead of CLOB.
        dtype = to_sql_kw.get('dtype', {})
        for col, typ in df.dtypes.items():
            if are_dtypes_equal(str(typ), 'object'):
                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
            elif are_dtypes_equal(str(typ), 'int'):
                dtype[col] = sqlalchemy.types.INTEGER
        to_sql_kw['dtype'] = dtype
    elif self.flavor == 'duckdb':
        dtype = to_sql_kw.get('dtype', {})
        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
        for col in dt_cols:
            df[col] = coerce_timezone(df[col], strip_utc=False)
    elif self.flavor == 'mssql':
        dtype = to_sql_kw.get('dtype', {})
        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
        new_dtype = {}
        for col in dt_cols:
            if col in dtype:
                continue
            dt_typ = get_db_type_from_pd_type(str(df.dtypes[col]), self.flavor, as_sqlalchemy=True)
            if col not in dtype:
                new_dtype[col] = dt_typ

        dtype.update(new_dtype)
        to_sql_kw['dtype'] = dtype

    ### Check for JSON columns.
    if self.flavor not in json_flavors:
        json_cols = get_json_cols(df)
        if json_cols:
            for col in json_cols:
                df[col] = df[col].apply(
                    (
                        lambda x: json.dumps(x, default=str, sort_keys=True)
                        if not isinstance(x, Hashable)
                        else x
                    )
                )

    ### Check for numeric columns.
    numeric_scale, numeric_precision = NUMERIC_PRECISION_FLAVORS.get(self.flavor, (None, None))
    if numeric_precision is not None and numeric_scale is not None:
        numeric_cols = get_numeric_cols(df)
        for col in numeric_cols:
            df[col] = df[col].apply(
                lambda x: (
                    quantize_decimal(x, numeric_scale, numeric_precision)
                    if isinstance(x, Decimal)
                    else x
                )
            )

    if PD_TO_SQLALCHEMY_DTYPES_FLAVORS['uuid'].get(self.flavor, None) != 'Uuid':
        uuid_cols = get_uuid_cols(df)
        for col in uuid_cols:
            df[col] = df[col].astype(str)

    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            df.to_sql(**to_sql_kw)
        success = True
    except Exception as e:
        if not silent:
            warn(str(e))
        success, msg = False, str(e)

    end = time.perf_counter()
    if success:
        msg = f"It took {round(end - start, 2)} seconds to sync {len(df)} rows to {name}."
    stats['start'] = start
    stats['end'] = end
    stats['duration'] = end - start

    if debug:
        print(f" done.", flush=True)
        dprint(msg)

    stats['success'] = success
    stats['msg'] = msg
    if as_tuple:
        return success, msg
    if as_dict:
        return stats
    return success
```
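A usage sketch, assuming `conn` is reachable; the table name is hypothetical:

```python
import pandas as pd

df = pd.DataFrame({'station': ['KATL', 'KCLT'], 'temp_f': [88.1, 84.3]})

# Append rows and get a (success, message) tuple instead of a bare bool.
success, msg = conn.to_sql(df, name='weather', if_exists='append', as_tuple=True)
print(success, msg)
```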
`SQLConnector.exec_queries`, from `meerschaum.connectors.sql._sql`:

```python
def exec_queries(
    self,
    queries: List[
        Union[
            str,
            Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]
        ]
    ],
    break_on_error: bool = False,
    rollback: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> List[sqlalchemy.engine.cursor.LegacyCursorResult]:
    """
    Execute a list of queries in a single transaction.

    Parameters
    ----------
    queries: List[Union[str, Tuple[str, Callable[[], List[str]]]]]
        The queries in the transaction to be executed.
        If a query is a tuple, the second item of the tuple
        will be considered a callable hook that returns a list of queries to be executed
        before the next item in the list.

    break_on_error: bool, default False
        If `True`, stop executing when a query fails.

    rollback: bool, default True
        If `break_on_error` is `True`, rollback the transaction if a query fails.

    silent: bool, default False
        If `True`, suppress warnings.

    Returns
    -------
    A list of SQLAlchemy results.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    sqlalchemy, sqlalchemy_orm = attempt_import('sqlalchemy', 'sqlalchemy.orm')
    session = sqlalchemy_orm.Session(self.engine)

    result = None
    results = []
    with session.begin():
        for query in queries:
            hook = None
            result = None

            if isinstance(query, tuple):
                query, hook = query
            if isinstance(query, str):
                query = sqlalchemy.text(query)

            if debug:
                dprint(f"[{self}]\n" + str(query))

            try:
                result = session.execute(query)
                session.flush()
            except Exception as e:
                msg = (f"Encountered error while executing:\n{e}")
                if not silent:
                    warn(msg)
                elif debug:
                    dprint(f"[{self}]\n" + str(msg))
                result = None
            if result is None and break_on_error:
                if rollback:
                    session.rollback()
                break
            elif result is not None and hook is not None:
                hook_queries = hook(session)
                if hook_queries:
                    hook_results = self.exec_queries(
                        hook_queries,
                        break_on_error = break_on_error,
                        rollback=rollback,
                        silent=silent,
                        debug=debug,
                    )
                    result = (result, hook_results)

            results.append(result)

    return results
```
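A sketch of the tuple-hook form (the DDL is hypothetical); the hook runs after its paired query succeeds and may return follow-up queries:

```python
queries = [
    "CREATE TABLE tmp_readings (id INT)",
    (
        "INSERT INTO tmp_readings (id) VALUES (1)",
        # Receives the live session; its returned queries run before the next item.
        lambda session: ["INSERT INTO tmp_readings (id) VALUES (2)"],
    ),
]
results = conn.exec_queries(queries, break_on_error=True)
```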
Execute a list of queries in a single transaction.
Parameters
- queries (List[Union[str, Tuple[str, Callable[[], List[str]]]]]): The queries in the transaction to be executed. If a query is a tuple, the second item of the tuple will be considered a callable hook that returns a list of queries to be executed before the next item in the list.
- break_on_error (bool, default False): If `True`, stop executing when a query fails.
- rollback (bool, default True): If `break_on_error` is `True`, roll back the transaction when a query fails.
- silent (bool, default False): If `True`, suppress warnings.

Returns
- A list of SQLAlchemy results.
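A minimal sketch of a transactional batch with a hook (the connector label and table name are illustrative; the hook receives the active session and returns follow-up queries):

```python
import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
results = conn.exec_queries(
    [
        "CREATE TABLE IF NOT EXISTS example_tbl (id INT)",
        (
            "INSERT INTO example_tbl (id) VALUES (1)",
            # The hook receives the session and returns more queries to run.
            lambda session: ["UPDATE example_tbl SET id = 2"],
        ),
    ],
    break_on_error=True,
)
```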
1065def get_connection(self, rebuild: bool = False) -> 'sqlalchemy.engine.base.Connection': 1066 """ 1067 Return the current alive connection. 1068 1069 Parameters 1070 ---------- 1071 rebuild: bool, default False 1072 If `True`, close the previous connection and open a new one. 1073 1074 Returns 1075 ------- 1076 A `sqlalchemy.engine.base.Connection` object. 1077 """ 1078 import threading 1079 if '_thread_connections' not in self.__dict__: 1080 self.__dict__['_thread_connections'] = {} 1081 1082 self._cleanup_connections() 1083 1084 thread_id = threading.get_ident() 1085 1086 thread_connections = self.__dict__.get('_thread_connections', {}) 1087 connection = thread_connections.get(thread_id, None) 1088 1089 if rebuild and connection is not None: 1090 try: 1091 connection.close() 1092 except Exception: 1093 pass 1094 1095 _ = thread_connections.pop(thread_id, None) 1096 connection = None 1097 1098 if connection is None or connection.closed: 1099 connection = self.engine.connect() 1100 thread_connections[thread_id] = connection 1101 1102 return connection
Return the current alive connection.
Parameters
- rebuild (bool, default False): If `True`, close the previous connection and open a new one.

Returns
- A `sqlalchemy.engine.base.Connection` object.
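A brief sketch (assuming `sql:main`); connections are cached per thread, so `rebuild=True` forces a fresh one:

```python
import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
connection = conn.get_connection()
if connection.closed:
    # Close the stale connection and open a new one.
    connection = conn.get_connection(rebuild=True)
```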
642def test_connection( 643 self, 644 **kw: Any 645) -> Union[bool, None]: 646 """ 647 Test if a successful connection to the database may be made. 648 649 Parameters 650 ---------- 651 **kw: 652 The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`. 653 654 Returns 655 ------- 656 `True` if a connection is made, otherwise `False` or `None` in case of failure. 657 658 """ 659 import warnings 660 from meerschaum.connectors.poll import retry_connect 661 _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self} 662 _default_kw.update(kw) 663 with warnings.catch_warnings(): 664 warnings.filterwarnings('ignore', 'Could not') 665 try: 666 return retry_connect(**_default_kw) 667 except Exception as e: 668 return False
Test if a successful connection to the database may be made.
Parameters
- **kw: The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

Returns
- `True` if a connection is made, otherwise `False` or `None` in case of failure.
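A sketch of a pre-flight connectivity check; keyword arguments such as `max_retries` and `retry_wait` are forwarded to `retry_connect`:

```python
import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
# Try up to 3 times with a 1-second wait between attempts.
if not conn.test_connection(max_retries=3, retry_wait=1):
    print(f"Could not connect with '{conn}'.")
```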
17def fetch( 18 self, 19 pipe: mrsm.Pipe, 20 begin: Union[datetime, int, str, None] = '', 21 end: Union[datetime, int, str, None] = None, 22 check_existing: bool = True, 23 chunk_hook: Optional[Callable[['pd.DataFrame'], Any]] = None, 24 chunksize: Optional[int] = -1, 25 workers: Optional[int] = None, 26 debug: bool = False, 27 **kw: Any 28) -> Union['pd.DataFrame', List[Any], None]: 29 """Execute the SQL definition and return a Pandas DataFrame. 30 31 Parameters 32 ---------- 33 pipe: mrsm.Pipe 34 The pipe object which contains the `fetch` metadata. 35 36 - pipe.columns['datetime']: str 37 - Name of the datetime column for the remote table. 38 - pipe.parameters['fetch']: Dict[str, Any] 39 - Parameters necessary to execute a query. 40 - pipe.parameters['fetch']['definition']: str 41 - Raw SQL query to execute to generate the pandas DataFrame. 42 - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float] 43 - How many minutes before `begin` to search for data (*optional*). 44 45 begin: Union[datetime, int, str, None], default None 46 Most recent datatime to search for data. 47 If `backtrack_minutes` is provided, subtract `backtrack_minutes`. 48 49 end: Union[datetime, int, str, None], default None 50 The latest datetime to search for data. 51 If `end` is `None`, do not bound 52 53 check_existing: bool, defult True 54 If `False`, use a backtrack interval of 0 minutes. 55 56 chunk_hook: Callable[[pd.DataFrame], Any], default None 57 A function to pass to `SQLConnector.read()` that accepts a Pandas DataFrame. 58 59 chunksize: Optional[int], default -1 60 How many rows to load into memory at once (when `chunk_hook` is provided). 61 Otherwise the entire result set is loaded into memory. 62 63 workers: Optional[int], default None 64 How many threads to use when consuming the generator (when `chunk_hook is provided). 65 Defaults to the number of cores. 66 67 debug: bool, default False 68 Verbosity toggle. 69 70 Returns 71 ------- 72 A pandas DataFrame or `None`. 73 If `chunk_hook` is not None, return a list of the hook function's results. 74 """ 75 meta_def = self.get_pipe_metadef( 76 pipe, 77 begin=begin, 78 end=end, 79 check_existing=check_existing, 80 debug=debug, 81 **kw 82 ) 83 as_hook_results = chunk_hook is not None 84 chunks = self.read( 85 meta_def, 86 chunk_hook=chunk_hook, 87 as_hook_results=as_hook_results, 88 chunksize=chunksize, 89 workers=workers, 90 debug=debug, 91 ) 92 ### if sqlite, parse for datetimes 93 if not as_hook_results and self.flavor == 'sqlite': 94 from meerschaum.utils.dataframe import parse_df_datetimes 95 from meerschaum.utils.dtypes import are_dtypes_equal 96 ignore_cols = [ 97 col 98 for col, dtype in pipe.dtypes.items() 99 if not are_dtypes_equal(str(dtype), 'datetime') 100 ] 101 return ( 102 parse_df_datetimes( 103 chunk, 104 ignore_cols=ignore_cols, 105 strip_timezone=(pipe.tzinfo is None), 106 debug=debug, 107 ) 108 for chunk in chunks 109 ) 110 return chunks
Execute the SQL definition and return a Pandas DataFrame.
Parameters
- pipe (mrsm.Pipe): The pipe object which contains the `fetch` metadata.
  - pipe.columns['datetime'] (str): Name of the datetime column for the remote table.
  - pipe.parameters['fetch'] (Dict[str, Any]): Parameters necessary to execute a query.
  - pipe.parameters['fetch']['definition'] (str): Raw SQL query to execute to generate the pandas DataFrame.
  - pipe.parameters['fetch']['backtrack_minutes'] (Union[int, float]): How many minutes before `begin` to search for data (optional).
- begin (Union[datetime, int, str, None], default None): Most recent datetime to search for data. If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
- end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If `end` is `None`, do not bound the query.
- check_existing (bool, default True): If `False`, use a backtrack interval of 0 minutes.
- chunk_hook (Callable[[pd.DataFrame], Any], default None): A function to pass to `SQLConnector.read()` that accepts a Pandas DataFrame.
- chunksize (Optional[int], default -1): How many rows to load into memory at once (when `chunk_hook` is provided). Otherwise the entire result set is loaded into memory.
- workers (Optional[int], default None): How many threads to use when consuming the generator (when `chunk_hook` is provided). Defaults to the number of cores.
- debug (bool, default False): Verbosity toggle.

Returns
- A pandas DataFrame or `None`. If `chunk_hook` is not `None`, return a list of the hook function's results.
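A minimal sketch of fetching through a pipe's SQL definition; the pipe keys, query, and datetime column are illustrative, and a configured `sql:main` connector is assumed:

```python
import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
pipe = mrsm.Pipe(
    'sql:main', 'example',
    parameters={
        'fetch': {'definition': 'SELECT * FROM my_table'},
        'columns': {'datetime': 'dt'},
    },
)
# Only fetch rows on or after this bound (minus any backtrack interval).
df = conn.fetch(pipe, begin='2024-01-01')
```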
113def get_pipe_metadef( 114 self, 115 pipe: mrsm.Pipe, 116 params: Optional[Dict[str, Any]] = None, 117 begin: Union[datetime, int, str, None] = '', 118 end: Union[datetime, int, str, None] = None, 119 check_existing: bool = True, 120 debug: bool = False, 121 **kw: Any 122) -> Union[str, None]: 123 """ 124 Return a pipe's meta definition fetch query. 125 126 params: Optional[Dict[str, Any]], default None 127 Optional params dictionary to build the `WHERE` clause. 128 See `meerschaum.utils.sql.build_where`. 129 130 begin: Union[datetime, int, str, None], default None 131 Most recent datatime to search for data. 132 If `backtrack_minutes` is provided, subtract `backtrack_minutes`. 133 134 end: Union[datetime, int, str, None], default None 135 The latest datetime to search for data. 136 If `end` is `None`, do not bound 137 138 check_existing: bool, default True 139 If `True`, apply the backtrack interval. 140 141 debug: bool, default False 142 Verbosity toggle. 143 144 Returns 145 ------- 146 A pipe's meta definition fetch query string. 147 """ 148 from meerschaum.utils.debug import dprint 149 from meerschaum.utils.warnings import warn, error 150 from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where 151 from meerschaum.utils.misc import is_int 152 from meerschaum.config import get_config 153 154 definition = get_pipe_query(pipe) 155 156 if not pipe.columns.get('datetime', None): 157 _dt = pipe.guess_datetime() 158 dt_name = sql_item_name(_dt, self.flavor, None) if _dt else None 159 is_guess = True 160 else: 161 _dt = pipe.get_columns('datetime') 162 dt_name = sql_item_name(_dt, self.flavor, None) 163 is_guess = False 164 165 if begin not in (None, '') or end is not None: 166 if is_guess: 167 if _dt is None: 168 warn( 169 f"Unable to determine a datetime column for {pipe}." 
170 + "\n Ignoring begin and end...", 171 stack = False, 172 ) 173 begin, end = '', None 174 else: 175 warn( 176 f"A datetime wasn't specified for {pipe}.\n" 177 + f" Using column \"{_dt}\" for datetime bounds...", 178 stack = False 179 ) 180 181 apply_backtrack = begin == '' and check_existing 182 backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug) 183 btm = ( 184 int(backtrack_interval.total_seconds() / 60) 185 if isinstance(backtrack_interval, timedelta) 186 else backtrack_interval 187 ) 188 begin = ( 189 pipe.get_sync_time(debug=debug) 190 if begin == '' 191 else begin 192 ) 193 194 if begin and end and begin >= end: 195 begin = None 196 197 if dt_name: 198 begin_da = ( 199 dateadd_str( 200 flavor=self.flavor, 201 datepart='minute', 202 number=((-1 * btm) if apply_backtrack else 0), 203 begin=begin, 204 ) 205 if begin 206 else None 207 ) 208 end_da = ( 209 dateadd_str( 210 flavor=self.flavor, 211 datepart='minute', 212 number=0, 213 begin=end, 214 ) 215 if end 216 else None 217 ) 218 219 meta_def = ( 220 _simple_fetch_query(pipe, self.flavor) if ( 221 (not (pipe.columns or {}).get('id', None)) 222 or (not get_config('system', 'experimental', 'join_fetch')) 223 ) else _join_fetch_query(pipe, self.flavor, debug=debug, **kw) 224 ) 225 226 has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):] 227 if dt_name and (begin_da or end_da): 228 definition_dt_name = ( 229 dateadd_str(self.flavor, 'minute', 0, f"definition.{dt_name}") 230 if not is_int((begin_da or end_da)) 231 else f"definition.{dt_name}" 232 ) 233 meta_def += "\n" + ("AND" if has_where else "WHERE") + " " 234 has_where = True 235 if begin_da: 236 meta_def += f"{definition_dt_name} >= {begin_da}" 237 if begin_da and end_da: 238 meta_def += " AND " 239 if end_da: 240 meta_def += f"{definition_dt_name} < {end_da}" 241 242 if params is not None: 243 params_where = build_where(params, self, with_where=False) 244 meta_def += "\n" + ("AND" if has_where else "WHERE") + " " 245 has_where = True 246 meta_def += params_where 247 248 return meta_def
Return a pipe's meta definition fetch query.
Parameters
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the `WHERE` clause. See `meerschaum.utils.sql.build_where`.
- begin (Union[datetime, int, str, None], default None): Most recent datetime to search for data. If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
- end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If `end` is `None`, do not bound the query.
- check_existing (bool, default True): If `True`, apply the backtrack interval.
- debug (bool, default False): Verbosity toggle.

Returns
- A pipe's meta definition fetch query string.
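Continuing the illustrative `conn` and `pipe` from the `fetch` sketch above, you can inspect the bounded query before it runs:

```python
meta_def = conn.get_pipe_metadef(
    pipe,
    begin='2024-01-01',
    end='2024-02-01',
    check_existing=False,
)
print(meta_def)
```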
35def cli( 36 self, 37 debug: bool = False, 38 ) -> SuccessTuple: 39 """ 40 Launch a subprocess for an interactive CLI. 41 """ 42 from meerschaum.utils.venv import venv_exec 43 env = copy.deepcopy(dict(os.environ)) 44 env[f'MRSM_SQL_{self.label.upper()}'] = json.dumps(self.meta) 45 cli_code = ( 46 "import sys\n" 47 "import meerschaum as mrsm\n" 48 f"conn = mrsm.get_connector('sql:{self.label}')\n" 49 "success, msg = conn._cli_exit()\n" 50 "mrsm.pprint((success, msg))\n" 51 "if not success:\n" 52 " raise Exception(msg)" 53 ) 54 try: 55 _ = venv_exec(cli_code, venv=None, debug=debug, capture_output=False) 56 except Exception as e: 57 return False, f"[{self}] Failed to start CLI:\n{e}" 58 return True, "Success"
Launch a subprocess for an interactive CLI.
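A sketch (assuming `sql:main`); this blocks until the interactive session exits:

```python
import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
# Opens an interactive SQL CLI for this connector's database.
success, msg = conn.cli()
```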
144def fetch_pipes_keys( 145 self, 146 connector_keys: Optional[List[str]] = None, 147 metric_keys: Optional[List[str]] = None, 148 location_keys: Optional[List[str]] = None, 149 tags: Optional[List[str]] = None, 150 params: Optional[Dict[str, Any]] = None, 151 debug: bool = False 152) -> Optional[List[Tuple[str, str, Optional[str]]]]: 153 """ 154 Return a list of tuples corresponding to the parameters provided. 155 156 Parameters 157 ---------- 158 connector_keys: Optional[List[str]], default None 159 List of connector_keys to search by. 160 161 metric_keys: Optional[List[str]], default None 162 List of metric_keys to search by. 163 164 location_keys: Optional[List[str]], default None 165 List of location_keys to search by. 166 167 params: Optional[Dict[str, Any]], default None 168 Dictionary of additional parameters to search by. 169 E.g. `--params pipe_id:1` 170 171 debug: bool, default False 172 Verbosity toggle. 173 """ 174 from meerschaum.utils.debug import dprint 175 from meerschaum.utils.packages import attempt_import 176 from meerschaum.utils.misc import separate_negation_values, flatten_list 177 from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists 178 from meerschaum.config.static import STATIC_CONFIG 179 import json 180 from copy import deepcopy 181 sqlalchemy, sqlalchemy_sql_functions = attempt_import('sqlalchemy', 'sqlalchemy.sql.functions') 182 coalesce = sqlalchemy_sql_functions.coalesce 183 184 if connector_keys is None: 185 connector_keys = [] 186 if metric_keys is None: 187 metric_keys = [] 188 if location_keys is None: 189 location_keys = [] 190 else: 191 location_keys = [ 192 ( 193 lk 194 if lk not in ('[None]', 'None', 'null') 195 else 'None' 196 ) 197 for lk in location_keys 198 ] 199 if tags is None: 200 tags = [] 201 202 if params is None: 203 params = {} 204 205 ### Add three primary keys to params dictionary 206 ### (separated for convenience of arguments). 207 cols = { 208 'connector_keys': [str(ck) for ck in connector_keys], 209 'metric_key': [str(mk) for mk in metric_keys], 210 'location_key': [str(lk) for lk in location_keys], 211 } 212 213 ### Make deep copy so we don't mutate this somewhere else. 214 parameters = deepcopy(params) 215 for col, vals in cols.items(): 216 if vals not in [[], ['*']]: 217 parameters[col] = vals 218 219 if not table_exists('mrsm_pipes', self, schema=self.instance_schema, debug=debug): 220 return [] 221 222 from meerschaum.connectors.sql.tables import get_tables 223 pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes'] 224 225 _params = {} 226 for k, v in parameters.items(): 227 _v = json.dumps(v) if isinstance(v, dict) else v 228 _params[k] = _v 229 230 negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] 231 ### Parse regular params. 232 ### If a param begins with '_', negate it instead. 
233 _where = [ 234 ( 235 (coalesce(pipes_tbl.c[key], 'None') == val) 236 if not str(val).startswith(negation_prefix) 237 else (pipes_tbl.c[key] != key) 238 ) for key, val in _params.items() 239 if not isinstance(val, (list, tuple)) and key in pipes_tbl.c 240 ] 241 select_cols = ( 242 [ 243 pipes_tbl.c.connector_keys, 244 pipes_tbl.c.metric_key, 245 pipes_tbl.c.location_key, 246 ] 247 ) 248 249 q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where)) 250 for c, vals in cols.items(): 251 if not isinstance(vals, (list, tuple)) or not vals or not c in pipes_tbl.c: 252 continue 253 _in_vals, _ex_vals = separate_negation_values(vals) 254 q = q.where(coalesce(pipes_tbl.c[c], 'None').in_(_in_vals)) if _in_vals else q 255 q = q.where(coalesce(pipes_tbl.c[c], 'None').not_in(_ex_vals)) if _ex_vals else q 256 257 ### Finally, parse tags. 258 tag_groups = [tag.split(',') for tag in tags] 259 in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups] 260 261 ors, nands = [], [] 262 for _in_tags, _ex_tags in in_ex_tag_groups: 263 sub_ands = [] 264 for nt in _in_tags: 265 sub_ands.append( 266 sqlalchemy.cast( 267 pipes_tbl.c['parameters'], 268 sqlalchemy.String, 269 ).like(f'%"tags":%"{nt}"%') 270 ) 271 if sub_ands: 272 ors.append(sqlalchemy.and_(*sub_ands)) 273 274 for xt in _ex_tags: 275 nands.append( 276 sqlalchemy.cast( 277 pipes_tbl.c['parameters'], 278 sqlalchemy.String, 279 ).not_like(f'%"tags":%"{xt}"%') 280 ) 281 282 q = q.where(sqlalchemy.and_(*nands)) if nands else q 283 q = q.where(sqlalchemy.or_(*ors)) if ors else q 284 loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key']) 285 if self.flavor not in OMIT_NULLSFIRST_FLAVORS: 286 loc_asc = sqlalchemy.nullsfirst(loc_asc) 287 q = q.order_by( 288 sqlalchemy.asc(pipes_tbl.c['connector_keys']), 289 sqlalchemy.asc(pipes_tbl.c['metric_key']), 290 loc_asc, 291 ) 292 293 ### execute the query and return a list of tuples 294 if debug: 295 dprint(q.compile(compile_kwargs={'literal_binds': True})) 296 try: 297 rows = ( 298 self.execute(q).fetchall() 299 if self.flavor != 'duckdb' 300 else [ 301 (row['connector_keys'], row['metric_key'], row['location_key']) 302 for row in self.read(q).to_dict(orient='records') 303 ] 304 ) 305 except Exception as e: 306 error(str(e)) 307 308 return [(row[0], row[1], row[2]) for row in rows]
Return a list of tuples corresponding to the parameters provided.
Parameters
- connector_keys (Optional[List[str]], default None): List of connector_keys to search by.
- metric_keys (Optional[List[str]], default None): List of metric_keys to search by.
- location_keys (Optional[List[str]], default None): List of location_keys to search by.
- tags (Optional[List[str]], default None): List of tags to search by.
- params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. E.g. `--params pipe_id:1`.
- debug (bool, default False): Verbosity toggle.
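A sketch of filtering registered pipes; the connector key and tag are illustrative:

```python
import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
keys = conn.fetch_pipes_keys(
    connector_keys=['plugin:noaa'],
    tags=['production'],
)
for connector_key, metric_key, location_key in (keys or []):
    print(connector_key, metric_key, location_key)
```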
311def create_indices( 312 self, 313 pipe: mrsm.Pipe, 314 indices: Optional[List[str]] = None, 315 debug: bool = False 316) -> bool: 317 """ 318 Create a pipe's indices. 319 """ 320 from meerschaum.utils.sql import sql_item_name, update_queries 321 from meerschaum.utils.debug import dprint 322 if debug: 323 dprint(f"Creating indices for {pipe}...") 324 if not pipe.indices: 325 warn(f"{pipe} has no index columns; skipping index creation.", stack=False) 326 return True 327 328 _ = pipe.__dict__.pop('_columns_indices', None) 329 ix_queries = { 330 ix: queries 331 for ix, queries in self.get_create_index_queries(pipe, debug=debug).items() 332 if indices is None or ix in indices 333 } 334 success = True 335 for ix, queries in ix_queries.items(): 336 ix_success = all(self.exec_queries(queries, debug=debug, silent=False)) 337 success = success and ix_success 338 if not ix_success: 339 warn(f"Failed to create index on column: {ix}") 340 341 return success
Create a pipe's indices.
344def drop_indices( 345 self, 346 pipe: mrsm.Pipe, 347 indices: Optional[List[str]] = None, 348 debug: bool = False 349) -> bool: 350 """ 351 Drop a pipe's indices. 352 """ 353 from meerschaum.utils.debug import dprint 354 if debug: 355 dprint(f"Dropping indices for {pipe}...") 356 if not pipe.columns: 357 warn(f"Unable to drop indices for {pipe} without columns.", stack=False) 358 return False 359 ix_queries = { 360 ix: queries 361 for ix, queries in self.get_drop_index_queries(pipe, debug=debug).items() 362 if indices is None or ix in indices 363 } 364 success = True 365 for ix, queries in ix_queries.items(): 366 ix_success = all(self.exec_queries(queries, debug=debug, silent=True)) 367 if not ix_success: 368 success = False 369 if debug: 370 dprint(f"Failed to drop index on column: {ix}") 371 return success
Drop a pipe's indices.
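A sketch of rebuilding a pipe's indices with these two methods, continuing the illustrative `pipe` from above:

```python
# Drop all existing indices, then recreate only the datetime index.
if conn.drop_indices(pipe):
    conn.create_indices(pipe, indices=['datetime'])
```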
374def get_create_index_queries( 375 self, 376 pipe: mrsm.Pipe, 377 debug: bool = False, 378) -> Dict[str, List[str]]: 379 """ 380 Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query. 381 382 Parameters 383 ---------- 384 pipe: mrsm.Pipe 385 The pipe to which the queries will correspond. 386 387 Returns 388 ------- 389 A dictionary of index names mapping to lists of queries. 390 """ 391 ### NOTE: Due to recent breaking changes in DuckDB, indices don't behave properly. 392 if self.flavor == 'duckdb': 393 return {} 394 from meerschaum.utils.sql import ( 395 sql_item_name, 396 get_distinct_col_count, 397 update_queries, 398 get_null_replacement, 399 get_create_table_queries, 400 get_rename_table_queries, 401 COALESCE_UNIQUE_INDEX_FLAVORS, 402 ) 403 from meerschaum.utils.dtypes.sql import ( 404 get_db_type_from_pd_type, 405 get_pd_type_from_db_type, 406 AUTO_INCREMENT_COLUMN_FLAVORS, 407 ) 408 from meerschaum.config import get_config 409 index_queries = {} 410 411 upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in update_queries 412 static = pipe.parameters.get('static', False) 413 index_names = pipe.get_indices() 414 indices = pipe.indices 415 existing_cols_types = pipe.get_columns_types(debug=debug) 416 existing_cols_pd_types = { 417 col: get_pd_type_from_db_type(typ) 418 for col, typ in existing_cols_types.items() 419 } 420 existing_cols_indices = self.get_pipe_columns_indices(pipe, debug=debug) 421 existing_ix_names = set() 422 existing_primary_keys = [] 423 for col, col_indices in existing_cols_indices.items(): 424 for col_ix_doc in col_indices: 425 existing_ix_names.add(col_ix_doc.get('name', None)) 426 if col_ix_doc.get('type', None) == 'PRIMARY KEY': 427 existing_primary_keys.append(col) 428 429 _datetime = pipe.get_columns('datetime', error=False) 430 _datetime_name = ( 431 sql_item_name(_datetime, self.flavor, None) 432 if _datetime is not None else None 433 ) 434 _datetime_index_name = ( 435 sql_item_name(index_names['datetime'], flavor=self.flavor, schema=None) 436 if index_names.get('datetime', None) 437 else None 438 ) 439 _id = pipe.get_columns('id', error=False) 440 _id_name = ( 441 sql_item_name(_id, self.flavor, None) 442 if _id is not None 443 else None 444 ) 445 primary_key = pipe.columns.get('primary', None) 446 primary_key_name = ( 447 sql_item_name(primary_key, flavor=self.flavor, schema=None) 448 if primary_key 449 else None 450 ) 451 autoincrement = ( 452 pipe.parameters.get('autoincrement', False) 453 or ( 454 primary_key is not None 455 and primary_key not in existing_cols_pd_types 456 ) 457 ) 458 primary_key_db_type = ( 459 get_db_type_from_pd_type(pipe.dtypes.get(primary_key, 'int'), self.flavor) 460 if primary_key 461 else None 462 ) 463 primary_key_constraint_name = ( 464 sql_item_name(f'pk_{pipe.target}', self.flavor, None) 465 if primary_key is not None 466 else None 467 ) 468 469 _id_index_name = ( 470 sql_item_name(index_names['id'], self.flavor, None) 471 if index_names.get('id', None) 472 else None 473 ) 474 _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 475 _create_space_partition = get_config('system', 'experimental', 'space') 476 477 ### create datetime index 478 if _datetime is not None: 479 if self.flavor == 'timescaledb' and pipe.parameters.get('hypertable', True): 480 _id_count = ( 481 get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self) 482 if (_id is not None and _create_space_partition) else None 483 ) 484 485 chunk_interval = 
pipe.get_chunk_interval(debug=debug) 486 chunk_interval_minutes = ( 487 chunk_interval 488 if isinstance(chunk_interval, int) 489 else int(chunk_interval.total_seconds() / 60) 490 ) 491 chunk_time_interval = ( 492 f"INTERVAL '{chunk_interval_minutes} MINUTES'" 493 if isinstance(chunk_interval, timedelta) 494 else f'{chunk_interval_minutes}' 495 ) 496 497 dt_query = ( 498 f"SELECT public.create_hypertable('{_pipe_name}', " + 499 f"'{_datetime}', " 500 + ( 501 f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition) 502 else '' 503 ) 504 + f'chunk_time_interval => {chunk_time_interval}, ' 505 + 'if_not_exists => true, ' 506 + "migrate_data => true);" 507 ) 508 elif self.flavor == 'mssql': 509 dt_query = ( 510 "CREATE " 511 + ("CLUSTERED " if not primary_key else '') 512 + f"INDEX {_datetime_index_name} " 513 + f"ON {_pipe_name} ({_datetime_name})" 514 ) 515 else: ### mssql, sqlite, etc. 516 dt_query = ( 517 f"CREATE INDEX {_datetime_index_name} " 518 + f"ON {_pipe_name} ({_datetime_name})" 519 ) 520 521 index_queries[_datetime] = [dt_query] 522 523 primary_queries = [] 524 if ( 525 primary_key is not None 526 and primary_key not in existing_primary_keys 527 and not static 528 ): 529 if autoincrement and primary_key not in existing_cols_pd_types: 530 autoincrement_str = AUTO_INCREMENT_COLUMN_FLAVORS.get( 531 self.flavor, 532 AUTO_INCREMENT_COLUMN_FLAVORS['default'] 533 ) 534 primary_queries.extend([ 535 ( 536 f"ALTER TABLE {_pipe_name}\n" 537 f"ADD {primary_key_name} {primary_key_db_type} {autoincrement_str}" 538 ), 539 ]) 540 elif not autoincrement and primary_key in existing_cols_pd_types: 541 if self.flavor == 'sqlite': 542 new_table_name = sql_item_name( 543 f'_new_{pipe.target}', 544 self.flavor, 545 self.get_pipe_schema(pipe) 546 ) 547 select_cols_str = ', '.join( 548 [ 549 sql_item_name(col, self.flavor, None) 550 for col in existing_cols_types 551 ] 552 ) 553 primary_queries.extend( 554 get_create_table_queries( 555 existing_cols_pd_types, 556 f'_new_{pipe.target}', 557 self.flavor, 558 schema=self.get_pipe_schema(pipe), 559 primary_key=primary_key, 560 ) + [ 561 ( 562 f"INSERT INTO {new_table_name} ({select_cols_str})\n" 563 f"SELECT {select_cols_str}\nFROM {_pipe_name}" 564 ), 565 f"DROP TABLE {_pipe_name}", 566 ] + get_rename_table_queries( 567 f'_new_{pipe.target}', 568 pipe.target, 569 self.flavor, 570 schema=self.get_pipe_schema(pipe), 571 ) 572 ) 573 elif self.flavor == 'oracle': 574 primary_queries.extend([ 575 ( 576 f"ALTER TABLE {_pipe_name}\n" 577 f"MODIFY {primary_key_name} NOT NULL" 578 ), 579 ( 580 f"ALTER TABLE {_pipe_name}\n" 581 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})" 582 ) 583 ]) 584 elif self.flavor in ('mysql', 'mariadb'): 585 primary_queries.extend([ 586 ( 587 f"ALTER TABLE {_pipe_name}\n" 588 f"MODIFY {primary_key_name} {primary_key_db_type} NOT NULL" 589 ), 590 ( 591 f"ALTER TABLE {_pipe_name}\n" 592 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})" 593 ) 594 ]) 595 elif self.flavor == 'timescaledb': 596 primary_queries.extend([ 597 ( 598 f"ALTER TABLE {_pipe_name}\n" 599 f"ALTER COLUMN {primary_key_name} SET NOT NULL" 600 ), 601 ( 602 f"ALTER TABLE {_pipe_name}\n" 603 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY (" + ( 604 f"{_datetime_name}, " if _datetime_name else "" 605 ) + f"{primary_key_name})" 606 ), 607 ]) 608 elif self.flavor in ('citus', 'postgresql', 'duckdb'): 609 primary_queries.extend([ 610 ( 611 f"ALTER TABLE {_pipe_name}\n" 612 f"ALTER COLUMN 
{primary_key_name} SET NOT NULL" 613 ), 614 ( 615 f"ALTER TABLE {_pipe_name}\n" 616 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})" 617 ), 618 ]) 619 else: 620 primary_queries.extend([ 621 ( 622 f"ALTER TABLE {_pipe_name}\n" 623 f"ALTER COLUMN {primary_key_name} {primary_key_db_type} NOT NULL" 624 ), 625 ( 626 f"ALTER TABLE {_pipe_name}\n" 627 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})" 628 ), 629 ]) 630 index_queries[primary_key] = primary_queries 631 632 ### create id index 633 if _id_name is not None: 634 if self.flavor == 'timescaledb': 635 ### Already created indices via create_hypertable. 636 id_query = ( 637 None if (_id is not None and _create_space_partition) 638 else ( 639 f"CREATE INDEX IF NOT EXISTS {_id_index_name} ON {_pipe_name} ({_id_name})" 640 if _id is not None 641 else None 642 ) 643 ) 644 pass 645 else: ### mssql, sqlite, etc. 646 id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})" 647 648 if id_query is not None: 649 index_queries[_id] = id_query if isinstance(id_query, list) else [id_query] 650 651 ### Create indices for other labels in `pipe.columns`. 652 other_index_names = { 653 ix_key: ix_unquoted 654 for ix_key, ix_unquoted in index_names.items() 655 if ix_key not in ('datetime', 'id', 'primary') and ix_unquoted not in existing_ix_names 656 } 657 for ix_key, ix_unquoted in other_index_names.items(): 658 ix_name = sql_item_name(ix_unquoted, self.flavor, None) 659 cols = indices[ix_key] 660 if not isinstance(cols, (list, tuple)): 661 cols = [cols] 662 cols_names = [sql_item_name(col, self.flavor, None) for col in cols if col] 663 if not cols_names: 664 continue 665 cols_names_str = ", ".join(cols_names) 666 index_queries[ix_key] = [f"CREATE INDEX {ix_name} ON {_pipe_name} ({cols_names_str})"] 667 668 indices_cols_str = ', '.join( 669 list({ 670 sql_item_name(ix, self.flavor) 671 for ix_key, ix in pipe.columns.items() 672 if ix and ix in existing_cols_types 673 }) 674 ) 675 coalesce_indices_cols_str = ', '.join( 676 [ 677 ( 678 "COALESCE(" 679 + sql_item_name(ix, self.flavor) 680 + ", " 681 + get_null_replacement(existing_cols_types[ix], self.flavor) 682 + ") " 683 ) if ix_key != 'datetime' else (sql_item_name(ix, self.flavor)) 684 for ix_key, ix in pipe.columns.items() 685 if ix and ix in existing_cols_types 686 ] 687 ) 688 unique_index_name = sql_item_name(pipe.target + '_unique_index', self.flavor) 689 constraint_name = sql_item_name(pipe.target + '_constraint', self.flavor) 690 add_constraint_query = ( 691 f"ALTER TABLE {_pipe_name} ADD CONSTRAINT {constraint_name} UNIQUE ({indices_cols_str})" 692 ) 693 unique_index_cols_str = ( 694 indices_cols_str 695 if self.flavor not in COALESCE_UNIQUE_INDEX_FLAVORS 696 else coalesce_indices_cols_str 697 ) 698 create_unique_index_query = ( 699 f"CREATE UNIQUE INDEX {unique_index_name} ON {_pipe_name} ({unique_index_cols_str})" 700 ) 701 constraint_queries = [create_unique_index_query] 702 if self.flavor != 'sqlite': 703 constraint_queries.append(add_constraint_query) 704 if upsert and indices_cols_str: 705 index_queries[unique_index_name] = constraint_queries 706 return index_queries
Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.
Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
- A dictionary of index names mapping to lists of queries.
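A sketch of previewing the generated queries without executing them (they may then be run via `exec_queries()`):

```python
index_queries = conn.get_create_index_queries(pipe)
for ix, queries in index_queries.items():
    print(f"-- index key: {ix}")
    for query in queries:
        print(query)
```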
709def get_drop_index_queries( 710 self, 711 pipe: mrsm.Pipe, 712 debug: bool = False, 713) -> Dict[str, List[str]]: 714 """ 715 Return a dictionary mapping columns to a `DROP INDEX` or equivalent query. 716 717 Parameters 718 ---------- 719 pipe: mrsm.Pipe 720 The pipe to which the queries will correspond. 721 722 Returns 723 ------- 724 A dictionary of column names mapping to lists of queries. 725 """ 726 ### NOTE: Due to breaking changes within DuckDB, indices must be skipped. 727 if self.flavor == 'duckdb': 728 return {} 729 if not pipe.exists(debug=debug): 730 return {} 731 from meerschaum.utils.sql import ( 732 sql_item_name, 733 table_exists, 734 hypertable_queries, 735 DROP_IF_EXISTS_FLAVORS, 736 ) 737 drop_queries = {} 738 schema = self.get_pipe_schema(pipe) 739 schema_prefix = (schema + '_') if schema else '' 740 indices = { 741 col: schema_prefix + ix 742 for col, ix in pipe.get_indices().items() 743 } 744 pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 745 pipe_name_no_schema = sql_item_name(pipe.target, self.flavor, None) 746 747 if self.flavor not in hypertable_queries: 748 is_hypertable = False 749 else: 750 is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name) 751 is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None 752 753 if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else "" 754 if is_hypertable: 755 nuke_queries = [] 756 temp_table = '_' + pipe.target + '_temp_migration' 757 temp_table_name = sql_item_name(temp_table, self.flavor, self.get_pipe_schema(pipe)) 758 759 if table_exists(temp_table, self, schema=self.get_pipe_schema(pipe), debug=debug): 760 nuke_queries.append(f"DROP TABLE {if_exists_str} {temp_table_name}") 761 nuke_queries += [ 762 f"SELECT * INTO {temp_table_name} FROM {pipe_name}", 763 f"DROP TABLE {if_exists_str} {pipe_name}", 764 f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name_no_schema}", 765 ] 766 nuke_ix_keys = ('datetime', 'id') 767 nuked = False 768 for ix_key in nuke_ix_keys: 769 if ix_key in indices and not nuked: 770 drop_queries[ix_key] = nuke_queries 771 nuked = True 772 773 drop_queries.update({ 774 ix_key: ["DROP INDEX " + sql_item_name(ix_unquoted, self.flavor, None)] 775 for ix_key, ix_unquoted in indices.items() 776 if ix_key not in drop_queries 777 }) 778 return drop_queries
Return a dictionary mapping columns to a `DROP INDEX` or equivalent query.
Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
- A dictionary of column names mapping to lists of queries.
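The drop queries compose with `exec_queries()` in the same way; a brief sketch:

```python
# Drop each index, suppressing warnings for indices that don't exist.
for queries in conn.get_drop_index_queries(pipe).values():
    conn.exec_queries(queries, silent=True)
```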
2827def get_add_columns_queries( 2828 self, 2829 pipe: mrsm.Pipe, 2830 df: Union[pd.DataFrame, Dict[str, str]], 2831 _is_db_types: bool = False, 2832 debug: bool = False, 2833) -> List[str]: 2834 """ 2835 Add new null columns of the correct type to a table from a dataframe. 2836 2837 Parameters 2838 ---------- 2839 pipe: mrsm.Pipe 2840 The pipe to be altered. 2841 2842 df: Union[pd.DataFrame, Dict[str, str]] 2843 The pandas DataFrame which contains new columns. 2844 If a dictionary is provided, assume it maps columns to Pandas data types. 2845 2846 _is_db_types: bool, default False 2847 If `True`, assume `df` is a dictionary mapping columns to SQL native dtypes. 2848 2849 Returns 2850 ------- 2851 A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector. 2852 """ 2853 if not pipe.exists(debug=debug): 2854 return [] 2855 2856 if pipe.parameters.get('static', False): 2857 return [] 2858 2859 from decimal import Decimal 2860 import copy 2861 from meerschaum.utils.sql import ( 2862 sql_item_name, 2863 SINGLE_ALTER_TABLE_FLAVORS, 2864 get_table_cols_types, 2865 ) 2866 from meerschaum.utils.dtypes.sql import ( 2867 get_pd_type_from_db_type, 2868 get_db_type_from_pd_type, 2869 ) 2870 from meerschaum.utils.misc import flatten_list 2871 table_obj = self.get_pipe_table(pipe, debug=debug) 2872 is_dask = 'dask' in df.__module__ if not isinstance(df, dict) else False 2873 if is_dask: 2874 df = df.partitions[0].compute() 2875 df_cols_types = ( 2876 { 2877 col: str(typ) 2878 for col, typ in df.dtypes.items() 2879 } 2880 if not isinstance(df, dict) 2881 else copy.deepcopy(df) 2882 ) 2883 if not isinstance(df, dict) and len(df.index) > 0: 2884 for col, typ in list(df_cols_types.items()): 2885 if typ != 'object': 2886 continue 2887 val = df.iloc[0][col] 2888 if isinstance(val, (dict, list)): 2889 df_cols_types[col] = 'json' 2890 elif isinstance(val, Decimal): 2891 df_cols_types[col] = 'numeric' 2892 elif isinstance(val, str): 2893 df_cols_types[col] = 'str' 2894 db_cols_types = { 2895 col: get_pd_type_from_db_type(str(typ.type)) 2896 for col, typ in table_obj.columns.items() 2897 } if table_obj is not None else { 2898 col: get_pd_type_from_db_type(typ) 2899 for col, typ in get_table_cols_types( 2900 pipe.target, 2901 self, 2902 schema=self.get_pipe_schema(pipe), 2903 debug=debug, 2904 ).items() 2905 } 2906 new_cols = set(df_cols_types) - set(db_cols_types) 2907 if not new_cols: 2908 return [] 2909 2910 new_cols_types = { 2911 col: get_db_type_from_pd_type( 2912 df_cols_types[col], 2913 self.flavor 2914 ) for col in new_cols 2915 } 2916 2917 alter_table_query = "ALTER TABLE " + sql_item_name( 2918 pipe.target, self.flavor, self.get_pipe_schema(pipe) 2919 ) 2920 queries = [] 2921 for col, typ in new_cols_types.items(): 2922 add_col_query = ( 2923 "\nADD " 2924 + sql_item_name(col, self.flavor, None) 2925 + " " + typ + "," 2926 ) 2927 2928 if self.flavor in SINGLE_ALTER_TABLE_FLAVORS: 2929 queries.append(alter_table_query + add_col_query[:-1]) 2930 else: 2931 alter_table_query += add_col_query 2932 2933 ### For most flavors, only one query is required. 2934 ### This covers SQLite which requires one query per column. 2935 if not queries: 2936 queries.append(alter_table_query[:-1]) 2937 2938 if self.flavor != 'duckdb': 2939 return queries 2940 2941 ### NOTE: For DuckDB, we must drop and rebuild the indices. 
2942 drop_index_queries = list(flatten_list( 2943 [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()] 2944 )) 2945 create_index_queries = list(flatten_list( 2946 [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()] 2947 )) 2948 2949 return drop_index_queries + queries + create_index_queries
Add new null columns of the correct type to a table from a dataframe.
Parameters
- pipe (mrsm.Pipe): The pipe to be altered.
- df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.
- _is_db_types (bool, default False): If `True`, assume `df` is a dictionary mapping columns to SQL native dtypes.

Returns
- A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
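A sketch of deriving and applying the `ALTER TABLE` statements for a DataFrame that carries a new column (names are illustrative):

```python
import pandas as pd

# 'new_col' does not yet exist on the pipe's table.
new_df = pd.DataFrame({'id': [1], 'new_col': ['a']})
alter_queries = conn.get_add_columns_queries(pipe, new_df)
if alter_queries:
    conn.exec_queries(alter_queries, break_on_error=True)
```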
2952def get_alter_columns_queries( 2953 self, 2954 pipe: mrsm.Pipe, 2955 df: Union[pd.DataFrame, Dict[str, str]], 2956 debug: bool = False, 2957) -> List[str]: 2958 """ 2959 If we encounter a column of a different type, set the entire column to text. 2960 If the altered columns are numeric, alter to numeric instead. 2961 2962 Parameters 2963 ---------- 2964 pipe: mrsm.Pipe 2965 The pipe to be altered. 2966 2967 df: Union[pd.DataFrame, Dict[str, str]] 2968 The pandas DataFrame which may contain altered columns. 2969 If a dict is provided, assume it maps columns to Pandas data types. 2970 2971 Returns 2972 ------- 2973 A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector. 2974 """ 2975 if not pipe.exists(debug=debug): 2976 return [] 2977 if pipe.static: 2978 return 2979 from meerschaum.utils.sql import sql_item_name, DROP_IF_EXISTS_FLAVORS, get_table_cols_types 2980 from meerschaum.utils.dataframe import get_numeric_cols 2981 from meerschaum.utils.dtypes import are_dtypes_equal 2982 from meerschaum.utils.dtypes.sql import ( 2983 get_pd_type_from_db_type, 2984 get_db_type_from_pd_type, 2985 ) 2986 from meerschaum.utils.misc import flatten_list, generate_password, items_str 2987 table_obj = self.get_pipe_table(pipe, debug=debug) 2988 target = pipe.target 2989 session_id = generate_password(3) 2990 numeric_cols = ( 2991 get_numeric_cols(df) 2992 if not isinstance(df, dict) 2993 else [ 2994 col 2995 for col, typ in df.items() 2996 if typ == 'numeric' 2997 ] 2998 ) 2999 df_cols_types = ( 3000 { 3001 col: str(typ) 3002 for col, typ in df.dtypes.items() 3003 } 3004 if not isinstance(df, dict) 3005 else df 3006 ) 3007 db_cols_types = { 3008 col: get_pd_type_from_db_type(str(typ.type)) 3009 for col, typ in table_obj.columns.items() 3010 } if table_obj is not None else { 3011 col: get_pd_type_from_db_type(typ) 3012 for col, typ in get_table_cols_types( 3013 pipe.target, 3014 self, 3015 schema=self.get_pipe_schema(pipe), 3016 debug=debug, 3017 ).items() 3018 } 3019 pipe_bool_cols = [col for col, typ in pipe.dtypes.items() if are_dtypes_equal(str(typ), 'bool')] 3020 pd_db_df_aliases = { 3021 'int': 'bool', 3022 'float': 'bool', 3023 'numeric': 'bool', 3024 'guid': 'object', 3025 } 3026 if self.flavor == 'oracle': 3027 pd_db_df_aliases['int'] = 'numeric' 3028 3029 altered_cols = { 3030 col: (db_cols_types.get(col, 'object'), typ) 3031 for col, typ in df_cols_types.items() 3032 if not are_dtypes_equal(typ, db_cols_types.get(col, 'object').lower()) 3033 and not are_dtypes_equal(db_cols_types.get(col, 'object'), 'string') 3034 } 3035 3036 ### NOTE: Sometimes bools are coerced into ints or floats. 3037 altered_cols_to_ignore = set() 3038 for col, (db_typ, df_typ) in altered_cols.items(): 3039 for db_alias, df_alias in pd_db_df_aliases.items(): 3040 if db_alias in db_typ.lower() and df_alias in df_typ.lower(): 3041 altered_cols_to_ignore.add(col) 3042 3043 ### Oracle's bool handling sometimes mixes NUMBER and INT. 
3044 for bool_col in pipe_bool_cols: 3045 if bool_col not in altered_cols: 3046 continue 3047 db_is_bool_compatible = ( 3048 are_dtypes_equal('int', altered_cols[bool_col][0]) 3049 or are_dtypes_equal('float', altered_cols[bool_col][0]) 3050 or are_dtypes_equal('numeric', altered_cols[bool_col][0]) 3051 or are_dtypes_equal('bool', altered_cols[bool_col][0]) 3052 ) 3053 df_is_bool_compatible = ( 3054 are_dtypes_equal('int', altered_cols[bool_col][1]) 3055 or are_dtypes_equal('float', altered_cols[bool_col][1]) 3056 or are_dtypes_equal('numeric', altered_cols[bool_col][1]) 3057 or are_dtypes_equal('bool', altered_cols[bool_col][1]) 3058 ) 3059 if db_is_bool_compatible and df_is_bool_compatible: 3060 altered_cols_to_ignore.add(bool_col) 3061 3062 for col in altered_cols_to_ignore: 3063 _ = altered_cols.pop(col, None) 3064 if not altered_cols: 3065 return [] 3066 3067 if numeric_cols: 3068 pipe.dtypes.update({col: 'numeric' for col in numeric_cols}) 3069 edit_success, edit_msg = pipe.edit(debug=debug) 3070 if not edit_success: 3071 warn( 3072 f"Failed to update dtypes for numeric columns {items_str(numeric_cols)}:\n" 3073 + f"{edit_msg}" 3074 ) 3075 else: 3076 numeric_cols.extend([col for col, typ in pipe.dtypes.items() if typ == 'numeric']) 3077 3078 numeric_type = get_db_type_from_pd_type('numeric', self.flavor, as_sqlalchemy=False) 3079 text_type = get_db_type_from_pd_type('str', self.flavor, as_sqlalchemy=False) 3080 altered_cols_types = { 3081 col: ( 3082 numeric_type 3083 if col in numeric_cols 3084 else text_type 3085 ) 3086 for col, (db_typ, typ) in altered_cols.items() 3087 } 3088 3089 if self.flavor == 'sqlite': 3090 temp_table_name = '-' + session_id + '_' + target 3091 rename_query = ( 3092 "ALTER TABLE " 3093 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3094 + " RENAME TO " 3095 + sql_item_name(temp_table_name, self.flavor, None) 3096 ) 3097 create_query = ( 3098 "CREATE TABLE " 3099 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3100 + " (\n" 3101 ) 3102 for col_name, col_obj in table_obj.columns.items(): 3103 create_query += ( 3104 sql_item_name(col_name, self.flavor, None) 3105 + " " 3106 + ( 3107 str(col_obj.type) 3108 if col_name not in altered_cols 3109 else altered_cols_types[col_name] 3110 ) 3111 + ",\n" 3112 ) 3113 create_query = create_query[:-2] + "\n)" 3114 3115 insert_query = ( 3116 "INSERT INTO " 3117 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3118 + ' (' 3119 + ', '.join([ 3120 sql_item_name(col_name, self.flavor, None) 3121 for col_name, _ in table_obj.columns.items() 3122 ]) 3123 + ')' 3124 + "\nSELECT\n" 3125 ) 3126 for col_name, col_obj in table_obj.columns.items(): 3127 new_col_str = ( 3128 sql_item_name(col_name, self.flavor, None) 3129 if col_name not in altered_cols 3130 else ( 3131 "CAST(" 3132 + sql_item_name(col_name, self.flavor, None) 3133 + " AS " 3134 + altered_cols_types[col_name] 3135 + ")" 3136 ) 3137 ) 3138 insert_query += new_col_str + ",\n" 3139 insert_query = insert_query[:-2] + ( 3140 f"\nFROM {sql_item_name(temp_table_name, self.flavor, self.get_pipe_schema(pipe))}" 3141 ) 3142 3143 if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else "" 3144 3145 drop_query = f"DROP TABLE {if_exists_str}" + sql_item_name( 3146 temp_table_name, self.flavor, self.get_pipe_schema(pipe) 3147 ) 3148 return [ 3149 rename_query, 3150 create_query, 3151 insert_query, 3152 drop_query, 3153 ] 3154 3155 queries = [] 3156 if self.flavor == 'oracle': 3157 for col, typ in 
altered_cols_types.items(): 3158 add_query = ( 3159 "ALTER TABLE " 3160 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3161 + "\nADD " + sql_item_name(col + '_temp', self.flavor, None) 3162 + " " + typ 3163 ) 3164 queries.append(add_query) 3165 3166 for col, typ in altered_cols_types.items(): 3167 populate_temp_query = ( 3168 "UPDATE " 3169 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3170 + "\nSET " + sql_item_name(col + '_temp', self.flavor, None) 3171 + ' = ' + sql_item_name(col, self.flavor, None) 3172 ) 3173 queries.append(populate_temp_query) 3174 3175 for col, typ in altered_cols_types.items(): 3176 set_old_cols_to_null_query = ( 3177 "UPDATE " 3178 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3179 + "\nSET " + sql_item_name(col, self.flavor, None) 3180 + ' = NULL' 3181 ) 3182 queries.append(set_old_cols_to_null_query) 3183 3184 for col, typ in altered_cols_types.items(): 3185 alter_type_query = ( 3186 "ALTER TABLE " 3187 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3188 + "\nMODIFY " + sql_item_name(col, self.flavor, None) + ' ' 3189 + typ 3190 ) 3191 queries.append(alter_type_query) 3192 3193 for col, typ in altered_cols_types.items(): 3194 set_old_to_temp_query = ( 3195 "UPDATE " 3196 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3197 + "\nSET " + sql_item_name(col, self.flavor, None) 3198 + ' = ' + sql_item_name(col + '_temp', self.flavor, None) 3199 ) 3200 queries.append(set_old_to_temp_query) 3201 3202 for col, typ in altered_cols_types.items(): 3203 drop_temp_query = ( 3204 "ALTER TABLE " 3205 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3206 + "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor, None) 3207 ) 3208 queries.append(drop_temp_query) 3209 3210 return queries 3211 3212 query = "ALTER TABLE " + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3213 for col, typ in altered_cols_types.items(): 3214 alter_col_prefix = ( 3215 'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle') 3216 else 'MODIFY' 3217 ) 3218 type_prefix = ( 3219 '' if self.flavor in ('mssql', 'mariadb', 'mysql') 3220 else 'TYPE ' 3221 ) 3222 column_str = 'COLUMN' if self.flavor != 'oracle' else '' 3223 query += ( 3224 f"\n{alter_col_prefix} {column_str} " 3225 + sql_item_name(col, self.flavor, None) 3226 + " " + type_prefix + typ + "," 3227 ) 3228 3229 query = query[:-1] 3230 queries.append(query) 3231 if self.flavor != 'duckdb': 3232 return queries 3233 3234 drop_index_queries = list(flatten_list( 3235 [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()] 3236 )) 3237 create_index_queries = list(flatten_list( 3238 [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()] 3239 )) 3240 3241 return drop_index_queries + queries + create_index_queries
If we encounter a column of a different type, set the entire column to text. If the altered columns are numeric, alter to numeric instead.
Parameters
- pipe (mrsm.Pipe): The pipe to be altered.
- df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which may contain altered columns. If a dict is provided, assume it maps columns to Pandas data types.
Returns
- A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
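A sketch using the dictionary form, which maps column names to Pandas dtypes (the column name is illustrative):

```python
# Preview the queries that would widen 'value' to match a float dtype.
alter_queries = conn.get_alter_columns_queries(pipe, {'value': 'float64'})
for query in alter_queries:
    print(query)
```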
781def delete_pipe( 782 self, 783 pipe: mrsm.Pipe, 784 debug: bool = False, 785) -> SuccessTuple: 786 """ 787 Delete a Pipe's registration. 788 """ 789 from meerschaum.utils.sql import sql_item_name 790 from meerschaum.utils.debug import dprint 791 from meerschaum.utils.packages import attempt_import 792 sqlalchemy = attempt_import('sqlalchemy') 793 794 if not pipe.id: 795 return False, f"{pipe} is not registered." 796 797 ### ensure pipes table exists 798 from meerschaum.connectors.sql.tables import get_tables 799 pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] 800 801 q = sqlalchemy.delete(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id) 802 if not self.exec(q, debug=debug): 803 return False, f"Failed to delete registration for {pipe}." 804 805 return True, "Success"
Delete a Pipe's registration.
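A brief sketch; note that this deletes the registration row, not the pipe's data table:

```python
success, msg = conn.delete_pipe(pipe)
print(success, msg)
```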
808def get_pipe_data( 809 self, 810 pipe: mrsm.Pipe, 811 select_columns: Optional[List[str]] = None, 812 omit_columns: Optional[List[str]] = None, 813 begin: Union[datetime, str, None] = None, 814 end: Union[datetime, str, None] = None, 815 params: Optional[Dict[str, Any]] = None, 816 order: str = 'asc', 817 limit: Optional[int] = None, 818 begin_add_minutes: int = 0, 819 end_add_minutes: int = 0, 820 debug: bool = False, 821 **kw: Any 822) -> Union[pd.DataFrame, None]: 823 """ 824 Access a pipe's data from the SQL instance. 825 826 Parameters 827 ---------- 828 pipe: mrsm.Pipe: 829 The pipe to get data from. 830 831 select_columns: Optional[List[str]], default None 832 If provided, only select these given columns. 833 Otherwise select all available columns (i.e. `SELECT *`). 834 835 omit_columns: Optional[List[str]], default None 836 If provided, remove these columns from the selection. 837 838 begin: Union[datetime, str, None], default None 839 If provided, get rows newer than or equal to this value. 840 841 end: Union[datetime, str, None], default None 842 If provided, get rows older than or equal to this value. 843 844 params: Optional[Dict[str, Any]], default None 845 Additional parameters to filter by. 846 See `meerschaum.connectors.sql.build_where`. 847 848 order: Optional[str], default 'asc' 849 The selection order for all of the indices in the query. 850 If `None`, omit the `ORDER BY` clause. 851 852 limit: Optional[int], default None 853 If specified, limit the number of rows retrieved to this value. 854 855 begin_add_minutes: int, default 0 856 The number of minutes to add to the `begin` datetime (i.e. `DATEADD`. 857 858 end_add_minutes: int, default 0 859 The number of minutes to add to the `end` datetime (i.e. `DATEADD`. 860 861 chunksize: Optional[int], default -1 862 The size of dataframe chunks to load into memory. 863 864 debug: bool, default False 865 Verbosity toggle. 866 867 Returns 868 ------- 869 A `pd.DataFrame` of the pipe's data. 
870 871 """ 872 import json 873 from meerschaum.utils.sql import sql_item_name 874 from meerschaum.utils.misc import parse_df_datetimes, to_pandas_dtype 875 from meerschaum.utils.packages import import_pandas 876 from meerschaum.utils.dtypes import ( 877 attempt_cast_to_numeric, 878 attempt_cast_to_uuid, 879 are_dtypes_equal, 880 ) 881 from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type 882 pd = import_pandas() 883 is_dask = 'dask' in pd.__name__ 884 885 cols_types = pipe.get_columns_types(debug=debug) 886 dtypes = { 887 **{ 888 p_col: to_pandas_dtype(p_typ) 889 for p_col, p_typ in pipe.dtypes.items() 890 }, 891 **{ 892 col: get_pd_type_from_db_type(typ) 893 for col, typ in cols_types.items() 894 } 895 } 896 if dtypes: 897 if self.flavor == 'sqlite': 898 if not pipe.columns.get('datetime', None): 899 _dt = pipe.guess_datetime() 900 dt = sql_item_name(_dt, self.flavor, None) if _dt else None 901 is_guess = True 902 else: 903 _dt = pipe.get_columns('datetime') 904 dt = sql_item_name(_dt, self.flavor, None) 905 is_guess = False 906 907 if _dt: 908 dt_type = dtypes.get(_dt, 'object').lower() 909 if 'datetime' not in dt_type: 910 if 'int' not in dt_type: 911 dtypes[_dt] = 'datetime64[ns, UTC]' 912 existing_cols = pipe.get_columns_types(debug=debug) 913 select_columns = ( 914 [ 915 col 916 for col in existing_cols 917 if col not in (omit_columns or []) 918 ] 919 if not select_columns 920 else [ 921 col 922 for col in select_columns 923 if col in existing_cols 924 and col not in (omit_columns or []) 925 ] 926 ) 927 if select_columns: 928 dtypes = {col: typ for col, typ in dtypes.items() if col in select_columns} 929 dtypes = { 930 col: to_pandas_dtype(typ) 931 for col, typ in dtypes.items() 932 if col in select_columns and col not in (omit_columns or []) 933 } 934 query = self.get_pipe_data_query( 935 pipe, 936 select_columns=select_columns, 937 omit_columns=omit_columns, 938 begin=begin, 939 end=end, 940 params=params, 941 order=order, 942 limit=limit, 943 begin_add_minutes=begin_add_minutes, 944 end_add_minutes=end_add_minutes, 945 debug=debug, 946 **kw 947 ) 948 949 if is_dask: 950 index_col = pipe.columns.get('datetime', None) 951 kw['index_col'] = index_col 952 953 numeric_columns = [ 954 col 955 for col, typ in pipe.dtypes.items() 956 if typ == 'numeric' and col in dtypes 957 ] 958 uuid_columns = [ 959 col 960 for col, typ in pipe.dtypes.items() 961 if typ == 'uuid' and col in dtypes 962 ] 963 964 kw['coerce_float'] = kw.get('coerce_float', (len(numeric_columns) == 0)) 965 966 df = self.read( 967 query, 968 dtype=dtypes, 969 debug=debug, 970 **kw 971 ) 972 for col in numeric_columns: 973 if col not in df.columns: 974 continue 975 df[col] = df[col].apply(attempt_cast_to_numeric) 976 977 for col in uuid_columns: 978 if col not in df.columns: 979 continue 980 df[col] = df[col].apply(attempt_cast_to_uuid) 981 982 if self.flavor == 'sqlite': 983 ignore_dt_cols = [ 984 col 985 for col, dtype in pipe.dtypes.items() 986 if not are_dtypes_equal(str(dtype), 'datetime') 987 ] 988 ### NOTE: We have to consume the iterator here to ensure that datetimes are parsed correctly 989 df = ( 990 parse_df_datetimes( 991 df, 992 ignore_cols=ignore_dt_cols, 993 chunksize=kw.get('chunksize', None), 994 strip_timezone=(pipe.tzinfo is None), 995 debug=debug, 996 ) if isinstance(df, pd.DataFrame) else ( 997 [ 998 parse_df_datetimes( 999 c, 1000 ignore_cols=ignore_dt_cols, 1001 chunksize=kw.get('chunksize', None), 1002 strip_timezone=(pipe.tzinfo is None), 1003 debug=debug, 1004 ) 1005 for c in df 1006 ] 
1007 ) 1008 ) 1009 for col, typ in dtypes.items(): 1010 if typ != 'json': 1011 continue 1012 df[col] = df[col].apply(lambda x: json.loads(x) if x is not None else x) 1013 return df
Access a pipe's data from the SQL instance.
Parameters
- pipe (mrsm.Pipe): The pipe to get data from.
- select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. `SELECT *`).
- omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, str, None], default None): If provided, get rows newer than or equal to this value.
- end (Union[datetime, str, None], default None): If provided, get rows older than or equal to this value.
- params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See `meerschaum.connectors.sql.build_where`.
- order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If `None`, omit the `ORDER BY` clause.
- limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
- begin_add_minutes (int, default 0): The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
- end_add_minutes (int, default 0): The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
- chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory.
- debug (bool, default False): Verbosity toggle.

Returns
- A `pd.DataFrame` of the pipe's data.
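A sketch of a bounded, descending query, continuing the illustrative `pipe` from above (column names and bounds are illustrative):

```python
df = conn.get_pipe_data(
    pipe,
    select_columns=['dt', 'value'],
    begin='2024-01-01',
    end='2024-02-01',
    order='desc',
    limit=100,
)
```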
1016def get_pipe_data_query( 1017 self, 1018 pipe: mrsm.Pipe, 1019 select_columns: Optional[List[str]] = None, 1020 omit_columns: Optional[List[str]] = None, 1021 begin: Union[datetime, int, str, None] = None, 1022 end: Union[datetime, int, str, None] = None, 1023 params: Optional[Dict[str, Any]] = None, 1024 order: Optional[str] = 'asc', 1025 sort_datetimes: bool = False, 1026 limit: Optional[int] = None, 1027 begin_add_minutes: int = 0, 1028 end_add_minutes: int = 0, 1029 replace_nulls: Optional[str] = None, 1030 skip_existing_cols_check: bool = False, 1031 debug: bool = False, 1032 **kw: Any 1033) -> Union[str, None]: 1034 """ 1035 Return the `SELECT` query for retrieving a pipe's data from its instance. 1036 1037 Parameters 1038 ---------- 1039 pipe: mrsm.Pipe: 1040 The pipe to get data from. 1041 1042 select_columns: Optional[List[str]], default None 1043 If provided, only select these given columns. 1044 Otherwise select all available columns (i.e. `SELECT *`). 1045 1046 omit_columns: Optional[List[str]], default None 1047 If provided, remove these columns from the selection. 1048 1049 begin: Union[datetime, int, str, None], default None 1050 If provided, get rows newer than or equal to this value. 1051 1052 end: Union[datetime, str, None], default None 1053 If provided, get rows older than or equal to this value. 1054 1055 params: Optional[Dict[str, Any]], default None 1056 Additional parameters to filter by. 1057 See `meerschaum.connectors.sql.build_where`. 1058 1059 order: Optional[str], default None 1060 The selection order for all of the indices in the query. 1061 If `None`, omit the `ORDER BY` clause. 1062 1063 sort_datetimes: bool, default False 1064 Alias for `order='desc'`. 1065 1066 limit: Optional[int], default None 1067 If specified, limit the number of rows retrieved to this value. 1068 1069 begin_add_minutes: int, default 0 1070 The number of minutes to add to the `begin` datetime (i.e. `DATEADD`). 1071 1072 end_add_minutes: int, default 0 1073 The number of minutes to add to the `end` datetime (i.e. `DATEADD`). 1074 1075 chunksize: Optional[int], default -1 1076 The size of dataframe chunks to load into memory. 1077 1078 replace_nulls: Optional[str], default None 1079 If provided, replace null values with this value. 1080 1081 skip_existing_cols_check: bool, default False 1082 If `True`, do not verify that querying columns are actually on the table. 1083 1084 debug: bool, default False 1085 Verbosity toggle. 1086 1087 Returns 1088 ------- 1089 A `SELECT` query to retrieve a pipe's data. 
1090 """ 1091 from meerschaum.utils.misc import items_str 1092 from meerschaum.utils.sql import sql_item_name, dateadd_str 1093 from meerschaum.utils.dtypes import coerce_timezone 1094 from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type 1095 1096 dt_col = pipe.columns.get('datetime', None) 1097 existing_cols = pipe.get_columns_types(debug=debug) 1098 dt_typ = get_pd_type_from_db_type(existing_cols[dt_col]) if dt_col in existing_cols else None 1099 select_columns = ( 1100 [col for col in existing_cols] 1101 if not select_columns 1102 else [col for col in select_columns if col in existing_cols or skip_existing_cols_check] 1103 ) 1104 if omit_columns: 1105 select_columns = [col for col in select_columns if col not in omit_columns] 1106 1107 if order is None and sort_datetimes: 1108 order = 'desc' 1109 1110 if begin == '': 1111 begin = pipe.get_sync_time(debug=debug) 1112 backtrack_interval = pipe.get_backtrack_interval(debug=debug) 1113 if begin is not None: 1114 begin -= backtrack_interval 1115 1116 begin, end = pipe.parse_date_bounds(begin, end) 1117 if isinstance(begin, datetime) and dt_typ: 1118 begin = coerce_timezone(begin, strip_utc=('utc' not in dt_typ.lower())) 1119 if isinstance(end, datetime) and dt_typ: 1120 end = coerce_timezone(end, strip_utc=('utc' not in dt_typ.lower())) 1121 1122 cols_names = [ 1123 sql_item_name(col, self.flavor, None) 1124 for col in select_columns 1125 ] 1126 select_cols_str = ( 1127 'SELECT\n ' 1128 + ',\n '.join( 1129 [ 1130 ( 1131 col_name 1132 if not replace_nulls 1133 else f"COALESCE(col_name, '{replace_nulls}') AS {col_name}" 1134 ) 1135 for col_name in cols_names 1136 ] 1137 ) 1138 ) if cols_names else 'SELECT *' 1139 pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 1140 query = f"{select_cols_str}\nFROM {pipe_table_name}" 1141 where = "" 1142 1143 if order is not None: 1144 default_order = 'asc' 1145 if order not in ('asc', 'desc'): 1146 warn(f"Ignoring unsupported order '{order}'. Falling back to '{default_order}'.") 1147 order = default_order 1148 order = order.upper() 1149 1150 if not pipe.columns.get('datetime', None): 1151 _dt = pipe.guess_datetime() 1152 dt = sql_item_name(_dt, self.flavor, None) if _dt else None 1153 is_guess = True 1154 else: 1155 _dt = pipe.get_columns('datetime') 1156 dt = sql_item_name(_dt, self.flavor, None) 1157 is_guess = False 1158 1159 quoted_indices = { 1160 key: sql_item_name(val, self.flavor, None) 1161 for key, val in pipe.columns.items() 1162 if val in existing_cols or skip_existing_cols_check 1163 } 1164 1165 if begin is not None or end is not None: 1166 if is_guess: 1167 if _dt is None: 1168 warn( 1169 f"No datetime could be determined for {pipe}." 1170 + "\n Ignoring begin and end...", 1171 stack=False, 1172 ) 1173 begin, end = None, None 1174 else: 1175 warn( 1176 f"A datetime wasn't specified for {pipe}.\n" 1177