# meerschaum.utils.sql
#
# Flavor-specific SQL tools.

   1#! /usr/bin/env python
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Flavor-specific SQL tools.
   7"""
   8
   9from __future__ import annotations
  10
  11from datetime import datetime, timezone, timedelta
  12import meerschaum as mrsm
  13from meerschaum.utils.typing import Optional, Dict, Any, Union, List, Iterable, Tuple
  14### Preserve legacy imports.
  15from meerschaum.utils.dtypes.sql import (
  16    DB_TO_PD_DTYPES,
  17    PD_TO_DB_DTYPES_FLAVORS,
  18    get_pd_type_from_db_type as get_pd_type,
  19    get_db_type_from_pd_type as get_db_type,
  20    TIMEZONE_NAIVE_FLAVORS,
  21)
  22from meerschaum.utils.warnings import warn
  23from meerschaum.utils.debug import dprint
  24
  25test_queries = {
  26    'default'    : 'SELECT 1',
  27    'oracle'     : 'SELECT 1 FROM DUAL',
  28    'informix'   : 'SELECT COUNT(*) FROM systables',
  29    'hsqldb'     : 'SELECT 1 FROM INFORMATION_SCHEMA.SYSTEM_USERS',
  30}
  31### `table_name` is the escaped name of the table.
  32### `table` is the unescaped name of the table.
  33exists_queries = {
  34    'default': "SELECT COUNT(*) FROM {table_name} WHERE 1 = 0",
  35}
  36version_queries = {
  37    'default': "SELECT VERSION() AS {version_name}",
  38    'sqlite': "SELECT SQLITE_VERSION() AS {version_name}",
  39    'geopackage': "SELECT SQLITE_VERSION() AS {version_name}",
  40    'mssql': "SELECT @@version",
  41    'oracle': "SELECT version from PRODUCT_COMPONENT_VERSION WHERE rownum = 1",
  42}
  43SKIP_IF_EXISTS_FLAVORS = {'mssql', 'oracle'}
  44DROP_IF_EXISTS_FLAVORS = {
  45    'timescaledb',
  46    'timescaledb-ha',
  47    'postgresql',
  48    'postgis',
  49    'citus',
  50    'mssql',
  51    'mysql',
  52    'mariadb',
  53    'sqlite',
  54    'geopackage',
  55}
  56DROP_INDEX_IF_EXISTS_FLAVORS = {
  57    'mssql',
  58    'timescaledb',
  59    'timescaledb-ha',
  60    'postgresql',
  61    'postgis',
  62    'sqlite',
  63    'geopackage',
  64    'citus',
  65}
  66SKIP_AUTO_INCREMENT_FLAVORS = {'citus', 'duckdb'}
  67COALESCE_UNIQUE_INDEX_FLAVORS = {
  68    'timescaledb',
  69    'timescaledb-ha',
  70    'postgresql',
  71    'postgis',
  72    'citus',
  73}
  74UPDATE_QUERIES = {
  75    'default': """
  76        UPDATE {target_table_name} AS f
  77        {sets_subquery_none}
  78        FROM {target_table_name} AS t
  79        INNER JOIN (SELECT {patch_cols_str} FROM {patch_table_name}) AS p
  80            ON
  81                {and_subquery_t}
  82        WHERE
  83            {and_subquery_f}
  84            AND
  85            {date_bounds_subquery}
  86    """,
  87    'timescaledb-upsert': """
  88        INSERT INTO {target_table_name} ({patch_cols_str})
  89        SELECT {patch_cols_str}
  90        FROM {patch_table_name}
  91        ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}
  92    """,
  93    'timescaledb-ha-upsert': """
  94        INSERT INTO {target_table_name} ({patch_cols_str})
  95        SELECT {patch_cols_str}
  96        FROM {patch_table_name}
  97        ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}
  98    """,
  99    'postgresql-upsert': """
 100        INSERT INTO {target_table_name} ({patch_cols_str})
 101        SELECT {patch_cols_str}
 102        FROM {patch_table_name}
 103        ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}
 104    """,
 105    'postgis-upsert': """
 106        INSERT INTO {target_table_name} ({patch_cols_str})
 107        SELECT {patch_cols_str}
 108        FROM {patch_table_name}
 109        ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}
 110    """,
 111    'citus-upsert': """
 112        INSERT INTO {target_table_name} ({patch_cols_str})
 113        SELECT {patch_cols_str}
 114        FROM {patch_table_name}
 115        ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}
 116    """,
 117    'cockroachdb-upsert': """
 118        INSERT INTO {target_table_name} ({patch_cols_str})
 119        SELECT {patch_cols_str}
 120        FROM {patch_table_name}
 121        ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}
 122    """,
 123    'mysql': """
 124        UPDATE {target_table_name} AS f
 125        JOIN (SELECT {patch_cols_str} FROM {patch_table_name}) AS p
 126        ON
 127            {and_subquery_f}
 128        {sets_subquery_f}
 129        WHERE
 130            {date_bounds_subquery}
 131    """,
 132    'mysql-upsert': """
 133        INSERT {ignore}INTO {target_table_name} ({patch_cols_str})
 134        SELECT {patch_cols_str}
 135        FROM {patch_table_name}
 136        {on_duplicate_key_update}
 137            {cols_equal_values}
 138    """,
 139    'mariadb': """
 140        UPDATE {target_table_name} AS f
 141        JOIN (SELECT {patch_cols_str} FROM {patch_table_name}) AS p
 142        ON
 143            {and_subquery_f}
 144        {sets_subquery_f}
 145        WHERE
 146            {date_bounds_subquery}
 147    """,
 148    'mariadb-upsert': """
 149        INSERT {ignore}INTO {target_table_name} ({patch_cols_str})
 150        SELECT {patch_cols_str}
 151        FROM {patch_table_name}
 152        {on_duplicate_key_update}
 153            {cols_equal_values}
 154    """,
 155    'mssql': """
 156        {with_temp_date_bounds}
 157        MERGE {target_table_name} f
 158            USING (SELECT {patch_cols_str} FROM {patch_table_name}) p
 159            ON
 160                {and_subquery_f}
 161            AND
 162                {date_bounds_subquery}
 163        WHEN MATCHED THEN
 164            UPDATE
 165            {sets_subquery_none};
 166    """,
 167    'mssql-upsert': [
 168        "{identity_insert_on}",
 169        """
 170        {with_temp_date_bounds}
 171        MERGE {target_table_name} f
 172            USING (SELECT {patch_cols_str} FROM {patch_table_name}) p
 173            ON
 174                {and_subquery_f}
 175            AND
 176                {date_bounds_subquery}{when_matched_update_sets_subquery_none}
 177        WHEN NOT MATCHED THEN
 178            INSERT ({patch_cols_str})
 179            VALUES ({patch_cols_prefixed_str});
 180        """,
 181        "{identity_insert_off}",
 182    ],
 183    'oracle': """
 184        MERGE INTO {target_table_name} f
 185            USING (SELECT {patch_cols_str} FROM {patch_table_name}) p
 186            ON (
 187                {and_subquery_f}
 188                AND
 189                {date_bounds_subquery}
 190            )
 191            WHEN MATCHED THEN
 192                UPDATE
 193                {sets_subquery_none}
 194    """,
 195    'oracle-upsert': """
 196        MERGE INTO {target_table_name} f
 197            USING (SELECT {patch_cols_str} FROM {patch_table_name}) p
 198            ON (
 199                {and_subquery_f}
 200                AND
 201                {date_bounds_subquery}
 202            ){when_matched_update_sets_subquery_none}
 203            WHEN NOT MATCHED THEN
 204                INSERT ({patch_cols_str})
 205                VALUES ({patch_cols_prefixed_str})
 206    """,
 207    'sqlite-upsert': """
 208        INSERT INTO {target_table_name} ({patch_cols_str})
 209        SELECT {patch_cols_str}
 210        FROM {patch_table_name}
 211        WHERE true
 212        ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}
 213    """,
 214    'sqlite_delete_insert': [
 215        """
 216        DELETE FROM {target_table_name} AS f
 217        WHERE ROWID IN (
 218            SELECT t.ROWID
 219            FROM {target_table_name} AS t
 220            INNER JOIN (SELECT * FROM {patch_table_name}) AS p
 221                ON {and_subquery_t}
 222        );
 223        """,
 224        """
 225        INSERT INTO {target_table_name} AS f
 226        SELECT {patch_cols_str} FROM {patch_table_name} AS p
 227        """,
 228    ],
 229    'geopackage-upsert': """
 230        INSERT INTO {target_table_name} ({patch_cols_str})
 231        SELECT {patch_cols_str}
 232        FROM {patch_table_name}
 233        WHERE true
 234        ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}
 235    """,
 236}
 237columns_types_queries = {
 238    'default': """
 239        SELECT
 240            table_catalog AS database,
 241            table_schema AS schema,
 242            table_name AS table,
 243            column_name AS column,
 244            data_type AS type,
 245            numeric_precision,
 246            numeric_scale
 247        FROM information_schema.columns
 248        WHERE table_name IN ('{table}', '{table_trunc}')
 249    """,
 250    'sqlite': """
 251        SELECT
 252            '' "database",
 253            '' "schema",
 254            m.name "table",
 255            p.name "column",
 256            p.type "type"
 257        FROM sqlite_master m
 258        LEFT OUTER JOIN pragma_table_info(m.name) p
 259            ON m.name <> p.name
 260        WHERE m.type = 'table'
 261            AND m.name IN ('{table}', '{table_trunc}')
 262    """,
 263    'geopackage': """
 264        SELECT
 265            '' "database",
 266            '' "schema",
 267            m.name "table",
 268            p.name "column",
 269            p.type "type"
 270        FROM sqlite_master m
 271        LEFT OUTER JOIN pragma_table_info(m.name) p
 272            ON m.name <> p.name
 273        WHERE m.type = 'table'
 274            AND m.name IN ('{table}', '{table_trunc}')
 275    """,
 276    'mssql': """
 277        SELECT
 278            TABLE_CATALOG AS [database],
 279            TABLE_SCHEMA AS [schema],
 280            TABLE_NAME AS [table],
 281            COLUMN_NAME AS [column],
 282            DATA_TYPE AS [type],
 283            NUMERIC_PRECISION AS [numeric_precision],
 284            NUMERIC_SCALE AS [numeric_scale]
 285        FROM {db_prefix}INFORMATION_SCHEMA.COLUMNS WITH (NOLOCK)
 286        WHERE TABLE_NAME IN (
 287            '{table}',
 288            '{table_trunc}'
 289        )
 290
 291    """,
 292    'mysql': """
 293        SELECT
 294            TABLE_SCHEMA `database`,
 295            TABLE_SCHEMA `schema`,
 296            TABLE_NAME `table`,
 297            COLUMN_NAME `column`,
 298            DATA_TYPE `type`,
 299            NUMERIC_PRECISION `numeric_precision`,
 300            NUMERIC_SCALE `numeric_scale`
 301        FROM INFORMATION_SCHEMA.COLUMNS
 302        WHERE TABLE_NAME IN ('{table}', '{table_trunc}')
 303    """,
 304    'mariadb': """
 305        SELECT
 306            TABLE_SCHEMA `database`,
 307            TABLE_SCHEMA `schema`,
 308            TABLE_NAME `table`,
 309            COLUMN_NAME `column`,
 310            DATA_TYPE `type`,
 311            NUMERIC_PRECISION `numeric_precision`,
 312            NUMERIC_SCALE `numeric_scale`
 313        FROM INFORMATION_SCHEMA.COLUMNS
 314        WHERE TABLE_NAME IN ('{table}', '{table_trunc}')
 315    """,
 316    'oracle': """
 317        SELECT
 318            NULL AS "database",
 319            NULL AS "schema",
 320            TABLE_NAME AS "table",
 321            COLUMN_NAME AS "column",
 322            DATA_TYPE AS "type",
 323            DATA_PRECISION AS "numeric_precision",
 324            DATA_SCALE AS "numeric_scale"
 325        FROM all_tab_columns
 326        WHERE TABLE_NAME IN (
 327            '{table}',
 328            '{table_trunc}',
 329            '{table_lower}',
 330            '{table_lower_trunc}',
 331            '{table_upper}',
 332            '{table_upper_trunc}'
 333        )
 334    """,
 335}
 336hypertable_queries = {
 337    'timescaledb': 'SELECT hypertable_size(\'{table_name}\')',
 338    'timescaledb-ha': 'SELECT hypertable_size(\'{table_name}\')',
 339    'citus': 'SELECT citus_table_size(\'{table_name}\')',
 340}
 341columns_indices_queries = {
 342    'default': """
 343        SELECT
 344            current_database() AS "database",
 345            n.nspname AS "schema",
 346            t.relname AS "table",
 347            c.column_name AS "column",
 348            i.relname AS "index",
 349            CASE WHEN con.contype = 'p' THEN 'PRIMARY KEY' ELSE 'INDEX' END AS "index_type"
 350        FROM pg_class t
 351        INNER JOIN pg_index AS ix
 352            ON t.oid = ix.indrelid
 353        INNER JOIN pg_class AS i
 354            ON i.oid = ix.indexrelid
 355        INNER JOIN pg_namespace AS n
 356            ON n.oid = t.relnamespace
 357        INNER JOIN pg_attribute AS a
 358            ON a.attnum = ANY(ix.indkey)
 359            AND a.attrelid = t.oid
 360        INNER JOIN information_schema.columns AS c
 361            ON c.column_name = a.attname
 362            AND c.table_name = t.relname
 363            AND c.table_schema = n.nspname
 364        LEFT JOIN pg_constraint AS con
 365            ON con.conindid = i.oid
 366            AND con.contype = 'p'
 367        WHERE
 368            t.relname IN ('{table}', '{table_trunc}')
 369            AND n.nspname = '{schema}'
 370    """,
 371    'sqlite': """
 372        WITH indexed_columns AS (
 373            SELECT
 374                '{table}' AS table_name,
 375                pi.name AS column_name,
 376                i.name AS index_name,
 377                'INDEX' AS index_type
 378            FROM
 379                sqlite_master AS i,
 380                pragma_index_info(i.name) AS pi
 381            WHERE
 382                i.type = 'index'
 383                AND i.tbl_name = '{table}'
 384        ),
 385        primary_key_columns AS (
 386            SELECT
 387                '{table}' AS table_name,
 388                ti.name AS column_name,
 389                'PRIMARY_KEY' AS index_name,
 390                'PRIMARY KEY' AS index_type
 391            FROM
 392                pragma_table_info('{table}') AS ti
 393            WHERE
 394                ti.pk > 0
 395        )
 396        SELECT
 397            NULL AS "database",
 398            NULL AS "schema",
 399            "table_name" AS "table",
 400            "column_name" AS "column",
 401            "index_name" AS "index",
 402            "index_type"
 403        FROM indexed_columns
 404        UNION ALL
 405        SELECT
 406            NULL AS "database",
 407            NULL AS "schema",
 408            table_name AS "table",
 409            column_name AS "column",
 410            index_name AS "index",
 411            index_type
 412        FROM primary_key_columns
 413    """,
 414    'geopackage': """
 415        WITH indexed_columns AS (
 416            SELECT
 417                '{table}' AS table_name,
 418                pi.name AS column_name,
 419                i.name AS index_name,
 420                'INDEX' AS index_type
 421            FROM
 422                sqlite_master AS i,
 423                pragma_index_info(i.name) AS pi
 424            WHERE
 425                i.type = 'index'
 426                AND i.tbl_name = '{table}'
 427        ),
 428        primary_key_columns AS (
 429            SELECT
 430                '{table}' AS table_name,
 431                ti.name AS column_name,
 432                'PRIMARY_KEY' AS index_name,
 433                'PRIMARY KEY' AS index_type
 434            FROM
 435                pragma_table_info('{table}') AS ti
 436            WHERE
 437                ti.pk > 0
 438        )
 439        SELECT
 440            NULL AS "database",
 441            NULL AS "schema",
 442            "table_name" AS "table",
 443            "column_name" AS "column",
 444            "index_name" AS "index",
 445            "index_type"
 446        FROM indexed_columns
 447        UNION ALL
 448        SELECT
 449            NULL AS "database",
 450            NULL AS "schema",
 451            table_name AS "table",
 452            column_name AS "column",
 453            index_name AS "index",
 454            index_type
 455        FROM primary_key_columns
 456    """,
 457    'mssql': """
 458        SELECT
 459            NULL AS [database],
 460            s.name AS [schema],
 461            t.name AS [table],
 462            c.name AS [column],
 463            i.name AS [index],
 464            CASE
 465                WHEN kc.type = 'PK' THEN 'PRIMARY KEY'
 466                ELSE 'INDEX'
 467            END AS [index_type],
 468            CASE
 469                WHEN i.type = 1 THEN CAST(1 AS BIT)
 470                ELSE CAST(0 AS BIT)
 471            END AS [clustered]
 472        FROM
 473            sys.schemas s WITH (NOLOCK)
 474        INNER JOIN sys.tables t WITH (NOLOCK)
 475            ON s.schema_id = t.schema_id
 476        INNER JOIN sys.indexes i WITH (NOLOCK)
 477            ON t.object_id = i.object_id
 478        INNER JOIN sys.index_columns ic WITH (NOLOCK)
 479            ON i.object_id = ic.object_id
 480            AND i.index_id = ic.index_id
 481        INNER JOIN sys.columns c WITH (NOLOCK)
 482            ON ic.object_id = c.object_id
 483            AND ic.column_id = c.column_id
 484        LEFT JOIN sys.key_constraints kc WITH (NOLOCK)
 485            ON kc.parent_object_id = i.object_id
 486            AND kc.type = 'PK'
 487            AND kc.name = i.name
 488        WHERE
 489            t.name IN ('{table}', '{table_trunc}')
 490            AND s.name = '{schema}'
 491            AND i.type IN (1, 2)
 492    """,
 493    'oracle': """
 494        SELECT
 495            NULL AS "database",
 496            ic.table_owner AS "schema",
 497            ic.table_name AS "table",
 498            ic.column_name AS "column",
 499            i.index_name AS "index",
 500            CASE
 501                WHEN c.constraint_type = 'P' THEN 'PRIMARY KEY'
 502                WHEN i.uniqueness = 'UNIQUE' THEN 'UNIQUE INDEX'
 503                ELSE 'INDEX'
 504            END AS index_type
 505        FROM
 506            all_ind_columns ic
 507        INNER JOIN all_indexes i
 508            ON ic.index_name = i.index_name
 509            AND ic.table_owner = i.owner
 510        LEFT JOIN all_constraints c
 511            ON i.index_name = c.constraint_name
 512            AND i.table_owner = c.owner
 513            AND c.constraint_type = 'P'
 514        WHERE ic.table_name IN (
 515            '{table}',
 516            '{table_trunc}',
 517            '{table_upper}',
 518            '{table_upper_trunc}'
 519        )
 520    """,
 521    'mysql': """
 522        SELECT
 523            TABLE_SCHEMA AS `database`,
 524            TABLE_SCHEMA AS `schema`,
 525            TABLE_NAME AS `table`,
 526            COLUMN_NAME AS `column`,
 527            INDEX_NAME AS `index`,
 528            CASE
 529                WHEN NON_UNIQUE = 0 THEN 'PRIMARY KEY'
 530                ELSE 'INDEX'
 531            END AS `index_type`
 532        FROM
 533            information_schema.STATISTICS
 534        WHERE
 535            TABLE_NAME IN ('{table}', '{table_trunc}')
 536    """,
 537    'mariadb': """
 538        SELECT
 539            TABLE_SCHEMA AS `database`,
 540            TABLE_SCHEMA AS `schema`,
 541            TABLE_NAME AS `table`,
 542            COLUMN_NAME AS `column`,
 543            INDEX_NAME AS `index`,
 544            CASE
 545                WHEN NON_UNIQUE = 0 THEN 'PRIMARY KEY'
 546                ELSE 'INDEX'
 547            END AS `index_type`
 548        FROM
 549            information_schema.STATISTICS
 550        WHERE
 551            TABLE_NAME IN ('{table}', '{table_trunc}')
 552    """,
 553}
 554reset_autoincrement_queries: Dict[str, Union[str, List[str]]] = {
 555    'default': """
 556        SELECT SETVAL(pg_get_serial_sequence('{table_name}', '{column}'), {val})
 557        FROM {table_name}
 558    """,
 559    'mssql': """
 560        DBCC CHECKIDENT ('{table_name}', RESEED, {val})
 561    """,
 562    'mysql': """
 563        ALTER TABLE {table_name} AUTO_INCREMENT = {val}
 564    """,
 565    'mariadb': """
 566        ALTER TABLE {table_name} AUTO_INCREMENT = {val}
 567    """,
 568    'sqlite': """
 569        UPDATE sqlite_sequence
 570        SET seq = {val}
 571        WHERE name = '{table}'
 572    """,
 573    'geopackage': """
 574        UPDATE sqlite_sequence
 575        SET seq = {val}
 576        WHERE name = '{table}'
 577    """,
 578    'oracle': (
 579        "ALTER TABLE {table_name} MODIFY {column_name} "
 580        "GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH {val_plus_1})"
 581    ),
 582}
 583table_wrappers = {
 584    'default'       : ('"', '"'),
 585    'timescaledb'   : ('"', '"'),
 586    'timescaledb-ha': ('"', '"'),
 587    'citus'         : ('"', '"'),
 588    'duckdb'        : ('"', '"'),
 589    'postgresql'    : ('"', '"'),
 590    'postgis'       : ('"', '"'),
 591    'sqlite'        : ('"', '"'),
 592    'geopackage'    : ('"', '"'),
 593    'mysql'         : ('`', '`'),
 594    'mariadb'       : ('`', '`'),
 595    'mssql'         : ('[', ']'),
 596    'cockroachdb'   : ('"', '"'),
 597    'oracle'        : ('"', '"'),
 598}
 599max_name_lens = {
 600    'default'       : 64,
 601    'mssql'         : 128,
 602    'oracle'        : 30,
 603    'postgresql'    : 64,
 604    'postgis'       : 64,
 605    'timescaledb'   : 64,
 606    'timescaledb-ha': 64,
 607    'citus'         : 64,
 608    'cockroachdb'   : 64,
 609    'sqlite'        : 1024,
 610    'geopackage'    : 1024,
 611    'mysql'         : 64,
 612    'mariadb'       : 64,
 613}
 614json_flavors = {
 615    'postgresql',
 616    'postgis',
 617    'timescaledb',
 618    'timescaledb-ha',
 619    'citus',
 620    'cockroachdb',
 621}
 622NO_SCHEMA_FLAVORS = {
 623    'oracle',
 624    'sqlite',
 625    'geopackage',
 626    'mysql',
 627    'mariadb',
 628    'duckdb',
 629}
 630DEFAULT_SCHEMA_FLAVORS = {
 631    'postgresql': 'public',
 632    'postgis': 'public',
 633    'timescaledb': 'public',
 634    'timescaledb-ha': 'public',
 635    'citus': 'public',
 636    'cockroachdb': 'public',
 637    'mysql': 'mysql',
 638    'mariadb': 'mysql',
 639    'mssql': 'dbo',
 640}
 641OMIT_NULLSFIRST_FLAVORS = {
 642    'mariadb',
 643    'mysql',
 644    'mssql',
 645}
 646
 647SINGLE_ALTER_TABLE_FLAVORS = {
 648    'duckdb',
 649    'sqlite',
 650    'geopackage',
 651    'mssql',
 652    'oracle',
 653}
 654NO_CTE_FLAVORS = {
 655    'mysql',
 656    'mariadb',
 657}
 658NO_SELECT_INTO_FLAVORS = {
 659    'sqlite',
 660    'geopackage',
 661    'oracle',
 662    'mysql',
 663    'mariadb',
 664    'duckdb',
 665}
 666
 667
 668def clean(substring: str) -> None:
 669    """
 670    Ensure a substring is clean enough to be inserted into a SQL query.
 671    Raises an exception when banned words are used.
 672    """
 673    from meerschaum.utils.warnings import error
 674    banned_symbols = [';', '--', 'drop ',]
 675    for symbol in banned_symbols:
 676        if symbol in str(substring).lower():
 677            error(f"Invalid string: '{substring}'")
 678
 679
 680def dateadd_str(
 681    flavor: str = 'postgresql',
 682    datepart: str = 'day',
 683    number: Union[int, float] = 0,
 684    begin: Union[str, datetime, int] = 'now',
 685    db_type: Optional[str] = None,
 686) -> str:
 687    """
 688    Generate a `DATEADD` clause depending on database flavor.
 689
 690    Parameters
 691    ----------
 692    flavor: str, default `'postgresql'`
 693        SQL database flavor, e.g. `'postgresql'`, `'sqlite'`.
 694
 695        Currently supported flavors:
 696
 697        - `'postgresql'`
 698        - `'postgis'`
 699        - `'timescaledb'`
 700        - `'timescaledb-ha'`
 701        - `'citus'`
 702        - `'cockroachdb'`
 703        - `'duckdb'`
 704        - `'mssql'`
 705        - `'mysql'`
 706        - `'mariadb'`
 707        - `'sqlite'`
 708        - `'geopackage'`
 709        - `'oracle'`
 710
 711    datepart: str, default `'day'`
 712        Which part of the date to modify. Supported values:
 713
 714        - `'year'`
 715        - `'month'`
 716        - `'day'`
 717        - `'hour'`
 718        - `'minute'`
 719        - `'second'`
 720
 721    number: Union[int, float], default `0`
 722        How many units to add to the date part.
 723
 724    begin: Union[str, datetime], default `'now'`
 725        Base datetime to which to add dateparts.
 726
 727    db_type: Optional[str], default None
 728        If provided, cast the datetime string as the type.
 729        Otherwise, infer this from the input datetime value.
 730
 731    Returns
 732    -------
 733    The appropriate `DATEADD` string for the corresponding database flavor.
 734
 735    Examples
 736    --------
 737    >>> dateadd_str(
 738    ...     flavor='mssql',
 739    ...     begin=datetime(2022, 1, 1, 0, 0),
 740    ...     number=1,
 741    ... )
 742    "DATEADD(day, 1, CAST('2022-01-01 00:00:00' AS DATETIME2))"
 743    >>> dateadd_str(
 744    ...     flavor='postgresql',
 745    ...     begin=datetime(2022, 1, 1, 0, 0),
 746    ...     number=1,
 747    ... )
 748    "CAST('2022-01-01 00:00:00' AS TIMESTAMP) + INTERVAL '1 day'"
 749
 750    """
 751    from meerschaum.utils.packages import attempt_import
 752    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type, get_pd_type_from_db_type
 753    dateutil_parser = attempt_import('dateutil.parser')
 754    if 'int' in str(type(begin)).lower():
 755        num_str = str(begin)
 756        if number is not None and number != 0:
 757            num_str += (
 758                f' + {number}'
 759                if number > 0
 760                else f" - {number * -1}"
 761            )
 762        return num_str
 763    if not begin:
 764        return ''
 765
 766    _original_begin = begin
 767    begin_time = None
 768    ### Sanity check: make sure `begin` is a valid datetime before we inject anything.
 769    if not isinstance(begin, datetime):
 770        try:
 771            begin_time = dateutil_parser.parse(begin)
 772        except Exception:
 773            begin_time = None
 774    else:
 775        begin_time = begin
 776
 777    ### Unable to parse into a datetime.
 778    if begin_time is None:
 779        ### Throw an error if banned symbols are included in the `begin` string.
 780        clean(str(begin))
 781    ### If begin is a valid datetime, wrap it in quotes.
 782    else:
 783        if isinstance(begin, datetime) and begin.tzinfo is not None:
 784            begin = begin.astimezone(timezone.utc)
 785        begin = (
 786            f"'{begin.replace(tzinfo=None)}'"
 787            if isinstance(begin, datetime) and flavor in TIMEZONE_NAIVE_FLAVORS
 788            else f"'{begin}'"
 789        )
 790
 791    dt_is_utc = (
 792        begin_time.tzinfo is not None
 793        if begin_time is not None
 794        else ('+' in str(begin) or '-' in str(begin).split(':', maxsplit=1)[-1])
 795    )
 796    if db_type:
 797        db_type_is_utc = 'utc' in get_pd_type_from_db_type(db_type).lower()
 798        dt_is_utc = dt_is_utc or db_type_is_utc
 799    db_type = db_type or get_db_type_from_pd_type(
 800        ('datetime64[ns, UTC]' if dt_is_utc else 'datetime64[ns]'),
 801        flavor=flavor,
 802    )
 803
 804    da = ""
 805    if flavor in (
 806        'postgresql',
 807        'postgis',
 808        'timescaledb',
 809        'timescaledb-ha',
 810        'cockroachdb',
 811        'citus',
 812    ):
 813        begin = (
 814            f"CAST({begin} AS {db_type})" if begin != 'now'
 815            else f"CAST(NOW() AT TIME ZONE 'utc' AS {db_type})"
 816        )
 817        if dt_is_utc:
 818            begin += " AT TIME ZONE 'UTC'"
 819        da = begin + (f" + INTERVAL '{number} {datepart}'" if number != 0 else '')
 820
 821    elif flavor == 'duckdb':
 822        begin = f"CAST({begin} AS {db_type})" if begin != 'now' else 'NOW()'
 823        if dt_is_utc:
 824            begin += " AT TIME ZONE 'UTC'"
 825        da = begin + (f" + INTERVAL '{number} {datepart}'" if number != 0 else '')
 826
 827    elif flavor in ('mssql',):
 828        if begin_time and begin_time.microsecond != 0 and not dt_is_utc:
 829            begin = begin[:-4] + "'"
 830        begin = f"CAST({begin} AS {db_type})" if begin != 'now' else 'GETUTCDATE()'
 831        if dt_is_utc:
 832            begin += " AT TIME ZONE 'UTC'"
 833        da = f"DATEADD({datepart}, {number}, {begin})" if number != 0 else begin
 834
 835    elif flavor in ('mysql', 'mariadb'):
 836        begin = (
 837            f"CAST({begin} AS DATETIME(6))"
 838            if begin != 'now'
 839            else 'UTC_TIMESTAMP(6)'
 840        )
 841        da = (f"DATE_ADD({begin}, INTERVAL {number} {datepart})" if number != 0 else begin)
 842
 843    elif flavor in ('sqlite', 'geopackage'):
 844        da = f"datetime({begin}, '{number} {datepart}')"
 845
 846    elif flavor == 'oracle':
 847        if begin == 'now':
 848            begin = str(
 849                datetime.now(timezone.utc).replace(tzinfo=None).strftime(r'%Y:%m:%d %M:%S.%f')
 850            )
 851        elif begin_time:
 852            begin = str(begin_time.strftime(r'%Y-%m-%d %H:%M:%S.%f'))
 853        dt_format = 'YYYY-MM-DD HH24:MI:SS.FF'
 854        _begin = f"'{begin}'" if begin_time else begin
 855        da = (
 856            (f"TO_TIMESTAMP({_begin}, '{dt_format}')" if begin_time else _begin)
 857            + (f" + INTERVAL '{number}' {datepart}" if number != 0 else "")
 858        )
 859    return da
 860
 861
 862def test_connection(
 863    self,
 864    **kw: Any
 865) -> Union[bool, None]:
 866    """
 867    Test if a successful connection to the database may be made.
 868
 869    Parameters
 870    ----------
 871    **kw:
 872        The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.
 873
 874    Returns
 875    -------
 876    `True` if a connection is made, otherwise `False` or `None` in case of failure.
 877
 878    """
 879    import warnings
 880    from meerschaum.connectors.poll import retry_connect
 881    _default_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self}
 882    _default_kw.update(kw)
 883    with warnings.catch_warnings():
 884        warnings.filterwarnings('ignore', 'Could not')
 885        try:
 886            return retry_connect(**_default_kw)
 887        except Exception:
 888            return False
 889
 890
 891def get_distinct_col_count(
 892    col: str,
 893    query: str,
 894    connector: Optional[mrsm.connectors.sql.SQLConnector] = None,
 895    debug: bool = False
 896) -> Optional[int]:
 897    """
 898    Returns the number of distinct items in a column of a SQL query.
 899
 900    Parameters
 901    ----------
 902    col: str:
 903        The column in the query to count.
 904
 905    query: str:
 906        The SQL query to count from.
 907
 908    connector: Optional[mrsm.connectors.sql.SQLConnector], default None:
 909        The SQLConnector to execute the query.
 910
 911    debug: bool, default False:
 912        Verbosity toggle.
 913
 914    Returns
 915    -------
 916    An `int` of the number of columns in the query or `None` if the query fails.
 917
 918    """
 919    if connector is None:
 920        connector = mrsm.get_connector('sql')
 921
 922    _col_name = sql_item_name(col, connector.flavor, None)
 923
 924    _meta_query = (
 925        f"""
 926        WITH src AS ( {query} ),
 927        dist AS ( SELECT DISTINCT {_col_name} FROM src )
 928        SELECT COUNT(*) FROM dist"""
 929    ) if connector.flavor not in ('mysql', 'mariadb') else (
 930        f"""
 931        SELECT COUNT(*)
 932        FROM (
 933            SELECT DISTINCT {_col_name}
 934            FROM ({query}) AS src
 935        ) AS dist"""
 936    )
 937
 938    result = connector.value(_meta_query, debug=debug)
 939    try:
 940        return int(result)
 941    except Exception:
 942        return None
 943
 944
 945def sql_item_name(item: str, flavor: str, schema: Optional[str] = None) -> str:
 946    """
 947    Parse SQL items depending on the flavor.
 948
 949    Parameters
 950    ----------
 951    item: str
 952        The database item (table, view, etc.) in need of quotes.
 953        
 954    flavor: str
 955        The database flavor (`'postgresql'`, `'mssql'`, `'sqllite'`, etc.).
 956
 957    schema: Optional[str], default None
 958        If provided, prefix the table name with the schema.
 959
 960    Returns
 961    -------
 962    A `str` which contains the input `item` wrapped in the corresponding escape characters.
 963    
 964    Examples
 965    --------
 966    >>> sql_item_name('table', 'sqlite')
 967    '"table"'
 968    >>> sql_item_name('table', 'mssql')
 969    "[table]"
 970    >>> sql_item_name('table', 'postgresql', schema='abc')
 971    '"abc"."table"'
 972
 973    """
 974    truncated_item = truncate_item_name(str(item), flavor)
 975    if flavor == 'oracle':
 976        truncated_item = pg_capital(truncated_item, quote_capitals=True)
 977        ### NOTE: System-reserved words must be quoted.
 978        if truncated_item.lower() in (
 979            'float', 'varchar', 'nvarchar', 'clob',
 980            'boolean', 'integer', 'table', 'row', 'date',
 981        ):
 982            wrappers = ('"', '"')
 983        else:
 984            wrappers = ('', '')
 985    else:
 986        wrappers = table_wrappers.get(flavor, table_wrappers['default'])
 987
 988    ### NOTE: SQLite does not support schemas.
 989    if flavor in ('sqlite', 'geopackage'):
 990        schema = None
 991    elif flavor == 'mssql' and str(item).startswith('#'):
 992        schema = None
 993
 994    schema_prefix = (
 995        (wrappers[0] + schema + wrappers[1] + '.')
 996        if schema is not None
 997        else ''
 998    )
 999
1000    return schema_prefix + wrappers[0] + truncated_item + wrappers[1]
1001
1002
def pg_capital(s: str, quote_capitals: bool = True) -> str:
    """
    Wrap a string in double quotes when PostgreSQL-style quoting requires it.

    Parameters
    ----------
    s: str
        The string to be escaped.

    quote_capitals: bool, default True
        If `False`, do not quote strings which contain only a mix of capital
        and lower-case letters.

    Returns
    -------
    The input string wrapped in quotes only if it needs them.

    Examples
    --------
    >>> pg_capital("My Table")
    '"My Table"'
    >>> pg_capital('my_table')
    'my_table'

    """
    ### Already fully quoted: return untouched.
    if s.startswith('"') and s.endswith('"'):
        return s

    ### Strip any stray embedded double quotes before deciding.
    stripped = s.replace('"', '')

    def _requires_quoting(text: str) -> bool:
        ### Leading underscores always require quoting.
        if text.startswith('_'):
            return True
        ### Quote on any non-alphanumeric character (underscores aside),
        ### or on upper-case letters when `quote_capitals` is set.
        return any(
            (not ch.isalnum()) or (quote_capitals and ch.isupper())
            for ch in text
            if ch != '_'
        )

    return ('"' + stripped + '"') if _requires_quoting(stripped) else stripped
1046
1047
def oracle_capital(s: str) -> str:
    """
    Return the item name unchanged.

    NOTE: Oracle capitalization is handled in `sql_item_name()` via `pg_capital()`;
    this function is an intentional no-op kept for backwards compatibility.
    """
    return s
1053
1054
def truncate_item_name(item: str, flavor: str) -> str:
    """
    Shorten an item name so it fits within the database flavor's identifier length limit.

    Parameters
    ----------
    item: str
        The database item being referenced. This string is the "canonical" name internally.

    flavor: str
        The flavor of the database on which `item` resides.

    Returns
    -------
    The truncated string.
    """
    from meerschaum.utils.misc import truncate_string_sections
    max_len = max_name_lens.get(flavor, max_name_lens['default'])
    return truncate_string_sections(item, max_len=max_len)
1075
1076
def build_where(
    params: Dict[str, Any],
    connector: Optional[mrsm.connectors.sql.SQLConnector] = None,
    with_where: bool = True,
    flavor: str = 'postgresql',
) -> str:
    """
    Build the `WHERE` clause based on the input criteria.

    Parameters
    ----------
    params: Dict[str, Any]:
        The keywords dictionary to convert into a WHERE clause.
        If a value is a string which begins with an underscore, negate that value
        (e.g. `!=` instead of `=` or `NOT IN` instead of `IN`).
        A value of `_None` will be interpreted as `IS NOT NULL`.

    connector: Optional[meerschaum.connectors.sql.SQLConnector], default None:
        The Meerschaum SQLConnector that will be executing the query.
        The connector is used to extract the SQL dialect.

    with_where: bool, default True:
        If `True`, include the leading `'WHERE'` string.

    flavor: str, default 'postgresql'
        If `connector` is `None`, fall back to this flavor.

    Returns
    -------
    A `str` of the `WHERE` clause from the input `params` dictionary for the connector's flavor.

    Examples
    --------
    ```
    >>> print(build_where({'foo': [1, 2, 3]}))
    
    WHERE
        "foo" IN ('1', '2', '3')
    ```
    """
    import json
    from meerschaum._internal.static import STATIC_CONFIG
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.dtypes import value_is_null, none_if_null
    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
    ### Basic sanity check against SQL injection: abort on suspicious substrings.
    try:
        params_json = json.dumps(params)
    except Exception:
        params_json = str(params)
    bad_words = ['drop ', '--', ';']
    for word in bad_words:
        if word in params_json.lower():
            warn("Aborting build_where() due to possible SQL injection.")
            return ''

    query_flavor = getattr(connector, 'flavor', flavor) if connector is not None else flavor
    where = ""
    leading_and = "\n    AND "
    for key, value in params.items():
        _key = sql_item_name(key, query_flavor, None)
        ### search across a list (i.e. IN syntax)
        if isinstance(value, Iterable) and not isinstance(value, (dict, str)):
            ### Split items into includes/excludes (by negation prefix), and each of
            ### those into NULL and non-NULL values (NULLs need `IS [NOT] NULL` syntax).
            includes = [
                none_if_null(item)
                for item in value
                if not str(item).startswith(negation_prefix)
            ]
            null_includes = [item for item in includes if item is None]
            not_null_includes = [item for item in includes if item is not None]
            excludes = [
                none_if_null(str(item)[len(negation_prefix):])
                for item in value
                if str(item).startswith(negation_prefix)
            ]
            null_excludes = [item for item in excludes if item is None]
            not_null_excludes = [item for item in excludes if item is not None]

            if includes:
                where += f"{leading_and}("
            if not_null_includes:
                where += f"{_key} IN ("
                for item in not_null_includes:
                    ### Escape single quotes by doubling them.
                    quoted_item = str(item).replace("'", "''")
                    where += f"'{quoted_item}', "
                ### Trim the trailing ", " before closing the IN list.
                where = where[:-2] + ")"
            if null_includes:
                where += ("\n    OR " if not_null_includes else "") + f"{_key} IS NULL"
            if includes:
                where += ")"

            if excludes:
                where += f"{leading_and}("
            if not_null_excludes:
                where += f"{_key} NOT IN ("
                for item in not_null_excludes:
                    quoted_item = str(item).replace("'", "''")
                    where += f"'{quoted_item}', "
                where = where[:-2] + ")"
            if null_excludes:
                where += ("\n    AND " if not_null_excludes else "") + f"{_key} IS NOT NULL"
            if excludes:
                where += ")"

            continue

        ### search a dictionary
        elif isinstance(value, dict):
            ### NOTE(review): `json.dumps(value)` output is not single-quote-escaped here,
            ### unlike the scalar branch below — confirm dict values never contain `'`.
            where += (f"{leading_and}CAST({_key} AS TEXT) = '" + json.dumps(value) + "'")
            continue

        ### Scalar comparison: handle NULL sentinels and the negation prefix.
        eq_sign = '='
        is_null = 'IS NULL'
        if value_is_null(str(value).lstrip(negation_prefix)):
            value = (
                (negation_prefix + 'None')
                if str(value).startswith(negation_prefix)
                else None
            )
        if str(value).startswith(negation_prefix):
            value = str(value)[len(negation_prefix):]
            eq_sign = '!='
            if value_is_null(value):
                value = None
                is_null = 'IS NOT NULL'
        quoted_value = str(value).replace("'", "''")
        where += (
            f"{leading_and}{_key} "
            + (is_null if value is None else f"{eq_sign} '{quoted_value}'")
        )

    ### Strip the first `leading_and` and optionally prepend the WHERE keyword.
    if len(where) > 1:
        where = ("\nWHERE\n    " if with_where else '') + where[len(leading_and):]
    return where
1211
1212
def table_exists(
    table: str,
    connector: mrsm.connectors.sql.SQLConnector,
    schema: Optional[str] = None,
    debug: bool = False,
) -> bool:
    """Determine whether a table is present on the connector's database.

    Parameters
    ----------
    table: str:
        The name of the table in question.

    connector: mrsm.connectors.sql.SQLConnector
        The connector to the database which holds the table.

    schema: Optional[str], default None
        Optionally specify the table schema.
        Defaults to `connector.schema`.

    debug: bool, default False :
        Verbosity toggle.

    Returns
    -------
    A `bool` indicating whether or not the table exists on the database.
    """
    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
    target_schema = schema or connector.schema
    inspector = sqlalchemy.inspect(connector.engine)
    table_trunc = truncate_item_name(str(table), connector.flavor)
    return inspector.has_table(table_trunc, schema=target_schema)
1245
1246
def get_sqlalchemy_table(
    table: str,
    connector: Optional[mrsm.connectors.sql.SQLConnector] = None,
    schema: Optional[str] = None,
    refresh: bool = False,
    debug: bool = False,
) -> Union['sqlalchemy.Table', None]:
    """
    Build (or fetch from the connector's cache) the `sqlalchemy.Table` object for a table name.

    Parameters
    ----------
    table: str
        The name of the table on the database. Does not need to be escaped.

    connector: Optional[meerschaum.connectors.sql.SQLConnector], default None:
        The connector to the database which holds the table.
        Defaults to the default `'sql'` connector.

    schema: Optional[str], default None
        Specify on which schema the table resides.
        Defaults to the schema set in `connector`.

    refresh: bool, default False
        If `True`, rebuild the cached table object.

    debug: bool, default False:
        Verbosity toggle.

    Returns
    -------
    A `sqlalchemy.Table` object for the table, or `None` for DuckDB
    or if the table does not exist.

    """
    if connector is None:
        from meerschaum import get_connector
        connector = get_connector('sql')

    ### DuckDB does not use reflected SQLAlchemy tables.
    if connector.flavor == 'duckdb':
        return None

    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import warn
    if refresh:
        connector.metadata.clear()
    cached_tables = get_tables(mrsm_instance=connector, debug=debug, create=False)
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    trunc_name = truncate_item_name(str(table), connector.flavor)

    reflect_kwargs = {'autoload_with': connector.engine}
    if schema:
        reflect_kwargs['schema'] = schema

    ### Reflect the table from the database when missing from (or forced out of) the cache.
    if refresh or trunc_name not in cached_tables:
        try:
            cached_tables[trunc_name] = sqlalchemy.Table(
                trunc_name,
                connector.metadata,
                **reflect_kwargs
            )
        except sqlalchemy.exc.NoSuchTableError:
            warn(f"Table '{trunc_name}' does not exist in '{connector}'.")
            return None
    return cached_tables[trunc_name]
1312
1313
def get_table_cols_types(
    table: str,
    connectable: Union[
        'mrsm.connectors.sql.SQLConnector',
        'sqlalchemy.orm.session.Session',
        'sqlalchemy.engine.base.Engine'
    ],
    flavor: Optional[str] = None,
    schema: Optional[str] = None,
    database: Optional[str] = None,
    debug: bool = False,
) -> Dict[str, str]:
    """
    Return a dictionary mapping a table's columns to data types.
    This is useful for inspecting tables created during a not-yet-committed session.

    NOTE: This may return incorrect columns if the schema is not explicitly stated.
        Use this function if you are confident the table name is unique or if you have
        an explicit schema.
        To use the configured schema, get the columns from `get_sqlalchemy_table()` instead.

    Parameters
    ----------
    table: str
        The name of the table (unquoted).

    connectable: Union[
        'mrsm.connectors.sql.SQLConnector',
        'sqlalchemy.orm.session.Session',
        'sqlalchemy.engine.base.Engine'
    ]
        The connection object used to fetch the columns and types.

    flavor: Optional[str], default None
        The database dialect flavor to use for the query.
        If omitted, default to `connectable.flavor`.

    schema: Optional[str], default None
        If provided, restrict the query to this schema.

    database: Optional[str], default None
        If provided, restrict the query to this database.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping column names to data types.
    Returns an empty dictionary if the metadata query fails.
    """
    import textwrap
    from meerschaum.connectors import SQLConnector
    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
    flavor = flavor or getattr(connectable, 'flavor', None)
    if not flavor:
        raise ValueError("Please provide a database flavor.")
    if flavor == 'duckdb' and not isinstance(connectable, SQLConnector):
        raise ValueError("You must provide a SQLConnector when using DuckDB.")
    if flavor in NO_SCHEMA_FLAVORS:
        schema = None
    if schema is None:
        schema = DEFAULT_SCHEMA_FLAVORS.get(flavor, None)
    ### These flavors have no separate database concept for this query.
    if flavor in ('sqlite', 'duckdb', 'oracle', 'geopackage'):
        database = None
    ### Precompute name variants for the query template; some flavors
    ### store identifiers upper- or lower-cased.
    table_trunc = truncate_item_name(table, flavor=flavor)
    table_lower = table.lower()
    table_upper = table.upper()
    table_lower_trunc = truncate_item_name(table_lower, flavor=flavor)
    table_upper_trunc = truncate_item_name(table_upper, flavor=flavor)
    ### MSSQL temporary tables ('#'-prefixed) reside in tempdb.
    db_prefix = (
        "tempdb."
        if flavor == 'mssql' and table.startswith('#')
        else ""
    )

    cols_types_query = sqlalchemy.text(
        textwrap.dedent(columns_types_queries.get(
            flavor,
            columns_types_queries['default']
        ).format(
            table=table,
            table_trunc=table_trunc,
            table_lower=table_lower,
            table_lower_trunc=table_lower_trunc,
            table_upper=table_upper,
            table_upper_trunc=table_upper_trunc,
            db_prefix=db_prefix,
        )).lstrip().rstrip()
    )

    ### Expected column order of the flavor-specific metadata query's result rows.
    cols = ['database', 'schema', 'table', 'column', 'type', 'numeric_precision', 'numeric_scale']
    result_cols_ix = dict(enumerate(cols))

    ### Only SQLConnectors accept a `debug` keyword on `execute()`.
    debug_kwargs = {'debug': debug} if isinstance(connectable, SQLConnector) else {}
    if not debug_kwargs and debug:
        dprint(cols_types_query)

    try:
        ### DuckDB must go through `SQLConnector.read()` rather than raw `execute()`.
        result_rows = (
            [
                row
                for row in connectable.execute(cols_types_query, **debug_kwargs).fetchall()
            ]
            if flavor != 'duckdb'
            else [
                tuple([doc[col] for col in cols])
                for doc in connectable.read(cols_types_query, debug=debug).to_dict(orient='records')
            ]
        )
        ### Re-shape positional rows into documents keyed by the names in `cols`.
        cols_types_docs = [
            {
                result_cols_ix[i]: val
                for i, val in enumerate(row)
            }
            for row in result_rows
        ]
        ### Keep only rows matching the requested schema and database (when given).
        cols_types_docs_filtered = [
            doc
            for doc in cols_types_docs
            if (
                (
                    not schema
                    or doc['schema'] == schema
                )
                and
                (
                    not database
                    or doc['database'] == database
                )
            )
        ]

        ### NOTE: This may return incorrect columns if the schema is not explicitly stated.
        if cols_types_docs and not cols_types_docs_filtered:
            cols_types_docs_filtered = cols_types_docs

        ### NOTE: Check for PostGIS GEOMETRY columns.
        geometry_cols_types = {}
        user_defined_cols = [
            doc
            for doc in cols_types_docs_filtered
            if str(doc.get('type', None)).upper() == 'USER-DEFINED'
        ]
        if user_defined_cols:
            geometry_cols_types.update(
                get_postgis_geo_columns_types(
                    connectable,
                    table,
                    schema=schema,
                    debug=debug,
                )
            )

        ### Normalize simple Oracle upper-case identifiers to lower-case,
        ### and append `(precision,scale)` when both values are present.
        cols_types = {
            (
                doc['column']
                if flavor != 'oracle' else (
                    (
                        doc['column'].lower()
                        if (doc['column'].isupper() and doc['column'].replace('_', '').isalpha())
                        else doc['column']
                    )
                )
            ): doc['type'].upper() + (
                f'({precision},{scale})'
                if (
                    (precision := doc.get('numeric_precision', None))
                     and
                    (scale := doc.get('numeric_scale', None))
                )
                else ''
            )
            for doc in cols_types_docs_filtered
        }
        cols_types.update(geometry_cols_types)
        return cols_types
    except Exception as e:
        warn(f"Failed to fetch columns for table '{table}':\n{e}")
        return {}
1490
1491
def get_table_cols_indices(
    table: str,
    connectable: Union[
        'mrsm.connectors.sql.SQLConnector',
        'sqlalchemy.orm.session.Session',
        'sqlalchemy.engine.base.Engine'
    ],
    flavor: Optional[str] = None,
    schema: Optional[str] = None,
    database: Optional[str] = None,
    debug: bool = False,
) -> Dict[str, List[Dict[str, Any]]]:
    """
    Return a dictionary mapping a table's columns to lists of indices.
    This is useful for inspecting tables created during a not-yet-committed session.

    NOTE: This may return incorrect columns if the schema is not explicitly stated.
        Use this function if you are confident the table name is unique or if you have
        an explicit schema.
        To use the configured schema, get the columns from `get_sqlalchemy_table()` instead.

    Parameters
    ----------
    table: str
        The name of the table (unquoted).

    connectable: Union[
        'mrsm.connectors.sql.SQLConnector',
        'sqlalchemy.orm.session.Session',
        'sqlalchemy.engine.base.Engine'
    ]
        The connection object used to fetch the columns and types.

    flavor: Optional[str], default None
        The database dialect flavor to use for the query.
        If omitted, default to `connectable.flavor`.

    schema: Optional[str], default None
        If provided, restrict the query to this schema.

    database: Optional[str], default None
        If provided, restrict the query to this database.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A dictionary mapping column names to a list of index documents,
    each with `'name'` and `'type'` keys (plus `'clustered'` for MSSQL).
    Returns an empty dictionary if the metadata query fails.
    """
    import textwrap
    from collections import defaultdict
    from meerschaum.connectors import SQLConnector
    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
    flavor = flavor or getattr(connectable, 'flavor', None)
    if not flavor:
        raise ValueError("Please provide a database flavor.")
    if flavor == 'duckdb' and not isinstance(connectable, SQLConnector):
        raise ValueError("You must provide a SQLConnector when using DuckDB.")
    if flavor in NO_SCHEMA_FLAVORS:
        schema = None
    if schema is None:
        schema = DEFAULT_SCHEMA_FLAVORS.get(flavor, None)
    ### These flavors have no separate database concept for this query.
    if flavor in ('sqlite', 'duckdb', 'oracle', 'geopackage'):
        database = None
    ### Precompute name variants for the query template; some flavors
    ### store identifiers upper- or lower-cased.
    table_trunc = truncate_item_name(table, flavor=flavor)
    table_lower = table.lower()
    table_upper = table.upper()
    table_lower_trunc = truncate_item_name(table_lower, flavor=flavor)
    table_upper_trunc = truncate_item_name(table_upper, flavor=flavor)
    ### MSSQL temporary tables ('#'-prefixed) reside in tempdb.
    db_prefix = (
        "tempdb."
        if flavor == 'mssql' and table.startswith('#')
        else ""
    )

    cols_indices_query = sqlalchemy.text(
        textwrap.dedent(columns_indices_queries.get(
            flavor,
            columns_indices_queries['default']
        ).format(
            table=table,
            table_trunc=table_trunc,
            table_lower=table_lower,
            table_lower_trunc=table_lower_trunc,
            table_upper=table_upper,
            table_upper_trunc=table_upper_trunc,
            db_prefix=db_prefix,
            schema=schema,
        )).lstrip().rstrip()
    )

    ### Expected column order of the flavor-specific metadata query's result rows.
    cols = ['database', 'schema', 'table', 'column', 'index', 'index_type']
    if flavor == 'mssql':
        cols.append('clustered')
    result_cols_ix = dict(enumerate(cols))

    ### Only SQLConnectors accept a `debug` keyword on `execute()`.
    debug_kwargs = {'debug': debug} if isinstance(connectable, SQLConnector) else {}
    if not debug_kwargs and debug:
        dprint(cols_indices_query)

    try:
        ### DuckDB must go through `SQLConnector.read()` rather than raw `execute()`.
        result_rows = (
            [
                row
                for row in connectable.execute(cols_indices_query, **debug_kwargs).fetchall()
            ]
            if flavor != 'duckdb'
            else [
                tuple([doc[col] for col in cols])
                for doc in connectable.read(cols_indices_query, debug=debug).to_dict(orient='records')
            ]
        )
        ### Re-shape positional rows into documents keyed by the names in `cols`.
        cols_types_docs = [
            {
                result_cols_ix[i]: val
                for i, val in enumerate(row)
            }
            for row in result_rows
        ]
        ### Keep only rows matching the requested schema and database (when given).
        cols_types_docs_filtered = [
            doc
            for doc in cols_types_docs
            if (
                (
                    not schema
                    or doc['schema'] == schema
                )
                and
                (
                    not database
                    or doc['database'] == database
                )
            )
        ]
        ### NOTE: This may return incorrect columns if the schema is not explicitly stated.
        if cols_types_docs and not cols_types_docs_filtered:
            cols_types_docs_filtered = cols_types_docs

        ### Group index documents by column, normalizing simple Oracle
        ### upper-case identifiers to lower-case.
        cols_indices = defaultdict(lambda: [])
        for doc in cols_types_docs_filtered:
            col = (
                doc['column']
                if flavor != 'oracle'
                else (
                    doc['column'].lower()
                    if (doc['column'].isupper() and doc['column'].replace('_', '').isalpha())
                    else doc['column']
                )
            )
            index_doc = {
                'name': doc.get('index', None),
                'type': doc.get('index_type', None)
            }
            if flavor == 'mssql':
                index_doc['clustered'] = doc.get('clustered', None)
            cols_indices[col].append(index_doc)

        return dict(cols_indices)
    except Exception as e:
        warn(f"Failed to fetch columns for table '{table}':\n{e}")
        return {}
1651
1652
1653def get_update_queries(
1654    target: str,
1655    patch: str,
1656    connectable: Union[
1657        mrsm.connectors.sql.SQLConnector,
1658        'sqlalchemy.orm.session.Session'
1659    ],
1660    join_cols: Iterable[str],
1661    flavor: Optional[str] = None,
1662    upsert: bool = False,
1663    datetime_col: Optional[str] = None,
1664    schema: Optional[str] = None,
1665    patch_schema: Optional[str] = None,
1666    target_cols_types: Optional[Dict[str, str]] = None,
1667    patch_cols_types: Optional[Dict[str, str]] = None,
1668    identity_insert: bool = False,
1669    null_indices: bool = True,
1670    cast_columns: bool = True,
1671    debug: bool = False,
1672) -> List[str]:
1673    """
1674    Build a list of `MERGE`, `UPDATE`, `DELETE`/`INSERT` queries to apply a patch to target table.
1675
1676    Parameters
1677    ----------
1678    target: str
1679        The name of the target table.
1680
1681    patch: str
1682        The name of the patch table. This should have the same shape as the target.
1683
1684    connectable: Union[meerschaum.connectors.sql.SQLConnector, sqlalchemy.orm.session.Session]
1685        The `SQLConnector` or SQLAlchemy session which will later execute the queries.
1686
1687    join_cols: List[str]
1688        The columns to use to join the patch to the target.
1689
1690    flavor: Optional[str], default None
1691        If using a SQLAlchemy session, provide the expected database flavor.
1692
1693    upsert: bool, default False
1694        If `True`, return an upsert query rather than an update.
1695
1696    datetime_col: Optional[str], default None
1697        If provided, bound the join query using this column as the datetime index.
1698        This must be present on both tables.
1699
1700    schema: Optional[str], default None
1701        If provided, use this schema when quoting the target table.
1702        Defaults to `connector.schema`.
1703
1704    patch_schema: Optional[str], default None
1705        If provided, use this schema when quoting the patch table.
1706        Defaults to `schema`.
1707
1708    target_cols_types: Optional[Dict[str, Any]], default None
1709        If provided, use these as the columns-types dictionary for the target table.
1710        Default will infer from the database context.
1711
1712    patch_cols_types: Optional[Dict[str, Any]], default None
1713        If provided, use these as the columns-types dictionary for the target table.
1714        Default will infer from the database context.
1715
1716    identity_insert: bool, default False
1717        If `True`, include `SET IDENTITY_INSERT` queries before and after the update queries.
1718        Only applies for MSSQL upserts.
1719
1720    null_indices: bool, default True
1721        If `False`, do not coalesce index columns before joining.
1722
1723    cast_columns: bool, default True
1724        If `False`, do not cast update columns to the target table types.
1725
1726    debug: bool, default False
1727        Verbosity toggle.
1728
1729    Returns
1730    -------
1731    A list of query strings to perform the update operation.
1732    """
1733    import textwrap
1734    from meerschaum.connectors import SQLConnector
1735    from meerschaum.utils.debug import dprint
1736    from meerschaum.utils.dtypes import are_dtypes_equal
1737    from meerschaum.utils.dtypes.sql import DB_FLAVORS_CAST_DTYPES, get_pd_type_from_db_type
1738    flavor = flavor or getattr(connectable, 'flavor', None)
1739    if not flavor:
1740        raise ValueError("Provide a flavor if using a SQLAlchemy session.")
1741    if (
1742        flavor in ('sqlite', 'geopackage')
1743        and isinstance(connectable, SQLConnector)
1744        and connectable.db_version < '3.33.0'
1745    ):
1746        flavor = 'sqlite_delete_insert'
1747    flavor_key = (f'{flavor}-upsert' if upsert else flavor)
1748    base_queries = UPDATE_QUERIES.get(
1749        flavor_key,
1750        UPDATE_QUERIES['default']
1751    )
1752    if not isinstance(base_queries, list):
1753        base_queries = [base_queries]
1754    schema = schema or (connectable.schema if isinstance(connectable, SQLConnector) else None)
1755    patch_schema = patch_schema or schema
1756    target_table_columns = get_table_cols_types(
1757        target,
1758        connectable,
1759        flavor=flavor,
1760        schema=schema,
1761        debug=debug,
1762    ) if not target_cols_types else target_cols_types
1763    patch_table_columns = get_table_cols_types(
1764        patch,
1765        connectable,
1766        flavor=flavor,
1767        schema=patch_schema,
1768        debug=debug,
1769    ) if not patch_cols_types else patch_cols_types
1770
1771    patch_cols_str = ', '.join(
1772        [
1773            sql_item_name(col, flavor)
1774            for col in patch_table_columns
1775        ]
1776    )
1777    patch_cols_prefixed_str = ', '.join(
1778        [
1779            'p.' + sql_item_name(col, flavor)
1780            for col in patch_table_columns
1781        ]
1782    )
1783
1784    join_cols_str = ', '.join(
1785        [
1786            sql_item_name(col, flavor)
1787            for col in join_cols
1788        ]
1789    )
1790
1791    value_cols = []
1792    join_cols_types = []
1793    if debug:
1794        dprint("target_table_columns:")
1795        mrsm.pprint(target_table_columns)
1796    for c_name, c_type in target_table_columns.items():
1797        if c_name not in patch_table_columns:
1798            continue
1799        if flavor in DB_FLAVORS_CAST_DTYPES:
1800            c_type = DB_FLAVORS_CAST_DTYPES[flavor].get(c_type.upper(), c_type)
1801        (
1802            join_cols_types
1803            if c_name in join_cols
1804            else value_cols
1805        ).append((c_name, c_type))
1806    if debug:
1807        dprint(f"value_cols: {value_cols}")
1808
1809    if not join_cols_types:
1810        return []
1811    if not value_cols and not upsert:
1812        return []
1813
1814    coalesce_join_cols_str = ', '.join(
1815        [
1816            (
1817                (
1818                    'COALESCE('
1819                    + sql_item_name(c_name, flavor)
1820                    + ', '
1821                    + get_null_replacement(c_type, flavor)
1822                    + ')'
1823                )
1824                if null_indices
1825                else sql_item_name(c_name, flavor)
1826            )
1827            for c_name, c_type in join_cols_types
1828        ]
1829    )
1830
1831    update_or_nothing = ('UPDATE' if value_cols else 'NOTHING')
1832
1833    def sets_subquery(l_prefix: str, r_prefix: str):
1834        if not value_cols:
1835            return ''
1836
1837        utc_value_cols = {
1838            c_name
1839            for c_name, c_type in value_cols
1840            if ('utc' in get_pd_type_from_db_type(c_type).lower())
1841        } if flavor not in TIMEZONE_NAIVE_FLAVORS else set()
1842
1843        cast_func_cols = {
1844            c_name: (
1845                ('', '', '')
1846                if not cast_columns or (
1847                    flavor == 'oracle'
1848                    and are_dtypes_equal(get_pd_type_from_db_type(c_type), 'bytes')
1849                )
1850                else (
1851                    ('CAST(', f" AS {c_type.replace('_', ' ')}", ')' + (
1852                        " AT TIME ZONE 'UTC'"
1853                        if c_name in utc_value_cols
1854                        else ''
1855                    ))
1856                    if flavor not in ('sqlite', 'geopackage')
1857                    else ('', '', '')
1858                )
1859            )
1860            for c_name, c_type in value_cols
1861        }
1862        return 'SET ' + ',\n'.join([
1863            (
1864                l_prefix + sql_item_name(c_name, flavor, None)
1865                + ' = '
1866                + cast_func_cols[c_name][0]
1867                + r_prefix + sql_item_name(c_name, flavor, None)
1868                + cast_func_cols[c_name][1]
1869                + cast_func_cols[c_name][2]
1870            ) for c_name, c_type in value_cols
1871        ])
1872
1873    def and_subquery(l_prefix: str, r_prefix: str):
1874        return '\n            AND\n                '.join([
1875            (
1876                (
1877                    "COALESCE("
1878                    + l_prefix
1879                    + sql_item_name(c_name, flavor, None)
1880                    + ", "
1881                    + get_null_replacement(c_type, flavor)
1882                    + ")"
1883                    + '\n                =\n                '
1884                    + "COALESCE("
1885                    + r_prefix
1886                    + sql_item_name(c_name, flavor, None)
1887                    + ", "
1888                    + get_null_replacement(c_type, flavor)
1889                    + ")"
1890                )
1891                if null_indices
1892                else (
1893                    l_prefix
1894                    + sql_item_name(c_name, flavor, None)
1895                    + ' = '
1896                    + r_prefix
1897                    + sql_item_name(c_name, flavor, None)
1898                )
1899            ) for c_name, c_type in join_cols_types
1900        ])
1901
1902    skip_query_val = ""
1903    target_table_name = sql_item_name(target, flavor, schema)
1904    patch_table_name = sql_item_name(patch, flavor, patch_schema)
1905    dt_col_name = sql_item_name(datetime_col, flavor, None) if datetime_col else None
1906    date_bounds_table = patch_table_name if flavor != 'mssql' else '[date_bounds]'
1907    min_dt_col_name = f"MIN({dt_col_name})" if flavor != 'mssql' else '[Min_dt]'
1908    max_dt_col_name = f"MAX({dt_col_name})" if flavor != 'mssql' else '[Max_dt]'
1909    date_bounds_subquery = (
1910        f"""f.{dt_col_name} >= (SELECT {min_dt_col_name} FROM {date_bounds_table})
1911            AND
1912                f.{dt_col_name} <= (SELECT {max_dt_col_name} FROM {date_bounds_table})"""
1913        if datetime_col
1914        else "1 = 1"
1915    )
1916    with_temp_date_bounds = f"""WITH [date_bounds] AS (
1917        SELECT MIN({dt_col_name}) AS {min_dt_col_name}, MAX({dt_col_name}) AS {max_dt_col_name}
1918        FROM {patch_table_name}
1919    )""" if datetime_col else ""
1920    identity_insert_on = (
1921        f"SET IDENTITY_INSERT {target_table_name} ON"
1922        if identity_insert
1923        else skip_query_val
1924    )
1925    identity_insert_off = (
1926        f"SET IDENTITY_INSERT {target_table_name} OFF"
1927        if identity_insert
1928        else skip_query_val
1929    )
1930
1931    ### NOTE: MSSQL upserts must exclude the update portion if only upserting indices.
1932    when_matched_update_sets_subquery_none = "" if not value_cols else (
1933        "\n        WHEN MATCHED THEN\n"
1934        f"            UPDATE {sets_subquery('', 'p.')}"
1935    )
1936
1937    cols_equal_values = '\n,'.join(
1938        [
1939            f"{sql_item_name(c_name, flavor)} = VALUES({sql_item_name(c_name, flavor)})"
1940            for c_name, c_type in value_cols
1941        ]
1942    )
1943    on_duplicate_key_update = (
1944        "ON DUPLICATE KEY UPDATE"
1945        if value_cols
1946        else ""
1947    )
1948    ignore = "IGNORE " if not value_cols else ""
1949
1950    formatted_queries = [
1951        textwrap.dedent(base_query.format(
1952            sets_subquery_none=sets_subquery('', 'p.'),
1953            sets_subquery_none_excluded=sets_subquery('', 'EXCLUDED.'),
1954            sets_subquery_f=sets_subquery('f.', 'p.'),
1955            and_subquery_f=and_subquery('p.', 'f.'),
1956            and_subquery_t=and_subquery('p.', 't.'),
1957            target_table_name=target_table_name,
1958            patch_table_name=patch_table_name,
1959            patch_cols_str=patch_cols_str,
1960            patch_cols_prefixed_str=patch_cols_prefixed_str,
1961            date_bounds_subquery=date_bounds_subquery,
1962            join_cols_str=join_cols_str,
1963            coalesce_join_cols_str=coalesce_join_cols_str,
1964            update_or_nothing=update_or_nothing,
1965            when_matched_update_sets_subquery_none=when_matched_update_sets_subquery_none,
1966            cols_equal_values=cols_equal_values,
1967            on_duplicate_key_update=on_duplicate_key_update,
1968            ignore=ignore,
1969            with_temp_date_bounds=with_temp_date_bounds,
1970            identity_insert_on=identity_insert_on,
1971            identity_insert_off=identity_insert_off,
1972        )).lstrip().rstrip()
1973        for base_query in base_queries
1974    ]
1975
1976    ### NOTE: Allow for skipping some queries.
1977    return [query for query in formatted_queries if query]
1978
1979
def get_null_replacement(typ: str, flavor: str) -> str:
    """
    Return a stand-in SQL literal which may temporarily be used in place of NULL
    for the given database type.

    Parameters
    ----------
    typ: str
        The database type for which a NULL placeholder is needed.

    flavor: str
        The database flavor for which this value will be used.

    Returns
    -------
    A value which may stand in place of NULL for this type.
    `'None'` is returned if a value cannot be determined.
    """
    from meerschaum.utils.dtypes.sql import DB_FLAVORS_CAST_DTYPES
    typ_lower = typ.lower()

    if 'geometry' in typ_lower:
        ### Magic sentinel point encoded as WKB.
        return "'010100000000008058346FCDC100008058346FCDC1'"

    if 'int' in typ_lower or typ_lower in ('numeric', 'number'):
        return '-987654321'

    if 'bool' in typ_lower or typ_lower == 'bit':
        bool_typ = (
            PD_TO_DB_DTYPES_FLAVORS
            .get('bool', {})
            .get(flavor, PD_TO_DB_DTYPES_FLAVORS['bool']['default'])
        )
        if flavor in DB_FLAVORS_CAST_DTYPES:
            bool_typ = DB_FLAVORS_CAST_DTYPES[flavor].get(bool_typ, bool_typ)
        ### MySQL/MariaDB tolerate an integer sentinel; other flavors use 0.
        val_to_cast = -987654321 if flavor in ('mysql', 'mariadb') else 0
        return f'CAST({val_to_cast} AS {bool_typ})'

    if 'time' in typ_lower or 'date' in typ_lower:
        ### Only pass through the db_type if it looks like a raw DB type (all caps).
        db_type = typ if typ.isupper() else None
        return dateadd_str(flavor=flavor, begin='1900-01-01', db_type=db_type)

    if 'float' in typ_lower or 'double' in typ_lower or typ_lower in ('decimal',):
        return '-987654321.0'

    if flavor == 'oracle':
        if typ_lower.split('(', maxsplit=1)[0] == 'char':
            return "'-987654321'"
        if typ_lower in ('blob', 'bytes'):
            return '00'

    if typ_lower in ('uniqueidentifier', 'guid', 'uuid'):
        magic_val = 'DEADBEEF-ABBA-BABE-CAFE-DECAFC0FFEE5'
        if flavor == 'mssql':
            return f"CAST('{magic_val}' AS UNIQUEIDENTIFIER)"
        return f"'{magic_val}'"

    ### Fallback: a string sentinel (national-character literal for Oracle).
    prefix = 'n' if flavor == 'oracle' else ''
    return prefix + "'-987654321'"
2031
2032
def get_db_version(conn: 'SQLConnector', debug: bool = False) -> Union[str, None]:
    """
    Fetch the database version if possible.

    Parameters
    ----------
    conn: SQLConnector
        The connector whose underlying database version should be fetched.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The version string reported by the database, if any.
    """
    version_name = sql_item_name('version', conn.flavor, None)
    query_template = version_queries.get(conn.flavor, version_queries['default'])
    version_query = query_template.format(version_name=version_name)
    return conn.value(version_query, debug=debug)
2043
2044
def get_rename_table_queries(
    old_table: str,
    new_table: str,
    flavor: str,
    schema: Optional[str] = None,
) -> List[str]:
    """
    Return queries to alter a table's name.

    Parameters
    ----------
    old_table: str
        The unquoted name of the old table.

    new_table: str
        The unquoted name of the new table.

    flavor: str
        The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).

    schema: Optional[str], default None
        The schema on which the table resides.

    Returns
    -------
    A list of `ALTER TABLE` or equivalent queries for the database flavor.
    """
    old_table_name = sql_item_name(old_table, flavor, schema)
    new_table_name = sql_item_name(new_table, flavor, None)
    tmp_table = '_tmp_rename_' + new_table
    tmp_table_name = sql_item_name(tmp_table, flavor, schema)

    ### MSSQL renames via a stored procedure on the unquoted names.
    if flavor == 'mssql':
        return [f"EXEC sp_rename '{old_table}', '{new_table}'"]

    if_exists_str = "IF EXISTS" if flavor in DROP_IF_EXISTS_FLAVORS else ""

    ### DuckDB lacks a direct rename here, so copy through a temporary table
    ### and drop both the temporary and original tables afterwards.
    if flavor == 'duckdb':
        copy_to_tmp = get_create_table_queries(
            f"SELECT * FROM {old_table_name}",
            tmp_table,
            'duckdb',
            schema,
        )
        copy_to_new = get_create_table_queries(
            f"SELECT * FROM {tmp_table_name}",
            new_table,
            'duckdb',
            schema,
        )
        drop_queries = [
            f"DROP TABLE {if_exists_str} {tmp_table_name}",
            f"DROP TABLE {if_exists_str} {old_table_name}",
        ]
        return copy_to_tmp + copy_to_new + drop_queries

    return [f"ALTER TABLE {old_table_name} RENAME TO {new_table_name}"]
2099
2100
def get_create_table_query(
    query_or_dtypes: Union[str, Dict[str, str]],
    new_table: str,
    flavor: str,
    schema: Optional[str] = None,
) -> str:
    """
    NOTE: This function is deprecated. Use `get_create_table_queries()` instead.

    Return a query to create a new table from a `SELECT` query.

    Parameters
    ----------
    query_or_dtypes: Union[str, Dict[str, str]]
        The select query to use for the creation of the table.
        If a dictionary is provided, return a `CREATE TABLE` query from the given `dtypes` columns.

    new_table: str
        The unquoted name of the new table.

    flavor: str
        The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).

    schema: Optional[str], default None
        The schema on which the table will reside.

    Returns
    -------
    A `CREATE TABLE` (or `SELECT INTO`) query for the database flavor.
    """
    ### Delegate to the plural form and return only the first query.
    queries = get_create_table_queries(
        query_or_dtypes,
        new_table,
        flavor,
        schema=schema,
        primary_key=None,
    )
    return queries[0]
2138
2139
def get_create_table_queries(
    query_or_dtypes: Union[str, Dict[str, str]],
    new_table: str,
    flavor: str,
    schema: Optional[str] = None,
    primary_key: Optional[str] = None,
    primary_key_db_type: Optional[str] = None,
    autoincrement: bool = False,
    datetime_column: Optional[str] = None,
) -> List[str]:
    """
    Return a query to create a new table from a `SELECT` query or a `dtypes` dictionary.

    Parameters
    ----------
    query_or_dtypes: Union[str, Dict[str, str]]
        The select query to use for the creation of the table.
        If a dictionary is provided, return a `CREATE TABLE` query from the given `dtypes` columns.

    new_table: str
        The unquoted name of the new table.

    flavor: str
        The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).

    schema: Optional[str], default None
        The schema on which the table will reside.

    primary_key: Optional[str], default None
        If provided, designate this column as the primary key in the new table.

    primary_key_db_type: Optional[str], default None
        If provided, alter the primary key to this type (to set NOT NULL constraint).

    autoincrement: bool, default False
        If `True` and `primary_key` is provided, create the `primary_key` column
        as an auto-incrementing integer column.

    datetime_column: Optional[str], default None
        If provided, include this column in the primary key.
        Applicable to TimescaleDB only.

    Returns
    -------
    A `CREATE TABLE` (or `SELECT INTO`) query for the database flavor.
    """
    if not isinstance(query_or_dtypes, (str, dict)):
        raise TypeError("`query_or_dtypes` must be a query or a dtypes dictionary.")

    ### Dispatch on the input kind: a string is a CTE query, a dict is dtypes.
    if isinstance(query_or_dtypes, str):
        method = _get_create_table_query_from_cte
    else:
        method = _get_create_table_query_from_dtypes

    ### Some flavors do not support auto-incrementing columns.
    allow_autoincrement = autoincrement and flavor not in SKIP_AUTO_INCREMENT_FLAVORS
    return method(
        query_or_dtypes,
        new_table,
        flavor,
        schema=schema,
        primary_key=primary_key,
        primary_key_db_type=primary_key_db_type,
        autoincrement=allow_autoincrement,
        datetime_column=datetime_column,
    )
2204
2205
def _get_create_table_query_from_dtypes(
    dtypes: Dict[str, str],
    new_table: str,
    flavor: str,
    schema: Optional[str] = None,
    primary_key: Optional[str] = None,
    primary_key_db_type: Optional[str] = None,
    autoincrement: bool = False,
    datetime_column: Optional[str] = None,
) -> List[str]:
    """
    Create a new table from a `dtypes` dictionary.

    Parameters
    ----------
    dtypes: Dict[str, str]
        A mapping of column names to Pandas dtype strings
        (converted to database types via `get_db_type_from_pd_type`).

    new_table: str
        The unquoted name of the new table.

    flavor: str
        The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).

    schema: Optional[str], default None
        The schema on which the table will reside.

    primary_key: Optional[str], default None
        If provided, designate this column as the primary key in the new table.
        If absent from `dtypes`, it is added as an auto-incrementing integer column.

    primary_key_db_type: Optional[str], default None
        NOTE: Unused here; accepted for signature parity with
        `_get_create_table_query_from_cte()`.

    autoincrement: bool, default False
        If `True` and `primary_key` is provided, create the primary key
        column as an auto-incrementing integer column.

    datetime_column: Optional[str], default None
        If provided, include this column in a composite primary key (TimescaleDB)
        or use a nonclustered primary key index (MSSQL).

    Returns
    -------
    A list containing a single `CREATE TABLE` query for the database flavor.
    """
    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type, AUTO_INCREMENT_COLUMN_FLAVORS
    if not dtypes and not primary_key:
        raise ValueError(f"Expecting columns for table '{new_table}'.")

    if flavor in SKIP_AUTO_INCREMENT_FLAVORS:
        autoincrement = False

    ### Pair each column with its database type, ordering the primary key (if any) first.
    ### A primary key missing from `dtypes` defaults to an integer column.
    cols_types = (
        [
            (
                primary_key,
                get_db_type_from_pd_type(dtypes.get(primary_key, 'int') or 'int', flavor=flavor)
            )
        ]
        if primary_key
        else []
    ) + [
        (col, get_db_type_from_pd_type(typ, flavor=flavor))
        for col, typ in dtypes.items()
        if col != primary_key
    ]

    table_name = sql_item_name(new_table, schema=schema, flavor=flavor)
    primary_key_name = sql_item_name(primary_key, flavor) if primary_key else None
    primary_key_constraint_name = (
        sql_item_name(f'PK_{new_table}', flavor, None)
        if primary_key
        else None
    )
    datetime_column_name = sql_item_name(datetime_column, flavor) if datetime_column else None
    ### MSSQL only: cluster on the primary key unless a separate datetime column exists.
    primary_key_clustered = (
        "CLUSTERED"
        if not datetime_column or datetime_column == primary_key
        else "NONCLUSTERED"
    )
    query = f"CREATE TABLE {table_name} ("
    if primary_key:
        col_db_type = cols_types[0][1]
        ### Apply the flavor's auto-increment syntax when requested
        ### or when the primary key is absent from `dtypes`.
        auto_increment_str = (' ' + AUTO_INCREMENT_COLUMN_FLAVORS.get(
            flavor,
            AUTO_INCREMENT_COLUMN_FLAVORS['default']
        )) if autoincrement or primary_key not in dtypes else ''
        col_name = sql_item_name(primary_key, flavor=flavor, schema=None)

        if flavor in ('sqlite', 'geopackage'):
            ### SQLite requires auto-incrementing columns to be declared INTEGER.
            query += (
                f"\n    {col_name} "
                + (f"{col_db_type}" if not auto_increment_str else 'INTEGER')
                + f" PRIMARY KEY{auto_increment_str} NOT NULL,"
            )
        elif flavor == 'oracle':
            query += f"\n    {col_name} {col_db_type} {auto_increment_str} PRIMARY KEY,"
        elif (
            flavor in ('timescaledb', 'timescaledb-ha')
            and datetime_column
            and datetime_column != primary_key
        ):
            ### TimescaleDB composite key (datetime + primary key) is appended below.
            query += f"\n    {col_name} {col_db_type}{auto_increment_str} NOT NULL,"
        elif flavor == 'mssql':
            ### The MSSQL primary key constraint is appended below.
            query += f"\n    {col_name} {col_db_type}{auto_increment_str} NOT NULL,"
        else:
            query += f"\n    {col_name} {col_db_type} PRIMARY KEY{auto_increment_str} NOT NULL,"

    ### Add the remaining (non-primary-key) columns.
    for col, db_type in cols_types:
        if col == primary_key:
            continue
        col_name = sql_item_name(col, schema=None, flavor=flavor)
        query += f"\n    {col_name} {db_type},"
    if (
        flavor in ('timescaledb', 'timescaledb-ha')
        and datetime_column
        and primary_key
        and datetime_column != primary_key
    ):
        query += f"\n    PRIMARY KEY({datetime_column_name}, {primary_key_name}),"

    if flavor == 'mssql' and primary_key:
        query += f"\n    CONSTRAINT {primary_key_constraint_name} PRIMARY KEY {primary_key_clustered} ({primary_key_name}),"

    ### Strip the trailing comma and close the column list.
    query = query[:-1]
    query += "\n)"

    queries = [query]
    return queries
2303
2304
def _get_create_table_query_from_cte(
    query: str,
    new_table: str,
    flavor: str,
    schema: Optional[str] = None,
    primary_key: Optional[str] = None,
    primary_key_db_type: Optional[str] = None,
    autoincrement: bool = False,
    datetime_column: Optional[str] = None,
) -> List[str]:
    """
    Create a new table from a CTE query.

    Parameters
    ----------
    query: str
        The `SELECT` query (which may itself contain CTEs) from which to create the table.

    new_table: str
        The unquoted name of the new table.

    flavor: str
        The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).

    schema: Optional[str], default None
        The schema on which the table will reside.

    primary_key: Optional[str], default None
        If provided, add this column as the primary key via `ALTER TABLE`
        queries appended after the creation query.

    primary_key_db_type: Optional[str], default None
        If provided (MSSQL only), alter the primary key column to this type
        first to apply the `NOT NULL` constraint.

    autoincrement: bool, default False
        NOTE: Unused here; accepted for signature parity with
        `_get_create_table_query_from_dtypes()`.

    datetime_column: Optional[str], default None
        If provided, include this column in the primary key (TimescaleDB)
        or use a nonclustered primary key index (MSSQL).

    Returns
    -------
    A list of queries: the `CREATE TABLE` / `SELECT INTO` queries,
    plus `ALTER TABLE` queries when `primary_key` is provided.
    """
    import textwrap
    create_cte = 'create_query'
    create_cte_name = sql_item_name(create_cte, flavor, None)
    new_table_name = sql_item_name(new_table, flavor, schema)
    primary_key_constraint_name = (
        sql_item_name(f'PK_{new_table}', flavor, None)
        if primary_key
        else None
    )
    primary_key_name = (
        sql_item_name(primary_key, flavor, None)
        if primary_key
        else None
    )
    ### MSSQL only: cluster on the primary key unless a separate datetime column exists.
    primary_key_clustered = (
        "CLUSTERED"
        if not datetime_column or datetime_column == primary_key
        else "NONCLUSTERED"
    )
    datetime_column_name = (
        sql_item_name(datetime_column, flavor)
        if datetime_column
        else None
    )
    if flavor in ('mssql',):
        query = query.lstrip()
        if query.lower().startswith('with '):
            ### Append our CTE to the query's existing CTE list instead of nesting.
            final_select_ix = query.lower().rfind('select')
            create_table_queries = [
                (
                    query[:final_select_ix].rstrip() + ',\n'
                    + f"{create_cte_name} AS (\n"
                    + textwrap.indent(query[final_select_ix:], '    ')
                    + "\n)\n"
                    + f"SELECT *\nINTO {new_table_name}\nFROM {create_cte_name}"
                ),
            ]
        else:
            create_table_queries = [
                (
                    "SELECT *\n"
                    f"INTO {new_table_name}\n"
                    f"FROM (\n{textwrap.indent(query, '    ')}\n) AS {create_cte_name}"
                ),
            ]

        alter_type_queries = []
        if primary_key_db_type:
            ### The column must be NOT NULL before the constraint may be added.
            alter_type_queries.extend([
                (
                    f"ALTER TABLE {new_table_name}\n"
                    f"ALTER COLUMN {primary_key_name} {primary_key_db_type} NOT NULL"
                ),
            ])
        alter_type_queries.extend([
            (
                f"ALTER TABLE {new_table_name}\n"
                f"ADD CONSTRAINT {primary_key_constraint_name} "
                f"PRIMARY KEY {primary_key_clustered} ({primary_key_name})"
            ),
        ])
    elif flavor in (None,):
        ### NOTE(review): this branch only triggers when `flavor is None` —
        ### presumably a placeholder; confirm whether it is reachable.
        ### FIXED: `textwrap.index` does not exist; use `textwrap.indent`.
        create_table_queries = [
            (
                f"WITH {create_cte_name} AS (\n{textwrap.indent(query, '    ')}\n)\n"
                f"CREATE TABLE {new_table_name} AS\n"
                "SELECT *\n"
                f"FROM {create_cte_name}"
            ),
        ]

        alter_type_queries = [
            (
                f"ALTER TABLE {new_table_name}\n"
                f"ADD PRIMARY KEY ({primary_key_name})"
            ),
        ]
    elif flavor in ('sqlite', 'mysql', 'mariadb', 'duckdb', 'oracle', 'geopackage'):
        create_table_queries = [
            (
                f"CREATE TABLE {new_table_name} AS\n"
                "SELECT *\n"
                f"FROM (\n{textwrap.indent(query, '    ')}\n)"
                + (f" AS {create_cte_name}" if flavor != 'oracle' else '')
            ),
        ]

        ### FIXED: this string was missing its `f` prefix, so the literal text
        ### `{primary_key_name}` was emitted into the ALTER query.
        alter_type_queries = [
            (
                f"ALTER TABLE {new_table_name}\n"
                f"ADD PRIMARY KEY ({primary_key_name})"
            ),
        ]
    elif (
        flavor in ('timescaledb', 'timescaledb-ha')
        and datetime_column
        and datetime_column != primary_key
    ):
        create_table_queries = [
            (
                "SELECT *\n"
                f"INTO {new_table_name}\n"
                f"FROM (\n{textwrap.indent(query, '    ')}\n) AS {create_cte_name}\n"
            ),
        ]

        ### TimescaleDB composite primary key: datetime column + primary key.
        alter_type_queries = [
            (
                f"ALTER TABLE {new_table_name}\n"
                f"ADD PRIMARY KEY ({datetime_column_name}, {primary_key_name})"
            ),
        ]
    else:
        create_table_queries = [
            (
                "SELECT *\n"
                f"INTO {new_table_name}\n"
                f"FROM (\n{textwrap.indent(query, '    ')}\n) AS {create_cte_name}"
            ),
        ]

        alter_type_queries = [
            (
                f"ALTER TABLE {new_table_name}\n"
                f"ADD PRIMARY KEY ({primary_key_name})"
            ),
        ]

    ### Without a primary key, no ALTER queries are needed.
    if not primary_key:
        return create_table_queries

    return create_table_queries + alter_type_queries
2450
2451
def wrap_query_with_cte(
    sub_query: str,
    parent_query: str,
    flavor: str,
    cte_name: str = "src",
) -> str:
    """
    Wrap a subquery in a CTE and append an encapsulating query.

    Parameters
    ----------
    sub_query: str
        The query to be referenced. This may itself contain CTEs.
        Unless `cte_name` is provided, this will be aliased as `src`.

    parent_query: str
        The larger query to append which references the subquery.
        This must not contain CTEs.

    flavor: str
        The database flavor, e.g. `'mssql'`.

    cte_name: str, default 'src'
        The CTE alias, defaults to `src`.

    Returns
    -------
    An encapsulating query which allows you to treat `sub_query` as a temporary table.

    Examples
    --------

    ```python
    from meerschaum.utils.sql import wrap_query_with_cte
    sub_query = "WITH foo AS (SELECT 1 AS val) SELECT (val * 2) AS newval FROM foo"
    parent_query = "SELECT newval * 3 FROM src"
    query = wrap_query_with_cte(sub_query, parent_query, 'mssql')
    print(query)
    # WITH foo AS (SELECT 1 AS val),
    # [src] AS (
    #     SELECT (val * 2) AS newval FROM foo
    # )
    # SELECT newval * 3 FROM src
    ```

    """
    import textwrap
    sub_query = sub_query.lstrip()
    cte_name_quoted = sql_item_name(cte_name, flavor, None)

    ### Flavors without CTE support get the subquery inlined as a derived table.
    if flavor in NO_CTE_FLAVORS:
        placeholder = '--MRSM_SUBQUERY--'
        with_placeholder = (
            parent_query
            .replace(cte_name_quoted, placeholder)
            .replace(cte_name, placeholder)
        )
        return with_placeholder.replace(
            placeholder,
            f"(\n{sub_query}\n) AS {cte_name_quoted}",
        )

    ### If the subquery already has CTEs, append ours to its CTE list
    ### rather than nesting (nested CTEs are not portable).
    if sub_query.lstrip().lower().startswith('with '):
        final_select_ix = sub_query.lower().rfind('select')
        existing_ctes = sub_query[:final_select_ix].rstrip()
        final_select = sub_query[final_select_ix:]
        return (
            existing_ctes + ',\n'
            + f"{cte_name_quoted} AS (\n    "
            + final_select
            + "\n)\n"
            + parent_query
        )

    return (
        f"WITH {cte_name_quoted} AS (\n"
        + textwrap.indent(sub_query, '    ')
        + f"\n)\n{parent_query}"
    )
2525
2526
def format_cte_subquery(
    sub_query: str,
    flavor: str,
    sub_name: str = 'src',
    cols_to_select: Union[List[str], str] = '*',
) -> str:
    """
    Given a subquery, build a wrapper query that selects from the CTE subquery.

    Parameters
    ----------
    sub_query: str
        The subquery to wrap.

    flavor: str
        The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).

    sub_name: str, default 'src'
        If possible, give this name to the CTE (must be unquoted).

    cols_to_select: Union[List[str], str], default '*'
        If specified, choose which columns to select from the CTE.
        If a list of strings is provided, each item will be quoted and joined with commas.
        If a string is given, assume it is quoted and insert it into the query.

    Returns
    -------
    A wrapper query that selects from the CTE.
    """
    if isinstance(cols_to_select, str):
        cols_str = cols_to_select
    else:
        cols_str = ', '.join([sql_item_name(col, flavor, None) for col in cols_to_select])

    quoted_sub_name = sql_item_name(sub_name, flavor, None)
    parent_query = f"SELECT {cols_str}\nFROM {quoted_sub_name}"
    return wrap_query_with_cte(sub_query, parent_query, flavor, cte_name=sub_name)
2567
2568
def session_execute(
    session: 'sqlalchemy.orm.session.Session',
    queries: Union[List[str], str],
    with_results: bool = False,
    debug: bool = False,
) -> Union[mrsm.SuccessTuple, Tuple[mrsm.SuccessTuple, List['sqlalchemy.sql.ResultProxy']]]:
    """
    Similar to `SQLConnector.exec_queries()`, execute a list of queries
    and roll back when one fails.

    Parameters
    ----------
    session: sqlalchemy.orm.session.Session
        A SQLAlchemy session representing a transaction.

    queries: Union[List[str], str]
        A query or list of queries to be executed.
        If a query fails, roll back the session.

    with_results: bool, default False
        If `True`, return a list of result objects.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating the queries were successfully executed.
    If `with_results`, return the `SuccessTuple` and a list of results.
    """
    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
    queries_list = queries if isinstance(queries, list) else [queries]
    successes: List[bool] = []
    msgs: List[str] = []
    results = []
    for query in queries_list:
        if debug:
            dprint(query)
        query_text = sqlalchemy.text(query)
        fail_msg = "Failed to execute queries."
        try:
            result = session.execute(query_text)
            query_success = result is not None
            query_msg = "Success" if query_success else fail_msg
        except Exception as e:
            query_success = False
            query_msg = f"{fail_msg}\n{e}"
            result = None

        successes.append(query_success)
        msgs.append(query_msg)
        results.append(result)

        ### Stop at the first failure and undo the transaction.
        if not query_success:
            if debug:
                dprint("Rolling back session.")
            session.rollback()
            break

    success_tuple = (all(successes), '\n'.join(msgs))
    if with_results:
        return success_tuple, results
    return success_tuple
2625
2626
def get_reset_autoincrement_queries(
    table: str,
    column: str,
    connector: mrsm.connectors.SQLConnector,
    schema: Optional[str] = None,
    debug: bool = False,
) -> List[str]:
    """
    Return a list of queries to reset a table's auto-increment counter to the next largest value.

    Parameters
    ----------
    table: str
        The name of the table on which the auto-incrementing column exists.

    column: str
        The name of the auto-incrementing column.

    connector: mrsm.connectors.SQLConnector
        The SQLConnector to the database on which the table exists.

    schema: Optional[str], default None
        The schema of the table. Defaults to `connector.schema`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of queries to be executed to reset the auto-incrementing column.
    """
    ### Nothing to reset if the table doesn't exist.
    if not table_exists(table, connector, schema=schema, debug=debug):
        return []

    flavor = connector.flavor
    schema = schema or connector.schema
    max_id_name = sql_item_name('max_id', flavor)
    table_name = sql_item_name(table, flavor, schema)
    table_seq_name = sql_item_name(f"{table}_{column}_seq", flavor, schema)
    column_name = sql_item_name(column, flavor)

    ### Determine the current largest value of the auto-incrementing column.
    max_id = connector.value(
        f"""
        SELECT COALESCE(MAX({column_name}), 0) AS {max_id_name}
        FROM {table_name}
        """,
        debug=debug,
    )
    if max_id is None:
        return []

    flavor_reset_queries = reset_autoincrement_queries.get(
        flavor,
        reset_autoincrement_queries['default']
    )
    queries_to_format = (
        flavor_reset_queries
        if isinstance(flavor_reset_queries, list)
        else [flavor_reset_queries]
    )

    return [
        reset_query.format(
            column=column,
            column_name=column_name,
            table=table,
            table_name=table_name,
            table_seq_name=table_seq_name,
            val=max_id,
            val_plus_1=(max_id + 1),
        )
        for reset_query in queries_to_format
    ]
2692
2693
def get_postgis_geo_columns_types(
    connectable: Union[
        'mrsm.connectors.sql.SQLConnector',
        'sqlalchemy.orm.session.Session',
        'sqlalchemy.engine.base.Engine'
    ],
    table: str,
    schema: Optional[str] = 'public',
    debug: bool = False,
) -> Dict[str, str]:
    """
    Return a dictionary mapping PostGIS geometry column names to geometry types.
    """
    from meerschaum.utils.dtypes import get_geometry_type_srid
    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
    default_type, default_srid = get_geometry_type_srid()
    default_type = default_type.upper()

    ### Reject dangerous characters before interpolating into the query below.
    clean(table)
    clean(str(schema))
    schema = schema or 'public'
    safe_schema = truncate_item_name(schema, flavor='postgis')
    safe_table = truncate_item_name(table, flavor='postgis')

    ### Query both catalog views: geometry and geography columns.
    query = sqlalchemy.text(
        "SELECT \"f_geometry_column\" AS \"column\", 'GEOMETRY' AS \"func\", \"type\", \"srid\"\n"
        "FROM \"geometry_columns\"\n"
        f"WHERE \"f_table_schema\" = '{safe_schema}'\n"
        f"    AND \"f_table_name\" = '{safe_table}'\n"
        "UNION ALL\n"
        "SELECT \"f_geography_column\" AS \"column\", 'GEOGRAPHY' AS \"func\", \"type\", \"srid\"\n"
        "FROM \"geography_columns\"\n"
        f"WHERE \"f_table_schema\" = '{safe_schema}'\n"
        f"    AND \"f_table_name\" = '{safe_table}'\n"
    )
    ### Only SQLConnectors accept the `debug` keyword.
    debug_kwargs = {'debug': debug} if isinstance(connectable, mrsm.connectors.SQLConnector) else {}
    rows = connectable.execute(query, **debug_kwargs).fetchall()
    cols_type_tuples = {row[0]: (row[1], row[2], row[3]) for row in rows}

    geometry_cols_types = {}
    for col, (func, typ, srid) in cols_type_tuples.items():
        upper_typ = typ.upper()
        if srid:
            geometry_cols_types[col] = f"{func}({upper_typ}, {srid})"
        elif upper_typ not in ('GEOMETRY', 'GEOGRAPHY'):
            ### Unconstrained base types carry no parenthesized modifier.
            geometry_cols_types[col] = func + f'({upper_typ})'
        else:
            geometry_cols_types[col] = func
    return geometry_cols_types
2754
2755
def get_create_schema_if_not_exists_queries(
    schema: str,
    flavor: str,
) -> List[str]:
    """
    Return the queries to create a schema if it does not yet exist.
    For databases which do not support schemas, an empty list will be returned.
    """
    ### Nothing to create when no schema is given, the flavor lacks schema
    ### support, or the schema is already the flavor's default.
    if (
        not schema
        or flavor in NO_SCHEMA_FLAVORS
        or schema == DEFAULT_SCHEMA_FLAVORS.get(flavor, None)
    ):
        return []

    clean(schema)

    if flavor == 'mssql':
        ### MSSQL lacks `IF NOT EXISTS`; emulate it with a catalog check.
        mssql_query = (
            f"IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{schema}')\n"
            "BEGIN\n"
            f"    EXEC('CREATE SCHEMA {sql_item_name(schema, flavor)}');\n"
            "END;"
        )
        return [mssql_query]

    ### Oracle schemas are users; creating them here is not supported.
    if flavor == 'oracle':
        return []

    return [f"CREATE SCHEMA IF NOT EXISTS {sql_item_name(schema, flavor)};"]
test_queries = {'default': 'SELECT 1', 'oracle': 'SELECT 1 FROM DUAL', 'informix': 'SELECT COUNT(*) FROM systables', 'hsqldb': 'SELECT 1 FROM INFORMATION_SCHEMA.SYSTEM_USERS'}
exists_queries = {'default': 'SELECT COUNT(*) FROM {table_name} WHERE 1 = 0'}
version_queries = {'default': 'SELECT VERSION() AS {version_name}', 'sqlite': 'SELECT SQLITE_VERSION() AS {version_name}', 'geopackage': 'SELECT SQLITE_VERSION() AS {version_name}', 'mssql': 'SELECT @@version', 'oracle': 'SELECT version from PRODUCT_COMPONENT_VERSION WHERE rownum = 1'}
SKIP_IF_EXISTS_FLAVORS = {'mssql', 'oracle'}
DROP_IF_EXISTS_FLAVORS = {'citus', 'mysql', 'timescaledb', 'mssql', 'postgis', 'geopackage', 'postgresql', 'mariadb', 'sqlite', 'timescaledb-ha'}
DROP_INDEX_IF_EXISTS_FLAVORS = {'citus', 'timescaledb', 'mssql', 'postgis', 'geopackage', 'postgresql', 'sqlite', 'timescaledb-ha'}
SKIP_AUTO_INCREMENT_FLAVORS = {'duckdb', 'citus'}
COALESCE_UNIQUE_INDEX_FLAVORS = {'citus', 'timescaledb', 'postgresql', 'timescaledb-ha', 'postgis'}
UPDATE_QUERIES = {'default': '\n UPDATE {target_table_name} AS f\n {sets_subquery_none}\n FROM {target_table_name} AS t\n INNER JOIN (SELECT {patch_cols_str} FROM {patch_table_name}) AS p\n ON\n {and_subquery_t}\n WHERE\n {and_subquery_f}\n AND\n {date_bounds_subquery}\n ', 'timescaledb-upsert': '\n INSERT INTO {target_table_name} ({patch_cols_str})\n SELECT {patch_cols_str}\n FROM {patch_table_name}\n ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}\n ', 'timescaledb-ha-upsert': '\n INSERT INTO {target_table_name} ({patch_cols_str})\n SELECT {patch_cols_str}\n FROM {patch_table_name}\n ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}\n ', 'postgresql-upsert': '\n INSERT INTO {target_table_name} ({patch_cols_str})\n SELECT {patch_cols_str}\n FROM {patch_table_name}\n ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}\n ', 'postgis-upsert': '\n INSERT INTO {target_table_name} ({patch_cols_str})\n SELECT {patch_cols_str}\n FROM {patch_table_name}\n ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}\n ', 'citus-upsert': '\n INSERT INTO {target_table_name} ({patch_cols_str})\n SELECT {patch_cols_str}\n FROM {patch_table_name}\n ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}\n ', 'cockroachdb-upsert': '\n INSERT INTO {target_table_name} ({patch_cols_str})\n SELECT {patch_cols_str}\n FROM {patch_table_name}\n ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}\n ', 'mysql': '\n UPDATE {target_table_name} AS f\n JOIN (SELECT {patch_cols_str} FROM {patch_table_name}) AS p\n ON\n {and_subquery_f}\n {sets_subquery_f}\n WHERE\n {date_bounds_subquery}\n ', 'mysql-upsert': '\n INSERT {ignore}INTO {target_table_name} ({patch_cols_str})\n SELECT {patch_cols_str}\n FROM {patch_table_name}\n {on_duplicate_key_update}\n {cols_equal_values}\n ', 'mariadb': '\n UPDATE {target_table_name} 
AS f\n JOIN (SELECT {patch_cols_str} FROM {patch_table_name}) AS p\n ON\n {and_subquery_f}\n {sets_subquery_f}\n WHERE\n {date_bounds_subquery}\n ', 'mariadb-upsert': '\n INSERT {ignore}INTO {target_table_name} ({patch_cols_str})\n SELECT {patch_cols_str}\n FROM {patch_table_name}\n {on_duplicate_key_update}\n {cols_equal_values}\n ', 'mssql': '\n {with_temp_date_bounds}\n MERGE {target_table_name} f\n USING (SELECT {patch_cols_str} FROM {patch_table_name}) p\n ON\n {and_subquery_f}\n AND\n {date_bounds_subquery}\n WHEN MATCHED THEN\n UPDATE\n {sets_subquery_none};\n ', 'mssql-upsert': ['{identity_insert_on}', '\n {with_temp_date_bounds}\n MERGE {target_table_name} f\n USING (SELECT {patch_cols_str} FROM {patch_table_name}) p\n ON\n {and_subquery_f}\n AND\n {date_bounds_subquery}{when_matched_update_sets_subquery_none}\n WHEN NOT MATCHED THEN\n INSERT ({patch_cols_str})\n VALUES ({patch_cols_prefixed_str});\n ', '{identity_insert_off}'], 'oracle': '\n MERGE INTO {target_table_name} f\n USING (SELECT {patch_cols_str} FROM {patch_table_name}) p\n ON (\n {and_subquery_f}\n AND\n {date_bounds_subquery}\n )\n WHEN MATCHED THEN\n UPDATE\n {sets_subquery_none}\n ', 'oracle-upsert': '\n MERGE INTO {target_table_name} f\n USING (SELECT {patch_cols_str} FROM {patch_table_name}) p\n ON (\n {and_subquery_f}\n AND\n {date_bounds_subquery}\n ){when_matched_update_sets_subquery_none}\n WHEN NOT MATCHED THEN\n INSERT ({patch_cols_str})\n VALUES ({patch_cols_prefixed_str})\n ', 'sqlite-upsert': '\n INSERT INTO {target_table_name} ({patch_cols_str})\n SELECT {patch_cols_str}\n FROM {patch_table_name}\n WHERE true\n ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}\n ', 'sqlite_delete_insert': ['\n DELETE FROM {target_table_name} AS f\n WHERE ROWID IN (\n SELECT t.ROWID\n FROM {target_table_name} AS t\n INNER JOIN (SELECT * FROM {patch_table_name}) AS p\n ON {and_subquery_t}\n );\n ', '\n INSERT INTO {target_table_name} AS f\n SELECT {patch_cols_str} 
FROM {patch_table_name} AS p\n '], 'geopackage-upsert': '\n INSERT INTO {target_table_name} ({patch_cols_str})\n SELECT {patch_cols_str}\n FROM {patch_table_name}\n WHERE true\n ON CONFLICT ({join_cols_str}) DO {update_or_nothing} {sets_subquery_none_excluded}\n '}
columns_types_queries = {'default': "\n SELECT\n table_catalog AS database,\n table_schema AS schema,\n table_name AS table,\n column_name AS column,\n data_type AS type,\n numeric_precision,\n numeric_scale\n FROM information_schema.columns\n WHERE table_name IN ('{table}', '{table_trunc}')\n ", 'sqlite': '\n SELECT\n \'\' "database",\n \'\' "schema",\n m.name "table",\n p.name "column",\n p.type "type"\n FROM sqlite_master m\n LEFT OUTER JOIN pragma_table_info(m.name) p\n ON m.name <> p.name\n WHERE m.type = \'table\'\n AND m.name IN (\'{table}\', \'{table_trunc}\')\n ', 'geopackage': '\n SELECT\n \'\' "database",\n \'\' "schema",\n m.name "table",\n p.name "column",\n p.type "type"\n FROM sqlite_master m\n LEFT OUTER JOIN pragma_table_info(m.name) p\n ON m.name <> p.name\n WHERE m.type = \'table\'\n AND m.name IN (\'{table}\', \'{table_trunc}\')\n ', 'mssql': "\n SELECT\n TABLE_CATALOG AS [database],\n TABLE_SCHEMA AS [schema],\n TABLE_NAME AS [table],\n COLUMN_NAME AS [column],\n DATA_TYPE AS [type],\n NUMERIC_PRECISION AS [numeric_precision],\n NUMERIC_SCALE AS [numeric_scale]\n FROM {db_prefix}INFORMATION_SCHEMA.COLUMNS WITH (NOLOCK)\n WHERE TABLE_NAME IN (\n '{table}',\n '{table_trunc}'\n )\n\n ", 'mysql': "\n SELECT\n TABLE_SCHEMA `database`,\n TABLE_SCHEMA `schema`,\n TABLE_NAME `table`,\n COLUMN_NAME `column`,\n DATA_TYPE `type`,\n NUMERIC_PRECISION `numeric_precision`,\n NUMERIC_SCALE `numeric_scale`\n FROM INFORMATION_SCHEMA.COLUMNS\n WHERE TABLE_NAME IN ('{table}', '{table_trunc}')\n ", 'mariadb': "\n SELECT\n TABLE_SCHEMA `database`,\n TABLE_SCHEMA `schema`,\n TABLE_NAME `table`,\n COLUMN_NAME `column`,\n DATA_TYPE `type`,\n NUMERIC_PRECISION `numeric_precision`,\n NUMERIC_SCALE `numeric_scale`\n FROM INFORMATION_SCHEMA.COLUMNS\n WHERE TABLE_NAME IN ('{table}', '{table_trunc}')\n ", 'oracle': '\n SELECT\n NULL AS "database",\n NULL AS "schema",\n TABLE_NAME AS "table",\n COLUMN_NAME AS "column",\n DATA_TYPE AS "type",\n DATA_PRECISION AS 
"numeric_precision",\n DATA_SCALE AS "numeric_scale"\n FROM all_tab_columns\n WHERE TABLE_NAME IN (\n \'{table}\',\n \'{table_trunc}\',\n \'{table_lower}\',\n \'{table_lower_trunc}\',\n \'{table_upper}\',\n \'{table_upper_trunc}\'\n )\n '}
hypertable_queries = {'timescaledb': "SELECT hypertable_size('{table_name}')", 'timescaledb-ha': "SELECT hypertable_size('{table_name}')", 'citus': "SELECT citus_table_size('{table_name}')"}
columns_indices_queries = {'default': '\n SELECT\n current_database() AS "database",\n n.nspname AS "schema",\n t.relname AS "table",\n c.column_name AS "column",\n i.relname AS "index",\n CASE WHEN con.contype = \'p\' THEN \'PRIMARY KEY\' ELSE \'INDEX\' END AS "index_type"\n FROM pg_class t\n INNER JOIN pg_index AS ix\n ON t.oid = ix.indrelid\n INNER JOIN pg_class AS i\n ON i.oid = ix.indexrelid\n INNER JOIN pg_namespace AS n\n ON n.oid = t.relnamespace\n INNER JOIN pg_attribute AS a\n ON a.attnum = ANY(ix.indkey)\n AND a.attrelid = t.oid\n INNER JOIN information_schema.columns AS c\n ON c.column_name = a.attname\n AND c.table_name = t.relname\n AND c.table_schema = n.nspname\n LEFT JOIN pg_constraint AS con\n ON con.conindid = i.oid\n AND con.contype = \'p\'\n WHERE\n t.relname IN (\'{table}\', \'{table_trunc}\')\n AND n.nspname = \'{schema}\'\n ', 'sqlite': '\n WITH indexed_columns AS (\n SELECT\n \'{table}\' AS table_name,\n pi.name AS column_name,\n i.name AS index_name,\n \'INDEX\' AS index_type\n FROM\n sqlite_master AS i,\n pragma_index_info(i.name) AS pi\n WHERE\n i.type = \'index\'\n AND i.tbl_name = \'{table}\'\n ),\n primary_key_columns AS (\n SELECT\n \'{table}\' AS table_name,\n ti.name AS column_name,\n \'PRIMARY_KEY\' AS index_name,\n \'PRIMARY KEY\' AS index_type\n FROM\n pragma_table_info(\'{table}\') AS ti\n WHERE\n ti.pk > 0\n )\n SELECT\n NULL AS "database",\n NULL AS "schema",\n "table_name" AS "table",\n "column_name" AS "column",\n "index_name" AS "index",\n "index_type"\n FROM indexed_columns\n UNION ALL\n SELECT\n NULL AS "database",\n NULL AS "schema",\n table_name AS "table",\n column_name AS "column",\n index_name AS "index",\n index_type\n FROM primary_key_columns\n ', 'geopackage': '\n WITH indexed_columns AS (\n SELECT\n \'{table}\' AS table_name,\n pi.name AS column_name,\n i.name AS index_name,\n \'INDEX\' AS index_type\n FROM\n sqlite_master AS i,\n pragma_index_info(i.name) AS pi\n WHERE\n i.type = \'index\'\n AND i.tbl_name = 
\'{table}\'\n ),\n primary_key_columns AS (\n SELECT\n \'{table}\' AS table_name,\n ti.name AS column_name,\n \'PRIMARY_KEY\' AS index_name,\n \'PRIMARY KEY\' AS index_type\n FROM\n pragma_table_info(\'{table}\') AS ti\n WHERE\n ti.pk > 0\n )\n SELECT\n NULL AS "database",\n NULL AS "schema",\n "table_name" AS "table",\n "column_name" AS "column",\n "index_name" AS "index",\n "index_type"\n FROM indexed_columns\n UNION ALL\n SELECT\n NULL AS "database",\n NULL AS "schema",\n table_name AS "table",\n column_name AS "column",\n index_name AS "index",\n index_type\n FROM primary_key_columns\n ', 'mssql': "\n SELECT\n NULL AS [database],\n s.name AS [schema],\n t.name AS [table],\n c.name AS [column],\n i.name AS [index],\n CASE\n WHEN kc.type = 'PK' THEN 'PRIMARY KEY'\n ELSE 'INDEX'\n END AS [index_type],\n CASE\n WHEN i.type = 1 THEN CAST(1 AS BIT)\n ELSE CAST(0 AS BIT)\n END AS [clustered]\n FROM\n sys.schemas s WITH (NOLOCK)\n INNER JOIN sys.tables t WITH (NOLOCK)\n ON s.schema_id = t.schema_id\n INNER JOIN sys.indexes i WITH (NOLOCK)\n ON t.object_id = i.object_id\n INNER JOIN sys.index_columns ic WITH (NOLOCK)\n ON i.object_id = ic.object_id\n AND i.index_id = ic.index_id\n INNER JOIN sys.columns c WITH (NOLOCK)\n ON ic.object_id = c.object_id\n AND ic.column_id = c.column_id\n LEFT JOIN sys.key_constraints kc WITH (NOLOCK)\n ON kc.parent_object_id = i.object_id\n AND kc.type = 'PK'\n AND kc.name = i.name\n WHERE\n t.name IN ('{table}', '{table_trunc}')\n AND s.name = '{schema}'\n AND i.type IN (1, 2)\n ", 'oracle': '\n SELECT\n NULL AS "database",\n ic.table_owner AS "schema",\n ic.table_name AS "table",\n ic.column_name AS "column",\n i.index_name AS "index",\n CASE\n WHEN c.constraint_type = \'P\' THEN \'PRIMARY KEY\'\n WHEN i.uniqueness = \'UNIQUE\' THEN \'UNIQUE INDEX\'\n ELSE \'INDEX\'\n END AS index_type\n FROM\n all_ind_columns ic\n INNER JOIN all_indexes i\n ON ic.index_name = i.index_name\n AND ic.table_owner = i.owner\n LEFT JOIN all_constraints c\n ON 
i.index_name = c.constraint_name\n AND i.table_owner = c.owner\n AND c.constraint_type = \'P\'\n WHERE ic.table_name IN (\n \'{table}\',\n \'{table_trunc}\',\n \'{table_upper}\',\n \'{table_upper_trunc}\'\n )\n ', 'mysql': "\n SELECT\n TABLE_SCHEMA AS `database`,\n TABLE_SCHEMA AS `schema`,\n TABLE_NAME AS `table`,\n COLUMN_NAME AS `column`,\n INDEX_NAME AS `index`,\n CASE\n WHEN NON_UNIQUE = 0 THEN 'PRIMARY KEY'\n ELSE 'INDEX'\n END AS `index_type`\n FROM\n information_schema.STATISTICS\n WHERE\n TABLE_NAME IN ('{table}', '{table_trunc}')\n ", 'mariadb': "\n SELECT\n TABLE_SCHEMA AS `database`,\n TABLE_SCHEMA AS `schema`,\n TABLE_NAME AS `table`,\n COLUMN_NAME AS `column`,\n INDEX_NAME AS `index`,\n CASE\n WHEN NON_UNIQUE = 0 THEN 'PRIMARY KEY'\n ELSE 'INDEX'\n END AS `index_type`\n FROM\n information_schema.STATISTICS\n WHERE\n TABLE_NAME IN ('{table}', '{table_trunc}')\n "}
reset_autoincrement_queries: Dict[str, Union[str, List[str]]] = {'default': "\n SELECT SETVAL(pg_get_serial_sequence('{table_name}', '{column}'), {val})\n FROM {table_name}\n ", 'mssql': "\n DBCC CHECKIDENT ('{table_name}', RESEED, {val})\n ", 'mysql': '\n ALTER TABLE {table_name} AUTO_INCREMENT = {val}\n ', 'mariadb': '\n ALTER TABLE {table_name} AUTO_INCREMENT = {val}\n ', 'sqlite': "\n UPDATE sqlite_sequence\n SET seq = {val}\n WHERE name = '{table}'\n ", 'geopackage': "\n UPDATE sqlite_sequence\n SET seq = {val}\n WHERE name = '{table}'\n ", 'oracle': 'ALTER TABLE {table_name} MODIFY {column_name} GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH {val_plus_1})'}
table_wrappers = {'default': ('"', '"'), 'timescaledb': ('"', '"'), 'timescaledb-ha': ('"', '"'), 'citus': ('"', '"'), 'duckdb': ('"', '"'), 'postgresql': ('"', '"'), 'postgis': ('"', '"'), 'sqlite': ('"', '"'), 'geopackage': ('"', '"'), 'mysql': ('`', '`'), 'mariadb': ('`', '`'), 'mssql': ('[', ']'), 'cockroachdb': ('"', '"'), 'oracle': ('"', '"')}
max_name_lens = {'default': 64, 'mssql': 128, 'oracle': 30, 'postgresql': 64, 'postgis': 64, 'timescaledb': 64, 'timescaledb-ha': 64, 'citus': 64, 'cockroachdb': 64, 'sqlite': 1024, 'geopackage': 1024, 'mysql': 64, 'mariadb': 64}
json_flavors = {'citus', 'cockroachdb', 'timescaledb', 'postgresql', 'timescaledb-ha', 'postgis'}
NO_SCHEMA_FLAVORS = {'geopackage', 'duckdb', 'mysql', 'mariadb', 'sqlite', 'oracle'}
DEFAULT_SCHEMA_FLAVORS = {'postgresql': 'public', 'postgis': 'public', 'timescaledb': 'public', 'timescaledb-ha': 'public', 'citus': 'public', 'cockroachdb': 'public', 'mysql': 'mysql', 'mariadb': 'mysql', 'mssql': 'dbo'}
OMIT_NULLSFIRST_FLAVORS = {'mariadb', 'mysql', 'mssql'}
SINGLE_ALTER_TABLE_FLAVORS = {'geopackage', 'duckdb', 'sqlite', 'mssql', 'oracle'}
NO_CTE_FLAVORS = {'mariadb', 'mysql'}
NO_SELECT_INTO_FLAVORS = {'geopackage', 'duckdb', 'mysql', 'mariadb', 'sqlite', 'oracle'}
def clean(substring: str) -> None:
    """
    Ensure a substring is clean enough to be inserted into a SQL query.
    Raises an exception when banned words are used.
    """
    from meerschaum.utils.warnings import error
    lowered = str(substring).lower()
    ### NOTE: a blacklist is best-effort; prefer parameterized queries upstream.
    for banned in (';', '--', 'drop '):
        if banned in lowered:
            error(f"Invalid string: '{substring}'")

Ensure a substring is clean enough to be inserted into a SQL query. Raises an exception when banned words are used.

def dateadd_str(
    flavor: str = 'postgresql',
    datepart: str = 'day',
    number: Union[int, float] = 0,
    begin: Union[str, datetime, int] = 'now',
    db_type: Optional[str] = None,
) -> str:
    """
    Generate a `DATEADD` clause depending on database flavor.

    Parameters
    ----------
    flavor: str, default `'postgresql'`
        SQL database flavor, e.g. `'postgresql'`, `'sqlite'`.

        Currently supported flavors:

        - `'postgresql'`
        - `'postgis'`
        - `'timescaledb'`
        - `'timescaledb-ha'`
        - `'citus'`
        - `'cockroachdb'`
        - `'duckdb'`
        - `'mssql'`
        - `'mysql'`
        - `'mariadb'`
        - `'sqlite'`
        - `'geopackage'`
        - `'oracle'`

    datepart: str, default `'day'`
        Which part of the date to modify. Supported values:

        - `'year'`
        - `'month'`
        - `'day'`
        - `'hour'`
        - `'minute'`
        - `'second'`

    number: Union[int, float], default `0`
        How many units to add to the date part.

    begin: Union[str, datetime], default `'now'`
        Base datetime to which to add dateparts.

    db_type: Optional[str], default None
        If provided, cast the datetime string as the type.
        Otherwise, infer this from the input datetime value.

    Returns
    -------
    The appropriate `DATEADD` string for the corresponding database flavor.

    Examples
    --------
    >>> dateadd_str(
    ...     flavor='mssql',
    ...     begin=datetime(2022, 1, 1, 0, 0),
    ...     number=1,
    ... )
    "DATEADD(day, 1, CAST('2022-01-01 00:00:00' AS DATETIME2))"
    >>> dateadd_str(
    ...     flavor='postgresql',
    ...     begin=datetime(2022, 1, 1, 0, 0),
    ...     number=1,
    ... )
    "CAST('2022-01-01 00:00:00' AS TIMESTAMP) + INTERVAL '1 day'"

    """
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.dtypes.sql import get_db_type_from_pd_type, get_pd_type_from_db_type
    dateutil_parser = attempt_import('dateutil.parser')
    ### Integer `begin` (type-name sniff also catches numpy integer types):
    ### treat it as a raw numeric axis and emit plain arithmetic instead of DATEADD.
    if 'int' in str(type(begin)).lower():
        num_str = str(begin)
        if number is not None and number != 0:
            num_str += (
                f' + {number}'
                if number > 0
                else f" - {number * -1}"
            )
        return num_str
    if not begin:
        return ''

    _original_begin = begin  ### NOTE: retained for reference; unused below.
    begin_time = None
    ### Sanity check: make sure `begin` is a valid datetime before we inject anything.
    if not isinstance(begin, datetime):
        try:
            begin_time = dateutil_parser.parse(begin)
        except Exception:
            begin_time = None
    else:
        begin_time = begin

    ### Unable to parse into a datetime.
    if begin_time is None:
        ### Throw an error if banned symbols are included in the `begin` string.
        clean(str(begin))
    ### If begin is a valid datetime, wrap it in quotes.
    else:
        ### Aware datetimes are normalized to UTC before the tzinfo is dropped
        ### for timezone-naive flavors.
        if isinstance(begin, datetime) and begin.tzinfo is not None:
            begin = begin.astimezone(timezone.utc)
        begin = (
            f"'{begin.replace(tzinfo=None)}'"
            if isinstance(begin, datetime) and flavor in TIMEZONE_NAIVE_FLAVORS
            else f"'{begin}'"
        )

    ### Heuristic when `begin` did not parse: a '+' anywhere, or a '-' after
    ### the first ':', suggests a UTC-offset suffix in the string.
    dt_is_utc = (
        begin_time.tzinfo is not None
        if begin_time is not None
        else ('+' in str(begin) or '-' in str(begin).split(':', maxsplit=1)[-1])
    )
    if db_type:
        db_type_is_utc = 'utc' in get_pd_type_from_db_type(db_type).lower()
        dt_is_utc = dt_is_utc or db_type_is_utc
    ### Infer the cast target from UTC-awareness when no explicit type is given.
    db_type = db_type or get_db_type_from_pd_type(
        ('datetime64[ns, UTC]' if dt_is_utc else 'datetime64[ns]'),
        flavor=flavor,
    )

    da = ""
    if flavor in (
        'postgresql',
        'postgis',
        'timescaledb',
        'timescaledb-ha',
        'cockroachdb',
        'citus',
    ):
        begin = (
            f"CAST({begin} AS {db_type})" if begin != 'now'
            else f"CAST(NOW() AT TIME ZONE 'utc' AS {db_type})"
        )
        if dt_is_utc:
            begin += " AT TIME ZONE 'UTC'"
        da = begin + (f" + INTERVAL '{number} {datepart}'" if number != 0 else '')

    elif flavor == 'duckdb':
        begin = f"CAST({begin} AS {db_type})" if begin != 'now' else 'NOW()'
        if dt_is_utc:
            begin += " AT TIME ZONE 'UTC'"
        da = begin + (f" + INTERVAL '{number} {datepart}'" if number != 0 else '')

    elif flavor in ('mssql',):
        ### Trim microseconds to milliseconds: drop the last 3 digits (plus the
        ### closing quote, which is re-appended).
        if begin_time and begin_time.microsecond != 0 and not dt_is_utc:
            begin = begin[:-4] + "'"
        begin = f"CAST({begin} AS {db_type})" if begin != 'now' else 'GETUTCDATE()'
        if dt_is_utc:
            begin += " AT TIME ZONE 'UTC'"
        da = f"DATEADD({datepart}, {number}, {begin})" if number != 0 else begin

    elif flavor in ('mysql', 'mariadb'):
        begin = (
            f"CAST({begin} AS DATETIME(6))"
            if begin != 'now'
            else 'UTC_TIMESTAMP(6)'
        )
        da = (f"DATE_ADD({begin}, INTERVAL {number} {datepart})" if number != 0 else begin)

    elif flavor in ('sqlite', 'geopackage'):
        da = f"datetime({begin}, '{number} {datepart}')"

    elif flavor == 'oracle':
        if begin == 'now':
            ### NOTE(review): this format lacks the hour (`%H`) and uses colons
            ### between the date parts, which does not match `dt_format` below
            ### ('YYYY-MM-DD HH24:MI:SS.FF'); also, `begin_time` stays None on
            ### this path so the result is never wrapped in TO_TIMESTAMP —
            ### confirm upstream intent before relying on 'now' with Oracle.
            begin = str(
                datetime.now(timezone.utc).replace(tzinfo=None).strftime(r'%Y:%m:%d %M:%S.%f')
            )
        elif begin_time:
            begin = str(begin_time.strftime(r'%Y-%m-%d %H:%M:%S.%f'))
        dt_format = 'YYYY-MM-DD HH24:MI:SS.FF'
        ### Only quote and TO_TIMESTAMP-wrap values that parsed as real datetimes.
        _begin = f"'{begin}'" if begin_time else begin
        da = (
            (f"TO_TIMESTAMP({_begin}, '{dt_format}')" if begin_time else _begin)
            + (f" + INTERVAL '{number}' {datepart}" if number != 0 else "")
        )
    return da

Generate a DATEADD clause depending on database flavor.

Parameters
  • flavor (str, default 'postgresql'): SQL database flavor, e.g. 'postgresql', 'sqlite'.

    Currently supported flavors:

    • 'postgresql'
    • 'postgis'
    • 'timescaledb'
    • 'timescaledb-ha'
    • 'citus'
    • 'cockroachdb'
    • 'duckdb'
    • 'mssql'
    • 'mysql'
    • 'mariadb'
    • 'sqlite'
    • 'geopackage'
    • 'oracle'
  • datepart (str, default 'day'): Which part of the date to modify. Supported values:

    • 'year'
    • 'month'
    • 'day'
    • 'hour'
    • 'minute'
    • 'second'
  • number (Union[int, float], default 0): How many units to add to the date part.
  • begin (Union[str, datetime], default 'now'): Base datetime to which to add dateparts.
  • db_type (Optional[str], default None): If provided, cast the datetime string as the type. Otherwise, infer this from the input datetime value.
Returns
  • The appropriate DATEADD string for the corresponding database flavor.
Examples
>>> dateadd_str(
...     flavor='mssql',
...     begin=datetime(2022, 1, 1, 0, 0),
...     number=1,
... )
"DATEADD(day, 1, CAST('2022-01-01 00:00:00' AS DATETIME2))"
>>> dateadd_str(
...     flavor='postgresql',
...     begin=datetime(2022, 1, 1, 0, 0),
...     number=1,
... )
"CAST('2022-01-01 00:00:00' AS TIMESTAMP) + INTERVAL '1 day'"
def test_connection(
    self,
    **kw: Any
) -> Union[bool, None]:
    """
    Test if a successful connection to the database may be made.

    Parameters
    ----------
    **kw:
        The keyword arguments are passed to `meerschaum.connectors.poll.retry_connect`.

    Returns
    -------
    `True` if a connection is made, otherwise `False` or `None` in case of failure.

    """
    import warnings
    from meerschaum.connectors.poll import retry_connect

    ### Single attempt, no waiting, no warning spam; callers may override.
    retry_kw = {'max_retries': 1, 'retry_wait': 0, 'warn': False, 'connector': self}
    retry_kw.update(kw)

    with warnings.catch_warnings():
        ### Suppress the "Could not ..." warnings emitted while polling.
        warnings.filterwarnings('ignore', 'Could not')
        try:
            return retry_connect(**retry_kw)
        except Exception:
            return False

Test if a successful connection to the database may be made.

Parameters
  • **kw: The keyword arguments passed to `meerschaum.connectors.poll.retry_connect`.
Returns
  • True if a connection is made, otherwise False or None in case of failure.
def get_distinct_col_count(
    col: str,
    query: str,
    connector: Optional[mrsm.connectors.sql.SQLConnector] = None,
    debug: bool = False
) -> Optional[int]:
    """
    Count the number of distinct values of a column within a SQL query.

    Parameters
    ----------
    col: str
        The column in the query to count.

    query: str
        The SQL query to count from.

    connector: Optional[mrsm.connectors.sql.SQLConnector], default None
        The SQLConnector to execute the query.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    An `int` of the number of distinct values of the column,
    or `None` if the query fails.

    """
    if connector is None:
        connector = mrsm.get_connector('sql')

    _col_name = sql_item_name(col, connector.flavor, None)

    ### MySQL / MariaDB cannot use a CTE here; fall back to nested subqueries.
    if connector.flavor in ('mysql', 'mariadb'):
        _meta_query = (
            f"""
        SELECT COUNT(*)
        FROM (
            SELECT DISTINCT {_col_name}
            FROM ({query}) AS src
        ) AS dist"""
        )
    else:
        _meta_query = (
            f"""
        WITH src AS ( {query} ),
        dist AS ( SELECT DISTINCT {_col_name} FROM src )
        SELECT COUNT(*) FROM dist"""
        )

    result = connector.value(_meta_query, debug=debug)
    try:
        return int(result)
    except Exception:
        return None

Returns the number of distinct items in a column of a SQL query.

Parameters
  • col (str): The column in the query to count.
  • query (str): The SQL query to count from.
  • connector (Optional[mrsm.connectors.sql.SQLConnector], default None): The SQLConnector to execute the query.
  • debug (bool, default False): Verbosity toggle.
Returns
  • An int of the number of distinct values of the column, or None if the query fails.
def sql_item_name(item: str, flavor: str, schema: Optional[str] = None) -> str:
    """
    Parse SQL items depending on the flavor.

    Parameters
    ----------
    item: str
        The database item (table, view, etc.) in need of quotes.

    flavor: str
        The database flavor (`'postgresql'`, `'mssql'`, `'sqlite'`, etc.).

    schema: Optional[str], default None
        If provided, prefix the table name with the schema.

    Returns
    -------
    A `str` which contains the input `item` wrapped in the corresponding escape characters.

    Examples
    --------
    >>> sql_item_name('table', 'sqlite')
    '"table"'
    >>> sql_item_name('table', 'mssql')
    "[table]"
    >>> sql_item_name('table', 'postgresql', schema='abc')
    '"abc"."table"'

    """
    truncated_item = truncate_item_name(str(item), flavor)

    if flavor == 'oracle':
        truncated_item = pg_capital(truncated_item, quote_capitals=True)
        ### NOTE: System-reserved words must be quoted.
        reserved_words = (
            'float', 'varchar', 'nvarchar', 'clob',
            'boolean', 'integer', 'table', 'row', 'date',
        )
        wrappers = ('"', '"') if truncated_item.lower() in reserved_words else ('', '')
    else:
        wrappers = table_wrappers.get(flavor, table_wrappers['default'])

    ### NOTE: SQLite does not support schemas, and neither do MSSQL temp tables
    ### (names starting with '#').
    if flavor in ('sqlite', 'geopackage') or (flavor == 'mssql' and str(item).startswith('#')):
        schema = None

    if schema is None:
        return wrappers[0] + truncated_item + wrappers[1]
    return wrappers[0] + schema + wrappers[1] + '.' + wrappers[0] + truncated_item + wrappers[1]

Parse SQL items depending on the flavor.

Parameters
  • item (str): The database item (table, view, etc.) in need of quotes.
  • flavor (str): The database flavor ('postgresql', 'mssql', 'sqlite', etc.).
  • schema (Optional[str], default None): If provided, prefix the table name with the schema.
Returns
  • A str which contains the input item wrapped in the corresponding escape characters.
Examples
>>> sql_item_name('table', 'sqlite')
'"table"'
>>> sql_item_name('table', 'mssql')
"[table]"
>>> sql_item_name('table', 'postgresql', schema='abc')
'"abc"."table"'
def pg_capital(s: str, quote_capitals: bool = True) -> str:
def pg_capital(s: str, quote_capitals: bool = True) -> str:
    """
    Wrap a string in double quotes when it needs them (PostgreSQL-style quoting).

    Quoting is required when the string starts with an underscore, contains any
    non-alphanumeric character (other than underscores), or — when
    `quote_capitals` is `True` — contains a capital letter.

    Parameters
    ----------
    s: str
        The string to be escaped.

    quote_capitals: bool, default True
        If `False`, do not quote strings which contain only a mix of capital
        and lower-case letters.

    Returns
    -------
    The input string wrapped in quotes only if it needs them.

    Examples
    --------
    >>> pg_capital("My Table")
    '"My Table"'
    >>> pg_capital('my_table')
    'my_table'

    """
    ### Already fully quoted: pass through untouched.
    if s.startswith('"') and s.endswith('"'):
        return s

    ### Strip any embedded double quotes before deciding.
    stripped = s.replace('"', '')

    def _char_needs_quoting(c: str) -> bool:
        """Underscores are always fine; anything non-alphanumeric (or a capital
        when `quote_capitals` is set) forces quoting."""
        if c == '_':
            return False
        return (not c.isalnum()) or (quote_capitals and c.isupper())

    if stripped.startswith('_') or any(_char_needs_quoting(c) for c in stripped):
        return '"' + stripped + '"'

    return stripped

If string contains a capital letter, wrap it in double quotes.

Parameters
  • s (str): The string to be escaped.
  • quote_capitals (bool, default True): If False, do not quote strings which contain only a mix of capital and lower-case letters.
Returns
  • The input string wrapped in quotes only if it needs them.
Examples
>>> pg_capital("My Table")
'"My Table"'
>>> pg_capital('my_table')
'my_table'
def oracle_capital(s: str) -> str:
def oracle_capital(s: str) -> str:
    """
    Capitalize the string of an item on an Oracle database.

    NOTE: This is currently a pass-through — the input string is returned
    unmodified (Oracle-specific quoting is handled in `sql_item_name()`).
    """
    return s

Capitalize the string of an item on an Oracle database.

def truncate_item_name(item: str, flavor: str) -> str:
def truncate_item_name(item: str, flavor: str) -> str:
    """
    Truncate item names to stay within the database flavor's character limit.

    Parameters
    ----------
    item: str
        The database item being referenced. This string is the "canonical" name internally.

    flavor: str
        The flavor of the database on which `item` resides.

    Returns
    -------
    The truncated string.
    """
    from meerschaum.utils.misc import truncate_string_sections
    ### Fall back to the generic limit when the flavor has no specific one.
    max_len = max_name_lens.get(flavor, max_name_lens['default'])
    return truncate_string_sections(item, max_len=max_len)

Truncate item names to stay within the database flavor's character limit.

Parameters
  • item (str): The database item being referenced. This string is the "canonical" name internally.
  • flavor (str): The flavor of the database on which item resides.
Returns
  • The truncated string.
def build_where( params: Dict[str, Any], connector: Optional[meerschaum.connectors.SQLConnector] = None, with_where: bool = True, flavor: str = 'postgresql') -> str:
def build_where(
    params: Dict[str, Any],
    connector: Optional[mrsm.connectors.sql.SQLConnector] = None,
    with_where: bool = True,
    flavor: str = 'postgresql',
) -> str:
    """
    Build the `WHERE` clause based on the input criteria.

    Parameters
    ----------
    params: Dict[str, Any]
        The keywords dictionary to convert into a WHERE clause.
        If a value is a string which begins with an underscore, negate that value
        (e.g. `!=` instead of `=` or `NOT IN` instead of `IN`).
        A value of `_None` will be interpreted as `IS NOT NULL`.

    connector: Optional[meerschaum.connectors.sql.SQLConnector], default None
        The Meerschaum SQLConnector that will be executing the query.
        The connector is used to extract the SQL dialect.

    with_where: bool, default True
        If `True`, include the leading `'WHERE'` string.

    flavor: str, default 'postgresql'
        If `connector` is `None`, fall back to this flavor.

    Returns
    -------
    A `str` of the `WHERE` clause from the input `params` dictionary for the connector's flavor.

    Examples
    --------
    ```
    >>> print(build_where({'foo': [1, 2, 3]}))
    
    WHERE
        "foo" IN ('1', '2', '3')
    ```
    """
    import json
    from meerschaum._internal.static import STATIC_CONFIG
    from meerschaum.utils.dtypes import value_is_null, none_if_null
    negation_prefix = STATIC_CONFIG['system']['fetch_pipes_keys']['negation_prefix']
    ### Coarse guard against obvious SQL injection attempts before building the clause.
    try:
        params_json = json.dumps(params)
    except Exception:
        params_json = str(params)
    bad_words = ['drop ', '--', ';']
    for word in bad_words:
        if word in params_json.lower():
            warn("Aborting build_where() due to possible SQL injection.")
            return ''

    query_flavor = getattr(connector, 'flavor', flavor) if connector is not None else flavor
    where = ""
    leading_and = "\n    AND "
    for key, value in params.items():
        _key = sql_item_name(key, query_flavor, None)
        ### search across a list (i.e. IN syntax)
        if isinstance(value, Iterable) and not isinstance(value, (dict, str)):
            ### Split the iterable into included values and negation-prefixed
            ### excluded values, each further split on NULL vs not-NULL.
            includes = [
                none_if_null(item)
                for item in value
                if not str(item).startswith(negation_prefix)
            ]
            null_includes = [item for item in includes if item is None]
            not_null_includes = [item for item in includes if item is not None]
            excludes = [
                none_if_null(str(item)[len(negation_prefix):])
                for item in value
                if str(item).startswith(negation_prefix)
            ]
            null_excludes = [item for item in excludes if item is None]
            not_null_excludes = [item for item in excludes if item is not None]

            if includes:
                where += f"{leading_and}("
            if not_null_includes:
                where += f"{_key} IN ("
                for item in not_null_includes:
                    quoted_item = str(item).replace("'", "''")
                    where += f"'{quoted_item}', "
                where = where[:-2] + ")"
            if null_includes:
                where += ("\n    OR " if not_null_includes else "") + f"{_key} IS NULL"
            if includes:
                where += ")"

            if excludes:
                where += f"{leading_and}("
            if not_null_excludes:
                where += f"{_key} NOT IN ("
                for item in not_null_excludes:
                    quoted_item = str(item).replace("'", "''")
                    where += f"'{quoted_item}', "
                where = where[:-2] + ")"
            if null_excludes:
                where += ("\n    AND " if not_null_excludes else "") + f"{_key} IS NOT NULL"
            if excludes:
                where += ")"

            continue

        ### search a dictionary
        elif isinstance(value, dict):
            where += (f"{leading_and}CAST({_key} AS TEXT) = '" + json.dumps(value) + "'")
            continue

        eq_sign = '='
        is_null = 'IS NULL'
        ### NOTE(review): `lstrip` strips a *set of characters*, not a prefix.
        ### This is fine for a single-character negation prefix but would
        ### misbehave for a multi-character prefix (consider `removeprefix`).
        if value_is_null(str(value).lstrip(negation_prefix)):
            value = (
                (negation_prefix + 'None')
                if str(value).startswith(negation_prefix)
                else None
            )
        if str(value).startswith(negation_prefix):
            value = str(value)[len(negation_prefix):]
            eq_sign = '!='
            if value_is_null(value):
                value = None
                is_null = 'IS NOT NULL'
        quoted_value = str(value).replace("'", "''")
        where += (
            f"{leading_and}{_key} "
            + (is_null if value is None else f"{eq_sign} '{quoted_value}'")
        )

    if len(where) > 1:
        where = ("\nWHERE\n    " if with_where else '') + where[len(leading_and):]
    return where

Build the WHERE clause based on the input criteria.

Parameters
  • params (Dict[str, Any]:): The keywords dictionary to convert into a WHERE clause. If a value is a string which begins with an underscore, negate that value (e.g. != instead of = or NOT IN instead of IN). A value of _None will be interpreted as IS NOT NULL.
  • connector (Optional[meerschaum.connectors.sql.SQLConnector], default None:): The Meerschaum SQLConnector that will be executing the query. The connector is used to extract the SQL dialect.
  • with_where (bool, default True:): If True, include the leading 'WHERE' string.
  • flavor (str, default 'postgresql'): If connector is None, fall back to this flavor.
Returns
  • A str of the WHERE clause from the input params dictionary for the connector's flavor.
Examples
>>> print(build_where({'foo': [1, 2, 3]}))

WHERE
    "foo" IN ('1', '2', '3')
def table_exists( table: str, connector: meerschaum.connectors.SQLConnector, schema: Optional[str] = None, debug: bool = False) -> bool:
def table_exists(
    table: str,
    connector: mrsm.connectors.sql.SQLConnector,
    schema: Optional[str] = None,
    debug: bool = False,
) -> bool:
    """
    Check whether a table exists on a database.

    Parameters
    ----------
    table: str
        The name of the table in question.

    connector: mrsm.connectors.sql.SQLConnector
        The connector to the database which holds the table.

    schema: Optional[str], default None
        Optionally specify the table schema.
        Defaults to `connector.schema`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `bool` indicating whether or not the table exists on the database.
    """
    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
    inspector = sqlalchemy.inspect(connector.engine)
    ### Names are truncated to the flavor's length limit before lookup.
    truncated_name = truncate_item_name(str(table), connector.flavor)
    return inspector.has_table(truncated_name, schema=(schema or connector.schema))

Check if a table exists.

Parameters
  • table (str:): The name of the table in question.
  • connector (mrsm.connectors.sql.SQLConnector): The connector to the database which holds the table.
  • schema (Optional[str], default None): Optionally specify the table schema. Defaults to connector.schema.
  • debug (bool, default False :): Verbosity toggle.
Returns
  • A bool indicating whether or not the table exists on the database.
def get_sqlalchemy_table( table: str, connector: Optional[meerschaum.connectors.SQLConnector] = None, schema: Optional[str] = None, refresh: bool = False, debug: bool = False) -> "Union['sqlalchemy.Table', None]":
def get_sqlalchemy_table(
    table: str,
    connector: Optional[mrsm.connectors.sql.SQLConnector] = None,
    schema: Optional[str] = None,
    refresh: bool = False,
    debug: bool = False,
) -> Union['sqlalchemy.Table', None]:
    """
    Construct a SQLAlchemy table from its name.

    Parameters
    ----------
    table: str
        The name of the table on the database. Does not need to be escaped.

    connector: Optional[meerschaum.connectors.sql.SQLConnector], default None
        The connector to the database which holds the table.
        Defaults to the default `'sql'` connector.

    schema: Optional[str], default None
        Specify on which schema the table resides.
        Defaults to the schema set in `connector`.

    refresh: bool, default False
        If `True`, rebuild the cached table object.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `sqlalchemy.Table` object for the table, or `None` when reflection
    is unsupported (DuckDB) or the table does not exist.
    """
    if connector is None:
        from meerschaum import get_connector
        connector = get_connector('sql')

    ### DuckDB tables are not reflected into SQLAlchemy objects.
    if connector.flavor == 'duckdb':
        return None

    from meerschaum.connectors.sql.tables import get_tables
    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.warnings import warn

    if refresh:
        connector.metadata.clear()

    tables = get_tables(mrsm_instance=connector, debug=debug, create=False)
    sqlalchemy = attempt_import('sqlalchemy', lazy=False)
    truncated_table_name = truncate_item_name(str(table), connector.flavor)

    ### NOTE(review): the cache is keyed on the table name only — two
    ### same-named tables on different schemas would share one entry; confirm.
    if not refresh and truncated_table_name in tables:
        return tables[truncated_table_name]

    reflect_kwargs = {'autoload_with': connector.engine}
    if schema:
        reflect_kwargs['schema'] = schema

    try:
        tables[truncated_table_name] = sqlalchemy.Table(
            truncated_table_name,
            connector.metadata,
            **reflect_kwargs
        )
    except sqlalchemy.exc.NoSuchTableError:
        warn(f"Table '{truncated_table_name}' does not exist in '{connector}'.")
        return None

    return tables[truncated_table_name]

Construct a SQLAlchemy table from its name.

Parameters
  • table (str): The name of the table on the database. Does not need to be escaped.
  • connector (Optional[meerschaum.connectors.sql.SQLConnector], default None:): The connector to the database which holds the table.
  • schema (Optional[str], default None): Specify on which schema the table resides. Defaults to the schema set in connector.
  • refresh (bool, default False): If True, rebuild the cached table object.
  • debug (bool, default False:): Verbosity toggle.
Returns
  • A sqlalchemy.Table object for the table.
def get_table_cols_types( table: str, connectable: "Union['mrsm.connectors.sql.SQLConnector', 'sqlalchemy.orm.session.Session', 'sqlalchemy.engine.base.Engine']", flavor: Optional[str] = None, schema: Optional[str] = None, database: Optional[str] = None, debug: bool = False) -> Dict[str, str]:
def get_table_cols_types(
    table: str,
    connectable: Union[
        'mrsm.connectors.sql.SQLConnector',
        'sqlalchemy.orm.session.Session',
        'sqlalchemy.engine.base.Engine'
    ],
    flavor: Optional[str] = None,
    schema: Optional[str] = None,
    database: Optional[str] = None,
    debug: bool = False,
) -> Dict[str, str]:
    """
    Return a dictionary mapping a table's columns to data types.
    This is useful for inspecting tables created during a not-yet-committed session.

    NOTE: This may return incorrect columns if the schema is not explicitly stated.
        Use this function if you are confident the table name is unique or if you have
        an explicit schema.
        To use the configured schema, get the columns from `get_sqlalchemy_table()` instead.

    Parameters
    ----------
    table: str
        The name of the table (unquoted).

    connectable: Union[
        'mrsm.connectors.sql.SQLConnector',
        'sqlalchemy.orm.session.Session',
        'sqlalchemy.engine.base.Engine'
    ]
        The connection object used to fetch the columns and types.

    flavor: Optional[str], default None
        The database dialect flavor to use for the query.
        If omitted, default to `connectable.flavor`.

    schema: Optional[str], default None
        If provided, restrict the query to this schema.

    database: Optional[str], default None
        If provided, restrict the query to this database.

    Returns
    -------
    A dictionary mapping column names to data types
    (empty on failure — a warning is emitted instead of raising).
    """
    import textwrap
    from meerschaum.connectors import SQLConnector
    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
    flavor = flavor or getattr(connectable, 'flavor', None)
    if not flavor:
        raise ValueError("Please provide a database flavor.")
    if flavor == 'duckdb' and not isinstance(connectable, SQLConnector):
        raise ValueError("You must provide a SQLConnector when using DuckDB.")
    if flavor in NO_SCHEMA_FLAVORS:
        schema = None
    if schema is None:
        schema = DEFAULT_SCHEMA_FLAVORS.get(flavor, None)
    ### These flavors do not support the `database` filter.
    if flavor in ('sqlite', 'duckdb', 'oracle', 'geopackage'):
        database = None
    table_trunc = truncate_item_name(table, flavor=flavor)
    table_lower = table.lower()
    table_upper = table.upper()
    table_lower_trunc = truncate_item_name(table_lower, flavor=flavor)
    table_upper_trunc = truncate_item_name(table_upper, flavor=flavor)
    ### MSSQL temporary tables (names beginning with '#') live in `tempdb`.
    db_prefix = (
        "tempdb."
        if flavor == 'mssql' and table.startswith('#')
        else ""
    )

    cols_types_query = sqlalchemy.text(
        textwrap.dedent(columns_types_queries.get(
            flavor,
            columns_types_queries['default']
        ).format(
            table=table,
            table_trunc=table_trunc,
            table_lower=table_lower,
            table_lower_trunc=table_lower_trunc,
            table_upper=table_upper,
            table_upper_trunc=table_upper_trunc,
            db_prefix=db_prefix,
        )).strip()
    )

    ### Expected positional layout of the query's result rows.
    cols = ['database', 'schema', 'table', 'column', 'type', 'numeric_precision', 'numeric_scale']
    result_cols_ix = dict(enumerate(cols))

    debug_kwargs = {'debug': debug} if isinstance(connectable, SQLConnector) else {}
    if not debug_kwargs and debug:
        dprint(cols_types_query)

    try:
        ### DuckDB must go through the connector's `read()`; everything else
        ### can execute the query directly.
        result_rows = (
            list(connectable.execute(cols_types_query, **debug_kwargs).fetchall())
            if flavor != 'duckdb'
            else [
                tuple(doc[col] for col in cols)
                for doc in connectable.read(cols_types_query, debug=debug).to_dict(orient='records')
            ]
        )
        cols_types_docs = [
            {
                result_cols_ix[i]: val
                for i, val in enumerate(row)
            }
            for row in result_rows
        ]
        cols_types_docs_filtered = [
            doc
            for doc in cols_types_docs
            if (
                (
                    not schema
                    or doc['schema'] == schema
                )
                and
                (
                    not database
                    or doc['database'] == database
                )
            )
        ]

        ### NOTE: This may return incorrect columns if the schema is not explicitly stated.
        if cols_types_docs and not cols_types_docs_filtered:
            cols_types_docs_filtered = cols_types_docs

        ### NOTE: Check for PostGIS GEOMETRY columns.
        geometry_cols_types = {}
        user_defined_cols = [
            doc
            for doc in cols_types_docs_filtered
            if str(doc.get('type', None)).upper() == 'USER-DEFINED'
        ]
        if user_defined_cols:
            geometry_cols_types.update(
                get_postgis_geo_columns_types(
                    connectable,
                    table,
                    schema=schema,
                    debug=debug,
                )
            )

        ### Oracle upper-cases unquoted identifiers; lower-case them back
        ### when they look like plain (all-caps, alphabetic) names.
        cols_types = {
            (
                doc['column']
                if flavor != 'oracle' else (
                    (
                        doc['column'].lower()
                        if (doc['column'].isupper() and doc['column'].replace('_', '').isalpha())
                        else doc['column']
                    )
                )
            ): doc['type'].upper() + (
                f'({precision},{scale})'
                if (
                    (precision := doc.get('numeric_precision', None))
                     and
                    (scale := doc.get('numeric_scale', None))
                )
                else ''
            )
            for doc in cols_types_docs_filtered
        }
        cols_types.update(geometry_cols_types)
        return cols_types
    except Exception as e:
        warn(f"Failed to fetch columns for table '{table}':\n{e}")
        return {}

Return a dictionary mapping a table's columns to data types. This is useful for inspecting tables created during a not-yet-committed session.

NOTE: This may return incorrect columns if the schema is not explicitly stated. Use this function if you are confident the table name is unique or if you have an explicit schema. To use the configured schema, get the columns from get_sqlalchemy_table() instead.

Parameters
  • table (str): The name of the table (unquoted).
  • connectable (Union[): 'mrsm.connectors.sql.SQLConnector', 'sqlalchemy.orm.session.Session', 'sqlalchemy.engine.base.Engine'
  • ]: The connection object used to fetch the columns and types.
  • flavor (Optional[str], default None): The database dialect flavor to use for the query. If omitted, default to connectable.flavor.
  • schema (Optional[str], default None): If provided, restrict the query to this schema.
  • database (Optional[str]. default None): If provided, restrict the query to this database.
Returns
  • A dictionary mapping column names to data types.
def get_table_cols_indices( table: str, connectable: "Union['mrsm.connectors.sql.SQLConnector', 'sqlalchemy.orm.session.Session', 'sqlalchemy.engine.base.Engine']", flavor: Optional[str] = None, schema: Optional[str] = None, database: Optional[str] = None, debug: bool = False) -> Dict[str, List[str]]:
def get_table_cols_indices(
    table: str,
    connectable: Union[
        'mrsm.connectors.sql.SQLConnector',
        'sqlalchemy.orm.session.Session',
        'sqlalchemy.engine.base.Engine'
    ],
    flavor: Optional[str] = None,
    schema: Optional[str] = None,
    database: Optional[str] = None,
    debug: bool = False,
) -> Dict[str, List[str]]:
    """
    Return a dictionary mapping a table's columns to lists of indices.
    This is useful for inspecting tables created during a not-yet-committed session.

    NOTE: This may return incorrect columns if the schema is not explicitly stated.
        Use this function if you are confident the table name is unique or if you have
        an explicit schema.
        To use the configured schema, get the columns from `get_sqlalchemy_table()` instead.

    Parameters
    ----------
    table: str
        The name of the table (unquoted).

    connectable: Union[
        'mrsm.connectors.sql.SQLConnector',
        'sqlalchemy.orm.session.Session',
        'sqlalchemy.engine.base.Engine'
    ]
        The connection object used to fetch the columns and types.

    flavor: Optional[str], default None
        The database dialect flavor to use for the query.
        If omitted, default to `connectable.flavor`.

    schema: Optional[str], default None
        If provided, restrict the query to this schema.

    database: Optional[str], default None
        If provided, restrict the query to this database.

    Returns
    -------
    A dictionary mapping column names to a list of indices
    (empty on failure — a warning is emitted instead of raising).
    """
    import textwrap
    from collections import defaultdict
    from meerschaum.connectors import SQLConnector
    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
    flavor = flavor or getattr(connectable, 'flavor', None)
    if not flavor:
        raise ValueError("Please provide a database flavor.")
    if flavor == 'duckdb' and not isinstance(connectable, SQLConnector):
        raise ValueError("You must provide a SQLConnector when using DuckDB.")
    if flavor in NO_SCHEMA_FLAVORS:
        schema = None
    if schema is None:
        schema = DEFAULT_SCHEMA_FLAVORS.get(flavor, None)
    ### These flavors do not support the `database` filter.
    if flavor in ('sqlite', 'duckdb', 'oracle', 'geopackage'):
        database = None
    table_trunc = truncate_item_name(table, flavor=flavor)
    table_lower = table.lower()
    table_upper = table.upper()
    table_lower_trunc = truncate_item_name(table_lower, flavor=flavor)
    table_upper_trunc = truncate_item_name(table_upper, flavor=flavor)
    ### MSSQL temporary tables (names beginning with '#') live in `tempdb`.
    db_prefix = (
        "tempdb."
        if flavor == 'mssql' and table.startswith('#')
        else ""
    )

    cols_indices_query = sqlalchemy.text(
        textwrap.dedent(columns_indices_queries.get(
            flavor,
            columns_indices_queries['default']
        ).format(
            table=table,
            table_trunc=table_trunc,
            table_lower=table_lower,
            table_lower_trunc=table_lower_trunc,
            table_upper=table_upper,
            table_upper_trunc=table_upper_trunc,
            db_prefix=db_prefix,
            schema=schema,
        )).strip()
    )

    ### Expected positional layout of the query's result rows.
    cols = ['database', 'schema', 'table', 'column', 'index', 'index_type']
    if flavor == 'mssql':
        cols.append('clustered')
    result_cols_ix = dict(enumerate(cols))

    debug_kwargs = {'debug': debug} if isinstance(connectable, SQLConnector) else {}
    if not debug_kwargs and debug:
        dprint(cols_indices_query)

    try:
        ### DuckDB must go through the connector's `read()`; everything else
        ### can execute the query directly.
        result_rows = (
            list(connectable.execute(cols_indices_query, **debug_kwargs).fetchall())
            if flavor != 'duckdb'
            else [
                tuple(doc[col] for col in cols)
                for doc in connectable.read(cols_indices_query, debug=debug).to_dict(orient='records')
            ]
        )
        cols_types_docs = [
            {
                result_cols_ix[i]: val
                for i, val in enumerate(row)
            }
            for row in result_rows
        ]
        cols_types_docs_filtered = [
            doc
            for doc in cols_types_docs
            if (
                (
                    not schema
                    or doc['schema'] == schema
                )
                and
                (
                    not database
                    or doc['database'] == database
                )
            )
        ]
        ### NOTE: This may return incorrect columns if the schema is not explicitly stated.
        if cols_types_docs and not cols_types_docs_filtered:
            cols_types_docs_filtered = cols_types_docs

        cols_indices = defaultdict(list)
        for doc in cols_types_docs_filtered:
            ### Oracle upper-cases unquoted identifiers; lower-case them back
            ### when they look like plain (all-caps, alphabetic) names.
            col = (
                doc['column']
                if flavor != 'oracle'
                else (
                    doc['column'].lower()
                    if (doc['column'].isupper() and doc['column'].replace('_', '').isalpha())
                    else doc['column']
                )
            )
            index_doc = {
                'name': doc.get('index', None),
                'type': doc.get('index_type', None)
            }
            if flavor == 'mssql':
                index_doc['clustered'] = doc.get('clustered', None)
            cols_indices[col].append(index_doc)

        return dict(cols_indices)
    except Exception as e:
        warn(f"Failed to fetch columns for table '{table}':\n{e}")
        return {}

Return a dictionary mapping a table's columns to lists of indices. This is useful for inspecting tables created during a not-yet-committed session.

NOTE: This may return incorrect columns if the schema is not explicitly stated. Use this function if you are confident the table name is unique or if you have an explicit schema. To use the configured schema, get the columns from get_sqlalchemy_table() instead.

Parameters
  • table (str): The name of the table (unquoted).
  • connectable (Union[): 'mrsm.connectors.sql.SQLConnector', 'sqlalchemy.orm.session.Session', 'sqlalchemy.engine.base.Engine'
  • ]: The connection object used to fetch the columns and types.
  • flavor (Optional[str], default None): The database dialect flavor to use for the query. If omitted, default to connectable.flavor.
  • schema (Optional[str], default None): If provided, restrict the query to this schema.
  • database (Optional[str]. default None): If provided, restrict the query to this database.
Returns
  • A dictionary mapping column names to a list of indices.
def get_update_queries( target: str, patch: str, connectable: "Union[mrsm.connectors.sql.SQLConnector, 'sqlalchemy.orm.session.Session']", join_cols: Iterable[str], flavor: Optional[str] = None, upsert: bool = False, datetime_col: Optional[str] = None, schema: Optional[str] = None, patch_schema: Optional[str] = None, target_cols_types: Optional[Dict[str, str]] = None, patch_cols_types: Optional[Dict[str, str]] = None, identity_insert: bool = False, null_indices: bool = True, cast_columns: bool = True, debug: bool = False) -> List[str]:
def get_update_queries(
    target: str,
    patch: str,
    connectable: Union[
        mrsm.connectors.sql.SQLConnector,
        'sqlalchemy.orm.session.Session'
    ],
    join_cols: Iterable[str],
    flavor: Optional[str] = None,
    upsert: bool = False,
    datetime_col: Optional[str] = None,
    schema: Optional[str] = None,
    patch_schema: Optional[str] = None,
    target_cols_types: Optional[Dict[str, str]] = None,
    patch_cols_types: Optional[Dict[str, str]] = None,
    identity_insert: bool = False,
    null_indices: bool = True,
    cast_columns: bool = True,
    debug: bool = False,
) -> List[str]:
    """
    Build a list of `MERGE`, `UPDATE`, `DELETE`/`INSERT` queries to apply a patch to target table.

    Parameters
    ----------
    target: str
        The name of the target table.

    patch: str
        The name of the patch table. This should have the same shape as the target.

    connectable: Union[meerschaum.connectors.sql.SQLConnector, sqlalchemy.orm.session.Session]
        The `SQLConnector` or SQLAlchemy session which will later execute the queries.

    join_cols: List[str]
        The columns to use to join the patch to the target.

    flavor: Optional[str], default None
        If using a SQLAlchemy session, provide the expected database flavor.

    upsert: bool, default False
        If `True`, return an upsert query rather than an update.

    datetime_col: Optional[str], default None
        If provided, bound the join query using this column as the datetime index.
        This must be present on both tables.

    schema: Optional[str], default None
        If provided, use this schema when quoting the target table.
        Defaults to `connector.schema`.

    patch_schema: Optional[str], default None
        If provided, use this schema when quoting the patch table.
        Defaults to `schema`.

    target_cols_types: Optional[Dict[str, str]], default None
        If provided, use these as the columns-types dictionary for the target table.
        Default will infer from the database context.

    patch_cols_types: Optional[Dict[str, str]], default None
        If provided, use these as the columns-types dictionary for the patch table.
        Default will infer from the database context.

    identity_insert: bool, default False
        If `True`, include `SET IDENTITY_INSERT` queries before and after the update queries.
        Only applies for MSSQL upserts.

    null_indices: bool, default True
        If `False`, do not coalesce index columns before joining.

    cast_columns: bool, default True
        If `False`, do not cast update columns to the target table types.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of query strings to perform the update operation.
    """
    import textwrap
    from meerschaum.connectors import SQLConnector
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.dtypes import are_dtypes_equal
    from meerschaum.utils.dtypes.sql import DB_FLAVORS_CAST_DTYPES, get_pd_type_from_db_type
    ### Resolve the flavor from the connectable if not explicitly given.
    flavor = flavor or getattr(connectable, 'flavor', None)
    if not flavor:
        raise ValueError("Provide a flavor if using a SQLAlchemy session.")
    ### Older SQLite lacks `UPDATE ... FROM` (added in 3.33.0), so switch to
    ### the DELETE/INSERT query strategy.
    ### NOTE(review): this compares version strings lexicographically
    ### (e.g. '3.4.0' > '3.33.0') — confirm `db_version` is normalized upstream.
    if (
        flavor in ('sqlite', 'geopackage')
        and isinstance(connectable, SQLConnector)
        and connectable.db_version < '3.33.0'
    ):
        flavor = 'sqlite_delete_insert'
    ### Upserts use a separate family of query templates.
    flavor_key = (f'{flavor}-upsert' if upsert else flavor)
    base_queries = UPDATE_QUERIES.get(
        flavor_key,
        UPDATE_QUERIES['default']
    )
    if not isinstance(base_queries, list):
        base_queries = [base_queries]
    schema = schema or (connectable.schema if isinstance(connectable, SQLConnector) else None)
    patch_schema = patch_schema or schema
    ### Fetch the columns-types mappings unless explicitly provided by the caller.
    target_table_columns = get_table_cols_types(
        target,
        connectable,
        flavor=flavor,
        schema=schema,
        debug=debug,
    ) if not target_cols_types else target_cols_types
    patch_table_columns = get_table_cols_types(
        patch,
        connectable,
        flavor=flavor,
        schema=patch_schema,
        debug=debug,
    ) if not patch_cols_types else patch_cols_types

    ### Pre-build the quoted column lists to be substituted into the templates.
    patch_cols_str = ', '.join(
        [
            sql_item_name(col, flavor)
            for col in patch_table_columns
        ]
    )
    patch_cols_prefixed_str = ', '.join(
        [
            'p.' + sql_item_name(col, flavor)
            for col in patch_table_columns
        ]
    )

    join_cols_str = ', '.join(
        [
            sql_item_name(col, flavor)
            for col in join_cols
        ]
    )

    ### Partition the shared columns into join (index) columns and value columns.
    ### Columns missing from the patch table are skipped entirely.
    value_cols = []
    join_cols_types = []
    if debug:
        dprint("target_table_columns:")
        mrsm.pprint(target_table_columns)
    for c_name, c_type in target_table_columns.items():
        if c_name not in patch_table_columns:
            continue
        if flavor in DB_FLAVORS_CAST_DTYPES:
            c_type = DB_FLAVORS_CAST_DTYPES[flavor].get(c_type.upper(), c_type)
        (
            join_cols_types
            if c_name in join_cols
            else value_cols
        ).append((c_name, c_type))
    if debug:
        dprint(f"value_cols: {value_cols}")

    ### Nothing to join on, or nothing to update (and not upserting): no queries.
    if not join_cols_types:
        return []
    if not value_cols and not upsert:
        return []

    ### COALESCE NULL index values to sentinel replacements so NULLs compare equal.
    coalesce_join_cols_str = ', '.join(
        [
            (
                (
                    'COALESCE('
                    + sql_item_name(c_name, flavor)
                    + ', '
                    + get_null_replacement(c_type, flavor)
                    + ')'
                )
                if null_indices
                else sql_item_name(c_name, flavor)
            )
            for c_name, c_type in join_cols_types
        ]
    )

    update_or_nothing = ('UPDATE' if value_cols else 'NOTHING')

    def sets_subquery(l_prefix: str, r_prefix: str) -> str:
        ### Build the `SET col = CAST(p.col AS type)` clause for the value columns.
        if not value_cols:
            return ''

        ### Timezone-aware columns need `AT TIME ZONE 'UTC'` after the cast
        ### (skipped for flavors that only store naive datetimes).
        utc_value_cols = {
            c_name
            for c_name, c_type in value_cols
            if ('utc' in get_pd_type_from_db_type(c_type).lower())
        } if flavor not in TIMEZONE_NAIVE_FLAVORS else set()

        ### Per-column (prefix, cast-type, suffix) tuples; empty tuples mean no cast.
        cast_func_cols = {
            c_name: (
                ('', '', '')
                if not cast_columns or (
                    flavor == 'oracle'
                    and are_dtypes_equal(get_pd_type_from_db_type(c_type), 'bytes')
                )
                else (
                    ('CAST(', f" AS {c_type.replace('_', ' ')}", ')' + (
                        " AT TIME ZONE 'UTC'"
                        if c_name in utc_value_cols
                        else ''
                    ))
                    if flavor not in ('sqlite', 'geopackage')
                    else ('', '', '')
                )
            )
            for c_name, c_type in value_cols
        }
        return 'SET ' + ',\n'.join([
            (
                l_prefix + sql_item_name(c_name, flavor, None)
                + ' = '
                + cast_func_cols[c_name][0]
                + r_prefix + sql_item_name(c_name, flavor, None)
                + cast_func_cols[c_name][1]
                + cast_func_cols[c_name][2]
            ) for c_name, c_type in value_cols
        ])

    def and_subquery(l_prefix: str, r_prefix: str) -> str:
        ### Build the join predicate over the index columns, optionally
        ### COALESCE-ing both sides so NULL indices match each other.
        return '\n            AND\n                '.join([
            (
                (
                    "COALESCE("
                    + l_prefix
                    + sql_item_name(c_name, flavor, None)
                    + ", "
                    + get_null_replacement(c_type, flavor)
                    + ")"
                    + '\n                =\n                '
                    + "COALESCE("
                    + r_prefix
                    + sql_item_name(c_name, flavor, None)
                    + ", "
                    + get_null_replacement(c_type, flavor)
                    + ")"
                )
                if null_indices
                else (
                    l_prefix
                    + sql_item_name(c_name, flavor, None)
                    + ' = '
                    + r_prefix
                    + sql_item_name(c_name, flavor, None)
                )
            ) for c_name, c_type in join_cols_types
        ])

    ### Empty-string queries are filtered out of the final list below.
    skip_query_val = ""
    target_table_name = sql_item_name(target, flavor, schema)
    patch_table_name = sql_item_name(patch, flavor, patch_schema)
    dt_col_name = sql_item_name(datetime_col, flavor, None) if datetime_col else None
    ### MSSQL bounds the join via a `[date_bounds]` CTE; other flavors
    ### inline MIN()/MAX() subqueries against the patch table directly.
    date_bounds_table = patch_table_name if flavor != 'mssql' else '[date_bounds]'
    min_dt_col_name = f"MIN({dt_col_name})" if flavor != 'mssql' else '[Min_dt]'
    max_dt_col_name = f"MAX({dt_col_name})" if flavor != 'mssql' else '[Max_dt]'
    date_bounds_subquery = (
        f"""f.{dt_col_name} >= (SELECT {min_dt_col_name} FROM {date_bounds_table})
            AND
                f.{dt_col_name} <= (SELECT {max_dt_col_name} FROM {date_bounds_table})"""
        if datetime_col
        else "1 = 1"
    )
    with_temp_date_bounds = f"""WITH [date_bounds] AS (
        SELECT MIN({dt_col_name}) AS {min_dt_col_name}, MAX({dt_col_name}) AS {max_dt_col_name}
        FROM {patch_table_name}
    )""" if datetime_col else ""
    identity_insert_on = (
        f"SET IDENTITY_INSERT {target_table_name} ON"
        if identity_insert
        else skip_query_val
    )
    identity_insert_off = (
        f"SET IDENTITY_INSERT {target_table_name} OFF"
        if identity_insert
        else skip_query_val
    )

    ### NOTE: MSSQL upserts must exclude the update portion if only upserting indices.
    when_matched_update_sets_subquery_none = "" if not value_cols else (
        "\n        WHEN MATCHED THEN\n"
        f"            UPDATE {sets_subquery('', 'p.')}"
    )

    ### MySQL/MariaDB-style `ON DUPLICATE KEY UPDATE` fragments.
    cols_equal_values = '\n,'.join(
        [
            f"{sql_item_name(c_name, flavor)} = VALUES({sql_item_name(c_name, flavor)})"
            for c_name, c_type in value_cols
        ]
    )
    on_duplicate_key_update = (
        "ON DUPLICATE KEY UPDATE"
        if value_cols
        else ""
    )
    ignore = "IGNORE " if not value_cols else ""

    ### Substitute every fragment into each flavor template.
    formatted_queries = [
        textwrap.dedent(base_query.format(
            sets_subquery_none=sets_subquery('', 'p.'),
            sets_subquery_none_excluded=sets_subquery('', 'EXCLUDED.'),
            sets_subquery_f=sets_subquery('f.', 'p.'),
            and_subquery_f=and_subquery('p.', 'f.'),
            and_subquery_t=and_subquery('p.', 't.'),
            target_table_name=target_table_name,
            patch_table_name=patch_table_name,
            patch_cols_str=patch_cols_str,
            patch_cols_prefixed_str=patch_cols_prefixed_str,
            date_bounds_subquery=date_bounds_subquery,
            join_cols_str=join_cols_str,
            coalesce_join_cols_str=coalesce_join_cols_str,
            update_or_nothing=update_or_nothing,
            when_matched_update_sets_subquery_none=when_matched_update_sets_subquery_none,
            cols_equal_values=cols_equal_values,
            on_duplicate_key_update=on_duplicate_key_update,
            ignore=ignore,
            with_temp_date_bounds=with_temp_date_bounds,
            identity_insert_on=identity_insert_on,
            identity_insert_off=identity_insert_off,
        )).lstrip().rstrip()
        for base_query in base_queries
    ]

    ### NOTE: Allow for skipping some queries.
    return [query for query in formatted_queries if query]

Build a list of MERGE, UPDATE, DELETE/INSERT queries to apply a patch to the target table.

Parameters
  • target (str): The name of the target table.
  • patch (str): The name of the patch table. This should have the same shape as the target.
  • connectable (Union[meerschaum.connectors.sql.SQLConnector, sqlalchemy.orm.session.Session]): The SQLConnector or SQLAlchemy session which will later execute the queries.
  • join_cols (List[str]): The columns to use to join the patch to the target.
  • flavor (Optional[str], default None): If using a SQLAlchemy session, provide the expected database flavor.
  • upsert (bool, default False): If True, return an upsert query rather than an update.
  • datetime_col (Optional[str], default None): If provided, bound the join query using this column as the datetime index. This must be present on both tables.
  • schema (Optional[str], default None): If provided, use this schema when quoting the target table. Defaults to connector.schema.
  • patch_schema (Optional[str], default None): If provided, use this schema when quoting the patch table. Defaults to schema.
  • target_cols_types (Optional[Dict[str, Any]], default None): If provided, use these as the columns-types dictionary for the target table. Default will infer from the database context.
  • patch_cols_types (Optional[Dict[str, Any]], default None): If provided, use these as the columns-types dictionary for the patch table. Default will infer from the database context.
  • identity_insert (bool, default False): If True, include SET IDENTITY_INSERT queries before and after the update queries. Only applies for MSSQL upserts.
  • null_indices (bool, default True): If False, do not coalesce index columns before joining.
  • cast_columns (bool, default True): If False, do not cast update columns to the target table types.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A list of query strings to perform the update operation.
def get_null_replacement(typ: str, flavor: str) -> str:
def get_null_replacement(typ: str, flavor: str) -> str:
    """
    Return a value that may temporarily be used in place of NULL for this type.

    Parameters
    ----------
    typ: str
        The database type for which a NULL replacement is needed.

    flavor: str
        The database flavor for which this value will be used.

    Returns
    -------
    A value which may stand in place of NULL for this type.
    `'None'` is returned if a value cannot be determined.
    """
    from meerschaum.utils.dtypes.sql import DB_FLAVORS_CAST_DTYPES
    typ_lower = typ.lower()

    ### A magic WKB point far outside any realistic coordinate space.
    if 'geometry' in typ_lower:
        return "'010100000000008058346FCDC100008058346FCDC1'"

    if 'int' in typ_lower or typ_lower in ('numeric', 'number'):
        return '-987654321'

    if 'bool' in typ_lower or typ_lower == 'bit':
        ### Resolve the flavor-specific boolean type, then apply any cast overrides.
        default_bool_typ = PD_TO_DB_DTYPES_FLAVORS['bool']['default']
        bool_typ = (
            PD_TO_DB_DTYPES_FLAVORS
            .get('bool', {})
            .get(flavor, default_bool_typ)
        )
        cast_overrides = DB_FLAVORS_CAST_DTYPES.get(flavor)
        if cast_overrides is not None:
            bool_typ = cast_overrides.get(bool_typ, bool_typ)
        magic_int = -987654321 if flavor in ('mysql', 'mariadb') else 0
        return f'CAST({magic_int} AS {bool_typ})'

    if 'time' in typ_lower or 'date' in typ_lower:
        ### Upper-case types are treated as explicit DB types for the cast.
        return dateadd_str(
            flavor=flavor,
            begin='1900-01-01',
            db_type=(typ if typ.isupper() else None),
        )

    if 'float' in typ_lower or 'double' in typ_lower or typ_lower in ('decimal',):
        return '-987654321.0'

    if flavor == 'oracle':
        if typ_lower.split('(', maxsplit=1)[0] == 'char':
            return "'-987654321'"
        if typ_lower in ('blob', 'bytes'):
            return '00'

    if typ_lower in ('uniqueidentifier', 'guid', 'uuid'):
        magic_val = 'DEADBEEF-ABBA-BABE-CAFE-DECAFC0FFEE5'
        return (
            f"CAST('{magic_val}' AS UNIQUEIDENTIFIER)"
            if flavor == 'mssql'
            else f"'{magic_val}'"
        )

    ### Fall back to a string sentinel (Oracle requires the `n` prefix).
    return ('n' if flavor == 'oracle' else '') + "'-987654321'"

Return a value that may temporarily be used in place of NULL for this type.

Parameters
  • typ (str): The database type for which a NULL replacement is needed.
  • flavor (str): The database flavor for which this value will be used.
Returns
  • A value which may stand in place of NULL for this type.
  • 'None' is returned if a value cannot be determined.
def get_db_version(conn: "'SQLConnector'", debug: bool = False) -> Optional[str]:
def get_db_version(conn: 'SQLConnector', debug: bool = False) -> Union[str, None]:
    """
    Fetch the database version if possible.
    """
    ### Quote the alias for the returned version column.
    quoted_version = sql_item_name('version', conn.flavor, None)
    query_template = version_queries.get(conn.flavor, version_queries['default'])
    return conn.value(query_template.format(version_name=quoted_version), debug=debug)

Fetch the database version if possible.

def get_rename_table_queries( old_table: str, new_table: str, flavor: str, schema: Optional[str] = None) -> List[str]:
def get_rename_table_queries(
    old_table: str,
    new_table: str,
    flavor: str,
    schema: Optional[str] = None,
) -> List[str]:
    """
    Return queries to alter a table's name.

    Parameters
    ----------
    old_table: str
        The unquoted name of the old table.

    new_table: str
        The unquoted name of the new table.

    flavor: str
        The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).

    schema: Optional[str], default None
        The schema on which the table resides.

    Returns
    -------
    A list of `ALTER TABLE` or equivalent queries for the database flavor.
    """
    old_table_name = sql_item_name(old_table, flavor, schema)
    new_table_name = sql_item_name(new_table, flavor, None)
    tmp_table = '_tmp_rename_' + new_table
    tmp_table_name = sql_item_name(tmp_table, flavor, schema)

    ### `sp_rename` takes the unquoted table names directly.
    if flavor == 'mssql':
        return [f"EXEC sp_rename '{old_table}', '{new_table}'"]

    if_exists_str = "IF EXISTS" if flavor in DROP_IF_EXISTS_FLAVORS else ""

    ### DuckDB lacks a direct rename here, so copy through a temporary table
    ### and drop both the temporary and the original tables afterwards.
    if flavor == 'duckdb':
        create_tmp_queries = get_create_table_queries(
            f"SELECT * FROM {old_table_name}",
            tmp_table,
            'duckdb',
            schema,
        )
        create_new_queries = get_create_table_queries(
            f"SELECT * FROM {tmp_table_name}",
            new_table,
            'duckdb',
            schema,
        )
        drop_queries = [
            f"DROP TABLE {if_exists_str} {tmp_table_name}",
            f"DROP TABLE {if_exists_str} {old_table_name}",
        ]
        return create_tmp_queries + create_new_queries + drop_queries

    return [f"ALTER TABLE {old_table_name} RENAME TO {new_table_name}"]

Return queries to alter a table's name.

Parameters
  • old_table (str): The unquoted name of the old table.
  • new_table (str): The unquoted name of the new table.
  • flavor (str): The database flavor to use for the query (e.g. 'mssql', 'postgresql').
  • schema (Optional[str], default None): The schema on which the table resides.
Returns
  • A list of ALTER TABLE or equivalent queries for the database flavor.
def get_create_table_query( query_or_dtypes: Union[str, Dict[str, str]], new_table: str, flavor: str, schema: Optional[str] = None) -> str:
def get_create_table_query(
    query_or_dtypes: Union[str, Dict[str, str]],
    new_table: str,
    flavor: str,
    schema: Optional[str] = None,
) -> str:
    """
    NOTE: This function is deprecated. Use `get_create_table_queries()` instead.

    Return a query to create a new table from a `SELECT` query.

    Parameters
    ----------
    query_or_dtypes: Union[str, Dict[str, str]]
        The select query to use for the creation of the table.
        If a dictionary is provided, return a `CREATE TABLE` query from the given `dtypes` columns.

    new_table: str
        The unquoted name of the new table.

    flavor: str
        The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).

    schema: Optional[str], default None
        The schema on which the table will reside.

    Returns
    -------
    A `CREATE TABLE` (or `SELECT INTO`) query for the database flavor.
    """
    ### Delegate to the plural variant and keep only the first query.
    queries = get_create_table_queries(
        query_or_dtypes,
        new_table,
        flavor,
        schema=schema,
        primary_key=None,
    )
    return queries[0]

NOTE: This function is deprecated. Use get_create_table_queries() instead.

Return a query to create a new table from a SELECT query.

Parameters
  • query_or_dtypes (Union[str, Dict[str, str]]): The select query to use for the creation of the table. If a dictionary is provided, return a CREATE TABLE query from the given dtypes columns.
  • new_table (str): The unquoted name of the new table.
  • flavor (str): The database flavor to use for the query (e.g. 'mssql', 'postgresql').
  • schema (Optional[str], default None): The schema on which the table will reside.
Returns
  • A CREATE TABLE (or SELECT INTO) query for the database flavor.
def get_create_table_queries( query_or_dtypes: Union[str, Dict[str, str]], new_table: str, flavor: str, schema: Optional[str] = None, primary_key: Optional[str] = None, primary_key_db_type: Optional[str] = None, autoincrement: bool = False, datetime_column: Optional[str] = None) -> List[str]:
def get_create_table_queries(
    query_or_dtypes: Union[str, Dict[str, str]],
    new_table: str,
    flavor: str,
    schema: Optional[str] = None,
    primary_key: Optional[str] = None,
    primary_key_db_type: Optional[str] = None,
    autoincrement: bool = False,
    datetime_column: Optional[str] = None,
) -> List[str]:
    """
    Return a query to create a new table from a `SELECT` query or a `dtypes` dictionary.

    Parameters
    ----------
    query_or_dtypes: Union[str, Dict[str, str]]
        The select query to use for the creation of the table.
        If a dictionary is provided, return a `CREATE TABLE` query from the given `dtypes` columns.

    new_table: str
        The unquoted name of the new table.

    flavor: str
        The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).

    schema: Optional[str], default None
        The schema on which the table will reside.

    primary_key: Optional[str], default None
        If provided, designate this column as the primary key in the new table.

    primary_key_db_type: Optional[str], default None
        If provided, alter the primary key to this type (to set NOT NULL constraint).

    autoincrement: bool, default False
        If `True` and `primary_key` is provided, create the `primary_key` column
        as an auto-incrementing integer column.

    datetime_column: Optional[str], default None
        If provided, include this column in the primary key.
        Applicable to TimescaleDB only.

    Returns
    -------
    A `CREATE TABLE` (or `SELECT INTO`) query for the database flavor.
    """
    if not isinstance(query_or_dtypes, (str, dict)):
        raise TypeError("`query_or_dtypes` must be a query or a dtypes dictionary.")

    ### A string is treated as a `SELECT` query (CTE strategy);
    ### a dictionary is treated as a columns-to-types mapping.
    if isinstance(query_or_dtypes, str):
        build_queries = _get_create_table_query_from_cte
    else:
        build_queries = _get_create_table_query_from_dtypes

    ### Some flavors do not support auto-incrementing columns.
    allow_autoincrement = autoincrement and flavor not in SKIP_AUTO_INCREMENT_FLAVORS
    return build_queries(
        query_or_dtypes,
        new_table,
        flavor,
        schema=schema,
        primary_key=primary_key,
        primary_key_db_type=primary_key_db_type,
        autoincrement=allow_autoincrement,
        datetime_column=datetime_column,
    )

Return a query to create a new table from a SELECT query or a dtypes dictionary.

Parameters
  • query_or_dtypes (Union[str, Dict[str, str]]): The select query to use for the creation of the table. If a dictionary is provided, return a CREATE TABLE query from the given dtypes columns.
  • new_table (str): The unquoted name of the new table.
  • flavor (str): The database flavor to use for the query (e.g. 'mssql', 'postgresql').
  • schema (Optional[str], default None): The schema on which the table will reside.
  • primary_key (Optional[str], default None): If provided, designate this column as the primary key in the new table.
  • primary_key_db_type (Optional[str], default None): If provided, alter the primary key to this type (to set NOT NULL constraint).
  • autoincrement (bool, default False): If True and primary_key is provided, create the primary_key column as an auto-incrementing integer column.
  • datetime_column (Optional[str], default None): If provided, include this column in the primary key. Applicable to TimescaleDB only.
Returns
  • A CREATE TABLE (or SELECT INTO) query for the database flavor.
def wrap_query_with_cte( sub_query: str, parent_query: str, flavor: str, cte_name: str = 'src') -> str:
def wrap_query_with_cte(
    sub_query: str,
    parent_query: str,
    flavor: str,
    cte_name: str = "src",
) -> str:
    """
    Wrap a subquery in a CTE and append an encapsulating query.

    Parameters
    ----------
    sub_query: str
        The query to be referenced. This may itself contain CTEs.
        Unless `cte_name` is provided, this will be aliased as `src`.

    parent_query: str
        The larger query to append which references the subquery.
        This must not contain CTEs.

    flavor: str
        The database flavor, e.g. `'mssql'`.

    cte_name: str, default 'src'
        The CTE alias, defaults to `src`.

    Returns
    -------
    An encapsulating query which allows you to treat `sub_query` as a temporary table.

    Examples
    --------

    ```python
    from meerschaum.utils.sql import wrap_query_with_cte
    sub_query = "WITH foo AS (SELECT 1 AS val) SELECT (val * 2) AS newval FROM foo"
    parent_query = "SELECT newval * 3 FROM src"
    query = wrap_query_with_cte(sub_query, parent_query, 'mssql')
    print(query)
    # WITH foo AS (SELECT 1 AS val),
    # [src] AS (
    #     SELECT (val * 2) AS newval FROM foo
    # )
    # SELECT newval * 3 FROM src
    ```

    """
    import textwrap
    sub_query = sub_query.lstrip()
    cte_name_quoted = sql_item_name(cte_name, flavor, None)

    if flavor in NO_CTE_FLAVORS:
        ### No CTE support: inline the subquery as a derived table wherever
        ### the parent query references the CTE alias (quoted or bare).
        placeholder = '--MRSM_SUBQUERY--'
        inlined = f"(\n{sub_query}\n) AS {cte_name_quoted}"
        swapped = (
            parent_query
            .replace(cte_name_quoted, placeholder)
            .replace(cte_name, placeholder)
        )
        return swapped.replace(placeholder, inlined)

    if sub_query.lower().startswith('with '):
        ### The subquery already opens a CTE list: append ours to it,
        ### wrapping only the final SELECT of the subquery.
        final_select_ix = sub_query.lower().rfind('select')
        leading_ctes = sub_query[:final_select_ix].rstrip()
        final_select = sub_query[final_select_ix:]
        return (
            leading_ctes + ',\n'
            + f"{cte_name_quoted} AS (\n"
            + '    ' + final_select
            + "\n)\n"
            + parent_query
        )

    return (
        f"WITH {cte_name_quoted} AS (\n"
        f"{textwrap.indent(sub_query, '    ')}\n"
        f")\n{parent_query}"
    )

Wrap a subquery in a CTE and append an encapsulating query.

Parameters
  • sub_query (str): The query to be referenced. This may itself contain CTEs. Unless cte_name is provided, this will be aliased as src.
  • parent_query (str): The larger query to append which references the subquery. This must not contain CTEs.
  • flavor (str): The database flavor, e.g. 'mssql'.
  • cte_name (str, default 'src'): The CTE alias, defaults to src.
Returns
  • An encapsulating query which allows you to treat sub_query as a temporary table.
Examples
from meerschaum.utils.sql import wrap_query_with_cte
sub_query = "WITH foo AS (SELECT 1 AS val) SELECT (val * 2) AS newval FROM foo"
parent_query = "SELECT newval * 3 FROM src"
query = wrap_query_with_cte(sub_query, parent_query, 'mssql')
print(query)
# WITH foo AS (SELECT 1 AS val),
# [src] AS (
#     SELECT (val * 2) AS newval FROM foo
# )
# SELECT newval * 3 FROM src
def format_cte_subquery( sub_query: str, flavor: str, sub_name: str = 'src', cols_to_select: Union[List[str], str] = '*') -> str:
def format_cte_subquery(
    sub_query: str,
    flavor: str,
    sub_name: str = 'src',
    cols_to_select: Union[List[str], str] = '*',
) -> str:
    """
    Given a subquery, build a wrapper query that selects from the CTE subquery.

    Parameters
    ----------
    sub_query: str
        The subquery to wrap.

    flavor: str
        The database flavor to use for the query (e.g. `'mssql'`, `'postgresql'`).

    sub_name: str, default 'src'
        If possible, give this name to the CTE (must be unquoted).

    cols_to_select: Union[List[str], str], default '*'
        If specified, choose which columns to select from the CTE.
        If a list of strings is provided, each item will be quoted and joined with commas.
        If a string is given, assume it is quoted and insert it into the query.

    Returns
    -------
    A wrapper query that selects from the CTE.
    """
    quoted_sub_name = sql_item_name(sub_name, flavor, None)

    ### A string is inserted verbatim; a list is quoted column-by-column.
    if isinstance(cols_to_select, str):
        cols_str = cols_to_select
    else:
        cols_str = ', '.join([sql_item_name(col, flavor, None) for col in cols_to_select])

    parent_query = (
        f"SELECT {cols_str}\n"
        f"FROM {quoted_sub_name}"
    )
    return wrap_query_with_cte(sub_query, parent_query, flavor, cte_name=sub_name)

Given a subquery, build a wrapper query that selects from the CTE subquery.

Parameters
  • sub_query (str): The subquery to wrap.
  • flavor (str): The database flavor to use for the query (e.g. 'mssql', 'postgresql').
  • sub_name (str, default 'src'): If possible, give this name to the CTE (must be unquoted).
  • cols_to_select (Union[List[str], str], default '*'): If specified, choose which columns to select from the CTE. If a list of strings is provided, each item will be quoted and joined with commas. If a string is given, assume it is quoted and insert it into the query.
Returns
  • A wrapper query that selects from the CTE.
def session_execute( session: "'sqlalchemy.orm.session.Session'", queries: Union[List[str], str], with_results: bool = False, debug: bool = False) -> "Union[mrsm.SuccessTuple, Tuple[mrsm.SuccessTuple, List['sqlalchemy.sql.ResultProxy']]]":
def session_execute(
    session: 'sqlalchemy.orm.session.Session',
    queries: Union[List[str], str],
    with_results: bool = False,
    debug: bool = False,
) -> Union[mrsm.SuccessTuple, Tuple[mrsm.SuccessTuple, List['sqlalchemy.sql.ResultProxy']]]:
    """
    Similar to `SQLConnector.exec_queries()`, execute a list of queries
    and roll back when one fails.

    Parameters
    ----------
    session: sqlalchemy.orm.session.Session
        A SQLAlchemy session representing a transaction.

    queries: Union[List[str], str]
        A query or list of queries to be executed.
        If a query fails, roll back the session.

    with_results: bool, default False
        If `True`, return a list of result objects.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A `SuccessTuple` indicating the queries were successfully executed.
    If `with_results`, return the `SuccessTuple` and a list of results.
    """
    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)
    if not isinstance(queries, list):
        queries = [queries]
    successes, msgs, results = [], [], []
    ### Hoisted out of the loop: the failure message is loop-invariant.
    fail_msg = "Failed to execute queries."
    for query in queries:
        if debug:
            dprint(query)
        query_text = sqlalchemy.text(query)
        try:
            result = session.execute(query_text)
            ### `execute()` returning `None` is treated as a failure.
            query_success = result is not None
            query_msg = "Success" if query_success else fail_msg
        except Exception as e:
            query_success = False
            query_msg = f"{fail_msg}\n{e}"
            result = None
        successes.append(query_success)
        msgs.append(query_msg)
        results.append(result)
        if not query_success:
            if debug:
                dprint("Rolling back session.")
            ### Roll back the whole transaction and stop at the first failure.
            session.rollback()
            break
    success, msg = all(successes), '\n'.join(msgs)
    if with_results:
        return (success, msg), results
    return success, msg

Similar to SQLConnector.exec_queries(), execute a list of queries and roll back when one fails.

Parameters
  • session (sqlalchemy.orm.session.Session): A SQLAlchemy session representing a transaction.
  • queries (Union[List[str], str]): A query or list of queries to be executed. If a query fails, roll back the session.
  • with_results (bool, default False): If True, return a list of result objects.
Returns
  • A SuccessTuple indicating the queries were successfully executed.
  • If with_results, return the SuccessTuple and a list of results.
def get_reset_autoincrement_queries( table: str, column: str, connector: meerschaum.connectors.SQLConnector, schema: Optional[str] = None, debug: bool = False) -> List[str]:
def get_reset_autoincrement_queries(
    table: str,
    column: str,
    connector: mrsm.connectors.SQLConnector,
    schema: Optional[str] = None,
    debug: bool = False,
) -> List[str]:
    """
    Return a list of queries to reset a table's auto-increment counter to the next largest value.

    Parameters
    ----------
    table: str
        The name of the table on which the auto-incrementing column exists.

    column: str
        The name of the auto-incrementing column.

    connector: mrsm.connectors.SQLConnector
        The SQLConnector to the database on which the table exists.

    schema: Optional[str], default None
        The schema of the table. Defaults to `connector.schema`.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A list of queries to be executed to reset the auto-incrementing column.
    """
    ### Nothing to reset if the table does not exist.
    if not table_exists(table, connector, schema=schema, debug=debug):
        return []

    schema = schema or connector.schema
    flavor = connector.flavor
    max_id_name = sql_item_name('max_id', flavor)
    table_name = sql_item_name(table, flavor, schema)
    table_seq_name = sql_item_name(f"{table}_{column}_seq", flavor, schema)
    column_name = sql_item_name(column, flavor)

    ### Fetch the current maximum value of the column (0 for an empty table).
    max_id = connector.value(
        f"""
        SELECT COALESCE(MAX({column_name}), 0) AS {max_id_name}
        FROM {table_name}
        """,
        debug=debug,
    )
    if max_id is None:
        return []

    flavor_queries = reset_autoincrement_queries.get(
        flavor,
        reset_autoincrement_queries['default']
    )
    if not isinstance(flavor_queries, list):
        flavor_queries = [flavor_queries]

    ### Interpolate both raw and quoted identifiers into the flavor templates.
    format_kwargs = {
        'column': column,
        'column_name': column_name,
        'table': table,
        'table_name': table_name,
        'table_seq_name': table_seq_name,
        'val': max_id,
        'val_plus_1': max_id + 1,
    }
    return [template.format(**format_kwargs) for template in flavor_queries]

Return a list of queries to reset a table's auto-increment counter to the next largest value.

Parameters
  • table (str): The name of the table on which the auto-incrementing column exists.
  • column (str): The name of the auto-incrementing column.
  • connector (mrsm.connectors.SQLConnector): The SQLConnector to the database on which the table exists.
  • schema (Optional[str], default None): The schema of the table. Defaults to connector.schema.
Returns
  • A list of queries to be executed to reset the auto-incrementing column.
def get_postgis_geo_columns_types( connectable: "Union['mrsm.connectors.sql.SQLConnector', 'sqlalchemy.orm.session.Session', 'sqlalchemy.engine.base.Engine']", table: str, schema: Optional[str] = 'public', debug: bool = False) -> Dict[str, str]:
def get_postgis_geo_columns_types(
    connectable: Union[
        'mrsm.connectors.sql.SQLConnector',
        'sqlalchemy.orm.session.Session',
        'sqlalchemy.engine.base.Engine'
    ],
    table: str,
    schema: Optional[str] = 'public',
    debug: bool = False,
) -> Dict[str, str]:
    """
    Return a dictionary mapping PostGIS geometry column names to geometry types.

    Parameters
    ----------
    connectable: Union[SQLConnector, Session, Engine]
        The connector, session, or engine on which to execute the metadata query.

    table: str
        The name of the table to inspect.

    schema: Optional[str], default 'public'
        The schema of the table. Falls back to `'public'` when `None` or empty.

    debug: bool, default False
        Verbosity toggle (only forwarded when `connectable` is a `SQLConnector`).

    Returns
    -------
    A dictionary mapping column names to type strings,
    e.g. `'GEOMETRY(POINT, 4326)'` or `'GEOGRAPHY'`.
    """
    sqlalchemy = mrsm.attempt_import('sqlalchemy', lazy=False)

    ### Apply the default schema before validating so `None` is never
    ### stringified and passed to `clean()`.
    schema = schema or 'public'
    clean(table)
    clean(schema)
    truncated_schema_name = truncate_item_name(schema, flavor='postgis')
    truncated_table_name = truncate_item_name(table, flavor='postgis')
    ### NOTE(review): identifiers are interpolated into the SQL text;
    ### `clean()` above guards against injection, but bound parameters
    ### would be preferable — confirm `clean()` rejects quotes.
    query = sqlalchemy.text(
        "SELECT \"f_geometry_column\" AS \"column\", 'GEOMETRY' AS \"func\", \"type\", \"srid\"\n"
        "FROM \"geometry_columns\"\n"
        f"WHERE \"f_table_schema\" = '{truncated_schema_name}'\n"
        f"    AND \"f_table_name\" = '{truncated_table_name}'\n"
        "UNION ALL\n"
        "SELECT \"f_geography_column\" AS \"column\", 'GEOGRAPHY' AS \"func\", \"type\", \"srid\"\n"
        "FROM \"geography_columns\"\n"
        f"WHERE \"f_table_schema\" = '{truncated_schema_name}'\n"
        f"    AND \"f_table_name\" = '{truncated_table_name}'\n"
    )
    ### Only `SQLConnector.execute()` accepts a `debug` keyword.
    debug_kwargs = {'debug': debug} if isinstance(connectable, mrsm.connectors.SQLConnector) else {}
    result_rows = connectable.execute(query, **debug_kwargs).fetchall()
    cols_type_tuples = {
        row[0]: (row[1], row[2], row[3])
        for row in result_rows
    }

    ### Build `FUNC(TYPE, SRID)` strings; omit the parenthesized part
    ### when there is no SRID and the type is the generic GEOMETRY/GEOGRAPHY.
    geometry_cols_types = {
        col: (
            f"{func}({typ.upper()}, {srid})"
            if srid
            else (
                func
                + (
                    f'({typ.upper()})'
                    if typ.upper() not in ('GEOMETRY', 'GEOGRAPHY')
                    else ''
                )
            )
        )
        for col, (func, typ, srid) in cols_type_tuples.items()
    }
    return geometry_cols_types

Return a dictionary mapping PostGIS geometry column names to geometry types.

def get_create_schema_if_not_exists_queries(schema: str, flavor: str) -> List[str]:
def get_create_schema_if_not_exists_queries(
    schema: str,
    flavor: str,
) -> List[str]:
    """
    Return the queries to create a schema if it does not yet exist.
    For databases which do not support schemas, an empty list will be returned.
    """
    ### Guard clauses: nothing to do without a schema, on schema-less flavors,
    ### or when the schema is already the flavor's default.
    if (
        not schema
        or flavor in NO_SCHEMA_FLAVORS
        or schema == DEFAULT_SCHEMA_FLAVORS.get(flavor, None)
    ):
        return []

    ### Validate the raw schema name before interpolating it into SQL.
    clean(schema)

    ### Oracle schemas are tied to users and are not created here.
    if flavor == 'oracle':
        return []

    ### MSSQL lacks `CREATE SCHEMA IF NOT EXISTS`; emulate it with a
    ### conditional block around a dynamic `EXEC`.
    if flavor == 'mssql':
        mssql_query = (
            f"IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{schema}')\n"
            "BEGIN\n"
            f"    EXEC('CREATE SCHEMA {sql_item_name(schema, flavor)}');\n"
            "END;"
        )
        return [mssql_query]

    return [
        f"CREATE SCHEMA IF NOT EXISTS {sql_item_name(schema, flavor)};"
    ]

Return the queries to create a schema if it does not yet exist. For databases which do not support schemas, an empty list will be returned.