meerschaum.utils.dataframe
Utility functions for working with DataFrames.
1#! /usr/bin/env python3 2# -*- coding: utf-8 -*- 3# vim:fenc=utf-8 4 5""" 6Utility functions for working with DataFrames. 7""" 8 9from __future__ import annotations 10from datetime import datetime, timezone 11from collections import defaultdict 12 13import meerschaum as mrsm 14from meerschaum.utils.typing import ( 15 Optional, Dict, Any, List, Hashable, Generator, 16 Iterator, Iterable, Union, TYPE_CHECKING, 17) 18 19if TYPE_CHECKING: 20 pd, dask = mrsm.attempt_import('pandas', 'dask') 21 22 23def add_missing_cols_to_df( 24 df: 'pd.DataFrame', 25 dtypes: Dict[str, Any], 26) -> 'pd.DataFrame': 27 """ 28 Add columns from the dtypes dictionary as null columns to a new DataFrame. 29 30 Parameters 31 ---------- 32 df: pd.DataFrame 33 The dataframe we should copy and add null columns. 34 35 dtypes: 36 The data types dictionary which may contain keys not present in `df.columns`. 37 38 Returns 39 ------- 40 A new `DataFrame` with the keys from `dtypes` added as null columns. 41 If `df.dtypes` is the same as `dtypes`, then return a reference to `df`. 42 NOTE: This will not ensure that dtypes are enforced! 43 44 Examples 45 -------- 46 >>> import pandas as pd 47 >>> df = pd.DataFrame([{'a': 1}]) 48 >>> dtypes = {'b': 'Int64'} 49 >>> add_missing_cols_to_df(df, dtypes) 50 a b 51 0 1 <NA> 52 >>> add_missing_cols_to_df(df, dtypes).dtypes 53 a int64 54 b Int64 55 dtype: object 56 >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes 57 a int64 58 dtype: object 59 >>> 60 """ 61 if set(df.columns) == set(dtypes): 62 return df 63 64 from meerschaum.utils.packages import attempt_import 65 from meerschaum.utils.dtypes import to_pandas_dtype 66 pandas = attempt_import('pandas') 67 68 def build_series(dtype: str): 69 return pandas.Series([], dtype=to_pandas_dtype(dtype)) 70 71 assign_kwargs = { 72 str(col): build_series(str(typ)) 73 for col, typ in dtypes.items() 74 if col not in df.columns 75 } 76 df_with_cols = df.assign(**assign_kwargs) 77 for col in assign_kwargs: 78 df_with_cols[col] = df_with_cols[col].fillna(pandas.NA) 79 return df_with_cols 80 81 82def filter_unseen_df( 83 old_df: 'pd.DataFrame', 84 new_df: 'pd.DataFrame', 85 safe_copy: bool = True, 86 dtypes: Optional[Dict[str, Any]] = None, 87 include_unchanged_columns: bool = False, 88 debug: bool = False, 89) -> 'pd.DataFrame': 90 """ 91 Left join two DataFrames to find the newest unseen data. 92 93 Parameters 94 ---------- 95 old_df: 'pd.DataFrame' 96 The original (target) dataframe. Acts as a filter on the `new_df`. 97 98 new_df: 'pd.DataFrame' 99 The fetched (source) dataframe. Rows that are contained in `old_df` are removed. 100 101 safe_copy: bool, default True 102 If `True`, create a copy before comparing and modifying the dataframes. 103 Setting to `False` may mutate the DataFrames. 104 105 dtypes: Optional[Dict[str, Any]], default None 106 Optionally specify the datatypes of the dataframe. 107 108 include_unchanged_columns: bool, default False 109 If `True`, include columns which haven't changed on rows which have changed. 110 111 debug: bool, default False 112 Verbosity toggle. 113 114 Returns 115 ------- 116 A pandas dataframe of the new, unseen rows in `new_df`. 
117 118 Examples 119 -------- 120 ```python 121 >>> import pandas as pd 122 >>> df1 = pd.DataFrame({'a': [1,2]}) 123 >>> df2 = pd.DataFrame({'a': [2,3]}) 124 >>> filter_unseen_df(df1, df2) 125 a 126 0 3 127 128 ``` 129 130 """ 131 if old_df is None: 132 return new_df 133 134 if safe_copy: 135 old_df = old_df.copy() 136 new_df = new_df.copy() 137 138 import json 139 import functools 140 import traceback 141 from decimal import Decimal 142 from uuid import UUID 143 from meerschaum.utils.warnings import warn 144 from meerschaum.utils.packages import import_pandas, attempt_import 145 from meerschaum.utils.dtypes import ( 146 to_pandas_dtype, 147 are_dtypes_equal, 148 attempt_cast_to_numeric, 149 attempt_cast_to_uuid, 150 coerce_timezone, 151 ) 152 pd = import_pandas(debug=debug) 153 is_dask = 'dask' in new_df.__module__ 154 if is_dask: 155 pandas = attempt_import('pandas') 156 _ = attempt_import('partd', lazy=False) 157 dd = attempt_import('dask.dataframe') 158 merge = dd.merge 159 NA = pandas.NA 160 else: 161 merge = pd.merge 162 NA = pd.NA 163 164 new_df_dtypes = dict(new_df.dtypes) 165 old_df_dtypes = dict(old_df.dtypes) 166 167 same_cols = set(new_df.columns) == set(old_df.columns) 168 if not same_cols: 169 new_df = add_missing_cols_to_df(new_df, old_df_dtypes) 170 old_df = add_missing_cols_to_df(old_df, new_df_dtypes) 171 172 new_types_missing_from_old = { 173 col: typ 174 for col, typ in new_df_dtypes.items() 175 if col not in old_df_dtypes 176 } 177 old_types_missing_from_new = { 178 col: typ 179 for col, typ in new_df_dtypes.items() 180 if col not in old_df_dtypes 181 } 182 old_df_dtypes.update(new_types_missing_from_old) 183 new_df_dtypes.update(old_types_missing_from_new) 184 185 ### Edge case: two empty lists cast to DFs. 186 elif len(new_df.columns) == 0: 187 return new_df 188 189 try: 190 ### Order matters when checking equality. 191 new_df = new_df[old_df.columns] 192 193 except Exception as e: 194 warn( 195 "Was not able to cast old columns onto new DataFrame. " + 196 f"Are both DataFrames the same shape? Error:\n{e}", 197 stacklevel = 3, 198 ) 199 return new_df[list(new_df_dtypes.keys())] 200 201 ### assume the old_df knows what it's doing, even if it's technically wrong. 
202 if dtypes is None: 203 dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()} 204 205 dtypes = { 206 col: to_pandas_dtype(typ) 207 for col, typ in dtypes.items() 208 if col in new_df_dtypes and col in old_df_dtypes 209 } 210 for col, typ in new_df_dtypes.items(): 211 if col not in dtypes: 212 dtypes[col] = typ 213 214 dt_dtypes = { 215 col: typ 216 for col, typ in dtypes.items() 217 if are_dtypes_equal(typ, 'datetime') 218 } 219 non_dt_dtypes = { 220 col: typ 221 for col, typ in dtypes.items() 222 if col not in dt_dtypes 223 } 224 225 cast_non_dt_cols = True 226 try: 227 new_df = new_df.astype(non_dt_dtypes) 228 cast_non_dt_cols = False 229 except Exception as e: 230 warn( 231 f"Was not able to cast the new DataFrame to the given dtypes.\n{e}" 232 ) 233 234 cast_dt_cols = True 235 try: 236 for col, typ in dt_dtypes.items(): 237 tz = typ.split(',')[-1].strip() if ',' in typ else None 238 new_df[col] = coerce_timezone(pd.to_datetime(new_df[col], utc=True)) 239 cast_dt_cols = False 240 except Exception as e: 241 warn(f"Could not cast datetime columns:\n{e}") 242 243 cast_cols = cast_dt_cols or cast_non_dt_cols 244 245 new_numeric_cols_existing = get_numeric_cols(new_df) 246 old_numeric_cols = get_numeric_cols(old_df) 247 for col, typ in {k: v for k, v in dtypes.items()}.items(): 248 if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')): 249 new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float') 250 new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int') 251 new_is_numeric = col in new_numeric_cols_existing 252 old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float') 253 old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int') 254 old_is_numeric = col in old_numeric_cols 255 256 if ( 257 (new_is_float or new_is_int or new_is_numeric) 258 and 259 (old_is_float or old_is_int or old_is_numeric) 260 ): 261 dtypes[col] = attempt_cast_to_numeric 262 cast_cols = True 263 continue 264 265 ### Fallback to object if the types don't match. 266 warn( 267 f"Detected different types for '{col}' " 268 + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), " 269 + "falling back to 'object'..." 
270 ) 271 dtypes[col] = 'object' 272 cast_cols = True 273 274 if cast_cols: 275 for col, dtype in dtypes.items(): 276 if col in new_df.columns: 277 try: 278 new_df[col] = ( 279 new_df[col].astype(dtype) 280 if not callable(dtype) 281 else new_df[col].apply(dtype) 282 ) 283 except Exception as e: 284 warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}") 285 286 serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str) 287 new_json_cols = get_json_cols(new_df) 288 old_json_cols = get_json_cols(old_df) 289 json_cols = set(new_json_cols + old_json_cols) 290 for json_col in old_json_cols: 291 old_df[json_col] = old_df[json_col].apply(serializer) 292 for json_col in new_json_cols: 293 new_df[json_col] = new_df[json_col].apply(serializer) 294 295 new_numeric_cols = get_numeric_cols(new_df) 296 numeric_cols = set(new_numeric_cols + old_numeric_cols) 297 for numeric_col in old_numeric_cols: 298 old_df[numeric_col] = old_df[numeric_col].apply( 299 lambda x: f'{x:f}' if isinstance(x, Decimal) else x 300 ) 301 for numeric_col in new_numeric_cols: 302 new_df[numeric_col] = new_df[numeric_col].apply( 303 lambda x: f'{x:f}' if isinstance(x, Decimal) else x 304 ) 305 306 old_dt_cols = [ 307 col 308 for col, typ in old_df.dtypes.items() 309 if are_dtypes_equal(str(typ), 'datetime') 310 ] 311 for col in old_dt_cols: 312 old_df[col] = coerce_timezone(old_df[col]) 313 314 new_dt_cols = [ 315 col 316 for col, typ in old_df.dtypes.items() 317 if are_dtypes_equal(str(typ), 'datetime') 318 ] 319 for col in new_dt_cols: 320 new_df[col] = coerce_timezone(new_df[col]) 321 322 old_uuid_cols = get_uuid_cols(old_df) 323 new_uuid_cols = get_uuid_cols(new_df) 324 uuid_cols = set(new_uuid_cols + old_uuid_cols) 325 joined_df = merge( 326 new_df.infer_objects(copy=False).fillna(NA), 327 old_df.infer_objects(copy=False).fillna(NA), 328 how='left', 329 on=None, 330 indicator=True, 331 ) 332 changed_rows_mask = (joined_df['_merge'] == 'left_only') 333 new_cols = list(new_df_dtypes) 334 delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True) 335 336 for json_col in json_cols: 337 if json_col not in delta_df.columns: 338 continue 339 try: 340 delta_df[json_col] = delta_df[json_col].apply(json.loads) 341 except Exception: 342 warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}") 343 344 for numeric_col in numeric_cols: 345 if numeric_col not in delta_df.columns: 346 continue 347 try: 348 delta_df[numeric_col] = delta_df[numeric_col].apply(attempt_cast_to_numeric) 349 except Exception: 350 warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}") 351 352 for uuid_col in uuid_cols: 353 if uuid_col not in delta_df.columns: 354 continue 355 try: 356 delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid) 357 except Exception: 358 warn(f"Unable to parse numeric column '{uuid_col}':\n{traceback.format_exc()}") 359 360 return delta_df 361 362 363def parse_df_datetimes( 364 df: 'pd.DataFrame', 365 ignore_cols: Optional[Iterable[str]] = None, 366 chunksize: Optional[int] = None, 367 dtype_backend: str = 'numpy_nullable', 368 debug: bool = False, 369) -> 'pd.DataFrame': 370 """ 371 Parse a pandas DataFrame for datetime columns and cast as datetimes. 372 373 Parameters 374 ---------- 375 df: pd.DataFrame 376 The pandas DataFrame to parse. 377 378 ignore_cols: Optional[Iterable[str]], default None 379 If provided, do not attempt to coerce these columns as datetimes. 
380 381 chunksize: Optional[int], default None 382 If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe. 383 384 dtype_backend: str, default 'numpy_nullable' 385 If `df` is not a DataFrame and new one needs to be constructed, 386 use this as the datatypes backend. 387 Accepted values are 'numpy_nullable' and 'pyarrow'. 388 389 debug: bool, default False 390 Verbosity toggle. 391 392 Returns 393 ------- 394 A new pandas DataFrame with the determined datetime columns 395 (usually ISO strings) cast as datetimes. 396 397 Examples 398 -------- 399 ```python 400 >>> import pandas as pd 401 >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 402 >>> df.dtypes 403 a object 404 dtype: object 405 >>> df = parse_df_datetimes(df) 406 >>> df.dtypes 407 a datetime64[ns] 408 dtype: object 409 410 ``` 411 412 """ 413 from meerschaum.utils.packages import import_pandas, attempt_import 414 from meerschaum.utils.debug import dprint 415 from meerschaum.utils.warnings import warn 416 from meerschaum.utils.misc import items_str 417 import traceback 418 pd = import_pandas() 419 pandas = attempt_import('pandas') 420 pd_name = pd.__name__ 421 using_dask = 'dask' in pd_name 422 df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__) 423 dask_dataframe = None 424 if using_dask or df_is_dask: 425 npartitions = chunksize_to_npartitions(chunksize) 426 dask_dataframe = attempt_import('dask.dataframe') 427 428 ### if df is a dict, build DataFrame 429 if isinstance(df, pandas.DataFrame): 430 pdf = df 431 elif df_is_dask and isinstance(df, dask_dataframe.DataFrame): 432 pdf = get_first_valid_dask_partition(df) 433 else: 434 if debug: 435 dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...") 436 437 if using_dask: 438 if isinstance(df, list): 439 keys = set() 440 for doc in df: 441 for key in doc: 442 keys.add(key) 443 df = pd.DataFrame.from_dict( 444 { 445 k: [ 446 doc.get(k, None) 447 for doc in df 448 ] for k in keys 449 }, 450 npartitions = npartitions, 451 ) 452 elif isinstance(df, dict): 453 df = pd.DataFrame.from_dict(df, npartitions=npartitions) 454 elif 'pandas.core.frame.DataFrame' in str(type(df)): 455 df = pd.from_pandas(df, npartitions=npartitions) 456 else: 457 raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.") 458 pandas = attempt_import('pandas') 459 pdf = get_first_valid_dask_partition(df) 460 461 else: 462 df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend) 463 pdf = df 464 465 ### skip parsing if DataFrame is empty 466 if len(pdf) == 0: 467 if debug: 468 dprint(f"df is empty. 
Returning original DataFrame without casting datetime columns...") 469 return df 470 471 ignore_cols = set( 472 (ignore_cols or []) + [ 473 col 474 for col, dtype in pdf.dtypes.items() 475 if 'datetime' in str(dtype) 476 ] 477 ) 478 cols_to_inspect = [col for col in pdf.columns if col not in ignore_cols] 479 480 if len(cols_to_inspect) == 0: 481 if debug: 482 dprint(f"All columns are ignored, skipping datetime detection...") 483 return df 484 485 ### apply regex to columns to determine which are ISO datetimes 486 iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+' 487 dt_mask = pdf[cols_to_inspect].astype(str).apply( 488 lambda s: s.str.match(iso_dt_regex).all() 489 ) 490 491 ### list of datetime column names 492 datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]] 493 if not datetime_cols: 494 if debug: 495 dprint("No columns detected as datetimes, returning...") 496 return df 497 498 if debug: 499 dprint("Converting columns to datetimes: " + str(datetime_cols)) 500 501 try: 502 if not using_dask: 503 df[datetime_cols] = df[datetime_cols].apply(pd.to_datetime, utc=True) 504 else: 505 df[datetime_cols] = df[datetime_cols].apply( 506 pd.to_datetime, 507 utc=True, 508 axis=1, 509 meta={ 510 col: 'datetime64[ns]' 511 for col in datetime_cols 512 } 513 ) 514 except Exception: 515 warn( 516 f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n" 517 + f"{traceback.format_exc()}" 518 ) 519 520 for dt in datetime_cols: 521 try: 522 df[dt] = df[dt].dt.tz_localize(None) 523 except Exception: 524 warn(f"Unable to convert column '{dt}' to naive datetime:\n{traceback.format_exc()}") 525 526 return df 527 528 529def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]: 530 """ 531 Get the columns which contain unhashable objects from a Pandas DataFrame. 532 533 Parameters 534 ---------- 535 df: pd.DataFrame 536 The DataFrame which may contain unhashable objects. 537 538 Returns 539 ------- 540 A list of columns. 541 """ 542 if df is None: 543 return [] 544 if len(df) == 0: 545 return [] 546 547 is_dask = 'dask' in df.__module__ 548 if is_dask: 549 from meerschaum.utils.packages import attempt_import 550 pandas = attempt_import('pandas') 551 df = pandas.DataFrame(get_first_valid_dask_partition(df)) 552 return [ 553 col for col, val in df.iloc[0].items() 554 if not isinstance(val, Hashable) 555 ] 556 557 558def get_json_cols(df: 'pd.DataFrame') -> List[str]: 559 """ 560 Get the columns which contain unhashable objects from a Pandas DataFrame. 561 562 Parameters 563 ---------- 564 df: pd.DataFrame 565 The DataFrame which may contain unhashable objects. 566 567 Returns 568 ------- 569 A list of columns to be encoded as JSON. 570 """ 571 if df is None: 572 return [] 573 574 is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False 575 if is_dask: 576 df = get_first_valid_dask_partition(df) 577 578 if len(df) == 0: 579 return [] 580 581 cols_indices = { 582 col: df[col].first_valid_index() 583 for col in df.columns 584 } 585 return [ 586 col 587 for col, ix in cols_indices.items() 588 if ( 589 ix is not None 590 and 591 not isinstance(df.loc[ix][col], Hashable) 592 ) 593 ] 594 595 596def get_numeric_cols(df: 'pd.DataFrame') -> List[str]: 597 """ 598 Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame. 599 600 Parameters 601 ---------- 602 df: pd.DataFrame 603 The DataFrame which may contain decimal objects. 604 605 Returns 606 ------- 607 A list of columns to treat as numerics. 
608 """ 609 if df is None: 610 return [] 611 from decimal import Decimal 612 is_dask = 'dask' in df.__module__ 613 if is_dask: 614 df = get_first_valid_dask_partition(df) 615 616 if len(df) == 0: 617 return [] 618 619 cols_indices = { 620 col: df[col].first_valid_index() 621 for col in df.columns 622 } 623 return [ 624 col 625 for col, ix in cols_indices.items() 626 if ( 627 ix is not None 628 and 629 isinstance(df.loc[ix][col], Decimal) 630 ) 631 ] 632 633 634def get_uuid_cols(df: 'pd.DataFrame') -> List[str]: 635 """ 636 Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame. 637 638 Parameters 639 ---------- 640 df: pd.DataFrame 641 The DataFrame which may contain UUID objects. 642 643 Returns 644 ------- 645 A list of columns to treat as numerics. 646 """ 647 if df is None: 648 return [] 649 from uuid import UUID 650 is_dask = 'dask' in df.__module__ 651 if is_dask: 652 df = get_first_valid_dask_partition(df) 653 654 if len(df) == 0: 655 return [] 656 657 cols_indices = { 658 col: df[col].first_valid_index() 659 for col in df.columns 660 } 661 return [ 662 col 663 for col, ix in cols_indices.items() 664 if ( 665 ix is not None 666 and 667 isinstance(df.loc[ix][col], UUID) 668 ) 669 ] 670 671 672def enforce_dtypes( 673 df: 'pd.DataFrame', 674 dtypes: Dict[str, str], 675 safe_copy: bool = True, 676 coerce_numeric: bool = True, 677 debug: bool = False, 678) -> 'pd.DataFrame': 679 """ 680 Enforce the `dtypes` dictionary on a DataFrame. 681 682 Parameters 683 ---------- 684 df: pd.DataFrame 685 The DataFrame on which to enforce dtypes. 686 687 dtypes: Dict[str, str] 688 The data types to attempt to enforce on the DataFrame. 689 690 safe_copy: bool, default True 691 If `True`, create a copy before comparing and modifying the dataframes. 692 Setting to `False` may mutate the DataFrames. 693 See `meerschaum.utils.dataframe.filter_unseen_df`. 694 695 coerce_numeric: bool, default True 696 If `True`, convert float and int collisions to numeric. 697 698 debug: bool, default False 699 Verbosity toggle. 700 701 Returns 702 ------- 703 The Pandas DataFrame with the types enforced. 704 """ 705 import json 706 import traceback 707 from decimal import Decimal 708 from meerschaum.utils.debug import dprint 709 from meerschaum.utils.warnings import warn 710 from meerschaum.utils.formatting import pprint 711 from meerschaum.config.static import STATIC_CONFIG 712 from meerschaum.utils.packages import import_pandas 713 from meerschaum.utils.dtypes import ( 714 are_dtypes_equal, 715 to_pandas_dtype, 716 is_dtype_numeric, 717 attempt_cast_to_numeric, 718 attempt_cast_to_uuid, 719 coerce_timezone, 720 ) 721 if safe_copy: 722 df = df.copy() 723 if len(df.columns) == 0: 724 if debug: 725 dprint("Incoming DataFrame has no columns. 
Skipping enforcement...") 726 return df 727 728 pipe_pandas_dtypes = { 729 col: to_pandas_dtype(typ) 730 for col, typ in dtypes.items() 731 } 732 json_cols = [ 733 col 734 for col, typ in dtypes.items() 735 if typ == 'json' 736 ] 737 numeric_cols = [ 738 col 739 for col, typ in dtypes.items() 740 if typ == 'numeric' 741 ] 742 uuid_cols = [ 743 col 744 for col, typ in dtypes.items() 745 if typ == 'uuid' 746 ] 747 df_numeric_cols = get_numeric_cols(df) 748 if debug: 749 dprint("Desired data types:") 750 pprint(dtypes) 751 dprint("Data types for incoming DataFrame:") 752 pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()}) 753 754 if json_cols and len(df) > 0: 755 if debug: 756 dprint(f"Checking columns for JSON encoding: {json_cols}") 757 for col in json_cols: 758 if col in df.columns: 759 try: 760 df[col] = df[col].apply( 761 ( 762 lambda x: ( 763 json.loads(x) 764 if isinstance(x, str) 765 else x 766 ) 767 ) 768 ) 769 except Exception as e: 770 if debug: 771 dprint(f"Unable to parse column '{col}' as JSON:\n{e}") 772 773 if numeric_cols: 774 if debug: 775 dprint(f"Checking for numerics: {numeric_cols}") 776 for col in numeric_cols: 777 if col in df.columns: 778 try: 779 df[col] = df[col].apply(attempt_cast_to_numeric) 780 except Exception as e: 781 if debug: 782 dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}") 783 784 if uuid_cols: 785 if debug: 786 dprint(f"Checking for UUIDs: {uuid_cols}") 787 for col in uuid_cols: 788 if col in df.columns: 789 try: 790 df[col] = df[col].apply(attempt_cast_to_uuid) 791 except Exception as e: 792 if debug: 793 dprint(f"Unable to parse column '{col}' as UUID:\n{e}") 794 795 df_dtypes = {c: str(t) for c, t in df.dtypes.items()} 796 if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes): 797 if debug: 798 dprint("Data types match. Exiting enforcement...") 799 return df 800 801 common_dtypes = {} 802 common_diff_dtypes = {} 803 for col, typ in pipe_pandas_dtypes.items(): 804 if col in df_dtypes: 805 common_dtypes[col] = typ 806 if not are_dtypes_equal(typ, df_dtypes[col]): 807 common_diff_dtypes[col] = df_dtypes[col] 808 809 if debug: 810 dprint("Common columns with different dtypes:") 811 pprint(common_diff_dtypes) 812 813 detected_dt_cols = {} 814 for col, typ in common_diff_dtypes.items(): 815 if 'datetime' in typ and 'datetime' in common_dtypes[col]: 816 df_dtypes[col] = typ 817 detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col]) 818 for col in detected_dt_cols: 819 del common_diff_dtypes[col] 820 821 if debug: 822 dprint("Common columns with different dtypes (after dates):") 823 pprint(common_diff_dtypes) 824 825 if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes): 826 if debug: 827 dprint( 828 "The incoming DataFrame has mostly the same types, skipping enforcement." 829 + "The only detected difference was in the following datetime columns.\n" 830 + " Timezone information may be stripped." 
831 ) 832 pprint(detected_dt_cols) 833 return df 834 835 for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items(): 836 previous_typ = common_dtypes[col] 837 mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ)) 838 explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float') 839 explicitly_numeric = dtypes.get(col, 'numeric') == 'numeric' 840 cast_to_numeric = ( 841 explicitly_numeric 842 or col in df_numeric_cols 843 or (mixed_numeric_types and not explicitly_float) 844 ) and coerce_numeric 845 if cast_to_numeric: 846 common_dtypes[col] = attempt_cast_to_numeric 847 common_diff_dtypes[col] = attempt_cast_to_numeric 848 849 for d in common_diff_dtypes: 850 t = common_dtypes[d] 851 if debug: 852 dprint(f"Casting column {d} to dtype {t}.") 853 try: 854 df[d] = ( 855 df[d].apply(t) 856 if callable(t) 857 else df[d].astype(t) 858 ) 859 except Exception as e: 860 if debug: 861 dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}") 862 if 'int' in str(t).lower(): 863 try: 864 df[d] = df[d].astype('float64').astype(t) 865 except Exception: 866 if debug: 867 dprint(f"Was unable to convert to float then {t}.") 868 return df 869 870 871def get_datetime_bound_from_df( 872 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]], 873 datetime_column: str, 874 minimum: bool = True, 875) -> Union[int, datetime, None]: 876 """ 877 Return the minimum or maximum datetime (or integer) from a DataFrame. 878 879 Parameters 880 ---------- 881 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]] 882 The DataFrame, list, or dict which contains the range axis. 883 884 datetime_column: str 885 The name of the datetime (or int) column. 886 887 minimum: bool 888 Whether to return the minimum (default) or maximum value. 889 890 Returns 891 ------- 892 The minimum or maximum datetime value in the dataframe, or `None`. 893 """ 894 if df is None: 895 return None 896 if not datetime_column: 897 return None 898 899 def compare(a, b): 900 if a is None: 901 return b 902 if b is None: 903 return a 904 if minimum: 905 return a if a < b else b 906 return a if a > b else b 907 908 if isinstance(df, list): 909 if len(df) == 0: 910 return None 911 best_yet = df[0].get(datetime_column, None) 912 for doc in df: 913 val = doc.get(datetime_column, None) 914 best_yet = compare(best_yet, val) 915 return best_yet 916 917 if isinstance(df, dict): 918 if datetime_column not in df: 919 return None 920 best_yet = df[datetime_column][0] 921 for val in df[datetime_column]: 922 best_yet = compare(best_yet, val) 923 return best_yet 924 925 if 'DataFrame' in str(type(df)): 926 from meerschaum.utils.dtypes import are_dtypes_equal 927 pandas = mrsm.attempt_import('pandas') 928 is_dask = 'dask' in df.__module__ 929 930 if datetime_column not in df.columns: 931 return None 932 933 dt_val = ( 934 df[datetime_column].min(skipna=True) 935 if minimum else df[datetime_column].max(skipna=True) 936 ) 937 if is_dask and dt_val is not None: 938 dt_val = dt_val.compute() 939 940 return ( 941 pandas.to_datetime(dt_val).to_pydatetime() 942 if are_dtypes_equal(str(type(dt_val)), 'datetime') 943 else (dt_val if dt_val is not pandas.NA else None) 944 ) 945 946 return None 947 948 949def get_unique_index_values( 950 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]], 951 indices: List[str], 952) -> Dict[str, List[Any]]: 953 """ 954 Return a dictionary of the unique index values in a DataFrame. 
955 956 Parameters 957 ---------- 958 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]] 959 The dataframe (or list or dict) which contains index values. 960 961 indices: List[str] 962 The list of index columns. 963 964 Returns 965 ------- 966 A dictionary mapping indices to unique values. 967 """ 968 if df is None: 969 return {} 970 if 'dataframe' in str(type(df)).lower(): 971 pandas = mrsm.attempt_import('pandas') 972 return { 973 col: list({ 974 (val if val is not pandas.NA else None) 975 for val in df[col].unique() 976 }) 977 for col in indices 978 if col in df.columns 979 } 980 981 unique_indices = defaultdict(lambda: set()) 982 if isinstance(df, list): 983 for doc in df: 984 for index in indices: 985 if index in doc: 986 unique_indices[index].add(doc[index]) 987 988 elif isinstance(df, dict): 989 for index in indices: 990 if index in df: 991 unique_indices[index] = unique_indices[index].union(set(df[index])) 992 993 return {key: list(val) for key, val in unique_indices.items()} 994 995 996def df_is_chunk_generator(df: Any) -> bool: 997 """ 998 Determine whether to treat `df` as a chunk generator. 999 1000 Note this should only be used in a context where generators are expected, 1001 as it will return `True` for any iterable. 1002 1003 Parameters 1004 ---------- 1005 The DataFrame or chunk generator to evaluate. 1006 1007 Returns 1008 ------- 1009 A `bool` indicating whether to treat `df` as a generator. 1010 """ 1011 return ( 1012 not isinstance(df, (dict, list, str)) 1013 and 'DataFrame' not in str(type(df)) 1014 and isinstance(df, (Generator, Iterable, Iterator)) 1015 ) 1016 1017 1018def chunksize_to_npartitions(chunksize: Optional[int]) -> int: 1019 """ 1020 Return the Dask `npartitions` value for a given `chunksize`. 1021 """ 1022 if chunksize == -1: 1023 from meerschaum.config import get_config 1024 chunksize = get_config('system', 'connectors', 'sql', 'chunksize') 1025 if chunksize is None: 1026 return 1 1027 return -1 * chunksize 1028 1029 1030def df_from_literal( 1031 pipe: Optional[mrsm.Pipe] = None, 1032 literal: str = None, 1033 debug: bool = False 1034) -> 'pd.DataFrame': 1035 """ 1036 Construct a dataframe from a literal value, using the pipe's datetime and value column names. 1037 1038 Parameters 1039 ---------- 1040 pipe: Optional['meerschaum.Pipe'], default None 1041 The pipe which will consume the literal value. 1042 1043 Returns 1044 ------- 1045 A 1-row pandas DataFrame from with the current UTC timestamp as the datetime columns 1046 and the literal as the value. 
1047 """ 1048 from meerschaum.utils.packages import import_pandas 1049 from meerschaum.utils.warnings import error, warn 1050 from meerschaum.utils.debug import dprint 1051 1052 if pipe is None or literal is None: 1053 error("Please provide a Pipe and a literal value") 1054 ### this will raise an error if the columns are undefined 1055 dt_name, val_name = pipe.get_columns('datetime', 'value') 1056 1057 val = literal 1058 if isinstance(literal, str): 1059 if debug: 1060 dprint(f"Received literal string: '{literal}'") 1061 import ast 1062 try: 1063 val = ast.literal_eval(literal) 1064 except Exception as e: 1065 warn( 1066 "Failed to parse value from string:\n" + f"{literal}" + 1067 "\n\nWill cast as a string instead."\ 1068 ) 1069 val = literal 1070 1071 from datetime import datetime, timezone 1072 now = datetime.now(timezone.utc).replace(tzinfo=None) 1073 1074 pd = import_pandas() 1075 return pd.DataFrame({dt_name: [now], val_name: [val]}) 1076 1077 1078def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]: 1079 """ 1080 Return the first valid Dask DataFrame partition (if possible). 1081 """ 1082 pdf = None 1083 for partition in ddf.partitions: 1084 try: 1085 pdf = partition.compute() 1086 except Exception as e: 1087 continue 1088 if len(pdf) > 0: 1089 return pdf 1090 _ = mrsm.attempt_import('partd', lazy=False) 1091 return ddf.compute() 1092 1093 1094def query_df( 1095 df: 'pd.DataFrame', 1096 params: Optional[Dict[str, Any]] = None, 1097 begin: Union[datetime, int, None] = None, 1098 end: Union[datetime, int, None] = None, 1099 datetime_column: Optional[str] = None, 1100 select_columns: Optional[List[str]] = None, 1101 omit_columns: Optional[List[str]] = None, 1102 inplace: bool = False, 1103 reset_index: bool = False, 1104 coerce_types: bool = False, 1105 debug: bool = False, 1106) -> 'pd.DataFrame': 1107 """ 1108 Query the dataframe with the params dictionary. 1109 1110 Parameters 1111 ---------- 1112 df: pd.DataFrame 1113 The DataFrame to query against. 1114 1115 params: Optional[Dict[str, Any]], default None 1116 The parameters dictionary to use for the query. 1117 1118 begin: Union[datetime, int, None], default None 1119 If `begin` and `datetime_column` are provided, only return rows with a timestamp 1120 greater than or equal to this value. 1121 1122 end: Union[datetime, int, None], default None 1123 If `begin` and `datetime_column` are provided, only return rows with a timestamp 1124 less than this value. 1125 1126 datetime_column: Optional[str], default None 1127 A `datetime_column` must be provided to use `begin` and `end`. 1128 1129 select_columns: Optional[List[str]], default None 1130 If provided, only return these columns. 1131 1132 omit_columns: Optional[List[str]], default None 1133 If provided, do not include these columns in the result. 1134 1135 inplace: bool, default False 1136 If `True`, modify the DataFrame inplace rather than creating a new DataFrame. 1137 1138 reset_index: bool, default False 1139 If `True`, reset the index in the resulting DataFrame. 1140 1141 coerce_types: bool, default False 1142 If `True`, cast the dataframe and parameters as strings before querying. 1143 1144 Returns 1145 ------- 1146 A Pandas DataFrame query result. 
1147 """ 1148 1149 def _process_select_columns(_df): 1150 if not select_columns: 1151 return 1152 for col in list(_df.columns): 1153 if col not in select_columns: 1154 del _df[col] 1155 1156 def _process_omit_columns(_df): 1157 if not omit_columns: 1158 return 1159 for col in list(_df.columns): 1160 if col in omit_columns: 1161 del _df[col] 1162 1163 if not params and not begin and not end: 1164 if not inplace: 1165 df = df.copy() 1166 _process_select_columns(df) 1167 _process_omit_columns(df) 1168 return df 1169 1170 from meerschaum.utils.debug import dprint 1171 from meerschaum.utils.misc import get_in_ex_params 1172 from meerschaum.utils.warnings import warn 1173 from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null 1174 dateutil_parser = mrsm.attempt_import('dateutil.parser') 1175 pandas = mrsm.attempt_import('pandas') 1176 NA = pandas.NA 1177 1178 if params: 1179 params = params.copy() 1180 for key, val in {k: v for k, v in params.items()}.items(): 1181 if isinstance(val, (list, tuple)): 1182 if None in val: 1183 val = [item for item in val if item is not None] + [NA] 1184 params[key] = val 1185 if coerce_types: 1186 params[key] = [str(x) for x in val] 1187 else: 1188 if value_is_null(val): 1189 val = NA 1190 params[key] = NA 1191 if coerce_types: 1192 params[key] = str(val) 1193 1194 dtypes = {col: str(typ) for col, typ in df.dtypes.items()} 1195 1196 if inplace: 1197 df.fillna(NA, inplace=True) 1198 else: 1199 df = df.infer_objects().fillna(NA) 1200 1201 if isinstance(begin, str): 1202 begin = dateutil_parser.parse(begin) 1203 if isinstance(end, str): 1204 end = dateutil_parser.parse(end) 1205 1206 if begin is not None or end is not None: 1207 if not datetime_column or datetime_column not in df.columns: 1208 warn( 1209 f"The datetime column '{datetime_column}' is not present in the Dataframe, " 1210 + "ignoring begin and end...", 1211 ) 1212 begin, end = None, None 1213 1214 if debug: 1215 dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}") 1216 1217 if datetime_column and (begin is not None or end is not None): 1218 if debug: 1219 dprint("Checking for datetime column compatability.") 1220 1221 from meerschaum.utils.dtypes import are_dtypes_equal, coerce_timezone 1222 df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime') 1223 begin_is_int = are_dtypes_equal(str(type(begin)), 'int') 1224 end_is_int = are_dtypes_equal(str(type(end)), 'int') 1225 1226 if df_is_dt: 1227 df_tz = ( 1228 getattr(df[datetime_column].dt, 'tz', None) 1229 if hasattr(df[datetime_column], 'dt') 1230 else None 1231 ) 1232 1233 if begin_is_int: 1234 begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None) 1235 if debug: 1236 dprint(f"`begin` will be cast to '{begin}'.") 1237 if end_is_int: 1238 end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None) 1239 if debug: 1240 dprint(f"`end` will be cast to '{end}'.") 1241 1242 begin_tz = begin.tzinfo if begin is not None else None 1243 end_tz = end.tzinfo if end is not None else None 1244 1245 if begin_tz is not None or end_tz is not None or df_tz is not None: 1246 begin = coerce_timezone(begin) 1247 end = coerce_timezone(end) 1248 if df_tz is not None: 1249 if debug: 1250 dprint(f"Casting column '{datetime_column}' to UTC...") 1251 df[datetime_column] = coerce_timezone(df[datetime_column]) 1252 dprint(f"Using datetime bounds:\n{begin=}\n{end=}") 1253 1254 in_ex_params = get_in_ex_params(params) 1255 1256 masks = [ 1257 ( 1258 (df[datetime_column] >= begin) 1259 if begin is 
not None and datetime_column 1260 else True 1261 ) & ( 1262 (df[datetime_column] < end) 1263 if end is not None and datetime_column 1264 else True 1265 ) 1266 ] 1267 1268 masks.extend([ 1269 ( 1270 ( 1271 (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals) 1272 if in_vals 1273 else True 1274 ) & ( 1275 ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals) 1276 if ex_vals 1277 else True 1278 ) 1279 ) 1280 for col, (in_vals, ex_vals) in in_ex_params.items() 1281 if col in df.columns 1282 ]) 1283 query_mask = masks[0] 1284 for mask in masks[1:]: 1285 query_mask = query_mask & mask 1286 1287 original_cols = df.columns 1288 1289 ### NOTE: We must cast bool columns to `boolean[pyarrow]` 1290 ### to allow for `<NA>` values. 1291 bool_cols = [ 1292 col 1293 for col, typ in df.dtypes.items() 1294 if are_dtypes_equal(str(typ), 'bool') 1295 ] 1296 for col in bool_cols: 1297 df[col] = df[col].astype('boolean[pyarrow]') 1298 df['__mrsm_mask'] = query_mask.astype('boolean[pyarrow]') 1299 1300 if inplace: 1301 df.where(query_mask, other=NA, inplace=True) 1302 df.dropna(how='all', inplace=True) 1303 result_df = df 1304 else: 1305 result_df = df.where(query_mask, other=NA) 1306 result_df.dropna(how='all', inplace=True) 1307 1308 if '__mrsm_mask' in df.columns: 1309 del df['__mrsm_mask'] 1310 if '__mrsm_mask' in result_df.columns: 1311 del result_df['__mrsm_mask'] 1312 1313 if reset_index: 1314 result_df.reset_index(drop=True, inplace=True) 1315 1316 result_df = enforce_dtypes( 1317 result_df, 1318 dtypes, 1319 safe_copy=False, 1320 debug=debug, 1321 coerce_numeric=False, 1322 ) 1323 1324 if select_columns == ['*']: 1325 select_columns = None 1326 1327 if not select_columns and not omit_columns: 1328 return result_df[original_cols] 1329 1330 _process_select_columns(result_df) 1331 _process_omit_columns(result_df) 1332 1333 return result_df 1334 1335 1336def to_json( 1337 df: 'pd.DataFrame', 1338 safe_copy: bool = True, 1339 orient: str = 'records', 1340 date_format: str = 'iso', 1341 date_unit: str = 'us', 1342 **kwargs: Any 1343) -> str: 1344 """ 1345 Serialize the given dataframe as a JSON string. 1346 1347 Parameters 1348 ---------- 1349 df: pd.DataFrame 1350 The DataFrame to be serialized. 1351 1352 safe_copy: bool, default True 1353 If `False`, modify the DataFrame inplace. 1354 1355 date_format: str, default 'iso' 1356 The default format for timestamps. 1357 1358 date_unit: str, default 'us' 1359 The precision of the timestamps. 1360 1361 Returns 1362 ------- 1363 A JSON string. 1364 """ 1365 from meerschaum.utils.packages import import_pandas 1366 pd = import_pandas() 1367 uuid_cols = get_uuid_cols(df) 1368 if uuid_cols and safe_copy: 1369 df = df.copy() 1370 for col in uuid_cols: 1371 df[col] = df[col].astype(str) 1372 return df.infer_objects(copy=False).fillna(pd.NA).to_json( 1373 date_format=date_format, 1374 date_unit=date_unit, 1375 orient=orient, 1376 **kwargs 1377 )
24def add_missing_cols_to_df( 25 df: 'pd.DataFrame', 26 dtypes: Dict[str, Any], 27) -> 'pd.DataFrame': 28 """ 29 Add columns from the dtypes dictionary as null columns to a new DataFrame. 30 31 Parameters 32 ---------- 33 df: pd.DataFrame 34 The dataframe we should copy and add null columns. 35 36 dtypes: 37 The data types dictionary which may contain keys not present in `df.columns`. 38 39 Returns 40 ------- 41 A new `DataFrame` with the keys from `dtypes` added as null columns. 42 If `df.dtypes` is the same as `dtypes`, then return a reference to `df`. 43 NOTE: This will not ensure that dtypes are enforced! 44 45 Examples 46 -------- 47 >>> import pandas as pd 48 >>> df = pd.DataFrame([{'a': 1}]) 49 >>> dtypes = {'b': 'Int64'} 50 >>> add_missing_cols_to_df(df, dtypes) 51 a b 52 0 1 <NA> 53 >>> add_missing_cols_to_df(df, dtypes).dtypes 54 a int64 55 b Int64 56 dtype: object 57 >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes 58 a int64 59 dtype: object 60 >>> 61 """ 62 if set(df.columns) == set(dtypes): 63 return df 64 65 from meerschaum.utils.packages import attempt_import 66 from meerschaum.utils.dtypes import to_pandas_dtype 67 pandas = attempt_import('pandas') 68 69 def build_series(dtype: str): 70 return pandas.Series([], dtype=to_pandas_dtype(dtype)) 71 72 assign_kwargs = { 73 str(col): build_series(str(typ)) 74 for col, typ in dtypes.items() 75 if col not in df.columns 76 } 77 df_with_cols = df.assign(**assign_kwargs) 78 for col in assign_kwargs: 79 df_with_cols[col] = df_with_cols[col].fillna(pandas.NA) 80 return df_with_cols
Add columns from the dtypes dictionary as null columns to a new DataFrame.
Parameters
- df (pd.DataFrame): The dataframe we should copy and add null columns.
- dtypes:: The data types dictionary which may contain keys not present in
df.columns
.
Returns
- A new
DataFrame
with the keys fromdtypes
added as null columns. - If
df.dtypes
is the same asdtypes
, then return a reference todf
. - NOTE (This will not ensure that dtypes are enforced!):
Examples
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1}])
>>> dtypes = {'b': 'Int64'}
>>> add_missing_cols_to_df(df, dtypes)
a b
0 1 <NA>
>>> add_missing_cols_to_df(df, dtypes)meerschaum.utils.dtypes
a int64
b Int64
dtype: object
>>> add_missing_cols_to_df(df, {'a': 'object'})meerschaum.utils.dtypes
a int64
dtype: object
>>>
83def filter_unseen_df( 84 old_df: 'pd.DataFrame', 85 new_df: 'pd.DataFrame', 86 safe_copy: bool = True, 87 dtypes: Optional[Dict[str, Any]] = None, 88 include_unchanged_columns: bool = False, 89 debug: bool = False, 90) -> 'pd.DataFrame': 91 """ 92 Left join two DataFrames to find the newest unseen data. 93 94 Parameters 95 ---------- 96 old_df: 'pd.DataFrame' 97 The original (target) dataframe. Acts as a filter on the `new_df`. 98 99 new_df: 'pd.DataFrame' 100 The fetched (source) dataframe. Rows that are contained in `old_df` are removed. 101 102 safe_copy: bool, default True 103 If `True`, create a copy before comparing and modifying the dataframes. 104 Setting to `False` may mutate the DataFrames. 105 106 dtypes: Optional[Dict[str, Any]], default None 107 Optionally specify the datatypes of the dataframe. 108 109 include_unchanged_columns: bool, default False 110 If `True`, include columns which haven't changed on rows which have changed. 111 112 debug: bool, default False 113 Verbosity toggle. 114 115 Returns 116 ------- 117 A pandas dataframe of the new, unseen rows in `new_df`. 118 119 Examples 120 -------- 121 ```python 122 >>> import pandas as pd 123 >>> df1 = pd.DataFrame({'a': [1,2]}) 124 >>> df2 = pd.DataFrame({'a': [2,3]}) 125 >>> filter_unseen_df(df1, df2) 126 a 127 0 3 128 129 ``` 130 131 """ 132 if old_df is None: 133 return new_df 134 135 if safe_copy: 136 old_df = old_df.copy() 137 new_df = new_df.copy() 138 139 import json 140 import functools 141 import traceback 142 from decimal import Decimal 143 from uuid import UUID 144 from meerschaum.utils.warnings import warn 145 from meerschaum.utils.packages import import_pandas, attempt_import 146 from meerschaum.utils.dtypes import ( 147 to_pandas_dtype, 148 are_dtypes_equal, 149 attempt_cast_to_numeric, 150 attempt_cast_to_uuid, 151 coerce_timezone, 152 ) 153 pd = import_pandas(debug=debug) 154 is_dask = 'dask' in new_df.__module__ 155 if is_dask: 156 pandas = attempt_import('pandas') 157 _ = attempt_import('partd', lazy=False) 158 dd = attempt_import('dask.dataframe') 159 merge = dd.merge 160 NA = pandas.NA 161 else: 162 merge = pd.merge 163 NA = pd.NA 164 165 new_df_dtypes = dict(new_df.dtypes) 166 old_df_dtypes = dict(old_df.dtypes) 167 168 same_cols = set(new_df.columns) == set(old_df.columns) 169 if not same_cols: 170 new_df = add_missing_cols_to_df(new_df, old_df_dtypes) 171 old_df = add_missing_cols_to_df(old_df, new_df_dtypes) 172 173 new_types_missing_from_old = { 174 col: typ 175 for col, typ in new_df_dtypes.items() 176 if col not in old_df_dtypes 177 } 178 old_types_missing_from_new = { 179 col: typ 180 for col, typ in new_df_dtypes.items() 181 if col not in old_df_dtypes 182 } 183 old_df_dtypes.update(new_types_missing_from_old) 184 new_df_dtypes.update(old_types_missing_from_new) 185 186 ### Edge case: two empty lists cast to DFs. 187 elif len(new_df.columns) == 0: 188 return new_df 189 190 try: 191 ### Order matters when checking equality. 192 new_df = new_df[old_df.columns] 193 194 except Exception as e: 195 warn( 196 "Was not able to cast old columns onto new DataFrame. " + 197 f"Are both DataFrames the same shape? Error:\n{e}", 198 stacklevel = 3, 199 ) 200 return new_df[list(new_df_dtypes.keys())] 201 202 ### assume the old_df knows what it's doing, even if it's technically wrong. 
203 if dtypes is None: 204 dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()} 205 206 dtypes = { 207 col: to_pandas_dtype(typ) 208 for col, typ in dtypes.items() 209 if col in new_df_dtypes and col in old_df_dtypes 210 } 211 for col, typ in new_df_dtypes.items(): 212 if col not in dtypes: 213 dtypes[col] = typ 214 215 dt_dtypes = { 216 col: typ 217 for col, typ in dtypes.items() 218 if are_dtypes_equal(typ, 'datetime') 219 } 220 non_dt_dtypes = { 221 col: typ 222 for col, typ in dtypes.items() 223 if col not in dt_dtypes 224 } 225 226 cast_non_dt_cols = True 227 try: 228 new_df = new_df.astype(non_dt_dtypes) 229 cast_non_dt_cols = False 230 except Exception as e: 231 warn( 232 f"Was not able to cast the new DataFrame to the given dtypes.\n{e}" 233 ) 234 235 cast_dt_cols = True 236 try: 237 for col, typ in dt_dtypes.items(): 238 tz = typ.split(',')[-1].strip() if ',' in typ else None 239 new_df[col] = coerce_timezone(pd.to_datetime(new_df[col], utc=True)) 240 cast_dt_cols = False 241 except Exception as e: 242 warn(f"Could not cast datetime columns:\n{e}") 243 244 cast_cols = cast_dt_cols or cast_non_dt_cols 245 246 new_numeric_cols_existing = get_numeric_cols(new_df) 247 old_numeric_cols = get_numeric_cols(old_df) 248 for col, typ in {k: v for k, v in dtypes.items()}.items(): 249 if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')): 250 new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float') 251 new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int') 252 new_is_numeric = col in new_numeric_cols_existing 253 old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float') 254 old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int') 255 old_is_numeric = col in old_numeric_cols 256 257 if ( 258 (new_is_float or new_is_int or new_is_numeric) 259 and 260 (old_is_float or old_is_int or old_is_numeric) 261 ): 262 dtypes[col] = attempt_cast_to_numeric 263 cast_cols = True 264 continue 265 266 ### Fallback to object if the types don't match. 267 warn( 268 f"Detected different types for '{col}' " 269 + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), " 270 + "falling back to 'object'..." 
271 ) 272 dtypes[col] = 'object' 273 cast_cols = True 274 275 if cast_cols: 276 for col, dtype in dtypes.items(): 277 if col in new_df.columns: 278 try: 279 new_df[col] = ( 280 new_df[col].astype(dtype) 281 if not callable(dtype) 282 else new_df[col].apply(dtype) 283 ) 284 except Exception as e: 285 warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}") 286 287 serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str) 288 new_json_cols = get_json_cols(new_df) 289 old_json_cols = get_json_cols(old_df) 290 json_cols = set(new_json_cols + old_json_cols) 291 for json_col in old_json_cols: 292 old_df[json_col] = old_df[json_col].apply(serializer) 293 for json_col in new_json_cols: 294 new_df[json_col] = new_df[json_col].apply(serializer) 295 296 new_numeric_cols = get_numeric_cols(new_df) 297 numeric_cols = set(new_numeric_cols + old_numeric_cols) 298 for numeric_col in old_numeric_cols: 299 old_df[numeric_col] = old_df[numeric_col].apply( 300 lambda x: f'{x:f}' if isinstance(x, Decimal) else x 301 ) 302 for numeric_col in new_numeric_cols: 303 new_df[numeric_col] = new_df[numeric_col].apply( 304 lambda x: f'{x:f}' if isinstance(x, Decimal) else x 305 ) 306 307 old_dt_cols = [ 308 col 309 for col, typ in old_df.dtypes.items() 310 if are_dtypes_equal(str(typ), 'datetime') 311 ] 312 for col in old_dt_cols: 313 old_df[col] = coerce_timezone(old_df[col]) 314 315 new_dt_cols = [ 316 col 317 for col, typ in old_df.dtypes.items() 318 if are_dtypes_equal(str(typ), 'datetime') 319 ] 320 for col in new_dt_cols: 321 new_df[col] = coerce_timezone(new_df[col]) 322 323 old_uuid_cols = get_uuid_cols(old_df) 324 new_uuid_cols = get_uuid_cols(new_df) 325 uuid_cols = set(new_uuid_cols + old_uuid_cols) 326 joined_df = merge( 327 new_df.infer_objects(copy=False).fillna(NA), 328 old_df.infer_objects(copy=False).fillna(NA), 329 how='left', 330 on=None, 331 indicator=True, 332 ) 333 changed_rows_mask = (joined_df['_merge'] == 'left_only') 334 new_cols = list(new_df_dtypes) 335 delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True) 336 337 for json_col in json_cols: 338 if json_col not in delta_df.columns: 339 continue 340 try: 341 delta_df[json_col] = delta_df[json_col].apply(json.loads) 342 except Exception: 343 warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}") 344 345 for numeric_col in numeric_cols: 346 if numeric_col not in delta_df.columns: 347 continue 348 try: 349 delta_df[numeric_col] = delta_df[numeric_col].apply(attempt_cast_to_numeric) 350 except Exception: 351 warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}") 352 353 for uuid_col in uuid_cols: 354 if uuid_col not in delta_df.columns: 355 continue 356 try: 357 delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid) 358 except Exception: 359 warn(f"Unable to parse numeric column '{uuid_col}':\n{traceback.format_exc()}") 360 361 return delta_df
Left join two DataFrames to find the newest unseen data.
Parameters
- old_df ('pd.DataFrame'):
The original (target) dataframe. Acts as a filter on the
new_df
. - new_df ('pd.DataFrame'):
The fetched (source) dataframe. Rows that are contained in
old_df
are removed. - safe_copy (bool, default True):
If
True
, create a copy before comparing and modifying the dataframes. Setting toFalse
may mutate the DataFrames. - dtypes (Optional[Dict[str, Any]], default None): Optionally specify the datatypes of the dataframe.
- include_unchanged_columns (bool, default False):
If
True
, include columns which haven't changed on rows which have changed. - debug (bool, default False): Verbosity toggle.
Returns
- A pandas dataframe of the new, unseen rows in
new_df
.
Examples
>>> import pandas as pd
>>> df1 = pd.DataFrame({'a': [1,2]})
>>> df2 = pd.DataFrame({'a': [2,3]})
>>> filter_unseen_df(df1, df2)
a
0 3
364def parse_df_datetimes( 365 df: 'pd.DataFrame', 366 ignore_cols: Optional[Iterable[str]] = None, 367 chunksize: Optional[int] = None, 368 dtype_backend: str = 'numpy_nullable', 369 debug: bool = False, 370) -> 'pd.DataFrame': 371 """ 372 Parse a pandas DataFrame for datetime columns and cast as datetimes. 373 374 Parameters 375 ---------- 376 df: pd.DataFrame 377 The pandas DataFrame to parse. 378 379 ignore_cols: Optional[Iterable[str]], default None 380 If provided, do not attempt to coerce these columns as datetimes. 381 382 chunksize: Optional[int], default None 383 If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe. 384 385 dtype_backend: str, default 'numpy_nullable' 386 If `df` is not a DataFrame and new one needs to be constructed, 387 use this as the datatypes backend. 388 Accepted values are 'numpy_nullable' and 'pyarrow'. 389 390 debug: bool, default False 391 Verbosity toggle. 392 393 Returns 394 ------- 395 A new pandas DataFrame with the determined datetime columns 396 (usually ISO strings) cast as datetimes. 397 398 Examples 399 -------- 400 ```python 401 >>> import pandas as pd 402 >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 403 >>> df.dtypes 404 a object 405 dtype: object 406 >>> df = parse_df_datetimes(df) 407 >>> df.dtypes 408 a datetime64[ns] 409 dtype: object 410 411 ``` 412 413 """ 414 from meerschaum.utils.packages import import_pandas, attempt_import 415 from meerschaum.utils.debug import dprint 416 from meerschaum.utils.warnings import warn 417 from meerschaum.utils.misc import items_str 418 import traceback 419 pd = import_pandas() 420 pandas = attempt_import('pandas') 421 pd_name = pd.__name__ 422 using_dask = 'dask' in pd_name 423 df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__) 424 dask_dataframe = None 425 if using_dask or df_is_dask: 426 npartitions = chunksize_to_npartitions(chunksize) 427 dask_dataframe = attempt_import('dask.dataframe') 428 429 ### if df is a dict, build DataFrame 430 if isinstance(df, pandas.DataFrame): 431 pdf = df 432 elif df_is_dask and isinstance(df, dask_dataframe.DataFrame): 433 pdf = get_first_valid_dask_partition(df) 434 else: 435 if debug: 436 dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...") 437 438 if using_dask: 439 if isinstance(df, list): 440 keys = set() 441 for doc in df: 442 for key in doc: 443 keys.add(key) 444 df = pd.DataFrame.from_dict( 445 { 446 k: [ 447 doc.get(k, None) 448 for doc in df 449 ] for k in keys 450 }, 451 npartitions = npartitions, 452 ) 453 elif isinstance(df, dict): 454 df = pd.DataFrame.from_dict(df, npartitions=npartitions) 455 elif 'pandas.core.frame.DataFrame' in str(type(df)): 456 df = pd.from_pandas(df, npartitions=npartitions) 457 else: 458 raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.") 459 pandas = attempt_import('pandas') 460 pdf = get_first_valid_dask_partition(df) 461 462 else: 463 df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend) 464 pdf = df 465 466 ### skip parsing if DataFrame is empty 467 if len(pdf) == 0: 468 if debug: 469 dprint(f"df is empty. 
Returning original DataFrame without casting datetime columns...") 470 return df 471 472 ignore_cols = set( 473 (ignore_cols or []) + [ 474 col 475 for col, dtype in pdf.dtypes.items() 476 if 'datetime' in str(dtype) 477 ] 478 ) 479 cols_to_inspect = [col for col in pdf.columns if col not in ignore_cols] 480 481 if len(cols_to_inspect) == 0: 482 if debug: 483 dprint(f"All columns are ignored, skipping datetime detection...") 484 return df 485 486 ### apply regex to columns to determine which are ISO datetimes 487 iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+' 488 dt_mask = pdf[cols_to_inspect].astype(str).apply( 489 lambda s: s.str.match(iso_dt_regex).all() 490 ) 491 492 ### list of datetime column names 493 datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]] 494 if not datetime_cols: 495 if debug: 496 dprint("No columns detected as datetimes, returning...") 497 return df 498 499 if debug: 500 dprint("Converting columns to datetimes: " + str(datetime_cols)) 501 502 try: 503 if not using_dask: 504 df[datetime_cols] = df[datetime_cols].apply(pd.to_datetime, utc=True) 505 else: 506 df[datetime_cols] = df[datetime_cols].apply( 507 pd.to_datetime, 508 utc=True, 509 axis=1, 510 meta={ 511 col: 'datetime64[ns]' 512 for col in datetime_cols 513 } 514 ) 515 except Exception: 516 warn( 517 f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n" 518 + f"{traceback.format_exc()}" 519 ) 520 521 for dt in datetime_cols: 522 try: 523 df[dt] = df[dt].dt.tz_localize(None) 524 except Exception: 525 warn(f"Unable to convert column '{dt}' to naive datetime:\n{traceback.format_exc()}") 526 527 return df
Parse a pandas DataFrame for datetime columns and cast as datetimes.
Parameters
- df (pd.DataFrame): The pandas DataFrame to parse.
- ignore_cols (Optional[Iterable[str]], default None): If provided, do not attempt to coerce these columns as datetimes.
- chunksize (Optional[int], default None):
If the pandas implementation is
'dask'
, use this chunksize for the distributed dataframe. - dtype_backend (str, default 'numpy_nullable'):
If
df
is not a DataFrame and new one needs to be constructed, use this as the datatypes backend. Accepted values are 'numpy_nullable' and 'pyarrow'. - debug (bool, default False): Verbosity toggle.
Returns
- A new pandas DataFrame with the determined datetime columns
- (usually ISO strings) cast as datetimes.
Examples
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
>>> df.dtypes
a object
dtype: object
>>> df = parse_df_datetimes(df)
>>> df.dtypes
a datetime64[ns]
dtype: object
530def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]: 531 """ 532 Get the columns which contain unhashable objects from a Pandas DataFrame. 533 534 Parameters 535 ---------- 536 df: pd.DataFrame 537 The DataFrame which may contain unhashable objects. 538 539 Returns 540 ------- 541 A list of columns. 542 """ 543 if df is None: 544 return [] 545 if len(df) == 0: 546 return [] 547 548 is_dask = 'dask' in df.__module__ 549 if is_dask: 550 from meerschaum.utils.packages import attempt_import 551 pandas = attempt_import('pandas') 552 df = pandas.DataFrame(get_first_valid_dask_partition(df)) 553 return [ 554 col for col, val in df.iloc[0].items() 555 if not isinstance(val, Hashable) 556 ]
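An illustrative sketch (note that only the first row is inspected, per the implementation above):

```python
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1, 'b': {'nested': 'dict'}, 'c': [1, 2]}])
>>> get_unhashable_cols(df)
['b', 'c']
```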
def get_json_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects (to be encoded as JSON)
    from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns to be encoded as JSON.
    """
    if df is None:
        return []

    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            not isinstance(df.loc[ix][col], Hashable)
        )
    ]
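Unlike `get_unhashable_cols()`, this checks each column's first *valid* index, so leading nulls do not mask JSON-like values. An illustrative sketch:

```python
>>> import pandas as pd
>>> df = pd.DataFrame({'a': [None, {'b': 1}], 'c': [1, 2]})
>>> get_json_cols(df)
['a']
```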
def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain decimal objects.

    Returns
    -------
    A list of columns to treat as numerics.
    """
    if df is None:
        return []
    from decimal import Decimal
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance(df.loc[ix][col], Decimal)
        )
    ]
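For illustration: only columns whose first valid value is a `decimal.Decimal` qualify; ordinary floats do not.

```python
>>> import pandas as pd
>>> from decimal import Decimal
>>> df = pd.DataFrame({'a': [Decimal('1.1')], 'b': [1.1]})
>>> get_numeric_cols(df)
['a']
```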
def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain UUID objects.

    Returns
    -------
    A list of columns to treat as UUIDs.
    """
    if df is None:
        return []
    from uuid import UUID
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance(df.loc[ix][col], UUID)
        )
    ]
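For illustration, only actual `uuid.UUID` objects are detected; UUID-formatted strings are not:

```python
>>> import pandas as pd
>>> from uuid import UUID
>>> df = pd.DataFrame({
...     'a': [UUID('12345678-1234-5678-1234-567812345678')],
...     'b': ['12345678-1234-5678-1234-567812345678'],
... })
>>> get_uuid_cols(df)
['a']
```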
def enforce_dtypes(
    df: 'pd.DataFrame',
    dtypes: Dict[str, str],
    safe_copy: bool = True,
    coerce_numeric: bool = True,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Enforce the `dtypes` dictionary on a DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame on which to enforce dtypes.

    dtypes: Dict[str, str]
        The data types to attempt to enforce on the DataFrame.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    coerce_numeric: bool, default True
        If `True`, convert float and int collisions to numeric.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The Pandas DataFrame with the types enforced.
    """
    import json
    import traceback
    from decimal import Decimal
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.formatting import pprint
    from meerschaum.config.static import STATIC_CONFIG
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.dtypes import (
        are_dtypes_equal,
        to_pandas_dtype,
        is_dtype_numeric,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        coerce_timezone,
    )
    if safe_copy:
        df = df.copy()
    if len(df.columns) == 0:
        if debug:
            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
        return df

    pipe_pandas_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
    }
    json_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'json'
    ]
    numeric_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'numeric'
    ]
    uuid_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'uuid'
    ]
    df_numeric_cols = get_numeric_cols(df)
    if debug:
        dprint("Desired data types:")
        pprint(dtypes)
        dprint("Data types for incoming DataFrame:")
        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})

    if json_cols and len(df) > 0:
        if debug:
            dprint(f"Checking columns for JSON encoding: {json_cols}")
        for col in json_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        (
                            lambda x: (
                                json.loads(x)
                                if isinstance(x, str)
                                else x
                            )
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")

    if numeric_cols:
        if debug:
            dprint(f"Checking for numerics: {numeric_cols}")
        for col in numeric_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_numeric)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")

    if uuid_cols:
        if debug:
            dprint(f"Checking for UUIDs: {uuid_cols}")
        for col in uuid_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_uuid)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")

    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint("Data types match. Exiting enforcement...")
        return df

    common_dtypes = {}
    common_diff_dtypes = {}
    for col, typ in pipe_pandas_dtypes.items():
        if col in df_dtypes:
            common_dtypes[col] = typ
            if not are_dtypes_equal(typ, df_dtypes[col]):
                common_diff_dtypes[col] = df_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes:")
        pprint(common_diff_dtypes)

    detected_dt_cols = {}
    for col, typ in common_diff_dtypes.items():
        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
            df_dtypes[col] = typ
            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
    for col in detected_dt_cols:
        del common_diff_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes (after dates):")
        pprint(common_diff_dtypes)

    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint(
                "The incoming DataFrame has mostly the same types, skipping enforcement. "
                + "The only detected difference was in the following datetime columns.\n"
                + "    Timezone information may be stripped."
            )
            pprint(detected_dt_cols)
        return df

    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
        previous_typ = common_dtypes[col]
        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
        explicitly_numeric = dtypes.get(col, 'numeric') == 'numeric'
        cast_to_numeric = (
            explicitly_numeric
            or col in df_numeric_cols
            or (mixed_numeric_types and not explicitly_float)
        ) and coerce_numeric
        if cast_to_numeric:
            common_dtypes[col] = attempt_cast_to_numeric
            common_diff_dtypes[col] = attempt_cast_to_numeric

    for d in common_diff_dtypes:
        t = common_dtypes[d]
        if debug:
            dprint(f"Casting column {d} to dtype {t}.")
        try:
            df[d] = (
                df[d].apply(t)
                if callable(t)
                else df[d].astype(t)
            )
        except Exception as e:
            if debug:
                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
            if 'int' in str(t).lower():
                try:
                    df[d] = df[d].astype('float64').astype(t)
                except Exception:
                    if debug:
                        dprint(f"Was unable to convert to float then {t}.")
    return df
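A minimal sketch of the special `'json'` and `'numeric'` handling (assuming `attempt_cast_to_numeric` falls back to `decimal.Decimal`, per its usage above):

```python
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['1.000000001'], 'b': ['{"c": 1}']})
>>> df = enforce_dtypes(df, {'a': 'numeric', 'b': 'json'})
>>> df['a'][0]
Decimal('1.000000001')
>>> df['b'][0]
{'c': 1}
```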
def get_datetime_bound_from_df(
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
    datetime_column: str,
    minimum: bool = True,
) -> Union[int, datetime, None]:
    """
    Return the minimum or maximum datetime (or integer) from a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
        The DataFrame, list, or dict which contains the range axis.

    datetime_column: str
        The name of the datetime (or int) column.

    minimum: bool, default True
        Whether to return the minimum (default) or maximum value.

    Returns
    -------
    The minimum or maximum datetime value in the dataframe, or `None`.
    """
    if df is None:
        return None
    if not datetime_column:
        return None

    def compare(a, b):
        if a is None:
            return b
        if b is None:
            return a
        if minimum:
            return a if a < b else b
        return a if a > b else b

    if isinstance(df, list):
        if len(df) == 0:
            return None
        best_yet = df[0].get(datetime_column, None)
        for doc in df:
            val = doc.get(datetime_column, None)
            best_yet = compare(best_yet, val)
        return best_yet

    if isinstance(df, dict):
        if datetime_column not in df:
            return None
        best_yet = df[datetime_column][0]
        for val in df[datetime_column]:
            best_yet = compare(best_yet, val)
        return best_yet

    if 'DataFrame' in str(type(df)):
        from meerschaum.utils.dtypes import are_dtypes_equal
        pandas = mrsm.attempt_import('pandas')
        is_dask = 'dask' in df.__module__

        if datetime_column not in df.columns:
            return None

        dt_val = (
            df[datetime_column].min(skipna=True)
            if minimum else df[datetime_column].max(skipna=True)
        )
        if is_dask and dt_val is not None:
            dt_val = dt_val.compute()

        return (
            pandas.to_datetime(dt_val).to_pydatetime()
            if are_dtypes_equal(str(type(dt_val)), 'datetime')
            else (dt_val if dt_val is not pandas.NA else None)
        )

    return None
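The list and dict branches work without pandas; for example:

```python
>>> from datetime import datetime
>>> docs = [
...     {'dt': datetime(2024, 1, 1)},
...     {'dt': datetime(2024, 1, 2)},
... ]
>>> get_datetime_bound_from_df(docs, 'dt')
datetime.datetime(2024, 1, 1, 0, 0)
>>> get_datetime_bound_from_df(docs, 'dt', minimum=False)
datetime.datetime(2024, 1, 2, 0, 0)
```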
def get_unique_index_values(
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
    indices: List[str],
) -> Dict[str, List[Any]]:
    """
    Return a dictionary of the unique index values in a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
        The dataframe (or list or dict) which contains index values.

    indices: List[str]
        The list of index columns.

    Returns
    -------
    A dictionary mapping indices to unique values.
    """
    if df is None:
        return {}
    if 'dataframe' in str(type(df)).lower():
        pandas = mrsm.attempt_import('pandas')
        return {
            col: list({
                (val if val is not pandas.NA else None)
                for val in df[col].unique()
            })
            for col in indices
            if col in df.columns
        }

    unique_indices = defaultdict(lambda: set())
    if isinstance(df, list):
        for doc in df:
            for index in indices:
                if index in doc:
                    unique_indices[index].add(doc[index])

    elif isinstance(df, dict):
        for index in indices:
            if index in df:
                unique_indices[index] = unique_indices[index].union(set(df[index]))

    return {key: list(val) for key, val in unique_indices.items()}
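An illustrative sketch with a list of documents (the ordering of each value list is not guaranteed, since values pass through a set):

```python
>>> docs = [
...     {'id': 1, 'color': 'red'},
...     {'id': 2, 'color': 'red'},
... ]
>>> get_unique_index_values(docs, ['id', 'color'])
{'id': [1, 2], 'color': ['red']}
```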
def df_is_chunk_generator(df: Any) -> bool:
    """
    Determine whether to treat `df` as a chunk generator.

    Note this should only be used in a context where generators are expected,
    as it will return `True` for any iterable.

    Parameters
    ----------
    df: Any
        The DataFrame or chunk generator to evaluate.

    Returns
    -------
    A `bool` indicating whether to treat `df` as a generator.
    """
    return (
        not isinstance(df, (dict, list, str))
        and 'DataFrame' not in str(type(df))
        and isinstance(df, (Generator, Iterable, Iterator))
    )
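For example:

```python
>>> import pandas as pd
>>> df_is_chunk_generator(pd.DataFrame())
False
>>> df_is_chunk_generator([{'a': 1}])
False
>>> df_is_chunk_generator(chunk for chunk in [pd.DataFrame()])
True
```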
def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
    """
    Return the Dask `npartitions` value for a given `chunksize`.
    """
    if chunksize == -1:
        from meerschaum.config import get_config
        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
    if chunksize is None:
        return 1
    return -1 * chunksize
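Judging from the code, `None` maps to a single partition, and a positive chunksize is returned negated (a sentinel presumably interpreted downstream as rows-per-partition rather than a partition count):

```python
>>> chunksize_to_npartitions(None)
1
>>> chunksize_to_npartitions(1000)
-1000
```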
def df_from_literal(
    pipe: Optional[mrsm.Pipe] = None,
    literal: Optional[str] = None,
    debug: bool = False
) -> 'pd.DataFrame':
    """
    Construct a dataframe from a literal value, using the pipe's datetime and value column names.

    Parameters
    ----------
    pipe: Optional['meerschaum.Pipe'], default None
        The pipe which will consume the literal value.

    literal: Optional[str], default None
        The literal value to parse and insert into the value column.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
    and the literal as the value.
    """
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint

    if pipe is None or literal is None:
        error("Please provide a Pipe and a literal value")
    ### this will raise an error if the columns are undefined
    dt_name, val_name = pipe.get_columns('datetime', 'value')

    val = literal
    if isinstance(literal, str):
        if debug:
            dprint(f"Received literal string: '{literal}'")
        import ast
        try:
            val = ast.literal_eval(literal)
        except Exception:
            warn(
                "Failed to parse value from string:\n" + f"{literal}" +
                "\n\nWill cast as a string instead."
            )
            val = literal

    from datetime import datetime, timezone
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    pd = import_pandas()
    return pd.DataFrame({dt_name: [now], val_name: [val]})
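A hypothetical sketch (the pipe here is illustrative; any pipe with registered `datetime` and `value` columns would do):

```python
>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe(
...     'example', 'literal',
...     columns={'datetime': 'dt', 'value': 'val'},
... )
>>> df = df_from_literal(pipe, '42')
>>> list(df.columns)
['dt', 'val']
>>> df['val'][0]
42
```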
def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
    """
    Return the first valid Dask DataFrame partition (if possible).
    """
    pdf = None
    for partition in ddf.partitions:
        try:
            pdf = partition.compute()
        except Exception:
            continue
        if len(pdf) > 0:
            return pdf
    _ = mrsm.attempt_import('partd', lazy=False)
    return ddf.compute()
def query_df(
    df: 'pd.DataFrame',
    params: Optional[Dict[str, Any]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    datetime_column: Optional[str] = None,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    inplace: bool = False,
    reset_index: bool = False,
    coerce_types: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Query the dataframe with the params dictionary.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to query against.

    params: Optional[Dict[str, Any]], default None
        The parameters dictionary to use for the query.

    begin: Union[datetime, int, None], default None
        If `begin` and `datetime_column` are provided, only return rows with a timestamp
        greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If `end` and `datetime_column` are provided, only return rows with a timestamp
        less than this value.

    datetime_column: Optional[str], default None
        A `datetime_column` must be provided to use `begin` and `end`.

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, do not include these columns in the result.

    inplace: bool, default False
        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.

    reset_index: bool, default False
        If `True`, reset the index in the resulting DataFrame.

    coerce_types: bool, default False
        If `True`, cast the dataframe and parameters as strings before querying.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A Pandas DataFrame query result.
    """

    def _process_select_columns(_df):
        if not select_columns:
            return
        for col in list(_df.columns):
            if col not in select_columns:
                del _df[col]

    def _process_omit_columns(_df):
        if not omit_columns:
            return
        for col in list(_df.columns):
            if col in omit_columns:
                del _df[col]

    if not params and not begin and not end:
        if not inplace:
            df = df.copy()
        _process_select_columns(df)
        _process_omit_columns(df)
        return df

    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import get_in_ex_params
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    pandas = mrsm.attempt_import('pandas')
    NA = pandas.NA

    if params:
        params = params.copy()
        for key, val in {k: v for k, v in params.items()}.items():
            if isinstance(val, (list, tuple)):
                if None in val:
                    val = [item for item in val if item is not None] + [NA]
                    params[key] = val
                if coerce_types:
                    params[key] = [str(x) for x in val]
            else:
                if value_is_null(val):
                    val = NA
                    params[key] = NA
                if coerce_types:
                    params[key] = str(val)

    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}

    if inplace:
        df.fillna(NA, inplace=True)
    else:
        df = df.infer_objects().fillna(NA)

    if isinstance(begin, str):
        begin = dateutil_parser.parse(begin)
    if isinstance(end, str):
        end = dateutil_parser.parse(end)

    if begin is not None or end is not None:
        if not datetime_column or datetime_column not in df.columns:
            warn(
                f"The datetime column '{datetime_column}' is not present in the DataFrame, "
                + "ignoring begin and end...",
            )
            begin, end = None, None

    if debug:
        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")

    if datetime_column and (begin is not None or end is not None):
        if debug:
            dprint("Checking for datetime column compatibility.")

        from meerschaum.utils.dtypes import are_dtypes_equal, coerce_timezone
        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
        end_is_int = are_dtypes_equal(str(type(end)), 'int')

        if df_is_dt:
            df_tz = (
                getattr(df[datetime_column].dt, 'tz', None)
                if hasattr(df[datetime_column], 'dt')
                else None
            )

            if begin_is_int:
                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`begin` will be cast to '{begin}'.")
            if end_is_int:
                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`end` will be cast to '{end}'.")

            begin_tz = begin.tzinfo if begin is not None else None
            end_tz = end.tzinfo if end is not None else None

            if begin_tz is not None or end_tz is not None or df_tz is not None:
                begin = coerce_timezone(begin)
                end = coerce_timezone(end)
                if df_tz is not None:
                    if debug:
                        dprint(f"Casting column '{datetime_column}' to UTC...")
                    df[datetime_column] = coerce_timezone(df[datetime_column])
            if debug:
                dprint(f"Using datetime bounds:\n{begin=}\n{end=}")

    in_ex_params = get_in_ex_params(params)

    masks = [
        (
            (df[datetime_column] >= begin)
            if begin is not None and datetime_column
            else True
        ) & (
            (df[datetime_column] < end)
            if end is not None and datetime_column
            else True
        )
    ]

    masks.extend([
        (
            (
                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
                if in_vals
                else True
            ) & (
                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
                if ex_vals
                else True
            )
        )
        for col, (in_vals, ex_vals) in in_ex_params.items()
        if col in df.columns
    ])
    query_mask = masks[0]
    for mask in masks[1:]:
        query_mask = query_mask & mask

    original_cols = df.columns

    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
    ### to allow for `<NA>` values.
    bool_cols = [
        col
        for col, typ in df.dtypes.items()
        if are_dtypes_equal(str(typ), 'bool')
    ]
    for col in bool_cols:
        df[col] = df[col].astype('boolean[pyarrow]')
    df['__mrsm_mask'] = query_mask.astype('boolean[pyarrow]')

    if inplace:
        df.where(query_mask, other=NA, inplace=True)
        df.dropna(how='all', inplace=True)
        result_df = df
    else:
        result_df = df.where(query_mask, other=NA)
        result_df.dropna(how='all', inplace=True)

    if '__mrsm_mask' in df.columns:
        del df['__mrsm_mask']
    if '__mrsm_mask' in result_df.columns:
        del result_df['__mrsm_mask']

    if reset_index:
        result_df.reset_index(drop=True, inplace=True)

    result_df = enforce_dtypes(
        result_df,
        dtypes,
        safe_copy=False,
        debug=debug,
        coerce_numeric=False,
    )

    if select_columns == ['*']:
        select_columns = None

    if not select_columns and not omit_columns:
        return result_df[original_cols]

    _process_select_columns(result_df)
    _process_omit_columns(result_df)

    return result_df
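A small end-to-end sketch combining `params` with a datetime bound (assuming scalar param values are matched via inclusion, per `get_in_ex_params`):

```python
>>> import pandas as pd
>>> df = pd.DataFrame({
...     'dt': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']),
...     'color': ['red', 'green', 'red'],
... })
>>> result = query_df(
...     df,
...     params={'color': 'red'},
...     begin='2024-01-02',
...     datetime_column='dt',
...     reset_index=True,
... )
>>> len(result)
1
>>> list(result['color'])
['red']
```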
def to_json(
    df: 'pd.DataFrame',
    safe_copy: bool = True,
    orient: str = 'records',
    date_format: str = 'iso',
    date_unit: str = 'us',
    **kwargs: Any
) -> str:
    """
    Serialize the given dataframe as a JSON string.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be serialized.

    safe_copy: bool, default True
        If `False`, modify the DataFrame inplace.

    orient: str, default 'records'
        The JSON layout to pass to `DataFrame.to_json()`.

    date_format: str, default 'iso'
        The default format for timestamps.

    date_unit: str, default 'us'
        The precision of the timestamps.

    Returns
    -------
    A JSON string.
    """
    from meerschaum.utils.packages import import_pandas
    pd = import_pandas()
    uuid_cols = get_uuid_cols(df)
    if uuid_cols and safe_copy:
        df = df.copy()
    for col in uuid_cols:
        df[col] = df[col].astype(str)
    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
        date_format=date_format,
        date_unit=date_unit,
        orient=orient,
        **kwargs
    )
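For instance, UUID columns are stringified and nulls are serialized as JSON `null`:

```python
>>> import pandas as pd
>>> from uuid import UUID
>>> df = pd.DataFrame([{
...     'a': 1,
...     'b': None,
...     'u': UUID('12345678-1234-5678-1234-567812345678'),
... }])
>>> to_json(df)
'[{"a":1,"b":null,"u":"12345678-1234-5678-1234-567812345678"}]'
```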