meerschaum.utils.dataframe

Utility functions for working with DataFrames.

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Utility functions for working with DataFrames.
"""

from __future__ import annotations
from datetime import datetime, timezone
from collections import defaultdict

import meerschaum as mrsm
from meerschaum.utils.typing import (
    Optional, Dict, Any, List, Hashable, Generator,
    Iterator, Iterable, Union, TYPE_CHECKING,
)

if TYPE_CHECKING:
    pd, dask = mrsm.attempt_import('pandas', 'dask')


def add_missing_cols_to_df(
    df: 'pd.DataFrame',
    dtypes: Dict[str, Any],
) -> 'pd.DataFrame':
    """
    Add columns from the dtypes dictionary as null columns to a new DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The dataframe to copy and to which null columns are added.

    dtypes: Dict[str, Any]
        The data types dictionary, which may contain keys not present in `df.columns`.

    Returns
    -------
    A new `DataFrame` with the keys from `dtypes` added as null columns.
    If the keys of `dtypes` already match `df.columns`, return a reference to `df`.
    NOTE: This will not ensure that dtypes are enforced!

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': 1}])
    >>> dtypes = {'b': 'Int64'}
    >>> add_missing_cols_to_df(df, dtypes)
       a     b
    0  1  <NA>
    >>> add_missing_cols_to_df(df, dtypes).dtypes
    a    int64
    b    Int64
    dtype: object
    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
    a    int64
    dtype: object
    >>>
    """
    if set(df.columns) == set(dtypes):
        return df

    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.dtypes import to_pandas_dtype
    pandas = attempt_import('pandas')

    def build_series(dtype: str):
        return pandas.Series([], dtype=to_pandas_dtype(dtype))

    assign_kwargs = {
        str(col): build_series(str(typ))
        for col, typ in dtypes.items()
        if col not in df.columns
    }
    df_with_cols = df.assign(**assign_kwargs)
    for col in assign_kwargs:
        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
    return df_with_cols


def filter_unseen_df(
    old_df: 'pd.DataFrame',
    new_df: 'pd.DataFrame',
    safe_copy: bool = True,
    dtypes: Optional[Dict[str, Any]] = None,
    include_unchanged_columns: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Left join two DataFrames to find the newest unseen data.

    Parameters
    ----------
    old_df: 'pd.DataFrame'
        The original (target) dataframe. Acts as a filter on the `new_df`.

    new_df: 'pd.DataFrame'
        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.

    dtypes: Optional[Dict[str, Any]], default None
        Optionally specify the datatypes of the dataframe.

    include_unchanged_columns: bool, default False
        If `True`, include columns which haven't changed on rows which have changed.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas dataframe of the new, unseen rows in `new_df`.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df1 = pd.DataFrame({'a': [1,2]})
    >>> df2 = pd.DataFrame({'a': [2,3]})
    >>> filter_unseen_df(df1, df2)
       a
    0  3

    ```

    """
    if old_df is None:
        return new_df

    if safe_copy:
        old_df = old_df.copy()
        new_df = new_df.copy()

    import json
    import functools
    import traceback
    from decimal import Decimal
    from uuid import UUID
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.dtypes import (
        to_pandas_dtype,
        are_dtypes_equal,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        coerce_timezone,
    )
    pd = import_pandas(debug=debug)
    is_dask = 'dask' in new_df.__module__
    if is_dask:
        pandas = attempt_import('pandas')
        _ = attempt_import('partd', lazy=False)
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA
    else:
        merge = pd.merge
        NA = pd.NA

    new_df_dtypes = dict(new_df.dtypes)
    old_df_dtypes = dict(old_df.dtypes)

    same_cols = set(new_df.columns) == set(old_df.columns)
    if not same_cols:
        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)

        new_types_missing_from_old = {
            col: typ
            for col, typ in new_df_dtypes.items()
            if col not in old_df_dtypes
        }
        old_types_missing_from_new = {
            col: typ
            for col, typ in old_df_dtypes.items()
            if col not in new_df_dtypes
        }
        old_df_dtypes.update(new_types_missing_from_old)
        new_df_dtypes.update(old_types_missing_from_new)

    ### Edge case: two empty lists cast to DFs.
    elif len(new_df.columns) == 0:
        return new_df

    try:
        ### Order matters when checking equality.
        new_df = new_df[old_df.columns]

    except Exception as e:
        warn(
            "Was not able to cast old columns onto new DataFrame. " +
            f"Are both DataFrames the same shape? Error:\n{e}",
            stacklevel=3,
        )
        return new_df[list(new_df_dtypes.keys())]

    ### Assume the old_df knows what it's doing, even if it's technically wrong.
    if dtypes is None:
        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}

    dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
        if col in new_df_dtypes and col in old_df_dtypes
    }
    for col, typ in new_df_dtypes.items():
        if col not in dtypes:
            dtypes[col] = typ

    dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if are_dtypes_equal(typ, 'datetime')
    }
    non_dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if col not in dt_dtypes
    }

    cast_non_dt_cols = True
    try:
        new_df = new_df.astype(non_dt_dtypes)
        cast_non_dt_cols = False
    except Exception as e:
        warn(
            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
        )

    cast_dt_cols = True
    try:
        for col, typ in dt_dtypes.items():
            strip_utc = (
                (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
            )
            if col in old_df.columns:
                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
            if col in new_df.columns:
                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
        cast_dt_cols = False
    except Exception as e:
        warn(f"Could not cast datetime columns:\n{e}")

    cast_cols = cast_dt_cols or cast_non_dt_cols

    new_numeric_cols_existing = get_numeric_cols(new_df)
    old_numeric_cols = get_numeric_cols(old_df)
    for col, typ in {k: v for k, v in dtypes.items()}.items():
        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
            new_is_numeric = col in new_numeric_cols_existing
            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
            old_is_numeric = col in old_numeric_cols

            if (
                (new_is_float or new_is_int or new_is_numeric)
                and
                (old_is_float or old_is_int or old_is_numeric)
            ):
                dtypes[col] = attempt_cast_to_numeric
                cast_cols = True
                continue

            ### Fall back to object if the types don't match.
            warn(
                f"Detected different types for '{col}' "
                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
                + "falling back to 'object'..."
            )
            dtypes[col] = 'object'
            cast_cols = True

    if cast_cols:
        for col, dtype in dtypes.items():
            if col in new_df.columns:
                try:
                    new_df[col] = (
                        new_df[col].astype(dtype)
                        if not callable(dtype)
                        else new_df[col].apply(dtype)
                    )
                except Exception as e:
                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")

    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
    new_json_cols = get_json_cols(new_df)
    old_json_cols = get_json_cols(old_df)
    json_cols = set(new_json_cols + old_json_cols)
    for json_col in old_json_cols:
        old_df[json_col] = old_df[json_col].apply(serializer)
    for json_col in new_json_cols:
        new_df[json_col] = new_df[json_col].apply(serializer)

    new_numeric_cols = get_numeric_cols(new_df)
    numeric_cols = set(new_numeric_cols + old_numeric_cols)
    for numeric_col in old_numeric_cols:
        old_df[numeric_col] = old_df[numeric_col].apply(
            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
        )
    for numeric_col in new_numeric_cols:
        new_df[numeric_col] = new_df[numeric_col].apply(
            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
        )

    old_dt_cols = [
        col
        for col, typ in old_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in old_dt_cols:
        strip_utc = (
            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
        )
        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)

    new_dt_cols = [
        col
        for col, typ in new_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in new_dt_cols:
        strip_utc = (
            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
        )
        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)

    old_uuid_cols = get_uuid_cols(old_df)
    new_uuid_cols = get_uuid_cols(new_df)
    uuid_cols = set(new_uuid_cols + old_uuid_cols)
    joined_df = merge(
        new_df.infer_objects(copy=False).fillna(NA),
        old_df.infer_objects(copy=False).fillna(NA),
        how='left',
        on=None,
        indicator=True,
    )
    changed_rows_mask = (joined_df['_merge'] == 'left_only')
    new_cols = list(new_df_dtypes)
    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)

    for json_col in json_cols:
        if json_col not in delta_df.columns:
            continue
        try:
            delta_df[json_col] = delta_df[json_col].apply(json.loads)
        except Exception:
            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")

    for numeric_col in numeric_cols:
        if numeric_col not in delta_df.columns:
            continue
        try:
            delta_df[numeric_col] = delta_df[numeric_col].apply(attempt_cast_to_numeric)
        except Exception:
            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")

    for uuid_col in uuid_cols:
        if uuid_col not in delta_df.columns:
            continue
        try:
            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
        except Exception:
            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")

    return delta_df


def parse_df_datetimes(
    df: 'pd.DataFrame',
    ignore_cols: Optional[Iterable[str]] = None,
    strip_timezone: bool = False,
    chunksize: Optional[int] = None,
    dtype_backend: str = 'numpy_nullable',
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Parse a pandas DataFrame for datetime columns and cast as datetimes.

    Parameters
    ----------
    df: pd.DataFrame
        The pandas DataFrame to parse.

    ignore_cols: Optional[Iterable[str]], default None
        If provided, do not attempt to coerce these columns as datetimes.

    strip_timezone: bool, default False
        If `True`, remove the UTC `tzinfo` property.

    chunksize: Optional[int], default None
        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.

    dtype_backend: str, default 'numpy_nullable'
        If `df` is not a DataFrame and a new one needs to be constructed,
        use this as the datatypes backend.
        Accepted values are 'numpy_nullable' and 'pyarrow'.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A new pandas DataFrame with the determined datetime columns
    (usually ISO strings) cast as datetimes.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
    >>> df.dtypes
    a    object
    dtype: object
    >>> df = parse_df_datetimes(df)
    >>> df.dtypes
    a    datetime64[ns, UTC]
    dtype: object

    ```

    """
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.misc import items_str
    import traceback
    pd = import_pandas()
    pandas = attempt_import('pandas')
    pd_name = pd.__name__
    using_dask = 'dask' in pd_name
    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
    dask_dataframe = None
    if using_dask or df_is_dask:
        npartitions = chunksize_to_npartitions(chunksize)
        dask_dataframe = attempt_import('dask.dataframe')

    ### If df is a dict, build a DataFrame.
    if isinstance(df, pandas.DataFrame):
        pdf = df
    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
        pdf = get_first_valid_dask_partition(df)
    else:
        if debug:
            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")

        if using_dask:
            if isinstance(df, list):
                keys = set()
                for doc in df:
                    for key in doc:
                        keys.add(key)
                df = pd.DataFrame.from_dict(
                    {
                        k: [
                            doc.get(k, None)
                            for doc in df
                        ] for k in keys
                    },
                    npartitions=npartitions,
                )
            elif isinstance(df, dict):
                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
            elif 'pandas.core.frame.DataFrame' in str(type(df)):
                df = pd.from_pandas(df, npartitions=npartitions)
            else:
                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
            pandas = attempt_import('pandas')
            pdf = get_first_valid_dask_partition(df)

        else:
            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
            pdf = df

    ### Skip parsing if the DataFrame is empty.
    if len(pdf) == 0:
        if debug:
            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
        return df

    ignore_cols = set(
        (ignore_cols or []) + [
            col
            for col, dtype in pdf.dtypes.items()
            if 'datetime' in str(dtype)
        ]
    )
    cols_to_inspect = [col for col in pdf.columns if col not in ignore_cols]

    if len(cols_to_inspect) == 0:
        if debug:
            dprint("All columns are ignored, skipping datetime detection...")
        return df.fillna(pandas.NA)

    ### Apply a regex to columns to determine which are ISO datetimes.
    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
    dt_mask = pdf[cols_to_inspect].astype(str).apply(
        lambda s: s.str.match(iso_dt_regex).all()
    )

    ### List of datetime column names.
    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
    if not datetime_cols:
        if debug:
            dprint("No columns detected as datetimes, returning...")
        return df.fillna(pandas.NA)

    if debug:
        dprint("Converting columns to datetimes: " + str(datetime_cols))

    try:
        if not using_dask:
            df[datetime_cols] = df[datetime_cols].apply(
                pd.to_datetime,
                utc=True,
                format='ISO8601',
            )
        else:
            df[datetime_cols] = df[datetime_cols].apply(
                pd.to_datetime,
                utc=True,
                axis=1,
                meta={
                    col: 'datetime64[ns, UTC]'
                    for col in datetime_cols
                }
            )
    except Exception:
        warn(
            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
            + f"{traceback.format_exc()}"
        )

    if strip_timezone:
        for dt in datetime_cols:
            try:
                df[dt] = df[dt].dt.tz_localize(None)
            except Exception:
                warn(
                    f"Unable to convert column '{dt}' to naive datetime:\n"
                    + f"{traceback.format_exc()}"
                )

    return df.fillna(pandas.NA)


def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns.
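
    Examples
    --------
    A minimal illustration (list cells are unhashable, scalars are not):

    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': [1, 2], 'b': 1}])
    >>> get_unhashable_cols(df)
    ['a']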
    """
    if df is None:
        return []
    if len(df) == 0:
        return []

    is_dask = 'dask' in df.__module__
    if is_dask:
        from meerschaum.utils.packages import attempt_import
        pandas = attempt_import('pandas')
        df = pandas.DataFrame(get_first_valid_dask_partition(df))
    return [
        col for col, val in df.iloc[0].items()
        if not isinstance(val, Hashable)
    ]


def get_json_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects (to be serialized as JSON)
    from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns to be encoded as JSON.
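
    Examples
    --------
    A minimal illustration (dicts and lists are candidates for JSON encoding):

    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': {'b': 1}, 'c': 1}])
    >>> get_json_cols(df)
    ['a']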
    """
    if df is None:
        return []

    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            not isinstance(df.loc[ix][col], Hashable)
        )
    ]


def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain decimal objects.

    Returns
    -------
    A list of columns to treat as numerics.
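
    Examples
    --------
    A minimal illustration (only `Decimal` objects count, not floats):

    >>> from decimal import Decimal
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': Decimal('1.1'), 'b': 1.1}])
    >>> get_numeric_cols(df)
    ['a']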
    """
    if df is None:
        return []
    from decimal import Decimal
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance(df.loc[ix][col], Decimal)
        )
    ]


def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain UUID objects.

    Returns
    -------
    A list of columns to treat as UUIDs.
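
    Examples
    --------
    A minimal illustration (only `UUID` objects count, not UUID-like strings):

    >>> import uuid
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': uuid.uuid4(), 'b': str(uuid.uuid4())}])
    >>> get_uuid_cols(df)
    ['a']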
    """
    if df is None:
        return []
    from uuid import UUID
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance(df.loc[ix][col], UUID)
        )
    ]


def enforce_dtypes(
    df: 'pd.DataFrame',
    dtypes: Dict[str, str],
    safe_copy: bool = True,
    coerce_numeric: bool = True,
    coerce_timezone: bool = True,
    strip_timezone: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Enforce the `dtypes` dictionary on a DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame on which to enforce dtypes.

    dtypes: Dict[str, str]
        The data types to attempt to enforce on the DataFrame.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    coerce_numeric: bool, default True
        If `True`, convert float and int collisions to numeric.

    coerce_timezone: bool, default True
        If `True`, convert datetimes to UTC.

    strip_timezone: bool, default False
        If `coerce_timezone` and `strip_timezone` are `True`,
        remove timezone information from datetimes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The Pandas DataFrame with the types enforced.
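
    Examples
    --------
    A small sketch (assumes the default pandas backend; `'json'` values may
    arrive as serialized strings and are parsed back into Python objects):

    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': [1.0, 2.0], 'b': ['{"x": 1}', '{"y": 2}']})
    >>> df = enforce_dtypes(df, {'a': 'Int64', 'b': 'json'}, coerce_numeric=False)
    >>> df.dtypes
    a     Int64
    b    object
    dtype: object
    >>> df['b'][0]
    {'x': 1}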
    """
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.formatting import pprint
    from meerschaum.utils.dtypes import (
        are_dtypes_equal,
        to_pandas_dtype,
        is_dtype_numeric,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        coerce_timezone as _coerce_timezone,
    )
    pandas = mrsm.attempt_import('pandas')
    is_dask = 'dask' in df.__module__
    if safe_copy:
        df = df.copy()
    if len(df.columns) == 0:
        if debug:
            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
        return df

    pipe_pandas_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
    }
    json_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'json'
    ]
    numeric_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'numeric'
    ]
    uuid_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'uuid'
    ]
    datetime_cols = [
        col
        for col, typ in dtypes.items()
        if are_dtypes_equal(typ, 'datetime')
    ]
    df_numeric_cols = get_numeric_cols(df)
    if debug:
        dprint("Desired data types:")
        pprint(dtypes)
        dprint("Data types for incoming DataFrame:")
        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})

    if json_cols and len(df) > 0:
        if debug:
            dprint(f"Checking columns for JSON encoding: {json_cols}")
        for col in json_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        (
                            lambda x: (
                                json.loads(x)
                                if isinstance(x, str)
                                else x
                            )
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")

    if numeric_cols:
        if debug:
            dprint(f"Checking for numerics: {numeric_cols}")
        for col in numeric_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_numeric)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")

    if uuid_cols:
        if debug:
            dprint(f"Checking for UUIDs: {uuid_cols}")
        for col in uuid_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_uuid)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")

    if datetime_cols and coerce_timezone:
        if debug:
            dprint(f"Checking for datetime conversion: {datetime_cols}")
        for col in datetime_cols:
            if col in df.columns:
                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)

    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint("Data types match. Exiting enforcement...")
        return df

    common_dtypes = {}
    common_diff_dtypes = {}
    for col, typ in pipe_pandas_dtypes.items():
        if col in df_dtypes:
            common_dtypes[col] = typ
            if not are_dtypes_equal(typ, df_dtypes[col]):
                common_diff_dtypes[col] = df_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes:")
        pprint(common_diff_dtypes)

    detected_dt_cols = {}
    for col, typ in common_diff_dtypes.items():
        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
            df_dtypes[col] = typ
            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
    for col in detected_dt_cols:
        del common_diff_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes (after dates):")
        pprint(common_diff_dtypes)

    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint(
                "The incoming DataFrame has mostly the same types, skipping enforcement. "
                + "The only detected difference was in the following datetime columns."
            )
            pprint(detected_dt_cols)
        return df

    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
        previous_typ = common_dtypes[col]
        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
        explicitly_numeric = dtypes.get(col, 'numeric') == 'numeric'
        cast_to_numeric = (
            explicitly_numeric
            or col in df_numeric_cols
            or (mixed_numeric_types and not explicitly_float)
        ) and coerce_numeric
        if cast_to_numeric:
            common_dtypes[col] = attempt_cast_to_numeric
            common_diff_dtypes[col] = attempt_cast_to_numeric

    for d in common_diff_dtypes:
        t = common_dtypes[d]
        if debug:
            dprint(f"Casting column {d} to dtype {t}.")
        try:
            df[d] = (
                df[d].apply(t)
                if callable(t)
                else df[d].astype(t)
            )
        except Exception as e:
            if debug:
                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
            if 'int' in str(t).lower():
                try:
                    df[d] = df[d].astype('float64').astype(t)
                except Exception:
                    if debug:
                        dprint(f"Was unable to convert to float then {t}.")
    return df


def get_datetime_bound_from_df(
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
    datetime_column: str,
    minimum: bool = True,
) -> Union[int, datetime, None]:
    """
    Return the minimum or maximum datetime (or integer) from a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
        The DataFrame, list, or dict which contains the range axis.

    datetime_column: str
        The name of the datetime (or int) column.

    minimum: bool, default True
        Whether to return the minimum (default) or maximum value.

    Returns
    -------
    The minimum or maximum datetime value in the dataframe, or `None`.
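
    Examples
    --------
    A minimal illustration with integer bounds:

    >>> docs = [{'dt': 2}, {'dt': 1}, {'dt': 3}]
    >>> get_datetime_bound_from_df(docs, 'dt')
    1
    >>> get_datetime_bound_from_df(docs, 'dt', minimum=False)
    3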
    """
    if df is None:
        return None
    if not datetime_column:
        return None

    def compare(a, b):
        if a is None:
            return b
        if b is None:
            return a
        if minimum:
            return a if a < b else b
        return a if a > b else b

    if isinstance(df, list):
        if len(df) == 0:
            return None
        best_yet = df[0].get(datetime_column, None)
        for doc in df:
            val = doc.get(datetime_column, None)
            best_yet = compare(best_yet, val)
        return best_yet

    if isinstance(df, dict):
        if datetime_column not in df:
            return None
        best_yet = df[datetime_column][0]
        for val in df[datetime_column]:
            best_yet = compare(best_yet, val)
        return best_yet

    if 'DataFrame' in str(type(df)):
        from meerschaum.utils.dtypes import are_dtypes_equal
        pandas = mrsm.attempt_import('pandas')
        is_dask = 'dask' in df.__module__

        if datetime_column not in df.columns:
            return None

        try:
            dt_val = (
                df[datetime_column].min(skipna=True)
                if minimum
                else df[datetime_column].max(skipna=True)
            )
        except Exception:
            dt_val = pandas.NA
        if is_dask and dt_val is not None and dt_val is not pandas.NA:
            dt_val = dt_val.compute()

        return (
            pandas.to_datetime(dt_val).to_pydatetime()
            if are_dtypes_equal(str(type(dt_val)), 'datetime')
            else (dt_val if dt_val is not pandas.NA else None)
        )

    return None


def get_unique_index_values(
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
    indices: List[str],
) -> Dict[str, List[Any]]:
    """
    Return a dictionary of the unique index values in a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
        The dataframe (or list or dict) which contains index values.

    indices: List[str]
        The list of index columns.

    Returns
    -------
    A dictionary mapping indices to unique values.
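
    Examples
    --------
    A minimal illustration with a list of documents:

    >>> docs = [{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'x'}]
    >>> get_unique_index_values(docs, ['b'])
    {'b': ['x']}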
    """
    if df is None:
        return {}
    if 'dataframe' in str(type(df)).lower():
        pandas = mrsm.attempt_import('pandas')
        return {
            col: list({
                (val if val is not pandas.NA else None)
                for val in df[col].unique()
            })
            for col in indices
            if col in df.columns
        }

    unique_indices = defaultdict(lambda: set())
    if isinstance(df, list):
        for doc in df:
            for index in indices:
                if index in doc:
                    unique_indices[index].add(doc[index])

    elif isinstance(df, dict):
        for index in indices:
            if index in df:
                unique_indices[index] = unique_indices[index].union(set(df[index]))

    return {key: list(val) for key, val in unique_indices.items()}


def df_is_chunk_generator(df: Any) -> bool:
    """
    Determine whether to treat `df` as a chunk generator.

    Note this should only be used in a context where generators are expected,
    as it will return `True` for any iterable.

    Parameters
    ----------
    df: Any
        The DataFrame or chunk generator to evaluate.

    Returns
    -------
    A `bool` indicating whether to treat `df` as a generator.
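
    Examples
    --------
    A minimal illustration (lists and DataFrames are not treated as generators):

    >>> df_is_chunk_generator([{'a': 1}])
    False
    >>> df_is_chunk_generator(chunk for chunk in ([],))
    True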
    """
    return (
        not isinstance(df, (dict, list, str))
        and 'DataFrame' not in str(type(df))
        and isinstance(df, (Generator, Iterable, Iterator))
    )


def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
    """
    Return the Dask `npartitions` value for a given `chunksize`.
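
    Examples
    --------
    Illustrative values (note that a chunksize is encoded as a negative
    `npartitions` for downstream handling):

    >>> chunksize_to_npartitions(None)
    1
    >>> chunksize_to_npartitions(1000)
    -1000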
    """
    if chunksize == -1:
        from meerschaum.config import get_config
        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
    if chunksize is None:
        return 1
    return -1 * chunksize


def df_from_literal(
    pipe: Optional[mrsm.Pipe] = None,
    literal: Optional[str] = None,
    debug: bool = False
) -> 'pd.DataFrame':
    """
    Construct a dataframe from a literal value, using the pipe's datetime and value column names.

    Parameters
    ----------
    pipe: Optional['meerschaum.Pipe'], default None
        The pipe which will consume the literal value.

    literal: Optional[str], default None
        The literal value to parse (via `ast.literal_eval`) and store in the value column.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
    and the literal as the value.
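
    Examples
    --------
    A hedged sketch (the pipe keys and columns below are illustrative,
    and the timestamp is non-deterministic, so outputs are skipped):

    >>> pipe = mrsm.Pipe('demo', 'literal', columns={'datetime': 'dt', 'value': 'val'})
    >>> df = df_from_literal(pipe, '42.1')  # doctest: +SKIP
    >>> list(df.columns)  # doctest: +SKIP
    ['dt', 'val']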
    """
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint

    if pipe is None or literal is None:
        error("Please provide a Pipe and a literal value")
    ### This will raise an error if the columns are undefined.
    dt_name, val_name = pipe.get_columns('datetime', 'value')

    val = literal
    if isinstance(literal, str):
        if debug:
            dprint(f"Received literal string: '{literal}'")
        import ast
        try:
            val = ast.literal_eval(literal)
        except Exception:
            warn(
                "Failed to parse value from string:\n" + f"{literal}" +
                "\n\nWill cast as a string instead."
            )
            val = literal

    from datetime import datetime, timezone
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    pd = import_pandas()
    return pd.DataFrame({dt_name: [now], val_name: [val]})


def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
    """
    Return the first valid Dask DataFrame partition (if possible).
    """
    pdf = None
    for partition in ddf.partitions:
        try:
            pdf = partition.compute()
        except Exception:
            continue
        if len(pdf) > 0:
            return pdf
    _ = mrsm.attempt_import('partd', lazy=False)
    return ddf.compute()


def query_df(
    df: 'pd.DataFrame',
    params: Optional[Dict[str, Any]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    datetime_column: Optional[str] = None,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    inplace: bool = False,
    reset_index: bool = False,
    coerce_types: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Query the dataframe with the params dictionary.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to query against.

    params: Optional[Dict[str, Any]], default None
        The parameters dictionary to use for the query.

    begin: Union[datetime, int, None], default None
        If `begin` and `datetime_column` are provided, only return rows with a timestamp
        greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If `end` and `datetime_column` are provided, only return rows with a timestamp
        less than this value.

    datetime_column: Optional[str], default None
        A `datetime_column` must be provided to use `begin` and `end`.

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, do not include these columns in the result.

    inplace: bool, default False
        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.

    reset_index: bool, default False
        If `True`, reset the index in the resulting DataFrame.

    coerce_types: bool, default False
        If `True`, cast the dataframe and parameters as strings before querying.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A Pandas DataFrame query result.
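
    Examples
    --------
    A small sketch (assumes the default pandas backend):

    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
    >>> query_df(df, {'a': [2, 3]}, reset_index=True)
       a  b
    0  2  y
    1  3  z
    >>> query_df(df, {'b': 'z'}, select_columns=['a'], reset_index=True)
       a
    0  3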
    """

    def _process_select_columns(_df):
        if not select_columns:
            return
        for col in list(_df.columns):
            if col not in select_columns:
                del _df[col]

    def _process_omit_columns(_df):
        if not omit_columns:
            return
        for col in list(_df.columns):
            if col in omit_columns:
                del _df[col]

    if not params and not begin and not end:
        if not inplace:
            df = df.copy()
        _process_select_columns(df)
        _process_omit_columns(df)
        return df

    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import get_in_ex_params
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    pandas = mrsm.attempt_import('pandas')
    NA = pandas.NA

    if params:
        params = params.copy()
        for key, val in {k: v for k, v in params.items()}.items():
            if isinstance(val, (list, tuple)):
                if None in val:
                    val = [item for item in val if item is not None] + [NA]
                    params[key] = val
                if coerce_types:
                    params[key] = [str(x) for x in val]
            else:
                if value_is_null(val):
                    val = NA
                    params[key] = NA
                if coerce_types:
                    params[key] = str(val)

    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}

    if inplace:
        df.fillna(NA, inplace=True)
    else:
        df = df.infer_objects().fillna(NA)

    if isinstance(begin, str):
        begin = dateutil_parser.parse(begin)
    if isinstance(end, str):
        end = dateutil_parser.parse(end)

    if begin is not None or end is not None:
        if not datetime_column or datetime_column not in df.columns:
            warn(
                f"The datetime column '{datetime_column}' is not present in the DataFrame, "
                + "ignoring begin and end...",
            )
            begin, end = None, None

    if debug:
        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")

    if datetime_column and (begin is not None or end is not None):
        if debug:
            dprint("Checking for datetime column compatibility.")

        from meerschaum.utils.dtypes import coerce_timezone
        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
        end_is_int = are_dtypes_equal(str(type(end)), 'int')

        if df_is_dt:
            df_tz = (
                getattr(df[datetime_column].dt, 'tz', None)
                if hasattr(df[datetime_column], 'dt')
                else None
            )

            if begin_is_int:
                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`begin` will be cast to '{begin}'.")
            if end_is_int:
                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`end` will be cast to '{end}'.")

            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None

    in_ex_params = get_in_ex_params(params)

    masks = [
        (
            (df[datetime_column] >= begin)
            if begin is not None and datetime_column
            else True
        ) & (
            (df[datetime_column] < end)
            if end is not None and datetime_column
            else True
        )
    ]

    masks.extend([
        (
            (
                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
                if in_vals
                else True
            ) & (
                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
                if ex_vals
                else True
            )
        )
        for col, (in_vals, ex_vals) in in_ex_params.items()
        if col in df.columns
    ])
    query_mask = masks[0]
    for mask in masks[1:]:
        query_mask = query_mask & mask

    original_cols = df.columns

    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
    ###       to allow for `<NA>` values.
    bool_cols = [
        col
        for col, typ in df.dtypes.items()
        if are_dtypes_equal(str(typ), 'bool')
    ]
    for col in bool_cols:
        df[col] = df[col].astype('boolean[pyarrow]')

    if not isinstance(query_mask, bool):
        df['__mrsm_mask'] = (
            query_mask.astype('boolean[pyarrow]')
            if hasattr(query_mask, 'astype')
            else query_mask
        )

        if inplace:
            df.where(query_mask, other=NA, inplace=True)
            df.dropna(how='all', inplace=True)
            result_df = df
        else:
            result_df = df.where(query_mask, other=NA)
            result_df.dropna(how='all', inplace=True)

    else:
        result_df = df

    if '__mrsm_mask' in df.columns:
        del df['__mrsm_mask']
    if '__mrsm_mask' in result_df.columns:
        del result_df['__mrsm_mask']

    if reset_index:
        result_df.reset_index(drop=True, inplace=True)

    result_df = enforce_dtypes(
        result_df,
        dtypes,
        safe_copy=False,
        debug=debug,
        coerce_numeric=False,
        coerce_timezone=False,
    )

    if select_columns == ['*']:
        select_columns = None

    if not select_columns and not omit_columns:
        return result_df[original_cols]

    _process_select_columns(result_df)
    _process_omit_columns(result_df)

    return result_df


def to_json(
    df: 'pd.DataFrame',
    safe_copy: bool = True,
    orient: str = 'records',
    date_format: str = 'iso',
    date_unit: str = 'us',
    **kwargs: Any
) -> str:
    """
    Serialize the given dataframe as a JSON string.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be serialized.

    safe_copy: bool, default True
        If `False`, modify the DataFrame inplace.

    orient: str, default 'records'
        The JSON orientation passed to `DataFrame.to_json()`.

    date_format: str, default 'iso'
        The default format for timestamps.

    date_unit: str, default 'us'
        The precision of the timestamps.

    Returns
    -------
    A JSON string.
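
    Examples
    --------
    A minimal illustration:

    >>> import pandas as pd
    >>> to_json(pd.DataFrame([{'a': 1}]))
    '[{"a":1}]'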
    """
    from meerschaum.utils.packages import import_pandas
    pd = import_pandas()
    uuid_cols = get_uuid_cols(df)
    if uuid_cols and safe_copy:
        df = df.copy()
    for col in uuid_cols:
        df[col] = df[col].astype(str)
    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
        date_format=date_format,
        date_unit=date_unit,
        orient=orient,
        **kwargs
    )
314        for col, typ in old_df.dtypes.items()
315        if are_dtypes_equal(str(typ), 'datetime')
316    ]
317    for col in old_dt_cols:
318        strip_utc = (
319            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
320        )
321        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
322
323    new_dt_cols = [
324        col
325        for col, typ in new_df.dtypes.items()
326        if are_dtypes_equal(str(typ), 'datetime')
327    ]
328    for col in new_dt_cols:
329        strip_utc = (
330            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
331        )
332        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
333
334    old_uuid_cols = get_uuid_cols(old_df)
335    new_uuid_cols = get_uuid_cols(new_df)
336    uuid_cols = set(new_uuid_cols + old_uuid_cols)
337    joined_df = merge(
338        new_df.infer_objects(copy=False).fillna(NA),
339        old_df.infer_objects(copy=False).fillna(NA),
340        how='left',
341        on=None,
342        indicator=True,
343    )
344    changed_rows_mask = (joined_df['_merge'] == 'left_only')
345    new_cols = list(new_df_dtypes)
346    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)
347
348    for json_col in json_cols:
349        if json_col not in delta_df.columns:
350            continue
351        try:
352            delta_df[json_col] = delta_df[json_col].apply(json.loads)
353        except Exception:
354            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")
355
356    for numeric_col in numeric_cols:
357        if numeric_col not in delta_df.columns:
358            continue
359        try:
360            delta_df[numeric_col] = delta_df[numeric_col].apply(attempt_cast_to_numeric)
361        except Exception:
362            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")
363
364    for uuid_col in uuid_cols:
365        if uuid_col not in delta_df.columns:
366            continue
367        try:
368            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
369        except Exception:
370            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")
371
372    return delta_df
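
Below is a hedged usage sketch (not part of the source) illustrating the delta behavior on overlapping rows, assuming a standard pandas installation:

```python
import pandas as pd
from meerschaum.utils.dataframe import filter_unseen_df

old_df = pd.DataFrame({'id': [1, 2], 'value': [10.0, 20.0]})
new_df = pd.DataFrame({'id': [2, 3], 'value': [20.0, 30.0]})

### Rows already present in `old_df` are filtered out of `new_df`.
delta_df = filter_unseen_df(old_df, new_df)
print(delta_df)
#    id  value
# 0   3   30.0
```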

375def parse_df_datetimes(
376    df: 'pd.DataFrame',
377    ignore_cols: Optional[Iterable[str]] = None,
378    strip_timezone: bool = False,
379    chunksize: Optional[int] = None,
380    dtype_backend: str = 'numpy_nullable',
381    debug: bool = False,
382) -> 'pd.DataFrame':
383    """
384    Parse a pandas DataFrame for datetime columns and cast as datetimes.
385
386    Parameters
387    ----------
388    df: pd.DataFrame
389        The pandas DataFrame to parse.
390
391    ignore_cols: Optional[Iterable[str]], default None
392        If provided, do not attempt to coerce these columns as datetimes.
393
394    strip_timezone: bool, default False
395        If `True`, remove the UTC `tzinfo` property.
396
397    chunksize: Optional[int], default None
398        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
399
400    dtype_backend: str, default 'numpy_nullable'
401        If `df` is not a DataFrame and new one needs to be constructed,
402        use this as the datatypes backend.
403        Accepted values are 'numpy_nullable' and 'pyarrow'.
404
405    debug: bool, default False
406        Verbosity toggle.
407
408    Returns
409    -------
410    A new pandas DataFrame with the determined datetime columns
411    (usually ISO strings) cast as datetimes.
412
413    Examples
414    --------
415    ```python
416    >>> import pandas as pd
417    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
418    >>> df.dtypes
419    a    object
420    dtype: object
421    >>> df = parse_df_datetimes(df)
422    >>> df.dtypes
423    a    datetime64[ns]
424    dtype: object
425
426    ```
427
428    """
429    from meerschaum.utils.packages import import_pandas, attempt_import
430    from meerschaum.utils.debug import dprint
431    from meerschaum.utils.warnings import warn
432    from meerschaum.utils.misc import items_str
433    import traceback
434    pd = import_pandas()
435    pandas = attempt_import('pandas')
436    pd_name = pd.__name__
437    using_dask = 'dask' in pd_name
438    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
439    dask_dataframe = None
440    if using_dask or df_is_dask:
441        npartitions = chunksize_to_npartitions(chunksize)
442        dask_dataframe = attempt_import('dask.dataframe')
443
444    ### if df is a dict, build DataFrame
445    if isinstance(df, pandas.DataFrame):
446        pdf = df
447    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
448        pdf = get_first_valid_dask_partition(df)
449    else:
450        if debug:
451            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")
452
453        if using_dask:
454            if isinstance(df, list):
455                keys = set()
456                for doc in df:
457                    for key in doc:
458                        keys.add(key)
459                df = pd.DataFrame.from_dict(
460                    {
461                        k: [
462                            doc.get(k, None)
463                            for doc in df
464                        ] for k in keys
465                    },
466                    npartitions=npartitions,
467                )
468            elif isinstance(df, dict):
469                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
470            elif 'pandas.core.frame.DataFrame' in str(type(df)):
471                df = pd.from_pandas(df, npartitions=npartitions)
472            else:
473                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
474            pandas = attempt_import('pandas')
475            pdf = get_first_valid_dask_partition(df)
476
477        else:
478            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
479            pdf = df
480
481    ### skip parsing if DataFrame is empty
482    if len(pdf) == 0:
483        if debug:
484            dprint(f"df is empty. Returning original DataFrame without casting datetime columns...")
485        return df
486
487    ignore_cols = set(
488        (ignore_cols or []) + [
489            col
490            for col, dtype in pdf.dtypes.items() 
491            if 'datetime' in str(dtype)
492        ]
493    )
494    cols_to_inspect = [col for col in pdf.columns if col not in ignore_cols]
495
496    if len(cols_to_inspect) == 0:
497        if debug:
498            dprint(f"All columns are ignored, skipping datetime detection...")
499        return df.fillna(pandas.NA)
500
501    ### apply regex to columns to determine which are ISO datetimes
502    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
503    dt_mask = pdf[cols_to_inspect].astype(str).apply(
504        lambda s: s.str.match(iso_dt_regex).all()
505    )
506
507    ### list of datetime column names
508    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
509    if not datetime_cols:
510        if debug:
511            dprint("No columns detected as datetimes, returning...")
512        return df.fillna(pandas.NA)
513
514    if debug:
515        dprint("Converting columns to datetimes: " + str(datetime_cols))
516
517    try:
518        if not using_dask:
519            df[datetime_cols] = df[datetime_cols].apply(
520                pd.to_datetime,
521                utc=True,
522                format='ISO8601',
523            )
524        else:
525            df[datetime_cols] = df[datetime_cols].apply(
526                pd.to_datetime,
527                utc=True,
528                axis=1,
529                meta={
530                    col: 'datetime64[ns, UTC]'
531                    for col in datetime_cols
532                }
533            )
534    except Exception:
535        warn(
536            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
537            + f"{traceback.format_exc()}"
538        )
539
540    if strip_timezone:
541        for dt in datetime_cols:
542            try:
543                df[dt] = df[dt].dt.tz_localize(None)
544            except Exception:
545                warn(
546                    f"Unable to convert column '{dt}' to naive datetime:\n"
547                    + f"{traceback.format_exc()}"
548                )
549
550    return df.fillna(pandas.NA)
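
A hedged sketch of typical usage (the column names are illustrative): ISO-like strings are detected via the regex above and cast, and `strip_timezone=True` drops the UTC `tzinfo`:

```python
import pandas as pd
from meerschaum.utils.dataframe import parse_df_datetimes

df = pd.DataFrame({
    'ts': ['2024-01-01 00:00:00', '2024-01-02 00:00:00'],
    'label': ['a', 'b'],
})

### Only `ts` matches the ISO datetime regex; `label` is left as-is.
parsed = parse_df_datetimes(df, strip_timezone=True)
print(parsed.dtypes)
# ts       datetime64[ns]
# label            object
# dtype: object
```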

553def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
554    """
555    Get the columns which contain unhashable objects from a Pandas DataFrame.
556
557    Parameters
558    ----------
559    df: pd.DataFrame
560        The DataFrame which may contain unhashable objects.
561
562    Returns
563    -------
564    A list of columns.
565    """
566    if df is None:
567        return []
568    if len(df) == 0:
569        return []
570
571    is_dask = 'dask' in df.__module__
572    if is_dask:
573        from meerschaum.utils.packages import attempt_import
574        pandas = attempt_import('pandas')
575        df = pandas.DataFrame(get_first_valid_dask_partition(df))
576    return [
577        col for col, val in df.iloc[0].items()
578        if not isinstance(val, Hashable)
579    ]
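
A hedged sketch: only the first row is inspected, so columns holding unhashable objects (e.g. lists or dicts) in row 0 are returned:

```python
import pandas as pd
from meerschaum.utils.dataframe import get_unhashable_cols

df = pd.DataFrame([{'a': 1, 'meta': {'k': 'v'}, 'tags': ['x', 'y']}])
print(get_unhashable_cols(df))
# ['meta', 'tags']
```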

582def get_json_cols(df: 'pd.DataFrame') -> List[str]:
583    """
584    Get the columns which contain unhashable (JSON-like) objects from a Pandas DataFrame.
585
586    Parameters
587    ----------
588    df: pd.DataFrame
589        The DataFrame which may contain unhashable objects.
590
591    Returns
592    -------
593    A list of columns to be encoded as JSON.
594    """
595    if df is None:
596        return []
597
598    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
599    if is_dask:
600        df = get_first_valid_dask_partition(df)
601
602    if len(df) == 0:
603        return []
604
605    cols_indices = {
606        col: df[col].first_valid_index()
607        for col in df.columns
608    }
609    return [
610        col
611        for col, ix in cols_indices.items()
612        if (
613            ix is not None
614            and
615            not isinstance(df.loc[ix][col], Hashable)
616        )
617    ]
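
A hedged sketch: unlike `get_unhashable_cols()`, the first *valid* (non-null) value per column is inspected, so leading nulls don't hide a JSON column:

```python
import pandas as pd
from meerschaum.utils.dataframe import get_json_cols

df = pd.DataFrame({'a': [1, 2], 'meta': [None, {'k': 'v'}]})
print(get_json_cols(df))
# ['meta']
```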

620def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
621    """
622    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.
623
624    Parameters
625    ----------
626    df: pd.DataFrame
627        The DataFrame which may contain decimal objects.
628
629    Returns
630    -------
631    A list of columns to treat as numerics.
632    """
633    if df is None:
634        return []
635    from decimal import Decimal
636    is_dask = 'dask' in df.__module__
637    if is_dask:
638        df = get_first_valid_dask_partition(df)
639
640    if len(df) == 0:
641        return []
642
643    cols_indices = {
644        col: df[col].first_valid_index()
645        for col in df.columns
646    }
647    return [
648        col
649        for col, ix in cols_indices.items()
650        if (
651            ix is not None
652            and
653            isinstance(df.loc[ix][col], Decimal)
654        )
655    ]
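
A hedged sketch: columns whose first valid value is a `decimal.Decimal` are flagged as numerics:

```python
from decimal import Decimal
import pandas as pd
from meerschaum.utils.dataframe import get_numeric_cols

df = pd.DataFrame({'price': [Decimal('1.10'), Decimal('2.20')], 'qty': [1, 2]})
print(get_numeric_cols(df))
# ['price']
```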

658def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
659    """
660    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.
661
662    Parameters
663    ----------
664    df: pd.DataFrame
665        The DataFrame which may contain UUID objects.
666
667    Returns
668    -------
669    A list of columns to treat as UUIDs.
670    """
671    if df is None:
672        return []
673    from uuid import UUID
674    is_dask = 'dask' in df.__module__
675    if is_dask:
676        df = get_first_valid_dask_partition(df)
677
678    if len(df) == 0:
679        return []
680
681    cols_indices = {
682        col: df[col].first_valid_index()
683        for col in df.columns
684    }
685    return [
686        col
687        for col, ix in cols_indices.items()
688        if (
689            ix is not None
690            and
691            isinstance(df.loc[ix][col], UUID)
692        )
693    ]
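
A hedged sketch: columns whose first valid value is a `uuid.UUID` are flagged:

```python
from uuid import uuid4
import pandas as pd
from meerschaum.utils.dataframe import get_uuid_cols

df = pd.DataFrame({'id': [uuid4(), uuid4()], 'n': [1, 2]})
print(get_uuid_cols(df))
# ['id']
```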

696def enforce_dtypes(
697    df: 'pd.DataFrame',
698    dtypes: Dict[str, str],
699    safe_copy: bool = True,
700    coerce_numeric: bool = True,
701    coerce_timezone: bool = True,
702    strip_timezone: bool = False,
703    debug: bool = False,
704) -> 'pd.DataFrame':
705    """
706    Enforce the `dtypes` dictionary on a DataFrame.
707
708    Parameters
709    ----------
710    df: pd.DataFrame
711        The DataFrame on which to enforce dtypes.
712
713    dtypes: Dict[str, str]
714        The data types to attempt to enforce on the DataFrame.
715
716    safe_copy: bool, default True
717        If `True`, create a copy before comparing and modifying the dataframes.
718        Setting to `False` may mutate the DataFrames.
719        See `meerschaum.utils.dataframe.filter_unseen_df`.
720
721    coerce_numeric: bool, default True
722        If `True`, convert float and int collisions to numeric.
723
724    coerce_timezone: bool, default True
725        If `True`, convert datetimes to UTC.
726
727    strip_timezone: bool, default False
728        If `coerce_timezone` and `strip_timezone` are `True`,
729        remove timezone information from datetimes.
730
731    debug: bool, default False
732        Verbosity toggle.
733
734    Returns
735    -------
736    The Pandas DataFrame with the types enforced.
737    """
738    import json
739    from meerschaum.utils.debug import dprint
740    from meerschaum.utils.formatting import pprint
741    from meerschaum.utils.dtypes import (
742        are_dtypes_equal,
743        to_pandas_dtype,
744        is_dtype_numeric,
745        attempt_cast_to_numeric,
746        attempt_cast_to_uuid,
747        coerce_timezone as _coerce_timezone,
748    )
749    pandas = mrsm.attempt_import('pandas')
750    is_dask = 'dask' in df.__module__
751    if safe_copy:
752        df = df.copy()
753    if len(df.columns) == 0:
754        if debug:
755            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
756        return df
757
758    pipe_pandas_dtypes = {
759        col: to_pandas_dtype(typ)
760        for col, typ in dtypes.items()
761    }
762    json_cols = [
763        col
764        for col, typ in dtypes.items()
765        if typ == 'json'
766    ]
767    numeric_cols = [
768        col
769        for col, typ in dtypes.items()
770        if typ == 'numeric'
771    ]
772    uuid_cols = [
773        col
774        for col, typ in dtypes.items()
775        if typ == 'uuid'
776    ]
777    datetime_cols = [
778        col
779        for col, typ in dtypes.items()
780        if are_dtypes_equal(typ, 'datetime')
781    ]
782    df_numeric_cols = get_numeric_cols(df)
783    if debug:
784        dprint("Desired data types:")
785        pprint(dtypes)
786        dprint("Data types for incoming DataFrame:")
787        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})
788
789    if json_cols and len(df) > 0:
790        if debug:
791            dprint(f"Checking columns for JSON encoding: {json_cols}")
792        for col in json_cols:
793            if col in df.columns:
794                try:
795                    df[col] = df[col].apply(
796                        (
797                            lambda x: (
798                                json.loads(x)
799                                if isinstance(x, str)
800                                else x
801                            )
802                        )
803                    )
804                except Exception as e:
805                    if debug:
806                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")
807
808    if numeric_cols:
809        if debug:
810            dprint(f"Checking for numerics: {numeric_cols}")
811        for col in numeric_cols:
812            if col in df.columns:
813                try:
814                    df[col] = df[col].apply(attempt_cast_to_numeric)
815                except Exception as e:
816                    if debug:
817                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")
818
819    if uuid_cols:
820        if debug:
821            dprint(f"Checking for UUIDs: {uuid_cols}")
822        for col in uuid_cols:
823            if col in df.columns:
824                try:
825                    df[col] = df[col].apply(attempt_cast_to_uuid)
826                except Exception as e:
827                    if debug:
828                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")
829
830    if datetime_cols and coerce_timezone:
831        if debug:
832            dprint(f"Checking for datetime conversion: {datetime_cols}")
833        for col in datetime_cols:
834            if col in df.columns:
835                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)
836
837    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
838    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
839        if debug:
840            dprint("Data types match. Exiting enforcement...")
841        return df
842
843    common_dtypes = {}
844    common_diff_dtypes = {}
845    for col, typ in pipe_pandas_dtypes.items():
846        if col in df_dtypes:
847            common_dtypes[col] = typ
848            if not are_dtypes_equal(typ, df_dtypes[col]):
849                common_diff_dtypes[col] = df_dtypes[col]
850
851    if debug:
852        dprint("Common columns with different dtypes:")
853        pprint(common_diff_dtypes)
854
855    detected_dt_cols = {}
856    for col, typ in common_diff_dtypes.items():
857        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
858            df_dtypes[col] = typ
859            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
860    for col in detected_dt_cols:
861        del common_diff_dtypes[col]
862
863    if debug:
864        dprint("Common columns with different dtypes (after dates):")
865        pprint(common_diff_dtypes)
866
867    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
868        if debug:
869            dprint(
870                "The incoming DataFrame has mostly the same types, skipping enforcement. "
871                + "The only detected difference was in the following datetime columns."
872            )
873            pprint(detected_dt_cols)
874        return df
875
876    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
877        previous_typ = common_dtypes[col]
878        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
879        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
880        explicitly_numeric = dtypes.get(col, 'numeric') == 'numeric'
881        cast_to_numeric = (
882            explicitly_numeric
883            or col in df_numeric_cols
884            or (mixed_numeric_types and not explicitly_float)
885        ) and coerce_numeric
886        if cast_to_numeric:
887            common_dtypes[col] = attempt_cast_to_numeric
888            common_diff_dtypes[col] = attempt_cast_to_numeric
889
890    for d in common_diff_dtypes:
891        t = common_dtypes[d]
892        if debug:
893            dprint(f"Casting column {d} to dtype {t}.")
894        try:
895            df[d] = (
896                df[d].apply(t)
897                if callable(t)
898                else df[d].astype(t)
899            )
900        except Exception as e:
901            if debug:
902                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
903            if 'int' in str(t).lower():
904                try:
905                    df[d] = df[d].astype('float64').astype(t)
906                except Exception:
907                    if debug:
908                        dprint(f"Was unable to convert to float then {t}.")
909    return df
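
A hedged sketch of typical usage: string values are parsed into the requested Meerschaum types (`'json'`, `'numeric'`, `'uuid'`, `'datetime'`) before any pandas-level casting (the concrete types below come from `meerschaum.utils.dtypes` and are the expected results):

```python
import pandas as pd
from meerschaum.utils.dataframe import enforce_dtypes

df = pd.DataFrame({'meta': ['{"k": "v"}'], 'amount': ['1.23']})
enforced = enforce_dtypes(df, {'meta': 'json', 'amount': 'numeric'})
print(type(enforced['meta'][0]), type(enforced['amount'][0]))
# <class 'dict'> <class 'decimal.Decimal'>  (expected)
```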

912def get_datetime_bound_from_df(
913    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
914    datetime_column: str,
915    minimum: bool = True,
916) -> Union[int, datetime, None]:
917    """
918    Return the minimum or maximum datetime (or integer) from a DataFrame.
919
920    Parameters
921    ----------
922    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
923        The DataFrame, list, or dict which contains the range axis.
924
925    datetime_column: str
926        The name of the datetime (or int) column.
927
928    minimum: bool
929        Whether to return the minimum (default) or maximum value.
930
931    Returns
932    -------
933    The minimum or maximum datetime value in the dataframe, or `None`.
934    """
935    if df is None:
936        return None
937    if not datetime_column:
938        return None
939
940    def compare(a, b):
941        if a is None:
942            return b
943        if b is None:
944            return a
945        if minimum:
946            return a if a < b else b
947        return a if a > b else b
948
949    if isinstance(df, list):
950        if len(df) == 0:
951            return None
952        best_yet = df[0].get(datetime_column, None)
953        for doc in df:
954            val = doc.get(datetime_column, None)
955            best_yet = compare(best_yet, val)
956        return best_yet
957
958    if isinstance(df, dict):
959        if datetime_column not in df:
960            return None
961        best_yet = df[datetime_column][0]
962        for val in df[datetime_column]:
963            best_yet = compare(best_yet, val)
964        return best_yet
965
966    if 'DataFrame' in str(type(df)):
967        from meerschaum.utils.dtypes import are_dtypes_equal
968        pandas = mrsm.attempt_import('pandas')
969        is_dask = 'dask' in df.__module__
970
971        if datetime_column not in df.columns:
972            return None
973
974        try:
975            dt_val = (
976                df[datetime_column].min(skipna=True)
977                if minimum
978                else df[datetime_column].max(skipna=True)
979            )
980        except Exception:
981            dt_val = pandas.NA
982        if is_dask and dt_val is not None and dt_val is not pandas.NA:
983            dt_val = dt_val.compute()
984
985        return (
986            pandas.to_datetime(dt_val).to_pydatetime()
987            if are_dtypes_equal(str(type(dt_val)), 'datetime')
988            else (dt_val if dt_val is not pandas.NA else None)
989        )
990
991    return None
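
A hedged sketch: the same call works on DataFrames, lists of documents, and dicts of lists; integer axes are supported alongside datetimes:

```python
from meerschaum.utils.dataframe import get_datetime_bound_from_df

docs = [{'dt': 3}, {'dt': 1}, {'dt': 2}]
print(get_datetime_bound_from_df(docs, 'dt'))                 # 1
print(get_datetime_bound_from_df(docs, 'dt', minimum=False))  # 3
```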

 994def get_unique_index_values(
 995    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
 996    indices: List[str],
 997) -> Dict[str, List[Any]]:
 998    """
 999    Return a dictionary of the unique index values in a DataFrame.
1000
1001    Parameters
1002    ----------
1003    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
1004        The dataframe (or list or dict) which contains index values.
1005
1006    indices: List[str]
1007        The list of index columns.
1008
1009    Returns
1010    -------
1011    A dictionary mapping indices to unique values.
1012    """
1013    if df is None:
1014        return {}
1015    if 'dataframe' in str(type(df)).lower():
1016        pandas = mrsm.attempt_import('pandas')
1017        return {
1018            col: list({
1019                (val if val is not pandas.NA else None)
1020                for val in df[col].unique()
1021            })
1022            for col in indices
1023            if col in df.columns
1024        }
1025
1026    unique_indices = defaultdict(lambda: set())
1027    if isinstance(df, list):
1028        for doc in df:
1029            for index in indices:
1030                if index in doc:
1031                    unique_indices[index].add(doc[index])
1032
1033    elif isinstance(df, dict):
1034        for index in indices:
1035            if index in df:
1036                unique_indices[index] = unique_indices[index].union(set(df[index]))
1037
1038    return {key: list(val) for key, val in unique_indices.items()}
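
A hedged sketch (the values are collected into sets, so the order of each returned list may vary):

```python
from meerschaum.utils.dataframe import get_unique_index_values

docs = [{'city': 'NYC', 'id': 1}, {'city': 'NYC', 'id': 2}]
print(get_unique_index_values(docs, ['city', 'id']))
# {'city': ['NYC'], 'id': [1, 2]}
```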

1041def df_is_chunk_generator(df: Any) -> bool:
1042    """
1043    Determine whether to treat `df` as a chunk generator.
1044
1045    Note this should only be used in a context where generators are expected,
1046    as it will return `True` for any iterable.
1047
1048    Parameters
1049    ----------
1050    df: The DataFrame or chunk generator to evaluate.
1051
1052    Returns
1053    -------
1054    A `bool` indicating whether to treat `df` as a generator.
1055    """
1056    return (
1057        not isinstance(df, (dict, list, str))
1058        and 'DataFrame' not in str(type(df))
1059        and isinstance(df, (Generator, Iterable, Iterator))
1060    )
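
A hedged sketch: generators of chunks qualify, while dicts, lists, strings, and DataFrames do not:

```python
import pandas as pd
from meerschaum.utils.dataframe import df_is_chunk_generator

chunks = (pd.DataFrame({'a': [i]}) for i in range(3))
print(df_is_chunk_generator(chunks))          # True
print(df_is_chunk_generator(pd.DataFrame()))  # False
print(df_is_chunk_generator([{'a': 1}]))      # False
```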

1063def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
1064    """
1065    Return the Dask `npartitions` value for a given `chunksize`.
1066    """
1067    if chunksize == -1:
1068        from meerschaum.config import get_config
1069        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
1070    if chunksize is None:
1071        return 1
1072    return -1 * chunksize
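
A hedged sketch of the mapping as written: `None` yields a single partition, and any other resolved chunksize is returned negated (how the negative value is consumed is left to the Dask-aware callers):

```python
from meerschaum.utils.dataframe import chunksize_to_npartitions

print(chunksize_to_npartitions(None))  # 1
print(chunksize_to_npartitions(1000))  # -1000
```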

1075def df_from_literal(
1076    pipe: Optional[mrsm.Pipe] = None,
1077    literal: str = None,
1078    debug: bool = False
1079) -> 'pd.DataFrame':
1080    """
1081    Construct a dataframe from a literal value, using the pipe's datetime and value column names.
1082
1083    Parameters
1084    ----------
1085    pipe: Optional['meerschaum.Pipe'], default None
1086        The pipe which will consume the literal value.
1087
1088    Returns
1089    -------
1090    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
1091    and the literal as the value.
1092    """
1093    from meerschaum.utils.packages import import_pandas
1094    from meerschaum.utils.warnings import error, warn
1095    from meerschaum.utils.debug import dprint
1096
1097    if pipe is None or literal is None:
1098        error("Please provide a Pipe and a literal value")
1099    ### this will raise an error if the columns are undefined
1100    dt_name, val_name = pipe.get_columns('datetime', 'value')
1101
1102    val = literal
1103    if isinstance(literal, str):
1104        if debug:
1105            dprint(f"Received literal string: '{literal}'")
1106        import ast
1107        try:
1108            val = ast.literal_eval(literal)
1109        except Exception as e:
1110            warn(
1111                f"Failed to parse value from string:\n{literal}"
1112                + "\n\nWill cast as a string instead."
1113            )
1114            val = literal
1115
1116    from datetime import datetime, timezone
1117    now = datetime.now(timezone.utc).replace(tzinfo=None)
1118
1119    pd = import_pandas()
1120    return pd.DataFrame({dt_name: [now], val_name: [val]})
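
A hedged sketch, assuming an in-memory pipe whose datetime and value columns are defined (the connector/metric keys and column names below are illustrative):

```python
import meerschaum as mrsm
from meerschaum.utils.dataframe import df_from_literal

pipe = mrsm.Pipe(
    'example', 'literal',
    columns={'datetime': 'dt', 'value': 'val'},
)
df = df_from_literal(pipe, '42')
print(df.columns.tolist())  # ['dt', 'val']
```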

1123def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
1124    """
1125    Return the first valid Dask DataFrame partition (if possible).
1126    """
1127    pdf = None
1128    for partition in ddf.partitions:
1129        try:
1130            pdf = partition.compute()
1131        except Exception as e:
1132            continue
1133        if len(pdf) > 0:
1134            return pdf
1135    _ = mrsm.attempt_import('partd', lazy=False)
1136    return ddf.compute()
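
A hedged sketch, assuming Dask is installed:

```python
import pandas as pd
import dask.dataframe as dd
from meerschaum.utils.dataframe import get_first_valid_dask_partition

ddf = dd.from_pandas(pd.DataFrame({'a': [1, 2, 3, 4]}), npartitions=2)
pdf = get_first_valid_dask_partition(ddf)
print(len(pdf))  # 2 (rows in the first non-empty partition)
```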

1139def query_df(
1140    df: 'pd.DataFrame',
1141    params: Optional[Dict[str, Any]] = None,
1142    begin: Union[datetime, int, None] = None,
1143    end: Union[datetime, int, None] = None,
1144    datetime_column: Optional[str] = None,
1145    select_columns: Optional[List[str]] = None,
1146    omit_columns: Optional[List[str]] = None,
1147    inplace: bool = False,
1148    reset_index: bool = False,
1149    coerce_types: bool = False,
1150    debug: bool = False,
1151) -> 'pd.DataFrame':
1152    """
1153    Query the dataframe with the params dictionary.
1154
1155    Parameters
1156    ----------
1157    df: pd.DataFrame
1158        The DataFrame to query against.
1159
1160    params: Optional[Dict[str, Any]], default None
1161        The parameters dictionary to use for the query.
1162
1163    begin: Union[datetime, int, None], default None
1164        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1165        greater than or equal to this value.
1166
1167    end: Union[datetime, int, None], default None
1168        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1169        less than this value.
1170
1171    datetime_column: Optional[str], default None
1172        A `datetime_column` must be provided to use `begin` and `end`.
1173
1174    select_columns: Optional[List[str]], default None
1175        If provided, only return these columns.
1176
1177    omit_columns: Optional[List[str]], default None
1178        If provided, do not include these columns in the result.
1179
1180    inplace: bool, default False
1181        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
1182
1183    reset_index: bool, default False
1184        If `True`, reset the index in the resulting DataFrame.
1185
1186    coerce_types: bool, default False
1187        If `True`, cast the dataframe and parameters as strings before querying.
1188
1189    Returns
1190    -------
1191    A Pandas DataFrame query result.
1192    """
1193
1194    def _process_select_columns(_df):
1195        if not select_columns:
1196            return
1197        for col in list(_df.columns):
1198            if col not in select_columns:
1199                del _df[col]
1200
1201    def _process_omit_columns(_df):
1202        if not omit_columns:
1203            return
1204        for col in list(_df.columns):
1205            if col in omit_columns:
1206                del _df[col]
1207
1208    if not params and not begin and not end:
1209        if not inplace:
1210            df = df.copy()
1211        _process_select_columns(df)
1212        _process_omit_columns(df)
1213        return df
1214
1215    from meerschaum.utils.debug import dprint
1216    from meerschaum.utils.misc import get_in_ex_params
1217    from meerschaum.utils.warnings import warn
1218    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
1219    dateutil_parser = mrsm.attempt_import('dateutil.parser')
1220    pandas = mrsm.attempt_import('pandas')
1221    NA = pandas.NA
1222
1223    if params:
1224        params = params.copy()
1225        for key, val in {k: v for k, v in params.items()}.items():
1226            if isinstance(val, (list, tuple)):
1227                if None in val:
1228                    val = [item for item in val if item is not None] + [NA]
1229                    params[key] = val
1230                if coerce_types:
1231                    params[key] = [str(x) for x in val]
1232            else:
1233                if value_is_null(val):
1234                    val = NA
1235                    params[key] = NA
1236                if coerce_types:
1237                    params[key] = str(val)
1238
1239    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
1240
1241    if inplace:
1242        df.fillna(NA, inplace=True)
1243    else:
1244        df = df.infer_objects().fillna(NA)
1245
1246    if isinstance(begin, str):
1247        begin = dateutil_parser.parse(begin)
1248    if isinstance(end, str):
1249        end = dateutil_parser.parse(end)
1250
1251    if begin is not None or end is not None:
1252        if not datetime_column or datetime_column not in df.columns:
1253            warn(
1254                f"The datetime column '{datetime_column}' is not present in the DataFrame, "
1255                + "ignoring begin and end...",
1256            )
1257            begin, end = None, None
1258
1259    if debug:
1260        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")
1261
1262    if datetime_column and (begin is not None or end is not None):
1263        if debug:
1264            dprint("Checking for datetime column compatibility.")
1265
1266        from meerschaum.utils.dtypes import coerce_timezone
1267        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
1268        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
1269        end_is_int = are_dtypes_equal(str(type(end)), 'int')
1270
1271        if df_is_dt:
1272            df_tz = (
1273                getattr(df[datetime_column].dt, 'tz', None)
1274                if hasattr(df[datetime_column], 'dt')
1275                else None
1276            )
1277
1278            if begin_is_int:
1279                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
1280                if debug:
1281                    dprint(f"`begin` will be cast to '{begin}'.")
1282            if end_is_int:
1283                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
1284                if debug:
1285                    dprint(f"`end` will be cast to '{end}'.")
1286
1287            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
1288            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None
1289
1290    in_ex_params = get_in_ex_params(params)
1291
1292    masks = [
1293        (
1294            (df[datetime_column] >= begin)
1295            if begin is not None and datetime_column
1296            else True
1297        ) & (
1298            (df[datetime_column] < end)
1299            if end is not None and datetime_column
1300            else True
1301        )
1302    ]
1303
1304    masks.extend([
1305        (
1306            (
1307                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
1308                if in_vals
1309                else True
1310            ) & (
1311                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
1312                if ex_vals
1313                else True
1314            )
1315        )
1316        for col, (in_vals, ex_vals) in in_ex_params.items()
1317        if col in df.columns
1318    ])
1319    query_mask = masks[0]
1320    for mask in masks[1:]:
1321        query_mask = query_mask & mask
1322
1323    original_cols = df.columns
1324
1325    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
1326    ###       to allow for `<NA>` values.
1327    bool_cols = [
1328        col
1329        for col, typ in df.dtypes.items()
1330        if are_dtypes_equal(str(typ), 'bool')
1331    ]
1332    for col in bool_cols:
1333        df[col] = df[col].astype('boolean[pyarrow]')
1334
1335    if not isinstance(query_mask, bool):
1336        df['__mrsm_mask'] = (
1337            query_mask.astype('boolean[pyarrow]')
1338            if hasattr(query_mask, 'astype')
1339            else query_mask
1340        )
1341
1342        if inplace:
1343            df.where(query_mask, other=NA, inplace=True)
1344            df.dropna(how='all', inplace=True)
1345            result_df = df
1346        else:
1347            result_df = df.where(query_mask, other=NA)
1348            result_df.dropna(how='all', inplace=True)
1349
1350    else:
1351        result_df = df
1352
1353    if '__mrsm_mask' in df.columns:
1354        del df['__mrsm_mask']
1355    if '__mrsm_mask' in result_df.columns:
1356        del result_df['__mrsm_mask']
1357
1358    if reset_index:
1359        result_df.reset_index(drop=True, inplace=True)
1360
1361    result_df = enforce_dtypes(
1362        result_df,
1363        dtypes,
1364        safe_copy=False,
1365        debug=debug,
1366        coerce_numeric=False,
1367        coerce_timezone=False,
1368    )
1369
1370    if select_columns == ['*']:
1371        select_columns = None
1372
1373    if not select_columns and not omit_columns:
1374        return result_df[original_cols]
1375
1376    _process_select_columns(result_df)
1377    _process_omit_columns(result_df)
1378
1379    return result_df
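
A hedged sketch combining `params` with a half-open `[begin, end)` window (the column names are illustrative):

```python
from datetime import datetime
import pandas as pd
from meerschaum.utils.dataframe import query_df

df = pd.DataFrame({
    'dt': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']),
    'color': ['red', 'blue', 'red'],
})

### Keep rows where color == 'red' and dt >= 2024-01-02.
result = query_df(
    df,
    params={'color': 'red'},
    begin=datetime(2024, 1, 2),
    datetime_column='dt',
    reset_index=True,
)
print(result)
#           dt color
# 0 2024-01-03   red
```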

1382def to_json(
1383    df: 'pd.DataFrame',
1384    safe_copy: bool = True,
1385    orient: str = 'records',
1386    date_format: str = 'iso',
1387    date_unit: str = 'us',
1388    **kwargs: Any
1389) -> str:
1390    """
1391    Serialize the given dataframe as a JSON string.
1392
1393    Parameters
1394    ----------
1395    df: pd.DataFrame
1396        The DataFrame to be serialized.
1397
1398    safe_copy: bool, default True
1399        If `False`, modify the DataFrame inplace.
1400
1401    date_format: str, default 'iso'
1402        The default format for timestamps.
1403
1404    date_unit: str, default 'us'
1405        The precision of the timestamps.
1406
1407    Returns
1408    -------
1409    A JSON string.
1410    """
1411    from meerschaum.utils.packages import import_pandas
1412    pd = import_pandas()
1413    uuid_cols = get_uuid_cols(df)
1414    if uuid_cols and safe_copy:
1415        df = df.copy()
1416    for col in uuid_cols:
1417        df[col] = df[col].astype(str)
1418    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
1419        date_format=date_format,
1420        date_unit=date_unit,
1421        orient=orient,
1422        **kwargs
1423    )
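
A hedged sketch: UUID columns are stringified before serialization, and timestamps default to ISO strings with microsecond precision:

```python
from uuid import UUID
import pandas as pd
from meerschaum.utils.dataframe import to_json

df = pd.DataFrame({'id': [UUID(int=1)], 'n': [1]})
print(to_json(df))
# [{"id":"00000000-0000-0000-0000-000000000001","n":1}]
```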
