meerschaum.utils.dataframe

Utility functions for working with DataFrames.

   1#! /usr/bin/env python3
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Utility functions for working with DataFrames.
   7"""
   8
   9from __future__ import annotations
  10
  11import pathlib
  12from datetime import datetime, timezone
  13from collections import defaultdict
  14
  15import meerschaum as mrsm
  16from meerschaum.utils.typing import (
  17    Optional, Dict, Any, List, Hashable, Generator,
  18    Iterator, Iterable, Union, TYPE_CHECKING,
  19)
  20
  21if TYPE_CHECKING:
  22    pd, dask = mrsm.attempt_import('pandas', 'dask')
  23
  24
  25def add_missing_cols_to_df(
  26    df: 'pd.DataFrame',
  27    dtypes: Dict[str, Any],
  28) -> 'pd.DataFrame':
  29    """
  30    Add columns from the dtypes dictionary as null columns to a new DataFrame.
  31
  32    Parameters
  33    ----------
  34    df: pd.DataFrame
   35        The dataframe to copy, to which null columns will be added.
  36
   37    dtypes: Dict[str, Any]
  38        The data types dictionary which may contain keys not present in `df.columns`.
  39
  40    Returns
  41    -------
  42    A new `DataFrame` with the keys from `dtypes` added as null columns.
   43    If the keys of `dtypes` match the columns of `df`, then return a reference to `df`.
  44    NOTE: This will not ensure that dtypes are enforced!
  45
  46    Examples
  47    --------
  48    >>> import pandas as pd
  49    >>> df = pd.DataFrame([{'a': 1}])
  50    >>> dtypes = {'b': 'Int64'}
  51    >>> add_missing_cols_to_df(df, dtypes)
   52       a     b
   53    0  1  <NA>
  54    >>> add_missing_cols_to_df(df, dtypes).dtypes
  55    a    int64
  56    b    Int64
  57    dtype: object
  58    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
  59    a    int64
  60    dtype: object
  61    >>> 
  62    """
  63    if set(df.columns) == set(dtypes):
  64        return df
  65
  66    from meerschaum.utils.packages import attempt_import
  67    from meerschaum.utils.dtypes import to_pandas_dtype
  68    pandas = attempt_import('pandas')
  69
  70    def build_series(dtype: str):
  71        return pandas.Series([], dtype=to_pandas_dtype(dtype))
  72
  73    assign_kwargs = {
  74        str(col): build_series(str(typ))
  75        for col, typ in dtypes.items()
  76        if col not in df.columns
  77    }
  78    df_with_cols = df.assign(**assign_kwargs)
  79    for col in assign_kwargs:
  80        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
  81    return df_with_cols
  82
  83
  84def filter_unseen_df(
  85    old_df: 'pd.DataFrame',
  86    new_df: 'pd.DataFrame',
  87    safe_copy: bool = True,
  88    dtypes: Optional[Dict[str, Any]] = None,
  89    include_unchanged_columns: bool = False,
  90    coerce_mixed_numerics: bool = True,
  91    debug: bool = False,
  92) -> 'pd.DataFrame':
  93    """
  94    Left join two DataFrames to find the newest unseen data.
  95
  96    Parameters
  97    ----------
  98    old_df: 'pd.DataFrame'
  99        The original (target) dataframe. Acts as a filter on the `new_df`.
 100
 101    new_df: 'pd.DataFrame'
 102        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
 103
 104    safe_copy: bool, default True
 105        If `True`, create a copy before comparing and modifying the dataframes.
 106        Setting to `False` may mutate the DataFrames.
 107
 108    dtypes: Optional[Dict[str, Any]], default None
 109        Optionally specify the datatypes of the dataframe.
 110
 111    include_unchanged_columns: bool, default False
 112        If `True`, include columns which haven't changed on rows which have changed.
 113
 114    coerce_mixed_numerics: bool, default True
 115        If `True`, cast mixed integer and float columns between the old and new dataframes into
 116        numeric values (`decimal.Decimal`).
 117
 118    debug: bool, default False
 119        Verbosity toggle.
 120
 121    Returns
 122    -------
 123    A pandas dataframe of the new, unseen rows in `new_df`.
 124
 125    Examples
 126    --------
 127    ```python
 128    >>> import pandas as pd
 129    >>> df1 = pd.DataFrame({'a': [1,2]})
 130    >>> df2 = pd.DataFrame({'a': [2,3]})
 131    >>> filter_unseen_df(df1, df2)
 132       a
 133    0  3
 134
 135    ```
 136
 137    """
 138    if old_df is None:
 139        return new_df
 140
 141    if safe_copy:
 142        old_df = old_df.copy()
 143        new_df = new_df.copy()
 144
 145    import json
 146    import functools
 147    import traceback
 148    from meerschaum.utils.warnings import warn
 149    from meerschaum.utils.packages import import_pandas, attempt_import
 150    from meerschaum.utils.dtypes import (
 151        to_pandas_dtype,
 152        are_dtypes_equal,
 153        attempt_cast_to_numeric,
 154        attempt_cast_to_uuid,
 155        attempt_cast_to_bytes,
 156        attempt_cast_to_geometry,
 157        coerce_timezone,
 158        serialize_decimal,
 159    )
 160    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
 161    pd = import_pandas(debug=debug)
 162    is_dask = 'dask' in new_df.__module__
 163    if is_dask:
 164        pandas = attempt_import('pandas')
 165        _ = attempt_import('partd', lazy=False)
 166        dd = attempt_import('dask.dataframe')
 167        merge = dd.merge
 168        NA = pandas.NA
 169    else:
 170        merge = pd.merge
 171        NA = pd.NA
 172
 173    new_df_dtypes = dict(new_df.dtypes)
 174    old_df_dtypes = dict(old_df.dtypes)
 175
 176    same_cols = set(new_df.columns) == set(old_df.columns)
 177    if not same_cols:
 178        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
 179        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)
 180
 181        new_types_missing_from_old = {
 182            col: typ
 183            for col, typ in new_df_dtypes.items()
 184            if col not in old_df_dtypes
 185        }
 186        old_types_missing_from_new = {
 187            col: typ
  188            for col, typ in old_df_dtypes.items()
  189            if col not in new_df_dtypes
 190        }
 191        old_df_dtypes.update(new_types_missing_from_old)
 192        new_df_dtypes.update(old_types_missing_from_new)
 193
 194    ### Edge case: two empty lists cast to DFs.
 195    elif len(new_df.columns) == 0:
 196        return new_df
 197
 198    try:
 199        ### Order matters when checking equality.
 200        new_df = new_df[old_df.columns]
 201
 202    except Exception as e:
 203        warn(
 204            "Was not able to cast old columns onto new DataFrame. " +
 205            f"Are both DataFrames the same shape? Error:\n{e}",
 206            stacklevel=3,
 207        )
 208        return new_df[list(new_df_dtypes.keys())]
 209
 210    ### assume the old_df knows what it's doing, even if it's technically wrong.
 211    if dtypes is None:
 212        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}
 213
 214    dtypes = {
 215        col: to_pandas_dtype(typ)
 216        for col, typ in dtypes.items()
 217        if col in new_df_dtypes and col in old_df_dtypes
 218    }
 219    for col, typ in new_df_dtypes.items():
 220        if col not in dtypes:
 221            dtypes[col] = typ
 222
 223    numeric_cols_precisions_scales = {
 224        col: get_numeric_precision_scale(None, typ)
 225        for col, typ in dtypes.items()
 226        if col and str(typ).lower().startswith('numeric')
 227    }
 228
 229    dt_dtypes = {
 230        col: typ
 231        for col, typ in dtypes.items()
 232        if are_dtypes_equal(typ, 'datetime')
 233    }
 234    non_dt_dtypes = {
 235        col: typ
 236        for col, typ in dtypes.items()
 237        if col not in dt_dtypes
 238    }
 239
 240    cast_non_dt_cols = True
 241    try:
 242        new_df = new_df.astype(non_dt_dtypes)
 243        cast_non_dt_cols = False
 244    except Exception as e:
 245        warn(
 246            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
 247        )
 248
 249    cast_dt_cols = True
 250    try:
 251        for col, typ in dt_dtypes.items():
 252            strip_utc = (
 253                (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
 254            )
 255            if col in old_df.columns:
 256                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
 257            if col in new_df.columns:
 258                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
 259        cast_dt_cols = False
 260    except Exception as e:
 261        warn(f"Could not cast datetime columns:\n{e}")
 262
 263    cast_cols = cast_dt_cols or cast_non_dt_cols
 264
 265    new_numeric_cols_existing = get_numeric_cols(new_df)
 266    old_numeric_cols = get_numeric_cols(old_df)
 267    for col, typ in {k: v for k, v in dtypes.items()}.items():
 268        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
 269            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
 270            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
 271            new_is_numeric = col in new_numeric_cols_existing
 272            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
 273            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
 274            old_is_numeric = col in old_numeric_cols
 275
 276            if (
 277                coerce_mixed_numerics
 278                and
 279                (new_is_float or new_is_int or new_is_numeric)
 280                and
 281                (old_is_float or old_is_int or old_is_numeric)
 282            ):
 283                dtypes[col] = attempt_cast_to_numeric
 284                cast_cols = True
 285                continue
 286
 287            ### Fallback to object if the types don't match.
 288            warn(
 289                f"Detected different types for '{col}' "
 290                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
 291                + "falling back to 'object'..."
 292            )
 293            dtypes[col] = 'object'
 294            cast_cols = True
 295
 296    if cast_cols:
 297        for col, dtype in dtypes.items():
 298            if col in new_df.columns:
 299                try:
 300                    new_df[col] = (
 301                        new_df[col].astype(dtype)
 302                        if not callable(dtype)
 303                        else new_df[col].apply(dtype)
 304                    )
 305                except Exception as e:
 306                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")
 307
 308    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
 309    new_json_cols = get_json_cols(new_df)
 310    old_json_cols = get_json_cols(old_df)
 311    json_cols = set(new_json_cols + old_json_cols)
 312    for json_col in old_json_cols:
 313        old_df[json_col] = old_df[json_col].apply(serializer)
 314    for json_col in new_json_cols:
 315        new_df[json_col] = new_df[json_col].apply(serializer)
 316
 317    new_numeric_cols = get_numeric_cols(new_df)
 318    numeric_cols = set(new_numeric_cols + old_numeric_cols)
 319    for numeric_col in old_numeric_cols:
 320        old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
 321    for numeric_col in new_numeric_cols:
 322        new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)
 323
 324    old_dt_cols = [
 325        col
 326        for col, typ in old_df.dtypes.items()
 327        if are_dtypes_equal(str(typ), 'datetime')
 328    ]
 329    for col in old_dt_cols:
 330        strip_utc = (
 331            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
 332        )
 333        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
 334
 335    new_dt_cols = [
 336        col
 337        for col, typ in new_df.dtypes.items()
 338        if are_dtypes_equal(str(typ), 'datetime')
 339    ]
 340    for col in new_dt_cols:
 341        strip_utc = (
 342            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
 343        )
 344        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
 345
 346    old_uuid_cols = get_uuid_cols(old_df)
 347    new_uuid_cols = get_uuid_cols(new_df)
 348    uuid_cols = set(new_uuid_cols + old_uuid_cols)
 349
 350    old_bytes_cols = get_bytes_cols(old_df)
 351    new_bytes_cols = get_bytes_cols(new_df)
 352    bytes_cols = set(new_bytes_cols + old_bytes_cols)
 353
 354    old_geometry_cols = get_geometry_cols(old_df)
 355    new_geometry_cols = get_geometry_cols(new_df)
 356    geometry_cols = set(new_geometry_cols + old_geometry_cols)
 357
 358    joined_df = merge(
 359        new_df.infer_objects(copy=False).fillna(NA),
 360        old_df.infer_objects(copy=False).fillna(NA),
 361        how='left',
 362        on=None,
 363        indicator=True,
 364    )
 365    changed_rows_mask = (joined_df['_merge'] == 'left_only')
 366    new_cols = list(new_df_dtypes)
 367    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)
 368
 369    for json_col in json_cols:
 370        if json_col not in delta_df.columns:
 371            continue
 372        try:
 373            delta_df[json_col] = delta_df[json_col].apply(json.loads)
 374        except Exception:
 375            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")
 376
 377    for numeric_col in numeric_cols:
 378        if numeric_col not in delta_df.columns:
 379            continue
 380        try:
 381            delta_df[numeric_col] = delta_df[numeric_col].apply(
 382                functools.partial(
 383                    attempt_cast_to_numeric,
 384                    quantize=True,
  385                    precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
  386                    scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
 387                )
 388            )
 389        except Exception:
 390            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")
 391
 392    for uuid_col in uuid_cols:
 393        if uuid_col not in delta_df.columns:
 394            continue
 395        try:
 396            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
 397        except Exception:
  398            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")
 399
 400    for bytes_col in bytes_cols:
 401        if bytes_col not in delta_df.columns:
 402            continue
 403        try:
 404            delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
 405        except Exception:
 406            warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")
 407
 408    for geometry_col in geometry_cols:
 409        if geometry_col not in delta_df.columns:
 410            continue
 411        try:
 412            delta_df[geometry_col] = delta_df[geometry_col].apply(attempt_cast_to_geometry)
 413        except Exception:
  414            warn(f"Unable to parse geometry column '{geometry_col}':\n{traceback.format_exc()}")
 415
 416    return delta_df
 417
 418
 419def parse_df_datetimes(
 420    df: 'pd.DataFrame',
 421    ignore_cols: Optional[Iterable[str]] = None,
 422    strip_timezone: bool = False,
 423    chunksize: Optional[int] = None,
 424    dtype_backend: str = 'numpy_nullable',
 425    ignore_all: bool = False,
 426    debug: bool = False,
 427) -> 'pd.DataFrame':
 428    """
 429    Parse a pandas DataFrame for datetime columns and cast as datetimes.
 430
 431    Parameters
 432    ----------
 433    df: pd.DataFrame
 434        The pandas DataFrame to parse.
 435
 436    ignore_cols: Optional[Iterable[str]], default None
 437        If provided, do not attempt to coerce these columns as datetimes.
 438
 439    strip_timezone: bool, default False
 440        If `True`, remove the UTC `tzinfo` property.
 441
 442    chunksize: Optional[int], default None
 443        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
 444
 445    dtype_backend: str, default 'numpy_nullable'
 446        If `df` is not a DataFrame and new one needs to be constructed,
 447        use this as the datatypes backend.
 448        Accepted values are 'numpy_nullable' and 'pyarrow'.
 449
 450    ignore_all: bool, default False
 451        If `True`, do not attempt to cast any columns to datetimes.
 452
 453    debug: bool, default False
 454        Verbosity toggle.
 455
 456    Returns
 457    -------
 458    A new pandas DataFrame with the determined datetime columns
 459    (usually ISO strings) cast as datetimes.
 460
 461    Examples
 462    --------
 463    ```python
 464    >>> import pandas as pd
 465    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
 466    >>> df.dtypes
 467    a    object
 468    dtype: object
 469    >>> df = parse_df_datetimes(df)
 470    >>> df.dtypes
 471    a    datetime64[ns]
 472    dtype: object
 473
 474    ```
 475
 476    """
 477    from meerschaum.utils.packages import import_pandas, attempt_import
 478    from meerschaum.utils.debug import dprint
 479    from meerschaum.utils.warnings import warn
 480    from meerschaum.utils.misc import items_str
 481    from meerschaum.utils.dtypes import to_datetime
 482    import traceback
 483    pd = import_pandas()
 484    pandas = attempt_import('pandas')
 485    pd_name = pd.__name__
 486    using_dask = 'dask' in pd_name
 487    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
 488    dask_dataframe = None
 489    if using_dask or df_is_dask:
 490        npartitions = chunksize_to_npartitions(chunksize)
 491        dask_dataframe = attempt_import('dask.dataframe')
 492
 493    ### if df is a dict, build DataFrame
 494    if isinstance(df, pandas.DataFrame):
 495        pdf = df
 496    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
 497        pdf = get_first_valid_dask_partition(df)
 498    else:
 499        if debug:
 500            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")
 501
 502        if using_dask:
 503            if isinstance(df, list):
 504                keys = set()
 505                for doc in df:
 506                    for key in doc:
 507                        keys.add(key)
 508                df = pd.DataFrame.from_dict(
 509                    {
 510                        k: [
 511                            doc.get(k, None)
 512                            for doc in df
 513                        ] for k in keys
 514                    },
 515                    npartitions=npartitions,
 516                )
 517            elif isinstance(df, dict):
 518                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
 519            elif 'pandas.core.frame.DataFrame' in str(type(df)):
 520                df = pd.from_pandas(df, npartitions=npartitions)
 521            else:
 522                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
 523            pandas = attempt_import('pandas')
 524            pdf = get_first_valid_dask_partition(df)
 525
 526        else:
 527            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
 528            pdf = df
 529
 530    ### skip parsing if DataFrame is empty
 531    if len(pdf) == 0:
 532        if debug:
 533            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
 534        return df
 535
 536    ignore_cols = set(
 537        (ignore_cols or []) + [
 538            col
 539            for col, dtype in pdf.dtypes.items() 
 540            if 'datetime' in str(dtype)
 541        ]
 542    )
 543    cols_to_inspect = [
 544        col
 545        for col in pdf.columns
 546        if col not in ignore_cols
 547    ] if not ignore_all else []
 548
 549    if len(cols_to_inspect) == 0:
 550        if debug:
 551            dprint("All columns are ignored, skipping datetime detection...")
 552        return df.infer_objects(copy=False).fillna(pandas.NA)
 553
 554    ### apply regex to columns to determine which are ISO datetimes
 555    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
 556    dt_mask = pdf[cols_to_inspect].astype(str).apply(
 557        lambda s: s.str.match(iso_dt_regex).all()
 558    )
 559
 560    ### list of datetime column names
 561    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
 562    if not datetime_cols:
 563        if debug:
 564            dprint("No columns detected as datetimes, returning...")
 565        return df.infer_objects(copy=False).fillna(pandas.NA)
 566
 567    if debug:
 568        dprint("Converting columns to datetimes: " + str(datetime_cols))
 569
 570    try:
 571        if not using_dask:
 572            df[datetime_cols] = df[datetime_cols].apply(to_datetime)
 573        else:
 574            df[datetime_cols] = df[datetime_cols].apply(
 575                to_datetime,
 576                utc=True,
 577                axis=1,
 578                meta={
 579                    col: 'datetime64[ns, UTC]'
 580                    for col in datetime_cols
 581                }
 582            )
 583    except Exception:
 584        warn(
 585            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
 586            + f"{traceback.format_exc()}"
 587        )
 588
 589    if strip_timezone:
 590        for dt in datetime_cols:
 591            try:
 592                df[dt] = df[dt].dt.tz_localize(None)
 593            except Exception:
 594                warn(
 595                    f"Unable to convert column '{dt}' to naive datetime:\n"
 596                    + f"{traceback.format_exc()}"
 597                )
 598
 599    return df.fillna(pandas.NA)
 600
 601
 602def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
 603    """
 604    Get the columns which contain unhashable objects from a Pandas DataFrame.
 605
 606    Parameters
 607    ----------
 608    df: pd.DataFrame
 609        The DataFrame which may contain unhashable objects.
 610
 611    Returns
 612    -------
 613    A list of columns.
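
    Examples
    --------
    A minimal sketch (illustrative; not from the original source):
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'tags': ['a', 'b'], 'x': 1}])
    >>> get_unhashable_cols(df)  # lists are not hashable
    ['tags']

    ```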
 614    """
 615    if df is None:
 616        return []
 617    if len(df) == 0:
 618        return []
 619
 620    is_dask = 'dask' in df.__module__
 621    if is_dask:
 622        from meerschaum.utils.packages import attempt_import
 623        pandas = attempt_import('pandas')
 624        df = pandas.DataFrame(get_first_valid_dask_partition(df))
 625    return [
 626        col for col, val in df.iloc[0].items()
 627        if not isinstance(val, Hashable)
 628    ]
 629
 630
 631def get_json_cols(df: 'pd.DataFrame') -> List[str]:
 632    """
  633    Get the columns which contain unhashable objects (e.g. dicts or lists) to be serialized as JSON.
 634
 635    Parameters
 636    ----------
 637    df: pd.DataFrame
 638        The DataFrame which may contain unhashable objects.
 639
 640    Returns
 641    -------
 642    A list of columns to be encoded as JSON.
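
    Examples
    --------
    A minimal sketch (illustrative; not from the original source):
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'meta': {'k': 'v'}, 'x': 1}])
    >>> get_json_cols(df)  # dict values are serialized as JSON
    ['meta']

    ```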
 643    """
 644    if df is None:
 645        return []
 646
 647    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
 648    if is_dask:
 649        df = get_first_valid_dask_partition(df)
 650
 651    if len(df) == 0:
 652        return []
 653
 654    cols_indices = {
 655        col: df[col].first_valid_index()
 656        for col in df.columns
 657    }
 658    return [
 659        col
 660        for col, ix in cols_indices.items()
 661        if (
 662            ix is not None
 663            and
 664            not isinstance(df.loc[ix][col], Hashable)
 665        )
 666    ]
 667
 668
 669def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
 670    """
 671    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.
 672
 673    Parameters
 674    ----------
 675    df: pd.DataFrame
 676        The DataFrame which may contain decimal objects.
 677
 678    Returns
 679    -------
 680    A list of columns to treat as numerics.
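
    Examples
    --------
    A minimal sketch (illustrative; only `decimal.Decimal` values count as numeric):
    ```python
    >>> from decimal import Decimal
    >>> import pandas as pd
    >>> df = pd.DataFrame({'price': [Decimal('1.10')], 'qty': [1]})
    >>> get_numeric_cols(df)
    ['price']

    ```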
 681    """
 682    if df is None:
 683        return []
 684    from decimal import Decimal
 685    is_dask = 'dask' in df.__module__
 686    if is_dask:
 687        df = get_first_valid_dask_partition(df)
 688
 689    if len(df) == 0:
 690        return []
 691
 692    cols_indices = {
 693        col: df[col].first_valid_index()
 694        for col in df.columns
 695    }
 696    return [
 697        col
 698        for col, ix in cols_indices.items()
 699        if (
 700            ix is not None
 701            and
 702            isinstance(df.loc[ix][col], Decimal)
 703        )
 704    ]
 705
 706
 707def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
 708    """
 709    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.
 710
 711    Parameters
 712    ----------
 713    df: pd.DataFrame
 714        The DataFrame which may contain UUID objects.
 715
 716    Returns
 717    -------
 718    A list of columns to treat as UUIDs.
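
    Examples
    --------
    A minimal sketch (illustrative; not from the original source):
    ```python
    >>> from uuid import uuid4
    >>> import pandas as pd
    >>> df = pd.DataFrame({'id': [uuid4()], 'name': ['demo']})
    >>> get_uuid_cols(df)
    ['id']

    ```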
 719    """
 720    if df is None:
 721        return []
 722    from uuid import UUID
 723    is_dask = 'dask' in df.__module__
 724    if is_dask:
 725        df = get_first_valid_dask_partition(df)
 726
 727    if len(df) == 0:
 728        return []
 729
 730    cols_indices = {
 731        col: df[col].first_valid_index()
 732        for col in df.columns
 733    }
 734    return [
 735        col
 736        for col, ix in cols_indices.items()
 737        if (
 738            ix is not None
 739            and
 740            isinstance(df.loc[ix][col], UUID)
 741        )
 742    ]
 743
 744
 745def get_datetime_cols(
 746    df: 'pd.DataFrame',
 747    timezone_aware: bool = True,
 748    timezone_naive: bool = True,
 749) -> List[str]:
 750    """
 751    Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame.
 752
 753    Parameters
 754    ----------
 755    df: pd.DataFrame
 756        The DataFrame which may contain `datetime` or `Timestamp` objects.
 757
 758    timezone_aware: bool, default True
 759        If `True`, include timezone-aware datetime columns.
 760
 761    timezone_naive: bool, default True
 762        If `True`, include timezone-naive datetime columns.
 763
 764    Returns
 765    -------
 766    A list of columns to treat as datetimes.
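
    Examples
    --------
    A minimal sketch (illustrative; both aware and naive columns are returned by default):
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({
    ...     'aware': pd.to_datetime(['2024-01-01'], utc=True),
    ...     'naive': pd.to_datetime(['2024-01-01']),
    ...     'num': [1],
    ... })
    >>> get_datetime_cols(df)
    ['aware', 'naive']

    ```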
 767    """
 768    if not timezone_aware and not timezone_naive:
 769        raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.")
 770
 771    if df is None:
 772        return []
 773
 774    from datetime import datetime
 775    from meerschaum.utils.dtypes import are_dtypes_equal
 776    is_dask = 'dask' in df.__module__
 777    if is_dask:
 778        df = get_first_valid_dask_partition(df)
 779
 780    known_dt_cols = [
 781        col
 782        for col, typ in df.dtypes.items()
 783        if are_dtypes_equal('datetime', str(typ))
 784    ]
 785
 786    if len(df) == 0:
 787        return known_dt_cols
 788
 789    cols_indices = {
 790        col: df[col].first_valid_index()
 791        for col in df.columns
 792        if col not in known_dt_cols
 793    }
 794    pydt_cols = [
 795        col
 796        for col, ix in cols_indices.items()
 797        if (
 798            ix is not None
 799            and
 800            isinstance(df.loc[ix][col], datetime)
 801        )
 802    ]
 803    dt_cols_set = set(known_dt_cols + pydt_cols)
 804    all_dt_cols = [
 805        col
 806        for col in df.columns
 807        if col in dt_cols_set
 808    ]
 809    if timezone_aware and timezone_naive:
 810        return all_dt_cols
 811
 812    known_timezone_aware_dt_cols = [
 813        col
 814        for col in known_dt_cols
  815        if getattr(df[col].dt, 'tz', None) is not None
 816    ]
 817    timezone_aware_pydt_cols = [
 818        col
 819        for col in pydt_cols
 820        if df.loc[cols_indices[col]][col].tzinfo is not None
 821    ]
 822    timezone_aware_dt_cols_set = set(known_timezone_aware_dt_cols + timezone_aware_pydt_cols)
 823    if timezone_aware:
 824        return [
 825            col
 826            for col in all_dt_cols
  827            if col in timezone_aware_dt_cols_set
 828        ]
 829
 830    return [
 831        col
 832        for col in all_dt_cols
 833        if col not in timezone_aware_dt_cols_set
 834    ]
 835
 836
 837def get_bytes_cols(df: 'pd.DataFrame') -> List[str]:
 838    """
 839    Get the columns which contain bytes strings from a Pandas DataFrame.
 840
 841    Parameters
 842    ----------
 843    df: pd.DataFrame
 844        The DataFrame which may contain bytes strings.
 845
 846    Returns
 847    -------
 848    A list of columns to treat as bytes.
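
    Examples
    --------
    A minimal sketch (illustrative; not from the original source):
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'payload': [b'\x00\x01'], 'label': ['a']})
    >>> get_bytes_cols(df)
    ['payload']

    ```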
 849    """
 850    if df is None:
 851        return []
 852    is_dask = 'dask' in df.__module__
 853    if is_dask:
 854        df = get_first_valid_dask_partition(df)
 855
 856    if len(df) == 0:
 857        return []
 858
 859    cols_indices = {
 860        col: df[col].first_valid_index()
 861        for col in df.columns
 862    }
 863    return [
 864        col
 865        for col, ix in cols_indices.items()
 866        if (
 867            ix is not None
 868            and
 869            isinstance(df.loc[ix][col], bytes)
 870        )
 871    ]
 872
 873
 874def get_geometry_cols(
 875    df: 'pd.DataFrame',
 876    with_types_srids: bool = False,
 877) -> Union[List[str], Dict[str, Any]]:
 878    """
 879    Get the columns which contain shapely objects from a Pandas DataFrame.
 880
 881    Parameters
 882    ----------
 883    df: pd.DataFrame
  884        The DataFrame which may contain shapely geometry objects.
 885
 886    with_types_srids: bool, default False
 887        If `True`, return a dictionary mapping columns to geometry types and SRIDs.
 888
 889    Returns
 890    -------
 891    A list of columns to treat as `geometry`.
 892    If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID).
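
    Examples
    --------
    A minimal sketch (illustrative; assumes `shapely` is installed):
    ```python
    >>> import pandas as pd
    >>> from shapely.geometry import Point
    >>> df = pd.DataFrame({'geom': [Point(0, 0)], 'val': [1]})
    >>> get_geometry_cols(df)
    ['geom']

    ```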
 893    """
 894    if df is None:
 895        return []
 896
 897    is_dask = 'dask' in df.__module__
 898    if is_dask:
 899        df = get_first_valid_dask_partition(df)
 900
 901    if len(df) == 0:
 902        return []
 903
 904    cols_indices = {
 905        col: df[col].first_valid_index()
 906        for col in df.columns
 907    }
 908    geo_cols = [
 909        col
 910        for col, ix in cols_indices.items()
 911        if (
 912            ix is not None
 913            and
 914            'shapely' in str(type(df.loc[ix][col]))
 915        )
 916    ]
 917    if not with_types_srids:
 918        return geo_cols
 919
 920    gpd = mrsm.attempt_import('geopandas', lazy=False)
 921    geo_cols_types_srids = {}
 922    for col in geo_cols:
 923        try:
 924            sample_geo_series = gpd.GeoSeries(df[col], crs=None)
 925            geometry_types = {
 926                geom.geom_type
 927                for geom in sample_geo_series
 928                if hasattr(geom, 'geom_type')
 929            }
 930            geometry_has_z = any(getattr(geom, 'has_z', False) for geom in sample_geo_series)
 931            srid = (
 932                (
 933                    sample_geo_series.crs.sub_crs_list[0].to_epsg()
 934                    if sample_geo_series.crs.is_compound
 935                    else sample_geo_series.crs.to_epsg()
 936                )
 937                if sample_geo_series.crs
 938                else 0
 939            )
 940            geometry_type = list(geometry_types)[0] if len(geometry_types) == 1 else 'geometry'
 941            if geometry_type != 'geometry' and geometry_has_z:
 942                geometry_type = geometry_type + 'Z'
 943        except Exception:
 944            srid = 0
 945            geometry_type = 'geometry'
 946        geo_cols_types_srids[col] = (geometry_type, srid)
 947
 948    return geo_cols_types_srids
 949
 950
 951def enforce_dtypes(
 952    df: 'pd.DataFrame',
 953    dtypes: Dict[str, str],
 954    safe_copy: bool = True,
 955    coerce_numeric: bool = True,
 956    coerce_timezone: bool = True,
 957    strip_timezone: bool = False,
 958    debug: bool = False,
 959) -> 'pd.DataFrame':
 960    """
 961    Enforce the `dtypes` dictionary on a DataFrame.
 962
 963    Parameters
 964    ----------
 965    df: pd.DataFrame
 966        The DataFrame on which to enforce dtypes.
 967
 968    dtypes: Dict[str, str]
 969        The data types to attempt to enforce on the DataFrame.
 970
 971    safe_copy: bool, default True
 972        If `True`, create a copy before comparing and modifying the dataframes.
 973        Setting to `False` may mutate the DataFrames.
 974        See `meerschaum.utils.dataframe.filter_unseen_df`.
 975
 976    coerce_numeric: bool, default True
 977        If `True`, convert float and int collisions to numeric.
 978
 979    coerce_timezone: bool, default True
 980        If `True`, convert datetimes to UTC.
 981
 982    strip_timezone: bool, default False
 983        If `coerce_timezone` and `strip_timezone` are `True`,
 984        remove timezone information from datetimes.
 985
 986    debug: bool, default False
 987        Verbosity toggle.
 988
 989    Returns
 990    -------
 991    The Pandas DataFrame with the types enforced.
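
    Examples
    --------
    A minimal sketch (illustrative; passing `coerce_numeric=False` keeps the
    int/float collision from being coerced to `decimal.Decimal`):
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': ['{"b": 1}'], 'c': [1.0]})
    >>> df2 = enforce_dtypes(df, {'a': 'json', 'c': 'Int64'}, coerce_numeric=False)
    >>> df2['a'][0]
    {'b': 1}
    >>> str(df2.dtypes['c'])
    'Int64'

    ```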
 992    """
 993    import json
 994    import functools
 995    from meerschaum.utils.debug import dprint
 996    from meerschaum.utils.formatting import pprint
 997    from meerschaum.utils.dtypes import (
 998        are_dtypes_equal,
 999        to_pandas_dtype,
1000        is_dtype_numeric,
1001        attempt_cast_to_numeric,
1002        attempt_cast_to_uuid,
1003        attempt_cast_to_bytes,
1004        attempt_cast_to_geometry,
1005        coerce_timezone as _coerce_timezone,
1006    )
1007    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
1008    pandas = mrsm.attempt_import('pandas')
1009    is_dask = 'dask' in df.__module__
1010    if safe_copy:
1011        df = df.copy()
1012    if len(df.columns) == 0:
1013        if debug:
1014            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
1015        return df
1016
1017    pipe_pandas_dtypes = {
1018        col: to_pandas_dtype(typ)
1019        for col, typ in dtypes.items()
1020    }
1021    json_cols = [
1022        col
1023        for col, typ in dtypes.items()
1024        if typ == 'json'
1025    ]
1026    numeric_cols = [
1027        col
1028        for col, typ in dtypes.items()
1029        if typ.startswith('numeric')
1030    ]
1031    geometry_cols = [
1032        col
1033        for col, typ in dtypes.items()
1034        if typ.startswith('geometry') or typ.startswith('geography')
1035    ]
1036    uuid_cols = [
1037        col
1038        for col, typ in dtypes.items()
1039        if typ == 'uuid'
1040    ]
1041    bytes_cols = [
1042        col
1043        for col, typ in dtypes.items()
1044        if typ == 'bytes'
1045    ]
1046    datetime_cols = [
1047        col
1048        for col, typ in dtypes.items()
1049        if are_dtypes_equal(typ, 'datetime')
1050    ]
1051    df_numeric_cols = get_numeric_cols(df)
1052    if debug:
1053        dprint("Desired data types:")
1054        pprint(dtypes)
1055        dprint("Data types for incoming DataFrame:")
1056        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})
1057
1058    if json_cols and len(df) > 0:
1059        if debug:
1060            dprint(f"Checking columns for JSON encoding: {json_cols}")
1061        for col in json_cols:
1062            if col in df.columns:
1063                try:
1064                    df[col] = df[col].apply(
1065                        (
1066                            lambda x: (
1067                                json.loads(x)
1068                                if isinstance(x, str)
1069                                else x
1070                            )
1071                        )
1072                    )
1073                except Exception as e:
1074                    if debug:
1075                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")
1076
1077    if numeric_cols:
1078        if debug:
1079            dprint(f"Checking for numerics: {numeric_cols}")
1080        for col in numeric_cols:
1081            precision, scale = get_numeric_precision_scale(None, dtypes.get(col, ''))
1082            if col in df.columns:
1083                try:
1084                    df[col] = df[col].apply(
1085                        functools.partial(
1086                            attempt_cast_to_numeric,
1087                            quantize=True,
1088                            precision=precision,
1089                            scale=scale,
1090                        )
1091                    )
1092                except Exception as e:
1093                    if debug:
1094                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")
1095
1096    if uuid_cols:
1097        if debug:
1098            dprint(f"Checking for UUIDs: {uuid_cols}")
1099        for col in uuid_cols:
1100            if col in df.columns:
1101                try:
1102                    df[col] = df[col].apply(attempt_cast_to_uuid)
1103                except Exception as e:
1104                    if debug:
1105                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")
1106
1107    if bytes_cols:
1108        if debug:
1109            dprint(f"Checking for bytes: {bytes_cols}")
1110        for col in bytes_cols:
1111            if col in df.columns:
1112                try:
1113                    df[col] = df[col].apply(attempt_cast_to_bytes)
1114                except Exception as e:
1115                    if debug:
1116                        dprint(f"Unable to parse column '{col}' as bytes:\n{e}")
1117
1118    if datetime_cols and coerce_timezone:
1119        if debug:
1120            dprint(f"Checking for datetime conversion: {datetime_cols}")
1121        for col in datetime_cols:
1122            if col in df.columns:
1123                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)
1124
1125    if geometry_cols:
1126        geopandas = mrsm.attempt_import('geopandas')
1127        if debug:
1128            dprint(f"Checking for geometry: {geometry_cols}")
1129        parsed_geom_cols = []
1130        for col in geometry_cols:
1131            try:
1132                df[col] = df[col].apply(attempt_cast_to_geometry)
1133                parsed_geom_cols.append(col)
1134            except Exception as e:
1135                if debug:
1136                    dprint(f"Unable to parse column '{col}' as geometry:\n{e}")
1137
1138        if parsed_geom_cols:
1139            if debug:
1140                dprint(f"Converting to GeoDataFrame (geometry column: '{parsed_geom_cols[0]}')...")
1141            df = geopandas.GeoDataFrame(df, geometry=parsed_geom_cols[0])
1142            try:
1143                df.rename_geometry(parsed_geom_cols[0], inplace=True)
1144            except ValueError:
1145                pass
1146
1147    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
1148    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1149        if debug:
1150            dprint("Data types match. Exiting enforcement...")
1151        return df
1152
1153    common_dtypes = {}
1154    common_diff_dtypes = {}
1155    for col, typ in pipe_pandas_dtypes.items():
1156        if col in df_dtypes:
1157            common_dtypes[col] = typ
1158            if not are_dtypes_equal(typ, df_dtypes[col]):
1159                common_diff_dtypes[col] = df_dtypes[col]
1160
1161    if debug:
1162        dprint("Common columns with different dtypes:")
1163        pprint(common_diff_dtypes)
1164
1165    detected_dt_cols = {}
1166    for col, typ in common_diff_dtypes.items():
1167        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
1168            df_dtypes[col] = typ
1169            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
1170    for col in detected_dt_cols:
1171        del common_diff_dtypes[col]
1172
1173    if debug:
1174        dprint("Common columns with different dtypes (after dates):")
1175        pprint(common_diff_dtypes)
1176
1177    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1178        if debug:
1179            dprint(
 1180                "The incoming DataFrame has mostly the same types, skipping enforcement. "
1181                + "The only detected difference was in the following datetime columns."
1182            )
1183            pprint(detected_dt_cols)
1184        return df
1185
1186    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
1187        previous_typ = common_dtypes[col]
1188        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
1189        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
1190        explicitly_numeric = dtypes.get(col, 'numeric').startswith('numeric')
1191        cast_to_numeric = (
1192            explicitly_numeric
1193            or col in df_numeric_cols
1194            or (mixed_numeric_types and not explicitly_float)
1195        ) and coerce_numeric
1196        if cast_to_numeric:
1197            common_dtypes[col] = attempt_cast_to_numeric
1198            common_diff_dtypes[col] = attempt_cast_to_numeric
1199
1200    for d in common_diff_dtypes:
1201        t = common_dtypes[d]
1202        if debug:
1203            dprint(f"Casting column {d} to dtype {t}.")
1204        try:
1205            df[d] = (
1206                df[d].apply(t)
1207                if callable(t)
1208                else df[d].astype(t)
1209            )
1210        except Exception as e:
1211            if debug:
1212                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
1213            if 'int' in str(t).lower():
1214                try:
1215                    df[d] = df[d].astype('float64').astype(t)
1216                except Exception:
1217                    if debug:
1218                        dprint(f"Was unable to convert to float then {t}.")
1219    return df
1220
1221
1222def get_datetime_bound_from_df(
1223    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1224    datetime_column: str,
1225    minimum: bool = True,
1226) -> Union[int, datetime, None]:
1227    """
1228    Return the minimum or maximum datetime (or integer) from a DataFrame.
1229
1230    Parameters
1231    ----------
1232    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1233        The DataFrame, list, or dict which contains the range axis.
1234
1235    datetime_column: str
1236        The name of the datetime (or int) column.
1237
 1238    minimum: bool, default True
1239        Whether to return the minimum (default) or maximum value.
1240
1241    Returns
1242    -------
1243    The minimum or maximum datetime value in the dataframe, or `None`.
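
    Examples
    --------
    A minimal sketch (illustrative; also accepts DataFrames and dicts of lists):
    ```python
    >>> docs = [{'dt': 2}, {'dt': 1}, {'dt': 3}]
    >>> get_datetime_bound_from_df(docs, 'dt')
    1
    >>> get_datetime_bound_from_df(docs, 'dt', minimum=False)
    3

    ```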
1244    """
1245    from meerschaum.utils.dtypes import to_datetime, value_is_null
1246
1247    if df is None:
1248        return None
1249    if not datetime_column:
1250        return None
1251
1252    def compare(a, b):
1253        if a is None:
1254            return b
1255        if b is None:
1256            return a
1257        if minimum:
1258            return a if a < b else b
1259        return a if a > b else b
1260
1261    if isinstance(df, list):
1262        if len(df) == 0:
1263            return None
1264        best_yet = df[0].get(datetime_column, None)
1265        for doc in df:
1266            val = doc.get(datetime_column, None)
1267            best_yet = compare(best_yet, val)
1268        return best_yet
1269
1270    if isinstance(df, dict):
1271        if datetime_column not in df:
1272            return None
1273        best_yet = df[datetime_column][0]
1274        for val in df[datetime_column]:
1275            best_yet = compare(best_yet, val)
1276        return best_yet
1277
1278    if 'DataFrame' in str(type(df)):
1279        from meerschaum.utils.dtypes import are_dtypes_equal
1280        pandas = mrsm.attempt_import('pandas')
1281        is_dask = 'dask' in df.__module__
1282
1283        if datetime_column not in df.columns:
1284            return None
1285
1286        try:
1287            dt_val = (
1288                df[datetime_column].min(skipna=True)
1289                if minimum
1290                else df[datetime_column].max(skipna=True)
1291            )
1292        except Exception:
1293            dt_val = pandas.NA
1294        if is_dask and dt_val is not None and dt_val is not pandas.NA:
1295            dt_val = dt_val.compute()
1296
1297        return (
1298            to_datetime(dt_val, as_pydatetime=True)
1299            if are_dtypes_equal(str(type(dt_val)), 'datetime')
1300            else (dt_val if not value_is_null(dt_val) else None)
1301        )
1302
1303    return None
1304
1305
1306def get_unique_index_values(
1307    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1308    indices: List[str],
1309) -> Dict[str, List[Any]]:
1310    """
1311    Return a dictionary of the unique index values in a DataFrame.
1312
1313    Parameters
1314    ----------
1315    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1316        The dataframe (or list or dict) which contains index values.
1317
1318    indices: List[str]
1319        The list of index columns.
1320
1321    Returns
1322    -------
1323    A dictionary mapping indices to unique values.
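
    Examples
    --------
    A minimal sketch (illustrative; the order of the returned values is not guaranteed):
    ```python
    >>> docs = [{'a': 1, 'b': 'x'}, {'a': 1, 'b': 'y'}, {'a': 2, 'b': 'y'}]
    >>> get_unique_index_values(docs, ['a'])
    {'a': [1, 2]}

    ```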
1324    """
1325    if df is None:
1326        return {}
1327    if 'dataframe' in str(type(df)).lower():
1328        pandas = mrsm.attempt_import('pandas')
1329        return {
1330            col: list({
1331                (val if val is not pandas.NA else None)
1332                for val in df[col].unique()
1333            })
1334            for col in indices
1335            if col in df.columns
1336        }
1337
1338    unique_indices = defaultdict(lambda: set())
1339    if isinstance(df, list):
1340        for doc in df:
1341            for index in indices:
1342                if index in doc:
1343                    unique_indices[index].add(doc[index])
1344
1345    elif isinstance(df, dict):
1346        for index in indices:
1347            if index in df:
1348                unique_indices[index] = unique_indices[index].union(set(df[index]))
1349
1350    return {key: list(val) for key, val in unique_indices.items()}
1351
1352
1353def df_is_chunk_generator(df: Any) -> bool:
1354    """
1355    Determine whether to treat `df` as a chunk generator.
1356
1357    Note this should only be used in a context where generators are expected,
1358    as it will return `True` for any iterable.
1359
1360    Parameters
1361    ----------
 1362    df: Any
        The DataFrame or chunk generator to evaluate.
1363
1364    Returns
1365    -------
1366    A `bool` indicating whether to treat `df` as a generator.
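
    Examples
    --------
    A minimal sketch (illustrative; note that any non-DataFrame iterable returns `True`):
    ```python
    >>> import pandas as pd
    >>> df_is_chunk_generator(pd.DataFrame())
    False
    >>> df_is_chunk_generator(iter([pd.DataFrame()]))
    True

    ```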
1367    """
1368    return (
1369        not isinstance(df, (dict, list, str))
1370        and 'DataFrame' not in str(type(df))
1371        and isinstance(df, (Generator, Iterable, Iterator))
1372    )
1373
1374
1375def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
1376    """
1377    Return the Dask `npartitions` value for a given `chunksize`.
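
    A minimal sketch of the mapping (illustrative; note that positive chunksizes
    are returned negated):
    ```python
    >>> chunksize_to_npartitions(None)
    1
    >>> chunksize_to_npartitions(100)
    -100

    ```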
1378    """
1379    if chunksize == -1:
1380        from meerschaum.config import get_config
1381        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
1382    if chunksize is None:
1383        return 1
1384    return -1 * chunksize
1385
1386
1387def df_from_literal(
1388    pipe: Optional[mrsm.Pipe] = None,
 1389    literal: Optional[str] = None,
1390    debug: bool = False
1391) -> 'pd.DataFrame':
1392    """
1393    Construct a dataframe from a literal value, using the pipe's datetime and value column names.
1394
1395    Parameters
1396    ----------
1397    pipe: Optional['meerschaum.Pipe'], default None
1398        The pipe which will consume the literal value.
 1399
    literal: Optional[str], default None
        The literal value to parse via `ast.literal_eval`; strings which fail to parse
        are kept as-is.

    debug: bool, default False
        Verbosity toggle.

1400    Returns
1401    -------
 1402    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
 1403    and the literal as the value.
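
    Examples
    --------
    A minimal sketch (illustrative; assumes an in-memory pipe with its
    `datetime` and `value` columns defined):
    ```python
    >>> import meerschaum as mrsm
    >>> pipe = mrsm.Pipe('demo', 'literal', columns={'datetime': 'dt', 'value': 'val'})
    >>> df = df_from_literal(pipe, '42')
    >>> list(df.columns)
    ['dt', 'val']
    >>> df['val'][0]
    42

    ```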
1404    """
1405    from meerschaum.utils.packages import import_pandas
1406    from meerschaum.utils.warnings import error, warn
1407    from meerschaum.utils.debug import dprint
1408
1409    if pipe is None or literal is None:
1410        error("Please provide a Pipe and a literal value")
1411    ### this will raise an error if the columns are undefined
1412    dt_name, val_name = pipe.get_columns('datetime', 'value')
1413
1414    val = literal
1415    if isinstance(literal, str):
1416        if debug:
1417            dprint(f"Received literal string: '{literal}'")
1418        import ast
1419        try:
1420            val = ast.literal_eval(literal)
1421        except Exception:
1422            warn(
 1423                f"Failed to parse value from string:\n{literal}"
 1424                + "\n\nWill cast as a string instead."
1425            )
1426            val = literal
1427
1428    from datetime import datetime, timezone
1429    now = datetime.now(timezone.utc).replace(tzinfo=None)
1430
1431    pd = import_pandas()
1432    return pd.DataFrame({dt_name: [now], val_name: [val]})
1433
1434
1435def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
1436    """
1437    Return the first valid Dask DataFrame partition (if possible).
1438    """
1439    pdf = None
1440    for partition in ddf.partitions:
1441        try:
1442            pdf = partition.compute()
1443        except Exception:
1444            continue
1445        if len(pdf) > 0:
1446            return pdf
1447    _ = mrsm.attempt_import('partd', lazy=False)
1448    return ddf.compute()
1449
1450
1451def query_df(
1452    df: 'pd.DataFrame',
1453    params: Optional[Dict[str, Any]] = None,
1454    begin: Union[datetime, int, None] = None,
1455    end: Union[datetime, int, None] = None,
1456    datetime_column: Optional[str] = None,
1457    select_columns: Optional[List[str]] = None,
1458    omit_columns: Optional[List[str]] = None,
1459    inplace: bool = False,
1460    reset_index: bool = False,
1461    coerce_types: bool = False,
1462    debug: bool = False,
1463) -> 'pd.DataFrame':
1464    """
1465    Query the dataframe with the params dictionary.
1466
1467    Parameters
1468    ----------
1469    df: pd.DataFrame
1470        The DataFrame to query against.
1471
1472    params: Optional[Dict[str, Any]], default None
1473        The parameters dictionary to use for the query.
1474
1475    begin: Union[datetime, int, None], default None
1476        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1477        greater than or equal to this value.
1478
1479    end: Union[datetime, int, None], default None
1480        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1481        less than this value.
1482
1483    datetime_column: Optional[str], default None
1484        A `datetime_column` must be provided to use `begin` and `end`.
1485
1486    select_columns: Optional[List[str]], default None
1487        If provided, only return these columns.
1488
1489    omit_columns: Optional[List[str]], default None
1490        If provided, do not include these columns in the result.
1491
1492    inplace: bool, default False
1493        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
1494
1495    reset_index: bool, default False
1496        If `True`, reset the index in the resulting DataFrame.
1497
1498    coerce_types: bool, default False
1499        If `True`, cast the dataframe and parameters as strings before querying.
 1500
    debug: bool, default False
        Verbosity toggle.

1501    Returns
1502    -------
1503    A Pandas DataFrame query result.
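
    Examples
    --------
    A minimal sketch (illustrative; not from the original source):
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
    >>> query_df(df, {'a': 2}, reset_index=True)
       a  b
    0  2  y

    ```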
1504    """
1505
1506    def _process_select_columns(_df):
1507        if not select_columns:
1508            return
1509        for col in list(_df.columns):
1510            if col not in select_columns:
1511                del _df[col]
1512
1513    def _process_omit_columns(_df):
1514        if not omit_columns:
1515            return
1516        for col in list(_df.columns):
1517            if col in omit_columns:
1518                del _df[col]
1519
1520    if not params and not begin and not end:
1521        if not inplace:
1522            df = df.copy()
1523        _process_select_columns(df)
1524        _process_omit_columns(df)
1525        return df
1526
1527    from meerschaum.utils.debug import dprint
1528    from meerschaum.utils.misc import get_in_ex_params
1529    from meerschaum.utils.warnings import warn
1530    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
1531    dateutil_parser = mrsm.attempt_import('dateutil.parser')
1532    pandas = mrsm.attempt_import('pandas')
1533    NA = pandas.NA
1534
1535    if params:
1536        params = params.copy()
1537        for key, val in {k: v for k, v in params.items()}.items():
1538            if isinstance(val, (list, tuple)):
1539                if None in val:
1540                    val = [item for item in val if item is not None] + [NA]
1541                    params[key] = val
1542                if coerce_types:
1543                    params[key] = [str(x) for x in val]
1544            else:
1545                if value_is_null(val):
1546                    val = NA
1547                    params[key] = NA
1548                if coerce_types:
1549                    params[key] = str(val)
1550
1551    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
1552
1553    if inplace:
1554        df.fillna(NA, inplace=True)
1555    else:
1556        df = df.infer_objects().fillna(NA)
1557
1558    if isinstance(begin, str):
1559        begin = dateutil_parser.parse(begin)
1560    if isinstance(end, str):
1561        end = dateutil_parser.parse(end)
1562
1563    if begin is not None or end is not None:
1564        if not datetime_column or datetime_column not in df.columns:
1565            warn(
1566                f"The datetime column '{datetime_column}' is not present in the Dataframe, "
1567                + "ignoring begin and end...",
1568            )
1569            begin, end = None, None
1570
1571    if debug:
1572        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")
1573
1574    if datetime_column and (begin is not None or end is not None):
1575        if debug:
 1576            dprint("Checking for datetime column compatibility.")
1577
1578        from meerschaum.utils.dtypes import coerce_timezone
1579        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
1580        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
1581        end_is_int = are_dtypes_equal(str(type(end)), 'int')
1582
1583        if df_is_dt:
1584            df_tz = (
1585                getattr(df[datetime_column].dt, 'tz', None)
1586                if hasattr(df[datetime_column], 'dt')
1587                else None
1588            )
1589
1590            if begin_is_int:
1591                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
1592                if debug:
1593                    dprint(f"`begin` will be cast to '{begin}'.")
1594            if end_is_int:
1595                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
1596                if debug:
1597                    dprint(f"`end` will be cast to '{end}'.")
1598
1599            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
 1600            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None
1601
1602    in_ex_params = get_in_ex_params(params)
1603
1604    masks = [
1605        (
1606            (df[datetime_column] >= begin)
1607            if begin is not None and datetime_column
1608            else True
1609        ) & (
1610            (df[datetime_column] < end)
1611            if end is not None and datetime_column
1612            else True
1613        )
1614    ]
1615
1616    masks.extend([
1617        (
1618            (
1619                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
1620                if in_vals
1621                else True
1622            ) & (
1623                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
1624                if ex_vals
1625                else True
1626            )
1627        )
1628        for col, (in_vals, ex_vals) in in_ex_params.items()
1629        if col in df.columns
1630    ])
1631    query_mask = masks[0]
1632    for mask in masks[1:]:
1633        query_mask = query_mask & mask
1634
1635    original_cols = df.columns
1636
1637    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
1638    ###       to allow for `<NA>` values.
1639    bool_cols = [
1640        col
1641        for col, typ in df.dtypes.items()
1642        if are_dtypes_equal(str(typ), 'bool')
1643    ]
1644    for col in bool_cols:
1645        df[col] = df[col].astype('boolean[pyarrow]')
1646
1647    if not isinstance(query_mask, bool):
1648        df['__mrsm_mask'] = (
1649            query_mask.astype('boolean[pyarrow]')
1650            if hasattr(query_mask, 'astype')
1651            else query_mask
1652        )
1653
1654        if inplace:
1655            df.where(query_mask, other=NA, inplace=True)
1656            df.dropna(how='all', inplace=True)
1657            result_df = df
1658        else:
1659            result_df = df.where(query_mask, other=NA)
1660            result_df.dropna(how='all', inplace=True)
1661
1662    else:
1663        result_df = df
1664
1665    if '__mrsm_mask' in df.columns:
1666        del df['__mrsm_mask']
1667    if '__mrsm_mask' in result_df.columns:
1668        del result_df['__mrsm_mask']
1669
1670    if reset_index:
1671        result_df.reset_index(drop=True, inplace=True)
1672
1673    result_df = enforce_dtypes(
1674        result_df,
1675        dtypes,
1676        safe_copy=False,
1677        debug=debug,
1678        coerce_numeric=False,
1679        coerce_timezone=False,
1680    )
1681
1682    if select_columns == ['*']:
1683        select_columns = None
1684
1685    if not select_columns and not omit_columns:
1686        return result_df[original_cols]
1687
1688    _process_select_columns(result_df)
1689    _process_omit_columns(result_df)
1690
1691    return result_df
1692
1693
1694def to_json(
1695    df: 'pd.DataFrame',
1696    safe_copy: bool = True,
1697    orient: str = 'records',
1698    date_format: str = 'iso',
1699    date_unit: str = 'us',
1700    double_precision: int = 15,
1701    geometry_format: str = 'geojson',
1702    **kwargs: Any
1703) -> str:
1704    """
1705    Serialize the given dataframe as a JSON string.
1706
1707    Parameters
1708    ----------
1709    df: pd.DataFrame
1710        The DataFrame to be serialized.
1711
1712    safe_copy: bool, default True
1713        If `False`, modify the DataFrame inplace.
1714
1715    date_format: str, default 'iso'
1716        The default format for timestamps.
1717
1718    date_unit: str, default 'us'
1719        The precision of the timestamps.
1720
1721    double_precision: int, default 15
1722        The number of decimal places to use when encoding floating point values (maximum 15).
1723
1724    geometry_format: str, default 'geojson'
1725        The serialization format for geometry data.
1726        Accepted values are `geojson`, `wkb_hex`, and `wkt`.
1727
1728    Returns
1729    -------
1730    A JSON string.
1731    """
1732    import warnings
1733    import functools
1734    from meerschaum.utils.packages import import_pandas
1735    from meerschaum.utils.dtypes import (
1736        serialize_bytes,
1737        serialize_decimal,
1738        serialize_geometry,
1739    )
1740    pd = import_pandas()
1741    uuid_cols = get_uuid_cols(df)
1742    bytes_cols = get_bytes_cols(df)
1743    numeric_cols = get_numeric_cols(df)
1744    geometry_cols = get_geometry_cols(df)
1745    if safe_copy and bool(uuid_cols or bytes_cols or geometry_cols or numeric_cols):
1746        df = df.copy()
1747    for col in uuid_cols:
1748        df[col] = df[col].astype(str)
1749    for col in bytes_cols:
1750        df[col] = df[col].apply(serialize_bytes)
1751    for col in numeric_cols:
1752        df[col] = df[col].apply(serialize_decimal)
1753    with warnings.catch_warnings():
1754        warnings.simplefilter("ignore")
1755        for col in geometry_cols:
1756            df[col] = df[col].apply(
1757                functools.partial(
1758                    serialize_geometry,
1759                    geometry_format=geometry_format,
1760                )
1761            )
1762    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
1763        date_format=date_format,
1764        date_unit=date_unit,
1765        double_precision=double_precision,
1766        orient=orient,
1767        **kwargs
1768    )
def add_missing_cols_to_df( df: pandas.core.frame.DataFrame, dtypes: Dict[str, Any]) -> pandas.core.frame.DataFrame:
26def add_missing_cols_to_df(
27    df: 'pd.DataFrame',
28    dtypes: Dict[str, Any],
29) -> 'pd.DataFrame':
30    """
31    Add columns from the dtypes dictionary as null columns to a new DataFrame.
32
33    Parameters
34    ----------
35    df: pd.DataFrame
36        The dataframe we should copy and add null columns.
37
38    dtypes:
39        The data types dictionary which may contain keys not present in `df.columns`.
40
41    Returns
42    -------
43    A new `DataFrame` with the keys from `dtypes` added as null columns.
44    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
45    NOTE: This will not ensure that dtypes are enforced!
46
47    Examples
48    --------
49    >>> import pandas as pd
50    >>> df = pd.DataFrame([{'a': 1}])
51    >>> dtypes = {'b': 'Int64'}
52    >>> add_missing_cols_to_df(df, dtypes)
53          a  b
54       0  1  <NA>
55    >>> add_missing_cols_to_df(df, dtypes).dtypes
56    a    int64
57    b    Int64
58    dtype: object
59    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
60    a    int64
61    dtype: object
62    >>> 
63    """
64    if set(df.columns) == set(dtypes):
65        return df
66
67    from meerschaum.utils.packages import attempt_import
68    from meerschaum.utils.dtypes import to_pandas_dtype
69    pandas = attempt_import('pandas')
70
71    def build_series(dtype: str):
72        return pandas.Series([], dtype=to_pandas_dtype(dtype))
73
74    assign_kwargs = {
75        str(col): build_series(str(typ))
76        for col, typ in dtypes.items()
77        if col not in df.columns
78    }
79    df_with_cols = df.assign(**assign_kwargs)
80    for col in assign_kwargs:
81        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
82    return df_with_cols

Add columns from the dtypes dictionary as null columns to a new DataFrame.

Parameters
  • df (pd.DataFrame): The dataframe we should copy and add null columns.
  • dtypes: The data types dictionary which may contain keys not present in df.columns.
Returns
  • A new DataFrame with the keys from dtypes added as null columns.
  • If df.dtypes is the same as dtypes, then return a reference to df.
  • NOTE: This will not ensure that dtypes are enforced!
Examples
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1}])
>>> dtypes = {'b': 'Int64'}
>>> add_missing_cols_to_df(df, dtypes)
      a  b
   0  1  <NA>
>>> add_missing_cols_to_df(df, dtypes).dtypes
a    int64
b    Int64
dtype: object
>>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
a    int64
dtype: object
>>>
def filter_unseen_df( old_df: pandas.core.frame.DataFrame, new_df: pandas.core.frame.DataFrame, safe_copy: bool = True, dtypes: Optional[Dict[str, Any]] = None, include_unchanged_columns: bool = False, coerce_mixed_numerics: bool = True, debug: bool = False) -> pandas.core.frame.DataFrame:
 85def filter_unseen_df(
 86    old_df: 'pd.DataFrame',
 87    new_df: 'pd.DataFrame',
 88    safe_copy: bool = True,
 89    dtypes: Optional[Dict[str, Any]] = None,
 90    include_unchanged_columns: bool = False,
 91    coerce_mixed_numerics: bool = True,
 92    debug: bool = False,
 93) -> 'pd.DataFrame':
 94    """
 95    Left join two DataFrames to find the newest unseen data.
 96
 97    Parameters
 98    ----------
 99    old_df: 'pd.DataFrame'
100        The original (target) dataframe. Acts as a filter on the `new_df`.
101
102    new_df: 'pd.DataFrame'
103        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
104
105    safe_copy: bool, default True
106        If `True`, create a copy before comparing and modifying the dataframes.
107        Setting to `False` may mutate the DataFrames.
108
109    dtypes: Optional[Dict[str, Any]], default None
110        Optionally specify the datatypes of the dataframe.
111
112    include_unchanged_columns: bool, default False
113        If `True`, include columns which haven't changed on rows which have changed.
114
115    coerce_mixed_numerics: bool, default True
116        If `True`, cast mixed integer and float columns between the old and new dataframes into
117        numeric values (`decimal.Decimal`).
118
119    debug: bool, default False
120        Verbosity toggle.
121
122    Returns
123    -------
124    A pandas dataframe of the new, unseen rows in `new_df`.
125
126    Examples
127    --------
128    ```python
129    >>> import pandas as pd
130    >>> df1 = pd.DataFrame({'a': [1,2]})
131    >>> df2 = pd.DataFrame({'a': [2,3]})
132    >>> filter_unseen_df(df1, df2)
133       a
134    0  3
135
136    ```
137
138    """
139    if old_df is None:
140        return new_df
141
142    if safe_copy:
143        old_df = old_df.copy()
144        new_df = new_df.copy()
145
146    import json
147    import functools
148    import traceback
149    from meerschaum.utils.warnings import warn
150    from meerschaum.utils.packages import import_pandas, attempt_import
151    from meerschaum.utils.dtypes import (
152        to_pandas_dtype,
153        are_dtypes_equal,
154        attempt_cast_to_numeric,
155        attempt_cast_to_uuid,
156        attempt_cast_to_bytes,
157        attempt_cast_to_geometry,
158        coerce_timezone,
159        serialize_decimal,
160    )
161    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
162    pd = import_pandas(debug=debug)
163    is_dask = 'dask' in new_df.__module__
164    if is_dask:
165        pandas = attempt_import('pandas')
166        _ = attempt_import('partd', lazy=False)
167        dd = attempt_import('dask.dataframe')
168        merge = dd.merge
169        NA = pandas.NA
170    else:
171        merge = pd.merge
172        NA = pd.NA
173
174    new_df_dtypes = dict(new_df.dtypes)
175    old_df_dtypes = dict(old_df.dtypes)
176
177    same_cols = set(new_df.columns) == set(old_df.columns)
178    if not same_cols:
179        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
180        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)
181
182        new_types_missing_from_old = {
183            col: typ
184            for col, typ in new_df_dtypes.items()
185            if col not in old_df_dtypes
186        }
187        old_types_missing_from_new = {
188            col: typ
189            for col, typ in old_df_dtypes.items()
190            if col not in new_df_dtypes
191        }
192        old_df_dtypes.update(new_types_missing_from_old)
193        new_df_dtypes.update(old_types_missing_from_new)
194
195    ### Edge case: two empty lists cast to DFs.
196    elif len(new_df.columns) == 0:
197        return new_df
198
199    try:
200        ### Order matters when checking equality.
201        new_df = new_df[old_df.columns]
202
203    except Exception as e:
204        warn(
205            "Was not able to cast old columns onto new DataFrame. " +
206            f"Are both DataFrames the same shape? Error:\n{e}",
207            stacklevel=3,
208        )
209        return new_df[list(new_df_dtypes.keys())]
210
211    ### assume the old_df knows what it's doing, even if it's technically wrong.
212    if dtypes is None:
213        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}
214
215    dtypes = {
216        col: to_pandas_dtype(typ)
217        for col, typ in dtypes.items()
218        if col in new_df_dtypes and col in old_df_dtypes
219    }
220    for col, typ in new_df_dtypes.items():
221        if col not in dtypes:
222            dtypes[col] = typ
223
224    numeric_cols_precisions_scales = {
225        col: get_numeric_precision_scale(None, typ)
226        for col, typ in dtypes.items()
227        if col and str(typ).lower().startswith('numeric')
228    }
229
230    dt_dtypes = {
231        col: typ
232        for col, typ in dtypes.items()
233        if are_dtypes_equal(typ, 'datetime')
234    }
235    non_dt_dtypes = {
236        col: typ
237        for col, typ in dtypes.items()
238        if col not in dt_dtypes
239    }
240
241    cast_non_dt_cols = True
242    try:
243        new_df = new_df.astype(non_dt_dtypes)
244        cast_non_dt_cols = False
245    except Exception as e:
246        warn(
247            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
248        )
249
250    cast_dt_cols = True
251    try:
252        for col, typ in dt_dtypes.items():
253            strip_utc = (
254                (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
255            )
256            if col in old_df.columns:
257                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
258            if col in new_df.columns:
259                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
260        cast_dt_cols = False
261    except Exception as e:
262        warn(f"Could not cast datetime columns:\n{e}")
263
264    cast_cols = cast_dt_cols or cast_non_dt_cols
265
266    new_numeric_cols_existing = get_numeric_cols(new_df)
267    old_numeric_cols = get_numeric_cols(old_df)
268    for col, typ in {k: v for k, v in dtypes.items()}.items():
269        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
270            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
271            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
272            new_is_numeric = col in new_numeric_cols_existing
273            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
274            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
275            old_is_numeric = col in old_numeric_cols
276
277            if (
278                coerce_mixed_numerics
279                and
280                (new_is_float or new_is_int or new_is_numeric)
281                and
282                (old_is_float or old_is_int or old_is_numeric)
283            ):
284                dtypes[col] = attempt_cast_to_numeric
285                cast_cols = True
286                continue
287
288            ### Fallback to object if the types don't match.
289            warn(
290                f"Detected different types for '{col}' "
291                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
292                + "falling back to 'object'..."
293            )
294            dtypes[col] = 'object'
295            cast_cols = True
296
297    if cast_cols:
298        for col, dtype in dtypes.items():
299            if col in new_df.columns:
300                try:
301                    new_df[col] = (
302                        new_df[col].astype(dtype)
303                        if not callable(dtype)
304                        else new_df[col].apply(dtype)
305                    )
306                except Exception as e:
307                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")
308
309    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
310    new_json_cols = get_json_cols(new_df)
311    old_json_cols = get_json_cols(old_df)
312    json_cols = set(new_json_cols + old_json_cols)
313    for json_col in old_json_cols:
314        old_df[json_col] = old_df[json_col].apply(serializer)
315    for json_col in new_json_cols:
316        new_df[json_col] = new_df[json_col].apply(serializer)
317
318    new_numeric_cols = get_numeric_cols(new_df)
319    numeric_cols = set(new_numeric_cols + old_numeric_cols)
320    for numeric_col in old_numeric_cols:
321        old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
322    for numeric_col in new_numeric_cols:
323        new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)
324
325    old_dt_cols = [
326        col
327        for col, typ in old_df.dtypes.items()
328        if are_dtypes_equal(str(typ), 'datetime')
329    ]
330    for col in old_dt_cols:
331        strip_utc = (
332            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
333        )
334        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
335
336    new_dt_cols = [
337        col
338        for col, typ in new_df.dtypes.items()
339        if are_dtypes_equal(str(typ), 'datetime')
340    ]
341    for col in new_dt_cols:
342        strip_utc = (
343            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
344        )
345        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
346
347    old_uuid_cols = get_uuid_cols(old_df)
348    new_uuid_cols = get_uuid_cols(new_df)
349    uuid_cols = set(new_uuid_cols + old_uuid_cols)
350
351    old_bytes_cols = get_bytes_cols(old_df)
352    new_bytes_cols = get_bytes_cols(new_df)
353    bytes_cols = set(new_bytes_cols + old_bytes_cols)
354
355    old_geometry_cols = get_geometry_cols(old_df)
356    new_geometry_cols = get_geometry_cols(new_df)
357    geometry_cols = set(new_geometry_cols + old_geometry_cols)
358
359    joined_df = merge(
360        new_df.infer_objects(copy=False).fillna(NA),
361        old_df.infer_objects(copy=False).fillna(NA),
362        how='left',
363        on=None,
364        indicator=True,
365    )
366    changed_rows_mask = (joined_df['_merge'] == 'left_only')
367    new_cols = list(new_df_dtypes)
368    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)
369
370    for json_col in json_cols:
371        if json_col not in delta_df.columns:
372            continue
373        try:
374            delta_df[json_col] = delta_df[json_col].apply(json.loads)
375        except Exception:
376            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")
377
378    for numeric_col in numeric_cols:
379        if numeric_col not in delta_df.columns:
380            continue
381        try:
382            delta_df[numeric_col] = delta_df[numeric_col].apply(
383                functools.partial(
384                    attempt_cast_to_numeric,
385                    quantize=True,
386                    precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
387                    scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
388                )
389            )
390        except Exception:
391            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")
392
393    for uuid_col in uuid_cols:
394        if uuid_col not in delta_df.columns:
395            continue
396        try:
397            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
398        except Exception:
399            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")
400
401    for bytes_col in bytes_cols:
402        if bytes_col not in delta_df.columns:
403            continue
404        try:
405            delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
406        except Exception:
407            warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")
408
409    for geometry_col in geometry_cols:
410        if geometry_col not in delta_df.columns:
411            continue
412        try:
413            delta_df[geometry_col] = delta_df[geometry_col].apply(attempt_cast_to_geometry)
414        except Exception:
415            warn(f"Unable to parse geometry column '{geometry_col}':\n{traceback.format_exc()}")
416
417    return delta_df

Left join two DataFrames to find the newest unseen data.

Parameters
  • old_df ('pd.DataFrame'): The original (target) dataframe. Acts as a filter on the new_df.
  • new_df ('pd.DataFrame'): The fetched (source) dataframe. Rows that are contained in old_df are removed.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames.
  • dtypes (Optional[Dict[str, Any]], default None): Optionally specify the datatypes of the dataframe.
  • include_unchanged_columns (bool, default False): If True, include columns which haven't changed on rows which have changed.
  • coerce_mixed_numerics (bool, default True): If True, cast mixed integer and float columns between the old and new dataframes into numeric values (decimal.Decimal).
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pandas dataframe of the new, unseen rows in new_df.
Examples
>>> import pandas as pd
>>> df1 = pd.DataFrame({'a': [1,2]})
>>> df2 = pd.DataFrame({'a': [2,3]})
>>> filter_unseen_df(df1, df2)
   a
0  3
def parse_df_datetimes( df: pandas.core.frame.DataFrame, ignore_cols: Optional[Iterable[str]] = None, strip_timezone: bool = False, chunksize: Optional[int] = None, dtype_backend: str = 'numpy_nullable', ignore_all: bool = False, debug: bool = False) -> pandas.core.frame.DataFrame:
420def parse_df_datetimes(
421    df: 'pd.DataFrame',
422    ignore_cols: Optional[Iterable[str]] = None,
423    strip_timezone: bool = False,
424    chunksize: Optional[int] = None,
425    dtype_backend: str = 'numpy_nullable',
426    ignore_all: bool = False,
427    debug: bool = False,
428) -> 'pd.DataFrame':
429    """
430    Parse a pandas DataFrame for datetime columns and cast as datetimes.
431
432    Parameters
433    ----------
434    df: pd.DataFrame
435        The pandas DataFrame to parse.
436
437    ignore_cols: Optional[Iterable[str]], default None
438        If provided, do not attempt to coerce these columns as datetimes.
439
440    strip_timezone: bool, default False
441        If `True`, remove the UTC `tzinfo` property.
442
443    chunksize: Optional[int], default None
444        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
445
446    dtype_backend: str, default 'numpy_nullable'
447        If `df` is not a DataFrame and new one needs to be constructed,
448        use this as the datatypes backend.
449        Accepted values are 'numpy_nullable' and 'pyarrow'.
450
451    ignore_all: bool, default False
452        If `True`, do not attempt to cast any columns to datetimes.
453
454    debug: bool, default False
455        Verbosity toggle.
456
457    Returns
458    -------
459    A new pandas DataFrame with the determined datetime columns
460    (usually ISO strings) cast as datetimes.
461
462    Examples
463    --------
464    ```python
465    >>> import pandas as pd
466    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
467    >>> df.dtypes
468    a    object
469    dtype: object
470    >>> df = parse_df_datetimes(df)
471    >>> df.dtypes
472    a    datetime64[ns]
473    dtype: object
474
475    ```
476
477    """
478    from meerschaum.utils.packages import import_pandas, attempt_import
479    from meerschaum.utils.debug import dprint
480    from meerschaum.utils.warnings import warn
481    from meerschaum.utils.misc import items_str
482    from meerschaum.utils.dtypes import to_datetime
483    import traceback
484    pd = import_pandas()
485    pandas = attempt_import('pandas')
486    pd_name = pd.__name__
487    using_dask = 'dask' in pd_name
488    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
489    dask_dataframe = None
490    if using_dask or df_is_dask:
491        npartitions = chunksize_to_npartitions(chunksize)
492        dask_dataframe = attempt_import('dask.dataframe')
493
494    ### if df is a dict, build DataFrame
495    if isinstance(df, pandas.DataFrame):
496        pdf = df
497    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
498        pdf = get_first_valid_dask_partition(df)
499    else:
500        if debug:
501            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")
502
503        if using_dask:
504            if isinstance(df, list):
505                keys = set()
506                for doc in df:
507                    for key in doc:
508                        keys.add(key)
509                df = pd.DataFrame.from_dict(
510                    {
511                        k: [
512                            doc.get(k, None)
513                            for doc in df
514                        ] for k in keys
515                    },
516                    npartitions=npartitions,
517                )
518            elif isinstance(df, dict):
519                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
520            elif 'pandas.core.frame.DataFrame' in str(type(df)):
521                df = pd.from_pandas(df, npartitions=npartitions)
522            else:
523                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
524            pandas = attempt_import('pandas')
525            pdf = get_first_valid_dask_partition(df)
526
527        else:
528            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
529            pdf = df
530
531    ### skip parsing if DataFrame is empty
532    if len(pdf) == 0:
533        if debug:
534            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
535        return df
536
537    ignore_cols = set(
538        (ignore_cols or []) + [
539            col
540            for col, dtype in pdf.dtypes.items() 
541            if 'datetime' in str(dtype)
542        ]
543    )
544    cols_to_inspect = [
545        col
546        for col in pdf.columns
547        if col not in ignore_cols
548    ] if not ignore_all else []
549
550    if len(cols_to_inspect) == 0:
551        if debug:
552            dprint("All columns are ignored, skipping datetime detection...")
553        return df.infer_objects(copy=False).fillna(pandas.NA)
554
555    ### apply regex to columns to determine which are ISO datetimes
556    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
557    dt_mask = pdf[cols_to_inspect].astype(str).apply(
558        lambda s: s.str.match(iso_dt_regex).all()
559    )
560
561    ### list of datetime column names
562    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
563    if not datetime_cols:
564        if debug:
565            dprint("No columns detected as datetimes, returning...")
566        return df.infer_objects(copy=False).fillna(pandas.NA)
567
568    if debug:
569        dprint("Converting columns to datetimes: " + str(datetime_cols))
570
571    try:
572        if not using_dask:
573            df[datetime_cols] = df[datetime_cols].apply(to_datetime)
574        else:
575            df[datetime_cols] = df[datetime_cols].apply(
576                to_datetime,
577                utc=True,
578                axis=1,
579                meta={
580                    col: 'datetime64[ns, UTC]'
581                    for col in datetime_cols
582                }
583            )
584    except Exception:
585        warn(
586            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
587            + f"{traceback.format_exc()}"
588        )
589
590    if strip_timezone:
591        for dt in datetime_cols:
592            try:
593                df[dt] = df[dt].dt.tz_localize(None)
594            except Exception:
595                warn(
596                    f"Unable to convert column '{dt}' to naive datetime:\n"
597                    + f"{traceback.format_exc()}"
598                )
599
600    return df.fillna(pandas.NA)

Parse a pandas DataFrame for datetime columns and cast as datetimes.

Parameters
  • df (pd.DataFrame): The pandas DataFrame to parse.
  • ignore_cols (Optional[Iterable[str]], default None): If provided, do not attempt to coerce these columns as datetimes.
  • strip_timezone (bool, default False): If True, remove the UTC tzinfo property.
  • chunksize (Optional[int], default None): If the pandas implementation is 'dask', use this chunksize for the distributed dataframe.
  • dtype_backend (str, default 'numpy_nullable'): If df is not a DataFrame and new one needs to be constructed, use this as the datatypes backend. Accepted values are 'numpy_nullable' and 'pyarrow'.
  • ignore_all (bool, default False): If True, do not attempt to cast any columns to datetimes.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A new pandas DataFrame with the determined datetime columns (usually ISO strings) cast as datetimes.
Examples
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
>>> df.dtypes
a    object
dtype: object
>>> df = parse_df_datetimes(df)
>>> df.dtypes
a    datetime64[ns]
dtype: object
def get_unhashable_cols(df: pandas.core.frame.DataFrame) -> List[str]:
603def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
604    """
605    Get the columns which contain unhashable objects from a Pandas DataFrame.
606
607    Parameters
608    ----------
609    df: pd.DataFrame
610        The DataFrame which may contain unhashable objects.
611
612    Returns
613    -------
614    A list of columns.
615    """
616    if df is None:
617        return []
618    if len(df) == 0:
619        return []
620
621    is_dask = 'dask' in df.__module__
622    if is_dask:
623        from meerschaum.utils.packages import attempt_import
624        pandas = attempt_import('pandas')
625        df = pandas.DataFrame(get_first_valid_dask_partition(df))
626    return [
627        col for col, val in df.iloc[0].items()
628        if not isinstance(val, Hashable)
629    ]

Get the columns which contain unhashable objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
  • A list of columns.
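A brief illustrative sketch (hypothetical data, not from the original docstring): values such as dicts and lists are unhashable, so their columns are flagged.

```python
>>> import pandas as pd
>>> from meerschaum.utils.dataframe import get_unhashable_cols
>>> df = pd.DataFrame([{'a': 1, 'b': {'x': 1}, 'c': [1, 2]}])
>>> get_unhashable_cols(df)  # 'a' holds a hashable scalar.
['b', 'c']
```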
def get_json_cols(df: pandas.core.frame.DataFrame) -> List[str]:
632def get_json_cols(df: 'pd.DataFrame') -> List[str]:
633    """
634    Get the columns which contain unhashable objects (to be serialized as JSON) from a Pandas DataFrame.
635
636    Parameters
637    ----------
638    df: pd.DataFrame
639        The DataFrame which may contain unhashable objects.
640
641    Returns
642    -------
643    A list of columns to be encoded as JSON.
644    """
645    if df is None:
646        return []
647
648    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
649    if is_dask:
650        df = get_first_valid_dask_partition(df)
651
652    if len(df) == 0:
653        return []
654
655    cols_indices = {
656        col: df[col].first_valid_index()
657        for col in df.columns
658    }
659    return [
660        col
661        for col, ix in cols_indices.items()
662        if (
663            ix is not None
664            and
665            not isinstance(df.loc[ix][col], Hashable)
666        )
667    ]

Get the columns which contain unhashable objects (to be serialized as JSON) from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
  • A list of columns to be encoded as JSON.
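A brief illustrative sketch (hypothetical data): columns holding dicts or lists are flagged for JSON encoding, while hashable scalars are not.

```python
>>> import pandas as pd
>>> from meerschaum.utils.dataframe import get_json_cols
>>> df = pd.DataFrame({'meta': [{'k': 'v'}], 'num': [1]})
>>> get_json_cols(df)
['meta']
```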
def get_numeric_cols(df: pandas.core.frame.DataFrame) -> List[str]:
670def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
671    """
672    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.
673
674    Parameters
675    ----------
676    df: pd.DataFrame
677        The DataFrame which may contain decimal objects.
678
679    Returns
680    -------
681    A list of columns to treat as numerics.
682    """
683    if df is None:
684        return []
685    from decimal import Decimal
686    is_dask = 'dask' in df.__module__
687    if is_dask:
688        df = get_first_valid_dask_partition(df)
689
690    if len(df) == 0:
691        return []
692
693    cols_indices = {
694        col: df[col].first_valid_index()
695        for col in df.columns
696    }
697    return [
698        col
699        for col, ix in cols_indices.items()
700        if (
701            ix is not None
702            and
703            isinstance(df.loc[ix][col], Decimal)
704        )
705    ]

Get the columns which contain decimal.Decimal objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain decimal objects.
Returns
  • A list of columns to treat as numerics.
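A brief illustrative sketch (hypothetical data): only columns whose first valid value is a `decimal.Decimal` are returned.

```python
>>> from decimal import Decimal
>>> import pandas as pd
>>> from meerschaum.utils.dataframe import get_numeric_cols
>>> df = pd.DataFrame({'price': [Decimal('1.50')], 'qty': [2]})
>>> get_numeric_cols(df)
['price']
```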
def get_uuid_cols(df: pandas.core.frame.DataFrame) -> List[str]:
708def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
709    """
710    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.
711
712    Parameters
713    ----------
714    df: pd.DataFrame
715        The DataFrame which may contain UUID objects.
716
717    Returns
718    -------
719    A list of columns to treat as UUIDs.
720    """
721    if df is None:
722        return []
723    from uuid import UUID
724    is_dask = 'dask' in df.__module__
725    if is_dask:
726        df = get_first_valid_dask_partition(df)
727
728    if len(df) == 0:
729        return []
730
731    cols_indices = {
732        col: df[col].first_valid_index()
733        for col in df.columns
734    }
735    return [
736        col
737        for col, ix in cols_indices.items()
738        if (
739            ix is not None
740            and
741            isinstance(df.loc[ix][col], UUID)
742        )
743    ]

Get the columns which contain uuid.UUID objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain UUID objects.
Returns
  • A list of columns to treat as UUIDs.
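A brief illustrative sketch (hypothetical data) of UUID detection:

```python
>>> import uuid
>>> import pandas as pd
>>> from meerschaum.utils.dataframe import get_uuid_cols
>>> df = pd.DataFrame({'id': [uuid.uuid4()], 'value': [1.0]})
>>> get_uuid_cols(df)
['id']
```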
def get_datetime_cols( df: pandas.core.frame.DataFrame, timezone_aware: bool = True, timezone_naive: bool = True) -> List[str]:
746def get_datetime_cols(
747    df: 'pd.DataFrame',
748    timezone_aware: bool = True,
749    timezone_naive: bool = True,
750) -> List[str]:
751    """
752    Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame.
753
754    Parameters
755    ----------
756    df: pd.DataFrame
757        The DataFrame which may contain `datetime` or `Timestamp` objects.
758
759    timezone_aware: bool, default True
760        If `True`, include timezone-aware datetime columns.
761
762    timezone_naive: bool, default True
763        If `True`, include timezone-naive datetime columns.
764
765    Returns
766    -------
767    A list of columns to treat as datetimes.
768    """
769    if not timezone_aware and not timezone_naive:
770        raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.")
771
772    if df is None:
773        return []
774
775    from datetime import datetime
776    from meerschaum.utils.dtypes import are_dtypes_equal
777    is_dask = 'dask' in df.__module__
778    if is_dask:
779        df = get_first_valid_dask_partition(df)
780
781    known_dt_cols = [
782        col
783        for col, typ in df.dtypes.items()
784        if are_dtypes_equal('datetime', str(typ))
785    ]
786
787    if len(df) == 0:
788        return known_dt_cols
789
790    cols_indices = {
791        col: df[col].first_valid_index()
792        for col in df.columns
793        if col not in known_dt_cols
794    }
795    pydt_cols = [
796        col
797        for col, ix in cols_indices.items()
798        if (
799            ix is not None
800            and
801            isinstance(df.loc[ix][col], datetime)
802        )
803    ]
804    dt_cols_set = set(known_dt_cols + pydt_cols)
805    all_dt_cols = [
806        col
807        for col in df.columns
808        if col in dt_cols_set
809    ]
810    if timezone_aware and timezone_naive:
811        return all_dt_cols
812
813    known_timezone_aware_dt_cols = [
814        col
815        for col in known_dt_cols
816        if getattr(df[col].dt, 'tz', None) is not None
817    ]
818    timezone_aware_pydt_cols = [
819        col
820        for col in pydt_cols
821        if df.loc[cols_indices[col]][col].tzinfo is not None
822    ]
823    timezone_aware_dt_cols_set = set(known_timezone_aware_dt_cols + timezone_aware_pydt_cols)
824    if timezone_aware:
825        return [
826            col
827            for col in all_dt_cols
828            if col in timezone_aware_dt_cols_set
829        ]
830
831    return [
832        col
833        for col in all_dt_cols
834        if col not in timezone_aware_dt_cols_set
835    ]

Get the columns which contain datetime or Timestamp objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain datetime or Timestamp objects.
  • timezone_aware (bool, default True): If True, include timezone-aware datetime columns.
  • timezone_naive (bool, default True): If True, include timezone-naive datetime columns.
Returns
  • A list of columns to treat as datetimes.
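An illustrative sketch (hypothetical data) of the timezone filters; per the source above, timezone-aware columns are excluded when `timezone_aware=False`.

```python
>>> from datetime import datetime, timezone
>>> import pandas as pd
>>> from meerschaum.utils.dataframe import get_datetime_cols
>>> df = pd.DataFrame({
...     'utc_dt': [datetime(2024, 1, 1, tzinfo=timezone.utc)],
...     'naive_dt': [datetime(2024, 1, 1)],
... })
>>> get_datetime_cols(df)
['utc_dt', 'naive_dt']
>>> get_datetime_cols(df, timezone_aware=False)
['naive_dt']
```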
def get_bytes_cols(df: pandas.core.frame.DataFrame) -> List[str]:
838def get_bytes_cols(df: 'pd.DataFrame') -> List[str]:
839    """
840    Get the columns which contain bytes strings from a Pandas DataFrame.
841
842    Parameters
843    ----------
844    df: pd.DataFrame
845        The DataFrame which may contain bytes strings.
846
847    Returns
848    -------
849    A list of columns to treat as bytes.
850    """
851    if df is None:
852        return []
853    is_dask = 'dask' in df.__module__
854    if is_dask:
855        df = get_first_valid_dask_partition(df)
856
857    if len(df) == 0:
858        return []
859
860    cols_indices = {
861        col: df[col].first_valid_index()
862        for col in df.columns
863    }
864    return [
865        col
866        for col, ix in cols_indices.items()
867        if (
868            ix is not None
869            and
870            isinstance(df.loc[ix][col], bytes)
871        )
872    ]

Get the columns which contain bytes strings from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain bytes strings.
Returns
  • A list of columns to treat as bytes.
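A brief illustrative sketch (hypothetical data) of bytes detection:

```python
>>> import pandas as pd
>>> from meerschaum.utils.dataframe import get_bytes_cols
>>> df = pd.DataFrame({'payload': [b'\x00\x01'], 'name': ['a']})
>>> get_bytes_cols(df)
['payload']
```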
def get_geometry_cols( df: pandas.core.frame.DataFrame, with_types_srids: bool = False) -> Union[List[str], Dict[str, Any]]:
875def get_geometry_cols(
876    df: 'pd.DataFrame',
877    with_types_srids: bool = False,
878) -> Union[List[str], Dict[str, Any]]:
879    """
880    Get the columns which contain shapely objects from a Pandas DataFrame.
881
882    Parameters
883    ----------
884    df: pd.DataFrame
885        The DataFrame which may contain shapely objects.
886
887    with_types_srids: bool, default False
888        If `True`, return a dictionary mapping columns to geometry types and SRIDs.
889
890    Returns
891    -------
892    A list of columns to treat as `geometry`.
893    If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID).
894    """
895    if df is None:
896        return []
897
898    is_dask = 'dask' in df.__module__
899    if is_dask:
900        df = get_first_valid_dask_partition(df)
901
902    if len(df) == 0:
903        return []
904
905    cols_indices = {
906        col: df[col].first_valid_index()
907        for col in df.columns
908    }
909    geo_cols = [
910        col
911        for col, ix in cols_indices.items()
912        if (
913            ix is not None
914            and
915            'shapely' in str(type(df.loc[ix][col]))
916        )
917    ]
918    if not with_types_srids:
919        return geo_cols
920
921    gpd = mrsm.attempt_import('geopandas', lazy=False)
922    geo_cols_types_srids = {}
923    for col in geo_cols:
924        try:
925            sample_geo_series = gpd.GeoSeries(df[col], crs=None)
926            geometry_types = {
927                geom.geom_type
928                for geom in sample_geo_series
929                if hasattr(geom, 'geom_type')
930            }
931            geometry_has_z = any(getattr(geom, 'has_z', False) for geom in sample_geo_series)
932            srid = (
933                (
934                    sample_geo_series.crs.sub_crs_list[0].to_epsg()
935                    if sample_geo_series.crs.is_compound
936                    else sample_geo_series.crs.to_epsg()
937                )
938                if sample_geo_series.crs
939                else 0
940            )
941            geometry_type = list(geometry_types)[0] if len(geometry_types) == 1 else 'geometry'
942            if geometry_type != 'geometry' and geometry_has_z:
943                geometry_type = geometry_type + 'Z'
944        except Exception:
945            srid = 0
946            geometry_type = 'geometry'
947        geo_cols_types_srids[col] = (geometry_type, srid)
948
949    return geo_cols_types_srids

Get the columns which contain shapely objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain shapely objects.
  • with_types_srids (bool, default False): If True, return a dictionary mapping columns to geometry types and SRIDs.
Returns
  • A list of columns to treat as geometry.
  • If with_types_srids, return a dictionary mapping columns to tuples in the form (type, SRID).
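An illustrative sketch (hypothetical data; assumes `shapely` is installed, plus `geopandas` for the `with_types_srids=True` form). With no CRS set, the SRID falls back to 0 per the source above.

```python
>>> import pandas as pd
>>> from shapely.geometry import Point
>>> from meerschaum.utils.dataframe import get_geometry_cols
>>> df = pd.DataFrame({'geom': [Point(0.0, 0.0)], 'value': [1]})
>>> get_geometry_cols(df)
['geom']
>>> get_geometry_cols(df, with_types_srids=True)
{'geom': ('Point', 0)}
```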
def enforce_dtypes( df: pandas.core.frame.DataFrame, dtypes: Dict[str, str], safe_copy: bool = True, coerce_numeric: bool = True, coerce_timezone: bool = True, strip_timezone: bool = False, debug: bool = False) -> pandas.core.frame.DataFrame:
 952def enforce_dtypes(
 953    df: 'pd.DataFrame',
 954    dtypes: Dict[str, str],
 955    safe_copy: bool = True,
 956    coerce_numeric: bool = True,
 957    coerce_timezone: bool = True,
 958    strip_timezone: bool = False,
 959    debug: bool = False,
 960) -> 'pd.DataFrame':
 961    """
 962    Enforce the `dtypes` dictionary on a DataFrame.
 963
 964    Parameters
 965    ----------
 966    df: pd.DataFrame
 967        The DataFrame on which to enforce dtypes.
 968
 969    dtypes: Dict[str, str]
 970        The data types to attempt to enforce on the DataFrame.
 971
 972    safe_copy: bool, default True
 973        If `True`, create a copy before comparing and modifying the dataframes.
 974        Setting to `False` may mutate the DataFrames.
 975        See `meerschaum.utils.dataframe.filter_unseen_df`.
 976
 977    coerce_numeric: bool, default True
 978        If `True`, convert float and int collisions to numeric.
 979
 980    coerce_timezone: bool, default True
 981        If `True`, convert datetimes to UTC.
 982
 983    strip_timezone: bool, default False
 984        If `coerce_timezone` and `strip_timezone` are `True`,
 985        remove timezone information from datetimes.
 986
 987    debug: bool, default False
 988        Verbosity toggle.
 989
 990    Returns
 991    -------
 992    The Pandas DataFrame with the types enforced.
 993    """
 994    import json
 995    import functools
 996    from meerschaum.utils.debug import dprint
 997    from meerschaum.utils.formatting import pprint
 998    from meerschaum.utils.dtypes import (
 999        are_dtypes_equal,
1000        to_pandas_dtype,
1001        is_dtype_numeric,
1002        attempt_cast_to_numeric,
1003        attempt_cast_to_uuid,
1004        attempt_cast_to_bytes,
1005        attempt_cast_to_geometry,
1006        coerce_timezone as _coerce_timezone,
1007    )
1008    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
1009    pandas = mrsm.attempt_import('pandas')
1010    is_dask = 'dask' in df.__module__
1011    if safe_copy:
1012        df = df.copy()
1013    if len(df.columns) == 0:
1014        if debug:
1015            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
1016        return df
1017
1018    pipe_pandas_dtypes = {
1019        col: to_pandas_dtype(typ)
1020        for col, typ in dtypes.items()
1021    }
1022    json_cols = [
1023        col
1024        for col, typ in dtypes.items()
1025        if typ == 'json'
1026    ]
1027    numeric_cols = [
1028        col
1029        for col, typ in dtypes.items()
1030        if typ.startswith('numeric')
1031    ]
1032    geometry_cols = [
1033        col
1034        for col, typ in dtypes.items()
1035        if typ.startswith('geometry') or typ.startswith('geography')
1036    ]
1037    uuid_cols = [
1038        col
1039        for col, typ in dtypes.items()
1040        if typ == 'uuid'
1041    ]
1042    bytes_cols = [
1043        col
1044        for col, typ in dtypes.items()
1045        if typ == 'bytes'
1046    ]
1047    datetime_cols = [
1048        col
1049        for col, typ in dtypes.items()
1050        if are_dtypes_equal(typ, 'datetime')
1051    ]
1052    df_numeric_cols = get_numeric_cols(df)
1053    if debug:
1054        dprint("Desired data types:")
1055        pprint(dtypes)
1056        dprint("Data types for incoming DataFrame:")
1057        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})
1058
1059    if json_cols and len(df) > 0:
1060        if debug:
1061            dprint(f"Checking columns for JSON encoding: {json_cols}")
1062        for col in json_cols:
1063            if col in df.columns:
1064                try:
1065                    df[col] = df[col].apply(
1066                        (
1067                            lambda x: (
1068                                json.loads(x)
1069                                if isinstance(x, str)
1070                                else x
1071                            )
1072                        )
1073                    )
1074                except Exception as e:
1075                    if debug:
1076                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")
1077
1078    if numeric_cols:
1079        if debug:
1080            dprint(f"Checking for numerics: {numeric_cols}")
1081        for col in numeric_cols:
1082            precision, scale = get_numeric_precision_scale(None, dtypes.get(col, ''))
1083            if col in df.columns:
1084                try:
1085                    df[col] = df[col].apply(
1086                        functools.partial(
1087                            attempt_cast_to_numeric,
1088                            quantize=True,
1089                            precision=precision,
1090                            scale=scale,
1091                        )
1092                    )
1093                except Exception as e:
1094                    if debug:
1095                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")
1096
1097    if uuid_cols:
1098        if debug:
1099            dprint(f"Checking for UUIDs: {uuid_cols}")
1100        for col in uuid_cols:
1101            if col in df.columns:
1102                try:
1103                    df[col] = df[col].apply(attempt_cast_to_uuid)
1104                except Exception as e:
1105                    if debug:
1106                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")
1107
1108    if bytes_cols:
1109        if debug:
1110            dprint(f"Checking for bytes: {bytes_cols}")
1111        for col in bytes_cols:
1112            if col in df.columns:
1113                try:
1114                    df[col] = df[col].apply(attempt_cast_to_bytes)
1115                except Exception as e:
1116                    if debug:
1117                        dprint(f"Unable to parse column '{col}' as bytes:\n{e}")
1118
1119    if datetime_cols and coerce_timezone:
1120        if debug:
1121            dprint(f"Checking for datetime conversion: {datetime_cols}")
1122        for col in datetime_cols:
1123            if col in df.columns:
1124                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)
1125
1126    if geometry_cols:
1127        geopandas = mrsm.attempt_import('geopandas')
1128        if debug:
1129            dprint(f"Checking for geometry: {geometry_cols}")
1130        parsed_geom_cols = []
1131        for col in geometry_cols:
1132            try:
1133                df[col] = df[col].apply(attempt_cast_to_geometry)
1134                parsed_geom_cols.append(col)
1135            except Exception as e:
1136                if debug:
1137                    dprint(f"Unable to parse column '{col}' as geometry:\n{e}")
1138
1139        if parsed_geom_cols:
1140            if debug:
1141                dprint(f"Converting to GeoDataFrame (geometry column: '{parsed_geom_cols[0]}')...")
1142            df = geopandas.GeoDataFrame(df, geometry=parsed_geom_cols[0])
1143            try:
1144                df.rename_geometry(parsed_geom_cols[0], inplace=True)
1145            except ValueError:
1146                pass
1147
1148    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
1149    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1150        if debug:
1151            dprint("Data types match. Exiting enforcement...")
1152        return df
1153
1154    common_dtypes = {}
1155    common_diff_dtypes = {}
1156    for col, typ in pipe_pandas_dtypes.items():
1157        if col in df_dtypes:
1158            common_dtypes[col] = typ
1159            if not are_dtypes_equal(typ, df_dtypes[col]):
1160                common_diff_dtypes[col] = df_dtypes[col]
1161
1162    if debug:
1163        dprint("Common columns with different dtypes:")
1164        pprint(common_diff_dtypes)
1165
1166    detected_dt_cols = {}
1167    for col, typ in common_diff_dtypes.items():
1168        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
1169            df_dtypes[col] = typ
1170            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
1171    for col in detected_dt_cols:
1172        del common_diff_dtypes[col]
1173
1174    if debug:
1175        dprint("Common columns with different dtypes (after dates):")
1176        pprint(common_diff_dtypes)
1177
1178    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1179        if debug:
1180            dprint(
1181                "The incoming DataFrame has mostly the same types, skipping enforcement. "
1182                + "The only detected difference was in the following datetime columns."
1183            )
1184            pprint(detected_dt_cols)
1185        return df
1186
1187    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
1188        previous_typ = common_dtypes[col]
1189        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
1190        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
1191        explicitly_numeric = dtypes.get(col, 'numeric').startswith('numeric')
1192        cast_to_numeric = (
1193            explicitly_numeric
1194            or col in df_numeric_cols
1195            or (mixed_numeric_types and not explicitly_float)
1196        ) and coerce_numeric
1197        if cast_to_numeric:
1198            common_dtypes[col] = attempt_cast_to_numeric
1199            common_diff_dtypes[col] = attempt_cast_to_numeric
1200
1201    for d in common_diff_dtypes:
1202        t = common_dtypes[d]
1203        if debug:
1204            dprint(f"Casting column {d} to dtype {t}.")
1205        try:
1206            df[d] = (
1207                df[d].apply(t)
1208                if callable(t)
1209                else df[d].astype(t)
1210            )
1211        except Exception as e:
1212            if debug:
1213                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
1214            if 'int' in str(t).lower():
1215                try:
1216                    df[d] = df[d].astype('float64').astype(t)
1217                except Exception:
1218                    if debug:
1219                        dprint(f"Was unable to convert to float then {t}.")
1220    return df

Enforce the dtypes dictionary on a DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame on which to enforce dtypes.
  • dtypes (Dict[str, str]): The data types to attempt to enforce on the DataFrame.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames. See meerschaum.utils.dataframe.filter_unseen_df.
  • coerce_numeric (bool, default True): If True, convert float and int collisions to numeric.
  • coerce_timezone (bool, default True): If True, convert datetimes to UTC.
  • strip_timezone (bool, default False): If coerce_timezone and strip_timezone are True, remove timezone information from datetimes.
  • debug (bool, default False): Verbosity toggle.
Returns
  • The Pandas DataFrame with the types enforced.
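A minimal sketch (hypothetical data) of the special `'json'` dtype, which deserializes string values into Python objects:

```python
>>> import pandas as pd
>>> from meerschaum.utils.dataframe import enforce_dtypes
>>> df = pd.DataFrame({'a': ['{"x": 1}']})
>>> enforce_dtypes(df, {'a': 'json'})['a'][0]
{'x': 1}
```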
def get_datetime_bound_from_df( df: Union[pandas.core.frame.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]]], datetime_column: str, minimum: bool = True) -> Union[int, datetime.datetime, NoneType]:
1223def get_datetime_bound_from_df(
1224    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1225    datetime_column: str,
1226    minimum: bool = True,
1227) -> Union[int, datetime, None]:
1228    """
1229    Return the minimum or maximum datetime (or integer) from a DataFrame.
1230
1231    Parameters
1232    ----------
1233    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1234        The DataFrame, list, or dict which contains the range axis.
1235
1236    datetime_column: str
1237        The name of the datetime (or int) column.
1238
1239    minimum: bool
1240        Whether to return the minimum (default) or maximum value.
1241
1242    Returns
1243    -------
1244    The minimum or maximum datetime value in the dataframe, or `None`.
1245    """
1246    from meerschaum.utils.dtypes import to_datetime, value_is_null
1247
1248    if df is None:
1249        return None
1250    if not datetime_column:
1251        return None
1252
1253    def compare(a, b):
1254        if a is None:
1255            return b
1256        if b is None:
1257            return a
1258        if minimum:
1259            return a if a < b else b
1260        return a if a > b else b
1261
1262    if isinstance(df, list):
1263        if len(df) == 0:
1264            return None
1265        best_yet = df[0].get(datetime_column, None)
1266        for doc in df:
1267            val = doc.get(datetime_column, None)
1268            best_yet = compare(best_yet, val)
1269        return best_yet
1270
1271    if isinstance(df, dict):
1272        if datetime_column not in df:
1273            return None
1274        best_yet = df[datetime_column][0]
1275        for val in df[datetime_column]:
1276            best_yet = compare(best_yet, val)
1277        return best_yet
1278
1279    if 'DataFrame' in str(type(df)):
1280        from meerschaum.utils.dtypes import are_dtypes_equal
1281        pandas = mrsm.attempt_import('pandas')
1282        is_dask = 'dask' in df.__module__
1283
1284        if datetime_column not in df.columns:
1285            return None
1286
1287        try:
1288            dt_val = (
1289                df[datetime_column].min(skipna=True)
1290                if minimum
1291                else df[datetime_column].max(skipna=True)
1292            )
1293        except Exception:
1294            dt_val = pandas.NA
1295        if is_dask and dt_val is not None and dt_val is not pandas.NA:
1296            dt_val = dt_val.compute()
1297
1298        return (
1299            to_datetime(dt_val, as_pydatetime=True)
1300            if are_dtypes_equal(str(type(dt_val)), 'datetime')
1301            else (dt_val if not value_is_null(dt_val) else None)
1302        )
1303
1304    return None

Return the minimum or maximum datetime (or integer) from a DataFrame.

Parameters
  • df (Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]): The DataFrame, list, or dict which contains the range axis.
  • datetime_column (str): The name of the datetime (or int) column.
  • minimum (bool): Whether to return the minimum (default) or maximum value.
Returns
  • The minimum or maximum datetime value in the dataframe, or None.
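A minimal sketch (hypothetical data) using the list-of-documents form:

```python
>>> from datetime import datetime
>>> from meerschaum.utils.dataframe import get_datetime_bound_from_df
>>> docs = [{'dt': datetime(2024, 1, 1)}, {'dt': datetime(2024, 1, 2)}]
>>> get_datetime_bound_from_df(docs, 'dt')
datetime.datetime(2024, 1, 1, 0, 0)
>>> get_datetime_bound_from_df(docs, 'dt', minimum=False)
datetime.datetime(2024, 1, 2, 0, 0)
```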
def get_unique_index_values( df: Union[pandas.core.frame.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]]], indices: List[str]) -> Dict[str, List[Any]]:
1307def get_unique_index_values(
1308    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1309    indices: List[str],
1310) -> Dict[str, List[Any]]:
1311    """
1312    Return a dictionary of the unique index values in a DataFrame.
1313
1314    Parameters
1315    ----------
1316    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1317        The dataframe (or list or dict) which contains index values.
1318
1319    indices: List[str]
1320        The list of index columns.
1321
1322    Returns
1323    -------
1324    A dictionary mapping indices to unique values.
1325    """
1326    if df is None:
1327        return {}
1328    if 'dataframe' in str(type(df)).lower():
1329        pandas = mrsm.attempt_import('pandas')
1330        return {
1331            col: list({
1332                (val if val is not pandas.NA else None)
1333                for val in df[col].unique()
1334            })
1335            for col in indices
1336            if col in df.columns
1337        }
1338
1339    unique_indices = defaultdict(lambda: set())
1340    if isinstance(df, list):
1341        for doc in df:
1342            for index in indices:
1343                if index in doc:
1344                    unique_indices[index].add(doc[index])
1345
1346    elif isinstance(df, dict):
1347        for index in indices:
1348            if index in df:
1349                unique_indices[index] = unique_indices[index].union(set(df[index]))
1350
1351    return {key: list(val) for key, val in unique_indices.items()}

Return a dictionary of the unique index values in a DataFrame.

Parameters
  • df (Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]): The dataframe (or list or dict) which contains index values.
  • indices (List[str]): The list of index columns.
Returns
  • A dictionary mapping indices to unique values.
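
Example
  A short sketch with a list of documents; the values come from a set, so they are
  sorted below for a stable display.

  >>> docs = [
  ...     {'id': 1, 'color': 'red'},
  ...     {'id': 2, 'color': 'red'},
  ...     {'id': 2, 'color': 'blue'},
  ... ]
  >>> vals = get_unique_index_values(docs, ['id', 'color'])
  >>> sorted(vals['id'])
  [1, 2]
  >>> sorted(vals['color'])
  ['blue', 'red']
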
def df_is_chunk_generator(df: Any) -> bool:
1354def df_is_chunk_generator(df: Any) -> bool:
1355    """
1356    Determine whether to treat `df` as a chunk generator.
1357
1358    Note this should only be used in a context where generators are expected,
1359    as it will return `True` for any iterable.
1360
1361    Parameters
1362    ----------
1363    df: The DataFrame or chunk generator to evaluate.
1364
1365    Returns
1366    -------
1367    A `bool` indicating whether to treat `df` as a generator.
1368    """
1369    return (
1370        not isinstance(df, (dict, list, str))
1371        and 'DataFrame' not in str(type(df))
1372        and isinstance(df, (Generator, Iterable, Iterator))
1373    )

Determine whether to treat df as a chunk generator.

Note this should only be used in a context where generators are expected, as it will return True for any iterable.

Parameters
  • df (Any): The DataFrame or chunk generator to evaluate.
Returns
  • A bool indicating whether to treat df as a generator.
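
Example
  A quick sketch of the heuristic: dicts, lists, strings, and DataFrames are excluded,
  while any other iterable (such as a generator of chunks) passes.

  >>> df_is_chunk_generator([{'a': 1}])
  False
  >>> df_is_chunk_generator('not a chunk generator')
  False
  >>> df_is_chunk_generator(chunk for chunk in ({'a': 1},))
  True
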
def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
1376def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
1377    """
1378    Return the Dask `npartitions` value for a given `chunksize`.
1379    """
1380    if chunksize == -1:
1381        from meerschaum.config import get_config
1382        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
1383    if chunksize is None:
1384        return 1
1385    return -1 * chunksize

Return the Dask npartitions value for a given chunksize.
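
Example
  A quick sketch of the mapping: None maps to a single partition, and a positive
  chunksize is returned negated (a chunksize of -1 is first replaced by the
  configured SQL chunksize).

  >>> chunksize_to_npartitions(None)
  1
  >>> chunksize_to_npartitions(100_000)
  -100000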

def df_from_literal( pipe: Optional[meerschaum.Pipe] = None, literal: Optional[str] = None, debug: bool = False) -> pandas.core.frame.DataFrame:
1388def df_from_literal(
1389    pipe: Optional[mrsm.Pipe] = None,
1390    literal: Optional[str] = None,
1391    debug: bool = False
1392) -> 'pd.DataFrame':
1393    """
1394    Construct a dataframe from a literal value, using the pipe's datetime and value column names.
1395
1396    Parameters
1397    ----------
1398    pipe: Optional['meerschaum.Pipe'], default None
1399        The pipe which will consume the literal value.
1400
1401    Returns
1402    -------
1403    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
1404    and the literal as the value.
1405    """
1406    from meerschaum.utils.packages import import_pandas
1407    from meerschaum.utils.warnings import error, warn
1408    from meerschaum.utils.debug import dprint
1409
1410    if pipe is None or literal is None:
1411        error("Please provide a Pipe and a literal value")
1412    ### this will raise an error if the columns are undefined
1413    dt_name, val_name = pipe.get_columns('datetime', 'value')
1414
1415    val = literal
1416    if isinstance(literal, str):
1417        if debug:
1418            dprint(f"Received literal string: '{literal}'")
1419        import ast
1420        try:
1421            val = ast.literal_eval(literal)
1422        except Exception:
1423            warn(
1424                f"Failed to parse value from string:\n{literal}"
1425                + "\n\nWill cast as a string instead."
1426            )
1427            val = literal
1428
1429    from datetime import datetime, timezone
1430    now = datetime.now(timezone.utc).replace(tzinfo=None)
1431
1432    pd = import_pandas()
1433    return pd.DataFrame({dt_name: [now], val_name: [val]})

Construct a dataframe from a literal value, using the pipe's datetime and value column names.

Parameters
  • pipe (Optional['meerschaum.Pipe'], default None): The pipe which will consume the literal value.
  • literal (Optional[str], default None): The literal value to parse (via ast.literal_eval) into the value column; strings that fail to parse are kept as strings.
Returns
  • A 1-row pandas DataFrame with the current UTC timestamp as the datetime column and the literal as the value.
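
Example
  A hedged sketch: the pipe below is hypothetical and only needs its datetime and
  value columns registered so that pipe.get_columns('datetime', 'value') succeeds.

  >>> import meerschaum as mrsm
  >>> pipe = mrsm.Pipe(
  ...     'plugin:demo', 'temperature',
  ...     columns={'datetime': 'dt', 'value': 'val'},
  ... )
  >>> df = df_from_literal(pipe, '42')
  >>> list(df.columns)
  ['dt', 'val']
  >>> df['val'][0]
  42
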
def get_first_valid_dask_partition( ddf: 'dask.dataframe.DataFrame') -> Optional[pandas.core.frame.DataFrame]:
1436def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
1437    """
1438    Return the first valid Dask DataFrame partition (if possible).
1439    """
1440    pdf = None
1441    for partition in ddf.partitions:
1442        try:
1443            pdf = partition.compute()
1444        except Exception:
1445            continue
1446        if len(pdf) > 0:
1447            return pdf
1448    _ = mrsm.attempt_import('partd', lazy=False)
1449    return ddf.compute()

Return the first valid Dask DataFrame partition (if possible).
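
Example
  A minimal sketch (requires dask to be installed):

  >>> import pandas as pd
  >>> import dask.dataframe as dd
  >>> ddf = dd.from_pandas(pd.DataFrame({'a': [1, 2, 3]}), npartitions=2)
  >>> pdf = get_first_valid_dask_partition(ddf)
  >>> len(pdf) > 0
  True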

def query_df( df: pandas.core.frame.DataFrame, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, datetime_column: Optional[str] = None, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, inplace: bool = False, reset_index: bool = False, coerce_types: bool = False, debug: bool = False) -> pandas.core.frame.DataFrame:
1452def query_df(
1453    df: 'pd.DataFrame',
1454    params: Optional[Dict[str, Any]] = None,
1455    begin: Union[datetime, int, None] = None,
1456    end: Union[datetime, int, None] = None,
1457    datetime_column: Optional[str] = None,
1458    select_columns: Optional[List[str]] = None,
1459    omit_columns: Optional[List[str]] = None,
1460    inplace: bool = False,
1461    reset_index: bool = False,
1462    coerce_types: bool = False,
1463    debug: bool = False,
1464) -> 'pd.DataFrame':
1465    """
1466    Query the dataframe with the params dictionary.
1467
1468    Parameters
1469    ----------
1470    df: pd.DataFrame
1471        The DataFrame to query against.
1472
1473    params: Optional[Dict[str, Any]], default None
1474        The parameters dictionary to use for the query.
1475
1476    begin: Union[datetime, int, None], default None
1477        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1478        greater than or equal to this value.
1479
1480    end: Union[datetime, int, None], default None
1481        If `end` and `datetime_column` are provided, only return rows with a timestamp
1482        less than this value.
1483
1484    datetime_column: Optional[str], default None
1485        A `datetime_column` must be provided to use `begin` and `end`.
1486
1487    select_columns: Optional[List[str]], default None
1488        If provided, only return these columns.
1489
1490    omit_columns: Optional[List[str]], default None
1491        If provided, do not include these columns in the result.
1492
1493    inplace: bool, default False
1494        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
1495
1496    reset_index: bool, default False
1497        If `True`, reset the index in the resulting DataFrame.
1498
1499    coerce_types: bool, default False
1500        If `True`, cast the dataframe and parameters as strings before querying.
1501
1502    Returns
1503    -------
1504    A Pandas DataFrame query result.
1505    """
1506
1507    def _process_select_columns(_df):
1508        if not select_columns:
1509            return
1510        for col in list(_df.columns):
1511            if col not in select_columns:
1512                del _df[col]
1513
1514    def _process_omit_columns(_df):
1515        if not omit_columns:
1516            return
1517        for col in list(_df.columns):
1518            if col in omit_columns:
1519                del _df[col]
1520
1521    if not params and not begin and not end:
1522        if not inplace:
1523            df = df.copy()
1524        _process_select_columns(df)
1525        _process_omit_columns(df)
1526        return df
1527
1528    from meerschaum.utils.debug import dprint
1529    from meerschaum.utils.misc import get_in_ex_params
1530    from meerschaum.utils.warnings import warn
1531    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
1532    dateutil_parser = mrsm.attempt_import('dateutil.parser')
1533    pandas = mrsm.attempt_import('pandas')
1534    NA = pandas.NA
1535
1536    if params:
1537        params = params.copy()
1538        for key, val in list(params.items()):
1539            if isinstance(val, (list, tuple)):
1540                if None in val:
1541                    val = [item for item in val if item is not None] + [NA]
1542                    params[key] = val
1543                if coerce_types:
1544                    params[key] = [str(x) for x in val]
1545            else:
1546                if value_is_null(val):
1547                    val = NA
1548                    params[key] = NA
1549                if coerce_types:
1550                    params[key] = str(val)
1551
1552    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
1553
1554    if inplace:
1555        df.fillna(NA, inplace=True)
1556    else:
1557        df = df.infer_objects().fillna(NA)
1558
1559    if isinstance(begin, str):
1560        begin = dateutil_parser.parse(begin)
1561    if isinstance(end, str):
1562        end = dateutil_parser.parse(end)
1563
1564    if begin is not None or end is not None:
1565        if not datetime_column or datetime_column not in df.columns:
1566            warn(
1567                f"The datetime column '{datetime_column}' is not present in the DataFrame, "
1568                + "ignoring begin and end...",
1569            )
1570            begin, end = None, None
1571
1572    if debug:
1573        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")
1574
1575    if datetime_column and (begin is not None or end is not None):
1576        if debug:
1577            dprint("Checking for datetime column compatibility.")
1578
1579        from meerschaum.utils.dtypes import coerce_timezone
1580        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
1581        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
1582        end_is_int = are_dtypes_equal(str(type(end)), 'int')
1583
1584        if df_is_dt:
1585            df_tz = (
1586                getattr(df[datetime_column].dt, 'tz', None)
1587                if hasattr(df[datetime_column], 'dt')
1588                else None
1589            )
1590
1591            if begin_is_int:
1592                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
1593                if debug:
1594                    dprint(f"`begin` will be cast to '{begin}'.")
1595            if end_is_int:
1596                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
1597                if debug:
1598                    dprint(f"`end` will be cast to '{end}'.")
1599
1600            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
1601            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None
1602
1603    in_ex_params = get_in_ex_params(params)
1604
1605    masks = [
1606        (
1607            (df[datetime_column] >= begin)
1608            if begin is not None and datetime_column
1609            else True
1610        ) & (
1611            (df[datetime_column] < end)
1612            if end is not None and datetime_column
1613            else True
1614        )
1615    ]
1616
1617    masks.extend([
1618        (
1619            (
1620                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
1621                if in_vals
1622                else True
1623            ) & (
1624                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
1625                if ex_vals
1626                else True
1627            )
1628        )
1629        for col, (in_vals, ex_vals) in in_ex_params.items()
1630        if col in df.columns
1631    ])
1632    query_mask = masks[0]
1633    for mask in masks[1:]:
1634        query_mask = query_mask & mask
1635
1636    original_cols = df.columns
1637
1638    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
1639    ###       to allow for `<NA>` values.
1640    bool_cols = [
1641        col
1642        for col, typ in df.dtypes.items()
1643        if are_dtypes_equal(str(typ), 'bool')
1644    ]
1645    for col in bool_cols:
1646        df[col] = df[col].astype('boolean[pyarrow]')
1647
1648    if not isinstance(query_mask, bool):
1649        df['__mrsm_mask'] = (
1650            query_mask.astype('boolean[pyarrow]')
1651            if hasattr(query_mask, 'astype')
1652            else query_mask
1653        )
1654
1655        if inplace:
1656            df.where(query_mask, other=NA, inplace=True)
1657            df.dropna(how='all', inplace=True)
1658            result_df = df
1659        else:
1660            result_df = df.where(query_mask, other=NA)
1661            result_df.dropna(how='all', inplace=True)
1662
1663    else:
1664        result_df = df
1665
1666    if '__mrsm_mask' in df.columns:
1667        del df['__mrsm_mask']
1668    if '__mrsm_mask' in result_df.columns:
1669        del result_df['__mrsm_mask']
1670
1671    if reset_index:
1672        result_df.reset_index(drop=True, inplace=True)
1673
1674    result_df = enforce_dtypes(
1675        result_df,
1676        dtypes,
1677        safe_copy=False,
1678        debug=debug,
1679        coerce_numeric=False,
1680        coerce_timezone=False,
1681    )
1682
1683    if select_columns == ['*']:
1684        select_columns = None
1685
1686    if not select_columns and not omit_columns:
1687        return result_df[original_cols]
1688
1689    _process_select_columns(result_df)
1690    _process_omit_columns(result_df)
1691
1692    return result_df

Query the dataframe with the params dictionary.

Parameters
  • df (pd.DataFrame): The DataFrame to query against.
  • params (Optional[Dict[str, Any]], default None): The parameters dictionary to use for the query.
  • begin (Union[datetime, int, None], default None): If begin and datetime_column are provided, only return rows with a timestamp greater than or equal to this value.
  • end (Union[datetime, int, None], default None): If end and datetime_column are provided, only return rows with a timestamp less than this value.
  • datetime_column (Optional[str], default None): A datetime_column must be provided to use begin and end.
  • select_columns (Optional[List[str]], default None): If provided, only return these columns.
  • omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
  • inplace (bool, default False): If True, modify the DataFrame inplace rather than creating a new DataFrame.
  • reset_index (bool, default False): If True, reset the index in the resulting DataFrame.
  • coerce_types (bool, default False): If True, cast the dataframe and parameters as strings before querying.
Returns
  • A Pandas DataFrame query result.
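
Example
  A short sketch of params filtering and begin/end bounds:

  >>> import pandas as pd
  >>> from datetime import datetime
  >>> df = pd.DataFrame({
  ...     'dt': [datetime(2024, 1, 1), datetime(2024, 1, 2)],
  ...     'color': ['red', 'blue'],
  ... })
  >>> query_df(df, {'color': 'red'})['color'].tolist()
  ['red']
  >>> query_df(df, begin=datetime(2024, 1, 2), datetime_column='dt')['color'].tolist()
  ['blue']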
def to_json( df: pandas.core.frame.DataFrame, safe_copy: bool = True, orient: str = 'records', date_format: str = 'iso', date_unit: str = 'us', double_precision: int = 15, geometry_format: str = 'geojson', **kwargs: Any) -> str:
1695def to_json(
1696    df: 'pd.DataFrame',
1697    safe_copy: bool = True,
1698    orient: str = 'records',
1699    date_format: str = 'iso',
1700    date_unit: str = 'us',
1701    double_precision: int = 15,
1702    geometry_format: str = 'geojson',
1703    **kwargs: Any
1704) -> str:
1705    """
1706    Serialize the given dataframe as a JSON string.
1707
1708    Parameters
1709    ----------
1710    df: pd.DataFrame
1711        The DataFrame to be serialized.
1712
1713    safe_copy: bool, default True
1714        If `False`, modify the DataFrame inplace.
1715
1716    date_format: str, default 'iso'
1717        The default format for timestamps.
1718
1719    date_unit: str, default 'us'
1720        The precision of the timestamps.
1721
1722    double_precision: int, default 15
1723        The number of decimal places to use when encoding floating point values (maximum 15).
1724
1725    geometry_format: str, default 'geojson'
1726        The serialization format for geometry data.
1727        Accepted values are `geojson`, `wkb_hex`, and `wkt`.
1728
1729    Returns
1730    -------
1731    A JSON string.
1732    """
1733    import warnings
1734    import functools
1735    from meerschaum.utils.packages import import_pandas
1736    from meerschaum.utils.dtypes import (
1737        serialize_bytes,
1738        serialize_decimal,
1739        serialize_geometry,
1740    )
1741    pd = import_pandas()
1742    uuid_cols = get_uuid_cols(df)
1743    bytes_cols = get_bytes_cols(df)
1744    numeric_cols = get_numeric_cols(df)
1745    geometry_cols = get_geometry_cols(df)
1746    if safe_copy and bool(uuid_cols or bytes_cols or geometry_cols or numeric_cols):
1747        df = df.copy()
1748    for col in uuid_cols:
1749        df[col] = df[col].astype(str)
1750    for col in bytes_cols:
1751        df[col] = df[col].apply(serialize_bytes)
1752    for col in numeric_cols:
1753        df[col] = df[col].apply(serialize_decimal)
1754    with warnings.catch_warnings():
1755        warnings.simplefilter("ignore")
1756        for col in geometry_cols:
1757            df[col] = df[col].apply(
1758                functools.partial(
1759                    serialize_geometry,
1760                    geometry_format=geometry_format,
1761                )
1762            )
1763    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
1764        date_format=date_format,
1765        date_unit=date_unit,
1766        double_precision=double_precision,
1767        orient=orient,
1768        **kwargs
1769    )

Serialize the given dataframe as a JSON string.

Parameters
  • df (pd.DataFrame): The DataFrame to be serialized.
  • safe_copy (bool, default True): If False, modify the DataFrame inplace.
  • date_format (str, default 'iso'): The default format for timestamps.
  • date_unit (str, default 'us'): The precision of the timestamps.
  • double_precision (int, default 15): The number of decimal places to use when encoding floating point values (maximum 15).
  • geometry_format (str, default 'geojson'): The serialization format for geometry data. Accepted values are geojson, wkb_hex, and wkt.
Returns
  • A JSON string.
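
Example
  A small sketch of the defaults (records orientation, ISO timestamps at
  microsecond precision):

  >>> import pandas as pd
  >>> from datetime import datetime
  >>> df = pd.DataFrame({'dt': [datetime(2024, 1, 1)], 'val': [1]})
  >>> to_json(df)
  '[{"dt":"2024-01-01T00:00:00.000000","val":1}]'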