meerschaum.utils.dataframe

Utility functions for working with DataFrames.

   1#! /usr/bin/env python3
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Utility functions for working with DataFrames.
   7"""
   8
   9from __future__ import annotations
  10
  11from datetime import datetime, timezone, date
  12from collections import defaultdict
  13
  14import meerschaum as mrsm
  15from meerschaum.utils.typing import (
  16    Optional, Dict, Any, List, Hashable, Generator,
  17    Iterator, Iterable, Union, TYPE_CHECKING, Tuple,
  18)
  19
  20if TYPE_CHECKING:
  21    pd, dask = mrsm.attempt_import('pandas', 'dask')
  22
  23
  24def add_missing_cols_to_df(
  25    df: 'pd.DataFrame',
  26    dtypes: Dict[str, Any],
  27) -> 'pd.DataFrame':
  28    """
  29    Add columns from the dtypes dictionary as null columns to a new DataFrame.
  30
  31    Parameters
  32    ----------
  33    df: pd.DataFrame
  34        The dataframe to copy and to which null columns will be added.
  35
  36    dtypes: Dict[str, Any]
  37        The data types dictionary which may contain keys not present in `df.columns`.
  38
  39    Returns
  40    -------
  41    A new `DataFrame` with the keys from `dtypes` added as null columns.
  42    If the columns of `df` match the keys of `dtypes`, then return a reference to `df`.
  43    NOTE: This will not ensure that dtypes are enforced!
  44
  45    Examples
  46    --------
  47    >>> import pandas as pd
  48    >>> df = pd.DataFrame([{'a': 1}])
  49    >>> dtypes = {'b': 'Int64'}
  50    >>> add_missing_cols_to_df(df, dtypes)
  51       a     b
  52    0  1  <NA>
  53    >>> add_missing_cols_to_df(df, dtypes).dtypes
  54    a    int64
  55    b    Int64
  56    dtype: object
  57    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
  58    a    int64
  59    dtype: object
  60    >>> 
  61    """
  62    if set(df.columns) == set(dtypes):
  63        return df
  64
  65    from meerschaum.utils.packages import attempt_import
  66    from meerschaum.utils.dtypes import to_pandas_dtype
  67    pandas = attempt_import('pandas')
  68
  69    def build_series(dtype: str):
  70        return pandas.Series([], dtype=to_pandas_dtype(dtype))
  71
  72    assign_kwargs = {
  73        str(col): build_series(str(typ))
  74        for col, typ in dtypes.items()
  75        if col not in df.columns
  76    }
  77    df_with_cols = df.assign(**assign_kwargs)
  78    for col in assign_kwargs:
  79        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
  80    return df_with_cols
  81
  82
  83def filter_unseen_df(
  84    old_df: 'pd.DataFrame',
  85    new_df: 'pd.DataFrame',
  86    safe_copy: bool = True,
  87    dtypes: Optional[Dict[str, Any]] = None,
  88    include_unchanged_columns: bool = False,
  89    coerce_mixed_numerics: bool = True,
  90    debug: bool = False,
  91) -> 'pd.DataFrame':
  92    """
  93    Left join two DataFrames to find the newest unseen data.
  94
  95    Parameters
  96    ----------
  97    old_df: 'pd.DataFrame'
  98        The original (target) dataframe. Acts as a filter on the `new_df`.
  99
 100    new_df: 'pd.DataFrame'
 101        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
 102
 103    safe_copy: bool, default True
 104        If `True`, create a copy before comparing and modifying the dataframes.
 105        Setting to `False` may mutate the DataFrames.
 106
 107    dtypes: Optional[Dict[str, Any]], default None
 108        Optionally specify the datatypes of the dataframe.
 109
 110    include_unchanged_columns: bool, default False
 111        If `True`, include columns which haven't changed on rows which have changed.
 112
 113    coerce_mixed_numerics: bool, default True
 114        If `True`, cast mixed integer and float columns between the old and new dataframes into
 115        numeric values (`decimal.Decimal`).
 116
 117    debug: bool, default False
 118        Verbosity toggle.
 119
 120    Returns
 121    -------
 122    A pandas dataframe of the new, unseen rows in `new_df`.
 123
 124    Examples
 125    --------
 126    ```python
 127    >>> import pandas as pd
 128    >>> df1 = pd.DataFrame({'a': [1,2]})
 129    >>> df2 = pd.DataFrame({'a': [2,3]})
 130    >>> filter_unseen_df(df1, df2)
 131       a
 132    0  3
 133
 134    ```
 135
 136    """
 137    if old_df is None:
 138        return new_df
 139
 140    if safe_copy:
 141        old_df = old_df.copy()
 142        new_df = new_df.copy()
 143
 144    import json
 145    import functools
 146    import traceback
 147    from meerschaum.utils.warnings import warn
 148    from meerschaum.utils.packages import import_pandas, attempt_import
 149    from meerschaum.utils.dtypes import (
 150        to_pandas_dtype,
 151        are_dtypes_equal,
 152        attempt_cast_to_numeric,
 153        attempt_cast_to_uuid,
 154        attempt_cast_to_bytes,
 155        attempt_cast_to_geometry,
 156        coerce_timezone,
 157        serialize_decimal,
 158    )
 159    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
 160    pd = import_pandas(debug=debug)
 161    is_dask = 'dask' in new_df.__module__
 162    if is_dask:
 163        pandas = attempt_import('pandas')
 164        _ = attempt_import('partd', lazy=False)
 165        dd = attempt_import('dask.dataframe')
 166        merge = dd.merge
 167        NA = pandas.NA
 168    else:
 169        merge = pd.merge
 170        NA = pd.NA
 171
 172    new_df_dtypes = dict(new_df.dtypes)
 173    old_df_dtypes = dict(old_df.dtypes)
 174
 175    same_cols = set(new_df.columns) == set(old_df.columns)
 176    if not same_cols:
 177        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
 178        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)
 179
 180        new_types_missing_from_old = {
 181            col: typ
 182            for col, typ in new_df_dtypes.items()
 183            if col not in old_df_dtypes
 184        }
 185        old_types_missing_from_new = {
 186            col: typ
 187            for col, typ in old_df_dtypes.items()
 188            if col not in new_df_dtypes
 189        }
 190        old_df_dtypes.update(new_types_missing_from_old)
 191        new_df_dtypes.update(old_types_missing_from_new)
 192
 193    ### Edge case: two empty lists cast to DFs.
 194    elif len(new_df.columns) == 0:
 195        return new_df
 196
 197    try:
 198        ### Order matters when checking equality.
 199        new_df = new_df[old_df.columns]
 200
 201    except Exception as e:
 202        warn(
 203            "Was not able to cast old columns onto new DataFrame. " +
 204            f"Are both DataFrames the same shape? Error:\n{e}",
 205            stacklevel=3,
 206        )
 207        return new_df[list(new_df_dtypes.keys())]
 208
 209    ### assume the old_df knows what it's doing, even if it's technically wrong.
 210    if dtypes is None:
 211        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}
 212
 213    dtypes = {
 214        col: to_pandas_dtype(typ)
 215        for col, typ in dtypes.items()
 216        if col in new_df_dtypes and col in old_df_dtypes
 217    }
 218    for col, typ in new_df_dtypes.items():
 219        if col not in dtypes:
 220            dtypes[col] = typ
 221
 222    numeric_cols_precisions_scales = {
 223        col: get_numeric_precision_scale(None, typ)
 224        for col, typ in dtypes.items()
 225        if col and str(typ).lower().startswith('numeric')
 226    }
 227
 228    dt_dtypes = {
 229        col: typ
 230        for col, typ in dtypes.items()
 231        if are_dtypes_equal(typ, 'datetime')
 232    }
 233    non_dt_dtypes = {
 234        col: typ
 235        for col, typ in dtypes.items()
 236        if col not in dt_dtypes
 237    }
 238
 239    cast_non_dt_cols = True
 240    try:
 241        new_df = new_df.astype(non_dt_dtypes)
 242        cast_non_dt_cols = False
 243    except Exception as e:
 244        warn(
 245            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
 246        )
 247
 248    cast_dt_cols = True
 249    try:
 250        for col, typ in dt_dtypes.items():
 251            _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
 252            strip_utc = (
 253                _dtypes_col_dtype.startswith('datetime64')
 254                and 'utc' not in _dtypes_col_dtype.lower()
 255            )
 256            if col in old_df.columns:
 257                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
 258            if col in new_df.columns:
 259                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
 260        cast_dt_cols = False
 261    except Exception as e:
 262        warn(f"Could not cast datetime columns:\n{e}")
 263
 264    cast_cols = cast_dt_cols or cast_non_dt_cols
 265
 266    new_numeric_cols_existing = get_numeric_cols(new_df)
 267    old_numeric_cols = get_numeric_cols(old_df)
 268    for col, typ in {k: v for k, v in dtypes.items()}.items():
 269        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
 270            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
 271            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
 272            new_is_numeric = col in new_numeric_cols_existing
 273            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
 274            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
 275            old_is_numeric = col in old_numeric_cols
 276
 277            if (
 278                coerce_mixed_numerics
 279                and
 280                (new_is_float or new_is_int or new_is_numeric)
 281                and
 282                (old_is_float or old_is_int or old_is_numeric)
 283            ):
 284                dtypes[col] = attempt_cast_to_numeric
 285                cast_cols = True
 286                continue
 287
 288            ### Fallback to object if the types don't match.
 289            warn(
 290                f"Detected different types for '{col}' "
 291                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
 292                + "falling back to 'object'..."
 293            )
 294            dtypes[col] = 'object'
 295            cast_cols = True
 296
 297    if cast_cols:
 298        for col, dtype in dtypes.items():
 299            if col in new_df.columns:
 300                try:
 301                    new_df[col] = (
 302                        new_df[col].astype(dtype)
 303                        if not callable(dtype)
 304                        else new_df[col].apply(dtype)
 305                    )
 306                except Exception as e:
 307                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")
 308
 309    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
 310    new_json_cols = get_json_cols(new_df)
 311    old_json_cols = get_json_cols(old_df)
 312    json_cols = set(new_json_cols + old_json_cols)
 313    for json_col in old_json_cols:
 314        old_df[json_col] = old_df[json_col].apply(serializer)
 315    for json_col in new_json_cols:
 316        new_df[json_col] = new_df[json_col].apply(serializer)
 317
 318    new_numeric_cols = get_numeric_cols(new_df)
 319    numeric_cols = set(new_numeric_cols + old_numeric_cols)
 320    for numeric_col in old_numeric_cols:
 321        old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
 322    for numeric_col in new_numeric_cols:
 323        new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)
 324
 325    old_dt_cols = [
 326        col
 327        for col, typ in old_df.dtypes.items()
 328        if are_dtypes_equal(str(typ), 'datetime')
 329    ]
 330    for col in old_dt_cols:
 331        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
 332        strip_utc = (
 333            _dtypes_col_dtype.startswith('datetime64')
 334            and 'utc' not in _dtypes_col_dtype.lower()
 335        )
 336        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
 337
 338    new_dt_cols = [
 339        col
 340        for col, typ in new_df.dtypes.items()
 341        if are_dtypes_equal(str(typ), 'datetime')
 342    ]
 343    for col in new_dt_cols:
 344        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
 345        strip_utc = (
 346            _dtypes_col_dtype.startswith('datetime64')
 347            and 'utc' not in _dtypes_col_dtype.lower()
 348        )
 349        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
 350
 351    old_uuid_cols = get_uuid_cols(old_df)
 352    new_uuid_cols = get_uuid_cols(new_df)
 353    uuid_cols = set(new_uuid_cols + old_uuid_cols)
 354
 355    old_bytes_cols = get_bytes_cols(old_df)
 356    new_bytes_cols = get_bytes_cols(new_df)
 357    bytes_cols = set(new_bytes_cols + old_bytes_cols)
 358
 359    old_geometry_cols = get_geometry_cols(old_df)
 360    new_geometry_cols = get_geometry_cols(new_df)
 361    geometry_cols = set(new_geometry_cols + old_geometry_cols)
 362
 363    joined_df = merge(
 364        new_df.infer_objects(copy=False).fillna(NA),
 365        old_df.infer_objects(copy=False).fillna(NA),
 366        how='left',
 367        on=None,
 368        indicator=True,
 369    )
 370    changed_rows_mask = (joined_df['_merge'] == 'left_only')
 371    new_cols = list(new_df_dtypes)
 372    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)
 373
 374    for json_col in json_cols:
 375        if json_col not in delta_df.columns:
 376            continue
 377        try:
 378            delta_df[json_col] = delta_df[json_col].apply(
 379                lambda x: (json.loads(x) if isinstance(x, str) else x)
 380            )
 381        except Exception:
 382            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")
 383
 384    for numeric_col in numeric_cols:
 385        if numeric_col not in delta_df.columns:
 386            continue
 387        try:
 388            delta_df[numeric_col] = delta_df[numeric_col].apply(
 389                functools.partial(
 390                    attempt_cast_to_numeric,
 391                    quantize=True,
 392                    precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
 393                    scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
 394                )
 395            )
 396        except Exception:
 397            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")
 398
 399    for uuid_col in uuid_cols:
 400        if uuid_col not in delta_df.columns:
 401            continue
 402        try:
 403            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
 404        except Exception:
 405            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")
 406
 407    for bytes_col in bytes_cols:
 408        if bytes_col not in delta_df.columns:
 409            continue
 410        try:
 411            delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
 412        except Exception:
 413            warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")
 414
 415    for geometry_col in geometry_cols:
 416        if geometry_col not in delta_df.columns:
 417            continue
 418        try:
 419            delta_df[geometry_col] = delta_df[geometry_col].apply(attempt_cast_to_geometry)
 420        except Exception:
 421            warn(f"Unable to parse geometry column '{geometry_col}':\n{traceback.format_exc()}")
 422
 423    return delta_df
 424
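### Example (editor's sketch, not part of the module): the core delta detection
### above is a left merge with `indicator=True` — rows present only in the new
### frame are tagged 'left_only'. Assumes pandas is installed.
import pandas as pd

old_df = pd.DataFrame({'a': [1, 2]})
new_df = pd.DataFrame({'a': [2, 3]})
joined = pd.merge(new_df, old_df, how='left', indicator=True)
delta = joined[joined['_merge'] == 'left_only'][['a']].reset_index(drop=True)
print(delta)  # expected: a single row with a == 3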
 425
 426def parse_df_datetimes(
 427    df: 'pd.DataFrame',
 428    ignore_cols: Optional[Iterable[str]] = None,
 429    strip_timezone: bool = False,
 430    chunksize: Optional[int] = None,
 431    dtype_backend: str = 'numpy_nullable',
 432    ignore_all: bool = False,
 433    precision_unit: Optional[str] = None,
 434    coerce_utc: bool = True,
 435    debug: bool = False,
 436) -> 'pd.DataFrame':
 437    """
 438    Parse a pandas DataFrame for datetime columns and cast as datetimes.
 439
 440    Parameters
 441    ----------
 442    df: pd.DataFrame
 443        The pandas DataFrame to parse.
 444
 445    ignore_cols: Optional[Iterable[str]], default None
 446        If provided, do not attempt to coerce these columns as datetimes.
 447
 448    strip_timezone: bool, default False
 449        If `True`, remove the UTC `tzinfo` property.
 450
 451    chunksize: Optional[int], default None
 452        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
 453
 454    dtype_backend: str, default 'numpy_nullable'
 455        If `df` is not a DataFrame and new one needs to be constructed,
 456        use this as the datatypes backend.
 457        Accepted values are 'numpy_nullable' and 'pyarrow'.
 458
 459    ignore_all: bool, default False
 460        If `True`, do not attempt to cast any columns to datetimes.
 461
 462    precision_unit: Optional[str], default None
 463        If provided, enforce the given precision on the coerced datetime columns.
 464
 465    coerce_utc: bool, default True
 466        Coerce the datetime columns to UTC (see `meerschaum.utils.dtypes.to_datetime()`).
 467
 468    debug: bool, default False
 469        Verbosity toggle.
 470
 471    Returns
 472    -------
 473    A new pandas DataFrame with the determined datetime columns
 474    (usually ISO strings) cast as datetimes.
 475
 476    Examples
 477    --------
 478    ```python
 479    >>> import pandas as pd
 480    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
 481    >>> df.dtypes
 482    a    object
 483    dtype: object
 484    >>> df2 = parse_df_datetimes(df)
 485    >>> df2.dtypes
 486    a    datetime64[us, UTC]
 487    dtype: object
 488
 489    ```
 490
 491    """
 492    from meerschaum.utils.packages import import_pandas, attempt_import
 493    from meerschaum.utils.debug import dprint
 494    from meerschaum.utils.warnings import warn
 495    from meerschaum.utils.misc import items_str
 496    from meerschaum.utils.dtypes import to_datetime, MRSM_PD_DTYPES
 497    import traceback
 498
 499    pd = import_pandas()
 500    pandas = attempt_import('pandas')
 501    pd_name = pd.__name__
 502    using_dask = 'dask' in pd_name
 503    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
 504    dask_dataframe = None
 505    if using_dask or df_is_dask:
 506        npartitions = chunksize_to_npartitions(chunksize)
 507        dask_dataframe = attempt_import('dask.dataframe')
 508
 509    ### if df is a dict, build DataFrame
 510    if isinstance(df, pandas.DataFrame):
 511        pdf = df
 512    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
 513        pdf = get_first_valid_dask_partition(df)
 514    else:
 515        if debug:
 516            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")
 517
 518        if using_dask:
 519            if isinstance(df, list):
 520                keys = set()
 521                for doc in df:
 522                    for key in doc:
 523                        keys.add(key)
 524                df = pd.DataFrame.from_dict(
 525                    {
 526                        k: [
 527                            doc.get(k, None)
 528                            for doc in df
 529                        ] for k in keys
 530                    },
 531                    npartitions=npartitions,
 532                )
 533            elif isinstance(df, dict):
 534                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
 535            elif 'pandas.core.frame.DataFrame' in str(type(df)):
 536                df = pd.from_pandas(df, npartitions=npartitions)
 537            else:
 538                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
 539            pandas = attempt_import('pandas')
 540            pdf = get_first_valid_dask_partition(df)
 541
 542        else:
 543            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
 544            pdf = df
 545
 546    ### skip parsing if DataFrame is empty
 547    if len(pdf) == 0:
 548        if debug:
 549            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
 550        return df
 551
 552    ignore_cols = set(
 553        (ignore_cols or []) + [
 554            col
 555            for col, dtype in pdf.dtypes.items() 
 556            if 'datetime' in str(dtype)
 557        ]
 558    )
 559    cols_to_inspect = [
 560        col
 561        for col in pdf.columns
 562        if col not in ignore_cols
 563    ] if not ignore_all else []
 564
 565    if len(cols_to_inspect) == 0:
 566        if debug:
 567            dprint("All columns are ignored, skipping datetime detection...")
 568        return df.infer_objects(copy=False).fillna(pandas.NA)
 569
 570    ### apply regex to columns to determine which are ISO datetimes
 571    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
 572    dt_mask = pdf[cols_to_inspect].astype(str).apply(
 573        lambda s: s.str.match(iso_dt_regex).all()
 574    )
 575
 576    ### list of datetime column names
 577    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
 578    if not datetime_cols:
 579        if debug:
 580            dprint("No columns detected as datetimes, returning...")
 581        return df.infer_objects(copy=False).fillna(pandas.NA)
 582
 583    if debug:
 584        dprint("Converting columns to datetimes: " + str(datetime_cols))
 585
 586    def _parse_to_datetime(x):
 587        return to_datetime(x, precision_unit=precision_unit, coerce_utc=coerce_utc)
 588
 589    try:
 590        if not using_dask:
 591            df[datetime_cols] = df[datetime_cols].apply(_parse_to_datetime)
 592        else:
 593            df[datetime_cols] = df[datetime_cols].apply(
 594                _parse_to_datetime,
 596                axis=1,
 597                meta={
 598                    col: MRSM_PD_DTYPES['datetime']
 599                    for col in datetime_cols
 600                }
 601            )
 602    except Exception:
 603        warn(
 604            f"Unable to apply `to_datetime()` to {items_str(datetime_cols)}:\n"
 605            + f"{traceback.format_exc()}"
 606        )
 607
 608    if strip_timezone:
 609        for dt in datetime_cols:
 610            try:
 611                df[dt] = df[dt].dt.tz_localize(None)
 612            except Exception:
 613                warn(
 614                    f"Unable to convert column '{dt}' to naive datetime:\n"
 615                    + f"{traceback.format_exc()}"
 616                )
 617
 618    return df.fillna(pandas.NA)
 619
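### Example (editor's sketch, not part of the module): `ignore_cols` skips
### datetime detection for the given columns. Assumes pandas is installed;
### the exact coerced datetime dtype may vary by pandas version.
import pandas as pd
from meerschaum.utils.dataframe import parse_df_datetimes

df = pd.DataFrame({'ts': ['2024-01-01 00:00:00'], 'note': ['2024-01-01 00:00:00']})
df2 = parse_df_datetimes(df, ignore_cols=['note'])
print(df2.dtypes)  # expected: 'ts' coerced to a UTC datetime, 'note' left untouched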
 620
 621def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
 622    """
 623    Get the columns which contain unhashable objects from a Pandas DataFrame.
 624
 625    Parameters
 626    ----------
 627    df: pd.DataFrame
 628        The DataFrame which may contain unhashable objects.
 629
 630    Returns
 631    -------
 632    A list of columns.
 633    """
 634    if df is None:
 635        return []
 636    if len(df) == 0:
 637        return []
 638
 639    is_dask = 'dask' in df.__module__
 640    if is_dask:
 641        from meerschaum.utils.packages import attempt_import
 642        pandas = attempt_import('pandas')
 643        df = pandas.DataFrame(get_first_valid_dask_partition(df))
 644    return [
 645        col for col, val in df.iloc[0].items()
 646        if not isinstance(val, Hashable)
 647    ]
 648
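### Example (editor's sketch, not part of the module): columns whose first-row
### values are unhashable (e.g. lists) are flagged. Assumes pandas is installed.
import pandas as pd
from meerschaum.utils.dataframe import get_unhashable_cols

df = pd.DataFrame({'tags': [['a', 'b']], 'id': [1]})
print(get_unhashable_cols(df))  # expected: ['tags']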
 649
 650def get_json_cols(df: 'pd.DataFrame') -> List[str]:
 651    """
  652    Get the columns which contain dictionaries or lists (JSON-like objects) from a Pandas DataFrame.
 653
 654    Parameters
 655    ----------
 656    df: pd.DataFrame
  657        The DataFrame which may contain dictionaries or lists.
 658
 659    Returns
 660    -------
 661    A list of columns to be encoded as JSON.
 662    """
 663    if df is None:
 664        return []
 665
 666    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
 667    if is_dask:
 668        df = get_first_valid_dask_partition(df)
 669
 670    if len(df) == 0:
 671        return []
 672
 673    cols_indices = {
 674        col: df[col].first_valid_index()
 675        for col in df.columns
 676    }
 677    return [
 678        col
 679        for col, ix in cols_indices.items()
 680        if (
 681            ix is not None
 682            and isinstance(df.loc[ix][col], (dict, list))
 683        )
 684    ]
 685
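### Example (editor's sketch, not part of the module): columns whose first valid
### value is a dict or list are returned for JSON encoding. Assumes pandas is installed.
import pandas as pd
from meerschaum.utils.dataframe import get_json_cols

df = pd.DataFrame({'meta': [{'color': 'red'}], 'num': [1]})
print(get_json_cols(df))  # expected: ['meta']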
 686
 687def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
 688    """
 689    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.
 690
 691    Parameters
 692    ----------
 693    df: pd.DataFrame
 694        The DataFrame which may contain decimal objects.
 695
 696    Returns
 697    -------
 698    A list of columns to treat as numerics.
 699    """
 700    if df is None:
 701        return []
 702    from decimal import Decimal
 703    is_dask = 'dask' in df.__module__
 704    if is_dask:
 705        df = get_first_valid_dask_partition(df)
 706
 707    if len(df) == 0:
 708        return []
 709
 710    cols_indices = {
 711        col: df[col].first_valid_index()
 712        for col in df.columns
 713    }
 714    return [
 715        col
 716        for col, ix in cols_indices.items()
 717        if (
 718            ix is not None
 719            and
 720            isinstance(df.loc[ix][col], Decimal)
 721        )
 722    ]
 723
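### Example (editor's sketch, not part of the module): columns holding
### `decimal.Decimal` values are detected via the first valid (non-null) entry.
### Assumes pandas is installed.
from decimal import Decimal
import pandas as pd
from meerschaum.utils.dataframe import get_numeric_cols

df = pd.DataFrame({'price': [Decimal('1.10')], 'qty': [2]})
print(get_numeric_cols(df))  # expected: ['price']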
 724
 725def get_bool_cols(df: 'pd.DataFrame') -> List[str]:
 726    """
 727    Get the columns which contain `bool` objects from a Pandas DataFrame.
 728
 729    Parameters
 730    ----------
 731    df: pd.DataFrame
 732        The DataFrame which may contain bools.
 733
 734    Returns
 735    -------
 736    A list of columns to treat as bools.
 737    """
 738    if df is None:
 739        return []
 740
 741    is_dask = 'dask' in df.__module__
 742    if is_dask:
 743        df = get_first_valid_dask_partition(df)
 744
 745    if len(df) == 0:
 746        return []
 747
 748    from meerschaum.utils.dtypes import are_dtypes_equal
 749
 750    return [
 751        col
 752        for col, typ in df.dtypes.items()
 753        if are_dtypes_equal(str(typ), 'bool')
 754    ]
 755
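### Example (editor's sketch, not part of the module): boolean columns are
### detected by dtype rather than by sampling values. Assumes pandas is installed.
import pandas as pd
from meerschaum.utils.dataframe import get_bool_cols

df = pd.DataFrame({'flag': [True, False], 'n': [1, 2]})
print(get_bool_cols(df))  # expected: ['flag']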
 756
 757def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
 758    """
 759    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.
 760
 761    Parameters
 762    ----------
 763    df: pd.DataFrame
 764        The DataFrame which may contain UUID objects.
 765
 766    Returns
 767    -------
 768    A list of columns to treat as UUIDs.
 769    """
 770    if df is None:
 771        return []
 772    from uuid import UUID
 773    is_dask = 'dask' in df.__module__
 774    if is_dask:
 775        df = get_first_valid_dask_partition(df)
 776
 777    if len(df) == 0:
 778        return []
 779
 780    cols_indices = {
 781        col: df[col].first_valid_index()
 782        for col in df.columns
 783    }
 784    return [
 785        col
 786        for col, ix in cols_indices.items()
 787        if (
 788            ix is not None
 789            and
 790            isinstance(df.loc[ix][col], UUID)
 791        )
 792    ]
 793
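### Example (editor's sketch, not part of the module): UUID columns are detected
### from the first valid value. Assumes pandas is installed.
import uuid
import pandas as pd
from meerschaum.utils.dataframe import get_uuid_cols

df = pd.DataFrame({'ident': [uuid.uuid4()], 'n': [1]})
print(get_uuid_cols(df))  # expected: ['ident']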
 794
 795def get_datetime_cols(
 796    df: 'pd.DataFrame',
 797    timezone_aware: bool = True,
 798    timezone_naive: bool = True,
 799    with_tz_precision: bool = False,
 800) -> Union[List[str], Dict[str, Tuple[Union[str, None], str]]]:
 801    """
 802    Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame.
 803
 804    Parameters
 805    ----------
 806    df: pd.DataFrame
 807        The DataFrame which may contain `datetime` or `Timestamp` objects.
 808
 809    timezone_aware: bool, default True
 810        If `True`, include timezone-aware datetime columns.
 811
 812    timezone_naive: bool, default True
 813        If `True`, include timezone-naive datetime columns.
 814
 815    with_tz_precision: bool, default False
 816        If `True`, return a dictionary mapping column names to tuples in the form
 817        `(timezone, precision)`.
 818
 819    Returns
 820    -------
 821    A list of columns to treat as datetimes, or a dictionary of columns to tz+precision tuples
 822    (if `with_tz_precision` is `True`).
 823    """
 824    if not timezone_aware and not timezone_naive:
 825        raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.")
 826
 827    if df is None:
 828        return [] if not with_tz_precision else {}
 829
 830    from datetime import datetime
 831    from meerschaum.utils.dtypes import are_dtypes_equal, MRSM_PRECISION_UNITS_ALIASES
 832    is_dask = 'dask' in df.__module__
 833    if is_dask:
 834        df = get_first_valid_dask_partition(df)
 835   
 836    def get_tz_precision_from_dtype(dtype: str) -> Tuple[Union[str, None], str]:
 837        """
 838        Extract the tz + precision tuple from a dtype string.
 839        """
 840        meta_str = dtype.split('[', maxsplit=1)[-1].rstrip(']').replace(' ', '')
 841        tz = (
 842            None
 843            if ',' not in meta_str
 844            else meta_str.split(',', maxsplit=1)[-1]
 845        )
 846        precision_abbreviation = (
 847            meta_str
 848            if ',' not in meta_str
 849            else meta_str.split(',')[0]
 850        )
 851        precision = MRSM_PRECISION_UNITS_ALIASES[precision_abbreviation]
 852        return tz, precision
 853
 854    def get_tz_precision_from_datetime(dt: datetime) -> Tuple[Union[str, None], str]:
 855        """
 856        Return the tz + precision tuple from a Python datetime object.
 857        """
 858        return dt.tzname(), 'microsecond'
 859
 860    known_dt_cols_types = {
 861        col: str(typ)
 862        for col, typ in df.dtypes.items()
 863        if are_dtypes_equal('datetime', str(typ))
 864    }
 865 
 866    known_dt_cols_tuples = {
 867        col: get_tz_precision_from_dtype(typ)
 868        for col, typ in known_dt_cols_types.items()
 869    }
 870
 871    if len(df) == 0:
 872        return (
 873            list(known_dt_cols_types)
 874            if not with_tz_precision
 875            else known_dt_cols_tuples
 876        )
 877
 878    cols_indices = {
 879        col: df[col].first_valid_index()
 880        for col in df.columns
 881        if col not in known_dt_cols_types
 882    }
 883    pydt_cols_tuples = {
 884        col: get_tz_precision_from_datetime(sample_val)
 885        for col, ix in cols_indices.items()
 886        if (
 887            ix is not None
 888            and
 889            isinstance((sample_val := df.loc[ix][col]), datetime)
 890        )
 891    }
 892
 893    dt_cols_tuples = {
 894        **known_dt_cols_tuples,
 895        **pydt_cols_tuples
 896    }
 897
 898    all_dt_cols_tuples = {
 899        col: dt_cols_tuples[col]
 900        for col in df.columns
 901        if col in dt_cols_tuples
 902    }
 903    if timezone_aware and timezone_naive:
 904        return (
 905            list(all_dt_cols_tuples)
 906            if not with_tz_precision
 907            else all_dt_cols_tuples
 908        )
 909
 910    known_timezone_aware_dt_cols = [
 911        col
 912        for col in known_dt_cols_types
 913        if getattr(df[col], 'tz', None) is not None
 914    ]
 915    timezone_aware_pydt_cols_tuples = {
 916        col: (tz, precision)
 917        for col, (tz, precision) in pydt_cols_tuples.items()
 918        if df.loc[cols_indices[col]][col].tzinfo is not None
 919    }
 920    timezone_aware_dt_cols_set = set(
 921        known_timezone_aware_dt_cols + list(timezone_aware_pydt_cols_tuples)
 922    )
 923    timezone_aware_cols_tuples = {
 924        col: (tz, precision)
 925        for col, (tz, precision) in all_dt_cols_tuples.items()
 926        if col in timezone_aware_dt_cols_set
 927    }
 928    timezone_naive_cols_tuples = {
 929        col: (tz, precision)
 930        for col, (tz, precision) in all_dt_cols_tuples.items()
 931        if col not in timezone_aware_dt_cols_set
 932    }
 933
 934    if timezone_aware:
 935        return (
 936            list(timezone_aware_cols_tuples)
 937            if not with_tz_precision
 938            else timezone_aware_cols_tuples
 939        )
 940
 941    return (
 942        list(timezone_naive_cols_tuples)
 943        if not with_tz_precision
 944        else timezone_naive_cols_tuples
 945    )
 946
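### Example (editor's sketch, not part of the module): with `with_tz_precision=True`,
### each detected column maps to a `(timezone, precision)` tuple. Assumes pandas
### is installed; the detected precision may vary by pandas version.
import pandas as pd
from datetime import datetime, timezone
from meerschaum.utils.dataframe import get_datetime_cols

df = pd.DataFrame({'dt': [datetime(2024, 1, 1, tzinfo=timezone.utc)]})
print(get_datetime_cols(df))                          # expected: ['dt']
print(get_datetime_cols(df, with_tz_precision=True))  # e.g. {'dt': ('UTC', 'microsecond')}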
 947
 948def get_datetime_cols_types(df: 'pd.DataFrame') -> Dict[str, str]:
 949    """
  950    Return a dictionary mapping datetime columns to specific dtype strings.
 951
 952    Parameters
 953    ----------
 954    df: pd.DataFrame
 955        The DataFrame which may contain datetime columns.
 956
 957    Returns
 958    -------
 959    A dictionary mapping the datetime columns' names to dtype strings
 960    (containing timezone and precision metadata).
 961
 962    Examples
 963    --------
 964    >>> from datetime import datetime, timezone
 965    >>> import pandas as pd
 966    >>> df = pd.DataFrame({'dt_tz_aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)]})
 967    >>> get_datetime_cols_types(df)
 968    {'dt_tz_aware': 'datetime64[us, UTC]'}
 969    >>> df = pd.DataFrame({'distant_dt': [datetime(1, 1, 1)]})
 970    >>> get_datetime_cols_types(df)
 971    {'distant_dt': 'datetime64[us]'}
  972    >>> df = pd.DataFrame({'dt_second': [datetime(2025, 1, 1)]})
 973    >>> df['dt_second'] = df['dt_second'].astype('datetime64[s]')
 974    >>> get_datetime_cols_types(df)
 975    {'dt_second': 'datetime64[s]'}
 976    """
 977    from meerschaum.utils.dtypes import MRSM_PRECISION_UNITS_ABBREVIATIONS
 978    dt_cols_tuples = get_datetime_cols(df, with_tz_precision=True)
 979    if not dt_cols_tuples:
 980        return {}
 981
 982    return {
 983        col: (
 984            f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}]"
 985            if tz is None
 986            else f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}, {tz}]"
 987        )
 988        for col, (tz, precision) in dt_cols_tuples.items()
 989    }
 990
 991
 992def get_date_cols(df: 'pd.DataFrame') -> List[str]:
 993    """
 994    Get the `date` columns from a Pandas DataFrame.
 995
 996    Parameters
 997    ----------
 998    df: pd.DataFrame
 999        The DataFrame which may contain dates.
1000
1001    Returns
1002    -------
1003    A list of columns to treat as dates.
1004    """
1005    from meerschaum.utils.dtypes import are_dtypes_equal
1006    if df is None:
1007        return []
1008
1009    is_dask = 'dask' in df.__module__
1010    if is_dask:
1011        df = get_first_valid_dask_partition(df)
1012
1013    known_date_cols = [
1014        col
1015        for col, typ in df.dtypes.items()
1016        if are_dtypes_equal(typ, 'date')
1017    ]
1018
1019    if len(df) == 0:
1020        return known_date_cols
1021
1022    cols_indices = {
1023        col: df[col].first_valid_index()
1024        for col in df.columns
1025        if col not in known_date_cols
1026    }
1027    object_date_cols = [
1028        col
1029        for col, ix in cols_indices.items()
1030        if (
1031            ix is not None
1032            and isinstance(df.loc[ix][col], date)
1033        )
1034    ]
1035
1036    all_date_cols = set(known_date_cols + object_date_cols)
1037
1038    return [
1039        col
1040        for col in df.columns
1041        if col in all_date_cols
1042    ]
1043
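### Example (editor's sketch, not part of the module): `date` objects in
### object-dtype columns are detected via the first valid value. Assumes pandas
### is installed.
import pandas as pd
from datetime import date
from meerschaum.utils.dataframe import get_date_cols

df = pd.DataFrame({'day': [date(2024, 1, 1)], 'val': [1.0]})
print(get_date_cols(df))  # expected: ['day']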
1044
1045def get_bytes_cols(df: 'pd.DataFrame') -> List[str]:
1046    """
1047    Get the columns which contain bytes strings from a Pandas DataFrame.
1048
1049    Parameters
1050    ----------
1051    df: pd.DataFrame
1052        The DataFrame which may contain bytes strings.
1053
1054    Returns
1055    -------
1056    A list of columns to treat as bytes.
1057    """
1058    if df is None:
1059        return []
1060
1061    is_dask = 'dask' in df.__module__
1062    if is_dask:
1063        df = get_first_valid_dask_partition(df)
1064
1065    known_bytes_cols = [
1066        col
1067        for col, typ in df.dtypes.items()
1068        if str(typ) == 'binary[pyarrow]'
1069    ]
1070
1071    if len(df) == 0:
1072        return known_bytes_cols
1073
1074    cols_indices = {
1075        col: df[col].first_valid_index()
1076        for col in df.columns
1077        if col not in known_bytes_cols
1078    }
1079    object_bytes_cols = [
1080        col
1081        for col, ix in cols_indices.items()
1082        if (
1083            ix is not None
1084            and isinstance(df.loc[ix][col], bytes)
1085        )
1086    ]
1087
1088    all_bytes_cols = set(known_bytes_cols + object_bytes_cols)
1089
1090    return [
1091        col
1092        for col in df.columns
1093        if col in all_bytes_cols
1094    ]
1095
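### Example (editor's sketch, not part of the module): bytes columns are detected
### either by the 'binary[pyarrow]' dtype or from the first valid value.
### Assumes pandas is installed.
import pandas as pd
from meerschaum.utils.dataframe import get_bytes_cols

df = pd.DataFrame({'blob': [b'\x00\x01'], 'name': ['a']})
print(get_bytes_cols(df))  # expected: ['blob']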
1096
1097def get_geometry_cols(
1098    df: 'pd.DataFrame',
1099    with_types_srids: bool = False,
1100) -> Union[List[str], Dict[str, Any]]:
1101    """
1102    Get the columns which contain shapely objects from a Pandas DataFrame.
1103
1104    Parameters
1105    ----------
1106    df: pd.DataFrame
 1107        The DataFrame which may contain shapely geometry objects.
1108
1109    with_types_srids: bool, default False
1110        If `True`, return a dictionary mapping columns to geometry types and SRIDs.
1111
1112    Returns
1113    -------
1114    A list of columns to treat as `geometry`.
1115    If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID).
1116    """
1117    if df is None:
1118        return [] if not with_types_srids else {}
1119
1120    is_dask = 'dask' in df.__module__
1121    if is_dask:
1122        df = get_first_valid_dask_partition(df)
1123
1124    if len(df) == 0:
1125        return [] if not with_types_srids else {}
1126
1127    cols_indices = {
1128        col: df[col].first_valid_index()
1129        for col in df.columns
1130    }
1131    geo_cols = [
1132        col
1133        for col, ix in cols_indices.items()
1134        if (
1135            ix is not None
1136            and
1137            'shapely' in str(type(df.loc[ix][col]))
1138        )
1139    ]
1140    if not with_types_srids:
1141        return geo_cols
1142
1143    gpd = mrsm.attempt_import('geopandas', lazy=False)
1144    geo_cols_types_srids = {}
1145    for col in geo_cols:
1146        try:
1147            sample_geo_series = gpd.GeoSeries(df[col], crs=None)
1148            geometry_types = {
1149                geom.geom_type
1150                for geom in sample_geo_series
1151                if hasattr(geom, 'geom_type')
1152            }
1153            geometry_has_z = any(getattr(geom, 'has_z', False) for geom in sample_geo_series)
1154            srid = (
1155                (
1156                    sample_geo_series.crs.sub_crs_list[0].to_epsg()
1157                    if sample_geo_series.crs.is_compound
1158                    else sample_geo_series.crs.to_epsg()
1159                )
1160                if sample_geo_series.crs
1161                else 0
1162            )
1163            geometry_type = list(geometry_types)[0] if len(geometry_types) == 1 else 'geometry'
1164            if geometry_type != 'geometry' and geometry_has_z:
1165                geometry_type = geometry_type + 'Z'
1166        except Exception:
1167            srid = 0
1168            geometry_type = 'geometry'
1169        geo_cols_types_srids[col] = (geometry_type, srid)
1170
1171    return geo_cols_types_srids
1172
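### Example (editor's sketch, not part of the module): geometry columns are
### detected by checking whether the first valid value is a shapely object.
### Assumes shapely 2.x and pandas are installed.
import pandas as pd
import shapely
from meerschaum.utils.dataframe import get_geometry_cols

df = pd.DataFrame({'geom': [shapely.Point(0, 0)], 'n': [1]})
print(get_geometry_cols(df))  # expected: ['geom']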
1173
1174def get_geometry_cols_types(df: 'pd.DataFrame') -> Dict[str, str]:
1175    """
 1176    Return a dtypes dictionary mapping columns to specific geometry dtype strings (encoding the geometry type and SRID).
1177    """
1178    geometry_cols_types_srids = get_geometry_cols(df, with_types_srids=True)
1179    new_cols_types = {}
1180    for col, (geometry_type, srid) in geometry_cols_types_srids.items():
1181        new_dtype = "geometry"
1182        modifier = ""
1183        if not srid and geometry_type.lower() == 'geometry':
1184            new_cols_types[col] = new_dtype
1185            continue
1186
1187        modifier = "["
1188        if geometry_type.lower() != 'geometry':
1189            modifier += f"{geometry_type}"
1190
1191        if srid:
1192            if modifier != '[':
1193                modifier += ", "
1194            modifier += f"{srid}"
1195        modifier += "]"
1196        new_cols_types[col] = f"{new_dtype}{modifier}"
1197    return new_cols_types
1198
1199
1200def get_special_cols(df: 'pd.DataFrame') -> Dict[str, str]:
1201    """
1202    Return a dtypes dictionary mapping special columns to their dtypes.
1203    """
1204    return {
1205        **{col: 'json' for col in get_json_cols(df)},
1206        **{col: 'uuid' for col in get_uuid_cols(df)},
1207        **{col: 'bytes' for col in get_bytes_cols(df)},
1208        **{col: 'bool' for col in get_bool_cols(df)},
1209        **{col: 'numeric' for col in get_numeric_cols(df)},
1210        **{col: 'date' for col in get_date_cols(df)},
1211        **get_datetime_cols_types(df),
1212        **get_geometry_cols_types(df),
1213    }
1214
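### Example (editor's sketch, not part of the module): the individual detectors
### above combine into a single dtypes dictionary. Assumes pandas is installed.
import pandas as pd
from decimal import Decimal
from meerschaum.utils.dataframe import get_special_cols

df = pd.DataFrame({'meta': [{'a': 1}], 'price': [Decimal('9.99')]})
print(get_special_cols(df))  # expected: {'meta': 'json', 'price': 'numeric'}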
1215
1216def enforce_dtypes(
1217    df: 'pd.DataFrame',
1218    dtypes: Dict[str, str],
1219    explicit_dtypes: Optional[Dict[str, str]] = None,
1220    safe_copy: bool = True,
1221    coerce_numeric: bool = False,
1222    coerce_timezone: bool = True,
1223    strip_timezone: bool = False,
1224    debug: bool = False,
1225) -> 'pd.DataFrame':
1226    """
1227    Enforce the `dtypes` dictionary on a DataFrame.
1228
1229    Parameters
1230    ----------
1231    df: pd.DataFrame
1232        The DataFrame on which to enforce dtypes.
1233
1234    dtypes: Dict[str, str]
1235        The data types to attempt to enforce on the DataFrame.
1236
1237    explicit_dtypes: Optional[Dict[str, str]], default None
 1238        If provided, automatic dtype coercion will respect explicitly configured
1239        dtypes (`int`, `float`, `numeric`).
1240
1241    safe_copy: bool, default True
1242        If `True`, create a copy before comparing and modifying the dataframes.
1243        Setting to `False` may mutate the DataFrames.
1244        See `meerschaum.utils.dataframe.filter_unseen_df`.
1245
1246    coerce_numeric: bool, default False
1247        If `True`, convert float and int collisions to numeric.
1248
1249    coerce_timezone: bool, default True
1250        If `True`, convert datetimes to UTC.
1251
1252    strip_timezone: bool, default False
1253        If `coerce_timezone` and `strip_timezone` are `True`,
1254        remove timezone information from datetimes.
1255
1256    debug: bool, default False
1257        Verbosity toggle.
1258
1259    Returns
1260    -------
1261    The Pandas DataFrame with the types enforced.
1262    """
1263    import json
1264    import functools
1265    from meerschaum.utils.debug import dprint
1266    from meerschaum.utils.formatting import pprint
1267    from meerschaum.utils.dtypes import (
1268        are_dtypes_equal,
1269        to_pandas_dtype,
1270        is_dtype_numeric,
1271        attempt_cast_to_numeric,
1272        attempt_cast_to_uuid,
1273        attempt_cast_to_bytes,
1274        attempt_cast_to_geometry,
1275        coerce_timezone as _coerce_timezone,
1276        get_geometry_type_srid,
1277    )
1278    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
1279    pandas = mrsm.attempt_import('pandas')
1280    is_dask = 'dask' in df.__module__
1281    if safe_copy:
1282        df = df.copy()
1283    if len(df.columns) == 0:
1284        if debug:
1285            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
1286        return df
1287
1288    explicit_dtypes = explicit_dtypes or {}
1289    pipe_pandas_dtypes = {
1290        col: to_pandas_dtype(typ)
1291        for col, typ in dtypes.items()
1292    }
1293    json_cols = [
1294        col
1295        for col, typ in dtypes.items()
1296        if typ == 'json'
1297    ]
1298    numeric_cols = [
1299        col
1300        for col, typ in dtypes.items()
1301        if typ.startswith('numeric')
1302    ]
1303    geometry_cols_types_srids = {
1304        col: get_geometry_type_srid(typ, default_srid=0)
1305        for col, typ in dtypes.items()
1306        if typ.startswith('geometry') or typ.startswith('geography')
1307    }
1308    uuid_cols = [
1309        col
1310        for col, typ in dtypes.items()
1311        if typ == 'uuid'
1312    ]
1313    bytes_cols = [
1314        col
1315        for col, typ in dtypes.items()
1316        if typ == 'bytes'
1317    ]
1318    datetime_cols = [
1319        col
1320        for col, typ in dtypes.items()
1321        if are_dtypes_equal(typ, 'datetime')
1322    ]
1323    df_numeric_cols = get_numeric_cols(df)
1324    if debug:
1325        dprint("Desired data types:")
1326        pprint(dtypes)
1327        dprint("Data types for incoming DataFrame:")
1328        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})
1329
1330    if json_cols and len(df) > 0:
1331        if debug:
1332            dprint(f"Checking columns for JSON encoding: {json_cols}")
1333        for col in json_cols:
1334            if col in df.columns:
1335                try:
1336                    df[col] = df[col].apply(
1337                        (
1338                            lambda x: (
1339                                json.loads(x)
1340                                if isinstance(x, str)
1341                                else x
1342                            )
1343                        )
1344                    )
1345                except Exception as e:
1346                    if debug:
1347                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")
1348
1349    if numeric_cols:
1350        if debug:
1351            dprint(f"Checking for numerics: {numeric_cols}")
1352        for col in numeric_cols:
1353            precision, scale = get_numeric_precision_scale(None, dtypes.get(col, ''))
1354            if col in df.columns:
1355                try:
1356                    df[col] = df[col].apply(
1357                        functools.partial(
1358                            attempt_cast_to_numeric,
1359                            quantize=True,
1360                            precision=precision,
1361                            scale=scale,
1362                        )
1363                    )
1364                except Exception as e:
1365                    if debug:
1366                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")
1367
1368    if uuid_cols:
1369        if debug:
1370            dprint(f"Checking for UUIDs: {uuid_cols}")
1371        for col in uuid_cols:
1372            if col in df.columns:
1373                try:
1374                    df[col] = df[col].apply(attempt_cast_to_uuid)
1375                except Exception as e:
1376                    if debug:
1377                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")
1378
1379    if bytes_cols:
1380        if debug:
1381            dprint(f"Checking for bytes: {bytes_cols}")
1382        for col in bytes_cols:
1383            if col in df.columns:
1384                try:
1385                    df[col] = df[col].apply(attempt_cast_to_bytes)
1386                except Exception as e:
1387                    if debug:
1388                        dprint(f"Unable to parse column '{col}' as bytes:\n{e}")
1389
1390    if datetime_cols and coerce_timezone:
1391        if debug:
1392            dprint(f"Checking for datetime conversion: {datetime_cols}")
1393        for col in datetime_cols:
1394            if col in df.columns:
1395                if not strip_timezone and 'utc' in str(df.dtypes[col]).lower():
1396                    if debug:
 1397                        dprint(f"Skip UTC coercion for column '{col}' ({str(df[col].dtype)}).")
1398                    continue
1399                if strip_timezone and ',' not in str(df.dtypes[col]):
1400                    if debug:
1401                        dprint(
 1402                            f"Skip UTC coercion (stripped) for column '{col}' "
 1403                            f"({str(df[col].dtype)})."
 1404                        )
 1405                    continue
1406
1407                if debug:
1408                    dprint(
 1409                        f"Data type for column '{col}' before timezone coercion: "
1410                        f"{str(df[col].dtype)}"
1411                    )
1412
1413                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)
1414                if debug:
1415                    dprint(
 1416                        f"Data type for column '{col}' after timezone coercion: "
1417                        f"{str(df[col].dtype)}"
1418                    )
1419
1420    if geometry_cols_types_srids:
1421        geopandas = mrsm.attempt_import('geopandas')
1422        if debug:
1423            dprint(f"Checking for geometry: {list(geometry_cols_types_srids)}")
1424        parsed_geom_cols = []
1425        for col in geometry_cols_types_srids:
1426            try:
1427                df[col] = df[col].apply(attempt_cast_to_geometry)
1428                parsed_geom_cols.append(col)
1429            except Exception as e:
1430                if debug:
1431                    dprint(f"Unable to parse column '{col}' as geometry:\n{e}")
1432
1433        if parsed_geom_cols:
1434            if debug:
1435                dprint(f"Converting to GeoDataFrame (geometry column: '{parsed_geom_cols[0]}')...")
1436            try:
1437                _, default_srid = geometry_cols_types_srids[parsed_geom_cols[0]]
1438                df = geopandas.GeoDataFrame(df, geometry=parsed_geom_cols[0], crs=default_srid)
1439                for col, (_, srid) in geometry_cols_types_srids.items():
1440                    if srid:
1441                        if debug:
1442                            dprint(f"Setting '{col}' to SRID '{srid}'...")
1443                        _ = df[col].set_crs(srid)
1444                if parsed_geom_cols[0] not in df.columns:
1445                    df.rename_geometry(parsed_geom_cols[0], inplace=True)
1446            except (ValueError, TypeError):
1447                if debug:
1448                    import traceback
1449                    dprint(f"Failed to cast to GeoDataFrame:\n{traceback.format_exc()}")
1450
1451    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
1452    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1453        if debug:
1454            dprint("Data types match. Exiting enforcement...")
1455        return df
1456
1457    common_dtypes = {}
1458    common_diff_dtypes = {}
1459    for col, typ in pipe_pandas_dtypes.items():
1460        if col in df_dtypes:
1461            common_dtypes[col] = typ
1462            if not are_dtypes_equal(typ, df_dtypes[col]):
1463                common_diff_dtypes[col] = df_dtypes[col]
1464
1465    if debug:
1466        dprint("Common columns with different dtypes:")
1467        pprint(common_diff_dtypes)
1468
1469    detected_dt_cols = {}
1470    for col, typ in common_diff_dtypes.items():
1471        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
1472            df_dtypes[col] = typ
1473            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
1474    for col in detected_dt_cols:
1475        del common_diff_dtypes[col]
1476
1477    if debug:
1478        dprint("Common columns with different dtypes (after dates):")
1479        pprint(common_diff_dtypes)
1480
1481    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1482        if debug:
1483            dprint(
 1484                "The incoming DataFrame has mostly the same types, skipping enforcement. "
1485                + "The only detected difference was in the following datetime columns."
1486            )
1487            pprint(detected_dt_cols)
1488        return df
1489
1490    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
1491        previous_typ = common_dtypes[col]
1492        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
1493        explicitly_float = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'float')
1494        explicitly_int = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'int')
1495        explicitly_numeric = explicit_dtypes.get(col, 'object').startswith('numeric')
1496        all_nan = (
1497            df[col].isnull().all()
1498            if mixed_numeric_types and coerce_numeric and not (explicitly_float or explicitly_int)
1499            else None
1500        )
1501        cast_to_numeric = explicitly_numeric or (
1502            (
1503                col in df_numeric_cols
1504                or (
1505                    mixed_numeric_types
1506                    and not (explicitly_float or explicitly_int)
1507                    and not all_nan
1508                    and coerce_numeric
1509                )
1510            )
1511        )
1512
1513        if debug and (explicitly_numeric or df_numeric_cols or mixed_numeric_types):
1514            from meerschaum.utils.formatting import make_header
1515            msg = (
1516                make_header(f"Coercing column '{col}' to numeric:", left_pad=0)
1517                + "\n"
1518                + f"  Previous type: {previous_typ}\n"
1519                + f"  Current type: {typ if col not in df_numeric_cols else 'Decimal'}"
1520                + ("\n  Column is explicitly numeric." if explicitly_numeric else "")
1521            ) if cast_to_numeric else (
1522                f"Will not coerce column '{col}' to numeric.\n"
1523                f"  Numeric columns in dataframe: {df_numeric_cols}\n"
1524                f"  Mixed numeric types: {mixed_numeric_types}\n"
1525                f"  Explicitly float: {explicitly_float}\n"
1526                f"  Explicitly int: {explicitly_int}\n"
1527                f"  All NaN: {all_nan}\n"
1528                f"  Coerce numeric: {coerce_numeric}"
1529            )
1530            dprint(msg)
1531
1532        if cast_to_numeric:
1533            common_dtypes[col] = attempt_cast_to_numeric
1534            common_diff_dtypes[col] = attempt_cast_to_numeric
1535
1536    for d in common_diff_dtypes:
1537        t = common_dtypes[d]
1538        if debug:
1539            dprint(f"Casting column {d} to dtype {t}.")
1540        try:
1541            df[d] = (
1542                df[d].apply(t)
1543                if callable(t)
1544                else df[d].astype(t)
1545            )
1546        except Exception as e:
1547            if debug:
1548                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}\ndf:\n{df}")
1549            if 'int' in str(t).lower():
1550                try:
1551                    df[d] = df[d].astype('float64').astype(t)
1552                except Exception:
1553                    if debug:
1554                        dprint(f"Was unable to convert to float then {t}.")
1555    return df
1556
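### Example (editor's sketch, not part of the module): strings in a column
### declared as 'json' are deserialized in place. Assumes pandas is installed.
import pandas as pd
from meerschaum.utils.dataframe import enforce_dtypes

df = pd.DataFrame({'attrs': ['{"b": 1}']})
df2 = enforce_dtypes(df, {'attrs': 'json'})
print(df2['attrs'][0])  # expected: {'b': 1}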
1557
1558def get_datetime_bound_from_df(
1559    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1560    datetime_column: str,
1561    minimum: bool = True,
1562) -> Union[int, datetime, None]:
1563    """
1564    Return the minimum or maximum datetime (or integer) from a DataFrame.
1565
1566    Parameters
1567    ----------
1568    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1569        The DataFrame, list, or dict which contains the range axis.
1570
1571    datetime_column: str
1572        The name of the datetime (or int) column.
1573
 1574    minimum: bool, default True
1575        Whether to return the minimum (default) or maximum value.
1576
1577    Returns
1578    -------
1579    The minimum or maximum datetime value in the dataframe, or `None`.
1580    """
1581    from meerschaum.utils.dtypes import to_datetime, value_is_null
1582
1583    if df is None:
1584        return None
1585    if not datetime_column:
1586        return None
1587
1588    def compare(a, b):
1589        if a is None:
1590            return b
1591        if b is None:
1592            return a
1593        if minimum:
1594            return a if a < b else b
1595        return a if a > b else b
1596
1597    if isinstance(df, list):
1598        if len(df) == 0:
1599            return None
1600        best_yet = df[0].get(datetime_column, None)
1601        for doc in df:
1602            val = doc.get(datetime_column, None)
1603            best_yet = compare(best_yet, val)
1604        return best_yet
1605
1606    if isinstance(df, dict):
1607        if datetime_column not in df:
1608            return None
1609        best_yet = df[datetime_column][0] if len(df[datetime_column]) > 0 else None
1610        for val in df[datetime_column]:
1611            best_yet = compare(best_yet, val)
1612        return best_yet
1613
1614    if 'DataFrame' in str(type(df)):
1615        from meerschaum.utils.dtypes import are_dtypes_equal
1616        pandas = mrsm.attempt_import('pandas')
1617        is_dask = 'dask' in df.__module__
1618
1619        if datetime_column not in df.columns:
1620            return None
1621
1622        try:
1623            dt_val = (
1624                df[datetime_column].min(skipna=True)
1625                if minimum
1626                else df[datetime_column].max(skipna=True)
1627            )
1628        except Exception:
1629            dt_val = pandas.NA
1630        if is_dask and dt_val is not None and dt_val is not pandas.NA:
1631            dt_val = dt_val.compute()
1632
1633        return (
1634            to_datetime(dt_val, as_pydatetime=True)
1635            if are_dtypes_equal(str(type(dt_val)), 'datetime')
1636            else (dt_val if not value_is_null(dt_val) else None)
1637        )
1638
1639    return None
1640
1641
1642def get_unique_index_values(
1643    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1644    indices: List[str],
1645) -> Dict[str, List[Any]]:
1646    """
1647    Return a dictionary of the unique index values in a DataFrame.
1648
1649    Parameters
1650    ----------
1651    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1652        The dataframe (or list or dict) which contains index values.
1653
1654    indices: List[str]
1655        The list of index columns.
1656
1657    Returns
1658    -------
1659    A dictionary mapping indices to unique values.
1660    """
1661    if df is None:
1662        return {}
1663    if 'dataframe' in str(type(df)).lower():
1664        pandas = mrsm.attempt_import('pandas')
1665        return {
1666            col: list({
1667                (val if val is not pandas.NA else None)
1668                for val in df[col].unique()
1669            })
1670            for col in indices
1671            if col in df.columns
1672        }
1673
1674    unique_indices = defaultdict(lambda: set())
1675    if isinstance(df, list):
1676        for doc in df:
1677            for index in indices:
1678                if index in doc:
1679                    unique_indices[index].add(doc[index])
1680
1681    elif isinstance(df, dict):
1682        for index in indices:
1683            if index in df:
1684                unique_indices[index] = unique_indices[index].union(set(df[index]))
1685
1686    return {key: list(val) for key, val in unique_indices.items()}
1687
1688
1689def df_is_chunk_generator(df: Any) -> bool:
1690    """
1691    Determine whether to treat `df` as a chunk generator.
1692
1693    Note this should only be used in a context where generators are expected,
1694    as it will return `True` for any iterable.
1695
1696    Parameters
1697    ----------
1698    df: The DataFrame or chunk generator to evaluate.
1699
1700    Returns
1701    -------
1702    A `bool` indicating whether to treat `df` as a generator.
1703    """
1704    return (
1705        not isinstance(df, (dict, list, str))
1706        and 'DataFrame' not in str(type(df))
1707        and isinstance(df, (Generator, Iterable, Iterator))
1708    )
1709
1710
1711def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
1712    """
1713    Return the Dask `npartitions` value for a given `chunksize`.
1714    """
1715    if chunksize == -1:
1716        from meerschaum.config import get_config
1717        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
1718    if chunksize is None:
1719        return 1
1720    return -1 * chunksize
1721
1722
1723def df_from_literal(
1724    pipe: Optional[mrsm.Pipe] = None,
1725    literal: Optional[str] = None,
1726    debug: bool = False
1727) -> 'pd.DataFrame':
1728    """
1729    Construct a dataframe from a literal value, using the pipe's datetime and value column names.
1730
1731    Parameters
1732    ----------
1733    pipe: Optional['meerschaum.Pipe'], default None
1734        The pipe which will consume the literal value.
1735
1736    Returns
1737    -------
1738    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
1739    and the literal as the value.
1740    """
1741    from meerschaum.utils.packages import import_pandas
1742    from meerschaum.utils.warnings import error, warn
1743    from meerschaum.utils.debug import dprint
1744    from meerschaum.utils.dtypes import get_current_timestamp
1745
1746    if pipe is None or literal is None:
1747        error("Please provide a Pipe and a literal value")
1748
1749    dt_col = pipe.columns.get(
1750        'datetime',
1751        mrsm.get_config('pipes', 'autotime', 'column_name_if_datetime_missing')
1752    )
1753    val_col = pipe.get_val_column(debug=debug)
1754
1755    val = literal
1756    if isinstance(literal, str):
1757        if debug:
1758            dprint(f"Received literal string: '{literal}'")
1759        import ast
1760        try:
1761            val = ast.literal_eval(literal)
1762        except Exception:
1763            warn(
1764                f"Failed to parse value from string:\n{literal}"
1765                + "\n\nWill cast as a string instead."
1766            )
1767            val = literal
1768
1769    now = get_current_timestamp(pipe.precision)
1770    pd = import_pandas()
1771    return pd.DataFrame({dt_col: [now], val_col: [val]})
1772
1773
1774def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
1775    """
1776    Return the first valid Dask DataFrame partition (if possible).
1777    """
1778    pdf = None
1779    for partition in ddf.partitions:
1780        try:
1781            pdf = partition.compute()
1782        except Exception:
1783            continue
1784        if len(pdf) > 0:
1785            return pdf
1786    _ = mrsm.attempt_import('partd', lazy=False)
1787    return ddf.compute()
1788
1789
1790def query_df(
1791    df: 'pd.DataFrame',
1792    params: Optional[Dict[str, Any]] = None,
1793    begin: Union[datetime, int, None] = None,
1794    end: Union[datetime, int, None] = None,
1795    datetime_column: Optional[str] = None,
1796    select_columns: Optional[List[str]] = None,
1797    omit_columns: Optional[List[str]] = None,
1798    inplace: bool = False,
1799    reset_index: bool = False,
1800    coerce_types: bool = False,
1801    debug: bool = False,
1802) -> 'pd.DataFrame':
1803    """
1804    Query the dataframe with the params dictionary.
1805
1806    Parameters
1807    ----------
1808    df: pd.DataFrame
1809        The DataFrame to query against.
1810
1811    params: Optional[Dict[str, Any]], default None
1812        The parameters dictionary to use for the query.
1813
1814    begin: Union[datetime, int, None], default None
1815        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1816        greater than or equal to this value.
1817
1818    end: Union[datetime, int, None], default None
1819        If `end` and `datetime_column` are provided, only return rows with a timestamp
1820        less than this value.
1821
1822    datetime_column: Optional[str], default None
1823        A `datetime_column` must be provided to use `begin` and `end`.
1824
1825    select_columns: Optional[List[str]], default None
1826        If provided, only return these columns.
1827
1828    omit_columns: Optional[List[str]], default None
1829        If provided, do not include these columns in the result.
1830
1831    inplace: bool, default False
1832        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
1833
1834    reset_index: bool, default False
1835        If `True`, reset the index in the resulting DataFrame.
1836
1837    coerce_types: bool, default False
1838        If `True`, cast the dataframe and parameters as strings before querying.
1839
1840    Returns
1841    -------
1842    A Pandas DataFrame query result.
1843    """
1844
1845    def _process_select_columns(_df):
1846        if not select_columns:
1847            return
1848        for col in list(_df.columns):
1849            if col not in select_columns:
1850                del _df[col]
1851
1852    def _process_omit_columns(_df):
1853        if not omit_columns:
1854            return
1855        for col in list(_df.columns):
1856            if col in omit_columns:
1857                del _df[col]
1858
1859    if not params and not begin and not end:
1860        if not inplace:
1861            df = df.copy()
1862        _process_select_columns(df)
1863        _process_omit_columns(df)
1864        return df
1865
1866    from meerschaum.utils.debug import dprint
1867    from meerschaum.utils.misc import get_in_ex_params
1868    from meerschaum.utils.warnings import warn
1869    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
1870    dateutil_parser = mrsm.attempt_import('dateutil.parser')
1871    pandas = mrsm.attempt_import('pandas')
1872    NA = pandas.NA
1873
1874    if params:
1875        proto_in_ex_params = get_in_ex_params(params)
1876        for key, (proto_in_vals, proto_ex_vals) in proto_in_ex_params.items():
1877            if proto_ex_vals:
1878                coerce_types = True
1879                break
1880        params = params.copy()
1881        for key, val in {k: v for k, v in params.items()}.items():
1882            if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype'):
1883                if None in val:
1884                    val = [item for item in val if item is not None] + [NA]
1885                    params[key] = val
1886                if coerce_types:
1887                    params[key] = [str(x) for x in val]
1888            else:
1889                if value_is_null(val):
1890                    val = NA
1891                    params[key] = NA
1892                if coerce_types:
1893                    params[key] = str(val)
1894
1895    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
1896
1897    if inplace:
1898        df.fillna(NA, inplace=True)
1899    else:
1900        df = df.infer_objects().fillna(NA)
1901
1902    if isinstance(begin, str):
1903        begin = dateutil_parser.parse(begin)
1904    if isinstance(end, str):
1905        end = dateutil_parser.parse(end)
1906
1907    if begin is not None or end is not None:
1908        if not datetime_column or datetime_column not in df.columns:
1909            warn(
1910                f"The datetime column '{datetime_column}' is not present in the DataFrame, "
1911                + "ignoring begin and end...",
1912            )
1913            begin, end = None, None
1914
1915    if debug:
1916        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")
1917
1918    if datetime_column and (begin is not None or end is not None):
1919        if debug:
1920            dprint("Checking for datetime column compatibility.")
1921
1922        from meerschaum.utils.dtypes import coerce_timezone
1923        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
1924        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
1925        end_is_int = are_dtypes_equal(str(type(end)), 'int')
1926
1927        if df_is_dt:
1928            df_tz = (
1929                getattr(df[datetime_column].dt, 'tz', None)
1930                if hasattr(df[datetime_column], 'dt')
1931                else None
1932            )
1933
1934            if begin_is_int:
1935                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
1936                if debug:
1937                    dprint(f"`begin` will be cast to '{begin}'.")
1938            if end_is_int:
1939                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
1940                if debug:
1941                    dprint(f"`end` will be cast to '{end}'.")
1942
1943            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
1944            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None
1945
1946    in_ex_params = get_in_ex_params(params)
1947
1948    masks = [
1949        (
1950            (df[datetime_column] >= begin)
1951            if begin is not None and datetime_column
1952            else True
1953        ) & (
1954            (df[datetime_column] < end)
1955            if end is not None and datetime_column
1956            else True
1957        )
1958    ]
1959
1960    masks.extend([
1961        (
1962            (
1963                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
1964                if in_vals
1965                else True
1966            ) & (
1967                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
1968                if ex_vals
1969                else True
1970            )
1971        )
1972        for col, (in_vals, ex_vals) in in_ex_params.items()
1973        if col in df.columns
1974    ])
1975    query_mask = masks[0]
1976    for mask in masks[1:]:
1977        query_mask = query_mask & mask
1978
1979    original_cols = df.columns
1980
1981    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
1982    ###       to allow for `<NA>` values.
1983    bool_cols = [
1984        col
1985        for col, typ in df.dtypes.items()
1986        if are_dtypes_equal(str(typ), 'bool')
1987    ]
1988    for col in bool_cols:
1989        df[col] = df[col].astype('boolean[pyarrow]')
1990
1991    if not isinstance(query_mask, bool):
1992        df['__mrsm_mask'] = (
1993            query_mask.astype('boolean[pyarrow]')
1994            if hasattr(query_mask, 'astype')
1995            else query_mask
1996        )
1997
1998        if inplace:
1999            df.where(query_mask, other=NA, inplace=True)
2000            df.dropna(how='all', inplace=True)
2001            result_df = df
2002        else:
2003            result_df = df.where(query_mask, other=NA)
2004            result_df.dropna(how='all', inplace=True)
2005
2006    else:
2007        result_df = df
2008
2009    if '__mrsm_mask' in df.columns:
2010        del df['__mrsm_mask']
2011    if '__mrsm_mask' in result_df.columns:
2012        del result_df['__mrsm_mask']
2013
2014    if reset_index:
2015        result_df.reset_index(drop=True, inplace=True)
2016
2017    result_df = enforce_dtypes(
2018        result_df,
2019        dtypes,
2020        safe_copy=False,
2021        debug=debug,
2022        coerce_numeric=False,
2023        coerce_timezone=False,
2024    )
2025
2026    if select_columns == ['*']:
2027        select_columns = None
2028
2029    if not select_columns and not omit_columns:
2030        return result_df[original_cols]
2031
2032    _process_select_columns(result_df)
2033    _process_omit_columns(result_df)
2034
2035    return result_df
2036
2037
2038def to_json(
2039    df: 'pd.DataFrame',
2040    safe_copy: bool = True,
2041    orient: str = 'records',
2042    date_format: str = 'iso',
2043    date_unit: str = 'us',
2044    double_precision: int = 15,
2045    geometry_format: str = 'geojson',
2046    **kwargs: Any
2047) -> str:
2048    """
2049    Serialize the given dataframe as a JSON string.
2050
2051    Parameters
2052    ----------
2053    df: pd.DataFrame
2054        The DataFrame to be serialized.
2055
2056    safe_copy: bool, default True
2057        If `False`, modify the DataFrame inplace.
2058
2059    date_format: str, default 'iso'
2060        The default format for timestamps.
2061
2062    date_unit: str, default 'us'
2063        The precision of the timestamps.
2064
2065    double_precision: int, default 15
2066        The number of decimal places to use when encoding floating point values (maximum 15).
2067
2068    geometry_format: str, default 'geojson'
2069        The serialization format for geometry data.
2070        Accepted values are `geojson`, `wkb_hex`, and `wkt`.
2071
2072    Returns
2073    -------
2074    A JSON string.
2075    """
2076    import warnings
2077    import functools
2078    from meerschaum.utils.packages import import_pandas
2079    from meerschaum.utils.dtypes import (
2080        serialize_bytes,
2081        serialize_decimal,
2082        serialize_geometry,
2083    )
2084    pd = import_pandas()
2085    uuid_cols = get_uuid_cols(df)
2086    bytes_cols = get_bytes_cols(df)
2087    numeric_cols = get_numeric_cols(df)
2088    geometry_cols = get_geometry_cols(df)
2089    geometry_cols_srids = {
2090        col: int((getattr(df[col].crs, 'srs', '') or '').split(':', maxsplit=1)[-1] or '0')
2091        for col in geometry_cols
2092    } if 'geodataframe' in str(type(df)).lower() else {}
2093    if safe_copy and bool(uuid_cols or bytes_cols or geometry_cols or numeric_cols):
2094        df = df.copy()
2095    for col in uuid_cols:
2096        df[col] = df[col].astype(str)
2097    for col in bytes_cols:
2098        df[col] = df[col].apply(serialize_bytes)
2099    for col in numeric_cols:
2100        df[col] = df[col].apply(serialize_decimal)
2101    with warnings.catch_warnings():
2102        warnings.simplefilter("ignore")
2103        for col in geometry_cols:
2104            srid = geometry_cols_srids.get(col, None) or None
2105            df[col] = df[col].apply(
2106                functools.partial(
2107                    serialize_geometry,
2108                    geometry_format=geometry_format,
2109                    srid=srid,
2110                )
2111            )
2112    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
2113        date_format=date_format,
2114        date_unit=date_unit,
2115        double_precision=double_precision,
2116        orient=orient,
2117        **kwargs
2118    )
2119
2120
2121def to_simple_lines(df: 'pd.DataFrame') -> str:
2122    """
2123    Serialize a Pandas DataFrame as lines of simple dictionaries.
2124
2125    Parameters
2126    ----------
2127    df: pd.DataFrame
2128        The dataframe to serialize into simple lines text.
2129
2130    Returns
2131    -------
2132    A string of simple line dictionaries joined by newlines.
2133    """
2134    from meerschaum.utils.misc import to_simple_dict
2135    if df is None or len(df) == 0:
2136        return ''
2137
2138    docs = df.to_dict(orient='records')
2139    return '\n'.join(to_simple_dict(doc) for doc in docs)
2140
2141
2142def parse_simple_lines(data: str) -> 'pd.DataFrame':
2143    """
2144    Parse simple lines text into a DataFrame.
2145
2146    Parameters
2147    ----------
2148    data: str
2149        The simple lines text to parse into a DataFrame.
2150
2151    Returns
2152    -------
2153    A dataframe containing the rows serialized in `data`.
2154    """
2155    from meerschaum.utils.misc import string_to_dict
2156    from meerschaum.utils.packages import import_pandas
2157    pd = import_pandas()
2158    lines = data.splitlines()
2159    try:
2160        docs = [string_to_dict(line) for line in lines]
2161        df = pd.DataFrame(docs)
2162    except Exception:
2163        df = None
2164
2165    if df is None:
2166        raise ValueError("Cannot parse simple lines into a dataframe.")
2167
2168    return df
def add_missing_cols_to_df( df: pandas.core.frame.DataFrame, dtypes: Dict[str, Any]) -> pandas.core.frame.DataFrame:
25def add_missing_cols_to_df(
26    df: 'pd.DataFrame',
27    dtypes: Dict[str, Any],
28) -> 'pd.DataFrame':
29    """
30    Add columns from the dtypes dictionary as null columns to a new DataFrame.
31
32    Parameters
33    ----------
34    df: pd.DataFrame
35        The dataframe we should copy and add null columns.
36
37    dtypes:
38        The data types dictionary which may contain keys not present in `df.columns`.
39
40    Returns
41    -------
42    A new `DataFrame` with the keys from `dtypes` added as null columns.
43    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
44    NOTE: This will not ensure that dtypes are enforced!
45
46    Examples
47    --------
48    >>> import pandas as pd
49    >>> df = pd.DataFrame([{'a': 1}])
50    >>> dtypes = {'b': 'Int64'}
51    >>> add_missing_cols_to_df(df, dtypes)
52          a  b
53       0  1  <NA>
54    >>> add_missing_cols_to_df(df, dtypes).dtypes
55    a    int64
56    b    Int64
57    dtype: object
58    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
59    a    int64
60    dtype: object
61    >>> 
62    """
63    if set(df.columns) == set(dtypes):
64        return df
65
66    from meerschaum.utils.packages import attempt_import
67    from meerschaum.utils.dtypes import to_pandas_dtype
68    pandas = attempt_import('pandas')
69
70    def build_series(dtype: str):
71        return pandas.Series([], dtype=to_pandas_dtype(dtype))
72
73    assign_kwargs = {
74        str(col): build_series(str(typ))
75        for col, typ in dtypes.items()
76        if col not in df.columns
77    }
78    df_with_cols = df.assign(**assign_kwargs)
79    for col in assign_kwargs:
80        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
81    return df_with_cols

Add columns from the dtypes dictionary as null columns to a new DataFrame.

Parameters
  • df (pd.DataFrame): The dataframe we should copy and add null columns.
  • dtypes (Dict[str, Any]): The data types dictionary which may contain keys not present in df.columns.
Returns
  • A new DataFrame with the keys from dtypes added as null columns.
  • If df.dtypes is the same as dtypes, then return a reference to df.
  • NOTE: This will not ensure that dtypes are enforced!
Examples
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1}])
>>> dtypes = {'b': 'Int64'}
>>> add_missing_cols_to_df(df, dtypes)
      a  b
   0  1  <NA>
>>> add_missing_cols_to_df(df, dtypes).dtypes
a    int64
b    Int64
dtype: object
>>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
a    int64
dtype: object
>>>
def filter_unseen_df( old_df: pandas.core.frame.DataFrame, new_df: pandas.core.frame.DataFrame, safe_copy: bool = True, dtypes: Optional[Dict[str, Any]] = None, include_unchanged_columns: bool = False, coerce_mixed_numerics: bool = True, debug: bool = False) -> pandas.core.frame.DataFrame:
 84def filter_unseen_df(
 85    old_df: 'pd.DataFrame',
 86    new_df: 'pd.DataFrame',
 87    safe_copy: bool = True,
 88    dtypes: Optional[Dict[str, Any]] = None,
 89    include_unchanged_columns: bool = False,
 90    coerce_mixed_numerics: bool = True,
 91    debug: bool = False,
 92) -> 'pd.DataFrame':
 93    """
 94    Left join two DataFrames to find the newest unseen data.
 95
 96    Parameters
 97    ----------
 98    old_df: 'pd.DataFrame'
 99        The original (target) dataframe. Acts as a filter on the `new_df`.
100
101    new_df: 'pd.DataFrame'
102        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
103
104    safe_copy: bool, default True
105        If `True`, create a copy before comparing and modifying the dataframes.
106        Setting to `False` may mutate the DataFrames.
107
108    dtypes: Optional[Dict[str, Any]], default None
109        Optionally specify the datatypes of the dataframe.
110
111    include_unchanged_columns: bool, default False
112        If `True`, include columns which haven't changed on rows which have changed.
113
114    coerce_mixed_numerics: bool, default True
115        If `True`, cast mixed integer and float columns between the old and new dataframes into
116        numeric values (`decimal.Decimal`).
117
118    debug: bool, default False
119        Verbosity toggle.
120
121    Returns
122    -------
123    A pandas dataframe of the new, unseen rows in `new_df`.
124
125    Examples
126    --------
127    ```python
128    >>> import pandas as pd
129    >>> df1 = pd.DataFrame({'a': [1,2]})
130    >>> df2 = pd.DataFrame({'a': [2,3]})
131    >>> filter_unseen_df(df1, df2)
132       a
133    0  3
134
135    ```
136
137    """
138    if old_df is None:
139        return new_df
140
141    if safe_copy:
142        old_df = old_df.copy()
143        new_df = new_df.copy()
144
145    import json
146    import functools
147    import traceback
148    from meerschaum.utils.warnings import warn
149    from meerschaum.utils.packages import import_pandas, attempt_import
150    from meerschaum.utils.dtypes import (
151        to_pandas_dtype,
152        are_dtypes_equal,
153        attempt_cast_to_numeric,
154        attempt_cast_to_uuid,
155        attempt_cast_to_bytes,
156        attempt_cast_to_geometry,
157        coerce_timezone,
158        serialize_decimal,
159    )
160    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
161    pd = import_pandas(debug=debug)
162    is_dask = 'dask' in new_df.__module__
163    if is_dask:
164        pandas = attempt_import('pandas')
165        _ = attempt_import('partd', lazy=False)
166        dd = attempt_import('dask.dataframe')
167        merge = dd.merge
168        NA = pandas.NA
169    else:
170        merge = pd.merge
171        NA = pd.NA
172
173    new_df_dtypes = dict(new_df.dtypes)
174    old_df_dtypes = dict(old_df.dtypes)
175
176    same_cols = set(new_df.columns) == set(old_df.columns)
177    if not same_cols:
178        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
179        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)
180
181        new_types_missing_from_old = {
182            col: typ
183            for col, typ in new_df_dtypes.items()
184            if col not in old_df_dtypes
185        }
186        old_types_missing_from_new = {
187            col: typ
 188            for col, typ in old_df_dtypes.items()
 189            if col not in new_df_dtypes
190        }
191        old_df_dtypes.update(new_types_missing_from_old)
192        new_df_dtypes.update(old_types_missing_from_new)
193
194    ### Edge case: two empty lists cast to DFs.
195    elif len(new_df.columns) == 0:
196        return new_df
197
198    try:
199        ### Order matters when checking equality.
200        new_df = new_df[old_df.columns]
201
202    except Exception as e:
203        warn(
204            "Was not able to cast old columns onto new DataFrame. " +
205            f"Are both DataFrames the same shape? Error:\n{e}",
206            stacklevel=3,
207        )
208        return new_df[list(new_df_dtypes.keys())]
209
210    ### assume the old_df knows what it's doing, even if it's technically wrong.
211    if dtypes is None:
212        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}
213
214    dtypes = {
215        col: to_pandas_dtype(typ)
216        for col, typ in dtypes.items()
217        if col in new_df_dtypes and col in old_df_dtypes
218    }
219    for col, typ in new_df_dtypes.items():
220        if col not in dtypes:
221            dtypes[col] = typ
222
223    numeric_cols_precisions_scales = {
224        col: get_numeric_precision_scale(None, typ)
225        for col, typ in dtypes.items()
226        if col and str(typ).lower().startswith('numeric')
227    }
228
229    dt_dtypes = {
230        col: typ
231        for col, typ in dtypes.items()
232        if are_dtypes_equal(typ, 'datetime')
233    }
234    non_dt_dtypes = {
235        col: typ
236        for col, typ in dtypes.items()
237        if col not in dt_dtypes
238    }
239
240    cast_non_dt_cols = True
241    try:
242        new_df = new_df.astype(non_dt_dtypes)
243        cast_non_dt_cols = False
244    except Exception as e:
245        warn(
246            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
247        )
248
249    cast_dt_cols = True
250    try:
251        for col, typ in dt_dtypes.items():
252            _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
253            strip_utc = (
254                _dtypes_col_dtype.startswith('datetime64')
255                and 'utc' not in _dtypes_col_dtype.lower()
256            )
257            if col in old_df.columns:
258                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
259            if col in new_df.columns:
260                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
261        cast_dt_cols = False
262    except Exception as e:
263        warn(f"Could not cast datetime columns:\n{e}")
264
265    cast_cols = cast_dt_cols or cast_non_dt_cols
266
267    new_numeric_cols_existing = get_numeric_cols(new_df)
268    old_numeric_cols = get_numeric_cols(old_df)
269    for col, typ in {k: v for k, v in dtypes.items()}.items():
270        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
271            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
272            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
273            new_is_numeric = col in new_numeric_cols_existing
274            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
275            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
276            old_is_numeric = col in old_numeric_cols
277
278            if (
279                coerce_mixed_numerics
280                and
281                (new_is_float or new_is_int or new_is_numeric)
282                and
283                (old_is_float or old_is_int or old_is_numeric)
284            ):
285                dtypes[col] = attempt_cast_to_numeric
286                cast_cols = True
287                continue
288
289            ### Fallback to object if the types don't match.
290            warn(
291                f"Detected different types for '{col}' "
292                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
293                + "falling back to 'object'..."
294            )
295            dtypes[col] = 'object'
296            cast_cols = True
297
298    if cast_cols:
299        for col, dtype in dtypes.items():
300            if col in new_df.columns:
301                try:
302                    new_df[col] = (
303                        new_df[col].astype(dtype)
304                        if not callable(dtype)
305                        else new_df[col].apply(dtype)
306                    )
307                except Exception as e:
308                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")
309
310    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
311    new_json_cols = get_json_cols(new_df)
312    old_json_cols = get_json_cols(old_df)
313    json_cols = set(new_json_cols + old_json_cols)
314    for json_col in old_json_cols:
315        old_df[json_col] = old_df[json_col].apply(serializer)
316    for json_col in new_json_cols:
317        new_df[json_col] = new_df[json_col].apply(serializer)
318
319    new_numeric_cols = get_numeric_cols(new_df)
320    numeric_cols = set(new_numeric_cols + old_numeric_cols)
321    for numeric_col in old_numeric_cols:
322        old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
323    for numeric_col in new_numeric_cols:
324        new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)
325
326    old_dt_cols = [
327        col
328        for col, typ in old_df.dtypes.items()
329        if are_dtypes_equal(str(typ), 'datetime')
330    ]
331    for col in old_dt_cols:
332        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
333        strip_utc = (
334            _dtypes_col_dtype.startswith('datetime64')
335            and 'utc' not in _dtypes_col_dtype.lower()
336        )
337        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
338
339    new_dt_cols = [
340        col
341        for col, typ in new_df.dtypes.items()
342        if are_dtypes_equal(str(typ), 'datetime')
343    ]
344    for col in new_dt_cols:
345        _dtypes_col_dtype = str((dtypes or {}).get(col, 'datetime'))
346        strip_utc = (
347            _dtypes_col_dtype.startswith('datetime64')
348            and 'utc' not in _dtypes_col_dtype.lower()
349        )
350        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
351
352    old_uuid_cols = get_uuid_cols(old_df)
353    new_uuid_cols = get_uuid_cols(new_df)
354    uuid_cols = set(new_uuid_cols + old_uuid_cols)
355
356    old_bytes_cols = get_bytes_cols(old_df)
357    new_bytes_cols = get_bytes_cols(new_df)
358    bytes_cols = set(new_bytes_cols + old_bytes_cols)
359
360    old_geometry_cols = get_geometry_cols(old_df)
361    new_geometry_cols = get_geometry_cols(new_df)
362    geometry_cols = set(new_geometry_cols + old_geometry_cols)
363
364    joined_df = merge(
365        new_df.infer_objects(copy=False).fillna(NA),
366        old_df.infer_objects(copy=False).fillna(NA),
367        how='left',
368        on=None,
369        indicator=True,
370    )
371    changed_rows_mask = (joined_df['_merge'] == 'left_only')
372    new_cols = list(new_df_dtypes)
373    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)
374
375    for json_col in json_cols:
376        if json_col not in delta_df.columns:
377            continue
378        try:
379            delta_df[json_col] = delta_df[json_col].apply(
380                lambda x: (json.loads(x) if isinstance(x, str) else x)
381            )
382        except Exception:
383            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")
384
385    for numeric_col in numeric_cols:
386        if numeric_col not in delta_df.columns:
387            continue
388        try:
389            delta_df[numeric_col] = delta_df[numeric_col].apply(
390                functools.partial(
391                    attempt_cast_to_numeric,
392                    quantize=True,
 393                    precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
 394                    scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
395                )
396            )
397        except Exception:
398            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")
399
400    for uuid_col in uuid_cols:
401        if uuid_col not in delta_df.columns:
402            continue
403        try:
404            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
405        except Exception:
 406            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")
407
408    for bytes_col in bytes_cols:
409        if bytes_col not in delta_df.columns:
410            continue
411        try:
412            delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
413        except Exception:
414            warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")
415
416    for geometry_col in geometry_cols:
417        if geometry_col not in delta_df.columns:
418            continue
419        try:
420            delta_df[geometry_col] = delta_df[geometry_col].apply(attempt_cast_to_geometry)
421        except Exception:
 422            warn(f"Unable to parse geometry column '{geometry_col}':\n{traceback.format_exc()}")
423
424    return delta_df

Left join two DataFrames to find the newest unseen data.

Parameters
  • old_df ('pd.DataFrame'): The original (target) dataframe. Acts as a filter on the new_df.
  • new_df ('pd.DataFrame'): The fetched (source) dataframe. Rows that are contained in old_df are removed.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames.
  • dtypes (Optional[Dict[str, Any]], default None): Optionally specify the datatypes of the dataframe.
  • include_unchanged_columns (bool, default False): If True, include columns which haven't changed on rows which have changed.
  • coerce_mixed_numerics (bool, default True): If True, cast mixed integer and float columns between the old and new dataframes into numeric values (decimal.Decimal).
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pandas dataframe of the new, unseen rows in new_df.
Examples
>>> import pandas as pd
>>> df1 = pd.DataFrame({'a': [1,2]})
>>> df2 = pd.DataFrame({'a': [2,3]})
>>> filter_unseen_df(df1, df2)
   a
0  3
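
A slightly fuller usage sketch than the doctest above (the sample frames and values here are illustrative, not from the source):

```python
import pandas as pd
from meerschaum.utils.dataframe import filter_unseen_df

old_df = pd.DataFrame({'id': [1, 2], 'val': ['a', 'b']})
new_df = pd.DataFrame({'id': [2, 3], 'val': ['b', 'c']})

# Rows already present in `old_df` are dropped by the left join;
# only genuinely new (or changed) rows survive.
delta_df = filter_unseen_df(old_df, new_df)
print(delta_df)  # expected: a single row, (3, 'c')
```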
def parse_df_datetimes( df: pandas.core.frame.DataFrame, ignore_cols: Optional[Iterable[str]] = None, strip_timezone: bool = False, chunksize: Optional[int] = None, dtype_backend: str = 'numpy_nullable', ignore_all: bool = False, precision_unit: Optional[str] = None, coerce_utc: bool = True, debug: bool = False) -> pandas.core.frame.DataFrame:
427def parse_df_datetimes(
428    df: 'pd.DataFrame',
429    ignore_cols: Optional[Iterable[str]] = None,
430    strip_timezone: bool = False,
431    chunksize: Optional[int] = None,
432    dtype_backend: str = 'numpy_nullable',
433    ignore_all: bool = False,
434    precision_unit: Optional[str] = None,
435    coerce_utc: bool = True,
436    debug: bool = False,
437) -> 'pd.DataFrame':
438    """
439    Parse a pandas DataFrame for datetime columns and cast as datetimes.
440
441    Parameters
442    ----------
443    df: pd.DataFrame
444        The pandas DataFrame to parse.
445
446    ignore_cols: Optional[Iterable[str]], default None
447        If provided, do not attempt to coerce these columns as datetimes.
448
449    strip_timezone: bool, default False
450        If `True`, remove the UTC `tzinfo` property.
451
452    chunksize: Optional[int], default None
453        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
454
455    dtype_backend: str, default 'numpy_nullable'
456        If `df` is not a DataFrame and new one needs to be constructed,
457        use this as the datatypes backend.
458        Accepted values are 'numpy_nullable' and 'pyarrow'.
459
460    ignore_all: bool, default False
461        If `True`, do not attempt to cast any columns to datetimes.
462
463    precision_unit: Optional[str], default None
464        If provided, enforce the given precision on the coerced datetime columns.
465
466    coerce_utc: bool, default True
467        Coerce the datetime columns to UTC (see `meerschaum.utils.dtypes.to_datetime()`).
468
469    debug: bool, default False
470        Verbosity toggle.
471
472    Returns
473    -------
474    A new pandas DataFrame with the determined datetime columns
475    (usually ISO strings) cast as datetimes.
476
477    Examples
478    --------
479    ```python
480    >>> import pandas as pd
481    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
482    >>> df.dtypes
483    a    object
484    dtype: object
485    >>> df2 = parse_df_datetimes(df)
486    >>> df2.dtypes
487    a    datetime64[us, UTC]
488    dtype: object
489
490    ```
491
492    """
493    from meerschaum.utils.packages import import_pandas, attempt_import
494    from meerschaum.utils.debug import dprint
495    from meerschaum.utils.warnings import warn
496    from meerschaum.utils.misc import items_str
497    from meerschaum.utils.dtypes import to_datetime, MRSM_PD_DTYPES
498    import traceback
499
500    pd = import_pandas()
501    pandas = attempt_import('pandas')
502    pd_name = pd.__name__
503    using_dask = 'dask' in pd_name
504    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
505    dask_dataframe = None
506    if using_dask or df_is_dask:
507        npartitions = chunksize_to_npartitions(chunksize)
508        dask_dataframe = attempt_import('dask.dataframe')
509
510    ### if df is a dict, build DataFrame
511    if isinstance(df, pandas.DataFrame):
512        pdf = df
513    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
514        pdf = get_first_valid_dask_partition(df)
515    else:
516        if debug:
517            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")
518
519        if using_dask:
520            if isinstance(df, list):
521                keys = set()
522                for doc in df:
523                    for key in doc:
524                        keys.add(key)
525                df = pd.DataFrame.from_dict(
526                    {
527                        k: [
528                            doc.get(k, None)
529                            for doc in df
530                        ] for k in keys
531                    },
532                    npartitions=npartitions,
533                )
534            elif isinstance(df, dict):
535                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
536            elif 'pandas.core.frame.DataFrame' in str(type(df)):
537                df = pd.from_pandas(df, npartitions=npartitions)
538            else:
539                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
540            pandas = attempt_import('pandas')
541            pdf = get_first_valid_dask_partition(df)
542
543        else:
544            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
545            pdf = df
546
547    ### skip parsing if DataFrame is empty
548    if len(pdf) == 0:
549        if debug:
550            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
551        return df
552
553    ignore_cols = set(
554        (ignore_cols or []) + [
555            col
556            for col, dtype in pdf.dtypes.items() 
557            if 'datetime' in str(dtype)
558        ]
559    )
560    cols_to_inspect = [
561        col
562        for col in pdf.columns
563        if col not in ignore_cols
564    ] if not ignore_all else []
565
566    if len(cols_to_inspect) == 0:
567        if debug:
568            dprint("All columns are ignored, skipping datetime detection...")
569        return df.infer_objects(copy=False).fillna(pandas.NA)
570
571    ### apply regex to columns to determine which are ISO datetimes
572    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
573    dt_mask = pdf[cols_to_inspect].astype(str).apply(
574        lambda s: s.str.match(iso_dt_regex).all()
575    )
576
577    ### list of datetime column names
578    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
579    if not datetime_cols:
580        if debug:
581            dprint("No columns detected as datetimes, returning...")
582        return df.infer_objects(copy=False).fillna(pandas.NA)
583
584    if debug:
585        dprint("Converting columns to datetimes: " + str(datetime_cols))
586
587    def _parse_to_datetime(x):
588        return to_datetime(x, precision_unit=precision_unit, coerce_utc=coerce_utc)
589
590    try:
591        if not using_dask:
592            df[datetime_cols] = df[datetime_cols].apply(_parse_to_datetime)
593        else:
594            df[datetime_cols] = df[datetime_cols].apply(
595                _parse_to_datetime,
596                ### NOTE: _parse_to_datetime already applies coerce_utc; no extra kwargs are needed.
597                axis=1,
598                meta={
599                    col: MRSM_PD_DTYPES['datetime']
600                    for col in datetime_cols
601                }
602            )
603    except Exception:
604        warn(
605            f"Unable to apply `to_datetime()` to {items_str(datetime_cols)}:\n"
606            + f"{traceback.format_exc()}"
607        )
608
609    if strip_timezone:
610        for dt in datetime_cols:
611            try:
612                df[dt] = df[dt].dt.tz_localize(None)
613            except Exception:
614                warn(
615                    f"Unable to convert column '{dt}' to naive datetime:\n"
616                    + f"{traceback.format_exc()}"
617                )
618
619    return df.fillna(pandas.NA)

Parse a pandas DataFrame for datetime columns and cast as datetimes.

Parameters
  • df (pd.DataFrame): The pandas DataFrame to parse.
  • ignore_cols (Optional[Iterable[str]], default None): If provided, do not attempt to coerce these columns as datetimes.
  • strip_timezone (bool, default False): If True, remove the UTC tzinfo property.
  • chunksize (Optional[int], default None): If the pandas implementation is 'dask', use this chunksize for the distributed dataframe.
  • dtype_backend (str, default 'numpy_nullable'): If df is not a DataFrame and new one needs to be constructed, use this as the datatypes backend. Accepted values are 'numpy_nullable' and 'pyarrow'.
  • ignore_all (bool, default False): If True, do not attempt to cast any columns to datetimes.
  • precision_unit (Optional[str], default None): If provided, enforce the given precision on the coerced datetime columns.
  • coerce_utc (bool, default True): Coerce the datetime columns to UTC (see meerschaum.utils.dtypes.to_datetime()).
  • debug (bool, default False): Verbosity toggle.
Returns
  • A new pandas DataFrame with the determined datetime columns
  • (usually ISO strings) cast as datetimes.
Examples
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
>>> df.dtypes
a    object
dtype: object
>>> df2 = parse_df_datetimes(df)
>>> df2.dtypes
a    datetime64[us, UTC]
dtype: object
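
A short sketch of skipping datetime detection for specific columns (the column names and sample values are illustrative):

```python
import pandas as pd
from meerschaum.utils.dataframe import parse_df_datetimes

df = pd.DataFrame({
    'ts': ['2024-05-01 12:00:00', '2024-05-02 12:00:00'],
    'code': ['2024-05-01 00:00:00', 'not-a-timestamp'],
})

# Only 'ts' matches the ISO regex on every row, so only 'ts' is cast.
df2 = parse_df_datetimes(df)

# Skip 'ts' as well, e.g. to keep the raw strings:
df3 = parse_df_datetimes(df, ignore_cols=['ts'])
print(df2.dtypes)
print(df3.dtypes)
```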
def get_unhashable_cols(df: pandas.core.frame.DataFrame) -> List[str]:
622def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
623    """
624    Get the columns which contain unhashable objects from a Pandas DataFrame.
625
626    Parameters
627    ----------
628    df: pd.DataFrame
629        The DataFrame which may contain unhashable objects.
630
631    Returns
632    -------
633    A list of columns.
634    """
635    if df is None:
636        return []
637    if len(df) == 0:
638        return []
639
640    is_dask = 'dask' in df.__module__
641    if is_dask:
642        from meerschaum.utils.packages import attempt_import
643        pandas = attempt_import('pandas')
644        df = pandas.DataFrame(get_first_valid_dask_partition(df))
645    return [
646        col for col, val in df.iloc[0].items()
647        if not isinstance(val, Hashable)
648    ]

Get the columns which contain unhashable objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
  • A list of columns.
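Usage
Only each column's first row is sampled, as in this sketch (values are illustrative):

```python
import pandas as pd
from meerschaum.utils.dataframe import get_unhashable_cols

df = pd.DataFrame([{'a': 1, 'meta': {'k': 'v'}, 'tags': ['x', 'y']}])

# dicts and lists are not Hashable, so both columns are returned.
print(get_unhashable_cols(df))  # expected: ['meta', 'tags']
```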
def get_json_cols(df: pandas.core.frame.DataFrame) -> List[str]:
651def get_json_cols(df: 'pd.DataFrame') -> List[str]:
652    """
653    Get the columns which contain JSON-like objects (dicts or lists) from a Pandas DataFrame.
654
655    Parameters
656    ----------
657    df: pd.DataFrame
658        The DataFrame which may contain dicts or lists to be encoded as JSON.
659
660    Returns
661    -------
662    A list of columns to be encoded as JSON.
663    """
664    if df is None:
665        return []
666
667    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
668    if is_dask:
669        df = get_first_valid_dask_partition(df)
670
671    if len(df) == 0:
672        return []
673
674    cols_indices = {
675        col: df[col].first_valid_index()
676        for col in df.columns
677    }
678    return [
679        col
680        for col, ix in cols_indices.items()
681        if (
682            ix is not None
683            and isinstance(df.loc[ix][col], (dict, list))
684        )
685    ]

Get the columns which contain JSON-like objects (dicts or lists) from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain dicts or lists to be encoded as JSON.
Returns
  • A list of columns to be encoded as JSON.
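Usage
A sketch (illustrative values) showing that detection samples each column's first valid (non-null) value:

```python
import pandas as pd
from meerschaum.utils.dataframe import get_json_cols

df = pd.DataFrame([
    {'a': 1, 'attrs': {'color': 'red'}},
    {'a': 2, 'attrs': None},
])

# The first valid value of 'attrs' is a dict, so the column is
# flagged for JSON encoding; nulls are skipped.
print(get_json_cols(df))  # expected: ['attrs']
```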
def get_numeric_cols(df: pandas.core.frame.DataFrame) -> List[str]:
688def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
689    """
690    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.
691
692    Parameters
693    ----------
694    df: pd.DataFrame
695        The DataFrame which may contain decimal objects.
696
697    Returns
698    -------
699    A list of columns to treat as numerics.
700    """
701    if df is None:
702        return []
703    from decimal import Decimal
704    is_dask = 'dask' in df.__module__
705    if is_dask:
706        df = get_first_valid_dask_partition(df)
707
708    if len(df) == 0:
709        return []
710
711    cols_indices = {
712        col: df[col].first_valid_index()
713        for col in df.columns
714    }
715    return [
716        col
717        for col, ix in cols_indices.items()
718        if (
719            ix is not None
720            and
721            isinstance(df.loc[ix][col], Decimal)
722        )
723    ]

Get the columns which contain decimal.Decimal objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain decimal objects.
Returns
  • A list of columns to treat as numerics.
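Usage
A sketch with illustrative values; note that only `decimal.Decimal` objects count, not plain ints or floats:

```python
from decimal import Decimal

import pandas as pd
from meerschaum.utils.dataframe import get_numeric_cols

df = pd.DataFrame({'price': [Decimal('19.99')], 'qty': [3]})
print(get_numeric_cols(df))  # expected: ['price']
```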
def get_bool_cols(df: pandas.core.frame.DataFrame) -> List[str]:
726def get_bool_cols(df: 'pd.DataFrame') -> List[str]:
727    """
728    Get the columns which contain `bool` objects from a Pandas DataFrame.
729
730    Parameters
731    ----------
732    df: pd.DataFrame
733        The DataFrame which may contain bools.
734
735    Returns
736    -------
737    A list of columns to treat as bools.
738    """
739    if df is None:
740        return []
741
742    is_dask = 'dask' in df.__module__
743    if is_dask:
744        df = get_first_valid_dask_partition(df)
745
746    if len(df) == 0:
747        return []
748
749    from meerschaum.utils.dtypes import are_dtypes_equal
750
751    return [
752        col
753        for col, typ in df.dtypes.items()
754        if are_dtypes_equal(str(typ), 'bool')
755    ]

Get the columns which contain bool objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain bools.
Returns
  • A list of columns to treat as bools.
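Usage
A sketch with illustrative values; both the numpy `bool` dtype and the nullable `boolean` dtype are matched via `are_dtypes_equal`:

```python
import pandas as pd
from meerschaum.utils.dataframe import get_bool_cols

df = pd.DataFrame({
    'active': pd.Series([True, False], dtype='boolean'),  # nullable boolean
    'n': [1, 2],
})
print(get_bool_cols(df))  # expected: ['active']
```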
def get_uuid_cols(df: pandas.core.frame.DataFrame) -> List[str]:
758def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
759    """
760    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.
761
762    Parameters
763    ----------
764    df: pd.DataFrame
765        The DataFrame which may contain UUID objects.
766
767    Returns
768    -------
769    A list of columns to treat as UUIDs.
770    """
771    if df is None:
772        return []
773    from uuid import UUID
774    is_dask = 'dask' in df.__module__
775    if is_dask:
776        df = get_first_valid_dask_partition(df)
777
778    if len(df) == 0:
779        return []
780
781    cols_indices = {
782        col: df[col].first_valid_index()
783        for col in df.columns
784    }
785    return [
786        col
787        for col, ix in cols_indices.items()
788        if (
789            ix is not None
790            and
791            isinstance(df.loc[ix][col], UUID)
792        )
793    ]

Get the columns which contain uuid.UUID objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain UUID objects.
Returns
  • A list of columns to treat as UUIDs.
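Usage
A sketch with illustrative values; the sampled value must be a `uuid.UUID` instance:

```python
import uuid

import pandas as pd
from meerschaum.utils.dataframe import get_uuid_cols

df = pd.DataFrame({'id': [uuid.uuid4()], 'name': ['abc']})
print(get_uuid_cols(df))  # expected: ['id']
```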
def get_datetime_cols( df: pandas.core.frame.DataFrame, timezone_aware: bool = True, timezone_naive: bool = True, with_tz_precision: bool = False) -> Union[List[str], Dict[str, Tuple[Optional[str], str]]]:
796def get_datetime_cols(
797    df: 'pd.DataFrame',
798    timezone_aware: bool = True,
799    timezone_naive: bool = True,
800    with_tz_precision: bool = False,
801) -> Union[List[str], Dict[str, Tuple[Union[str, None], str]]]:
802    """
803    Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame.
804
805    Parameters
806    ----------
807    df: pd.DataFrame
808        The DataFrame which may contain `datetime` or `Timestamp` objects.
809
810    timezone_aware: bool, default True
811        If `True`, include timezone-aware datetime columns.
812
813    timezone_naive: bool, default True
814        If `True`, include timezone-naive datetime columns.
815
816    with_tz_precision: bool, default False
817        If `True`, return a dictionary mapping column names to tuples in the form
818        `(timezone, precision)`.
819
820    Returns
821    -------
822    A list of columns to treat as datetimes, or a dictionary of columns to tz+precision tuples
823    (if `with_tz_precision` is `True`).
824    """
825    if not timezone_aware and not timezone_naive:
826        raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.")
827
828    if df is None:
829        return [] if not with_tz_precision else {}
830
831    from datetime import datetime
832    from meerschaum.utils.dtypes import are_dtypes_equal, MRSM_PRECISION_UNITS_ALIASES
833    is_dask = 'dask' in df.__module__
834    if is_dask:
835        df = get_first_valid_dask_partition(df)
836   
837    def get_tz_precision_from_dtype(dtype: str) -> Tuple[Union[str, None], str]:
838        """
839        Extract the tz + precision tuple from a dtype string.
840        """
841        meta_str = dtype.split('[', maxsplit=1)[-1].rstrip(']').replace(' ', '')
842        tz = (
843            None
844            if ',' not in meta_str
845            else meta_str.split(',', maxsplit=1)[-1]
846        )
847        precision_abbreviation = (
848            meta_str
849            if ',' not in meta_str
850            else meta_str.split(',')[0]
851        )
852        precision = MRSM_PRECISION_UNITS_ALIASES[precision_abbreviation]
853        return tz, precision
854
855    def get_tz_precision_from_datetime(dt: datetime) -> Tuple[Union[str, None], str]:
856        """
857        Return the tz + precision tuple from a Python datetime object.
858        """
859        return dt.tzname(), 'microsecond'
860
861    known_dt_cols_types = {
862        col: str(typ)
863        for col, typ in df.dtypes.items()
864        if are_dtypes_equal('datetime', str(typ))
865    }
866 
867    known_dt_cols_tuples = {
868        col: get_tz_precision_from_dtype(typ)
869        for col, typ in known_dt_cols_types.items()
870    }
871
872    if len(df) == 0:
873        return (
874            list(known_dt_cols_types)
875            if not with_tz_precision
876            else known_dt_cols_tuples
877        )
878
879    cols_indices = {
880        col: df[col].first_valid_index()
881        for col in df.columns
882        if col not in known_dt_cols_types
883    }
884    pydt_cols_tuples = {
885        col: get_tz_precision_from_datetime(sample_val)
886        for col, ix in cols_indices.items()
887        if (
888            ix is not None
889            and
890            isinstance((sample_val := df.loc[ix][col]), datetime)
891        )
892    }
893
894    dt_cols_tuples = {
895        **known_dt_cols_tuples,
896        **pydt_cols_tuples
897    }
898
899    all_dt_cols_tuples = {
900        col: dt_cols_tuples[col]
901        for col in df.columns
902        if col in dt_cols_tuples
903    }
904    if timezone_aware and timezone_naive:
905        return (
906            list(all_dt_cols_tuples)
907            if not with_tz_precision
908            else all_dt_cols_tuples
909        )
910
911    known_timezone_aware_dt_cols = [
912        col
 913        for col, (tz, _precision) in known_dt_cols_tuples.items()
 914        if tz is not None
915    ]
916    timezone_aware_pydt_cols_tuples = {
917        col: (tz, precision)
918        for col, (tz, precision) in pydt_cols_tuples.items()
919        if df.loc[cols_indices[col]][col].tzinfo is not None
920    }
921    timezone_aware_dt_cols_set = set(
922        known_timezone_aware_dt_cols + list(timezone_aware_pydt_cols_tuples)
923    )
924    timezone_aware_cols_tuples = {
925        col: (tz, precision)
926        for col, (tz, precision) in all_dt_cols_tuples.items()
927        if col in timezone_aware_dt_cols_set
928    }
929    timezone_naive_cols_tuples = {
930        col: (tz, precision)
931        for col, (tz, precision) in all_dt_cols_tuples.items()
932        if col not in timezone_aware_dt_cols_set
933    }
934
935    if timezone_aware:
936        return (
937            list(timezone_aware_cols_tuples)
938            if not with_tz_precision
939            else timezone_aware_cols_tuples
940        )
941
942    return (
943        list(timezone_naive_cols_tuples)
944        if not with_tz_precision
945        else timezone_naive_cols_tuples
946    )

Get the columns which contain datetime or Timestamp objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain datetime or Timestamp objects.
  • timezone_aware (bool, default True): If True, include timezone-aware datetime columns.
  • timezone_naive (bool, default True): If True, include timezone-naive datetime columns.
  • with_tz_precision (bool, default False): If True, return a dictionary mapping column names to tuples in the form (timezone, precision).
Returns
  • A list of columns to treat as datetimes, or a dictionary of columns to tz+precision tuples (if with_tz_precision is True).
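
A brief usage sketch (illustrative, not part of the source; assumes pandas is installed):

>>> from datetime import datetime, timezone
>>> import pandas as pd
>>> from meerschaum.utils.dataframe import get_datetime_cols
>>> df = pd.DataFrame({
...     'aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)],
...     'naive': [datetime(2025, 1, 1)],
... })
>>> get_datetime_cols(df)
['aware', 'naive']

Pass timezone_aware=False or timezone_naive=False to filter out one class of columns, or with_tz_precision=True to receive (timezone, precision) tuples instead of a list.
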
def get_datetime_cols_types(df: pandas.core.frame.DataFrame) -> Dict[str, str]:
949def get_datetime_cols_types(df: 'pd.DataFrame') -> Dict[str, str]:
950    """
951    Return a dictionary mapping datetime columns to specific type strings.
952
953    Parameters
954    ----------
955    df: pd.DataFrame
956        The DataFrame which may contain datetime columns.
957
958    Returns
959    -------
960    A dictionary mapping the datetime columns' names to dtype strings
961    (containing timezone and precision metadata).
962
963    Examples
964    --------
965    >>> from datetime import datetime, timezone
966    >>> import pandas as pd
967    >>> df = pd.DataFrame({'dt_tz_aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)]})
968    >>> get_datetime_cols_types(df)
969    {'dt_tz_aware': 'datetime64[us, UTC]'}
970    >>> df = pd.DataFrame({'distant_dt': [datetime(1, 1, 1)]})
971    >>> get_datetime_cols_types(df)
972    {'distant_dt': 'datetime64[us]'}
973    >>> df = pd.DataFrame({'dt_second': [datetime(2025, 1, 1)]})
974    >>> df['dt_second'] = df['dt_second'].astype('datetime64[s]')
975    >>> get_datetime_cols_types(df)
976    {'dt_second': 'datetime64[s]'}
977    """
978    from meerschaum.utils.dtypes import MRSM_PRECISION_UNITS_ABBREVIATIONS
979    dt_cols_tuples = get_datetime_cols(df, with_tz_precision=True)
980    if not dt_cols_tuples:
981        return {}
982
983    return {
984        col: (
985            f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}]"
986            if tz is None
987            else f"datetime64[{MRSM_PRECISION_UNITS_ABBREVIATIONS[precision]}, {tz}]"
988        )
989        for col, (tz, precision) in dt_cols_tuples.items()
990    }

Return a dictionary mapping datetime columns to specific type strings.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain datetime columns.
Returns
  • A dictionary mapping the datetime columns' names to dtype strings (containing timezone and precision metadata).
Examples
>>> from datetime import datetime, timezone
>>> import pandas as pd
>>> df = pd.DataFrame({'dt_tz_aware': [datetime(2025, 1, 1, tzinfo=timezone.utc)]})
>>> get_datetime_cols_types(df)
{'dt_tz_aware': 'datetime64[us, UTC]'}
>>> df = pd.DataFrame({'distant_dt': [datetime(1, 1, 1)]})
>>> get_datetime_cols_types(df)
{'distant_dt': 'datetime64[us]'}
>>> df = pd.DataFrame({'dt_second': [datetime(2025, 1, 1)]})
>>> df['dt_second'] = df['dt_second'].astype('datetime64[s]')
>>> get_datetime_cols_types(df)
{'dt_second': 'datetime64[s]'}
def get_date_cols(df: pandas.core.frame.DataFrame) -> List[str]:
 993def get_date_cols(df: 'pd.DataFrame') -> List[str]:
 994    """
 995    Get the `date` columns from a Pandas DataFrame.
 996
 997    Parameters
 998    ----------
 999    df: pd.DataFrame
1000        The DataFrame which may contain dates.
1001
1002    Returns
1003    -------
1004    A list of columns to treat as dates.
1005    """
1006    from meerschaum.utils.dtypes import are_dtypes_equal
1007    if df is None:
1008        return []
1009
1010    is_dask = 'dask' in df.__module__
1011    if is_dask:
1012        df = get_first_valid_dask_partition(df)
1013
1014    known_date_cols = [
1015        col
1016        for col, typ in df.dtypes.items()
1017        if are_dtypes_equal(typ, 'date')
1018    ]
1019
1020    if len(df) == 0:
1021        return known_date_cols
1022
1023    cols_indices = {
1024        col: df[col].first_valid_index()
1025        for col in df.columns
1026        if col not in known_date_cols
1027    }
1028    object_date_cols = [
1029        col
1030        for col, ix in cols_indices.items()
1031        if (
1032            ix is not None
1033            and isinstance(df.loc[ix][col], date)
1034        )
1035    ]
1036
1037    all_date_cols = set(known_date_cols + object_date_cols)
1038
1039    return [
1040        col
1041        for col in df.columns
1042        if col in all_date_cols
1043    ]

Get the date columns from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain dates.
Returns
  • A list of columns to treat as dates.
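
A short illustrative example (not from the source; date objects in object columns are detected by sampling each column's first valid value):

>>> from datetime import date
>>> import pandas as pd
>>> from meerschaum.utils.dataframe import get_date_cols
>>> df = pd.DataFrame({'d': [date(2025, 1, 1)], 'n': [1]})
>>> get_date_cols(df)
['d']
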
def get_bytes_cols(df: pandas.core.frame.DataFrame) -> List[str]:
1046def get_bytes_cols(df: 'pd.DataFrame') -> List[str]:
1047    """
1048    Get the columns which contain bytes strings from a Pandas DataFrame.
1049
1050    Parameters
1051    ----------
1052    df: pd.DataFrame
1053        The DataFrame which may contain bytes strings.
1054
1055    Returns
1056    -------
1057    A list of columns to treat as bytes.
1058    """
1059    if df is None:
1060        return []
1061
1062    is_dask = 'dask' in df.__module__
1063    if is_dask:
1064        df = get_first_valid_dask_partition(df)
1065
1066    known_bytes_cols = [
1067        col
1068        for col, typ in df.dtypes.items()
1069        if str(typ) == 'binary[pyarrow]'
1070    ]
1071
1072    if len(df) == 0:
1073        return known_bytes_cols
1074
1075    cols_indices = {
1076        col: df[col].first_valid_index()
1077        for col in df.columns
1078        if col not in known_bytes_cols
1079    }
1080    object_bytes_cols = [
1081        col
1082        for col, ix in cols_indices.items()
1083        if (
1084            ix is not None
1085            and isinstance(df.loc[ix][col], bytes)
1086        )
1087    ]
1088
1089    all_bytes_cols = set(known_bytes_cols + object_bytes_cols)
1090
1091    return [
1092        col
1093        for col in df.columns
1094        if col in all_bytes_cols
1095    ]

Get the columns which contain bytes strings from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain bytes strings.
Returns
  • A list of columns to treat as bytes.
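
For example (illustrative; object columns are sampled for bytes values):

>>> import pandas as pd
>>> from meerschaum.utils.dataframe import get_bytes_cols
>>> df = pd.DataFrame({'blob': [b'\x00\x01'], 'txt': ['abc']})
>>> get_bytes_cols(df)
['blob']
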
def get_geometry_cols( df: pandas.core.frame.DataFrame, with_types_srids: bool = False) -> Union[List[str], Dict[str, Any]]:
1098def get_geometry_cols(
1099    df: 'pd.DataFrame',
1100    with_types_srids: bool = False,
1101) -> Union[List[str], Dict[str, Any]]:
1102    """
1103    Get the columns which contain shapely objects from a Pandas DataFrame.
1104
1105    Parameters
1106    ----------
1107    df: pd.DataFrame
1108        The DataFrame which may contain bytes strings.
1109
1110    with_types_srids: bool, default False
1111        If `True`, return a dictionary mapping columns to geometry types and SRIDs.
1112
1113    Returns
1114    -------
1115    A list of columns to treat as `geometry`.
1116    If `with_types_srids`, return a dictionary mapping columns to tuples in the form (type, SRID).
1117    """
1118    if df is None:
1119        return [] if not with_types_srids else {}
1120
1121    is_dask = 'dask' in df.__module__
1122    if is_dask:
1123        df = get_first_valid_dask_partition(df)
1124
1125    if len(df) == 0:
1126        return [] if not with_types_srids else {}
1127
1128    cols_indices = {
1129        col: df[col].first_valid_index()
1130        for col in df.columns
1131    }
1132    geo_cols = [
1133        col
1134        for col, ix in cols_indices.items()
1135        if (
1136            ix is not None
1137            and
1138            'shapely' in str(type(df.loc[ix][col]))
1139        )
1140    ]
1141    if not with_types_srids:
1142        return geo_cols
1143
1144    gpd = mrsm.attempt_import('geopandas', lazy=False)
1145    geo_cols_types_srids = {}
1146    for col in geo_cols:
1147        try:
1148            sample_geo_series = gpd.GeoSeries(df[col], crs=None)
1149            geometry_types = {
1150                geom.geom_type
1151                for geom in sample_geo_series
1152                if hasattr(geom, 'geom_type')
1153            }
1154            geometry_has_z = any(getattr(geom, 'has_z', False) for geom in sample_geo_series)
1155            srid = (
1156                (
1157                    sample_geo_series.crs.sub_crs_list[0].to_epsg()
1158                    if sample_geo_series.crs.is_compound
1159                    else sample_geo_series.crs.to_epsg()
1160                )
1161                if sample_geo_series.crs
1162                else 0
1163            )
1164            geometry_type = list(geometry_types)[0] if len(geometry_types) == 1 else 'geometry'
1165            if geometry_type != 'geometry' and geometry_has_z:
1166                geometry_type = geometry_type + 'Z'
1167        except Exception:
1168            srid = 0
1169            geometry_type = 'geometry'
1170        geo_cols_types_srids[col] = (geometry_type, srid)
1171
1172    return geo_cols_types_srids

Get the columns which contain shapely objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain shapely geometry objects.
  • with_types_srids (bool, default False): If True, return a dictionary mapping columns to geometry types and SRIDs.
Returns
  • A list of columns to treat as geometry.
  • If with_types_srids, return a dictionary mapping columns to tuples in the form (type, SRID).
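
An illustrative sketch (assumes shapely and geopandas are installed; the SRID falls back to 0 when no CRS is set):

>>> import pandas as pd
>>> from shapely.geometry import Point
>>> from meerschaum.utils.dataframe import get_geometry_cols
>>> df = pd.DataFrame({'geom': [Point(0.0, 0.0)], 'val': [1]})
>>> get_geometry_cols(df)
['geom']
>>> get_geometry_cols(df, with_types_srids=True)
{'geom': ('Point', 0)}
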
def get_geometry_cols_types(df: pandas.core.frame.DataFrame) -> Dict[str, str]:
1175def get_geometry_cols_types(df: 'pd.DataFrame') -> Dict[str, str]:
1176    """
1177    Return a dtypes dictionary mapping columns to specific geometry types (type, srid).
1178    """
1179    geometry_cols_types_srids = get_geometry_cols(df, with_types_srids=True)
1180    new_cols_types = {}
1181    for col, (geometry_type, srid) in geometry_cols_types_srids.items():
1182        new_dtype = "geometry"
1183        modifier = ""
1184        if not srid and geometry_type.lower() == 'geometry':
1185            new_cols_types[col] = new_dtype
1186            continue
1187
1188        modifier = "["
1189        if geometry_type.lower() != 'geometry':
1190            modifier += f"{geometry_type}"
1191
1192        if srid:
1193            if modifier != '[':
1194                modifier += ", "
1195            modifier += f"{srid}"
1196        modifier += "]"
1197        new_cols_types[col] = f"{new_dtype}{modifier}"
1198    return new_cols_types

Return a dtypes dictionary mapping columns to specific geometry types (type, srid).

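A hedged example of the dtype strings this produces (illustrative; the exact SRID resolution depends on geopandas' CRS handling):

>>> import geopandas as gpd
>>> from shapely.geometry import Point
>>> from meerschaum.utils.dataframe import get_geometry_cols_types
>>> gdf = gpd.GeoDataFrame({'geom': [Point(0.0, 0.0)]}, geometry='geom', crs='EPSG:4326')
>>> get_geometry_cols_types(gdf)
{'geom': 'geometry[Point, 4326]'}
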
def get_special_cols(df: pandas.core.frame.DataFrame) -> Dict[str, str]:
1201def get_special_cols(df: 'pd.DataFrame') -> Dict[str, str]:
1202    """
1203    Return a dtypes dictionary mapping special columns to their dtypes.
1204    """
1205    return {
1206        **{col: 'json' for col in get_json_cols(df)},
1207        **{col: 'uuid' for col in get_uuid_cols(df)},
1208        **{col: 'bytes' for col in get_bytes_cols(df)},
1209        **{col: 'bool' for col in get_bool_cols(df)},
1210        **{col: 'numeric' for col in get_numeric_cols(df)},
1211        **{col: 'date' for col in get_date_cols(df)},
1212        **get_datetime_cols_types(df),
1213        **get_geometry_cols_types(df),
1214    }

Return a dtypes dictionary mapping special columns to their dtypes.

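An illustrative composite example (not from the source; assumes the individual detection helpers behave as documented above):

>>> from decimal import Decimal
>>> from uuid import uuid4
>>> import pandas as pd
>>> from meerschaum.utils.dataframe import get_special_cols
>>> df = pd.DataFrame({'u': [uuid4()], 'n': [Decimal('1.5')], 'i': [1]})
>>> get_special_cols(df)
{'u': 'uuid', 'n': 'numeric'}
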
def enforce_dtypes( df: pandas.core.frame.DataFrame, dtypes: Dict[str, str], explicit_dtypes: Optional[Dict[str, str]] = None, safe_copy: bool = True, coerce_numeric: bool = False, coerce_timezone: bool = True, strip_timezone: bool = False, debug: bool = False) -> pandas.core.frame.DataFrame:
1217def enforce_dtypes(
1218    df: 'pd.DataFrame',
1219    dtypes: Dict[str, str],
1220    explicit_dtypes: Optional[Dict[str, str]] = None,
1221    safe_copy: bool = True,
1222    coerce_numeric: bool = False,
1223    coerce_timezone: bool = True,
1224    strip_timezone: bool = False,
1225    debug: bool = False,
1226) -> 'pd.DataFrame':
1227    """
1228    Enforce the `dtypes` dictionary on a DataFrame.
1229
1230    Parameters
1231    ----------
1232    df: pd.DataFrame
1233        The DataFrame on which to enforce dtypes.
1234
1235    dtypes: Dict[str, str]
1236        The data types to attempt to enforce on the DataFrame.
1237
1238    explicit_dtypes: Optional[Dict[str, str]], default None
1239        If provided, automatic dtype coercion will respect explicitly configured
1240        dtypes (`int`, `float`, `numeric`).
1241
1242    safe_copy: bool, default True
1243        If `True`, create a copy before comparing and modifying the dataframes.
1244        Setting to `False` may mutate the DataFrames.
1245        See `meerschaum.utils.dataframe.filter_unseen_df`.
1246
1247    coerce_numeric: bool, default False
1248        If `True`, convert float and int collisions to numeric.
1249
1250    coerce_timezone: bool, default True
1251        If `True`, convert datetimes to UTC.
1252
1253    strip_timezone: bool, default False
1254        If `coerce_timezone` and `strip_timezone` are `True`,
1255        remove timezone information from datetimes.
1256
1257    debug: bool, default False
1258        Verbosity toggle.
1259
1260    Returns
1261    -------
1262    The Pandas DataFrame with the types enforced.
1263    """
1264    import json
1265    import functools
1266    from meerschaum.utils.debug import dprint
1267    from meerschaum.utils.formatting import pprint
1268    from meerschaum.utils.dtypes import (
1269        are_dtypes_equal,
1270        to_pandas_dtype,
1271        is_dtype_numeric,
1272        attempt_cast_to_numeric,
1273        attempt_cast_to_uuid,
1274        attempt_cast_to_bytes,
1275        attempt_cast_to_geometry,
1276        coerce_timezone as _coerce_timezone,
1277        get_geometry_type_srid,
1278    )
1279    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
1280    pandas = mrsm.attempt_import('pandas')
1281    is_dask = 'dask' in df.__module__
1282    if safe_copy:
1283        df = df.copy()
1284    if len(df.columns) == 0:
1285        if debug:
1286            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
1287        return df
1288
1289    explicit_dtypes = explicit_dtypes or {}
1290    pipe_pandas_dtypes = {
1291        col: to_pandas_dtype(typ)
1292        for col, typ in dtypes.items()
1293    }
1294    json_cols = [
1295        col
1296        for col, typ in dtypes.items()
1297        if typ == 'json'
1298    ]
1299    numeric_cols = [
1300        col
1301        for col, typ in dtypes.items()
1302        if typ.startswith('numeric')
1303    ]
1304    geometry_cols_types_srids = {
1305        col: get_geometry_type_srid(typ, default_srid=0)
1306        for col, typ in dtypes.items()
1307        if typ.startswith('geometry') or typ.startswith('geography')
1308    }
1309    uuid_cols = [
1310        col
1311        for col, typ in dtypes.items()
1312        if typ == 'uuid'
1313    ]
1314    bytes_cols = [
1315        col
1316        for col, typ in dtypes.items()
1317        if typ == 'bytes'
1318    ]
1319    datetime_cols = [
1320        col
1321        for col, typ in dtypes.items()
1322        if are_dtypes_equal(typ, 'datetime')
1323    ]
1324    df_numeric_cols = get_numeric_cols(df)
1325    if debug:
1326        dprint("Desired data types:")
1327        pprint(dtypes)
1328        dprint("Data types for incoming DataFrame:")
1329        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})
1330
1331    if json_cols and len(df) > 0:
1332        if debug:
1333            dprint(f"Checking columns for JSON encoding: {json_cols}")
1334        for col in json_cols:
1335            if col in df.columns:
1336                try:
1337                    df[col] = df[col].apply(
1338                        (
1339                            lambda x: (
1340                                json.loads(x)
1341                                if isinstance(x, str)
1342                                else x
1343                            )
1344                        )
1345                    )
1346                except Exception as e:
1347                    if debug:
1348                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")
1349
1350    if numeric_cols:
1351        if debug:
1352            dprint(f"Checking for numerics: {numeric_cols}")
1353        for col in numeric_cols:
1354            precision, scale = get_numeric_precision_scale(None, dtypes.get(col, ''))
1355            if col in df.columns:
1356                try:
1357                    df[col] = df[col].apply(
1358                        functools.partial(
1359                            attempt_cast_to_numeric,
1360                            quantize=True,
1361                            precision=precision,
1362                            scale=scale,
1363                        )
1364                    )
1365                except Exception as e:
1366                    if debug:
1367                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")
1368
1369    if uuid_cols:
1370        if debug:
1371            dprint(f"Checking for UUIDs: {uuid_cols}")
1372        for col in uuid_cols:
1373            if col in df.columns:
1374                try:
1375                    df[col] = df[col].apply(attempt_cast_to_uuid)
1376                except Exception as e:
1377                    if debug:
1378                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")
1379
1380    if bytes_cols:
1381        if debug:
1382            dprint(f"Checking for bytes: {bytes_cols}")
1383        for col in bytes_cols:
1384            if col in df.columns:
1385                try:
1386                    df[col] = df[col].apply(attempt_cast_to_bytes)
1387                except Exception as e:
1388                    if debug:
1389                        dprint(f"Unable to parse column '{col}' as bytes:\n{e}")
1390
1391    if datetime_cols and coerce_timezone:
1392        if debug:
1393            dprint(f"Checking for datetime conversion: {datetime_cols}")
1394        for col in datetime_cols:
1395            if col in df.columns:
1396                if not strip_timezone and 'utc' in str(df.dtypes[col]).lower():
1397                    if debug:
1398                        dprint(f"Skip UTC coercion for column '{col}' ({str(df[col].dtype)}).")
1399                    continue
1400                if strip_timezone and ',' not in str(df.dtypes[col]):
1401                    if debug:
1402                        dprint(
1403                            f"Skip UTC coercion (stripped) for column '{col}' "
1404                            f"({str(df[col].dtype)})."
1405                        )
1406                    continue
1407
1408                if debug:
1409                    dprint(
1410                        f"Data type for column '{col}' before timezone coercion: "
1411                        f"{str(df[col].dtype)}"
1412                    )
1413
1414                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)
1415                if debug:
1416                    dprint(
1417                        f"Data type for column '{col}' after timezone coercion: "
1418                        f"{str(df[col].dtype)}"
1419                    )
1420
1421    if geometry_cols_types_srids:
1422        geopandas = mrsm.attempt_import('geopandas')
1423        if debug:
1424            dprint(f"Checking for geometry: {list(geometry_cols_types_srids)}")
1425        parsed_geom_cols = []
1426        for col in geometry_cols_types_srids:
1427            try:
1428                df[col] = df[col].apply(attempt_cast_to_geometry)
1429                parsed_geom_cols.append(col)
1430            except Exception as e:
1431                if debug:
1432                    dprint(f"Unable to parse column '{col}' as geometry:\n{e}")
1433
1434        if parsed_geom_cols:
1435            if debug:
1436                dprint(f"Converting to GeoDataFrame (geometry column: '{parsed_geom_cols[0]}')...")
1437            try:
1438                _, default_srid = geometry_cols_types_srids[parsed_geom_cols[0]]
1439                df = geopandas.GeoDataFrame(df, geometry=parsed_geom_cols[0], crs=default_srid)
1440                for col, (_, srid) in geometry_cols_types_srids.items():
1441                    if srid:
1442                        if debug:
1443                            dprint(f"Setting '{col}' to SRID '{srid}'...")
1444                        _ = df[col].set_crs(srid)
1445                if parsed_geom_cols[0] not in df.columns:
1446                    df.rename_geometry(parsed_geom_cols[0], inplace=True)
1447            except (ValueError, TypeError):
1448                if debug:
1449                    import traceback
1450                    dprint(f"Failed to cast to GeoDataFrame:\n{traceback.format_exc()}")
1451
1452    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
1453    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1454        if debug:
1455            dprint("Data types match. Exiting enforcement...")
1456        return df
1457
1458    common_dtypes = {}
1459    common_diff_dtypes = {}
1460    for col, typ in pipe_pandas_dtypes.items():
1461        if col in df_dtypes:
1462            common_dtypes[col] = typ
1463            if not are_dtypes_equal(typ, df_dtypes[col]):
1464                common_diff_dtypes[col] = df_dtypes[col]
1465
1466    if debug:
1467        dprint("Common columns with different dtypes:")
1468        pprint(common_diff_dtypes)
1469
1470    detected_dt_cols = {}
1471    for col, typ in common_diff_dtypes.items():
1472        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
1473            df_dtypes[col] = typ
1474            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
1475    for col in detected_dt_cols:
1476        del common_diff_dtypes[col]
1477
1478    if debug:
1479        dprint("Common columns with different dtypes (after dates):")
1480        pprint(common_diff_dtypes)
1481
1482    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1483        if debug:
1484            dprint(
1485                "The incoming DataFrame has mostly the same types, skipping enforcement. "
1486                + "The only detected difference was in the following datetime columns."
1487            )
1488            pprint(detected_dt_cols)
1489        return df
1490
1491    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
1492        previous_typ = common_dtypes[col]
1493        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
1494        explicitly_float = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'float')
1495        explicitly_int = are_dtypes_equal(explicit_dtypes.get(col, 'object'), 'int')
1496        explicitly_numeric = explicit_dtypes.get(col, 'object').startswith('numeric')
1497        all_nan = (
1498            df[col].isnull().all()
1499            if mixed_numeric_types and coerce_numeric and not (explicitly_float or explicitly_int)
1500            else None
1501        )
1502        cast_to_numeric = explicitly_numeric or (
1503            (
1504                col in df_numeric_cols
1505                or (
1506                    mixed_numeric_types
1507                    and not (explicitly_float or explicitly_int)
1508                    and not all_nan
1509                    and coerce_numeric
1510                )
1511            )
1512        )
1513
1514        if debug and (explicitly_numeric or df_numeric_cols or mixed_numeric_types):
1515            from meerschaum.utils.formatting import make_header
1516            msg = (
1517                make_header(f"Coercing column '{col}' to numeric:", left_pad=0)
1518                + "\n"
1519                + f"  Previous type: {previous_typ}\n"
1520                + f"  Current type: {typ if col not in df_numeric_cols else 'Decimal'}"
1521                + ("\n  Column is explicitly numeric." if explicitly_numeric else "")
1522            ) if cast_to_numeric else (
1523                f"Will not coerce column '{col}' to numeric.\n"
1524                f"  Numeric columns in dataframe: {df_numeric_cols}\n"
1525                f"  Mixed numeric types: {mixed_numeric_types}\n"
1526                f"  Explicitly float: {explicitly_float}\n"
1527                f"  Explicitly int: {explicitly_int}\n"
1528                f"  All NaN: {all_nan}\n"
1529                f"  Coerce numeric: {coerce_numeric}"
1530            )
1531            dprint(msg)
1532
1533        if cast_to_numeric:
1534            common_dtypes[col] = attempt_cast_to_numeric
1535            common_diff_dtypes[col] = attempt_cast_to_numeric
1536
1537    for d in common_diff_dtypes:
1538        t = common_dtypes[d]
1539        if debug:
1540            dprint(f"Casting column {d} to dtype {t}.")
1541        try:
1542            df[d] = (
1543                df[d].apply(t)
1544                if callable(t)
1545                else df[d].astype(t)
1546            )
1547        except Exception as e:
1548            if debug:
1549                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}\ndf:\n{df}")
1550            if 'int' in str(t).lower():
1551                try:
1552                    df[d] = df[d].astype('float64').astype(t)
1553                except Exception:
1554                    if debug:
1555                        dprint(f"Was unable to convert to float then {t}.")
1556    return df

Enforce the dtypes dictionary on a DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame on which to enforce dtypes.
  • dtypes (Dict[str, str]): The data types to attempt to enforce on the DataFrame.
  • explicit_dtypes (Optional[Dict[str, str]], default None): If provided, automatic dtype coercion will respect explicitly configured dtypes (int, float, numeric).
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames. See meerschaum.utils.dataframe.filter_unseen_df.
  • coerce_numeric (bool, default False): If True, convert float and int collisions to numeric.
  • coerce_timezone (bool, default True): If True, convert datetimes to UTC.
  • strip_timezone (bool, default False): If coerce_timezone and strip_timezone are True, remove timezone information from datetimes.
  • debug (bool, default False): Verbosity toggle.
Returns
  • The Pandas DataFrame with the types enforced.
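
A minimal usage sketch (illustrative; the 'json' dtype triggers json.loads on string values):

>>> import pandas as pd
>>> from meerschaum.utils.dataframe import enforce_dtypes
>>> df = pd.DataFrame({'a': [1, 2], 'b': ['{"x": 1}', '{"y": 2}']})
>>> enforced = enforce_dtypes(df, {'a': 'int', 'b': 'json'})
>>> enforced['b'][0]
{'x': 1}
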
def get_datetime_bound_from_df( df: Union[pandas.core.frame.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]]], datetime_column: str, minimum: bool = True) -> Union[int, datetime.datetime, NoneType]:
1559def get_datetime_bound_from_df(
1560    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1561    datetime_column: str,
1562    minimum: bool = True,
1563) -> Union[int, datetime, None]:
1564    """
1565    Return the minimum or maximum datetime (or integer) from a DataFrame.
1566
1567    Parameters
1568    ----------
1569    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1570        The DataFrame, list, or dict which contains the range axis.
1571
1572    datetime_column: str
1573        The name of the datetime (or int) column.
1574
1575    minimum: bool
1576        Whether to return the minimum (default) or maximum value.
1577
1578    Returns
1579    -------
1580    The minimum or maximum datetime value in the dataframe, or `None`.
1581    """
1582    from meerschaum.utils.dtypes import to_datetime, value_is_null
1583
1584    if df is None:
1585        return None
1586    if not datetime_column:
1587        return None
1588
1589    def compare(a, b):
1590        if a is None:
1591            return b
1592        if b is None:
1593            return a
1594        if minimum:
1595            return a if a < b else b
1596        return a if a > b else b
1597
1598    if isinstance(df, list):
1599        if len(df) == 0:
1600            return None
1601        best_yet = df[0].get(datetime_column, None)
1602        for doc in df:
1603            val = doc.get(datetime_column, None)
1604            best_yet = compare(best_yet, val)
1605        return best_yet
1606
1607    if isinstance(df, dict):
1608        if datetime_column not in df:
1609            return None
1610        best_yet = df[datetime_column][0]
1611        for val in df[datetime_column]:
1612            best_yet = compare(best_yet, val)
1613        return best_yet
1614
1615    if 'DataFrame' in str(type(df)):
1616        from meerschaum.utils.dtypes import are_dtypes_equal
1617        pandas = mrsm.attempt_import('pandas')
1618        is_dask = 'dask' in df.__module__
1619
1620        if datetime_column not in df.columns:
1621            return None
1622
1623        try:
1624            dt_val = (
1625                df[datetime_column].min(skipna=True)
1626                if minimum
1627                else df[datetime_column].max(skipna=True)
1628            )
1629        except Exception:
1630            dt_val = pandas.NA
1631        if is_dask and dt_val is not None and dt_val is not pandas.NA:
1632            dt_val = dt_val.compute()
1633
1634        return (
1635            to_datetime(dt_val, as_pydatetime=True)
1636            if are_dtypes_equal(str(type(dt_val)), 'datetime')
1637            else (dt_val if not value_is_null(dt_val) else None)
1638        )
1639
1640    return None

Return the minimum or maximum datetime (or integer) from a DataFrame.

Parameters
  • df (Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]): The DataFrame, list, or dict which contains the range axis.
  • datetime_column (str): The name of the datetime (or int) column.
  • minimum (bool): Whether to return the minimum (default) or maximum value.
Returns
  • The minimum or maximum datetime value in the dataframe, or None.
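
A quick example over a list of documents (illustrative; DataFrame inputs use min/max with skipna=True instead):

>>> from datetime import datetime
>>> from meerschaum.utils.dataframe import get_datetime_bound_from_df
>>> docs = [{'dt': datetime(2025, 1, 1)}, {'dt': datetime(2025, 1, 2)}]
>>> get_datetime_bound_from_df(docs, 'dt')
datetime.datetime(2025, 1, 1, 0, 0)
>>> get_datetime_bound_from_df(docs, 'dt', minimum=False)
datetime.datetime(2025, 1, 2, 0, 0)
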
def get_unique_index_values( df: Union[pandas.core.frame.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]]], indices: List[str]) -> Dict[str, List[Any]]:
1643def get_unique_index_values(
1644    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1645    indices: List[str],
1646) -> Dict[str, List[Any]]:
1647    """
1648    Return a dictionary of the unique index values in a DataFrame.
1649
1650    Parameters
1651    ----------
1652    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1653        The dataframe (or list or dict) which contains index values.
1654
1655    indices: List[str]
1656        The list of index columns.
1657
1658    Returns
1659    -------
1660    A dictionary mapping indices to unique values.
1661    """
1662    if df is None:
1663        return {}
1664    if 'dataframe' in str(type(df)).lower():
1665        pandas = mrsm.attempt_import('pandas')
1666        return {
1667            col: list({
1668                (val if val is not pandas.NA else None)
1669                for val in df[col].unique()
1670            })
1671            for col in indices
1672            if col in df.columns
1673        }
1674
1675    unique_indices = defaultdict(lambda: set())
1676    if isinstance(df, list):
1677        for doc in df:
1678            for index in indices:
1679                if index in doc:
1680                    unique_indices[index].add(doc[index])
1681
1682    elif isinstance(df, dict):
1683        for index in indices:
1684            if index in df:
1685                unique_indices[index] = unique_indices[index].union(set(df[index]))
1686
1687    return {key: list(val) for key, val in unique_indices.items()}

Return a dictionary of the unique index values in a DataFrame.

Parameters
  • df (Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]): The dataframe (or list or dict) which contains index values.
  • indices (List[str]): The list of index columns.
Returns
  • A dictionary mapping indices to unique values.
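
A short example (illustrative; the order of the returned values is not guaranteed, since they are collected in sets):

>>> from meerschaum.utils.dataframe import get_unique_index_values
>>> docs = [{'id': 1, 'color': 'red'}, {'id': 2, 'color': 'red'}]
>>> get_unique_index_values(docs, ['id', 'color'])
{'id': [1, 2], 'color': ['red']}
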
def df_is_chunk_generator(df: Any) -> bool:
1690def df_is_chunk_generator(df: Any) -> bool:
1691    """
1692    Determine whether to treat `df` as a chunk generator.
1693
1694    Note this should only be used in a context where generators are expected,
1695    as it will return `True` for any iterable.
1696
1697    Parameters
1698    ----------
1699    df: The DataFrame or chunk generator to evaluate.
1700
1701    Returns
1702    -------
1703    A `bool` indicating whether to treat `df` as a generator.
1704    """
1705    return (
1706        not isinstance(df, (dict, list, str))
1707        and 'DataFrame' not in str(type(df))
1708        and isinstance(df, (Generator, Iterable, Iterator))
1709    )

Determine whether to treat df as a chunk generator.

Note this should only be used in a context where generators are expected, as it will return True for any iterable.

Parameters
  • df: The DataFrame or chunk generator to evaluate.
Returns
  • A bool indicating whether to treat df as a generator.
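
For example (illustrative):

>>> import pandas as pd
>>> from meerschaum.utils.dataframe import df_is_chunk_generator
>>> df = pd.DataFrame([{'a': 1}])
>>> df_is_chunk_generator(df)
False
>>> df_is_chunk_generator(chunk for chunk in [df])
True
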
def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
1712def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
1713    """
1714    Return the Dask `npartitions` value for a given `chunksize`.
1715    """
1716    if chunksize == -1:
1717        from meerschaum.config import get_config
1718        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
1719    if chunksize is None:
1720        return 1
1721    return -1 * chunksize

Return the Dask npartitions value for a given chunksize.

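Illustrative behavior (negative chunksizes map to positive partition counts, and -1 falls back to the configured SQL chunksize):

>>> from meerschaum.utils.dataframe import chunksize_to_npartitions
>>> chunksize_to_npartitions(None)
1
>>> chunksize_to_npartitions(-2)
2
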
def df_from_literal( pipe: Optional[meerschaum.Pipe] = None, literal: Optional[str] = None, debug: bool = False) -> pandas.core.frame.DataFrame:
1724def df_from_literal(
1725    pipe: Optional[mrsm.Pipe] = None,
1726    literal: Optional[str] = None,
1727    debug: bool = False
1728) -> 'pd.DataFrame':
1729    """
1730    Construct a dataframe from a literal value, using the pipe's datetime and value column names.
1731
1732    Parameters
1733    ----------
1734    pipe: Optional['meerschaum.Pipe'], default None
1735        The pipe which will consume the literal value.
1736
1737    Returns
1738    -------
1739    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
1740    and the literal as the value.
1741    """
1742    from meerschaum.utils.packages import import_pandas
1743    from meerschaum.utils.warnings import error, warn
1744    from meerschaum.utils.debug import dprint
1745    from meerschaum.utils.dtypes import get_current_timestamp
1746
1747    if pipe is None or literal is None:
1748        error("Please provide a Pipe and a literal value")
1749
1750    dt_col = pipe.columns.get(
1751        'datetime',
1752        mrsm.get_config('pipes', 'autotime', 'column_name_if_datetime_missing')
1753    )
1754    val_col = pipe.get_val_column(debug=debug)
1755
1756    val = literal
1757    if isinstance(literal, str):
1758        if debug:
1759            dprint(f"Received literal string: '{literal}'")
1760        import ast
1761        try:
1762            val = ast.literal_eval(literal)
1763        except Exception:
1764            warn(
1765                "Failed to parse value from string:\n" + f"{literal}" +
1766                "\n\nWill cast as a string instead."
1767            )
1768            val = literal
1769
1770    now = get_current_timestamp(pipe.precision)
1771    pd = import_pandas()
1772    return pd.DataFrame({dt_col: [now], val_col: [val]})

Construct a dataframe from a literal value, using the pipe's datetime and value column names.

Parameters
  • pipe (Optional['meerschaum.Pipe'], default None): The pipe which will consume the literal value.
Returns
  • A 1-row pandas DataFrame with the current UTC timestamp as the datetime column and the literal as the value.
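
A hedged sketch (the pipe keys below are hypothetical; a real pipe is required so the datetime and value column names can be resolved):

>>> import meerschaum as mrsm
>>> from meerschaum.utils.dataframe import df_from_literal
>>> pipe = mrsm.Pipe('sql:main', 'example')  # hypothetical pipe keys
>>> df = df_from_literal(pipe, '1.5')  # '1.5' is parsed via ast.literal_eval
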
def get_first_valid_dask_partition( ddf: "'dask.dataframe.DataFrame'") -> Optional[pandas.core.frame.DataFrame]:
1775def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
1776    """
1777    Return the first valid Dask DataFrame partition (if possible).
1778    """
1779    pdf = None
1780    for partition in ddf.partitions:
1781        try:
1782            pdf = partition.compute()
1783        except Exception:
1784            continue
1785        if len(pdf) > 0:
1786            return pdf
1787    _ = mrsm.attempt_import('partd', lazy=False)
1788    return ddf.compute()

Return the first valid Dask DataFrame partition (if possible).

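A small example (assumes dask[dataframe] is installed):

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> from meerschaum.utils.dataframe import get_first_valid_dask_partition
>>> ddf = dd.from_pandas(pd.DataFrame({'a': [1, 2, 3]}), npartitions=2)
>>> pdf = get_first_valid_dask_partition(ddf)
>>> len(pdf) > 0
True
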
def query_df( df: pandas.core.frame.DataFrame, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, datetime_column: Optional[str] = None, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, inplace: bool = False, reset_index: bool = False, coerce_types: bool = False, debug: bool = False) -> pandas.core.frame.DataFrame:
1791def query_df(
1792    df: 'pd.DataFrame',
1793    params: Optional[Dict[str, Any]] = None,
1794    begin: Union[datetime, int, None] = None,
1795    end: Union[datetime, int, None] = None,
1796    datetime_column: Optional[str] = None,
1797    select_columns: Optional[List[str]] = None,
1798    omit_columns: Optional[List[str]] = None,
1799    inplace: bool = False,
1800    reset_index: bool = False,
1801    coerce_types: bool = False,
1802    debug: bool = False,
1803) -> 'pd.DataFrame':
1804    """
1805    Query the dataframe with the params dictionary.
1806
1807    Parameters
1808    ----------
1809    df: pd.DataFrame
1810        The DataFrame to query against.
1811
1812    params: Optional[Dict[str, Any]], default None
1813        The parameters dictionary to use for the query.
1814
1815    begin: Union[datetime, int, None], default None
1816        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1817        greater than or equal to this value.
1818
1819    end: Union[datetime, int, None], default None
1820        If `end` and `datetime_column` are provided, only return rows with a timestamp
1821        less than this value.
1822
1823    datetime_column: Optional[str], default None
1824        A `datetime_column` must be provided to use `begin` and `end`.
1825
1826    select_columns: Optional[List[str]], default None
1827        If provided, only return these columns.
1828
1829    omit_columns: Optional[List[str]], default None
1830        If provided, do not include these columns in the result.
1831
1832    inplace: bool, default False
1833        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
1834
1835    reset_index: bool, default False
1836        If `True`, reset the index in the resulting DataFrame.
1837
1838    coerce_types: bool, default False
1839        If `True`, cast the dataframe and parameters as strings before querying.
1840
1841    Returns
1842    -------
1843    A Pandas DataFrame query result.
1844    """
1845
1846    def _process_select_columns(_df):
1847        if not select_columns:
1848            return
1849        for col in list(_df.columns):
1850            if col not in select_columns:
1851                del _df[col]
1852
1853    def _process_omit_columns(_df):
1854        if not omit_columns:
1855            return
1856        for col in list(_df.columns):
1857            if col in omit_columns:
1858                del _df[col]
1859
1860    if not params and not begin and not end:
1861        if not inplace:
1862            df = df.copy()
1863        _process_select_columns(df)
1864        _process_omit_columns(df)
1865        return df
1866
1867    from meerschaum.utils.debug import dprint
1868    from meerschaum.utils.misc import get_in_ex_params
1869    from meerschaum.utils.warnings import warn
1870    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
1871    dateutil_parser = mrsm.attempt_import('dateutil.parser')
1872    pandas = mrsm.attempt_import('pandas')
1873    NA = pandas.NA
1874
1875    if params:
1876        proto_in_ex_params = get_in_ex_params(params)
1877        for key, (proto_in_vals, proto_ex_vals) in proto_in_ex_params.items():
1878            if proto_ex_vals:
1879                coerce_types = True
1880                break
1881        params = params.copy()
1882        for key, val in {k: v for k, v in params.items()}.items():
1883            if isinstance(val, (list, tuple, set)) or hasattr(val, 'astype'):
1884                if None in val:
1885                    val = [item for item in val if item is not None] + [NA]
1886                    params[key] = val
1887                if coerce_types:
1888                    params[key] = [str(x) for x in val]
1889            else:
1890                if value_is_null(val):
1891                    val = NA
1892                    params[key] = NA
1893                if coerce_types:
1894                    params[key] = str(val)
1895
1896    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
1897
1898    if inplace:
1899        df.fillna(NA, inplace=True)
1900    else:
1901        df = df.infer_objects().fillna(NA)
1902
1903    if isinstance(begin, str):
1904        begin = dateutil_parser.parse(begin)
1905    if isinstance(end, str):
1906        end = dateutil_parser.parse(end)
1907
1908    if begin is not None or end is not None:
1909        if not datetime_column or datetime_column not in df.columns:
1910            warn(
1911                f"The datetime column '{datetime_column}' is not present in the Dataframe, "
1912                + "ignoring begin and end...",
1913            )
1914            begin, end = None, None
1915
1916    if debug:
1917        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")
1918
1919    if datetime_column and (begin is not None or end is not None):
1920        if debug:
1921            dprint("Checking for datetime column compatibility.")
1922
1923        from meerschaum.utils.dtypes import coerce_timezone
1924        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
1925        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
1926        end_is_int = are_dtypes_equal(str(type(end)), 'int')
1927
1928        if df_is_dt:
1929            df_tz = (
1930                getattr(df[datetime_column].dt, 'tz', None)
1931                if hasattr(df[datetime_column], 'dt')
1932                else None
1933            )
1934
1935            if begin_is_int:
1936                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
1937                if debug:
1938                    dprint(f"`begin` will be cast to '{begin}'.")
1939            if end_is_int:
1940                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
1941                if debug:
1942                    dprint(f"`end` will be cast to '{end}'.")
1943
1944            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
1945            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None
1946
1947    in_ex_params = get_in_ex_params(params)
1948
1949    masks = [
1950        (
1951            (df[datetime_column] >= begin)
1952            if begin is not None and datetime_column
1953            else True
1954        ) & (
1955            (df[datetime_column] < end)
1956            if end is not None and datetime_column
1957            else True
1958        )
1959    ]
1960
1961    masks.extend([
1962        (
1963            (
1964                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
1965                if in_vals
1966                else True
1967            ) & (
1968                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
1969                if ex_vals
1970                else True
1971            )
1972        )
1973        for col, (in_vals, ex_vals) in in_ex_params.items()
1974        if col in df.columns
1975    ])
1976    query_mask = masks[0]
1977    for mask in masks[1:]:
1978        query_mask = query_mask & mask
1979
1980    original_cols = df.columns
1981
1982    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
1983    ###       to allow for `<NA>` values.
1984    bool_cols = [
1985        col
1986        for col, typ in df.dtypes.items()
1987        if are_dtypes_equal(str(typ), 'bool')
1988    ]
1989    for col in bool_cols:
1990        df[col] = df[col].astype('boolean[pyarrow]')
1991
1992    if not isinstance(query_mask, bool):
1993        df['__mrsm_mask'] = (
1994            query_mask.astype('boolean[pyarrow]')
1995            if hasattr(query_mask, 'astype')
1996            else query_mask
1997        )
1998
1999        if inplace:
2000            df.where(query_mask, other=NA, inplace=True)
2001            df.dropna(how='all', inplace=True)
2002            result_df = df
2003        else:
2004            result_df = df.where(query_mask, other=NA)
2005            result_df.dropna(how='all', inplace=True)
2006
2007    else:
2008        result_df = df
2009
2010    if '__mrsm_mask' in df.columns:
2011        del df['__mrsm_mask']
2012    if '__mrsm_mask' in result_df.columns:
2013        del result_df['__mrsm_mask']
2014
2015    if reset_index:
2016        result_df.reset_index(drop=True, inplace=True)
2017
2018    result_df = enforce_dtypes(
2019        result_df,
2020        dtypes,
2021        safe_copy=False,
2022        debug=debug,
2023        coerce_numeric=False,
2024        coerce_timezone=False,
2025    )
2026
2027    if select_columns == ['*']:
2028        select_columns = None
2029
2030    if not select_columns and not omit_columns:
2031        return result_df[original_cols]
2032
2033    _process_select_columns(result_df)
2034    _process_omit_columns(result_df)
2035
2036    return result_df

Query the dataframe with the params dictionary.

Parameters
  • df (pd.DataFrame): The DataFrame to query against.
  • params (Optional[Dict[str, Any]], default None): The parameters dictionary to use for the query.
  • begin (Union[datetime, int, None], default None): If begin and datetime_column are provided, only return rows with a timestamp greater than or equal to this value.
  • end (Union[datetime, int, None], default None): If end and datetime_column are provided, only return rows with a timestamp less than this value.
  • datetime_column (Optional[str], default None): A datetime_column must be provided to use begin and end.
  • select_columns (Optional[List[str]], default None): If provided, only return these columns.
  • omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
  • inplace (bool, default False): If True, modify the DataFrame inplace rather than creating a new DataFrame.
  • reset_index (bool, default False): If True, reset the index in the resulting DataFrame.
  • coerce_types (bool, default False): If True, cast the dataframe and parameters as strings before querying.
Returns
  • A Pandas DataFrame query result.
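
A brief usage sketch (illustrative; params filters on equality and membership, while begin/end bound the datetime column):

>>> from datetime import datetime
>>> import pandas as pd
>>> from meerschaum.utils.dataframe import query_df
>>> df = pd.DataFrame({
...     'dt': [datetime(2025, 1, 1), datetime(2025, 1, 2)],
...     'color': ['red', 'blue'],
... })
>>> query_df(df, params={'color': 'red'})['color'].tolist()
['red']
>>> query_df(df, begin=datetime(2025, 1, 2), datetime_column='dt')['color'].tolist()
['blue']
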
def to_json( df: pandas.core.frame.DataFrame, safe_copy: bool = True, orient: str = 'records', date_format: str = 'iso', date_unit: str = 'us', double_precision: int = 15, geometry_format: str = 'geojson', **kwargs: Any) -> str:
2039def to_json(
2040    df: 'pd.DataFrame',
2041    safe_copy: bool = True,
2042    orient: str = 'records',
2043    date_format: str = 'iso',
2044    date_unit: str = 'us',
2045    double_precision: int = 15,
2046    geometry_format: str = 'geojson',
2047    **kwargs: Any
2048) -> str:
2049    """
2050    Serialize the given dataframe as a JSON string.
2051
2052    Parameters
2053    ----------
2054    df: pd.DataFrame
2055        The DataFrame to be serialized.
2056
2057    safe_copy: bool, default True
2058        If `False`, modify the DataFrame inplace.
2059
2060    date_format: str, default 'iso'
2061        The default format for timestamps.
2062
2063    date_unit: str, default 'us'
2064        The precision of the timestamps.
2065
2066    double_precision: int, default 15
2067        The number of decimal places to use when encoding floating point values (maximum 15).
2068
2069    geometry_format: str, default 'geojson'
2070        The serialization format for geometry data.
2071        Accepted values are `geojson`, `wkb_hex`, and `wkt`.
2072
2073    Returns
2074    -------
2075    A JSON string.
2076    """
2077    import warnings
2078    import functools
2079    from meerschaum.utils.packages import import_pandas
2080    from meerschaum.utils.dtypes import (
2081        serialize_bytes,
2082        serialize_decimal,
2083        serialize_geometry,
2084    )
2085    pd = import_pandas()
2086    uuid_cols = get_uuid_cols(df)
2087    bytes_cols = get_bytes_cols(df)
2088    numeric_cols = get_numeric_cols(df)
2089    geometry_cols = get_geometry_cols(df)
2090    geometry_cols_srids = {
2091        col: int((getattr(df[col].crs, 'srs', '') or '').split(':', maxsplit=1)[-1] or '0')
2092        for col in geometry_cols
2093    } if 'geodataframe' in str(type(df)).lower() else {}
2094    if safe_copy and bool(uuid_cols or bytes_cols or geometry_cols or numeric_cols):
2095        df = df.copy()
2096    for col in uuid_cols:
2097        df[col] = df[col].astype(str)
2098    for col in bytes_cols:
2099        df[col] = df[col].apply(serialize_bytes)
2100    for col in numeric_cols:
2101        df[col] = df[col].apply(serialize_decimal)
2102    with warnings.catch_warnings():
2103        warnings.simplefilter("ignore")
2104        for col in geometry_cols:
2105            srid = geometry_cols_srids.get(col, None) or None
2106            df[col] = df[col].apply(
2107                functools.partial(
2108                    serialize_geometry,
2109                    geometry_format=geometry_format,
2110                    srid=srid,
2111                )
2112            )
2113    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
2114        date_format=date_format,
2115        date_unit=date_unit,
2116        double_precision=double_precision,
2117        orient=orient,
2118        **kwargs
2119    )

Serialize the given dataframe as a JSON string.

Parameters
  • df (pd.DataFrame): The DataFrame to be serialized.
  • safe_copy (bool, default True): If False, modify the DataFrame inplace.
  • date_format (str, default 'iso'): The default format for timestamps.
  • date_unit (str, default 'us'): The precision of the timestamps.
  • double_precision (int, default 15): The number of decimal places to use when encoding floating point values (maximum 15).
  • geometry_format (str, default 'geojson'): The serialization format for geometry data. Accepted values are geojson, wkb_hex, and wkt.
Returns
  • A JSON string.
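
For instance (illustrative; UUID, bytes, decimal, and geometry columns are converted to JSON-safe values before serialization):

>>> import pandas as pd
>>> from meerschaum.utils.dataframe import to_json
>>> df = pd.DataFrame([{'a': 1}])
>>> to_json(df)
'[{"a":1}]'
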
def to_simple_lines(df: pandas.core.frame.DataFrame) -> str:
2122def to_simple_lines(df: 'pd.DataFrame') -> str:
2123    """
2124    Serialize a Pandas Dataframe as lines of simple dictionaries.
2125
2126    Parameters
2127    ----------
2128    df: pd.DataFrame
2129        The dataframe to serialize into simple lines text.
2130
2131    Returns
2132    -------
2133    A string of simple line dictionaries joined by newlines.
2134    """
2135    from meerschaum.utils.misc import to_simple_dict
2136    if df is None or len(df) == 0:
2137        return ''
2138
2139    docs = df.to_dict(orient='records')
2140    return '\n'.join(to_simple_dict(doc) for doc in docs)

Serialize a Pandas Dataframe as lines of simple dictionaries.

Parameters
  • df (pd.DataFrame): The dataframe to serialize into simple lines text.
Returns
  • A string of simple line dictionaries joined by newlines.
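
A small sketch (illustrative; the exact line format is delegated to meerschaum.utils.misc.to_simple_dict):

>>> import pandas as pd
>>> from meerschaum.utils.dataframe import to_simple_lines
>>> df = pd.DataFrame([{'a': 1}, {'a': 2}])
>>> lines = to_simple_lines(df)
>>> len(lines.splitlines())
2
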
def parse_simple_lines(data: str) -> pandas.core.frame.DataFrame:
2143def parse_simple_lines(data: str) -> 'pd.DataFrame':
2144    """
2145    Parse simple lines text into a DataFrame.
2146
2147    Parameters
2148    ----------
2149    data: str
2150        The simple lines text to parse into a DataFrame.
2151
2152    Returns
2153    -------
2154    A dataframe containing the rows serialized in `data`.
2155    """
2156    from meerschaum.utils.misc import string_to_dict
2157    from meerschaum.utils.packages import import_pandas
2158    pd = import_pandas()
2159    lines = data.splitlines()
2160    try:
2161        docs = [string_to_dict(line) for line in lines]
2162        df = pd.DataFrame(docs)
2163    except Exception:
2164        df = None
2165
2166    if df is None:
2167        raise ValueError("Cannot parse simple lines into a dataframe.")
2168
2169    return df

Parse simple lines text into a DataFrame.

Parameters
  • data (str): The simple lines text to parse into a DataFrame.
Returns
  • A dataframe containing the rows serialized in data.
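
A hedged round-trip sketch (assumes to_simple_lines output parses back cleanly via string_to_dict):

>>> import pandas as pd
>>> from meerschaum.utils.dataframe import parse_simple_lines, to_simple_lines
>>> df = pd.DataFrame([{'a': 1}, {'a': 2}])
>>> parsed = parse_simple_lines(to_simple_lines(df))
>>> len(parsed)
2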