meerschaum.utils.dataframe

Utility functions for working with DataFrames.

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Utility functions for working with DataFrames.
"""

from __future__ import annotations
from datetime import datetime

import meerschaum as mrsm
from meerschaum.utils.typing import (
    Optional, Dict, Any, List, Hashable, Generator,
    Iterator, Iterable, Union, TYPE_CHECKING,
)

if TYPE_CHECKING:
    pd, dask = mrsm.attempt_import('pandas', 'dask')

def add_missing_cols_to_df(
    df: 'pd.DataFrame',
    dtypes: Dict[str, Any],
) -> 'pd.DataFrame':
    """
    Add columns from the dtypes dictionary as null columns to a new DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The dataframe to copy, to which null columns will be added.

    dtypes: Dict[str, Any]
        The data types dictionary, which may contain keys not present in `df.columns`.

    Returns
    -------
    A new `DataFrame` with the keys from `dtypes` added as null columns.
    If the keys of `dtypes` match `df.columns`, return a reference to `df` instead.
    NOTE: This will not ensure that dtypes are enforced!

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': 1}])
    >>> dtypes = {'b': 'Int64'}
    >>> add_missing_cols_to_df(df, dtypes)
       a     b
    0  1  <NA>
    >>> add_missing_cols_to_df(df, dtypes).dtypes
    a    int64
    b    Int64
    dtype: object
    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
    a    int64
    dtype: object
    """
    if set(df.columns) == set(dtypes):
        return df

    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.dtypes import to_pandas_dtype
    pandas = attempt_import('pandas')

    def build_series(dtype: str):
        return pandas.Series([], dtype=to_pandas_dtype(dtype))

    assign_kwargs = {
        str(col): build_series(str(typ))
        for col, typ in dtypes.items()
        if col not in df.columns
    }
    return df.assign(**assign_kwargs)


def filter_unseen_df(
        old_df: 'pd.DataFrame',
        new_df: 'pd.DataFrame',
        safe_copy: bool = True,
        dtypes: Optional[Dict[str, Any]] = None,
        debug: bool = False,
    ) -> 'pd.DataFrame':
    """
    Left join two DataFrames to find the newest unseen data.

    Parameters
    ----------
    old_df: 'pd.DataFrame'
        The original (target) dataframe. Acts as a filter on the `new_df`.

    new_df: 'pd.DataFrame'
        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.

    dtypes: Optional[Dict[str, Any]], default None
        Optionally specify the datatypes of the dataframe.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas dataframe of the new, unseen rows in `new_df`.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df1 = pd.DataFrame({'a': [1,2]})
    >>> df2 = pd.DataFrame({'a': [2,3]})
    >>> filter_unseen_df(df1, df2)
       a
    0  3

    ```

    """
    if old_df is None:
        return new_df

    if safe_copy:
        old_df = old_df.copy()
        new_df = new_df.copy()

    import json
    import functools
    import traceback
    from decimal import Decimal
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.dtypes import to_pandas_dtype, are_dtypes_equal, attempt_cast_to_numeric
    from meerschaum.utils.debug import dprint
    pd = import_pandas(debug=debug)
    is_dask = 'dask' in new_df.__module__
    if is_dask:
        pandas = attempt_import('pandas')
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA
    else:
        merge = pd.merge
        NA = pd.NA

    new_df_dtypes = dict(new_df.dtypes)
    old_df_dtypes = dict(old_df.dtypes)

    same_cols = set(new_df.columns) == set(old_df.columns)
    if not same_cols:
        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)

    ### Edge case: two empty lists cast to DFs.
    elif len(new_df.columns) == 0:
        return new_df

    try:
        ### Order matters when checking equality.
        new_df = new_df[old_df.columns]
    except Exception as e:
        warn(
            "Was not able to cast old columns onto new DataFrame. " +
            f"Are both DataFrames the same shape? Error:\n{e}",
            stacklevel = 3,
        )
        return new_df[list(new_df_dtypes.keys())]

    ### Assume the old_df knows what it's doing, even if it's technically wrong.
    if dtypes is None:
        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}

    dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
        if col in new_df_dtypes and col in old_df_dtypes
    }
    for col, typ in new_df_dtypes.items():
        if col not in dtypes:
            dtypes[col] = typ

    cast_cols = True
    try:
        new_df = new_df.astype(dtypes)
        cast_cols = False
    except Exception as e:
        warn(
            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
        )

    new_numeric_cols_existing = get_numeric_cols(new_df)
    old_numeric_cols = get_numeric_cols(old_df)
    for col, typ in {k: v for k, v in dtypes.items()}.items():
        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
            new_is_numeric = col in new_numeric_cols_existing
            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
            old_is_numeric = col in old_numeric_cols

            if (
                (new_is_float or new_is_int or new_is_numeric)
                and
                (old_is_float or old_is_int or old_is_numeric)
            ):
                dtypes[col] = attempt_cast_to_numeric
                cast_cols = True
                continue

            ### Fall back to object if the types don't match.
            warn(
                f"Detected different types for '{col}' "
                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
                + "falling back to 'object'..."
            )
            dtypes[col] = 'object'
            cast_cols = True

    if cast_cols:
        for col, dtype in dtypes.items():
            if col in new_df.columns:
                try:
                    new_df[col] = (
                        new_df[col].astype(dtype)
                        if not callable(dtype)
                        else new_df[col].apply(dtype)
                    )
                except Exception as e:
                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")

    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
    new_json_cols = get_json_cols(new_df)
    old_json_cols = get_json_cols(old_df)
    json_cols = set(new_json_cols + old_json_cols)
    for json_col in old_json_cols:
        old_df[json_col] = old_df[json_col].apply(serializer)
    for json_col in new_json_cols:
        new_df[json_col] = new_df[json_col].apply(serializer)

    new_numeric_cols = get_numeric_cols(new_df)
    numeric_cols = set(new_numeric_cols + old_numeric_cols)
    for numeric_col in old_numeric_cols:
        old_df[numeric_col] = old_df[numeric_col].apply(
            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
        )
    for numeric_col in new_numeric_cols:
        new_df[numeric_col] = new_df[numeric_col].apply(
            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
        )

    joined_df = merge(
        new_df.fillna(NA),
        old_df.fillna(NA),
        how = 'left',
        on = None,
        indicator = True,
    )
    changed_rows_mask = (joined_df['_merge'] == 'left_only')
    delta_df = joined_df[list(new_df_dtypes.keys())][changed_rows_mask].reset_index(drop=True)

    for json_col in json_cols:
        if json_col not in delta_df.columns:
            continue
        try:
            delta_df[json_col] = delta_df[json_col].apply(json.loads)
        except Exception as e:
            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")

    for numeric_col in numeric_cols:
        if numeric_col not in delta_df.columns:
            continue
        try:
            delta_df[numeric_col] = delta_df[numeric_col].apply(attempt_cast_to_numeric)
        except Exception as e:
            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")

    return delta_df


def parse_df_datetimes(
        df: 'pd.DataFrame',
        ignore_cols: Optional[Iterable[str]] = None,
        chunksize: Optional[int] = None,
        dtype_backend: str = 'numpy_nullable',
        debug: bool = False,
    ) -> 'pd.DataFrame':
    """
    Parse a pandas DataFrame for datetime columns and cast as datetimes.

    Parameters
    ----------
    df: pd.DataFrame
        The pandas DataFrame to parse.

    ignore_cols: Optional[Iterable[str]], default None
        If provided, do not attempt to coerce these columns as datetimes.

    chunksize: Optional[int], default None
        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.

    dtype_backend: str, default 'numpy_nullable'
        If `df` is not a DataFrame and a new one needs to be constructed,
        use this as the datatypes backend.
        Accepted values are 'numpy_nullable' and 'pyarrow'.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A new pandas DataFrame with the determined datetime columns
    (usually ISO strings) cast as datetimes.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
    >>> df.dtypes
    a    object
    dtype: object
    >>> df = parse_df_datetimes(df)
    >>> df.dtypes
    a    datetime64[ns]
    dtype: object

    ```

    """
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.misc import items_str
    import traceback
    pd = import_pandas()
    pandas = attempt_import('pandas')
    pd_name = pd.__name__
    using_dask = 'dask' in pd_name
    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
    dask_dataframe = None
    if using_dask or df_is_dask:
        npartitions = chunksize_to_npartitions(chunksize)
        dask_dataframe = attempt_import('dask.dataframe')

    ### If df is a dict or list, build a DataFrame.
    if isinstance(df, pandas.DataFrame):
        pdf = df
    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
        pdf = get_first_valid_dask_partition(df)
    else:
        if debug:
            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")

        if using_dask:
            if isinstance(df, list):
                keys = set()
                for doc in df:
                    for key in doc:
                        keys.add(key)
                df = pd.DataFrame.from_dict(
                    {
                        k: [
                            doc.get(k, None)
                            for doc in df
                        ] for k in keys
                    },
                    npartitions = npartitions,
                )
            elif isinstance(df, dict):
                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
            elif 'pandas.core.frame.DataFrame' in str(type(df)):
                df = pd.from_pandas(df, npartitions=npartitions)
            else:
                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
            pandas = attempt_import('pandas')
            pdf = get_first_valid_dask_partition(df)

        else:
            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
            pdf = df

    ### Skip parsing if the DataFrame is empty.
    if len(pdf) == 0:
        if debug:
            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
        return df

    ignore_cols = set(
        list(ignore_cols or []) + [
            col
            for col, dtype in pdf.dtypes.items()
            if 'datetime' in str(dtype)
        ]
    )
    cols_to_inspect = [col for col in pdf.columns if col not in ignore_cols]

    if len(cols_to_inspect) == 0:
        if debug:
            dprint("All columns are ignored, skipping datetime detection...")
        return df

    ### Apply a regex to the columns to determine which are ISO datetimes.
    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
    dt_mask = pdf[cols_to_inspect].astype(str).apply(
        lambda s: s.str.match(iso_dt_regex).all()
    )

    ### List of datetime column names.
    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
    if not datetime_cols:
        if debug:
            dprint("No columns detected as datetimes, returning...")
        return df

    if debug:
        dprint("Converting columns to datetimes: " + str(datetime_cols))

    try:
        if not using_dask:
            df[datetime_cols] = df[datetime_cols].apply(pd.to_datetime, utc=True)
        else:
            df[datetime_cols] = df[datetime_cols].apply(
                pd.to_datetime,
                utc = True,
                axis = 1,
                meta = {
                    col: 'datetime64[ns]'
                    for col in datetime_cols
                }
            )
    except Exception as e:
        warn(
            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
            + f"{traceback.format_exc()}"
        )

    for dt in datetime_cols:
        try:
            df[dt] = df[dt].dt.tz_localize(None)
        except Exception as e:
            warn(f"Unable to convert column '{dt}' to naive datetime:\n{traceback.format_exc()}")

    return df


def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns which contain unhashable values (determined from the first row).
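
    Examples
    --------
    A minimal sketch of the expected behavior (a `dict` value is unhashable):

    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': 1, 'b': {'x': 1}}])
    >>> get_unhashable_cols(df)
    ['b']
    ```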
    """
    if len(df) == 0:
        return []

    is_dask = 'dask' in df.__module__
    if is_dask:
        from meerschaum.utils.packages import attempt_import
        pandas = attempt_import('pandas')
        df = pandas.DataFrame(get_first_valid_dask_partition(df))
    return [
        col for col, val in df.iloc[0].items()
        if not isinstance(val, Hashable)
    ]


def get_json_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects (and so should be
    serialized as JSON) from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns to be encoded as JSON.
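
    Examples
    --------
    A minimal sketch of the expected behavior (the `dict` column is flagged for JSON encoding):

    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': 1, 'b': {'x': 1}}])
    >>> get_json_cols(df)
    ['b']
    ```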
    """
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            not isinstance(df.loc[ix][col], Hashable)
        )
    ]


def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain decimal objects.

    Returns
    -------
    A list of columns to treat as numerics.
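
    Examples
    --------
    A minimal sketch of the expected behavior:

    ```python
    >>> import pandas as pd
    >>> from decimal import Decimal
    >>> df = pd.DataFrame([{'a': 1, 'b': Decimal('1.1')}])
    >>> get_numeric_cols(df)
    ['b']
    ```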
    """
    from decimal import Decimal
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance(df.loc[ix][col], Decimal)
        )
    ]


def enforce_dtypes(
        df: 'pd.DataFrame',
        dtypes: Dict[str, str],
        safe_copy: bool = True,
        coerce_numeric: bool = True,
        debug: bool = False,
    ) -> 'pd.DataFrame':
    """
    Enforce the `dtypes` dictionary on a DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame on which to enforce dtypes.

    dtypes: Dict[str, str]
        The data types to attempt to enforce on the DataFrame.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    coerce_numeric: bool, default True
        If `True`, convert float and int collisions to numeric.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The Pandas DataFrame with the types enforced.
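
    Examples
    --------
    A minimal sketch (note that with the default `coerce_numeric=True`,
    int/float collisions may instead be coerced to `Decimal`-backed objects):

    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': [1, 2]})
    >>> enforce_dtypes(df, {'a': 'float64'}).dtypes
    a    float64
    dtype: object
    ```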
    """
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.formatting import pprint
    from meerschaum.utils.dtypes import (
        are_dtypes_equal,
        to_pandas_dtype,
        is_dtype_numeric,
        attempt_cast_to_numeric,
    )
    if safe_copy:
        df = df.copy()
    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
    if len(df_dtypes) == 0:
        if debug:
            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
        return df

    pipe_pandas_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
    }
    json_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'json'
    ]
    numeric_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'numeric'
    ]
    df_numeric_cols = get_numeric_cols(df)
    if debug:
        dprint("Desired data types:")
        pprint(dtypes)
        dprint("Data types for incoming DataFrame:")
        pprint(df_dtypes)

    if json_cols and len(df) > 0:
        if debug:
            dprint(f"Checking columns for JSON encoding: {json_cols}")
        for col in json_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        (
                            lambda x: (
                                json.loads(x)
                                if isinstance(x, str)
                                else x
                            )
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")

    if numeric_cols:
        if debug:
            dprint(f"Checking for numerics: {numeric_cols}")
        for col in numeric_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_numeric)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")

    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint("Data types match. Exiting enforcement...")
        return df

    common_dtypes = {}
    common_diff_dtypes = {}
    for col, typ in pipe_pandas_dtypes.items():
        if col in df_dtypes:
            common_dtypes[col] = typ
            if not are_dtypes_equal(typ, df_dtypes[col]):
                common_diff_dtypes[col] = df_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes:")
        pprint(common_diff_dtypes)

    detected_dt_cols = {}
    for col, typ in common_diff_dtypes.items():
        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
            df_dtypes[col] = typ
            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
    for col in detected_dt_cols:
        del common_diff_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes (after dates):")
        pprint(common_diff_dtypes)

    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint(
                "The incoming DataFrame has mostly the same types, skipping enforcement. "
                + "The only detected difference was in the following datetime columns.\n"
                + "    Timezone information may be stripped."
            )
            pprint(detected_dt_cols)
        return df

    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
        previous_typ = common_dtypes[col]
        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
        explicitly_numeric = dtypes.get(col, 'numeric') == 'numeric'
        cast_to_numeric = (
            explicitly_numeric
            or col in df_numeric_cols
            or (mixed_numeric_types and not explicitly_float)
        ) and coerce_numeric
        if cast_to_numeric:
            common_dtypes[col] = attempt_cast_to_numeric
            common_diff_dtypes[col] = attempt_cast_to_numeric

    for d in common_diff_dtypes:
        t = common_dtypes[d]
        if debug:
            dprint(f"Casting column {d} to dtype {t}.")
        try:
            df[d] = (
                df[d].apply(t)
                if callable(t)
                else df[d].astype(t)
            )
        except Exception as e:
            if debug:
                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
            if 'int' in str(t).lower():
                try:
                    df[d] = df[d].astype('float64').astype(t)
                except Exception:
                    if debug:
                        dprint(f"Was unable to convert to float then {t}.")
    return df


def get_datetime_bound_from_df(
        df: Union['pd.DataFrame', dict, list],
        datetime_column: str,
        minimum: bool = True,
    ) -> Union[int, datetime, None]:
    """
    Return the minimum or maximum datetime (or integer) from a DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame, list, or dict which contains the range axis.

    datetime_column: str
        The name of the datetime (or int) column.

    minimum: bool, default True
        Whether to return the minimum (default) or maximum value.

    Returns
    -------
    The minimum or maximum datetime value in the dataframe, or `None`.
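
    Examples
    --------
    A minimal sketch using integer timestamps (datetimes behave the same way):

    ```python
    >>> get_datetime_bound_from_df([{'dt': 2}, {'dt': 1}], 'dt')
    1
    >>> get_datetime_bound_from_df([{'dt': 2}, {'dt': 1}], 'dt', minimum=False)
    2
    ```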
    """
    if not datetime_column:
        return None

    def compare(a, b):
        if a is None:
            return b
        if b is None:
            return a
        if minimum:
            return a if a < b else b
        return a if a > b else b

    if isinstance(df, list):
        if len(df) == 0:
            return None
        best_yet = df[0].get(datetime_column, None)
        for doc in df:
            val = doc.get(datetime_column, None)
            best_yet = compare(best_yet, val)
        return best_yet

    if isinstance(df, dict):
        if datetime_column not in df:
            return None
        best_yet = df[datetime_column][0]
        for val in df[datetime_column]:
            best_yet = compare(best_yet, val)
        return best_yet

    if 'DataFrame' in str(type(df)):
        if datetime_column not in df.columns:
            return None
        return (
            df[datetime_column].min(skipna=True)
            if minimum
            else df[datetime_column].max(skipna=True)
        )

    return None


def df_is_chunk_generator(df: Any) -> bool:
    """
    Determine whether to treat `df` as a chunk generator.

    Note this should only be used in a context where generators are expected,
    as it will return `True` for any iterable.

    Parameters
    ----------
    df: Any
        The DataFrame or chunk generator to evaluate.

    Returns
    -------
    A `bool` indicating whether to treat `df` as a generator.
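
    Examples
    --------
    A minimal sketch (note that any non-DataFrame iterable passes the check):

    ```python
    >>> df_is_chunk_generator(iter([]))
    True
    >>> df_is_chunk_generator([{'a': 1}])
    False
    ```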
    """
    return (
        not isinstance(df, (dict, list, str))
        and 'DataFrame' not in str(type(df))
        and isinstance(df, (Generator, Iterable, Iterator))
    )


def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
    """
    Return the Dask `npartitions` value for a given `chunksize`.
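
    Examples
    --------
    Direct consequences of the logic below (a chunksize of `-1` is first
    resolved from the configured SQL chunksize):

    ```python
    >>> chunksize_to_npartitions(None)
    1
    >>> chunksize_to_npartitions(100)
    -100
    ```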
    """
    if chunksize == -1:
        from meerschaum.config import get_config
        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
    if chunksize is None:
        return 1
    return -1 * chunksize


def df_from_literal(
        pipe: Optional[mrsm.Pipe] = None,
        literal: Optional[str] = None,
        debug: bool = False
    ) -> 'pd.DataFrame':
    """
    Construct a dataframe from a literal value, using the pipe's datetime and value column names.

    Parameters
    ----------
    pipe: Optional['meerschaum.Pipe'], default None
        The pipe which will consume the literal value.

    literal: Optional[str], default None
        The literal value to parse (via `ast.literal_eval`) and assign to the value column.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
    and the literal as the value.
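
    Examples
    --------
    A minimal sketch, assuming an in-memory pipe with `datetime` and `value`
    columns defined:

    ```python
    >>> import meerschaum as mrsm
    >>> pipe = mrsm.Pipe('demo', 'weather', columns={'datetime': 'dt', 'value': 'val'})
    >>> df_from_literal(pipe, '42.1').columns.tolist()
    ['dt', 'val']
    ```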
    """
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint

    if pipe is None or literal is None:
        error("Please provide a Pipe and a literal value")
    ### This will raise an error if the columns are undefined.
    dt_name, val_name = pipe.get_columns('datetime', 'value')

    val = literal
    if isinstance(literal, str):
        if debug:
            dprint(f"Received literal string: '{literal}'")
        import ast
        try:
            val = ast.literal_eval(literal)
        except Exception as e:
            warn(
                "Failed to parse value from string:\n" + f"{literal}" +
                "\n\nWill cast as a string instead."
            )
            val = literal

    from datetime import datetime, timezone
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    pd = import_pandas()
    return pd.DataFrame({dt_name: [now], val_name: [val]})


def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
    """
    Return the first valid Dask DataFrame partition (if possible).
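
    Examples
    --------
    A minimal sketch (assumes `dask` is installed); the first non-empty
    partition is computed and returned as a pandas DataFrame:

    ```python
    >>> import pandas as pd
    >>> import dask.dataframe as dd
    >>> ddf = dd.from_pandas(pd.DataFrame({'a': [1, 2, 3]}), npartitions=2)
    >>> isinstance(get_first_valid_dask_partition(ddf), pd.DataFrame)
    True
    ```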
    """
    pdf = None
    for partition in ddf.partitions:
        try:
            pdf = partition.compute()
        except Exception:
            continue
        if len(pdf) > 0:
            return pdf
    return ddf.compute()


def query_df(
        df: 'pd.DataFrame',
        params: Optional[Dict[str, Any]] = None,
        begin: Union[datetime, int, None] = None,
        end: Union[datetime, int, None] = None,
        datetime_column: Optional[str] = None,
        select_columns: Optional[List[str]] = None,
        omit_columns: Optional[List[str]] = None,
        inplace: bool = False,
        reset_index: bool = False,
        debug: bool = False,
    ) -> 'pd.DataFrame':
    """
    Query the dataframe with the params dictionary.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to query against.

    params: Optional[Dict[str, Any]], default None
        The parameters dictionary to use for the query.

    begin: Union[datetime, int, None], default None
        If `begin` and `datetime_column` are provided, only return rows with a timestamp
        greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If `end` and `datetime_column` are provided, only return rows with a timestamp
        less than this value.

    datetime_column: Optional[str], default None
        A `datetime_column` must be provided to use `begin` and `end`.

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, do not include these columns in the result.

    inplace: bool, default False
        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.

    reset_index: bool, default False
        If `True`, reset the index in the resulting DataFrame.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A Pandas DataFrame query result.
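
    Examples
    --------
    A minimal sketch of filtering on a single column:

    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': [1, 2, 3], 'color': ['red', 'green', 'red']})
    >>> query_df(df, {'color': 'red'}, reset_index=True)
       a color
    0  1   red
    1  3   red
    ```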
    """
    if not params and not begin and not end and not select_columns and not omit_columns:
        return df

    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import get_in_ex_params
    from meerschaum.utils.warnings import warn

    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}

    if begin or end:
        if not datetime_column or datetime_column not in df.columns:
            warn(
                f"The datetime column '{datetime_column}' is not present in the DataFrame, "
                + "ignoring begin and end...",
            )
            begin, end = None, None

    if debug:
        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")

    in_ex_params = get_in_ex_params(params)

    def serialize(x: Any) -> str:
        if isinstance(x, (dict, list, tuple)):
            return json.dumps(x, sort_keys=True, separators=(',', ':'), default=str)
        if hasattr(x, 'isoformat'):
            return x.isoformat()
        return str(x)

    masks = [
        (
            (df[datetime_column] >= begin)
            if begin is not None and datetime_column
            else True
        ) & (
            (df[datetime_column] < end)
            if end is not None and datetime_column
            else True
        )
    ]

    masks.extend([
        (
            (
                df[col].apply(serialize).isin(
                    [
                        serialize(_in_val)
                        for _in_val in in_vals
                    ]
                ) if in_vals else True
            ) & (
                ~df[col].apply(serialize).isin(
                    [
                        serialize(_ex_val)
                        for _ex_val in ex_vals
                    ]
                ) if ex_vals else True
            )
        )
        for col, (in_vals, ex_vals) in in_ex_params.items()
        if col in df.columns
    ])
    query_mask = masks[0]
    for mask in masks:
        query_mask = query_mask & mask

    ### If no row filters applied (e.g. only column selection), skip the `where`.
    if isinstance(query_mask, bool):
        result_df = df
    elif inplace:
        df.where(query_mask, inplace=inplace)
        df.dropna(how='all', inplace=inplace)
        result_df = df
    else:
        result_df = df.where(query_mask).dropna(how='all')

    if reset_index:
        result_df.reset_index(drop=True, inplace=True)

    result_df = enforce_dtypes(
        result_df,
        dtypes,
        safe_copy = (not inplace),
        debug = debug,
        coerce_numeric = False,
    )

    if select_columns == ['*']:
        select_columns = None

    if not select_columns and not omit_columns:
        return result_df

    if select_columns:
        for col in list(result_df.columns):
            if col not in select_columns:
                del result_df[col]
        return result_df

    if omit_columns:
        for col in list(result_df.columns):
            if col in omit_columns:
                del result_df[col]
    if debug:
        dprint(f"{dtypes=}")
    return result_df
def add_missing_cols_to_df( df: pandas.core.frame.DataFrame, dtypes: Dict[str, Any]) -> pandas.core.frame.DataFrame:
23def add_missing_cols_to_df(
24    df: 'pd.DataFrame',
25    dtypes: Dict[str, Any],
26) -> 'pd.DataFrame':
27    """
28    Add columns from the dtypes dictionary as null columns to a new DataFrame.
29
30    Parameters
31    ----------
32    df: pd.DataFrame
33        The dataframe we should copy and add null columns.
34
35    dtypes:
36        The data types dictionary which may contain keys not present in `df.columns`.
37
38    Returns
39    -------
40    A new `DataFrame` with the keys from `dtypes` added as null columns.
41    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
42    NOTE: This will not ensure that dtypes are enforced!
43
44    Examples
45    --------
46    >>> import pandas as pd
47    >>> df = pd.DataFrame([{'a': 1}])
48    >>> dtypes = {'b': 'Int64'}
49    >>> add_missing_cols_to_df(df, dtypes)
50          a  b
51       0  1  <NA>
52    >>> add_missing_cols_to_df(df, dtypes).dtypes
53    a    int64
54    b    Int64
55    dtype: object
56    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
57    a    int64
58    dtype: object
59    >>> 
60    """
61    if set(df.columns) == set(dtypes):
62        return df
63
64    import traceback
65    from meerschaum.utils.packages import import_pandas, attempt_import
66    from meerschaum.utils.warnings import warn
67    from meerschaum.utils.dtypes import to_pandas_dtype
68    pandas = attempt_import('pandas')
69    
70    def build_series(dtype: str):
71        return pandas.Series([], dtype=to_pandas_dtype(dtype))
72
73    assign_kwargs = {
74        str(col): build_series(str(typ))
75        for col, typ in dtypes.items()
76        if col not in df.columns
77    }
78    return df.assign(**assign_kwargs)

Add columns from the dtypes dictionary as null columns to a new DataFrame.

Parameters
  • df (pd.DataFrame): The dataframe we should copy and add null columns.
  • dtypes:: The data types dictionary which may contain keys not present in df.columns.
Returns
  • A new DataFrame with the keys from dtypes added as null columns.
  • If df.dtypes is the same as dtypes, then return a reference to df.
  • NOTE (This will not ensure that dtypes are enforced!):
Examples
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1}])
>>> dtypes = {'b': 'Int64'}
>>> add_missing_cols_to_df(df, dtypes)
      a  b
   0  1  <NA>
>>> add_missing_cols_to_df(df, dtypes)meerschaum.utils.dtypes
a    int64
b    Int64
dtype: object
>>> add_missing_cols_to_df(df, {'a': 'object'})meerschaum.utils.dtypes
a    int64
dtype: object
>>>
def filter_unseen_df( old_df: pandas.core.frame.DataFrame, new_df: pandas.core.frame.DataFrame, safe_copy: bool = True, dtypes: Optional[Dict[str, Any]] = None, debug: bool = False) -> pandas.core.frame.DataFrame:
 81def filter_unseen_df(
 82        old_df: 'pd.DataFrame',
 83        new_df: 'pd.DataFrame',
 84        safe_copy: bool = True,
 85        dtypes: Optional[Dict[str, Any]] = None,
 86        debug: bool = False,
 87    ) -> 'pd.DataFrame':
 88    """
 89    Left join two DataFrames to find the newest unseen data.
 90
 91    Parameters
 92    ----------
 93    old_df: 'pd.DataFrame'
 94        The original (target) dataframe. Acts as a filter on the `new_df`.
 95        
 96    new_df: 'pd.DataFrame'
 97        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
 98
 99    safe_copy: bool, default True
100        If `True`, create a copy before comparing and modifying the dataframes.
101        Setting to `False` may mutate the DataFrames.
102        
103    dtypes: Optional[Dict[str, Any]], default None
104        Optionally specify the datatypes of the dataframe.
105
106    debug: bool, default False
107        Verbosity toggle.
108
109    Returns
110    -------
111    A pandas dataframe of the new, unseen rows in `new_df`.
112
113    Examples
114    --------
115    ```python
116    >>> import pandas as pd
117    >>> df1 = pd.DataFrame({'a': [1,2]})
118    >>> df2 = pd.DataFrame({'a': [2,3]})
119    >>> filter_unseen_df(df1, df2)
120       a
121    0  3
122
123    ```
124
125    """
126    if old_df is None:
127        return new_df
128
129    if safe_copy:
130        old_df = old_df.copy()
131        new_df = new_df.copy()
132
133    import json
134    import functools
135    import traceback
136    from decimal import Decimal
137    from meerschaum.utils.warnings import warn
138    from meerschaum.utils.packages import import_pandas, attempt_import
139    from meerschaum.utils.dtypes import to_pandas_dtype, are_dtypes_equal, attempt_cast_to_numeric
140    from meerschaum.utils.debug import dprint
141    pd = import_pandas(debug=debug)
142    is_dask = 'dask' in new_df.__module__
143    if is_dask:
144        pandas = attempt_import('pandas')
145        dd = attempt_import('dask.dataframe')
146        merge = dd.merge
147        NA = pandas.NA
148    else:
149        merge = pd.merge
150        NA = pd.NA
151
152    new_df_dtypes = dict(new_df.dtypes)
153    old_df_dtypes = dict(old_df.dtypes)
154
155    same_cols = set(new_df.columns) == set(old_df.columns)
156    if not same_cols:
157        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
158        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)
159
160    ### Edge case: two empty lists cast to DFs.
161    elif len(new_df.columns) == 0:
162        return new_df
163
164    try:
165        ### Order matters when checking equality.
166        new_df = new_df[old_df.columns]
167    except Exception as e:
168        warn(
169            "Was not able to cast old columns onto new DataFrame. " +
170            f"Are both DataFrames the same shape? Error:\n{e}",
171            stacklevel = 3,
172        )
173        return new_df[list(new_df_dtypes.keys())]
174
175    ### assume the old_df knows what it's doing, even if it's technically wrong.
176    if dtypes is None:
177        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}
178
179    dtypes = {
180        col: to_pandas_dtype(typ)
181        for col, typ in dtypes.items()
182        if col in new_df_dtypes and col in old_df_dtypes
183    }
184    for col, typ in new_df_dtypes.items():
185        if col not in dtypes:
186            dtypes[col] = typ
187    
188    cast_cols = True
189    try:
190        new_df = new_df.astype(dtypes)
191        cast_cols = False
192    except Exception as e:
193        warn(
194            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
195        )
196
197    new_numeric_cols_existing = get_numeric_cols(new_df)
198    old_numeric_cols = get_numeric_cols(old_df)
199    for col, typ in {k: v for k, v in dtypes.items()}.items():
200        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
201            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
202            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
203            new_is_numeric = col in new_numeric_cols_existing
204            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
205            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
206            old_is_numeric = col in old_numeric_cols
207
208            if (
209                (new_is_float or new_is_int or new_is_numeric)
210                and
211                (old_is_float or old_is_int or old_is_numeric)
212            ):
213                dtypes[col] = attempt_cast_to_numeric
214                cast_cols = True
215                continue
216
217            ### Fallback to object if the types don't match.
218            warn(
219                f"Detected different types for '{col}' "
220                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
221                + "falling back to 'object'..."
222            )
223            dtypes[col] = 'object'
224            cast_cols = True
225
226    if cast_cols:
227        for col, dtype in dtypes.items():
228            if col in new_df.columns:
229                try:
230                    new_df[col] = (
231                        new_df[col].astype(dtype)
232                        if not callable(dtype)
233                        else new_df[col].apply(dtype)
234                    )
235                except Exception as e:
236                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")
237
238    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
239    new_json_cols = get_json_cols(new_df)
240    old_json_cols = get_json_cols(old_df)
241    json_cols = set(new_json_cols + old_json_cols)
242    for json_col in old_json_cols:
243        old_df[json_col] = old_df[json_col].apply(serializer)
244    for json_col in new_json_cols:
245        new_df[json_col] = new_df[json_col].apply(serializer)
246
247    new_numeric_cols = get_numeric_cols(new_df)
248    numeric_cols = set(new_numeric_cols + old_numeric_cols)
249    for numeric_col in old_numeric_cols:
250        old_df[numeric_col] = old_df[numeric_col].apply(
251            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
252        )
253    for numeric_col in new_numeric_cols:
254        new_df[numeric_col] = new_df[numeric_col].apply(
255            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
256        )
257
258    joined_df = merge(
259        new_df.fillna(NA),
260        old_df.fillna(NA),
261        how = 'left',
262        on = None,
263        indicator = True,
264    )
265    changed_rows_mask = (joined_df['_merge'] == 'left_only')
266    delta_df = joined_df[list(new_df_dtypes.keys())][changed_rows_mask].reset_index(drop=True)
267
268    for json_col in json_cols:
269        if json_col not in delta_df.columns:
270            continue
271        try:
272            delta_df[json_col] = delta_df[json_col].apply(json.loads)
273        except Exception as e:
274            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")
275
276    for numeric_col in numeric_cols:
277        if numeric_col not in delta_df.columns:
278            continue
279        try:
280            delta_df[numeric_col] = delta_df[numeric_col].apply(attempt_cast_to_numeric)
281        except Exception as e:
282            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")
283
284    return delta_df

Left join two DataFrames to find the newest unseen data.

Parameters
  • old_df ('pd.DataFrame'): The original (target) dataframe. Acts as a filter on the new_df.
  • new_df ('pd.DataFrame'): The fetched (source) dataframe. Rows that are contained in old_df are removed.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames.
  • dtypes (Optional[Dict[str, Any]], default None): Optionally specify the datatypes of the dataframe.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pandas dataframe of the new, unseen rows in new_df.
Examples
>>> import pandas as pd
>>> df1 = pd.DataFrame({'a': [1,2]})
>>> df2 = pd.DataFrame({'a': [2,3]})
>>> filter_unseen_df(df1, df2)
   a
0  3
def parse_df_datetimes( df: pandas.core.frame.DataFrame, ignore_cols: Optional[Iterable[str]] = None, chunksize: Optional[int] = None, dtype_backend: str = 'numpy_nullable', debug: bool = False) -> pandas.core.frame.DataFrame:
287def parse_df_datetimes(
288        df: 'pd.DataFrame',
289        ignore_cols: Optional[Iterable[str]] = None,
290        chunksize: Optional[int] = None,
291        dtype_backend: str = 'numpy_nullable',
292        debug: bool = False,
293    ) -> 'pd.DataFrame':
294    """
295    Parse a pandas DataFrame for datetime columns and cast as datetimes.
296
297    Parameters
298    ----------
299    df: pd.DataFrame
300        The pandas DataFrame to parse.
301
302    ignore_cols: Optional[Iterable[str]], default None
303        If provided, do not attempt to coerce these columns as datetimes.
304
305    chunksize: Optional[int], default None
306        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
307
308    dtype_backend: str, default 'numpy_nullable'
309        If `df` is not a DataFrame and new one needs to be constructed,
310        use this as the datatypes backend.
311        Accepted values are 'numpy_nullable' and 'pyarrow'.
312        
313    debug: bool, default False
314        Verbosity toggle.
315
316    Returns
317    -------
318    A new pandas DataFrame with the determined datetime columns
319    (usually ISO strings) cast as datetimes.
320
321    Examples
322    --------
323    ```python
324    >>> import pandas as pd
325    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
326    >>> df.dtypes
327    a    object
328    dtype: object
329    >>> df = parse_df_datetimes(df)
330    >>> df.dtypes
331    a    datetime64[ns]
332    dtype: object
333
334    ```
335
336    """
337    from meerschaum.utils.packages import import_pandas, attempt_import
338    from meerschaum.utils.debug import dprint
339    from meerschaum.utils.warnings import warn
340    import traceback
341    pd = import_pandas()
342    pandas = attempt_import('pandas')
343    pd_name = pd.__name__
344    using_dask = 'dask' in pd_name
345    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
346    dask_dataframe = None
347    if using_dask or df_is_dask:
348        npartitions = chunksize_to_npartitions(chunksize)
349        dask_dataframe = attempt_import('dask.dataframe')
350
351    ### if df is a dict, build DataFrame
352    if isinstance(df, pandas.DataFrame):
353        pdf = df
354    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
355        pdf = get_first_valid_dask_partition(df)
356    else:
357        if debug:
358            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")
359
360        if using_dask:
361            if isinstance(df, list):
362                keys = set()
363                for doc in df:
364                    for key in doc:
365                        keys.add(key)
366                df = pd.DataFrame.from_dict(
367                    {
368                        k: [
369                            doc.get(k, None)
370                            for doc in df
371                        ] for k in keys
372                    },
373                    npartitions = npartitions,
374                )
375            elif isinstance(df, dict):
376                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
377            elif 'pandas.core.frame.DataFrame' in str(type(df)):
378                df = pd.from_pandas(df, npartitions=npartitions)
379            else:
380                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
381            pandas = attempt_import('pandas')
382            pdf = get_first_valid_dask_partition(df)
383
384        else:
385            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
386            pdf = df
387
388    ### skip parsing if DataFrame is empty
389    if len(pdf) == 0:
390        if debug:
391            dprint(f"df is empty. Returning original DataFrame without casting datetime columns...")
392        return df
393
394    ignore_cols = set(
395        (ignore_cols or []) + [
396            col
397            for col, dtype in pdf.dtypes.items() 
398            if 'datetime' in str(dtype)
399        ]
400    )
401    cols_to_inspect = [col for col in pdf.columns if col not in ignore_cols]
402
403    if len(cols_to_inspect) == 0:
404        if debug:
405            dprint(f"All columns are ignored, skipping datetime detection...")
406        return df
407
408    ### apply regex to columns to determine which are ISO datetimes
409    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
410    dt_mask = pdf[cols_to_inspect].astype(str).apply(
411        lambda s: s.str.match(iso_dt_regex).all()
412    )
413
414    ### list of datetime column names
415    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
416    if not datetime_cols:
417        if debug:
418            dprint("No columns detected as datetimes, returning...")
419        return df
420
421    if debug:
422        dprint("Converting columns to datetimes: " + str(datetime_cols))
423
424    try:
425        if not using_dask:
426            df[datetime_cols] = df[datetime_cols].apply(pd.to_datetime, utc=True)
427        else:
428            df[datetime_cols] = df[datetime_cols].apply(
429                pd.to_datetime,
430                utc = True,
431                axis = 1,
432                meta = {
433                    col: 'datetime64[ns]'
434                    for col in datetime_cols
435                }
436            )
437    except Exception as e:
438        warn(
439            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
440            + f"{traceback.format_exc()}"
441        )
442
443    for dt in datetime_cols:
444        try:
445            df[dt] = df[dt].dt.tz_localize(None)
446        except Exception as e:
447            warn(f"Unable to convert column '{dt}' to naive datetime:\n{traceback.format_exc()}")
448
449    return df

Parse a pandas DataFrame for datetime columns and cast as datetimes.

Parameters
  • df (pd.DataFrame): The pandas DataFrame to parse.
  • ignore_cols (Optional[Iterable[str]], default None): If provided, do not attempt to coerce these columns as datetimes.
  • chunksize (Optional[int], default None): If the pandas implementation is 'dask', use this chunksize for the distributed dataframe.
  • dtype_backend (str, default 'numpy_nullable'): If df is not a DataFrame and new one needs to be constructed, use this as the datatypes backend. Accepted values are 'numpy_nullable' and 'pyarrow'.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A new pandas DataFrame with the determined datetime columns
  • (usually ISO strings) cast as datetimes.
Examples
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
>>> df.dtypes
a    object
dtype: object
>>> df = parse_df_datetimes(df)
>>> df.dtypes
a    datetime64[ns]
dtype: object
def get_unhashable_cols(df: pandas.core.frame.DataFrame) -> List[str]:
452def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
453    """
454    Get the columns which contain unhashable objects from a Pandas DataFrame.
455
456    Parameters
457    ----------
458    df: pd.DataFrame
459        The DataFrame which may contain unhashable objects.
460
461    Returns
462    -------
463    A list of columns.
464    """
465    if len(df) == 0:
466        return []
467
468    is_dask = 'dask' in df.__module__
469    if is_dask:
470        from meerschaum.utils.packages import attempt_import
471        pandas = attempt_import('pandas')
472        df = pandas.DataFrame(get_first_valid_dask_partition(df))
473    return [
474        col for col, val in df.iloc[0].items()
475        if not isinstance(val, Hashable)
476    ]

Get the columns which contain unhashable objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
  • A list of columns.
def get_json_cols(df: pandas.core.frame.DataFrame) -> List[str]:
479def get_json_cols(df: 'pd.DataFrame') -> List[str]:
480    """
481    Get the columns which contain unhashable objects from a Pandas DataFrame.
482
483    Parameters
484    ----------
485    df: pd.DataFrame
486        The DataFrame which may contain unhashable objects.
487
488    Returns
489    -------
490    A list of columns to be encoded as JSON.
491    """
492    is_dask = 'dask' in df.__module__
493    if is_dask:
494        df = get_first_valid_dask_partition(df)
495    
496    if len(df) == 0:
497        return []
498
499    cols_indices = {
500        col: df[col].first_valid_index()
501        for col in df.columns
502    }
503    return [
504        col
505        for col, ix in cols_indices.items()
506        if (
507            ix is not None
508            and
509            not isinstance(df.loc[ix][col], Hashable)
510        )
511    ]

Get the columns which contain unhashable objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
  • A list of columns to be encoded as JSON.
def get_numeric_cols(df: pandas.core.frame.DataFrame) -> List[str]:
514def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
515    """
516    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.
517
518    Parameters
519    ----------
520    df: pd.DataFrame
521        The DataFrame which may contain decimal objects.
522
523    Returns
524    -------
525    A list of columns to treat as numerics.
526    """
527    from decimal import Decimal
528    is_dask = 'dask' in df.__module__
529    if is_dask:
530        df = get_first_valid_dask_partition(df)
531    
532    if len(df) == 0:
533        return []
534
535    cols_indices = {
536        col: df[col].first_valid_index()
537        for col in df.columns
538    }
539    return [
540        col
541        for col, ix in cols_indices.items()
542        if (
543            ix is not None
544            and
545            isinstance(df.loc[ix, col], Decimal)
546        )
547    ]

Get the columns which contain decimal.Decimal objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain decimal objects.
Returns
  • A list of columns to treat as numerics.
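Example (a minimal sketch; only Decimal values are detected, not plain floats):
>>> import pandas as pd
>>> from decimal import Decimal
>>> df = pd.DataFrame({'a': [1.0], 'b': [Decimal('1.0')]})
>>> get_numeric_cols(df)
['b']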
def enforce_dtypes( df: pandas.core.frame.DataFrame, dtypes: Dict[str, str], safe_copy: bool = True, coerce_numeric: bool = True, debug: bool = False) -> pandas.core.frame.DataFrame:
550def enforce_dtypes(
551        df: 'pd.DataFrame',
552        dtypes: Dict[str, str],
553        safe_copy: bool = True,
554        coerce_numeric: bool = True,
555        debug: bool = False,
556    ) -> 'pd.DataFrame':
557    """
558    Enforce the `dtypes` dictionary on a DataFrame.
559
560    Parameters
561    ----------
562    df: pd.DataFrame
563        The DataFrame on which to enforce dtypes.
564
565    dtypes: Dict[str, str]
566        The data types to attempt to enforce on the DataFrame.
567
568    safe_copy: bool, default True
569        If `True`, create a copy before comparing and modifying the dataframes.
570        Setting to `False` may mutate the DataFrames.
571        See `meerschaum.utils.dataframe.filter_unseen_df`.
572
573    coerce_numeric: bool, default True
574        If `True`, cast mixed `int` and `float` collisions to the `numeric` (`Decimal`) type.
575
576    debug: bool, default False
577        Verbosity toggle.
578
579    Returns
580    -------
581    The Pandas DataFrame with the types enforced.
582    """
583    import json
584    import traceback
585    from decimal import Decimal
586    from meerschaum.utils.debug import dprint
587    from meerschaum.utils.warnings import warn
588    from meerschaum.utils.formatting import pprint
589    from meerschaum.config.static import STATIC_CONFIG
590    from meerschaum.utils.packages import import_pandas
591    from meerschaum.utils.dtypes import (
592        are_dtypes_equal,
593        to_pandas_dtype,
594        is_dtype_numeric,
595        attempt_cast_to_numeric,
596    )
597    if safe_copy:
598        df = df.copy()
599    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
600    if len(df_dtypes) == 0:
601        if debug:
602            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
603        return df
604
605    pipe_pandas_dtypes = {
606        col: to_pandas_dtype(typ)
607        for col, typ in dtypes.items()
608    }
609    json_cols = [
610        col
611        for col, typ in dtypes.items()
612        if typ == 'json'
613    ]
614    numeric_cols = [
615        col
616        for col, typ in dtypes.items()
617        if typ == 'numeric'
618    ]
619    df_numeric_cols = get_numeric_cols(df)
620    if debug:
621        dprint(f"Desired data types:")
622        pprint(dtypes)
623        dprint(f"Data types for incoming DataFrame:")
624        pprint(df_dtypes)
625
626    if json_cols and len(df) > 0:
627        if debug:
628            dprint(f"Checking columns for JSON encoding: {json_cols}")
629        for col in json_cols:
630            if col in df.columns:
631                try:
632                    df[col] = df[col].apply(
633                        (
634                            lambda x: (
635                                json.loads(x)
636                                if isinstance(x, str)
637                                else x
638                            )
639                        )
640                    )
641                except Exception as e:
642                    if debug:
643                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")
644
645    if numeric_cols:
646        if debug:
647            dprint(f"Checking for numerics: {numeric_cols}")
648        for col in numeric_cols:
649            if col in df.columns:
650                try:
651                    df[col] = df[col].apply(attempt_cast_to_numeric)
652                except Exception as e:
653                    if debug:
654                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")
655
656    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
657        if debug:
658            dprint(f"Data types match. Exiting enforcement...")
659        return df
660
661    common_dtypes = {}
662    common_diff_dtypes = {}
663    for col, typ in pipe_pandas_dtypes.items():
664        if col in df_dtypes:
665            common_dtypes[col] = typ
666            if not are_dtypes_equal(typ, df_dtypes[col]):
667                common_diff_dtypes[col] = df_dtypes[col]
668
669    if debug:
670        dprint(f"Common columns with different dtypes:")
671        pprint(common_diff_dtypes)
672
673    detected_dt_cols = {}
674    for col, typ in common_diff_dtypes.items():
675        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
676            df_dtypes[col] = typ
677            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
678    for col in detected_dt_cols:
679        del common_diff_dtypes[col]
680
681    if debug:
682        dprint(f"Common columns with different dtypes (after dates):")
683        pprint(common_diff_dtypes)
684
685    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
686        if debug:
687            dprint(
688                "The incoming DataFrame has mostly the same types, skipping enforcement."
689                + f"The only detected difference was in the following datetime columns.\n"
690                + "    Timezone information may be stripped."
691            )
692            pprint(detected_dt_cols)
693        return df
694
695    for col, typ in dict(common_diff_dtypes).items():
696        previous_typ = common_dtypes[col]
697        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
698        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
699        explicitly_numeric = dtypes.get(col, 'numeric') == 'numeric'
700        cast_to_numeric = (
701            explicitly_numeric
702            or col in df_numeric_cols
703            or (mixed_numeric_types and not explicitly_float)
704        ) and coerce_numeric
705        if cast_to_numeric:
706            common_dtypes[col] = attempt_cast_to_numeric
707            common_diff_dtypes[col] = attempt_cast_to_numeric
708
709    for d in common_diff_dtypes:
710        t = common_dtypes[d]
711        if debug:
712            dprint(f"Casting column {d} to dtype {t}.")
713        try:
714            df[d] = (
715                df[d].apply(t)
716                if callable(t)
717                else df[d].astype(t)
718            )
719        except Exception as e:
720            if debug:
721                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
722            if 'int' in str(t).lower():
723                try:
724                    df[d] = df[d].astype('float64').astype(t)
725                except Exception as e:
726                    if debug:
727                        dprint(f"Was unable to convert to float then {t}.")
728    return df

Enforce the dtypes dictionary on a DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame on which to enforce dtypes.
  • dtypes (Dict[str, str]): The data types to attempt to enforce on the DataFrame.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames. See filter_unseen_df.
  • coerce_numeric (bool, default True): If True, cast mixed int and float collisions to the numeric (Decimal) type.
  • debug (bool, default False): Verbosity toggle.
Returns
  • The Pandas DataFrame with the types enforced.
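Example (a minimal sketch; here the 'Int64' cast may fall back to casting through float64 when a direct astype() on the object strings fails):
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['1', '2']})
>>> enforce_dtypes(df, {'a': 'Int64'}).dtypes
a    Int64
dtype: object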
def get_datetime_bound_from_df( df: Union[pandas.core.frame.DataFrame, dict, list], datetime_column: str, minimum: bool = True) -> Union[int, datetime.datetime, NoneType]:
731def get_datetime_bound_from_df(
732        df: Union['pd.DataFrame', dict, list],
733        datetime_column: str,
734        minimum: bool = True,
735    ) -> Union[int, datetime, None]:
736    """
737    Return the minimum or maximum datetime (or integer) from a DataFrame.
738
739    Parameters
740    ----------
741    df: Union[pd.DataFrame, dict, list]
742        The DataFrame, list, or dict which contains the range axis.
743
744    datetime_column: str
745        The name of the datetime (or int) column.
746
747    minimum: bool
748        Whether to return the minimum (default) or maximum value.
749
750    Returns
751    -------
752    The minimum or maximum datetime value in the dataframe, or `None`.
753    """
754    if not datetime_column:
755        return None
756
757    def compare(a, b):
758        if a is None:
759            return b
760        if b is None:
761            return a
762        if minimum:
763            return a if a < b else b
764        return a if a > b else b
765
766    if isinstance(df, list):
767        if len(df) == 0:
768            return None
769        best_yet = df[0].get(datetime_column, None)
770        for doc in df:
771            val = doc.get(datetime_column, None)
772            best_yet = compare(best_yet, val)
773        return best_yet
774
775    if isinstance(df, dict):
776        if datetime_column not in df:
777            return None
778        best_yet = df[datetime_column][0]
779        for val in df[datetime_column]:
780            best_yet = compare(best_yet, val)
781        return best_yet
782
783    if 'DataFrame' in str(type(df)):
784        if datetime_column not in df.columns:
785            return None
786        return (
787            df[datetime_column].min(skipna=True)
788            if minimum
789            else df[datetime_column].max(skipna=True)
790        )
791
792    return None

Return the minimum or maximum datetime (or integer) from a DataFrame.

Parameters
  • df (Union[pd.DataFrame, dict, list]): The DataFrame, list, or dict which contains the range axis.
  • datetime_column (str): The name of the datetime (or int) column.
  • minimum (bool): Whether to return the minimum (default) or maximum value.
Returns
  • The minimum or maximum datetime value in the dataframe, or None.
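Example (a minimal sketch using the list-of-documents path):
>>> from datetime import datetime
>>> docs = [{'dt': datetime(2023, 1, 2)}, {'dt': datetime(2023, 1, 1)}]
>>> get_datetime_bound_from_df(docs, 'dt')
datetime.datetime(2023, 1, 1, 0, 0)
>>> get_datetime_bound_from_df(docs, 'dt', minimum=False)
datetime.datetime(2023, 1, 2, 0, 0)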
def df_is_chunk_generator(df: Any) -> bool:
795def df_is_chunk_generator(df: Any) -> bool:
796    """
797    Determine whether to treat `df` as a chunk generator.
798
799    Note this should only be used in a context where generators are expected,
800    as it will return `True` for any iterable.
801
802    Parameters
803    ----------
804    df: The DataFrame or chunk generator to evaluate.
805
806    Returns
807    -------
808    A `bool` indicating whether to treat `df` as a generator.
809    """
810    return (
811        not isinstance(df, (dict, list, str))
812        and 'DataFrame' not in str(type(df))
813        and isinstance(df, (Generator, Iterable, Iterator))
814    )

Determine whether to treat df as a chunk generator.

Note this should only be used in a context where generators are expected, as it will return True for any iterable.

Parameters
  • df (Any): The DataFrame or chunk generator to evaluate.
Returns
  • A bool indicating whether to treat df as a generator.
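Example (a minimal sketch; a generator of chunks qualifies, while a DataFrame itself does not):
>>> import pandas as pd
>>> df_is_chunk_generator(pd.DataFrame())
False
>>> df_is_chunk_generator(pd.DataFrame([{'a': i}]) for i in range(3))
True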
def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
817def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
818    """
819    Return the Dask `npartitions` value for a given `chunksize`.
820    """
821    if chunksize == -1:
822        from meerschaum.config import get_config
823        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
824    if chunksize is None:
825        return 1
826    return -1 * chunksize

Return the Dask npartitions value for a given chunksize.
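Example (a minimal sketch; `None` maps to a single partition, and a positive chunksize is returned negated):
>>> chunksize_to_npartitions(None)
1
>>> chunksize_to_npartitions(1000)
-1000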

def df_from_literal( pipe: Optional[meerschaum.Pipe] = None, literal: Optional[str] = None, debug: bool = False) -> pandas.core.frame.DataFrame:
829def df_from_literal(
830        pipe: Optional[mrsm.Pipe] = None,
831        literal: Optional[str] = None,
832        debug: bool = False
833    ) -> 'pd.DataFrame':
834    """
835    Construct a dataframe from a literal value, using the pipe's datetime and value column names.
836
837    Parameters
838    ----------
839    pipe: Optional['meerschaum.Pipe'], default None
840        The pipe which will consume the literal value.
841
842    Returns
843    -------
844    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
845    and the literal as the value.
846    """
847    from meerschaum.utils.packages import import_pandas
848    from meerschaum.utils.warnings import error, warn
849    from meerschaum.utils.debug import dprint
850
851    if pipe is None or literal is None:
852        error("Please provide a Pipe and a literal value")
853    ### this will raise an error if the columns are undefined
854    dt_name, val_name = pipe.get_columns('datetime', 'value')
855
856    val = literal
857    if isinstance(literal, str):
858        if debug:
859            dprint(f"Received literal string: '{literal}'")
860        import ast
861        try:
862            val = ast.literal_eval(literal)
863        except Exception as e:
864            warn(
865                "Failed to parse value from string:\n" + f"{literal}" +
866                "\n\nWill cast as a string instead."\
867            )
868            val = literal
869
870    from datetime import datetime, timezone
871    now = datetime.now(timezone.utc).replace(tzinfo=None)
872
873    pd = import_pandas()
874    return pd.DataFrame({dt_name: [now], val_name: [val]})

Construct a dataframe from a literal value, using the pipe's datetime and value column names.

Parameters
  • pipe (Optional['meerschaum.Pipe'], default None): The pipe which will consume the literal value.
  • literal (Optional[str], default None): The literal value to use; strings are parsed with ast.literal_eval when possible.
Returns
  • A 1-row pandas DataFrame with the current UTC timestamp as the datetime column and the literal as the value.
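Example (a minimal sketch; the connector and metric keys here are hypothetical, and the pipe must have its datetime and value columns defined):
>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('plugin:example', 'temperature', columns={'datetime': 'dt', 'value': 'val'})
>>> df = df_from_literal(pipe, '42')
>>> list(df.columns)
['dt', 'val']
>>> df['val'][0]
42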
def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Optional[pandas.core.frame.DataFrame]:
877def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
878    """
879    Return the first valid Dask DataFrame partition (if possible).
880    """
881    pdf = None
882    for partition in ddf.partitions:
883        try:
884            pdf = partition.compute()
885        except Exception as e:
886            continue
887        if len(pdf) > 0:
888            return pdf
889    return ddf.compute()

Return the first valid Dask DataFrame partition (if possible).
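Example (a minimal sketch; assumes dask is installed):
>>> import pandas as pd
>>> import dask.dataframe as dd
>>> ddf = dd.from_pandas(pd.DataFrame({'a': [1, 2]}), npartitions=2)
>>> get_first_valid_dask_partition(ddf)
   a
0  1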

def query_df( df: pandas.core.frame.DataFrame, params: Optional[Dict[str, Any]] = None, begin: Union[datetime.datetime, int, NoneType] = None, end: Union[datetime.datetime, int, NoneType] = None, datetime_column: Optional[str] = None, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, inplace: bool = False, reset_index: bool = False, debug: bool = False) -> pandas.core.frame.DataFrame:
 892def query_df(
 893        df: 'pd.DataFrame',
 894        params: Optional[Dict[str, Any]] = None,
 895        begin: Union[datetime, int, None] = None,
 896        end: Union[datetime, int, None] = None,
 897        datetime_column: Optional[str] = None,
 898        select_columns: Optional[List[str]] = None,
 899        omit_columns: Optional[List[str]] = None,
 900        inplace: bool = False,
 901        reset_index: bool = False,
 902        debug: bool = False,
 903    ) -> 'pd.DataFrame':
 904    """
 905    Query the dataframe with the params dictionary.
 906
 907    Parameters
 908    ----------
 909    df: pd.DataFrame
 910        The DataFrame to query against.
 911
 912    params: Optional[Dict[str, Any]], default None
 913        The parameters dictionary to use for the query.
 914
 915    begin: Union[datetime, int, None], default None
 916        If `begin` and `datetime_column` are provided, only return rows with a timestamp
 917        greater than or equal to this value.
 918
 919    end: Union[datetime, int, None], default None
 920        If `end` and `datetime_column` are provided, only return rows with a timestamp
 921        less than this value.
 922
 923    datetime_column: Optional[str], default None
 924        A `datetime_column` must be provided to use `begin` and `end`.
 925
 926    select_columns: Optional[List[str]], default None
 927        If provided, only return these columns.
 928
 929    omit_columns: Optional[List[str]], default None
 930        If provided, do not include these columns in the result.
 931
 932    inplace: bool, default False
 933        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
 934
 935    reset_index: bool, default False
 936        If `True`, reset the index in the resulting DataFrame.
 937
 938    Returns
 939    -------
 940    A Pandas DataFrame query result.
 941    """
 942    if not params and not begin and not end and not select_columns and not omit_columns:
 943        return df
 944
 945    import json
 946    import meerschaum as mrsm
 947    from meerschaum.utils.debug import dprint
 948    from meerschaum.utils.misc import get_in_ex_params
 949    from meerschaum.utils.warnings import warn
 950
 951    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
 952
 953    if begin or end:
 954        if not datetime_column or datetime_column not in df.columns:
 955            warn(
 956                f"The datetime column '{datetime_column}' is not present in the Dataframe, "
 957                + "ignoring begin and end...",
 958            )
 959            begin, end = None, None
 960
 961    if debug:
 962        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")
 963
 964    in_ex_params = get_in_ex_params(params)
 965
 966    def serialize(x: Any) -> str:
 967        if isinstance(x, (dict, list, tuple)):
 968            return json.dumps(x, sort_keys=True, separators=(',', ':'), default=str)
 969        if hasattr(x, 'isoformat'):
 970            return x.isoformat()
 971        return str(x)
 972
 973    masks = [
 974        (
 975            (df[datetime_column] >= begin)
 976            if begin is not None and datetime_column
 977            else True
 978        ) & (
 979            (df[datetime_column] < end)
 980            if end is not None and datetime_column
 981            else True
 982        )
 983    ]
 984
 985    masks.extend([
 986        (
 987            (
 988                df[col].apply(serialize).isin(
 989                    [
 990                        serialize(_in_val)
 991                        for _in_val in in_vals
 992                    ]
 993                ) if in_vals else True
 994            ) & (
 995                ~df[col].apply(serialize).isin(
 996                    [
 997                        serialize(_ex_val)
 998                        for _ex_val in ex_vals
 999                    ]
1000                ) if ex_vals else True
1001            )
1002        )
1003        for col, (in_vals, ex_vals) in in_ex_params.items()
1004        if col in df.columns
1005    ])
1006    query_mask = df.notna().any(axis=1) | True  ### All-`True` boolean Series over `df.index`.
1007    for mask in masks:
1008        query_mask = query_mask & mask
1009
1010    if inplace:
1011        df.where(query_mask, inplace=inplace)
1012        df.dropna(how='all', inplace=inplace)
1013        result_df = df
1014    else:
1015        result_df = df.where(query_mask).dropna(how='all')
1016
1017    if reset_index:
1018        result_df.reset_index(drop=True, inplace=True)
1019
1020    result_df = enforce_dtypes(
1021        result_df,
1022        dtypes,
1023        safe_copy = (not inplace),
1024        debug = debug,
1025        coerce_numeric = False,
1026    )
1027
1028    if select_columns == ['*']:
1029        select_columns = None
1030
1031    if not select_columns and not omit_columns:
1032        return result_df
1033
1034    if select_columns:
1035        for col in list(result_df.columns):
1036            if col not in select_columns:
1037                del result_df[col]
1038        return result_df
1039
1040    if omit_columns:
1041        for col in list(result_df.columns):
1042            if col in omit_columns:
1043                del result_df[col]
1044    if debug:
1045        dprint(f"{dtypes=}")
1046    return result_df

Query the dataframe with the params dictionary.

Parameters
  • df (pd.DataFrame): The DataFrame to query against.
  • params (Optional[Dict[str, Any]], default None): The parameters dictionary to use for the query.
  • begin (Union[datetime, int, None], default None): If begin and datetime_column are provided, only return rows with a timestamp greater than or equal to this value.
  • end (Union[datetime, int, None], default None): If end and datetime_column are provided, only return rows with a timestamp less than this value.
  • datetime_column (Optional[str], default None): A datetime_column must be provided to use begin and end.
  • select_columns (Optional[List[str]], default None): If provided, only return these columns.
  • omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
  • inplace (bool, default False): If True, modify the DataFrame inplace rather than creating a new DataFrame.
  • reset_index (bool, default False): If True, reset the index in the resulting DataFrame.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A Pandas DataFrame query result.
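Example (a minimal sketch; note that begin is inclusive and end is exclusive):
>>> import pandas as pd
>>> from datetime import datetime
>>> df = pd.DataFrame({
...     'dt': [datetime(2023, 1, 1), datetime(2023, 1, 2)],
...     'color': ['red', 'blue'],
... })
>>> list(query_df(df, {'color': 'red'})['color'])
['red']
>>> len(query_df(df, begin=datetime(2023, 1, 2), datetime_column='dt'))
1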