meerschaum.utils.dataframe

Utility functions for working with DataFrames.

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Utility functions for working with DataFrames.
"""

from __future__ import annotations
from datetime import datetime, timezone
from collections import defaultdict

import meerschaum as mrsm
from meerschaum.utils.typing import (
    Optional, Dict, Any, List, Hashable, Generator,
    Iterator, Iterable, Union, TYPE_CHECKING,
)

if TYPE_CHECKING:
    pd, dask = mrsm.attempt_import('pandas', 'dask')


def add_missing_cols_to_df(
    df: 'pd.DataFrame',
    dtypes: Dict[str, Any],
) -> 'pd.DataFrame':
    """
    Add columns from the dtypes dictionary as null columns to a new DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The dataframe to copy and to which the null columns are added.

    dtypes: Dict[str, Any]
        The data types dictionary, which may contain keys not present in `df.columns`.

    Returns
    -------
    A new `DataFrame` with the keys from `dtypes` added as null columns.
    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
    NOTE: This will not ensure that dtypes are enforced!

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': 1}])
    >>> dtypes = {'b': 'Int64'}
    >>> add_missing_cols_to_df(df, dtypes)
          a  b
       0  1  <NA>
    >>> add_missing_cols_to_df(df, dtypes).dtypes
    a    int64
    b    Int64
    dtype: object
    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
    a    int64
    dtype: object
    >>>
    """
    if set(df.columns) == set(dtypes):
        return df

    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.dtypes import to_pandas_dtype
    pandas = attempt_import('pandas')

    def build_series(dtype: str):
        return pandas.Series([], dtype=to_pandas_dtype(dtype))

    assign_kwargs = {
        str(col): build_series(str(typ))
        for col, typ in dtypes.items()
        if col not in df.columns
    }
    df_with_cols = df.assign(**assign_kwargs)
    for col in assign_kwargs:
        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
    return df_with_cols


def filter_unseen_df(
    old_df: 'pd.DataFrame',
    new_df: 'pd.DataFrame',
    safe_copy: bool = True,
    dtypes: Optional[Dict[str, Any]] = None,
    include_unchanged_columns: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Left join two DataFrames to find the newest unseen data.

    Parameters
    ----------
    old_df: 'pd.DataFrame'
        The original (target) dataframe. Acts as a filter on the `new_df`.

    new_df: 'pd.DataFrame'
        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.

    dtypes: Optional[Dict[str, Any]], default None
        Optionally specify the datatypes of the dataframe.

    include_unchanged_columns: bool, default False
        If `True`, include columns which haven't changed on rows which have changed.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas dataframe of the new, unseen rows in `new_df`.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df1 = pd.DataFrame({'a': [1,2]})
    >>> df2 = pd.DataFrame({'a': [2,3]})
    >>> filter_unseen_df(df1, df2)
       a
    0  3

    ```

    """
    if old_df is None:
        return new_df

    if safe_copy:
        old_df = old_df.copy()
        new_df = new_df.copy()

    import json
    import functools
    import traceback
    from decimal import Decimal
    from uuid import UUID
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.dtypes import (
        to_pandas_dtype,
        are_dtypes_equal,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        coerce_timezone,
    )
    pd = import_pandas(debug=debug)
    is_dask = 'dask' in new_df.__module__
    if is_dask:
        pandas = attempt_import('pandas')
        _ = attempt_import('partd', lazy=False)
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA
    else:
        merge = pd.merge
        NA = pd.NA

    new_df_dtypes = dict(new_df.dtypes)
    old_df_dtypes = dict(old_df.dtypes)

    same_cols = set(new_df.columns) == set(old_df.columns)
    if not same_cols:
        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)

        new_types_missing_from_old = {
            col: typ
            for col, typ in new_df_dtypes.items()
            if col not in old_df_dtypes
        }
        old_types_missing_from_new = {
            col: typ
            for col, typ in old_df_dtypes.items()
            if col not in new_df_dtypes
        }
        old_df_dtypes.update(new_types_missing_from_old)
        new_df_dtypes.update(old_types_missing_from_new)

    ### Edge case: two empty lists cast to DFs.
    elif len(new_df.columns) == 0:
        return new_df

    try:
        ### Order matters when checking equality.
        new_df = new_df[old_df.columns]

    except Exception as e:
        warn(
            "Was not able to cast old columns onto new DataFrame. " +
            f"Are both DataFrames the same shape? Error:\n{e}",
            stacklevel=3,
        )
        return new_df[list(new_df_dtypes.keys())]

    ### Assume the old_df knows what it's doing, even if it's technically wrong.
    if dtypes is None:
        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}

    dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
        if col in new_df_dtypes and col in old_df_dtypes
    }
    for col, typ in new_df_dtypes.items():
        if col not in dtypes:
            dtypes[col] = typ

    dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if are_dtypes_equal(typ, 'datetime')
    }
    non_dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if col not in dt_dtypes
    }

    cast_non_dt_cols = True
    try:
        new_df = new_df.astype(non_dt_dtypes)
        cast_non_dt_cols = False
    except Exception as e:
        warn(
            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
        )

    cast_dt_cols = True
    try:
        for col in dt_dtypes:
            new_df[col] = coerce_timezone(pd.to_datetime(new_df[col], utc=True))
        cast_dt_cols = False
    except Exception as e:
        warn(f"Could not cast datetime columns:\n{e}")

    cast_cols = cast_dt_cols or cast_non_dt_cols

    new_numeric_cols_existing = get_numeric_cols(new_df)
    old_numeric_cols = get_numeric_cols(old_df)
    for col, typ in {k: v for k, v in dtypes.items()}.items():
        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
            new_is_numeric = col in new_numeric_cols_existing
            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
            old_is_numeric = col in old_numeric_cols

            if (
                (new_is_float or new_is_int or new_is_numeric)
                and
                (old_is_float or old_is_int or old_is_numeric)
            ):
                dtypes[col] = attempt_cast_to_numeric
                cast_cols = True
                continue

            ### Fall back to object if the types don't match.
            warn(
                f"Detected different types for '{col}' "
                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
                + "falling back to 'object'..."
            )
            dtypes[col] = 'object'
            cast_cols = True

    if cast_cols:
        for col, dtype in dtypes.items():
            if col in new_df.columns:
                try:
                    new_df[col] = (
                        new_df[col].astype(dtype)
                        if not callable(dtype)
                        else new_df[col].apply(dtype)
                    )
                except Exception as e:
                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")

    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
    new_json_cols = get_json_cols(new_df)
    old_json_cols = get_json_cols(old_df)
    json_cols = set(new_json_cols + old_json_cols)
    for json_col in old_json_cols:
        old_df[json_col] = old_df[json_col].apply(serializer)
    for json_col in new_json_cols:
        new_df[json_col] = new_df[json_col].apply(serializer)

    new_numeric_cols = get_numeric_cols(new_df)
    numeric_cols = set(new_numeric_cols + old_numeric_cols)
    for numeric_col in old_numeric_cols:
        old_df[numeric_col] = old_df[numeric_col].apply(
            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
        )
    for numeric_col in new_numeric_cols:
        new_df[numeric_col] = new_df[numeric_col].apply(
            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
        )

    old_dt_cols = [
        col
        for col, typ in old_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in old_dt_cols:
        old_df[col] = coerce_timezone(old_df[col])

    new_dt_cols = [
        col
        for col, typ in new_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in new_dt_cols:
        new_df[col] = coerce_timezone(new_df[col])

    old_uuid_cols = get_uuid_cols(old_df)
    new_uuid_cols = get_uuid_cols(new_df)
    uuid_cols = set(new_uuid_cols + old_uuid_cols)
    joined_df = merge(
        new_df.infer_objects(copy=False).fillna(NA),
        old_df.infer_objects(copy=False).fillna(NA),
        how='left',
        on=None,
        indicator=True,
    )
    changed_rows_mask = (joined_df['_merge'] == 'left_only')
    new_cols = list(new_df_dtypes)
    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)

    for json_col in json_cols:
        if json_col not in delta_df.columns:
            continue
        try:
            delta_df[json_col] = delta_df[json_col].apply(json.loads)
        except Exception:
            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")

    for numeric_col in numeric_cols:
        if numeric_col not in delta_df.columns:
            continue
        try:
            delta_df[numeric_col] = delta_df[numeric_col].apply(attempt_cast_to_numeric)
        except Exception:
            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")

    for uuid_col in uuid_cols:
        if uuid_col not in delta_df.columns:
            continue
        try:
            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
        except Exception:
            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")

    return delta_df


def parse_df_datetimes(
    df: 'pd.DataFrame',
    ignore_cols: Optional[Iterable[str]] = None,
    chunksize: Optional[int] = None,
    dtype_backend: str = 'numpy_nullable',
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Parse a pandas DataFrame for datetime columns and cast as datetimes.

    Parameters
    ----------
    df: pd.DataFrame
        The pandas DataFrame to parse.

    ignore_cols: Optional[Iterable[str]], default None
        If provided, do not attempt to coerce these columns as datetimes.

    chunksize: Optional[int], default None
        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.

    dtype_backend: str, default 'numpy_nullable'
        If `df` is not a DataFrame and a new one needs to be constructed,
        use this as the datatypes backend.
        Accepted values are 'numpy_nullable' and 'pyarrow'.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A new pandas DataFrame with the detected datetime columns
    (usually ISO strings) cast as datetimes.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
    >>> df.dtypes
    a    object
    dtype: object
    >>> df = parse_df_datetimes(df)
    >>> df.dtypes
    a    datetime64[ns]
    dtype: object

    ```

    """
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.misc import items_str
    import traceback
    pd = import_pandas()
    pandas = attempt_import('pandas')
    pd_name = pd.__name__
    using_dask = 'dask' in pd_name
    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
    dask_dataframe = None
    if using_dask or df_is_dask:
        npartitions = chunksize_to_npartitions(chunksize)
        dask_dataframe = attempt_import('dask.dataframe')

    ### if df is a dict, build DataFrame
    if isinstance(df, pandas.DataFrame):
        pdf = df
    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
        pdf = get_first_valid_dask_partition(df)
    else:
        if debug:
            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")

        if using_dask:
            if isinstance(df, list):
                keys = set()
                for doc in df:
                    for key in doc:
                        keys.add(key)
                df = pd.DataFrame.from_dict(
                    {
                        k: [
                            doc.get(k, None)
                            for doc in df
                        ] for k in keys
                    },
                    npartitions=npartitions,
                )
            elif isinstance(df, dict):
                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
            elif 'pandas.core.frame.DataFrame' in str(type(df)):
                df = pd.from_pandas(df, npartitions=npartitions)
            else:
                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
            pandas = attempt_import('pandas')
            pdf = get_first_valid_dask_partition(df)

        else:
            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
            pdf = df

    ### skip parsing if DataFrame is empty
    if len(pdf) == 0:
        if debug:
            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
        return df

    ignore_cols = set(
        (ignore_cols or []) + [
            col
            for col, dtype in pdf.dtypes.items()
            if 'datetime' in str(dtype)
        ]
    )
    cols_to_inspect = [col for col in pdf.columns if col not in ignore_cols]

    if len(cols_to_inspect) == 0:
        if debug:
            dprint("All columns are ignored, skipping datetime detection...")
        return df

    ### apply regex to columns to determine which are ISO datetimes
    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
    dt_mask = pdf[cols_to_inspect].astype(str).apply(
        lambda s: s.str.match(iso_dt_regex).all()
    )

    ### list of datetime column names
    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
    if not datetime_cols:
        if debug:
            dprint("No columns detected as datetimes, returning...")
        return df

    if debug:
        dprint("Converting columns to datetimes: " + str(datetime_cols))

    try:
        if not using_dask:
            df[datetime_cols] = df[datetime_cols].apply(pd.to_datetime, utc=True)
        else:
            df[datetime_cols] = df[datetime_cols].apply(
                pd.to_datetime,
                utc=True,
                axis=1,
                meta={
                    col: 'datetime64[ns]'
                    for col in datetime_cols
                }
            )
    except Exception:
        warn(
            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
            + f"{traceback.format_exc()}"
        )

    for dt in datetime_cols:
        try:
            df[dt] = df[dt].dt.tz_localize(None)
        except Exception:
            warn(f"Unable to convert column '{dt}' to naive datetime:\n{traceback.format_exc()}")

    return df


def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns.
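
    Examples
    --------
    A small illustration (lists are unhashable, so column 'a' is flagged):

    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': [[1, 2]], 'b': [1]})
    >>> get_unhashable_cols(df)
    ['a']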
    """
    if df is None:
        return []
    if len(df) == 0:
        return []

    is_dask = 'dask' in df.__module__
    if is_dask:
        from meerschaum.utils.packages import attempt_import
        pandas = attempt_import('pandas')
        df = pandas.DataFrame(get_first_valid_dask_partition(df))
    return [
        col for col, val in df.iloc[0].items()
        if not isinstance(val, Hashable)
    ]


def get_json_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which should be serialized as JSON (i.e. those whose values are unhashable).

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns to be encoded as JSON.
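
    Examples
    --------
    A small illustration (dicts are unhashable, so column 'a' is flagged):

    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': [{'k': 'v'}], 'b': [1]})
    >>> get_json_cols(df)
    ['a']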
    """
    if df is None:
        return []

    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            not isinstance(df.loc[ix][col], Hashable)
        )
    ]


def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain decimal objects.

    Returns
    -------
    A list of columns to treat as numerics.
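
    Examples
    --------
    A small illustration (only `Decimal` values mark a column as numeric):

    >>> from decimal import Decimal
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': [Decimal('1.1')], 'b': [1.1]})
    >>> get_numeric_cols(df)
    ['a']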
    """
    if df is None:
        return []
    from decimal import Decimal
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance(df.loc[ix][col], Decimal)
        )
    ]


def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain UUID objects.

    Returns
    -------
    A list of columns to treat as UUIDs.
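
    Examples
    --------
    A small illustration (only `UUID` values mark a column):

    >>> import uuid
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': [uuid.uuid4()], 'b': ['abc']})
    >>> get_uuid_cols(df)
    ['a']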
    """
    if df is None:
        return []
    from uuid import UUID
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance(df.loc[ix][col], UUID)
        )
    ]


def enforce_dtypes(
    df: 'pd.DataFrame',
    dtypes: Dict[str, str],
    safe_copy: bool = True,
    coerce_numeric: bool = True,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Enforce the `dtypes` dictionary on a DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame on which to enforce dtypes.

    dtypes: Dict[str, str]
        The data types to attempt to enforce on the DataFrame.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    coerce_numeric: bool, default True
        If `True`, convert float and int collisions to numeric.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The Pandas DataFrame with the types enforced.
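
    Examples
    --------
    A small illustration with the `'json'` dtype (assuming `'json'` maps to the pandas
    `object` dtype via `meerschaum.utils.dtypes.to_pandas_dtype`):

    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': ['{"k": "v"}']})
    >>> enforce_dtypes(df, {'a': 'json'})['a'][0]
    {'k': 'v'}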
    """
    import json
    import traceback
    from decimal import Decimal
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.formatting import pprint
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.dtypes import (
        are_dtypes_equal,
        to_pandas_dtype,
        is_dtype_numeric,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        coerce_timezone,
    )
    if safe_copy:
        df = df.copy()
    if len(df.columns) == 0:
        if debug:
            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
        return df

    pipe_pandas_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
    }
    json_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'json'
    ]
    numeric_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'numeric'
    ]
    uuid_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'uuid'
    ]
    df_numeric_cols = get_numeric_cols(df)
    if debug:
        dprint("Desired data types:")
        pprint(dtypes)
        dprint("Data types for incoming DataFrame:")
        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})

    if json_cols and len(df) > 0:
        if debug:
            dprint(f"Checking columns for JSON encoding: {json_cols}")
        for col in json_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        (
                            lambda x: (
                                json.loads(x)
                                if isinstance(x, str)
                                else x
                            )
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")

    if numeric_cols:
        if debug:
            dprint(f"Checking for numerics: {numeric_cols}")
        for col in numeric_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_numeric)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")

    if uuid_cols:
        if debug:
            dprint(f"Checking for UUIDs: {uuid_cols}")
        for col in uuid_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_uuid)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")

    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint("Data types match. Exiting enforcement...")
        return df

    common_dtypes = {}
    common_diff_dtypes = {}
    for col, typ in pipe_pandas_dtypes.items():
        if col in df_dtypes:
            common_dtypes[col] = typ
            if not are_dtypes_equal(typ, df_dtypes[col]):
                common_diff_dtypes[col] = df_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes:")
        pprint(common_diff_dtypes)

    detected_dt_cols = {}
    for col, typ in common_diff_dtypes.items():
        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
            df_dtypes[col] = typ
            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
    for col in detected_dt_cols:
        del common_diff_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes (after dates):")
        pprint(common_diff_dtypes)

    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint(
                "The incoming DataFrame has mostly the same types, skipping enforcement.\n"
                + "    The only detected difference was in the following datetime columns.\n"
                + "    Timezone information may be stripped."
            )
            pprint(detected_dt_cols)
        return df

    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
        previous_typ = common_dtypes[col]
        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
        explicitly_numeric = dtypes.get(col, 'numeric') == 'numeric'
        cast_to_numeric = (
            explicitly_numeric
            or col in df_numeric_cols
            or (mixed_numeric_types and not explicitly_float)
        ) and coerce_numeric
        if cast_to_numeric:
            common_dtypes[col] = attempt_cast_to_numeric
            common_diff_dtypes[col] = attempt_cast_to_numeric

    for d in common_diff_dtypes:
        t = common_dtypes[d]
        if debug:
            dprint(f"Casting column {d} to dtype {t}.")
        try:
            df[d] = (
                df[d].apply(t)
                if callable(t)
                else df[d].astype(t)
            )
        except Exception as e:
            if debug:
                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
            if 'int' in str(t).lower():
                try:
                    df[d] = df[d].astype('float64').astype(t)
                except Exception:
                    if debug:
                        dprint(f"Was unable to convert to float then {t}.")
    return df


def get_datetime_bound_from_df(
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
    datetime_column: str,
    minimum: bool = True,
) -> Union[int, datetime, None]:
    """
    Return the minimum or maximum datetime (or integer) from a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
        The DataFrame, list, or dict which contains the range axis.

    datetime_column: str
        The name of the datetime (or int) column.

    minimum: bool, default True
        Whether to return the minimum (default) or maximum value.

    Returns
    -------
    The minimum or maximum datetime value in the dataframe, or `None`.
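
    Examples
    --------
    A small illustration with a list of documents and an integer axis:

    >>> docs = [{'dt': 10}, {'dt': 3}, {'dt': 7}]
    >>> get_datetime_bound_from_df(docs, 'dt')
    3
    >>> get_datetime_bound_from_df(docs, 'dt', minimum=False)
    10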
    """
    if df is None:
        return None
    if not datetime_column:
        return None

    def compare(a, b):
        if a is None:
            return b
        if b is None:
            return a
        if minimum:
            return a if a < b else b
        return a if a > b else b

    if isinstance(df, list):
        if len(df) == 0:
            return None
        best_yet = df[0].get(datetime_column, None)
        for doc in df:
            val = doc.get(datetime_column, None)
            best_yet = compare(best_yet, val)
        return best_yet

    if isinstance(df, dict):
        if datetime_column not in df:
            return None
        best_yet = df[datetime_column][0]
        for val in df[datetime_column]:
            best_yet = compare(best_yet, val)
        return best_yet

    if 'DataFrame' in str(type(df)):
        from meerschaum.utils.dtypes import are_dtypes_equal
        pandas = mrsm.attempt_import('pandas')
        is_dask = 'dask' in df.__module__

        if datetime_column not in df.columns:
            return None

        dt_val = (
            df[datetime_column].min(skipna=True)
            if minimum else df[datetime_column].max(skipna=True)
        )
        if is_dask and dt_val is not None:
            dt_val = dt_val.compute()

        return (
            pandas.to_datetime(dt_val).to_pydatetime()
            if are_dtypes_equal(str(type(dt_val)), 'datetime')
            else (dt_val if dt_val is not pandas.NA else None)
        )

    return None


def get_unique_index_values(
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
    indices: List[str],
) -> Dict[str, List[Any]]:
    """
    Return a dictionary of the unique index values in a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
        The dataframe (or list or dict) which contains index values.

    indices: List[str]
        The list of index columns.

    Returns
    -------
    A dictionary mapping indices to unique values.
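
    Examples
    --------
    A small illustration with a list of documents:

    >>> docs = [{'id': 1, 'color': 'red'}, {'id': 2, 'color': 'red'}]
    >>> get_unique_index_values(docs, ['color'])
    {'color': ['red']}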
    """
    if df is None:
        return {}
    if 'dataframe' in str(type(df)).lower():
        pandas = mrsm.attempt_import('pandas')
        return {
            col: list({
                (val if val is not pandas.NA else None)
                for val in df[col].unique()
            })
            for col in indices
            if col in df.columns
        }

    unique_indices = defaultdict(set)
    if isinstance(df, list):
        for doc in df:
            for index in indices:
                if index in doc:
                    unique_indices[index].add(doc[index])

    elif isinstance(df, dict):
        for index in indices:
            if index in df:
                unique_indices[index] = unique_indices[index].union(set(df[index]))

    return {key: list(val) for key, val in unique_indices.items()}


def df_is_chunk_generator(df: Any) -> bool:
    """
    Determine whether to treat `df` as a chunk generator.

    Note this should only be used in a context where generators are expected,
    as it will return `True` for any iterable.

    Parameters
    ----------
    df: Any
        The DataFrame or chunk generator to evaluate.

    Returns
    -------
    A `bool` indicating whether to treat `df` as a generator.
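
    Examples
    --------
    A small illustration (lists and DataFrames are excluded; iterators pass):

    >>> df_is_chunk_generator([{'a': 1}])
    False
    >>> df_is_chunk_generator(iter([]))
    True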
    """
    return (
        not isinstance(df, (dict, list, str))
        and 'DataFrame' not in str(type(df))
        and isinstance(df, (Generator, Iterable, Iterator))
    )


def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
    """
    Return the Dask `npartitions` value for a given `chunksize`.
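
    Examples
    --------
    A small illustration of the mapping:

    >>> chunksize_to_npartitions(None)
    1
    >>> chunksize_to_npartitions(100)
    -100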
    """
    if chunksize == -1:
        from meerschaum.config import get_config
        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
    if chunksize is None:
        return 1
    return -1 * chunksize


def df_from_literal(
    pipe: Optional[mrsm.Pipe] = None,
    literal: Optional[str] = None,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Construct a dataframe from a literal value, using the pipe's datetime and value column names.

    Parameters
    ----------
    pipe: Optional['meerschaum.Pipe'], default None
        The pipe which will consume the literal value.

    literal: Optional[str], default None
        The literal value to parse (via `ast.literal_eval`) and use as the value.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
    and the literal as the value.
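
    Examples
    --------
    A sketch, assuming a pipe with its `datetime` and `value` columns defined:

    >>> pipe = mrsm.Pipe('demo', 'literal', columns={'datetime': 'dt', 'value': 'val'})
    >>> df_from_literal(pipe, '42').columns.tolist()
    ['dt', 'val']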
    """
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint

    if pipe is None or literal is None:
        error("Please provide a Pipe and a literal value")
    ### This will raise an error if the columns are undefined.
    dt_name, val_name = pipe.get_columns('datetime', 'value')

    val = literal
    if isinstance(literal, str):
        if debug:
            dprint(f"Received literal string: '{literal}'")
        import ast
        try:
            val = ast.literal_eval(literal)
        except Exception:
            warn(
                "Failed to parse value from string:\n" + f"{literal}"
                + "\n\nWill cast as a string instead."
            )
            val = literal

    now = datetime.now(timezone.utc).replace(tzinfo=None)

    pd = import_pandas()
    return pd.DataFrame({dt_name: [now], val_name: [val]})


def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
    """
    Return the first valid Dask DataFrame partition (if possible).
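
    Examples
    --------
    A sketch, assuming `dask` (and `partd`) are installed:

    >>> import pandas as pd
    >>> dd = mrsm.attempt_import('dask.dataframe')
    >>> ddf = dd.from_pandas(pd.DataFrame({'a': [1, 2]}), npartitions=2)
    >>> len(get_first_valid_dask_partition(ddf))
    1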
    """
    pdf = None
    for partition in ddf.partitions:
        try:
            pdf = partition.compute()
        except Exception:
            continue
        if len(pdf) > 0:
            return pdf
    _ = mrsm.attempt_import('partd', lazy=False)
    return ddf.compute()


def query_df(
    df: 'pd.DataFrame',
    params: Optional[Dict[str, Any]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    datetime_column: Optional[str] = None,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    inplace: bool = False,
    reset_index: bool = False,
    coerce_types: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Query the dataframe with the params dictionary.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to query against.

    params: Optional[Dict[str, Any]], default None
        The parameters dictionary to use for the query.

    begin: Union[datetime, int, None], default None
        If `begin` and `datetime_column` are provided, only return rows with a timestamp
        greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If `end` and `datetime_column` are provided, only return rows with a timestamp
        less than this value.

    datetime_column: Optional[str], default None
        A `datetime_column` must be provided to use `begin` and `end`.

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, do not include these columns in the result.

    inplace: bool, default False
        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.

    reset_index: bool, default False
        If `True`, reset the index in the resulting DataFrame.

    coerce_types: bool, default False
        If `True`, cast the dataframe and parameters as strings before querying.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A Pandas DataFrame query result.
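
    Examples
    --------
    A small illustration of membership filtering (assuming `pyarrow` is available
    for the boolean mask cast):

    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
    >>> query_df(df, {'a': [2, 3]}, reset_index=True)['b'].tolist()
    ['y', 'z']

    ```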
    """

    def _process_select_columns(_df):
        if not select_columns:
            return
        for col in list(_df.columns):
            if col not in select_columns:
                del _df[col]

    def _process_omit_columns(_df):
        if not omit_columns:
            return
        for col in list(_df.columns):
            if col in omit_columns:
                del _df[col]

    if not params and not begin and not end:
        if not inplace:
            df = df.copy()
        _process_select_columns(df)
        _process_omit_columns(df)
        return df

    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import get_in_ex_params
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    pandas = mrsm.attempt_import('pandas')
    NA = pandas.NA

    if params:
        params = params.copy()
        for key, val in {k: v for k, v in params.items()}.items():
            if isinstance(val, (list, tuple)):
                if None in val:
                    val = [item for item in val if item is not None] + [NA]
                    params[key] = val
                if coerce_types:
                    params[key] = [str(x) for x in val]
            else:
                if value_is_null(val):
                    val = NA
                    params[key] = NA
                if coerce_types:
                    params[key] = str(val)

    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}

    if inplace:
        df.fillna(NA, inplace=True)
    else:
        df = df.infer_objects().fillna(NA)

    if isinstance(begin, str):
        begin = dateutil_parser.parse(begin)
    if isinstance(end, str):
        end = dateutil_parser.parse(end)

    if begin is not None or end is not None:
        if not datetime_column or datetime_column not in df.columns:
            warn(
                f"The datetime column '{datetime_column}' is not present in the DataFrame, "
                + "ignoring begin and end...",
            )
            begin, end = None, None

    if debug:
        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")

    if datetime_column and (begin is not None or end is not None):
        if debug:
            dprint("Checking for datetime column compatibility.")

        from meerschaum.utils.dtypes import coerce_timezone
        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
        end_is_int = are_dtypes_equal(str(type(end)), 'int')

        if df_is_dt:
            df_tz = (
                getattr(df[datetime_column].dt, 'tz', None)
                if hasattr(df[datetime_column], 'dt')
                else None
            )

            if begin_is_int:
                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`begin` will be cast to '{begin}'.")
            if end_is_int:
                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`end` will be cast to '{end}'.")

            begin_tz = begin.tzinfo if begin is not None else None
            end_tz = end.tzinfo if end is not None else None

            if begin_tz is not None or end_tz is not None or df_tz is not None:
                begin = coerce_timezone(begin)
                end = coerce_timezone(end)
                if df_tz is not None:
                    if debug:
                        dprint(f"Casting column '{datetime_column}' to UTC...")
                    df[datetime_column] = coerce_timezone(df[datetime_column])
                if debug:
                    dprint(f"Using datetime bounds:\n{begin=}\n{end=}")

    in_ex_params = get_in_ex_params(params)

    masks = [
        (
            (df[datetime_column] >= begin)
            if begin is not None and datetime_column
            else True
        ) & (
            (df[datetime_column] < end)
            if end is not None and datetime_column
            else True
        )
    ]

    masks.extend([
        (
            (
                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
                if in_vals
                else True
            ) & (
                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
                if ex_vals
                else True
            )
        )
        for col, (in_vals, ex_vals) in in_ex_params.items()
        if col in df.columns
    ])
    query_mask = masks[0]
    for mask in masks[1:]:
        query_mask = query_mask & mask

    original_cols = df.columns

    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
    ###       to allow for `<NA>` values.
    bool_cols = [
        col
        for col, typ in df.dtypes.items()
        if are_dtypes_equal(str(typ), 'bool')
    ]
    for col in bool_cols:
        df[col] = df[col].astype('boolean[pyarrow]')
    df['__mrsm_mask'] = query_mask.astype('boolean[pyarrow]')

    if inplace:
        df.where(query_mask, other=NA, inplace=True)
        df.dropna(how='all', inplace=True)
        result_df = df
    else:
        result_df = df.where(query_mask, other=NA)
        result_df.dropna(how='all', inplace=True)

    if '__mrsm_mask' in df.columns:
        del df['__mrsm_mask']
    if '__mrsm_mask' in result_df.columns:
        del result_df['__mrsm_mask']

    if reset_index:
        result_df.reset_index(drop=True, inplace=True)

    result_df = enforce_dtypes(
        result_df,
        dtypes,
        safe_copy=False,
        debug=debug,
        coerce_numeric=False,
    )

    if select_columns == ['*']:
        select_columns = None

    if not select_columns and not omit_columns:
        return result_df[original_cols]

    _process_select_columns(result_df)
    _process_omit_columns(result_df)

    return result_df


def to_json(
    df: 'pd.DataFrame',
    safe_copy: bool = True,
    orient: str = 'records',
    date_format: str = 'iso',
    date_unit: str = 'us',
    **kwargs: Any
) -> str:
    """
    Serialize the given dataframe as a JSON string.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be serialized.

    safe_copy: bool, default True
        If `False`, modify the DataFrame inplace.

    orient: str, default 'records'
        The JSON layout to pass to `DataFrame.to_json()`.

    date_format: str, default 'iso'
        The default format for timestamps.

    date_unit: str, default 'us'
        The precision of the timestamps.

    Returns
    -------
    A JSON string.
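
    Examples
    --------
    A small illustration with the default `'records'` orient:

    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': [1, 2]})
    >>> to_json(df)
    '[{"a":1},{"a":2}]'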
    """
    from meerschaum.utils.packages import import_pandas
    pd = import_pandas()
    uuid_cols = get_uuid_cols(df)
    if uuid_cols and safe_copy:
        df = df.copy()
    for col in uuid_cols:
        df[col] = df[col].astype(str)
    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
        date_format=date_format,
        date_unit=date_unit,
        orient=orient,
        **kwargs
    )
def add_missing_cols_to_df( df: pandas.core.frame.DataFrame, dtypes: Dict[str, Any]) -> pandas.core.frame.DataFrame:
24def add_missing_cols_to_df(
25    df: 'pd.DataFrame',
26    dtypes: Dict[str, Any],
27) -> 'pd.DataFrame':
28    """
29    Add columns from the dtypes dictionary as null columns to a new DataFrame.
30
31    Parameters
32    ----------
33    df: pd.DataFrame
34        The dataframe we should copy and add null columns.
35
36    dtypes:
37        The data types dictionary which may contain keys not present in `df.columns`.
38
39    Returns
40    -------
41    A new `DataFrame` with the keys from `dtypes` added as null columns.
42    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
43    NOTE: This will not ensure that dtypes are enforced!
44
45    Examples
46    --------
47    >>> import pandas as pd
48    >>> df = pd.DataFrame([{'a': 1}])
49    >>> dtypes = {'b': 'Int64'}
50    >>> add_missing_cols_to_df(df, dtypes)
51          a  b
52       0  1  <NA>
53    >>> add_missing_cols_to_df(df, dtypes).dtypes
54    a    int64
55    b    Int64
56    dtype: object
57    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
58    a    int64
59    dtype: object
60    >>> 
61    """
62    if set(df.columns) == set(dtypes):
63        return df
64
65    from meerschaum.utils.packages import attempt_import
66    from meerschaum.utils.dtypes import to_pandas_dtype
67    pandas = attempt_import('pandas')
68
69    def build_series(dtype: str):
70        return pandas.Series([], dtype=to_pandas_dtype(dtype))
71
72    assign_kwargs = {
73        str(col): build_series(str(typ))
74        for col, typ in dtypes.items()
75        if col not in df.columns
76    }
77    df_with_cols = df.assign(**assign_kwargs)
78    for col in assign_kwargs:
79        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
80    return df_with_cols

Add columns from the dtypes dictionary as null columns to a new DataFrame.

Parameters
  • df (pd.DataFrame): The dataframe we should copy and add null columns.
  • dtypes:: The data types dictionary which may contain keys not present in df.columns.
Returns
  • A new DataFrame with the keys from dtypes added as null columns.
  • If df.dtypes is the same as dtypes, then return a reference to df.
  • NOTE (This will not ensure that dtypes are enforced!):
Examples
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1}])
>>> dtypes = {'b': 'Int64'}
>>> add_missing_cols_to_df(df, dtypes)
      a  b
   0  1  <NA>
>>> add_missing_cols_to_df(df, dtypes)meerschaum.utils.dtypes
a    int64
b    Int64
dtype: object
>>> add_missing_cols_to_df(df, {'a': 'object'})meerschaum.utils.dtypes
a    int64
dtype: object
>>>
def filter_unseen_df( old_df: pandas.core.frame.DataFrame, new_df: pandas.core.frame.DataFrame, safe_copy: bool = True, dtypes: Optional[Dict[str, Any]] = None, include_unchanged_columns: bool = False, debug: bool = False) -> pandas.core.frame.DataFrame:
 83def filter_unseen_df(
 84    old_df: 'pd.DataFrame',
 85    new_df: 'pd.DataFrame',
 86    safe_copy: bool = True,
 87    dtypes: Optional[Dict[str, Any]] = None,
 88    include_unchanged_columns: bool = False,
 89    debug: bool = False,
 90) -> 'pd.DataFrame':
 91    """
 92    Left join two DataFrames to find the newest unseen data.
 93
 94    Parameters
 95    ----------
 96    old_df: 'pd.DataFrame'
 97        The original (target) dataframe. Acts as a filter on the `new_df`.
 98        
 99    new_df: 'pd.DataFrame'
100        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
101
102    safe_copy: bool, default True
103        If `True`, create a copy before comparing and modifying the dataframes.
104        Setting to `False` may mutate the DataFrames.
105        
106    dtypes: Optional[Dict[str, Any]], default None
107        Optionally specify the datatypes of the dataframe.
108
109    include_unchanged_columns: bool, default False
110        If `True`, include columns which haven't changed on rows which have changed.
111
112    debug: bool, default False
113        Verbosity toggle.
114
115    Returns
116    -------
117    A pandas dataframe of the new, unseen rows in `new_df`.
118
119    Examples
120    --------
121    ```python
122    >>> import pandas as pd
123    >>> df1 = pd.DataFrame({'a': [1,2]})
124    >>> df2 = pd.DataFrame({'a': [2,3]})
125    >>> filter_unseen_df(df1, df2)
126       a
127    0  3
128
129    ```
130
131    """
132    if old_df is None:
133        return new_df
134
135    if safe_copy:
136        old_df = old_df.copy()
137        new_df = new_df.copy()
138
139    import json
140    import functools
141    import traceback
142    from decimal import Decimal
143    from uuid import UUID
144    from meerschaum.utils.warnings import warn
145    from meerschaum.utils.packages import import_pandas, attempt_import
146    from meerschaum.utils.dtypes import (
147        to_pandas_dtype,
148        are_dtypes_equal,
149        attempt_cast_to_numeric,
150        attempt_cast_to_uuid,
151        coerce_timezone,
152    )
153    pd = import_pandas(debug=debug)
154    is_dask = 'dask' in new_df.__module__
155    if is_dask:
156        pandas = attempt_import('pandas')
157        _ = attempt_import('partd', lazy=False)
158        dd = attempt_import('dask.dataframe')
159        merge = dd.merge
160        NA = pandas.NA
161    else:
162        merge = pd.merge
163        NA = pd.NA
164
165    new_df_dtypes = dict(new_df.dtypes)
166    old_df_dtypes = dict(old_df.dtypes)
167
168    same_cols = set(new_df.columns) == set(old_df.columns)
169    if not same_cols:
170        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
171        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)
172
173        new_types_missing_from_old = {
174            col: typ
175            for col, typ in new_df_dtypes.items()
176            if col not in old_df_dtypes
177        }
178        old_types_missing_from_new = {
179            col: typ
180            for col, typ in new_df_dtypes.items()
181            if col not in old_df_dtypes
182        }
183        old_df_dtypes.update(new_types_missing_from_old)
184        new_df_dtypes.update(old_types_missing_from_new)
185
186    ### Edge case: two empty lists cast to DFs.
187    elif len(new_df.columns) == 0:
188        return new_df
189
190    try:
191        ### Order matters when checking equality.
192        new_df = new_df[old_df.columns]
193
194    except Exception as e:
195        warn(
196            "Was not able to cast old columns onto new DataFrame. " +
197            f"Are both DataFrames the same shape? Error:\n{e}",
198            stacklevel = 3,
199        )
200        return new_df[list(new_df_dtypes.keys())]
201
202    ### assume the old_df knows what it's doing, even if it's technically wrong.
203    if dtypes is None:
204        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}
205
206    dtypes = {
207        col: to_pandas_dtype(typ)
208        for col, typ in dtypes.items()
209        if col in new_df_dtypes and col in old_df_dtypes
210    }
211    for col, typ in new_df_dtypes.items():
212        if col not in dtypes:
213            dtypes[col] = typ
214
215    dt_dtypes = {
216        col: typ
217        for col, typ in dtypes.items()
218        if are_dtypes_equal(typ, 'datetime')
219    }
220    non_dt_dtypes = {
221        col: typ
222        for col, typ in dtypes.items()
223        if col not in dt_dtypes
224    }
225
226    cast_non_dt_cols = True
227    try:
228        new_df = new_df.astype(non_dt_dtypes)
229        cast_non_dt_cols = False
230    except Exception as e:
231        warn(
232            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
233        )
234
235    cast_dt_cols = True
236    try:
237        for col, typ in dt_dtypes.items():
238            tz = typ.split(',')[-1].strip() if ',' in typ else None
239            new_df[col] = coerce_timezone(pd.to_datetime(new_df[col], utc=True))
240        cast_dt_cols = False
241    except Exception as e:
242        warn(f"Could not cast datetime columns:\n{e}")
243
244    cast_cols = cast_dt_cols or cast_non_dt_cols
245
246    new_numeric_cols_existing = get_numeric_cols(new_df)
247    old_numeric_cols = get_numeric_cols(old_df)
248    for col, typ in {k: v for k, v in dtypes.items()}.items():
249        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
250            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
251            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
252            new_is_numeric = col in new_numeric_cols_existing
253            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
254            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
255            old_is_numeric = col in old_numeric_cols
256
257            if (
258                (new_is_float or new_is_int or new_is_numeric)
259                and
260                (old_is_float or old_is_int or old_is_numeric)
261            ):
262                dtypes[col] = attempt_cast_to_numeric
263                cast_cols = True
264                continue
265
266            ### Fallback to object if the types don't match.
267            warn(
268                f"Detected different types for '{col}' "
269                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
270                + "falling back to 'object'..."
271            )
272            dtypes[col] = 'object'
273            cast_cols = True
274
275    if cast_cols:
276        for col, dtype in dtypes.items():
277            if col in new_df.columns:
278                try:
279                    new_df[col] = (
280                        new_df[col].astype(dtype)
281                        if not callable(dtype)
282                        else new_df[col].apply(dtype)
283                    )
284                except Exception as e:
285                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")
286
287    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
288    new_json_cols = get_json_cols(new_df)
289    old_json_cols = get_json_cols(old_df)
290    json_cols = set(new_json_cols + old_json_cols)
291    for json_col in old_json_cols:
292        old_df[json_col] = old_df[json_col].apply(serializer)
293    for json_col in new_json_cols:
294        new_df[json_col] = new_df[json_col].apply(serializer)
295
296    new_numeric_cols = get_numeric_cols(new_df)
297    numeric_cols = set(new_numeric_cols + old_numeric_cols)
298    for numeric_col in old_numeric_cols:
299        old_df[numeric_col] = old_df[numeric_col].apply(
300            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
301        )
302    for numeric_col in new_numeric_cols:
303        new_df[numeric_col] = new_df[numeric_col].apply(
304            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
305        )
306
307    old_dt_cols = [
308        col
309        for col, typ in old_df.dtypes.items()
310        if are_dtypes_equal(str(typ), 'datetime')
311    ]
312    for col in old_dt_cols:
313        old_df[col] = coerce_timezone(old_df[col])
314
315    new_dt_cols = [
316        col
317        for col, typ in new_df.dtypes.items()
318        if are_dtypes_equal(str(typ), 'datetime')
319    ]
320    for col in new_dt_cols:
321        new_df[col] = coerce_timezone(new_df[col])
322
323    old_uuid_cols = get_uuid_cols(old_df)
324    new_uuid_cols = get_uuid_cols(new_df)
325    uuid_cols = set(new_uuid_cols + old_uuid_cols)
326    joined_df = merge(
327        new_df.infer_objects(copy=False).fillna(NA),
328        old_df.infer_objects(copy=False).fillna(NA),
329        how='left',
330        on=None,
331        indicator=True,
332    )
333    changed_rows_mask = (joined_df['_merge'] == 'left_only')
334    new_cols = list(new_df_dtypes)
335    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)
336
337    for json_col in json_cols:
338        if json_col not in delta_df.columns:
339            continue
340        try:
341            delta_df[json_col] = delta_df[json_col].apply(json.loads)
342        except Exception:
343            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")
344
345    for numeric_col in numeric_cols:
346        if numeric_col not in delta_df.columns:
347            continue
348        try:
349            delta_df[numeric_col] = delta_df[numeric_col].apply(attempt_cast_to_numeric)
350        except Exception:
351            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")
352
353    for uuid_col in uuid_cols:
354        if uuid_col not in delta_df.columns:
355            continue
356        try:
357            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
358        except Exception:
359            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")
360
361    return delta_df

Left join two DataFrames to find the newest unseen data.

Parameters
  • old_df ('pd.DataFrame'): The original (target) dataframe. Acts as a filter on the new_df.
  • new_df ('pd.DataFrame'): The fetched (source) dataframe. Rows that are contained in old_df are removed.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames.
  • dtypes (Optional[Dict[str, Any]], default None): Optionally specify the datatypes of the dataframe.
  • include_unchanged_columns (bool, default False): If True, include columns which haven't changed on rows which have changed.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pandas dataframe of the new, unseen rows in new_df.
Examples
>>> import pandas as pd
>>> df1 = pd.DataFrame({'a': [1,2]})
>>> df2 = pd.DataFrame({'a': [2,3]})
>>> filter_unseen_df(df1, df2)
   a
0  3
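The delta detection above reduces to pandas' merge indicator: left-join the new frame onto the old one and keep the rows marked 'left_only'. For illustration, a minimal plain-pandas sketch of that core step (omitting the dtype, JSON, numeric, and timezone normalization the full function performs):

```python
import pandas as pd

old_df = pd.DataFrame({'a': [1, 2], 'b': ['x', 'y']})
new_df = pd.DataFrame({'a': [2, 3], 'b': ['y', 'z']})

### Left join with an indicator column; rows present only in `new_df`
### are marked 'left_only' and constitute the delta.
joined = new_df.merge(old_df, how='left', indicator=True)
delta = joined[joined['_merge'] == 'left_only'][list(new_df.columns)].reset_index(drop=True)
print(delta)
#    a  b
# 0  3  z
```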
def parse_df_datetimes( df: pandas.core.frame.DataFrame, ignore_cols: Optional[Iterable[str]] = None, chunksize: Optional[int] = None, dtype_backend: str = 'numpy_nullable', debug: bool = False) -> pandas.core.frame.DataFrame:
364def parse_df_datetimes(
365    df: 'pd.DataFrame',
366    ignore_cols: Optional[Iterable[str]] = None,
367    chunksize: Optional[int] = None,
368    dtype_backend: str = 'numpy_nullable',
369    debug: bool = False,
370) -> 'pd.DataFrame':
371    """
372    Parse a pandas DataFrame for datetime columns and cast as datetimes.
373
374    Parameters
375    ----------
376    df: pd.DataFrame
377        The pandas DataFrame to parse.
378
379    ignore_cols: Optional[Iterable[str]], default None
380        If provided, do not attempt to coerce these columns as datetimes.
381
382    chunksize: Optional[int], default None
383        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
384
385    dtype_backend: str, default 'numpy_nullable'
386        If `df` is not a DataFrame and new one needs to be constructed,
387        use this as the datatypes backend.
388        Accepted values are 'numpy_nullable' and 'pyarrow'.
389        
390    debug: bool, default False
391        Verbosity toggle.
392
393    Returns
394    -------
395    A new pandas DataFrame with the determined datetime columns
396    (usually ISO strings) cast as datetimes.
397
398    Examples
399    --------
400    ```python
401    >>> import pandas as pd
402    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
403    >>> df.dtypes
404    a    object
405    dtype: object
406    >>> df = parse_df_datetimes(df)
407    >>> df.dtypes
408    a    datetime64[ns]
409    dtype: object
410
411    ```
412
413    """
414    from meerschaum.utils.packages import import_pandas, attempt_import
415    from meerschaum.utils.debug import dprint
416    from meerschaum.utils.warnings import warn
417    from meerschaum.utils.misc import items_str
418    import traceback
419    pd = import_pandas()
420    pandas = attempt_import('pandas')
421    pd_name = pd.__name__
422    using_dask = 'dask' in pd_name
423    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
424    dask_dataframe = None
425    if using_dask or df_is_dask:
426        npartitions = chunksize_to_npartitions(chunksize)
427        dask_dataframe = attempt_import('dask.dataframe')
428
429    ### if df is a dict, build DataFrame
430    if isinstance(df, pandas.DataFrame):
431        pdf = df
432    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
433        pdf = get_first_valid_dask_partition(df)
434    else:
435        if debug:
436            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")
437
438        if using_dask:
439            if isinstance(df, list):
440                keys = set()
441                for doc in df:
442                    for key in doc:
443                        keys.add(key)
444                df = pd.DataFrame.from_dict(
445                    {
446                        k: [
447                            doc.get(k, None)
448                            for doc in df
449                        ] for k in keys
450                    },
451                    npartitions = npartitions,
452                )
453            elif isinstance(df, dict):
454                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
455            elif 'pandas.core.frame.DataFrame' in str(type(df)):
456                df = pd.from_pandas(df, npartitions=npartitions)
457            else:
458                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
459            pandas = attempt_import('pandas')
460            pdf = get_first_valid_dask_partition(df)
461
462        else:
463            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
464            pdf = df
465
466    ### skip parsing if DataFrame is empty
467    if len(pdf) == 0:
468        if debug:
469            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
470        return df
471
472    ignore_cols = set(
473        (ignore_cols or []) + [
474            col
475            for col, dtype in pdf.dtypes.items() 
476            if 'datetime' in str(dtype)
477        ]
478    )
479    cols_to_inspect = [col for col in pdf.columns if col not in ignore_cols]
480
481    if len(cols_to_inspect) == 0:
482        if debug:
483            dprint("All columns are ignored, skipping datetime detection...")
484        return df
485
486    ### apply regex to columns to determine which are ISO datetimes
487    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
488    dt_mask = pdf[cols_to_inspect].astype(str).apply(
489        lambda s: s.str.match(iso_dt_regex).all()
490    )
491
492    ### list of datetime column names
493    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
494    if not datetime_cols:
495        if debug:
496            dprint("No columns detected as datetimes, returning...")
497        return df
498
499    if debug:
500        dprint("Converting columns to datetimes: " + str(datetime_cols))
501
502    try:
503        if not using_dask:
504            df[datetime_cols] = df[datetime_cols].apply(pd.to_datetime, utc=True)
505        else:
506            df[datetime_cols] = df[datetime_cols].apply(
507                pd.to_datetime,
508                utc=True,
509                axis=1,
510                meta={
511                    col: 'datetime64[ns]'
512                    for col in datetime_cols
513                }
514            )
515    except Exception:
516        warn(
517            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
518            + f"{traceback.format_exc()}"
519        )
520
521    for dt in datetime_cols:
522        try:
523            df[dt] = df[dt].dt.tz_localize(None)
524        except Exception:
525            warn(f"Unable to convert column '{dt}' to naive datetime:\n{traceback.format_exc()}")
526
527    return df

Parse a pandas DataFrame for datetime columns and cast as datetimes.

Parameters
  • df (pd.DataFrame): The pandas DataFrame to parse.
  • ignore_cols (Optional[Iterable[str]], default None): If provided, do not attempt to coerce these columns as datetimes.
  • chunksize (Optional[int], default None): If the pandas implementation is 'dask', use this chunksize for the distributed dataframe.
  • dtype_backend (str, default 'numpy_nullable'): If df is not a DataFrame and new one needs to be constructed, use this as the datatypes backend. Accepted values are 'numpy_nullable' and 'pyarrow'.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A new pandas DataFrame with the determined datetime columns (usually ISO strings) cast as datetimes.
Examples
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
>>> df.dtypes
a    object
dtype: object
>>> df = parse_df_datetimes(df)
>>> df.dtypes
a    datetime64[ns]
dtype: object
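For illustration, the regex-based detection above can be reproduced on its own: stringify the candidate columns and keep those whose every value matches the ISO-datetime pattern (a sketch of the same logic, not a substitute for the full function):

```python
import pandas as pd

### The ISO-datetime pattern used above.
iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'

df = pd.DataFrame({
    'ts': ['2024-01-01 00:00:00', '2024-01-02 12:30:00'],
    'label': ['a', 'b'],
})
dt_mask = df.astype(str).apply(lambda s: s.str.match(iso_dt_regex).all())
print(list(df.loc[:, dt_mask].columns))
# ['ts']
```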
def get_unhashable_cols(df: pandas.core.frame.DataFrame) -> List[str]:
530def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
531    """
532    Get the columns which contain unhashable objects from a Pandas DataFrame.
533
534    Parameters
535    ----------
536    df: pd.DataFrame
537        The DataFrame which may contain unhashable objects.
538
539    Returns
540    -------
541    A list of columns.
542    """
543    if df is None:
544        return []
545    if len(df) == 0:
546        return []
547
548    is_dask = 'dask' in df.__module__
549    if is_dask:
550        from meerschaum.utils.packages import attempt_import
551        pandas = attempt_import('pandas')
552        df = pandas.DataFrame(get_first_valid_dask_partition(df))
553    return [
554        col for col, val in df.iloc[0].items()
555        if not isinstance(val, Hashable)
556    ]

Get the columns which contain unhashable objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
  • A list of columns.
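Note that detection only inspects the first row, so a column whose first row happens to hold a hashable value is not reported. A quick usage sketch:

```python
import pandas as pd
from meerschaum.utils.dataframe import get_unhashable_cols

### Dicts and lists are unhashable; scalars are not.
df = pd.DataFrame([{'a': 1, 'meta': {'key': 'val'}, 'tags': ['x', 'y']}])
print(get_unhashable_cols(df))
# ['meta', 'tags']
```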
def get_json_cols(df: pandas.core.frame.DataFrame) -> List[str]:
559def get_json_cols(df: 'pd.DataFrame') -> List[str]:
560    """
561    Get the columns which should be serialized as JSON from a Pandas DataFrame.
562
563    Parameters
564    ----------
565    df: pd.DataFrame
566        The DataFrame which may contain unhashable objects.
567
568    Returns
569    -------
570    A list of columns to be encoded as JSON.
571    """
572    if df is None:
573        return []
574
575    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
576    if is_dask:
577        df = get_first_valid_dask_partition(df)
578
579    if len(df) == 0:
580        return []
581
582    cols_indices = {
583        col: df[col].first_valid_index()
584        for col in df.columns
585    }
586    return [
587        col
588        for col, ix in cols_indices.items()
589        if (
590            ix is not None
591            and
592            not isinstance(df.loc[ix][col], Hashable)
593        )
594    ]

Get the columns which should be serialized as JSON from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
  • A list of columns to be encoded as JSON.
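Unlike get_unhashable_cols, this checks the first valid (non-null) value in each column, so leading nulls don't hide a JSON column. A quick usage sketch:

```python
import pandas as pd
from meerschaum.utils.dataframe import get_json_cols

df = pd.DataFrame({
    'a': [1, 2],
    'meta': [None, {'key': 'val'}],  ### first valid value is the dict
})
print(get_json_cols(df))
# ['meta']
```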
def get_numeric_cols(df: pandas.core.frame.DataFrame) -> List[str]:
597def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
598    """
599    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.
600
601    Parameters
602    ----------
603    df: pd.DataFrame
604        The DataFrame which may contain decimal objects.
605
606    Returns
607    -------
608    A list of columns to treat as numerics.
609    """
610    if df is None:
611        return []
612    from decimal import Decimal
613    is_dask = 'dask' in df.__module__
614    if is_dask:
615        df = get_first_valid_dask_partition(df)
616
617    if len(df) == 0:
618        return []
619
620    cols_indices = {
621        col: df[col].first_valid_index()
622        for col in df.columns
623    }
624    return [
625        col
626        for col, ix in cols_indices.items()
627        if (
628            ix is not None
629            and
630            isinstance(df.loc[ix][col], Decimal)
631        )
632    ]

Get the columns which contain decimal.Decimal objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain decimal objects.
Returns
  • A list of columns to treat as numerics.
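A quick usage sketch (detection is again based on the first valid value per column):

```python
import pandas as pd
from decimal import Decimal
from meerschaum.utils.dataframe import get_numeric_cols

df = pd.DataFrame({
    'price': [Decimal('1.10'), Decimal('2.25')],
    'qty': [1, 2],
})
print(get_numeric_cols(df))
# ['price']
```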
def get_uuid_cols(df: pandas.core.frame.DataFrame) -> List[str]:
635def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
636    """
637    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.
638
639    Parameters
640    ----------
641    df: pd.DataFrame
642        The DataFrame which may contain UUID objects.
643
644    Returns
645    -------
646    A list of columns to treat as UUIDs.
647    """
648    if df is None:
649        return []
650    from uuid import UUID
651    is_dask = 'dask' in df.__module__
652    if is_dask:
653        df = get_first_valid_dask_partition(df)
654
655    if len(df) == 0:
656        return []
657
658    cols_indices = {
659        col: df[col].first_valid_index()
660        for col in df.columns
661    }
662    return [
663        col
664        for col, ix in cols_indices.items()
665        if (
666            ix is not None
667            and
668            isinstance(df.loc[ix][col], UUID)
669        )
670    ]

Get the columns which contain uuid.UUID objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain UUID objects.
Returns
  • A list of columns to treat as UUIDs.
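A quick usage sketch:

```python
import pandas as pd
from uuid import uuid4
from meerschaum.utils.dataframe import get_uuid_cols

df = pd.DataFrame({'id': [uuid4(), uuid4()], 'val': [1, 2]})
print(get_uuid_cols(df))
# ['id']
```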
def enforce_dtypes( df: pandas.core.frame.DataFrame, dtypes: Dict[str, str], safe_copy: bool = True, coerce_numeric: bool = True, debug: bool = False) -> pandas.core.frame.DataFrame:
673def enforce_dtypes(
674    df: 'pd.DataFrame',
675    dtypes: Dict[str, str],
676    safe_copy: bool = True,
677    coerce_numeric: bool = True,
678    debug: bool = False,
679) -> 'pd.DataFrame':
680    """
681    Enforce the `dtypes` dictionary on a DataFrame.
682
683    Parameters
684    ----------
685    df: pd.DataFrame
686        The DataFrame on which to enforce dtypes.
687
688    dtypes: Dict[str, str]
689        The data types to attempt to enforce on the DataFrame.
690
691    safe_copy: bool, default True
692        If `True`, create a copy before comparing and modifying the dataframes.
693        Setting to `False` may mutate the DataFrames.
694        See `meerschaum.utils.dataframe.filter_unseen_df`.
695
696    coerce_numeric: bool, default True
697        If `True`, convert float and int collisions to numeric.
698
699    debug: bool, default False
700        Verbosity toggle.
701
702    Returns
703    -------
704    The Pandas DataFrame with the types enforced.
705    """
706    import json
707    import traceback
708    from decimal import Decimal
709    from meerschaum.utils.debug import dprint
710    from meerschaum.utils.warnings import warn
711    from meerschaum.utils.formatting import pprint
712    from meerschaum.config.static import STATIC_CONFIG
713    from meerschaum.utils.packages import import_pandas
714    from meerschaum.utils.dtypes import (
715        are_dtypes_equal,
716        to_pandas_dtype,
717        is_dtype_numeric,
718        attempt_cast_to_numeric,
719        attempt_cast_to_uuid,
720        coerce_timezone,
721    )
722    if safe_copy:
723        df = df.copy()
724    if len(df.columns) == 0:
725        if debug:
726            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
727        return df
728
729    pipe_pandas_dtypes = {
730        col: to_pandas_dtype(typ)
731        for col, typ in dtypes.items()
732    }
733    json_cols = [
734        col
735        for col, typ in dtypes.items()
736        if typ == 'json'
737    ]
738    numeric_cols = [
739        col
740        for col, typ in dtypes.items()
741        if typ == 'numeric'
742    ]
743    uuid_cols = [
744        col
745        for col, typ in dtypes.items()
746        if typ == 'uuid'
747    ]
748    df_numeric_cols = get_numeric_cols(df)
749    if debug:
750        dprint("Desired data types:")
751        pprint(dtypes)
752        dprint("Data types for incoming DataFrame:")
753        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})
754
755    if json_cols and len(df) > 0:
756        if debug:
757            dprint(f"Checking columns for JSON encoding: {json_cols}")
758        for col in json_cols:
759            if col in df.columns:
760                try:
761                    df[col] = df[col].apply(
762                        (
763                            lambda x: (
764                                json.loads(x)
765                                if isinstance(x, str)
766                                else x
767                            )
768                        )
769                    )
770                except Exception as e:
771                    if debug:
772                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")
773
774    if numeric_cols:
775        if debug:
776            dprint(f"Checking for numerics: {numeric_cols}")
777        for col in numeric_cols:
778            if col in df.columns:
779                try:
780                    df[col] = df[col].apply(attempt_cast_to_numeric)
781                except Exception as e:
782                    if debug:
783                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")
784
785    if uuid_cols:
786        if debug:
787            dprint(f"Checking for UUIDs: {uuid_cols}")
788        for col in uuid_cols:
789            if col in df.columns:
790                try:
791                    df[col] = df[col].apply(attempt_cast_to_uuid)
792                except Exception as e:
793                    if debug:
794                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")
795
796    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
797    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
798        if debug:
799            dprint("Data types match. Exiting enforcement...")
800        return df
801
802    common_dtypes = {}
803    common_diff_dtypes = {}
804    for col, typ in pipe_pandas_dtypes.items():
805        if col in df_dtypes:
806            common_dtypes[col] = typ
807            if not are_dtypes_equal(typ, df_dtypes[col]):
808                common_diff_dtypes[col] = df_dtypes[col]
809
810    if debug:
811        dprint("Common columns with different dtypes:")
812        pprint(common_diff_dtypes)
813
814    detected_dt_cols = {}
815    for col, typ in common_diff_dtypes.items():
816        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
817            df_dtypes[col] = typ
818            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
819    for col in detected_dt_cols:
820        del common_diff_dtypes[col]
821
822    if debug:
823        dprint("Common columns with different dtypes (after dates):")
824        pprint(common_diff_dtypes)
825
826    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
827        if debug:
828            dprint(
829                "The incoming DataFrame has mostly the same types, skipping enforcement. "
830                + "The only detected difference was in the following datetime columns.\n"
831                + "    Timezone information may be stripped."
832            )
833            pprint(detected_dt_cols)
834        return df
835
836    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
837        previous_typ = common_dtypes[col]
838        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
839        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
840        explicitly_numeric = dtypes.get(col, 'numeric') == 'numeric'
841        cast_to_numeric = (
842            explicitly_numeric
843            or col in df_numeric_cols
844            or (mixed_numeric_types and not explicitly_float)
845        ) and coerce_numeric
846        if cast_to_numeric:
847            common_dtypes[col] = attempt_cast_to_numeric
848            common_diff_dtypes[col] = attempt_cast_to_numeric
849
850    for d in common_diff_dtypes:
851        t = common_dtypes[d]
852        if debug:
853            dprint(f"Casting column {d} to dtype {t}.")
854        try:
855            df[d] = (
856                df[d].apply(t)
857                if callable(t)
858                else df[d].astype(t)
859            )
860        except Exception as e:
861            if debug:
862                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
863            if 'int' in str(t).lower():
864                try:
865                    df[d] = df[d].astype('float64').astype(t)
866                except Exception:
867                    if debug:
868                        dprint(f"Was unable to convert to float then {t}.")
869    return df

Enforce the dtypes dictionary on a DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame on which to enforce dtypes.
  • dtypes (Dict[str, str]): The data types to attempt to enforce on the DataFrame.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames. See meerschaum.utils.dataframe.filter_unseen_df.
  • coerce_numeric (bool, default True): If True, convert float and int collisions to numeric.
  • debug (bool, default False): Verbosity toggle.
Returns
  • The Pandas DataFrame with the types enforced.
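A usage sketch with a Meerschaum 'json' column and a nullable integer (exact dtype resolution depends on to_pandas_dtype and the installed pandas version; failed integer casts fall back through float64 as shown in the source):

```python
import pandas as pd
from meerschaum.utils.dataframe import enforce_dtypes

df = pd.DataFrame({'a': ['1', '2'], 'b': ['{"x": 1}', '{"y": 2}']})
df = enforce_dtypes(df, {'a': 'Int64', 'b': 'json'})
print(df.dtypes)
# a     Int64
# b    object
# dtype: object
print(df['b'][0])
# {'x': 1}
```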
def get_datetime_bound_from_df( df: Union[pandas.core.frame.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]]], datetime_column: str, minimum: bool = True) -> Union[int, datetime.datetime, NoneType]:
872def get_datetime_bound_from_df(
873    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
874    datetime_column: str,
875    minimum: bool = True,
876) -> Union[int, datetime, None]:
877    """
878    Return the minimum or maximum datetime (or integer) from a DataFrame.
879
880    Parameters
881    ----------
882    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
883        The DataFrame, list, or dict which contains the range axis.
884
885    datetime_column: str
886        The name of the datetime (or int) column.
887
888    minimum: bool, default True
889        Whether to return the minimum (default) or maximum value.
890
891    Returns
892    -------
893    The minimum or maximum datetime value in the dataframe, or `None`.
894    """
895    if df is None:
896        return None
897    if not datetime_column:
898        return None
899
900    def compare(a, b):
901        if a is None:
902            return b
903        if b is None:
904            return a
905        if minimum:
906            return a if a < b else b
907        return a if a > b else b
908
909    if isinstance(df, list):
910        if len(df) == 0:
911            return None
912        best_yet = df[0].get(datetime_column, None)
913        for doc in df:
914            val = doc.get(datetime_column, None)
915            best_yet = compare(best_yet, val)
916        return best_yet
917
918    if isinstance(df, dict):
919        if datetime_column not in df:
920            return None
921        best_yet = df[datetime_column][0]
922        for val in df[datetime_column]:
923            best_yet = compare(best_yet, val)
924        return best_yet
925
926    if 'DataFrame' in str(type(df)):
927        from meerschaum.utils.dtypes import are_dtypes_equal
928        pandas = mrsm.attempt_import('pandas')
929        is_dask = 'dask' in df.__module__
930
931        if datetime_column not in df.columns:
932            return None
933
934        dt_val = (
935            df[datetime_column].min(skipna=True)
936            if minimum else df[datetime_column].max(skipna=True)
937        )
938        if is_dask and dt_val is not None:
939            dt_val = dt_val.compute()
940
941        return (
942            pandas.to_datetime(dt_val).to_pydatetime()
943            if are_dtypes_equal(str(type(dt_val)), 'datetime')
944            else (dt_val if dt_val is not pandas.NA else None)
945        )
946
947    return None

Return the minimum or maximum datetime (or integer) from a DataFrame.

Parameters
  • df (Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]): The DataFrame, list, or dict which contains the range axis.
  • datetime_column (str): The name of the datetime (or int) column.
  • minimum (bool): Whether to return the minimum (default) or maximum value.
Returns
  • The minimum or maximum datetime value in the dataframe, or None.
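The function accepts plain lists of documents (and dicts of lists) as well as DataFrames; a quick usage sketch:

```python
from datetime import datetime
from meerschaum.utils.dataframe import get_datetime_bound_from_df

docs = [
    {'dt': datetime(2024, 1, 1), 'val': 1},
    {'dt': datetime(2024, 1, 3), 'val': 2},
]
print(get_datetime_bound_from_df(docs, 'dt'))
# 2024-01-01 00:00:00
print(get_datetime_bound_from_df(docs, 'dt', minimum=False))
# 2024-01-03 00:00:00
```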
def get_unique_index_values( df: Union[pandas.core.frame.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]]], indices: List[str]) -> Dict[str, List[Any]]:
950def get_unique_index_values(
951    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
952    indices: List[str],
953) -> Dict[str, List[Any]]:
954    """
955    Return a dictionary of the unique index values in a DataFrame.
956
957    Parameters
958    ----------
959    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
960        The dataframe (or list or dict) which contains index values.
961
962    indices: List[str]
963        The list of index columns.
964
965    Returns
966    -------
967    A dictionary mapping indices to unique values.
968    """
969    if df is None:
970        return {}
971    if 'dataframe' in str(type(df)).lower():
972        pandas = mrsm.attempt_import('pandas')
973        return {
974            col: list({
975                (val if val is not pandas.NA else None)
976                for val in df[col].unique()
977            })
978            for col in indices
979            if col in df.columns
980        }
981
982    unique_indices = defaultdict(lambda: set())
983    if isinstance(df, list):
984        for doc in df:
985            for index in indices:
986                if index in doc:
987                    unique_indices[index].add(doc[index])
988
989    elif isinstance(df, dict):
990        for index in indices:
991            if index in df:
992                unique_indices[index] = unique_indices[index].union(set(df[index]))
993
994    return {key: list(val) for key, val in unique_indices.items()}

Return a dictionary of the unique index values in a DataFrame.

Parameters
  • df (Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]): The dataframe (or list or dict) which contains index values.
  • indices (List[str]): The list of index columns.
Returns
  • A dictionary mapping indices to unique values.
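A quick usage sketch (the values are collected into sets, so their order is not guaranteed):

```python
from meerschaum.utils.dataframe import get_unique_index_values

docs = [
    {'station': 'a', 'ts': 1},
    {'station': 'b', 'ts': 2},
    {'station': 'a', 'ts': 3},
]
print(get_unique_index_values(docs, ['station']))
# {'station': ['a', 'b']}  (order may vary)
```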
def df_is_chunk_generator(df: Any) -> bool:
 997def df_is_chunk_generator(df: Any) -> bool:
 998    """
 999    Determine whether to treat `df` as a chunk generator.
1000
1001    Note this should only be used in a context where generators are expected,
1002    as it will return `True` for any iterable.
1003
1004    Parameters
1005    ----------
1006    df: The DataFrame or chunk generator to evaluate.
1007
1008    Returns
1009    -------
1010    A `bool` indicating whether to treat `df` as a generator.
1011    """
1012    return (
1013        not isinstance(df, (dict, list, str))
1014        and 'DataFrame' not in str(type(df))
1015        and isinstance(df, (Generator, Iterable, Iterator))
1016    )

Determine whether to treat df as a chunk generator.

Note this should only be used in a context where generators are expected, as it will return True for any iterable.

Parameters
  • df: The DataFrame or chunk generator to evaluate.
Returns
  • A bool indicating whether to treat df as a generator.
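A quick usage sketch illustrating the heuristic (generators pass; DataFrames, lists, dicts, and strings do not):

```python
import pandas as pd
from meerschaum.utils.dataframe import df_is_chunk_generator

chunks = (pd.DataFrame({'a': [i]}) for i in range(3))
print(df_is_chunk_generator(chunks))          # True
print(df_is_chunk_generator(pd.DataFrame()))  # False
print(df_is_chunk_generator([{'a': 1}]))      # False (lists are excluded)
```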
def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
1019def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
1020    """
1021    Return the Dask `npartitions` value for a given `chunksize`.
1022    """
1023    if chunksize == -1:
1024        from meerschaum.config import get_config
1025        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
1026    if chunksize is None:
1027        return 1
1028    return -1 * chunksize

Return the Dask npartitions value for a given chunksize.
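Behavior at a glance (note that a concrete chunksize is returned negated, and -1 is first resolved from the configured SQL chunksize):

```python
from meerschaum.utils.dataframe import chunksize_to_npartitions

print(chunksize_to_npartitions(None))  # 1
print(chunksize_to_npartitions(100))   # -100 (note the sign convention)
```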

def df_from_literal( pipe: Optional[meerschaum.Pipe] = None, literal: Optional[str] = None, debug: bool = False) -> pandas.core.frame.DataFrame:
1031def df_from_literal(
1032    pipe: Optional[mrsm.Pipe] = None,
1033    literal: Optional[str] = None,
1034    debug: bool = False
1035) -> 'pd.DataFrame':
1036    """
1037    Construct a dataframe from a literal value, using the pipe's datetime and value column names.
1038
1039    Parameters
1040    ----------
1041    pipe: Optional['meerschaum.Pipe'], default None
1042        The pipe which will consume the literal value.
1043
1044    Returns
1045    -------
1046    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
1047    and the literal as the value.
1048    """
1049    from meerschaum.utils.packages import import_pandas
1050    from meerschaum.utils.warnings import error, warn
1051    from meerschaum.utils.debug import dprint
1052
1053    if pipe is None or literal is None:
1054        error("Please provide a Pipe and a literal value")
1055    ### this will raise an error if the columns are undefined
1056    dt_name, val_name = pipe.get_columns('datetime', 'value')
1057
1058    val = literal
1059    if isinstance(literal, str):
1060        if debug:
1061            dprint(f"Received literal string: '{literal}'")
1062        import ast
1063        try:
1064            val = ast.literal_eval(literal)
1065        except Exception as e:
1066            warn(
1067                f"Failed to parse value from string:\n{literal}\n\n"
1068                + "Will cast as a string instead."
1069            )
1070            val = literal
1071
1072    from datetime import datetime, timezone
1073    now = datetime.now(timezone.utc).replace(tzinfo=None)
1074
1075    pd = import_pandas()
1076    return pd.DataFrame({dt_name: [now], val_name: [val]})

Construct a dataframe from a literal value, using the pipe's datetime and value column names.

Parameters
  • pipe (Optional['meerschaum.Pipe'], default None): The pipe which will consume the literal value.
Returns
  • A 1-row pandas DataFrame with the current UTC timestamp as the datetime column and the literal as the value.
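A usage sketch with a hypothetical pipe (the 'dt' and 'val' column names below are illustrative, not defaults; the pipe must define 'datetime' and 'value' columns):

```python
import meerschaum as mrsm
from meerschaum.utils.dataframe import df_from_literal

### Hypothetical pipe with 'datetime' and 'value' columns defined.
pipe = mrsm.Pipe(
    'example', 'literal',
    columns={'datetime': 'dt', 'value': 'val'},
)
df = df_from_literal(pipe=pipe, literal='42')
print(list(df.columns))
# ['dt', 'val']
```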
def get_first_valid_dask_partition( ddf: "'dask.dataframe.DataFrame'") -> Optional[pandas.core.frame.DataFrame]:
1079def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
1080    """
1081    Return the first valid Dask DataFrame partition (if possible).
1082    """
1083    pdf = None
1084    for partition in ddf.partitions:
1085        try:
1086            pdf = partition.compute()
1087        except Exception:
1088            continue
1089        if len(pdf) > 0:
1090            return pdf
1091    _ = mrsm.attempt_import('partd', lazy=False)
1092    return ddf.compute()

Return the first valid Dask DataFrame partition (if possible).
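A usage sketch, assuming the dask extra is installed:

```python
import meerschaum as mrsm
from meerschaum.utils.dataframe import get_first_valid_dask_partition

dd = mrsm.attempt_import('dask.dataframe')
pandas = mrsm.attempt_import('pandas')

ddf = dd.from_pandas(pandas.DataFrame({'a': [1, 2, 3]}), npartitions=2)
pdf = get_first_valid_dask_partition(ddf)
print(len(pdf))  ### number of rows in the first non-empty partition
```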

def query_df( df: pandas.core.frame.DataFrame, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, datetime_column: Optional[str] = None, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, inplace: bool = False, reset_index: bool = False, coerce_types: bool = False, debug: bool = False) -> pandas.core.frame.DataFrame:
1095def query_df(
1096    df: 'pd.DataFrame',
1097    params: Optional[Dict[str, Any]] = None,
1098    begin: Union[datetime, int, None] = None,
1099    end: Union[datetime, int, None] = None,
1100    datetime_column: Optional[str] = None,
1101    select_columns: Optional[List[str]] = None,
1102    omit_columns: Optional[List[str]] = None,
1103    inplace: bool = False,
1104    reset_index: bool = False,
1105    coerce_types: bool = False,
1106    debug: bool = False,
1107) -> 'pd.DataFrame':
1108    """
1109    Query the dataframe with the params dictionary.
1110
1111    Parameters
1112    ----------
1113    df: pd.DataFrame
1114        The DataFrame to query against.
1115
1116    params: Optional[Dict[str, Any]], default None
1117        The parameters dictionary to use for the query.
1118
1119    begin: Union[datetime, int, None], default None
1120        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1121        greater than or equal to this value.
1122
1123    end: Union[datetime, int, None], default None
1124        If `begin` and `datetime_column` are provided, only return rows with a timestamp
1125        less than this value.
1126
1127    datetime_column: Optional[str], default None
1128        A `datetime_column` must be provided to use `begin` and `end`.
1129
1130    select_columns: Optional[List[str]], default None
1131        If provided, only return these columns.
1132
1133    omit_columns: Optional[List[str]], default None
1134        If provided, do not include these columns in the result.
1135
1136    inplace: bool, default False
1137        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
1138
1139    reset_index: bool, default False
1140        If `True`, reset the index in the resulting DataFrame.
1141
1142    coerce_types: bool, default False
1143        If `True`, cast the dataframe and parameters as strings before querying.
1144
1145    Returns
1146    -------
1147    A Pandas DataFrame query result.
1148    """
1149
1150    def _process_select_columns(_df):
1151        if not select_columns:
1152            return
1153        for col in list(_df.columns):
1154            if col not in select_columns:
1155                del _df[col]
1156
1157    def _process_omit_columns(_df):
1158        if not omit_columns:
1159            return
1160        for col in list(_df.columns):
1161            if col in omit_columns:
1162                del _df[col]
1163
1164    if not params and not begin and not end:
1165        if not inplace:
1166            df = df.copy()
1167        _process_select_columns(df)
1168        _process_omit_columns(df)
1169        return df
1170
1171    from meerschaum.utils.debug import dprint
1172    from meerschaum.utils.misc import get_in_ex_params
1173    from meerschaum.utils.warnings import warn
1174    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
1175    dateutil_parser = mrsm.attempt_import('dateutil.parser')
1176    pandas = mrsm.attempt_import('pandas')
1177    NA = pandas.NA
1178
1179    if params:
1180        params = params.copy()
1181        for key, val in {k: v for k, v in params.items()}.items():
1182            if isinstance(val, (list, tuple)):
1183                if None in val:
1184                    val = [item for item in val if item is not None] + [NA]
1185                    params[key] = val
1186                if coerce_types:
1187                    params[key] = [str(x) for x in val]
1188            else:
1189                if value_is_null(val):
1190                    val = NA
1191                    params[key] = NA
1192                if coerce_types:
1193                    params[key] = str(val)
1194
1195    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
1196
1197    if inplace:
1198        df.fillna(NA, inplace=True)
1199    else:
1200        df = df.infer_objects().fillna(NA)
1201
1202    if isinstance(begin, str):
1203        begin = dateutil_parser.parse(begin)
1204    if isinstance(end, str):
1205        end = dateutil_parser.parse(end)
1206
1207    if begin is not None or end is not None:
1208        if not datetime_column or datetime_column not in df.columns:
1209            warn(
1210                f"The datetime column '{datetime_column}' is not present in the Dataframe, "
1211                + "ignoring begin and end...",
1212            )
1213            begin, end = None, None
1214
1215    if debug:
1216        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")
1217
1218    if datetime_column and (begin is not None or end is not None):
1219        if debug:
1220            dprint("Checking for datetime column compatibility.")
1221
1222        from meerschaum.utils.dtypes import are_dtypes_equal, coerce_timezone
1223        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
1224        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
1225        end_is_int = are_dtypes_equal(str(type(end)), 'int')
1226
1227        if df_is_dt:
1228            df_tz = (
1229                getattr(df[datetime_column].dt, 'tz', None)
1230                if hasattr(df[datetime_column], 'dt')
1231                else None
1232            )
1233
1234            if begin_is_int:
1235                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
1236                if debug:
1237                    dprint(f"`begin` will be cast to '{begin}'.")
1238            if end_is_int:
1239                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
1240                if debug:
1241                    dprint(f"`end` will be cast to '{end}'.")
1242
1243            begin_tz = begin.tzinfo if begin is not None else None
1244            end_tz = end.tzinfo if end is not None else None
1245
1246            if begin_tz is not None or end_tz is not None or df_tz is not None:
1247                begin = coerce_timezone(begin)
1248                end = coerce_timezone(end)
1249                if df_tz is not None:
1250                    if debug:
1251                        dprint(f"Casting column '{datetime_column}' to UTC...")
1252                    df[datetime_column] = coerce_timezone(df[datetime_column])
1253                if debug: dprint(f"Using datetime bounds:\n{begin=}\n{end=}")
1254
1255    in_ex_params = get_in_ex_params(params)
1256
1257    masks = [
1258        (
1259            (df[datetime_column] >= begin)
1260            if begin is not None and datetime_column
1261            else True
1262        ) & (
1263            (df[datetime_column] < end)
1264            if end is not None and datetime_column
1265            else True
1266        )
1267    ]
1268
1269    masks.extend([
1270        (
1271            (
1272                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
1273                if in_vals
1274                else True
1275            ) & (
1276                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
1277                if ex_vals
1278                else True
1279            )
1280        )
1281        for col, (in_vals, ex_vals) in in_ex_params.items()
1282        if col in df.columns
1283    ])
1284    query_mask = masks[0]
1285    for mask in masks[1:]:
1286        query_mask = query_mask & mask
1287
1288    original_cols = df.columns
1289
1290    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
1291    ###       to allow for `<NA>` values.
1292    bool_cols = [
1293        col
1294        for col, typ in df.dtypes.items()
1295        if are_dtypes_equal(str(typ), 'bool')
1296    ]
1297    for col in bool_cols:
1298        df[col] = df[col].astype('boolean[pyarrow]')
1299    df['__mrsm_mask'] = query_mask.astype('boolean[pyarrow]')
1300
1301    if inplace:
1302        df.where(query_mask, other=NA, inplace=True)
1303        df.dropna(how='all', inplace=True)
1304        result_df = df
1305    else:
1306        result_df = df.where(query_mask, other=NA)
1307        result_df.dropna(how='all', inplace=True)
1308
1309    if '__mrsm_mask' in df.columns:
1310        del df['__mrsm_mask']
1311    if '__mrsm_mask' in result_df.columns:
1312        del result_df['__mrsm_mask']
1313
1314    if reset_index:
1315        result_df.reset_index(drop=True, inplace=True)
1316
1317    result_df = enforce_dtypes(
1318        result_df,
1319        dtypes,
1320        safe_copy=False,
1321        debug=debug,
1322        coerce_numeric=False,
1323    )
1324
1325    if select_columns == ['*']:
1326        select_columns = None
1327
1328    if not select_columns and not omit_columns:
1329        return result_df[original_cols]
1330
1331    _process_select_columns(result_df)
1332    _process_omit_columns(result_df)
1333
1334    return result_df

Query the dataframe with the params dictionary.

Parameters
  • df (pd.DataFrame): The DataFrame to query against.
  • params (Optional[Dict[str, Any]], default None): The parameters dictionary to use for the query.
  • begin (Union[datetime, int, None], default None): If begin and datetime_column are provided, only return rows with a timestamp greater than or equal to this value.
  • end (Union[datetime, int, None], default None): If begin and datetime_column are provided, only return rows with a timestamp less than this value.
  • datetime_column (Optional[str], default None): A datetime_column must be provided to use begin and end.
  • select_columns (Optional[List[str]], default None): If provided, only return these columns.
  • omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
  • inplace (bool, default False): If True, modify the DataFrame inplace rather than creating a new DataFrame.
  • reset_index (bool, default False): If True, reset the index in the resulting DataFrame.
  • coerce_types (bool, default False): If True, cast the dataframe and parameters as strings before querying.
Returns
  • A Pandas DataFrame query result.
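A usage sketch combining params with a datetime lower bound (string bounds are parsed with dateutil, as in the source above):

```python
import pandas as pd
from meerschaum.utils.dataframe import query_df

df = pd.DataFrame({
    'dt': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']),
    'color': ['red', 'blue', 'red'],
})
result = query_df(
    df,
    params={'color': 'red'},
    begin='2024-01-02',
    datetime_column='dt',
    reset_index=True,
)
print(result)
#           dt color
# 0 2024-01-03   red
```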
def to_json( df: pandas.core.frame.DataFrame, safe_copy: bool = True, orient: str = 'records', date_format: str = 'iso', date_unit: str = 'us', **kwargs: Any) -> str:
1337def to_json(
1338    df: 'pd.DataFrame',
1339    safe_copy: bool = True,
1340    orient: str = 'records',
1341    date_format: str = 'iso',
1342    date_unit: str = 'us',
1343    **kwargs: Any
1344) -> str:
1345    """
1346    Serialize the given dataframe as a JSON string.
1347
1348    Parameters
1349    ----------
1350    df: pd.DataFrame
1351        The DataFrame to be serialized.
1352
1353    safe_copy: bool, default True
1354        If `False`, modify the DataFrame inplace.
1355
1356    date_format: str, default 'iso'
1357        The default format for timestamps.
1358
1359    date_unit: str, default 'us'
1360        The precision of the timestamps.
1361
1362    Returns
1363    -------
1364    A JSON string.
1365    """
1366    from meerschaum.utils.packages import import_pandas
1367    pd = import_pandas()
1368    uuid_cols = get_uuid_cols(df)
1369    if uuid_cols and safe_copy:
1370        df = df.copy()
1371    for col in uuid_cols:
1372        df[col] = df[col].astype(str)
1373    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
1374        date_format=date_format,
1375        date_unit=date_unit,
1376        orient=orient,
1377        **kwargs
1378    )

Serialize the given dataframe as a JSON string.

Parameters
  • df (pd.DataFrame): The DataFrame to be serialized.
  • safe_copy (bool, default True): If False, modify the DataFrame inplace.
  • orient (str, default 'records'): The DataFrame orientation passed to pandas' to_json.
  • date_format (str, default 'iso'): The default format for timestamps.
  • date_unit (str, default 'us'): The precision of the timestamps.
Returns
  • A JSON string.
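A quick usage sketch (timestamps are serialized as ISO strings at microsecond precision by default):

```python
import pandas as pd
from meerschaum.utils.dataframe import to_json

df = pd.DataFrame({
    'a': [1, 2],
    'dt': pd.to_datetime(['2024-01-01', '2024-01-02']),
})
print(to_json(df))
# [{"a":1,"dt":"2024-01-01T00:00:00.000000"},{"a":2,"dt":"2024-01-02T00:00:00.000000"}]
```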