meerschaum.utils.sql
Flavor-specific SQL tools.
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Flavor-specific SQL tools.
"""

from __future__ import annotations

from datetime import datetime, timezone, timedelta
import meerschaum as mrsm
from meerschaum.utils.typing import Optional, Dict, Any, Union, List, Iterable, Tuple
### Preserve legacy imports.
from meerschaum.utils.dtypes.sql import (
    DB_TO_PD_DTYPES,
    PD_TO_DB_DTYPES_FLAVORS,
    get_pd_type_from_db_type as get_pd_type,
    get_db_type_from_pd_type as get_db_type,
    TIMEZONE_NAIVE_FLAVORS,
)
from meerschaum.utils.warnings import warn
from meerschaum.utils.debug import dprint

test_queries = {
    'default'  : 'SELECT 1',
    'oracle'   : 'SELECT 1 FROM DUAL',
    'informix' : 'SELECT COUNT(*) FROM systables',
    'hsqldb'   : 'SELECT 1 FROM INFORMATION_SCHEMA.SYSTEM_USERS',
}
### `table_name` is the escaped name of the table.
### `table` is the unescaped name of the table.
exists_queries = {
    'default': "SELECT COUNT(*) FROM {table_name} WHERE 1 = 0",
}
version_queries = {
    'default': "SELECT VERSION() AS {version_name}",
    'sqlite': "SELECT SQLITE_VERSION() AS {version_name}",
    'geopackage': "SELECT SQLITE_VERSION() AS {version_name}",
    'mssql': "SELECT @@version",
    'oracle': "SELECT version from PRODUCT_COMPONENT_VERSION WHERE rownum = 1",
}
SKIP_IF_EXISTS_FLAVORS = {'mssql', 'oracle'}
DROP_IF_EXISTS_FLAVORS = {
    'timescaledb',
    'timescaledb-ha',
    'postgresql',
    'postgis',
    'citus',
    'mssql',
    'mysql',
    'mariadb',
    'sqlite',
    'geopackage',
}
DROP_INDEX_IF_EXISTS_FLAVORS = {
    'mssql',
    'timescaledb',
    'timescaledb-ha',
    'postgresql',
    'postgis',
    'sqlite',
    'geopackage',
    'citus',
}
SKIP_AUTO_INCREMENT_FLAVORS = {'citus', 'duckdb'}
COALESCE_UNIQUE_INDEX_FLAVORS = {
    'timescaledb',
    'timescaledb-ha',
    'postgresql',
    'postgis',
    'citus',
}
UPDATE_QUERIES = {
    'default': """
        UPDATE {target_table_name} AS f
        {sets_subquery_none}
        FROM {target_table_name} AS t
        INNER JOIN (SELECT {patch_cols_str} FROM {patch_table_name}) AS p
            ON
                {and_subquery_t}
        WHERE
            {and_subquery_f}
            AND
            {date_bounds_subquery}
    """,
    'timescaledb-upsert': """
        INSERT INTO {target_table_name} ({patch_cols_str})
        SELECT {patch_cols_str}
        FROM {patch_table_name}
        ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}
    """,
    'timescaledb-ha-upsert': """
        INSERT INTO {target_table_name} ({patch_cols_str})
        SELECT {patch_cols_str}
        FROM {patch_table_name}
        ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}
    """,
    'postgresql-upsert': """
        INSERT INTO {target_table_name} ({patch_cols_str})
        SELECT {patch_cols_str}
        FROM {patch_table_name}
        ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}
    """,
    'postgis-upsert': """
        INSERT INTO {target_table_name} ({patch_cols_str})
        SELECT {patch_cols_str}
        FROM {patch_table_name}
        ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}
    """,
    'citus-upsert': """
        INSERT INTO {target_table_name} ({patch_cols_str})
        SELECT {patch_cols_str}
        FROM {patch_table_name}
        ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}
    """,
    'cockroachdb-upsert': """
        INSERT INTO {target_table_name} ({patch_cols_str})
        SELECT {patch_cols_str}
        FROM {patch_table_name}
        ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}
    """,
    'mysql': """
        UPDATE {target_table_name} AS f
        JOIN (SELECT {patch_cols_str} FROM {patch_table_name}) AS p
            ON
                {and_subquery_f}
        {sets_subquery_f}
        WHERE
            {date_bounds_subquery}
    """,
    'mysql-upsert': """
        INSERT {ignore}INTO {target_table_name} ({patch_cols_str})
        SELECT {patch_cols_str}
        FROM {patch_table_name}
        {on_duplicate_key_update}
            {cols_equal_values}
    """,
    'mariadb': """
        UPDATE {target_table_name} AS f
        JOIN (SELECT {patch_cols_str} FROM {patch_table_name}) AS p
            ON
                {and_subquery_f}
        {sets_subquery_f}
        WHERE
            {date_bounds_subquery}
    """,
    'mariadb-upsert': """
        INSERT {ignore}INTO {target_table_name} ({patch_cols_str})
        SELECT {patch_cols_str}
        FROM {patch_table_name}
        {on_duplicate_key_update}
            {cols_equal_values}
    """,
    'mssql': """
        {with_temp_date_bounds}
        MERGE {target_table_name} f
            USING (SELECT {patch_cols_str} FROM {patch_table_name}) p
            ON
                {and_subquery_f}
                AND
                {date_bounds_subquery}
        WHEN MATCHED THEN
            UPDATE
            {sets_subquery_none};
    """,
    'mssql-upsert': [
        "{identity_insert_on}",
        """
        {with_temp_date_bounds}
        MERGE {target_table_name} f
            USING (SELECT {patch_cols_str} FROM {patch_table_name}) p
            ON
                {and_subquery_f}
                AND
                {date_bounds_subquery}{when_matched_update_sets_subquery_none}
        WHEN NOT MATCHED THEN
            INSERT ({patch_cols_str})
            VALUES ({patch_cols_prefixed_str});
        """,
        "{identity_insert_off}",
    ],
    'oracle': """
        MERGE INTO {target_table_name} f
            USING (SELECT {patch_cols_str} FROM {patch_table_name}) p
            ON (
                {and_subquery_f}
                AND
                {date_bounds_subquery}
            )
        WHEN MATCHED THEN
            UPDATE
            {sets_subquery_none}
    """,
    'oracle-upsert': """
        MERGE INTO {target_table_name} f
            USING (SELECT {patch_cols_str} FROM {patch_table_name}) p
            ON (
                {and_subquery_f}
                AND
                {date_bounds_subquery}
            ){when_matched_update_sets_subquery_none}
        WHEN NOT MATCHED THEN
            INSERT ({patch_cols_str})
            VALUES ({patch_cols_prefixed_str})
    """,
    'sqlite-upsert': """
        INSERT INTO {target_table_name} ({patch_cols_str})
        SELECT {patch_cols_str}
        FROM {patch_table_name}
        WHERE true
        ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}
    """,
    'sqlite_delete_insert': [
        """
        DELETE FROM {target_table_name} AS f
        WHERE ROWID IN (
            SELECT t.ROWID
            FROM {target_table_name} AS t
            INNER JOIN (SELECT * FROM {patch_table_name}) AS p
                ON {and_subquery_t}
        );
        """,
        """
        INSERT INTO {target_table_name} AS f
        SELECT {patch_cols_str} FROM {patch_table_name} AS p
        """,
    ],
    'geopackage-upsert': """
        INSERT INTO {target_table_name} ({patch_cols_str})
        SELECT {patch_cols_str}
        FROM {patch_table_name}
        WHERE true
        ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}
    """,
}
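
### EXAMPLE: the templates above are filled in by `get_update_queries()` below
### via `str.format()`. A hypothetical rendering of the PostgreSQL upsert
### template (all argument values here are illustrative, not real tables):
#
# UPDATE_QUERIES['postgresql-upsert'].format(
#     target_table_name='"public"."target"',
#     patch_table_name='"public"."patch"',
#     patch_cols_str='"id", "value"',
#     join_cols_str='"id"',
#     update_or_nothing='UPDATE',
#     sets_subquery_none_excluded='SET "value" = EXCLUDED."value"',
# )
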
columns_types_queries = {
    'default': """
        SELECT
            table_catalog AS database,
            table_schema AS schema,
            table_name AS table,
            column_name AS column,
            data_type AS type,
            numeric_precision,
            numeric_scale
        FROM information_schema.columns
        WHERE table_name IN ('{table}', '{table_trunc}')
    """,
    'sqlite': """
        SELECT
            '' "database",
            '' "schema",
            m.name "table",
            p.name "column",
            p.type "type"
        FROM sqlite_master m
        LEFT OUTER JOIN pragma_table_info(m.name) p
            ON m.name <> p.name
        WHERE m.type = 'table'
            AND m.name IN ('{table}', '{table_trunc}')
    """,
    'geopackage': """
        SELECT
            '' "database",
            '' "schema",
            m.name "table",
            p.name "column",
            p.type "type"
        FROM sqlite_master m
        LEFT OUTER JOIN pragma_table_info(m.name) p
            ON m.name <> p.name
        WHERE m.type = 'table'
            AND m.name IN ('{table}', '{table_trunc}')
    """,
    'mssql': """
        SELECT
            TABLE_CATALOG AS [database],
            TABLE_SCHEMA AS [schema],
            TABLE_NAME AS [table],
            COLUMN_NAME AS [column],
            DATA_TYPE AS [type],
            NUMERIC_PRECISION AS [numeric_precision],
            NUMERIC_SCALE AS [numeric_scale]
        FROM {db_prefix}INFORMATION_SCHEMA.COLUMNS WITH (NOLOCK)
        WHERE TABLE_NAME IN (
            '{table}',
            '{table_trunc}'
        )
    """,
    'mysql': """
        SELECT
            TABLE_SCHEMA `database`,
            TABLE_SCHEMA `schema`,
            TABLE_NAME `table`,
            COLUMN_NAME `column`,
            DATA_TYPE `type`,
            NUMERIC_PRECISION `numeric_precision`,
            NUMERIC_SCALE `numeric_scale`
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME IN ('{table}', '{table_trunc}')
    """,
    'mariadb': """
        SELECT
            TABLE_SCHEMA `database`,
            TABLE_SCHEMA `schema`,
            TABLE_NAME `table`,
            COLUMN_NAME `column`,
            DATA_TYPE `type`,
            NUMERIC_PRECISION `numeric_precision`,
            NUMERIC_SCALE `numeric_scale`
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME IN ('{table}', '{table_trunc}')
    """,
    'oracle': """
        SELECT
            NULL AS "database",
            NULL AS "schema",
            TABLE_NAME AS "table",
            COLUMN_NAME AS "column",
            DATA_TYPE AS "type",
            DATA_PRECISION AS "numeric_precision",
            DATA_SCALE AS "numeric_scale"
        FROM all_tab_columns
        WHERE TABLE_NAME IN (
            '{table}',
            '{table_trunc}',
            '{table_lower}',
            '{table_lower_trunc}',
            '{table_upper}',
            '{table_upper_trunc}'
        )
    """,
}
hypertable_queries = {
    'timescaledb': 'SELECT hypertable_size(\'{table_name}\')',
    'timescaledb-ha': 'SELECT hypertable_size(\'{table_name}\')',
    'citus': 'SELECT citus_table_size(\'{table_name}\')',
}
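
### EXAMPLE: `get_table_cols_types()` below fills in these templates with the
### table name and its truncated variant (a hypothetical table name):
#
# columns_types_queries['default'].format(table='weather', table_trunc='weather')
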
columns_indices_queries = {
    'default': """
        SELECT
            current_database() AS "database",
            n.nspname AS "schema",
            t.relname AS "table",
            c.column_name AS "column",
            i.relname AS "index",
            CASE WHEN con.contype = 'p' THEN 'PRIMARY KEY' ELSE 'INDEX' END AS "index_type"
        FROM pg_class t
        INNER JOIN pg_index AS ix
            ON t.oid = ix.indrelid
        INNER JOIN pg_class AS i
            ON i.oid = ix.indexrelid
        INNER JOIN pg_namespace AS n
            ON n.oid = t.relnamespace
        INNER JOIN pg_attribute AS a
            ON a.attnum = ANY(ix.indkey)
            AND a.attrelid = t.oid
        INNER JOIN information_schema.columns AS c
            ON c.column_name = a.attname
            AND c.table_name = t.relname
            AND c.table_schema = n.nspname
        LEFT JOIN pg_constraint AS con
            ON con.conindid = i.oid
            AND con.contype = 'p'
        WHERE
            t.relname IN ('{table}', '{table_trunc}')
            AND n.nspname = '{schema}'
    """,
    'sqlite': """
        WITH indexed_columns AS (
            SELECT
                '{table}' AS table_name,
                pi.name AS column_name,
                i.name AS index_name,
                'INDEX' AS index_type
            FROM
                sqlite_master AS i,
                pragma_index_info(i.name) AS pi
            WHERE
                i.type = 'index'
                AND i.tbl_name = '{table}'
        ),
        primary_key_columns AS (
            SELECT
                '{table}' AS table_name,
                ti.name AS column_name,
                'PRIMARY_KEY' AS index_name,
                'PRIMARY KEY' AS index_type
            FROM
                pragma_table_info('{table}') AS ti
            WHERE
                ti.pk > 0
        )
        SELECT
            NULL AS "database",
            NULL AS "schema",
            "table_name" AS "table",
            "column_name" AS "column",
            "index_name" AS "index",
            "index_type"
        FROM indexed_columns
        UNION ALL
        SELECT
            NULL AS "database",
            NULL AS "schema",
            table_name AS "table",
            column_name AS "column",
            index_name AS "index",
            index_type
        FROM primary_key_columns
    """,
    'geopackage': """
        WITH indexed_columns AS (
            SELECT
                '{table}' AS table_name,
                pi.name AS column_name,
                i.name AS index_name,
                'INDEX' AS index_type
            FROM
                sqlite_master AS i,
                pragma_index_info(i.name) AS pi
            WHERE
                i.type = 'index'
                AND i.tbl_name = '{table}'
        ),
        primary_key_columns AS (
            SELECT
                '{table}' AS table_name,
                ti.name AS column_name,
                'PRIMARY_KEY' AS index_name,
                'PRIMARY KEY' AS index_type
            FROM
                pragma_table_info('{table}') AS ti
            WHERE
                ti.pk > 0
        )
        SELECT
            NULL AS "database",
            NULL AS "schema",
            "table_name" AS "table",
            "column_name" AS "column",
            "index_name" AS "index",
            "index_type"
        FROM indexed_columns
        UNION ALL
        SELECT
            NULL AS "database",
            NULL AS "schema",
            table_name AS "table",
            column_name AS "column",
            index_name AS "index",
            index_type
        FROM primary_key_columns
    """,
    'mssql': """
        SELECT
            NULL AS [database],
            s.name AS [schema],
            t.name AS [table],
            c.name AS [column],
            i.name AS [index],
            CASE
                WHEN kc.type = 'PK' THEN 'PRIMARY KEY'
                ELSE 'INDEX'
            END AS [index_type],
            CASE
                WHEN i.type = 1 THEN CAST(1 AS BIT)
                ELSE CAST(0 AS BIT)
            END AS [clustered]
        FROM
            sys.schemas s WITH (NOLOCK)
        INNER JOIN sys.tables t WITH (NOLOCK)
            ON s.schema_id = t.schema_id
        INNER JOIN sys.indexes i WITH (NOLOCK)
            ON t.object_id = i.object_id
        INNER JOIN sys.index_columns ic WITH (NOLOCK)
            ON i.object_id = ic.object_id
            AND i.index_id = ic.index_id
        INNER JOIN sys.columns c WITH (NOLOCK)
            ON ic.object_id = c.object_id
            AND ic.column_id = c.column_id
        LEFT JOIN sys.key_constraints kc WITH (NOLOCK)
            ON kc.parent_object_id = i.object_id
            AND kc.type = 'PK'
            AND kc.name = i.name
        WHERE
            t.name IN ('{table}', '{table_trunc}')
            AND s.name = '{schema}'
            AND i.type IN (1, 2)
    """,
    'oracle': """
        SELECT
            NULL AS "database",
            ic.table_owner AS "schema",
            ic.table_name AS "table",
            ic.column_name AS "column",
            i.index_name AS "index",
            CASE
                WHEN c.constraint_type = 'P' THEN 'PRIMARY KEY'
                WHEN i.uniqueness = 'UNIQUE' THEN 'UNIQUE INDEX'
                ELSE 'INDEX'
            END AS index_type
        FROM
            all_ind_columns ic
        INNER JOIN all_indexes i
            ON ic.index_name = i.index_name
            AND ic.table_owner = i.owner
        LEFT JOIN all_constraints c
            ON i.index_name = c.constraint_name
            AND i.table_owner = c.owner
            AND c.constraint_type = 'P'
        WHERE ic.table_name IN (
            '{table}',
            '{table_trunc}',
            '{table_upper}',
            '{table_upper_trunc}'
        )
    """,
    'mysql': """
        SELECT
            TABLE_SCHEMA AS `database`,
            TABLE_SCHEMA AS `schema`,
            TABLE_NAME AS `table`,
            COLUMN_NAME AS `column`,
            INDEX_NAME AS `index`,
            CASE
                WHEN NON_UNIQUE = 0 THEN 'PRIMARY KEY'
                ELSE 'INDEX'
            END AS `index_type`
        FROM
            information_schema.STATISTICS
        WHERE
            TABLE_NAME IN ('{table}', '{table_trunc}')
    """,
    'mariadb': """
        SELECT
            TABLE_SCHEMA AS `database`,
            TABLE_SCHEMA AS `schema`,
            TABLE_NAME AS `table`,
            COLUMN_NAME AS `column`,
            INDEX_NAME AS `index`,
            CASE
                WHEN NON_UNIQUE = 0 THEN 'PRIMARY KEY'
                ELSE 'INDEX'
            END AS `index_type`
        FROM
            information_schema.STATISTICS
        WHERE
            TABLE_NAME IN ('{table}', '{table_trunc}')
    """,
}
reset_autoincrement_queries: Dict[str, Union[str, List[str]]] = {
    'default': """
        SELECT SETVAL(pg_get_serial_sequence('{table_name}', '{column}'), {val})
        FROM {table_name}
    """,
    'mssql': """
        DBCC CHECKIDENT ('{table_name}', RESEED, {val})
    """,
    'mysql': """
        ALTER TABLE {table_name} AUTO_INCREMENT = {val}
    """,
    'mariadb': """
        ALTER TABLE {table_name} AUTO_INCREMENT = {val}
    """,
    'sqlite': """
        UPDATE sqlite_sequence
        SET seq = {val}
        WHERE name = '{table}'
    """,
    'geopackage': """
        UPDATE sqlite_sequence
        SET seq = {val}
        WHERE name = '{table}'
    """,
    'oracle': (
        "ALTER TABLE {table_name} MODIFY {column_name} "
        "GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH {val_plus_1})"
    ),
}
table_wrappers = {
    'default'       : ('"', '"'),
    'timescaledb'   : ('"', '"'),
    'timescaledb-ha': ('"', '"'),
    'citus'         : ('"', '"'),
    'duckdb'        : ('"', '"'),
    'postgresql'    : ('"', '"'),
    'postgis'       : ('"', '"'),
    'sqlite'        : ('"', '"'),
    'geopackage'    : ('"', '"'),
    'mysql'         : ('`', '`'),
    'mariadb'       : ('`', '`'),
    'mssql'         : ('[', ']'),
    'cockroachdb'   : ('"', '"'),
    'oracle'        : ('"', '"'),
}
max_name_lens = {
    'default'       : 64,
    'mssql'         : 128,
    'oracle'        : 30,
    'postgresql'    : 64,
    'postgis'       : 64,
    'timescaledb'   : 64,
    'timescaledb-ha': 64,
    'citus'         : 64,
    'cockroachdb'   : 64,
    'sqlite'        : 1024,
    'geopackage'    : 1024,
    'mysql'         : 64,
    'mariadb'       : 64,
}
json_flavors = {
    'postgresql',
    'postgis',
    'timescaledb',
    'timescaledb-ha',
    'citus',
    'cockroachdb',
}
NO_SCHEMA_FLAVORS = {
    'oracle',
    'sqlite',
    'geopackage',
    'mysql',
    'mariadb',
    'duckdb',
}
DEFAULT_SCHEMA_FLAVORS = {
    'postgresql': 'public',
    'postgis': 'public',
    'timescaledb': 'public',
    'timescaledb-ha': 'public',
    'citus': 'public',
    'cockroachdb': 'public',
    'mysql': 'mysql',
    'mariadb': 'mysql',
    'mssql': 'dbo',
}
OMIT_NULLSFIRST_FLAVORS = {
    'mariadb',
    'mysql',
    'mssql',
}

SINGLE_ALTER_TABLE_FLAVORS = {
    'duckdb',
    'sqlite',
    'geopackage',
    'mssql',
    'oracle',
}
NO_CTE_FLAVORS = {
    'mysql',
    'mariadb',
}
NO_SELECT_INTO_FLAVORS = {
    'sqlite',
    'geopackage',
    'oracle',
    'mysql',
    'mariadb',
    'duckdb',
}


def clean(substring: str) -> None:
    """
    Ensure a substring is clean enough to be inserted into a SQL query.
    Raises an exception when banned words are used.
    """
    from meerschaum.utils.warnings import error
    banned_symbols = [';', '--', 'drop ',]
    for symbol in banned_symbols:
        if symbol in str(substring).lower():
            error(f"Invalid string: '{substring}'")
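
### EXAMPLE: `clean()` returns silently for safe strings and raises via
### `error()` when a banned symbol appears (illustrative values):
#
# >>> clean('temperature')              # returns None; the string is safe
# >>> clean("1; DROP TABLE pipes")      # raises an exception
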

def dateadd_str(
    flavor: str = 'postgresql',
    datepart: str = 'day',
    number: Union[int, float] = 0,
    begin: Union[str, datetime, int] = 'now',
    db_type: Optional[str] = None,
) -> str:
    """
    Generate a `DATEADD` clause depending on database flavor.

    Parameters
    ----------
    flavor: str, default `'postgresql'`
        SQL database flavor, e.g. `'postgresql'`, `'sqlite'`.

        Currently supported flavors:

        - `'postgresql'`
        - `'postgis'`
        - `'timescaledb'`
        - `'timescaledb-ha'`
        - `'citus'`
        - `'cockroachdb'`
        - `'duckdb'`
        - `'mssql'`
        - `'mysql'`
        - `'mariadb'`
        - `'sqlite'`
        - `'geopackage'`
        - `'oracle'`

    datepart: str, default `'day'`
        Which part of the date to modify. Supported values:

        - `'year'`
        - `'month'`
        - `'day'`
        - `'hour'`
        - `'minute'`
        - `'second'`

    number: Union[int, float], default `0`
        How many units to add to the date part.

    begin: Union[str, datetime, int], default `'now'`
        Base datetime to which to add dateparts.

    db_type: Optional[str], default None
        If provided, cast the datetime string as the type.
        Otherwise, infer this from the input datetime value.

    Returns
    -------
    The appropriate `DATEADD` string for the corresponding database flavor.

    Examples
    --------
    >>> dateadd_str(
    ...     flavor='mssql',
    ...     begin=datetime(2022, 1, 1, 0, 0),
    ...     number=1,
    ... )
    "DATEADD(day, 1, CAST('2022-01-01 00:00:00' AS DATETIME2))"
    >>> dateadd_str(
    ...     flavor='postgresql',
    ...     begin=datetime(2022, 1, 1, 0, 0),
    ...     number=1,
    ... )
    "CAST('2022-01-01 00:00:00' AS TIMESTAMP) + INTERVAL '1 day'"

    """
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type, get_pd_type_from_db_type
    dateutil_parser = attempt_import('dateutil.parser')
    if 'int' in str(type(begin)).lower():
        num_str = str(begin)
        if number is not None and number != 0:
            num_str += (
                f' + {number}'
                if number > 0
                else f" - {number * -1}"
            )
        return num_str
    if not begin:
        return ''

    _original_begin = begin
    begin_time = None
    ### Sanity check: make sure `begin` is a valid datetime before we inject anything.
    if not isinstance(begin, datetime):
        try:
            begin_time = dateutil_parser.parse(begin)
        except Exception:
            begin_time = None
    else:
        begin_time = begin

    ### Unable to parse into a datetime.
    if begin_time is None:
        ### Throw an error if banned symbols are included in the `begin` string.
        clean(str(begin))
    ### If begin is a valid datetime, wrap it in quotes.
    else:
        if isinstance(begin, datetime) and begin.tzinfo is not None:
            begin = begin.astimezone(timezone.utc)
        begin = (
            f"'{begin.replace(tzinfo=None)}'"
            if isinstance(begin, datetime) and flavor in TIMEZONE_NAIVE_FLAVORS
            else f"'{begin}'"
        )

    dt_is_utc = (
        begin_time.tzinfo is not None
        if begin_time is not None
        else ('+' in str(begin) or '-' in str(begin).split(':', maxsplit=1)[-1])
    )
    if db_type:
        db_type_is_utc = 'utc' in get_pd_type_from_db_type(db_type).lower()
        dt_is_utc = dt_is_utc or db_type_is_utc
    db_type = db_type or get_db_type_from_pd_type(
        ('datetime64[ns, UTC]' if dt_is_utc else 'datetime64[ns]'),
        flavor=flavor,
    )

    da = ""
    if flavor in (
        'postgresql',
        'postgis',
        'timescaledb',
        'timescaledb-ha',
        'cockroachdb',
        'citus',
    ):
        begin = (
            f"CAST({begin} AS {db_type})" if begin != 'now'
            else f"CAST(NOW() AT TIME ZONE 'utc' AS {db_type})"
        )
        if dt_is_utc:
            begin += " AT TIME ZONE 'UTC'"
        da = begin + (f" + INTERVAL '{number} {datepart}'" if number != 0 else '')

    elif flavor == 'duckdb':
        begin = f"CAST({begin} AS {db_type})" if begin != 'now' else 'NOW()'
        if dt_is_utc:
            begin += " AT TIME ZONE 'UTC'"
        da = begin + (f" + INTERVAL '{number} {datepart}'" if number != 0 else '')

    elif flavor in ('mssql',):
        if begin_time and begin_time.microsecond != 0 and not dt_is_utc:
            begin = begin[:-4] + "'"
        begin = f"CAST({begin} AS {db_type})" if begin != 'now' else 'GETUTCDATE()'
        if dt_is_utc:
            begin += " AT TIME ZONE 'UTC'"
        da = f"DATEADD({datepart}, {number}, {begin})" if number != 0 else begin

    elif flavor in ('mysql', 'mariadb'):
        begin = (
            f"CAST({begin} AS DATETIME(6))"
            if begin != 'now'
            else 'UTC_TIMESTAMP(6)'
        )
        da = (f"DATE_ADD({begin}, INTERVAL {number} {datepart})" if number != 0 else begin)

    elif flavor in ('sqlite', 'geopackage'):
        da = f"datetime({begin}, '{number} {datepart}')"

    elif flavor == 'oracle':
        if begin == 'now':
            begin = str(
                datetime.now(timezone.utc).replace(tzinfo=None).strftime(r'%Y-%m-%d %H:%M:%S.%f')
            )
        elif begin_time:
            begin = str(begin_time.strftime(r'%Y-%m-%d %H:%M:%S.%f'))
        dt_format = 'YYYY-MM-DD HH24:MI:SS.FF'
        _begin = f"'{begin}'" if begin_time else begin
        da = (
            (f"TO_TIMESTAMP({_begin}, '{dt_format}')" if begin_time else _begin)
            + (f" + INTERVAL '{number}' {datepart}" if number != 0 else "")
        )
    return da
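
### EXAMPLE: a further illustrative call (output shown approximately; the
### exact cast depends on the flavor's timezone handling):
#
# >>> dateadd_str(flavor='mysql', datepart='hour', number=3, begin=datetime(2024, 1, 1))
# "DATE_ADD(CAST('2024-01-01 00:00:00' AS DATETIME(6)), INTERVAL 3 hour)"
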

def test_connection(
    self,
    **kw: Any
) -> Union[bool, None]:
    """
    Test if a successful connection to the database may be made.

    Parameters
    ----------
    **kw:
        The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Returns
    -------
    `True` if a connection is made, otherwise `False` or `None` in case of failure.

    """
    import warnings
    from meerschaum.connectors.poll import retry_connect
    _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self}
    _default_kw.update(kw)
    with warnings.catch_warnings():
        warnings.filterwarnings('ignore', 'Could not')
        try:
            return retry_connect(**_default_kw)
        except Exception:
            return False


def get_distinct_col_count(
    col: str,
    query: str,
    connector: Optional[mrsm.connectors.sql.SQLConnector] = None,
    debug: bool = False
) -> Optional[int]:
    """
    Returns the number of distinct items in a column of a SQL query.

    Parameters
    ----------
    col: str:
        The column in the query to count.

    query: str:
        The SQL query to count from.

    connector: Optional[mrsm.connectors.sql.SQLConnector], default None:
        The SQLConnector to execute the query.

    debug: bool, default False:
        Verbosity toggle.

    Returns
    -------
    An `int` of the number of distinct values in the column, or `None` if the query fails.

    """
    if connector is None:
        connector = mrsm.get_connector('sql')

    _col_name = sql_item_name(col, connector.flavor, None)

    _meta_query = (
        f"""
        WITH src AS ( {query} ),
        dist AS ( SELECT DISTINCT {_col_name} FROM src )
        SELECT COUNT(*) FROM dist"""
    ) if connector.flavor not in ('mysql', 'mariadb') else (
        f"""
        SELECT COUNT(*)
        FROM (
            SELECT DISTINCT {_col_name}
            FROM ({query}) AS src
        ) AS dist"""
    )

    result = connector.value(_meta_query, debug=debug)
    try:
        return int(result)
    except Exception:
        return None
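
### EXAMPLE: count distinct stations in a query (a hypothetical connector and
### table; the return value is illustrative):
#
# >>> conn = mrsm.get_connector('sql:main')
# >>> get_distinct_col_count('station', 'SELECT station FROM weather', connector=conn)
# 12
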

def sql_item_name(item: str, flavor: str, schema: Optional[str] = None) -> str:
    """
    Parse SQL items depending on the flavor.

    Parameters
    ----------
    item: str
        The database item (table, view, etc.) in need of quotes.

    flavor: str
        The database flavor (`'postgresql'`, `'mssql'`, `'sqlite'`, etc.).

    schema: Optional[str], default None
        If provided, prefix the table name with the schema.

    Returns
    -------
    A `str` which contains the input `item` wrapped in the corresponding escape characters.

    Examples
    --------
    >>> sql_item_name('table', 'sqlite')
    '"table"'
    >>> sql_item_name('table', 'mssql')
    "[table]"
    >>> sql_item_name('table', 'postgresql', schema='abc')
    '"abc"."table"'

    """
    truncated_item = truncate_item_name(str(item), flavor)
    if flavor == 'oracle':
        truncated_item = pg_capital(truncated_item, quote_capitals=True)
        ### NOTE: System-reserved words must be quoted.
        if truncated_item.lower() in (
            'float', 'varchar', 'nvarchar', 'clob',
            'boolean', 'integer', 'table', 'row', 'date',
        ):
            wrappers = ('"', '"')
        else:
            wrappers = ('', '')
    else:
        wrappers = table_wrappers.get(flavor, table_wrappers['default'])

    ### NOTE: SQLite does not support schemas.
    if flavor in ('sqlite', 'geopackage'):
        schema = None
    elif flavor == 'mssql' and str(item).startswith('#'):
        schema = None

    schema_prefix = (
        (wrappers[0] + schema + wrappers[1] + '.')
        if schema is not None
        else ''
    )

    return schema_prefix + wrappers[0] + truncated_item + wrappers[1]


def pg_capital(s: str, quote_capitals: bool = True) -> str:
    """
    If string contains a capital letter, wrap it in double quotes.

    Parameters
    ----------
    s: str
        The string to be escaped.

    quote_capitals: bool, default True
        If `False`, do not quote strings which contain only a mix of capital and lower-case letters.

    Returns
    -------
    The input string wrapped in quotes only if it needs them.

    Examples
    --------
    >>> pg_capital("My Table")
    '"My Table"'
    >>> pg_capital('my_table')
    'my_table'

    """
    if s.startswith('"') and s.endswith('"'):
        return s

    s = s.replace('"', '')

    needs_quotes = s.startswith('_')
    if not needs_quotes:
        for c in s:
            if c == '_':
                continue

            if not c.isalnum() or (quote_capitals and c.isupper()):
                needs_quotes = True
                break

    if needs_quotes:
        return '"' + s + '"'

    return s


def oracle_capital(s: str) -> str:
    """
    Capitalize the string of an item on an Oracle database.
    """
    return s


def truncate_item_name(item: str, flavor: str) -> str:
    """
    Truncate item names to stay within the database flavor's character limit.

    Parameters
    ----------
    item: str
        The database item being referenced. This string is the "canonical" name internally.

    flavor: str
        The flavor of the database on which `item` resides.

    Returns
    -------
    The truncated string.
    """
    from meerschaum.utils.misc import truncate_string_sections
    return truncate_string_sections(
        item, max_len=max_name_lens.get(flavor, max_name_lens['default'])
    )
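
### EXAMPLE: Oracle limits identifiers to 30 characters (see `max_name_lens`),
### so long canonical names are compacted (the exact output depends on
### `truncate_string_sections()`; the name below is hypothetical):
#
# >>> len(truncate_item_name('a_very_long_table_name_exceeding_oracle_limits', 'oracle')) <= 30
# True
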

def build_where(
    params: Dict[str, Any],
    connector: Optional[mrsm.connectors.sql.SQLConnector] = None,
    with_where: bool = True,
    flavor: str = 'postgresql',
) -> str:
    """
    Build the `WHERE` clause based on the input criteria.

    Parameters
    ----------
    params: Dict[str, Any]:
        The keywords dictionary to convert into a WHERE clause.
        If a value is a string which begins with an underscore, negate that value
        (e.g. `!=` instead of `=` or `NOT IN` instead of `IN`).
        A value of `_None` will be interpreted as `IS NOT NULL`.

    connector: Optional[meerschaum.connectors.sql.SQLConnector], default None:
        The Meerschaum SQLConnector that will be executing the query.
        The connector is used to extract the SQL dialect.

    with_where: bool, default True:
        If `True`, include the leading `'WHERE'` string.

    flavor: str, default 'postgresql'
        If `connector` is `None`, fall back to this flavor.

    Returns
    -------
    A `str` of the `WHERE` clause from the input `params` dictionary for the connector's flavor.

    Examples
    --------
    ```
    >>> print(build_where({'foo': [1, 2, 3]}))

    WHERE
        "foo" IN ('1', '2', '3')
    ```
    """
    import json
    from meerschaum._internal.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.dtypes import value_is_null, none_if_null
    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
    try:
        params_json = json.dumps(params)
    except Exception:
        params_json = str(params)
    bad_words = ['drop ', '--', ';']
    for word in bad_words:
        if word in params_json.lower():
            warn("Aborting build_where() due to possible SQL injection.")
            return ''

    query_flavor = getattr(connector, 'flavor', flavor) if connector is not None else flavor
    where = ""
    leading_and = "\n    AND "
    for key, value in params.items():
        _key = sql_item_name(key, query_flavor, None)
        ### search across a list (i.e. IN syntax)
        if isinstance(value, Iterable) and not isinstance(value, (dict, str)):
            includes = [
                none_if_null(item)
                for item in value
                if not str(item).startswith(negation_prefix)
            ]
            null_includes = [item for item in includes if item is None]
            not_null_includes = [item for item in includes if item is not None]
            excludes = [
                none_if_null(str(item)[len(negation_prefix):])
                for item in value
                if str(item).startswith(negation_prefix)
            ]
            null_excludes = [item for item in excludes if item is None]
            not_null_excludes = [item for item in excludes if item is not None]

            if includes:
                where += f"{leading_and}("
            if not_null_includes:
                where += f"{_key} IN ("
                for item in not_null_includes:
                    quoted_item = str(item).replace("'", "''")
                    where += f"'{quoted_item}', "
                where = where[:-2] + ")"
            if null_includes:
                where += ("\n    OR " if not_null_includes else "") + f"{_key} IS NULL"
            if includes:
                where += ")"

            if excludes:
                where += f"{leading_and}("
            if not_null_excludes:
                where += f"{_key} NOT IN ("
                for item in not_null_excludes:
                    quoted_item = str(item).replace("'", "''")
                    where += f"'{quoted_item}', "
                where = where[:-2] + ")"
            if null_excludes:
                where += ("\n    AND " if not_null_excludes else "") + f"{_key} IS NOT NULL"
            if excludes:
                where += ")"

            continue

        ### search a dictionary
        elif isinstance(value, dict):
            where += (f"{leading_and}CAST({_key} AS TEXT) = '" + json.dumps(value) + "'")
            continue

        eq_sign = '='
        is_null = 'IS NULL'
        if value_is_null(str(value).lstrip(negation_prefix)):
            value = (
                (negation_prefix + 'None')
                if str(value).startswith(negation_prefix)
                else None
            )
        if str(value).startswith(negation_prefix):
            value = str(value)[len(negation_prefix):]
            eq_sign = '!='
            if value_is_null(value):
                value = None
                is_null = 'IS NOT NULL'
        quoted_value = str(value).replace("'", "''")
        where += (
            f"{leading_and}{_key} "
            + (is_null if value is None else f"{eq_sign} '{quoted_value}'")
        )

    if len(where) > 1:
        where = ("\nWHERE\n    " if with_where else '') + where[len(leading_and):]
    return where
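
### EXAMPLE: values prefixed with an underscore are negated (illustrative):
#
# >>> print(build_where({'status': '_deleted'}, flavor='postgresql'))
#
# WHERE
#     "status" != 'deleted'
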

def table_exists(
    table: str,
    connector: mrsm.connectors.sql.SQLConnector,
    schema: Optional[str] = None,
    debug: bool = False,
) -> bool:
    """Check if a table exists.

    Parameters
    ----------
    table: str:
        The name of the table in question.

    connector: mrsm.connectors.sql.SQLConnector
        The connector to the database which holds the table.

    schema: Optional[str], default None
        Optionally specify the table schema.
        Defaults to `connector.schema`.

    debug: bool, default False:
        Verbosity toggle.

    Returns
    -------
    A `bool` indicating whether or not the table exists on the database.
    """
    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
    schema = schema or connector.schema
    insp = sqlalchemy.inspect(connector.engine)
    truncated_table_name = truncate_item_name(str(table), connector.flavor)
    return insp.has_table(truncated_table_name, schema=schema)
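
### EXAMPLE (a hypothetical connector and table):
#
# >>> conn = mrsm.get_connector('sql:main')
# >>> table_exists('weather', conn)
# True
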

def get_sqlalchemy_table(
    table: str,
    connector: Optional[mrsm.connectors.sql.SQLConnector] = None,
    schema: Optional[str] = None,
    refresh: bool = False,
    debug: bool = False,
) -> Union['sqlalchemy.Table', None]:
    """
    Construct a SQLAlchemy table from its name.

    Parameters
    ----------
    table: str
        The name of the table on the database. Does not need to be escaped.

    connector: Optional[meerschaum.connectors.sql.SQLConnector], default None:
        The connector to the database which holds the table.

    schema: Optional[str], default None
        Specify on which schema the table resides.
        Defaults to the schema set in `connector`.

    refresh: bool, default False
        If `True`, rebuild the cached table object.

    debug: bool, default False:
        Verbosity toggle.

    Returns
    -------
    A `sqlalchemy.Table` object for the table.

    """
    if connector is None:
        from meerschaum import get_connector
        connector = get_connector('sql')

    if connector.flavor == 'duckdb':
        return None

    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import warn
    if refresh:
        connector.metadata.clear()
    tables = get_tables(mrsm_instance=connector, debug=debug, create=False)
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    truncated_table_name = truncate_item_name(str(table), connector.flavor)
    table_kwargs = {
        'autoload_with': connector.engine,
    }
    if schema:
        table_kwargs['schema'] = schema

    if refresh or truncated_table_name not in tables:
        try:
            tables[truncated_table_name] = sqlalchemy.Table(
                truncated_table_name,
                connector.metadata,
                **table_kwargs
            )
        except sqlalchemy.exc.NoSuchTableError:
            warn(f"Table '{truncated_table_name}' does not exist in '{connector}'.")
            return None
    return tables[truncated_table_name]
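
### EXAMPLE: inspect the reflected columns (hypothetical table and columns):
#
# >>> tbl = get_sqlalchemy_table('weather', connector=conn)
# >>> [col.name for col in tbl.columns]
# ['timestamp', 'station', 'temperature']
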

def get_table_cols_types(
    table: str,
    connectable: Union[
        'mrsm.connectors.sql.SQLConnector',
        'sqlalchemy.orm.session.Session',
        'sqlalchemy.engine.base.Engine'
    ],
    flavor: Optional[str] = None,
    schema: Optional[str] = None,
    database: Optional[str] = None,
    debug: bool = False,
) -> Dict[str, str]:
    """
    Return a dictionary mapping a table's columns to data types.
    This is useful for inspecting tables created during a not-yet-committed session.

    NOTE: This may return incorrect columns if the schema is not explicitly stated.
    Use this function if you are confident the table name is unique or if you have
    an explicit schema.
    To use the configured schema, get the columns from `get_sqlalchemy_table()` instead.

    Parameters
    ----------
    table: str
        The name of the table (unquoted).

    connectable: Union[
        'mrsm.connectors.sql.SQLConnector',
        'sqlalchemy.orm.session.Session',
        'sqlalchemy.engine.base.Engine'
    ]
        The connection object used to fetch the columns and types.

    flavor: Optional[str], default None
        The database dialect flavor to use for the query.
        If omitted, default to `connectable.flavor`.

    schema: Optional[str], default None
        If provided, restrict the query to this schema.

    database: Optional[str], default None
        If provided, restrict the query to this database.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping column names to data types.
    """
    import textwrap
    from meerschaum.connectors import SQLConnector
    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
    flavor = flavor or getattr(connectable, 'flavor', None)
    if not flavor:
        raise ValueError("Please provide a database flavor.")
    if flavor == 'duckdb' and not isinstance(connectable, SQLConnector):
        raise ValueError("You must provide a SQLConnector when using DuckDB.")
    if flavor in NO_SCHEMA_FLAVORS:
        schema = None
    if schema is None:
        schema = DEFAULT_SCHEMA_FLAVORS.get(flavor, None)
    if flavor in ('sqlite', 'duckdb', 'oracle', 'geopackage'):
        database = None
    table_trunc = truncate_item_name(table, flavor=flavor)
    table_lower = table.lower()
    table_upper = table.upper()
    table_lower_trunc = truncate_item_name(table_lower, flavor=flavor)
    table_upper_trunc = truncate_item_name(table_upper, flavor=flavor)
    db_prefix = (
        "tempdb."
        if flavor == 'mssql' and table.startswith('#')
        else ""
    )

    cols_types_query = sqlalchemy.text(
        textwrap.dedent(columns_types_queries.get(
            flavor,
            columns_types_queries['default']
        ).format(
            table=table,
            table_trunc=table_trunc,
            table_lower=table_lower,
            table_lower_trunc=table_lower_trunc,
            table_upper=table_upper,
            table_upper_trunc=table_upper_trunc,
            db_prefix=db_prefix,
        )).lstrip().rstrip()
    )

    cols = ['database', 'schema', 'table', 'column', 'type', 'numeric_precision', 'numeric_scale']
    result_cols_ix = dict(enumerate(cols))

    debug_kwargs = {'debug': debug} if isinstance(connectable, SQLConnector) else {}
    if not debug_kwargs and debug:
        dprint(cols_types_query)

    try:
        result_rows = (
            [
                row
                for row in connectable.execute(cols_types_query, **debug_kwargs).fetchall()
            ]
            if flavor != 'duckdb'
            else [
                tuple([doc[col] for col in cols])
                for doc in connectable.read(cols_types_query, debug=debug).to_dict(orient='records')
            ]
        )
        cols_types_docs = [
            {
                result_cols_ix[i]: val
                for i, val in enumerate(row)
            }
            for row in result_rows
        ]
        cols_types_docs_filtered = [
            doc
            for doc in cols_types_docs
            if (
                (
                    not schema
                    or doc['schema'] == schema
                )
                and
                (
                    not database
                    or doc['database'] == database
                )
            )
        ]

        ### NOTE: This may return incorrect columns if the schema is not explicitly stated.
        if cols_types_docs and not cols_types_docs_filtered:
            cols_types_docs_filtered = cols_types_docs

        ### NOTE: Check for PostGIS GEOMETRY columns.
        geometry_cols_types = {}
        user_defined_cols = [
            doc
            for doc in cols_types_docs_filtered
            if str(doc.get('type', None)).upper() == 'USER-DEFINED'
        ]
        if user_defined_cols:
            geometry_cols_types.update(
                get_postgis_geo_columns_types(
                    connectable,
                    table,
                    schema=schema,
                    debug=debug,
                )
            )

        cols_types = {
            (
                doc['column']
                if flavor != 'oracle' else (
                    doc['column'].lower()
                    if (doc['column'].isupper() and doc['column'].replace('_', '').isalpha())
                    else doc['column']
                )
            ): doc['type'].upper() + (
                f'({precision},{scale})'
                if (
                    (precision := doc.get('numeric_precision', None))
                    and
                    (scale := doc.get('numeric_scale', None))
                )
                else ''
            )
            for doc in cols_types_docs_filtered
        }
        cols_types.update(geometry_cols_types)
        return cols_types
    except Exception as e:
        warn(f"Failed to fetch columns for table '{table}':\n{e}")
        return {}
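
### EXAMPLE: illustrative return shape (hypothetical table; types vary by flavor):
#
# >>> get_table_cols_types('weather', conn)
# {'timestamp': 'TIMESTAMP', 'station': 'TEXT', 'temperature': 'DOUBLE PRECISION'}
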

def get_table_cols_indices(
    table: str,
    connectable: Union[
        'mrsm.connectors.sql.SQLConnector',
        'sqlalchemy.orm.session.Session',
        'sqlalchemy.engine.base.Engine'
    ],
    flavor: Optional[str] = None,
    schema: Optional[str] = None,
    database: Optional[str] = None,
    debug: bool = False,
) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping a table's columns to lists of indices.
    This is useful for inspecting tables created during a not-yet-committed session.

    NOTE: This may return incorrect columns if the schema is not explicitly stated.
    Use this function if you are confident the table name is unique or if you have
    an explicit schema.
    To use the configured schema, get the columns from `get_sqlalchemy_table()` instead.

    Parameters
    ----------
    table: str
        The name of the table (unquoted).

    connectable: Union[
        'mrsm.connectors.sql.SQLConnector',
        'sqlalchemy.orm.session.Session',
        'sqlalchemy.engine.base.Engine'
    ]
        The connection object used to fetch the columns and types.

    flavor: Optional[str], default None
        The database dialect flavor to use for the query.
        If omitted, default to `connectable.flavor`.

    schema: Optional[str], default None
        If provided, restrict the query to this schema.

    database: Optional[str], default None
        If provided, restrict the query to this database.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping column names to a list of indices.
    """
    import textwrap
    from collections import defaultdict
    from meerschaum.connectors import SQLConnector
    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
    flavor = flavor or getattr(connectable, 'flavor', None)
    if not flavor:
        raise ValueError("Please provide a database flavor.")
    if flavor == 'duckdb' and not isinstance(connectable, SQLConnector):
        raise ValueError("You must provide a SQLConnector when using DuckDB.")
    if flavor in NO_SCHEMA_FLAVORS:
        schema = None
    if schema is None:
        schema = DEFAULT_SCHEMA_FLAVORS.get(flavor, None)
    if flavor in ('sqlite', 'duckdb', 'oracle', 'geopackage'):
        database = None
    table_trunc = truncate_item_name(table, flavor=flavor)
    table_lower = table.lower()
    table_upper = table.upper()
    table_lower_trunc = truncate_item_name(table_lower, flavor=flavor)
    table_upper_trunc = truncate_item_name(table_upper, flavor=flavor)
    db_prefix = (
        "tempdb."
        if flavor == 'mssql' and table.startswith('#')
        else ""
    )

    cols_indices_query = sqlalchemy.text(
        textwrap.dedent(columns_indices_queries.get(
            flavor,
            columns_indices_queries['default']
        ).format(
            table=table,
            table_trunc=table_trunc,
            table_lower=table_lower,
            table_lower_trunc=table_lower_trunc,
            table_upper=table_upper,
            table_upper_trunc=table_upper_trunc,
            db_prefix=db_prefix,
            schema=schema,
        )).lstrip().rstrip()
    )

    cols = ['database', 'schema', 'table', 'column', 'index', 'index_type']
    if flavor == 'mssql':
        cols.append('clustered')
    result_cols_ix = dict(enumerate(cols))

    debug_kwargs = {'debug': debug} if isinstance(connectable, SQLConnector) else {}
    if not debug_kwargs and debug:
        dprint(cols_indices_query)

    try:
        result_rows = (
            [
                row
                for row in connectable.execute(cols_indices_query, **debug_kwargs).fetchall()
            ]
            if flavor != 'duckdb'
            else [
                tuple([doc[col] for col in cols])
                for doc in connectable.read(cols_indices_query, debug=debug).to_dict(orient='records')
            ]
        )
        cols_types_docs = [
            {
                result_cols_ix[i]: val
                for i, val in enumerate(row)
            }
            for row in result_rows
        ]
        cols_types_docs_filtered = [
            doc
            for doc in cols_types_docs
            if (
                (
                    not schema
                    or doc['schema'] == schema
                )
                and
                (
                    not database
                    or doc['database'] == database
                )
            )
        ]
        ### NOTE: This may return incorrect columns if the schema is not explicitly stated.
        if cols_types_docs and not cols_types_docs_filtered:
            cols_types_docs_filtered = cols_types_docs

        cols_indices = defaultdict(lambda: [])
        for doc in cols_types_docs_filtered:
            col = (
                doc['column']
                if flavor != 'oracle'
                else (
                    doc['column'].lower()
                    if (doc['column'].isupper() and doc['column'].replace('_', '').isalpha())
                    else doc['column']
                )
            )
            index_doc = {
                'name': doc.get('index', None),
                'type': doc.get('index_type', None)
            }
            if flavor == 'mssql':
                index_doc['clustered'] = doc.get('clustered', None)
            cols_indices[col].append(index_doc)

        return dict(cols_indices)
    except Exception as e:
        warn(f"Failed to fetch columns for table '{table}':\n{e}")
        return {}
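
### EXAMPLE: illustrative return shape (the index name is hypothetical):
#
# >>> get_table_cols_indices('weather', conn)
# {'timestamp': [{'name': 'IX_weather_timestamp', 'type': 'INDEX'}]}
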

def get_update_queries(
    target: str,
    patch: str,
    connectable: Union[
        mrsm.connectors.sql.SQLConnector,
        'sqlalchemy.orm.session.Session'
    ],
    join_cols: Iterable[str],
    flavor: Optional[str] = None,
    upsert: bool = False,
    datetime_col: Optional[str] = None,
    schema: Optional[str] = None,
    patch_schema: Optional[str] = None,
    target_cols_types: Optional[Dict[str, str]] = None,
    patch_cols_types: Optional[Dict[str, str]] = None,
    identity_insert: bool = False,
    null_indices: bool = True,
    cast_columns: bool = True,
    debug: bool = False,
) -> List[str]:
    """
    Build a list of `MERGE`, `UPDATE`, `DELETE`/`INSERT` queries to apply a patch to target table.

    Parameters
    ----------
    target: str
        The name of the target table.

    patch: str
        The name of the patch table. This should have the same shape as the target.

    connectable: Union[meerschaum.connectors.sql.SQLConnector, sqlalchemy.orm.session.Session]
        The `SQLConnector` or SQLAlchemy session which will later execute the queries.

    join_cols: List[str]
        The columns to use to join the patch to the target.

    flavor: Optional[str], default None
        If using a SQLAlchemy session, provide the expected database flavor.

    upsert: bool, default False
        If `True`, return an upsert query rather than an update.

    datetime_col: Optional[str], default None
        If provided, bound the join query using this column as the datetime index.
        This must be present on both tables.

    schema: Optional[str], default None
        If provided, use this schema when quoting the target table.
        Defaults to `connector.schema`.

    patch_schema: Optional[str], default None
        If provided, use this schema when quoting the patch table.
        Defaults to `schema`.

    target_cols_types: Optional[Dict[str, str]], default None
        If provided, use these as the columns-types dictionary for the target table.
        Default will infer from the database context.

    patch_cols_types: Optional[Dict[str, str]], default None
        If provided, use these as the columns-types dictionary for the patch table.
        Default will infer from the database context.

    identity_insert: bool, default False
        If `True`, include `SET IDENTITY_INSERT` queries before and after the update queries.
        Only applies for MSSQL upserts.

    null_indices: bool, default True
        If `False`, do not coalesce index columns before joining.

    cast_columns: bool, default True
        If `False`, do not cast update columns to the target table types.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of query strings to perform the update operation.
    """
    import textwrap
    from meerschaum.connectors import SQLConnector
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.dtypes import are_dtypes_equal
    from meerschaum.utils.dtypes.sql import DB_FLAVORS_CAST_DTYPES, get_pd_type_from_db_type
    flavor = flavor or getattr(connectable, 'flavor', None)
    if not flavor:
        raise ValueError("Provide a flavor if using a SQLAlchemy session.")
    if (
        flavor in ('sqlite', 'geopackage')
        and isinstance(connectable, SQLConnector)
        and connectable.db_version < '3.33.0'
    ):
        flavor = 'sqlite_delete_insert'
    flavor_key = (f'{flavor}-upsert' if upsert else flavor)
    base_queries = UPDATE_QUERIES.get(
        flavor_key,
        UPDATE_QUERIES['default']
    )
    if not isinstance(base_queries, list):
        base_queries = [base_queries]
    schema = schema or (connectable.schema if isinstance(connectable, SQLConnector) else None)
    patch_schema = patch_schema or schema
    target_table_columns = get_table_cols_types(
        target,
        connectable,
        flavor=flavor,
        schema=schema,
        debug=debug,
    ) if not target_cols_types else target_cols_types
    patch_table_columns = get_table_cols_types(
        patch,
        connectable,
        flavor=flavor,
        schema=patch_schema,
        debug=debug,
    ) if not patch_cols_types else patch_cols_types

    patch_cols_str = ', '.join(
        [
            sql_item_name(col, flavor)
            for col in patch_table_columns
        ]
    )
    patch_cols_prefixed_str = ', '.join(
        [
            'p.' + sql_item_name(col, flavor)
            for col in patch_table_columns
        ]
    )

    join_cols_str = ', '.join(
        [
            sql_item_name(col, flavor)
            for col in join_cols
        ]
    )

    value_cols = []
    join_cols_types = []
    if debug:
        dprint("target_table_columns:")
        mrsm.pprint(target_table_columns)
    for c_name, c_type in target_table_columns.items():
        if c_name not in patch_table_columns:
            continue
        if flavor in DB_FLAVORS_CAST_DTYPES:
            c_type = DB_FLAVORS_CAST_DTYPES[flavor].get(c_type.upper(), c_type)
        (
            join_cols_types
            if c_name in join_cols
            else value_cols
        ).append((c_name, c_type))
    if debug:
        dprint(f"value_cols: {value_cols}")

    if not join_cols_types:
        return []
    if not value_cols and not upsert:
        return []

    coalesce_join_cols_str = ', '.join(
        [
            (
                (
                    'COALESCE('
                    + sql_item_name(c_name, flavor)
                    + ', '
                    + get_null_replacement(c_type, flavor)
                    + ')'
                )
                if null_indices
                else sql_item_name(c_name, flavor)
            )
            for c_name, c_type in join_cols_types
        ]
    )

    update_or_nothing = ('UPDATE' if value_cols else 'NOTHING')

    def sets_subquery(l_prefix: str, r_prefix: str):
        if not value_cols:
            return ''

        utc_value_cols = {
            c_name
            for c_name, c_type in value_cols
            if ('utc' in get_pd_type_from_db_type(c_type).lower())
        } if flavor not in TIMEZONE_NAIVE_FLAVORS else set()
        cast_func_cols = {
            c_name: (
                ('', '', '')
                if not cast_columns or (
                    flavor == 'oracle'
                    and are_dtypes_equal(get_pd_type_from_db_type(c_type), 'bytes')
                )
                else (
                    ('CAST(', f" AS {c_type.replace('_', ' ')}", ')' + (
                        " AT TIME ZONE 'UTC'"
                        if c_name in utc_value_cols
                        else ''
                    ))
                    if flavor not in ('sqlite', 'geopackage')
                    else ('', '', '')
                )
            )
            for c_name, c_type in value_cols
        }
        return 'SET ' + ',\n'.join([
            (
                l_prefix + sql_item_name(c_name, flavor, None)
                + ' = '
                + cast_func_cols[c_name][0]
                + r_prefix + sql_item_name(c_name, flavor, None)
                + cast_func_cols[c_name][1]
                + cast_func_cols[c_name][2]
            ) for c_name, c_type in value_cols
        ])

    def and_subquery(l_prefix: str, r_prefix: str):
        return '\n    AND\n    '.join([
            (
                (
                    "COALESCE("
                    + l_prefix
                    + sql_item_name(c_name, flavor, None)
                    + ", "
                    + get_null_replacement(c_type, flavor)
                    + ")"
                    + '\n    =\n    '
                    + "COALESCE("
                    + r_prefix
                    + sql_item_name(c_name, flavor, None)
                    + ", "
                    + get_null_replacement(c_type, flavor)
                    + ")"
                )
                if null_indices
                else (
                    l_prefix
                    + sql_item_name(c_name, flavor, None)
                    + ' = '
                    + r_prefix
                    + sql_item_name(c_name, flavor, None)
                )
            ) for c_name, c_type in join_cols_types
        ])

    skip_query_val = ""
    target_table_name = sql_item_name(target, flavor, schema)
    patch_table_name = sql_item_name(patch, flavor, patch_schema)
    dt_col_name = sql_item_name(datetime_col, flavor, None) if datetime_col else None
    date_bounds_table = patch_table_name if flavor != 'mssql' else '[date_bounds]'
    min_dt_col_name = f"MIN({dt_col_name})" if flavor != 'mssql' else '[Min_dt]'
    max_dt_col_name = f"MAX({dt_col_name})" if flavor != 'mssql' else '[Max_dt]'
    date_bounds_subquery = (
        f"""f.{dt_col_name} >= (SELECT {min_dt_col_name} FROM {date_bounds_table})
            AND
            f.{dt_col_name} <= (SELECT {max_dt_col_name} FROM {date_bounds_table})"""
        if datetime_col
        else "1 = 1"
    )
    with_temp_date_bounds = f"""WITH [date_bounds] AS (
        SELECT MIN({dt_col_name}) AS {min_dt_col_name}, MAX({dt_col_name}) AS {max_dt_col_name}
        FROM {patch_table_name}
    )""" if datetime_col else ""
    identity_insert_on = (
        f"SET IDENTITY_INSERT {target_table_name} ON"
        if identity_insert
        else skip_query_val
    )
    identity_insert_off = (
        f"SET IDENTITY_INSERT {target_table_name} OFF"
        if identity_insert
        else skip_query_val
    )

    ### NOTE: MSSQL upserts must exclude the update portion if only upserting indices.
    when_matched_update_sets_subquery_none = "" if not value_cols else (
        "\n    WHEN MATCHED THEN\n"
        f"        UPDATE {sets_subquery('', 'p.')}"
    )

    cols_equal_values = '\n,'.join(
        [
            f"{sql_item_name(c_name, flavor)} = VALUES({sql_item_name(c_name, flavor)})"
            for c_name, c_type in value_cols
        ]
    )
    on_duplicate_key_update = (
        "ON DUPLICATE KEY UPDATE"
        if value_cols
        else ""
    )
    ignore = "IGNORE " if not value_cols else ""

    formatted_queries = [
        textwrap.dedent(base_query.format(
            sets_subquery_none=sets_subquery('', 'p.'),
            sets_subquery_none_excluded=sets_subquery('', 'EXCLUDED.'),
            sets_subquery_f=sets_subquery('f.', 'p.'),
            and_subquery_f=and_subquery('p.', 'f.'),
            and_subquery_t=and_subquery('p.', 't.'),
            target_table_name=target_table_name,
            patch_table_name=patch_table_name,
            patch_cols_str=patch_cols_str,
            patch_cols_prefixed_str=patch_cols_prefixed_str,
            date_bounds_subquery=date_bounds_subquery,
            join_cols_str=join_cols_str,
            coalesce_join_cols_str=coalesce_join_cols_str,
            update_or_nothing=update_or_nothing,
            when_matched_update_sets_subquery_none=when_matched_update_sets_subquery_none,
            cols_equal_values=cols_equal_values,
            on_duplicate_key_update=on_duplicate_key_update,
            ignore=ignore,
            with_temp_date_bounds=with_temp_date_bounds,
            identity_insert_on=identity_insert_on,
            identity_insert_off=identity_insert_off,
        )).lstrip().rstrip()
        for base_query in base_queries
    ]

    ### NOTE: Allow for skipping some queries.
    return [query for query in formatted_queries if query]
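
### EXAMPLE: build and execute an upsert (hypothetical tables; the patch table
### must share the target's shape):
#
# >>> queries = get_update_queries(
# ...     'weather', 'weather_patch', conn,
# ...     join_cols=['station', 'timestamp'],
# ...     upsert=True,
# ...     datetime_col='timestamp',
# ... )
# >>> for query in queries:
# ...     conn.exec(query)
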
1996 """ 1997 from meerschaum.utils.dtypes.sql import DB_FLAVORS_CAST_DTYPES 1998 if 'geometry' in typ.lower(): 1999 return "'010100000000008058346FCDC100008058346FCDC1'" 2000 if 'int' in typ.lower() or typ.lower() in ('numeric', 'number'): 2001 return '-987654321' 2002 if 'bool' in typ.lower() or typ.lower() == 'bit': 2003 bool_typ = ( 2004 PD_TO_DB_DTYPES_FLAVORS 2005 .get('bool', {}) 2006 .get(flavor, PD_TO_DB_DTYPES_FLAVORS['bool']['default']) 2007 ) 2008 if flavor in DB_FLAVORS_CAST_DTYPES: 2009 bool_typ = DB_FLAVORS_CAST_DTYPES[flavor].get(bool_typ, bool_typ) 2010 val_to_cast = ( 2011 -987654321 2012 if flavor in ('mysql', 'mariadb') 2013 else 0 2014 ) 2015 return f'CAST({val_to_cast} AS {bool_typ})' 2016 if 'time' in typ.lower() or 'date' in typ.lower(): 2017 db_type = typ if typ.isupper() else None 2018 return dateadd_str(flavor=flavor, begin='1900-01-01', db_type=db_type) 2019 if 'float' in typ.lower() or 'double' in typ.lower() or typ.lower() in ('decimal',): 2020 return '-987654321.0' 2021 if flavor == 'oracle' and typ.lower().split('(', maxsplit=1)[0] == 'char': 2022 return "'-987654321'" 2023 if flavor == 'oracle' and typ.lower() in ('blob', 'bytes'): 2024 return '00' 2025 if typ.lower() in ('uniqueidentifier', 'guid', 'uuid'): 2026 magic_val = 'DEADBEEF-ABBA-BABE-CAFE-DECAFC0FFEE5' 2027 if flavor == 'mssql': 2028 return f"CAST('{magic_val}' AS UNIQUEIDENTIFIER)" 2029 return f"'{magic_val}'" 2030 return ('n' if flavor == 'oracle' else '') + "'-987654321'" 2031 2032 2033def get_db_version(conn: 'SQLConnector', debug: bool = False) -> Union[str, None]: 2034 """ 2035 Fetch the database version if possible. 2036 """ 2037 version_name = sql_item_name('version', conn.flavor, None) 2038 version_query = version_queries.get( 2039 conn.flavor, 2040 version_queries['default'] 2041 ).format(version_name=version_name) 2042 return conn.value(version_query, debug=debug) 2043 2044 2045def get_rename_table_queries( 2046 old_table: str, 2047 new_table: str, 2048 flavor: str, 2049 schema: Optional[str] = None, 2050) -> List[str]: 2051 """ 2052 Return queries to alter a table's name. 2053 2054 Parameters 2055 ---------- 2056 old_table: str 2057 The unquoted name of the old table. 2058 2059 new_table: str 2060 The unquoted name of the new table. 2061 2062 flavor: str 2063 The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`. 2064 2065 schema: Optional[str], default None 2066 The schema on which the table resides. 2067 2068 Returns 2069 ------- 2070 A list of `ALTER TABLE` or equivalent queries for the database flavor. 
2071 """ 2072 old_table_name = sql_item_name(old_table, flavor, schema) 2073 new_table_name = sql_item_name(new_table, flavor, None) 2074 tmp_table = '_tmp_rename_' + new_table 2075 tmp_table_name = sql_item_name(tmp_table, flavor, schema) 2076 if flavor == 'mssql': 2077 return [f"EXEC sp_rename '{old_table}', '{new_table}'"] 2078 2079 if_exists_str = "IF EXISTS" if flavor in DROP_IF_EXISTS_FLAVORS else "" 2080 if flavor == 'duckdb': 2081 return ( 2082 get_create_table_queries( 2083 f"SELECT * FROM {old_table_name}", 2084 tmp_table, 2085 'duckdb', 2086 schema, 2087 ) + get_create_table_queries( 2088 f"SELECT * FROM {tmp_table_name}", 2089 new_table, 2090 'duckdb', 2091 schema, 2092 ) + [ 2093 f"DROP TABLE {if_exists_str} {tmp_table_name}", 2094 f"DROP TABLE {if_exists_str} {old_table_name}", 2095 ] 2096 ) 2097 2098 return [f"ALTER TABLE {old_table_name} RENAME TO {new_table_name}"] 2099 2100 2101def get_create_table_query( 2102 query_or_dtypes: Union[str, Dict[str, str]], 2103 new_table: str, 2104 flavor: str, 2105 schema: Optional[str] = None, 2106) -> str: 2107 """ 2108 NOTE: This function is deprecated. Use `get_create_table_queries()` instead. 2109 2110 Return a query to create a new table from a `SELECT` query. 2111 2112 Parameters 2113 ---------- 2114 query: Union[str, Dict[str, str]] 2115 The select query to use for the creation of the table. 2116 If a dictionary is provided, return a `CREATE TABLE` query from the given `dtypes` columns. 2117 2118 new_table: str 2119 The unquoted name of the new table. 2120 2121 flavor: str 2122 The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`). 2123 2124 schema: Optional[str], default None 2125 The schema on which the table will reside. 2126 2127 Returns 2128 ------- 2129 A `CREATE TABLE` (or `SELECT INTO`) query for the database flavor. 2130 """ 2131 return get_create_table_queries( 2132 query_or_dtypes, 2133 new_table, 2134 flavor, 2135 schema=schema, 2136 primary_key=None, 2137 )[0] 2138 2139 2140def get_create_table_queries( 2141 query_or_dtypes: Union[str, Dict[str, str]], 2142 new_table: str, 2143 flavor: str, 2144 schema: Optional[str] = None, 2145 primary_key: Optional[str] = None, 2146 primary_key_db_type: Optional[str] = None, 2147 autoincrement: bool = False, 2148 datetime_column: Optional[str] = None, 2149) -> List[str]: 2150 """ 2151 Return a query to create a new table from a `SELECT` query or a `dtypes` dictionary. 2152 2153 Parameters 2154 ---------- 2155 query_or_dtypes: Union[str, Dict[str, str]] 2156 The select query to use for the creation of the table. 2157 If a dictionary is provided, return a `CREATE TABLE` query from the given `dtypes` columns. 2158 2159 new_table: str 2160 The unquoted name of the new table. 2161 2162 flavor: str 2163 The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`). 2164 2165 schema: Optional[str], default None 2166 The schema on which the table will reside. 2167 2168 primary_key: Optional[str], default None 2169 If provided, designate this column as the primary key in the new table. 2170 2171 primary_key_db_type: Optional[str], default None 2172 If provided, alter the primary key to this type (to set NOT NULL constraint). 2173 2174 autoincrement: bool, default False 2175 If `True` and `primary_key` is provided, create the `primary_key` column 2176 as an auto-incrementing integer column. 2177 2178 datetime_column: Optional[str], default None 2179 If provided, include this column in the primary key. 2180 Applicable to TimescaleDB only. 
2181 2182 Returns 2183 ------- 2184 A `CREATE TABLE` (or `SELECT INTO`) query for the database flavor. 2185 """ 2186 if not isinstance(query_or_dtypes, (str, dict)): 2187 raise TypeError("`query_or_dtypes` must be a query or a dtypes dictionary.") 2188 2189 method = ( 2190 _get_create_table_query_from_cte 2191 if isinstance(query_or_dtypes, str) 2192 else _get_create_table_query_from_dtypes 2193 ) 2194 return method( 2195 query_or_dtypes, 2196 new_table, 2197 flavor, 2198 schema=schema, 2199 primary_key=primary_key, 2200 primary_key_db_type=primary_key_db_type, 2201 autoincrement=(autoincrement and flavor not in SKIP_AUTO_INCREMENT_FLAVORS), 2202 datetime_column=datetime_column, 2203 ) 2204 2205 2206def _get_create_table_query_from_dtypes( 2207 dtypes: Dict[str, str], 2208 new_table: str, 2209 flavor: str, 2210 schema: Optional[str] = None, 2211 primary_key: Optional[str] = None, 2212 primary_key_db_type: Optional[str] = None, 2213 autoincrement: bool = False, 2214 datetime_column: Optional[str] = None, 2215) -> List[str]: 2216 """ 2217 Create a new table from a `dtypes` dictionary. 2218 """ 2219 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type, AUTO_INCREMENT_COLUMN_FLAVORS 2220 if not dtypes and not primary_key: 2221 raise ValueError(f"Expecting columns for table '{new_table}'.") 2222 2223 if flavor in SKIP_AUTO_INCREMENT_FLAVORS: 2224 autoincrement = False 2225 2226 cols_types = ( 2227 [ 2228 ( 2229 primary_key, 2230 get_db_type_from_pd_type(dtypes.get(primary_key, 'int') or 'int', flavor=flavor) 2231 ) 2232 ] 2233 if primary_key 2234 else [] 2235 ) + [ 2236 (col, get_db_type_from_pd_type(typ, flavor=flavor)) 2237 for col, typ in dtypes.items() 2238 if col != primary_key 2239 ] 2240 2241 table_name = sql_item_name(new_table, schema=schema, flavor=flavor) 2242 primary_key_name = sql_item_name(primary_key, flavor) if primary_key else None 2243 primary_key_constraint_name = ( 2244 sql_item_name(f'PK_{new_table}', flavor, None) 2245 if primary_key 2246 else None 2247 ) 2248 datetime_column_name = sql_item_name(datetime_column, flavor) if datetime_column else None 2249 primary_key_clustered = ( 2250 "CLUSTERED" 2251 if not datetime_column or datetime_column == primary_key 2252 else "NONCLUSTERED" 2253 ) 2254 query = f"CREATE TABLE {table_name} (" 2255 if primary_key: 2256 col_db_type = cols_types[0][1] 2257 auto_increment_str = (' ' + AUTO_INCREMENT_COLUMN_FLAVORS.get( 2258 flavor, 2259 AUTO_INCREMENT_COLUMN_FLAVORS['default'] 2260 )) if autoincrement or primary_key not in dtypes else '' 2261 col_name = sql_item_name(primary_key, flavor=flavor, schema=None) 2262 2263 if flavor in ('sqlite', 'geopackage'): 2264 query += ( 2265 f"\n {col_name} " 2266 + (f"{col_db_type}" if not auto_increment_str else 'INTEGER') 2267 + f" PRIMARY KEY{auto_increment_str} NOT NULL," 2268 ) 2269 elif flavor == 'oracle': 2270 query += f"\n {col_name} {col_db_type} {auto_increment_str} PRIMARY KEY," 2271 elif ( 2272 flavor in ('timescaledb', 'timescaledb-ha') 2273 and datetime_column 2274 and datetime_column != primary_key 2275 ): 2276 query += f"\n {col_name} {col_db_type}{auto_increment_str} NOT NULL," 2277 elif flavor == 'mssql': 2278 query += f"\n {col_name} {col_db_type}{auto_increment_str} NOT NULL," 2279 else: 2280 query += f"\n {col_name} {col_db_type} PRIMARY KEY{auto_increment_str} NOT NULL," 2281 2282 for col, db_type in cols_types: 2283 if col == primary_key: 2284 continue 2285 col_name = sql_item_name(col, schema=None, flavor=flavor) 2286 query += f"\n {col_name} {db_type}," 2287 if ( 
2288        flavor in ('timescaledb', 'timescaledb-ha')
2289        and datetime_column
2290        and primary_key
2291        and datetime_column != primary_key
2292    ):
2293        query += f"\n    PRIMARY KEY({datetime_column_name}, {primary_key_name}),"
2294
2295    if flavor == 'mssql' and primary_key:
2296        query += f"\n    CONSTRAINT {primary_key_constraint_name} PRIMARY KEY {primary_key_clustered} ({primary_key_name}),"
2297
2298    query = query[:-1]
2299    query += "\n)"
2300
2301    queries = [query]
2302    return queries
2303
2304
2305def _get_create_table_query_from_cte(
2306    query: str,
2307    new_table: str,
2308    flavor: str,
2309    schema: Optional[str] = None,
2310    primary_key: Optional[str] = None,
2311    primary_key_db_type: Optional[str] = None,
2312    autoincrement: bool = False,
2313    datetime_column: Optional[str] = None,
2314) -> List[str]:
2315    """
2316    Create a new table from a CTE query.
2317    """
2318    import textwrap
2319    create_cte = 'create_query'
2320    create_cte_name = sql_item_name(create_cte, flavor, None)
2321    new_table_name = sql_item_name(new_table, flavor, schema)
2322    primary_key_constraint_name = (
2323        sql_item_name(f'PK_{new_table}', flavor, None)
2324        if primary_key
2325        else None
2326    )
2327    primary_key_name = (
2328        sql_item_name(primary_key, flavor, None)
2329        if primary_key
2330        else None
2331    )
2332    primary_key_clustered = (
2333        "CLUSTERED"
2334        if not datetime_column or datetime_column == primary_key
2335        else "NONCLUSTERED"
2336    )
2337    datetime_column_name = (
2338        sql_item_name(datetime_column, flavor)
2339        if datetime_column
2340        else None
2341    )
2342    if flavor in ('mssql',):
2343        query = query.lstrip()
2344        if query.lower().startswith('with '):
2345            final_select_ix = query.lower().rfind('select')
2346            create_table_queries = [
2347                (
2348                    query[:final_select_ix].rstrip() + ',\n'
2349                    + f"{create_cte_name} AS (\n"
2350                    + textwrap.indent(query[final_select_ix:], '    ')
2351                    + "\n)\n"
2352                    + f"SELECT *\nINTO {new_table_name}\nFROM {create_cte_name}"
2353                ),
2354            ]
2355        else:
2356            create_table_queries = [
2357                (
2358                    "SELECT *\n"
2359                    f"INTO {new_table_name}\n"
2360                    f"FROM (\n{textwrap.indent(query, '    ')}\n) AS {create_cte_name}"
2361                ),
2362            ]
2363
2364        alter_type_queries = []
2365        if primary_key_db_type:
2366            alter_type_queries.extend([
2367                (
2368                    f"ALTER TABLE {new_table_name}\n"
2369                    f"ALTER COLUMN {primary_key_name} {primary_key_db_type} NOT NULL"
2370                ),
2371            ])
2372        alter_type_queries.extend([
2373            (
2374                f"ALTER TABLE {new_table_name}\n"
2375                f"ADD CONSTRAINT {primary_key_constraint_name} "
2376                f"PRIMARY KEY {primary_key_clustered} ({primary_key_name})"
2377            ),
2378        ])
2379    elif flavor in (None,):
2380        create_table_queries = [
2381            (
2382                f"WITH {create_cte_name} AS (\n{textwrap.indent(query, '    ')}\n)\n"
2383                f"CREATE TABLE {new_table_name} AS\n"
2384                "SELECT *\n"
2385                f"FROM {create_cte_name}"
2386            ),
2387        ]
2388
2389        alter_type_queries = [
2390            (
2391                f"ALTER TABLE {new_table_name}\n"
2392                f"ADD PRIMARY KEY ({primary_key_name})"
2393            ),
2394        ]
2395    elif flavor in ('sqlite', 'mysql', 'mariadb', 'duckdb', 'oracle', 'geopackage'):
2396        create_table_queries = [
2397            (
2398                f"CREATE TABLE {new_table_name} AS\n"
2399                "SELECT *\n"
2400                f"FROM (\n{textwrap.indent(query, '    ')}\n)"
2401                + (f" AS {create_cte_name}" if flavor != 'oracle' else '')
2402            ),
2403        ]
2404
2405        alter_type_queries = [
2406            (
2407                f"ALTER TABLE {new_table_name}\n"
2408                f"ADD PRIMARY KEY ({primary_key_name})"
2409            ),
2410        ]
2411    elif (
2412        flavor in ('timescaledb', 'timescaledb-ha')
2413        and datetime_column
2414        and datetime_column != 
primary_key 2415 ): 2416 create_table_queries = [ 2417 ( 2418 "SELECT *\n" 2419 f"INTO {new_table_name}\n" 2420 f"FROM (\n{textwrap.indent(query, ' ')}\n) AS {create_cte_name}\n" 2421 ), 2422 ] 2423 2424 alter_type_queries = [ 2425 ( 2426 f"ALTER TABLE {new_table_name}\n" 2427 f"ADD PRIMARY KEY ({datetime_column_name}, {primary_key_name})" 2428 ), 2429 ] 2430 else: 2431 create_table_queries = [ 2432 ( 2433 "SELECT *\n" 2434 f"INTO {new_table_name}\n" 2435 f"FROM (\n{textwrap.indent(query, ' ')}\n) AS {create_cte_name}" 2436 ), 2437 ] 2438 2439 alter_type_queries = [ 2440 ( 2441 f"ALTER TABLE {new_table_name}\n" 2442 f"ADD PRIMARY KEY ({primary_key_name})" 2443 ), 2444 ] 2445 2446 if not primary_key: 2447 return create_table_queries 2448 2449 return create_table_queries + alter_type_queries 2450 2451 2452def wrap_query_with_cte( 2453 sub_query: str, 2454 parent_query: str, 2455 flavor: str, 2456 cte_name: str = "src", 2457) -> str: 2458 """ 2459 Wrap a subquery in a CTE and append an encapsulating query. 2460 2461 Parameters 2462 ---------- 2463 sub_query: str 2464 The query to be referenced. This may itself contain CTEs. 2465 Unless `cte_name` is provided, this will be aliased as `src`. 2466 2467 parent_query: str 2468 The larger query to append which references the subquery. 2469 This must not contain CTEs. 2470 2471 flavor: str 2472 The database flavor, e.g. `'mssql'`. 2473 2474 cte_name: str, default 'src' 2475 The CTE alias, defaults to `src`. 2476 2477 Returns 2478 ------- 2479 An encapsulating query which allows you to treat `sub_query` as a temporary table. 2480 2481 Examples 2482 -------- 2483 2484 ```python 2485 from meerschaum.utils.sql import wrap_query_with_cte 2486 sub_query = "WITH foo AS (SELECT 1 AS val) SELECT (val * 2) AS newval FROM foo" 2487 parent_query = "SELECT newval * 3 FROM src" 2488 query = wrap_query_with_cte(sub_query, parent_query, 'mssql') 2489 print(query) 2490 # WITH foo AS (SELECT 1 AS val), 2491 # [src] AS ( 2492 # SELECT (val * 2) AS newval FROM foo 2493 # ) 2494 # SELECT newval * 3 FROM src 2495 ``` 2496 2497 """ 2498 import textwrap 2499 sub_query = sub_query.lstrip() 2500 cte_name_quoted = sql_item_name(cte_name, flavor, None) 2501 2502 if flavor in NO_CTE_FLAVORS: 2503 return ( 2504 parent_query 2505 .replace(cte_name_quoted, '--MRSM_SUBQUERY--') 2506 .replace(cte_name, '--MRSM_SUBQUERY--') 2507 .replace('--MRSM_SUBQUERY--', f"(\n{sub_query}\n) AS {cte_name_quoted}") 2508 ) 2509 2510 if sub_query.lstrip().lower().startswith('with '): 2511 final_select_ix = sub_query.lower().rfind('select') 2512 return ( 2513 sub_query[:final_select_ix].rstrip() + ',\n' 2514 + f"{cte_name_quoted} AS (\n" 2515 + ' ' + sub_query[final_select_ix:] 2516 + "\n)\n" 2517 + parent_query 2518 ) 2519 2520 return ( 2521 f"WITH {cte_name_quoted} AS (\n" 2522 f"{textwrap.indent(sub_query, ' ')}\n" 2523 f")\n{parent_query}" 2524 ) 2525 2526 2527def format_cte_subquery( 2528 sub_query: str, 2529 flavor: str, 2530 sub_name: str = 'src', 2531 cols_to_select: Union[List[str], str] = '*', 2532) -> str: 2533 """ 2534 Given a subquery, build a wrapper query that selects from the CTE subquery. 2535 2536 Parameters 2537 ---------- 2538 sub_query: str 2539 The subquery to wrap. 2540 2541 flavor: str 2542 The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`. 2543 2544 sub_name: str, default 'src' 2545 If possible, give this name to the CTE (must be unquoted). 
2546
2547    cols_to_select: Union[List[str], str], default '*'
2548        If specified, choose which columns to select from the CTE.
2549        If a list of strings is provided, each item will be quoted and joined with commas.
2550        If a string is given, assume it is quoted and insert it into the query.
2551
2552    Returns
2553    -------
2554    A wrapper query that selects from the CTE.
2555    """
2556    quoted_sub_name = sql_item_name(sub_name, flavor, None)
2557    cols_str = (
2558        cols_to_select
2559        if isinstance(cols_to_select, str)
2560        else ', '.join([sql_item_name(col, flavor, None) for col in cols_to_select])
2561    )
2562    parent_query = (
2563        f"SELECT {cols_str}\n"
2564        f"FROM {quoted_sub_name}"
2565    )
2566    return wrap_query_with_cte(sub_query, parent_query, flavor, cte_name=sub_name)
2567
2568
2569def session_execute(
2570    session: 'sqlalchemy.orm.session.Session',
2571    queries: Union[List[str], str],
2572    with_results: bool = False,
2573    debug: bool = False,
2574) -> Union[mrsm.SuccessTuple, Tuple[mrsm.SuccessTuple, List['sqlalchemy.sql.ResultProxy']]]:
2575    """
2576    Similar to `SQLConnector.exec_queries()`, execute a list of queries
2577    and roll back when one fails.
2578
2579    Parameters
2580    ----------
2581    session: sqlalchemy.orm.session.Session
2582        A SQLAlchemy session representing a transaction.
2583
2584    queries: Union[List[str], str]
2585        A query or list of queries to be executed.
2586        If a query fails, roll back the session.
2587
2588    with_results: bool, default False
2589        If `True`, return a list of result objects.
2590
2591    Returns
2592    -------
2593    A `SuccessTuple` indicating the queries were successfully executed.
2594    If `with_results`, return the `SuccessTuple` and a list of results.
2595    """
2596    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
2597    if not isinstance(queries, list):
2598        queries = [queries]
2599    successes, msgs, results = [], [], []
2600    for query in queries:
2601        if debug:
2602            dprint(query)
2603        query_text = sqlalchemy.text(query)
2604        fail_msg = "Failed to execute queries."
2605        try:
2606            result = session.execute(query_text)
2607            query_success = result is not None
2608            query_msg = "Success" if query_success else fail_msg
2609        except Exception as e:
2610            query_success = False
2611            query_msg = f"{fail_msg}\n{e}"
2612            result = None
2613        successes.append(query_success)
2614        msgs.append(query_msg)
2615        results.append(result)
2616        if not query_success:
2617            if debug:
2618                dprint("Rolling back session.")
2619            session.rollback()
2620            break
2621    success, msg = all(successes), '\n'.join(msgs)
2622    if with_results:
2623        return (success, msg), results
2624    return success, msg
2625
2626
2627def get_reset_autoincrement_queries(
2628    table: str,
2629    column: str,
2630    connector: mrsm.connectors.SQLConnector,
2631    schema: Optional[str] = None,
2632    debug: bool = False,
2633) -> List[str]:
2634    """
2635    Return a list of queries to reset a table's auto-increment counter to the next largest value.
2636
2637    Parameters
2638    ----------
2639    table: str
2640        The name of the table on which the auto-incrementing column exists.
2641
2642    column: str
2643        The name of the auto-incrementing column.
2644
2645    connector: mrsm.connectors.SQLConnector
2646        The SQLConnector to the database on which the table exists.
2647
2648    schema: Optional[str], default None
2649        The schema of the table. Defaults to `connector.schema`.
2650
2651    Returns
2652    -------
2653    A list of queries to be executed to reset the auto-incrementing column.
2654 """ 2655 if not table_exists(table, connector, schema=schema, debug=debug): 2656 return [] 2657 2658 schema = schema or connector.schema 2659 max_id_name = sql_item_name('max_id', connector.flavor) 2660 table_name = sql_item_name(table, connector.flavor, schema) 2661 table_seq_name = sql_item_name(table + '_' + column + '_seq', connector.flavor, schema) 2662 column_name = sql_item_name(column, connector.flavor) 2663 max_id = connector.value( 2664 f""" 2665 SELECT COALESCE(MAX({column_name}), 0) AS {max_id_name} 2666 FROM {table_name} 2667 """, 2668 debug=debug, 2669 ) 2670 if max_id is None: 2671 return [] 2672 2673 reset_queries = reset_autoincrement_queries.get( 2674 connector.flavor, 2675 reset_autoincrement_queries['default'] 2676 ) 2677 if not isinstance(reset_queries, list): 2678 reset_queries = [reset_queries] 2679 2680 return [ 2681 query.format( 2682 column=column, 2683 column_name=column_name, 2684 table=table, 2685 table_name=table_name, 2686 table_seq_name=table_seq_name, 2687 val=max_id, 2688 val_plus_1=(max_id + 1), 2689 ) 2690 for query in reset_queries 2691 ] 2692 2693 2694def get_postgis_geo_columns_types( 2695 connectable: Union[ 2696 'mrsm.connectors.sql.SQLConnector', 2697 'sqlalchemy.orm.session.Session', 2698 'sqlalchemy.engine.base.Engine' 2699 ], 2700 table: str, 2701 schema: Optional[str] = 'public', 2702 debug: bool = False, 2703) -> Dict[str, str]: 2704 """ 2705 Return a dictionary mapping PostGIS geometry column names to geometry types. 2706 """ 2707 from meerschaum.utils.dtypes import get_geometry_type_srid 2708 sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False) 2709 default_type, default_srid = get_geometry_type_srid() 2710 default_type = default_type.upper() 2711 2712 clean(table) 2713 clean(str(schema)) 2714 schema = schema or 'public' 2715 truncated_schema_name = truncate_item_name(schema, flavor='postgis') 2716 truncated_table_name = truncate_item_name(table, flavor='postgis') 2717 query = sqlalchemy.text( 2718 "SELECT \"f_geometry_column\" AS \"column\", 'GEOMETRY' AS \"func\", \"type\", \"srid\"\n" 2719 "FROM \"geometry_columns\"\n" 2720 f"WHERE \"f_table_schema\" = '{truncated_schema_name}'\n" 2721 f" AND \"f_table_name\" = '{truncated_table_name}'\n" 2722 "UNION ALL\n" 2723 "SELECT \"f_geography_column\" AS \"column\", 'GEOGRAPHY' AS \"func\", \"type\", \"srid\"\n" 2724 "FROM \"geography_columns\"\n" 2725 f"WHERE \"f_table_schema\" = '{truncated_schema_name}'\n" 2726 f" AND \"f_table_name\" = '{truncated_table_name}'\n" 2727 ) 2728 debug_kwargs = {'debug': debug} if isinstance(connectable, mrsm.connectors.SQLConnector) else {} 2729 result_rows = [ 2730 row 2731 for row in connectable.execute(query, **debug_kwargs).fetchall() 2732 ] 2733 cols_type_tuples = { 2734 row[0]: (row[1], row[2], row[3]) 2735 for row in result_rows 2736 } 2737 2738 geometry_cols_types = { 2739 col: ( 2740 f"{func}({typ.upper()}, {srid})" 2741 if srid 2742 else ( 2743 func 2744 + ( 2745 f'({typ.upper()})' 2746 if typ.upper() not in ('GEOMETRY', 'GEOGRAPHY') 2747 else '' 2748 ) 2749 ) 2750 ) 2751 for col, (func, typ, srid) in cols_type_tuples.items() 2752 } 2753 return geometry_cols_types 2754 2755 2756def get_create_schema_if_not_exists_queries( 2757 schema: str, 2758 flavor: str, 2759) -> List[str]: 2760 """ 2761 Return the queries to create a schema if it does not yet exist. 2762 For databases which do not support schemas, an empty list will be returned. 
2763 """ 2764 if not schema: 2765 return [] 2766 2767 if flavor in NO_SCHEMA_FLAVORS: 2768 return [] 2769 2770 if schema == DEFAULT_SCHEMA_FLAVORS.get(flavor, None): 2771 return [] 2772 2773 clean(schema) 2774 2775 if flavor == 'mssql': 2776 return [ 2777 ( 2778 f"IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{schema}')\n" 2779 "BEGIN\n" 2780 f" EXEC('CREATE SCHEMA {sql_item_name(schema, flavor)}');\n" 2781 "END;" 2782 ) 2783 ] 2784 2785 if flavor == 'oracle': 2786 return [] 2787 2788 return [ 2789 f"CREATE SCHEMA IF NOT EXISTS {sql_item_name(schema, flavor)};" 2790 ]
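A quick sketch of how the generated queries differ by flavor, assuming PostgreSQL's default schema is `'public'` and that `'sqlite'` is among the schema-less flavors:

```python
from meerschaum.utils.sql import get_create_schema_if_not_exists_queries

# PostgreSQL supports `CREATE SCHEMA IF NOT EXISTS` directly.
print(get_create_schema_if_not_exists_queries('analytics', 'postgresql'))
# Expected: ['CREATE SCHEMA IF NOT EXISTS "analytics";']

# Schema-less flavors (e.g. SQLite) should return no queries.
print(get_create_schema_if_not_exists_queries('analytics', 'sqlite'))
# Expected: []
```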
669def clean(substring: str) -> None: 670 """ 671 Ensure a substring is clean enough to be inserted into a SQL query. 672 Raises an exception when banned words are used. 673 """ 674 from meerschaum.utils.warnings import error 675 banned_symbols = [';', '--', 'drop ',] 676 for symbol in banned_symbols: 677 if symbol in str(substring).lower(): 678 error(f"Invalid string: '{substring}'")
Ensure a substring is clean enough to be inserted into a SQL query. Raises an exception when banned words are used.
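A minimal sketch of the guard in action; `error()` raises an exception by default, so unsafe strings never reach the query:

```python
from meerschaum.utils.sql import clean

clean("weather")  # Returns None: no banned symbols, safe to interpolate.

# Substrings containing ';', '--', or 'drop ' (case-insensitive) raise.
try:
    clean("weather; DROP TABLE users")
except Exception as e:
    print(f"Rejected: {e}")
```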
681def dateadd_str( 682 flavor: str = 'postgresql', 683 datepart: str = 'day', 684 number: Union[int, float] = 0, 685 begin: Union[str, datetime, int] = 'now', 686 db_type: Optional[str] = None, 687) -> str: 688 """ 689 Generate a `DATEADD` clause depending on database flavor. 690 691 Parameters 692 ---------- 693 flavor: str, default `'postgresql'` 694 SQL database flavor, e.g. `'postgresql'`, `'sqlite'`. 695 696 Currently supported flavors: 697 698 - `'postgresql'` 699 - `'postgis'` 700 - `'timescaledb'` 701 - `'timescaledb-ha'` 702 - `'citus'` 703 - `'cockroachdb'` 704 - `'duckdb'` 705 - `'mssql'` 706 - `'mysql'` 707 - `'mariadb'` 708 - `'sqlite'` 709 - `'geopackage'` 710 - `'oracle'` 711 712 datepart: str, default `'day'` 713 Which part of the date to modify. Supported values: 714 715 - `'year'` 716 - `'month'` 717 - `'day'` 718 - `'hour'` 719 - `'minute'` 720 - `'second'` 721 722 number: Union[int, float], default `0` 723 How many units to add to the date part. 724 725 begin: Union[str, datetime], default `'now'` 726 Base datetime to which to add dateparts. 727 728 db_type: Optional[str], default None 729 If provided, cast the datetime string as the type. 730 Otherwise, infer this from the input datetime value. 731 732 Returns 733 ------- 734 The appropriate `DATEADD` string for the corresponding database flavor. 735 736 Examples 737 -------- 738 >>> dateadd_str( 739 ... flavor='mssql', 740 ... begin=datetime(2022, 1, 1, 0, 0), 741 ... number=1, 742 ... ) 743 "DATEADD(day, 1, CAST('2022-01-01 00:00:00' AS DATETIME2))" 744 >>> dateadd_str( 745 ... flavor='postgresql', 746 ... begin=datetime(2022, 1, 1, 0, 0), 747 ... number=1, 748 ... ) 749 "CAST('2022-01-01 00:00:00' AS TIMESTAMP) + INTERVAL '1 day'" 750 751 """ 752 from meerschaum.utils.packages import attempt_import 753 from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type, get_pd_type_from_db_type 754 dateutil_parser = attempt_import('dateutil.parser') 755 if 'int' in str(type(begin)).lower(): 756 num_str = str(begin) 757 if number is not None and number != 0: 758 num_str += ( 759 f' + {number}' 760 if number > 0 761 else f" - {number * -1}" 762 ) 763 return num_str 764 if not begin: 765 return '' 766 767 _original_begin = begin 768 begin_time = None 769 ### Sanity check: make sure `begin` is a valid datetime before we inject anything. 770 if not isinstance(begin, datetime): 771 try: 772 begin_time = dateutil_parser.parse(begin) 773 except Exception: 774 begin_time = None 775 else: 776 begin_time = begin 777 778 ### Unable to parse into a datetime. 779 if begin_time is None: 780 ### Throw an error if banned symbols are included in the `begin` string. 781 clean(str(begin)) 782 ### If begin is a valid datetime, wrap it in quotes. 
783 else: 784 if isinstance(begin, datetime) and begin.tzinfo is not None: 785 begin = begin.astimezone(timezone.utc) 786 begin = ( 787 f"'{begin.replace(tzinfo=None)}'" 788 if isinstance(begin, datetime) and flavor in TIMEZONE_NAIVE_FLAVORS 789 else f"'{begin}'" 790 ) 791 792 dt_is_utc = ( 793 begin_time.tzinfo is not None 794 if begin_time is not None 795 else ('+' in str(begin) or '-' in str(begin).split(':', maxsplit=1)[-1]) 796 ) 797 if db_type: 798 db_type_is_utc = 'utc' in get_pd_type_from_db_type(db_type).lower() 799 dt_is_utc = dt_is_utc or db_type_is_utc 800 db_type = db_type or get_db_type_from_pd_type( 801 ('datetime64[ns, UTC]' if dt_is_utc else 'datetime64[ns]'), 802 flavor=flavor, 803 ) 804 805 da = "" 806 if flavor in ( 807 'postgresql', 808 'postgis', 809 'timescaledb', 810 'timescaledb-ha', 811 'cockroachdb', 812 'citus', 813 ): 814 begin = ( 815 f"CAST({begin} AS {db_type})" if begin != 'now' 816 else f"CAST(NOW() AT TIME ZONE 'utc' AS {db_type})" 817 ) 818 if dt_is_utc: 819 begin += " AT TIME ZONE 'UTC'" 820 da = begin + (f" + INTERVAL '{number} {datepart}'" if number != 0 else '') 821 822 elif flavor == 'duckdb': 823 begin = f"CAST({begin} AS {db_type})" if begin != 'now' else 'NOW()' 824 if dt_is_utc: 825 begin += " AT TIME ZONE 'UTC'" 826 da = begin + (f" + INTERVAL '{number} {datepart}'" if number != 0 else '') 827 828 elif flavor in ('mssql',): 829 if begin_time and begin_time.microsecond != 0 and not dt_is_utc: 830 begin = begin[:-4] + "'" 831 begin = f"CAST({begin} AS {db_type})" if begin != 'now' else 'GETUTCDATE()' 832 if dt_is_utc: 833 begin += " AT TIME ZONE 'UTC'" 834 da = f"DATEADD({datepart}, {number}, {begin})" if number != 0 else begin 835 836 elif flavor in ('mysql', 'mariadb'): 837 begin = ( 838 f"CAST({begin} AS DATETIME(6))" 839 if begin != 'now' 840 else 'UTC_TIMESTAMP(6)' 841 ) 842 da = (f"DATE_ADD({begin}, INTERVAL {number} {datepart})" if number != 0 else begin) 843 844 elif flavor in ('sqlite', 'geopackage'): 845 da = f"datetime({begin}, '{number} {datepart}')" 846 847 elif flavor == 'oracle': 848 if begin == 'now': 849 begin = str( 850 datetime.now(timezone.utc).replace(tzinfo=None).strftime(r'%Y:%m:%d %M:%S.%f') 851 ) 852 elif begin_time: 853 begin = str(begin_time.strftime(r'%Y-%m-%d %H:%M:%S.%f')) 854 dt_format = 'YYYY-MM-DD HH24:MI:SS.FF' 855 _begin = f"'{begin}'" if begin_time else begin 856 da = ( 857 (f"TO_TIMESTAMP({_begin}, '{dt_format}')" if begin_time else _begin) 858 + (f" + INTERVAL '{number}' {datepart}" if number != 0 else "") 859 ) 860 return da
Generate a `DATEADD` clause depending on database flavor.

Parameters
- flavor (str, default `'postgresql'`): SQL database flavor, e.g. `'postgresql'`, `'sqlite'`. Currently supported flavors: `'postgresql'`, `'postgis'`, `'timescaledb'`, `'timescaledb-ha'`, `'citus'`, `'cockroachdb'`, `'duckdb'`, `'mssql'`, `'mysql'`, `'mariadb'`, `'sqlite'`, `'geopackage'`, `'oracle'`.
- datepart (str, default `'day'`): Which part of the date to modify. Supported values: `'year'`, `'month'`, `'day'`, `'hour'`, `'minute'`, `'second'`.
- number (Union[int, float], default `0`): How many units to add to the date part.
- begin (Union[str, datetime], default `'now'`): Base datetime to which to add dateparts.
- db_type (Optional[str], default None): If provided, cast the datetime string as the type. Otherwise, infer this from the input datetime value.

Returns
- The appropriate `DATEADD` string for the corresponding database flavor.

Examples
>>> dateadd_str(
...     flavor='mssql',
...     begin=datetime(2022, 1, 1, 0, 0),
...     number=1,
... )
"DATEADD(day, 1, CAST('2022-01-01 00:00:00' AS DATETIME2))"
>>> dateadd_str(
...     flavor='postgresql',
...     begin=datetime(2022, 1, 1, 0, 0),
...     number=1,
... )
"CAST('2022-01-01 00:00:00' AS TIMESTAMP) + INTERVAL '1 day'"
863def test_connection( 864 self, 865 **kw: Any 866) -> Union[bool, None]: 867 """ 868 Test if a successful connection to the database may be made. 869 870 Parameters 871 ---------- 872 **kw: 873 The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`. 874 875 Returns 876 ------- 877 `True` if a connection is made, otherwise `False` or `None` in case of failure. 878 879 """ 880 import warnings 881 from meerschaum.connectors.poll import retry_connect 882 _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self} 883 _default_kw.update(kw) 884 with warnings.catch_warnings(): 885 warnings.filterwarnings('ignore', 'Could not') 886 try: 887 return retry_connect(**_default_kw) 888 except Exception: 889 return False
Test if a successful connection to the database may be made.

Parameters
- **kw: The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

Returns
- `True` if a connection is made, otherwise `False` or `None` in case of failure.
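A usage sketch, assuming a configured `sql:main` connector; the keyword arguments override the defaults passed to `retry_connect`:

```python
import meerschaum as mrsm

conn = mrsm.get_connector('sql', 'main')
# Override the default single attempt with two retries, one second apart.
if conn.test_connection(max_retries=2, retry_wait=1):
    print("Connection succeeded.")
else:
    print("Could not connect.")
```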
892def get_distinct_col_count(
893    col: str,
894    query: str,
895    connector: Optional[mrsm.connectors.sql.SQLConnector] = None,
896    debug: bool = False
897) -> Optional[int]:
898    """
899    Returns the number of distinct items in a column of a SQL query.
900
901    Parameters
902    ----------
903    col: str
904        The column in the query to count.
905
906    query: str
907        The SQL query to count from.
908
909    connector: Optional[mrsm.connectors.sql.SQLConnector], default None
910        The SQLConnector to execute the query.
911
912    debug: bool, default False
913        Verbosity toggle.
914
915    Returns
916    -------
917    An `int` of the number of distinct values in the column or `None` if the query fails.
918
919    """
920    if connector is None:
921        connector = mrsm.get_connector('sql')
922
923    _col_name = sql_item_name(col, connector.flavor, None)
924
925    _meta_query = (
926        f"""
927        WITH src AS ( {query} ),
928        dist AS ( SELECT DISTINCT {_col_name} FROM src )
929        SELECT COUNT(*) FROM dist"""
930    ) if connector.flavor not in ('mysql', 'mariadb') else (
931        f"""
932        SELECT COUNT(*)
933        FROM (
934            SELECT DISTINCT {_col_name}
935            FROM ({query}) AS src
936        ) AS dist"""
937    )
938
939    result = connector.value(_meta_query, debug=debug)
940    try:
941        return int(result)
942    except Exception:
943        return None
Returns the number of distinct items in a column of a SQL query.

Parameters
- col (str): The column in the query to count.
- query (str): The SQL query to count from.
- connector (Optional[mrsm.connectors.sql.SQLConnector], default None): The SQLConnector to execute the query.
- debug (bool, default False): Verbosity toggle.

Returns
- An `int` of the number of distinct values in the column or `None` if the query fails.
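A usage sketch against a hypothetical `weather` table:

```python
import meerschaum as mrsm
from meerschaum.utils.sql import get_distinct_col_count

conn = mrsm.get_connector('sql', 'main')
num_stations = get_distinct_col_count(
    'station_id',
    "SELECT * FROM weather",  # Hypothetical table.
    connector=conn,
)
print(num_stations)  # An `int`, or `None` if the query fails.
```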
946def sql_item_name(item: str, flavor: str, schema: Optional[str] = None) -> str:
947    """
948    Parse SQL items depending on the flavor.
949
950    Parameters
951    ----------
952    item: str
953        The database item (table, view, etc.) in need of quotes.
954
955    flavor: str
956        The database flavor (`'postgresql'`, `'mssql'`, `'sqlite'`, etc.).
957
958    schema: Optional[str], default None
959        If provided, prefix the table name with the schema.
960
961    Returns
962    -------
963    A `str` which contains the input `item` wrapped in the corresponding escape characters.
964
965    Examples
966    --------
967    >>> sql_item_name('table', 'sqlite')
968    '"table"'
969    >>> sql_item_name('table', 'mssql')
970    "[table]"
971    >>> sql_item_name('table', 'postgresql', schema='abc')
972    '"abc"."table"'
973
974    """
975    truncated_item = truncate_item_name(str(item), flavor)
976    if flavor == 'oracle':
977        truncated_item = pg_capital(truncated_item, quote_capitals=True)
978        ### NOTE: System-reserved words must be quoted.
979        if truncated_item.lower() in (
980            'float', 'varchar', 'nvarchar', 'clob',
981            'boolean', 'integer', 'table', 'row', 'date',
982        ):
983            wrappers = ('"', '"')
984        else:
985            wrappers = ('', '')
986    else:
987        wrappers = table_wrappers.get(flavor, table_wrappers['default'])
988
989    ### NOTE: SQLite does not support schemas.
990    if flavor in ('sqlite', 'geopackage'):
991        schema = None
992    elif flavor == 'mssql' and str(item).startswith('#'):
993        schema = None
994
995    schema_prefix = (
996        (wrappers[0] + schema + wrappers[1] + '.')
997        if schema is not None
998        else ''
999    )
1000
1001    return schema_prefix + wrappers[0] + truncated_item + wrappers[1]
Parse SQL items depending on the flavor.

Parameters
- item (str): The database item (table, view, etc.) in need of quotes.
- flavor (str): The database flavor (`'postgresql'`, `'mssql'`, `'sqlite'`, etc.).
- schema (Optional[str], default None): If provided, prefix the table name with the schema.

Returns
- A `str` which contains the input `item` wrapped in the corresponding escape characters.

Examples
>>> sql_item_name('table', 'sqlite')
'"table"'
>>> sql_item_name('table', 'mssql')
"[table]"
>>> sql_item_name('table', 'postgresql', schema='abc')
'"abc"."table"'
1004def pg_capital(s: str, quote_capitals: bool = True) -> str:
1005    """
1006    If string contains a capital letter, wrap it in double quotes.
1007
1008    Parameters
1009    ----------
1010    s: str
1011        The string to be escaped.
1012
1013    quote_capitals: bool, default True
1014        If `False`, do not quote strings which contain only a mix of capital and lower-case letters.
1015
1016    Returns
1017    -------
1018    The input string wrapped in quotes only if it needs them.
1019
1020    Examples
1021    --------
1022    >>> pg_capital("My Table")
1023    '"My Table"'
1024    >>> pg_capital('my_table')
1025    'my_table'
1026
1027    """
1028    if s.startswith('"') and s.endswith('"'):
1029        return s
1030
1031    s = s.replace('"', '')
1032
1033    needs_quotes = s.startswith('_')
1034    if not needs_quotes:
1035        for c in s:
1036            if c == '_':
1037                continue
1038
1039            if not c.isalnum() or (quote_capitals and c.isupper()):
1040                needs_quotes = True
1041                break
1042
1043    if needs_quotes:
1044        return '"' + s + '"'
1045
1046    return s
If string contains a capital letter, wrap it in double quotes.

Parameters
- s (str): The string to be escaped.
- quote_capitals (bool, default True): If `False`, do not quote strings which contain only a mix of capital and lower-case letters.

Returns
- The input string wrapped in quotes only if it needs them.

Examples
>>> pg_capital("My Table")
'"My Table"'
>>> pg_capital('my_table')
'my_table'
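The `quote_capitals=False` case isn't shown in the examples above; a small sketch of how it changes the behavior:

```python
from meerschaum.utils.sql import pg_capital

# With quote_capitals=False, capital letters alone do not force quoting...
print(pg_capital('MyTable', quote_capitals=False))
# 'MyTable'

# ...but non-alphanumeric characters and leading underscores still do.
print(pg_capital('My Table', quote_capitals=False))
# '"My Table"'
print(pg_capital('_my_table', quote_capitals=False))
# '"_my_table"'
```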
1049def oracle_capital(s: str) -> str: 1050 """ 1051 Capitalize the string of an item on an Oracle database. 1052 """ 1053 return s
Capitalize the string of an item on an Oracle database.
1056def truncate_item_name(item: str, flavor: str) -> str: 1057 """ 1058 Truncate item names to stay within the database flavor's character limit. 1059 1060 Parameters 1061 ---------- 1062 item: str 1063 The database item being referenced. This string is the "canonical" name internally. 1064 1065 flavor: str 1066 The flavor of the database on which `item` resides. 1067 1068 Returns 1069 ------- 1070 The truncated string. 1071 """ 1072 from meerschaum.utils.misc import truncate_string_sections 1073 return truncate_string_sections( 1074 item, max_len=max_name_lens.get(flavor, max_name_lens['default']) 1075 )
Truncate item names to stay within the database flavor's character limit.

Parameters
- item (str): The database item being referenced. This string is the "canonical" name internally.
- flavor (str): The flavor of the database on which `item` resides.

Returns
- The truncated string.
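A short sketch, assuming the default `max_name_lens` entry for PostgreSQL reflects its 63-character identifier limit:

```python
from meerschaum.utils.sql import truncate_item_name

# Names within the flavor's limit pass through unchanged.
print(truncate_item_name('weather', 'postgresql'))
# 'weather'

# Very long names are shortened section-by-section to fit the limit.
long_name = 'x' * 200
print(len(truncate_item_name(long_name, 'postgresql')))
# Expected to be at most 63.
```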
1078def build_where( 1079 params: Dict[str, Any], 1080 connector: Optional[mrsm.connectors.sql.SQLConnector] = None, 1081 with_where: bool = True, 1082 flavor: str = 'postgresql', 1083) -> str: 1084 """ 1085 Build the `WHERE` clause based on the input criteria. 1086 1087 Parameters 1088 ---------- 1089 params: Dict[str, Any]: 1090 The keywords dictionary to convert into a WHERE clause. 1091 If a value is a string which begins with an underscore, negate that value 1092 (e.g. `!=` instead of `=` or `NOT IN` instead of `IN`). 1093 A value of `_None` will be interpreted as `IS NOT NULL`. 1094 1095 connector: Optional[meerschaum.connectors.sql.SQLConnector], default None: 1096 The Meerschaum SQLConnector that will be executing the query. 1097 The connector is used to extract the SQL dialect. 1098 1099 with_where: bool, default True: 1100 If `True`, include the leading `'WHERE'` string. 1101 1102 flavor: str, default 'postgresql' 1103 If `connector` is `None`, fall back to this flavor. 1104 1105 Returns 1106 ------- 1107 A `str` of the `WHERE` clause from the input `params` dictionary for the connector's flavor. 1108 1109 Examples 1110 -------- 1111 ``` 1112 >>> print(build_where({'foo': [1, 2, 3]})) 1113 1114 WHERE 1115 "foo" IN ('1', '2', '3') 1116 ``` 1117 """ 1118 import json 1119 from meerschaum._internal.static import STATIC_CONFIG 1120 from meerschaum.utils.warnings import warn 1121 from meerschaum.utils.dtypes import value_is_null, none_if_null 1122 negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix'] 1123 try: 1124 params_json = json.dumps(params) 1125 except Exception: 1126 params_json = str(params) 1127 bad_words = ['drop ', '--', ';'] 1128 for word in bad_words: 1129 if word in params_json.lower(): 1130 warn("Aborting build_where() due to possible SQL injection.") 1131 return '' 1132 1133 query_flavor = getattr(connector, 'flavor', flavor) if connector is not None else flavor 1134 where = "" 1135 leading_and = "\n AND " 1136 for key, value in params.items(): 1137 _key = sql_item_name(key, query_flavor, None) 1138 ### search across a list (i.e. 
IN syntax) 1139 if isinstance(value, Iterable) and not isinstance(value, (dict, str)): 1140 includes = [ 1141 none_if_null(item) 1142 for item in value 1143 if not str(item).startswith(negation_prefix) 1144 ] 1145 null_includes = [item for item in includes if item is None] 1146 not_null_includes = [item for item in includes if item is not None] 1147 excludes = [ 1148 none_if_null(str(item)[len(negation_prefix):]) 1149 for item in value 1150 if str(item).startswith(negation_prefix) 1151 ] 1152 null_excludes = [item for item in excludes if item is None] 1153 not_null_excludes = [item for item in excludes if item is not None] 1154 1155 if includes: 1156 where += f"{leading_and}(" 1157 if not_null_includes: 1158 where += f"{_key} IN (" 1159 for item in not_null_includes: 1160 quoted_item = str(item).replace("'", "''") 1161 where += f"'{quoted_item}', " 1162 where = where[:-2] + ")" 1163 if null_includes: 1164 where += ("\n OR " if not_null_includes else "") + f"{_key} IS NULL" 1165 if includes: 1166 where += ")" 1167 1168 if excludes: 1169 where += f"{leading_and}(" 1170 if not_null_excludes: 1171 where += f"{_key} NOT IN (" 1172 for item in not_null_excludes: 1173 quoted_item = str(item).replace("'", "''") 1174 where += f"'{quoted_item}', " 1175 where = where[:-2] + ")" 1176 if null_excludes: 1177 where += ("\n AND " if not_null_excludes else "") + f"{_key} IS NOT NULL" 1178 if excludes: 1179 where += ")" 1180 1181 continue 1182 1183 ### search a dictionary 1184 elif isinstance(value, dict): 1185 import json 1186 where += (f"{leading_and}CAST({_key} AS TEXT) = '" + json.dumps(value) + "'") 1187 continue 1188 1189 eq_sign = '=' 1190 is_null = 'IS NULL' 1191 if value_is_null(str(value).lstrip(negation_prefix)): 1192 value = ( 1193 (negation_prefix + 'None') 1194 if str(value).startswith(negation_prefix) 1195 else None 1196 ) 1197 if str(value).startswith(negation_prefix): 1198 value = str(value)[len(negation_prefix):] 1199 eq_sign = '!=' 1200 if value_is_null(value): 1201 value = None 1202 is_null = 'IS NOT NULL' 1203 quoted_value = str(value).replace("'", "''") 1204 where += ( 1205 f"{leading_and}{_key} " 1206 + (is_null if value is None else f"{eq_sign} '{quoted_value}'") 1207 ) 1208 1209 if len(where) > 1: 1210 where = ("\nWHERE\n " if with_where else '') + where[len(leading_and):] 1211 return where
Build the `WHERE` clause based on the input criteria.

Parameters
- params (Dict[str, Any]): The keywords dictionary to convert into a WHERE clause. If a value is a string which begins with an underscore, negate that value (e.g. `!=` instead of `=` or `NOT IN` instead of `IN`). A value of `_None` will be interpreted as `IS NOT NULL`.
- connector (Optional[meerschaum.connectors.sql.SQLConnector], default None): The Meerschaum SQLConnector that will be executing the query. The connector is used to extract the SQL dialect.
- with_where (bool, default True): If `True`, include the leading `'WHERE'` string.
- flavor (str, default 'postgresql'): If `connector` is `None`, fall back to this flavor.

Returns
- A `str` of the `WHERE` clause from the input `params` dictionary for the connector's flavor.

Examples
>>> print(build_where({'foo': [1, 2, 3]}))

WHERE
    "foo" IN ('1', '2', '3')
1214def table_exists( 1215 table: str, 1216 connector: mrsm.connectors.sql.SQLConnector, 1217 schema: Optional[str] = None, 1218 debug: bool = False, 1219) -> bool: 1220 """Check if a table exists. 1221 1222 Parameters 1223 ---------- 1224 table: str: 1225 The name of the table in question. 1226 1227 connector: mrsm.connectors.sql.SQLConnector 1228 The connector to the database which holds the table. 1229 1230 schema: Optional[str], default None 1231 Optionally specify the table schema. 1232 Defaults to `connector.schema`. 1233 1234 debug: bool, default False : 1235 Verbosity toggle. 1236 1237 Returns 1238 ------- 1239 A `bool` indicating whether or not the table exists on the database. 1240 """ 1241 sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False) 1242 schema = schema or connector.schema 1243 insp = sqlalchemy.inspect(connector.engine) 1244 truncated_table_name = truncate_item_name(str(table), connector.flavor) 1245 return insp.has_table(truncated_table_name, schema=schema)
Check if a table exists.

Parameters
- table (str): The name of the table in question.
- connector (mrsm.connectors.sql.SQLConnector): The connector to the database which holds the table.
- schema (Optional[str], default None): Optionally specify the table schema. Defaults to `connector.schema`.
- debug (bool, default False): Verbosity toggle.

Returns
- A `bool` indicating whether or not the table exists on the database.
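A usage sketch, assuming a configured `sql:main` connector and a hypothetical `weather` table:

```python
import meerschaum as mrsm
from meerschaum.utils.sql import table_exists

conn = mrsm.get_connector('sql', 'main')
if table_exists('weather', conn, schema='public'):
    print("The 'weather' table exists.")
```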
1248def get_sqlalchemy_table( 1249 table: str, 1250 connector: Optional[mrsm.connectors.sql.SQLConnector] = None, 1251 schema: Optional[str] = None, 1252 refresh: bool = False, 1253 debug: bool = False, 1254) -> Union['sqlalchemy.Table', None]: 1255 """ 1256 Construct a SQLAlchemy table from its name. 1257 1258 Parameters 1259 ---------- 1260 table: str 1261 The name of the table on the database. Does not need to be escaped. 1262 1263 connector: Optional[meerschaum.connectors.sql.SQLConnector], default None: 1264 The connector to the database which holds the table. 1265 1266 schema: Optional[str], default None 1267 Specify on which schema the table resides. 1268 Defaults to the schema set in `connector`. 1269 1270 refresh: bool, default False 1271 If `True`, rebuild the cached table object. 1272 1273 debug: bool, default False: 1274 Verbosity toggle. 1275 1276 Returns 1277 ------- 1278 A `sqlalchemy.Table` object for the table. 1279 1280 """ 1281 if connector is None: 1282 from meerschaum import get_connector 1283 connector = get_connector('sql') 1284 1285 if connector.flavor == 'duckdb': 1286 return None 1287 1288 from meerschaum.connectors.sql.tables import get_tables 1289 from meerschaum.utils.packages import attempt_import 1290 from meerschaum.utils.warnings import warn 1291 if refresh: 1292 connector.metadata.clear() 1293 tables = get_tables(mrsm_instance=connector, debug=debug, create=False) 1294 sqlalchemy = attempt_import('sqlalchemy', lazy=False) 1295 truncated_table_name = truncate_item_name(str(table), connector.flavor) 1296 table_kwargs = { 1297 'autoload_with': connector.engine, 1298 } 1299 if schema: 1300 table_kwargs['schema'] = schema 1301 1302 if refresh or truncated_table_name not in tables: 1303 try: 1304 tables[truncated_table_name] = sqlalchemy.Table( 1305 truncated_table_name, 1306 connector.metadata, 1307 **table_kwargs 1308 ) 1309 except sqlalchemy.exc.NoSuchTableError: 1310 warn(f"Table '{truncated_table_name}' does not exist in '{connector}'.") 1311 return None 1312 return tables[truncated_table_name]
Construct a SQLAlchemy table from its name.

Parameters
- table (str): The name of the table on the database. Does not need to be escaped.
- connector (Optional[meerschaum.connectors.sql.SQLConnector], default None): The connector to the database which holds the table.
- schema (Optional[str], default None): Specify on which schema the table resides. Defaults to the schema set in `connector`.
- refresh (bool, default False): If `True`, rebuild the cached table object.
- debug (bool, default False): Verbosity toggle.

Returns
- A `sqlalchemy.Table` object for the table.
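A usage sketch, again assuming a configured `sql:main` connector and a hypothetical `weather` table:

```python
import meerschaum as mrsm
from meerschaum.utils.sql import get_sqlalchemy_table

conn = mrsm.get_connector('sql', 'main')
# `refresh=True` rebuilds the cached table object.
table = get_sqlalchemy_table('weather', connector=conn, refresh=True)
if table is not None:
    print([col.name for col in table.columns])
```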
1315def get_table_cols_types(
1316    table: str,
1317    connectable: Union[
1318        'mrsm.connectors.sql.SQLConnector',
1319        'sqlalchemy.orm.session.Session',
1320        'sqlalchemy.engine.base.Engine'
1321    ],
1322    flavor: Optional[str] = None,
1323    schema: Optional[str] = None,
1324    database: Optional[str] = None,
1325    debug: bool = False,
1326) -> Dict[str, str]:
1327    """
1328    Return a dictionary mapping a table's columns to data types.
1329    This is useful for inspecting tables created during a not-yet-committed session.
1330
1331    NOTE: This may return incorrect columns if the schema is not explicitly stated.
1332    Use this function if you are confident the table name is unique or if you have
1333    an explicit schema.
1334    To use the configured schema, get the columns from `get_sqlalchemy_table()` instead.
1335
1336    Parameters
1337    ----------
1338    table: str
1339        The name of the table (unquoted).
1340
1341    connectable: Union[
1342        'mrsm.connectors.sql.SQLConnector',
1343        'sqlalchemy.orm.session.Session',
1344        'sqlalchemy.engine.base.Engine'
1345    ]
1346        The connection object used to fetch the columns and types.
1347
1348    flavor: Optional[str], default None
1349        The database dialect flavor to use for the query.
1350        If omitted, default to `connectable.flavor`.
1351
1352    schema: Optional[str], default None
1353        If provided, restrict the query to this schema.
1354
1355    database: Optional[str], default None
1356        If provided, restrict the query to this database.
1357
1358    Returns
1359    -------
1360    A dictionary mapping column names to data types.
1361    """
1362    import textwrap
1363    from meerschaum.connectors import SQLConnector
1364    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
1365    flavor = flavor or getattr(connectable, 'flavor', None)
1366    if not flavor:
1367        raise ValueError("Please provide a database flavor.")
1368    if flavor == 'duckdb' and not isinstance(connectable, SQLConnector):
1369        raise ValueError("You must provide a SQLConnector when using DuckDB.")
1370    if flavor in NO_SCHEMA_FLAVORS:
1371        schema = None
1372    if schema is None:
1373        schema = DEFAULT_SCHEMA_FLAVORS.get(flavor, None)
1374    if flavor in ('sqlite', 'duckdb', 'oracle', 'geopackage'):
1375        database = None
1376    table_trunc = truncate_item_name(table, flavor=flavor)
1377    table_lower = table.lower()
1378    table_upper = table.upper()
1379    table_lower_trunc = truncate_item_name(table_lower, flavor=flavor)
1380    table_upper_trunc = truncate_item_name(table_upper, flavor=flavor)
1381    db_prefix = (
1382        "tempdb."
1383 if flavor == 'mssql' and table.startswith('#') 1384 else "" 1385 ) 1386 1387 cols_types_query = sqlalchemy.text( 1388 textwrap.dedent(columns_types_queries.get( 1389 flavor, 1390 columns_types_queries['default'] 1391 ).format( 1392 table=table, 1393 table_trunc=table_trunc, 1394 table_lower=table_lower, 1395 table_lower_trunc=table_lower_trunc, 1396 table_upper=table_upper, 1397 table_upper_trunc=table_upper_trunc, 1398 db_prefix=db_prefix, 1399 )).lstrip().rstrip() 1400 ) 1401 1402 cols = ['database', 'schema', 'table', 'column', 'type', 'numeric_precision', 'numeric_scale'] 1403 result_cols_ix = dict(enumerate(cols)) 1404 1405 debug_kwargs = {'debug': debug} if isinstance(connectable, SQLConnector) else {} 1406 if not debug_kwargs and debug: 1407 dprint(cols_types_query) 1408 1409 try: 1410 result_rows = ( 1411 [ 1412 row 1413 for row in connectable.execute(cols_types_query, **debug_kwargs).fetchall() 1414 ] 1415 if flavor != 'duckdb' 1416 else [ 1417 tuple([doc[col] for col in cols]) 1418 for doc in connectable.read(cols_types_query, debug=debug).to_dict(orient='records') 1419 ] 1420 ) 1421 cols_types_docs = [ 1422 { 1423 result_cols_ix[i]: val 1424 for i, val in enumerate(row) 1425 } 1426 for row in result_rows 1427 ] 1428 cols_types_docs_filtered = [ 1429 doc 1430 for doc in cols_types_docs 1431 if ( 1432 ( 1433 not schema 1434 or doc['schema'] == schema 1435 ) 1436 and 1437 ( 1438 not database 1439 or doc['database'] == database 1440 ) 1441 ) 1442 ] 1443 1444 ### NOTE: This may return incorrect columns if the schema is not explicitly stated. 1445 if cols_types_docs and not cols_types_docs_filtered: 1446 cols_types_docs_filtered = cols_types_docs 1447 1448 ### NOTE: Check for PostGIS GEOMETRY columns. 1449 geometry_cols_types = {} 1450 user_defined_cols = [ 1451 doc 1452 for doc in cols_types_docs_filtered 1453 if str(doc.get('type', None)).upper() == 'USER-DEFINED' 1454 ] 1455 if user_defined_cols: 1456 geometry_cols_types.update( 1457 get_postgis_geo_columns_types( 1458 connectable, 1459 table, 1460 schema=schema, 1461 debug=debug, 1462 ) 1463 ) 1464 1465 cols_types = { 1466 ( 1467 doc['column'] 1468 if flavor != 'oracle' else ( 1469 ( 1470 doc['column'].lower() 1471 if (doc['column'].isupper() and doc['column'].replace('_', '').isalpha()) 1472 else doc['column'] 1473 ) 1474 ) 1475 ): doc['type'].upper() + ( 1476 f'({precision},{scale})' 1477 if ( 1478 (precision := doc.get('numeric_precision', None)) 1479 and 1480 (scale := doc.get('numeric_scale', None)) 1481 ) 1482 else '' 1483 ) 1484 for doc in cols_types_docs_filtered 1485 } 1486 cols_types.update(geometry_cols_types) 1487 return cols_types 1488 except Exception as e: 1489 warn(f"Failed to fetch columns for table '{table}':\n{e}") 1490 return {}
Return a dictionary mapping a table's columns to data types. This is useful for inspecting tables created during a not-yet-committed session.

NOTE: This may return incorrect columns if the schema is not explicitly stated. Use this function if you are confident the table name is unique or if you have an explicit schema. To use the configured schema, get the columns from `get_sqlalchemy_table()` instead.

Parameters
- table (str): The name of the table (unquoted).
- connectable (Union['mrsm.connectors.sql.SQLConnector', 'sqlalchemy.orm.session.Session', 'sqlalchemy.engine.base.Engine']): The connection object used to fetch the columns and types.
- flavor (Optional[str], default None): The database dialect flavor to use for the query. If omitted, default to `connectable.flavor`.
- schema (Optional[str], default None): If provided, restrict the query to this schema.
- database (Optional[str], default None): If provided, restrict the query to this database.

Returns
- A dictionary mapping column names to data types.
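A usage sketch with a hypothetical `weather` table; the exact type strings depend on the flavor:

```python
import meerschaum as mrsm
from meerschaum.utils.sql import get_table_cols_types

conn = mrsm.get_connector('sql', 'main')
cols_types = get_table_cols_types('weather', conn, schema='public')
print(cols_types)
# e.g. {'station_id': 'INTEGER', 'timestamp': 'TIMESTAMP', 'temp_f': 'DOUBLE PRECISION'}
```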
1493def get_table_cols_indices(
1494    table: str,
1495    connectable: Union[
1496        'mrsm.connectors.sql.SQLConnector',
1497        'sqlalchemy.orm.session.Session',
1498        'sqlalchemy.engine.base.Engine'
1499    ],
1500    flavor: Optional[str] = None,
1501    schema: Optional[str] = None,
1502    database: Optional[str] = None,
1503    debug: bool = False,
1504) -> Dict[str, List[str]]:
1505    """
1506    Return a dictionary mapping a table's columns to lists of indices.
1507    This is useful for inspecting tables created during a not-yet-committed session.
1508
1509    NOTE: This may return incorrect columns if the schema is not explicitly stated.
1510    Use this function if you are confident the table name is unique or if you have
1511    an explicit schema.
1512    To use the configured schema, get the columns from `get_sqlalchemy_table()` instead.
1513
1514    Parameters
1515    ----------
1516    table: str
1517        The name of the table (unquoted).
1518
1519    connectable: Union[
1520        'mrsm.connectors.sql.SQLConnector',
1521        'sqlalchemy.orm.session.Session',
1522        'sqlalchemy.engine.base.Engine'
1523    ]
1524        The connection object used to fetch the columns and types.
1525
1526    flavor: Optional[str], default None
1527        The database dialect flavor to use for the query.
1528        If omitted, default to `connectable.flavor`.
1529
1530    schema: Optional[str], default None
1531        If provided, restrict the query to this schema.
1532
1533    database: Optional[str], default None
1534        If provided, restrict the query to this database.
1535
1536    Returns
1537    -------
1538    A dictionary mapping column names to a list of indices.
1539    """
1540    import textwrap
1541    from collections import defaultdict
1542    from meerschaum.connectors import SQLConnector
1543    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
1544    flavor = flavor or getattr(connectable, 'flavor', None)
1545    if not flavor:
1546        raise ValueError("Please provide a database flavor.")
1547    if flavor == 'duckdb' and not isinstance(connectable, SQLConnector):
1548        raise ValueError("You must provide a SQLConnector when using DuckDB.")
1549    if flavor in NO_SCHEMA_FLAVORS:
1550        schema = None
1551    if schema is None:
1552        schema = DEFAULT_SCHEMA_FLAVORS.get(flavor, None)
1553    if flavor in ('sqlite', 'duckdb', 'oracle', 'geopackage'):
1554        database = None
1555    table_trunc = truncate_item_name(table, flavor=flavor)
1556    table_lower = table.lower()
1557    table_upper = table.upper()
1558    table_lower_trunc = truncate_item_name(table_lower, flavor=flavor)
1559    table_upper_trunc = truncate_item_name(table_upper, flavor=flavor)
1560    db_prefix = (
1561        "tempdb."
1562 if flavor == 'mssql' and table.startswith('#') 1563 else "" 1564 ) 1565 1566 cols_indices_query = sqlalchemy.text( 1567 textwrap.dedent(columns_indices_queries.get( 1568 flavor, 1569 columns_indices_queries['default'] 1570 ).format( 1571 table=table, 1572 table_trunc=table_trunc, 1573 table_lower=table_lower, 1574 table_lower_trunc=table_lower_trunc, 1575 table_upper=table_upper, 1576 table_upper_trunc=table_upper_trunc, 1577 db_prefix=db_prefix, 1578 schema=schema, 1579 )).lstrip().rstrip() 1580 ) 1581 1582 cols = ['database', 'schema', 'table', 'column', 'index', 'index_type'] 1583 if flavor == 'mssql': 1584 cols.append('clustered') 1585 result_cols_ix = dict(enumerate(cols)) 1586 1587 debug_kwargs = {'debug': debug} if isinstance(connectable, SQLConnector) else {} 1588 if not debug_kwargs and debug: 1589 dprint(cols_indices_query) 1590 1591 try: 1592 result_rows = ( 1593 [ 1594 row 1595 for row in connectable.execute(cols_indices_query, **debug_kwargs).fetchall() 1596 ] 1597 if flavor != 'duckdb' 1598 else [ 1599 tuple([doc[col] for col in cols]) 1600 for doc in connectable.read(cols_indices_query, debug=debug).to_dict(orient='records') 1601 ] 1602 ) 1603 cols_types_docs = [ 1604 { 1605 result_cols_ix[i]: val 1606 for i, val in enumerate(row) 1607 } 1608 for row in result_rows 1609 ] 1610 cols_types_docs_filtered = [ 1611 doc 1612 for doc in cols_types_docs 1613 if ( 1614 ( 1615 not schema 1616 or doc['schema'] == schema 1617 ) 1618 and 1619 ( 1620 not database 1621 or doc['database'] == database 1622 ) 1623 ) 1624 ] 1625 ### NOTE: This may return incorrect columns if the schema is not explicitly stated. 1626 if cols_types_docs and not cols_types_docs_filtered: 1627 cols_types_docs_filtered = cols_types_docs 1628 1629 cols_indices = defaultdict(lambda: []) 1630 for doc in cols_types_docs_filtered: 1631 col = ( 1632 doc['column'] 1633 if flavor != 'oracle' 1634 else ( 1635 doc['column'].lower() 1636 if (doc['column'].isupper() and doc['column'].replace('_', '').isalpha()) 1637 else doc['column'] 1638 ) 1639 ) 1640 index_doc = { 1641 'name': doc.get('index', None), 1642 'type': doc.get('index_type', None) 1643 } 1644 if flavor == 'mssql': 1645 index_doc['clustered'] = doc.get('clustered', None) 1646 cols_indices[col].append(index_doc) 1647 1648 return dict(cols_indices) 1649 except Exception as e: 1650 warn(f"Failed to fetch columns for table '{table}':\n{e}") 1651 return {}
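Below is a minimal usage sketch. The connector label `sql:main` and the `weather` table are hypothetical; any configured `SQLConnector` (or a SQLAlchemy session or engine) may be passed as the connectable.

```python
import meerschaum as mrsm
from meerschaum.utils.sql import get_table_cols_indices

conn = mrsm.get_connector('sql:main')  # hypothetical connector label

# Map each column of the (hypothetical) 'weather' table to its covering indices.
cols_indices = get_table_cols_indices('weather', conn, schema='public')
# Shape of the result (values depend on your database), e.g.:
# {'datetime': [{'name': 'IX_weather_datetime', 'type': 'btree'}]}
```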
1654def get_update_queries(
1655    target: str,
1656    patch: str,
1657    connectable: Union[
1658        mrsm.connectors.sql.SQLConnector,
1659        'sqlalchemy.orm.session.Session'
1660    ],
1661    join_cols: Iterable[str],
1662    flavor: Optional[str] = None,
1663    upsert: bool = False,
1664    datetime_col: Optional[str] = None,
1665    schema: Optional[str] = None,
1666    patch_schema: Optional[str] = None,
1667    target_cols_types: Optional[Dict[str, str]] = None,
1668    patch_cols_types: Optional[Dict[str, str]] = None,
1669    identity_insert: bool = False,
1670    null_indices: bool = True,
1671    cast_columns: bool = True,
1672    debug: bool = False,
1673) -> List[str]:
1674    """
1675    Build a list of `MERGE`, `UPDATE`, `DELETE`/`INSERT` queries to apply a patch to a target table.
1676
1677    Parameters
1678    ----------
1679    target: str
1680        The name of the target table.
1681
1682    patch: str
1683        The name of the patch table. This should have the same shape as the target.
1684
1685    connectable: Union[meerschaum.connectors.sql.SQLConnector, sqlalchemy.orm.session.Session]
1686        The `SQLConnector` or SQLAlchemy session which will later execute the queries.
1687
1688    join_cols: List[str]
1689        The columns to use to join the patch to the target.
1690
1691    flavor: Optional[str], default None
1692        If using a SQLAlchemy session, provide the expected database flavor.
1693
1694    upsert: bool, default False
1695        If `True`, return an upsert query rather than an update.
1696
1697    datetime_col: Optional[str], default None
1698        If provided, bound the join query using this column as the datetime index.
1699        This must be present on both tables.
1700
1701    schema: Optional[str], default None
1702        If provided, use this schema when quoting the target table.
1703        Defaults to `connector.schema`.
1704
1705    patch_schema: Optional[str], default None
1706        If provided, use this schema when quoting the patch table.
1707        Defaults to `schema`.
1708
1709    target_cols_types: Optional[Dict[str, str]], default None
1710        If provided, use these as the columns-types dictionary for the target table.
1711        Default will infer from the database context.
1712
1713    patch_cols_types: Optional[Dict[str, str]], default None
1714        If provided, use these as the columns-types dictionary for the patch table.
1715        Default will infer from the database context.
1716
1717    identity_insert: bool, default False
1718        If `True`, include `SET IDENTITY_INSERT` queries before and after the update queries.
1719        Only applies for MSSQL upserts.
1720
1721    null_indices: bool, default True
1722        If `False`, do not coalesce index columns before joining.
1723
1724    cast_columns: bool, default True
1725        If `False`, do not cast update columns to the target table types.
1726
1727    debug: bool, default False
1728        Verbosity toggle.
1729
1730    Returns
1731    -------
1732    A list of query strings to perform the update operation.
1733 """ 1734 import textwrap 1735 from meerschaum.connectors import SQLConnector 1736 from meerschaum.utils.debug import dprint 1737 from meerschaum.utils.dtypes import are_dtypes_equal 1738 from meerschaum.utils.dtypes.sql import DB_FLAVORS_CAST_DTYPES, get_pd_type_from_db_type 1739 flavor = flavor or getattr(connectable, 'flavor', None) 1740 if not flavor: 1741 raise ValueError("Provide a flavor if using a SQLAlchemy session.") 1742 if ( 1743 flavor in ('sqlite', 'geopackage') 1744 and isinstance(connectable, SQLConnector) 1745 and connectable.db_version < '3.33.0' 1746 ): 1747 flavor = 'sqlite_delete_insert' 1748 flavor_key = (f'{flavor}-upsert' if upsert else flavor) 1749 base_queries = UPDATE_QUERIES.get( 1750 flavor_key, 1751 UPDATE_QUERIES['default'] 1752 ) 1753 if not isinstance(base_queries, list): 1754 base_queries = [base_queries] 1755 schema = schema or (connectable.schema if isinstance(connectable, SQLConnector) else None) 1756 patch_schema = patch_schema or schema 1757 target_table_columns = get_table_cols_types( 1758 target, 1759 connectable, 1760 flavor=flavor, 1761 schema=schema, 1762 debug=debug, 1763 ) if not target_cols_types else target_cols_types 1764 patch_table_columns = get_table_cols_types( 1765 patch, 1766 connectable, 1767 flavor=flavor, 1768 schema=patch_schema, 1769 debug=debug, 1770 ) if not patch_cols_types else patch_cols_types 1771 1772 patch_cols_str = ', '.join( 1773 [ 1774 sql_item_name(col, flavor) 1775 for col in patch_table_columns 1776 ] 1777 ) 1778 patch_cols_prefixed_str = ', '.join( 1779 [ 1780 'p.' + sql_item_name(col, flavor) 1781 for col in patch_table_columns 1782 ] 1783 ) 1784 1785 join_cols_str = ', '.join( 1786 [ 1787 sql_item_name(col, flavor) 1788 for col in join_cols 1789 ] 1790 ) 1791 1792 value_cols = [] 1793 join_cols_types = [] 1794 if debug: 1795 dprint("target_table_columns:") 1796 mrsm.pprint(target_table_columns) 1797 for c_name, c_type in target_table_columns.items(): 1798 if c_name not in patch_table_columns: 1799 continue 1800 if flavor in DB_FLAVORS_CAST_DTYPES: 1801 c_type = DB_FLAVORS_CAST_DTYPES[flavor].get(c_type.upper(), c_type) 1802 ( 1803 join_cols_types 1804 if c_name in join_cols 1805 else value_cols 1806 ).append((c_name, c_type)) 1807 if debug: 1808 dprint(f"value_cols: {value_cols}") 1809 1810 if not join_cols_types: 1811 return [] 1812 if not value_cols and not upsert: 1813 return [] 1814 1815 coalesce_join_cols_str = ', '.join( 1816 [ 1817 ( 1818 ( 1819 'COALESCE(' 1820 + sql_item_name(c_name, flavor) 1821 + ', ' 1822 + get_null_replacement(c_type, flavor) 1823 + ')' 1824 ) 1825 if null_indices 1826 else sql_item_name(c_name, flavor) 1827 ) 1828 for c_name, c_type in join_cols_types 1829 ] 1830 ) 1831 1832 update_or_nothing = ('UPDATE' if value_cols else 'NOTHING') 1833 1834 def sets_subquery(l_prefix: str, r_prefix: str): 1835 if not value_cols: 1836 return '' 1837 1838 utc_value_cols = { 1839 c_name 1840 for c_name, c_type in value_cols 1841 if ('utc' in get_pd_type_from_db_type(c_type).lower()) 1842 } if flavor not in TIMEZONE_NAIVE_FLAVORS else set() 1843 1844 cast_func_cols = { 1845 c_name: ( 1846 ('', '', '') 1847 if not cast_columns or ( 1848 flavor == 'oracle' 1849 and are_dtypes_equal(get_pd_type_from_db_type(c_type), 'bytes') 1850 ) 1851 else ( 1852 ('CAST(', f" AS {c_type.replace('_', ' ')}", ')' + ( 1853 " AT TIME ZONE 'UTC'" 1854 if c_name in utc_value_cols 1855 else '' 1856 )) 1857 if flavor not in ('sqlite', 'geopackage') 1858 else ('', '', '') 1859 ) 1860 ) 1861 for c_name, c_type in value_cols 
1862 } 1863 return 'SET ' + ',\n'.join([ 1864 ( 1865 l_prefix + sql_item_name(c_name, flavor, None) 1866 + ' = ' 1867 + cast_func_cols[c_name][0] 1868 + r_prefix + sql_item_name(c_name, flavor, None) 1869 + cast_func_cols[c_name][1] 1870 + cast_func_cols[c_name][2] 1871 ) for c_name, c_type in value_cols 1872 ]) 1873 1874 def and_subquery(l_prefix: str, r_prefix: str): 1875 return '\n AND\n '.join([ 1876 ( 1877 ( 1878 "COALESCE(" 1879 + l_prefix 1880 + sql_item_name(c_name, flavor, None) 1881 + ", " 1882 + get_null_replacement(c_type, flavor) 1883 + ")" 1884 + '\n =\n ' 1885 + "COALESCE(" 1886 + r_prefix 1887 + sql_item_name(c_name, flavor, None) 1888 + ", " 1889 + get_null_replacement(c_type, flavor) 1890 + ")" 1891 ) 1892 if null_indices 1893 else ( 1894 l_prefix 1895 + sql_item_name(c_name, flavor, None) 1896 + ' = ' 1897 + r_prefix 1898 + sql_item_name(c_name, flavor, None) 1899 ) 1900 ) for c_name, c_type in join_cols_types 1901 ]) 1902 1903 skip_query_val = "" 1904 target_table_name = sql_item_name(target, flavor, schema) 1905 patch_table_name = sql_item_name(patch, flavor, patch_schema) 1906 dt_col_name = sql_item_name(datetime_col, flavor, None) if datetime_col else None 1907 date_bounds_table = patch_table_name if flavor != 'mssql' else '[date_bounds]' 1908 min_dt_col_name = f"MIN({dt_col_name})" if flavor != 'mssql' else '[Min_dt]' 1909 max_dt_col_name = f"MAX({dt_col_name})" if flavor != 'mssql' else '[Max_dt]' 1910 date_bounds_subquery = ( 1911 f"""f.{dt_col_name} >= (SELECT {min_dt_col_name} FROM {date_bounds_table}) 1912 AND 1913 f.{dt_col_name} <= (SELECT {max_dt_col_name} FROM {date_bounds_table})""" 1914 if datetime_col 1915 else "1 = 1" 1916 ) 1917 with_temp_date_bounds = f"""WITH [date_bounds] AS ( 1918 SELECT MIN({dt_col_name}) AS {min_dt_col_name}, MAX({dt_col_name}) AS {max_dt_col_name} 1919 FROM {patch_table_name} 1920 )""" if datetime_col else "" 1921 identity_insert_on = ( 1922 f"SET IDENTITY_INSERT {target_table_name} ON" 1923 if identity_insert 1924 else skip_query_val 1925 ) 1926 identity_insert_off = ( 1927 f"SET IDENTITY_INSERT {target_table_name} OFF" 1928 if identity_insert 1929 else skip_query_val 1930 ) 1931 1932 ### NOTE: MSSQL upserts must exclude the update portion if only upserting indices. 
1933 when_matched_update_sets_subquery_none = "" if not value_cols else ( 1934 "\n WHEN MATCHED THEN\n" 1935 f" UPDATE {sets_subquery('', 'p.')}" 1936 ) 1937 1938 cols_equal_values = '\n,'.join( 1939 [ 1940 f"{sql_item_name(c_name, flavor)} = VALUES({sql_item_name(c_name, flavor)})" 1941 for c_name, c_type in value_cols 1942 ] 1943 ) 1944 on_duplicate_key_update = ( 1945 "ON DUPLICATE KEY UPDATE" 1946 if value_cols 1947 else "" 1948 ) 1949 ignore = "IGNORE " if not value_cols else "" 1950 1951 formatted_queries = [ 1952 textwrap.dedent(base_query.format( 1953 sets_subquery_none=sets_subquery('', 'p.'), 1954 sets_subquery_none_excluded=sets_subquery('', 'EXCLUDED.'), 1955 sets_subquery_f=sets_subquery('f.', 'p.'), 1956 and_subquery_f=and_subquery('p.', 'f.'), 1957 and_subquery_t=and_subquery('p.', 't.'), 1958 target_table_name=target_table_name, 1959 patch_table_name=patch_table_name, 1960 patch_cols_str=patch_cols_str, 1961 patch_cols_prefixed_str=patch_cols_prefixed_str, 1962 date_bounds_subquery=date_bounds_subquery, 1963 join_cols_str=join_cols_str, 1964 coalesce_join_cols_str=coalesce_join_cols_str, 1965 update_or_nothing=update_or_nothing, 1966 when_matched_update_sets_subquery_none=when_matched_update_sets_subquery_none, 1967 cols_equal_values=cols_equal_values, 1968 on_duplicate_key_update=on_duplicate_key_update, 1969 ignore=ignore, 1970 with_temp_date_bounds=with_temp_date_bounds, 1971 identity_insert_on=identity_insert_on, 1972 identity_insert_off=identity_insert_off, 1973 )).lstrip().rstrip() 1974 for base_query in base_queries 1975 ] 1976 1977 ### NOTE: Allow for skipping some queries. 1978 return [query for query in formatted_queries if query]
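A sketch of building and executing upsert queries. The connector label and the `weather` / `weather_patch` tables are hypothetical, and execution via `SQLConnector.exec()` is an assumption; the function itself only returns query strings.

```python
import meerschaum as mrsm
from meerschaum.utils.sql import get_update_queries

conn = mrsm.get_connector('sql:main')  # hypothetical connector label

# Merge rows from the patch table into the target, joining on the index columns.
queries = get_update_queries(
    'weather',          # target table (assumed to exist)
    'weather_patch',    # patch table with the same columns
    conn,
    join_cols=['station_id', 'datetime'],
    upsert=True,
    datetime_col='datetime',
)
for query in queries:
    conn.exec(query)
```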
1981def get_null_replacement(typ: str, flavor: str) -> str:
1982    """
1983    Return a value that may temporarily be used in place of NULL for this type.
1984
1985    Parameters
1986    ----------
1987    typ: str
1988        The database type for which a NULL replacement value is needed.
1989
1990    flavor: str
1991        The database flavor for which this value will be used.
1992
1993    Returns
1994    -------
1995    A value which may stand in place of NULL for this type.
1996    `'None'` is returned if a value cannot be determined.
1997    """
1998    from meerschaum.utils.dtypes.sql import DB_FLAVORS_CAST_DTYPES
1999    if 'geometry' in typ.lower():
2000        return "'010100000000008058346FCDC100008058346FCDC1'"
2001    if 'int' in typ.lower() or typ.lower() in ('numeric', 'number'):
2002        return '-987654321'
2003    if 'bool' in typ.lower() or typ.lower() == 'bit':
2004        bool_typ = (
2005            PD_TO_DB_DTYPES_FLAVORS
2006            .get('bool', {})
2007            .get(flavor, PD_TO_DB_DTYPES_FLAVORS['bool']['default'])
2008        )
2009        if flavor in DB_FLAVORS_CAST_DTYPES:
2010            bool_typ = DB_FLAVORS_CAST_DTYPES[flavor].get(bool_typ, bool_typ)
2011        val_to_cast = (
2012            -987654321
2013            if flavor in ('mysql', 'mariadb')
2014            else 0
2015        )
2016        return f'CAST({val_to_cast} AS {bool_typ})'
2017    if 'time' in typ.lower() or 'date' in typ.lower():
2018        db_type = typ if typ.isupper() else None
2019        return dateadd_str(flavor=flavor, begin='1900-01-01', db_type=db_type)
2020    if 'float' in typ.lower() or 'double' in typ.lower() or typ.lower() in ('decimal',):
2021        return '-987654321.0'
2022    if flavor == 'oracle' and typ.lower().split('(', maxsplit=1)[0] == 'char':
2023        return "'-987654321'"
2024    if flavor == 'oracle' and typ.lower() in ('blob', 'bytes'):
2025        return '00'
2026    if typ.lower() in ('uniqueidentifier', 'guid', 'uuid'):
2027        magic_val = 'DEADBEEF-ABBA-BABE-CAFE-DECAFC0FFEE5'
2028        if flavor == 'mssql':
2029            return f"CAST('{magic_val}' AS UNIQUEIDENTIFIER)"
2030        return f"'{magic_val}'"
2031    return ('n' if flavor == 'oracle' else '') + "'-987654321'"
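For illustration, a few of the sentinel values the branches above produce:

```python
from meerschaum.utils.sql import get_null_replacement

get_null_replacement('INT', 'postgresql')
# '-987654321'

get_null_replacement('UUID', 'postgresql')
# "'DEADBEEF-ABBA-BABE-CAFE-DECAFC0FFEE5'"

get_null_replacement('DATETIME', 'mssql')
# a flavor-specific datetime literal for 1900-01-01, built via dateadd_str()
```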
2034def get_db_version(conn: 'SQLConnector', debug: bool = False) -> Union[str, None]: 2035 """ 2036 Fetch the database version if possible. 2037 """ 2038 version_name = sql_item_name('version', conn.flavor, None) 2039 version_query = version_queries.get( 2040 conn.flavor, 2041 version_queries['default'] 2042 ).format(version_name=version_name) 2043 return conn.value(version_query, debug=debug)
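A quick sketch (the connector label is hypothetical); the returned string is flavor-dependent:

```python
import meerschaum as mrsm
from meerschaum.utils.sql import get_db_version

conn = mrsm.get_connector('sql:main')  # hypothetical connector label
print(get_db_version(conn))
# e.g. 'PostgreSQL 16.2 ...' on PostgreSQL, or the @@version banner on MSSQL
```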
2046def get_rename_table_queries(
2047    old_table: str,
2048    new_table: str,
2049    flavor: str,
2050    schema: Optional[str] = None,
2051) -> List[str]:
2052    """
2053    Return queries to alter a table's name.
2054
2055    Parameters
2056    ----------
2057    old_table: str
2058        The unquoted name of the old table.
2059
2060    new_table: str
2061        The unquoted name of the new table.
2062
2063    flavor: str
2064        The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).
2065
2066    schema: Optional[str], default None
2067        The schema on which the table resides.
2068
2069    Returns
2070    -------
2071    A list of `ALTER TABLE` or equivalent queries for the database flavor.
2072    """
2073    old_table_name = sql_item_name(old_table, flavor, schema)
2074    new_table_name = sql_item_name(new_table, flavor, None)
2075    tmp_table = '_tmp_rename_' + new_table
2076    tmp_table_name = sql_item_name(tmp_table, flavor, schema)
2077    if flavor == 'mssql':
2078        return [f"EXEC sp_rename '{old_table}', '{new_table}'"]
2079
2080    if_exists_str = "IF EXISTS" if flavor in DROP_IF_EXISTS_FLAVORS else ""
2081    if flavor == 'duckdb':
2082        return (
2083            get_create_table_queries(
2084                f"SELECT * FROM {old_table_name}",
2085                tmp_table,
2086                'duckdb',
2087                schema,
2088            ) + get_create_table_queries(
2089                f"SELECT * FROM {tmp_table_name}",
2090                new_table,
2091                'duckdb',
2092                schema,
2093            ) + [
2094                f"DROP TABLE {if_exists_str} {tmp_table_name}",
2095                f"DROP TABLE {if_exists_str} {old_table_name}",
2096            ]
2097        )
2098
2099    return [f"ALTER TABLE {old_table_name} RENAME TO {new_table_name}"]
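For example, the generated queries differ by flavor (table names hypothetical):

```python
from meerschaum.utils.sql import get_rename_table_queries

get_rename_table_queries('old_tbl', 'new_tbl', 'postgresql')
# ['ALTER TABLE "old_tbl" RENAME TO "new_tbl"']

get_rename_table_queries('old_tbl', 'new_tbl', 'mssql')
# ["EXEC sp_rename 'old_tbl', 'new_tbl'"]
```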
2102def get_create_table_query(
2103    query_or_dtypes: Union[str, Dict[str, str]],
2104    new_table: str,
2105    flavor: str,
2106    schema: Optional[str] = None,
2107) -> str:
2108    """
2109    NOTE: This function is deprecated. Use `get_create_table_queries()` instead.
2110
2111    Return a query to create a new table from a `SELECT` query.
2112
2113    Parameters
2114    ----------
2115    query_or_dtypes: Union[str, Dict[str, str]]
2116        The select query to use for the creation of the table.
2117        If a dictionary is provided, return a `CREATE TABLE` query from the given `dtypes` columns.
2118
2119    new_table: str
2120        The unquoted name of the new table.
2121
2122    flavor: str
2123        The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).
2124
2125    schema: Optional[str], default None
2126        The schema on which the table will reside.
2127
2128    Returns
2129    -------
2130    A `CREATE TABLE` (or `SELECT INTO`) query for the database flavor.
2131    """
2132    return get_create_table_queries(
2133        query_or_dtypes,
2134        new_table,
2135        flavor,
2136        schema=schema,
2137        primary_key=None,
2138    )[0]
2141def get_create_table_queries( 2142 query_or_dtypes: Union[str, Dict[str, str]], 2143 new_table: str, 2144 flavor: str, 2145 schema: Optional[str] = None, 2146 primary_key: Optional[str] = None, 2147 primary_key_db_type: Optional[str] = None, 2148 autoincrement: bool = False, 2149 datetime_column: Optional[str] = None, 2150) -> List[str]: 2151 """ 2152 Return a query to create a new table from a `SELECT` query or a `dtypes` dictionary. 2153 2154 Parameters 2155 ---------- 2156 query_or_dtypes: Union[str, Dict[str, str]] 2157 The select query to use for the creation of the table. 2158 If a dictionary is provided, return a `CREATE TABLE` query from the given `dtypes` columns. 2159 2160 new_table: str 2161 The unquoted name of the new table. 2162 2163 flavor: str 2164 The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`). 2165 2166 schema: Optional[str], default None 2167 The schema on which the table will reside. 2168 2169 primary_key: Optional[str], default None 2170 If provided, designate this column as the primary key in the new table. 2171 2172 primary_key_db_type: Optional[str], default None 2173 If provided, alter the primary key to this type (to set NOT NULL constraint). 2174 2175 autoincrement: bool, default False 2176 If `True` and `primary_key` is provided, create the `primary_key` column 2177 as an auto-incrementing integer column. 2178 2179 datetime_column: Optional[str], default None 2180 If provided, include this column in the primary key. 2181 Applicable to TimescaleDB only. 2182 2183 Returns 2184 ------- 2185 A `CREATE TABLE` (or `SELECT INTO`) query for the database flavor. 2186 """ 2187 if not isinstance(query_or_dtypes, (str, dict)): 2188 raise TypeError("`query_or_dtypes` must be a query or a dtypes dictionary.") 2189 2190 method = ( 2191 _get_create_table_query_from_cte 2192 if isinstance(query_or_dtypes, str) 2193 else _get_create_table_query_from_dtypes 2194 ) 2195 return method( 2196 query_or_dtypes, 2197 new_table, 2198 flavor, 2199 schema=schema, 2200 primary_key=primary_key, 2201 primary_key_db_type=primary_key_db_type, 2202 autoincrement=(autoincrement and flavor not in SKIP_AUTO_INCREMENT_FLAVORS), 2203 datetime_column=datetime_column, 2204 )
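A sketch of both input forms for `get_create_table_queries()`. The dtypes path assumes Pandas-style type strings (as mapped by `get_db_type()`); table names are hypothetical:

```python
from meerschaum.utils.sql import get_create_table_queries

# From a dtypes dictionary, with a designated primary key:
queries = get_create_table_queries(
    {'id': 'int', 'dt': 'datetime', 'val': 'float'},
    'my_table',
    'postgresql',
    primary_key='id',
)

# From a SELECT query (emits CREATE TABLE ... AS or SELECT INTO, per flavor):
queries = get_create_table_queries(
    "SELECT 1 AS id, 2.0 AS val",
    'my_table_from_query',
    'mssql',
)
for query in queries:
    print(query)
```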
2453def wrap_query_with_cte( 2454 sub_query: str, 2455 parent_query: str, 2456 flavor: str, 2457 cte_name: str = "src", 2458) -> str: 2459 """ 2460 Wrap a subquery in a CTE and append an encapsulating query. 2461 2462 Parameters 2463 ---------- 2464 sub_query: str 2465 The query to be referenced. This may itself contain CTEs. 2466 Unless `cte_name` is provided, this will be aliased as `src`. 2467 2468 parent_query: str 2469 The larger query to append which references the subquery. 2470 This must not contain CTEs. 2471 2472 flavor: str 2473 The database flavor, e.g. `'mssql'`. 2474 2475 cte_name: str, default 'src' 2476 The CTE alias, defaults to `src`. 2477 2478 Returns 2479 ------- 2480 An encapsulating query which allows you to treat `sub_query` as a temporary table. 2481 2482 Examples 2483 -------- 2484 2485 ```python 2486 from meerschaum.utils.sql import wrap_query_with_cte 2487 sub_query = "WITH foo AS (SELECT 1 AS val) SELECT (val * 2) AS newval FROM foo" 2488 parent_query = "SELECT newval * 3 FROM src" 2489 query = wrap_query_with_cte(sub_query, parent_query, 'mssql') 2490 print(query) 2491 # WITH foo AS (SELECT 1 AS val), 2492 # [src] AS ( 2493 # SELECT (val * 2) AS newval FROM foo 2494 # ) 2495 # SELECT newval * 3 FROM src 2496 ``` 2497 2498 """ 2499 import textwrap 2500 sub_query = sub_query.lstrip() 2501 cte_name_quoted = sql_item_name(cte_name, flavor, None) 2502 2503 if flavor in NO_CTE_FLAVORS: 2504 return ( 2505 parent_query 2506 .replace(cte_name_quoted, '--MRSM_SUBQUERY--') 2507 .replace(cte_name, '--MRSM_SUBQUERY--') 2508 .replace('--MRSM_SUBQUERY--', f"(\n{sub_query}\n) AS {cte_name_quoted}") 2509 ) 2510 2511 if sub_query.lstrip().lower().startswith('with '): 2512 final_select_ix = sub_query.lower().rfind('select') 2513 return ( 2514 sub_query[:final_select_ix].rstrip() + ',\n' 2515 + f"{cte_name_quoted} AS (\n" 2516 + ' ' + sub_query[final_select_ix:] 2517 + "\n)\n" 2518 + parent_query 2519 ) 2520 2521 return ( 2522 f"WITH {cte_name_quoted} AS (\n" 2523 f"{textwrap.indent(sub_query, ' ')}\n" 2524 f")\n{parent_query}" 2525 )
2528def format_cte_subquery(
2529    sub_query: str,
2530    flavor: str,
2531    sub_name: str = 'src',
2532    cols_to_select: Union[List[str], str] = '*',
2533) -> str:
2534    """
2535    Given a subquery, build a wrapper query that selects from the CTE subquery.
2536
2537    Parameters
2538    ----------
2539    sub_query: str
2540        The subquery to wrap.
2541
2542    flavor: str
2543        The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).
2544
2545    sub_name: str, default 'src'
2546        If possible, give this name to the CTE (must be unquoted).
2547
2548    cols_to_select: Union[List[str], str], default '*'
2549        If specified, choose which columns to select from the CTE.
2550        If a list of strings is provided, each item will be quoted and joined with commas.
2551        If a string is given, assume it is quoted and insert it into the query.
2552
2553    Returns
2554    -------
2555    A wrapper query that selects from the CTE.
2556    """
2557    quoted_sub_name = sql_item_name(sub_name, flavor, None)
2558    cols_str = (
2559        cols_to_select
2560        if isinstance(cols_to_select, str)
2561        else ', '.join([sql_item_name(col, flavor, None) for col in cols_to_select])
2562    )
2563    parent_query = (
2564        f"SELECT {cols_str}\n"
2565        f"FROM {quoted_sub_name}"
2566    )
2567    return wrap_query_with_cte(sub_query, parent_query, flavor, cte_name=sub_name)
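A short example of the generated wrapper (quoting shown for `'postgresql'`):

```python
from meerschaum.utils.sql import format_cte_subquery

sub_query = "SELECT 1 AS val, 2 AS other_val"
print(format_cte_subquery(sub_query, 'postgresql', cols_to_select=['val']))
# WITH "src" AS (
#     SELECT 1 AS val, 2 AS other_val
# )
# SELECT "val"
# FROM "src"
```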
2570def session_execute( 2571 session: 'sqlalchemy.orm.session.Session', 2572 queries: Union[List[str], str], 2573 with_results: bool = False, 2574 debug: bool = False, 2575) -> Union[mrsm.SuccessTuple, Tuple[mrsm.SuccessTuple, List['sqlalchemy.sql.ResultProxy']]]: 2576 """ 2577 Similar to `SQLConnector.exec_queries()`, execute a list of queries 2578 and roll back when one fails. 2579 2580 Parameters 2581 ---------- 2582 session: sqlalchemy.orm.session.Session 2583 A SQLAlchemy session representing a transaction. 2584 2585 queries: Union[List[str], str] 2586 A query or list of queries to be executed. 2587 If a query fails, roll back the session. 2588 2589 with_results: bool, default False 2590 If `True`, return a list of result objects. 2591 2592 Returns 2593 ------- 2594 A `SuccessTuple` indicating the queries were successfully executed. 2595 If `with_results`, return the `SuccessTuple` and a list of results. 2596 """ 2597 sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False) 2598 if not isinstance(queries, list): 2599 queries = [queries] 2600 successes, msgs, results = [], [], [] 2601 for query in queries: 2602 if debug: 2603 dprint(query) 2604 query_text = sqlalchemy.text(query) 2605 fail_msg = "Failed to execute queries." 2606 try: 2607 result = session.execute(query_text) 2608 query_success = result is not None 2609 query_msg = "Success" if query_success else fail_msg 2610 except Exception as e: 2611 query_success = False 2612 query_msg = f"{fail_msg}\n{e}" 2613 result = None 2614 successes.append(query_success) 2615 msgs.append(query_msg) 2616 results.append(result) 2617 if not query_success: 2618 if debug: 2619 dprint("Rolling back session.") 2620 session.rollback() 2621 break 2622 success, msg = all(successes), '\n'.join(msgs) 2623 if with_results: 2624 return (success, msg), results 2625 return success, msg
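A sketch of running queries in a transaction and committing only on success. The connector label is hypothetical, and `conn.engine` is assumed to be the connector's underlying SQLAlchemy engine; `session_execute()` itself rolls back if any query fails.

```python
import meerschaum as mrsm
from meerschaum.utils.sql import session_execute

sqlalchemy_orm = mrsm.attempt_import('sqlalchemy.orm')
conn = mrsm.get_connector('sql:main')  # hypothetical connector label

with sqlalchemy_orm.Session(conn.engine) as session:
    success, msg = session_execute(
        session,
        [
            "CREATE TABLE foo (x INT)",
            "INSERT INTO foo (x) VALUES (1)",
        ],
    )
    if success:
        session.commit()  # only persist if every query succeeded

print(success, msg)
```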
2628def get_reset_autoincrement_queries( 2629 table: str, 2630 column: str, 2631 connector: mrsm.connectors.SQLConnector, 2632 schema: Optional[str] = None, 2633 debug: bool = False, 2634) -> List[str]: 2635 """ 2636 Return a list of queries to reset a table's auto-increment counter to the next largest value. 2637 2638 Parameters 2639 ---------- 2640 table: str 2641 The name of the table on which the auto-incrementing column exists. 2642 2643 column: str 2644 The name of the auto-incrementing column. 2645 2646 connector: mrsm.connectors.SQLConnector 2647 The SQLConnector to the database on which the table exists. 2648 2649 schema: Optional[str], default None 2650 The schema of the table. Defaults to `connector.schema`. 2651 2652 Returns 2653 ------- 2654 A list of queries to be executed to reset the auto-incrementing column. 2655 """ 2656 if not table_exists(table, connector, schema=schema, debug=debug): 2657 return [] 2658 2659 schema = schema or connector.schema 2660 max_id_name = sql_item_name('max_id', connector.flavor) 2661 table_name = sql_item_name(table, connector.flavor, schema) 2662 table_seq_name = sql_item_name(table + '_' + column + '_seq', connector.flavor, schema) 2663 column_name = sql_item_name(column, connector.flavor) 2664 max_id = connector.value( 2665 f""" 2666 SELECT COALESCE(MAX({column_name}), 0) AS {max_id_name} 2667 FROM {table_name} 2668 """, 2669 debug=debug, 2670 ) 2671 if max_id is None: 2672 return [] 2673 2674 reset_queries = reset_autoincrement_queries.get( 2675 connector.flavor, 2676 reset_autoincrement_queries['default'] 2677 ) 2678 if not isinstance(reset_queries, list): 2679 reset_queries = [reset_queries] 2680 2681 return [ 2682 query.format( 2683 column=column, 2684 column_name=column_name, 2685 table=table, 2686 table_name=table_name, 2687 table_seq_name=table_seq_name, 2688 val=max_id, 2689 val_plus_1=(max_id + 1), 2690 ) 2691 for query in reset_queries 2692 ]
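A sketch, assuming a hypothetical `my_table` with an auto-incrementing `id` column and a configured connector; execution via `SQLConnector.exec()` is an assumption:

```python
import meerschaum as mrsm
from meerschaum.utils.sql import get_reset_autoincrement_queries

conn = mrsm.get_connector('sql:main')  # hypothetical connector label
for query in get_reset_autoincrement_queries('my_table', 'id', conn):
    conn.exec(query)
```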
2695def get_postgis_geo_columns_types( 2696 connectable: Union[ 2697 'mrsm.connectors.sql.SQLConnector', 2698 'sqlalchemy.orm.session.Session', 2699 'sqlalchemy.engine.base.Engine' 2700 ], 2701 table: str, 2702 schema: Optional[str] = 'public', 2703 debug: bool = False, 2704) -> Dict[str, str]: 2705 """ 2706 Return a dictionary mapping PostGIS geometry column names to geometry types. 2707 """ 2708 from meerschaum.utils.dtypes import get_geometry_type_srid 2709 sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False) 2710 default_type, default_srid = get_geometry_type_srid() 2711 default_type = default_type.upper() 2712 2713 clean(table) 2714 clean(str(schema)) 2715 schema = schema or 'public' 2716 truncated_schema_name = truncate_item_name(schema, flavor='postgis') 2717 truncated_table_name = truncate_item_name(table, flavor='postgis') 2718 query = sqlalchemy.text( 2719 "SELECT \"f_geometry_column\" AS \"column\", 'GEOMETRY' AS \"func\", \"type\", \"srid\"\n" 2720 "FROM \"geometry_columns\"\n" 2721 f"WHERE \"f_table_schema\" = '{truncated_schema_name}'\n" 2722 f" AND \"f_table_name\" = '{truncated_table_name}'\n" 2723 "UNION ALL\n" 2724 "SELECT \"f_geography_column\" AS \"column\", 'GEOGRAPHY' AS \"func\", \"type\", \"srid\"\n" 2725 "FROM \"geography_columns\"\n" 2726 f"WHERE \"f_table_schema\" = '{truncated_schema_name}'\n" 2727 f" AND \"f_table_name\" = '{truncated_table_name}'\n" 2728 ) 2729 debug_kwargs = {'debug': debug} if isinstance(connectable, mrsm.connectors.SQLConnector) else {} 2730 result_rows = [ 2731 row 2732 for row in connectable.execute(query, **debug_kwargs).fetchall() 2733 ] 2734 cols_type_tuples = { 2735 row[0]: (row[1], row[2], row[3]) 2736 for row in result_rows 2737 } 2738 2739 geometry_cols_types = { 2740 col: ( 2741 f"{func}({typ.upper()}, {srid})" 2742 if srid 2743 else ( 2744 func 2745 + ( 2746 f'({typ.upper()})' 2747 if typ.upper() not in ('GEOMETRY', 'GEOGRAPHY') 2748 else '' 2749 ) 2750 ) 2751 ) 2752 for col, (func, typ, srid) in cols_type_tuples.items() 2753 } 2754 return geometry_cols_types
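A sketch against a hypothetical PostGIS connector and `parks` table; the exact type strings depend on the `geometry_columns` / `geography_columns` metadata:

```python
import meerschaum as mrsm
from meerschaum.utils.sql import get_postgis_geo_columns_types

conn = mrsm.get_connector('sql:geo')  # hypothetical PostGIS connector
geo_cols = get_postgis_geo_columns_types(conn, 'parks', schema='public')
# e.g. {'geom': 'GEOMETRY(MULTIPOLYGON, 4326)'}
```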
2757def get_create_schema_if_not_exists_queries( 2758 schema: str, 2759 flavor: str, 2760) -> List[str]: 2761 """ 2762 Return the queries to create a schema if it does not yet exist. 2763 For databases which do not support schemas, an empty list will be returned. 2764 """ 2765 if not schema: 2766 return [] 2767 2768 if flavor in NO_SCHEMA_FLAVORS: 2769 return [] 2770 2771 if schema == DEFAULT_SCHEMA_FLAVORS.get(flavor, None): 2772 return [] 2773 2774 clean(schema) 2775 2776 if flavor == 'mssql': 2777 return [ 2778 ( 2779 f"IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{schema}')\n" 2780 "BEGIN\n" 2781 f" EXEC('CREATE SCHEMA {sql_item_name(schema, flavor)}');\n" 2782 "END;" 2783 ) 2784 ] 2785 2786 if flavor == 'oracle': 2787 return [] 2788 2789 return [ 2790 f"CREATE SCHEMA IF NOT EXISTS {sql_item_name(schema, flavor)};" 2791 ]
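For example (schema name hypothetical):

```python
from meerschaum.utils.sql import get_create_schema_if_not_exists_queries

get_create_schema_if_not_exists_queries('analytics', 'postgresql')
# ['CREATE SCHEMA IF NOT EXISTS "analytics";']

get_create_schema_if_not_exists_queries('analytics', 'sqlite')
# []  (flavors without schema support return an empty list)
```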