meerschaum.connectors.sql
Subpackage providing the `SQLConnector` class.
```python
class SQLConnector(InstanceConnector):
    """
    Connect to SQL databases via `sqlalchemy`.

    SQLConnectors may be used as Meerschaum instance connectors.
    Read more about connectors and instances at
    https://meerschaum.io/reference/connectors/
    """

    from ._create_engine import flavor_configs, create_engine
    from ._sql import (
        read,
        value,
        exec,
        execute,
        to_sql,
        exec_queries,
        get_connection,
        _cleanup_connections,
        _init_geopackage_table,
    )
    from meerschaum.utils.sql import test_connection
    from ._fetch import fetch, get_pipe_metadef
    from ._cli import cli, _cli_exit
    from ._pipes import (
        fetch_pipes_keys,
        create_indices,
        drop_indices,
        get_create_index_queries,
        get_drop_index_queries,
        get_add_columns_queries,
        get_alter_columns_queries,
        delete_pipe,
        get_pipe_data,
        get_pipe_data_query,
        register_pipe,
        edit_pipe,
        get_pipe_id,
        get_pipe_attributes,
        sync_pipe,
        sync_pipe_inplace,
        get_sync_time,
        pipe_exists,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        deduplicate_pipe,
        get_pipe_table,
        get_pipe_columns_types,
        get_to_sql_dtype,
        get_pipe_schema,
        create_pipe_table_from_df,
        get_pipe_columns_indices,
        get_temporary_target,
        create_pipe_indices,
        drop_pipe_indices,
        get_pipe_index_names,
    )
    from ._plugins import (
        get_plugins_pipe,
        register_plugin,
        delete_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugins,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
    )
    from ._users import (
        get_users_pipe,
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri, parse_uri
    from ._instance import (
        _log_temporary_tables_creation,
        _drop_temporary_table,
        _drop_temporary_tables,
        _drop_old_temporary_tables,
    )

    def __init__(
        self,
        label: Optional[str] = None,
        flavor: Optional[str] = None,
        wait: bool = False,
        connect: bool = False,
        debug: bool = False,
        **kw: Any
    ):
        """
        Parameters
        ----------
        label: str, default 'main'
            The identifying label for the connector.
            E.g. for `sql:main`, 'main' is the label.

        flavor: Optional[str], default None
            The database flavor, e.g.
            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
            To see supported flavors, run the `bootstrap connectors` command.

        wait: bool, default False
            If `True`, block until a database connection has been made.

        connect: bool, default False
            If `True`, immediately attempt to connect to the database and raise
            a warning if the connection fails.

        debug: bool, default False
            Verbosity toggle.

        kw: Any
            All other arguments will be passed to the connector's attributes.
            Therefore, a connector may be made without being registered,
            as long as enough parameters are supplied to the constructor.
        """
        if 'uri' in kw:
            uri = kw['uri']
            if uri.startswith('postgres') and not uri.startswith('postgresql'):
                uri = uri.replace('postgres', 'postgresql', 1)
            if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
                uri = uri.replace('postgresql://', 'postgresql+psycopg://', 1)
            if uri.startswith('timescaledb://'):
                uri = uri.replace('timescaledb://', 'postgresql+psycopg://', 1)
                flavor = 'timescaledb'
            if uri.startswith('timescaledb-ha://'):
                uri = uri.replace('timescaledb-ha://', 'postgresql+psycopg://', 1)
                flavor = 'timescaledb-ha'
            if uri.startswith('postgis://'):
                uri = uri.replace('postgis://', 'postgresql+psycopg://', 1)
                flavor = 'postgis'
            kw['uri'] = uri
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            _ = from_uri_params.pop('label', None)

            ### Sometimes the flavor may be provided with a URI.
            kw.update(from_uri_params)
            if flavor:
                kw['flavor'] = flavor

        ### set __dict__ in base class
        super().__init__(
            'sql',
            label=label or self.__dict__.get('label', None),
            **kw
        )

        if self.__dict__.get('flavor', None) in ('sqlite', 'geopackage'):
            self._reset_attributes()
            self._set_attributes(
                'sql',
                label=label,
                inherit_default=False,
                **kw
            )
            ### For backwards compatibility reasons, set the path for sql:local if it's missing.
            if self.label == 'local' and not self.__dict__.get('database', None):
                from meerschaum.config._paths import SQLITE_DB_PATH
                self.database = SQLITE_DB_PATH.as_posix()

        ### ensure flavor and label are set accordingly
        if 'flavor' not in self.__dict__:
            if flavor is None and 'uri' not in self.__dict__:
                raise ValueError(
                    f"Missing flavor. Provide flavor as a key for '{self}'."
                )
            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

        if self.flavor == 'postgres':
            self.flavor = 'postgresql'

        self._debug = debug
        ### Store the PID and thread at initialization
        ### so we can dispose of the Pool in child processes or threads.
        import os
        import threading
        self._pid = os.getpid()
        self._thread_ident = threading.current_thread().ident
        self._sessions = {}
        self._locks = {'_sessions': threading.RLock(), }

        ### verify the flavor's requirements are met
        if self.flavor not in self.flavor_configs:
            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
        if not self.__dict__.get('uri'):
            self.verify_attributes(
                self.flavor_configs[self.flavor].get('requirements', set()),
                debug=debug,
            )

        if wait:
            from meerschaum.connectors.poll import retry_connect
            retry_connect(connector=self, debug=debug)

        if connect:
            if not self.test_connection(debug=debug):
                warn(f"Failed to connect with connector '{self}'!", stack=False)

    @property
    def Session(self):
        if '_Session' not in self.__dict__:
            if self.engine is None:
                return None

            from meerschaum.utils.packages import attempt_import
            sqlalchemy_orm = attempt_import('sqlalchemy.orm', lazy=False)
            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
            self._Session = sqlalchemy_orm.scoped_session(session_factory)

        return self._Session

    @property
    def engine(self):
        """
        Return the SQLAlchemy engine connected to the configured database.
        """
        import os
        import threading
        if '_engine' not in self.__dict__:
            self._engine, self._engine_str = self.create_engine(include_uri=True)

        same_process = os.getpid() == self._pid
        same_thread = threading.current_thread().ident == self._thread_ident

        ### handle child processes
        if not same_process:
            self._pid = os.getpid()
            self._thread = threading.current_thread()
            warn("Different PID detected. Disposing of connections...")
            self._engine.dispose()

        ### handle different threads
        if not same_thread:
            if self.flavor == 'duckdb':
                warn("Different thread detected.")
                self._engine.dispose()

        return self._engine

    @property
    def DATABASE_URL(self) -> str:
        """
        Return the URI connection string (alias for `SQLConnector.URI`).
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def URI(self) -> str:
        """
        Return the URI connection string.
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def IS_THREAD_SAFE(self) -> bool:
        """
        Return whether this connector may be multithreaded.
        """
        if self.flavor in ('duckdb', 'oracle'):
            return False
        if self.flavor in ('sqlite', 'geopackage'):
            return ':memory:' not in self.URI
        return True

    @property
    def metadata(self):
        """
        Return the metadata bound to this configured schema.
        """
        from meerschaum.utils.packages import attempt_import
        sqlalchemy = attempt_import('sqlalchemy', lazy=False)
        if '_metadata' not in self.__dict__:
            self._metadata = sqlalchemy.MetaData(schema=self.schema)
        return self._metadata

    @property
    def instance_schema(self):
        """
        Return the schema name for Meerschaum tables.
        """
        return self.schema

    @property
    def internal_schema(self):
        """
        Return the schema name for internal tables.
        """
        from meerschaum._internal.static import STATIC_CONFIG
        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
        schema_name = self.__dict__.get('internal_schema', None) or (
            STATIC_CONFIG['sql']['internal_schema']
            if self.flavor not in NO_SCHEMA_FLAVORS
            else self.schema
        )

        if '_internal_schema' not in self.__dict__:
            self._internal_schema = schema_name
        return self._internal_schema

    @property
    def db(self) -> Optional[databases.Database]:
        from meerschaum.utils.packages import attempt_import
        databases = attempt_import('databases', lazy=False, install=True)
        url = self.DATABASE_URL
        if 'mysql' in url:
            url = url.replace('+pymysql', '')
        if '_db' not in self.__dict__:
            try:
                self._db = databases.Database(url)
            except KeyError:
                ### Likely encountered an unsupported flavor.
                from meerschaum.utils.warnings import warn
                self._db = None
        return self._db

    @property
    def db_version(self) -> Union[str, None]:
        """
        Return the database version.
        """
        _db_version = self.__dict__.get('_db_version', None)
        if _db_version is not None:
            return _db_version

        from meerschaum.utils.sql import get_db_version
        self._db_version = get_db_version(self)
        return self._db_version

    @property
    def schema(self) -> Union[str, None]:
        """
        Return the default schema to use.
        A value of `None` will not prepend a schema.
        """
        if 'schema' in self.__dict__:
            return self.__dict__['schema']

        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
        if self.flavor in NO_SCHEMA_FLAVORS:
            self.__dict__['schema'] = None
            return None

        sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
        _schema = sqlalchemy.inspect(self.engine).default_schema_name
        self.__dict__['schema'] = _schema
        return _schema

    def get_metadata_cache_path(self, kind: str = 'json') -> pathlib.Path:
        """
        Return the path to the file to which to write metadata cache.
        """
        from meerschaum.config.paths import SQL_CONN_CACHE_RESOURCES_PATH
        filename = (
            f'{self.label}-metadata.pkl'
            if kind == 'pkl'
            else f'{self.label}.json'
        )
        return SQL_CONN_CACHE_RESOURCES_PATH / filename

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __call__(self):
        return self
```
Connect to SQL databases via `sqlalchemy`.

SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/
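For orientation, here is a minimal sketch of both uses. The connector keys, database path, and pipe keys below are hypothetical placeholders, and `sql:main` is assumed to already be configured:

```python
import meerschaum as mrsm
from meerschaum.connectors.sql import SQLConnector

# Load a registered connector by its keys.
conn = mrsm.get_connector('sql:main')

# Or construct one ad hoc without registering it,
# as long as enough attributes are supplied.
temp_conn = SQLConnector('temp', flavor='sqlite', database='/tmp/temp.db')

# Use the connector as the instance for a pipe.
pipe = mrsm.Pipe('plugin:noaa', 'weather', instance=conn)
```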
```python
def __init__(
    self,
    label: Optional[str] = None,
    flavor: Optional[str] = None,
    wait: bool = False,
    connect: bool = False,
    debug: bool = False,
    **kw: Any
): ...
```
Parameters

- label (str, default 'main'): The identifying label for the connector. E.g. for `sql:main`, 'main' is the label.
- flavor (Optional[str], default None): The database flavor, e.g. `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc. To see supported flavors, run the `bootstrap connectors` command.
- wait (bool, default False): If `True`, block until a database connection has been made.
- connect (bool, default False): If `True`, immediately attempt to connect to the database and raise a warning if the connection fails.
- debug (bool, default False): Verbosity toggle.
- kw (Any): All other arguments will be passed to the connector's attributes. Therefore, a connector may be made without being registered, as long as enough parameters are supplied to the constructor.
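As a sketch of the URI handling above (hostnames and credentials are placeholders), the constructor normalizes bare dialect schemes to SQLAlchemy driver URIs and may infer the flavor from the scheme:

```python
from meerschaum.connectors.sql import SQLConnector

# 'postgres://' is rewritten to 'postgresql+psycopg://',
# and the flavor is parsed from the URI.
conn = SQLConnector(label='prod', uri='postgres://user:pass@db.example.com:5432/mydb')
print(conn.flavor)  # 'postgresql'

# Flavor-specific schemes such as 'timescaledb://' also set the flavor.
ts_conn = SQLConnector(label='ts', uri='timescaledb://user:pass@ts.example.com:5432/tsdb')
print(ts_conn.flavor)  # 'timescaledb'
```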
```python
@property
def Session(self): ...
```
```python
@property
def engine(self): ...
```
Return the SQLAlchemy engine connected to the configured database.
```python
@property
def DATABASE_URL(self) -> str: ...
```
Return the URI connection string (alias for `SQLConnector.URI`).
```python
@property
def URI(self) -> str: ...
```
Return the URI connection string.
```python
@property
def IS_THREAD_SAFE(self) -> bool: ...
```
Return whether this connector may be multithreaded.
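A small sketch of how this flag might gate client-side threading, assuming `conn` is an existing `SQLConnector` (the table names are hypothetical):

```python
from concurrent.futures import ThreadPoolExecutor

queries = [f"SELECT COUNT(*) FROM table_{i}" for i in range(4)]

# Fan out across threads only when the connector reports it is safe
# (e.g. not DuckDB or Oracle, nor an in-memory SQLite database).
if conn.IS_THREAD_SAFE:
    with ThreadPoolExecutor(max_workers=4) as executor:
        counts = list(executor.map(conn.value, queries))
else:
    counts = [conn.value(query) for query in queries]
```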
```python
@property
def metadata(self): ...
```
Return the metadata bound to this configured schema.
```python
@property
def instance_schema(self): ...
```
Return the schema name for Meerschaum tables.
```python
@property
def internal_schema(self): ...
```
Return the schema name for internal tables.
```python
@property
def db(self) -> Optional[databases.Database]: ...
```
```python
@property
def db_version(self) -> Union[str, None]: ...
```
Return the database version.
```python
@property
def schema(self) -> Union[str, None]: ...
```
Return the default schema to use. A value of `None` will not prepend a schema.
```python
def get_metadata_cache_path(self, kind: str = 'json') -> pathlib.Path: ...
```
Return the path to the file to which to write metadata cache.
```python
def create_engine(
    self,
    include_uri: bool = False,
    debug: bool = False,
    **kw
) -> 'sqlalchemy.engine.Engine':
    """Create a sqlalchemy engine by building the engine string."""
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import error, warn
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    import urllib
    import copy
    ### Install and patch required drivers.
    if self.flavor in install_flavor_drivers:
        _ = attempt_import(
            *install_flavor_drivers[self.flavor],
            debug=debug,
            lazy=False,
            warn=False,
        )
        if self.flavor == 'mssql':
            _init_mssql_sqlalchemy()

    ### supplement missing values with defaults (e.g. port number)
    for a, value in flavor_configs[self.flavor]['defaults'].items():
        if a not in self.__dict__:
            self.__dict__[a] = value

    ### Verify that everything is in order.
    if self.flavor not in flavor_configs:
        error(f"Cannot create a connector with the flavor '{self.flavor}'.")

    _engine = flavor_configs[self.flavor].get('engine', None)
    _username = self.__dict__.get('username', None)
    _password = self.__dict__.get('password', None)
    _host = self.__dict__.get('host', None)
    _port = self.__dict__.get('port', None)
    _database = self.__dict__.get('database', None)
    if _database == '{SQLITE_DB_PATH}':
        from meerschaum.config.paths import SQLITE_DB_PATH
        _database = SQLITE_DB_PATH.as_posix()
    _options = self.__dict__.get('options', {})
    if isinstance(_options, str):
        _options = dict(urllib.parse.parse_qsl(_options))
    _uri = self.__dict__.get('uri', None)

    ### Handle registering specific dialects (due to installing in virtual environments).
    if self.flavor in flavor_dialects:
        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])

    ### self._sys_config was deepcopied and can be updated safely
    if self.flavor in ("sqlite", "duckdb", "geopackage"):
        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
        if 'create_engine' not in self._sys_config:
            self._sys_config['create_engine'] = {}
        if 'connect_args' not in self._sys_config['create_engine']:
            self._sys_config['create_engine']['connect_args'] = {}
        self._sys_config['create_engine']['connect_args'].update({"check_same_thread": False})
    else:
        engine_str = (
            _engine + "://" + (_username if _username is not None else '') +
            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
            (("/" + _database) if _database is not None else '')
            + (("?" + urllib.parse.urlencode(_options)) if _options else '')
        ) if not _uri else _uri

    ### Sometimes the timescaledb:// flavor can slip in.
    if _uri and self.flavor in _uri:
        if self.flavor in ('timescaledb', 'timescaledb-ha', 'postgis'):
            engine_str = engine_str.replace(self.flavor, 'postgresql', 1)
        elif _uri.startswith('postgresql://'):
            engine_str = engine_str.replace('postgresql://', 'postgresql+psycopg2://')

    if debug:
        dprint(
            (
                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
                if _password is not None else engine_str
            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
        )

    _kw_copy = copy.deepcopy(kw)

    ### NOTE: Order of inheritance:
    ###   1. Defaults
    ###   2. System configuration
    ###   3. Connector configuration
    ###   4. Keyword arguments
    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
    def _apply_create_engine_args(update):
        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
            _create_engine_args.update(
                {
                    k: v for k, v in update.items()
                    if 'omit_create_engine' not in flavor_configs[self.flavor]
                    or k not in flavor_configs[self.flavor].get('omit_create_engine')
                }
            )
    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
    _apply_create_engine_args(_kw_copy)

    try:
        engine = sqlalchemy.create_engine(
            engine_str,
            ### I know this looks confusing, and maybe it's bad code,
            ### but it's simple. It dynamically parses the config string
            ### and splits it to separate the class name (QueuePool)
            ### from the module name (sqlalchemy.pool).
            poolclass=getattr(
                attempt_import(
                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
                ),
                self._sys_config['poolclass'].split('.')[-1]
            ),
            echo=debug,
            **_create_engine_args
        )
    except Exception:
        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
        engine = None

    if include_uri:
        return engine, engine_str
    return engine
```
Create a sqlalchemy engine by building the engine string.
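In day-to-day use the cached `SQLConnector.engine` property is preferred; this sketch calls `create_engine()` directly to show the `include_uri` return shape, assuming `conn` is an existing `SQLConnector`:

```python
import meerschaum as mrsm

# Returns a (engine, engine_str) tuple when include_uri=True.
engine, engine_str = conn.create_engine(include_uri=True)

sqlalchemy = mrsm.attempt_import('sqlalchemy')
with engine.connect() as connection:
    print(connection.execute(sqlalchemy.text("SELECT 1")).scalar())
```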
```python
def read(
    self,
    query_or_table: Union[str, sqlalchemy.Query],
    params: Union[Dict[str, Any], List[str], None] = None,
    dtype: Optional[Dict[str, Any]] = None,
    coerce_float: bool = True,
    chunksize: Optional[int] = -1,
    workers: Optional[int] = None,
    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
    as_hook_results: bool = False,
    chunks: Optional[int] = None,
    schema: Optional[str] = None,
    as_chunks: bool = False,
    as_iterator: bool = False,
    as_dask: bool = False,
    index_col: Optional[str] = None,
    silent: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[
    pandas.DataFrame,
    dask.DataFrame,
    List[pandas.DataFrame],
    List[Any],
    None,
]:
    """
    Read a SQL query or table into a pandas dataframe.

    Parameters
    ----------
    query_or_table: Union[str, sqlalchemy.Query]
        The SQL query (sqlalchemy Query or string) or name of the table from which to select.

    params: Optional[Dict[str, Any]], default None
        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html

    dtype: Optional[Dict[str, Any]], default None
        A dictionary of data types to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html

    chunksize: Optional[int], default -1
        How many rows to read per chunk. `None` will read everything in one large chunk.
        Defaults to system configuration.

        **NOTE:** DuckDB does not allow for chunking.

    workers: Optional[int], default None
        How many threads to use when consuming the generator.
        Only applies if `chunk_hook` is provided.

    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
        See `--sync-chunks` for an example.
        **NOTE:** `as_iterator` MUST be `False` (default).

    as_hook_results: bool, default False
        If `True`, return a `List` of the outputs of the hook function.
        Only applicable if `chunk_hook` is not None.

        **NOTE:** `as_iterator` MUST be `False` (default).

    chunks: Optional[int], default None
        Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
        return as a single dataframe.
        For example, to limit the returned dataframe to 100,000 rows,
        you could specify a `chunksize` of `1000` and `chunks` of `100`.

    schema: Optional[str], default None
        If just a table name is provided, optionally specify the table schema.
        Defaults to `SQLConnector.schema`.

    as_chunks: bool, default False
        If `True`, return a list of DataFrames.
        Otherwise return a single DataFrame.

    as_iterator: bool, default False
        If `True`, return the pandas DataFrame iterator.
        `chunksize` must not be `None` (falls back to 1000 if so),
        and hooks are not called in this case.

    index_col: Optional[str], default None
        If using Dask, use this column as the index column.
        If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.

    silent: bool, default False
        If `True`, don't raise warnings in case of errors.

    Returns
    -------
    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
    or `None` if something breaks.
    """
    if chunks is not None and chunks <= 0:
        return []

    from meerschaum.utils.sql import sql_item_name, truncate_item_name
    from meerschaum.utils.dtypes import are_dtypes_equal, coerce_timezone
    from meerschaum.utils.dtypes.sql import TIMEZONE_NAIVE_FLAVORS
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.pool import get_pool
    from meerschaum.utils.dataframe import chunksize_to_npartitions, get_numeric_cols
    from meerschaum.utils.misc import filter_arguments
    import warnings
    import traceback
    from decimal import Decimal

    pd = import_pandas()
    dd = None

    is_dask = 'dask' in pd.__name__
    pandas = attempt_import('pandas')
    is_dask = dd is not None
    npartitions = chunksize_to_npartitions(chunksize)
    if is_dask:
        chunksize = None

    schema = schema or self.schema
    utc_dt_cols = [
        col
        for col, typ in dtype.items()
        if are_dtypes_equal(typ, 'datetime') and 'utc' in typ.lower()
    ] if dtype else []

    if dtype and utc_dt_cols and self.flavor in TIMEZONE_NAIVE_FLAVORS:
        dtype = dtype.copy()
        for col in utc_dt_cols:
            dtype[col] = 'datetime64[us]'

    pool = get_pool(workers=workers)
    sqlalchemy = attempt_import("sqlalchemy", lazy=False)
    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is None and as_iterator:
        if not silent and self.flavor not in _disallow_chunks_flavors:
            warn(
                "An iterator may only be generated if chunksize is not None.\n"
                + "Falling back to a chunksize of 1000.", stacklevel=3,
            )
        chunksize = 1000
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel=3,
                )
            chunksize = _max_chunks_flavors[self.flavor]

    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
        chunksize = None

    if debug:
        import time
        start = time.perf_counter()
        dprint(f"[{self}]\n{query_or_table}")
        dprint(f"[{self}] Fetching with chunksize: {chunksize}")

    ### This might be sqlalchemy object or the string of a table name.
    ### We check for spaces and quotes to see if it might be a weird table.
    if (
        ' ' not in str(query_or_table)
        or (
            ' ' in str(query_or_table)
            and str(query_or_table).startswith('"')
            and str(query_or_table).endswith('"')
        )
    ):
        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
        if truncated_table_name != str(query_or_table) and not silent:
            warn(
                f"Table '{query_or_table}' is too long for '{self.flavor}',"
                + f" will instead read the table '{truncated_table_name}'."
            )

        query_or_table = sql_item_name(str(query_or_table), self.flavor, schema)
        if debug:
            dprint(f"[{self}] Reading from table {query_or_table}")
        formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table))
        str_query = f"SELECT * FROM {query_or_table}"
    else:
        str_query = query_or_table

    formatted_query = (
        sqlalchemy.text(str_query)
        if not is_dask and isinstance(str_query, str)
        else format_sql_query_for_dask(str_query)
    )

    def _get_chunk_args_kwargs(_chunk):
        return filter_arguments(
            chunk_hook,
            _chunk,
            workers=workers,
            chunksize=chunksize,
            debug=debug,
            **kw
        )

    chunk_list = []
    chunk_hook_results = []
    def _process_chunk(_chunk, _retry_on_failure: bool = True):
        if self.flavor in TIMEZONE_NAIVE_FLAVORS:
            for col in utc_dt_cols:
                _chunk[col] = coerce_timezone(_chunk[col], strip_utc=False)
        if not as_hook_results:
            chunk_list.append(_chunk)

        if chunk_hook is None:
            return None

        chunk_args, chunk_kwargs = _get_chunk_args_kwargs(_chunk)

        result = None
        try:
            result = chunk_hook(*chunk_args, **chunk_kwargs)
        except Exception:
            result = False, traceback.format_exc()
            from meerschaum.utils.formatting import get_console
            if not silent:
                get_console().print_exception()

        ### If the chunk fails to process, try it again one more time.
        if isinstance(result, tuple) and result[0] is False:
            if _retry_on_failure:
                return _process_chunk(_chunk, _retry_on_failure=False)

        return result

    try:
        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')

            read_sql_query_kwargs = {
                'params': params,
                'dtype': dtype,
                'coerce_float': coerce_float,
                'index_col': index_col,
            }
            if is_dask:
                if index_col is None:
                    dd = None
                    pd = attempt_import('pandas')
                    read_sql_query_kwargs.update({
                        'chunksize': chunksize,
                    })
            else:
                read_sql_query_kwargs.update({
                    'chunksize': chunksize,
                })

            if is_dask and dd is not None:
                ddf = dd.read_sql_query(
                    formatted_query,
                    self.URI,
                    **read_sql_query_kwargs
                )
            else:

                def get_chunk_generator(connectable):
                    chunk_generator = pd.read_sql_query(
                        formatted_query,
                        self.engine,
                        **read_sql_query_kwargs
                    )

                    to_return = (
                        (
                            chunk_generator
                            if not (as_hook_results or chunksize is None)
                            else (
                                _process_chunk(_chunk)
                                for _chunk in chunk_generator
                            )
                        )
                        if as_iterator or chunksize is None
                        else (
                            list(pool.imap(_process_chunk, chunk_generator))
                            if as_hook_results
                            else None
                        )
                    )
                    return chunk_generator, to_return

                if self.flavor in SKIP_READ_TRANSACTION_FLAVORS:
                    chunk_generator, to_return = get_chunk_generator(self.engine)
                else:
                    with self.engine.begin() as transaction:
                        with transaction.execution_options(stream_results=stream_results) as connection:
                            chunk_generator, to_return = get_chunk_generator(connection)

                if to_return is not None:
                    return to_return

    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
        if not silent:
            warn(str(e), stacklevel=3)
        from meerschaum.utils.formatting import get_console
        if not silent:
            get_console().print_exception()

        return None

    if is_dask and dd is not None:
        ddf = ddf.reset_index()
        return ddf

    chunk_list = []
    read_chunks = 0
    chunk_hook_results = []
    if chunksize is None:
        chunk_list.append(chunk_generator)
    elif as_iterator:
        return chunk_generator
    else:
        try:
            for chunk in chunk_generator:
                if chunk_hook is not None:
                    chunk_args, chunk_kwargs = _get_chunk_args_kwargs(chunk)
                    chunk_hook_results.append(chunk_hook(*chunk_args, **chunk_kwargs))
                chunk_list.append(chunk)
                read_chunks += 1
                if chunks is not None and read_chunks >= chunks:
                    break
        except Exception as e:
            warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
            from meerschaum.utils.formatting import get_console
            if not silent:
                get_console().print_exception()

            read_chunks = 0
            try:
                for chunk in chunk_generator:
                    if chunk_hook is not None:
                        chunk_args, chunk_kwargs = _get_chunk_args_kwargs(chunk)
                        chunk_hook_results.append(chunk_hook(*chunk_args, **chunk_kwargs))
                    chunk_list.append(chunk)
                    read_chunks += 1
                    if chunks is not None and read_chunks >= chunks:
                        break
            except Exception as e:
                warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
                from meerschaum.utils.formatting import get_console
                if not silent:
                    get_console().print_exception()

                return None

    ### If no chunks returned, read without chunks
    ### to get columns
    if len(chunk_list) == 0:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            _ = read_sql_query_kwargs.pop('chunksize', None)
            with self.engine.begin() as connection:
                chunk_list.append(
                    pd.read_sql_query(
                        formatted_query,
                        connection,
                        **read_sql_query_kwargs
                    )
                )

    ### call the hook on any missed chunks.
    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
        for c in chunk_list[len(chunk_hook_results):]:
            chunk_args, chunk_kwargs = _get_chunk_args_kwargs(c)
            chunk_hook_results.append(chunk_hook(*chunk_args, **chunk_kwargs))

    ### chunksize is not None so must iterate
    if debug:
        end = time.perf_counter()
        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")

    if as_hook_results:
        return chunk_hook_results

    ### Skip `pd.concat()` if `as_chunks` is specified.
    if as_chunks:
        for c in chunk_list:
            c.reset_index(drop=True, inplace=True)
            for col in get_numeric_cols(c):
                c[col] = c[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
        return chunk_list

    df = pd.concat(chunk_list).reset_index(drop=True)
    ### NOTE: The calls to `canonical()` are to drop leading and trailing zeroes.
    for col in get_numeric_cols(df):
        df[col] = df[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)

    return df
```
Read a SQL query or table into a pandas dataframe.

Parameters

- query_or_table (Union[str, sqlalchemy.Query]): The SQL query (sqlalchemy Query or string) or name of the table from which to select.
- params (Optional[Dict[str, Any]], default None): `List` or `Dict` of parameters to pass to `pandas.read_sql()`. See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
- dtype (Optional[Dict[str, Any]], default None): A dictionary of data types to pass to `pandas.read_sql()`. See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
- chunksize (Optional[int], default -1): How many rows to read per chunk. `None` will read everything in one large chunk. Defaults to the system configuration. **NOTE:** DuckDB does not allow for chunking.
- workers (Optional[int], default None): How many threads to use when consuming the generator. Only applies if `chunk_hook` is provided.
- chunk_hook (Optional[Callable[[pandas.DataFrame], Any]], default None): Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. See `--sync-chunks` for an example. **NOTE:** `as_iterator` MUST be `False` (default).
- as_hook_results (bool, default False): If `True`, return a `List` of the outputs of the hook function. Only applicable if `chunk_hook` is not None. **NOTE:** `as_iterator` MUST be `False` (default).
- chunks (Optional[int], default None): Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and return as a single dataframe. For example, to limit the returned dataframe to 100,000 rows, you could specify a `chunksize` of `1000` and `chunks` of `100`.
- schema (Optional[str], default None): If just a table name is provided, optionally specify the table schema. Defaults to `SQLConnector.schema`.
- as_chunks (bool, default False): If `True`, return a list of DataFrames. Otherwise return a single DataFrame.
- as_iterator (bool, default False): If `True`, return the pandas DataFrame iterator. `chunksize` must not be `None` (falls back to 1000 if so), and hooks are not called in this case.
- index_col (Optional[str], default None): If using Dask, use this column as the index column. If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
- silent (bool, default False): If `True`, don't raise warnings in case of errors.

Returns

- A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators, or `None` if something breaks.
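A few usage sketches, assuming `conn` is an existing `SQLConnector` (the `weather` table and its columns are hypothetical):

```python
# Read an entire table (bare table names are quoted and wrapped in a SELECT *).
df = conn.read('weather')

# Read a parameterized query, capping the result at roughly 100,000 rows.
df = conn.read(
    "SELECT * FROM weather WHERE station = :station",
    params={'station': 'KATL'},
    chunksize=1000,
    chunks=100,
)

# Run a hook per chunk and collect its return values instead of a dataframe.
def count_rows(chunk, **kw):
    return len(chunk)

counts = conn.read('weather', chunk_hook=count_rows, as_hook_results=True, chunksize=1000)
```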
```python
def value(
    self,
    query: str,
    *args: Any,
    use_pandas: bool = False,
    **kw: Any
) -> Any:
    """
    Execute the provided query and return the first value.

    Parameters
    ----------
    query: str
        The SQL query to execute.

    *args: Any
        The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
        if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.

    use_pandas: bool, default False
        If `True`, use `meerschaum.connectors.sql.SQLConnector.read`, otherwise use
        `meerschaum.connectors.sql.SQLConnector.exec` (default).
        **NOTE:** This is always `True` for DuckDB.

    **kw: Any
        See `*args`.

    Returns
    -------
    Any value returned from the query.
    """
    from meerschaum.utils.packages import attempt_import
    if self.flavor == 'duckdb':
        use_pandas = True
    if use_pandas:
        try:
            return self.read(query, *args, **kw).iloc[0, 0]
        except Exception:
            return None

    _close = kw.get('close', True)
    _commit = kw.get('commit', (self.flavor != 'mssql'))

    try:
        result, connection = self.exec(
            query,
            *args,
            with_connection=True,
            close=False,
            commit=_commit,
            **kw
        )
        first = result.first() if result is not None else None
        _val = first[0] if first is not None else None
    except Exception as e:
        warn(e, stacklevel=3)
        return None
    if _close:
        try:
            connection.close()
        except Exception as e:
            warn("Failed to close connection with exception:\n" + str(e))
    return _val
```
Execute the provided query and return the first value.

Parameters

- query (str): The SQL query to execute.
- *args (Any): The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec` if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.
- use_pandas (bool, default False): If `True`, use `meerschaum.connectors.sql.SQLConnector.read`; otherwise use `meerschaum.connectors.sql.SQLConnector.exec` (default). **NOTE:** This is always `True` for DuckDB.
- **kw (Any): See `*args`.

Returns

- Any value returned from the query.
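For example, assuming `conn` is an existing `SQLConnector` (table and column names are hypothetical):

```python
# Fetch a single scalar.
row_count = conn.value("SELECT COUNT(*) FROM weather")

# DuckDB always routes through `read()`; other flavors may opt in explicitly.
latest = conn.value("SELECT MAX(timestamp) FROM weather", use_pandas=True)
```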
```python
def exec(
    self,
    query: str,
    *args: Any,
    silent: bool = False,
    debug: bool = False,
    commit: Optional[bool] = None,
    close: Optional[bool] = None,
    with_connection: bool = False,
    _connection=None,
    _transaction=None,
    **kw: Any
) -> Union[
    sqlalchemy.engine.result.resultProxy,
    sqlalchemy.engine.cursor.LegacyCursorResult,
    Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
    Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
    None
]:
    """
    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.

    If inserting data, please use bind variables to avoid SQL injection!

    Parameters
    ----------
    query: Union[str, List[str], Tuple[str]]
        The query to execute.
        If `query` is a list or tuple, call `self.exec_queries()` instead.

    args: Any
        Arguments passed to `sqlalchemy.engine.execute`.

    silent: bool, default False
        If `True`, suppress warnings.

    commit: Optional[bool], default None
        If `True`, commit the changes after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    close: Optional[bool], default None
        If `True`, close the connection after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    with_connection: bool, default False
        If `True`, return a tuple including the connection object.
        This does not apply if `query` is a list of strings.

    Returns
    -------
    The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
    """
    if isinstance(query, (list, tuple)):
        return self.exec_queries(
            list(query),
            *args,
            silent=silent,
            debug=debug,
            **kw
        )

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import("sqlalchemy", lazy=False)
    if debug:
        dprint(f"[{self}] Executing query:\n{query}")

    _close = close if close is not None else (self.flavor != 'mssql')
    _commit = commit if commit is not None else (
        (self.flavor != 'mssql' or 'select' not in str(query).lower())
    )

    ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+).
    if not hasattr(query, 'compile'):
        query = sqlalchemy.text(query)

    connection = _connection if _connection is not None else self.get_connection()

    try:
        transaction = (
            _transaction
            if _transaction is not None else (
                connection.begin()
                if _commit
                else None
            )
        )
    except sqlalchemy.exc.InvalidRequestError as e:
        if _connection is not None or _transaction is not None:
            raise e
        connection = self.get_connection(rebuild=True)
        transaction = connection.begin()

    if transaction is not None and not transaction.is_active and _transaction is not None:
        connection = self.get_connection(rebuild=True)
        transaction = connection.begin() if _commit else None

    result = None
    try:
        result = connection.execute(query, *args, **kw)
        if _commit:
            transaction.commit()
    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{e}")
        if not silent:
            warn(str(e), stacklevel=3)
        result = None
        if _commit:
            transaction.rollback()
            connection = self.get_connection(rebuild=True)
    finally:
        if _close:
            connection.close()

    if with_connection:
        return result, connection

    return result
```
Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.

If inserting data, please use bind variables to avoid SQL injection!

Parameters

- query (Union[str, List[str], Tuple[str]]): The query to execute. If `query` is a list or tuple, call `self.exec_queries()` instead.
- args (Any): Arguments passed to `sqlalchemy.engine.execute`.
- silent (bool, default False): If `True`, suppress warnings.
- commit (Optional[bool], default None): If `True`, commit the changes after execution. Causes issues with flavors like `'mssql'`. This does not apply if `query` is a list of strings.
- close (Optional[bool], default None): If `True`, close the connection after execution. Causes issues with flavors like `'mssql'`. This does not apply if `query` is a list of strings.
- with_connection (bool, default False): If `True`, return a tuple including the connection object. This does not apply if `query` is a list of strings.

Returns

- The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
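A sketch of the bind-variable style the note above recommends, assuming `conn` is an existing `SQLConnector` (the table, values, and DDL are hypothetical, and DDL syntax varies by flavor):

```python
# Named bind parameters are handled by `sqlalchemy.text()` under the hood.
result = conn.exec(
    "INSERT INTO events (id, payload) VALUES (:id, :payload)",
    {'id': 1, 'payload': 'hello'},
)

# Lists and tuples of queries are delegated to `exec_queries()`.
results = conn.exec([
    "CREATE TABLE IF NOT EXISTS temp_events (id INT, payload TEXT)",
    "DELETE FROM temp_events",
])
```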
```python
def execute(
    self,
    *args: Any,
    **kw: Any
) -> Optional[sqlalchemy.engine.result.resultProxy]:
    """
    An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
    """
    return self.exec(*args, **kw)
```
An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
```python
def to_sql(
    self,
    df: pandas.DataFrame,
    name: str = None,
    index: bool = False,
    if_exists: str = 'replace',
    method: str = "",
    chunksize: Optional[int] = -1,
    schema: Optional[str] = None,
    safe_copy: bool = True,
    silent: bool = False,
    debug: bool = False,
    as_tuple: bool = False,
    as_dict: bool = False,
    _connection=None,
    _transaction=None,
    **kw
) -> Union[bool, SuccessTuple]:
    """
    Upload a DataFrame's contents to the SQL server.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be inserted.

    name: str
        The name of the table to be created.

    index: bool, default False
        If `True`, create the DataFrame's indices as columns.

    if_exists: str, default 'replace'
        Drop and create the table ('replace') or append if it exists
        ('append') or raise an exception ('fail').
        Options are ['replace', 'append', 'fail'].

    method: str, default ''
        `None` or 'multi'. See the pandas `to_sql()` documentation for details.

    chunksize: Optional[int], default -1
        How many rows to insert at a time.

    schema: Optional[str], default None
        Optionally override the schema for the table.
        Defaults to `SQLConnector.schema`.

    safe_copy: bool, default True
        If `True`, copy the dataframe before making any changes.

    as_tuple: bool, default False
        If `True`, return a (success_bool, message) tuple instead of a `bool`.

    as_dict: bool, default False
        If `True`, return a dictionary of transaction information.
        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
        `method`, and `target`.

    kw: Any
        Additional arguments will be passed to the DataFrame's `to_sql` function.

    Returns
    -------
    Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
    """
    import time
    import json
    from datetime import timedelta
    from meerschaum.utils.warnings import error, warn
    import warnings
    import functools
    import traceback

    if name is None:
        error(f"Name must not be `None` to insert data into {self}.")

    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
    kw.pop('name', None)

    schema = schema or self.schema

    from meerschaum.utils.sql import (
        sql_item_name,
        table_exists,
        json_flavors,
        truncate_item_name,
        DROP_IF_EXISTS_FLAVORS,
    )
    from meerschaum.utils.dataframe import (
        get_json_cols,
        get_numeric_cols,
        get_uuid_cols,
        get_bytes_cols,
        get_geometry_cols,
    )
    from meerschaum.utils.dtypes import (
        are_dtypes_equal,
        coerce_timezone,
        encode_bytes_for_bytea,
        serialize_bytes,
        serialize_decimal,
        serialize_geometry,
        json_serialize_value,
        get_geometry_type_srid,
    )
    from meerschaum.utils.dtypes.sql import (
        PD_TO_SQLALCHEMY_DTYPES_FLAVORS,
        get_db_type_from_pd_type,
        get_pd_type_from_db_type,
        get_numeric_precision_scale,
    )
    from meerschaum.utils.misc import interval_str
    from meerschaum.connectors.sql._create_engine import flavor_configs
    from meerschaum.utils.packages import attempt_import, import_pandas
    sqlalchemy = attempt_import('sqlalchemy', debug=debug, lazy=False)
    pd = import_pandas()
    is_dask = 'dask' in df.__module__

    bytes_cols = get_bytes_cols(df)
    numeric_cols = get_numeric_cols(df)
    geometry_cols = get_geometry_cols(df)
    ### NOTE: This excludes non-numeric serialized Decimals (e.g. SQLite).
    numeric_cols_dtypes = {
        col: typ
        for col, typ in kw.get('dtype', {}).items()
        if (
            col in df.columns
            and 'numeric' in str(typ).lower()
        )
    }
    numeric_cols.extend([col for col in numeric_cols_dtypes if col not in numeric_cols])
    numeric_cols_precisions_scales = {
        col: (
            (typ.precision, typ.scale)
            if hasattr(typ, 'precision')
            else get_numeric_precision_scale(self.flavor)
        )
        for col, typ in numeric_cols_dtypes.items()
    }
    geometry_cols_dtypes = {
        col: typ
        for col, typ in kw.get('dtype', {}).items()
        if (
            col in df.columns
            and 'geometry' in str(typ).lower() or 'geography' in str(typ).lower()
        )
    }
    geometry_cols.extend([col for col in geometry_cols_dtypes if col not in geometry_cols])
    geometry_cols_types_srids = {
        col: (typ.geometry_type, typ.srid)
        if hasattr(typ, 'srid')
        else get_geometry_type_srid()
        for col, typ in geometry_cols_dtypes.items()
    }

    cols_pd_types = {
        col: get_pd_type_from_db_type(str(typ))
        for col, typ in kw.get('dtype', {}).items()
    }
    cols_pd_types.update({
        col: f'numeric[{precision},{scale}]'
        for col, (precision, scale) in numeric_cols_precisions_scales.items()
        if precision and scale
    })
    cols_db_types = {
        col: get_db_type_from_pd_type(typ, flavor=self.flavor)
        for col, typ in cols_pd_types.items()
    }

    enable_bulk_insert = mrsm.get_config(
        'system', 'connectors', 'sql', 'bulk_insert', self.flavor,
        warn=False,
    ) or False
    stats = {'target': name}
    ### resort to defaults if None
    copied = False
    use_bulk_insert = False
    if method == "":
        if enable_bulk_insert:
            method = (
                functools.partial(mssql_insert_json, cols_types=cols_db_types, debug=debug)
                if self.flavor == 'mssql'
                else functools.partial(psql_insert_copy, debug=debug)
            )
            use_bulk_insert = True
        else:
            ### Should resolve to 'multi' or `None`.
            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')

    if bytes_cols and (use_bulk_insert or self.flavor == 'oracle'):
        if safe_copy and not copied:
            df = df.copy()
            copied = True
        bytes_serializer = (
            functools.partial(encode_bytes_for_bytea, with_prefix=(self.flavor != 'oracle'))
            if self.flavor != 'mssql'
            else serialize_bytes
        )
        for col in bytes_cols:
            df[col] = df[col].apply(bytes_serializer)

    ### Check for numeric columns.
    for col in numeric_cols:
        precision, scale = numeric_cols_precisions_scales.get(
            col,
            get_numeric_precision_scale(self.flavor)
        )
        df[col] = df[col].apply(
            functools.partial(
                serialize_decimal,
                quantize=True,
                precision=precision,
                scale=scale,
            )
        )

    for col in geometry_cols:
        geometry_type, srid = geometry_cols_types_srids.get(col, get_geometry_type_srid())
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            df[col] = df[col].apply(
                functools.partial(
                    serialize_geometry,
                    geometry_format=('wkt' if self.flavor == 'mssql' else 'wkb_hex'),
                )
            )

    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)

    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel=3,
                )
            chunksize = _max_chunks_flavors[self.flavor]
    stats['chunksize'] = chunksize

    success, msg = False, "Default to_sql message"
    start = time.perf_counter()
    if debug:
        msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..."
        print(msg, end="", flush=True)
    stats['num_rows'] = len(df)

    ### Check if the name is too long.
    truncated_name = truncate_item_name(name, self.flavor)
    if name != truncated_name:
        warn(
            f"Table '{name}' is too long for '{self.flavor}',"
            f" will instead create the table '{truncated_name}'."
        )

    ### filter out non-pandas args
    import inspect
    to_sql_params = inspect.signature(df.to_sql).parameters
    to_sql_kw = {}
    for k, v in kw.items():
        if k in to_sql_params:
            to_sql_kw[k] = v

    to_sql_kw.update({
        'name': truncated_name,
        'schema': schema,
        ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI),
        'index': index,
        'if_exists': if_exists,
        'method': method,
        'chunksize': chunksize,
    })
    if is_dask:
        to_sql_kw.update({
            'parallel': True,
        })
    elif _connection is not None:
        to_sql_kw['con'] = _connection

    if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
    if self.flavor == 'oracle':
        ### For some reason 'replace' doesn't work properly in pandas,
        ### so try dropping first.
        if if_exists == 'replace' and table_exists(name, self, schema=schema, debug=debug):
            success = self.exec(
                f"DROP TABLE {if_exists_str}" + sql_item_name(name, 'oracle', schema)
            ) is not None
            if not success:
                warn(f"Unable to drop {name}")

        ### Enforce NVARCHAR(2000) as text instead of CLOB.
        dtype = to_sql_kw.get('dtype', {})
        for col, typ in df.dtypes.items():
            if are_dtypes_equal(str(typ), 'object'):
                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
            elif are_dtypes_equal(str(typ), 'int'):
                dtype[col] = sqlalchemy.types.INTEGER
        to_sql_kw['dtype'] = dtype
    elif self.flavor == 'duckdb':
        dtype = to_sql_kw.get('dtype', {})
        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
        for col in dt_cols:
            df[col] = coerce_timezone(df[col], strip_utc=False)
    elif self.flavor == 'mssql':
        dtype = to_sql_kw.get('dtype', {})
        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
        new_dtype = {}
        for col in dt_cols:
            if col in dtype:
                continue
            dt_typ = get_db_type_from_pd_type(str(df.dtypes[col]), self.flavor, as_sqlalchemy=True)
            if col not in dtype:
                new_dtype[col] = dt_typ

        dtype.update(new_dtype)
        to_sql_kw['dtype'] = dtype

    ### Check for JSON columns.
    if self.flavor not in json_flavors:
        json_cols = get_json_cols(df)
        for col in json_cols:
            df[col] = df[col].apply(
                (
                    lambda x: json.dumps(x, default=json_serialize_value, sort_keys=True)
                    if not isinstance(x, Hashable)
                    else x
                )
            )

    if PD_TO_SQLALCHEMY_DTYPES_FLAVORS['uuid'].get(self.flavor, None) != 'Uuid':
        uuid_cols = get_uuid_cols(df)
        for col in uuid_cols:
            df[col] = df[col].astype(str)

    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            df.to_sql(**to_sql_kw)
        success = True
    except Exception as e:
        if not silent:
            warn(str(e))
        success, msg = False, traceback.format_exc()

    end = time.perf_counter()
    if success:
        num_rows = len(df)
        msg = (
            f"It took {interval_str(timedelta(seconds=(end - start)))} "
            + f"to sync {num_rows:,} row"
            + ('s' if num_rows != 1 else '')
            + f" to {name}."
        )
    stats['start'] = start
    stats['end'] = end
    stats['duration'] = end - start

    if debug:
        print(" done.", flush=True)
        dprint(msg)

    stats['success'] = success
    stats['msg'] = msg
    if as_tuple:
        return success, msg
    if as_dict:
        return stats
    return success
```
Upload a DataFrame's contents to the SQL server.
Parameters
- df (pd.DataFrame): The DataFrame to be inserted.
- name (str): The name of the table to be created.
- index (bool, default False): If True, creates the DataFrame's indices as columns.
- if_exists (str, default 'replace'): Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail'].
- method (str, default ''): The insertion method passed to pandas.DataFrame.to_sql, e.g. None, 'multi', or a callable. See the pandas to_sql documentation for details.
- chunksize (Optional[int], default -1): How many rows to insert at a time.
- schema (Optional[str], default None): Optionally override the schema for the table. Defaults to SQLConnector.schema.
- safe_copy (bool, default True): If True, copy the DataFrame before making any changes.
- as_tuple (bool, default False): If True, return a (success_bool, message) tuple instead of a bool. Defaults to False.
- as_dict (bool, default False): If True, return a dictionary of transaction information. The keys are success, msg, start, end, duration, num_rows, chunksize, method, and target.
- kw (Any): Additional arguments will be passed to the DataFrame's to_sql function.
Returns
- Either a bool or a SuccessTuple (depends on as_tuple).
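For example, a minimal sketch (assuming a configured `sql:main` connector; the table name example_table is hypothetical):

import meerschaum as mrsm
import pandas as pd

conn = mrsm.get_connector('sql:main')
df = pd.DataFrame({'id': [1, 2], 'value': [10.0, 20.0]})

### as_tuple=True returns (success, msg) instead of a bare bool.
success, msg = conn.to_sql(df, name='example_table', if_exists='replace', as_tuple=True)
print(success, msg)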
638def exec_queries( 639 self, 640 queries: List[ 641 Union[ 642 str, 643 Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]] 644 ] 645 ], 646 break_on_error: bool = False, 647 rollback: bool = True, 648 silent: bool = False, 649 debug: bool = False, 650) -> List[Union[sqlalchemy.engine.cursor.CursorResult, None]]: 651 """ 652 Execute a list of queries in a single transaction. 653 654 Parameters 655 ---------- 656 queries: List[ 657 Union[ 658 str, 659 Tuple[str, Callable[[], List[str]]] 660 ] 661 ] 662 The queries in the transaction to be executed. 663 If a query is a tuple, the second item of the tuple 664 will be considered a callable hook that returns a list of queries to be executed 665 before the next item in the list. 666 667 break_on_error: bool, default False 668 If `True`, stop executing when a query fails. 669 670 rollback: bool, default True 671 If `break_on_error` is `True`, rollback the transaction if a query fails. 672 673 silent: bool, default False 674 If `True`, suppress warnings. 675 676 Returns 677 ------- 678 A list of SQLAlchemy results. 679 """ 680 from meerschaum.utils.warnings import warn 681 from meerschaum.utils.debug import dprint 682 from meerschaum.utils.packages import attempt_import 683 sqlalchemy, sqlalchemy_orm = attempt_import('sqlalchemy', 'sqlalchemy.orm', lazy=False) 684 session = sqlalchemy_orm.Session(self.engine) 685 686 result = None 687 results = [] 688 with session.begin(): 689 for query in queries: 690 hook = None 691 result = None 692 693 if isinstance(query, tuple): 694 query, hook = query 695 if isinstance(query, str): 696 query = sqlalchemy.text(query) 697 698 if debug: 699 dprint(f"[{self}]\n" + str(query)) 700 701 try: 702 result = session.execute(query) 703 session.flush() 704 except Exception as e: 705 msg = (f"Encountered error while executing:\n{e}") 706 if not silent: 707 warn(msg) 708 elif debug: 709 dprint(f"[{self}]\n" + str(msg)) 710 result = None 711 if result is None and break_on_error: 712 if rollback: 713 session.rollback() 714 results.append(result) 715 break 716 elif result is not None and hook is not None: 717 hook_queries = hook(session) 718 if hook_queries: 719 hook_results = self.exec_queries( 720 hook_queries, 721 break_on_error = break_on_error, 722 rollback=rollback, 723 silent=silent, 724 debug=debug, 725 ) 726 result = (result, hook_results) 727 728 results.append(result) 729 730 return results
Execute a list of queries in a single transaction.
Parameters
- queries (List[Union[str, Tuple[str, Callable]]]): The queries in the transaction to be executed. If a query is a tuple, the second item of the tuple will be considered a callable hook (taking the session) that returns a list of queries to be executed before the next item in the list.
- break_on_error (bool, default False): If True, stop executing when a query fails.
- rollback (bool, default True): If break_on_error is True, rollback the transaction if a query fails.
- silent (bool, default False): If True, suppress warnings.
Returns
- A list of SQLAlchemy results.
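A sketch of a transaction with a hook (assuming `sql:main`; the table tmp_example is hypothetical):

import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')

### The tuple's hook receives the session and returns follow-up queries
### to execute before the next item in the list.
results = conn.exec_queries(
    [
        "CREATE TABLE tmp_example (id INTEGER)",
        (
            "INSERT INTO tmp_example (id) VALUES (1)",
            lambda session: ["SELECT * FROM tmp_example"],
        ),
        "DROP TABLE tmp_example",
    ],
    break_on_error=True,
)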
1287def get_connection(self, rebuild: bool = False) -> 'sqlalchemy.engine.base.Connection': 1288 """ 1289 Return the current alive connection. 1290 1291 Parameters 1292 ---------- 1293 rebuild: bool, default False 1294 If `True`, close the previous connection and open a new one. 1295 1296 Returns 1297 ------- 1298 A `sqlalchemy.engine.base.Connection` object. 1299 """ 1300 import threading 1301 if '_thread_connections' not in self.__dict__: 1302 self.__dict__['_thread_connections'] = {} 1303 1304 self._cleanup_connections() 1305 1306 thread_id = threading.get_ident() 1307 1308 thread_connections = self.__dict__.get('_thread_connections', {}) 1309 connection = thread_connections.get(thread_id, None) 1310 1311 if rebuild and connection is not None: 1312 try: 1313 connection.close() 1314 except Exception: 1315 pass 1316 1317 _ = thread_connections.pop(thread_id, None) 1318 connection = None 1319 1320 if connection is None or connection.closed: 1321 connection = self.engine.connect() 1322 thread_connections[thread_id] = connection 1323 1324 return connection
Return the current alive connection.
Parameters
- rebuild (bool, default False): If True, close the previous connection and open a new one.
Returns
- A sqlalchemy.engine.base.Connection object.
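A sketch of reusing the thread-local connection (assuming `sql:main`):

import meerschaum as mrsm
from meerschaum.utils.packages import attempt_import

sqlalchemy = attempt_import('sqlalchemy')
conn = mrsm.get_connector('sql:main')

connection = conn.get_connection()   # cached per thread
print(connection.execute(sqlalchemy.text('SELECT 1')).fetchall())
connection = conn.get_connection(rebuild=True)   # close and reopen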
863def test_connection( 864 self, 865 **kw: Any 866) -> Union[bool, None]: 867 """ 868 Test if a successful connection to the database may be made. 869 870 Parameters 871 ---------- 872 **kw: 873 The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`. 874 875 Returns 876 ------- 877 `True` if a connection is made, otherwise `False` or `None` in case of failure. 878 879 """ 880 import warnings 881 from meerschaum.connectors.poll import retry_connect 882 _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self} 883 _default_kw.update(kw) 884 with warnings.catch_warnings(): 885 warnings.filterwarnings('ignore', 'Could not') 886 try: 887 return retry_connect(**_default_kw) 888 except Exception: 889 return False
Test if a successful connection to the database may be made.
Parameters
- **kw (Any): The keyword arguments are passed to meerschaum.connectors.poll.retry_connect.
Returns
- True if a connection is made, otherwise False or None in case of failure.
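For example (keyword arguments are forwarded to retry_connect):

import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
if not conn.test_connection(max_retries=1):
    print(f"Unable to reach '{conn}'.")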
18def fetch( 19 self, 20 pipe: mrsm.Pipe, 21 begin: Union[datetime, int, str, None] = '', 22 end: Union[datetime, int, str, None] = None, 23 check_existing: bool = True, 24 chunksize: Optional[int] = -1, 25 workers: Optional[int] = None, 26 debug: bool = False, 27 **kw: Any 28) -> Union['pd.DataFrame', List[Any], None]: 29 """Execute the SQL definition and return a Pandas DataFrame. 30 31 Parameters 32 ---------- 33 pipe: mrsm.Pipe 34 The pipe object which contains the `fetch` metadata. 35 36 - pipe.columns['datetime']: str 37 - Name of the datetime column for the remote table. 38 - pipe.parameters['fetch']: Dict[str, Any] 39 - Parameters necessary to execute a query. 40 - pipe.parameters['fetch']['definition']: str 41 - Raw SQL query to execute to generate the pandas DataFrame. 42 - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float] 43 - How many minutes before `begin` to search for data (*optional*). 44 45 begin: Union[datetime, int, str, None], default None 46 Most recent datatime to search for data. 47 If `backtrack_minutes` is provided, subtract `backtrack_minutes`. 48 49 end: Union[datetime, int, str, None], default None 50 The latest datetime to search for data. 51 If `end` is `None`, do not bound 52 53 check_existing: bool, defult True 54 If `False`, use a backtrack interval of 0 minutes. 55 56 chunksize: Optional[int], default -1 57 How many rows to load into memory at once. 58 Otherwise the entire result set is loaded into memory. 59 60 workers: Optional[int], default None 61 How many threads to use when consuming the generator. 62 Defaults to the number of cores. 63 64 debug: bool, default False 65 Verbosity toggle. 66 67 Returns 68 ------- 69 A pandas DataFrame generator. 70 """ 71 meta_def = self.get_pipe_metadef( 72 pipe, 73 begin=begin, 74 end=end, 75 check_existing=check_existing, 76 debug=debug, 77 **kw 78 ) 79 chunks = self.read( 80 meta_def, 81 chunksize=chunksize, 82 workers=workers, 83 as_iterator=True, 84 debug=debug, 85 ) 86 return chunks
Execute the SQL definition and return a Pandas DataFrame.
Parameters
- pipe (mrsm.Pipe): The pipe object which contains the fetch metadata.
  - pipe.columns['datetime'] (str): Name of the datetime column for the remote table.
  - pipe.parameters['fetch'] (Dict[str, Any]): Parameters necessary to execute a query.
  - pipe.parameters['fetch']['definition'] (str): Raw SQL query to execute to generate the pandas DataFrame.
  - pipe.parameters['fetch']['backtrack_minutes'] (Union[int, float]): How many minutes before begin to search for data (optional).
- begin (Union[datetime, int, str, None], default None): Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
- end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the query.
- check_existing (bool, default True): If False, use a backtrack interval of 0 minutes.
- chunksize (Optional[int], default -1): How many rows to load into memory at once. Otherwise the entire result set is loaded into memory.
- workers (Optional[int], default None): How many threads to use when consuming the generator. Defaults to the number of cores.
- debug (bool, default False): Verbosity toggle.
Returns
- A pandas DataFrame generator.
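A minimal sketch (the remote table src_weather and column dt are hypothetical):

import meerschaum as mrsm
from datetime import datetime

conn = mrsm.get_connector('sql:main')
pipe = mrsm.Pipe(
    'sql:main', 'weather',
    instance='sql:main',
    parameters={
        'columns': {'datetime': 'dt'},
        'fetch': {
            'definition': 'SELECT * FROM src_weather',
            'backtrack_minutes': 10,
        },
    },
)

### Returns an iterator of DataFrame chunks.
chunks = conn.fetch(pipe, begin=datetime(2024, 1, 1), end=datetime(2024, 2, 1))
for chunk in chunks:
    print(len(chunk))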
89def get_pipe_metadef( 90 self, 91 pipe: mrsm.Pipe, 92 params: Optional[Dict[str, Any]] = None, 93 begin: Union[datetime, int, str, None] = '', 94 end: Union[datetime, int, str, None] = None, 95 check_existing: bool = True, 96 debug: bool = False, 97 **kw: Any 98) -> Union[str, None]: 99 """ 100 Return a pipe's meta definition fetch query. 101 102 params: Optional[Dict[str, Any]], default None 103 Optional params dictionary to build the `WHERE` clause. 104 See `meerschaum.utils.sql.build_where`. 105 106 begin: Union[datetime, int, str, None], default None 107 Most recent datatime to search for data. 108 If `backtrack_minutes` is provided, subtract `backtrack_minutes`. 109 110 end: Union[datetime, int, str, None], default None 111 The latest datetime to search for data. 112 If `end` is `None`, do not bound 113 114 check_existing: bool, default True 115 If `True`, apply the backtrack interval. 116 117 debug: bool, default False 118 Verbosity toggle. 119 120 Returns 121 ------- 122 A pipe's meta definition fetch query string. 123 """ 124 from meerschaum.utils.warnings import warn 125 from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where 126 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 127 from meerschaum.config import get_config 128 129 dt_col = pipe.columns.get('datetime', None) 130 if not dt_col: 131 dt_col = pipe.guess_datetime() 132 dt_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None 133 is_guess = True 134 else: 135 dt_name = sql_item_name(dt_col, self.flavor, None) 136 is_guess = False 137 dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None 138 db_dt_typ = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None 139 140 if begin not in (None, '') or end is not None: 141 if is_guess: 142 if dt_col is None: 143 warn( 144 f"Unable to determine a datetime column for {pipe}." 
145 + "\n Ignoring begin and end...", 146 stack=False, 147 ) 148 begin, end = '', None 149 else: 150 warn( 151 f"A datetime wasn't specified for {pipe}.\n" 152 + f" Using column \"{dt_col}\" for datetime bounds...", 153 stack=False 154 ) 155 156 apply_backtrack = begin == '' and check_existing 157 backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug) 158 btm = ( 159 int(backtrack_interval.total_seconds() / 60) 160 if isinstance(backtrack_interval, timedelta) 161 else backtrack_interval 162 ) 163 begin = ( 164 pipe.get_sync_time(debug=debug) 165 if begin == '' 166 else begin 167 ) 168 169 if begin not in (None, '') and end is not None and begin >= end: 170 begin = None 171 172 if dt_name: 173 begin_da = ( 174 dateadd_str( 175 flavor=self.flavor, 176 datepart='minute', 177 number=((-1 * btm) if apply_backtrack else 0), 178 begin=begin, 179 db_type=db_dt_typ, 180 ) 181 if begin not in ('', None) 182 else None 183 ) 184 end_da = ( 185 dateadd_str( 186 flavor=self.flavor, 187 datepart='minute', 188 number=0, 189 begin=end, 190 db_type=db_dt_typ, 191 ) 192 if end is not None 193 else None 194 ) 195 196 definition_name = sql_item_name('definition', self.flavor, None) 197 meta_def = ( 198 _simple_fetch_query(pipe, self.flavor) if ( 199 (not (pipe.columns or {}).get('id', None)) 200 or (not get_config('system', 'experimental', 'join_fetch')) 201 ) else _join_fetch_query(pipe, self.flavor, debug=debug, **kw) 202 ) 203 204 has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):] 205 if dt_name and (begin_da or end_da): 206 definition_dt_name = f"{definition_name}.{dt_name}" 207 meta_def += "\n" + ("AND" if has_where else "WHERE") + " " 208 has_where = True 209 if begin_da: 210 meta_def += f"\n {definition_dt_name}\n >=\n {begin_da}\n" 211 if begin_da and end_da: 212 meta_def += " AND" 213 if end_da: 214 meta_def += f"\n {definition_dt_name}\n <\n {end_da}\n" 215 216 if params is not None: 217 params_where = build_where(params, self, with_where=False) 218 meta_def += "\n " + ("AND" if has_where else "WHERE") + " " 219 has_where = True 220 meta_def += params_where 221 222 return meta_def.rstrip()
Return a pipe's meta definition fetch query.
Parameters
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause. See meerschaum.utils.sql.build_where.
- begin (Union[datetime, int, str, None], default None): Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
- end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the query.
- check_existing (bool, default True): If True, apply the backtrack interval.
- debug (bool, default False): Verbosity toggle.
Returns
- A pipe's meta definition fetch query string.
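For instance, reusing `conn` and the hypothetical `pipe` from the previous sketch to inspect the bounded query without executing it:

from datetime import datetime

meta_def = conn.get_pipe_metadef(pipe, begin=datetime(2024, 1, 1))
print(meta_def)   # the fetch definition wrapped with datetime bounds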
39def cli( 40 self, 41 debug: bool = False, 42) -> SuccessTuple: 43 """ 44 Launch a subprocess for an interactive CLI. 45 """ 46 from meerschaum.utils.warnings import dprint 47 from meerschaum.utils.venv import venv_exec 48 49 ### Initialize the engine so that dependencies are resolved. 50 _ = self.engine 51 52 env = copy.deepcopy(dict(os.environ)) 53 env_key = f"MRSM_SQL_{self.label.upper()}" 54 env_val = json.dumps(self.meta) 55 env[env_key] = env_val 56 cli_code = ( 57 "import sys\n" 58 "import meerschaum as mrsm\n" 59 "import os\n" 60 f"conn = mrsm.get_connector('sql:{self.label}')\n" 61 "success, msg = conn._cli_exit()\n" 62 "mrsm.pprint((success, msg))\n" 63 "if not success:\n" 64 " raise Exception(msg)" 65 ) 66 if debug: 67 dprint(cli_code) 68 try: 69 _ = venv_exec(cli_code, venv=None, env=env, debug=debug, capture_output=False) 70 except Exception as e: 71 return False, f"[{self}] Failed to start CLI:\n{e}" 72 return True, "Success"
Launch a subprocess for an interactive CLI.
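For example (a sketch; this drops into the flavor's interactive shell and blocks until the session exits):

import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
success, msg = conn.cli()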
144def fetch_pipes_keys( 145 self, 146 connector_keys: Optional[List[str]] = None, 147 metric_keys: Optional[List[str]] = None, 148 location_keys: Optional[List[str]] = None, 149 tags: Optional[List[str]] = None, 150 params: Optional[Dict[str, Any]] = None, 151 debug: bool = False, 152) -> List[ 153 Tuple[str, str, Union[str, None], Dict[str, Any]] 154 ]: 155 """ 156 Return a list of tuples corresponding to the parameters provided. 157 158 Parameters 159 ---------- 160 connector_keys: Optional[List[str]], default None 161 List of connector_keys to search by. 162 163 metric_keys: Optional[List[str]], default None 164 List of metric_keys to search by. 165 166 location_keys: Optional[List[str]], default None 167 List of location_keys to search by. 168 169 tags: Optional[List[str]], default None 170 List of pipes to search by. 171 172 params: Optional[Dict[str, Any]], default None 173 Dictionary of additional parameters to search by. 174 E.g. `--params pipe_id:1` 175 176 debug: bool, default False 177 Verbosity toggle. 178 179 Returns 180 ------- 181 A list of tuples of pipes' keys and parameters (connector_keys, metric_key, location_key, parameters). 182 """ 183 from meerschaum.utils.packages import attempt_import 184 from meerschaum.utils.misc import separate_negation_values 185 from meerschaum.utils.sql import ( 186 OMIT_NULLSFIRST_FLAVORS, 187 table_exists, 188 json_flavors, 189 ) 190 from meerschaum._internal.static import STATIC_CONFIG 191 import json 192 from copy import deepcopy 193 sqlalchemy, sqlalchemy_sql_functions = attempt_import( 194 'sqlalchemy', 195 'sqlalchemy.sql.functions', lazy=False, 196 ) 197 coalesce = sqlalchemy_sql_functions.coalesce 198 199 if connector_keys is None: 200 connector_keys = [] 201 if metric_keys is None: 202 metric_keys = [] 203 if location_keys is None: 204 location_keys = [] 205 else: 206 location_keys = [ 207 ( 208 lk 209 if lk not in ('[None]', 'None', 'null') 210 else 'None' 211 ) 212 for lk in location_keys 213 ] 214 if tags is None: 215 tags = [] 216 217 if params is None: 218 params = {} 219 220 ### Add three primary keys to params dictionary 221 ### (separated for convenience of arguments). 222 cols = { 223 'connector_keys': [str(ck) for ck in connector_keys], 224 'metric_key': [str(mk) for mk in metric_keys], 225 'location_key': [str(lk) for lk in location_keys], 226 } 227 228 ### Make deep copy so we don't mutate this somewhere else. 229 parameters = deepcopy(params) 230 for col, vals in cols.items(): 231 if vals not in [[], ['*']]: 232 parameters[col] = vals 233 234 if not table_exists('mrsm_pipes', self, schema=self.instance_schema, debug=debug): 235 return [] 236 237 from meerschaum.connectors.sql.tables import get_tables 238 pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes'] 239 240 _params = {} 241 for k, v in parameters.items(): 242 _v = json.dumps(v) if isinstance(v, dict) else v 243 _params[k] = _v 244 245 negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] 246 ### Parse regular params. 247 ### If a param begins with '_', negate it instead. 
248 _where = [ 249 ( 250 (coalesce(pipes_tbl.c[key], 'None') == val) 251 if not str(val).startswith(negation_prefix) 252 else (pipes_tbl.c[key] != key) 253 ) for key, val in _params.items() 254 if not isinstance(val, (list, tuple)) and key in pipes_tbl.c 255 ] 256 if self.flavor in json_flavors: 257 sqlalchemy_dialects = mrsm.attempt_import('sqlalchemy.dialects', lazy=False) 258 JSONB = sqlalchemy_dialects.postgresql.JSONB 259 else: 260 JSONB = sqlalchemy.String 261 262 select_cols = ( 263 [ 264 pipes_tbl.c.connector_keys, 265 pipes_tbl.c.metric_key, 266 pipes_tbl.c.location_key, 267 pipes_tbl.c.parameters, 268 ] 269 ) 270 271 q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where)) 272 for c, vals in cols.items(): 273 if not isinstance(vals, (list, tuple)) or not vals or c not in pipes_tbl.c: 274 continue 275 _in_vals, _ex_vals = separate_negation_values(vals) 276 q = q.where(coalesce(pipes_tbl.c[c], 'None').in_(_in_vals)) if _in_vals else q 277 q = q.where(coalesce(pipes_tbl.c[c], 'None').not_in(_ex_vals)) if _ex_vals else q 278 279 ### Finally, parse tags. 280 tag_groups = [tag.split(',') for tag in tags] 281 in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups] 282 283 ors, nands = [], [] 284 if self.flavor in json_flavors: 285 tags_jsonb = pipes_tbl.c['parameters'].cast(JSONB).op('->')('tags').cast(JSONB) 286 for _in_tags, _ex_tags in in_ex_tag_groups: 287 if _in_tags: 288 ors.append( 289 sqlalchemy.and_( 290 tags_jsonb.contains(_in_tags) 291 ) 292 ) 293 for xt in _ex_tags: 294 nands.append( 295 sqlalchemy.not_( 296 sqlalchemy.and_( 297 tags_jsonb.contains([xt]) 298 ) 299 ) 300 ) 301 else: 302 for _in_tags, _ex_tags in in_ex_tag_groups: 303 sub_ands = [] 304 for nt in _in_tags: 305 sub_ands.append( 306 sqlalchemy.cast( 307 pipes_tbl.c['parameters'], 308 sqlalchemy.String, 309 ).like(f'%"tags":%"{nt}"%') 310 ) 311 if sub_ands: 312 ors.append(sqlalchemy.and_(*sub_ands)) 313 314 for xt in _ex_tags: 315 nands.append( 316 sqlalchemy.cast( 317 pipes_tbl.c['parameters'], 318 sqlalchemy.String, 319 ).not_like(f'%"tags":%"{xt}"%') 320 ) 321 322 q = q.where(sqlalchemy.and_(*nands)) if nands else q 323 q = q.where(sqlalchemy.or_(*ors)) if ors else q 324 loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key']) 325 if self.flavor not in OMIT_NULLSFIRST_FLAVORS: 326 loc_asc = sqlalchemy.nullsfirst(loc_asc) 327 q = q.order_by( 328 sqlalchemy.asc(pipes_tbl.c['connector_keys']), 329 sqlalchemy.asc(pipes_tbl.c['metric_key']), 330 loc_asc, 331 ) 332 333 ### execute the query and return a list of tuples 334 if debug: 335 dprint(q) 336 try: 337 rows = ( 338 self.execute(q).fetchall() 339 if self.flavor != 'duckdb' 340 else [ 341 (row['connector_keys'], row['metric_key'], row['location_key']) 342 for row in self.read(q).to_dict(orient='records') 343 ] 344 ) 345 except Exception as e: 346 error(str(e)) 347 348 return rows
Return a list of tuples corresponding to the parameters provided.
Parameters
- connector_keys (Optional[List[str]], default None): List of connector_keys to search by.
- metric_keys (Optional[List[str]], default None): List of metric_keys to search by.
- location_keys (Optional[List[str]], default None): List of location_keys to search by.
- tags (Optional[List[str]], default None): List of tags to search by.
- params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by, e.g. --params pipe_id:1.
- debug (bool, default False): Verbosity toggle.
Returns
- A list of tuples of pipes' keys and parameters (connector_keys, metric_key, location_key, parameters).
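A sketch (the connector keys and tag are hypothetical):

import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
keys = conn.fetch_pipes_keys(
    connector_keys=['plugin:noaa'],
    tags=['production'],
)
for connector_keys, metric_key, location_key, parameters in keys:
    print(connector_keys, metric_key, location_key)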
369def create_indices( 370 self, 371 pipe: mrsm.Pipe, 372 columns: Optional[List[str]] = None, 373 indices: Optional[List[str]] = None, 374 debug: bool = False 375) -> bool: 376 """ 377 Create a pipe's indices. 378 """ 379 if pipe.__dict__.get('_skip_check_indices', False): 380 return True 381 382 if debug: 383 dprint(f"Creating indices for {pipe}...") 384 385 if not pipe.indices: 386 warn(f"{pipe} has no index columns; skipping index creation.", stack=False) 387 return True 388 389 cols_to_include = set((columns or []) + (indices or [])) or None 390 391 pipe._clear_cache_key('_columns_indices', debug=debug) 392 ix_queries = { 393 col: queries 394 for col, queries in self.get_create_index_queries(pipe, debug=debug).items() 395 if cols_to_include is None or col in cols_to_include 396 } 397 success = True 398 for col, queries in ix_queries.items(): 399 ix_success = all(self.exec_queries(queries, debug=debug, silent=False)) 400 success = success and ix_success 401 if not ix_success: 402 warn(f"Failed to create index on column: {col}") 403 404 return success
Create a pipe's indices.
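A sketch for an existing registered pipe; drop_indices (below) is the symmetric operation:

import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
pipe = mrsm.Pipe('sql:main', 'weather', instance='sql:main')
success = conn.create_indices(pipe)   # True if all index queries succeeded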
425def drop_indices( 426 self, 427 pipe: mrsm.Pipe, 428 columns: Optional[List[str]] = None, 429 indices: Optional[List[str]] = None, 430 debug: bool = False 431) -> bool: 432 """ 433 Drop a pipe's indices. 434 """ 435 if debug: 436 dprint(f"Dropping indices for {pipe}...") 437 438 if not pipe.indices: 439 warn(f"No indices to drop for {pipe}.", stack=False) 440 return False 441 442 cols_to_include = set((columns or []) + (indices or [])) or None 443 444 ix_queries = { 445 col: queries 446 for col, queries in self.get_drop_index_queries(pipe, debug=debug).items() 447 if cols_to_include is None or col in cols_to_include 448 } 449 success = True 450 for col, queries in ix_queries.items(): 451 ix_success = all(self.exec_queries(queries, debug=debug, silent=(not debug))) 452 if not ix_success: 453 success = False 454 if debug: 455 dprint(f"Failed to drop index on column: {col}") 456 return success
Drop a pipe's indices.
512def get_create_index_queries( 513 self, 514 pipe: mrsm.Pipe, 515 debug: bool = False, 516) -> Dict[str, List[str]]: 517 """ 518 Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query. 519 520 Parameters 521 ---------- 522 pipe: mrsm.Pipe 523 The pipe to which the queries will correspond. 524 525 Returns 526 ------- 527 A dictionary of index names mapping to lists of queries. 528 """ 529 ### NOTE: Due to recent breaking changes in DuckDB, indices don't behave properly. 530 if self.flavor == 'duckdb': 531 return {} 532 from meerschaum.utils.sql import ( 533 sql_item_name, 534 get_distinct_col_count, 535 UPDATE_QUERIES, 536 get_null_replacement, 537 get_create_table_queries, 538 get_rename_table_queries, 539 COALESCE_UNIQUE_INDEX_FLAVORS, 540 ) 541 from meerschaum.utils.dtypes import are_dtypes_equal 542 from meerschaum.utils.dtypes.sql import ( 543 get_db_type_from_pd_type, 544 get_pd_type_from_db_type, 545 AUTO_INCREMENT_COLUMN_FLAVORS, 546 ) 547 from meerschaum.config import get_config 548 index_queries = {} 549 550 upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in UPDATE_QUERIES 551 static = pipe.parameters.get('static', False) 552 null_indices = pipe.parameters.get('null_indices', True) 553 index_names = pipe.get_indices() 554 unique_index_name_unquoted = index_names.get('unique', None) or f'IX_{pipe.target}_unique' 555 if upsert: 556 _ = index_names.pop('unique', None) 557 indices = pipe.indices 558 existing_cols_types = pipe.get_columns_types(debug=debug) 559 existing_cols_pd_types = { 560 col: get_pd_type_from_db_type(typ) 561 for col, typ in existing_cols_types.items() 562 } 563 existing_cols_indices = self.get_pipe_columns_indices(pipe, debug=debug) 564 existing_ix_names = set() 565 existing_primary_keys = [] 566 existing_clustered_primary_keys = [] 567 for col, col_indices in existing_cols_indices.items(): 568 for col_ix_doc in col_indices: 569 existing_ix_names.add(col_ix_doc.get('name', '').lower()) 570 if col_ix_doc.get('type', None) == 'PRIMARY KEY': 571 existing_primary_keys.append(col.lower()) 572 if col_ix_doc.get('clustered', True): 573 existing_clustered_primary_keys.append(col.lower()) 574 575 _datetime = pipe.get_columns('datetime', error=False) 576 _datetime_name = ( 577 sql_item_name(_datetime, self.flavor, None) 578 if _datetime is not None else None 579 ) 580 _datetime_index_name = ( 581 sql_item_name(index_names['datetime'], flavor=self.flavor, schema=None) 582 if index_names.get('datetime', None) 583 else None 584 ) 585 _id = pipe.get_columns('id', error=False) 586 _id_name = ( 587 sql_item_name(_id, self.flavor, None) 588 if _id is not None 589 else None 590 ) 591 primary_key = pipe.columns.get('primary', None) 592 primary_key_name = ( 593 sql_item_name(primary_key, flavor=self.flavor, schema=None) 594 if primary_key 595 else None 596 ) 597 autoincrement = ( 598 pipe.parameters.get('autoincrement', False) 599 or ( 600 primary_key is not None 601 and primary_key not in existing_cols_pd_types 602 ) 603 ) 604 primary_key_db_type = ( 605 get_db_type_from_pd_type(pipe.dtypes.get(primary_key, 'int') or 'int', self.flavor) 606 if primary_key 607 else None 608 ) 609 primary_key_constraint_name = ( 610 sql_item_name(f'PK_{pipe.target}', self.flavor, None) 611 if primary_key is not None 612 else None 613 ) 614 primary_key_clustered = "CLUSTERED" if _datetime is None else "NONCLUSTERED" 615 datetime_clustered = ( 616 "CLUSTERED" 617 if not existing_clustered_primary_keys and _datetime is not None 618 else "NONCLUSTERED" 
619 ) 620 include_columns_str = "\n ,".join( 621 [ 622 sql_item_name(col, flavor=self.flavor) for col in existing_cols_types 623 if col != _datetime 624 ] 625 ).rstrip(',') 626 include_clause = ( 627 ( 628 f"\nINCLUDE (\n {include_columns_str}\n)" 629 ) 630 if datetime_clustered == 'NONCLUSTERED' 631 else '' 632 ) 633 634 _id_index_name = ( 635 sql_item_name(index_names['id'], self.flavor, None) 636 if index_names.get('id', None) 637 else None 638 ) 639 _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 640 _create_space_partition = get_config('system', 'experimental', 'space') 641 642 ### create datetime index 643 dt_query = None 644 if _datetime is not None: 645 if ( 646 self.flavor in ('timescaledb', 'timescaledb-ha') 647 and pipe.parameters.get('hypertable', True) 648 ): 649 _id_count = ( 650 get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self) 651 if (_id is not None and _create_space_partition) else None 652 ) 653 654 chunk_interval = pipe.get_chunk_interval(debug=debug) 655 chunk_interval_minutes = ( 656 chunk_interval 657 if isinstance(chunk_interval, int) 658 else int(chunk_interval.total_seconds() / 60) 659 ) 660 chunk_time_interval = ( 661 f"INTERVAL '{chunk_interval_minutes} MINUTES'" 662 if isinstance(chunk_interval, timedelta) 663 else f'{chunk_interval_minutes}' 664 ) 665 666 dt_query = ( 667 f"SELECT public.create_hypertable('{_pipe_name}', " + 668 f"'{_datetime}', " 669 + ( 670 f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition) 671 else '' 672 ) 673 + f'chunk_time_interval => {chunk_time_interval}, ' 674 + 'if_not_exists => true, ' 675 + "migrate_data => true);" 676 ) 677 elif _datetime_index_name and _datetime != primary_key: 678 if self.flavor == 'mssql': 679 dt_query = ( 680 f"CREATE {datetime_clustered} INDEX {_datetime_index_name} " 681 f"\nON {_pipe_name} ({_datetime_name}){include_clause}" 682 ) 683 else: 684 dt_query = ( 685 f"CREATE INDEX {_datetime_index_name} " 686 + f"ON {_pipe_name} ({_datetime_name})" 687 ) 688 689 if dt_query: 690 index_queries[_datetime] = [dt_query] 691 692 primary_queries = [] 693 if ( 694 primary_key is not None 695 and primary_key.lower() not in existing_primary_keys 696 and not static 697 ): 698 if autoincrement and primary_key not in existing_cols_pd_types: 699 autoincrement_str = AUTO_INCREMENT_COLUMN_FLAVORS.get( 700 self.flavor, 701 AUTO_INCREMENT_COLUMN_FLAVORS['default'] 702 ) 703 primary_queries.extend([ 704 ( 705 f"ALTER TABLE {_pipe_name}\n" 706 f"ADD {primary_key_name} {primary_key_db_type} {autoincrement_str}" 707 ), 708 ]) 709 elif not autoincrement and primary_key in existing_cols_pd_types: 710 if self.flavor in ('sqlite', 'geopackage'): 711 new_table_name = sql_item_name( 712 f'_new_{pipe.target}', 713 self.flavor, 714 self.get_pipe_schema(pipe) 715 ) 716 select_cols_str = ', '.join( 717 [ 718 sql_item_name(col, self.flavor, None) 719 for col in existing_cols_types 720 ] 721 ) 722 primary_queries.extend( 723 get_create_table_queries( 724 existing_cols_pd_types, 725 f'_new_{pipe.target}', 726 self.flavor, 727 schema=self.get_pipe_schema(pipe), 728 primary_key=primary_key, 729 ) + [ 730 ( 731 f"INSERT INTO {new_table_name} ({select_cols_str})\n" 732 f"SELECT {select_cols_str}\nFROM {_pipe_name}" 733 ), 734 f"DROP TABLE {_pipe_name}", 735 ] + get_rename_table_queries( 736 f'_new_{pipe.target}', 737 pipe.target, 738 self.flavor, 739 schema=self.get_pipe_schema(pipe), 740 ) 741 ) 742 elif self.flavor == 'oracle': 743 primary_queries.extend([ 744 ( 745 
f"ALTER TABLE {_pipe_name}\n" 746 f"MODIFY {primary_key_name} NOT NULL" 747 ), 748 ( 749 f"ALTER TABLE {_pipe_name}\n" 750 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})" 751 ) 752 ]) 753 elif self.flavor in ('mysql', 'mariadb'): 754 primary_queries.extend([ 755 ( 756 f"ALTER TABLE {_pipe_name}\n" 757 f"MODIFY {primary_key_name} {primary_key_db_type} NOT NULL" 758 ), 759 ( 760 f"ALTER TABLE {_pipe_name}\n" 761 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})" 762 ) 763 ]) 764 elif self.flavor in ('timescaledb', 'timescaledb-ha'): 765 primary_queries.extend([ 766 ( 767 f"ALTER TABLE {_pipe_name}\n" 768 f"ALTER COLUMN {primary_key_name} SET NOT NULL" 769 ), 770 ( 771 f"ALTER TABLE {_pipe_name}\n" 772 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY (" + ( 773 f"{_datetime_name}, " if _datetime_name else "" 774 ) + f"{primary_key_name})" 775 ), 776 ]) 777 elif self.flavor in ('citus', 'postgresql', 'duckdb', 'postgis'): 778 primary_queries.extend([ 779 ( 780 f"ALTER TABLE {_pipe_name}\n" 781 f"ALTER COLUMN {primary_key_name} SET NOT NULL" 782 ), 783 ( 784 f"ALTER TABLE {_pipe_name}\n" 785 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})" 786 ), 787 ]) 788 else: 789 primary_queries.extend([ 790 ( 791 f"ALTER TABLE {_pipe_name}\n" 792 f"ALTER COLUMN {primary_key_name} {primary_key_db_type} NOT NULL" 793 ), 794 ( 795 f"ALTER TABLE {_pipe_name}\n" 796 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY {primary_key_clustered} ({primary_key_name})" 797 ), 798 ]) 799 index_queries[primary_key] = primary_queries 800 801 ### create id index 802 if _id_name is not None: 803 if self.flavor in ('timescaledb', 'timescaledb-ha'): 804 ### Already created indices via create_hypertable. 805 id_query = ( 806 None if (_id is not None and _create_space_partition) 807 else ( 808 f"CREATE INDEX IF NOT EXISTS {_id_index_name} ON {_pipe_name} ({_id_name})" 809 if _id is not None 810 else None 811 ) 812 ) 813 pass 814 else: ### mssql, sqlite, etc. 815 id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})" 816 817 if id_query is not None: 818 index_queries[_id] = id_query if isinstance(id_query, list) else [id_query] 819 820 ### Create indices for other labels in `pipe.columns`. 
821 other_index_names = { 822 ix_key: ix_unquoted 823 for ix_key, ix_unquoted in index_names.items() 824 if ( 825 ix_key not in ('datetime', 'id', 'primary') 826 and ix_unquoted.lower() not in existing_ix_names 827 ) 828 } 829 for ix_key, ix_unquoted in other_index_names.items(): 830 ix_name = sql_item_name(ix_unquoted, self.flavor, None) 831 cols = indices[ix_key] 832 if not isinstance(cols, (list, tuple)): 833 cols = [cols] 834 if ix_key == 'unique' and upsert: 835 continue 836 cols_names = [sql_item_name(col, self.flavor, None) for col in cols if col] 837 if not cols_names: 838 continue 839 840 cols_names_str = ", ".join(cols_names) 841 index_query_params_clause = f" ({cols_names_str})" 842 if self.flavor in ('postgis', 'timescaledb-ha'): 843 for col in cols: 844 col_typ = existing_cols_pd_types.get(cols[0], 'object') 845 if col_typ != 'object' and are_dtypes_equal(col_typ, 'geometry'): 846 index_query_params_clause = f" USING GIST ({cols_names_str})" 847 break 848 849 index_queries[ix_key] = [ 850 f"CREATE INDEX {ix_name} ON {_pipe_name}{index_query_params_clause}" 851 ] 852 853 indices_cols_str = ', '.join( 854 list({ 855 sql_item_name(ix, self.flavor) 856 for ix_key, ix in pipe.columns.items() 857 if ix and ix in existing_cols_types 858 }) 859 ) 860 coalesce_indices_cols_str = ', '.join( 861 [ 862 ( 863 ( 864 "COALESCE(" 865 + sql_item_name(ix, self.flavor) 866 + ", " 867 + get_null_replacement(existing_cols_types[ix], self.flavor) 868 + ") " 869 ) 870 if ix_key != 'datetime' and null_indices 871 else sql_item_name(ix, self.flavor) 872 ) 873 for ix_key, ix in pipe.columns.items() 874 if ix and ix in existing_cols_types 875 ] 876 ) 877 unique_index_name = sql_item_name(unique_index_name_unquoted, self.flavor) 878 constraint_name_unquoted = unique_index_name_unquoted.replace('IX_', 'UQ_') 879 constraint_name = sql_item_name(constraint_name_unquoted, self.flavor) 880 add_constraint_query = ( 881 f"ALTER TABLE {_pipe_name} ADD CONSTRAINT {constraint_name} UNIQUE ({indices_cols_str})" 882 ) 883 unique_index_cols_str = ( 884 indices_cols_str 885 if self.flavor not in COALESCE_UNIQUE_INDEX_FLAVORS or not null_indices 886 else coalesce_indices_cols_str 887 ) 888 create_unique_index_query = ( 889 f"CREATE UNIQUE INDEX {unique_index_name} ON {_pipe_name} ({unique_index_cols_str})" 890 ) 891 constraint_queries = [create_unique_index_query] 892 if self.flavor not in ('sqlite', 'geopackage'): 893 constraint_queries.append(add_constraint_query) 894 if upsert and indices_cols_str: 895 index_queries[unique_index_name] = constraint_queries 896 return index_queries
Return a dictionary mapping columns to a CREATE INDEX or equivalent query.
Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
- A dictionary of index names mapping to lists of queries.
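For example, reusing `conn` and `pipe` from the sketches above to inspect (and optionally execute) the generated queries:

queries = conn.get_create_index_queries(pipe)
for ix_name, ix_queries in queries.items():
    print(ix_name, ix_queries)

### Flatten and run in one transaction if desired:
_ = conn.exec_queries([q for qs in queries.values() for q in qs])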
899def get_drop_index_queries( 900 self, 901 pipe: mrsm.Pipe, 902 debug: bool = False, 903) -> Dict[str, List[str]]: 904 """ 905 Return a dictionary mapping columns to a `DROP INDEX` or equivalent query. 906 907 Parameters 908 ---------- 909 pipe: mrsm.Pipe 910 The pipe to which the queries will correspond. 911 912 Returns 913 ------- 914 A dictionary of column names mapping to lists of queries. 915 """ 916 ### NOTE: Due to breaking changes within DuckDB, indices must be skipped. 917 if self.flavor == 'duckdb': 918 return {} 919 if not pipe.exists(debug=debug): 920 return {} 921 922 from collections import defaultdict 923 from meerschaum.utils.sql import ( 924 sql_item_name, 925 table_exists, 926 hypertable_queries, 927 DROP_INDEX_IF_EXISTS_FLAVORS, 928 ) 929 drop_queries = defaultdict(lambda: []) 930 schema = self.get_pipe_schema(pipe) 931 index_schema = schema if self.flavor != 'mssql' else None 932 indices = { 933 ix_key: ix 934 for ix_key, ix in pipe.get_indices().items() 935 } 936 cols_indices = pipe.get_columns_indices(debug=debug) 937 existing_indices = set() 938 clustered_ix = None 939 for col, ix_metas in cols_indices.items(): 940 for ix_meta in ix_metas: 941 ix_name = ix_meta.get('name', None) 942 if ix_meta.get('clustered', False): 943 clustered_ix = ix_name 944 existing_indices.add(ix_name.lower()) 945 pipe_name = sql_item_name(pipe.target, self.flavor, schema) 946 pipe_name_no_schema = sql_item_name(pipe.target, self.flavor, None) 947 upsert = pipe.upsert 948 949 if self.flavor not in hypertable_queries: 950 is_hypertable = False 951 else: 952 is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name) 953 is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None 954 955 if_exists_str = "IF EXISTS " if self.flavor in DROP_INDEX_IF_EXISTS_FLAVORS else "" 956 if is_hypertable: 957 nuke_queries = [] 958 temp_table = '_' + pipe.target + '_temp_migration' 959 temp_table_name = sql_item_name(temp_table, self.flavor, self.get_pipe_schema(pipe)) 960 961 if table_exists(temp_table, self, schema=self.get_pipe_schema(pipe), debug=debug): 962 nuke_queries.append(f"DROP TABLE {if_exists_str} {temp_table_name}") 963 nuke_queries += [ 964 f"SELECT * INTO {temp_table_name} FROM {pipe_name}", 965 f"DROP TABLE {if_exists_str}{pipe_name}", 966 f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name_no_schema}", 967 ] 968 nuke_ix_keys = ('datetime', 'id') 969 nuked = False 970 for ix_key in nuke_ix_keys: 971 if ix_key in indices and not nuked: 972 drop_queries[ix_key].extend(nuke_queries) 973 nuked = True 974 975 for ix_key, ix_unquoted in indices.items(): 976 if ix_key in drop_queries: 977 continue 978 if ix_unquoted.lower() not in existing_indices: 979 continue 980 981 if ( 982 ix_key == 'unique' 983 and upsert 984 and self.flavor not in ('sqlite', 'geopackage') 985 and not is_hypertable 986 ): 987 constraint_name_unquoted = ix_unquoted.replace('IX_', 'UQ_') 988 constraint_name = sql_item_name(constraint_name_unquoted, self.flavor) 989 constraint_or_index = ( 990 "CONSTRAINT" 991 if self.flavor not in ('mysql', 'mariadb') 992 else 'INDEX' 993 ) 994 drop_queries[ix_key].append( 995 f"ALTER TABLE {pipe_name}\n" 996 f"DROP {constraint_or_index} {constraint_name}" 997 ) 998 999 query = ( 1000 ( 1001 f"ALTER TABLE {pipe_name}\n" 1002 if self.flavor in ('mysql', 'mariadb') 1003 else '' 1004 ) 1005 + f"DROP INDEX {if_exists_str}" 1006 + sql_item_name(ix_unquoted, self.flavor, index_schema) 1007 ) 1008 if self.flavor == 'mssql': 1009 query += f"\nON 
{pipe_name}" 1010 if ix_unquoted == clustered_ix: 1011 query += "\nWITH (ONLINE = ON, MAXDOP = 4)" 1012 drop_queries[ix_key].append(query) 1013 1014 1015 return drop_queries
Return a dictionary mapping columns to a DROP INDEX or equivalent query.
Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
- A dictionary of column names mapping to lists of queries.
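A sketch, again reusing `conn` and `pipe` from above:

drop_queries = conn.get_drop_index_queries(pipe)
_ = conn.exec_queries(
    [q for qs in drop_queries.values() for q in qs],
    break_on_error=True,
)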
3158def get_add_columns_queries( 3159 self, 3160 pipe: mrsm.Pipe, 3161 df: Union[pd.DataFrame, Dict[str, str]], 3162 _is_db_types: bool = False, 3163 debug: bool = False, 3164) -> List[str]: 3165 """ 3166 Add new null columns of the correct type to a table from a dataframe. 3167 3168 Parameters 3169 ---------- 3170 pipe: mrsm.Pipe 3171 The pipe to be altered. 3172 3173 df: Union[pd.DataFrame, Dict[str, str]] 3174 The pandas DataFrame which contains new columns. 3175 If a dictionary is provided, assume it maps columns to Pandas data types. 3176 3177 _is_db_types: bool, default False 3178 If `True`, assume `df` is a dictionary mapping columns to SQL native dtypes. 3179 3180 Returns 3181 ------- 3182 A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector. 3183 """ 3184 if not pipe.exists(debug=debug): 3185 return [] 3186 3187 if pipe.parameters.get('static', False): 3188 return [] 3189 3190 from decimal import Decimal 3191 import copy 3192 from meerschaum.utils.sql import ( 3193 sql_item_name, 3194 SINGLE_ALTER_TABLE_FLAVORS, 3195 get_table_cols_types, 3196 ) 3197 from meerschaum.utils.dtypes.sql import ( 3198 get_pd_type_from_db_type, 3199 get_db_type_from_pd_type, 3200 ) 3201 from meerschaum.utils.misc import flatten_list 3202 is_dask = 'dask' in df.__module__ if not isinstance(df, dict) else False 3203 if is_dask: 3204 df = df.partitions[0].compute() 3205 df_cols_types = ( 3206 { 3207 col: str(typ) 3208 for col, typ in df.dtypes.items() 3209 } 3210 if not isinstance(df, dict) 3211 else copy.deepcopy(df) 3212 ) 3213 if not isinstance(df, dict) and len(df.index) > 0: 3214 for col, typ in list(df_cols_types.items()): 3215 if typ != 'object': 3216 continue 3217 val = df.iloc[0][col] 3218 if isinstance(val, (dict, list)): 3219 df_cols_types[col] = 'json' 3220 elif isinstance(val, Decimal): 3221 df_cols_types[col] = 'numeric' 3222 elif isinstance(val, str): 3223 df_cols_types[col] = 'str' 3224 db_cols_types = { 3225 col: get_pd_type_from_db_type(typ) 3226 for col, typ in get_table_cols_types( 3227 pipe.target, 3228 self, 3229 schema=self.get_pipe_schema(pipe), 3230 debug=debug, 3231 ).items() 3232 } 3233 new_cols = set(df_cols_types) - set(db_cols_types) 3234 if not new_cols: 3235 return [] 3236 3237 new_cols_types = { 3238 col: get_db_type_from_pd_type( 3239 df_cols_types[col], 3240 self.flavor 3241 ) 3242 for col in new_cols 3243 if col and df_cols_types.get(col, None) 3244 } 3245 3246 alter_table_query = "ALTER TABLE " + sql_item_name( 3247 pipe.target, self.flavor, self.get_pipe_schema(pipe) 3248 ) 3249 queries = [] 3250 for col, typ in new_cols_types.items(): 3251 add_col_query = ( 3252 "\nADD " 3253 + sql_item_name(col, self.flavor, None) 3254 + " " + typ + "," 3255 ) 3256 3257 if self.flavor in SINGLE_ALTER_TABLE_FLAVORS: 3258 queries.append(alter_table_query + add_col_query[:-1]) 3259 else: 3260 alter_table_query += add_col_query 3261 3262 ### For most flavors, only one query is required. 3263 ### This covers SQLite which requires one query per column. 3264 if not queries: 3265 queries.append(alter_table_query[:-1]) 3266 3267 if self.flavor != 'duckdb': 3268 return queries 3269 3270 ### NOTE: For DuckDB, we must drop and rebuild the indices. 
3271 drop_index_queries = list(flatten_list( 3272 [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()] 3273 )) 3274 create_index_queries = list(flatten_list( 3275 [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()] 3276 )) 3277 3278 return drop_index_queries + queries + create_index_queries
Add new null columns of the correct type to a table from a dataframe.
Parameters
- pipe (mrsm.Pipe): The pipe to be altered.
- df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.
- _is_db_types (bool, default False): If True, assume df is a dictionary mapping columns to SQL native dtypes.
Returns
- A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.
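A sketch, passing a dictionary of hypothetical new columns mapped to Pandas dtypes (reusing `conn` and `pipe` from above):

add_queries = conn.get_add_columns_queries(pipe, {'humidity': 'float64'})
_ = conn.exec_queries(add_queries)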
3281def get_alter_columns_queries( 3282 self, 3283 pipe: mrsm.Pipe, 3284 df: Union[pd.DataFrame, Dict[str, str]], 3285 debug: bool = False, 3286) -> List[str]: 3287 """ 3288 If we encounter a column of a different type, set the entire column to text. 3289 If the altered columns are numeric, alter to numeric instead. 3290 3291 Parameters 3292 ---------- 3293 pipe: mrsm.Pipe 3294 The pipe to be altered. 3295 3296 df: Union[pd.DataFrame, Dict[str, str]] 3297 The pandas DataFrame which may contain altered columns. 3298 If a dict is provided, assume it maps columns to Pandas data types. 3299 3300 Returns 3301 ------- 3302 A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector. 3303 """ 3304 if not pipe.exists(debug=debug) or pipe.static: 3305 return [] 3306 3307 from meerschaum.utils.sql import ( 3308 sql_item_name, 3309 get_table_cols_types, 3310 DROP_IF_EXISTS_FLAVORS, 3311 SINGLE_ALTER_TABLE_FLAVORS, 3312 ) 3313 from meerschaum.utils.dataframe import get_numeric_cols 3314 from meerschaum.utils.dtypes import are_dtypes_equal 3315 from meerschaum.utils.dtypes.sql import ( 3316 get_pd_type_from_db_type, 3317 get_db_type_from_pd_type, 3318 ) 3319 from meerschaum.utils.misc import flatten_list, generate_password, items_str 3320 target = pipe.target 3321 session_id = generate_password(3) 3322 numeric_cols = ( 3323 get_numeric_cols(df) 3324 if not isinstance(df, dict) 3325 else [ 3326 col 3327 for col, typ in df.items() 3328 if typ.startswith('numeric') 3329 ] 3330 ) 3331 df_cols_types = ( 3332 { 3333 col: str(typ) 3334 for col, typ in df.dtypes.items() 3335 } 3336 if not isinstance(df, dict) 3337 else df 3338 ) 3339 db_cols_types = { 3340 col: get_pd_type_from_db_type(typ) 3341 for col, typ in get_table_cols_types( 3342 pipe.target, 3343 self, 3344 schema=self.get_pipe_schema(pipe), 3345 debug=debug, 3346 ).items() 3347 } 3348 pipe_dtypes = pipe.get_dtypes(debug=debug) 3349 pipe_bool_cols = [col for col, typ in pipe_dtypes.items() if are_dtypes_equal(str(typ), 'bool')] 3350 pd_db_df_aliases = { 3351 'int': 'bool', 3352 'float': 'bool', 3353 'numeric': 'bool', 3354 'guid': 'object', 3355 } 3356 if self.flavor == 'oracle': 3357 pd_db_df_aliases.update({ 3358 'int': 'numeric', 3359 'date': 'datetime', 3360 'numeric': 'int', 3361 }) 3362 3363 altered_cols = { 3364 col: (db_cols_types.get(col, 'object'), typ) 3365 for col, typ in df_cols_types.items() 3366 if not are_dtypes_equal(typ, db_cols_types.get(col, 'object').lower()) 3367 and not are_dtypes_equal(db_cols_types.get(col, 'object'), 'string') 3368 } 3369 3370 if debug and altered_cols: 3371 dprint("Columns to be altered:") 3372 mrsm.pprint(altered_cols) 3373 3374 ### NOTE: Special columns (numerics, bools, etc.) are captured and cached upon detection. 3375 new_special_cols = pipe._get_cached_value('new_special_cols', debug=debug) or {} 3376 new_special_db_cols_types = { 3377 col: (db_cols_types.get(col, 'object'), typ) 3378 for col, typ in new_special_cols.items() 3379 } 3380 if debug: 3381 dprint("Cached new special columns:") 3382 mrsm.pprint(new_special_cols) 3383 dprint("New special columns db types:") 3384 mrsm.pprint(new_special_db_cols_types) 3385 3386 altered_cols.update(new_special_db_cols_types) 3387 3388 ### NOTE: Sometimes bools are coerced into ints or floats. 
3389 altered_cols_to_ignore = set() 3390 for col, (db_typ, df_typ) in altered_cols.items(): 3391 for db_alias, df_alias in pd_db_df_aliases.items(): 3392 if ( 3393 db_alias in db_typ.lower() 3394 and df_alias in df_typ.lower() 3395 and col not in new_special_cols 3396 ): 3397 altered_cols_to_ignore.add(col) 3398 3399 ### Oracle's bool handling sometimes mixes NUMBER and INT. 3400 for bool_col in pipe_bool_cols: 3401 if bool_col not in altered_cols: 3402 continue 3403 db_is_bool_compatible = ( 3404 are_dtypes_equal('int', altered_cols[bool_col][0]) 3405 or are_dtypes_equal('float', altered_cols[bool_col][0]) 3406 or are_dtypes_equal('numeric', altered_cols[bool_col][0]) 3407 or are_dtypes_equal('bool', altered_cols[bool_col][0]) 3408 ) 3409 df_is_bool_compatible = ( 3410 are_dtypes_equal('int', altered_cols[bool_col][1]) 3411 or are_dtypes_equal('float', altered_cols[bool_col][1]) 3412 or are_dtypes_equal('numeric', altered_cols[bool_col][1]) 3413 or are_dtypes_equal('bool', altered_cols[bool_col][1]) 3414 ) 3415 if db_is_bool_compatible and df_is_bool_compatible: 3416 altered_cols_to_ignore.add(bool_col) 3417 3418 if debug and altered_cols_to_ignore: 3419 dprint("Ignoring the following altered columns (false positives).") 3420 mrsm.pprint(altered_cols_to_ignore) 3421 3422 for col in altered_cols_to_ignore: 3423 _ = altered_cols.pop(col, None) 3424 3425 if not altered_cols: 3426 return [] 3427 3428 if numeric_cols: 3429 explicit_pipe_dtypes = pipe.get_dtypes(infer=False, debug=debug) 3430 explicit_pipe_dtypes.update({col: 'numeric' for col in numeric_cols}) 3431 pipe.dtypes = explicit_pipe_dtypes 3432 if not pipe.temporary: 3433 edit_success, edit_msg = pipe.edit(debug=debug) 3434 if not edit_success: 3435 warn( 3436 f"Failed to update dtypes for numeric columns {items_str(numeric_cols)}:\n" 3437 + f"{edit_msg}" 3438 ) 3439 else: 3440 numeric_cols.extend([col for col, typ in pipe_dtypes.items() if typ.startswith('numeric')]) 3441 3442 numeric_type = get_db_type_from_pd_type('numeric', self.flavor, as_sqlalchemy=False) 3443 text_type = get_db_type_from_pd_type('str', self.flavor, as_sqlalchemy=False) 3444 altered_cols_types = { 3445 col: ( 3446 numeric_type 3447 if col in numeric_cols 3448 else text_type 3449 ) 3450 for col, (db_typ, typ) in altered_cols.items() 3451 } 3452 3453 if self.flavor in ('sqlite', 'geopackage'): 3454 temp_table_name = '-' + session_id + '_' + target 3455 rename_query = ( 3456 "ALTER TABLE " 3457 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3458 + " RENAME TO " 3459 + sql_item_name(temp_table_name, self.flavor, None) 3460 ) 3461 create_query = ( 3462 "CREATE TABLE " 3463 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3464 + " (\n" 3465 ) 3466 for col_name, col_typ in db_cols_types.items(): 3467 create_query += ( 3468 sql_item_name(col_name, self.flavor, None) 3469 + " " 3470 + ( 3471 col_typ 3472 if col_name not in altered_cols 3473 else altered_cols_types[col_name] 3474 ) 3475 + ",\n" 3476 ) 3477 create_query = create_query[:-2] + "\n)" 3478 3479 insert_query = ( 3480 "INSERT INTO " 3481 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3482 + ' (' 3483 + ', '.join([ 3484 sql_item_name(col_name, self.flavor, None) 3485 for col_name in db_cols_types 3486 ]) 3487 + ')' 3488 + "\nSELECT\n" 3489 ) 3490 for col_name in db_cols_types: 3491 new_col_str = ( 3492 sql_item_name(col_name, self.flavor, None) 3493 if col_name not in altered_cols 3494 else ( 3495 "CAST(" 3496 + sql_item_name(col_name, self.flavor, None) 3497 + 
" AS " 3498 + altered_cols_types[col_name] 3499 + ")" 3500 ) 3501 ) 3502 insert_query += new_col_str + ",\n" 3503 3504 insert_query = insert_query[:-2] + ( 3505 f"\nFROM {sql_item_name(temp_table_name, self.flavor, self.get_pipe_schema(pipe))}" 3506 ) 3507 3508 if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else "" 3509 3510 drop_query = f"DROP TABLE {if_exists_str}" + sql_item_name( 3511 temp_table_name, self.flavor, self.get_pipe_schema(pipe) 3512 ) 3513 return [ 3514 rename_query, 3515 create_query, 3516 insert_query, 3517 drop_query, 3518 ] 3519 3520 queries = [] 3521 if self.flavor == 'oracle': 3522 for col, typ in altered_cols_types.items(): 3523 add_query = ( 3524 "ALTER TABLE " 3525 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3526 + "\nADD " + sql_item_name(col + '_temp', self.flavor, None) 3527 + " " + typ 3528 ) 3529 queries.append(add_query) 3530 3531 for col, typ in altered_cols_types.items(): 3532 populate_temp_query = ( 3533 "UPDATE " 3534 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3535 + "\nSET " + sql_item_name(col + '_temp', self.flavor, None) 3536 + ' = ' + sql_item_name(col, self.flavor, None) 3537 ) 3538 queries.append(populate_temp_query) 3539 3540 for col, typ in altered_cols_types.items(): 3541 set_old_cols_to_null_query = ( 3542 "UPDATE " 3543 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3544 + "\nSET " + sql_item_name(col, self.flavor, None) 3545 + ' = NULL' 3546 ) 3547 queries.append(set_old_cols_to_null_query) 3548 3549 for col, typ in altered_cols_types.items(): 3550 alter_type_query = ( 3551 "ALTER TABLE " 3552 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3553 + "\nMODIFY " + sql_item_name(col, self.flavor, None) + ' ' 3554 + typ 3555 ) 3556 queries.append(alter_type_query) 3557 3558 for col, typ in altered_cols_types.items(): 3559 set_old_to_temp_query = ( 3560 "UPDATE " 3561 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3562 + "\nSET " + sql_item_name(col, self.flavor, None) 3563 + ' = ' + sql_item_name(col + '_temp', self.flavor, None) 3564 ) 3565 queries.append(set_old_to_temp_query) 3566 3567 for col, typ in altered_cols_types.items(): 3568 drop_temp_query = ( 3569 "ALTER TABLE " 3570 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3571 + "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor, None) 3572 ) 3573 queries.append(drop_temp_query) 3574 3575 return queries 3576 3577 query = "ALTER TABLE " + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3578 for col, typ in altered_cols_types.items(): 3579 alter_col_prefix = ( 3580 'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle') 3581 else 'MODIFY' 3582 ) 3583 type_prefix = ( 3584 '' if self.flavor in ('mssql', 'mariadb', 'mysql') 3585 else 'TYPE ' 3586 ) 3587 column_str = 'COLUMN' if self.flavor != 'oracle' else '' 3588 query_suffix = ( 3589 f"\n{alter_col_prefix} {column_str} " 3590 + sql_item_name(col, self.flavor, None) 3591 + " " + type_prefix + typ + "," 3592 ) 3593 if self.flavor not in SINGLE_ALTER_TABLE_FLAVORS: 3594 query += query_suffix 3595 else: 3596 queries.append(query + query_suffix[:-1]) 3597 3598 if self.flavor not in SINGLE_ALTER_TABLE_FLAVORS: 3599 queries.append(query[:-1]) 3600 3601 if self.flavor != 'duckdb': 3602 return queries 3603 3604 drop_index_queries = list(flatten_list( 3605 [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()] 3606 )) 3607 create_index_queries = list(flatten_list( 3608 [q for 
ix, q in self.get_create_index_queries(pipe, debug=debug).items()] 3609 )) 3610 3611 return drop_index_queries + queries + create_index_queries
If a column arrives with a different type than its existing column, alter the entire column to text; if the conflicting types are all numeric, alter to a numeric type instead.
Parameters
- pipe (mrsm.Pipe): The pipe to be altered.
- df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which may contain altered columns. If a dict is provided, assume it maps columns to Pandas data types.
Returns
- A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
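For illustration, a minimal sketch of generating and applying these queries by hand (the connector label, pipe keys, and column names are hypothetical, and the pipe's target table is assumed to already exist; `sync_pipe` normally calls this for you):

import pandas as pd
import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
pipe = mrsm.Pipe('plugin:stocks', 'price', instance='sql:main')

# A frame whose 'price' column arrives as text rather than the table's numeric type.
df = pd.DataFrame({
    'datetime': pd.to_datetime(['2024-01-01']),
    'price': ['101.5'],
})

queries = conn.get_alter_columns_queries(pipe, df)
if queries and not conn.exec_queries(queries, debug=True):
    print(f"Failed to alter columns for {pipe}.")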
1018def delete_pipe( 1019 self, 1020 pipe: mrsm.Pipe, 1021 debug: bool = False, 1022) -> SuccessTuple: 1023 """ 1024 Delete a Pipe's registration. 1025 """ 1026 from meerschaum.utils.packages import attempt_import 1027 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 1028 1029 if not pipe.id: 1030 return False, f"{pipe} is not registered." 1031 1032 ### ensure pipes table exists 1033 from meerschaum.connectors.sql.tables import get_tables 1034 pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] 1035 1036 q = sqlalchemy.delete(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id) 1037 if not self.exec(q, debug=debug): 1038 return False, f"Failed to delete registration for {pipe}." 1039 1040 return True, "Success"
Delete a Pipe's registration.
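A short usage sketch (hypothetical keys; assumes the pipe is registered on a `sql:main` instance):

import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')

# Remove the registration row from the 'pipes' table; the pipe's target table is unaffected.
success, msg = conn.delete_pipe(pipe)
print(success, msg)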
1043def get_pipe_data( 1044 self, 1045 pipe: mrsm.Pipe, 1046 select_columns: Optional[List[str]] = None, 1047 omit_columns: Optional[List[str]] = None, 1048 begin: Union[datetime, str, None] = None, 1049 end: Union[datetime, str, None] = None, 1050 params: Optional[Dict[str, Any]] = None, 1051 order: str = 'asc', 1052 limit: Optional[int] = None, 1053 begin_add_minutes: int = 0, 1054 end_add_minutes: int = 0, 1055 chunksize: Optional[int] = -1, 1056 as_iterator: bool = False, 1057 debug: bool = False, 1058 **kw: Any 1059) -> Union[pd.DataFrame, None]: 1060 """ 1061 Access a pipe's data from the SQL instance. 1062 1063 Parameters 1064 ---------- 1065 pipe: mrsm.Pipe: 1066 The pipe to get data from. 1067 1068 select_columns: Optional[List[str]], default None 1069 If provided, only select these given columns. 1070 Otherwise select all available columns (i.e. `SELECT *`). 1071 1072 omit_columns: Optional[List[str]], default None 1073 If provided, remove these columns from the selection. 1074 1075 begin: Union[datetime, str, None], default None 1076 If provided, get rows newer than or equal to this value. 1077 1078 end: Union[datetime, str, None], default None 1079 If provided, get rows older than or equal to this value. 1080 1081 params: Optional[Dict[str, Any]], default None 1082 Additional parameters to filter by. 1083 See `meerschaum.connectors.sql.build_where`. 1084 1085 order: Optional[str], default 'asc' 1086 The selection order for all of the indices in the query. 1087 If `None`, omit the `ORDER BY` clause. 1088 1089 limit: Optional[int], default None 1090 If specified, limit the number of rows retrieved to this value. 1091 1092 begin_add_minutes: int, default 0 1093 The number of minutes to add to the `begin` datetime (i.e. `DATEADD`). 1094 1095 end_add_minutes: int, default 0 1096 The number of minutes to add to the `end` datetime (i.e. `DATEADD`). 1097 1098 chunksize: Optional[int], default -1 1099 The size of dataframe chunks to load into memory. 1100 1101 as_iterator: bool, default False 1102 If `True`, return the chunks iterator directly. 1103 1104 debug: bool, default False 1105 Verbosity toggle. 1106 1107 Returns 1108 ------- 1109 A `pd.DataFrame` of the pipe's data. 
1110 1111 """ 1112 import functools 1113 from meerschaum.utils.packages import import_pandas 1114 from meerschaum.utils.dtypes import to_pandas_dtype, are_dtypes_equal 1115 from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type 1116 pd = import_pandas() 1117 is_dask = 'dask' in pd.__name__ 1118 1119 cols_types = pipe.get_columns_types(debug=debug) if pipe.enforce else {} 1120 pipe_dtypes = pipe.get_dtypes(infer=False, debug=debug) if pipe.enforce else {} 1121 1122 remote_pandas_types = { 1123 col: to_pandas_dtype(get_pd_type_from_db_type(typ)) 1124 for col, typ in cols_types.items() 1125 } 1126 remote_dt_cols_types = { 1127 col: typ 1128 for col, typ in remote_pandas_types.items() 1129 if are_dtypes_equal(typ, 'datetime') 1130 } 1131 remote_dt_tz_aware_cols_types = { 1132 col: typ 1133 for col, typ in remote_dt_cols_types.items() 1134 if ',' in typ or typ == 'datetime' 1135 } 1136 remote_dt_tz_naive_cols_types = { 1137 col: typ 1138 for col, typ in remote_dt_cols_types.items() 1139 if col not in remote_dt_tz_aware_cols_types 1140 } 1141 1142 configured_pandas_types = { 1143 col: to_pandas_dtype(typ) 1144 for col, typ in pipe_dtypes.items() 1145 } 1146 configured_lower_precision_dt_cols_types = { 1147 col: typ 1148 for col, typ in pipe_dtypes.items() 1149 if ( 1150 are_dtypes_equal('datetime', typ) 1151 and '[' in typ 1152 and 'ns' not in typ 1153 ) 1154 1155 } 1156 1157 dtypes = { 1158 **remote_pandas_types, 1159 **configured_pandas_types, 1160 **remote_dt_tz_aware_cols_types, 1161 **remote_dt_tz_naive_cols_types, 1162 **configured_lower_precision_dt_cols_types 1163 } if pipe.enforce else {} 1164 1165 existing_cols = cols_types.keys() 1166 select_columns = ( 1167 [ 1168 col 1169 for col in existing_cols 1170 if col not in (omit_columns or []) 1171 ] 1172 if not select_columns 1173 else [ 1174 col 1175 for col in select_columns 1176 if col in existing_cols 1177 and col not in (omit_columns or []) 1178 ] 1179 ) if pipe.enforce else select_columns 1180 1181 if select_columns: 1182 dtypes = {col: typ for col, typ in dtypes.items() if col in select_columns} 1183 1184 dtypes = { 1185 col: typ 1186 for col, typ in dtypes.items() 1187 if col in (select_columns or [col]) and col not in (omit_columns or []) 1188 } if pipe.enforce else {} 1189 1190 if debug: 1191 dprint(f"[{self}] `read()` dtypes:") 1192 mrsm.pprint(dtypes) 1193 1194 query = self.get_pipe_data_query( 1195 pipe, 1196 select_columns=select_columns, 1197 omit_columns=omit_columns, 1198 begin=begin, 1199 end=end, 1200 params=params, 1201 order=order, 1202 limit=limit, 1203 begin_add_minutes=begin_add_minutes, 1204 end_add_minutes=end_add_minutes, 1205 debug=debug, 1206 **kw 1207 ) 1208 1209 read_kwargs = {} 1210 if is_dask: 1211 index_col = pipe.columns.get('datetime', None) 1212 read_kwargs['index_col'] = index_col 1213 1214 chunks = self.read( 1215 query, 1216 chunksize=chunksize, 1217 as_iterator=True, 1218 coerce_float=False, 1219 dtype=dtypes, 1220 debug=debug, 1221 **read_kwargs 1222 ) 1223 1224 if as_iterator: 1225 return chunks 1226 1227 return pd.concat(chunks)
Access a pipe's data from the SQL instance.
Parameters
- pipe (mrsm.Pipe): The pipe to get data from.
- select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. `SELECT *`).
- omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, str, None], default None): If provided, get rows newer than or equal to this value.
- end (Union[datetime, str, None], default None): If provided, get rows older than or equal to this value.
- params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See `meerschaum.connectors.sql.build_where`.
- order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If `None`, omit the `ORDER BY` clause.
- limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
- begin_add_minutes (int, default 0): The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
- end_add_minutes (int, default 0): The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
- chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory.
- as_iterator (bool, default False): If `True`, return the chunks iterator directly.
- debug (bool, default False): Verbosity toggle.
Returns
- A `pd.DataFrame` of the pipe's data.
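As a minimal sketch (hypothetical keys; assumes a `sql:main` instance whose pipe already holds data):

from datetime import datetime, timezone
import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')

# Pull a bounded window of two columns, newest rows first.
df = conn.get_pipe_data(
    pipe,
    select_columns=['timestamp', 'temperature'],
    begin=datetime(2024, 1, 1, tzinfo=timezone.utc),
    end=datetime(2024, 2, 1, tzinfo=timezone.utc),
    order='desc',
    limit=100,
)

# Or stream chunks to bound memory usage.
chunks = conn.get_pipe_data(pipe, chunksize=10_000, as_iterator=True)
for chunk in chunks:
    print(len(chunk))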
1230def get_pipe_data_query( 1231 self, 1232 pipe: mrsm.Pipe, 1233 select_columns: Optional[List[str]] = None, 1234 omit_columns: Optional[List[str]] = None, 1235 begin: Union[datetime, int, str, None] = None, 1236 end: Union[datetime, int, str, None] = None, 1237 params: Optional[Dict[str, Any]] = None, 1238 order: Optional[str] = 'asc', 1239 sort_datetimes: bool = False, 1240 limit: Optional[int] = None, 1241 begin_add_minutes: int = 0, 1242 end_add_minutes: int = 0, 1243 replace_nulls: Optional[str] = None, 1244 skip_existing_cols_check: bool = False, 1245 debug: bool = False, 1246 **kw: Any 1247) -> Union[str, None]: 1248 """ 1249 Return the `SELECT` query for retrieving a pipe's data from its instance. 1250 1251 Parameters 1252 ---------- 1253 pipe: mrsm.Pipe: 1254 The pipe to get data from. 1255 1256 select_columns: Optional[List[str]], default None 1257 If provided, only select these given columns. 1258 Otherwise select all available columns (i.e. `SELECT *`). 1259 1260 omit_columns: Optional[List[str]], default None 1261 If provided, remove these columns from the selection. 1262 1263 begin: Union[datetime, int, str, None], default None 1264 If provided, get rows newer than or equal to this value. 1265 1266 end: Union[datetime, str, None], default None 1267 If provided, get rows older than or equal to this value. 1268 1269 params: Optional[Dict[str, Any]], default None 1270 Additional parameters to filter by. 1271 See `meerschaum.connectors.sql.build_where`. 1272 1273 order: Optional[str], default None 1274 The selection order for all of the indices in the query. 1275 If `None`, omit the `ORDER BY` clause. 1276 1277 sort_datetimes: bool, default False 1278 Alias for `order='desc'`. 1279 1280 limit: Optional[int], default None 1281 If specified, limit the number of rows retrieved to this value. 1282 1283 begin_add_minutes: int, default 0 1284 The number of minutes to add to the `begin` datetime (i.e. `DATEADD`). 1285 1286 end_add_minutes: int, default 0 1287 The number of minutes to add to the `end` datetime (i.e. `DATEADD`). 1288 1289 chunksize: Optional[int], default -1 1290 The size of dataframe chunks to load into memory. 1291 1292 replace_nulls: Optional[str], default None 1293 If provided, replace null values with this value. 1294 1295 skip_existing_cols_check: bool, default False 1296 If `True`, do not verify that querying columns are actually on the table. 1297 1298 debug: bool, default False 1299 Verbosity toggle. 1300 1301 Returns 1302 ------- 1303 A `SELECT` query to retrieve a pipe's data. 
1304 """ 1305 from meerschaum.utils.misc import items_str 1306 from meerschaum.utils.sql import sql_item_name, dateadd_str 1307 from meerschaum.utils.dtypes import coerce_timezone 1308 from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type, get_db_type_from_pd_type 1309 1310 dt_col = pipe.columns.get('datetime', None) 1311 existing_cols = pipe.get_columns_types(debug=debug) if pipe.enforce else [] 1312 skip_existing_cols_check = skip_existing_cols_check or not pipe.enforce 1313 dt_typ = get_pd_type_from_db_type(existing_cols[dt_col]) if dt_col in existing_cols else None 1314 dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None 1315 select_columns = ( 1316 [col for col in existing_cols] 1317 if not select_columns 1318 else [col for col in select_columns if skip_existing_cols_check or col in existing_cols] 1319 ) 1320 if omit_columns: 1321 select_columns = [col for col in select_columns if col not in omit_columns] 1322 1323 if order is None and sort_datetimes: 1324 order = 'desc' 1325 1326 if begin == '': 1327 begin = pipe.get_sync_time(debug=debug) 1328 backtrack_interval = pipe.get_backtrack_interval(debug=debug) 1329 if begin is not None: 1330 begin -= backtrack_interval 1331 1332 begin, end = pipe.parse_date_bounds(begin, end) 1333 if isinstance(begin, datetime) and dt_typ: 1334 begin = coerce_timezone(begin, strip_utc=('utc' not in dt_typ.lower())) 1335 if isinstance(end, datetime) and dt_typ: 1336 end = coerce_timezone(end, strip_utc=('utc' not in dt_typ.lower())) 1337 1338 cols_names = [ 1339 sql_item_name(col, self.flavor, None) 1340 for col in select_columns 1341 ] 1342 select_cols_str = ( 1343 'SELECT\n ' 1344 + ',\n '.join( 1345 [ 1346 ( 1347 col_name 1348 if not replace_nulls 1349 else f"COALESCE(col_name, '{replace_nulls}') AS {col_name}" 1350 ) 1351 for col_name in cols_names 1352 ] 1353 ) 1354 ) if cols_names else 'SELECT *' 1355 pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 1356 query = f"{select_cols_str}\nFROM {pipe_table_name}" 1357 where = "" 1358 1359 if order is not None: 1360 default_order = 'asc' 1361 if order not in ('asc', 'desc'): 1362 warn(f"Ignoring unsupported order '{order}'. Falling back to '{default_order}'.") 1363 order = default_order 1364 order = order.upper() 1365 1366 if not pipe.columns.get('datetime', None): 1367 _dt = pipe.guess_datetime() 1368 dt = sql_item_name(_dt, self.flavor, None) if _dt else None 1369 is_guess = True 1370 else: 1371 _dt = pipe.get_columns('datetime') 1372 dt = sql_item_name(_dt, self.flavor, None) 1373 is_guess = False 1374 1375 quoted_indices = { 1376 key: sql_item_name(val, self.flavor, None) 1377 for key, val in pipe.columns.items() 1378 if val in existing_cols or skip_existing_cols_check 1379 } 1380 1381 if begin is not None or end is not None: 1382 if is_guess: 1383 if _dt is None: 1384 warn( 1385 f"No datetime could be determined for {pipe}." 
1386 + "\n Ignoring begin and end...", 1387 stack=False, 1388 ) 1389 begin, end = None, None 1390 else: 1391 warn( 1392 f"A datetime wasn't specified for {pipe}.\n" 1393 + f" Using column \"{_dt}\" for datetime bounds...", 1394 stack=False, 1395 ) 1396 1397 is_dt_bound = False 1398 if begin is not None and (_dt in existing_cols or skip_existing_cols_check): 1399 begin_da = dateadd_str( 1400 flavor=self.flavor, 1401 datepart='minute', 1402 number=begin_add_minutes, 1403 begin=begin, 1404 db_type=dt_db_type, 1405 ) 1406 where += f"\n {dt} >= {begin_da}" + ("\n AND\n " if end is not None else "") 1407 is_dt_bound = True 1408 1409 if end is not None and (_dt in existing_cols or skip_existing_cols_check): 1410 if 'int' in str(type(end)).lower() and end == begin: 1411 end += 1 1412 end_da = dateadd_str( 1413 flavor=self.flavor, 1414 datepart='minute', 1415 number=end_add_minutes, 1416 begin=end, 1417 db_type=dt_db_type, 1418 ) 1419 where += f"{dt} < {end_da}" 1420 is_dt_bound = True 1421 1422 if params is not None: 1423 from meerschaum.utils.sql import build_where 1424 valid_params = { 1425 k: v 1426 for k, v in params.items() 1427 if k in existing_cols or skip_existing_cols_check 1428 } 1429 if valid_params: 1430 where += ' ' + build_where(valid_params, self).lstrip().replace( 1431 'WHERE', (' AND' if is_dt_bound else " ") 1432 ) 1433 1434 if len(where) > 0: 1435 query += "\nWHERE " + where 1436 1437 if order is not None: 1438 ### Sort by indices, starting with datetime. 1439 order_by = "" 1440 if quoted_indices: 1441 order_by += "\nORDER BY " 1442 if _dt and (_dt in existing_cols or skip_existing_cols_check): 1443 order_by += dt + ' ' + order + ',' 1444 for key, quoted_col_name in quoted_indices.items(): 1445 if dt == quoted_col_name: 1446 continue 1447 order_by += ' ' + quoted_col_name + ' ' + order + ',' 1448 order_by = order_by[:-1] 1449 1450 query += order_by 1451 1452 if isinstance(limit, int): 1453 if self.flavor == 'mssql': 1454 query = f'SELECT TOP {limit}\n' + query[len("SELECT "):] 1455 elif self.flavor == 'oracle': 1456 query = ( 1457 f"SELECT * FROM (\n {query}\n)\n" 1458 + f"WHERE ROWNUM IN ({', '.join([str(i) for i in range(1, limit+1)])})" 1459 ) 1460 else: 1461 query += f"\nLIMIT {limit}" 1462 1463 if debug: 1464 to_print = ( 1465 [] 1466 + ([f"begin='{begin}'"] if begin else []) 1467 + ([f"end='{end}'"] if end else []) 1468 + ([f"params={params}"] if params else []) 1469 ) 1470 dprint("Getting pipe data with constraints: " + items_str(to_print, quotes=False)) 1471 1472 return query
Return the `SELECT` query for retrieving a pipe's data from its instance.
Parameters
- pipe (mrsm.Pipe): The pipe to get data from.
- select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. `SELECT *`).
- omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, int, str, None], default None): If provided, get rows newer than or equal to this value.
- end (Union[datetime, str, None], default None): If provided, get rows older than or equal to this value.
- params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See `meerschaum.connectors.sql.build_where`.
- order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If `None`, omit the `ORDER BY` clause.
- sort_datetimes (bool, default False): Alias for `order='desc'`.
- limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
- begin_add_minutes (int, default 0): The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
- end_add_minutes (int, default 0): The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
- replace_nulls (Optional[str], default None): If provided, replace null values with this value.
- skip_existing_cols_check (bool, default False): If `True`, do not verify that querying columns are actually on the table.
- debug (bool, default False): Verbosity toggle.
Returns
- A `SELECT` query to retrieve a pipe's data.
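Because this method returns the query string without executing it, it is handy for inspecting the flavor-specific SQL. A sketch with hypothetical keys:

import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')

query = conn.get_pipe_data_query(
    pipe,
    params={'station': 'KATL'},
    order='desc',
    limit=10,
)
# Prints a flavor-specific SELECT ... WHERE ... ORDER BY ... query.
print(query)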
21def register_pipe( 22 self, 23 pipe: mrsm.Pipe, 24 debug: bool = False, 25) -> SuccessTuple: 26 """ 27 Register a new pipe. 28 A pipe's attributes must be set before registering. 29 """ 30 from meerschaum.utils.packages import attempt_import 31 from meerschaum.utils.sql import json_flavors 32 33 ### ensure pipes table exists 34 from meerschaum.connectors.sql.tables import get_tables 35 pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] 36 37 if pipe.get_id(debug=debug) is not None: 38 return False, f"{pipe} is already registered." 39 40 ### NOTE: if `parameters` is supplied in the Pipe constructor, 41 ### then `pipe.parameters` will exist and not be fetched from the database. 42 43 ### 1. Prioritize the Pipe object's `parameters` first. 44 ### E.g. if the user manually sets the `parameters` property 45 ### or if the Pipe already exists 46 ### (which shouldn't be able to be registered anyway but that's an issue for later). 47 parameters = None 48 try: 49 parameters = pipe.get_parameters(apply_symlinks=False) 50 except Exception as e: 51 if debug: 52 dprint(str(e)) 53 parameters = None 54 55 ### ensure `parameters` is a dictionary 56 if parameters is None: 57 parameters = {} 58 59 import json 60 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 61 values = { 62 'connector_keys' : pipe.connector_keys, 63 'metric_key' : pipe.metric_key, 64 'location_key' : pipe.location_key, 65 'parameters' : ( 66 json.dumps(parameters) 67 if self.flavor not in json_flavors 68 else parameters 69 ), 70 } 71 query = sqlalchemy.insert(pipes_tbl).values(**values) 72 result = self.exec(query, debug=debug) 73 if result is None: 74 return False, f"Failed to register {pipe}." 75 return True, f"Successfully registered {pipe}."
Register a new pipe. A pipe's attributes must be set before registering.
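A sketch of registering a pipe whose attributes were set in the constructor (keys and columns are hypothetical):

import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
pipe = mrsm.Pipe(
    'plugin:noaa', 'weather', 'atlanta',
    instance='sql:main',
    columns={'datetime': 'timestamp', 'id': 'station'},
)

# Inserts a row into the 'pipes' table with the pipe's keys and parameters.
success, msg = conn.register_pipe(pipe)
print(success, msg)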
78def edit_pipe( 79 self, 80 pipe: mrsm.Pipe, 81 patch: bool = False, 82 debug: bool = False, 83 **kw : Any 84) -> SuccessTuple: 85 """ 86 Persist a Pipe's parameters to its database. 87 88 Parameters 89 ---------- 90 pipe: mrsm.Pipe, default None 91 The pipe to be edited. 92 patch: bool, default False 93 If patch is `True`, update the existing parameters by cascading. 94 Otherwise overwrite the parameters (default). 95 debug: bool, default False 96 Verbosity toggle. 97 """ 98 99 if pipe.id is None: 100 return False, f"{pipe} is not registered and cannot be edited." 101 102 from meerschaum.utils.packages import attempt_import 103 from meerschaum.utils.sql import json_flavors 104 if not patch: 105 parameters = pipe.__dict__.get('_attributes', {}).get('parameters', {}) 106 else: 107 from meerschaum import Pipe 108 from meerschaum.config._patch import apply_patch_to_config 109 original_parameters = Pipe( 110 pipe.connector_keys, pipe.metric_key, pipe.location_key, 111 mrsm_instance=pipe.instance_keys 112 ).get_parameters(apply_symlinks=False) 113 parameters = apply_patch_to_config( 114 original_parameters, 115 pipe._attributes['parameters'] 116 ) 117 118 ### ensure pipes table exists 119 from meerschaum.connectors.sql.tables import get_tables 120 pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] 121 122 import json 123 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 124 125 values = { 126 'parameters': ( 127 json.dumps(parameters) 128 if self.flavor not in json_flavors 129 else parameters 130 ), 131 } 132 q = sqlalchemy.update(pipes_tbl).values(**values).where( 133 pipes_tbl.c.pipe_id == pipe.id 134 ) 135 136 result = self.exec(q, debug=debug) 137 message = ( 138 f"Successfully edited {pipe}." 139 if result is not None else f"Failed to edit {pipe}." 140 ) 141 return (result is not None), message
Persist a Pipe's parameters to its database.
Parameters
- pipe (mrsm.Pipe, default None): The pipe to be edited.
- patch (bool, default False):
If patch is
True
, update the existing parameters by cascading. Otherwise overwrite the parameters (default). - debug (bool, default False): Verbosity toggle.
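A sketch of patching parameters in place (assumes the pipe is already registered; the `fetch` key below is hypothetical):

import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')

# Cascade new keys into the existing parameters rather than overwriting them.
pipe.parameters['fetch'] = {'backtrack_minutes': 1440}
success, msg = conn.edit_pipe(pipe, patch=True)
print(success, msg)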
1475def get_pipe_id( 1476 self, 1477 pipe: mrsm.Pipe, 1478 debug: bool = False, 1479) -> Any: 1480 """ 1481 Get a Pipe's ID from the pipes table. 1482 """ 1483 if pipe.temporary: 1484 return None 1485 from meerschaum.utils.packages import attempt_import 1486 sqlalchemy = attempt_import('sqlalchemy') 1487 from meerschaum.connectors.sql.tables import get_tables 1488 pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] 1489 1490 query = sqlalchemy.select(pipes_tbl.c.pipe_id).where( 1491 pipes_tbl.c.connector_keys == pipe.connector_keys 1492 ).where( 1493 pipes_tbl.c.metric_key == pipe.metric_key 1494 ).where( 1495 (pipes_tbl.c.location_key == pipe.location_key) if pipe.location_key is not None 1496 else pipes_tbl.c.location_key.is_(None) 1497 ) 1498 _id = self.value(query, debug=debug, silent=pipe.temporary) 1499 if _id is not None: 1500 _id = int(_id) 1501 return _id
Get a Pipe's ID from the pipes table.
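For example (hypothetical keys):

import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')

# An int if the pipe is registered, otherwise None (always None for temporary pipes).
pipe_id = conn.get_pipe_id(pipe)
print(pipe_id)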
1504def get_pipe_attributes( 1505 self, 1506 pipe: mrsm.Pipe, 1507 debug: bool = False, 1508) -> Dict[str, Any]: 1509 """ 1510 Get a Pipe's attributes dictionary. 1511 """ 1512 from meerschaum.connectors.sql.tables import get_tables 1513 from meerschaum.utils.packages import attempt_import 1514 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 1515 1516 if pipe.get_id(debug=debug) is None: 1517 return {} 1518 1519 pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] 1520 1521 try: 1522 q = sqlalchemy.select(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id) 1523 if debug: 1524 dprint(q) 1525 rows = ( 1526 self.exec(q, silent=True, debug=debug).mappings().all() 1527 if self.flavor != 'duckdb' 1528 else self.read(q, debug=debug).to_dict(orient='records') 1529 ) 1530 if not rows: 1531 return {} 1532 attributes = dict(rows[0]) 1533 except Exception: 1534 warn(traceback.format_exc()) 1535 return {} 1536 1537 ### handle non-PostgreSQL databases (text vs JSON) 1538 if not isinstance(attributes.get('parameters', None), dict): 1539 try: 1540 import json 1541 parameters = json.loads(attributes['parameters']) 1542 if isinstance(parameters, str) and parameters[0] == '{': 1543 parameters = json.loads(parameters) 1544 attributes['parameters'] = parameters 1545 except Exception: 1546 attributes['parameters'] = {} 1547 1548 return attributes
Get a Pipe's attributes dictionary.
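A short sketch (hypothetical keys):

import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')

attrs = conn.get_pipe_attributes(pipe)
# 'parameters' is decoded from JSON on flavors which store it as text.
print(attrs.get('parameters', {}))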
1634def sync_pipe( 1635 self, 1636 pipe: mrsm.Pipe, 1637 df: Union[pd.DataFrame, str, Dict[Any, Any], None] = None, 1638 begin: Union[datetime, int, None] = None, 1639 end: Union[datetime, int, None] = None, 1640 chunksize: Optional[int] = -1, 1641 check_existing: bool = True, 1642 blocking: bool = True, 1643 debug: bool = False, 1644 _check_temporary_tables: bool = True, 1645 **kw: Any 1646) -> SuccessTuple: 1647 """ 1648 Sync a pipe using a database connection. 1649 1650 Parameters 1651 ---------- 1652 pipe: mrsm.Pipe 1653 The Meerschaum Pipe instance into which to sync the data. 1654 1655 df: Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]] 1656 An optional DataFrame or equivalent to sync into the pipe. 1657 Defaults to `None`. 1658 1659 begin: Union[datetime, int, None], default None 1660 Optionally specify the earliest datetime to search for data. 1661 Defaults to `None`. 1662 1663 end: Union[datetime, int, None], default None 1664 Optionally specify the latest datetime to search for data. 1665 Defaults to `None`. 1666 1667 chunksize: Optional[int], default -1 1668 Specify the number of rows to sync per chunk. 1669 If `-1`, resort to system configuration (default is `900`). 1670 A `chunksize` of `None` will sync all rows in one transaction. 1671 Defaults to `-1`. 1672 1673 check_existing: bool, default True 1674 If `True`, pull and diff with existing data from the pipe. Defaults to `True`. 1675 1676 blocking: bool, default True 1677 If `True`, wait for sync to finish and return its result, otherwise asyncronously sync. 1678 Defaults to `True`. 1679 1680 debug: bool, default False 1681 Verbosity toggle. Defaults to False. 1682 1683 kw: Any 1684 Catch-all for keyword arguments. 1685 1686 Returns 1687 ------- 1688 A `SuccessTuple` of success (`bool`) and message (`str`). 1689 """ 1690 from meerschaum.utils.packages import import_pandas 1691 from meerschaum.utils.sql import ( 1692 get_update_queries, 1693 sql_item_name, 1694 UPDATE_QUERIES, 1695 get_reset_autoincrement_queries, 1696 ) 1697 from meerschaum.utils.dtypes import get_current_timestamp 1698 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 1699 from meerschaum.utils.dataframe import get_special_cols 1700 from meerschaum import Pipe 1701 import time 1702 import copy 1703 pd = import_pandas() 1704 if df is None: 1705 msg = f"DataFrame is None. Cannot sync {pipe}." 1706 warn(msg) 1707 return False, msg 1708 1709 start = time.perf_counter() 1710 pipe_name = sql_item_name(pipe.target, self.flavor, schema=self.get_pipe_schema(pipe)) 1711 dtypes = pipe.get_dtypes(debug=debug) 1712 1713 if not pipe.temporary and not pipe.get_id(debug=debug): 1714 register_tuple = pipe.register(debug=debug) 1715 if not register_tuple[0]: 1716 return register_tuple 1717 1718 ### df is the dataframe returned from the remote source 1719 ### via the connector 1720 if debug: 1721 dprint("Fetched data:\n" + str(df)) 1722 1723 if not isinstance(df, pd.DataFrame): 1724 df = pipe.enforce_dtypes( 1725 df, 1726 chunksize=chunksize, 1727 safe_copy=kw.get('safe_copy', False), 1728 dtypes=dtypes, 1729 debug=debug, 1730 ) 1731 1732 ### if table does not exist, create it with indices 1733 is_new = False 1734 if not pipe.exists(debug=debug): 1735 check_existing = False 1736 is_new = True 1737 else: 1738 ### Check for new columns. 
1739 add_cols_queries = self.get_add_columns_queries(pipe, df, debug=debug) 1740 if add_cols_queries: 1741 pipe._clear_cache_key('_columns_types', debug=debug) 1742 pipe._clear_cache_key('_columns_indices', debug=debug) 1743 if not self.exec_queries(add_cols_queries, debug=debug): 1744 warn(f"Failed to add new columns to {pipe}.") 1745 1746 alter_cols_queries = self.get_alter_columns_queries(pipe, df, debug=debug) 1747 if alter_cols_queries: 1748 pipe._clear_cache_key('_columns_types', debug=debug) 1749 pipe._clear_cache_key('_columns_types', debug=debug) 1750 if not self.exec_queries(alter_cols_queries, debug=debug): 1751 warn(f"Failed to alter columns for {pipe}.") 1752 1753 upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in UPDATE_QUERIES 1754 if upsert: 1755 check_existing = False 1756 kw['safe_copy'] = kw.get('safe_copy', False) 1757 1758 unseen_df, update_df, delta_df = ( 1759 pipe.filter_existing( 1760 df, 1761 chunksize=chunksize, 1762 debug=debug, 1763 **kw 1764 ) if check_existing else (df, None, df) 1765 ) 1766 if upsert: 1767 unseen_df, update_df, delta_df = (df.head(0), df, df) 1768 1769 if debug: 1770 dprint("Delta data:\n" + str(delta_df)) 1771 dprint("Unseen data:\n" + str(unseen_df)) 1772 if update_df is not None: 1773 dprint(("Update" if not upsert else "Upsert") + " data:\n" + str(update_df)) 1774 1775 if_exists = kw.get('if_exists', 'append') 1776 if 'if_exists' in kw: 1777 kw.pop('if_exists') 1778 if 'name' in kw: 1779 kw.pop('name') 1780 1781 ### Insert new data into the target table. 1782 unseen_kw = copy.deepcopy(kw) 1783 unseen_kw.update({ 1784 'name': pipe.target, 1785 'if_exists': if_exists, 1786 'debug': debug, 1787 'as_dict': True, 1788 'safe_copy': kw.get('safe_copy', False), 1789 'chunksize': chunksize, 1790 'dtype': self.get_to_sql_dtype(pipe, unseen_df, update_dtypes=True), 1791 'schema': self.get_pipe_schema(pipe), 1792 }) 1793 1794 dt_col = pipe.columns.get('datetime', None) 1795 primary_key = pipe.columns.get('primary', None) 1796 autoincrement = ( 1797 pipe.parameters.get('autoincrement', False) 1798 or ( 1799 is_new 1800 and primary_key 1801 and primary_key 1802 not in dtypes 1803 and primary_key not in unseen_df.columns 1804 ) 1805 ) 1806 if autoincrement and autoincrement not in pipe.parameters: 1807 update_success, update_msg = pipe.update_parameters( 1808 {'autoincrement': autoincrement}, 1809 debug=debug, 1810 ) 1811 if not update_success: 1812 return update_success, update_msg 1813 1814 def _check_pk(_df_to_clear): 1815 if _df_to_clear is None: 1816 return 1817 if primary_key not in _df_to_clear.columns: 1818 return 1819 if not _df_to_clear[primary_key].notnull().any(): 1820 del _df_to_clear[primary_key] 1821 1822 autoincrement_needs_reset = bool( 1823 autoincrement 1824 and primary_key 1825 and primary_key in unseen_df.columns 1826 and unseen_df[primary_key].notnull().any() 1827 ) 1828 if autoincrement and primary_key: 1829 for _df_to_clear in (unseen_df, update_df, delta_df): 1830 _check_pk(_df_to_clear) 1831 1832 if is_new: 1833 create_success, create_msg = self.create_pipe_table_from_df( 1834 pipe, 1835 unseen_df, 1836 debug=debug, 1837 ) 1838 if not create_success: 1839 return create_success, create_msg 1840 1841 do_identity_insert = bool( 1842 self.flavor in ('mssql',) 1843 and primary_key 1844 and primary_key in unseen_df.columns 1845 and autoincrement 1846 ) 1847 stats = {'success': True, 'msg': ''} 1848 if len(unseen_df) > 0: 1849 with self.engine.connect() as connection: 1850 with connection.begin(): 1851 if 
do_identity_insert: 1852 identity_on_result = self.exec( 1853 f"SET IDENTITY_INSERT {pipe_name} ON", 1854 commit=False, 1855 _connection=connection, 1856 close=False, 1857 debug=debug, 1858 ) 1859 if identity_on_result is None: 1860 return False, f"Could not enable identity inserts on {pipe}." 1861 1862 stats = self.to_sql( 1863 unseen_df, 1864 _connection=connection, 1865 **unseen_kw 1866 ) 1867 1868 if do_identity_insert: 1869 identity_off_result = self.exec( 1870 f"SET IDENTITY_INSERT {pipe_name} OFF", 1871 commit=False, 1872 _connection=connection, 1873 close=False, 1874 debug=debug, 1875 ) 1876 if identity_off_result is None: 1877 return False, f"Could not disable identity inserts on {pipe}." 1878 1879 if is_new: 1880 if not self.create_indices(pipe, debug=debug): 1881 warn(f"Failed to create indices for {pipe}. Continuing...") 1882 1883 if autoincrement_needs_reset: 1884 reset_autoincrement_queries = get_reset_autoincrement_queries( 1885 pipe.target, 1886 primary_key, 1887 self, 1888 schema=self.get_pipe_schema(pipe), 1889 debug=debug, 1890 ) 1891 results = self.exec_queries(reset_autoincrement_queries, debug=debug) 1892 for result in results: 1893 if result is None: 1894 warn(f"Could not reset auto-incrementing primary key for {pipe}.", stack=False) 1895 1896 if update_df is not None and len(update_df) > 0: 1897 temp_target = self.get_temporary_target( 1898 pipe.target, 1899 label=('update' if not upsert else 'upsert'), 1900 ) 1901 self._log_temporary_tables_creation(temp_target, create=(not pipe.temporary), debug=debug) 1902 update_dtypes = { 1903 **{ 1904 col: str(typ) 1905 for col, typ in update_df.dtypes.items() 1906 }, 1907 **get_special_cols(update_df) 1908 } 1909 1910 temp_pipe = Pipe( 1911 pipe.connector_keys.replace(':', '_') + '_', pipe.metric_key, pipe.location_key, 1912 instance=pipe.instance_keys, 1913 columns={ 1914 (ix_key if ix_key != 'primary' else 'primary_'): ix 1915 for ix_key, ix in pipe.columns.items() 1916 if ix and ix in update_df.columns 1917 }, 1918 dtypes=update_dtypes, 1919 target=temp_target, 1920 temporary=True, 1921 enforce=False, 1922 static=True, 1923 autoincrement=False, 1924 cache=False, 1925 parameters={ 1926 'schema': self.internal_schema, 1927 'hypertable': False, 1928 }, 1929 ) 1930 _temp_columns_types = { 1931 col: get_db_type_from_pd_type(typ, self.flavor) 1932 for col, typ in update_dtypes.items() 1933 } 1934 temp_pipe._cache_value('_columns_types', _temp_columns_types, memory_only=True, debug=debug) 1935 temp_pipe._cache_value('_skip_check_indices', True, memory_only=True, debug=debug) 1936 now_ts = get_current_timestamp('ms', as_int=True) / 1000 1937 temp_pipe._cache_value('_columns_types_timestamp', now_ts, memory_only=True, debug=debug) 1938 temp_success, temp_msg = temp_pipe.sync(update_df, check_existing=False, debug=debug) 1939 if not temp_success: 1940 return temp_success, temp_msg 1941 1942 existing_cols = pipe.get_columns_types(debug=debug) 1943 join_cols = [ 1944 col 1945 for col_key, col in pipe.columns.items() 1946 if col and col in existing_cols 1947 ] if not primary_key or self.flavor == 'oracle' else ( 1948 [dt_col, primary_key] 1949 if ( 1950 self.flavor in ('timescaledb', 'timescaledb-ha') 1951 and dt_col 1952 and dt_col in update_df.columns 1953 ) 1954 else [primary_key] 1955 ) 1956 update_queries = get_update_queries( 1957 pipe.target, 1958 temp_target, 1959 self, 1960 join_cols, 1961 upsert=upsert, 1962 schema=self.get_pipe_schema(pipe), 1963 patch_schema=self.internal_schema, 1964 
target_cols_types=pipe.get_columns_types(debug=debug), 1965 patch_cols_types=_temp_columns_types, 1966 datetime_col=(dt_col if dt_col in update_df.columns else None), 1967 identity_insert=(autoincrement and primary_key in update_df.columns), 1968 null_indices=pipe.null_indices, 1969 cast_columns=pipe.enforce, 1970 debug=debug, 1971 ) 1972 update_results = self.exec_queries( 1973 update_queries, 1974 break_on_error=True, 1975 rollback=True, 1976 debug=debug, 1977 ) 1978 update_success = all(update_results) 1979 self._log_temporary_tables_creation( 1980 temp_target, 1981 ready_to_drop=True, 1982 create=(not pipe.temporary), 1983 debug=debug, 1984 ) 1985 if not update_success: 1986 warn(f"Failed to apply update to {pipe}.") 1987 stats['success'] = stats['success'] and update_success 1988 stats['msg'] = ( 1989 (stats.get('msg', '') + f'\nFailed to apply update to {pipe}.').lstrip() 1990 if not update_success 1991 else stats.get('msg', '') 1992 ) 1993 1994 stop = time.perf_counter() 1995 success = stats['success'] 1996 if not success: 1997 return success, stats['msg'] or str(stats) 1998 1999 unseen_count = len(unseen_df.index) if unseen_df is not None else 0 2000 update_count = len(update_df.index) if update_df is not None else 0 2001 msg = ( 2002 ( 2003 f"Inserted {unseen_count:,}, " 2004 + f"updated {update_count:,} rows." 2005 ) 2006 if not upsert 2007 else ( 2008 f"Upserted {update_count:,} row" 2009 + ('s' if update_count != 1 else '') 2010 + "." 2011 ) 2012 ) 2013 if debug: 2014 msg = msg[:-1] + ( 2015 f"\non table {sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))}\n" 2016 + f"in {round(stop - start, 2)} seconds." 2017 ) 2018 2019 if _check_temporary_tables: 2020 drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables( 2021 refresh=False, debug=debug 2022 ) 2023 if not drop_stale_success: 2024 warn(drop_stale_msg) 2025 2026 return success, msg
Sync a pipe using a database connection.
Parameters
- pipe (mrsm.Pipe): The Meerschaum Pipe instance into which to sync the data.
- df (Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]): An optional DataFrame or equivalent to sync into the pipe. Defaults to `None`.
- begin (Union[datetime, int, None], default None): Optionally specify the earliest datetime to search for data. Defaults to `None`.
- end (Union[datetime, int, None], default None): Optionally specify the latest datetime to search for data. Defaults to `None`.
- chunksize (Optional[int], default -1): Specify the number of rows to sync per chunk. If `-1`, resort to system configuration (default is `900`). A `chunksize` of `None` will sync all rows in one transaction. Defaults to `-1`.
- check_existing (bool, default True): If `True`, pull and diff with existing data from the pipe. Defaults to `True`.
- blocking (bool, default True): If `True`, wait for the sync to finish and return its result; otherwise sync asynchronously. Defaults to `True`.
. - debug (bool, default False): Verbosity toggle. Defaults to False.
- kw (Any): Catch-all for keyword arguments.
Returns
- A `SuccessTuple` of success (`bool`) and message (`str`).
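A minimal end-to-end sketch (hypothetical keys and data; `pipe.sync(df)` is the usual entry point and delegates here for SQL instances):

import pandas as pd
import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
pipe = mrsm.Pipe(
    'plugin:noaa', 'weather',
    instance='sql:main',
    columns={'datetime': 'timestamp', 'id': 'station'},
)

df = pd.DataFrame({
    'timestamp': pd.to_datetime(['2024-01-01 00:00', '2024-01-01 01:00'], utc=True),
    'station': ['KATL', 'KATL'],
    'temperature': [10.2, 9.8],
})

# The first sync creates the table and indices; later syncs insert/update only the diff.
success, msg = conn.sync_pipe(pipe, df, check_existing=True)
print(success, msg)  # e.g. (True, 'Inserted 2, updated 0 rows.')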
2029def sync_pipe_inplace( 2030 self, 2031 pipe: 'mrsm.Pipe', 2032 params: Optional[Dict[str, Any]] = None, 2033 begin: Union[datetime, int, None] = None, 2034 end: Union[datetime, int, None] = None, 2035 chunksize: Optional[int] = -1, 2036 check_existing: bool = True, 2037 debug: bool = False, 2038 **kw: Any 2039) -> SuccessTuple: 2040 """ 2041 If a pipe's connector is the same as its instance connector, 2042 it's more efficient to sync the pipe in-place rather than reading data into Pandas. 2043 2044 Parameters 2045 ---------- 2046 pipe: mrsm.Pipe 2047 The pipe whose connector is the same as its instance. 2048 2049 params: Optional[Dict[str, Any]], default None 2050 Optional params dictionary to build the `WHERE` clause. 2051 See `meerschaum.utils.sql.build_where`. 2052 2053 begin: Union[datetime, int, None], default None 2054 Optionally specify the earliest datetime to search for data. 2055 Defaults to `None`. 2056 2057 end: Union[datetime, int, None], default None 2058 Optionally specify the latest datetime to search for data. 2059 Defaults to `None`. 2060 2061 chunksize: Optional[int], default -1 2062 Specify the number of rows to sync per chunk. 2063 If `-1`, resort to system configuration (default is `900`). 2064 A `chunksize` of `None` will sync all rows in one transaction. 2065 Defaults to `-1`. 2066 2067 check_existing: bool, default True 2068 If `True`, pull and diff with existing data from the pipe. 2069 2070 debug: bool, default False 2071 Verbosity toggle. 2072 2073 Returns 2074 ------- 2075 A SuccessTuple. 2076 """ 2077 if self.flavor == 'duckdb': 2078 return pipe.sync( 2079 params=params, 2080 begin=begin, 2081 end=end, 2082 chunksize=chunksize, 2083 check_existing=check_existing, 2084 debug=debug, 2085 _inplace=False, 2086 **kw 2087 ) 2088 from meerschaum.utils.sql import ( 2089 sql_item_name, 2090 get_update_queries, 2091 get_null_replacement, 2092 get_create_table_queries, 2093 get_create_schema_if_not_exists_queries, 2094 get_table_cols_types, 2095 session_execute, 2096 dateadd_str, 2097 UPDATE_QUERIES, 2098 ) 2099 from meerschaum.utils.dtypes.sql import ( 2100 get_pd_type_from_db_type, 2101 get_db_type_from_pd_type, 2102 ) 2103 from meerschaum.utils.misc import generate_password 2104 2105 transaction_id_length = ( 2106 mrsm.get_config( 2107 'system', 'connectors', 'sql', 'instance', 'temporary_target', 'transaction_id_length' 2108 ) 2109 ) 2110 transact_id = generate_password(transaction_id_length) 2111 2112 internal_schema = self.internal_schema 2113 target = pipe.target 2114 temp_table_roots = ['backtrack', 'new', 'delta', 'joined', 'unseen', 'update'] 2115 temp_tables = { 2116 table_root: self.get_temporary_target(target, transact_id=transact_id, label=table_root) 2117 for table_root in temp_table_roots 2118 } 2119 temp_table_names = { 2120 table_root: sql_item_name(table_name_raw, self.flavor, internal_schema) 2121 for table_root, table_name_raw in temp_tables.items() 2122 } 2123 temp_table_aliases = { 2124 table_root: sql_item_name(table_root, self.flavor) 2125 for table_root in temp_table_roots 2126 } 2127 table_alias_as = " AS" if self.flavor != 'oracle' else '' 2128 metadef = self.get_pipe_metadef( 2129 pipe, 2130 params=params, 2131 begin=begin, 2132 end=end, 2133 check_existing=check_existing, 2134 debug=debug, 2135 ) 2136 pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 2137 upsert = pipe.parameters.get('upsert', False) and f'{self.flavor}-upsert' in UPDATE_QUERIES 2138 static = pipe.parameters.get('static', False) 2139 
database = getattr(self, 'database', self.parse_uri(self.URI).get('database', None)) 2140 primary_key = pipe.columns.get('primary', None) 2141 primary_key_typ = pipe.dtypes.get(primary_key, None) if primary_key else None 2142 primary_key_db_type = ( 2143 get_db_type_from_pd_type(primary_key_typ, self.flavor) 2144 if primary_key_typ 2145 else None 2146 ) 2147 autoincrement = pipe.parameters.get('autoincrement', False) 2148 dt_col = pipe.columns.get('datetime', None) 2149 dt_col_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None 2150 dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None 2151 dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None 2152 2153 def clean_up_temp_tables(ready_to_drop: bool = False): 2154 log_success, log_msg = self._log_temporary_tables_creation( 2155 [ 2156 table 2157 for table in temp_tables.values() 2158 ] if not upsert else [temp_tables['update']], 2159 ready_to_drop=ready_to_drop, 2160 create=(not pipe.temporary), 2161 debug=debug, 2162 ) 2163 if not log_success: 2164 warn(log_msg) 2165 drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables( 2166 refresh=False, 2167 debug=debug, 2168 ) 2169 if not drop_stale_success: 2170 warn(drop_stale_msg) 2171 return drop_stale_success, drop_stale_msg 2172 2173 sqlalchemy, sqlalchemy_orm = mrsm.attempt_import( 2174 'sqlalchemy', 2175 'sqlalchemy.orm', 2176 ) 2177 if not pipe.exists(debug=debug): 2178 schema = self.get_pipe_schema(pipe) 2179 create_pipe_queries = get_create_table_queries( 2180 metadef, 2181 pipe.target, 2182 self.flavor, 2183 schema=schema, 2184 primary_key=primary_key, 2185 primary_key_db_type=primary_key_db_type, 2186 autoincrement=autoincrement, 2187 datetime_column=dt_col, 2188 ) 2189 if schema: 2190 create_pipe_queries = ( 2191 get_create_schema_if_not_exists_queries(schema, self.flavor) 2192 + create_pipe_queries 2193 ) 2194 2195 results = self.exec_queries(create_pipe_queries, debug=debug) 2196 if not all(results): 2197 _ = clean_up_temp_tables() 2198 return False, f"Could not insert new data into {pipe} from its SQL query definition." 2199 2200 if not self.create_indices(pipe, debug=debug): 2201 warn(f"Failed to create indices for {pipe}. Continuing...") 2202 2203 rowcount = pipe.get_rowcount(debug=debug) 2204 _ = clean_up_temp_tables() 2205 return True, f"Inserted {rowcount:,}, updated 0 rows." 2206 2207 session = sqlalchemy_orm.Session(self.engine) 2208 connectable = session if self.flavor != 'duckdb' else self 2209 2210 create_new_query = get_create_table_queries( 2211 metadef, 2212 temp_tables[('new') if not upsert else 'update'], 2213 self.flavor, 2214 schema=internal_schema, 2215 )[0] 2216 (create_new_success, create_new_msg), create_new_results = session_execute( 2217 session, 2218 create_new_query, 2219 with_results=True, 2220 debug=debug, 2221 ) 2222 if not create_new_success: 2223 _ = clean_up_temp_tables() 2224 return create_new_success, create_new_msg 2225 new_count = create_new_results[0].rowcount if create_new_results else 0 2226 2227 new_cols_types = get_table_cols_types( 2228 temp_tables[('new' if not upsert else 'update')], 2229 connectable=connectable, 2230 flavor=self.flavor, 2231 schema=internal_schema, 2232 database=database, 2233 debug=debug, 2234 ) if not static else pipe.get_columns_types(debug=debug) 2235 if not new_cols_types: 2236 return False, f"Failed to get new columns for {pipe}." 
2237 2238 new_cols = { 2239 str(col_name): get_pd_type_from_db_type(str(col_type)) 2240 for col_name, col_type in new_cols_types.items() 2241 } 2242 new_cols_str = '\n ' + ',\n '.join([ 2243 sql_item_name(col, self.flavor) 2244 for col in new_cols 2245 ]) 2246 def get_col_typ(col: str, cols_types: Dict[str, str]) -> str: 2247 if self.flavor == 'oracle' and new_cols_types.get(col, '').lower() == 'char': 2248 return new_cols_types[col] 2249 return cols_types[col] 2250 2251 add_cols_queries = self.get_add_columns_queries(pipe, new_cols, debug=debug) 2252 if add_cols_queries: 2253 pipe._clear_cache_key('_columns_types', debug=debug) 2254 pipe._clear_cache_key('_columns_indices', debug=debug) 2255 self.exec_queries(add_cols_queries, debug=debug) 2256 2257 alter_cols_queries = self.get_alter_columns_queries(pipe, new_cols, debug=debug) 2258 if alter_cols_queries: 2259 pipe._clear_cache_key('_columns_types', debug=debug) 2260 self.exec_queries(alter_cols_queries, debug=debug) 2261 2262 insert_queries = [ 2263 ( 2264 f"INSERT INTO {pipe_name} ({new_cols_str})\n" 2265 f"SELECT {new_cols_str}\nFROM {temp_table_names['new']}{table_alias_as}" 2266 f" {temp_table_aliases['new']}" 2267 ) 2268 ] if not check_existing and not upsert else [] 2269 2270 new_queries = insert_queries 2271 new_success, new_msg = ( 2272 session_execute(session, new_queries, debug=debug) 2273 if new_queries 2274 else (True, "Success") 2275 ) 2276 if not new_success: 2277 _ = clean_up_temp_tables() 2278 return new_success, new_msg 2279 2280 if not check_existing: 2281 session.commit() 2282 _ = clean_up_temp_tables() 2283 return True, f"Inserted {new_count}, updated 0 rows." 2284 2285 min_dt_col_name_da = dateadd_str( 2286 flavor=self.flavor, begin=f"MIN({dt_col_name})", db_type=dt_db_type, 2287 ) 2288 max_dt_col_name_da = dateadd_str( 2289 flavor=self.flavor, begin=f"MAX({dt_col_name})", db_type=dt_db_type, 2290 ) 2291 2292 (new_dt_bounds_success, new_dt_bounds_msg), new_dt_bounds_results = session_execute( 2293 session, 2294 [ 2295 "SELECT\n" 2296 f" {min_dt_col_name_da} AS {sql_item_name('min_dt', self.flavor)},\n" 2297 f" {max_dt_col_name_da} AS {sql_item_name('max_dt', self.flavor)}\n" 2298 f"FROM {temp_table_names['new' if not upsert else 'update']}\n" 2299 f"WHERE {dt_col_name} IS NOT NULL" 2300 ], 2301 with_results=True, 2302 debug=debug, 2303 ) if dt_col and not upsert else ((True, "Success"), None) 2304 if not new_dt_bounds_success: 2305 return ( 2306 new_dt_bounds_success, 2307 f"Could not determine in-place datetime bounds:\n{new_dt_bounds_msg}" 2308 ) 2309 2310 if dt_col and not upsert: 2311 begin, end = new_dt_bounds_results[0].fetchone() 2312 2313 backtrack_def = self.get_pipe_data_query( 2314 pipe, 2315 begin=begin, 2316 end=end, 2317 begin_add_minutes=0, 2318 end_add_minutes=1, 2319 params=params, 2320 debug=debug, 2321 order=None, 2322 ) 2323 create_backtrack_query = get_create_table_queries( 2324 backtrack_def, 2325 temp_tables['backtrack'], 2326 self.flavor, 2327 schema=internal_schema, 2328 )[0] 2329 (create_backtrack_success, create_backtrack_msg), create_backtrack_results = session_execute( 2330 session, 2331 create_backtrack_query, 2332 with_results=True, 2333 debug=debug, 2334 ) if not upsert else ((True, "Success"), None) 2335 2336 if not create_backtrack_success: 2337 _ = clean_up_temp_tables() 2338 return create_backtrack_success, create_backtrack_msg 2339 2340 backtrack_cols_types = get_table_cols_types( 2341 temp_tables['backtrack'], 2342 connectable=connectable, 2343 flavor=self.flavor, 2344 
schema=internal_schema, 2345 database=database, 2346 debug=debug, 2347 ) if not (upsert or static) else new_cols_types 2348 2349 common_cols = [col for col in new_cols if col in backtrack_cols_types] 2350 primary_key = pipe.columns.get('primary', None) 2351 on_cols = { 2352 col: new_cols.get(col) 2353 for col_key, col in pipe.columns.items() 2354 if ( 2355 col 2356 and 2357 col_key != 'value' 2358 and col in backtrack_cols_types 2359 and col in new_cols 2360 ) 2361 } if not primary_key or self.flavor == 'oracle' else {primary_key: new_cols.get(primary_key)} 2362 2363 null_replace_new_cols_str = ( 2364 '\n ' + ',\n '.join([ 2365 f"COALESCE({temp_table_aliases['new']}.{sql_item_name(col, self.flavor)}, " 2366 + get_null_replacement(get_col_typ(col, new_cols_types), self.flavor) 2367 + ") AS " 2368 + sql_item_name(col, self.flavor, None) 2369 for col, typ in new_cols.items() 2370 ]) 2371 ) 2372 2373 select_delta_query = ( 2374 "SELECT" 2375 + null_replace_new_cols_str 2376 + f"\nFROM {temp_table_names['new']}{table_alias_as} {temp_table_aliases['new']}\n" 2377 + f"LEFT OUTER JOIN {temp_table_names['backtrack']}{table_alias_as} {temp_table_aliases['backtrack']}" 2378 + "\n ON\n " 2379 + '\n AND\n '.join([ 2380 ( 2381 f" COALESCE({temp_table_aliases['new']}." 2382 + sql_item_name(c, self.flavor, None) 2383 + ", " 2384 + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) 2385 + ")" 2386 + '\n =\n ' 2387 + f" COALESCE({temp_table_aliases['backtrack']}." 2388 + sql_item_name(c, self.flavor, None) 2389 + ", " 2390 + get_null_replacement(get_col_typ(c, backtrack_cols_types), self.flavor) 2391 + ") " 2392 ) for c in common_cols 2393 ]) 2394 + "\nWHERE\n " 2395 + '\n AND\n '.join([ 2396 ( 2397 f"{temp_table_aliases['backtrack']}." + sql_item_name(c, self.flavor) + ' IS NULL' 2398 ) for c in common_cols 2399 ]) 2400 ) 2401 create_delta_query = get_create_table_queries( 2402 select_delta_query, 2403 temp_tables['delta'], 2404 self.flavor, 2405 schema=internal_schema, 2406 )[0] 2407 create_delta_success, create_delta_msg = session_execute( 2408 session, 2409 create_delta_query, 2410 debug=debug, 2411 ) if not upsert else (True, "Success") 2412 if not create_delta_success: 2413 _ = clean_up_temp_tables() 2414 return create_delta_success, create_delta_msg 2415 2416 delta_cols_types = get_table_cols_types( 2417 temp_tables['delta'], 2418 connectable=connectable, 2419 flavor=self.flavor, 2420 schema=internal_schema, 2421 database=database, 2422 debug=debug, 2423 ) if not (upsert or static) else new_cols_types 2424 2425 ### This is a weird bug on SQLite. 2426 ### Sometimes the backtrack dtypes are all empty strings. 2427 if not all(delta_cols_types.values()): 2428 delta_cols_types = new_cols_types 2429 2430 delta_cols = { 2431 col: get_pd_type_from_db_type(typ) 2432 for col, typ in delta_cols_types.items() 2433 } 2434 delta_cols_str = ', '.join([ 2435 sql_item_name(col, self.flavor) 2436 for col in delta_cols 2437 ]) 2438 2439 select_joined_query = ( 2440 "SELECT\n " 2441 + (',\n '.join([ 2442 ( 2443 f"{temp_table_aliases['delta']}." + sql_item_name(c, self.flavor, None) 2444 + " AS " + sql_item_name(c + '_delta', self.flavor, None) 2445 ) for c in delta_cols 2446 ])) 2447 + ",\n " 2448 + (',\n '.join([ 2449 ( 2450 f"{temp_table_aliases['backtrack']}." 
+ sql_item_name(c, self.flavor, None) 2451 + " AS " + sql_item_name(c + '_backtrack', self.flavor, None) 2452 ) for c in backtrack_cols_types 2453 ])) 2454 + f"\nFROM {temp_table_names['delta']}{table_alias_as} {temp_table_aliases['delta']}\n" 2455 + f"LEFT OUTER JOIN {temp_table_names['backtrack']}{table_alias_as}" 2456 + f" {temp_table_aliases['backtrack']}" 2457 + "\n ON\n " 2458 + '\n AND\n '.join([ 2459 ( 2460 f" COALESCE({temp_table_aliases['delta']}." + sql_item_name(c, self.flavor) 2461 + ", " 2462 + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) + ")" 2463 + '\n =\n ' 2464 + f" COALESCE({temp_table_aliases['backtrack']}." + sql_item_name(c, self.flavor) 2465 + ", " 2466 + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) + ")" 2467 ) for c, typ in on_cols.items() 2468 ]) 2469 ) 2470 2471 create_joined_query = get_create_table_queries( 2472 select_joined_query, 2473 temp_tables['joined'], 2474 self.flavor, 2475 schema=internal_schema, 2476 )[0] 2477 create_joined_success, create_joined_msg = session_execute( 2478 session, 2479 create_joined_query, 2480 debug=debug, 2481 ) if on_cols and not upsert else (True, "Success") 2482 if not create_joined_success: 2483 _ = clean_up_temp_tables() 2484 return create_joined_success, create_joined_msg 2485 2486 select_unseen_query = ( 2487 "SELECT\n " 2488 + (',\n '.join([ 2489 ( 2490 "CASE\n WHEN " + sql_item_name(c + '_delta', self.flavor, None) 2491 + " != " + get_null_replacement(get_col_typ(c, delta_cols_types), self.flavor) 2492 + " THEN " + sql_item_name(c + '_delta', self.flavor, None) 2493 + "\n ELSE NULL\n END" 2494 + " AS " + sql_item_name(c, self.flavor, None) 2495 ) for c, typ in delta_cols.items() 2496 ])) 2497 + f"\nFROM {temp_table_names['joined']}{table_alias_as} {temp_table_aliases['joined']}\n" 2498 + "WHERE\n " 2499 + '\n AND\n '.join([ 2500 ( 2501 sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NULL' 2502 ) for c in delta_cols 2503 ]) 2504 ) 2505 create_unseen_query = get_create_table_queries( 2506 select_unseen_query, 2507 temp_tables['unseen'], 2508 self.flavor, 2509 internal_schema, 2510 )[0] 2511 (create_unseen_success, create_unseen_msg), create_unseen_results = session_execute( 2512 session, 2513 create_unseen_query, 2514 with_results=True, 2515 debug=debug 2516 ) if not upsert else ((True, "Success"), None) 2517 if not create_unseen_success: 2518 _ = clean_up_temp_tables() 2519 return create_unseen_success, create_unseen_msg 2520 2521 select_update_query = ( 2522 "SELECT\n " 2523 + (',\n '.join([ 2524 ( 2525 "CASE\n WHEN " + sql_item_name(c + '_delta', self.flavor, None) 2526 + " != " + get_null_replacement(get_col_typ(c, delta_cols_types), self.flavor) 2527 + " THEN " + sql_item_name(c + '_delta', self.flavor, None) 2528 + "\n ELSE NULL\n END" 2529 + " AS " + sql_item_name(c, self.flavor, None) 2530 ) for c, typ in delta_cols.items() 2531 ])) 2532 + f"\nFROM {temp_table_names['joined']}{table_alias_as} {temp_table_aliases['joined']}\n" 2533 + "WHERE\n " 2534 + '\n OR\n '.join([ 2535 ( 2536 sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NOT NULL' 2537 ) for c in delta_cols 2538 ]) 2539 ) 2540 2541 create_update_query = get_create_table_queries( 2542 select_update_query, 2543 temp_tables['update'], 2544 self.flavor, 2545 internal_schema, 2546 )[0] 2547 (create_update_success, create_update_msg), create_update_results = session_execute( 2548 session, 2549 create_update_query, 2550 with_results=True, 2551 debug=debug, 2552 ) if on_cols and not upsert else ((True, 
"Success"), []) 2553 apply_update_queries = ( 2554 get_update_queries( 2555 pipe.target, 2556 temp_tables['update'], 2557 session, 2558 on_cols, 2559 upsert=upsert, 2560 schema=self.get_pipe_schema(pipe), 2561 patch_schema=internal_schema, 2562 target_cols_types=pipe.get_columns_types(debug=debug), 2563 patch_cols_types=delta_cols_types, 2564 datetime_col=pipe.columns.get('datetime', None), 2565 flavor=self.flavor, 2566 null_indices=pipe.null_indices, 2567 cast_columns=pipe.enforce, 2568 debug=debug, 2569 ) 2570 if on_cols else [] 2571 ) 2572 2573 apply_unseen_queries = [ 2574 ( 2575 f"INSERT INTO {pipe_name} ({delta_cols_str})\n" 2576 + f"SELECT {delta_cols_str}\nFROM " 2577 + ( 2578 temp_table_names['unseen'] 2579 if on_cols 2580 else temp_table_names['delta'] 2581 ) 2582 ), 2583 ] 2584 2585 (apply_unseen_success, apply_unseen_msg), apply_unseen_results = session_execute( 2586 session, 2587 apply_unseen_queries, 2588 with_results=True, 2589 debug=debug, 2590 ) if not upsert else ((True, "Success"), None) 2591 if not apply_unseen_success: 2592 _ = clean_up_temp_tables() 2593 return apply_unseen_success, apply_unseen_msg 2594 unseen_count = apply_unseen_results[0].rowcount if apply_unseen_results else 0 2595 2596 (apply_update_success, apply_update_msg), apply_update_results = session_execute( 2597 session, 2598 apply_update_queries, 2599 with_results=True, 2600 debug=debug, 2601 ) 2602 if not apply_update_success: 2603 _ = clean_up_temp_tables() 2604 return apply_update_success, apply_update_msg 2605 update_count = apply_update_results[0].rowcount if apply_update_results else 0 2606 2607 session.commit() 2608 2609 msg = ( 2610 f"Inserted {unseen_count:,}, updated {update_count:,} rows." 2611 if not upsert 2612 else f"Upserted {update_count:,} row" + ('s' if update_count != 1 else '') + "." 2613 ) 2614 _ = clean_up_temp_tables(ready_to_drop=True) 2615 2616 return True, msg
If a pipe's connector is the same as its instance connector, it's more efficient to sync the pipe in-place rather than reading data into Pandas.
Parameters
- pipe (mrsm.Pipe): The pipe whose connector is the same as its instance.
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the `WHERE` clause. See `meerschaum.utils.sql.build_where`.
- begin (Union[datetime, int, None], default None): Optionally specify the earliest datetime to search for data. Defaults to `None`.
- end (Union[datetime, int, None], default None): Optionally specify the latest datetime to search for data. Defaults to `None`.
- chunksize (Optional[int], default -1): Specify the number of rows to sync per chunk. If `-1`, resort to the system configuration (default is `900`). A `chunksize` of `None` will sync all rows in one transaction. Defaults to `-1`.
- check_existing (bool, default True): If `True`, pull and diff with existing data from the pipe.
- debug (bool, default False): Verbosity toggle.
Returns
- A SuccessTuple.
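A minimal usage sketch, assuming a hypothetical pipe whose connector and instance are both `sql:main` (the keys and column names below are illustrative, not part of the API):
>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> pipe = mrsm.Pipe('sql:main', 'demo', instance='sql:main', columns={'datetime': 'dt'})
>>> success, msg = conn.sync_pipe_inplace(pipe)  # sync without pulling rows into Pandas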
2619def get_sync_time( 2620 self, 2621 pipe: 'mrsm.Pipe', 2622 params: Optional[Dict[str, Any]] = None, 2623 newest: bool = True, 2624 remote: bool = False, 2625 debug: bool = False, 2626) -> Union[datetime, int, None]: 2627 """Get a Pipe's most recent datetime value. 2628 2629 Parameters 2630 ---------- 2631 pipe: mrsm.Pipe 2632 The pipe to get the sync time for. 2633 2634 params: Optional[Dict[str, Any]], default None 2635 Optional params dictionary to build the `WHERE` clause. 2636 See `meerschaum.utils.sql.build_where`. 2637 2638 newest: bool, default True 2639 If `True`, get the most recent datetime (honoring `params`). 2640 If `False`, get the oldest datetime (ASC instead of DESC). 2641 2642 remote: bool, default False 2643 If `True`, return the sync time for the remote fetch definition. 2644 2645 Returns 2646 ------- 2647 A `datetime` object (or `int` if using an integer axis) if the pipe exists, otherwise `None`. 2648 """ 2649 from meerschaum.utils.sql import sql_item_name, build_where, wrap_query_with_cte 2650 src_name = sql_item_name('src', self.flavor) 2651 table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 2652 2653 dt_col = pipe.columns.get('datetime', None) 2654 if dt_col is None: 2655 return None 2656 dt_col_name = sql_item_name(dt_col, self.flavor, None) 2657 2658 if remote and pipe.connector.type != 'sql': 2659 warn(f"Cannot get the remote sync time for {pipe}.") 2660 return None 2661 2662 ASC_or_DESC = "DESC" if newest else "ASC" 2663 existing_cols = pipe.get_columns_types(debug=debug) 2664 valid_params = {} 2665 if params is not None: 2666 valid_params = {k: v for k, v in params.items() if k in existing_cols} 2667 flavor = self.flavor if not remote else pipe.connector.flavor 2668 2669 ### If no bounds are provided for the datetime column, 2670 ### add IS NOT NULL to the WHERE clause. 2671 if dt_col not in valid_params: 2672 valid_params[dt_col] = '_None' 2673 where = "" if not valid_params else build_where(valid_params, self) 2674 src_query = ( 2675 f"SELECT {dt_col_name}\nFROM {table_name}{where}" 2676 if not remote 2677 else self.get_pipe_metadef(pipe, params=params, begin=None, end=None) 2678 ) 2679 2680 base_query = ( 2681 f"SELECT {dt_col_name}\n" 2682 f"FROM {src_name}{where}\n" 2683 f"ORDER BY {dt_col_name} {ASC_or_DESC}\n" 2684 f"LIMIT 1" 2685 ) 2686 if self.flavor == 'mssql': 2687 base_query = ( 2688 f"SELECT TOP 1 {dt_col_name}\n" 2689 f"FROM {src_name}{where}\n" 2690 f"ORDER BY {dt_col_name} {ASC_or_DESC}" 2691 ) 2692 elif self.flavor == 'oracle': 2693 base_query = ( 2694 "SELECT * FROM (\n" 2695 f" SELECT {dt_col_name}\n" 2696 f" FROM {src_name}{where}\n" 2697 f" ORDER BY {dt_col_name} {ASC_or_DESC}\n" 2698 ") WHERE ROWNUM = 1" 2699 ) 2700 2701 query = wrap_query_with_cte(src_query, base_query, flavor) 2702 2703 try: 2704 db_time = self.value(query, silent=True, debug=debug) 2705 2706 ### No datetime could be found. 2707 if db_time is None: 2708 return None 2709 ### sqlite returns str. 2710 if isinstance(db_time, str): 2711 dateutil_parser = mrsm.attempt_import('dateutil.parser') 2712 st = dateutil_parser.parse(db_time) 2713 ### Do nothing if a datetime object is returned. 2714 elif isinstance(db_time, datetime): 2715 if hasattr(db_time, 'to_pydatetime'): 2716 st = db_time.to_pydatetime() 2717 else: 2718 st = db_time 2719 ### Sometimes the datetime is actually a date. 2720 elif isinstance(db_time, date): 2721 st = datetime.combine(db_time, datetime.min.time()) 2722 ### Adding support for an integer datetime axis. 
2723 elif 'int' in str(type(db_time)).lower(): 2724 st = int(db_time) 2725 ### Convert pandas timestamp to Python datetime. 2726 else: 2727 st = db_time.to_pydatetime() 2728 2729 sync_time = st 2730 2731 except Exception as e: 2732 sync_time = None 2733 warn(str(e)) 2734 2735 return sync_time
Get a Pipe's most recent datetime value.
Parameters
- pipe (mrsm.Pipe): The pipe to get the sync time for.
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the `WHERE` clause. See `meerschaum.utils.sql.build_where`.
- newest (bool, default True): If `True`, get the most recent datetime (honoring `params`). If `False`, get the oldest datetime (ASC instead of DESC).
- remote (bool, default False): If `True`, return the sync time for the remote fetch definition.
Returns
- A `datetime` object (or `int` if using an integer axis) if the pipe exists, otherwise `None`.
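For example (illustrative values, assuming the `conn` and `pipe` from the sketch above, with a configured datetime column):
>>> conn.get_sync_time(pipe)  # most recent datetime value
datetime.datetime(2024, 1, 1, 0, 0)
>>> conn.get_sync_time(pipe, newest=False)  # oldest datetime value instead
datetime.datetime(2023, 1, 1, 0, 0)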
2738def pipe_exists( 2739 self, 2740 pipe: mrsm.Pipe, 2741 debug: bool = False 2742) -> bool: 2743 """ 2744 Check that a Pipe's table exists. 2745 2746 Parameters 2747 ---------- 2748 pipe: mrsm.Pipe: 2749 The pipe to check. 2750 2751 debug: bool, default False 2752 Verbosity toggle. 2753 2754 Returns 2755 ------- 2756 A `bool` corresponding to whether a pipe's table exists. 2757 2758 """ 2759 from meerschaum.utils.sql import table_exists 2760 exists = table_exists( 2761 pipe.target, 2762 self, 2763 schema=self.get_pipe_schema(pipe), 2764 debug=debug, 2765 ) 2766 if debug: 2767 dprint(f"{pipe} " + ('exists.' if exists else 'does not exist.')) 2768 return exists
Check that a Pipe's table exists.
Parameters
- pipe (mrsm.Pipe): The pipe to check.
- debug (bool, default False): Verbosity toggle.
Returns
- A `bool` corresponding to whether a pipe's table exists.
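For example (illustrative):
>>> conn.pipe_exists(pipe)
True
>>> conn.pipe_exists(mrsm.Pipe('sql:main', 'nonexistent', instance='sql:main'))
False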
2771def get_pipe_rowcount( 2772 self, 2773 pipe: mrsm.Pipe, 2774 begin: Union[datetime, int, None] = None, 2775 end: Union[datetime, int, None] = None, 2776 params: Optional[Dict[str, Any]] = None, 2777 remote: bool = False, 2778 debug: bool = False 2779) -> Union[int, None]: 2780 """ 2781 Get the rowcount for a pipe in accordance with given parameters. 2782 2783 Parameters 2784 ---------- 2785 pipe: mrsm.Pipe 2786 The pipe to query with. 2787 2788 begin: Union[datetime, int, None], default None 2789 The begin datetime value. 2790 2791 end: Union[datetime, int, None], default None 2792 The end datetime value. 2793 2794 params: Optional[Dict[str, Any]], default None 2795 See `meerschaum.utils.sql.build_where`. 2796 2797 remote: bool, default False 2798 If `True`, get the rowcount for the remote table. 2799 2800 debug: bool, default False 2801 Verbosity toggle. 2802 2803 Returns 2804 ------- 2805 An `int` for the number of rows if the `pipe` exists, otherwise `None`. 2806 2807 """ 2808 from meerschaum.utils.sql import dateadd_str, sql_item_name, wrap_query_with_cte, build_where 2809 from meerschaum.connectors.sql._fetch import get_pipe_query 2810 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 2811 if remote: 2812 msg = f"'fetch:definition' must be an attribute of {pipe} to get a remote rowcount." 2813 if 'fetch' not in pipe.parameters: 2814 error(msg) 2815 return None 2816 if 'definition' not in pipe.parameters['fetch']: 2817 error(msg) 2818 return None 2819 2820 flavor = self.flavor if not remote else pipe.connector.flavor 2821 conn = self if not remote else pipe.connector 2822 _pipe_name = sql_item_name(pipe.target, flavor, self.get_pipe_schema(pipe)) 2823 dt_col = pipe.columns.get('datetime', None) 2824 dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None 2825 dt_db_type = get_db_type_from_pd_type(dt_typ, flavor) if dt_typ else None 2826 if not dt_col: 2827 dt_col = pipe.guess_datetime() 2828 dt_name = sql_item_name(dt_col, flavor, None) if dt_col else None 2829 is_guess = True 2830 else: 2831 dt_col = pipe.get_columns('datetime') 2832 dt_name = sql_item_name(dt_col, flavor, None) 2833 is_guess = False 2834 2835 if begin is not None or end is not None: 2836 if is_guess: 2837 if dt_col is None: 2838 warn( 2839 f"No datetime could be determined for {pipe}." 
2840 + "\n Ignoring begin and end...", 2841 stack=False, 2842 ) 2843 begin, end = None, None 2844 else: 2845 warn( 2846 f"A datetime wasn't specified for {pipe}.\n" 2847 + f" Using column \"{dt_col}\" for datetime bounds...", 2848 stack=False, 2849 ) 2850 2851 2852 _datetime_name = sql_item_name(dt_col, flavor) 2853 _cols_names = [ 2854 sql_item_name(col, flavor) 2855 for col in set( 2856 ( 2857 [dt_col] 2858 if dt_col 2859 else [] 2860 ) + ( 2861 [] 2862 if params is None 2863 else list(params.keys()) 2864 ) 2865 ) 2866 ] 2867 if not _cols_names: 2868 _cols_names = ['*'] 2869 2870 src = ( 2871 f"SELECT {', '.join(_cols_names)}\nFROM {_pipe_name}" 2872 if not remote 2873 else get_pipe_query(pipe) 2874 ) 2875 parent_query = f"SELECT COUNT(*)\nFROM {sql_item_name('src', flavor)}" 2876 query = wrap_query_with_cte(src, parent_query, flavor) 2877 if begin is not None or end is not None: 2878 query += "\nWHERE" 2879 if begin is not None: 2880 query += ( 2881 f"\n {dt_name} >= " 2882 + dateadd_str(flavor, datepart='minute', number=0, begin=begin, db_type=dt_db_type) 2883 ) 2884 if end is not None and begin is not None: 2885 query += "\n AND" 2886 if end is not None: 2887 query += ( 2888 f"\n {dt_name} < " 2889 + dateadd_str(flavor, datepart='minute', number=0, begin=end, db_type=dt_db_type) 2890 ) 2891 if params is not None: 2892 existing_cols = pipe.get_columns_types(debug=debug) 2893 valid_params = {k: v for k, v in params.items() if k in existing_cols} 2894 if valid_params: 2895 query += build_where(valid_params, conn).replace('WHERE', ( 2896 'AND' if (begin is not None or end is not None) 2897 else 'WHERE' 2898 ) 2899 ) 2900 2901 result = conn.value(query, debug=debug, silent=True) 2902 try: 2903 return int(result) 2904 except Exception: 2905 return None
Get the rowcount for a pipe in accordance with given parameters.
Parameters
- pipe (mrsm.Pipe): The pipe to query with.
- begin (Union[datetime, int, None], default None): The begin datetime value.
- end (Union[datetime, int, None], default None): The end datetime value.
- params (Optional[Dict[str, Any]], default None): See `meerschaum.utils.sql.build_where`.
- remote (bool, default False): If `True`, get the rowcount for the remote table.
- debug (bool, default False): Verbosity toggle.
Returns
- An `int` for the number of rows if the `pipe` exists, otherwise `None`.
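A usage sketch with datetime bounds (the returned counts are illustrative):
>>> from datetime import datetime
>>> conn.get_pipe_rowcount(pipe)  # total rows
100000
>>> conn.get_pipe_rowcount(pipe, begin=datetime(2024, 1, 1), end=datetime(2024, 2, 1))
12345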
2908def drop_pipe( 2909 self, 2910 pipe: mrsm.Pipe, 2911 debug: bool = False, 2912 **kw 2913) -> SuccessTuple: 2914 """ 2915 Drop a pipe's tables but maintain its registration. 2916 2917 Parameters 2918 ---------- 2919 pipe: mrsm.Pipe 2920 The pipe to drop. 2921 2922 Returns 2923 ------- 2924 A `SuccessTuple` indicating success. 2925 """ 2926 from meerschaum.utils.sql import table_exists, sql_item_name, DROP_IF_EXISTS_FLAVORS 2927 success = True 2928 target = pipe.target 2929 schema = self.get_pipe_schema(pipe) 2930 target_name = ( 2931 sql_item_name(target, self.flavor, schema) 2932 ) 2933 if table_exists(target, self, schema=schema, debug=debug): 2934 if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else "" 2935 success = self.exec( 2936 f"DROP TABLE {if_exists_str} {target_name}", silent=True, debug=debug 2937 ) is not None 2938 2939 msg = "Success" if success else f"Failed to drop {pipe}." 2940 return success, msg
Drop a pipe's tables but maintain its registration.
Parameters
- pipe (mrsm.Pipe): The pipe to drop.
Returns
- A `SuccessTuple` indicating success.
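For example (illustrative):
>>> success, msg = conn.drop_pipe(pipe)  # drops the table; the registration remains
>>> conn.pipe_exists(pipe)
False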
2943def clear_pipe( 2944 self, 2945 pipe: mrsm.Pipe, 2946 begin: Union[datetime, int, None] = None, 2947 end: Union[datetime, int, None] = None, 2948 params: Optional[Dict[str, Any]] = None, 2949 debug: bool = False, 2950 **kw 2951) -> SuccessTuple: 2952 """ 2953 Delete a pipe's data within a bounded or unbounded interval without dropping the table. 2954 2955 Parameters 2956 ---------- 2957 pipe: mrsm.Pipe 2958 The pipe to clear. 2959 2960 begin: Union[datetime, int, None], default None 2961 Beginning datetime. Inclusive. 2962 2963 end: Union[datetime, int, None], default None 2964 Ending datetime. Exclusive. 2965 2966 params: Optional[Dict[str, Any]], default None 2967 See `meerschaum.utils.sql.build_where`. 2968 2969 """ 2970 if not pipe.exists(debug=debug): 2971 return True, f"{pipe} does not exist, so nothing was cleared." 2972 2973 from meerschaum.utils.sql import sql_item_name, build_where, dateadd_str 2974 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 2975 pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 2976 2977 dt_col = pipe.columns.get('datetime', None) 2978 dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None 2979 dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None 2980 if not pipe.columns.get('datetime', None): 2981 dt_col = pipe.guess_datetime() 2982 dt_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None 2983 is_guess = True 2984 else: 2985 dt_col = pipe.get_columns('datetime') 2986 dt_name = sql_item_name(dt_col, self.flavor, None) 2987 is_guess = False 2988 2989 if begin is not None or end is not None: 2990 if is_guess: 2991 if dt_col is None: 2992 warn( 2993 f"No datetime could be determined for {pipe}." 2994 + "\n Ignoring datetime bounds...", 2995 stack=False, 2996 ) 2997 begin, end = None, None 2998 else: 2999 warn( 3000 f"A datetime wasn't specified for {pipe}.\n" 3001 + f" Using column \"{dt_col}\" for datetime bounds...", 3002 stack=False, 3003 ) 3004 3005 valid_params = {} 3006 if params is not None: 3007 existing_cols = pipe.get_columns_types(debug=debug) 3008 valid_params = {k: v for k, v in params.items() if k in existing_cols} 3009 clear_query = ( 3010 f"DELETE FROM {pipe_name}\nWHERE 1 = 1\n" 3011 + ('\n AND ' + build_where(valid_params, self, with_where=False) if valid_params else '') 3012 + ( 3013 ( 3014 f'\n AND {dt_name} >= ' 3015 + dateadd_str(self.flavor, 'day', 0, begin, db_type=dt_db_type) 3016 ) 3017 if begin is not None 3018 else '' 3019 ) + ( 3020 ( 3021 f'\n AND {dt_name} < ' 3022 + dateadd_str(self.flavor, 'day', 0, end, db_type=dt_db_type) 3023 ) 3024 if end is not None 3025 else '' 3026 ) 3027 ) 3028 success = self.exec(clear_query, silent=True, debug=debug) is not None 3029 msg = "Success" if success else f"Failed to clear {pipe}." 3030 return success, msg
Delete a pipe's data within a bounded or unbounded interval without dropping the table.
Parameters
- pipe (mrsm.Pipe): The pipe to clear.
- begin (Union[datetime, int, None], default None): Beginning datetime. Inclusive.
- end (Union[datetime, int, None], default None): Ending datetime. Exclusive.
- params (Optional[Dict[str, Any]], default None): See `meerschaum.utils.sql.build_where`.
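A usage sketch deleting one month of data for a hypothetical `id` column (bounds follow the begin-inclusive, end-exclusive convention above):
>>> from datetime import datetime
>>> success, msg = conn.clear_pipe(
...     pipe,
...     begin=datetime(2024, 1, 1),
...     end=datetime(2024, 2, 1),
...     params={'id': 1},
... )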
3668def deduplicate_pipe( 3669 self, 3670 pipe: mrsm.Pipe, 3671 begin: Union[datetime, int, None] = None, 3672 end: Union[datetime, int, None] = None, 3673 params: Optional[Dict[str, Any]] = None, 3674 debug: bool = False, 3675 **kwargs: Any 3676) -> SuccessTuple: 3677 """ 3678 Delete duplicate values within a pipe's table. 3679 3680 Parameters 3681 ---------- 3682 pipe: mrsm.Pipe 3683 The pipe whose table to deduplicate. 3684 3685 begin: Union[datetime, int, None], default None 3686 If provided, only deduplicate values greater than or equal to this value. 3687 3688 end: Union[datetime, int, None], default None 3689 If provided, only deduplicate values less than this value. 3690 3691 params: Optional[Dict[str, Any]], default None 3692 If provided, further limit deduplication to values which match this query dictionary. 3693 3694 debug: bool, default False 3695 Verbosity toggle. 3696 3697 Returns 3698 ------- 3699 A `SuccessTuple` indicating success. 3700 """ 3701 from meerschaum.utils.sql import ( 3702 sql_item_name, 3703 get_rename_table_queries, 3704 DROP_IF_EXISTS_FLAVORS, 3705 get_create_table_query, 3706 format_cte_subquery, 3707 get_null_replacement, 3708 ) 3709 from meerschaum.utils.misc import generate_password, flatten_list 3710 3711 pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 3712 3713 if not pipe.exists(debug=debug): 3714 return False, f"Table {pipe_table_name} does not exist." 3715 3716 dt_col = pipe.columns.get('datetime', None) 3717 cols_types = pipe.get_columns_types(debug=debug) 3718 existing_cols = pipe.get_columns_types(debug=debug) 3719 3720 get_rowcount_query = f"SELECT COUNT(*) FROM {pipe_table_name}" 3721 old_rowcount = self.value(get_rowcount_query, debug=debug) 3722 if old_rowcount is None: 3723 return False, f"Failed to get rowcount for table {pipe_table_name}." 3724 3725 ### Non-datetime indices that in fact exist. 3726 indices = [ 3727 col 3728 for key, col in pipe.columns.items() 3729 if col and col != dt_col and col in cols_types 3730 ] 3731 indices_names = [sql_item_name(index_col, self.flavor, None) for index_col in indices] 3732 existing_cols_names = [sql_item_name(col, self.flavor, None) for col in existing_cols] 3733 duplicate_row_number_name = sql_item_name('dup_row_num', self.flavor, None) 3734 previous_row_number_name = sql_item_name('prev_row_num', self.flavor, None) 3735 3736 index_list_str = ( 3737 sql_item_name(dt_col, self.flavor, None) 3738 if dt_col 3739 else '' 3740 ) 3741 index_list_str_ordered = ( 3742 ( 3743 sql_item_name(dt_col, self.flavor, None) + " DESC" 3744 ) 3745 if dt_col 3746 else '' 3747 ) 3748 if indices: 3749 index_list_str += ', ' + ', '.join(indices_names) 3750 index_list_str_ordered += ', ' + ', '.join(indices_names) 3751 if index_list_str.startswith(','): 3752 index_list_str = index_list_str.lstrip(',').lstrip() 3753 if index_list_str_ordered.startswith(','): 3754 index_list_str_ordered = index_list_str_ordered.lstrip(',').lstrip() 3755 3756 cols_list_str = ', '.join(existing_cols_names) 3757 3758 try: 3759 ### NOTE: MySQL 5 and below does not support window functions (ROW_NUMBER()). 
3760 is_old_mysql = ( 3761 self.flavor in ('mysql', 'mariadb') 3762 and 3763 int(self.db_version.split('.')[0]) < 8 3764 ) 3765 except Exception: 3766 is_old_mysql = False 3767 3768 src_query = f""" 3769 SELECT 3770 {cols_list_str}, 3771 ROW_NUMBER() OVER ( 3772 PARTITION BY 3773 {index_list_str} 3774 ORDER BY {index_list_str_ordered} 3775 ) AS {duplicate_row_number_name} 3776 FROM {pipe_table_name} 3777 """ 3778 duplicates_cte_subquery = format_cte_subquery( 3779 src_query, 3780 self.flavor, 3781 sub_name = 'src', 3782 cols_to_select = cols_list_str, 3783 ) + f""" 3784 WHERE {duplicate_row_number_name} = 1 3785 """ 3786 old_mysql_query = ( 3787 f""" 3788 SELECT 3789 {index_list_str} 3790 FROM ( 3791 SELECT 3792 {index_list_str}, 3793 IF( 3794 @{previous_row_number_name} <> {index_list_str.replace(', ', ' + ')}, 3795 @{duplicate_row_number_name} := 0, 3796 @{duplicate_row_number_name} 3797 ), 3798 @{previous_row_number_name} := {index_list_str.replace(', ', ' + ')}, 3799 @{duplicate_row_number_name} := @{duplicate_row_number_name} + 1 AS """ 3800 + f"""{duplicate_row_number_name} 3801 FROM 3802 {pipe_table_name}, 3803 ( 3804 SELECT @{duplicate_row_number_name} := 0 3805 ) AS {duplicate_row_number_name}, 3806 ( 3807 SELECT @{previous_row_number_name} := '{get_null_replacement('str', 'mysql')}' 3808 ) AS {previous_row_number_name} 3809 ORDER BY {index_list_str_ordered} 3810 ) AS t 3811 WHERE {duplicate_row_number_name} = 1 3812 """ 3813 ) 3814 if is_old_mysql: 3815 duplicates_cte_subquery = old_mysql_query 3816 3817 session_id = generate_password(3) 3818 3819 dedup_table = self.get_temporary_target(pipe.target, transact_id=session_id, label='dedup') 3820 temp_old_table = self.get_temporary_target(pipe.target, transact_id=session_id, label='old') 3821 temp_old_table_name = sql_item_name(temp_old_table, self.flavor, self.get_pipe_schema(pipe)) 3822 3823 create_temporary_table_query = get_create_table_query( 3824 duplicates_cte_subquery, 3825 dedup_table, 3826 self.flavor, 3827 ) + f""" 3828 ORDER BY {index_list_str_ordered} 3829 """ 3830 if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else "" 3831 alter_queries = flatten_list([ 3832 get_rename_table_queries( 3833 pipe.target, 3834 temp_old_table, 3835 self.flavor, 3836 schema=self.get_pipe_schema(pipe), 3837 ), 3838 get_rename_table_queries( 3839 dedup_table, 3840 pipe.target, 3841 self.flavor, 3842 schema=self.get_pipe_schema(pipe), 3843 ), 3844 f"DROP TABLE {if_exists_str} {temp_old_table_name}", 3845 ]) 3846 3847 self._log_temporary_tables_creation(temp_old_table, create=(not pipe.temporary), debug=debug) 3848 create_temporary_result = self.execute(create_temporary_table_query, debug=debug) 3849 if create_temporary_result is None: 3850 return False, f"Failed to deduplicate table {pipe_table_name}." 3851 3852 results = self.exec_queries( 3853 alter_queries, 3854 break_on_error=True, 3855 rollback=True, 3856 debug=debug, 3857 ) 3858 3859 fail_query = None 3860 for result, query in zip(results, alter_queries): 3861 if result is None: 3862 fail_query = query 3863 break 3864 success = fail_query is None 3865 3866 new_rowcount = ( 3867 self.value(get_rowcount_query, debug=debug) 3868 if success 3869 else None 3870 ) 3871 3872 msg = ( 3873 ( 3874 f"Successfully deduplicated table {pipe_table_name}" 3875 + ( 3876 f"\nfrom {old_rowcount:,} to {new_rowcount:,} rows" 3877 if old_rowcount != new_rowcount 3878 else '' 3879 ) + '.' 
3880 ) 3881 if success 3882 else f"Failed to execute query:\n{fail_query}" 3883 ) 3884 return success, msg
Delete duplicate values within a pipe's table.
Parameters
- pipe (mrsm.Pipe): The pipe whose table to deduplicate.
- begin (Union[datetime, int, None], default None): If provided, only deduplicate values greater than or equal to this value.
- end (Union[datetime, int, None], default None): If provided, only deduplicate values less than this value.
- params (Optional[Dict[str, Any]], default None): If provided, further limit deduplication to values which match this query dictionary.
- debug (bool, default False): Verbosity toggle.
Returns
- A `SuccessTuple` indicating success.
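For example (illustrative; the returned message reports the change in rowcount):
>>> success, msg = conn.deduplicate_pipe(pipe)
>>> print(msg)
Successfully deduplicated table "demo"
from 100,000 to 99,990 rows.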
3033def get_pipe_table( 3034 self, 3035 pipe: mrsm.Pipe, 3036 debug: bool = False, 3037) -> Union['sqlalchemy.Table', None]: 3038 """ 3039 Return the `sqlalchemy.Table` object for a `mrsm.Pipe`. 3040 3041 Parameters 3042 ---------- 3043 pipe: mrsm.Pipe: 3044 The pipe in question. 3045 3046 Returns 3047 ------- 3048 A `sqlalchemy.Table` object. 3049 3050 """ 3051 from meerschaum.utils.sql import get_sqlalchemy_table 3052 if not pipe.exists(debug=debug): 3053 return None 3054 3055 return get_sqlalchemy_table( 3056 pipe.target, 3057 connector=self, 3058 schema=self.get_pipe_schema(pipe), 3059 debug=debug, 3060 refresh=True, 3061 )
Return the `sqlalchemy.Table` object for a `mrsm.Pipe`.
Parameters
- pipe (mrsm.Pipe): The pipe in question.
Returns
- A `sqlalchemy.Table` object.
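For example (illustrative column names):
>>> table = conn.get_pipe_table(pipe)
>>> [col.name for col in table.columns]
['dt', 'id', 'val']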
3064def get_pipe_columns_types( 3065 self, 3066 pipe: mrsm.Pipe, 3067 debug: bool = False, 3068) -> Dict[str, str]: 3069 """ 3070 Get the pipe's columns and types. 3071 3072 Parameters 3073 ---------- 3074 pipe: mrsm.Pipe: 3075 The pipe to get the columns for. 3076 3077 Returns 3078 ------- 3079 A dictionary of columns names (`str`) and types (`str`). 3080 3081 Examples 3082 -------- 3083 >>> conn.get_pipe_columns_types(pipe) 3084 { 3085 'dt': 'TIMESTAMP WITHOUT TIMEZONE', 3086 'id': 'BIGINT', 3087 'val': 'DOUBLE PRECISION', 3088 } 3089 >>> 3090 """ 3091 from meerschaum.utils.sql import get_table_cols_types 3092 if not pipe.exists(debug=debug): 3093 return {} 3094 3095 if self.flavor not in ('oracle', 'mysql', 'mariadb', 'sqlite', 'geopackage'): 3096 return get_table_cols_types( 3097 pipe.target, 3098 self, 3099 flavor=self.flavor, 3100 schema=self.get_pipe_schema(pipe), 3101 debug=debug, 3102 ) 3103 3104 if debug: 3105 dprint(f"Fetching columns_types for {pipe} with via SQLAlchemy table.") 3106 3107 table_columns = {} 3108 try: 3109 pipe_table = self.get_pipe_table(pipe, debug=debug) 3110 if pipe_table is None: 3111 return {} 3112 3113 if debug: 3114 dprint("Found columns:") 3115 mrsm.pprint(dict(pipe_table.columns)) 3116 3117 for col in pipe_table.columns: 3118 table_columns[str(col.name)] = str(col.type) 3119 except Exception as e: 3120 traceback.print_exc() 3121 warn(e) 3122 table_columns = {} 3123 3124 return table_columns
Get the pipe's columns and types.
Parameters
- pipe (mrsm.Pipe): The pipe to get the columns for.
Returns
- A dictionary of column names (`str`) and types (`str`).
Examples
>>> conn.get_pipe_columns_types(pipe)
{
'dt': 'TIMESTAMP WITHOUT TIMEZONE',
'id': 'BIGINT',
'val': 'DOUBLE PRECISION',
}
>>>
3614def get_to_sql_dtype( 3615 self, 3616 pipe: 'mrsm.Pipe', 3617 df: 'pd.DataFrame', 3618 update_dtypes: bool = True, 3619) -> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']: 3620 """ 3621 Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`. 3622 3623 Parameters 3624 ---------- 3625 pipe: mrsm.Pipe 3626 The pipe which may contain a `dtypes` parameter. 3627 3628 df: pd.DataFrame 3629 The DataFrame to be pushed via `to_sql()`. 3630 3631 update_dtypes: bool, default True 3632 If `True`, patch the pipe's dtypes onto the DataFrame's dtypes. 3633 3634 Returns 3635 ------- 3636 A dictionary with `sqlalchemy` datatypes. 3637 3638 Examples 3639 -------- 3640 >>> import pandas as pd 3641 >>> import meerschaum as mrsm 3642 >>> 3643 >>> conn = mrsm.get_connector('sql:memory') 3644 >>> df = pd.DataFrame([{'a': {'b': 1}}]) 3645 >>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'}) 3646 >>> get_to_sql_dtype(pipe, df) 3647 {'a': <class 'sqlalchemy.sql.sqltypes.JSON'>} 3648 """ 3649 from meerschaum.utils.dataframe import get_special_cols 3650 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 3651 df_dtypes = { 3652 col: str(typ) 3653 for col, typ in df.dtypes.items() 3654 } 3655 special_cols = get_special_cols(df) 3656 df_dtypes.update(special_cols) 3657 3658 if update_dtypes: 3659 df_dtypes.update(pipe.dtypes) 3660 3661 return { 3662 col: get_db_type_from_pd_type(typ, self.flavor, as_sqlalchemy=True) 3663 for col, typ in df_dtypes.items() 3664 if col and typ 3665 }
Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`.
Parameters
- pipe (mrsm.Pipe): The pipe which may contain a `dtypes` parameter.
- df (pd.DataFrame): The DataFrame to be pushed via `to_sql()`.
- update_dtypes (bool, default True): If `True`, patch the pipe's dtypes onto the DataFrame's dtypes.
Returns
- A dictionary with `sqlalchemy` datatypes.
Examples
>>> import pandas as pd
>>> import meerschaum as mrsm
>>>
>>> conn = mrsm.get_connector('sql:memory')
>>> df = pd.DataFrame([{'a': {'b': 1}}])
>>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
>>> conn.get_to_sql_dtype(pipe, df)
{'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
3887def get_pipe_schema(self, pipe: mrsm.Pipe) -> Union[str, None]: 3888 """ 3889 Return the schema to use for this pipe. 3890 First check `pipe.parameters['schema']`, then check `self.schema`. 3891 3892 Parameters 3893 ---------- 3894 pipe: mrsm.Pipe 3895 The pipe which may contain a configured schema. 3896 3897 Returns 3898 ------- 3899 A schema string or `None` if nothing is configured. 3900 """ 3901 if self.flavor in ('sqlite', 'geopackage'): 3902 return self.schema 3903 return pipe.parameters.get('schema', self.schema)
Return the schema to use for this pipe.
First check `pipe.parameters['schema']`, then check `self.schema`.
Parameters
- pipe (mrsm.Pipe): The pipe which may contain a configured schema.
Returns
- A schema string or `None` if nothing is configured.
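The precedence can be seen directly (a sketch assuming a PostgreSQL connector whose default schema is 'public'):
>>> pipe_with_schema = mrsm.Pipe('sql:main', 'demo', parameters={'schema': 'analytics'})
>>> conn.get_pipe_schema(pipe_with_schema)
'analytics'
>>> conn.get_pipe_schema(pipe)  # no schema in pipe.parameters, so fall back to self.schema
'public'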
1551def create_pipe_table_from_df( 1552 self, 1553 pipe: mrsm.Pipe, 1554 df: 'pd.DataFrame', 1555 debug: bool = False, 1556) -> mrsm.SuccessTuple: 1557 """ 1558 Create a pipe's table from its configured dtypes and an incoming dataframe. 1559 """ 1560 from meerschaum.utils.dataframe import get_special_cols 1561 from meerschaum.utils.sql import ( 1562 get_create_table_queries, 1563 sql_item_name, 1564 get_create_schema_if_not_exists_queries, 1565 ) 1566 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 1567 if self.flavor == 'geopackage': 1568 init_success, init_msg = self._init_geopackage_table(df, pipe.target, debug=debug) 1569 if not init_success: 1570 return init_success, init_msg 1571 1572 primary_key = pipe.columns.get('primary', None) 1573 primary_key_typ = ( 1574 pipe.dtypes.get(primary_key, str(df.dtypes.get(primary_key, 'int'))) 1575 if primary_key 1576 else None 1577 ) 1578 primary_key_db_type = ( 1579 get_db_type_from_pd_type(primary_key_typ, self.flavor) 1580 if primary_key 1581 else None 1582 ) 1583 dt_col = pipe.columns.get('datetime', None) 1584 new_dtypes = { 1585 **{ 1586 col: str(typ) 1587 for col, typ in df.dtypes.items() 1588 }, 1589 **{ 1590 col: str(df.dtypes.get(col, 'int')) 1591 for col_ix, col in pipe.columns.items() 1592 if col and col_ix != 'primary' 1593 }, 1594 **get_special_cols(df), 1595 **pipe.dtypes 1596 } 1597 autoincrement = ( 1598 pipe.parameters.get('autoincrement', False) 1599 or (primary_key and primary_key not in new_dtypes) 1600 ) 1601 if autoincrement: 1602 _ = new_dtypes.pop(primary_key, None) 1603 1604 schema = self.get_pipe_schema(pipe) 1605 create_table_queries = get_create_table_queries( 1606 new_dtypes, 1607 pipe.target, 1608 self.flavor, 1609 schema=schema, 1610 primary_key=primary_key, 1611 primary_key_db_type=primary_key_db_type, 1612 datetime_column=dt_col, 1613 ) 1614 if schema: 1615 create_table_queries = ( 1616 get_create_schema_if_not_exists_queries(schema, self.flavor) 1617 + create_table_queries 1618 ) 1619 success = all( 1620 self.exec_queries(create_table_queries, break_on_error=True, rollback=True, debug=debug) 1621 ) 1622 target_name = sql_item_name(pipe.target, schema=self.get_pipe_schema(pipe), flavor=self.flavor) 1623 msg = ( 1624 "Success" 1625 if success 1626 else f"Failed to create {target_name}." 1627 ) 1628 if success and self.flavor == 'geopackage': 1629 return self._init_geopackage_table(df, pipe.target, debug=debug) 1630 1631 return success, msg
Create a pipe's table from its configured dtypes and an incoming dataframe.
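A usage sketch (hypothetical dataframe; the table's columns and types are derived from the pipe's dtypes patched over the dataframe's):
>>> import pandas as pd
>>> df = pd.DataFrame([{'dt': '2024-01-01', 'id': 1, 'val': 1.5}])
>>> success, msg = conn.create_pipe_table_from_df(pipe, df)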
3127def get_pipe_columns_indices( 3128 self, 3129 pipe: mrsm.Pipe, 3130 debug: bool = False, 3131) -> Dict[str, List[Dict[str, str]]]: 3132 """ 3133 Return a dictionary mapping columns to the indices created on those columns. 3134 3135 Parameters 3136 ---------- 3137 pipe: mrsm.Pipe 3138 The pipe to be queried against. 3139 3140 Returns 3141 ------- 3142 A dictionary mapping columns names to lists of dictionaries. 3143 The dictionaries in the lists contain the name and type of the indices. 3144 """ 3145 if pipe.__dict__.get('_skip_check_indices', False): 3146 return {} 3147 3148 from meerschaum.utils.sql import get_table_cols_indices 3149 return get_table_cols_indices( 3150 pipe.target, 3151 self, 3152 flavor=self.flavor, 3153 schema=self.get_pipe_schema(pipe), 3154 debug=debug, 3155 )
Return a dictionary mapping columns to the indices created on those columns.
Parameters
- pipe (mrsm.Pipe): The pipe to be queried against.
Returns
- A dictionary mapping column names to lists of dictionaries. The dictionaries in the lists contain the name and type of the indices.
3906@staticmethod 3907def get_temporary_target( 3908 target: str, 3909 transact_id: Optional[str] = None, 3910 label: Optional[str] = None, 3911 separator: Optional[str] = None, 3912) -> str: 3913 """ 3914 Return a unique(ish) temporary target for a pipe. 3915 """ 3916 from meerschaum.utils.misc import generate_password 3917 temp_target_cf = ( 3918 mrsm.get_config('system', 'connectors', 'sql', 'instance', 'temporary_target') or {} 3919 ) 3920 transaction_id_len = temp_target_cf.get('transaction_id_length', 3) 3921 transact_id = transact_id or generate_password(transaction_id_len) 3922 temp_prefix = temp_target_cf.get('prefix', '_') 3923 separator = separator or temp_target_cf.get('separator', '_') 3924 return ( 3925 temp_prefix 3926 + target 3927 + separator 3928 + transact_id 3929 + ((separator + label) if label else '') 3930 )
Return a unique(ish) temporary target for a pipe.
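Under the default configuration (prefix `'_'`, separator `'_'`, three-character transaction IDs), the result looks like the following (the transaction ID is random, so the output shown is illustrative):
>>> conn.get_temporary_target('weather', label='dedup')
'_weather_abc_dedup'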
351def create_pipe_indices( 352 self, 353 pipe: mrsm.Pipe, 354 columns: Optional[List[str]] = None, 355 debug: bool = False, 356) -> SuccessTuple: 357 """ 358 Create a pipe's indices. 359 """ 360 success = self.create_indices(pipe, columns=columns, debug=debug) 361 msg = ( 362 "Success" 363 if success 364 else f"Failed to create indices for {pipe}." 365 ) 366 return success, msg
Create a pipe's indices.
407def drop_pipe_indices( 408 self, 409 pipe: mrsm.Pipe, 410 columns: Optional[List[str]] = None, 411 debug: bool = False, 412) -> SuccessTuple: 413 """ 414 Drop a pipe's indices. 415 """ 416 success = self.drop_indices(pipe, columns=columns, debug=debug) 417 msg = ( 418 "Success" 419 if success 420 else f"Failed to drop indices for {pipe}." 421 ) 422 return success, msg
Drop a pipe's indices.
459def get_pipe_index_names(self, pipe: mrsm.Pipe) -> Dict[str, str]: 460 """ 461 Return a dictionary mapping index keys to their names on the database. 462 463 Returns 464 ------- 465 A dictionary of index keys to column names. 466 """ 467 from meerschaum.utils.sql import DEFAULT_SCHEMA_FLAVORS, truncate_item_name 468 _parameters = pipe.parameters 469 _index_template = _parameters.get('index_template', "IX_{schema_str}{target}_{column_names}") 470 _schema = self.get_pipe_schema(pipe) 471 if _schema is None: 472 _schema = ( 473 DEFAULT_SCHEMA_FLAVORS.get(self.flavor, None) 474 if self.flavor != 'mssql' 475 else None 476 ) 477 schema_str = '' if _schema is None else f'{_schema}_' 478 schema_str = '' 479 _indices = pipe.indices 480 _target = pipe.target 481 _column_names = { 482 ix: ( 483 '_'.join(cols) 484 if isinstance(cols, (list, tuple)) 485 else str(cols) 486 ) 487 for ix, cols in _indices.items() 488 if cols 489 } 490 _index_names = { 491 ix: _index_template.format( 492 target=_target, 493 column_names=column_names, 494 connector_keys=pipe.connector_keys, 495 metric_key=pipe.metric_key, 496 location_key=pipe.location_key, 497 schema_str=schema_str, 498 ) 499 for ix, column_names in _column_names.items() 500 } 501 ### NOTE: Skip any duplicate indices. 502 seen_index_names = {} 503 for ix, index_name in _index_names.items(): 504 if index_name in seen_index_names: 505 continue 506 seen_index_names[index_name] = ix 507 return { 508 ix: truncate_item_name(index_name, flavor=self.flavor) 509 for index_name, ix in seen_index_names.items() 510 }
Return a dictionary mapping index keys to their names on the database.
Returns
- A dictionary of index keys to index names on the database.
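For example (illustrative, assuming the default index template and a pipe targeting a table named 'demo'):
>>> demo_pipe = mrsm.Pipe('sql:main', 'demo', target='demo', columns={'datetime': 'dt', 'id': 'id'})
>>> conn.get_pipe_index_names(demo_pipe)
{'datetime': 'IX_demo_dt', 'id': 'IX_demo_id'}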
18def get_plugins_pipe(self) -> mrsm.Pipe: 19 """ 20 Return the internal metadata plugins pipe. 21 """ 22 users_pipe = self.get_users_pipe() 23 user_id_dtype = users_pipe.dtypes.get('user_id', 'int') 24 return mrsm.Pipe( 25 'mrsm', 'plugins', 26 instance=self, 27 temporary=True, 28 static=True, 29 null_indices=False, 30 columns={ 31 'primary': 'plugin_id', 32 'user_id': 'user_id', 33 }, 34 dtypes={ 35 'plugin_name': 'string', 36 'user_id': user_id_dtype, 37 'attributes': 'json', 38 'version': 'string', 39 }, 40 indices={ 41 'unique': 'plugin_name', 42 }, 43 )
Return the internal metadata plugins pipe.
46def register_plugin( 47 self, 48 plugin: 'mrsm.core.Plugin', 49 force: bool = False, 50 debug: bool = False, 51 **kw: Any 52) -> SuccessTuple: 53 """Register a new plugin to the plugins table.""" 54 from meerschaum.utils.packages import attempt_import 55 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 56 from meerschaum.utils.sql import json_flavors 57 from meerschaum.connectors.sql.tables import get_tables 58 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 59 60 old_id = self.get_plugin_id(plugin, debug=debug) 61 62 ### Check for version conflict. May be overridden with `--force`. 63 if old_id is not None and not force: 64 old_version = self.get_plugin_version(plugin, debug=debug) 65 new_version = plugin.version 66 if old_version is None: 67 old_version = '' 68 if new_version is None: 69 new_version = '' 70 71 ### verify that the new version is greater than the old 72 packaging_version = attempt_import('packaging.version') 73 if ( 74 old_version and new_version 75 and packaging_version.parse(old_version) >= packaging_version.parse(new_version) 76 ): 77 return False, ( 78 f"Version '{new_version}' of plugin '{plugin}' " + 79 f"must be greater than existing version '{old_version}'." 80 ) 81 82 bind_variables = { 83 'plugin_name': plugin.name, 84 'version': plugin.version, 85 'attributes': ( 86 json.dumps(plugin.attributes) if self.flavor not in json_flavors else plugin.attributes 87 ), 88 'user_id': plugin.user_id, 89 } 90 91 if old_id is None: 92 query = sqlalchemy.insert(plugins_tbl).values(**bind_variables) 93 else: 94 query = ( 95 sqlalchemy.update(plugins_tbl) 96 .values(**bind_variables) 97 .where(plugins_tbl.c.plugin_id == old_id) 98 ) 99 100 result = self.exec(query, debug=debug) 101 if result is None: 102 return False, f"Failed to register plugin '{plugin}'." 103 return True, f"Successfully registered plugin '{plugin}'."
Register a new plugin to the plugins table.
272def delete_plugin( 273 self, 274 plugin: 'mrsm.core.Plugin', 275 debug: bool = False, 276 **kw: Any 277) -> SuccessTuple: 278 """Delete a plugin from the plugins table.""" 279 from meerschaum.utils.packages import attempt_import 280 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 281 from meerschaum.connectors.sql.tables import get_tables 282 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 283 284 plugin_id = self.get_plugin_id(plugin, debug=debug) 285 if plugin_id is None: 286 return True, f"Plugin '{plugin}' was not registered." 287 288 query = sqlalchemy.delete(plugins_tbl).where(plugins_tbl.c.plugin_id == plugin_id) 289 result = self.exec(query, debug=debug) 290 if result is None: 291 return False, f"Failed to delete plugin '{plugin}'." 292 return True, f"Successfully deleted plugin '{plugin}'."
Delete a plugin from the plugins table.
105def get_plugin_id( 106 self, 107 plugin: 'mrsm.core.Plugin', 108 debug: bool = False 109) -> Optional[int]: 110 """ 111 Return a plugin's ID. 112 """ 113 ### ensure plugins table exists 114 from meerschaum.connectors.sql.tables import get_tables 115 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 116 from meerschaum.utils.packages import attempt_import 117 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 118 119 query = ( 120 sqlalchemy 121 .select(plugins_tbl.c.plugin_id) 122 .where(plugins_tbl.c.plugin_name == plugin.name) 123 ) 124 125 try: 126 return int(self.value(query, debug=debug)) 127 except Exception: 128 return None
Return a plugin's ID.
131def get_plugin_version( 132 self, 133 plugin: 'mrsm.core.Plugin', 134 debug: bool = False 135) -> Optional[str]: 136 """ 137 Return a plugin's version. 138 """ 139 ### ensure plugins table exists 140 from meerschaum.connectors.sql.tables import get_tables 141 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 142 from meerschaum.utils.packages import attempt_import 143 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 144 query = sqlalchemy.select(plugins_tbl.c.version).where(plugins_tbl.c.plugin_name == plugin.name) 145 return self.value(query, debug=debug)
Return a plugin's version.
225def get_plugins( 226 self, 227 user_id: Optional[int] = None, 228 search_term: Optional[str] = None, 229 debug: bool = False, 230 **kw: Any 231) -> List[str]: 232 """ 233 Return a list of all registered plugins. 234 235 Parameters 236 ---------- 237 user_id: Optional[int], default None 238 If specified, filter plugins by a specific `user_id`. 239 240 search_term: Optional[str], default None 241 If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins. 242 243 244 Returns 245 ------- 246 A list of plugin names. 247 """ 248 ### ensure plugins table exists 249 from meerschaum.connectors.sql.tables import get_tables 250 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 251 from meerschaum.utils.packages import attempt_import 252 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 253 254 query = sqlalchemy.select(plugins_tbl.c.plugin_name) 255 if user_id is not None: 256 query = query.where(plugins_tbl.c.user_id == user_id) 257 if search_term is not None: 258 query = query.where(plugins_tbl.c.plugin_name.like(search_term + '%')) 259 260 rows = ( 261 self.execute(query).fetchall() 262 if self.flavor != 'duckdb' 263 else [ 264 (row['plugin_name'],) 265 for row in self.read(query).to_dict(orient='records') 266 ] 267 ) 268 269 return [row[0] for row in rows]
Return a list of all registered plugins.
Parameters
- user_id (Optional[int], default None): If specified, filter plugins by a specific `user_id`.
- search_term (Optional[str], default None): If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins.
Returns
- A list of plugin names.
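For example (hypothetical plugin names):
>>> conn.get_plugins(search_term='noaa')  # names beginning with 'noaa'
['noaa', 'noaa_extras']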
147def get_plugin_user_id( 148 self, 149 plugin: 'mrsm.core.Plugin', 150 debug: bool = False 151) -> Optional[int]: 152 """ 153 Return a plugin's user ID. 154 """ 155 ### ensure plugins table exists 156 from meerschaum.connectors.sql.tables import get_tables 157 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 158 from meerschaum.utils.packages import attempt_import 159 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 160 161 query = ( 162 sqlalchemy 163 .select(plugins_tbl.c.user_id) 164 .where(plugins_tbl.c.plugin_name == plugin.name) 165 ) 166 167 try: 168 return int(self.value(query, debug=debug)) 169 except Exception: 170 return None
Return a plugin's user ID.
172def get_plugin_username( 173 self, 174 plugin: 'mrsm.core.Plugin', 175 debug: bool = False 176) -> Optional[str]: 177 """ 178 Return the username of a plugin's owner. 179 """ 180 ### ensure plugins table exists 181 from meerschaum.connectors.sql.tables import get_tables 182 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 183 users = get_tables(mrsm_instance=self, debug=debug)['users'] 184 from meerschaum.utils.packages import attempt_import 185 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 186 187 query = ( 188 sqlalchemy.select(users.c.username) 189 .where( 190 users.c.user_id == plugins_tbl.c.user_id 191 and plugins_tbl.c.plugin_name == plugin.name 192 ) 193 ) 194 195 return self.value(query, debug=debug)
Return the username of a plugin's owner.
198def get_plugin_attributes( 199 self, 200 plugin: 'mrsm.core.Plugin', 201 debug: bool = False 202) -> Dict[str, Any]: 203 """ 204 Return the attributes of a plugin. 205 """ 206 ### ensure plugins table exists 207 from meerschaum.connectors.sql.tables import get_tables 208 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 209 from meerschaum.utils.packages import attempt_import 210 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 211 212 query = ( 213 sqlalchemy 214 .select(plugins_tbl.c.attributes) 215 .where(plugins_tbl.c.plugin_name == plugin.name) 216 ) 217 218 _attr = self.value(query, debug=debug) 219 if isinstance(_attr, str): 220 _attr = json.loads(_attr) 221 elif _attr is None: 222 _attr = {} 223 return _attr
Return the attributes of a plugin.
16def get_users_pipe(self) -> mrsm.Pipe: 17 """ 18 Return the internal metadata pipe for users management. 19 """ 20 if '_users_pipe' in self.__dict__: 21 return self._users_pipe 22 23 cache_connector = self.__dict__.get('_cache_connector', None) 24 self._users_pipe = mrsm.Pipe( 25 'mrsm', 'users', 26 temporary=True, 27 cache=True, 28 cache_connector_keys=cache_connector, 29 static=True, 30 null_indices=False, 31 enforce=False, 32 autoincrement=True, 33 columns={ 34 'primary': 'user_id', 35 }, 36 dtypes={ 37 'user_id': 'int', 38 'username': 'string', 39 'attributes': 'json', 40 'user_type': 'string', 41 }, 42 indices={ 43 'unique': 'username', 44 }, 45 ) 46 return self._users_pipe
Return the internal metadata pipe for users management.
49def register_user( 50 self, 51 user: mrsm.core.User, 52 debug: bool = False, 53 **kw: Any 54) -> SuccessTuple: 55 """Register a new user.""" 56 from meerschaum.utils.packages import attempt_import 57 from meerschaum.utils.sql import json_flavors 58 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 59 60 valid_tuple = valid_username(user.username) 61 if not valid_tuple[0]: 62 return valid_tuple 63 64 old_id = self.get_user_id(user, debug=debug) 65 66 if old_id is not None: 67 return False, f"User '{user}' already exists." 68 69 ### ensure users table exists 70 from meerschaum.connectors.sql.tables import get_tables 71 tables = get_tables(mrsm_instance=self, debug=debug) 72 73 import json 74 bind_variables = { 75 'username': user.username, 76 'email': user.email, 77 'password_hash': user.password_hash, 78 'user_type': user.type, 79 'attributes': ( 80 json.dumps(user.attributes) 81 if self.flavor not in json_flavors 82 else user.attributes 83 ), 84 } 85 if old_id is not None: 86 return False, f"User '{user.username}' already exists." 87 if old_id is None: 88 query = ( 89 sqlalchemy.insert(tables['users']). 90 values(**bind_variables) 91 ) 92 93 result = self.exec(query, debug=debug) 94 if result is None: 95 return False, f"Failed to register user '{user}'." 96 return True, f"Successfully registered user '{user}'."
Register a new user.
188def get_user_id( 189 self, 190 user: 'mrsm.core.User', 191 debug: bool = False 192) -> Optional[int]: 193 """If a user is registered, return the `user_id`.""" 194 ### ensure users table exists 195 from meerschaum.utils.packages import attempt_import 196 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 197 from meerschaum.connectors.sql.tables import get_tables 198 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 199 200 query = ( 201 sqlalchemy.select(users_tbl.c.user_id) 202 .where(users_tbl.c.username == user.username) 203 ) 204 205 result = self.value(query, debug=debug) 206 if result is not None: 207 return int(result) 208 return None
If a user is registered, return the `user_id`.
282def get_users( 283 self, 284 debug: bool = False, 285 **kw: Any 286) -> List[str]: 287 """ 288 Get the registered usernames. 289 """ 290 ### ensure users table exists 291 from meerschaum.connectors.sql.tables import get_tables 292 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 293 from meerschaum.utils.packages import attempt_import 294 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 295 296 query = sqlalchemy.select(users_tbl.c.username) 297 298 return list(self.read(query, debug=debug)['username'])
Get the registered usernames.
133def edit_user( 134 self, 135 user: 'mrsm.core.User', 136 debug: bool = False, 137 **kw: Any 138) -> SuccessTuple: 139 """Update an existing user's metadata.""" 140 from meerschaum.utils.packages import attempt_import 141 from meerschaum.utils.sql import json_flavors 142 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 143 from meerschaum.connectors.sql.tables import get_tables 144 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 145 146 user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug) 147 if user_id is None: 148 return False, ( 149 f"User '{user.username}' does not exist. " 150 f"Register user '{user.username}' before editing." 151 ) 152 user.user_id = user_id 153 154 import json 155 valid_tuple = valid_username(user.username) 156 if not valid_tuple[0]: 157 return valid_tuple 158 159 bind_variables = { 160 'user_id' : user_id, 161 'username' : user.username, 162 } 163 if user.password != '': 164 bind_variables['password_hash'] = user.password_hash 165 if user.email != '': 166 bind_variables['email'] = user.email 167 if user.attributes is not None and user.attributes != {}: 168 bind_variables['attributes'] = ( 169 json.dumps(user.attributes) if self.flavor not in json_flavors 170 else user.attributes 171 ) 172 if user.type != '': 173 bind_variables['user_type'] = user.type 174 175 query = ( 176 sqlalchemy 177 .update(users_tbl) 178 .values(**bind_variables) 179 .where(users_tbl.c.user_id == user_id) 180 ) 181 182 result = self.exec(query, debug=debug) 183 if result is None: 184 return False, f"Failed to edit user '{user}'." 185 return True, f"Successfully edited user '{user}'."
Update an existing user's metadata.
250def delete_user( 251 self, 252 user: 'mrsm.core.User', 253 debug: bool = False 254) -> SuccessTuple: 255 """Delete a user's record from the users table.""" 256 ### ensure users table exists 257 from meerschaum.connectors.sql.tables import get_tables 258 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 259 plugins = get_tables(mrsm_instance=self, debug=debug)['plugins'] 260 from meerschaum.utils.packages import attempt_import 261 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 262 263 user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug) 264 265 if user_id is None: 266 return False, f"User '{user.username}' is not registered and cannot be deleted." 267 268 query = sqlalchemy.delete(users_tbl).where(users_tbl.c.user_id == user_id) 269 270 result = self.exec(query, debug=debug) 271 if result is None: 272 return False, f"Failed to delete user '{user}'." 273 274 query = sqlalchemy.delete(plugins).where(plugins.c.user_id == user_id) 275 result = self.exec(query, debug=debug) 276 if result is None: 277 return False, f"Failed to delete plugins of user '{user}'." 278 279 return True, f"Successfully deleted user '{user}'"
Delete a user's record from the users table.
301def get_user_password_hash( 302 self, 303 user: 'mrsm.core.User', 304 debug: bool = False, 305 **kw: Any 306) -> Optional[str]: 307 """ 308 Return the password hash for a user. 309 **NOTE**: This may be dangerous and is only allowed if the security settings explicitly allow it. 310 """ 311 from meerschaum.utils.debug import dprint 312 from meerschaum.connectors.sql.tables import get_tables 313 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 314 from meerschaum.utils.packages import attempt_import 315 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 316 317 if user.user_id is not None: 318 user_id = user.user_id 319 if debug: 320 dprint(f"Already given user_id: {user_id}") 321 else: 322 if debug: 323 dprint("Fetching user_id...") 324 user_id = self.get_user_id(user, debug=debug) 325 326 if user_id is None: 327 return None 328 329 query = sqlalchemy.select(users_tbl.c.password_hash).where(users_tbl.c.user_id == user_id) 330 331 return self.value(query, debug=debug)
Return the password hash for a user. NOTE: This may be dangerous and is only allowed if the security settings explicitly allow it.
334def get_user_type( 335 self, 336 user: 'mrsm.core.User', 337 debug: bool = False, 338 **kw: Any 339) -> Optional[str]: 340 """ 341 Return the user's type. 342 """ 343 from meerschaum.connectors.sql.tables import get_tables 344 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 345 from meerschaum.utils.packages import attempt_import 346 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 347 348 user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug) 349 350 if user_id is None: 351 return None 352 353 query = sqlalchemy.select(users_tbl.c.user_type).where(users_tbl.c.user_id == user_id) 354 355 return self.value(query, debug=debug)
Return the user's type.
210def get_user_attributes( 211 self, 212 user: 'mrsm.core.User', 213 debug: bool = False 214) -> Union[Dict[str, Any], None]: 215 """ 216 Return the user's attributes. 217 """ 218 ### ensure users table exists 219 from meerschaum.utils.warnings import warn 220 from meerschaum.utils.packages import attempt_import 221 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 222 from meerschaum.connectors.sql.tables import get_tables 223 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 224 225 user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug) 226 227 query = ( 228 sqlalchemy.select(users_tbl.c.attributes) 229 .where(users_tbl.c.user_id == user_id) 230 ) 231 232 result = self.value(query, debug=debug) 233 if result is not None and not isinstance(result, dict): 234 try: 235 result = dict(result) 236 _parsed = True 237 except Exception: 238 _parsed = False 239 if not _parsed: 240 try: 241 import json 242 result = json.loads(result) 243 _parsed = True 244 except Exception: 245 _parsed = False 246 if not _parsed: 247 warn(f"Received unexpected type for attributes: {result}") 248 return result
Return the user's attributes.
15@classmethod 16def from_uri( 17 cls, 18 uri: str, 19 label: Optional[str] = None, 20 as_dict: bool = False, 21) -> Union[ 22 'meerschaum.connectors.SQLConnector', 23 Dict[str, Union[str, int]], 24]: 25 """ 26 Create a new SQLConnector from a URI string. 27 28 Parameters 29 ---------- 30 uri: str 31 The URI connection string. 32 33 label: Optional[str], default None 34 If provided, use this as the connector label. 35 Otherwise use the determined database name. 36 37 as_dict: bool, default False 38 If `True`, return a dictionary of the keyword arguments 39 necessary to create a new `SQLConnector`, otherwise create a new object. 40 41 Returns 42 ------- 43 A new SQLConnector object or a dictionary of attributes (if `as_dict` is `True`). 44 """ 45 46 params = cls.parse_uri(uri) 47 params['uri'] = uri 48 flavor = params.get('flavor', None) 49 if not flavor or flavor not in cls.flavor_configs: 50 error(f"Invalid flavor '{flavor}' detected from the provided URI.") 51 52 if 'database' not in params: 53 error("Unable to determine the database from the provided URI.") 54 55 if flavor in ('sqlite', 'duckdb', 'geopackage'): 56 if params['database'] == ':memory:': 57 params['label'] = label or f'memory_{flavor}' 58 else: 59 params['label'] = label or params['database'].split(os.path.sep)[-1].lower() 60 else: 61 params['label'] = label or ( 62 ( 63 (params['username'] + '@' if 'username' in params else '') 64 + params.get('host', '') 65 + ('/' if 'host' in params else '') 66 + params.get('database', '') 67 ).lower() 68 ) 69 70 return cls(**params) if not as_dict else params
Create a new SQLConnector from a URI string.
Parameters
- uri (str): The URI connection string.
- label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the determined database name.
- as_dict (bool, default False): If `True`, return a dictionary of the keyword arguments necessary to create a new `SQLConnector`, otherwise create a new object.
Returns
- A new SQLConnector object or a dictionary of attributes (if `as_dict` is `True`).
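For example, the label is derived from the URI when not provided (the credentials below are placeholders):
>>> from meerschaum.connectors import SQLConnector
>>> conn = SQLConnector.from_uri('postgresql://user:pass@localhost:5432/db')
>>> conn.flavor, conn.label
('postgresql', 'user@localhost/db')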
73@staticmethod 74def parse_uri(uri: str) -> Dict[str, Any]: 75 """ 76 Parse a URI string into a dictionary of parameters. 77 78 Parameters 79 ---------- 80 uri: str 81 The database connection URI. 82 83 Returns 84 ------- 85 A dictionary of attributes. 86 87 Examples 88 -------- 89 >>> parse_uri('sqlite:////home/foo/bar.db') 90 {'database': '/home/foo/bar.db', 'flavor': 'sqlite'} 91 >>> parse_uri( 92 ... 'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439' 93 ... + '/master?driver=ODBC+Driver+17+for+SQL+Server' 94 ... ) 95 {'host': 'localhost', 'database': 'master', 'username': 'sa', 96 'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql', 97 'driver': 'ODBC Driver 17 for SQL Server'} 98 >>> 99 """ 100 from urllib.parse import parse_qs, urlparse 101 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 102 parser = sqlalchemy.engine.url.make_url 103 params = parser(uri).translate_connect_args() 104 params['flavor'] = uri.split(':')[0].split('+')[0] 105 if params['flavor'] == 'postgres': 106 params['flavor'] = 'postgresql' 107 if '?' in uri: 108 parsed_uri = urlparse(uri) 109 for key, value in parse_qs(parsed_uri.query).items(): 110 params.update({key: value[0]}) 111 112 if '--search_path' in params.get('options', ''): 113 params.update({'schema': params['options'].replace('--search_path=', '', 1)}) 114 return params
Parse a URI string into a dictionary of parameters.
Parameters
- uri (str): The database connection URI.
Returns
- A dictionary of attributes.
Examples
>>> parse_uri('sqlite:////home/foo/bar.db')
{'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
>>> parse_uri(
... 'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
... + '/master?driver=ODBC+Driver+17+for+SQL+Server'
... )
{'host': 'localhost', 'database': 'master', 'username': 'sa',
'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
'driver': 'ODBC Driver 17 for SQL Server'}
>>>