meerschaum.connectors.sql
Subpackage for the `SQLConnector` subclass.
class SQLConnector(Connector):
    """
    Connect to SQL databases via `sqlalchemy`.

    SQLConnectors may be used as Meerschaum instance connectors.
    Read more about connectors and instances at
    https://meerschaum.io/reference/connectors/
    """

    IS_INSTANCE: bool = True

    from ._create_engine import flavor_configs, create_engine
    from ._sql import (
        read,
        value,
        exec,
        execute,
        to_sql,
        exec_queries,
        get_connection,
        _cleanup_connections,
    )
    from meerschaum.utils.sql import test_connection
    from ._fetch import fetch, get_pipe_metadef
    from ._cli import cli, _cli_exit
    from ._pipes import (
        fetch_pipes_keys,
        create_indices,
        drop_indices,
        get_create_index_queries,
        get_drop_index_queries,
        get_add_columns_queries,
        get_alter_columns_queries,
        delete_pipe,
        get_pipe_data,
        get_pipe_data_query,
        register_pipe,
        edit_pipe,
        get_pipe_id,
        get_pipe_attributes,
        sync_pipe,
        sync_pipe_inplace,
        get_sync_time,
        pipe_exists,
        get_pipe_rowcount,
        drop_pipe,
        clear_pipe,
        deduplicate_pipe,
        get_pipe_table,
        get_pipe_columns_types,
        get_to_sql_dtype,
        get_pipe_schema,
        create_pipe_table_from_df,
        get_pipe_columns_indices,
        get_temporary_target,
        create_pipe_indices,
        drop_pipe_indices,
        get_pipe_index_names,
    )
    from ._plugins import (
        register_plugin,
        delete_plugin,
        get_plugin_id,
        get_plugin_version,
        get_plugins,
        get_plugin_user_id,
        get_plugin_username,
        get_plugin_attributes,
    )
    from ._users import (
        register_user,
        get_user_id,
        get_users,
        edit_user,
        delete_user,
        get_user_password_hash,
        get_user_type,
        get_user_attributes,
    )
    from ._uri import from_uri, parse_uri
    from ._instance import (
        _log_temporary_tables_creation,
        _drop_temporary_table,
        _drop_temporary_tables,
        _drop_old_temporary_tables,
    )

    ### `__init__`, `engine`, and the other members are shown with their own
    ### source in the sections below.

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, d):
        self.__dict__.update(d)

    def __call__(self):
        return self
Connect to SQL databases via `sqlalchemy`.

SQLConnectors may be used as Meerschaum instance connectors. Read more about connectors and instances at https://meerschaum.io/reference/connectors/
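For example, a connector may be constructed ad hoc by passing attributes directly (a minimal sketch; the credentials are placeholders):

    from meerschaum.connectors.sql import SQLConnector

    ### Build a connector without registering it (placeholder credentials).
    conn = SQLConnector(
        label='example',
        flavor='postgresql',
        username='user',
        password='pass',
        host='localhost',
        port=5432,
        database='db',
    )
    print(conn)  # sql:example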
def __init__(
    self,
    label: Optional[str] = None,
    flavor: Optional[str] = None,
    wait: bool = False,
    connect: bool = False,
    debug: bool = False,
    **kw: Any
):
    """
    Parameters
    ----------
    label: str, default 'main'
        The identifying label for the connector.
        E.g. for `sql:main`, 'main' is the label.

    flavor: Optional[str], default None
        The database flavor, e.g.
        `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc.
        To see supported flavors, run the `bootstrap connectors` command.

    wait: bool, default False
        If `True`, block until a database connection has been made.

    connect: bool, default False
        If `True`, immediately attempt to connect to the database and raise
        a warning if the connection fails.

    debug: bool, default False
        Verbosity toggle.

    kw: Any
        All other arguments will be passed to the connector's attributes.
        Therefore, a connector may be made without being registered,
        as long as enough parameters are supplied to the constructor.
    """
    if 'uri' in kw:
        uri = kw['uri']
        if uri.startswith('postgres') and not uri.startswith('postgresql'):
            uri = uri.replace('postgres', 'postgresql', 1)
        if uri.startswith('postgresql') and not uri.startswith('postgresql+'):
            uri = uri.replace('postgresql://', 'postgresql+psycopg://', 1)
        if uri.startswith('timescaledb://'):
            uri = uri.replace('timescaledb://', 'postgresql+psycopg://', 1)
            flavor = 'timescaledb'
        kw['uri'] = uri
        from_uri_params = self.from_uri(kw['uri'], as_dict=True)
        label = label or from_uri_params.get('label', None)
        _ = from_uri_params.pop('label', None)

        ### Sometimes the flavor may be provided with a URI.
        kw.update(from_uri_params)
        if flavor:
            kw['flavor'] = flavor

    ### Set __dict__ in the base class.
    super().__init__(
        'sql',
        label=label or self.__dict__.get('label', None),
        **kw
    )

    if self.__dict__.get('flavor', None) == 'sqlite':
        self._reset_attributes()
        self._set_attributes(
            'sql',
            label=label,
            inherit_default=False,
            **kw
        )
        ### For backwards compatibility, set the path for sql:local if it's missing.
        if self.label == 'local' and not self.__dict__.get('database', None):
            from meerschaum.config._paths import SQLITE_DB_PATH
            self.database = str(SQLITE_DB_PATH)

    ### Ensure flavor and label are set accordingly.
    if 'flavor' not in self.__dict__:
        if flavor is None and 'uri' not in self.__dict__:
            raise Exception(
                f"Missing flavor. Provide flavor as a key for '{self}'."
            )
        self.flavor = flavor or self.parse_uri(self.__dict__['uri']).get('flavor', None)

    if self.flavor == 'postgres':
        self.flavor = 'postgresql'

    self._debug = debug
    ### Store the PID and thread at initialization
    ### so we can dispose of the Pool in child processes or threads.
    import os
    import threading
    self._pid = os.getpid()
    self._thread_ident = threading.current_thread().ident
    self._sessions = {}
    self._locks = {'_sessions': threading.RLock(), }

    ### Verify the flavor's requirements are met.
    if self.flavor not in self.flavor_configs:
        error(f"Flavor '{self.flavor}' is not supported by Meerschaum SQLConnector")
    if not self.__dict__.get('uri'):
        self.verify_attributes(
            self.flavor_configs[self.flavor].get('requirements', set()),
            debug=debug,
        )

    if wait:
        from meerschaum.connectors.poll import retry_connect
        retry_connect(connector=self, debug=debug)

    if connect:
        if not self.test_connection(debug=debug):
            warn(f"Failed to connect with connector '{self}'!", stack=False)
Parameters
- label (str, default 'main'): The identifying label for the connector. E.g. for `sql:main`, 'main' is the label.
- flavor (Optional[str], default None): The database flavor, e.g. `'sqlite'`, `'postgresql'`, `'cockroachdb'`, etc. To see supported flavors, run the `bootstrap connectors` command.
- wait (bool, default False): If `True`, block until a database connection has been made.
- connect (bool, default False): If `True`, immediately attempt to connect to the database and raise a warning if the connection fails.
- debug (bool, default False): Verbosity toggle.
- kw (Any): All other arguments will be passed to the connector's attributes. Therefore, a connector may be made without being registered, as long as enough parameters are supplied to the constructor.
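As the URI handling above implies, legacy schemes are normalized before parsing. A sketch of the behavior (placeholder credentials):

    from meerschaum.connectors.sql import SQLConnector

    ### 'postgres://' and 'timescaledb://' URIs are rewritten to use
    ### the psycopg driver; 'timescaledb' is kept as the flavor.
    conn = SQLConnector(uri='timescaledb://user:pass@localhost:5432/db')
    print(conn.flavor)  # 'timescaledb'
    print(conn.URI)     # 'postgresql+psycopg://user:pass@localhost:5432/db'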
@property
def Session(self):
    if '_Session' not in self.__dict__:
        if self.engine is None:
            return None

        from meerschaum.utils.packages import attempt_import
        sqlalchemy_orm = attempt_import('sqlalchemy.orm', lazy=False)
        session_factory = sqlalchemy_orm.sessionmaker(self.engine)
        self._Session = sqlalchemy_orm.scoped_session(session_factory)

    return self._Session
@property
def engine(self):
    """
    Return the SQLAlchemy engine connected to the configured database.
    """
    import os
    import threading
    if '_engine' not in self.__dict__:
        self._engine, self._engine_str = self.create_engine(include_uri=True)

    same_process = os.getpid() == self._pid
    same_thread = threading.current_thread().ident == self._thread_ident

    ### Handle child processes.
    if not same_process:
        self._pid = os.getpid()
        self._thread = threading.current_thread()
        warn("Different PID detected. Disposing of connections...")
        self._engine.dispose()

    ### Handle different threads.
    if not same_thread:
        if self.flavor == 'duckdb':
            warn("Different thread detected.")
            self._engine.dispose()

    return self._engine
Return the SQLAlchemy engine connected to the configured database.
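Since this is an ordinary SQLAlchemy engine, it may also be used directly when the connector's higher-level methods don't fit (a sketch, reusing `conn` from the examples above):

    import sqlalchemy

    with conn.engine.connect() as connection:
        result = connection.execute(sqlalchemy.text("SELECT 1"))
        print(result.scalar())  # 1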
@property
def DATABASE_URL(self) -> str:
    """
    Return the URI connection string (an alias for `SQLConnector.URI`).
    """
    _ = self.engine
    return str(self._engine_str)
Return the URI connection string (an alias for `SQLConnector.URI`).
@property
def URI(self) -> str:
    """
    Return the URI connection string.
    """
    _ = self.engine
    return str(self._engine_str)
Return the URI connection string.
@property
def IS_THREAD_SAFE(self) -> bool:
    """
    Return whether this connector may be multithreaded.
    """
    if self.flavor in ('duckdb', 'oracle'):
        return False
    if self.flavor == 'sqlite':
        return ':memory:' not in self.URI
    return True
Return whether this connector may be multithreaded.
@property
def metadata(self):
    """
    Return the metadata bound to this configured schema.
    """
    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    if '_metadata' not in self.__dict__:
        self._metadata = sqlalchemy.MetaData(schema=self.schema)
    return self._metadata
Return the metadata bound to this configured schema.
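Because the `MetaData` object is already bound to `SQLConnector.schema`, tables may be declared against it directly (a sketch; the `events` table is hypothetical):

    import sqlalchemy

    events = sqlalchemy.Table(
        'events',  # hypothetical table name
        conn.metadata,
        sqlalchemy.Column('id', sqlalchemy.Integer, primary_key=True),
        sqlalchemy.Column('payload', sqlalchemy.String),
    )
    events.create(conn.engine, checkfirst=True)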
@property
def instance_schema(self):
    """
    Return the schema name for Meerschaum tables.
    """
    return self.schema
Return the schema name for Meerschaum tables.
@property
def internal_schema(self):
    """
    Return the schema name for internal tables.
    """
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
    schema_name = self.__dict__.get('internal_schema', None) or (
        STATIC_CONFIG['sql']['internal_schema']
        if self.flavor not in NO_SCHEMA_FLAVORS
        else self.schema
    )

    if '_internal_schema' not in self.__dict__:
        self._internal_schema = schema_name
    return self._internal_schema
Return the schema name for internal tables.
@property
def db(self) -> Optional[databases.Database]:
    from meerschaum.utils.packages import attempt_import
    databases = attempt_import('databases', lazy=False, install=True)
    url = self.DATABASE_URL
    if 'mysql' in url:
        url = url.replace('+pymysql', '')
    if '_db' not in self.__dict__:
        try:
            self._db = databases.Database(url)
        except KeyError:
            ### Likely encountered an unsupported flavor.
            from meerschaum.utils.warnings import warn
            self._db = None
    return self._db
@property
def db_version(self) -> Union[str, None]:
    """
    Return the database version.
    """
    _db_version = self.__dict__.get('_db_version', None)
    if _db_version is not None:
        return _db_version

    from meerschaum.utils.sql import get_db_version
    self._db_version = get_db_version(self)
    return self._db_version
Return the database version.
@property
def schema(self) -> Union[str, None]:
    """
    Return the default schema to use.
    A value of `None` will not prepend a schema.
    """
    if 'schema' in self.__dict__:
        return self.__dict__['schema']

    from meerschaum.utils.sql import NO_SCHEMA_FLAVORS
    if self.flavor in NO_SCHEMA_FLAVORS:
        self.__dict__['schema'] = None
        return None

    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
    _schema = sqlalchemy.inspect(self.engine).default_schema_name
    self.__dict__['schema'] = _schema
    return _schema
Return the default schema to use. A value of `None` will not prepend a schema.
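For example (a sketch; the exact default depends on the flavor and database):

    print(conn.schema)  # e.g. 'public' on PostgreSQL

    ### Schema-less flavors (e.g. SQLite) likely fall under NO_SCHEMA_FLAVORS
    ### and return None.
    sqlite_conn = SQLConnector(label='mem', flavor='sqlite', database=':memory:')
    print(sqlite_conn.schema)  # None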
def create_engine(
    self,
    include_uri: bool = False,
    debug: bool = False,
    **kw
) -> 'sqlalchemy.engine.Engine':
    """Create a sqlalchemy engine by building the engine string."""
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import error, warn
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    import urllib
    import copy
    ### Install and patch required drivers.
    if self.flavor in install_flavor_drivers:
        _ = attempt_import(
            *install_flavor_drivers[self.flavor],
            debug=debug,
            lazy=False,
            warn=False,
        )
    if self.flavor == 'mssql':
        pyodbc = attempt_import('pyodbc', debug=debug, lazy=False, warn=False)
        pyodbc.pooling = False
    if self.flavor in require_patching_flavors:
        from meerschaum.utils.packages import determine_version, _monkey_patch_get_distribution
        import pathlib
        for install_name, import_name in require_patching_flavors[self.flavor]:
            pkg = attempt_import(
                import_name,
                debug=debug,
                lazy=False,
                warn=False
            )
            _monkey_patch_get_distribution(
                install_name, determine_version(pathlib.Path(pkg.__file__), venv='mrsm')
            )

    ### Supplement missing values with defaults (e.g. port number).
    for a, value in flavor_configs[self.flavor]['defaults'].items():
        if a not in self.__dict__:
            self.__dict__[a] = value

    ### Verify that everything is in order.
    if self.flavor not in flavor_configs:
        error(f"Cannot create a connector with the flavor '{self.flavor}'.")

    _engine = flavor_configs[self.flavor].get('engine', None)
    _username = self.__dict__.get('username', None)
    _password = self.__dict__.get('password', None)
    _host = self.__dict__.get('host', None)
    _port = self.__dict__.get('port', None)
    _database = self.__dict__.get('database', None)
    _options = self.__dict__.get('options', {})
    if isinstance(_options, str):
        _options = dict(urllib.parse.parse_qsl(_options))
    _uri = self.__dict__.get('uri', None)

    ### Handle registering specific dialects (due to installing in virtual environments).
    if self.flavor in flavor_dialects:
        sqlalchemy.dialects.registry.register(*flavor_dialects[self.flavor])

    ### self._sys_config was deepcopied and can be updated safely.
    if self.flavor in ("sqlite", "duckdb"):
        engine_str = f"{_engine}:///{_database}" if not _uri else _uri
        if 'create_engine' not in self._sys_config:
            self._sys_config['create_engine'] = {}
        if 'connect_args' not in self._sys_config['create_engine']:
            self._sys_config['create_engine']['connect_args'] = {}
        self._sys_config['create_engine']['connect_args'].update({"check_same_thread": False})
    else:
        engine_str = (
            _engine + "://" + (_username if _username is not None else '') +
            ((":" + urllib.parse.quote_plus(_password)) if _password is not None else '') +
            "@" + _host + ((":" + str(_port)) if _port is not None else '') +
            (("/" + _database) if _database is not None else '')
            + (("?" + urllib.parse.urlencode(_options)) if _options else '')
        ) if not _uri else _uri

    ### Sometimes the timescaledb:// flavor can slip in.
    if _uri and self.flavor in ('timescaledb',) and self.flavor in _uri:
        engine_str = engine_str.replace(f'{self.flavor}', 'postgresql', 1)

    if debug:
        dprint(
            (
                (engine_str.replace(':' + _password, ':' + ('*' * len(_password))))
                if _password is not None else engine_str
            ) + '\n' + f"{self._sys_config.get('create_engine', {}).get('connect_args', {})}"
        )

    _kw_copy = copy.deepcopy(kw)

    ### NOTE: Order of inheritance:
    ###   1. Defaults
    ###   2. System configuration
    ###   3. Connector configuration
    ###   4. Keyword arguments
    _create_engine_args = flavor_configs.get(self.flavor, {}).get('create_engine', {})
    def _apply_create_engine_args(update):
        if 'ALL' not in flavor_configs[self.flavor].get('omit_create_engine', {}):
            _create_engine_args.update(
                { k: v for k, v in update.items()
                    if 'omit_create_engine' not in flavor_configs[self.flavor]
                    or k not in flavor_configs[self.flavor].get('omit_create_engine')
                }
            )
    _apply_create_engine_args(self._sys_config.get('create_engine', {}))
    _apply_create_engine_args(self.__dict__.get('create_engine', {}))
    _apply_create_engine_args(_kw_copy)

    try:
        engine = sqlalchemy.create_engine(
            engine_str,
            ### Dynamically parse the config string to separate
            ### the class name (QueuePool)
            ### from the module name (sqlalchemy.pool).
            poolclass=getattr(
                attempt_import(
                    ".".join(self._sys_config['poolclass'].split('.')[:-1])
                ),
                self._sys_config['poolclass'].split('.')[-1]
            ),
            echo=debug,
            **_create_engine_args
        )
    except Exception as e:
        warn(f"Failed to create connector '{self}':\n{traceback.format_exc()}", stack=False)
        engine = None

    if include_uri:
        return engine, engine_str
    return engine
Create a sqlalchemy engine by building the engine string.
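Connector attributes map directly onto the engine string built here; for instance, an `options` dictionary is URL-encoded into the query string (a sketch with placeholder credentials; the exact driver suffix depends on the flavor configuration):

    conn = SQLConnector(
        label='example',
        flavor='postgresql',
        username='user',
        password='pass',
        host='localhost',
        port=5432,
        database='db',
        options={'sslmode': 'require'},
    )
    print(conn.URI)
    ### e.g. postgresql+psycopg://user:pass@localhost:5432/db?sslmode=require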
def read(
    self,
    query_or_table: Union[str, sqlalchemy.Query],
    params: Union[Dict[str, Any], List[str], None] = None,
    dtype: Optional[Dict[str, Any]] = None,
    coerce_float: bool = True,
    chunksize: Optional[int] = -1,
    workers: Optional[int] = None,
    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]] = None,
    as_hook_results: bool = False,
    chunks: Optional[int] = None,
    schema: Optional[str] = None,
    as_chunks: bool = False,
    as_iterator: bool = False,
    as_dask: bool = False,
    index_col: Optional[str] = None,
    silent: bool = False,
    debug: bool = False,
    **kw: Any
) -> Union[
    pandas.DataFrame,
    dask.DataFrame,
    List[pandas.DataFrame],
    List[Any],
    None,
]:
    """
    Read a SQL query or table into a pandas dataframe.

    Parameters
    ----------
    query_or_table: Union[str, sqlalchemy.Query]
        The SQL query (sqlalchemy Query or string) or name of the table from which to select.

    params: Optional[Dict[str, Any]], default None
        `List` or `Dict` of parameters to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html

    dtype: Optional[Dict[str, Any]], default None
        A dictionary of data types to pass to `pandas.read_sql()`.
        See the pandas documentation for more information:
        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html

    chunksize: Optional[int], default -1
        How many chunks to read at a time. `None` will read everything in one large chunk.
        Defaults to system configuration.

        **NOTE:** DuckDB does not allow for chunking.

    workers: Optional[int], default None
        How many threads to use when consuming the generator.
        Only applies if `chunk_hook` is provided.

    chunk_hook: Optional[Callable[[pandas.DataFrame], Any]], default None
        Hook function to execute once per chunk, e.g. writing and reading chunks intermittently.
        See `--sync-chunks` for an example.
        **NOTE:** `as_iterator` MUST be `False` (default).

    as_hook_results: bool, default False
        If `True`, return a `List` of the outputs of the hook function.
        Only applicable if `chunk_hook` is not None.

        **NOTE:** `as_iterator` MUST be `False` (default).

    chunks: Optional[int], default None
        Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and
        return into a single dataframe.
        For example, to limit the returned dataframe to 100,000 rows,
        you could specify a `chunksize` of `1000` and `chunks` of `100`.

    schema: Optional[str], default None
        If just a table name is provided, optionally specify the table schema.
        Defaults to `SQLConnector.schema`.

    as_chunks: bool, default False
        If `True`, return a list of DataFrames.
        Otherwise return a single DataFrame.

    as_iterator: bool, default False
        If `True`, return the pandas DataFrame iterator.
        `chunksize` must not be `None` (falls back to 1000 if so),
        and hooks are not called in this case.

    index_col: Optional[str], default None
        If using Dask, use this column as the index column.
        If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.

    silent: bool, default False
        If `True`, don't raise warnings in case of errors.

    Returns
    -------
    A `pd.DataFrame` (default case), or an iterator, or a list of dataframes / iterators,
    or `None` if something breaks.
    """
    if chunks is not None and chunks <= 0:
        return []
    from meerschaum.utils.sql import sql_item_name, truncate_item_name
    from meerschaum.utils.dtypes import are_dtypes_equal, coerce_timezone
    from meerschaum.utils.dtypes.sql import TIMEZONE_NAIVE_FLAVORS
    from meerschaum.utils.packages import attempt_import, import_pandas
    from meerschaum.utils.pool import get_pool
    from meerschaum.utils.dataframe import chunksize_to_npartitions, get_numeric_cols
    import warnings
    import traceback
    from decimal import Decimal
    pd = import_pandas()
    dd = None
    is_dask = 'dask' in pd.__name__
    pandas = attempt_import('pandas')
    is_dask = dd is not None
    npartitions = chunksize_to_npartitions(chunksize)
    if is_dask:
        chunksize = None
    schema = schema or self.schema
    utc_dt_cols = [
        col
        for col, typ in dtype.items()
        if are_dtypes_equal(typ, 'datetime') and 'utc' in typ.lower()
    ] if dtype else []

    if dtype and utc_dt_cols and self.flavor in TIMEZONE_NAIVE_FLAVORS:
        dtype = dtype.copy()
        for col in utc_dt_cols:
            dtype[col] = 'datetime64[ns]'

    pool = get_pool(workers=workers)
    sqlalchemy = attempt_import("sqlalchemy", lazy=False)
    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is None and as_iterator:
        if not silent and self.flavor not in _disallow_chunks_flavors:
            warn(
                "An iterator may only be generated if chunksize is not None.\n"
                + "Falling back to a chunksize of 1000.", stacklevel=3,
            )
        chunksize = 1000
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel=3,
                )
            chunksize = _max_chunks_flavors[self.flavor]

    if chunksize is not None and self.flavor in _disallow_chunks_flavors:
        chunksize = None

    if debug:
        import time
        start = time.perf_counter()
        dprint(f"[{self}]\n{query_or_table}")
        dprint(f"[{self}] Fetching with chunksize: {chunksize}")

    ### This might be a sqlalchemy object or the string of a table name.
    ### We check for spaces and quotes to see if it might be a weird table.
    if (
        ' ' not in str(query_or_table)
        or (
            ' ' in str(query_or_table)
            and str(query_or_table).startswith('"')
            and str(query_or_table).endswith('"')
        )
    ):
        truncated_table_name = truncate_item_name(str(query_or_table), self.flavor)
        if truncated_table_name != str(query_or_table) and not silent:
            warn(
                f"Table '{query_or_table}' is too long for '{self.flavor}',"
                + f" will instead read the table '{truncated_table_name}'."
            )

        query_or_table = sql_item_name(str(query_or_table), self.flavor, schema)
        if debug:
            dprint(f"[{self}] Reading from table {query_or_table}")
        formatted_query = sqlalchemy.text("SELECT * FROM " + str(query_or_table))
        str_query = f"SELECT * FROM {query_or_table}"
    else:
        str_query = query_or_table

    formatted_query = (
        sqlalchemy.text(str_query)
        if not is_dask and isinstance(str_query, str)
        else format_sql_query_for_dask(str_query)
    )

    chunk_list = []
    chunk_hook_results = []
    def _process_chunk(_chunk, _retry_on_failure: bool = True):
        if self.flavor in TIMEZONE_NAIVE_FLAVORS:
            for col in utc_dt_cols:
                _chunk[col] = coerce_timezone(_chunk[col], strip_timezone=False)
        if not as_hook_results:
            chunk_list.append(_chunk)
        if chunk_hook is None:
            return None

        result = None
        try:
            result = chunk_hook(
                _chunk,
                workers=workers,
                chunksize=chunksize,
                debug=debug,
                **kw
            )
        except Exception:
            result = False, traceback.format_exc()
            from meerschaum.utils.formatting import get_console
            if not silent:
                get_console().print_exception()

        ### If the chunk fails to process, try it again one more time.
        if isinstance(result, tuple) and result[0] is False:
            if _retry_on_failure:
                return _process_chunk(_chunk, _retry_on_failure=False)

        return result

    try:
        stream_results = not as_iterator and chunk_hook is not None and chunksize is not None
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')

            read_sql_query_kwargs = {
                'params': params,
                'dtype': dtype,
                'coerce_float': coerce_float,
                'index_col': index_col,
            }
            if is_dask:
                if index_col is None:
                    dd = None
                    pd = attempt_import('pandas')
                    read_sql_query_kwargs.update({
                        'chunksize': chunksize,
                    })
            else:
                read_sql_query_kwargs.update({
                    'chunksize': chunksize,
                })

            if is_dask and dd is not None:
                ddf = dd.read_sql_query(
                    formatted_query,
                    self.URI,
                    **read_sql_query_kwargs
                )
            else:

                def get_chunk_generator(connectable):
                    chunk_generator = pd.read_sql_query(
                        formatted_query,
                        self.engine,
                        **read_sql_query_kwargs
                    )
                    to_return = (
                        chunk_generator
                        if as_iterator or chunksize is None
                        else (
                            list(pool.imap(_process_chunk, chunk_generator))
                            if as_hook_results
                            else None
                        )
                    )
                    return chunk_generator, to_return

                if self.flavor in SKIP_READ_TRANSACTION_FLAVORS:
                    chunk_generator, to_return = get_chunk_generator(self.engine)
                else:
                    with self.engine.begin() as transaction:
                        with transaction.execution_options(stream_results=stream_results) as connection:
                            chunk_generator, to_return = get_chunk_generator(connection)

                if to_return is not None:
                    return to_return

    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query_or_table}\n\n")
        if not silent:
            warn(str(e), stacklevel=3)
        from meerschaum.utils.formatting import get_console
        if not silent:
            get_console().print_exception()

        return None

    if is_dask and dd is not None:
        ddf = ddf.reset_index()
        return ddf

    chunk_list = []
    read_chunks = 0
    chunk_hook_results = []
    if chunksize is None:
        chunk_list.append(chunk_generator)
    elif as_iterator:
        return chunk_generator
    else:
        try:
            for chunk in chunk_generator:
                if chunk_hook is not None:
                    chunk_hook_results.append(
                        chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                    )
                chunk_list.append(chunk)
                read_chunks += 1
                if chunks is not None and read_chunks >= chunks:
                    break
        except Exception as e:
            warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
            from meerschaum.utils.formatting import get_console
            if not silent:
                get_console().print_exception()

            ### Retry once from the top before giving up.
            read_chunks = 0
            try:
                for chunk in chunk_generator:
                    if chunk_hook is not None:
                        chunk_hook_results.append(
                            chunk_hook(chunk, chunksize=chunksize, debug=debug, **kw)
                        )
                    chunk_list.append(chunk)
                    read_chunks += 1
                    if chunks is not None and read_chunks >= chunks:
                        break
            except Exception as e:
                warn(f"[{self}] Failed to retrieve query results:\n" + str(e), stacklevel=3)
                from meerschaum.utils.formatting import get_console
                if not silent:
                    get_console().print_exception()

                return None

    ### If no chunks returned, read without chunks
    ### to get columns.
    if len(chunk_list) == 0:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', 'case sensitivity issues')
            _ = read_sql_query_kwargs.pop('chunksize', None)
            with self.engine.begin() as connection:
                chunk_list.append(
                    pd.read_sql_query(
                        formatted_query,
                        connection,
                        **read_sql_query_kwargs
                    )
                )

    ### Call the hook on any missed chunks.
    if chunk_hook is not None and len(chunk_list) > len(chunk_hook_results):
        for c in chunk_list[len(chunk_hook_results):]:
            chunk_hook_results.append(
                chunk_hook(c, chunksize=chunksize, debug=debug, **kw)
            )

    ### chunksize is not None so must iterate
    if debug:
        end = time.perf_counter()
        dprint(f"Fetched {len(chunk_list)} chunks in {round(end - start, 2)} seconds.")

    if as_hook_results:
        return chunk_hook_results

    ### Skip `pd.concat()` if `as_chunks` is specified.
    if as_chunks:
        for c in chunk_list:
            c.reset_index(drop=True, inplace=True)
            for col in get_numeric_cols(c):
                c[col] = c[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)
        return chunk_list

    df = pd.concat(chunk_list).reset_index(drop=True)
    ### NOTE: The calls to `canonical()` are to drop leading and trailing zeroes.
    for col in get_numeric_cols(df):
        df[col] = df[col].apply(lambda x: x.canonical() if isinstance(x, Decimal) else x)

    return df
Read a SQL query or table into a pandas dataframe.
Parameters
- query_or_table (Union[str, sqlalchemy.Query]): The SQL query (sqlalchemy Query or string) or name of the table from which to select.
- params (Optional[Dict[str, Any]], default None): `List` or `Dict` of parameters to pass to `pandas.read_sql()`. See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql.html
- dtype (Optional[Dict[str, Any]], default None): A dictionary of data types to pass to `pandas.read_sql()`. See the pandas documentation for more information: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
- chunksize (Optional[int], default -1): How many chunks to read at a time. `None` will read everything in one large chunk. Defaults to system configuration. **NOTE:** DuckDB does not allow for chunking.
- workers (Optional[int], default None): How many threads to use when consuming the generator. Only applies if `chunk_hook` is provided.
- chunk_hook (Optional[Callable[[pandas.DataFrame], Any]], default None): Hook function to execute once per chunk, e.g. writing and reading chunks intermittently. See `--sync-chunks` for an example. **NOTE:** `as_iterator` MUST be `False` (default).
- as_hook_results (bool, default False): If `True`, return a `List` of the outputs of the hook function. Only applicable if `chunk_hook` is not None. **NOTE:** `as_iterator` MUST be `False` (default).
- chunks (Optional[int], default None): Limit the number of chunks to read into memory, i.e. how many chunks to retrieve and return into a single dataframe. For example, to limit the returned dataframe to 100,000 rows, you could specify a `chunksize` of `1000` and `chunks` of `100`.
- schema (Optional[str], default None): If just a table name is provided, optionally specify the table schema. Defaults to `SQLConnector.schema`.
- as_chunks (bool, default False): If `True`, return a list of DataFrames. Otherwise return a single DataFrame.
- as_iterator (bool, default False): If `True`, return the pandas DataFrame iterator. `chunksize` must not be `None` (falls back to 1000 if so), and hooks are not called in this case.
- index_col (Optional[str], default None): If using Dask, use this column as the index column. If omitted, a Pandas DataFrame will be fetched and converted to a Dask DataFrame.
- silent (bool, default False): If `True`, don't raise warnings in case of errors.

Returns
- A `pd.DataFrame` (default case), an iterator, a list of dataframes / iterators, or `None` if something breaks.
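A few common invocations (sketches; `my_table` and the handlers are placeholders):

    ### Read a whole table (SELECT * FROM my_table).
    df = conn.read('my_table')

    ### Use bind variables via `params`.
    df = conn.read("SELECT * FROM my_table WHERE id = :id", params={'id': 1})

    ### Stream chunks lazily instead of concatenating them.
    for chunk in conn.read('my_table', chunksize=1000, as_iterator=True):
        print(len(chunk))

    ### Collect the hook's output for each chunk.
    row_counts = conn.read(
        'my_table',
        chunksize=1000,
        chunk_hook=lambda chunk, **kw: len(chunk),
        as_hook_results=True,
    )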
def value(
    self,
    query: str,
    *args: Any,
    use_pandas: bool = False,
    **kw: Any
) -> Any:
    """
    Execute the provided query and return the first value.

    Parameters
    ----------
    query: str
        The SQL query to execute.

    *args: Any
        The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec`
        if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.

    use_pandas: bool, default False
        If `True`, use `meerschaum.connectors.sql.SQLConnector.read`, otherwise use
        `meerschaum.connectors.sql.SQLConnector.exec` (default).
        **NOTE:** This is always `True` for DuckDB.

    **kw: Any
        See `args`.

    Returns
    -------
    Any value returned from the query.
    """
    from meerschaum.utils.packages import attempt_import
    if self.flavor == 'duckdb':
        use_pandas = True
    if use_pandas:
        try:
            return self.read(query, *args, **kw).iloc[0, 0]
        except Exception:
            return None

    _close = kw.get('close', True)
    _commit = kw.get('commit', (self.flavor != 'mssql'))

    try:
        result, connection = self.exec(
            query,
            *args,
            with_connection=True,
            close=False,
            commit=_commit,
            **kw
        )
        first = result.first() if result is not None else None
        _val = first[0] if first is not None else None
    except Exception as e:
        warn(e, stacklevel=3)
        return None
    if _close:
        try:
            connection.close()
        except Exception as e:
            warn("Failed to close connection with exception:\n" + str(e))
    return _val
Execute the provided query and return the first value.
Parameters
- query (str): The SQL query to execute.
- *args (Any): The arguments passed to `meerschaum.connectors.sql.SQLConnector.exec` if `use_pandas` is `False` (default) or to `meerschaum.connectors.sql.SQLConnector.read`.
- use_pandas (bool, default False): If `True`, use `meerschaum.connectors.sql.SQLConnector.read`, otherwise use `meerschaum.connectors.sql.SQLConnector.exec` (default). **NOTE:** This is always `True` for DuckDB.
- **kw (Any): See `args`.
Returns
- Any value returned from the query.
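For example (a sketch; `my_table` is a placeholder):

    row_count = conn.value("SELECT COUNT(*) FROM my_table")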
def exec(
    self,
    query: str,
    *args: Any,
    silent: bool = False,
    debug: bool = False,
    commit: Optional[bool] = None,
    close: Optional[bool] = None,
    with_connection: bool = False,
    _connection=None,
    _transaction=None,
    **kw: Any
) -> Union[
    sqlalchemy.engine.result.resultProxy,
    sqlalchemy.engine.cursor.LegacyCursorResult,
    Tuple[sqlalchemy.engine.result.resultProxy, sqlalchemy.engine.base.Connection],
    Tuple[sqlalchemy.engine.cursor.LegacyCursorResult, sqlalchemy.engine.base.Connection],
    None
]:
    """
    Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.

    If inserting data, please use bind variables to avoid SQL injection!

    Parameters
    ----------
    query: Union[str, List[str], Tuple[str]]
        The query to execute.
        If `query` is a list or tuple, call `self.exec_queries()` instead.

    args: Any
        Arguments passed to `sqlalchemy.engine.execute`.

    silent: bool, default False
        If `True`, suppress warnings.

    commit: Optional[bool], default None
        If `True`, commit the changes after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    close: Optional[bool], default None
        If `True`, close the connection after execution.
        Causes issues with flavors like `'mssql'`.
        This does not apply if `query` is a list of strings.

    with_connection: bool, default False
        If `True`, return a tuple including the connection object.
        This does not apply if `query` is a list of strings.

    Returns
    -------
    The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
    """
    if isinstance(query, (list, tuple)):
        return self.exec_queries(
            list(query),
            *args,
            silent=silent,
            debug=debug,
            **kw
        )

    from meerschaum.utils.packages import attempt_import
    sqlalchemy = attempt_import("sqlalchemy", lazy=False)
    if debug:
        dprint(f"[{self}] Executing query:\n{query}")

    _close = close if close is not None else (self.flavor != 'mssql')
    _commit = commit if commit is not None else (
        (self.flavor != 'mssql' or 'select' not in str(query).lower())
    )

    ### Select and Insert objects need to be compiled (SQLAlchemy 2.0.0+).
    if not hasattr(query, 'compile'):
        query = sqlalchemy.text(query)

    connection = _connection if _connection is not None else self.get_connection()

    try:
        transaction = (
            _transaction
            if _transaction is not None else (
                connection.begin()
                if _commit
                else None
            )
        )
    except sqlalchemy.exc.InvalidRequestError as e:
        if _connection is not None or _transaction is not None:
            raise e
        connection = self.get_connection(rebuild=True)
        transaction = connection.begin()

    if transaction is not None and not transaction.is_active and _transaction is not None:
        connection = self.get_connection(rebuild=True)
        transaction = connection.begin() if _commit else None

    result = None
    try:
        result = connection.execute(query, *args, **kw)
        if _commit:
            transaction.commit()
    except Exception as e:
        if debug:
            dprint(f"[{self}] Failed to execute query:\n\n{query}\n\n{e}")
        if not silent:
            warn(str(e), stacklevel=3)
        result = None
        if _commit:
            transaction.rollback()
            connection = self.get_connection(rebuild=True)
    finally:
        if _close:
            connection.close()

    if with_connection:
        return result, connection

    return result
Execute SQL code and return the `sqlalchemy` result, e.g. when calling stored procedures.

If inserting data, please use bind variables to avoid SQL injection!

Parameters
- query (Union[str, List[str], Tuple[str]]): The query to execute. If `query` is a list or tuple, call `self.exec_queries()` instead.
- args (Any): Arguments passed to `sqlalchemy.engine.execute`.
- silent (bool, default False): If `True`, suppress warnings.
- commit (Optional[bool], default None): If `True`, commit the changes after execution. Causes issues with flavors like `'mssql'`. This does not apply if `query` is a list of strings.
- close (Optional[bool], default None): If `True`, close the connection after execution. Causes issues with flavors like `'mssql'`. This does not apply if `query` is a list of strings.
- with_connection (bool, default False): If `True`, return a tuple including the connection object. This does not apply if `query` is a list of strings.

Returns
- The `sqlalchemy` result object, or a tuple with the connection if `with_connection` is provided.
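For instance, inserting a row with bind variables instead of string interpolation (a sketch; the table and columns are placeholders):

    result = conn.exec(
        "INSERT INTO my_table (id, name) VALUES (:id, :name)",
        {'id': 1, 'name': 'Alice'},
    )
    if result is None:
        print("The query failed and was rolled back.")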
def execute(
    self,
    *args: Any,
    **kw: Any
) -> Optional[sqlalchemy.engine.result.resultProxy]:
    """
    An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
    """
    return self.exec(*args, **kw)
An alias for `meerschaum.connectors.sql.SQLConnector.exec`.
def to_sql(
    self,
    df: pandas.DataFrame,
    name: str = None,
    index: bool = False,
    if_exists: str = 'replace',
    method: str = "",
    chunksize: Optional[int] = -1,
    schema: Optional[str] = None,
    safe_copy: bool = True,
    silent: bool = False,
    debug: bool = False,
    as_tuple: bool = False,
    as_dict: bool = False,
    _connection=None,
    _transaction=None,
    **kw
) -> Union[bool, SuccessTuple]:
    """
    Upload a DataFrame's contents to the SQL server.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be inserted.

    name: str
        The name of the table to be created.

    index: bool, default False
        If `True`, create the DataFrame's indices as columns.

    if_exists: str, default 'replace'
        Drop and create the table ('replace'), append if it exists
        ('append'), or raise an Exception ('fail').
        Options are ['replace', 'append', 'fail'].

    method: str, default ''
        `None` or `'multi'`. See `pandas.DataFrame.to_sql` for details.

    chunksize: Optional[int], default -1
        How many rows to insert at a time.

    schema: Optional[str], default None
        Optionally override the schema for the table.
        Defaults to `SQLConnector.schema`.

    safe_copy: bool, default True
        If `True`, copy the dataframe before making any changes.

    as_tuple: bool, default False
        If `True`, return a (success_bool, message) tuple instead of a `bool`.

    as_dict: bool, default False
        If `True`, return a dictionary of transaction information.
        The keys are `success`, `msg`, `start`, `end`, `duration`, `num_rows`, `chunksize`,
        `method`, and `target`.

    kw: Any
        Additional arguments will be passed to the DataFrame's `to_sql` function.

    Returns
    -------
    Either a `bool` or a `SuccessTuple` (depends on `as_tuple`).
    """
    import time
    import json
    from datetime import timedelta
    from meerschaum.utils.warnings import error, warn
    import warnings
    import functools

    if name is None:
        error(f"Name must not be `None` to insert data into {self}.")

    ### We're requiring `name` to be positional, and sometimes it's passed in from background jobs.
    kw.pop('name', None)

    schema = schema or self.schema

    from meerschaum.utils.sql import (
        sql_item_name,
        table_exists,
        json_flavors,
        truncate_item_name,
        DROP_IF_EXISTS_FLAVORS,
    )
    from meerschaum.utils.dataframe import (
        get_json_cols,
        get_numeric_cols,
        get_uuid_cols,
        get_bytes_cols,
    )
    from meerschaum.utils.dtypes import (
        are_dtypes_equal,
        coerce_timezone,
        encode_bytes_for_bytea,
        serialize_bytes,
        serialize_decimal,
        json_serialize_value,
    )
    from meerschaum.utils.dtypes.sql import (
        PD_TO_SQLALCHEMY_DTYPES_FLAVORS,
        get_db_type_from_pd_type,
        get_pd_type_from_db_type,
        get_numeric_precision_scale,
    )
    from meerschaum.utils.misc import interval_str
    from meerschaum.connectors.sql._create_engine import flavor_configs
    from meerschaum.utils.packages import attempt_import, import_pandas
    sqlalchemy = attempt_import('sqlalchemy', debug=debug, lazy=False)
    pd = import_pandas()
    is_dask = 'dask' in df.__module__

    bytes_cols = get_bytes_cols(df)
    numeric_cols = get_numeric_cols(df)
    ### NOTE: This excludes non-numeric serialized Decimals (e.g. SQLite).
    numeric_cols_dtypes = {
        col: typ
        for col, typ in kw.get('dtype', {}).items()
        if (
            col in df.columns
            and 'numeric' in str(typ).lower()
        )
    }
    numeric_cols.extend([col for col in numeric_cols_dtypes if col not in numeric_cols])
    numeric_cols_precisions_scales = {
        col: (
            (typ.precision, typ.scale)
            if hasattr(typ, 'precision')
            else get_numeric_precision_scale(self.flavor)
        )
        for col, typ in numeric_cols_dtypes.items()
    }
    cols_pd_types = {
        col: get_pd_type_from_db_type(str(typ))
        for col, typ in kw.get('dtype', {}).items()
    }
    cols_pd_types.update({
        col: f'numeric[{precision},{scale}]'
        for col, (precision, scale) in numeric_cols_precisions_scales.items()
        if precision and scale
    })
    cols_db_types = {
        col: get_db_type_from_pd_type(typ, flavor=self.flavor)
        for col, typ in cols_pd_types.items()
    }

    enable_bulk_insert = mrsm.get_config(
        'system', 'connectors', 'sql', 'bulk_insert'
    ).get(self.flavor, False)
    stats = {'target': name}
    ### Resort to defaults if None.
    copied = False
    use_bulk_insert = False
    if method == "":
        if enable_bulk_insert:
            method = (
                functools.partial(mssql_insert_json, cols_types=cols_db_types, debug=debug)
                if self.flavor == 'mssql'
                else functools.partial(psql_insert_copy, debug=debug)
            )
            use_bulk_insert = True
        else:
            ### Should resolve to 'multi' or `None`.
            method = flavor_configs.get(self.flavor, {}).get('to_sql', {}).get('method', 'multi')

    if bytes_cols and (use_bulk_insert or self.flavor == 'oracle'):
        if safe_copy and not copied:
            df = df.copy()
            copied = True
        bytes_serializer = (
            functools.partial(encode_bytes_for_bytea, with_prefix=(self.flavor != 'oracle'))
            if self.flavor != 'mssql'
            else serialize_bytes
        )
        for col in bytes_cols:
            df[col] = df[col].apply(bytes_serializer)

    ### Check for numeric columns.
    for col in numeric_cols:
        precision, scale = numeric_cols_precisions_scales.get(
            col,
            get_numeric_precision_scale(self.flavor)
        )
        df[col] = df[col].apply(
            functools.partial(
                serialize_decimal,
                quantize=True,
                precision=precision,
                scale=scale,
            )
        )

    stats['method'] = method.__name__ if hasattr(method, '__name__') else str(method)

    default_chunksize = self._sys_config.get('chunksize', None)
    chunksize = chunksize if chunksize != -1 else default_chunksize
    if chunksize is not None and self.flavor in _max_chunks_flavors:
        if chunksize > _max_chunks_flavors[self.flavor]:
            if chunksize != default_chunksize:
                warn(
                    f"The specified chunksize of {chunksize} exceeds the maximum of "
                    + f"{_max_chunks_flavors[self.flavor]} for flavor '{self.flavor}'.\n"
                    + f"    Falling back to a chunksize of {_max_chunks_flavors[self.flavor]}.",
                    stacklevel=3,
                )
            chunksize = _max_chunks_flavors[self.flavor]
    stats['chunksize'] = chunksize

    success, msg = False, "Default to_sql message"
    start = time.perf_counter()
    if debug:
        msg = f"[{self}] Inserting {len(df)} rows with chunksize: {chunksize}..."
        print(msg, end="", flush=True)
    stats['num_rows'] = len(df)

    ### Check if the name is too long.
    truncated_name = truncate_item_name(name, self.flavor)
    if name != truncated_name:
        warn(
            f"Table '{name}' is too long for '{self.flavor}',"
            f" will instead create the table '{truncated_name}'."
        )

    ### Filter out non-pandas args.
    import inspect
    to_sql_params = inspect.signature(df.to_sql).parameters
    to_sql_kw = {}
    for k, v in kw.items():
        if k in to_sql_params:
            to_sql_kw[k] = v

    to_sql_kw.update({
        'name': truncated_name,
        'schema': schema,
        ('con' if not is_dask else 'uri'): (self.engine if not is_dask else self.URI),
        'index': index,
        'if_exists': if_exists,
        'method': method,
        'chunksize': chunksize,
    })
    if is_dask:
        to_sql_kw.update({
            'parallel': True,
        })
    elif _connection is not None:
        to_sql_kw['con'] = _connection

    if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else ""
    if self.flavor == 'oracle':
        ### For some reason 'replace' doesn't work properly in pandas,
        ### so try dropping first.
        if if_exists == 'replace' and table_exists(name, self, schema=schema, debug=debug):
            success = self.exec(
                f"DROP TABLE {if_exists_str}" + sql_item_name(name, 'oracle', schema)
            ) is not None
            if not success:
                warn(f"Unable to drop {name}")

        ### Enforce NVARCHAR(2000) as text instead of CLOB.
        dtype = to_sql_kw.get('dtype', {})
        for col, typ in df.dtypes.items():
            if are_dtypes_equal(str(typ), 'object'):
                dtype[col] = sqlalchemy.types.NVARCHAR(2000)
            elif are_dtypes_equal(str(typ), 'int'):
                dtype[col] = sqlalchemy.types.INTEGER
        to_sql_kw['dtype'] = dtype
    elif self.flavor == 'duckdb':
        dtype = to_sql_kw.get('dtype', {})
        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
        for col in dt_cols:
            df[col] = coerce_timezone(df[col], strip_utc=False)
    elif self.flavor == 'mssql':
        dtype = to_sql_kw.get('dtype', {})
        dt_cols = [col for col, typ in df.dtypes.items() if are_dtypes_equal(str(typ), 'datetime')]
        new_dtype = {}
        for col in dt_cols:
            if col in dtype:
                continue
            dt_typ = get_db_type_from_pd_type(str(df.dtypes[col]), self.flavor, as_sqlalchemy=True)
            if col not in dtype:
                new_dtype[col] = dt_typ

        dtype.update(new_dtype)
        to_sql_kw['dtype'] = dtype

    ### Check for JSON columns.
    if self.flavor not in json_flavors:
        json_cols = get_json_cols(df)
        for col in json_cols:
            df[col] = df[col].apply(
                (
                    lambda x: json.dumps(x, default=json_serialize_value, sort_keys=True)
                    if not isinstance(x, Hashable)
                    else x
                )
            )

    if PD_TO_SQLALCHEMY_DTYPES_FLAVORS['uuid'].get(self.flavor, None) != 'Uuid':
        uuid_cols = get_uuid_cols(df)
        for col in uuid_cols:
            df[col] = df[col].astype(str)

    try:
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore')
            df.to_sql(**to_sql_kw)
        success = True
    except Exception as e:
        if not silent:
            warn(str(e))
        success, msg = False, str(e)

    end = time.perf_counter()
    if success:
        num_rows = len(df)
        msg = (
            f"It took {interval_str(timedelta(seconds=(end - start)))} "
            + f"to sync {num_rows:,} row"
            + ('s' if num_rows != 1 else '')
            + f" to {name}."
        )
    stats['start'] = start
    stats['end'] = end
    stats['duration'] = end - start

    if debug:
        print(" done.", flush=True)
        dprint(msg)

    stats['success'] = success
    stats['msg'] = msg
    if as_tuple:
        return success, msg
    if as_dict:
        return stats
    return success
Upload a DataFrame's contents to the SQL server.
Parameters
- df (pd.DataFrame): The DataFrame to be inserted.
- name (str): The name of the table to be created.
- index (bool, default False): If True, creates the DataFrame's indices as columns.
- if_exists (str, default 'replace'): Drop and create the table ('replace') or append if it exists ('append') or raise Exception ('fail'). Options are ['replace', 'append', 'fail'].
- method (str, default ''): None or 'multi'. See pandas.DataFrame.to_sql for details.
- chunksize (Optional[int], default -1): How many rows to insert at a time.
- schema (Optional[str], default None): Optionally override the schema for the table. Defaults to SQLConnector.schema.
- safe_copy (bool, default True): If True, copy the dataframe before making any changes.
- as_tuple (bool, default False): If True, return a (success_bool, message) tuple instead of a bool. Defaults to False.
- as_dict (bool, default False): If True, return a dictionary of transaction information. The keys are success, msg, start, end, duration, num_rows, chunksize, method, and target.
- kw (Any): Additional arguments will be passed to the DataFrame's to_sql function.
Returns
- Either a bool or a SuccessTuple (depends on as_tuple).
614def exec_queries( 615 self, 616 queries: List[ 617 Union[ 618 str, 619 Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]] 620 ] 621 ], 622 break_on_error: bool = False, 623 rollback: bool = True, 624 silent: bool = False, 625 debug: bool = False, 626) -> List[Union[sqlalchemy.engine.cursor.CursorResult, None]]: 627 """ 628 Execute a list of queries in a single transaction. 629 630 Parameters 631 ---------- 632 queries: List[ 633 Union[ 634 str, 635 Tuple[str, Callable[[], List[str]]] 636 ] 637 ] 638 The queries in the transaction to be executed. 639 If a query is a tuple, the second item of the tuple 640 will be considered a callable hook that returns a list of queries to be executed 641 before the next item in the list. 642 643 break_on_error: bool, default False 644 If `True`, stop executing when a query fails. 645 646 rollback: bool, default True 647 If `break_on_error` is `True`, rollback the transaction if a query fails. 648 649 silent: bool, default False 650 If `True`, suppress warnings. 651 652 Returns 653 ------- 654 A list of SQLAlchemy results. 655 """ 656 from meerschaum.utils.warnings import warn 657 from meerschaum.utils.debug import dprint 658 from meerschaum.utils.packages import attempt_import 659 sqlalchemy, sqlalchemy_orm = attempt_import('sqlalchemy', 'sqlalchemy.orm', lazy=False) 660 session = sqlalchemy_orm.Session(self.engine) 661 662 result = None 663 results = [] 664 with session.begin(): 665 for query in queries: 666 hook = None 667 result = None 668 669 if isinstance(query, tuple): 670 query, hook = query 671 if isinstance(query, str): 672 query = sqlalchemy.text(query) 673 674 if debug: 675 dprint(f"[{self}]\n" + str(query)) 676 677 try: 678 result = session.execute(query) 679 session.flush() 680 except Exception as e: 681 msg = (f"Encountered error while executing:\n{e}") 682 if not silent: 683 warn(msg) 684 elif debug: 685 dprint(f"[{self}]\n" + str(msg)) 686 result = None 687 if result is None and break_on_error: 688 if rollback: 689 session.rollback() 690 results.append(result) 691 break 692 elif result is not None and hook is not None: 693 hook_queries = hook(session) 694 if hook_queries: 695 hook_results = self.exec_queries( 696 hook_queries, 697 break_on_error = break_on_error, 698 rollback=rollback, 699 silent=silent, 700 debug=debug, 701 ) 702 result = (result, hook_results) 703 704 results.append(result) 705 706 return results
Execute a list of queries in a single transaction.
Parameters
- queries (List[Union[str, Tuple[str, Callable[['sqlalchemy.orm.session.Session'], List[str]]]]]): The queries in the transaction to be executed. If a query is a tuple, the second item of the tuple will be considered a callable hook that returns a list of queries to be executed before the next item in the list.
- break_on_error (bool, default False): If True, stop executing when a query fails.
- rollback (bool, default True): If break_on_error is True, rollback the transaction if a query fails.
- silent (bool, default False): If True, suppress warnings.
Returns
- A list of SQLAlchemy results.
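A sketch of a transaction with a follow-up hook (the table and queries are hypothetical):

import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
queries = [
    "CREATE TABLE IF NOT EXISTS example_tbl (id INTEGER)",
    ### The hook receives the in-transaction session and returns more queries
    ### to run before the next item in the list.
    (
        "INSERT INTO example_tbl (id) VALUES (1)",
        lambda session: ["UPDATE example_tbl SET id = 2 WHERE id = 1"],
    ),
]
results = conn.exec_queries(queries, break_on_error=True, debug=True)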
1231def get_connection(self, rebuild: bool = False) -> 'sqlalchemy.engine.base.Connection': 1232 """ 1233 Return the current alive connection. 1234 1235 Parameters 1236 ---------- 1237 rebuild: bool, default False 1238 If `True`, close the previous connection and open a new one. 1239 1240 Returns 1241 ------- 1242 A `sqlalchemy.engine.base.Connection` object. 1243 """ 1244 import threading 1245 if '_thread_connections' not in self.__dict__: 1246 self.__dict__['_thread_connections'] = {} 1247 1248 self._cleanup_connections() 1249 1250 thread_id = threading.get_ident() 1251 1252 thread_connections = self.__dict__.get('_thread_connections', {}) 1253 connection = thread_connections.get(thread_id, None) 1254 1255 if rebuild and connection is not None: 1256 try: 1257 connection.close() 1258 except Exception: 1259 pass 1260 1261 _ = thread_connections.pop(thread_id, None) 1262 connection = None 1263 1264 if connection is None or connection.closed: 1265 connection = self.engine.connect() 1266 thread_connections[thread_id] = connection 1267 1268 return connection
Return the current alive connection.
Parameters
- rebuild (bool, default False): If True, close the previous connection and open a new one.
Returns
- A sqlalchemy.engine.base.Connection object.
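A short sketch (assuming a configured sql:main connector):

import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
connection = conn.get_connection()              ### reuse this thread's live connection
connection = conn.get_connection(rebuild=True)  ### close it and open a fresh one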
707def test_connection( 708 self, 709 **kw: Any 710) -> Union[bool, None]: 711 """ 712 Test if a successful connection to the database may be made. 713 714 Parameters 715 ---------- 716 **kw: 717 The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`. 718 719 Returns 720 ------- 721 `True` if a connection is made, otherwise `False` or `None` in case of failure. 722 723 """ 724 import warnings 725 from meerschaum.connectors.poll import retry_connect 726 _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self} 727 _default_kw.update(kw) 728 with warnings.catch_warnings(): 729 warnings.filterwarnings('ignore', 'Could not') 730 try: 731 return retry_connect(**_default_kw) 732 except Exception: 733 return False
Test if a successful connection to the database may be made.
Parameters
- **kw: The keyword arguments are passed to meerschaum.connectors.poll.retry_connect.
Returns
- True if a connection is made, otherwise False or None in case of failure.
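For example (the retry arguments are forwarded to retry_connect):

import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
if not conn.test_connection(max_retries=2, retry_wait=1):
    print(f"Unable to connect to '{conn}'.")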
18def fetch( 19 self, 20 pipe: mrsm.Pipe, 21 begin: Union[datetime, int, str, None] = '', 22 end: Union[datetime, int, str, None] = None, 23 check_existing: bool = True, 24 chunksize: Optional[int] = -1, 25 workers: Optional[int] = None, 26 debug: bool = False, 27 **kw: Any 28) -> Union['pd.DataFrame', List[Any], None]: 29 """Execute the SQL definition and return a Pandas DataFrame. 30 31 Parameters 32 ---------- 33 pipe: mrsm.Pipe 34 The pipe object which contains the `fetch` metadata. 35 36 - pipe.columns['datetime']: str 37 - Name of the datetime column for the remote table. 38 - pipe.parameters['fetch']: Dict[str, Any] 39 - Parameters necessary to execute a query. 40 - pipe.parameters['fetch']['definition']: str 41 - Raw SQL query to execute to generate the pandas DataFrame. 42 - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float] 43 - How many minutes before `begin` to search for data (*optional*). 44 45 begin: Union[datetime, int, str, None], default None 46 Most recent datatime to search for data. 47 If `backtrack_minutes` is provided, subtract `backtrack_minutes`. 48 49 end: Union[datetime, int, str, None], default None 50 The latest datetime to search for data. 51 If `end` is `None`, do not bound 52 53 check_existing: bool, defult True 54 If `False`, use a backtrack interval of 0 minutes. 55 56 chunksize: Optional[int], default -1 57 How many rows to load into memory at once. 58 Otherwise the entire result set is loaded into memory. 59 60 workers: Optional[int], default None 61 How many threads to use when consuming the generator. 62 Defaults to the number of cores. 63 64 debug: bool, default False 65 Verbosity toggle. 66 67 Returns 68 ------- 69 A pandas DataFrame generator. 70 """ 71 meta_def = self.get_pipe_metadef( 72 pipe, 73 begin=begin, 74 end=end, 75 check_existing=check_existing, 76 debug=debug, 77 **kw 78 ) 79 chunks = self.read( 80 meta_def, 81 chunksize=chunksize, 82 workers=workers, 83 as_iterator=True, 84 debug=debug, 85 ) 86 return chunks
Execute the SQL definition and return a Pandas DataFrame.
Parameters
- pipe (mrsm.Pipe): The pipe object which contains the fetch metadata.
  - pipe.columns['datetime']: str
    - Name of the datetime column for the remote table.
  - pipe.parameters['fetch']: Dict[str, Any]
    - Parameters necessary to execute a query.
  - pipe.parameters['fetch']['definition']: str
    - Raw SQL query to execute to generate the pandas DataFrame.
  - pipe.parameters['fetch']['backtrack_minutes']: Union[int, float]
    - How many minutes before begin to search for data (optional).
- begin (Union[datetime, int, str, None], default None): Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
- end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the search.
- check_existing (bool, default True): If False, use a backtrack interval of 0 minutes.
- chunksize (Optional[int], default -1): How many rows to load into memory at once. Otherwise the entire result set is loaded into memory.
- workers (Optional[int], default None): How many threads to use when consuming the generator. Defaults to the number of cores.
- debug (bool, default False): Verbosity toggle.
Returns
- A pandas DataFrame generator.
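A sketch of fetching from a remote definition (the pipe keys, column names, and query are illustrative assumptions):

import meerschaum as mrsm

conn = mrsm.get_connector('sql:main')
pipe = mrsm.Pipe(
    'sql:main', 'example_metric',
    columns={'datetime': 'dt'},
    parameters={'fetch': {'definition': 'SELECT dt, value FROM remote_table'}},
)
chunks = conn.fetch(pipe, begin='2024-01-01', chunksize=10_000)
for chunk in chunks:
    print(len(chunk))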
89def get_pipe_metadef( 90 self, 91 pipe: mrsm.Pipe, 92 params: Optional[Dict[str, Any]] = None, 93 begin: Union[datetime, int, str, None] = '', 94 end: Union[datetime, int, str, None] = None, 95 check_existing: bool = True, 96 debug: bool = False, 97 **kw: Any 98) -> Union[str, None]: 99 """ 100 Return a pipe's meta definition fetch query. 101 102 params: Optional[Dict[str, Any]], default None 103 Optional params dictionary to build the `WHERE` clause. 104 See `meerschaum.utils.sql.build_where`. 105 106 begin: Union[datetime, int, str, None], default None 107 Most recent datatime to search for data. 108 If `backtrack_minutes` is provided, subtract `backtrack_minutes`. 109 110 end: Union[datetime, int, str, None], default None 111 The latest datetime to search for data. 112 If `end` is `None`, do not bound 113 114 check_existing: bool, default True 115 If `True`, apply the backtrack interval. 116 117 debug: bool, default False 118 Verbosity toggle. 119 120 Returns 121 ------- 122 A pipe's meta definition fetch query string. 123 """ 124 from meerschaum.utils.warnings import warn 125 from meerschaum.utils.sql import sql_item_name, dateadd_str, build_where 126 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 127 from meerschaum.config import get_config 128 129 dt_col = pipe.columns.get('datetime', None) 130 if not dt_col: 131 dt_col = pipe.guess_datetime() 132 dt_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None 133 is_guess = True 134 else: 135 dt_name = sql_item_name(dt_col, self.flavor, None) 136 is_guess = False 137 dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None 138 db_dt_typ = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None 139 140 if begin not in (None, '') or end is not None: 141 if is_guess: 142 if dt_col is None: 143 warn( 144 f"Unable to determine a datetime column for {pipe}." 
145 + "\n Ignoring begin and end...", 146 stack=False, 147 ) 148 begin, end = '', None 149 else: 150 warn( 151 f"A datetime wasn't specified for {pipe}.\n" 152 + f" Using column \"{dt_col}\" for datetime bounds...", 153 stack=False 154 ) 155 156 apply_backtrack = begin == '' and check_existing 157 backtrack_interval = pipe.get_backtrack_interval(check_existing=check_existing, debug=debug) 158 btm = ( 159 int(backtrack_interval.total_seconds() / 60) 160 if isinstance(backtrack_interval, timedelta) 161 else backtrack_interval 162 ) 163 begin = ( 164 pipe.get_sync_time(debug=debug) 165 if begin == '' 166 else begin 167 ) 168 169 if begin not in (None, '') and end is not None and begin >= end: 170 begin = None 171 172 if dt_name: 173 begin_da = ( 174 dateadd_str( 175 flavor=self.flavor, 176 datepart='minute', 177 number=((-1 * btm) if apply_backtrack else 0), 178 begin=begin, 179 db_type=db_dt_typ, 180 ) 181 if begin not in ('', None) 182 else None 183 ) 184 end_da = ( 185 dateadd_str( 186 flavor=self.flavor, 187 datepart='minute', 188 number=0, 189 begin=end, 190 db_type=db_dt_typ, 191 ) 192 if end is not None 193 else None 194 ) 195 196 definition_name = sql_item_name('definition', self.flavor, None) 197 meta_def = ( 198 _simple_fetch_query(pipe, self.flavor) if ( 199 (not (pipe.columns or {}).get('id', None)) 200 or (not get_config('system', 'experimental', 'join_fetch')) 201 ) else _join_fetch_query(pipe, self.flavor, debug=debug, **kw) 202 ) 203 204 has_where = 'where' in meta_def.lower()[meta_def.lower().rfind('definition'):] 205 if dt_name and (begin_da or end_da): 206 definition_dt_name = f"{definition_name}.{dt_name}" 207 meta_def += "\n" + ("AND" if has_where else "WHERE") + " " 208 has_where = True 209 if begin_da: 210 meta_def += f"\n {definition_dt_name}\n >=\n {begin_da}\n" 211 if begin_da and end_da: 212 meta_def += " AND" 213 if end_da: 214 meta_def += f"\n {definition_dt_name}\n <\n {end_da}\n" 215 216 if params is not None: 217 params_where = build_where(params, self, with_where=False) 218 meta_def += "\n " + ("AND" if has_where else "WHERE") + " " 219 has_where = True 220 meta_def += params_where 221 222 return meta_def.rstrip()
Return a pipe's meta definition fetch query.
Parameters
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the WHERE clause. See meerschaum.utils.sql.build_where.
- begin (Union[datetime, int, str, None], default None): Most recent datetime to search for data. If backtrack_minutes is provided, subtract backtrack_minutes.
- end (Union[datetime, int, str, None], default None): The latest datetime to search for data. If end is None, do not bound the search.
- check_existing (bool, default True): If True, apply the backtrack interval.
- debug (bool, default False): Verbosity toggle.
Returns
- A pipe's meta definition fetch query string.
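Continuing the pipe sketch from fetch above, the bounded query may be inspected directly (the params filter is hypothetical):

meta_def = conn.get_pipe_metadef(
    pipe,
    begin='2024-01-01',
    params={'status': 'active'},
    debug=True,
)
print(meta_def)  ### the pipe's definition wrapped in datetime and params bounds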
36def cli( 37 self, 38 debug: bool = False, 39) -> SuccessTuple: 40 """ 41 Launch a subprocess for an interactive CLI. 42 """ 43 from meerschaum.utils.warnings import dprint 44 from meerschaum.utils.venv import venv_exec 45 env = copy.deepcopy(dict(os.environ)) 46 env_key = f"MRSM_SQL_{self.label.upper()}" 47 env_val = json.dumps(self.meta) 48 env[env_key] = env_val 49 cli_code = ( 50 "import sys\n" 51 "import meerschaum as mrsm\n" 52 "import os\n" 53 f"conn = mrsm.get_connector('sql:{self.label}')\n" 54 "success, msg = conn._cli_exit()\n" 55 "mrsm.pprint((success, msg))\n" 56 "if not success:\n" 57 " raise Exception(msg)" 58 ) 59 if debug: 60 dprint(cli_code) 61 try: 62 _ = venv_exec(cli_code, venv=None, env=env, debug=debug, capture_output=False) 63 except Exception as e: 64 return False, f"[{self}] Failed to start CLI:\n{e}" 65 return True, "Success"
Launch a subprocess for an interactive CLI.
144def fetch_pipes_keys( 145 self, 146 connector_keys: Optional[List[str]] = None, 147 metric_keys: Optional[List[str]] = None, 148 location_keys: Optional[List[str]] = None, 149 tags: Optional[List[str]] = None, 150 params: Optional[Dict[str, Any]] = None, 151 debug: bool = False 152) -> Optional[List[Tuple[str, str, Optional[str]]]]: 153 """ 154 Return a list of tuples corresponding to the parameters provided. 155 156 Parameters 157 ---------- 158 connector_keys: Optional[List[str]], default None 159 List of connector_keys to search by. 160 161 metric_keys: Optional[List[str]], default None 162 List of metric_keys to search by. 163 164 location_keys: Optional[List[str]], default None 165 List of location_keys to search by. 166 167 params: Optional[Dict[str, Any]], default None 168 Dictionary of additional parameters to search by. 169 E.g. `--params pipe_id:1` 170 171 debug: bool, default False 172 Verbosity toggle. 173 """ 174 from meerschaum.utils.debug import dprint 175 from meerschaum.utils.packages import attempt_import 176 from meerschaum.utils.misc import separate_negation_values 177 from meerschaum.utils.sql import OMIT_NULLSFIRST_FLAVORS, table_exists 178 from meerschaum.config.static import STATIC_CONFIG 179 import json 180 from copy import deepcopy 181 sqlalchemy, sqlalchemy_sql_functions = attempt_import( 182 'sqlalchemy', 183 'sqlalchemy.sql.functions', lazy=False, 184 ) 185 coalesce = sqlalchemy_sql_functions.coalesce 186 187 if connector_keys is None: 188 connector_keys = [] 189 if metric_keys is None: 190 metric_keys = [] 191 if location_keys is None: 192 location_keys = [] 193 else: 194 location_keys = [ 195 ( 196 lk 197 if lk not in ('[None]', 'None', 'null') 198 else 'None' 199 ) 200 for lk in location_keys 201 ] 202 if tags is None: 203 tags = [] 204 205 if params is None: 206 params = {} 207 208 ### Add three primary keys to params dictionary 209 ### (separated for convenience of arguments). 210 cols = { 211 'connector_keys': [str(ck) for ck in connector_keys], 212 'metric_key': [str(mk) for mk in metric_keys], 213 'location_key': [str(lk) for lk in location_keys], 214 } 215 216 ### Make deep copy so we don't mutate this somewhere else. 217 parameters = deepcopy(params) 218 for col, vals in cols.items(): 219 if vals not in [[], ['*']]: 220 parameters[col] = vals 221 222 if not table_exists('mrsm_pipes', self, schema=self.instance_schema, debug=debug): 223 return [] 224 225 from meerschaum.connectors.sql.tables import get_tables 226 pipes_tbl = get_tables(mrsm_instance=self, create=False, debug=debug)['pipes'] 227 228 _params = {} 229 for k, v in parameters.items(): 230 _v = json.dumps(v) if isinstance(v, dict) else v 231 _params[k] = _v 232 233 negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] 234 ### Parse regular params. 235 ### If a param begins with '_', negate it instead. 
236 _where = [ 237 ( 238 (coalesce(pipes_tbl.c[key], 'None') == val) 239 if not str(val).startswith(negation_prefix) 240 else (pipes_tbl.c[key] != key) 241 ) for key, val in _params.items() 242 if not isinstance(val, (list, tuple)) and key in pipes_tbl.c 243 ] 244 select_cols = ( 245 [ 246 pipes_tbl.c.connector_keys, 247 pipes_tbl.c.metric_key, 248 pipes_tbl.c.location_key, 249 ] 250 ) 251 252 q = sqlalchemy.select(*select_cols).where(sqlalchemy.and_(True, *_where)) 253 for c, vals in cols.items(): 254 if not isinstance(vals, (list, tuple)) or not vals or c not in pipes_tbl.c: 255 continue 256 _in_vals, _ex_vals = separate_negation_values(vals) 257 q = q.where(coalesce(pipes_tbl.c[c], 'None').in_(_in_vals)) if _in_vals else q 258 q = q.where(coalesce(pipes_tbl.c[c], 'None').not_in(_ex_vals)) if _ex_vals else q 259 260 ### Finally, parse tags. 261 tag_groups = [tag.split(',') for tag in tags] 262 in_ex_tag_groups = [separate_negation_values(tag_group) for tag_group in tag_groups] 263 264 ors, nands = [], [] 265 for _in_tags, _ex_tags in in_ex_tag_groups: 266 sub_ands = [] 267 for nt in _in_tags: 268 sub_ands.append( 269 sqlalchemy.cast( 270 pipes_tbl.c['parameters'], 271 sqlalchemy.String, 272 ).like(f'%"tags":%"{nt}"%') 273 ) 274 if sub_ands: 275 ors.append(sqlalchemy.and_(*sub_ands)) 276 277 for xt in _ex_tags: 278 nands.append( 279 sqlalchemy.cast( 280 pipes_tbl.c['parameters'], 281 sqlalchemy.String, 282 ).not_like(f'%"tags":%"{xt}"%') 283 ) 284 285 q = q.where(sqlalchemy.and_(*nands)) if nands else q 286 q = q.where(sqlalchemy.or_(*ors)) if ors else q 287 loc_asc = sqlalchemy.asc(pipes_tbl.c['location_key']) 288 if self.flavor not in OMIT_NULLSFIRST_FLAVORS: 289 loc_asc = sqlalchemy.nullsfirst(loc_asc) 290 q = q.order_by( 291 sqlalchemy.asc(pipes_tbl.c['connector_keys']), 292 sqlalchemy.asc(pipes_tbl.c['metric_key']), 293 loc_asc, 294 ) 295 296 ### execute the query and return a list of tuples 297 if debug: 298 dprint(q.compile(compile_kwargs={'literal_binds': True})) 299 try: 300 rows = ( 301 self.execute(q).fetchall() 302 if self.flavor != 'duckdb' 303 else [ 304 (row['connector_keys'], row['metric_key'], row['location_key']) 305 for row in self.read(q).to_dict(orient='records') 306 ] 307 ) 308 except Exception as e: 309 error(str(e)) 310 311 return [(row[0], row[1], row[2]) for row in rows]
Return a list of tuples corresponding to the parameters provided.
Parameters
- connector_keys (Optional[List[str]], default None): List of connector_keys to search by.
- metric_keys (Optional[List[str]], default None): List of metric_keys to search by.
- location_keys (Optional[List[str]], default None): List of location_keys to search by.
- params (Optional[Dict[str, Any]], default None): Dictionary of additional parameters to search by, e.g. --params pipe_id:1.
- debug (bool, default False): Verbosity toggle.
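For example (the filter values are hypothetical):

keys = conn.fetch_pipes_keys(
    connector_keys=['sql:main'],
    tags=['production'],
    params={'pipe_id': 1},
)
for ck, mk, lk in keys:
    print(ck, mk, lk)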
332def create_indices( 333 self, 334 pipe: mrsm.Pipe, 335 columns: Optional[List[str]] = None, 336 indices: Optional[List[str]] = None, 337 debug: bool = False 338) -> bool: 339 """ 340 Create a pipe's indices. 341 """ 342 from meerschaum.utils.debug import dprint 343 if debug: 344 dprint(f"Creating indices for {pipe}...") 345 346 if not pipe.indices: 347 warn(f"{pipe} has no index columns; skipping index creation.", stack=False) 348 return True 349 350 cols_to_include = set((columns or []) + (indices or [])) or None 351 352 _ = pipe.__dict__.pop('_columns_indices', None) 353 ix_queries = { 354 col: queries 355 for col, queries in self.get_create_index_queries(pipe, debug=debug).items() 356 if cols_to_include is None or col in cols_to_include 357 } 358 success = True 359 for col, queries in ix_queries.items(): 360 ix_success = all(self.exec_queries(queries, debug=debug, silent=False)) 361 success = success and ix_success 362 if not ix_success: 363 warn(f"Failed to create index on column: {col}") 364 365 return success
Create a pipe's indices.
386def drop_indices( 387 self, 388 pipe: mrsm.Pipe, 389 columns: Optional[List[str]] = None, 390 indices: Optional[List[str]] = None, 391 debug: bool = False 392) -> bool: 393 """ 394 Drop a pipe's indices. 395 """ 396 from meerschaum.utils.debug import dprint 397 if debug: 398 dprint(f"Dropping indices for {pipe}...") 399 400 if not pipe.indices: 401 warn(f"No indices to drop for {pipe}.", stack=False) 402 return False 403 404 cols_to_include = set((columns or []) + (indices or [])) or None 405 406 ix_queries = { 407 col: queries 408 for col, queries in self.get_drop_index_queries(pipe, debug=debug).items() 409 if cols_to_include is None or col in cols_to_include 410 } 411 success = True 412 for col, queries in ix_queries.items(): 413 ix_success = all(self.exec_queries(queries, debug=debug, silent=(not debug))) 414 if not ix_success: 415 success = False 416 if debug: 417 dprint(f"Failed to drop index on column: {col}") 418 return success
Drop a pipe's indices.
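A brief sketch of both methods, continuing the pipe sketch above (assuming the pipe's datetime column is named 'dt'; the columns argument restricts which index queries run):

created = conn.create_indices(pipe, columns=['dt'], debug=True)
dropped = conn.drop_indices(pipe, columns=['dt'], debug=True)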
474def get_create_index_queries( 475 self, 476 pipe: mrsm.Pipe, 477 debug: bool = False, 478) -> Dict[str, List[str]]: 479 """ 480 Return a dictionary mapping columns to a `CREATE INDEX` or equivalent query. 481 482 Parameters 483 ---------- 484 pipe: mrsm.Pipe 485 The pipe to which the queries will correspond. 486 487 Returns 488 ------- 489 A dictionary of index names mapping to lists of queries. 490 """ 491 ### NOTE: Due to recent breaking changes in DuckDB, indices don't behave properly. 492 if self.flavor == 'duckdb': 493 return {} 494 from meerschaum.utils.sql import ( 495 sql_item_name, 496 get_distinct_col_count, 497 UPDATE_QUERIES, 498 get_null_replacement, 499 get_create_table_queries, 500 get_rename_table_queries, 501 COALESCE_UNIQUE_INDEX_FLAVORS, 502 ) 503 from meerschaum.utils.dtypes.sql import ( 504 get_db_type_from_pd_type, 505 get_pd_type_from_db_type, 506 AUTO_INCREMENT_COLUMN_FLAVORS, 507 ) 508 from meerschaum.config import get_config 509 index_queries = {} 510 511 upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in UPDATE_QUERIES 512 static = pipe.parameters.get('static', False) 513 null_indices = pipe.parameters.get('null_indices', True) 514 index_names = pipe.get_indices() 515 unique_index_name_unquoted = index_names.get('unique', None) or f'IX_{pipe.target}_unique' 516 if upsert: 517 _ = index_names.pop('unique', None) 518 indices = pipe.indices 519 existing_cols_types = pipe.get_columns_types(debug=debug) 520 existing_cols_pd_types = { 521 col: get_pd_type_from_db_type(typ) 522 for col, typ in existing_cols_types.items() 523 } 524 existing_cols_indices = self.get_pipe_columns_indices(pipe, debug=debug) 525 existing_ix_names = set() 526 existing_primary_keys = [] 527 existing_clustered_primary_keys = [] 528 for col, col_indices in existing_cols_indices.items(): 529 for col_ix_doc in col_indices: 530 existing_ix_names.add(col_ix_doc.get('name', '').lower()) 531 if col_ix_doc.get('type', None) == 'PRIMARY KEY': 532 existing_primary_keys.append(col.lower()) 533 if col_ix_doc.get('clustered', True): 534 existing_clustered_primary_keys.append(col.lower()) 535 536 _datetime = pipe.get_columns('datetime', error=False) 537 _datetime_name = ( 538 sql_item_name(_datetime, self.flavor, None) 539 if _datetime is not None else None 540 ) 541 _datetime_index_name = ( 542 sql_item_name(index_names['datetime'], flavor=self.flavor, schema=None) 543 if index_names.get('datetime', None) 544 else None 545 ) 546 _id = pipe.get_columns('id', error=False) 547 _id_name = ( 548 sql_item_name(_id, self.flavor, None) 549 if _id is not None 550 else None 551 ) 552 primary_key = pipe.columns.get('primary', None) 553 primary_key_name = ( 554 sql_item_name(primary_key, flavor=self.flavor, schema=None) 555 if primary_key 556 else None 557 ) 558 autoincrement = ( 559 pipe.parameters.get('autoincrement', False) 560 or ( 561 primary_key is not None 562 and primary_key not in existing_cols_pd_types 563 ) 564 ) 565 primary_key_db_type = ( 566 get_db_type_from_pd_type(pipe.dtypes.get(primary_key, 'int') or 'int', self.flavor) 567 if primary_key 568 else None 569 ) 570 primary_key_constraint_name = ( 571 sql_item_name(f'PK_{pipe.target}', self.flavor, None) 572 if primary_key is not None 573 else None 574 ) 575 primary_key_clustered = "CLUSTERED" if _datetime is None else "NONCLUSTERED" 576 datetime_clustered = ( 577 "CLUSTERED" 578 if not existing_clustered_primary_keys and _datetime is not None 579 else "NONCLUSTERED" 580 ) 581 include_columns_str = "\n ,".join( 582 [ 583 
sql_item_name(col, flavor=self.flavor) for col in existing_cols_types 584 if col != _datetime 585 ] 586 ).rstrip(',') 587 include_clause = ( 588 ( 589 f"\nINCLUDE (\n {include_columns_str}\n)" 590 ) 591 if datetime_clustered == 'NONCLUSTERED' 592 else '' 593 ) 594 595 _id_index_name = ( 596 sql_item_name(index_names['id'], self.flavor, None) 597 if index_names.get('id', None) 598 else None 599 ) 600 _pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 601 _create_space_partition = get_config('system', 'experimental', 'space') 602 603 ### create datetime index 604 dt_query = None 605 if _datetime is not None: 606 if self.flavor == 'timescaledb' and pipe.parameters.get('hypertable', True): 607 _id_count = ( 608 get_distinct_col_count(_id, f"SELECT {_id_name} FROM {_pipe_name}", self) 609 if (_id is not None and _create_space_partition) else None 610 ) 611 612 chunk_interval = pipe.get_chunk_interval(debug=debug) 613 chunk_interval_minutes = ( 614 chunk_interval 615 if isinstance(chunk_interval, int) 616 else int(chunk_interval.total_seconds() / 60) 617 ) 618 chunk_time_interval = ( 619 f"INTERVAL '{chunk_interval_minutes} MINUTES'" 620 if isinstance(chunk_interval, timedelta) 621 else f'{chunk_interval_minutes}' 622 ) 623 624 dt_query = ( 625 f"SELECT public.create_hypertable('{_pipe_name}', " + 626 f"'{_datetime}', " 627 + ( 628 f"'{_id}', {_id_count}, " if (_id is not None and _create_space_partition) 629 else '' 630 ) 631 + f'chunk_time_interval => {chunk_time_interval}, ' 632 + 'if_not_exists => true, ' 633 + "migrate_data => true);" 634 ) 635 elif _datetime_index_name and _datetime != primary_key: 636 if self.flavor == 'mssql': 637 dt_query = ( 638 f"CREATE {datetime_clustered} INDEX {_datetime_index_name} " 639 f"\nON {_pipe_name} ({_datetime_name}){include_clause}" 640 ) 641 else: 642 dt_query = ( 643 f"CREATE INDEX {_datetime_index_name} " 644 + f"ON {_pipe_name} ({_datetime_name})" 645 ) 646 647 if dt_query: 648 index_queries[_datetime] = [dt_query] 649 650 primary_queries = [] 651 if ( 652 primary_key is not None 653 and primary_key.lower() not in existing_primary_keys 654 and not static 655 ): 656 if autoincrement and primary_key not in existing_cols_pd_types: 657 autoincrement_str = AUTO_INCREMENT_COLUMN_FLAVORS.get( 658 self.flavor, 659 AUTO_INCREMENT_COLUMN_FLAVORS['default'] 660 ) 661 primary_queries.extend([ 662 ( 663 f"ALTER TABLE {_pipe_name}\n" 664 f"ADD {primary_key_name} {primary_key_db_type} {autoincrement_str}" 665 ), 666 ]) 667 elif not autoincrement and primary_key in existing_cols_pd_types: 668 if self.flavor == 'sqlite': 669 new_table_name = sql_item_name( 670 f'_new_{pipe.target}', 671 self.flavor, 672 self.get_pipe_schema(pipe) 673 ) 674 select_cols_str = ', '.join( 675 [ 676 sql_item_name(col, self.flavor, None) 677 for col in existing_cols_types 678 ] 679 ) 680 primary_queries.extend( 681 get_create_table_queries( 682 existing_cols_pd_types, 683 f'_new_{pipe.target}', 684 self.flavor, 685 schema=self.get_pipe_schema(pipe), 686 primary_key=primary_key, 687 ) + [ 688 ( 689 f"INSERT INTO {new_table_name} ({select_cols_str})\n" 690 f"SELECT {select_cols_str}\nFROM {_pipe_name}" 691 ), 692 f"DROP TABLE {_pipe_name}", 693 ] + get_rename_table_queries( 694 f'_new_{pipe.target}', 695 pipe.target, 696 self.flavor, 697 schema=self.get_pipe_schema(pipe), 698 ) 699 ) 700 elif self.flavor == 'oracle': 701 primary_queries.extend([ 702 ( 703 f"ALTER TABLE {_pipe_name}\n" 704 f"MODIFY {primary_key_name} NOT NULL" 705 ), 706 ( 707 f"ALTER TABLE 
{_pipe_name}\n" 708 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})" 709 ) 710 ]) 711 elif self.flavor in ('mysql', 'mariadb'): 712 primary_queries.extend([ 713 ( 714 f"ALTER TABLE {_pipe_name}\n" 715 f"MODIFY {primary_key_name} {primary_key_db_type} NOT NULL" 716 ), 717 ( 718 f"ALTER TABLE {_pipe_name}\n" 719 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})" 720 ) 721 ]) 722 elif self.flavor == 'timescaledb': 723 primary_queries.extend([ 724 ( 725 f"ALTER TABLE {_pipe_name}\n" 726 f"ALTER COLUMN {primary_key_name} SET NOT NULL" 727 ), 728 ( 729 f"ALTER TABLE {_pipe_name}\n" 730 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY (" + ( 731 f"{_datetime_name}, " if _datetime_name else "" 732 ) + f"{primary_key_name})" 733 ), 734 ]) 735 elif self.flavor in ('citus', 'postgresql', 'duckdb'): 736 primary_queries.extend([ 737 ( 738 f"ALTER TABLE {_pipe_name}\n" 739 f"ALTER COLUMN {primary_key_name} SET NOT NULL" 740 ), 741 ( 742 f"ALTER TABLE {_pipe_name}\n" 743 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY ({primary_key_name})" 744 ), 745 ]) 746 else: 747 primary_queries.extend([ 748 ( 749 f"ALTER TABLE {_pipe_name}\n" 750 f"ALTER COLUMN {primary_key_name} {primary_key_db_type} NOT NULL" 751 ), 752 ( 753 f"ALTER TABLE {_pipe_name}\n" 754 f"ADD CONSTRAINT {primary_key_constraint_name} PRIMARY KEY {primary_key_clustered} ({primary_key_name})" 755 ), 756 ]) 757 index_queries[primary_key] = primary_queries 758 759 ### create id index 760 if _id_name is not None: 761 if self.flavor == 'timescaledb': 762 ### Already created indices via create_hypertable. 763 id_query = ( 764 None if (_id is not None and _create_space_partition) 765 else ( 766 f"CREATE INDEX IF NOT EXISTS {_id_index_name} ON {_pipe_name} ({_id_name})" 767 if _id is not None 768 else None 769 ) 770 ) 771 pass 772 else: ### mssql, sqlite, etc. 773 id_query = f"CREATE INDEX {_id_index_name} ON {_pipe_name} ({_id_name})" 774 775 if id_query is not None: 776 index_queries[_id] = id_query if isinstance(id_query, list) else [id_query] 777 778 ### Create indices for other labels in `pipe.columns`. 
779 other_index_names = { 780 ix_key: ix_unquoted 781 for ix_key, ix_unquoted in index_names.items() 782 if ( 783 ix_key not in ('datetime', 'id', 'primary') 784 and ix_unquoted.lower() not in existing_ix_names 785 ) 786 } 787 for ix_key, ix_unquoted in other_index_names.items(): 788 ix_name = sql_item_name(ix_unquoted, self.flavor, None) 789 cols = indices[ix_key] 790 if not isinstance(cols, (list, tuple)): 791 cols = [cols] 792 if ix_key == 'unique' and upsert: 793 continue 794 cols_names = [sql_item_name(col, self.flavor, None) for col in cols if col] 795 if not cols_names: 796 continue 797 cols_names_str = ", ".join(cols_names) 798 index_queries[ix_key] = [f"CREATE INDEX {ix_name} ON {_pipe_name} ({cols_names_str})"] 799 800 indices_cols_str = ', '.join( 801 list({ 802 sql_item_name(ix, self.flavor) 803 for ix_key, ix in pipe.columns.items() 804 if ix and ix in existing_cols_types 805 }) 806 ) 807 coalesce_indices_cols_str = ', '.join( 808 [ 809 ( 810 ( 811 "COALESCE(" 812 + sql_item_name(ix, self.flavor) 813 + ", " 814 + get_null_replacement(existing_cols_types[ix], self.flavor) 815 + ") " 816 ) 817 if ix_key != 'datetime' and null_indices 818 else sql_item_name(ix, self.flavor) 819 ) 820 for ix_key, ix in pipe.columns.items() 821 if ix and ix in existing_cols_types 822 ] 823 ) 824 unique_index_name = sql_item_name(unique_index_name_unquoted, self.flavor) 825 constraint_name_unquoted = unique_index_name_unquoted.replace('IX_', 'UQ_') 826 constraint_name = sql_item_name(constraint_name_unquoted, self.flavor) 827 add_constraint_query = ( 828 f"ALTER TABLE {_pipe_name} ADD CONSTRAINT {constraint_name} UNIQUE ({indices_cols_str})" 829 ) 830 unique_index_cols_str = ( 831 indices_cols_str 832 if self.flavor not in COALESCE_UNIQUE_INDEX_FLAVORS or not null_indices 833 else coalesce_indices_cols_str 834 ) 835 create_unique_index_query = ( 836 f"CREATE UNIQUE INDEX {unique_index_name} ON {_pipe_name} ({unique_index_cols_str})" 837 ) 838 constraint_queries = [create_unique_index_query] 839 if self.flavor != 'sqlite': 840 constraint_queries.append(add_constraint_query) 841 if upsert and indices_cols_str: 842 index_queries[unique_index_name] = constraint_queries 843 return index_queries
Return a dictionary mapping columns to a CREATE INDEX or equivalent query.
Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
- A dictionary of index names mapping to lists of queries.
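To inspect the generated queries without executing them, something like:

index_queries = conn.get_create_index_queries(pipe)
for ix_name, queries in index_queries.items():
    for query in queries:
        print(f"-- {ix_name}\n{query}")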
846def get_drop_index_queries( 847 self, 848 pipe: mrsm.Pipe, 849 debug: bool = False, 850) -> Dict[str, List[str]]: 851 """ 852 Return a dictionary mapping columns to a `DROP INDEX` or equivalent query. 853 854 Parameters 855 ---------- 856 pipe: mrsm.Pipe 857 The pipe to which the queries will correspond. 858 859 Returns 860 ------- 861 A dictionary of column names mapping to lists of queries. 862 """ 863 ### NOTE: Due to breaking changes within DuckDB, indices must be skipped. 864 if self.flavor == 'duckdb': 865 return {} 866 if not pipe.exists(debug=debug): 867 return {} 868 869 from collections import defaultdict 870 from meerschaum.utils.sql import ( 871 sql_item_name, 872 table_exists, 873 hypertable_queries, 874 DROP_INDEX_IF_EXISTS_FLAVORS, 875 ) 876 drop_queries = defaultdict(lambda: []) 877 schema = self.get_pipe_schema(pipe) 878 index_schema = schema if self.flavor != 'mssql' else None 879 indices = { 880 ix_key: ix 881 for ix_key, ix in pipe.get_indices().items() 882 } 883 cols_indices = pipe.get_columns_indices(debug=debug) 884 existing_indices = set() 885 clustered_ix = None 886 for col, ix_metas in cols_indices.items(): 887 for ix_meta in ix_metas: 888 ix_name = ix_meta.get('name', None) 889 if ix_meta.get('clustered', False): 890 clustered_ix = ix_name 891 existing_indices.add(ix_name.lower()) 892 pipe_name = sql_item_name(pipe.target, self.flavor, schema) 893 pipe_name_no_schema = sql_item_name(pipe.target, self.flavor, None) 894 upsert = pipe.upsert 895 896 if self.flavor not in hypertable_queries: 897 is_hypertable = False 898 else: 899 is_hypertable_query = hypertable_queries[self.flavor].format(table_name=pipe_name) 900 is_hypertable = self.value(is_hypertable_query, silent=True, debug=debug) is not None 901 902 if_exists_str = "IF EXISTS " if self.flavor in DROP_INDEX_IF_EXISTS_FLAVORS else "" 903 if is_hypertable: 904 nuke_queries = [] 905 temp_table = '_' + pipe.target + '_temp_migration' 906 temp_table_name = sql_item_name(temp_table, self.flavor, self.get_pipe_schema(pipe)) 907 908 if table_exists(temp_table, self, schema=self.get_pipe_schema(pipe), debug=debug): 909 nuke_queries.append(f"DROP TABLE {if_exists_str} {temp_table_name}") 910 nuke_queries += [ 911 f"SELECT * INTO {temp_table_name} FROM {pipe_name}", 912 f"DROP TABLE {if_exists_str}{pipe_name}", 913 f"ALTER TABLE {temp_table_name} RENAME TO {pipe_name_no_schema}", 914 ] 915 nuke_ix_keys = ('datetime', 'id') 916 nuked = False 917 for ix_key in nuke_ix_keys: 918 if ix_key in indices and not nuked: 919 drop_queries[ix_key].extend(nuke_queries) 920 nuked = True 921 922 for ix_key, ix_unquoted in indices.items(): 923 if ix_key in drop_queries: 924 continue 925 if ix_unquoted.lower() not in existing_indices: 926 continue 927 928 if ix_key == 'unique' and upsert and self.flavor not in ('sqlite',) and not is_hypertable: 929 constraint_name_unquoted = ix_unquoted.replace('IX_', 'UQ_') 930 constraint_name = sql_item_name(constraint_name_unquoted, self.flavor) 931 constraint_or_index = ( 932 "CONSTRAINT" 933 if self.flavor not in ('mysql', 'mariadb') 934 else 'INDEX' 935 ) 936 drop_queries[ix_key].append( 937 f"ALTER TABLE {pipe_name}\n" 938 f"DROP {constraint_or_index} {constraint_name}" 939 ) 940 941 query = ( 942 ( 943 f"ALTER TABLE {pipe_name}\n" 944 if self.flavor in ('mysql', 'mariadb') 945 else '' 946 ) 947 + f"DROP INDEX {if_exists_str}" 948 + sql_item_name(ix_unquoted, self.flavor, index_schema) 949 ) 950 if self.flavor == 'mssql': 951 query += f"\nON {pipe_name}" 952 if ix_unquoted == clustered_ix: 953 
query += "\nWITH (ONLINE = ON, MAXDOP = 4)" 954 drop_queries[ix_key].append(query) 955 956 957 return drop_queries
Return a dictionary mapping columns to a DROP INDEX or equivalent query.
Parameters
- pipe (mrsm.Pipe): The pipe to which the queries will correspond.
Returns
- A dictionary of column names mapping to lists of queries.
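These queries pair naturally with exec_queries, e.g.:

drop_queries = conn.get_drop_index_queries(pipe)
flat_queries = [query for queries in drop_queries.values() for query in queries]
results = conn.exec_queries(flat_queries, debug=True)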
3123def get_add_columns_queries( 3124 self, 3125 pipe: mrsm.Pipe, 3126 df: Union[pd.DataFrame, Dict[str, str]], 3127 _is_db_types: bool = False, 3128 debug: bool = False, 3129) -> List[str]: 3130 """ 3131 Add new null columns of the correct type to a table from a dataframe. 3132 3133 Parameters 3134 ---------- 3135 pipe: mrsm.Pipe 3136 The pipe to be altered. 3137 3138 df: Union[pd.DataFrame, Dict[str, str]] 3139 The pandas DataFrame which contains new columns. 3140 If a dictionary is provided, assume it maps columns to Pandas data types. 3141 3142 _is_db_types: bool, default False 3143 If `True`, assume `df` is a dictionary mapping columns to SQL native dtypes. 3144 3145 Returns 3146 ------- 3147 A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector. 3148 """ 3149 if not pipe.exists(debug=debug): 3150 return [] 3151 3152 if pipe.parameters.get('static', False): 3153 return [] 3154 3155 from decimal import Decimal 3156 import copy 3157 from meerschaum.utils.sql import ( 3158 sql_item_name, 3159 SINGLE_ALTER_TABLE_FLAVORS, 3160 get_table_cols_types, 3161 ) 3162 from meerschaum.utils.dtypes.sql import ( 3163 get_pd_type_from_db_type, 3164 get_db_type_from_pd_type, 3165 ) 3166 from meerschaum.utils.misc import flatten_list 3167 table_obj = self.get_pipe_table(pipe, debug=debug) 3168 is_dask = 'dask' in df.__module__ if not isinstance(df, dict) else False 3169 if is_dask: 3170 df = df.partitions[0].compute() 3171 df_cols_types = ( 3172 { 3173 col: str(typ) 3174 for col, typ in df.dtypes.items() 3175 } 3176 if not isinstance(df, dict) 3177 else copy.deepcopy(df) 3178 ) 3179 if not isinstance(df, dict) and len(df.index) > 0: 3180 for col, typ in list(df_cols_types.items()): 3181 if typ != 'object': 3182 continue 3183 val = df.iloc[0][col] 3184 if isinstance(val, (dict, list)): 3185 df_cols_types[col] = 'json' 3186 elif isinstance(val, Decimal): 3187 df_cols_types[col] = 'numeric' 3188 elif isinstance(val, str): 3189 df_cols_types[col] = 'str' 3190 db_cols_types = { 3191 col: get_pd_type_from_db_type(str(typ.type)) 3192 for col, typ in table_obj.columns.items() 3193 } if table_obj is not None else { 3194 col: get_pd_type_from_db_type(typ) 3195 for col, typ in get_table_cols_types( 3196 pipe.target, 3197 self, 3198 schema=self.get_pipe_schema(pipe), 3199 debug=debug, 3200 ).items() 3201 } 3202 new_cols = set(df_cols_types) - set(db_cols_types) 3203 if not new_cols: 3204 return [] 3205 3206 new_cols_types = { 3207 col: get_db_type_from_pd_type( 3208 df_cols_types[col], 3209 self.flavor 3210 ) 3211 for col in new_cols 3212 if col and df_cols_types.get(col, None) 3213 } 3214 3215 alter_table_query = "ALTER TABLE " + sql_item_name( 3216 pipe.target, self.flavor, self.get_pipe_schema(pipe) 3217 ) 3218 queries = [] 3219 for col, typ in new_cols_types.items(): 3220 add_col_query = ( 3221 "\nADD " 3222 + sql_item_name(col, self.flavor, None) 3223 + " " + typ + "," 3224 ) 3225 3226 if self.flavor in SINGLE_ALTER_TABLE_FLAVORS: 3227 queries.append(alter_table_query + add_col_query[:-1]) 3228 else: 3229 alter_table_query += add_col_query 3230 3231 ### For most flavors, only one query is required. 3232 ### This covers SQLite which requires one query per column. 3233 if not queries: 3234 queries.append(alter_table_query[:-1]) 3235 3236 if self.flavor != 'duckdb': 3237 return queries 3238 3239 ### NOTE: For DuckDB, we must drop and rebuild the indices. 
3240 drop_index_queries = list(flatten_list( 3241 [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()] 3242 )) 3243 create_index_queries = list(flatten_list( 3244 [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()] 3245 )) 3246 3247 return drop_index_queries + queries + create_index_queries
Add new null columns of the correct type to a table from a dataframe.
Parameters
- pipe (mrsm.Pipe): The pipe to be altered.
- df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which contains new columns. If a dictionary is provided, assume it maps columns to Pandas data types.
- _is_db_types (bool, default False): If True, assume df is a dictionary mapping columns to SQL native dtypes.
Returns
- A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.
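A sketch with a hypothetical new column, continuing the pipe sketch above:

import pandas as pd

new_df = pd.DataFrame({'dt': ['2024-01-01'], 'new_col': [1.5]})
alter_queries = conn.get_add_columns_queries(pipe, new_df)
if alter_queries:
    conn.exec_queries(alter_queries, break_on_error=True)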
3250def get_alter_columns_queries( 3251 self, 3252 pipe: mrsm.Pipe, 3253 df: Union[pd.DataFrame, Dict[str, str]], 3254 debug: bool = False, 3255) -> List[str]: 3256 """ 3257 If we encounter a column of a different type, set the entire column to text. 3258 If the altered columns are numeric, alter to numeric instead. 3259 3260 Parameters 3261 ---------- 3262 pipe: mrsm.Pipe 3263 The pipe to be altered. 3264 3265 df: Union[pd.DataFrame, Dict[str, str]] 3266 The pandas DataFrame which may contain altered columns. 3267 If a dict is provided, assume it maps columns to Pandas data types. 3268 3269 Returns 3270 ------- 3271 A list of the `ALTER TABLE` SQL query or queries to be executed on the provided connector. 3272 """ 3273 if not pipe.exists(debug=debug): 3274 return [] 3275 if pipe.static: 3276 return 3277 from meerschaum.utils.sql import ( 3278 sql_item_name, 3279 get_table_cols_types, 3280 DROP_IF_EXISTS_FLAVORS, 3281 SINGLE_ALTER_TABLE_FLAVORS, 3282 ) 3283 from meerschaum.utils.dataframe import get_numeric_cols 3284 from meerschaum.utils.dtypes import are_dtypes_equal 3285 from meerschaum.utils.dtypes.sql import ( 3286 get_pd_type_from_db_type, 3287 get_db_type_from_pd_type, 3288 ) 3289 from meerschaum.utils.misc import flatten_list, generate_password, items_str 3290 table_obj = self.get_pipe_table(pipe, debug=debug) 3291 target = pipe.target 3292 session_id = generate_password(3) 3293 numeric_cols = ( 3294 get_numeric_cols(df) 3295 if not isinstance(df, dict) 3296 else [ 3297 col 3298 for col, typ in df.items() 3299 if typ.startswith('numeric') 3300 ] 3301 ) 3302 df_cols_types = ( 3303 { 3304 col: str(typ) 3305 for col, typ in df.dtypes.items() 3306 } 3307 if not isinstance(df, dict) 3308 else df 3309 ) 3310 db_cols_types = { 3311 col: get_pd_type_from_db_type(str(typ.type)) 3312 for col, typ in table_obj.columns.items() 3313 } if table_obj is not None else { 3314 col: get_pd_type_from_db_type(typ) 3315 for col, typ in get_table_cols_types( 3316 pipe.target, 3317 self, 3318 schema=self.get_pipe_schema(pipe), 3319 debug=debug, 3320 ).items() 3321 } 3322 pipe_bool_cols = [col for col, typ in pipe.dtypes.items() if are_dtypes_equal(str(typ), 'bool')] 3323 pd_db_df_aliases = { 3324 'int': 'bool', 3325 'float': 'bool', 3326 'numeric': 'bool', 3327 'guid': 'object', 3328 } 3329 if self.flavor == 'oracle': 3330 pd_db_df_aliases['int'] = 'numeric' 3331 3332 altered_cols = { 3333 col: (db_cols_types.get(col, 'object'), typ) 3334 for col, typ in df_cols_types.items() 3335 if not are_dtypes_equal(typ, db_cols_types.get(col, 'object').lower()) 3336 and not are_dtypes_equal(db_cols_types.get(col, 'object'), 'string') 3337 } 3338 3339 ### NOTE: Sometimes bools are coerced into ints or floats. 3340 altered_cols_to_ignore = set() 3341 for col, (db_typ, df_typ) in altered_cols.items(): 3342 for db_alias, df_alias in pd_db_df_aliases.items(): 3343 if db_alias in db_typ.lower() and df_alias in df_typ.lower(): 3344 altered_cols_to_ignore.add(col) 3345 3346 ### Oracle's bool handling sometimes mixes NUMBER and INT. 
3347 for bool_col in pipe_bool_cols: 3348 if bool_col not in altered_cols: 3349 continue 3350 db_is_bool_compatible = ( 3351 are_dtypes_equal('int', altered_cols[bool_col][0]) 3352 or are_dtypes_equal('float', altered_cols[bool_col][0]) 3353 or are_dtypes_equal('numeric', altered_cols[bool_col][0]) 3354 or are_dtypes_equal('bool', altered_cols[bool_col][0]) 3355 ) 3356 df_is_bool_compatible = ( 3357 are_dtypes_equal('int', altered_cols[bool_col][1]) 3358 or are_dtypes_equal('float', altered_cols[bool_col][1]) 3359 or are_dtypes_equal('numeric', altered_cols[bool_col][1]) 3360 or are_dtypes_equal('bool', altered_cols[bool_col][1]) 3361 ) 3362 if db_is_bool_compatible and df_is_bool_compatible: 3363 altered_cols_to_ignore.add(bool_col) 3364 3365 for col in altered_cols_to_ignore: 3366 _ = altered_cols.pop(col, None) 3367 if not altered_cols: 3368 return [] 3369 3370 if numeric_cols: 3371 pipe.dtypes.update({col: 'numeric' for col in numeric_cols}) 3372 edit_success, edit_msg = pipe.edit(debug=debug) 3373 if not edit_success: 3374 warn( 3375 f"Failed to update dtypes for numeric columns {items_str(numeric_cols)}:\n" 3376 + f"{edit_msg}" 3377 ) 3378 else: 3379 numeric_cols.extend([col for col, typ in pipe.dtypes.items() if typ.startswith('numeric')]) 3380 3381 numeric_type = get_db_type_from_pd_type('numeric', self.flavor, as_sqlalchemy=False) 3382 text_type = get_db_type_from_pd_type('str', self.flavor, as_sqlalchemy=False) 3383 altered_cols_types = { 3384 col: ( 3385 numeric_type 3386 if col in numeric_cols 3387 else text_type 3388 ) 3389 for col, (db_typ, typ) in altered_cols.items() 3390 } 3391 3392 if self.flavor == 'sqlite': 3393 temp_table_name = '-' + session_id + '_' + target 3394 rename_query = ( 3395 "ALTER TABLE " 3396 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3397 + " RENAME TO " 3398 + sql_item_name(temp_table_name, self.flavor, None) 3399 ) 3400 create_query = ( 3401 "CREATE TABLE " 3402 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3403 + " (\n" 3404 ) 3405 for col_name, col_obj in table_obj.columns.items(): 3406 create_query += ( 3407 sql_item_name(col_name, self.flavor, None) 3408 + " " 3409 + ( 3410 str(col_obj.type) 3411 if col_name not in altered_cols 3412 else altered_cols_types[col_name] 3413 ) 3414 + ",\n" 3415 ) 3416 create_query = create_query[:-2] + "\n)" 3417 3418 insert_query = ( 3419 "INSERT INTO " 3420 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3421 + ' (' 3422 + ', '.join([ 3423 sql_item_name(col_name, self.flavor, None) 3424 for col_name, _ in table_obj.columns.items() 3425 ]) 3426 + ')' 3427 + "\nSELECT\n" 3428 ) 3429 for col_name, col_obj in table_obj.columns.items(): 3430 new_col_str = ( 3431 sql_item_name(col_name, self.flavor, None) 3432 if col_name not in altered_cols 3433 else ( 3434 "CAST(" 3435 + sql_item_name(col_name, self.flavor, None) 3436 + " AS " 3437 + altered_cols_types[col_name] 3438 + ")" 3439 ) 3440 ) 3441 insert_query += new_col_str + ",\n" 3442 insert_query = insert_query[:-2] + ( 3443 f"\nFROM {sql_item_name(temp_table_name, self.flavor, self.get_pipe_schema(pipe))}" 3444 ) 3445 3446 if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else "" 3447 3448 drop_query = f"DROP TABLE {if_exists_str}" + sql_item_name( 3449 temp_table_name, self.flavor, self.get_pipe_schema(pipe) 3450 ) 3451 return [ 3452 rename_query, 3453 create_query, 3454 insert_query, 3455 drop_query, 3456 ] 3457 3458 queries = [] 3459 if self.flavor == 'oracle': 3460 for col, typ in 
altered_cols_types.items(): 3461 add_query = ( 3462 "ALTER TABLE " 3463 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3464 + "\nADD " + sql_item_name(col + '_temp', self.flavor, None) 3465 + " " + typ 3466 ) 3467 queries.append(add_query) 3468 3469 for col, typ in altered_cols_types.items(): 3470 populate_temp_query = ( 3471 "UPDATE " 3472 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3473 + "\nSET " + sql_item_name(col + '_temp', self.flavor, None) 3474 + ' = ' + sql_item_name(col, self.flavor, None) 3475 ) 3476 queries.append(populate_temp_query) 3477 3478 for col, typ in altered_cols_types.items(): 3479 set_old_cols_to_null_query = ( 3480 "UPDATE " 3481 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3482 + "\nSET " + sql_item_name(col, self.flavor, None) 3483 + ' = NULL' 3484 ) 3485 queries.append(set_old_cols_to_null_query) 3486 3487 for col, typ in altered_cols_types.items(): 3488 alter_type_query = ( 3489 "ALTER TABLE " 3490 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3491 + "\nMODIFY " + sql_item_name(col, self.flavor, None) + ' ' 3492 + typ 3493 ) 3494 queries.append(alter_type_query) 3495 3496 for col, typ in altered_cols_types.items(): 3497 set_old_to_temp_query = ( 3498 "UPDATE " 3499 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3500 + "\nSET " + sql_item_name(col, self.flavor, None) 3501 + ' = ' + sql_item_name(col + '_temp', self.flavor, None) 3502 ) 3503 queries.append(set_old_to_temp_query) 3504 3505 for col, typ in altered_cols_types.items(): 3506 drop_temp_query = ( 3507 "ALTER TABLE " 3508 + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3509 + "\nDROP COLUMN " + sql_item_name(col + '_temp', self.flavor, None) 3510 ) 3511 queries.append(drop_temp_query) 3512 3513 return queries 3514 3515 query = "ALTER TABLE " + sql_item_name(target, self.flavor, self.get_pipe_schema(pipe)) 3516 for col, typ in altered_cols_types.items(): 3517 alter_col_prefix = ( 3518 'ALTER' if self.flavor not in ('mysql', 'mariadb', 'oracle') 3519 else 'MODIFY' 3520 ) 3521 type_prefix = ( 3522 '' if self.flavor in ('mssql', 'mariadb', 'mysql') 3523 else 'TYPE ' 3524 ) 3525 column_str = 'COLUMN' if self.flavor != 'oracle' else '' 3526 query_suffix = ( 3527 f"\n{alter_col_prefix} {column_str} " 3528 + sql_item_name(col, self.flavor, None) 3529 + " " + type_prefix + typ + "," 3530 ) 3531 if self.flavor not in SINGLE_ALTER_TABLE_FLAVORS: 3532 query += query_suffix 3533 else: 3534 queries.append(query + query_suffix[:-1]) 3535 3536 if self.flavor not in SINGLE_ALTER_TABLE_FLAVORS: 3537 queries.append(query[:-1]) 3538 3539 if self.flavor != 'duckdb': 3540 return queries 3541 3542 drop_index_queries = list(flatten_list( 3543 [q for ix, q in self.get_drop_index_queries(pipe, debug=debug).items()] 3544 )) 3545 create_index_queries = list(flatten_list( 3546 [q for ix, q in self.get_create_index_queries(pipe, debug=debug).items()] 3547 )) 3548 3549 return drop_index_queries + queries + create_index_queries
If we encounter a column of a different type, set the entire column to text. If the altered columns are numeric, alter to numeric instead.
Parameters
- pipe (mrsm.Pipe): The pipe to be altered.
- df (Union[pd.DataFrame, Dict[str, str]]): The pandas DataFrame which may contain altered columns. If a dict is provided, assume it maps columns to Pandas data types.
Returns
- A list of the ALTER TABLE SQL query or queries to be executed on the provided connector.
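As noted above, a dtype dictionary may stand in for a DataFrame (the column name here is hypothetical):

alter_queries = conn.get_alter_columns_queries(pipe, {'value': 'numeric'})
for query in alter_queries:
    print(query)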
960def delete_pipe( 961 self, 962 pipe: mrsm.Pipe, 963 debug: bool = False, 964) -> SuccessTuple: 965 """ 966 Delete a Pipe's registration. 967 """ 968 from meerschaum.utils.packages import attempt_import 969 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 970 971 if not pipe.id: 972 return False, f"{pipe} is not registered." 973 974 ### ensure pipes table exists 975 from meerschaum.connectors.sql.tables import get_tables 976 pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] 977 978 q = sqlalchemy.delete(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id) 979 if not self.exec(q, debug=debug): 980 return False, f"Failed to delete registration for {pipe}." 981 982 return True, "Success"
Delete a Pipe's registration.
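For example:

success, msg = conn.delete_pipe(pipe)
if not success:
    print(msg)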
985def get_pipe_data( 986 self, 987 pipe: mrsm.Pipe, 988 select_columns: Optional[List[str]] = None, 989 omit_columns: Optional[List[str]] = None, 990 begin: Union[datetime, str, None] = None, 991 end: Union[datetime, str, None] = None, 992 params: Optional[Dict[str, Any]] = None, 993 order: str = 'asc', 994 limit: Optional[int] = None, 995 begin_add_minutes: int = 0, 996 end_add_minutes: int = 0, 997 debug: bool = False, 998 **kw: Any 999) -> Union[pd.DataFrame, None]: 1000 """ 1001 Access a pipe's data from the SQL instance. 1002 1003 Parameters 1004 ---------- 1005 pipe: mrsm.Pipe: 1006 The pipe to get data from. 1007 1008 select_columns: Optional[List[str]], default None 1009 If provided, only select these given columns. 1010 Otherwise select all available columns (i.e. `SELECT *`). 1011 1012 omit_columns: Optional[List[str]], default None 1013 If provided, remove these columns from the selection. 1014 1015 begin: Union[datetime, str, None], default None 1016 If provided, get rows newer than or equal to this value. 1017 1018 end: Union[datetime, str, None], default None 1019 If provided, get rows older than or equal to this value. 1020 1021 params: Optional[Dict[str, Any]], default None 1022 Additional parameters to filter by. 1023 See `meerschaum.connectors.sql.build_where`. 1024 1025 order: Optional[str], default 'asc' 1026 The selection order for all of the indices in the query. 1027 If `None`, omit the `ORDER BY` clause. 1028 1029 limit: Optional[int], default None 1030 If specified, limit the number of rows retrieved to this value. 1031 1032 begin_add_minutes: int, default 0 1033 The number of minutes to add to the `begin` datetime (i.e. `DATEADD`. 1034 1035 end_add_minutes: int, default 0 1036 The number of minutes to add to the `end` datetime (i.e. `DATEADD`. 1037 1038 chunksize: Optional[int], default -1 1039 The size of dataframe chunks to load into memory. 1040 1041 debug: bool, default False 1042 Verbosity toggle. 1043 1044 Returns 1045 ------- 1046 A `pd.DataFrame` of the pipe's data. 
1047 1048 """ 1049 import json 1050 from meerschaum.utils.misc import parse_df_datetimes, to_pandas_dtype 1051 from meerschaum.utils.packages import import_pandas 1052 from meerschaum.utils.dtypes import ( 1053 attempt_cast_to_numeric, 1054 attempt_cast_to_uuid, 1055 attempt_cast_to_bytes, 1056 are_dtypes_equal, 1057 ) 1058 from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type 1059 pd = import_pandas() 1060 is_dask = 'dask' in pd.__name__ 1061 1062 cols_types = pipe.get_columns_types(debug=debug) if pipe.enforce else {} 1063 dtypes = { 1064 **{ 1065 p_col: to_pandas_dtype(p_typ) 1066 for p_col, p_typ in pipe.dtypes.items() 1067 }, 1068 **{ 1069 col: get_pd_type_from_db_type(typ) 1070 for col, typ in cols_types.items() 1071 } 1072 } if pipe.enforce else {} 1073 if dtypes: 1074 if self.flavor == 'sqlite': 1075 if not pipe.columns.get('datetime', None): 1076 _dt = pipe.guess_datetime() 1077 else: 1078 _dt = pipe.get_columns('datetime') 1079 1080 if _dt: 1081 dt_type = dtypes.get(_dt, 'object').lower() 1082 if 'datetime' not in dt_type: 1083 if 'int' not in dt_type: 1084 dtypes[_dt] = 'datetime64[ns, UTC]' 1085 1086 existing_cols = cols_types.keys() 1087 select_columns = ( 1088 [ 1089 col 1090 for col in existing_cols 1091 if col not in (omit_columns or []) 1092 ] 1093 if not select_columns 1094 else [ 1095 col 1096 for col in select_columns 1097 if col in existing_cols 1098 and col not in (omit_columns or []) 1099 ] 1100 ) if pipe.enforce else select_columns 1101 if select_columns: 1102 dtypes = {col: typ for col, typ in dtypes.items() if col in select_columns} 1103 dtypes = { 1104 col: to_pandas_dtype(typ) 1105 for col, typ in dtypes.items() 1106 if col in select_columns and col not in (omit_columns or []) 1107 } if pipe.enforce else {} 1108 query = self.get_pipe_data_query( 1109 pipe, 1110 select_columns=select_columns, 1111 omit_columns=omit_columns, 1112 begin=begin, 1113 end=end, 1114 params=params, 1115 order=order, 1116 limit=limit, 1117 begin_add_minutes=begin_add_minutes, 1118 end_add_minutes=end_add_minutes, 1119 debug=debug, 1120 **kw 1121 ) 1122 1123 if is_dask: 1124 index_col = pipe.columns.get('datetime', None) 1125 kw['index_col'] = index_col 1126 1127 numeric_columns = [ 1128 col 1129 for col, typ in pipe.dtypes.items() 1130 if typ.startswith('numeric') and col in dtypes 1131 ] 1132 uuid_columns = [ 1133 col 1134 for col, typ in pipe.dtypes.items() 1135 if typ == 'uuid' and col in dtypes 1136 ] 1137 bytes_columns = [ 1138 col 1139 for col, typ in pipe.dtypes.items() 1140 if typ == 'bytes' and col in dtypes 1141 ] 1142 1143 kw['coerce_float'] = kw.get('coerce_float', (len(numeric_columns) == 0)) 1144 1145 df = self.read( 1146 query, 1147 dtype=dtypes, 1148 debug=debug, 1149 **kw 1150 ) 1151 for col in numeric_columns: 1152 if col not in df.columns: 1153 continue 1154 df[col] = df[col].apply(attempt_cast_to_numeric) 1155 1156 for col in uuid_columns: 1157 if col not in df.columns: 1158 continue 1159 df[col] = df[col].apply(attempt_cast_to_uuid) 1160 1161 for col in bytes_columns: 1162 if col not in df.columns: 1163 continue 1164 df[col] = df[col].apply(attempt_cast_to_bytes) 1165 1166 if self.flavor == 'sqlite': 1167 ignore_dt_cols = [ 1168 col 1169 for col, dtype in pipe.dtypes.items() 1170 if not are_dtypes_equal(str(dtype), 'datetime') 1171 ] 1172 ### NOTE: We have to consume the iterator here to ensure that datetimes are parsed correctly 1173 df = ( 1174 parse_df_datetimes( 1175 df, 1176 ignore_cols=ignore_dt_cols, 1177 chunksize=kw.get('chunksize', None), 1178 
strip_timezone=(pipe.tzinfo is None), 1179 debug=debug, 1180 ) if isinstance(df, pd.DataFrame) else ( 1181 [ 1182 parse_df_datetimes( 1183 c, 1184 ignore_cols=ignore_dt_cols, 1185 chunksize=kw.get('chunksize', None), 1186 strip_timezone=(pipe.tzinfo is None), 1187 debug=debug, 1188 ) 1189 for c in df 1190 ] 1191 ) 1192 ) 1193 for col, typ in dtypes.items(): 1194 if typ != 'json': 1195 continue 1196 df[col] = df[col].apply(lambda x: json.loads(x) if x is not None else x) 1197 return df
Access a pipe's data from the SQL instance.
Parameters
- pipe (mrsm.Pipe): The pipe to get data from.
- select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. `SELECT *`).
- omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, str, None], default None): If provided, get rows newer than or equal to this value.
- end (Union[datetime, str, None], default None): If provided, get rows older than this value (the `end` bound is exclusive).
- params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See `meerschaum.connectors.sql.build_where`.
- order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If `None`, omit the `ORDER BY` clause.
- limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
- begin_add_minutes (int, default 0): The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
- end_add_minutes (int, default 0): The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
- chunksize (Optional[int], default -1): The size of dataframe chunks to load into memory.
- debug (bool, default False): Verbosity toggle.

Returns
- A `pd.DataFrame` of the pipe's data.
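For example, a minimal sketch of pulling a bounded window of rows through the instance connector (assuming a configured `sql:main` instance; the pipe keys and column names below are hypothetical):

    import meerschaum as mrsm
    from datetime import datetime

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')

    # begin is inclusive; end is exclusive.
    df = conn.get_pipe_data(
        pipe,
        begin=datetime(2024, 1, 1),
        end=datetime(2024, 2, 1),
        params={'station': 'KATL'},
        limit=1000,
    )

The same call is available as `pipe.get_data(...)`, which delegates to the pipe's instance connector.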
1200def get_pipe_data_query( 1201 self, 1202 pipe: mrsm.Pipe, 1203 select_columns: Optional[List[str]] = None, 1204 omit_columns: Optional[List[str]] = None, 1205 begin: Union[datetime, int, str, None] = None, 1206 end: Union[datetime, int, str, None] = None, 1207 params: Optional[Dict[str, Any]] = None, 1208 order: Optional[str] = 'asc', 1209 sort_datetimes: bool = False, 1210 limit: Optional[int] = None, 1211 begin_add_minutes: int = 0, 1212 end_add_minutes: int = 0, 1213 replace_nulls: Optional[str] = None, 1214 skip_existing_cols_check: bool = False, 1215 debug: bool = False, 1216 **kw: Any 1217) -> Union[str, None]: 1218 """ 1219 Return the `SELECT` query for retrieving a pipe's data from its instance. 1220 1221 Parameters 1222 ---------- 1223 pipe: mrsm.Pipe: 1224 The pipe to get data from. 1225 1226 select_columns: Optional[List[str]], default None 1227 If provided, only select these given columns. 1228 Otherwise select all available columns (i.e. `SELECT *`). 1229 1230 omit_columns: Optional[List[str]], default None 1231 If provided, remove these columns from the selection. 1232 1233 begin: Union[datetime, int, str, None], default None 1234 If provided, get rows newer than or equal to this value. 1235 1236 end: Union[datetime, str, None], default None 1237 If provided, get rows older than or equal to this value. 1238 1239 params: Optional[Dict[str, Any]], default None 1240 Additional parameters to filter by. 1241 See `meerschaum.connectors.sql.build_where`. 1242 1243 order: Optional[str], default None 1244 The selection order for all of the indices in the query. 1245 If `None`, omit the `ORDER BY` clause. 1246 1247 sort_datetimes: bool, default False 1248 Alias for `order='desc'`. 1249 1250 limit: Optional[int], default None 1251 If specified, limit the number of rows retrieved to this value. 1252 1253 begin_add_minutes: int, default 0 1254 The number of minutes to add to the `begin` datetime (i.e. `DATEADD`). 1255 1256 end_add_minutes: int, default 0 1257 The number of minutes to add to the `end` datetime (i.e. `DATEADD`). 1258 1259 chunksize: Optional[int], default -1 1260 The size of dataframe chunks to load into memory. 1261 1262 replace_nulls: Optional[str], default None 1263 If provided, replace null values with this value. 1264 1265 skip_existing_cols_check: bool, default False 1266 If `True`, do not verify that querying columns are actually on the table. 1267 1268 debug: bool, default False 1269 Verbosity toggle. 1270 1271 Returns 1272 ------- 1273 A `SELECT` query to retrieve a pipe's data. 
1274 """ 1275 from meerschaum.utils.misc import items_str 1276 from meerschaum.utils.sql import sql_item_name, dateadd_str 1277 from meerschaum.utils.dtypes import coerce_timezone 1278 from meerschaum.utils.dtypes.sql import get_pd_type_from_db_type, get_db_type_from_pd_type 1279 1280 dt_col = pipe.columns.get('datetime', None) 1281 existing_cols = pipe.get_columns_types(debug=debug) if pipe.enforce else [] 1282 skip_existing_cols_check = skip_existing_cols_check or not pipe.enforce 1283 dt_typ = get_pd_type_from_db_type(existing_cols[dt_col]) if dt_col in existing_cols else None 1284 dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None 1285 select_columns = ( 1286 [col for col in existing_cols] 1287 if not select_columns 1288 else [col for col in select_columns if skip_existing_cols_check or col in existing_cols] 1289 ) 1290 if omit_columns: 1291 select_columns = [col for col in select_columns if col not in omit_columns] 1292 1293 if order is None and sort_datetimes: 1294 order = 'desc' 1295 1296 if begin == '': 1297 begin = pipe.get_sync_time(debug=debug) 1298 backtrack_interval = pipe.get_backtrack_interval(debug=debug) 1299 if begin is not None: 1300 begin -= backtrack_interval 1301 1302 begin, end = pipe.parse_date_bounds(begin, end) 1303 if isinstance(begin, datetime) and dt_typ: 1304 begin = coerce_timezone(begin, strip_utc=('utc' not in dt_typ.lower())) 1305 if isinstance(end, datetime) and dt_typ: 1306 end = coerce_timezone(end, strip_utc=('utc' not in dt_typ.lower())) 1307 1308 cols_names = [ 1309 sql_item_name(col, self.flavor, None) 1310 for col in select_columns 1311 ] 1312 select_cols_str = ( 1313 'SELECT\n ' 1314 + ',\n '.join( 1315 [ 1316 ( 1317 col_name 1318 if not replace_nulls 1319 else f"COALESCE(col_name, '{replace_nulls}') AS {col_name}" 1320 ) 1321 for col_name in cols_names 1322 ] 1323 ) 1324 ) if cols_names else 'SELECT *' 1325 pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 1326 query = f"{select_cols_str}\nFROM {pipe_table_name}" 1327 where = "" 1328 1329 if order is not None: 1330 default_order = 'asc' 1331 if order not in ('asc', 'desc'): 1332 warn(f"Ignoring unsupported order '{order}'. Falling back to '{default_order}'.") 1333 order = default_order 1334 order = order.upper() 1335 1336 if not pipe.columns.get('datetime', None): 1337 _dt = pipe.guess_datetime() 1338 dt = sql_item_name(_dt, self.flavor, None) if _dt else None 1339 is_guess = True 1340 else: 1341 _dt = pipe.get_columns('datetime') 1342 dt = sql_item_name(_dt, self.flavor, None) 1343 is_guess = False 1344 1345 quoted_indices = { 1346 key: sql_item_name(val, self.flavor, None) 1347 for key, val in pipe.columns.items() 1348 if val in existing_cols or skip_existing_cols_check 1349 } 1350 1351 if begin is not None or end is not None: 1352 if is_guess: 1353 if _dt is None: 1354 warn( 1355 f"No datetime could be determined for {pipe}." 
1356 + "\n Ignoring begin and end...", 1357 stack=False, 1358 ) 1359 begin, end = None, None 1360 else: 1361 warn( 1362 f"A datetime wasn't specified for {pipe}.\n" 1363 + f" Using column \"{_dt}\" for datetime bounds...", 1364 stack=False, 1365 ) 1366 1367 is_dt_bound = False 1368 if begin is not None and (_dt in existing_cols or skip_existing_cols_check): 1369 begin_da = dateadd_str( 1370 flavor=self.flavor, 1371 datepart='minute', 1372 number=begin_add_minutes, 1373 begin=begin, 1374 db_type=dt_db_type, 1375 ) 1376 where += f"\n {dt} >= {begin_da}" + ("\n AND\n " if end is not None else "") 1377 is_dt_bound = True 1378 1379 if end is not None and (_dt in existing_cols or skip_existing_cols_check): 1380 if 'int' in str(type(end)).lower() and end == begin: 1381 end += 1 1382 end_da = dateadd_str( 1383 flavor=self.flavor, 1384 datepart='minute', 1385 number=end_add_minutes, 1386 begin=end, 1387 db_type=dt_db_type, 1388 ) 1389 where += f"{dt} < {end_da}" 1390 is_dt_bound = True 1391 1392 if params is not None: 1393 from meerschaum.utils.sql import build_where 1394 valid_params = { 1395 k: v 1396 for k, v in params.items() 1397 if k in existing_cols or skip_existing_cols_check 1398 } 1399 if valid_params: 1400 where += build_where(valid_params, self).replace( 1401 'WHERE', (' AND' if is_dt_bound else " ") 1402 ) 1403 1404 if len(where) > 0: 1405 query += "\nWHERE " + where 1406 1407 if order is not None: 1408 ### Sort by indices, starting with datetime. 1409 order_by = "" 1410 if quoted_indices: 1411 order_by += "\nORDER BY " 1412 if _dt and (_dt in existing_cols or skip_existing_cols_check): 1413 order_by += dt + ' ' + order + ',' 1414 for key, quoted_col_name in quoted_indices.items(): 1415 if dt == quoted_col_name: 1416 continue 1417 order_by += ' ' + quoted_col_name + ' ' + order + ',' 1418 order_by = order_by[:-1] 1419 1420 query += order_by 1421 1422 if isinstance(limit, int): 1423 if self.flavor == 'mssql': 1424 query = f'SELECT TOP {limit}\n' + query[len("SELECT "):] 1425 elif self.flavor == 'oracle': 1426 query = ( 1427 f"SELECT * FROM (\n {query}\n)\n" 1428 + f"WHERE ROWNUM IN ({', '.join([str(i) for i in range(1, limit+1)])})" 1429 ) 1430 else: 1431 query += f"\nLIMIT {limit}" 1432 1433 if debug: 1434 to_print = ( 1435 [] 1436 + ([f"begin='{begin}'"] if begin else []) 1437 + ([f"end='{end}'"] if end else []) 1438 + ([f"params={params}"] if params else []) 1439 ) 1440 dprint("Getting pipe data with constraints: " + items_str(to_print, quotes=False)) 1441 1442 return query
Return the `SELECT` query for retrieving a pipe's data from its instance.

Parameters
- pipe (mrsm.Pipe): The pipe to get data from.
- select_columns (Optional[List[str]], default None): If provided, only select these given columns. Otherwise select all available columns (i.e. `SELECT *`).
- omit_columns (Optional[List[str]], default None): If provided, remove these columns from the selection.
- begin (Union[datetime, int, str, None], default None): If provided, get rows newer than or equal to this value.
- end (Union[datetime, int, str, None], default None): If provided, get rows older than this value (the `end` bound is exclusive).
- params (Optional[Dict[str, Any]], default None): Additional parameters to filter by. See `meerschaum.connectors.sql.build_where`.
- order (Optional[str], default 'asc'): The selection order for all of the indices in the query. If `None`, omit the `ORDER BY` clause.
- sort_datetimes (bool, default False): Alias for `order='desc'`.
- limit (Optional[int], default None): If specified, limit the number of rows retrieved to this value.
- begin_add_minutes (int, default 0): The number of minutes to add to the `begin` datetime (i.e. `DATEADD`).
- end_add_minutes (int, default 0): The number of minutes to add to the `end` datetime (i.e. `DATEADD`).
- replace_nulls (Optional[str], default None): If provided, replace null values with this value.
- skip_existing_cols_check (bool, default False): If `True`, do not verify that querying columns are actually on the table.
- debug (bool, default False): Verbosity toggle.

Returns
- A `SELECT` query to retrieve a pipe's data.
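To preview the generated SQL without executing it, one might build the query directly (a sketch; the keys and column names are hypothetical):

    import meerschaum as mrsm
    from datetime import datetime

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe('plugin:noaa', 'weather', instance='sql:main')

    query = conn.get_pipe_data_query(
        pipe,
        select_columns=['timestamp', 'station', 'temperature'],
        begin=datetime(2024, 1, 1),
        order='desc',
        limit=10,
    )
    print(query)  # flavor-appropriate SELECT ... FROM ... WHERE ... ORDER BY ... LIMIT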
20def register_pipe( 21 self, 22 pipe: mrsm.Pipe, 23 debug: bool = False, 24) -> SuccessTuple: 25 """ 26 Register a new pipe. 27 A pipe's attributes must be set before registering. 28 """ 29 from meerschaum.utils.debug import dprint 30 from meerschaum.utils.packages import attempt_import 31 from meerschaum.utils.sql import json_flavors 32 33 ### ensure pipes table exists 34 from meerschaum.connectors.sql.tables import get_tables 35 pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] 36 37 if pipe.get_id(debug=debug) is not None: 38 return False, f"{pipe} is already registered." 39 40 ### NOTE: if `parameters` is supplied in the Pipe constructor, 41 ### then `pipe.parameters` will exist and not be fetched from the database. 42 43 ### 1. Prioritize the Pipe object's `parameters` first. 44 ### E.g. if the user manually sets the `parameters` property 45 ### or if the Pipe already exists 46 ### (which shouldn't be able to be registered anyway but that's an issue for later). 47 parameters = None 48 try: 49 parameters = pipe.parameters 50 except Exception as e: 51 if debug: 52 dprint(str(e)) 53 parameters = None 54 55 ### ensure `parameters` is a dictionary 56 if parameters is None: 57 parameters = {} 58 59 import json 60 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 61 values = { 62 'connector_keys' : pipe.connector_keys, 63 'metric_key' : pipe.metric_key, 64 'location_key' : pipe.location_key, 65 'parameters' : ( 66 json.dumps(parameters) 67 if self.flavor not in json_flavors 68 else parameters 69 ), 70 } 71 query = sqlalchemy.insert(pipes_tbl).values(**values) 72 result = self.exec(query, debug=debug) 73 if result is None: 74 return False, f"Failed to register {pipe}." 75 return True, f"Successfully registered {pipe}."
Register a new pipe. A pipe's attributes must be set before registering.
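A sketch of registering a pipe directly against the instance connector (in practice `pipe.register()` calls this; the keys below are hypothetical):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe(
        'sql:remote', 'sales',
        instance='sql:main',
        columns={'datetime': 'order_date'},
    )
    success, msg = conn.register_pipe(pipe)
    print(success, msg)  # False with "... is already registered." on repeat calls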
78def edit_pipe( 79 self, 80 pipe : mrsm.Pipe = None, 81 patch: bool = False, 82 debug: bool = False, 83 **kw : Any 84) -> SuccessTuple: 85 """ 86 Persist a Pipe's parameters to its database. 87 88 Parameters 89 ---------- 90 pipe: mrsm.Pipe, default None 91 The pipe to be edited. 92 patch: bool, default False 93 If patch is `True`, update the existing parameters by cascading. 94 Otherwise overwrite the parameters (default). 95 debug: bool, default False 96 Verbosity toggle. 97 """ 98 99 if pipe.id is None: 100 return False, f"{pipe} is not registered and cannot be edited." 101 102 from meerschaum.utils.packages import attempt_import 103 from meerschaum.utils.sql import json_flavors 104 if not patch: 105 parameters = pipe.__dict__.get('_attributes', {}).get('parameters', {}) 106 else: 107 from meerschaum import Pipe 108 from meerschaum.config._patch import apply_patch_to_config 109 original_parameters = Pipe( 110 pipe.connector_keys, pipe.metric_key, pipe.location_key, 111 mrsm_instance=pipe.instance_keys 112 ).parameters 113 parameters = apply_patch_to_config( 114 original_parameters, 115 pipe.parameters 116 ) 117 118 ### ensure pipes table exists 119 from meerschaum.connectors.sql.tables import get_tables 120 pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] 121 122 import json 123 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 124 125 values = { 126 'parameters': ( 127 json.dumps(parameters) 128 if self.flavor not in json_flavors 129 else parameters 130 ), 131 } 132 q = sqlalchemy.update(pipes_tbl).values(**values).where( 133 pipes_tbl.c.pipe_id == pipe.id 134 ) 135 136 result = self.exec(q, debug=debug) 137 message = ( 138 f"Successfully edited {pipe}." 139 if result is not None else f"Failed to edit {pipe}." 140 ) 141 return (result is not None), message
Persist a Pipe's parameters to its database.
Parameters
- pipe (mrsm.Pipe, default None): The pipe to be edited.
- patch (bool, default False): If patch is `True`, update the existing parameters by cascading. Otherwise overwrite the parameters (default).
- debug (bool, default False): Verbosity toggle.
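A sketch of persisting a parameters change, merging into the stored parameters with `patch=True` rather than overwriting them (the edited key is hypothetical):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe('sql:remote', 'sales', instance='sql:main')

    pipe.parameters['fetch'] = {'backtrack_minutes': 1440}  # hypothetical edit
    success, msg = conn.edit_pipe(pipe, patch=True)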
1445def get_pipe_id( 1446 self, 1447 pipe: mrsm.Pipe, 1448 debug: bool = False, 1449) -> Any: 1450 """ 1451 Get a Pipe's ID from the pipes table. 1452 """ 1453 if pipe.temporary: 1454 return None 1455 from meerschaum.utils.packages import attempt_import 1456 sqlalchemy = attempt_import('sqlalchemy') 1457 from meerschaum.connectors.sql.tables import get_tables 1458 pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] 1459 1460 query = sqlalchemy.select(pipes_tbl.c.pipe_id).where( 1461 pipes_tbl.c.connector_keys == pipe.connector_keys 1462 ).where( 1463 pipes_tbl.c.metric_key == pipe.metric_key 1464 ).where( 1465 (pipes_tbl.c.location_key == pipe.location_key) if pipe.location_key is not None 1466 else pipes_tbl.c.location_key.is_(None) 1467 ) 1468 _id = self.value(query, debug=debug, silent=pipe.temporary) 1469 if _id is not None: 1470 _id = int(_id) 1471 return _id
Get a Pipe's ID from the pipes table.
1474def get_pipe_attributes( 1475 self, 1476 pipe: mrsm.Pipe, 1477 debug: bool = False, 1478) -> Dict[str, Any]: 1479 """ 1480 Get a Pipe's attributes dictionary. 1481 """ 1482 from meerschaum.connectors.sql.tables import get_tables 1483 from meerschaum.utils.packages import attempt_import 1484 sqlalchemy = attempt_import('sqlalchemy') 1485 1486 if pipe.get_id(debug=debug) is None: 1487 return {} 1488 1489 pipes_tbl = get_tables(mrsm_instance=self, create=(not pipe.temporary), debug=debug)['pipes'] 1490 1491 try: 1492 q = sqlalchemy.select(pipes_tbl).where(pipes_tbl.c.pipe_id == pipe.id) 1493 if debug: 1494 dprint(q) 1495 attributes = ( 1496 dict(self.exec(q, silent=True, debug=debug).first()._mapping) 1497 if self.flavor != 'duckdb' 1498 else self.read(q, debug=debug).to_dict(orient='records')[0] 1499 ) 1500 except Exception as e: 1501 import traceback 1502 traceback.print_exc() 1503 warn(e) 1504 print(pipe) 1505 return {} 1506 1507 ### handle non-PostgreSQL databases (text vs JSON) 1508 if not isinstance(attributes.get('parameters', None), dict): 1509 try: 1510 import json 1511 parameters = json.loads(attributes['parameters']) 1512 if isinstance(parameters, str) and parameters[0] == '{': 1513 parameters = json.loads(parameters) 1514 attributes['parameters'] = parameters 1515 except Exception as e: 1516 attributes['parameters'] = {} 1517 1518 return attributes
Get a Pipe's attributes dictionary.
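A sketch of reading a pipe's registration back from the pipes table; note that `parameters` is returned as a dict whether or not the flavor stores JSON natively:

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe('sql:remote', 'sales', instance='sql:main')

    pipe_id = conn.get_pipe_id(pipe)        # None if unregistered or temporary
    attrs = conn.get_pipe_attributes(pipe)  # {} if unregistered
    print(pipe_id, attrs.get('parameters', {}))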
1615def sync_pipe( 1616 self, 1617 pipe: mrsm.Pipe, 1618 df: Union[pd.DataFrame, str, Dict[Any, Any], None] = None, 1619 begin: Optional[datetime] = None, 1620 end: Optional[datetime] = None, 1621 chunksize: Optional[int] = -1, 1622 check_existing: bool = True, 1623 blocking: bool = True, 1624 debug: bool = False, 1625 _check_temporary_tables: bool = True, 1626 **kw: Any 1627) -> SuccessTuple: 1628 """ 1629 Sync a pipe using a database connection. 1630 1631 Parameters 1632 ---------- 1633 pipe: mrsm.Pipe 1634 The Meerschaum Pipe instance into which to sync the data. 1635 1636 df: Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]] 1637 An optional DataFrame or equivalent to sync into the pipe. 1638 Defaults to `None`. 1639 1640 begin: Optional[datetime], default None 1641 Optionally specify the earliest datetime to search for data. 1642 Defaults to `None`. 1643 1644 end: Optional[datetime], default None 1645 Optionally specify the latest datetime to search for data. 1646 Defaults to `None`. 1647 1648 chunksize: Optional[int], default -1 1649 Specify the number of rows to sync per chunk. 1650 If `-1`, resort to system configuration (default is `900`). 1651 A `chunksize` of `None` will sync all rows in one transaction. 1652 Defaults to `-1`. 1653 1654 check_existing: bool, default True 1655 If `True`, pull and diff with existing data from the pipe. Defaults to `True`. 1656 1657 blocking: bool, default True 1658 If `True`, wait for sync to finish and return its result, otherwise asyncronously sync. 1659 Defaults to `True`. 1660 1661 debug: bool, default False 1662 Verbosity toggle. Defaults to False. 1663 1664 kw: Any 1665 Catch-all for keyword arguments. 1666 1667 Returns 1668 ------- 1669 A `SuccessTuple` of success (`bool`) and message (`str`). 1670 """ 1671 from meerschaum.utils.packages import import_pandas 1672 from meerschaum.utils.sql import ( 1673 get_update_queries, 1674 sql_item_name, 1675 UPDATE_QUERIES, 1676 get_reset_autoincrement_queries, 1677 ) 1678 from meerschaum.utils.dtypes import are_dtypes_equal 1679 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 1680 from meerschaum import Pipe 1681 import time 1682 import copy 1683 pd = import_pandas() 1684 if df is None: 1685 msg = f"DataFrame is None. Cannot sync {pipe}." 1686 warn(msg) 1687 return False, msg 1688 1689 start = time.perf_counter() 1690 pipe_name = sql_item_name(pipe.target, self.flavor, schema=self.get_pipe_schema(pipe)) 1691 1692 if not pipe.temporary and not pipe.get_id(debug=debug): 1693 register_tuple = pipe.register(debug=debug) 1694 if not register_tuple[0]: 1695 return register_tuple 1696 1697 ### df is the dataframe returned from the remote source 1698 ### via the connector 1699 if debug: 1700 dprint("Fetched data:\n" + str(df)) 1701 1702 if not isinstance(df, pd.DataFrame): 1703 df = pipe.enforce_dtypes( 1704 df, 1705 chunksize=chunksize, 1706 safe_copy=kw.get('safe_copy', False), 1707 debug=debug, 1708 ) 1709 1710 ### if table does not exist, create it with indices 1711 is_new = False 1712 if not pipe.exists(debug=debug): 1713 check_existing = False 1714 is_new = True 1715 else: 1716 ### Check for new columns. 
1717 add_cols_queries = self.get_add_columns_queries(pipe, df, debug=debug) 1718 if add_cols_queries: 1719 _ = pipe.__dict__.pop('_columns_indices', None) 1720 _ = pipe.__dict__.pop('_columns_types', None) 1721 if not self.exec_queries(add_cols_queries, debug=debug): 1722 warn(f"Failed to add new columns to {pipe}.") 1723 1724 alter_cols_queries = self.get_alter_columns_queries(pipe, df, debug=debug) 1725 if alter_cols_queries: 1726 _ = pipe.__dict__.pop('_columns_indices', None) 1727 _ = pipe.__dict__.pop('_columns_types', None) 1728 if not self.exec_queries(alter_cols_queries, debug=debug): 1729 warn(f"Failed to alter columns for {pipe}.") 1730 else: 1731 _ = pipe.infer_dtypes(persist=True) 1732 1733 ### NOTE: Oracle SQL < 23c (2023) and SQLite does not support booleans, 1734 ### so infer bools and persist them to `dtypes`. 1735 if self.flavor in ('oracle', 'sqlite', 'mysql', 'mariadb'): 1736 pipe_dtypes = pipe.dtypes 1737 new_bool_cols = { 1738 col: 'bool[pyarrow]' 1739 for col, typ in df.dtypes.items() 1740 if col not in pipe_dtypes 1741 and are_dtypes_equal(str(typ), 'bool') 1742 } 1743 pipe_dtypes.update(new_bool_cols) 1744 pipe.dtypes = pipe_dtypes 1745 if new_bool_cols and not pipe.temporary: 1746 infer_bool_success, infer_bool_msg = pipe.edit(debug=debug) 1747 if not infer_bool_success: 1748 return infer_bool_success, infer_bool_msg 1749 1750 upsert = pipe.parameters.get('upsert', False) and (self.flavor + '-upsert') in UPDATE_QUERIES 1751 if upsert: 1752 check_existing = False 1753 kw['safe_copy'] = kw.get('safe_copy', False) 1754 1755 unseen_df, update_df, delta_df = ( 1756 pipe.filter_existing( 1757 df, 1758 chunksize=chunksize, 1759 debug=debug, 1760 **kw 1761 ) if check_existing else (df, None, df) 1762 ) 1763 if upsert: 1764 unseen_df, update_df, delta_df = (df.head(0), df, df) 1765 1766 if debug: 1767 dprint("Delta data:\n" + str(delta_df)) 1768 dprint("Unseen data:\n" + str(unseen_df)) 1769 if update_df is not None: 1770 dprint(("Update" if not upsert else "Upsert") + " data:\n" + str(update_df)) 1771 1772 if_exists = kw.get('if_exists', 'append') 1773 if 'if_exists' in kw: 1774 kw.pop('if_exists') 1775 if 'name' in kw: 1776 kw.pop('name') 1777 1778 ### Insert new data into Pipe's table. 
1779 unseen_kw = copy.deepcopy(kw) 1780 unseen_kw.update({ 1781 'name': pipe.target, 1782 'if_exists': if_exists, 1783 'debug': debug, 1784 'as_dict': True, 1785 'safe_copy': kw.get('safe_copy', False), 1786 'chunksize': chunksize, 1787 'dtype': self.get_to_sql_dtype(pipe, unseen_df, update_dtypes=True), 1788 'schema': self.get_pipe_schema(pipe), 1789 }) 1790 1791 dt_col = pipe.columns.get('datetime', None) 1792 primary_key = pipe.columns.get('primary', None) 1793 autoincrement = ( 1794 pipe.parameters.get('autoincrement', False) 1795 or ( 1796 is_new 1797 and primary_key 1798 and primary_key 1799 not in pipe.dtypes 1800 and primary_key not in unseen_df.columns 1801 ) 1802 ) 1803 if autoincrement and autoincrement not in pipe.parameters: 1804 pipe.parameters['autoincrement'] = autoincrement 1805 edit_success, edit_msg = pipe.edit(debug=debug) 1806 if not edit_success: 1807 return edit_success, edit_msg 1808 1809 def _check_pk(_df_to_clear): 1810 if _df_to_clear is None: 1811 return 1812 if primary_key not in _df_to_clear.columns: 1813 return 1814 if not _df_to_clear[primary_key].notnull().any(): 1815 del _df_to_clear[primary_key] 1816 1817 autoincrement_needs_reset = bool( 1818 autoincrement 1819 and primary_key 1820 and primary_key in unseen_df.columns 1821 and unseen_df[primary_key].notnull().any() 1822 ) 1823 if autoincrement and primary_key: 1824 for _df_to_clear in (unseen_df, update_df, delta_df): 1825 _check_pk(_df_to_clear) 1826 1827 if is_new: 1828 create_success, create_msg = self.create_pipe_table_from_df( 1829 pipe, 1830 unseen_df, 1831 debug=debug, 1832 ) 1833 if not create_success: 1834 return create_success, create_msg 1835 1836 do_identity_insert = bool( 1837 self.flavor in ('mssql',) 1838 and primary_key 1839 and primary_key in unseen_df.columns 1840 and autoincrement 1841 ) 1842 stats = {'success': True, 'msg': ''} 1843 if len(unseen_df) > 0: 1844 with self.engine.connect() as connection: 1845 with connection.begin(): 1846 if do_identity_insert: 1847 identity_on_result = self.exec( 1848 f"SET IDENTITY_INSERT {pipe_name} ON", 1849 commit=False, 1850 _connection=connection, 1851 close=False, 1852 debug=debug, 1853 ) 1854 if identity_on_result is None: 1855 return False, f"Could not enable identity inserts on {pipe}." 1856 1857 stats = self.to_sql( 1858 unseen_df, 1859 _connection=connection, 1860 **unseen_kw 1861 ) 1862 1863 if do_identity_insert: 1864 identity_off_result = self.exec( 1865 f"SET IDENTITY_INSERT {pipe_name} OFF", 1866 commit=False, 1867 _connection=connection, 1868 close=False, 1869 debug=debug, 1870 ) 1871 if identity_off_result is None: 1872 return False, f"Could not disable identity inserts on {pipe}." 1873 1874 if is_new: 1875 if not self.create_indices(pipe, debug=debug): 1876 warn(f"Failed to create indices for {pipe}. 
Continuing...") 1877 1878 if autoincrement_needs_reset: 1879 reset_autoincrement_queries = get_reset_autoincrement_queries( 1880 pipe.target, 1881 primary_key, 1882 self, 1883 schema=self.get_pipe_schema(pipe), 1884 debug=debug, 1885 ) 1886 results = self.exec_queries(reset_autoincrement_queries, debug=debug) 1887 for result in results: 1888 if result is None: 1889 warn(f"Could not reset auto-incrementing primary key for {pipe}.", stack=False) 1890 1891 if update_df is not None and len(update_df) > 0: 1892 temp_target = self.get_temporary_target( 1893 pipe.target, 1894 label=('update' if not upsert else 'upsert'), 1895 ) 1896 self._log_temporary_tables_creation(temp_target, create=(not pipe.temporary), debug=debug) 1897 temp_pipe = Pipe( 1898 pipe.connector_keys.replace(':', '_') + '_', pipe.metric_key, pipe.location_key, 1899 instance=pipe.instance_keys, 1900 columns={ 1901 (ix_key if ix_key != 'primary' else 'primary_'): ix 1902 for ix_key, ix in pipe.columns.items() 1903 if ix and ix in update_df.columns 1904 }, 1905 dtypes={ 1906 col: typ 1907 for col, typ in pipe.dtypes.items() 1908 if col in update_df.columns 1909 }, 1910 target=temp_target, 1911 temporary=True, 1912 enforce=False, 1913 static=True, 1914 autoincrement=False, 1915 parameters={ 1916 'schema': self.internal_schema, 1917 'hypertable': False, 1918 }, 1919 ) 1920 temp_pipe.__dict__['_columns_types'] = { 1921 col: get_db_type_from_pd_type( 1922 pipe.dtypes.get(col, str(typ)), 1923 self.flavor, 1924 ) 1925 for col, typ in update_df.dtypes.items() 1926 } 1927 now_ts = time.perf_counter() 1928 temp_pipe.__dict__['_columns_types_timestamp'] = now_ts 1929 temp_pipe.__dict__['_skip_check_indices'] = True 1930 temp_success, temp_msg = temp_pipe.sync(update_df, check_existing=False, debug=debug) 1931 if not temp_success: 1932 return temp_success, temp_msg 1933 existing_cols = pipe.get_columns_types(debug=debug) 1934 join_cols = [ 1935 col 1936 for col_key, col in pipe.columns.items() 1937 if col and col in existing_cols 1938 ] if not primary_key or self.flavor == 'oracle' else ( 1939 [dt_col, primary_key] 1940 if self.flavor == 'timescaledb' and dt_col and dt_col in update_df.columns 1941 else [primary_key] 1942 ) 1943 update_queries = get_update_queries( 1944 pipe.target, 1945 temp_target, 1946 self, 1947 join_cols, 1948 upsert=upsert, 1949 schema=self.get_pipe_schema(pipe), 1950 patch_schema=self.internal_schema, 1951 datetime_col=(dt_col if dt_col in update_df.columns else None), 1952 identity_insert=(autoincrement and primary_key in update_df.columns), 1953 null_indices=pipe.null_indices, 1954 cast_columns=pipe.enforce, 1955 debug=debug, 1956 ) 1957 update_results = self.exec_queries( 1958 update_queries, 1959 break_on_error=True, 1960 rollback=True, 1961 debug=debug, 1962 ) 1963 update_success = all(update_results) 1964 self._log_temporary_tables_creation( 1965 temp_target, 1966 ready_to_drop=True, 1967 create=(not pipe.temporary), 1968 debug=debug, 1969 ) 1970 if not update_success: 1971 warn(f"Failed to apply update to {pipe}.") 1972 stats['success'] = stats['success'] and update_success 1973 stats['msg'] = ( 1974 (stats.get('msg', '') + f'\nFailed to apply update to {pipe}.').lstrip() 1975 if not update_success 1976 else stats.get('msg', '') 1977 ) 1978 1979 stop = time.perf_counter() 1980 success = stats['success'] 1981 if not success: 1982 return success, stats['msg'] or str(stats) 1983 1984 unseen_count = len(unseen_df.index) if unseen_df is not None else 0 1985 update_count = len(update_df.index) if update_df is not 
None else 0 1986 msg = ( 1987 ( 1988 f"Inserted {unseen_count:,}, " 1989 + f"updated {update_count:,} rows." 1990 ) 1991 if not upsert 1992 else ( 1993 f"Upserted {update_count:,} row" 1994 + ('s' if update_count != 1 else '') 1995 + "." 1996 ) 1997 ) 1998 if debug: 1999 msg = msg[:-1] + ( 2000 f"\non table {sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe))}\n" 2001 + f"in {round(stop - start, 2)} seconds." 2002 ) 2003 2004 if _check_temporary_tables: 2005 drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables( 2006 refresh=False, debug=debug 2007 ) 2008 if not drop_stale_success: 2009 warn(drop_stale_msg) 2010 2011 return success, msg
Sync a pipe using a database connection.
Parameters
- pipe (mrsm.Pipe): The Meerschaum Pipe instance into which to sync the data.
- df (Union[pandas.DataFrame, str, Dict[Any, Any], List[Dict[str, Any]]], default None): An optional DataFrame or equivalent to sync into the pipe.
- begin (Optional[datetime], default None): Optionally specify the earliest datetime to search for data.
- end (Optional[datetime], default None): Optionally specify the latest datetime to search for data.
- chunksize (Optional[int], default -1): Specify the number of rows to sync per chunk. If `-1`, resort to system configuration (default is `900`). A `chunksize` of `None` will sync all rows in one transaction.
- check_existing (bool, default True): If `True`, pull and diff with existing data from the pipe.
- blocking (bool, default True): If `True`, wait for the sync to finish and return its result; otherwise sync asynchronously.
- debug (bool, default False): Verbosity toggle.
- kw (Any): Catch-all for keyword arguments.

Returns
- A `SuccessTuple` of success (`bool`) and message (`str`).
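A sketch of syncing an in-memory DataFrame (in practice `pipe.sync(df)` delegates here; the keys and columns are hypothetical):

    import pandas as pd
    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe(
        'csv:files', 'sales',
        instance='sql:main',
        columns={'datetime': 'order_date', 'id': 'order_id'},
    )
    df = pd.DataFrame({
        'order_date': pd.to_datetime(['2024-01-01', '2024-01-02']),
        'order_id': [1, 2],
        'total': [19.99, 42.50],
    })
    success, msg = conn.sync_pipe(pipe, df)
    print(msg)  # e.g. "Inserted 2, updated 0 rows."

Because `check_existing` defaults to `True`, re-syncing the same rows yields "Inserted 0, updated 0 rows." rather than duplicates.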
2014def sync_pipe_inplace( 2015 self, 2016 pipe: 'mrsm.Pipe', 2017 params: Optional[Dict[str, Any]] = None, 2018 begin: Union[datetime, int, None] = None, 2019 end: Union[datetime, int, None] = None, 2020 chunksize: Optional[int] = -1, 2021 check_existing: bool = True, 2022 debug: bool = False, 2023 **kw: Any 2024) -> SuccessTuple: 2025 """ 2026 If a pipe's connector is the same as its instance connector, 2027 it's more efficient to sync the pipe in-place rather than reading data into Pandas. 2028 2029 Parameters 2030 ---------- 2031 pipe: mrsm.Pipe 2032 The pipe whose connector is the same as its instance. 2033 2034 params: Optional[Dict[str, Any]], default None 2035 Optional params dictionary to build the `WHERE` clause. 2036 See `meerschaum.utils.sql.build_where`. 2037 2038 begin: Union[datetime, int, None], default None 2039 Optionally specify the earliest datetime to search for data. 2040 Defaults to `None`. 2041 2042 end: Union[datetime, int, None], default None 2043 Optionally specify the latest datetime to search for data. 2044 Defaults to `None`. 2045 2046 chunksize: Optional[int], default -1 2047 Specify the number of rows to sync per chunk. 2048 If `-1`, resort to system configuration (default is `900`). 2049 A `chunksize` of `None` will sync all rows in one transaction. 2050 Defaults to `-1`. 2051 2052 check_existing: bool, default True 2053 If `True`, pull and diff with existing data from the pipe. 2054 2055 debug: bool, default False 2056 Verbosity toggle. 2057 2058 Returns 2059 ------- 2060 A SuccessTuple. 2061 """ 2062 if self.flavor == 'duckdb': 2063 return pipe.sync( 2064 params=params, 2065 begin=begin, 2066 end=end, 2067 chunksize=chunksize, 2068 check_existing=check_existing, 2069 debug=debug, 2070 _inplace=False, 2071 **kw 2072 ) 2073 from meerschaum.utils.sql import ( 2074 sql_item_name, 2075 get_update_queries, 2076 get_null_replacement, 2077 get_create_table_queries, 2078 get_table_cols_types, 2079 session_execute, 2080 dateadd_str, 2081 UPDATE_QUERIES, 2082 ) 2083 from meerschaum.utils.dtypes.sql import ( 2084 get_pd_type_from_db_type, 2085 get_db_type_from_pd_type, 2086 ) 2087 from meerschaum.utils.misc import generate_password 2088 2089 transaction_id_length = ( 2090 mrsm.get_config( 2091 'system', 'connectors', 'sql', 'instance', 'temporary_target', 'transaction_id_length' 2092 ) 2093 ) 2094 transact_id = generate_password(transaction_id_length) 2095 2096 internal_schema = self.internal_schema 2097 target = pipe.target 2098 temp_table_roots = ['backtrack', 'new', 'delta', 'joined', 'unseen', 'update'] 2099 temp_tables = { 2100 table_root: self.get_temporary_target(target, transact_id=transact_id, label=table_root) 2101 for table_root in temp_table_roots 2102 } 2103 temp_table_names = { 2104 table_root: sql_item_name(table_name_raw, self.flavor, internal_schema) 2105 for table_root, table_name_raw in temp_tables.items() 2106 } 2107 temp_table_aliases = { 2108 table_root: sql_item_name(table_root, self.flavor) 2109 for table_root in temp_table_roots 2110 } 2111 table_alias_as = " AS" if self.flavor != 'oracle' else '' 2112 metadef = self.get_pipe_metadef( 2113 pipe, 2114 params=params, 2115 begin=begin, 2116 end=end, 2117 check_existing=check_existing, 2118 debug=debug, 2119 ) 2120 pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 2121 upsert = pipe.parameters.get('upsert', False) and f'{self.flavor}-upsert' in UPDATE_QUERIES 2122 static = pipe.parameters.get('static', False) 2123 database = getattr(self, 'database', 
self.parse_uri(self.URI).get('database', None)) 2124 primary_key = pipe.columns.get('primary', None) 2125 primary_key_typ = pipe.dtypes.get(primary_key, None) if primary_key else None 2126 primary_key_db_type = ( 2127 get_db_type_from_pd_type(primary_key_typ, self.flavor) 2128 if primary_key_typ 2129 else None 2130 ) 2131 autoincrement = pipe.parameters.get('autoincrement', False) 2132 dt_col = pipe.columns.get('datetime', None) 2133 dt_col_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None 2134 dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None 2135 dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None 2136 2137 def clean_up_temp_tables(ready_to_drop: bool = False): 2138 log_success, log_msg = self._log_temporary_tables_creation( 2139 [ 2140 table 2141 for table in temp_tables.values() 2142 ] if not upsert else [temp_tables['update']], 2143 ready_to_drop=ready_to_drop, 2144 create=(not pipe.temporary), 2145 debug=debug, 2146 ) 2147 if not log_success: 2148 warn(log_msg) 2149 drop_stale_success, drop_stale_msg = self._drop_old_temporary_tables( 2150 refresh=False, 2151 debug=debug, 2152 ) 2153 if not drop_stale_success: 2154 warn(drop_stale_msg) 2155 return drop_stale_success, drop_stale_msg 2156 2157 sqlalchemy, sqlalchemy_orm = mrsm.attempt_import('sqlalchemy', 'sqlalchemy.orm') 2158 if not pipe.exists(debug=debug): 2159 create_pipe_queries = get_create_table_queries( 2160 metadef, 2161 pipe.target, 2162 self.flavor, 2163 schema=self.get_pipe_schema(pipe), 2164 primary_key=primary_key, 2165 primary_key_db_type=primary_key_db_type, 2166 autoincrement=autoincrement, 2167 datetime_column=dt_col, 2168 ) 2169 results = self.exec_queries(create_pipe_queries, debug=debug) 2170 if not all(results): 2171 _ = clean_up_temp_tables() 2172 return False, f"Could not insert new data into {pipe} from its SQL query definition." 2173 2174 if not self.create_indices(pipe, debug=debug): 2175 warn(f"Failed to create indices for {pipe}. Continuing...") 2176 2177 rowcount = pipe.get_rowcount(debug=debug) 2178 _ = clean_up_temp_tables() 2179 return True, f"Inserted {rowcount:,}, updated 0 rows." 2180 2181 session = sqlalchemy_orm.Session(self.engine) 2182 connectable = session if self.flavor != 'duckdb' else self 2183 2184 create_new_query = get_create_table_queries( 2185 metadef, 2186 temp_tables[('new') if not upsert else 'update'], 2187 self.flavor, 2188 schema=internal_schema, 2189 )[0] 2190 (create_new_success, create_new_msg), create_new_results = session_execute( 2191 session, 2192 create_new_query, 2193 with_results=True, 2194 debug=debug, 2195 ) 2196 if not create_new_success: 2197 _ = clean_up_temp_tables() 2198 return create_new_success, create_new_msg 2199 new_count = create_new_results[0].rowcount if create_new_results else 0 2200 2201 new_cols_types = get_table_cols_types( 2202 temp_tables[('new' if not upsert else 'update')], 2203 connectable=connectable, 2204 flavor=self.flavor, 2205 schema=internal_schema, 2206 database=database, 2207 debug=debug, 2208 ) if not static else pipe.get_columns_types(debug=debug) 2209 if not new_cols_types: 2210 return False, f"Failed to get new columns for {pipe}." 
2211 2212 new_cols = { 2213 str(col_name): get_pd_type_from_db_type(str(col_type)) 2214 for col_name, col_type in new_cols_types.items() 2215 } 2216 new_cols_str = '\n ' + ',\n '.join([ 2217 sql_item_name(col, self.flavor) 2218 for col in new_cols 2219 ]) 2220 def get_col_typ(col: str, cols_types: Dict[str, str]) -> str: 2221 if self.flavor == 'oracle' and new_cols_types.get(col, '').lower() == 'char': 2222 return new_cols_types[col] 2223 return cols_types[col] 2224 2225 add_cols_queries = self.get_add_columns_queries(pipe, new_cols, debug=debug) 2226 if add_cols_queries: 2227 _ = pipe.__dict__.pop('_columns_types', None) 2228 _ = pipe.__dict__.pop('_columns_indices', None) 2229 self.exec_queries(add_cols_queries, debug=debug) 2230 2231 alter_cols_queries = self.get_alter_columns_queries(pipe, new_cols, debug=debug) 2232 if alter_cols_queries: 2233 _ = pipe.__dict__.pop('_columns_types', None) 2234 self.exec_queries(alter_cols_queries, debug=debug) 2235 2236 insert_queries = [ 2237 ( 2238 f"INSERT INTO {pipe_name} ({new_cols_str})\n" 2239 f"SELECT {new_cols_str}\nFROM {temp_table_names['new']}{table_alias_as}" 2240 f" {temp_table_aliases['new']}" 2241 ) 2242 ] if not check_existing and not upsert else [] 2243 2244 new_queries = insert_queries 2245 new_success, new_msg = ( 2246 session_execute(session, new_queries, debug=debug) 2247 if new_queries 2248 else (True, "Success") 2249 ) 2250 if not new_success: 2251 _ = clean_up_temp_tables() 2252 return new_success, new_msg 2253 2254 if not check_existing: 2255 session.commit() 2256 _ = clean_up_temp_tables() 2257 return True, f"Inserted {new_count}, updated 0 rows." 2258 2259 min_dt_col_name_da = dateadd_str( 2260 flavor=self.flavor, begin=f"MIN({dt_col_name})", db_type=dt_db_type, 2261 ) 2262 max_dt_col_name_da = dateadd_str( 2263 flavor=self.flavor, begin=f"MAX({dt_col_name})", db_type=dt_db_type, 2264 ) 2265 2266 (new_dt_bounds_success, new_dt_bounds_msg), new_dt_bounds_results = session_execute( 2267 session, 2268 [ 2269 "SELECT\n" 2270 f" {min_dt_col_name_da} AS {sql_item_name('min_dt', self.flavor)},\n" 2271 f" {max_dt_col_name_da} AS {sql_item_name('max_dt', self.flavor)}\n" 2272 f"FROM {temp_table_names['new' if not upsert else 'update']}\n" 2273 f"WHERE {dt_col_name} IS NOT NULL" 2274 ], 2275 with_results=True, 2276 debug=debug, 2277 ) if dt_col and not upsert else ((True, "Success"), None) 2278 if not new_dt_bounds_success: 2279 return ( 2280 new_dt_bounds_success, 2281 f"Could not determine in-place datetime bounds:\n{new_dt_bounds_msg}" 2282 ) 2283 2284 if dt_col and not upsert: 2285 begin, end = new_dt_bounds_results[0].fetchone() 2286 2287 backtrack_def = self.get_pipe_data_query( 2288 pipe, 2289 begin=begin, 2290 end=end, 2291 begin_add_minutes=0, 2292 end_add_minutes=1, 2293 params=params, 2294 debug=debug, 2295 order=None, 2296 ) 2297 create_backtrack_query = get_create_table_queries( 2298 backtrack_def, 2299 temp_tables['backtrack'], 2300 self.flavor, 2301 schema=internal_schema, 2302 )[0] 2303 (create_backtrack_success, create_backtrack_msg), create_backtrack_results = session_execute( 2304 session, 2305 create_backtrack_query, 2306 with_results=True, 2307 debug=debug, 2308 ) if not upsert else ((True, "Success"), None) 2309 2310 if not create_backtrack_success: 2311 _ = clean_up_temp_tables() 2312 return create_backtrack_success, create_backtrack_msg 2313 2314 backtrack_cols_types = get_table_cols_types( 2315 temp_tables['backtrack'], 2316 connectable=connectable, 2317 flavor=self.flavor, 2318 schema=internal_schema, 2319 
database=database, 2320 debug=debug, 2321 ) if not (upsert or static) else new_cols_types 2322 2323 common_cols = [col for col in new_cols if col in backtrack_cols_types] 2324 primary_key = pipe.columns.get('primary', None) 2325 on_cols = { 2326 col: new_cols.get(col) 2327 for col_key, col in pipe.columns.items() 2328 if ( 2329 col 2330 and 2331 col_key != 'value' 2332 and col in backtrack_cols_types 2333 and col in new_cols 2334 ) 2335 } if not primary_key or self.flavor == 'oracle' else {primary_key: new_cols.get(primary_key)} 2336 2337 null_replace_new_cols_str = ( 2338 '\n ' + ',\n '.join([ 2339 f"COALESCE({temp_table_aliases['new']}.{sql_item_name(col, self.flavor)}, " 2340 + get_null_replacement(get_col_typ(col, new_cols_types), self.flavor) 2341 + ") AS " 2342 + sql_item_name(col, self.flavor, None) 2343 for col, typ in new_cols.items() 2344 ]) 2345 ) 2346 2347 select_delta_query = ( 2348 "SELECT" 2349 + null_replace_new_cols_str 2350 + f"\nFROM {temp_table_names['new']}{table_alias_as} {temp_table_aliases['new']}\n" 2351 + f"LEFT OUTER JOIN {temp_table_names['backtrack']}{table_alias_as} {temp_table_aliases['backtrack']}" 2352 + "\n ON\n " 2353 + '\n AND\n '.join([ 2354 ( 2355 f" COALESCE({temp_table_aliases['new']}." 2356 + sql_item_name(c, self.flavor, None) 2357 + ", " 2358 + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) 2359 + ")" 2360 + '\n =\n ' 2361 + f" COALESCE({temp_table_aliases['backtrack']}." 2362 + sql_item_name(c, self.flavor, None) 2363 + ", " 2364 + get_null_replacement(get_col_typ(c, backtrack_cols_types), self.flavor) 2365 + ") " 2366 ) for c in common_cols 2367 ]) 2368 + "\nWHERE\n " 2369 + '\n AND\n '.join([ 2370 ( 2371 f"{temp_table_aliases['backtrack']}." + sql_item_name(c, self.flavor) + ' IS NULL' 2372 ) for c in common_cols 2373 ]) 2374 ) 2375 create_delta_query = get_create_table_queries( 2376 select_delta_query, 2377 temp_tables['delta'], 2378 self.flavor, 2379 schema=internal_schema, 2380 )[0] 2381 create_delta_success, create_delta_msg = session_execute( 2382 session, 2383 create_delta_query, 2384 debug=debug, 2385 ) if not upsert else (True, "Success") 2386 if not create_delta_success: 2387 _ = clean_up_temp_tables() 2388 return create_delta_success, create_delta_msg 2389 2390 delta_cols_types = get_table_cols_types( 2391 temp_tables['delta'], 2392 connectable=connectable, 2393 flavor=self.flavor, 2394 schema=internal_schema, 2395 database=database, 2396 debug=debug, 2397 ) if not (upsert or static) else new_cols_types 2398 2399 ### This is a weird bug on SQLite. 2400 ### Sometimes the backtrack dtypes are all empty strings. 2401 if not all(delta_cols_types.values()): 2402 delta_cols_types = new_cols_types 2403 2404 delta_cols = { 2405 col: get_pd_type_from_db_type(typ) 2406 for col, typ in delta_cols_types.items() 2407 } 2408 delta_cols_str = ', '.join([ 2409 sql_item_name(col, self.flavor) 2410 for col in delta_cols 2411 ]) 2412 2413 select_joined_query = ( 2414 "SELECT\n " 2415 + (',\n '.join([ 2416 ( 2417 f"{temp_table_aliases['delta']}." + sql_item_name(c, self.flavor, None) 2418 + " AS " + sql_item_name(c + '_delta', self.flavor, None) 2419 ) for c in delta_cols 2420 ])) 2421 + ",\n " 2422 + (',\n '.join([ 2423 ( 2424 f"{temp_table_aliases['backtrack']}." 
+ sql_item_name(c, self.flavor, None) 2425 + " AS " + sql_item_name(c + '_backtrack', self.flavor, None) 2426 ) for c in backtrack_cols_types 2427 ])) 2428 + f"\nFROM {temp_table_names['delta']}{table_alias_as} {temp_table_aliases['delta']}\n" 2429 + f"LEFT OUTER JOIN {temp_table_names['backtrack']}{table_alias_as}" 2430 + f" {temp_table_aliases['backtrack']}" 2431 + "\n ON\n " 2432 + '\n AND\n '.join([ 2433 ( 2434 f" COALESCE({temp_table_aliases['delta']}." + sql_item_name(c, self.flavor) 2435 + ", " 2436 + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) + ")" 2437 + '\n =\n ' 2438 + f" COALESCE({temp_table_aliases['backtrack']}." + sql_item_name(c, self.flavor) 2439 + ", " 2440 + get_null_replacement(get_col_typ(c, new_cols_types), self.flavor) + ")" 2441 ) for c, typ in on_cols.items() 2442 ]) 2443 ) 2444 2445 create_joined_query = get_create_table_queries( 2446 select_joined_query, 2447 temp_tables['joined'], 2448 self.flavor, 2449 schema=internal_schema, 2450 )[0] 2451 create_joined_success, create_joined_msg = session_execute( 2452 session, 2453 create_joined_query, 2454 debug=debug, 2455 ) if on_cols and not upsert else (True, "Success") 2456 if not create_joined_success: 2457 _ = clean_up_temp_tables() 2458 return create_joined_success, create_joined_msg 2459 2460 select_unseen_query = ( 2461 "SELECT\n " 2462 + (',\n '.join([ 2463 ( 2464 "CASE\n WHEN " + sql_item_name(c + '_delta', self.flavor, None) 2465 + " != " + get_null_replacement(get_col_typ(c, delta_cols_types), self.flavor) 2466 + " THEN " + sql_item_name(c + '_delta', self.flavor, None) 2467 + "\n ELSE NULL\n END" 2468 + " AS " + sql_item_name(c, self.flavor, None) 2469 ) for c, typ in delta_cols.items() 2470 ])) 2471 + f"\nFROM {temp_table_names['joined']}{table_alias_as} {temp_table_aliases['joined']}\n" 2472 + "WHERE\n " 2473 + '\n AND\n '.join([ 2474 ( 2475 sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NULL' 2476 ) for c in delta_cols 2477 ]) 2478 ) 2479 create_unseen_query = get_create_table_queries( 2480 select_unseen_query, 2481 temp_tables['unseen'], 2482 self.flavor, 2483 internal_schema, 2484 )[0] 2485 (create_unseen_success, create_unseen_msg), create_unseen_results = session_execute( 2486 session, 2487 create_unseen_query, 2488 with_results=True, 2489 debug=debug 2490 ) if not upsert else ((True, "Success"), None) 2491 if not create_unseen_success: 2492 _ = clean_up_temp_tables() 2493 return create_unseen_success, create_unseen_msg 2494 2495 select_update_query = ( 2496 "SELECT\n " 2497 + (',\n '.join([ 2498 ( 2499 "CASE\n WHEN " + sql_item_name(c + '_delta', self.flavor, None) 2500 + " != " + get_null_replacement(get_col_typ(c, delta_cols_types), self.flavor) 2501 + " THEN " + sql_item_name(c + '_delta', self.flavor, None) 2502 + "\n ELSE NULL\n END" 2503 + " AS " + sql_item_name(c, self.flavor, None) 2504 ) for c, typ in delta_cols.items() 2505 ])) 2506 + f"\nFROM {temp_table_names['joined']}{table_alias_as} {temp_table_aliases['joined']}\n" 2507 + "WHERE\n " 2508 + '\n OR\n '.join([ 2509 ( 2510 sql_item_name(c + '_backtrack', self.flavor, None) + ' IS NOT NULL' 2511 ) for c in delta_cols 2512 ]) 2513 ) 2514 2515 create_update_query = get_create_table_queries( 2516 select_update_query, 2517 temp_tables['update'], 2518 self.flavor, 2519 internal_schema, 2520 )[0] 2521 (create_update_success, create_update_msg), create_update_results = session_execute( 2522 session, 2523 create_update_query, 2524 with_results=True, 2525 debug=debug, 2526 ) if on_cols and not upsert else ((True, 
"Success"), []) 2527 apply_update_queries = ( 2528 get_update_queries( 2529 pipe.target, 2530 temp_tables['update'], 2531 session, 2532 on_cols, 2533 upsert=upsert, 2534 schema=self.get_pipe_schema(pipe), 2535 patch_schema=internal_schema, 2536 datetime_col=pipe.columns.get('datetime', None), 2537 flavor=self.flavor, 2538 null_indices=pipe.null_indices, 2539 cast_columns=pipe.enforce, 2540 debug=debug, 2541 ) 2542 if on_cols else [] 2543 ) 2544 2545 apply_unseen_queries = [ 2546 ( 2547 f"INSERT INTO {pipe_name} ({delta_cols_str})\n" 2548 + f"SELECT {delta_cols_str}\nFROM " 2549 + ( 2550 temp_table_names['unseen'] 2551 if on_cols 2552 else temp_table_names['delta'] 2553 ) 2554 ), 2555 ] 2556 2557 (apply_unseen_success, apply_unseen_msg), apply_unseen_results = session_execute( 2558 session, 2559 apply_unseen_queries, 2560 with_results=True, 2561 debug=debug, 2562 ) if not upsert else ((True, "Success"), None) 2563 if not apply_unseen_success: 2564 _ = clean_up_temp_tables() 2565 return apply_unseen_success, apply_unseen_msg 2566 unseen_count = apply_unseen_results[0].rowcount if apply_unseen_results else 0 2567 2568 (apply_update_success, apply_update_msg), apply_update_results = session_execute( 2569 session, 2570 apply_update_queries, 2571 with_results=True, 2572 debug=debug, 2573 ) 2574 if not apply_update_success: 2575 _ = clean_up_temp_tables() 2576 return apply_update_success, apply_update_msg 2577 update_count = apply_update_results[0].rowcount if apply_update_results else 0 2578 2579 session.commit() 2580 2581 msg = ( 2582 f"Inserted {unseen_count:,}, updated {update_count:,} rows." 2583 if not upsert 2584 else f"Upserted {update_count:,} row" + ('s' if update_count != 1 else '') + "." 2585 ) 2586 _ = clean_up_temp_tables(ready_to_drop=True) 2587 2588 return True, msg
If a pipe's connector is the same as its instance connector, it's more efficient to sync the pipe in-place rather than reading data into Pandas.
Parameters
- pipe (mrsm.Pipe): The pipe whose connector is the same as its instance.
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the `WHERE` clause. See `meerschaum.utils.sql.build_where`.
- begin (Union[datetime, int, None], default None): Optionally specify the earliest datetime to search for data.
- end (Union[datetime, int, None], default None): Optionally specify the latest datetime to search for data.
- chunksize (Optional[int], default -1): Specify the number of rows to sync per chunk. If `-1`, resort to system configuration (default is `900`). A `chunksize` of `None` will sync all rows in one transaction.
- check_existing (bool, default True): If `True`, pull and diff with existing data from the pipe.
- debug (bool, default False): Verbosity toggle.

Returns
- A `SuccessTuple`.
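A sketch of an in-place sync for a pipe whose definition is a SQL query on the same instance (the `fetch:definition` query and table below are hypothetical):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe(
        'sql:main', 'daily_totals',
        instance='sql:main',
        columns={'datetime': 'day'},
        parameters={
            'fetch': {
                'definition': (
                    "SELECT order_date AS day, SUM(total) AS total "
                    "FROM sales GROUP BY order_date"
                ),
            },
        },
    )
    # The delta is computed entirely in SQL; no rows pass through Pandas.
    success, msg = conn.sync_pipe_inplace(pipe)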
2591def get_sync_time( 2592 self, 2593 pipe: 'mrsm.Pipe', 2594 params: Optional[Dict[str, Any]] = None, 2595 newest: bool = True, 2596 remote: bool = False, 2597 debug: bool = False, 2598) -> Union[datetime, int, None]: 2599 """Get a Pipe's most recent datetime value. 2600 2601 Parameters 2602 ---------- 2603 pipe: mrsm.Pipe 2604 The pipe to get the sync time for. 2605 2606 params: Optional[Dict[str, Any]], default None 2607 Optional params dictionary to build the `WHERE` clause. 2608 See `meerschaum.utils.sql.build_where`. 2609 2610 newest: bool, default True 2611 If `True`, get the most recent datetime (honoring `params`). 2612 If `False`, get the oldest datetime (ASC instead of DESC). 2613 2614 remote: bool, default False 2615 If `True`, return the sync time for the remote fetch definition. 2616 2617 Returns 2618 ------- 2619 A `datetime` object (or `int` if using an integer axis) if the pipe exists, otherwise `None`. 2620 """ 2621 from meerschaum.utils.sql import sql_item_name, build_where, wrap_query_with_cte 2622 src_name = sql_item_name('src', self.flavor) 2623 table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 2624 2625 dt_col = pipe.columns.get('datetime', None) 2626 if dt_col is None: 2627 return None 2628 dt_col_name = sql_item_name(dt_col, self.flavor, None) 2629 2630 if remote and pipe.connector.type != 'sql': 2631 warn(f"Cannot get the remote sync time for {pipe}.") 2632 return None 2633 2634 ASC_or_DESC = "DESC" if newest else "ASC" 2635 existing_cols = pipe.get_columns_types(debug=debug) 2636 valid_params = {} 2637 if params is not None: 2638 valid_params = {k: v for k, v in params.items() if k in existing_cols} 2639 flavor = self.flavor if not remote else pipe.connector.flavor 2640 2641 ### If no bounds are provided for the datetime column, 2642 ### add IS NOT NULL to the WHERE clause. 2643 if dt_col not in valid_params: 2644 valid_params[dt_col] = '_None' 2645 where = "" if not valid_params else build_where(valid_params, self) 2646 src_query = ( 2647 f"SELECT {dt_col_name}\nFROM {table_name}{where}" 2648 if not remote 2649 else self.get_pipe_metadef(pipe, params=params, begin=None, end=None) 2650 ) 2651 2652 base_query = ( 2653 f"SELECT {dt_col_name}\n" 2654 f"FROM {src_name}{where}\n" 2655 f"ORDER BY {dt_col_name} {ASC_or_DESC}\n" 2656 f"LIMIT 1" 2657 ) 2658 if self.flavor == 'mssql': 2659 base_query = ( 2660 f"SELECT TOP 1 {dt_col_name}\n" 2661 f"FROM {src_name}{where}\n" 2662 f"ORDER BY {dt_col_name} {ASC_or_DESC}" 2663 ) 2664 elif self.flavor == 'oracle': 2665 base_query = ( 2666 "SELECT * FROM (\n" 2667 f" SELECT {dt_col_name}\n" 2668 f" FROM {src_name}{where}\n" 2669 f" ORDER BY {dt_col_name} {ASC_or_DESC}\n" 2670 ") WHERE ROWNUM = 1" 2671 ) 2672 2673 query = wrap_query_with_cte(src_query, base_query, flavor) 2674 2675 try: 2676 db_time = self.value(query, silent=True, debug=debug) 2677 2678 ### No datetime could be found. 2679 if db_time is None: 2680 return None 2681 ### sqlite returns str. 2682 if isinstance(db_time, str): 2683 dateutil_parser = mrsm.attempt_import('dateutil.parser') 2684 st = dateutil_parser.parse(db_time) 2685 ### Do nothing if a datetime object is returned. 2686 elif isinstance(db_time, datetime): 2687 if hasattr(db_time, 'to_pydatetime'): 2688 st = db_time.to_pydatetime() 2689 else: 2690 st = db_time 2691 ### Sometimes the datetime is actually a date. 2692 elif isinstance(db_time, date): 2693 st = datetime.combine(db_time, datetime.min.time()) 2694 ### Adding support for an integer datetime axis. 
2695 elif 'int' in str(type(db_time)).lower(): 2696 st = int(db_time) 2697 ### Convert pandas timestamp to Python datetime. 2698 else: 2699 st = db_time.to_pydatetime() 2700 2701 sync_time = st 2702 2703 except Exception as e: 2704 sync_time = None 2705 warn(str(e)) 2706 2707 return sync_time
Get a Pipe's most recent datetime value.
Parameters
- pipe (mrsm.Pipe): The pipe to get the sync time for.
- params (Optional[Dict[str, Any]], default None): Optional params dictionary to build the `WHERE` clause. See `meerschaum.utils.sql.build_where`.
- newest (bool, default True): If `True`, get the most recent datetime (honoring `params`). If `False`, get the oldest datetime (`ASC` instead of `DESC`).
- remote (bool, default False): If `True`, return the sync time for the remote fetch definition.

Returns
- A `datetime` object (or `int` if using an integer axis) if the pipe exists, otherwise `None`.
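A sketch of checking both bounds of the datetime axis:

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe('sql:remote', 'sales', instance='sql:main')

    newest = conn.get_sync_time(pipe)                # most recent value
    oldest = conn.get_sync_time(pipe, newest=False)  # oldest value
    print(newest, oldest)  # None if the pipe is empty or lacks a datetime column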
2710def pipe_exists( 2711 self, 2712 pipe: mrsm.Pipe, 2713 debug: bool = False 2714) -> bool: 2715 """ 2716 Check that a Pipe's table exists. 2717 2718 Parameters 2719 ---------- 2720 pipe: mrsm.Pipe: 2721 The pipe to check. 2722 2723 debug: bool, default False 2724 Verbosity toggle. 2725 2726 Returns 2727 ------- 2728 A `bool` corresponding to whether a pipe's table exists. 2729 2730 """ 2731 from meerschaum.utils.sql import table_exists 2732 exists = table_exists( 2733 pipe.target, 2734 self, 2735 schema=self.get_pipe_schema(pipe), 2736 debug=debug, 2737 ) 2738 if debug: 2739 from meerschaum.utils.debug import dprint 2740 dprint(f"{pipe} " + ('exists.' if exists else 'does not exist.')) 2741 return exists
Check that a Pipe's table exists.
Parameters
- pipe (mrsm.Pipe): The pipe to check.
- debug (bool, default False): Verbosity toggle.

Returns
- A `bool` corresponding to whether a pipe's table exists.
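A sketch of guarding logic on table existence (registration alone does not create the table):

    import meerschaum as mrsm

    conn = mrsm.get_connector('sql:main')
    pipe = mrsm.Pipe('sql:remote', 'sales', instance='sql:main')

    if not conn.pipe_exists(pipe):
        print(f"No table yet for {pipe}; sync some data first.")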
2744def get_pipe_rowcount( 2745 self, 2746 pipe: mrsm.Pipe, 2747 begin: Union[datetime, int, None] = None, 2748 end: Union[datetime, int, None] = None, 2749 params: Optional[Dict[str, Any]] = None, 2750 remote: bool = False, 2751 debug: bool = False 2752) -> Union[int, None]: 2753 """ 2754 Get the rowcount for a pipe in accordance with given parameters. 2755 2756 Parameters 2757 ---------- 2758 pipe: mrsm.Pipe 2759 The pipe to query with. 2760 2761 begin: Union[datetime, int, None], default None 2762 The begin datetime value. 2763 2764 end: Union[datetime, int, None], default None 2765 The end datetime value. 2766 2767 params: Optional[Dict[str, Any]], default None 2768 See `meerschaum.utils.sql.build_where`. 2769 2770 remote: bool, default False 2771 If `True`, get the rowcount for the remote table. 2772 2773 debug: bool, default False 2774 Verbosity toggle. 2775 2776 Returns 2777 ------- 2778 An `int` for the number of rows if the `pipe` exists, otherwise `None`. 2779 2780 """ 2781 from meerschaum.utils.sql import dateadd_str, sql_item_name, wrap_query_with_cte, build_where 2782 from meerschaum.connectors.sql._fetch import get_pipe_query 2783 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 2784 if remote: 2785 msg = f"'fetch:definition' must be an attribute of {pipe} to get a remote rowcount." 2786 if 'fetch' not in pipe.parameters: 2787 error(msg) 2788 return None 2789 if 'definition' not in pipe.parameters['fetch']: 2790 error(msg) 2791 return None 2792 2793 2794 flavor = self.flavor if not remote else pipe.connector.flavor 2795 conn = self if not remote else pipe.connector 2796 _pipe_name = sql_item_name(pipe.target, flavor, self.get_pipe_schema(pipe)) 2797 dt_col = pipe.columns.get('datetime', None) 2798 dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None 2799 dt_db_type = get_db_type_from_pd_type(dt_typ, flavor) if dt_typ else None 2800 if not dt_col: 2801 dt_col = pipe.guess_datetime() 2802 dt_name = sql_item_name(dt_col, flavor, None) if dt_col else None 2803 is_guess = True 2804 else: 2805 dt_col = pipe.get_columns('datetime') 2806 dt_name = sql_item_name(dt_col, flavor, None) 2807 is_guess = False 2808 2809 if begin is not None or end is not None: 2810 if is_guess: 2811 if dt_col is None: 2812 warn( 2813 f"No datetime could be determined for {pipe}." 
2814 + "\n Ignoring begin and end...", 2815 stack=False, 2816 ) 2817 begin, end = None, None 2818 else: 2819 warn( 2820 f"A datetime wasn't specified for {pipe}.\n" 2821 + f" Using column \"{dt_col}\" for datetime bounds...", 2822 stack=False, 2823 ) 2824 2825 2826 _datetime_name = sql_item_name(dt_col, flavor) 2827 _cols_names = [ 2828 sql_item_name(col, flavor) 2829 for col in set( 2830 ( 2831 [dt_col] 2832 if dt_col 2833 else [] 2834 ) + ( 2835 [] 2836 if params is None 2837 else list(params.keys()) 2838 ) 2839 ) 2840 ] 2841 if not _cols_names: 2842 _cols_names = ['*'] 2843 2844 src = ( 2845 f"SELECT {', '.join(_cols_names)}\nFROM {_pipe_name}" 2846 if not remote 2847 else get_pipe_query(pipe) 2848 ) 2849 parent_query = f"SELECT COUNT(*)\nFROM {sql_item_name('src', flavor)}" 2850 query = wrap_query_with_cte(src, parent_query, flavor) 2851 if begin is not None or end is not None: 2852 query += "\nWHERE" 2853 if begin is not None: 2854 query += ( 2855 f"\n {dt_name} >= " 2856 + dateadd_str(flavor, datepart='minute', number=0, begin=begin, db_type=dt_db_type) 2857 ) 2858 if end is not None and begin is not None: 2859 query += "\n AND" 2860 if end is not None: 2861 query += ( 2862 f"\n {dt_name} < " 2863 + dateadd_str(flavor, datepart='minute', number=0, begin=end, db_type=dt_db_type) 2864 ) 2865 if params is not None: 2866 existing_cols = pipe.get_columns_types(debug=debug) 2867 valid_params = {k: v for k, v in params.items() if k in existing_cols} 2868 if valid_params: 2869 query += build_where(valid_params, conn).replace('WHERE', ( 2870 'AND' if (begin is not None or end is not None) 2871 else 'WHERE' 2872 ) 2873 ) 2874 2875 result = conn.value(query, debug=debug, silent=True) 2876 try: 2877 return int(result) 2878 except Exception: 2879 return None
Get the rowcount for a pipe in accordance with given parameters.
Parameters
- pipe (mrsm.Pipe): The pipe to query with.
- begin (Union[datetime, int, None], default None): The begin datetime value.
- end (Union[datetime, int, None], default None): The end datetime value.
- params (Optional[Dict[str, Any]], default None): See `meerschaum.utils.sql.build_where`.
- remote (bool, default False): If `True`, get the rowcount for the remote table.
- debug (bool, default False): Verbosity toggle.
Returns
- An `int` for the number of rows if the `pipe` exists, otherwise `None`.
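A sketch of unbounded and bounded rowcounts (the counts shown are illustrative, e.g. for hourly rows):

>>> from datetime import datetime
>>> conn.get_pipe_rowcount(pipe)
744
>>> conn.get_pipe_rowcount(pipe, begin=datetime(2024, 1, 1), end=datetime(2024, 1, 2))
24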
2882def drop_pipe( 2883 self, 2884 pipe: mrsm.Pipe, 2885 debug: bool = False, 2886 **kw 2887) -> SuccessTuple: 2888 """ 2889 Drop a pipe's tables but maintain its registration. 2890 2891 Parameters 2892 ---------- 2893 pipe: mrsm.Pipe 2894 The pipe to drop. 2895 2896 Returns 2897 ------- 2898 A `SuccessTuple` indicated success. 2899 """ 2900 from meerschaum.utils.sql import table_exists, sql_item_name, DROP_IF_EXISTS_FLAVORS 2901 success = True 2902 target = pipe.target 2903 schema = self.get_pipe_schema(pipe) 2904 target_name = ( 2905 sql_item_name(target, self.flavor, schema) 2906 ) 2907 if table_exists(target, self, schema=schema, debug=debug): 2908 if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else "" 2909 success = self.exec( 2910 f"DROP TABLE {if_exists_str} {target_name}", silent=True, debug=debug 2911 ) is not None 2912 2913 msg = "Success" if success else f"Failed to drop {pipe}." 2914 return success, msg
Drop a pipe's tables but maintain its registration.
Parameters
- pipe (mrsm.Pipe): The pipe to drop.
Returns
- A `SuccessTuple` indicating success.
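For example (a sketch; the success message mirrors the source above):

>>> conn.drop_pipe(pipe)
(True, 'Success')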
2917def clear_pipe( 2918 self, 2919 pipe: mrsm.Pipe, 2920 begin: Union[datetime, int, None] = None, 2921 end: Union[datetime, int, None] = None, 2922 params: Optional[Dict[str, Any]] = None, 2923 debug: bool = False, 2924 **kw 2925) -> SuccessTuple: 2926 """ 2927 Delete a pipe's data within a bounded or unbounded interval without dropping the table. 2928 2929 Parameters 2930 ---------- 2931 pipe: mrsm.Pipe 2932 The pipe to clear. 2933 2934 begin: Union[datetime, int, None], default None 2935 Beginning datetime. Inclusive. 2936 2937 end: Union[datetime, int, None], default None 2938 Ending datetime. Exclusive. 2939 2940 params: Optional[Dict[str, Any]], default None 2941 See `meerschaum.utils.sql.build_where`. 2942 2943 """ 2944 if not pipe.exists(debug=debug): 2945 return True, f"{pipe} does not exist, so nothing was cleared." 2946 2947 from meerschaum.utils.sql import sql_item_name, build_where, dateadd_str 2948 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 2949 pipe_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 2950 2951 dt_col = pipe.columns.get('datetime', None) 2952 dt_typ = pipe.dtypes.get(dt_col, 'datetime') if dt_col else None 2953 dt_db_type = get_db_type_from_pd_type(dt_typ, self.flavor) if dt_typ else None 2954 if not pipe.columns.get('datetime', None): 2955 dt_col = pipe.guess_datetime() 2956 dt_name = sql_item_name(dt_col, self.flavor, None) if dt_col else None 2957 is_guess = True 2958 else: 2959 dt_col = pipe.get_columns('datetime') 2960 dt_name = sql_item_name(dt_col, self.flavor, None) 2961 is_guess = False 2962 2963 if begin is not None or end is not None: 2964 if is_guess: 2965 if dt_col is None: 2966 warn( 2967 f"No datetime could be determined for {pipe}." 2968 + "\n Ignoring datetime bounds...", 2969 stack=False, 2970 ) 2971 begin, end = None, None 2972 else: 2973 warn( 2974 f"A datetime wasn't specified for {pipe}.\n" 2975 + f" Using column \"{dt_col}\" for datetime bounds...", 2976 stack=False, 2977 ) 2978 2979 valid_params = {} 2980 if params is not None: 2981 existing_cols = pipe.get_columns_types(debug=debug) 2982 valid_params = {k: v for k, v in params.items() if k in existing_cols} 2983 clear_query = ( 2984 f"DELETE FROM {pipe_name}\nWHERE 1 = 1\n" 2985 + ('\n AND ' + build_where(valid_params, self, with_where=False) if valid_params else '') 2986 + ( 2987 ( 2988 f'\n AND {dt_name} >= ' 2989 + dateadd_str(self.flavor, 'day', 0, begin, db_type=dt_db_type) 2990 ) 2991 if begin is not None 2992 else '' 2993 ) + ( 2994 ( 2995 f'\n AND {dt_name} < ' 2996 + dateadd_str(self.flavor, 'day', 0, end, db_type=dt_db_type) 2997 ) 2998 if end is not None 2999 else '' 3000 ) 3001 ) 3002 success = self.exec(clear_query, silent=True, debug=debug) is not None 3003 msg = "Success" if success else f"Failed to clear {pipe}." 3004 return success, msg
Delete a pipe's data within a bounded or unbounded interval without dropping the table.
Parameters
- pipe (mrsm.Pipe): The pipe to clear.
- begin (Union[datetime, int, None], default None): Beginning datetime. Inclusive.
- end (Union[datetime, int, None], default None): Ending datetime. Exclusive.
- params (Optional[Dict[str, Any]], default None): See `meerschaum.utils.sql.build_where`.
Returns
- A `SuccessTuple` indicating success.
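A sketch of clearing one bounded interval (begin is inclusive, end is exclusive; the bounds are illustrative):

>>> from datetime import datetime
>>> conn.clear_pipe(pipe, begin=datetime(2024, 1, 1), end=datetime(2024, 2, 1))
(True, 'Success')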
3608def deduplicate_pipe( 3609 self, 3610 pipe: mrsm.Pipe, 3611 begin: Union[datetime, int, None] = None, 3612 end: Union[datetime, int, None] = None, 3613 params: Optional[Dict[str, Any]] = None, 3614 debug: bool = False, 3615 **kwargs: Any 3616) -> SuccessTuple: 3617 """ 3618 Delete duplicate values within a pipe's table. 3619 3620 Parameters 3621 ---------- 3622 pipe: mrsm.Pipe 3623 The pipe whose table to deduplicate. 3624 3625 begin: Union[datetime, int, None], default None 3626 If provided, only deduplicate values greater than or equal to this value. 3627 3628 end: Union[datetime, int, None], default None 3629 If provided, only deduplicate values less than this value. 3630 3631 params: Optional[Dict[str, Any]], default None 3632 If provided, further limit deduplication to values which match this query dictionary. 3633 3634 debug: bool, default False 3635 Verbosity toggle. 3636 3637 Returns 3638 ------- 3639 A `SuccessTuple` indicating success. 3640 """ 3641 from meerschaum.utils.sql import ( 3642 sql_item_name, 3643 get_rename_table_queries, 3644 DROP_IF_EXISTS_FLAVORS, 3645 get_create_table_query, 3646 format_cte_subquery, 3647 get_null_replacement, 3648 ) 3649 from meerschaum.utils.misc import generate_password, flatten_list 3650 3651 pipe_table_name = sql_item_name(pipe.target, self.flavor, self.get_pipe_schema(pipe)) 3652 3653 if not pipe.exists(debug=debug): 3654 return False, f"Table {pipe_table_name} does not exist." 3655 3656 dt_col = pipe.columns.get('datetime', None) 3657 cols_types = pipe.get_columns_types(debug=debug) 3658 existing_cols = pipe.get_columns_types(debug=debug) 3659 3660 get_rowcount_query = f"SELECT COUNT(*) FROM {pipe_table_name}" 3661 old_rowcount = self.value(get_rowcount_query, debug=debug) 3662 if old_rowcount is None: 3663 return False, f"Failed to get rowcount for table {pipe_table_name}." 3664 3665 ### Non-datetime indices that in fact exist. 3666 indices = [ 3667 col 3668 for key, col in pipe.columns.items() 3669 if col and col != dt_col and col in cols_types 3670 ] 3671 indices_names = [sql_item_name(index_col, self.flavor, None) for index_col in indices] 3672 existing_cols_names = [sql_item_name(col, self.flavor, None) for col in existing_cols] 3673 duplicate_row_number_name = sql_item_name('dup_row_num', self.flavor, None) 3674 previous_row_number_name = sql_item_name('prev_row_num', self.flavor, None) 3675 3676 index_list_str = ( 3677 sql_item_name(dt_col, self.flavor, None) 3678 if dt_col 3679 else '' 3680 ) 3681 index_list_str_ordered = ( 3682 ( 3683 sql_item_name(dt_col, self.flavor, None) + " DESC" 3684 ) 3685 if dt_col 3686 else '' 3687 ) 3688 if indices: 3689 index_list_str += ', ' + ', '.join(indices_names) 3690 index_list_str_ordered += ', ' + ', '.join(indices_names) 3691 if index_list_str.startswith(','): 3692 index_list_str = index_list_str.lstrip(',').lstrip() 3693 if index_list_str_ordered.startswith(','): 3694 index_list_str_ordered = index_list_str_ordered.lstrip(',').lstrip() 3695 3696 cols_list_str = ', '.join(existing_cols_names) 3697 3698 try: 3699 ### NOTE: MySQL 5 and below does not support window functions (ROW_NUMBER()). 
3700 is_old_mysql = ( 3701 self.flavor in ('mysql', 'mariadb') 3702 and 3703 int(self.db_version.split('.')[0]) < 8 3704 ) 3705 except Exception: 3706 is_old_mysql = False 3707 3708 src_query = f""" 3709 SELECT 3710 {cols_list_str}, 3711 ROW_NUMBER() OVER ( 3712 PARTITION BY 3713 {index_list_str} 3714 ORDER BY {index_list_str_ordered} 3715 ) AS {duplicate_row_number_name} 3716 FROM {pipe_table_name} 3717 """ 3718 duplicates_cte_subquery = format_cte_subquery( 3719 src_query, 3720 self.flavor, 3721 sub_name = 'src', 3722 cols_to_select = cols_list_str, 3723 ) + f""" 3724 WHERE {duplicate_row_number_name} = 1 3725 """ 3726 old_mysql_query = ( 3727 f""" 3728 SELECT 3729 {index_list_str} 3730 FROM ( 3731 SELECT 3732 {index_list_str}, 3733 IF( 3734 @{previous_row_number_name} <> {index_list_str.replace(', ', ' + ')}, 3735 @{duplicate_row_number_name} := 0, 3736 @{duplicate_row_number_name} 3737 ), 3738 @{previous_row_number_name} := {index_list_str.replace(', ', ' + ')}, 3739 @{duplicate_row_number_name} := @{duplicate_row_number_name} + 1 AS """ 3740 + f"""{duplicate_row_number_name} 3741 FROM 3742 {pipe_table_name}, 3743 ( 3744 SELECT @{duplicate_row_number_name} := 0 3745 ) AS {duplicate_row_number_name}, 3746 ( 3747 SELECT @{previous_row_number_name} := '{get_null_replacement('str', 'mysql')}' 3748 ) AS {previous_row_number_name} 3749 ORDER BY {index_list_str_ordered} 3750 ) AS t 3751 WHERE {duplicate_row_number_name} = 1 3752 """ 3753 ) 3754 if is_old_mysql: 3755 duplicates_cte_subquery = old_mysql_query 3756 3757 session_id = generate_password(3) 3758 3759 dedup_table = self.get_temporary_target(pipe.target, transact_id=session_id, label='dedup') 3760 temp_old_table = self.get_temporary_target(pipe.target, transact_id=session_id, label='old') 3761 temp_old_table_name = sql_item_name(temp_old_table, self.flavor, self.get_pipe_schema(pipe)) 3762 3763 create_temporary_table_query = get_create_table_query( 3764 duplicates_cte_subquery, 3765 dedup_table, 3766 self.flavor, 3767 ) + f""" 3768 ORDER BY {index_list_str_ordered} 3769 """ 3770 if_exists_str = "IF EXISTS" if self.flavor in DROP_IF_EXISTS_FLAVORS else "" 3771 alter_queries = flatten_list([ 3772 get_rename_table_queries( 3773 pipe.target, 3774 temp_old_table, 3775 self.flavor, 3776 schema=self.get_pipe_schema(pipe), 3777 ), 3778 get_rename_table_queries( 3779 dedup_table, 3780 pipe.target, 3781 self.flavor, 3782 schema=self.get_pipe_schema(pipe), 3783 ), 3784 f"DROP TABLE {if_exists_str} {temp_old_table_name}", 3785 ]) 3786 3787 self._log_temporary_tables_creation(temp_old_table, create=(not pipe.temporary), debug=debug) 3788 create_temporary_result = self.execute(create_temporary_table_query, debug=debug) 3789 if create_temporary_result is None: 3790 return False, f"Failed to deduplicate table {pipe_table_name}." 3791 3792 results = self.exec_queries( 3793 alter_queries, 3794 break_on_error=True, 3795 rollback=True, 3796 debug=debug, 3797 ) 3798 3799 fail_query = None 3800 for result, query in zip(results, alter_queries): 3801 if result is None: 3802 fail_query = query 3803 break 3804 success = fail_query is None 3805 3806 new_rowcount = ( 3807 self.value(get_rowcount_query, debug=debug) 3808 if success 3809 else None 3810 ) 3811 3812 msg = ( 3813 ( 3814 f"Successfully deduplicated table {pipe_table_name}" 3815 + ( 3816 f"\nfrom {old_rowcount:,} to {new_rowcount:,} rows" 3817 if old_rowcount != new_rowcount 3818 else '' 3819 ) + '.' 
3820 ) 3821 if success 3822 else f"Failed to execute query:\n{fail_query}" 3823 ) 3824 return success, msg
Delete duplicate values within a pipe's table.
Parameters
- pipe (mrsm.Pipe): The pipe whose table to deduplicate.
- begin (Union[datetime, int, None], default None): If provided, only deduplicate values greater than or equal to this value.
- end (Union[datetime, int, None], default None): If provided, only deduplicate values less than this value.
- params (Optional[Dict[str, Any]], default None): If provided, further limit deduplication to values which match this query dictionary.
- debug (bool, default False): Verbosity toggle.
Returns
- A `SuccessTuple` indicating success.
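A usage sketch (deduplication partitions on the configured index columns, so the outcome depends on the hypothetical pipe's indices):

>>> success, msg = conn.deduplicate_pipe(pipe)
>>> success
True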
3007def get_pipe_table( 3008 self, 3009 pipe: mrsm.Pipe, 3010 debug: bool = False, 3011) -> Union['sqlalchemy.Table', None]: 3012 """ 3013 Return the `sqlalchemy.Table` object for a `mrsm.Pipe`. 3014 3015 Parameters 3016 ---------- 3017 pipe: mrsm.Pipe: 3018 The pipe in question. 3019 3020 Returns 3021 ------- 3022 A `sqlalchemy.Table` object. 3023 3024 """ 3025 from meerschaum.utils.sql import get_sqlalchemy_table 3026 if not pipe.exists(debug=debug): 3027 return None 3028 return get_sqlalchemy_table( 3029 pipe.target, 3030 connector=self, 3031 schema=self.get_pipe_schema(pipe), 3032 debug=debug, 3033 refresh=True, 3034 )
Return the `sqlalchemy.Table` object for a `mrsm.Pipe`.
Parameters
- pipe (mrsm.Pipe): The pipe in question.
Returns
- A `sqlalchemy.Table` object.
3037def get_pipe_columns_types( 3038 self, 3039 pipe: mrsm.Pipe, 3040 debug: bool = False, 3041) -> Dict[str, str]: 3042 """ 3043 Get the pipe's columns and types. 3044 3045 Parameters 3046 ---------- 3047 pipe: mrsm.Pipe: 3048 The pipe to get the columns for. 3049 3050 Returns 3051 ------- 3052 A dictionary of columns names (`str`) and types (`str`). 3053 3054 Examples 3055 -------- 3056 >>> conn.get_pipe_columns_types(pipe) 3057 { 3058 'dt': 'TIMESTAMP WITHOUT TIMEZONE', 3059 'id': 'BIGINT', 3060 'val': 'DOUBLE PRECISION', 3061 } 3062 >>> 3063 """ 3064 from meerschaum.utils.sql import get_table_cols_types 3065 if not pipe.exists(debug=debug): 3066 return {} 3067 3068 if self.flavor not in ('oracle', 'mysql', 'mariadb', 'sqlite'): 3069 return get_table_cols_types( 3070 pipe.target, 3071 self, 3072 flavor=self.flavor, 3073 schema=self.get_pipe_schema(pipe), 3074 debug=debug, 3075 ) 3076 3077 table_columns = {} 3078 try: 3079 pipe_table = self.get_pipe_table(pipe, debug=debug) 3080 if pipe_table is None: 3081 return {} 3082 for col in pipe_table.columns: 3083 table_columns[str(col.name)] = str(col.type) 3084 except Exception as e: 3085 import traceback 3086 traceback.print_exc() 3087 warn(e) 3088 table_columns = {} 3089 3090 return table_columns
Get the pipe's columns and types.
Parameters
- pipe (mrsm.Pipe): The pipe to get the columns for.
Returns
- A dictionary of column names (`str`) and types (`str`).
Examples
>>> conn.get_pipe_columns_types(pipe)
{
'dt': 'TIMESTAMP WITHOUT TIMEZONE',
'id': 'BIGINT',
'val': 'DOUBLE PRECISION',
}
>>>
3552def get_to_sql_dtype( 3553 self, 3554 pipe: 'mrsm.Pipe', 3555 df: 'pd.DataFrame', 3556 update_dtypes: bool = True, 3557) -> Dict[str, 'sqlalchemy.sql.visitors.TraversibleType']: 3558 """ 3559 Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`. 3560 3561 Parameters 3562 ---------- 3563 pipe: mrsm.Pipe 3564 The pipe which may contain a `dtypes` parameter. 3565 3566 df: pd.DataFrame 3567 The DataFrame to be pushed via `to_sql()`. 3568 3569 update_dtypes: bool, default True 3570 If `True`, patch the pipe's dtypes onto the DataFrame's dtypes. 3571 3572 Returns 3573 ------- 3574 A dictionary with `sqlalchemy` datatypes. 3575 3576 Examples 3577 -------- 3578 >>> import pandas as pd 3579 >>> import meerschaum as mrsm 3580 >>> 3581 >>> conn = mrsm.get_connector('sql:memory') 3582 >>> df = pd.DataFrame([{'a': {'b': 1}}]) 3583 >>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'}) 3584 >>> get_to_sql_dtype(pipe, df) 3585 {'a': <class 'sqlalchemy.sql.sqltypes.JSON'>} 3586 """ 3587 from meerschaum.utils.dataframe import get_json_cols, get_numeric_cols, get_uuid_cols 3588 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 3589 df_dtypes = { 3590 col: str(typ) 3591 for col, typ in df.dtypes.items() 3592 } 3593 json_cols = get_json_cols(df) 3594 numeric_cols = get_numeric_cols(df) 3595 uuid_cols = get_uuid_cols(df) 3596 df_dtypes.update({col: 'json' for col in json_cols}) 3597 df_dtypes.update({col: 'numeric' for col in numeric_cols}) 3598 df_dtypes.update({col: 'uuid' for col in uuid_cols}) 3599 if update_dtypes: 3600 df_dtypes.update(pipe.dtypes) 3601 return { 3602 col: get_db_type_from_pd_type(typ, self.flavor, as_sqlalchemy=True) 3603 for col, typ in df_dtypes.items() 3604 if col and typ 3605 }
Given a pipe and DataFrame, return the `dtype` dictionary for `to_sql()`.
Parameters
- pipe (mrsm.Pipe):
The pipe which may contain a
dtypes
parameter. - df (pd.DataFrame):
The DataFrame to be pushed via
to_sql()
. - update_dtypes (bool, default True):
If
True
, patch the pipe's dtypes onto the DataFrame's dtypes.
Returns
- A dictionary with `sqlalchemy` datatypes.
Examples
>>> import pandas as pd
>>> import meerschaum as mrsm
>>>
>>> conn = mrsm.get_connector('sql:memory')
>>> df = pd.DataFrame([{'a': {'b': 1}}])
>>> pipe = mrsm.Pipe('a', 'b', dtypes={'a': 'json'})
>>> get_to_sql_dtype(pipe, df)
{'a': <class 'sqlalchemy.sql.sqltypes.JSON'>}
3827def get_pipe_schema(self, pipe: mrsm.Pipe) -> Union[str, None]: 3828 """ 3829 Return the schema to use for this pipe. 3830 First check `pipe.parameters['schema']`, then check `self.schema`. 3831 3832 Parameters 3833 ---------- 3834 pipe: mrsm.Pipe 3835 The pipe which may contain a configured schema. 3836 3837 Returns 3838 ------- 3839 A schema string or `None` if nothing is configured. 3840 """ 3841 return pipe.parameters.get('schema', self.schema)
Return the schema to use for this pipe.
First check `pipe.parameters['schema']`, then check `self.schema`.
Parameters
- pipe (mrsm.Pipe): The pipe which may contain a configured schema.
Returns
- A schema string or `None` if nothing is configured.
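A sketch of the precedence (the 'analytics' schema and the pipe keys are hypothetical):

>>> import meerschaum as mrsm
>>> conn = mrsm.get_connector('sql:main')
>>> pipe = mrsm.Pipe(
...     'demo', 'temperature',
...     instance='sql:main',
...     parameters={'schema': 'analytics'},
... )
>>> conn.get_pipe_schema(pipe)
'analytics'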
1521def create_pipe_table_from_df( 1522 self, 1523 pipe: mrsm.Pipe, 1524 df: 'pd.DataFrame', 1525 debug: bool = False, 1526) -> mrsm.SuccessTuple: 1527 """ 1528 Create a pipe's table from its configured dtypes and an incoming dataframe. 1529 """ 1530 from meerschaum.utils.dataframe import ( 1531 get_json_cols, 1532 get_numeric_cols, 1533 get_uuid_cols, 1534 get_datetime_cols, 1535 get_bytes_cols, 1536 ) 1537 from meerschaum.utils.sql import get_create_table_queries, sql_item_name 1538 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type 1539 primary_key = pipe.columns.get('primary', None) 1540 primary_key_typ = ( 1541 pipe.dtypes.get(primary_key, str(df.dtypes.get(primary_key, 'int'))) 1542 if primary_key 1543 else None 1544 ) 1545 primary_key_db_type = ( 1546 get_db_type_from_pd_type(primary_key_typ, self.flavor) 1547 if primary_key 1548 else None 1549 ) 1550 dt_col = pipe.columns.get('datetime', None) 1551 new_dtypes = { 1552 **{ 1553 col: str(typ) 1554 for col, typ in df.dtypes.items() 1555 }, 1556 **{ 1557 col: str(df.dtypes.get(col, 'int')) 1558 for col_ix, col in pipe.columns.items() 1559 if col and col_ix != 'primary' 1560 }, 1561 **{ 1562 col: 'uuid' 1563 for col in get_uuid_cols(df) 1564 }, 1565 **{ 1566 col: 'json' 1567 for col in get_json_cols(df) 1568 }, 1569 **{ 1570 col: 'numeric' 1571 for col in get_numeric_cols(df) 1572 }, 1573 **{ 1574 col: 'bytes' 1575 for col in get_bytes_cols(df) 1576 }, 1577 **{ 1578 col: 'datetime64[ns, UTC]' 1579 for col in get_datetime_cols(df, timezone_aware=True, timezone_naive=False) 1580 }, 1581 **{ 1582 col: 'datetime64[ns]' 1583 for col in get_datetime_cols(df, timezone_aware=False, timezone_naive=True) 1584 }, 1585 **pipe.dtypes 1586 } 1587 autoincrement = ( 1588 pipe.parameters.get('autoincrement', False) 1589 or (primary_key and primary_key not in new_dtypes) 1590 ) 1591 if autoincrement: 1592 _ = new_dtypes.pop(primary_key, None) 1593 1594 create_table_queries = get_create_table_queries( 1595 new_dtypes, 1596 pipe.target, 1597 self.flavor, 1598 schema=self.get_pipe_schema(pipe), 1599 primary_key=primary_key, 1600 primary_key_db_type=primary_key_db_type, 1601 datetime_column=dt_col, 1602 ) 1603 success = all( 1604 self.exec_queries(create_table_queries, break_on_error=True, rollback=True, debug=debug) 1605 ) 1606 target_name = sql_item_name(pipe.target, schema=self.get_pipe_schema(pipe), flavor=self.flavor) 1607 msg = ( 1608 "Success" 1609 if success 1610 else f"Failed to create {target_name}." 1611 ) 1612 return success, msg
Create a pipe's table from its configured dtypes and an incoming dataframe.
3093def get_pipe_columns_indices( 3094 self, 3095 pipe: mrsm.Pipe, 3096 debug: bool = False, 3097) -> Dict[str, List[Dict[str, str]]]: 3098 """ 3099 Return a dictionary mapping columns to the indices created on those columns. 3100 3101 Parameters 3102 ---------- 3103 pipe: mrsm.Pipe 3104 The pipe to be queried against. 3105 3106 Returns 3107 ------- 3108 A dictionary mapping columns names to lists of dictionaries. 3109 The dictionaries in the lists contain the name and type of the indices. 3110 """ 3111 if pipe.__dict__.get('_skip_check_indices', False): 3112 return {} 3113 from meerschaum.utils.sql import get_table_cols_indices 3114 return get_table_cols_indices( 3115 pipe.target, 3116 self, 3117 flavor=self.flavor, 3118 schema=self.get_pipe_schema(pipe), 3119 debug=debug, 3120 )
Return a dictionary mapping columns to the indices created on those columns.
Parameters
- pipe (mrsm.Pipe): The pipe to be queried against.
Returns
- A dictionary mapping column names to lists of dictionaries. The dictionaries in the lists contain the name and type of the indices.
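The return shape looks roughly like this (the index names and types are illustrative and vary by flavor):

>>> conn.get_pipe_columns_indices(pipe)
{
    'datetime': [{'name': 'IX_demo_temperature_datetime', 'type': 'INDEX'}],
    'id': [{'name': 'IX_demo_temperature_id', 'type': 'INDEX'}],
}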
3844@staticmethod 3845def get_temporary_target( 3846 target: str, 3847 transact_id: Optional[str, None] = None, 3848 label: Optional[str] = None, 3849 separator: Optional[str] = None, 3850) -> str: 3851 """ 3852 Return a unique(ish) temporary target for a pipe. 3853 """ 3854 from meerschaum.utils.misc import generate_password 3855 temp_target_cf = ( 3856 mrsm.get_config('system', 'connectors', 'sql', 'instance', 'temporary_target') or {} 3857 ) 3858 transaction_id_len = temp_target_cf.get('transaction_id_length', 3) 3859 transact_id = transact_id or generate_password(transaction_id_len) 3860 temp_prefix = temp_target_cf.get('prefix', '_') 3861 separator = separator or temp_target_cf.get('separator', '_') 3862 return ( 3863 temp_prefix 3864 + target 3865 + separator 3866 + transact_id 3867 + ((separator + label) if label else '') 3868 )
Return a unique(ish) temporary target for a pipe.
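With the defaults shown in the source (prefix `'_'` and separator `'_'`), the generated name follows this pattern (the transaction ID is randomly generated when not provided):

>>> from meerschaum.connectors.sql import SQLConnector
>>> SQLConnector.get_temporary_target('weather', transact_id='abc', label='dedup')
'_weather_abc_dedup'
>>> SQLConnector.get_temporary_target('weather', transact_id='abc')
'_weather_abc'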
314def create_pipe_indices( 315 self, 316 pipe: mrsm.Pipe, 317 columns: Optional[List[str]] = None, 318 debug: bool = False, 319) -> SuccessTuple: 320 """ 321 Create a pipe's indices. 322 """ 323 success = self.create_indices(pipe, columns=columns, debug=debug) 324 msg = ( 325 "Success" 326 if success 327 else f"Failed to create indices for {pipe}." 328 ) 329 return success, msg
Create a pipe's indices.
368def drop_pipe_indices( 369 self, 370 pipe: mrsm.Pipe, 371 columns: Optional[List[str]] = None, 372 debug: bool = False, 373) -> SuccessTuple: 374 """ 375 Drop a pipe's indices. 376 """ 377 success = self.drop_indices(pipe, columns=columns, debug=debug) 378 msg = ( 379 "Success" 380 if success 381 else f"Failed to drop indices for {pipe}." 382 ) 383 return success, msg
Drop a pipe's indices.
421def get_pipe_index_names(self, pipe: mrsm.Pipe) -> Dict[str, str]: 422 """ 423 Return a dictionary mapping index keys to their names on the database. 424 425 Returns 426 ------- 427 A dictionary of index keys to column names. 428 """ 429 from meerschaum.utils.sql import DEFAULT_SCHEMA_FLAVORS 430 _parameters = pipe.parameters 431 _index_template = _parameters.get('index_template', "IX_{schema_str}{target}_{column_names}") 432 _schema = self.get_pipe_schema(pipe) 433 if _schema is None: 434 _schema = ( 435 DEFAULT_SCHEMA_FLAVORS.get(self.flavor, None) 436 if self.flavor != 'mssql' 437 else None 438 ) 439 schema_str = '' if _schema is None else f'{_schema}_' 440 schema_str = '' 441 _indices = pipe.indices 442 _target = pipe.target 443 _column_names = { 444 ix: ( 445 '_'.join(cols) 446 if isinstance(cols, (list, tuple)) 447 else str(cols) 448 ) 449 for ix, cols in _indices.items() 450 if cols 451 } 452 _index_names = { 453 ix: _index_template.format( 454 target=_target, 455 column_names=column_names, 456 connector_keys=pipe.connector_keys, 457 metric_key=pipe.metric_key, 458 location_key=pipe.location_key, 459 schema_str=schema_str, 460 ) 461 for ix, column_names in _column_names.items() 462 } 463 ### NOTE: Skip any duplicate indices. 464 seen_index_names = {} 465 for ix, index_name in _index_names.items(): 466 if index_name in seen_index_names: 467 continue 468 seen_index_names[index_name] = ix 469 return { 470 ix: index_name 471 for index_name, ix in seen_index_names.items() 472 }
Return a dictionary mapping index keys to their names on the database.
Returns
- A dictionary mapping index keys to the index names used on the database.
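A sketch of the default naming template (assuming the hypothetical pipe's target is 'demo_temperature' with 'datetime' and 'id' indices):

>>> conn.get_pipe_index_names(pipe)
{'datetime': 'IX_demo_temperature_datetime', 'id': 'IX_demo_temperature_id'}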
17def register_plugin( 18 self, 19 plugin: 'mrsm.core.Plugin', 20 force: bool = False, 21 debug: bool = False, 22 **kw: Any 23) -> SuccessTuple: 24 """Register a new plugin to the plugins table.""" 25 from meerschaum.utils.packages import attempt_import 26 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 27 from meerschaum.utils.sql import json_flavors 28 from meerschaum.connectors.sql.tables import get_tables 29 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 30 31 old_id = self.get_plugin_id(plugin, debug=debug) 32 33 ### Check for version conflict. May be overridden with `--force`. 34 if old_id is not None and not force: 35 old_version = self.get_plugin_version(plugin, debug=debug) 36 new_version = plugin.version 37 if old_version is None: 38 old_version = '' 39 if new_version is None: 40 new_version = '' 41 42 ### verify that the new version is greater than the old 43 packaging_version = attempt_import('packaging.version') 44 if ( 45 old_version and new_version 46 and packaging_version.parse(old_version) >= packaging_version.parse(new_version) 47 ): 48 return False, ( 49 f"Version '{new_version}' of plugin '{plugin}' " + 50 f"must be greater than existing version '{old_version}'." 51 ) 52 53 bind_variables = { 54 'plugin_name': plugin.name, 55 'version': plugin.version, 56 'attributes': ( 57 json.dumps(plugin.attributes) if self.flavor not in json_flavors else plugin.attributes 58 ), 59 'user_id': plugin.user_id, 60 } 61 62 if old_id is None: 63 query = sqlalchemy.insert(plugins_tbl).values(**bind_variables) 64 else: 65 query = ( 66 sqlalchemy.update(plugins_tbl) 67 .values(**bind_variables) 68 .where(plugins_tbl.c.plugin_id == old_id) 69 ) 70 71 result = self.exec(query, debug=debug) 72 if result is None: 73 return False, f"Failed to register plugin '{plugin}'." 74 return True, f"Successfully registered plugin '{plugin}'."
Register a new plugin to the plugins table.
243def delete_plugin( 244 self, 245 plugin: 'mrsm.core.Plugin', 246 debug: bool = False, 247 **kw: Any 248) -> SuccessTuple: 249 """Delete a plugin from the plugins table.""" 250 from meerschaum.utils.packages import attempt_import 251 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 252 from meerschaum.connectors.sql.tables import get_tables 253 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 254 255 plugin_id = self.get_plugin_id(plugin, debug=debug) 256 if plugin_id is None: 257 return True, f"Plugin '{plugin}' was not registered." 258 259 query = sqlalchemy.delete(plugins_tbl).where(plugins_tbl.c.plugin_id == plugin_id) 260 result = self.exec(query, debug=debug) 261 if result is None: 262 return False, f"Failed to delete plugin '{plugin}'." 263 return True, f"Successfully deleted plugin '{plugin}'."
Delete a plugin from the plugins table.
76def get_plugin_id( 77 self, 78 plugin: 'mrsm.core.Plugin', 79 debug: bool = False 80) -> Optional[int]: 81 """ 82 Return a plugin's ID. 83 """ 84 ### ensure plugins table exists 85 from meerschaum.connectors.sql.tables import get_tables 86 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 87 from meerschaum.utils.packages import attempt_import 88 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 89 90 query = ( 91 sqlalchemy 92 .select(plugins_tbl.c.plugin_id) 93 .where(plugins_tbl.c.plugin_name == plugin.name) 94 ) 95 96 try: 97 return int(self.value(query, debug=debug)) 98 except Exception: 99 return None
Return a plugin's ID.
102def get_plugin_version( 103 self, 104 plugin: 'mrsm.core.Plugin', 105 debug: bool = False 106) -> Optional[str]: 107 """ 108 Return a plugin's version. 109 """ 110 ### ensure plugins table exists 111 from meerschaum.connectors.sql.tables import get_tables 112 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 113 from meerschaum.utils.packages import attempt_import 114 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 115 query = sqlalchemy.select(plugins_tbl.c.version).where(plugins_tbl.c.plugin_name == plugin.name) 116 return self.value(query, debug=debug)
Return a plugin's version.
196def get_plugins( 197 self, 198 user_id: Optional[int] = None, 199 search_term: Optional[str] = None, 200 debug: bool = False, 201 **kw: Any 202) -> List[str]: 203 """ 204 Return a list of all registered plugins. 205 206 Parameters 207 ---------- 208 user_id: Optional[int], default None 209 If specified, filter plugins by a specific `user_id`. 210 211 search_term: Optional[str], default None 212 If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins. 213 214 215 Returns 216 ------- 217 A list of plugin names. 218 """ 219 ### ensure plugins table exists 220 from meerschaum.connectors.sql.tables import get_tables 221 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 222 from meerschaum.utils.packages import attempt_import 223 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 224 225 query = sqlalchemy.select(plugins_tbl.c.plugin_name) 226 if user_id is not None: 227 query = query.where(plugins_tbl.c.user_id == user_id) 228 if search_term is not None: 229 query = query.where(plugins_tbl.c.plugin_name.like(search_term + '%')) 230 231 rows = ( 232 self.execute(query).fetchall() 233 if self.flavor != 'duckdb' 234 else [ 235 (row['plugin_name'],) 236 for row in self.read(query).to_dict(orient='records') 237 ] 238 ) 239 240 return [row[0] for row in rows]
Return a list of all registered plugins.
Parameters
- user_id (Optional[int], default None): If specified, filter plugins by a specific `user_id`.
- search_term (Optional[str], default None): If specified, add a `WHERE plugin_name LIKE '{search_term}%'` clause to filter the plugins.
Returns
- A list of plugin names.
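For example (the registered plugin names here are hypothetical):

>>> conn.get_plugins()
['noaa', 'color']
>>> conn.get_plugins(search_term='no')
['noaa']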
118def get_plugin_user_id( 119 self, 120 plugin: 'mrsm.core.Plugin', 121 debug: bool = False 122) -> Optional[int]: 123 """ 124 Return a plugin's user ID. 125 """ 126 ### ensure plugins table exists 127 from meerschaum.connectors.sql.tables import get_tables 128 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 129 from meerschaum.utils.packages import attempt_import 130 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 131 132 query = ( 133 sqlalchemy 134 .select(plugins_tbl.c.user_id) 135 .where(plugins_tbl.c.plugin_name == plugin.name) 136 ) 137 138 try: 139 return int(self.value(query, debug=debug)) 140 except Exception: 141 return None
Return a plugin's user ID.
143def get_plugin_username( 144 self, 145 plugin: 'mrsm.core.Plugin', 146 debug: bool = False 147) -> Optional[str]: 148 """ 149 Return the username of a plugin's owner. 150 """ 151 ### ensure plugins table exists 152 from meerschaum.connectors.sql.tables import get_tables 153 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 154 users = get_tables(mrsm_instance=self, debug=debug)['users'] 155 from meerschaum.utils.packages import attempt_import 156 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 157 158 query = ( 159 sqlalchemy.select(users.c.username) 160 .where( 161 users.c.user_id == plugins_tbl.c.user_id 162 and plugins_tbl.c.plugin_name == plugin.name 163 ) 164 ) 165 166 return self.value(query, debug=debug)
Return the username of a plugin's owner.
169def get_plugin_attributes( 170 self, 171 plugin: 'mrsm.core.Plugin', 172 debug: bool = False 173) -> Dict[str, Any]: 174 """ 175 Return the attributes of a plugin. 176 """ 177 ### ensure plugins table exists 178 from meerschaum.connectors.sql.tables import get_tables 179 plugins_tbl = get_tables(mrsm_instance=self, debug=debug)['plugins'] 180 from meerschaum.utils.packages import attempt_import 181 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 182 183 query = ( 184 sqlalchemy 185 .select(plugins_tbl.c.attributes) 186 .where(plugins_tbl.c.plugin_name == plugin.name) 187 ) 188 189 _attr = self.value(query, debug=debug) 190 if isinstance(_attr, str): 191 _attr = json.loads(_attr) 192 elif _attr is None: 193 _attr = {} 194 return _attr
Return the attributes of a plugin.
16def register_user( 17 self, 18 user: mrsm.core.User, 19 debug: bool = False, 20 **kw: Any 21) -> SuccessTuple: 22 """Register a new user.""" 23 from meerschaum.utils.packages import attempt_import 24 from meerschaum.utils.sql import json_flavors 25 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 26 27 valid_tuple = valid_username(user.username) 28 if not valid_tuple[0]: 29 return valid_tuple 30 31 old_id = self.get_user_id(user, debug=debug) 32 33 if old_id is not None: 34 return False, f"User '{user}' already exists." 35 36 ### ensure users table exists 37 from meerschaum.connectors.sql.tables import get_tables 38 tables = get_tables(mrsm_instance=self, debug=debug) 39 40 import json 41 bind_variables = { 42 'username': user.username, 43 'email': user.email, 44 'password_hash': user.password_hash, 45 'user_type': user.type, 46 'attributes': ( 47 json.dumps(user.attributes) if self.flavor not in json_flavors else user.attributes 48 ), 49 } 50 if old_id is not None: 51 return False, f"User '{user.username}' already exists." 52 if old_id is None: 53 query = ( 54 sqlalchemy.insert(tables['users']). 55 values(**bind_variables) 56 ) 57 58 result = self.exec(query, debug=debug) 59 if result is None: 60 return False, f"Failed to register user '{user}'." 61 return True, f"Successfully registered user '{user}'."
Register a new user.
152def get_user_id( 153 self, 154 user: 'mrsm.core.User', 155 debug: bool = False 156) -> Optional[int]: 157 """If a user is registered, return the `user_id`.""" 158 ### ensure users table exists 159 from meerschaum.utils.packages import attempt_import 160 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 161 from meerschaum.connectors.sql.tables import get_tables 162 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 163 164 query = ( 165 sqlalchemy.select(users_tbl.c.user_id) 166 .where(users_tbl.c.username == user.username) 167 ) 168 169 result = self.value(query, debug=debug) 170 if result is not None: 171 return int(result) 172 return None
If a user is registered, return the `user_id`.
246def get_users( 247 self, 248 debug: bool = False, 249 **kw: Any 250) -> List[str]: 251 """ 252 Get the registered usernames. 253 """ 254 ### ensure users table exists 255 from meerschaum.connectors.sql.tables import get_tables 256 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 257 from meerschaum.utils.packages import attempt_import 258 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 259 260 query = sqlalchemy.select(users_tbl.c.username) 261 262 return list(self.read(query, debug=debug)['username'])
Get the registered usernames.
98def edit_user( 99 self, 100 user: 'mrsm.core.User', 101 debug: bool = False, 102 **kw: Any 103) -> SuccessTuple: 104 """Update an existing user's metadata.""" 105 from meerschaum.utils.packages import attempt_import 106 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 107 from meerschaum.connectors.sql.tables import get_tables 108 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 109 110 user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug) 111 if user_id is None: 112 return False, ( 113 f"User '{user.username}' does not exist. " + 114 f"Register user '{user.username}' before editing." 115 ) 116 user.user_id = user_id 117 118 import json 119 valid_tuple = valid_username(user.username) 120 if not valid_tuple[0]: 121 return valid_tuple 122 123 bind_variables = { 124 'user_id' : user_id, 125 'username' : user.username, 126 } 127 if user.password != '': 128 bind_variables['password_hash'] = user.password_hash 129 if user.email != '': 130 bind_variables['email'] = user.email 131 if user.attributes is not None and user.attributes != {}: 132 bind_variables['attributes'] = ( 133 json.dumps(user.attributes) if self.flavor in ('duckdb',) 134 else user.attributes 135 ) 136 if user.type != '': 137 bind_variables['user_type'] = user.type 138 139 query = ( 140 sqlalchemy 141 .update(users_tbl) 142 .values(**bind_variables) 143 .where(users_tbl.c.user_id == user_id) 144 ) 145 146 result = self.exec(query, debug=debug) 147 if result is None: 148 return False, f"Failed to edit user '{user}'." 149 return True, f"Successfully edited user '{user}'."
Update an existing user's metadata.
214def delete_user( 215 self, 216 user: 'mrsm.core.User', 217 debug: bool = False 218) -> SuccessTuple: 219 """Delete a user's record from the users table.""" 220 ### ensure users table exists 221 from meerschaum.connectors.sql.tables import get_tables 222 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 223 plugins = get_tables(mrsm_instance=self, debug=debug)['plugins'] 224 from meerschaum.utils.packages import attempt_import 225 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 226 227 user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug) 228 229 if user_id is None: 230 return False, f"User '{user.username}' is not registered and cannot be deleted." 231 232 query = sqlalchemy.delete(users_tbl).where(users_tbl.c.user_id == user_id) 233 234 result = self.exec(query, debug=debug) 235 if result is None: 236 return False, f"Failed to delete user '{user}'." 237 238 query = sqlalchemy.delete(plugins).where(plugins.c.user_id == user_id) 239 result = self.exec(query, debug=debug) 240 if result is None: 241 return False, f"Failed to delete plugins of user '{user}'." 242 243 return True, f"Successfully deleted user '{user}'"
Delete a user's record from the users table.
265def get_user_password_hash( 266 self, 267 user: 'mrsm.core.User', 268 debug: bool = False, 269 **kw: Any 270) -> Optional[str]: 271 """ 272 Return the password has for a user. 273 **NOTE**: This may be dangerous and is only allowed if the security settings explicity allow it. 274 """ 275 from meerschaum.utils.debug import dprint 276 from meerschaum.connectors.sql.tables import get_tables 277 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 278 from meerschaum.utils.packages import attempt_import 279 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 280 281 if user.user_id is not None: 282 user_id = user.user_id 283 if debug: 284 dprint(f"Already given user_id: {user_id}") 285 else: 286 if debug: 287 dprint("Fetching user_id...") 288 user_id = self.get_user_id(user, debug=debug) 289 290 if user_id is None: 291 return None 292 293 query = sqlalchemy.select(users_tbl.c.password_hash).where(users_tbl.c.user_id == user_id) 294 295 return self.value(query, debug=debug)
Return the password hash for a user. NOTE: This may be dangerous and is only allowed if the security settings explicitly allow it.
298def get_user_type( 299 self, 300 user: 'mrsm.core.User', 301 debug: bool = False, 302 **kw: Any 303) -> Optional[str]: 304 """ 305 Return the user's type. 306 """ 307 from meerschaum.connectors.sql.tables import get_tables 308 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 309 from meerschaum.utils.packages import attempt_import 310 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 311 312 user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug) 313 314 if user_id is None: 315 return None 316 317 query = sqlalchemy.select(users_tbl.c.user_type).where(users_tbl.c.user_id == user_id) 318 319 return self.value(query, debug=debug)
Return the user's type.
174def get_user_attributes( 175 self, 176 user: 'mrsm.core.User', 177 debug: bool = False 178) -> Union[Dict[str, Any], None]: 179 """ 180 Return the user's attributes. 181 """ 182 ### ensure users table exists 183 from meerschaum.utils.warnings import warn 184 from meerschaum.utils.packages import attempt_import 185 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 186 from meerschaum.connectors.sql.tables import get_tables 187 users_tbl = get_tables(mrsm_instance=self, debug=debug)['users'] 188 189 user_id = user.user_id if user.user_id is not None else self.get_user_id(user, debug=debug) 190 191 query = ( 192 sqlalchemy.select(users_tbl.c.attributes) 193 .where(users_tbl.c.user_id == user_id) 194 ) 195 196 result = self.value(query, debug=debug) 197 if result is not None and not isinstance(result, dict): 198 try: 199 result = dict(result) 200 _parsed = True 201 except Exception: 202 _parsed = False 203 if not _parsed: 204 try: 205 import json 206 result = json.loads(result) 207 _parsed = True 208 except Exception: 209 _parsed = False 210 if not _parsed: 211 warn(f"Received unexpected type for attributes: {result}") 212 return result
Return the user's attributes.
15@classmethod 16def from_uri( 17 cls, 18 uri: str, 19 label: Optional[str] = None, 20 as_dict: bool = False, 21) -> Union[ 22 'meerschaum.connectors.SQLConnector', 23 Dict[str, Union[str, int]], 24]: 25 """ 26 Create a new SQLConnector from a URI string. 27 28 Parameters 29 ---------- 30 uri: str 31 The URI connection string. 32 33 label: Optional[str], default None 34 If provided, use this as the connector label. 35 Otherwise use the determined database name. 36 37 as_dict: bool, default False 38 If `True`, return a dictionary of the keyword arguments 39 necessary to create a new `SQLConnector`, otherwise create a new object. 40 41 Returns 42 ------- 43 A new SQLConnector object or a dictionary of attributes (if `as_dict` is `True`). 44 """ 45 46 params = cls.parse_uri(uri) 47 params['uri'] = uri 48 flavor = params.get('flavor', None) 49 if not flavor or flavor not in cls.flavor_configs: 50 error(f"Invalid flavor '{flavor}' detected from the provided URI.") 51 52 if 'database' not in params: 53 error("Unable to determine the database from the provided URI.") 54 55 if flavor in ('sqlite', 'duckdb'): 56 if params['database'] == ':memory:': 57 params['label'] = label or f'memory_{flavor}' 58 else: 59 params['label'] = label or params['database'].split(os.path.sep)[-1].lower() 60 else: 61 params['label'] = label or ( 62 ( 63 (params['username'] + '@' if 'username' in params else '') 64 + params.get('host', '') 65 + ('/' if 'host' in params else '') 66 + params.get('database', '') 67 ).lower() 68 ) 69 70 return cls(**params) if not as_dict else params
Create a new SQLConnector from a URI string.
Parameters
- uri (str): The URI connection string.
- label (Optional[str], default None): If provided, use this as the connector label. Otherwise use the determined database name.
- as_dict (bool, default False): If `True`, return a dictionary of the keyword arguments necessary to create a new `SQLConnector`, otherwise create a new object.
Returns
- A new SQLConnector object or a dictionary of attributes (if `as_dict` is `True`).
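A sketch of both return modes (the credentials and paths are hypothetical; the `as_dict` output follows from `parse_uri` below):

>>> from meerschaum.connectors.sql import SQLConnector
>>> SQLConnector.from_uri('sqlite:////tmp/demo.db', as_dict=True)
{'database': '/tmp/demo.db', 'flavor': 'sqlite', 'uri': 'sqlite:////tmp/demo.db', 'label': 'demo.db'}
>>> conn = SQLConnector.from_uri('postgresql://user:pass@localhost:5432/db')
>>> conn.label
'user@localhost/db'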
73@staticmethod 74def parse_uri(uri: str) -> Dict[str, Any]: 75 """ 76 Parse a URI string into a dictionary of parameters. 77 78 Parameters 79 ---------- 80 uri: str 81 The database connection URI. 82 83 Returns 84 ------- 85 A dictionary of attributes. 86 87 Examples 88 -------- 89 >>> parse_uri('sqlite:////home/foo/bar.db') 90 {'database': '/home/foo/bar.db', 'flavor': 'sqlite'} 91 >>> parse_uri( 92 ... 'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439' 93 ... + '/master?driver=ODBC+Driver+17+for+SQL+Server' 94 ... ) 95 {'host': 'localhost', 'database': 'master', 'username': 'sa', 96 'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql', 97 'driver': 'ODBC Driver 17 for SQL Server'} 98 >>> 99 """ 100 from urllib.parse import parse_qs, urlparse 101 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 102 parser = sqlalchemy.engine.url.make_url 103 params = parser(uri).translate_connect_args() 104 params['flavor'] = uri.split(':')[0].split('+')[0] 105 if params['flavor'] == 'postgres': 106 params['flavor'] = 'postgresql' 107 if '?' in uri: 108 parsed_uri = urlparse(uri) 109 for key, value in parse_qs(parsed_uri.query).items(): 110 params.update({key: value[0]}) 111 112 if '--search_path' in params.get('options', ''): 113 params.update({'schema': params['options'].replace('--search_path=', '', 1)}) 114 return params
Parse a URI string into a dictionary of parameters.
Parameters
- uri (str): The database connection URI.
Returns
- A dictionary of attributes.
Examples
>>> parse_uri('sqlite:////home/foo/bar.db')
{'database': '/home/foo/bar.db', 'flavor': 'sqlite'}
>>> parse_uri(
... 'mssql+pyodbc://sa:supersecureSECRETPASSWORD123!@localhost:1439'
... + '/master?driver=ODBC+Driver+17+for+SQL+Server'
... )
{'host': 'localhost', 'database': 'master', 'username': 'sa',
'password': 'supersecureSECRETPASSWORD123!', 'port': 1439, 'flavor': 'mssql',
'driver': 'ODBC Driver 17 for SQL Server'}
>>>