meerschaum.utils.dataframe
Utility functions for working with DataFrames.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Utility functions for working with DataFrames.
"""

from __future__ import annotations
from datetime import datetime

import meerschaum as mrsm
from meerschaum.utils.typing import (
    Optional, Dict, Any, List, Hashable, Generator,
    Iterator, Iterable, Union, TYPE_CHECKING,
)

if TYPE_CHECKING:
    ### Imported only for type annotations; keeps pandas/dask off the runtime import path.
    pd, dask = mrsm.attempt_import('pandas', 'dask')


def add_missing_cols_to_df(
    df: 'pd.DataFrame',
    dtypes: Dict[str, Any],
) -> 'pd.DataFrame':
    """
    Add columns from the dtypes dictionary as null columns to a new DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The dataframe we should copy and add null columns.

    dtypes:
        The data types dictionary which may contain keys not present in `df.columns`.

    Returns
    -------
    A new `DataFrame` with the keys from `dtypes` added as null columns.
    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
    NOTE: This will not ensure that dtypes are enforced!

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': 1}])
    >>> dtypes = {'b': 'Int64'}
    >>> add_missing_cols_to_df(df, dtypes)
       a     b
    0  1  <NA>
    >>> add_missing_cols_to_df(df, dtypes).dtypes
    a    int64
    b    Int64
    dtype: object
    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
    a    int64
    dtype: object
    >>>
    """
    ### Compare key sets only; matching column names short-circuit (dtypes are NOT checked).
    if set(df.columns) == set(dtypes):
        return df

    import traceback
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.dtypes import to_pandas_dtype
    pandas = attempt_import('pandas')

    def build_series(dtype: str):
        ### An empty Series carries the dtype; assigning it to a longer frame yields nulls.
        return pandas.Series([], dtype=to_pandas_dtype(dtype))

    assign_kwargs = {
        str(col): build_series(str(typ))
        for col, typ in dtypes.items()
        if col not in df.columns
    }
    ### `assign` returns a new DataFrame, leaving `df` untouched.
    return df.assign(**assign_kwargs)


def filter_unseen_df(
    old_df: 'pd.DataFrame',
    new_df: 'pd.DataFrame',
    safe_copy: bool = True,
    dtypes: Optional[Dict[str, Any]] = None,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Left join two DataFrames to find the newest unseen data.

    Parameters
    ----------
    old_df: 'pd.DataFrame'
        The original (target) dataframe. Acts as a filter on the `new_df`.

    new_df: 'pd.DataFrame'
        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.

    dtypes: Optional[Dict[str, Any]], default None
        Optionally specify the datatypes of the dataframe.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas dataframe of the new, unseen rows in `new_df`.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df1 = pd.DataFrame({'a': [1,2]})
    >>> df2 = pd.DataFrame({'a': [2,3]})
    >>> filter_unseen_df(df1, df2)
       a
    0  3

    ```

    """
    if old_df is None:
        return new_df

    if safe_copy:
        old_df = old_df.copy()
        new_df = new_df.copy()

    import json
    import functools
    import traceback
    from decimal import Decimal
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.dtypes import to_pandas_dtype, are_dtypes_equal, attempt_cast_to_numeric
    from meerschaum.utils.debug import dprint
    pd = import_pandas(debug=debug)
    ### Detect dask frames by module name so both plain-pandas and dask inputs work.
    is_dask = 'dask' in new_df.__module__
    if is_dask:
        pandas = attempt_import('pandas')
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA
    else:
        merge = pd.merge
        NA = pd.NA

    new_df_dtypes = dict(new_df.dtypes)
    old_df_dtypes = dict(old_df.dtypes)

    ### Align the two frames' column sets by padding each with the other's missing columns.
    same_cols = set(new_df.columns) == set(old_df.columns)
    if not same_cols:
        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)

    ### Edge case: two empty lists cast to DFs.
    elif len(new_df.columns) == 0:
        return new_df

    try:
        ### Order matters when checking equality.
        new_df = new_df[old_df.columns]
    except Exception as e:
        warn(
            "Was not able to cast old columns onto new DataFrame. " +
            f"Are both DataFrames the same shape? Error:\n{e}",
            stacklevel = 3,
        )
        return new_df[list(new_df_dtypes.keys())]

    ### assume the old_df knows what it's doing, even if it's technically wrong.
    if dtypes is None:
        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}

    ### Only enforce dtypes for columns present in both frames; fill the rest
    ### from the new frame's observed dtypes.
    dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
        if col in new_df_dtypes and col in old_df_dtypes
    }
    for col, typ in new_df_dtypes.items():
        if col not in dtypes:
            dtypes[col] = typ

    cast_cols = True
    try:
        new_df = new_df.astype(dtypes)
        cast_cols = False
    except Exception as e:
        warn(
            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
        )

    new_numeric_cols_existing = get_numeric_cols(new_df)
    old_numeric_cols = get_numeric_cols(old_df)
    ### Iterate over a shallow copy because `dtypes` is mutated in the loop body.
    for col, typ in {k: v for k, v in dtypes.items()}.items():
        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
            new_is_numeric = col in new_numeric_cols_existing
            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
            old_is_numeric = col in old_numeric_cols

            ### int/float/Decimal collisions are reconciled by coercing both to numeric.
            if (
                (new_is_float or new_is_int or new_is_numeric)
                and
                (old_is_float or old_is_int or old_is_numeric)
            ):
                dtypes[col] = attempt_cast_to_numeric
                cast_cols = True
                continue

            ### Fallback to object if the types don't match.
            warn(
                f"Detected different types for '{col}' "
                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
                + "falling back to 'object'..."
            )
            dtypes[col] = 'object'
            cast_cols = True

    if cast_cols:
        for col, dtype in dtypes.items():
            if col in new_df.columns:
                try:
                    ### A callable "dtype" here is a converter (e.g. attempt_cast_to_numeric).
                    new_df[col] = (
                        new_df[col].astype(dtype)
                        if not callable(dtype)
                        else new_df[col].apply(dtype)
                    )
                except Exception as e:
                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")

    ### JSON and Decimal cells are unhashable, so serialize them to strings
    ### before the merge and deserialize the surviving delta rows afterwards.
    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
    new_json_cols = get_json_cols(new_df)
    old_json_cols = get_json_cols(old_df)
    json_cols = set(new_json_cols + old_json_cols)
    for json_col in old_json_cols:
        old_df[json_col] = old_df[json_col].apply(serializer)
    for json_col in new_json_cols:
        new_df[json_col] = new_df[json_col].apply(serializer)

    new_numeric_cols = get_numeric_cols(new_df)
    numeric_cols = set(new_numeric_cols + old_numeric_cols)
    for numeric_col in old_numeric_cols:
        old_df[numeric_col] = old_df[numeric_col].apply(
            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
        )
    for numeric_col in new_numeric_cols:
        new_df[numeric_col] = new_df[numeric_col].apply(
            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
        )

    ### Left merge with indicator: rows found only in `new_df` are the unseen delta.
    joined_df = merge(
        new_df.fillna(NA),
        old_df.fillna(NA),
        how = 'left',
        on = None,
        indicator = True,
    )
    changed_rows_mask = (joined_df['_merge'] == 'left_only')
    delta_df = joined_df[list(new_df_dtypes.keys())][changed_rows_mask].reset_index(drop=True)

    for json_col in json_cols:
        if json_col not in delta_df.columns:
            continue
        try:
            delta_df[json_col] = delta_df[json_col].apply(json.loads)
        except Exception as e:
            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")

    for numeric_col in numeric_cols:
        if numeric_col not in delta_df.columns:
            continue
        try:
            delta_df[numeric_col] = delta_df[numeric_col].apply(attempt_cast_to_numeric)
        except Exception as e:
            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")

    return delta_df


def parse_df_datetimes(
    df: 'pd.DataFrame',
    ignore_cols: Optional[Iterable[str]] = None,
    chunksize: Optional[int] = None,
    dtype_backend: str = 'numpy_nullable',
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Parse a pandas DataFrame for datetime columns and cast as datetimes.

    Parameters
    ----------
    df: pd.DataFrame
        The pandas DataFrame to parse.

    ignore_cols: Optional[Iterable[str]], default None
        If provided, do not attempt to coerce these columns as datetimes.

    chunksize: Optional[int], default None
        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.

    dtype_backend: str, default 'numpy_nullable'
        If `df` is not a DataFrame and new one needs to be constructed,
        use this as the datatypes backend.
        Accepted values are 'numpy_nullable' and 'pyarrow'.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A new pandas DataFrame with the determined datetime columns
    (usually ISO strings) cast as datetimes.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
    >>> df.dtypes
    a    object
    dtype: object
    >>> df = parse_df_datetimes(df)
    >>> df.dtypes
    a    datetime64[ns]
    dtype: object

    ```

    """
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    import traceback
    pd = import_pandas()
    pandas = attempt_import('pandas')
    pd_name = pd.__name__
    using_dask = 'dask' in pd_name
    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
    dask_dataframe = None
    if using_dask or df_is_dask:
        npartitions = chunksize_to_npartitions(chunksize)
        dask_dataframe = attempt_import('dask.dataframe')

    ### if df is a dict, build DataFrame
    ### `pdf` is the plain-pandas frame used for inspection; `df` stays the returned frame.
    if isinstance(df, pandas.DataFrame):
        pdf = df
    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
        pdf = get_first_valid_dask_partition(df)
    else:
        if debug:
            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")

        if using_dask:
            if isinstance(df, list):
                ### Collect the union of keys so ragged documents share one schema.
                keys = set()
                for doc in df:
                    for key in doc:
                        keys.add(key)
                df = pd.DataFrame.from_dict(
                    {
                        k: [
                            doc.get(k, None)
                            for doc in df
                        ] for k in keys
                    },
                    npartitions = npartitions,
                )
            elif isinstance(df, dict):
                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
            elif 'pandas.core.frame.DataFrame' in str(type(df)):
                df = pd.from_pandas(df, npartitions=npartitions)
            else:
                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
            pandas = attempt_import('pandas')
            pdf = get_first_valid_dask_partition(df)

        else:
            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
            pdf = df

    ### skip parsing if DataFrame is empty
    if len(pdf) == 0:
        if debug:
            dprint(f"df is empty. Returning original DataFrame without casting datetime columns...")
        return df

    ### Columns that are already datetimes are always skipped.
    ignore_cols = set(
        (ignore_cols or []) + [
            col
            for col, dtype in pdf.dtypes.items()
            if 'datetime' in str(dtype)
        ]
    )
    cols_to_inspect = [col for col in pdf.columns if col not in ignore_cols]

    if len(cols_to_inspect) == 0:
        if debug:
            dprint(f"All columns are ignored, skipping datetime detection...")
        return df

    ### apply regex to columns to determine which are ISO datetimes
    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
    dt_mask = pdf[cols_to_inspect].astype(str).apply(
        lambda s: s.str.match(iso_dt_regex).all()
    )

    ### list of datetime column names
    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
    if not datetime_cols:
        if debug:
            dprint("No columns detected as datetimes, returning...")
        return df

    if debug:
        dprint("Converting columns to datetimes: " + str(datetime_cols))

    try:
        if not using_dask:
            df[datetime_cols] = df[datetime_cols].apply(pd.to_datetime, utc=True)
        else:
            df[datetime_cols] = df[datetime_cols].apply(
                pd.to_datetime,
                utc = True,
                axis = 1,
                meta = {
                    col: 'datetime64[ns]'
                    for col in datetime_cols
                }
            )
    except Exception as e:
        ### NOTE(review): `items_str` is not imported in this module's visible imports —
        ### if this branch fires it would raise a NameError; confirm against the full file.
        warn(
            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
            + f"{traceback.format_exc()}"
        )

    ### Strip timezone info so the columns are naive datetimes.
    for dt in datetime_cols:
        try:
            df[dt] = df[dt].dt.tz_localize(None)
        except Exception as e:
            warn(f"Unable to convert column '{dt}' to naive datetime:\n{traceback.format_exc()}")

    return df


def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns.
    """
    if len(df) == 0:
        return []

    is_dask = 'dask' in df.__module__
    if is_dask:
        from meerschaum.utils.packages import attempt_import
        pandas = attempt_import('pandas')
        df = pandas.DataFrame(get_first_valid_dask_partition(df))
    ### Only the first row is inspected; assumes columns are homogeneously typed.
    return [
        col for col, val in df.iloc[0].items()
        if not isinstance(val, Hashable)
    ]


def get_json_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns of a Pandas DataFrame which contain unhashable objects
    (e.g. dicts or lists) and should therefore be encoded as JSON.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns to be encoded as JSON.
    """
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    ### Inspect the first non-null value per column (nulls alone don't mark a JSON column).
    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            not isinstance(df.loc[ix][col], Hashable)
        )
    ]


def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain decimal objects.

    Returns
    -------
    A list of columns to treat as numerics.
    """
    from decimal import Decimal
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    ### Inspect the first non-null value per column to detect Decimal cells.
    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance(df.loc[ix][col], Decimal)
        )
    ]


def enforce_dtypes(
    df: 'pd.DataFrame',
    dtypes: Dict[str, str],
    safe_copy: bool = True,
    coerce_numeric: bool = True,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Enforce the `dtypes` dictionary on a DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame on which to enforce dtypes.

    dtypes: Dict[str, str]
        The data types to attempt to enforce on the DataFrame.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    coerce_numeric: bool, default True
        If `True`, convert float and int collisions to numeric.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The Pandas DataFrame with the types enforced.
    """
    import json
    import traceback
    from decimal import Decimal
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.formatting import pprint
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.dtypes import (
        are_dtypes_equal,
        to_pandas_dtype,
        is_dtype_numeric,
        attempt_cast_to_numeric,
    )
    if safe_copy:
        df = df.copy()
    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
    if len(df_dtypes) == 0:
        if debug:
            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
        return df

    ### Translate Meerschaum type names ('json', 'numeric', ...) to pandas dtypes.
    pipe_pandas_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
    }
    json_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'json'
    ]
    numeric_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'numeric'
    ]
    df_numeric_cols = get_numeric_cols(df)
    if debug:
        dprint(f"Desired data types:")
        pprint(dtypes)
        dprint(f"Data types for incoming DataFrame:")
        pprint(df_dtypes)

    if json_cols and len(df) > 0:
        if debug:
            dprint(f"Checking columns for JSON encoding: {json_cols}")
        for col in json_cols:
            if col in df.columns:
                try:
                    ### Decode JSON strings; pass already-decoded objects through unchanged.
                    df[col] = df[col].apply(
                        (
                            lambda x: (
                                json.loads(x)
                                if isinstance(x, str)
                                else x
                            )
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")

    if numeric_cols:
        if debug:
            dprint(f"Checking for numerics: {numeric_cols}")
        for col in numeric_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_numeric)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")

    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint(f"Data types match. Exiting enforcement...")
        return df

    ### Restrict attention to columns present in both the frame and the desired dtypes.
    common_dtypes = {}
    common_diff_dtypes = {}
    for col, typ in pipe_pandas_dtypes.items():
        if col in df_dtypes:
            common_dtypes[col] = typ
            if not are_dtypes_equal(typ, df_dtypes[col]):
                common_diff_dtypes[col] = df_dtypes[col]

    if debug:
        dprint(f"Common columns with different dtypes:")
        pprint(common_diff_dtypes)

    ### Datetime-vs-datetime differences (e.g. tz-aware vs naive) are tolerated, not cast.
    detected_dt_cols = {}
    for col, typ in common_diff_dtypes.items():
        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
            df_dtypes[col] = typ
            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
    for col in detected_dt_cols:
        del common_diff_dtypes[col]

    if debug:
        dprint(f"Common columns with different dtypes (after dates):")
        pprint(common_diff_dtypes)

    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint(
                "The incoming DataFrame has mostly the same types, skipping enforcement."
                + f"The only detected difference was in the following datetime columns.\n"
                + " Timezone information may be stripped."
            )
            pprint(detected_dt_cols)
        return df

    ### Iterate over a shallow copy because the dicts are mutated in the loop body.
    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
        previous_typ = common_dtypes[col]
        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
        explicitly_numeric = dtypes.get(col, 'numeric') == 'numeric'
        cast_to_numeric = (
            explicitly_numeric
            or col in df_numeric_cols
            or (mixed_numeric_types and not explicitly_float)
        ) and coerce_numeric
        if cast_to_numeric:
            common_dtypes[col] = attempt_cast_to_numeric
            common_diff_dtypes[col] = attempt_cast_to_numeric

    for d in common_diff_dtypes:
        t = common_dtypes[d]
        if debug:
            dprint(f"Casting column {d} to dtype {t}.")
        try:
            ### A callable "dtype" is a converter function rather than a pandas dtype.
            df[d] = (
                df[d].apply(t)
                if callable(t)
                else df[d].astype(t)
            )
        except Exception as e:
            if debug:
                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
            if 'int' in str(t).lower():
                ### Int casts of float-like strings fail directly; go through float64 first.
                try:
                    df[d] = df[d].astype('float64').astype(t)
                except Exception as e:
                    if debug:
                        dprint(f"Was unable to convert to float then {t}.")
    return df


def get_datetime_bound_from_df(
    df: Union['pd.DataFrame', dict, list],
    datetime_column: str,
    minimum: bool = True,
) -> Union[int, datetime, None]:
    """
    Return the minimum or maximum datetime (or integer) from a DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame, list, or dict which contains the range axis.

    datetime_column: str
        The name of the datetime (or int) column.

    minimum: bool
        Whether to return the minimum (default) or maximum value.

    Returns
    -------
    The minimum or maximum datetime value in the dataframe, or `None`.
    """
    if not datetime_column:
        return None

    def compare(a, b):
        ### `None` loses to any concrete value in either direction.
        if a is None:
            return b
        if b is None:
            return a
        if minimum:
            return a if a < b else b
        return a if a > b else b

    if isinstance(df, list):
        if len(df) == 0:
            return None
        best_yet = df[0].get(datetime_column, None)
        for doc in df:
            val = doc.get(datetime_column, None)
            best_yet = compare(best_yet, val)
        return best_yet

    if isinstance(df, dict):
        if datetime_column not in df:
            return None
        best_yet = df[datetime_column][0]
        for val in df[datetime_column]:
            best_yet = compare(best_yet, val)
        return best_yet

    ### String-based check covers both pandas and dask DataFrames.
    if 'DataFrame' in str(type(df)):
        if datetime_column not in df.columns:
            return None
        return (
            df[datetime_column].min(skipna=True)
            if minimum
            else df[datetime_column].max(skipna=True)
        )

    return None


def df_is_chunk_generator(df: Any) -> bool:
    """
    Determine whether to treat `df` as a chunk generator.

    Note this should only be used in a context where generators are expected,
    as it will return `True` for any iterable.

    Parameters
    ----------
    The DataFrame or chunk generator to evaluate.

    Returns
    -------
    A `bool` indicating whether to treat `df` as a generator.
    """
    return (
        not isinstance(df, (dict, list, str))
        and 'DataFrame' not in str(type(df))
        and isinstance(df, (Generator, Iterable, Iterator))
    )


def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
    """
    Return the Dask `npartitions` value for a given `chunksize`.
    """
    ### A chunksize of -1 means "use the configured SQL chunksize".
    if chunksize == -1:
        from meerschaum.config import get_config
        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
    if chunksize is None:
        return 1
    ### NOTE(review): negative npartitions appears to be a Meerschaum-internal
    ### convention (row-count-based partitioning) — confirm against callers.
    return -1 * chunksize


def df_from_literal(
    pipe: Optional[mrsm.Pipe] = None,
    literal: str = None,
    debug: bool = False
) -> 'pd.DataFrame':
    """
    Construct a dataframe from a literal value, using the pipe's datetime and value column names.

    Parameters
    ----------
    pipe: Optional['meerschaum.Pipe'], default None
        The pipe which will consume the literal value.

    literal: str, default None
        The literal value; strings are parsed with `ast.literal_eval` when possible.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A 1-row pandas DataFrame from with the current UTC timestamp as the datetime columns
    and the literal as the value.
    """
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint

    if pipe is None or literal is None:
        error("Please provide a Pipe and a literal value")
    ### this will raise an error if the columns are undefined
    dt_name, val_name = pipe.get_columns('datetime', 'value')

    val = literal
    if isinstance(literal, str):
        if debug:
            dprint(f"Received literal string: '{literal}'")
        import ast
        try:
            val = ast.literal_eval(literal)
        except Exception as e:
            warn(
                "Failed to parse value from string:\n" + f"{literal}" +
                "\n\nWill cast as a string instead."
            )
            val = literal

    ### Use a naive UTC timestamp for the datetime column.
    from datetime import datetime, timezone
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    pd = import_pandas()
    return pd.DataFrame({dt_name: [now], val_name: [val]})


def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
    """
    Return the first valid Dask DataFrame partition (if possible).
    """
    pdf = None
    for partition in ddf.partitions:
        try:
            pdf = partition.compute()
        except Exception as e:
            continue
        if len(pdf) > 0:
            return pdf
    ### No non-empty partition found: fall back to computing the whole frame.
    return ddf.compute()


def query_df(
    df: 'pd.DataFrame',
    params: Optional[Dict[str, Any]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    datetime_column: Optional[str] = None,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    inplace: bool = False,
    reset_index: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Query the dataframe with the params dictionary.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to query against.

    params: Optional[Dict[str, Any]], default None
        The parameters dictionary to use for the query.

    begin: Union[datetime, int, None], default None
        If `begin` and `datetime_column` are provided, only return rows with a timestamp
        greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If `begin` and `datetime_column` are provided, only return rows with a timestamp
        less than this value.

    datetime_column: Optional[str], default None
        A `datetime_column` must be provided to use `begin` and `end`.

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, do not include these columns in the result.

    inplace: bool, default False
        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.

    reset_index: bool, default False
        If `True`, reset the index in the resulting DataFrame.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A Pandas DataFrame query result.
    """
    if not params and not begin and not end:
        return df

    import json
    import meerschaum as mrsm
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import get_in_ex_params
    from meerschaum.utils.warnings import warn

    ### Capture dtypes up front so they can be re-enforced after `where`/`dropna`.
    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}

    if begin or end:
        if not datetime_column or datetime_column not in df.columns:
            warn(
                f"The datetime column '{datetime_column}' is not present in the Dataframe, "
                + "ignoring begin and end...",
            )
            begin, end = None, None

    if debug:
        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")

    in_ex_params = get_in_ex_params(params)

    def serialize(x: Any) -> str:
        ### Serialize to comparable strings so unhashable/temporal values can use `isin`.
        if isinstance(x, (dict, list, tuple)):
            return json.dumps(x, sort_keys=True, separators=(',', ':'), default=str)
        if hasattr(x, 'isoformat'):
            return x.isoformat()
        return str(x)

    masks = [
        (
            (df[datetime_column] >= begin)
            if begin is not None and datetime_column
            else True
        ) & (
            (df[datetime_column] < end)
            if end is not None and datetime_column
            else True
        )
    ]

    masks.extend([
        (
            (
                df[col].apply(serialize).isin(
                    [
                        serialize(_in_val)
                        for _in_val in in_vals
                    ]
                ) if in_vals else True
            ) & (
                ~df[col].apply(serialize).isin(
                    [
                        serialize(_ex_val)
                        for _ex_val in ex_vals
                    ]
                ) if ex_vals else True
            )
        )
        for col, (in_vals, ex_vals) in in_ex_params.items()
        if col in df.columns
    ])
    query_mask = masks[0]
    for mask in masks:
        query_mask = query_mask & mask

    if inplace:
        df.where(query_mask, inplace=inplace)
        df.dropna(how='all', inplace=inplace)
        result_df = df
    else:
        result_df = df.where(query_mask).dropna(how='all')

    if reset_index:
        result_df.reset_index(drop=True, inplace=True)

    ### `where` upcasts columns to accommodate NaN; restore the original dtypes.
    result_df = enforce_dtypes(
        result_df,
        dtypes,
        safe_copy = (not inplace),
        debug = debug,
        coerce_numeric = False,
    )

    if select_columns == ['*']:
        select_columns = None

    if not select_columns and not omit_columns:
        return result_df

    if select_columns:
        for col in list(result_df.columns):
            if col not in select_columns:
                del result_df[col]
        return result_df

    if omit_columns:
        for col in list(result_df.columns):
            if col in omit_columns:
                del result_df[col]
    if debug:
        dprint(f"{dtypes=}")
    return result_df
def add_missing_cols_to_df(
    df: 'pd.DataFrame',
    dtypes: Dict[str, Any],
) -> 'pd.DataFrame':
    """
    Add columns from the dtypes dictionary as null columns to a new DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The dataframe we should copy and add null columns.

    dtypes:
        The data types dictionary which may contain keys not present in `df.columns`.

    Returns
    -------
    A new `DataFrame` with the keys from `dtypes` added as null columns.
    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
    NOTE: This will not ensure that dtypes are enforced!

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': 1}])
    >>> dtypes = {'b': 'Int64'}
    >>> add_missing_cols_to_df(df, dtypes)
       a     b
    0  1  <NA>
    >>> add_missing_cols_to_df(df, dtypes).dtypes
    a    int64
    b    Int64
    dtype: object
    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
    a    int64
    dtype: object
    >>>
    """
    ### Nothing to add when the column-name sets already agree.
    if set(df.columns) == set(dtypes):
        return df

    import traceback
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.dtypes import to_pandas_dtype
    pandas = attempt_import('pandas')

    def empty_series_of(declared_type: str):
        ### A zero-length Series carries the dtype; `assign` broadcasts it to nulls.
        return pandas.Series([], dtype=to_pandas_dtype(declared_type))

    new_columns = {}
    for column_name, declared_type in dtypes.items():
        if column_name in df.columns:
            continue
        new_columns[str(column_name)] = empty_series_of(str(declared_type))

    ### `assign` produces a new DataFrame; `df` itself is never mutated.
    return df.assign(**new_columns)
Add columns from the dtypes dictionary as null columns to a new DataFrame.

Parameters
----------
- df (pd.DataFrame): The dataframe we should copy and add null columns.
- dtypes: The data types dictionary which may contain keys not present in `df.columns`.

Returns
-------
A new `DataFrame` with the keys from `dtypes` added as null columns.
If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
NOTE: This will not ensure that dtypes are enforced!
Examples
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1}])
>>> dtypes = {'b': 'Int64'}
>>> add_missing_cols_to_df(df, dtypes)
a b
0 1 <NA>
>>> add_missing_cols_to_df(df, dtypes).dtypes
a int64
b Int64
dtype: object
>>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
a int64
dtype: object
>>>
def filter_unseen_df(
    old_df: 'pd.DataFrame',
    new_df: 'pd.DataFrame',
    safe_copy: bool = True,
    dtypes: Optional[Dict[str, Any]] = None,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Left join two DataFrames to find the newest unseen data.

    Parameters
    ----------
    old_df: 'pd.DataFrame'
        The original (target) dataframe. Acts as a filter on the `new_df`.

    new_df: 'pd.DataFrame'
        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.

    dtypes: Optional[Dict[str, Any]], default None
        Optionally specify the datatypes of the dataframe.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas dataframe of the new, unseen rows in `new_df`.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df1 = pd.DataFrame({'a': [1,2]})
    >>> df2 = pd.DataFrame({'a': [2,3]})
    >>> filter_unseen_df(df1, df2)
       a
    0  3

    ```

    """
    ### Nothing to filter against: everything in new_df is unseen.
    if old_df is None:
        return new_df

    ### Both frames are mutated below (JSON/numeric serialization), so copy up front.
    if safe_copy:
        old_df = old_df.copy()
        new_df = new_df.copy()

    import json
    import functools
    import traceback
    from decimal import Decimal
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.dtypes import to_pandas_dtype, are_dtypes_equal, attempt_cast_to_numeric
    from meerschaum.utils.debug import dprint
    pd = import_pandas(debug=debug)
    ### Dask frames need dask's own `merge` and the real pandas NA sentinel.
    is_dask = 'dask' in new_df.__module__
    if is_dask:
        pandas = attempt_import('pandas')
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA
    else:
        merge = pd.merge
        NA = pd.NA

    new_df_dtypes = dict(new_df.dtypes)
    old_df_dtypes = dict(old_df.dtypes)

    ### Align both frames to a common column set so the merge compares whole rows.
    same_cols = set(new_df.columns) == set(old_df.columns)
    if not same_cols:
        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)

    ### Edge case: two empty lists cast to DFs.
    elif len(new_df.columns) == 0:
        return new_df

    try:
        ### Order matters when checking equality.
        new_df = new_df[old_df.columns]
    except Exception as e:
        warn(
            "Was not able to cast old columns onto new DataFrame. " +
            f"Are both DataFrames the same shape? Error:\n{e}",
            stacklevel = 3,
        )
        return new_df[list(new_df_dtypes.keys())]

    ### assume the old_df knows what it's doing, even if it's technically wrong.
    if dtypes is None:
        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}

    ### Only keep dtypes for columns present in both frames, translated to pandas dtypes.
    dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
        if col in new_df_dtypes and col in old_df_dtypes
    }
    for col, typ in new_df_dtypes.items():
        if col not in dtypes:
            dtypes[col] = typ

    ### Try a bulk cast first; fall back to per-column casting below if it fails.
    cast_cols = True
    try:
        new_df = new_df.astype(dtypes)
        cast_cols = False
    except Exception as e:
        warn(
            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
        )

    new_numeric_cols_existing = get_numeric_cols(new_df)
    old_numeric_cols = get_numeric_cols(old_df)
    ### Iterate over a snapshot because `dtypes` is mutated inside the loop.
    for col, typ in {k: v for k, v in dtypes.items()}.items():
        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
            new_is_numeric = col in new_numeric_cols_existing
            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
            old_is_numeric = col in old_numeric_cols

            ### Numeric-vs-numeric mismatches resolve to Decimal via attempt_cast_to_numeric.
            if (
                (new_is_float or new_is_int or new_is_numeric)
                and
                (old_is_float or old_is_int or old_is_numeric)
            ):
                dtypes[col] = attempt_cast_to_numeric
                cast_cols = True
                continue

            ### Fallback to object if the types don't match.
            warn(
                f"Detected different types for '{col}' "
                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
                + "falling back to 'object'..."
            )
            dtypes[col] = 'object'
            cast_cols = True

    if cast_cols:
        for col, dtype in dtypes.items():
            if col in new_df.columns:
                try:
                    ### A callable "dtype" here is attempt_cast_to_numeric (applied per value).
                    new_df[col] = (
                        new_df[col].astype(dtype)
                        if not callable(dtype)
                        else new_df[col].apply(dtype)
                    )
                except Exception as e:
                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")

    ### JSON columns hold unhashable objects, so serialize to canonical strings before merging.
    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
    new_json_cols = get_json_cols(new_df)
    old_json_cols = get_json_cols(old_df)
    json_cols = set(new_json_cols + old_json_cols)
    for json_col in old_json_cols:
        old_df[json_col] = old_df[json_col].apply(serializer)
    for json_col in new_json_cols:
        new_df[json_col] = new_df[json_col].apply(serializer)

    ### Decimal values don't compare reliably across frames; stringify for the merge.
    new_numeric_cols = get_numeric_cols(new_df)
    numeric_cols = set(new_numeric_cols + old_numeric_cols)
    for numeric_col in old_numeric_cols:
        old_df[numeric_col] = old_df[numeric_col].apply(
            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
        )
    for numeric_col in new_numeric_cols:
        new_df[numeric_col] = new_df[numeric_col].apply(
            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
        )

    ### Left join on all columns with an indicator; rows marked 'left_only'
    ### exist only in new_df and are therefore unseen.
    joined_df = merge(
        new_df.fillna(NA),
        old_df.fillna(NA),
        how = 'left',
        on = None,
        indicator = True,
    )
    changed_rows_mask = (joined_df['_merge'] == 'left_only')
    delta_df = joined_df[list(new_df_dtypes.keys())][changed_rows_mask].reset_index(drop=True)

    ### Undo the serialization above so the delta matches the caller's original types.
    for json_col in json_cols:
        if json_col not in delta_df.columns:
            continue
        try:
            delta_df[json_col] = delta_df[json_col].apply(json.loads)
        except Exception as e:
            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")

    for numeric_col in numeric_cols:
        if numeric_col not in delta_df.columns:
            continue
        try:
            delta_df[numeric_col] = delta_df[numeric_col].apply(attempt_cast_to_numeric)
        except Exception as e:
            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")

    return delta_df
Left join two DataFrames to find the newest unseen data.
Parameters
- old_df ('pd.DataFrame'):
The original (target) dataframe. Acts as a filter on the
new_df
. - new_df ('pd.DataFrame'):
The fetched (source) dataframe. Rows that are contained in
old_df
are removed. - safe_copy (bool, default True):
If `True`, create a copy before comparing and modifying the dataframes. Setting to `False`
may mutate the DataFrames. - dtypes (Optional[Dict[str, Any]], default None): Optionally specify the datatypes of the dataframe.
- debug (bool, default False): Verbosity toggle.
Returns
- A pandas dataframe of the new, unseen rows in
new_df
.
Examples
>>> import pandas as pd
>>> df1 = pd.DataFrame({'a': [1,2]})
>>> df2 = pd.DataFrame({'a': [2,3]})
>>> filter_unseen_df(df1, df2)
a
0 3
def parse_df_datetimes(
    df: 'pd.DataFrame',
    ignore_cols: Optional[Iterable[str]] = None,
    chunksize: Optional[int] = None,
    dtype_backend: str = 'numpy_nullable',
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Parse a pandas DataFrame for datetime columns and cast as datetimes.

    Parameters
    ----------
    df: pd.DataFrame
        The pandas DataFrame to parse.

    ignore_cols: Optional[Iterable[str]], default None
        If provided, do not attempt to coerce these columns as datetimes.

    chunksize: Optional[int], default None
        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.

    dtype_backend: str, default 'numpy_nullable'
        If `df` is not a DataFrame and new one needs to be constructed,
        use this as the datatypes backend.
        Accepted values are 'numpy_nullable' and 'pyarrow'.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A new pandas DataFrame with the determined datetime columns
    (usually ISO strings) cast as datetimes.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
    >>> df.dtypes
    a    object
    dtype: object
    >>> df = parse_df_datetimes(df)
    >>> df.dtypes
    a    datetime64[ns]
    dtype: object

    ```

    """
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    ### NOTE: `items_str` was previously referenced without being imported,
    ### which raised a NameError whenever `pd.to_datetime` failed below.
    from meerschaum.utils.misc import items_str
    import traceback
    pd = import_pandas()
    pandas = attempt_import('pandas')
    pd_name = pd.__name__
    using_dask = 'dask' in pd_name
    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
    dask_dataframe = None
    if using_dask or df_is_dask:
        npartitions = chunksize_to_npartitions(chunksize)
        dask_dataframe = attempt_import('dask.dataframe')

    ### If `df` is not already a DataFrame (dict / list of docs), build one.
    if isinstance(df, pandas.DataFrame):
        pdf = df
    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
        ### Only inspect a single computed partition for datetime detection.
        pdf = get_first_valid_dask_partition(df)
    else:
        if debug:
            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")

        if using_dask:
            if isinstance(df, list):
                ### Collect the union of keys so ragged documents become NULL cells.
                keys = set()
                for doc in df:
                    for key in doc:
                        keys.add(key)
                df = pd.DataFrame.from_dict(
                    {
                        k: [
                            doc.get(k, None)
                            for doc in df
                        ] for k in keys
                    },
                    npartitions = npartitions,
                )
            elif isinstance(df, dict):
                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
            elif 'pandas.core.frame.DataFrame' in str(type(df)):
                df = pd.from_pandas(df, npartitions=npartitions)
            else:
                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
            pandas = attempt_import('pandas')
            pdf = get_first_valid_dask_partition(df)

        else:
            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
            pdf = df

    ### Skip parsing if the DataFrame is empty.
    if len(pdf) == 0:
        if debug:
            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
        return df

    ### Coerce `ignore_cols` to a list first so any Iterable (tuple, set, generator)
    ### is accepted, per the annotation; previously a non-list raised a TypeError on `+`.
    ignore_cols = set(
        list(ignore_cols or []) + [
            col
            for col, dtype in pdf.dtypes.items()
            if 'datetime' in str(dtype)
        ]
    )
    cols_to_inspect = [col for col in pdf.columns if col not in ignore_cols]

    if len(cols_to_inspect) == 0:
        if debug:
            dprint("All columns are ignored, skipping datetime detection...")
        return df

    ### Apply a regex to the columns to determine which are ISO datetimes.
    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
    dt_mask = pdf[cols_to_inspect].astype(str).apply(
        lambda s: s.str.match(iso_dt_regex).all()
    )

    ### List of datetime column names.
    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
    if not datetime_cols:
        if debug:
            dprint("No columns detected as datetimes, returning...")
        return df

    if debug:
        dprint("Converting columns to datetimes: " + str(datetime_cols))

    try:
        if not using_dask:
            df[datetime_cols] = df[datetime_cols].apply(pd.to_datetime, utc=True)
        else:
            df[datetime_cols] = df[datetime_cols].apply(
                pd.to_datetime,
                utc = True,
                axis = 1,
                meta = {
                    col: 'datetime64[ns]'
                    for col in datetime_cols
                }
            )
    except Exception:
        warn(
            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
            + f"{traceback.format_exc()}"
        )

    ### Strip timezone info to keep datetimes naive (UTC-normalized above).
    for dt in datetime_cols:
        try:
            df[dt] = df[dt].dt.tz_localize(None)
        except Exception:
            warn(f"Unable to convert column '{dt}' to naive datetime:\n{traceback.format_exc()}")

    return df
Parse a pandas DataFrame for datetime columns and cast as datetimes.
Parameters
- df (pd.DataFrame): The pandas DataFrame to parse.
- ignore_cols (Optional[Iterable[str]], default None): If provided, do not attempt to coerce these columns as datetimes.
- chunksize (Optional[int], default None):
If the pandas implementation is
'dask'
, use this chunksize for the distributed dataframe. - dtype_backend (str, default 'numpy_nullable'):
If
df
is not a DataFrame and new one needs to be constructed, use this as the datatypes backend. Accepted values are 'numpy_nullable' and 'pyarrow'. - debug (bool, default False): Verbosity toggle.
Returns
- A new pandas DataFrame with the determined datetime columns
- (usually ISO strings) cast as datetimes.
Examples
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
>>> df.dtypes
a object
dtype: object
>>> df = parse_df_datetimes(df)
>>> df.dtypes
a datetime64[ns]
dtype: object
def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects from a Pandas DataFrame.

    Only the first row is inspected, so columns are assumed to be homogeneous.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns.
    """
    if len(df) == 0:
        return []

    ### For Dask frames, sample down to a single pandas partition first.
    if 'dask' in df.__module__:
        from meerschaum.utils.packages import attempt_import
        pandas = attempt_import('pandas')
        df = pandas.DataFrame(get_first_valid_dask_partition(df))

    unhashable_cols = []
    for col, val in df.iloc[0].items():
        if not isinstance(val, Hashable):
            unhashable_cols.append(col)
    return unhashable_cols
Get the columns which contain unhashable objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
- A list of columns.
def get_json_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects (e.g. dicts, lists)
    and should therefore be serialized as JSON.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns to be encoded as JSON.
    """
    ### For Dask frames, inspect a single computed partition.
    if 'dask' in df.__module__:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    json_cols = []
    for col in df.columns:
        ### Use the first non-null value so leading NULLs don't hide JSON columns.
        ix = df[col].first_valid_index()
        if ix is None:
            continue
        if not isinstance(df.loc[ix][col], Hashable):
            json_cols.append(col)
    return json_cols
Get the columns which contain unhashable objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
- A list of columns to be encoded as JSON.
def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain decimal objects.

    Returns
    -------
    A list of columns to treat as numerics.
    """
    from decimal import Decimal

    ### For Dask frames, inspect a single computed partition.
    if 'dask' in df.__module__:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    numeric_cols = []
    for col in df.columns:
        ### Use the first non-null value so leading NULLs don't hide Decimal columns.
        ix = df[col].first_valid_index()
        if ix is not None and isinstance(df.loc[ix][col], Decimal):
            numeric_cols.append(col)
    return numeric_cols
Get the columns which contain decimal.Decimal
objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain decimal objects.
Returns
- A list of columns to treat as numerics.
def enforce_dtypes(
    df: 'pd.DataFrame',
    dtypes: Dict[str, str],
    safe_copy: bool = True,
    coerce_numeric: bool = True,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Enforce the `dtypes` dictionary on a DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame on which to enforce dtypes.

    dtypes: Dict[str, str]
        The data types to attempt to enforce on the DataFrame.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    coerce_numeric: bool, default True
        If `True`, convert float and int collisions to numeric.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The Pandas DataFrame with the types enforced.
    """
    import json
    import traceback
    from decimal import Decimal
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.formatting import pprint
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.dtypes import (
        are_dtypes_equal,
        to_pandas_dtype,
        is_dtype_numeric,
        attempt_cast_to_numeric,
    )
    if safe_copy:
        df = df.copy()
    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
    if len(df_dtypes) == 0:
        if debug:
            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
        return df

    ### Translate Meerschaum dtype strings (e.g. 'json', 'numeric') to pandas dtypes.
    pipe_pandas_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
    }
    json_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'json'
    ]
    numeric_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'numeric'
    ]
    df_numeric_cols = get_numeric_cols(df)
    if debug:
        dprint(f"Desired data types:")
        pprint(dtypes)
        dprint(f"Data types for incoming DataFrame:")
        pprint(df_dtypes)

    ### Deserialize declared JSON columns that arrived as strings.
    if json_cols and len(df) > 0:
        if debug:
            dprint(f"Checking columns for JSON encoding: {json_cols}")
        for col in json_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        (
                            lambda x: (
                                json.loads(x)
                                if isinstance(x, str)
                                else x
                            )
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")

    ### Cast declared numeric columns to Decimal values.
    if numeric_cols:
        if debug:
            dprint(f"Checking for numerics: {numeric_cols}")
        for col in numeric_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_numeric)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")

    ### Fast path: everything already matches.
    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint(f"Data types match. Exiting enforcement...")
        return df

    ### Collect columns present in both, and those whose dtypes disagree.
    common_dtypes = {}
    common_diff_dtypes = {}
    for col, typ in pipe_pandas_dtypes.items():
        if col in df_dtypes:
            common_dtypes[col] = typ
            if not are_dtypes_equal(typ, df_dtypes[col]):
                common_diff_dtypes[col] = df_dtypes[col]

    if debug:
        dprint(f"Common columns with different dtypes:")
        pprint(common_diff_dtypes)

    ### Datetime-vs-datetime differences (e.g. tz-aware vs naive) are tolerated:
    ### adopt the incoming dtype and drop them from the "different" set.
    detected_dt_cols = {}
    for col, typ in common_diff_dtypes.items():
        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
            df_dtypes[col] = typ
            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
    for col in detected_dt_cols:
        del common_diff_dtypes[col]

    if debug:
        dprint(f"Common columns with different dtypes (after dates):")
        pprint(common_diff_dtypes)

    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint(
                "The incoming DataFrame has mostly the same types, skipping enforcement."
                + f"The only detected difference was in the following datetime columns.\n"
                + "    Timezone information may be stripped."
            )
            pprint(detected_dt_cols)
        return df

    ### Decide which mismatched columns should instead be coerced to numeric (Decimal).
    ### Iterate over a snapshot since the dicts are mutated inside the loop.
    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
        previous_typ = common_dtypes[col]
        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
        explicitly_numeric = dtypes.get(col, 'numeric') == 'numeric'
        cast_to_numeric = (
            explicitly_numeric
            or col in df_numeric_cols
            or (mixed_numeric_types and not explicitly_float)
        ) and coerce_numeric
        if cast_to_numeric:
            common_dtypes[col] = attempt_cast_to_numeric
            common_diff_dtypes[col] = attempt_cast_to_numeric

    ### Apply the final casts; a callable target means attempt_cast_to_numeric.
    for d in common_diff_dtypes:
        t = common_dtypes[d]
        if debug:
            dprint(f"Casting column {d} to dtype {t}.")
        try:
            df[d] = (
                df[d].apply(t)
                if callable(t)
                else df[d].astype(t)
            )
        except Exception as e:
            if debug:
                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
            ### Int casts can fail on floats like 1.0; go through float64 first.
            if 'int' in str(t).lower():
                try:
                    df[d] = df[d].astype('float64').astype(t)
                except Exception as e:
                    if debug:
                        dprint(f"Was unable to convert to float then {t}.")
    return df
Enforce the dtypes
dictionary on a DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame on which to enforce dtypes.
- dtypes (Dict[str, str]): The data types to attempt to enforce on the DataFrame.
- safe_copy (bool, default True):
If `True`, create a copy before comparing and modifying the dataframes. Setting to `False`
may mutate the DataFrames. See `filter_unseen_df`.
. - coerce_numeric (bool, default True):
If
True
, convert float and int collisions to numeric. - debug (bool, default False): Verbosity toggle.
Returns
- The Pandas DataFrame with the types enforced.
def get_datetime_bound_from_df(
    df: Union['pd.DataFrame', dict, list],
    datetime_column: str,
    minimum: bool = True,
) -> Union[int, datetime, None]:
    """
    Return the minimum or maximum datetime (or integer) from a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', dict, list]
        The DataFrame, list, or dict which contains the range axis.

    datetime_column: str
        The name of the datetime (or int) column.

    minimum: bool
        Whether to return the minimum (default) or maximum value.

    Returns
    -------
    The minimum or maximum datetime value in the dataframe, or `None`.
    """
    if not datetime_column:
        return None

    def compare(a, b):
        """Return the smaller (or larger, per `minimum`) of two values, ignoring `None`."""
        if a is None:
            return b
        if b is None:
            return a
        if minimum:
            return a if a < b else b
        return a if a > b else b

    ### List of documents: fold over each doc's value for the column.
    if isinstance(df, list):
        if len(df) == 0:
            return None
        best_yet = df[0].get(datetime_column, None)
        for doc in df:
            val = doc.get(datetime_column, None)
            best_yet = compare(best_yet, val)
        return best_yet

    ### Dict of columns: fold over the column's values.
    if isinstance(df, dict):
        if datetime_column not in df:
            return None
        vals = df[datetime_column]
        ### Guard against an empty column list (previously raised IndexError on `[0]`).
        if len(vals) == 0:
            return None
        best_yet = vals[0]
        for val in vals:
            best_yet = compare(best_yet, val)
        return best_yet

    ### String-typed check keeps this working for both pandas and dask frames.
    if 'DataFrame' in str(type(df)):
        if datetime_column not in df.columns:
            return None
        return (
            df[datetime_column].min(skipna=True)
            if minimum
            else df[datetime_column].max(skipna=True)
        )

    return None
Return the minimum or maximum datetime (or integer) from a DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame, list, or dict which contains the range axis.
- datetime_column (str): The name of the datetime (or int) column.
- minimum (bool): Whether to return the minimum (default) or maximum value.
Returns
- The minimum or maximum datetime value in the dataframe, or
None
.
def df_is_chunk_generator(df: Any) -> bool:
    """
    Determine whether to treat `df` as a chunk generator.

    Note this should only be used in a context where generators are expected,
    as it will return `True` for any iterable.

    Parameters
    ----------
    df: Any
        The DataFrame or chunk generator to evaluate.

    Returns
    -------
    A `bool` indicating whether to treat `df` as a generator.
    """
    ### Concrete containers and strings are never chunk generators.
    if isinstance(df, (dict, list, str)):
        return False
    ### String-typed check covers both pandas and dask DataFrames.
    if 'DataFrame' in str(type(df)):
        return False
    return isinstance(df, (Generator, Iterable, Iterator))
Determine whether to treat df
as a chunk generator.
Note this should only be used in a context where generators are expected,
as it will return True
for any iterable.
Parameters
- The DataFrame or chunk generator to evaluate.
Returns
- A
bool
indicating whether to treatdf
as a generator.
def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
    """
    Return the Dask `npartitions` value for a given `chunksize`.
    """
    ### A chunksize of -1 means "use the configured default chunksize".
    if chunksize == -1:
        from meerschaum.config import get_config
        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
    return 1 if chunksize is None else -1 * chunksize
Return the Dask npartitions
value for a given chunksize
.
def df_from_literal(
    pipe: Optional[mrsm.Pipe] = None,
    literal: str = None,
    debug: bool = False
) -> 'pd.DataFrame':
    """
    Construct a dataframe from a literal value, using the pipe's datetime and value column names.

    Parameters
    ----------
    pipe: Optional['meerschaum.Pipe'], default None
        The pipe which will consume the literal value.

    literal: str, default None
        The literal value (usually a string) to parse into the value column.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
    and the literal as the value.
    """
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint

    if pipe is None or literal is None:
        error("Please provide a Pipe and a literal value")

    ### Raises if the pipe's datetime or value columns are undefined.
    dt_col, val_col = pipe.get_columns('datetime', 'value')

    parsed_value = literal
    if isinstance(literal, str):
        import ast
        if debug:
            dprint(f"Received literal string: '{literal}'")
        try:
            parsed_value = ast.literal_eval(literal)
        except Exception:
            warn(
                "Failed to parse value from string:\n"
                + f"{literal}"
                + "\n\nWill cast as a string instead."
            )
            parsed_value = literal

    ### Store a naive UTC timestamp.
    from datetime import datetime, timezone
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None)

    pd = import_pandas()
    return pd.DataFrame({dt_col: [timestamp], val_col: [parsed_value]})
Construct a dataframe from a literal value, using the pipe's datetime and value column names.
Parameters
- pipe (Optional['meerschaum.Pipe'], default None): The pipe which will consume the literal value.
Returns
- A 1-row pandas DataFrame from with the current UTC timestamp as the datetime columns
- and the literal as the value.
def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
    """
    Return the first valid Dask DataFrame partition (if possible).

    Partitions which fail to compute or contain no rows are skipped;
    if none qualify, the entire DataFrame is computed as a fallback.
    """
    for partition in ddf.partitions:
        try:
            pdf = partition.compute()
        except Exception:
            ### Skip partitions which cannot be materialized.
            continue
        if len(pdf) > 0:
            return pdf
    ### No non-empty partition found; materialize everything as a last resort.
    return ddf.compute()
Return the first valid Dask DataFrame partition (if possible).
def query_df(
    df: 'pd.DataFrame',
    params: Optional[Dict[str, Any]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    datetime_column: Optional[str] = None,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    inplace: bool = False,
    reset_index: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Query the dataframe with the params dictionary.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to query against.

    params: Optional[Dict[str, Any]], default None
        The parameters dictionary to use for the query.

    begin: Union[datetime, int, None], default None
        If `begin` and `datetime_column` are provided, only return rows with a timestamp
        greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If `begin` and `datetime_column` are provided, only return rows with a timestamp
        less than this value.

    datetime_column: Optional[str], default None
        A `datetime_column` must be provided to use `begin` and `end`.

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, do not include these columns in the result.

    inplace: bool, default False
        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.

    reset_index: bool, default False
        If `True`, reset the index in the resulting DataFrame.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A Pandas DataFrame query result.
    """
    ### No filters at all: return the input unchanged.
    if not params and not begin and not end:
        return df

    import json
    import meerschaum as mrsm
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import get_in_ex_params
    from meerschaum.utils.warnings import warn

    ### Snapshot dtypes now; the where()/dropna() below can widen them (re-enforced at the end).
    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}

    if begin or end:
        ### Bounds are meaningless without a datetime column to compare against.
        if not datetime_column or datetime_column not in df.columns:
            warn(
                f"The datetime column '{datetime_column}' is not present in the Dataframe, "
                + "ignoring begin and end...",
            )
            begin, end = None, None

    if debug:
        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")

    ### Split params into per-column (include_values, exclude_values) pairs.
    in_ex_params = get_in_ex_params(params)

    def serialize(x: Any) -> str:
        """Normalize a value to a string so comparisons work across types."""
        if isinstance(x, (dict, list, tuple)):
            return json.dumps(x, sort_keys=True, separators=(',', ':'), default=str)
        if hasattr(x, 'isoformat'):
            return x.isoformat()
        return str(x)

    ### Start with the [begin, end) bounds mask (True when a bound is absent).
    masks = [
        (
            (df[datetime_column] >= begin)
            if begin is not None and datetime_column
            else True
        ) & (
            (df[datetime_column] < end)
            if end is not None and datetime_column
            else True
        )
    ]

    ### One mask per param column: include matches, exclude anti-matches.
    masks.extend([
        (
            (
                df[col].apply(serialize).isin(
                    [
                        serialize(_in_val)
                        for _in_val in in_vals
                    ]
                ) if in_vals else True
            ) & (
                ~df[col].apply(serialize).isin(
                    [
                        serialize(_ex_val)
                        for _ex_val in ex_vals
                    ]
                ) if ex_vals else True
            )
        )
        for col, (in_vals, ex_vals) in in_ex_params.items()
        if col in df.columns
    ])
    ### AND all masks together into a single row filter.
    query_mask = masks[0]
    for mask in masks:
        query_mask = query_mask & mask

    ### where() NULLs out non-matching rows; dropna(how='all') then removes them.
    if inplace:
        df.where(query_mask, inplace=inplace)
        df.dropna(how='all', inplace=inplace)
        result_df = df
    else:
        result_df = df.where(query_mask).dropna(how='all')

    if reset_index:
        result_df.reset_index(drop=True, inplace=True)

    ### Restore the original dtypes which where()/dropna() may have widened.
    result_df = enforce_dtypes(
        result_df,
        dtypes,
        safe_copy = (not inplace),
        debug = debug,
        coerce_numeric = False,
    )

    ### '*' is an alias for "all columns".
    if select_columns == ['*']:
        select_columns = None

    if not select_columns and not omit_columns:
        return result_df

    if select_columns:
        for col in list(result_df.columns):
            if col not in select_columns:
                del result_df[col]
        return result_df

    if omit_columns:
        for col in list(result_df.columns):
            if col in omit_columns:
                del result_df[col]
    if debug:
        dprint(f"{dtypes=}")
    return result_df
Query the dataframe with the params dictionary.
Parameters
- df (pd.DataFrame): The DataFrame to query against.
- params (Optional[Dict[str, Any]], default None): The parameters dictionary to use for the query.
- begin (Union[datetime, int, None], default None):
If
begin
anddatetime_column
are provided, only return rows with a timestamp greater than or equal to this value. - end (Union[datetime, int, None], default None):
If
begin
anddatetime_column
are provided, only return rows with a timestamp less than this value. - datetime_column (Optional[str], default None):
A
datetime_column
must be provided to usebegin
andend
. - select_columns (Optional[List[str]], default None): If provided, only return these columns.
- omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
- inplace (bool, default False):
If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
- reset_index (bool, default False):
If
True
, reset the index in the resulting DataFrame.
Returns
- A Pandas DataFrame query result.