meerschaum.utils.sql
Flavor-specific SQL tools.
1#! /usr/bin/env python 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Flavor-specific SQL tools. 7""" 8 9from __future__ import annotations 10from datetime import datetime, timezone, timedelta 11import meerschaum as mrsm 12from meerschaum.utils.typing import Optional, Dict, Any, Union, List, Iterable, Tuple 13### Preserve legacy imports. 14from meerschaum.utils.dtypes.sql import ( 15 DB_TO_PD_DTYPES, 16 PD_TO_DB_DTYPES_FLAVORS, 17 get_pd_type_from_db_type as get_pd_type, 18 get_db_type_from_pd_type as get_db_type, 19) 20from meerschaum.utils.warnings import warn 21from meerschaum.utils.debug import dprint 22 23test_queries = { 24 'default' : 'SELECT 1', 25 'oracle' : 'SELECT 1 FROM DUAL', 26 'informix' : 'SELECT COUNT(*) FROM systables', 27 'hsqldb' : 'SELECT 1 FROM INFORMATION_SCHEMA.SYSTEM_USERS', 28} 29### `table_name` is the escaped name of the table. 30### `table` is the unescaped name of the table. 31exists_queries = { 32 'default': "SELECT COUNT(*) FROM {table_name} WHERE 1 = 0", 33} 34version_queries = { 35 'default': "SELECT VERSION() AS {version_name}", 36 'sqlite': "SELECT SQLITE_VERSION() AS {version_name}", 37 'mssql': "SELECT @@version", 38 'oracle': "SELECT version from PRODUCT_COMPONENT_VERSION WHERE rownum = 1", 39} 40SKIP_IF_EXISTS_FLAVORS = {'mssql', 'oracle'} 41DROP_IF_EXISTS_FLAVORS = { 42 'timescaledb', 'postgresql', 'citus', 'mssql', 'mysql', 'mariadb', 'sqlite', 43} 44COALESCE_UNIQUE_INDEX_FLAVORS = {'timescaledb', 'postgresql', 'citus'} 45update_queries = { 46 'default': """ 47 UPDATE {target_table_name} AS f 48 {sets_subquery_none} 49 FROM {target_table_name} AS t 50 INNER JOIN (SELECT DISTINCT {patch_cols_str} FROM {patch_table_name}) AS p 51 ON {and_subquery_t} 52 WHERE 53 {and_subquery_f} 54 AND {date_bounds_subquery} 55 """, 56 'timescaledb-upsert': """ 57 INSERT INTO {target_table_name} ({patch_cols_str}) 58 SELECT {patch_cols_str} 59 FROM {patch_table_name} 60 ON CONFLICT ({join_cols_str}) DO {update_or_nothing} 
{sets_subquery_none_excluded} 61 """, 62 'postgresql-upsert': """ 63 INSERT INTO {target_table_name} ({patch_cols_str}) 64 SELECT {patch_cols_str} 65 FROM {patch_table_name} 66 ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded} 67 """, 68 'citus-upsert': """ 69 INSERT INTO {target_table_name} ({patch_cols_str}) 70 SELECT {patch_cols_str} 71 FROM {patch_table_name} 72 ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded} 73 """, 74 'cockroachdb-upsert': """ 75 INSERT INTO {target_table_name} ({patch_cols_str}) 76 SELECT {patch_cols_str} 77 FROM {patch_table_name} 78 ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded} 79 """, 80 'mysql': """ 81 UPDATE {target_table_name} AS f 82 JOIN (SELECT DISTINCT {patch_cols_str} FROM {patch_table_name}) AS p 83 ON {and_subquery_f} 84 {sets_subquery_f} 85 WHERE {date_bounds_subquery} 86 """, 87 'mysql-upsert': """ 88 REPLACE INTO {target_table_name} ({patch_cols_str}) 89 SELECT {patch_cols_str} 90 FROM {patch_table_name} 91 """, 92 'mariadb': """ 93 UPDATE {target_table_name} AS f 94 JOIN (SELECT DISTINCT {patch_cols_str} FROM {patch_table_name}) AS p 95 ON {and_subquery_f} 96 {sets_subquery_f} 97 WHERE {date_bounds_subquery} 98 """, 99 'mariadb-upsert': """ 100 REPLACE INTO {target_table_name} ({patch_cols_str}) 101 SELECT {patch_cols_str} 102 FROM {patch_table_name} 103 """, 104 'mssql': """ 105 MERGE {target_table_name} f 106 USING (SELECT DISTINCT {patch_cols_str} FROM {patch_table_name}) p 107 ON {and_subquery_f} 108 AND {date_bounds_subquery} 109 WHEN MATCHED THEN 110 UPDATE 111 {sets_subquery_none}; 112 """, 113 'mssql-upsert': """ 114 MERGE {target_table_name} f 115 USING (SELECT DISTINCT {patch_cols_str} FROM {patch_table_name}) p 116 ON {and_subquery_f} 117 AND {date_bounds_subquery} 118 {when_matched_update_sets_subquery_none} 119 WHEN NOT MATCHED THEN 120 INSERT ({patch_cols_str}) 121 VALUES ({patch_cols_prefixed_str}); 122 
""", 123 'oracle': """ 124 MERGE INTO {target_table_name} f 125 USING (SELECT DISTINCT {patch_cols_str} FROM {patch_table_name}) p 126 ON ( 127 {and_subquery_f} 128 AND {date_bounds_subquery} 129 ) 130 WHEN MATCHED THEN 131 UPDATE 132 {sets_subquery_none} 133 """, 134 'sqlite-upsert': """ 135 INSERT INTO {target_table_name} ({patch_cols_str}) 136 SELECT {patch_cols_str} 137 FROM {patch_table_name} 138 WHERE true 139 ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded} 140 """, 141 'sqlite_delete_insert': [ 142 """ 143 DELETE FROM {target_table_name} AS f 144 WHERE ROWID IN ( 145 SELECT t.ROWID 146 FROM {target_table_name} AS t 147 INNER JOIN (SELECT DISTINCT * FROM {patch_table_name}) AS p 148 ON {and_subquery_t} 149 ); 150 """, 151 """ 152 INSERT INTO {target_table_name} AS f 153 SELECT DISTINCT {patch_cols_str} FROM {patch_table_name} AS p 154 """, 155 ], 156} 157columns_types_queries = { 158 'default': """ 159 SELECT 160 table_catalog AS database, 161 table_schema AS schema, 162 table_name AS table, 163 column_name AS column, 164 data_type AS type 165 FROM information_schema.columns 166 WHERE table_name IN ('{table}', '{table_trunc}') 167 """, 168 'sqlite': """ 169 SELECT 170 '' "database", 171 '' "schema", 172 m.name "table", 173 p.name "column", 174 p.type "type" 175 FROM sqlite_master m 176 LEFT OUTER JOIN pragma_table_info((m.name)) p 177 ON m.name <> p.name 178 WHERE m.type = 'table' 179 AND m.name IN ('{table}', '{table_trunc}') 180 """, 181 'mssql': """ 182 SELECT 183 TABLE_CATALOG AS [database], 184 TABLE_SCHEMA AS [schema], 185 TABLE_NAME AS [table], 186 COLUMN_NAME AS [column], 187 DATA_TYPE AS [type] 188 FROM {db_prefix}INFORMATION_SCHEMA.COLUMNS 189 WHERE TABLE_NAME LIKE '{table}%' 190 OR TABLE_NAME LIKE '{table_trunc}%' 191 """, 192 'mysql': """ 193 SELECT 194 TABLE_SCHEMA `database`, 195 TABLE_SCHEMA `schema`, 196 TABLE_NAME `table`, 197 COLUMN_NAME `column`, 198 DATA_TYPE `type` 199 FROM INFORMATION_SCHEMA.COLUMNS 200 
WHERE TABLE_NAME IN ('{table}', '{table_trunc}') 201 """, 202 'mariadb': """ 203 SELECT 204 TABLE_SCHEMA `database`, 205 TABLE_SCHEMA `schema`, 206 TABLE_NAME `table`, 207 COLUMN_NAME `column`, 208 DATA_TYPE `type` 209 FROM INFORMATION_SCHEMA.COLUMNS 210 WHERE TABLE_NAME IN ('{table}', '{table_trunc}') 211 """, 212 'oracle': """ 213 SELECT 214 NULL AS "database", 215 NULL AS "schema", 216 TABLE_NAME AS "table", 217 COLUMN_NAME AS "column", 218 DATA_TYPE AS "type" 219 FROM all_tab_columns 220 WHERE TABLE_NAME IN ( 221 '{table}', 222 '{table_trunc}', 223 '{table_lower}', 224 '{table_lower_trunc}', 225 '{table_upper}', 226 '{table_upper_trunc}' 227 ) 228 """, 229} 230hypertable_queries = { 231 'timescaledb': 'SELECT hypertable_size(\'{table_name}\')', 232 'citus': 'SELECT citus_table_size(\'{table_name}\')', 233} 234table_wrappers = { 235 'default' : ('"', '"'), 236 'timescaledb': ('"', '"'), 237 'citus' : ('"', '"'), 238 'duckdb' : ('"', '"'), 239 'postgresql' : ('"', '"'), 240 'sqlite' : ('"', '"'), 241 'mysql' : ('`', '`'), 242 'mariadb' : ('`', '`'), 243 'mssql' : ('[', ']'), 244 'cockroachdb': ('"', '"'), 245 'oracle' : ('"', '"'), 246} 247max_name_lens = { 248 'default' : 64, 249 'mssql' : 128, 250 'oracle' : 30, 251 'postgresql' : 64, 252 'timescaledb': 64, 253 'citus' : 64, 254 'cockroachdb': 64, 255 'sqlite' : 1024, ### Probably more, but 1024 seems more than reasonable. 
256 'mysql' : 64, 257 'mariadb' : 64, 258} 259json_flavors = {'postgresql', 'timescaledb', 'citus', 'cockroachdb'} 260NO_SCHEMA_FLAVORS = {'oracle', 'sqlite', 'mysql', 'mariadb', 'duckdb'} 261DEFAULT_SCHEMA_FLAVORS = { 262 'postgresql': 'public', 263 'timescaledb': 'public', 264 'citus': 'public', 265 'cockroachdb': 'public', 266 'mysql': 'mysql', 267 'mariadb': 'mysql', 268 'mssql': 'dbo', 269} 270OMIT_NULLSFIRST_FLAVORS = {'mariadb', 'mysql', 'mssql'} 271 272SINGLE_ALTER_TABLE_FLAVORS = {'duckdb', 'sqlite', 'mssql', 'oracle'} 273NO_CTE_FLAVORS = {'mysql', 'mariadb'} 274NO_SELECT_INTO_FLAVORS = {'sqlite', 'oracle', 'mysql', 'mariadb', 'duckdb'} 275 276 277def clean(substring: str) -> str: 278 """ 279 Ensure a substring is clean enough to be inserted into a SQL query. 280 Raises an exception when banned words are used. 281 """ 282 from meerschaum.utils.warnings import error 283 banned_symbols = [';', '--', 'drop ',] 284 for symbol in banned_symbols: 285 if symbol in str(substring).lower(): 286 error(f"Invalid string: '{substring}'") 287 288 289def dateadd_str( 290 flavor: str = 'postgresql', 291 datepart: str = 'day', 292 number: Union[int, float] = 0, 293 begin: Union[str, datetime, int] = 'now' 294) -> str: 295 """ 296 Generate a `DATEADD` clause depending on database flavor. 297 298 Parameters 299 ---------- 300 flavor: str, default `'postgresql'` 301 SQL database flavor, e.g. `'postgresql'`, `'sqlite'`. 302 303 Currently supported flavors: 304 305 - `'postgresql'` 306 - `'timescaledb'` 307 - `'citus'` 308 - `'cockroachdb'` 309 - `'duckdb'` 310 - `'mssql'` 311 - `'mysql'` 312 - `'mariadb'` 313 - `'sqlite'` 314 - `'oracle'` 315 316 datepart: str, default `'day'` 317 Which part of the date to modify. Supported values: 318 319 - `'year'` 320 - `'month'` 321 - `'day'` 322 - `'hour'` 323 - `'minute'` 324 - `'second'` 325 326 number: Union[int, float], default `0` 327 How many units to add to the date part. 
328 329 begin: Union[str, datetime], default `'now'` 330 Base datetime to which to add dateparts. 331 332 Returns 333 ------- 334 The appropriate `DATEADD` string for the corresponding database flavor. 335 336 Examples 337 -------- 338 >>> dateadd_str( 339 ... flavor = 'mssql', 340 ... begin = datetime(2022, 1, 1, 0, 0), 341 ... number = 1, 342 ... ) 343 "DATEADD(day, 1, CAST('2022-01-01 00:00:00' AS DATETIME))" 344 >>> dateadd_str( 345 ... flavor = 'postgresql', 346 ... begin = datetime(2022, 1, 1, 0, 0), 347 ... number = 1, 348 ... ) 349 "CAST('2022-01-01 00:00:00' AS TIMESTAMP) + INTERVAL '1 day'" 350 351 """ 352 from meerschaum.utils.debug import dprint 353 from meerschaum.utils.packages import attempt_import 354 from meerschaum.utils.warnings import error 355 dateutil_parser = attempt_import('dateutil.parser') 356 if 'int' in str(type(begin)).lower(): 357 return str(begin) 358 if not begin: 359 return '' 360 361 _original_begin = begin 362 begin_time = None 363 ### Sanity check: make sure `begin` is a valid datetime before we inject anything. 364 if not isinstance(begin, datetime): 365 try: 366 begin_time = dateutil_parser.parse(begin) 367 except Exception: 368 begin_time = None 369 else: 370 begin_time = begin 371 372 ### Unable to parse into a datetime. 373 if begin_time is None: 374 ### Throw an error if banned symbols are included in the `begin` string. 375 clean(str(begin)) 376 ### If begin is a valid datetime, wrap it in quotes. 
377 else: 378 if isinstance(begin, datetime) and begin.tzinfo is not None: 379 begin = begin.astimezone(timezone.utc) 380 begin = ( 381 f"'{begin.replace(tzinfo=None)}'" 382 if isinstance(begin, datetime) 383 else f"'{begin}'" 384 ) 385 386 da = "" 387 if flavor in ('postgresql', 'timescaledb', 'cockroachdb', 'citus'): 388 begin = ( 389 f"CAST({begin} AS TIMESTAMP)" if begin != 'now' 390 else "CAST(NOW() AT TIME ZONE 'utc' AS TIMESTAMP)" 391 ) 392 da = begin + (f" + INTERVAL '{number} {datepart}'" if number != 0 else '') 393 394 elif flavor == 'duckdb': 395 begin = f"CAST({begin} AS TIMESTAMP)" if begin != 'now' else 'NOW()' 396 da = begin + (f" + INTERVAL '{number} {datepart}'" if number != 0 else '') 397 398 elif flavor in ('mssql',): 399 if begin_time and begin_time.microsecond != 0: 400 begin = begin[:-4] + "'" 401 begin = f"CAST({begin} AS DATETIME)" if begin != 'now' else 'GETUTCDATE()' 402 da = f"DATEADD({datepart}, {number}, {begin})" if number != 0 else begin 403 404 elif flavor in ('mysql', 'mariadb'): 405 begin = f"CAST({begin} AS DATETIME(6))" if begin != 'now' else 'UTC_TIMESTAMP(6)' 406 da = (f"DATE_ADD({begin}, INTERVAL {number} {datepart})" if number != 0 else begin) 407 408 elif flavor == 'sqlite': 409 da = f"datetime({begin}, '{number} {datepart}')" 410 411 elif flavor == 'oracle': 412 if begin == 'now': 413 begin = str( 414 datetime.now(timezone.utc).replace(tzinfo=None).strftime('%Y:%m:%d %M:%S.%f') 415 ) 416 elif begin_time: 417 begin = str(begin_time.strftime('%Y-%m-%d %H:%M:%S.%f')) 418 dt_format = 'YYYY-MM-DD HH24:MI:SS.FF' 419 _begin = f"'{begin}'" if begin_time else begin 420 da = ( 421 (f"TO_TIMESTAMP({_begin}, '{dt_format}')" if begin_time else _begin) 422 + (f" + INTERVAL '{number}' {datepart}" if number != 0 else "") 423 ) 424 return da 425 426 427def test_connection( 428 self, 429 **kw: Any 430 ) -> Union[bool, None]: 431 """ 432 Test if a successful connection to the database may be made. 
433 434 Parameters 435 ---------- 436 **kw: 437 The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`. 438 439 Returns 440 ------- 441 `True` if a connection is made, otherwise `False` or `None` in case of failure. 442 443 """ 444 import warnings 445 from meerschaum.connectors.poll import retry_connect 446 _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self} 447 _default_kw.update(kw) 448 with warnings.catch_warnings(): 449 warnings.filterwarnings('ignore', 'Could not') 450 try: 451 return retry_connect(**_default_kw) 452 except Exception as e: 453 return False 454 455 456def get_distinct_col_count( 457 col: str, 458 query: str, 459 connector: Optional[mrsm.connectors.sql.SQLConnector] = None, 460 debug: bool = False 461 ) -> Optional[int]: 462 """ 463 Returns the number of distinct items in a column of a SQL query. 464 465 Parameters 466 ---------- 467 col: str: 468 The column in the query to count. 469 470 query: str: 471 The SQL query to count from. 472 473 connector: Optional[mrsm.connectors.sql.SQLConnector], default None: 474 The SQLConnector to execute the query. 475 476 debug: bool, default False: 477 Verbosity toggle. 478 479 Returns 480 ------- 481 An `int` of the number of columns in the query or `None` if the query fails. 
482 483 """ 484 if connector is None: 485 connector = mrsm.get_connector('sql') 486 487 _col_name = sql_item_name(col, connector.flavor, None) 488 489 _meta_query = ( 490 f""" 491 WITH src AS ( {query} ), 492 dist AS ( SELECT DISTINCT {_col_name} FROM src ) 493 SELECT COUNT(*) FROM dist""" 494 ) if connector.flavor not in ('mysql', 'mariadb') else ( 495 f""" 496 SELECT COUNT(*) 497 FROM ( 498 SELECT DISTINCT {_col_name} 499 FROM ({query}) AS src 500 ) AS dist""" 501 ) 502 503 result = connector.value(_meta_query, debug=debug) 504 try: 505 return int(result) 506 except Exception as e: 507 return None 508 509 510def sql_item_name(item: str, flavor: str, schema: Optional[str] = None) -> str: 511 """ 512 Parse SQL items depending on the flavor. 513 514 Parameters 515 ---------- 516 item: str : 517 The database item (table, view, etc.) in need of quotes. 518 519 flavor: str : 520 The database flavor (`'postgresql'`, `'mssql'`, `'sqllite'`, etc.). 521 522 Returns 523 ------- 524 A `str` which contains the input `item` wrapped in the corresponding escape characters. 525 526 Examples 527 -------- 528 >>> sql_item_name('table', 'sqlite') 529 '"table"' 530 >>> sql_item_name('table', 'mssql') 531 "[table]" 532 >>> sql_item_name('table', 'postgresql', schema='abc') 533 '"abc"."table"' 534 535 """ 536 truncated_item = truncate_item_name(str(item), flavor) 537 if flavor == 'oracle': 538 truncated_item = pg_capital(truncated_item) 539 ### NOTE: System-reserved words must be quoted. 540 if truncated_item.lower() in ( 541 'float', 'varchar', 'nvarchar', 'clob', 542 'boolean', 'integer', 'table', 543 ): 544 wrappers = ('"', '"') 545 else: 546 wrappers = ('', '') 547 else: 548 wrappers = table_wrappers.get(flavor, table_wrappers['default']) 549 550 ### NOTE: SQLite does not support schemas. 
551 if flavor == 'sqlite': 552 schema = None 553 554 schema_prefix = ( 555 (wrappers[0] + schema + wrappers[1] + '.') 556 if schema is not None 557 else '' 558 ) 559 560 return schema_prefix + wrappers[0] + truncated_item + wrappers[1] 561 562 563def pg_capital(s: str) -> str: 564 """ 565 If string contains a capital letter, wrap it in double quotes. 566 567 Parameters 568 ---------- 569 s: str : 570 The string to be escaped. 571 572 Returns 573 ------- 574 The input string wrapped in quotes only if it needs them. 575 576 Examples 577 -------- 578 >>> pg_capital("My Table") 579 '"My Table"' 580 >>> pg_capital('my_table') 581 'my_table' 582 583 """ 584 if '"' in s: 585 return s 586 needs_quotes = s.startswith('_') 587 for c in str(s): 588 if ord(c) < ord('a') or ord(c) > ord('z'): 589 if not c.isdigit() and c != '_': 590 needs_quotes = True 591 break 592 if needs_quotes: 593 return '"' + s + '"' 594 return s 595 596 597def oracle_capital(s: str) -> str: 598 """ 599 Capitalize the string of an item on an Oracle database. 600 """ 601 return s 602 603 604def truncate_item_name(item: str, flavor: str) -> str: 605 """ 606 Truncate item names to stay within the database flavor's character limit. 607 608 Parameters 609 ---------- 610 item: str 611 The database item being referenced. This string is the "canonical" name internally. 612 613 flavor: str 614 The flavor of the database on which `item` resides. 615 616 Returns 617 ------- 618 The truncated string. 619 """ 620 from meerschaum.utils.misc import truncate_string_sections 621 return truncate_string_sections( 622 item, max_len=max_name_lens.get(flavor, max_name_lens['default']) 623 ) 624 625 626def build_where( 627 params: Dict[str, Any], 628 connector: Optional[meerschaum.connectors.sql.SQLConnector] = None, 629 with_where: bool = True, 630 ) -> str: 631 """ 632 Build the `WHERE` clause based on the input criteria. 
633 634 Parameters 635 ---------- 636 params: Dict[str, Any]: 637 The keywords dictionary to convert into a WHERE clause. 638 If a value is a string which begins with an underscore, negate that value 639 (e.g. `!=` instead of `=` or `NOT IN` instead of `IN`). 640 A value of `_None` will be interpreted as `IS NOT NULL`. 641 642 connector: Optional[meerschaum.connectors.sql.SQLConnector], default None: 643 The Meerschaum SQLConnector that will be executing the query. 644 The connector is used to extract the SQL dialect. 645 646 with_where: bool, default True: 647 If `True`, include the leading `'WHERE'` string. 648 649 Returns 650 ------- 651 A `str` of the `WHERE` clause from the input `params` dictionary for the connector's flavor. 652 653 Examples 654 -------- 655 ``` 656 >>> print(build_where({'foo': [1, 2, 3]})) 657 658 WHERE 659 "foo" IN ('1', '2', '3') 660 ``` 661 """ 662 import json 663 from meerschaum.config.static import STATIC_CONFIG 664 from meerschaum.utils.warnings import warn 665 from meerschaum.utils.dtypes import value_is_null, none_if_null 666 negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] 667 try: 668 params_json = json.dumps(params) 669 except Exception as e: 670 params_json = str(params) 671 bad_words = ['drop ', '--', ';'] 672 for word in bad_words: 673 if word in params_json.lower(): 674 warn(f"Aborting build_where() due to possible SQL injection.") 675 return '' 676 677 if connector is None: 678 from meerschaum import get_connector 679 connector = get_connector('sql') 680 where = "" 681 leading_and = "\n AND " 682 for key, value in params.items(): 683 _key = sql_item_name(key, connector.flavor, None) 684 ### search across a list (i.e. 
IN syntax) 685 if isinstance(value, Iterable) and not isinstance(value, (dict, str)): 686 includes = [ 687 none_if_null(item) 688 for item in value 689 if not str(item).startswith(negation_prefix) 690 ] 691 null_includes = [item for item in includes if item is None] 692 not_null_includes = [item for item in includes if item is not None] 693 excludes = [ 694 none_if_null(str(item)[len(negation_prefix):]) 695 for item in value 696 if str(item).startswith(negation_prefix) 697 ] 698 null_excludes = [item for item in excludes if item is None] 699 not_null_excludes = [item for item in excludes if item is not None] 700 701 if includes: 702 where += f"{leading_and}(" 703 if not_null_includes: 704 where += f"{_key} IN (" 705 for item in not_null_includes: 706 quoted_item = str(item).replace("'", "''") 707 where += f"'{quoted_item}', " 708 where = where[:-2] + ")" 709 if null_includes: 710 where += ("\n OR " if not_null_includes else "") + f"{_key} IS NULL" 711 if includes: 712 where += ")" 713 714 if excludes: 715 where += f"{leading_and}(" 716 if not_null_excludes: 717 where += f"{_key} NOT IN (" 718 for item in not_null_excludes: 719 quoted_item = str(item).replace("'", "''") 720 where += f"'{quoted_item}', " 721 where = where[:-2] + ")" 722 if null_excludes: 723 where += ("\n AND " if not_null_excludes else "") + f"{_key} IS NOT NULL" 724 if excludes: 725 where += ")" 726 727 continue 728 729 ### search a dictionary 730 elif isinstance(value, dict): 731 import json 732 where += (f"{leading_and}CAST({_key} AS TEXT) = '" + json.dumps(value) + "'") 733 continue 734 735 eq_sign = '=' 736 is_null = 'IS NULL' 737 if value_is_null(str(value).lstrip(negation_prefix)): 738 value = ( 739 (negation_prefix + 'None') 740 if str(value).startswith(negation_prefix) 741 else None 742 ) 743 if str(value).startswith(negation_prefix): 744 value = str(value)[len(negation_prefix):] 745 eq_sign = '!=' 746 if value_is_null(value): 747 value = None 748 is_null = 'IS NOT NULL' 749 quoted_value = 
str(value).replace("'", "''") 750 where += ( 751 f"{leading_and}{_key} " 752 + (is_null if value is None else f"{eq_sign} '{quoted_value}'") 753 ) 754 755 if len(where) > 1: 756 where = ("\nWHERE\n " if with_where else '') + where[len(leading_and):] 757 return where 758 759 760def table_exists( 761 table: str, 762 connector: mrsm.connectors.sql.SQLConnector, 763 schema: Optional[str] = None, 764 debug: bool = False, 765) -> bool: 766 """Check if a table exists. 767 768 Parameters 769 ---------- 770 table: str: 771 The name of the table in question. 772 773 connector: mrsm.connectors.sql.SQLConnector 774 The connector to the database which holds the table. 775 776 schema: Optional[str], default None 777 Optionally specify the table schema. 778 Defaults to `connector.schema`. 779 780 debug: bool, default False : 781 Verbosity toggle. 782 783 Returns 784 ------- 785 A `bool` indicating whether or not the table exists on the database. 786 787 """ 788 sqlalchemy = mrsm.attempt_import('sqlalchemy') 789 schema = schema or connector.schema 790 insp = sqlalchemy.inspect(connector.engine) 791 truncated_table_name = truncate_item_name(str(table), connector.flavor) 792 return insp.has_table(truncated_table_name, schema=schema) 793 794 795def get_sqlalchemy_table( 796 table: str, 797 connector: Optional[mrsm.connectors.sql.SQLConnector] = None, 798 schema: Optional[str] = None, 799 refresh: bool = False, 800 debug: bool = False, 801) -> Union['sqlalchemy.Table', None]: 802 """ 803 Construct a SQLAlchemy table from its name. 804 805 Parameters 806 ---------- 807 table: str 808 The name of the table on the database. Does not need to be escaped. 809 810 connector: Optional[meerschaum.connectors.sql.SQLConnector], default None: 811 The connector to the database which holds the table. 812 813 schema: Optional[str], default None 814 Specify on which schema the table resides. 815 Defaults to the schema set in `connector`. 
816 817 refresh: bool, default False 818 If `True`, rebuild the cached table object. 819 820 debug: bool, default False: 821 Verbosity toggle. 822 823 Returns 824 ------- 825 A `sqlalchemy.Table` object for the table. 826 827 """ 828 if connector is None: 829 from meerschaum import get_connector 830 connector = get_connector('sql') 831 832 if connector.flavor == 'duckdb': 833 return None 834 835 from meerschaum.connectors.sql.tables import get_tables 836 from meerschaum.utils.packages import attempt_import 837 from meerschaum.utils.warnings import warn 838 if refresh: 839 connector.metadata.clear() 840 tables = get_tables(mrsm_instance=connector, debug=debug, create=False) 841 sqlalchemy = attempt_import('sqlalchemy') 842 truncated_table_name = truncate_item_name(str(table), connector.flavor) 843 table_kwargs = { 844 'autoload_with': connector.engine, 845 } 846 if schema: 847 table_kwargs['schema'] = schema 848 849 if refresh or truncated_table_name not in tables: 850 try: 851 tables[truncated_table_name] = sqlalchemy.Table( 852 truncated_table_name, 853 connector.metadata, 854 **table_kwargs 855 ) 856 except sqlalchemy.exc.NoSuchTableError as e: 857 warn(f"Table '{truncated_table_name}' does not exist in '{connector}'.") 858 return None 859 return tables[truncated_table_name] 860 861 862def get_table_cols_types( 863 table: str, 864 connectable: Union[ 865 'mrsm.connectors.sql.SQLConnector', 866 'sqlalchemy.orm.session.Session', 867 'sqlalchemy.engine.base.Engine' 868 ], 869 flavor: Optional[str] = None, 870 schema: Optional[str] = None, 871 database: Optional[str] = None, 872 debug: bool = False, 873) -> Dict[str, str]: 874 """ 875 Return a dictionary mapping a table's columns to data types. 876 This is useful for inspecting tables creating during a not-yet-committed session. 877 878 NOTE: This may return incorrect columns if the schema is not explicitly stated. 
879 Use this function if you are confident the table name is unique or if you have 880 and explicit schema. 881 To use the configured schema, get the columns from `get_sqlalchemy_table()` instead. 882 883 Parameters 884 ---------- 885 table: str 886 The name of the table (unquoted). 887 888 connectable: Union[ 889 'mrsm.connectors.sql.SQLConnector', 890 'sqlalchemy.orm.session.Session', 891 ] 892 The connection object used to fetch the columns and types. 893 894 flavor: Optional[str], default None 895 The database dialect flavor to use for the query. 896 If omitted, default to `connectable.flavor`. 897 898 schema: Optional[str], default None 899 If provided, restrict the query to this schema. 900 901 database: Optional[str]. default None 902 If provided, restrict the query to this database. 903 904 Returns 905 ------- 906 A dictionary mapping column names to data types. 907 """ 908 from meerschaum.connectors import SQLConnector 909 sqlalchemy = mrsm.attempt_import('sqlalchemy') 910 flavor = flavor or getattr(connectable, 'flavor', None) 911 if not flavor: 912 raise ValueError("Please provide a database flavor.") 913 if flavor == 'duckdb' and not isinstance(connectable, SQLConnector): 914 raise ValueError("You must provide a SQLConnector when using DuckDB.") 915 if flavor in NO_SCHEMA_FLAVORS: 916 schema = None 917 if schema is None: 918 schema = DEFAULT_SCHEMA_FLAVORS.get(flavor, None) 919 if flavor in ('sqlite', 'duckdb', 'oracle'): 920 database = None 921 table_trunc = truncate_item_name(table, flavor=flavor) 922 table_lower = table.lower() 923 table_upper = table.upper() 924 table_lower_trunc = truncate_item_name(table_lower, flavor=flavor) 925 table_upper_trunc = truncate_item_name(table_upper, flavor=flavor) 926 db_prefix = ( 927 "tempdb." 
928 if flavor == 'mssql' and table.startswith('#') 929 else "" 930 ) 931 932 cols_types_query = sqlalchemy.text( 933 columns_types_queries.get( 934 flavor, 935 columns_types_queries['default'] 936 ).format( 937 table=table, 938 table_trunc=table_trunc, 939 table_lower=table_lower, 940 table_lower_trunc=table_lower_trunc, 941 table_upper=table_upper, 942 table_upper_trunc=table_upper_trunc, 943 db_prefix=db_prefix, 944 ) 945 ) 946 947 cols = ['database', 'schema', 'table', 'column', 'type'] 948 result_cols_ix = dict(enumerate(cols)) 949 950 debug_kwargs = {'debug': debug} if isinstance(connectable, SQLConnector) else {} 951 if not debug_kwargs and debug: 952 dprint(cols_types_query) 953 954 try: 955 result_rows = ( 956 [ 957 row 958 for row in connectable.execute(cols_types_query, **debug_kwargs).fetchall() 959 ] 960 if flavor != 'duckdb' 961 else [ 962 tuple([doc[col] for col in cols]) 963 for doc in connectable.read(cols_types_query, debug=debug).to_dict(orient='records') 964 ] 965 ) 966 cols_types_docs = [ 967 { 968 result_cols_ix[i]: val 969 for i, val in enumerate(row) 970 } 971 for row in result_rows 972 ] 973 cols_types_docs_filtered = [ 974 doc 975 for doc in cols_types_docs 976 if ( 977 ( 978 not schema 979 or doc['schema'] == schema 980 ) 981 and 982 ( 983 not database 984 or doc['database'] == database 985 ) 986 ) 987 ] 988 989 ### NOTE: This may return incorrect columns if the schema is not explicitly stated. 
990 if cols_types_docs and not cols_types_docs_filtered: 991 cols_types_docs_filtered = cols_types_docs 992 993 return { 994 ( 995 doc['column'] 996 if flavor != 'oracle' else ( 997 ( 998 doc['column'].lower() 999 if (doc['column'].isupper() and doc['column'].replace('_', '').isalpha()) 1000 else doc['column'] 1001 ) 1002 ) 1003 ): doc['type'].upper() 1004 for doc in cols_types_docs_filtered 1005 } 1006 except Exception as e: 1007 warn(f"Failed to fetch columns for table '{table}':\n{e}") 1008 return {} 1009 1010 1011def get_update_queries( 1012 target: str, 1013 patch: str, 1014 connectable: Union[ 1015 mrsm.connectors.sql.SQLConnector, 1016 'sqlalchemy.orm.session.Session' 1017 ], 1018 join_cols: Iterable[str], 1019 flavor: Optional[str] = None, 1020 upsert: bool = False, 1021 datetime_col: Optional[str] = None, 1022 schema: Optional[str] = None, 1023 patch_schema: Optional[str] = None, 1024 debug: bool = False, 1025) -> List[str]: 1026 """ 1027 Build a list of `MERGE`, `UPDATE`, `DELETE`/`INSERT` queries to apply a patch to target table. 1028 1029 Parameters 1030 ---------- 1031 target: str 1032 The name of the target table. 1033 1034 patch: str 1035 The name of the patch table. This should have the same shape as the target. 1036 1037 connectable: Union[meerschaum.connectors.sql.SQLConnector, sqlalchemy.orm.session.Session] 1038 The `SQLConnector` or SQLAlchemy session which will later execute the queries. 1039 1040 join_cols: List[str] 1041 The columns to use to join the patch to the target. 1042 1043 flavor: Optional[str], default None 1044 If using a SQLAlchemy session, provide the expected database flavor. 1045 1046 upsert: bool, default False 1047 If `True`, return an upsert query rather than an update. 1048 1049 datetime_col: Optional[str], default None 1050 If provided, bound the join query using this column as the datetime index. 1051 This must be present on both tables. 
1052 1053 schema: Optional[str], default None 1054 If provided, use this schema when quoting the target table. 1055 Defaults to `connector.schema`. 1056 1057 patch_schema: Optional[str], default None 1058 If provided, use this schema when quoting the patch table. 1059 Defaults to `schema`. 1060 1061 debug: bool, default False 1062 Verbosity toggle. 1063 1064 Returns 1065 ------- 1066 A list of query strings to perform the update operation. 1067 """ 1068 from meerschaum.connectors import SQLConnector 1069 from meerschaum.utils.debug import dprint 1070 from meerschaum.utils.dtypes.sql import DB_FLAVORS_CAST_DTYPES 1071 flavor = flavor or (connectable.flavor if isinstance(connectable, SQLConnector) else None) 1072 if not flavor: 1073 raise ValueError("Provide a flavor if using a SQLAlchemy session.") 1074 if ( 1075 flavor == 'sqlite' 1076 and isinstance(connectable, SQLConnector) 1077 and connectable.db_version < '3.33.0' 1078 ): 1079 flavor = 'sqlite_delete_insert' 1080 flavor_key = (f'{flavor}-upsert' if upsert else flavor) 1081 base_queries = update_queries.get( 1082 flavor_key, 1083 update_queries['default'] 1084 ) 1085 if not isinstance(base_queries, list): 1086 base_queries = [base_queries] 1087 schema = schema or (connectable.schema if isinstance(connectable, SQLConnector) else None) 1088 patch_schema = patch_schema or schema 1089 target_table_columns = get_table_cols_types( 1090 target, 1091 connectable, 1092 flavor=flavor, 1093 schema=schema, 1094 debug=debug, 1095 ) 1096 patch_table_columns = get_table_cols_types( 1097 patch, 1098 connectable, 1099 flavor=flavor, 1100 schema=patch_schema, 1101 debug=debug, 1102 ) 1103 1104 patch_cols_str = ', '.join( 1105 [ 1106 sql_item_name(col, flavor) 1107 for col in patch_table_columns 1108 ] 1109 ) 1110 patch_cols_prefixed_str = ', '.join( 1111 [ 1112 'p.' 
+ sql_item_name(col, flavor) 1113 for col in patch_table_columns 1114 ] 1115 ) 1116 1117 join_cols_str = ', '.join( 1118 [ 1119 sql_item_name(col, flavor) 1120 for col in join_cols 1121 ] 1122 ) 1123 1124 value_cols = [] 1125 join_cols_types = [] 1126 if debug: 1127 dprint("target_table_columns:") 1128 mrsm.pprint(target_table_columns) 1129 for c_name, c_type in target_table_columns.items(): 1130 if c_name not in patch_table_columns: 1131 continue 1132 if flavor in DB_FLAVORS_CAST_DTYPES: 1133 c_type = DB_FLAVORS_CAST_DTYPES[flavor].get(c_type.upper(), c_type) 1134 ( 1135 join_cols_types 1136 if c_name in join_cols 1137 else value_cols 1138 ).append((c_name, c_type)) 1139 if debug: 1140 dprint(f"value_cols: {value_cols}") 1141 1142 if not join_cols_types: 1143 return [] 1144 if not value_cols and not upsert: 1145 return [] 1146 1147 coalesce_join_cols_str = ', '.join( 1148 [ 1149 'COALESCE(' 1150 + sql_item_name(c_name, flavor) 1151 + ', ' 1152 + get_null_replacement(c_type, flavor) 1153 + ')' 1154 for c_name, c_type in join_cols_types 1155 ] 1156 ) 1157 1158 update_or_nothing = ('UPDATE' if value_cols else 'NOTHING') 1159 1160 def sets_subquery(l_prefix: str, r_prefix: str): 1161 if not value_cols: 1162 return '' 1163 return 'SET ' + ',\n'.join([ 1164 ( 1165 l_prefix + sql_item_name(c_name, flavor, None) 1166 + ' = ' 1167 + ('CAST(' if flavor != 'sqlite' else '') 1168 + r_prefix 1169 + sql_item_name(c_name, flavor, None) 1170 + (' AS ' if flavor != 'sqlite' else '') 1171 + (c_type.replace('_', ' ') if flavor != 'sqlite' else '') 1172 + (')' if flavor != 'sqlite' else '') 1173 ) for c_name, c_type in value_cols 1174 ]) 1175 1176 def and_subquery(l_prefix: str, r_prefix: str): 1177 return '\nAND\n'.join([ 1178 ( 1179 "COALESCE(" 1180 + l_prefix 1181 + sql_item_name(c_name, flavor, None) 1182 + ", " 1183 + get_null_replacement(c_type, flavor) 1184 + ")" 1185 + ' = ' 1186 + "COALESCE(" 1187 + r_prefix 1188 + sql_item_name(c_name, flavor, None) 1189 + ", " 1190 + 
get_null_replacement(c_type, flavor) 1191 + ")" 1192 ) for c_name, c_type in join_cols_types 1193 ]) 1194 1195 target_table_name = sql_item_name(target, flavor, schema) 1196 patch_table_name = sql_item_name(patch, flavor, patch_schema) 1197 dt_col_name = sql_item_name(datetime_col, flavor, None) if datetime_col else None 1198 date_bounds_subquery = ( 1199 f""" 1200 f.{dt_col_name} >= (SELECT MIN({dt_col_name}) FROM {patch_table_name}) 1201 AND f.{dt_col_name} <= (SELECT MAX({dt_col_name}) FROM {patch_table_name}) 1202 """ 1203 if datetime_col 1204 else "1 = 1" 1205 ) 1206 1207 ### NOTE: MSSQL upserts must exclude the update portion if only upserting indices. 1208 when_matched_update_sets_subquery_none = "" if not value_cols else ( 1209 "WHEN MATCHED THEN" 1210 f" UPDATE {sets_subquery('', 'p.')}" 1211 ) 1212 1213 return [ 1214 base_query.format( 1215 sets_subquery_none=sets_subquery('', 'p.'), 1216 sets_subquery_none_excluded=sets_subquery('', 'EXCLUDED.'), 1217 sets_subquery_f=sets_subquery('f.', 'p.'), 1218 and_subquery_f=and_subquery('p.', 'f.'), 1219 and_subquery_t=and_subquery('p.', 't.'), 1220 target_table_name=target_table_name, 1221 patch_table_name=patch_table_name, 1222 patch_cols_str=patch_cols_str, 1223 patch_cols_prefixed_str=patch_cols_prefixed_str, 1224 date_bounds_subquery=date_bounds_subquery, 1225 join_cols_str=join_cols_str, 1226 coalesce_join_cols_str=coalesce_join_cols_str, 1227 update_or_nothing=update_or_nothing, 1228 when_matched_update_sets_subquery_none=when_matched_update_sets_subquery_none, 1229 ) 1230 for base_query in base_queries 1231 ] 1232 1233 1234def get_null_replacement(typ: str, flavor: str) -> str: 1235 """ 1236 Return a value that may temporarily be used in place of NULL for this type. 1237 1238 Parameters 1239 ---------- 1240 typ: str 1241 The typ to be converted to NULL. 1242 1243 flavor: str 1244 The database flavor for which this value will be used. 
1245 1246 Returns 1247 ------- 1248 A value which may stand in place of NULL for this type. 1249 `'None'` is returned if a value cannot be determined. 1250 """ 1251 from meerschaum.utils.dtypes.sql import DB_FLAVORS_CAST_DTYPES 1252 if 'int' in typ.lower() or typ.lower() in ('numeric', 'number'): 1253 return '-987654321' 1254 if 'bool' in typ.lower(): 1255 bool_typ = ( 1256 PD_TO_DB_DTYPES_FLAVORS 1257 .get('bool', {}) 1258 .get(flavor, PD_TO_DB_DTYPES_FLAVORS['bool']['default']) 1259 ) 1260 if flavor in DB_FLAVORS_CAST_DTYPES: 1261 bool_typ = DB_FLAVORS_CAST_DTYPES[flavor].get(bool_typ, bool_typ) 1262 val_to_cast = ( 1263 -987654321 1264 if flavor in ('mysql', 'mariadb', 'sqlite', 'mssql') 1265 else 0 1266 ) 1267 return f'CAST({val_to_cast} AS {bool_typ})' 1268 if 'time' in typ.lower() or 'date' in typ.lower(): 1269 return dateadd_str(flavor=flavor, begin='1900-01-01') 1270 if 'float' in typ.lower() or 'double' in typ.lower() or typ.lower() in ('decimal',): 1271 return '-987654321.0' 1272 if typ.lower() in ('uniqueidentifier', 'guid', 'uuid'): 1273 magic_val = 'DEADBEEF-ABBA-BABE-CAFE-DECAFC0FFEE5' 1274 if flavor == 'mssql': 1275 return f"CAST('{magic_val}' AS UNIQUEIDENTIFIER)" 1276 return f"'{magic_val}'" 1277 return ('n' if flavor == 'oracle' else '') + "'-987654321'" 1278 1279 1280def get_db_version(conn: 'SQLConnector', debug: bool = False) -> Union[str, None]: 1281 """ 1282 Fetch the database version if possible. 1283 """ 1284 version_name = sql_item_name('version', conn.flavor, None) 1285 version_query = version_queries.get( 1286 conn.flavor, 1287 version_queries['default'] 1288 ).format(version_name=version_name) 1289 return conn.value(version_query, debug=debug) 1290 1291 1292def get_rename_table_queries( 1293 old_table: str, 1294 new_table: str, 1295 flavor: str, 1296 schema: Optional[str] = None, 1297) -> List[str]: 1298 """ 1299 Return queries to alter a table's name. 
1300 1301 Parameters 1302 ---------- 1303 old_table: str 1304 The unquoted name of the old table. 1305 1306 new_table: str 1307 The unquoted name of the new table. 1308 1309 flavor: str 1310 The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`. 1311 1312 schema: Optional[str], default None 1313 The schema on which the table resides. 1314 1315 Returns 1316 ------- 1317 A list of `ALTER TABLE` or equivalent queries for the database flavor. 1318 """ 1319 old_table_name = sql_item_name(old_table, flavor, schema) 1320 new_table_name = sql_item_name(new_table, flavor, None) 1321 tmp_table = '_tmp_rename_' + new_table 1322 tmp_table_name = sql_item_name(tmp_table, flavor, schema) 1323 if flavor == 'mssql': 1324 return [f"EXEC sp_rename '{old_table}', '{new_table}'"] 1325 1326 if_exists_str = "IF EXISTS" if flavor in DROP_IF_EXISTS_FLAVORS else "" 1327 if flavor == 'duckdb': 1328 return [ 1329 get_create_table_query(f"SELECT * FROM {old_table_name}", tmp_table, 'duckdb', schema), 1330 get_create_table_query(f"SELECT * FROM {tmp_table_name}", new_table, 'duckdb', schema), 1331 f"DROP TABLE {if_exists_str} {tmp_table_name}", 1332 f"DROP TABLE {if_exists_str} {old_table_name}", 1333 ] 1334 1335 return [f"ALTER TABLE {old_table_name} RENAME TO {new_table_name}"] 1336 1337 1338def get_create_table_query( 1339 query: str, 1340 new_table: str, 1341 flavor: str, 1342 schema: Optional[str] = None, 1343) -> str: 1344 """ 1345 Return a query to create a new table from a `SELECT` query. 1346 1347 Parameters 1348 ---------- 1349 query: str 1350 The select query to use for the creation of the table. 1351 1352 new_table: str 1353 The unquoted name of the new table. 1354 1355 flavor: str 1356 The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`. 1357 1358 schema: Optional[str], default None 1359 The schema on which the table will reside. 1360 1361 Returns 1362 ------- 1363 A `CREATE TABLE` (or `SELECT INTO`) query for the database flavor. 
1364 """ 1365 import textwrap 1366 create_cte = 'create_query' 1367 create_cte_name = sql_item_name(create_cte, flavor, None) 1368 new_table_name = sql_item_name(new_table, flavor, schema) 1369 if flavor in ('mssql',): 1370 query = query.lstrip() 1371 if 'with ' in query.lower(): 1372 final_select_ix = query.lower().rfind('select') 1373 return ( 1374 query[:final_select_ix].rstrip() + ',\n' 1375 + f"{create_cte_name} AS (\n" 1376 + query[final_select_ix:] 1377 + "\n)\n" 1378 + f"SELECT *\nINTO {new_table_name}\nFROM {create_cte_name}" 1379 ) 1380 1381 create_table_query = f""" 1382 SELECT * 1383 INTO {new_table_name} 1384 FROM ({query}) AS {create_cte_name} 1385 """ 1386 elif flavor in (None,): 1387 create_table_query = f""" 1388 WITH {create_cte_name} AS ({query}) 1389 CREATE TABLE {new_table_name} AS 1390 SELECT * 1391 FROM {create_cte_name} 1392 """ 1393 elif flavor in ('sqlite', 'mysql', 'mariadb', 'duckdb', 'oracle'): 1394 create_table_query = f""" 1395 CREATE TABLE {new_table_name} AS 1396 SELECT * 1397 FROM ({query})""" + (f""" AS {create_cte_name}""" if flavor != 'oracle' else '') + """ 1398 """ 1399 else: 1400 create_table_query = f""" 1401 SELECT * 1402 INTO {new_table_name} 1403 FROM ({query}) AS {create_cte_name} 1404 """ 1405 1406 return textwrap.dedent(create_table_query) 1407 1408 1409def wrap_query_with_cte( 1410 sub_query: str, 1411 parent_query: str, 1412 flavor: str, 1413 cte_name: str = "src", 1414) -> str: 1415 """ 1416 Wrap a subquery in a CTE and append an encapsulating query. 1417 1418 Parameters 1419 ---------- 1420 sub_query: str 1421 The query to be referenced. This may itself contain CTEs. 1422 Unless `cte_name` is provided, this will be aliased as `src`. 1423 1424 parent_query: str 1425 The larger query to append which references the subquery. 1426 This must not contain CTEs. 1427 1428 flavor: str 1429 The database flavor, e.g. `'mssql'`. 1430 1431 cte_name: str, default 'src' 1432 The CTE alias, defaults to `src`. 
1433 1434 Returns 1435 ------- 1436 An encapsulating query which allows you to treat `sub_query` as a temporary table. 1437 1438 Examples 1439 -------- 1440 1441 ```python 1442 from meerschaum.utils.sql import wrap_query_with_cte 1443 sub_query = "WITH foo AS (SELECT 1 AS val) SELECT (val * 2) AS newval FROM foo" 1444 parent_query = "SELECT newval * 3 FROM src" 1445 query = wrap_query_with_cte(sub_query, parent_query, 'mssql') 1446 print(query) 1447 # WITH foo AS (SELECT 1 AS val), 1448 # [src] AS ( 1449 # SELECT (val * 2) AS newval FROM foo 1450 # ) 1451 # SELECT newval * 3 FROM src 1452 ``` 1453 1454 """ 1455 sub_query = sub_query.lstrip() 1456 cte_name_quoted = sql_item_name(cte_name, flavor, None) 1457 1458 if flavor in NO_CTE_FLAVORS: 1459 return ( 1460 parent_query 1461 .replace(cte_name_quoted, '--MRSM_SUBQUERY--') 1462 .replace(cte_name, '--MRSM_SUBQUERY--') 1463 .replace('--MRSM_SUBQUERY--', f"(\n{sub_query}\n) AS {cte_name_quoted}") 1464 ) 1465 1466 if 'with ' in sub_query.lower(): 1467 final_select_ix = sub_query.lower().rfind('select') 1468 return ( 1469 sub_query[:final_select_ix].rstrip() + ',\n' 1470 + f"{cte_name_quoted} AS (\n" 1471 + ' ' + sub_query[final_select_ix:] 1472 + "\n)\n" 1473 + parent_query 1474 ) 1475 1476 return ( 1477 f"WITH {cte_name_quoted} AS (\n" 1478 f" {sub_query}\n" 1479 f")\n{parent_query}" 1480 ) 1481 1482 1483def format_cte_subquery( 1484 sub_query: str, 1485 flavor: str, 1486 sub_name: str = 'src', 1487 cols_to_select: Union[List[str], str] = '*', 1488) -> str: 1489 """ 1490 Given a subquery, build a wrapper query that selects from the CTE subquery. 1491 1492 Parameters 1493 ---------- 1494 sub_query: str 1495 The subquery to wrap. 1496 1497 flavor: str 1498 The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`. 1499 1500 sub_name: str, default 'src' 1501 If possible, give this name to the CTE (must be unquoted). 
1502 1503 cols_to_select: Union[List[str], str], default '' 1504 If specified, choose which columns to select from the CTE. 1505 If a list of strings is provided, each item will be quoted and joined with commas. 1506 If a string is given, assume it is quoted and insert it into the query. 1507 1508 Returns 1509 ------- 1510 A wrapper query that selects from the CTE. 1511 """ 1512 quoted_sub_name = sql_item_name(sub_name, flavor, None) 1513 cols_str = ( 1514 cols_to_select 1515 if isinstance(cols_to_select, str) 1516 else ', '.join([sql_item_name(col, flavor, None) for col in cols_to_select]) 1517 ) 1518 parent_query = ( 1519 f"SELECT {cols_str}\n" 1520 f"FROM {quoted_sub_name}" 1521 ) 1522 return wrap_query_with_cte(sub_query, parent_query, flavor, cte_name=sub_name) 1523 1524 1525def session_execute( 1526 session: 'sqlalchemy.orm.session.Session', 1527 queries: Union[List[str], str], 1528 with_results: bool = False, 1529 debug: bool = False, 1530) -> Union[mrsm.SuccessTuple, Tuple[mrsm.SuccessTuple, List['sqlalchemy.sql.ResultProxy']]]: 1531 """ 1532 Similar to `SQLConnector.exec_queries()`, execute a list of queries 1533 and roll back when one fails. 1534 1535 Parameters 1536 ---------- 1537 session: sqlalchemy.orm.session.Session 1538 A SQLAlchemy session representing a transaction. 1539 1540 queries: Union[List[str], str] 1541 A query or list of queries to be executed. 1542 If a query fails, roll back the session. 1543 1544 with_results: bool, default False 1545 If `True`, return a list of result objects. 1546 1547 Returns 1548 ------- 1549 A `SuccessTuple` indicating the queries were successfully executed. 1550 If `with_results`, return the `SuccessTuple` and a list of results. 1551 """ 1552 sqlalchemy = mrsm.attempt_import('sqlalchemy') 1553 if not isinstance(queries, list): 1554 queries = [queries] 1555 successes, msgs, results = [], [], [] 1556 for query in queries: 1557 query_text = sqlalchemy.text(query) 1558 fail_msg = "Failed to execute queries." 
1559 try: 1560 result = session.execute(query_text) 1561 query_success = result is not None 1562 query_msg = "Success" if query_success else fail_msg 1563 except Exception as e: 1564 query_success = False 1565 query_msg = f"{fail_msg}\n{e}" 1566 result = None 1567 successes.append(query_success) 1568 msgs.append(query_msg) 1569 results.append(result) 1570 if not query_success: 1571 session.rollback() 1572 break 1573 success, msg = all(successes), '\n'.join(msgs) 1574 if with_results: 1575 return (success, msg), results 1576 return success, msg
def clean(substring: str) -> str:
    """
    Check that a substring is safe enough to be interpolated into a SQL query.

    The lowercased string form of `substring` is searched for each banned token
    (`;`, `--`, and `drop `), and `meerschaum.utils.warnings.error()` is called
    for a token which is found. Nothing is explicitly returned.
    """
    from meerschaum.utils.warnings import error
    lowered = str(substring).lower()
    for banned_token in (';', '--', 'drop '):
        if banned_token in lowered:
            error(f"Invalid string: '{substring}'")
Ensure a substring is clean enough to be inserted into a SQL query. Raises an exception when banned words are used.
def dateadd_str(
    flavor: str = 'postgresql',
    datepart: str = 'day',
    number: Union[int, float] = 0,
    begin: Union[str, datetime, int] = 'now'
) -> str:
    """
    Generate a `DATEADD` clause depending on database flavor.

    Parameters
    ----------
    flavor: str, default `'postgresql'`
        SQL database flavor, e.g. `'postgresql'`, `'sqlite'`.

        Currently supported flavors:

        - `'postgresql'`
        - `'timescaledb'`
        - `'citus'`
        - `'cockroachdb'`
        - `'duckdb'`
        - `'mssql'`
        - `'mysql'`
        - `'mariadb'`
        - `'sqlite'`
        - `'oracle'`

    datepart: str, default `'day'`
        Which part of the date to modify. Supported values:

        - `'year'`
        - `'month'`
        - `'day'`
        - `'hour'`
        - `'minute'`
        - `'second'`

    number: Union[int, float], default `0`
        How many units to add to the date part.

    begin: Union[str, datetime, int], default `'now'`
        Base datetime to which to add dateparts.
        An integer is returned as-is (as a string), and a falsy value yields an empty string.
        Timezone-aware `datetime` objects are converted to UTC and made naive.
        Strings which cannot be parsed as datetimes are checked with `clean()`
        and inserted unquoted.

    Returns
    -------
    The appropriate `DATEADD` string for the corresponding database flavor.
    An empty string is returned for flavors which are not supported.

    Examples
    --------
    >>> dateadd_str(
    ...     flavor = 'mssql',
    ...     begin = datetime(2022, 1, 1, 0, 0),
    ...     number = 1,
    ... )
    "DATEADD(day, 1, CAST('2022-01-01 00:00:00' AS DATETIME))"
    >>> dateadd_str(
    ...     flavor = 'postgresql',
    ...     begin = datetime(2022, 1, 1, 0, 0),
    ...     number = 1,
    ... )
    "CAST('2022-01-01 00:00:00' AS TIMESTAMP) + INTERVAL '1 day'"

    """
    from meerschaum.utils.packages import attempt_import
    dateutil_parser = attempt_import('dateutil.parser')
    if 'int' in str(type(begin)).lower():
        return str(begin)
    if not begin:
        return ''

    begin_time = None
    ### Sanity check: make sure `begin` is a valid datetime before we inject anything.
    if not isinstance(begin, datetime):
        try:
            begin_time = dateutil_parser.parse(begin)
        except Exception:
            begin_time = None
    else:
        begin_time = begin

    ### Unable to parse into a datetime.
    if begin_time is None:
        ### Throw an error if banned symbols are included in the `begin` string.
        clean(str(begin))
    ### If begin is a valid datetime, wrap it in quotes.
    else:
        if isinstance(begin, datetime) and begin.tzinfo is not None:
            begin = begin.astimezone(timezone.utc)
        begin = (
            f"'{begin.replace(tzinfo=None)}'"
            if isinstance(begin, datetime)
            else f"'{begin}'"
        )

    da = ""
    if flavor in ('postgresql', 'timescaledb', 'cockroachdb', 'citus'):
        begin = (
            f"CAST({begin} AS TIMESTAMP)" if begin != 'now'
            else "CAST(NOW() AT TIME ZONE 'utc' AS TIMESTAMP)"
        )
        da = begin + (f" + INTERVAL '{number} {datepart}'" if number != 0 else '')

    elif flavor == 'duckdb':
        begin = f"CAST({begin} AS TIMESTAMP)" if begin != 'now' else 'NOW()'
        da = begin + (f" + INTERVAL '{number} {datepart}'" if number != 0 else '')

    elif flavor in ('mssql',):
        ### Trim the last three fractional digits (and the closing quote), then re-quote.
        ### NOTE(review): this presumes six fractional digits, as produced by `str(datetime)`;
        ###               confirm for strings with shorter fractions.
        if begin_time and begin_time.microsecond != 0:
            begin = begin[:-4] + "'"
        begin = f"CAST({begin} AS DATETIME)" if begin != 'now' else 'GETUTCDATE()'
        da = f"DATEADD({datepart}, {number}, {begin})" if number != 0 else begin

    elif flavor in ('mysql', 'mariadb'):
        begin = f"CAST({begin} AS DATETIME(6))" if begin != 'now' else 'UTC_TIMESTAMP(6)'
        da = (f"DATE_ADD({begin}, INTERVAL {number} {datepart})" if number != 0 else begin)

    elif flavor == 'sqlite':
        ### NOTE: `'now'` must be passed as a string literal to SQLite's `datetime()`;
        ###       a bare `now` would be read as a column name.
        sqlite_begin = "'now'" if begin == 'now' else begin
        da = f"datetime({sqlite_begin}, '{number} {datepart}')"

    elif flavor == 'oracle':
        dt_format = 'YYYY-MM-DD HH24:MI:SS.FF'
        py_format = '%Y-%m-%d %H:%M:%S.%f'
        if begin == 'now':
            ### Render the current UTC time in the same layout as `dt_format`
            ### so that it may be parsed by `TO_TIMESTAMP`.
            now_str = datetime.now(timezone.utc).replace(tzinfo=None).strftime(py_format)
            base = f"TO_TIMESTAMP('{now_str}', '{dt_format}')"
        elif begin_time:
            base = f"TO_TIMESTAMP('{begin_time.strftime(py_format)}', '{dt_format}')"
        else:
            base = begin
        da = base + (f" + INTERVAL '{number}' {datepart}" if number != 0 else "")
    return da
Generate a DATEADD
clause depending on database flavor.
Parameters
flavor (str, default
'postgresql'
): SQL database flavor, e.g. 'postgresql'
, 'sqlite'
. Currently supported flavors:
'postgresql'
'timescaledb'
'citus'
'cockroachdb'
'duckdb'
'mssql'
'mysql'
'mariadb'
'sqlite'
'oracle'
datepart (str, default
'day'
): Which part of the date to modify. Supported values: 'year'
'month'
'day'
'hour'
'minute'
'second'
- number (Union[int, float], default
0
): How many units to add to the date part. - begin (Union[str, datetime, int], default
'now'
): Base datetime to which to add dateparts.
Returns
- The appropriate
DATEADD
string for the corresponding database flavor.
Examples
>>> dateadd_str(
... flavor = 'mssql',
... begin = datetime(2022, 1, 1, 0, 0),
... number = 1,
... )
"DATEADD(day, 1, CAST('2022-01-01 00:00:00' AS DATETIME))"
>>> dateadd_str(
... flavor = 'postgresql',
... begin = datetime(2022, 1, 1, 0, 0),
... number = 1,
... )
"CAST('2022-01-01 00:00:00' AS TIMESTAMP) + INTERVAL '1 day'"
def test_connection(
    self,
    **kw: Any
) -> Union[bool, None]:
    """
    Test if a successful connection to the database may be made.

    Parameters
    ----------
    **kw:
        The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.
        They override the defaults of a single attempt with no waiting and no warnings.

    Returns
    -------
    The result of `retry_connect()`, or `False` if an exception is raised.

    """
    import warnings
    from meerschaum.connectors.poll import retry_connect
    connect_kwargs = {
        'max_retries': 1,
        'retry_wait': 0,
        'warn': False,
        'connector': self,
        **kw,
    }
    with warnings.catch_warnings():
        ### Silence warnings whose messages begin with 'Could not'.
        warnings.filterwarnings('ignore', 'Could not')
        try:
            return retry_connect(**connect_kwargs)
        except Exception:
            return False
Test if a successful connection to the database may be made.
Parameters
- **kw: The keyword arguments are passed to
meerschaum.connectors.poll.retry_connect
.
Returns
True
if a connection is made, otherwise False
or None
in case of failure.
def get_distinct_col_count(
    col: str,
    query: str,
    connector: Optional[mrsm.connectors.sql.SQLConnector] = None,
    debug: bool = False
) -> Optional[int]:
    """
    Count the distinct values of a column in the results of a SQL query.

    Parameters
    ----------
    col: str
        The column in the query to count.

    query: str
        The SQL query to count from.

    connector: Optional[mrsm.connectors.sql.SQLConnector], default None
        The SQLConnector to execute the query.
        If omitted, the connector from `mrsm.get_connector('sql')` is used.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` of the number of distinct values in the column,
    or `None` if the result cannot be converted to an integer.

    """
    if connector is None:
        connector = mrsm.get_connector('sql')

    quoted_col = sql_item_name(col, connector.flavor, None)

    ### MySQL and MariaDB receive nested subqueries; all other flavors receive CTEs.
    if connector.flavor in ('mysql', 'mariadb'):
        count_query = f"""
        SELECT COUNT(*)
        FROM (
            SELECT DISTINCT {quoted_col}
            FROM ({query}) AS src
        ) AS dist"""
    else:
        count_query = f"""
        WITH src AS ( {query} ),
        dist AS ( SELECT DISTINCT {quoted_col} FROM src )
        SELECT COUNT(*) FROM dist"""

    count = connector.value(count_query, debug=debug)
    try:
        return int(count)
    except Exception:
        return None
Returns the number of distinct items in a column of a SQL query.
Parameters
- col (str): The column in the query to count.
- query (str): The SQL query to count from.
- connector (Optional[mrsm.connectors.sql.SQLConnector], default None): The SQLConnector to execute the query.
- debug (bool, default False): Verbosity toggle.
Returns
- An
int
of the number of distinct values in the column, or None
if the query fails.
def sql_item_name(item: str, flavor: str, schema: Optional[str] = None) -> str:
    """
    Quote the name of a SQL item according to the database flavor.

    Parameters
    ----------
    item: str
        The database item (table, view, etc.) in need of quotes.
        The name is first truncated with `truncate_item_name()`.

    flavor: str
        The database flavor (`'postgresql'`, `'mssql'`, `'sqlite'`, etc.).

    schema: Optional[str], default None
        If provided, prefix the item with the quoted schema.
        The schema is ignored for `'sqlite'`.

    Returns
    -------
    A `str` which contains the input `item` wrapped in the corresponding escape characters.

    Examples
    --------
    >>> sql_item_name('table', 'sqlite')
    '"table"'
    >>> sql_item_name('table', 'mssql')
    "[table]"
    >>> sql_item_name('table', 'postgresql', schema='abc')
    '"abc"."table"'

    """
    name = truncate_item_name(str(item), flavor)
    if flavor != 'oracle':
        wrappers = table_wrappers.get(flavor, table_wrappers['default'])
    else:
        name = pg_capital(name)
        ### NOTE: System-reserved words must be quoted.
        oracle_reserved_words = (
            'float', 'varchar', 'nvarchar', 'clob',
            'boolean', 'integer', 'table',
        )
        wrappers = ('"', '"') if name.lower() in oracle_reserved_words else ('', '')

    quoted_name = wrappers[0] + name + wrappers[1]

    ### NOTE: SQLite does not support schemas.
    if flavor == 'sqlite' or schema is None:
        return quoted_name
    return wrappers[0] + schema + wrappers[1] + '.' + quoted_name
Parse SQL items depending on the flavor.
Parameters
- item (str): The database item (table, view, etc.) in need of quotes.
- flavor (str):
The database flavor (
'postgresql'
, 'mssql'
, 'sqlite'
, etc.).
Returns
- A
str
which contains the input item
wrapped in the corresponding escape characters.
Examples
>>> sql_item_name('table', 'sqlite')
'"table"'
>>> sql_item_name('table', 'mssql')
"[table]"
>>> sql_item_name('table', 'postgresql', schema='abc')
'"abc"."table"'
def pg_capital(s: str) -> str:
    """
    Wrap a string in double quotes if it is not a plain lowercase identifier.

    A string needs quotes when it begins with an underscore or contains any character
    other than `a`-`z`, digits, and underscores (e.g. capital letters or spaces).
    Strings which already contain a double quote are returned unchanged.

    Parameters
    ----------
    s: str
        The string to be escaped.

    Returns
    -------
    The input string wrapped in quotes only if it needs them.

    Examples
    --------
    >>> pg_capital("My Table")
    '"My Table"'
    >>> pg_capital('my_table')
    'my_table'

    """
    if '"' in s:
        return s

    def _is_plain(char: str) -> bool:
        return 'a' <= char <= 'z' or char.isdigit() or char == '_'

    if s.startswith('_') or not all(_is_plain(char) for char in str(s)):
        return '"' + s + '"'
    return s
If string contains a capital letter, wrap it in double quotes.
Parameters
s (str):
The string to be escaped.
Returns
- The input string wrapped in quotes only if it needs them.
Examples
>>> pg_capital("My Table")
'"My Table"'
>>> pg_capital('my_table')
'my_table'
def oracle_capital(s: str) -> str:
    """
    Return the name of an item on an Oracle database.

    NOTE: The input is currently passed through unchanged.
    """
    item_name = s
    return item_name
Capitalize the string of an item on an Oracle database.
def truncate_item_name(item: str, flavor: str) -> str:
    """
    Shorten an item's name so that it fits within the flavor's maximum name length.

    Parameters
    ----------
    item: str
        The database item being referenced. This string is the "canonical" name internally.

    flavor: str
        The flavor of the database on which `item` resides.
        Flavors missing from `max_name_lens` use the `'default'` limit.

    Returns
    -------
    The truncated string.
    """
    from meerschaum.utils.misc import truncate_string_sections
    char_limit = max_name_lens.get(flavor, max_name_lens['default'])
    return truncate_string_sections(item, max_len=char_limit)
Truncate item names to stay within the database flavor's character limit.
Parameters
- item (str): The database item being referenced. This string is the "canonical" name internally.
- flavor (str):
The flavor of the database on which
item
resides.
Returns
- The truncated string.
def build_where(
    params: Dict[str, Any],
    connector: Optional[meerschaum.connectors.sql.SQLConnector] = None,
    with_where: bool = True,
) -> str:
    """
    Build the `WHERE` clause based on the input criteria.

    Parameters
    ----------
    params: Dict[str, Any]
        The keywords dictionary to convert into a WHERE clause.
        If a value is a string which begins with an underscore, negate that value
        (e.g. `!=` instead of `=` or `NOT IN` instead of `IN`).
        A value of `_None` will be interpreted as `IS NOT NULL`.
        Dictionary values are serialized as JSON and compared against the column cast as text.

    connector: Optional[meerschaum.connectors.sql.SQLConnector], default None
        The Meerschaum SQLConnector that will be executing the query.
        The connector is used to extract the SQL dialect.

    with_where: bool, default True
        If `True`, include the leading `'WHERE'` string.

    Returns
    -------
    A `str` of the `WHERE` clause from the input `params` dictionary for the connector's flavor.
    An empty string is returned if there are no conditions
    or if the parameters contain banned words (`drop `, `--`, `;`).

    Examples
    --------
    ```
    >>> print(build_where({'foo': [1, 2, 3]}))

    WHERE
        "foo" IN ('1', '2', '3')
    ```
    """
    import json
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.dtypes import value_is_null, none_if_null
    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
    try:
        params_json = json.dumps(params)
    except Exception:
        params_json = str(params)
    lowered_params_json = params_json.lower()
    if any(word in lowered_params_json for word in ('drop ', '--', ';')):
        warn("Aborting build_where() due to possible SQL injection.")
        return ''

    if connector is None:
        from meerschaum import get_connector
        connector = get_connector('sql')

    def _quote(val: Any) -> str:
        """Render a value as a string literal, doubling any embedded single quotes."""
        return "'" + str(val).replace("'", "''") + "'"

    leading_and = "\n    AND "
    conditions = []
    for key, value in params.items():
        _key = sql_item_name(key, connector.flavor, None)
        ### search across a list (i.e. IN syntax)
        if isinstance(value, Iterable) and not isinstance(value, (dict, str)):
            ### Partition in a single pass so that one-shot iterators are fully consumed.
            includes, excludes = [], []
            for item in value:
                if str(item).startswith(negation_prefix):
                    excludes.append(none_if_null(str(item)[len(negation_prefix):]))
                else:
                    includes.append(none_if_null(item))

            if includes:
                not_null_includes = [item for item in includes if item is not None]
                include_parts = []
                if not_null_includes:
                    include_parts.append(
                        f"{_key} IN ("
                        + ', '.join(_quote(item) for item in not_null_includes)
                        + ")"
                    )
                if len(not_null_includes) < len(includes):
                    include_parts.append(f"{_key} IS NULL")
                conditions.append("(" + "\n    OR ".join(include_parts) + ")")

            if excludes:
                not_null_excludes = [item for item in excludes if item is not None]
                exclude_parts = []
                if not_null_excludes:
                    exclude_parts.append(
                        f"{_key} NOT IN ("
                        + ', '.join(_quote(item) for item in not_null_excludes)
                        + ")"
                    )
                if len(not_null_excludes) < len(excludes):
                    exclude_parts.append(f"{_key} IS NOT NULL")
                conditions.append("(" + "\n    AND ".join(exclude_parts) + ")")

            continue

        ### search a dictionary
        if isinstance(value, dict):
            ### NOTE: Quotes within the JSON must be escaped like any other string literal.
            conditions.append(f"CAST({_key} AS TEXT) = " + _quote(json.dumps(value)))
            continue

        eq_sign = '='
        is_null = 'IS NULL'
        ### NOTE: `lstrip()` removes every leading character found in the negation prefix.
        if value_is_null(str(value).lstrip(negation_prefix)):
            value = (
                (negation_prefix + 'None')
                if str(value).startswith(negation_prefix)
                else None
            )
        if str(value).startswith(negation_prefix):
            value = str(value)[len(negation_prefix):]
            eq_sign = '!='
            if value_is_null(value):
                value = None
                is_null = 'IS NOT NULL'
        conditions.append(
            f"{_key} "
            + (is_null if value is None else f"{eq_sign} {_quote(value)}")
        )

    if not conditions:
        return ""
    return ("\nWHERE\n    " if with_where else '') + leading_and.join(conditions)
Build the WHERE
clause based on the input criteria.
Parameters
- params (Dict[str, Any]):
The keywords dictionary to convert into a WHERE clause.
If a value is a string which begins with an underscore, negate that value
(e.g.
!=
instead of =
or NOT IN
instead of IN
). A value of _None
will be interpreted as IS NOT NULL
. - connector (Optional[meerschaum.connectors.sql.SQLConnector], default None): The Meerschaum SQLConnector that will be executing the query. The connector is used to extract the SQL dialect.
- with_where (bool, default True):
If
True
, include the leading 'WHERE'
string.
Returns
- A
str
of the WHERE
clause from the input params
dictionary for the connector's flavor.
Examples
>>> print(build_where({'foo': [1, 2, 3]}))
WHERE
"foo" IN ('1', '2', '3')
def table_exists(
    table: str,
    connector: mrsm.connectors.sql.SQLConnector,
    schema: Optional[str] = None,
    debug: bool = False,
) -> bool:
    """
    Determine whether a table is present on the database.

    Parameters
    ----------
    table: str
        The name of the table in question.
        The name is truncated to the flavor's maximum length before the lookup.

    connector: mrsm.connectors.sql.SQLConnector
        The connector to the database which holds the table.

    schema: Optional[str], default None
        Optionally specify the table schema.
        Defaults to `connector.schema`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `bool` indicating whether or not the table exists on the database.

    """
    sqlalchemy = mrsm.attempt_import('sqlalchemy')
    lookup_schema = schema or connector.schema
    inspector = sqlalchemy.inspect(connector.engine)
    return inspector.has_table(
        truncate_item_name(str(table), connector.flavor),
        schema=lookup_schema,
    )
Check if a table exists.
Parameters
- table (str:): The name of the table in question.
- connector (mrsm.connectors.sql.SQLConnector): The connector to the database which holds the table.
- schema (Optional[str], default None):
Optionally specify the table schema.
Defaults to
connector.schema
. - debug (bool, default False :): Verbosity toggle.
Returns
- A
bool
indicating whether or not the table exists on the database.
def get_sqlalchemy_table(
    table: str,
    connector: Optional[mrsm.connectors.sql.SQLConnector] = None,
    schema: Optional[str] = None,
    refresh: bool = False,
    debug: bool = False,
) -> Union['sqlalchemy.Table', None]:
    """
    Build (or fetch from the cache) the SQLAlchemy table object for a table name.

    Parameters
    ----------
    table: str
        The name of the table on the database. Does not need to be escaped.

    connector: Optional[meerschaum.connectors.sql.SQLConnector], default None
        The connector to the database which holds the table.
        If omitted, `get_connector('sql')` is used.

    schema: Optional[str], default None
        Specify on which schema the table resides.
        Only passed to `sqlalchemy.Table` when provided.

    refresh: bool, default False
        If `True`, clear the connector's metadata and rebuild the cached table object.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `sqlalchemy.Table` object for the table.
    `None` is returned for DuckDB connectors or when the table cannot be reflected.
    """
    if connector is None:
        from meerschaum import get_connector
        connector = get_connector('sql')

    if connector.flavor == 'duckdb':
        return None

    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import warn

    if refresh:
        connector.metadata.clear()
    cached_tables = get_tables(mrsm_instance=connector, debug=debug, create=False)
    sqlalchemy = attempt_import('sqlalchemy')
    table_key = truncate_item_name(str(table), connector.flavor)

    reflect_kwargs = {'autoload_with': connector.engine}
    if schema:
        reflect_kwargs['schema'] = schema

    ### Serve from the cache unless a rebuild was requested.
    if not refresh and table_key in cached_tables:
        return cached_tables[table_key]

    try:
        cached_tables[table_key] = sqlalchemy.Table(
            table_key,
            connector.metadata,
            **reflect_kwargs
        )
    except sqlalchemy.exc.NoSuchTableError:
        warn(f"Table '{table_key}' does not exist in '{connector}'.")
        return None

    return cached_tables[table_key]
Construct a SQLAlchemy table from its name.
Parameters
- table (str): The name of the table on the database. Does not need to be escaped.
- connector (Optional[meerschaum.connectors.sql.SQLConnector], default None:): The connector to the database which holds the table.
- schema (Optional[str], default None):
Specify on which schema the table resides.
Defaults to the schema set in
connector
. - refresh (bool, default False):
If
True
, rebuild the cached table object. - debug (bool, default False:): Verbosity toggle.
Returns
- A
sqlalchemy.Table
object for the table.
def get_table_cols_types(
    table: str,
    connectable: Union[
        'mrsm.connectors.sql.SQLConnector',
        'sqlalchemy.orm.session.Session',
        'sqlalchemy.engine.base.Engine'
    ],
    flavor: Optional[str] = None,
    schema: Optional[str] = None,
    database: Optional[str] = None,
    debug: bool = False,
) -> Dict[str, str]:
    """
    Return a dictionary mapping a table's columns to data types.
    This is useful for inspecting tables created during a not-yet-committed session.

    NOTE: This may return incorrect columns if the schema is not explicitly stated.
        Use this function if you are confident the table name is unique or if you have
        an explicit schema.
        To use the configured schema, get the columns from `get_sqlalchemy_table()` instead.

    Parameters
    ----------
    table: str
        The name of the table (unquoted).

    connectable: Union[
        'mrsm.connectors.sql.SQLConnector',
        'sqlalchemy.orm.session.Session',
        'sqlalchemy.engine.base.Engine',
    ]
        The connection object used to fetch the columns and types.

    flavor: Optional[str], default None
        The database dialect flavor to use for the query.
        If omitted, default to `connectable.flavor`.

    schema: Optional[str], default None
        If provided, restrict the query to this schema.

    database: Optional[str], default None
        If provided, restrict the query to this database.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping column names to (upper-cased) data types.
    If the lookup fails, a warning is emitted and an empty dictionary is returned.

    Raises
    ------
    ValueError
        If no flavor can be determined, or if the flavor is DuckDB and
        `connectable` is not a `SQLConnector`.
    """
    from meerschaum.connectors import SQLConnector
    sqlalchemy = mrsm.attempt_import('sqlalchemy')
    flavor = flavor or getattr(connectable, 'flavor', None)
    if not flavor:
        raise ValueError("Please provide a database flavor.")

    is_connector = isinstance(connectable, SQLConnector)
    if flavor == 'duckdb' and not is_connector:
        raise ValueError("You must provide a SQLConnector when using DuckDB.")

    if flavor in NO_SCHEMA_FLAVORS:
        schema = None
    if schema is None:
        schema = DEFAULT_SCHEMA_FLAVORS.get(flavor, None)
    if flavor in ('sqlite', 'duckdb', 'oracle'):
        database = None

    ### Every casing / truncation variant is offered to the query template.
    truncated = truncate_item_name(table, flavor=flavor)
    lowered = table.lower()
    uppered = table.upper()
    lowered_truncated = truncate_item_name(lowered, flavor=flavor)
    uppered_truncated = truncate_item_name(uppered, flavor=flavor)
    if flavor == 'mssql' and table.startswith('#'):
        db_prefix = "tempdb."
    else:
        db_prefix = ""

    query_template = columns_types_queries.get(
        flavor,
        columns_types_queries['default']
    )
    cols_types_query = sqlalchemy.text(
        query_template.format(
            table=table,
            table_trunc=truncated,
            table_lower=lowered,
            table_lower_trunc=lowered_truncated,
            table_upper=uppered,
            table_upper_trunc=uppered_truncated,
            db_prefix=db_prefix,
        )
    )

    cols = ['database', 'schema', 'table', 'column', 'type']
    result_cols_ix = dict(enumerate(cols))

    debug_kwargs = {'debug': debug} if is_connector else {}
    if not debug_kwargs and debug:
        dprint(cols_types_query)

    def _normalize_column_name(col_name):
        ### For Oracle, all-uppercase alphabetic names are returned in lowercase.
        if flavor != 'oracle':
            return col_name
        if col_name.isupper() and col_name.replace('_', '').isalpha():
            return col_name.lower()
        return col_name

    try:
        if flavor != 'duckdb':
            result_rows = list(
                connectable.execute(cols_types_query, **debug_kwargs).fetchall()
            )
        else:
            records = connectable.read(cols_types_query, debug=debug).to_dict(orient='records')
            result_rows = [
                tuple(record[col] for col in cols)
                for record in records
            ]

        all_docs = []
        for row in result_rows:
            all_docs.append({
                result_cols_ix[ix]: val
                for ix, val in enumerate(row)
            })

        matching_docs = []
        for doc in all_docs:
            if schema and doc['schema'] != schema:
                continue
            if database and doc['database'] != database:
                continue
            matching_docs.append(doc)

        ### NOTE: This may return incorrect columns if the schema is not explicitly stated.
        if all_docs and not matching_docs:
            matching_docs = all_docs

        return {
            _normalize_column_name(doc['column']): doc['type'].upper()
            for doc in matching_docs
        }
    except Exception as e:
        warn(f"Failed to fetch columns for table '{table}':\n{e}")
        return {}
Return a dictionary mapping a table's columns to data types. This is useful for inspecting tables created during a not-yet-committed session.
NOTE: This may return incorrect columns if the schema is not explicitly stated.
Use this function if you are confident the table name is unique or if you have
an explicit schema.
To use the configured schema, get the columns from `get_sqlalchemy_table()` instead.
Parameters
- table (str): The name of the table (unquoted).
- connectable (Union['mrsm.connectors.sql.SQLConnector', 'sqlalchemy.orm.session.Session', 'sqlalchemy.engine.base.Engine']): The connection object used to fetch the columns and types.
- flavor (Optional[str], default None):
The database dialect flavor to use for the query.
If omitted, default to
connectable.flavor
. - schema (Optional[str], default None): If provided, restrict the query to this schema.
- database (Optional[str]. default None): If provided, restrict the query to this database.
Returns
- A dictionary mapping column names to data types.
def _parse_version_tuple(version: Any) -> Optional[Tuple[int, ...]]:
    """
    Parse the leading dotted-numeric portion of a version string into integers.

    Parameters
    ----------
    version: Any
        The version value to parse (e.g. `'3.8.11'`). May be `None`.

    Returns
    -------
    A tuple of integers (e.g. `(3, 8, 11)`),
    or `None` if the value does not begin with a numeric version.
    """
    import re
    if version is None:
        return None
    match = re.match(r'\s*(\d+(?:\.\d+)*)', str(version))
    if match is None:
        return None
    return tuple(int(part) for part in match.group(1).split('.'))


def get_update_queries(
    target: str,
    patch: str,
    connectable: Union[
        mrsm.connectors.sql.SQLConnector,
        'sqlalchemy.orm.session.Session'
    ],
    join_cols: Iterable[str],
    flavor: Optional[str] = None,
    upsert: bool = False,
    datetime_col: Optional[str] = None,
    schema: Optional[str] = None,
    patch_schema: Optional[str] = None,
    debug: bool = False,
) -> List[str]:
    """
    Build a list of `MERGE`, `UPDATE`, `DELETE`/`INSERT` queries to apply a patch to target table.

    Parameters
    ----------
    target: str
        The name of the target table.

    patch: str
        The name of the patch table. This should have the same shape as the target.

    connectable: Union[meerschaum.connectors.sql.SQLConnector, sqlalchemy.orm.session.Session]
        The `SQLConnector` or SQLAlchemy session which will later execute the queries.

    join_cols: Iterable[str]
        The columns to use to join the patch to the target.
        Any iterable is accepted (it is materialized into a list once).

    flavor: Optional[str], default None
        If using a SQLAlchemy session, provide the expected database flavor.

    upsert: bool, default False
        If `True`, return an upsert query rather than an update.

    datetime_col: Optional[str], default None
        If provided, bound the join query using this column as the datetime index.
        This must be present on both tables.

    schema: Optional[str], default None
        If provided, use this schema when quoting the target table.
        Defaults to `connector.schema`.

    patch_schema: Optional[str], default None
        If provided, use this schema when quoting the patch table.
        Defaults to `schema`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of query strings to perform the update operation.
    An empty list is returned if none of the join columns exist on both tables,
    or if there are no value columns to update and `upsert` is `False`.

    Raises
    ------
    ValueError
        If no flavor is given and `connectable` is not a `SQLConnector`.
    """
    from meerschaum.connectors import SQLConnector
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.dtypes.sql import DB_FLAVORS_CAST_DTYPES
    flavor = flavor or (connectable.flavor if isinstance(connectable, SQLConnector) else None)
    if not flavor:
        raise ValueError("Provide a flavor if using a SQLAlchemy session.")

    ### `join_cols` is iterated several times below, so materialize it once
    ### (a generator would otherwise be exhausted after the first pass).
    join_cols = list(join_cols)

    ### NOTE: SQLite gained `UPDATE ... FROM` in 3.33.0; older versions need the
    ###       delete / insert queries. Versions are compared numerically because
    ###       comparing the strings would rank '3.8.11' above '3.33.0'.
    ###       If the version cannot be determined, the flavor is left unchanged.
    if flavor == 'sqlite' and isinstance(connectable, SQLConnector):
        sqlite_version = _parse_version_tuple(connectable.db_version)
        if sqlite_version is not None:
            padded_version = sqlite_version + (0,) * (3 - len(sqlite_version))
            if padded_version < (3, 33, 0):
                flavor = 'sqlite_delete_insert'

    flavor_key = (f'{flavor}-upsert' if upsert else flavor)
    base_queries = update_queries.get(
        flavor_key,
        update_queries['default']
    )
    if not isinstance(base_queries, list):
        base_queries = [base_queries]
    schema = schema or (connectable.schema if isinstance(connectable, SQLConnector) else None)
    patch_schema = patch_schema or schema
    target_table_columns = get_table_cols_types(
        target,
        connectable,
        flavor=flavor,
        schema=schema,
        debug=debug,
    )
    patch_table_columns = get_table_cols_types(
        patch,
        connectable,
        flavor=flavor,
        schema=patch_schema,
        debug=debug,
    )

    patch_cols_str = ', '.join(
        [
            sql_item_name(col, flavor)
            for col in patch_table_columns
        ]
    )
    patch_cols_prefixed_str = ', '.join(
        [
            'p.' + sql_item_name(col, flavor)
            for col in patch_table_columns
        ]
    )

    join_cols_str = ', '.join(
        [
            sql_item_name(col, flavor)
            for col in join_cols
        ]
    )

    ### Split the shared columns into join (index) columns and value columns.
    value_cols = []
    join_cols_types = []
    if debug:
        dprint("target_table_columns:")
        mrsm.pprint(target_table_columns)
    for c_name, c_type in target_table_columns.items():
        if c_name not in patch_table_columns:
            continue
        if flavor in DB_FLAVORS_CAST_DTYPES:
            c_type = DB_FLAVORS_CAST_DTYPES[flavor].get(c_type.upper(), c_type)
        (
            join_cols_types
            if c_name in join_cols
            else value_cols
        ).append((c_name, c_type))
    if debug:
        dprint(f"value_cols: {value_cols}")

    if not join_cols_types:
        return []
    if not value_cols and not upsert:
        return []

    coalesce_join_cols_str = ', '.join(
        [
            'COALESCE('
            + sql_item_name(c_name, flavor)
            + ', '
            + get_null_replacement(c_type, flavor)
            + ')'
            for c_name, c_type in join_cols_types
        ]
    )

    update_or_nothing = ('UPDATE' if value_cols else 'NOTHING')

    def sets_subquery(l_prefix: str, r_prefix: str):
        """Build the `SET` clause which copies the value columns from the patch."""
        if not value_cols:
            return ''
        return 'SET ' + ',\n'.join([
            (
                l_prefix + sql_item_name(c_name, flavor, None)
                + ' = '
                + ('CAST(' if flavor != 'sqlite' else '')
                + r_prefix
                + sql_item_name(c_name, flavor, None)
                + (' AS ' if flavor != 'sqlite' else '')
                + (c_type.replace('_', ' ') if flavor != 'sqlite' else '')
                + (')' if flavor != 'sqlite' else '')
            ) for c_name, c_type in value_cols
        ])

    def and_subquery(l_prefix: str, r_prefix: str):
        """Build the NULL-tolerant equality conditions over the join columns."""
        return '\nAND\n'.join([
            (
                "COALESCE("
                + l_prefix
                + sql_item_name(c_name, flavor, None)
                + ", "
                + get_null_replacement(c_type, flavor)
                + ")"
                + ' = '
                + "COALESCE("
                + r_prefix
                + sql_item_name(c_name, flavor, None)
                + ", "
                + get_null_replacement(c_type, flavor)
                + ")"
            ) for c_name, c_type in join_cols_types
        ])

    target_table_name = sql_item_name(target, flavor, schema)
    patch_table_name = sql_item_name(patch, flavor, patch_schema)
    dt_col_name = sql_item_name(datetime_col, flavor, None) if datetime_col else None
    date_bounds_subquery = (
        f"""
        f.{dt_col_name} >= (SELECT MIN({dt_col_name}) FROM {patch_table_name})
        AND f.{dt_col_name} <= (SELECT MAX({dt_col_name}) FROM {patch_table_name})
        """
        if datetime_col
        else "1 = 1"
    )

    ### NOTE: MSSQL upserts must exclude the update portion if only upserting indices.
    when_matched_update_sets_subquery_none = "" if not value_cols else (
        "WHEN MATCHED THEN"
        f" UPDATE {sets_subquery('', 'p.')}"
    )

    return [
        base_query.format(
            sets_subquery_none=sets_subquery('', 'p.'),
            sets_subquery_none_excluded=sets_subquery('', 'EXCLUDED.'),
            sets_subquery_f=sets_subquery('f.', 'p.'),
            and_subquery_f=and_subquery('p.', 'f.'),
            and_subquery_t=and_subquery('p.', 't.'),
            target_table_name=target_table_name,
            patch_table_name=patch_table_name,
            patch_cols_str=patch_cols_str,
            patch_cols_prefixed_str=patch_cols_prefixed_str,
            date_bounds_subquery=date_bounds_subquery,
            join_cols_str=join_cols_str,
            coalesce_join_cols_str=coalesce_join_cols_str,
            update_or_nothing=update_or_nothing,
            when_matched_update_sets_subquery_none=when_matched_update_sets_subquery_none,
        )
        for base_query in base_queries
    ]
Build a list of MERGE
, UPDATE
, DELETE
/INSERT
queries to apply a patch to target table.
Parameters
- target (str): The name of the target table.
- patch (str): The name of the patch table. This should have the same shape as the target.
- connectable (Union[meerschaum.connectors.sql.SQLConnector, sqlalchemy.orm.session.Session]):
The
SQLConnector
or SQLAlchemy session which will later execute the queries. - join_cols (List[str]): The columns to use to join the patch to the target.
- flavor (Optional[str], default None): If using a SQLAlchemy session, provide the expected database flavor.
- upsert (bool, default False):
If
True
, return an upsert query rather than an update. - datetime_col (Optional[str], default None): If provided, bound the join query using this column as the datetime index. This must be present on both tables.
- schema (Optional[str], default None):
If provided, use this schema when quoting the target table.
Defaults to
connector.schema
. - patch_schema (Optional[str], default None):
If provided, use this schema when quoting the patch table.
Defaults to
schema
. - debug (bool, default False): Verbosity toggle.
Returns
- A list of query strings to perform the update operation.
def get_null_replacement(typ: str, flavor: str) -> str:
    """
    Return a SQL expression that may temporarily stand in for NULL for this type.

    Parameters
    ----------
    typ: str
        The database type for which a NULL stand-in is needed
        (matched case-insensitively).

    flavor: str
        The database flavor for which this value will be used.

    Returns
    -------
    A value which may stand in place of NULL for this type.
    If no type-specific value applies, the quoted string `'-987654321'`
    is returned (prefixed with `n` for Oracle).
    """
    from meerschaum.utils.dtypes.sql import DB_FLAVORS_CAST_DTYPES
    typ_lower = typ.lower()

    if 'int' in typ_lower or typ_lower in ('numeric', 'number'):
        return '-987654321'

    if 'bool' in typ_lower:
        default_bool_typ = PD_TO_DB_DTYPES_FLAVORS['bool']['default']
        bool_typ = PD_TO_DB_DTYPES_FLAVORS.get('bool', {}).get(flavor, default_bool_typ)
        if flavor in DB_FLAVORS_CAST_DTYPES:
            bool_typ = DB_FLAVORS_CAST_DTYPES[flavor].get(bool_typ, bool_typ)
        if flavor in ('mysql', 'mariadb', 'sqlite', 'mssql'):
            val_to_cast = -987654321
        else:
            val_to_cast = 0
        return f'CAST({val_to_cast} AS {bool_typ})'

    if any(keyword in typ_lower for keyword in ('time', 'date')):
        return dateadd_str(flavor=flavor, begin='1900-01-01')

    if any(keyword in typ_lower for keyword in ('float', 'double')) or typ_lower in ('decimal',):
        return '-987654321.0'

    if typ_lower in ('uniqueidentifier', 'guid', 'uuid'):
        magic_val = 'DEADBEEF-ABBA-BABE-CAFE-DECAFC0FFEE5'
        if flavor == 'mssql':
            return f"CAST('{magic_val}' AS UNIQUEIDENTIFIER)"
        return f"'{magic_val}'"

    ### Fall back to a quoted string (Oracle gets the `n` literal prefix).
    literal_prefix = 'n' if flavor == 'oracle' else ''
    return literal_prefix + "'-987654321'"
Return a value that may temporarily be used in place of NULL for this type.
Parameters
- typ (str): The typ to be converted to NULL.
- flavor (str): The database flavor for which this value will be used.
Returns
- A value which may stand in place of NULL for this type. If no type-specific value applies, the quoted string `'-987654321'` is returned (prefixed with `n` for Oracle).
def get_db_version(conn: 'SQLConnector', debug: bool = False) -> Union[str, None]:
    """
    Fetch the database version if possible.

    Parameters
    ----------
    conn: SQLConnector
        The connector whose flavor selects the version query
        and which executes it via `conn.value()`.

    debug: bool, default False
        Verbosity toggle, passed through to `conn.value()`.

    Returns
    -------
    Whatever `conn.value()` returns for the flavor's version query.
    """
    quoted_alias = sql_item_name('version', conn.flavor, None)
    ### Flavors without a dedicated entry use the default `VERSION()` query.
    query_template = version_queries.get(
        conn.flavor,
        version_queries['default']
    )
    return conn.value(query_template.format(version_name=quoted_alias), debug=debug)
Fetch the database version if possible.
def get_rename_table_queries(
    old_table: str,
    new_table: str,
    flavor: str,
    schema: Optional[str] = None,
) -> List[str]:
    """
    Return queries to alter a table's name.

    Parameters
    ----------
    old_table: str
        The unquoted name of the old table.

    new_table: str
        The unquoted name of the new table.

    flavor: str
        The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).

    schema: Optional[str], default None
        The schema on which the table resides.

    Returns
    -------
    A list of `ALTER TABLE` or equivalent queries for the database flavor.
    MSSQL uses `sp_rename`, and DuckDB copies the table through a temporary
    table before dropping the temporary and old tables.
    """
    quoted_old = sql_item_name(old_table, flavor, schema)
    quoted_new = sql_item_name(new_table, flavor, None)
    staging_table = '_tmp_rename_' + new_table
    quoted_staging = sql_item_name(staging_table, flavor, schema)

    if flavor == 'mssql':
        return [f"EXEC sp_rename '{old_table}', '{new_table}'"]

    if flavor != 'duckdb':
        return [f"ALTER TABLE {quoted_old} RENAME TO {quoted_new}"]

    ### DuckDB: copy old -> staging -> new, then drop the staging and old tables.
    if_exists_str = "IF EXISTS" if flavor in DROP_IF_EXISTS_FLAVORS else ""
    copy_to_staging = get_create_table_query(
        f"SELECT * FROM {quoted_old}", staging_table, 'duckdb', schema
    )
    copy_to_new = get_create_table_query(
        f"SELECT * FROM {quoted_staging}", new_table, 'duckdb', schema
    )
    return [
        copy_to_staging,
        copy_to_new,
        f"DROP TABLE {if_exists_str} {quoted_staging}",
        f"DROP TABLE {if_exists_str} {quoted_old}",
    ]
Return queries to alter a table's name.
Parameters
- old_table (str): The unquoted name of the old table.
- new_table (str): The unquoted name of the new table.
- flavor (str): The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).
- schema (Optional[str], default None): The schema on which the table resides.
Returns
- A list of
ALTER TABLE
or equivalent queries for the database flavor.
def get_create_table_query(
    query: str,
    new_table: str,
    flavor: str,
    schema: Optional[str] = None,
) -> str:
    """
    Return a query to create a new table from a `SELECT` query.

    Parameters
    ----------
    query: str
        The select query to use for the creation of the table.

    new_table: str
        The unquoted name of the new table.

    flavor: str
        The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).

    schema: Optional[str], default None
        The schema on which the table will reside.

    Returns
    -------
    A `CREATE TABLE` (or `SELECT INTO`) query for the database flavor.
    """
    import textwrap
    create_cte = 'create_query'
    create_cte_name = sql_item_name(create_cte, flavor, None)
    new_table_name = sql_item_name(new_table, flavor, schema)

    if flavor == 'mssql':
        query = query.lstrip()
        lowered_query = query.lower()
        if 'with ' in lowered_query:
            ### The query has its own CTEs: append ours and select from it.
            final_select_ix = lowered_query.rfind('select')
            leading_ctes = query[:final_select_ix].rstrip()
            final_select = query[final_select_ix:]
            return (
                leading_ctes + ',\n'
                + f"{create_cte_name} AS (\n"
                + final_select
                + "\n)\n"
                + f"SELECT *\nINTO {new_table_name}\nFROM {create_cte_name}"
            )

    if flavor is None:
        create_table_query = f"""
            WITH {create_cte_name} AS ({query})
            CREATE TABLE {new_table_name} AS
            SELECT *
            FROM {create_cte_name}
        """
    elif flavor in ('sqlite', 'mysql', 'mariadb', 'duckdb', 'oracle'):
        ### Oracle does not get the `AS` alias on the subquery.
        alias_clause = f""" AS {create_cte_name}""" if flavor != 'oracle' else ''
        create_table_query = f"""
            CREATE TABLE {new_table_name} AS
            SELECT *
            FROM ({query}){alias_clause}
        """
    else:
        ### MSSQL (without CTEs) and all remaining flavors use `SELECT INTO`.
        create_table_query = f"""
            SELECT *
            INTO {new_table_name}
            FROM ({query}) AS {create_cte_name}
        """

    return textwrap.dedent(create_table_query)
Return a query to create a new table from a SELECT
query.
Parameters
- query (str): The select query to use for the creation of the table.
- new_table (str): The unquoted name of the new table.
- flavor (str): The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).
- schema (Optional[str], default None): The schema on which the table will reside.
Returns
- A
CREATE TABLE
(orSELECT INTO
) query for the database flavor.
def wrap_query_with_cte(
    sub_query: str,
    parent_query: str,
    flavor: str,
    cte_name: str = "src",
) -> str:
    """
    Wrap a subquery in a CTE and append an encapsulating query.

    Parameters
    ----------
    sub_query: str
        The query to be referenced. This may itself contain CTEs.
        Unless `cte_name` is provided, this will be aliased as `src`.

    parent_query: str
        The larger query to append which references the subquery.
        This must not contain CTEs.

    flavor: str
        The database flavor, e.g. `'mssql'`.

    cte_name: str, default 'src'
        The CTE alias, defaults to `src`.

    Returns
    -------
    An encapsulating query which allows you to treat `sub_query` as a temporary table.

    Examples
    --------

    ```python
    from meerschaum.utils.sql import wrap_query_with_cte
    sub_query = "WITH foo AS (SELECT 1 AS val) SELECT (val * 2) AS newval FROM foo"
    parent_query = "SELECT newval * 3 FROM src"
    query = wrap_query_with_cte(sub_query, parent_query, 'mssql')
    print(query)
    # WITH foo AS (SELECT 1 AS val),
    # [src] AS (
    #     SELECT (val * 2) AS newval FROM foo
    # )
    # SELECT newval * 3 FROM src
    ```

    """
    sub_query = sub_query.lstrip()
    cte_name_quoted = sql_item_name(cte_name, flavor, None)

    if flavor in NO_CTE_FLAVORS:
        ### No CTE support: inline the subquery wherever the alias is referenced.
        placeholder = '--MRSM_SUBQUERY--'
        inline_subquery = f"(\n{sub_query}\n) AS {cte_name_quoted}"
        templated_parent = parent_query.replace(cte_name_quoted, placeholder)
        templated_parent = templated_parent.replace(cte_name, placeholder)
        return templated_parent.replace(placeholder, inline_subquery)

    lowered_sub_query = sub_query.lower()
    if 'with ' not in lowered_sub_query:
        return (
            f"WITH {cte_name_quoted} AS (\n"
            f" {sub_query}\n"
            f")\n{parent_query}"
        )

    ### The subquery has its own CTEs: add ours after them, wrapping its final SELECT.
    final_select_ix = lowered_sub_query.rfind('select')
    existing_ctes = sub_query[:final_select_ix].rstrip()
    final_select = sub_query[final_select_ix:]
    return (
        existing_ctes + ',\n'
        + f"{cte_name_quoted} AS (\n"
        + ' ' + final_select
        + "\n)\n"
        + parent_query
    )
Wrap a subquery in a CTE and append an encapsulating query.
Parameters
- sub_query (str):
The query to be referenced. This may itself contain CTEs.
Unless
cte_name
is provided, this will be aliased assrc
. - parent_query (str): The larger query to append which references the subquery. This must not contain CTEs.
- flavor (str):
The database flavor, e.g.
'mssql'
. - cte_name (str, default 'src'):
The CTE alias, defaults to
src
.
Returns
- An encapsulating query which allows you to treat
sub_query
as a temporary table.
Examples
from meerschaum.utils.sql import wrap_query_with_cte
sub_query = "WITH foo AS (SELECT 1 AS val) SELECT (val * 2) AS newval FROM foo"
parent_query = "SELECT newval * 3 FROM src"
query = wrap_query_with_cte(sub_query, parent_query, 'mssql')
print(query)
# WITH foo AS (SELECT 1 AS val),
# [src] AS (
# SELECT (val * 2) AS newval FROM foo
# )
# SELECT newval * 3 FROM src
def format_cte_subquery(
    sub_query: str,
    flavor: str,
    sub_name: str = 'src',
    cols_to_select: Union[List[str], str] = '*',
) -> str:
    """
    Given a subquery, build a wrapper query that selects from the CTE subquery.

    Parameters
    ----------
    sub_query: str
        The subquery to wrap.

    flavor: str
        The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).

    sub_name: str, default 'src'
        If possible, give this name to the CTE (must be unquoted).

    cols_to_select: Union[List[str], str], default '*'
        If specified, choose which columns to select from the CTE.
        If a list of strings is provided, each item will be quoted and joined with commas.
        If a string is given, assume it is quoted and insert it into the query.

    Returns
    -------
    A wrapper query that selects from the CTE.
    """
    quoted_sub_name = sql_item_name(sub_name, flavor, None)

    if isinstance(cols_to_select, str):
        ### Strings are inserted into the query as given.
        cols_str = cols_to_select
    else:
        quoted_cols = [sql_item_name(col, flavor, None) for col in cols_to_select]
        cols_str = ', '.join(quoted_cols)

    parent_query = f"SELECT {cols_str}\nFROM {quoted_sub_name}"
    return wrap_query_with_cte(sub_query, parent_query, flavor, cte_name=sub_name)
Given a subquery, build a wrapper query that selects from the CTE subquery.
Parameters
- sub_query (str): The subquery to wrap.
- flavor (str):
The database flavor to use for the query (e.g.
'mssql'
,'postgresql'
. - sub_name (str, default 'src'): If possible, give this name to the CTE (must be unquoted).
- cols_to_select (Union[List[str], str], default '*'): If specified, choose which columns to select from the CTE. If a list of strings is provided, each item will be quoted and joined with commas. If a string is given, assume it is quoted and insert it into the query.
Returns
- A wrapper query that selects from the CTE.
def session_execute(
    session: 'sqlalchemy.orm.session.Session',
    queries: Union[List[str], str],
    with_results: bool = False,
    debug: bool = False,
) -> Union[mrsm.SuccessTuple, Tuple[mrsm.SuccessTuple, List['sqlalchemy.sql.ResultProxy']]]:
    """
    Similar to `SQLConnector.exec_queries()`, execute a list of queries
    and roll back when one fails.

    Parameters
    ----------
    session: sqlalchemy.orm.session.Session
        A SQLAlchemy session representing a transaction.

    queries: Union[List[str], str]
        A query or list of queries to be executed.
        If a query fails, roll back the session and skip the remaining queries.

    with_results: bool, default False
        If `True`, return a list of result objects.

    debug: bool, default False
        Verbosity toggle (currently unused by this function).

    Returns
    -------
    A `SuccessTuple` indicating whether the queries were successfully executed.
    If `with_results`, return the `SuccessTuple` and a list of results
    (`None` for a query which raised an exception).
    """
    sqlalchemy = mrsm.attempt_import('sqlalchemy')
    query_list = queries if isinstance(queries, list) else [queries]
    fail_msg = "Failed to execute queries."

    success = True
    msgs, results = [], []
    for query in query_list:
        query_text = sqlalchemy.text(query)
        try:
            result = session.execute(query_text)
        except Exception as e:
            result = None
            query_success = False
            query_msg = f"{fail_msg}\n{e}"
        else:
            query_success = result is not None
            query_msg = "Success" if query_success else fail_msg

        msgs.append(query_msg)
        results.append(result)
        if not query_success:
            ### Abandon the transaction on the first failure.
            success = False
            session.rollback()
            break

    msg = '\n'.join(msgs)
    if with_results:
        return (success, msg), results
    return success, msg
Similar to SQLConnector.exec_queries()
, execute a list of queries
and roll back when one fails.
Parameters
- session (sqlalchemy.orm.session.Session): A SQLAlchemy session representing a transaction.
- queries (Union[List[str], str]): A query or list of queries to be executed. If a query fails, roll back the session.
- with_results (bool, default False):
If
True
, return a list of result objects.
Returns
- A
SuccessTuple
indicating the queries were successfully executed. - If
with_results
, return theSuccessTuple
and a list of results.