meerschaum.utils.dataframe

Utility functions for working with DataFrames.

   1#! /usr/bin/env python3
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Utility functions for working with DataFrames.
   7"""
   8
   9from __future__ import annotations
  10
  11import pathlib
  12from datetime import datetime, timezone
  13from collections import defaultdict
  14
  15import meerschaum as mrsm
  16from meerschaum.utils.typing import (
  17    Optional, Dict, Any, List, Hashable, Generator,
  18    Iterator, Iterable, Union, TYPE_CHECKING,
  19)
  20
  21if TYPE_CHECKING:
  22    pd, dask = mrsm.attempt_import('pandas', 'dask')
  23
  24
  25def add_missing_cols_to_df(
  26    df: 'pd.DataFrame',
  27    dtypes: Dict[str, Any],
  28) -> 'pd.DataFrame':
  29    """
  30    Add columns from the dtypes dictionary as null columns to a new DataFrame.
  31
  32    Parameters
  33    ----------
  34    df: pd.DataFrame
  35        The dataframe to copy and extend with null columns.
  36
  37    dtypes: Dict[str, Any]
  38        The data types dictionary which may contain keys not present in `df.columns`.
  39
  40    Returns
  41    -------
  42    A new `DataFrame` with the keys from `dtypes` added as null columns.
  43    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
  44    NOTE: This will not ensure that dtypes are enforced!
  45
  46    Examples
  47    --------
  48    >>> import pandas as pd
  49    >>> df = pd.DataFrame([{'a': 1}])
  50    >>> dtypes = {'b': 'Int64'}
  51    >>> add_missing_cols_to_df(df, dtypes)
  52          a  b
  53       0  1  <NA>
  54    >>> add_missing_cols_to_df(df, dtypes).dtypes
  55    a    int64
  56    b    Int64
  57    dtype: object
  58    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
  59    a    int64
  60    dtype: object
  61    >>> 
  62    """
  63    if set(df.columns) == set(dtypes):
  64        return df
  65
  66    from meerschaum.utils.packages import attempt_import
  67    from meerschaum.utils.dtypes import to_pandas_dtype
  68    pandas = attempt_import('pandas')
  69
  70    def build_series(dtype: str):
  71        return pandas.Series([], dtype=to_pandas_dtype(dtype))
  72
  73    assign_kwargs = {
  74        str(col): build_series(str(typ))
  75        for col, typ in dtypes.items()
  76        if col not in df.columns
  77    }
  78    df_with_cols = df.assign(**assign_kwargs)
  79    for col in assign_kwargs:
  80        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
  81    return df_with_cols
  82
  83
  84def filter_unseen_df(
  85    old_df: 'pd.DataFrame',
  86    new_df: 'pd.DataFrame',
  87    safe_copy: bool = True,
  88    dtypes: Optional[Dict[str, Any]] = None,
  89    include_unchanged_columns: bool = False,
  90    coerce_mixed_numerics: bool = True,
  91    debug: bool = False,
  92) -> 'pd.DataFrame':
  93    """
  94    Left join two DataFrames to find the newest unseen data.
  95
  96    Parameters
  97    ----------
  98    old_df: 'pd.DataFrame'
  99        The original (target) dataframe. Acts as a filter on the `new_df`.
 100
 101    new_df: 'pd.DataFrame'
 102        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
 103
 104    safe_copy: bool, default True
 105        If `True`, create a copy before comparing and modifying the dataframes.
 106        Setting to `False` may mutate the DataFrames.
 107
 108    dtypes: Optional[Dict[str, Any]], default None
 109        Optionally specify the datatypes of the dataframe.
 110
 111    include_unchanged_columns: bool, default False
 112        If `True`, include columns which haven't changed on rows which have changed.
 113
 114    coerce_mixed_numerics: bool, default True
 115        If `True`, cast mixed integer and float columns between the old and new dataframes into
 116        numeric values (`decimal.Decimal`).
 117
 118    debug: bool, default False
 119        Verbosity toggle.
 120
 121    Returns
 122    -------
 123    A pandas dataframe of the new, unseen rows in `new_df`.
 124
 125    Examples
 126    --------
 127    ```python
 128    >>> import pandas as pd
 129    >>> df1 = pd.DataFrame({'a': [1,2]})
 130    >>> df2 = pd.DataFrame({'a': [2,3]})
 131    >>> filter_unseen_df(df1, df2)
 132       a
 133    0  3
 134
 135    ```
 136
 137    """
 138    if old_df is None:
 139        return new_df
 140
 141    if safe_copy:
 142        old_df = old_df.copy()
 143        new_df = new_df.copy()
 144
 145    import json
 146    import functools
 147    import traceback
 148    from meerschaum.utils.warnings import warn
 149    from meerschaum.utils.packages import import_pandas, attempt_import
 150    from meerschaum.utils.dtypes import (
 151        to_pandas_dtype,
 152        are_dtypes_equal,
 153        attempt_cast_to_numeric,
 154        attempt_cast_to_uuid,
 155        attempt_cast_to_bytes,
 156        coerce_timezone,
 157        serialize_decimal,
 158    )
 159    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
 160    pd = import_pandas(debug=debug)
 161    is_dask = 'dask' in new_df.__module__
 162    if is_dask:
 163        pandas = attempt_import('pandas')
 164        _ = attempt_import('partd', lazy=False)
 165        dd = attempt_import('dask.dataframe')
 166        merge = dd.merge
 167        NA = pandas.NA
 168    else:
 169        merge = pd.merge
 170        NA = pd.NA
 171
 172    new_df_dtypes = dict(new_df.dtypes)
 173    old_df_dtypes = dict(old_df.dtypes)
 174
 175    same_cols = set(new_df.columns) == set(old_df.columns)
 176    if not same_cols:
 177        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
 178        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)
 179
 180        new_types_missing_from_old = {
 181            col: typ
 182            for col, typ in new_df_dtypes.items()
 183            if col not in old_df_dtypes
 184        }
 185        old_types_missing_from_new = {
 186            col: typ
 187            for col, typ in old_df_dtypes.items()
 188            if col not in new_df_dtypes
 189        }
 190        old_df_dtypes.update(new_types_missing_from_old)
 191        new_df_dtypes.update(old_types_missing_from_new)
 192
 193    ### Edge case: two empty lists cast to DFs.
 194    elif len(new_df.columns) == 0:
 195        return new_df
 196
 197    try:
 198        ### Order matters when checking equality.
 199        new_df = new_df[old_df.columns]
 200
 201    except Exception as e:
 202        warn(
 203            "Was not able to cast old columns onto new DataFrame. " +
 204            f"Are both DataFrames the same shape? Error:\n{e}",
 205            stacklevel=3,
 206        )
 207        return new_df[list(new_df_dtypes.keys())]
 208
 209    ### assume the old_df knows what it's doing, even if it's technically wrong.
 210    if dtypes is None:
 211        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}
 212
 213    dtypes = {
 214        col: to_pandas_dtype(typ)
 215        for col, typ in dtypes.items()
 216        if col in new_df_dtypes and col in old_df_dtypes
 217    }
 218    for col, typ in new_df_dtypes.items():
 219        if col not in dtypes:
 220            dtypes[col] = typ
 221
 222    numeric_cols_precisions_scales = {
 223        col: get_numeric_precision_scale(None, typ)
 224        for col, typ in dtypes.items()
 225        if col and typ and typ.startswith('numeric')
 226    }
 227
 228    dt_dtypes = {
 229        col: typ
 230        for col, typ in dtypes.items()
 231        if are_dtypes_equal(typ, 'datetime')
 232    }
 233    non_dt_dtypes = {
 234        col: typ
 235        for col, typ in dtypes.items()
 236        if col not in dt_dtypes
 237    }
 238
 239    cast_non_dt_cols = True
 240    try:
 241        new_df = new_df.astype(non_dt_dtypes)
 242        cast_non_dt_cols = False
 243    except Exception as e:
 244        warn(
 245            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
 246        )
 247
 248    cast_dt_cols = True
 249    try:
 250        for col, typ in dt_dtypes.items():
 251            strip_utc = (
 252                (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
 253            )
 254            if col in old_df.columns:
 255                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
 256            if col in new_df.columns:
 257                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
 258        cast_dt_cols = False
 259    except Exception as e:
 260        warn(f"Could not cast datetime columns:\n{e}")
 261
 262    cast_cols = cast_dt_cols or cast_non_dt_cols
 263
 264    new_numeric_cols_existing = get_numeric_cols(new_df)
 265    old_numeric_cols = get_numeric_cols(old_df)
 266    for col, typ in {k: v for k, v in dtypes.items()}.items():
 267        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
 268            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
 269            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
 270            new_is_numeric = col in new_numeric_cols_existing
 271            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
 272            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
 273            old_is_numeric = col in old_numeric_cols
 274
 275            if (
 276                coerce_mixed_numerics
 277                and
 278                (new_is_float or new_is_int or new_is_numeric)
 279                and
 280                (old_is_float or old_is_int or old_is_numeric)
 281            ):
 282                dtypes[col] = attempt_cast_to_numeric
 283                cast_cols = True
 284                continue
 285
 286            ### Fallback to object if the types don't match.
 287            warn(
 288                f"Detected different types for '{col}' "
 289                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
 290                + "falling back to 'object'..."
 291            )
 292            dtypes[col] = 'object'
 293            cast_cols = True
 294
 295    if cast_cols:
 296        for col, dtype in dtypes.items():
 297            if col in new_df.columns:
 298                try:
 299                    new_df[col] = (
 300                        new_df[col].astype(dtype)
 301                        if not callable(dtype)
 302                        else new_df[col].apply(dtype)
 303                    )
 304                except Exception as e:
 305                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")
 306
 307    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
 308    new_json_cols = get_json_cols(new_df)
 309    old_json_cols = get_json_cols(old_df)
 310    json_cols = set(new_json_cols + old_json_cols)
 311    for json_col in old_json_cols:
 312        old_df[json_col] = old_df[json_col].apply(serializer)
 313    for json_col in new_json_cols:
 314        new_df[json_col] = new_df[json_col].apply(serializer)
 315
 316    new_numeric_cols = get_numeric_cols(new_df)
 317    numeric_cols = set(new_numeric_cols + old_numeric_cols)
 318    for numeric_col in old_numeric_cols:
 319        old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
 320    for numeric_col in new_numeric_cols:
 321        new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)
 322
 323    old_dt_cols = [
 324        col
 325        for col, typ in old_df.dtypes.items()
 326        if are_dtypes_equal(str(typ), 'datetime')
 327    ]
 328    for col in old_dt_cols:
 329        strip_utc = (
 330            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
 331        )
 332        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
 333
 334    new_dt_cols = [
 335        col
 336        for col, typ in new_df.dtypes.items()
 337        if are_dtypes_equal(str(typ), 'datetime')
 338    ]
 339    for col in new_dt_cols:
 340        strip_utc = (
 341            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
 342        )
 343        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
 344
 345    old_uuid_cols = get_uuid_cols(old_df)
 346    new_uuid_cols = get_uuid_cols(new_df)
 347    uuid_cols = set(new_uuid_cols + old_uuid_cols)
 348
 349    old_bytes_cols = get_bytes_cols(old_df)
 350    new_bytes_cols = get_bytes_cols(new_df)
 351    bytes_cols = set(new_bytes_cols + old_bytes_cols)
 352
 353    joined_df = merge(
 354        new_df.infer_objects(copy=False).fillna(NA),
 355        old_df.infer_objects(copy=False).fillna(NA),
 356        how='left',
 357        on=None,
 358        indicator=True,
 359    )
 360    changed_rows_mask = (joined_df['_merge'] == 'left_only')
 361    new_cols = list(new_df_dtypes)
 362    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)
 363
 364    for json_col in json_cols:
 365        if json_col not in delta_df.columns:
 366            continue
 367        try:
 368            delta_df[json_col] = delta_df[json_col].apply(json.loads)
 369        except Exception:
 370            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")
 371
 372    for numeric_col in numeric_cols:
 373        if numeric_col not in delta_df.columns:
 374            continue
 375        try:
 376            delta_df[numeric_col] = delta_df[numeric_col].apply(
 377                functools.partial(
 378                    attempt_cast_to_numeric,
 379                    quantize=True,
 380                    precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
 381                    scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
 382                )
 383            )
 384        except Exception:
 385            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")
 386
 387    for uuid_col in uuid_cols:
 388        if uuid_col not in delta_df.columns:
 389            continue
 390        try:
 391            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
 392        except Exception:
 393            warn(f"Unable to parse numeric column '{uuid_col}':\n{traceback.format_exc()}")
 394
 395    for bytes_col in bytes_cols:
 396        if bytes_col not in delta_df.columns:
 397            continue
 398        try:
 399            delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
 400        except Exception:
 401            warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")
 402
 403    return delta_df
 404
 405
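Beyond the docstring example, `filter_unseen_df` also surfaces rows whose values changed between syncs. A minimal sketch with hypothetical values:

```python
import pandas as pd
from meerschaum.utils.dataframe import filter_unseen_df

old = pd.DataFrame({'id': [1, 2], 'val': ['a', 'b']})
new = pd.DataFrame({'id': [1, 2], 'val': ['a', 'B']})

### Row (1, 'a') already exists in `old`, so only the changed row is returned.
filter_unseen_df(old, new)
#    id val
# 0   2   B
```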
 406def parse_df_datetimes(
 407    df: 'pd.DataFrame',
 408    ignore_cols: Optional[Iterable[str]] = None,
 409    strip_timezone: bool = False,
 410    chunksize: Optional[int] = None,
 411    dtype_backend: str = 'numpy_nullable',
 412    ignore_all: bool = False,
 413    debug: bool = False,
 414) -> 'pd.DataFrame':
 415    """
 416    Parse a pandas DataFrame for datetime columns and cast as datetimes.
 417
 418    Parameters
 419    ----------
 420    df: pd.DataFrame
 421        The pandas DataFrame to parse.
 422
 423    ignore_cols: Optional[Iterable[str]], default None
 424        If provided, do not attempt to coerce these columns as datetimes.
 425
 426    strip_timezone: bool, default False
 427        If `True`, remove the UTC `tzinfo` property.
 428
 429    chunksize: Optional[int], default None
 430        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
 431
 432    dtype_backend: str, default 'numpy_nullable'
 433        If `df` is not a DataFrame and new one needs to be constructed,
 434        use this as the datatypes backend.
 435        Accepted values are 'numpy_nullable' and 'pyarrow'.
 436
 437    ignore_all: bool, default False
 438        If `True`, do not attempt to cast any columns to datetimes.
 439
 440    debug: bool, default False
 441        Verbosity toggle.
 442
 443    Returns
 444    -------
 445    A new pandas DataFrame with the determined datetime columns
 446    (usually ISO strings) cast as datetimes.
 447
 448    Examples
 449    --------
 450    ```python
 451    >>> import pandas as pd
 452    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
 453    >>> df.dtypes
 454    a    object
 455    dtype: object
 456    >>> df = parse_df_datetimes(df)
 457    >>> df.dtypes
 458    a    datetime64[ns]
 459    dtype: object
 460
 461    ```
 462
 463    """
 464    from meerschaum.utils.packages import import_pandas, attempt_import
 465    from meerschaum.utils.debug import dprint
 466    from meerschaum.utils.warnings import warn
 467    from meerschaum.utils.misc import items_str
 468    from meerschaum.utils.dtypes import to_datetime
 469    import traceback
 470    pd = import_pandas()
 471    pandas = attempt_import('pandas')
 472    pd_name = pd.__name__
 473    using_dask = 'dask' in pd_name
 474    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
 475    dask_dataframe = None
 476    if using_dask or df_is_dask:
 477        npartitions = chunksize_to_npartitions(chunksize)
 478        dask_dataframe = attempt_import('dask.dataframe')
 479
 480    ### if df is a dict, build DataFrame
 481    if isinstance(df, pandas.DataFrame):
 482        pdf = df
 483    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
 484        pdf = get_first_valid_dask_partition(df)
 485    else:
 486        if debug:
 487            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")
 488
 489        if using_dask:
 490            if isinstance(df, list):
 491                keys = set()
 492                for doc in df:
 493                    for key in doc:
 494                        keys.add(key)
 495                df = pd.DataFrame.from_dict(
 496                    {
 497                        k: [
 498                            doc.get(k, None)
 499                            for doc in df
 500                        ] for k in keys
 501                    },
 502                    npartitions=npartitions,
 503                )
 504            elif isinstance(df, dict):
 505                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
 506            elif 'pandas.core.frame.DataFrame' in str(type(df)):
 507                df = pd.from_pandas(df, npartitions=npartitions)
 508            else:
 509                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
 510            pandas = attempt_import('pandas')
 511            pdf = get_first_valid_dask_partition(df)
 512
 513        else:
 514            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
 515            pdf = df
 516
 517    ### skip parsing if DataFrame is empty
 518    if len(pdf) == 0:
 519        if debug:
 520            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
 521        return df
 522
 523    ignore_cols = set(
 524        (ignore_cols or []) + [
 525            col
 526            for col, dtype in pdf.dtypes.items() 
 527            if 'datetime' in str(dtype)
 528        ]
 529    )
 530    cols_to_inspect = [
 531        col
 532        for col in pdf.columns
 533        if col not in ignore_cols
 534    ] if not ignore_all else []
 535
 536    if len(cols_to_inspect) == 0:
 537        if debug:
 538            dprint("All columns are ignored, skipping datetime detection...")
 539        return df.infer_objects(copy=False).fillna(pandas.NA)
 540
 541    ### apply regex to columns to determine which are ISO datetimes
 542    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
 543    dt_mask = pdf[cols_to_inspect].astype(str).apply(
 544        lambda s: s.str.match(iso_dt_regex).all()
 545    )
 546
 547    ### list of datetime column names
 548    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
 549    if not datetime_cols:
 550        if debug:
 551            dprint("No columns detected as datetimes, returning...")
 552        return df.infer_objects(copy=False).fillna(pandas.NA)
 553
 554    if debug:
 555        dprint("Converting columns to datetimes: " + str(datetime_cols))
 556
 557    try:
 558        if not using_dask:
 559            df[datetime_cols] = df[datetime_cols].apply(to_datetime)
 560        else:
 561            df[datetime_cols] = df[datetime_cols].apply(
 562                to_datetime,
 563                utc=True,
 564                axis=1,
 565                meta={
 566                    col: 'datetime64[ns, UTC]'
 567                    for col in datetime_cols
 568                }
 569            )
 570    except Exception:
 571        warn(
 572            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
 573            + f"{traceback.format_exc()}"
 574        )
 575
 576    if strip_timezone:
 577        for dt in datetime_cols:
 578            try:
 579                df[dt] = df[dt].dt.tz_localize(None)
 580            except Exception:
 581                warn(
 582                    f"Unable to convert column '{dt}' to naive datetime:\n"
 583                    + f"{traceback.format_exc()}"
 584                )
 585
 586    return df.fillna(pandas.NA)
 587
 588
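The docstring example shows the default detection; `ignore_cols` keeps ISO-looking strings untouched. A quick sketch (column names are hypothetical):

```python
import pandas as pd
from meerschaum.utils.dataframe import parse_df_datetimes

df = pd.DataFrame({
    'ts': ['2024-01-01 00:00:00'],
    'raw': ['2024-01-01 00:00:00'],
})

### 'ts' is detected and cast; 'raw' is skipped and remains an object column.
parsed = parse_df_datetimes(df, ignore_cols=['raw'])
# parsed['ts'] is a datetime64 column; parsed['raw'] stays as strings
```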
 589def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
 590    """
 591    Get the columns which contain unhashable objects from a Pandas DataFrame.
 592
 593    Parameters
 594    ----------
 595    df: pd.DataFrame
 596        The DataFrame which may contain unhashable objects.
 597
 598    Returns
 599    -------
 600    A list of columns.
 601    """
 602    if df is None:
 603        return []
 604    if len(df) == 0:
 605        return []
 606
 607    is_dask = 'dask' in df.__module__
 608    if is_dask:
 609        from meerschaum.utils.packages import attempt_import
 610        pandas = attempt_import('pandas')
 611        df = pandas.DataFrame(get_first_valid_dask_partition(df))
 612    return [
 613        col for col, val in df.iloc[0].items()
 614        if not isinstance(val, Hashable)
 615    ]
 616
 617
 618def get_json_cols(df: 'pd.DataFrame') -> List[str]:
 619    """
 620    Get the columns which contain JSON-like (unhashable) objects from a Pandas DataFrame.
 621
 622    Parameters
 623    ----------
 624    df: pd.DataFrame
 625        The DataFrame which may contain JSON-like objects (e.g. dicts or lists).
 626
 627    Returns
 628    -------
 629    A list of columns to be encoded as JSON.
 630    """
 631    if df is None:
 632        return []
 633
 634    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
 635    if is_dask:
 636        df = get_first_valid_dask_partition(df)
 637
 638    if len(df) == 0:
 639        return []
 640
 641    cols_indices = {
 642        col: df[col].first_valid_index()
 643        for col in df.columns
 644    }
 645    return [
 646        col
 647        for col, ix in cols_indices.items()
 648        if (
 649            ix is not None
 650            and
 651            not isinstance(df.loc[ix][col], Hashable)
 652        )
 653    ]
 654
 655
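Like `get_unhashable_cols`, this samples the first valid value per column, so dict- or list-valued columns are flagged for JSON encoding. A minimal sketch:

```python
import pandas as pd
from meerschaum.utils.dataframe import get_json_cols

df = pd.DataFrame({'meta': [{'k': 1}, None], 'num': [1, 2]})
get_json_cols(df)
# ['meta']  (dicts are unhashable, so 'meta' is flagged)
```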
 656def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
 657    """
 658    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.
 659
 660    Parameters
 661    ----------
 662    df: pd.DataFrame
 663        The DataFrame which may contain decimal objects.
 664
 665    Returns
 666    -------
 667    A list of columns to treat as numerics.
 668    """
 669    if df is None:
 670        return []
 671    from decimal import Decimal
 672    is_dask = 'dask' in df.__module__
 673    if is_dask:
 674        df = get_first_valid_dask_partition(df)
 675
 676    if len(df) == 0:
 677        return []
 678
 679    cols_indices = {
 680        col: df[col].first_valid_index()
 681        for col in df.columns
 682    }
 683    return [
 684        col
 685        for col, ix in cols_indices.items()
 686        if (
 687            ix is not None
 688            and
 689            isinstance(df.loc[ix][col], Decimal)
 690        )
 691    ]
 692
 693
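A quick sketch of the `Decimal` detection (illustrative values):

```python
from decimal import Decimal
import pandas as pd
from meerschaum.utils.dataframe import get_numeric_cols

df = pd.DataFrame({'price': [Decimal('1.10')], 'qty': [1]})
get_numeric_cols(df)
# ['price']  ('qty' is a plain integer column, not Decimal)
```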
 694def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
 695    """
 696    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.
 697
 698    Parameters
 699    ----------
 700    df: pd.DataFrame
 701        The DataFrame which may contain UUID objects.
 702
 703    Returns
 704    -------
 705    A list of columns to treat as UUIDs.
 706    """
 707    if df is None:
 708        return []
 709    from uuid import UUID
 710    is_dask = 'dask' in df.__module__
 711    if is_dask:
 712        df = get_first_valid_dask_partition(df)
 713
 714    if len(df) == 0:
 715        return []
 716
 717    cols_indices = {
 718        col: df[col].first_valid_index()
 719        for col in df.columns
 720    }
 721    return [
 722        col
 723        for col, ix in cols_indices.items()
 724        if (
 725            ix is not None
 726            and
 727            isinstance(df.loc[ix][col], UUID)
 728        )
 729    ]
 730
 731
 732def get_datetime_cols(
 733    df: 'pd.DataFrame',
 734    timezone_aware: bool = True,
 735    timezone_naive: bool = True,
 736) -> List[str]:
 737    """
 738    Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame.
 739
 740    Parameters
 741    ----------
 742    df: pd.DataFrame
 743        The DataFrame which may contain `datetime` or `Timestamp` objects.
 744
 745    timezone_aware: bool, default True
 746        If `True`, include timezone-aware datetime columns.
 747
 748    timezone_naive: bool, default True
 749        If `True`, include timezone-naive datetime columns.
 750
 751    Returns
 752    -------
 753    A list of columns to treat as datetimes.
 754    """
 755    if not timezone_aware and not timezone_naive:
 756        raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.")
 757
 758    if df is None:
 759        return []
 760
 761    from datetime import datetime
 762    from meerschaum.utils.dtypes import are_dtypes_equal
 763    is_dask = 'dask' in df.__module__
 764    if is_dask:
 765        df = get_first_valid_dask_partition(df)
 766
 767    known_dt_cols = [
 768        col
 769        for col, typ in df.dtypes.items()
 770        if are_dtypes_equal('datetime', str(typ))
 771    ]
 772
 773    if len(df) == 0:
 774        return known_dt_cols
 775
 776    cols_indices = {
 777        col: df[col].first_valid_index()
 778        for col in df.columns
 779        if col not in known_dt_cols
 780    }
 781    pydt_cols = [
 782        col
 783        for col, ix in cols_indices.items()
 784        if (
 785            ix is not None
 786            and
 787            isinstance(df.loc[ix][col], datetime)
 788        )
 789    ]
 790    dt_cols_set = set(known_dt_cols + pydt_cols)
 791    all_dt_cols = [
 792        col
 793        for col in df.columns
 794        if col in dt_cols_set
 795    ]
 796    if timezone_aware and timezone_naive:
 797        return all_dt_cols
 798
 799    known_timezone_aware_dt_cols = [
 800        col
 801        for col in known_dt_cols
 802        if getattr(df[col].dt, 'tz', None) is not None
 803    ]
 804    timezone_aware_pydt_cols = [
 805        col
 806        for col in pydt_cols
 807        if df.loc[cols_indices[col]][col].tzinfo is not None
 808    ]
 809    timezone_aware_dt_cols_set = set(known_timezone_aware_dt_cols + timezone_aware_pydt_cols)
 810    if timezone_aware:
 811        return [
 812            col
 813            for col in all_dt_cols
 814            if col in timezone_aware_dt_cols_set
 815        ]
 816
 817    return [
 818        col
 819        for col in all_dt_cols
 820        if col not in timezone_aware_dt_cols_set
 821    ]
 822
 823
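With both flags at their defaults, every datetime column is returned; the flags narrow the result. A sketch with hypothetical columns:

```python
from datetime import datetime, timezone
import pandas as pd
from meerschaum.utils.dataframe import get_datetime_cols

df = pd.DataFrame({
    'utc': [datetime(2024, 1, 1, tzinfo=timezone.utc)],
    'naive': [datetime(2024, 1, 1)],
})
get_datetime_cols(df)                         # ['utc', 'naive']
get_datetime_cols(df, timezone_naive=False)   # ['utc']
get_datetime_cols(df, timezone_aware=False)   # ['naive']
```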
 824def get_bytes_cols(df: 'pd.DataFrame') -> List[str]:
 825    """
 826    Get the columns which contain bytes strings from a Pandas DataFrame.
 827
 828    Parameters
 829    ----------
 830    df: pd.DataFrame
 831        The DataFrame which may contain bytes strings.
 832
 833    Returns
 834    -------
 835    A list of columns to treat as bytes.
 836    """
 837    if df is None:
 838        return []
 839    is_dask = 'dask' in df.__module__
 840    if is_dask:
 841        df = get_first_valid_dask_partition(df)
 842
 843    if len(df) == 0:
 844        return []
 845
 846    cols_indices = {
 847        col: df[col].first_valid_index()
 848        for col in df.columns
 849    }
 850    return [
 851        col
 852        for col, ix in cols_indices.items()
 853        if (
 854            ix is not None
 855            and
 856            isinstance(df.loc[ix][col], bytes)
 857        )
 858    ]
 859
 860
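The UUID and bytes detectors follow the same first-valid-value pattern. A combined sketch (illustrative values):

```python
from uuid import uuid4
import pandas as pd
from meerschaum.utils.dataframe import get_uuid_cols, get_bytes_cols

df = pd.DataFrame({'guid': [uuid4()], 'blob': [b'\x00\x01'], 'name': ['a']})
get_uuid_cols(df)   # ['guid']
get_bytes_cols(df)  # ['blob']
```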
 861def enforce_dtypes(
 862    df: 'pd.DataFrame',
 863    dtypes: Dict[str, str],
 864    safe_copy: bool = True,
 865    coerce_numeric: bool = True,
 866    coerce_timezone: bool = True,
 867    strip_timezone: bool = False,
 868    debug: bool = False,
 869) -> 'pd.DataFrame':
 870    """
 871    Enforce the `dtypes` dictionary on a DataFrame.
 872
 873    Parameters
 874    ----------
 875    df: pd.DataFrame
 876        The DataFrame on which to enforce dtypes.
 877
 878    dtypes: Dict[str, str]
 879        The data types to attempt to enforce on the DataFrame.
 880
 881    safe_copy: bool, default True
 882        If `True`, create a copy before comparing and modifying the dataframes.
 883        Setting to `False` may mutate the DataFrames.
 884        See `meerschaum.utils.dataframe.filter_unseen_df`.
 885
 886    coerce_numeric: bool, default True
 887        If `True`, convert float and int collisions to numeric.
 888
 889    coerce_timezone: bool, default True
 890        If `True`, convert datetimes to UTC.
 891
 892    strip_timezone: bool, default False
 893        If `coerce_timezone` and `strip_timezone` are `True`,
 894        remove timezone information from datetimes.
 895
 896    debug: bool, default False
 897        Verbosity toggle.
 898
 899    Returns
 900    -------
 901    The Pandas DataFrame with the types enforced.
 902    """
 903    import json
 904    import functools
 905    from meerschaum.utils.debug import dprint
 906    from meerschaum.utils.formatting import pprint
 907    from meerschaum.utils.dtypes import (
 908        are_dtypes_equal,
 909        to_pandas_dtype,
 910        is_dtype_numeric,
 911        attempt_cast_to_numeric,
 912        attempt_cast_to_uuid,
 913        attempt_cast_to_bytes,
 914        coerce_timezone as _coerce_timezone,
 915    )
 916    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
 917    pandas = mrsm.attempt_import('pandas')
 918    is_dask = 'dask' in df.__module__
 919    if safe_copy:
 920        df = df.copy()
 921    if len(df.columns) == 0:
 922        if debug:
 923            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
 924        return df
 925
 926    pipe_pandas_dtypes = {
 927        col: to_pandas_dtype(typ)
 928        for col, typ in dtypes.items()
 929    }
 930    json_cols = [
 931        col
 932        for col, typ in dtypes.items()
 933        if typ == 'json'
 934    ]
 935    numeric_cols = [
 936        col
 937        for col, typ in dtypes.items()
 938        if typ.startswith('numeric')
 939    ]
 940    uuid_cols = [
 941        col
 942        for col, typ in dtypes.items()
 943        if typ == 'uuid'
 944    ]
 945    bytes_cols = [
 946        col
 947        for col, typ in dtypes.items()
 948        if typ == 'bytes'
 949    ]
 950    datetime_cols = [
 951        col
 952        for col, typ in dtypes.items()
 953        if are_dtypes_equal(typ, 'datetime')
 954    ]
 955    df_numeric_cols = get_numeric_cols(df)
 956    if debug:
 957        dprint("Desired data types:")
 958        pprint(dtypes)
 959        dprint("Data types for incoming DataFrame:")
 960        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})
 961
 962    if json_cols and len(df) > 0:
 963        if debug:
 964            dprint(f"Checking columns for JSON encoding: {json_cols}")
 965        for col in json_cols:
 966            if col in df.columns:
 967                try:
 968                    df[col] = df[col].apply(
 969                        (
 970                            lambda x: (
 971                                json.loads(x)
 972                                if isinstance(x, str)
 973                                else x
 974                            )
 975                        )
 976                    )
 977                except Exception as e:
 978                    if debug:
 979                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")
 980
 981    if numeric_cols:
 982        if debug:
 983            dprint(f"Checking for numerics: {numeric_cols}")
 984        for col in numeric_cols:
 985            precision, scale = get_numeric_precision_scale(None, dtypes.get(col, ''))
 986            if col in df.columns:
 987                try:
 988                    df[col] = df[col].apply(
 989                        functools.partial(
 990                            attempt_cast_to_numeric,
 991                            quantize=True,
 992                            precision=precision,
 993                            scale=scale,
 994                        )
 995                    )
 996                except Exception as e:
 997                    if debug:
 998                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")
 999
1000    if uuid_cols:
1001        if debug:
1002            dprint(f"Checking for UUIDs: {uuid_cols}")
1003        for col in uuid_cols:
1004            if col in df.columns:
1005                try:
1006                    df[col] = df[col].apply(attempt_cast_to_uuid)
1007                except Exception as e:
1008                    if debug:
1009                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")
1010
1011    if bytes_cols:
1012        if debug:
1013            dprint(f"Checking for bytes: {bytes_cols}")
1014        for col in bytes_cols:
1015            if col in df.columns:
1016                try:
1017                    df[col] = df[col].apply(attempt_cast_to_bytes)
1018                except Exception as e:
1019                    if debug:
1020                        dprint(f"Unable to parse column '{col}' as bytes:\n{e}")
1021
1022    if datetime_cols and coerce_timezone:
1023        if debug:
1024            dprint(f"Checking for datetime conversion: {datetime_cols}")
1025        for col in datetime_cols:
1026            if col in df.columns:
1027                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)
1028
1029    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
1030    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1031        if debug:
1032            dprint("Data types match. Exiting enforcement...")
1033        return df
1034
1035    common_dtypes = {}
1036    common_diff_dtypes = {}
1037    for col, typ in pipe_pandas_dtypes.items():
1038        if col in df_dtypes:
1039            common_dtypes[col] = typ
1040            if not are_dtypes_equal(typ, df_dtypes[col]):
1041                common_diff_dtypes[col] = df_dtypes[col]
1042
1043    if debug:
1044        dprint("Common columns with different dtypes:")
1045        pprint(common_diff_dtypes)
1046
1047    detected_dt_cols = {}
1048    for col, typ in common_diff_dtypes.items():
1049        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
1050            df_dtypes[col] = typ
1051            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
1052    for col in detected_dt_cols:
1053        del common_diff_dtypes[col]
1054
1055    if debug:
1056        dprint("Common columns with different dtypes (after dates):")
1057        pprint(common_diff_dtypes)
1058
1059    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1060        if debug:
1061            dprint(
1062                "The incoming DataFrame has mostly the same types, skipping enforcement."
1063                + "The only detected difference was in the following datetime columns."
1064            )
1065            pprint(detected_dt_cols)
1066        return df
1067
1068    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
1069        previous_typ = common_dtypes[col]
1070        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
1071        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
1072        explicitly_numeric = dtypes.get(col, 'numeric').startswith('numeric')
1073        cast_to_numeric = (
1074            explicitly_numeric
1075            or col in df_numeric_cols
1076            or (mixed_numeric_types and not explicitly_float)
1077        ) and coerce_numeric
1078        if cast_to_numeric:
1079            common_dtypes[col] = attempt_cast_to_numeric
1080            common_diff_dtypes[col] = attempt_cast_to_numeric
1081
1082    for d in common_diff_dtypes:
1083        t = common_dtypes[d]
1084        if debug:
1085            dprint(f"Casting column {d} to dtype {t}.")
1086        try:
1087            df[d] = (
1088                df[d].apply(t)
1089                if callable(t)
1090                else df[d].astype(t)
1091            )
1092        except Exception as e:
1093            if debug:
1094                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
1095            if 'int' in str(t).lower():
1096                try:
1097                    df[d] = df[d].astype('float64').astype(t)
1098                except Exception:
1099                    if debug:
1100                        dprint(f"Was unable to convert to float then {t}.")
1101    return df
1102
1103
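A minimal sketch of enforcement, with hypothetical columns: a string column coerced to nullable integers and a `json` column parsed from serialized strings:

```python
import pandas as pd
from meerschaum.utils.dataframe import enforce_dtypes

df = pd.DataFrame({'a': ['1', '2'], 'b': ['{"x": 1}', '{"y": 2}']})
df = enforce_dtypes(df, {'a': 'Int64', 'b': 'json'})
# df['a'].dtype -> Int64; df['b'] now holds parsed dicts
```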
1104def get_datetime_bound_from_df(
1105    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1106    datetime_column: str,
1107    minimum: bool = True,
1108) -> Union[int, datetime, None]:
1109    """
1110    Return the minimum or maximum datetime (or integer) from a DataFrame.
1111
1112    Parameters
1113    ----------
1114    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1115        The DataFrame, list, or dict which contains the range axis.
1116
1117    datetime_column: str
1118        The name of the datetime (or int) column.
1119
1120    minimum: bool, default True
1121        Whether to return the minimum (default) or maximum value.
1122
1123    Returns
1124    -------
1125    The minimum or maximum datetime value in the dataframe, or `None`.
1126    """
1127    from meerschaum.utils.dtypes import to_datetime, value_is_null
1128
1129    if df is None:
1130        return None
1131    if not datetime_column:
1132        return None
1133
1134    def compare(a, b):
1135        if a is None:
1136            return b
1137        if b is None:
1138            return a
1139        if minimum:
1140            return a if a < b else b
1141        return a if a > b else b
1142
1143    if isinstance(df, list):
1144        if len(df) == 0:
1145            return None
1146        best_yet = df[0].get(datetime_column, None)
1147        for doc in df:
1148            val = doc.get(datetime_column, None)
1149            best_yet = compare(best_yet, val)
1150        return best_yet
1151
1152    if isinstance(df, dict):
1153        if datetime_column not in df:
1154            return None
1155        best_yet = df[datetime_column][0]
1156        for val in df[datetime_column]:
1157            best_yet = compare(best_yet, val)
1158        return best_yet
1159
1160    if 'DataFrame' in str(type(df)):
1161        from meerschaum.utils.dtypes import are_dtypes_equal
1162        pandas = mrsm.attempt_import('pandas')
1163        is_dask = 'dask' in df.__module__
1164
1165        if datetime_column not in df.columns:
1166            return None
1167
1168        try:
1169            dt_val = (
1170                df[datetime_column].min(skipna=True)
1171                if minimum
1172                else df[datetime_column].max(skipna=True)
1173            )
1174        except Exception:
1175            dt_val = pandas.NA
1176        if is_dask and dt_val is not None and dt_val is not pandas.NA:
1177            dt_val = dt_val.compute()
1178
1179        return (
1180            to_datetime(dt_val, as_pydatetime=True)
1181            if are_dtypes_equal(str(type(dt_val)), 'datetime')
1182            else (dt_val if not value_is_null(dt_val) else None)
1183        )
1184
1185    return None
1186
1187
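Because lists of documents are accepted alongside DataFrames, the bound can be computed without building a frame. A quick sketch:

```python
from datetime import datetime
from meerschaum.utils.dataframe import get_datetime_bound_from_df

docs = [{'dt': datetime(2024, 1, 1)}, {'dt': datetime(2024, 1, 2)}]
get_datetime_bound_from_df(docs, 'dt')                 # datetime(2024, 1, 1)
get_datetime_bound_from_df(docs, 'dt', minimum=False)  # datetime(2024, 1, 2)
```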
1188def get_unique_index_values(
1189    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1190    indices: List[str],
1191) -> Dict[str, List[Any]]:
1192    """
1193    Return a dictionary of the unique index values in a DataFrame.
1194
1195    Parameters
1196    ----------
1197    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1198        The dataframe (or list or dict) which contains index values.
1199
1200    indices: List[str]
1201        The list of index columns.
1202
1203    Returns
1204    -------
1205    A dictionary mapping indices to unique values.
1206    """
1207    if df is None:
1208        return {}
1209    if 'dataframe' in str(type(df)).lower():
1210        pandas = mrsm.attempt_import('pandas')
1211        return {
1212            col: list({
1213                (val if val is not pandas.NA else None)
1214                for val in df[col].unique()
1215            })
1216            for col in indices
1217            if col in df.columns
1218        }
1219
1220    unique_indices = defaultdict(lambda: set())
1221    if isinstance(df, list):
1222        for doc in df:
1223            for index in indices:
1224                if index in doc:
1225                    unique_indices[index].add(doc[index])
1226
1227    elif isinstance(df, dict):
1228        for index in indices:
1229            if index in df:
1230                unique_indices[index] = unique_indices[index].union(set(df[index]))
1231
1232    return {key: list(val) for key, val in unique_indices.items()}
1233
1234
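A short sketch of the list-of-documents branch (note that the returned lists are built from sets, so their order is not guaranteed):

```python
from meerschaum.utils.dataframe import get_unique_index_values

docs = [
    {'id': 1, 'color': 'red'},
    {'id': 2, 'color': 'red'},
]
get_unique_index_values(docs, ['id', 'color'])
# {'id': [1, 2], 'color': ['red']}
```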
1235def df_is_chunk_generator(df: Any) -> bool:
1236    """
1237    Determine whether to treat `df` as a chunk generator.
1238
1239    Note this should only be used in a context where generators are expected,
1240    as it will return `True` for any iterable.
1241
1242    Parameters
1243    ----------
1244    df: The DataFrame or chunk generator to evaluate.
1245
1246    Returns
1247    -------
1248    A `bool` indicating whether to treat `df` as a generator.
1249    """
1250    return (
1251        not isinstance(df, (dict, list, str))
1252        and 'DataFrame' not in str(type(df))
1253        and isinstance(df, (Generator, Iterable, Iterator))
1254    )
1255
1256
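As the note above warns, any non-DataFrame iterable passes this check, so only use it where chunk generators are expected. A quick sketch:

```python
import pandas as pd
from meerschaum.utils.dataframe import df_is_chunk_generator

df_is_chunk_generator(pd.DataFrame({'a': [1]}))          # False
df_is_chunk_generator([{'a': 1}])                        # False (lists are excluded)
df_is_chunk_generator(iter([pd.DataFrame({'a': [1]})]))  # True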
1257def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
1258    """
1259    Return the Dask `npartitions` value for a given `chunksize`.
1260    """
1261    if chunksize == -1:
1262        from meerschaum.config import get_config
1263        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
1264    if chunksize is None:
1265        return 1
1266    return -1 * chunksize
1267
1268
1269def df_from_literal(
1270    pipe: Optional[mrsm.Pipe] = None,
1271    literal: str = None,
1272    debug: bool = False
1273) -> 'pd.DataFrame':
1274    """
1275    Construct a dataframe from a literal value, using the pipe's datetime and value column names.
1276
1277    Parameters
1278    ----------
1279    pipe: Optional['meerschaum.Pipe'], default None
1280        The pipe which will consume the literal value. Its datetime and value
1281        columns must be defined. The `literal` string is parsed with `ast.literal_eval`.
1282    Returns
1283    -------
1284    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
1285    and the literal as the value.
1286    """
1287    from meerschaum.utils.packages import import_pandas
1288    from meerschaum.utils.warnings import error, warn
1289    from meerschaum.utils.debug import dprint
1290
1291    if pipe is None or literal is None:
1292        error("Please provide a Pipe and a literal value")
1293    ### this will raise an error if the columns are undefined
1294    dt_name, val_name = pipe.get_columns('datetime', 'value')
1295
1296    val = literal
1297    if isinstance(literal, str):
1298        if debug:
1299            dprint(f"Received literal string: '{literal}'")
1300        import ast
1301        try:
1302            val = ast.literal_eval(literal)
1303        except Exception:
1304            warn(
1305                "Failed to parse value from string:\n" + f"{literal}" +
1306                "\n\nWill cast as a string instead."\
1307            )
1308            val = literal
1309
1310    from datetime import datetime, timezone
1311    now = datetime.now(timezone.utc).replace(tzinfo=None)
1312
1313    pd = import_pandas()
1314    return pd.DataFrame({dt_name: [now], val_name: [val]})
1315
1316
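A sketch under the assumption that an in-memory pipe with declared datetime and value columns is enough for `get_columns()` to succeed (the keys and column names here are hypothetical):

```python
import meerschaum as mrsm
from meerschaum.utils.dataframe import df_from_literal

### Hypothetical pipe; 'dt' and 'val' are declared so `get_columns()` succeeds.
pipe = mrsm.Pipe('demo', 'temperature', columns={'datetime': 'dt', 'value': 'val'})
df_from_literal(pipe, '42.1')
# one row: the current UTC timestamp under 'dt' and 42.1 under 'val'
```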
1317def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
1318    """
1319    Return the first valid Dask DataFrame partition (if possible).
1320    """
1321    pdf = None
1322    for partition in ddf.partitions:
1323        try:
1324            pdf = partition.compute()
1325        except Exception:
1326            continue
1327        if len(pdf) > 0:
1328            return pdf
1329    _ = mrsm.attempt_import('partd', lazy=False)
1330    return ddf.compute()
1331
1332
1333def query_df(
1334    df: 'pd.DataFrame',
1335    params: Optional[Dict[str, Any]] = None,
1336    begin: Union[datetime, int, None] = None,
1337    end: Union[datetime, int, None] = None,
1338    datetime_column: Optional[str] = None,
1339    select_columns: Optional[List[str]] = None,
1340    omit_columns: Optional[List[str]] = None,
1341    inplace: bool = False,
1342    reset_index: bool = False,
1343    coerce_types: bool = False,
1344    debug: bool = False,
1345) -> 'pd.DataFrame':
1346    """
1347    Query the dataframe with the params dictionary.
1348
1349    Parameters
1350    ----------
1351    df: pd.DataFrame
1352        The DataFrame to query against.
1353
1354    params: Optional[Dict[str, Any]], default None
1355        The parameters dictionary to use for the query.
1356
1357    begin: Union[datetime, int, None], default None
1358        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1359        greater than or equal to this value.
1360
1361    end: Union[datetime, int, None], default None
1362        If `end` and `datetime_column` are provided, only return rows with a timestamp
1363        less than this value.
1364
1365    datetime_column: Optional[str], default None
1366        A `datetime_column` must be provided to use `begin` and `end`.
1367
1368    select_columns: Optional[List[str]], default None
1369        If provided, only return these columns.
1370
1371    omit_columns: Optional[List[str]], default None
1372        If provided, do not include these columns in the result.
1373
1374    inplace: bool, default False
1375        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
1376
1377    reset_index: bool, default False
1378        If `True`, reset the index in the resulting DataFrame.
1379
1380    coerce_types: bool, default False
1381        If `True`, cast the dataframe and parameters as strings before querying.
1382
1383    Returns
1384    -------
1385    A Pandas DataFrame query result.
1386    """
1387
1388    def _process_select_columns(_df):
1389        if not select_columns:
1390            return
1391        for col in list(_df.columns):
1392            if col not in select_columns:
1393                del _df[col]
1394
1395    def _process_omit_columns(_df):
1396        if not omit_columns:
1397            return
1398        for col in list(_df.columns):
1399            if col in omit_columns:
1400                del _df[col]
1401
1402    if not params and not begin and not end:
1403        if not inplace:
1404            df = df.copy()
1405        _process_select_columns(df)
1406        _process_omit_columns(df)
1407        return df
1408
1409    from meerschaum.utils.debug import dprint
1410    from meerschaum.utils.misc import get_in_ex_params
1411    from meerschaum.utils.warnings import warn
1412    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
1413    dateutil_parser = mrsm.attempt_import('dateutil.parser')
1414    pandas = mrsm.attempt_import('pandas')
1415    NA = pandas.NA
1416
1417    if params:
1418        params = params.copy()
1419        for key, val in {k: v for k, v in params.items()}.items():
1420            if isinstance(val, (list, tuple)):
1421                if None in val:
1422                    val = [item for item in val if item is not None] + [NA]
1423                    params[key] = val
1424                if coerce_types:
1425                    params[key] = [str(x) for x in val]
1426            else:
1427                if value_is_null(val):
1428                    val = NA
1429                    params[key] = NA
1430                if coerce_types:
1431                    params[key] = str(val)
1432
1433    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
1434
1435    if inplace:
1436        df.fillna(NA, inplace=True)
1437    else:
1438        df = df.infer_objects().fillna(NA)
1439
1440    if isinstance(begin, str):
1441        begin = dateutil_parser.parse(begin)
1442    if isinstance(end, str):
1443        end = dateutil_parser.parse(end)
1444
1445    if begin is not None or end is not None:
1446        if not datetime_column or datetime_column not in df.columns:
1447            warn(
1448                f"The datetime column '{datetime_column}' is not present in the Dataframe, "
1449                + "ignoring begin and end...",
1450            )
1451            begin, end = None, None
1452
1453    if debug:
1454        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")
1455
1456    if datetime_column and (begin is not None or end is not None):
1457        if debug:
1458            dprint("Checking for datetime column compatability.")
1459
1460        from meerschaum.utils.dtypes import coerce_timezone
1461        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
1462        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
1463        end_is_int = are_dtypes_equal(str(type(end)), 'int')
1464
1465        if df_is_dt:
1466            df_tz = (
1467                getattr(df[datetime_column].dt, 'tz', None)
1468                if hasattr(df[datetime_column], 'dt')
1469                else None
1470            )
1471
1472            if begin_is_int:
1473                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
1474                if debug:
1475                    dprint(f"`begin` will be cast to '{begin}'.")
1476            if end_is_int:
1477                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
1478                if debug:
1479                    dprint(f"`end` will be cast to '{end}'.")
1480
1481            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
1482            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None
1483
1484    in_ex_params = get_in_ex_params(params)
1485
1486    masks = [
1487        (
1488            (df[datetime_column] >= begin)
1489            if begin is not None and datetime_column
1490            else True
1491        ) & (
1492            (df[datetime_column] < end)
1493            if end is not None and datetime_column
1494            else True
1495        )
1496    ]
1497
1498    masks.extend([
1499        (
1500            (
1501                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
1502                if in_vals
1503                else True
1504            ) & (
1505                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
1506                if ex_vals
1507                else True
1508            )
1509        )
1510        for col, (in_vals, ex_vals) in in_ex_params.items()
1511        if col in df.columns
1512    ])
1513    query_mask = masks[0]
1514    for mask in masks[1:]:
1515        query_mask = query_mask & mask
1516
1517    original_cols = df.columns
1518
1519    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
1520    ###       to allow for `<NA>` values.
1521    bool_cols = [
1522        col
1523        for col, typ in df.dtypes.items()
1524        if are_dtypes_equal(str(typ), 'bool')
1525    ]
1526    for col in bool_cols:
1527        df[col] = df[col].astype('boolean[pyarrow]')
1528
1529    if not isinstance(query_mask, bool):
1530        df['__mrsm_mask'] = (
1531            query_mask.astype('boolean[pyarrow]')
1532            if hasattr(query_mask, 'astype')
1533            else query_mask
1534        )
1535
1536        if inplace:
1537            df.where(query_mask, other=NA, inplace=True)
1538            df.dropna(how='all', inplace=True)
1539            result_df = df
1540        else:
1541            result_df = df.where(query_mask, other=NA)
1542            result_df.dropna(how='all', inplace=True)
1543
1544    else:
1545        result_df = df
1546
1547    if '__mrsm_mask' in df.columns:
1548        del df['__mrsm_mask']
1549    if '__mrsm_mask' in result_df.columns:
1550        del result_df['__mrsm_mask']
1551
1552    if reset_index:
1553        result_df.reset_index(drop=True, inplace=True)
1554
1555    result_df = enforce_dtypes(
1556        result_df,
1557        dtypes,
1558        safe_copy=False,
1559        debug=debug,
1560        coerce_numeric=False,
1561        coerce_timezone=False,
1562    )
1563
1564    if select_columns == ['*']:
1565        select_columns = None
1566
1567    if not select_columns and not omit_columns:
1568        return result_df[original_cols]
1569
1570    _process_select_columns(result_df)
1571    _process_omit_columns(result_df)
1572
1573    return result_df
1574
1575
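A minimal sketch combining `params` with a `begin` bound (hypothetical columns and values):

```python
from datetime import datetime
import pandas as pd
from meerschaum.utils.dataframe import query_df

df = pd.DataFrame({
    'dt': [datetime(2024, 1, 1), datetime(2024, 1, 2), datetime(2024, 1, 3)],
    'color': ['red', 'blue', 'red'],
})

### Keep 'red' rows with a timestamp on or after 2024-01-02.
query_df(
    df,
    params={'color': 'red'},
    begin=datetime(2024, 1, 2),
    datetime_column='dt',
    reset_index=True,
)
#           dt color
# 0 2024-01-03   red
```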
1576def to_json(
1577    df: 'pd.DataFrame',
1578    safe_copy: bool = True,
1579    orient: str = 'records',
1580    date_format: str = 'iso',
1581    date_unit: str = 'us',
1582    **kwargs: Any
1583) -> str:
1584    """
1585    Serialize the given dataframe as a JSON string.
1586
1587    Parameters
1588    ----------
1589    df: pd.DataFrame
1590        The DataFrame to be serialized.
1591
1592    safe_copy: bool, default True
1593        If `False`, modify the DataFrame inplace.
1594
1595    date_format: str, default 'iso'
1596        The default format for timestamps.
1597
1598    date_unit: str, default 'us'
1599        The precision of the timestamps.
1600
1601    Returns
1602    -------
1603    A JSON string.
1604    """
1605    from meerschaum.utils.packages import import_pandas
1606    from meerschaum.utils.dtypes import serialize_bytes, serialize_decimal
1607    pd = import_pandas()
1608    uuid_cols = get_uuid_cols(df)
1609    bytes_cols = get_bytes_cols(df)
1610    numeric_cols = get_numeric_cols(df)
1611    if safe_copy and bool(uuid_cols or bytes_cols or numeric_cols):
1612        df = df.copy()
1613    for col in uuid_cols:
1614        df[col] = df[col].astype(str)
1615    for col in bytes_cols:
1616        df[col] = df[col].apply(serialize_bytes)
1617    for col in numeric_cols:
1618        df[col] = df[col].apply(serialize_decimal)
1619    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
1620        date_format=date_format,
1621        date_unit=date_unit,
1622        orient=orient,
1623        **kwargs
1624    )
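A quick sketch of serialization with UUID and `Decimal` columns (illustrative values; the exact string formatting may vary):

```python
from decimal import Decimal
from uuid import UUID
import pandas as pd
from meerschaum.utils.dataframe import to_json

df = pd.DataFrame({
    'guid': [UUID('12345678-1234-5678-1234-567812345678')],
    'price': [Decimal('1.10')],
})
to_json(df)
# a JSON string of records, with UUIDs and decimals serialized as strings
```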
def add_missing_cols_to_df( df: pandas.core.frame.DataFrame, dtypes: Dict[str, Any]) -> pandas.core.frame.DataFrame:
26def add_missing_cols_to_df(
27    df: 'pd.DataFrame',
28    dtypes: Dict[str, Any],
29) -> 'pd.DataFrame':
30    """
31    Add columns from the dtypes dictionary as null columns to a new DataFrame.
32
33    Parameters
34    ----------
35    df: pd.DataFrame
36        The dataframe we should copy and add null columns.
37
38    dtypes:
39        The data types dictionary which may contain keys not present in `df.columns`.
40
41    Returns
42    -------
43    A new `DataFrame` with the keys from `dtypes` added as null columns.
44    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
45    NOTE: This will not ensure that dtypes are enforced!
46
47    Examples
48    --------
49    >>> import pandas as pd
50    >>> df = pd.DataFrame([{'a': 1}])
51    >>> dtypes = {'b': 'Int64'}
52    >>> add_missing_cols_to_df(df, dtypes)
53          a  b
54       0  1  <NA>
55    >>> add_missing_cols_to_df(df, dtypes).dtypes
56    a    int64
57    b    Int64
58    dtype: object
59    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
60    a    int64
61    dtype: object
62    >>> 
63    """
64    if set(df.columns) == set(dtypes):
65        return df
66
67    from meerschaum.utils.packages import attempt_import
68    from meerschaum.utils.dtypes import to_pandas_dtype
69    pandas = attempt_import('pandas')
70
71    def build_series(dtype: str):
72        return pandas.Series([], dtype=to_pandas_dtype(dtype))
73
74    assign_kwargs = {
75        str(col): build_series(str(typ))
76        for col, typ in dtypes.items()
77        if col not in df.columns
78    }
79    df_with_cols = df.assign(**assign_kwargs)
80    for col in assign_kwargs:
81        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
82    return df_with_cols

Add columns from the dtypes dictionary as null columns to a new DataFrame.

Parameters
  • df (pd.DataFrame): The dataframe we should copy and add null columns.
  • dtypes (Dict[str, Any]): The data types dictionary which may contain keys not present in df.columns.
Returns
  • A new DataFrame with the keys from dtypes added as null columns.
  • If df.dtypes is the same as dtypes, then return a reference to df.
  • NOTE: This will not ensure that dtypes are enforced!
Examples
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1}])
>>> dtypes = {'b': 'Int64'}
>>> add_missing_cols_to_df(df, dtypes)
      a  b
   0  1  <NA>
>>> add_missing_cols_to_df(df, dtypes).dtypes
a    int64
b    Int64
dtype: object
>>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
a    int64
dtype: object
>>>
def filter_unseen_df( old_df: pandas.core.frame.DataFrame, new_df: pandas.core.frame.DataFrame, safe_copy: bool = True, dtypes: Optional[Dict[str, Any]] = None, include_unchanged_columns: bool = False, coerce_mixed_numerics: bool = True, debug: bool = False) -> pandas.core.frame.DataFrame:
 85def filter_unseen_df(
 86    old_df: 'pd.DataFrame',
 87    new_df: 'pd.DataFrame',
 88    safe_copy: bool = True,
 89    dtypes: Optional[Dict[str, Any]] = None,
 90    include_unchanged_columns: bool = False,
 91    coerce_mixed_numerics: bool = True,
 92    debug: bool = False,
 93) -> 'pd.DataFrame':
 94    """
 95    Left join two DataFrames to find the newest unseen data.
 96
 97    Parameters
 98    ----------
 99    old_df: 'pd.DataFrame'
100        The original (target) dataframe. Acts as a filter on the `new_df`.
101
102    new_df: 'pd.DataFrame'
103        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
104
105    safe_copy: bool, default True
106        If `True`, create a copy before comparing and modifying the dataframes.
107        Setting to `False` may mutate the DataFrames.
108
109    dtypes: Optional[Dict[str, Any]], default None
110        Optionally specify the datatypes of the dataframe.
111
112    include_unchanged_columns: bool, default False
113        If `True`, include columns which haven't changed on rows which have changed.
114
115    coerce_mixed_numerics: bool, default True
116        If `True`, cast mixed integer and float columns between the old and new dataframes into
117        numeric values (`decimal.Decimal`).
118
119    debug: bool, default False
120        Verbosity toggle.
121
122    Returns
123    -------
124    A pandas dataframe of the new, unseen rows in `new_df`.
125
126    Examples
127    --------
128    ```python
129    >>> import pandas as pd
130    >>> df1 = pd.DataFrame({'a': [1,2]})
131    >>> df2 = pd.DataFrame({'a': [2,3]})
132    >>> filter_unseen_df(df1, df2)
133       a
134    0  3
135
136    ```
137
138    """
139    if old_df is None:
140        return new_df
141
142    if safe_copy:
143        old_df = old_df.copy()
144        new_df = new_df.copy()
145
146    import json
147    import functools
148    import traceback
149    from meerschaum.utils.warnings import warn
150    from meerschaum.utils.packages import import_pandas, attempt_import
151    from meerschaum.utils.dtypes import (
152        to_pandas_dtype,
153        are_dtypes_equal,
154        attempt_cast_to_numeric,
155        attempt_cast_to_uuid,
156        attempt_cast_to_bytes,
157        coerce_timezone,
158        serialize_decimal,
159    )
160    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
161    pd = import_pandas(debug=debug)
162    is_dask = 'dask' in new_df.__module__
163    if is_dask:
164        pandas = attempt_import('pandas')
165        _ = attempt_import('partd', lazy=False)
166        dd = attempt_import('dask.dataframe')
167        merge = dd.merge
168        NA = pandas.NA
169    else:
170        merge = pd.merge
171        NA = pd.NA
172
173    new_df_dtypes = dict(new_df.dtypes)
174    old_df_dtypes = dict(old_df.dtypes)
175
176    same_cols = set(new_df.columns) == set(old_df.columns)
177    if not same_cols:
178        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
179        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)
180
181        new_types_missing_from_old = {
182            col: typ
183            for col, typ in new_df_dtypes.items()
184            if col not in old_df_dtypes
185        }
186        old_types_missing_from_new = {
187            col: typ
188            for col, typ in old_df_dtypes.items()
189            if col not in new_df_dtypes
190        }
191        old_df_dtypes.update(new_types_missing_from_old)
192        new_df_dtypes.update(old_types_missing_from_new)
193
194    ### Edge case: two empty lists cast to DFs.
195    elif len(new_df.columns) == 0:
196        return new_df
197
198    try:
199        ### Order matters when checking equality.
200        new_df = new_df[old_df.columns]
201
202    except Exception as e:
203        warn(
204            "Was not able to cast old columns onto new DataFrame. " +
205            f"Are both DataFrames the same shape? Error:\n{e}",
206            stacklevel=3,
207        )
208        return new_df[list(new_df_dtypes.keys())]
209
210    ### assume the old_df knows what it's doing, even if it's technically wrong.
211    if dtypes is None:
212        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}
213
214    dtypes = {
215        col: to_pandas_dtype(typ)
216        for col, typ in dtypes.items()
217        if col in new_df_dtypes and col in old_df_dtypes
218    }
219    for col, typ in new_df_dtypes.items():
220        if col not in dtypes:
221            dtypes[col] = typ
222
223    numeric_cols_precisions_scales = {
224        col: get_numeric_precision_scale(None, typ)
225        for col, typ in dtypes.items()
226        if col and typ and typ.startswith('numeric')
227    }
228
229    dt_dtypes = {
230        col: typ
231        for col, typ in dtypes.items()
232        if are_dtypes_equal(typ, 'datetime')
233    }
234    non_dt_dtypes = {
235        col: typ
236        for col, typ in dtypes.items()
237        if col not in dt_dtypes
238    }
239
240    cast_non_dt_cols = True
241    try:
242        new_df = new_df.astype(non_dt_dtypes)
243        cast_non_dt_cols = False
244    except Exception as e:
245        warn(
246            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
247        )
248
249    cast_dt_cols = True
250    try:
251        for col, typ in dt_dtypes.items():
252            strip_utc = (
253                (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
254            )
255            if col in old_df.columns:
256                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
257            if col in new_df.columns:
258                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
259        cast_dt_cols = False
260    except Exception as e:
261        warn(f"Could not cast datetime columns:\n{e}")
262
263    cast_cols = cast_dt_cols or cast_non_dt_cols
264
265    new_numeric_cols_existing = get_numeric_cols(new_df)
266    old_numeric_cols = get_numeric_cols(old_df)
267    for col, typ in {k: v for k, v in dtypes.items()}.items():
268        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
269            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
270            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
271            new_is_numeric = col in new_numeric_cols_existing
272            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
273            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
274            old_is_numeric = col in old_numeric_cols
275
276            if (
277                coerce_mixed_numerics
278                and
279                (new_is_float or new_is_int or new_is_numeric)
280                and
281                (old_is_float or old_is_int or old_is_numeric)
282            ):
283                dtypes[col] = attempt_cast_to_numeric
284                cast_cols = True
285                continue
286
287            ### Fallback to object if the types don't match.
288            warn(
289                f"Detected different types for '{col}' "
290                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
291                + "falling back to 'object'..."
292            )
293            dtypes[col] = 'object'
294            cast_cols = True
295
296    if cast_cols:
297        for col, dtype in dtypes.items():
298            if col in new_df.columns:
299                try:
300                    new_df[col] = (
301                        new_df[col].astype(dtype)
302                        if not callable(dtype)
303                        else new_df[col].apply(dtype)
304                    )
305                except Exception as e:
306                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")
307
308    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
309    new_json_cols = get_json_cols(new_df)
310    old_json_cols = get_json_cols(old_df)
311    json_cols = set(new_json_cols + old_json_cols)
312    for json_col in old_json_cols:
313        old_df[json_col] = old_df[json_col].apply(serializer)
314    for json_col in new_json_cols:
315        new_df[json_col] = new_df[json_col].apply(serializer)
316
317    new_numeric_cols = get_numeric_cols(new_df)
318    numeric_cols = set(new_numeric_cols + old_numeric_cols)
319    for numeric_col in old_numeric_cols:
320        old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
321    for numeric_col in new_numeric_cols:
322        new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)
323
324    old_dt_cols = [
325        col
326        for col, typ in old_df.dtypes.items()
327        if are_dtypes_equal(str(typ), 'datetime')
328    ]
329    for col in old_dt_cols:
330        strip_utc = (
331            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
332        )
333        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
334
335    new_dt_cols = [
336        col
337        for col, typ in new_df.dtypes.items()
338        if are_dtypes_equal(str(typ), 'datetime')
339    ]
340    for col in new_dt_cols:
341        strip_utc = (
342            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
343        )
344        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
345
346    old_uuid_cols = get_uuid_cols(old_df)
347    new_uuid_cols = get_uuid_cols(new_df)
348    uuid_cols = set(new_uuid_cols + old_uuid_cols)
349
350    old_bytes_cols = get_bytes_cols(old_df)
351    new_bytes_cols = get_bytes_cols(new_df)
352    bytes_cols = set(new_bytes_cols + old_bytes_cols)
353
354    joined_df = merge(
355        new_df.infer_objects(copy=False).fillna(NA),
356        old_df.infer_objects(copy=False).fillna(NA),
357        how='left',
358        on=None,
359        indicator=True,
360    )
361    changed_rows_mask = (joined_df['_merge'] == 'left_only')
362    new_cols = list(new_df_dtypes)
363    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)
364
365    for json_col in json_cols:
366        if json_col not in delta_df.columns:
367            continue
368        try:
369            delta_df[json_col] = delta_df[json_col].apply(json.loads)
370        except Exception:
371            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")
372
373    for numeric_col in numeric_cols:
374        if numeric_col not in delta_df.columns:
375            continue
376        try:
377            delta_df[numeric_col] = delta_df[numeric_col].apply(
378                functools.partial(
379                    attempt_cast_to_numeric,
380                    quantize=True,
381                    precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
382                    scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
383                )
384            )
385        except Exception:
386            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")
387
388    for uuid_col in uuid_cols:
389        if uuid_col not in delta_df.columns:
390            continue
391        try:
392            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
393        except Exception:
394            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")
395
396    for bytes_col in bytes_cols:
397        if bytes_col not in delta_df.columns:
398            continue
399        try:
400            delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
401        except Exception:
402            warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")
403
404    return delta_df

Left join two DataFrames to find the newest unseen data.

Parameters
  • old_df ('pd.DataFrame'): The original (target) dataframe. Acts as a filter on the new_df.
  • new_df ('pd.DataFrame'): The fetched (source) dataframe. Rows that are contained in old_df are removed.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames.
  • dtypes (Optional[Dict[str, Any]], default None): Optionally specify the datatypes of the dataframe.
  • include_unchanged_columns (bool, default False): If True, include columns which haven't changed on rows which have changed.
  • coerce_mixed_numerics (bool, default True): If True, cast mixed integer and float columns between the old and new dataframes into numeric values (decimal.Decimal).
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pandas dataframe of the new, unseen rows in new_df.
Examples
>>> import pandas as pd
>>> df1 = pd.DataFrame({'a': [1,2]})
>>> df2 = pd.DataFrame({'a': [2,3]})
>>> filter_unseen_df(df1, df2)
   a
0  3
def parse_df_datetimes( df: pandas.core.frame.DataFrame, ignore_cols: Optional[Iterable[str]] = None, strip_timezone: bool = False, chunksize: Optional[int] = None, dtype_backend: str = 'numpy_nullable', ignore_all: bool = False, debug: bool = False) -> pandas.core.frame.DataFrame:
407def parse_df_datetimes(
408    df: 'pd.DataFrame',
409    ignore_cols: Optional[Iterable[str]] = None,
410    strip_timezone: bool = False,
411    chunksize: Optional[int] = None,
412    dtype_backend: str = 'numpy_nullable',
413    ignore_all: bool = False,
414    debug: bool = False,
415) -> 'pd.DataFrame':
416    """
417    Parse a pandas DataFrame for datetime columns and cast as datetimes.
418
419    Parameters
420    ----------
421    df: pd.DataFrame
422        The pandas DataFrame to parse.
423
424    ignore_cols: Optional[Iterable[str]], default None
425        If provided, do not attempt to coerce these columns as datetimes.
426
427    strip_timezone: bool, default False
428        If `True`, remove the UTC `tzinfo` property.
429
430    chunksize: Optional[int], default None
431        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
432
433    dtype_backend: str, default 'numpy_nullable'
434        If `df` is not a DataFrame and new one needs to be constructed,
435        use this as the datatypes backend.
436        Accepted values are 'numpy_nullable' and 'pyarrow'.
437
438    ignore_all: bool, default False
439        If `True`, do not attempt to cast any columns to datetimes.
440
441    debug: bool, default False
442        Verbosity toggle.
443
444    Returns
445    -------
446    A new pandas DataFrame with the determined datetime columns
447    (usually ISO strings) cast as datetimes.
448
449    Examples
450    --------
451    ```python
452    >>> import pandas as pd
453    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
454    >>> df.dtypes
455    a    object
456    dtype: object
457    >>> df = parse_df_datetimes(df)
458    >>> df.dtypes
459    a    datetime64[ns]
460    dtype: object
461
462    ```
463
464    """
465    from meerschaum.utils.packages import import_pandas, attempt_import
466    from meerschaum.utils.debug import dprint
467    from meerschaum.utils.warnings import warn
468    from meerschaum.utils.misc import items_str
469    from meerschaum.utils.dtypes import to_datetime
470    import traceback
471    pd = import_pandas()
472    pandas = attempt_import('pandas')
473    pd_name = pd.__name__
474    using_dask = 'dask' in pd_name
475    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
476    dask_dataframe = None
477    if using_dask or df_is_dask:
478        npartitions = chunksize_to_npartitions(chunksize)
479        dask_dataframe = attempt_import('dask.dataframe')
480
481    ### if df is a dict, build DataFrame
482    if isinstance(df, pandas.DataFrame):
483        pdf = df
484    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
485        pdf = get_first_valid_dask_partition(df)
486    else:
487        if debug:
488            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")
489
490        if using_dask:
491            if isinstance(df, list):
492                keys = set()
493                for doc in df:
494                    for key in doc:
495                        keys.add(key)
496                df = pd.DataFrame.from_dict(
497                    {
498                        k: [
499                            doc.get(k, None)
500                            for doc in df
501                        ] for k in keys
502                    },
503                    npartitions=npartitions,
504                )
505            elif isinstance(df, dict):
506                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
507            elif 'pandas.core.frame.DataFrame' in str(type(df)):
508                df = pd.from_pandas(df, npartitions=npartitions)
509            else:
510                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
511            pandas = attempt_import('pandas')
512            pdf = get_first_valid_dask_partition(df)
513
514        else:
515            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
516            pdf = df
517
518    ### skip parsing if DataFrame is empty
519    if len(pdf) == 0:
520        if debug:
521            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
522        return df
523
524    ignore_cols = set(
525        (ignore_cols or []) + [
526            col
527            for col, dtype in pdf.dtypes.items() 
528            if 'datetime' in str(dtype)
529        ]
530    )
531    cols_to_inspect = [
532        col
533        for col in pdf.columns
534        if col not in ignore_cols
535    ] if not ignore_all else []
536
537    if len(cols_to_inspect) == 0:
538        if debug:
539            dprint("All columns are ignored, skipping datetime detection...")
540        return df.infer_objects(copy=False).fillna(pandas.NA)
541
542    ### apply regex to columns to determine which are ISO datetimes
543    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
544    dt_mask = pdf[cols_to_inspect].astype(str).apply(
545        lambda s: s.str.match(iso_dt_regex).all()
546    )
547
548    ### list of datetime column names
549    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
550    if not datetime_cols:
551        if debug:
552            dprint("No columns detected as datetimes, returning...")
553        return df.infer_objects(copy=False).fillna(pandas.NA)
554
555    if debug:
556        dprint("Converting columns to datetimes: " + str(datetime_cols))
557
558    try:
559        if not using_dask:
560            df[datetime_cols] = df[datetime_cols].apply(to_datetime)
561        else:
562            df[datetime_cols] = df[datetime_cols].apply(
563                to_datetime,
564                utc=True,
565                axis=1,
566                meta={
567                    col: 'datetime64[ns, UTC]'
568                    for col in datetime_cols
569                }
570            )
571    except Exception:
572        warn(
573            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
574            + f"{traceback.format_exc()}"
575        )
576
577    if strip_timezone:
578        for dt in datetime_cols:
579            try:
580                df[dt] = df[dt].dt.tz_localize(None)
581            except Exception:
582                warn(
583                    f"Unable to convert column '{dt}' to naive datetime:\n"
584                    + f"{traceback.format_exc()}"
585                )
586
587    return df.fillna(pandas.NA)

Parse a pandas DataFrame for datetime columns and cast as datetimes.

Parameters
  • df (pd.DataFrame): The pandas DataFrame to parse.
  • ignore_cols (Optional[Iterable[str]], default None): If provided, do not attempt to coerce these columns as datetimes.
  • strip_timezone (bool, default False): If True, remove the UTC tzinfo property.
  • chunksize (Optional[int], default None): If the pandas implementation is 'dask', use this chunksize for the distributed dataframe.
  • dtype_backend (str, default 'numpy_nullable'): If df is not a DataFrame and new one needs to be constructed, use this as the datatypes backend. Accepted values are 'numpy_nullable' and 'pyarrow'.
  • ignore_all (bool, default False): If True, do not attempt to cast any columns to datetimes.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A new pandas DataFrame with the determined datetime columns (usually ISO strings) cast as datetimes.
Examples
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
>>> df.dtypes
a    object
dtype: object
>>> df = parse_df_datetimes(df)
>>> df.dtypes
a    datetime64[ns]
dtype: object
def get_unhashable_cols(df: pandas.core.frame.DataFrame) -> List[str]:
590def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
591    """
592    Get the columns which contain unhashable objects from a Pandas DataFrame.
593
594    Parameters
595    ----------
596    df: pd.DataFrame
597        The DataFrame which may contain unhashable objects.
598
599    Returns
600    -------
601    A list of columns.
602    """
603    if df is None:
604        return []
605    if len(df) == 0:
606        return []
607
608    is_dask = 'dask' in df.__module__
609    if is_dask:
610        from meerschaum.utils.packages import attempt_import
611        pandas = attempt_import('pandas')
612        df = pandas.DataFrame(get_first_valid_dask_partition(df))
613    return [
614        col for col, val in df.iloc[0].items()
615        if not isinstance(val, Hashable)
616    ]

Get the columns which contain unhashable objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
  • A list of columns.
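
A brief illustrative example (not from the source docstring), assuming `pandas` is installed:

```python
import pandas as pd
from meerschaum.utils.dataframe import get_unhashable_cols

# Lists and dicts are not `Hashable`, so their columns are returned.
df = pd.DataFrame([{'a': 1, 'b': [1, 2], 'c': {'x': 1}}])
print(get_unhashable_cols(df))  # ['b', 'c']
```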
def get_json_cols(df: pandas.core.frame.DataFrame) -> List[str]:
619def get_json_cols(df: 'pd.DataFrame') -> List[str]:
620    """
621    Get the columns which contain unhashable objects (to be encoded as JSON) from a Pandas DataFrame.
622
623    Parameters
624    ----------
625    df: pd.DataFrame
626        The DataFrame which may contain unhashable objects.
627
628    Returns
629    -------
630    A list of columns to be encoded as JSON.
631    """
632    if df is None:
633        return []
634
635    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
636    if is_dask:
637        df = get_first_valid_dask_partition(df)
638
639    if len(df) == 0:
640        return []
641
642    cols_indices = {
643        col: df[col].first_valid_index()
644        for col in df.columns
645    }
646    return [
647        col
648        for col, ix in cols_indices.items()
649        if (
650            ix is not None
651            and
652            not isinstance(df.loc[ix][col], Hashable)
653        )
654    ]

Get the columns which contain unhashable objects (to be encoded as JSON) from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
  • A list of columns to be encoded as JSON.
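
An illustrative example (not from the source docstring):

```python
import pandas as pd
from meerschaum.utils.dataframe import get_json_cols

# The first valid value of 'meta' is a dict (unhashable), so the column
# is flagged for JSON encoding; scalar columns are not.
df = pd.DataFrame([
    {'a': 1, 'meta': {'color': 'red'}},
    {'a': 2, 'meta': None},
])
print(get_json_cols(df))  # ['meta']
```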
def get_numeric_cols(df: pandas.core.frame.DataFrame) -> List[str]:
657def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
658    """
659    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.
660
661    Parameters
662    ----------
663    df: pd.DataFrame
664        The DataFrame which may contain decimal objects.
665
666    Returns
667    -------
668    A list of columns to treat as numerics.
669    """
670    if df is None:
671        return []
672    from decimal import Decimal
673    is_dask = 'dask' in df.__module__
674    if is_dask:
675        df = get_first_valid_dask_partition(df)
676
677    if len(df) == 0:
678        return []
679
680    cols_indices = {
681        col: df[col].first_valid_index()
682        for col in df.columns
683    }
684    return [
685        col
686        for col, ix in cols_indices.items()
687        if (
688            ix is not None
689            and
690            isinstance(df.loc[ix][col], Decimal)
691        )
692    ]

Get the columns which contain decimal.Decimal objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain decimal objects.
Returns
  • A list of columns to treat as numerics.
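
An illustrative example (not from the source docstring):

```python
from decimal import Decimal

import pandas as pd
from meerschaum.utils.dataframe import get_numeric_cols

# Only the column whose first valid value is a `Decimal` is returned.
df = pd.DataFrame({'price': [Decimal('19.99')], 'qty': [3]})
print(get_numeric_cols(df))  # ['price']
```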
def get_uuid_cols(df: pandas.core.frame.DataFrame) -> List[str]:
695def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
696    """
697    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.
698
699    Parameters
700    ----------
701    df: pd.DataFrame
702        The DataFrame which may contain UUID objects.
703
704    Returns
705    -------
706    A list of columns to treat as UUIDs.
707    """
708    if df is None:
709        return []
710    from uuid import UUID
711    is_dask = 'dask' in df.__module__
712    if is_dask:
713        df = get_first_valid_dask_partition(df)
714
715    if len(df) == 0:
716        return []
717
718    cols_indices = {
719        col: df[col].first_valid_index()
720        for col in df.columns
721    }
722    return [
723        col
724        for col, ix in cols_indices.items()
725        if (
726            ix is not None
727            and
728            isinstance(df.loc[ix][col], UUID)
729        )
730    ]

Get the columns which contain uuid.UUID objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain UUID objects.
Returns
  • A list of columns to treat as UUIDs.
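
An illustrative example (not from the source docstring):

```python
import uuid

import pandas as pd
from meerschaum.utils.dataframe import get_uuid_cols

# Columns whose first valid value is a `uuid.UUID` are returned.
df = pd.DataFrame({'id': [uuid.uuid4()], 'n': [1]})
print(get_uuid_cols(df))  # ['id']
```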
def get_datetime_cols( df: pandas.core.frame.DataFrame, timezone_aware: bool = True, timezone_naive: bool = True) -> List[str]:
733def get_datetime_cols(
734    df: 'pd.DataFrame',
735    timezone_aware: bool = True,
736    timezone_naive: bool = True,
737) -> List[str]:
738    """
739    Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame.
740
741    Parameters
742    ----------
743    df: pd.DataFrame
744        The DataFrame which may contain `datetime` or `Timestamp` objects.
745
746    timezone_aware: bool, default True
747        If `True`, include timezone-aware datetime columns.
748
749    timezone_naive: bool, default True
750        If `True`, include timezone-naive datetime columns.
751
752    Returns
753    -------
754    A list of columns to treat as datetimes.
755    """
756    if not timezone_aware and not timezone_naive:
757        raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.")
758
759    if df is None:
760        return []
761
762    from datetime import datetime
763    from meerschaum.utils.dtypes import are_dtypes_equal
764    is_dask = 'dask' in df.__module__
765    if is_dask:
766        df = get_first_valid_dask_partition(df)
767
768    known_dt_cols = [
769        col
770        for col, typ in df.dtypes.items()
771        if are_dtypes_equal('datetime', str(typ))
772    ]
773
774    if len(df) == 0:
775        return known_dt_cols
776
777    cols_indices = {
778        col: df[col].first_valid_index()
779        for col in df.columns
780        if col not in known_dt_cols
781    }
782    pydt_cols = [
783        col
784        for col, ix in cols_indices.items()
785        if (
786            ix is not None
787            and
788            isinstance(df.loc[ix][col], datetime)
789        )
790    ]
791    dt_cols_set = set(known_dt_cols + pydt_cols)
792    all_dt_cols = [
793        col
794        for col in df.columns
795        if col in dt_cols_set
796    ]
797    if timezone_aware and timezone_naive:
798        return all_dt_cols
799
800    known_timezone_aware_dt_cols = [
801        col
802        for col in known_dt_cols
803        if getattr(df[col], 'tz', None) is not None
804    ]
805    timezone_aware_pydt_cols = [
806        col
807        for col in pydt_cols
808        if df.loc[cols_indices[col]][col].tzinfo is not None
809    ]
810    timezone_aware_dt_cols_set = set(known_timezone_aware_dt_cols + timezone_aware_pydt_cols)
811    if timezone_aware:
812        return [
813            col
814            for col in all_dt_cols
815            if col in timezone_aware_dt_cols_set
816        ]
817
818    return [
819        col
820        for col in all_dt_cols
821        if col not in timezone_aware_dt_cols_set
822    ]

Get the columns which contain datetime or Timestamp objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain datetime or Timestamp objects.
  • timezone_aware (bool, default True): If True, include timezone-aware datetime columns.
  • timezone_naive (bool, default True): If True, include timezone-naive datetime columns.
Returns
  • A list of columns to treat as datetimes.
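
An illustrative example (not from the source docstring):

```python
from datetime import datetime, timezone

import pandas as pd
from meerschaum.utils.dataframe import get_datetime_cols

df = pd.DataFrame({
    'utc_dt': [datetime(2024, 1, 1, tzinfo=timezone.utc)],
    'naive_dt': [datetime(2024, 1, 1)],
    'val': [1.0],
})
# Both datetime columns are returned by default; pass `timezone_aware=False`
# or `timezone_naive=False` to filter by awareness.
print(get_datetime_cols(df))  # ['utc_dt', 'naive_dt']
```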
def get_bytes_cols(df: pandas.core.frame.DataFrame) -> List[str]:
825def get_bytes_cols(df: 'pd.DataFrame') -> List[str]:
826    """
827    Get the columns which contain bytes strings from a Pandas DataFrame.
828
829    Parameters
830    ----------
831    df: pd.DataFrame
832        The DataFrame which may contain bytes strings.
833
834    Returns
835    -------
836    A list of columns to treat as bytes.
837    """
838    if df is None:
839        return []
840    is_dask = 'dask' in df.__module__
841    if is_dask:
842        df = get_first_valid_dask_partition(df)
843
844    if len(df) == 0:
845        return []
846
847    cols_indices = {
848        col: df[col].first_valid_index()
849        for col in df.columns
850    }
851    return [
852        col
853        for col, ix in cols_indices.items()
854        if (
855            ix is not None
856            and
857            isinstance(df.loc[ix][col], bytes)
858        )
859    ]

Get the columns which contain bytes strings from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain bytes strings.
Returns
  • A list of columns to treat as bytes.
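
An illustrative example (not from the source docstring):

```python
import pandas as pd
from meerschaum.utils.dataframe import get_bytes_cols

# Columns whose first valid value is a `bytes` object are returned.
df = pd.DataFrame({'payload': [b'\x00\x01'], 'name': ['a']})
print(get_bytes_cols(df))  # ['payload']
```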
def enforce_dtypes( df: pandas.core.frame.DataFrame, dtypes: Dict[str, str], safe_copy: bool = True, coerce_numeric: bool = True, coerce_timezone: bool = True, strip_timezone: bool = False, debug: bool = False) -> pandas.core.frame.DataFrame:
 862def enforce_dtypes(
 863    df: 'pd.DataFrame',
 864    dtypes: Dict[str, str],
 865    safe_copy: bool = True,
 866    coerce_numeric: bool = True,
 867    coerce_timezone: bool = True,
 868    strip_timezone: bool = False,
 869    debug: bool = False,
 870) -> 'pd.DataFrame':
 871    """
 872    Enforce the `dtypes` dictionary on a DataFrame.
 873
 874    Parameters
 875    ----------
 876    df: pd.DataFrame
 877        The DataFrame on which to enforce dtypes.
 878
 879    dtypes: Dict[str, str]
 880        The data types to attempt to enforce on the DataFrame.
 881
 882    safe_copy: bool, default True
 883        If `True`, create a copy before comparing and modifying the dataframes.
 884        Setting to `False` may mutate the DataFrames.
 885        See `meerschaum.utils.dataframe.filter_unseen_df`.
 886
 887    coerce_numeric: bool, default True
 888        If `True`, convert float and int collisions to numeric.
 889
 890    coerce_timezone: bool, default True
 891        If `True`, convert datetimes to UTC.
 892
 893    strip_timezone: bool, default False
 894        If `coerce_timezone` and `strip_timezone` are `True`,
 895        remove timezone information from datetimes.
 896
 897    debug: bool, default False
 898        Verbosity toggle.
 899
 900    Returns
 901    -------
 902    The Pandas DataFrame with the types enforced.
 903    """
 904    import json
 905    import functools
 906    from meerschaum.utils.debug import dprint
 907    from meerschaum.utils.formatting import pprint
 908    from meerschaum.utils.dtypes import (
 909        are_dtypes_equal,
 910        to_pandas_dtype,
 911        is_dtype_numeric,
 912        attempt_cast_to_numeric,
 913        attempt_cast_to_uuid,
 914        attempt_cast_to_bytes,
 915        coerce_timezone as _coerce_timezone,
 916    )
 917    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
 918    pandas = mrsm.attempt_import('pandas')
 919    is_dask = 'dask' in df.__module__
 920    if safe_copy:
 921        df = df.copy()
 922    if len(df.columns) == 0:
 923        if debug:
 924            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
 925        return df
 926
 927    pipe_pandas_dtypes = {
 928        col: to_pandas_dtype(typ)
 929        for col, typ in dtypes.items()
 930    }
 931    json_cols = [
 932        col
 933        for col, typ in dtypes.items()
 934        if typ == 'json'
 935    ]
 936    numeric_cols = [
 937        col
 938        for col, typ in dtypes.items()
 939        if typ.startswith('numeric')
 940    ]
 941    uuid_cols = [
 942        col
 943        for col, typ in dtypes.items()
 944        if typ == 'uuid'
 945    ]
 946    bytes_cols = [
 947        col
 948        for col, typ in dtypes.items()
 949        if typ == 'bytes'
 950    ]
 951    datetime_cols = [
 952        col
 953        for col, typ in dtypes.items()
 954        if are_dtypes_equal(typ, 'datetime')
 955    ]
 956    df_numeric_cols = get_numeric_cols(df)
 957    if debug:
 958        dprint("Desired data types:")
 959        pprint(dtypes)
 960        dprint("Data types for incoming DataFrame:")
 961        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})
 962
 963    if json_cols and len(df) > 0:
 964        if debug:
 965            dprint(f"Checking columns for JSON encoding: {json_cols}")
 966        for col in json_cols:
 967            if col in df.columns:
 968                try:
 969                    df[col] = df[col].apply(
 970                        (
 971                            lambda x: (
 972                                json.loads(x)
 973                                if isinstance(x, str)
 974                                else x
 975                            )
 976                        )
 977                    )
 978                except Exception as e:
 979                    if debug:
 980                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")
 981
 982    if numeric_cols:
 983        if debug:
 984            dprint(f"Checking for numerics: {numeric_cols}")
 985        for col in numeric_cols:
 986            precision, scale = get_numeric_precision_scale(None, dtypes.get(col, ''))
 987            if col in df.columns:
 988                try:
 989                    df[col] = df[col].apply(
 990                        functools.partial(
 991                            attempt_cast_to_numeric,
 992                            quantize=True,
 993                            precision=precision,
 994                            scale=scale,
 995                        )
 996                    )
 997                except Exception as e:
 998                    if debug:
 999                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")
1000
1001    if uuid_cols:
1002        if debug:
1003            dprint(f"Checking for UUIDs: {uuid_cols}")
1004        for col in uuid_cols:
1005            if col in df.columns:
1006                try:
1007                    df[col] = df[col].apply(attempt_cast_to_uuid)
1008                except Exception as e:
1009                    if debug:
1010                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")
1011
1012    if bytes_cols:
1013        if debug:
1014            dprint(f"Checking for bytes: {bytes_cols}")
1015        for col in bytes_cols:
1016            if col in df.columns:
1017                try:
1018                    df[col] = df[col].apply(attempt_cast_to_bytes)
1019                except Exception as e:
1020                    if debug:
1021                        dprint(f"Unable to parse column '{col}' as bytes:\n{e}")
1022
1023    if datetime_cols and coerce_timezone:
1024        if debug:
1025            dprint(f"Checking for datetime conversion: {datetime_cols}")
1026        for col in datetime_cols:
1027            if col in df.columns:
1028                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)
1029
1030    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
1031    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1032        if debug:
1033            dprint("Data types match. Exiting enforcement...")
1034        return df
1035
1036    common_dtypes = {}
1037    common_diff_dtypes = {}
1038    for col, typ in pipe_pandas_dtypes.items():
1039        if col in df_dtypes:
1040            common_dtypes[col] = typ
1041            if not are_dtypes_equal(typ, df_dtypes[col]):
1042                common_diff_dtypes[col] = df_dtypes[col]
1043
1044    if debug:
1045        dprint("Common columns with different dtypes:")
1046        pprint(common_diff_dtypes)
1047
1048    detected_dt_cols = {}
1049    for col, typ in common_diff_dtypes.items():
1050        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
1051            df_dtypes[col] = typ
1052            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
1053    for col in detected_dt_cols:
1054        del common_diff_dtypes[col]
1055
1056    if debug:
1057        dprint("Common columns with different dtypes (after dates):")
1058        pprint(common_diff_dtypes)
1059
1060    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
1061        if debug:
1062            dprint(
1063                "The incoming DataFrame has mostly the same types, skipping enforcement. "
1064                + "The only detected difference was in the following datetime columns."
1065            )
1066            pprint(detected_dt_cols)
1067        return df
1068
1069    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
1070        previous_typ = common_dtypes[col]
1071        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
1072        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
1073        explicitly_numeric = dtypes.get(col, 'numeric').startswith('numeric')
1074        cast_to_numeric = (
1075            explicitly_numeric
1076            or col in df_numeric_cols
1077            or (mixed_numeric_types and not explicitly_float)
1078        ) and coerce_numeric
1079        if cast_to_numeric:
1080            common_dtypes[col] = attempt_cast_to_numeric
1081            common_diff_dtypes[col] = attempt_cast_to_numeric
1082
1083    for d in common_diff_dtypes:
1084        t = common_dtypes[d]
1085        if debug:
1086            dprint(f"Casting column {d} to dtype {t}.")
1087        try:
1088            df[d] = (
1089                df[d].apply(t)
1090                if callable(t)
1091                else df[d].astype(t)
1092            )
1093        except Exception as e:
1094            if debug:
1095                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
1096            if 'int' in str(t).lower():
1097                try:
1098                    df[d] = df[d].astype('float64').astype(t)
1099                except Exception:
1100                    if debug:
1101                        dprint(f"Was unable to convert to float then {t}.")
1102    return df

Enforce the dtypes dictionary on a DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame on which to enforce dtypes.
  • dtypes (Dict[str, str]): The data types to attempt to enforce on the DataFrame.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames. See meerschaum.utils.dataframe.filter_unseen_df.
  • coerce_numeric (bool, default True): If True, convert float and int collisions to numeric.
  • coerce_timezone (bool, default True): If True, convert datetimes to UTC.
  • strip_timezone (bool, default False): If coerce_timezone and strip_timezone are True, remove timezone information from datetimes.
  • debug (bool, default False): Verbosity toggle.
Returns
  • The Pandas DataFrame with the types enforced.
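
An illustrative sketch (not from the source docstring); per the listing above, 'json' columns are parsed with `json.loads` and the rest are cast via `astype`, so stringly-typed input can be coerced like so:

```python
import pandas as pd
from meerschaum.utils.dataframe import enforce_dtypes

df = pd.DataFrame({'a': ['1', '2'], 'meta': ['{"x": 1}', '{"x": 2}']})
df = enforce_dtypes(df, {'a': 'Int64', 'meta': 'json'})
print(df['a'].dtype)  # Int64
print(df['meta'][0])  # {'x': 1}
```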
def get_datetime_bound_from_df( df: Union[pandas.core.frame.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]]], datetime_column: str, minimum: bool = True) -> Union[int, datetime.datetime, NoneType]:
1105def get_datetime_bound_from_df(
1106    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1107    datetime_column: str,
1108    minimum: bool = True,
1109) -> Union[int, datetime, None]:
1110    """
1111    Return the minimum or maximum datetime (or integer) from a DataFrame.
1112
1113    Parameters
1114    ----------
1115    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1116        The DataFrame, list, or dict which contains the range axis.
1117
1118    datetime_column: str
1119        The name of the datetime (or int) column.
1120
1121    minimum: bool
1122        Whether to return the minimum (default) or maximum value.
1123
1124    Returns
1125    -------
1126    The minimum or maximum datetime value in the dataframe, or `None`.
1127    """
1128    from meerschaum.utils.dtypes import to_datetime, value_is_null
1129
1130    if df is None:
1131        return None
1132    if not datetime_column:
1133        return None
1134
1135    def compare(a, b):
1136        if a is None:
1137            return b
1138        if b is None:
1139            return a
1140        if minimum:
1141            return a if a < b else b
1142        return a if a > b else b
1143
1144    if isinstance(df, list):
1145        if len(df) == 0:
1146            return None
1147        best_yet = df[0].get(datetime_column, None)
1148        for doc in df:
1149            val = doc.get(datetime_column, None)
1150            best_yet = compare(best_yet, val)
1151        return best_yet
1152
1153    if isinstance(df, dict):
1154        if datetime_column not in df:
1155            return None
1156        best_yet = df[datetime_column][0]
1157        for val in df[datetime_column]:
1158            best_yet = compare(best_yet, val)
1159        return best_yet
1160
1161    if 'DataFrame' in str(type(df)):
1162        from meerschaum.utils.dtypes import are_dtypes_equal
1163        pandas = mrsm.attempt_import('pandas')
1164        is_dask = 'dask' in df.__module__
1165
1166        if datetime_column not in df.columns:
1167            return None
1168
1169        try:
1170            dt_val = (
1171                df[datetime_column].min(skipna=True)
1172                if minimum
1173                else df[datetime_column].max(skipna=True)
1174            )
1175        except Exception:
1176            dt_val = pandas.NA
1177        if is_dask and dt_val is not None and dt_val is not pandas.NA:
1178            dt_val = dt_val.compute()
1179
1180        return (
1181            to_datetime(dt_val, as_pydatetime=True)
1182            if are_dtypes_equal(str(type(dt_val)), 'datetime')
1183            else (dt_val if not value_is_null(dt_val) else None)
1184        )
1185
1186    return None

Return the minimum or maximum datetime (or integer) from a DataFrame.

Parameters
  • df (Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]): The DataFrame, list, or dict which contains the range axis.
  • datetime_column (str): The name of the datetime (or int) column.
  • minimum (bool): Whether to return the minimum (default) or maximum value.
Returns
  • The minimum or maximum datetime value in the dataframe, or None.
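
An illustrative example (not from the source docstring) using the list-of-documents path:

```python
from datetime import datetime

from meerschaum.utils.dataframe import get_datetime_bound_from_df

docs = [
    {'dt': datetime(2024, 1, 1), 'val': 1},
    {'dt': datetime(2024, 1, 2), 'val': 2},
]
print(get_datetime_bound_from_df(docs, 'dt'))                 # 2024-01-01 00:00:00
print(get_datetime_bound_from_df(docs, 'dt', minimum=False))  # 2024-01-02 00:00:00
```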
def get_unique_index_values( df: Union[pandas.core.frame.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]]], indices: List[str]) -> Dict[str, List[Any]]:
1189def get_unique_index_values(
1190    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
1191    indices: List[str],
1192) -> Dict[str, List[Any]]:
1193    """
1194    Return a dictionary of the unique index values in a DataFrame.
1195
1196    Parameters
1197    ----------
1198    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1199        The dataframe (or list or dict) which contains index values.
1200
1201    indices: List[str]
1202        The list of index columns.
1203
1204    Returns
1205    -------
1206    A dictionary mapping indices to unique values.
1207    """
1208    if df is None:
1209        return {}
1210    if 'dataframe' in str(type(df)).lower():
1211        pandas = mrsm.attempt_import('pandas')
1212        return {
1213            col: list({
1214                (val if val is not pandas.NA else None)
1215                for val in df[col].unique()
1216            })
1217            for col in indices
1218            if col in df.columns
1219        }
1220
1221    unique_indices = defaultdict(lambda: set())
1222    if isinstance(df, list):
1223        for doc in df:
1224            for index in indices:
1225                if index in doc:
1226                    unique_indices[index].add(doc[index])
1227
1228    elif isinstance(df, dict):
1229        for index in indices:
1230            if index in df:
1231                unique_indices[index] = unique_indices[index].union(set(df[index]))
1232
1233    return {key: list(val) for key, val in unique_indices.items()}

Return a dictionary of the unique index values in a DataFrame.

Parameters
  • df (Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]): The dataframe (or list or dict) which contains index values.
  • indices (List[str]): The list of index columns.
Returns
  • A dictionary mapping indices to unique values.
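
An illustrative example (not from the source docstring):

```python
from meerschaum.utils.dataframe import get_unique_index_values

docs = [
    {'station': 'KATL', 'temp': 80},
    {'station': 'KCLT', 'temp': 75},
    {'station': 'KATL', 'temp': 82},
]
# Values are deduplicated via sets, so list order is not guaranteed.
print(get_unique_index_values(docs, ['station']))  # {'station': ['KATL', 'KCLT']}
```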
def df_is_chunk_generator(df: Any) -> bool:
1236def df_is_chunk_generator(df: Any) -> bool:
1237    """
1238    Determine whether to treat `df` as a chunk generator.
1239
1240    Note this should only be used in a context where generators are expected,
1241    as it will return `True` for any iterable.
1242
1243    Parameters
1244    ----------
1245    df: The DataFrame or chunk generator to evaluate.
1246
1247    Returns
1248    -------
1249    A `bool` indicating whether to treat `df` as a generator.
1250    """
1251    return (
1252        not isinstance(df, (dict, list, str))
1253        and 'DataFrame' not in str(type(df))
1254        and isinstance(df, (Generator, Iterable, Iterator))
1255    )

Determine whether to treat df as a chunk generator.

Note this should only be used in a context where generators are expected, as it will return True for any iterable.

Parameters
  • df (Any): The DataFrame or chunk generator to evaluate.
Returns
  • A bool indicating whether to treat df as a generator.
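
An illustrative example (not from the source docstring):

```python
import pandas as pd
from meerschaum.utils.dataframe import df_is_chunk_generator

df = pd.DataFrame({'a': [1, 2]})
chunks = (df.iloc[[i]] for i in range(len(df)))
print(df_is_chunk_generator(df))      # False (DataFrames are excluded)
print(df_is_chunk_generator(chunks))  # True
```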
def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
1258def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
1259    """
1260    Return the Dask `npartitions` value for a given `chunksize`.
1261    """
1262    if chunksize == -1:
1263        from meerschaum.config import get_config
1264        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
1265    if chunksize is None:
1266        return 1
1267    return -1 * chunksize

Return the Dask npartitions value for a given chunksize.
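Per the listing above, `None` maps to 1, a positive chunksize maps to its negation, and a chunksize of -1 is first resolved from the configured SQL chunksize. A quick sketch:

```python
from meerschaum.utils.dataframe import chunksize_to_npartitions

print(chunksize_to_npartitions(None))   # 1
print(chunksize_to_npartitions(50000))  # -50000
```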

def df_from_literal( pipe: Optional[meerschaum.Pipe] = None, literal: str = None, debug: bool = False) -> pandas.core.frame.DataFrame:
1270def df_from_literal(
1271    pipe: Optional[mrsm.Pipe] = None,
1272    literal: str = None,
1273    debug: bool = False
1274) -> 'pd.DataFrame':
1275    """
1276    Construct a dataframe from a literal value, using the pipe's datetime and value column names.
1277
1278    Parameters
1279    ----------
1280    pipe: Optional['meerschaum.Pipe'], default None
1281        The pipe which will consume the literal value.
1282
1283    Returns
1284    -------
1285    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
1286    and the literal as the value.
1287    """
1288    from meerschaum.utils.packages import import_pandas
1289    from meerschaum.utils.warnings import error, warn
1290    from meerschaum.utils.debug import dprint
1291
1292    if pipe is None or literal is None:
1293        error("Please provide a Pipe and a literal value")
1294    ### this will raise an error if the columns are undefined
1295    dt_name, val_name = pipe.get_columns('datetime', 'value')
1296
1297    val = literal
1298    if isinstance(literal, str):
1299        if debug:
1300            dprint(f"Received literal string: '{literal}'")
1301        import ast
1302        try:
1303            val = ast.literal_eval(literal)
1304        except Exception:
1305            warn(
1306                "Failed to parse value from string:\n" + f"{literal}" +
1307                "\n\nWill cast as a string instead."
1308            )
1309            val = literal
1310
1311    from datetime import datetime, timezone
1312    now = datetime.now(timezone.utc).replace(tzinfo=None)
1313
1314    pd = import_pandas()
1315    return pd.DataFrame({dt_name: [now], val_name: [val]})

Construct a dataframe from a literal value, using the pipe's datetime and value column names.

Parameters
  • pipe (Optional['meerschaum.Pipe'], default None): The pipe which will consume the literal value.
Returns
  • A 1-row pandas DataFrame with the current UTC timestamp as the datetime column and the literal as the value.
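
An illustrative sketch (not from the source docstring); the pipe keys and column names here are hypothetical, and it assumes the pipe's 'datetime' and 'value' columns are defined:

```python
import meerschaum as mrsm
from meerschaum.utils.dataframe import df_from_literal

# Hypothetical pipe; 'dt' and 'val' are placeholder column names.
pipe = mrsm.Pipe('sql:main', 'temperature', columns={'datetime': 'dt', 'value': 'val'})
df = df_from_literal(pipe, '72.5')  # parsed via `ast.literal_eval` to 72.5
print(list(df.columns))  # ['dt', 'val']
```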
def get_first_valid_dask_partition( ddf: "'dask.dataframe.DataFrame'") -> Optional[pandas.core.frame.DataFrame]:
1318def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
1319    """
1320    Return the first valid Dask DataFrame partition (if possible).
1321    """
1322    pdf = None
1323    for partition in ddf.partitions:
1324        try:
1325            pdf = partition.compute()
1326        except Exception:
1327            continue
1328        if len(pdf) > 0:
1329            return pdf
1330    _ = mrsm.attempt_import('partd', lazy=False)
1331    return ddf.compute()

Return the first valid Dask DataFrame partition (if possible).
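An illustrative sketch (not from the source docstring), assuming `dask.dataframe` is installed:

```python
import pandas as pd
from meerschaum.utils.packages import attempt_import
from meerschaum.utils.dataframe import get_first_valid_dask_partition

dd = attempt_import('dask.dataframe')
ddf = dd.from_pandas(pd.DataFrame({'a': range(10)}), npartitions=3)
pdf = get_first_valid_dask_partition(ddf)
print(len(pdf))  # size of the first non-empty partition, e.g. 4
```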

def query_df( df: pandas.core.frame.DataFrame, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, datetime_column: Optional[str] = None, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, inplace: bool = False, reset_index: bool = False, coerce_types: bool = False, debug: bool = False) -> pandas.core.frame.DataFrame:
1334def query_df(
1335    df: 'pd.DataFrame',
1336    params: Optional[Dict[str, Any]] = None,
1337    begin: Union[datetime, int, None] = None,
1338    end: Union[datetime, int, None] = None,
1339    datetime_column: Optional[str] = None,
1340    select_columns: Optional[List[str]] = None,
1341    omit_columns: Optional[List[str]] = None,
1342    inplace: bool = False,
1343    reset_index: bool = False,
1344    coerce_types: bool = False,
1345    debug: bool = False,
1346) -> 'pd.DataFrame':
1347    """
1348    Query the dataframe with the params dictionary.
1349
1350    Parameters
1351    ----------
1352    df: pd.DataFrame
1353        The DataFrame to query against.
1354
1355    params: Optional[Dict[str, Any]], default None
1356        The parameters dictionary to use for the query.
1357
1358    begin: Union[datetime, int, None], default None
1359        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1360        greater than or equal to this value.
1361
1362    end: Union[datetime, int, None], default None
1363        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1364        less than this value.
1365
1366    datetime_column: Optional[str], default None
1367        A `datetime_column` must be provided to use `begin` and `end`.
1368
1369    select_columns: Optional[List[str]], default None
1370        If provided, only return these columns.
1371
1372    omit_columns: Optional[List[str]], default None
1373        If provided, do not include these columns in the result.
1374
1375    inplace: bool, default False
1376        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
1377
1378    reset_index: bool, default False
1379        If `True`, reset the index in the resulting DataFrame.
1380
1381    coerce_types: bool, default False
1382        If `True`, cast the dataframe and parameters as strings before querying.
1383
1384    Returns
1385    -------
1386    A Pandas DataFrame query result.
1387    """
1388
1389    def _process_select_columns(_df):
1390        if not select_columns:
1391            return
1392        for col in list(_df.columns):
1393            if col not in select_columns:
1394                del _df[col]
1395
1396    def _process_omit_columns(_df):
1397        if not omit_columns:
1398            return
1399        for col in list(_df.columns):
1400            if col in omit_columns:
1401                del _df[col]
1402
1403    if not params and not begin and not end:
1404        if not inplace:
1405            df = df.copy()
1406        _process_select_columns(df)
1407        _process_omit_columns(df)
1408        return df
1409
1410    from meerschaum.utils.debug import dprint
1411    from meerschaum.utils.misc import get_in_ex_params
1412    from meerschaum.utils.warnings import warn
1413    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
1414    dateutil_parser = mrsm.attempt_import('dateutil.parser')
1415    pandas = mrsm.attempt_import('pandas')
1416    NA = pandas.NA
1417
1418    if params:
1419        params = params.copy()
1420        for key, val in {k: v for k, v in params.items()}.items():
1421            if isinstance(val, (list, tuple)):
1422                if None in val:
1423                    val = [item for item in val if item is not None] + [NA]
1424                    params[key] = val
1425                if coerce_types:
1426                    params[key] = [str(x) for x in val]
1427            else:
1428                if value_is_null(val):
1429                    val = NA
1430                    params[key] = NA
1431                if coerce_types:
1432                    params[key] = str(val)
1433
1434    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
1435
1436    if inplace:
1437        df.fillna(NA, inplace=True)
1438    else:
1439        df = df.infer_objects().fillna(NA)
1440
1441    if isinstance(begin, str):
1442        begin = dateutil_parser.parse(begin)
1443    if isinstance(end, str):
1444        end = dateutil_parser.parse(end)
1445
1446    if begin is not None or end is not None:
1447        if not datetime_column or datetime_column not in df.columns:
1448            warn(
1449                f"The datetime column '{datetime_column}' is not present in the DataFrame, "
1450                + "ignoring begin and end...",
1451            )
1452            begin, end = None, None
1453
1454    if debug:
1455        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")
1456
1457    if datetime_column and (begin is not None or end is not None):
1458        if debug:
1459            dprint("Checking for datetime column compatability.")
1460
1461        from meerschaum.utils.dtypes import coerce_timezone
1462        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
1463        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
1464        end_is_int = are_dtypes_equal(str(type(end)), 'int')
1465
1466        if df_is_dt:
1467            df_tz = (
1468                getattr(df[datetime_column].dt, 'tz', None)
1469                if hasattr(df[datetime_column], 'dt')
1470                else None
1471            )
1472
1473            if begin_is_int:
1474                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
1475                if debug:
1476                    dprint(f"`begin` will be cast to '{begin}'.")
1477            if end_is_int:
1478                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
1479                if debug:
1480                    dprint(f"`end` will be cast to '{end}'.")
1481
1482            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
1483            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None
1484
1485    in_ex_params = get_in_ex_params(params)
1486
1487    masks = [
1488        (
1489            (df[datetime_column] >= begin)
1490            if begin is not None and datetime_column
1491            else True
1492        ) & (
1493            (df[datetime_column] < end)
1494            if end is not None and datetime_column
1495            else True
1496        )
1497    ]
1498
1499    masks.extend([
1500        (
1501            (
1502                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
1503                if in_vals
1504                else True
1505            ) & (
1506                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
1507                if ex_vals
1508                else True
1509            )
1510        )
1511        for col, (in_vals, ex_vals) in in_ex_params.items()
1512        if col in df.columns
1513    ])
1514    query_mask = masks[0]
1515    for mask in masks[1:]:
1516        query_mask = query_mask & mask
1517
1518    original_cols = df.columns
1519
1520    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
1521    ###       to allow for `<NA>` values.
1522    bool_cols = [
1523        col
1524        for col, typ in df.dtypes.items()
1525        if are_dtypes_equal(str(typ), 'bool')
1526    ]
1527    for col in bool_cols:
1528        df[col] = df[col].astype('boolean[pyarrow]')
1529    ### NOTE: The temporary '__mrsm_mask' column keeps matching all-null rows from being dropped below.
1530    if not isinstance(query_mask, bool):
1531        df['__mrsm_mask'] = (
1532            query_mask.astype('boolean[pyarrow]')
1533            if hasattr(query_mask, 'astype')
1534            else query_mask
1535        )
1536
1537        if inplace:
1538            df.where(query_mask, other=NA, inplace=True)
1539            df.dropna(how='all', inplace=True)
1540            result_df = df
1541        else:
1542            result_df = df.where(query_mask, other=NA)
1543            result_df.dropna(how='all', inplace=True)
1544
1545    else:
1546        result_df = df
1547
1548    if '__mrsm_mask' in df.columns:
1549        del df['__mrsm_mask']
1550    if '__mrsm_mask' in result_df.columns:
1551        del result_df['__mrsm_mask']
1552
1553    if reset_index:
1554        result_df.reset_index(drop=True, inplace=True)
1555
1556    result_df = enforce_dtypes(
1557        result_df,
1558        dtypes,
1559        safe_copy=False,
1560        debug=debug,
1561        coerce_numeric=False,
1562        coerce_timezone=False,
1563    )
1564
1565    if select_columns == ['*']:
1566        select_columns = None
1567
1568    if not select_columns and not omit_columns:
1569        return result_df[original_cols]
1570
1571    _process_select_columns(result_df)
1572    _process_omit_columns(result_df)
1573
1574    return result_df

Query the dataframe with the params dictionary.

Parameters
  • df (pd.DataFrame): The DataFrame to query against.
  • params (Optional[Dict[str, Any]], default None): The parameters dictionary to use for the query.
  • begin (Union[datetime, int, None], default None): If begin and datetime_column are provided, only return rows with a timestamp greater than or equal to this value.
  • end (Union[datetime, int, None], default None): If end and datetime_column are provided, only return rows with a timestamp less than this value.
  • datetime_column (Optional[str], default None): A datetime_column must be provided to use begin and end.
  • select_columns (Optional[List[str]], default None): If provided, only return these columns.
  • omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
  • inplace (bool, default False): If True, modify the DataFrame inplace rather than creating a new DataFrame.
  • reset_index (bool, default False): If True, reset the index in the resulting DataFrame.
  • coerce_types (bool, default False): If True, cast the dataframe and parameters to strings before querying.
Returns
  • A Pandas DataFrame query result.
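
As a usage sketch (the frame, column names, and values below are illustrative, not taken from the library's documentation), query_df filters on equality via params and on a half-open time range via begin/end with datetime_column:

import pandas as pd
from datetime import datetime

from meerschaum.utils.dataframe import query_df

df = pd.DataFrame({
    'dt': [datetime(2024, 1, 1), datetime(2024, 1, 2)],
    'color': ['red', None],
})

### Keep rows where 'color' equals 'red'.
red_df = query_df(df, {'color': 'red'}, reset_index=True)

### A None inside a params list also matches nulls (pd.NA).
red_or_null_df = query_df(df, {'color': ['red', None]})

### Keep only the 'dt' column of the matching rows.
slim_df = query_df(df, {'color': 'red'}, select_columns=['dt'])

### Keep rows with 'dt' >= begin (and < end, when end is given).
recent_df = query_df(
    df,
    begin=datetime(2024, 1, 2),
    datetime_column='dt',
    reset_index=True,
)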
def to_json(df: pandas.core.frame.DataFrame, safe_copy: bool = True, orient: str = 'records', date_format: str = 'iso', date_unit: str = 'us', **kwargs: Any) -> str:
1577def to_json(
1578    df: 'pd.DataFrame',
1579    safe_copy: bool = True,
1580    orient: str = 'records',
1581    date_format: str = 'iso',
1582    date_unit: str = 'us',
1583    **kwargs: Any
1584) -> str:
1585    """
1586    Serialize the given dataframe as a JSON string.
1587
1588    Parameters
1589    ----------
1590    df: pd.DataFrame
1591        The DataFrame to be serialized.
1592
1593    safe_copy: bool, default True
1594        If `False`, the DataFrame may be modified inplace.
1595
1596    date_format: str, default 'iso'
1597        The default format for timestamps.
1598
1599    date_unit: str, default 'us'
1600        The precision of the timestamps.
1601
1602    Returns
1603    -------
1604    A JSON string.
1605    """
1606    from meerschaum.utils.packages import import_pandas
1607    from meerschaum.utils.dtypes import serialize_bytes, serialize_decimal
1608    pd = import_pandas()
1609    uuid_cols = get_uuid_cols(df)
1610    bytes_cols = get_bytes_cols(df)
1611    numeric_cols = get_numeric_cols(df)
1612    if safe_copy and bool(uuid_cols or bytes_cols or numeric_cols):
1613        df = df.copy()
1614    for col in uuid_cols:
1615        df[col] = df[col].astype(str)
1616    for col in bytes_cols:
1617        df[col] = df[col].apply(serialize_bytes)
1618    for col in numeric_cols:
1619        df[col] = df[col].apply(serialize_decimal)
1620    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
1621        date_format=date_format,
1622        date_unit=date_unit,
1623        orient=orient,
1624        **kwargs
1625    )

Serialize the given dataframe as a JSON string.

Parameters
  • df (pd.DataFrame): The DataFrame to be serialized.
  • safe_copy (bool, default True): If False, the DataFrame may be modified inplace.
  • orient (str, default 'records'): The layout of the JSON output (passed through to pandas.DataFrame.to_json).
  • date_format (str, default 'iso'): The default format for timestamps.
  • date_unit (str, default 'us'): The precision of the timestamps.
Returns
  • A JSON string.
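
A minimal usage sketch of to_json (the frame below is illustrative):

import pandas as pd

from meerschaum.utils.dataframe import to_json

df = pd.DataFrame({
    'dt': [pd.Timestamp('2024-01-01')],
    'value': [1.5],
})

### Serialize as a list of records with ISO-8601, microsecond-precision timestamps.
json_str = to_json(df)
### Expect something like '[{"dt":"2024-01-01T00:00:00.000000","value":1.5}]'.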