meerschaum.utils.dataframe
Utility functions for working with DataFrames.
````python
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8

"""
Utility functions for working with DataFrames.
"""

from __future__ import annotations
from datetime import datetime, timezone
from collections import defaultdict

import meerschaum as mrsm
from meerschaum.utils.typing import (
    Optional, Dict, Any, List, Hashable, Generator,
    Iterator, Iterable, Union, TYPE_CHECKING,
)

if TYPE_CHECKING:
    pd, dask = mrsm.attempt_import('pandas', 'dask')


def add_missing_cols_to_df(
    df: 'pd.DataFrame',
    dtypes: Dict[str, Any],
) -> 'pd.DataFrame':
    """
    Add columns from the dtypes dictionary as null columns to a new DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The dataframe we should copy and add null columns.

    dtypes:
        The data types dictionary which may contain keys not present in `df.columns`.

    Returns
    -------
    A new `DataFrame` with the keys from `dtypes` added as null columns.
    If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
    NOTE: This will not ensure that dtypes are enforced!

    Examples
    --------
    >>> import pandas as pd
    >>> df = pd.DataFrame([{'a': 1}])
    >>> dtypes = {'b': 'Int64'}
    >>> add_missing_cols_to_df(df, dtypes)
       a     b
    0  1  <NA>
    >>> add_missing_cols_to_df(df, dtypes).dtypes
    a    int64
    b    Int64
    dtype: object
    >>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
    a    int64
    dtype: object
    >>>
    """
    if set(df.columns) == set(dtypes):
        return df

    from meerschaum.utils.packages import attempt_import
    from meerschaum.utils.dtypes import to_pandas_dtype
    pandas = attempt_import('pandas')

    def build_series(dtype: str):
        return pandas.Series([], dtype=to_pandas_dtype(dtype))

    assign_kwargs = {
        str(col): build_series(str(typ))
        for col, typ in dtypes.items()
        if col not in df.columns
    }
    df_with_cols = df.assign(**assign_kwargs)
    for col in assign_kwargs:
        df_with_cols[col] = df_with_cols[col].fillna(pandas.NA)
    return df_with_cols


def filter_unseen_df(
    old_df: 'pd.DataFrame',
    new_df: 'pd.DataFrame',
    safe_copy: bool = True,
    dtypes: Optional[Dict[str, Any]] = None,
    include_unchanged_columns: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Left join two DataFrames to find the newest unseen data.

    Parameters
    ----------
    old_df: 'pd.DataFrame'
        The original (target) dataframe. Acts as a filter on the `new_df`.

    new_df: 'pd.DataFrame'
        The fetched (source) dataframe. Rows that are contained in `old_df` are removed.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.

    dtypes: Optional[Dict[str, Any]], default None
        Optionally specify the datatypes of the dataframe.

    include_unchanged_columns: bool, default False
        If `True`, include columns which haven't changed on rows which have changed.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A pandas dataframe of the new, unseen rows in `new_df`.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df1 = pd.DataFrame({'a': [1,2]})
    >>> df2 = pd.DataFrame({'a': [2,3]})
    >>> filter_unseen_df(df1, df2)
       a
    0  3

    ```

    """
    if old_df is None:
        return new_df

    if safe_copy:
        old_df = old_df.copy()
        new_df = new_df.copy()

    import json
    import functools
    import traceback
    from decimal import Decimal
    from uuid import UUID
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.dtypes import (
        to_pandas_dtype,
        are_dtypes_equal,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        coerce_timezone,
    )
    pd = import_pandas(debug=debug)
    is_dask = 'dask' in new_df.__module__
    if is_dask:
        pandas = attempt_import('pandas')
        _ = attempt_import('partd', lazy=False)
        dd = attempt_import('dask.dataframe')
        merge = dd.merge
        NA = pandas.NA
    else:
        merge = pd.merge
        NA = pd.NA

    new_df_dtypes = dict(new_df.dtypes)
    old_df_dtypes = dict(old_df.dtypes)

    same_cols = set(new_df.columns) == set(old_df.columns)
    if not same_cols:
        new_df = add_missing_cols_to_df(new_df, old_df_dtypes)
        old_df = add_missing_cols_to_df(old_df, new_df_dtypes)

        new_types_missing_from_old = {
            col: typ
            for col, typ in new_df_dtypes.items()
            if col not in old_df_dtypes
        }
        old_types_missing_from_new = {
            col: typ
            for col, typ in old_df_dtypes.items()
            if col not in new_df_dtypes
        }
        old_df_dtypes.update(new_types_missing_from_old)
        new_df_dtypes.update(old_types_missing_from_new)

    ### Edge case: two empty lists cast to DFs.
    elif len(new_df.columns) == 0:
        return new_df

    try:
        ### Order matters when checking equality.
        new_df = new_df[old_df.columns]

    except Exception as e:
        warn(
            "Was not able to cast old columns onto new DataFrame. " +
            f"Are both DataFrames the same shape? Error:\n{e}",
            stacklevel=3,
        )
        return new_df[list(new_df_dtypes.keys())]

    ### assume the old_df knows what it's doing, even if it's technically wrong.
    if dtypes is None:
        dtypes = {col: str(typ) for col, typ in old_df.dtypes.items()}

    dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
        if col in new_df_dtypes and col in old_df_dtypes
    }
    for col, typ in new_df_dtypes.items():
        if col not in dtypes:
            dtypes[col] = typ

    dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if are_dtypes_equal(typ, 'datetime')
    }
    non_dt_dtypes = {
        col: typ
        for col, typ in dtypes.items()
        if col not in dt_dtypes
    }

    cast_non_dt_cols = True
    try:
        new_df = new_df.astype(non_dt_dtypes)
        cast_non_dt_cols = False
    except Exception as e:
        warn(
            f"Was not able to cast the new DataFrame to the given dtypes.\n{e}"
        )

    cast_dt_cols = True
    try:
        for col, typ in dt_dtypes.items():
            strip_utc = (
                (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
            )
            if col in old_df.columns:
                old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)
            if col in new_df.columns:
                new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)
        cast_dt_cols = False
    except Exception as e:
        warn(f"Could not cast datetime columns:\n{e}")

    cast_cols = cast_dt_cols or cast_non_dt_cols

    new_numeric_cols_existing = get_numeric_cols(new_df)
    old_numeric_cols = get_numeric_cols(old_df)
    for col, typ in {k: v for k, v in dtypes.items()}.items():
        if not are_dtypes_equal(new_df_dtypes.get(col, 'None'), old_df_dtypes.get(col, 'None')):
            new_is_float = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'float')
            new_is_int = are_dtypes_equal(new_df_dtypes.get(col, 'None'), 'int')
            new_is_numeric = col in new_numeric_cols_existing
            old_is_float = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'float')
            old_is_int = are_dtypes_equal(old_df_dtypes.get(col, 'None'), 'int')
            old_is_numeric = col in old_numeric_cols

            if (
                (new_is_float or new_is_int or new_is_numeric)
                and
                (old_is_float or old_is_int or old_is_numeric)
            ):
                dtypes[col] = attempt_cast_to_numeric
                cast_cols = True
                continue

            ### Fallback to object if the types don't match.
            warn(
                f"Detected different types for '{col}' "
                + f"({new_df_dtypes.get(col, None)} vs {old_df_dtypes.get(col, None)}), "
                + "falling back to 'object'..."
            )
            dtypes[col] = 'object'
            cast_cols = True

    if cast_cols:
        for col, dtype in dtypes.items():
            if col in new_df.columns:
                try:
                    new_df[col] = (
                        new_df[col].astype(dtype)
                        if not callable(dtype)
                        else new_df[col].apply(dtype)
                    )
                except Exception as e:
                    warn(f"Was not able to cast column '{col}' to dtype '{dtype}'.\n{e}")

    serializer = functools.partial(json.dumps, sort_keys=True, separators=(',', ':'), default=str)
    new_json_cols = get_json_cols(new_df)
    old_json_cols = get_json_cols(old_df)
    json_cols = set(new_json_cols + old_json_cols)
    for json_col in old_json_cols:
        old_df[json_col] = old_df[json_col].apply(serializer)
    for json_col in new_json_cols:
        new_df[json_col] = new_df[json_col].apply(serializer)

    new_numeric_cols = get_numeric_cols(new_df)
    numeric_cols = set(new_numeric_cols + old_numeric_cols)
    for numeric_col in old_numeric_cols:
        old_df[numeric_col] = old_df[numeric_col].apply(
            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
        )
    for numeric_col in new_numeric_cols:
        new_df[numeric_col] = new_df[numeric_col].apply(
            lambda x: f'{x:f}' if isinstance(x, Decimal) else x
        )

    old_dt_cols = [
        col
        for col, typ in old_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in old_dt_cols:
        strip_utc = (
            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
        )
        old_df[col] = coerce_timezone(old_df[col], strip_utc=strip_utc)

    new_dt_cols = [
        col
        for col, typ in new_df.dtypes.items()
        if are_dtypes_equal(str(typ), 'datetime')
    ]
    for col in new_dt_cols:
        strip_utc = (
            (dtypes or {}).get(col, 'datetime') == 'datetime64[ns]'
        )
        new_df[col] = coerce_timezone(new_df[col], strip_utc=strip_utc)

    old_uuid_cols = get_uuid_cols(old_df)
    new_uuid_cols = get_uuid_cols(new_df)
    uuid_cols = set(new_uuid_cols + old_uuid_cols)
    joined_df = merge(
        new_df.infer_objects(copy=False).fillna(NA),
        old_df.infer_objects(copy=False).fillna(NA),
        how='left',
        on=None,
        indicator=True,
    )
    changed_rows_mask = (joined_df['_merge'] == 'left_only')
    new_cols = list(new_df_dtypes)
    delta_df = joined_df[new_cols][changed_rows_mask].reset_index(drop=True)

    for json_col in json_cols:
        if json_col not in delta_df.columns:
            continue
        try:
            delta_df[json_col] = delta_df[json_col].apply(json.loads)
        except Exception:
            warn(f"Unable to deserialize JSON column '{json_col}':\n{traceback.format_exc()}")

    for numeric_col in numeric_cols:
        if numeric_col not in delta_df.columns:
            continue
        try:
            delta_df[numeric_col] = delta_df[numeric_col].apply(attempt_cast_to_numeric)
        except Exception:
            warn(f"Unable to parse numeric column '{numeric_col}':\n{traceback.format_exc()}")

    for uuid_col in uuid_cols:
        if uuid_col not in delta_df.columns:
            continue
        try:
            delta_df[uuid_col] = delta_df[uuid_col].apply(attempt_cast_to_uuid)
        except Exception:
            warn(f"Unable to parse UUID column '{uuid_col}':\n{traceback.format_exc()}")

    return delta_df


def parse_df_datetimes(
    df: 'pd.DataFrame',
    ignore_cols: Optional[Iterable[str]] = None,
    strip_timezone: bool = False,
    chunksize: Optional[int] = None,
    dtype_backend: str = 'numpy_nullable',
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Parse a pandas DataFrame for datetime columns and cast as datetimes.

    Parameters
    ----------
    df: pd.DataFrame
        The pandas DataFrame to parse.

    ignore_cols: Optional[Iterable[str]], default None
        If provided, do not attempt to coerce these columns as datetimes.

    strip_timezone: bool, default False
        If `True`, remove the UTC `tzinfo` property.

    chunksize: Optional[int], default None
        If the pandas implementation is `'dask'`, use this chunksize for the distributed dataframe.

    dtype_backend: str, default 'numpy_nullable'
        If `df` is not a DataFrame and a new one needs to be constructed,
        use this as the datatypes backend.
        Accepted values are 'numpy_nullable' and 'pyarrow'.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A new pandas DataFrame with the determined datetime columns
    (usually ISO strings) cast as datetimes.

    Examples
    --------
    ```python
    >>> import pandas as pd
    >>> df = pd.DataFrame({'a': ['2022-01-01 00:00:00']})
    >>> df.dtypes
    a    object
    dtype: object
    >>> df = parse_df_datetimes(df)
    >>> df.dtypes
    a    datetime64[ns]
    dtype: object

    ```

    """
    from meerschaum.utils.packages import import_pandas, attempt_import
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.misc import items_str
    import traceback
    pd = import_pandas()
    pandas = attempt_import('pandas')
    pd_name = pd.__name__
    using_dask = 'dask' in pd_name
    df_is_dask = (hasattr(df, '__module__') and 'dask' in df.__module__)
    dask_dataframe = None
    if using_dask or df_is_dask:
        npartitions = chunksize_to_npartitions(chunksize)
        dask_dataframe = attempt_import('dask.dataframe')

    ### if df is a dict, build DataFrame
    if isinstance(df, pandas.DataFrame):
        pdf = df
    elif df_is_dask and isinstance(df, dask_dataframe.DataFrame):
        pdf = get_first_valid_dask_partition(df)
    else:
        if debug:
            dprint(f"df is of type '{type(df)}'. Building {pd.DataFrame}...")

        if using_dask:
            if isinstance(df, list):
                keys = set()
                for doc in df:
                    for key in doc:
                        keys.add(key)
                df = pd.DataFrame.from_dict(
                    {
                        k: [
                            doc.get(k, None)
                            for doc in df
                        ] for k in keys
                    },
                    npartitions=npartitions,
                )
            elif isinstance(df, dict):
                df = pd.DataFrame.from_dict(df, npartitions=npartitions)
            elif 'pandas.core.frame.DataFrame' in str(type(df)):
                df = pd.from_pandas(df, npartitions=npartitions)
            else:
                raise Exception("Can only parse dictionaries or lists of dictionaries with Dask.")
            pandas = attempt_import('pandas')
            pdf = get_first_valid_dask_partition(df)

        else:
            df = pd.DataFrame(df).convert_dtypes(dtype_backend=dtype_backend)
            pdf = df

    ### skip parsing if DataFrame is empty
    if len(pdf) == 0:
        if debug:
            dprint("df is empty. Returning original DataFrame without casting datetime columns...")
        return df

    ignore_cols = set(
        (ignore_cols or []) + [
            col
            for col, dtype in pdf.dtypes.items()
            if 'datetime' in str(dtype)
        ]
    )
    cols_to_inspect = [col for col in pdf.columns if col not in ignore_cols]

    if len(cols_to_inspect) == 0:
        if debug:
            dprint("All columns are ignored, skipping datetime detection...")
        return df.fillna(pandas.NA)

    ### apply regex to columns to determine which are ISO datetimes
    iso_dt_regex = r'\d{4}-\d{2}-\d{2}.\d{2}\:\d{2}\:\d+'
    dt_mask = pdf[cols_to_inspect].astype(str).apply(
        lambda s: s.str.match(iso_dt_regex).all()
    )

    ### list of datetime column names
    datetime_cols = [col for col in pdf[cols_to_inspect].loc[:, dt_mask]]
    if not datetime_cols:
        if debug:
            dprint("No columns detected as datetimes, returning...")
        return df.fillna(pandas.NA)

    if debug:
        dprint("Converting columns to datetimes: " + str(datetime_cols))

    try:
        if not using_dask:
            df[datetime_cols] = df[datetime_cols].apply(
                pd.to_datetime,
                utc=True,
                format='ISO8601',
            )
        else:
            df[datetime_cols] = df[datetime_cols].apply(
                pd.to_datetime,
                utc=True,
                axis=1,
                meta={
                    col: 'datetime64[ns, UTC]'
                    for col in datetime_cols
                }
            )
    except Exception:
        warn(
            f"Unable to apply `pd.to_datetime` to {items_str(datetime_cols)}:\n"
            + f"{traceback.format_exc()}"
        )

    if strip_timezone:
        for dt in datetime_cols:
            try:
                df[dt] = df[dt].dt.tz_localize(None)
            except Exception:
                warn(
                    f"Unable to convert column '{dt}' to naive datetime:\n"
                    + f"{traceback.format_exc()}"
                )

    return df.fillna(pandas.NA)


def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns.
    """
    if df is None:
        return []
    if len(df) == 0:
        return []

    is_dask = 'dask' in df.__module__
    if is_dask:
        from meerschaum.utils.packages import attempt_import
        pandas = attempt_import('pandas')
        df = pandas.DataFrame(get_first_valid_dask_partition(df))
    return [
        col for col, val in df.iloc[0].items()
        if not isinstance(val, Hashable)
    ]


def get_json_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects (to be encoded as JSON) from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns to be encoded as JSON.
    """
    if df is None:
        return []

    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            not isinstance(df.loc[ix][col], Hashable)
        )
    ]


def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain decimal objects.

    Returns
    -------
    A list of columns to treat as numerics.
    """
    if df is None:
        return []
    from decimal import Decimal
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance(df.loc[ix][col], Decimal)
        )
    ]


def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain UUID objects.

    Returns
    -------
    A list of columns to treat as UUIDs.
    """
    if df is None:
        return []
    from uuid import UUID
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance(df.loc[ix][col], UUID)
        )
    ]


def enforce_dtypes(
    df: 'pd.DataFrame',
    dtypes: Dict[str, str],
    safe_copy: bool = True,
    coerce_numeric: bool = True,
    coerce_timezone: bool = True,
    strip_timezone: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Enforce the `dtypes` dictionary on a DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame on which to enforce dtypes.

    dtypes: Dict[str, str]
        The data types to attempt to enforce on the DataFrame.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    coerce_numeric: bool, default True
        If `True`, convert float and int collisions to numeric.

    coerce_timezone: bool, default True
        If `True`, convert datetimes to UTC.

    strip_timezone: bool, default False
        If `coerce_timezone` and `strip_timezone` are `True`,
        remove timezone information from datetimes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The Pandas DataFrame with the types enforced.
    """
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.formatting import pprint
    from meerschaum.utils.dtypes import (
        are_dtypes_equal,
        to_pandas_dtype,
        is_dtype_numeric,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        coerce_timezone as _coerce_timezone,
    )
    pandas = mrsm.attempt_import('pandas')
    is_dask = 'dask' in df.__module__
    if safe_copy:
        df = df.copy()
    if len(df.columns) == 0:
        if debug:
            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
        return df

    pipe_pandas_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
    }
    json_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'json'
    ]
    numeric_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'numeric'
    ]
    uuid_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'uuid'
    ]
    datetime_cols = [
        col
        for col, typ in dtypes.items()
        if are_dtypes_equal(typ, 'datetime')
    ]
    df_numeric_cols = get_numeric_cols(df)
    if debug:
        dprint("Desired data types:")
        pprint(dtypes)
        dprint("Data types for incoming DataFrame:")
        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})

    if json_cols and len(df) > 0:
        if debug:
            dprint(f"Checking columns for JSON encoding: {json_cols}")
        for col in json_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        (
                            lambda x: (
                                json.loads(x)
                                if isinstance(x, str)
                                else x
                            )
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")

    if numeric_cols:
        if debug:
            dprint(f"Checking for numerics: {numeric_cols}")
        for col in numeric_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_numeric)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")

    if uuid_cols:
        if debug:
            dprint(f"Checking for UUIDs: {uuid_cols}")
        for col in uuid_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_uuid)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")

    if datetime_cols and coerce_timezone:
        if debug:
            dprint(f"Checking for datetime conversion: {datetime_cols}")
        for col in datetime_cols:
            if col in df.columns:
                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)

    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint("Data types match. Exiting enforcement...")
        return df

    common_dtypes = {}
    common_diff_dtypes = {}
    for col, typ in pipe_pandas_dtypes.items():
        if col in df_dtypes:
            common_dtypes[col] = typ
            if not are_dtypes_equal(typ, df_dtypes[col]):
                common_diff_dtypes[col] = df_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes:")
        pprint(common_diff_dtypes)

    detected_dt_cols = {}
    for col, typ in common_diff_dtypes.items():
        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
            df_dtypes[col] = typ
            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
    for col in detected_dt_cols:
        del common_diff_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes (after dates):")
        pprint(common_diff_dtypes)

    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint(
                "The incoming DataFrame has mostly the same types, skipping enforcement. "
                + "The only detected difference was in the following datetime columns."
            )
            pprint(detected_dt_cols)
        return df

    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
        previous_typ = common_dtypes[col]
        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
        explicitly_numeric = dtypes.get(col, 'numeric') == 'numeric'
        cast_to_numeric = (
            explicitly_numeric
            or col in df_numeric_cols
            or (mixed_numeric_types and not explicitly_float)
        ) and coerce_numeric
        if cast_to_numeric:
            common_dtypes[col] = attempt_cast_to_numeric
            common_diff_dtypes[col] = attempt_cast_to_numeric

    for d in common_diff_dtypes:
        t = common_dtypes[d]
        if debug:
            dprint(f"Casting column {d} to dtype {t}.")
        try:
            df[d] = (
                df[d].apply(t)
                if callable(t)
                else df[d].astype(t)
            )
        except Exception as e:
            if debug:
                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
            if 'int' in str(t).lower():
                try:
                    df[d] = df[d].astype('float64').astype(t)
                except Exception:
                    if debug:
                        dprint(f"Was unable to convert to float then {t}.")
    return df


def get_datetime_bound_from_df(
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
    datetime_column: str,
    minimum: bool = True,
) -> Union[int, datetime, None]:
    """
    Return the minimum or maximum datetime (or integer) from a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
        The DataFrame, list, or dict which contains the range axis.

    datetime_column: str
        The name of the datetime (or int) column.

    minimum: bool
        Whether to return the minimum (default) or maximum value.

    Returns
    -------
    The minimum or maximum datetime value in the dataframe, or `None`.
    """
    if df is None:
        return None
    if not datetime_column:
        return None

    def compare(a, b):
        if a is None:
            return b
        if b is None:
            return a
        if minimum:
            return a if a < b else b
        return a if a > b else b

    if isinstance(df, list):
        if len(df) == 0:
            return None
        best_yet = df[0].get(datetime_column, None)
        for doc in df:
            val = doc.get(datetime_column, None)
            best_yet = compare(best_yet, val)
        return best_yet

    if isinstance(df, dict):
        if datetime_column not in df:
            return None
        best_yet = df[datetime_column][0]
        for val in df[datetime_column]:
            best_yet = compare(best_yet, val)
        return best_yet

    if 'DataFrame' in str(type(df)):
        from meerschaum.utils.dtypes import are_dtypes_equal
        pandas = mrsm.attempt_import('pandas')
        is_dask = 'dask' in df.__module__

        if datetime_column not in df.columns:
            return None

        try:
            dt_val = (
                df[datetime_column].min(skipna=True)
                if minimum
                else df[datetime_column].max(skipna=True)
            )
        except Exception:
            dt_val = pandas.NA
        if is_dask and dt_val is not None and dt_val is not pandas.NA:
            dt_val = dt_val.compute()

        return (
            pandas.to_datetime(dt_val).to_pydatetime()
            if are_dtypes_equal(str(type(dt_val)), 'datetime')
            else (dt_val if dt_val is not pandas.NA else None)
        )

    return None


def get_unique_index_values(
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
    indices: List[str],
) -> Dict[str, List[Any]]:
    """
    Return a dictionary of the unique index values in a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
        The dataframe (or list or dict) which contains index values.

    indices: List[str]
        The list of index columns.

    Returns
    -------
    A dictionary mapping indices to unique values.
    """
    if df is None:
        return {}
    if 'dataframe' in str(type(df)).lower():
        pandas = mrsm.attempt_import('pandas')
        return {
            col: list({
                (val if val is not pandas.NA else None)
                for val in df[col].unique()
            })
            for col in indices
            if col in df.columns
        }

    unique_indices = defaultdict(lambda: set())
    if isinstance(df, list):
        for doc in df:
            for index in indices:
                if index in doc:
                    unique_indices[index].add(doc[index])

    elif isinstance(df, dict):
        for index in indices:
            if index in df:
                unique_indices[index] = unique_indices[index].union(set(df[index]))

    return {key: list(val) for key, val in unique_indices.items()}


def df_is_chunk_generator(df: Any) -> bool:
    """
    Determine whether to treat `df` as a chunk generator.

    Note this should only be used in a context where generators are expected,
    as it will return `True` for any iterable.

    Parameters
    ----------
    df: Any
        The DataFrame or chunk generator to evaluate.

    Returns
    -------
    A `bool` indicating whether to treat `df` as a generator.
    """
    return (
        not isinstance(df, (dict, list, str))
        and 'DataFrame' not in str(type(df))
        and isinstance(df, (Generator, Iterable, Iterator))
    )


def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
    """
    Return the Dask `npartitions` value for a given `chunksize`.
    """
    if chunksize == -1:
        from meerschaum.config import get_config
        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
    if chunksize is None:
        return 1
    return -1 * chunksize


def df_from_literal(
    pipe: Optional[mrsm.Pipe] = None,
    literal: str = None,
    debug: bool = False
) -> 'pd.DataFrame':
    """
    Construct a dataframe from a literal value, using the pipe's datetime and value column names.

    Parameters
    ----------
    pipe: Optional['meerschaum.Pipe'], default None
        The pipe which will consume the literal value.

    literal: str, default None
        The literal value to parse (via `ast.literal_eval`) into the value column.

    Returns
    -------
    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
    and the literal as the value.
    """
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint

    if pipe is None or literal is None:
        error("Please provide a Pipe and a literal value")
    ### this will raise an error if the columns are undefined
    dt_name, val_name = pipe.get_columns('datetime', 'value')

    val = literal
    if isinstance(literal, str):
        if debug:
            dprint(f"Received literal string: '{literal}'")
        import ast
        try:
            val = ast.literal_eval(literal)
        except Exception:
            warn(
                "Failed to parse value from string:\n" + f"{literal}" +
                "\n\nWill cast as a string instead."
            )
            val = literal

    from datetime import datetime, timezone
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    pd = import_pandas()
    return pd.DataFrame({dt_name: [now], val_name: [val]})


def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
    """
    Return the first valid Dask DataFrame partition (if possible).
    """
    pdf = None
    for partition in ddf.partitions:
        try:
            pdf = partition.compute()
        except Exception:
            continue
        if len(pdf) > 0:
            return pdf
    _ = mrsm.attempt_import('partd', lazy=False)
    return ddf.compute()


def query_df(
    df: 'pd.DataFrame',
    params: Optional[Dict[str, Any]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    datetime_column: Optional[str] = None,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    inplace: bool = False,
    reset_index: bool = False,
    coerce_types: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Query the dataframe with the params dictionary.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to query against.

    params: Optional[Dict[str, Any]], default None
        The parameters dictionary to use for the query.

    begin: Union[datetime, int, None], default None
        If `begin` and `datetime_column` are provided, only return rows with a timestamp
        greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If `end` and `datetime_column` are provided, only return rows with a timestamp
        less than this value.

    datetime_column: Optional[str], default None
        A `datetime_column` must be provided to use `begin` and `end`.

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, do not include these columns in the result.

    inplace: bool, default False
        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.

    reset_index: bool, default False
        If `True`, reset the index in the resulting DataFrame.

    coerce_types: bool, default False
        If `True`, cast the dataframe and parameters as strings before querying.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A Pandas DataFrame query result.
    """

    def _process_select_columns(_df):
        if not select_columns:
            return
        for col in list(_df.columns):
            if col not in select_columns:
                del _df[col]

    def _process_omit_columns(_df):
        if not omit_columns:
            return
        for col in list(_df.columns):
            if col in omit_columns:
                del _df[col]

    if not params and not begin and not end:
        if not inplace:
            df = df.copy()
        _process_select_columns(df)
        _process_omit_columns(df)
        return df

    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import get_in_ex_params
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    pandas = mrsm.attempt_import('pandas')
    NA = pandas.NA

    if params:
        params = params.copy()
        for key, val in {k: v for k, v in params.items()}.items():
            if isinstance(val, (list, tuple)):
                if None in val:
                    val = [item for item in val if item is not None] + [NA]
                    params[key] = val
                if coerce_types:
                    params[key] = [str(x) for x in val]
            else:
                if value_is_null(val):
                    val = NA
                    params[key] = NA
                if coerce_types:
                    params[key] = str(val)

    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}

    if inplace:
        df.fillna(NA, inplace=True)
    else:
        df = df.infer_objects().fillna(NA)

    if isinstance(begin, str):
        begin = dateutil_parser.parse(begin)
    if isinstance(end, str):
        end = dateutil_parser.parse(end)

    if begin is not None or end is not None:
        if not datetime_column or datetime_column not in df.columns:
            warn(
                f"The datetime column '{datetime_column}' is not present in the Dataframe, "
                + "ignoring begin and end...",
            )
            begin, end = None, None

    if debug:
        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")

    if datetime_column and (begin is not None or end is not None):
        if debug:
            dprint("Checking for datetime column compatibility.")

        from meerschaum.utils.dtypes import coerce_timezone
        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
        end_is_int = are_dtypes_equal(str(type(end)), 'int')

        if df_is_dt:
            df_tz = (
                getattr(df[datetime_column].dt, 'tz', None)
                if hasattr(df[datetime_column], 'dt')
                else None
            )

            if begin_is_int:
                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`begin` will be cast to '{begin}'.")
            if end_is_int:
                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`end` will be cast to '{end}'.")

            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None

    in_ex_params = get_in_ex_params(params)

    masks = [
        (
            (df[datetime_column] >= begin)
            if begin is not None and datetime_column
            else True
        ) & (
            (df[datetime_column] < end)
            if end is not None and datetime_column
            else True
        )
    ]

    masks.extend([
        (
            (
                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
                if in_vals
                else True
            ) & (
                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
                if ex_vals
                else True
            )
        )
        for col, (in_vals, ex_vals) in in_ex_params.items()
        if col in df.columns
    ])
    query_mask = masks[0]
    for mask in masks[1:]:
        query_mask = query_mask & mask

    original_cols = df.columns

    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
    ### to allow for `<NA>` values.
    bool_cols = [
        col
        for col, typ in df.dtypes.items()
        if are_dtypes_equal(str(typ), 'bool')
    ]
    for col in bool_cols:
        df[col] = df[col].astype('boolean[pyarrow]')

    if not isinstance(query_mask, bool):
        df['__mrsm_mask'] = (
            query_mask.astype('boolean[pyarrow]')
            if hasattr(query_mask, 'astype')
            else query_mask
        )

        if inplace:
            df.where(query_mask, other=NA, inplace=True)
            df.dropna(how='all', inplace=True)
            result_df = df
        else:
            result_df = df.where(query_mask, other=NA)
            result_df.dropna(how='all', inplace=True)

    else:
        result_df = df

    if '__mrsm_mask' in df.columns:
        del df['__mrsm_mask']
    if '__mrsm_mask' in result_df.columns:
        del result_df['__mrsm_mask']

    if reset_index:
        result_df.reset_index(drop=True, inplace=True)

    result_df = enforce_dtypes(
        result_df,
        dtypes,
        safe_copy=False,
        debug=debug,
        coerce_numeric=False,
        coerce_timezone=False,
    )

    if select_columns == ['*']:
        select_columns = None

    if not select_columns and not omit_columns:
        return result_df[original_cols]

    _process_select_columns(result_df)
    _process_omit_columns(result_df)

    return result_df


def to_json(
    df: 'pd.DataFrame',
    safe_copy: bool = True,
    orient: str = 'records',
    date_format: str = 'iso',
    date_unit: str = 'us',
    **kwargs: Any
) -> str:
    """
    Serialize the given dataframe as a JSON string.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be serialized.

    safe_copy: bool, default True
        If `False`, modify the DataFrame inplace.

    orient: str, default 'records'
        The orientation of the JSON string (passed to `DataFrame.to_json()`).

    date_format: str, default 'iso'
        The default format for timestamps.

    date_unit: str, default 'us'
        The precision of the timestamps.

    Returns
    -------
    A JSON string.
    """
    from meerschaum.utils.packages import import_pandas
    pd = import_pandas()
    uuid_cols = get_uuid_cols(df)
    if uuid_cols and safe_copy:
        df = df.copy()
    for col in uuid_cols:
        df[col] = df[col].astype(str)
    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
        date_format=date_format,
        date_unit=date_unit,
        orient=orient,
        **kwargs
    )
````
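Before the per-function reference below, here is a small sketch combining two of the helpers defined above, `query_df` and `get_datetime_bound_from_df`. The frame and its values are illustrative only:

```python
import pandas as pd
from meerschaum.utils.dataframe import query_df, get_datetime_bound_from_df

df = pd.DataFrame({
    'dt': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']),
    'color': ['red', 'blue', 'red'],
})

### Keep rows where color == 'red' and dt >= 2024-01-02
### (begin is inclusive, end is exclusive).
result = query_df(df, params={'color': 'red'}, begin='2024-01-02', datetime_column='dt')
print(len(result))
### Expected: 1 (only the 2024-01-03 row)

### The earliest timestamp remaining after the filter.
print(get_datetime_bound_from_df(result, 'dt'))
```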
```python
def add_missing_cols_to_df(
    df: 'pd.DataFrame',
    dtypes: Dict[str, Any],
) -> 'pd.DataFrame':
```
Add columns from the dtypes dictionary as null columns to a new DataFrame.

Parameters
- df (pd.DataFrame): The dataframe we should copy and add null columns.
- dtypes: The data types dictionary which may contain keys not present in `df.columns`.

Returns
- A new `DataFrame` with the keys from `dtypes` added as null columns.
- If `df.dtypes` is the same as `dtypes`, then return a reference to `df`.
- NOTE: This will not ensure that dtypes are enforced!

Examples
>>> import pandas as pd
>>> df = pd.DataFrame([{'a': 1}])
>>> dtypes = {'b': 'Int64'}
>>> add_missing_cols_to_df(df, dtypes)
   a     b
0  1  <NA>
>>> add_missing_cols_to_df(df, dtypes).dtypes
a    int64
b    Int64
dtype: object
>>> add_missing_cols_to_df(df, {'a': 'object'}).dtypes
a    int64
dtype: object
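Because `add_missing_cols_to_df` adds null columns without enforcing their types, one plausible follow-up (a sketch, not a pattern prescribed by the module) is to pass the result through `enforce_dtypes` from this same module:

```python
import pandas as pd
from meerschaum.utils.dataframe import add_missing_cols_to_df, enforce_dtypes

df = pd.DataFrame([{'a': 1}])
dtypes = {'a': 'int64', 'b': 'Int64'}

### 'b' is appended as a null column; the dtypes are then enforced separately.
df = add_missing_cols_to_df(df, dtypes)
df = enforce_dtypes(df, dtypes)
print(df.dtypes)
```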
```python
def filter_unseen_df(
    old_df: 'pd.DataFrame',
    new_df: 'pd.DataFrame',
    safe_copy: bool = True,
    dtypes: Optional[Dict[str, Any]] = None,
    include_unchanged_columns: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
```
Left join two DataFrames to find the newest unseen data.

Parameters
- old_df (pd.DataFrame): The original (target) dataframe. Acts as a filter on the `new_df`.
- new_df (pd.DataFrame): The fetched (source) dataframe. Rows that are contained in `old_df` are removed.
- safe_copy (bool, default True): If `True`, create a copy before comparing and modifying the dataframes. Setting to `False` may mutate the DataFrames.
- dtypes (Optional[Dict[str, Any]], default None): Optionally specify the datatypes of the dataframe.
- include_unchanged_columns (bool, default False): If `True`, include columns which haven't changed on rows which have changed.
- debug (bool, default False): Verbosity toggle.

Returns
- A pandas dataframe of the new, unseen rows in `new_df`.

Examples
>>> import pandas as pd
>>> df1 = pd.DataFrame({'a': [1,2]})
>>> df2 = pd.DataFrame({'a': [2,3]})
>>> filter_unseen_df(df1, df2)
   a
0  3
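To make the delta behavior concrete, a minimal sketch with illustrative values (only the row absent from `old_df` survives the left anti-join):

```python
import pandas as pd
from meerschaum.utils.dataframe import filter_unseen_df

old_df = pd.DataFrame({'id': [1, 2], 'value': [10.0, 20.0]})
new_df = pd.DataFrame({'id': [2, 3], 'value': [20.0, 30.0]})

### Rows already present in `old_df` are dropped from the result.
delta_df = filter_unseen_df(old_df, new_df)
print(delta_df)
###    id  value
### 0   3   30.0
```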
```python
def parse_df_datetimes(
    df: 'pd.DataFrame',
    ignore_cols: Optional[Iterable[str]] = None,
    strip_timezone: bool = False,
    chunksize: Optional[int] = None,
    dtype_backend: str = 'numpy_nullable',
    debug: bool = False,
) -> 'pd.DataFrame':
```
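An illustrative run of the flags above (the column names are invented; note the function may mutate its input, hence the copies):

```python
import pandas as pd
from meerschaum.utils.dataframe import parse_df_datetimes

df = pd.DataFrame({
    'ts': ['2024-01-01 00:00:00', '2024-01-02 00:00:00'],
    'note': ['a', 'b'],  # non-ISO strings are left untouched
})

### ISO-like strings are detected via the regex and cast to UTC datetimes.
aware = parse_df_datetimes(df.copy())
print(aware['ts'].dtype)    # datetime64[ns, UTC]

### `strip_timezone=True` drops the tzinfo after conversion.
naive = parse_df_datetimes(df.copy(), strip_timezone=True)
print(naive['ts'].dtype)    # datetime64[ns]

### Columns listed in `ignore_cols` are never inspected.
skipped = parse_df_datetimes(df.copy(), ignore_cols=['ts'])
print(skipped['ts'].dtype)  # object
```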
def get_unhashable_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain unhashable objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns.
    """
    if df is None:
        return []
    if len(df) == 0:
        return []

    is_dask = 'dask' in df.__module__
    if is_dask:
        from meerschaum.utils.packages import attempt_import
        pandas = attempt_import('pandas')
        df = pandas.DataFrame(get_first_valid_dask_partition(df))
    return [
        col for col, val in df.iloc[0].items()
        if not isinstance(val, Hashable)
    ]
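For example, dicts and lists are unhashable, so their columns are flagged (only the first row is inspected, per the implementation above):

```python
import pandas as pd
from meerschaum.utils.dataframe import get_unhashable_cols

df = pd.DataFrame([{'a': 1, 'meta': {'k': 'v'}, 'tags': ['x', 'y']}])
print(get_unhashable_cols(df))  # ['meta', 'tags']
```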
def get_json_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns from a Pandas DataFrame which contain unhashable objects
    and should therefore be encoded as JSON.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain unhashable objects.

    Returns
    -------
    A list of columns to be encoded as JSON.
    """
    if df is None:
        return []

    is_dask = 'dask' in df.__module__ if hasattr(df, '__module__') else False
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            not isinstance(df.loc[ix][col], Hashable)
        )
    ]
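A small sketch showing how `first_valid_index()` lets the check skip leading nulls (the column names are invented):

```python
import pandas as pd
from meerschaum.utils.dataframe import get_json_cols

### The leading `None` in `doc` is skipped; the dict at index 1 marks it as JSON.
df = pd.DataFrame({'doc': [None, {'k': 'v'}], 'num': [1, 2]})
print(get_json_cols(df))  # ['doc']
```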
def get_numeric_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain `decimal.Decimal` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain decimal objects.

    Returns
    -------
    A list of columns to treat as numerics.
    """
    if df is None:
        return []
    from decimal import Decimal
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance(df.loc[ix][col], Decimal)
        )
    ]
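An illustrative call:

```python
from decimal import Decimal
import pandas as pd
from meerschaum.utils.dataframe import get_numeric_cols

### Only the column holding `Decimal` objects is reported.
df = pd.DataFrame({'price': [Decimal('1.10'), Decimal('2.20')], 'qty': [1, 2]})
print(get_numeric_cols(df))  # ['price']
```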
def get_uuid_cols(df: 'pd.DataFrame') -> List[str]:
    """
    Get the columns which contain `uuid.UUID` objects from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame which may contain UUID objects.

    Returns
    -------
    A list of columns to treat as UUIDs.
    """
    if df is None:
        return []
    from uuid import UUID
    is_dask = 'dask' in df.__module__
    if is_dask:
        df = get_first_valid_dask_partition(df)

    if len(df) == 0:
        return []

    cols_indices = {
        col: df[col].first_valid_index()
        for col in df.columns
    }
    return [
        col
        for col, ix in cols_indices.items()
        if (
            ix is not None
            and
            isinstance(df.loc[ix][col], UUID)
        )
    ]
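An illustrative call:

```python
from uuid import uuid4
import pandas as pd
from meerschaum.utils.dataframe import get_uuid_cols

### Only the column holding `UUID` objects is reported.
df = pd.DataFrame({'id': [uuid4(), uuid4()], 'val': [1, 2]})
print(get_uuid_cols(df))  # ['id']
```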
def enforce_dtypes(
    df: 'pd.DataFrame',
    dtypes: Dict[str, str],
    safe_copy: bool = True,
    coerce_numeric: bool = True,
    coerce_timezone: bool = True,
    strip_timezone: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Enforce the `dtypes` dictionary on a DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame on which to enforce dtypes.

    dtypes: Dict[str, str]
        The data types to attempt to enforce on the DataFrame.

    safe_copy: bool, default True
        If `True`, create a copy before comparing and modifying the dataframes.
        Setting to `False` may mutate the DataFrames.
        See `meerschaum.utils.dataframe.filter_unseen_df`.

    coerce_numeric: bool, default True
        If `True`, convert float and int collisions to numeric.

    coerce_timezone: bool, default True
        If `True`, convert datetimes to UTC.

    strip_timezone: bool, default False
        If `coerce_timezone` and `strip_timezone` are `True`,
        remove timezone information from datetimes.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    The Pandas DataFrame with the types enforced.
    """
    import json
    from meerschaum.utils.debug import dprint
    from meerschaum.utils.formatting import pprint
    from meerschaum.utils.dtypes import (
        are_dtypes_equal,
        to_pandas_dtype,
        is_dtype_numeric,
        attempt_cast_to_numeric,
        attempt_cast_to_uuid,
        coerce_timezone as _coerce_timezone,
    )
    pandas = mrsm.attempt_import('pandas')
    is_dask = 'dask' in df.__module__
    if safe_copy:
        df = df.copy()
    if len(df.columns) == 0:
        if debug:
            dprint("Incoming DataFrame has no columns. Skipping enforcement...")
        return df

    pipe_pandas_dtypes = {
        col: to_pandas_dtype(typ)
        for col, typ in dtypes.items()
    }
    json_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'json'
    ]
    numeric_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'numeric'
    ]
    uuid_cols = [
        col
        for col, typ in dtypes.items()
        if typ == 'uuid'
    ]
    datetime_cols = [
        col
        for col, typ in dtypes.items()
        if are_dtypes_equal(typ, 'datetime')
    ]
    df_numeric_cols = get_numeric_cols(df)
    if debug:
        dprint("Desired data types:")
        pprint(dtypes)
        dprint("Data types for incoming DataFrame:")
        pprint({_col: str(_typ) for _col, _typ in df.dtypes.items()})

    if json_cols and len(df) > 0:
        if debug:
            dprint(f"Checking columns for JSON encoding: {json_cols}")
        for col in json_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(
                        lambda x: (
                            json.loads(x)
                            if isinstance(x, str)
                            else x
                        )
                    )
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as JSON:\n{e}")

    if numeric_cols:
        if debug:
            dprint(f"Checking for numerics: {numeric_cols}")
        for col in numeric_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_numeric)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as NUMERIC:\n{e}")

    if uuid_cols:
        if debug:
            dprint(f"Checking for UUIDs: {uuid_cols}")
        for col in uuid_cols:
            if col in df.columns:
                try:
                    df[col] = df[col].apply(attempt_cast_to_uuid)
                except Exception as e:
                    if debug:
                        dprint(f"Unable to parse column '{col}' as UUID:\n{e}")

    if datetime_cols and coerce_timezone:
        if debug:
            dprint(f"Checking for datetime conversion: {datetime_cols}")
        for col in datetime_cols:
            if col in df.columns:
                df[col] = _coerce_timezone(df[col], strip_utc=strip_timezone)

    df_dtypes = {c: str(t) for c, t in df.dtypes.items()}
    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint("Data types match. Exiting enforcement...")
        return df

    common_dtypes = {}
    common_diff_dtypes = {}
    for col, typ in pipe_pandas_dtypes.items():
        if col in df_dtypes:
            common_dtypes[col] = typ
            if not are_dtypes_equal(typ, df_dtypes[col]):
                common_diff_dtypes[col] = df_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes:")
        pprint(common_diff_dtypes)

    detected_dt_cols = {}
    for col, typ in common_diff_dtypes.items():
        if 'datetime' in typ and 'datetime' in common_dtypes[col]:
            df_dtypes[col] = typ
            detected_dt_cols[col] = (common_dtypes[col], common_diff_dtypes[col])
    for col in detected_dt_cols:
        del common_diff_dtypes[col]

    if debug:
        dprint("Common columns with different dtypes (after dates):")
        pprint(common_diff_dtypes)

    if are_dtypes_equal(df_dtypes, pipe_pandas_dtypes):
        if debug:
            dprint(
                "The incoming DataFrame has mostly the same types, skipping enforcement. "
                + "The only detected difference was in the following datetime columns."
            )
            pprint(detected_dt_cols)
        return df

    for col, typ in {k: v for k, v in common_diff_dtypes.items()}.items():
        previous_typ = common_dtypes[col]
        mixed_numeric_types = (is_dtype_numeric(typ) and is_dtype_numeric(previous_typ))
        explicitly_float = are_dtypes_equal(dtypes.get(col, 'object'), 'float')
        explicitly_numeric = dtypes.get(col, 'numeric') == 'numeric'
        cast_to_numeric = (
            explicitly_numeric
            or col in df_numeric_cols
            or (mixed_numeric_types and not explicitly_float)
        ) and coerce_numeric
        if cast_to_numeric:
            common_dtypes[col] = attempt_cast_to_numeric
            common_diff_dtypes[col] = attempt_cast_to_numeric

    for d in common_diff_dtypes:
        t = common_dtypes[d]
        if debug:
            dprint(f"Casting column {d} to dtype {t}.")
        try:
            df[d] = (
                df[d].apply(t)
                if callable(t)
                else df[d].astype(t)
            )
        except Exception as e:
            if debug:
                dprint(f"Encountered an error when casting column {d} to type {t}:\n{e}")
            if 'int' in str(t).lower():
                try:
                    df[d] = df[d].astype('float64').astype(t)
                except Exception:
                    if debug:
                        dprint(f"Was unable to convert to float then {t}.")
    return df
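A minimal sketch of the enforcement flow using the special `'json'` and `'numeric'` type strings handled above (invented column names; the output comments show the expected results based on the implementation):

```python
import pandas as pd
from meerschaum.utils.dataframe import enforce_dtypes

df = pd.DataFrame({
    'payload': ['{"k": "v"}', '{"k": "w"}'],
    'amount': ['1.10', '2.20'],
})
enforced = enforce_dtypes(df, {'payload': 'json', 'amount': 'numeric'})

### JSON strings are parsed into Python objects,
### and numeric strings are cast via `attempt_cast_to_numeric`.
print(type(enforced['payload'][0]))  # <class 'dict'>
print(type(enforced['amount'][0]))   # <class 'decimal.Decimal'> (expected)
```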
def get_datetime_bound_from_df(
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
    datetime_column: str,
    minimum: bool = True,
) -> Union[int, datetime, None]:
    """
    Return the minimum or maximum datetime (or integer) from a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
        The DataFrame, list, or dict which contains the range axis.

    datetime_column: str
        The name of the datetime (or int) column.

    minimum: bool
        Whether to return the minimum (default) or maximum value.

    Returns
    -------
    The minimum or maximum datetime value in the dataframe, or `None`.
    """
    if df is None:
        return None
    if not datetime_column:
        return None

    def compare(a, b):
        if a is None:
            return b
        if b is None:
            return a
        if minimum:
            return a if a < b else b
        return a if a > b else b

    if isinstance(df, list):
        if len(df) == 0:
            return None
        best_yet = df[0].get(datetime_column, None)
        for doc in df:
            val = doc.get(datetime_column, None)
            best_yet = compare(best_yet, val)
        return best_yet

    if isinstance(df, dict):
        if datetime_column not in df:
            return None
        best_yet = df[datetime_column][0]
        for val in df[datetime_column]:
            best_yet = compare(best_yet, val)
        return best_yet

    if 'DataFrame' in str(type(df)):
        from meerschaum.utils.dtypes import are_dtypes_equal
        pandas = mrsm.attempt_import('pandas')
        is_dask = 'dask' in df.__module__

        if datetime_column not in df.columns:
            return None

        try:
            dt_val = (
                df[datetime_column].min(skipna=True)
                if minimum
                else df[datetime_column].max(skipna=True)
            )
        except Exception:
            dt_val = pandas.NA
        if is_dask and dt_val is not None and dt_val is not pandas.NA:
            dt_val = dt_val.compute()

        return (
            pandas.to_datetime(dt_val).to_pydatetime()
            if are_dtypes_equal(str(type(dt_val)), 'datetime')
            else (dt_val if dt_val is not pandas.NA else None)
        )

    return None
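Lists of documents are scanned with the `compare` helper, so no DataFrame is required. An illustrative call:

```python
from datetime import datetime, timezone
from meerschaum.utils.dataframe import get_datetime_bound_from_df

docs = [
    {'dt': datetime(2024, 1, 1, tzinfo=timezone.utc), 'val': 1},
    {'dt': datetime(2024, 1, 2, tzinfo=timezone.utc), 'val': 2},
]
print(get_datetime_bound_from_df(docs, 'dt'))                 # 2024-01-01 00:00:00+00:00
print(get_datetime_bound_from_df(docs, 'dt', minimum=False))  # 2024-01-02 00:00:00+00:00
```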
def get_unique_index_values(
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]],
    indices: List[str],
) -> Dict[str, List[Any]]:
    """
    Return a dictionary of the unique index values in a DataFrame.

    Parameters
    ----------
    df: Union['pd.DataFrame', Dict[str, List[Any]], List[Dict[str, Any]]]
        The dataframe (or list or dict) which contains index values.

    indices: List[str]
        The list of index columns.

    Returns
    -------
    A dictionary mapping indices to unique values.
    """
    if df is None:
        return {}
    if 'dataframe' in str(type(df)).lower():
        pandas = mrsm.attempt_import('pandas')
        return {
            col: list({
                (val if val is not pandas.NA else None)
                for val in df[col].unique()
            })
            for col in indices
            if col in df.columns
        }

    unique_indices = defaultdict(lambda: set())
    if isinstance(df, list):
        for doc in df:
            for index in indices:
                if index in doc:
                    unique_indices[index].add(doc[index])

    elif isinstance(df, dict):
        for index in indices:
            if index in df:
                unique_indices[index] = unique_indices[index].union(set(df[index]))

    return {key: list(val) for key, val in unique_indices.items()}
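An illustrative call on a list of documents (note the values are collected in sets, so their order is not guaranteed):

```python
from meerschaum.utils.dataframe import get_unique_index_values

docs = [
    {'city': 'Atlanta', 'station': 1},
    {'city': 'Atlanta', 'station': 2},
    {'city': 'Boston', 'station': 1},
]
print(get_unique_index_values(docs, ['city']))
# {'city': ['Atlanta', 'Boston']}
```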
def df_is_chunk_generator(df: Any) -> bool:
    """
    Determine whether to treat `df` as a chunk generator.

    Note this should only be used in a context where generators are expected,
    as it will return `True` for any iterable.

    Parameters
    ----------
    df: Any
        The DataFrame or chunk generator to evaluate.

    Returns
    -------
    A `bool` indicating whether to treat `df` as a generator.
    """
    return (
        not isinstance(df, (dict, list, str))
        and 'DataFrame' not in str(type(df))
        and isinstance(df, (Generator, Iterable, Iterator))
    )
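An illustrative call showing the exclusions (dicts, lists, strings, and DataFrames are never treated as generators):

```python
import pandas as pd
from meerschaum.utils.dataframe import df_is_chunk_generator

df = pd.DataFrame({'a': [1]})
chunks = (df for _ in range(3))

print(df_is_chunk_generator(df))      # False
print(df_is_chunk_generator(chunks))  # True
print(df_is_chunk_generator([df]))    # False (lists are excluded despite being iterable)
```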
def chunksize_to_npartitions(chunksize: Optional[int]) -> int:
    """
    Return the Dask `npartitions` value for a given `chunksize`.
    """
    if chunksize == -1:
        from meerschaum.config import get_config
        chunksize = get_config('system', 'connectors', 'sql', 'chunksize')
    if chunksize is None:
        return 1
    return -1 * chunksize
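As the code shows, `None` maps to a single partition, `-1` is first resolved from the SQL connector configuration, and an explicit chunksize is returned negated:

```python
from meerschaum.utils.dataframe import chunksize_to_npartitions

print(chunksize_to_npartitions(None))     # 1
print(chunksize_to_npartitions(100_000))  # -100000
```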
def df_from_literal(
    pipe: Optional[mrsm.Pipe] = None,
    literal: str = None,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Construct a dataframe from a literal value, using the pipe's datetime and value column names.

    Parameters
    ----------
    pipe: Optional[mrsm.Pipe], default None
        The pipe which will consume the literal value.

    literal: str, default None
        The literal value to parse (via `ast.literal_eval`) and store in the value column.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A 1-row pandas DataFrame with the current UTC timestamp as the datetime column
    and the literal as the value.
    """
    from meerschaum.utils.packages import import_pandas
    from meerschaum.utils.warnings import error, warn
    from meerschaum.utils.debug import dprint

    if pipe is None or literal is None:
        error("Please provide a Pipe and a literal value")
    ### this will raise an error if the columns are undefined
    dt_name, val_name = pipe.get_columns('datetime', 'value')

    val = literal
    if isinstance(literal, str):
        if debug:
            dprint(f"Received literal string: '{literal}'")
        import ast
        try:
            val = ast.literal_eval(literal)
        except Exception:
            warn(
                "Failed to parse value from string:\n" + f"{literal}"
                + "\n\nWill cast as a string instead."
            )
            val = literal

    from datetime import datetime, timezone
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    pd = import_pandas()
    return pd.DataFrame({dt_name: [now], val_name: [val]})
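A minimal sketch (the pipe keys and column names here are hypothetical; the pipe only needs its `datetime` and `value` columns defined, not registration):

```python
import meerschaum as mrsm
from meerschaum.utils.dataframe import df_from_literal

pipe = mrsm.Pipe(
    'example', 'literal',
    columns={'datetime': 'dt', 'value': 'val'},
)
df = df_from_literal(pipe, '42')
print(df.columns.tolist())  # ['dt', 'val']
print(df['val'][0])         # 42 (parsed via `ast.literal_eval`)
```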
def get_first_valid_dask_partition(ddf: 'dask.dataframe.DataFrame') -> Union['pd.DataFrame', None]:
    """
    Return the first valid Dask DataFrame partition (if possible).
    """
    pdf = None
    for partition in ddf.partitions:
        try:
            pdf = partition.compute()
        except Exception:
            continue
        if len(pdf) > 0:
            return pdf
    _ = mrsm.attempt_import('partd', lazy=False)
    return ddf.compute()
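An illustrative call, assuming `dask` is installed:

```python
import pandas as pd
import dask.dataframe as dd
from meerschaum.utils.dataframe import get_first_valid_dask_partition

### Four rows across two partitions: only the first non-empty partition is computed.
ddf = dd.from_pandas(pd.DataFrame({'a': [1, 2, 3, 4]}), npartitions=2)
pdf = get_first_valid_dask_partition(ddf)
print(len(pdf))  # 2
```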
def query_df(
    df: 'pd.DataFrame',
    params: Optional[Dict[str, Any]] = None,
    begin: Union[datetime, int, None] = None,
    end: Union[datetime, int, None] = None,
    datetime_column: Optional[str] = None,
    select_columns: Optional[List[str]] = None,
    omit_columns: Optional[List[str]] = None,
    inplace: bool = False,
    reset_index: bool = False,
    coerce_types: bool = False,
    debug: bool = False,
) -> 'pd.DataFrame':
    """
    Query the dataframe with the params dictionary.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to query against.

    params: Optional[Dict[str, Any]], default None
        The parameters dictionary to use for the query.

    begin: Union[datetime, int, None], default None
        If `begin` and `datetime_column` are provided, only return rows with a timestamp
        greater than or equal to this value.

    end: Union[datetime, int, None], default None
        If `end` and `datetime_column` are provided, only return rows with a timestamp
        less than this value.

    datetime_column: Optional[str], default None
        A `datetime_column` must be provided to use `begin` and `end`.

    select_columns: Optional[List[str]], default None
        If provided, only return these columns.

    omit_columns: Optional[List[str]], default None
        If provided, do not include these columns in the result.

    inplace: bool, default False
        If `True`, modify the DataFrame inplace rather than creating a new DataFrame.

    reset_index: bool, default False
        If `True`, reset the index in the resulting DataFrame.

    coerce_types: bool, default False
        If `True`, cast the dataframe and parameters as strings before querying.

    debug: bool, default False
        Verbosity toggle.

    Returns
    -------
    A Pandas DataFrame query result.
    """

    def _process_select_columns(_df):
        if not select_columns:
            return
        for col in list(_df.columns):
            if col not in select_columns:
                del _df[col]

    def _process_omit_columns(_df):
        if not omit_columns:
            return
        for col in list(_df.columns):
            if col in omit_columns:
                del _df[col]

    if not params and not begin and not end:
        if not inplace:
            df = df.copy()
        _process_select_columns(df)
        _process_omit_columns(df)
        return df

    from meerschaum.utils.debug import dprint
    from meerschaum.utils.misc import get_in_ex_params
    from meerschaum.utils.warnings import warn
    from meerschaum.utils.dtypes import are_dtypes_equal, value_is_null
    dateutil_parser = mrsm.attempt_import('dateutil.parser')
    pandas = mrsm.attempt_import('pandas')
    NA = pandas.NA

    if params:
        params = params.copy()
        for key, val in {k: v for k, v in params.items()}.items():
            if isinstance(val, (list, tuple)):
                if None in val:
                    val = [item for item in val if item is not None] + [NA]
                    params[key] = val
                if coerce_types:
                    params[key] = [str(x) for x in val]
            else:
                if value_is_null(val):
                    val = NA
                    params[key] = NA
                if coerce_types:
                    params[key] = str(val)

    dtypes = {col: str(typ) for col, typ in df.dtypes.items()}

    if inplace:
        df.fillna(NA, inplace=True)
    else:
        df = df.infer_objects().fillna(NA)

    if isinstance(begin, str):
        begin = dateutil_parser.parse(begin)
    if isinstance(end, str):
        end = dateutil_parser.parse(end)

    if begin is not None or end is not None:
        if not datetime_column or datetime_column not in df.columns:
            warn(
                f"The datetime column '{datetime_column}' is not present in the Dataframe, "
                + "ignoring begin and end...",
            )
            begin, end = None, None

    if debug:
        dprint(f"Querying dataframe:\n{params=} {begin=} {end=} {datetime_column=}")

    if datetime_column and (begin is not None or end is not None):
        if debug:
            dprint("Checking for datetime column compatibility.")

        from meerschaum.utils.dtypes import coerce_timezone
        df_is_dt = are_dtypes_equal(str(df.dtypes[datetime_column]), 'datetime')
        begin_is_int = are_dtypes_equal(str(type(begin)), 'int')
        end_is_int = are_dtypes_equal(str(type(end)), 'int')

        if df_is_dt:
            df_tz = (
                getattr(df[datetime_column].dt, 'tz', None)
                if hasattr(df[datetime_column], 'dt')
                else None
            )

            if begin_is_int:
                begin = datetime.fromtimestamp(int(begin), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`begin` will be cast to '{begin}'.")
            if end_is_int:
                end = datetime.fromtimestamp(int(end), timezone.utc).replace(tzinfo=None)
                if debug:
                    dprint(f"`end` will be cast to '{end}'.")

            begin = coerce_timezone(begin, strip_utc=(df_tz is None)) if begin is not None else None
            end = coerce_timezone(end, strip_utc=(df_tz is None)) if end is not None else None

    in_ex_params = get_in_ex_params(params)

    masks = [
        (
            (df[datetime_column] >= begin)
            if begin is not None and datetime_column
            else True
        ) & (
            (df[datetime_column] < end)
            if end is not None and datetime_column
            else True
        )
    ]

    masks.extend([
        (
            (
                (df[col] if not coerce_types else df[col].astype(str)).isin(in_vals)
                if in_vals
                else True
            ) & (
                ~(df[col] if not coerce_types else df[col].astype(str)).isin(ex_vals)
                if ex_vals
                else True
            )
        )
        for col, (in_vals, ex_vals) in in_ex_params.items()
        if col in df.columns
    ])
    query_mask = masks[0]
    for mask in masks[1:]:
        query_mask = query_mask & mask

    original_cols = df.columns

    ### NOTE: We must cast bool columns to `boolean[pyarrow]`
    ### to allow for `<NA>` values.
    bool_cols = [
        col
        for col, typ in df.dtypes.items()
        if are_dtypes_equal(str(typ), 'bool')
    ]
    for col in bool_cols:
        df[col] = df[col].astype('boolean[pyarrow]')

    if not isinstance(query_mask, bool):
        df['__mrsm_mask'] = (
            query_mask.astype('boolean[pyarrow]')
            if hasattr(query_mask, 'astype')
            else query_mask
        )

        if inplace:
            df.where(query_mask, other=NA, inplace=True)
            df.dropna(how='all', inplace=True)
            result_df = df
        else:
            result_df = df.where(query_mask, other=NA)
            result_df.dropna(how='all', inplace=True)

    else:
        result_df = df

    if '__mrsm_mask' in df.columns:
        del df['__mrsm_mask']
    if '__mrsm_mask' in result_df.columns:
        del result_df['__mrsm_mask']

    if reset_index:
        result_df.reset_index(drop=True, inplace=True)

    result_df = enforce_dtypes(
        result_df,
        dtypes,
        safe_copy=False,
        debug=debug,
        coerce_numeric=False,
        coerce_timezone=False,
    )

    if select_columns == ['*']:
        select_columns = None

    if not select_columns and not omit_columns:
        return result_df[original_cols]

    _process_select_columns(result_df)
    _process_omit_columns(result_df)

    return result_df
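A sketch of a combined filter (invented data; note `begin` is inclusive while `end` is exclusive):

```python
from datetime import datetime
import pandas as pd
from meerschaum.utils.dataframe import query_df

df = pd.DataFrame({
    'dt': [datetime(2024, 1, 1), datetime(2024, 1, 2), datetime(2024, 1, 3)],
    'color': ['red', 'blue', 'red'],
    'val': [1, 2, 3],
})

### Equality filter via `params`, combined with a [begin, end) datetime range.
result = query_df(
    df,
    params={'color': 'red'},
    begin=datetime(2024, 1, 2),
    datetime_column='dt',
    select_columns=['dt', 'val'],
    reset_index=True,
)
print(result)
### Expected: the single 'red' row on or after 2024-01-02:
#           dt  val
# 0 2024-01-03    3
```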
def to_json(
    df: 'pd.DataFrame',
    safe_copy: bool = True,
    orient: str = 'records',
    date_format: str = 'iso',
    date_unit: str = 'us',
    **kwargs: Any
) -> str:
    """
    Serialize the given dataframe as a JSON string.

    Parameters
    ----------
    df: pd.DataFrame
        The DataFrame to be serialized.

    safe_copy: bool, default True
        If `False`, modify the DataFrame inplace.

    orient: str, default 'records'
        The JSON orientation (passed to `DataFrame.to_json()`).

    date_format: str, default 'iso'
        The default format for timestamps.

    date_unit: str, default 'us'
        The precision of the timestamps.

    Returns
    -------
    A JSON string.
    """
    from meerschaum.utils.packages import import_pandas
    pd = import_pandas()
    uuid_cols = get_uuid_cols(df)
    if uuid_cols and safe_copy:
        df = df.copy()
    for col in uuid_cols:
        df[col] = df[col].astype(str)
    return df.infer_objects(copy=False).fillna(pd.NA).to_json(
        date_format=date_format,
        date_unit=date_unit,
        orient=orient,
        **kwargs
    )
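An illustrative round trip with the default `records` orientation (nulls are normalized to `pd.NA` before serialization):

```python
import pandas as pd
from meerschaum.utils.dataframe import to_json

df = pd.DataFrame({'a': [1, 2], 'b': ['x', None]})
print(to_json(df))
# [{"a":1,"b":"x"},{"a":2,"b":null}]
```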