meerschaum.connectors.sql
Subpackage for the `SQLConnector` subclass.
class SQLConnector(InstanceConnector):
    """
    Connect to SQL databases via `sqlalchemy`.

    SQLConnectors may be used as Meerschaum instance connectors.
    Read more about connectors and instances at
    https://meerschaum.io/reference/connectors/

    """

    from ._create_engine import flavor_configs, create_engine
    from ._sql import (
        read,
        value,
        exec,
        execute,
        to_sql,
        exec_queries,
        get_connection,
        _cleanup_connections,
    )
    from meerschaum.utils.sql import test_connection
    from ._fetch import fetch, get_pipe_metadef
    from ._cli import cli, _cli_exit
    from ._compress import (
        get_pipe_size,
        compress_pipe,
        decompress_pipe,
        apply_compression_policy,
        _get_compress_settings,
        _is_hypertable,
        _get_columnstore_settings_query,
        _get_columnstore_policy_query,
        _get_columnstore_remove_policy_query,
        _get_columnstore_disable_query,
    )
    from ._maintenance import (
        vacuum_pipe,
        analyze_pipe,
        _run_in_autocommit,
        _get_vacuum_queries,
        _get_analyze_query,
    )
    from ._partition import (
        _should_partition,
        _get_partition_column,
        _get_partition_count,
        _get_chunk_count_timescaledb,
        get_partition_info,
        partition_pipe,
        _partition_bounds,
        _partition_literal,
        _partition_name,
        _get_partition_ranges_for_df,
        _get_initial_partition_bounds,
        _create_missing_partitions,
        _create_missing_partitions_pg,
        _create_missing_partitions_mysql,
        _get_mysql_max_partition_bound,
        _partition_function_name,
        _partition_scheme_name,
        _get_partition_boundary_values,
        _get_mssql_partition_creation_queries,
        _get_mssql_max_partition_boundary,
        _create_missing_partitions_mssql,
        _get_partition_cleanup_queries,
    )
    from ._pipes import (
        fetch_pipes_keys,
        create_indices,
        drop_indices,
        get_create_index_queries,
        get_drop_index_queries,
        get_add_columns_queries,
        get_alter_columns_queries,
        delete_pipe,
        get_pipe_data,
        get_pipe_docs,
        get_pipe_data_query,
        register_pipe,
        edit_pipe,
        get_pipe_id,
        get_pipe_attributes,
        sync_pipe,
        sync_pipe_inplace,
        get_sync_time,
        pipe_exists,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        deduplicate_pipe,
        get_pipe_table,
        get_pipe_columns_types,
        get_to_sql_dtype,
        get_pipe_schema,
        create_pipe_table_from_df,
        get_pipe_columns_indices,
        get_temporary_target,
        create_pipe_indices,
        drop_pipe_indices,
        get_pipe_index_names,
        _init_geopackage_pipe,
    )
    from ._plugins import (
        get_plugins_pipe,
        register_plugin,
        delete_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugins,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
    )
    from ._users import (
        get_users_pipe,
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri, parse_uri
    from ._instance import (
        _log_temporary_tables_creation,
        _drop_temporary_table,
        _drop_temporary_tables,
        _drop_old_temporary_tables,
    )

    def __init__(
        self,
        label: Optional[str] = None,
        flavor: Optional[str] = None,
        wait: bool = False,
        connect: bool = False,
        debug: bool = False,
        **kw: Any
    ):
        """
        Parameters
        ----------
        label: str, default 'main'
            The identifying label for the connector.
            E.g. for `sql:main`, 'main' is the label.
            Defaults to 'main'.

        flavor: Optional[str], default None
            The database flavor, e.g.
            `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
            To see supported flavors, run the `bootstrap connectors` command.

        wait: bool, default False
            If `True`, block until a database connection has been made.
            Defaults to `False`.

        connect: bool, default False
            If `True`, immediately attempt to connect to the database and raise
            a warning if the connection fails.
            Defaults to `False`.

        debug: bool, default False
            Verbosity toggle.
            Defaults to `False`.

        kw: Any
            All other arguments will be passed to the connector's attributes.
            Therefore, a connector may be made without being registered,
            as long as enough parameters are supplied to the constructor.

        Raises
        ------
        ValueError
            If no flavor is provided and none can be derived from a URI.
        """
        if 'uri' in kw:
            uri = kw['uri']
            ### Normalize PostgreSQL-family schemes to the `postgresql+psycopg` driver.
            if uri.startswith('postgres') and not uri.startswith('postgresql'):
                uri = uri.replace('postgres', 'postgresql', 1)
            if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
                uri = uri.replace('postgresql://', 'postgresql+psycopg://', 1)
            if uri.startswith('timescaledb://'):
                uri = uri.replace('timescaledb://', 'postgresql+psycopg://', 1)
                flavor = 'timescaledb'
            if uri.startswith('timescaledb-ha://'):
                uri = uri.replace('timescaledb-ha://', 'postgresql+psycopg://', 1)
                flavor = 'timescaledb-ha'
            if uri.startswith('postgis://'):
                uri = uri.replace('postgis://', 'postgresql+psycopg://', 1)
                flavor = 'postgis'
            kw['uri'] = uri
            from_uri_params = self.from_uri(kw['uri'], as_dict=True)
            label = label or from_uri_params.get('label', None)
            _ = from_uri_params.pop('label', None)

            ### Sometimes the flavor may be provided with a URI.
            kw.update(from_uri_params)
            if flavor:
                kw['flavor'] = flavor

        ### set __dict__ in base class
        super().__init__(
            'sql',
            label = label or self.__dict__.get('label', None),
            **kw
        )

        if self.__dict__.get('flavor', None) in ('sqlite', 'geopackage'):
            self._reset_attributes()
            self._set_attributes(
                'sql',
                label = label,
                inherit_default = False,
                **kw
            )
            ### For backwards compatibility, set the database path for sql:local if it's missing.
            if (
                self.label == 'local'
                and self.__dict__.get('database', None) in (None, '{SQLITE_DB_PATH}')
            ):
                import meerschaum.config.paths as paths
                self.database = paths.SQLITE_DB_PATH.as_posix()

        ### ensure flavor and label are set accordingly
        if 'flavor' not in self.__dict__:
            if flavor is None and 'uri' not in self.__dict__:
                raise ValueError(
                    f" Missing flavor. Provide flavor as a key for '{self}'."
                )
            self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

        if self.flavor == 'postgres':
            self.flavor = 'postgresql'

        self._debug = debug
        ### Store the PID and thread at initialization
        ### so we can dispose of the Pool in child processes or threads.
        import os
        import threading
        self._pid = os.getpid()
        self._thread_ident = threading.current_thread().ident
        self._sessions = {}
        self._locks = {'_sessions': threading.RLock(), }

        ### verify the flavor's requirements are met
        if self.flavor not in self.flavor_configs:
            error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
        if not self.__dict__.get('uri'):
            self.verify_attributes(
                self.flavor_configs[self.flavor].get('requirements', set()),
                debug=debug,
            )

        if wait:
            from meerschaum.connectors.poll import retry_connect
            retry_connect(connector=self, debug=debug)

        if connect:
            if not self.test_connection(debug=debug):
                warn(f"Failed to connect with connector '{self}'!", stack=False)

    @property
    def Session(self):
        """
        Return a `scoped_session` registry bound to this connector's engine
        (built on first access), or `None` if the engine could not be created.
        """
        if '_Session' not in self.__dict__:
            if self.engine is None:
                return None

            from meerschaum.utils.packages import attempt_import
            sqlalchemy_orm = attempt_import('sqlalchemy.orm', lazy=False)
            session_factory = sqlalchemy_orm.sessionmaker(self.engine)
            self._Session = sqlalchemy_orm.scoped_session(session_factory)

        return self._Session

    @property
    def engine(self):
        """
        Return the SQLAlchemy engine connected to the configured database.

        The engine is created on first access and may be `None` if creation failed.
        If accessed from a different process than the one recorded, the pool is disposed;
        for DuckDB, the pool is also disposed when accessed from a different thread.
        """
        import os
        import threading
        if '_engine' not in self.__dict__:
            self._engine, self._engine_str = self.create_engine(include_uri=True)

        same_process = os.getpid() == self._pid
        same_thread = threading.current_thread().ident == self._thread_ident

        ### handle child processes
        if not same_process:
            self._pid = os.getpid()
            self._thread = threading.current_thread()
            ### Track the new thread identity so later same-thread checks compare
            ### against this process rather than the parent's recorded thread.
            self._thread_ident = threading.current_thread().ident
            warn("Different PID detected. Disposing of connections...")
            if self._engine is not None:
                self._engine.dispose()

        ### handle different threads
        if not same_thread:
            if self.flavor == 'duckdb' and self._engine is not None:
                warn("Different thread detected.")
                self._engine.dispose()

        return self._engine

    @property
    def DATABASE_URL(self) -> str:
        """
        Return the URI connection string (alias for `SQLConnector.URI`).
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def URI(self) -> str:
        """
        Return the URI connection string.
        """
        _ = self.engine
        return str(self._engine_str)

    @property
    def IS_THREAD_SAFE(self) -> bool:
        """
        Return whether this connector may be multithreaded.

        DuckDB and Oracle are never considered thread-safe;
        SQLite/GeoPackage are only when not using an in-memory database.
        """
        if self.flavor in ('duckdb', 'oracle'):
            return False
        if self.flavor in ('sqlite', 'geopackage'):
            return ':memory:' not in self.URI
        return True

    @property
    def metadata(self):
        """
        Return the metadata bound to this configured schema.
        """
        from meerschaum.utils.packages import attempt_import
        sqlalchemy = attempt_import('sqlalchemy', lazy=False)
        if '_metadata' not in self.__dict__:
            self._metadata = sqlalchemy.MetaData(schema=self.schema)
        return self._metadata

    @property
    def instance_schema(self):
        """
        Return the schema name for Meerschaum tables.
        """
        return self.schema

    @property
    def internal_schema(self):
        """
        Return the schema name for internal tables.

        Prefers the connector's `internal_schema` attribute, then the static default
        (or `self.schema` for flavors without schemas). Cached after the first access.
        """
        ### Return early so cached accesses skip the imports and the schema lookup.
        if '_internal_schema' in self.__dict__:
            return self._internal_schema

        from meerschaum._internal.static import STATIC_CONFIG
        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
        self._internal_schema = self.__dict__.get('internal_schema', None) or (
            STATIC_CONFIG['sql']['internal_schema']
            if self.flavor not in NO_SCHEMA_FLAVORS
            else self.schema
        )
        return self._internal_schema

    @property
    def db(self) -> Optional['databases.Database']:
        """
        Return a `databases.Database` for this connector's URL (cached after first access),
        or `None` if `databases.Database()` raised a `KeyError` (likely an unsupported flavor).
        """
        ### Return early so cached accesses skip `attempt_import(install=True)` and URL building.
        if '_db' in self.__dict__:
            return self._db

        from meerschaum.utils.packages import attempt_import
        databases = attempt_import('databases', lazy=False, install=True)
        url = self.DATABASE_URL
        if 'mysql' in url:
            url = url.replace('+pymysql', '')
        try:
            self._db = databases.Database(url)
        except KeyError:
            ### Likely encountered an unsupported flavor.
            self._db = None
        return self._db

    @property
    def db_version(self) -> Union[str, None]:
        """
        Return the database version.
        """
        _db_version = self.__dict__.get('_db_version', None)
        if _db_version is not None:
            return _db_version

        from meerschaum.utils.sql import get_db_version
        self._db_version = get_db_version(self)
        return self._db_version

    @property
    def schema(self) -> Union[str, None]:
        """
        Return the default schema to use.
        A value of `None` will not prepend a schema.
        """
        if 'schema' in self.__dict__:
            return self.__dict__['schema']

        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
        if self.flavor in NO_SCHEMA_FLAVORS:
            self.__dict__['schema'] = None
            return None

        sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
        _schema = sqlalchemy.inspect(self.engine).default_schema_name
        self.__dict__['schema'] = _schema
        return _schema

    def get_metadata_cache_path(self, kind: str = 'json') -> pathlib.Path:
        """
        Return the path to the file to which to write metadata cache.
        """
        import meerschaum.config.paths as paths
        filename = (
            f'{self.label}-metadata.pkl'
            if kind == 'pkl'
            else f'{self.label}.json'
        )
        return paths.SQL_CONN_CACHE_RESOURCES_PATH / filename

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __call__(self):
        return self
Connect to SQL databases via sqlalchemy.
SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/
def __init__(
    self,
    label: Optional[str] = None,
    flavor: Optional[str] = None,
    wait: bool = False,
    connect: bool = False,
    debug: bool = False,
    **kw: Any
):
    """
    Build a `SQLConnector` from a label, an optional flavor, and connector attributes.

    Parameters
    ----------
    label: str, default 'main'
        The connector's identifying label, e.g. `'main'` for `sql:main`.
        Defaults to 'main'.

    flavor: Optional[str], default None
        The database flavor, such as `'sqlite'`, `'postgresql'`, or `'cockroachdb'`.
        Run the `bootstrap connectors` command to list the supported flavors.

    wait: bool, default False
        If `True`, block until a database connection has been established.

    connect: bool, default False
        If `True`, try to connect right away and warn if the connection fails.

    debug: bool, default False
        Verbosity toggle.

    kw: Any
        Every other keyword becomes a connector attribute, so a connector can be
        built without being registered, as long as enough parameters are supplied.
        A `uri` keyword is normalized (PostgreSQL-family schemes are rewritten to
        `postgresql+psycopg://`) and parsed into individual attributes.

    Raises
    ------
    ValueError
        If no flavor is given and no URI is available to derive one from.
    """
    if 'uri' in kw:
        normalized_uri = kw['uri']
        if normalized_uri.startswith('postgres') and not normalized_uri.startswith('postgresql'):
            normalized_uri = normalized_uri.replace('postgres', 'postgresql', 1)
        ### Default to the psycopg driver when no driver is given.
        if normalized_uri.startswith('postgresql') and not normalized_uri.startswith('postgresql+'):
            normalized_uri = normalized_uri.replace('postgresql://', 'postgresql+psycopg://', 1)
        ### PostgreSQL-derived schemes also imply the connector's flavor.
        for scheme_prefix, implied_flavor in (
            ('timescaledb://', 'timescaledb'),
            ('timescaledb-ha://', 'timescaledb-ha'),
            ('postgis://', 'postgis'),
        ):
            if normalized_uri.startswith(scheme_prefix):
                normalized_uri = normalized_uri.replace(scheme_prefix, 'postgresql+psycopg://', 1)
                flavor = implied_flavor
        kw['uri'] = normalized_uri

        parsed_params = self.from_uri(normalized_uri, as_dict=True)
        label_from_uri = parsed_params.pop('label', None)
        label = label or label_from_uri

        ### The URI may carry the flavor too; an explicit flavor wins.
        kw.update(parsed_params)
        if flavor:
            kw['flavor'] = flavor

    ### Populate __dict__ through the base class.
    super().__init__(
        'sql',
        label=(label or self.__dict__.get('label', None)),
        **kw
    )

    if self.__dict__.get('flavor', None) in ('sqlite', 'geopackage'):
        self._reset_attributes()
        self._set_attributes('sql', label=label, inherit_default=False, **kw)
        ### Backwards compatibility: fill in the sql:local database path when absent.
        needs_default_path = (
            self.label == 'local'
            and self.__dict__.get('database', None) in (None, '{SQLITE_DB_PATH}')
        )
        if needs_default_path:
            import meerschaum.config.paths as paths
            self.database = paths.SQLITE_DB_PATH.as_posix()

    ### Make sure a flavor is set.
    if 'flavor' not in self.__dict__:
        if flavor is None and 'uri' not in self.__dict__:
            raise ValueError(
                f" Missing flavor. Provide flavor as a key for '{self}'."
            )
        self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

    if self.flavor == 'postgres':
        self.flavor = 'postgresql'

    self._debug = debug

    ### Remember the creating process and thread so pools can be disposed elsewhere.
    import os
    import threading
    self._pid = os.getpid()
    self._thread_ident = threading.current_thread().ident
    self._sessions = {}
    self._locks = {'_sessions': threading.RLock(), }

    ### Check the flavor's requirements.
    if self.flavor not in self.flavor_configs:
        error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
    if not self.__dict__.get('uri'):
        required_attributes = self.flavor_configs[self.flavor].get('requirements', set())
        self.verify_attributes(required_attributes, debug=debug)

    if wait:
        from meerschaum.connectors.poll import retry_connect
        retry_connect(connector=self, debug=debug)

    if connect and not self.test_connection(debug=debug):
        warn(f"Failed to connect with connector '{self}'!", stack=False)
Parameters
- label (str, default 'main'):
The identifying label for the connector.
E.g. for `sql:main`, 'main' is the label. Defaults to 'main'.
- flavor (Optional[str], default None):
The database flavor, e.g. `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc. To see supported flavors, run the `bootstrap connectors` command.
- wait (bool, default False):
If `True`, block until a database connection has been made. Defaults to `False`.
- connect (bool, default False):
If `True`, immediately attempt to connect to the database and raise a warning if the connection fails. Defaults to `False`.
- debug (bool, default False):
Verbosity toggle.
Defaults to `False`.
- kw (Any): All other arguments will be passed to the connector's attributes. Therefore, a connector may be made without being registered, as long as enough parameters are supplied to the constructor.
@property
def Session(self):
    """
    Return a `scoped_session` registry bound to this connector's engine.

    The registry is built on first access and reused afterwards.
    Returns `None` if the engine could not be created.
    """
    if '_Session' in self.__dict__:
        return self._Session

    if self.engine is None:
        return None

    from meerschaum.utils.packages import attempt_import
    orm = attempt_import('sqlalchemy.orm', lazy=False)
    self._Session = orm.scoped_session(orm.sessionmaker(self.engine))
    return self._Session
@property
def engine(self):
    """
    Return the SQLAlchemy engine connected to the configured database.

    The engine is created on first access and may be `None` if creation failed.
    If accessed from a different process than the one recorded, the pool is disposed;
    for DuckDB, the pool is also disposed when accessed from a different thread.
    """
    import os
    import threading
    if '_engine' not in self.__dict__:
        self._engine, self._engine_str = self.create_engine(include_uri=True)

    same_process = os.getpid() == self._pid
    same_thread = threading.current_thread().ident == self._thread_ident

    ### handle child processes
    if not same_process:
        self._pid = os.getpid()
        self._thread = threading.current_thread()
        ### Track the new thread identity so later same-thread checks compare
        ### against this process rather than the parent's recorded thread.
        self._thread_ident = threading.current_thread().ident
        warn("Different PID detected. Disposing of connections...")
        if self._engine is not None:
            self._engine.dispose()

    ### handle different threads
    if not same_thread:
        if self.flavor == 'duckdb' and self._engine is not None:
            warn("Different thread detected.")
            self._engine.dispose()

    return self._engine
Return the SQLAlchemy engine connected to the configured database.
@property
def DATABASE_URL(self) -> str:
    """
    Return the URI connection string (alias for `SQLConnector.URI`).

    Accessing `engine` first ensures `_engine_str` has been built.
    """
    _ = self.engine
    return str(self._engine_str)
Return the URI connection string (alias for SQLConnector.URI).
@property
def URI(self) -> str:
    """
    Return the connection string backing this connector's engine.
    """
    ### Touch the engine for its side effect: it lazily builds `_engine_str`.
    _built_engine = self.engine
    del _built_engine
    connection_string = self._engine_str
    return str(connection_string)
Return the URI connection string.
@property
def IS_THREAD_SAFE(self) -> bool:
    """
    Return whether this connector may be multithreaded.

    DuckDB and Oracle are never considered thread-safe;
    SQLite/GeoPackage are only when not using an in-memory (`:memory:`) database.
    """
    if self.flavor in ('duckdb', 'oracle'):
        return False
    if self.flavor in ('sqlite', 'geopackage'):
        return ':memory:' not in self.URI
    return True
Return whether this connector may be multithreaded.
@property
def metadata(self):
    """
    Return a `sqlalchemy.MetaData` object scoped to this connector's default schema.

    It is created on first access and reused afterwards.
    """
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    if '_metadata' in self.__dict__:
        return self._metadata

    self._metadata = sqlalchemy.MetaData(schema=self.schema)
    return self._metadata
Return the metadata bound to this configured schema.
@property
def instance_schema(self):
    """
    Return the schema in which Meerschaum's tables live,
    which is the connector's default `schema`.
    """
    default_schema = self.schema
    return default_schema
Return the schema name for Meerschaum tables.
@property
def internal_schema(self):
    """
    Return the schema name for internal tables.

    Prefers the connector's `internal_schema` attribute, then
    `STATIC_CONFIG['sql']['internal_schema']` (or `self.schema` for flavors
    without schema support). The result is cached after the first access.
    """
    ### Return early so cached accesses skip the imports and the schema lookup.
    if '_internal_schema' in self.__dict__:
        return self._internal_schema

    from meerschaum._internal.static import STATIC_CONFIG
    from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
    self._internal_schema = self.__dict__.get('internal_schema', None) or (
        STATIC_CONFIG['sql']['internal_schema']
        if self.flavor not in NO_SCHEMA_FLAVORS
        else self.schema
    )
    return self._internal_schema
Return the schema name for internal tables.
@property
def db(self) -> Optional['databases.Database']:
    """
    Return a `databases.Database` for this connector's URL, cached after first access.

    Returns `None` if `databases.Database()` raised a `KeyError`
    (likely an unsupported flavor).
    """
    ### Return early so cached accesses skip `attempt_import(install=True)` and URL building.
    if '_db' in self.__dict__:
        return self._db

    from meerschaum.utils.packages import attempt_import
    databases = attempt_import('databases', lazy=False, install=True)
    url = self.DATABASE_URL
    if 'mysql' in url:
        ### Strip the pymysql driver suffix from MySQL URLs.
        url = url.replace('+pymysql', '')
    try:
        self._db = databases.Database(url)
    except KeyError:
        ### Likely encountered an unsupported flavor.
        self._db = None
    return self._db
@property
def db_version(self) -> Union[str, None]:
    """
    Return the database version, fetched once via `get_db_version` and then cached.

    A cached value of `None` is treated as missing and fetched again.
    """
    version = self.__dict__.get('_db_version', None)
    if version is None:
        from meerschaum.utils.sql import get_db_version
        self._db_version = get_db_version(self)
        version = self._db_version
    return version
Return the database version.
@property
def schema(self) -> Union[str, None]:
    """
    Return the default schema to use; `None` means no schema is prepended.

    For schema-less flavors this is `None`; otherwise it is the engine's
    default schema name. The result is stored in the instance's `schema` attribute.
    """
    attrs = self.__dict__
    if 'schema' not in attrs:
        from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
        if self.flavor in NO_SCHEMA_FLAVORS:
            attrs['schema'] = None
        else:
            sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
            attrs['schema'] = sqlalchemy.inspect(self.engine).default_schema_name
    return attrs['schema']
Return the default schema to use.
A value of None will not prepend a schema.
def get_metadata_cache_path(self, kind: str = 'json') -> pathlib.Path:
    """
    Return the file path used for this connector's metadata cache.

    Parameters
    ----------
    kind: str, default 'json'
        `'pkl'` selects `<label>-metadata.pkl`; any other value selects `<label>.json`.

    Returns
    -------
    A path inside `SQL_CONN_CACHE_RESOURCES_PATH`.
    """
    import meerschaum.config.paths as paths
    if kind == 'pkl':
        cache_filename = f'{self.label}-metadata.pkl'
    else:
        cache_filename = f'{self.label}.json'
    return paths.SQL_CONN_CACHE_RESOURCES_PATH / cache_filename
Return the path to the file to which to write metadata cache.
def create_engine(
    self,
    include_uri: bool = False,
    debug: bool = False,
    **kw
) -> 'sqlalchemy.engine.Engine':
    """
    Create a sqlalchemy engine by building the engine string.

    Parameters
    ----------
    include_uri: bool, default False
        If `True`, return a tuple of `(engine, engine_str)` instead of just the engine.

    debug: bool, default False
        Verbosity toggle (also passed as `echo` to `sqlalchemy.create_engine()`).

    kw: Any
        Extra `sqlalchemy.create_engine()` arguments, applied last
        (subject to the flavor's `omit_create_engine` setting).

    Returns
    -------
    The engine (or `None` if creation failed), or `(engine, engine_str)` if `include_uri`.
    """
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import error, warn
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    ### Import the submodule explicitly: `import urllib` alone does not load `urllib.parse`.
    import urllib.parse
    import copy
    import traceback

    ### Install and patch required drivers.
    if self.flavor in install_flavor_drivers:
        _ = attempt_import(
            *install_flavor_drivers[self.flavor],
            debug=debug,
            lazy=False,
            warn=False,
        )
        if self.flavor == 'mssql':
            _init_mssql_sqlalchemy()

    ### Verify the flavor before reading its configuration
    ### (otherwise the lookup below raises a KeyError first).
    if self.flavor not in flavor_configs:
        error(f"Cannot create a connector with the flavor '{self.flavor}'.")
    flavor_config = flavor_configs[self.flavor]

    ### supplement missing values with defaults (e.g. port number)
    for a, value in flavor_config.get('defaults', {}).items():
        if a not in self.__dict__:
            self.__dict__[a] = value

    _engine = flavor_config.get('engine', None)
    _username = self.__dict__.get('username', None)
    _password = self.__dict__.get('password', None)
    _host = self.__dict__.get('host', None)
    _port = self.__dict__.get('port', None)
    _database = self.__dict__.get('database', None)
    if _database == '{SQLITE_DB_PATH}':
        import meerschaum.config.paths as paths
        _database = paths.SQLITE_DB_PATH.as_posix()
    _options = self.__dict__.get('options', {})
    if isinstance(_options, str):
        _options = dict(urllib.parse.parse_qsl(_options))
    _uri = self.__dict__.get('uri', None)

    ### Handle registering specific dialects (due to installing in virtual environments).
    if self.flavor in flavor_dialects:
        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])

    ### self._sys_config was deepcopied and can be updated safely
    if self.flavor in ("sqlite", "duckdb", "geopackage"):
        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
        if 'create_engine' not in self._sys_config:
            self._sys_config['create_engine'] = {}
        if 'connect_args' not in self._sys_config['create_engine']:
            self._sys_config['create_engine']['connect_args'] = {}
        self._sys_config['create_engine']['connect_args'].update({"check_same_thread": False})
    else:
        engine_str = (
            _engine + "://" + (_username if _username is not None else '') +
            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
            (("/" + _database) if _database is not None else '')
            + (("?" + urllib.parse.urlencode(_options)) if _options else '')
        ) if not _uri else _uri

    ### Sometimes the timescaledb:// flavor can slip in.
    if _uri and self.flavor in _uri:
        if self.flavor in ('timescaledb', 'timescaledb-ha', 'postgis'):
            engine_str = engine_str.replace(self.flavor, 'postgresql', 1)
        elif _uri.startswith('postgresql://'):
            engine_str = engine_str.replace('postgresql://', 'postgresql+psycopg2://')

    if debug:
        masked_engine_str = engine_str
        if _password is not None:
            ### The password may appear raw (from a URI) or percent-encoded (built above);
            ### mask both forms so neither leaks into the debug output.
            _raw_password = str(_password)
            for _secret in dict.fromkeys((_raw_password, urllib.parse.quote_plus(_raw_password))):
                masked_engine_str = masked_engine_str.replace(
                    ':' + _secret, ':' + ('*' * len(_secret))
                )
        dprint(
            masked_engine_str + '\n'
            + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
        )

    _kw_copy = copy.deepcopy(kw)

    ### NOTE: Order of inheritance:
    ### 1. Defaults
    ### 2. System configuration
    ### 3. Connector configuration
    ### 4. Keyword arguments
    ### Copy the flavor defaults so the updates below never mutate the shared `flavor_configs`.
    _create_engine_args = dict(flavor_config.get('create_engine', {}))
    _omit_create_engine = flavor_config.get('omit_create_engine', None) or {}

    def _apply_create_engine_args(update):
        """Merge `update` into the engine args, skipping keys the flavor omits."""
        if 'ALL' in _omit_create_engine:
            return
        _create_engine_args.update({
            k: v
            for k, v in update.items()
            if k not in _omit_create_engine
        })

    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
    _apply_create_engine_args(_kw_copy)

    try:
        ### Split the configured pool class path, e.g. 'sqlalchemy.pool.QueuePool'
        ### into the module ('sqlalchemy.pool') and the class name ('QueuePool').
        _pool_module_name, _, _pool_class_name = self._sys_config['poolclass'].rpartition('.')
        engine = sqlalchemy.create_engine(
            engine_str,
            poolclass = getattr(attempt_import(_pool_module_name), _pool_class_name),
            echo = debug,
            **_create_engine_args
        )
    except Exception:
        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
        engine = None

    if include_uri:
        return engine, engine_str
    return engine
Create a sqlalchemy engine by building the engine string.
35def read( 36 self, 37 query_or_table: Union[str, sqlalchemy.Query], 38 params: Union[Dict[str, Any], List[str], None] = None, 39 dtype: Optional[Dict[str, Any]] = None, 40 coerce_float: bool = True, 41 chunksize: Optional[int] = -1, 42 workers: Optional[int] = None, 43 chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None, 44 as_hook_results: bool = False, 45 chunks: Optional[int] = None, 46 schema: Optional[str] = None, 47 as_chunks: bool = False, 48 as_iterator: bool = False, 49 as_dask: bool = False, 50 index_col: Optional[str] = None, 51 silent: bool = False, 52 debug: bool = False, 53 **kw: Any 54) -> Union[ 55 pandas.DataFrame, 56 dask.DataFrame, 57 List[pandas.DataFrame], 58 List[Any], 59 None, 60]: 61 """ 62 Read a SQL query or table into a pandas dataframe. 63 64 Parameters 65 ---------- 66 query_or_table: Union[str, sqlalchemy.Query] 67 The SQL query (sqlalchemy Query or string) or name of the table from which to select. 68 69 params: Optional[Dict[str, Any]], default None 70 `List` or `Dict` of parameters to pass to `pandas.read_sql()`. 71 See the pandas documentation for more information: 72 https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html 73 74 dtype: Optional[Dict[str, Any]], default None 75 A dictionary of data types to pass to `pandas.read_sql()`. 76 See the pandas documentation for more information: 77 https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html 78 79 chunksize: Optional[int], default -1 80 How many chunks to read at a time. `None` will read everything in one large chunk. 81 Defaults to system configuration. 82 83 **NOTE:** DuckDB does not allow for chunking. 84 85 workers: Optional[int], default None 86 How many threads to use when consuming the generator. 87 Only applies if `chunk_hook` is provided. 88 89 chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None 90 Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. 
91 See `--sync-chunks` for an example. 92 **NOTE:** `as_iterator` MUST be False (default). 93 94 as_hook_results: bool, default False 95 If `True`, return a `List` of the outputs of the hook function. 96 Only applicable if `chunk_hook` is not None. 97 98 **NOTE:** `as_iterator` MUST be `False` (default). 99 100 chunks: Optional[int], default None 101 Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and 102 return into a single dataframe. 103 For example, to limit the returned dataframe to 100,000 rows, 104 you could specify a `chunksize` of `1000` and `chunks` of `100`. 105 106 schema: Optional[str], default None 107 If just a table name is provided, optionally specify the table schema. 108 Defaults to `SQLConnector.schema`. 109 110 as_chunks: bool, default False 111 If `True`, return a list of DataFrames. 112 Otherwise return a single DataFrame. 113 114 as_iterator: bool, default False 115 If `True`, return the pandas DataFrame iterator. 116 `chunksize` must not be `None` (falls back to 1000 if so), 117 and hooks are not called in this case. 118 119 index_col: Optional[str], default None 120 If using Dask, use this column as the index column. 121 If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame. 122 123 silent: bool, default False 124 If `True`, don't raise warnings in case of errors. 125 Defaults to `False`. 126 127 Returns 128 ------- 129 A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators, 130 or `None` if something breaks. 
131 132 """ 133 if chunks is not None and chunks <= 0: 134 return [] 135 136 from meerschaum.utils.sql import sql_item_name, truncate_item_name 137 from meerschaum.utils.dtypes import are_dtypes_equal, coerce_timezone 138 from meerschaum.utils.dtypes.sql import TIMEZONE_NAIVE_FLAVORS 139 from meerschaum.utils.packages import attempt_import, import_pandas 140 from meerschaum.utils.pool import get_pool 141 from meerschaum.utils.dataframe import chunksize_to_npartitions, get_numeric_cols 142 from meerschaum.utils.misc import filter_arguments 143 import warnings 144 import traceback 145 from decimal import Decimal 146 147 pd = import_pandas() 148 dd = None 149 150 is_dask = 'dask' in pd.__name__ 151 pandas = attempt_import('pandas') 152 is_dask = dd is not None 153 npartitions = chunksize_to_npartitions(chunksize) 154 if is_dask: 155 chunksize = None 156 157 schema = schema or self.schema 158 utc_dt_cols = [ 159 col 160 for col, typ in dtype.items() 161 if are_dtypes_equal(typ, 'datetime') and 'utc' in typ.lower() 162 ] if dtype else [] 163 164 if dtype and utc_dt_cols and self.flavor in TIMEZONE_NAIVE_FLAVORS: 165 dtype = dtype.copy() 166 for col in utc_dt_cols: 167 dtype[col] = 'datetime64[us]' 168 169 pool = get_pool(workers=workers) 170 sqlalchemy = attempt_import("sqlalchemy", lazy=False) 171 default_chunksize = self._sys_config.get('chunksize', None) 172 chunksize = chunksize if chunksize != -1 else default_chunksize 173 if chunksize is None and as_iterator: 174 if not silent and self.flavor not in _disallow_chunks_flavors: 175 warn( 176 "An iterator may only be generated if chunksize is not None.\n" 177 + "Falling back to a chunksize of 1000.", stacklevel=3, 178 ) 179 chunksize = 1000 180 if chunksize is not None and self.flavor in _max_chunks_flavors: 181 if chunksize > _max_chunks_flavors[self.flavor]: 182 if chunksize != default_chunksize: 183 warn( 184 f"The specified chunksize of {chunksize} exceeds the maximum of " 185 + 
f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n" 186 + f" Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.", 187 stacklevel=3, 188 ) 189 chunksize = _max_chunks_flavors[self.flavor] 190 191 if chunksize is not None and self.flavor in _disallow_chunks_flavors: 192 chunksize = None 193 194 if debug: 195 import time 196 start = time.perf_counter() 197 dprint(f"[{self}]\n{query_or_table}") 198 dprint(f"[{self}] Fetching with chunksize: {chunksize}") 199 200 ### This might be sqlalchemy object or the string of a table name. 201 ### We check for spaces and quotes to see if it might be a weird table. 202 if ( 203 ' ' not in str(query_or_table) 204 or ( 205 ' ' in str(query_or_table) 206 and str(query_or_table).startswith('"') 207 and str(query_or_table).endswith('"') 208 ) 209 ): 210 truncated_table_name = truncate_item_name(str(query_or_table), self.flavor) 211 if truncated_table_name != str(query_or_table) and not silent: 212 if self.flavor not in ('oracle', 'mysql', 'mariadb'): 213 warn( 214 f"Table '{query_or_table}' is too long for '{self.flavor}'," 215 + f" will instead read the table '{truncated_table_name}'." 
216 ) 217 218 query_or_table = sql_item_name(str(query_or_table), self.flavor, schema) 219 if debug: 220 dprint(f"[{self}] Reading from table {query_or_table}") 221 formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table)) 222 str_query = f"SELECT * FROM {query_or_table}" 223 else: 224 str_query = query_or_table 225 226 formatted_query = ( 227 sqlalchemy.text(str_query) 228 if not is_dask and isinstance(str_query, str) 229 else format_sql_query_for_dask(str_query) 230 ) 231 232 def _get_chunk_args_kwargs(_chunk): 233 return filter_arguments( 234 chunk_hook, 235 _chunk, 236 workers=workers, 237 chunksize=chunksize, 238 debug=debug, 239 **kw 240 ) 241 242 chunk_list = [] 243 chunk_hook_results = [] 244 def _process_chunk(_chunk, _retry_on_failure: bool = True): 245 if self.flavor in TIMEZONE_NAIVE_FLAVORS: 246 for col in utc_dt_cols: 247 _chunk[col] = coerce_timezone(_chunk[col], strip_utc=False) 248 if not as_hook_results: 249 chunk_list.append(_chunk) 250 251 if chunk_hook is None: 252 return None 253 254 chunk_args, chunk_kwargs = _get_chunk_args_kwargs(_chunk) 255 256 result = None 257 try: 258 result = chunk_hook(*chunk_args, **chunk_kwargs) 259 except Exception: 260 result = False, traceback.format_exc() 261 from meerschaum.utils.formatting import get_console 262 if not silent: 263 get_console().print_exception() 264 265 ### If the chunk fails to process, try it again one more time. 
266 if isinstance(result, tuple) and result[0] is False: 267 if _retry_on_failure: 268 return _process_chunk(_chunk, _retry_on_failure=False) 269 270 return result 271 272 try: 273 stream_results = not as_iterator and chunk_hook is not None and chunksize is not None 274 with warnings.catch_warnings(): 275 warnings.filterwarnings('ignore', 'case sensitivity issues') 276 277 read_sql_query_kwargs = { 278 'params': params, 279 'dtype': dtype, 280 'coerce_float': coerce_float, 281 'index_col': index_col, 282 } 283 if is_dask: 284 if index_col is None: 285 dd = None 286 pd = attempt_import('pandas') 287 read_sql_query_kwargs.update({ 288 'chunksize': chunksize, 289 }) 290 else: 291 read_sql_query_kwargs.update({ 292 'chunksize': chunksize, 293 }) 294 295 if is_dask and dd is not None: 296 ddf = dd.read_sql_query( 297 formatted_query, 298 self.URI, 299 **read_sql_query_kwargs 300 ) 301 else: 302 303 def get_chunk_generator(connectable): 304 chunk_generator = pd.read_sql_query( 305 formatted_query, 306 connectable, # NOTE: test this against `self.engine`. 
307 **read_sql_query_kwargs 308 ) 309 310 to_return = ( 311 ( 312 chunk_generator 313 if not (as_hook_results or chunksize is None) 314 else ( 315 _process_chunk(_chunk) 316 for _chunk in chunk_generator 317 ) 318 ) 319 if as_iterator or chunksize is None 320 else ( 321 list(pool.imap(_process_chunk, chunk_generator)) 322 if as_hook_results 323 else None 324 ) 325 ) 326 return chunk_generator, to_return 327 328 if self.flavor in SKIP_READ_TRANSACTION_FLAVORS: 329 chunk_generator, to_return = get_chunk_generator(self.engine) 330 else: 331 with self.engine.begin() as transaction: 332 with transaction.execution_options( 333 stream_results=stream_results, 334 ) as connection: 335 chunk_generator, to_return = get_chunk_generator(connection) 336 337 if to_return is not None: 338 return to_return 339 340 except Exception as e: 341 if debug: 342 dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n") 343 if not silent: 344 warn(str(e), stacklevel=3) 345 from meerschaum.utils.formatting import get_console 346 if not silent: 347 get_console().print_exception() 348 349 return None 350 351 if is_dask and dd is not None: 352 ddf = ddf.reset_index() 353 return ddf 354 355 chunk_list = [] 356 read_chunks = 0 357 chunk_hook_results = [] 358 if chunksize is None: 359 chunk_list.append(chunk_generator) 360 elif as_iterator: 361 return chunk_generator 362 else: 363 try: 364 for chunk in chunk_generator: 365 if chunk_hook is not None: 366 chunk_args, chunk_kwargs = _get_chunk_args_kwargs(chunk) 367 chunk_hook_results.append(chunk_hook(*chunk_args, **chunk_kwargs)) 368 chunk_list.append(chunk) 369 read_chunks += 1 370 if chunks is not None and read_chunks >= chunks: 371 break 372 except Exception as e: 373 warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3) 374 from meerschaum.utils.formatting import get_console 375 if not silent: 376 get_console().print_exception() 377 378 read_chunks = 0 379 try: 380 for chunk in chunk_generator: 381 if 
chunk_hook is not None: 382 chunk_args, chunk_kwargs = _get_chunk_args_kwargs(chunk) 383 chunk_hook_results.append(chunk_hook(*chunk_args, **chunk_kwargs)) 384 chunk_list.append(chunk) 385 read_chunks += 1 386 if chunks is not None and read_chunks >= chunks: 387 break 388 except Exception as e: 389 warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3) 390 from meerschaum.utils.formatting import get_console 391 if not silent: 392 get_console().print_exception() 393 394 return None 395 396 ### If no chunks returned, read without chunks 397 ### to get columns 398 if len(chunk_list) == 0: 399 with warnings.catch_warnings(): 400 warnings.filterwarnings('ignore', 'case sensitivity issues') 401 _ = read_sql_query_kwargs.pop('chunksize', None) 402 with self.engine.begin() as connection: 403 chunk_list.append( 404 pd.read_sql_query( 405 formatted_query, 406 connection, 407 **read_sql_query_kwargs 408 ) 409 ) 410 411 ### call the hook on any missed chunks. 412 if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results): 413 for c in chunk_list[len(chunk_hook_results):]: 414 chunk_args, chunk_kwargs = _get_chunk_args_kwargs(c) 415 chunk_hook_results.append(chunk_hook(*chunk_args, **chunk_kwargs)) 416 417 ### chunksize is not None so must iterate 418 if debug: 419 end = time.perf_counter() 420 dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.") 421 422 if as_hook_results: 423 return chunk_hook_results 424 425 ### Skip `pd.concat()` if `as_chunks` is specified. 426 if as_chunks: 427 for c in chunk_list: 428 c.reset_index(drop=True, inplace=True) 429 for col in get_numeric_cols(c): 430 c[col] = c[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x) 431 return chunk_list 432 433 df = pd.concat(chunk_list).reset_index(drop=True) 434 ### NOTE: The calls to `canonical()` are to drop leading and trailing zeroes. 
435 for col in get_numeric_cols(df): 436 df[col] = df[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x) 437 438 return df
Read a SQL query or table into a pandas dataframe.
Parameters
- query_or_table (Union[str, sqlalchemy.Query]): The SQL query (sqlalchemy Query or string) or name of the table from which to select.
- params (Optional[Dict[str, Any]], default None):
`List` or `Dict` of parameters to pass to `pandas.read_sql()`. See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
- dtype (Optional[Dict[str, Any]], default None):
A dictionary of data types to pass to `pandas.read_sql()`. See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
- chunksize (Optional[int], default -1): How many rows to read at a time (i.e. the size of each chunk).
`None` will read everything in one large chunk. Defaults to the system configuration. **NOTE:** DuckDB does not allow for chunking.
- workers (Optional[int], default None):
How many threads to use when consuming the generator.
Only applies if `chunk_hook` is provided.
- chunk_hook (Optional[Callable[[pandas.DataFrame], Any]], default None):
Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
See `--sync-chunks` for an example. **NOTE:** `as_iterator` MUST be `False` (default).
- as_hook_results (bool, default False):
If `True`, return a `List` of the outputs of the hook function. Only applicable if `chunk_hook` is not `None`. **NOTE:** `as_iterator` MUST be `False` (default).
- chunks (Optional[int], default None):
Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
return into a single dataframe.
For example, to limit the returned dataframe to 100,000 rows,
you could specify a `chunksize` of `1000` and `chunks` of `100`.
- schema (Optional[str], default None):
If just a table name is provided, optionally specify the table schema.
Defaults to `SQLConnector.schema`.
- as_chunks (bool, default False):
If `True`, return a list of DataFrames. Otherwise return a single DataFrame.
- as_iterator (bool, default False):
If `True`, return the pandas DataFrame iterator. `chunksize` must not be `None` (falls back to 1000 if so), and hooks are not called in this case.
- index_col (Optional[str], default None): If using Dask, use this column as the index column. If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
- silent (bool, default False):
If `True`, don't raise warnings in case of errors. Defaults to `False`.
Returns
- A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
or `None` if something breaks.
def value(
    self,
    query: str,
    *args: Any,
    use_pandas: bool = False,
    **kw: Any
) -> Any:
    """
    Execute the provided query and return the first value.

    Parameters
    ----------
    query: str
        The SQL query to execute.

    *args: Any
        The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
        if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.

    use_pandas: bool, default False
        If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use
        `meerschaum.connectors.sql.SQLConnector.exec` (default).
        **NOTE:** This is always `True` for DuckDB.

    **kw: Any
        See `args`.
        When `use_pandas` is `False`, the optional keys `close` (default `True`)
        and `commit` (default `True` unless the flavor is `'mssql'`) control whether
        the connection is closed afterwards and whether the transaction is committed.

    Returns
    -------
    The first column of the first row returned by the query,
    or `None` if no rows were returned or the query failed.
    """
    if self.flavor == 'duckdb':
        use_pandas = True
    if use_pandas:
        try:
            return self.read(query, *args, **kw).iloc[0, 0]
        except Exception:
            return None

    ### Pop (rather than get) `close` and `commit`: they are passed to `exec()` explicitly below,
    ### so leaving them in `kw` would raise "got multiple values for keyword argument".
    _close = kw.pop('close', True)
    _commit = kw.pop('commit', (self.flavor != 'mssql'))

    connection = None
    _val = None
    try:
        result, connection = self.exec(
            query,
            *args,
            with_connection=True,
            close=False,
            commit=_commit,
            **kw
        )
        first = result.first() if result is not None else None
        _val = first[0] if first is not None else None
    except Exception as e:
        warn(e, stacklevel=3)
        _val = None

    ### Close the connection even if reading the result failed (it may be unset if `exec()` raised).
    if _close and connection is not None:
        try:
            connection.close()
        except Exception as e:
            warn("Failed to close connection with exception:\n" + str(e))
    return _val
Execute the provided query and return the first value.
Parameters
- query (str): The SQL query to execute.
- *args (Any):
The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec` if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.
- use_pandas (bool, default False):
If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use `meerschaum.connectors.sql.SQLConnector.exec` (default). **NOTE:** This is always `True` for DuckDB.
- **kw (Any):
See `args`.
Returns
- Any value returned from the query.
def exec(
    self,
    query: str,
    *args: Any,
    silent: bool = False,
    debug: bool = False,
    commit: Optional[bool] = None,
    close: Optional[bool] = None,
    with_connection: bool = False,
    _connection=None,
    _transaction=None,
    **kw: Any
) -> Union[
    sqlalchemy.engine.result.resultProxy,
    sqlalchemy.engine.cursor.LegacyCursorResult,
    Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
    Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
    None
]:
    """
    Run a single SQL statement and hand back the raw `sqlalchemy` result
    (useful e.g. for calling stored procedures).

    When inserting data, use bind variables to avoid SQL injection!

    Parameters
    ----------
    query: Union[str, List[str], Tuple[str]]
        The statement to run. Plain strings are wrapped in `sqlalchemy.text()`;
        objects that already provide `compile()` are executed as-is.
        A list or tuple is delegated to `self.exec_queries()`.

    args: Any
        Extra positional arguments forwarded to `connection.execute()`
        (or to `self.exec_queries()` for a list of queries).

    silent: bool, default False
        If `True`, suppress warnings.

    debug: bool, default False
        Verbosity toggle.

    commit: Optional[bool], default None
        Whether to commit after execution. When `None`, commit unless the flavor
        is `'mssql'` and the query text contains `select`.
        This does not apply if `query` is a list of strings.

    close: Optional[bool], default None
        Whether to close the connection afterwards. When `None`, close unless the
        flavor is `'mssql'`.
        This does not apply if `query` is a list of strings.

    with_connection: bool, default False
        If `True`, return a `(result, connection)` tuple.
        This does not apply if `query` is a list of strings.

    Returns
    -------
    The `sqlalchemy` result object (`None` on failure),
    or a `(result, connection)` tuple when `with_connection` is `True`.
    """
    if isinstance(query, (list, tuple)):
        return self.exec_queries(
            list(query),
            *args,
            silent=silent,
            debug=debug,
            **kw
        )

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import("sqlalchemy", lazy=False)
    if debug:
        dprint(f"[{self}] Executing query:\n{query}")

    is_mssql = self.flavor == 'mssql'
    should_close = (not is_mssql) if close is None else close
    if commit is None:
        should_commit = (not is_mssql) or ('select' not in str(query).lower())
    else:
        should_commit = commit

    ### SQLAlchemy 2.0+ only executes compiled constructs, so wrap raw strings in `text()`.
    if not hasattr(query, 'compile'):
        query = sqlalchemy.text(query)

    conn = self.get_connection() if _connection is None else _connection

    if _transaction is not None:
        txn = _transaction
        ### A caller-supplied transaction that is no longer active can't be reused:
        ### start over on a freshly rebuilt connection.
        if not txn.is_active:
            conn = self.get_connection(rebuild=True)
            txn = conn.begin() if should_commit else None
    elif should_commit:
        try:
            txn = conn.begin()
        except sqlalchemy.exc.InvalidRequestError:
            ### Only recover if we own the connection; otherwise let the caller handle it.
            if _connection is not None:
                raise
            conn = self.get_connection(rebuild=True)
            txn = conn.begin()
    else:
        txn = None

    result = None
    try:
        result = conn.execute(query, *args, **kw)
        if should_commit:
            txn.commit()
    except Exception as exc:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{exc}")
        if not silent:
            warn(str(exc), stacklevel=3)
        result = None
        if should_commit:
            if debug:
                dprint(f"[{self}] Rolling back failed transaction...")
            txn.rollback()
            conn = self.get_connection(rebuild=True)
    finally:
        if should_close:
            conn.close()

    if debug:
        dprint(f"[{self}] Done executing.")

    return (result, conn) if with_connection else result
Execute SQL code and return the sqlalchemy result, e.g. when calling stored procedures.
If inserting data, please use bind variables to avoid SQL injection!
Parameters
- query (Union[str, List[str], Tuple[str]]):
The query to execute.
If `query` is a list or tuple, call `self.exec_queries()` instead.
- args (Any):
Arguments passed to `sqlalchemy.engine.execute`.
- silent (bool, default False):
If `True`, suppress warnings.
- commit (Optional[bool], default None):
If `True`, commit the changes after execution. Causes issues with flavors like `'mssql'`. This does not apply if `query` is a list of strings.
- close (Optional[bool], default None):
If `True`, close the connection after execution. Causes issues with flavors like `'mssql'`. This does not apply if `query` is a list of strings.
- with_connection (bool, default False):
If `True`, return a tuple including the connection object. This does not apply if `query` is a list of strings.
Returns
- The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
def execute(
    self,
    *args: Any,
    **kw: Any
) -> Optional[sqlalchemy.engine.result.resultProxy]:
    """
    Alias of `meerschaum.connectors.sql.SQLConnector.exec`.

    Every positional and keyword argument is forwarded unchanged,
    and whatever `exec()` returns is passed straight back.
    """
    exec_method = self.exec
    return exec_method(*args, **kw)
An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
def to_sql(
    self,
    df: pandas.DataFrame,
    name: str = None,
    index: bool = False,
    if_exists: str = 'replace',
    method: str = "",
    chunksize: Optional[int] = -1,
    schema: Optional[str] = None,
    safe_copy: bool = True,
    silent: bool = False,
    debug: bool = False,
    as_tuple: bool = False,
    as_dict: bool = False,
    _connection=None,
    _transaction=None,
    **kw
) -> Union[bool, SuccessTuple]:
    """
    Upload a DataFrame's contents to the SQL server.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be inserted.

    name: str
        The name of the table to be created.

    index: bool, default False
        If True, creates the DataFrame's indices as columns.

    if_exists: str, default 'replace'
        Drop and create the table ('replace') or append if it exists
        ('append') or raise Exception ('fail').
        Options are ['replace', 'append', 'fail'].

    method: str, default ''
        The insertion method passed to `pandas.DataFrame.to_sql()` (e.g. `None` or `'multi'`).
        If `''`, use a bulk-insert method when enabled for this flavor in the configuration,
        otherwise the flavor's configured default (falling back to `'multi'`).

    chunksize: Optional[int], default -1
        How many rows to insert at a time.
        `-1` means use the connector's configured chunksize.

    schema: Optional[str], default None
        Optionally override the schema for the table.
        Defaults to `SQLConnector.schema`.

    safe_copy: bool, default True
        If `True`, copy the dataframe before making any changes,
        so the caller's dataframe is never modified.

    silent: bool, default False
        If `True`, don't warn with the traceback when the insert fails.

    debug: bool, default False
        Verbosity toggle.

    as_tuple: bool, default False
        If `True`, return a (success_bool, message) tuple instead of a `bool`.
        Defaults to `False`.

    as_dict: bool, default False
        If `True`, return a dictionary of transaction information.
        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
        `method`, and `target` (`start`, `end`, and `duration` only on success).

    kw: Any
        Additional arguments will be passed to the DataFrame's `to_sql` function
        (only those accepted by its signature). The caller's `dtype` dict is not modified.

    Returns
    -------
    Either a `bool`, a `SuccessTuple` (if `as_tuple`), or a `dict` (if `as_dict`).
    """
    import time
    import json
    from datetime import timedelta
    from meerschaum.utils.warnings import error, warn
    import warnings
    import functools
    import traceback

    if name is None:
        error(f"Name must not be `None` to insert data into {self}.")

    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
    kw.pop('name', None)

    schema = schema or self.schema

    from meerschaum.utils.sql import (
        sql_item_name,
        table_exists,
        json_flavors,
        truncate_item_name,
        DROP_IF_EXISTS_FLAVORS,
    )
    from meerschaum.utils.dataframe import (
        get_json_cols,
        get_numeric_cols,
        get_uuid_cols,
        get_bytes_cols,
        get_geometry_cols,
    )
    from meerschaum.utils.dtypes import (
        are_dtypes_equal,
        coerce_timezone,
        encode_bytes_for_bytea,
        serialize_bytes,
        serialize_decimal,
        serialize_geometry,
        json_serialize_value,
        get_geometry_type_srid,
    )
    from meerschaum.utils.dtypes.sql import (
        PD_TO_SQLALCHEMY_DTYPES_FLAVORS,
        get_db_type_from_pd_type,
        get_pd_type_from_db_type,
        get_numeric_precision_scale,
    )
    from meerschaum.utils.misc import interval_str
    from meerschaum.connectors.sql._create_engine import flavor_configs
    from meerschaum.utils.packages import attempt_import, import_pandas
    sqlalchemy = attempt_import('sqlalchemy', debug=debug, lazy=False)
    pd = import_pandas()
    is_dask = 'dask' in df.__module__

    ### When `safe_copy` is `True`, copy `df` (once) right before its first in-place
    ### modification so that the caller's dataframe is never mutated.
    copied = False

    def _copy_df_if_safe():
        """Replace `df` with a copy the first time it is about to be modified."""
        nonlocal df, copied
        if safe_copy and not copied:
            df = df.copy()
            copied = True

    ### `dtype` may be absent or explicitly `None`.
    kw_dtypes = kw.get('dtype', None) or {}

    bytes_cols = get_bytes_cols(df)
    numeric_cols = get_numeric_cols(df)
    geometry_cols = get_geometry_cols(df)
    ### NOTE: This excludes non-numeric serialized Decimals (e.g. SQLite).
    numeric_cols_dtypes = {
        col: typ
        for col, typ in kw_dtypes.items()
        if (
            col in df.columns
            and 'numeric' in str(typ).lower()
        )
    }
    numeric_cols.extend([col for col in numeric_cols_dtypes if col not in numeric_cols])
    numeric_cols_precisions_scales = {
        col: (
            (typ.precision, typ.scale)
            if hasattr(typ, 'precision')
            else get_numeric_precision_scale(self.flavor)
        )
        for col, typ in numeric_cols_dtypes.items()
    }
    ### The `or` is parenthesized so that only columns present in `df` are considered
    ### (previously a `geography` dtype for a missing column slipped through and raised a `KeyError`).
    geometry_cols_dtypes = {
        col: typ
        for col, typ in kw_dtypes.items()
        if (
            col in df.columns
            and (
                'geometry' in str(typ).lower()
                or 'geography' in str(typ).lower()
            )
        )
    }
    geometry_cols.extend([col for col in geometry_cols_dtypes if col not in geometry_cols])
    geometry_cols_types_srids = {
        col: (typ.geometry_type, typ.srid)
        if hasattr(typ, 'srid')
        else get_geometry_type_srid()
        for col, typ in geometry_cols_dtypes.items()
    }

    cols_pd_types = {
        col: get_pd_type_from_db_type(str(typ))
        for col, typ in kw_dtypes.items()
    }
    cols_pd_types.update({
        col: f'numeric[{precision},{scale}]'
        for col, (precision, scale) in numeric_cols_precisions_scales.items()
        if precision and scale
    })
    cols_db_types = {
        col: get_db_type_from_pd_type(typ, flavor=self.flavor)
        for col, typ in cols_pd_types.items()
    }

    enable_bulk_insert = mrsm.get_config(
        'system', 'connectors', 'sql', 'bulk_insert', self.flavor,
        warn=False,
    ) or False
    stats = {'target': name}
    ### resort to defaults if None
    use_bulk_insert = False
    if method == "":
        if enable_bulk_insert:
            method = (
                functools.partial(mssql_insert_json, cols_types=cols_db_types, debug=debug)
                if self.flavor == 'mssql'
                else functools.partial(psql_insert_copy, debug=debug)
            )
            use_bulk_insert = True
        else:
            ### Should resolve to 'multi' or `None`.
            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')

    if bytes_cols and (use_bulk_insert or self.flavor == 'oracle'):
        _copy_df_if_safe()
        bytes_serializer = (
            functools.partial(encode_bytes_for_bytea, with_prefix=(self.flavor != 'oracle'))
            if self.flavor != 'mssql'
            else serialize_bytes
        )
        for col in bytes_cols:
            df[col] = df[col].apply(bytes_serializer)

    ### Check for numeric columns.
    if numeric_cols:
        _copy_df_if_safe()
    for col in numeric_cols:
        precision, scale = numeric_cols_precisions_scales.get(
            col,
            get_numeric_precision_scale(self.flavor)
        )
        df[col] = df[col].apply(
            functools.partial(
                serialize_decimal,
                quantize=True,
                precision=precision,
                scale=scale,
            )
        )

    geometry_format = 'wkt' if self.flavor == 'mssql' else (
        'gpkg_wkb'
        if self.flavor == 'geopackage'
        else 'wkb_hex'
    )
    if geometry_cols:
        _copy_df_if_safe()
    for col in geometry_cols:
        geometry_type, srid = geometry_cols_types_srids.get(col, get_geometry_type_srid())
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            df[col] = df[col].apply(
                functools.partial(
                    serialize_geometry,
                    geometry_format=geometry_format,
                )
            )

    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)

    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel = 3,
                )
            chunksize = _max_chunks_flavors[self.flavor]
    stats['chunksize'] = chunksize

    success, msg = False, "Default to_sql message"
    start = time.perf_counter()
    if debug:
        msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..."
        print(msg, end="", flush=True)
    stats['num_rows'] = len(df)

    ### Check if the name is too long.
    truncated_name = truncate_item_name(name, self.flavor)
    if name != truncated_name:
        if self.flavor not in ('oracle', 'mysql', 'mariadb'):
            warn(
                f"Table '{name}' is too long for '{self.flavor}',"
                f" will instead create the table '{truncated_name}'."
            )

    ### filter out non-pandas args
    import inspect
    to_sql_params = inspect.signature(df.to_sql).parameters
    to_sql_kw = {}
    for k, v in kw.items():
        if k in to_sql_params:
            to_sql_kw[k] = v

    to_sql_kw.update({
        'name': truncated_name,
        'schema': schema,
        ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI),
        'index': index,
        'if_exists': if_exists,
        'method': method,
        'chunksize': chunksize,
    })
    if is_dask:
        to_sql_kw.update({
            'parallel': True,
        })
    elif _connection is not None:
        to_sql_kw['con'] = _connection

    ### NOTE: The trailing space separates `IF EXISTS` from the table name in the query below.
    if_exists_str = "IF EXISTS " if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
    if self.flavor == 'oracle':
        ### For some reason 'replace' doesn't work properly in pandas,
        ### so try dropping first.
        if if_exists == 'replace' and table_exists(name, self, schema=schema, debug=debug):
            success = self.exec(
                f"DROP TABLE {if_exists_str}" + sql_item_name(name, 'oracle', schema)
            ) is not None
            if not success:
                warn(f"Unable to drop {name}")

        ### Enforce NVARCHAR(2000) as text instead of CLOB.
        ### Copy the dict so the caller's `dtype` (passed through `kw`) isn't mutated.
        dtype = dict(to_sql_kw.get('dtype', None) or {})
        for col, typ in df.dtypes.items():
            if are_dtypes_equal(str(typ), 'object'):
                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
            elif are_dtypes_equal(str(typ), 'int'):
                dtype[col] = sqlalchemy.types.INTEGER
        to_sql_kw['dtype'] = dtype
    elif self.flavor == 'duckdb':
        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
        if dt_cols:
            _copy_df_if_safe()
        for col in dt_cols:
            df[col] = coerce_timezone(df[col], strip_utc=False)
    elif self.flavor == 'mssql':
        ### Copy the dict so the caller's `dtype` (passed through `kw`) isn't mutated.
        dtype = dict(to_sql_kw.get('dtype', None) or {})
        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
        for col in dt_cols:
            ### Explicitly provided dtypes take precedence.
            if col in dtype:
                continue
            dtype[col] = get_db_type_from_pd_type(str(df.dtypes[col]), self.flavor, as_sqlalchemy=True)
        to_sql_kw['dtype'] = dtype

    ### Check for JSON columns.
    if self.flavor not in json_flavors:
        json_cols = get_json_cols(df)
        if json_cols:
            _copy_df_if_safe()
        for col in json_cols:
            df[col] = df[col].apply(
                (
                    lambda x: json.dumps(x, default=json_serialize_value, sort_keys=True)
                    if not isinstance(x, Hashable)
                    else x
                )
            )

    if PD_TO_SQLALCHEMY_DTYPES_FLAVORS['uuid'].get(self.flavor, None) != 'Uuid':
        uuid_cols = get_uuid_cols(df)
        if uuid_cols:
            _copy_df_if_safe()
        for col in uuid_cols:
            df[col] = df[col].astype(str)

    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            df.to_sql(**to_sql_kw)
        success = True
    except Exception:
        if not silent:
            warn(traceback.format_exc())
        success, msg = False, traceback.format_exc()

    end = time.perf_counter()
    if success:
        num_rows = len(df)
        msg = (
            f"It took {interval_str(timedelta(seconds=(end - start)))} "
            + f"to sync {num_rows:,} row"
            + ('s' if num_rows != 1 else '')
            + f" to {name}."
        )
        stats['start'] = start
        stats['end'] = end
        stats['duration'] = end - start

    if debug:
        print(" done.", flush=True)
        dprint(msg)

    stats['success'] = success
    stats['msg'] = msg
    if as_tuple:
        return success, msg
    if as_dict:
        return stats
    return success
Upload a DataFrame's contents to the SQL server.
Parameters
- df (pd.DataFrame): The DataFrame to be inserted.
- name (str): The name of the table to be created.
- index (bool, default False): If True, creates the DataFrame's indices as columns.
- if_exists (str, default 'replace'): Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail'].
- method (str, default ''): `None` or `'multi'`. See `pandas.DataFrame.to_sql` for details.
- chunksize (Optional[int], default -1): How many rows to insert at a time.
- schema (Optional[str], default None):
Optionally override the schema for the table.
Defaults to `SQLConnector.schema`.
- safe_copy (bool, default True):
If `True`, copy the dataframe before making any changes.
- as_tuple (bool, default False):
If `True`, return a `(success_bool, message)` tuple instead of a `bool`. Defaults to `False`.
- as_dict (bool, default False):
If `True`, return a dictionary of transaction information. The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`, `method`, and `target`.
- kw (Any):
Additional arguments will be passed to the DataFrame's `to_sql` function.
Returns
- Either a `bool` or a `SuccessTuple` (depends on `as_tuple`), or a `dict` if `as_dict` is `True`.
def exec_queries(
    self,
    queries: List[
        Union[
            str,
            Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]
        ]
    ],
    break_on_error: bool = False,
    rollback: bool = True,
    silent: bool = False,
    debug: bool = False,
) -> List[Union[sqlalchemy.engine.cursor.CursorResult, None]]:
    """
    Execute a list of queries in a single transaction.

    Parameters
    ----------
    queries: List[Union[str, Tuple[str, Callable[[sqlalchemy.orm.session.Session], List[str]]]]]
        The queries in the transaction to be executed.
        If a query is a tuple, the second item of the tuple is a hook that is called
        with the active session after the query succeeds. Any queries it returns are
        executed (through a nested call to `exec_queries()`, which opens its own session)
        before the next item in the list.

    break_on_error: bool, default False
        If `True`, stop executing when a query fails.

    rollback: bool, default True
        If `break_on_error` is `True`, rollback the transaction if a query fails.

    silent: bool, default False
        If `True`, suppress warnings.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of SQLAlchemy results (`None` for queries that failed).
    When a hook returned queries, that item is a `(result, hook_results)` tuple.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.packages import attempt_import
    sqlalchemy, sqlalchemy_orm = attempt_import('sqlalchemy', 'sqlalchemy.orm', lazy=False)

    results = []
    ### Use the session as a context manager so it is always closed afterwards;
    ### `session.begin()` commits on success and rolls back if an exception escapes.
    with sqlalchemy_orm.Session(self.engine) as session, session.begin():
        for query in queries:
            hook = None
            result = None

            if isinstance(query, tuple):
                query, hook = query
            if isinstance(query, str):
                query = sqlalchemy.text(query)

            if debug:
                dprint(f"[{self}]\n" + str(query))

            try:
                result = session.execute(query)
                session.flush()
            except Exception as e:
                msg = (f"Encountered error while executing:\n{e}")
                if not silent:
                    warn(msg)
                elif debug:
                    dprint(f"[{self}]\n" + str(msg))
                result = None

            if debug:
                dprint(f"[{self}] Finished executing.")

            if result is None and break_on_error:
                if rollback:
                    if debug:
                        dprint(f"[{self}] Rolling back...")
                    session.rollback()
                results.append(result)
                break

            if result is not None and hook is not None:
                ### The hook receives the live session and may return follow-up queries.
                hook_queries = hook(session)
                if hook_queries:
                    hook_results = self.exec_queries(
                        hook_queries,
                        break_on_error=break_on_error,
                        rollback=rollback,
                        silent=silent,
                        debug=debug,
                    )
                    result = (result, hook_results)

            results.append(result)

    return results
Execute a list of queries in a single transaction.
Parameters
- queries (List[Union[str, Tuple[str, Callable[[Session], List[str]]]]]): The queries in the transaction to be executed. If a query is a tuple, the second item of the tuple will be considered a callable hook (called with the session) that returns a list of queries to be executed before the next item in the list.
- break_on_error (bool, default False): If True, stop executing when a query fails.
- rollback (bool, default True): If break_on_error is True, roll back the transaction if a query fails.
- silent (bool, default False): If True, suppress warnings.
Returns
- A list of SQLAlchemy results.
def get_connection(self, rebuild: bool = False) -> 'sqlalchemy.engine.base.Connection':
    """
    Return the current alive connection for the calling thread.

    Connections are cached per thread in `self._thread_connections`,
    keyed by `threading.get_ident()`.

    Parameters
    ----------
    rebuild: bool, default False
        If `True`, close the previous connection and open a new one.

    Returns
    -------
    A `sqlalchemy.engine.base.Connection` object.
    """
    import threading

    self.__dict__.setdefault('_thread_connections', {})

    ### Let the connector tidy its cache before this thread's entry is read.
    self._cleanup_connections()

    ident = threading.get_ident()
    pool = self.__dict__.get('_thread_connections', {})
    conn = pool.get(ident, None)

    if conn is not None and rebuild:
        try:
            conn.close()
        except Exception:
            pass
        pool.pop(ident, None)
        conn = None

    needs_new = conn is None or conn.closed
    if needs_new:
        conn = self.engine.connect()
        pool[ident] = conn

    return conn
Return the current alive connection.
Parameters
- rebuild (bool, default False): If True, close the previous connection and open a new one.
Returns
- A sqlalchemy.engine.base.Connection object.
def test_connection(
    self,
    **kw: Any
) -> Union[bool, None]:
    """
    Test if a successful connection to the database may be made.

    Parameters
    ----------
    **kw:
        Keyword arguments passed to `meerschaum.connectors.poll.retry_connect`.
        They override the defaults: one attempt (`max_retries=1`), no wait
        (`retry_wait=0`), no warnings (`warn=False`), and `connector=self`.

    Returns
    -------
    `True` if a connection is made, otherwise `False` or `None` in case of failure.
    """
    import warnings
    from meerschaum.connectors.poll import retry_connect

    retry_kwargs = {
        'max_retries': 1,
        'retry_wait': 0,
        'warn': False,
        'connector': self,
        **kw,
    }
    with warnings.catch_warnings():
        ### Hide warnings whose message begins with "Could not" while probing.
        warnings.filterwarnings('ignore', 'Could not')
        try:
            return retry_connect(**retry_kwargs)
        except Exception:
            return False
Test if a successful connection to the database may be made.
Parameters
- **kw: The keyword arguments are passed to meerschaum.connectors.poll.retry_connect.
Returns
True if a connection is made, otherwise False or None in case of failure.
def fetch(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, str, None] = '',
    end: Union[datetime, int, str, None] = None,
    check_existing: bool = True,
    chunksize: Optional[int] = -1,
    workers: Optional[int] = None,
    debug: bool = False,
    **kw: Any
) -> Union['pd.DataFrame', List[Any], None]:
    """
    Execute the pipe's SQL definition and return its results as DataFrame chunks.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe object which contains the `fetch` metadata.

        - pipe.columns['datetime']: str
            - Name of the datetime column for the remote table.
        - pipe.parameters['fetch']: Dict[str, Any]
            - Parameters necessary to execute a query.
        - pipe.parameters['fetch']['definition']: str
            - Raw SQL query to execute to generate the pandas DataFrame.
        - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
            - How many minutes before `begin` to search for data (*optional*).

    begin: Union[datetime, int, str, None], default ''
        Most recent datetime to search for data.
        If left as `''`, the bound is derived from the pipe's sync time
        (see `get_pipe_metadef()`); `None` means no lower bound.
        If `backtrack_minutes` is provided, subtract `backtrack_minutes`.

    end: Union[datetime, int, str, None], default None
        The latest datetime to search for data.
        If `end` is `None`, the search is not bounded from above.

    check_existing: bool, default True
        If `False`, use a backtrack interval of 0 minutes.

    chunksize: Optional[int], default -1
        How many rows to load into memory at once (passed to `self.read()`).

    workers: Optional[int], default None
        How many threads to use when consuming the generator (passed to `self.read()`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An iterator of pandas DataFrame chunks, as returned by
    `self.read(..., as_iterator=True)`.
    """
    meta_def = self.get_pipe_metadef(
        pipe,
        begin=begin,
        end=end,
        check_existing=check_existing,
        debug=debug,
        **kw
    )
    chunks = self.read(
        meta_def,
        chunksize=chunksize,
        workers=workers,
        as_iterator=True,
        debug=debug,
    )
    return chunks
Execute the SQL definition and return its results as a generator of Pandas DataFrame chunks.
Parameters
- pipe (mrsm.Pipe): The pipe object which contains the fetch metadata.
  - pipe.columns['datetime'] (str): Name of the datetime column for the remote table.
  - pipe.parameters['fetch'] (Dict[str, Any]): Parameters necessary to execute a query.
  - pipe.parameters['fetch']['definition'] (str): Raw SQL query to execute to generate the pandas DataFrame.
  - pipe.parameters['fetch']['backtrack_minutes'] (Union[int, float]): How many minutes before begin to search for data (optional).
- begin (Union[datetime, int, str, None], default ''): Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
- end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, the search is not bounded from above.
- check_existing (bool, default True): If False, use a backtrack interval of 0 minutes.
- chunksize (Optional[int], default -1): How many rows to load into memory at once. Otherwise the entire result set is loaded into memory.
- workers (Optional[int], default None): How many threads to use when consuming the generator. Defaults to the number of cores.
- debug (bool, default False): Verbosity toggle.
Returns
- A pandas DataFrame generator.
def get_pipe_metadef(
    self,
    pipe: mrsm.Pipe,
    params: Optional[Dict[str, Any]] = None,
    begin: Union[datetime, int, str, None] = '',
    end: Union[datetime, int, str, None] = None,
    check_existing: bool = True,
    debug: bool = False,
    **kw: Any
) -> Union[str, None]:
    """
    Return a pipe's meta definition fetch query.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose SQL definition is wrapped and bounded.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.

    begin: Union[datetime, int, str, None], default ''
        Most recent datetime to search for data.
        If `''` (the default), the pipe's sync time is used, and the backtrack
        interval is subtracted when `check_existing` is `True`.
        If `None` (or if `begin >= end`), no lower bound is applied.

    end: Union[datetime, int, str, None], default None
        The latest datetime to search for data (exclusive).
        If `end` is `None`, no upper bound is applied.

    check_existing: bool, default True
        If `True`, apply the backtrack interval.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pipe's meta definition fetch query string.

    Raises
    ------
    ValueError
        If no SQL definition could be found for the pipe.
    """
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.sql import (
        sql_item_name,
        dateadd_str,
        build_where,
        wrap_query_with_cte,
        format_cte_subquery,
    )
    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
    from meerschaum.config import get_config
    from meerschaum.utils.dtypes import (
        get_current_timestamp,
        MRSM_PRECISION_UNITS_SCALARS,
        MRSM_PRECISION_UNITS_ALIASES,
    )

    ### A parent's datetime column (and its dtype and precision) takes precedence.
    parent = pipe.parent
    parent_dt_col = parent.columns.get('datetime', None) if parent is not None else None
    parent_dt_typ = parent.dtypes.get(parent_dt_col, 'datetime') if parent_dt_col else None
    dt_col = parent_dt_col or pipe.columns.get('datetime', None)
    dt_typ = parent_dt_typ or pipe.dtypes.get(dt_col, 'datetime')
    db_dt_typ = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None
    precision = parent.precision if parent_dt_typ else pipe.precision
    if not dt_col:
        dt_col = pipe.guess_datetime()
        dt_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None
        is_guess = True
    else:
        dt_name = sql_item_name(dt_col, self.flavor, None)
        is_guess = False

    if begin not in (None, '') or end is not None:
        if is_guess:
            if dt_col is None:
                warn(
                    f"Unable to determine a datetime column for {pipe}."
                    + "\n Ignoring begin and end...",
                    stack=False,
                )
                begin, end = '', None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f" Using column \"{dt_col}\" for datetime bounds...",
                    stack=False
                )

    ### The backtrack only applies when `begin` is derived from the sync time.
    apply_backtrack = begin == '' and check_existing
    backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug)
    btm = (
        int(backtrack_interval.total_seconds() / 60)
        if isinstance(backtrack_interval, timedelta)
        else backtrack_interval
    )
    begin = (
        pipe.get_sync_time(debug=debug)
        if begin == ''
        else begin
    )

    ### Integer datetime columns: express bounds and backtrack in the column's precision units.
    if 'int' in dt_typ.lower():
        precision_unit = precision.get('unit', 'second')
        if isinstance(begin, datetime):
            begin = get_current_timestamp(precision_unit, _now=begin, as_int=True)
        if isinstance(end, datetime):
            end = get_current_timestamp(precision_unit, _now=end, as_int=True)

        if isinstance(backtrack_interval, timedelta):
            true_unit = MRSM_PRECISION_UNITS_ALIASES.get(precision_unit, precision_unit)
            btm = int(backtrack_interval.total_seconds() * MRSM_PRECISION_UNITS_SCALARS[true_unit])

    if begin not in (None, '') and end is not None and begin >= end:
        begin = None

    begin_da, end_da = None, None
    if dt_name:
        begin_da = (
            dateadd_str(
                flavor=self.flavor,
                datepart=('minute' if not isinstance(begin, int) else None),
                number=((-1 * btm) if apply_backtrack else 0),
                begin=begin,
                db_type=db_dt_typ,
            )
            if begin not in ('', None)
            else None
        )
        end_da = (
            dateadd_str(
                flavor=self.flavor,
                datepart=('minute' if not isinstance(end, int) else None),
                number=0,
                begin=end,
                db_type=db_dt_typ,
            )
            if end is not None
            else None
        )

    definition_name = sql_item_name('definition', self.flavor, None)
    definition = get_pipe_query(pipe)
    if definition is None:
        raise ValueError(f"No SQL definition could be found for {pipe}.")

    ### The experimental join-fetch path builds its own query from the pipe and ignores
    ### `definition`, so a pushed-down predicate would silently be lost there.
    use_join_fetch = bool(
        (pipe.columns or {}).get('id', None)
        and get_config('system', 'experimental', 'join_fetch')
    )

    ### Attempt to push down the predicate if possible.
    ### When the parent defines a datetime column, `dt_name`, `db_dt_typ`, and `precision`
    ### above were all taken from the parent, so `begin_da` / `end_da` are already the
    ### parent-side bounds. Reusing them avoids round-tripping integer bounds through
    ### `datetime`, which lost sub-microsecond precision.
    handled_bounding = False
    if not use_join_fetch and parent_dt_col and (begin_da or end_da):
        parent_target = parent.target
        parent_schema = (
            parent.instance_connector.get_pipe_schema(parent)
            if parent.instance_connector.type == 'sql'
            else None
        )
        parent_item_name = sql_item_name(parent_target, self.flavor, None)
        parent_item_name_full = sql_item_name(parent_target, self.flavor, parent_schema)

        # Simple string search for parent target in the original definition.
        if (
            parent_item_name in definition
            or parent_item_name_full in definition
            or parent_target in definition
        ):
            pushdown_cte_name = sql_item_name('_mrsm_pushdown', self.flavor, None)
            pushdown_where = ""
            if begin_da:
                pushdown_where += f"\n {dt_name} >= {begin_da}"
            if begin_da and end_da:
                pushdown_where += "\n AND"
            if end_da:
                pushdown_where += f"\n {dt_name} < {end_da}"

            pushdown_query = (
                f"SELECT *\nFROM {parent_item_name_full}"
                + f"\nWHERE {pushdown_where}"
            )

            # Replace occurrences of parent target with pushdown CTE in the definition body.
            parent_found = False
            new_definition_body = definition
            for pattern in (parent_item_name_full, parent_item_name, parent_target):
                if pattern in new_definition_body:
                    new_definition_body = new_definition_body.replace(pattern, pushdown_cte_name)
                    parent_found = True

            if parent_found:
                definition = wrap_query_with_cte(
                    pushdown_query,
                    new_definition_body,
                    self.flavor,
                    cte_name='_mrsm_pushdown',
                )
                handled_bounding = True

    meta_def = (
        _join_fetch_query(pipe, self.flavor, debug=debug, **kw)
        if use_join_fetch
        else format_cte_subquery(definition, self.flavor, 'definition')
    )

    ### Only look for an existing WHERE after the final reference to `definition`.
    has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):]
    if dt_name and (begin_da or end_da) and not handled_bounding:
        definition_dt_name = f"{definition_name}.{dt_name}"
        meta_def += "\n" + ("AND" if has_where else "WHERE") + " "
        has_where = True
        if begin_da:
            meta_def += f"\n {definition_dt_name}\n >=\n {begin_da}\n"
        if begin_da and end_da:
            meta_def += " AND"
        if end_da:
            meta_def += f"\n {definition_dt_name}\n <\n {end_da}\n"

    if params is not None:
        params_where = build_where(params, self, with_where=False)
        meta_def += "\n " + ("AND" if has_where else "WHERE") + " "
        has_where = True
        meta_def += params_where

    return meta_def.rstrip()
Return a pipe's meta definition fetch query.
params: Optional[Dict[str, Any]], default None
Optional params dictionary to build the WHERE clause.
See meerschaum.utils.sql.build_where.
begin: Union[datetime, int, str, None], default ''
Most recent datetime to search for data.
If backtrack_minutes is provided, subtract backtrack_minutes.
end: Union[datetime, int, str, None], default None
The latest datetime to search for data.
If end is None, the search is not bounded from above.
check_existing: bool, default True
If True, apply the backtrack interval.
debug: bool, default False
Verbosity toggle.
Returns
- A pipe's meta definition fetch query string.
def cli(
    self,
    debug: bool = False,
) -> SuccessTuple:
    """
    Launch a subprocess for an interactive CLI.

    The connector's `meta` is passed to the subprocess as JSON in the
    `MRSM_SQL_<LABEL>` environment variable. The subprocess fetches the
    connector by its keys and calls `_cli_exit()` on it.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle; also prints the generated subprocess code.

    Returns
    -------
    A `SuccessTuple`, which is `False` only if launching the subprocess raised.
    """
    from meerschaum.utils.warnings import dprint
    from meerschaum.utils.venv import venv_exec

    ### Initialize the engine so that dependencies are resolved.
    _ = self.engine

    env = copy.deepcopy(dict(os.environ))
    env_key = f"MRSM_SQL_{self.label.upper()}"
    env_val = json.dumps(self.meta)
    env[env_key] = env_val

    ### Embed the connector keys as a Python string literal via `repr` so a label
    ### containing quotes or backslashes cannot break or alter the generated code.
    conn_keys = f"sql:{self.label}"
    cli_code = (
        "import sys\n"
        "import meerschaum as mrsm\n"
        "import os\n"
        f"conn = mrsm.get_connector({conn_keys!r})\n"
        "success, msg = conn._cli_exit()\n"
        "mrsm.pprint((success, msg))\n"
        "if not success:\n"
        " raise Exception(msg)"
    )
    if debug:
        dprint(cli_code)
    try:
        _ = venv_exec(cli_code, venv=None, env=env, debug=debug, capture_output=False)
    except Exception as e:
        return False, f"[{self}] Failed to start CLI:\n{e}"
    return True, "Success"
Launch a subprocess for an interactive CLI.
def get_pipe_size(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> Union[int, None]:
    """
    Return the on-disk size of a pipe's target table in bytes.

    For TimescaleDB hypertables, the total hypertable size (including chunks and indexes)
    is returned. Other flavors use their native size functions where available.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table size to measure.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` of the number of bytes occupied by the target table,
    or `None` if the pipe does not exist, the flavor has no size query,
    or the size could not be determined.
    """
    from meerschaum.utils.sql import sql_item_name, hypertable_queries

    if not pipe.exists(debug=debug):
        return None

    flavor = self.flavor
    schema = self.get_pipe_schema(pipe)
    pipe_name = sql_item_name(pipe.target, flavor, schema)
    ### `pipe_name` is a quoted identifier; escape single quotes before embedding it
    ### inside a SQL string literal (e.g. a `regclass` argument or `OBJECT_ID('...')`).
    pipe_name_literal = pipe_name.replace("'", "''")

    def _value(query: str) -> Union[int, None]:
        """Run a scalar query, returning an `int` or `None` on any failure."""
        try:
            result = self.value(query, silent=True, debug=debug)
            return int(result) if result is not None else None
        except Exception:
            return None

    ### TimescaleDB / Citus expose dedicated size functions for distributed tables.
    ### NOTE(review): if this template embeds `{table_name}` inside a string literal,
    ### it may also need the escaped form — confirm against `hypertable_queries`.
    if flavor in hypertable_queries:
        size = _value(hypertable_queries[flavor].format(table_name=pipe_name))
        if size is not None:
            return size

    if flavor in ('timescaledb', 'timescaledb-ha', 'postgresql', 'postgis', 'citus'):
        ### `pg_partition_tree` sums the parent plus every child partition (a partitioned parent
        ### holds no rows itself); it returns the single relation for non-partitioned tables.
        size = _value(
            "SELECT SUM(pg_total_relation_size(relid))\n"
            f"FROM pg_partition_tree('{pipe_name_literal}')"
        )
        if size is not None:
            return size
        return _value(f"SELECT pg_total_relation_size('{pipe_name_literal}')")

    if flavor == 'cockroachdb':
        return _value(f"SELECT pg_total_relation_size('{pipe_name_literal}')")

    if flavor in ('mysql', 'mariadb'):
        ### A MySQL/MariaDB "schema" is a database; honor a pipe's configured schema so the size
        ### lookup matches the database the table actually lives in.
        db_name = (
            schema
            or self.database
            or self.parse_uri(self.URI).get('database', None)
        )
        if not db_name:
            return None
        clean_db = db_name.replace("'", "''")
        clean_target = pipe.target.replace("'", "''")
        return _value(
            "SELECT data_length + index_length\n"
            "FROM information_schema.tables\n"
            f"WHERE table_schema = '{clean_db}' AND table_name = '{clean_target}'"
        )

    if flavor == 'mssql':
        return _value(
            "SELECT SUM(reserved_page_count) * 8192\n"
            "FROM sys.dm_db_partition_stats\n"
            f"WHERE object_id = OBJECT_ID('{pipe_name_literal}')"
        )

    if flavor in ('sqlite', 'geopackage'):
        clean_target = pipe.target.replace("'", "''")
        ### `dbstat` is only available when SQLite is compiled with SQLITE_ENABLE_DBSTAT_VTAB.
        return _value(f"SELECT SUM(pgsize) FROM dbstat WHERE name = '{clean_target}'")

    ### duckdb, oracle, and unknown flavors have no portable per-table size query.
    return None
Return the on-disk size of a pipe's target table in bytes.
For TimescaleDB hypertables, the total hypertable size (including chunks and indexes) is returned. Other flavors use their native size functions where available.
Parameters
- pipe (mrsm.Pipe): The pipe whose target table size to measure.
- debug (bool, default False): Verbosity toggle.
Returns
- An int of the number of bytes occupied by the target table, or None if the size could not be determined.
def compress_pipe(
    self,
    pipe: mrsm.Pipe,
    no_policy: bool = False,
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Compress a pipe's target table to reduce disk usage.

    For TimescaleDB, enables the Hypercore columnstore, installs a columnstore (compression)
    policy (so future synced chunks are converted automatically), and converts any existing
    uncompressed chunks now. For MySQL/MariaDB and MSSQL, applies the flavor's native table
    compression. Other flavors are unsupported.

    Each group of queries runs in its own transaction; execution stops at the first
    group that fails.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table to compress.

    no_policy: bool, default False
        If `True` (TimescaleDB only), compress existing chunks now without installing an ongoing
        columnstore (compression) policy. Any pre-existing policy is left untouched.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success, including the amount of disk reclaimed
    when the table size can be measured before and after.
    """
    from meerschaum.utils.sql import sql_item_name
    from meerschaum.utils.formatting import format_bytes

    if not pipe.exists(debug=debug):
        return False, f"{pipe} does not exist; nothing to compress."

    flavor = self.flavor
    if flavor not in COMPRESSIBLE_FLAVORS:
        return False, f"Compression is not supported for flavor '{flavor}'."

    pipe_name = sql_item_name(pipe.target, flavor, self.get_pipe_schema(pipe))
    ### `pipe_name` is embedded inside a SQL string literal below (`show_chunks('...')`),
    ### so escape any single quotes in the quoted identifier.
    pipe_name_literal = pipe_name.replace("'", "''")
    size_before = pipe.get_size(debug=debug)

    ### Each group is run in its own transaction. TimescaleDB requires enabling the columnstore
    ### and adding its policy in separate transactions (see timescale/timescaledb#8600).
    query_groups: List[List[str]] = []
    if flavor in ('timescaledb', 'timescaledb-ha'):
        if not self._is_hypertable(pipe, debug=debug):
            return False, _not_a_hypertable_message(pipe)
        ### 1. Enable the columnstore (required before any chunk can be converted).
        query_groups.append([self._get_columnstore_settings_query(pipe)])
        ### 2. Install a policy for ongoing conversion — re-create it so the configured `after`
        ### wins over any existing (e.g. auto-created) policy. Skipped entirely with `no_policy`,
        ### which compresses existing chunks now but leaves any pre-existing policy untouched.
        if not no_policy:
            query_groups.append([self._get_columnstore_remove_policy_query(pipe)])
            query_groups.append([self._get_columnstore_policy_query(pipe)])
        ### 3. Convert any existing uncompressed chunks now. `compress_chunk` is the still-supported
        ### function form of `convert_to_columnstore` (transaction-safe, unlike the `CALL` form).
        query_groups.append([
            f"SELECT compress_chunk(c, if_not_compressed => true) "
            f"FROM show_chunks('{pipe_name_literal}') c"
        ])
    elif flavor in ('mysql', 'mariadb'):
        query_groups.append([f"ALTER TABLE {pipe_name} ROW_FORMAT=COMPRESSED"])
    elif flavor == 'mssql':
        query_groups.append([
            f"ALTER TABLE {pipe_name} REBUILD PARTITION = ALL "
            "WITH (DATA_COMPRESSION = PAGE)"
        ])

    ### Guard against a flavor listed as compressible without a branch above;
    ### `all([])` would otherwise report success without running anything.
    if not query_groups:
        return False, f"Compression is not supported for flavor '{flavor}'."

    try:
        success = all(
            all(self.exec_queries(
                group, break_on_error=True, rollback=True, silent=(not debug), debug=debug,
            ))
            for group in query_groups
        )
    except Exception as e:
        return False, f"Failed to compress {pipe}:\n{e}"

    if not success:
        return False, f"Failed to compress {pipe}."

    pipe._clear_cache_key('_exists', debug=debug)
    size_after = pipe.get_size(debug=debug)

    ### Fall back to a generic message when either size is unavailable.
    reclaimed_msg = f"Compressed {pipe}."
    if size_before is not None and size_after is not None:
        reclaimed = size_before - size_after
        change_str = f"{format_bytes(size_before)} to {format_bytes(size_after)}"
        if reclaimed > 0:
            reclaimed_msg = f"Reclaimed {format_bytes(reclaimed)} ({change_str})."
        elif reclaimed < 0:
            ### On small tables, compression overhead can exceed the savings.
            reclaimed_msg = f"Size grew by {format_bytes(-reclaimed)} ({change_str})."
        else:
            reclaimed_msg = f"Size unchanged ({format_bytes(size_before)})."

    return True, reclaimed_msg
Compress a pipe's target table to reduce disk usage.
For TimescaleDB, enables the Hypercore columnstore, installs a columnstore (compression) policy (so future synced chunks are converted automatically), and converts any existing uncompressed chunks now. For MySQL/MariaDB and MSSQL, applies the flavor's native table compression. Other flavors are unsupported.
Parameters
- pipe (mrsm.Pipe): The pipe whose target table to compress.
- no_policy (bool, default False): If True (TimescaleDB only), compress existing chunks now without installing an ongoing columnstore (compression) policy. Any pre-existing policy is left untouched.
- debug (bool, default False): Verbosity toggle.
Returns
- A SuccessTuple indicating success, including the amount of disk reclaimed.
def decompress_pipe(
    self,
    pipe: mrsm.Pipe,
    no_policy: bool = False,
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Decompress a pipe's target table, the inverse of `compress_pipe()`.

    For TimescaleDB, removes the columnstore (compression) policy, converts every compressed
    chunk back to row-store, and disables the columnstore so future synced chunks stay
    uncompressed. For MySQL/MariaDB and MSSQL, reverts the flavor's native table compression.
    Other flavors are unsupported.

    Each group of queries runs in its own transaction; execution stops at the first
    group that fails.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table to decompress.

    no_policy: bool, default False
        If `True` (TimescaleDB only), decompress existing chunks now but leave the columnstore
        (compression) policy in place — chunks will be recompressed on the policy's schedule.
        Useful to temporarily decompress for a bulk backfill without disabling compression.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success, including the change in disk size
    when the table size can be measured before and after.
    """
    from meerschaum.utils.sql import sql_item_name
    from meerschaum.utils.formatting import format_bytes

    if not pipe.exists(debug=debug):
        return False, f"{pipe} does not exist; nothing to decompress."

    flavor = self.flavor
    if flavor not in COMPRESSIBLE_FLAVORS:
        return False, f"Decompression is not supported for flavor '{flavor}'."

    pipe_name = sql_item_name(pipe.target, flavor, self.get_pipe_schema(pipe))
    ### `pipe_name` is embedded inside a SQL string literal below (`show_chunks('...')`),
    ### so escape any single quotes in the quoted identifier.
    pipe_name_literal = pipe_name.replace("'", "''")
    size_before = pipe.get_size(debug=debug)

    ### Each group is run in its own transaction.
    query_groups: List[List[str]] = []
    if flavor in ('timescaledb', 'timescaledb-ha'):
        if not self._is_hypertable(pipe, debug=debug):
            return False, _not_a_hypertable_message(pipe)
        ### 1. Remove the ongoing policy so chunks aren't recompressed. Skipped with `no_policy`,
        ### which decompresses existing chunks now but leaves the policy (e.g. for a backfill).
        if not no_policy:
            query_groups.append([self._get_columnstore_remove_policy_query(pipe)])
        ### 2. Convert every compressed chunk back to row-store. `decompress_chunk` is the function
        ### form (transaction-safe, unlike the `CALL` form of `convert_to_rowstore`).
        query_groups.append([
            f"SELECT decompress_chunk(c, if_compressed => true) "
            f"FROM show_chunks('{pipe_name_literal}') c"
        ])
        ### 3. Disable the columnstore so future synced chunks stay uncompressed. Only valid once
        ### no compressed chunks remain, and only sensible when the policy is also gone.
        if not no_policy:
            query_groups.append([self._get_columnstore_disable_query(pipe)])
    elif flavor in ('mysql', 'mariadb'):
        query_groups.append([f"ALTER TABLE {pipe_name} ROW_FORMAT=DYNAMIC"])
    elif flavor == 'mssql':
        query_groups.append([
            f"ALTER TABLE {pipe_name} REBUILD PARTITION = ALL "
            "WITH (DATA_COMPRESSION = NONE)"
        ])

    ### Guard against a flavor listed as compressible without a branch above;
    ### `all([])` would otherwise report success without running anything.
    if not query_groups:
        return False, f"Decompression is not supported for flavor '{flavor}'."

    try:
        success = all(
            all(self.exec_queries(
                group, break_on_error=True, rollback=True, silent=(not debug), debug=debug,
            ))
            for group in query_groups
        )
    except Exception as e:
        return False, f"Failed to decompress {pipe}:\n{e}"

    if not success:
        return False, f"Failed to decompress {pipe}."

    pipe._clear_cache_key('_exists', debug=debug)
    size_after = pipe.get_size(debug=debug)

    ### Fall back to a generic message when either size is unavailable.
    change_msg = f"Decompressed {pipe}."
    if size_before is not None and size_after is not None:
        added = size_after - size_before
        change_str = f"{format_bytes(size_before)} to {format_bytes(size_after)}"
        if added > 0:
            change_msg = f"Expanded by {format_bytes(added)} ({change_str})."
        elif added < 0:
            change_msg = f"Shrank by {format_bytes(-added)} ({change_str})."
        else:
            change_msg = f"Size unchanged ({format_bytes(size_before)})."

    return True, change_msg
Decompress a pipe's target table, the inverse of compress_pipe().
For TimescaleDB, removes the columnstore (compression) policy, converts every compressed chunk back to row-store, and disables the columnstore so future synced chunks stay uncompressed. For MySQL/MariaDB and MSSQL, reverts the flavor's native table compression. Other flavors are unsupported.
Parameters
- pipe (mrsm.Pipe): The pipe whose target table to decompress.
- no_policy (bool, default False): If True (TimescaleDB only), decompress existing chunks now but leave the columnstore (compression) policy in place — chunks will be recompressed on the policy's schedule. Useful to temporarily decompress for a bulk backfill without disabling compression.
- debug (bool, default False): Verbosity toggle.
Returns
- A SuccessTuple indicating success, including the change in disk size.
def apply_compression_policy(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Idempotently enable compression and install a compression policy for a pipe.

    Meant to run automatically (e.g. after a sync) for pipes whose
    `parameters['compress']` is truthy. Only TimescaleDB hypertables are touched;
    every other flavor returns a no-op success.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table should have a compression policy.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success. Exceptions raised while checking the table
    or running the queries are caught and reported as a failed `SuccessTuple`.
    """
    if self.flavor not in ('timescaledb', 'timescaledb-ha'):
        return True, "Compression policies are only supported for TimescaleDB."

    if not pipe.parameters.get('compress', False):
        return True, "Compression is not enabled for this pipe."

    def _run_alone(query: str) -> bool:
        ### One `exec_queries` call per query, so each runs in its own transaction:
        ### the columnstore must be enabled before the policy is added
        ### (see timescale/timescaledb#8600).
        return all(self.exec_queries(
            [query],
            break_on_error=True, rollback=True, silent=True, debug=debug,
        ))

    try:
        is_candidate = pipe.exists(debug=debug) and self._is_hypertable(pipe, debug=debug)
        if not is_candidate:
            return True, f"{pipe} is not a hypertable; skipping compression policy."

        ### Both steps are attempted before either result is checked.
        settings_ok = _run_alone(self._get_columnstore_settings_query(pipe))
        policy_ok = _run_alone(self._get_columnstore_policy_query(pipe))
        if not settings_ok or not policy_ok:
            return False, f"Failed to apply a compression policy to {pipe}."
    except Exception as e:
        msg = f"Failed to apply a compression policy to {pipe}:\n{e}"
        if debug:
            dprint(msg)
        return False, msg

    return True, f"Applied a compression policy to {pipe}."
Idempotently enable compression and install a compression policy for a pipe.
Intended to be called automatically (e.g. after a sync) when pipe.compress is set.
Only TimescaleDB hypertables are affected; all other flavors are a no-op success.
Failures are non-fatal and never raise.
Parameters
- pipe (mrsm.Pipe): The pipe whose target table should have a compression policy.
Returns
- A `SuccessTuple` indicating success.
def vacuum_pipe(
    self,
    pipe: mrsm.Pipe,
    full: bool = False,
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Reclaim disk space held by dead tuples in a pipe's target table.

    The exact statements come from `self._get_vacuum_queries()`: PostgreSQL-family tables run
    `VACUUM` (or `VACUUM FULL`), TimescaleDB hypertables recurse into their chunks,
    MySQL/MariaDB run `OPTIMIZE TABLE`, MSSQL rebuilds the table, and SQLite vacuums the
    whole database file. Flavors in `_AUTOCOMMIT_VACUUM_FLAVORS` run outside a transaction.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table should be vacuumed.

    full: bool, default False
        If `True` (PostgreSQL family only), run `VACUUM FULL`, which rewrites the table and
        returns freed space to the operating system at the cost of an exclusive lock.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success. On success the message reports the size change
    when both the before and after sizes are known.
    """
    from meerschaum.utils.sql import sql_item_name
    from meerschaum.utils.formatting import format_bytes

    if not pipe.exists(debug=debug):
        return False, f"{pipe} does not exist; nothing to vacuum."

    flavor = self.flavor
    unsupported_msg = f"Vacuuming is not supported for flavor '{flavor}'."
    if flavor not in VACUUMABLE_FLAVORS:
        return False, unsupported_msg

    target_name = sql_item_name(pipe.target, flavor, self.get_pipe_schema(pipe))
    vacuum_queries = self._get_vacuum_queries(pipe, target_name, full=full)
    if not vacuum_queries:
        return False, unsupported_msg

    bytes_before = pipe.get_size(debug=debug)
    quiet = not debug

    try:
        if flavor in _AUTOCOMMIT_VACUUM_FLAVORS:
            vacuumed = self._run_in_autocommit(vacuum_queries, silent=quiet, debug=debug)
        else:
            vacuumed = all(self.exec_queries(
                vacuum_queries,
                break_on_error=True,
                rollback=True,
                silent=quiet,
                debug=debug,
            ))
    except Exception as e:
        return False, f"Failed to vacuum {pipe}:\n{e}"

    if not vacuumed:
        return False, f"Failed to vacuum {pipe}."

    pipe._clear_cache_key('_exists', debug=debug)
    bytes_after = pipe.get_size(debug=debug)

    if bytes_before is None or bytes_after is None:
        return True, f"Vacuumed {pipe}."

    delta = bytes_before - bytes_after
    change_str = f"{format_bytes(bytes_before)} to {format_bytes(bytes_after)}"
    if delta > 0:
        return True, f"Reclaimed {format_bytes(delta)} ({change_str})."
    if delta < 0:
        return True, f"Size grew by {format_bytes(-delta)} ({change_str})."
    return True, f"Size unchanged ({format_bytes(bytes_before)})."
Reclaim dead-tuple disk space from a pipe's target table.
PostgreSQL-family tables run VACUUM (optionally VACUUM FULL); TimescaleDB hypertables
recurse into their chunks; MySQL/MariaDB run OPTIMIZE TABLE; MSSQL rebuilds the table;
SQLite vacuums the whole database file.
Parameters
- pipe (mrsm.Pipe): The pipe whose target table to vacuum.
- full (bool, default False): If `True` (PostgreSQL family only), run `VACUUM FULL`, which rewrites the table and returns freed space to the operating system at the cost of an exclusive lock.
- debug (bool, default False): Verbosity toggle.
Returns
- A `SuccessTuple` indicating success, including the amount of disk space reclaimed.
def analyze_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Refresh the query planner's statistics for a pipe's target table.

    No disk space is reclaimed; the goal is better query plans after large syncs.
    The statement comes from `self._get_analyze_query()`: PostgreSQL/SQLite run `ANALYZE`,
    MySQL/MariaDB run `ANALYZE TABLE`, and MSSQL runs `UPDATE STATISTICS`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table should be analyzed.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    from meerschaum.utils.sql import sql_item_name

    if not pipe.exists(debug=debug):
        return False, f"{pipe} does not exist; nothing to analyze."

    flavor = self.flavor
    unsupported = (False, f"Analyzing is not supported for flavor '{flavor}'.")
    if flavor not in ANALYZABLE_FLAVORS:
        return unsupported

    qualified_name = sql_item_name(pipe.target, flavor, self.get_pipe_schema(pipe))
    analyze_query = self._get_analyze_query(pipe, qualified_name)
    if not analyze_query:
        return unsupported

    try:
        results = self.exec_queries(
            [analyze_query],
            break_on_error=True,
            rollback=True,
            silent=(not debug),
            debug=debug,
        )
        analyzed = all(results)
    except Exception as e:
        return False, f"Failed to analyze {pipe}:\n{e}"

    if analyzed:
        return True, f"Analyzed {pipe}."
    return False, f"Failed to analyze {pipe}."
Refresh the database planner's statistics for a pipe's target table.
This does not reclaim disk space; it helps the query planner choose better plans after
large syncs. PostgreSQL/SQLite run ANALYZE, MySQL/MariaDB run ANALYZE TABLE, and MSSQL
runs UPDATE STATISTICS.
Parameters
- pipe (mrsm.Pipe): The pipe whose target table to analyze.
- debug (bool, default False): Verbosity toggle.
Returns
- A `SuccessTuple` indicating success.
def get_partition_info(self, pipe: mrsm.Pipe, debug: bool = False) -> dict:
    """
    Summarize how a pipe's target table is partitioned (used by `show partitions`).

    The returned dictionary always has these keys:
    - `flavor`: the connector flavor.
    - `partitioned`: whether the table is range-partitioned (native) or a TimescaleDB hypertable.
    - `count`: the number of partitions / chunks (`None` if unknown).
    - `interval`: the physical partition width (`timedelta`, epoch-`int`, or `None`).

    A missing table, or an error while checking for it, yields the unpartitioned defaults.
    """
    flavor = self.flavor
    summary = {'flavor': flavor, 'partitioned': False, 'count': None, 'interval': None}

    try:
        target_exists = pipe.exists(debug=debug)
    except Exception:
        target_exists = False
    if not target_exists:
        return summary

    if flavor in _TIMESCALEDB_FLAVORS:
        if not self._is_hypertable(pipe, debug=debug):
            return summary
        partition_count = self._get_chunk_count_timescaledb(pipe, debug=debug)
    else:
        if not self._should_partition(pipe):
            return summary
        ### Trust the table's actual state over the `hypertable` flag: a plain table created
        ### before partitioning was enabled has no partitions and is reported as unpartitioned.
        partition_count = self._get_partition_count(pipe, debug=debug)
        if not partition_count:
            return summary

    summary.update(
        partitioned=True,
        count=partition_count,
        interval=pipe.get_chunk_interval(debug=debug),
    )
    return summary
Return a summary of a pipe's target table partitioning for `show partitions`.
Keys:
- `flavor`: the connector flavor.
- `partitioned`: whether the table is range-partitioned (native) or a TimescaleDB hypertable.
- `count`: the number of partitions / chunks (`None` if unknown).
- `interval`: the physical partition width (`timedelta`, epoch-`int`, or `None`).
def partition_pipe(
    self,
    pipe: mrsm.Pipe,
    chunk_minutes: Optional[int] = None,
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Rebuild a pipe's target table to a new partition (chunk) width.

    The width is taken from `chunk_minutes` if provided, else the pipe's configured
    `verify.chunk_minutes` (falling back to the global default). The new width is persisted to
    `verify.chunk_minutes`, which is the authoritative partition width
    (see `Pipe.get_chunk_interval`).

    Strategy by flavor:

    - **TimescaleDB**: call `set_chunk_time_interval()`. This changes the width of FUTURE chunks
      only; existing chunks are not rewritten.
    - **PostgreSQL / PostGIS, MySQL / MariaDB, MSSQL**: rebuild the table by reading its data,
      dropping it, and re-syncing at the new width. This reuses the tested
      `create_pipe_table_from_df` and `_create_missing_partitions` paths, and (for MSSQL) frees
      the partition function/scheme names so they can be recreated. The whole table is read into
      memory; for very large tables consider a manual chunked rebuild.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The partitioned pipe whose target table to repartition.

    chunk_minutes: Optional[int], default None
        The new partition width in minutes (a positive `int`; `bool` is rejected).
        Defaults to the pipe's `verify.chunk_minutes`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success. On TimescaleDB, a failure to persist the new width
    after the chunk interval was changed is reported as a failure.
    """
    from meerschaum.config import get_config
    from meerschaum.utils.warnings import warn

    flavor = self.flavor
    if flavor not in (PARTITIONABLE_FLAVORS | _TIMESCALEDB_FLAVORS):
        return False, f"Repartitioning is not supported for flavor '{flavor}'."

    is_timescaledb = flavor in _TIMESCALEDB_FLAVORS
    if not is_timescaledb and not self._should_partition(pipe):
        return False, (
            f"{pipe} is not partitioned. Set `hypertable` to `True` (and define a `datetime` "
            "column) to enable native range partitioning."
        )

    if pipe.columns.get('datetime', None) is None:
        return False, f"{pipe} has no `datetime` column to partition by."

    if not pipe.exists(debug=debug):
        return False, f"{pipe} does not exist; nothing to repartition."

    new_minutes = (
        chunk_minutes
        if chunk_minutes is not None
        else (
            pipe.parameters.get('verify', {}).get('chunk_minutes', None)
            or get_config('pipes', 'parameters', 'verify', 'chunk_minutes')
        )
    )
    ### `bool` is a subclass of `int`; reject it so `True` is not silently treated as 1 minute.
    if (
        isinstance(new_minutes, bool)
        or not isinstance(new_minutes, int)
        or new_minutes <= 0
    ):
        return False, f"Invalid chunk interval '{new_minutes}'; must be a positive integer of minutes."

    ### TimescaleDB: native, no rewrite. Future chunks adopt the new interval.
    if is_timescaledb:
        from meerschaum.utils.sql import sql_item_name
        ### `set_chunk_time_interval` takes the hypertable as a `regclass`; pass the
        ### schema-qualified, quoted name as a string literal so it resolves unambiguously.
        pipe_name = sql_item_name(pipe.target, flavor, self.get_pipe_schema(pipe))
        regclass_literal = "'" + pipe_name.replace("'", "''") + "'"
        interval = pipe.get_chunk_interval(new_minutes, debug=debug)
        chunk_time_interval = (
            f"{interval}"
            if isinstance(interval, int)
            else f"INTERVAL '{int(interval.total_seconds() / 60)} MINUTES'"
        )
        query = f"SELECT set_chunk_time_interval({regclass_literal}, {chunk_time_interval})"
        try:
            success = self.exec(query, silent=(not debug), debug=debug) is not None
        except Exception as e:
            return False, f"Failed to set chunk interval for {pipe}:\n{e}"
        if not success:
            return False, f"Failed to set chunk interval for {pipe}."

        ### Persist the width like the rebuild path does; without this, `verify.chunk_minutes`
        ### (the authoritative width) would silently disagree with the hypertable.
        update_success, update_msg = pipe.update_parameters(
            {'verify': {'chunk_minutes': new_minutes}}, persist=True, debug=debug
        )
        if not update_success:
            return False, (
                f"Set chunk interval for {pipe} to {new_minutes} minutes, but failed to persist "
                f"the new partition width:\n{update_msg}"
            )
        return True, (
            f"Set chunk interval for {pipe} to {new_minutes} minutes "
            "(applies to future chunks; existing chunks are unchanged)."
        )

    ### Non-TimescaleDB: rebuild via a drop + re-sync round-trip.
    current_interval = pipe.get_chunk_interval(debug=debug)
    new_interval = pipe.get_chunk_interval(new_minutes, debug=debug)
    if current_interval == new_interval:
        return True, f"{pipe} is already partitioned at {new_minutes} minutes."

    rowcount_before = pipe.get_rowcount(debug=debug)

    if debug:
        dprint(f"[{self}] Reading {pipe} data to rebuild partitions at {new_minutes} minutes.")
    df = pipe.get_data(debug=debug)
    if df is None:
        return False, f"Could not read data for {pipe}; aborting repartition."

    ### Persist the new width BEFORE recreating so the rebuild lays partitions at the new size.
    ### `verify.chunk_minutes` is the authoritative partition width.
    update_success, update_msg = pipe.update_parameters(
        {'verify': {'chunk_minutes': new_minutes}},
        persist=True,
        debug=debug,
    )
    if not update_success:
        return False, f"Failed to persist new partition width for {pipe}:\n{update_msg}"

    drop_success, drop_msg = pipe.drop(debug=debug)
    if not drop_success:
        return False, f"Failed to drop {pipe} during repartition:\n{drop_msg}"

    ### Re-sync the data we read; `create_pipe_table_from_df` recreates the table at the new
    ### width and `_create_missing_partitions` populates the partitions.
    sync_success, sync_msg = pipe.sync(df, debug=debug)
    if not sync_success:
        return False, (
            f"Repartition of {pipe} failed during re-sync; the table was dropped and must be "
            f"resynced from its source:\n{sync_msg}"
        )

    rowcount_after = pipe.get_rowcount(debug=debug)
    if (
        rowcount_before is not None
        and rowcount_after is not None
        and rowcount_after != rowcount_before
    ):
        warn(
            f"Row count changed during repartition of {pipe} "
            f"({rowcount_before} -> {rowcount_after}).",
            stack=False,
        )

    return True, f"Repartitioned {pipe} to {new_minutes} minutes."
Rebuild a pipe's target table to a new partition (chunk) width.
The width is taken from chunk_minutes if provided, else the pipe's configured
verify.chunk_minutes. The new width is persisted to verify.chunk_minutes, which is the
authoritative partition width (see Pipe.get_chunk_interval).
Strategy by flavor:
- TimescaleDB: call `set_chunk_time_interval()`. This changes the width of FUTURE chunks only; existing chunks are not rewritten.
- PostgreSQL / PostGIS, MySQL / MariaDB, MSSQL: rebuild the table by reading its data, dropping it, and re-syncing at the new width. This reuses the tested `create_pipe_table_from_df` and `_create_missing_partitions` paths, and (for MSSQL) frees the partition function/scheme names so they can be recreated. The whole table is read into memory; for very large tables, consider a manual chunked rebuild.
Parameters
- pipe (mrsm.Pipe): The partitioned pipe whose target table to repartition.
- chunk_minutes (Optional[int], default None): The new partition width in minutes. Defaults to the pipe's `verify.chunk_minutes`.
- debug (bool, default False): Verbosity toggle.
Returns
- A `SuccessTuple` indicating success.
def fetch_pipes_keys(
    self,
    connector_keys: Optional[List[str]] = None,
    metric_keys: Optional[List[str]] = None,
    location_keys: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
) -> Dict[
    int, Tuple[str, str, Union[str, None], Dict[str, Any]]
]:
    """
    Return a dictionary mapping pipe IDs to key tuples corresponding to the parameters provided.

    Parameters
    ----------
    connector_keys: Optional[List[str]], default None
        List of connector_keys to search by.

    metric_keys: Optional[List[str]], default None
        List of metric_keys to search by.

    location_keys: Optional[List[str]], default None
        List of location_keys to search by.
        The values `'[None]'`, `'None'`, and `'null'` are all normalized to `'None'`,
        which matches a null location key.

    tags: Optional[List[str]], default None
        List of tags to search by.
        Each element may be a comma-separated group of tags which must all be present;
        separate elements are OR'd together. Tags that `separate_negation_values` marks as
        negated exclude pipes carrying that tag.

    params: Optional[Dict[str, Any]], default None
        Dictionary of additional parameters to search by.
        E.g. `--params pipe_id:1`
        A scalar value beginning with the configured negation prefix
        (`STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']`) excludes rows
        whose column equals the value with the prefix removed.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping each `pipe_id` to a tuple of
    `(connector_keys, metric_key, location_key, parameters)`.
    """
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.misc import separate_negation_values
    from meerschaum.utils.sql import (
        OMIT_NULLSFIRST_FLAVORS,
        table_exists,
        json_flavors,
    )
    from meerschaum._internal.static import STATIC_CONFIG
    import json
    from copy import deepcopy
    sqlalchemy, sqlalchemy_sql_functions = attempt_import(
        'sqlalchemy',
        'sqlalchemy.sql.functions', lazy=False,
    )
    coalesce = sqlalchemy_sql_functions.coalesce

    if connector_keys is None:
        connector_keys = []
    if metric_keys is None:
        metric_keys = []
    if location_keys is None:
        location_keys = []
    else:
        location_keys = [
            (
                lk
                if lk not in ('[None]', 'None', 'null')
                else 'None'
            )
            for lk in location_keys
        ]
    if tags is None:
        tags = []

    if params is None:
        params = {}

    ### Add three primary keys to params dictionary
    ### (separated for convenience of arguments).
    cols = {
        'connector_keys': [str(ck) for ck in connector_keys],
        'metric_key': [str(mk) for mk in metric_keys],
        'location_key': [str(lk) for lk in location_keys],
    }

    ### Make deep copy so we don't mutate this somewhere else.
    parameters = deepcopy(params)
    for col, vals in cols.items():
        if vals not in [[], ['*']]:
            parameters[col] = vals

    if not table_exists('mrsm_pipes', self, schema=self.instance_schema, debug=debug):
        return {}

    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes']

    _params = {}
    for k, v in parameters.items():
        _v = json.dumps(v) if isinstance(v, dict) else v
        _params[k] = _v

    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
    ### Parse regular params.
    ### If a param begins with the negation prefix, strip the prefix and exclude rows
    ### matching the remaining value (previously this compared the column to the key name).
    _where = [
        (
            (coalesce(pipes_tbl.c[key], 'None') == val)
            if not str(val).startswith(negation_prefix)
            else (coalesce(pipes_tbl.c[key], 'None') != str(val)[len(negation_prefix):])
        ) for key, val in _params.items()
        if not isinstance(val, (list, tuple)) and key in pipes_tbl.c
    ]
    if self.flavor in json_flavors:
        sqlalchemy_dialects = mrsm.attempt_import('sqlalchemy.dialects', lazy=False)
        JSONB = sqlalchemy_dialects.postgresql.JSONB
    else:
        JSONB = sqlalchemy.String

    select_cols = (
        [
            pipes_tbl.c.pipe_id,
            pipes_tbl.c.connector_keys,
            pipes_tbl.c.metric_key,
            pipes_tbl.c.location_key,
            pipes_tbl.c.parameters,
        ]
    )

    q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where))
    for c, vals in cols.items():
        if not isinstance(vals, (list, tuple)) or not vals or c not in pipes_tbl.c:
            continue
        _in_vals, _ex_vals = separate_negation_values(vals)
        q = q.where(coalesce(pipes_tbl.c[c], 'None').in_(_in_vals)) if _in_vals else q
        q = q.where(coalesce(pipes_tbl.c[c], 'None').not_in(_ex_vals)) if _ex_vals else q

    ### Finally, parse tags.
    tag_groups = [tag.split(',') for tag in tags]
    in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups]

    ors, nands = [], []
    if self.flavor in json_flavors:
        tags_jsonb = pipes_tbl.c['parameters'].cast(JSONB).op('->')('tags').cast(JSONB)
        for _in_tags, _ex_tags in in_ex_tag_groups:
            if _in_tags:
                ors.append(
                    sqlalchemy.and_(
                        tags_jsonb.contains(_in_tags)
                    )
                )
            for xt in _ex_tags:
                nands.append(
                    sqlalchemy.not_(
                        sqlalchemy.and_(
                            tags_jsonb.contains([xt])
                        )
                    )
                )
    else:
        ### Without JSON operators, fall back to substring matching on the serialized parameters.
        for _in_tags, _ex_tags in in_ex_tag_groups:
            sub_ands = []
            for nt in _in_tags:
                sub_ands.append(
                    sqlalchemy.cast(
                        pipes_tbl.c['parameters'],
                        sqlalchemy.String,
                    ).like(f'%"tags":%"{nt}"%')
                )
            if sub_ands:
                ors.append(sqlalchemy.and_(*sub_ands))

            for xt in _ex_tags:
                nands.append(
                    sqlalchemy.cast(
                        pipes_tbl.c['parameters'],
                        sqlalchemy.String,
                    ).not_like(f'%"tags":%"{xt}"%')
                )

    q = q.where(sqlalchemy.and_(*nands)) if nands else q
    q = q.where(sqlalchemy.or_(*ors)) if ors else q
    loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key'])
    if self.flavor not in OMIT_NULLSFIRST_FLAVORS:
        loc_asc = sqlalchemy.nullsfirst(loc_asc)
    q = q.order_by(
        sqlalchemy.asc(pipes_tbl.c['connector_keys']),
        sqlalchemy.asc(pipes_tbl.c['metric_key']),
        loc_asc,
    )

    ### execute the query and return a list of tuples
    if debug:
        dprint(q)
    try:
        rows = (
            self.execute(q).fetchall()
            if self.flavor != 'duckdb'
            else [
                (
                    row['pipe_id'],
                    row['connector_keys'],
                    row['metric_key'],
                    row['location_key'],
                    row['parameters'],
                )
                for row in self.read(q).to_dict(orient='records')
            ]
        )
    except Exception as e:
        error(str(e))

    return {
        row[0]: row[1:]
        for row in rows
    }
Return a dictionary mapping pipe IDs to key tuples corresponding to the parameters provided.
Parameters
- connector_keys (Optional[List[str]], default None): List of connector_keys to search by.
- metric_keys (Optional[List[str]], default None): List of metric_keys to search by.
- location_keys (Optional[List[str]], default None): List of location_keys to search by.
- tags (Optional[List[str]], default None): List of tags to search by.
- params (Optional[Dict[str, Any]], default None):
Dictionary of additional parameters to search by.
E.g. `--params pipe_id:1`
- debug (bool, default False): Verbosity toggle.
Returns
- A dictionary mapping pipe IDs to tuples of pipes' keys and parameters (connector_keys, metric_key, location_key, parameters).
def create_indices(
    self,
    pipe: mrsm.Pipe,
    columns: Optional[List[str]] = None,
    indices: Optional[List[str]] = None,
    debug: bool = False
) -> bool:
    """
    Create a pipe's indices.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table should be indexed.

    columns: Optional[List[str]], default None
        If provided (together with `indices`), only create the index queries whose keys
        appear in either list. When both are empty or `None`, every index is created.

    indices: Optional[List[str]], default None
        Additional keys to restrict index creation to (see `columns`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    `True` if every selected index was created (or there was nothing to do), else `False`.
    """
    if pipe.__dict__.get('_skip_check_indices', False):
        return True

    if debug:
        dprint(f"Creating indices for {pipe}...")

    if not pipe.indices:
        warn(f"{pipe} has no index columns; skipping index creation.", stack=False)
        return True

    wanted_keys = set((columns or []) + (indices or []))

    ### Drop any cached index metadata so the query builder sees the table's current indices.
    pipe._clear_cache_key('_columns_indices', debug=debug)
    all_queries = self.get_create_index_queries(pipe, debug=debug)
    selected = [
        (col, queries)
        for col, queries in all_queries.items()
        if not wanted_keys or col in wanted_keys
    ]

    failed_cols = []
    for col, queries in selected:
        if not all(self.exec_queries(queries, debug=debug, silent=False)):
            warn(f"Failed to create index on column: {col}")
            failed_cols.append(col)

    return not failed_cols
Create a pipe's indices.
def drop_indices(
    self,
    pipe: mrsm.Pipe,
    columns: Optional[List[str]] = None,
    indices: Optional[List[str]] = None,
    debug: bool = False
) -> bool:
    """
    Drop a pipe's indices.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table's indices should be dropped.

    columns: Optional[List[str]], default None
        If provided (together with `indices`), only run the drop queries whose keys
        appear in either list. When both are empty or `None`, every index is dropped.

    indices: Optional[List[str]], default None
        Additional keys to restrict dropping to (see `columns`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    `True` if every selected index was dropped. `False` if any drop failed or the pipe
    has no indices at all.
    """
    if debug:
        dprint(f"Dropping indices for {pipe}...")

    if not pipe.indices:
        warn(f"No indices to drop for {pipe}.", stack=False)
        return False

    wanted_keys = set((columns or []) + (indices or []))
    drop_queries = self.get_drop_index_queries(pipe, debug=debug)
    selected_cols = [col for col in drop_queries if not wanted_keys or col in wanted_keys]

    all_dropped = True
    for col in selected_cols:
        if all(self.exec_queries(drop_queries[col], debug=debug, silent=(not debug))):
            continue
        all_dropped = False
        if debug:
            dprint(f"Failed to drop index on column: {col}")
    return all_dropped
Drop a pipe's indices.
def get_create_index_queries(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.

    Covers the `datetime` index (or `create_hypertable()` on TimescaleDB), the primary key,
    the `id` index, any other labeled indices from `pipe.get_indices()`, and, for upsert pipes,
    the unique index / constraint over the pipe's index columns. Labeled and unique indices
    whose names already exist on the table are skipped. DuckDB always yields `{}`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to which the queries will correspond.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary keyed by column name, index label, or the quoted unique index name,
    mapping to lists of queries.
    """
    ### NOTE: Due to recent breaking changes in DuckDB, indices don't behave properly.
    if self.flavor == 'duckdb':
        return {}
    from meerschaum.utils.sql import (
        sql_item_name,
        get_distinct_col_count,
        UPDATE_QUERIES,
        get_null_replacement,
        get_create_table_queries,
        get_rename_table_queries,
        COALESCE_UNIQUE_INDEX_FLAVORS,
    )
    from meerschaum.utils.dtypes import are_dtypes_equal
    from meerschaum.utils.dtypes.sql import (
        get_db_type_from_pd_type,
        get_pd_type_from_db_type,
        AUTO_INCREMENT_COLUMN_FLAVORS,
    )
    from meerschaum.config import get_config
    index_queries = {}

    upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in UPDATE_QUERIES
    static = pipe.parameters.get('static', False)
    null_indices = pipe.parameters.get('null_indices', True)
    index_names = pipe.get_indices()
    unique_index_name_unquoted = index_names.get('unique', None) or f'IX_{pipe.target}_unique'
    if upsert:
        _ = index_names.pop('unique', None)
    indices = pipe.indices
    existing_cols_types = pipe.get_columns_types(debug=debug)
    existing_cols_pd_types = {
        col: get_pd_type_from_db_type(typ)
        for col, typ in existing_cols_types.items()
    }
    existing_cols_indices = self.get_pipe_columns_indices(pipe, debug=debug)
    existing_ix_names = set()
    existing_primary_keys = []
    existing_clustered_primary_keys = []
    for col, col_indices in existing_cols_indices.items():
        for col_ix_doc in col_indices:
            existing_ix_names.add(col_ix_doc.get('name', '').lower())
            if col_ix_doc.get('type', None) == 'PRIMARY KEY':
                existing_primary_keys.append(col.lower())
                if col_ix_doc.get('clustered', True):
                    existing_clustered_primary_keys.append(col.lower())

    _datetime = pipe.get_columns('datetime', error=False)
    _datetime_name = (
        sql_item_name(_datetime, self.flavor, None)
        if _datetime is not None else None
    )
    _datetime_index_name = (
        sql_item_name(index_names['datetime'], flavor=self.flavor, schema=None)
        if index_names.get('datetime', None)
        else None
    )
    _id = pipe.get_columns('id', error=False)
    _id_name = (
        sql_item_name(_id, self.flavor, None)
        if _id is not None
        else None
    )
    primary_key = pipe.columns.get('primary', None)
    primary_key_name = (
        sql_item_name(primary_key, flavor=self.flavor, schema=None)
        if primary_key
        else None
    )
    autoincrement = (
        pipe.parameters.get('autoincrement', False)
        or (
            primary_key is not None
            and primary_key not in existing_cols_pd_types
        )
    )
    primary_key_db_type = (
        get_db_type_from_pd_type(pipe.dtypes.get(primary_key, 'int') or 'int', self.flavor)
        if primary_key
        else None
    )
    primary_key_constraint_name = (
        sql_item_name(f'PK_{pipe.target}', self.flavor, None)
        if primary_key is not None
        else None
    )
    ### MSSQL: cluster on the datetime index when there is one, else on the primary key.
    primary_key_clustered = "CLUSTERED" if _datetime is None else "NONCLUSTERED"
    datetime_clustered = (
        "CLUSTERED"
        if not existing_clustered_primary_keys and _datetime is not None
        else "NONCLUSTERED"
    )
    include_columns_str = "\n ,".join(
        [
            sql_item_name(col, flavor=self.flavor) for col in existing_cols_types
            if col != _datetime
        ]
    ).rstrip(',')
    include_clause = (
        (
            f"\nINCLUDE (\n {include_columns_str}\n)"
        )
        if datetime_clustered == 'NONCLUSTERED'
        else ''
    )

    _id_index_name = (
        sql_item_name(index_names['id'], self.flavor, None)
        if index_names.get('id', None)
        else None
    )
    _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
    _create_space_partition = get_config('system', 'experimental', 'space')

    ### create datetime index
    dt_query = None
    if _datetime is not None:
        if (
            self.flavor in ('timescaledb', 'timescaledb-ha')
            and pipe.parameters.get('hypertable', True)
        ):
            _id_count = (
                get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self)
                if (_id is not None and _create_space_partition) else None
            )

            chunk_interval = pipe.get_chunk_interval(debug=debug)
            chunk_interval_minutes = (
                chunk_interval
                if isinstance(chunk_interval, int)
                else int(chunk_interval.total_seconds() / 60)
            )
            chunk_time_interval = (
                f"INTERVAL '{chunk_interval_minutes} MINUTES'"
                if isinstance(chunk_interval, timedelta)
                else f'{chunk_interval_minutes}'
            )

            dt_query = (
                f"SELECT public.create_hypertable('{_pipe_name}', " +
                f"'{_datetime}', "
                + (
                    f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition)
                    else ''
                )
                + f'chunk_time_interval => {chunk_time_interval}, '
                + 'if_not_exists => true, '
                + "migrate_data => true);"
            )
        elif _datetime_index_name and _datetime != primary_key:
            if self.flavor == 'mssql':
                dt_query = (
                    f"CREATE {datetime_clustered} INDEX {_datetime_index_name} "
                    f"\nON {_pipe_name} ({_datetime_name}){include_clause}"
                )
            else:
                dt_query = (
                    f"CREATE INDEX {_datetime_index_name} "
                    + f"ON {_pipe_name} ({_datetime_name})"
                )

    if dt_query:
        index_queries[_datetime] = [dt_query]

    primary_queries = []
    if (
        primary_key is not None
        and primary_key.lower() not in existing_primary_keys
        and not static
    ):
        if autoincrement and primary_key not in existing_cols_pd_types:
            autoincrement_str = AUTO_INCREMENT_COLUMN_FLAVORS.get(
                self.flavor,
                AUTO_INCREMENT_COLUMN_FLAVORS['default']
            )
            primary_queries.extend([
                (
                    f"ALTER TABLE {_pipe_name}\n"
                    f"ADD {primary_key_name} {primary_key_db_type} {autoincrement_str}"
                ),
            ])
        elif not autoincrement and primary_key in existing_cols_pd_types:
            if self.flavor in ('sqlite', 'geopackage'):
                ### SQLite cannot add a primary key in place: rebuild into a new table.
                new_table_name = sql_item_name(
                    f'_new_{pipe.target}',
                    self.flavor,
                    self.get_pipe_schema(pipe)
                )
                select_cols_str = ', '.join(
                    [
                        sql_item_name(col, self.flavor, None)
                        for col in existing_cols_types
                    ]
                )
                primary_queries.extend(
                    get_create_table_queries(
                        existing_cols_pd_types,
                        f'_new_{pipe.target}',
                        self.flavor,
                        schema=self.get_pipe_schema(pipe),
                        primary_key=primary_key,
                    ) + [
                        (
                            f"INSERT INTO {new_table_name} ({select_cols_str})\n"
                            f"SELECT {select_cols_str}\nFROM {_pipe_name}"
                        ),
                        f"DROP TABLE {_pipe_name}",
                    ] + get_rename_table_queries(
                        f'_new_{pipe.target}',
                        pipe.target,
                        self.flavor,
                        schema=self.get_pipe_schema(pipe),
                    )
                )
            elif self.flavor == 'oracle':
                primary_queries.extend([
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"MODIFY {primary_key_name} NOT NULL"
                    ),
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
                    )
                ])
            elif self.flavor in ('mysql', 'mariadb'):
                primary_queries.extend([
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"MODIFY {primary_key_name} {primary_key_db_type} NOT NULL"
                    ),
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
                    )
                ])
            elif self.flavor in ('timescaledb', 'timescaledb-ha'):
                ### Hypertable primary keys must include the partitioning (datetime) column.
                primary_queries.extend([
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"ALTER COLUMN {primary_key_name} SET NOT NULL"
                    ),
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY (" + (
                            f"{_datetime_name}, " if _datetime_name else ""
                        ) + f"{primary_key_name})"
                    ),
                ])
            elif self.flavor in ('citus', 'postgresql', 'duckdb', 'postgis'):
                primary_queries.extend([
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"ALTER COLUMN {primary_key_name} SET NOT NULL"
                    ),
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})"
                    ),
                ])
            else:
                primary_queries.extend([
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"ALTER COLUMN {primary_key_name} {primary_key_db_type} NOT NULL"
                    ),
                    (
                        f"ALTER TABLE {_pipe_name}\n"
                        f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY {primary_key_clustered} ({primary_key_name})"
                    ),
                ])
        index_queries[primary_key] = primary_queries

    ### create id index
    if _id_name is not None:
        if self.flavor in ('timescaledb', 'timescaledb-ha'):
            ### Already created indices via create_hypertable.
            id_query = (
                None if (_id is not None and _create_space_partition)
                else (
                    f"CREATE INDEX IF NOT EXISTS {_id_index_name} ON {_pipe_name} ({_id_name})"
                    if _id is not None
                    else None
                )
            )
        else: ### mssql, sqlite, etc.
            id_query = (
                None
                if _is_non_indexable_col(_id, existing_cols_types, self.flavor)
                else f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})"
            )

        if id_query is not None:
            index_queries[_id] = id_query if isinstance(id_query, list) else [id_query]

    ### Create indices for other labels in `pipe.columns`.
    other_index_names = {
        ix_key: ix_unquoted
        for ix_key, ix_unquoted in index_names.items()
        if (
            ix_key not in ('datetime', 'id', 'primary')
            and ix_unquoted.lower() not in existing_ix_names
        )
    }
    for ix_key, ix_unquoted in other_index_names.items():
        ix_name = sql_item_name(ix_unquoted, self.flavor, None)
        cols = indices[ix_key]
        if not isinstance(cols, (list, tuple)):
            cols = [cols]
        if ix_key == 'unique' and upsert:
            continue
        if self.flavor in ('mysql', 'mariadb', 'mssql'):
            cols = [
                col for col in cols
                if col and not _is_non_indexable_col(
                    col, existing_cols_types, self.flavor
                )
            ]
        cols_names = [sql_item_name(col, self.flavor, None) for col in cols if col]
        if not cols_names:
            continue

        cols_names_str = ", ".join(cols_names)
        index_query_params_clause = f" ({cols_names_str})"
        if self.flavor in ('postgis', 'timescaledb-ha'):
            ### Use a GiST index if ANY of the index's columns is a geometry column
            ### (previously only the first column was ever checked).
            for col in cols:
                col_typ = existing_cols_pd_types.get(col, 'object')
                if col_typ != 'object' and are_dtypes_equal(col_typ, 'geometry'):
                    index_query_params_clause = f" USING GIST ({cols_names_str})"
                    break

        index_queries[ix_key] = [
            f"CREATE INDEX {ix_name} ON {_pipe_name}{index_query_params_clause}"
        ]

    ### De-duplicate while preserving `pipe.columns` order so the generated DDL is deterministic
    ### (iterating a set of strings varies between processes due to hash randomization).
    indices_cols_str = ', '.join(
        list(dict.fromkeys(
            sql_item_name(ix, self.flavor)
            for ix_key, ix in pipe.columns.items()
            if ix and ix in existing_cols_types
        ))
    )
    coalesce_indices_cols_str = ', '.join(
        [
            (
                (
                    "COALESCE("
                    + sql_item_name(ix, self.flavor)
                    + ", "
                    + get_null_replacement(existing_cols_types[ix], self.flavor)
                    + ") "
                )
                if ix_key != 'datetime' and null_indices
                else sql_item_name(ix, self.flavor)
            )
            for ix_key, ix in pipe.columns.items()
            if ix and ix in existing_cols_types
        ]
    )
    unique_index_name = sql_item_name(unique_index_name_unquoted, self.flavor)
    constraint_name_unquoted = unique_index_name_unquoted.replace('IX_', 'UQ_')
    constraint_name = sql_item_name(constraint_name_unquoted, self.flavor)
    add_constraint_query = (
        f"ALTER TABLE {_pipe_name} ADD CONSTRAINT {constraint_name} UNIQUE ({indices_cols_str})"
    )
    unique_index_cols_str = (
        indices_cols_str
        if self.flavor not in COALESCE_UNIQUE_INDEX_FLAVORS or not null_indices
        else coalesce_indices_cols_str
    )
    create_unique_index_query = (
        f"CREATE UNIQUE INDEX {unique_index_name} ON {_pipe_name} ({unique_index_cols_str})"
    )
    constraint_queries = [create_unique_index_query]
    if self.flavor not in ('sqlite', 'geopackage'):
        constraint_queries.append(add_constraint_query)
    if upsert and indices_cols_str and unique_index_name_unquoted.lower() not in existing_ix_names:
        index_queries[unique_index_name] = constraint_queries
        ### Remove regular indices that cover the same single column as the unique index.
        ### Some flavors (e.g. Oracle) reject two indices on the same column combination.
        if unique_index_cols_str == _id_name:
            index_queries.pop(_id, None)
        if unique_index_cols_str == _datetime_name:
            index_queries.pop(_datetime, None)
    return index_queries
Return a dictionary mapping columns to a `CREATE INDEX` (or equivalent) query.
Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
- A dictionary of index names mapping to lists of queries.
def get_drop_index_queries(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping index keys to `DROP INDEX` (or equivalent) queries.

    Only indices which currently exist on the pipe's table are included.
    For hypertables, the datetime / ID index is "dropped" by rebuilding the table
    (copying it into a temporary table, dropping the original, and renaming).

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to which the queries will correspond.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of index keys (from `pipe.get_indices()`) mapping to lists of queries.
    """
    ### NOTE: Due to breaking changes within DuckDB, indices must be skipped.
    if self.flavor == 'duckdb':
        return {}
    if not pipe.exists(debug=debug):
        return {}

    from collections import defaultdict
    from meerschaum.utils.sql import (
        sql_item_name,
        table_exists,
        hypertable_queries,
        DROP_INDEX_IF_EXISTS_FLAVORS,
    )
    drop_queries = defaultdict(list)
    schema = self.get_pipe_schema(pipe)
    ### MSSQL index names are not schema-qualified (the table is given via `ON`).
    index_schema = schema if self.flavor != 'mssql' else None
    indices = dict(pipe.get_indices())
    cols_indices = pipe.get_columns_indices(debug=debug)

    ### Lowercased names of the indices which currently exist on the table.
    ### Metadata entries without a name cannot be matched, so skip them.
    existing_indices = set()
    clustered_ix = None
    for ix_metas in cols_indices.values():
        for ix_meta in ix_metas:
            ix_name = ix_meta.get('name', None)
            if not ix_name:
                continue
            if ix_meta.get('clustered', False):
                clustered_ix = ix_name
            existing_indices.add(ix_name.lower())

    pipe_name = sql_item_name(pipe.target, self.flavor, schema)
    pipe_name_no_schema = sql_item_name(pipe.target, self.flavor, None)
    upsert = pipe.upsert

    if self.flavor not in hypertable_queries:
        is_hypertable = False
    else:
        is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name)
        is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None

    ### NOTE: `if_exists_str` carries its own trailing space when non-empty.
    if_exists_str = "IF EXISTS " if self.flavor in DROP_INDEX_IF_EXISTS_FLAVORS else ""
    if is_hypertable:
        temp_table = '_' + pipe.target + '_temp_migration'
        temp_table_name = sql_item_name(temp_table, self.flavor, schema)

        nuke_queries = []
        if table_exists(temp_table, self, schema=schema, debug=debug):
            nuke_queries.append(f"DROP TABLE {if_exists_str}{temp_table_name}")
        nuke_queries.extend([
            f"SELECT * INTO {temp_table_name} FROM {pipe_name}",
            f"DROP TABLE {if_exists_str}{pipe_name}",
            f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name_no_schema}",
        ])

        ### Attach the table rebuild to the first of these keys which is present.
        for ix_key in ('datetime', 'id'):
            if ix_key in indices:
                drop_queries[ix_key].extend(nuke_queries)
                break

    for ix_key, ix_unquoted in indices.items():
        ### Keys already handled by the hypertable rebuild above.
        if ix_key in drop_queries:
            continue
        if not ix_unquoted or ix_unquoted.lower() not in existing_indices:
            continue

        if (
            ix_key == 'unique'
            and upsert
            and self.flavor not in ('sqlite', 'geopackage')
            and not is_hypertable
        ):
            ### The unique index is paired with a `UQ_` constraint which must go first.
            ### MySQL / MariaDB drop the constraint via `DROP INDEX`.
            constraint_name_unquoted = ix_unquoted.replace('IX_', 'UQ_')
            constraint_name = sql_item_name(constraint_name_unquoted, self.flavor)
            constraint_or_index = (
                "CONSTRAINT"
                if self.flavor not in ('mysql', 'mariadb')
                else 'INDEX'
            )
            drop_queries[ix_key].append(
                f"ALTER TABLE {pipe_name}\n"
                f"DROP {constraint_or_index} {constraint_name}"
            )

        query = (
            (
                f"ALTER TABLE {pipe_name}\n"
                if self.flavor in ('mysql', 'mariadb')
                else ''
            )
            + f"DROP INDEX {if_exists_str}"
            + sql_item_name(ix_unquoted, self.flavor, index_schema)
        )
        if self.flavor == 'mssql':
            query += f"\nON {pipe_name}"
            ### Compare case-insensitively, consistent with the existence check above.
            if clustered_ix and ix_unquoted.lower() == clustered_ix.lower():
                query += "\nWITH (ONLINE = ON, MAXDOP = 4)"
        drop_queries[ix_key].append(query)

    return drop_queries
Return a dictionary mapping index keys to `DROP INDEX` (or equivalent) queries.
Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
- A dictionary of index keys mapping to lists of queries.
def get_add_columns_queries(
    self,
    pipe: mrsm.Pipe,
    df: Union[pd.DataFrame, Dict[str, str]],
    _is_db_types: bool = False,
    debug: bool = False,
) -> List[str]:
    """
    Add new null columns of the correct type to a table from a dataframe.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.

    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which contains new columns.
        If a dictionary is provided, assume it maps columns to Pandas data types.

    _is_db_types: bool, default False
        If `True`, assume `df` is a dictionary mapping columns to SQL native dtypes;
        those types are then used verbatim instead of being translated from Pandas types.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    An empty list is returned if the pipe doesn't exist, is static, or there are no columns to add.
    """
    if not pipe.exists(debug=debug):
        return []

    if pipe.parameters.get('static', False):
        return []

    from decimal import Decimal
    import copy
    from meerschaum.utils.sql import (
        sql_item_name,
        SINGLE_ALTER_TABLE_FLAVORS,
        get_table_cols_types,
    )
    from meerschaum.utils.dtypes.sql import (
        get_pd_type_from_db_type,
        get_db_type_from_pd_type,
    )
    from meerschaum.utils.misc import flatten_list

    is_dict = isinstance(df, dict)
    is_dask = (not is_dict) and 'dask' in df.__module__
    if is_dask:
        df = df.partitions[0].compute()
    df_cols_types = (
        {
            col: str(typ)
            for col, typ in df.dtypes.items()
        }
        if not is_dict
        else copy.deepcopy(df)
    )

    ### Refine generic `object` columns by inspecting the first row's values.
    if not is_dict and len(df.index) > 0:
        first_row = df.iloc[0]
        for col, typ in list(df_cols_types.items()):
            if typ != 'object':
                continue
            val = first_row[col]
            if isinstance(val, (dict, list)):
                df_cols_types[col] = 'json'
            elif isinstance(val, Decimal):
                df_cols_types[col] = 'numeric'
            elif isinstance(val, str):
                df_cols_types[col] = 'str'

    db_cols_types = {
        col: get_pd_type_from_db_type(typ)
        for col, typ in get_table_cols_types(
            pipe.target,
            self,
            schema=self.get_pipe_schema(pipe),
            debug=debug,
        ).items()
    }
    new_cols = set(df_cols_types) - set(db_cols_types)
    if not new_cols:
        return []

    ### Native types are only meaningful when a dictionary was provided.
    use_db_types = _is_db_types and is_dict
    new_cols_types = {
        col: (
            df_cols_types[col]
            if use_db_types
            else get_db_type_from_pd_type(df_cols_types[col], self.flavor)
        )
        for col in new_cols
        if col and df_cols_types.get(col, None)
    }

    ### Every candidate may have been filtered out (empty name or type);
    ### emitting a bare `ALTER TABLE` would produce an invalid query.
    if not new_cols_types:
        return []

    table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
    add_col_clauses = [
        "ADD " + sql_item_name(col, self.flavor, None) + " " + typ
        for col, typ in new_cols_types.items()
    ]

    ### Some flavors (e.g. SQLite) require one `ALTER TABLE` per column;
    ### the rest accept a single comma-separated statement.
    if self.flavor in SINGLE_ALTER_TABLE_FLAVORS:
        queries = [
            f"ALTER TABLE {table_name}\n{clause}"
            for clause in add_col_clauses
        ]
    else:
        queries = [f"ALTER TABLE {table_name}\n" + ",\n".join(add_col_clauses)]

    if self.flavor != 'duckdb':
        return queries

    ### NOTE: For DuckDB, we must drop and rebuild the indices.
    drop_index_queries = list(flatten_list(
        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
    ))
    create_index_queries = list(flatten_list(
        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
    ))

    return drop_index_queries + queries + create_index_queries
Add new null columns of the correct type to a table from a dataframe.
Parameters
- pipe (mrsm.Pipe): The pipe to be altered.
- df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.
- _is_db_types (bool, default False): If `True`, assume `df` is a dictionary mapping columns to SQL native dtypes.
Returns
- A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
def get_alter_columns_queries(
    self,
    pipe: mrsm.Pipe,
    df: Union[pd.DataFrame, Dict[str, str]],
    debug: bool = False,
) -> List[str]:
    """
    If we encounter a column of a different type, set the entire column to text.
    If the altered columns are numeric, alter to numeric instead.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be altered.

    df: Union[pd.DataFrame, Dict[str, str]]
        The pandas DataFrame which may contain altered columns.
        If a dict is provided, assume it maps columns to Pandas data types.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
    For SQLite / GeoPackage, the table is instead rebuilt (rename, create, copy, drop).
    """
    if not pipe.exists(debug=debug) or pipe.static:
        return []

    from meerschaum.utils.sql import (
        sql_item_name,
        get_table_cols_types,
        DROP_IF_EXISTS_FLAVORS,
        SINGLE_ALTER_TABLE_FLAVORS,
    )
    from meerschaum.utils.dataframe import get_numeric_cols
    from meerschaum.utils.dtypes import are_dtypes_equal
    from meerschaum.utils.dtypes.sql import (
        get_pd_type_from_db_type,
        get_db_type_from_pd_type,
    )
    from meerschaum.utils.misc import flatten_list, generate_password, items_str
    target = pipe.target
    session_id = generate_password(3)
    numeric_cols = (
        get_numeric_cols(df)
        if not isinstance(df, dict)
        else [
            col
            for col, typ in df.items()
            if typ.startswith('numeric')
        ]
    )
    df_cols_types = (
        {
            col: str(typ)
            for col, typ in df.dtypes.items()
        }
        if not isinstance(df, dict)
        else df
    )
    ### Keep the native database types: the SQLite rebuild below must recreate
    ### unaltered columns with their original declared types (and thus affinities),
    ### not with Pandas dtype names.
    db_native_cols_types = get_table_cols_types(
        pipe.target,
        self,
        schema=self.get_pipe_schema(pipe),
        debug=debug,
    )
    db_cols_types = {
        col: get_pd_type_from_db_type(typ)
        for col, typ in db_native_cols_types.items()
    }
    pipe_dtypes = pipe.get_dtypes(debug=debug)
    pipe_bool_cols = [col for col, typ in pipe_dtypes.items() if are_dtypes_equal(str(typ), 'bool')]
    pd_db_df_aliases = {
        'int': 'bool',
        'float': 'bool',
        'numeric': 'bool',
        'guid': 'object',
    }
    if self.flavor == 'oracle':
        pd_db_df_aliases.update({
            'int': 'numeric',
            'date': 'datetime',
            'numeric': 'int',
        })
    elif self.flavor == 'geopackage':
        pd_db_df_aliases.update({
            'geometry': 'bytes',
            'bytes': 'geometry',
        })

    altered_cols = {
        col: (db_cols_types.get(col, 'object'), typ)
        for col, typ in df_cols_types.items()
        if not are_dtypes_equal(typ, db_cols_types.get(col, 'object').lower())
        and not are_dtypes_equal(db_cols_types.get(col, 'object'), 'string')
    }

    if debug and altered_cols:
        dprint("Columns to be altered:")
        mrsm.pprint(altered_cols)

    ### NOTE: Special columns (numerics, bools, etc.) are captured and cached upon detection.
    new_special_cols = pipe._get_cached_value('new_special_cols', debug=debug) or {}
    new_special_db_cols_types = {
        col: (db_cols_types.get(col, 'object'), typ)
        for col, typ in new_special_cols.items()
    }
    if debug:
        dprint("Cached new special columns:")
        mrsm.pprint(new_special_cols)
        dprint("New special columns db types:")
        mrsm.pprint(new_special_db_cols_types)

    altered_cols.update(new_special_db_cols_types)

    ### NOTE: Sometimes bools are coerced into ints or floats.
    altered_cols_to_ignore = set()
    for col, (db_typ, df_typ) in altered_cols.items():
        for db_alias, df_alias in pd_db_df_aliases.items():
            if (
                db_alias in db_typ.lower()
                and df_alias in df_typ.lower()
                and col not in new_special_cols
            ):
                altered_cols_to_ignore.add(col)

    ### Oracle's bool handling sometimes mixes NUMBER and INT.
    bool_compatible_types = ('int', 'float', 'numeric', 'bool')
    for bool_col in pipe_bool_cols:
        if bool_col not in altered_cols:
            continue
        db_typ, df_typ = altered_cols[bool_col]
        db_is_bool_compatible = any(
            are_dtypes_equal(compat_typ, db_typ)
            for compat_typ in bool_compatible_types
        )
        df_is_bool_compatible = any(
            are_dtypes_equal(compat_typ, df_typ)
            for compat_typ in bool_compatible_types
        )
        if db_is_bool_compatible and df_is_bool_compatible:
            altered_cols_to_ignore.add(bool_col)

    if debug and altered_cols_to_ignore:
        dprint("Ignoring the following altered columns (false positives).")
        mrsm.pprint(altered_cols_to_ignore)

    for col in altered_cols_to_ignore:
        altered_cols.pop(col, None)

    if not altered_cols:
        return []

    if numeric_cols:
        ### Persist the detected numeric columns as explicit dtypes.
        explicit_pipe_dtypes = pipe.get_dtypes(infer=False, debug=debug)
        explicit_pipe_dtypes.update({col: 'numeric' for col in numeric_cols})
        pipe.dtypes = explicit_pipe_dtypes
        if not pipe.temporary:
            edit_success, edit_msg = pipe.edit(debug=debug)
            if not edit_success:
                warn(
                    f"Failed to update dtypes for numeric columns {items_str(numeric_cols)}:\n"
                    + f"{edit_msg}"
                )
    else:
        numeric_cols.extend([col for col, typ in pipe_dtypes.items() if typ.startswith('numeric')])

    numeric_type = get_db_type_from_pd_type('numeric', self.flavor, as_sqlalchemy=False)
    text_type = get_db_type_from_pd_type('str', self.flavor, as_sqlalchemy=False)
    altered_cols_types = {
        col: (
            numeric_type
            if col in numeric_cols
            else text_type
        )
        for col in altered_cols
    }

    if self.flavor in ('sqlite', 'geopackage'):
        ### SQLite cannot alter column types in place: rebuild the table instead.
        temp_table_name = '-' + session_id + '_' + target
        rename_query = (
            "ALTER TABLE "
            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
            + " RENAME TO "
            + sql_item_name(temp_table_name, self.flavor, None)
        )
        create_query = (
            "CREATE TABLE "
            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
            + " (\n"
        )
        for col_name, native_typ in db_native_cols_types.items():
            create_query += (
                sql_item_name(col_name, self.flavor, None)
                + " "
                + (
                    str(native_typ or '')
                    if col_name not in altered_cols
                    else altered_cols_types[col_name]
                )
                + ",\n"
            )
        create_query = create_query[:-2] + "\n)"

        insert_query = (
            "INSERT INTO "
            + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
            + ' ('
            + ', '.join([
                sql_item_name(col_name, self.flavor, None)
                for col_name in db_cols_types
            ])
            + ')'
            + "\nSELECT\n"
        )
        for col_name in db_cols_types:
            new_col_str = (
                sql_item_name(col_name, self.flavor, None)
                if col_name not in altered_cols
                else (
                    "CAST("
                    + sql_item_name(col_name, self.flavor, None)
                    + " AS "
                    + altered_cols_types[col_name]
                    + ")"
                )
            )
            insert_query += new_col_str + ",\n"

        insert_query = insert_query[:-2] + (
            f"\nFROM {sql_item_name(temp_table_name, self.flavor, self.get_pipe_schema(pipe))}"
        )

        ### NOTE: Trailing space keeps `IF EXISTS` separated from the table name.
        if_exists_str = "IF EXISTS " if self.flavor in DROP_IF_EXISTS_FLAVORS else ""

        drop_query = f"DROP TABLE {if_exists_str}" + sql_item_name(
            temp_table_name, self.flavor, self.get_pipe_schema(pipe)
        )
        return [
            rename_query,
            create_query,
            insert_query,
            drop_query,
        ]

    queries = []
    if self.flavor == 'oracle':
        ### Oracle: copy values into temporary columns, null out and retype the originals,
        ### then copy the values back and drop the temporary columns.
        for col, typ in altered_cols_types.items():
            add_query = (
                "ALTER TABLE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nADD " + sql_item_name(col + '_temp', self.flavor, None)
                + " " + typ
            )
            queries.append(add_query)

        for col, typ in altered_cols_types.items():
            populate_temp_query = (
                "UPDATE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nSET " + sql_item_name(col + '_temp', self.flavor, None)
                + ' = ' + sql_item_name(col, self.flavor, None)
            )
            queries.append(populate_temp_query)

        for col, typ in altered_cols_types.items():
            set_old_cols_to_null_query = (
                "UPDATE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nSET " + sql_item_name(col, self.flavor, None)
                + ' = NULL'
            )
            queries.append(set_old_cols_to_null_query)

        for col, typ in altered_cols_types.items():
            alter_type_query = (
                "ALTER TABLE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nMODIFY " + sql_item_name(col, self.flavor, None) + ' '
                + typ
            )
            queries.append(alter_type_query)

        for col, typ in altered_cols_types.items():
            set_old_to_temp_query = (
                "UPDATE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nSET " + sql_item_name(col, self.flavor, None)
                + ' = ' + sql_item_name(col + '_temp', self.flavor, None)
            )
            queries.append(set_old_to_temp_query)

        for col, typ in altered_cols_types.items():
            drop_temp_query = (
                "ALTER TABLE "
                + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
                + "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor, None)
            )
            queries.append(drop_temp_query)

        return queries

    alter_col_prefix = (
        'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle')
        else 'MODIFY'
    )
    type_prefix = (
        '' if self.flavor in ('mssql', 'mariadb', 'mysql')
        else 'TYPE '
    )
    column_str = 'COLUMN' if self.flavor != 'oracle' else ''
    query = "ALTER TABLE " + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe))
    for col, typ in altered_cols_types.items():
        query_suffix = (
            f"\n{alter_col_prefix} {column_str} "
            + sql_item_name(col, self.flavor, None)
            + " " + type_prefix + typ + ","
        )
        if self.flavor not in SINGLE_ALTER_TABLE_FLAVORS:
            query += query_suffix
        else:
            queries.append(query + query_suffix[:-1])

    if self.flavor not in SINGLE_ALTER_TABLE_FLAVORS:
        queries.append(query[:-1])

    if self.flavor != 'duckdb':
        return queries

    ### NOTE: For DuckDB, we must drop and rebuild the indices.
    drop_index_queries = list(flatten_list(
        [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()]
    ))
    create_index_queries = list(flatten_list(
        [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()]
    ))

    return drop_index_queries + queries + create_index_queries
If we encounter a column of a different type, set the entire column to text. If the altered columns are numeric, alter to numeric instead.
Parameters
- pipe (mrsm.Pipe): The pipe to be altered.
- df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which may contain altered columns. If a dict is provided, assume it maps columns to Pandas data types.
Returns
- A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
def delete_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> SuccessTuple:
    """
    Delete a Pipe's registration.

    Only the pipe's row in the instance's `pipes` table is removed;
    this function does not touch the pipe's target table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose registration should be deleted.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and a message.
    """
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)

    ### An unregistered pipe has nothing to delete.
    if not pipe.id:
        return False, f"{pipe} is not registered."

    ### Fetch the `pipes` table (table creation is disabled for temporary pipes).
    from meerschaum.connectors.sql.tables import get_tables
    instance_tables = get_tables(
        mrsm_instance=self,
        create=(not pipe.temporary),
        debug=debug,
    )
    pipes_table = instance_tables['pipes']

    delete_statement = (
        sqlalchemy.delete(pipes_table)
        .where(pipes_table.c.pipe_id == pipe.id)
    )
    result = self.exec(delete_statement, debug=debug)
    if result:
        return True, "Success"
    return False, f"Failed to delete registration for {pipe}."
Delete a Pipe's registration.
def get_pipe_data(
    self,
    pipe: mrsm.Pipe,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[datetime, str, None] = None,
    end: Union[datetime, str, None] = None,
    params: Optional[Dict[str, Any]] = None,
    order: str = 'asc',
    limit: Optional[int] = None,
    begin_add_minutes: int = 0,
    end_add_minutes: int = 0,
    chunksize: Optional[int] = -1,
    as_iterator: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[pd.DataFrame, None]:
    """
    Access a pipe's data from the SQL instance.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to get data from.

    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, str, None], default None
        If provided, get rows newer than or equal to this value.

    end: Union[datetime, str, None], default None
        If provided, get rows older than or equal to this value.

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.connectors.sql.build_where`.

    order: Optional[str], default 'asc'
        The selection order for all of the indices in the query.
        If `None`, omit the `ORDER BY` clause.

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    begin_add_minutes: int, default 0
        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).

    end_add_minutes: int, default 0
        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).

    chunksize: Optional[int], default -1
        The size of dataframe chunks to load into memory.

    as_iterator: bool, default False
        If `True`, return the chunks iterator directly.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `pd.DataFrame` of the pipe's data (or the chunks iterator if `as_iterator`).
    """
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.dtypes import to_pandas_dtype, are_dtypes_equal
    from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type
    pd = import_pandas()
    is_dask = 'dask' in pd.__name__
    enforce = pipe.enforce

    dtypes = {}
    if enforce:
        cols_types = pipe.get_columns_types(debug=debug)
        pipe_dtypes = pipe.get_dtypes(infer=False, debug=debug)

        remote_pandas_types = {
            col: to_pandas_dtype(get_pd_type_from_db_type(db_typ))
            for col, db_typ in cols_types.items()
        }
        ### The table's own datetime types win over configured dtypes ...
        remote_dt_cols_types = {
            col: pd_typ
            for col, pd_typ in remote_pandas_types.items()
            if are_dtypes_equal(pd_typ, 'datetime')
        }
        configured_pandas_types = {
            col: to_pandas_dtype(cfg_typ)
            for col, cfg_typ in pipe_dtypes.items()
        }
        ### ... unless a lower (non-nanosecond) precision was explicitly configured.
        configured_lower_precision_dt_cols_types = {
            col: cfg_typ
            for col, cfg_typ in pipe_dtypes.items()
            if are_dtypes_equal('datetime', cfg_typ) and '[' in cfg_typ and 'ns' not in cfg_typ
        }
        merged_dtypes = {
            **remote_pandas_types,
            **configured_pandas_types,
            **remote_dt_cols_types,
            **configured_lower_precision_dt_cols_types,
        }

        ### Restrict the selection to columns which exist on the table.
        existing_cols = cols_types.keys()
        omitted_cols = omit_columns or []
        select_columns = [
            col
            for col in (select_columns or existing_cols)
            if col in existing_cols and col not in omitted_cols
        ]
        dtypes = {
            col: pd_typ
            for col, pd_typ in merged_dtypes.items()
            if col in (select_columns or [col]) and col not in omitted_cols
        }

    if debug:
        dprint(f"[{self}] `read()` dtypes:")
        mrsm.pprint(dtypes)

    query = self.get_pipe_data_query(
        pipe,
        select_columns=select_columns,
        omit_columns=omit_columns,
        begin=begin,
        end=end,
        params=params,
        order=order,
        limit=limit,
        begin_add_minutes=begin_add_minutes,
        end_add_minutes=end_add_minutes,
        debug=debug,
        **kw
    )

    ### Dask frames are indexed on the datetime column.
    read_kwargs = (
        {'index_col': pipe.columns.get('datetime', None)}
        if is_dask
        else {}
    )

    chunks = self.read(
        query,
        chunksize=chunksize,
        as_iterator=True,
        coerce_float=False,
        dtype=dtypes,
        debug=debug,
        **read_kwargs
    )

    return chunks if as_iterator else pd.concat(chunks)
Access a pipe's data from the SQL instance.
Parameters
- pipe (mrsm.Pipe): The pipe to get data from.
- select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. `SELECT *`).
- omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, str, None], default None): If provided, get rows newer than or equal to this value.
- end (Union[datetime, str, None], default None): If provided, get rows older than or equal to this value.
- params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See `meerschaum.connectors.sql.build_where`.
- order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If `None`, omit the `ORDER BY` clause.
- limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
- begin_add_minutes (int, default 0): The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
- end_add_minutes (int, default 0): The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
- chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory.
- as_iterator (bool, default False): If `True`, return the chunks iterator directly.
- debug (bool, default False): Verbosity toggle.
Returns
- A `pd.DataFrame` of the pipe's data.
def get_pipe_docs(
    self,
    pipe: mrsm.Pipe,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[datetime, str, None] = None,
    end: Union[datetime, str, None] = None,
    params: Optional[Dict[str, Any]] = None,
    order: str = 'asc',
    limit: Optional[int] = None,
    debug: bool = False,
    **kw: Any
) -> List[Dict[str, Any]]:
    """
    Return a pipe's data as a list of dictionaries, bypassing pandas overhead.

    The query is built with `get_pipe_data_query`. Any extra keyword arguments
    (e.g. `begin_add_minutes`, `replace_nulls`) are forwarded to it, as in `get_pipe_data`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to get data from.

    select_columns, omit_columns, begin, end, params, order, limit:
        See `get_pipe_data_query`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of row dictionaries.
    An empty list is returned if no query could be built or execution failed.
    """
    query = self.get_pipe_data_query(
        pipe=pipe,
        select_columns=select_columns,
        omit_columns=omit_columns,
        begin=begin,
        end=end,
        params=params,
        order=order,
        limit=limit,
        debug=debug,
        **kw
    )
    if query is None:
        return []
    result = self.exec(query, silent=True, debug=debug)
    ### `exec` returns `None` on failure.
    if result is None:
        return []
    return [dict(row) for row in result.mappings().fetchall()]
Return a pipe's data as a list of dictionaries, bypassing pandas overhead.
def get_pipe_data_query(
    self,
    pipe: mrsm.Pipe,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    begin: Union[datetime, int, str, None] = None,
    end: Union[datetime, int, str, None] = None,
    params: Optional[Dict[str, Any]] = None,
    order: Optional[str] = 'asc',
    sort_datetimes: bool = False,
    limit: Optional[int] = None,
    begin_add_minutes: int = 0,
    end_add_minutes: int = 0,
    replace_nulls: Optional[str] = None,
    skip_existing_cols_check: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[str, None]:
    """
    Return the `SELECT` query for retrieving a pipe's data from its instance.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to get data from.

    select_columns: Optional[List[str]], default None
        If provided, only select these given columns.
        Otherwise select all available columns (i.e. `SELECT *`).

    omit_columns: Optional[List[str]], default None
        If provided, remove these columns from the selection.

    begin: Union[datetime, int, str, None], default None
        If provided, get rows newer than or equal to this value (inclusive).
        An empty string (`''`) means the pipe's sync time minus its backtrack interval.

    end: Union[datetime, int, str, None], default None
        If provided, get rows strictly older than this value (exclusive).
        For integer bounds where `end == begin`, `end` is bumped by one so that
        the single `begin` value is still selected.

    params: Optional[Dict[str, Any]], default None
        Additional parameters to filter by.
        See `meerschaum.utils.sql.build_where`.

    order: Optional[str], default 'asc'
        The selection order ('asc' or 'desc') for all of the indices in the query.
        Unsupported values fall back to 'asc' with a warning.
        If `None`, omit the `ORDER BY` clause.

    sort_datetimes: bool, default False
        If `True` and `order` is `None`, order descending (i.e. `order='desc'`).

    limit: Optional[int], default None
        If specified, limit the number of rows retrieved to this value.

    begin_add_minutes: int, default 0
        The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).

    end_add_minutes: int, default 0
        The number of minutes to add to the `end` datetime (i.e. `DATEADD`).

    replace_nulls: Optional[str], default None
        If provided, replace null values with this value (rendered as a SQL string literal).

    skip_existing_cols_check: bool, default False
        If `True`, do not verify that querying columns are actually on the table.
        Always treated as `True` when the pipe does not enforce its dtypes.

    debug: bool, default False
        Verbosity toggle.

    kw: Any
        Additional keyword arguments (e.g. `chunksize`) are accepted and ignored.

    Returns
    -------
    A `SELECT` query to retrieve a pipe's data.
    """
    from meerschaum.utils.misc import items_str
    from meerschaum.utils.sql import sql_item_name, dateadd_str
    from meerschaum.utils.dtypes import coerce_timezone
    from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type, get_db_type_from_pd_type

    dt_col = pipe.columns.get('datetime', None)
    existing_cols = pipe.get_columns_types(debug=debug) if pipe.enforce else []
    skip_existing_cols_check = skip_existing_cols_check or not pipe.enforce
    dt_typ = get_pd_type_from_db_type(existing_cols[dt_col]) if dt_col in existing_cols else None
    dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None
    select_columns = (
        [col for col in existing_cols]
        if not select_columns
        else [col for col in select_columns if skip_existing_cols_check or col in existing_cols]
    )
    if omit_columns:
        select_columns = [col for col in select_columns if col not in omit_columns]

    if order is None and sort_datetimes:
        order = 'desc'

    ### An empty-string `begin` means "resume from the sync time, minus the backtrack interval".
    if begin == '':
        begin = pipe.get_sync_time(debug=debug)
        backtrack_interval = pipe.get_backtrack_interval(debug=debug)
        if begin is not None:
            begin -= backtrack_interval

    begin, end = pipe.parse_date_bounds(begin, end)
    if isinstance(begin, datetime) and dt_typ:
        begin = coerce_timezone(begin, strip_utc=('utc' not in dt_typ.lower()))
    if isinstance(end, datetime) and dt_typ:
        end = coerce_timezone(end, strip_utc=('utc' not in dt_typ.lower()))

    cols_names = [
        sql_item_name(col, self.flavor, None)
        for col in select_columns
    ]
    if replace_nulls:
        ### Interpolate the quoted column into `COALESCE` (it was previously emitted as the
        ### literal text `col_name`) and escape single quotes so the replacement stays a
        ### well-formed SQL string literal.
        replace_nulls_literal = "'" + str(replace_nulls).replace("'", "''") + "'"
        select_items = [
            f"COALESCE({col_name}, {replace_nulls_literal}) AS {col_name}"
            for col_name in cols_names
        ]
    else:
        select_items = cols_names
    select_cols_str = (
        ('SELECT\n ' + ',\n '.join(select_items))
        if cols_names
        else 'SELECT *'
    )
    pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))
    query = f"{select_cols_str}\nFROM {pipe_table_name}"
    where = ""

    ### MariaDB 12.x optimizer bug: an ordered index scan with `LIMIT` over a `RANGE COLUMNS`-
    ### partitioned table that must read a non-indexed column returns zero rows (see the
    ### `get_sync_time` workaround in CLAUDE.md — same family, MariaDB-only). Wrapping the
    ### datetime column in the `ORDER BY` with a no-op `COALESCE(col, col)` forces a filesort
    ### over the fetched rows instead of the broken index walk, with identical ordering. Gated
    ### on `LIMIT` (the bug's trigger) so unlimited ordered reads keep the index-ordered scan.
    mariadb_partition_order_workaround = (
        self.flavor == 'mariadb'
        and isinstance(limit, int)
        and self._should_partition(pipe)
    )

    if order is not None:
        default_order = 'asc'
        if order not in ('asc', 'desc'):
            warn(f"Ignoring unsupported order '{order}'. Falling back to '{default_order}'.")
            order = default_order
        order = order.upper()

    if not pipe.columns.get('datetime', None):
        _dt = pipe.guess_datetime()
        dt = sql_item_name(_dt, self.flavor, None) if _dt else None
        is_guess = True
    else:
        _dt = pipe.get_columns('datetime')
        dt = sql_item_name(_dt, self.flavor, None)
        is_guess = False

    quoted_indices = {
        key: sql_item_name(val, self.flavor, None)
        for key, val in pipe.columns.items()
        if val in existing_cols or skip_existing_cols_check
    }

    if begin is not None or end is not None:
        if is_guess:
            if _dt is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n Ignoring begin and end...",
                    stack=False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f" Using column \"{_dt}\" for datetime bounds...",
                    stack=False,
                )

    is_dt_bound = False
    if begin is not None and (_dt in existing_cols or skip_existing_cols_check):
        begin_da = dateadd_str(
            flavor=self.flavor,
            datepart='minute',
            number=begin_add_minutes,
            begin=begin,
            db_type=dt_db_type,
        )
        where += f"\n {dt} >= {begin_da}" + ("\n AND\n " if end is not None else "")
        is_dt_bound = True

    if end is not None and (_dt in existing_cols or skip_existing_cols_check):
        ### `end` is exclusive: widen an integer range of width zero to include `begin`.
        if 'int' in str(type(end)).lower() and end == begin:
            end += 1
        end_da = dateadd_str(
            flavor=self.flavor,
            datepart='minute',
            number=end_add_minutes,
            begin=end,
            db_type=dt_db_type,
        )
        where += f"{dt} < {end_da}"
        is_dt_bound = True

    if params is not None:
        from meerschaum.utils.sql import build_where
        valid_params = {
            k: v
            for k, v in params.items()
            if k in existing_cols or skip_existing_cols_check
        }
        if valid_params:
            ### Replace only the leading `WHERE` keyword; a parameter value may itself contain
            ### the substring 'WHERE' (e.g. 'NOWHERE') and must not be rewritten.
            where += ' ' + build_where(valid_params, self).lstrip().replace(
                'WHERE', (' AND' if is_dt_bound else " "), 1
            )

    if len(where) > 0:
        query += "\nWHERE " + where

    if order is not None:
        ### Sort by indices, starting with datetime.
        order_parts = []
        if quoted_indices:
            if _dt and (_dt in existing_cols or skip_existing_cols_check):
                dt_order_expr = (
                    f"COALESCE({dt}, {dt})"
                    if mariadb_partition_order_workaround
                    else dt
                )
                order_parts.append(f"{dt_order_expr} {order}")
            for quoted_col_name in quoted_indices.values():
                if quoted_col_name == dt:
                    continue
                order_parts.append(f"{quoted_col_name} {order}")
        ### Joining the parts guarantees a bare `ORDER BY` is never emitted.
        if order_parts:
            query += "\nORDER BY " + ', '.join(order_parts)

    if isinstance(limit, int):
        if self.flavor == 'mssql':
            query = f'SELECT TOP {limit}\n' + query[len("SELECT "):]
        elif self.flavor == 'oracle':
            ### `ROWNUM <= n` selects the same rows as `ROWNUM IN (1, ..., n)` but stays
            ### constant-size and is still valid SQL for `limit=0` (an empty `IN ()` is not).
            query = (
                f"SELECT * FROM (\n {query}\n)\n"
                + f"WHERE ROWNUM <= {limit}"
            )
        else:
            query += f"\nLIMIT {limit}"

    if debug:
        to_print = (
            []
            + ([f"begin='{begin}'"] if begin else [])
            + ([f"end='{end}'"] if end else [])
            + ([f"params={params}"] if params else [])
        )
        dprint("Getting pipe data with constraints: " + items_str(to_print, quotes=False))

    return query
Return the SELECT query for retrieving a pipe's data from its instance.
Parameters
- pipe (mrsm.Pipe): The pipe to get data from.
- select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. `SELECT *`).
- omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, int, str, None], default None): If provided, get rows newer than or equal to this value.
- end (Union[datetime, int, str, None], default None): If provided, get rows strictly older than this value (the bound is exclusive).
- params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See `meerschaum.utils.sql.build_where`.
- order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If `None`, omit the `ORDER BY` clause.
- sort_datetimes (bool, default False): Alias for `order='desc'`; it only takes effect when `order` is `None`.
- limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
- begin_add_minutes (int, default 0): The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
- end_add_minutes (int, default 0): The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
- chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory. (Not a named parameter of this function; it is only accepted through `**kw`.)
- replace_nulls (Optional[str], default None): If provided, replace null values with this value.
- skip_existing_cols_check (bool, default False): If `True`, do not verify that querying columns are actually on the table.
- debug (bool, default False): Verbosity toggle.
Returns
- A `SELECT` query to retrieve a pipe's data.
def register_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> SuccessTuple:
    """
    Insert a new row for `pipe` into the instance's pipes table.

    The pipe's attributes (keys and parameters) should be set beforehand,
    because they are written as they currently are.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to register.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple`. Registration fails if the pipe already has an ID
    or if executing the `INSERT` returns `None`.
    """
    import json
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    from meerschaum.connectors.sql.tables import get_tables

    ### The pipes table is created here unless the pipe is temporary.
    pipes_table = get_tables(
        mrsm_instance=self,
        create=(not pipe.temporary),
        debug=debug,
    )['pipes']

    if pipe.id is not None:
        return False, f"{pipe} is already registered."

    ### Use the Pipe object's own (unsymlinked) parameters; if they were supplied to the
    ### constructor they are taken from memory rather than fetched from the database.
    ### Any failure to read them degrades to an empty dictionary.
    try:
        params_to_store = pipe.get_parameters(apply_symlinks=False)
    except Exception as exc:
        if debug:
            dprint(str(exc))
        params_to_store = None
    if params_to_store is None:
        params_to_store = {}

    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    ### Flavors with a native JSON column take the dict directly; others store JSON text.
    stored_params = (
        params_to_store
        if self.flavor in json_flavors
        else json.dumps(params_to_store)
    )
    insert_stmt = sqlalchemy.insert(pipes_table).values(
        connector_keys=pipe.connector_keys,
        metric_key=pipe.metric_key,
        location_key=pipe.location_key,
        parameters=stored_params,
    )
    if self.exec(insert_stmt, debug=debug) is None:
        return False, f"Failed to register {pipe}."
    return True, f"Successfully registered {pipe}."
Register a new pipe. A pipe's attributes must be set before registering.
def edit_pipe(
    self,
    pipe: mrsm.Pipe,
    patch: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Persist a Pipe's parameters to its database.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to be edited. It must already be registered (have an ID).

    patch: bool, default False
        If `True`, cascade the pipe's in-memory parameters on top of the parameters
        currently stored for it (fetched with `apply_symlinks=False`).
        Otherwise overwrite the stored parameters with the in-memory ones (default).

    debug: bool, default False
        Verbosity toggle.

    kw: Any
        Accepted and ignored.

    Returns
    -------
    A `SuccessTuple` indicating whether the `UPDATE` was executed.
    """
    if pipe.id is None:
        return False, f"{pipe} is not registered and cannot be edited."

    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    if not patch:
        parameters = pipe.__dict__.get('_attributes', {}).get('parameters', {})
    else:
        from meerschaum import Pipe
        from meerschaum.config._patch import apply_patch_to_config
        original_parameters = Pipe(
            pipe.connector_keys, pipe.metric_key, pipe.location_key,
            mrsm_instance=pipe.instance_keys
        ).get_parameters(apply_symlinks=False)
        ### Read the in-memory parameters without raising (previously a `KeyError` when
        ### `parameters` was absent); patching with `{}` keeps the stored parameters.
        patch_parameters = (getattr(pipe, '_attributes', None) or {}).get('parameters', {})
        parameters = apply_patch_to_config(original_parameters, patch_parameters)

    ### ensure pipes table exists
    from meerschaum.connectors.sql.tables import get_tables
    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    import json
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)

    ### Flavors with a native JSON column take the dict directly; others store JSON text.
    values = {
        'parameters': (
            json.dumps(parameters)
            if self.flavor not in json_flavors
            else parameters
        ),
    }
    q = sqlalchemy.update(pipes_tbl).values(**values).where(
        pipes_tbl.c.pipe_id == pipe.id
    )

    result = self.exec(q, debug=debug)
    success = result is not None
    message = f"Successfully edited {pipe}." if success else f"Failed to edit {pipe}."
    return success, message
Persist a Pipe's parameters to its database.
Parameters
- pipe (mrsm.Pipe): The pipe to be edited.
- patch (bool, default False): If patch is `True`, update the existing parameters by cascading. Otherwise overwrite the parameters (default).
- debug (bool, default False): Verbosity toggle.
def get_pipe_id(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Any:
    """
    Look up a pipe's ID in the instance's pipes table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose ID to fetch. Temporary pipes are never looked up.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The pipe's ID as an `int`, or `None` if the pipe is temporary
    or the lookup yields no value.
    """
    if pipe.temporary:
        return None

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy')
    from meerschaum.connectors.sql.tables import get_tables
    pipes_table = get_tables(
        mrsm_instance=self,
        create=(not pipe.temporary),
        debug=debug,
    )['pipes']
    cols = pipes_table.c

    ### A `NULL` location key has to be matched with `IS NULL` rather than `=`.
    location_condition = (
        cols.location_key.is_(None)
        if pipe.location_key is None
        else (cols.location_key == pipe.location_key)
    )
    conditions = (
        cols.connector_keys == pipe.connector_keys,
        cols.metric_key == pipe.metric_key,
        location_condition,
    )
    query = sqlalchemy.select(cols.pipe_id)
    for condition in conditions:
        query = query.where(condition)

    pipe_id = self.value(query, debug=debug, silent=pipe.temporary)
    return int(pipe_id) if pipe_id is not None else None
Get a Pipe's ID from the pipes table.
def _coerce_parameters_dict(raw: Any) -> Dict[str, Any]:
    """
    Normalize a `parameters` value read from the pipes table into a dictionary.

    Flavors without a native JSON column store parameters as JSON text, which may
    have been encoded twice (a JSON string whose content is a JSON object).
    Anything that cannot be decoded into a dictionary (e.g. `None`, `''`, `'null'`,
    or a JSON list) yields an empty dictionary.
    """
    import json
    if isinstance(raw, dict):
        return raw
    try:
        parameters = json.loads(raw)
        if isinstance(parameters, str) and parameters.lstrip().startswith('{'):
            parameters = json.loads(parameters)
    except Exception:
        return {}
    return parameters if isinstance(parameters, dict) else {}


def get_pipe_attributes(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Dict[str, Any]:
    """
    Get a Pipe's attributes dictionary (its row from the pipes table).

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose attributes to fetch.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The pipe's row as a dictionary whose `parameters` value is always a dictionary,
    or an empty dictionary if the pipe has no ID, no row is found, or the query fails.
    """
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)

    if pipe.id is None:
        return {}

    pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes']

    try:
        q = sqlalchemy.select(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id)
        if debug:
            dprint(q)
        rows = (
            self.exec(q, silent=True, debug=debug).mappings().all()
            if self.flavor != 'duckdb'
            else self.read(q, debug=debug).to_dict(orient='records')
        )
        if not rows:
            return {}
        attributes = dict(rows[0])
    except Exception:
        if debug:
            import traceback
            dprint(traceback.format_exc())
        return {}

    ### Non-JSON flavors return the parameters as text; decode into a dictionary.
    if not isinstance(attributes.get('parameters', None), dict):
        attributes['parameters'] = _coerce_parameters_dict(attributes.get('parameters', None))

    return attributes
Get a Pipe's attributes dictionary.
def sync_pipe(
    self,
    pipe: mrsm.Pipe,
    df: Union[pd.DataFrame, str, Dict[Any, Any], None] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    chunksize: Optional[int] = -1,
    check_existing: bool = True,
    blocking: bool = True,
    debug: bool = False,
    _check_temporary_tables: bool = True,
    **kw: Any
) -> SuccessTuple:
    """
    Sync a pipe using a database connection.

    Unseen rows are inserted directly into the target table. Rows to update (or every
    row, for upsert pipes) are first synced into a temporary pipe and then merged into
    the target with the queries from `get_update_queries`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The Meerschaum Pipe instance into which to sync the data.

    df: Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]
        An optional DataFrame or equivalent to sync into the pipe.
        Non-DataFrame inputs are converted with `pipe.enforce_dtypes()`.
        If `None`, nothing is synced and a failure tuple is returned.

    begin: Union[datetime, int, None], default None
        Not used by this implementation; accepted for interface compatibility.

    end: Union[datetime, int, None], default None
        Not used by this implementation; accepted for interface compatibility.

    chunksize: Optional[int], default -1
        Specify the number of rows to sync per chunk.
        If `-1`, resort to system configuration (default is `900`).
        A `chunksize` of `None` will sync all rows in one transaction.

    check_existing: bool, default True
        If `True`, pull and diff with existing data from the pipe.
        Forced to `False` when the target table does not exist yet or the pipe upserts.

    blocking: bool, default True
        Not used by this implementation; accepted for interface compatibility.

    debug: bool, default False
        Verbosity toggle.

    _check_temporary_tables: bool, default True
        If `True`, call `self._drop_old_temporary_tables()` after syncing.

    kw: Any
        Passed through to `pipe.filter_existing()` and `self.to_sql()`
        (`if_exists` and `name` are handled specially).

    Returns
    -------
    A `SuccessTuple` of success (`bool`) and message (`str`).
    """
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.sql import (
        get_update_queries,
        sql_item_name,
        UPDATE_QUERIES,
        get_reset_autoincrement_queries,
    )
    from meerschaum.utils.dtypes import get_current_timestamp
    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
    from meerschaum.utils.dataframe import get_special_cols
    from meerschaum import Pipe
    import time
    import copy
    pd = import_pandas()
    if df is None:
        msg = f"DataFrame is None. Cannot sync {pipe}."
        warn(msg)
        return False, msg

    start = time.perf_counter()
    pipe_name = sql_item_name(pipe.target, self.flavor, schema=self.get_pipe_schema(pipe))
    dtypes = pipe.get_dtypes(debug=debug)

    if not pipe.temporary and not pipe.id:
        register_tuple = pipe.register(debug=debug)
        if not register_tuple[0]:
            return register_tuple

    ### df is the dataframe returned from the remote source
    ### via the connector
    if debug:
        dprint("Fetched data:\n" + str(df))

    if not isinstance(df, pd.DataFrame):
        df = pipe.enforce_dtypes(
            df,
            chunksize=chunksize,
            safe_copy=kw.get('safe_copy', False),
            dtypes=dtypes,
            debug=debug,
        )

    ### if table does not exist, create it with indices
    is_new = False
    if not pipe.exists(debug=debug):
        check_existing = False
        is_new = True
    else:
        ### Check for new columns.
        add_cols_queries = self.get_add_columns_queries(pipe, df, debug=debug)
        if add_cols_queries:
            pipe._clear_cache_key('_columns_types', debug=debug)
            pipe._clear_cache_key('_columns_indices', debug=debug)
            if not self.exec_queries(add_cols_queries, debug=debug):
                warn(f"Failed to add new columns to {pipe}.")

        alter_cols_queries = self.get_alter_columns_queries(pipe, df, debug=debug)
        if alter_cols_queries:
            ### Invalidate the same caches as the add-columns branch
            ### (this previously cleared `_columns_types` twice).
            pipe._clear_cache_key('_columns_types', debug=debug)
            pipe._clear_cache_key('_columns_indices', debug=debug)
            if not self.exec_queries(alter_cols_queries, debug=debug):
                warn(f"Failed to alter columns for {pipe}.")

    upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in UPDATE_QUERIES
    if upsert:
        check_existing = False
    kw.setdefault('safe_copy', False)

    unseen_df, update_df, delta_df = (
        pipe.filter_existing(
            df,
            chunksize=chunksize,
            debug=debug,
            **kw
        ) if check_existing else (df, None, df)
    )
    if upsert:
        unseen_df, update_df, delta_df = (df.head(0), df, df)

    if debug:
        dprint("Delta data:\n" + str(delta_df))
        dprint("Unseen data:\n" + str(unseen_df))
        if update_df is not None:
            dprint(("Update" if not upsert else "Upsert") + " data:\n" + str(update_df))

    ### `if_exists` and `name` are set explicitly for `to_sql()` below.
    if_exists = kw.pop('if_exists', 'append')
    kw.pop('name', None)

    ### Insert new data into the target table.
    unseen_kw = copy.deepcopy(kw)
    unseen_kw.update({
        'name': pipe.target,
        'if_exists': if_exists,
        'debug': debug,
        'as_dict': True,
        'safe_copy': kw.get('safe_copy', False),
        'chunksize': chunksize,
        'dtype': self.get_to_sql_dtype(pipe, unseen_df, update_dtypes=True),
        'schema': self.get_pipe_schema(pipe),
    })

    dt_col = pipe.columns.get('datetime', None)
    primary_key = pipe.columns.get('primary', None)
    autoincrement = (
        pipe.parameters.get('autoincrement', False)
        or (
            is_new
            and primary_key
            and primary_key not in dtypes
            and primary_key not in unseen_df.columns
        )
    )
    ### Persist the flag only when it is not already stored as truthy. The former check
    ### `autoincrement not in pipe.parameters` tested the boolean value as a dict key, so
    ### it was always true and rewrote the parameter on every sync.
    if autoincrement and not pipe.parameters.get('autoincrement', False):
        update_success, update_msg = pipe.update_parameters(
            {'autoincrement': autoincrement},
            debug=debug,
        )
        if not update_success:
            return update_success, update_msg

    def _check_pk(_df_to_clear):
        """Drop the primary key column from a frame when it holds no non-null values."""
        if _df_to_clear is None:
            return
        if primary_key not in _df_to_clear.columns:
            return
        if not _df_to_clear[primary_key].notnull().any():
            del _df_to_clear[primary_key]

    ### Explicit primary key values were inserted into an auto-incrementing column,
    ### so its sequence must be advanced afterwards.
    autoincrement_needs_reset = bool(
        autoincrement
        and primary_key
        and primary_key in unseen_df.columns
        and unseen_df[primary_key].notnull().any()
    )
    if autoincrement and primary_key:
        for _df_to_clear in (unseen_df, update_df, delta_df):
            _check_pk(_df_to_clear)

    if is_new:
        create_success, create_msg = self.create_pipe_table_from_df(
            pipe,
            unseen_df,
            debug=debug,
        )
        if not create_success:
            return create_success, create_msg

    ### Pre-create native range partitions (non-TimescaleDB) so the rows about to be written
    ### land in an existing partition. No-op for non-partitioned pipes.
    if self._should_partition(pipe):
        for _part_df in (unseen_df, update_df):
            if _part_df is not None and len(_part_df) > 0:
                part_success, part_msg = self._create_missing_partitions(
                    pipe, _part_df, debug=debug,
                )
                if not part_success:
                    return part_success, part_msg

    ### MSSQL rejects explicit values for an identity column unless IDENTITY_INSERT is on.
    do_identity_insert = bool(
        self.flavor in ('mssql',)
        and primary_key
        and primary_key in unseen_df.columns
        and autoincrement
    )
    stats = {'success': True, 'msg': ''}
    if len(unseen_df) > 0:
        with self.engine.connect() as connection:
            with connection.begin():
                if do_identity_insert:
                    identity_on_result = self.exec(
                        f"SET IDENTITY_INSERT {pipe_name} ON",
                        commit=False,
                        _connection=connection,
                        close=False,
                        debug=debug,
                    )
                    if identity_on_result is None:
                        return False, f"Could not enable identity inserts on {pipe}."

                stats = self.to_sql(
                    unseen_df,
                    _connection=connection,
                    **unseen_kw
                )

                if do_identity_insert:
                    identity_off_result = self.exec(
                        f"SET IDENTITY_INSERT {pipe_name} OFF",
                        commit=False,
                        _connection=connection,
                        close=False,
                        debug=debug,
                    )
                    if identity_off_result is None:
                        return False, f"Could not disable identity inserts on {pipe}."

    if is_new:
        if not self.create_indices(pipe, debug=debug):
            warn(f"Failed to create indices for {pipe}. Continuing...")

    if autoincrement_needs_reset:
        reset_autoincrement_queries = get_reset_autoincrement_queries(
            pipe.target,
            primary_key,
            self,
            schema=self.get_pipe_schema(pipe),
            debug=debug,
        )
        results = self.exec_queries(reset_autoincrement_queries, debug=debug)
        for result in results:
            if result is None:
                warn(f"Could not reset auto-incrementing primary key for {pipe}.", stack=False)

    if update_df is not None and len(update_df) > 0:
        temp_target = self.get_temporary_target(
            pipe.target,
            label=('update' if not upsert else 'upsert'),
        )
        self._log_temporary_tables_creation(temp_target, create=(not pipe.temporary), debug=debug)
        update_dtypes = {
            **{
                col: str(typ)
                for col, typ in update_df.dtypes.items()
            },
            **get_special_cols(update_df)
        }

        ### Stage the rows in a temporary pipe (in the internal schema) to merge from.
        temp_pipe = Pipe(
            pipe.connector_keys.replace(':', '_') + '_', pipe.metric_key, pipe.location_key,
            instance=pipe.instance_keys,
            columns={
                (ix_key if ix_key != 'primary' else 'primary_'): ix
                for ix_key, ix in pipe.columns.items()
                if ix and ix in update_df.columns
            },
            dtypes=update_dtypes,
            target=temp_target,
            temporary=True,
            enforce=False,
            static=True,
            autoincrement=False,
            cache=False,
            parameters={
                'schema': self.internal_schema,
                'hypertable': False,
            },
        )
        _temp_columns_types = {
            col: get_db_type_from_pd_type(typ, self.flavor)
            for col, typ in update_dtypes.items()
        }
        temp_pipe._cache_value('_columns_types', _temp_columns_types, memory_only=True, debug=debug)
        temp_pipe._cache_value('_skip_check_indices', True, memory_only=True, debug=debug)
        now_ts = get_current_timestamp('ms', as_int=True) / 1000
        temp_pipe._cache_value('_columns_types_timestamp', now_ts, memory_only=True, debug=debug)
        temp_success, temp_msg = temp_pipe.sync(update_df, check_existing=False, debug=debug)
        if not temp_success:
            return temp_success, temp_msg

        existing_cols = pipe.get_columns_types(debug=debug)
        ### A partitioned table (TimescaleDB hypertable or a native range-partitioned table on
        ### PostgreSQL/MySQL/MSSQL) folds the datetime column into its composite primary key, so the
        ### upsert conflict target must include it too — `ON CONFLICT (primary_key)` alone has no
        ### matching unique constraint. Non-partitioned tables keep the historical primary-key-only
        ### semantics ("the primary key is the identity, regardless of datetime").
        partition_upsert = bool(
            dt_col
            and dt_col in update_df.columns
            and (
                self.flavor in ('timescaledb', 'timescaledb-ha')
                or self._should_partition(pipe)
            )
        )
        join_cols = [
            col
            for col_key, col in pipe.columns.items()
            if col and col in existing_cols
        ] if not primary_key or self.flavor == 'oracle' else (
            [dt_col, primary_key]
            if partition_upsert
            else [primary_key]
        )
        update_queries = get_update_queries(
            pipe.target,
            temp_target,
            self,
            join_cols,
            upsert=upsert,
            schema=self.get_pipe_schema(pipe),
            patch_schema=self.internal_schema,
            target_cols_types=pipe.get_columns_types(debug=debug),
            patch_cols_types=_temp_columns_types,
            datetime_col=(dt_col if dt_col in update_df.columns else None),
            identity_insert=(autoincrement and primary_key in update_df.columns),
            null_indices=pipe.null_indices,
            cast_columns=pipe.enforce,
            debug=debug,
        )
        update_results = self.exec_queries(
            update_queries,
            break_on_error=True,
            rollback=True,
            debug=debug,
        )
        update_success = all(update_results)
        self._log_temporary_tables_creation(
            temp_target,
            ready_to_drop=True,
            create=(not pipe.temporary),
            debug=debug,
        )
        if not update_success:
            warn(f"Failed to apply update to {pipe}.")
        stats['success'] = stats['success'] and update_success
        stats['msg'] = (
            (stats.get('msg', '') + f'\nFailed to apply update to {pipe}.').lstrip()
            if not update_success
            else stats.get('msg', '')
        )

    stop = time.perf_counter()
    success = stats['success']
    if not success:
        return success, stats['msg'] or str(stats)

    unseen_count = len(unseen_df.index) if unseen_df is not None else 0
    update_count = len(update_df.index) if update_df is not None else 0
    msg = (
        (
            f"Inserted {unseen_count:,}, "
            + f"updated {update_count:,} rows."
        )
        if not upsert
        else (
            f"Upserted {update_count:,} row"
            + ('s' if update_count != 1 else '')
            + "."
        )
    )
    if debug:
        msg = msg[:-1] + (
            f"\non table {sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))}\n"
            + f"in {round(stop - start, 2)} seconds."
        )

    if _check_temporary_tables:
        drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables(
            refresh=False, debug=debug
        )
        if not drop_stale_success:
            warn(drop_stale_msg)

    return success, msg
Sync a pipe using a database connection.
Parameters
- pipe (mrsm.Pipe): The Meerschaum Pipe instance into which to sync the data.
- df (Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]]): An optional DataFrame or equivalent to sync into the pipe. Defaults to `None`.
- begin (Union[datetime, int, None], default None): Optionally specify the earliest datetime to search for data. Defaults to `None`.
- end (Union[datetime, int, None], default None): Optionally specify the latest datetime to search for data. Defaults to `None`.
- chunksize (Optional[int], default -1): Specify the number of rows to sync per chunk. If `-1`, resort to system configuration (default is `900`). A `chunksize` of `None` will sync all rows in one transaction. Defaults to `-1`.
- check_existing (bool, default True): If `True`, pull and diff with existing data from the pipe. Defaults to `True`.
- blocking (bool, default True): If `True`, wait for sync to finish and return its result, otherwise asynchronously sync. Defaults to `True`.
- debug (bool, default False): Verbosity toggle. Defaults to `False`.
- kw (Any): Catch-all for keyword arguments.
Returns
- A `SuccessTuple` of success (`bool`) and message (`str`).
2218def sync_pipe_inplace( 2219 self, 2220 pipe: 'mrsm.Pipe', 2221 params: Optional[Dict[str, Any]] = None, 2222 begin: Union[datetime, int, None] = None, 2223 end: Union[datetime, int, None] = None, 2224 chunksize: Optional[int] = -1, 2225 check_existing: bool = True, 2226 debug: bool = False, 2227 **kw: Any 2228) -> SuccessTuple: 2229 """ 2230 If a pipe's connector is the same as its instance connector, 2231 it's more efficient to sync the pipe in-place rather than reading data into Pandas. 2232 2233 Parameters 2234 ---------- 2235 pipe: mrsm.Pipe 2236 The pipe whose connector is the same as its instance. 2237 2238 params: Optional[Dict[str, Any]], default None 2239 Optional params dictionary to build the `WHERE` clause. 2240 See `meerschaum.utils.sql.build_where`. 2241 2242 begin: Union[datetime, int, None], default None 2243 Optionally specify the earliest datetime to search for data. 2244 Defaults to `None`. 2245 2246 end: Union[datetime, int, None], default None 2247 Optionally specify the latest datetime to search for data. 2248 Defaults to `None`. 2249 2250 chunksize: Optional[int], default -1 2251 Specify the number of rows to sync per chunk. 2252 If `-1`, resort to system configuration (default is `900`). 2253 A `chunksize` of `None` will sync all rows in one transaction. 2254 Defaults to `-1`. 2255 2256 check_existing: bool, default True 2257 If `True`, pull and diff with existing data from the pipe. 2258 2259 debug: bool, default False 2260 Verbosity toggle. 2261 2262 Returns 2263 ------- 2264 A SuccessTuple. 
2265 """ 2266 if self.flavor == 'duckdb': 2267 return pipe.sync( 2268 params=params, 2269 begin=begin, 2270 end=end, 2271 chunksize=chunksize, 2272 check_existing=check_existing, 2273 debug=debug, 2274 _inplace=False, 2275 **kw 2276 ) 2277 from meerschaum.utils.sql import ( 2278 sql_item_name, 2279 get_update_queries, 2280 get_null_replacement, 2281 get_create_table_queries, 2282 get_create_schema_if_not_exists_queries, 2283 get_table_cols_types, 2284 session_execute, 2285 dateadd_str, 2286 UPDATE_QUERIES, 2287 ) 2288 from meerschaum.utils.dtypes.sql import ( 2289 get_pd_type_from_db_type, 2290 get_db_type_from_pd_type, 2291 ) 2292 from meerschaum.utils.misc import generate_password 2293 2294 transaction_id_length = ( 2295 mrsm.get_config( 2296 'system', 'connectors', 'sql', 'instance', 'temporary_target', 'transaction_id_length' 2297 ) 2298 ) 2299 transact_id = generate_password(transaction_id_length) 2300 2301 internal_schema = self.internal_schema 2302 target = pipe.target 2303 temp_table_roots = ['backtrack', 'new', 'delta', 'joined', 'unseen', 'update'] 2304 temp_tables = { 2305 table_root: self.get_temporary_target(target, transact_id=transact_id, label=table_root) 2306 for table_root in temp_table_roots 2307 } 2308 temp_table_names = { 2309 table_root: sql_item_name(table_name_raw, self.flavor, internal_schema) 2310 for table_root, table_name_raw in temp_tables.items() 2311 } 2312 temp_table_aliases = { 2313 table_root: sql_item_name(table_root, self.flavor) 2314 for table_root in temp_table_roots 2315 } 2316 table_alias_as = " AS" if self.flavor != 'oracle' else '' 2317 metadef = self.get_pipe_metadef( 2318 pipe, 2319 params=params, 2320 begin=begin, 2321 end=end, 2322 check_existing=check_existing, 2323 debug=debug, 2324 ) 2325 pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 2326 upsert = pipe.parameters.get('upsert', False) and f'{self.flavor}-upsert' in UPDATE_QUERIES 2327 static = pipe.parameters.get('static', False) 2328 
database = getattr(self, 'database', self.parse_uri(self.URI).get('database', None)) 2329 primary_key = pipe.columns.get('primary', None) 2330 primary_key_typ = pipe.dtypes.get(primary_key, None) if primary_key else None 2331 primary_key_db_type = ( 2332 get_db_type_from_pd_type(primary_key_typ, self.flavor) 2333 if primary_key_typ 2334 else None 2335 ) 2336 if not {col_key: col for col_key, col in pipe.columns.items() if col_key and col}: 2337 return False, "Cannot sync in-place without index columns." 2338 2339 autoincrement = pipe.parameters.get('autoincrement', False) 2340 dt_col = pipe.columns.get('datetime', None) 2341 dt_col_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None 2342 dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None 2343 dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None 2344 2345 def clean_up_temp_tables(ready_to_drop: bool = False): 2346 log_success, log_msg = self._log_temporary_tables_creation( 2347 [ 2348 table 2349 for table in temp_tables.values() 2350 ] if not upsert else [temp_tables['update']], 2351 ready_to_drop=ready_to_drop, 2352 create=(not pipe.temporary), 2353 debug=debug, 2354 ) 2355 if not log_success: 2356 warn(log_msg) 2357 drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables( 2358 refresh=False, 2359 debug=debug, 2360 ) 2361 if not drop_stale_success: 2362 warn(drop_stale_msg) 2363 return drop_stale_success, drop_stale_msg 2364 2365 sqlalchemy, sqlalchemy_orm = mrsm.attempt_import( 2366 'sqlalchemy', 2367 'sqlalchemy.orm', 2368 ) 2369 if not pipe.exists(debug=debug): 2370 schema = self.get_pipe_schema(pipe) 2371 create_pipe_queries = get_create_table_queries( 2372 metadef, 2373 pipe.target, 2374 self.flavor, 2375 schema=schema, 2376 primary_key=primary_key, 2377 primary_key_db_type=primary_key_db_type, 2378 autoincrement=autoincrement, 2379 datetime_column=dt_col, 2380 ) 2381 if schema: 2382 create_pipe_queries = ( 2383 
get_create_schema_if_not_exists_queries(schema, self.flavor) 2384 + create_pipe_queries 2385 ) 2386 2387 results = self.exec_queries(create_pipe_queries, debug=debug) 2388 if not all(results): 2389 _ = clean_up_temp_tables() 2390 return False, f"Could not insert new data into {pipe} from its SQL query definition." 2391 2392 if not self.create_indices(pipe, debug=debug): 2393 warn(f"Failed to create indices for {pipe}. Continuing...") 2394 2395 rowcount = pipe.get_rowcount(debug=debug) 2396 _ = clean_up_temp_tables() 2397 return True, f"Inserted {rowcount:,}, updated 0 rows." 2398 2399 session = sqlalchemy_orm.Session(self.engine) 2400 connectable = session if self.flavor != 'duckdb' else self 2401 2402 create_new_query = get_create_table_queries( 2403 metadef, 2404 temp_tables[('new') if not upsert else 'update'], 2405 self.flavor, 2406 schema=internal_schema, 2407 )[0] 2408 (create_new_success, create_new_msg), create_new_results = session_execute( 2409 session, 2410 create_new_query, 2411 with_results=True, 2412 debug=debug, 2413 ) 2414 if not create_new_success: 2415 _ = clean_up_temp_tables() 2416 return create_new_success, create_new_msg 2417 new_count = create_new_results[0].rowcount if create_new_results else 0 2418 2419 new_cols_types = get_table_cols_types( 2420 temp_tables[('new' if not upsert else 'update')], 2421 connectable=connectable, 2422 flavor=self.flavor, 2423 schema=internal_schema, 2424 database=database, 2425 debug=debug, 2426 ) if not static else pipe.get_columns_types(debug=debug) 2427 if not new_cols_types: 2428 return False, f"Failed to get new columns for {pipe}." 
2429 2430 new_cols = { 2431 str(col_name): get_pd_type_from_db_type(str(col_type)) 2432 for col_name, col_type in new_cols_types.items() 2433 } 2434 new_cols_str = '\n ' + ',\n '.join([ 2435 sql_item_name(col, self.flavor) 2436 for col in new_cols 2437 ]) 2438 def get_col_typ(col: str, cols_types: Dict[str, str]) -> str: 2439 if self.flavor == 'oracle' and new_cols_types.get(col, '').lower() == 'char': 2440 return new_cols_types[col] 2441 return cols_types[col] 2442 2443 add_cols_queries = self.get_add_columns_queries(pipe, new_cols, debug=debug) 2444 if add_cols_queries: 2445 pipe._clear_cache_key('_columns_types', debug=debug) 2446 pipe._clear_cache_key('_columns_indices', debug=debug) 2447 self.exec_queries(add_cols_queries, debug=debug) 2448 2449 alter_cols_queries = self.get_alter_columns_queries(pipe, new_cols, debug=debug) 2450 if alter_cols_queries: 2451 pipe._clear_cache_key('_columns_types', debug=debug) 2452 self.exec_queries(alter_cols_queries, debug=debug) 2453 2454 insert_queries = [ 2455 ( 2456 f"INSERT INTO {pipe_name} ({new_cols_str})\n" 2457 f"SELECT {new_cols_str}\nFROM {temp_table_names['new']}{table_alias_as}" 2458 f" {temp_table_aliases['new']}" 2459 ) 2460 ] if not check_existing and not upsert else [] 2461 2462 new_queries = insert_queries 2463 new_success, new_msg = ( 2464 session_execute(session, new_queries, debug=debug) 2465 if new_queries 2466 else (True, "Success") 2467 ) 2468 if not new_success: 2469 _ = clean_up_temp_tables() 2470 return new_success, new_msg 2471 2472 if not check_existing: 2473 session.commit() 2474 _ = clean_up_temp_tables() 2475 return True, f"Inserted {new_count}, updated 0 rows." 
2476 2477 min_dt_col_name_da = dateadd_str( 2478 flavor=self.flavor, begin=f"MIN({dt_col_name})", db_type=dt_db_type, 2479 ) 2480 max_dt_col_name_da = dateadd_str( 2481 flavor=self.flavor, begin=f"MAX({dt_col_name})", db_type=dt_db_type, 2482 ) 2483 2484 (new_dt_bounds_success, new_dt_bounds_msg), new_dt_bounds_results = session_execute( 2485 session, 2486 [ 2487 "SELECT\n" 2488 f" {min_dt_col_name_da} AS {sql_item_name('min_dt', self.flavor)},\n" 2489 f" {max_dt_col_name_da} AS {sql_item_name('max_dt', self.flavor)}\n" 2490 f"FROM {temp_table_names['new' if not upsert else 'update']}\n" 2491 f"WHERE {dt_col_name} IS NOT NULL" 2492 ], 2493 with_results=True, 2494 debug=debug, 2495 ) if dt_col and not upsert else ((True, "Success"), None) 2496 if not new_dt_bounds_success: 2497 return ( 2498 new_dt_bounds_success, 2499 f"Could not determine in-place datetime bounds:\n{new_dt_bounds_msg}" 2500 ) 2501 2502 if dt_col and not upsert: 2503 begin, end = new_dt_bounds_results[0].fetchone() 2504 2505 backtrack_def = self.get_pipe_data_query( 2506 pipe, 2507 begin=begin, 2508 end=end, 2509 begin_add_minutes=0, 2510 end_add_minutes=1, 2511 params=params, 2512 debug=debug, 2513 order=None, 2514 ) 2515 create_backtrack_query = get_create_table_queries( 2516 backtrack_def, 2517 temp_tables['backtrack'], 2518 self.flavor, 2519 schema=internal_schema, 2520 )[0] 2521 (create_backtrack_success, create_backtrack_msg), create_backtrack_results = session_execute( 2522 session, 2523 create_backtrack_query, 2524 with_results=True, 2525 debug=debug, 2526 ) if not upsert else ((True, "Success"), None) 2527 2528 if not create_backtrack_success: 2529 _ = clean_up_temp_tables() 2530 return create_backtrack_success, create_backtrack_msg 2531 2532 backtrack_cols_types = get_table_cols_types( 2533 temp_tables['backtrack'], 2534 connectable=connectable, 2535 flavor=self.flavor, 2536 schema=internal_schema, 2537 database=database, 2538 debug=debug, 2539 ) if not (upsert or static) else 
new_cols_types 2540 2541 common_cols = [col for col in new_cols if col in backtrack_cols_types] 2542 primary_key = pipe.columns.get('primary', None) 2543 on_cols = { 2544 col: new_cols.get(col) 2545 for col_key, col in pipe.columns.items() 2546 if ( 2547 col 2548 and 2549 col_key != 'value' 2550 and col in backtrack_cols_types 2551 and col in new_cols 2552 ) 2553 } if not primary_key else {primary_key: new_cols.get(primary_key)} 2554 if not on_cols: 2555 raise ValueError("Cannot sync without common index columns.") 2556 2557 null_replace_new_cols_str = ( 2558 '\n ' + ',\n '.join([ 2559 f"COALESCE({temp_table_aliases['new']}.{sql_item_name(col, self.flavor)}, " 2560 + get_null_replacement(get_col_typ(col, new_cols_types), self.flavor) 2561 + ") AS " 2562 + sql_item_name(col, self.flavor, None) 2563 for col, typ in new_cols.items() 2564 ]) 2565 ) 2566 2567 select_delta_query = ( 2568 "SELECT" 2569 + null_replace_new_cols_str 2570 + f"\nFROM {temp_table_names['new']}{table_alias_as} {temp_table_aliases['new']}\n" 2571 + f"LEFT OUTER JOIN {temp_table_names['backtrack']}{table_alias_as} {temp_table_aliases['backtrack']}" 2572 + "\n ON\n " 2573 + '\n AND\n '.join([ 2574 ( 2575 f" COALESCE({temp_table_aliases['new']}." 2576 + sql_item_name(c, self.flavor, None) 2577 + ", " 2578 + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) 2579 + ")" 2580 + '\n =\n ' 2581 + f" COALESCE({temp_table_aliases['backtrack']}." 2582 + sql_item_name(c, self.flavor, None) 2583 + ", " 2584 + get_null_replacement(get_col_typ(c, backtrack_cols_types), self.flavor) 2585 + ") " 2586 ) for c in common_cols 2587 ]) 2588 + "\nWHERE\n " 2589 + '\n AND\n '.join([ 2590 ( 2591 f"{temp_table_aliases['backtrack']}." 
+ sql_item_name(c, self.flavor) + ' IS NULL' 2592 ) for c in common_cols 2593 ]) 2594 ) 2595 create_delta_query = get_create_table_queries( 2596 select_delta_query, 2597 temp_tables['delta'], 2598 self.flavor, 2599 schema=internal_schema, 2600 )[0] 2601 create_delta_success, create_delta_msg = session_execute( 2602 session, 2603 create_delta_query, 2604 debug=debug, 2605 ) if not upsert else (True, "Success") 2606 if not create_delta_success: 2607 _ = clean_up_temp_tables() 2608 return create_delta_success, create_delta_msg 2609 2610 delta_cols_types = get_table_cols_types( 2611 temp_tables['delta'], 2612 connectable=connectable, 2613 flavor=self.flavor, 2614 schema=internal_schema, 2615 database=database, 2616 debug=debug, 2617 ) if not (upsert or static) else new_cols_types 2618 2619 ### This is a weird bug on SQLite. 2620 ### Sometimes the backtrack dtypes are all empty strings. 2621 if not all(delta_cols_types.values()): 2622 delta_cols_types = new_cols_types 2623 2624 delta_cols = { 2625 col: get_pd_type_from_db_type(typ) 2626 for col, typ in delta_cols_types.items() 2627 } 2628 delta_cols_str = ', '.join([ 2629 sql_item_name(col, self.flavor) 2630 for col in delta_cols 2631 ]) 2632 2633 select_joined_query = ( 2634 "SELECT\n " 2635 + (',\n '.join([ 2636 ( 2637 f"{temp_table_aliases['delta']}." + sql_item_name(c, self.flavor, None) 2638 + " AS " + sql_item_name(c + '_delta', self.flavor, None) 2639 ) for c in delta_cols 2640 ])) 2641 + ",\n " 2642 + (',\n '.join([ 2643 ( 2644 f"{temp_table_aliases['backtrack']}." 
+ sql_item_name(c, self.flavor, None) 2645 + " AS " + sql_item_name(c + '_backtrack', self.flavor, None) 2646 ) for c in backtrack_cols_types 2647 ])) 2648 + f"\nFROM {temp_table_names['delta']}{table_alias_as} {temp_table_aliases['delta']}\n" 2649 + f"LEFT OUTER JOIN {temp_table_names['backtrack']}{table_alias_as}" 2650 + f" {temp_table_aliases['backtrack']}" 2651 + "\n ON\n " 2652 + '\n AND\n '.join([ 2653 ( 2654 f" COALESCE({temp_table_aliases['delta']}." + sql_item_name(c, self.flavor) 2655 + ", " 2656 + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) + ")" 2657 + '\n =\n ' 2658 + f" COALESCE({temp_table_aliases['backtrack']}." + sql_item_name(c, self.flavor) 2659 + ", " 2660 + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) + ")" 2661 ) for c, typ in on_cols.items() 2662 ]) 2663 ) 2664 2665 create_joined_query = get_create_table_queries( 2666 select_joined_query, 2667 temp_tables['joined'], 2668 self.flavor, 2669 schema=internal_schema, 2670 )[0] 2671 create_joined_success, create_joined_msg = session_execute( 2672 session, 2673 create_joined_query, 2674 debug=debug, 2675 ) if on_cols and not upsert else (True, "Success") 2676 if not create_joined_success: 2677 _ = clean_up_temp_tables() 2678 return create_joined_success, create_joined_msg 2679 2680 select_unseen_query = ( 2681 "SELECT\n " 2682 + (',\n '.join([ 2683 ( 2684 "CASE\n WHEN " + sql_item_name(c + '_delta', self.flavor, None) 2685 + " != " + get_null_replacement(get_col_typ(c, delta_cols_types), self.flavor) 2686 + " THEN " + sql_item_name(c + '_delta', self.flavor, None) 2687 + "\n ELSE NULL\n END" 2688 + " AS " + sql_item_name(c, self.flavor, None) 2689 ) for c, typ in delta_cols.items() 2690 ])) 2691 + f"\nFROM {temp_table_names['joined']}{table_alias_as} {temp_table_aliases['joined']}\n" 2692 + "WHERE\n " 2693 + '\n AND\n '.join([ 2694 ( 2695 sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NULL' 2696 ) for c in delta_cols 2697 ]) 2698 ) 2699 
create_unseen_query = get_create_table_queries( 2700 select_unseen_query, 2701 temp_tables['unseen'], 2702 self.flavor, 2703 internal_schema, 2704 )[0] 2705 (create_unseen_success, create_unseen_msg), create_unseen_results = session_execute( 2706 session, 2707 create_unseen_query, 2708 with_results=True, 2709 debug=debug 2710 ) if not upsert else ((True, "Success"), None) 2711 if not create_unseen_success: 2712 _ = clean_up_temp_tables() 2713 return create_unseen_success, create_unseen_msg 2714 2715 select_update_query = ( 2716 "SELECT\n " 2717 + (',\n '.join([ 2718 ( 2719 "CASE\n WHEN " + sql_item_name(c + '_delta', self.flavor, None) 2720 + " != " + get_null_replacement(get_col_typ(c, delta_cols_types), self.flavor) 2721 + " THEN " + sql_item_name(c + '_delta', self.flavor, None) 2722 + "\n ELSE NULL\n END" 2723 + " AS " + sql_item_name(c, self.flavor, None) 2724 ) for c, typ in delta_cols.items() 2725 ])) 2726 + f"\nFROM {temp_table_names['joined']}{table_alias_as} {temp_table_aliases['joined']}\n" 2727 + "WHERE\n " 2728 + '\n OR\n '.join([ 2729 ( 2730 sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NOT NULL' 2731 ) for c in delta_cols 2732 ]) 2733 ) 2734 2735 create_update_query = get_create_table_queries( 2736 select_update_query, 2737 temp_tables['update'], 2738 self.flavor, 2739 internal_schema, 2740 )[0] 2741 (create_update_success, create_update_msg), create_update_results = session_execute( 2742 session, 2743 create_update_query, 2744 with_results=True, 2745 debug=debug, 2746 ) if on_cols and not upsert else ((True, "Success"), []) 2747 apply_update_queries = ( 2748 get_update_queries( 2749 pipe.target, 2750 temp_tables['update'], 2751 session, 2752 on_cols, 2753 upsert=upsert, 2754 schema=self.get_pipe_schema(pipe), 2755 patch_schema=internal_schema, 2756 target_cols_types=pipe.get_columns_types(debug=debug), 2757 patch_cols_types=delta_cols_types, 2758 datetime_col=pipe.columns.get('datetime', None), 2759 flavor=self.flavor, 2760 
null_indices=pipe.null_indices, 2761 cast_columns=pipe.enforce, 2762 debug=debug, 2763 ) 2764 if on_cols else [] 2765 ) 2766 2767 apply_unseen_queries = [ 2768 ( 2769 f"INSERT INTO {pipe_name} ({delta_cols_str})\n" 2770 + f"SELECT {delta_cols_str}\nFROM " 2771 + ( 2772 temp_table_names['unseen'] 2773 if on_cols 2774 else temp_table_names['delta'] 2775 ) 2776 ), 2777 ] 2778 2779 (apply_unseen_success, apply_unseen_msg), apply_unseen_results = session_execute( 2780 session, 2781 apply_unseen_queries, 2782 with_results=True, 2783 debug=debug, 2784 ) if not upsert else ((True, "Success"), None) 2785 if not apply_unseen_success: 2786 _ = clean_up_temp_tables() 2787 return apply_unseen_success, apply_unseen_msg 2788 unseen_count = apply_unseen_results[0].rowcount if apply_unseen_results else 0 2789 2790 (apply_update_success, apply_update_msg), apply_update_results = session_execute( 2791 session, 2792 apply_update_queries, 2793 with_results=True, 2794 debug=debug, 2795 ) 2796 if not apply_update_success: 2797 _ = clean_up_temp_tables() 2798 return apply_update_success, apply_update_msg 2799 update_count = apply_update_results[0].rowcount if apply_update_results else 0 2800 2801 session.commit() 2802 2803 msg = ( 2804 f"Inserted {unseen_count:,}, updated {update_count:,} rows." 2805 if not upsert 2806 else f"Upserted {update_count:,} row" + ('s' if update_count != 1 else '') + "." 2807 ) 2808 _ = clean_up_temp_tables(ready_to_drop=True) 2809 2810 return True, msg
If a pipe's connector is the same as its instance connector, it's more efficient to sync the pipe in-place rather than reading data into Pandas.
Parameters
- pipe (mrsm.Pipe): The pipe whose connector is the same as its instance.
- params (Optional[Dict[str, Any]], default None):
Optional params dictionary to build the `WHERE` clause. See `meerschaum.utils.sql.build_where`.
- begin (Union[datetime, int, None], default None):
Optionally specify the earliest datetime to search for data.
Defaults to `None`.
- end (Union[datetime, int, None], default None):
Optionally specify the latest datetime to search for data.
Defaults to `None`.
- chunksize (Optional[int], default -1):
Specify the number of rows to sync per chunk.
If `-1`, resort to the system configuration (default is `900`). A `chunksize` of `None` will sync all rows in one transaction. Defaults to `-1`.
- check_existing (bool, default True):
If `True`, pull and diff with existing data from the pipe.
- debug (bool, default False): Verbosity toggle.
Returns
- A `SuccessTuple`.
def get_sync_time(
    self,
    pipe: 'mrsm.Pipe',
    params: Optional[Dict[str, Any]] = None,
    newest: bool = True,
    remote: bool = False,
    debug: bool = False,
) -> Union[datetime, int, None]:
    """Get a Pipe's most recent datetime value.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to get the sync time for.

    params: Optional[Dict[str, Any]], default None
        Optional params dictionary to build the `WHERE` clause.
        See `meerschaum.utils.sql.build_where`.
        For the local query, keys which are not columns of the pipe's table are ignored.

    newest: bool, default True
        If `True`, get the most recent datetime (honoring `params`).
        If `False`, get the oldest datetime (ASC instead of DESC).

    remote: bool, default False
        If `True`, return the sync time for the remote fetch definition.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `datetime` object (or `int` if using an integer axis) if the pipe exists, otherwise `None`.
    Rows whose datetime value is `NULL` are never used as the sync time.
    """
    from meerschaum.utils.sql import sql_item_name, build_where, wrap_query_with_cte
    src_name = sql_item_name('src', self.flavor)
    table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))

    dt_col = pipe.columns.get('datetime', None)
    if dt_col is None:
        return None
    dt_col_name = sql_item_name(dt_col, self.flavor, None)

    if remote and pipe.connector.type != 'sql':
        warn(f"Cannot get the remote sync time for {pipe}.")
        return None

    ASC_or_DESC = "DESC" if newest else "ASC"
    existing_cols = pipe.get_columns_types(debug=debug)
    if not remote and not existing_cols:
        return None
    valid_params = {}
    if params is not None:
        valid_params = {k: v for k, v in params.items() if k in existing_cols}
    flavor = self.flavor if not remote else pipe.connector.flavor

    ### If no bounds are provided for the datetime column,
    ### add IS NOT NULL to the WHERE clause.
    if dt_col not in valid_params:
        valid_params[dt_col] = '_None'
    where = "" if not valid_params else build_where(valid_params, self)
    src_query = (
        f"SELECT {dt_col_name}\nFROM {table_name}{where}"
        if not remote
        else self.get_pipe_metadef(pipe, params=params, begin=None, end=None)
    )

    ### NOTE: The remote fetch definition above is not filtered for NULL datetimes,
    ### and the default NULL ordering differs by flavor (PostgreSQL / Oracle sort NULLs
    ### first for DESC; MySQL / SQLite / MSSQL sort them first for ASC). Exclude NULLs
    ### in the limited query so a NULL row can never be picked as the sync time.
    not_null_clause = f"WHERE {dt_col_name} IS NOT NULL"
    base_query = (
        f"SELECT {dt_col_name}\n"
        f"FROM {src_name}\n"
        f"{not_null_clause}\n"
        f"ORDER BY {dt_col_name} {ASC_or_DESC}\n"
        "LIMIT 1"
    )
    if self.flavor == 'mssql':
        base_query = (
            f"SELECT TOP 1 {dt_col_name}\n"
            f"FROM {src_name}\n"
            f"{not_null_clause}\n"
            f"ORDER BY {dt_col_name} {ASC_or_DESC}"
        )
    elif self.flavor == 'oracle':
        base_query = (
            "SELECT * FROM (\n"
            f"    SELECT {dt_col_name}\n"
            f"    FROM {src_name}\n"
            f"    {not_null_clause}\n"
            f"    ORDER BY {dt_col_name} {ASC_or_DESC}\n"
            ") WHERE ROWNUM = 1"
        )

    ### NOTE: MariaDB has an optimizer bug where `ORDER BY <dt> DESC/ASC LIMIT 1` against a
    ### `RANGE COLUMNS` partitioned table combined with a `WHERE` clause performs a partition
    ### index scan that stops early and returns zero rows (observed on MariaDB 12.x). The
    ### equivalent `MIN`/`MAX` aggregate scans the pruned partitions correctly, so use it for
    ### the bounds on partitioned MariaDB tables instead. (MIN/MAX already skip NULLs.)
    if self.flavor == 'mariadb' and not remote and self._should_partition(pipe):
        agg_func = "MAX" if newest else "MIN"
        base_query = (
            f"SELECT {agg_func}({dt_col_name}) AS {dt_col_name}\n"
            f"FROM {src_name}"
        )

    query = wrap_query_with_cte(src_query, base_query, flavor)

    try:
        db_time = self.value(query, silent=True, debug=debug)

        ### No datetime could be found.
        if db_time is None:
            return None
        ### sqlite returns str.
        if isinstance(db_time, str):
            dateutil_parser = mrsm.attempt_import('dateutil.parser')
            st = dateutil_parser.parse(db_time)
        ### Do nothing if a datetime object is returned
        ### (pandas Timestamps are converted to plain datetimes).
        elif isinstance(db_time, datetime):
            if hasattr(db_time, 'to_pydatetime'):
                st = db_time.to_pydatetime()
            else:
                st = db_time
        ### Sometimes the datetime is actually a date.
        elif isinstance(db_time, date):
            st = datetime.combine(db_time, datetime.min.time())
        ### Adding support for an integer datetime axis.
        elif 'int' in str(type(db_time)).lower():
            st = int(db_time)
        ### Convert pandas timestamp to Python datetime.
        else:
            st = db_time.to_pydatetime()

        sync_time = st

    except Exception as e:
        sync_time = None
        warn(str(e))

    return sync_time
Get a Pipe's most recent datetime value.
Parameters
- pipe (mrsm.Pipe): The pipe to get the sync time for.
- params (Optional[Dict[str, Any]], default None):
Optional params dictionary to build the `WHERE` clause. See `meerschaum.utils.sql.build_where`.
- newest (bool, default True):
If `True`, get the most recent datetime (honoring `params`). If `False`, get the oldest datetime (ASC instead of DESC).
- remote (bool, default False):
If `True`, return the sync time for the remote fetch definition.
Returns
- A `datetime` object (or `int` if using an integer axis) if the pipe exists, otherwise `None`.
def pipe_exists(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False
) -> bool:
    """
    Report whether the table backing a pipe is present in the database.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table (within the pipe's schema) is looked up.

    debug: bool, default False
        Verbosity toggle. When set, the outcome is printed.

    Returns
    -------
    `True` if the pipe's target table exists, otherwise `False`.
    """
    from meerschaum.utils.sql import table_exists

    found = table_exists(
        pipe.target,
        self,
        schema=self.get_pipe_schema(pipe),
        debug=debug,
    )
    if debug:
        status = 'exists.' if found else 'does not exist.'
        dprint(f"{pipe} " + status)
    return found
Check that a Pipe's table exists.
Parameters
- pipe (mrsm.Pipe): The pipe to check.
- debug (bool, default False): Verbosity toggle.
Returns
- A `bool` indicating whether the pipe's table exists.
def get_pipe_rowcount(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    remote: bool = False,
    debug: bool = False
) -> Union[int, None]:
    """
    Get the rowcount for a pipe in accordance with given parameters.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to query with.

    begin: Union[datetime, int, None], default None
        The begin datetime value (inclusive).

    end: Union[datetime, int, None], default None
        The end datetime value (exclusive).

    params: Optional[Dict[str, Any]], default None
        See `meerschaum.utils.sql.build_where`.
        Keys which are not columns of the pipe's table are ignored.

    remote: bool, default False
        If `True`, get the rowcount for the remote table.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` for the number of rows if the `pipe` exists, otherwise `None`.

    """
    from meerschaum.utils.sql import dateadd_str, sql_item_name, wrap_query_with_cte, build_where
    from meerschaum.connectors.sql._fetch import get_pipe_query
    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
    if remote:
        msg = f"'fetch:definition' must be an attribute of {pipe} to get a remote rowcount."
        if 'fetch' not in pipe.parameters:
            error(msg)
            return None
        if 'definition' not in pipe.parameters['fetch']:
            error(msg)
            return None
    elif not pipe.exists(debug=debug):
        return None

    flavor = self.flavor if not remote else pipe.connector.flavor
    conn = self if not remote else pipe.connector
    _pipe_name = sql_item_name(pipe.target, flavor, self.get_pipe_schema(pipe))
    dt_col = pipe.columns.get('datetime', None)
    dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None
    dt_db_type = get_db_type_from_pd_type(dt_typ, flavor) if dt_typ else None
    if not dt_col:
        dt_col = pipe.guess_datetime()
        dt_name = sql_item_name(dt_col, flavor, None) if dt_col else None
        is_guess = True
    else:
        dt_col = pipe.get_columns('datetime')
        dt_name = sql_item_name(dt_col, flavor, None)
        is_guess = False

    if begin is not None or end is not None:
        if is_guess:
            if dt_col is None:
                warn(
                    f"No datetime could be determined for {pipe}."
                    + "\n Ignoring begin and end...",
                    stack=False,
                )
                begin, end = None, None
            else:
                warn(
                    f"A datetime wasn't specified for {pipe}.\n"
                    + f" Using column \"{dt_col}\" for datetime bounds...",
                    stack=False,
                )

    ### Only params which map to existing columns are selected and filtered on;
    ### selecting an unknown column would make the whole count query fail.
    valid_params = {}
    if params is not None:
        existing_cols = pipe.get_columns_types(debug=debug)
        valid_params = {k: v for k, v in params.items() if k in existing_cols}

    ### `dict.fromkeys` deduplicates while keeping a deterministic column order.
    _cols_names = [
        sql_item_name(col, flavor)
        for col in dict.fromkeys(([dt_col] if dt_col else []) + list(valid_params))
    ]
    if not _cols_names:
        _cols_names = ['*']

    src = (
        f"SELECT {', '.join(_cols_names)}\nFROM {_pipe_name}"
        if not remote
        else get_pipe_query(pipe)
    )
    parent_query = f"SELECT COUNT(*)\nFROM {sql_item_name('src', flavor)}"
    query = wrap_query_with_cte(src, parent_query, flavor)
    has_bounds = begin is not None or end is not None
    if has_bounds:
        query += "\nWHERE"
    if begin is not None:
        query += (
            f"\n {dt_name} >= "
            + dateadd_str(flavor, datepart='minute', number=0, begin=begin, db_type=dt_db_type)
        )
    if end is not None and begin is not None:
        query += "\n AND"
    if end is not None:
        query += (
            f"\n {dt_name} < "
            + dateadd_str(flavor, datepart='minute', number=0, begin=end, db_type=dt_db_type)
        )
    if valid_params:
        ### Build the conditions without the keyword rather than string-replacing 'WHERE',
        ### which would also rewrite column names or values that contain "WHERE".
        query += (
            ('\n AND ' if has_bounds else '\nWHERE ')
            + build_where(valid_params, conn, with_where=False)
        )

    result = conn.value(query, debug=debug, silent=True)
    try:
        return int(result)
    except Exception:
        return None
Get the rowcount for a pipe in accordance with given parameters.
Parameters
- pipe (mrsm.Pipe): The pipe to query with.
- begin (Union[datetime, int, None], default None): The begin datetime value.
- end (Union[datetime, int, None], default None): The end datetime value.
- params (Optional[Dict[str, Any]], default None):
See `meerschaum.utils.sql.build_where`.
- remote (bool, default False):
If `True`, get the rowcount for the remote table.
- debug (bool, default False): Verbosity toggle.
Returns
- An `int` for the number of rows if the `pipe` exists, otherwise `None`.
def drop_pipe(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Drop the table backing a pipe while leaving the pipe's registration intact.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table should be dropped.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success. A table which does not exist counts as success.
    """
    from meerschaum.utils.sql import table_exists, sql_item_name, DROP_IF_EXISTS_FLAVORS

    target = pipe.target
    schema = self.get_pipe_schema(pipe)
    qualified_name = sql_item_name(target, self.flavor, schema)

    dropped = True
    if table_exists(target, self, schema=schema, debug=debug):
        if_exists_clause = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
        drop_result = self.exec(
            f"DROP TABLE {if_exists_clause} {qualified_name}",
            silent=True,
            debug=debug,
        )
        dropped = drop_result is not None

    ### Drop any MSSQL partition scheme + function the table referenced (no-op otherwise).
    if dropped:
        cleanup_queries = self._get_partition_cleanup_queries(pipe)
        if cleanup_queries:
            self.exec_queries(cleanup_queries, break_on_error=False, silent=True, debug=debug)

    if not dropped:
        return dropped, f"Failed to drop {pipe}."
    return dropped, "Success"
Drop a pipe's tables but maintain its registration.
Parameters
- pipe (mrsm.Pipe): The pipe to drop.
Returns
- A `SuccessTuple` indicating success.
def clear_pipe(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kw
) -> SuccessTuple:
    """
    Remove rows from a pipe's table within an optional interval, keeping the table itself.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to clear.

    begin: Union[datetime, int, None], default None
        Beginning datetime. Inclusive.

    end: Union[datetime, int, None], default None
        Ending datetime. Exclusive.

    params: Optional[Dict[str, Any]], default None
        See `meerschaum.utils.sql.build_where`.
        Keys which are not columns of the pipe's table are ignored.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success.
    If no datetime column is configured and none can be guessed, `begin` and `end`
    are ignored (with a warning), so every row matching `params` is deleted.
    """
    if not pipe.exists(debug=debug):
        return True, f"{pipe} does not exist, so nothing was cleared."

    from meerschaum.utils.sql import sql_item_name, build_where, dateadd_str
    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type
    pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))

    configured_dt_col = pipe.columns.get('datetime', None)
    dt_typ = pipe.dtypes.get(configured_dt_col, 'datetime') if configured_dt_col else None
    dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None

    ### Fall back to a guessed datetime column when none is configured.
    is_guess = not configured_dt_col
    if is_guess:
        dt_col = pipe.guess_datetime()
        dt_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None
    else:
        dt_col = pipe.get_columns('datetime')
        dt_name = sql_item_name(dt_col, self.flavor, None)

    has_bounds = begin is not None or end is not None
    if has_bounds and is_guess:
        if dt_col is None:
            warn(
                f"No datetime could be determined for {pipe}."
                + "\n Ignoring datetime bounds...",
                stack=False,
            )
            begin, end = None, None
        else:
            warn(
                f"A datetime wasn't specified for {pipe}.\n"
                + f" Using column \"{dt_col}\" for datetime bounds...",
                stack=False,
            )

    valid_params = {}
    if params is not None:
        existing_cols = pipe.get_columns_types(debug=debug)
        valid_params = {k: v for k, v in params.items() if k in existing_cols}

    ### Each condition is appended after the always-true `WHERE 1 = 1` anchor.
    conditions = []
    if valid_params:
        conditions.append('\n AND ' + build_where(valid_params, self, with_where=False))
    if begin is not None:
        conditions.append(
            f'\n AND {dt_name} >= '
            + dateadd_str(self.flavor, 'day', 0, begin, db_type=dt_db_type)
        )
    if end is not None:
        conditions.append(
            f'\n AND {dt_name} < '
            + dateadd_str(self.flavor, 'day', 0, end, db_type=dt_db_type)
        )
    clear_query = f"DELETE FROM {pipe_name}\nWHERE 1 = 1\n" + ''.join(conditions)

    exec_result = self.exec(clear_query, silent=True, debug=debug)
    success = exec_result is not None
    if success:
        return success, "Success"
    return success, f"Failed to clear {pipe}."
Delete a pipe's data within a bounded or unbounded interval without dropping the table.
Parameters
- pipe (mrsm.Pipe): The pipe to clear.
- begin (Union[datetime, int, None], default None): Beginning datetime. Inclusive.
- end (Union[datetime, int, None], default None): Ending datetime. Exclusive.
- params (Optional[Dict[str, Any]], default None): See `meerschaum.utils.sql.build_where`.
def deduplicate_pipe(
    self,
    pipe: mrsm.Pipe,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    params: Optional[Dict[str, Any]] = None,
    debug: bool = False,
    **kwargs: Any
) -> SuccessTuple:
    """
    Delete duplicate values within a pipe's table.

    Rows sharing the same datetime and existing index values are grouped with `ROW_NUMBER()`
    (emulated with user variables on MySQL/MariaDB < 8) and only the first row of each group
    is kept. The deduplicated rows are written to a temporary table, the pipe's table is renamed
    to another temporary name, the deduplicated table is renamed into its place, and the old
    table is dropped.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose table to deduplicate.

    begin: Union[datetime, int, None], default None
        Intended to only deduplicate values greater than or equal to this value.
        NOTE: not currently applied; the entire table is deduplicated.

    end: Union[datetime, int, None], default None
        Intended to only deduplicate values less than this value.
        NOTE: not currently applied; the entire table is deduplicated.

    params: Optional[Dict[str, Any]], default None
        Intended to further limit deduplication to values which match this query dictionary.
        NOTE: not currently applied; the entire table is deduplicated.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success.
    Fails without touching the table if there is neither a datetime nor an index column
    to group duplicates by.
    """
    from meerschaum.utils.sql import (
        sql_item_name,
        get_rename_table_queries,
        DROP_IF_EXISTS_FLAVORS,
        get_create_table_query,
        format_cte_subquery,
        get_null_replacement,
    )
    from meerschaum.utils.misc import generate_password, flatten_list

    pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))

    if not pipe.exists(debug=debug):
        return False, f"Table {pipe_table_name} does not exist."

    dt_col = pipe.columns.get('datetime', None)
    ### Fetch the table's columns once; used both to filter indices and to select all columns.
    cols_types = pipe.get_columns_types(debug=debug)

    ### Non-datetime indices that in fact exist.
    indices = [
        col
        for col in pipe.columns.values()
        if col and col != dt_col and col in cols_types
    ]
    indices_names = [sql_item_name(index_col, self.flavor, None) for index_col in indices]
    existing_cols_names = [sql_item_name(col, self.flavor, None) for col in cols_types]
    duplicate_row_number_name = sql_item_name('dup_row_num', self.flavor, None)
    previous_row_number_name = sql_item_name('prev_row_num', self.flavor, None)

    ### Duplicates share the datetime value (if configured) and every existing index value.
    dt_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None
    partition_cols = ([dt_name] if dt_col else []) + indices_names
    order_cols = ([f"{dt_name} DESC"] if dt_col else []) + indices_names
    if not partition_cols:
        ### An empty `PARTITION BY` is invalid SQL: there is nothing to group duplicates by.
        return False, (
            f"Cannot deduplicate table {pipe_table_name}: "
            + "no datetime or index columns exist."
        )
    index_list_str = ', '.join(partition_cols)
    index_list_str_ordered = ', '.join(order_cols)
    cols_list_str = ', '.join(existing_cols_names)

    get_rowcount_query = f"SELECT COUNT(*) FROM {pipe_table_name}"
    old_rowcount = self.value(get_rowcount_query, debug=debug)
    if old_rowcount is None:
        return False, f"Failed to get rowcount for table {pipe_table_name}."

    try:
        ### NOTE: MySQL 5 and below does not support window functions (ROW_NUMBER()).
        is_old_mysql = (
            self.flavor in ('mysql', 'mariadb')
            and
            int(self.db_version.split('.')[0]) < 8
        )
    except Exception:
        is_old_mysql = False

    if not is_old_mysql:
        src_query = f"""
        SELECT
            {cols_list_str},
            ROW_NUMBER() OVER (
                PARTITION BY
                {index_list_str}
                ORDER BY {index_list_str_ordered}
            ) AS {duplicate_row_number_name}
        FROM {pipe_table_name}
        """
        duplicates_cte_subquery = format_cte_subquery(
            src_query,
            self.flavor,
            sub_name='src',
            cols_to_select=cols_list_str,
        ) + f"""
        WHERE {duplicate_row_number_name} = 1
        """
    else:
        ### Emulate ROW_NUMBER() with user variables. Every column is selected (not just the
        ### index columns) so non-index values survive the table swap below.
        index_list_sum_str = index_list_str.replace(', ', ' + ')
        null_replacement = get_null_replacement('str', 'mysql')
        duplicates_cte_subquery = f"""
        SELECT
            {cols_list_str}
        FROM (
            SELECT
                {cols_list_str},
                IF(
                    @{previous_row_number_name} <> {index_list_sum_str},
                    @{duplicate_row_number_name} := 0,
                    @{duplicate_row_number_name}
                ),
                @{previous_row_number_name} := {index_list_sum_str},
                @{duplicate_row_number_name} := @{duplicate_row_number_name} + 1 AS {duplicate_row_number_name}
            FROM
                {pipe_table_name},
                (
                    SELECT @{duplicate_row_number_name} := 0
                ) AS {duplicate_row_number_name},
                (
                    SELECT @{previous_row_number_name} := '{null_replacement}'
                ) AS {previous_row_number_name}
            ORDER BY {index_list_str_ordered}
        ) AS t
        WHERE {duplicate_row_number_name} = 1
        """

    session_id = generate_password(3)

    dedup_table = self.get_temporary_target(pipe.target, transact_id=session_id, label='dedup')
    temp_old_table = self.get_temporary_target(pipe.target, transact_id=session_id, label='old')
    temp_old_table_name = sql_item_name(temp_old_table, self.flavor, self.get_pipe_schema(pipe))

    create_temporary_table_query = get_create_table_query(
        duplicates_cte_subquery,
        dedup_table,
        self.flavor,
    ) + f"""
    ORDER BY {index_list_str_ordered}
    """
    if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
    ### Swap tables: pipe table -> temp "old" table, dedup table -> pipe table, drop "old".
    alter_queries = flatten_list([
        get_rename_table_queries(
            pipe.target,
            temp_old_table,
            self.flavor,
            schema=self.get_pipe_schema(pipe),
        ),
        get_rename_table_queries(
            dedup_table,
            pipe.target,
            self.flavor,
            schema=None,
            new_schema=self.get_pipe_schema(pipe),
        ),
        f"DROP TABLE {if_exists_str} {temp_old_table_name}",
    ])

    self._log_temporary_tables_creation(temp_old_table, create=(not pipe.temporary), debug=debug)
    create_temporary_result = self.execute(create_temporary_table_query, debug=debug)
    if create_temporary_result is None:
        return False, f"Failed to deduplicate table {pipe_table_name}."

    results = self.exec_queries(
        alter_queries,
        break_on_error=True,
        rollback=True,
        debug=debug,
    )

    fail_query = None
    for result, query in zip(results, alter_queries):
        if result is None:
            fail_query = query
            break
    if fail_query is not None:
        return False, f"Failed to execute query:\n{fail_query}"

    new_rowcount = self.value(get_rowcount_query, debug=debug)
    ### The recount may fail (`None`); only report the change when both counts are known.
    rowcount_str = (
        f"\nfrom {old_rowcount:,} to {new_rowcount:,} rows"
        if new_rowcount is not None and old_rowcount != new_rowcount
        else ''
    )
    return True, f"Successfully deduplicated table {pipe_table_name}{rowcount_str}."
Delete duplicate values within a pipe's table.
Parameters
- pipe (mrsm.Pipe): The pipe whose table to deduplicate.
- begin (Union[datetime, int, None], default None): If provided, only deduplicate values greater than or equal to this value.
- end (Union[datetime, int, None], default None): If provided, only deduplicate values less than this value.
- params (Optional[Dict[str, Any]], default None): If provided, further limit deduplication to values which match this query dictionary.
- debug (bool, default False): Verbosity toggle.
Returns
- A `SuccessTuple` indicating success.
def get_pipe_table(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Union['sqlalchemy.Table', None]:
    """
    Reflect a pipe's target table as a `sqlalchemy.Table`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose table should be reflected.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A refreshed `sqlalchemy.Table`, or `None` when the pipe's table does not exist.
    """
    if pipe.exists(debug=debug):
        from meerschaum.utils.sql import get_sqlalchemy_table
        return get_sqlalchemy_table(
            pipe.target,
            connector=self,
            schema=self.get_pipe_schema(pipe),
            debug=debug,
            refresh=True,
        )
    return None
Return the sqlalchemy.Table object for a mrsm.Pipe.
Parameters
- pipe (mrsm.Pipe): The pipe in question.
Returns
- A `sqlalchemy.Table` object.
def get_pipe_columns_types(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Dict[str, str]:
    """
    Get the pipe's columns and types.

    Most flavors query the types via `get_table_cols_types()`. For Oracle, MySQL, MariaDB,
    SQLite, and GeoPackage, the table is reflected through SQLAlchemy instead.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe to get the columns for.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary of column names (`str`) and types (`str`).
    Empty if the table does not exist or if SQLAlchemy reflection fails
    (the failure is printed and warned).

    Examples
    --------
    >>> conn.get_pipe_columns_types(pipe)
    {
      'dt': 'TIMESTAMP WITHOUT TIMEZONE',
      'id': 'BIGINT',
      'val': 'DOUBLE PRECISION',
    }
    >>>
    """
    if not pipe.exists(debug=debug):
        return {}

    if self.flavor not in ('oracle', 'mysql', 'mariadb', 'sqlite', 'geopackage'):
        from meerschaum.utils.sql import get_table_cols_types
        return get_table_cols_types(
            pipe.target,
            self,
            flavor=self.flavor,
            schema=self.get_pipe_schema(pipe),
            debug=debug,
        )

    if debug:
        dprint(f"Fetching columns_types for {pipe} via SQLAlchemy table.")

    table_columns = {}
    try:
        pipe_table = self.get_pipe_table(pipe, debug=debug)
        if pipe_table is None:
            return {}

        if debug:
            dprint("Found columns:")
            mrsm.pprint(dict(pipe_table.columns))

        for col in pipe_table.columns:
            table_columns[str(col.name)] = str(col.type)
    except Exception as e:
        ### Best-effort reflection: report the failure but return an empty mapping.
        traceback.print_exc()
        warn(e)
        table_columns = {}

    return table_columns
Get the pipe's columns and types.
Parameters
- pipe (mrsm.Pipe): The pipe to get the columns for.
Returns
- A dictionary of column names (`str`) and types (`str`).
Examples
>>> conn.get_pipe_columns_types(pipe)
{
'dt': 'TIMESTAMP WITHOUT TIMEZONE',
'id': 'BIGINT',
'val': 'DOUBLE PRECISION',
}
>>>
def get_to_sql_dtype(
    self,
    pipe: 'mrsm.Pipe',
    df: 'pd.DataFrame',
    update_dtypes: bool = True,
) -> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']:
    """
    Build the `dtype` mapping to pass to `to_sql()` for a pipe's DataFrame.

    Types are layered: the DataFrame's own dtypes, then any special columns detected in it,
    then (when `update_dtypes` is `True`) the pipe's configured `dtypes`. Entries with an empty
    column name or type are skipped.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe which may contain a `dtypes` parameter.

    df: pd.DataFrame
        The DataFrame to be pushed via `to_sql()`.

    update_dtypes: bool, default True
        If `True`, patch the pipe's dtypes onto the DataFrame's dtypes.

    Returns
    -------
    A dictionary with `sqlalchemy` datatypes.

    Examples
    --------
    >>> import pandas as pd
    >>> import meerschaum as mrsm
    >>>
    >>> conn = mrsm.get_connector('sql:memory')
    >>> df = pd.DataFrame([{'a': {'b': 1}}])
    >>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
    >>> conn.get_to_sql_dtype(pipe, df)
    {'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
    """
    from meerschaum.utils.dataframe import get_special_cols
    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type

    merged_dtypes = dict((col, str(typ)) for col, typ in df.dtypes.items())
    merged_dtypes.update(get_special_cols(df))
    if update_dtypes:
        merged_dtypes.update(pipe.dtypes)

    sqlalchemy_dtypes = {}
    for col, typ in merged_dtypes.items():
        if not col or not typ:
            continue
        sqlalchemy_dtypes[col] = get_db_type_from_pd_type(typ, self.flavor, as_sqlalchemy=True)
    return sqlalchemy_dtypes
Given a pipe and DataFrame, return the dtype dictionary for to_sql().
Parameters
- pipe (mrsm.Pipe): The pipe which may contain a `dtypes` parameter.
- df (pd.DataFrame): The DataFrame to be pushed via `to_sql()`.
- update_dtypes (bool, default True): If `True`, patch the pipe's dtypes onto the DataFrame's dtypes.
Returns
- A dictionary with `sqlalchemy` datatypes.
Examples
>>> import pandas as pd
>>> import meerschaum as mrsm
>>>
>>> conn = mrsm.get_connector('sql:memory')
>>> df = pd.DataFrame([{'a': {'b': 1}}])
>>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
>>> conn.get_to_sql_dtype(pipe, df)
{'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
def get_pipe_schema(self, pipe: mrsm.Pipe) -> Union[str, None]:
    """
    Decide which schema a pipe's table belongs to.

    SQLite and GeoPackage connectors always use the connector's own `schema`.
    Other flavors prefer `pipe.parameters['schema']` and fall back to `self.schema`.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe which may contain a configured schema.

    Returns
    -------
    A schema string or `None` if nothing is configured.
    """
    connector_schema = self.schema
    if self.flavor not in ('sqlite', 'geopackage'):
        return pipe.parameters.get('schema', connector_schema)
    return connector_schema
Return the schema to use for this pipe.
First check `pipe.parameters['schema']`, then check `self.schema` (SQLite and GeoPackage always use `self.schema`).
Parameters
- pipe (mrsm.Pipe): The pipe which may contain a configured schema.
Returns
- A schema string or `None` if nothing is configured.
def create_pipe_table_from_df(
    self,
    pipe: mrsm.Pipe,
    df: 'pd.DataFrame',
    debug: bool = False,
) -> mrsm.SuccessTuple:
    """
    Create a pipe's table from its configured dtypes and an incoming dataframe.

    Column types are merged with increasing precedence:
    1. the dataframe's dtypes;
    2. the pipe's non-primary index columns (falling back to `'int'` when absent from the dataframe);
    3. special columns detected in the dataframe;
    4. the pipe's configured `dtypes`.

    An autoincrementing primary key is dropped from the merged types. On TimescaleDB with
    Hypercore enabled (the default), a declarative hypertable is attempted first. If that
    attempt fails, the table is created again as a plain table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose target table should be created.

    df: pd.DataFrame
        The incoming dataframe, used to infer column types (and, for MySQL/MariaDB and MSSQL
        partitioning, to compute the initial partitions).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating whether the table was created.
    """
    from meerschaum.utils.dataframe import get_special_cols
    from meerschaum.utils.sql import (
        get_create_table_queries,
        sql_item_name,
        get_create_schema_if_not_exists_queries,
    )
    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type

    is_geopackage = self.flavor == 'geopackage'
    if is_geopackage:
        gpkg_ok, gpkg_msg = self._init_geopackage_pipe(df, pipe, debug=debug)
        if not gpkg_ok:
            return gpkg_ok, gpkg_msg

    primary_key = pipe.columns.get('primary', None)
    if primary_key:
        pk_pd_type = pipe.dtypes.get(primary_key, str(df.dtypes.get(primary_key, 'int')))
        primary_key_db_type = get_db_type_from_pd_type(pk_pd_type, self.flavor)
    else:
        primary_key_db_type = None

    dt_col = pipe.columns.get('datetime', None)

    ### Later updates take precedence over earlier ones.
    new_dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
    new_dtypes.update({
        col: str(df.dtypes.get(col, 'int'))
        for col_ix, col in pipe.columns.items()
        if col and col_ix != 'primary'
    })
    new_dtypes.update(get_special_cols(df))
    new_dtypes.update(pipe.dtypes)

    autoincrement = (
        pipe.parameters.get('autoincrement', False)
        or (primary_key and primary_key not in new_dtypes)
    )
    if autoincrement:
        new_dtypes.pop(primary_key, None)

    schema = self.get_pipe_schema(pipe)

    ### Hypercore (declarative `CREATE TABLE ... WITH (tsdb.hypertable, ...)`) is the default for
    ### TimescaleDB hypertables. `hypercore=False` falls back to a plain table plus the
    ### `create_hypertable()` call made during index creation, keeping it a row-store hypertable.
    is_timescale = self.flavor in ('timescaledb', 'timescaledb-ha')
    hypertable = is_timescale and pipe.parameters.get('hypertable', True) and dt_col is not None
    hypercore = hypertable and pipe.parameters.get('hypercore', True)

    hypertable_chunk_interval = None
    hypertable_segmentby = None
    hypertable_orderby = None
    if hypercore:
        chunk_interval = pipe.get_chunk_interval(debug=debug)
        if isinstance(chunk_interval, int):
            hypertable_chunk_interval = f'{chunk_interval}'
        else:
            hypertable_chunk_interval = f'{int(chunk_interval.total_seconds() / 60)} minutes'
        compress_settings = self._get_compress_settings(pipe)
        hypertable_segmentby = compress_settings['segmentby'] or None
        hypertable_orderby = compress_settings['orderby'] or None

    ### Native range partitioning (non-TimescaleDB flavors); a no-op column for others.
    partition_by_column = self._get_partition_column(pipe)
    partition_bounds = None
    if partition_by_column is not None and self.flavor in ('mysql', 'mariadb'):
        ### MySQL/MariaDB need the initial partitions inline at `CREATE TABLE`; without any
        ### datetime values in the df, fall back to a plain (unpartitioned) table.
        partition_bounds = self._get_initial_partition_bounds(pipe, df, debug=debug)
        if not partition_bounds:
            partition_by_column = None

    ### MSSQL partitions via a function + scheme created before the table.
    partition_scheme_name = None
    partition_creation_queries = []
    if partition_by_column is not None and self.flavor == 'mssql':
        partition_scheme_name = self._partition_scheme_name(pipe)
        partition_creation_queries = self._get_mssql_partition_creation_queries(
            pipe, df, debug=debug
        )

    def _queries_for(chunk_interval_str):
        """Assemble schema, partition, and table queries (declarative hypertable if an interval is given)."""
        queries = get_create_table_queries(
            new_dtypes,
            pipe.target,
            self.flavor,
            schema=schema,
            primary_key=primary_key,
            primary_key_db_type=primary_key_db_type,
            datetime_column=dt_col,
            hypertable_chunk_interval=chunk_interval_str,
            hypertable_segmentby=(hypertable_segmentby if chunk_interval_str else None),
            hypertable_orderby=(hypertable_orderby if chunk_interval_str else None),
            partition_by_column=partition_by_column,
            partition_bounds=partition_bounds,
            partition_scheme_name=partition_scheme_name,
        )
        if partition_creation_queries:
            queries = partition_creation_queries + queries
        if schema:
            queries = get_create_schema_if_not_exists_queries(schema, self.flavor) + queries
        return queries

    success = all(
        self.exec_queries(
            _queries_for(hypertable_chunk_interval),
            break_on_error=True,
            rollback=True,
            silent=hypercore,
            debug=debug,
        )
    )
    if not success and hypercore:
        ### The declarative hypertable syntax may be unsupported: retry as a plain table.
        ### `create_hypertable()` then runs later, during index creation.
        success = all(
            self.exec_queries(
                _queries_for(None),
                break_on_error=True,
                rollback=True,
                debug=debug,
            )
        )

    if success and is_geopackage:
        return self._init_geopackage_pipe(df, pipe, debug=debug)
    if success:
        return success, "Success"

    target_name = sql_item_name(pipe.target, schema=self.get_pipe_schema(pipe), flavor=self.flavor)
    return success, f"Failed to create {target_name}."
Create a pipe's table from its configured dtypes and an incoming dataframe.
def get_pipe_columns_indices(
    self,
    pipe: mrsm.Pipe,
    debug: bool = False,
) -> Dict[str, List[Dict[str, str]]]:
    """
    Look up which indices exist on each column of a pipe's table.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose table is inspected.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping column names to lists of dictionaries describing the
    name and type of each index. Empty if the pipe has `_skip_check_indices` set.
    """
    if not pipe.__dict__.get('_skip_check_indices', False):
        from meerschaum.utils.sql import get_table_cols_indices
        return get_table_cols_indices(
            pipe.target,
            self,
            flavor=self.flavor,
            schema=self.get_pipe_schema(pipe),
            debug=debug,
        )
    return {}
Return a dictionary mapping columns to the indices created on those columns.
Parameters
- pipe (mrsm.Pipe): The pipe to be queried against.
Returns
- A dictionary mapping column names to lists of dictionaries. The dictionaries in the lists contain the name and type of the indices.
@staticmethod
def get_temporary_target(
    target: str,
    transact_id: Optional[str] = None,
    label: Optional[str] = None,
    separator: Optional[str] = None,
) -> str:
    """
    Build a unique(ish) temporary table name derived from a pipe's target.

    The result has the form `{prefix}{target}{separator}{transact_id}[{separator}{label}]`.
    `prefix`, `separator`, and the length of a generated `transact_id` come from the
    `system:connectors:sql:instance:temporary_target` config (defaults `'_'`, `'_'`, and `3`).
    """
    from meerschaum.utils.misc import generate_password
    temp_cf = mrsm.get_config('system', 'connectors', 'sql', 'instance', 'temporary_target') or {}
    transact_id = transact_id or generate_password(temp_cf.get('transaction_id_length', 3))
    prefix = temp_cf.get('prefix', '_')
    separator = separator or temp_cf.get('separator', '_')

    pieces = [prefix, target, separator, transact_id]
    if label:
        pieces += [separator, label]
    return ''.join(pieces)
Return a unique(ish) temporary target for a pipe.
def create_pipe_indices(
    self,
    pipe: mrsm.Pipe,
    columns: Optional[List[str]] = None,
    debug: bool = False,
) -> SuccessTuple:
    """
    Create the indices for a pipe's table.

    Delegates to `self.create_indices()` and wraps its result in a `SuccessTuple`.
    """
    created = self.create_indices(pipe, columns=columns, debug=debug)
    if not created:
        return created, f"Failed to create indices for {pipe}."
    return created, "Success"
Create a pipe's indices.
def drop_pipe_indices(
    self,
    pipe: mrsm.Pipe,
    columns: Optional[List[str]] = None,
    debug: bool = False,
) -> SuccessTuple:
    """
    Drop the indices of a pipe's table.

    Delegates to `self.drop_indices()` and wraps its result in a `SuccessTuple`.
    """
    dropped = self.drop_indices(pipe, columns=columns, debug=debug)
    if not dropped:
        return dropped, f"Failed to drop indices for {pipe}."
    return dropped, "Success"
Drop a pipe's indices.
def get_pipe_index_names(self, pipe: mrsm.Pipe) -> Dict[str, str]:
    """
    Return a dictionary mapping index keys to their names on the database.

    Each name is rendered from `pipe.parameters['index_template']`
    (default `"IX_{schema_str}{target}_{column_names}"`). Here `column_names` joins an
    index's columns with underscores. The name is then truncated for the connector's flavor.
    If several index keys render to the same name, only the first key is kept.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose `indices` should be named.

    Returns
    -------
    A dictionary of index keys to index names.
    """
    from meerschaum.utils.sql import truncate_item_name

    index_template = pipe.parameters.get(
        'index_template',
        "IX_{schema_str}{target}_{column_names}",
    )
    ### NOTE: `schema_str` is always rendered empty; the schema is not embedded in index names.
    schema_str = ''
    target = pipe.target

    column_names = {
        ix: (
            '_'.join(cols)
            if isinstance(cols, (list, tuple))
            else str(cols)
        )
        for ix, cols in pipe.indices.items()
        if cols
    }

    ### Map each rendered name to the first index key which produced it (skip duplicates).
    first_ix_by_name = {}
    for ix, ix_column_names in column_names.items():
        index_name = index_template.format(
            target=target,
            column_names=ix_column_names,
            connector_keys=pipe.connector_keys,
            metric_key=pipe.metric_key,
            location_key=pipe.location_key,
            schema_str=schema_str,
        )
        first_ix_by_name.setdefault(index_name, ix)

    return {
        ix: truncate_item_name(index_name, flavor=self.flavor)
        for index_name, ix in first_ix_by_name.items()
    }
Return a dictionary mapping index keys to their names on the database.
Returns
- A dictionary of index keys to index names.
def get_plugins_pipe(self) -> mrsm.Pipe:
    """
    Build the internal metadata pipe which stores registered plugins.

    The pipe is bound to this connector. Its `user_id` dtype mirrors the users pipe's
    `user_id` dtype (defaulting to `'int'`).
    """
    user_id_dtype = self.get_users_pipe().dtypes.get('user_id', 'int')
    plugin_columns = {
        'primary': 'plugin_id',
        'user_id': 'user_id',
    }
    plugin_dtypes = {
        'plugin_name': 'string',
        'user_id': user_id_dtype,
        'attributes': 'json',
        'version': 'string',
    }
    plugin_indices = {
        'unique': 'plugin_name',
    }
    return mrsm.Pipe(
        'mrsm', 'plugins',
        instance=self,
        temporary=True,
        static=True,
        null_indices=False,
        columns=plugin_columns,
        dtypes=plugin_dtypes,
        indices=plugin_indices,
    )
Return the internal metadata plugins pipe.
def register_plugin(
    self,
    plugin: 'mrsm.core.Plugin',
    force: bool = False,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Register a new plugin to the plugins table.

    If the plugin already has a row, that row is updated instead. Unless `force` is set,
    the update is refused when both versions are known and the new version is not strictly
    greater than the stored one.

    Parameters
    ----------
    plugin: mrsm.core.Plugin
        The plugin to register; its `name`, `version`, `attributes`, and `user_id` are stored.

    force: bool, default False
        Skip the version comparison.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating success.
    """
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    from meerschaum.utils.sql import json_flavors
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    old_id = self.get_plugin_id(plugin, debug=debug)
    already_registered = old_id is not None

    ### Check for version conflict. May be overridden with `--force`.
    if already_registered and not force:
        stored_version = self.get_plugin_version(plugin, debug=debug)
        incoming_version = plugin.version
        stored_version = '' if stored_version is None else stored_version
        incoming_version = '' if incoming_version is None else incoming_version

        packaging_version = attempt_import('packaging.version')
        both_known = stored_version and incoming_version
        if both_known and (
            packaging_version.parse(stored_version) >= packaging_version.parse(incoming_version)
        ):
            return False, (
                f"Version '{incoming_version}' of plugin '{plugin}' "
                f"must be greater than existing version '{stored_version}'."
            )

    bind_variables = {
        'plugin_name': plugin.name,
        'version': plugin.version,
        'attributes': (
            plugin.attributes
            if self.flavor in json_flavors
            else json.dumps(plugin.attributes)
        ),
        'user_id': plugin.user_id,
    }

    if already_registered:
        query = (
            sqlalchemy.update(plugins_tbl)
            .values(**bind_variables)
            .where(plugins_tbl.c.plugin_id == old_id)
        )
    else:
        query = sqlalchemy.insert(plugins_tbl).values(**bind_variables)

    if self.exec(query, debug=debug) is None:
        return False, f"Failed to register plugin '{plugin}'."
    return True, f"Successfully registered plugin '{plugin}'."
Register a new plugin to the plugins table.
def delete_plugin(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Delete a plugin from the plugins table.

    Succeeds without doing anything if the plugin was never registered.
    """
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']

    plugin_id = self.get_plugin_id(plugin, debug=debug)
    if plugin_id is None:
        return True, f"Plugin '{plugin}' was not registered."

    delete_query = sqlalchemy.delete(plugins_tbl).where(plugins_tbl.c.plugin_id == plugin_id)
    deleted = self.exec(delete_query, debug=debug) is not None
    if deleted:
        return True, f"Successfully deleted plugin '{plugin}'."
    return False, f"Failed to delete plugin '{plugin}'."
Delete a plugin from the plugins table.
def get_plugin_id(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Optional[int]:
    """
    Look up the `plugin_id` of a registered plugin by its name.

    Returns
    -------
    The ID as an `int`, or `None` if no row matches or the value can't be converted.
    """
    ### Ensure the plugins table exists.
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)

    name_matches = plugins_tbl.c.plugin_name == plugin.name
    id_query = sqlalchemy.select(plugins_tbl.c.plugin_id).where(name_matches)

    try:
        raw_id = self.value(id_query, debug=debug)
        return int(raw_id)
    except Exception:
        return None
Return a plugin's ID.
def get_plugin_version(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Optional[str]:
    """
    Fetch the version stored for a registered plugin.

    Returns
    -------
    Whatever `self.value()` returns for the plugin's `version` column
    (presumably `None` when the plugin isn't registered).
    """
    ### Ensure the plugins table exists.
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)

    name_matches = plugins_tbl.c.plugin_name == plugin.name
    version_query = sqlalchemy.select(plugins_tbl.c.version).where(name_matches)
    return self.value(version_query, debug=debug)
Return a plugin's version.
def get_plugins(
    self,
    user_id: Optional[int] = None,
    search_term: Optional[str] = None,
    debug: bool = False,
    **kw: Any
) -> List[str]:
    """
    Return a list of all registered plugins.

    Parameters
    ----------
    user_id: Optional[int], default None
        If specified, filter plugins by a specific `user_id`.

    search_term: Optional[str], default None
        If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins.

    debug: bool, default False
        Verbosity toggle (forwarded to the query).

    Returns
    -------
    A list of plugin names (empty if the query fails).
    """
    ### ensure plugins table exists
    from meerschaum.connectors.sql.tables import get_tables
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)

    query = sqlalchemy.select(plugins_tbl.c.plugin_name)
    if user_id is not None:
        query = query.where(plugins_tbl.c.user_id == user_id)
    if search_term is not None:
        query = query.where(plugins_tbl.c.plugin_name.like(search_term + '%'))

    if self.flavor == 'duckdb':
        df = self.read(query, debug=debug)
        ### `read()` may return `None` on failure; don't call `.to_dict()` on it.
        if df is None:
            return []
        return [row['plugin_name'] for row in df.to_dict(orient='records')]

    result = self.execute(query, debug=debug)
    ### `execute()` returns `None` on failure; don't call `.fetchall()` on it.
    if result is None:
        return []
    return [row[0] for row in result.fetchall()]
Return a list of all registered plugins.
Parameters
- user_id (Optional[int], default None): If specified, filter plugins by a specific `user_id`.
- search_term (Optional[str], default None): If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins.
Returns
- A list of plugin names.
def get_plugin_user_id(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Optional[int]:
    """
    Look up the `user_id` of a registered plugin's owner.

    Returns
    -------
    The owner's ID as an `int`, or `None` if no row matches or the value can't be converted.
    """
    ### Ensure the plugins table exists.
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)

    name_matches = plugins_tbl.c.plugin_name == plugin.name
    owner_query = sqlalchemy.select(plugins_tbl.c.user_id).where(name_matches)

    try:
        raw_user_id = self.value(owner_query, debug=debug)
        return int(raw_user_id)
    except Exception:
        return None
Return a plugin's user ID.
def get_plugin_username(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Optional[str]:
    """
    Return the username of a plugin's owner.

    Joins the users and plugins tables on `user_id` and filters by the plugin's name.

    Returns
    -------
    The owner's username as returned by `self.value()` (presumably `None` if no match).
    """
    ### ensure plugins table exists
    from meerschaum.connectors.sql.tables import get_tables
    tables = get_tables(mrsm_instance=self, debug=debug)
    plugins_tbl = tables['plugins']
    users = tables['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)

    ### NOTE: Python's `and` does not build a SQL `AND` between SQLAlchemy expressions.
    ### It evaluates the first comparison's truthiness and returns one operand, which
    ### silently dropped the plugin-name filter. Use `sqlalchemy.and_()` instead.
    query = (
        sqlalchemy.select(users.c.username)
        .where(
            sqlalchemy.and_(
                users.c.user_id == plugins_tbl.c.user_id,
                plugins_tbl.c.plugin_name == plugin.name,
            )
        )
    )

    return self.value(query, debug=debug)
Return the username of a plugin's owner.
def get_plugin_attributes(
    self,
    plugin: 'mrsm.core.Plugin',
    debug: bool = False
) -> Dict[str, Any]:
    """
    Fetch the attributes stored for a registered plugin.

    Values stored as JSON strings are decoded. A missing value yields an empty dictionary,
    and any other value is returned as-is.
    """
    ### Ensure the plugins table exists.
    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)

    name_matches = plugins_tbl.c.plugin_name == plugin.name
    attributes_query = sqlalchemy.select(plugins_tbl.c.attributes).where(name_matches)

    raw_attributes = self.value(attributes_query, debug=debug)
    if raw_attributes is None:
        return {}
    if isinstance(raw_attributes, str):
        return json.loads(raw_attributes)
    return raw_attributes
Return the attributes of a plugin.
def get_users_pipe(self) -> mrsm.Pipe:
    """
    Build (once) and return the internal metadata pipe used for users management.

    The pipe is `mrsm` / `users`, keyed on the `user_id` primary column with a
    unique index on `username`. It is memoized on the instance as `_users_pipe`,
    so repeated calls return the same object.
    """
    if '_users_pipe' not in self.__dict__:
        users_columns = {
            'primary': 'user_id',
        }
        users_dtypes = {
            'user_id': 'int',
            'username': 'string',
            'attributes': 'json',
            'user_type': 'string',
        }
        users_indices = {
            'unique': 'username',
        }
        self._users_pipe = mrsm.Pipe(
            'mrsm', 'users',
            temporary=True,
            cache=True,
            cache_connector_keys=self.__dict__.get('_cache_connector', None),
            static=True,
            null_indices=False,
            enforce=False,
            autoincrement=True,
            columns=users_columns,
            dtypes=users_dtypes,
            indices=users_indices,
        )
    return self._users_pipe
Return the internal metadata pipe for users management.
def register_user(
    self,
    user: mrsm.core.User,
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Register a new user.

    Parameters
    ----------
    user: mrsm.core.User
        The user to insert. Its `username`, `email`, `password_hash`, `type`,
        and `attributes` are written to the users table.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and a message. Registration fails if the
    username is invalid, the user already exists, or the insert fails.
    """
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)

    valid_tuple = valid_username(user.username)
    if not valid_tuple[0]:
        return valid_tuple

    ### Refuse to register a username that already has a user_id.
    if self.get_user_id(user, debug=debug) is not None:
        return False, f"User '{user}' already exists."

    ### ensure users table exists
    from meerschaum.connectors.sql.tables import get_tables
    tables = get_tables(mrsm_instance=self, debug=debug)

    import json
    bind_variables = {
        'username': user.username,
        'email': user.email,
        'password_hash': user.password_hash,
        'user_type': user.type,
        ### Flavors listed in `json_flavors` receive the dict as-is;
        ### all others receive a JSON-encoded string.
        'attributes': (
            json.dumps(user.attributes)
            if self.flavor not in json_flavors
            else user.attributes
        ),
    }
    query = sqlalchemy.insert(tables['users']).values(**bind_variables)

    result = self.exec(query, debug=debug)
    if result is None:
        return False, f"Failed to register user '{user}'."
    return True, f"Successfully registered user '{user}'."
Register a new user.
def get_user_id(
    self,
    user: 'mrsm.core.User',
    debug: bool = False
) -> Optional[int]:
    """
    Look up a user's `user_id` by username.

    Parameters
    ----------
    user: mrsm.core.User
        The user to look up (matched on `user.username`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The `user_id` as an `int` if the user is registered, otherwise `None`.
    """
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    ### Fetching the tables also ensures the users table exists.
    from meerschaum.connectors.sql.tables import get_tables
    users_table = get_tables(mrsm_instance=self, debug=debug)['users']

    id_query = sqlalchemy.select(users_table.c.user_id).where(
        users_table.c.username == user.username
    )
    raw_id = self.value(id_query, debug=debug)
    return None if raw_id is None else int(raw_id)
If a user is registered, return the user_id.
def get_users(
    self,
    debug: bool = False,
    **kw: Any
) -> List[str]:
    """
    Get the registered usernames.

    Parameters
    ----------
    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of usernames (empty if the read produced no result).
    """
    ### ensure users table exists
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)

    query = sqlalchemy.select(users_tbl.c.username)

    df = self.read(query, debug=debug)
    ### NOTE(review): `self.read` may return `None` (e.g. on a failed query);
    ### subscripting that would raise an opaque `TypeError`, so return an empty list.
    if df is None:
        return []
    return list(df['username'])
Get the registered usernames.
def edit_user(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> SuccessTuple:
    """
    Update the stored metadata of an already-registered user.

    Parameters
    ----------
    user: mrsm.core.User
        The user to update. `username` is always written. `password_hash`,
        `email`, `attributes`, and `user_type` are written only when the
        corresponding field is not blank. If `user.user_id` is unset, it is
        looked up and assigned back onto `user`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and a message.
    """
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.sql import json_flavors
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    target_id = user.user_id
    if target_id is None:
        target_id = self.get_user_id(user, debug=debug)
    if target_id is None:
        return False, (
            f"User '{user.username}' does not exist. "
            f"Register user '{user.username}' before editing."
        )
    ### Store the resolved ID back on the user object.
    user.user_id = target_id

    import json
    username_check = valid_username(user.username)
    if not username_check[0]:
        return username_check

    new_values = {
        'user_id': target_id,
        'username': user.username,
    }
    ### Blank fields mean "leave unchanged" and are omitted from the UPDATE.
    if user.password != '':
        new_values['password_hash'] = user.password_hash
    if user.email != '':
        new_values['email'] = user.email
    if user.attributes is not None and user.attributes != {}:
        new_values['attributes'] = (
            user.attributes if self.flavor in json_flavors
            else json.dumps(user.attributes)
        )
    if user.type != '':
        new_values['user_type'] = user.type

    update_query = (
        sqlalchemy.update(users_tbl)
        .where(users_tbl.c.user_id == target_id)
        .values(**new_values)
    )

    if self.exec(update_query, debug=debug) is None:
        return False, f"Failed to edit user '{user}'."
    return True, f"Successfully edited user '{user}'."
Update an existing user's metadata.
def delete_user(
    self,
    user: 'mrsm.core.User',
    debug: bool = False
) -> SuccessTuple:
    """
    Remove a user's row from the users table, then remove the plugins owned by that user.

    Parameters
    ----------
    user: mrsm.core.User
        The user to delete. If `user.user_id` is unset, it is looked up by username.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` of success and a message. The function stops at the
    first DELETE whose `exec` result is `None`.
    """
    ### Fetching the tables also ensures they exist.
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)

    user_id = (
        self.get_user_id(user, debug=debug)
        if user.user_id is None
        else user.user_id
    )
    if user_id is None:
        return False, f"User '{user.username}' is not registered and cannot be deleted."

    ### Delete the user row first, then the user's plugins.
    for table, failure_msg in (
        (users_tbl, f"Failed to delete user '{user}'."),
        (plugins_tbl, f"Failed to delete plugins of user '{user}'."),
    ):
        delete_query = sqlalchemy.delete(table).where(table.c.user_id == user_id)
        if self.exec(delete_query, debug=debug) is None:
            return False, failure_msg

    return True, f"Successfully deleted user '{user}'"
Delete a user's record from the users table.
def get_user_password_hash(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """
    Return the password hash for a user.

    **NOTE**: This may be dangerous and is only allowed if the security settings
    explicitly allow it. This function itself performs no permission check;
    callers are responsible for enforcing that policy.

    Parameters
    ----------
    user: mrsm.core.User
        The user whose hash to fetch. If `user.user_id` is unset,
        it is looked up by username.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The stored password hash, or `None` if the user cannot be resolved to a `user_id`.
    """
    from meerschaum.utils.debug import dprint
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)

    if user.user_id is not None:
        user_id = user.user_id
        if debug:
            dprint(f"Already given user_id: {user_id}")
    else:
        if debug:
            dprint("Fetching user_id...")
        user_id = self.get_user_id(user, debug=debug)

    if user_id is None:
        return None

    query = sqlalchemy.select(users_tbl.c.password_hash).where(users_tbl.c.user_id == user_id)

    return self.value(query, debug=debug)
Return the password hash for a user. NOTE: This may be dangerous and is only allowed if the security settings explicitly allow it.
def get_user_type(
    self,
    user: 'mrsm.core.User',
    debug: bool = False,
    **kw: Any
) -> Optional[str]:
    """
    Fetch the `user_type` stored for a user.

    Parameters
    ----------
    user: mrsm.core.User
        The user to look up. If `user.user_id` is unset, it is looked up by username.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The stored user type, or `None` if the user cannot be resolved to a `user_id`.
    """
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)

    user_id = user.user_id
    if user_id is None:
        user_id = self.get_user_id(user, debug=debug)
    if user_id is None:
        return None

    type_query = (
        sqlalchemy
        .select(users_tbl.c.user_type)
        .where(users_tbl.c.user_id == user_id)
    )
    return self.value(type_query, debug=debug)
Return the user's type.
def get_user_attributes(
    self,
    user: 'mrsm.core.User',
    debug: bool = False
) -> Union[Dict[str, Any], None]:
    """
    Return the user's attributes.

    Parameters
    ----------
    user: mrsm.core.User
        The user to look up. If `user.user_id` is unset, it is looked up by username.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The attributes dictionary, or `None` if the user cannot be resolved or no
    value is stored. A value that is not already a `dict` is coerced with
    `dict(...)`, then with `json.loads(...)`. If both fail, a warning is issued
    and the raw value is returned unchanged.
    """
    import json
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    ### ensure users table exists
    from meerschaum.connectors.sql.tables import get_tables
    users_tbl = get_tables(mrsm_instance=self, debug=debug)['users']

    user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug)
    ### Consistent with `get_user_type`: an unresolvable user has no attributes,
    ### so skip querying `WHERE user_id IS NULL`.
    if user_id is None:
        return None

    query = (
        sqlalchemy.select(users_tbl.c.attributes)
        .where(users_tbl.c.user_id == user_id)
    )

    result = self.value(query, debug=debug)
    if result is None or isinstance(result, dict):
        return result

    ### e.g. a mapping-like row object.
    try:
        return dict(result)
    except (TypeError, ValueError):
        pass

    ### e.g. a JSON-encoded string (flavors without native JSON columns).
    try:
        return json.loads(result)
    except (TypeError, ValueError):
        pass

    warn(f"Received unexpected type for attributes: {result}")
    return result
Return the user's attributes.
@classmethod
def from_uri(
    cls,
    uri: str,
    label: Optional[str] = None,
    as_dict: bool = False,
) -> Union[
    'meerschaum.connectors.SQLConnector',
    Dict[str, Union[str, int]],
]:
    """
    Build a SQLConnector (or its keyword arguments) from a URI string.

    Parameters
    ----------
    uri: str
        The URI connection string.

    label: Optional[str], default None
        Connector label to use. If falsy, one is derived from the URI.
        File-based flavors (`sqlite`, `duckdb`, `geopackage`) use the lowercased
        file name, or `memory_<flavor>` for `:memory:`. Other flavors use
        `username@host/database`, lowercased.

    as_dict: bool, default False
        If `True`, return the dictionary of keyword arguments
        instead of constructing a new `SQLConnector`.

    Returns
    -------
    A new SQLConnector object or a dictionary of attributes (if `as_dict` is `True`).
    """
    params = cls.parse_uri(uri)
    params['uri'] = uri

    flavor = params.get('flavor', None)
    if not flavor or flavor not in cls.flavor_configs:
        error(f"Invalid flavor '{flavor}' detected from the provided URI.")
    if 'database' not in params:
        error("Unable to determine the database from the provided URI.")

    if label:
        params['label'] = label
    elif flavor in ('sqlite', 'duckdb', 'geopackage'):
        database = params['database']
        params['label'] = (
            f'memory_{flavor}'
            if database == ':memory:'
            else database.split(os.path.sep)[-1].lower()
        )
    else:
        label_pieces = [
            params['username'] + '@' if 'username' in params else '',
            params.get('host', ''),
            '/' if 'host' in params else '',
            params.get('database', ''),
        ]
        params['label'] = ''.join(label_pieces).lower()

    if as_dict:
        return params
    return cls(**params)
Create a new SQLConnector from a URI string.
Parameters
- uri (str): The URI connection string.
- label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the determined database name.
- as_dict (bool, default False): If `True`, return a dictionary of the keyword arguments necessary to create a new `SQLConnector`, otherwise create a new object.
Returns
- A new SQLConnector object or a dictionary of attributes (if `as_dict` is `True`).
def _get_search_path_from_options(options: str) -> Optional[str]:
    """
    Return the value of a `--search_path=<value>` token in an `options` string, or `None`.

    Tokens are whitespace-separated. Other tokens (e.g. `-c statement_timeout=5000`)
    are ignored rather than leaking into the returned schema.
    """
    prefix = '--search_path='
    for token in (options or '').split():
        if token.startswith(prefix):
            return token[len(prefix):]
    return None


@staticmethod
def parse_uri(uri: str) -> Dict[str, Any]:
    """
    Parse a URI string into a dictionary of parameters.

    Parameters
    ----------
    uri: str
        The database connection URI.

    Returns
    -------
    A dictionary of attributes. Query-string parameters are merged in using the
    first value of each key. If an `options` parameter contains a
    `--search_path=<schema>` token, `<schema>` is also stored under `schema`.

    Examples
    --------
    >>> parse_uri('sqlite:////home/foo/bar.db')
    {'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
    >>> parse_uri(
    ...     'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
    ...     + '/master?driver=ODBC+Driver+17+for+SQL+Server'
    ... )
    {'host': 'localhost', 'database': 'master', 'username': 'sa',
    'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
    'driver': 'ODBC Driver 17 for SQL Server'}
    >>>
    """
    from urllib.parse import parse_qs, urlparse
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    parser = sqlalchemy.engine.url.make_url
    params = parser(uri).translate_connect_args()
    ### The flavor is the scheme without any `+driver` suffix.
    params['flavor'] = uri.split(':')[0].split('+')[0]
    if params['flavor'] == 'postgres':
        params['flavor'] = 'postgresql'
    if '?' in uri:
        parsed_uri = urlparse(uri)
        for key, values in parse_qs(parsed_uri.query).items():
            params[key] = values[0]

    schema = _get_search_path_from_options(params.get('options', ''))
    if schema is not None:
        params['schema'] = schema
    return params
Parse a URI string into a dictionary of parameters.
Parameters
- uri (str): The database connection URI.
Returns
- A dictionary of attributes.
Examples
>>> parse_uri('sqlite:////home/foo/bar.db')
{'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
>>> parse_uri(
... 'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
... + '/master?driver=ODBC+Driver+17+for+SQL+Server'
... )
{'host': 'localhost', 'database': 'master', 'username': 'sa',
'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
'driver': 'ODBC Driver 17 for SQL Server'}
>>>