meerschaum.utils.dataframe

Utility functions for working with DataFrames.

   1#! /usr/bin/env python3
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Utility functions for working with DataFrames.
   7"""
   8
   9from __future__ import annotations
  10
  11from datetime import datetime, timezone, date
  12from collections import defaultdict
  13
  14import meerschaum as mrsm
  15from meerschaum.utils.typing import (
  16    Optional, Dict, Any, List, Hashable, Generator,
  17    Iterator, Iterable, Union, TYPE_CHECKING, Tuple,
  18)
  19
  20if TYPE_CHECKING:
  21    pd, dask = mrsm.attempt_import('pandas', 'dask')
  22
  23
  24def add_missing_cols_to_df(
  25    df: 'pd.DataFrame',
  26    dtypes: Dict[str, Any],
  27) -> 'pd.DataFrame':
  28    """
  29    Add columns from the dtypes dictionary as null columns to a new DataFrame.
  30
  31    Parameters
  32    ----------
  33    df: pd.DataFrame
  34        The DataFrame to copy and extend with null columns.
  35
  36    dtypes: Dict[str, Any]
  37        The data types dictionary which may contain keys not present in `df.columns`.
  38
  39    Returns
  40    -------
  41    A new `DataFrame` with the keys from `dtypes` added as null columns.
  42    If the keys of `dtypes` match `df.columns`, then return a reference to `df`.
  43    NOTE: This will not ensure that dtypes are enforced!
  44
  45    Examples
  46    --------
  47    >>> import pandas as pd
  48    >>> df = pd.DataFrame([{'a': 1}])
  49    >>> dtypes = {'b': 'Int64'}
  50    >>> add_missing_cols_to_df(df, dtypes)
  51       a     b
  52    0  1  <NA>
  53    >>> add_missing_cols_to_df(df, dtypes).dtypes
  54    a    int64
  55    b    Int64
  56    dtype: object
  57    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
  58    a    int64
  59    dtype: object
  60
  61    """
  62    if set(df.columns) == set(dtypes):
  63        return df
  64
  65    from meerschaum.utils.packages import attempt_import
  66    from meerschaum.utils.dtypes import to_pandas_dtype
  67    pandas = attempt_import('pandas')
  68
  69    def build_series(dtype: str):
  70        return pandas.Series([], dtype=to_pandas_dtype(dtype))
  71
  72    assign_kwargs = {
  73        str(col): build_series(str(typ))
  74        for col, typ in dtypes.items()
  75        if col not in df.columns
  76    }
  77    df_with_cols = df.assign(**assign_kwargs)
  78    for col in assign_kwargs:
  79        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
  80    return df_with_cols
  81
  82
  83def filter_unseen_df(
  84    old_df: 'pd.DataFrame',
  85    new_df: 'pd.DataFrame',
  86    safe_copy: bool = True,
  87    dtypes: Optional[Dict[str, Any]] = None,
  88    include_unchanged_columns: bool = False,
  89    coerce_mixed_numerics: bool = True,
  90    debug: bool = False,
  91) -> 'pd.DataFrame':
  92    """
  93    Left join two DataFrames to find the newest unseen data.
  94
  95    Parameters
  96    ----------
  97    old_df: 'pd.DataFrame'
  98        The original (target) dataframe. Acts as a filter on the `new_df`.
  99
 100    new_df: 'pd.DataFrame'
 101        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
 102
 103    safe_copy: bool, default True
 104        If `True`, create a copy before comparing and modifying the dataframes.
 105        Setting to `False` may mutate the DataFrames.
 106
 107    dtypes: Optional[Dict[str, Any]], default None
 108        Optionally specify the datatypes of the dataframe.
 109
 110    include_unchanged_columns: bool, default False
 111        If `True`, include columns which haven't changed on rows which have changed.
 112
 113    coerce_mixed_numerics: bool, default True
 114        If `True`, cast mixed integer and float columns between the old and new dataframes into
 115        numeric values (`decimal.Decimal`).
 116
 117    debug: bool, default False
 118        Verbosity toggle.
 119
 120    Returns
 121    -------
 122    A pandas dataframe of the new, unseen rows in `new_df`.
 123
 124    Examples
 125    --------
 126    ```python
 127    >>> import pandas as pd
 128    >>> df1 = pd.DataFrame({'a': [1,2]})
 129    >>> df2 = pd.DataFrame({'a': [2,3]})
 130    >>> filter_unseen_df(df1, df2)
 131       a
 132    0  3
 133
 134    ```
 135
 136    """
 137    if old_df is None:
 138        return new_df
 139
 140    if safe_copy:
 141        old_df = old_df.copy()
 142        new_df = new_df.copy()
 143
 144    import json
 145    import functools
 146    import traceback
 147    from meerschaum.utils.warnings import warn
 148    from meerschaum.utils.packages import import_pandas, attempt_import
 149    from meerschaum.utils.dtypes import (
 150        to_pandas_dtype,
 151        are_dtypes_equal,
 152        attempt_cast_to_numeric,
 153        attempt_cast_to_uuid,
 154        attempt_cast_to_bytes,
 155        attempt_cast_to_geometry,
 156        coerce_timezone,
 157        serialize_decimal,
 158    )
 159    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
 160    pd = import_pandas(debug=debug)
 161    is_dask = 'dask' in new_df.__module__
 162    if is_dask:
 163        pandas = attempt_import('pandas')
 164        _ = attempt_import('partd', lazy=False)
 165        dd = attempt_import('dask.dataframe')
 166        merge = dd.merge
 167        NA = pandas.NA
 168    else:
 169        merge = pd.merge
 170        NA = pd.NA
 171
 172    new_df_dtypes = dict(new_df.dtypes)
 173    old_df_dtypes = dict(old_df.dtypes)
 174
 175    same_cols = set(new_df.columns) == set(old_df.columns)
 176    if not same_cols:
 177        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
 178        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)
 179
 180        new_types_missing_from_old = {
 181            col: typ
 182            for col, typ in new_df_dtypes.items()
 183            if col not in old_df_dtypes
 184        }
 185        old_types_missing_from_new = {
 186            col: typ
 187            for col, typ in old_df_dtypes.items()
 188            if col not in new_df_dtypes
 189        }
 190        old_df_dtypes.update(new_types_missing_from_old)
 191        new_df_dtypes.update(old_types_missing_from_new)
 192
 193    ### Edge case: two empty lists cast to DFs.
 194    elif len(new_df.columns) == 0:
 195        return new_df
 196
 197    try:
 198        ### Order matters when checking equality.
 199        new_df = new_df[old_df.columns]
 200
 201    except Exception as e:
 202        warn(
 203            "Was not able to cast old columns onto new DataFrame. " +
 204            f"Are both DataFrames the same shape? Error:\n{e}",
 205            stacklevel=3,
 206        )
 207        return new_df[list(new_df_dtypes.keys())]
 208
 209    ### assume the old_df knows what it's doing, even if it's technically wrong.
 210    if dtypes is None:
 211        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}
 212
 213    dtypes = {
 214        col: to_pandas_dtype(typ)
 215        for col, typ in dtypes.items()
 216        if col in new_df_dtypes and col in old_df_dtypes
 217    }
 218    for col, typ in new_df_dtypes.items():
 219        if col not in dtypes:
 220            dtypes[col] = typ
 221
 222    numeric_cols_precisions_scales = {
 223        col: get_numeric_precision_scale(None, typ)
 224        for col, typ in dtypes.items()
 225        if col and str(typ).lower().startswith('numeric')
 226    }
 227
 228    dt_dtypes = {
 229        col: typ
 230        for col, typ in dtypes.items()
 231        if are_dtypes_equal(typ, 'datetime')
 232    }
 233    non_dt_dtypes = {
 234        col: typ
 235        for col, typ in dtypes.items()
 236        if col not in dt_dtypes
 237    }
 238
 239    cast_non_dt_cols = True
 240    try:
 241        new_df = new_df.astype(non_dt_dtypes)
 242        cast_non_dt_cols = False
 243    except Exception as e:
 244        warn(
 245            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
 246        )
 247
 248    cast_dt_cols = True
 249    try:
 250        for col, typ in dt_dtypes.items():
 251            _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
 252            strip_utc = (
 253                _dtypes_col_dtype.startswith('datetime64')
 254                and 'utc' not in _dtypes_col_dtype.lower()
 255            )
 256            if col in old_df.columns:
 257                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
 258            if col in new_df.columns:
 259                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
 260        cast_dt_cols = False
 261    except Exception as e:
 262        warn(f"Could not cast datetime columns:\n{e}")
 263
 264    cast_cols = cast_dt_cols or cast_non_dt_cols
 265
 266    new_numeric_cols_existing = get_numeric_cols(new_df)
 267    old_numeric_cols = get_numeric_cols(old_df)
 268    for col, typ in {k: v for k, v in dtypes.items()}.items():
 269        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
 270            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
 271            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
 272            new_is_numeric = col in new_numeric_cols_existing
 273            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
 274            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
 275            old_is_numeric = col in old_numeric_cols
 276
 277            if (
 278                coerce_mixed_numerics
 279                and
 280                (new_is_float or new_is_int or new_is_numeric)
 281                and
 282                (old_is_float or old_is_int or old_is_numeric)
 283            ):
 284                dtypes[col] = attempt_cast_to_numeric
 285                cast_cols = True
 286                continue
 287
 288            ### Fallback to object if the types don't match.
 289            warn(
 290                f"Detected different types for '{col}' "
 291                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
 292                + "falling back to 'object'..."
 293            )
 294            dtypes[col] = 'object'
 295            cast_cols = True
 296
 297    if cast_cols:
 298        for col, dtype in dtypes.items():
 299            if col in new_df.columns:
 300                try:
 301                    new_df[col] = (
 302                        new_df[col].astype(dtype)
 303                        if not callable(dtype)
 304                        else new_df[col].apply(dtype)
 305                    )
 306                except Exception as e:
 307                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")
 308
 309    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
 310    new_json_cols = get_json_cols(new_df)
 311    old_json_cols = get_json_cols(old_df)
 312    json_cols = set(new_json_cols + old_json_cols)
 313    for json_col in old_json_cols:
 314        old_df[json_col] = old_df[json_col].apply(serializer)
 315    for json_col in new_json_cols:
 316        new_df[json_col] = new_df[json_col].apply(serializer)
 317
 318    new_numeric_cols = get_numeric_cols(new_df)
 319    numeric_cols = set(new_numeric_cols + old_numeric_cols)
 320    for numeric_col in old_numeric_cols:
 321        old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
 322    for numeric_col in new_numeric_cols:
 323        new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)
 324
 325    old_dt_cols = [
 326        col
 327        for col, typ in old_df.dtypes.items()
 328        if are_dtypes_equal(str(typ), 'datetime')
 329    ]
 330    for col in old_dt_cols:
 331        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
 332        strip_utc = (
 333            _dtypes_col_dtype.startswith('datetime64')
 334            and 'utc' not in _dtypes_col_dtype.lower()
 335        )
 336        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
 337
 338    new_dt_cols = [
 339        col
 340        for col, typ in new_df.dtypes.items()
 341        if are_dtypes_equal(str(typ), 'datetime')
 342    ]
 343    for col in new_dt_cols:
 344        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
 345        strip_utc = (
 346            _dtypes_col_dtype.startswith('datetime64')
 347            and 'utc' not in _dtypes_col_dtype.lower()
 348        )
 349        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
 350
 351    old_uuid_cols = get_uuid_cols(old_df)
 352    new_uuid_cols = get_uuid_cols(new_df)
 353    uuid_cols = set(new_uuid_cols + old_uuid_cols)
 354
 355    old_bytes_cols = get_bytes_cols(old_df)
 356    new_bytes_cols = get_bytes_cols(new_df)
 357    bytes_cols = set(new_bytes_cols + old_bytes_cols)
 358
 359    old_geometry_cols = get_geometry_cols(old_df)
 360    new_geometry_cols = get_geometry_cols(new_df)
 361    geometry_cols = set(new_geometry_cols + old_geometry_cols)
 362
 363    na_pattern = r'(?i)^(none|nan|na|nat|natz|<na>)$'
 364    joined_df = merge(
 365        new_df.infer_objects(copy=False).replace(na_pattern, pd.NA, regex=True).fillna(NA),
 366        old_df.infer_objects(copy=False).replace(na_pattern, pd.NA, regex=True).fillna(NA),
 367        how='left',
 368        on=None,
 369        indicator=True,
 370    )
 371    changed_rows_mask = (joined_df['_merge'] == 'left_only')
 372    new_cols = list(new_df_dtypes)
 373    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)
 374
 375    delta_json_cols = get_json_cols(delta_df)
 376    for json_col in json_cols:
 377        if (
 378            json_col in delta_json_cols
 379            or json_col not in delta_df.columns
 380        ):
 381            continue
 382        try:
 383            delta_df[json_col] = delta_df[json_col].apply(
 384                lambda x: (json.loads(x) if isinstance(x, str) else x)
 385            )
 386        except Exception:
 387            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")
 388
 389    delta_numeric_cols = get_numeric_cols(delta_df)
 390    for numeric_col in numeric_cols:
 391        if (
 392            numeric_col in delta_numeric_cols
 393            or numeric_col not in delta_df.columns
 394        ):
 395            continue
 396        try:
 397            delta_df[numeric_col] = delta_df[numeric_col].apply(
 398                functools.partial(
 399                    attempt_cast_to_numeric,
 400                    quantize=True,
 401                    precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
 402                    scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
 403                )
 404            )
 405        except Exception:
 406            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")
 407
 408    delta_uuid_cols = get_uuid_cols(delta_df)
 409    for uuid_col in uuid_cols:
 410        if (
 411            uuid_col in delta_uuid_cols
 412            or uuid_col not in delta_df.columns
 413        ):
 414            continue
 415        try:
 416            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
 417        except Exception:
 418            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")
 419
 420    delta_bytes_cols = get_bytes_cols(delta_df)
 421    for bytes_col in bytes_cols:
 422        if (
 423            bytes_col in delta_bytes_cols
 424            or bytes_col not in delta_df.columns
 425        ):
 426            continue
 427        try:
 428            delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
 429        except Exception:
 430            warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")
 431
 432    delta_geometry_cols = get_geometry_cols(delta_df)
 433    for geometry_col in geometry_cols:
 434        if (
 435            geometry_col in delta_geometry_cols
 436            or geometry_col not in delta_df.columns
 437        ):
 438            continue
 439        try:
 440            delta_df[geometry_col] = attempt_cast_to_geometry(delta_df[geometry_col])
 441        except Exception:
 442            warn(f"Unable to parse geometry column '{geometry_col}':\n{traceback.format_exc()}")
 443
 444    return delta_df
 445
 446
 447def parse_df_datetimes(
 448    df: 'pd.DataFrame',
 449    ignore_cols: Optional[Iterable[str]] = None,
 450    strip_timezone: bool = False,
 451    chunksize: Optional[int] = None,
 452    dtype_backend: str = 'numpy_nullable',
 453    ignore_all: bool = False,
 454    precision_unit: Optional[str] = None,
 455    coerce_utc: bool = True,
 456    debug: bool = False,
 457) -> 'pd.DataFrame':
 458    """
 459    Parse a pandas DataFrame for datetime columns and cast as datetimes.
 460
 461    Parameters
 462    ----------
 463    df: pd.DataFrame
 464        The pandas DataFrame to parse.
 465
 466    ignore_cols: Optional[Iterable[str]], default None
 467        If provided, do not attempt to coerce these columns as datetimes.
 468
 469    strip_timezone: bool, default False
 470        If `True`, remove the UTC `tzinfo` property.
 471
 472    chunksize: Optional[int], default None
 473        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
 474
 475    dtype_backend: str, default 'numpy_nullable'
  476        If `df` is not a DataFrame and a new one needs to be constructed,
 477        use this as the datatypes backend.
 478        Accepted values are 'numpy_nullable' and 'pyarrow'.
 479
 480    ignore_all: bool, default False
 481        If `True`, do not attempt to cast any columns to datetimes.
 482
 483    precision_unit: Optional[str], default None
 484        If provided, enforce the given precision on the coerced datetime columns.
 485
 486    coerce_utc: bool, default True
 487        Coerce the datetime columns to UTC (see `meerschaum.utils.dtypes.to_datetime()`).
 488
 489    debug: bool, default False
 490        Verbosity toggle.
 491
 492    Returns
 493    -------
  494    A new pandas DataFrame with the detected datetime columns
 495    (usually ISO strings) cast as datetimes.
 496
 497    Examples
 498    --------
 499    ```python
 500    >>> import pandas as pd
 501    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
 502    >>> df.dtypes
 503    a    object
 504    dtype: object
 505    >>> df2 = parse_df_datetimes(df)
 506    >>> df2.dtypes
 507    a    datetime64[us, UTC]
 508    dtype: object
 509
 510    ```
 511
 512    """
 513    from meerschaum.utils.packages import import_pandas, attempt_import
 514    from meerschaum.utils.debug import dprint
 515    from meerschaum.utils.warnings import warn
 516    from meerschaum.utils.misc import items_str
 517    from meerschaum.utils.dtypes import to_datetime, MRSM_PD_DTYPES
 518    import traceback
 519
 520    pd = import_pandas()
 521    pandas = attempt_import('pandas')
 522    pd_name = pd.__name__
 523    using_dask = 'dask' in pd_name
 524    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
 525    dask_dataframe = None
 526    if using_dask or df_is_dask:
 527        npartitions = chunksize_to_npartitions(chunksize)
 528        dask_dataframe = attempt_import('dask.dataframe')
 529
 530    ### if df is a dict, build DataFrame
 531    if isinstance(df, pandas.DataFrame):
 532        pdf = df
 533    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
 534        pdf = get_first_valid_dask_partition(df)
 535    else:
 536        if debug:
 537            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")
 538
 539        if using_dask:
 540            if isinstance(df, list):
 541                keys = set()
 542                for doc in df:
 543                    for key in doc:
 544                        keys.add(key)
 545                df = pd.DataFrame.from_dict(
 546                    {
 547                        k: [
 548                            doc.get(k, None)
 549                            for doc in df
 550                        ] for k in keys
 551                    },
 552                    npartitions=npartitions,
 553                )
 554            elif isinstance(df, dict):
 555                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
 556            elif 'pandas.core.frame.DataFrame' in str(type(df)):
 557                df = pd.from_pandas(df, npartitions=npartitions)
 558            else:
 559                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
 560            pandas = attempt_import('pandas')
 561            pdf = get_first_valid_dask_partition(df)
 562
 563        else:
 564            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
 565            pdf = df
 566
 567    ### skip parsing if DataFrame is empty
 568    if len(pdf) == 0:
 569        if debug:
 570            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
 571        return df
 572
 573    ignore_cols = set(
 574        (ignore_cols or []) + [
 575            col
 576            for col, dtype in pdf.dtypes.items() 
 577            if 'datetime' in str(dtype)
 578        ]
 579    )
 580    cols_to_inspect = [
 581        col
 582        for col in pdf.columns
 583        if col not in ignore_cols
 584    ] if not ignore_all else []
 585
 586    if len(cols_to_inspect) == 0:
 587        if debug:
 588            dprint("All columns are ignored, skipping datetime detection...")
 589        return df.infer_objects(copy=False).fillna(pandas.NA)
 590
 591    ### apply regex to columns to determine which are ISO datetimes
 592    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
 593    dt_mask = pdf[cols_to_inspect].astype(str).apply(
 594        lambda s: s.str.match(iso_dt_regex).all()
 595    )
 596
 597    ### list of datetime column names
 598    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
 599    if not datetime_cols:
 600        if debug:
 601            dprint("No columns detected as datetimes, returning...")
 602        return df.infer_objects(copy=False).fillna(pandas.NA)
 603
 604    if debug:
 605        dprint("Converting columns to datetimes: " + str(datetime_cols))
 606
 607    def _parse_to_datetime(x):
 608        return to_datetime(x, precision_unit=precision_unit, coerce_utc=coerce_utc)
 609
 610    try:
 611        if not using_dask:
 612            df[datetime_cols] = df[datetime_cols].apply(_parse_to_datetime)
 613        else:
 614            df[datetime_cols] = df[datetime_cols].apply(
 615                _parse_to_datetime,
 616                utc=True,
 617                axis=1,
 618                meta={
 619                    col: MRSM_PD_DTYPES['datetime']
 620                    for col in datetime_cols
 621                }
 622            )
 623    except Exception:
 624        warn(
 625            f"Unable to apply `to_datetime()` to {items_str(datetime_cols)}:\n"
 626            + f"{traceback.format_exc()}"
 627        )
 628
 629    if strip_timezone:
 630        for dt in datetime_cols:
 631            try:
 632                df[dt] = df[dt].dt.tz_localize(None)
 633            except Exception:
 634                warn(
 635                    f"Unable to convert column '{dt}' to naive datetime:\n"
 636                    + f"{traceback.format_exc()}"
 637                )
 638
 639    return df.fillna(pandas.NA)
 640
 641
 642def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
 643    """
 644    Get the columns which contain unhashable objects from a Pandas DataFrame.
 645
 646    Parameters
 647    ----------
 648    df: pd.DataFrame
 649        The DataFrame which may contain unhashable objects.
 650
 651    Returns
 652    -------
 653    A list of columns.
 654    """
 655    if df is None:
 656        return []
 657    if len(df) == 0:
 658        return []
 659
 660    is_dask = 'dask' in df.__module__
 661    if is_dask:
 662        from meerschaum.utils.packages import attempt_import
 663        pandas = attempt_import('pandas')
 664        df = pandas.DataFrame(get_first_valid_dask_partition(df))
 665    return [
 666        col for col, val in df.iloc[0].items()
 667        if not isinstance(val, Hashable)
 668    ]
 669
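### A minimal usage sketch (output indicative): a list-valued column is
### unhashable and is therefore reported.
###
###     >>> import pandas as pd
###     >>> df = pd.DataFrame([{'a': [1, 2], 'b': 1}])
###     >>> get_unhashable_cols(df)
###     ['a']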
 670
 671def get_json_cols(df: 'pd.DataFrame') -> List[str]:
 672    """
  673    Get the columns which contain dictionaries or lists (JSON-like values) from a Pandas DataFrame.
 674
 675    Parameters
 676    ----------
 677    df: pd.DataFrame
  678        The DataFrame which may contain dictionaries or lists.
 679
 680    Returns
 681    -------
 682    A list of columns to be encoded as JSON.
 683    """
 684    if df is None:
 685        return []
 686
 687    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
 688    if is_dask:
 689        df = get_first_valid_dask_partition(df)
 690
 691    if len(df) == 0:
 692        return []
 693
 694    cols_indices = {
 695        col: df[col].first_valid_index()
 696        for col in df.columns
 697    }
 698    return [
 699        col
 700        for col, ix in cols_indices.items()
 701        if (
 702            ix is not None
 703            and isinstance(df.loc[ix][col], (dict, list))
 704        )
 705    ]
 706
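### A minimal usage sketch (output indicative): dictionaries and lists are
### detected from the first valid row of each column.
###
###     >>> import pandas as pd
###     >>> df = pd.DataFrame([{'a': {'b': 1}, 'c': 1}])
###     >>> get_json_cols(df)
###     ['a']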
 707
 708def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
 709    """
 710    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.
 711
 712    Parameters
 713    ----------
 714    df: pd.DataFrame
 715        The DataFrame which may contain decimal objects.
 716
 717    Returns
 718    -------
 719    A list of columns to treat as numerics.
 720    """
 721    if df is None:
 722        return []
 723    from decimal import Decimal
 724    is_dask = 'dask' in df.__module__
 725    if is_dask:
 726        df = get_first_valid_dask_partition(df)
 727
 728    if len(df) == 0:
 729        return []
 730
 731    cols_indices = {
 732        col: df[col].first_valid_index()
 733        for col in df.columns
 734    }
 735    return [
 736        col
 737        for col, ix in cols_indices.items()
 738        if (
 739            ix is not None
 740            and
 741            isinstance(df.loc[ix][col], Decimal)
 742        )
 743    ]
 744
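### A minimal usage sketch (output indicative): only `decimal.Decimal` values
### count as numeric here, so the plain float column is not reported.
###
###     >>> import pandas as pd
###     >>> from decimal import Decimal
###     >>> df = pd.DataFrame([{'a': Decimal('1.1'), 'b': 1.1}])
###     >>> get_numeric_cols(df)
###     ['a']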
 745
 746def get_bool_cols(df: 'pd.DataFrame') -> List[str]:
 747    """
 748    Get the columns which contain `bool` objects from a Pandas DataFrame.
 749
 750    Parameters
 751    ----------
 752    df: pd.DataFrame
 753        The DataFrame which may contain bools.
 754
 755    Returns
 756    -------
 757    A list of columns to treat as bools.
 758    """
 759    if df is None:
 760        return []
 761
 762    is_dask = 'dask' in df.__module__
 763    if is_dask:
 764        df = get_first_valid_dask_partition(df)
 765
 766    if len(df) == 0:
 767        return []
 768
 769    from meerschaum.utils.dtypes import are_dtypes_equal
 770
 771    return [
 772        col
 773        for col, typ in df.dtypes.items()
 774        if are_dtypes_equal(str(typ), 'bool')
 775    ]
 776
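### A minimal usage sketch (output indicative): detection is by dtype, so an
### integer 0/1 column is not reported as bool.
###
###     >>> import pandas as pd
###     >>> df = pd.DataFrame({'a': [True, False], 'b': [1, 0]})
###     >>> get_bool_cols(df)
###     ['a']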
 777
 778def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
 779    """
 780    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.
 781
 782    Parameters
 783    ----------
 784    df: pd.DataFrame
 785        The DataFrame which may contain UUID objects.
 786
 787    Returns
 788    -------
 789    A list of columns to treat as UUIDs.
 790    """
 791    if df is None:
 792        return []
 793    from uuid import UUID
 794    is_dask = 'dask' in df.__module__
 795    if is_dask:
 796        df = get_first_valid_dask_partition(df)
 797
 798    if len(df) == 0:
 799        return []
 800
 801    cols_indices = {
 802        col: df[col].first_valid_index()
 803        for col in df.columns
 804    }
 805    return [
 806        col
 807        for col, ix in cols_indices.items()
 808        if (
 809            ix is not None
 810            and
 811            isinstance(df.loc[ix][col], UUID)
 812        )
 813    ]
 814
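### A minimal usage sketch (output indicative):
###
###     >>> import pandas as pd
###     >>> from uuid import uuid4
###     >>> df = pd.DataFrame([{'a': uuid4(), 'b': 1}])
###     >>> get_uuid_cols(df)
###     ['a']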
 815
 816def get_datetime_cols(
 817    df: 'pd.DataFrame',
 818    timezone_aware: bool = True,
 819    timezone_naive: bool = True,
 820    with_tz_precision: bool = False,
 821) -> Union[List[str], Dict[str, Tuple[Union[str, None], str]]]:
 822    """
 823    Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame.
 824
 825    Parameters
 826    ----------
 827    df: pd.DataFrame
 828        The DataFrame which may contain `datetime` or `Timestamp` objects.
 829
 830    timezone_aware: bool, default True
 831        If `True`, include timezone-aware datetime columns.
 832
 833    timezone_naive: bool, default True
 834        If `True`, include timezone-naive datetime columns.
 835
 836    with_tz_precision: bool, default False
 837        If `True`, return a dictionary mapping column names to tuples in the form
 838        `(timezone, precision)`.
 839
 840    Returns
 841    -------
 842    A list of columns to treat as datetimes, or a dictionary of columns to tz+precision tuples
 843    (if `with_tz_precision` is `True`).
 844    """
 845    if not timezone_aware and not timezone_naive:
 846        raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.")
 847
 848    if df is None:
 849        return [] if not with_tz_precision else {}
 850
 851    from datetime import datetime
 852    from meerschaum.utils.dtypes import are_dtypes_equal, MRSM_PRECISION_UNITS_ALIASES
 853    is_dask = 'dask' in df.__module__
 854    if is_dask:
 855        df = get_first_valid_dask_partition(df)
 856   
 857    def get_tz_precision_from_dtype(dtype: str) -> Tuple[Union[str, None], str]:
 858        """
 859        Extract the tz + precision tuple from a dtype string.
 860        """
 861        meta_str = dtype.split('[', maxsplit=1)[-1].rstrip(']').replace(' ', '')
 862        tz = (
 863            None
 864            if ',' not in meta_str
 865            else meta_str.split(',', maxsplit=1)[-1]
 866        )
 867        precision_abbreviation = (
 868            meta_str
 869            if ',' not in meta_str
 870            else meta_str.split(',')[0]
 871        )
 872        precision = MRSM_PRECISION_UNITS_ALIASES[precision_abbreviation]
 873        return tz, precision
 874
 875    def get_tz_precision_from_datetime(dt: datetime) -> Tuple[Union[str, None], str]:
 876        """
 877        Return the tz + precision tuple from a Python datetime object.
 878        """
 879        return dt.tzname(), 'microsecond'
 880
 881    known_dt_cols_types = {
 882        col: str(typ)
 883        for col, typ in df.dtypes.items()
 884        if are_dtypes_equal('datetime', str(typ))
 885    }
 886 
 887    known_dt_cols_tuples = {
 888        col: get_tz_precision_from_dtype(typ)
 889        for col, typ in known_dt_cols_types.items()
 890    }
 891
 892    if len(df) == 0:
 893        return (
 894            list(known_dt_cols_types)
 895            if not with_tz_precision
 896            else known_dt_cols_tuples
 897        )
 898
 899    cols_indices = {
 900        col: df[col].first_valid_index()
 901        for col in df.columns
 902        if col not in known_dt_cols_types
 903    }
 904    pydt_cols_tuples = {
 905        col: get_tz_precision_from_datetime(sample_val)
 906        for col, ix in cols_indices.items()
 907        if (
 908            ix is not None
 909            and
 910            isinstance((sample_val := df.loc[ix][col]), datetime)
 911        )
 912    }
 913
 914    dt_cols_tuples = {
 915        **known_dt_cols_tuples,
 916        **pydt_cols_tuples
 917    }
 918
 919    all_dt_cols_tuples = {
 920        col: dt_cols_tuples[col]
 921        for col in df.columns
 922        if col in dt_cols_tuples
 923    }
 924    if timezone_aware and timezone_naive:
 925        return (
 926            list(all_dt_cols_tuples)
 927            if not with_tz_precision
 928            else all_dt_cols_tuples
 929        )
 930
 931    known_timezone_aware_dt_cols = [
 932        col
 933        for col in known_dt_cols_types
 934        if getattr(df[col], 'tz', None) is not None
 935    ]
 936    timezone_aware_pydt_cols_tuples = {
 937        col: (tz, precision)
 938        for col, (tz, precision) in pydt_cols_tuples.items()
 939        if df.loc[cols_indices[col]][col].tzinfo is not None
 940    }
 941    timezone_aware_dt_cols_set = set(
 942        known_timezone_aware_dt_cols + list(timezone_aware_pydt_cols_tuples)
 943    )
 944    timezone_aware_cols_tuples = {
 945        col: (tz, precision)
 946        for col, (tz, precision) in all_dt_cols_tuples.items()
 947        if col in timezone_aware_dt_cols_set
 948    }
 949    timezone_naive_cols_tuples = {
 950        col: (tz, precision)
 951        for col, (tz, precision) in all_dt_cols_tuples.items()
 952        if col not in timezone_aware_dt_cols_set
 953    }
 954
 955    if timezone_aware:
 956        return (
 957            list(timezone_aware_cols_tuples)
 958            if not with_tz_precision
 959            else timezone_aware_cols_tuples
 960        )
 961
 962    return (
 963        list(timezone_naive_cols_tuples)
 964        if not with_tz_precision
 965        else timezone_naive_cols_tuples
 966    )
 967
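### A minimal usage sketch (outputs indicative; the 'microsecond' precision
### assumes pandas infers microsecond-precision datetime64 columns, as in the
### examples elsewhere in this module, while older pandas may infer nanoseconds):
###
###     >>> import pandas as pd
###     >>> from datetime import datetime, timezone
###     >>> df = pd.DataFrame({
###     ...     'aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)],
###     ...     'naive': [datetime(2025, 1, 1)],
###     ... })
###     >>> get_datetime_cols(df)
###     ['aware', 'naive']
###     >>> get_datetime_cols(df, with_tz_precision=True)
###     {'aware': ('UTC', 'microsecond'), 'naive': (None, 'microsecond')}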
 968
 969def get_datetime_cols_types(df: 'pd.DataFrame') -> Dict[str, str]:
 970    """
 971    Return a dictionary mapping datetime columns to specific dtype strings.
 972
 973    Parameters
 974    ----------
 975    df: pd.DataFrame
 976        The DataFrame which may contain datetime columns.
 977
 978    Returns
 979    -------
 980    A dictionary mapping the datetime columns' names to dtype strings
 981    (containing timezone and precision metadata).
 982
 983    Examples
 984    --------
 985    >>> from datetime import datetime, timezone
 986    >>> import pandas as pd
 987    >>> df = pd.DataFrame({'dt_tz_aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)]})
 988    >>> get_datetime_cols_types(df)
 989    {'dt_tz_aware': 'datetime64[us, UTC]'}
 990    >>> df = pd.DataFrame({'distant_dt': [datetime(1, 1, 1)]})
 991    >>> get_datetime_cols_types(df)
 992    {'distant_dt': 'datetime64[us]'}
 993    >>> df = pd.DataFrame({'dt_second': [datetime(2025, 1, 1)]})
 994    >>> df['dt_second'] = df['dt_second'].astype('datetime64[s]')
 995    >>> get_datetime_cols_types(df)
 996    {'dt_second': 'datetime64[s]'}
 997    """
 998    from meerschaum.utils.dtypes import MRSM_PRECISION_UNITS_ABBREVIATIONS
 999    dt_cols_tuples = get_datetime_cols(df, with_tz_precision=True)
1000    if not dt_cols_tuples:
1001        return {}
1002
1003    return {
1004        col: (
1005            f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}]"
1006            if tz is None
1007            else f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}, {tz}]"
1008        )
1009        for col, (tz, precision) in dt_cols_tuples.items()
1010    }
1011
1012
1013def get_date_cols(df: 'pd.DataFrame') -> List[str]:
1014    """
1015    Get the `date` columns from a Pandas DataFrame.
1016
1017    Parameters
1018    ----------
1019    df: pd.DataFrame
1020        The DataFrame which may contain dates.
1021
1022    Returns
1023    -------
1024    A list of columns to treat as dates.
1025    """
1026    from meerschaum.utils.dtypes import are_dtypes_equal
1027    if df is None:
1028        return []
1029
1030    is_dask = 'dask' in df.__module__
1031    if is_dask:
1032        df = get_first_valid_dask_partition(df)
1033
1034    known_date_cols = [
1035        col
1036        for col, typ in df.dtypes.items()
1037        if are_dtypes_equal(typ, 'date')
1038    ]
1039
1040    if len(df) == 0:
1041        return known_date_cols
1042
1043    cols_indices = {
1044        col: df[col].first_valid_index()
1045        for col in df.columns
1046        if col not in known_date_cols
1047    }
1048    object_date_cols = [
1049        col
1050        for col, ix in cols_indices.items()
1051        if (
1052            ix is not None
1053            and isinstance(df.loc[ix][col], date)
1054        )
1055    ]
1056
1057    all_date_cols = set(known_date_cols + object_date_cols)
1058
1059    return [
1060        col
1061        for col in df.columns
1062        if col in all_date_cols
1063    ]
1064
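### A minimal usage sketch (output indicative): `datetime.date` values remain
### in object-dtype columns, so detection samples the first valid row.
###
###     >>> import pandas as pd
###     >>> from datetime import date
###     >>> df = pd.DataFrame([{'a': date(2025, 1, 1), 'b': 1}])
###     >>> get_date_cols(df)
###     ['a']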
1065
1066def get_bytes_cols(df: 'pd.DataFrame') -> List[str]:
1067    """
1068    Get the columns which contain bytes strings from a Pandas DataFrame.
1069
1070    Parameters
1071    ----------
1072    df: pd.DataFrame
1073        The DataFrame which may contain bytes strings.
1074
1075    Returns
1076    -------
1077    A list of columns to treat as bytes.
1078    """
1079    if df is None:
1080        return []
1081
1082    is_dask = 'dask' in df.__module__
1083    if is_dask:
1084        df = get_first_valid_dask_partition(df)
1085
1086    known_bytes_cols = [
1087        col
1088        for col, typ in df.dtypes.items()
1089        if str(typ) == 'binary[pyarrow]'
1090    ]
1091
1092    if len(df) == 0:
1093        return known_bytes_cols
1094
1095    cols_indices = {
1096        col: df[col].first_valid_index()
1097        for col in df.columns
1098        if col not in known_bytes_cols
1099    }
1100    object_bytes_cols = [
1101        col
1102        for col, ix in cols_indices.items()
1103        if (
1104            ix is not None
1105            and isinstance(df.loc[ix][col], bytes)
1106        )
1107    ]
1108
1109    all_bytes_cols = set(known_bytes_cols + object_bytes_cols)
1110
1111    return [
1112        col
1113        for col in df.columns
1114        if col in all_bytes_cols
1115    ]
1116
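### A minimal usage sketch (output indicative):
###
###     >>> import pandas as pd
###     >>> df = pd.DataFrame([{'a': b'\x00\x01', 'b': 'text'}])
###     >>> get_bytes_cols(df)
###     ['a']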
1117
1118def get_geometry_cols(
1119    df: 'pd.DataFrame',
1120    with_types_srids: bool = False,
1121) -> Union[List[str], Dict[str, Any]]:
1122    """
1123    Get the columns which contain shapely objects from a Pandas DataFrame.
1124
1125    Parameters
1126    ----------
1127    df: pd.DataFrame
1128        The DataFrame which may contain shapely geometry objects.
1129
1130    with_types_srids: bool, default False
1131        If `True`, return a dictionary mapping columns to geometry types and SRIDs.
1132
1133    Returns
1134    -------
1135    A list of columns to treat as `geometry`.
1136    If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID).
1137    """
1138    if df is None:
1139        return [] if not with_types_srids else {}
1140
1141    is_dask = 'dask' in df.__module__
1142    if is_dask:
1143        df = get_first_valid_dask_partition(df)
1144
1145    if len(df) == 0:
1146        return [] if not with_types_srids else {}
1147
1148    cols_indices = {
1149        col: df[col].first_valid_index()
1150        for col in df.columns
1151    }
1152    geo_cols = [
1153        col
1154        for col, ix in cols_indices.items()
1155        if (
1156            ix is not None
1157            and
1158            'shapely' in str(type(df.loc[ix][col]))
1159        )
1160    ]
1161    if not with_types_srids:
1162        return geo_cols
1163
1164    gpd = mrsm.attempt_import('geopandas', lazy=False)
1165    geo_cols_types_srids = {}
1166    for col in geo_cols:
1167        try:
1168            sample_geo_series = gpd.GeoSeries(df[col], crs=None)
1169            geometry_types = {
1170                geom.geom_type
1171                for geom in sample_geo_series
1172                if hasattr(geom, 'geom_type')
1173            }
1174            geometry_has_z = any(getattr(geom, 'has_z', False) for geom in sample_geo_series)
1175            srid = (
1176                (
1177                    sample_geo_series.crs.sub_crs_list[0].to_epsg()
1178                    if sample_geo_series.crs.is_compound
1179                    else sample_geo_series.crs.to_epsg()
1180                )
1181                if sample_geo_series.crs
1182                else 0
1183            )
1184            geometry_type = list(geometry_types)[0] if len(geometry_types) == 1 else 'geometry'
1185            if geometry_type != 'geometry' and geometry_has_z:
1186                geometry_type = geometry_type + 'Z'
1187        except Exception:
1188            srid = 0
1189            geometry_type = 'geometry'
1190        geo_cols_types_srids[col] = (geometry_type, srid)
1191
1192    return geo_cols_types_srids
1193
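### A minimal usage sketch (outputs indicative; assumes shapely 2.x and
### geopandas are installed). A CRS-less geometry column maps to SRID 0.
###
###     >>> import pandas as pd
###     >>> import shapely
###     >>> df = pd.DataFrame([{'geom': shapely.Point(1, 2)}])
###     >>> get_geometry_cols(df)
###     ['geom']
###     >>> get_geometry_cols(df, with_types_srids=True)
###     {'geom': ('Point', 0)}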
1194
1195def get_geometry_cols_types(df: 'pd.DataFrame') -> Dict[str, str]:
1196    """
1197    Return a dtypes dictionary mapping geometry columns to dtype strings encoding the geometry type and SRID.
1198    """
1199    geometry_cols_types_srids = get_geometry_cols(df, with_types_srids=True)
1200    new_cols_types = {}
1201    for col, (geometry_type, srid) in geometry_cols_types_srids.items():
1202        new_dtype = "geometry"
1203        modifier = ""
1204        if not srid and geometry_type.lower() == 'geometry':
1205            new_cols_types[col] = new_dtype
1206            continue
1207
1208        modifier = "["
1209        if geometry_type.lower() != 'geometry':
1210            modifier += f"{geometry_type}"
1211
1212        if srid:
1213            if modifier != '[':
1214                modifier += ", "
1215            modifier += f"{srid}"
1216        modifier += "]"
1217        new_cols_types[col] = f"{new_dtype}{modifier}"
1218    return new_cols_types
1219
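### How the (type, SRID) tuples from `get_geometry_cols` render into dtype
### strings, per the branches above (illustrative):
###
###     ('geometry', 0)    -> 'geometry'
###     ('Point', 0)       -> 'geometry[Point]'
###     ('geometry', 4326) -> 'geometry[4326]'
###     ('Point', 4326)    -> 'geometry[Point, 4326]'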
1220
1221def get_special_cols(df: 'pd.DataFrame') -> Dict[str, str]:
1222    """
1223    Return a dtypes dictionary mapping special columns to their dtypes.
1224    """
1225    return {
1226        **{col: 'json' for col in get_json_cols(df)},
1227        **{col: 'uuid' for col in get_uuid_cols(df)},
1228        **{col: 'bytes' for col in get_bytes_cols(df)},
1229        **{col: 'bool' for col in get_bool_cols(df)},
1230        **{col: 'numeric' for col in get_numeric_cols(df)},
1231        **{col: 'date' for col in get_date_cols(df)},
1232        **get_datetime_cols_types(df),
1233        **get_geometry_cols_types(df),
1234    }
1235
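### A minimal usage sketch (output indicative): only "special" columns appear
### in the result, so the plain integer column is omitted.
###
###     >>> import pandas as pd
###     >>> from decimal import Decimal
###     >>> df = pd.DataFrame([{'a': {'b': 1}, 'c': Decimal('2.5'), 'd': 1}])
###     >>> get_special_cols(df)
###     {'a': 'json', 'c': 'numeric'}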
1236
1237def enforce_dtypes(
1238    df: 'pd.DataFrame',
1239    dtypes: Dict[str, str],
1240    explicit_dtypes: Optional[Dict[str, str]] = None,
1241    safe_copy: bool = True,
1242    coerce_numeric: bool = False,
1243    coerce_timezone: bool = True,
1244    strip_timezone: bool = False,
1245    debug: bool = False,
1246) -> 'pd.DataFrame':
1247    """
1248    Enforce the `dtypes` dictionary on a DataFrame.
1249
1250    Parameters
1251    ----------
1252    df: pd.DataFrame
1253        The DataFrame on which to enforce dtypes.
1254
1255    dtypes: Dict[str, str]
1256        The data types to attempt to enforce on the DataFrame.
1257
1258    explicit_dtypes: Optional[Dict[str, str]], default None
1259        If provided, automatic dtype coercion will respect explicitly configured
1260        dtypes (`int`, `float`, `numeric`).
1261
1262    safe_copy: bool, default True
1263        If `True`, create a copy before comparing and modifying the dataframes.
1264        Setting to `False` may mutate the DataFrames.
1265        See `meerschaum.utils.dataframe.filter_unseen_df`.
1266
1267    coerce_numeric: bool, default False
1268        If `True`, convert float and int collisions to numeric.
1269
1270    coerce_timezone: bool, default True
1271        If `True`, convert datetimes to UTC.
1272
1273    strip_timezone: bool, default False
1274        If `coerce_timezone` and `strip_timezone` are `True`,
1275        remove timezone information from datetimes.
1276
1277    debug: bool, default False
1278        Verbosity toggle.
1279
1280    Returns
1281    -------
1282    The Pandas DataFrame with the types enforced.
1283    """
1284    import json
1285    import functools
1286    from meerschaum.utils.debug import dprint
1287    from meerschaum.utils.formatting import pprint
1288    from meerschaum.utils.dtypes import (
1289        are_dtypes_equal,
1290        to_pandas_dtype,
1291        is_dtype_numeric,
1292        attempt_cast_to_numeric,
1293        attempt_cast_to_uuid,
1294        attempt_cast_to_bytes,
1295        attempt_cast_to_geometry,
1296        coerce_timezone as _coerce_timezone,
1297        get_geometry_type_srid,
1298    )
1299    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
1300    pandas = mrsm.attempt_import('pandas')
1301    is_dask = 'dask' in df.__module__
1302    if safe_copy:
1303        df = df.copy()
1304    if len(df.columns) == 0:
1305        if debug:
1306            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
1307        return df
1308
1309    explicit_dtypes = explicit_dtypes or {}
1310    pipe_pandas_dtypes = {
1311        col: to_pandas_dtype(typ)
1312        for col, typ in dtypes.items()
1313    }
1314    json_cols = [
1315        col
1316        for col, typ in dtypes.items()
1317        if typ == 'json'
1318    ]
1319    numeric_cols = [
1320        col
1321        for col, typ in dtypes.items()
1322        if typ.startswith('numeric')
1323    ]
1324    geometry_cols_types_srids = {
1325        col: get_geometry_type_srid(typ, default_srid=0)
1326        for col, typ in dtypes.items()
1327        if typ.startswith('geometry') or typ.startswith('geography')
1328    }
1329    uuid_cols = [
1330        col
1331        for col, typ in dtypes.items()
1332        if typ == 'uuid'
1333    ]
1334    bytes_cols = [
1335        col
1336        for col, typ in dtypes.items()
1337        if typ == 'bytes'
1338    ]
1339    datetime_cols = [
1340        col
1341        for col, typ in dtypes.items()
1342        if are_dtypes_equal(typ, 'datetime')
1343    ]
1344    df_numeric_cols = get_numeric_cols(df)
1345    if debug:
1346        dprint("Desired data types:")
1347        pprint(dtypes)
1348        dprint("Data types for incoming DataFrame:")
1349        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})
1350
1351    if json_cols and len(df) > 0:
1352        if debug:
1353            dprint(f"Checking columns for JSON encoding: {json_cols}")
1354        for col in json_cols:
1355            if col in df.columns:
1356                try:
1357                    df[col] = df[col].apply(
1358                        (
1359                            lambda x: (
1360                                json.loads(x)
1361                                if isinstance(x, str)
1362                                else x
1363                            )
1364                        )
1365                    )
1366                except Exception as e:
1367                    if debug:
1368                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")
1369
1370    if numeric_cols:
1371        if debug:
1372            dprint(f"Checking for numerics: {numeric_cols}")
1373        for col in numeric_cols:
1374            precision, scale = get_numeric_precision_scale(None, dtypes.get(col, ''))
1375            if col in df.columns:
1376                try:
1377                    df[col] = df[col].apply(
1378                        functools.partial(
1379                            attempt_cast_to_numeric,
1380                            quantize=True,
1381                            precision=precision,
1382                            scale=scale,
1383                        )
1384                    )
1385                except Exception as e:
1386                    if debug:
1387                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")
1388
1389    if uuid_cols:
1390        if debug:
1391            dprint(f"Checking for UUIDs: {uuid_cols}")
1392        for col in uuid_cols:
1393            if col in df.columns:
1394                try:
1395                    df[col] = df[col].apply(attempt_cast_to_uuid)
1396                except Exception as e:
1397                    if debug:
1398                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")
1399
1400    if bytes_cols:
1401        if debug:
1402            dprint(f"Checking for bytes: {bytes_cols}")
1403        for col in bytes_cols:
1404            if col in df.columns:
1405                try:
1406                    df[col] = df[col].apply(attempt_cast_to_bytes)
1407                except Exception as e:
1408                    if debug:
1409                        dprint(f"Unable to parse column '{col}' as bytes:\n{e}")
1410
1411    if datetime_cols and coerce_timezone:
1412        if debug:
1413            dprint(f"Checking for datetime conversion: {datetime_cols}")
1414        for col in datetime_cols:
1415            if col in df.columns:
1416                if not strip_timezone and 'utc' in str(df.dtypes[col]).lower():
1417                    if debug:
1418                        dprint(f"Skip UTC coercion for column '{col}' ({str(df[col].dtype)}).")
1419                    continue
1420                if strip_timezone and ',' not in str(df.dtypes[col]):
1421                    if debug:
1422                        dprint(
1423                            f"Skip UTC coercion (stripped) for column '{col}' "
1424                            f"({str(df[col].dtype)})."
1425                        )
1426                    continue
1427
1428                if debug:
1429                    dprint(
1430                        f"Data type for column '{col}' before timezone coercion: "
1431                        f"{str(df[col].dtype)}"
1432                    )
1433
1434                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)
1435                if debug:
1436                    dprint(
1437                        f"Data type for column '{col}' after timezone coercion: "
1438                        f"{str(df[col].dtype)}"
1439                    )
1440
1441    if geometry_cols_types_srids:
1442        geopandas = mrsm.attempt_import('geopandas')
1443        if debug:
1444            dprint(f"Checking for geometry: {list(geometry_cols_types_srids)}")
1445        parsed_geom_cols = []
1446        for col in geometry_cols_types_srids:
1447            if col not in df.columns:
1448                continue
1449            try:
1450                df[col] = attempt_cast_to_geometry(df[col])
1451                parsed_geom_cols.append(col)
1452            except Exception as e:
1453                import traceback
1454                traceback.print_exc()
1455                if debug:
1456                    dprint(f"Unable to parse column '{col}' as geometry:\n{e}")
1457
1458        if parsed_geom_cols:
1459            if debug:
1460                dprint(f"Converting to GeoDataFrame (geometry column: '{parsed_geom_cols[0]}')...")
1461            try:
1462                _, default_srid = geometry_cols_types_srids[parsed_geom_cols[0]]
1463                df = geopandas.GeoDataFrame(df, geometry=parsed_geom_cols[0], crs=default_srid)
1464                for col, (_, srid) in geometry_cols_types_srids.items():
1465                    if srid:
1466                        if debug:
1467                            dprint(f"Setting '{col}' to SRID '{srid}'...")
1468                        _ = df[col].set_crs(srid)
1469                if parsed_geom_cols[0] not in df.columns:
1470                    df.rename_geometry(parsed_geom_cols[0], inplace=True)
1471            except (ValueError, TypeError):
1472                import traceback
1473                dprint(f"Failed to cast to GeoDataFrame:\n{traceback.format_exc()}")
1474
1475    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
1476    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1477        if debug:
1478            dprint("Data types match. Exiting enforcement...")
1479        return df
1480
1481    common_dtypes = {}
1482    common_diff_dtypes = {}
1483    for col, typ in pipe_pandas_dtypes.items():
1484        if col in df_dtypes:
1485            common_dtypes[col] = typ
1486            if not are_dtypes_equal(typ, df_dtypes[col]):
1487                common_diff_dtypes[col] = df_dtypes[col]
1488
1489    if debug:
1490        dprint("Common columns with different dtypes:")
1491        pprint(common_diff_dtypes)
1492
1493    detected_dt_cols = {}
1494    for col, typ in common_diff_dtypes.items():
1495        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
1496            df_dtypes[col] = typ
1497            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
1498    for col in detected_dt_cols:
1499        del common_diff_dtypes[col]
1500
1501    if debug:
1502        dprint("Common columns with different dtypes (after dates):")
1503        pprint(common_diff_dtypes)
1504
1505    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1506        if debug:
1507            dprint(
1508                "The incoming DataFrame has mostly the same types, skipping enforcement. "
1509                + "The only detected difference was in the following datetime columns:"
1510            )
1511            pprint(detected_dt_cols)
1512        return df
1513
1514    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
1515        previous_typ = common_dtypes[col]
1516        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
1517        explicitly_float = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'float')
1518        explicitly_int = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'int')
1519        explicitly_numeric = explicit_dtypes.get(col, 'object').startswith('numeric')
1520        all_nan = (
1521            df[col].isnull().all()
1522            if mixed_numeric_types and coerce_numeric and not (explicitly_float or explicitly_int)
1523            else None
1524        )
1525        cast_to_numeric = explicitly_numeric or (
1526            (
1527                col in df_numeric_cols
1528                or (
1529                    mixed_numeric_types
1530                    and not (explicitly_float or explicitly_int)
1531                    and not all_nan
1532                    and coerce_numeric
1533                )
1534            )
1535        )
1536
1537        if debug and (explicitly_numeric or df_numeric_cols or mixed_numeric_types):
1538            from meerschaum.utils.formatting import make_header
1539            msg = (
1540                make_header(f"Coercing column '{col}' to numeric:", left_pad=0)
1541                + "\n"
1542                + f"  Previous type: {previous_typ}\n"
1543                + f"  Current type: {typ if col not in df_numeric_cols else 'Decimal'}"
1544                + ("\n  Column is explicitly numeric." if explicitly_numeric else "")
1545            ) if cast_to_numeric else (
1546                f"Will not coerce column '{col}' to numeric.\n"
1547                f"  Numeric columns in dataframe: {df_numeric_cols}\n"
1548                f"  Mixed numeric types: {mixed_numeric_types}\n"
1549                f"  Explicitly float: {explicitly_float}\n"
1550                f"  Explicitly int: {explicitly_int}\n"
1551                f"  All NaN: {all_nan}\n"
1552                f"  Coerce numeric: {coerce_numeric}"
1553            )
1554            dprint(msg)
1555
1556        if cast_to_numeric:
1557            common_dtypes[col] = attempt_cast_to_numeric
1558            common_diff_dtypes[col] = attempt_cast_to_numeric
1559
1560    for d in common_diff_dtypes:
1561        t = common_dtypes[d]
1562        if debug:
1563            dprint(f"Casting column {d} to dtype {t}.")
1564        try:
1565            df[d] = (
1566                df[d].apply(t)
1567                if callable(t)
1568                else df[d].astype(t)
1569            )
1570        except Exception as e:
1571            if debug:
1572                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}\ndf:\n{df}")
1573            if 'int' in str(t).lower():
1574                try:
1575                    df[d] = df[d].astype('float64').astype(t)
1576                except Exception:
1577                    if debug:
1578                        dprint(f"Was unable to convert to float then {t}.")
1579    return df
1580
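### A minimal usage sketch (outputs indicative; assumes `to_pandas_dtype('json')`
### resolves to `object`): a float column is cast to the nullable `Int64` dtype,
### and a JSON column is deserialized from strings.
###
###     >>> import pandas as pd
###     >>> df = pd.DataFrame({'a': [1.0, 2.0], 'b': ['{"x": 1}', '{"x": 2}']})
###     >>> df = enforce_dtypes(df, {'a': 'Int64', 'b': 'json'})
###     >>> str(df.dtypes['a'])
###     'Int64'
###     >>> df['b'][0]
###     {'x': 1}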
1581
1582def get_datetime_bound_from_df(
1583    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1584    datetime_column: str,
1585    minimum: bool = True,
1586) -> Union[int, datetime, None]:
1587    """
1588    Return the minimum or maximum datetime (or integer) from a DataFrame.
1589
1590    Parameters
1591    ----------
1592    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1593        The DataFrame, list, or dict which contains the range axis.
1594
1595    datetime_column: str
1596        The name of the datetime (or int) column.
1597
1598    minimum: bool
1599        Whether to return the minimum (default) or maximum value.
1600
1601    Returns
1602    -------
1603    The minimum or maximum datetime value in the dataframe, or `None`.
1604    """
1605    from meerschaum.utils.dtypes import to_datetime, value_is_null
1606
1607    if df is None:
1608        return None
1609    if not datetime_column:
1610        return None
1611
1612    def compare(a, b):
1613        if a is None:
1614            return b
1615        if b is None:
1616            return a
1617        if minimum:
1618            return a if a < b else b
1619        return a if a > b else b
1620
1621    if isinstance(df, list):
1622        if len(df) == 0:
1623            return None
1624        best_yet = df[0].get(datetime_column, None)
1625        for doc in df:
1626            val = doc.get(datetime_column, None)
1627            best_yet = compare(best_yet, val)
1628        return best_yet
1629
1630    if isinstance(df, dict):
1631        if datetime_column not in df:
1632            return None
1633        best_yet = df[datetime_column][0]
1634        for val in df[datetime_column]:
1635            best_yet = compare(best_yet, val)
1636        return best_yet
1637
1638    if 'DataFrame' in str(type(df)):
1639        from meerschaum.utils.dtypes import are_dtypes_equal
1640        pandas = mrsm.attempt_import('pandas')
1641        is_dask = 'dask' in df.__module__
1642
1643        if datetime_column not in df.columns:
1644            return None
1645
1646        try:
1647            dt_val = (
1648                df[datetime_column].min(skipna=True)
1649                if minimum
1650                else df[datetime_column].max(skipna=True)
1651            )
1652        except Exception:
1653            dt_val = pandas.NA
1654        if is_dask and dt_val is not None and dt_val is not pandas.NA:
1655            dt_val = dt_val.compute()
1656
1657        return (
1658            to_datetime(dt_val, as_pydatetime=True)
1659            if are_dtypes_equal(str(type(dt_val)), 'datetime')
1660            else (dt_val if not value_is_null(dt_val) else None)
1661        )
1662
1663    return None
1664
1665
1666def get_unique_index_values(
1667    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1668    indices: List[str],
1669) -> Dict[str, List[Any]]:
1670    """
1671    Return a dictionary of the unique index values in a DataFrame.
1672
1673    Parameters
1674    ----------
1675    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1676        The dataframe (or list or dict) which contains index values.
1677
1678    indices: List[str]
1679        The list of index columns.
1680
1681    Returns
1682    -------
1683    A dictionary mapping indices to unique values.
1684    """
1685    if df is None:
1686        return {}
1687    if 'dataframe' in str(type(df)).lower():
1688        pandas = mrsm.attempt_import('pandas')
1689        return {
1690            col: list({
1691                (val if val is not pandas.NA else None)
1692                for val in df[col].unique()
1693            })
1694            for col in indices
1695            if col in df.columns
1696        }
1697
1698    unique_indices = defaultdict(lambda: set())
1699    if isinstance(df, list):
1700        for doc in df:
1701            for index in indices:
1702                if index in doc:
1703                    unique_indices[index].add(doc[index])
1704
1705    elif isinstance(df, dict):
1706        for index in indices:
1707            if index in df:
1708                unique_indices[index] = unique_indices[index].union(set(df[index]))
1709
1710    return {key: list(val) for key, val in unique_indices.items()}
1711
1712
1713def df_is_chunk_generator(df: Any) -> bool:
1714    """
1715    Determine whether to treat `df` as a chunk generator.
1716
1717    Note this should only be used in a context where generators are expected,
1718    as it will return `True` for any iterable.
1719
1720    Parameters
1721    ----------
1722    df: The DataFrame or chunk generator to evaluate.
1723
1724    Returns
1725    -------
1726    A `bool` indicating whether to treat `df` as a generator.
1727    """
1728    return (
1729        not isinstance(df, (dict, list, str))
1730        and 'DataFrame' not in str(type(df))
1731        and isinstance(df, (Generator, Iterable, Iterator))
1732    )
1733
1734
1735def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
1736    """
1737    Return the Dask `npartitions` value for a given `chunksize`.
1738    """
1739    if chunksize == -1:
1740        from meerschaum.config import get_config
1741        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
1742    if chunksize is None:
1743        return 1
1744    return -1 * chunksize
1745
1746
1747def df_from_literal(
1748    pipe: Optional[mrsm.Pipe] = None,
1749    literal: Optional[str] = None,
1750    debug: bool = False
1751) -> 'pd.DataFrame':
1752    """
1753    Construct a dataframe from a literal value, using the pipe's datetime and value column names.
1754
1755    Parameters
1756    ----------
1757    pipe: Optional['meerschaum.Pipe'], default None
1758        The pipe which will consume the literal value.
1759
1760    Returns
1761    -------
1762    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
1763    and the literal as the value.
1764    """
1765    from meerschaum.utils.packages import import_pandas
1766    from meerschaum.utils.warnings import error, warn
1767    from meerschaum.utils.debug import dprint
1768    from meerschaum.utils.dtypes import get_current_timestamp
1769
1770    if pipe is None or literal is None:
1771        error("Please provide a Pipe and a literal value")
1772
1773    dt_col = pipe.columns.get(
1774        'datetime',
1775        mrsm.get_config('pipes', 'autotime', 'column_name_if_datetime_missing')
1776    )
1777    val_col = pipe.get_val_column(debug=debug)
1778
1779    val = literal
1780    if isinstance(literal, str):
1781        if debug:
1782            dprint(f"Received literal string: '{literal}'")
1783        import ast
1784        try:
1785            val = ast.literal_eval(literal)
1786        except Exception:
1787            warn(
1788                "Failed to parse value from string:\n" + f"{literal}" +
1789                "\n\nWill cast as a string instead."
1790            )
1791            val = literal
1792
1793    now = get_current_timestamp(pipe.precision)
1794    pd = import_pandas()
1795    return pd.DataFrame({dt_col: [now], val_col: [val]})
1796
1797
1798def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
1799    """
1800    Return the first valid Dask DataFrame partition (if possible).
1801    """
1802    pdf = None
1803    for partition in ddf.partitions:
1804        try:
1805            pdf = partition.compute()
1806        except Exception:
1807            continue
1808        if len(pdf) > 0:
1809            return pdf
1810    _ = mrsm.attempt_import('partd', lazy=False)
1811    return ddf.compute()
1812
1813
1814def query_df(
1815    df: 'pd.DataFrame',
1816    params: Optional[Dict[str, Any]] = None,
1817    begin: Union[datetime, int, None] = None,
1818    end: Union[datetime, int, None] = None,
1819    datetime_column: Optional[str] = None,
1820    select_columns: Optional[List[str]] = None,
1821    omit_columns: Optional[List[str]] = None,
1822    inplace: bool = False,
1823    reset_index: bool = False,
1824    coerce_types: bool = False,
1825    debug: bool = False,
1826) -> 'pd.DataFrame':
1827    """
1828    Query the dataframe with the params dictionary.
1829
1830    Parameters
1831    ----------
1832    df: pd.DataFrame
1833        The DataFrame to query against.
1834
1835    params: Optional[Dict[str, Any]], default None
1836        The parameters dictionary to use for the query.
1837
1838    begin: Union[datetime, int, None], default None
1839        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1840        greater than or equal to this value.
1841
1842    end: Union[datetime, int, None], default None
1843        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1844        less than this value.
1845
1846    datetime_column: Optional[str], default None
1847        A `datetime_column` must be provided to use `begin` and `end`.
1848
1849    select_columns: Optional[List[str]], default None
1850        If provided, only return these columns.
1851
1852    omit_columns: Optional[List[str]], default None
1853        If provided, do not include these columns in the result.
1854
1855    inplace: bool, default False
1856        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
1857
1858    reset_index: bool, default False
1859        If `True`, reset the index in the resulting DataFrame.
1860
1861    coerce_types: bool, default False
1862        If `True`, cast the dataframe and parameters as strings before querying.
1863
1864    Returns
1865    -------
1866    A Pandas DataFrame query result.
1867    """
1868
1869    def _process_select_columns(_df):
1870        if not select_columns:
1871            return
1872        for col in list(_df.columns):
1873            if col not in select_columns:
1874                del _df[col]
1875
1876    def _process_omit_columns(_df):
1877        if not omit_columns:
1878            return
1879        for col in list(_df.columns):
1880            if col in omit_columns:
1881                del _df[col]
1882
1883    if not params and not begin and not end:
1884        if not inplace:
1885            df = df.copy()
1886        _process_select_columns(df)
1887        _process_omit_columns(df)
1888        return df
1889
1890    from meerschaum.utils.debug import dprint
1891    from meerschaum.utils.misc import get_in_ex_params
1892    from meerschaum.utils.warnings import warn
1893    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
1894    dateutil_parser = mrsm.attempt_import('dateutil.parser')
1895    pandas = mrsm.attempt_import('pandas')
1896    NA = pandas.NA
1897
1898    if params:
1899        proto_in_ex_params = get_in_ex_params(params)
1900        for key, (proto_in_vals, proto_ex_vals) in proto_in_ex_params.items():
1901            if proto_ex_vals:
1902                coerce_types = True
1903                break
1904        params = params.copy()
1905        for key, val in {k: v for k, v in params.items()}.items():
1906            if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype'):
1907                if None in val:
1908                    val = [item for item in val if item is not None] + [NA]
1909                    params[key] = val
1910                if coerce_types:
1911                    params[key] = [str(x) for x in val]
1912            else:
1913                if value_is_null(val):
1914                    val = NA
1915                    params[key] = NA
1916                if coerce_types:
1917                    params[key] = str(val)
1918
1919    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
1920
1921    if inplace:
1922        df.fillna(NA, inplace=True)
1923    else:
1924        df = df.infer_objects().fillna(NA)
1925
1926    if isinstance(begin, str):
1927        begin = dateutil_parser.parse(begin)
1928    if isinstance(end, str):
1929        end = dateutil_parser.parse(end)
1930
1931    if begin is not None or end is not None:
1932        if not datetime_column or datetime_column not in df.columns:
1933            warn(
1934                f"The datetime column '{datetime_column}' is not present in the DataFrame, "
1935                + "ignoring begin and end...",
1936            )
1937            begin, end = None, None
1938
1939    if debug:
1940        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")
1941
1942    if datetime_column and (begin is not None or end is not None):
1943        if debug:
1944            dprint("Checking for datetime column compatibility.")
1945
1946        from meerschaum.utils.dtypes import coerce_timezone
1947        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
1948        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
1949        end_is_int = are_dtypes_equal(str(type(end)), 'int')
1950
1951        if df_is_dt:
1952            df_tz = (
1953                getattr(df[datetime_column].dt, 'tz', None)
1954                if hasattr(df[datetime_column], 'dt')
1955                else None
1956            )
1957
1958            if begin_is_int:
1959                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
1960                if debug:
1961                    dprint(f"`begin` will be cast to '{begin}'.")
1962            if end_is_int:
1963                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
1964                if debug:
1965                    dprint(f"`end` will be cast to '{end}'.")
1966
1967            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
1968            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None
1969
1970    in_ex_params = get_in_ex_params(params)
1971
1972    masks = [
1973        (
1974            (df[datetime_column] >= begin)
1975            if begin is not None and datetime_column
1976            else True
1977        ) & (
1978            (df[datetime_column] < end)
1979            if end is not None and datetime_column
1980            else True
1981        )
1982    ]
1983
1984    masks.extend([
1985        (
1986            (
1987                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
1988                if in_vals
1989                else True
1990            ) & (
1991                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
1992                if ex_vals
1993                else True
1994            )
1995        )
1996        for col, (in_vals, ex_vals) in in_ex_params.items()
1997        if col in df.columns
1998    ])
1999    query_mask = masks[0]
2000    for mask in masks[1:]:
2001        query_mask = query_mask & mask
2002
2003    original_cols = df.columns
2004
2005    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
2006    ###       to allow for `<NA>` values.
2007    bool_cols = [
2008        col
2009        for col, typ in df.dtypes.items()
2010        if are_dtypes_equal(str(typ), 'bool')
2011    ]
2012    for col in bool_cols:
2013        df[col] = df[col].astype('boolean[pyarrow]')
2014
2015    if not isinstance(query_mask, bool):
2016        df['__mrsm_mask'] = (
2017            query_mask.astype('boolean[pyarrow]')
2018            if hasattr(query_mask, 'astype')
2019            else query_mask
2020        )
2021
2022        if inplace:
2023            df.where(query_mask, other=NA, inplace=True)
2024            df.dropna(how='all', inplace=True)
2025            result_df = df
2026        else:
2027            result_df = df.where(query_mask, other=NA)
2028            result_df.dropna(how='all', inplace=True)
2029
2030    else:
2031        result_df = df
2032
2033    if '__mrsm_mask' in df.columns:
2034        del df['__mrsm_mask']
2035    if '__mrsm_mask' in result_df.columns:
2036        del result_df['__mrsm_mask']
2037
2038    if reset_index:
2039        result_df.reset_index(drop=True, inplace=True)
2040
2041    result_df = enforce_dtypes(
2042        result_df,
2043        dtypes,
2044        safe_copy=False,
2045        debug=debug,
2046        coerce_numeric=False,
2047        coerce_timezone=False,
2048    )
2049
2050    if select_columns == ['*']:
2051        select_columns = None
2052
2053    if not select_columns and not omit_columns:
2054        return result_df[original_cols]
2055
2056    _process_select_columns(result_df)
2057    _process_omit_columns(result_df)
2058
2059    return result_df
2060
2061
2062def to_json(
2063    df: 'pd.DataFrame',
2064    safe_copy: bool = True,
2065    orient: str = 'records',
2066    date_format: str = 'iso',
2067    date_unit: str = 'us',
2068    double_precision: int = 15,
2069    geometry_format: str = 'geojson',
2070    **kwargs: Any
2071) -> str:
2072    """
2073    Serialize the given dataframe as a JSON string.
2074
2075    Parameters
2076    ----------
2077    df: pd.DataFrame
2078        The DataFrame to be serialized.
2079
2080    safe_copy: bool, default True
2081        If `False`, modify the DataFrame inplace.
2082
2083    date_format: str, default 'iso'
2084        The default format for timestamps.
2085
2086    date_unit: str, default 'us'
2087        The precision of the timestamps.
2088
2089    double_precision: int, default 15
2090        The number of decimal places to use when encoding floating point values (maximum 15).
2091
2092    geometry_format: str, default 'geojson'
2093        The serialization format for geometry data.
2094        Accepted values are `geojson`, `wkb_hex`, and `wkt`.
2095
2096    Returns
2097    -------
2098    A JSON string.
2099    """
2100    import warnings
2101    import functools
2102    from meerschaum.utils.packages import import_pandas
2103    from meerschaum.utils.dtypes import (
2104        serialize_bytes,
2105        serialize_decimal,
2106        serialize_geometry,
2107    )
2108    pd = import_pandas()
2109    uuid_cols = get_uuid_cols(df)
2110    bytes_cols = get_bytes_cols(df)
2111    numeric_cols = get_numeric_cols(df)
2112    geometry_cols = get_geometry_cols(df)
2113    geometry_cols_srids = {
2114        col: int((getattr(df[col].crs, 'srs', '') or '').split(':', maxsplit=1)[-1] or '0')
2115        for col in geometry_cols
2116    } if 'geodataframe' in str(type(df)).lower() else {}
2117    if safe_copy and bool(uuid_cols or bytes_cols or geometry_cols or numeric_cols):
2118        df = df.copy()
2119    if 'geodataframe' in str(type(df)).lower():
2120        geometry_data = {
2121            col: df[col]
2122            for col in geometry_cols
2123        }
2124        df = pd.DataFrame({
2125            col: df[col]
2126            for col in df.columns
2127            if col not in geometry_cols
2128        })
2129        for col in geometry_cols:
2130            df[col] = pd.Series(ob for ob in geometry_data[col])
2131    for col in uuid_cols:
2132        df[col] = df[col].astype(str)
2133    for col in bytes_cols:
2134        df[col] = df[col].apply(serialize_bytes)
2135    for col in numeric_cols:
2136        df[col] = df[col].apply(serialize_decimal)
2137    with warnings.catch_warnings():
2138        warnings.simplefilter("ignore")
2139        for col in geometry_cols:
2140            srid = geometry_cols_srids.get(col, None) or None
2141            df[col] = pd.Series(
2142                serialize_geometry(val, geometry_format=geometry_format, srid=srid)
2143                for val in df[col]
2144            )
2145    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
2146        date_format=date_format,
2147        date_unit=date_unit,
2148        double_precision=double_precision,
2149        orient=orient,
2150        **kwargs
2151    )
2152
2153
2154def to_simple_lines(df: 'pd.DataFrame') -> str:
2155    """
2156    Serialize a pandas DataFrame as lines of simple dictionaries.
2157
2158    Parameters
2159    ----------
2160    df: pd.DataFrame
2161        The dataframe to serialize into simple lines text.
2162
2163    Returns
2164    -------
2165    A string of simple line dictionaries joined by newlines.
2166    """
2167    from meerschaum.utils.misc import to_simple_dict
2168    if df is None or len(df) == 0:
2169        return ''
2170
2171    docs = df.to_dict(orient='records')
2172    return '\n'.join(to_simple_dict(doc) for doc in docs)
2173
2174
2175def parse_simple_lines(data: str) -> 'pd.DataFrame':
2176    """
2177    Parse simple lines text into a DataFrame.
2178
2179    Parameters
2180    ----------
2181    data: str
2182        The simple lines text to parse into a DataFrame.
2183
2184    Returns
2185    -------
2186    A dataframe containing the rows serialized in `data`.
2187    """
2188    from meerschaum.utils.misc import string_to_dict
2189    from meerschaum.utils.packages import import_pandas
2190    pd = import_pandas()
2191    lines = data.splitlines()
2192    try:
2193        docs = [string_to_dict(line) for line in lines]
2194        df = pd.DataFrame(docs)
2195    except Exception:
2196        df = None
2197
2198    if df is None:
2199        raise ValueError("Cannot parse simple lines into a dataframe.")
2200
2201    return df
def add_missing_cols_to_df( df: pandas.core.frame.DataFrame, dtypes: Dict[str, Any]) -> pandas.core.frame.DataFrame:
25def add_missing_cols_to_df(
26    df: 'pd.DataFrame',
27    dtypes: Dict[str, Any],
28) -> 'pd.DataFrame':
29    """
30    Add columns from the dtypes dictionary as null columns to a new DataFrame.
31
32    Parameters
33    ----------
34    df: pd.DataFrame
35        The dataframe we should copy and add null columns.
36
37    dtypes:
38        The data types dictionary which may contain keys not present in `df.columns`.
39
40    Returns
41    -------
42    A new `DataFrame` with the keys from `dtypes` added as null columns.
43    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
44    NOTE: This will not ensure that dtypes are enforced!
45
46    Examples
47    --------
48    >>> import pandas as pd
49    >>> df = pd.DataFrame([{'a': 1}])
50    >>> dtypes = {'b': 'Int64'}
51    >>> add_missing_cols_to_df(df, dtypes)
52          a  b
53       0  1  <NA>
54    >>> add_missing_cols_to_df(df, dtypes).dtypes
55    a    int64
56    b    Int64
57    dtype: object
58    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
59    a    int64
60    dtype: object
61    >>> 
62    """
63    if set(df.columns) == set(dtypes):
64        return df
65
66    from meerschaum.utils.packages import attempt_import
67    from meerschaum.utils.dtypes import to_pandas_dtype
68    pandas = attempt_import('pandas')
69
70    def build_series(dtype: str):
71        return pandas.Series([], dtype=to_pandas_dtype(dtype))
72
73    assign_kwargs = {
74        str(col): build_series(str(typ))
75        for col, typ in dtypes.items()
76        if col not in df.columns
77    }
78    df_with_cols = df.assign(**assign_kwargs)
79    for col in assign_kwargs:
80        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
81    return df_with_cols

Add columns from the dtypes dictionary as null columns to a new DataFrame.

Parameters
  • df (pd.DataFrame): The dataframe we should copy and add null columns.
  • dtypes: The data types dictionary which may contain keys not present in df.columns.
Returns
  • A new DataFrame with the keys from dtypes added as null columns.
  • If df.dtypes is the same as dtypes, then return a reference to df.
  • NOTE: This will not ensure that dtypes are enforced!
Examples
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1}])
>>> dtypes = {'b': 'Int64'}
>>> add_missing_cols_to_df(df, dtypes)
      a  b
   0  1  <NA>
>>> add_missing_cols_to_df(df, dtypes).dtypes
a    int64
b    Int64
dtype: object
>>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
a    int64
dtype: object
>>>
def filter_unseen_df( old_df: pandas.core.frame.DataFrame, new_df: pandas.core.frame.DataFrame, safe_copy: bool = True, dtypes: Optional[Dict[str, Any]] = None, include_unchanged_columns: bool = False, coerce_mixed_numerics: bool = True, debug: bool = False) -> pandas.core.frame.DataFrame:
 84def filter_unseen_df(
 85    old_df: 'pd.DataFrame',
 86    new_df: 'pd.DataFrame',
 87    safe_copy: bool = True,
 88    dtypes: Optional[Dict[str, Any]] = None,
 89    include_unchanged_columns: bool = False,
 90    coerce_mixed_numerics: bool = True,
 91    debug: bool = False,
 92) -> 'pd.DataFrame':
 93    """
 94    Left join two DataFrames to find the newest unseen data.
 95
 96    Parameters
 97    ----------
 98    old_df: 'pd.DataFrame'
 99        The original (target) dataframe. Acts as a filter on the `new_df`.
100
101    new_df: 'pd.DataFrame'
102        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
103
104    safe_copy: bool, default True
105        If `True`, create a copy before comparing and modifying the dataframes.
106        Setting to `False` may mutate the DataFrames.
107
108    dtypes: Optional[Dict[str, Any]], default None
109        Optionally specify the datatypes of the dataframe.
110
111    include_unchanged_columns: bool, default False
112        If `True`, include columns which haven't changed on rows which have changed.
113
114    coerce_mixed_numerics: bool, default True
115        If `True`, cast mixed integer and float columns between the old and new dataframes into
116        numeric values (`decimal.Decimal`).
117
118    debug: bool, default False
119        Verbosity toggle.
120
121    Returns
122    -------
123    A pandas dataframe of the new, unseen rows in `new_df`.
124
125    Examples
126    --------
127    ```python
128    >>> import pandas as pd
129    >>> df1 = pd.DataFrame({'a': [1,2]})
130    >>> df2 = pd.DataFrame({'a': [2,3]})
131    >>> filter_unseen_df(df1, df2)
132       a
133    0  3
134
135    ```
136
137    """
138    if old_df is None:
139        return new_df
140
141    if safe_copy:
142        old_df = old_df.copy()
143        new_df = new_df.copy()
144
145    import json
146    import functools
147    import traceback
148    from meerschaum.utils.warnings import warn
149    from meerschaum.utils.packages import import_pandas, attempt_import
150    from meerschaum.utils.dtypes import (
151        to_pandas_dtype,
152        are_dtypes_equal,
153        attempt_cast_to_numeric,
154        attempt_cast_to_uuid,
155        attempt_cast_to_bytes,
156        attempt_cast_to_geometry,
157        coerce_timezone,
158        serialize_decimal,
159    )
160    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
161    pd = import_pandas(debug=debug)
162    is_dask = 'dask' in new_df.__module__
163    if is_dask:
164        pandas = attempt_import('pandas')
165        _ = attempt_import('partd', lazy=False)
166        dd = attempt_import('dask.dataframe')
167        merge = dd.merge
168        NA = pandas.NA
169    else:
170        merge = pd.merge
171        NA = pd.NA
172
173    new_df_dtypes = dict(new_df.dtypes)
174    old_df_dtypes = dict(old_df.dtypes)
175
176    same_cols = set(new_df.columns) == set(old_df.columns)
177    if not same_cols:
178        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
179        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)
180
181        new_types_missing_from_old = {
182            col: typ
183            for col, typ in new_df_dtypes.items()
184            if col not in old_df_dtypes
185        }
186        old_types_missing_from_new = {
187            col: typ
188            for col, typ in old_df_dtypes.items()
189            if col not in new_df_dtypes
190        }
191        old_df_dtypes.update(new_types_missing_from_old)
192        new_df_dtypes.update(old_types_missing_from_new)
193
194    ### Edge case: two empty lists cast to DFs.
195    elif len(new_df.columns) == 0:
196        return new_df
197
198    try:
199        ### Order matters when checking equality.
200        new_df = new_df[old_df.columns]
201
202    except Exception as e:
203        warn(
204            "Was not able to cast old columns onto new DataFrame. " +
205            f"Are both DataFrames the same shape? Error:\n{e}",
206            stacklevel=3,
207        )
208        return new_df[list(new_df_dtypes.keys())]
209
210    ### assume the old_df knows what it's doing, even if it's technically wrong.
211    if dtypes is None:
212        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}
213
214    dtypes = {
215        col: to_pandas_dtype(typ)
216        for col, typ in dtypes.items()
217        if col in new_df_dtypes and col in old_df_dtypes
218    }
219    for col, typ in new_df_dtypes.items():
220        if col not in dtypes:
221            dtypes[col] = typ
222
223    numeric_cols_precisions_scales = {
224        col: get_numeric_precision_scale(None, typ)
225        for col, typ in dtypes.items()
226        if col and str(typ).lower().startswith('numeric')
227    }
228
229    dt_dtypes = {
230        col: typ
231        for col, typ in dtypes.items()
232        if are_dtypes_equal(typ, 'datetime')
233    }
234    non_dt_dtypes = {
235        col: typ
236        for col, typ in dtypes.items()
237        if col not in dt_dtypes
238    }
239
240    cast_non_dt_cols = True
241    try:
242        new_df = new_df.astype(non_dt_dtypes)
243        cast_non_dt_cols = False
244    except Exception as e:
245        warn(
246            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
247        )
248
249    cast_dt_cols = True
250    try:
251        for col, typ in dt_dtypes.items():
252            _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
253            strip_utc = (
254                _dtypes_col_dtype.startswith('datetime64')
255                and 'utc' not in _dtypes_col_dtype.lower()
256            )
257            if col in old_df.columns:
258                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
259            if col in new_df.columns:
260                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
261        cast_dt_cols = False
262    except Exception as e:
263        warn(f"Could not cast datetime columns:\n{e}")
264
265    cast_cols = cast_dt_cols or cast_non_dt_cols
266
267    new_numeric_cols_existing = get_numeric_cols(new_df)
268    old_numeric_cols = get_numeric_cols(old_df)
269    for col, typ in {k: v for k, v in dtypes.items()}.items():
270        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
271            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
272            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
273            new_is_numeric = col in new_numeric_cols_existing
274            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
275            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
276            old_is_numeric = col in old_numeric_cols
277
278            if (
279                coerce_mixed_numerics
280                and
281                (new_is_float or new_is_int or new_is_numeric)
282                and
283                (old_is_float or old_is_int or old_is_numeric)
284            ):
285                dtypes[col] = attempt_cast_to_numeric
286                cast_cols = True
287                continue
288
289            ### Fallback to object if the types don't match.
290            warn(
291                f"Detected different types for '{col}' "
292                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
293                + "falling back to 'object'..."
294            )
295            dtypes[col] = 'object'
296            cast_cols = True
297
298    if cast_cols:
299        for col, dtype in dtypes.items():
300            if col in new_df.columns:
301                try:
302                    new_df[col] = (
303                        new_df[col].astype(dtype)
304                        if not callable(dtype)
305                        else new_df[col].apply(dtype)
306                    )
307                except Exception as e:
308                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")
309
310    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
311    new_json_cols = get_json_cols(new_df)
312    old_json_cols = get_json_cols(old_df)
313    json_cols = set(new_json_cols + old_json_cols)
314    for json_col in old_json_cols:
315        old_df[json_col] = old_df[json_col].apply(serializer)
316    for json_col in new_json_cols:
317        new_df[json_col] = new_df[json_col].apply(serializer)
318
319    new_numeric_cols = get_numeric_cols(new_df)
320    numeric_cols = set(new_numeric_cols + old_numeric_cols)
321    for numeric_col in old_numeric_cols:
322        old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
323    for numeric_col in new_numeric_cols:
324        new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)
325
326    old_dt_cols = [
327        col
328        for col, typ in old_df.dtypes.items()
329        if are_dtypes_equal(str(typ), 'datetime')
330    ]
331    for col in old_dt_cols:
332        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
333        strip_utc = (
334            _dtypes_col_dtype.startswith('datetime64')
335            and 'utc' not in _dtypes_col_dtype.lower()
336        )
337        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
338
339    new_dt_cols = [
340        col
341        for col, typ in new_df.dtypes.items()
342        if are_dtypes_equal(str(typ), 'datetime')
343    ]
344    for col in new_dt_cols:
345        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
346        strip_utc = (
347            _dtypes_col_dtype.startswith('datetime64')
348            and 'utc' not in _dtypes_col_dtype.lower()
349        )
350        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
351
352    old_uuid_cols = get_uuid_cols(old_df)
353    new_uuid_cols = get_uuid_cols(new_df)
354    uuid_cols = set(new_uuid_cols + old_uuid_cols)
355
356    old_bytes_cols = get_bytes_cols(old_df)
357    new_bytes_cols = get_bytes_cols(new_df)
358    bytes_cols = set(new_bytes_cols + old_bytes_cols)
359
360    old_geometry_cols = get_geometry_cols(old_df)
361    new_geometry_cols = get_geometry_cols(new_df)
362    geometry_cols = set(new_geometry_cols + old_geometry_cols)
363
364    na_pattern = r'(?i)^(none|nan|na|nat|natz|<na>)$'
365    joined_df = merge(
366        new_df.infer_objects(copy=False).replace(na_pattern, pd.NA, regex=True).fillna(NA),
367        old_df.infer_objects(copy=False).replace(na_pattern, pd.NA, regex=True).fillna(NA),
368        how='left',
369        on=None,
370        indicator=True,
371    )
372    changed_rows_mask = (joined_df['_merge'] == 'left_only')
373    new_cols = list(new_df_dtypes)
374    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)
375
376    delta_json_cols = get_json_cols(delta_df)
377    for json_col in json_cols:
378        if (
379            json_col in delta_json_cols
380            or json_col not in delta_df.columns
381        ):
382            continue
383        try:
384            delta_df[json_col] = delta_df[json_col].apply(
385                lambda x: (json.loads(x) if isinstance(x, str) else x)
386            )
387        except Exception:
388            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")
389
390    delta_numeric_cols = get_numeric_cols(delta_df)
391    for numeric_col in numeric_cols:
392        if (
393            numeric_col in delta_numeric_cols
394            or numeric_col not in delta_df.columns
395        ):
396            continue
397        try:
398            delta_df[numeric_col] = delta_df[numeric_col].apply(
399                functools.partial(
400                    attempt_cast_to_numeric,
401                    quantize=True,
402                    precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
403                    scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
404                )
405            )
406        except Exception:
407            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")
408
409    delta_uuid_cols = get_uuid_cols(delta_df)
410    for uuid_col in uuid_cols:
411        if (
412            uuid_col in delta_uuid_cols
413            or uuid_col not in delta_df.columns
414        ):
415            continue
416        try:
417            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
418        except Exception:
419            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")
420
421    delta_bytes_cols = get_bytes_cols(delta_df)
422    for bytes_col in bytes_cols:
423        if (
424            bytes_col in delta_bytes_cols
425            or bytes_col not in delta_df.columns
426        ):
427            continue
428        try:
429            delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
430        except Exception:
431            warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")
432
433    delta_geometry_cols = get_geometry_cols(delta_df)
434    for geometry_col in geometry_cols:
435        if (
436            geometry_col in delta_geometry_cols
437            or geometry_col not in delta_df.columns
438        ):
439            continue
440        try:
441            delta_df[geometry_col] = attempt_cast_to_geometry(delta_df[geometry_col])
442        except Exception:
443            warn(f"Unable to parse geometry column '{geometry_col}':\n{traceback.format_exc()}")
444
445    return delta_df

Left join two DataFrames to find the newest unseen data.

Parameters
  • old_df ('pd.DataFrame'): The original (target) dataframe. Acts as a filter on the new_df.
  • new_df ('pd.DataFrame'): The fetched (source) dataframe. Rows that are contained in old_df are removed.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames.
  • dtypes (Optional[Dict[str, Any]], default None): Optionally specify the datatypes of the dataframe.
  • include_unchanged_columns (bool, default False): If True, include columns which haven't changed on rows which have changed.
  • coerce_mixed_numerics (bool, default True): If True, cast mixed integer and float columns between the old and new dataframes into numeric values (decimal.Decimal).
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pandas dataframe of the new, unseen rows in new_df.
Examples
>>> import pandas as pd
>>> df1 = pd.DataFrame({'a': [1,2]})
>>> df2 = pd.DataFrame({'a': [2,3]})
>>> filter_unseen_df(df1, df2)
   a
0  3
def parse_df_datetimes( df: pandas.core.frame.DataFrame, ignore_cols: Optional[Iterable[str]] = None, strip_timezone: bool = False, chunksize: Optional[int] = None, dtype_backend: str = 'numpy_nullable', ignore_all: bool = False, precision_unit: Optional[str] = None, coerce_utc: bool = True, debug: bool = False) -> pandas.core.frame.DataFrame:
448def parse_df_datetimes(
449    df: 'pd.DataFrame',
450    ignore_cols: Optional[Iterable[str]] = None,
451    strip_timezone: bool = False,
452    chunksize: Optional[int] = None,
453    dtype_backend: str = 'numpy_nullable',
454    ignore_all: bool = False,
455    precision_unit: Optional[str] = None,
456    coerce_utc: bool = True,
457    debug: bool = False,
458) -> 'pd.DataFrame':
459    """
460    Parse a pandas DataFrame for datetime columns and cast as datetimes.
461
462    Parameters
463    ----------
464    df: pd.DataFrame
465        The pandas DataFrame to parse.
466
467    ignore_cols: Optional[Iterable[str]], default None
468        If provided, do not attempt to coerce these columns as datetimes.
469
470    strip_timezone: bool, default False
471        If `True`, remove the UTC `tzinfo` property.
472
473    chunksize: Optional[int], default None
474        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
475
476    dtype_backend: str, default 'numpy_nullable'
477        If `df` is not a DataFrame and new one needs to be constructed,
478        use this as the datatypes backend.
479        Accepted values are 'numpy_nullable' and 'pyarrow'.
480
481    ignore_all: bool, default False
482        If `True`, do not attempt to cast any columns to datetimes.
483
484    precision_unit: Optional[str], default None
485        If provided, enforce the given precision on the coerced datetime columns.
486
487    coerce_utc: bool, default True
488        Coerce the datetime columns to UTC (see `meerschaum.utils.dtypes.to_datetime()`).
489
490    debug: bool, default False
491        Verbosity toggle.
492
493    Returns
494    -------
495    A new pandas DataFrame with the determined datetime columns
496    (usually ISO strings) cast as datetimes.
497
498    Examples
499    --------
500    ```python
501    >>> import pandas as pd
502    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
503    >>> df.dtypes
504    a    object
505    dtype: object
506    >>> df2 = parse_df_datetimes(df)
507    >>> df2.dtypes
508    a    datetime64[us, UTC]
509    dtype: object
510
511    ```
512
513    """
514    from meerschaum.utils.packages import import_pandas, attempt_import
515    from meerschaum.utils.debug import dprint
516    from meerschaum.utils.warnings import warn
517    from meerschaum.utils.misc import items_str
518    from meerschaum.utils.dtypes import to_datetime, MRSM_PD_DTYPES
519    import traceback
520
521    pd = import_pandas()
522    pandas = attempt_import('pandas')
523    pd_name = pd.__name__
524    using_dask = 'dask' in pd_name
525    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
526    dask_dataframe = None
527    if using_dask or df_is_dask:
528        npartitions = chunksize_to_npartitions(chunksize)
529        dask_dataframe = attempt_import('dask.dataframe')
530
531    ### if df is a dict, build DataFrame
532    if isinstance(df, pandas.DataFrame):
533        pdf = df
534    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
535        pdf = get_first_valid_dask_partition(df)
536    else:
537        if debug:
538            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")
539
540        if using_dask:
541            if isinstance(df, list):
542                keys = set()
543                for doc in df:
544                    for key in doc:
545                        keys.add(key)
546                df = pd.DataFrame.from_dict(
547                    {
548                        k: [
549                            doc.get(k, None)
550                            for doc in df
551                        ] for k in keys
552                    },
553                    npartitions=npartitions,
554                )
555            elif isinstance(df, dict):
556                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
557            elif 'pandas.core.frame.DataFrame' in str(type(df)):
558                df = pd.from_pandas(df, npartitions=npartitions)
559            else:
560                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
561            pandas = attempt_import('pandas')
562            pdf = get_first_valid_dask_partition(df)
563
564        else:
565            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
566            pdf = df
567
568    ### skip parsing if DataFrame is empty
569    if len(pdf) == 0:
570        if debug:
571            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
572        return df
573
574    ignore_cols = set(
575        (ignore_cols or []) + [
576            col
577            for col, dtype in pdf.dtypes.items() 
578            if 'datetime' in str(dtype)
579        ]
580    )
581    cols_to_inspect = [
582        col
583        for col in pdf.columns
584        if col not in ignore_cols
585    ] if not ignore_all else []
586
587    if len(cols_to_inspect) == 0:
588        if debug:
589            dprint("All columns are ignored, skipping datetime detection...")
590        return df.infer_objects(copy=False).fillna(pandas.NA)
591
592    ### apply regex to columns to determine which are ISO datetimes
593    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
594    dt_mask = pdf[cols_to_inspect].astype(str).apply(
595        lambda s: s.str.match(iso_dt_regex).all()
596    )
597
598    ### list of datetime column names
599    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
600    if not datetime_cols:
601        if debug:
602            dprint("No columns detected as datetimes, returning...")
603        return df.infer_objects(copy=False).fillna(pandas.NA)
604
605    if debug:
606        dprint("Converting columns to datetimes: " + str(datetime_cols))
607
608    def _parse_to_datetime(x):
609        return to_datetime(x, precision_unit=precision_unit, coerce_utc=coerce_utc)
610
611    try:
612        if not using_dask:
613            df[datetime_cols] = df[datetime_cols].apply(_parse_to_datetime)
614        else:
615            df[datetime_cols] = df[datetime_cols].apply(
616                _parse_to_datetime,
617                utc=True,
618                axis=1,
619                meta={
620                    col: MRSM_PD_DTYPES['datetime']
621                    for col in datetime_cols
622                }
623            )
624    except Exception:
625        warn(
626            f"Unable to apply `to_datetime()` to {items_str(datetime_cols)}:\n"
627            + f"{traceback.format_exc()}"
628        )
629
630    if strip_timezone:
631        for dt in datetime_cols:
632            try:
633                df[dt] = df[dt].dt.tz_localize(None)
634            except Exception:
635                warn(
636                    f"Unable to convert column '{dt}' to naive datetime:\n"
637                    + f"{traceback.format_exc()}"
638                )
639
640    return df.fillna(pandas.NA)

Parse a pandas DataFrame for datetime columns and cast as datetimes.

Parameters
  • df (pd.DataFrame): The pandas DataFrame to parse.
  • ignore_cols (Optional[Iterable[str]], default None): If provided, do not attempt to coerce these columns as datetimes.
  • strip_timezone (bool, default False): If True, remove the UTC tzinfo property.
  • chunksize (Optional[int], default None): If the pandas implementation is 'dask', use this chunksize for the distributed dataframe.
  • dtype_backend (str, default 'numpy_nullable'): If df is not a DataFrame and new one needs to be constructed, use this as the datatypes backend. Accepted values are 'numpy_nullable' and 'pyarrow'.
  • ignore_all (bool, default False): If True, do not attempt to cast any columns to datetimes.
  • precision_unit (Optional[str], default None): If provided, enforce the given precision on the coerced datetime columns.
  • coerce_utc (bool, default True): Coerce the datetime columns to UTC (see meerschaum.utils.dtypes.to_datetime()).
  • debug (bool, default False): Verbosity toggle.
Returns
  • A new pandas DataFrame with the determined datetime columns (usually ISO strings) cast as datetimes.
Examples
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
>>> df.dtypes
a    object
dtype: object
>>> df2 = parse_df_datetimes(df)
>>> df2.dtypes
a    datetime64[us, UTC]
dtype: object
def get_unhashable_cols(df: pandas.core.frame.DataFrame) -> List[str]:
643def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
644    """
645    Get the columns which contain unhashable objects from a Pandas DataFrame.
646
647    Parameters
648    ----------
649    df: pd.DataFrame
650        The DataFrame which may contain unhashable objects.
651
652    Returns
653    -------
654    A list of columns.
655    """
656    if df is None:
657        return []
658    if len(df) == 0:
659        return []
660
661    is_dask = 'dask' in df.__module__
662    if is_dask:
663        from meerschaum.utils.packages import attempt_import
664        pandas = attempt_import('pandas')
665        df = pandas.DataFrame(get_first_valid_dask_partition(df))
666    return [
667        col for col, val in df.iloc[0].items()
668        if not isinstance(val, Hashable)
669    ]

Get the columns which contain unhashable objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
  • A list of columns.
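
A minimal illustrative example (assuming standard pandas semantics, where list values are unhashable):

>>> import pandas as pd
>>> from meerschaum.utils.dataframe import get_unhashable_cols
>>> df = pd.DataFrame([{'a': 1, 'b': [1, 2]}])
>>> get_unhashable_cols(df)
['b']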
def get_json_cols(df: pandas.core.frame.DataFrame) -> List[str]:
672def get_json_cols(df: 'pd.DataFrame') -> List[str]:
673    """
674    Get the columns which contain dictionaries or lists (JSON-like objects) from a Pandas DataFrame.
675
676    Parameters
677    ----------
678    df: pd.DataFrame
679        The DataFrame which may contain dictionaries or lists to be encoded as JSON.
680
681    Returns
682    -------
683    A list of columns to be encoded as JSON.
684    """
685    if df is None:
686        return []
687
688    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
689    if is_dask:
690        df = get_first_valid_dask_partition(df)
691
692    if len(df) == 0:
693        return []
694
695    cols_indices = {
696        col: df[col].first_valid_index()
697        for col in df.columns
698    }
699    return [
700        col
701        for col, ix in cols_indices.items()
702        if (
703            ix is not None
704            and isinstance(df.loc[ix][col], (dict, list))
705        )
706    ]

Get the columns which contain dictionaries or lists (JSON-like objects) from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain dictionaries or lists to be encoded as JSON.
Returns
  • A list of columns to be encoded as JSON.
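
A minimal illustrative example: columns whose first valid value is a dict or list are flagged for JSON encoding, while scalar columns are not:

>>> import pandas as pd
>>> from meerschaum.utils.dataframe import get_json_cols
>>> df = pd.DataFrame([{'a': 1, 'meta': {'color': 'red'}}])
>>> get_json_cols(df)
['meta']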
def get_numeric_cols(df: pandas.core.frame.DataFrame) -> List[str]:
709def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
710    """
711    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.
712
713    Parameters
714    ----------
715    df: pd.DataFrame
716        The DataFrame which may contain decimal objects.
717
718    Returns
719    -------
720    A list of columns to treat as numerics.
721    """
722    if df is None:
723        return []
724    from decimal import Decimal
725    is_dask = 'dask' in df.__module__
726    if is_dask:
727        df = get_first_valid_dask_partition(df)
728
729    if len(df) == 0:
730        return []
731
732    cols_indices = {
733        col: df[col].first_valid_index()
734        for col in df.columns
735    }
736    return [
737        col
738        for col, ix in cols_indices.items()
739        if (
740            ix is not None
741            and
742            isinstance(df.loc[ix][col], Decimal)
743        )
744    ]

Get the columns which contain decimal.Decimal objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain decimal objects.
Returns
  • A list of columns to treat as numerics.
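
A minimal illustrative example: only decimal.Decimal values are detected, not floats or ints:

>>> import pandas as pd
>>> from decimal import Decimal
>>> from meerschaum.utils.dataframe import get_numeric_cols
>>> df = pd.DataFrame([{'a': 1.0, 'b': Decimal('1.5')}])
>>> get_numeric_cols(df)
['b']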
def get_bool_cols(df: pandas.core.frame.DataFrame) -> List[str]:
747def get_bool_cols(df: 'pd.DataFrame') -> List[str]:
748    """
749    Get the columns which contain `bool` objects from a Pandas DataFrame.
750
751    Parameters
752    ----------
753    df: pd.DataFrame
754        The DataFrame which may contain bools.
755
756    Returns
757    -------
758    A list of columns to treat as bools.
759    """
760    if df is None:
761        return []
762
763    is_dask = 'dask' in df.__module__
764    if is_dask:
765        df = get_first_valid_dask_partition(df)
766
767    if len(df) == 0:
768        return []
769
770    from meerschaum.utils.dtypes import are_dtypes_equal
771
772    return [
773        col
774        for col, typ in df.dtypes.items()
775        if are_dtypes_equal(str(typ), 'bool')
776    ]

Get the columns which contain bool objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain bools.
Returns
  • A list of columns to treat as bools.
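
A minimal illustrative example; unlike the value-based getters above, detection here is based on the column dtype:

>>> import pandas as pd
>>> from meerschaum.utils.dataframe import get_bool_cols
>>> df = pd.DataFrame({'flag': [True, False], 'n': [1, 2]})
>>> get_bool_cols(df)
['flag']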
def get_uuid_cols(df: pandas.core.frame.DataFrame) -> List[str]:
779def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
780    """
781    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.
782
783    Parameters
784    ----------
785    df: pd.DataFrame
786        The DataFrame which may contain UUID objects.
787
788    Returns
789    -------
790    A list of columns to treat as UUIDs.
791    """
792    if df is None:
793        return []
794    from uuid import UUID
795    is_dask = 'dask' in df.__module__
796    if is_dask:
797        df = get_first_valid_dask_partition(df)
798
799    if len(df) == 0:
800        return []
801
802    cols_indices = {
803        col: df[col].first_valid_index()
804        for col in df.columns
805    }
806    return [
807        col
808        for col, ix in cols_indices.items()
809        if (
810            ix is not None
811            and
812            isinstance(df.loc[ix][col], UUID)
813        )
814    ]

Get the columns which contain uuid.UUID objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain UUID objects.
Returns
  • A list of columns to treat as UUIDs.
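
A minimal illustrative example (a fixed UUID is used so the snippet is deterministic):

>>> import pandas as pd
>>> from uuid import UUID
>>> from meerschaum.utils.dataframe import get_uuid_cols
>>> df = pd.DataFrame([{'id': UUID('00000000-0000-0000-0000-000000000000'), 'n': 1}])
>>> get_uuid_cols(df)
['id']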
def get_datetime_cols( df: pandas.core.frame.DataFrame, timezone_aware: bool = True, timezone_naive: bool = True, with_tz_precision: bool = False) -> Union[List[str], Dict[str, Tuple[Optional[str], str]]]:
817def get_datetime_cols(
818    df: 'pd.DataFrame',
819    timezone_aware: bool = True,
820    timezone_naive: bool = True,
821    with_tz_precision: bool = False,
822) -> Union[List[str], Dict[str, Tuple[Union[str, None], str]]]:
823    """
824    Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame.
825
826    Parameters
827    ----------
828    df: pd.DataFrame
829        The DataFrame which may contain `datetime` or `Timestamp` objects.
830
831    timezone_aware: bool, default True
832        If `True`, include timezone-aware datetime columns.
833
834    timezone_naive: bool, default True
835        If `True`, include timezone-naive datetime columns.
836
837    with_tz_precision: bool, default False
838        If `True`, return a dictionary mapping column names to tuples in the form
839        `(timezone, precision)`.
840
841    Returns
842    -------
843    A list of columns to treat as datetimes, or a dictionary of columns to tz+precision tuples
844    (if `with_tz_precision` is `True`).
845    """
846    if not timezone_aware and not timezone_naive:
847        raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.")
848
849    if df is None:
850        return [] if not with_tz_precision else {}
851
852    from datetime import datetime
853    from meerschaum.utils.dtypes import are_dtypes_equal, MRSM_PRECISION_UNITS_ALIASES
854    is_dask = 'dask' in df.__module__
855    if is_dask:
856        df = get_first_valid_dask_partition(df)
857   
858    def get_tz_precision_from_dtype(dtype: str) -> Tuple[Union[str, None], str]:
859        """
860        Extract the tz + precision tuple from a dtype string.
861        """
862        meta_str = dtype.split('[', maxsplit=1)[-1].rstrip(']').replace(' ', '')
863        tz = (
864            None
865            if ',' not in meta_str
866            else meta_str.split(',', maxsplit=1)[-1]
867        )
868        precision_abbreviation = (
869            meta_str
870            if ',' not in meta_str
871            else meta_str.split(',')[0]
872        )
873        precision = MRSM_PRECISION_UNITS_ALIASES[precision_abbreviation]
874        return tz, precision
875
876    def get_tz_precision_from_datetime(dt: datetime) -> Tuple[Union[str, None], str]:
877        """
878        Return the tz + precision tuple from a Python datetime object.
879        """
880        return dt.tzname(), 'microsecond'
881
882    known_dt_cols_types = {
883        col: str(typ)
884        for col, typ in df.dtypes.items()
885        if are_dtypes_equal('datetime', str(typ))
886    }
887 
888    known_dt_cols_tuples = {
889        col: get_tz_precision_from_dtype(typ)
890        for col, typ in known_dt_cols_types.items()
891    }
892
893    if len(df) == 0:
894        return (
895            list(known_dt_cols_types)
896            if not with_tz_precision
897            else known_dt_cols_tuples
898        )
899
900    cols_indices = {
901        col: df[col].first_valid_index()
902        for col in df.columns
903        if col not in known_dt_cols_types
904    }
905    pydt_cols_tuples = {
906        col: get_tz_precision_from_datetime(sample_val)
907        for col, ix in cols_indices.items()
908        if (
909            ix is not None
910            and
911            isinstance((sample_val := df.loc[ix][col]), datetime)
912        )
913    }
914
915    dt_cols_tuples = {
916        **known_dt_cols_tuples,
917        **pydt_cols_tuples
918    }
919
920    all_dt_cols_tuples = {
921        col: dt_cols_tuples[col]
922        for col in df.columns
923        if col in dt_cols_tuples
924    }
925    if timezone_aware and timezone_naive:
926        return (
927            list(all_dt_cols_tuples)
928            if not with_tz_precision
929            else all_dt_cols_tuples
930        )
931
932    known_timezone_aware_dt_cols = [
933        col
934        for col in known_dt_cols_types
935        if getattr(df[col], 'tz', None) is not None
936    ]
937    timezone_aware_pydt_cols_tuples = {
938        col: (tz, precision)
939        for col, (tz, precision) in pydt_cols_tuples.items()
940        if df.loc[cols_indices[col]][col].tzinfo is not None
941    }
942    timezone_aware_dt_cols_set = set(
943        known_timezone_aware_dt_cols + list(timezone_aware_pydt_cols_tuples)
944    )
945    timezone_aware_cols_tuples = {
946        col: (tz, precision)
947        for col, (tz, precision) in all_dt_cols_tuples.items()
948        if col in timezone_aware_dt_cols_set
949    }
950    timezone_naive_cols_tuples = {
951        col: (tz, precision)
952        for col, (tz, precision) in all_dt_cols_tuples.items()
953        if col not in timezone_aware_dt_cols_set
954    }
955
956    if timezone_aware:
957        return (
958            list(timezone_aware_cols_tuples)
959            if not with_tz_precision
960            else timezone_aware_cols_tuples
961        )
962
963    return (
964        list(timezone_naive_cols_tuples)
965        if not with_tz_precision
966        else timezone_naive_cols_tuples
967    )

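A brief usage sketch (an illustrative addition, not part of the source). The precision shown assumes a pandas build that infers microsecond resolution from Python datetimes, as in the doctests elsewhere in this module:

    >>> from datetime import datetime, timezone
    >>> import pandas as pd
    >>> from meerschaum.utils.dataframe import get_datetime_cols
    >>> df = pd.DataFrame({
    ...     'aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)],
    ...     'naive': [datetime(2025, 1, 1)],
    ... })
    >>> get_datetime_cols(df)
    ['aware', 'naive']
    >>> get_datetime_cols(df, with_tz_precision=True)
    {'aware': ('UTC', 'microsecond'), 'naive': (None, 'microsecond')}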
 970def get_datetime_cols_types(df: 'pd.DataFrame') -> Dict[str, str]:
 971    """
 972    Return a dictionary mapping datetime columns to specific type strings.
 973
 974    Parameters
 975    ----------
 976    df: pd.DataFrame
 977        The DataFrame which may contain datetime columns.
 978
 979    Returns
 980    -------
 981    A dictionary mapping the datetime columns' names to dtype strings
 982    (containing timezone and precision metadata).
 983
 984    Examples
 985    --------
 986    >>> from datetime import datetime, timezone
 987    >>> import pandas as pd
 988    >>> df = pd.DataFrame({'dt_tz_aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)]})
 989    >>> get_datetime_cols_types(df)
 990    {'dt_tz_aware': 'datetime64[us, UTC]'}
 991    >>> df = pd.DataFrame({'distant_dt': [datetime(1, 1, 1)]})
 992    >>> get_datetime_cols_types(df)
 993    {'distant_dt': 'datetime64[us]'}
 994    >>> df = pd.DataFrame({'dt_second': [datetime(2025, 1, 1)]})
 995    >>> df['dt_second'] = df['dt_second'].astype('datetime64[s]')
 996    >>> get_datetime_cols_types(df)
 997    {'dt_second': 'datetime64[s]'}
 998    """
 999    from meerschaum.utils.dtypes import MRSM_PRECISION_UNITS_ABBREVIATIONS
1000    dt_cols_tuples = get_datetime_cols(df, with_tz_precision=True)
1001    if not dt_cols_tuples:
1002        return {}
1003
1004    return {
1005        col: (
1006            f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}]"
1007            if tz is None
1008            else f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}, {tz}]"
1009        )
1010        for col, (tz, precision) in dt_cols_tuples.items()
1011    }

1014def get_date_cols(df: 'pd.DataFrame') -> List[str]:
1015    """
1016    Get the `date` columns from a Pandas DataFrame.
1017
1018    Parameters
1019    ----------
1020    df: pd.DataFrame
1021        The DataFrame which may contain dates.
1022
1023    Returns
1024    -------
1025    A list of columns to treat as dates.
1026    """
1027    from meerschaum.utils.dtypes import are_dtypes_equal
1028    if df is None:
1029        return []
1030
1031    is_dask = 'dask' in df.__module__
1032    if is_dask:
1033        df = get_first_valid_dask_partition(df)
1034
1035    known_date_cols = [
1036        col
1037        for col, typ in df.dtypes.items()
1038        if are_dtypes_equal(typ, 'date')
1039    ]
1040
1041    if len(df) == 0:
1042        return known_date_cols
1043
1044    cols_indices = {
1045        col: df[col].first_valid_index()
1046        for col in df.columns
1047        if col not in known_date_cols
1048    }
1049    object_date_cols = [
1050        col
1051        for col, ix in cols_indices.items()
1052        if (
1053            ix is not None
1054            and isinstance(df.loc[ix][col], date)
1055        )
1056    ]
1057
1058    all_date_cols = set(known_date_cols + object_date_cols)
1059
1060    return [
1061        col
1062        for col in df.columns
1063        if col in all_date_cols
1064    ]

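A short doctest-style sketch (illustrative addition); `date` objects in object columns are detected by sampling the first valid row:

    >>> from datetime import date
    >>> import pandas as pd
    >>> from meerschaum.utils.dataframe import get_date_cols
    >>> df = pd.DataFrame({'d': [date(2025, 1, 1)], 'n': [1]})
    >>> get_date_cols(df)
    ['d']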
1067def get_bytes_cols(df: 'pd.DataFrame') -> List[str]:
1068    """
1069    Get the columns which contain bytes strings from a Pandas DataFrame.
1070
1071    Parameters
1072    ----------
1073    df: pd.DataFrame
1074        The DataFrame which may contain bytes strings.
1075
1076    Returns
1077    -------
1078    A list of columns to treat as bytes.
1079    """
1080    if df is None:
1081        return []
1082
1083    is_dask = 'dask' in df.__module__
1084    if is_dask:
1085        df = get_first_valid_dask_partition(df)
1086
1087    known_bytes_cols = [
1088        col
1089        for col, typ in df.dtypes.items()
1090        if str(typ) == 'binary[pyarrow]'
1091    ]
1092
1093    if len(df) == 0:
1094        return known_bytes_cols
1095
1096    cols_indices = {
1097        col: df[col].first_valid_index()
1098        for col in df.columns
1099        if col not in known_bytes_cols
1100    }
1101    object_bytes_cols = [
1102        col
1103        for col, ix in cols_indices.items()
1104        if (
1105            ix is not None
1106            and isinstance(df.loc[ix][col], bytes)
1107        )
1108    ]
1109
1110    all_bytes_cols = set(known_bytes_cols + object_bytes_cols)
1111
1112    return [
1113        col
1114        for col in df.columns
1115        if col in all_bytes_cols
1116    ]

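A short doctest-style sketch (illustrative addition); bytes values in object columns are detected by sampling the first valid row:

    >>> import pandas as pd
    >>> from meerschaum.utils.dataframe import get_bytes_cols
    >>> df = pd.DataFrame({'payload': [b'\x00\x01'], 'name': ['a']})
    >>> get_bytes_cols(df)
    ['payload']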
1119def get_geometry_cols(
1120    df: 'pd.DataFrame',
1121    with_types_srids: bool = False,
1122) -> Union[List[str], Dict[str, Any]]:
1123    """
1124    Get the columns which contain shapely objects from a Pandas DataFrame.
1125
1126    Parameters
1127    ----------
1128    df: pd.DataFrame
1129        The DataFrame which may contain shapely geometry objects.
1130
1131    with_types_srids: bool, default False
1132        If `True`, return a dictionary mapping columns to geometry types and SRIDs.
1133
1134    Returns
1135    -------
1136    A list of columns to treat as `geometry`.
1137    If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID).
1138    """
1139    if df is None:
1140        return [] if not with_types_srids else {}
1141
1142    is_dask = 'dask' in df.__module__
1143    if is_dask:
1144        df = get_first_valid_dask_partition(df)
1145
1146    if len(df) == 0:
1147        return [] if not with_types_srids else {}
1148
1149    cols_indices = {
1150        col: df[col].first_valid_index()
1151        for col in df.columns
1152    }
1153    geo_cols = [
1154        col
1155        for col, ix in cols_indices.items()
1156        if (
1157            ix is not None
1158            and
1159            'shapely' in str(type(df.loc[ix][col]))
1160        )
1161    ]
1162    if not with_types_srids:
1163        return geo_cols
1164
1165    gpd = mrsm.attempt_import('geopandas', lazy=False)
1166    geo_cols_types_srids = {}
1167    for col in geo_cols:
1168        try:
1169            sample_geo_series = gpd.GeoSeries(df[col], crs=None)
1170            geometry_types = {
1171                geom.geom_type
1172                for geom in sample_geo_series
1173                if hasattr(geom, 'geom_type')
1174            }
1175            geometry_has_z = any(getattr(geom, 'has_z', False) for geom in sample_geo_series)
1176            srid = (
1177                (
1178                    sample_geo_series.crs.sub_crs_list[0].to_epsg()
1179                    if sample_geo_series.crs.is_compound
1180                    else sample_geo_series.crs.to_epsg()
1181                )
1182                if sample_geo_series.crs
1183                else 0
1184            )
1185            geometry_type = list(geometry_types)[0] if len(geometry_types) == 1 else 'geometry'
1186            if geometry_type != 'geometry' and geometry_has_z:
1187                geometry_type = geometry_type + 'Z'
1188        except Exception:
1189            srid = 0
1190            geometry_type = 'geometry'
1191        geo_cols_types_srids[col] = (geometry_type, srid)
1192
1193    return geo_cols_types_srids

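A sketch assuming `shapely` is installed (`geopandas` is only needed for `with_types_srids`); with no CRS set, the SRID falls back to 0:

    >>> import pandas as pd
    >>> from shapely.geometry import Point
    >>> from meerschaum.utils.dataframe import get_geometry_cols
    >>> df = pd.DataFrame({'geom': [Point(0.0, 0.0)]})
    >>> get_geometry_cols(df)
    ['geom']
    >>> get_geometry_cols(df, with_types_srids=True)
    {'geom': ('Point', 0)}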
1196def get_geometry_cols_types(df: 'pd.DataFrame') -> Dict[str, str]:
1197    """
1198    Return a dtypes dictionary mapping columns to specific geometry types (type, srid).
1199    """
1200    geometry_cols_types_srids = get_geometry_cols(df, with_types_srids=True)
1201    new_cols_types = {}
1202    for col, (geometry_type, srid) in geometry_cols_types_srids.items():
1203        new_dtype = "geometry"
1204        modifier = ""
1205        if not srid and geometry_type.lower() == 'geometry':
1206            new_cols_types[col] = new_dtype
1207            continue
1208
1209        modifier = "["
1210        if geometry_type.lower() != 'geometry':
1211            modifier += f"{geometry_type}"
1212
1213        if srid:
1214            if modifier != '[':
1215                modifier += ", "
1216            modifier += f"{srid}"
1217        modifier += "]"
1218        new_cols_types[col] = f"{new_dtype}{modifier}"
1219    return new_cols_types

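Continuing with shapely, a hedged example of the dtype strings produced (the SRID is omitted when unset):

    >>> import pandas as pd
    >>> from shapely.geometry import Point
    >>> from meerschaum.utils.dataframe import get_geometry_cols_types
    >>> get_geometry_cols_types(pd.DataFrame({'geom': [Point(0.0, 0.0)]}))
    {'geom': 'geometry[Point]'}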
1222def get_special_cols(df: 'pd.DataFrame') -> Dict[str, str]:
1223    """
1224    Return a dtypes dictionary mapping special columns to their dtypes.
1225    """
1226    return {
1227        **{col: 'json' for col in get_json_cols(df)},
1228        **{col: 'uuid' for col in get_uuid_cols(df)},
1229        **{col: 'bytes' for col in get_bytes_cols(df)},
1230        **{col: 'bool' for col in get_bool_cols(df)},
1231        **{col: 'numeric' for col in get_numeric_cols(df)},
1232        **{col: 'date' for col in get_date_cols(df)},
1233        **get_datetime_cols_types(df),
1234        **get_geometry_cols_types(df),
1235    }

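An illustrative sketch (assuming the optional geometry dependencies are importable, since the geometry probe runs even when no geometry columns are present):

    >>> import uuid
    >>> from decimal import Decimal
    >>> import pandas as pd
    >>> from meerschaum.utils.dataframe import get_special_cols
    >>> df = pd.DataFrame({'u': [uuid.uuid4()], 'amount': [Decimal('1.1')], 'n': [1]})
    >>> get_special_cols(df)
    {'u': 'uuid', 'amount': 'numeric'}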
1238def enforce_dtypes(
1239    df: 'pd.DataFrame',
1240    dtypes: Dict[str, str],
1241    explicit_dtypes: Optional[Dict[str, str]] = None,
1242    safe_copy: bool = True,
1243    coerce_numeric: bool = False,
1244    coerce_timezone: bool = True,
1245    strip_timezone: bool = False,
1246    debug: bool = False,
1247) -> 'pd.DataFrame':
1248    """
1249    Enforce the `dtypes` dictionary on a DataFrame.
1250
1251    Parameters
1252    ----------
1253    df: pd.DataFrame
1254        The DataFrame on which to enforce dtypes.
1255
1256    dtypes: Dict[str, str]
1257        The data types to attempt to enforce on the DataFrame.
1258
1259    explicit_dtypes: Optional[Dict[str, str]], default None
1260        If provided, automatic dtype coercion will respect explicitly configured
1261        dtypes (`int`, `float`, `numeric`).
1262
1263    safe_copy: bool, default True
1264        If `True`, create a copy before comparing and modifying the dataframes.
1265        Setting to `False` may mutate the DataFrames.
1266        See `meerschaum.utils.dataframe.filter_unseen_df`.
1267
1268    coerce_numeric: bool, default False
1269        If `True`, convert float and int collisions to numeric.
1270
1271    coerce_timezone: bool, default True
1272        If `True`, convert datetimes to UTC.
1273
1274    strip_timezone: bool, default False
1275        If `coerce_timezone` and `strip_timezone` are `True`,
1276        remove timezone information from datetimes.
1277
1278    debug: bool, default False
1279        Verbosity toggle.
1280
1281    Returns
1282    -------
1283    The Pandas DataFrame with the types enforced.
1284    """
1285    import json
1286    import functools
1287    from meerschaum.utils.debug import dprint
1288    from meerschaum.utils.formatting import pprint
1289    from meerschaum.utils.dtypes import (
1290        are_dtypes_equal,
1291        to_pandas_dtype,
1292        is_dtype_numeric,
1293        attempt_cast_to_numeric,
1294        attempt_cast_to_uuid,
1295        attempt_cast_to_bytes,
1296        attempt_cast_to_geometry,
1297        coerce_timezone as _coerce_timezone,
1298        get_geometry_type_srid,
1299    )
1300    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
1301    pandas = mrsm.attempt_import('pandas')
1302    is_dask = 'dask' in df.__module__
1303    if safe_copy:
1304        df = df.copy()
1305    if len(df.columns) == 0:
1306        if debug:
1307            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
1308        return df
1309
1310    explicit_dtypes = explicit_dtypes or {}
1311    pipe_pandas_dtypes = {
1312        col: to_pandas_dtype(typ)
1313        for col, typ in dtypes.items()
1314    }
1315    json_cols = [
1316        col
1317        for col, typ in dtypes.items()
1318        if typ == 'json'
1319    ]
1320    numeric_cols = [
1321        col
1322        for col, typ in dtypes.items()
1323        if typ.startswith('numeric')
1324    ]
1325    geometry_cols_types_srids = {
1326        col: get_geometry_type_srid(typ, default_srid=0)
1327        for col, typ in dtypes.items()
1328        if typ.startswith('geometry') or typ.startswith('geography')
1329    }
1330    uuid_cols = [
1331        col
1332        for col, typ in dtypes.items()
1333        if typ == 'uuid'
1334    ]
1335    bytes_cols = [
1336        col
1337        for col, typ in dtypes.items()
1338        if typ == 'bytes'
1339    ]
1340    datetime_cols = [
1341        col
1342        for col, typ in dtypes.items()
1343        if are_dtypes_equal(typ, 'datetime')
1344    ]
1345    df_numeric_cols = get_numeric_cols(df)
1346    if debug:
1347        dprint("Desired data types:")
1348        pprint(dtypes)
1349        dprint("Data types for incoming DataFrame:")
1350        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})
1351
1352    if json_cols and len(df) > 0:
1353        if debug:
1354            dprint(f"Checking columns for JSON encoding: {json_cols}")
1355        for col in json_cols:
1356            if col in df.columns:
1357                try:
1358                    df[col] = df[col].apply(
1359                        (
1360                            lambda x: (
1361                                json.loads(x)
1362                                if isinstance(x, str)
1363                                else x
1364                            )
1365                        )
1366                    )
1367                except Exception as e:
1368                    if debug:
1369                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")
1370
1371    if numeric_cols:
1372        if debug:
1373            dprint(f"Checking for numerics: {numeric_cols}")
1374        for col in numeric_cols:
1375            precision, scale = get_numeric_precision_scale(None, dtypes.get(col, ''))
1376            if col in df.columns:
1377                try:
1378                    df[col] = df[col].apply(
1379                        functools.partial(
1380                            attempt_cast_to_numeric,
1381                            quantize=True,
1382                            precision=precision,
1383                            scale=scale,
1384                        )
1385                    )
1386                except Exception as e:
1387                    if debug:
1388                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")
1389
1390    if uuid_cols:
1391        if debug:
1392            dprint(f"Checking for UUIDs: {uuid_cols}")
1393        for col in uuid_cols:
1394            if col in df.columns:
1395                try:
1396                    df[col] = df[col].apply(attempt_cast_to_uuid)
1397                except Exception as e:
1398                    if debug:
1399                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")
1400
1401    if bytes_cols:
1402        if debug:
1403            dprint(f"Checking for bytes: {bytes_cols}")
1404        for col in bytes_cols:
1405            if col in df.columns:
1406                try:
1407                    df[col] = df[col].apply(attempt_cast_to_bytes)
1408                except Exception as e:
1409                    if debug:
1410                        dprint(f"Unable to parse column '{col}' as bytes:\n{e}")
1411
1412    if datetime_cols and coerce_timezone:
1413        if debug:
1414            dprint(f"Checking for datetime conversion: {datetime_cols}")
1415        for col in datetime_cols:
1416            if col in df.columns:
1417                if not strip_timezone and 'utc' in str(df.dtypes[col]).lower():
1418                    if debug:
1419                        dprint(f"Skip UTC coercion for column '{col}' ({str(df[col].dtype)}).")
1420                    continue
1421                if strip_timezone and ',' not in str(df.dtypes[col]):
1422                    if debug:
1423                        dprint(
1424                            f"Skip UTC coercion (stripped) for column '{col}' "
1425                            f"({str(df[col].dtype)})."
1426                        )
1427                    continue
1428
1429                if debug:
1430                    dprint(
1431                        f"Data type for column '{col}' before timezone coercion: "
1432                        f"{str(df[col].dtype)}"
1433                    )
1434
1435                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)
1436                if debug:
1437                    dprint(
1438                        f"Data type for column '{col}' after timezone coercion: "
1439                        f"{str(df[col].dtype)}"
1440                    )
1441
1442    if geometry_cols_types_srids:
1443        geopandas = mrsm.attempt_import('geopandas')
1444        if debug:
1445            dprint(f"Checking for geometry: {list(geometry_cols_types_srids)}")
1446        parsed_geom_cols = []
1447        for col in geometry_cols_types_srids:
1448            if col not in df.columns:
1449                continue
1450            try:
1451                df[col] = attempt_cast_to_geometry(df[col])
1452                parsed_geom_cols.append(col)
1453            except Exception as e:
1454                import traceback
1455                traceback.print_exc()
1456                if debug:
1457                    dprint(f"Unable to parse column '{col}' as geometry:\n{e}")
1458
1459        if parsed_geom_cols:
1460            if debug:
1461                dprint(f"Converting to GeoDataFrame (geometry column: '{parsed_geom_cols[0]}')...")
1462            try:
1463                _, default_srid = geometry_cols_types_srids[parsed_geom_cols[0]]
1464                df = geopandas.GeoDataFrame(df, geometry=parsed_geom_cols[0], crs=default_srid)
1465                for col, (_, srid) in geometry_cols_types_srids.items():
1466                    if srid:
1467                        if debug:
1468                            dprint(f"Setting '{col}' to SRID '{srid}'...")
1469                        _ = df[col].set_crs(srid)
1470                if parsed_geom_cols[0] not in df.columns:
1471                    df.rename_geometry(parsed_geom_cols[0], inplace=True)
1472            except (ValueError, TypeError):
1473                import traceback
1474                dprint(f"Failed to cast to GeoDataFrame:\n{traceback.format_exc()}")
1475
1476    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
1477    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1478        if debug:
1479            dprint("Data types match. Exiting enforcement...")
1480        return df
1481
1482    common_dtypes = {}
1483    common_diff_dtypes = {}
1484    for col, typ in pipe_pandas_dtypes.items():
1485        if col in df_dtypes:
1486            common_dtypes[col] = typ
1487            if not are_dtypes_equal(typ, df_dtypes[col]):
1488                common_diff_dtypes[col] = df_dtypes[col]
1489
1490    if debug:
1491        dprint("Common columns with different dtypes:")
1492        pprint(common_diff_dtypes)
1493
1494    detected_dt_cols = {}
1495    for col, typ in common_diff_dtypes.items():
1496        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
1497            df_dtypes[col] = typ
1498            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
1499    for col in detected_dt_cols:
1500        del common_diff_dtypes[col]
1501
1502    if debug:
1503        dprint("Common columns with different dtypes (after dates):")
1504        pprint(common_diff_dtypes)
1505
1506    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1507        if debug:
1508            dprint(
1509                "The incoming DataFrame has mostly the same types, skipping enforcement."
1510                + " The only detected difference was in the following datetime columns."
1511            )
1512            pprint(detected_dt_cols)
1513        return df
1514
1515    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
1516        previous_typ = common_dtypes[col]
1517        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
1518        explicitly_float = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'float')
1519        explicitly_int = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'int')
1520        explicitly_numeric = explicit_dtypes.get(col, 'object').startswith('numeric')
1521        all_nan = (
1522            df[col].isnull().all()
1523            if mixed_numeric_types and coerce_numeric and not (explicitly_float or explicitly_int)
1524            else None
1525        )
1526        cast_to_numeric = explicitly_numeric or (
1527            (
1528                col in df_numeric_cols
1529                or (
1530                    mixed_numeric_types
1531                    and not (explicitly_float or explicitly_int)
1532                    and not all_nan
1533                    and coerce_numeric
1534                )
1535            )
1536        )
1537
1538        if debug and (explicitly_numeric or df_numeric_cols or mixed_numeric_types):
1539            from meerschaum.utils.formatting import make_header
1540            msg = (
1541                make_header(f"Coercing column '{col}' to numeric:", left_pad=0)
1542                + "\n"
1543                + f"  Previous type: {previous_typ}\n"
1544                + f"  Current type: {typ if col not in df_numeric_cols else 'Decimal'}"
1545                + ("\n  Column is explicitly numeric." if explicitly_numeric else "")
1546            ) if cast_to_numeric else (
1547                f"Will not coerce column '{col}' to numeric.\n"
1548                f"  Numeric columns in dataframe: {df_numeric_cols}\n"
1549                f"  Mixed numeric types: {mixed_numeric_types}\n"
1550                f"  Explicitly float: {explicitly_float}\n"
1551                f"  Explicitly int: {explicitly_int}\n"
1552                f"  All NaN: {all_nan}\n"
1553                f"  Coerce numeric: {coerce_numeric}"
1554            )
1555            dprint(msg)
1556
1557        if cast_to_numeric:
1558            common_dtypes[col] = attempt_cast_to_numeric
1559            common_diff_dtypes[col] = attempt_cast_to_numeric
1560
1561    for d in common_diff_dtypes:
1562        t = common_dtypes[d]
1563        if debug:
1564            dprint(f"Casting column {d} to dtype {t}.")
1565        try:
1566            df[d] = (
1567                df[d].apply(t)
1568                if callable(t)
1569                else df[d].astype(t)
1570            )
1571        except Exception as e:
1572            if debug:
1573                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}\ndf:\n{df}")
1574            if 'int' in str(t).lower():
1575                try:
1576                    df[d] = df[d].astype('float64').astype(t)
1577                except Exception:
1578                    if debug:
1579                        dprint(f"Was unable to convert to float then {t}.")
1580    return df

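A minimal sketch of typical enforcement (illustrative addition), assuming `to_pandas_dtype('int')` maps to the nullable 'Int64' pandas dtype:

    >>> import pandas as pd
    >>> from meerschaum.utils.dataframe import enforce_dtypes
    >>> enforce_dtypes(pd.DataFrame({'a': [1.0, 2.0]}), {'a': 'int'}).dtypes
    a    Int64
    dtype: object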
1583def get_datetime_bound_from_df(
1584    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1585    datetime_column: str,
1586    minimum: bool = True,
1587) -> Union[int, datetime, None]:
1588    """
1589    Return the minimum or maximum datetime (or integer) from a DataFrame.
1590
1591    Parameters
1592    ----------
1593    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1594        The DataFrame, list, or dict which contains the range axis.
1595
1596    datetime_column: str
1597        The name of the datetime (or int) column.
1598
1599    minimum: bool
1600        Whether to return the minimum (default) or maximum value.
1601
1602    Returns
1603    -------
1604    The minimum or maximum datetime value in the dataframe, or `None`.
1605    """
1606    from meerschaum.utils.dtypes import to_datetime, value_is_null
1607
1608    if df is None:
1609        return None
1610    if not datetime_column:
1611        return None
1612
1613    def compare(a, b):
1614        if a is None:
1615            return b
1616        if b is None:
1617            return a
1618        if minimum:
1619            return a if a < b else b
1620        return a if a > b else b
1621
1622    if isinstance(df, list):
1623        if len(df) == 0:
1624            return None
1625        best_yet = df[0].get(datetime_column, None)
1626        for doc in df:
1627            val = doc.get(datetime_column, None)
1628            best_yet = compare(best_yet, val)
1629        return best_yet
1630
1631    if isinstance(df, dict):
1632        if datetime_column not in df:
1633            return None
1634        best_yet = df[datetime_column][0]
1635        for val in df[datetime_column]:
1636            best_yet = compare(best_yet, val)
1637        return best_yet
1638
1639    if 'DataFrame' in str(type(df)):
1640        from meerschaum.utils.dtypes import are_dtypes_equal
1641        pandas = mrsm.attempt_import('pandas')
1642        is_dask = 'dask' in df.__module__
1643
1644        if datetime_column not in df.columns:
1645            return None
1646
1647        try:
1648            dt_val = (
1649                df[datetime_column].min(skipna=True)
1650                if minimum
1651                else df[datetime_column].max(skipna=True)
1652            )
1653        except Exception:
1654            dt_val = pandas.NA
1655        if is_dask and dt_val is not None and dt_val is not pandas.NA:
1656            dt_val = dt_val.compute()
1657
1658        return (
1659            to_datetime(dt_val, as_pydatetime=True)
1660            if are_dtypes_equal(str(type(dt_val)), 'datetime')
1661            else (dt_val if not value_is_null(dt_val) else None)
1662        )
1663
1664    return None

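A deterministic sketch using the list-of-documents branch:

    >>> from datetime import datetime
    >>> from meerschaum.utils.dataframe import get_datetime_bound_from_df
    >>> docs = [{'dt': datetime(2025, 1, 1)}, {'dt': datetime(2025, 1, 2)}]
    >>> get_datetime_bound_from_df(docs, 'dt')
    datetime.datetime(2025, 1, 1, 0, 0)
    >>> get_datetime_bound_from_df(docs, 'dt', minimum=False)
    datetime.datetime(2025, 1, 2, 0, 0)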
1667def get_unique_index_values(
1668    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1669    indices: List[str],
1670) -> Dict[str, List[Any]]:
1671    """
1672    Return a dictionary of the unique index values in a DataFrame.
1673
1674    Parameters
1675    ----------
1676    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1677        The dataframe (or list or dict) which contains index values.
1678
1679    indices: List[str]
1680        The list of index columns.
1681
1682    Returns
1683    -------
1684    A dictionary mapping indices to unique values.
1685    """
1686    if df is None:
1687        return {}
1688    if 'dataframe' in str(type(df)).lower():
1689        pandas = mrsm.attempt_import('pandas')
1690        return {
1691            col: list({
1692                (val if val is not pandas.NA else None)
1693                for val in df[col].unique()
1694            })
1695            for col in indices
1696            if col in df.columns
1697        }
1698
1699    unique_indices = defaultdict(lambda: set())
1700    if isinstance(df, list):
1701        for doc in df:
1702            for index in indices:
1703                if index in doc:
1704                    unique_indices[index].add(doc[index])
1705
1706    elif isinstance(df, dict):
1707        for index in indices:
1708            if index in df:
1709                unique_indices[index] = unique_indices[index].union(set(df[index]))
1710
1711    return {key: list(val) for key, val in unique_indices.items()}

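A short doctest-style sketch for the list-of-documents branch:

    >>> from meerschaum.utils.dataframe import get_unique_index_values
    >>> docs = [{'id': 1, 'color': 'red'}, {'id': 2, 'color': 'red'}]
    >>> get_unique_index_values(docs, ['id', 'color'])
    {'id': [1, 2], 'color': ['red']}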
1714def df_is_chunk_generator(df: Any) -> bool:
1715    """
1716    Determine whether to treat `df` as a chunk generator.
1717
1718    Note this should only be used in a context where generators are expected,
1719    as it will return `True` for any iterable.
1720
1721    Parameters
1722    ----------
1723    df: The DataFrame or chunk generator to evaluate.
1724
1725    Returns
1726    -------
1727    A `bool` indicating whether to treat `df` as a generator.
1728    """
1729    return (
1730        not isinstance(df, (dict, list, str))
1731        and 'DataFrame' not in str(type(df))
1732        and isinstance(df, (Generator, Iterable, Iterator))
1733    )

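A quick illustration of the iterable heuristic described above:

    >>> from meerschaum.utils.dataframe import df_is_chunk_generator
    >>> df_is_chunk_generator([{'a': 1}])
    False
    >>> df_is_chunk_generator(df for df in ())
    True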
1736def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
1737    """
1738    Return the Dask `npartitions` value for a given `chunksize`.
1739    """
1740    if chunksize == -1:
1741        from meerschaum.config import get_config
1742        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
1743    if chunksize is None:
1744        return 1
1745    return -1 * chunksize

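Illustrative values (note that a concrete chunksize is returned negated, while `None` maps to a single partition):

    >>> from meerschaum.utils.dataframe import chunksize_to_npartitions
    >>> chunksize_to_npartitions(None)
    1
    >>> chunksize_to_npartitions(100_000)
    -100000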
1748def df_from_literal(
1749    pipe: Optional[mrsm.Pipe] = None,
1750    literal: Optional[str] = None,
1751    debug: bool = False
1752) -> 'pd.DataFrame':
1753    """
1754    Construct a dataframe from a literal value, using the pipe's datetime and value column names.
1755
1756    Parameters
1757    ----------
1758    pipe: Optional['meerschaum.Pipe'], default None
1759        The pipe which will consume the literal value.
1760
1761    Returns
1762    -------
1763    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
1764    and the literal as the value.
1765    """
1766    from meerschaum.utils.packages import import_pandas
1767    from meerschaum.utils.warnings import error, warn
1768    from meerschaum.utils.debug import dprint
1769    from meerschaum.utils.dtypes import get_current_timestamp
1770
1771    if pipe is None or literal is None:
1772        error("Please provide a Pipe and a literal value")
1773
1774    dt_col = pipe.columns.get(
1775        'datetime',
1776        mrsm.get_config('pipes', 'autotime', 'column_name_if_datetime_missing')
1777    )
1778    val_col = pipe.get_val_column(debug=debug)
1779
1780    val = literal
1781    if isinstance(literal, str):
1782        if debug:
1783            dprint(f"Received literal string: '{literal}'")
1784        import ast
1785        try:
1786            val = ast.literal_eval(literal)
1787        except Exception:
1788            warn(
1789                "Failed to parse value from string:\n" + f"{literal}" +
1790                "\n\nWill cast as a string instead."
1791            )
1792            val = literal
1793
1794    now = get_current_timestamp(pipe.precision)
1795    pd = import_pandas()
1796    return pd.DataFrame({dt_col: [now], val_col: [val]})

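A hedged sketch with a hypothetical pipe; string literals are parsed with `ast.literal_eval`, falling back to the raw string on failure:

    import meerschaum as mrsm
    from meerschaum.utils.dataframe import df_from_literal

    # Hypothetical pipe with explicit datetime and value columns.
    pipe = mrsm.Pipe('demo', 'literal', columns={'datetime': 'dt', 'value': 'val'})
    df = df_from_literal(pipe, '42')  # one row: the current timestamp under 'dt', int 42 under 'val'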
1799def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
1800    """
1801    Return the first valid Dask DataFrame partition (if possible).
1802    """
1803    pdf = None
1804    for partition in ddf.partitions:
1805        try:
1806            pdf = partition.compute()
1807        except Exception:
1808            continue
1809        if len(pdf) > 0:
1810            return pdf
1811    _ = mrsm.attempt_import('partd', lazy=False)
1812    return ddf.compute()

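A small sketch, assuming `dask` is installed:

    import dask.dataframe as dd
    import pandas as pd
    from meerschaum.utils.dataframe import get_first_valid_dask_partition

    ddf = dd.from_pandas(pd.DataFrame({'a': [1, 2, 3]}), npartitions=2)
    pdf = get_first_valid_dask_partition(ddf)  # a pandas DataFrame from the first non-empty partition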
1815def query_df(
1816    df: 'pd.DataFrame',
1817    params: Optional[Dict[str, Any]] = None,
1818    begin: Union[datetime, int, None] = None,
1819    end: Union[datetime, int, None] = None,
1820    datetime_column: Optional[str] = None,
1821    select_columns: Optional[List[str]] = None,
1822    omit_columns: Optional[List[str]] = None,
1823    inplace: bool = False,
1824    reset_index: bool = False,
1825    coerce_types: bool = False,
1826    debug: bool = False,
1827) -> 'pd.DataFrame':
1828    """
1829    Query the dataframe with the params dictionary.
1830
1831    Parameters
1832    ----------
1833    df: pd.DataFrame
1834        The DataFrame to query against.
1835
1836    params: Optional[Dict[str, Any]], default None
1837        The parameters dictionary to use for the query.
1838
1839    begin: Union[datetime, int, None], default None
1840        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1841        greater than or equal to this value.
1842
1843    end: Union[datetime, int, None], default None
1844        If `end` and `datetime_column` are provided, only return rows with a timestamp
1845        less than this value.
1846
1847    datetime_column: Optional[str], default None
1848        A `datetime_column` must be provided to use `begin` and `end`.
1849
1850    select_columns: Optional[List[str]], default None
1851        If provided, only return these columns.
1852
1853    omit_columns: Optional[List[str]], default None
1854        If provided, do not include these columns in the result.
1855
1856    inplace: bool, default False
1857        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
1858
1859    reset_index: bool, default False
1860        If `True`, reset the index in the resulting DataFrame.
1861
1862    coerce_types: bool, default False
1863        If `True`, cast the dataframe and parameters as strings before querying.
1864
1865    Returns
1866    -------
1867    A Pandas DataFrame query result.
1868    """
1869
1870    def _process_select_columns(_df):
1871        if not select_columns:
1872            return
1873        for col in list(_df.columns):
1874            if col not in select_columns:
1875                del _df[col]
1876
1877    def _process_omit_columns(_df):
1878        if not omit_columns:
1879            return
1880        for col in list(_df.columns):
1881            if col in omit_columns:
1882                del _df[col]
1883
1884    if not params and not begin and not end:
1885        if not inplace:
1886            df = df.copy()
1887        _process_select_columns(df)
1888        _process_omit_columns(df)
1889        return df
1890
1891    from meerschaum.utils.debug import dprint
1892    from meerschaum.utils.misc import get_in_ex_params
1893    from meerschaum.utils.warnings import warn
1894    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
1895    dateutil_parser = mrsm.attempt_import('dateutil.parser')
1896    pandas = mrsm.attempt_import('pandas')
1897    NA = pandas.NA
1898
1899    if params:
1900        proto_in_ex_params = get_in_ex_params(params)
1901        for key, (proto_in_vals, proto_ex_vals) in proto_in_ex_params.items():
1902            if proto_ex_vals:
1903                coerce_types = True
1904                break
1905        params = params.copy()
1906        for key, val in {k: v for k, v in params.items()}.items():
1907            if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype'):
1908                if None in val:
1909                    val = [item for item in val if item is not None] + [NA]
1910                    params[key] = val
1911                if coerce_types:
1912                    params[key] = [str(x) for x in val]
1913            else:
1914                if value_is_null(val):
1915                    val = NA
1916                    params[key] = NA
1917                if coerce_types:
1918                    params[key] = str(val)
1919
1920    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
1921
1922    if inplace:
1923        df.fillna(NA, inplace=True)
1924    else:
1925        df = df.infer_objects().fillna(NA)
1926
1927    if isinstance(begin, str):
1928        begin = dateutil_parser.parse(begin)
1929    if isinstance(end, str):
1930        end = dateutil_parser.parse(end)
1931
1932    if begin is not None or end is not None:
1933        if not datetime_column or datetime_column not in df.columns:
1934            warn(
1935                f"The datetime column '{datetime_column}' is not present in the Dataframe, "
1936                + "ignoring begin and end...",
1937            )
1938            begin, end = None, None
1939
1940    if debug:
1941        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")
1942
1943    if datetime_column and (begin is not None or end is not None):
1944        if debug:
1945            dprint("Checking for datetime column compatibility.")
1946
1947        from meerschaum.utils.dtypes import coerce_timezone
1948        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
1949        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
1950        end_is_int = are_dtypes_equal(str(type(end)), 'int')
1951
1952        if df_is_dt:
1953            df_tz = (
1954                getattr(df[datetime_column].dt, 'tz', None)
1955                if hasattr(df[datetime_column], 'dt')
1956                else None
1957            )
1958
1959            if begin_is_int:
1960                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
1961                if debug:
1962                    dprint(f"`begin` will be cast to '{begin}'.")
1963            if end_is_int:
1964                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
1965                if debug:
1966                    dprint(f"`end` will be cast to '{end}'.")
1967
1968            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
1969            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None
1970
1971    in_ex_params = get_in_ex_params(params)
1972
1973    masks = [
1974        (
1975            (df[datetime_column] >= begin)
1976            if begin is not None and datetime_column
1977            else True
1978        ) & (
1979            (df[datetime_column] < end)
1980            if end is not None and datetime_column
1981            else True
1982        )
1983    ]
1984
1985    masks.extend([
1986        (
1987            (
1988                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
1989                if in_vals
1990                else True
1991            ) & (
1992                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
1993                if ex_vals
1994                else True
1995            )
1996        )
1997        for col, (in_vals, ex_vals) in in_ex_params.items()
1998        if col in df.columns
1999    ])
2000    query_mask = masks[0]
2001    for mask in masks[1:]:
2002        query_mask = query_mask & mask
2003
2004    original_cols = df.columns
2005
2006    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
2007    ###       to allow for `<NA>` values.
2008    bool_cols = [
2009        col
2010        for col, typ in df.dtypes.items()
2011        if are_dtypes_equal(str(typ), 'bool')
2012    ]
2013    for col in bool_cols:
2014        df[col] = df[col].astype('boolean[pyarrow]')
2015
2016    if not isinstance(query_mask, bool):
2017        df['__mrsm_mask'] = (
2018            query_mask.astype('boolean[pyarrow]')
2019            if hasattr(query_mask, 'astype')
2020            else query_mask
2021        )
2022
2023        if inplace:
2024            df.where(query_mask, other=NA, inplace=True)
2025            df.dropna(how='all', inplace=True)
2026            result_df = df
2027        else:
2028            result_df = df.where(query_mask, other=NA)
2029            result_df.dropna(how='all', inplace=True)
2030
2031    else:
2032        result_df = df
2033
2034    if '__mrsm_mask' in df.columns:
2035        del df['__mrsm_mask']
2036    if '__mrsm_mask' in result_df.columns:
2037        del result_df['__mrsm_mask']
2038
2039    if reset_index:
2040        result_df.reset_index(drop=True, inplace=True)
2041
2042    result_df = enforce_dtypes(
2043        result_df,
2044        dtypes,
2045        safe_copy=False,
2046        debug=debug,
2047        coerce_numeric=False,
2048        coerce_timezone=False,
2049    )
2050
2051    if select_columns == ['*']:
2052        select_columns = None
2053
2054    if not select_columns and not omit_columns:
2055        return result_df[original_cols]
2056
2057    _process_select_columns(result_df)
2058    _process_omit_columns(result_df)
2059
2060    return result_df

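A minimal sketch of params- and range-based filtering (illustrative addition); per the docstring above, `begin` is inclusive and `end` is exclusive:

    from datetime import datetime
    import pandas as pd
    from meerschaum.utils.dataframe import query_df

    df = pd.DataFrame({
        'color': ['red', 'green'],
        'dt': [datetime(2025, 1, 1), datetime(2025, 1, 2)],
    })
    red_df = query_df(df, params={'color': 'red'})
    jan1_df = query_df(df, begin=datetime(2025, 1, 1), end=datetime(2025, 1, 2), datetime_column='dt')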
2063def to_json(
2064    df: 'pd.DataFrame',
2065    safe_copy: bool = True,
2066    orient: str = 'records',
2067    date_format: str = 'iso',
2068    date_unit: str = 'us',
2069    double_precision: int = 15,
2070    geometry_format: str = 'geojson',
2071    **kwargs: Any
2072) -> str:
2073    """
2074    Serialize the given dataframe as a JSON string.
2075
2076    Parameters
2077    ----------
2078    df: pd.DataFrame
2079        The DataFrame to be serialized.
2080
2081    safe_copy: bool, default True
2082        If `False`, modify the DataFrame inplace.
2083
2084    date_format: str, default 'iso'
2085        The default format for timestamps.
2086
2087    date_unit: str, default 'us'
2088        The precision of the timestamps.
2089
2090    double_precision: int, default 15
2091        The number of decimal places to use when encoding floating point values (maximum 15).
2092
2093    geometry_format: str, default 'geojson'
2094        The serialization format for geometry data.
2095        Accepted values are `geojson`, `wkb_hex`, and `wkt`.
2096
2097    Returns
2098    -------
2099    A JSON string.
2100    """
2101    import warnings
2102    import functools
2103    from meerschaum.utils.packages import import_pandas
2104    from meerschaum.utils.dtypes import (
2105        serialize_bytes,
2106        serialize_decimal,
2107        serialize_geometry,
2108    )
2109    pd = import_pandas()
2110    uuid_cols = get_uuid_cols(df)
2111    bytes_cols = get_bytes_cols(df)
2112    numeric_cols = get_numeric_cols(df)
2113    geometry_cols = get_geometry_cols(df)
2114    geometry_cols_srids = {
2115        col: int((getattr(df[col].crs, 'srs', '') or '').split(':', maxsplit=1)[-1] or '0')
2116        for col in geometry_cols
2117    } if 'geodataframe' in str(type(df)).lower() else {}
2118    if safe_copy and bool(uuid_cols or bytes_cols or geometry_cols or numeric_cols):
2119        df = df.copy()
2120    if 'geodataframe' in str(type(df)).lower():
2121        geometry_data = {
2122            col: df[col]
2123            for col in geometry_cols
2124        }
2125        df = pd.DataFrame({
2126            col: df[col]
2127            for col in df.columns
2128            if col not in geometry_cols
2129        })
2130        for col in geometry_cols:
2131            df[col] = pd.Series(ob for ob in geometry_data[col])
2132    for col in uuid_cols:
2133        df[col] = df[col].astype(str)
2134    for col in bytes_cols:
2135        df[col] = df[col].apply(serialize_bytes)
2136    for col in numeric_cols:
2137        df[col] = df[col].apply(serialize_decimal)
2138    with warnings.catch_warnings():
2139        warnings.simplefilter("ignore")
2140        for col in geometry_cols:
2141            srid = geometry_cols_srids.get(col, None) or None
2142            df[col] = pd.Series(
2143                serialize_geometry(val, geometry_format=geometry_format, srid=srid)
2144                for val in df[col]
2145            )
2146    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
2147        date_format=date_format,
2148        date_unit=date_unit,
2149        double_precision=double_precision,
2150        orient=orient,
2151        **kwargs
2152    )

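A hedged sketch: UUIDs are stringified, bytes pass through `serialize_bytes`, and `Decimal` values are serialized as strings; the commented output is indicative only:

    from decimal import Decimal
    import pandas as pd
    from meerschaum.utils.dataframe import to_json

    df = pd.DataFrame({'amount': [Decimal('1.1')], 'when': [pd.Timestamp('2025-01-01', tz='UTC')]})
    json_str = to_json(df)
    # e.g. '[{"amount":"1.1","when":"2025-01-01T00:00:00.000000Z"}]' (exact form may vary)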
2155def to_simple_lines(df: 'pd.DataFrame') -> str:
2156    """
2157    Serialize a Pandas DataFrame as lines of simple dictionaries.
2158
2159    Parameters
2160    ----------
2161    df: pd.DataFrame
2162        The dataframe to serialize into simple lines text.
2163
2164    Returns
2165    -------
2166    A string of simple line dictionaries joined by newlines.
2167    """
2168    from meerschaum.utils.misc import to_simple_dict
2169    if df is None or len(df) == 0:
2170        return ''
2171
2172    docs = df.to_dict(orient='records')
2173    return '\n'.join(to_simple_dict(doc) for doc in docs)

2176def parse_simple_lines(data: str) -> 'pd.DataFrame':
2177    """
2178    Parse simple lines text into a DataFrame.
2179
2180    Parameters
2181    ----------
2182    data: str
2183        The simple lines text to parse into a DataFrame.
2184
2185    Returns
2186    -------
2187    A dataframe containing the rows serialized in `data`.
2188    """
2189    from meerschaum.utils.misc import string_to_dict
2190    from meerschaum.utils.packages import import_pandas
2191    pd = import_pandas()
2192    lines = data.splitlines()
2193    try:
2194        docs = [string_to_dict(line) for line in lines]
2195        df = pd.DataFrame(docs)
2196    except Exception:
2197        df = None
2198
2199    if df is None:
2200        raise ValueError("Cannot parse simple lines into a dataframe.")
2201
2202    return df

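A round-trip sketch tying the two serializers together; the exact line format is defined by `to_simple_dict` and `string_to_dict` in `meerschaum.utils.misc`:

    import pandas as pd
    from meerschaum.utils.dataframe import to_simple_lines, parse_simple_lines

    df = pd.DataFrame({'a': [1, 2], 'b': ['x', 'y']})
    lines = to_simple_lines(df)
    df2 = parse_simple_lines(lines)  # expected to round-trip to an equivalent frame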