meerschaum.connectors.sql
Subpackage for the `SQLConnector` subclass.
```python
class SQLConnector(Connector):
    """
    Connect to SQL databases via `sqlalchemy`.

    SQLConnectors may be used as Meerschaum instance connectors.
    Read more about connectors and instances at
    https://meerschaum.io/reference/connectors/
    """

    IS_INSTANCE: bool = True

    from ._create_engine import flavor_configs, create_engine
    from ._sql import (
        read,
        value,
        exec,
        execute,
        to_sql,
        exec_queries,
        get_connection,
        _cleanup_connections,
    )
    from meerschaum.utils.sql import test_connection
    from ._fetch import fetch, get_pipe_metadef
    from ._cli import cli, _cli_exit
    from ._pipes import (
        fetch_pipes_keys,
        create_indices,
        drop_indices,
        get_create_index_queries,
        get_drop_index_queries,
        get_add_columns_queries,
        get_alter_columns_queries,
        delete_pipe,
        get_pipe_data,
        get_pipe_data_query,
        register_pipe,
        edit_pipe,
        get_pipe_id,
        get_pipe_attributes,
        sync_pipe,
        sync_pipe_inplace,
        get_sync_time,
        pipe_exists,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        deduplicate_pipe,
        get_pipe_table,
        get_pipe_columns_types,
        get_to_sql_dtype,
        get_pipe_schema,
        create_pipe_table_from_df,
        get_pipe_columns_indices,
        get_temporary_target,
        create_pipe_indices,
        drop_pipe_indices,
        get_pipe_index_names,
    )
    from ._plugins import (
        register_plugin,
        delete_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugins,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
    )
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri, parse_uri
    from ._instance import (
        _log_temporary_tables_creation,
        _drop_temporary_table,
        _drop_temporary_tables,
        _drop_old_temporary_tables,
    )

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __call__(self):
        return self
```
Connect to SQL databases via `sqlalchemy`.

SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/
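As a quick orientation, here is a minimal sketch of constructing and inspecting a connector; the label and database path are hypothetical, and any keyword arguments simply become connector attributes.

```python
import meerschaum as mrsm
from meerschaum.connectors.sql import SQLConnector

# Keyword arguments become the connector's attributes;
# the SQLite flavor needs only a database path (hypothetical here).
conn = SQLConnector('example', flavor='sqlite', database='/tmp/example.db')
print(conn)      # sql:example
print(conn.URI)  # sqlite:////tmp/example.db

# Connectors registered in the Meerschaum configuration
# may instead be fetched by their keys:
main_conn = mrsm.get_connector('sql:main')
```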
```python
def __init__(
    self,
    label: Optional[str] = None,
    flavor: Optional[str] = None,
    wait: bool = False,
    connect: bool = False,
    debug: bool = False,
    **kw: Any
):
    """
    Parameters
    ----------
    label: str, default 'main'
        The identifying label for the connector.
        E.g. for `sql:main`, 'main' is the label.
        Defaults to 'main'.

    flavor: Optional[str], default None
        The database flavor, e.g.
        `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
        To see supported flavors, run the `bootstrap connectors` command.

    wait: bool, default False
        If `True`, block until a database connection has been made.
        Defaults to `False`.

    connect: bool, default False
        If `True`, immediately attempt to connect to the database and raise
        a warning if the connection fails.
        Defaults to `False`.

    debug: bool, default False
        Verbosity toggle.
        Defaults to `False`.

    kw: Any
        All other arguments will be passed to the connector's attributes.
        Therefore, a connector may be made without being registered,
        as long as enough parameters are supplied to the constructor.
    """
    if 'uri' in kw:
        uri = kw['uri']
        if uri.startswith('postgres') and not uri.startswith('postgresql'):
            uri = uri.replace('postgres', 'postgresql', 1)
        if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
            uri = uri.replace('postgresql://', 'postgresql+psycopg://', 1)
        if uri.startswith('timescaledb://'):
            uri = uri.replace('timescaledb://', 'postgresql+psycopg://', 1)
            flavor = 'timescaledb'
        if uri.startswith('postgis://'):
            uri = uri.replace('postgis://', 'postgresql+psycopg://', 1)
            flavor = 'postgis'
        kw['uri'] = uri
        from_uri_params = self.from_uri(kw['uri'], as_dict=True)
        label = label or from_uri_params.get('label', None)
        _ = from_uri_params.pop('label', None)

        ### Sometimes the flavor may be provided with a URI.
        kw.update(from_uri_params)
        if flavor:
            kw['flavor'] = flavor

    ### Set __dict__ in the base class.
    super().__init__(
        'sql',
        label=label or self.__dict__.get('label', None),
        **kw
    )

    if self.__dict__.get('flavor', None) == 'sqlite':
        self._reset_attributes()
        self._set_attributes(
            'sql',
            label=label,
            inherit_default=False,
            **kw
        )
        ### For backwards compatibility reasons, set the path for sql:local if it's missing.
        if self.label == 'local' and not self.__dict__.get('database', None):
            from meerschaum.config._paths import SQLITE_DB_PATH
            self.database = str(SQLITE_DB_PATH)

    ### Ensure flavor and label are set accordingly.
    if 'flavor' not in self.__dict__:
        if flavor is None and 'uri' not in self.__dict__:
            raise ValueError(
                f"Missing flavor. Provide flavor as a key for '{self}'."
            )
        self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

    if self.flavor == 'postgres':
        self.flavor = 'postgresql'

    self._debug = debug
    ### Store the PID and thread at initialization
    ### so we can dispose of the Pool in child processes or threads.
    import os
    import threading
    self._pid = os.getpid()
    self._thread_ident = threading.current_thread().ident
    self._sessions = {}
    self._locks = {'_sessions': threading.RLock(), }

    ### Verify the flavor's requirements are met.
    if self.flavor not in self.flavor_configs:
        error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
    if not self.__dict__.get('uri'):
        self.verify_attributes(
            self.flavor_configs[self.flavor].get('requirements', set()),
            debug=debug,
        )

    if wait:
        from meerschaum.connectors.poll import retry_connect
        retry_connect(connector=self, debug=debug)

    if connect:
        if not self.test_connection(debug=debug):
            warn(f"Failed to connect with connector '{self}'!", stack=False)
```
Parameters
- label (str, default 'main'): The identifying label for the connector. E.g. for `sql:main`, 'main' is the label.
- flavor (Optional[str], default None): The database flavor, e.g. `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc. To see supported flavors, run the `bootstrap connectors` command.
- wait (bool, default False): If `True`, block until a database connection has been made.
- connect (bool, default False): If `True`, immediately attempt to connect to the database and raise a warning if the connection fails.
- debug (bool, default False): Verbosity toggle.
- kw (Any): All other arguments will be passed to the connector's attributes. Therefore, a connector may be made without being registered, as long as enough parameters are supplied to the constructor.
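The URI handling above can be exercised directly. A short sketch, assuming a reachable PostgreSQL server with these hypothetical credentials:

```python
from meerschaum.connectors.sql import SQLConnector

# The flavor, credentials, host, port, and database are parsed from the URI;
# 'postgres://' and 'timescaledb://' schemes are normalized to 'postgresql+psycopg://'.
conn = SQLConnector(
    label='analytics',
    uri='postgres://user:pass@localhost:5432/db',  # hypothetical credentials
)
print(conn.flavor)  # 'postgresql'
print(conn.host)    # 'localhost'
```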
```python
@property
def Session(self):
    if '_Session' not in self.__dict__:
        if self.engine is None:
            return None

        from meerschaum.utils.packages import attempt_import
        sqlalchemy_orm = attempt_import('sqlalchemy.orm', lazy=False)
        session_factory = sqlalchemy_orm.sessionmaker(self.engine)
        self._Session = sqlalchemy_orm.scoped_session(session_factory)

    return self._Session
```
```python
@property
def engine(self):
    """
    Return the SQLAlchemy engine connected to the configured database.
    """
    import os
    import threading
    if '_engine' not in self.__dict__:
        self._engine, self._engine_str = self.create_engine(include_uri=True)

    same_process = os.getpid() == self._pid
    same_thread = threading.current_thread().ident == self._thread_ident

    ### Handle child processes.
    if not same_process:
        self._pid = os.getpid()
        self._thread = threading.current_thread()
        warn("Different PID detected. Disposing of connections...")
        self._engine.dispose()

    ### Handle different threads.
    if not same_thread:
        if self.flavor == 'duckdb':
            warn("Different thread detected.")
            self._engine.dispose()

    return self._engine
```
Return the SQLAlchemy engine connected to the configured database.
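Because this is a standard SQLAlchemy engine, it can be used directly when lower-level control is needed. A minimal sketch, reusing the `conn` connector from the earlier examples:

```python
import sqlalchemy

# The engine is created lazily on first access and cached; accessing it
# from a new process disposes of stale pooled connections first.
with conn.engine.connect() as connection:
    row = connection.execute(sqlalchemy.text("SELECT 1 AS answer")).fetchone()
print(row.answer)  # 1
```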
```python
@property
def DATABASE_URL(self) -> str:
    """
    Return the URI connection string (an alias for `SQLConnector.URI`).
    """
    _ = self.engine
    return str(self._engine_str)
```
Return the URI connection string (an alias for `SQLConnector.URI`).
```python
@property
def URI(self) -> str:
    """
    Return the URI connection string.
    """
    _ = self.engine
    return str(self._engine_str)
```
Return the URI connection string.
```python
@property
def IS_THREAD_SAFE(self) -> bool:
    """
    Return whether this connector may be multithreaded.
    """
    if self.flavor in ('duckdb', 'oracle'):
        return False
    if self.flavor == 'sqlite':
        return ':memory:' not in self.URI
    return True
```
Return whether this connector may be multithreaded.
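This flag is useful for deciding whether to fan work out across threads. A sketch, with a hypothetical workload of queries:

```python
from concurrent.futures import ThreadPoolExecutor

queries = [f"SELECT {i}" for i in range(4)]  # hypothetical workload

if conn.IS_THREAD_SAFE:
    # Engines for flavors like PostgreSQL may be shared across threads.
    with ThreadPoolExecutor(max_workers=4) as pool:
        values = list(pool.map(conn.value, queries))
else:
    # DuckDB, Oracle, and in-memory SQLite fall back to serial execution.
    values = [conn.value(query) for query in queries]
```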
```python
@property
def metadata(self):
    """
    Return the metadata bound to this configured schema.
    """
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    if '_metadata' not in self.__dict__:
        self._metadata = sqlalchemy.MetaData(schema=self.schema)
    return self._metadata
```
Return the metadata bound to this configured schema.
```python
@property
def instance_schema(self):
    """
    Return the schema name for Meerschaum tables.
    """
    return self.schema
```
Return the schema name for Meerschaum tables.
```python
@property
def internal_schema(self):
    """
    Return the schema name for internal tables.
    """
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
    schema_name = self.__dict__.get('internal_schema', None) or (
        STATIC_CONFIG['sql']['internal_schema']
        if self.flavor not in NO_SCHEMA_FLAVORS
        else self.schema
    )

    if '_internal_schema' not in self.__dict__:
        self._internal_schema = schema_name
    return self._internal_schema
```
Return the schema name for internal tables.
```python
@property
def db(self) -> Optional[databases.Database]:
    from meerschaum.utils.packages import attempt_import
    databases = attempt_import('databases', lazy=False, install=True)
    url = self.DATABASE_URL
    if 'mysql' in url:
        url = url.replace('+pymysql', '')
    if '_db' not in self.__dict__:
        try:
            self._db = databases.Database(url)
        except KeyError:
            ### Likely encountered an unsupported flavor.
            from meerschaum.utils.warnings import warn
            self._db = None
    return self._db
```
```python
@property
def db_version(self) -> Union[str, None]:
    """
    Return the database version.
    """
    _db_version = self.__dict__.get('_db_version', None)
    if _db_version is not None:
        return _db_version

    from meerschaum.utils.sql import get_db_version
    self._db_version = get_db_version(self)
    return self._db_version
```
Return the database version.
```python
@property
def schema(self) -> Union[str, None]:
    """
    Return the default schema to use.
    A value of `None` will not prepend a schema.
    """
    if 'schema' in self.__dict__:
        return self.__dict__['schema']

    from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
    if self.flavor in NO_SCHEMA_FLAVORS:
        self.__dict__['schema'] = None
        return None

    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
    _schema = sqlalchemy.inspect(self.engine).default_schema_name
    self.__dict__['schema'] = _schema
    return _schema
```
Return the default schema to use. A value of `None` will not prepend a schema.
```python
def create_engine(
    self,
    include_uri: bool = False,
    debug: bool = False,
    **kw
) -> 'sqlalchemy.engine.Engine':
    """Create a sqlalchemy engine by building the engine string."""
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import error, warn
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    import urllib
    import copy
    ### Install and patch required drivers.
    if self.flavor in install_flavor_drivers:
        _ = attempt_import(
            *install_flavor_drivers[self.flavor],
            debug=debug,
            lazy=False,
            warn=False,
        )
        if self.flavor == 'mssql':
            _init_mssql_sqlalchemy()
    if self.flavor in require_patching_flavors:
        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
        import pathlib
        for install_name, import_name in require_patching_flavors[self.flavor]:
            pkg = attempt_import(
                import_name,
                debug=debug,
                lazy=False,
                warn=False
            )
            _monkey_patch_get_distribution(
                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
            )

    ### Supplement missing values with defaults (e.g. port number).
    for a, value in flavor_configs[self.flavor]['defaults'].items():
        if a not in self.__dict__:
            self.__dict__[a] = value

    ### Verify that everything is in order.
    if self.flavor not in flavor_configs:
        error(f"Cannot create a connector with the flavor '{self.flavor}'.")

    _engine = flavor_configs[self.flavor].get('engine', None)
    _username = self.__dict__.get('username', None)
    _password = self.__dict__.get('password', None)
    _host = self.__dict__.get('host', None)
    _port = self.__dict__.get('port', None)
    _database = self.__dict__.get('database', None)
    _options = self.__dict__.get('options', {})
    if isinstance(_options, str):
        _options = dict(urllib.parse.parse_qsl(_options))
    _uri = self.__dict__.get('uri', None)

    ### Handle registering specific dialects (due to installing in virtual environments).
    if self.flavor in flavor_dialects:
        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])

    ### self._sys_config was deepcopied and can be updated safely.
    if self.flavor in ("sqlite", "duckdb"):
        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
        if 'create_engine' not in self._sys_config:
            self._sys_config['create_engine'] = {}
        if 'connect_args' not in self._sys_config['create_engine']:
            self._sys_config['create_engine']['connect_args'] = {}
        self._sys_config['create_engine']['connect_args'].update({"check_same_thread": False})
    else:
        engine_str = (
            _engine + "://" + (_username if _username is not None else '') +
            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
            (("/" + _database) if _database is not None else '')
            + (("?" + urllib.parse.urlencode(_options)) if _options else '')
        ) if not _uri else _uri

    ### Sometimes the timescaledb:// flavor can slip in.
    if _uri and self.flavor in _uri:
        if self.flavor in ('timescaledb', 'postgis'):
            engine_str = engine_str.replace(self.flavor, 'postgresql', 1)
        elif _uri.startswith('postgresql://'):
            engine_str = engine_str.replace('postgresql://', 'postgresql+psycopg2://')

    if debug:
        dprint(
            (
                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
                if _password is not None else engine_str
            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
        )

    _kw_copy = copy.deepcopy(kw)

    ### NOTE: Order of inheritance:
    ###   1. Defaults
    ###   2. System configuration
    ###   3. Connector configuration
    ###   4. Keyword arguments
    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
    def _apply_create_engine_args(update):
        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
            _create_engine_args.update(
                { k: v for k, v in update.items()
                    if 'omit_create_engine' not in flavor_configs[self.flavor]
                    or k not in flavor_configs[self.flavor].get('omit_create_engine')
                }
            )
    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
    _apply_create_engine_args(_kw_copy)

    try:
        engine = sqlalchemy.create_engine(
            engine_str,
            ### NOTE: This dynamically parses the configured poolclass string,
            ### splitting the class name (e.g. QueuePool)
            ### from the module name (e.g. sqlalchemy.pool).
            poolclass=getattr(
                attempt_import(
                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
                ),
                self._sys_config['poolclass'].split('.')[-1]
            ),
            echo=debug,
            **_create_engine_args
        )
    except Exception:
        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
        engine = None

    if include_uri:
        return engine, engine_str
    return engine
```
Create a sqlalchemy engine by building the engine string.
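`create_engine()` is normally invoked lazily through the `engine` property, but it may be called directly, e.g. to inspect the connection string it builds. A sketch, reusing the earlier `conn`:

```python
# With include_uri=True, the constructed engine string is returned as well.
engine, engine_str = conn.create_engine(include_uri=True)
print(engine_str)  # e.g. 'sqlite:////tmp/example.db'
```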
```python
def read(
    self,
    query_or_table: Union[str, sqlalchemy.Query],
    params: Union[Dict[str, Any], List[str], None] = None,
    dtype: Optional[Dict[str, Any]] = None,
    coerce_float: bool = True,
    chunksize: Optional[int] = -1,
    workers: Optional[int] = None,
    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
    as_hook_results: bool = False,
    chunks: Optional[int] = None,
    schema: Optional[str] = None,
    as_chunks: bool = False,
    as_iterator: bool = False,
    as_dask: bool = False,
    index_col: Optional[str] = None,
    silent: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[
    pandas.DataFrame,
    dask.DataFrame,
    List[pandas.DataFrame],
    List[Any],
    None,
]:
    """
    Read a SQL query or table into a pandas dataframe.

    Parameters
    ----------
    query_or_table: Union[str, sqlalchemy.Query]
        The SQL query (sqlalchemy Query or string) or name of the table from which to select.

    params: Union[Dict[str, Any], List[str], None], default None
        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html

    dtype: Optional[Dict[str, Any]], default None
        A dictionary of data types to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html

    chunksize: Optional[int], default -1
        How many chunks to read at a time. `None` will read everything in one large chunk.
        Defaults to the system configuration.

        **NOTE:** DuckDB does not allow for chunking.

    workers: Optional[int], default None
        How many threads to use when consuming the generator.
        Only applies if `chunk_hook` is provided.

    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
        See `--sync-chunks` for an example.
        **NOTE:** `as_iterator` MUST be `False` (default).

    as_hook_results: bool, default False
        If `True`, return a `List` of the outputs of the hook function.
        Only applicable if `chunk_hook` is not None.

        **NOTE:** `as_iterator` MUST be `False` (default).

    chunks: Optional[int], default None
        Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
        return into a single dataframe.
        For example, to limit the returned dataframe to 100,000 rows,
        you could specify a `chunksize` of `1000` and `chunks` of `100`.

    schema: Optional[str], default None
        If just a table name is provided, optionally specify the table schema.
        Defaults to `SQLConnector.schema`.

    as_chunks: bool, default False
        If `True`, return a list of DataFrames.
        Otherwise return a single DataFrame.

    as_iterator: bool, default False
        If `True`, return the pandas DataFrame iterator.
        `chunksize` must not be `None` (falls back to 1000 if so),
        and hooks are not called in this case.

    index_col: Optional[str], default None
        If using Dask, use this column as the index column.
        If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.

    silent: bool, default False
        If `True`, don't raise warnings in case of errors.
        Defaults to `False`.

    Returns
    -------
    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
    or `None` if something breaks.
    """
    if chunks is not None and chunks <= 0:
        return []
    from meerschaum.utils.sql import sql_item_name, truncate_item_name
    from meerschaum.utils.dtypes import are_dtypes_equal, coerce_timezone
    from meerschaum.utils.dtypes.sql import TIMEZONE_NAIVE_FLAVORS
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.pool import get_pool
    from meerschaum.utils.dataframe import chunksize_to_npartitions, get_numeric_cols
    import warnings
    import traceback
    from decimal import Decimal
    pd = import_pandas()
    dd = None
    is_dask = 'dask' in pd.__name__
    pandas = attempt_import('pandas')
    is_dask = dd is not None
    npartitions = chunksize_to_npartitions(chunksize)
    if is_dask:
        chunksize = None
    schema = schema or self.schema
    utc_dt_cols = [
        col
        for col, typ in dtype.items()
        if are_dtypes_equal(typ, 'datetime') and 'utc' in typ.lower()
    ] if dtype else []

    if dtype and utc_dt_cols and self.flavor in TIMEZONE_NAIVE_FLAVORS:
        dtype = dtype.copy()
        for col in utc_dt_cols:
            dtype[col] = 'datetime64[ns]'

    pool = get_pool(workers=workers)
    sqlalchemy = attempt_import("sqlalchemy", lazy=False)
    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is None and as_iterator:
        if not silent and self.flavor not in _disallow_chunks_flavors:
            warn(
                "An iterator may only be generated if chunksize is not None.\n"
                + "Falling back to a chunksize of 1000.", stacklevel=3,
            )
        chunksize = 1000
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel=3,
                )
            chunksize = _max_chunks_flavors[self.flavor]

    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
        chunksize = None

    if debug:
        import time
        start = time.perf_counter()
        dprint(f"[{self}]\n{query_or_table}")
        dprint(f"[{self}] Fetching with chunksize: {chunksize}")

    ### This might be a sqlalchemy object or the string of a table name.
    ### We check for spaces and quotes to see if it might be a weird table.
    if (
        ' ' not in str(query_or_table)
        or (
            ' ' in str(query_or_table)
            and str(query_or_table).startswith('"')
            and str(query_or_table).endswith('"')
        )
    ):
        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
        if truncated_table_name != str(query_or_table) and not silent:
            warn(
                f"Table '{query_or_table}' is too long for '{self.flavor}',"
                + f" will instead read the table '{truncated_table_name}'."
            )

        query_or_table = sql_item_name(str(query_or_table), self.flavor, schema)
        if debug:
            dprint(f"[{self}] Reading from table {query_or_table}")
        formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table))
        str_query = f"SELECT * FROM {query_or_table}"
    else:
        str_query = query_or_table

    formatted_query = (
        sqlalchemy.text(str_query)
        if not is_dask and isinstance(str_query, str)
        else format_sql_query_for_dask(str_query)
    )

    chunk_list = []
    chunk_hook_results = []
    def _process_chunk(_chunk, _retry_on_failure: bool = True):
        if self.flavor in TIMEZONE_NAIVE_FLAVORS:
            for col in utc_dt_cols:
                _chunk[col] = coerce_timezone(_chunk[col], strip_timezone=False)
        if not as_hook_results:
            chunk_list.append(_chunk)
        if chunk_hook is None:
            return None

        result = None
        try:
            result = chunk_hook(
                _chunk,
                workers=workers,
                chunksize=chunksize,
                debug=debug,
                **kw
            )
        except Exception:
            result = False, traceback.format_exc()
            from meerschaum.utils.formatting import get_console
            if not silent:
                get_console().print_exception()

        ### If the chunk fails to process, try it again one more time.
        if isinstance(result, tuple) and result[0] is False:
            if _retry_on_failure:
                return _process_chunk(_chunk, _retry_on_failure=False)

        return result

    try:
        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')

            read_sql_query_kwargs = {
                'params': params,
                'dtype': dtype,
                'coerce_float': coerce_float,
                'index_col': index_col,
            }
            if is_dask:
                if index_col is None:
                    dd = None
                    pd = attempt_import('pandas')
                    read_sql_query_kwargs.update({
                        'chunksize': chunksize,
                    })
            else:
                read_sql_query_kwargs.update({
                    'chunksize': chunksize,
                })

            if is_dask and dd is not None:
                ddf = dd.read_sql_query(
                    formatted_query,
                    self.URI,
                    **read_sql_query_kwargs
                )
            else:

                def get_chunk_generator(connectable):
                    chunk_generator = pd.read_sql_query(
                        formatted_query,
                        self.engine,
                        **read_sql_query_kwargs
                    )
                    to_return = (
                        chunk_generator
                        if as_iterator or chunksize is None
                        else (
                            list(pool.imap(_process_chunk, chunk_generator))
                            if as_hook_results
                            else None
                        )
                    )
                    return chunk_generator, to_return

                if self.flavor in SKIP_READ_TRANSACTION_FLAVORS:
                    chunk_generator, to_return = get_chunk_generator(self.engine)
                else:
                    with self.engine.begin() as transaction:
                        with transaction.execution_options(stream_results=stream_results) as connection:
                            chunk_generator, to_return = get_chunk_generator(connection)

                if to_return is not None:
                    return to_return

    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
        if not silent:
            warn(str(e), stacklevel=3)
        from meerschaum.utils.formatting import get_console
        if not silent:
            get_console().print_exception()

        return None

    if is_dask and dd is not None:
        ddf = ddf.reset_index()
        return ddf

    chunk_list = []
    read_chunks = 0
    chunk_hook_results = []
    if chunksize is None:
        chunk_list.append(chunk_generator)
    elif as_iterator:
        return chunk_generator
    else:
        try:
            for chunk in chunk_generator:
                if chunk_hook is not None:
                    chunk_hook_results.append(
                        chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                    )
                chunk_list.append(chunk)
                read_chunks += 1
                if chunks is not None and read_chunks >= chunks:
                    break
        except Exception as e:
            warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
            from meerschaum.utils.formatting import get_console
            if not silent:
                get_console().print_exception()

            ### Retry the iteration once before giving up.
            read_chunks = 0
            try:
                for chunk in chunk_generator:
                    if chunk_hook is not None:
                        chunk_hook_results.append(
                            chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                        )
                    chunk_list.append(chunk)
                    read_chunks += 1
                    if chunks is not None and read_chunks >= chunks:
                        break
            except Exception as e:
                warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
                from meerschaum.utils.formatting import get_console
                if not silent:
                    get_console().print_exception()

                return None

    ### If no chunks returned, read without chunks
    ### to get columns.
    if len(chunk_list) == 0:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            _ = read_sql_query_kwargs.pop('chunksize', None)
            with self.engine.begin() as connection:
                chunk_list.append(
                    pd.read_sql_query(
                        formatted_query,
                        connection,
                        **read_sql_query_kwargs
                    )
                )

    ### Call the hook on any missed chunks.
    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
        for c in chunk_list[len(chunk_hook_results):]:
            chunk_hook_results.append(
                chunk_hook(c, chunksize=chunksize, debug=debug, **kw)
            )

    if debug:
        end = time.perf_counter()
        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")

    if as_hook_results:
        return chunk_hook_results

    ### Skip `pd.concat()` if `as_chunks` is specified.
    if as_chunks:
        for c in chunk_list:
            c.reset_index(drop=True, inplace=True)
            for col in get_numeric_cols(c):
                c[col] = c[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
        return chunk_list

    df = pd.concat(chunk_list).reset_index(drop=True)
    ### NOTE: The calls to `canonical()` are to drop leading and trailing zeroes.
    for col in get_numeric_cols(df):
        df[col] = df[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)

    return df
```
Read a SQL query or table into a pandas dataframe.

Parameters
- query_or_table (Union[str, sqlalchemy.Query]): The SQL query (sqlalchemy Query or string) or name of the table from which to select.
- params (Union[Dict[str, Any], List[str], None], default None): `List` or `Dict` of parameters to pass to `pandas.read_sql()`. See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
- dtype (Optional[Dict[str, Any]], default None): A dictionary of data types to pass to `pandas.read_sql()`. See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
- chunksize (Optional[int], default -1): How many chunks to read at a time. `None` will read everything in one large chunk. Defaults to the system configuration. **NOTE:** DuckDB does not allow for chunking.
- workers (Optional[int], default None): How many threads to use when consuming the generator. Only applies if `chunk_hook` is provided.
- chunk_hook (Optional[Callable[[pandas.DataFrame], Any]], default None): Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. See `--sync-chunks` for an example. **NOTE:** `as_iterator` MUST be `False` (default).
- as_hook_results (bool, default False): If `True`, return a `List` of the outputs of the hook function. Only applicable if `chunk_hook` is not None. **NOTE:** `as_iterator` MUST be `False` (default).
- chunks (Optional[int], default None): Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and return into a single dataframe. For example, to limit the returned dataframe to 100,000 rows, you could specify a `chunksize` of `1000` and `chunks` of `100`.
- schema (Optional[str], default None): If just a table name is provided, optionally specify the table schema. Defaults to `SQLConnector.schema`.
- as_chunks (bool, default False): If `True`, return a list of DataFrames. Otherwise return a single DataFrame.
- as_iterator (bool, default False): If `True`, return the pandas DataFrame iterator. `chunksize` must not be `None` (falls back to 1000 if so), and hooks are not called in this case.
- index_col (Optional[str], default None): If using Dask, use this column as the index column. If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
- silent (bool, default False): If `True`, don't raise warnings in case of errors.

Returns
- A `pd.DataFrame` (default case), an iterator, a list of dataframes / iterators, or `None` if something breaks.
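A few representative calls, assuming a table named `my_table` (hypothetical) exists on the connector:

```python
# Read an entire table into one DataFrame (chunks are concatenated by default).
df = conn.read('my_table')

# Parameterized query; string queries are wrapped in `sqlalchemy.text()`,
# so named bind parameters use the ':name' style.
df = conn.read(
    "SELECT * FROM my_table WHERE id > :min_id",
    params={'min_id': 100},
)

# Stream in chunks and collect each hook's return value.
row_counts = conn.read(
    'my_table',
    chunksize=1000,
    chunk_hook=lambda chunk, **kw: len(chunk),
    as_hook_results=True,
)
```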
```python
def value(
    self,
    query: str,
    *args: Any,
    use_pandas: bool = False,
    **kw: Any
) -> Any:
    """
    Execute the provided query and return the first value.

    Parameters
    ----------
    query: str
        The SQL query to execute.

    *args: Any
        The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
        if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.

    use_pandas: bool, default False
        If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use
        `meerschaum.connectors.sql.SQLConnector.exec` (default).
        **NOTE:** This is always `True` for DuckDB.

    **kw: Any
        See `args`.

    Returns
    -------
    Any value returned from the query.
    """
    from meerschaum.utils.packages import attempt_import
    if self.flavor == 'duckdb':
        use_pandas = True
    if use_pandas:
        try:
            return self.read(query, *args, **kw).iloc[0, 0]
        except Exception:
            return None

    _close = kw.get('close', True)
    _commit = kw.get('commit', (self.flavor != 'mssql'))

    try:
        result, connection = self.exec(
            query,
            *args,
            with_connection=True,
            close=False,
            commit=_commit,
            **kw
        )
        first = result.first() if result is not None else None
        _val = first[0] if first is not None else None
    except Exception as e:
        warn(e, stacklevel=3)
        return None

    if _close:
        try:
            connection.close()
        except Exception as e:
            warn("Failed to close connection with exception:\n" + str(e))

    return _val
```
Execute the provided query and return the first value.

Parameters
- query (str): The SQL query to execute.
- *args (Any): The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec` if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.
- use_pandas (bool, default False): If `True`, use `meerschaum.connectors.SQLConnector.read`, otherwise use `meerschaum.connectors.sql.SQLConnector.exec` (default). **NOTE:** This is always `True` for DuckDB.
- **kw (Any): See `args`.

Returns
- Any value returned from the query.
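For example, fetching a scalar (the table name is hypothetical):

```python
# Returns the first column of the first row, or None on failure.
row_count = conn.value("SELECT COUNT(*) FROM my_table")
```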
```python
def exec(
    self,
    query: str,
    *args: Any,
    silent: bool = False,
    debug: bool = False,
    commit: Optional[bool] = None,
    close: Optional[bool] = None,
    with_connection: bool = False,
    _connection=None,
    _transaction=None,
    **kw: Any
) -> Union[
    sqlalchemy.engine.result.resultProxy,
    sqlalchemy.engine.cursor.LegacyCursorResult,
    Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
    Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
    None
]:
    """
    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.

    If inserting data, please use bind variables to avoid SQL injection!

    Parameters
    ----------
    query: Union[str, List[str], Tuple[str]]
        The query to execute.
        If `query` is a list or tuple, call `self.exec_queries()` instead.

    args: Any
        Arguments passed to `sqlalchemy.engine.execute`.

    silent: bool, default False
        If `True`, suppress warnings.

    commit: Optional[bool], default None
        If `True`, commit the changes after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    close: Optional[bool], default None
        If `True`, close the connection after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    with_connection: bool, default False
        If `True`, return a tuple including the connection object.
        This does not apply if `query` is a list of strings.

    Returns
    -------
    The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
    """
    if isinstance(query, (list, tuple)):
        return self.exec_queries(
            list(query),
            *args,
            silent=silent,
            debug=debug,
            **kw
        )

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import("sqlalchemy", lazy=False)
    if debug:
        dprint(f"[{self}] Executing query:\n{query}")

    _close = close if close is not None else (self.flavor != 'mssql')
    _commit = commit if commit is not None else (
        (self.flavor != 'mssql' or 'select' not in str(query).lower())
    )

    ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+).
    if not hasattr(query, 'compile'):
        query = sqlalchemy.text(query)

    connection = _connection if _connection is not None else self.get_connection()

    try:
        transaction = (
            _transaction
            if _transaction is not None else (
                connection.begin()
                if _commit
                else None
            )
        )
    except sqlalchemy.exc.InvalidRequestError as e:
        if _connection is not None or _transaction is not None:
            raise e
        connection = self.get_connection(rebuild=True)
        transaction = connection.begin()

    if transaction is not None and not transaction.is_active and _transaction is not None:
        connection = self.get_connection(rebuild=True)
        transaction = connection.begin() if _commit else None

    result = None
    try:
        result = connection.execute(query, *args, **kw)
        if _commit:
            transaction.commit()
    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{e}")
        if not silent:
            warn(str(e), stacklevel=3)
        result = None
        if _commit:
            transaction.rollback()
            connection = self.get_connection(rebuild=True)
    finally:
        if _close:
            connection.close()

    if with_connection:
        return result, connection

    return result
```
Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.

If inserting data, please use bind variables to avoid SQL injection!

Parameters
- query (Union[str, List[str], Tuple[str]]): The query to execute. If `query` is a list or tuple, call `self.exec_queries()` instead.
- args (Any): Arguments passed to `sqlalchemy.engine.execute`.
- silent (bool, default False): If `True`, suppress warnings.
- commit (Optional[bool], default None): If `True`, commit the changes after execution. Causes issues with flavors like `'mssql'`. This does not apply if `query` is a list of strings.
- close (Optional[bool], default None): If `True`, close the connection after execution. Causes issues with flavors like `'mssql'`. This does not apply if `query` is a list of strings.
- with_connection (bool, default False): If `True`, return a tuple including the connection object. This does not apply if `query` is a list of strings.

Returns
- The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
```python
def execute(
    self,
    *args: Any,
    **kw: Any
) -> Optional[sqlalchemy.engine.result.resultProxy]:
    """
    An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
    """
    return self.exec(*args, **kw)
```
An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
```python
def to_sql(
    self,
    df: pandas.DataFrame,
    name: str = None,
    index: bool = False,
    if_exists: str = 'replace',
    method: str = "",
    chunksize: Optional[int] = -1,
    schema: Optional[str] = None,
    safe_copy: bool = True,
    silent: bool = False,
    debug: bool = False,
    as_tuple: bool = False,
    as_dict: bool = False,
    _connection=None,
    _transaction=None,
    **kw
) -> Union[bool, SuccessTuple]:
    """
    Upload a DataFrame's contents to the SQL server.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be inserted.

    name: str
        The name of the table to be created.

    index: bool, default False
        If `True`, create the DataFrame's indices as columns.

    if_exists: str, default 'replace'
        Drop and create the table ('replace'), append if it exists
        ('append'), or raise an Exception ('fail').
        Options are ['replace', 'append', 'fail'].

    method: str, default ''
        `None` or 'multi'. See the pandas `to_sql()` documentation for details.

    chunksize: Optional[int], default -1
        How many rows to insert at a time.

    schema: Optional[str], default None
        Optionally override the schema for the table.
        Defaults to `SQLConnector.schema`.

    safe_copy: bool, default True
        If `True`, copy the dataframe before making any changes.

    as_tuple: bool, default False
        If `True`, return a (success_bool, message) tuple instead of a `bool`.
        Defaults to `False`.

    as_dict: bool, default False
        If `True`, return a dictionary of transaction information.
        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
        `method`, and `target`.

    kw: Any
        Additional arguments will be passed to the DataFrame's `to_sql` function.

    Returns
    -------
    Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
    """
    import time
    import json
    from datetime import timedelta
    from meerschaum.utils.warnings import error, warn
    import warnings
    import functools

    if name is None:
        error(f"Name must not be `None` to insert data into {self}.")

    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
    kw.pop('name', None)

    schema = schema or self.schema

    from meerschaum.utils.sql import (
        sql_item_name,
        table_exists,
        json_flavors,
        truncate_item_name,
        DROP_IF_EXISTS_FLAVORS,
    )
    from meerschaum.utils.dataframe import (
        get_json_cols,
        get_numeric_cols,
        get_uuid_cols,
        get_bytes_cols,
        get_geometry_cols,
    )
    from meerschaum.utils.dtypes import (
        are_dtypes_equal,
        coerce_timezone,
        encode_bytes_for_bytea,
        serialize_bytes,
        serialize_decimal,
        serialize_geometry,
        json_serialize_value,
        get_geometry_type_srid,
    )
    from meerschaum.utils.dtypes.sql import (
        PD_TO_SQLALCHEMY_DTYPES_FLAVORS,
        get_db_type_from_pd_type,
        get_pd_type_from_db_type,
        get_numeric_precision_scale,
    )
    from meerschaum.utils.misc import interval_str
    from meerschaum.connectors.sql._create_engine import flavor_configs
    from meerschaum.utils.packages import attempt_import, import_pandas
    sqlalchemy = attempt_import('sqlalchemy', debug=debug, lazy=False)
    pd = import_pandas()
    is_dask = 'dask' in df.__module__

    bytes_cols = get_bytes_cols(df)
    numeric_cols = get_numeric_cols(df)
    geometry_cols = get_geometry_cols(df)
    ### NOTE: This excludes non-numeric serialized Decimals (e.g. SQLite).
    numeric_cols_dtypes = {
        col: typ
        for col, typ in kw.get('dtype', {}).items()
        if (
            col in df.columns
            and 'numeric' in str(typ).lower()
        )
    }
    numeric_cols.extend([col for col in numeric_cols_dtypes if col not in numeric_cols])
    numeric_cols_precisions_scales = {
        col: (
            (typ.precision, typ.scale)
            if hasattr(typ, 'precision')
            else get_numeric_precision_scale(self.flavor)
        )
        for col, typ in numeric_cols_dtypes.items()
    }
    geometry_cols_dtypes = {
        col: typ
        for col, typ in kw.get('dtype', {}).items()
        if (
            col in df.columns
            and 'geometry' in str(typ).lower() or 'geography' in str(typ).lower()
        )
    }
    geometry_cols.extend([col for col in geometry_cols_dtypes if col not in geometry_cols])
    geometry_cols_types_srids = {
        col: (typ.geometry_type, typ.srid)
        if hasattr(typ, 'srid')
        else get_geometry_type_srid()
        for col, typ in geometry_cols_dtypes.items()
    }

    cols_pd_types = {
        col: get_pd_type_from_db_type(str(typ))
        for col, typ in kw.get('dtype', {}).items()
    }
    cols_pd_types.update({
        col: f'numeric[{precision},{scale}]'
        for col, (precision, scale) in numeric_cols_precisions_scales.items()
        if precision and scale
    })
    cols_db_types = {
        col: get_db_type_from_pd_type(typ, flavor=self.flavor)
        for col, typ in cols_pd_types.items()
    }

    enable_bulk_insert = mrsm.get_config(
        'system', 'connectors', 'sql', 'bulk_insert', self.flavor,
        warn=False,
    ) or False
    stats = {'target': name}
    ### Resort to defaults if None.
    copied = False
    use_bulk_insert = False
    if method == "":
        if enable_bulk_insert:
            method = (
                functools.partial(mssql_insert_json, cols_types=cols_db_types, debug=debug)
                if self.flavor == 'mssql'
                else functools.partial(psql_insert_copy, debug=debug)
            )
            use_bulk_insert = True
        else:
            ### Should resolve to 'multi' or `None`.
            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')

    if bytes_cols and (use_bulk_insert or self.flavor == 'oracle'):
        if safe_copy and not copied:
            df = df.copy()
            copied = True
        bytes_serializer = (
            functools.partial(encode_bytes_for_bytea, with_prefix=(self.flavor != 'oracle'))
            if self.flavor != 'mssql'
            else serialize_bytes
        )
        for col in bytes_cols:
            df[col] = df[col].apply(bytes_serializer)

    ### Check for numeric columns.
    for col in numeric_cols:
        precision, scale = numeric_cols_precisions_scales.get(
            col,
            get_numeric_precision_scale(self.flavor)
        )
        df[col] = df[col].apply(
            functools.partial(
                serialize_decimal,
                quantize=True,
                precision=precision,
                scale=scale,
            )
        )

    for col in geometry_cols:
        geometry_type, srid = geometry_cols_types_srids.get(col, get_geometry_type_srid())
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            df[col] = df[col].apply(
                functools.partial(
                    serialize_geometry,
                    as_wkt=(self.flavor == 'mssql')
                )
            )

    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)

    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel=3,
                )
            chunksize = _max_chunks_flavors[self.flavor]
    stats['chunksize'] = chunksize

    success, msg = False, "Default to_sql message"
    start = time.perf_counter()
    if debug:
        msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..."
        print(msg, end="", flush=True)
    stats['num_rows'] = len(df)

    ### Check if the name is too long.
    truncated_name = truncate_item_name(name, self.flavor)
    if name != truncated_name:
        warn(
            f"Table '{name}' is too long for '{self.flavor}',"
            f" will instead create the table '{truncated_name}'."
        )

    ### Filter out non-pandas args.
    import inspect
    to_sql_params = inspect.signature(df.to_sql).parameters
    to_sql_kw = {}
    for k, v in kw.items():
        if k in to_sql_params:
            to_sql_kw[k] = v

    to_sql_kw.update({
        'name': truncated_name,
        'schema': schema,
        ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI),
        'index': index,
        'if_exists': if_exists,
        'method': method,
        'chunksize': chunksize,
    })
    if is_dask:
        to_sql_kw.update({
            'parallel': True,
        })
    elif _connection is not None:
        to_sql_kw['con'] = _connection

    if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
    if self.flavor == 'oracle':
        ### For some reason 'replace' doesn't work properly in pandas,
        ### so try dropping first.
        if if_exists == 'replace' and table_exists(name, self, schema=schema, debug=debug):
            success = self.exec(
                f"DROP TABLE {if_exists_str}" + sql_item_name(name, 'oracle', schema)
            ) is not None
            if not success:
                warn(f"Unable to drop {name}")

        ### Enforce NVARCHAR(2000) as text instead of CLOB.
        dtype = to_sql_kw.get('dtype', {})
        for col, typ in df.dtypes.items():
            if are_dtypes_equal(str(typ), 'object'):
                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
            elif are_dtypes_equal(str(typ), 'int'):
                dtype[col] = sqlalchemy.types.INTEGER
        to_sql_kw['dtype'] = dtype
    elif self.flavor == 'duckdb':
        dtype = to_sql_kw.get('dtype', {})
        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
        for col in dt_cols:
            df[col] = coerce_timezone(df[col], strip_utc=False)
    elif self.flavor == 'mssql':
        dtype = to_sql_kw.get('dtype', {})
        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
        new_dtype = {}
        for col in dt_cols:
            if col in dtype:
                continue
            dt_typ = get_db_type_from_pd_type(str(df.dtypes[col]), self.flavor, as_sqlalchemy=True)
            if col not in dtype:
                new_dtype[col] = dt_typ

        dtype.update(new_dtype)
        to_sql_kw['dtype'] = dtype

    ### Check for JSON columns.
    if self.flavor not in json_flavors:
        json_cols = get_json_cols(df)
        for col in json_cols:
            df[col] = df[col].apply(
                (
                    lambda x: json.dumps(x, default=json_serialize_value, sort_keys=True)
                    if not isinstance(x, Hashable)
                    else x
                )
            )

    if PD_TO_SQLALCHEMY_DTYPES_FLAVORS['uuid'].get(self.flavor, None) != 'Uuid':
        uuid_cols = get_uuid_cols(df)
        for col in uuid_cols:
            df[col] = df[col].astype(str)

    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            df.to_sql(**to_sql_kw)
        success = True
    except Exception as e:
        if not silent:
            warn(str(e))
        success, msg = False, str(e)

    end = time.perf_counter()
    if success:
        num_rows = len(df)
        msg = (
            f"It took {interval_str(timedelta(seconds=(end - start)))} "
            + f"to sync {num_rows:,} row"
            + ('s' if num_rows != 1 else '')
            + f" to {name}."
        )
    stats['start'] = start
    stats['end'] = end
    stats['duration'] = end - start

    if debug:
        print(" done.", flush=True)
        dprint(msg)

    stats['success'] = success
    stats['msg'] = msg
    if as_tuple:
        return success, msg
    if as_dict:
        return stats
    return success
```
Upload a DataFrame's contents to the SQL server.
Parameters
- df (pd.DataFrame): The DataFrame to be inserted.
- name (str): The name of the table to be created.
- index (bool, default False): If True, creates the DataFrame's indices as columns.
- if_exists (str, default 'replace'): Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail'].
- method (str, default ''): `None` or `'multi'`. See the `method` parameter of pandas' `to_sql` for details.
- chunksize (Optional[int], default -1): How many rows to insert at a time.
- schema (Optional[str], default None): Optionally override the schema for the table. Defaults to `SQLConnector.schema`.
- safe_copy (bool, default True): If `True`, copy the dataframe before making any changes.
- as_tuple (bool, default False): If `True`, return a `(success_bool, message)` tuple instead of a `bool`. Defaults to `False`.
- as_dict (bool, default False): If `True`, return a dictionary of transaction information. The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`, `method`, and `target`.
- kw (Any): Additional arguments will be passed to the DataFrame's `to_sql` function.
Returns
- Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
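A minimal usage sketch, assuming a configured `sql:local` connector (the table name here is illustrative):

    import pandas as pd
    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    df = pd.DataFrame({'id': [1, 2], 'value': [10.5, 20.25]})

    ### 'example_table' is a hypothetical target table.
    ### to_sql() returns a bool by default; as_tuple=True yields (success, msg).
    success, msg = conn.to_sql(df, name='example_table', if_exists='replace', as_tuple=True)
    print(success, msg)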
614def exec_queries( 615 self, 616 queries: List[ 617 Union[ 618 str, 619 Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]] 620 ] 621 ], 622 break_on_error: bool = False, 623 rollback: bool = True, 624 silent: bool = False, 625 debug: bool = False, 626) -> List[Union[sqlalchemy.engine.cursor.CursorResult, None]]: 627 """ 628 Execute a list of queries in a single transaction. 629 630 Parameters 631 ---------- 632 queries: List[ 633 Union[ 634 str, 635 Tuple[str, Callable[[], List[str]]] 636 ] 637 ] 638 The queries in the transaction to be executed. 639 If a query is a tuple, the second item of the tuple 640 will be considered a callable hook that returns a list of queries to be executed 641 before the next item in the list. 642 643 break_on_error: bool, default False 644 If `True`, stop executing when a query fails. 645 646 rollback: bool, default True 647 If `break_on_error` is `True`, rollback the transaction if a query fails. 648 649 silent: bool, default False 650 If `True`, suppress warnings. 651 652 Returns 653 ------- 654 A list of SQLAlchemy results. 655 """ 656 from meerschaum.utils.warnings import warn 657 from meerschaum.utils.debug import dprint 658 from meerschaum.utils.packages import attempt_import 659 sqlalchemy, sqlalchemy_orm = attempt_import('sqlalchemy', 'sqlalchemy.orm', lazy=False) 660 session = sqlalchemy_orm.Session(self.engine) 661 662 result = None 663 results = [] 664 with session.begin(): 665 for query in queries: 666 hook = None 667 result = None 668 669 if isinstance(query, tuple): 670 query, hook = query 671 if isinstance(query, str): 672 query = sqlalchemy.text(query) 673 674 if debug: 675 dprint(f"[{self}]\n" + str(query)) 676 677 try: 678 result = session.execute(query) 679 session.flush() 680 except Exception as e: 681 msg = (f"Encountered error while executing:\n{e}") 682 if not silent: 683 warn(msg) 684 elif debug: 685 dprint(f"[{self}]\n" + str(msg)) 686 result = None 687 if result is None and break_on_error: 688 if rollback: 689 session.rollback() 690 results.append(result) 691 break 692 elif result is not None and hook is not None: 693 hook_queries = hook(session) 694 if hook_queries: 695 hook_results = self.exec_queries( 696 hook_queries, 697 break_on_error = break_on_error, 698 rollback=rollback, 699 silent=silent, 700 debug=debug, 701 ) 702 result = (result, hook_results) 703 704 results.append(result) 705 706 return results
Execute a list of queries in a single transaction.
Parameters
- queries (List[Union[str, Tuple[str, Callable[[], List[str]]]]]): The queries in the transaction to be executed. If a query is a tuple, the second item of the tuple will be considered a callable hook that returns a list of queries to be executed before the next item in the list.
- break_on_error (bool, default False): If `True`, stop executing when a query fails.
- rollback (bool, default True): If `break_on_error` is `True`, rollback the transaction if a query fails.
- silent (bool, default False): If `True`, suppress warnings.
Returns
- A list of SQLAlchemy results.
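A sketch of running two statements in one transaction, assuming `sql:local` (the table name is illustrative):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    results = conn.exec_queries(
        [
            "CREATE TABLE example_tbl (id INTEGER)",
            "INSERT INTO example_tbl (id) VALUES (1)",
        ],
        break_on_error=True,
    )
    ### A failed query is represented by None in the results list.
    print(results)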
1262def get_connection(self, rebuild: bool = False) -> 'sqlalchemy.engine.base.Connection': 1263 """ 1264 Return the current alive connection. 1265 1266 Parameters 1267 ---------- 1268 rebuild: bool, default False 1269 If `True`, close the previous connection and open a new one. 1270 1271 Returns 1272 ------- 1273 A `sqlalchemy.engine.base.Connection` object. 1274 """ 1275 import threading 1276 if '_thread_connections' not in self.__dict__: 1277 self.__dict__['_thread_connections'] = {} 1278 1279 self._cleanup_connections() 1280 1281 thread_id = threading.get_ident() 1282 1283 thread_connections = self.__dict__.get('_thread_connections', {}) 1284 connection = thread_connections.get(thread_id, None) 1285 1286 if rebuild and connection is not None: 1287 try: 1288 connection.close() 1289 except Exception: 1290 pass 1291 1292 _ = thread_connections.pop(thread_id, None) 1293 connection = None 1294 1295 if connection is None or connection.closed: 1296 connection = self.engine.connect() 1297 thread_connections[thread_id] = connection 1298 1299 return connection
Return the current alive connection.
Parameters
- rebuild (bool, default False): If `True`, close the previous connection and open a new one.
Returns
- A `sqlalchemy.engine.base.Connection` object.
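A sketch of reusing the thread-local connection, assuming `sql:local` and that `sqlalchemy` is importable directly:

    import sqlalchemy
    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    connection = conn.get_connection()
    ### Repeated calls from the same thread return the same live connection.
    result = connection.execute(sqlalchemy.text('SELECT 1'))
    print(result.fetchall())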
717def test_connection( 718 self, 719 **kw: Any 720) -> Union[bool, None]: 721 """ 722 Test if a successful connection to the database may be made. 723 724 Parameters 725 ---------- 726 **kw: 727 The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`. 728 729 Returns 730 ------- 731 `True` if a connection is made, otherwise `False` or `None` in case of failure. 732 733 """ 734 import warnings 735 from meerschaum.connectors.poll import retry_connect 736 _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self} 737 _default_kw.update(kw) 738 with warnings.catch_warnings(): 739 warnings.filterwarnings('ignore', 'Could not') 740 try: 741 return retry_connect(**_default_kw) 742 except Exception: 743 return False
Test if a successful connection to the database may be made.
Parameters
- **kw: The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.
Returns
- `True` if a connection is made, otherwise `False` or `None` in case of failure.
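For example, a quick health check (keyword arguments such as `max_retries` are forwarded to `retry_connect`):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    if not conn.test_connection(max_retries=1):
        print(f"Unable to connect to '{conn}'.")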
18def fetch( 19 self, 20 pipe: mrsm.Pipe, 21 begin: Union[datetime, int, str, None] = '', 22 end: Union[datetime, int, str, None] = None, 23 check_existing: bool = True, 24 chunksize: Optional[int] = -1, 25 workers: Optional[int] = None, 26 debug: bool = False, 27 **kw: Any 28) -> Union['pd.DataFrame', List[Any], None]: 29 """Execute the SQL definition and return a Pandas DataFrame. 30 31 Parameters 32 ---------- 33 pipe: mrsm.Pipe 34 The pipe object which contains the `fetch` metadata. 35 36 - pipe.columns['datetime']: str 37 - Name of the datetime column for the remote table. 38 - pipe.parameters['fetch']: Dict[str, Any] 39 - Parameters necessary to execute a query. 40 - pipe.parameters['fetch']['definition']: str 41 - Raw SQL query to execute to generate the pandas DataFrame. 42 - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float] 43 - How many minutes before `begin` to search for data (*optional*). 44 45 begin: Union[datetime, int, str, None], default None 46 Most recent datatime to search for data. 47 If `backtrack_minutes` is provided, subtract `backtrack_minutes`. 48 49 end: Union[datetime, int, str, None], default None 50 The latest datetime to search for data. 51 If `end` is `None`, do not bound 52 53 check_existing: bool, defult True 54 If `False`, use a backtrack interval of 0 minutes. 55 56 chunksize: Optional[int], default -1 57 How many rows to load into memory at once. 58 Otherwise the entire result set is loaded into memory. 59 60 workers: Optional[int], default None 61 How many threads to use when consuming the generator. 62 Defaults to the number of cores. 63 64 debug: bool, default False 65 Verbosity toggle. 66 67 Returns 68 ------- 69 A pandas DataFrame generator. 70 """ 71 meta_def = self.get_pipe_metadef( 72 pipe, 73 begin=begin, 74 end=end, 75 check_existing=check_existing, 76 debug=debug, 77 **kw 78 ) 79 chunks = self.read( 80 meta_def, 81 chunksize=chunksize, 82 workers=workers, 83 as_iterator=True, 84 debug=debug, 85 ) 86 return chunks
Execute the SQL definition and return a Pandas DataFrame.
Parameters
- pipe (mrsm.Pipe): The pipe object which contains the `fetch` metadata.
  - pipe.columns['datetime']: str
    - Name of the datetime column for the remote table.
  - pipe.parameters['fetch']: Dict[str, Any]
    - Parameters necessary to execute a query.
  - pipe.parameters['fetch']['definition']: str
    - Raw SQL query to execute to generate the pandas DataFrame.
  - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
    - How many minutes before `begin` to search for data (optional).
- begin (Union[datetime, int, str, None], default None): Most recent datetime to search for data. If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
- end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If `end` is `None`, do not bound the search.
- check_existing (bool, default True): If `False`, use a backtrack interval of 0 minutes.
- chunksize (Optional[int], default -1): How many rows to load into memory at once. Otherwise the entire result set is loaded into memory.
- workers (Optional[int], default None): How many threads to use when consuming the generator. Defaults to the number of cores.
- debug (bool, default False): Verbosity toggle.
Returns
- A pandas DataFrame generator.
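A minimal sketch of fetching from a raw SQL definition, assuming `sql:local` (the pipe keys and query are illustrative):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe(
        'sql:local', 'example',
        instance='sql:local',
        parameters={'fetch': {'definition': 'SELECT * FROM example_table'}},
    )
    ### Returns a generator of DataFrame chunks.
    chunks = conn.fetch(pipe, chunksize=10_000)
    for chunk in chunks:
        print(len(chunk))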
89def get_pipe_metadef( 90 self, 91 pipe: mrsm.Pipe, 92 params: Optional[Dict[str, Any]] = None, 93 begin: Union[datetime, int, str, None] = '', 94 end: Union[datetime, int, str, None] = None, 95 check_existing: bool = True, 96 debug: bool = False, 97 **kw: Any 98) -> Union[str, None]: 99 """ 100 Return a pipe's meta definition fetch query. 101 102 params: Optional[Dict[str, Any]], default None 103 Optional params dictionary to build the `WHERE` clause. 104 See `meerschaum.utils.sql.build_where`. 105 106 begin: Union[datetime, int, str, None], default None 107 Most recent datatime to search for data. 108 If `backtrack_minutes` is provided, subtract `backtrack_minutes`. 109 110 end: Union[datetime, int, str, None], default None 111 The latest datetime to search for data. 112 If `end` is `None`, do not bound 113 114 check_existing: bool, default True 115 If `True`, apply the backtrack interval. 116 117 debug: bool, default False 118 Verbosity toggle. 119 120 Returns 121 ------- 122 A pipe's meta definition fetch query string. 123 """ 124 from meerschaum.utils.warnings import warn 125 from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where 126 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 127 from meerschaum.config import get_config 128 129 dt_col = pipe.columns.get('datetime', None) 130 if not dt_col: 131 dt_col = pipe.guess_datetime() 132 dt_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None 133 is_guess = True 134 else: 135 dt_name = sql_item_name(dt_col, self.flavor, None) 136 is_guess = False 137 dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None 138 db_dt_typ = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None 139 140 if begin not in (None, '') or end is not None: 141 if is_guess: 142 if dt_col is None: 143 warn( 144 f"Unable to determine a datetime column for {pipe}." 
145 + "\n Ignoring begin and end...", 146 stack=False, 147 ) 148 begin, end = '', None 149 else: 150 warn( 151 f"A datetime wasn't specified for {pipe}.\n" 152 + f" Using column \"{dt_col}\" for datetime bounds...", 153 stack=False 154 ) 155 156 apply_backtrack = begin == '' and check_existing 157 backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug) 158 btm = ( 159 int(backtrack_interval.total_seconds() / 60) 160 if isinstance(backtrack_interval, timedelta) 161 else backtrack_interval 162 ) 163 begin = ( 164 pipe.get_sync_time(debug=debug) 165 if begin == '' 166 else begin 167 ) 168 169 if begin not in (None, '') and end is not None and begin >= end: 170 begin = None 171 172 if dt_name: 173 begin_da = ( 174 dateadd_str( 175 flavor=self.flavor, 176 datepart='minute', 177 number=((-1 * btm) if apply_backtrack else 0), 178 begin=begin, 179 db_type=db_dt_typ, 180 ) 181 if begin not in ('', None) 182 else None 183 ) 184 end_da = ( 185 dateadd_str( 186 flavor=self.flavor, 187 datepart='minute', 188 number=0, 189 begin=end, 190 db_type=db_dt_typ, 191 ) 192 if end is not None 193 else None 194 ) 195 196 definition_name = sql_item_name('definition', self.flavor, None) 197 meta_def = ( 198 _simple_fetch_query(pipe, self.flavor) if ( 199 (not (pipe.columns or {}).get('id', None)) 200 or (not get_config('system', 'experimental', 'join_fetch')) 201 ) else _join_fetch_query(pipe, self.flavor, debug=debug, **kw) 202 ) 203 204 has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):] 205 if dt_name and (begin_da or end_da): 206 definition_dt_name = f"{definition_name}.{dt_name}" 207 meta_def += "\n" + ("AND" if has_where else "WHERE") + " " 208 has_where = True 209 if begin_da: 210 meta_def += f"\n {definition_dt_name}\n >=\n {begin_da}\n" 211 if begin_da and end_da: 212 meta_def += " AND" 213 if end_da: 214 meta_def += f"\n {definition_dt_name}\n <\n {end_da}\n" 215 216 if params is not None: 217 params_where = build_where(params, self, with_where=False) 218 meta_def += "\n " + ("AND" if has_where else "WHERE") + " " 219 has_where = True 220 meta_def += params_where 221 222 return meta_def.rstrip()
Return a pipe's meta definition fetch query.
Parameters
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the `WHERE` clause. See `meerschaum.utils.sql.build_where`.
- begin (Union[datetime, int, str, None], default None): Most recent datetime to search for data. If `backtrack_minutes` is provided, subtract `backtrack_minutes`.
- end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If `end` is `None`, do not bound the search.
- check_existing (bool, default True): If `True`, apply the backtrack interval.
- debug (bool, default False): Verbosity toggle.
Returns
- A pipe's meta definition fetch query string.
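For example, a sketch of previewing the bounded query for a pipe like the one above (the bounds are illustrative):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe(
        'sql:local', 'example',
        parameters={'fetch': {'definition': 'SELECT * FROM example_table'}},
    )
    ### Wraps the definition and appends datetime bounds (plus any backtrack interval).
    print(conn.get_pipe_metadef(pipe, begin='2024-01-01', end='2024-02-01'))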
37def cli( 38 self, 39 debug: bool = False, 40) -> SuccessTuple: 41 """ 42 Launch a subprocess for an interactive CLI. 43 """ 44 from meerschaum.utils.warnings import dprint 45 from meerschaum.utils.venv import venv_exec 46 env = copy.deepcopy(dict(os.environ)) 47 env_key = f"MRSM_SQL_{self.label.upper()}" 48 env_val = json.dumps(self.meta) 49 env[env_key] = env_val 50 cli_code = ( 51 "import sys\n" 52 "import meerschaum as mrsm\n" 53 "import os\n" 54 f"conn = mrsm.get_connector('sql:{self.label}')\n" 55 "success, msg = conn._cli_exit()\n" 56 "mrsm.pprint((success, msg))\n" 57 "if not success:\n" 58 " raise Exception(msg)" 59 ) 60 if debug: 61 dprint(cli_code) 62 try: 63 _ = venv_exec(cli_code, venv=None, env=env, debug=debug, capture_output=False) 64 except Exception as e: 65 return False, f"[{self}] Failed to start CLI:\n{e}" 66 return True, "Success"
Launch a subprocess for an interactive CLI.
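For example (assuming `sql:local`):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    success, msg = conn.cli()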
144def fetch_pipes_keys( 145 self, 146 connector_keys: Optional[List[str]] = None, 147 metric_keys: Optional[List[str]] = None, 148 location_keys: Optional[List[str]] = None, 149 tags: Optional[List[str]] = None, 150 params: Optional[Dict[str, Any]] = None, 151 debug: bool = False 152) -> Optional[List[Tuple[str, str, Optional[str]]]]: 153 """ 154 Return a list of tuples corresponding to the parameters provided. 155 156 Parameters 157 ---------- 158 connector_keys: Optional[List[str]], default None 159 List of connector_keys to search by. 160 161 metric_keys: Optional[List[str]], default None 162 List of metric_keys to search by. 163 164 location_keys: Optional[List[str]], default None 165 List of location_keys to search by. 166 167 params: Optional[Dict[str, Any]], default None 168 Dictionary of additional parameters to search by. 169 E.g. `--params pipe_id:1` 170 171 debug: bool, default False 172 Verbosity toggle. 173 """ 174 from meerschaum.utils.debug import dprint 175 from meerschaum.utils.packages import attempt_import 176 from meerschaum.utils.misc import separate_negation_values 177 from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists 178 from meerschaum.config.static import STATIC_CONFIG 179 import json 180 from copy import deepcopy 181 sqlalchemy, sqlalchemy_sql_functions = attempt_import( 182 'sqlalchemy', 183 'sqlalchemy.sql.functions', lazy=False, 184 ) 185 coalesce = sqlalchemy_sql_functions.coalesce 186 187 if connector_keys is None: 188 connector_keys = [] 189 if metric_keys is None: 190 metric_keys = [] 191 if location_keys is None: 192 location_keys = [] 193 else: 194 location_keys = [ 195 ( 196 lk 197 if lk not in ('[None]', 'None', 'null') 198 else 'None' 199 ) 200 for lk in location_keys 201 ] 202 if tags is None: 203 tags = [] 204 205 if params is None: 206 params = {} 207 208 ### Add three primary keys to params dictionary 209 ### (separated for convenience of arguments). 210 cols = { 211 'connector_keys': [str(ck) for ck in connector_keys], 212 'metric_key': [str(mk) for mk in metric_keys], 213 'location_key': [str(lk) for lk in location_keys], 214 } 215 216 ### Make deep copy so we don't mutate this somewhere else. 217 parameters = deepcopy(params) 218 for col, vals in cols.items(): 219 if vals not in [[], ['*']]: 220 parameters[col] = vals 221 222 if not table_exists('mrsm_pipes', self, schema=self.instance_schema, debug=debug): 223 return [] 224 225 from meerschaum.connectors.sql.tables import get_tables 226 pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes'] 227 228 _params = {} 229 for k, v in parameters.items(): 230 _v = json.dumps(v) if isinstance(v, dict) else v 231 _params[k] = _v 232 233 negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] 234 ### Parse regular params. 235 ### If a param begins with '_', negate it instead. 
236 _where = [ 237 ( 238 (coalesce(pipes_tbl.c[key], 'None') == val) 239 if not str(val).startswith(negation_prefix) 240 else (pipes_tbl.c[key] != key) 241 ) for key, val in _params.items() 242 if not isinstance(val, (list, tuple)) and key in pipes_tbl.c 243 ] 244 select_cols = ( 245 [ 246 pipes_tbl.c.connector_keys, 247 pipes_tbl.c.metric_key, 248 pipes_tbl.c.location_key, 249 ] 250 ) 251 252 q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where)) 253 for c, vals in cols.items(): 254 if not isinstance(vals, (list, tuple)) or not vals or c not in pipes_tbl.c: 255 continue 256 _in_vals, _ex_vals = separate_negation_values(vals) 257 q = q.where(coalesce(pipes_tbl.c[c], 'None').in_(_in_vals)) if _in_vals else q 258 q = q.where(coalesce(pipes_tbl.c[c], 'None').not_in(_ex_vals)) if _ex_vals else q 259 260 ### Finally, parse tags. 261 tag_groups = [tag.split(',') for tag in tags] 262 in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups] 263 264 ors, nands = [], [] 265 for _in_tags, _ex_tags in in_ex_tag_groups: 266 sub_ands = [] 267 for nt in _in_tags: 268 sub_ands.append( 269 sqlalchemy.cast( 270 pipes_tbl.c['parameters'], 271 sqlalchemy.String, 272 ).like(f'%"tags":%"{nt}"%') 273 ) 274 if sub_ands: 275 ors.append(sqlalchemy.and_(*sub_ands)) 276 277 for xt in _ex_tags: 278 nands.append( 279 sqlalchemy.cast( 280 pipes_tbl.c['parameters'], 281 sqlalchemy.String, 282 ).not_like(f'%"tags":%"{xt}"%') 283 ) 284 285 q = q.where(sqlalchemy.and_(*nands)) if nands else q 286 q = q.where(sqlalchemy.or_(*ors)) if ors else q 287 loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key']) 288 if self.flavor not in OMIT_NULLSFIRST_FLAVORS: 289 loc_asc = sqlalchemy.nullsfirst(loc_asc) 290 q = q.order_by( 291 sqlalchemy.asc(pipes_tbl.c['connector_keys']), 292 sqlalchemy.asc(pipes_tbl.c['metric_key']), 293 loc_asc, 294 ) 295 296 ### execute the query and return a list of tuples 297 if debug: 298 dprint(q.compile(compile_kwargs={'literal_binds': True})) 299 try: 300 rows = ( 301 self.execute(q).fetchall() 302 if self.flavor != 'duckdb' 303 else [ 304 (row['connector_keys'], row['metric_key'], row['location_key']) 305 for row in self.read(q).to_dict(orient='records') 306 ] 307 ) 308 except Exception as e: 309 error(str(e)) 310 311 return [(row[0], row[1], row[2]) for row in rows]
Return a list of tuples corresponding to the parameters provided.
Parameters
- connector_keys (Optional[List[str]], default None): List of connector_keys to search by.
- metric_keys (Optional[List[str]], default None): List of metric_keys to search by.
- location_keys (Optional[List[str]], default None): List of location_keys to search by.
- tags (Optional[List[str]], default None): List of tags to search by.
- params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by. E.g. `--params pipe_id:1`.
- debug (bool, default False): Verbosity toggle.
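A sketch of filtering registered pipes, assuming `sql:local` is an instance connector (keys and tags are illustrative):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    keys = conn.fetch_pipes_keys(connector_keys=['sql:local'], tags=['production'])
    for connector_keys, metric_key, location_key in keys:
        print(connector_keys, metric_key, location_key)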
332def create_indices( 333 self, 334 pipe: mrsm.Pipe, 335 columns: Optional[List[str]] = None, 336 indices: Optional[List[str]] = None, 337 debug: bool = False 338) -> bool: 339 """ 340 Create a pipe's indices. 341 """ 342 from meerschaum.utils.debug import dprint 343 if debug: 344 dprint(f"Creating indices for {pipe}...") 345 346 if not pipe.indices: 347 warn(f"{pipe} has no index columns; skipping index creation.", stack=False) 348 return True 349 350 cols_to_include = set((columns or []) + (indices or [])) or None 351 352 _ = pipe.__dict__.pop('_columns_indices', None) 353 ix_queries = { 354 col: queries 355 for col, queries in self.get_create_index_queries(pipe, debug=debug).items() 356 if cols_to_include is None or col in cols_to_include 357 } 358 success = True 359 for col, queries in ix_queries.items(): 360 ix_success = all(self.exec_queries(queries, debug=debug, silent=False)) 361 success = success and ix_success 362 if not ix_success: 363 warn(f"Failed to create index on column: {col}") 364 365 return success
Create a pipe's indices.
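For example, a sketch that creates only the datetime index of an existing pipe (pipe keys are illustrative):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe('sql:local', 'example', instance='sql:local')
    success = conn.create_indices(pipe, columns=['datetime'])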
386def drop_indices( 387 self, 388 pipe: mrsm.Pipe, 389 columns: Optional[List[str]] = None, 390 indices: Optional[List[str]] = None, 391 debug: bool = False 392) -> bool: 393 """ 394 Drop a pipe's indices. 395 """ 396 from meerschaum.utils.debug import dprint 397 if debug: 398 dprint(f"Dropping indices for {pipe}...") 399 400 if not pipe.indices: 401 warn(f"No indices to drop for {pipe}.", stack=False) 402 return False 403 404 cols_to_include = set((columns or []) + (indices or [])) or None 405 406 ix_queries = { 407 col: queries 408 for col, queries in self.get_drop_index_queries(pipe, debug=debug).items() 409 if cols_to_include is None or col in cols_to_include 410 } 411 success = True 412 for col, queries in ix_queries.items(): 413 ix_success = all(self.exec_queries(queries, debug=debug, silent=(not debug))) 414 if not ix_success: 415 success = False 416 if debug: 417 dprint(f"Failed to drop index on column: {col}") 418 return success
Drop a pipe's indices.
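The inverse operation follows the same pattern (a sketch; pipe keys are illustrative):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe('sql:local', 'example', instance='sql:local')
    success = conn.drop_indices(pipe, columns=['datetime'])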
474def get_create_index_queries( 475 self, 476 pipe: mrsm.Pipe, 477 debug: bool = False, 478) -> Dict[str, List[str]]: 479 """ 480 Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query. 481 482 Parameters 483 ---------- 484 pipe: mrsm.Pipe 485 The pipe to which the queries will correspond. 486 487 Returns 488 ------- 489 A dictionary of index names mapping to lists of queries. 490 """ 491 ### NOTE: Due to recent breaking changes in DuckDB, indices don't behave properly. 492 if self.flavor == 'duckdb': 493 return {} 494 from meerschaum.utils.sql import ( 495 sql_item_name, 496 get_distinct_col_count, 497 UPDATE_QUERIES, 498 get_null_replacement, 499 get_create_table_queries, 500 get_rename_table_queries, 501 COALESCE_UNIQUE_INDEX_FLAVORS, 502 ) 503 from meerschaum.utils.dtypes import are_dtypes_equal 504 from meerschaum.utils.dtypes.sql import ( 505 get_db_type_from_pd_type, 506 get_pd_type_from_db_type, 507 AUTO_INCREMENT_COLUMN_FLAVORS, 508 ) 509 from meerschaum.config import get_config 510 index_queries = {} 511 512 upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in UPDATE_QUERIES 513 static = pipe.parameters.get('static', False) 514 null_indices = pipe.parameters.get('null_indices', True) 515 index_names = pipe.get_indices() 516 unique_index_name_unquoted = index_names.get('unique', None) or f'IX_{pipe.target}_unique' 517 if upsert: 518 _ = index_names.pop('unique', None) 519 indices = pipe.indices 520 existing_cols_types = pipe.get_columns_types(debug=debug) 521 existing_cols_pd_types = { 522 col: get_pd_type_from_db_type(typ) 523 for col, typ in existing_cols_types.items() 524 } 525 existing_cols_indices = self.get_pipe_columns_indices(pipe, debug=debug) 526 existing_ix_names = set() 527 existing_primary_keys = [] 528 existing_clustered_primary_keys = [] 529 for col, col_indices in existing_cols_indices.items(): 530 for col_ix_doc in col_indices: 531 existing_ix_names.add(col_ix_doc.get('name', '').lower()) 532 if col_ix_doc.get('type', None) == 'PRIMARY KEY': 533 existing_primary_keys.append(col.lower()) 534 if col_ix_doc.get('clustered', True): 535 existing_clustered_primary_keys.append(col.lower()) 536 537 _datetime = pipe.get_columns('datetime', error=False) 538 _datetime_name = ( 539 sql_item_name(_datetime, self.flavor, None) 540 if _datetime is not None else None 541 ) 542 _datetime_index_name = ( 543 sql_item_name(index_names['datetime'], flavor=self.flavor, schema=None) 544 if index_names.get('datetime', None) 545 else None 546 ) 547 _id = pipe.get_columns('id', error=False) 548 _id_name = ( 549 sql_item_name(_id, self.flavor, None) 550 if _id is not None 551 else None 552 ) 553 primary_key = pipe.columns.get('primary', None) 554 primary_key_name = ( 555 sql_item_name(primary_key, flavor=self.flavor, schema=None) 556 if primary_key 557 else None 558 ) 559 autoincrement = ( 560 pipe.parameters.get('autoincrement', False) 561 or ( 562 primary_key is not None 563 and primary_key not in existing_cols_pd_types 564 ) 565 ) 566 primary_key_db_type = ( 567 get_db_type_from_pd_type(pipe.dtypes.get(primary_key, 'int') or 'int', self.flavor) 568 if primary_key 569 else None 570 ) 571 primary_key_constraint_name = ( 572 sql_item_name(f'PK_{pipe.target}', self.flavor, None) 573 if primary_key is not None 574 else None 575 ) 576 primary_key_clustered = "CLUSTERED" if _datetime is None else "NONCLUSTERED" 577 datetime_clustered = ( 578 "CLUSTERED" 579 if not existing_clustered_primary_keys and _datetime is not None 580 else "NONCLUSTERED" 
581 ) 582 include_columns_str = "\n ,".join( 583 [ 584 sql_item_name(col, flavor=self.flavor) for col in existing_cols_types 585 if col != _datetime 586 ] 587 ).rstrip(',') 588 include_clause = ( 589 ( 590 f"\nINCLUDE (\n {include_columns_str}\n)" 591 ) 592 if datetime_clustered == 'NONCLUSTERED' 593 else '' 594 ) 595 596 _id_index_name = ( 597 sql_item_name(index_names['id'], self.flavor, None) 598 if index_names.get('id', None) 599 else None 600 ) 601 _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 602 _create_space_partition = get_config('system', 'experimental', 'space') 603 604 ### create datetime index 605 dt_query = None 606 if _datetime is not None: 607 if self.flavor == 'timescaledb' and pipe.parameters.get('hypertable', True): 608 _id_count = ( 609 get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self) 610 if (_id is not None and _create_space_partition) else None 611 ) 612 613 chunk_interval = pipe.get_chunk_interval(debug=debug) 614 chunk_interval_minutes = ( 615 chunk_interval 616 if isinstance(chunk_interval, int) 617 else int(chunk_interval.total_seconds() / 60) 618 ) 619 chunk_time_interval = ( 620 f"INTERVAL '{chunk_interval_minutes} MINUTES'" 621 if isinstance(chunk_interval, timedelta) 622 else f'{chunk_interval_minutes}' 623 ) 624 625 dt_query = ( 626 f"SELECT public.create_hypertable('{_pipe_name}', " + 627 f"'{_datetime}', " 628 + ( 629 f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition) 630 else '' 631 ) 632 + f'chunk_time_interval => {chunk_time_interval}, ' 633 + 'if_not_exists => true, ' 634 + "migrate_data => true);" 635 ) 636 elif _datetime_index_name and _datetime != primary_key: 637 if self.flavor == 'mssql': 638 dt_query = ( 639 f"CREATE {datetime_clustered} INDEX {_datetime_index_name} " 640 f"\nON {_pipe_name} ({_datetime_name}){include_clause}" 641 ) 642 else: 643 dt_query = ( 644 f"CREATE INDEX {_datetime_index_name} " 645 + f"ON {_pipe_name} ({_datetime_name})" 646 ) 647 648 if dt_query: 649 index_queries[_datetime] = [dt_query] 650 651 primary_queries = [] 652 if ( 653 primary_key is not None 654 and primary_key.lower() not in existing_primary_keys 655 and not static 656 ): 657 if autoincrement and primary_key not in existing_cols_pd_types: 658 autoincrement_str = AUTO_INCREMENT_COLUMN_FLAVORS.get( 659 self.flavor, 660 AUTO_INCREMENT_COLUMN_FLAVORS['default'] 661 ) 662 primary_queries.extend([ 663 ( 664 f"ALTER TABLE {_pipe_name}\n" 665 f"ADD {primary_key_name} {primary_key_db_type} {autoincrement_str}" 666 ), 667 ]) 668 elif not autoincrement and primary_key in existing_cols_pd_types: 669 if self.flavor == 'sqlite': 670 new_table_name = sql_item_name( 671 f'_new_{pipe.target}', 672 self.flavor, 673 self.get_pipe_schema(pipe) 674 ) 675 select_cols_str = ', '.join( 676 [ 677 sql_item_name(col, self.flavor, None) 678 for col in existing_cols_types 679 ] 680 ) 681 primary_queries.extend( 682 get_create_table_queries( 683 existing_cols_pd_types, 684 f'_new_{pipe.target}', 685 self.flavor, 686 schema=self.get_pipe_schema(pipe), 687 primary_key=primary_key, 688 ) + [ 689 ( 690 f"INSERT INTO {new_table_name} ({select_cols_str})\n" 691 f"SELECT {select_cols_str}\nFROM {_pipe_name}" 692 ), 693 f"DROP TABLE {_pipe_name}", 694 ] + get_rename_table_queries( 695 f'_new_{pipe.target}', 696 pipe.target, 697 self.flavor, 698 schema=self.get_pipe_schema(pipe), 699 ) 700 ) 701 elif self.flavor == 'oracle': 702 primary_queries.extend([ 703 ( 704 f"ALTER TABLE {_pipe_name}\n" 705 f"MODIFY 
{primary_key_name} NOT NULL" 706 ), 707 ( 708 f"ALTER TABLE {_pipe_name}\n" 709 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})" 710 ) 711 ]) 712 elif self.flavor in ('mysql', 'mariadb'): 713 primary_queries.extend([ 714 ( 715 f"ALTER TABLE {_pipe_name}\n" 716 f"MODIFY {primary_key_name} {primary_key_db_type} NOT NULL" 717 ), 718 ( 719 f"ALTER TABLE {_pipe_name}\n" 720 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})" 721 ) 722 ]) 723 elif self.flavor == 'timescaledb': 724 primary_queries.extend([ 725 ( 726 f"ALTER TABLE {_pipe_name}\n" 727 f"ALTER COLUMN {primary_key_name} SET NOT NULL" 728 ), 729 ( 730 f"ALTER TABLE {_pipe_name}\n" 731 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY (" + ( 732 f"{_datetime_name}, " if _datetime_name else "" 733 ) + f"{primary_key_name})" 734 ), 735 ]) 736 elif self.flavor in ('citus', 'postgresql', 'duckdb', 'postgis'): 737 primary_queries.extend([ 738 ( 739 f"ALTER TABLE {_pipe_name}\n" 740 f"ALTER COLUMN {primary_key_name} SET NOT NULL" 741 ), 742 ( 743 f"ALTER TABLE {_pipe_name}\n" 744 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})" 745 ), 746 ]) 747 else: 748 primary_queries.extend([ 749 ( 750 f"ALTER TABLE {_pipe_name}\n" 751 f"ALTER COLUMN {primary_key_name} {primary_key_db_type} NOT NULL" 752 ), 753 ( 754 f"ALTER TABLE {_pipe_name}\n" 755 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY {primary_key_clustered} ({primary_key_name})" 756 ), 757 ]) 758 index_queries[primary_key] = primary_queries 759 760 ### create id index 761 if _id_name is not None: 762 if self.flavor == 'timescaledb': 763 ### Already created indices via create_hypertable. 764 id_query = ( 765 None if (_id is not None and _create_space_partition) 766 else ( 767 f"CREATE INDEX IF NOT EXISTS {_id_index_name} ON {_pipe_name} ({_id_name})" 768 if _id is not None 769 else None 770 ) 771 ) 772 pass 773 else: ### mssql, sqlite, etc. 774 id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})" 775 776 if id_query is not None: 777 index_queries[_id] = id_query if isinstance(id_query, list) else [id_query] 778 779 ### Create indices for other labels in `pipe.columns`. 
780 other_index_names = { 781 ix_key: ix_unquoted 782 for ix_key, ix_unquoted in index_names.items() 783 if ( 784 ix_key not in ('datetime', 'id', 'primary') 785 and ix_unquoted.lower() not in existing_ix_names 786 ) 787 } 788 for ix_key, ix_unquoted in other_index_names.items(): 789 ix_name = sql_item_name(ix_unquoted, self.flavor, None) 790 cols = indices[ix_key] 791 if not isinstance(cols, (list, tuple)): 792 cols = [cols] 793 if ix_key == 'unique' and upsert: 794 continue 795 cols_names = [sql_item_name(col, self.flavor, None) for col in cols if col] 796 if not cols_names: 797 continue 798 799 cols_names_str = ", ".join(cols_names) 800 index_query_params_clause = f" ({cols_names_str})" 801 if self.flavor == 'postgis': 802 for col in cols: 803 col_typ = existing_cols_pd_types.get(cols[0], 'object') 804 if col_typ != 'object' and are_dtypes_equal(col_typ, 'geometry'): 805 index_query_params_clause = f" USING GIST ({cols_names_str})" 806 break 807 808 index_queries[ix_key] = [ 809 f"CREATE INDEX {ix_name} ON {_pipe_name}{index_query_params_clause}" 810 ] 811 812 indices_cols_str = ', '.join( 813 list({ 814 sql_item_name(ix, self.flavor) 815 for ix_key, ix in pipe.columns.items() 816 if ix and ix in existing_cols_types 817 }) 818 ) 819 coalesce_indices_cols_str = ', '.join( 820 [ 821 ( 822 ( 823 "COALESCE(" 824 + sql_item_name(ix, self.flavor) 825 + ", " 826 + get_null_replacement(existing_cols_types[ix], self.flavor) 827 + ") " 828 ) 829 if ix_key != 'datetime' and null_indices 830 else sql_item_name(ix, self.flavor) 831 ) 832 for ix_key, ix in pipe.columns.items() 833 if ix and ix in existing_cols_types 834 ] 835 ) 836 unique_index_name = sql_item_name(unique_index_name_unquoted, self.flavor) 837 constraint_name_unquoted = unique_index_name_unquoted.replace('IX_', 'UQ_') 838 constraint_name = sql_item_name(constraint_name_unquoted, self.flavor) 839 add_constraint_query = ( 840 f"ALTER TABLE {_pipe_name} ADD CONSTRAINT {constraint_name} UNIQUE ({indices_cols_str})" 841 ) 842 unique_index_cols_str = ( 843 indices_cols_str 844 if self.flavor not in COALESCE_UNIQUE_INDEX_FLAVORS or not null_indices 845 else coalesce_indices_cols_str 846 ) 847 create_unique_index_query = ( 848 f"CREATE UNIQUE INDEX {unique_index_name} ON {_pipe_name} ({unique_index_cols_str})" 849 ) 850 constraint_queries = [create_unique_index_query] 851 if self.flavor != 'sqlite': 852 constraint_queries.append(add_constraint_query) 853 if upsert and indices_cols_str: 854 index_queries[unique_index_name] = constraint_queries 855 return index_queries
Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query.
Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
- A dictionary of index names mapping to lists of queries.
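A sketch of generating the queries and executing them with `exec_queries` (documented above; pipe keys are illustrative):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe('sql:local', 'example', instance='sql:local')
    for ix_name, queries in conn.get_create_index_queries(pipe).items():
        ### Each index maps to one or more flavor-specific statements.
        conn.exec_queries(queries, break_on_error=True)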
858def get_drop_index_queries( 859 self, 860 pipe: mrsm.Pipe, 861 debug: bool = False, 862) -> Dict[str, List[str]]: 863 """ 864 Return a dictionary mapping columns to a `DROP INDEX` or equivalent query. 865 866 Parameters 867 ---------- 868 pipe: mrsm.Pipe 869 The pipe to which the queries will correspond. 870 871 Returns 872 ------- 873 A dictionary of column names mapping to lists of queries. 874 """ 875 ### NOTE: Due to breaking changes within DuckDB, indices must be skipped. 876 if self.flavor == 'duckdb': 877 return {} 878 if not pipe.exists(debug=debug): 879 return {} 880 881 from collections import defaultdict 882 from meerschaum.utils.sql import ( 883 sql_item_name, 884 table_exists, 885 hypertable_queries, 886 DROP_INDEX_IF_EXISTS_FLAVORS, 887 ) 888 drop_queries = defaultdict(lambda: []) 889 schema = self.get_pipe_schema(pipe) 890 index_schema = schema if self.flavor != 'mssql' else None 891 indices = { 892 ix_key: ix 893 for ix_key, ix in pipe.get_indices().items() 894 } 895 cols_indices = pipe.get_columns_indices(debug=debug) 896 existing_indices = set() 897 clustered_ix = None 898 for col, ix_metas in cols_indices.items(): 899 for ix_meta in ix_metas: 900 ix_name = ix_meta.get('name', None) 901 if ix_meta.get('clustered', False): 902 clustered_ix = ix_name 903 existing_indices.add(ix_name.lower()) 904 pipe_name = sql_item_name(pipe.target, self.flavor, schema) 905 pipe_name_no_schema = sql_item_name(pipe.target, self.flavor, None) 906 upsert = pipe.upsert 907 908 if self.flavor not in hypertable_queries: 909 is_hypertable = False 910 else: 911 is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name) 912 is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None 913 914 if_exists_str = "IF EXISTS " if self.flavor in DROP_INDEX_IF_EXISTS_FLAVORS else "" 915 if is_hypertable: 916 nuke_queries = [] 917 temp_table = '_' + pipe.target + '_temp_migration' 918 temp_table_name = sql_item_name(temp_table, self.flavor, self.get_pipe_schema(pipe)) 919 920 if table_exists(temp_table, self, schema=self.get_pipe_schema(pipe), debug=debug): 921 nuke_queries.append(f"DROP TABLE {if_exists_str} {temp_table_name}") 922 nuke_queries += [ 923 f"SELECT * INTO {temp_table_name} FROM {pipe_name}", 924 f"DROP TABLE {if_exists_str}{pipe_name}", 925 f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name_no_schema}", 926 ] 927 nuke_ix_keys = ('datetime', 'id') 928 nuked = False 929 for ix_key in nuke_ix_keys: 930 if ix_key in indices and not nuked: 931 drop_queries[ix_key].extend(nuke_queries) 932 nuked = True 933 934 for ix_key, ix_unquoted in indices.items(): 935 if ix_key in drop_queries: 936 continue 937 if ix_unquoted.lower() not in existing_indices: 938 continue 939 940 if ix_key == 'unique' and upsert and self.flavor not in ('sqlite',) and not is_hypertable: 941 constraint_name_unquoted = ix_unquoted.replace('IX_', 'UQ_') 942 constraint_name = sql_item_name(constraint_name_unquoted, self.flavor) 943 constraint_or_index = ( 944 "CONSTRAINT" 945 if self.flavor not in ('mysql', 'mariadb') 946 else 'INDEX' 947 ) 948 drop_queries[ix_key].append( 949 f"ALTER TABLE {pipe_name}\n" 950 f"DROP {constraint_or_index} {constraint_name}" 951 ) 952 953 query = ( 954 ( 955 f"ALTER TABLE {pipe_name}\n" 956 if self.flavor in ('mysql', 'mariadb') 957 else '' 958 ) 959 + f"DROP INDEX {if_exists_str}" 960 + sql_item_name(ix_unquoted, self.flavor, index_schema) 961 ) 962 if self.flavor == 'mssql': 963 query += f"\nON {pipe_name}" 964 if ix_unquoted == clustered_ix: 965 
query += "\nWITH (ONLINE = ON, MAXDOP = 4)" 966 drop_queries[ix_key].append(query) 967 968 969 return drop_queries
Return a dictionary mapping columns to a `DROP INDEX` or equivalent query.
Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
- A dictionary of column names mapping to lists of queries.
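The drop variant mirrors the create variant (a sketch; pipe keys are illustrative):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe('sql:local', 'example', instance='sql:local')
    for ix_name, queries in conn.get_drop_index_queries(pipe).items():
        conn.exec_queries(queries, break_on_error=True)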
3167def get_add_columns_queries( 3168 self, 3169 pipe: mrsm.Pipe, 3170 df: Union[pd.DataFrame, Dict[str, str]], 3171 _is_db_types: bool = False, 3172 debug: bool = False, 3173) -> List[str]: 3174 """ 3175 Add new null columns of the correct type to a table from a dataframe. 3176 3177 Parameters 3178 ---------- 3179 pipe: mrsm.Pipe 3180 The pipe to be altered. 3181 3182 df: Union[pd.DataFrame, Dict[str, str]] 3183 The pandas DataFrame which contains new columns. 3184 If a dictionary is provided, assume it maps columns to Pandas data types. 3185 3186 _is_db_types: bool, default False 3187 If `True`, assume `df` is a dictionary mapping columns to SQL native dtypes. 3188 3189 Returns 3190 ------- 3191 A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector. 3192 """ 3193 if not pipe.exists(debug=debug): 3194 return [] 3195 3196 if pipe.parameters.get('static', False): 3197 return [] 3198 3199 from decimal import Decimal 3200 import copy 3201 from meerschaum.utils.sql import ( 3202 sql_item_name, 3203 SINGLE_ALTER_TABLE_FLAVORS, 3204 get_table_cols_types, 3205 ) 3206 from meerschaum.utils.dtypes.sql import ( 3207 get_pd_type_from_db_type, 3208 get_db_type_from_pd_type, 3209 ) 3210 from meerschaum.utils.misc import flatten_list 3211 table_obj = self.get_pipe_table(pipe, debug=debug) 3212 is_dask = 'dask' in df.__module__ if not isinstance(df, dict) else False 3213 if is_dask: 3214 df = df.partitions[0].compute() 3215 df_cols_types = ( 3216 { 3217 col: str(typ) 3218 for col, typ in df.dtypes.items() 3219 } 3220 if not isinstance(df, dict) 3221 else copy.deepcopy(df) 3222 ) 3223 if not isinstance(df, dict) and len(df.index) > 0: 3224 for col, typ in list(df_cols_types.items()): 3225 if typ != 'object': 3226 continue 3227 val = df.iloc[0][col] 3228 if isinstance(val, (dict, list)): 3229 df_cols_types[col] = 'json' 3230 elif isinstance(val, Decimal): 3231 df_cols_types[col] = 'numeric' 3232 elif isinstance(val, str): 3233 df_cols_types[col] = 'str' 3234 db_cols_types = { 3235 col: get_pd_type_from_db_type(str(typ.type)) 3236 for col, typ in table_obj.columns.items() 3237 } if table_obj is not None else { 3238 col: get_pd_type_from_db_type(typ) 3239 for col, typ in get_table_cols_types( 3240 pipe.target, 3241 self, 3242 schema=self.get_pipe_schema(pipe), 3243 debug=debug, 3244 ).items() 3245 } 3246 new_cols = set(df_cols_types) - set(db_cols_types) 3247 if not new_cols: 3248 return [] 3249 3250 new_cols_types = { 3251 col: get_db_type_from_pd_type( 3252 df_cols_types[col], 3253 self.flavor 3254 ) 3255 for col in new_cols 3256 if col and df_cols_types.get(col, None) 3257 } 3258 3259 alter_table_query = "ALTER TABLE " + sql_item_name( 3260 pipe.target, self.flavor, self.get_pipe_schema(pipe) 3261 ) 3262 queries = [] 3263 for col, typ in new_cols_types.items(): 3264 add_col_query = ( 3265 "\nADD " 3266 + sql_item_name(col, self.flavor, None) 3267 + " " + typ + "," 3268 ) 3269 3270 if self.flavor in SINGLE_ALTER_TABLE_FLAVORS: 3271 queries.append(alter_table_query + add_col_query[:-1]) 3272 else: 3273 alter_table_query += add_col_query 3274 3275 ### For most flavors, only one query is required. 3276 ### This covers SQLite which requires one query per column. 3277 if not queries: 3278 queries.append(alter_table_query[:-1]) 3279 3280 if self.flavor != 'duckdb': 3281 return queries 3282 3283 ### NOTE: For DuckDB, we must drop and rebuild the indices. 
3284 drop_index_queries = list(flatten_list( 3285 [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()] 3286 )) 3287 create_index_queries = list(flatten_list( 3288 [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()] 3289 )) 3290 3291 return drop_index_queries + queries + create_index_queries
Add new null columns of the correct type to a table from a dataframe.
Parameters
- pipe (mrsm.Pipe): The pipe to be altered.
- df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.
- _is_db_types (bool, default False): If `True`, assume `df` is a dictionary mapping columns to SQL native dtypes.
Returns
- A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
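A sketch of adding a column by passing a dictionary of Pandas dtypes instead of a DataFrame (names are illustrative):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe('sql:local', 'example', instance='sql:local')
    queries = conn.get_add_columns_queries(pipe, {'new_col': 'float64'})
    if queries:
        conn.exec_queries(queries, break_on_error=True)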
3294def get_alter_columns_queries( 3295 self, 3296 pipe: mrsm.Pipe, 3297 df: Union[pd.DataFrame, Dict[str, str]], 3298 debug: bool = False, 3299) -> List[str]: 3300 """ 3301 If we encounter a column of a different type, set the entire column to text. 3302 If the altered columns are numeric, alter to numeric instead. 3303 3304 Parameters 3305 ---------- 3306 pipe: mrsm.Pipe 3307 The pipe to be altered. 3308 3309 df: Union[pd.DataFrame, Dict[str, str]] 3310 The pandas DataFrame which may contain altered columns. 3311 If a dict is provided, assume it maps columns to Pandas data types. 3312 3313 Returns 3314 ------- 3315 A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector. 3316 """ 3317 if not pipe.exists(debug=debug): 3318 return [] 3319 if pipe.static: 3320 return 3321 from meerschaum.utils.sql import ( 3322 sql_item_name, 3323 get_table_cols_types, 3324 DROP_IF_EXISTS_FLAVORS, 3325 SINGLE_ALTER_TABLE_FLAVORS, 3326 ) 3327 from meerschaum.utils.dataframe import get_numeric_cols 3328 from meerschaum.utils.dtypes import are_dtypes_equal 3329 from meerschaum.utils.dtypes.sql import ( 3330 get_pd_type_from_db_type, 3331 get_db_type_from_pd_type, 3332 ) 3333 from meerschaum.utils.misc import flatten_list, generate_password, items_str 3334 table_obj = self.get_pipe_table(pipe, debug=debug) 3335 target = pipe.target 3336 session_id = generate_password(3) 3337 numeric_cols = ( 3338 get_numeric_cols(df) 3339 if not isinstance(df, dict) 3340 else [ 3341 col 3342 for col, typ in df.items() 3343 if typ.startswith('numeric') 3344 ] 3345 ) 3346 df_cols_types = ( 3347 { 3348 col: str(typ) 3349 for col, typ in df.dtypes.items() 3350 } 3351 if not isinstance(df, dict) 3352 else df 3353 ) 3354 db_cols_types = { 3355 col: get_pd_type_from_db_type(str(typ.type)) 3356 for col, typ in table_obj.columns.items() 3357 } if table_obj is not None else { 3358 col: get_pd_type_from_db_type(typ) 3359 for col, typ in get_table_cols_types( 3360 pipe.target, 3361 self, 3362 schema=self.get_pipe_schema(pipe), 3363 debug=debug, 3364 ).items() 3365 } 3366 pipe_bool_cols = [col for col, typ in pipe.dtypes.items() if are_dtypes_equal(str(typ), 'bool')] 3367 pd_db_df_aliases = { 3368 'int': 'bool', 3369 'float': 'bool', 3370 'numeric': 'bool', 3371 'guid': 'object', 3372 } 3373 if self.flavor == 'oracle': 3374 pd_db_df_aliases['int'] = 'numeric' 3375 3376 altered_cols = { 3377 col: (db_cols_types.get(col, 'object'), typ) 3378 for col, typ in df_cols_types.items() 3379 if not are_dtypes_equal(typ, db_cols_types.get(col, 'object').lower()) 3380 and not are_dtypes_equal(db_cols_types.get(col, 'object'), 'string') 3381 } 3382 3383 ### NOTE: Sometimes bools are coerced into ints or floats. 3384 altered_cols_to_ignore = set() 3385 for col, (db_typ, df_typ) in altered_cols.items(): 3386 for db_alias, df_alias in pd_db_df_aliases.items(): 3387 if db_alias in db_typ.lower() and df_alias in df_typ.lower(): 3388 altered_cols_to_ignore.add(col) 3389 3390 ### Oracle's bool handling sometimes mixes NUMBER and INT. 
3391 for bool_col in pipe_bool_cols: 3392 if bool_col not in altered_cols: 3393 continue 3394 db_is_bool_compatible = ( 3395 are_dtypes_equal('int', altered_cols[bool_col][0]) 3396 or are_dtypes_equal('float', altered_cols[bool_col][0]) 3397 or are_dtypes_equal('numeric', altered_cols[bool_col][0]) 3398 or are_dtypes_equal('bool', altered_cols[bool_col][0]) 3399 ) 3400 df_is_bool_compatible = ( 3401 are_dtypes_equal('int', altered_cols[bool_col][1]) 3402 or are_dtypes_equal('float', altered_cols[bool_col][1]) 3403 or are_dtypes_equal('numeric', altered_cols[bool_col][1]) 3404 or are_dtypes_equal('bool', altered_cols[bool_col][1]) 3405 ) 3406 if db_is_bool_compatible and df_is_bool_compatible: 3407 altered_cols_to_ignore.add(bool_col) 3408 3409 for col in altered_cols_to_ignore: 3410 _ = altered_cols.pop(col, None) 3411 if not altered_cols: 3412 return [] 3413 3414 if numeric_cols: 3415 pipe.dtypes.update({col: 'numeric' for col in numeric_cols}) 3416 edit_success, edit_msg = pipe.edit(debug=debug) 3417 if not edit_success: 3418 warn( 3419 f"Failed to update dtypes for numeric columns {items_str(numeric_cols)}:\n" 3420 + f"{edit_msg}" 3421 ) 3422 else: 3423 numeric_cols.extend([col for col, typ in pipe.dtypes.items() if typ.startswith('numeric')]) 3424 3425 numeric_type = get_db_type_from_pd_type('numeric', self.flavor, as_sqlalchemy=False) 3426 text_type = get_db_type_from_pd_type('str', self.flavor, as_sqlalchemy=False) 3427 altered_cols_types = { 3428 col: ( 3429 numeric_type 3430 if col in numeric_cols 3431 else text_type 3432 ) 3433 for col, (db_typ, typ) in altered_cols.items() 3434 } 3435 3436 if self.flavor == 'sqlite': 3437 temp_table_name = '-' + session_id + '_' + target 3438 rename_query = ( 3439 "ALTER TABLE " 3440 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3441 + " RENAME TO " 3442 + sql_item_name(temp_table_name, self.flavor, None) 3443 ) 3444 create_query = ( 3445 "CREATE TABLE " 3446 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3447 + " (\n" 3448 ) 3449 for col_name, col_obj in table_obj.columns.items(): 3450 create_query += ( 3451 sql_item_name(col_name, self.flavor, None) 3452 + " " 3453 + ( 3454 str(col_obj.type) 3455 if col_name not in altered_cols 3456 else altered_cols_types[col_name] 3457 ) 3458 + ",\n" 3459 ) 3460 create_query = create_query[:-2] + "\n)" 3461 3462 insert_query = ( 3463 "INSERT INTO " 3464 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3465 + ' (' 3466 + ', '.join([ 3467 sql_item_name(col_name, self.flavor, None) 3468 for col_name, _ in table_obj.columns.items() 3469 ]) 3470 + ')' 3471 + "\nSELECT\n" 3472 ) 3473 for col_name, col_obj in table_obj.columns.items(): 3474 new_col_str = ( 3475 sql_item_name(col_name, self.flavor, None) 3476 if col_name not in altered_cols 3477 else ( 3478 "CAST(" 3479 + sql_item_name(col_name, self.flavor, None) 3480 + " AS " 3481 + altered_cols_types[col_name] 3482 + ")" 3483 ) 3484 ) 3485 insert_query += new_col_str + ",\n" 3486 insert_query = insert_query[:-2] + ( 3487 f"\nFROM {sql_item_name(temp_table_name, self.flavor, self.get_pipe_schema(pipe))}" 3488 ) 3489 3490 if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else "" 3491 3492 drop_query = f"DROP TABLE {if_exists_str}" + sql_item_name( 3493 temp_table_name, self.flavor, self.get_pipe_schema(pipe) 3494 ) 3495 return [ 3496 rename_query, 3497 create_query, 3498 insert_query, 3499 drop_query, 3500 ] 3501 3502 queries = [] 3503 if self.flavor == 'oracle': 3504 for col, typ in 
altered_cols_types.items(): 3505 add_query = ( 3506 "ALTER TABLE " 3507 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3508 + "\nADD " + sql_item_name(col + '_temp', self.flavor, None) 3509 + " " + typ 3510 ) 3511 queries.append(add_query) 3512 3513 for col, typ in altered_cols_types.items(): 3514 populate_temp_query = ( 3515 "UPDATE " 3516 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3517 + "\nSET " + sql_item_name(col + '_temp', self.flavor, None) 3518 + ' = ' + sql_item_name(col, self.flavor, None) 3519 ) 3520 queries.append(populate_temp_query) 3521 3522 for col, typ in altered_cols_types.items(): 3523 set_old_cols_to_null_query = ( 3524 "UPDATE " 3525 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3526 + "\nSET " + sql_item_name(col, self.flavor, None) 3527 + ' = NULL' 3528 ) 3529 queries.append(set_old_cols_to_null_query) 3530 3531 for col, typ in altered_cols_types.items(): 3532 alter_type_query = ( 3533 "ALTER TABLE " 3534 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3535 + "\nMODIFY " + sql_item_name(col, self.flavor, None) + ' ' 3536 + typ 3537 ) 3538 queries.append(alter_type_query) 3539 3540 for col, typ in altered_cols_types.items(): 3541 set_old_to_temp_query = ( 3542 "UPDATE " 3543 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3544 + "\nSET " + sql_item_name(col, self.flavor, None) 3545 + ' = ' + sql_item_name(col + '_temp', self.flavor, None) 3546 ) 3547 queries.append(set_old_to_temp_query) 3548 3549 for col, typ in altered_cols_types.items(): 3550 drop_temp_query = ( 3551 "ALTER TABLE " 3552 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3553 + "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor, None) 3554 ) 3555 queries.append(drop_temp_query) 3556 3557 return queries 3558 3559 query = "ALTER TABLE " + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3560 for col, typ in altered_cols_types.items(): 3561 alter_col_prefix = ( 3562 'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle') 3563 else 'MODIFY' 3564 ) 3565 type_prefix = ( 3566 '' if self.flavor in ('mssql', 'mariadb', 'mysql') 3567 else 'TYPE ' 3568 ) 3569 column_str = 'COLUMN' if self.flavor != 'oracle' else '' 3570 query_suffix = ( 3571 f"\n{alter_col_prefix} {column_str} " 3572 + sql_item_name(col, self.flavor, None) 3573 + " " + type_prefix + typ + "," 3574 ) 3575 if self.flavor not in SINGLE_ALTER_TABLE_FLAVORS: 3576 query += query_suffix 3577 else: 3578 queries.append(query + query_suffix[:-1]) 3579 3580 if self.flavor not in SINGLE_ALTER_TABLE_FLAVORS: 3581 queries.append(query[:-1]) 3582 3583 if self.flavor != 'duckdb': 3584 return queries 3585 3586 drop_index_queries = list(flatten_list( 3587 [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()] 3588 )) 3589 create_index_queries = list(flatten_list( 3590 [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()] 3591 )) 3592 3593 return drop_index_queries + queries + create_index_queries
If we encounter a column of a different type, set the entire column to text. If the altered columns are numeric, alter to numeric instead.
Parameters
- pipe (mrsm.Pipe): The pipe to be altered.
- df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which may contain altered columns. If a dict is provided, assume it maps columns to Pandas data types.
Returns
- A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector.
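Similarly, a sketch of widening a column whose incoming dtype no longer matches (dict form; names are illustrative):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe('sql:local', 'example', instance='sql:local')
    ### 'object' maps to the flavor's text type unless the column is numeric.
    queries = conn.get_alter_columns_queries(pipe, {'value': 'object'})
    if queries:
        conn.exec_queries(queries, break_on_error=True)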
972def delete_pipe( 973 self, 974 pipe: mrsm.Pipe, 975 debug: bool = False, 976) -> SuccessTuple: 977 """ 978 Delete a Pipe's registration. 979 """ 980 from meerschaum.utils.packages import attempt_import 981 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 982 983 if not pipe.id: 984 return False, f"{pipe} is not registered." 985 986 ### ensure pipes table exists 987 from meerschaum.connectors.sql.tables import get_tables 988 pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] 989 990 q = sqlalchemy.delete(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id) 991 if not self.exec(q, debug=debug): 992 return False, f"Failed to delete registration for {pipe}." 993 994 return True, "Success"
Delete a Pipe's registration.
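For example (this removes only the registration row, not the pipe's target table; pipe keys are illustrative):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:local')
    pipe = mrsm.Pipe('sql:local', 'example', instance='sql:local')
    success, msg = conn.delete_pipe(pipe)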
997def get_pipe_data( 998 self, 999 pipe: mrsm.Pipe, 1000 select_columns: Optional[List[str]] = None, 1001 omit_columns: Optional[List[str]] = None, 1002 begin: Union[datetime, str, None] = None, 1003 end: Union[datetime, str, None] = None, 1004 params: Optional[Dict[str, Any]] = None, 1005 order: str = 'asc', 1006 limit: Optional[int] = None, 1007 begin_add_minutes: int = 0, 1008 end_add_minutes: int = 0, 1009 debug: bool = False, 1010 **kw: Any 1011) -> Union[pd.DataFrame, None]: 1012 """ 1013 Access a pipe's data from the SQL instance. 1014 1015 Parameters 1016 ---------- 1017 pipe: mrsm.Pipe: 1018 The pipe to get data from. 1019 1020 select_columns: Optional[List[str]], default None 1021 If provided, only select these given columns. 1022 Otherwise select all available columns (i.e. `SELECT *`). 1023 1024 omit_columns: Optional[List[str]], default None 1025 If provided, remove these columns from the selection. 1026 1027 begin: Union[datetime, str, None], default None 1028 If provided, get rows newer than or equal to this value. 1029 1030 end: Union[datetime, str, None], default None 1031 If provided, get rows older than or equal to this value. 1032 1033 params: Optional[Dict[str, Any]], default None 1034 Additional parameters to filter by. 1035 See `meerschaum.connectors.sql.build_where`. 1036 1037 order: Optional[str], default 'asc' 1038 The selection order for all of the indices in the query. 1039 If `None`, omit the `ORDER BY` clause. 1040 1041 limit: Optional[int], default None 1042 If specified, limit the number of rows retrieved to this value. 1043 1044 begin_add_minutes: int, default 0 1045 The number of minutes to add to the `begin` datetime (i.e. `DATEADD`. 1046 1047 end_add_minutes: int, default 0 1048 The number of minutes to add to the `end` datetime (i.e. `DATEADD`. 1049 1050 chunksize: Optional[int], default -1 1051 The size of dataframe chunks to load into memory. 1052 1053 debug: bool, default False 1054 Verbosity toggle. 1055 1056 Returns 1057 ------- 1058 A `pd.DataFrame` of the pipe's data. 
1059 1060 """ 1061 import json 1062 from meerschaum.utils.misc import parse_df_datetimes, to_pandas_dtype 1063 from meerschaum.utils.packages import import_pandas 1064 from meerschaum.utils.dtypes import ( 1065 attempt_cast_to_numeric, 1066 attempt_cast_to_uuid, 1067 attempt_cast_to_bytes, 1068 attempt_cast_to_geometry, 1069 are_dtypes_equal, 1070 ) 1071 from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type 1072 pd = import_pandas() 1073 is_dask = 'dask' in pd.__name__ 1074 1075 cols_types = pipe.get_columns_types(debug=debug) if pipe.enforce else {} 1076 dtypes = { 1077 **{ 1078 p_col: to_pandas_dtype(p_typ) 1079 for p_col, p_typ in pipe.dtypes.items() 1080 }, 1081 **{ 1082 col: get_pd_type_from_db_type(typ) 1083 for col, typ in cols_types.items() 1084 } 1085 } if pipe.enforce else {} 1086 if dtypes: 1087 if self.flavor == 'sqlite': 1088 if not pipe.columns.get('datetime', None): 1089 _dt = pipe.guess_datetime() 1090 else: 1091 _dt = pipe.get_columns('datetime') 1092 1093 if _dt: 1094 dt_type = dtypes.get(_dt, 'object').lower() 1095 if 'datetime' not in dt_type: 1096 if 'int' not in dt_type: 1097 dtypes[_dt] = 'datetime64[ns, UTC]' 1098 1099 existing_cols = cols_types.keys() 1100 select_columns = ( 1101 [ 1102 col 1103 for col in existing_cols 1104 if col not in (omit_columns or []) 1105 ] 1106 if not select_columns 1107 else [ 1108 col 1109 for col in select_columns 1110 if col in existing_cols 1111 and col not in (omit_columns or []) 1112 ] 1113 ) if pipe.enforce else select_columns 1114 if select_columns: 1115 dtypes = {col: typ for col, typ in dtypes.items() if col in select_columns} 1116 dtypes = { 1117 col: to_pandas_dtype(typ) 1118 for col, typ in dtypes.items() 1119 if col in select_columns and col not in (omit_columns or []) 1120 } if pipe.enforce else {} 1121 query = self.get_pipe_data_query( 1122 pipe, 1123 select_columns=select_columns, 1124 omit_columns=omit_columns, 1125 begin=begin, 1126 end=end, 1127 params=params, 1128 order=order, 1129 limit=limit, 1130 begin_add_minutes=begin_add_minutes, 1131 end_add_minutes=end_add_minutes, 1132 debug=debug, 1133 **kw 1134 ) 1135 1136 if is_dask: 1137 index_col = pipe.columns.get('datetime', None) 1138 kw['index_col'] = index_col 1139 1140 numeric_columns = [ 1141 col 1142 for col, typ in pipe.dtypes.items() 1143 if typ.startswith('numeric') and col in dtypes 1144 ] 1145 uuid_columns = [ 1146 col 1147 for col, typ in pipe.dtypes.items() 1148 if typ == 'uuid' and col in dtypes 1149 ] 1150 bytes_columns = [ 1151 col 1152 for col, typ in pipe.dtypes.items() 1153 if typ == 'bytes' and col in dtypes 1154 ] 1155 geometry_columns = [ 1156 col 1157 for col, typ in pipe.dtypes.items() 1158 if typ.startswith('geometry') and col in dtypes 1159 ] 1160 1161 kw['coerce_float'] = kw.get('coerce_float', (len(numeric_columns) == 0)) 1162 1163 df = self.read( 1164 query, 1165 dtype=dtypes, 1166 debug=debug, 1167 **kw 1168 ) 1169 for col in numeric_columns: 1170 if col not in df.columns: 1171 continue 1172 df[col] = df[col].apply(attempt_cast_to_numeric) 1173 1174 for col in uuid_columns: 1175 if col not in df.columns: 1176 continue 1177 df[col] = df[col].apply(attempt_cast_to_uuid) 1178 1179 for col in bytes_columns: 1180 if col not in df.columns: 1181 continue 1182 df[col] = df[col].apply(attempt_cast_to_bytes) 1183 1184 for col in geometry_columns: 1185 if col not in df.columns: 1186 continue 1187 df[col] = df[col].apply(attempt_cast_to_geometry) 1188 1189 if self.flavor == 'sqlite': 1190 ignore_dt_cols = [ 1191 col 1192 for col, dtype in 
pipe.dtypes.items() 1193 if not are_dtypes_equal(str(dtype), 'datetime') 1194 ] 1195 ### NOTE: We have to consume the iterator here to ensure that datetimes are parsed correctly 1196 df = ( 1197 parse_df_datetimes( 1198 df, 1199 ignore_cols=ignore_dt_cols, 1200 chunksize=kw.get('chunksize', None), 1201 strip_timezone=(pipe.tzinfo is None), 1202 debug=debug, 1203 ) if isinstance(df, pd.DataFrame) else ( 1204 [ 1205 parse_df_datetimes( 1206 c, 1207 ignore_cols=ignore_dt_cols, 1208 chunksize=kw.get('chunksize', None), 1209 strip_timezone=(pipe.tzinfo is None), 1210 debug=debug, 1211 ) 1212 for c in df 1213 ] 1214 ) 1215 ) 1216 for col, typ in dtypes.items(): 1217 if typ != 'json': 1218 continue 1219 df[col] = df[col].apply(lambda x: json.loads(x) if x is not None else x) 1220 return df
Access a pipe's data from the SQL instance.
Parameters
- pipe (mrsm.Pipe): The pipe to get data from.
- select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. `SELECT *`).
- omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, str, None], default None): If provided, get rows newer than or equal to this value.
- end (Union[datetime, str, None], default None): If provided, get rows older than or equal to this value.
- params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See `meerschaum.connectors.sql.build_where`.
- order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If `None`, omit the `ORDER BY` clause.
- limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
- begin_add_minutes (int, default 0): The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
- end_add_minutes (int, default 0): The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
- chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory.
- debug (bool, default False): Verbosity toggle.
Returns
- A `pd.DataFrame` of the pipe's data.
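For orientation, here is a minimal usage sketch. It assumes a configured `sql:main` instance; the pipe keys (`plugin:noaa`, `weather`) and column names are illustrative placeholders, not part of the API.

import meerschaum as mrsm

conn = mrsm.get_connector('sql', 'main')
pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')

### Select two columns within a datetime window, newest rows first.
df = conn.get_pipe_data(
    pipe,
    select_columns=['timestamp', 'temperature'],
    begin='2024-01-01',
    end='2024-02-01',
    order='desc',
    limit=100,
)
if df is not None:
    print(df.head())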
1223def get_pipe_data_query( 1224 self, 1225 pipe: mrsm.Pipe, 1226 select_columns: Optional[List[str]] = None, 1227 omit_columns: Optional[List[str]] = None, 1228 begin: Union[datetime, int, str, None] = None, 1229 end: Union[datetime, int, str, None] = None, 1230 params: Optional[Dict[str, Any]] = None, 1231 order: Optional[str] = 'asc', 1232 sort_datetimes: bool = False, 1233 limit: Optional[int] = None, 1234 begin_add_minutes: int = 0, 1235 end_add_minutes: int = 0, 1236 replace_nulls: Optional[str] = None, 1237 skip_existing_cols_check: bool = False, 1238 debug: bool = False, 1239 **kw: Any 1240) -> Union[str, None]: 1241 """ 1242 Return the `SELECT` query for retrieving a pipe's data from its instance. 1243 1244 Parameters 1245 ---------- 1246 pipe: mrsm.Pipe: 1247 The pipe to get data from. 1248 1249 select_columns: Optional[List[str]], default None 1250 If provided, only select these given columns. 1251 Otherwise select all available columns (i.e. `SELECT *`). 1252 1253 omit_columns: Optional[List[str]], default None 1254 If provided, remove these columns from the selection. 1255 1256 begin: Union[datetime, int, str, None], default None 1257 If provided, get rows newer than or equal to this value. 1258 1259 end: Union[datetime, str, None], default None 1260 If provided, get rows older than or equal to this value. 1261 1262 params: Optional[Dict[str, Any]], default None 1263 Additional parameters to filter by. 1264 See `meerschaum.connectors.sql.build_where`. 1265 1266 order: Optional[str], default None 1267 The selection order for all of the indices in the query. 1268 If `None`, omit the `ORDER BY` clause. 1269 1270 sort_datetimes: bool, default False 1271 Alias for `order='desc'`. 1272 1273 limit: Optional[int], default None 1274 If specified, limit the number of rows retrieved to this value. 1275 1276 begin_add_minutes: int, default 0 1277 The number of minutes to add to the `begin` datetime (i.e. `DATEADD`). 1278 1279 end_add_minutes: int, default 0 1280 The number of minutes to add to the `end` datetime (i.e. `DATEADD`). 1281 1282 chunksize: Optional[int], default -1 1283 The size of dataframe chunks to load into memory. 1284 1285 replace_nulls: Optional[str], default None 1286 If provided, replace null values with this value. 1287 1288 skip_existing_cols_check: bool, default False 1289 If `True`, do not verify that querying columns are actually on the table. 1290 1291 debug: bool, default False 1292 Verbosity toggle. 1293 1294 Returns 1295 ------- 1296 A `SELECT` query to retrieve a pipe's data. 
1297 """ 1298 from meerschaum.utils.misc import items_str 1299 from meerschaum.utils.sql import sql_item_name, dateadd_str 1300 from meerschaum.utils.dtypes import coerce_timezone 1301 from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type, get_db_type_from_pd_type 1302 1303 dt_col = pipe.columns.get('datetime', None) 1304 existing_cols = pipe.get_columns_types(debug=debug) if pipe.enforce else [] 1305 skip_existing_cols_check = skip_existing_cols_check or not pipe.enforce 1306 dt_typ = get_pd_type_from_db_type(existing_cols[dt_col]) if dt_col in existing_cols else None 1307 dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None 1308 select_columns = ( 1309 [col for col in existing_cols] 1310 if not select_columns 1311 else [col for col in select_columns if skip_existing_cols_check or col in existing_cols] 1312 ) 1313 if omit_columns: 1314 select_columns = [col for col in select_columns if col not in omit_columns] 1315 1316 if order is None and sort_datetimes: 1317 order = 'desc' 1318 1319 if begin == '': 1320 begin = pipe.get_sync_time(debug=debug) 1321 backtrack_interval = pipe.get_backtrack_interval(debug=debug) 1322 if begin is not None: 1323 begin -= backtrack_interval 1324 1325 begin, end = pipe.parse_date_bounds(begin, end) 1326 if isinstance(begin, datetime) and dt_typ: 1327 begin = coerce_timezone(begin, strip_utc=('utc' not in dt_typ.lower())) 1328 if isinstance(end, datetime) and dt_typ: 1329 end = coerce_timezone(end, strip_utc=('utc' not in dt_typ.lower())) 1330 1331 cols_names = [ 1332 sql_item_name(col, self.flavor, None) 1333 for col in select_columns 1334 ] 1335 select_cols_str = ( 1336 'SELECT\n ' 1337 + ',\n '.join( 1338 [ 1339 ( 1340 col_name 1341 if not replace_nulls 1342 else f"COALESCE(col_name, '{replace_nulls}') AS {col_name}" 1343 ) 1344 for col_name in cols_names 1345 ] 1346 ) 1347 ) if cols_names else 'SELECT *' 1348 pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 1349 query = f"{select_cols_str}\nFROM {pipe_table_name}" 1350 where = "" 1351 1352 if order is not None: 1353 default_order = 'asc' 1354 if order not in ('asc', 'desc'): 1355 warn(f"Ignoring unsupported order '{order}'. Falling back to '{default_order}'.") 1356 order = default_order 1357 order = order.upper() 1358 1359 if not pipe.columns.get('datetime', None): 1360 _dt = pipe.guess_datetime() 1361 dt = sql_item_name(_dt, self.flavor, None) if _dt else None 1362 is_guess = True 1363 else: 1364 _dt = pipe.get_columns('datetime') 1365 dt = sql_item_name(_dt, self.flavor, None) 1366 is_guess = False 1367 1368 quoted_indices = { 1369 key: sql_item_name(val, self.flavor, None) 1370 for key, val in pipe.columns.items() 1371 if val in existing_cols or skip_existing_cols_check 1372 } 1373 1374 if begin is not None or end is not None: 1375 if is_guess: 1376 if _dt is None: 1377 warn( 1378 f"No datetime could be determined for {pipe}." 
1379 + "\n Ignoring begin and end...", 1380 stack=False, 1381 ) 1382 begin, end = None, None 1383 else: 1384 warn( 1385 f"A datetime wasn't specified for {pipe}.\n" 1386 + f" Using column \"{_dt}\" for datetime bounds...", 1387 stack=False, 1388 ) 1389 1390 is_dt_bound = False 1391 if begin is not None and (_dt in existing_cols or skip_existing_cols_check): 1392 begin_da = dateadd_str( 1393 flavor=self.flavor, 1394 datepart='minute', 1395 number=begin_add_minutes, 1396 begin=begin, 1397 db_type=dt_db_type, 1398 ) 1399 where += f"\n {dt} >= {begin_da}" + ("\n AND\n " if end is not None else "") 1400 is_dt_bound = True 1401 1402 if end is not None and (_dt in existing_cols or skip_existing_cols_check): 1403 if 'int' in str(type(end)).lower() and end == begin: 1404 end += 1 1405 end_da = dateadd_str( 1406 flavor=self.flavor, 1407 datepart='minute', 1408 number=end_add_minutes, 1409 begin=end, 1410 db_type=dt_db_type, 1411 ) 1412 where += f"{dt} < {end_da}" 1413 is_dt_bound = True 1414 1415 if params is not None: 1416 from meerschaum.utils.sql import build_where 1417 valid_params = { 1418 k: v 1419 for k, v in params.items() 1420 if k in existing_cols or skip_existing_cols_check 1421 } 1422 if valid_params: 1423 where += build_where(valid_params, self).replace( 1424 'WHERE', (' AND' if is_dt_bound else " ") 1425 ) 1426 1427 if len(where) > 0: 1428 query += "\nWHERE " + where 1429 1430 if order is not None: 1431 ### Sort by indices, starting with datetime. 1432 order_by = "" 1433 if quoted_indices: 1434 order_by += "\nORDER BY " 1435 if _dt and (_dt in existing_cols or skip_existing_cols_check): 1436 order_by += dt + ' ' + order + ',' 1437 for key, quoted_col_name in quoted_indices.items(): 1438 if dt == quoted_col_name: 1439 continue 1440 order_by += ' ' + quoted_col_name + ' ' + order + ',' 1441 order_by = order_by[:-1] 1442 1443 query += order_by 1444 1445 if isinstance(limit, int): 1446 if self.flavor == 'mssql': 1447 query = f'SELECT TOP {limit}\n' + query[len("SELECT "):] 1448 elif self.flavor == 'oracle': 1449 query = ( 1450 f"SELECT * FROM (\n {query}\n)\n" 1451 + f"WHERE ROWNUM IN ({', '.join([str(i) for i in range(1, limit+1)])})" 1452 ) 1453 else: 1454 query += f"\nLIMIT {limit}" 1455 1456 if debug: 1457 to_print = ( 1458 [] 1459 + ([f"begin='{begin}'"] if begin else []) 1460 + ([f"end='{end}'"] if end else []) 1461 + ([f"params={params}"] if params else []) 1462 ) 1463 dprint("Getting pipe data with constraints: " + items_str(to_print, quotes=False)) 1464 1465 return query
Return the `SELECT` query for retrieving a pipe's data from its instance.
Parameters
- pipe (mrsm.Pipe): The pipe to get data from.
- select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. `SELECT *`).
- omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, int, str, None], default None): If provided, get rows newer than or equal to this value.
- end (Union[datetime, int, str, None], default None): If provided, get rows older than or equal to this value.
- params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See `meerschaum.connectors.sql.build_where`.
- order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If `None`, omit the `ORDER BY` clause.
- sort_datetimes (bool, default False): Alias for `order='desc'`.
- limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
- begin_add_minutes (int, default 0): The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
- end_add_minutes (int, default 0): The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
- chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory.
- replace_nulls (Optional[str], default None): If provided, replace null values with this value.
- skip_existing_cols_check (bool, default False): If `True`, do not verify that querying columns are actually on the table.
- debug (bool, default False): Verbosity toggle.
Returns
- A `SELECT` query to retrieve a pipe's data.
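Because this method only builds SQL (it does not execute anything), the query can be inspected directly. A sketch, reusing the hypothetical `conn` and `pipe` from above; the exact output varies by flavor:

query = conn.get_pipe_data_query(
    pipe,
    begin='2024-01-01',
    end='2024-02-01',
    order='asc',
    limit=10,
)
print(query)
### On PostgreSQL this prints roughly a `SELECT ... FROM "weather"` statement
### with datetime bounds in the WHERE clause, an ORDER BY clause, and LIMIT 10.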
20def register_pipe( 21 self, 22 pipe: mrsm.Pipe, 23 debug: bool = False, 24) -> SuccessTuple: 25 """ 26 Register a new pipe. 27 A pipe's attributes must be set before registering. 28 """ 29 from meerschaum.utils.debug import dprint 30 from meerschaum.utils.packages import attempt_import 31 from meerschaum.utils.sql import json_flavors 32 33 ### ensure pipes table exists 34 from meerschaum.connectors.sql.tables import get_tables 35 pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] 36 37 if pipe.get_id(debug=debug) is not None: 38 return False, f"{pipe} is already registered." 39 40 ### NOTE: if `parameters` is supplied in the Pipe constructor, 41 ### then `pipe.parameters` will exist and not be fetched from the database. 42 43 ### 1. Prioritize the Pipe object's `parameters` first. 44 ### E.g. if the user manually sets the `parameters` property 45 ### or if the Pipe already exists 46 ### (which shouldn't be able to be registered anyway but that's an issue for later). 47 parameters = None 48 try: 49 parameters = pipe.parameters 50 except Exception as e: 51 if debug: 52 dprint(str(e)) 53 parameters = None 54 55 ### ensure `parameters` is a dictionary 56 if parameters is None: 57 parameters = {} 58 59 import json 60 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 61 values = { 62 'connector_keys' : pipe.connector_keys, 63 'metric_key' : pipe.metric_key, 64 'location_key' : pipe.location_key, 65 'parameters' : ( 66 json.dumps(parameters) 67 if self.flavor not in json_flavors 68 else parameters 69 ), 70 } 71 query = sqlalchemy.insert(pipes_tbl).values(**values) 72 result = self.exec(query, debug=debug) 73 if result is None: 74 return False, f"Failed to register {pipe}." 75 return True, f"Successfully registered {pipe}."
Register a new pipe. A pipe's attributes must be set before registering.
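A registration sketch, assuming the hypothetical `conn` from above; the keys and parameters are illustrative:

pipe = mrsm.Pipe(
    'sql:main', 'sales', 'store_1',
    instance='sql:main',
    parameters={'columns': {'datetime': 'order_time'}},
)
success, msg = conn.register_pipe(pipe)
print(success, msg)
### A second call returns (False, "... is already registered.").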
78def edit_pipe( 79 self, 80 pipe : mrsm.Pipe = None, 81 patch: bool = False, 82 debug: bool = False, 83 **kw : Any 84) -> SuccessTuple: 85 """ 86 Persist a Pipe's parameters to its database. 87 88 Parameters 89 ---------- 90 pipe: mrsm.Pipe, default None 91 The pipe to be edited. 92 patch: bool, default False 93 If patch is `True`, update the existing parameters by cascading. 94 Otherwise overwrite the parameters (default). 95 debug: bool, default False 96 Verbosity toggle. 97 """ 98 99 if pipe.id is None: 100 return False, f"{pipe} is not registered and cannot be edited." 101 102 from meerschaum.utils.packages import attempt_import 103 from meerschaum.utils.sql import json_flavors 104 if not patch: 105 parameters = pipe.__dict__.get('_attributes', {}).get('parameters', {}) 106 else: 107 from meerschaum import Pipe 108 from meerschaum.config._patch import apply_patch_to_config 109 original_parameters = Pipe( 110 pipe.connector_keys, pipe.metric_key, pipe.location_key, 111 mrsm_instance=pipe.instance_keys 112 ).parameters 113 parameters = apply_patch_to_config( 114 original_parameters, 115 pipe.parameters 116 ) 117 118 ### ensure pipes table exists 119 from meerschaum.connectors.sql.tables import get_tables 120 pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] 121 122 import json 123 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 124 125 values = { 126 'parameters': ( 127 json.dumps(parameters) 128 if self.flavor not in json_flavors 129 else parameters 130 ), 131 } 132 q = sqlalchemy.update(pipes_tbl).values(**values).where( 133 pipes_tbl.c.pipe_id == pipe.id 134 ) 135 136 result = self.exec(q, debug=debug) 137 message = ( 138 f"Successfully edited {pipe}." 139 if result is not None else f"Failed to edit {pipe}." 140 ) 141 return (result is not None), message
Persist a Pipe's parameters to its database.
Parameters
- pipe (mrsm.Pipe, default None): The pipe to be edited.
- patch (bool, default False): If `patch` is `True`, update the existing parameters by cascading. Otherwise overwrite the parameters (default).
- debug (bool, default False): Verbosity toggle.
1468def get_pipe_id( 1469 self, 1470 pipe: mrsm.Pipe, 1471 debug: bool = False, 1472) -> Any: 1473 """ 1474 Get a Pipe's ID from the pipes table. 1475 """ 1476 if pipe.temporary: 1477 return None 1478 from meerschaum.utils.packages import attempt_import 1479 sqlalchemy = attempt_import('sqlalchemy') 1480 from meerschaum.connectors.sql.tables import get_tables 1481 pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] 1482 1483 query = sqlalchemy.select(pipes_tbl.c.pipe_id).where( 1484 pipes_tbl.c.connector_keys == pipe.connector_keys 1485 ).where( 1486 pipes_tbl.c.metric_key == pipe.metric_key 1487 ).where( 1488 (pipes_tbl.c.location_key == pipe.location_key) if pipe.location_key is not None 1489 else pipes_tbl.c.location_key.is_(None) 1490 ) 1491 _id = self.value(query, debug=debug, silent=pipe.temporary) 1492 if _id is not None: 1493 _id = int(_id) 1494 return _id
Get a Pipe's ID from the pipes table.
1497def get_pipe_attributes( 1498 self, 1499 pipe: mrsm.Pipe, 1500 debug: bool = False, 1501) -> Dict[str, Any]: 1502 """ 1503 Get a Pipe's attributes dictionary. 1504 """ 1505 from meerschaum.connectors.sql.tables import get_tables 1506 from meerschaum.utils.packages import attempt_import 1507 sqlalchemy = attempt_import('sqlalchemy') 1508 1509 if pipe.get_id(debug=debug) is None: 1510 return {} 1511 1512 pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] 1513 1514 try: 1515 q = sqlalchemy.select(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id) 1516 if debug: 1517 dprint(q) 1518 attributes = ( 1519 dict(self.exec(q, silent=True, debug=debug).first()._mapping) 1520 if self.flavor != 'duckdb' 1521 else self.read(q, debug=debug).to_dict(orient='records')[0] 1522 ) 1523 except Exception as e: 1524 import traceback 1525 traceback.print_exc() 1526 warn(e) 1527 print(pipe) 1528 return {} 1529 1530 ### handle non-PostgreSQL databases (text vs JSON) 1531 if not isinstance(attributes.get('parameters', None), dict): 1532 try: 1533 import json 1534 parameters = json.loads(attributes['parameters']) 1535 if isinstance(parameters, str) and parameters[0] == '{': 1536 parameters = json.loads(parameters) 1537 attributes['parameters'] = parameters 1538 except Exception: 1539 attributes['parameters'] = {} 1540 1541 return attributes
Get a Pipe's attributes dictionary.
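These two lookups are often paired; a sketch with the same hypothetical pipe:

pipe_id = conn.get_pipe_id(pipe)        ### None for temporary pipes
attrs = conn.get_pipe_attributes(pipe)  ### {} if the pipe is not registered
print(pipe_id, attrs.get('parameters', {}))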
1648def sync_pipe( 1649 self, 1650 pipe: mrsm.Pipe, 1651 df: Union[pd.DataFrame, str, Dict[Any, Any], None] = None, 1652 begin: Optional[datetime] = None, 1653 end: Optional[datetime] = None, 1654 chunksize: Optional[int] = -1, 1655 check_existing: bool = True, 1656 blocking: bool = True, 1657 debug: bool = False, 1658 _check_temporary_tables: bool = True, 1659 **kw: Any 1660) -> SuccessTuple: 1661 """ 1662 Sync a pipe using a database connection. 1663 1664 Parameters 1665 ---------- 1666 pipe: mrsm.Pipe 1667 The Meerschaum Pipe instance into which to sync the data. 1668 1669 df: Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]] 1670 An optional DataFrame or equivalent to sync into the pipe. 1671 Defaults to `None`. 1672 1673 begin: Optional[datetime], default None 1674 Optionally specify the earliest datetime to search for data. 1675 Defaults to `None`. 1676 1677 end: Optional[datetime], default None 1678 Optionally specify the latest datetime to search for data. 1679 Defaults to `None`. 1680 1681 chunksize: Optional[int], default -1 1682 Specify the number of rows to sync per chunk. 1683 If `-1`, resort to system configuration (default is `900`). 1684 A `chunksize` of `None` will sync all rows in one transaction. 1685 Defaults to `-1`. 1686 1687 check_existing: bool, default True 1688 If `True`, pull and diff with existing data from the pipe. Defaults to `True`. 1689 1690 blocking: bool, default True 1691 If `True`, wait for sync to finish and return its result, otherwise asyncronously sync. 1692 Defaults to `True`. 1693 1694 debug: bool, default False 1695 Verbosity toggle. Defaults to False. 1696 1697 kw: Any 1698 Catch-all for keyword arguments. 1699 1700 Returns 1701 ------- 1702 A `SuccessTuple` of success (`bool`) and message (`str`). 1703 """ 1704 from meerschaum.utils.packages import import_pandas 1705 from meerschaum.utils.sql import ( 1706 get_update_queries, 1707 sql_item_name, 1708 UPDATE_QUERIES, 1709 get_reset_autoincrement_queries, 1710 ) 1711 from meerschaum.utils.dtypes import are_dtypes_equal 1712 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 1713 from meerschaum import Pipe 1714 import time 1715 import copy 1716 pd = import_pandas() 1717 if df is None: 1718 msg = f"DataFrame is None. Cannot sync {pipe}." 1719 warn(msg) 1720 return False, msg 1721 1722 start = time.perf_counter() 1723 pipe_name = sql_item_name(pipe.target, self.flavor, schema=self.get_pipe_schema(pipe)) 1724 1725 if not pipe.temporary and not pipe.get_id(debug=debug): 1726 register_tuple = pipe.register(debug=debug) 1727 if not register_tuple[0]: 1728 return register_tuple 1729 1730 ### df is the dataframe returned from the remote source 1731 ### via the connector 1732 if debug: 1733 dprint("Fetched data:\n" + str(df)) 1734 1735 if not isinstance(df, pd.DataFrame): 1736 df = pipe.enforce_dtypes( 1737 df, 1738 chunksize=chunksize, 1739 safe_copy=kw.get('safe_copy', False), 1740 debug=debug, 1741 ) 1742 1743 ### if table does not exist, create it with indices 1744 is_new = False 1745 if not pipe.exists(debug=debug): 1746 check_existing = False 1747 is_new = True 1748 else: 1749 ### Check for new columns. 
1750 add_cols_queries = self.get_add_columns_queries(pipe, df, debug=debug) 1751 if add_cols_queries: 1752 _ = pipe.__dict__.pop('_columns_indices', None) 1753 _ = pipe.__dict__.pop('_columns_types', None) 1754 if not self.exec_queries(add_cols_queries, debug=debug): 1755 warn(f"Failed to add new columns to {pipe}.") 1756 1757 alter_cols_queries = self.get_alter_columns_queries(pipe, df, debug=debug) 1758 if alter_cols_queries: 1759 _ = pipe.__dict__.pop('_columns_indices', None) 1760 _ = pipe.__dict__.pop('_columns_types', None) 1761 if not self.exec_queries(alter_cols_queries, debug=debug): 1762 warn(f"Failed to alter columns for {pipe}.") 1763 else: 1764 _ = pipe.infer_dtypes(persist=True) 1765 1766 ### NOTE: Oracle SQL < 23c (2023) and SQLite does not support booleans, 1767 ### so infer bools and persist them to `dtypes`. 1768 if self.flavor in ('oracle', 'sqlite', 'mysql', 'mariadb'): 1769 pipe_dtypes = pipe.dtypes 1770 new_bool_cols = { 1771 col: 'bool[pyarrow]' 1772 for col, typ in df.dtypes.items() 1773 if col not in pipe_dtypes 1774 and are_dtypes_equal(str(typ), 'bool') 1775 } 1776 pipe_dtypes.update(new_bool_cols) 1777 pipe.dtypes = pipe_dtypes 1778 if new_bool_cols and not pipe.temporary: 1779 infer_bool_success, infer_bool_msg = pipe.edit(debug=debug) 1780 if not infer_bool_success: 1781 return infer_bool_success, infer_bool_msg 1782 1783 upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in UPDATE_QUERIES 1784 if upsert: 1785 check_existing = False 1786 kw['safe_copy'] = kw.get('safe_copy', False) 1787 1788 unseen_df, update_df, delta_df = ( 1789 pipe.filter_existing( 1790 df, 1791 chunksize=chunksize, 1792 debug=debug, 1793 **kw 1794 ) if check_existing else (df, None, df) 1795 ) 1796 if upsert: 1797 unseen_df, update_df, delta_df = (df.head(0), df, df) 1798 1799 if debug: 1800 dprint("Delta data:\n" + str(delta_df)) 1801 dprint("Unseen data:\n" + str(unseen_df)) 1802 if update_df is not None: 1803 dprint(("Update" if not upsert else "Upsert") + " data:\n" + str(update_df)) 1804 1805 if_exists = kw.get('if_exists', 'append') 1806 if 'if_exists' in kw: 1807 kw.pop('if_exists') 1808 if 'name' in kw: 1809 kw.pop('name') 1810 1811 ### Insert new data into Pipe's table. 
1812 unseen_kw = copy.deepcopy(kw) 1813 unseen_kw.update({ 1814 'name': pipe.target, 1815 'if_exists': if_exists, 1816 'debug': debug, 1817 'as_dict': True, 1818 'safe_copy': kw.get('safe_copy', False), 1819 'chunksize': chunksize, 1820 'dtype': self.get_to_sql_dtype(pipe, unseen_df, update_dtypes=True), 1821 'schema': self.get_pipe_schema(pipe), 1822 }) 1823 1824 dt_col = pipe.columns.get('datetime', None) 1825 primary_key = pipe.columns.get('primary', None) 1826 autoincrement = ( 1827 pipe.parameters.get('autoincrement', False) 1828 or ( 1829 is_new 1830 and primary_key 1831 and primary_key 1832 not in pipe.dtypes 1833 and primary_key not in unseen_df.columns 1834 ) 1835 ) 1836 if autoincrement and autoincrement not in pipe.parameters: 1837 pipe.parameters['autoincrement'] = autoincrement 1838 edit_success, edit_msg = pipe.edit(debug=debug) 1839 if not edit_success: 1840 return edit_success, edit_msg 1841 1842 def _check_pk(_df_to_clear): 1843 if _df_to_clear is None: 1844 return 1845 if primary_key not in _df_to_clear.columns: 1846 return 1847 if not _df_to_clear[primary_key].notnull().any(): 1848 del _df_to_clear[primary_key] 1849 1850 autoincrement_needs_reset = bool( 1851 autoincrement 1852 and primary_key 1853 and primary_key in unseen_df.columns 1854 and unseen_df[primary_key].notnull().any() 1855 ) 1856 if autoincrement and primary_key: 1857 for _df_to_clear in (unseen_df, update_df, delta_df): 1858 _check_pk(_df_to_clear) 1859 1860 if is_new: 1861 create_success, create_msg = self.create_pipe_table_from_df( 1862 pipe, 1863 unseen_df, 1864 debug=debug, 1865 ) 1866 if not create_success: 1867 return create_success, create_msg 1868 1869 do_identity_insert = bool( 1870 self.flavor in ('mssql',) 1871 and primary_key 1872 and primary_key in unseen_df.columns 1873 and autoincrement 1874 ) 1875 stats = {'success': True, 'msg': ''} 1876 if len(unseen_df) > 0: 1877 with self.engine.connect() as connection: 1878 with connection.begin(): 1879 if do_identity_insert: 1880 identity_on_result = self.exec( 1881 f"SET IDENTITY_INSERT {pipe_name} ON", 1882 commit=False, 1883 _connection=connection, 1884 close=False, 1885 debug=debug, 1886 ) 1887 if identity_on_result is None: 1888 return False, f"Could not enable identity inserts on {pipe}." 1889 1890 stats = self.to_sql( 1891 unseen_df, 1892 _connection=connection, 1893 **unseen_kw 1894 ) 1895 1896 if do_identity_insert: 1897 identity_off_result = self.exec( 1898 f"SET IDENTITY_INSERT {pipe_name} OFF", 1899 commit=False, 1900 _connection=connection, 1901 close=False, 1902 debug=debug, 1903 ) 1904 if identity_off_result is None: 1905 return False, f"Could not disable identity inserts on {pipe}." 1906 1907 if is_new: 1908 if not self.create_indices(pipe, debug=debug): 1909 warn(f"Failed to create indices for {pipe}. 
Continuing...") 1910 1911 if autoincrement_needs_reset: 1912 reset_autoincrement_queries = get_reset_autoincrement_queries( 1913 pipe.target, 1914 primary_key, 1915 self, 1916 schema=self.get_pipe_schema(pipe), 1917 debug=debug, 1918 ) 1919 results = self.exec_queries(reset_autoincrement_queries, debug=debug) 1920 for result in results: 1921 if result is None: 1922 warn(f"Could not reset auto-incrementing primary key for {pipe}.", stack=False) 1923 1924 if update_df is not None and len(update_df) > 0: 1925 temp_target = self.get_temporary_target( 1926 pipe.target, 1927 label=('update' if not upsert else 'upsert'), 1928 ) 1929 self._log_temporary_tables_creation(temp_target, create=(not pipe.temporary), debug=debug) 1930 temp_pipe = Pipe( 1931 pipe.connector_keys.replace(':', '_') + '_', pipe.metric_key, pipe.location_key, 1932 instance=pipe.instance_keys, 1933 columns={ 1934 (ix_key if ix_key != 'primary' else 'primary_'): ix 1935 for ix_key, ix in pipe.columns.items() 1936 if ix and ix in update_df.columns 1937 }, 1938 dtypes={ 1939 col: typ 1940 for col, typ in pipe.dtypes.items() 1941 if col in update_df.columns 1942 }, 1943 target=temp_target, 1944 temporary=True, 1945 enforce=False, 1946 static=True, 1947 autoincrement=False, 1948 parameters={ 1949 'schema': self.internal_schema, 1950 'hypertable': False, 1951 }, 1952 ) 1953 temp_pipe.__dict__['_columns_types'] = { 1954 col: get_db_type_from_pd_type( 1955 pipe.dtypes.get(col, str(typ)), 1956 self.flavor, 1957 ) 1958 for col, typ in update_df.dtypes.items() 1959 } 1960 now_ts = time.perf_counter() 1961 temp_pipe.__dict__['_columns_types_timestamp'] = now_ts 1962 temp_pipe.__dict__['_skip_check_indices'] = True 1963 temp_success, temp_msg = temp_pipe.sync(update_df, check_existing=False, debug=debug) 1964 if not temp_success: 1965 return temp_success, temp_msg 1966 existing_cols = pipe.get_columns_types(debug=debug) 1967 join_cols = [ 1968 col 1969 for col_key, col in pipe.columns.items() 1970 if col and col in existing_cols 1971 ] if not primary_key or self.flavor == 'oracle' else ( 1972 [dt_col, primary_key] 1973 if self.flavor == 'timescaledb' and dt_col and dt_col in update_df.columns 1974 else [primary_key] 1975 ) 1976 update_queries = get_update_queries( 1977 pipe.target, 1978 temp_target, 1979 self, 1980 join_cols, 1981 upsert=upsert, 1982 schema=self.get_pipe_schema(pipe), 1983 patch_schema=self.internal_schema, 1984 datetime_col=(dt_col if dt_col in update_df.columns else None), 1985 identity_insert=(autoincrement and primary_key in update_df.columns), 1986 null_indices=pipe.null_indices, 1987 cast_columns=pipe.enforce, 1988 debug=debug, 1989 ) 1990 update_results = self.exec_queries( 1991 update_queries, 1992 break_on_error=True, 1993 rollback=True, 1994 debug=debug, 1995 ) 1996 update_success = all(update_results) 1997 self._log_temporary_tables_creation( 1998 temp_target, 1999 ready_to_drop=True, 2000 create=(not pipe.temporary), 2001 debug=debug, 2002 ) 2003 if not update_success: 2004 warn(f"Failed to apply update to {pipe}.") 2005 stats['success'] = stats['success'] and update_success 2006 stats['msg'] = ( 2007 (stats.get('msg', '') + f'\nFailed to apply update to {pipe}.').lstrip() 2008 if not update_success 2009 else stats.get('msg', '') 2010 ) 2011 2012 stop = time.perf_counter() 2013 success = stats['success'] 2014 if not success: 2015 return success, stats['msg'] or str(stats) 2016 2017 unseen_count = len(unseen_df.index) if unseen_df is not None else 0 2018 update_count = len(update_df.index) if update_df is not 
None else 0 2019 msg = ( 2020 ( 2021 f"Inserted {unseen_count:,}, " 2022 + f"updated {update_count:,} rows." 2023 ) 2024 if not upsert 2025 else ( 2026 f"Upserted {update_count:,} row" 2027 + ('s' if update_count != 1 else '') 2028 + "." 2029 ) 2030 ) 2031 if debug: 2032 msg = msg[:-1] + ( 2033 f"\non table {sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))}\n" 2034 + f"in {round(stop - start, 2)} seconds." 2035 ) 2036 2037 if _check_temporary_tables: 2038 drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables( 2039 refresh=False, debug=debug 2040 ) 2041 if not drop_stale_success: 2042 warn(drop_stale_msg) 2043 2044 return success, msg
Sync a pipe using a database connection.
Parameters
- pipe (mrsm.Pipe): The Meerschaum Pipe instance into which to sync the data.
- df (Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]], default None): An optional DataFrame or equivalent to sync into the pipe.
- begin (Optional[datetime], default None): Optionally specify the earliest datetime to search for data.
- end (Optional[datetime], default None): Optionally specify the latest datetime to search for data.
- chunksize (Optional[int], default -1): Specify the number of rows to sync per chunk. If `-1`, resort to system configuration (default is `900`). A `chunksize` of `None` will sync all rows in one transaction.
- check_existing (bool, default True): If `True`, pull and diff with existing data from the pipe.
- blocking (bool, default True): If `True`, wait for the sync to finish and return its result; otherwise sync asynchronously.
- debug (bool, default False): Verbosity toggle.
- kw (Any): Catch-all for keyword arguments.
Returns
- A `SuccessTuple` of success (`bool`) and message (`str`).
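A sketch syncing a small DataFrame into the hypothetical `sales` pipe from above (on SQL instances, `pipe.sync(df)` is the usual entry point and delegates here):

import pandas as pd

df = pd.DataFrame({
    'order_time': pd.to_datetime(['2024-01-01', '2024-01-02']),
    'store_id': [1, 1],
    'amount': [19.99, 24.50],
})
success, msg = conn.sync_pipe(pipe, df, chunksize=900)
print(msg)   ### e.g. "Inserted 2, updated 0 rows."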
2047def sync_pipe_inplace( 2048 self, 2049 pipe: 'mrsm.Pipe', 2050 params: Optional[Dict[str, Any]] = None, 2051 begin: Union[datetime, int, None] = None, 2052 end: Union[datetime, int, None] = None, 2053 chunksize: Optional[int] = -1, 2054 check_existing: bool = True, 2055 debug: bool = False, 2056 **kw: Any 2057) -> SuccessTuple: 2058 """ 2059 If a pipe's connector is the same as its instance connector, 2060 it's more efficient to sync the pipe in-place rather than reading data into Pandas. 2061 2062 Parameters 2063 ---------- 2064 pipe: mrsm.Pipe 2065 The pipe whose connector is the same as its instance. 2066 2067 params: Optional[Dict[str, Any]], default None 2068 Optional params dictionary to build the `WHERE` clause. 2069 See `meerschaum.utils.sql.build_where`. 2070 2071 begin: Union[datetime, int, None], default None 2072 Optionally specify the earliest datetime to search for data. 2073 Defaults to `None`. 2074 2075 end: Union[datetime, int, None], default None 2076 Optionally specify the latest datetime to search for data. 2077 Defaults to `None`. 2078 2079 chunksize: Optional[int], default -1 2080 Specify the number of rows to sync per chunk. 2081 If `-1`, resort to system configuration (default is `900`). 2082 A `chunksize` of `None` will sync all rows in one transaction. 2083 Defaults to `-1`. 2084 2085 check_existing: bool, default True 2086 If `True`, pull and diff with existing data from the pipe. 2087 2088 debug: bool, default False 2089 Verbosity toggle. 2090 2091 Returns 2092 ------- 2093 A SuccessTuple. 2094 """ 2095 if self.flavor == 'duckdb': 2096 return pipe.sync( 2097 params=params, 2098 begin=begin, 2099 end=end, 2100 chunksize=chunksize, 2101 check_existing=check_existing, 2102 debug=debug, 2103 _inplace=False, 2104 **kw 2105 ) 2106 from meerschaum.utils.sql import ( 2107 sql_item_name, 2108 get_update_queries, 2109 get_null_replacement, 2110 get_create_table_queries, 2111 get_create_schema_if_not_exists_queries, 2112 get_table_cols_types, 2113 session_execute, 2114 dateadd_str, 2115 UPDATE_QUERIES, 2116 ) 2117 from meerschaum.utils.dtypes.sql import ( 2118 get_pd_type_from_db_type, 2119 get_db_type_from_pd_type, 2120 ) 2121 from meerschaum.utils.misc import generate_password 2122 2123 transaction_id_length = ( 2124 mrsm.get_config( 2125 'system', 'connectors', 'sql', 'instance', 'temporary_target', 'transaction_id_length' 2126 ) 2127 ) 2128 transact_id = generate_password(transaction_id_length) 2129 2130 internal_schema = self.internal_schema 2131 target = pipe.target 2132 temp_table_roots = ['backtrack', 'new', 'delta', 'joined', 'unseen', 'update'] 2133 temp_tables = { 2134 table_root: self.get_temporary_target(target, transact_id=transact_id, label=table_root) 2135 for table_root in temp_table_roots 2136 } 2137 temp_table_names = { 2138 table_root: sql_item_name(table_name_raw, self.flavor, internal_schema) 2139 for table_root, table_name_raw in temp_tables.items() 2140 } 2141 temp_table_aliases = { 2142 table_root: sql_item_name(table_root, self.flavor) 2143 for table_root in temp_table_roots 2144 } 2145 table_alias_as = " AS" if self.flavor != 'oracle' else '' 2146 metadef = self.get_pipe_metadef( 2147 pipe, 2148 params=params, 2149 begin=begin, 2150 end=end, 2151 check_existing=check_existing, 2152 debug=debug, 2153 ) 2154 pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 2155 upsert = pipe.parameters.get('upsert', False) and f'{self.flavor}-upsert' in UPDATE_QUERIES 2156 static = pipe.parameters.get('static', False) 2157 
database = getattr(self, 'database', self.parse_uri(self.URI).get('database', None)) 2158 primary_key = pipe.columns.get('primary', None) 2159 primary_key_typ = pipe.dtypes.get(primary_key, None) if primary_key else None 2160 primary_key_db_type = ( 2161 get_db_type_from_pd_type(primary_key_typ, self.flavor) 2162 if primary_key_typ 2163 else None 2164 ) 2165 autoincrement = pipe.parameters.get('autoincrement', False) 2166 dt_col = pipe.columns.get('datetime', None) 2167 dt_col_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None 2168 dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None 2169 dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None 2170 2171 def clean_up_temp_tables(ready_to_drop: bool = False): 2172 log_success, log_msg = self._log_temporary_tables_creation( 2173 [ 2174 table 2175 for table in temp_tables.values() 2176 ] if not upsert else [temp_tables['update']], 2177 ready_to_drop=ready_to_drop, 2178 create=(not pipe.temporary), 2179 debug=debug, 2180 ) 2181 if not log_success: 2182 warn(log_msg) 2183 drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables( 2184 refresh=False, 2185 debug=debug, 2186 ) 2187 if not drop_stale_success: 2188 warn(drop_stale_msg) 2189 return drop_stale_success, drop_stale_msg 2190 2191 sqlalchemy, sqlalchemy_orm = mrsm.attempt_import( 2192 'sqlalchemy', 2193 'sqlalchemy.orm', 2194 ) 2195 if not pipe.exists(debug=debug): 2196 schema = self.get_pipe_schema(pipe) 2197 create_pipe_queries = get_create_table_queries( 2198 metadef, 2199 pipe.target, 2200 self.flavor, 2201 schema=schema, 2202 primary_key=primary_key, 2203 primary_key_db_type=primary_key_db_type, 2204 autoincrement=autoincrement, 2205 datetime_column=dt_col, 2206 ) 2207 if schema: 2208 create_pipe_queries = ( 2209 get_create_schema_if_not_exists_queries(schema, self.flavor) 2210 + create_pipe_queries 2211 ) 2212 2213 results = self.exec_queries(create_pipe_queries, debug=debug) 2214 if not all(results): 2215 _ = clean_up_temp_tables() 2216 return False, f"Could not insert new data into {pipe} from its SQL query definition." 2217 2218 if not self.create_indices(pipe, debug=debug): 2219 warn(f"Failed to create indices for {pipe}. Continuing...") 2220 2221 rowcount = pipe.get_rowcount(debug=debug) 2222 _ = clean_up_temp_tables() 2223 return True, f"Inserted {rowcount:,}, updated 0 rows." 2224 2225 session = sqlalchemy_orm.Session(self.engine) 2226 connectable = session if self.flavor != 'duckdb' else self 2227 2228 create_new_query = get_create_table_queries( 2229 metadef, 2230 temp_tables[('new') if not upsert else 'update'], 2231 self.flavor, 2232 schema=internal_schema, 2233 )[0] 2234 (create_new_success, create_new_msg), create_new_results = session_execute( 2235 session, 2236 create_new_query, 2237 with_results=True, 2238 debug=debug, 2239 ) 2240 if not create_new_success: 2241 _ = clean_up_temp_tables() 2242 return create_new_success, create_new_msg 2243 new_count = create_new_results[0].rowcount if create_new_results else 0 2244 2245 new_cols_types = get_table_cols_types( 2246 temp_tables[('new' if not upsert else 'update')], 2247 connectable=connectable, 2248 flavor=self.flavor, 2249 schema=internal_schema, 2250 database=database, 2251 debug=debug, 2252 ) if not static else pipe.get_columns_types(debug=debug) 2253 if not new_cols_types: 2254 return False, f"Failed to get new columns for {pipe}." 
2255 2256 new_cols = { 2257 str(col_name): get_pd_type_from_db_type(str(col_type)) 2258 for col_name, col_type in new_cols_types.items() 2259 } 2260 new_cols_str = '\n ' + ',\n '.join([ 2261 sql_item_name(col, self.flavor) 2262 for col in new_cols 2263 ]) 2264 def get_col_typ(col: str, cols_types: Dict[str, str]) -> str: 2265 if self.flavor == 'oracle' and new_cols_types.get(col, '').lower() == 'char': 2266 return new_cols_types[col] 2267 return cols_types[col] 2268 2269 add_cols_queries = self.get_add_columns_queries(pipe, new_cols, debug=debug) 2270 if add_cols_queries: 2271 _ = pipe.__dict__.pop('_columns_types', None) 2272 _ = pipe.__dict__.pop('_columns_indices', None) 2273 self.exec_queries(add_cols_queries, debug=debug) 2274 2275 alter_cols_queries = self.get_alter_columns_queries(pipe, new_cols, debug=debug) 2276 if alter_cols_queries: 2277 _ = pipe.__dict__.pop('_columns_types', None) 2278 self.exec_queries(alter_cols_queries, debug=debug) 2279 2280 insert_queries = [ 2281 ( 2282 f"INSERT INTO {pipe_name} ({new_cols_str})\n" 2283 f"SELECT {new_cols_str}\nFROM {temp_table_names['new']}{table_alias_as}" 2284 f" {temp_table_aliases['new']}" 2285 ) 2286 ] if not check_existing and not upsert else [] 2287 2288 new_queries = insert_queries 2289 new_success, new_msg = ( 2290 session_execute(session, new_queries, debug=debug) 2291 if new_queries 2292 else (True, "Success") 2293 ) 2294 if not new_success: 2295 _ = clean_up_temp_tables() 2296 return new_success, new_msg 2297 2298 if not check_existing: 2299 session.commit() 2300 _ = clean_up_temp_tables() 2301 return True, f"Inserted {new_count}, updated 0 rows." 2302 2303 min_dt_col_name_da = dateadd_str( 2304 flavor=self.flavor, begin=f"MIN({dt_col_name})", db_type=dt_db_type, 2305 ) 2306 max_dt_col_name_da = dateadd_str( 2307 flavor=self.flavor, begin=f"MAX({dt_col_name})", db_type=dt_db_type, 2308 ) 2309 2310 (new_dt_bounds_success, new_dt_bounds_msg), new_dt_bounds_results = session_execute( 2311 session, 2312 [ 2313 "SELECT\n" 2314 f" {min_dt_col_name_da} AS {sql_item_name('min_dt', self.flavor)},\n" 2315 f" {max_dt_col_name_da} AS {sql_item_name('max_dt', self.flavor)}\n" 2316 f"FROM {temp_table_names['new' if not upsert else 'update']}\n" 2317 f"WHERE {dt_col_name} IS NOT NULL" 2318 ], 2319 with_results=True, 2320 debug=debug, 2321 ) if dt_col and not upsert else ((True, "Success"), None) 2322 if not new_dt_bounds_success: 2323 return ( 2324 new_dt_bounds_success, 2325 f"Could not determine in-place datetime bounds:\n{new_dt_bounds_msg}" 2326 ) 2327 2328 if dt_col and not upsert: 2329 begin, end = new_dt_bounds_results[0].fetchone() 2330 2331 backtrack_def = self.get_pipe_data_query( 2332 pipe, 2333 begin=begin, 2334 end=end, 2335 begin_add_minutes=0, 2336 end_add_minutes=1, 2337 params=params, 2338 debug=debug, 2339 order=None, 2340 ) 2341 create_backtrack_query = get_create_table_queries( 2342 backtrack_def, 2343 temp_tables['backtrack'], 2344 self.flavor, 2345 schema=internal_schema, 2346 )[0] 2347 (create_backtrack_success, create_backtrack_msg), create_backtrack_results = session_execute( 2348 session, 2349 create_backtrack_query, 2350 with_results=True, 2351 debug=debug, 2352 ) if not upsert else ((True, "Success"), None) 2353 2354 if not create_backtrack_success: 2355 _ = clean_up_temp_tables() 2356 return create_backtrack_success, create_backtrack_msg 2357 2358 backtrack_cols_types = get_table_cols_types( 2359 temp_tables['backtrack'], 2360 connectable=connectable, 2361 flavor=self.flavor, 2362 schema=internal_schema, 2363 
database=database, 2364 debug=debug, 2365 ) if not (upsert or static) else new_cols_types 2366 2367 common_cols = [col for col in new_cols if col in backtrack_cols_types] 2368 primary_key = pipe.columns.get('primary', None) 2369 on_cols = { 2370 col: new_cols.get(col) 2371 for col_key, col in pipe.columns.items() 2372 if ( 2373 col 2374 and 2375 col_key != 'value' 2376 and col in backtrack_cols_types 2377 and col in new_cols 2378 ) 2379 } if not primary_key or self.flavor == 'oracle' else {primary_key: new_cols.get(primary_key)} 2380 2381 null_replace_new_cols_str = ( 2382 '\n ' + ',\n '.join([ 2383 f"COALESCE({temp_table_aliases['new']}.{sql_item_name(col, self.flavor)}, " 2384 + get_null_replacement(get_col_typ(col, new_cols_types), self.flavor) 2385 + ") AS " 2386 + sql_item_name(col, self.flavor, None) 2387 for col, typ in new_cols.items() 2388 ]) 2389 ) 2390 2391 select_delta_query = ( 2392 "SELECT" 2393 + null_replace_new_cols_str 2394 + f"\nFROM {temp_table_names['new']}{table_alias_as} {temp_table_aliases['new']}\n" 2395 + f"LEFT OUTER JOIN {temp_table_names['backtrack']}{table_alias_as} {temp_table_aliases['backtrack']}" 2396 + "\n ON\n " 2397 + '\n AND\n '.join([ 2398 ( 2399 f" COALESCE({temp_table_aliases['new']}." 2400 + sql_item_name(c, self.flavor, None) 2401 + ", " 2402 + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) 2403 + ")" 2404 + '\n =\n ' 2405 + f" COALESCE({temp_table_aliases['backtrack']}." 2406 + sql_item_name(c, self.flavor, None) 2407 + ", " 2408 + get_null_replacement(get_col_typ(c, backtrack_cols_types), self.flavor) 2409 + ") " 2410 ) for c in common_cols 2411 ]) 2412 + "\nWHERE\n " 2413 + '\n AND\n '.join([ 2414 ( 2415 f"{temp_table_aliases['backtrack']}." + sql_item_name(c, self.flavor) + ' IS NULL' 2416 ) for c in common_cols 2417 ]) 2418 ) 2419 create_delta_query = get_create_table_queries( 2420 select_delta_query, 2421 temp_tables['delta'], 2422 self.flavor, 2423 schema=internal_schema, 2424 )[0] 2425 create_delta_success, create_delta_msg = session_execute( 2426 session, 2427 create_delta_query, 2428 debug=debug, 2429 ) if not upsert else (True, "Success") 2430 if not create_delta_success: 2431 _ = clean_up_temp_tables() 2432 return create_delta_success, create_delta_msg 2433 2434 delta_cols_types = get_table_cols_types( 2435 temp_tables['delta'], 2436 connectable=connectable, 2437 flavor=self.flavor, 2438 schema=internal_schema, 2439 database=database, 2440 debug=debug, 2441 ) if not (upsert or static) else new_cols_types 2442 2443 ### This is a weird bug on SQLite. 2444 ### Sometimes the backtrack dtypes are all empty strings. 2445 if not all(delta_cols_types.values()): 2446 delta_cols_types = new_cols_types 2447 2448 delta_cols = { 2449 col: get_pd_type_from_db_type(typ) 2450 for col, typ in delta_cols_types.items() 2451 } 2452 delta_cols_str = ', '.join([ 2453 sql_item_name(col, self.flavor) 2454 for col in delta_cols 2455 ]) 2456 2457 select_joined_query = ( 2458 "SELECT\n " 2459 + (',\n '.join([ 2460 ( 2461 f"{temp_table_aliases['delta']}." + sql_item_name(c, self.flavor, None) 2462 + " AS " + sql_item_name(c + '_delta', self.flavor, None) 2463 ) for c in delta_cols 2464 ])) 2465 + ",\n " 2466 + (',\n '.join([ 2467 ( 2468 f"{temp_table_aliases['backtrack']}." 
+ sql_item_name(c, self.flavor, None) 2469 + " AS " + sql_item_name(c + '_backtrack', self.flavor, None) 2470 ) for c in backtrack_cols_types 2471 ])) 2472 + f"\nFROM {temp_table_names['delta']}{table_alias_as} {temp_table_aliases['delta']}\n" 2473 + f"LEFT OUTER JOIN {temp_table_names['backtrack']}{table_alias_as}" 2474 + f" {temp_table_aliases['backtrack']}" 2475 + "\n ON\n " 2476 + '\n AND\n '.join([ 2477 ( 2478 f" COALESCE({temp_table_aliases['delta']}." + sql_item_name(c, self.flavor) 2479 + ", " 2480 + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) + ")" 2481 + '\n =\n ' 2482 + f" COALESCE({temp_table_aliases['backtrack']}." + sql_item_name(c, self.flavor) 2483 + ", " 2484 + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) + ")" 2485 ) for c, typ in on_cols.items() 2486 ]) 2487 ) 2488 2489 create_joined_query = get_create_table_queries( 2490 select_joined_query, 2491 temp_tables['joined'], 2492 self.flavor, 2493 schema=internal_schema, 2494 )[0] 2495 create_joined_success, create_joined_msg = session_execute( 2496 session, 2497 create_joined_query, 2498 debug=debug, 2499 ) if on_cols and not upsert else (True, "Success") 2500 if not create_joined_success: 2501 _ = clean_up_temp_tables() 2502 return create_joined_success, create_joined_msg 2503 2504 select_unseen_query = ( 2505 "SELECT\n " 2506 + (',\n '.join([ 2507 ( 2508 "CASE\n WHEN " + sql_item_name(c + '_delta', self.flavor, None) 2509 + " != " + get_null_replacement(get_col_typ(c, delta_cols_types), self.flavor) 2510 + " THEN " + sql_item_name(c + '_delta', self.flavor, None) 2511 + "\n ELSE NULL\n END" 2512 + " AS " + sql_item_name(c, self.flavor, None) 2513 ) for c, typ in delta_cols.items() 2514 ])) 2515 + f"\nFROM {temp_table_names['joined']}{table_alias_as} {temp_table_aliases['joined']}\n" 2516 + "WHERE\n " 2517 + '\n AND\n '.join([ 2518 ( 2519 sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NULL' 2520 ) for c in delta_cols 2521 ]) 2522 ) 2523 create_unseen_query = get_create_table_queries( 2524 select_unseen_query, 2525 temp_tables['unseen'], 2526 self.flavor, 2527 internal_schema, 2528 )[0] 2529 (create_unseen_success, create_unseen_msg), create_unseen_results = session_execute( 2530 session, 2531 create_unseen_query, 2532 with_results=True, 2533 debug=debug 2534 ) if not upsert else ((True, "Success"), None) 2535 if not create_unseen_success: 2536 _ = clean_up_temp_tables() 2537 return create_unseen_success, create_unseen_msg 2538 2539 select_update_query = ( 2540 "SELECT\n " 2541 + (',\n '.join([ 2542 ( 2543 "CASE\n WHEN " + sql_item_name(c + '_delta', self.flavor, None) 2544 + " != " + get_null_replacement(get_col_typ(c, delta_cols_types), self.flavor) 2545 + " THEN " + sql_item_name(c + '_delta', self.flavor, None) 2546 + "\n ELSE NULL\n END" 2547 + " AS " + sql_item_name(c, self.flavor, None) 2548 ) for c, typ in delta_cols.items() 2549 ])) 2550 + f"\nFROM {temp_table_names['joined']}{table_alias_as} {temp_table_aliases['joined']}\n" 2551 + "WHERE\n " 2552 + '\n OR\n '.join([ 2553 ( 2554 sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NOT NULL' 2555 ) for c in delta_cols 2556 ]) 2557 ) 2558 2559 create_update_query = get_create_table_queries( 2560 select_update_query, 2561 temp_tables['update'], 2562 self.flavor, 2563 internal_schema, 2564 )[0] 2565 (create_update_success, create_update_msg), create_update_results = session_execute( 2566 session, 2567 create_update_query, 2568 with_results=True, 2569 debug=debug, 2570 ) if on_cols and not upsert else ((True, 
"Success"), []) 2571 apply_update_queries = ( 2572 get_update_queries( 2573 pipe.target, 2574 temp_tables['update'], 2575 session, 2576 on_cols, 2577 upsert=upsert, 2578 schema=self.get_pipe_schema(pipe), 2579 patch_schema=internal_schema, 2580 datetime_col=pipe.columns.get('datetime', None), 2581 flavor=self.flavor, 2582 null_indices=pipe.null_indices, 2583 cast_columns=pipe.enforce, 2584 debug=debug, 2585 ) 2586 if on_cols else [] 2587 ) 2588 2589 apply_unseen_queries = [ 2590 ( 2591 f"INSERT INTO {pipe_name} ({delta_cols_str})\n" 2592 + f"SELECT {delta_cols_str}\nFROM " 2593 + ( 2594 temp_table_names['unseen'] 2595 if on_cols 2596 else temp_table_names['delta'] 2597 ) 2598 ), 2599 ] 2600 2601 (apply_unseen_success, apply_unseen_msg), apply_unseen_results = session_execute( 2602 session, 2603 apply_unseen_queries, 2604 with_results=True, 2605 debug=debug, 2606 ) if not upsert else ((True, "Success"), None) 2607 if not apply_unseen_success: 2608 _ = clean_up_temp_tables() 2609 return apply_unseen_success, apply_unseen_msg 2610 unseen_count = apply_unseen_results[0].rowcount if apply_unseen_results else 0 2611 2612 (apply_update_success, apply_update_msg), apply_update_results = session_execute( 2613 session, 2614 apply_update_queries, 2615 with_results=True, 2616 debug=debug, 2617 ) 2618 if not apply_update_success: 2619 _ = clean_up_temp_tables() 2620 return apply_update_success, apply_update_msg 2621 update_count = apply_update_results[0].rowcount if apply_update_results else 0 2622 2623 session.commit() 2624 2625 msg = ( 2626 f"Inserted {unseen_count:,}, updated {update_count:,} rows." 2627 if not upsert 2628 else f"Upserted {update_count:,} row" + ('s' if update_count != 1 else '') + "." 2629 ) 2630 _ = clean_up_temp_tables(ready_to_drop=True) 2631 2632 return True, msg
If a pipe's connector is the same as its instance connector, it's more efficient to sync the pipe in-place rather than reading data into Pandas.
Parameters
- pipe (mrsm.Pipe): The pipe whose connector is the same as its instance.
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the `WHERE` clause. See `meerschaum.utils.sql.build_where`.
- begin (Union[datetime, int, None], default None): Optionally specify the earliest datetime to search for data.
- end (Union[datetime, int, None], default None): Optionally specify the latest datetime to search for data.
- chunksize (Optional[int], default -1): Specify the number of rows to sync per chunk. If `-1`, resort to system configuration (default is `900`). A `chunksize` of `None` will sync all rows in one transaction.
- check_existing (bool, default True): If `True`, pull and diff with existing data from the pipe.
- debug (bool, default False): Verbosity toggle.
Returns
- A `SuccessTuple`.
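An in-place sync requires a pipe whose `fetch:definition` lives on the same SQL instance. A sketch; the definition and keys below are hypothetical:

remote_pipe = mrsm.Pipe(
    'sql:main', 'sales_rollup',
    instance='sql:main',
    parameters={
        'fetch': {'definition': 'SELECT * FROM sales'},
        'columns': {'datetime': 'order_time'},
    },
)
success, msg = conn.sync_pipe_inplace(remote_pipe)
print(msg)   ### e.g. "Inserted 2, updated 0 rows."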
2635def get_sync_time( 2636 self, 2637 pipe: 'mrsm.Pipe', 2638 params: Optional[Dict[str, Any]] = None, 2639 newest: bool = True, 2640 remote: bool = False, 2641 debug: bool = False, 2642) -> Union[datetime, int, None]: 2643 """Get a Pipe's most recent datetime value. 2644 2645 Parameters 2646 ---------- 2647 pipe: mrsm.Pipe 2648 The pipe to get the sync time for. 2649 2650 params: Optional[Dict[str, Any]], default None 2651 Optional params dictionary to build the `WHERE` clause. 2652 See `meerschaum.utils.sql.build_where`. 2653 2654 newest: bool, default True 2655 If `True`, get the most recent datetime (honoring `params`). 2656 If `False`, get the oldest datetime (ASC instead of DESC). 2657 2658 remote: bool, default False 2659 If `True`, return the sync time for the remote fetch definition. 2660 2661 Returns 2662 ------- 2663 A `datetime` object (or `int` if using an integer axis) if the pipe exists, otherwise `None`. 2664 """ 2665 from meerschaum.utils.sql import sql_item_name, build_where, wrap_query_with_cte 2666 src_name = sql_item_name('src', self.flavor) 2667 table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 2668 2669 dt_col = pipe.columns.get('datetime', None) 2670 if dt_col is None: 2671 return None 2672 dt_col_name = sql_item_name(dt_col, self.flavor, None) 2673 2674 if remote and pipe.connector.type != 'sql': 2675 warn(f"Cannot get the remote sync time for {pipe}.") 2676 return None 2677 2678 ASC_or_DESC = "DESC" if newest else "ASC" 2679 existing_cols = pipe.get_columns_types(debug=debug) 2680 valid_params = {} 2681 if params is not None: 2682 valid_params = {k: v for k, v in params.items() if k in existing_cols} 2683 flavor = self.flavor if not remote else pipe.connector.flavor 2684 2685 ### If no bounds are provided for the datetime column, 2686 ### add IS NOT NULL to the WHERE clause. 2687 if dt_col not in valid_params: 2688 valid_params[dt_col] = '_None' 2689 where = "" if not valid_params else build_where(valid_params, self) 2690 src_query = ( 2691 f"SELECT {dt_col_name}\nFROM {table_name}{where}" 2692 if not remote 2693 else self.get_pipe_metadef(pipe, params=params, begin=None, end=None) 2694 ) 2695 2696 base_query = ( 2697 f"SELECT {dt_col_name}\n" 2698 f"FROM {src_name}{where}\n" 2699 f"ORDER BY {dt_col_name} {ASC_or_DESC}\n" 2700 f"LIMIT 1" 2701 ) 2702 if self.flavor == 'mssql': 2703 base_query = ( 2704 f"SELECT TOP 1 {dt_col_name}\n" 2705 f"FROM {src_name}{where}\n" 2706 f"ORDER BY {dt_col_name} {ASC_or_DESC}" 2707 ) 2708 elif self.flavor == 'oracle': 2709 base_query = ( 2710 "SELECT * FROM (\n" 2711 f" SELECT {dt_col_name}\n" 2712 f" FROM {src_name}{where}\n" 2713 f" ORDER BY {dt_col_name} {ASC_or_DESC}\n" 2714 ") WHERE ROWNUM = 1" 2715 ) 2716 2717 query = wrap_query_with_cte(src_query, base_query, flavor) 2718 2719 try: 2720 db_time = self.value(query, silent=True, debug=debug) 2721 2722 ### No datetime could be found. 2723 if db_time is None: 2724 return None 2725 ### sqlite returns str. 2726 if isinstance(db_time, str): 2727 dateutil_parser = mrsm.attempt_import('dateutil.parser') 2728 st = dateutil_parser.parse(db_time) 2729 ### Do nothing if a datetime object is returned. 2730 elif isinstance(db_time, datetime): 2731 if hasattr(db_time, 'to_pydatetime'): 2732 st = db_time.to_pydatetime() 2733 else: 2734 st = db_time 2735 ### Sometimes the datetime is actually a date. 2736 elif isinstance(db_time, date): 2737 st = datetime.combine(db_time, datetime.min.time()) 2738 ### Adding support for an integer datetime axis. 
2739 elif 'int' in str(type(db_time)).lower(): 2740 st = int(db_time) 2741 ### Convert pandas timestamp to Python datetime. 2742 else: 2743 st = db_time.to_pydatetime() 2744 2745 sync_time = st 2746 2747 except Exception as e: 2748 sync_time = None 2749 warn(str(e)) 2750 2751 return sync_time
Get a Pipe's most recent datetime value.
Parameters
- pipe (mrsm.Pipe): The pipe to get the sync time for.
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the `WHERE` clause. See `meerschaum.utils.sql.build_where`.
- newest (bool, default True): If `True`, get the most recent datetime (honoring `params`). If `False`, get the oldest datetime (`ASC` instead of `DESC`).
- remote (bool, default False): If `True`, return the sync time for the remote fetch definition.
Returns
- A `datetime` object (or `int` if using an integer axis) if the pipe exists, otherwise `None`.
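A bounds-checking sketch with the hypothetical pipe from above:

newest = conn.get_sync_time(pipe)                ### most recent datetime value
oldest = conn.get_sync_time(pipe, newest=False)  ### oldest instead (ASC)
if newest is not None:
    print(f"Data span: {oldest} to {newest}")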
2754def pipe_exists( 2755 self, 2756 pipe: mrsm.Pipe, 2757 debug: bool = False 2758) -> bool: 2759 """ 2760 Check that a Pipe's table exists. 2761 2762 Parameters 2763 ---------- 2764 pipe: mrsm.Pipe: 2765 The pipe to check. 2766 2767 debug: bool, default False 2768 Verbosity toggle. 2769 2770 Returns 2771 ------- 2772 A `bool` corresponding to whether a pipe's table exists. 2773 2774 """ 2775 from meerschaum.utils.sql import table_exists 2776 exists = table_exists( 2777 pipe.target, 2778 self, 2779 schema=self.get_pipe_schema(pipe), 2780 debug=debug, 2781 ) 2782 if debug: 2783 from meerschaum.utils.debug import dprint 2784 dprint(f"{pipe} " + ('exists.' if exists else 'does not exist.')) 2785 return exists
Check that a Pipe's table exists.
Parameters
- pipe (mrsm.Pipe): The pipe to check.
- debug (bool, default False): Verbosity toggle.
Returns
- A `bool` corresponding to whether a pipe's table exists.
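Example (assuming the illustrative `conn` and `pipe` from the sketch above; the result depends on whether the target table has been created):
>>> conn.pipe_exists(pipe)
True
>>>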
2788def get_pipe_rowcount( 2789 self, 2790 pipe: mrsm.Pipe, 2791 begin: Union[datetime, int, None] = None, 2792 end: Union[datetime, int, None] = None, 2793 params: Optional[Dict[str, Any]] = None, 2794 remote: bool = False, 2795 debug: bool = False 2796) -> Union[int, None]: 2797 """ 2798 Get the rowcount for a pipe in accordance with given parameters. 2799 2800 Parameters 2801 ---------- 2802 pipe: mrsm.Pipe 2803 The pipe to query with. 2804 2805 begin: Union[datetime, int, None], default None 2806 The begin datetime value. 2807 2808 end: Union[datetime, int, None], default None 2809 The end datetime value. 2810 2811 params: Optional[Dict[str, Any]], default None 2812 See `meerschaum.utils.sql.build_where`. 2813 2814 remote: bool, default False 2815 If `True`, get the rowcount for the remote table. 2816 2817 debug: bool, default False 2818 Verbosity toggle. 2819 2820 Returns 2821 ------- 2822 An `int` for the number of rows if the `pipe` exists, otherwise `None`. 2823 2824 """ 2825 from meerschaum.utils.sql import dateadd_str, sql_item_name, wrap_query_with_cte, build_where 2826 from meerschaum.connectors.sql._fetch import get_pipe_query 2827 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 2828 if remote: 2829 msg = f"'fetch:definition' must be an attribute of {pipe} to get a remote rowcount." 2830 if 'fetch' not in pipe.parameters: 2831 error(msg) 2832 return None 2833 if 'definition' not in pipe.parameters['fetch']: 2834 error(msg) 2835 return None 2836 2837 2838 flavor = self.flavor if not remote else pipe.connector.flavor 2839 conn = self if not remote else pipe.connector 2840 _pipe_name = sql_item_name(pipe.target, flavor, self.get_pipe_schema(pipe)) 2841 dt_col = pipe.columns.get('datetime', None) 2842 dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None 2843 dt_db_type = get_db_type_from_pd_type(dt_typ, flavor) if dt_typ else None 2844 if not dt_col: 2845 dt_col = pipe.guess_datetime() 2846 dt_name = sql_item_name(dt_col, flavor, None) if dt_col else None 2847 is_guess = True 2848 else: 2849 dt_col = pipe.get_columns('datetime') 2850 dt_name = sql_item_name(dt_col, flavor, None) 2851 is_guess = False 2852 2853 if begin is not None or end is not None: 2854 if is_guess: 2855 if dt_col is None: 2856 warn( 2857 f"No datetime could be determined for {pipe}." 
2858 + "\n Ignoring begin and end...", 2859 stack=False, 2860 ) 2861 begin, end = None, None 2862 else: 2863 warn( 2864 f"A datetime wasn't specified for {pipe}.\n" 2865 + f" Using column \"{dt_col}\" for datetime bounds...", 2866 stack=False, 2867 ) 2868 2869 2870 _datetime_name = sql_item_name(dt_col, flavor) 2871 _cols_names = [ 2872 sql_item_name(col, flavor) 2873 for col in set( 2874 ( 2875 [dt_col] 2876 if dt_col 2877 else [] 2878 ) + ( 2879 [] 2880 if params is None 2881 else list(params.keys()) 2882 ) 2883 ) 2884 ] 2885 if not _cols_names: 2886 _cols_names = ['*'] 2887 2888 src = ( 2889 f"SELECT {', '.join(_cols_names)}\nFROM {_pipe_name}" 2890 if not remote 2891 else get_pipe_query(pipe) 2892 ) 2893 parent_query = f"SELECT COUNT(*)\nFROM {sql_item_name('src', flavor)}" 2894 query = wrap_query_with_cte(src, parent_query, flavor) 2895 if begin is not None or end is not None: 2896 query += "\nWHERE" 2897 if begin is not None: 2898 query += ( 2899 f"\n {dt_name} >= " 2900 + dateadd_str(flavor, datepart='minute', number=0, begin=begin, db_type=dt_db_type) 2901 ) 2902 if end is not None and begin is not None: 2903 query += "\n AND" 2904 if end is not None: 2905 query += ( 2906 f"\n {dt_name} < " 2907 + dateadd_str(flavor, datepart='minute', number=0, begin=end, db_type=dt_db_type) 2908 ) 2909 if params is not None: 2910 existing_cols = pipe.get_columns_types(debug=debug) 2911 valid_params = {k: v for k, v in params.items() if k in existing_cols} 2912 if valid_params: 2913 query += build_where(valid_params, conn).replace('WHERE', ( 2914 'AND' if (begin is not None or end is not None) 2915 else 'WHERE' 2916 ) 2917 ) 2918 2919 result = conn.value(query, debug=debug, silent=True) 2920 try: 2921 return int(result) 2922 except Exception: 2923 return None
Get the rowcount for a pipe in accordance with given parameters.
Parameters
- pipe (mrsm.Pipe): The pipe to query with.
- begin (Union[datetime, int, None], default None): The begin datetime value.
- end (Union[datetime, int, None], default None): The end datetime value.
- params (Optional[Dict[str, Any]], default None): See `meerschaum.utils.sql.build_where`.
- remote (bool, default False): If `True`, get the rowcount for the remote table.
- debug (bool, default False): Verbosity toggle.
Returns
- An `int` for the number of rows if the `pipe` exists, otherwise `None`.
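Example (a hedged sketch reusing the illustrative `conn` and `pipe`; bounds are optional and may be `datetime` or `int` values):
>>> from datetime import datetime
>>> conn.get_pipe_rowcount(pipe)
>>> conn.get_pipe_rowcount(
...     pipe,
...     begin=datetime(2024, 1, 1),
...     end=datetime(2024, 2, 1),
... )
>>>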
2926def drop_pipe( 2927 self, 2928 pipe: mrsm.Pipe, 2929 debug: bool = False, 2930 **kw 2931) -> SuccessTuple: 2932 """ 2933 Drop a pipe's tables but maintain its registration. 2934 2935 Parameters 2936 ---------- 2937 pipe: mrsm.Pipe 2938 The pipe to drop. 2939 2940 Returns 2941 ------- 2942 A `SuccessTuple` indicated success. 2943 """ 2944 from meerschaum.utils.sql import table_exists, sql_item_name, DROP_IF_EXISTS_FLAVORS 2945 success = True 2946 target = pipe.target 2947 schema = self.get_pipe_schema(pipe) 2948 target_name = ( 2949 sql_item_name(target, self.flavor, schema) 2950 ) 2951 if table_exists(target, self, schema=schema, debug=debug): 2952 if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else "" 2953 success = self.exec( 2954 f"DROP TABLE {if_exists_str} {target_name}", silent=True, debug=debug 2955 ) is not None 2956 2957 msg = "Success" if success else f"Failed to drop {pipe}." 2958 return success, msg
Drop a pipe's tables but maintain its registration.
Parameters
- pipe (mrsm.Pipe): The pipe to drop.
Returns
- A `SuccessTuple` indicating success.
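Example (illustrative; the table is dropped but the pipe's registration remains):
>>> success, msg = conn.drop_pipe(pipe)
>>> success, msg
(True, 'Success')
>>>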
2961def clear_pipe( 2962 self, 2963 pipe: mrsm.Pipe, 2964 begin: Union[datetime, int, None] = None, 2965 end: Union[datetime, int, None] = None, 2966 params: Optional[Dict[str, Any]] = None, 2967 debug: bool = False, 2968 **kw 2969) -> SuccessTuple: 2970 """ 2971 Delete a pipe's data within a bounded or unbounded interval without dropping the table. 2972 2973 Parameters 2974 ---------- 2975 pipe: mrsm.Pipe 2976 The pipe to clear. 2977 2978 begin: Union[datetime, int, None], default None 2979 Beginning datetime. Inclusive. 2980 2981 end: Union[datetime, int, None], default None 2982 Ending datetime. Exclusive. 2983 2984 params: Optional[Dict[str, Any]], default None 2985 See `meerschaum.utils.sql.build_where`. 2986 2987 """ 2988 if not pipe.exists(debug=debug): 2989 return True, f"{pipe} does not exist, so nothing was cleared." 2990 2991 from meerschaum.utils.sql import sql_item_name, build_where, dateadd_str 2992 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 2993 pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 2994 2995 dt_col = pipe.columns.get('datetime', None) 2996 dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None 2997 dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None 2998 if not pipe.columns.get('datetime', None): 2999 dt_col = pipe.guess_datetime() 3000 dt_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None 3001 is_guess = True 3002 else: 3003 dt_col = pipe.get_columns('datetime') 3004 dt_name = sql_item_name(dt_col, self.flavor, None) 3005 is_guess = False 3006 3007 if begin is not None or end is not None: 3008 if is_guess: 3009 if dt_col is None: 3010 warn( 3011 f"No datetime could be determined for {pipe}." 3012 + "\n Ignoring datetime bounds...", 3013 stack=False, 3014 ) 3015 begin, end = None, None 3016 else: 3017 warn( 3018 f"A datetime wasn't specified for {pipe}.\n" 3019 + f" Using column \"{dt_col}\" for datetime bounds...", 3020 stack=False, 3021 ) 3022 3023 valid_params = {} 3024 if params is not None: 3025 existing_cols = pipe.get_columns_types(debug=debug) 3026 valid_params = {k: v for k, v in params.items() if k in existing_cols} 3027 clear_query = ( 3028 f"DELETE FROM {pipe_name}\nWHERE 1 = 1\n" 3029 + ('\n AND ' + build_where(valid_params, self, with_where=False) if valid_params else '') 3030 + ( 3031 ( 3032 f'\n AND {dt_name} >= ' 3033 + dateadd_str(self.flavor, 'day', 0, begin, db_type=dt_db_type) 3034 ) 3035 if begin is not None 3036 else '' 3037 ) + ( 3038 ( 3039 f'\n AND {dt_name} < ' 3040 + dateadd_str(self.flavor, 'day', 0, end, db_type=dt_db_type) 3041 ) 3042 if end is not None 3043 else '' 3044 ) 3045 ) 3046 success = self.exec(clear_query, silent=True, debug=debug) is not None 3047 msg = "Success" if success else f"Failed to clear {pipe}." 3048 return success, msg
Delete a pipe's data within a bounded or unbounded interval without dropping the table.
Parameters
- pipe (mrsm.Pipe): The pipe to clear.
- begin (Union[datetime, int, None], default None): Beginning datetime. Inclusive.
- end (Union[datetime, int, None], default None): Ending datetime. Exclusive.
- params (Optional[Dict[str, Any]], default None): See `meerschaum.utils.sql.build_where`.
Returns
- A `SuccessTuple` indicating success.
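Example (a sketch with illustrative bounds; `begin` is inclusive and `end` is exclusive):
>>> success, msg = conn.clear_pipe(
...     pipe,
...     begin=datetime(2024, 1, 1),
...     end=datetime(2024, 2, 1),
... )
>>>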
3652def deduplicate_pipe( 3653 self, 3654 pipe: mrsm.Pipe, 3655 begin: Union[datetime, int, None] = None, 3656 end: Union[datetime, int, None] = None, 3657 params: Optional[Dict[str, Any]] = None, 3658 debug: bool = False, 3659 **kwargs: Any 3660) -> SuccessTuple: 3661 """ 3662 Delete duplicate values within a pipe's table. 3663 3664 Parameters 3665 ---------- 3666 pipe: mrsm.Pipe 3667 The pipe whose table to deduplicate. 3668 3669 begin: Union[datetime, int, None], default None 3670 If provided, only deduplicate values greater than or equal to this value. 3671 3672 end: Union[datetime, int, None], default None 3673 If provided, only deduplicate values less than this value. 3674 3675 params: Optional[Dict[str, Any]], default None 3676 If provided, further limit deduplication to values which match this query dictionary. 3677 3678 debug: bool, default False 3679 Verbosity toggle. 3680 3681 Returns 3682 ------- 3683 A `SuccessTuple` indicating success. 3684 """ 3685 from meerschaum.utils.sql import ( 3686 sql_item_name, 3687 get_rename_table_queries, 3688 DROP_IF_EXISTS_FLAVORS, 3689 get_create_table_query, 3690 format_cte_subquery, 3691 get_null_replacement, 3692 ) 3693 from meerschaum.utils.misc import generate_password, flatten_list 3694 3695 pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 3696 3697 if not pipe.exists(debug=debug): 3698 return False, f"Table {pipe_table_name} does not exist." 3699 3700 dt_col = pipe.columns.get('datetime', None) 3701 cols_types = pipe.get_columns_types(debug=debug) 3702 existing_cols = pipe.get_columns_types(debug=debug) 3703 3704 get_rowcount_query = f"SELECT COUNT(*) FROM {pipe_table_name}" 3705 old_rowcount = self.value(get_rowcount_query, debug=debug) 3706 if old_rowcount is None: 3707 return False, f"Failed to get rowcount for table {pipe_table_name}." 3708 3709 ### Non-datetime indices that in fact exist. 3710 indices = [ 3711 col 3712 for key, col in pipe.columns.items() 3713 if col and col != dt_col and col in cols_types 3714 ] 3715 indices_names = [sql_item_name(index_col, self.flavor, None) for index_col in indices] 3716 existing_cols_names = [sql_item_name(col, self.flavor, None) for col in existing_cols] 3717 duplicate_row_number_name = sql_item_name('dup_row_num', self.flavor, None) 3718 previous_row_number_name = sql_item_name('prev_row_num', self.flavor, None) 3719 3720 index_list_str = ( 3721 sql_item_name(dt_col, self.flavor, None) 3722 if dt_col 3723 else '' 3724 ) 3725 index_list_str_ordered = ( 3726 ( 3727 sql_item_name(dt_col, self.flavor, None) + " DESC" 3728 ) 3729 if dt_col 3730 else '' 3731 ) 3732 if indices: 3733 index_list_str += ', ' + ', '.join(indices_names) 3734 index_list_str_ordered += ', ' + ', '.join(indices_names) 3735 if index_list_str.startswith(','): 3736 index_list_str = index_list_str.lstrip(',').lstrip() 3737 if index_list_str_ordered.startswith(','): 3738 index_list_str_ordered = index_list_str_ordered.lstrip(',').lstrip() 3739 3740 cols_list_str = ', '.join(existing_cols_names) 3741 3742 try: 3743 ### NOTE: MySQL 5 and below does not support window functions (ROW_NUMBER()). 
3744 is_old_mysql = ( 3745 self.flavor in ('mysql', 'mariadb') 3746 and 3747 int(self.db_version.split('.')[0]) < 8 3748 ) 3749 except Exception: 3750 is_old_mysql = False 3751 3752 src_query = f""" 3753 SELECT 3754 {cols_list_str}, 3755 ROW_NUMBER() OVER ( 3756 PARTITION BY 3757 {index_list_str} 3758 ORDER BY {index_list_str_ordered} 3759 ) AS {duplicate_row_number_name} 3760 FROM {pipe_table_name} 3761 """ 3762 duplicates_cte_subquery = format_cte_subquery( 3763 src_query, 3764 self.flavor, 3765 sub_name = 'src', 3766 cols_to_select = cols_list_str, 3767 ) + f""" 3768 WHERE {duplicate_row_number_name} = 1 3769 """ 3770 old_mysql_query = ( 3771 f""" 3772 SELECT 3773 {index_list_str} 3774 FROM ( 3775 SELECT 3776 {index_list_str}, 3777 IF( 3778 @{previous_row_number_name} <> {index_list_str.replace(', ', ' + ')}, 3779 @{duplicate_row_number_name} := 0, 3780 @{duplicate_row_number_name} 3781 ), 3782 @{previous_row_number_name} := {index_list_str.replace(', ', ' + ')}, 3783 @{duplicate_row_number_name} := @{duplicate_row_number_name} + 1 AS """ 3784 + f"""{duplicate_row_number_name} 3785 FROM 3786 {pipe_table_name}, 3787 ( 3788 SELECT @{duplicate_row_number_name} := 0 3789 ) AS {duplicate_row_number_name}, 3790 ( 3791 SELECT @{previous_row_number_name} := '{get_null_replacement('str', 'mysql')}' 3792 ) AS {previous_row_number_name} 3793 ORDER BY {index_list_str_ordered} 3794 ) AS t 3795 WHERE {duplicate_row_number_name} = 1 3796 """ 3797 ) 3798 if is_old_mysql: 3799 duplicates_cte_subquery = old_mysql_query 3800 3801 session_id = generate_password(3) 3802 3803 dedup_table = self.get_temporary_target(pipe.target, transact_id=session_id, label='dedup') 3804 temp_old_table = self.get_temporary_target(pipe.target, transact_id=session_id, label='old') 3805 temp_old_table_name = sql_item_name(temp_old_table, self.flavor, self.get_pipe_schema(pipe)) 3806 3807 create_temporary_table_query = get_create_table_query( 3808 duplicates_cte_subquery, 3809 dedup_table, 3810 self.flavor, 3811 ) + f""" 3812 ORDER BY {index_list_str_ordered} 3813 """ 3814 if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else "" 3815 alter_queries = flatten_list([ 3816 get_rename_table_queries( 3817 pipe.target, 3818 temp_old_table, 3819 self.flavor, 3820 schema=self.get_pipe_schema(pipe), 3821 ), 3822 get_rename_table_queries( 3823 dedup_table, 3824 pipe.target, 3825 self.flavor, 3826 schema=self.get_pipe_schema(pipe), 3827 ), 3828 f"DROP TABLE {if_exists_str} {temp_old_table_name}", 3829 ]) 3830 3831 self._log_temporary_tables_creation(temp_old_table, create=(not pipe.temporary), debug=debug) 3832 create_temporary_result = self.execute(create_temporary_table_query, debug=debug) 3833 if create_temporary_result is None: 3834 return False, f"Failed to deduplicate table {pipe_table_name}." 3835 3836 results = self.exec_queries( 3837 alter_queries, 3838 break_on_error=True, 3839 rollback=True, 3840 debug=debug, 3841 ) 3842 3843 fail_query = None 3844 for result, query in zip(results, alter_queries): 3845 if result is None: 3846 fail_query = query 3847 break 3848 success = fail_query is None 3849 3850 new_rowcount = ( 3851 self.value(get_rowcount_query, debug=debug) 3852 if success 3853 else None 3854 ) 3855 3856 msg = ( 3857 ( 3858 f"Successfully deduplicated table {pipe_table_name}" 3859 + ( 3860 f"\nfrom {old_rowcount:,} to {new_rowcount:,} rows" 3861 if old_rowcount != new_rowcount 3862 else '' 3863 ) + '.' 
3864 ) 3865 if success 3866 else f"Failed to execute query:\n{fail_query}" 3867 ) 3868 return success, msg
Delete duplicate values within a pipe's table.
Parameters
- pipe (mrsm.Pipe): The pipe whose table to deduplicate.
- begin (Union[datetime, int, None], default None): If provided, only deduplicate values greater than or equal to this value.
- end (Union[datetime, int, None], default None): If provided, only deduplicate values less than this value.
- params (Optional[Dict[str, Any]], default None): If provided, further limit deduplication to values which match this query dictionary.
- debug (bool, default False): Verbosity toggle.
Returns
- A `SuccessTuple` indicating success.
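Example (illustrative; duplicates are resolved against the pipe's datetime and index columns, and on success the message reports the old and new rowcounts):
>>> success, msg = conn.deduplicate_pipe(pipe)
>>>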
3051def get_pipe_table( 3052 self, 3053 pipe: mrsm.Pipe, 3054 debug: bool = False, 3055) -> Union['sqlalchemy.Table', None]: 3056 """ 3057 Return the `sqlalchemy.Table` object for a `mrsm.Pipe`. 3058 3059 Parameters 3060 ---------- 3061 pipe: mrsm.Pipe: 3062 The pipe in question. 3063 3064 Returns 3065 ------- 3066 A `sqlalchemy.Table` object. 3067 3068 """ 3069 from meerschaum.utils.sql import get_sqlalchemy_table 3070 if not pipe.exists(debug=debug): 3071 return None 3072 return get_sqlalchemy_table( 3073 pipe.target, 3074 connector=self, 3075 schema=self.get_pipe_schema(pipe), 3076 debug=debug, 3077 refresh=True, 3078 )
Return the `sqlalchemy.Table` object for a `mrsm.Pipe`.
Parameters
- pipe (mrsm.Pipe): The pipe in question.
Returns
- A `sqlalchemy.Table` object.
3081def get_pipe_columns_types( 3082 self, 3083 pipe: mrsm.Pipe, 3084 debug: bool = False, 3085) -> Dict[str, str]: 3086 """ 3087 Get the pipe's columns and types. 3088 3089 Parameters 3090 ---------- 3091 pipe: mrsm.Pipe: 3092 The pipe to get the columns for. 3093 3094 Returns 3095 ------- 3096 A dictionary of columns names (`str`) and types (`str`). 3097 3098 Examples 3099 -------- 3100 >>> conn.get_pipe_columns_types(pipe) 3101 { 3102 'dt': 'TIMESTAMP WITHOUT TIMEZONE', 3103 'id': 'BIGINT', 3104 'val': 'DOUBLE PRECISION', 3105 } 3106 >>> 3107 """ 3108 from meerschaum.utils.sql import get_table_cols_types 3109 if not pipe.exists(debug=debug): 3110 return {} 3111 3112 if self.flavor not in ('oracle', 'mysql', 'mariadb', 'sqlite'): 3113 return get_table_cols_types( 3114 pipe.target, 3115 self, 3116 flavor=self.flavor, 3117 schema=self.get_pipe_schema(pipe), 3118 debug=debug, 3119 ) 3120 3121 table_columns = {} 3122 try: 3123 pipe_table = self.get_pipe_table(pipe, debug=debug) 3124 if pipe_table is None: 3125 return {} 3126 for col in pipe_table.columns: 3127 table_columns[str(col.name)] = str(col.type) 3128 except Exception as e: 3129 import traceback 3130 traceback.print_exc() 3131 warn(e) 3132 table_columns = {} 3133 3134 return table_columns
Get the pipe's columns and types.
Parameters
- pipe (mrsm.Pipe): The pipe to get the columns for.
Returns
- A dictionary of column names (`str`) and types (`str`).
Examples
>>> conn.get_pipe_columns_types(pipe)
{
'dt': 'TIMESTAMP WITHOUT TIMEZONE',
'id': 'BIGINT',
'val': 'DOUBLE PRECISION',
}
>>>
3596def get_to_sql_dtype( 3597 self, 3598 pipe: 'mrsm.Pipe', 3599 df: 'pd.DataFrame', 3600 update_dtypes: bool = True, 3601) -> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']: 3602 """ 3603 Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`. 3604 3605 Parameters 3606 ---------- 3607 pipe: mrsm.Pipe 3608 The pipe which may contain a `dtypes` parameter. 3609 3610 df: pd.DataFrame 3611 The DataFrame to be pushed via `to_sql()`. 3612 3613 update_dtypes: bool, default True 3614 If `True`, patch the pipe's dtypes onto the DataFrame's dtypes. 3615 3616 Returns 3617 ------- 3618 A dictionary with `sqlalchemy` datatypes. 3619 3620 Examples 3621 -------- 3622 >>> import pandas as pd 3623 >>> import meerschaum as mrsm 3624 >>> 3625 >>> conn = mrsm.get_connector('sql:memory') 3626 >>> df = pd.DataFrame([{'a': {'b': 1}}]) 3627 >>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'}) 3628 >>> get_to_sql_dtype(pipe, df) 3629 {'a': <class 'sqlalchemy.sql.sqltypes.JSON'>} 3630 """ 3631 from meerschaum.utils.dataframe import get_json_cols, get_numeric_cols, get_uuid_cols 3632 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 3633 df_dtypes = { 3634 col: str(typ) 3635 for col, typ in df.dtypes.items() 3636 } 3637 json_cols = get_json_cols(df) 3638 numeric_cols = get_numeric_cols(df) 3639 uuid_cols = get_uuid_cols(df) 3640 df_dtypes.update({col: 'json' for col in json_cols}) 3641 df_dtypes.update({col: 'numeric' for col in numeric_cols}) 3642 df_dtypes.update({col: 'uuid' for col in uuid_cols}) 3643 if update_dtypes: 3644 df_dtypes.update(pipe.dtypes) 3645 return { 3646 col: get_db_type_from_pd_type(typ, self.flavor, as_sqlalchemy=True) 3647 for col, typ in df_dtypes.items() 3648 if col and typ 3649 }
Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`.
Parameters
- pipe (mrsm.Pipe): The pipe which may contain a `dtypes` parameter.
- df (pd.DataFrame): The DataFrame to be pushed via `to_sql()`.
- update_dtypes (bool, default True): If `True`, patch the pipe's dtypes onto the DataFrame's dtypes.
Returns
- A dictionary with `sqlalchemy` datatypes.
Examples
>>> import pandas as pd
>>> import meerschaum as mrsm
>>>
>>> conn = mrsm.get_connector('sql:memory')
>>> df = pd.DataFrame([{'a': {'b': 1}}])
>>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
>>> get_to_sql_dtype(pipe, df)
{'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
3871def get_pipe_schema(self, pipe: mrsm.Pipe) -> Union[str, None]: 3872 """ 3873 Return the schema to use for this pipe. 3874 First check `pipe.parameters['schema']`, then check `self.schema`. 3875 3876 Parameters 3877 ---------- 3878 pipe: mrsm.Pipe 3879 The pipe which may contain a configured schema. 3880 3881 Returns 3882 ------- 3883 A schema string or `None` if nothing is configured. 3884 """ 3885 return pipe.parameters.get('schema', self.schema)
Return the schema to use for this pipe.
First check `pipe.parameters['schema']`, then check `self.schema`.
Parameters
- pipe (mrsm.Pipe): The pipe which may contain a configured schema.
Returns
- A schema string or `None` if nothing is configured.
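Example (follows directly from the lookup order above):
>>> conn.get_pipe_schema(pipe)  # falls back to `self.schema` (possibly None)
>>> pipe.parameters['schema'] = 'analytics'
>>> conn.get_pipe_schema(pipe)
'analytics'
>>>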
1544def create_pipe_table_from_df( 1545 self, 1546 pipe: mrsm.Pipe, 1547 df: 'pd.DataFrame', 1548 debug: bool = False, 1549) -> mrsm.SuccessTuple: 1550 """ 1551 Create a pipe's table from its configured dtypes and an incoming dataframe. 1552 """ 1553 from meerschaum.utils.dataframe import ( 1554 get_json_cols, 1555 get_numeric_cols, 1556 get_uuid_cols, 1557 get_datetime_cols, 1558 get_bytes_cols, 1559 ) 1560 from meerschaum.utils.sql import ( 1561 get_create_table_queries, 1562 sql_item_name, 1563 get_create_schema_if_not_exists_queries, 1564 ) 1565 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 1566 primary_key = pipe.columns.get('primary', None) 1567 primary_key_typ = ( 1568 pipe.dtypes.get(primary_key, str(df.dtypes.get(primary_key, 'int'))) 1569 if primary_key 1570 else None 1571 ) 1572 primary_key_db_type = ( 1573 get_db_type_from_pd_type(primary_key_typ, self.flavor) 1574 if primary_key 1575 else None 1576 ) 1577 dt_col = pipe.columns.get('datetime', None) 1578 new_dtypes = { 1579 **{ 1580 col: str(typ) 1581 for col, typ in df.dtypes.items() 1582 }, 1583 **{ 1584 col: str(df.dtypes.get(col, 'int')) 1585 for col_ix, col in pipe.columns.items() 1586 if col and col_ix != 'primary' 1587 }, 1588 **{ 1589 col: 'uuid' 1590 for col in get_uuid_cols(df) 1591 }, 1592 **{ 1593 col: 'json' 1594 for col in get_json_cols(df) 1595 }, 1596 **{ 1597 col: 'numeric' 1598 for col in get_numeric_cols(df) 1599 }, 1600 **{ 1601 col: 'bytes' 1602 for col in get_bytes_cols(df) 1603 }, 1604 **{ 1605 col: 'datetime64[ns, UTC]' 1606 for col in get_datetime_cols(df, timezone_aware=True, timezone_naive=False) 1607 }, 1608 **{ 1609 col: 'datetime64[ns]' 1610 for col in get_datetime_cols(df, timezone_aware=False, timezone_naive=True) 1611 }, 1612 **pipe.dtypes 1613 } 1614 autoincrement = ( 1615 pipe.parameters.get('autoincrement', False) 1616 or (primary_key and primary_key not in new_dtypes) 1617 ) 1618 if autoincrement: 1619 _ = new_dtypes.pop(primary_key, None) 1620 1621 schema = self.get_pipe_schema(pipe) 1622 create_table_queries = get_create_table_queries( 1623 new_dtypes, 1624 pipe.target, 1625 self.flavor, 1626 schema=schema, 1627 primary_key=primary_key, 1628 primary_key_db_type=primary_key_db_type, 1629 datetime_column=dt_col, 1630 ) 1631 if schema: 1632 create_table_queries = ( 1633 get_create_schema_if_not_exists_queries(schema, self.flavor) 1634 + create_table_queries 1635 ) 1636 success = all( 1637 self.exec_queries(create_table_queries, break_on_error=True, rollback=True, debug=debug) 1638 ) 1639 target_name = sql_item_name(pipe.target, schema=self.get_pipe_schema(pipe), flavor=self.flavor) 1640 msg = ( 1641 "Success" 1642 if success 1643 else f"Failed to create {target_name}." 1644 ) 1645 return success, msg
Create a pipe's table from its configured dtypes and an incoming dataframe.
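Example (a minimal sketch; column dtypes are inferred from the DataFrame and patched with the pipe's configured `dtypes`):
>>> import pandas as pd
>>> df = pd.DataFrame([{'timestamp': pd.Timestamp('2024-01-01'), 'id': 1, 'val': 1.0}])
>>> success, msg = conn.create_pipe_table_from_df(pipe, df)
>>>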
3137def get_pipe_columns_indices( 3138 self, 3139 pipe: mrsm.Pipe, 3140 debug: bool = False, 3141) -> Dict[str, List[Dict[str, str]]]: 3142 """ 3143 Return a dictionary mapping columns to the indices created on those columns. 3144 3145 Parameters 3146 ---------- 3147 pipe: mrsm.Pipe 3148 The pipe to be queried against. 3149 3150 Returns 3151 ------- 3152 A dictionary mapping columns names to lists of dictionaries. 3153 The dictionaries in the lists contain the name and type of the indices. 3154 """ 3155 if pipe.__dict__.get('_skip_check_indices', False): 3156 return {} 3157 from meerschaum.utils.sql import get_table_cols_indices 3158 return get_table_cols_indices( 3159 pipe.target, 3160 self, 3161 flavor=self.flavor, 3162 schema=self.get_pipe_schema(pipe), 3163 debug=debug, 3164 )
Return a dictionary mapping columns to the indices created on those columns.
Parameters
- pipe (mrsm.Pipe): The pipe to be queried against.
Returns
- A dictionary mapping column names to lists of dictionaries. The dictionaries in the lists contain the name and type of the indices.
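Example (output shape only; actual index names and types vary by flavor and configuration):
>>> conn.get_pipe_columns_indices(pipe)
{'timestamp': [{'name': 'IX_weather_timestamp', 'type': 'INDEX'}]}
>>>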
3888@staticmethod 3889def get_temporary_target( 3890 target: str, 3891 transact_id: Optional[str] = None, 3892 label: Optional[str] = None, 3893 separator: Optional[str] = None, 3894) -> str: 3895 """ 3896 Return a unique(ish) temporary target for a pipe. 3897 """ 3898 from meerschaum.utils.misc import generate_password 3899 temp_target_cf = ( 3900 mrsm.get_config('system', 'connectors', 'sql', 'instance', 'temporary_target') or {} 3901 ) 3902 transaction_id_len = temp_target_cf.get('transaction_id_length', 3) 3903 transact_id = transact_id or generate_password(transaction_id_len) 3904 temp_prefix = temp_target_cf.get('prefix', '_') 3905 separator = separator or temp_target_cf.get('separator', '_') 3906 return ( 3907 temp_prefix 3908 + target 3909 + separator 3910 + transact_id 3911 + ((separator + label) if label else '') 3912 )
Return a unique(ish) temporary target for a pipe.
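Example (assuming the default prefix `'_'` and separator `'_'` from the `temporary_target` configuration):
>>> from meerschaum.connectors.sql import SQLConnector
>>> SQLConnector.get_temporary_target('weather', transact_id='abc', label='dedup')
'_weather_abc_dedup'
>>>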
314def create_pipe_indices( 315 self, 316 pipe: mrsm.Pipe, 317 columns: Optional[List[str]] = None, 318 debug: bool = False, 319) -> SuccessTuple: 320 """ 321 Create a pipe's indices. 322 """ 323 success = self.create_indices(pipe, columns=columns, debug=debug) 324 msg = ( 325 "Success" 326 if success 327 else f"Failed to create indices for {pipe}." 328 ) 329 return success, msg
Create a pipe's indices.
368def drop_pipe_indices( 369 self, 370 pipe: mrsm.Pipe, 371 columns: Optional[List[str]] = None, 372 debug: bool = False, 373) -> SuccessTuple: 374 """ 375 Drop a pipe's indices. 376 """ 377 success = self.drop_indices(pipe, columns=columns, debug=debug) 378 msg = ( 379 "Success" 380 if success 381 else f"Failed to drop indices for {pipe}." 382 ) 383 return success, msg
Drop a pipe's indices.
421def get_pipe_index_names(self, pipe: mrsm.Pipe) -> Dict[str, str]: 422 """ 423 Return a dictionary mapping index keys to their names on the database. 424 425 Returns 426 ------- 427 A dictionary of index keys to column names. 428 """ 429 from meerschaum.utils.sql import DEFAULT_SCHEMA_FLAVORS 430 _parameters = pipe.parameters 431 _index_template = _parameters.get('index_template', "IX_{schema_str}{target}_{column_names}") 432 _schema = self.get_pipe_schema(pipe) 433 if _schema is None: 434 _schema = ( 435 DEFAULT_SCHEMA_FLAVORS.get(self.flavor, None) 436 if self.flavor != 'mssql' 437 else None 438 ) 439 schema_str = '' if _schema is None else f'{_schema}_' 440 schema_str = '' 441 _indices = pipe.indices 442 _target = pipe.target 443 _column_names = { 444 ix: ( 445 '_'.join(cols) 446 if isinstance(cols, (list, tuple)) 447 else str(cols) 448 ) 449 for ix, cols in _indices.items() 450 if cols 451 } 452 _index_names = { 453 ix: _index_template.format( 454 target=_target, 455 column_names=column_names, 456 connector_keys=pipe.connector_keys, 457 metric_key=pipe.metric_key, 458 location_key=pipe.location_key, 459 schema_str=schema_str, 460 ) 461 for ix, column_names in _column_names.items() 462 } 463 ### NOTE: Skip any duplicate indices. 464 seen_index_names = {} 465 for ix, index_name in _index_names.items(): 466 if index_name in seen_index_names: 467 continue 468 seen_index_names[index_name] = ix 469 return { 470 ix: index_name 471 for index_name, ix in seen_index_names.items() 472 }
Return a dictionary mapping index keys to their names on the database.
Returns
- A dictionary of index keys to index names.
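Example (using the default template `IX_{schema_str}{target}_{column_names}`; the index keys and target are illustrative):
>>> pipe.indices
{'datetime': 'timestamp', 'id': 'id'}
>>> conn.get_pipe_index_names(pipe)
{'datetime': 'IX_weather_timestamp', 'id': 'IX_weather_id'}
>>>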
17def register_plugin( 18 self, 19 plugin: 'mrsm.core.Plugin', 20 force: bool = False, 21 debug: bool = False, 22 **kw: Any 23) -> SuccessTuple: 24 """Register a new plugin to the plugins table.""" 25 from meerschaum.utils.packages import attempt_import 26 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 27 from meerschaum.utils.sql import json_flavors 28 from meerschaum.connectors.sql.tables import get_tables 29 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 30 31 old_id = self.get_plugin_id(plugin, debug=debug) 32 33 ### Check for version conflict. May be overridden with `--force`. 34 if old_id is not None and not force: 35 old_version = self.get_plugin_version(plugin, debug=debug) 36 new_version = plugin.version 37 if old_version is None: 38 old_version = '' 39 if new_version is None: 40 new_version = '' 41 42 ### verify that the new version is greater than the old 43 packaging_version = attempt_import('packaging.version') 44 if ( 45 old_version and new_version 46 and packaging_version.parse(old_version) >= packaging_version.parse(new_version) 47 ): 48 return False, ( 49 f"Version '{new_version}' of plugin '{plugin}' " + 50 f"must be greater than existing version '{old_version}'." 51 ) 52 53 bind_variables = { 54 'plugin_name': plugin.name, 55 'version': plugin.version, 56 'attributes': ( 57 json.dumps(plugin.attributes) if self.flavor not in json_flavors else plugin.attributes 58 ), 59 'user_id': plugin.user_id, 60 } 61 62 if old_id is None: 63 query = sqlalchemy.insert(plugins_tbl).values(**bind_variables) 64 else: 65 query = ( 66 sqlalchemy.update(plugins_tbl) 67 .values(**bind_variables) 68 .where(plugins_tbl.c.plugin_id == old_id) 69 ) 70 71 result = self.exec(query, debug=debug) 72 if result is None: 73 return False, f"Failed to register plugin '{plugin}'." 74 return True, f"Successfully registered plugin '{plugin}'."
Register a new plugin to the plugins table.
243def delete_plugin( 244 self, 245 plugin: 'mrsm.core.Plugin', 246 debug: bool = False, 247 **kw: Any 248) -> SuccessTuple: 249 """Delete a plugin from the plugins table.""" 250 from meerschaum.utils.packages import attempt_import 251 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 252 from meerschaum.connectors.sql.tables import get_tables 253 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 254 255 plugin_id = self.get_plugin_id(plugin, debug=debug) 256 if plugin_id is None: 257 return True, f"Plugin '{plugin}' was not registered." 258 259 query = sqlalchemy.delete(plugins_tbl).where(plugins_tbl.c.plugin_id == plugin_id) 260 result = self.exec(query, debug=debug) 261 if result is None: 262 return False, f"Failed to delete plugin '{plugin}'." 263 return True, f"Successfully deleted plugin '{plugin}'."
Delete a plugin from the plugins table.
76def get_plugin_id( 77 self, 78 plugin: 'mrsm.core.Plugin', 79 debug: bool = False 80) -> Optional[int]: 81 """ 82 Return a plugin's ID. 83 """ 84 ### ensure plugins table exists 85 from meerschaum.connectors.sql.tables import get_tables 86 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 87 from meerschaum.utils.packages import attempt_import 88 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 89 90 query = ( 91 sqlalchemy 92 .select(plugins_tbl.c.plugin_id) 93 .where(plugins_tbl.c.plugin_name == plugin.name) 94 ) 95 96 try: 97 return int(self.value(query, debug=debug)) 98 except Exception: 99 return None
Return a plugin's ID.
102def get_plugin_version( 103 self, 104 plugin: 'mrsm.core.Plugin', 105 debug: bool = False 106) -> Optional[str]: 107 """ 108 Return a plugin's version. 109 """ 110 ### ensure plugins table exists 111 from meerschaum.connectors.sql.tables import get_tables 112 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 113 from meerschaum.utils.packages import attempt_import 114 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 115 query = sqlalchemy.select(plugins_tbl.c.version).where(plugins_tbl.c.plugin_name == plugin.name) 116 return self.value(query, debug=debug)
Return a plugin's version.
196def get_plugins( 197 self, 198 user_id: Optional[int] = None, 199 search_term: Optional[str] = None, 200 debug: bool = False, 201 **kw: Any 202) -> List[str]: 203 """ 204 Return a list of all registered plugins. 205 206 Parameters 207 ---------- 208 user_id: Optional[int], default None 209 If specified, filter plugins by a specific `user_id`. 210 211 search_term: Optional[str], default None 212 If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins. 213 214 215 Returns 216 ------- 217 A list of plugin names. 218 """ 219 ### ensure plugins table exists 220 from meerschaum.connectors.sql.tables import get_tables 221 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 222 from meerschaum.utils.packages import attempt_import 223 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 224 225 query = sqlalchemy.select(plugins_tbl.c.plugin_name) 226 if user_id is not None: 227 query = query.where(plugins_tbl.c.user_id == user_id) 228 if search_term is not None: 229 query = query.where(plugins_tbl.c.plugin_name.like(search_term + '%')) 230 231 rows = ( 232 self.execute(query).fetchall() 233 if self.flavor != 'duckdb' 234 else [ 235 (row['plugin_name'],) 236 for row in self.read(query).to_dict(orient='records') 237 ] 238 ) 239 240 return [row[0] for row in rows]
Return a list of all registered plugins.
Parameters
- user_id (Optional[int], default None): If specified, filter plugins by a specific `user_id`.
- search_term (Optional[str], default None): If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins.
Returns
- A list of plugin names.
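Example (plugin names are illustrative):
>>> conn.get_plugins()
['noaa', 'color']
>>> conn.get_plugins(search_term='no')
['noaa']
>>>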
118def get_plugin_user_id( 119 self, 120 plugin: 'mrsm.core.Plugin', 121 debug: bool = False 122) -> Optional[int]: 123 """ 124 Return a plugin's user ID. 125 """ 126 ### ensure plugins table exists 127 from meerschaum.connectors.sql.tables import get_tables 128 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 129 from meerschaum.utils.packages import attempt_import 130 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 131 132 query = ( 133 sqlalchemy 134 .select(plugins_tbl.c.user_id) 135 .where(plugins_tbl.c.plugin_name == plugin.name) 136 ) 137 138 try: 139 return int(self.value(query, debug=debug)) 140 except Exception: 141 return None
Return a plugin's user ID.
143def get_plugin_username( 144 self, 145 plugin: 'mrsm.core.Plugin', 146 debug: bool = False 147) -> Optional[str]: 148 """ 149 Return the username of a plugin's owner. 150 """ 151 ### ensure plugins table exists 152 from meerschaum.connectors.sql.tables import get_tables 153 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 154 users = get_tables(mrsm_instance=self, debug=debug)['users'] 155 from meerschaum.utils.packages import attempt_import 156 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 157 158 query = ( 159 sqlalchemy.select(users.c.username) 160 .where( 161 users.c.user_id == plugins_tbl.c.user_id 162 and plugins_tbl.c.plugin_name == plugin.name 163 ) 164 ) 165 166 return self.value(query, debug=debug)
Return the username of a plugin's owner.
169def get_plugin_attributes( 170 self, 171 plugin: 'mrsm.core.Plugin', 172 debug: bool = False 173) -> Dict[str, Any]: 174 """ 175 Return the attributes of a plugin. 176 """ 177 ### ensure plugins table exists 178 from meerschaum.connectors.sql.tables import get_tables 179 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 180 from meerschaum.utils.packages import attempt_import 181 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 182 183 query = ( 184 sqlalchemy 185 .select(plugins_tbl.c.attributes) 186 .where(plugins_tbl.c.plugin_name == plugin.name) 187 ) 188 189 _attr = self.value(query, debug=debug) 190 if isinstance(_attr, str): 191 _attr = json.loads(_attr) 192 elif _attr is None: 193 _attr = {} 194 return _attr
Return the attributes of a plugin.
16def register_user( 17 self, 18 user: mrsm.core.User, 19 debug: bool = False, 20 **kw: Any 21) -> SuccessTuple: 22 """Register a new user.""" 23 from meerschaum.utils.packages import attempt_import 24 from meerschaum.utils.sql import json_flavors 25 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 26 27 valid_tuple = valid_username(user.username) 28 if not valid_tuple[0]: 29 return valid_tuple 30 31 old_id = self.get_user_id(user, debug=debug) 32 33 if old_id is not None: 34 return False, f"User '{user}' already exists." 35 36 ### ensure users table exists 37 from meerschaum.connectors.sql.tables import get_tables 38 tables = get_tables(mrsm_instance=self, debug=debug) 39 40 import json 41 bind_variables = { 42 'username': user.username, 43 'email': user.email, 44 'password_hash': user.password_hash, 45 'user_type': user.type, 46 'attributes': ( 47 json.dumps(user.attributes) 48 if self.flavor not in json_flavors 49 else user.attributes 50 ), 51 } 52 if old_id is not None: 53 return False, f"User '{user.username}' already exists." 54 if old_id is None: 55 query = ( 56 sqlalchemy.insert(tables['users']). 57 values(**bind_variables) 58 ) 59 60 result = self.exec(query, debug=debug) 61 if result is None: 62 return False, f"Failed to register user '{user}'." 63 return True, f"Successfully registered user '{user}'."
Register a new user.
154def get_user_id( 155 self, 156 user: 'mrsm.core.User', 157 debug: bool = False 158) -> Optional[int]: 159 """If a user is registered, return the `user_id`.""" 160 ### ensure users table exists 161 from meerschaum.utils.packages import attempt_import 162 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 163 from meerschaum.connectors.sql.tables import get_tables 164 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 165 166 query = ( 167 sqlalchemy.select(users_tbl.c.user_id) 168 .where(users_tbl.c.username == user.username) 169 ) 170 171 result = self.value(query, debug=debug) 172 if result is not None: 173 return int(result) 174 return None
If a user is registered, return the `user_id`.
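Example (a hedged sketch; assumes a `meerschaum.core.User` built from a username):
>>> from meerschaum.core import User
>>> user = User('foo')
>>> conn.get_user_id(user)  # None if 'foo' is not registered
>>>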
248def get_users( 249 self, 250 debug: bool = False, 251 **kw: Any 252) -> List[str]: 253 """ 254 Get the registered usernames. 255 """ 256 ### ensure users table exists 257 from meerschaum.connectors.sql.tables import get_tables 258 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 259 from meerschaum.utils.packages import attempt_import 260 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 261 262 query = sqlalchemy.select(users_tbl.c.username) 263 264 return list(self.read(query, debug=debug)['username'])
Get the registered usernames.
100def edit_user( 101 self, 102 user: 'mrsm.core.User', 103 debug: bool = False, 104 **kw: Any 105) -> SuccessTuple: 106 """Update an existing user's metadata.""" 107 from meerschaum.utils.packages import attempt_import 108 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 109 from meerschaum.connectors.sql.tables import get_tables 110 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 111 112 user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug) 113 if user_id is None: 114 return False, ( 115 f"User '{user.username}' does not exist. " 116 f"Register user '{user.username}' before editing." 117 ) 118 user.user_id = user_id 119 120 import json 121 valid_tuple = valid_username(user.username) 122 if not valid_tuple[0]: 123 return valid_tuple 124 125 bind_variables = { 126 'user_id' : user_id, 127 'username' : user.username, 128 } 129 if user.password != '': 130 bind_variables['password_hash'] = user.password_hash 131 if user.email != '': 132 bind_variables['email'] = user.email 133 if user.attributes is not None and user.attributes != {}: 134 bind_variables['attributes'] = ( 135 json.dumps(user.attributes) if self.flavor in ('duckdb',) 136 else user.attributes 137 ) 138 if user.type != '': 139 bind_variables['user_type'] = user.type 140 141 query = ( 142 sqlalchemy 143 .update(users_tbl) 144 .values(**bind_variables) 145 .where(users_tbl.c.user_id == user_id) 146 ) 147 148 result = self.exec(query, debug=debug) 149 if result is None: 150 return False, f"Failed to edit user '{user}'." 151 return True, f"Successfully edited user '{user}'."
Update an existing user's metadata.
216def delete_user( 217 self, 218 user: 'mrsm.core.User', 219 debug: bool = False 220) -> SuccessTuple: 221 """Delete a user's record from the users table.""" 222 ### ensure users table exists 223 from meerschaum.connectors.sql.tables import get_tables 224 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 225 plugins = get_tables(mrsm_instance=self, debug=debug)['plugins'] 226 from meerschaum.utils.packages import attempt_import 227 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 228 229 user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug) 230 231 if user_id is None: 232 return False, f"User '{user.username}' is not registered and cannot be deleted." 233 234 query = sqlalchemy.delete(users_tbl).where(users_tbl.c.user_id == user_id) 235 236 result = self.exec(query, debug=debug) 237 if result is None: 238 return False, f"Failed to delete user '{user}'." 239 240 query = sqlalchemy.delete(plugins).where(plugins.c.user_id == user_id) 241 result = self.exec(query, debug=debug) 242 if result is None: 243 return False, f"Failed to delete plugins of user '{user}'." 244 245 return True, f"Successfully deleted user '{user}'"
Delete a user's record from the users table.
267def get_user_password_hash( 268 self, 269 user: 'mrsm.core.User', 270 debug: bool = False, 271 **kw: Any 272) -> Optional[str]: 273 """ 274 Return the password has for a user. 275 **NOTE**: This may be dangerous and is only allowed if the security settings explicity allow it. 276 """ 277 from meerschaum.utils.debug import dprint 278 from meerschaum.connectors.sql.tables import get_tables 279 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 280 from meerschaum.utils.packages import attempt_import 281 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 282 283 if user.user_id is not None: 284 user_id = user.user_id 285 if debug: 286 dprint(f"Already given user_id: {user_id}") 287 else: 288 if debug: 289 dprint("Fetching user_id...") 290 user_id = self.get_user_id(user, debug=debug) 291 292 if user_id is None: 293 return None 294 295 query = sqlalchemy.select(users_tbl.c.password_hash).where(users_tbl.c.user_id == user_id) 296 297 return self.value(query, debug=debug)
Return the password hash for a user. NOTE: This may be dangerous and is only allowed if the security settings explicitly allow it.
300def get_user_type( 301 self, 302 user: 'mrsm.core.User', 303 debug: bool = False, 304 **kw: Any 305) -> Optional[str]: 306 """ 307 Return the user's type. 308 """ 309 from meerschaum.connectors.sql.tables import get_tables 310 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 311 from meerschaum.utils.packages import attempt_import 312 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 313 314 user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug) 315 316 if user_id is None: 317 return None 318 319 query = sqlalchemy.select(users_tbl.c.user_type).where(users_tbl.c.user_id == user_id) 320 321 return self.value(query, debug=debug)
Return the user's type.
176def get_user_attributes( 177 self, 178 user: 'mrsm.core.User', 179 debug: bool = False 180) -> Union[Dict[str, Any], None]: 181 """ 182 Return the user's attributes. 183 """ 184 ### ensure users table exists 185 from meerschaum.utils.warnings import warn 186 from meerschaum.utils.packages import attempt_import 187 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 188 from meerschaum.connectors.sql.tables import get_tables 189 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 190 191 user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug) 192 193 query = ( 194 sqlalchemy.select(users_tbl.c.attributes) 195 .where(users_tbl.c.user_id == user_id) 196 ) 197 198 result = self.value(query, debug=debug) 199 if result is not None and not isinstance(result, dict): 200 try: 201 result = dict(result) 202 _parsed = True 203 except Exception: 204 _parsed = False 205 if not _parsed: 206 try: 207 import json 208 result = json.loads(result) 209 _parsed = True 210 except Exception: 211 _parsed = False 212 if not _parsed: 213 warn(f"Received unexpected type for attributes: {result}") 214 return result
Return the user's attributes.
15@classmethod 16def from_uri( 17 cls, 18 uri: str, 19 label: Optional[str] = None, 20 as_dict: bool = False, 21) -> Union[ 22 'meerschaum.connectors.SQLConnector', 23 Dict[str, Union[str, int]], 24]: 25 """ 26 Create a new SQLConnector from a URI string. 27 28 Parameters 29 ---------- 30 uri: str 31 The URI connection string. 32 33 label: Optional[str], default None 34 If provided, use this as the connector label. 35 Otherwise use the determined database name. 36 37 as_dict: bool, default False 38 If `True`, return a dictionary of the keyword arguments 39 necessary to create a new `SQLConnector`, otherwise create a new object. 40 41 Returns 42 ------- 43 A new SQLConnector object or a dictionary of attributes (if `as_dict` is `True`). 44 """ 45 46 params = cls.parse_uri(uri) 47 params['uri'] = uri 48 flavor = params.get('flavor', None) 49 if not flavor or flavor not in cls.flavor_configs: 50 error(f"Invalid flavor '{flavor}' detected from the provided URI.") 51 52 if 'database' not in params: 53 error("Unable to determine the database from the provided URI.") 54 55 if flavor in ('sqlite', 'duckdb'): 56 if params['database'] == ':memory:': 57 params['label'] = label or f'memory_{flavor}' 58 else: 59 params['label'] = label or params['database'].split(os.path.sep)[-1].lower() 60 else: 61 params['label'] = label or ( 62 ( 63 (params['username'] + '@' if 'username' in params else '') 64 + params.get('host', '') 65 + ('/' if 'host' in params else '') 66 + params.get('database', '') 67 ).lower() 68 ) 69 70 return cls(**params) if not as_dict else params
Create a new SQLConnector from a URI string.
Parameters
- uri (str): The URI connection string.
- label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the determined database name.
- as_dict (bool, default False): If `True`, return a dictionary of the keyword arguments necessary to create a new `SQLConnector`, otherwise create a new object.
Returns
- A new SQLConnector object or a dictionary of attributes (if `as_dict` is `True`).
73@staticmethod 74def parse_uri(uri: str) -> Dict[str, Any]: 75 """ 76 Parse a URI string into a dictionary of parameters. 77 78 Parameters 79 ---------- 80 uri: str 81 The database connection URI. 82 83 Returns 84 ------- 85 A dictionary of attributes. 86 87 Examples 88 -------- 89 >>> parse_uri('sqlite:////home/foo/bar.db') 90 {'database': '/home/foo/bar.db', 'flavor': 'sqlite'} 91 >>> parse_uri( 92 ... 'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439' 93 ... + '/master?driver=ODBC+Driver+17+for+SQL+Server' 94 ... ) 95 {'host': 'localhost', 'database': 'master', 'username': 'sa', 96 'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql', 97 'driver': 'ODBC Driver 17 for SQL Server'} 98 >>> 99 """ 100 from urllib.parse import parse_qs, urlparse 101 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 102 parser = sqlalchemy.engine.url.make_url 103 params = parser(uri).translate_connect_args() 104 params['flavor'] = uri.split(':')[0].split('+')[0] 105 if params['flavor'] == 'postgres': 106 params['flavor'] = 'postgresql' 107 if '?' in uri: 108 parsed_uri = urlparse(uri) 109 for key, value in parse_qs(parsed_uri.query).items(): 110 params.update({key: value[0]}) 111 112 if '--search_path' in params.get('options', ''): 113 params.update({'schema': params['options'].replace('--search_path=', '', 1)}) 114 return params
Parse a URI string into a dictionary of parameters.
Parameters
- uri (str): The database connection URI.
Returns
- A dictionary of attributes.
Examples
>>> parse_uri('sqlite:////home/foo/bar.db')
{'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
>>> parse_uri(
... 'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
... + '/master?driver=ODBC+Driver+17+for+SQL+Server'
... )
{'host': 'localhost', 'database': 'master', 'username': 'sa',
'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
'driver': 'ODBC Driver 17 for SQL Server'}
>>>