meerschaum.utils.dataframe
Utility functions for working with DataFrames.
Module header and shared imports:

```python
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Utility functions for working with DataFrames.
"""

from __future__ import annotations
from datetime import datetime
from meerschaum.utils.typing import (
    Optional, Dict, Any, List, Hashable, Generator,
    Iterator, Iterable, Union, Tuple,
)
```
```python
def add_missing_cols_to_df(df: 'pd.DataFrame', dtypes: Dict[str, Any]) -> 'pd.DataFrame':
    """
    Add columns from the dtypes dictionary as null columns to a new DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The dataframe to copy and to which the null columns are added.

    dtypes: Dict[str, Any]
        The data types dictionary, which may contain keys not present in `df.columns`.

    Returns
    -------
    A new `DataFrame` with the keys from `dtypes` added as null columns.
    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
    NOTE: This will not ensure that dtypes are enforced!

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': 1}])
    >>> dtypes = {'b': 'Int64'}
    >>> add_missing_cols_to_df(df, dtypes)
       a     b
    0  1  <NA>
    >>> add_missing_cols_to_df(df, dtypes).dtypes
    a    int64
    b    Int64
    dtype: object
    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
    a    int64
    dtype: object
    >>>
    """
    if set(df.columns) == set(dtypes):
        return df

    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.dtypes import to_pandas_dtype
    pandas = attempt_import('pandas')

    def build_series(dtype: str):
        return pandas.Series([], dtype=to_pandas_dtype(dtype))

    assign_kwargs = {
        str(col): build_series(str(typ))
        for col, typ in dtypes.items()
        if col not in df.columns
    }
    return df.assign(**assign_kwargs)
```
````python
def filter_unseen_df(
    old_df: 'pd.DataFrame',
    new_df: 'pd.DataFrame',
    safe_copy: bool = True,
    dtypes: Optional[Dict[str, Any]] = None,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Left join two DataFrames to find the newest unseen data.

    Parameters
    ----------
    old_df: 'pd.DataFrame'
        The original (target) dataframe. Acts as a filter on the `new_df`.

    new_df: 'pd.DataFrame'
        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.

    dtypes: Optional[Dict[str, Any]], default None
        Optionally specify the datatypes of the dataframe.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas dataframe of the new, unseen rows in `new_df`.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df1 = pd.DataFrame({'a': [1,2]})
    >>> df2 = pd.DataFrame({'a': [2,3]})
    >>> filter_unseen_df(df1, df2)
       a
    0  3

    ```

    """
    if old_df is None:
        return new_df

    if safe_copy:
        old_df = old_df.copy()
        new_df = new_df.copy()

    import json
    import functools
    import traceback
    from decimal import Decimal
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.dtypes import to_pandas_dtype, are_dtypes_equal, attempt_cast_to_numeric
    pd = import_pandas(debug=debug)
    is_dask = 'dask' in new_df.__module__
    if is_dask:
        pandas = attempt_import('pandas')
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA
    else:
        merge = pd.merge
        NA = pd.NA

    new_df_dtypes = dict(new_df.dtypes)
    old_df_dtypes = dict(old_df.dtypes)

    same_cols = set(new_df.columns) == set(old_df.columns)
    if not same_cols:
        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)

    ### Edge case: two empty lists cast to DFs.
    elif len(new_df.columns) == 0:
        return new_df

    try:
        ### Order matters when checking equality.
        new_df = new_df[old_df.columns]
    except Exception as e:
        warn(
            "Was not able to cast old columns onto new DataFrame. "
            + f"Are both DataFrames the same shape? Error:\n{e}",
            stacklevel=3,
        )
        return new_df[list(new_df_dtypes.keys())]

    ### Assume the old_df knows what it's doing, even if it's technically wrong.
    if dtypes is None:
        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}

    dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
        if col in new_df_dtypes and col in old_df_dtypes
    }
    for col, typ in new_df_dtypes.items():
        if col not in dtypes:
            dtypes[col] = typ

    cast_cols = True
    try:
        new_df = new_df.astype(dtypes)
        cast_cols = False
    except Exception as e:
        warn(
            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
        )

    new_numeric_cols_existing = get_numeric_cols(new_df)
    old_numeric_cols = get_numeric_cols(old_df)
    for col, typ in {k: v for k, v in dtypes.items()}.items():
        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
            new_is_numeric = col in new_numeric_cols_existing
            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
            old_is_numeric = col in old_numeric_cols

            if (
                (new_is_float or new_is_int or new_is_numeric)
                and (old_is_float or old_is_int or old_is_numeric)
            ):
                dtypes[col] = attempt_cast_to_numeric
                cast_cols = True
                continue

            ### Fall back to object if the types don't match.
            warn(
                f"Detected different types for '{col}' "
                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
                + "falling back to 'object'..."
            )
            dtypes[col] = 'object'
            cast_cols = True

    if cast_cols:
        for col, dtype in dtypes.items():
            if col in new_df.columns:
                try:
                    new_df[col] = (
                        new_df[col].astype(dtype)
                        if not callable(dtype)
                        else new_df[col].apply(dtype)
                    )
                except Exception as e:
                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")

    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
    new_json_cols = get_json_cols(new_df)
    old_json_cols = get_json_cols(old_df)
    json_cols = set(new_json_cols + old_json_cols)
    for json_col in old_json_cols:
        old_df[json_col] = old_df[json_col].apply(serializer)
    for json_col in new_json_cols:
        new_df[json_col] = new_df[json_col].apply(serializer)

    new_numeric_cols = get_numeric_cols(new_df)
    numeric_cols = set(new_numeric_cols + old_numeric_cols)
    for numeric_col in old_numeric_cols:
        old_df[numeric_col] = old_df[numeric_col].apply(
            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
        )
    for numeric_col in new_numeric_cols:
        new_df[numeric_col] = new_df[numeric_col].apply(
            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
        )

    joined_df = merge(
        new_df.fillna(NA),
        old_df.fillna(NA),
        how='left',
        on=None,
        indicator=True,
    )
    changed_rows_mask = (joined_df['_merge'] == 'left_only')
    delta_df = joined_df[list(new_df_dtypes.keys())][changed_rows_mask].reset_index(drop=True)

    for json_col in json_cols:
        if json_col not in delta_df.columns:
            continue
        try:
            delta_df[json_col] = delta_df[json_col].apply(json.loads)
        except Exception as e:
            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")

    for numeric_col in numeric_cols:
        if numeric_col not in delta_df.columns:
            continue
        try:
            delta_df[numeric_col] = delta_df[numeric_col].apply(attempt_cast_to_numeric)
        except Exception as e:
            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")

    return delta_df
````
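A minimal usage sketch (assuming `pandas` and `meerschaum` are importable): rows already present in the old (target) DataFrame are dropped from the new (source) DataFrame, regardless of column order.

```python
import pandas as pd
from meerschaum.utils.dataframe import filter_unseen_df

old_df = pd.DataFrame({'id': [1, 2], 'color': ['red', 'blue']})
new_df = pd.DataFrame({'color': ['blue', 'green'], 'id': [2, 3]})

### Only the row which is absent from `old_df` survives.
print(filter_unseen_df(old_df, new_df))
#    color  id
# 0  green   3
```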
````python
def parse_df_datetimes(
    df: 'pd.DataFrame',
    ignore_cols: Optional[Iterable[str]] = None,
    chunksize: Optional[int] = None,
    dtype_backend: str = 'numpy_nullable',
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Parse a pandas DataFrame for datetime columns and cast as datetimes.

    Parameters
    ----------
    df: pd.DataFrame
        The pandas DataFrame to parse.

    ignore_cols: Optional[Iterable[str]], default None
        If provided, do not attempt to coerce these columns as datetimes.

    chunksize: Optional[int], default None
        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.

    dtype_backend: str, default 'numpy_nullable'
        If `df` is not a DataFrame and a new one needs to be constructed,
        use this as the datatypes backend.
        Accepted values are 'numpy_nullable' and 'pyarrow'.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A new pandas DataFrame with the detected datetime columns
    (usually ISO strings) cast as datetimes.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
    >>> df.dtypes
    a    object
    dtype: object
    >>> df = parse_df_datetimes(df)
    >>> df.dtypes
    a    datetime64[ns]
    dtype: object

    ```

    """
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.misc import items_str
    import traceback
    pd = import_pandas()
    pandas = attempt_import('pandas')
    pd_name = pd.__name__
    using_dask = 'dask' in pd_name
    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
    dask_dataframe = None
    if using_dask or df_is_dask:
        npartitions = chunksize_to_npartitions(chunksize)
        dask_dataframe = attempt_import('dask.dataframe')

    ### If df is a dict or list, build a DataFrame.
    if isinstance(df, pandas.DataFrame):
        pdf = df
    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
        pdf = get_first_valid_dask_partition(df)
    else:
        if debug:
            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")

        if using_dask:
            if isinstance(df, list):
                keys = set()
                for doc in df:
                    for key in doc:
                        keys.add(key)
                df = pd.DataFrame.from_dict(
                    {
                        k: [
                            doc.get(k, None)
                            for doc in df
                        ] for k in keys
                    },
                    npartitions=npartitions,
                )
            elif isinstance(df, dict):
                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
            elif 'pandas.core.frame.DataFrame' in str(type(df)):
                df = pd.from_pandas(df, npartitions=npartitions)
            else:
                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
            pandas = attempt_import('pandas')
            pdf = get_first_valid_dask_partition(df)

        else:
            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
            pdf = df

    ### Skip parsing if the DataFrame is empty.
    if len(pdf) == 0:
        if debug:
            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
        return df

    ignore_cols = set(
        (ignore_cols or []) + [
            col
            for col, dtype in pdf.dtypes.items()
            if 'datetime' in str(dtype)
        ]
    )
    cols_to_inspect = [col for col in pdf.columns if col not in ignore_cols]

    if len(cols_to_inspect) == 0:
        if debug:
            dprint("All columns are ignored, skipping datetime detection...")
        return df

    ### Apply a regex to the columns to determine which are ISO datetimes.
    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
    dt_mask = pdf[cols_to_inspect].astype(str).apply(
        lambda s: s.str.match(iso_dt_regex).all()
    )

    ### List of datetime column names.
    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
    if not datetime_cols:
        if debug:
            dprint("No columns detected as datetimes, returning...")
        return df

    if debug:
        dprint("Converting columns to datetimes: " + str(datetime_cols))

    try:
        if not using_dask:
            df[datetime_cols] = df[datetime_cols].apply(pd.to_datetime, utc=True)
        else:
            df[datetime_cols] = df[datetime_cols].apply(
                pd.to_datetime,
                utc=True,
                axis=1,
                meta={
                    col: 'datetime64[ns]'
                    for col in datetime_cols
                }
            )
    except Exception as e:
        warn(
            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
            + f"{traceback.format_exc()}"
        )

    for dt in datetime_cols:
        try:
            df[dt] = df[dt].dt.tz_localize(None)
        except Exception as e:
            warn(f"Unable to convert column '{dt}' to naive datetime:\n{traceback.format_exc()}")

    return df
````
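A short sketch of `ignore_cols` (assuming `pandas` is installed): the ignored column keeps its original dtype, while the other ISO-formatted string column is cast to datetimes.

```python
import pandas as pd
from meerschaum.utils.dataframe import parse_df_datetimes

df = pd.DataFrame({
    'ts': ['2023-01-01 00:00:00', '2023-01-02 00:00:00'],
    'label': ['2023-01-01 00:00:00', '2023-01-02 00:00:00'],
})
df = parse_df_datetimes(df, ignore_cols=['label'])
print(df.dtypes)
# ts       datetime64[ns]
# label            object
```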
```python
def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns.
    """
    if len(df) == 0:
        return []

    is_dask = 'dask' in df.__module__
    if is_dask:
        from meerschaum.utils.packages import attempt_import
        pandas = attempt_import('pandas')
        df = pandas.DataFrame(get_first_valid_dask_partition(df))
    return [
        col for col, val in df.iloc[0].items()
        if not isinstance(val, Hashable)
    ]
```
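A quick sketch (assuming `pandas` is installed): only the first row is inspected, so unhashable values (dicts, lists) in row 0 flag their columns.

```python
import pandas as pd
from meerschaum.utils.dataframe import get_unhashable_cols

df = pd.DataFrame([{'a': 1, 'b': {'nested': True}, 'c': [1, 2]}])
print(get_unhashable_cols(df))
# ['b', 'c']
```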
```python
def get_json_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects (to be serialized as JSON)
    from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns to be encoded as JSON.
    """
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and not isinstance(df.loc[ix][col], Hashable)
        )
    ]
```
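Unlike `get_unhashable_cols()`, this function inspects each column's first *valid* index, so leading nulls don't hide a JSON column. A sketch (assuming `pandas` is installed):

```python
import pandas as pd
from meerschaum.utils.dataframe import get_json_cols

df = pd.DataFrame({'a': [1, 2], 'b': [None, {'key': 'val'}]})
print(get_json_cols(df))
# ['b']
```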
```python
def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain decimal objects.

    Returns
    -------
    A list of columns to treat as numerics.
    """
    from decimal import Decimal
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and isinstance(df.loc[ix][col], Decimal)
        )
    ]
```
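A sketch (assuming `pandas` is installed): a column is flagged only when its first valid value is a `decimal.Decimal`.

```python
from decimal import Decimal
import pandas as pd
from meerschaum.utils.dataframe import get_numeric_cols

df = pd.DataFrame({'price': [Decimal('19.99')], 'qty': [3]})
print(get_numeric_cols(df))
# ['price']
```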
```python
def enforce_dtypes(
    df: 'pd.DataFrame',
    dtypes: Dict[str, str],
    safe_copy: bool = True,
    coerce_numeric: bool = True,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Enforce the `dtypes` dictionary on a DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame on which to enforce dtypes.

    dtypes: Dict[str, str]
        The data types to attempt to enforce on the DataFrame.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    coerce_numeric: bool, default True
        If `True`, convert float and int collisions to numeric.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The Pandas DataFrame with the types enforced.
    """
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.formatting import pprint
    from meerschaum.utils.dtypes import (
        are_dtypes_equal,
        to_pandas_dtype,
        is_dtype_numeric,
        attempt_cast_to_numeric,
    )
    if safe_copy:
        df = df.copy()
    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
    if len(df_dtypes) == 0:
        if debug:
            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
        return df

    pipe_pandas_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
    }
    json_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'json'
    ]
    numeric_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'numeric'
    ]
    df_numeric_cols = get_numeric_cols(df)
    if debug:
        dprint("Desired data types:")
        pprint(dtypes)
        dprint("Data types for incoming DataFrame:")
        pprint(df_dtypes)

    if json_cols and len(df) > 0:
        if debug:
            dprint(f"Checking columns for JSON encoding: {json_cols}")
        for col in json_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        lambda x: (
                            json.loads(x)
                            if isinstance(x, str)
                            else x
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")

    if numeric_cols:
        if debug:
            dprint(f"Checking for numerics: {numeric_cols}")
        for col in numeric_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_numeric)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")

    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint("Data types match. Exiting enforcement...")
        return df

    common_dtypes = {}
    common_diff_dtypes = {}
    for col, typ in pipe_pandas_dtypes.items():
        if col in df_dtypes:
            common_dtypes[col] = typ
            if not are_dtypes_equal(typ, df_dtypes[col]):
                common_diff_dtypes[col] = df_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes:")
        pprint(common_diff_dtypes)

    detected_dt_cols = {}
    for col, typ in common_diff_dtypes.items():
        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
            df_dtypes[col] = typ
            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
    for col in detected_dt_cols:
        del common_diff_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes (after dates):")
        pprint(common_diff_dtypes)

    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint(
                "The incoming DataFrame has mostly the same types, skipping enforcement. "
                + "The only detected difference was in the following datetime columns.\n"
                + "    Timezone information may be stripped."
            )
            pprint(detected_dt_cols)
        return df

    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
        previous_typ = common_dtypes[col]
        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
        explicitly_numeric = dtypes.get(col, 'numeric') == 'numeric'
        cast_to_numeric = (
            explicitly_numeric
            or col in df_numeric_cols
            or (mixed_numeric_types and not explicitly_float)
        ) and coerce_numeric
        if cast_to_numeric:
            common_dtypes[col] = attempt_cast_to_numeric
            common_diff_dtypes[col] = attempt_cast_to_numeric

    for d in common_diff_dtypes:
        t = common_dtypes[d]
        if debug:
            dprint(f"Casting column {d} to dtype {t}.")
        try:
            df[d] = (
                df[d].apply(t)
                if callable(t)
                else df[d].astype(t)
            )
        except Exception as e:
            if debug:
                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
            if 'int' in str(t).lower():
                try:
                    df[d] = df[d].astype('float64').astype(t)
                except Exception as e:
                    if debug:
                        dprint(f"Was unable to convert to float then {t}.")
    return df
```
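A minimal sketch (assuming `pandas` and `meerschaum` are importable; the dtype values here are illustrative): the special string `'json'` triggers deserialization, while ordinary dtype strings like `'Int64'` are applied via `astype()`, with the float fallback above rescuing strings of integers.

```python
import pandas as pd
from meerschaum.utils.dataframe import enforce_dtypes

df = pd.DataFrame({'a': ['1', '2'], 'b': ['{"x": 1}', '{"x": 2}']})
df = enforce_dtypes(df, {'a': 'Int64', 'b': 'json'})
print(df.dtypes)
# a     Int64
# b    object  (the values are now dicts)
```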
```python
def get_datetime_bound_from_df(
    df: Union['pd.DataFrame', dict, list],
    datetime_column: str,
    minimum: bool = True,
) -> Union[int, 'datetime.datetime', None]:
    """
    Return the minimum or maximum datetime (or integer) from a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', dict, list]
        The DataFrame, list, or dict which contains the range axis.

    datetime_column: str
        The name of the datetime (or int) column.

    minimum: bool
        Whether to return the minimum (default) or maximum value.

    Returns
    -------
    The minimum or maximum datetime value in the dataframe, or `None`.
    """
    if not datetime_column:
        return None

    def compare(a, b):
        if a is None:
            return b
        if b is None:
            return a
        if minimum:
            return a if a < b else b
        return a if a > b else b

    if isinstance(df, list):
        if len(df) == 0:
            return None
        best_yet = df[0].get(datetime_column, None)
        for doc in df:
            val = doc.get(datetime_column, None)
            best_yet = compare(best_yet, val)
        return best_yet

    if isinstance(df, dict):
        if datetime_column not in df:
            return None
        best_yet = df[datetime_column][0]
        for val in df[datetime_column]:
            best_yet = compare(best_yet, val)
        return best_yet

    if 'DataFrame' in str(type(df)):
        if datetime_column not in df.columns:
            return None
        return (
            df[datetime_column].min(skipna=True)
            if minimum
            else df[datetime_column].max(skipna=True)
        )

    return None
```
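A sketch of the three accepted shapes (assuming `pandas` is installed): a list of documents, a dict of lists, and a DataFrame.

```python
from datetime import datetime
import pandas as pd
from meerschaum.utils.dataframe import get_datetime_bound_from_df

docs = [{'dt': datetime(2023, 1, 2)}, {'dt': datetime(2023, 1, 1)}]
print(get_datetime_bound_from_df(docs, 'dt'))                 # 2023-01-01 00:00:00
print(get_datetime_bound_from_df(docs, 'dt', minimum=False))  # 2023-01-02 00:00:00
print(get_datetime_bound_from_df({'dt': [datetime(2023, 1, 3)]}, 'dt'))  # 2023-01-03 00:00:00
print(get_datetime_bound_from_df(pd.DataFrame(docs), 'dt'))   # 2023-01-01 00:00:00
```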
```python
def df_is_chunk_generator(df: Any) -> bool:
    """
    Determine whether to treat `df` as a chunk generator.

    Note this should only be used in a context where generators are expected,
    as it will return `True` for any iterable.

    Parameters
    ----------
    df: Any
        The DataFrame or chunk generator to evaluate.

    Returns
    -------
    A `bool` indicating whether to treat `df` as a generator.
    """
    return (
        not isinstance(df, (dict, list, str))
        and 'DataFrame' not in str(type(df))
        and isinstance(df, (Generator, Iterable, Iterator))
    )
```
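A sketch (assuming `pandas` is installed): DataFrames, dicts, lists, and strings are excluded, while generators (and any other iterable) pass.

```python
import pandas as pd
from meerschaum.utils.dataframe import df_is_chunk_generator

chunks = (pd.DataFrame({'a': [i]}) for i in range(3))
print(df_is_chunk_generator(chunks))          # True
print(df_is_chunk_generator(pd.DataFrame()))  # False
print(df_is_chunk_generator([{'a': 1}]))      # False
```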
```python
def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
    """
    Return the Dask `npartitions` value for a given `chunksize`.
    """
    if chunksize == -1:
        from meerschaum.config import get_config
        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
    if chunksize is None:
        return 1
    return -1 * chunksize
```
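Following the logic above: `None` maps to a single partition, `-1` is resolved from the configured SQL chunksize, and any other value is returned negated (how a negative `npartitions` is interpreted downstream is not shown in this module).

```python
from meerschaum.utils.dataframe import chunksize_to_npartitions

print(chunksize_to_npartitions(None))  # 1
print(chunksize_to_npartitions(900))   # -900
```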
```python
def df_from_literal(
    pipe: Optional['meerschaum.Pipe'] = None,
    literal: str = None,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Construct a dataframe from a literal value, using the pipe's datetime and value column names.

    Parameters
    ----------
    pipe: Optional['meerschaum.Pipe'], default None
        The pipe which will consume the literal value.

    literal: str, default None
        The literal value to parse (via `ast.literal_eval`) and store in the value column.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
    and the literal as the value.
    """
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint

    if pipe is None or literal is None:
        error("Please provide a Pipe and a literal value")
    ### This will raise an error if the columns are undefined.
    dt_name, val_name = pipe.get_columns('datetime', 'value')

    val = literal
    if isinstance(literal, str):
        if debug:
            dprint(f"Received literal string: '{literal}'")
        import ast
        try:
            val = ast.literal_eval(literal)
        except Exception as e:
            warn(
                "Failed to parse value from string:\n"
                + f"{literal}"
                + "\n\nWill cast as a string instead."
            )
            val = literal

    from datetime import datetime, timezone
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    pd = import_pandas()
    return pd.DataFrame({dt_name: [now], val_name: [val]})
```
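A hedged sketch (assuming a configured meerschaum environment; the connector and metric keys below are hypothetical): the pipe supplies the column names, and the literal is parsed with `ast.literal_eval`.

```python
import meerschaum as mrsm
from meerschaum.utils.dataframe import df_from_literal

pipe = mrsm.Pipe(
    'demo', 'temperature',
    columns={'datetime': 'dt', 'value': 'val'},
)
df = df_from_literal(pipe, '42.1')
### One row: the current UTC timestamp under 'dt' and the float 42.1 under 'val'.
print(df)
```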
```python
def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
    """
    Return the first valid Dask DataFrame partition (if possible).
    """
    pdf = None
    for partition in ddf.partitions:
        try:
            pdf = partition.compute()
        except Exception as e:
            continue
        if len(pdf) > 0:
            return pdf
    return ddf.compute()
```
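A sketch (assuming `dask` is installed): the function returns the first partition which computes to a non-empty pandas DataFrame.

```python
import pandas as pd
import dask.dataframe as dd
from meerschaum.utils.dataframe import get_first_valid_dask_partition

ddf = dd.from_pandas(pd.DataFrame({'a': [1, 2]}), npartitions=2)
pdf = get_first_valid_dask_partition(ddf)
print(len(pdf) > 0)  # True
```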
```python
def query_df(
    df: 'pd.DataFrame',
    params: Optional[Dict[str, Any]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    datetime_column: Optional[str] = None,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    inplace: bool = False,
    reset_index: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Query the dataframe with the params dictionary.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to query against.

    params: Optional[Dict[str, Any]], default None
        The parameters dictionary to use for the query.

    begin: Union[datetime, int, None], default None
        If `begin` and `datetime_column` are provided, only return rows with a timestamp
        greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If `end` and `datetime_column` are provided, only return rows with a timestamp
        less than this value.

    datetime_column: Optional[str], default None
        A `datetime_column` must be provided to use `begin` and `end`.

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, do not include these columns in the result.

    inplace: bool, default False
        If `True`, modify the DataFrame in place rather than creating a new DataFrame.

    reset_index: bool, default False
        If `True`, reset the index in the resulting DataFrame.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A Pandas DataFrame query result.
    """
    if not params and not begin and not end:
        return df

    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import get_in_ex_params
    from meerschaum.utils.warnings import warn

    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}

    if begin or end:
        if not datetime_column or datetime_column not in df.columns:
            warn(
                f"The datetime column '{datetime_column}' is not present in the DataFrame, "
                + "ignoring begin and end...",
            )
            begin, end = None, None

    if debug:
        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")

    in_ex_params = get_in_ex_params(params)

    def serialize(x: Any) -> str:
        if isinstance(x, (dict, list, tuple)):
            return json.dumps(x, sort_keys=True, separators=(',', ':'), default=str)
        if hasattr(x, 'isoformat'):
            return x.isoformat()
        return str(x)

    masks = [
        (
            (df[datetime_column] >= begin)
            if begin is not None and datetime_column
            else True
        ) & (
            (df[datetime_column] < end)
            if end is not None and datetime_column
            else True
        )
    ]

    masks.extend([
        (
            (
                df[col].apply(serialize).isin(
                    [
                        serialize(_in_val)
                        for _in_val in in_vals
                    ]
                ) if in_vals else True
            ) & (
                ~df[col].apply(serialize).isin(
                    [
                        serialize(_ex_val)
                        for _ex_val in ex_vals
                    ]
                ) if ex_vals else True
            )
        )
        for col, (in_vals, ex_vals) in in_ex_params.items()
        if col in df.columns
    ])
    query_mask = masks[0]
    for mask in masks:
        query_mask = query_mask & mask

    if inplace:
        df.where(query_mask, inplace=inplace)
        df.dropna(how='all', inplace=inplace)
        result_df = df
    else:
        result_df = df.where(query_mask).dropna(how='all')

    if reset_index:
        result_df.reset_index(drop=True, inplace=True)

    result_df = enforce_dtypes(
        result_df,
        dtypes,
        safe_copy=(not inplace),
        debug=debug,
        coerce_numeric=False,
    )

    if select_columns == ['*']:
        select_columns = None

    if not select_columns and not omit_columns:
        return result_df

    if select_columns:
        for col in list(result_df.columns):
            if col not in select_columns:
                del result_df[col]
        return result_df

    if omit_columns:
        for col in list(result_df.columns):
            if col in omit_columns:
                del result_df[col]
    if debug:
        dprint(f"{dtypes=}")
    return result_df
```
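A sketch of common query shapes (assuming `pandas` is installed; the `params` dictionary follows the usual meerschaum convention via `get_in_ex_params()`, where a scalar or a list of values acts as an "in" filter):

```python
from datetime import datetime
import pandas as pd
from meerschaum.utils.dataframe import query_df

df = pd.DataFrame({
    'dt': [datetime(2023, 1, 1), datetime(2023, 1, 2), datetime(2023, 1, 3)],
    'color': ['red', 'blue', 'red'],
})

### Bounded on the datetime axis: `begin` is inclusive, `end` is exclusive.
print(query_df(df, begin=datetime(2023, 1, 2), datetime_column='dt'))

### Filtered on serialized values; `reset_index=True` renumbers the rows.
print(query_df(df, params={'color': 'red'}, reset_index=True))
```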