meerschaum.utils.dataframe
Utility functions for working with DataFrames.
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Utility functions for working with DataFrames.
"""

from __future__ import annotations

import pathlib
from datetime import datetime, timezone
from collections import defaultdict

import meerschaum as mrsm
from meerschaum.utils.typing import (
    Optional, Dict, Any, List, Hashable, Generator,
    Iterator, Iterable, Union, TYPE_CHECKING,
)

if TYPE_CHECKING:
    pd, dask = mrsm.attempt_import('pandas', 'dask')


def add_missing_cols_to_df(
    df: 'pd.DataFrame',
    dtypes: Dict[str, Any],
) -> 'pd.DataFrame':
    """
    Add columns from the dtypes dictionary as null columns to a new DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The dataframe to copy and to which the null columns are added.

    dtypes: Dict[str, Any]
        The data types dictionary, which may contain keys not present in `df.columns`.

    Returns
    -------
    A new `DataFrame` with the keys from `dtypes` added as null columns.
    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
    NOTE: This will not ensure that dtypes are enforced!

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': 1}])
    >>> dtypes = {'b': 'Int64'}
    >>> add_missing_cols_to_df(df, dtypes)
       a     b
    0  1  <NA>
    >>> add_missing_cols_to_df(df, dtypes).dtypes
    a    int64
    b    Int64
    dtype: object
    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
    a    int64
    dtype: object
    >>>
    """
    if set(df.columns) == set(dtypes):
        return df

    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.dtypes import to_pandas_dtype
    pandas = attempt_import('pandas')

    def build_series(dtype: str):
        return pandas.Series([], dtype=to_pandas_dtype(dtype))

    assign_kwargs = {
        str(col): build_series(str(typ))
        for col, typ in dtypes.items()
        if col not in df.columns
    }
    df_with_cols = df.assign(**assign_kwargs)
    for col in assign_kwargs:
        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
    return df_with_cols

def filter_unseen_df(
    old_df: 'pd.DataFrame',
    new_df: 'pd.DataFrame',
    safe_copy: bool = True,
    dtypes: Optional[Dict[str, Any]] = None,
    include_unchanged_columns: bool = False,
    coerce_mixed_numerics: bool = True,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Left join two DataFrames to find the newest unseen data.

    Parameters
    ----------
    old_df: 'pd.DataFrame'
        The original (target) dataframe. Acts as a filter on the `new_df`.

    new_df: 'pd.DataFrame'
        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.

    dtypes: Optional[Dict[str, Any]], default None
        Optionally specify the datatypes of the dataframe.

    include_unchanged_columns: bool, default False
        If `True`, include columns which haven't changed on rows which have changed.

    coerce_mixed_numerics: bool, default True
        If `True`, cast mixed integer and float columns between the old and new dataframes
        into numeric values (`decimal.Decimal`).

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas dataframe of the new, unseen rows in `new_df`.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df1 = pd.DataFrame({'a': [1,2]})
    >>> df2 = pd.DataFrame({'a': [2,3]})
    >>> filter_unseen_df(df1, df2)
       a
    0  3

    ```
    """
    if old_df is None:
        return new_df

    if safe_copy:
        old_df = old_df.copy()
        new_df = new_df.copy()

    import json
    import functools
    import traceback
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.dtypes import (
        to_pandas_dtype,
        are_dtypes_equal,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        attempt_cast_to_bytes,
        coerce_timezone,
        serialize_decimal,
    )
    from meerschaum.utils.dtypes.sql import get_numeric_precision_scale
    pd = import_pandas(debug=debug)
    is_dask = 'dask' in new_df.__module__
    if is_dask:
        pandas = attempt_import('pandas')
        _ = attempt_import('partd', lazy=False)
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA
    else:
        merge = pd.merge
        NA = pd.NA

    new_df_dtypes = dict(new_df.dtypes)
    old_df_dtypes = dict(old_df.dtypes)

    same_cols = set(new_df.columns) == set(old_df.columns)
    if not same_cols:
        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)

        new_types_missing_from_old = {
            col: typ
            for col, typ in new_df_dtypes.items()
            if col not in old_df_dtypes
        }
        old_types_missing_from_new = {
            col: typ
            for col, typ in old_df_dtypes.items()
            if col not in new_df_dtypes
        }
        old_df_dtypes.update(new_types_missing_from_old)
        new_df_dtypes.update(old_types_missing_from_new)

    ### Edge case: two empty lists cast to DFs.
    elif len(new_df.columns) == 0:
        return new_df

    try:
        ### Order matters when checking equality.
        new_df = new_df[old_df.columns]

    except Exception as e:
        warn(
            "Was not able to cast old columns onto new DataFrame. " +
            f"Are both DataFrames the same shape? Error:\n{e}",
            stacklevel=3,
        )
        return new_df[list(new_df_dtypes.keys())]

    ### Assume the old_df knows what it's doing, even if it's technically wrong.
    if dtypes is None:
        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}

    dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
        if col in new_df_dtypes and col in old_df_dtypes
    }
    for col, typ in new_df_dtypes.items():
        if col not in dtypes:
            dtypes[col] = typ

    numeric_cols_precisions_scales = {
        col: get_numeric_precision_scale(None, typ)
        for col, typ in dtypes.items()
        if col and typ and typ.startswith('numeric')
    }

    dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if are_dtypes_equal(typ, 'datetime')
    }
    non_dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if col not in dt_dtypes
    }

    cast_non_dt_cols = True
    try:
        new_df = new_df.astype(non_dt_dtypes)
        cast_non_dt_cols = False
    except Exception as e:
        warn(
            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
        )

    cast_dt_cols = True
    try:
        for col, typ in dt_dtypes.items():
            strip_utc = (
                (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
            )
            if col in old_df.columns:
                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
            if col in new_df.columns:
                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
        cast_dt_cols = False
    except Exception as e:
        warn(f"Could not cast datetime columns:\n{e}")

    cast_cols = cast_dt_cols or cast_non_dt_cols

    new_numeric_cols_existing = get_numeric_cols(new_df)
    old_numeric_cols = get_numeric_cols(old_df)
    for col, typ in {k: v for k, v in dtypes.items()}.items():
        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
            new_is_numeric = col in new_numeric_cols_existing
            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
            old_is_numeric = col in old_numeric_cols

            if (
                coerce_mixed_numerics
                and
                (new_is_float or new_is_int or new_is_numeric)
                and
                (old_is_float or old_is_int or old_is_numeric)
            ):
                dtypes[col] = attempt_cast_to_numeric
                cast_cols = True
                continue

            ### Fall back to object if the types don't match.
            warn(
                f"Detected different types for '{col}' "
                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
                + "falling back to 'object'..."
            )
            dtypes[col] = 'object'
            cast_cols = True

    if cast_cols:
        for col, dtype in dtypes.items():
            if col in new_df.columns:
                try:
                    new_df[col] = (
                        new_df[col].astype(dtype)
                        if not callable(dtype)
                        else new_df[col].apply(dtype)
                    )
                except Exception as e:
                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")

    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
    new_json_cols = get_json_cols(new_df)
    old_json_cols = get_json_cols(old_df)
    json_cols = set(new_json_cols + old_json_cols)
    for json_col in old_json_cols:
        old_df[json_col] = old_df[json_col].apply(serializer)
    for json_col in new_json_cols:
        new_df[json_col] = new_df[json_col].apply(serializer)

    new_numeric_cols = get_numeric_cols(new_df)
    numeric_cols = set(new_numeric_cols + old_numeric_cols)
    for numeric_col in old_numeric_cols:
        old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
    for numeric_col in new_numeric_cols:
        new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)

    old_dt_cols = [
        col
        for col, typ in old_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in old_dt_cols:
        strip_utc = (
            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
        )
        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)

    new_dt_cols = [
        col
        for col, typ in new_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in new_dt_cols:
        strip_utc = (
            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
        )
        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)

    old_uuid_cols = get_uuid_cols(old_df)
    new_uuid_cols = get_uuid_cols(new_df)
    uuid_cols = set(new_uuid_cols + old_uuid_cols)

    old_bytes_cols = get_bytes_cols(old_df)
    new_bytes_cols = get_bytes_cols(new_df)
    bytes_cols = set(new_bytes_cols + old_bytes_cols)

    joined_df = merge(
        new_df.infer_objects(copy=False).fillna(NA),
        old_df.infer_objects(copy=False).fillna(NA),
        how='left',
        on=None,
        indicator=True,
    )
    changed_rows_mask = (joined_df['_merge'] == 'left_only')
    new_cols = list(new_df_dtypes)
    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)

    for json_col in json_cols:
        if json_col not in delta_df.columns:
            continue
        try:
            delta_df[json_col] = delta_df[json_col].apply(json.loads)
        except Exception:
            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")

    for numeric_col in numeric_cols:
        if numeric_col not in delta_df.columns:
            continue
        try:
            delta_df[numeric_col] = delta_df[numeric_col].apply(
                functools.partial(
                    attempt_cast_to_numeric,
                    quantize=True,
                    precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
                    scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
                )
            )
        except Exception:
            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")

    for uuid_col in uuid_cols:
        if uuid_col not in delta_df.columns:
            continue
        try:
            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
        except Exception:
            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")

    for bytes_col in bytes_cols:
        if bytes_col not in delta_df.columns:
            continue
        try:
            delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
        except Exception:
            warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")

    return delta_df
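Beyond the minimal doctest above, the `coerce_mixed_numerics` flag is easiest to see with a mixed int/float column. A short sketch (column names are arbitrary, output elided):

```python
import pandas as pd
from meerschaum.utils.dataframe import filter_unseen_df

old_df = pd.DataFrame({'a': [1, 2]})        # int64
new_df = pd.DataFrame({'a': [2.0, 3.5]})    # float64

# With the default coerce_mixed_numerics=True, the mismatched int/float
# column is run through `attempt_cast_to_numeric`, so surviving values
# come back as `decimal.Decimal` objects.
delta_df = filter_unseen_df(old_df, new_df)

# With coerce_mixed_numerics=False, mismatched dtypes instead fall back
# to 'object' (with a warning) before the left join.
delta_obj_df = filter_unseen_df(old_df, new_df, coerce_mixed_numerics=False)
```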

def parse_df_datetimes(
    df: 'pd.DataFrame',
    ignore_cols: Optional[Iterable[str]] = None,
    strip_timezone: bool = False,
    chunksize: Optional[int] = None,
    dtype_backend: str = 'numpy_nullable',
    ignore_all: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Parse a pandas DataFrame for datetime columns and cast as datetimes.

    Parameters
    ----------
    df: pd.DataFrame
        The pandas DataFrame to parse.

    ignore_cols: Optional[Iterable[str]], default None
        If provided, do not attempt to coerce these columns as datetimes.

    strip_timezone: bool, default False
        If `True`, remove the UTC `tzinfo` property.

    chunksize: Optional[int], default None
        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.

    dtype_backend: str, default 'numpy_nullable'
        If `df` is not a DataFrame and a new one needs to be constructed,
        use this as the datatypes backend.
        Accepted values are 'numpy_nullable' and 'pyarrow'.

    ignore_all: bool, default False
        If `True`, do not attempt to cast any columns to datetimes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A new pandas DataFrame with the determined datetime columns
    (usually ISO strings) cast as datetimes.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
    >>> df.dtypes
    a    object
    dtype: object
    >>> df = parse_df_datetimes(df)
    >>> df.dtypes
    a    datetime64[ns]
    dtype: object

    ```
    """
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.misc import items_str
    from meerschaum.utils.dtypes import to_datetime
    import traceback
    pd = import_pandas()
    pandas = attempt_import('pandas')
    pd_name = pd.__name__
    using_dask = 'dask' in pd_name
    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
    dask_dataframe = None
    if using_dask or df_is_dask:
        npartitions = chunksize_to_npartitions(chunksize)
        dask_dataframe = attempt_import('dask.dataframe')

    ### If df is a dict, build a DataFrame.
    if isinstance(df, pandas.DataFrame):
        pdf = df
    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
        pdf = get_first_valid_dask_partition(df)
    else:
        if debug:
            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")

        if using_dask:
            if isinstance(df, list):
                keys = set()
                for doc in df:
                    for key in doc:
                        keys.add(key)
                df = pd.DataFrame.from_dict(
                    {
                        k: [
                            doc.get(k, None)
                            for doc in df
                        ] for k in keys
                    },
                    npartitions=npartitions,
                )
            elif isinstance(df, dict):
                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
            elif 'pandas.core.frame.DataFrame' in str(type(df)):
                df = pd.from_pandas(df, npartitions=npartitions)
            else:
                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
            pandas = attempt_import('pandas')
            pdf = get_first_valid_dask_partition(df)

        else:
            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
            pdf = df

    ### Skip parsing if the DataFrame is empty.
    if len(pdf) == 0:
        if debug:
            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
        return df

    ignore_cols = set(
        (ignore_cols or []) + [
            col
            for col, dtype in pdf.dtypes.items()
            if 'datetime' in str(dtype)
        ]
    )
    cols_to_inspect = [
        col
        for col in pdf.columns
        if col not in ignore_cols
    ] if not ignore_all else []

    if len(cols_to_inspect) == 0:
        if debug:
            dprint("All columns are ignored, skipping datetime detection...")
        return df.infer_objects(copy=False).fillna(pandas.NA)

    ### Apply a regex to the columns to determine which are ISO datetimes.
    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
    dt_mask = pdf[cols_to_inspect].astype(str).apply(
        lambda s: s.str.match(iso_dt_regex).all()
    )

    ### List of datetime column names.
    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
    if not datetime_cols:
        if debug:
            dprint("No columns detected as datetimes, returning...")
        return df.infer_objects(copy=False).fillna(pandas.NA)

    if debug:
        dprint("Converting columns to datetimes: " + str(datetime_cols))

    try:
        if not using_dask:
            df[datetime_cols] = df[datetime_cols].apply(to_datetime)
        else:
            df[datetime_cols] = df[datetime_cols].apply(
                to_datetime,
                utc=True,
                axis=1,
                meta={
                    col: 'datetime64[ns, UTC]'
                    for col in datetime_cols
                }
            )
    except Exception:
        warn(
            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
            + f"{traceback.format_exc()}"
        )

    if strip_timezone:
        for dt in datetime_cols:
            try:
                df[dt] = df[dt].dt.tz_localize(None)
            except Exception:
                warn(
                    f"Unable to convert column '{dt}' to naive datetime:\n"
                    + f"{traceback.format_exc()}"
                )

    return df.fillna(pandas.NA)
601 """ 602 if df is None: 603 return [] 604 if len(df) == 0: 605 return [] 606 607 is_dask = 'dask' in df.__module__ 608 if is_dask: 609 from meerschaum.utils.packages import attempt_import 610 pandas = attempt_import('pandas') 611 df = pandas.DataFrame(get_first_valid_dask_partition(df)) 612 return [ 613 col for col, val in df.iloc[0].items() 614 if not isinstance(val, Hashable) 615 ] 616 617 618def get_json_cols(df: 'pd.DataFrame') -> List[str]: 619 """ 620 Get the columns which contain unhashable objects from a Pandas DataFrame. 621 622 Parameters 623 ---------- 624 df: pd.DataFrame 625 The DataFrame which may contain unhashable objects. 626 627 Returns 628 ------- 629 A list of columns to be encoded as JSON. 630 """ 631 if df is None: 632 return [] 633 634 is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False 635 if is_dask: 636 df = get_first_valid_dask_partition(df) 637 638 if len(df) == 0: 639 return [] 640 641 cols_indices = { 642 col: df[col].first_valid_index() 643 for col in df.columns 644 } 645 return [ 646 col 647 for col, ix in cols_indices.items() 648 if ( 649 ix is not None 650 and 651 not isinstance(df.loc[ix][col], Hashable) 652 ) 653 ] 654 655 656def get_numeric_cols(df: 'pd.DataFrame') -> List[str]: 657 """ 658 Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame. 659 660 Parameters 661 ---------- 662 df: pd.DataFrame 663 The DataFrame which may contain decimal objects. 664 665 Returns 666 ------- 667 A list of columns to treat as numerics. 668 """ 669 if df is None: 670 return [] 671 from decimal import Decimal 672 is_dask = 'dask' in df.__module__ 673 if is_dask: 674 df = get_first_valid_dask_partition(df) 675 676 if len(df) == 0: 677 return [] 678 679 cols_indices = { 680 col: df[col].first_valid_index() 681 for col in df.columns 682 } 683 return [ 684 col 685 for col, ix in cols_indices.items() 686 if ( 687 ix is not None 688 and 689 isinstance(df.loc[ix][col], Decimal) 690 ) 691 ] 692 693 694def get_uuid_cols(df: 'pd.DataFrame') -> List[str]: 695 """ 696 Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame. 697 698 Parameters 699 ---------- 700 df: pd.DataFrame 701 The DataFrame which may contain UUID objects. 702 703 Returns 704 ------- 705 A list of columns to treat as UUIDs. 706 """ 707 if df is None: 708 return [] 709 from uuid import UUID 710 is_dask = 'dask' in df.__module__ 711 if is_dask: 712 df = get_first_valid_dask_partition(df) 713 714 if len(df) == 0: 715 return [] 716 717 cols_indices = { 718 col: df[col].first_valid_index() 719 for col in df.columns 720 } 721 return [ 722 col 723 for col, ix in cols_indices.items() 724 if ( 725 ix is not None 726 and 727 isinstance(df.loc[ix][col], UUID) 728 ) 729 ] 730 731 732def get_datetime_cols( 733 df: 'pd.DataFrame', 734 timezone_aware: bool = True, 735 timezone_naive: bool = True, 736) -> List[str]: 737 """ 738 Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame. 739 740 Parameters 741 ---------- 742 df: pd.DataFrame 743 The DataFrame which may contain `datetime` or `Timestamp` objects. 744 745 timezone_aware: bool, default True 746 If `True`, include timezone-aware datetime columns. 747 748 timezone_naive: bool, default True 749 If `True`, include timezone-naive datetime columns. 750 751 Returns 752 ------- 753 A list of columns to treat as datetimes. 
754 """ 755 if not timezone_aware and not timezone_naive: 756 raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.") 757 758 if df is None: 759 return [] 760 761 from datetime import datetime 762 from meerschaum.utils.dtypes import are_dtypes_equal 763 is_dask = 'dask' in df.__module__ 764 if is_dask: 765 df = get_first_valid_dask_partition(df) 766 767 known_dt_cols = [ 768 col 769 for col, typ in df.dtypes.items() 770 if are_dtypes_equal('datetime', str(typ)) 771 ] 772 773 if len(df) == 0: 774 return known_dt_cols 775 776 cols_indices = { 777 col: df[col].first_valid_index() 778 for col in df.columns 779 if col not in known_dt_cols 780 } 781 pydt_cols = [ 782 col 783 for col, ix in cols_indices.items() 784 if ( 785 ix is not None 786 and 787 isinstance(df.loc[ix][col], datetime) 788 ) 789 ] 790 dt_cols_set = set(known_dt_cols + pydt_cols) 791 all_dt_cols = [ 792 col 793 for col in df.columns 794 if col in dt_cols_set 795 ] 796 if timezone_aware and timezone_naive: 797 return all_dt_cols 798 799 known_timezone_aware_dt_cols = [ 800 col 801 for col in known_dt_cols 802 if getattr(df[col], 'tz', None) is not None 803 ] 804 timezone_aware_pydt_cols = [ 805 col 806 for col in pydt_cols 807 if df.loc[cols_indices[col]][col].tzinfo is not None 808 ] 809 timezone_aware_dt_cols_set = set(known_timezone_aware_dt_cols + timezone_aware_pydt_cols) 810 if timezone_aware: 811 return [ 812 col 813 for col in all_dt_cols 814 if col in timezone_aware_pydt_cols 815 ] 816 817 return [ 818 col 819 for col in all_dt_cols 820 if col not in timezone_aware_dt_cols_set 821 ] 822 823 824def get_bytes_cols(df: 'pd.DataFrame') -> List[str]: 825 """ 826 Get the columns which contain bytes strings from a Pandas DataFrame. 827 828 Parameters 829 ---------- 830 df: pd.DataFrame 831 The DataFrame which may contain bytes strings. 832 833 Returns 834 ------- 835 A list of columns to treat as bytes. 836 """ 837 if df is None: 838 return [] 839 is_dask = 'dask' in df.__module__ 840 if is_dask: 841 df = get_first_valid_dask_partition(df) 842 843 if len(df) == 0: 844 return [] 845 846 cols_indices = { 847 col: df[col].first_valid_index() 848 for col in df.columns 849 } 850 return [ 851 col 852 for col, ix in cols_indices.items() 853 if ( 854 ix is not None 855 and 856 isinstance(df.loc[ix][col], bytes) 857 ) 858 ] 859 860 861def enforce_dtypes( 862 df: 'pd.DataFrame', 863 dtypes: Dict[str, str], 864 safe_copy: bool = True, 865 coerce_numeric: bool = True, 866 coerce_timezone: bool = True, 867 strip_timezone: bool = False, 868 debug: bool = False, 869) -> 'pd.DataFrame': 870 """ 871 Enforce the `dtypes` dictionary on a DataFrame. 872 873 Parameters 874 ---------- 875 df: pd.DataFrame 876 The DataFrame on which to enforce dtypes. 877 878 dtypes: Dict[str, str] 879 The data types to attempt to enforce on the DataFrame. 880 881 safe_copy: bool, default True 882 If `True`, create a copy before comparing and modifying the dataframes. 883 Setting to `False` may mutate the DataFrames. 884 See `meerschaum.utils.dataframe.filter_unseen_df`. 885 886 coerce_numeric: bool, default True 887 If `True`, convert float and int collisions to numeric. 888 889 coerce_timezone: bool, default True 890 If `True`, convert datetimes to UTC. 891 892 strip_timezone: bool, default False 893 If `coerce_timezone` and `strip_timezone` are `True`, 894 remove timezone information from datetimes. 895 896 debug: bool, default False 897 Verbosity toggle. 898 899 Returns 900 ------- 901 The Pandas DataFrame with the types enforced. 
902 """ 903 import json 904 import functools 905 from meerschaum.utils.debug import dprint 906 from meerschaum.utils.formatting import pprint 907 from meerschaum.utils.dtypes import ( 908 are_dtypes_equal, 909 to_pandas_dtype, 910 is_dtype_numeric, 911 attempt_cast_to_numeric, 912 attempt_cast_to_uuid, 913 attempt_cast_to_bytes, 914 coerce_timezone as _coerce_timezone, 915 ) 916 from meerschaum.utils.dtypes.sql import get_numeric_precision_scale 917 pandas = mrsm.attempt_import('pandas') 918 is_dask = 'dask' in df.__module__ 919 if safe_copy: 920 df = df.copy() 921 if len(df.columns) == 0: 922 if debug: 923 dprint("Incoming DataFrame has no columns. Skipping enforcement...") 924 return df 925 926 pipe_pandas_dtypes = { 927 col: to_pandas_dtype(typ) 928 for col, typ in dtypes.items() 929 } 930 json_cols = [ 931 col 932 for col, typ in dtypes.items() 933 if typ == 'json' 934 ] 935 numeric_cols = [ 936 col 937 for col, typ in dtypes.items() 938 if typ.startswith('numeric') 939 ] 940 uuid_cols = [ 941 col 942 for col, typ in dtypes.items() 943 if typ == 'uuid' 944 ] 945 bytes_cols = [ 946 col 947 for col, typ in dtypes.items() 948 if typ == 'bytes' 949 ] 950 datetime_cols = [ 951 col 952 for col, typ in dtypes.items() 953 if are_dtypes_equal(typ, 'datetime') 954 ] 955 df_numeric_cols = get_numeric_cols(df) 956 if debug: 957 dprint("Desired data types:") 958 pprint(dtypes) 959 dprint("Data types for incoming DataFrame:") 960 pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()}) 961 962 if json_cols and len(df) > 0: 963 if debug: 964 dprint(f"Checking columns for JSON encoding: {json_cols}") 965 for col in json_cols: 966 if col in df.columns: 967 try: 968 df[col] = df[col].apply( 969 ( 970 lambda x: ( 971 json.loads(x) 972 if isinstance(x, str) 973 else x 974 ) 975 ) 976 ) 977 except Exception as e: 978 if debug: 979 dprint(f"Unable to parse column '{col}' as JSON:\n{e}") 980 981 if numeric_cols: 982 if debug: 983 dprint(f"Checking for numerics: {numeric_cols}") 984 for col in numeric_cols: 985 precision, scale = get_numeric_precision_scale(None, dtypes.get(col, '')) 986 if col in df.columns: 987 try: 988 df[col] = df[col].apply( 989 functools.partial( 990 attempt_cast_to_numeric, 991 quantize=True, 992 precision=precision, 993 scale=scale, 994 ) 995 ) 996 except Exception as e: 997 if debug: 998 dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}") 999 1000 if uuid_cols: 1001 if debug: 1002 dprint(f"Checking for UUIDs: {uuid_cols}") 1003 for col in uuid_cols: 1004 if col in df.columns: 1005 try: 1006 df[col] = df[col].apply(attempt_cast_to_uuid) 1007 except Exception as e: 1008 if debug: 1009 dprint(f"Unable to parse column '{col}' as UUID:\n{e}") 1010 1011 if bytes_cols: 1012 if debug: 1013 dprint(f"Checking for bytes: {bytes_cols}") 1014 for col in bytes_cols: 1015 if col in df.columns: 1016 try: 1017 df[col] = df[col].apply(attempt_cast_to_bytes) 1018 except Exception as e: 1019 if debug: 1020 dprint(f"Unable to parse column '{col}' as bytes:\n{e}") 1021 1022 if datetime_cols and coerce_timezone: 1023 if debug: 1024 dprint(f"Checking for datetime conversion: {datetime_cols}") 1025 for col in datetime_cols: 1026 if col in df.columns: 1027 df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone) 1028 1029 df_dtypes = {c: str(t) for c, t in df.dtypes.items()} 1030 if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes): 1031 if debug: 1032 dprint("Data types match. 
Exiting enforcement...") 1033 return df 1034 1035 common_dtypes = {} 1036 common_diff_dtypes = {} 1037 for col, typ in pipe_pandas_dtypes.items(): 1038 if col in df_dtypes: 1039 common_dtypes[col] = typ 1040 if not are_dtypes_equal(typ, df_dtypes[col]): 1041 common_diff_dtypes[col] = df_dtypes[col] 1042 1043 if debug: 1044 dprint("Common columns with different dtypes:") 1045 pprint(common_diff_dtypes) 1046 1047 detected_dt_cols = {} 1048 for col, typ in common_diff_dtypes.items(): 1049 if 'datetime' in typ and 'datetime' in common_dtypes[col]: 1050 df_dtypes[col] = typ 1051 detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col]) 1052 for col in detected_dt_cols: 1053 del common_diff_dtypes[col] 1054 1055 if debug: 1056 dprint("Common columns with different dtypes (after dates):") 1057 pprint(common_diff_dtypes) 1058 1059 if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes): 1060 if debug: 1061 dprint( 1062 "The incoming DataFrame has mostly the same types, skipping enforcement." 1063 + "The only detected difference was in the following datetime columns." 1064 ) 1065 pprint(detected_dt_cols) 1066 return df 1067 1068 for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items(): 1069 previous_typ = common_dtypes[col] 1070 mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ)) 1071 explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float') 1072 explicitly_numeric = dtypes.get(col, 'numeric').startswith('numeric') 1073 cast_to_numeric = ( 1074 explicitly_numeric 1075 or col in df_numeric_cols 1076 or (mixed_numeric_types and not explicitly_float) 1077 ) and coerce_numeric 1078 if cast_to_numeric: 1079 common_dtypes[col] = attempt_cast_to_numeric 1080 common_diff_dtypes[col] = attempt_cast_to_numeric 1081 1082 for d in common_diff_dtypes: 1083 t = common_dtypes[d] 1084 if debug: 1085 dprint(f"Casting column {d} to dtype {t}.") 1086 try: 1087 df[d] = ( 1088 df[d].apply(t) 1089 if callable(t) 1090 else df[d].astype(t) 1091 ) 1092 except Exception as e: 1093 if debug: 1094 dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}") 1095 if 'int' in str(t).lower(): 1096 try: 1097 df[d] = df[d].astype('float64').astype(t) 1098 except Exception: 1099 if debug: 1100 dprint(f"Was unable to convert to float then {t}.") 1101 return df 1102 1103 1104def get_datetime_bound_from_df( 1105 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]], 1106 datetime_column: str, 1107 minimum: bool = True, 1108) -> Union[int, datetime, None]: 1109 """ 1110 Return the minimum or maximum datetime (or integer) from a DataFrame. 1111 1112 Parameters 1113 ---------- 1114 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]] 1115 The DataFrame, list, or dict which contains the range axis. 1116 1117 datetime_column: str 1118 The name of the datetime (or int) column. 1119 1120 minimum: bool 1121 Whether to return the minimum (default) or maximum value. 1122 1123 Returns 1124 ------- 1125 The minimum or maximum datetime value in the dataframe, or `None`. 
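A sketch of typical usage, assuming serialized input such as rows parsed from a CSV or an API payload (the column names and dtype keys here are illustrative):

```python
import pandas as pd
from meerschaum.utils.dataframe import enforce_dtypes

df = pd.DataFrame({
    'id': [1, 2],
    'attrs': ['{"color": "red"}', '{"color": "blue"}'],
    'price': ['19.99', '3.50'],
})

# 'json' columns are parsed with json.loads, 'numeric' columns are cast
# to decimal.Decimal via attempt_cast_to_numeric, and the remaining
# columns go through a standard astype() cast.
df = enforce_dtypes(df, {'id': 'int', 'attrs': 'json', 'price': 'numeric'})
```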
1126 """ 1127 from meerschaum.utils.dtypes import to_datetime, value_is_null 1128 1129 if df is None: 1130 return None 1131 if not datetime_column: 1132 return None 1133 1134 def compare(a, b): 1135 if a is None: 1136 return b 1137 if b is None: 1138 return a 1139 if minimum: 1140 return a if a < b else b 1141 return a if a > b else b 1142 1143 if isinstance(df, list): 1144 if len(df) == 0: 1145 return None 1146 best_yet = df[0].get(datetime_column, None) 1147 for doc in df: 1148 val = doc.get(datetime_column, None) 1149 best_yet = compare(best_yet, val) 1150 return best_yet 1151 1152 if isinstance(df, dict): 1153 if datetime_column not in df: 1154 return None 1155 best_yet = df[datetime_column][0] 1156 for val in df[datetime_column]: 1157 best_yet = compare(best_yet, val) 1158 return best_yet 1159 1160 if 'DataFrame' in str(type(df)): 1161 from meerschaum.utils.dtypes import are_dtypes_equal 1162 pandas = mrsm.attempt_import('pandas') 1163 is_dask = 'dask' in df.__module__ 1164 1165 if datetime_column not in df.columns: 1166 return None 1167 1168 try: 1169 dt_val = ( 1170 df[datetime_column].min(skipna=True) 1171 if minimum 1172 else df[datetime_column].max(skipna=True) 1173 ) 1174 except Exception: 1175 dt_val = pandas.NA 1176 if is_dask and dt_val is not None and dt_val is not pandas.NA: 1177 dt_val = dt_val.compute() 1178 1179 return ( 1180 to_datetime(dt_val, as_pydatetime=True) 1181 if are_dtypes_equal(str(type(dt_val)), 'datetime') 1182 else (dt_val if not value_is_null(dt_val) else None) 1183 ) 1184 1185 return None 1186 1187 1188def get_unique_index_values( 1189 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]], 1190 indices: List[str], 1191) -> Dict[str, List[Any]]: 1192 """ 1193 Return a dictionary of the unique index values in a DataFrame. 1194 1195 Parameters 1196 ---------- 1197 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]] 1198 The dataframe (or list or dict) which contains index values. 1199 1200 indices: List[str] 1201 The list of index columns. 1202 1203 Returns 1204 ------- 1205 A dictionary mapping indices to unique values. 1206 """ 1207 if df is None: 1208 return {} 1209 if 'dataframe' in str(type(df)).lower(): 1210 pandas = mrsm.attempt_import('pandas') 1211 return { 1212 col: list({ 1213 (val if val is not pandas.NA else None) 1214 for val in df[col].unique() 1215 }) 1216 for col in indices 1217 if col in df.columns 1218 } 1219 1220 unique_indices = defaultdict(lambda: set()) 1221 if isinstance(df, list): 1222 for doc in df: 1223 for index in indices: 1224 if index in doc: 1225 unique_indices[index].add(doc[index]) 1226 1227 elif isinstance(df, dict): 1228 for index in indices: 1229 if index in df: 1230 unique_indices[index] = unique_indices[index].union(set(df[index])) 1231 1232 return {key: list(val) for key, val in unique_indices.items()} 1233 1234 1235def df_is_chunk_generator(df: Any) -> bool: 1236 """ 1237 Determine whether to treat `df` as a chunk generator. 1238 1239 Note this should only be used in a context where generators are expected, 1240 as it will return `True` for any iterable. 1241 1242 Parameters 1243 ---------- 1244 The DataFrame or chunk generator to evaluate. 1245 1246 Returns 1247 ------- 1248 A `bool` indicating whether to treat `df` as a generator. 
1249 """ 1250 return ( 1251 not isinstance(df, (dict, list, str)) 1252 and 'DataFrame' not in str(type(df)) 1253 and isinstance(df, (Generator, Iterable, Iterator)) 1254 ) 1255 1256 1257def chunksize_to_npartitions(chunksize: Optional[int]) -> int: 1258 """ 1259 Return the Dask `npartitions` value for a given `chunksize`. 1260 """ 1261 if chunksize == -1: 1262 from meerschaum.config import get_config 1263 chunksize = get_config('system', 'connectors', 'sql', 'chunksize') 1264 if chunksize is None: 1265 return 1 1266 return -1 * chunksize 1267 1268 1269def df_from_literal( 1270 pipe: Optional[mrsm.Pipe] = None, 1271 literal: str = None, 1272 debug: bool = False 1273) -> 'pd.DataFrame': 1274 """ 1275 Construct a dataframe from a literal value, using the pipe's datetime and value column names. 1276 1277 Parameters 1278 ---------- 1279 pipe: Optional['meerschaum.Pipe'], default None 1280 The pipe which will consume the literal value. 1281 1282 Returns 1283 ------- 1284 A 1-row pandas DataFrame from with the current UTC timestamp as the datetime columns 1285 and the literal as the value. 1286 """ 1287 from meerschaum.utils.packages import import_pandas 1288 from meerschaum.utils.warnings import error, warn 1289 from meerschaum.utils.debug import dprint 1290 1291 if pipe is None or literal is None: 1292 error("Please provide a Pipe and a literal value") 1293 ### this will raise an error if the columns are undefined 1294 dt_name, val_name = pipe.get_columns('datetime', 'value') 1295 1296 val = literal 1297 if isinstance(literal, str): 1298 if debug: 1299 dprint(f"Received literal string: '{literal}'") 1300 import ast 1301 try: 1302 val = ast.literal_eval(literal) 1303 except Exception: 1304 warn( 1305 "Failed to parse value from string:\n" + f"{literal}" + 1306 "\n\nWill cast as a string instead."\ 1307 ) 1308 val = literal 1309 1310 from datetime import datetime, timezone 1311 now = datetime.now(timezone.utc).replace(tzinfo=None) 1312 1313 pd = import_pandas() 1314 return pd.DataFrame({dt_name: [now], val_name: [val]}) 1315 1316 1317def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]: 1318 """ 1319 Return the first valid Dask DataFrame partition (if possible). 1320 """ 1321 pdf = None 1322 for partition in ddf.partitions: 1323 try: 1324 pdf = partition.compute() 1325 except Exception: 1326 continue 1327 if len(pdf) > 0: 1328 return pdf 1329 _ = mrsm.attempt_import('partd', lazy=False) 1330 return ddf.compute() 1331 1332 1333def query_df( 1334 df: 'pd.DataFrame', 1335 params: Optional[Dict[str, Any]] = None, 1336 begin: Union[datetime, int, None] = None, 1337 end: Union[datetime, int, None] = None, 1338 datetime_column: Optional[str] = None, 1339 select_columns: Optional[List[str]] = None, 1340 omit_columns: Optional[List[str]] = None, 1341 inplace: bool = False, 1342 reset_index: bool = False, 1343 coerce_types: bool = False, 1344 debug: bool = False, 1345) -> 'pd.DataFrame': 1346 """ 1347 Query the dataframe with the params dictionary. 1348 1349 Parameters 1350 ---------- 1351 df: pd.DataFrame 1352 The DataFrame to query against. 1353 1354 params: Optional[Dict[str, Any]], default None 1355 The parameters dictionary to use for the query. 1356 1357 begin: Union[datetime, int, None], default None 1358 If `begin` and `datetime_column` are provided, only return rows with a timestamp 1359 greater than or equal to this value. 

def query_df(
    df: 'pd.DataFrame',
    params: Optional[Dict[str, Any]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    datetime_column: Optional[str] = None,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    inplace: bool = False,
    reset_index: bool = False,
    coerce_types: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Query the dataframe with the params dictionary.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to query against.

    params: Optional[Dict[str, Any]], default None
        The parameters dictionary to use for the query.

    begin: Union[datetime, int, None], default None
        If `begin` and `datetime_column` are provided, only return rows with a timestamp
        greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If `end` and `datetime_column` are provided, only return rows with a timestamp
        less than this value.

    datetime_column: Optional[str], default None
        A `datetime_column` must be provided to use `begin` and `end`.

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, do not include these columns in the result.

    inplace: bool, default False
        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.

    reset_index: bool, default False
        If `True`, reset the index in the resulting DataFrame.

    coerce_types: bool, default False
        If `True`, cast the dataframe and parameters as strings before querying.

    Returns
    -------
    A Pandas DataFrame query result.
    """

    def _process_select_columns(_df):
        if not select_columns:
            return
        for col in list(_df.columns):
            if col not in select_columns:
                del _df[col]

    def _process_omit_columns(_df):
        if not omit_columns:
            return
        for col in list(_df.columns):
            if col in omit_columns:
                del _df[col]

    if not params and not begin and not end:
        if not inplace:
            df = df.copy()
        _process_select_columns(df)
        _process_omit_columns(df)
        return df

    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import get_in_ex_params
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    pandas = mrsm.attempt_import('pandas')
    NA = pandas.NA

    if params:
        params = params.copy()
        for key, val in {k: v for k, v in params.items()}.items():
            if isinstance(val, (list, tuple)):
                if None in val:
                    val = [item for item in val if item is not None] + [NA]
                    params[key] = val
                if coerce_types:
                    params[key] = [str(x) for x in val]
            else:
                if value_is_null(val):
                    val = NA
                    params[key] = NA
                if coerce_types:
                    params[key] = str(val)

    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}

    if inplace:
        df.fillna(NA, inplace=True)
    else:
        df = df.infer_objects().fillna(NA)

    if isinstance(begin, str):
        begin = dateutil_parser.parse(begin)
    if isinstance(end, str):
        end = dateutil_parser.parse(end)

    if begin is not None or end is not None:
        if not datetime_column or datetime_column not in df.columns:
            warn(
                f"The datetime column '{datetime_column}' is not present in the Dataframe, "
                + "ignoring begin and end...",
            )
            begin, end = None, None

    if debug:
        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")

    if datetime_column and (begin is not None or end is not None):
        if debug:
            dprint("Checking for datetime column compatibility.")

        from meerschaum.utils.dtypes import coerce_timezone
        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
        end_is_int = are_dtypes_equal(str(type(end)), 'int')

        if df_is_dt:
            df_tz = (
                getattr(df[datetime_column].dt, 'tz', None)
                if hasattr(df[datetime_column], 'dt')
                else None
            )

            if begin_is_int:
                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`begin` will be cast to '{begin}'.")
            if end_is_int:
                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`end` will be cast to '{end}'.")

            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None

    in_ex_params = get_in_ex_params(params)

    masks = [
        (
            (df[datetime_column] >= begin)
            if begin is not None and datetime_column
            else True
        ) & (
            (df[datetime_column] < end)
            if end is not None and datetime_column
            else True
        )
    ]

    masks.extend([
        (
            (
                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
                if in_vals
                else True
            ) & (
                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
                if ex_vals
                else True
            )
        )
        for col, (in_vals, ex_vals) in in_ex_params.items()
        if col in df.columns
    ])
    query_mask = masks[0]
    for mask in masks[1:]:
        query_mask = query_mask & mask

    original_cols = df.columns

    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
    ### to allow for `<NA>` values.
    bool_cols = [
        col
        for col, typ in df.dtypes.items()
        if are_dtypes_equal(str(typ), 'bool')
    ]
    for col in bool_cols:
        df[col] = df[col].astype('boolean[pyarrow]')

    if not isinstance(query_mask, bool):
        df['__mrsm_mask'] = (
            query_mask.astype('boolean[pyarrow]')
            if hasattr(query_mask, 'astype')
            else query_mask
        )

        if inplace:
            df.where(query_mask, other=NA, inplace=True)
            df.dropna(how='all', inplace=True)
            result_df = df
        else:
            result_df = df.where(query_mask, other=NA)
            result_df.dropna(how='all', inplace=True)

    else:
        result_df = df

    if '__mrsm_mask' in df.columns:
        del df['__mrsm_mask']
    if '__mrsm_mask' in result_df.columns:
        del result_df['__mrsm_mask']

    if reset_index:
        result_df.reset_index(drop=True, inplace=True)

    result_df = enforce_dtypes(
        result_df,
        dtypes,
        safe_copy=False,
        debug=debug,
        coerce_numeric=False,
        coerce_timezone=False,
    )

    if select_columns == ['*']:
        select_columns = None

    if not select_columns and not omit_columns:
        return result_df[original_cols]

    _process_select_columns(result_df)
    _process_omit_columns(result_df)

    return result_df
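A sketch of filtering by parameters and a datetime range (this assumes `pyarrow` is available, since boolean masks are cast to `boolean[pyarrow]`):

```python
from datetime import datetime
import pandas as pd
from meerschaum.utils.dataframe import query_df

df = pd.DataFrame({
    'dt': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']),
    'station': ['a', 'b', 'a'],
    'temp': [1.0, 2.0, 3.0],
})

# Rows where station == 'a' and dt is in [2024-01-01, 2024-01-03):
result = query_df(
    df,
    params={'station': 'a'},
    begin=datetime(2024, 1, 1),
    end=datetime(2024, 1, 3),
    datetime_column='dt',
    reset_index=True,
)
# Expected: the single 2024-01-01 / 'a' / 1.0 row.
```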
1604 """ 1605 from meerschaum.utils.packages import import_pandas 1606 from meerschaum.utils.dtypes import serialize_bytes, serialize_decimal 1607 pd = import_pandas() 1608 uuid_cols = get_uuid_cols(df) 1609 bytes_cols = get_bytes_cols(df) 1610 numeric_cols = get_numeric_cols(df) 1611 if safe_copy and bool(uuid_cols or bytes_cols): 1612 df = df.copy() 1613 for col in uuid_cols: 1614 df[col] = df[col].astype(str) 1615 for col in bytes_cols: 1616 df[col] = df[col].apply(serialize_bytes) 1617 for col in numeric_cols: 1618 df[col] = df[col].apply(serialize_decimal) 1619 return df.infer_objects(copy=False).fillna(pd.NA).to_json( 1620 date_format=date_format, 1621 date_unit=date_unit, 1622 orient=orient, 1623 **kwargs 1624 )
26def add_missing_cols_to_df( 27 df: 'pd.DataFrame', 28 dtypes: Dict[str, Any], 29) -> 'pd.DataFrame': 30 """ 31 Add columns from the dtypes dictionary as null columns to a new DataFrame. 32 33 Parameters 34 ---------- 35 df: pd.DataFrame 36 The dataframe we should copy and add null columns. 37 38 dtypes: 39 The data types dictionary which may contain keys not present in `df.columns`. 40 41 Returns 42 ------- 43 A new `DataFrame` with the keys from `dtypes` added as null columns. 44 If `df.dtypes` is the same as `dtypes`, then return a reference to `df`. 45 NOTE: This will not ensure that dtypes are enforced! 46 47 Examples 48 -------- 49 >>> import pandas as pd 50 >>> df = pd.DataFrame([{'a': 1}]) 51 >>> dtypes = {'b': 'Int64'} 52 >>> add_missing_cols_to_df(df, dtypes) 53 a b 54 0 1 <NA> 55 >>> add_missing_cols_to_df(df, dtypes).dtypes 56 a int64 57 b Int64 58 dtype: object 59 >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes 60 a int64 61 dtype: object 62 >>> 63 """ 64 if set(df.columns) == set(dtypes): 65 return df 66 67 from meerschaum.utils.packages import attempt_import 68 from meerschaum.utils.dtypes import to_pandas_dtype 69 pandas = attempt_import('pandas') 70 71 def build_series(dtype: str): 72 return pandas.Series([], dtype=to_pandas_dtype(dtype)) 73 74 assign_kwargs = { 75 str(col): build_series(str(typ)) 76 for col, typ in dtypes.items() 77 if col not in df.columns 78 } 79 df_with_cols = df.assign(**assign_kwargs) 80 for col in assign_kwargs: 81 df_with_cols[col] = df_with_cols[col].fillna(pandas.NA) 82 return df_with_cols
Add columns from the dtypes dictionary as null columns to a new DataFrame.
Parameters
- df (pd.DataFrame): The dataframe we should copy and add null columns.
- dtypes:: The data types dictionary which may contain keys not present in
df.columns
.
Returns
- A new
DataFrame
with the keys fromdtypes
added as null columns. - If
df.dtypes
is the same asdtypes
, then return a reference todf
. - NOTE (This will not ensure that dtypes are enforced!):
Examples
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1}])
>>> dtypes = {'b': 'Int64'}
>>> add_missing_cols_to_df(df, dtypes)
a b
0 1 <NA>
>>> add_missing_cols_to_df(df, dtypes)meerschaum.utils.dtypes
a int64
b Int64
dtype: object
>>> add_missing_cols_to_df(df, {'a': 'object'})meerschaum.utils.dtypes
a int64
dtype: object
>>>
85def filter_unseen_df( 86 old_df: 'pd.DataFrame', 87 new_df: 'pd.DataFrame', 88 safe_copy: bool = True, 89 dtypes: Optional[Dict[str, Any]] = None, 90 include_unchanged_columns: bool = False, 91 coerce_mixed_numerics: bool = True, 92 debug: bool = False, 93) -> 'pd.DataFrame': 94 """ 95 Left join two DataFrames to find the newest unseen data. 96 97 Parameters 98 ---------- 99 old_df: 'pd.DataFrame' 100 The original (target) dataframe. Acts as a filter on the `new_df`. 101 102 new_df: 'pd.DataFrame' 103 The fetched (source) dataframe. Rows that are contained in `old_df` are removed. 104 105 safe_copy: bool, default True 106 If `True`, create a copy before comparing and modifying the dataframes. 107 Setting to `False` may mutate the DataFrames. 108 109 dtypes: Optional[Dict[str, Any]], default None 110 Optionally specify the datatypes of the dataframe. 111 112 include_unchanged_columns: bool, default False 113 If `True`, include columns which haven't changed on rows which have changed. 114 115 coerce_mixed_numerics: bool, default True 116 If `True`, cast mixed integer and float columns between the old and new dataframes into 117 numeric values (`decimal.Decimal`). 118 119 debug: bool, default False 120 Verbosity toggle. 121 122 Returns 123 ------- 124 A pandas dataframe of the new, unseen rows in `new_df`. 125 126 Examples 127 -------- 128 ```python 129 >>> import pandas as pd 130 >>> df1 = pd.DataFrame({'a': [1,2]}) 131 >>> df2 = pd.DataFrame({'a': [2,3]}) 132 >>> filter_unseen_df(df1, df2) 133 a 134 0 3 135 136 ``` 137 138 """ 139 if old_df is None: 140 return new_df 141 142 if safe_copy: 143 old_df = old_df.copy() 144 new_df = new_df.copy() 145 146 import json 147 import functools 148 import traceback 149 from meerschaum.utils.warnings import warn 150 from meerschaum.utils.packages import import_pandas, attempt_import 151 from meerschaum.utils.dtypes import ( 152 to_pandas_dtype, 153 are_dtypes_equal, 154 attempt_cast_to_numeric, 155 attempt_cast_to_uuid, 156 attempt_cast_to_bytes, 157 coerce_timezone, 158 serialize_decimal, 159 ) 160 from meerschaum.utils.dtypes.sql import get_numeric_precision_scale 161 pd = import_pandas(debug=debug) 162 is_dask = 'dask' in new_df.__module__ 163 if is_dask: 164 pandas = attempt_import('pandas') 165 _ = attempt_import('partd', lazy=False) 166 dd = attempt_import('dask.dataframe') 167 merge = dd.merge 168 NA = pandas.NA 169 else: 170 merge = pd.merge 171 NA = pd.NA 172 173 new_df_dtypes = dict(new_df.dtypes) 174 old_df_dtypes = dict(old_df.dtypes) 175 176 same_cols = set(new_df.columns) == set(old_df.columns) 177 if not same_cols: 178 new_df = add_missing_cols_to_df(new_df, old_df_dtypes) 179 old_df = add_missing_cols_to_df(old_df, new_df_dtypes) 180 181 new_types_missing_from_old = { 182 col: typ 183 for col, typ in new_df_dtypes.items() 184 if col not in old_df_dtypes 185 } 186 old_types_missing_from_new = { 187 col: typ 188 for col, typ in new_df_dtypes.items() 189 if col not in old_df_dtypes 190 } 191 old_df_dtypes.update(new_types_missing_from_old) 192 new_df_dtypes.update(old_types_missing_from_new) 193 194 ### Edge case: two empty lists cast to DFs. 195 elif len(new_df.columns) == 0: 196 return new_df 197 198 try: 199 ### Order matters when checking equality. 200 new_df = new_df[old_df.columns] 201 202 except Exception as e: 203 warn( 204 "Was not able to cast old columns onto new DataFrame. " + 205 f"Are both DataFrames the same shape? 
Error:\n{e}", 206 stacklevel=3, 207 ) 208 return new_df[list(new_df_dtypes.keys())] 209 210 ### assume the old_df knows what it's doing, even if it's technically wrong. 211 if dtypes is None: 212 dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()} 213 214 dtypes = { 215 col: to_pandas_dtype(typ) 216 for col, typ in dtypes.items() 217 if col in new_df_dtypes and col in old_df_dtypes 218 } 219 for col, typ in new_df_dtypes.items(): 220 if col not in dtypes: 221 dtypes[col] = typ 222 223 numeric_cols_precisions_scales = { 224 col: get_numeric_precision_scale(None, typ) 225 for col, typ in dtypes.items() 226 if col and typ and typ.startswith('numeric') 227 } 228 229 dt_dtypes = { 230 col: typ 231 for col, typ in dtypes.items() 232 if are_dtypes_equal(typ, 'datetime') 233 } 234 non_dt_dtypes = { 235 col: typ 236 for col, typ in dtypes.items() 237 if col not in dt_dtypes 238 } 239 240 cast_non_dt_cols = True 241 try: 242 new_df = new_df.astype(non_dt_dtypes) 243 cast_non_dt_cols = False 244 except Exception as e: 245 warn( 246 f"Was not able to cast the new DataFrame to the given dtypes.\n{e}" 247 ) 248 249 cast_dt_cols = True 250 try: 251 for col, typ in dt_dtypes.items(): 252 strip_utc = ( 253 (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]' 254 ) 255 if col in old_df.columns: 256 old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc) 257 if col in new_df.columns: 258 new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc) 259 cast_dt_cols = False 260 except Exception as e: 261 warn(f"Could not cast datetime columns:\n{e}") 262 263 cast_cols = cast_dt_cols or cast_non_dt_cols 264 265 new_numeric_cols_existing = get_numeric_cols(new_df) 266 old_numeric_cols = get_numeric_cols(old_df) 267 for col, typ in {k: v for k, v in dtypes.items()}.items(): 268 if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')): 269 new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float') 270 new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int') 271 new_is_numeric = col in new_numeric_cols_existing 272 old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float') 273 old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int') 274 old_is_numeric = col in old_numeric_cols 275 276 if ( 277 coerce_mixed_numerics 278 and 279 (new_is_float or new_is_int or new_is_numeric) 280 and 281 (old_is_float or old_is_int or old_is_numeric) 282 ): 283 dtypes[col] = attempt_cast_to_numeric 284 cast_cols = True 285 continue 286 287 ### Fallback to object if the types don't match. 288 warn( 289 f"Detected different types for '{col}' " 290 + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), " 291 + "falling back to 'object'..." 
292         )
293         dtypes[col] = 'object'
294         cast_cols = True
295
296     if cast_cols:
297         for col, dtype in dtypes.items():
298             if col in new_df.columns:
299                 try:
300                     new_df[col] = (
301                         new_df[col].astype(dtype)
302                         if not callable(dtype)
303                         else new_df[col].apply(dtype)
304                     )
305                 except Exception as e:
306                     warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")
307
308     serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
309     new_json_cols = get_json_cols(new_df)
310     old_json_cols = get_json_cols(old_df)
311     json_cols = set(new_json_cols + old_json_cols)
312     for json_col in old_json_cols:
313         old_df[json_col] = old_df[json_col].apply(serializer)
314     for json_col in new_json_cols:
315         new_df[json_col] = new_df[json_col].apply(serializer)
316
317     new_numeric_cols = get_numeric_cols(new_df)
318     numeric_cols = set(new_numeric_cols + old_numeric_cols)
319     for numeric_col in old_numeric_cols:
320         old_df[numeric_col] = old_df[numeric_col].apply(serialize_decimal)
321     for numeric_col in new_numeric_cols:
322         new_df[numeric_col] = new_df[numeric_col].apply(serialize_decimal)
323
324     old_dt_cols = [
325         col
326         for col, typ in old_df.dtypes.items()
327         if are_dtypes_equal(str(typ), 'datetime')
328     ]
329     for col in old_dt_cols:
330         strip_utc = (
331             (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
332         )
333         old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
334
335     new_dt_cols = [
336         col
337         for col, typ in new_df.dtypes.items()
338         if are_dtypes_equal(str(typ), 'datetime')
339     ]
340     for col in new_dt_cols:
341         strip_utc = (
342             (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
343         )
344         new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
345
346     old_uuid_cols = get_uuid_cols(old_df)
347     new_uuid_cols = get_uuid_cols(new_df)
348     uuid_cols = set(new_uuid_cols + old_uuid_cols)
349
350     old_bytes_cols = get_bytes_cols(old_df)
351     new_bytes_cols = get_bytes_cols(new_df)
352     bytes_cols = set(new_bytes_cols + old_bytes_cols)
353
354     joined_df = merge(
355         new_df.infer_objects(copy=False).fillna(NA),
356         old_df.infer_objects(copy=False).fillna(NA),
357         how='left',
358         on=None,
359         indicator=True,
360     )
361     changed_rows_mask = (joined_df['_merge'] == 'left_only')
362     new_cols = list(new_df_dtypes)
363     delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)
364
365     for json_col in json_cols:
366         if json_col not in delta_df.columns:
367             continue
368         try:
369             delta_df[json_col] = delta_df[json_col].apply(json.loads)
370         except Exception:
371             warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")
372
373     for numeric_col in numeric_cols:
374         if numeric_col not in delta_df.columns:
375             continue
376         try:
377             delta_df[numeric_col] = delta_df[numeric_col].apply(
378                 functools.partial(
379                     attempt_cast_to_numeric,
380                     quantize=True,
381                     precision=numeric_cols_precisions_scales.get(numeric_col, (None, None))[0],
382                     scale=numeric_cols_precisions_scales.get(numeric_col, (None, None))[1],
383                 )
384             )
385         except Exception:
386             warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")
387
388     for uuid_col in uuid_cols:
389         if uuid_col not in delta_df.columns:
390             continue
391         try:
392             delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
393         except Exception:
394             warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")
395
396     for bytes_col in bytes_cols:
397         if bytes_col not in delta_df.columns:
398             continue
399         try:
400             delta_df[bytes_col] = delta_df[bytes_col].apply(attempt_cast_to_bytes)
401         except Exception:
402             warn(f"Unable to parse bytes column '{bytes_col}':\n{traceback.format_exc()}")
403
404     return delta_df
Left join two DataFrames to find the newest unseen data.
Parameters
- old_df ('pd.DataFrame'): The original (target) dataframe. Acts as a filter on the `new_df`.
- new_df ('pd.DataFrame'): The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
- safe_copy (bool, default True): If `True`, create a copy before comparing and modifying the dataframes. Setting to `False` may mutate the DataFrames.
- dtypes (Optional[Dict[str, Any]], default None): Optionally specify the datatypes of the dataframe.
- include_unchanged_columns (bool, default False): If `True`, include columns which haven't changed on rows which have changed.
- coerce_mixed_numerics (bool, default True): If `True`, cast mixed integer and float columns between the old and new dataframes into numeric values (`decimal.Decimal`).
- debug (bool, default False): Verbosity toggle.
Returns
- A pandas dataframe of the new, unseen rows in `new_df`.
Examples
>>> import pandas as pd
>>> df1 = pd.DataFrame({'a': [1,2]})
>>> df2 = pd.DataFrame({'a': [2,3]})
>>> filter_unseen_df(df1, df2)
   a
0  3
407def parse_df_datetimes( 408 df: 'pd.DataFrame', 409 ignore_cols: Optional[Iterable[str]] = None, 410 strip_timezone: bool = False, 411 chunksize: Optional[int] = None, 412 dtype_backend: str = 'numpy_nullable', 413 ignore_all: bool = False, 414 debug: bool = False, 415) -> 'pd.DataFrame': 416 """ 417 Parse a pandas DataFrame for datetime columns and cast as datetimes. 418 419 Parameters 420 ---------- 421 df: pd.DataFrame 422 The pandas DataFrame to parse. 423 424 ignore_cols: Optional[Iterable[str]], default None 425 If provided, do not attempt to coerce these columns as datetimes. 426 427 strip_timezone: bool, default False 428 If `True`, remove the UTC `tzinfo` property. 429 430 chunksize: Optional[int], default None 431 If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe. 432 433 dtype_backend: str, default 'numpy_nullable' 434 If `df` is not a DataFrame and new one needs to be constructed, 435 use this as the datatypes backend. 436 Accepted values are 'numpy_nullable' and 'pyarrow'. 437 438 ignore_all: bool, default False 439 If `True`, do not attempt to cast any columns to datetimes. 440 441 debug: bool, default False 442 Verbosity toggle. 443 444 Returns 445 ------- 446 A new pandas DataFrame with the determined datetime columns 447 (usually ISO strings) cast as datetimes. 448 449 Examples 450 -------- 451 ```python 452 >>> import pandas as pd 453 >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']}) 454 >>> df.dtypes 455 a object 456 dtype: object 457 >>> df = parse_df_datetimes(df) 458 >>> df.dtypes 459 a datetime64[ns] 460 dtype: object 461 462 ``` 463 464 """ 465 from meerschaum.utils.packages import import_pandas, attempt_import 466 from meerschaum.utils.debug import dprint 467 from meerschaum.utils.warnings import warn 468 from meerschaum.utils.misc import items_str 469 from meerschaum.utils.dtypes import to_datetime 470 import traceback 471 pd = import_pandas() 472 pandas = attempt_import('pandas') 473 pd_name = pd.__name__ 474 using_dask = 'dask' in pd_name 475 df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__) 476 dask_dataframe = None 477 if using_dask or df_is_dask: 478 npartitions = chunksize_to_npartitions(chunksize) 479 dask_dataframe = attempt_import('dask.dataframe') 480 481 ### if df is a dict, build DataFrame 482 if isinstance(df, pandas.DataFrame): 483 pdf = df 484 elif df_is_dask and isinstance(df, dask_dataframe.DataFrame): 485 pdf = get_first_valid_dask_partition(df) 486 else: 487 if debug: 488 dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...") 489 490 if using_dask: 491 if isinstance(df, list): 492 keys = set() 493 for doc in df: 494 for key in doc: 495 keys.add(key) 496 df = pd.DataFrame.from_dict( 497 { 498 k: [ 499 doc.get(k, None) 500 for doc in df 501 ] for k in keys 502 }, 503 npartitions=npartitions, 504 ) 505 elif isinstance(df, dict): 506 df = pd.DataFrame.from_dict(df, npartitions=npartitions) 507 elif 'pandas.core.frame.DataFrame' in str(type(df)): 508 df = pd.from_pandas(df, npartitions=npartitions) 509 else: 510 raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.") 511 pandas = attempt_import('pandas') 512 pdf = get_first_valid_dask_partition(df) 513 514 else: 515 df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend) 516 pdf = df 517 518 ### skip parsing if DataFrame is empty 519 if len(pdf) == 0: 520 if debug: 521 dprint("df is empty. 
Returning original DataFrame without casting datetime columns...") 522 return df 523 524 ignore_cols = set( 525 (ignore_cols or []) + [ 526 col 527 for col, dtype in pdf.dtypes.items() 528 if 'datetime' in str(dtype) 529 ] 530 ) 531 cols_to_inspect = [ 532 col 533 for col in pdf.columns 534 if col not in ignore_cols 535 ] if not ignore_all else [] 536 537 if len(cols_to_inspect) == 0: 538 if debug: 539 dprint("All columns are ignored, skipping datetime detection...") 540 return df.infer_objects(copy=False).fillna(pandas.NA) 541 542 ### apply regex to columns to determine which are ISO datetimes 543 iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+' 544 dt_mask = pdf[cols_to_inspect].astype(str).apply( 545 lambda s: s.str.match(iso_dt_regex).all() 546 ) 547 548 ### list of datetime column names 549 datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]] 550 if not datetime_cols: 551 if debug: 552 dprint("No columns detected as datetimes, returning...") 553 return df.infer_objects(copy=False).fillna(pandas.NA) 554 555 if debug: 556 dprint("Converting columns to datetimes: " + str(datetime_cols)) 557 558 try: 559 if not using_dask: 560 df[datetime_cols] = df[datetime_cols].apply(to_datetime) 561 else: 562 df[datetime_cols] = df[datetime_cols].apply( 563 to_datetime, 564 utc=True, 565 axis=1, 566 meta={ 567 col: 'datetime64[ns, UTC]' 568 for col in datetime_cols 569 } 570 ) 571 except Exception: 572 warn( 573 f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n" 574 + f"{traceback.format_exc()}" 575 ) 576 577 if strip_timezone: 578 for dt in datetime_cols: 579 try: 580 df[dt] = df[dt].dt.tz_localize(None) 581 except Exception: 582 warn( 583 f"Unable to convert column '{dt}' to naive datetime:\n" 584 + f"{traceback.format_exc()}" 585 ) 586 587 return df.fillna(pandas.NA)
Parse a pandas DataFrame for datetime columns and cast as datetimes.
Parameters
- df (pd.DataFrame): The pandas DataFrame to parse.
- ignore_cols (Optional[Iterable[str]], default None): If provided, do not attempt to coerce these columns as datetimes.
- strip_timezone (bool, default False): If `True`, remove the UTC `tzinfo` property.
- chunksize (Optional[int], default None): If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.
- dtype_backend (str, default 'numpy_nullable'): If `df` is not a DataFrame and a new one needs to be constructed, use this as the datatypes backend. Accepted values are 'numpy_nullable' and 'pyarrow'.
- ignore_all (bool, default False): If `True`, do not attempt to cast any columns to datetimes.
- debug (bool, default False): Verbosity toggle.
Returns
- A new pandas DataFrame with the determined datetime columns (usually ISO strings) cast as datetimes.
Examples
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
>>> df.dtypes
a    object
dtype: object
>>> df = parse_df_datetimes(df)
>>> df.dtypes
a    datetime64[ns]
dtype: object
590def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]: 591 """ 592 Get the columns which contain unhashable objects from a Pandas DataFrame. 593 594 Parameters 595 ---------- 596 df: pd.DataFrame 597 The DataFrame which may contain unhashable objects. 598 599 Returns 600 ------- 601 A list of columns. 602 """ 603 if df is None: 604 return [] 605 if len(df) == 0: 606 return [] 607 608 is_dask = 'dask' in df.__module__ 609 if is_dask: 610 from meerschaum.utils.packages import attempt_import 611 pandas = attempt_import('pandas') 612 df = pandas.DataFrame(get_first_valid_dask_partition(df)) 613 return [ 614 col for col, val in df.iloc[0].items() 615 if not isinstance(val, Hashable) 616 ]
Get the columns which contain unhashable objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
- A list of columns.
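For instance (illustrative data, not from the source), a column holding dictionaries is reported because dicts are unhashable:
```python
>>> import pandas as pd
>>> df = pd.DataFrame([{'id': 1, 'attrs': {'color': 'red'}}])
>>> get_unhashable_cols(df)
['attrs']
```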
619def get_json_cols(df: 'pd.DataFrame') -> List[str]: 620 """ 621 Get the columns which contain unhashable objects from a Pandas DataFrame. 622 623 Parameters 624 ---------- 625 df: pd.DataFrame 626 The DataFrame which may contain unhashable objects. 627 628 Returns 629 ------- 630 A list of columns to be encoded as JSON. 631 """ 632 if df is None: 633 return [] 634 635 is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False 636 if is_dask: 637 df = get_first_valid_dask_partition(df) 638 639 if len(df) == 0: 640 return [] 641 642 cols_indices = { 643 col: df[col].first_valid_index() 644 for col in df.columns 645 } 646 return [ 647 col 648 for col, ix in cols_indices.items() 649 if ( 650 ix is not None 651 and 652 not isinstance(df.loc[ix][col], Hashable) 653 ) 654 ]
Get the columns which should be encoded as JSON (i.e. which contain unhashable objects such as dicts or lists) from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain unhashable objects.
Returns
- A list of columns to be encoded as JSON.
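A minimal sketch (hypothetical data): a column of dicts is flagged for JSON encoding, while scalar columns are not:
```python
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1, 'meta': {'tags': ['x', 'y']}}])
>>> get_json_cols(df)
['meta']
```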
657def get_numeric_cols(df: 'pd.DataFrame') -> List[str]: 658 """ 659 Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame. 660 661 Parameters 662 ---------- 663 df: pd.DataFrame 664 The DataFrame which may contain decimal objects. 665 666 Returns 667 ------- 668 A list of columns to treat as numerics. 669 """ 670 if df is None: 671 return [] 672 from decimal import Decimal 673 is_dask = 'dask' in df.__module__ 674 if is_dask: 675 df = get_first_valid_dask_partition(df) 676 677 if len(df) == 0: 678 return [] 679 680 cols_indices = { 681 col: df[col].first_valid_index() 682 for col in df.columns 683 } 684 return [ 685 col 686 for col, ix in cols_indices.items() 687 if ( 688 ix is not None 689 and 690 isinstance(df.loc[ix][col], Decimal) 691 ) 692 ]
Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain decimal objects.
Returns
- A list of columns to treat as numerics.
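For example (illustrative data), only the `Decimal` column is reported:
```python
>>> import pandas as pd
>>> from decimal import Decimal
>>> df = pd.DataFrame([{'price': Decimal('1.10'), 'qty': 2}])
>>> get_numeric_cols(df)
['price']
```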
695def get_uuid_cols(df: 'pd.DataFrame') -> List[str]: 696 """ 697 Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame. 698 699 Parameters 700 ---------- 701 df: pd.DataFrame 702 The DataFrame which may contain UUID objects. 703 704 Returns 705 ------- 706 A list of columns to treat as UUIDs. 707 """ 708 if df is None: 709 return [] 710 from uuid import UUID 711 is_dask = 'dask' in df.__module__ 712 if is_dask: 713 df = get_first_valid_dask_partition(df) 714 715 if len(df) == 0: 716 return [] 717 718 cols_indices = { 719 col: df[col].first_valid_index() 720 for col in df.columns 721 } 722 return [ 723 col 724 for col, ix in cols_indices.items() 725 if ( 726 ix is not None 727 and 728 isinstance(df.loc[ix][col], UUID) 729 ) 730 ]
Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain UUID objects.
Returns
- A list of columns to treat as UUIDs.
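For example (illustrative data):
```python
>>> import pandas as pd
>>> import uuid
>>> df = pd.DataFrame([{'id': uuid.uuid4(), 'qty': 2}])
>>> get_uuid_cols(df)
['id']
```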
733 def get_datetime_cols(
734     df: 'pd.DataFrame',
735     timezone_aware: bool = True,
736     timezone_naive: bool = True,
737 ) -> List[str]:
738     """
739     Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame.
740
741     Parameters
742     ----------
743     df: pd.DataFrame
744         The DataFrame which may contain `datetime` or `Timestamp` objects.
745
746     timezone_aware: bool, default True
747         If `True`, include timezone-aware datetime columns.
748
749     timezone_naive: bool, default True
750         If `True`, include timezone-naive datetime columns.
751
752     Returns
753     -------
754     A list of columns to treat as datetimes.
755     """
756     if not timezone_aware and not timezone_naive:
757         raise ValueError("`timezone_aware` and `timezone_naive` cannot both be `False`.")
758
759     if df is None:
760         return []
761
762     from datetime import datetime
763     from meerschaum.utils.dtypes import are_dtypes_equal
764     is_dask = 'dask' in df.__module__
765     if is_dask:
766         df = get_first_valid_dask_partition(df)
767
768     known_dt_cols = [
769         col
770         for col, typ in df.dtypes.items()
771         if are_dtypes_equal('datetime', str(typ))
772     ]
773
774     if len(df) == 0:
775         return known_dt_cols
776
777     cols_indices = {
778         col: df[col].first_valid_index()
779         for col in df.columns
780         if col not in known_dt_cols
781     }
782     pydt_cols = [
783         col
784         for col, ix in cols_indices.items()
785         if (
786             ix is not None
787             and
788             isinstance(df.loc[ix][col], datetime)
789         )
790     ]
791     dt_cols_set = set(known_dt_cols + pydt_cols)
792     all_dt_cols = [
793         col
794         for col in df.columns
795         if col in dt_cols_set
796     ]
797     if timezone_aware and timezone_naive:
798         return all_dt_cols
799
800     known_timezone_aware_dt_cols = [
801         col
802         for col in known_dt_cols
803         if getattr(df[col].dt, 'tz', None) is not None
804     ]
805     timezone_aware_pydt_cols = [
806         col
807         for col in pydt_cols
808         if df.loc[cols_indices[col]][col].tzinfo is not None
809     ]
810     timezone_aware_dt_cols_set = set(known_timezone_aware_dt_cols + timezone_aware_pydt_cols)
811     if timezone_aware:
812         return [
813             col
814             for col in all_dt_cols
815             if col in timezone_aware_dt_cols_set
816         ]
817
818     return [
819         col
820         for col in all_dt_cols
821         if col not in timezone_aware_dt_cols_set
822     ]
Get the columns which contain `datetime` or `Timestamp` objects from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain `datetime` or `Timestamp` objects.
- timezone_aware (bool, default True): If `True`, include timezone-aware datetime columns.
- timezone_naive (bool, default True): If `True`, include timezone-naive datetime columns.
Returns
- A list of columns to treat as datetimes.
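An illustrative sketch (hypothetical column names); with the default flags, both timezone-aware and naive columns are returned in column order:
```python
>>> import pandas as pd
>>> from datetime import datetime, timezone
>>> df = pd.DataFrame({
...     'aware': [datetime(2024, 1, 1, tzinfo=timezone.utc)],
...     'naive': [datetime(2024, 1, 1)],
... })
>>> get_datetime_cols(df)
['aware', 'naive']
```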
825def get_bytes_cols(df: 'pd.DataFrame') -> List[str]: 826 """ 827 Get the columns which contain bytes strings from a Pandas DataFrame. 828 829 Parameters 830 ---------- 831 df: pd.DataFrame 832 The DataFrame which may contain bytes strings. 833 834 Returns 835 ------- 836 A list of columns to treat as bytes. 837 """ 838 if df is None: 839 return [] 840 is_dask = 'dask' in df.__module__ 841 if is_dask: 842 df = get_first_valid_dask_partition(df) 843 844 if len(df) == 0: 845 return [] 846 847 cols_indices = { 848 col: df[col].first_valid_index() 849 for col in df.columns 850 } 851 return [ 852 col 853 for col, ix in cols_indices.items() 854 if ( 855 ix is not None 856 and 857 isinstance(df.loc[ix][col], bytes) 858 ) 859 ]
Get the columns which contain bytes strings from a Pandas DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame which may contain bytes strings.
Returns
- A list of columns to treat as bytes.
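For example (illustrative data):
```python
>>> import pandas as pd
>>> df = pd.DataFrame([{'payload': b'\x00\x01', 'name': 'a'}])
>>> get_bytes_cols(df)
['payload']
```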
862def enforce_dtypes( 863 df: 'pd.DataFrame', 864 dtypes: Dict[str, str], 865 safe_copy: bool = True, 866 coerce_numeric: bool = True, 867 coerce_timezone: bool = True, 868 strip_timezone: bool = False, 869 debug: bool = False, 870) -> 'pd.DataFrame': 871 """ 872 Enforce the `dtypes` dictionary on a DataFrame. 873 874 Parameters 875 ---------- 876 df: pd.DataFrame 877 The DataFrame on which to enforce dtypes. 878 879 dtypes: Dict[str, str] 880 The data types to attempt to enforce on the DataFrame. 881 882 safe_copy: bool, default True 883 If `True`, create a copy before comparing and modifying the dataframes. 884 Setting to `False` may mutate the DataFrames. 885 See `meerschaum.utils.dataframe.filter_unseen_df`. 886 887 coerce_numeric: bool, default True 888 If `True`, convert float and int collisions to numeric. 889 890 coerce_timezone: bool, default True 891 If `True`, convert datetimes to UTC. 892 893 strip_timezone: bool, default False 894 If `coerce_timezone` and `strip_timezone` are `True`, 895 remove timezone information from datetimes. 896 897 debug: bool, default False 898 Verbosity toggle. 899 900 Returns 901 ------- 902 The Pandas DataFrame with the types enforced. 903 """ 904 import json 905 import functools 906 from meerschaum.utils.debug import dprint 907 from meerschaum.utils.formatting import pprint 908 from meerschaum.utils.dtypes import ( 909 are_dtypes_equal, 910 to_pandas_dtype, 911 is_dtype_numeric, 912 attempt_cast_to_numeric, 913 attempt_cast_to_uuid, 914 attempt_cast_to_bytes, 915 coerce_timezone as _coerce_timezone, 916 ) 917 from meerschaum.utils.dtypes.sql import get_numeric_precision_scale 918 pandas = mrsm.attempt_import('pandas') 919 is_dask = 'dask' in df.__module__ 920 if safe_copy: 921 df = df.copy() 922 if len(df.columns) == 0: 923 if debug: 924 dprint("Incoming DataFrame has no columns. 
Skipping enforcement...") 925 return df 926 927 pipe_pandas_dtypes = { 928 col: to_pandas_dtype(typ) 929 for col, typ in dtypes.items() 930 } 931 json_cols = [ 932 col 933 for col, typ in dtypes.items() 934 if typ == 'json' 935 ] 936 numeric_cols = [ 937 col 938 for col, typ in dtypes.items() 939 if typ.startswith('numeric') 940 ] 941 uuid_cols = [ 942 col 943 for col, typ in dtypes.items() 944 if typ == 'uuid' 945 ] 946 bytes_cols = [ 947 col 948 for col, typ in dtypes.items() 949 if typ == 'bytes' 950 ] 951 datetime_cols = [ 952 col 953 for col, typ in dtypes.items() 954 if are_dtypes_equal(typ, 'datetime') 955 ] 956 df_numeric_cols = get_numeric_cols(df) 957 if debug: 958 dprint("Desired data types:") 959 pprint(dtypes) 960 dprint("Data types for incoming DataFrame:") 961 pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()}) 962 963 if json_cols and len(df) > 0: 964 if debug: 965 dprint(f"Checking columns for JSON encoding: {json_cols}") 966 for col in json_cols: 967 if col in df.columns: 968 try: 969 df[col] = df[col].apply( 970 ( 971 lambda x: ( 972 json.loads(x) 973 if isinstance(x, str) 974 else x 975 ) 976 ) 977 ) 978 except Exception as e: 979 if debug: 980 dprint(f"Unable to parse column '{col}' as JSON:\n{e}") 981 982 if numeric_cols: 983 if debug: 984 dprint(f"Checking for numerics: {numeric_cols}") 985 for col in numeric_cols: 986 precision, scale = get_numeric_precision_scale(None, dtypes.get(col, '')) 987 if col in df.columns: 988 try: 989 df[col] = df[col].apply( 990 functools.partial( 991 attempt_cast_to_numeric, 992 quantize=True, 993 precision=precision, 994 scale=scale, 995 ) 996 ) 997 except Exception as e: 998 if debug: 999 dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}") 1000 1001 if uuid_cols: 1002 if debug: 1003 dprint(f"Checking for UUIDs: {uuid_cols}") 1004 for col in uuid_cols: 1005 if col in df.columns: 1006 try: 1007 df[col] = df[col].apply(attempt_cast_to_uuid) 1008 except Exception as e: 1009 if debug: 1010 dprint(f"Unable to parse column '{col}' as UUID:\n{e}") 1011 1012 if bytes_cols: 1013 if debug: 1014 dprint(f"Checking for bytes: {bytes_cols}") 1015 for col in bytes_cols: 1016 if col in df.columns: 1017 try: 1018 df[col] = df[col].apply(attempt_cast_to_bytes) 1019 except Exception as e: 1020 if debug: 1021 dprint(f"Unable to parse column '{col}' as bytes:\n{e}") 1022 1023 if datetime_cols and coerce_timezone: 1024 if debug: 1025 dprint(f"Checking for datetime conversion: {datetime_cols}") 1026 for col in datetime_cols: 1027 if col in df.columns: 1028 df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone) 1029 1030 df_dtypes = {c: str(t) for c, t in df.dtypes.items()} 1031 if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes): 1032 if debug: 1033 dprint("Data types match. 
Exiting enforcement...") 1034 return df 1035 1036 common_dtypes = {} 1037 common_diff_dtypes = {} 1038 for col, typ in pipe_pandas_dtypes.items(): 1039 if col in df_dtypes: 1040 common_dtypes[col] = typ 1041 if not are_dtypes_equal(typ, df_dtypes[col]): 1042 common_diff_dtypes[col] = df_dtypes[col] 1043 1044 if debug: 1045 dprint("Common columns with different dtypes:") 1046 pprint(common_diff_dtypes) 1047 1048 detected_dt_cols = {} 1049 for col, typ in common_diff_dtypes.items(): 1050 if 'datetime' in typ and 'datetime' in common_dtypes[col]: 1051 df_dtypes[col] = typ 1052 detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col]) 1053 for col in detected_dt_cols: 1054 del common_diff_dtypes[col] 1055 1056 if debug: 1057 dprint("Common columns with different dtypes (after dates):") 1058 pprint(common_diff_dtypes) 1059 1060 if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes): 1061 if debug: 1062 dprint( 1063 "The incoming DataFrame has mostly the same types, skipping enforcement." 1064 + "The only detected difference was in the following datetime columns." 1065 ) 1066 pprint(detected_dt_cols) 1067 return df 1068 1069 for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items(): 1070 previous_typ = common_dtypes[col] 1071 mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ)) 1072 explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float') 1073 explicitly_numeric = dtypes.get(col, 'numeric').startswith('numeric') 1074 cast_to_numeric = ( 1075 explicitly_numeric 1076 or col in df_numeric_cols 1077 or (mixed_numeric_types and not explicitly_float) 1078 ) and coerce_numeric 1079 if cast_to_numeric: 1080 common_dtypes[col] = attempt_cast_to_numeric 1081 common_diff_dtypes[col] = attempt_cast_to_numeric 1082 1083 for d in common_diff_dtypes: 1084 t = common_dtypes[d] 1085 if debug: 1086 dprint(f"Casting column {d} to dtype {t}.") 1087 try: 1088 df[d] = ( 1089 df[d].apply(t) 1090 if callable(t) 1091 else df[d].astype(t) 1092 ) 1093 except Exception as e: 1094 if debug: 1095 dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}") 1096 if 'int' in str(t).lower(): 1097 try: 1098 df[d] = df[d].astype('float64').astype(t) 1099 except Exception: 1100 if debug: 1101 dprint(f"Was unable to convert to float then {t}.") 1102 return df
Enforce the `dtypes` dictionary on a DataFrame.
Parameters
- df (pd.DataFrame): The DataFrame on which to enforce dtypes.
- dtypes (Dict[str, str]): The data types to attempt to enforce on the DataFrame.
- safe_copy (bool, default True): If `True`, create a copy before comparing and modifying the dataframes. Setting to `False` may mutate the DataFrames. See `meerschaum.utils.dataframe.filter_unseen_df`.
- coerce_numeric (bool, default True): If `True`, convert float and int collisions to numeric.
- coerce_timezone (bool, default True): If `True`, convert datetimes to UTC.
- strip_timezone (bool, default False): If `coerce_timezone` and `strip_timezone` are `True`, remove timezone information from datetimes.
- debug (bool, default False): Verbosity toggle.
Returns
- The Pandas DataFrame with the types enforced.
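A hedged sketch of typical usage (illustrative data; the output shown assumes the `'numeric'` type maps to the pandas `object` dtype, with values cast to `decimal.Decimal`):
```python
>>> import pandas as pd
>>> df = pd.DataFrame({'a': ['1', '2'], 'b': ['1.1', '2.2']})
>>> enforce_dtypes(df, {'a': 'Int64', 'b': 'numeric'}).dtypes
a     Int64
b    object
dtype: object
```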
1105def get_datetime_bound_from_df( 1106 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]], 1107 datetime_column: str, 1108 minimum: bool = True, 1109) -> Union[int, datetime, None]: 1110 """ 1111 Return the minimum or maximum datetime (or integer) from a DataFrame. 1112 1113 Parameters 1114 ---------- 1115 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]] 1116 The DataFrame, list, or dict which contains the range axis. 1117 1118 datetime_column: str 1119 The name of the datetime (or int) column. 1120 1121 minimum: bool 1122 Whether to return the minimum (default) or maximum value. 1123 1124 Returns 1125 ------- 1126 The minimum or maximum datetime value in the dataframe, or `None`. 1127 """ 1128 from meerschaum.utils.dtypes import to_datetime, value_is_null 1129 1130 if df is None: 1131 return None 1132 if not datetime_column: 1133 return None 1134 1135 def compare(a, b): 1136 if a is None: 1137 return b 1138 if b is None: 1139 return a 1140 if minimum: 1141 return a if a < b else b 1142 return a if a > b else b 1143 1144 if isinstance(df, list): 1145 if len(df) == 0: 1146 return None 1147 best_yet = df[0].get(datetime_column, None) 1148 for doc in df: 1149 val = doc.get(datetime_column, None) 1150 best_yet = compare(best_yet, val) 1151 return best_yet 1152 1153 if isinstance(df, dict): 1154 if datetime_column not in df: 1155 return None 1156 best_yet = df[datetime_column][0] 1157 for val in df[datetime_column]: 1158 best_yet = compare(best_yet, val) 1159 return best_yet 1160 1161 if 'DataFrame' in str(type(df)): 1162 from meerschaum.utils.dtypes import are_dtypes_equal 1163 pandas = mrsm.attempt_import('pandas') 1164 is_dask = 'dask' in df.__module__ 1165 1166 if datetime_column not in df.columns: 1167 return None 1168 1169 try: 1170 dt_val = ( 1171 df[datetime_column].min(skipna=True) 1172 if minimum 1173 else df[datetime_column].max(skipna=True) 1174 ) 1175 except Exception: 1176 dt_val = pandas.NA 1177 if is_dask and dt_val is not None and dt_val is not pandas.NA: 1178 dt_val = dt_val.compute() 1179 1180 return ( 1181 to_datetime(dt_val, as_pydatetime=True) 1182 if are_dtypes_equal(str(type(dt_val)), 'datetime') 1183 else (dt_val if not value_is_null(dt_val) else None) 1184 ) 1185 1186 return None
Return the minimum or maximum datetime (or integer) from a DataFrame.
Parameters
- df (Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]): The DataFrame, list, or dict which contains the range axis.
- datetime_column (str): The name of the datetime (or int) column.
- minimum (bool): Whether to return the minimum (default) or maximum value.
Returns
- The minimum or maximum datetime value in the dataframe, or `None`.
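For example, with a list of documents (illustrative data):
```python
>>> from datetime import datetime
>>> docs = [{'dt': datetime(2024, 1, 1)}, {'dt': datetime(2024, 1, 2)}]
>>> get_datetime_bound_from_df(docs, 'dt')
datetime.datetime(2024, 1, 1, 0, 0)
>>> get_datetime_bound_from_df(docs, 'dt', minimum=False)
datetime.datetime(2024, 1, 2, 0, 0)
```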
1189def get_unique_index_values( 1190 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]], 1191 indices: List[str], 1192) -> Dict[str, List[Any]]: 1193 """ 1194 Return a dictionary of the unique index values in a DataFrame. 1195 1196 Parameters 1197 ---------- 1198 df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]] 1199 The dataframe (or list or dict) which contains index values. 1200 1201 indices: List[str] 1202 The list of index columns. 1203 1204 Returns 1205 ------- 1206 A dictionary mapping indices to unique values. 1207 """ 1208 if df is None: 1209 return {} 1210 if 'dataframe' in str(type(df)).lower(): 1211 pandas = mrsm.attempt_import('pandas') 1212 return { 1213 col: list({ 1214 (val if val is not pandas.NA else None) 1215 for val in df[col].unique() 1216 }) 1217 for col in indices 1218 if col in df.columns 1219 } 1220 1221 unique_indices = defaultdict(lambda: set()) 1222 if isinstance(df, list): 1223 for doc in df: 1224 for index in indices: 1225 if index in doc: 1226 unique_indices[index].add(doc[index]) 1227 1228 elif isinstance(df, dict): 1229 for index in indices: 1230 if index in df: 1231 unique_indices[index] = unique_indices[index].union(set(df[index])) 1232 1233 return {key: list(val) for key, val in unique_indices.items()}
Return a dictionary of the unique index values in a DataFrame.
Parameters
- df (Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]): The dataframe (or list or dict) which contains index values.
- indices (List[str]): The list of index columns.
Returns
- A dictionary mapping indices to unique values.
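For example (illustrative documents; the order within each list is not guaranteed, since values are deduplicated through a set):
```python
>>> docs = [
...     {'station': 'A', 'reading': 1},
...     {'station': 'B', 'reading': 2},
...     {'station': 'A', 'reading': 3},
... ]
>>> get_unique_index_values(docs, ['station'])
{'station': ['A', 'B']}
```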
1236def df_is_chunk_generator(df: Any) -> bool: 1237 """ 1238 Determine whether to treat `df` as a chunk generator. 1239 1240 Note this should only be used in a context where generators are expected, 1241 as it will return `True` for any iterable. 1242 1243 Parameters 1244 ---------- 1245 The DataFrame or chunk generator to evaluate. 1246 1247 Returns 1248 ------- 1249 A `bool` indicating whether to treat `df` as a generator. 1250 """ 1251 return ( 1252 not isinstance(df, (dict, list, str)) 1253 and 'DataFrame' not in str(type(df)) 1254 and isinstance(df, (Generator, Iterable, Iterator)) 1255 )
Determine whether to treat `df` as a chunk generator.
Note this should only be used in a context where generators are expected, as it will return `True` for any iterable.
Parameters
- df (Any): The DataFrame or chunk generator to evaluate.
Returns
- A `bool` indicating whether to treat `df` as a generator.
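A few illustrative calls:
```python
>>> import pandas as pd
>>> df_is_chunk_generator(pd.DataFrame())
False
>>> df_is_chunk_generator([{'a': 1}])
False
>>> df_is_chunk_generator(chunk for chunk in [pd.DataFrame()])
True
```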
1258def chunksize_to_npartitions(chunksize: Optional[int]) -> int: 1259 """ 1260 Return the Dask `npartitions` value for a given `chunksize`. 1261 """ 1262 if chunksize == -1: 1263 from meerschaum.config import get_config 1264 chunksize = get_config('system', 'connectors', 'sql', 'chunksize') 1265 if chunksize is None: 1266 return 1 1267 return -1 * chunksize
Return the Dask `npartitions` value for a given `chunksize`.
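A couple of illustrative calls; note that a positive `chunksize` comes back negated:
```python
>>> chunksize_to_npartitions(None)
1
>>> chunksize_to_npartitions(100)
-100
```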
1270def df_from_literal( 1271 pipe: Optional[mrsm.Pipe] = None, 1272 literal: str = None, 1273 debug: bool = False 1274) -> 'pd.DataFrame': 1275 """ 1276 Construct a dataframe from a literal value, using the pipe's datetime and value column names. 1277 1278 Parameters 1279 ---------- 1280 pipe: Optional['meerschaum.Pipe'], default None 1281 The pipe which will consume the literal value. 1282 1283 Returns 1284 ------- 1285 A 1-row pandas DataFrame from with the current UTC timestamp as the datetime columns 1286 and the literal as the value. 1287 """ 1288 from meerschaum.utils.packages import import_pandas 1289 from meerschaum.utils.warnings import error, warn 1290 from meerschaum.utils.debug import dprint 1291 1292 if pipe is None or literal is None: 1293 error("Please provide a Pipe and a literal value") 1294 ### this will raise an error if the columns are undefined 1295 dt_name, val_name = pipe.get_columns('datetime', 'value') 1296 1297 val = literal 1298 if isinstance(literal, str): 1299 if debug: 1300 dprint(f"Received literal string: '{literal}'") 1301 import ast 1302 try: 1303 val = ast.literal_eval(literal) 1304 except Exception: 1305 warn( 1306 "Failed to parse value from string:\n" + f"{literal}" + 1307 "\n\nWill cast as a string instead."\ 1308 ) 1309 val = literal 1310 1311 from datetime import datetime, timezone 1312 now = datetime.now(timezone.utc).replace(tzinfo=None) 1313 1314 pd = import_pandas() 1315 return pd.DataFrame({dt_name: [now], val_name: [val]})
Construct a dataframe from a literal value, using the pipe's datetime and value column names.
Parameters
- pipe (Optional['meerschaum.Pipe'], default None): The pipe which will consume the literal value.
- literal (str, default None): The literal value to insert; strings are parsed with `ast.literal_eval` when possible.
- debug (bool, default False): Verbosity toggle.
Returns
- A 1-row pandas DataFrame with the current UTC timestamp as the datetime column and the literal as the value.
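A hedged sketch (the connector keys and column names here are hypothetical); the pipe must have its `datetime` and `value` columns defined, or an error is raised:
```python
>>> import meerschaum as mrsm
>>> pipe = mrsm.Pipe(
...     'demo', 'temperature',
...     columns={'datetime': 'dt', 'value': 'val'},  # hypothetical column names
... )
>>> df = df_from_literal(pipe, '42.1')
>>> list(df.columns)
['dt', 'val']
```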
1318def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]: 1319 """ 1320 Return the first valid Dask DataFrame partition (if possible). 1321 """ 1322 pdf = None 1323 for partition in ddf.partitions: 1324 try: 1325 pdf = partition.compute() 1326 except Exception: 1327 continue 1328 if len(pdf) > 0: 1329 return pdf 1330 _ = mrsm.attempt_import('partd', lazy=False) 1331 return ddf.compute()
Return the first valid Dask DataFrame partition (if possible).
1334def query_df( 1335 df: 'pd.DataFrame', 1336 params: Optional[Dict[str, Any]] = None, 1337 begin: Union[datetime, int, None] = None, 1338 end: Union[datetime, int, None] = None, 1339 datetime_column: Optional[str] = None, 1340 select_columns: Optional[List[str]] = None, 1341 omit_columns: Optional[List[str]] = None, 1342 inplace: bool = False, 1343 reset_index: bool = False, 1344 coerce_types: bool = False, 1345 debug: bool = False, 1346) -> 'pd.DataFrame': 1347 """ 1348 Query the dataframe with the params dictionary. 1349 1350 Parameters 1351 ---------- 1352 df: pd.DataFrame 1353 The DataFrame to query against. 1354 1355 params: Optional[Dict[str, Any]], default None 1356 The parameters dictionary to use for the query. 1357 1358 begin: Union[datetime, int, None], default None 1359 If `begin` and `datetime_column` are provided, only return rows with a timestamp 1360 greater than or equal to this value. 1361 1362 end: Union[datetime, int, None], default None 1363 If `begin` and `datetime_column` are provided, only return rows with a timestamp 1364 less than this value. 1365 1366 datetime_column: Optional[str], default None 1367 A `datetime_column` must be provided to use `begin` and `end`. 1368 1369 select_columns: Optional[List[str]], default None 1370 If provided, only return these columns. 1371 1372 omit_columns: Optional[List[str]], default None 1373 If provided, do not include these columns in the result. 1374 1375 inplace: bool, default False 1376 If `True`, modify the DataFrame inplace rather than creating a new DataFrame. 1377 1378 reset_index: bool, default False 1379 If `True`, reset the index in the resulting DataFrame. 1380 1381 coerce_types: bool, default False 1382 If `True`, cast the dataframe and parameters as strings before querying. 1383 1384 Returns 1385 ------- 1386 A Pandas DataFrame query result. 
1387 """ 1388 1389 def _process_select_columns(_df): 1390 if not select_columns: 1391 return 1392 for col in list(_df.columns): 1393 if col not in select_columns: 1394 del _df[col] 1395 1396 def _process_omit_columns(_df): 1397 if not omit_columns: 1398 return 1399 for col in list(_df.columns): 1400 if col in omit_columns: 1401 del _df[col] 1402 1403 if not params and not begin and not end: 1404 if not inplace: 1405 df = df.copy() 1406 _process_select_columns(df) 1407 _process_omit_columns(df) 1408 return df 1409 1410 from meerschaum.utils.debug import dprint 1411 from meerschaum.utils.misc import get_in_ex_params 1412 from meerschaum.utils.warnings import warn 1413 from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null 1414 dateutil_parser = mrsm.attempt_import('dateutil.parser') 1415 pandas = mrsm.attempt_import('pandas') 1416 NA = pandas.NA 1417 1418 if params: 1419 params = params.copy() 1420 for key, val in {k: v for k, v in params.items()}.items(): 1421 if isinstance(val, (list, tuple)): 1422 if None in val: 1423 val = [item for item in val if item is not None] + [NA] 1424 params[key] = val 1425 if coerce_types: 1426 params[key] = [str(x) for x in val] 1427 else: 1428 if value_is_null(val): 1429 val = NA 1430 params[key] = NA 1431 if coerce_types: 1432 params[key] = str(val) 1433 1434 dtypes = {col: str(typ) for col, typ in df.dtypes.items()} 1435 1436 if inplace: 1437 df.fillna(NA, inplace=True) 1438 else: 1439 df = df.infer_objects().fillna(NA) 1440 1441 if isinstance(begin, str): 1442 begin = dateutil_parser.parse(begin) 1443 if isinstance(end, str): 1444 end = dateutil_parser.parse(end) 1445 1446 if begin is not None or end is not None: 1447 if not datetime_column or datetime_column not in df.columns: 1448 warn( 1449 f"The datetime column '{datetime_column}' is not present in the Dataframe, " 1450 + "ignoring begin and end...", 1451 ) 1452 begin, end = None, None 1453 1454 if debug: 1455 dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}") 1456 1457 if datetime_column and (begin is not None or end is not None): 1458 if debug: 1459 dprint("Checking for datetime column compatability.") 1460 1461 from meerschaum.utils.dtypes import coerce_timezone 1462 df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime') 1463 begin_is_int = are_dtypes_equal(str(type(begin)), 'int') 1464 end_is_int = are_dtypes_equal(str(type(end)), 'int') 1465 1466 if df_is_dt: 1467 df_tz = ( 1468 getattr(df[datetime_column].dt, 'tz', None) 1469 if hasattr(df[datetime_column], 'dt') 1470 else None 1471 ) 1472 1473 if begin_is_int: 1474 begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None) 1475 if debug: 1476 dprint(f"`begin` will be cast to '{begin}'.") 1477 if end_is_int: 1478 end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None) 1479 if debug: 1480 dprint(f"`end` will be cast to '{end}'.") 1481 1482 begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None 1483 end = coerce_timezone(end, strip_utc=(df_tz is None)) if begin is not None else None 1484 1485 in_ex_params = get_in_ex_params(params) 1486 1487 masks = [ 1488 ( 1489 (df[datetime_column] >= begin) 1490 if begin is not None and datetime_column 1491 else True 1492 ) & ( 1493 (df[datetime_column] < end) 1494 if end is not None and datetime_column 1495 else True 1496 ) 1497 ] 1498 1499 masks.extend([ 1500 ( 1501 ( 1502 (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals) 1503 if in_vals 1504 else True 1505 ) & ( 
1506 ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals) 1507 if ex_vals 1508 else True 1509 ) 1510 ) 1511 for col, (in_vals, ex_vals) in in_ex_params.items() 1512 if col in df.columns 1513 ]) 1514 query_mask = masks[0] 1515 for mask in masks[1:]: 1516 query_mask = query_mask & mask 1517 1518 original_cols = df.columns 1519 1520 ### NOTE: We must cast bool columns to `boolean[pyarrow]` 1521 ### to allow for `<NA>` values. 1522 bool_cols = [ 1523 col 1524 for col, typ in df.dtypes.items() 1525 if are_dtypes_equal(str(typ), 'bool') 1526 ] 1527 for col in bool_cols: 1528 df[col] = df[col].astype('boolean[pyarrow]') 1529 1530 if not isinstance(query_mask, bool): 1531 df['__mrsm_mask'] = ( 1532 query_mask.astype('boolean[pyarrow]') 1533 if hasattr(query_mask, 'astype') 1534 else query_mask 1535 ) 1536 1537 if inplace: 1538 df.where(query_mask, other=NA, inplace=True) 1539 df.dropna(how='all', inplace=True) 1540 result_df = df 1541 else: 1542 result_df = df.where(query_mask, other=NA) 1543 result_df.dropna(how='all', inplace=True) 1544 1545 else: 1546 result_df = df 1547 1548 if '__mrsm_mask' in df.columns: 1549 del df['__mrsm_mask'] 1550 if '__mrsm_mask' in result_df.columns: 1551 del result_df['__mrsm_mask'] 1552 1553 if reset_index: 1554 result_df.reset_index(drop=True, inplace=True) 1555 1556 result_df = enforce_dtypes( 1557 result_df, 1558 dtypes, 1559 safe_copy=False, 1560 debug=debug, 1561 coerce_numeric=False, 1562 coerce_timezone=False, 1563 ) 1564 1565 if select_columns == ['*']: 1566 select_columns = None 1567 1568 if not select_columns and not omit_columns: 1569 return result_df[original_cols] 1570 1571 _process_select_columns(result_df) 1572 _process_omit_columns(result_df) 1573 1574 return result_df
Query the dataframe with the params dictionary.
Parameters
- df (pd.DataFrame): The DataFrame to query against.
- params (Optional[Dict[str, Any]], default None): The parameters dictionary to use for the query.
- begin (Union[datetime, int, None], default None): If `begin` and `datetime_column` are provided, only return rows with a timestamp greater than or equal to this value.
- end (Union[datetime, int, None], default None): If `end` and `datetime_column` are provided, only return rows with a timestamp less than this value.
- datetime_column (Optional[str], default None): A `datetime_column` must be provided to use `begin` and `end`.
- select_columns (Optional[List[str]], default None): If provided, only return these columns.
- omit_columns (Optional[List[str]], default None): If provided, do not include these columns in the result.
- inplace (bool, default False): If `True`, modify the DataFrame inplace rather than creating a new DataFrame.
- reset_index (bool, default False): If `True`, reset the index in the resulting DataFrame.
- coerce_types (bool, default False): If `True`, cast the dataframe and parameters as strings before querying.
Returns
- A Pandas DataFrame query result.
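A hedged sketch of a params query (illustrative data; display formatting may differ slightly by pandas version):
```python
>>> import pandas as pd
>>> from datetime import datetime
>>> df = pd.DataFrame({
...     'dt': [datetime(2024, 1, 1), datetime(2024, 1, 2)],
...     'color': ['red', 'blue'],
... })
>>> query_df(df, {'color': 'red'}, reset_index=True)
          dt color
0 2024-01-01   red
```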
1577 def to_json(
1578     df: 'pd.DataFrame',
1579     safe_copy: bool = True,
1580     orient: str = 'records',
1581     date_format: str = 'iso',
1582     date_unit: str = 'us',
1583     **kwargs: Any
1584 ) -> str:
1585     """
1586     Serialize the given dataframe as a JSON string.
1587
1588     Parameters
1589     ----------
1590     df: pd.DataFrame
1591         The DataFrame to be serialized.
1592
1593     safe_copy: bool, default True
1594         If `False`, modify the DataFrame inplace.
1595
1596     date_format: str, default 'iso'
1597         The default format for timestamps.
1598
1599     date_unit: str, default 'us'
1600         The precision of the timestamps.
1601
1602     Returns
1603     -------
1604     A JSON string.
1605     """
1606     from meerschaum.utils.packages import import_pandas
1607     from meerschaum.utils.dtypes import serialize_bytes, serialize_decimal
1608     pd = import_pandas()
1609     uuid_cols = get_uuid_cols(df)
1610     bytes_cols = get_bytes_cols(df)
1611     numeric_cols = get_numeric_cols(df)
1612     if safe_copy and bool(uuid_cols or bytes_cols or numeric_cols):
1613         df = df.copy()
1614     for col in uuid_cols:
1615         df[col] = df[col].astype(str)
1616     for col in bytes_cols:
1617         df[col] = df[col].apply(serialize_bytes)
1618     for col in numeric_cols:
1619         df[col] = df[col].apply(serialize_decimal)
1620     return df.infer_objects(copy=False).fillna(pd.NA).to_json(
1621         date_format=date_format,
1622         date_unit=date_unit,
1623         orient=orient,
1624         **kwargs
1625     )
Serialize the given dataframe as a JSON string.
Parameters
- df (pd.DataFrame): The DataFrame to be serialized.
- safe_copy (bool, default True): If `False`, modify the DataFrame inplace.
- orient (str, default 'records'): The JSON layout passed through to `pandas.DataFrame.to_json`.
- date_format (str, default 'iso'): The default format for timestamps.
- date_unit (str, default 'us'): The precision of the timestamps.
Returns
- A JSON string.
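For example (illustrative data; this assumes `serialize_decimal` renders decimals as strings):
```python
>>> import pandas as pd
>>> from decimal import Decimal
>>> df = pd.DataFrame([{'id': 1, 'price': Decimal('1.10')}])
>>> to_json(df)
'[{"id":1,"price":"1.10"}]'
```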