meerschaum.utils.dataframe

Utility functions for working with DataFrames.

   1#! /usr/bin/env python3
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Utility functions for working with DataFrames.
   7"""
   8
   9from __future__ import annotations
  10
  11from datetime import datetime, timezone, date
  12from collections import defaultdict
  13
  14import meerschaum as mrsm
  15from meerschaum.utils.typing import (
  16    Optional, Dict, Any, List, Hashable, Generator,
  17    Iterator, Iterable, Union, TYPE_CHECKING, Tuple,
  18)
  19
  20if TYPE_CHECKING:
  21    pd, dask = mrsm.attempt_import('pandas', 'dask')
  22
  23
  24def add_missing_cols_to_df(
  25    df: 'pd.DataFrame',
  26    dtypes: Dict[str, Any],
  27) -> 'pd.DataFrame':
  28    """
  29    Add columns from the dtypes dictionary as null columns to a new DataFrame.
  30
  31    Parameters
  32    ----------
  33    df: pd.DataFrame
  34        The dataframe to copy and to which null columns will be added.
  35
  36    dtypes: Dict[str, Any]
  37        The data types dictionary which may contain keys not present in `df.columns`.
  38
  39    Returns
  40    -------
  41    A new `DataFrame` with the keys from `dtypes` added as null columns.
  42    If the columns of `df` already match the keys of `dtypes`, return a reference to `df`.
  43    NOTE: This will not ensure that dtypes are enforced!
  44
  45    Examples
  46    --------
  47    >>> import pandas as pd
  48    >>> df = pd.DataFrame([{'a': 1}])
  49    >>> dtypes = {'b': 'Int64'}
  50    >>> add_missing_cols_to_df(df, dtypes)
  51       a     b
  52    0  1  <NA>
  53    >>> add_missing_cols_to_df(df, dtypes).dtypes
  54    a    int64
  55    b    Int64
  56    dtype: object
  57    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
  58    a    int64
  59    dtype: object
  60
  61    """
  62    if set(df.columns) == set(dtypes):
  63        return df
  64
  65    from meerschaum.utils.packages import attempt_import
  66    from meerschaum.utils.dtypes import to_pandas_dtype
  67    pandas = attempt_import('pandas')
  68
  69    def build_series(dtype: str):
  70        return pandas.Series([], dtype=to_pandas_dtype(dtype))
  71
  72    assign_kwargs = {
  73        str(col): build_series(str(typ))
  74        for col, typ in dtypes.items()
  75        if col not in df.columns
  76    }
  77    df_with_cols = df.assign(**assign_kwargs)
  78    for col in assign_kwargs:
  79        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
  80    return df_with_cols
  81
  82
  83def filter_unseen_df(
  84    old_df: 'pd.DataFrame',
  85    new_df: 'pd.DataFrame',
  86    safe_copy: bool = True,
  87    dtypes: Optional[Dict[str, Any]] = None,
  88    include_unchanged_columns: bool = False,
  89    coerce_mixed_numerics: bool = True,
  90    debug: bool = False,
  91) -> 'pd.DataFrame':
  92    """
  93    Left join two DataFrames to find the newest unseen data.
  94
  95    Parameters
  96    ----------
  97    old_df: 'pd.DataFrame'
  98        The original (target) dataframe. Acts as a filter on the `new_df`.
  99
 100    new_df: 'pd.DataFrame'
 101        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
 102
 103    safe_copy: bool, default True
 104        If `True`, create a copy before comparing and modifying the dataframes.
 105        Setting to `False` may mutate the DataFrames.
 106
 107    dtypes: Optional[Dict[str, Any]], default None
 108        Optionally specify the datatypes of the dataframe.
 109
 110    include_unchanged_columns: bool, default False
 111        If `True`, include columns which haven't changed on rows which have changed.
 112
 113    coerce_mixed_numerics: bool, default True
 114        If `True`, cast mixed integer and float columns between the old and new dataframes into
 115        numeric values (`decimal.Decimal`).
 116
 117    debug: bool, default False
 118        Verbosity toggle.
 119
 120    Returns
 121    -------
 122    A pandas dataframe of the new, unseen rows in `new_df`.
 123
 124    Examples
 125    --------
 126    ```python
 127    >>> import pandas as pd
 128    >>> df1 = pd.DataFrame({'a': [1,2]})
 129    >>> df2 = pd.DataFrame({'a': [2,3]})
 130    >>> filter_unseen_df(df1, df2)
 131       a
 132    0  3
 133
 134    ```
 135
 136    """
 137    if old_df is None:
 138        return new_df
 139
 140    if safe_copy:
 141        old_df = old_df.copy()
 142        new_df = new_df.copy()
 143
 144    import json
 145    import functools
 146    import traceback
 147    from meerschaum.utils.warnings import warn
 148    from meerschaum.utils.packages import import_pandas, attempt_import
 149    from meerschaum.utils.dtypes import (
 150        to_pandas_dtype,
 151        are_dtypes_equal,
 152        attempt_cast_to_numeric,
 153        attempt_cast_to_uuid,
 154        attempt_cast_to_bytes,
 155        attempt_cast_to_geometry,
 156        coerce_timezone,
 157        serialize_decimal,
 158    )
 159    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
 160    pd = import_pandas(debug=debug)
 161    is_dask = 'dask' in new_df.__module__
 162    if is_dask:
 163        pandas = attempt_import('pandas')
 164        _ = attempt_import('partd', lazy=False)
 165        dd = attempt_import('dask.dataframe')
 166        merge = dd.merge
 167        NA = pandas.NA
 168    else:
 169        merge = pd.merge
 170        NA = pd.NA
 171
 172    new_df_dtypes = dict(new_df.dtypes)
 173    old_df_dtypes = dict(old_df.dtypes)
 174
 175    same_cols = set(new_df.columns) == set(old_df.columns)
 176    if not same_cols:
 177        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
 178        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)
 179
 180        new_types_missing_from_old = {
 181            col: typ
 182            for col, typ in new_df_dtypes.items()
 183            if col not in old_df_dtypes
 184        }
 185        old_types_missing_from_new = {
 186            col: typ
 187            for col, typ in old_df_dtypes.items()
 188            if col not in new_df_dtypes
 189        }
 190        old_df_dtypes.update(new_types_missing_from_old)
 191        new_df_dtypes.update(old_types_missing_from_new)
 192
 193    ### Edge case: two empty lists cast to DFs.
 194    elif len(new_df.columns) == 0:
 195        return new_df
 196
 197    try:
 198        ### Order matters when checking equality.
 199        new_df = new_df[old_df.columns]
 200
 201    except Exception as e:
 202        warn(
 203            "Was not able to cast old columns onto new DataFrame. " +
 204            f"Are both DataFrames the same shape? Error:\n{e}",
 205            stacklevel=3,
 206        )
 207        return new_df[list(new_df_dtypes.keys())]
 208
 209    ### assume the old_df knows what it's doing, even if it's technically wrong.
 210    if dtypes is None:
 211        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}
 212
 213    dtypes = {
 214        col: to_pandas_dtype(typ)
 215        for col, typ in dtypes.items()
 216        if col in new_df_dtypes and col in old_df_dtypes
 217    }
 218    for col, typ in new_df_dtypes.items():
 219        if col not in dtypes:
 220            dtypes[col] = typ
 221
 222    numeric_cols_precisions_scales = {
 223        col: get_numeric_precision_scale(None, typ)
 224        for col, typ in dtypes.items()
 225        if col and str(typ).lower().startswith('numeric')
 226    }
 227
 228    dt_dtypes = {
 229        col: typ
 230        for col, typ in dtypes.items()
 231        if are_dtypes_equal(typ, 'datetime')
 232    }
 233    non_dt_dtypes = {
 234        col: typ
 235        for col, typ in dtypes.items()
 236        if col not in dt_dtypes
 237    }
 238
 239    cast_non_dt_cols = True
 240    try:
 241        new_df = new_df.astype(non_dt_dtypes)
 242        cast_non_dt_cols = False
 243    except Exception as e:
 244        warn(
 245            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
 246        )
 247
 248    cast_dt_cols = True
 249    try:
 250        for col, typ in dt_dtypes.items():
 251            _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
 252            strip_utc = (
 253                _dtypes_col_dtype.startswith('datetime64')
 254                and 'utc' not in _dtypes_col_dtype.lower()
 255            )
 256            if col in old_df.columns:
 257                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
 258            if col in new_df.columns:
 259                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
 260        cast_dt_cols = False
 261    except Exception as e:
 262        warn(f"Could not cast datetime columns:\n{e}")
 263
 264    cast_cols = cast_dt_cols or cast_non_dt_cols
 265
 266    new_numeric_cols_existing = get_numeric_cols(new_df)
 267    old_numeric_cols = get_numeric_cols(old_df)
 268    for col, typ in {k: v for k, v in dtypes.items()}.items():
 269        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
 270            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
 271            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
 272            new_is_numeric = col in new_numeric_cols_existing
 273            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
 274            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
 275            old_is_numeric = col in old_numeric_cols
 276
 277            if (
 278                coerce_mixed_numerics
 279                and
 280                (new_is_float or new_is_int or new_is_numeric)
 281                and
 282                (old_is_float or old_is_int or old_is_numeric)
 283            ):
 284                dtypes[col] = attempt_cast_to_numeric
 285                cast_cols = True
 286                continue
 287
 288            ### Fallback to object if the types don't match.
 289            warn(
 290                f"Detected different types for '{col}' "
 291                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
 292                + "falling back to 'object'..."
 293            )
 294            dtypes[col] = 'object'
 295            cast_cols = True
 296
 297    if cast_cols:
 298        for col, dtype in dtypes.items():
 299            if col in new_df.columns:
 300                try:
 301                    new_df[col] = (
 302                        new_df[col].astype(dtype)
 303                        if not callable(dtype)
 304                        else new_df[col].apply(dtype)
 305                    )
 306                except Exception as e:
 307                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")
 308
 309    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
 310    new_json_cols = get_json_cols(new_df)
 311    old_json_cols = get_json_cols(old_df)
 312    json_cols = set(new_json_cols + old_json_cols)
 313    for json_col in old_json_cols:
 314        old_df[json_col] = old_df[json_col].apply(serializer)
 315    for json_col in new_json_cols:
 316        new_df[json_col] = new_df[json_col].apply(serializer)
 317
 318    new_numeric_cols = get_numeric_cols(new_df)
 319    numeric_cols = set(new_numeric_cols + old_numeric_cols)
 320    for numeric_col in old_numeric_cols:
 321        old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
 322    for numeric_col in new_numeric_cols:
 323        new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)
 324
 325    old_dt_cols = [
 326        col
 327        for col, typ in old_df.dtypes.items()
 328        if are_dtypes_equal(str(typ), 'datetime')
 329    ]
 330    for col in old_dt_cols:
 331        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
 332        strip_utc = (
 333            _dtypes_col_dtype.startswith('datetime64')
 334            and 'utc' not in _dtypes_col_dtype.lower()
 335        )
 336        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
 337
 338    new_dt_cols = [
 339        col
 340        for col, typ in new_df.dtypes.items()
 341        if are_dtypes_equal(str(typ), 'datetime')
 342    ]
 343    for col in new_dt_cols:
 344        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
 345        strip_utc = (
 346            _dtypes_col_dtype.startswith('datetime64')
 347            and 'utc' not in _dtypes_col_dtype.lower()
 348        )
 349        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
 350
 351    old_uuid_cols = get_uuid_cols(old_df)
 352    new_uuid_cols = get_uuid_cols(new_df)
 353    uuid_cols = set(new_uuid_cols + old_uuid_cols)
 354
 355    old_bytes_cols = get_bytes_cols(old_df)
 356    new_bytes_cols = get_bytes_cols(new_df)
 357    bytes_cols = set(new_bytes_cols + old_bytes_cols)
 358
 359    old_geometry_cols = get_geometry_cols(old_df)
 360    new_geometry_cols = get_geometry_cols(new_df)
 361    geometry_cols = set(new_geometry_cols + old_geometry_cols)
 362
 363    na_pattern = r'(?i)^(none|nan|na|nat|natz|<na>)$'
 364    joined_df = merge(
 365        new_df.infer_objects().replace(na_pattern, pd.NA, regex=True).fillna(NA),
 366        old_df.infer_objects().replace(na_pattern, pd.NA, regex=True).fillna(NA),
 367        how='left',
 368        on=None,
 369        indicator=True,
 370    )
 371    changed_rows_mask = (joined_df['_merge'] == 'left_only')
 372    new_cols = list(new_df_dtypes)
 373    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)
 374
 375    delta_json_cols = get_json_cols(delta_df)
 376    for json_col in json_cols:
 377        if (
 378            json_col in delta_json_cols
 379            or json_col not in delta_df.columns
 380        ):
 381            continue
 382        try:
 383            delta_df[json_col] = delta_df[json_col].apply(
 384                lambda x: (json.loads(x) if isinstance(x, str) else x)
 385            )
 386        except Exception:
 387            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")
 388
 389    delta_numeric_cols = get_numeric_cols(delta_df)
 390    for numeric_col in numeric_cols:
 391        if (
 392            numeric_col in delta_numeric_cols
 393            or numeric_col not in delta_df.columns
 394        ):
 395            continue
 396        try:
 397            delta_df[numeric_col] = delta_df[numeric_col].apply(
 398                functools.partial(
 399                    attempt_cast_to_numeric,
 400                    quantize=True,
 401                    precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
 402                    scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
 403                )
 404            )
 405        except Exception:
 406            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")
 407
 408    delta_uuid_cols = get_uuid_cols(delta_df)
 409    for uuid_col in uuid_cols:
 410        if (
 411            uuid_col in delta_uuid_cols
 412            or uuid_col not in delta_df.columns
 413        ):
 414            continue
 415        try:
 416            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
 417        except Exception:
 418            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")
 419
 420    delta_bytes_cols = get_bytes_cols(delta_df)
 421    for bytes_col in bytes_cols:
 422        if (
 423            bytes_col in delta_bytes_cols
 424            or bytes_col not in delta_df.columns
 425        ):
 426            continue
 427        try:
 428            delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
 429        except Exception:
 430            warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")
 431
 432    delta_geometry_cols = get_geometry_cols(delta_df)
 433    for geometry_col in geometry_cols:
 434        if (
 435            geometry_col in delta_geometry_cols
 436            or geometry_col not in delta_df.columns
 437        ):
 438            continue
 439        try:
 440            delta_df[geometry_col] = attempt_cast_to_geometry(delta_df[geometry_col])
 441        except Exception:
 442            warn(f"Unable to parse geometry column '{geometry_col}':\n{traceback.format_exc()}")
 443
 444    return delta_df
 445
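    ### A hedged usage sketch (not in the original source; assumes pandas is
    ### installed, and output formatting may vary by pandas version). Rows
    ### present in `old_df` are filtered out, so only new or changed rows survive:
    # >>> import pandas as pd
    # >>> old = pd.DataFrame({'id': [1, 2], 'val': ['a', 'b']})
    # >>> new = pd.DataFrame({'id': [1, 2], 'val': ['a', 'B']})
    # >>> filter_unseen_df(old, new)
    #    id val
    # 0   2   B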
 446
 447def parse_df_datetimes(
 448    df: 'pd.DataFrame',
 449    ignore_cols: Optional[Iterable[str]] = None,
 450    strip_timezone: bool = False,
 451    chunksize: Optional[int] = None,
 452    dtype_backend: str = 'numpy_nullable',
 453    ignore_all: bool = False,
 454    precision_unit: Optional[str] = None,
 455    coerce_utc: bool = True,
 456    debug: bool = False,
 457) -> 'pd.DataFrame':
 458    """
 459    Parse a pandas DataFrame for datetime columns and cast as datetimes.
 460
 461    Parameters
 462    ----------
 463    df: pd.DataFrame
 464        The pandas DataFrame to parse.
 465
 466    ignore_cols: Optional[Iterable[str]], default None
 467        If provided, do not attempt to coerce these columns as datetimes.
 468
 469    strip_timezone: bool, default False
 470        If `True`, remove the UTC `tzinfo` property.
 471
 472    chunksize: Optional[int], default None
 473        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
 474
 475    dtype_backend: str, default 'numpy_nullable'
 476        If `df` is not a DataFrame and new one needs to be constructed,
 477        use this as the datatypes backend.
 478        Accepted values are 'numpy_nullable' and 'pyarrow'.
 479
 480    ignore_all: bool, default False
 481        If `True`, do not attempt to cast any columns to datetimes.
 482
 483    precision_unit: Optional[str], default None
 484        If provided, enforce the given precision on the coerced datetime columns.
 485
 486    coerce_utc: bool, default True
 487        Coerce the datetime columns to UTC (see `meerschaum.utils.dtypes.to_datetime()`).
 488
 489    debug: bool, default False
 490        Verbosity toggle.
 491
 492    Returns
 493    -------
 494    A new pandas DataFrame with the determined datetime columns
 495    (usually ISO strings) cast as datetimes.
 496
 497    Examples
 498    --------
 499    ```python
 500    >>> import pandas as pd
 501    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
 502    >>> df.dtypes
 503    a    object
 504    dtype: object
 505    >>> df2 = parse_df_datetimes(df)
 506    >>> df2.dtypes
 507    a    datetime64[us, UTC]
 508    dtype: object
 509
 510    ```
 511
 512    """
 513    from meerschaum.utils.packages import import_pandas, attempt_import
 514    from meerschaum.utils.debug import dprint
 515    from meerschaum.utils.warnings import warn
 516    from meerschaum.utils.misc import items_str
 517    from meerschaum.utils.dtypes import to_datetime, MRSM_PD_DTYPES
 518    import traceback
 519
 520    pd = import_pandas()
 521    pandas = attempt_import('pandas')
 522    pd_name = pd.__name__
 523    using_dask = 'dask' in pd_name
 524    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
 525    dask_dataframe = None
 526    if using_dask or df_is_dask:
 527        npartitions = chunksize_to_npartitions(chunksize)
 528        dask_dataframe = attempt_import('dask.dataframe')
 529
 530    ### if df is a dict, build DataFrame
 531    if isinstance(df, pandas.DataFrame):
 532        pdf = df
 533    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
 534        pdf = get_first_valid_dask_partition(df)
 535    else:
 536        if debug:
 537            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")
 538
 539        if using_dask:
 540            if isinstance(df, list):
 541                keys = set()
 542                for doc in df:
 543                    for key in doc:
 544                        keys.add(key)
 545                df = pd.DataFrame.from_dict(
 546                    {
 547                        k: [
 548                            doc.get(k, None)
 549                            for doc in df
 550                        ] for k in keys
 551                    },
 552                    npartitions=npartitions,
 553                )
 554            elif isinstance(df, dict):
 555                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
 556            elif 'pandas.core.frame.DataFrame' in str(type(df)):
 557                df = pd.from_pandas(df, npartitions=npartitions)
 558            else:
 559                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
 560            pandas = attempt_import('pandas')
 561            pdf = get_first_valid_dask_partition(df)
 562
 563        else:
 564            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
 565            pdf = df
 566
 567    ### skip parsing if DataFrame is empty
 568    if len(pdf) == 0:
 569        if debug:
 570            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
 571        return df
 572
 573    ignore_cols = set(
 574        (ignore_cols or []) + [
 575            col
 576            for col, dtype in pdf.dtypes.items() 
 577            if 'datetime' in str(dtype)
 578        ]
 579    )
 580    cols_to_inspect = [
 581        col
 582        for col in pdf.columns
 583        if col not in ignore_cols
 584    ] if not ignore_all else []
 585
 586    if len(cols_to_inspect) == 0:
 587        if debug:
 588            dprint("All columns are ignored, skipping datetime detection...")
 589        return df.infer_objects().fillna(pandas.NA)
 590
 591    ### apply regex to columns to determine which are ISO datetimes
 592    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
 593    dt_mask = pdf[cols_to_inspect].astype(str).apply(
 594        lambda s: s.str.match(iso_dt_regex).all()
 595    )
 596
 597    ### list of datetime column names
 598    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
 599    if not datetime_cols:
 600        if debug:
 601            dprint("No columns detected as datetimes, returning...")
 602        return df.infer_objects().fillna(pandas.NA)
 603
 604    if debug:
 605        dprint("Converting columns to datetimes: " + str(datetime_cols))
 606
 607    def _parse_to_datetime(x):
 608        return to_datetime(x, precision_unit=precision_unit, coerce_utc=coerce_utc)
 609
 610    try:
 611        if not using_dask:
 612            df[datetime_cols] = df[datetime_cols].apply(_parse_to_datetime)
 613        else:
 614            df[datetime_cols] = df[datetime_cols].apply(
 615                _parse_to_datetime,
 616                utc=True,
 617                axis=1,
 618                meta={
 619                    col: MRSM_PD_DTYPES['datetime']
 620                    for col in datetime_cols
 621                }
 622            )
 623    except Exception:
 624        warn(
 625            f"Unable to apply `to_datetime()` to {items_str(datetime_cols)}:\n"
 626            + f"{traceback.format_exc()}"
 627        )
 628
 629    if strip_timezone:
 630        for dt in datetime_cols:
 631            try:
 632                df[dt] = df[dt].dt.tz_localize(None)
 633            except Exception:
 634                warn(
 635                    f"Unable to convert column '{dt}' to naive datetime:\n"
 636                    + f"{traceback.format_exc()}"
 637                )
 638
 639    return df.fillna(pandas.NA)
 640
 641
 642def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
 643    """
 644    Get the columns which contain unhashable objects from a Pandas DataFrame.
 645
 646    Parameters
 647    ----------
 648    df: pd.DataFrame
 649        The DataFrame which may contain unhashable objects.
 650
 651    Returns
 652    -------
 653    A list of columns.
 654    """
 655    if df is None:
 656        return []
 657    if len(df) == 0:
 658        return []
 659
 660    is_dask = 'dask' in df.__module__
 661    if is_dask:
 662        from meerschaum.utils.packages import attempt_import
 663        pandas = attempt_import('pandas')
 664        df = pandas.DataFrame(get_first_valid_dask_partition(df))
 665    return [
 666        col for col, val in df.iloc[0].items()
 667        if not isinstance(val, Hashable)
 668    ]
 669
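    ### A quick illustrative sketch (assumes pandas): dicts are unhashable,
    ### so a column of dicts is flagged while scalar columns are not.
    # >>> import pandas as pd
    # >>> df = pd.DataFrame([{'a': 1, 'meta': {'x': 1}}])
    # >>> get_unhashable_cols(df)
    # ['meta']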
 670
 671def get_json_cols(df: 'pd.DataFrame') -> List[str]:
 672    """
 673    Get the columns which contain JSON-like objects (dicts or lists) from a Pandas DataFrame.
 674
 675    Parameters
 676    ----------
 677    df: pd.DataFrame
 678        The DataFrame which may contain dicts or lists.
 679
 680    Returns
 681    -------
 682    A list of columns to be encoded as JSON.
 683    """
 684    if df is None:
 685        return []
 686
 687    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
 688    if is_dask:
 689        df = get_first_valid_dask_partition(df)
 690
 691    if len(df) == 0:
 692        return []
 693
 694    cols_indices = {
 695        col: df[col].first_valid_index()
 696        for col in df.columns
 697    }
 698    return [
 699        col
 700        for col, ix in cols_indices.items()
 701        if (
 702            ix is not None
 703            and isinstance(df.loc[ix][col], (dict, list))
 704        )
 705    ]
 706
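    ### A quick illustrative sketch (assumes pandas): only columns whose first
    ### valid value is a dict or list are flagged for JSON encoding.
    # >>> import pandas as pd
    # >>> df = pd.DataFrame([{'a': 1, 'tags': ['x', 'y']}])
    # >>> get_json_cols(df)
    # ['tags']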
 707
 708def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
 709    """
 710    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.
 711
 712    Parameters
 713    ----------
 714    df: pd.DataFrame
 715        The DataFrame which may contain decimal objects.
 716
 717    Returns
 718    -------
 719    A list of columns to treat as numerics.
 720    """
 721    if df is None:
 722        return []
 723    from decimal import Decimal
 724    is_dask = 'dask' in df.__module__
 725    if is_dask:
 726        df = get_first_valid_dask_partition(df)
 727
 728    if len(df) == 0:
 729        return []
 730
 731    cols_indices = {
 732        col: df[col].first_valid_index()
 733        for col in df.columns
 734    }
 735    return [
 736        col
 737        for col, ix in cols_indices.items()
 738        if (
 739            ix is not None
 740            and
 741            isinstance(df.loc[ix][col], Decimal)
 742        )
 743    ]
 744
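    ### A quick illustrative sketch (assumes pandas): `Decimal` values mark a
    ### column as numeric; native ints and floats do not.
    # >>> import pandas as pd
    # >>> from decimal import Decimal
    # >>> df = pd.DataFrame([{'price': Decimal('1.50'), 'qty': 2}])
    # >>> get_numeric_cols(df)
    # ['price']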
 745
 746def get_bool_cols(df: 'pd.DataFrame') -> List[str]:
 747    """
 748    Get the columns which contain `bool` objects from a Pandas DataFrame.
 749
 750    Parameters
 751    ----------
 752    df: pd.DataFrame
 753        The DataFrame which may contain bools.
 754
 755    Returns
 756    -------
 757    A list of columns to treat as bools.
 758    """
 759    if df is None:
 760        return []
 761
 762    is_dask = 'dask' in df.__module__
 763    if is_dask:
 764        df = get_first_valid_dask_partition(df)
 765
 766    if len(df) == 0:
 767        return []
 768
 769    from meerschaum.utils.dtypes import are_dtypes_equal
 770
 771    return [
 772        col
 773        for col, typ in df.dtypes.items()
 774        if are_dtypes_equal(str(typ), 'bool')
 775    ]
 776
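    ### A quick illustrative sketch (assumes pandas): detection is dtype-based,
    ### so only columns with a boolean dtype are returned.
    # >>> import pandas as pd
    # >>> df = pd.DataFrame({'flag': [True, False], 'n': [1, 2]})
    # >>> get_bool_cols(df)
    # ['flag']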
 777
 778def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
 779    """
 780    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.
 781
 782    Parameters
 783    ----------
 784    df: pd.DataFrame
 785        The DataFrame which may contain UUID objects.
 786
 787    Returns
 788    -------
 789    A list of columns to treat as UUIDs.
 790    """
 791    if df is None:
 792        return []
 793    from uuid import UUID
 794    is_dask = 'dask' in df.__module__
 795    if is_dask:
 796        df = get_first_valid_dask_partition(df)
 797
 798    if len(df) == 0:
 799        return []
 800
 801    cols_indices = {
 802        col: df[col].first_valid_index()
 803        for col in df.columns
 804    }
 805    return [
 806        col
 807        for col, ix in cols_indices.items()
 808        if (
 809            ix is not None
 810            and
 811            isinstance(df.loc[ix][col], UUID)
 812        )
 813    ]
 814
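    ### A quick illustrative sketch (assumes pandas): UUIDs live in object-dtype
    ### columns, so detection samples each column's first valid value.
    # >>> import pandas as pd
    # >>> from uuid import uuid4
    # >>> df = pd.DataFrame([{'id': uuid4(), 'n': 1}])
    # >>> get_uuid_cols(df)
    # ['id']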
 815
 816def get_datetime_cols(
 817    df: 'pd.DataFrame',
 818    timezone_aware: bool = True,
 819    timezone_naive: bool = True,
 820    with_tz_precision: bool = False,
 821) -> Union[List[str], Dict[str, Tuple[Union[str, None], str]]]:
 822    """
 823    Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame.
 824
 825    Parameters
 826    ----------
 827    df: pd.DataFrame
 828        The DataFrame which may contain `datetime` or `Timestamp` objects.
 829
 830    timezone_aware: bool, default True
 831        If `True`, include timezone-aware datetime columns.
 832
 833    timezone_naive: bool, default True
 834        If `True`, include timezone-naive datetime columns.
 835
 836    with_tz_precision: bool, default False
 837        If `True`, return a dictionary mapping column names to tuples in the form
 838        `(timezone, precision)`.
 839
 840    Returns
 841    -------
 842    A list of columns to treat as datetimes, or a dictionary of columns to tz+precision tuples
 843    (if `with_tz_precision` is `True`).
 844    """
 845    if not timezone_aware and not timezone_naive:
 846        raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.")
 847
 848    if df is None:
 849        return [] if not with_tz_precision else {}
 850
 851    from meerschaum.utils.dtypes import are_dtypes_equal, MRSM_PRECISION_UNITS_ALIASES
 852    is_dask = 'dask' in df.__module__
 853    if is_dask:
 854        df = get_first_valid_dask_partition(df)
 855   
 856    def get_tz_precision_from_dtype(dtype: str) -> Tuple[Union[str, None], str]:
 857        """
 858        Extract the tz + precision tuple from a dtype string.
 859        """
 860        meta_str = dtype.split('[', maxsplit=1)[-1].rstrip(']').replace(' ', '')
 861        tz = (
 862            None
 863            if ',' not in meta_str
 864            else meta_str.split(',', maxsplit=1)[-1]
 865        )
 866        precision_abbreviation = (
 867            meta_str
 868            if ',' not in meta_str
 869            else meta_str.split(',')[0]
 870        )
 871        precision = MRSM_PRECISION_UNITS_ALIASES[precision_abbreviation]
 872        return tz, precision
 873
 874    def get_tz_precision_from_datetime(dt: datetime) -> Tuple[Union[str, None], str]:
 875        """
 876        Return the tz + precision tuple from a Python datetime object.
 877        """
 878        return dt.tzname(), 'microsecond'
 879
 880    known_dt_cols_types = {
 881        col: str(typ)
 882        for col, typ in df.dtypes.items()
 883        if are_dtypes_equal('datetime', str(typ))
 884    }
 885 
 886    known_dt_cols_tuples = {
 887        col: get_tz_precision_from_dtype(typ)
 888        for col, typ in known_dt_cols_types.items()
 889    }
 890
 891    if len(df) == 0:
 892        return (
 893            list(known_dt_cols_types)
 894            if not with_tz_precision
 895            else known_dt_cols_tuples
 896        )
 897
 898    cols_indices = {
 899        col: df[col].first_valid_index()
 900        for col in df.columns
 901        if col not in known_dt_cols_types
 902    }
 903    pydt_cols_tuples = {
 904        col: get_tz_precision_from_datetime(sample_val)
 905        for col, ix in cols_indices.items()
 906        if (
 907            ix is not None
 908            and
 909            isinstance((sample_val := df.loc[ix][col]), datetime)
 910        )
 911    }
 912
 913    dt_cols_tuples = {
 914        **known_dt_cols_tuples,
 915        **pydt_cols_tuples
 916    }
 917
 918    all_dt_cols_tuples = {
 919        col: dt_cols_tuples[col]
 920        for col in df.columns
 921        if col in dt_cols_tuples
 922    }
 923    if timezone_aware and timezone_naive:
 924        return (
 925            list(all_dt_cols_tuples)
 926            if not with_tz_precision
 927            else all_dt_cols_tuples
 928        )
 929
 930    known_timezone_aware_dt_cols = [
 931        col
 932        for col in known_dt_cols_types
 933        if getattr(df[col], 'tz', None) is not None
 934    ]
 935    timezone_aware_pydt_cols_tuples = {
 936        col: (tz, precision)
 937        for col, (tz, precision) in pydt_cols_tuples.items()
 938        if df.loc[cols_indices[col]][col].tzinfo is not None
 939    }
 940    timezone_aware_dt_cols_set = set(
 941        known_timezone_aware_dt_cols + list(timezone_aware_pydt_cols_tuples)
 942    )
 943    timezone_aware_cols_tuples = {
 944        col: (tz, precision)
 945        for col, (tz, precision) in all_dt_cols_tuples.items()
 946        if col in timezone_aware_dt_cols_set
 947    }
 948    timezone_naive_cols_tuples = {
 949        col: (tz, precision)
 950        for col, (tz, precision) in all_dt_cols_tuples.items()
 951        if col not in timezone_aware_dt_cols_set
 952    }
 953
 954    if timezone_aware:
 955        return (
 956            list(timezone_aware_cols_tuples)
 957            if not with_tz_precision
 958            else timezone_aware_cols_tuples
 959        )
 960
 961    return (
 962        list(timezone_naive_cols_tuples)
 963        if not with_tz_precision
 964        else timezone_naive_cols_tuples
 965    )
 966
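    ### A hedged illustrative sketch (assumes pandas >= 2.x; the reported
    ### precision follows the inferred dtype, shown here as microseconds to
    ### match the docstring examples elsewhere in this module):
    # >>> import pandas as pd
    # >>> from datetime import datetime, timezone
    # >>> df = pd.DataFrame({
    # ...     'aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)],
    # ...     'naive': [datetime(2025, 1, 1)],
    # ... })
    # >>> get_datetime_cols(df)
    # ['aware', 'naive']
    # >>> get_datetime_cols(df, with_tz_precision=True)
    # {'aware': ('UTC', 'microsecond'), 'naive': (None, 'microsecond')}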
 967
 968def get_datetime_cols_types(df: 'pd.DataFrame') -> Dict[str, str]:
 969    """
 970    Return a dictionary mapping datetime columns to specific types strings.
 971
 972    Parameters
 973    ----------
 974    df: pd.DataFrame
 975        The DataFrame which may contain datetime columns.
 976
 977    Returns
 978    -------
 979    A dictionary mapping the datetime columns' names to dtype strings
 980    (containing timezone and precision metadata).
 981
 982    Examples
 983    --------
 984    >>> from datetime import datetime, timezone
 985    >>> import pandas as pd
 986    >>> df = pd.DataFrame({'dt_tz_aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)]})
 987    >>> get_datetime_cols_types(df)
 988    {'dt_tz_aware': 'datetime64[us, UTC]'}
 989    >>> df = pd.DataFrame({'distant_dt': [datetime(1, 1, 1)]})
 990    >>> get_datetime_cols_types(df)
 991    {'distant_dt': 'datetime64[us]'}
 992    >>> df = pd.DataFrame({'dt_second': [datetime(2025, 1, 1)]})
 993    >>> df['dt_second'] = df['dt_second'].astype('datetime64[s]')
 994    >>> get_datetime_cols_types(df)
 995    {'dt_second': 'datetime64[s]'}
 996    """
 997    from meerschaum.utils.dtypes import MRSM_PRECISION_UNITS_ABBREVIATIONS
 998    dt_cols_tuples = get_datetime_cols(df, with_tz_precision=True)
 999    if not dt_cols_tuples:
1000        return {}
1001
1002    return {
1003        col: (
1004            f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}]"
1005            if tz is None
1006            else f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}, {tz}]"
1007        )
1008        for col, (tz, precision) in dt_cols_tuples.items()
1009    }
1010
1011
1012def get_date_cols(df: 'pd.DataFrame') -> List[str]:
1013    """
1014    Get the `date` columns from a Pandas DataFrame.
1015
1016    Parameters
1017    ----------
1018    df: pd.DataFrame
1019        The DataFrame which may contain dates.
1020
1021    Returns
1022    -------
1023    A list of columns to treat as dates.
1024    """
1025    from meerschaum.utils.dtypes import are_dtypes_equal
1026    if df is None:
1027        return []
1028
1029    is_dask = 'dask' in df.__module__
1030    if is_dask:
1031        df = get_first_valid_dask_partition(df)
1032
1033    known_date_cols = [
1034        col
1035        for col, typ in df.dtypes.items()
1036        if are_dtypes_equal(typ, 'date')
1037    ]
1038
1039    if len(df) == 0:
1040        return known_date_cols
1041
1042    cols_indices = {
1043        col: df[col].first_valid_index()
1044        for col in df.columns
1045        if col not in known_date_cols
1046    }
1047    object_date_cols = [
1048        col
1049        for col, ix in cols_indices.items()
1050        if (
1051            ix is not None
1052            and isinstance(df.loc[ix][col], date)
1053        )
1054    ]
1055
1056    all_date_cols = set(known_date_cols + object_date_cols)
1057
1058    return [
1059        col
1060        for col in df.columns
1061        if col in all_date_cols
1062    ]
1063
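    ### A quick illustrative sketch (assumes pandas): plain `datetime.date`
    ### objects sit in object-dtype columns and are detected by sampling the
    ### first valid value.
    # >>> import pandas as pd
    # >>> from datetime import date
    # >>> df = pd.DataFrame([{'d': date(2025, 1, 1), 'n': 1}])
    # >>> get_date_cols(df)
    # ['d']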
1064
1065def get_bytes_cols(df: 'pd.DataFrame') -> List[str]:
1066    """
1067    Get the columns which contain bytes strings from a Pandas DataFrame.
1068
1069    Parameters
1070    ----------
1071    df: pd.DataFrame
1072        The DataFrame which may contain bytes strings.
1073
1074    Returns
1075    -------
1076    A list of columns to treat as bytes.
1077    """
1078    if df is None:
1079        return []
1080
1081    is_dask = 'dask' in df.__module__
1082    if is_dask:
1083        df = get_first_valid_dask_partition(df)
1084
1085    known_bytes_cols = [
1086        col
1087        for col, typ in df.dtypes.items()
1088        if str(typ) == 'binary[pyarrow]'
1089    ]
1090
1091    if len(df) == 0:
1092        return known_bytes_cols
1093
1094    cols_indices = {
1095        col: df[col].first_valid_index()
1096        for col in df.columns
1097        if col not in known_bytes_cols
1098    }
1099    object_bytes_cols = [
1100        col
1101        for col, ix in cols_indices.items()
1102        if (
1103            ix is not None
1104            and isinstance(df.loc[ix][col], bytes)
1105        )
1106    ]
1107
1108    all_bytes_cols = set(known_bytes_cols + object_bytes_cols)
1109
1110    return [
1111        col
1112        for col in df.columns
1113        if col in all_bytes_cols
1114    ]
1115
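    ### A quick illustrative sketch (assumes pandas): raw `bytes` values are
    ### detected by value; `binary[pyarrow]` columns are detected by dtype.
    # >>> import pandas as pd
    # >>> df = pd.DataFrame([{'blob': b'\x00\x01', 'n': 1}])
    # >>> get_bytes_cols(df)
    # ['blob']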
1116
1117def get_geometry_cols(
1118    df: 'pd.DataFrame',
1119    with_types_srids: bool = False,
1120) -> Union[List[str], Dict[str, Any]]:
1121    """
1122    Get the columns which contain shapely objects from a Pandas DataFrame.
1123
1124    Parameters
1125    ----------
1126    df: pd.DataFrame
1127        The DataFrame which may contain shapely geometry objects.
1128
1129    with_types_srids: bool, default False
1130        If `True`, return a dictionary mapping columns to geometry types and SRIDs.
1131
1132    Returns
1133    -------
1134    A list of columns to treat as `geometry`.
1135    If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID).
1136    """
1137    if df is None:
1138        return [] if not with_types_srids else {}
1139
1140    is_dask = 'dask' in df.__module__
1141    if is_dask:
1142        df = get_first_valid_dask_partition(df)
1143
1144    if len(df) == 0:
1145        return [] if not with_types_srids else {}
1146
1147    cols_indices = {
1148        col: df[col].first_valid_index()
1149        for col in df.columns
1150    }
1151    geo_cols = [
1152        col
1153        for col, ix in cols_indices.items()
1154        if (
1155            ix is not None
1156            and
1157            'shapely' in str(type(df.loc[ix][col]))
1158        )
1159    ]
1160    if not with_types_srids:
1161        return geo_cols
1162
1163    gpd = mrsm.attempt_import('geopandas', lazy=False)
1164    geo_cols_types_srids = {}
1165    for col in geo_cols:
1166        try:
1167            sample_geo_series = gpd.GeoSeries(df[col], crs=None)
1168            geometry_types = {
1169                geom.geom_type
1170                for geom in sample_geo_series
1171                if hasattr(geom, 'geom_type')
1172            }
1173            geometry_has_z = any(getattr(geom, 'has_z', False) for geom in sample_geo_series)
1174            srid = (
1175                (
1176                    sample_geo_series.crs.sub_crs_list[0].to_epsg()
1177                    if sample_geo_series.crs.is_compound
1178                    else sample_geo_series.crs.to_epsg()
1179                )
1180                if sample_geo_series.crs
1181                else 0
1182            )
1183            geometry_type = list(geometry_types)[0] if len(geometry_types) == 1 else 'geometry'
1184            if geometry_type != 'geometry' and geometry_has_z:
1185                geometry_type = geometry_type + 'Z'
1186        except Exception:
1187            srid = 0
1188            geometry_type = 'geometry'
1189        geo_cols_types_srids[col] = (geometry_type, srid)
1190
1191    return geo_cols_types_srids
1192
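    ### A hedged illustrative sketch (assumes pandas, shapely, and geopandas
    ### are installed; with no CRS set, the SRID defaults to 0):
    # >>> import pandas as pd
    # >>> from shapely.geometry import Point
    # >>> df = pd.DataFrame({'geom': [Point(0, 0)]})
    # >>> get_geometry_cols(df)
    # ['geom']
    # >>> get_geometry_cols(df, with_types_srids=True)
    # {'geom': ('Point', 0)}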
1193
1194def get_geometry_cols_types(df: 'pd.DataFrame') -> Dict[str, str]:
1195    """
1196    Return a dtypes dictionary mapping columns to specific geometry types (type, srid).
1197    """
1198    geometry_cols_types_srids = get_geometry_cols(df, with_types_srids=True)
1199    new_cols_types = {}
1200    for col, (geometry_type, srid) in geometry_cols_types_srids.items():
1201        new_dtype = "geometry"
1202        modifier = ""
1203        if not srid and geometry_type.lower() == 'geometry':
1204            new_cols_types[col] = new_dtype
1205            continue
1206
1207        modifier = "["
1208        if geometry_type.lower() != 'geometry':
1209            modifier += f"{geometry_type}"
1210
1211        if srid:
1212            if modifier != '[':
1213                modifier += ", "
1214            modifier += f"{srid}"
1215        modifier += "]"
1216        new_cols_types[col] = f"{new_dtype}{modifier}"
1217    return new_cols_types
1218
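    ### For example (illustrative): a column detected as ('Point', 4326) renders
    ### as 'geometry[Point, 4326]', ('geometry', 4326) as 'geometry[4326]',
    ### ('Point', 0) as 'geometry[Point]', and ('geometry', 0) as 'geometry'.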
1219
1220def get_special_cols(df: 'pd.DataFrame') -> Dict[str, str]:
1221    """
1222    Return a dtypes dictionary mapping special columns to their dtypes.
1223    """
1224    return {
1225        **{col: 'json' for col in get_json_cols(df)},
1226        **{col: 'uuid' for col in get_uuid_cols(df)},
1227        **{col: 'bytes' for col in get_bytes_cols(df)},
1228        **{col: 'bool' for col in get_bool_cols(df)},
1229        **{col: 'numeric' for col in get_numeric_cols(df)},
1230        **{col: 'date' for col in get_date_cols(df)},
1231        **get_datetime_cols_types(df),
1232        **get_geometry_cols_types(df),
1233    }
1234
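    ### A quick illustrative sketch (assumes pandas) combining several of the
    ### detectors above:
    # >>> import pandas as pd
    # >>> from decimal import Decimal
    # >>> df = pd.DataFrame([{'meta': {'x': 1}, 'price': Decimal('1.1'), 'ok': True}])
    # >>> get_special_cols(df)
    # {'meta': 'json', 'ok': 'bool', 'price': 'numeric'}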
1235
1236def enforce_dtypes(
1237    df: 'pd.DataFrame',
1238    dtypes: Dict[str, str],
1239    explicit_dtypes: Optional[Dict[str, str]] = None,
1240    safe_copy: bool = True,
1241    coerce_numeric: bool = False,
1242    coerce_timezone: bool = True,
1243    strip_timezone: bool = False,
1244    debug: bool = False,
1245) -> 'pd.DataFrame':
1246    """
1247    Enforce the `dtypes` dictionary on a DataFrame.
1248
1249    Parameters
1250    ----------
1251    df: pd.DataFrame
1252        The DataFrame on which to enforce dtypes.
1253
1254    dtypes: Dict[str, str]
1255        The data types to attempt to enforce on the DataFrame.
1256
1257    explicit_dtypes: Optional[Dict[str, str]], default None
1258        If provided, automatic dtype coercion will respect explicitly configured
1259        dtypes (`int`, `float`, `numeric`).
1260
1261    safe_copy: bool, default True
1262        If `True`, create a copy before comparing and modifying the dataframes.
1263        Setting to `False` may mutate the DataFrames.
1264        See `meerschaum.utils.dataframe.filter_unseen_df`.
1265
1266    coerce_numeric: bool, default False
1267        If `True`, convert float and int collisions to numeric.
1268
1269    coerce_timezone: bool, default True
1270        If `True`, convert datetimes to UTC.
1271
1272    strip_timezone: bool, default False
1273        If `coerce_timezone` and `strip_timezone` are `True`,
1274        remove timezone information from datetimes.
1275
1276    debug: bool, default False
1277        Verbosity toggle.
1278
1279    Returns
1280    -------
1281    The Pandas DataFrame with the types enforced.
1282    """
1283    import json
1284    import functools
1285    from meerschaum.utils.debug import dprint
1286    from meerschaum.utils.formatting import pprint
1287    from meerschaum.utils.dtypes import (
1288        are_dtypes_equal,
1289        to_pandas_dtype,
1290        is_dtype_numeric,
1291        attempt_cast_to_numeric,
1292        attempt_cast_to_uuid,
1293        attempt_cast_to_bytes,
1294        attempt_cast_to_geometry,
1295        coerce_timezone as _coerce_timezone,
1296        get_geometry_type_srid,
1297    )
1298    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
1299    pandas = mrsm.attempt_import('pandas')
1300    is_dask = 'dask' in df.__module__
1301    if safe_copy:
1302        df = df.copy()
1303    if len(df.columns) == 0:
1304        if debug:
1305            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
1306        return df
1307
1308    explicit_dtypes = explicit_dtypes or {}
1309    pipe_pandas_dtypes = {
1310        col: to_pandas_dtype(typ)
1311        for col, typ in dtypes.items()
1312    }
1313    json_cols = [
1314        col
1315        for col, typ in dtypes.items()
1316        if typ == 'json'
1317    ]
1318    numeric_cols = [
1319        col
1320        for col, typ in dtypes.items()
1321        if typ.startswith('numeric')
1322    ]
1323    geometry_cols_types_srids = {
1324        col: get_geometry_type_srid(typ, default_srid=0)
1325        for col, typ in dtypes.items()
1326        if typ.startswith('geometry') or typ.startswith('geography')
1327    }
1328    uuid_cols = [
1329        col
1330        for col, typ in dtypes.items()
1331        if typ == 'uuid'
1332    ]
1333    bytes_cols = [
1334        col
1335        for col, typ in dtypes.items()
1336        if typ == 'bytes'
1337    ]
1338    datetime_cols = [
1339        col
1340        for col, typ in dtypes.items()
1341        if are_dtypes_equal(typ, 'datetime')
1342    ]
1343    df_numeric_cols = get_numeric_cols(df)
1344    if debug:
1345        dprint("Desired data types:")
1346        pprint(dtypes)
1347        dprint("Data types for incoming DataFrame:")
1348        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})
1349
1350    if json_cols and len(df) > 0:
1351        if debug:
1352            dprint(f"Checking columns for JSON encoding: {json_cols}")
1353        for col in json_cols:
1354            if col in df.columns:
1355                try:
1356                    df[col] = df[col].apply(
1357                        (
1358                            lambda x: (
1359                                json.loads(x)
1360                                if isinstance(x, str)
1361                                else x
1362                            )
1363                        )
1364                    )
1365                except Exception as e:
1366                    if debug:
1367                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")
1368
1369    if numeric_cols:
1370        if debug:
1371            dprint(f"Checking for numerics: {numeric_cols}")
1372        for col in numeric_cols:
1373            precision, scale = get_numeric_precision_scale(None, dtypes.get(col, ''))
1374            if col in df.columns:
1375                try:
1376                    df[col] = df[col].apply(
1377                        functools.partial(
1378                            attempt_cast_to_numeric,
1379                            quantize=True,
1380                            precision=precision,
1381                            scale=scale,
1382                        )
1383                    )
1384                except Exception as e:
1385                    if debug:
1386                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")
1387
1388    if uuid_cols:
1389        if debug:
1390            dprint(f"Checking for UUIDs: {uuid_cols}")
1391        for col in uuid_cols:
1392            if col in df.columns:
1393                try:
1394                    df[col] = df[col].apply(attempt_cast_to_uuid)
1395                except Exception as e:
1396                    if debug:
1397                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")
1398
1399    if bytes_cols:
1400        if debug:
1401            dprint(f"Checking for bytes: {bytes_cols}")
1402        for col in bytes_cols:
1403            if col in df.columns:
1404                try:
1405                    df[col] = df[col].apply(attempt_cast_to_bytes)
1406                except Exception as e:
1407                    if debug:
1408                        dprint(f"Unable to parse column '{col}' as bytes:\n{e}")
1409
1410    if datetime_cols and coerce_timezone:
1411        if debug:
1412            dprint(f"Checking for datetime conversion: {datetime_cols}")
1413        for col in datetime_cols:
1414            if col in df.columns:
1415                if not strip_timezone and 'utc' in str(df.dtypes[col]).lower():
1416                    if debug:
1417                        dprint(f"Skip UTC coercion for column '{col}' ({str(df[col].dtype)}).")
1418                    continue
1419                if strip_timezone and ',' not in str(df.dtypes[col]):
1420                    if debug:
1421                        dprint(
1422                            f"Skip UTC coercion (stripped) for column '{col}' "
1423                            f"({str(df[col].dtype)})."
1424                        )
1425                    continue
1426
1427                if debug:
1428                    dprint(
1429                        f"Data type for column '{col}' before timezone coercion: "
1430                        f"{str(df[col].dtype)}"
1431                    )
1432
1433                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)
1434                if debug:
1435                    dprint(
1436                        f"Data type for column '{col}' after timezone coercion: "
1437                        f"{str(df[col].dtype)}"
1438                    )
1439
1440    if geometry_cols_types_srids:
1441        geopandas = mrsm.attempt_import('geopandas')
1442        if debug:
1443            dprint(f"Checking for geometry: {list(geometry_cols_types_srids)}")
1444        parsed_geom_cols = []
1445        for col in geometry_cols_types_srids:
1446            if col not in df.columns:
1447                continue
1448            try:
1449                df[col] = attempt_cast_to_geometry(df[col])
1450                parsed_geom_cols.append(col)
1451            except Exception as e:
1452                import traceback
1453                traceback.print_exc()
1454                if debug:
1455                    dprint(f"Unable to parse column '{col}' as geometry:\n{e}")
1456
1457        if parsed_geom_cols:
1458            if debug:
1459                dprint(f"Converting to GeoDataFrame (geometry column: '{parsed_geom_cols[0]}')...")
1460            try:
1461                _, default_srid = geometry_cols_types_srids[parsed_geom_cols[0]]
1462                df = geopandas.GeoDataFrame(df, geometry=parsed_geom_cols[0], crs=default_srid)
1463                for col, (_, srid) in geometry_cols_types_srids.items():
1464                    if srid:
1465                        if debug:
1466                            dprint(f"Setting '{col}' to SRID '{srid}'...")
1467                        _ = df[col].set_crs(srid)
1468                if parsed_geom_cols[0] not in df.columns:
1469                    df.rename_geometry(parsed_geom_cols[0], inplace=True)
1470            except (ValueError, TypeError):
1471                import traceback
1472                dprint(f"Failed to cast to GeoDataFrame:\n{traceback.format_exc()}")
1473
1474    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
1475    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1476        if debug:
1477            dprint("Data types match. Exiting enforcement...")
1478        return df
1479
1480    common_dtypes = {}
1481    common_diff_dtypes = {}
1482    for col, typ in pipe_pandas_dtypes.items():
1483        if col in df_dtypes:
1484            common_dtypes[col] = typ
1485            if not are_dtypes_equal(typ, df_dtypes[col]):
1486                common_diff_dtypes[col] = df_dtypes[col]
1487
1488    if debug:
1489        dprint("Common columns with different dtypes:")
1490        pprint(common_diff_dtypes)
1491
1492    detected_dt_cols = {}
1493    for col, typ in common_diff_dtypes.items():
1494        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
1495            df_dtypes[col] = typ
1496            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
1497    for col in detected_dt_cols:
1498        del common_diff_dtypes[col]
1499
1500    if debug:
1501        dprint("Common columns with different dtypes (after dates):")
1502        pprint(common_diff_dtypes)
1503
1504    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1505        if debug:
1506            dprint(
1507                "The incoming DataFrame has mostly the same types, skipping enforcement. "
1508                + "The only detected difference was in the following datetime columns."
1509            )
1510            pprint(detected_dt_cols)
1511        return df
1512
1513    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
1514        previous_typ = common_dtypes[col]
1515        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
1516        explicitly_float = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'float')
1517        explicitly_int = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'int')
1518        explicitly_numeric = explicit_dtypes.get(col, 'object').startswith('numeric')
1519        all_nan = (
1520            df[col].isnull().all()
1521            if mixed_numeric_types and coerce_numeric and not (explicitly_float or explicitly_int)
1522            else None
1523        )
1524        cast_to_numeric = explicitly_numeric or (
1525            (
1526                col in df_numeric_cols
1527                or (
1528                    mixed_numeric_types
1529                    and not (explicitly_float or explicitly_int)
1530                    and not all_nan
1531                    and coerce_numeric
1532                )
1533            )
1534        )
1535
1536        if debug and (explicitly_numeric or df_numeric_cols or mixed_numeric_types):
1537            from meerschaum.utils.formatting import make_header
1538            msg = (
1539                make_header(f"Coercing column '{col}' to numeric:", left_pad=0)
1540                + "\n"
1541                + f"  Previous type: {previous_typ}\n"
1542                + f"  Current type: {typ if col not in df_numeric_cols else 'Decimal'}"
1543                + ("\n  Column is explicitly numeric." if explicitly_numeric else "")
1544            ) if cast_to_numeric else (
1545                f"Will not coerce column '{col}' to numeric.\n"
1546                f"  Numeric columns in dataframe: {df_numeric_cols}\n"
1547                f"  Mixed numeric types: {mixed_numeric_types}\n"
1548                f"  Explicitly float: {explicitly_float}\n"
1549                f"  Explicitly int: {explicitly_int}\n"
1550                f"  All NaN: {all_nan}\n"
1551                f"  Coerce numeric: {coerce_numeric}"
1552            )
1553            dprint(msg)
1554
1555        if cast_to_numeric:
1556            common_dtypes[col] = attempt_cast_to_numeric
1557            common_diff_dtypes[col] = attempt_cast_to_numeric
1558
1559    for d in common_diff_dtypes:
1560        t = common_dtypes[d]
1561        if debug:
1562            dprint(f"Casting column {d} to dtype {t}.")
1563        try:
1564            df[d] = (
1565                df[d].apply(t)
1566                if callable(t)
1567                else df[d].astype(t)
1568            )
1569        except Exception as e:
1570            if debug:
1571                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}\ndf:\n{df}")
1572            if 'int' in str(t).lower():
1573                try:
1574                    df[d] = df[d].astype('float64').astype(t)
1575                except Exception:
1576                    if debug:
1577                        dprint(f"Was unable to convert to float then {t}.")
1578    return df
1579
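    ### A hedged usage sketch (assumes pandas, and that the Meerschaum 'int'
    ### dtype maps to the nullable pandas 'Int64'; 'json' deserializes strings):
    # >>> import pandas as pd
    # >>> df = pd.DataFrame({'a': ['1', '2'], 'b': ['{"x": 1}', '{"x": 2}']})
    # >>> df2 = enforce_dtypes(df, {'a': 'int', 'b': 'json'})
    # >>> str(df2['a'].dtype)
    # 'Int64'
    # >>> df2['b'][0]
    # {'x': 1}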
1580
1581def get_datetime_bound_from_df(
1582    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1583    datetime_column: str,
1584    minimum: bool = True,
1585) -> Union[int, datetime, None]:
1586    """
1587    Return the minimum or maximum datetime (or integer) from a DataFrame.
1588
1589    Parameters
1590    ----------
1591    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1592        The DataFrame, list, or dict which contains the range axis.
1593
1594    datetime_column: str
1595        The name of the datetime (or int) column.
1596
1597    minimum: bool
1598        Whether to return the minimum (default) or maximum value.
1599
1600    Returns
1601    -------
1602    The minimum or maximum datetime value in the dataframe, or `None`.
1603    """
1604    from meerschaum.utils.dtypes import to_datetime, value_is_null
1605
1606    if df is None:
1607        return None
1608    if not datetime_column:
1609        return None
1610
1611    def compare(a, b):
1612        if a is None:
1613            return b
1614        if b is None:
1615            return a
1616        if minimum:
1617            return a if a < b else b
1618        return a if a > b else b
1619
1620    if isinstance(df, list):
1621        if len(df) == 0:
1622            return None
1623        best_yet = df[0].get(datetime_column, None)
1624        for doc in df:
1625            val = doc.get(datetime_column, None)
1626            best_yet = compare(best_yet, val)
1627        return best_yet
1628
1629    if isinstance(df, dict):
1630        if datetime_column not in df:
1631            return None
1632        best_yet = df[datetime_column][0]
1633        for val in df[datetime_column]:
1634            best_yet = compare(best_yet, val)
1635        return best_yet
1636
1637    if 'DataFrame' in str(type(df)):
1638        from meerschaum.utils.dtypes import are_dtypes_equal
1639        pandas = mrsm.attempt_import('pandas')
1640        is_dask = 'dask' in df.__module__
1641
1642        if datetime_column not in df.columns:
1643            return None
1644
1645        try:
1646            dt_val = (
1647                df[datetime_column].min(skipna=True)
1648                if minimum
1649                else df[datetime_column].max(skipna=True)
1650            )
1651        except Exception:
1652            dt_val = pandas.NA
1653        if is_dask and dt_val is not None and dt_val is not pandas.NA:
1654            dt_val = dt_val.compute()
1655
1656        return (
1657            to_datetime(dt_val, as_pydatetime=True)
1658            if are_dtypes_equal(str(type(dt_val)), 'datetime')
1659            else (dt_val if not value_is_null(dt_val) else None)
1660        )
1661
1662    return None
1663
1664
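A minimal usage sketch of `get_datetime_bound_from_df` (the column name `'dt'` and the sample values below are hypothetical):

```python
from datetime import datetime, timezone

from meerschaum.utils.dataframe import get_datetime_bound_from_df

docs = [
    {'dt': datetime(2024, 1, 1, tzinfo=timezone.utc)},
    {'dt': datetime(2024, 1, 2, tzinfo=timezone.utc)},
]

### Lists of documents, dictionaries of lists, and DataFrames are all accepted.
oldest = get_datetime_bound_from_df(docs, 'dt')                 # 2024-01-01
newest = get_datetime_bound_from_df(docs, 'dt', minimum=False)  # 2024-01-02
```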
1665def get_unique_index_values(
1666    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1667    indices: List[str],
1668) -> Dict[str, List[Any]]:
1669    """
1670    Return a dictionary of the unique index values in a DataFrame.
1671
1672    Parameters
1673    ----------
1674    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1675        The dataframe (or list or dict) which contains index values.
1676
1677    indices: List[str]
1678        The list of index columns.
1679
1680    Returns
1681    -------
1682    A dictionary mapping indices to unique values.
1683    """
1684    if df is None:
1685        return {}
1686    if 'dataframe' in str(type(df)).lower():
1687        pandas = mrsm.attempt_import('pandas')
1688        return {
1689            col: list({
1690                (val if val is not pandas.NA else None)
1691                for val in df[col].unique()
1692            })
1693            for col in indices
1694            if col in df.columns
1695        }
1696
1697    unique_indices = defaultdict(lambda: set())
1698    if isinstance(df, list):
1699        for doc in df:
1700            for index in indices:
1701                if index in doc:
1702                    unique_indices[index].add(doc[index])
1703
1704    elif isinstance(df, dict):
1705        for index in indices:
1706            if index in df:
1707                unique_indices[index] = unique_indices[index].union(set(df[index]))
1708
1709    return {key: list(val) for key, val in unique_indices.items()}
1710
1711
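A short sketch of `get_unique_index_values` on a list of documents (the index names here are hypothetical):

```python
from meerschaum.utils.dataframe import get_unique_index_values

docs = [
    {'station': 'KATL', 'sensor': 1},
    {'station': 'KATL', 'sensor': 2},
    {'station': 'KCLT', 'sensor': 1},
]

get_unique_index_values(docs, ['station', 'sensor'])
### e.g. {'station': ['KATL', 'KCLT'], 'sensor': [1, 2]} (order is not guaranteed)
```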
1712def df_is_chunk_generator(df: Any) -> bool:
1713    """
1714    Determine whether to treat `df` as a chunk generator.
1715
1716    Note this should only be used in a context where generators are expected,
1717    as it will return `True` for any iterable.
1718
1719    Parameters
1720    ----------
1721    df: The DataFrame or chunk generator to evaluate.
1722
1723    Returns
1724    -------
1725    A `bool` indicating whether to treat `df` as a generator.
1726    """
1727    return (
1728        not isinstance(df, (dict, list, str))
1729        and 'DataFrame' not in str(type(df))
1730        and isinstance(df, (Generator, Iterable, Iterator))
1731    )
1732
1733
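Because `df_is_chunk_generator()` returns `True` for any iterable that is not a plain collection or DataFrame, it should only be called where chunk generators are plausible inputs; a quick illustration:

```python
import pandas as pd

from meerschaum.utils.dataframe import df_is_chunk_generator

df = pd.DataFrame({'a': [1, 2, 3]})
chunks = (chunk for _, chunk in df.groupby('a'))

df_is_chunk_generator(df)      # False: DataFrames are excluded.
df_is_chunk_generator([1, 2])  # False: lists are explicitly excluded.
df_is_chunk_generator(chunks)  # True: treated as a chunk generator.
```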
1734def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
1735    """
1736    Return the Dask `npartitions` value for a given `chunksize`.
1737    """
1738    if chunksize == -1:
1739        from meerschaum.config import get_config
1740        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
1741    if chunksize is None:
1742        return 1
1743    return -1 * chunksize
1744
1745
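The sign convention above is easy to miss: `None` maps to a single partition, `-1` is first resolved to the configured default chunksize, and any other chunksize is returned negated:

```python
from meerschaum.utils.dataframe import chunksize_to_npartitions

chunksize_to_npartitions(None)    # 1
chunksize_to_npartitions(50_000)  # -50000
```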
1746def df_from_literal(
1747    pipe: Optional[mrsm.Pipe] = None,
1748    literal: Optional[str] = None,
1749    debug: bool = False
1750) -> 'pd.DataFrame':
1751    """
1752    Construct a dataframe from a literal value, using the pipe's datetime and value column names.
1753
1754    Parameters
1755    ----------
1756    pipe: Optional['meerschaum.Pipe'], default None
1757        The pipe which will consume the literal value.
1758
1759    Returns
1760    -------
1761    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
1762    and the literal as the value.
1763    """
1764    from meerschaum.utils.packages import import_pandas
1765    from meerschaum.utils.warnings import error, warn
1766    from meerschaum.utils.debug import dprint
1767    from meerschaum.utils.dtypes import get_current_timestamp
1768
1769    if pipe is None or literal is None:
1770        error("Please provide a Pipe and a literal value")
1771
1772    dt_col = pipe.columns.get(
1773        'datetime',
1774        mrsm.get_config('pipes', 'autotime', 'column_name_if_datetime_missing')
1775    )
1776    val_col = pipe.get_val_column(debug=debug)
1777
1778    val = literal
1779    if isinstance(literal, str):
1780        if debug:
1781            dprint(f"Received literal string: '{literal}'")
1782        import ast
1783        try:
1784            val = ast.literal_eval(literal)
1785        except Exception:
1786            warn(
1787                "Failed to parse value from string:\n" + f"{literal}" +
1788                "\n\nWill cast as a string instead."
1789            )
1790            val = literal
1791
1792    now = get_current_timestamp(pipe.precision)
1793    pd = import_pandas()
1794    return pd.DataFrame({dt_col: [now], val_col: [val]})
1795
1796
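A sketch of `df_from_literal` with a hypothetical pipe (the connector and metric keys below are placeholders):

```python
import meerschaum as mrsm
from meerschaum.utils.dataframe import df_from_literal

pipe = mrsm.Pipe('plugin:noaa', 'weather')  # Hypothetical keys.
df = df_from_literal(pipe, '42.1')
### One row: the current UTC timestamp in the datetime column and 42.1 as the value.
```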
1797def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
1798    """
1799    Return the first valid Dask DataFrame partition (if possible).
1800    """
1801    pdf = None
1802    for partition in ddf.partitions:
1803        try:
1804            pdf = partition.compute()
1805        except Exception:
1806            continue
1807        if len(pdf) > 0:
1808            return pdf
1809    _ = mrsm.attempt_import('partd', lazy=False)
1810    return ddf.compute()
1811
1812
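A minimal sketch with a small Dask DataFrame (assumes `dask` is installed):

```python
import pandas as pd
import dask.dataframe as dd

from meerschaum.utils.dataframe import get_first_valid_dask_partition

ddf = dd.from_pandas(pd.DataFrame({'a': list(range(10))}), npartitions=3)
pdf = get_first_valid_dask_partition(ddf)  # The first non-empty partition as pandas.
```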
1813def query_df(
1814    df: 'pd.DataFrame',
1815    params: Optional[Dict[str, Any]] = None,
1816    begin: Union[datetime, int, None] = None,
1817    end: Union[datetime, int, None] = None,
1818    datetime_column: Optional[str] = None,
1819    select_columns: Optional[List[str]] = None,
1820    omit_columns: Optional[List[str]] = None,
1821    inplace: bool = False,
1822    reset_index: bool = False,
1823    coerce_types: bool = False,
1824    debug: bool = False,
1825) -> 'pd.DataFrame':
1826    """
1827    Query the dataframe with the params dictionary.
1828
1829    Parameters
1830    ----------
1831    df: pd.DataFrame
1832        The DataFrame to query against.
1833
1834    params: Optional[Dict[str, Any]], default None
1835        The parameters dictionary to use for the query.
1836
1837    begin: Union[datetime, int, None], default None
1838        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1839        greater than or equal to this value.
1840
1841    end: Union[datetime, int, None], default None
1842        If `end` and `datetime_column` are provided, only return rows with a timestamp
1843        less than this value.
1844
1845    datetime_column: Optional[str], default None
1846        A `datetime_column` must be provided to use `begin` and `end`.
1847
1848    select_columns: Optional[List[str]], default None
1849        If provided, only return these columns.
1850
1851    omit_columns: Optional[List[str]], default None
1852        If provided, do not include these columns in the result.
1853
1854    inplace: bool, default False
1855        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
1856
1857    reset_index: bool, default False
1858        If `True`, reset the index in the resulting DataFrame.
1859
1860    coerce_types: bool, default False
1861        If `True`, cast the dataframe and parameters as strings before querying.
1862
1863    Returns
1864    -------
1865    A Pandas DataFrame query result.
1866    """
1867
1868    def _process_select_columns(_df):
1869        if not select_columns:
1870            return
1871        for col in list(_df.columns):
1872            if col not in select_columns:
1873                del _df[col]
1874
1875    def _process_omit_columns(_df):
1876        if not omit_columns:
1877            return
1878        for col in list(_df.columns):
1879            if col in omit_columns:
1880                del _df[col]
1881
1882    if not params and not begin and not end:
1883        if not inplace:
1884            df = df.copy()
1885        _process_select_columns(df)
1886        _process_omit_columns(df)
1887        return df
1888
1889    from meerschaum.utils.debug import dprint
1890    from meerschaum.utils.misc import get_in_ex_params
1891    from meerschaum.utils.warnings import warn
1892    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
1893    dateutil_parser = mrsm.attempt_import('dateutil.parser')
1894    pandas = mrsm.attempt_import('pandas')
1895    NA = pandas.NA
1896
1897    if params:
1898        proto_in_ex_params = get_in_ex_params(params)
1899        for key, (proto_in_vals, proto_ex_vals) in proto_in_ex_params.items():
1900            if proto_ex_vals:
1901                coerce_types = True
1902                break
1903        params = params.copy()
1904        for key, val in {k: v for k, v in params.items()}.items():
1905            if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype'):
1906                if None in val:
1907                    val = [item for item in val if item is not None] + [NA]
1908                    params[key] = val
1909                if coerce_types:
1910                    params[key] = [str(x) for x in val]
1911            else:
1912                if value_is_null(val):
1913                    val = NA
1914                    params[key] = NA
1915                if coerce_types:
1916                    params[key] = str(val)
1917
1918    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
1919
1920    if inplace:
1921        df.fillna(NA, inplace=True)
1922    else:
1923        df = df.infer_objects().fillna(NA)
1924
1925    if isinstance(begin, str):
1926        begin = dateutil_parser.parse(begin)
1927    if isinstance(end, str):
1928        end = dateutil_parser.parse(end)
1929
1930    if begin is not None or end is not None:
1931        if not datetime_column or datetime_column not in df.columns:
1932            warn(
1933                f"The datetime column '{datetime_column}' is not present in the DataFrame, "
1934                + "ignoring begin and end...",
1935            )
1936            begin, end = None, None
1937
1938    if debug:
1939        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")
1940
1941    if datetime_column and (begin is not None or end is not None):
1942        if debug:
1943            dprint("Checking for datetime column compatibility.")
1944
1945        from meerschaum.utils.dtypes import coerce_timezone
1946        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
1947        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
1948        end_is_int = are_dtypes_equal(str(type(end)), 'int')
1949
1950        if df_is_dt:
1951            df_tz = (
1952                getattr(df[datetime_column].dt, 'tz', None)
1953                if hasattr(df[datetime_column], 'dt')
1954                else None
1955            )
1956
1957            if begin_is_int:
1958                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
1959                if debug:
1960                    dprint(f"`begin` will be cast to '{begin}'.")
1961            if end_is_int:
1962                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
1963                if debug:
1964                    dprint(f"`end` will be cast to '{end}'.")
1965
1966            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
1967            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None
1968
1969    in_ex_params = get_in_ex_params(params)
1970
1971    masks = [
1972        (
1973            (df[datetime_column] >= begin)
1974            if begin is not None and datetime_column
1975            else True
1976        ) & (
1977            (df[datetime_column] < end)
1978            if end is not None and datetime_column
1979            else True
1980        )
1981    ]
1982
1983    masks.extend([
1984        (
1985            (
1986                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
1987                if in_vals
1988                else True
1989            ) & (
1990                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
1991                if ex_vals
1992                else True
1993            )
1994        )
1995        for col, (in_vals, ex_vals) in in_ex_params.items()
1996        if col in df.columns
1997    ])
1998    query_mask = masks[0]
1999    for mask in masks[1:]:
2000        query_mask = query_mask & mask
2001
2002    original_cols = df.columns
2003
2004    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
2005    ###       to allow for `<NA>` values.
2006    bool_cols = [
2007        col
2008        for col, typ in df.dtypes.items()
2009        if are_dtypes_equal(str(typ), 'bool')
2010    ]
2011    for col in bool_cols:
2012        df[col] = df[col].astype('boolean[pyarrow]')
2013
2014    if not isinstance(query_mask, bool):
2015        df['__mrsm_mask'] = (
2016            query_mask.astype('boolean[pyarrow]')
2017            if hasattr(query_mask, 'astype')
2018            else query_mask
2019        )
2020
2021        if inplace:
2022            df.where(query_mask, other=NA, inplace=True)
2023            df.dropna(how='all', inplace=True)
2024            result_df = df
2025        else:
2026            result_df = df.where(query_mask, other=NA)
2027            result_df.dropna(how='all', inplace=True)
2028
2029    else:
2030        result_df = df
2031
2032    if '__mrsm_mask' in df.columns:
2033        del df['__mrsm_mask']
2034    if '__mrsm_mask' in result_df.columns:
2035        del result_df['__mrsm_mask']
2036
2037    if reset_index:
2038        result_df.reset_index(drop=True, inplace=True)
2039
2040    result_df = enforce_dtypes(
2041        result_df,
2042        dtypes,
2043        safe_copy=False,
2044        debug=debug,
2045        coerce_numeric=False,
2046        coerce_timezone=False,
2047    )
2048
2049    if select_columns == ['*']:
2050        select_columns = None
2051
2052    if not select_columns and not omit_columns:
2053        return result_df[original_cols]
2054
2055    _process_select_columns(result_df)
2056    _process_omit_columns(result_df)
2057
2058    return result_df
2059
2060
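A usage sketch of `query_df` (the column names and values are hypothetical):

```python
from datetime import datetime

import pandas as pd

from meerschaum.utils.dataframe import query_df

df = pd.DataFrame({
    'dt': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']),
    'color': ['red', 'blue', 'red'],
})

query_df(
    df,
    params={'color': 'red'},
    begin=datetime(2024, 1, 2),
    datetime_column='dt',
)
### Returns the rows where color == 'red' and dt >= 2024-01-02.
```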
2061def to_json(
2062    df: 'pd.DataFrame',
2063    safe_copy: bool = True,
2064    orient: str = 'records',
2065    date_format: str = 'iso',
2066    date_unit: str = 'us',
2067    double_precision: int = 15,
2068    geometry_format: str = 'geojson',
2069    **kwargs: Any
2070) -> str:
2071    """
2072    Serialize the given dataframe as a JSON string.
2073
2074    Parameters
2075    ----------
2076    df: pd.DataFrame
2077        The DataFrame to be serialized.
2078
2079    safe_copy: bool, default True
2080        If `False`, modify the DataFrame inplace.
2081
2082    date_format: str, default 'iso'
2083        The default format for timestamps.
2084
2085    date_unit: str, default 'us'
2086        The precision of the timestamps.
2087
2088    double_precision: int, default 15
2089        The number of decimal places to use when encoding floating point values (maximum 15).
2090
2091    geometry_format: str, default 'geojson'
2092        The serialization format for geometry data.
2093        Accepted values are `geojson`, `wkb_hex`, and `wkt`.
2094
2095    Returns
2096    -------
2097    A JSON string.
2098    """
2099    import warnings
2100    import functools
2101    from meerschaum.utils.packages import import_pandas
2102    from meerschaum.utils.dtypes import (
2103        serialize_bytes,
2104        serialize_decimal,
2105        serialize_geometry,
2106        serialize_date,
2107    )
2108    pd = import_pandas()
2109    uuid_cols = get_uuid_cols(df)
2110    bytes_cols = get_bytes_cols(df)
2111    numeric_cols = get_numeric_cols(df)
2112    date_cols = get_date_cols(df)
2113    geometry_cols = get_geometry_cols(df)
2114    geometry_cols_srids = {
2115        col: int((getattr(df[col].crs, 'srs', '') or '').split(':', maxsplit=1)[-1] or '0')
2116        for col in geometry_cols
2117    } if 'geodataframe' in str(type(df)).lower() else {}
2118    if safe_copy and bool(uuid_cols or bytes_cols or geometry_cols or numeric_cols):
2119        df = df.copy()
2120    if 'geodataframe' in str(type(df)).lower():
2121        geometry_data = {
2122            col: df[col]
2123            for col in geometry_cols
2124        }
2125        df = pd.DataFrame({
2126            col: df[col]
2127            for col in df.columns
2128            if col not in geometry_cols
2129        })
2130        for col in geometry_cols:
2131            df[col] = pd.Series(ob for ob in geometry_data[col])
2132    for col in uuid_cols:
2133        df[col] = df[col].astype(str)
2134    for col in bytes_cols:
2135        df[col] = df[col].apply(serialize_bytes)
2136    for col in numeric_cols:
2137        df[col] = df[col].apply(serialize_decimal)
2138    for col in date_cols:
2139        df[col] = df[col].apply(serialize_date)
2140    with warnings.catch_warnings():
2141        warnings.simplefilter("ignore")
2142        for col in geometry_cols:
2143            srid = geometry_cols_srids.get(col, None) or None
2144            df[col] = pd.Series(
2145                serialize_geometry(val, geometry_format=geometry_format, srid=srid)
2146                for val in df[col]
2147            )
2148    return df.infer_objects().fillna(pd.NA).to_json(
2149        date_format=date_format,
2150        date_unit=date_unit,
2151        double_precision=double_precision,
2152        orient=orient,
2153        **kwargs
2154    )
2155
2156
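A short sketch of `to_json`; note that special columns (UUIDs, `Decimal` numerics, bytes, dates, geometry) are serialized before the final `to_json()` call:

```python
from decimal import Decimal

import pandas as pd

from meerschaum.utils.dataframe import to_json

df = pd.DataFrame({'a': [1], 'price': [Decimal('19.99')]})
to_json(df)
### e.g. '[{"a":1,"price":"19.99"}]'
```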
2157def to_simple_lines(df: 'pd.DataFrame') -> str:
2158    """
2159    Serialize a Pandas Dataframe as lines of simple dictionaries.
2160
2161    Parameters
2162    ----------
2163    df: pd.DataFrame
2164        The dataframe to serialize into simple lines text.
2165
2166    Returns
2167    -------
2168    A string of simple line dictionaries joined by newlines.
2169    """
2170    from meerschaum.utils.misc import to_simple_dict
2171    if df is None or len(df) == 0:
2172        return ''
2173
2174    docs = df.to_dict(orient='records')
2175    return '\n'.join(to_simple_dict(doc) for doc in docs)
2176
2177
2178def parse_simple_lines(data: str) -> 'pd.DataFrame':
2179    """
2180    Parse simple lines text into a DataFrame.
2181
2182    Parameters
2183    ----------
2184    data: str
2185        The simple lines text to parse into a DataFrame.
2186
2187    Returns
2188    -------
2189    A dataframe containing the rows serialized in `data`.
2190    """
2191    from meerschaum.utils.misc import string_to_dict
2192    from meerschaum.utils.packages import import_pandas
2193    pd = import_pandas()
2194    lines = data.splitlines()
2195    try:
2196        docs = [string_to_dict(line) for line in lines]
2197        df = pd.DataFrame(docs)
2198    except Exception:
2199        df = None
2200
2201    if df is None:
2202        raise ValueError("Cannot parse simple lines into a dataframe.")
2203
2204    return df
def add_missing_cols_to_df(df: pandas.DataFrame, dtypes: Dict[str, Any]) -> pandas.DataFrame:
25def add_missing_cols_to_df(
26    df: 'pd.DataFrame',
27    dtypes: Dict[str, Any],
28) -> 'pd.DataFrame':
29    """
30    Add columns from the dtypes dictionary as null columns to a new DataFrame.
31
32    Parameters
33    ----------
34    df: pd.DataFrame
35        The dataframe we should copy and add null columns.
36
37    dtypes:
38        The data types dictionary which may contain keys not present in `df.columns`.
39
40    Returns
41    -------
42    A new `DataFrame` with the keys from `dtypes` added as null columns.
43    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
44    NOTE: This will not ensure that dtypes are enforced!
45
46    Examples
47    --------
48    >>> import pandas as pd
49    >>> df = pd.DataFrame([{'a': 1}])
50    >>> dtypes = {'b': 'Int64'}
51    >>> add_missing_cols_to_df(df, dtypes)
52          a  b
53       0  1  <NA>
54    >>> add_missing_cols_to_df(df, dtypes).dtypes
55    a    int64
56    b    Int64
57    dtype: object
58    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
59    a    int64
60    dtype: object
61    >>> 
62    """
63    if set(df.columns) == set(dtypes):
64        return df
65
66    from meerschaum.utils.packages import attempt_import
67    from meerschaum.utils.dtypes import to_pandas_dtype
68    pandas = attempt_import('pandas')
69
70    def build_series(dtype: str):
71        return pandas.Series([], dtype=to_pandas_dtype(dtype))
72
73    assign_kwargs = {
74        str(col): build_series(str(typ))
75        for col, typ in dtypes.items()
76        if col not in df.columns
77    }
78    df_with_cols = df.assign(**assign_kwargs)
79    for col in assign_kwargs:
80        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
81    return df_with_cols

Add columns from the dtypes dictionary as null columns to a new DataFrame.

Parameters
  • df (pd.DataFrame): The dataframe we should copy and add null columns.
  • dtypes: The data types dictionary which may contain keys not present in df.columns.
Returns
  • A new DataFrame with the keys from dtypes added as null columns.
  • If df.dtypes is the same as dtypes, then return a reference to df.
  • NOTE: This will not ensure that dtypes are enforced!
Examples
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1}])
>>> dtypes = {'b': 'Int64'}
>>> add_missing_cols_to_df(df, dtypes)
      a  b
   0  1  <NA>
>>> add_missing_cols_to_df(df, dtypes).dtypes
a    int64
b    Int64
dtype: object
>>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
a    int64
dtype: object
>>>
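Because of the note above (dtypes are not enforced), one option is to follow up with `enforce_dtypes()` from this module; a sketch:

```python
import pandas as pd

from meerschaum.utils.dataframe import add_missing_cols_to_df, enforce_dtypes

df = pd.DataFrame([{'a': 1}])
dtypes = {'a': 'int64', 'b': 'Int64'}
df = add_missing_cols_to_df(df, dtypes)  # Adds 'b' as a null column.
df = enforce_dtypes(df, dtypes)          # Coerces columns to the given dtypes.
```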
def filter_unseen_df( old_df: pandas.DataFrame, new_df: pandas.DataFrame, safe_copy: bool = True, dtypes: Optional[Dict[str, Any]] = None, include_unchanged_columns: bool = False, coerce_mixed_numerics: bool = True, debug: bool = False) -> pandas.DataFrame:
 84def filter_unseen_df(
 85    old_df: 'pd.DataFrame',
 86    new_df: 'pd.DataFrame',
 87    safe_copy: bool = True,
 88    dtypes: Optional[Dict[str, Any]] = None,
 89    include_unchanged_columns: bool = False,
 90    coerce_mixed_numerics: bool = True,
 91    debug: bool = False,
 92) -> 'pd.DataFrame':
 93    """
 94    Left join two DataFrames to find the newest unseen data.
 95
 96    Parameters
 97    ----------
 98    old_df: 'pd.DataFrame'
 99        The original (target) dataframe. Acts as a filter on the `new_df`.
100
101    new_df: 'pd.DataFrame'
102        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
103
104    safe_copy: bool, default True
105        If `True`, create a copy before comparing and modifying the dataframes.
106        Setting to `False` may mutate the DataFrames.
107
108    dtypes: Optional[Dict[str, Any]], default None
109        Optionally specify the datatypes of the dataframe.
110
111    include_unchanged_columns: bool, default False
112        If `True`, include columns which haven't changed on rows which have changed.
113
114    coerce_mixed_numerics: bool, default True
115        If `True`, cast mixed integer and float columns between the old and new dataframes into
116        numeric values (`decimal.Decimal`).
117
118    debug: bool, default False
119        Verbosity toggle.
120
121    Returns
122    -------
123    A pandas dataframe of the new, unseen rows in `new_df`.
124
125    Examples
126    --------
127    ```python
128    >>> import pandas as pd
129    >>> df1 = pd.DataFrame({'a': [1,2]})
130    >>> df2 = pd.DataFrame({'a': [2,3]})
131    >>> filter_unseen_df(df1, df2)
132       a
133    0  3
134
135    ```
136
137    """
138    if old_df is None:
139        return new_df
140
141    if safe_copy:
142        old_df = old_df.copy()
143        new_df = new_df.copy()
144
145    import json
146    import functools
147    import traceback
148    from meerschaum.utils.warnings import warn
149    from meerschaum.utils.packages import import_pandas, attempt_import
150    from meerschaum.utils.dtypes import (
151        to_pandas_dtype,
152        are_dtypes_equal,
153        attempt_cast_to_numeric,
154        attempt_cast_to_uuid,
155        attempt_cast_to_bytes,
156        attempt_cast_to_geometry,
157        coerce_timezone,
158        serialize_decimal,
159    )
160    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
161    pd = import_pandas(debug=debug)
162    is_dask = 'dask' in new_df.__module__
163    if is_dask:
164        pandas = attempt_import('pandas')
165        _ = attempt_import('partd', lazy=False)
166        dd = attempt_import('dask.dataframe')
167        merge = dd.merge
168        NA = pandas.NA
169    else:
170        merge = pd.merge
171        NA = pd.NA
172
173    new_df_dtypes = dict(new_df.dtypes)
174    old_df_dtypes = dict(old_df.dtypes)
175
176    same_cols = set(new_df.columns) == set(old_df.columns)
177    if not same_cols:
178        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
179        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)
180
181        new_types_missing_from_old = {
182            col: typ
183            for col, typ in new_df_dtypes.items()
184            if col not in old_df_dtypes
185        }
186        old_types_missing_from_new = {
187            col: typ
188            for col, typ in old_df_dtypes.items()
189            if col not in new_df_dtypes
190        }
191        old_df_dtypes.update(new_types_missing_from_old)
192        new_df_dtypes.update(old_types_missing_from_new)
193
194    ### Edge case: two empty lists cast to DFs.
195    elif len(new_df.columns) == 0:
196        return new_df
197
198    try:
199        ### Order matters when checking equality.
200        new_df = new_df[old_df.columns]
201
202    except Exception as e:
203        warn(
204            "Was not able to cast old columns onto new DataFrame. " +
205            f"Are both DataFrames the same shape? Error:\n{e}",
206            stacklevel=3,
207        )
208        return new_df[list(new_df_dtypes.keys())]
209
210    ### assume the old_df knows what it's doing, even if it's technically wrong.
211    if dtypes is None:
212        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}
213
214    dtypes = {
215        col: to_pandas_dtype(typ)
216        for col, typ in dtypes.items()
217        if col in new_df_dtypes and col in old_df_dtypes
218    }
219    for col, typ in new_df_dtypes.items():
220        if col not in dtypes:
221            dtypes[col] = typ
222
223    numeric_cols_precisions_scales = {
224        col: get_numeric_precision_scale(None, typ)
225        for col, typ in dtypes.items()
226        if col and str(typ).lower().startswith('numeric')
227    }
228
229    dt_dtypes = {
230        col: typ
231        for col, typ in dtypes.items()
232        if are_dtypes_equal(typ, 'datetime')
233    }
234    non_dt_dtypes = {
235        col: typ
236        for col, typ in dtypes.items()
237        if col not in dt_dtypes
238    }
239
240    cast_non_dt_cols = True
241    try:
242        new_df = new_df.astype(non_dt_dtypes)
243        cast_non_dt_cols = False
244    except Exception as e:
245        warn(
246            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
247        )
248
249    cast_dt_cols = True
250    try:
251        for col, typ in dt_dtypes.items():
252            _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
253            strip_utc = (
254                _dtypes_col_dtype.startswith('datetime64')
255                and 'utc' not in _dtypes_col_dtype.lower()
256            )
257            if col in old_df.columns:
258                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
259            if col in new_df.columns:
260                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
261        cast_dt_cols = False
262    except Exception as e:
263        warn(f"Could not cast datetime columns:\n{e}")
264
265    cast_cols = cast_dt_cols or cast_non_dt_cols
266
267    new_numeric_cols_existing = get_numeric_cols(new_df)
268    old_numeric_cols = get_numeric_cols(old_df)
269    for col, typ in {k: v for k, v in dtypes.items()}.items():
270        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
271            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
272            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
273            new_is_numeric = col in new_numeric_cols_existing
274            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
275            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
276            old_is_numeric = col in old_numeric_cols
277
278            if (
279                coerce_mixed_numerics
280                and
281                (new_is_float or new_is_int or new_is_numeric)
282                and
283                (old_is_float or old_is_int or old_is_numeric)
284            ):
285                dtypes[col] = attempt_cast_to_numeric
286                cast_cols = True
287                continue
288
289            ### Fallback to object if the types don't match.
290            warn(
291                f"Detected different types for '{col}' "
292                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
293                + "falling back to 'object'..."
294            )
295            dtypes[col] = 'object'
296            cast_cols = True
297
298    if cast_cols:
299        for col, dtype in dtypes.items():
300            if col in new_df.columns:
301                try:
302                    new_df[col] = (
303                        new_df[col].astype(dtype)
304                        if not callable(dtype)
305                        else new_df[col].apply(dtype)
306                    )
307                except Exception as e:
308                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")
309
310    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
311    new_json_cols = get_json_cols(new_df)
312    old_json_cols = get_json_cols(old_df)
313    json_cols = set(new_json_cols + old_json_cols)
314    for json_col in old_json_cols:
315        old_df[json_col] = old_df[json_col].apply(serializer)
316    for json_col in new_json_cols:
317        new_df[json_col] = new_df[json_col].apply(serializer)
318
319    new_numeric_cols = get_numeric_cols(new_df)
320    numeric_cols = set(new_numeric_cols + old_numeric_cols)
321    for numeric_col in old_numeric_cols:
322        old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
323    for numeric_col in new_numeric_cols:
324        new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)
325
326    old_dt_cols = [
327        col
328        for col, typ in old_df.dtypes.items()
329        if are_dtypes_equal(str(typ), 'datetime')
330    ]
331    for col in old_dt_cols:
332        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
333        strip_utc = (
334            _dtypes_col_dtype.startswith('datetime64')
335            and 'utc' not in _dtypes_col_dtype.lower()
336        )
337        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
338
339    new_dt_cols = [
340        col
341        for col, typ in new_df.dtypes.items()
342        if are_dtypes_equal(str(typ), 'datetime')
343    ]
344    for col in new_dt_cols:
345        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
346        strip_utc = (
347            _dtypes_col_dtype.startswith('datetime64')
348            and 'utc' not in _dtypes_col_dtype.lower()
349        )
350        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
351
352    old_uuid_cols = get_uuid_cols(old_df)
353    new_uuid_cols = get_uuid_cols(new_df)
354    uuid_cols = set(new_uuid_cols + old_uuid_cols)
355
356    old_bytes_cols = get_bytes_cols(old_df)
357    new_bytes_cols = get_bytes_cols(new_df)
358    bytes_cols = set(new_bytes_cols + old_bytes_cols)
359
360    old_geometry_cols = get_geometry_cols(old_df)
361    new_geometry_cols = get_geometry_cols(new_df)
362    geometry_cols = set(new_geometry_cols + old_geometry_cols)
363
364    na_pattern = r'(?i)^(none|nan|na|nat|natz|<na>)$'
365    joined_df = merge(
366        new_df.infer_objects().replace(na_pattern, pd.NA, regex=True).fillna(NA),
367        old_df.infer_objects().replace(na_pattern, pd.NA, regex=True).fillna(NA),
368        how='left',
369        on=None,
370        indicator=True,
371    )
372    changed_rows_mask = (joined_df['_merge'] == 'left_only')
373    new_cols = list(new_df_dtypes)
374    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)
375
376    delta_json_cols = get_json_cols(delta_df)
377    for json_col in json_cols:
378        if (
379            json_col in delta_json_cols
380            or json_col not in delta_df.columns
381        ):
382            continue
383        try:
384            delta_df[json_col] = delta_df[json_col].apply(
385                lambda x: (json.loads(x) if isinstance(x, str) else x)
386            )
387        except Exception:
388            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")
389
390    delta_numeric_cols = get_numeric_cols(delta_df)
391    for numeric_col in numeric_cols:
392        if (
393            numeric_col in delta_numeric_cols
394            or numeric_col not in delta_df.columns
395        ):
396            continue
397        try:
398            delta_df[numeric_col] = delta_df[numeric_col].apply(
399                functools.partial(
400                    attempt_cast_to_numeric,
401                    quantize=True,
402                    precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
403                    scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
404                )
405            )
406        except Exception:
407            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")
408
409    delta_uuid_cols = get_uuid_cols(delta_df)
410    for uuid_col in uuid_cols:
411        if (
412            uuid_col in delta_uuid_cols
413            or uuid_col not in delta_df.columns
414        ):
415            continue
416        try:
417            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
418        except Exception:
419            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")
420
421    delta_bytes_cols = get_bytes_cols(delta_df)
422    for bytes_col in bytes_cols:
423        if (
424            bytes_col in delta_bytes_cols
425            or bytes_col not in delta_df.columns
426        ):
427            continue
428        try:
429            delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
430        except Exception:
431            warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")
432
433    delta_geometry_cols = get_geometry_cols(delta_df)
434    for geometry_col in geometry_cols:
435        if (
436            geometry_col in delta_geometry_cols
437            or geometry_col not in delta_df.columns
438        ):
439            continue
440        try:
441            delta_df[geometry_col] = attempt_cast_to_geometry(delta_df[geometry_col])
442        except Exception:
443            warn(f"Unable to parse geometry column '{geometry_col}':\n{traceback.format_exc()}")
444
445    return delta_df

Left join two DataFrames to find the newest unseen data.

Parameters
  • old_df ('pd.DataFrame'): The original (target) dataframe. Acts as a filter on the new_df.
  • new_df ('pd.DataFrame'): The fetched (source) dataframe. Rows that are contained in old_df are removed.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames.
  • dtypes (Optional[Dict[str, Any]], default None): Optionally specify the datatypes of the dataframe.
  • include_unchanged_columns (bool, default False): If True, include columns which haven't changed on rows which have changed.
  • coerce_mixed_numerics (bool, default True): If True, cast mixed integer and float columns between the old and new dataframes into numeric values (decimal.Decimal).
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pandas dataframe of the new, unseen rows in new_df.
Examples
>>> import pandas as pd
>>> df1 = pd.DataFrame({'a': [1,2]})
>>> df2 = pd.DataFrame({'a': [2,3]})
>>> filter_unseen_df(df1, df2)
   a
0  3
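In practice this function backs delta syncs: rows the target has already seen are filtered out before insertion. A sketch with hypothetical data:

```python
import pandas as pd

from meerschaum.utils.dataframe import filter_unseen_df

old_df = pd.DataFrame({'id': [1, 2], 'val': [10.0, 20.0]})
new_df = pd.DataFrame({'id': [2, 3], 'val': [20.0, 30.0]})

delta_df = filter_unseen_df(old_df, new_df)
### Only the unseen row remains: id == 3, val == 30.0.
```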
def parse_df_datetimes( df: pandas.DataFrame, ignore_cols: Optional[Iterable[str]] = None, strip_timezone: bool = False, chunksize: Optional[int] = None, dtype_backend: str = 'numpy_nullable', ignore_all: bool = False, precision_unit: Optional[str] = None, coerce_utc: bool = True, debug: bool = False) -> pandas.DataFrame:
448def parse_df_datetimes(
449    df: 'pd.DataFrame',
450    ignore_cols: Optional[Iterable[str]] = None,
451    strip_timezone: bool = False,
452    chunksize: Optional[int] = None,
453    dtype_backend: str = 'numpy_nullable',
454    ignore_all: bool = False,
455    precision_unit: Optional[str] = None,
456    coerce_utc: bool = True,
457    debug: bool = False,
458) -> 'pd.DataFrame':
459    """
460    Parse a pandas DataFrame for datetime columns and cast as datetimes.
461
462    Parameters
463    ----------
464    df: pd.DataFrame
465        The pandas DataFrame to parse.
466
467    ignore_cols: Optional[Iterable[str]], default None
468        If provided, do not attempt to coerce these columns as datetimes.
469
470    strip_timezone: bool, default False
471        If `True`, remove the UTC `tzinfo` property.
472
473    chunksize: Optional[int], default None
474        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
475
476    dtype_backend: str, default 'numpy_nullable'
477        If `df` is not a DataFrame and new one needs to be constructed,
478        use this as the datatypes backend.
479        Accepted values are 'numpy_nullable' and 'pyarrow'.
480
481    ignore_all: bool, default False
482        If `True`, do not attempt to cast any columns to datetimes.
483
484    precision_unit: Optional[str], default None
485        If provided, enforce the given precision on the coerced datetime columns.
486
487    coerce_utc: bool, default True
488        Coerce the datetime columns to UTC (see `meerschaum.utils.dtypes.to_datetime()`).
489
490    debug: bool, default False
491        Verbosity toggle.
492
493    Returns
494    -------
495    A new pandas DataFrame with the determined datetime columns
496    (usually ISO strings) cast as datetimes.
497
498    Examples
499    --------
500    ```python
501    >>> import pandas as pd
502    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
503    >>> df.dtypes
504    a    object
505    dtype: object
506    >>> df2 = parse_df_datetimes(df)
507    >>> df2.dtypes
508    a    datetime64[us, UTC]
509    dtype: object
510
511    ```
512
513    """
514    from meerschaum.utils.packages import import_pandas, attempt_import
515    from meerschaum.utils.debug import dprint
516    from meerschaum.utils.warnings import warn
517    from meerschaum.utils.misc import items_str
518    from meerschaum.utils.dtypes import to_datetime, MRSM_PD_DTYPES
519    import traceback
520
521    pd = import_pandas()
522    pandas = attempt_import('pandas')
523    pd_name = pd.__name__
524    using_dask = 'dask' in pd_name
525    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
526    dask_dataframe = None
527    if using_dask or df_is_dask:
528        npartitions = chunksize_to_npartitions(chunksize)
529        dask_dataframe = attempt_import('dask.dataframe')
530
531    ### if df is a dict, build DataFrame
532    if isinstance(df, pandas.DataFrame):
533        pdf = df
534    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
535        pdf = get_first_valid_dask_partition(df)
536    else:
537        if debug:
538            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")
539
540        if using_dask:
541            if isinstance(df, list):
542                keys = set()
543                for doc in df:
544                    for key in doc:
545                        keys.add(key)
546                df = pd.DataFrame.from_dict(
547                    {
548                        k: [
549                            doc.get(k, None)
550                            for doc in df
551                        ] for k in keys
552                    },
553                    npartitions=npartitions,
554                )
555            elif isinstance(df, dict):
556                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
557            elif 'pandas.core.frame.DataFrame' in str(type(df)):
558                df = pd.from_pandas(df, npartitions=npartitions)
559            else:
560                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
561            pandas = attempt_import('pandas')
562            pdf = get_first_valid_dask_partition(df)
563
564        else:
565            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
566            pdf = df
567
568    ### skip parsing if DataFrame is empty
569    if len(pdf) == 0:
570        if debug:
571            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
572        return df
573
574    ignore_cols = set(
575        (ignore_cols or []) + [
576            col
577            for col, dtype in pdf.dtypes.items() 
578            if 'datetime' in str(dtype)
579        ]
580    )
581    cols_to_inspect = [
582        col
583        for col in pdf.columns
584        if col not in ignore_cols
585    ] if not ignore_all else []
586
587    if len(cols_to_inspect) == 0:
588        if debug:
589            dprint("All columns are ignored, skipping datetime detection...")
590        return df.infer_objects().fillna(pandas.NA)
591
592    ### apply regex to columns to determine which are ISO datetimes
593    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
594    dt_mask = pdf[cols_to_inspect].astype(str).apply(
595        lambda s: s.str.match(iso_dt_regex).all()
596    )
597
598    ### list of datetime column names
599    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
600    if not datetime_cols:
601        if debug:
602            dprint("No columns detected as datetimes, returning...")
603        return df.infer_objects().fillna(pandas.NA)
604
605    if debug:
606        dprint("Converting columns to datetimes: " + str(datetime_cols))
607
608    def _parse_to_datetime(x):
609        return to_datetime(x, precision_unit=precision_unit, coerce_utc=coerce_utc)
610
611    try:
612        if not using_dask:
613            df[datetime_cols] = df[datetime_cols].apply(_parse_to_datetime)
614        else:
615            df[datetime_cols] = df[datetime_cols].apply(
616                _parse_to_datetime,
617                utc=True,
618                axis=1,
619                meta={
620                    col: MRSM_PD_DTYPES['datetime']
621                    for col in datetime_cols
622                }
623            )
624    except Exception:
625        warn(
626            f"Unable to apply `to_datetime()` to {items_str(datetime_cols)}:\n"
627            + f"{traceback.format_exc()}"
628        )
629
630    if strip_timezone:
631        for dt in datetime_cols:
632            try:
633                df[dt] = df[dt].dt.tz_localize(None)
634            except Exception:
635                warn(
636                    f"Unable to convert column '{dt}' to naive datetime:\n"
637                    + f"{traceback.format_exc()}"
638                )
639
640    return df.fillna(pandas.NA)

Parse a pandas DataFrame for datetime columns and cast as datetimes.

Parameters
  • df (pd.DataFrame): The pandas DataFrame to parse.
  • ignore_cols (Optional[Iterable[str]], default None): If provided, do not attempt to coerce these columns as datetimes.
  • strip_timezone (bool, default False): If True, remove the UTC tzinfo property.
  • chunksize (Optional[int], default None): If the pandas implementation is 'dask', use this chunksize for the distributed dataframe.
  • dtype_backend (str, default 'numpy_nullable'): If df is not a DataFrame and new one needs to be constructed, use this as the datatypes backend. Accepted values are 'numpy_nullable' and 'pyarrow'.
  • ignore_all (bool, default False): If True, do not attempt to cast any columns to datetimes.
  • precision_unit (Optional[str], default None): If provided, enforce the given precision on the coerced datetime columns.
  • coerce_utc (bool, default True): Coerce the datetime columns to UTC (see meerschaum.utils.dtypes.to_datetime()).
  • debug (bool, default False): Verbosity toggle.
Returns
  • A new pandas DataFrame with the determined datetime columns (usually ISO strings) cast as datetimes.
Examples
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
>>> df.dtypes
a    object
dtype: object
>>> df2 = parse_df_datetimes(df)
>>> df2.dtypes
a    datetime64[us, UTC]
dtype: object
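The function also accepts lists of documents (or dictionaries of lists) and builds the DataFrame for you; a brief sketch:

```python
from meerschaum.utils.dataframe import parse_df_datetimes

docs = [
    {'dt': '2024-01-01 00:00:00', 'val': 1.5},
    {'dt': '2024-01-02 00:00:00', 'val': 2.5},
]
df = parse_df_datetimes(docs)
df.dtypes
### e.g.:
### dt     datetime64[us, UTC]
### val                Float64
### dtype: object
```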
def get_unhashable_cols(df: pandas.DataFrame) -> List[str]:
643def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
644    """
645    Get the columns which contain unhashable objects from a Pandas DataFrame.
646
647    Parameters
648    ----------
649    df: pd.DataFrame
650        The DataFrame which may contain unhashable objects.
651
652    Returns
653    -------
654    A list of columns.
655    """
656    if df is None:
657        return []
658    if len(df) == 0:
659        return []
660
661    is_dask = 'dask' in df.__module__
662    if is_dask:
663        from meerschaum.utils.packages import attempt_import
664        pandas = attempt_import('pandas')
665        df = pandas.DataFrame(get_first_valid_dask_partition(df))
666    return [
667        col for col, val in df.iloc[0].items()
668        if not isinstance(val, Hashable)
669    ]

Get the columns which contain unhashable objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
  • A list of columns.
def get_json_cols(df: pandas.DataFrame) -> List[str]:
672def get_json_cols(df: 'pd.DataFrame') -> List[str]:
673    """
674    Get the columns which contain JSON-like objects (dicts or lists) from a Pandas DataFrame.
675
676    Parameters
677    ----------
678    df: pd.DataFrame
679        The DataFrame which may contain JSON-like (dict or list) objects.
680
681    Returns
682    -------
683    A list of columns to be encoded as JSON.
684    """
685    if df is None:
686        return []
687
688    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
689    if is_dask:
690        df = get_first_valid_dask_partition(df)
691
692    if len(df) == 0:
693        return []
694
695    cols_indices = {
696        col: df[col].first_valid_index()
697        for col in df.columns
698    }
699    return [
700        col
701        for col, ix in cols_indices.items()
702        if (
703            ix is not None
704            and isinstance(df.loc[ix][col], (dict, list))
705        )
706    ]

Get the columns which contain JSON-like objects (dicts or lists) from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain JSON-like (dict or list) objects.
Returns
  • A list of columns to be encoded as JSON.
def get_numeric_cols(df: pandas.DataFrame) -> List[str]:
709def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
710    """
711    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.
712
713    Parameters
714    ----------
715    df: pd.DataFrame
716        The DataFrame which may contain decimal objects.
717
718    Returns
719    -------
720    A list of columns to treat as numerics.
721    """
722    if df is None:
723        return []
724    from decimal import Decimal
725    is_dask = 'dask' in df.__module__
726    if is_dask:
727        df = get_first_valid_dask_partition(df)
728
729    if len(df) == 0:
730        return []
731
732    cols_indices = {
733        col: df[col].first_valid_index()
734        for col in df.columns
735    }
736    return [
737        col
738        for col, ix in cols_indices.items()
739        if (
740            ix is not None
741            and
742            isinstance(df.loc[ix][col], Decimal)
743        )
744    ]

Get the columns which contain decimal.Decimal objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain decimal objects.
Returns
  • A list of columns to treat as numerics.
def get_bool_cols(df: pandas.DataFrame) -> List[str]:
747def get_bool_cols(df: 'pd.DataFrame') -> List[str]:
748    """
749    Get the columns which contain `bool` objects from a Pandas DataFrame.
750
751    Parameters
752    ----------
753    df: pd.DataFrame
754        The DataFrame which may contain bools.
755
756    Returns
757    -------
758    A list of columns to treat as bools.
759    """
760    if df is None:
761        return []
762
763    is_dask = 'dask' in df.__module__
764    if is_dask:
765        df = get_first_valid_dask_partition(df)
766
767    if len(df) == 0:
768        return []
769
770    from meerschaum.utils.dtypes import are_dtypes_equal
771
772    return [
773        col
774        for col, typ in df.dtypes.items()
775        if are_dtypes_equal(str(typ), 'bool')
776    ]

Get the columns which contain bool objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain bools.
Returns
  • A list of columns to treat as bools.
def get_uuid_cols(df: pandas.DataFrame) -> List[str]:
779def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
780    """
781    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.
782
783    Parameters
784    ----------
785    df: pd.DataFrame
786        The DataFrame which may contain UUID objects.
787
788    Returns
789    -------
790    A list of columns to treat as UUIDs.
791    """
792    if df is None:
793        return []
794    from uuid import UUID
795    is_dask = 'dask' in df.__module__
796    if is_dask:
797        df = get_first_valid_dask_partition(df)
798
799    if len(df) == 0:
800        return []
801
802    cols_indices = {
803        col: df[col].first_valid_index()
804        for col in df.columns
805    }
806    return [
807        col
808        for col, ix in cols_indices.items()
809        if (
810            ix is not None
811            and
812            isinstance(df.loc[ix][col], UUID)
813        )
814    ]

Get the columns which contain uuid.UUID objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain UUID objects.
Returns
  • A list of columns to treat as UUIDs.
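Most of these getters share a pattern: inspect each column's first valid value (computing the first non-empty partition for Dask inputs), while `get_bool_cols` checks dtypes instead. One combined sketch covering `get_json_cols`, `get_numeric_cols`, `get_bool_cols`, and `get_uuid_cols`:

```python
import uuid
from decimal import Decimal

import pandas as pd

from meerschaum.utils.dataframe import (
    get_json_cols,
    get_numeric_cols,
    get_bool_cols,
    get_uuid_cols,
)

df = pd.DataFrame({
    'meta': [{'k': 'v'}],
    'price': [Decimal('1.23')],
    'flag': [True],
    'uid': [uuid.uuid4()],
})

get_json_cols(df)     # ['meta']
get_numeric_cols(df)  # ['price']
get_bool_cols(df)     # ['flag']
get_uuid_cols(df)     # ['uid']
```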
def get_datetime_cols( df: pandas.DataFrame, timezone_aware: bool = True, timezone_naive: bool = True, with_tz_precision: bool = False) -> Union[List[str], Dict[str, Tuple[Optional[str], str]]]:
817def get_datetime_cols(
818    df: 'pd.DataFrame',
819    timezone_aware: bool = True,
820    timezone_naive: bool = True,
821    with_tz_precision: bool = False,
822) -> Union[List[str], Dict[str, Tuple[Union[str, None], str]]]:
823    """
824    Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame.
825
826    Parameters
827    ----------
828    df: pd.DataFrame
829        The DataFrame which may contain `datetime` or `Timestamp` objects.
830
831    timezone_aware: bool, default True
832        If `True`, include timezone-aware datetime columns.
833
834    timezone_naive: bool, default True
835        If `True`, include timezone-naive datetime columns.
836
837    with_tz_precision: bool, default False
838        If `True`, return a dictionary mapping column names to tuples in the form
839        `(timezone, precision)`.
840
841    Returns
842    -------
843    A list of columns to treat as datetimes, or a dictionary of columns to tz+precision tuples
844    (if `with_tz_precision` is `True`).
845    """
846    if not timezone_aware and not timezone_naive:
847        raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.")
848
849    if df is None:
850        return [] if not with_tz_precision else {}
851
852    from meerschaum.utils.dtypes import are_dtypes_equal, MRSM_PRECISION_UNITS_ALIASES
853    is_dask = 'dask' in df.__module__
854    if is_dask:
855        df = get_first_valid_dask_partition(df)
856   
857    def get_tz_precision_from_dtype(dtype: str) -> Tuple[Union[str, None], str]:
858        """
859        Extract the tz + precision tuple from a dtype string.
860        """
861        meta_str = dtype.split('[', maxsplit=1)[-1].rstrip(']').replace(' ', '')
862        tz = (
863            None
864            if ',' not in meta_str
865            else meta_str.split(',', maxsplit=1)[-1]
866        )
867        precision_abbreviation = (
868            meta_str
869            if ',' not in meta_str
870            else meta_str.split(',')[0]
871        )
872        precision = MRSM_PRECISION_UNITS_ALIASES[precision_abbreviation]
873        return tz, precision
874
875    def get_tz_precision_from_datetime(dt: datetime) -> Tuple[Union[str, None], str]:
876        """
877        Return the tz + precision tuple from a Python datetime object.
878        """
879        return dt.tzname(), 'microsecond'
880
881    known_dt_cols_types = {
882        col: str(typ)
883        for col, typ in df.dtypes.items()
884        if are_dtypes_equal('datetime', str(typ))
885    }
886 
887    known_dt_cols_tuples = {
888        col: get_tz_precision_from_dtype(typ)
889        for col, typ in known_dt_cols_types.items()
890    }
891
892    if len(df) == 0:
893        return (
894            list(known_dt_cols_types)
895            if not with_tz_precision
896            else known_dt_cols_tuples
897        )
898
899    cols_indices = {
900        col: df[col].first_valid_index()
901        for col in df.columns
902        if col not in known_dt_cols_types
903    }
904    pydt_cols_tuples = {
905        col: get_tz_precision_from_datetime(sample_val)
906        for col, ix in cols_indices.items()
907        if (
908            ix is not None
909            and
910            isinstance((sample_val := df.loc[ix][col]), datetime)
911        )
912    }
913
914    dt_cols_tuples = {
915        **known_dt_cols_tuples,
916        **pydt_cols_tuples
917    }
918
919    all_dt_cols_tuples = {
920        col: dt_cols_tuples[col]
921        for col in df.columns
922        if col in dt_cols_tuples
923    }
924    if timezone_aware and timezone_naive:
925        return (
926            list(all_dt_cols_tuples)
927            if not with_tz_precision
928            else all_dt_cols_tuples
929        )
930
931    known_timezone_aware_dt_cols = [
932        col
933        for col in known_dt_cols_types
934        if getattr(df[col], 'tz', None) is not None
935    ]
936    timezone_aware_pydt_cols_tuples = {
937        col: (tz, precision)
938        for col, (tz, precision) in pydt_cols_tuples.items()
939        if df.loc[cols_indices[col]][col].tzinfo is not None
940    }
941    timezone_aware_dt_cols_set = set(
942        known_timezone_aware_dt_cols + list(timezone_aware_pydt_cols_tuples)
943    )
944    timezone_aware_cols_tuples = {
945        col: (tz, precision)
946        for col, (tz, precision) in all_dt_cols_tuples.items()
947        if col in timezone_aware_dt_cols_set
948    }
949    timezone_naive_cols_tuples = {
950        col: (tz, precision)
951        for col, (tz, precision) in all_dt_cols_tuples.items()
952        if col not in timezone_aware_dt_cols_set
953    }
954
955    if timezone_aware:
956        return (
957            list(timezone_aware_cols_tuples)
958            if not with_tz_precision
959            else timezone_aware_cols_tuples
960        )
961
962    return (
963        list(timezone_naive_cols_tuples)
964        if not with_tz_precision
965        else timezone_naive_cols_tuples
966    )

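A short usage sketch for `get_datetime_cols`, assuming pandas infers microsecond
precision from Python `datetime` objects (as in the `get_datetime_cols_types` examples below):

>>> from datetime import datetime, timezone
>>> import pandas as pd
>>> df = pd.DataFrame({
...     'aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)],
...     'naive': [datetime(2025, 1, 1)],
... })
>>> get_datetime_cols(df)
['aware', 'naive']
>>> get_datetime_cols(df, with_tz_precision=True)
{'aware': ('UTC', 'microsecond'), 'naive': (None, 'microsecond')}
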
def get_datetime_cols_types(df: pandas.DataFrame) -> Dict[str, str]:
 969def get_datetime_cols_types(df: 'pd.DataFrame') -> Dict[str, str]:
 970    """
 971    Return a dictionary mapping datetime columns to specific dtype strings.
 972
 973    Parameters
 974    ----------
 975    df: pd.DataFrame
 976        The DataFrame which may contain datetime columns.
 977
 978    Returns
 979    -------
 980    A dictionary mapping the datetime columns' names to dtype strings
 981    (containing timezone and precision metadata).
 982
 983    Examples
 984    --------
 985    >>> from datetime import datetime, timezone
 986    >>> import pandas as pd
 987    >>> df = pd.DataFrame({'dt_tz_aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)]})
 988    >>> get_datetime_cols_types(df)
 989    {'dt_tz_aware': 'datetime64[us, UTC]'}
 990    >>> df = pd.DataFrame({'distant_dt': [datetime(1, 1, 1)]})
 991    >>> get_datetime_cols_types(df)
 992    {'distant_dt': 'datetime64[us]'}
 993    >>> df = pd.DataFrame({'dt_second': [datetime(2025, 1, 1)]})
 994    >>> df['dt_second'] = df['dt_second'].astype('datetime64[s]')
 995    >>> get_datetime_cols_types(df)
 996    {'dt_second': 'datetime64[s]'}
 997    """
 998    from meerschaum.utils.dtypes import MRSM_PRECISION_UNITS_ABBREVIATIONS
 999    dt_cols_tuples = get_datetime_cols(df, with_tz_precision=True)
1000    if not dt_cols_tuples:
1001        return {}
1002
1003    return {
1004        col: (
1005            f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}]"
1006            if tz is None
1007            else f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}, {tz}]"
1008        )
1009        for col, (tz, precision) in dt_cols_tuples.items()
1010    }

def get_date_cols(df: pandas.DataFrame) -> List[str]:
1013def get_date_cols(df: 'pd.DataFrame') -> List[str]:
1014    """
1015    Get the `date` columns from a Pandas DataFrame.
1016
1017    Parameters
1018    ----------
1019    df: pd.DataFrame
1020        The DataFrame which may contain dates.
1021
1022    Returns
1023    -------
1024    A list of columns to treat as dates.
1025    """
1026    from meerschaum.utils.dtypes import are_dtypes_equal
1027    if df is None:
1028        return []
1029
1030    is_dask = 'dask' in df.__module__
1031    if is_dask:
1032        df = get_first_valid_dask_partition(df)
1033
1034    known_date_cols = [
1035        col
1036        for col, typ in df.dtypes.items()
1037        if are_dtypes_equal(typ, 'date')
1038    ]
1039
1040    if len(df) == 0:
1041        return known_date_cols
1042
1043    cols_indices = {
1044        col: df[col].first_valid_index()
1045        for col in df.columns
1046        if col not in known_date_cols
1047    }
1048    object_date_cols = [
1049        col
1050        for col, ix in cols_indices.items()
1051        if (
1052            ix is not None
1053            and isinstance(df.loc[ix][col], date)
1054        )
1055    ]
1056
1057    all_date_cols = set(known_date_cols + object_date_cols)
1058
1059    return [
1060        col
1061        for col in df.columns
1062        if col in all_date_cols
1063    ]

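A minimal example for `get_date_cols`; `date` objects typically live in `object`-dtype
columns, so detection samples the first valid value:

>>> from datetime import date
>>> import pandas as pd
>>> df = pd.DataFrame({'d': [date(2025, 1, 1)], 'n': [1]})
>>> get_date_cols(df)
['d']
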
def get_bytes_cols(df: pandas.DataFrame) -> List[str]:
1066def get_bytes_cols(df: 'pd.DataFrame') -> List[str]:
1067    """
1068    Get the columns which contain bytes strings from a Pandas DataFrame.
1069
1070    Parameters
1071    ----------
1072    df: pd.DataFrame
1073        The DataFrame which may contain bytes strings.
1074
1075    Returns
1076    -------
1077    A list of columns to treat as bytes.
1078    """
1079    if df is None:
1080        return []
1081
1082    is_dask = 'dask' in df.__module__
1083    if is_dask:
1084        df = get_first_valid_dask_partition(df)
1085
1086    known_bytes_cols = [
1087        col
1088        for col, typ in df.dtypes.items()
1089        if str(typ) == 'binary[pyarrow]'
1090    ]
1091
1092    if len(df) == 0:
1093        return known_bytes_cols
1094
1095    cols_indices = {
1096        col: df[col].first_valid_index()
1097        for col in df.columns
1098        if col not in known_bytes_cols
1099    }
1100    object_bytes_cols = [
1101        col
1102        for col, ix in cols_indices.items()
1103        if (
1104            ix is not None
1105            and isinstance(df.loc[ix][col], bytes)
1106        )
1107    ]
1108
1109    all_bytes_cols = set(known_bytes_cols + object_bytes_cols)
1110
1111    return [
1112        col
1113        for col in df.columns
1114        if col in all_bytes_cols
1115    ]

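A minimal example for `get_bytes_cols`; raw `bytes` values are detected by sampling
unless the column's dtype is already `binary[pyarrow]`:

>>> import pandas as pd
>>> df = pd.DataFrame({'payload': [b'\x00\x01'], 'name': ['a']})
>>> get_bytes_cols(df)
['payload']
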
def get_geometry_cols(df: pandas.DataFrame, with_types_srids: bool = False) -> Union[List[str], Dict[str, Any]]:
1118def get_geometry_cols(
1119    df: 'pd.DataFrame',
1120    with_types_srids: bool = False,
1121) -> Union[List[str], Dict[str, Any]]:
1122    """
1123    Get the columns which contain shapely objects from a Pandas DataFrame.
1124
1125    Parameters
1126    ----------
1127    df: pd.DataFrame
1128        The DataFrame which may contain shapely (geometry) objects.
1129
1130    with_types_srids: bool, default False
1131        If `True`, return a dictionary mapping columns to geometry types and SRIDs.
1132
1133    Returns
1134    -------
1135    A list of columns to treat as `geometry`.
1136    If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID).
1137    """
1138    if df is None:
1139        return [] if not with_types_srids else {}
1140
1141    is_dask = 'dask' in df.__module__
1142    if is_dask:
1143        df = get_first_valid_dask_partition(df)
1144
1145    if len(df) == 0:
1146        return [] if not with_types_srids else {}
1147
1148    cols_indices = {
1149        col: df[col].first_valid_index()
1150        for col in df.columns
1151    }
1152    geo_cols = [
1153        col
1154        for col, ix in cols_indices.items()
1155        if (
1156            ix is not None
1157            and
1158            'shapely' in str(type(df.loc[ix][col]))
1159        )
1160    ]
1161    if not with_types_srids:
1162        return geo_cols
1163
1164    gpd = mrsm.attempt_import('geopandas', lazy=False)
1165    geo_cols_types_srids = {}
1166    for col in geo_cols:
1167        try:
1168            sample_geo_series = gpd.GeoSeries(df[col], crs=None)
1169            geometry_types = {
1170                geom.geom_type
1171                for geom in sample_geo_series
1172                if hasattr(geom, 'geom_type')
1173            }
1174            geometry_has_z = any(getattr(geom, 'has_z', False) for geom in sample_geo_series)
1175            srid = (
1176                (
1177                    sample_geo_series.crs.sub_crs_list[0].to_epsg()
1178                    if sample_geo_series.crs.is_compound
1179                    else sample_geo_series.crs.to_epsg()
1180                )
1181                if sample_geo_series.crs
1182                else 0
1183            )
1184            geometry_type = list(geometry_types)[0] if len(geometry_types) == 1 else 'geometry'
1185            if geometry_type != 'geometry' and geometry_has_z:
1186                geometry_type = geometry_type + 'Z'
1187        except Exception:
1188            srid = 0
1189            geometry_type = 'geometry'
1190        geo_cols_types_srids[col] = (geometry_type, srid)
1191
1192    return geo_cols_types_srids

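A sketch for `get_geometry_cols`, assuming `shapely` (and `geopandas` for
`with_types_srids=True`) is installed; with no CRS set, the SRID falls back to 0:

>>> import pandas as pd
>>> from shapely.geometry import Point
>>> df = pd.DataFrame({'geom': [Point(0, 0)], 'val': [1]})
>>> get_geometry_cols(df)
['geom']
>>> get_geometry_cols(df, with_types_srids=True)
{'geom': ('Point', 0)}
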
def get_geometry_cols_types(df: pandas.DataFrame) -> Dict[str, str]:
1195def get_geometry_cols_types(df: 'pd.DataFrame') -> Dict[str, str]:
1196    """
1197    Return a dtypes dictionary mapping columns to specific geometry types (type, srid).
1198    """
1199    geometry_cols_types_srids = get_geometry_cols(df, with_types_srids=True)
1200    new_cols_types = {}
1201    for col, (geometry_type, srid) in geometry_cols_types_srids.items():
1202        new_dtype = "geometry"
1203        modifier = ""
1204        if not srid and geometry_type.lower() == 'geometry':
1205            new_cols_types[col] = new_dtype
1206            continue
1207
1208        modifier = "["
1209        if geometry_type.lower() != 'geometry':
1210            modifier += f"{geometry_type}"
1211
1212        if srid:
1213            if modifier != '[':
1214                modifier += ", "
1215            modifier += f"{srid}"
1216        modifier += "]"
1217        new_cols_types[col] = f"{new_dtype}{modifier}"
1218    return new_cols_types

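Continuing the `Point` example above (no CRS, so only the geometry type modifier is emitted):

>>> get_geometry_cols_types(df)
{'geom': 'geometry[Point]'}
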
def get_special_cols(df: pandas.DataFrame) -> Dict[str, str]:
1221def get_special_cols(df: 'pd.DataFrame') -> Dict[str, str]:
1222    """
1223    Return a dtypes dictionary mapping special columns to their dtypes.
1224    """
1225    return {
1226        **{col: 'json' for col in get_json_cols(df)},
1227        **{col: 'uuid' for col in get_uuid_cols(df)},
1228        **{col: 'bytes' for col in get_bytes_cols(df)},
1229        **{col: 'bool' for col in get_bool_cols(df)},
1230        **{col: 'numeric' for col in get_numeric_cols(df)},
1231        **{col: 'date' for col in get_date_cols(df)},
1232        **get_datetime_cols_types(df),
1233        **get_geometry_cols_types(df),
1234    }

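An illustrative sketch for `get_special_cols`; the exact detections come from the
helpers above (e.g. `get_json_cols`, `get_uuid_cols`, `get_numeric_cols`):

>>> import pandas as pd
>>> from decimal import Decimal
>>> from uuid import uuid4
>>> df = pd.DataFrame({'meta': [{'a': 1}], 'id': [uuid4()], 'amount': [Decimal('1.5')]})
>>> get_special_cols(df)
{'meta': 'json', 'id': 'uuid', 'amount': 'numeric'}
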
def enforce_dtypes(df: pandas.DataFrame, dtypes: Dict[str, str], explicit_dtypes: Optional[Dict[str, str]] = None, safe_copy: bool = True, coerce_numeric: bool = False, coerce_timezone: bool = True, strip_timezone: bool = False, debug: bool = False) -> pandas.DataFrame:
1237def enforce_dtypes(
1238    df: 'pd.DataFrame',
1239    dtypes: Dict[str, str],
1240    explicit_dtypes: Optional[Dict[str, str]] = None,
1241    safe_copy: bool = True,
1242    coerce_numeric: bool = False,
1243    coerce_timezone: bool = True,
1244    strip_timezone: bool = False,
1245    debug: bool = False,
1246) -> 'pd.DataFrame':
1247    """
1248    Enforce the `dtypes` dictionary on a DataFrame.
1249
1250    Parameters
1251    ----------
1252    df: pd.DataFrame
1253        The DataFrame on which to enforce dtypes.
1254
1255    dtypes: Dict[str, str]
1256        The data types to attempt to enforce on the DataFrame.
1257
1258    explicit_dtypes: Optional[Dict[str, str]], default None
1259        If provided, automatic dtype coercion will respect explicitly configured
1260        dtypes (`int`, `float`, `numeric`).
1261
1262    safe_copy: bool, default True
1263        If `True`, create a copy before comparing and modifying the dataframes.
1264        Setting to `False` may mutate the DataFrames.
1265        See `meerschaum.utils.dataframe.filter_unseen_df`.
1266
1267    coerce_numeric: bool, default False
1268        If `True`, convert float and int collisions to numeric.
1269
1270    coerce_timezone: bool, default True
1271        If `True`, convert datetimes to UTC.
1272
1273    strip_timezone: bool, default False
1274        If `coerce_timezone` and `strip_timezone` are `True`,
1275        remove timezone information from datetimes.
1276
1277    debug: bool, default False
1278        Verbosity toggle.
1279
1280    Returns
1281    -------
1282    The Pandas DataFrame with the types enforced.
1283    """
1284    import json
1285    import functools
1286    from meerschaum.utils.debug import dprint
1287    from meerschaum.utils.formatting import pprint
1288    from meerschaum.utils.dtypes import (
1289        are_dtypes_equal,
1290        to_pandas_dtype,
1291        is_dtype_numeric,
1292        attempt_cast_to_numeric,
1293        attempt_cast_to_uuid,
1294        attempt_cast_to_bytes,
1295        attempt_cast_to_geometry,
1296        coerce_timezone as _coerce_timezone,
1297        get_geometry_type_srid,
1298    )
1299    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
1300    pandas = mrsm.attempt_import('pandas')
1301    is_dask = 'dask' in df.__module__
1302    if safe_copy:
1303        df = df.copy()
1304    if len(df.columns) == 0:
1305        if debug:
1306            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
1307        return df
1308
1309    explicit_dtypes = explicit_dtypes or {}
1310    pipe_pandas_dtypes = {
1311        col: to_pandas_dtype(typ)
1312        for col, typ in dtypes.items()
1313    }
1314    json_cols = [
1315        col
1316        for col, typ in dtypes.items()
1317        if typ == 'json'
1318    ]
1319    numeric_cols = [
1320        col
1321        for col, typ in dtypes.items()
1322        if typ.startswith('numeric')
1323    ]
1324    geometry_cols_types_srids = {
1325        col: get_geometry_type_srid(typ, default_srid=0)
1326        for col, typ in dtypes.items()
1327        if typ.startswith('geometry') or typ.startswith('geography')
1328    }
1329    uuid_cols = [
1330        col
1331        for col, typ in dtypes.items()
1332        if typ == 'uuid'
1333    ]
1334    bytes_cols = [
1335        col
1336        for col, typ in dtypes.items()
1337        if typ == 'bytes'
1338    ]
1339    datetime_cols = [
1340        col
1341        for col, typ in dtypes.items()
1342        if are_dtypes_equal(typ, 'datetime')
1343    ]
1344    df_numeric_cols = get_numeric_cols(df)
1345    if debug:
1346        dprint("Desired data types:")
1347        pprint(dtypes)
1348        dprint("Data types for incoming DataFrame:")
1349        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})
1350
1351    if json_cols and len(df) > 0:
1352        if debug:
1353            dprint(f"Checking columns for JSON encoding: {json_cols}")
1354        for col in json_cols:
1355            if col in df.columns:
1356                try:
1357                    df[col] = df[col].apply(
1358                        (
1359                            lambda x: (
1360                                json.loads(x)
1361                                if isinstance(x, str)
1362                                else x
1363                            )
1364                        )
1365                    )
1366                except Exception as e:
1367                    if debug:
1368                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")
1369
1370    if numeric_cols:
1371        if debug:
1372            dprint(f"Checking for numerics: {numeric_cols}")
1373        for col in numeric_cols:
1374            precision, scale = get_numeric_precision_scale(None, dtypes.get(col, ''))
1375            if col in df.columns:
1376                try:
1377                    df[col] = df[col].apply(
1378                        functools.partial(
1379                            attempt_cast_to_numeric,
1380                            quantize=True,
1381                            precision=precision,
1382                            scale=scale,
1383                        )
1384                    )
1385                except Exception as e:
1386                    if debug:
1387                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")
1388
1389    if uuid_cols:
1390        if debug:
1391            dprint(f"Checking for UUIDs: {uuid_cols}")
1392        for col in uuid_cols:
1393            if col in df.columns:
1394                try:
1395                    df[col] = df[col].apply(attempt_cast_to_uuid)
1396                except Exception as e:
1397                    if debug:
1398                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")
1399
1400    if bytes_cols:
1401        if debug:
1402            dprint(f"Checking for bytes: {bytes_cols}")
1403        for col in bytes_cols:
1404            if col in df.columns:
1405                try:
1406                    df[col] = df[col].apply(attempt_cast_to_bytes)
1407                except Exception as e:
1408                    if debug:
1409                        dprint(f"Unable to parse column '{col}' as bytes:\n{e}")
1410
1411    if datetime_cols and coerce_timezone:
1412        if debug:
1413            dprint(f"Checking for datetime conversion: {datetime_cols}")
1414        for col in datetime_cols:
1415            if col in df.columns:
1416                if not strip_timezone and 'utc' in str(df.dtypes[col]).lower():
1417                    if debug:
1418                        dprint(f"Skip UTC coercion for column '{col}' ({str(df[col].dtype)}).")
1419                    continue
1420                if strip_timezone and ',' not in str(df.dtypes[col]):
1421                    if debug:
1422                        dprint(
1423                            f"Skip UTC coercion (stripped) for column '{col}' "
1424                            f"({str(df[col].dtype)})."
1425                        )
1426                    continue
1427
1428                if debug:
1429                    dprint(
1430                        f"Data type for column '{col}' before timezone coercion: "
1431                        f"{str(df[col].dtype)}"
1432                    )
1433
1434                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)
1435                if debug:
1436                    dprint(
1437                        f"Data type for column '{col}' after timezone coercion: "
1438                        f"{str(df[col].dtype)}"
1439                    )
1440
1441    if geometry_cols_types_srids:
1442        geopandas = mrsm.attempt_import('geopandas')
1443        if debug:
1444            dprint(f"Checking for geometry: {list(geometry_cols_types_srids)}")
1445        parsed_geom_cols = []
1446        for col in geometry_cols_types_srids:
1447            if col not in df.columns:
1448                continue
1449            try:
1450                df[col] = attempt_cast_to_geometry(df[col])
1451                parsed_geom_cols.append(col)
1452            except Exception as e:
1453                import traceback
1454                traceback.print_exc()
1455                if debug:
1456                    dprint(f"Unable to parse column '{col}' as geometry:\n{e}")
1457
1458        if parsed_geom_cols:
1459            if debug:
1460                dprint(f"Converting to GeoDataFrame (geometry column: '{parsed_geom_cols[0]}')...")
1461            try:
1462                _, default_srid = geometry_cols_types_srids[parsed_geom_cols[0]]
1463                df = geopandas.GeoDataFrame(df, geometry=parsed_geom_cols[0], crs=default_srid)
1464                for col, (_, srid) in geometry_cols_types_srids.items():
1465                    if srid:
1466                        if debug:
1467                            dprint(f"Setting '{col}' to SRID '{srid}'...")
1468                        _ = df[col].set_crs(srid)
1469                if parsed_geom_cols[0] not in df.columns:
1470                    df.rename_geometry(parsed_geom_cols[0], inplace=True)
1471            except (ValueError, TypeError):
1472                import traceback
1473                dprint(f"Failed to cast to GeoDataFrame:\n{traceback.format_exc()}")
1474
1475    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
1476    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1477        if debug:
1478            dprint("Data types match. Exiting enforcement...")
1479        return df
1480
1481    common_dtypes = {}
1482    common_diff_dtypes = {}
1483    for col, typ in pipe_pandas_dtypes.items():
1484        if col in df_dtypes:
1485            common_dtypes[col] = typ
1486            if not are_dtypes_equal(typ, df_dtypes[col]):
1487                common_diff_dtypes[col] = df_dtypes[col]
1488
1489    if debug:
1490        dprint("Common columns with different dtypes:")
1491        pprint(common_diff_dtypes)
1492
1493    detected_dt_cols = {}
1494    for col, typ in common_diff_dtypes.items():
1495        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
1496            df_dtypes[col] = typ
1497            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
1498    for col in detected_dt_cols:
1499        del common_diff_dtypes[col]
1500
1501    if debug:
1502        dprint("Common columns with different dtypes (after dates):")
1503        pprint(common_diff_dtypes)
1504
1505    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1506        if debug:
1507            dprint(
1508                "The incoming DataFrame has mostly the same types, skipping enforcement."
1509                + " The only detected difference was in the following datetime columns."
1510            )
1511            pprint(detected_dt_cols)
1512        return df
1513
1514    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
1515        previous_typ = common_dtypes[col]
1516        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
1517        explicitly_float = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'float')
1518        explicitly_int = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'int')
1519        explicitly_numeric = explicit_dtypes.get(col, 'object').startswith('numeric')
1520        all_nan = (
1521            df[col].isnull().all()
1522            if mixed_numeric_types and coerce_numeric and not (explicitly_float or explicitly_int)
1523            else None
1524        )
1525        cast_to_numeric = explicitly_numeric or (
1526            (
1527                col in df_numeric_cols
1528                or (
1529                    mixed_numeric_types
1530                    and not (explicitly_float or explicitly_int)
1531                    and not all_nan
1532                    and coerce_numeric
1533                )
1534            )
1535        )
1536
1537        if debug and (explicitly_numeric or df_numeric_cols or mixed_numeric_types):
1538            from meerschaum.utils.formatting import make_header
1539            msg = (
1540                make_header(f"Coercing column '{col}' to numeric:", left_pad=0)
1541                + "\n"
1542                + f"  Previous type: {previous_typ}\n"
1543                + f"  Current type: {typ if col not in df_numeric_cols else 'Decimal'}"
1544                + ("\n  Column is explicitly numeric." if explicitly_numeric else "")
1545            ) if cast_to_numeric else (
1546                f"Will not coerce column '{col}' to numeric.\n"
1547                f"  Numeric columns in dataframe: {df_numeric_cols}\n"
1548                f"  Mixed numeric types: {mixed_numeric_types}\n"
1549                f"  Explicitly float: {explicitly_float}\n"
1550                f"  Explicitly int: {explicitly_int}\n"
1551                f"  All NaN: {all_nan}\n"
1552                f"  Coerce numeric: {coerce_numeric}"
1553            )
1554            dprint(msg)
1555
1556        if cast_to_numeric:
1557            common_dtypes[col] = attempt_cast_to_numeric
1558            common_diff_dtypes[col] = attempt_cast_to_numeric
1559
1560    for d in common_diff_dtypes:
1561        t = common_dtypes[d]
1562        if debug:
1563            dprint(f"Casting column {d} to dtype {t}.")
1564        try:
1565            df[d] = (
1566                df[d].apply(t)
1567                if callable(t)
1568                else df[d].astype(t)
1569            )
1570        except Exception as e:
1571            if debug:
1572                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}\ndf:\n{df}")
1573            if 'int' in str(t).lower():
1574                try:
1575                    df[d] = df[d].astype('float64').astype(t)
1576                except Exception:
1577                    if debug:
1578                        dprint(f"Was unable to convert to float then {t}.")
1579    return df

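A minimal usage sketch for `enforce_dtypes`, assuming the Meerschaum dtype `'int'`
resolves to a nullable pandas integer dtype via `to_pandas_dtype`:

import pandas as pd

df = pd.DataFrame({'a': ['1', '2'], 'b': [1.0, None]})
df = enforce_dtypes(df, {'a': 'int', 'b': 'float'})
# 'a' is cast to the pandas dtype behind 'int' (a nullable integer dtype);
# 'b' already matches 'float' and is left as float64.
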
def get_datetime_bound_from_df(df: Union[pandas.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]]], datetime_column: str, minimum: bool = True) -> Union[int, datetime.datetime, NoneType]:
1582def get_datetime_bound_from_df(
1583    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1584    datetime_column: str,
1585    minimum: bool = True,
1586) -> Union[int, datetime, None]:
1587    """
1588    Return the minimum or maximum datetime (or integer) from a DataFrame.
1589
1590    Parameters
1591    ----------
1592    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1593        The DataFrame, list, or dict which contains the range axis.
1594
1595    datetime_column: str
1596        The name of the datetime (or int) column.
1597
1598    minimum: bool
1599        Whether to return the minimum (default) or maximum value.
1600
1601    Returns
1602    -------
1603    The minimum or maximum datetime value in the dataframe, or `None`.
1604    """
1605    from meerschaum.utils.dtypes import to_datetime, value_is_null
1606
1607    if df is None:
1608        return None
1609    if not datetime_column:
1610        return None
1611
1612    def compare(a, b):
1613        if a is None:
1614            return b
1615        if b is None:
1616            return a
1617        if minimum:
1618            return a if a < b else b
1619        return a if a > b else b
1620
1621    if isinstance(df, list):
1622        if len(df) == 0:
1623            return None
1624        best_yet = df[0].get(datetime_column, None)
1625        for doc in df:
1626            val = doc.get(datetime_column, None)
1627            best_yet = compare(best_yet, val)
1628        return best_yet
1629
1630    if isinstance(df, dict):
1631        if datetime_column not in df:
1632            return None
1633        best_yet = df[datetime_column][0]
1634        for val in df[datetime_column]:
1635            best_yet = compare(best_yet, val)
1636        return best_yet
1637
1638    if 'DataFrame' in str(type(df)):
1639        from meerschaum.utils.dtypes import are_dtypes_equal
1640        pandas = mrsm.attempt_import('pandas')
1641        is_dask = 'dask' in df.__module__
1642
1643        if datetime_column not in df.columns:
1644            return None
1645
1646        try:
1647            dt_val = (
1648                df[datetime_column].min(skipna=True)
1649                if minimum
1650                else df[datetime_column].max(skipna=True)
1651            )
1652        except Exception:
1653            dt_val = pandas.NA
1654        if is_dask and dt_val is not None and dt_val is not pandas.NA:
1655            dt_val = dt_val.compute()
1656
1657        return (
1658            to_datetime(dt_val, as_pydatetime=True)
1659            if are_dtypes_equal(str(type(dt_val)), 'datetime')
1660            else (dt_val if not value_is_null(dt_val) else None)
1661        )
1662
1663    return None

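For example, with a list of documents and an integer axis:

>>> docs = [{'dt': 10}, {'dt': 30}, {'dt': 20}]
>>> get_datetime_bound_from_df(docs, 'dt')
10
>>> get_datetime_bound_from_df(docs, 'dt', minimum=False)
30
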
def get_unique_index_values(df: Union[pandas.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]]], indices: List[str]) -> Dict[str, List[Any]]:
1666def get_unique_index_values(
1667    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1668    indices: List[str],
1669) -> Dict[str, List[Any]]:
1670    """
1671    Return a dictionary of the unique index values in a DataFrame.
1672
1673    Parameters
1674    ----------
1675    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1676        The dataframe (or list or dict) which contains index values.
1677
1678    indices: List[str]
1679        The list of index columns.
1680
1681    Returns
1682    -------
1683    A dictionary mapping indices to unique values.
1684    """
1685    if df is None:
1686        return {}
1687    if 'dataframe' in str(type(df)).lower():
1688        pandas = mrsm.attempt_import('pandas')
1689        return {
1690            col: list({
1691                (val if val is not pandas.NA else None)
1692                for val in df[col].unique()
1693            })
1694            for col in indices
1695            if col in df.columns
1696        }
1697
1698    unique_indices = defaultdict(lambda: set())
1699    if isinstance(df, list):
1700        for doc in df:
1701            for index in indices:
1702                if index in doc:
1703                    unique_indices[index].add(doc[index])
1704
1705    elif isinstance(df, dict):
1706        for index in indices:
1707            if index in df:
1708                unique_indices[index] = unique_indices[index].union(set(df[index]))
1709
1710    return {key: list(val) for key, val in unique_indices.items()}

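For example, with a list of documents:

>>> docs = [{'id': 1, 'color': 'red'}, {'id': 2, 'color': 'red'}]
>>> get_unique_index_values(docs, ['id', 'color'])
{'id': [1, 2], 'color': ['red']}
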
def df_is_chunk_generator(df: Any) -> bool:
1713def df_is_chunk_generator(df: Any) -> bool:
1714    """
1715    Determine whether to treat `df` as a chunk generator.
1716
1717    Note this should only be used in a context where generators are expected,
1718    as it will return `True` for any iterable.
1719
1720    Parameters
1721    ----------
1722    df: The DataFrame or chunk generator to evaluate.
1723
1724    Returns
1725    -------
1726    A `bool` indicating whether to treat `df` as a generator.
1727    """
1728    return (
1729        not isinstance(df, (dict, list, str))
1730        and 'DataFrame' not in str(type(df))
1731        and isinstance(df, (Generator, Iterable, Iterator))
1732    )

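For example:

>>> import pandas as pd
>>> df_is_chunk_generator(pd.DataFrame())
False
>>> df_is_chunk_generator([{'a': 1}])
False
>>> df_is_chunk_generator(pd.DataFrame([{'a': 1}]) for _ in range(2))
True
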
def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
1735def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
1736    """
1737    Return the Dask `npartitions` value for a given `chunksize`.
1738    """
1739    if chunksize == -1:
1740        from meerschaum.config import get_config
1741        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
1742    if chunksize is None:
1743        return 1
1744    return -1 * chunksize

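Note the sign convention: `None` maps to a single partition, and a positive
`chunksize` is returned negated:

>>> chunksize_to_npartitions(None)
1
>>> chunksize_to_npartitions(100000)
-100000
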
def df_from_literal(pipe: Optional[meerschaum.Pipe] = None, literal: Optional[str] = None, debug: bool = False) -> pandas.DataFrame:
1747def df_from_literal(
1748    pipe: Optional[mrsm.Pipe] = None,
1749    literal: Optional[str] = None,
1750    debug: bool = False
1751) -> 'pd.DataFrame':
1752    """
1753    Construct a dataframe from a literal value, using the pipe's datetime and value column names.
1754
1755    Parameters
1756    ----------
1757    pipe: Optional['meerschaum.Pipe'], default None
1758        The pipe which will consume the literal value.
1759
1760    Returns
1761    -------
1762    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
1763    and the literal as the value.
1764    """
1765    from meerschaum.utils.packages import import_pandas
1766    from meerschaum.utils.warnings import error, warn
1767    from meerschaum.utils.debug import dprint
1768    from meerschaum.utils.dtypes import get_current_timestamp
1769
1770    if pipe is None or literal is None:
1771        error("Please provide a Pipe and a literal value")
1772
1773    dt_col = pipe.columns.get(
1774        'datetime',
1775        mrsm.get_config('pipes', 'autotime', 'column_name_if_datetime_missing')
1776    )
1777    val_col = pipe.get_val_column(debug=debug)
1778
1779    val = literal
1780    if isinstance(literal, str):
1781        if debug:
1782            dprint(f"Received literal string: '{literal}'")
1783        import ast
1784        try:
1785            val = ast.literal_eval(literal)
1786        except Exception:
1787            warn(
1788                "Failed to parse value from string:\n" + f"{literal}" +
1789                "\n\nWill cast as a string instead."
1790            )
1791            val = literal
1792
1793    now = get_current_timestamp(pipe.precision)
1794    pd = import_pandas()
1795    return pd.DataFrame({dt_col: [now], val_col: [val]})

def get_first_valid_dask_partition(ddf: "'dask.dataframe.DataFrame'") -> Optional[pandas.DataFrame]:
1798def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
1799    """
1800    Return the first valid Dask DataFrame partition (if possible).
1801    """
1802    pdf = None
1803    for partition in ddf.partitions:
1804        try:
1805            pdf = partition.compute()
1806        except Exception:
1807            continue
1808        if len(pdf) > 0:
1809            return pdf
1810    _ = mrsm.attempt_import('partd', lazy=False)
1811    return ddf.compute()

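A short sketch, assuming `dask.dataframe` is installed:

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({'a': list(range(10))}), npartitions=4)
pdf = get_first_valid_dask_partition(ddf)
# `pdf` is a pandas DataFrame computed from the first non-empty partition.
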
def query_df(df: pandas.DataFrame, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, datetime_column: Optional[str] = None, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, inplace: bool = False, reset_index: bool = False, coerce_types: bool = False, debug: bool = False) -> pandas.DataFrame:
1814def query_df(
1815    df: 'pd.DataFrame',
1816    params: Optional[Dict[str, Any]] = None,
1817    begin: Union[datetime, int, None] = None,
1818    end: Union[datetime, int, None] = None,
1819    datetime_column: Optional[str] = None,
1820    select_columns: Optional[List[str]] = None,
1821    omit_columns: Optional[List[str]] = None,
1822    inplace: bool = False,
1823    reset_index: bool = False,
1824    coerce_types: bool = False,
1825    debug: bool = False,
1826) -> 'pd.DataFrame':
1827    """
1828    Query the dataframe with the params dictionary.
1829
1830    Parameters
1831    ----------
1832    df: pd.DataFrame
1833        The DataFrame to query against.
1834
1835    params: Optional[Dict[str, Any]], default None
1836        The parameters dictionary to use for the query.
1837
1838    begin: Union[datetime, int, None], default None
1839        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1840        greater than or equal to this value.
1841
1842    end: Union[datetime, int, None], default None
1843        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1844        less than this value.
1845
1846    datetime_column: Optional[str], default None
1847        A `datetime_column` must be provided to use `begin` and `end`.
1848
1849    select_columns: Optional[List[str]], default None
1850        If provided, only return these columns.
1851
1852    omit_columns: Optional[List[str]], default None
1853        If provided, do not include these columns in the result.
1854
1855    inplace: bool, default False
1856        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
1857
1858    reset_index: bool, default False
1859        If `True`, reset the index in the resulting DataFrame.
1860
1861    coerce_types: bool, default False
1862        If `True`, cast the dataframe and parameters as strings before querying.
1863
1864    Returns
1865    -------
1866    A Pandas DataFrame query result.
1867    """
1868
1869    def _process_select_columns(_df):
1870        if not select_columns:
1871            return
1872        for col in list(_df.columns):
1873            if col not in select_columns:
1874                del _df[col]
1875
1876    def _process_omit_columns(_df):
1877        if not omit_columns:
1878            return
1879        for col in list(_df.columns):
1880            if col in omit_columns:
1881                del _df[col]
1882
1883    if not params and not begin and not end:
1884        if not inplace:
1885            df = df.copy()
1886        _process_select_columns(df)
1887        _process_omit_columns(df)
1888        return df
1889
1890    from meerschaum.utils.debug import dprint
1891    from meerschaum.utils.misc import get_in_ex_params
1892    from meerschaum.utils.warnings import warn
1893    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
1894    dateutil_parser = mrsm.attempt_import('dateutil.parser')
1895    pandas = mrsm.attempt_import('pandas')
1896    NA = pandas.NA
1897
1898    if params:
1899        proto_in_ex_params = get_in_ex_params(params)
1900        for key, (proto_in_vals, proto_ex_vals) in proto_in_ex_params.items():
1901            if proto_ex_vals:
1902                coerce_types = True
1903                break
1904        params = params.copy()
1905        for key, val in {k: v for k, v in params.items()}.items():
1906            if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype'):
1907                if None in val:
1908                    val = [item for item in val if item is not None] + [NA]
1909                    params[key] = val
1910                if coerce_types:
1911                    params[key] = [str(x) for x in val]
1912            else:
1913                if value_is_null(val):
1914                    val = NA
1915                    params[key] = NA
1916                if coerce_types:
1917                    params[key] = str(val)
1918
1919    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
1920
1921    if inplace:
1922        df.fillna(NA, inplace=True)
1923    else:
1924        df = df.infer_objects().fillna(NA)
1925
1926    if isinstance(begin, str):
1927        begin = dateutil_parser.parse(begin)
1928    if isinstance(end, str):
1929        end = dateutil_parser.parse(end)
1930
1931    if begin is not None or end is not None:
1932        if not datetime_column or datetime_column not in df.columns:
1933            warn(
1934                f"The datetime column '{datetime_column}' is not present in the DataFrame, "
1935                + "ignoring begin and end...",
1936            )
1937            begin, end = None, None
1938
1939    if debug:
1940        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")
1941
1942    if datetime_column and (begin is not None or end is not None):
1943        if debug:
1944            dprint("Checking for datetime column compatibility.")
1945
1946        from meerschaum.utils.dtypes import coerce_timezone
1947        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
1948        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
1949        end_is_int = are_dtypes_equal(str(type(end)), 'int')
1950
1951        if df_is_dt:
1952            df_tz = (
1953                getattr(df[datetime_column].dt, 'tz', None)
1954                if hasattr(df[datetime_column], 'dt')
1955                else None
1956            )
1957
1958            if begin_is_int:
1959                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
1960                if debug:
1961                    dprint(f"`begin` will be cast to '{begin}'.")
1962            if end_is_int:
1963                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
1964                if debug:
1965                    dprint(f"`end` will be cast to '{end}'.")
1966
1967            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
1968            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None
1969
1970    in_ex_params = get_in_ex_params(params)
1971
1972    masks = [
1973        (
1974            (df[datetime_column] >= begin)
1975            if begin is not None and datetime_column
1976            else True
1977        ) & (
1978            (df[datetime_column] < end)
1979            if end is not None and datetime_column
1980            else True
1981        )
1982    ]
1983
1984    masks.extend([
1985        (
1986            (
1987                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
1988                if in_vals
1989                else True
1990            ) & (
1991                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
1992                if ex_vals
1993                else True
1994            )
1995        )
1996        for col, (in_vals, ex_vals) in in_ex_params.items()
1997        if col in df.columns
1998    ])
1999    query_mask = masks[0]
2000    for mask in masks[1:]:
2001        query_mask = query_mask & mask
2002
2003    original_cols = df.columns
2004
2005    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
2006    ###       to allow for `<NA>` values.
2007    bool_cols = [
2008        col
2009        for col, typ in df.dtypes.items()
2010        if are_dtypes_equal(str(typ), 'bool')
2011    ]
2012    for col in bool_cols:
2013        df[col] = df[col].astype('boolean[pyarrow]')
2014
2015    if not isinstance(query_mask, bool):
2016        df['__mrsm_mask'] = (
2017            query_mask.astype('boolean[pyarrow]')
2018            if hasattr(query_mask, 'astype')
2019            else query_mask
2020        )
2021
2022        if inplace:
2023            df.where(query_mask, other=NA, inplace=True)
2024            df.dropna(how='all', inplace=True)
2025            result_df = df
2026        else:
2027            result_df = df.where(query_mask, other=NA)
2028            result_df.dropna(how='all', inplace=True)
2029
2030    else:
2031        result_df = df
2032
2033    if '__mrsm_mask' in df.columns:
2034        del df['__mrsm_mask']
2035    if '__mrsm_mask' in result_df.columns:
2036        del result_df['__mrsm_mask']
2037
2038    if reset_index:
2039        result_df.reset_index(drop=True, inplace=True)
2040
2041    result_df = enforce_dtypes(
2042        result_df,
2043        dtypes,
2044        safe_copy=False,
2045        debug=debug,
2046        coerce_numeric=False,
2047        coerce_timezone=False,
2048    )
2049
2050    if select_columns == ['*']:
2051        select_columns = None
2052
2053    if not select_columns and not omit_columns:
2054        return result_df[original_cols]
2055
2056    _process_select_columns(result_df)
2057    _process_omit_columns(result_df)
2058
2059    return result_df

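A short usage sketch for `query_df`, filtering on in-values:

>>> import pandas as pd
>>> df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
>>> query_df(df, {'a': [1, 3]}, reset_index=True)
   a  b
0  1  x
1  3  z
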
def to_json(df: pandas.DataFrame, safe_copy: bool = True, orient: str = 'records', date_format: str = 'iso', date_unit: str = 'us', double_precision: int = 15, geometry_format: str = 'geojson', **kwargs: Any) -> str:
2062def to_json(
2063    df: 'pd.DataFrame',
2064    safe_copy: bool = True,
2065    orient: str = 'records',
2066    date_format: str = 'iso',
2067    date_unit: str = 'us',
2068    double_precision: int = 15,
2069    geometry_format: str = 'geojson',
2070    **kwargs: Any
2071) -> str:
2072    """
2073    Serialize the given dataframe as a JSON string.
2074
2075    Parameters
2076    ----------
2077    df: pd.DataFrame
2078        The DataFrame to be serialized.
2079
2080    safe_copy: bool, default True
2081        If `False`, modify the DataFrame inplace.
2082
2083    date_format: str, default 'iso'
2084        The default format for timestamps.
2085
2086    date_unit: str, default 'us'
2087        The precision of the timestamps.
2088
2089    double_precision: int, default 15
2090        The number of decimal places to use when encoding floating point values (maximum 15).
2091
2092    geometry_format: str, default 'geojson'
2093        The serialization format for geometry data.
2094        Accepted values are `geojson`, `wkb_hex`, and `wkt`.
2095
2096    Returns
2097    -------
2098    A JSON string.
2099    """
2100    import warnings
2101    import functools
2102    from meerschaum.utils.packages import import_pandas
2103    from meerschaum.utils.dtypes import (
2104        serialize_bytes,
2105        serialize_decimal,
2106        serialize_geometry,
2107        serialize_date,
2108    )
2109    pd = import_pandas()
2110    uuid_cols = get_uuid_cols(df)
2111    bytes_cols = get_bytes_cols(df)
2112    numeric_cols = get_numeric_cols(df)
2113    date_cols = get_date_cols(df)
2114    geometry_cols = get_geometry_cols(df)
2115    geometry_cols_srids = {
2116        col: int((getattr(df[col].crs, 'srs', '') or '').split(':', maxsplit=1)[-1] or '0')
2117        for col in geometry_cols
2118    } if 'geodataframe' in str(type(df)).lower() else {}
2119    if safe_copy and bool(uuid_cols or bytes_cols or geometry_cols or numeric_cols):
2120        df = df.copy()
2121    if 'geodataframe' in str(type(df)).lower():
2122        geometry_data = {
2123            col: df[col]
2124            for col in geometry_cols
2125        }
2126        df = pd.DataFrame({
2127            col: df[col]
2128            for col in df.columns
2129            if col not in geometry_cols
2130        })
2131        for col in geometry_cols:
2132            df[col] = pd.Series(ob for ob in geometry_data[col])
2133    for col in uuid_cols:
2134        df[col] = df[col].astype(str)
2135    for col in bytes_cols:
2136        df[col] = df[col].apply(serialize_bytes)
2137    for col in numeric_cols:
2138        df[col] = df[col].apply(serialize_decimal)
2139    for col in date_cols:
2140        df[col] = df[col].apply(serialize_date)
2141    with warnings.catch_warnings():
2142        warnings.simplefilter("ignore")
2143        for col in geometry_cols:
2144            srid = geometry_cols_srids.get(col, None) or None
2145            df[col] = pd.Series(
2146                serialize_geometry(val, geometry_format=geometry_format, srid=srid)
2147                for val in df[col]
2148            )
2149    return df.infer_objects().fillna(pd.NA).to_json(
2150        date_format=date_format,
2151        date_unit=date_unit,
2152        double_precision=double_precision,
2153        orient=orient,
2154        **kwargs
2155    )

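An illustrative sketch of the special-column handling in `to_json`; the exact
serialized forms come from `serialize_decimal` and `serialize_bytes`:

import pandas as pd
from decimal import Decimal

df = pd.DataFrame({'amount': [Decimal('1.10')], 'payload': [b'raw']})
json_str = to_json(df)
# Decimal values are serialized losslessly and bytes columns are encoded as text
# before the final `DataFrame.to_json(orient='records', date_format='iso', ...)` call.
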
def to_simple_lines(df: pandas.DataFrame) -> str:
2158def to_simple_lines(df: 'pd.DataFrame') -> str:
2159    """
2160    Serialize a Pandas DataFrame as lines of simple dictionaries.
2161
2162    Parameters
2163    ----------
2164    df: pd.DataFrame
2165        The dataframe to serialize into simple lines text.
2166
2167    Returns
2168    -------
2169    A string of simple line dictionaries joined by newlines.
2170    """
2171    from meerschaum.utils.misc import to_simple_dict
2172    if df is None or len(df) == 0:
2173        return ''
2174
2175    docs = df.to_dict(orient='records')
2176    return '\n'.join(to_simple_dict(doc) for doc in docs)

def parse_simple_lines(data: str) -> pandas.DataFrame:
2179def parse_simple_lines(data: str) -> 'pd.DataFrame':
2180    """
2181    Parse simple lines text into a DataFrame.
2182
2183    Parameters
2184    ----------
2185    data: str
2186        The simple lines text to parse into a DataFrame.
2187
2188    Returns
2189    -------
2190    A dataframe containing the rows serialized in `data`.
2191    """
2192    from meerschaum.utils.misc import string_to_dict
2193    from meerschaum.utils.packages import import_pandas
2194    pd = import_pandas()
2195    lines = data.splitlines()
2196    try:
2197        docs = [string_to_dict(line) for line in lines]
2198        df = pd.DataFrame(docs)
2199    except Exception:
2200        df = None
2201
2202    if df is None:
2203        raise ValueError("Cannot parse simple lines into a dataframe.")
2204
2205    return df

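A round-trip sketch for `to_simple_lines` and `parse_simple_lines`; exact dtypes
after parsing depend on `string_to_dict`:

import pandas as pd

df = pd.DataFrame([{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}])
lines = to_simple_lines(df)       # one `to_simple_dict` line per row
df2 = parse_simple_lines(lines)   # parsed back via `string_to_dict`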