meerschaum.utils.dataframe

Utility functions for working with DataFrames.

   1#! /usr/bin/env python3
   2# -*- coding: utf-8 -*-
   3# vim:fenc=utf-8
   4
   5"""
   6Utility functions for working with DataFrames.
   7"""
   8
   9from __future__ import annotations
  10from datetime import datetime
  11from meerschaum.utils.typing import (
  12    Optional, Dict, Any, List, Hashable, Generator,
  13    Iterator, Iterable, Union, Tuple,
  14)
  15
  16
  17def add_missing_cols_to_df(df: 'pd.DataFrame', dtypes: Dict[str, Any]) -> 'pd.DataFrame':
  18    """
  19    Add columns from the dtypes dictionary as null columns to a new DataFrame.
  20
  21    Parameters
  22    ----------
  23    df: pd.DataFrame
  24        The dataframe to copy and to which null columns will be added.
  25
  26    dtypes: Dict[str, Any]
  27        The data types dictionary which may contain keys not present in `df.columns`.
  28
  29    Returns
  30    -------
  31    A new `DataFrame` with the keys from `dtypes` added as null columns.
  32    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
  33    NOTE: This will not ensure that dtypes are enforced!
  34
  35    Examples
  36    --------
  37    >>> import pandas as pd
  38    >>> df = pd.DataFrame([{'a': 1}])
  39    >>> dtypes = {'b': 'Int64'}
  40    >>> add_missing_cols_to_df(df, dtypes)
  41          a  b
  42       0  1  <NA>
  43    >>> add_missing_cols_to_df(df, dtypes).dtypes
  44    a    int64
  45    b    Int64
  46    dtype: object
  47    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
  48    a    int64
  49    dtype: object
  50    >>> 
  51    """
  52    if set(df.columns) == set(dtypes):
  53        return df
  54
  55    import traceback
  56    from meerschaum.utils.packages import import_pandas, attempt_import
  57    from meerschaum.utils.warnings import warn
  58    from meerschaum.utils.dtypes import to_pandas_dtype
  59    pandas = attempt_import('pandas')
  60    
  61    def build_series(dtype: str):
  62        return pandas.Series([], dtype=to_pandas_dtype(dtype))
  63
  64    assign_kwargs = {
  65        str(col): build_series(str(typ))
  66        for col, typ in dtypes.items()
  67        if col not in df.columns
  68    }
  69    return df.assign(**assign_kwargs)
  70
  71
  72def filter_unseen_df(
  73        old_df: 'pd.DataFrame',
  74        new_df: 'pd.DataFrame',
  75        safe_copy: bool = True,
  76        dtypes: Optional[Dict[str, Any]] = None,
  77        debug: bool = False,
  78    ) -> 'pd.DataFrame':
  79    """
  80    Left join two DataFrames to find the newest unseen data.
  81
  82    Parameters
  83    ----------
  84    old_df: 'pd.DataFrame'
  85        The original (target) dataframe. Acts as a filter on the `new_df`.
  86        
  87    new_df: 'pd.DataFrame'
  88        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
  89
  90    safe_copy: bool, default True
  91        If `True`, create a copy before comparing and modifying the dataframes.
  92        Setting to `False` may mutate the DataFrames.
  93        
  94    dtypes: Optional[Dict[str, Any]], default None
  95        Optionally specify the datatypes of the dataframe.
  96
  97    debug: bool, default False
  98        Verbosity toggle.
  99
 100    Returns
 101    -------
 102    A pandas dataframe of the new, unseen rows in `new_df`.
 103
 104    Examples
 105    --------
 106    ```python
 107    >>> import pandas as pd
 108    >>> df1 = pd.DataFrame({'a': [1,2]})
 109    >>> df2 = pd.DataFrame({'a': [2,3]})
 110    >>> filter_unseen_df(df1, df2)
 111       a
 112    0  3
 113
 114    ```
 115
 116    """
 117    if old_df is None:
 118        return new_df
 119
 120    if safe_copy:
 121        old_df = old_df.copy()
 122        new_df = new_df.copy()
 123
 124    import json
 125    import functools
 126    import traceback
 127    from decimal import Decimal
 128    from meerschaum.utils.warnings import warn
 129    from meerschaum.utils.packages import import_pandas, attempt_import
 130    from meerschaum.utils.dtypes import to_pandas_dtype, are_dtypes_equal, attempt_cast_to_numeric
 131    from meerschaum.utils.debug import dprint
 132    pd = import_pandas(debug=debug)
 133    is_dask = 'dask' in new_df.__module__
 134    if is_dask:
 135        pandas = attempt_import('pandas')
 136        dd = attempt_import('dask.dataframe')
 137        merge = dd.merge
 138        NA = pandas.NA
 139    else:
 140        merge = pd.merge
 141        NA = pd.NA
 142
 143    new_df_dtypes = dict(new_df.dtypes)
 144    old_df_dtypes = dict(old_df.dtypes)
 145
 146    same_cols = set(new_df.columns) == set(old_df.columns)
 147    if not same_cols:
 148        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
 149        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)
 150
 151    ### Edge case: two empty lists cast to DFs.
 152    elif len(new_df.columns) == 0:
 153        return new_df
 154
 155    try:
 156        ### Order matters when checking equality.
 157        new_df = new_df[old_df.columns]
 158    except Exception as e:
 159        warn(
 160            "Was not able to cast old columns onto new DataFrame. " +
 161            f"Are both DataFrames the same shape? Error:\n{e}",
 162            stacklevel = 3,
 163        )
 164        return new_df[list(new_df_dtypes.keys())]
 165
 166    ### assume the old_df knows what it's doing, even if it's technically wrong.
 167    if dtypes is None:
 168        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}
 169
 170    dtypes = {
 171        col: to_pandas_dtype(typ)
 172        for col, typ in dtypes.items()
 173        if col in new_df_dtypes and col in old_df_dtypes
 174    }
 175    for col, typ in new_df_dtypes.items():
 176        if col not in dtypes:
 177            dtypes[col] = typ
 178    
 179    cast_cols = True
 180    try:
 181        new_df = new_df.astype(dtypes)
 182        cast_cols = False
 183    except Exception as e:
 184        warn(
 185            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
 186        )
 187
 188    new_numeric_cols_existing = get_numeric_cols(new_df)
 189    old_numeric_cols = get_numeric_cols(old_df)
 190    for col, typ in {k: v for k, v in dtypes.items()}.items():
 191        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
 192            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
 193            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
 194            new_is_numeric = col in new_numeric_cols_existing
 195            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
 196            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
 197            old_is_numeric = col in old_numeric_cols
 198
 199            if (
 200                (new_is_float or new_is_int or new_is_numeric)
 201                and
 202                (old_is_float or old_is_int or old_is_numeric)
 203            ):
 204                dtypes[col] = attempt_cast_to_numeric
 205                cast_cols = True
 206                continue
 207
 208            ### Fallback to object if the types don't match.
 209            warn(
 210                f"Detected different types for '{col}' "
 211                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
 212                + "falling back to 'object'..."
 213            )
 214            dtypes[col] = 'object'
 215            cast_cols = True
 216
 217    if cast_cols:
 218        for col, dtype in dtypes.items():
 219            if col in new_df.columns:
 220                try:
 221                    new_df[col] = (
 222                        new_df[col].astype(dtype)
 223                        if not callable(dtype)
 224                        else new_df[col].apply(dtype)
 225                    )
 226                except Exception as e:
 227                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")
 228
 229    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
 230    new_json_cols = get_json_cols(new_df)
 231    old_json_cols = get_json_cols(old_df)
 232    json_cols = set(new_json_cols + old_json_cols)
 233    for json_col in old_json_cols:
 234        old_df[json_col] = old_df[json_col].apply(serializer)
 235    for json_col in new_json_cols:
 236        new_df[json_col] = new_df[json_col].apply(serializer)
 237
 238    new_numeric_cols = get_numeric_cols(new_df)
 239    numeric_cols = set(new_numeric_cols + old_numeric_cols)
 240    for numeric_col in old_numeric_cols:
 241        old_df[numeric_col] = old_df[numeric_col].apply(
 242            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
 243        )
 244    for numeric_col in new_numeric_cols:
 245        new_df[numeric_col] = new_df[numeric_col].apply(
 246            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
 247        )
 248
 249    joined_df = merge(
 250        new_df.fillna(NA),
 251        old_df.fillna(NA),
 252        how = 'left',
 253        on = None,
 254        indicator = True,
 255    )
 256    changed_rows_mask = (joined_df['_merge'] == 'left_only')
 257    delta_df = joined_df[list(new_df_dtypes.keys())][changed_rows_mask].reset_index(drop=True)
 258
 259    for json_col in json_cols:
 260        if json_col not in delta_df.columns:
 261            continue
 262        try:
 263            delta_df[json_col] = delta_df[json_col].apply(json.loads)
 264        except Exception as e:
 265            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")
 266
 267    for numeric_col in numeric_cols:
 268        if numeric_col not in delta_df.columns:
 269            continue
 270        try:
 271            delta_df[numeric_col] = delta_df[numeric_col].apply(attempt_cast_to_numeric)
 272        except Exception as e:
 273            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")
 274
 275    return delta_df
 276
 277
 278def parse_df_datetimes(
 279        df: 'pd.DataFrame',
 280        ignore_cols: Optional[Iterable[str]] = None,
 281        chunksize: Optional[int] = None,
 282        dtype_backend: str = 'numpy_nullable',
 283        debug: bool = False,
 284    ) -> 'pd.DataFrame':
 285    """
 286    Parse a pandas DataFrame for datetime columns and cast as datetimes.
 287
 288    Parameters
 289    ----------
 290    df: pd.DataFrame
 291        The pandas DataFrame to parse.
 292
 293    ignore_cols: Optional[Iterable[str]], default None
 294        If provided, do not attempt to coerce these columns as datetimes.
 295
 296    chunksize: Optional[int], default None
 297        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
 298
 299    dtype_backend: str, default 'numpy_nullable'
 300        If `df` is not a DataFrame and new one needs to be constructed,
 301        use this as the datatypes backend.
 302        Accepted values are 'numpy_nullable' and 'pyarrow'.
 303        
 304    debug: bool, default False
 305        Verbosity toggle.
 306
 307    Returns
 308    -------
 309    A new pandas DataFrame with the determined datetime columns
 310    (usually ISO strings) cast as datetimes.
 311
 312    Examples
 313    --------
 314    ```python
 315    >>> import pandas as pd
 316    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
 317    >>> df.dtypes
 318    a    object
 319    dtype: object
 320    >>> df = parse_df_datetimes(df)
 321    >>> df.dtypes
 322    a    datetime64[ns]
 323    dtype: object
 324
 325    ```
 326
 327    """
 328    from meerschaum.utils.packages import import_pandas, attempt_import
 329    from meerschaum.utils.debug import dprint
 330    from meerschaum.utils.warnings import warn
 332    import traceback; from meerschaum.utils.misc import items_str
 332    pd = import_pandas()
 333    pandas = attempt_import('pandas')
 334    pd_name = pd.__name__
 335    using_dask = 'dask' in pd_name
 336    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
 337    dask_dataframe = None
 338    if using_dask or df_is_dask:
 339        npartitions = chunksize_to_npartitions(chunksize)
 340        dask_dataframe = attempt_import('dask.dataframe')
 341
 342    ### if df is a dict, build DataFrame
 343    if isinstance(df, pandas.DataFrame):
 344        pdf = df
 345    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
 346        pdf = get_first_valid_dask_partition(df)
 347    else:
 348        if debug:
 349            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")
 350
 351        if using_dask:
 352            if isinstance(df, list):
 353                keys = set()
 354                for doc in df:
 355                    for key in doc:
 356                        keys.add(key)
 357                df = pd.DataFrame.from_dict(
 358                    {
 359                        k: [
 360                            doc.get(k, None)
 361                            for doc in df
 362                        ] for k in keys
 363                    },
 364                    npartitions = npartitions,
 365                )
 366            elif isinstance(df, dict):
 367                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
 368            elif 'pandas.core.frame.DataFrame' in str(type(df)):
 369                df = pd.from_pandas(df, npartitions=npartitions)
 370            else:
 371                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
 372            pandas = attempt_import('pandas')
 373            pdf = get_first_valid_dask_partition(df)
 374
 375        else:
 376            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
 377            pdf = df
 378
 379    ### skip parsing if DataFrame is empty
 380    if len(pdf) == 0:
 381        if debug:
 382            dprint(f"df is empty. Returning original DataFrame without casting datetime columns...")
 383        return df
 384
 385    ignore_cols = set(
 386        (ignore_cols or []) + [
 387            col
 388            for col, dtype in pdf.dtypes.items() 
 389            if 'datetime' in str(dtype)
 390        ]
 391    )
 392    cols_to_inspect = [col for col in pdf.columns if col not in ignore_cols]
 393
 394    if len(cols_to_inspect) == 0:
 395        if debug:
 396            dprint(f"All columns are ignored, skipping datetime detection...")
 397        return df
 398
 399    ### apply regex to columns to determine which are ISO datetimes
 400    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
 401    dt_mask = pdf[cols_to_inspect].astype(str).apply(
 402        lambda s: s.str.match(iso_dt_regex).all()
 403    )
 404
 405    ### list of datetime column names
 406    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
 407    if not datetime_cols:
 408        if debug:
 409            dprint("No columns detected as datetimes, returning...")
 410        return df
 411
 412    if debug:
 413        dprint("Converting columns to datetimes: " + str(datetime_cols))
 414
 415    try:
 416        if not using_dask:
 417            df[datetime_cols] = df[datetime_cols].apply(pd.to_datetime, utc=True)
 418        else:
 419            df[datetime_cols] = df[datetime_cols].apply(
 420                pd.to_datetime,
 421                utc = True,
 422                axis = 1,
 423                meta = {
 424                    col: 'datetime64[ns]'
 425                    for col in datetime_cols
 426                }
 427            )
 428    except Exception as e:
 429        warn(
 430            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
 431            + f"{traceback.format_exc()}"
 432        )
 433
 434    for dt in datetime_cols:
 435        try:
 436            df[dt] = df[dt].dt.tz_localize(None)
 437        except Exception as e:
 438            warn(f"Unable to convert column '{dt}' to naive datetime:\n{traceback.format_exc()}")
 439
 440    return df
 441
 442
 443def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
 444    """
 445    Get the columns which contain unhashable objects from a Pandas DataFrame.
 446
 447    Parameters
 448    ----------
 449    df: pd.DataFrame
 450        The DataFrame which may contain unhashable objects.
 451
 452    Returns
 453    -------
 454    A list of columns.
 455    """
 456    if len(df) == 0:
 457        return []
 458
 459    is_dask = 'dask' in df.__module__
 460    if is_dask:
 461        from meerschaum.utils.packages import attempt_import
 462        pandas = attempt_import('pandas')
 463        df = pandas.DataFrame(get_first_valid_dask_partition(df))
 464    return [
 465        col for col, val in df.iloc[0].items()
 466        if not isinstance(val, Hashable)
 467    ]
 468
 469
 470def get_json_cols(df: 'pd.DataFrame') -> List[str]:
 471    """
 472    Get the columns which contain unhashable objects (to be serialized as JSON) from a Pandas DataFrame.
 473
 474    Parameters
 475    ----------
 476    df: pd.DataFrame
 477        The DataFrame which may contain unhashable objects.
 478
 479    Returns
 480    -------
 481    A list of columns to be encoded as JSON.
 482    """
 483    is_dask = 'dask' in df.__module__
 484    if is_dask:
 485        df = get_first_valid_dask_partition(df)
 486    
 487    if len(df) == 0:
 488        return []
 489
 490    cols_indices = {
 491        col: df[col].first_valid_index()
 492        for col in df.columns
 493    }
 494    return [
 495        col
 496        for col, ix in cols_indices.items()
 497        if (
 498            ix is not None
 499            and
 500            not isinstance(df.loc[ix][col], Hashable)
 501        )
 502    ]
 503
 504
 505def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
 506    """
 507    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.
 508
 509    Parameters
 510    ----------
 511    df: pd.DataFrame
 512        The DataFrame which may contain decimal objects.
 513
 514    Returns
 515    -------
 516    A list of columns to treat as numerics.
 517    """
 518    from decimal import Decimal
 519    is_dask = 'dask' in df.__module__
 520    if is_dask:
 521        df = get_first_valid_dask_partition(df)
 522    
 523    if len(df) == 0:
 524        return []
 525
 526    cols_indices = {
 527        col: df[col].first_valid_index()
 528        for col in df.columns
 529    }
 530    return [
 531        col
 532        for col, ix in cols_indices.items()
 533        if (
 534            ix is not None
 535            and
 536            isinstance(df.loc[ix][col], Decimal)
 537        )
 538    ]
 539
 540
 541def enforce_dtypes(
 542        df: 'pd.DataFrame',
 543        dtypes: Dict[str, str],
 544        safe_copy: bool = True,
 545        coerce_numeric: bool = True,
 546        debug: bool = False,
 547    ) -> 'pd.DataFrame':
 548    """
 549    Enforce the `dtypes` dictionary on a DataFrame.
 550
 551    Parameters
 552    ----------
 553    df: pd.DataFrame
 554        The DataFrame on which to enforce dtypes.
 555
 556    dtypes: Dict[str, str]
 557        The data types to attempt to enforce on the DataFrame.
 558
 559    safe_copy: bool, default True
 560        If `True`, create a copy before comparing and modifying the dataframes.
 561        Setting to `False` may mutate the DataFrames.
 562        See `meerschaum.utils.dataframe.filter_unseen_df`.
 563
 564    coerce_numeric: bool, default True
 565        If `True`, convert float and int collisions to numeric.
 566
 567    debug: bool, default False
 568        Verbosity toggle.
 569
 570    Returns
 571    -------
 572    The Pandas DataFrame with the types enforced.
 573    """
 574    import json
 575    import traceback
 576    from decimal import Decimal
 577    from meerschaum.utils.debug import dprint
 578    from meerschaum.utils.warnings import warn
 579    from meerschaum.utils.formatting import pprint
 580    from meerschaum.config.static import STATIC_CONFIG
 581    from meerschaum.utils.packages import import_pandas
 582    from meerschaum.utils.dtypes import (
 583        are_dtypes_equal,
 584        to_pandas_dtype,
 585        is_dtype_numeric,
 586        attempt_cast_to_numeric,
 587    )
 588    if safe_copy:
 589        df = df.copy()
 590    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
 591    if len(df_dtypes) == 0:
 592        if debug:
 593            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
 594        return df
 595
 596    pipe_pandas_dtypes = {
 597        col: to_pandas_dtype(typ)
 598        for col, typ in dtypes.items()
 599    }
 600    json_cols = [
 601        col
 602        for col, typ in dtypes.items()
 603        if typ == 'json'
 604    ]
 605    numeric_cols = [
 606        col
 607        for col, typ in dtypes.items()
 608        if typ == 'numeric'
 609    ]
 610    df_numeric_cols = get_numeric_cols(df)
 611    if debug:
 612        dprint(f"Desired data types:")
 613        pprint(dtypes)
 614        dprint(f"Data types for incoming DataFrame:")
 615        pprint(df_dtypes)
 616
 617    if json_cols and len(df) > 0:
 618        if debug:
 619            dprint(f"Checking columns for JSON encoding: {json_cols}")
 620        for col in json_cols:
 621            if col in df.columns:
 622                try:
 623                    df[col] = df[col].apply(
 624                        (
 625                            lambda x: (
 626                                json.loads(x)
 627                                if isinstance(x, str)
 628                                else x
 629                            )
 630                        )
 631                    )
 632                except Exception as e:
 633                    if debug:
 634                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")
 635
 636    if numeric_cols:
 637        if debug:
 638            dprint(f"Checking for numerics: {numeric_cols}")
 639        for col in numeric_cols:
 640            if col in df.columns:
 641                try:
 642                    df[col] = df[col].apply(attempt_cast_to_numeric)
 643                except Exception as e:
 644                    if debug:
 645                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")
 646
 647    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
 648        if debug:
 649            dprint(f"Data types match. Exiting enforcement...")
 650        return df
 651
 652    common_dtypes = {}
 653    common_diff_dtypes = {}
 654    for col, typ in pipe_pandas_dtypes.items():
 655        if col in df_dtypes:
 656            common_dtypes[col] = typ
 657            if not are_dtypes_equal(typ, df_dtypes[col]):
 658                common_diff_dtypes[col] = df_dtypes[col]
 659
 660    if debug:
 661        dprint(f"Common columns with different dtypes:")
 662        pprint(common_diff_dtypes)
 663
 664    detected_dt_cols = {}
 665    for col, typ in common_diff_dtypes.items():
 666        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
 667            df_dtypes[col] = typ
 668            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
 669    for col in detected_dt_cols:
 670        del common_diff_dtypes[col]
 671
 672    if debug:
 673        dprint(f"Common columns with different dtypes (after dates):")
 674        pprint(common_diff_dtypes)
 675
 676    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
 677        if debug:
 678            dprint(
 679                "The incoming DataFrame has mostly the same types, skipping enforcement."
 680                + f"The only detected difference was in the following datetime columns.\n"
 681                + "    Timezone information may be stripped."
 682            )
 683            pprint(detected_dt_cols)
 684        return df
 685
 686    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
 687        previous_typ = common_dtypes[col]
 688        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
 689        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
 690        explicitly_numeric = dtypes.get(col, 'numeric') == 'numeric'
 691        cast_to_numeric = (
 692            explicitly_numeric
 693            or col in df_numeric_cols
 694            or (mixed_numeric_types and not explicitly_float)
 695        ) and coerce_numeric
 696        if cast_to_numeric:
 697            common_dtypes[col] = attempt_cast_to_numeric
 698            common_diff_dtypes[col] = attempt_cast_to_numeric
 699
 700    for d in common_diff_dtypes:
 701        t = common_dtypes[d]
 702        if debug:
 703            dprint(f"Casting column {d} to dtype {t}.")
 704        try:
 705            df[d] = (
 706                df[d].apply(t)
 707                if callable(t)
 708                else df[d].astype(t)
 709            )
 710        except Exception as e:
 711            if debug:
 712                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
 713            if 'int' in str(t).lower():
 714                try:
 715                    df[d] = df[d].astype('float64').astype(t)
 716                except Exception as e:
 717                    if debug:
 718                        dprint(f"Was unable to convert to float then {t}.")
 719    return df
 720
 721
 722def get_datetime_bound_from_df(
 723        df: Union['pd.DataFrame', dict, list],
 724        datetime_column: str,
 725        minimum: bool = True,
 726    ) -> Union[int, 'datetime.datetime', None]:
 727    """
 728    Return the minimum or maximum datetime (or integer) from a DataFrame.
 729
 730    Parameters
 731    ----------
 732    df: pd.DataFrame
 733        The DataFrame, list, or dict which contains the range axis.
 734
 735    datetime_column: str
 736        The name of the datetime (or int) column.
 737
 738    minimum: bool, default True
 739        Whether to return the minimum (default) or maximum value.
 740
 741    Returns
 742    -------
 743    The minimum or maximum datetime value in the dataframe, or `None`.
 744    """
 745    if not datetime_column:
 746        return None
 747
 748    def compare(a, b):
 749        if a is None:
 750            return b
 751        if b is None:
 752            return a
 753        if minimum:
 754            return a if a < b else b
 755        return a if a > b else b
 756
 757    if isinstance(df, list):
 758        if len(df) == 0:
 759            return None
 760        best_yet = df[0].get(datetime_column, None)
 761        for doc in df:
 762            val = doc.get(datetime_column, None)
 763            best_yet = compare(best_yet, val)
 764        return best_yet
 765
 766    if isinstance(df, dict):
 767        if datetime_column not in df:
 768            return None
 769        best_yet = df[datetime_column][0]
 770        for val in df[datetime_column]:
 771            best_yet = compare(best_yet, val)
 772        return best_yet
 773
 774    if 'DataFrame' in str(type(df)):
 775        if datetime_column not in df.columns:
 776            return None
 777        return (
 778            df[datetime_column].min(skipna=True)
 779            if minimum
 780            else df[datetime_column].max(skipna=True)
 781        )
 782
 783    return None
 784
 785
 786def df_is_chunk_generator(df: Any) -> bool:
 787    """
 788    Determine whether to treat `df` as a chunk generator.
 789
 790    Note this should only be used in a context where generators are expected,
 791    as it will return `True` for any iterable.
 792
 793    Parameters
 794    ----------
 795    df: The DataFrame or chunk generator to evaluate.
 796
 797    Returns
 798    -------
 799    A `bool` indicating whether to treat `df` as a generator.
 800    """
 801    return (
 802        not isinstance(df, (dict, list, str))
 803        and 'DataFrame' not in str(type(df))
 804        and isinstance(df, (Generator, Iterable, Iterator))
 805    )
 806
 807
 808def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
 809    """
 810    Return the Dask `npartitions` value for a given `chunksize`.
 811    """
 812    if chunksize == -1:
 813        from meerschaum.config import get_config
 814        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
 815    if chunksize is None:
 816        return 1
 817    return -1 * chunksize
 818
 819
 820def df_from_literal(
 821        pipe: Optional['meerschaum.Pipe'] = None,
 822        literal: Optional[str] = None,
 823        debug: bool = False
 824    ) -> 'pd.DataFrame':
 825    """
 826    Construct a dataframe from a literal value, using the pipe's datetime and value column names.
 827
 828    Parameters
 829    ----------
 830    pipe: Optional['meerschaum.Pipe'], default None
 831        The pipe which will consume the literal value.
 832
 833    Returns
 834    -------
 835    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
 836        and the literal as the value.
 837    """
 838    from meerschaum.utils.packages import import_pandas
 839    from meerschaum.utils.warnings import error, warn
 840    from meerschaum.utils.debug import dprint
 841
 842    if pipe is None or literal is None:
 843        error("Please provide a Pipe and a literal value")
 844    ### this will raise an error if the columns are undefined
 845    dt_name, val_name = pipe.get_columns('datetime', 'value')
 846
 847    val = literal
 848    if isinstance(literal, str):
 849        if debug:
 850            dprint(f"Received literal string: '{literal}'")
 851        import ast
 852        try:
 853            val = ast.literal_eval(literal)
 854        except Exception as e:
 855            warn(
 856                "Failed to parse value from string:\n" + f"{literal}" +
 857                "\n\nWill cast as a string instead."\
 858            )
 859            val = literal
 860
 861    from datetime import datetime, timezone
 862    now = datetime.now(timezone.utc).replace(tzinfo=None)
 863
 864    pd = import_pandas()
 865    return pd.DataFrame({dt_name: [now], val_name: [val]})
 866
 867
 868def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
 869    """
 870    Return the first valid Dask DataFrame partition (if possible).
 871    """
 872    pdf = None
 873    for partition in ddf.partitions:
 874        try:
 875            pdf = partition.compute()
 876        except Exception as e:
 877            continue
 878        if len(pdf) > 0:
 879            return pdf
 880    return ddf.compute()
 881
 882
 883def query_df(
 884        df: 'pd.DataFrame',
 885        params: Optional[Dict[str, Any]] = None,
 886        begin: Union[datetime, int, None] = None,
 887        end: Union[datetime, int, None] = None,
 888        datetime_column: Optional[str] = None,
 889        select_columns: Optional[List[str]] = None,
 890        omit_columns: Optional[List[str]] = None,
 891        inplace: bool = False,
 892        reset_index: bool = False,
 893        debug: bool = False,
 894    ) -> 'pd.DataFrame':
 895    """
 896    Query the dataframe with the params dictionary.
 897
 898    Parameters
 899    ----------
 900    df: pd.DataFrame
 901        The DataFrame to query against.
 902
 903    params: Optional[Dict[str, Any]], default None
 904        The parameters dictionary to use for the query.
 905
 906    begin: Union[datetime, int, None], default None
 907        If `begin` and `datetime_column` are provided, only return rows with a timestamp
 908        greater than or equal to this value.
 909
 910    end: Union[datetime, int, None], default None
 911        If `end` and `datetime_column` are provided, only return rows with a timestamp
 912        less than this value.
 913
 914    datetime_column: Optional[str], default None
 915        A `datetime_column` must be provided to use `begin` and `end`.
 916
 917    select_columns: Optional[List[str]], default None
 918        If provided, only return these columns.
 919
 920    omit_columns: Optional[List[str]], default None
 921        If provided, do not include these columns in the result.
 922
 923    inplace: bool, default False
 924        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
 925
 926    reset_index: bool, default False
 927        If `True`, reset the index in the resulting DataFrame.
 928
 929    Returns
 930    -------
 931    A Pandas DataFrame query result.
 932    """
 933    if not params and not begin and not end:
 934        return df
 935
 936    import json
 937    import meerschaum as mrsm
 938    from meerschaum.utils.debug import dprint
 939    from meerschaum.utils.misc import get_in_ex_params
 940    from meerschaum.utils.warnings import warn
 941
 942    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
 943
 944    if begin or end:
 945        if not datetime_column or datetime_column not in df.columns:
 946            warn(
 947                f"The datetime column '{datetime_column}' is not present in the Dataframe, "
 948                + "ignoring begin and end...",
 949            )
 950            begin, end = None, None
 951
 952    if debug:
 953        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")
 954
 955    in_ex_params = get_in_ex_params(params)
 956
 957    def serialize(x: Any) -> str:
 958        if isinstance(x, (dict, list, tuple)):
 959            return json.dumps(x, sort_keys=True, separators=(',', ':'), default=str)
 960        if hasattr(x, 'isoformat'):
 961            return x.isoformat()
 962        return str(x)
 963
 964    masks = [
 965        (
 966            (df[datetime_column] >= begin)
 967            if begin is not None and datetime_column
 968            else True
 969        ) & (
 970            (df[datetime_column] < end)
 971            if end is not None and datetime_column
 972            else True
 973        )
 974    ]
 975
 976    masks.extend([
 977        (
 978            (
 979                df[col].apply(serialize).isin(
 980                    [
 981                        serialize(_in_val)
 982                        for _in_val in in_vals
 983                    ]
 984                ) if in_vals else True
 985            ) & (
 986                ~df[col].apply(serialize).isin(
 987                    [
 988                        serialize(_ex_val)
 989                        for _ex_val in ex_vals
 990                    ]
 991                ) if ex_vals else True
 992            )
 993        )
 994        for col, (in_vals, ex_vals) in in_ex_params.items()
 995        if col in df.columns
 996    ])
 997    query_mask = masks[0]
 998    for mask in masks[1:]:
 999        query_mask = query_mask & mask
1000
1001    if inplace:
1002        df.where(query_mask, inplace=inplace)
1003        df.dropna(how='all', inplace=inplace)
1004        result_df = df
1005    else:
1006        result_df = df.where(query_mask).dropna(how='all')
1007
1008    if reset_index:
1009        result_df.reset_index(drop=True, inplace=True)
1010
1011    result_df = enforce_dtypes(
1012        result_df,
1013        dtypes,
1014        safe_copy = (not inplace),
1015        debug = debug,
1016        coerce_numeric = False,
1017    )
1018
1019    if select_columns == ['*']:
1020        select_columns = None
1021
1022    if not select_columns and not omit_columns:
1023        return result_df
1024
1025    if select_columns:
1026        for col in list(result_df.columns):
1027            if col not in select_columns:
1028                del result_df[col]
1029        return result_df
1030
1031    if omit_columns:
1032        for col in list(result_df.columns):
1033            if col in omit_columns:
1034                del result_df[col]
1035    if debug:
1036        dprint(f"{dtypes=}")
1037    return result_df
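
The querying helpers above (get_datetime_bound_from_df and query_df) ship without examples; the following is a brief, hedged sketch of how they compose (hypothetical data; outputs shown as expected under the default pandas backend):

>>> import pandas as pd
>>> from datetime import datetime
>>> df = pd.DataFrame({
...     'dt': [datetime(2023, 1, 1), datetime(2023, 1, 2)],
...     'id': [1, 2],
... })
>>> get_datetime_bound_from_df(df, 'dt')
Timestamp('2023-01-01 00:00:00')
>>> query_df(df, {'id': 1}, datetime_column='dt')['id'].tolist()
[1]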
def add_missing_cols_to_df(df: "'pd.DataFrame'", dtypes: Dict[str, Any]) -> 'pd.DataFrame':

Add columns from the dtypes dictionary as null columns to a new DataFrame.

Parameters
  • df (pd.DataFrame): The dataframe to copy and to which null columns will be added.
  • dtypes (Dict[str, Any]): The data types dictionary which may contain keys not present in df.columns.
Returns
  • A new DataFrame with the keys from dtypes added as null columns.
  • If df.dtypes is the same as dtypes, then a reference to df is returned.
  • NOTE: This will not ensure that dtypes are enforced!
Examples
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1}])
>>> dtypes = {'b': 'Int64'}
>>> add_missing_cols_to_df(df, dtypes)
      a  b
   0  1  <NA>
>>> add_missing_cols_to_df(df, dtypes).dtypes
a    int64
b    Int64
dtype: object
>>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
a    int64
dtype: object
>>>
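A small supplementary sketch (hypothetical values, not from the original docstring): multiple missing keys are added in a single pass, in the order they appear in dtypes.
>>> add_missing_cols_to_df(pd.DataFrame([{'a': 1}]), {'b': 'Int64', 'c': 'object'}).columns.tolist()
['a', 'b', 'c']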
def filter_unseen_df(old_df: 'pd.DataFrame', new_df: 'pd.DataFrame', safe_copy: bool = True, dtypes: Optional[Dict[str, Any]] = None, debug: bool = False) -> 'pd.DataFrame':

Left join two DataFrames to find the newest unseen data.

Parameters
  • old_df ('pd.DataFrame'): The original (target) dataframe. Acts as a filter on the new_df.
  • new_df ('pd.DataFrame'): The fetched (source) dataframe. Rows that are contained in old_df are removed.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames.
  • dtypes (Optional[Dict[str, Any]], default None): Optionally specify the datatypes of the dataframe.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A pandas dataframe of the new, unseen rows in new_df.
Examples
>>> import pandas as pd
>>> df1 = pd.DataFrame({'a': [1,2]})
>>> df2 = pd.DataFrame({'a': [2,3]})
>>> filter_unseen_df(df1, df2)
   a
0  3
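A supplementary sketch (reusing the hypothetical df1 from above): filtering a frame against itself yields an empty delta, which is what makes the filter safe to re-run over already-seen data.
>>> filter_unseen_df(df1, df1).empty
True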
def parse_df_datetimes(df: 'pd.DataFrame', ignore_cols: Optional[Iterable[str]] = None, chunksize: Optional[int] = None, dtype_backend: str = 'numpy_nullable', debug: bool = False) -> 'pd.DataFrame':

Parse a pandas DataFrame for datetime columns and cast as datetimes.

Parameters
  • df (pd.DataFrame): The pandas DataFrame to parse.
  • ignore_cols (Optional[Iterable[str]], default None): If provided, do not attempt to coerce these columns as datetimes.
  • chunksize (Optional[int], default None): If the pandas implementation is 'dask', use this chunksize for the distributed dataframe.
  • dtype_backend (str, default 'numpy_nullable'): If df is not a DataFrame and new one needs to be constructed, use this as the datatypes backend. Accepted values are 'numpy_nullable' and 'pyarrow'.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A new pandas DataFrame with the determined datetime columns (usually ISO strings) cast as datetimes.
Examples
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 
>>> df.dtypes
a    object
dtype: object
>>> df = parse_df_datetimes(df)
>>> df.dtypes
a    datetime64[ns]
dtype: object
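A supplementary sketch (hypothetical frame, assuming the default pandas backend): columns listed in ignore_cols are skipped during detection and keep their original dtype.
>>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
>>> parse_df_datetimes(df, ignore_cols=['a']).dtypes
a    object
dtype: object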
def get_unhashable_cols(df: "'pd.DataFrame'") -> List[str]:

Get the columns which contain unhashable objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
  • A list of columns.
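Examples
A minimal illustrative sketch (hypothetical values; note that only the first row is inspected):
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': {'b': 1}, 'c': 1}])
>>> get_unhashable_cols(df)
['a']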
def get_json_cols(df: "'pd.DataFrame'") -> List[str]:

Get the columns which contain unhashable objects (to be serialized as JSON) from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
  • A list of columns to be encoded as JSON.
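Examples
A minimal illustrative sketch (hypothetical values; nulls are skipped via first_valid_index):
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': None, 'b': 1}, {'a': [1, 2], 'b': 2}])
>>> get_json_cols(df)
['a']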
def get_numeric_cols(df: "'pd.DataFrame'") -> List[str]:

Get the columns which contain decimal.Decimal objects from a Pandas DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame which may contain decimal objects.
Returns
  • A list of columns to treat as numerics.
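
Example (an illustrative sketch, not from the module's docstring; only Decimal values mark a column as numeric, not plain floats):
>>> import pandas as pd
>>> from decimal import Decimal
>>> df = pd.DataFrame({'a': [1.0], 'b': [Decimal('1.1')]})
>>> get_numeric_cols(df)
['b']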
def enforce_dtypes(df: 'pd.DataFrame', dtypes: Dict[str, str], safe_copy: bool = True, coerce_numeric: bool = True, debug: bool = False) -> 'pd.DataFrame':
542def enforce_dtypes(
543        df: 'pd.DataFrame',
544        dtypes: Dict[str, str],
545        safe_copy: bool = True,
546        coerce_numeric: bool = True,
547        debug: bool = False,
548    ) -> 'pd.DataFrame':
549    """
550    Enforce the `dtypes` dictionary on a DataFrame.
551
552    Parameters
553    ----------
554    df: pd.DataFrame
555        The DataFrame on which to enforce dtypes.
556
557    dtypes: Dict[str, str]
558        The data types to attempt to enforce on the DataFrame.
559
560    safe_copy: bool, default True
561        If `True`, create a copy before comparing and modifying the dataframes.
562        Setting to `False` may mutate the DataFrames.
563        See `meerschaum.utils.dataframe.filter_unseen_df`.
564
565    coerce_numeric: bool, default True
566        If `True`, convert float and int collisions to numeric.
567
568    debug: bool, default False
569        Verbosity toggle.
570
571    Returns
572    -------
573    The Pandas DataFrame with the types enforced.
574    """
575    import json
576    import traceback
577    from decimal import Decimal
578    from meerschaum.utils.debug import dprint
579    from meerschaum.utils.warnings import warn
580    from meerschaum.utils.formatting import pprint
581    from meerschaum.config.static import STATIC_CONFIG
582    from meerschaum.utils.packages import import_pandas
583    from meerschaum.utils.dtypes import (
584        are_dtypes_equal,
585        to_pandas_dtype,
586        is_dtype_numeric,
587        attempt_cast_to_numeric,
588    )
589    if safe_copy:
590        df = df.copy()
591    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
592    if len(df_dtypes) == 0:
593        if debug:
594            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
595        return df
596
597    pipe_pandas_dtypes = {
598        col: to_pandas_dtype(typ)
599        for col, typ in dtypes.items()
600    }
601    json_cols = [
602        col
603        for col, typ in dtypes.items()
604        if typ == 'json'
605    ]
606    numeric_cols = [
607        col
608        for col, typ in dtypes.items()
609        if typ == 'numeric'
610    ]
611    df_numeric_cols = get_numeric_cols(df)
612    if debug:
613        dprint(f"Desired data types:")
614        pprint(dtypes)
615        dprint(f"Data types for incoming DataFrame:")
616        pprint(df_dtypes)
617
618    if json_cols and len(df) > 0:
619        if debug:
620            dprint(f"Checking columns for JSON encoding: {json_cols}")
621        for col in json_cols:
622            if col in df.columns:
623                try:
624                    df[col] = df[col].apply(
625                        (
626                            lambda x: (
627                                json.loads(x)
628                                if isinstance(x, str)
629                                else x
630                            )
631                        )
632                    )
633                except Exception as e:
634                    if debug:
635                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")
636
637    if numeric_cols:
638        if debug:
639            dprint(f"Checking for numerics: {numeric_cols}")
640        for col in numeric_cols:
641            if col in df.columns:
642                try:
643                    df[col] = df[col].apply(attempt_cast_to_numeric)
644                except Exception as e:
645                    if debug:
646                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")
647
648    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
649        if debug:
650            dprint(f"Data types match. Exiting enforcement...")
651        return df
652
653    common_dtypes = {}
654    common_diff_dtypes = {}
655    for col, typ in pipe_pandas_dtypes.items():
656        if col in df_dtypes:
657            common_dtypes[col] = typ
658            if not are_dtypes_equal(typ, df_dtypes[col]):
659                common_diff_dtypes[col] = df_dtypes[col]
660
661    if debug:
662        dprint(f"Common columns with different dtypes:")
663        pprint(common_diff_dtypes)
664
665    detected_dt_cols = {}
666    for col, typ in common_diff_dtypes.items():
667        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
668            df_dtypes[col] = typ
669            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
670    for col in detected_dt_cols:
671        del common_diff_dtypes[col]
672
673    if debug:
674        dprint(f"Common columns with different dtypes (after dates):")
675        pprint(common_diff_dtypes)
676
677    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
678        if debug:
679            dprint(
680                "The incoming DataFrame has mostly the same types, skipping enforcement. "
681                + "The only detected difference was in the following datetime columns.\n"
682                + "    Timezone information may be stripped."
683            )
684            pprint(detected_dt_cols)
685        return df
686
687    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
688        previous_typ = common_dtypes[col]
689        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
690        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
691        explicitly_numeric = dtypes.get(col, 'numeric') == 'numeric'
692        cast_to_numeric = (
693            explicitly_numeric
694            or col in df_numeric_cols
695            or (mixed_numeric_types and not explicitly_float)
696        ) and coerce_numeric
697        if cast_to_numeric:
698            common_dtypes[col] = attempt_cast_to_numeric
699            common_diff_dtypes[col] = attempt_cast_to_numeric
700
701    for d in common_diff_dtypes:
702        t = common_dtypes[d]
703        if debug:
704            dprint(f"Casting column {d} to dtype {t}.")
705        try:
706            df[d] = (
707                df[d].apply(t)
708                if callable(t)
709                else df[d].astype(t)
710            )
711        except Exception as e:
712            if debug:
713                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
714            if 'int' in str(t).lower():
715                try:
716                    df[d] = df[d].astype('float64').astype(t)
717                except Exception as e:
718                    if debug:
719                        dprint(f"Was unable to convert to float then {t}.")
720    return df

Enforce the dtypes dictionary on a DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame on which to enforce dtypes.
  • dtypes (Dict[str, str]): The data types to attempt to enforce on the DataFrame.
  • safe_copy (bool, default True): If True, create a copy before comparing and modifying the dataframes. Setting to False may mutate the DataFrames. See filter_unseen_df.
  • coerce_numeric (bool, default True): If True, convert float and int collisions to numeric.
  • debug (bool, default False): Verbosity toggle.
Returns
  • The Pandas DataFrame with the types enforced.
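
Example (an illustrative sketch of the 'json' path, not from the module's docstring; when a column's declared dtype is 'json', string values are parsed with json.loads):
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['{"x": 1}']})
>>> df = enforce_dtypes(df, {'a': 'json'})
>>> df['a'][0]
{'x': 1}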
def get_datetime_bound_from_df(df: Union['pd.DataFrame', dict, list], datetime_column: str, minimum: bool = True) -> Union[int, 'datetime.datetime', None]:
723def get_datetime_bound_from_df(
724        df: Union['pd.DataFrame', dict, list],
725        datetime_column: str,
726        minimum: bool = True,
727    ) -> Union[int, 'datetime.datetime', None]:
728    """
729    Return the minimum or maximum datetime (or integer) from a DataFrame.
730
731    Parameters
732    ----------
733    df: pd.DataFrame
734        The DataFrame, list, or dict which contains the range axis.
735
736    datetime_column: str
737        The name of the datetime (or int) column.
738
739    minimum: bool
740        Whether to return the minimum (default) or maximum value.
741
742    Returns
743    -------
744    The minimum or maximum datetime value in the dataframe, or `None`.
745    """
746    if not datetime_column:
747        return None
748
749    def compare(a, b):
750        if a is None:
751            return b
752        if b is None:
753            return a
754        if minimum:
755            return a if a < b else b
756        return a if a > b else b
757
758    if isinstance(df, list):
759        if len(df) == 0:
760            return None
761        best_yet = df[0].get(datetime_column, None)
762        for doc in df:
763            val = doc.get(datetime_column, None)
764            best_yet = compare(best_yet, val)
765        return best_yet
766
767    if isinstance(df, dict):
768        if datetime_column not in df:
769            return None
770        best_yet = df[datetime_column][0]
771        for val in df[datetime_column]:
772            best_yet = compare(best_yet, val)
773        return best_yet
774
775    if 'DataFrame' in str(type(df)):
776        if datetime_column not in df.columns:
777            return None
778        return (
779            df[datetime_column].min(skipna=True)
780            if minimum
781            else df[datetime_column].max(skipna=True)
782        )
783
784    return None

Return the minimum or maximum datetime (or integer) from a DataFrame.

Parameters
  • df (pd.DataFrame): The DataFrame, list, or dict which contains the range axis.
  • datetime_column (str): The name of the datetime (or int) column.
  • minimum (bool): Whether to return the minimum (default) or maximum value.
Returns
  • The minimum or maximum datetime value in the dataframe, or None.
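
Example (an illustrative sketch, not from the module's docstring; lists of documents and integer axes are supported as well as DataFrames):
>>> docs = [{'dt': 10}, {'dt': 3}, {'dt': 7}]
>>> get_datetime_bound_from_df(docs, 'dt')
3
>>> get_datetime_bound_from_df(docs, 'dt', minimum=False)
10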
def df_is_chunk_generator(df: Any) -> bool:
787def df_is_chunk_generator(df: Any) -> bool:
788    """
789    Determine whether to treat `df` as a chunk generator.
790
791    Note this should only be used in a context where generators are expected,
792    as it will return `True` for any iterable.
793
794    Parameters
795    ----------
796    df: The DataFrame or chunk generator to evaluate.
797
798    Returns
799    -------
800    A `bool` indicating whether to treat `df` as a generator.
801    """
802    return (
803        not isinstance(df, (dict, list, str))
804        and 'DataFrame' not in str(type(df))
805        and isinstance(df, (Generator, Iterable, Iterator))
806    )

Determine whether to treat df as a chunk generator.

Note this should only be used in a context where generators are expected, as it will return True for any iterable.

Parameters
  • df (Any): The DataFrame or chunk generator to evaluate.
Returns
  • A bool indicating whether to treat df as a generator.
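
Example (an illustrative sketch, not from the module's docstring; note the caveat above: any non-DataFrame iterable other than dict, list, or str returns True):
>>> import pandas as pd
>>> df_is_chunk_generator(pd.DataFrame())
False
>>> df_is_chunk_generator(df for df in [pd.DataFrame()])
True
>>> df_is_chunk_generator([1, 2, 3])
False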
def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
809def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
810    """
811    Return the Dask `npartitions` value for a given `chunksize`.
812    """
813    if chunksize == -1:
814        from meerschaum.config import get_config
815        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
816    if chunksize is None:
817        return 1
818    return -1 * chunksize

Return the Dask npartitions value for a given chunksize.
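
Example (an illustrative sketch, not from the module's docstring; None maps to a single partition, -1 falls back to the configured SQL chunksize, and positive values are negated, presumably so downstream callers can tell a chunk size apart from a partition count):
>>> chunksize_to_npartitions(None)
1
>>> chunksize_to_npartitions(100000)
-100000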

def df_from_literal(pipe: Optional['meerschaum.Pipe'] = None, literal: str = None, debug: bool = False) -> 'pd.DataFrame':
821def df_from_literal(
822        pipe: Optional['meerschaum.Pipe'] = None,
823        literal: str = None,
824        debug: bool = False
825    ) -> 'pd.DataFrame':
826    """
827    Construct a dataframe from a literal value, using the pipe's datetime and value column names.
828
829    Parameters
830    ----------
831    pipe: Optional['meerschaum.Pipe'], default None
832        The pipe which will consume the literal value.
833
834    Returns
835    -------
836    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
837    and the literal as the value.
838    """
839    from meerschaum.utils.packages import import_pandas
840    from meerschaum.utils.warnings import error, warn
841    from meerschaum.utils.debug import dprint
842
843    if pipe is None or literal is None:
844        error("Please provide a Pipe and a literal value")
845    ### this will raise an error if the columns are undefined
846    dt_name, val_name = pipe.get_columns('datetime', 'value')
847
848    val = literal
849    if isinstance(literal, str):
850        if debug:
851            dprint(f"Received literal string: '{literal}'")
852        import ast
853        try:
854            val = ast.literal_eval(literal)
855        except Exception as e:
856            warn(
857                "Failed to parse value from string:\n" + f"{literal}" +
858                "\n\nWill cast as a string instead."\
859            )
860            val = literal
861
862    from datetime import datetime, timezone
863    now = datetime.now(timezone.utc).replace(tzinfo=None)
864
865    pd = import_pandas()
866    return pd.DataFrame({dt_name: [now], val_name: [val]})

Construct a dataframe from a literal value, using the pipe's datetime and value column names.

Parameters
  • pipe (Optional['meerschaum.Pipe'], default None): The pipe which will consume the literal value.
  • literal (str, default None): The literal value to parse (via ast.literal_eval) and use as the row's value.
Returns
  • A 1-row pandas DataFrame with the current UTC timestamp as the datetime column and the literal as the value.
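
Example (an illustrative sketch, not from the module's docstring; the pipe keys here are hypothetical, and only the pipe's columns need to be defined):
>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe('plugin:demo', 'temperature', columns={'datetime': 'dt', 'value': 'val'})
>>> df = df_from_literal(pipe, '42.1')
>>> list(df.columns)
['dt', 'val']
>>> float(df['val'][0])
42.1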
def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
869def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
870    """
871    Return the first valid Dask DataFrame partition (if possible).
872    """
873    pdf = None
874    for partition in ddf.partitions:
875        try:
876            pdf = partition.compute()
877        except Exception as e:
878            continue
879        if len(pdf) > 0:
880            return pdf
881    return ddf.compute()

Return the first valid Dask DataFrame partition (if possible).
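
Example (an illustrative sketch, not from the module's docstring; assumes dask is installed):
>>> import pandas as pd
>>> import dask.dataframe as dd
>>> ddf = dd.from_pandas(pd.DataFrame({'a': [1, 2]}), npartitions=2)
>>> pdf = get_first_valid_dask_partition(ddf)
>>> len(pdf) >= 1
True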

def query_df(df: 'pd.DataFrame', params: Optional[Dict[str, Any]] = None, begin: Union[datetime, int, None] = None, end: Union[datetime, int, None] = None, datetime_column: Optional[str] = None, select_columns: Optional[List[str]] = None, omit_columns: Optional[List[str]] = None, inplace: bool = False, reset_index: bool = False, debug: bool = False) -> 'pd.DataFrame':
 884def query_df(
 885        df: 'pd.DataFrame',
 886        params: Optional[Dict[str, Any]] = None,
 887        begin: Union[datetime, int, None] = None,
 888        end: Union[datetime, int, None] = None,
 889        datetime_column: Optional[str] = None,
 890        select_columns: Optional[List[str]] = None,
 891        omit_columns: Optional[List[str]] = None,
 892        inplace: bool = False,
 893        reset_index: bool = False,
 894        debug: bool = False,
 895    ) -> 'pd.DataFrame':
 896    """
 897    Query the dataframe with the params dictionary.
 898
 899    Parameters
 900    ----------
 901    df: pd.DataFrame
 902        The DataFrame to query against.
 903
 904    params: Optional[Dict[str, Any]], default None
 905        The parameters dictionary to use for the query.
 906
 907    begin: Union[datetime, int, None], default None
 908        If `begin` and `datetime_column` are provided, only return rows with a timestamp
 909        greater than or equal to this value.
 910
 911    end: Union[datetime, int, None], default None
 912        If `begin` and `datetime_column` are provided, only return rows with a timestamp
 913        less than this value.
 914
 915    datetime_column: Optional[str], default None
 916        A `datetime_column` must be provided to use `begin` and `end`.
 917
 918    select_columns: Optional[List[str]], default None
 919        If provided, only return these columns.
 920
 921    omit_columns: Optional[List[str]], default None
 922        If provided, do not include these columns in the result.
 923
 924    inplace: bool, default False
 925        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
 926
 927    reset_index: bool, default False
 928        If `True`, reset the index in the resulting DataFrame.
 929
 930    Returns
 931    -------
 932    A Pandas DataFrame query result.
 933    """
 934    if not params and not begin and not end:
 935        return df
 936
 937    import json
 938    import meerschaum as mrsm
 939    from meerschaum.utils.debug import dprint
 940    from meerschaum.utils.misc import get_in_ex_params
 941    from meerschaum.utils.warnings import warn
 942
 943    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}
 944
 945    if begin or end:
 946        if not datetime_column or datetime_column not in df.columns:
 947            warn(
 948                f"The datetime column '{datetime_column}' is not present in the Dataframe, "
 949                + "ignoring begin and end...",
 950            )
 951            begin, end = None, None
 952
 953    if debug:
 954        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")
 955
 956    in_ex_params = get_in_ex_params(params)
 957
 958    def serialize(x: Any) -> str:
 959        if isinstance(x, (dict, list, tuple)):
 960            return json.dumps(x, sort_keys=True, separators=(',', ':'), default=str)
 961        if hasattr(x, 'isoformat'):
 962            return x.isoformat()
 963        return str(x)
 964
 965    masks = [
 966        (
 967            (df[datetime_column] >= begin)
 968            if begin is not None and datetime_column
 969            else True
 970        ) & (
 971            (df[datetime_column] < end)
 972            if end is not None and datetime_column
 973            else True
 974        )
 975    ]
 976
 977    masks.extend([
 978        (
 979            (
 980                df[col].apply(serialize).isin(
 981                    [
 982                        serialize(_in_val)
 983                        for _in_val in in_vals
 984                    ]
 985                ) if in_vals else True
 986            ) & (
 987                ~df[col].apply(serialize).isin(
 988                    [
 989                        serialize(_ex_val)
 990                        for _ex_val in ex_vals
 991                    ]
 992                ) if ex_vals else True
 993            )
 994        )
 995        for col, (in_vals, ex_vals) in in_ex_params.items()
 996        if col in df.columns
 997    ])
 998    query_mask = masks[0]
 999    for mask in masks:
1000        query_mask = query_mask & mask
1001
1002    if inplace:
1003        df.where(query_mask, inplace=inplace)
1004        df.dropna(how='all', inplace=inplace)
1005        result_df = df
1006    else:
1007        result_df = df.where(query_mask).dropna(how='all')
1008
1009    if reset_index:
1010        result_df.reset_index(drop=True, inplace=True)
1011
1012    result_df = enforce_dtypes(
1013        result_df,
1014        dtypes,
1015        safe_copy = (not inplace),
1016        debug = debug,
1017        coerce_numeric = False,
1018    )
1019
1020    if select_columns == ['*']:
1021        select_columns = None
1022
1023    if not select_columns and not omit_columns:
1024        return result_df
1025
1026    if select_columns:
1027        for col in list(result_df.columns):
1028            if col not in select_columns:
1029                del result_df[col]
1030        return result_df
1031
1032    if omit_columns:
1033        for col in list(result_df.columns):
1034            if col in omit_columns:
1035                del result_df[col]
1036    if debug:
1037        dprint(f"{dtypes=}")
1038    return result_df

Query the dataframe with the params dictionary.

Parameters
  • df (pd.DataFrame): The DataFrame to query against.
  • params (Optional[Dict[str, Any]], default None): The parameters dictionary to use for the query.
  • begin (Union[datetime, int, None], default None): If begin and datetime_column are provided, only return rows with a timestamp greater than or equal to this value.
  • end (Union[datetime, int, None], default None): If begin and datetime_column are provided, only return rows with a timestamp less than this value.
  • datetime_column (Optional[str], default None): A datetime_column must be provided to use begin and end.
  • select_columns (Optional[List[str]], default None): If provided, only return these columns.
  • omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
  • inplace (bool, default False): If True, modify the DataFrame inplace rather than creating a new DataFrame.
  • reset_index (bool, default False): If True, reset the index in the resulting DataFrame.
  • debug (bool, default False): Verbosity toggle.
Returns
  • A Pandas DataFrame query result.
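
Example (an illustrative sketch, not from the module's docstring; a list of values under a column name acts as an 'in' filter, assuming get_in_ex_params treats plain lists as inclusion values):
>>> import pandas as pd
>>> df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
>>> query_df(df, {'a': [1, 2]}, reset_index=True)
   a  b
0  1  x
1  2  y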